Skip to main content

shape_runtime/plugins/data_source/
mod.rs

1//! Plugin Data Source Wrapper
2//!
3//! Provides a Rust-friendly wrapper around the C ABI data source interface.
4
5mod providers;
6mod query;
7mod schema;
8
9// Re-export schema types
10pub use schema::{ParsedOutputField, ParsedOutputSchema, ParsedQueryParam, ParsedQuerySchema};
11
12use std::ffi::c_void;
13
14use serde_json::Value;
15use shape_abi_v1::DataSourceVTable;
16use shape_ast::error::{Result, ShapeError};
17
18/// Wrapper around a plugin data source
19///
20/// Provides Rust-friendly access to plugin functionality including
21/// self-describing schema for LSP autocomplete and validation.
22pub struct PluginDataSource {
23    /// Plugin name
24    name: String,
25    /// Vtable pointer (static lifetime)
26    vtable: &'static DataSourceVTable,
27    /// Instance pointer (owned by this struct)
28    instance: *mut c_void,
29    /// Cached query schema
30    query_schema: ParsedQuerySchema,
31    /// Cached output schema
32    output_schema: ParsedOutputSchema,
33}
34
35impl PluginDataSource {
36    /// Create a new plugin data source instance
37    ///
38    /// # Arguments
39    /// * `name` - Plugin name
40    /// * `vtable` - Data source vtable (must be static)
41    /// * `config` - Configuration value (will be MessagePack encoded)
42    pub fn new(name: String, vtable: &'static DataSourceVTable, config: &Value) -> Result<Self> {
43        // Serialize config to MessagePack
44        let config_bytes = rmp_serde::to_vec(config).map_err(|e| ShapeError::RuntimeError {
45            message: format!("Failed to serialize plugin config: {}", e),
46            location: None,
47        })?;
48
49        // Initialize the plugin instance
50        let init_fn = vtable.init.ok_or_else(|| ShapeError::RuntimeError {
51            message: format!("Plugin '{}' has no init function", name),
52            location: None,
53        })?;
54
55        let instance = unsafe { init_fn(config_bytes.as_ptr(), config_bytes.len()) };
56        if instance.is_null() {
57            return Err(ShapeError::RuntimeError {
58                message: format!("Plugin '{}' init returned null", name),
59                location: None,
60            });
61        }
62
63        // Parse query schema
64        let query_schema = query::parse_query_schema_from_vtable(vtable, instance)?;
65
66        // Parse output schema
67        let output_schema = query::parse_output_schema_from_vtable(vtable, instance)?;
68
69        Ok(Self {
70            name,
71            vtable,
72            instance,
73            query_schema,
74            output_schema,
75        })
76    }
77
78    /// Get the query schema for LSP autocomplete and validation
79    pub fn get_query_schema(&self) -> &ParsedQuerySchema {
80        &self.query_schema
81    }
82
83    /// Get the output schema for LSP autocomplete
84    pub fn get_output_schema(&self) -> &ParsedOutputSchema {
85        &self.output_schema
86    }
87
88    /// Get the plugin name
89    pub fn name(&self) -> &str {
90        &self.name
91    }
92
93    /// Validate a query before execution
94    pub fn validate_query(&self, query: &Value) -> Result<()> {
95        query::validate_query(self.vtable, self.instance, query)
96    }
97
98    /// Load historical data
99    pub fn load(&self, query: &Value) -> Result<Value> {
100        providers::load(self.vtable, self.instance, &self.name, query)
101    }
102
103    /// Subscribe to streaming data
104    ///
105    /// # Arguments
106    /// * `query` - Query parameters
107    /// * `callback` - Called for each data point
108    ///
109    /// # Returns
110    /// Subscription ID (use to unsubscribe)
111    pub fn subscribe<F>(&self, query: &Value, callback: F) -> Result<u64>
112    where
113        F: Fn(Value) + Send + Sync + 'static,
114    {
115        providers::subscribe(self.vtable, self.instance, &self.name, query, callback)
116    }
117
118    /// Unsubscribe from streaming data
119    pub fn unsubscribe(&self, subscription_id: u64) -> Result<()> {
120        providers::unsubscribe(self.vtable, self.instance, &self.name, subscription_id)
121    }
122
123    /// Query the schema for a specific data source.
124    ///
125    /// This enables runtime schema discovery - the plugin returns what columns
126    /// are available for a given source ID.
127    ///
128    /// # Arguments
129    /// * `source_id` - The source identifier (e.g., symbol, table name, device ID)
130    ///
131    /// # Returns
132    /// The plugin schema with column information
133    pub fn get_source_schema(&self, source_id: &str) -> Result<shape_abi_v1::PluginSchema> {
134        providers::get_source_schema(self.vtable, self.instance, &self.name, source_id)
135    }
136
137    /// Check if this plugin supports schema discovery
138    pub fn supports_schema_discovery(&self) -> bool {
139        self.vtable.get_source_schema.is_some()
140    }
141
142    /// Check if this plugin supports binary loading (ABI v2)
143    pub fn supports_binary(&self) -> bool {
144        self.vtable.load_binary.is_some()
145    }
146
147    /// Load historical data in binary columnar format (ABI v2)
148    pub fn load_binary(
149        &self,
150        query: &Value,
151        granularity: crate::progress::ProgressGranularity,
152        progress_handle: Option<&crate::progress::ProgressHandle>,
153    ) -> Result<shape_value::ValueWord> {
154        providers::load_binary(
155            self.vtable,
156            self.instance,
157            &self.name,
158            query,
159            granularity,
160            progress_handle,
161        )
162    }
163}
164
165impl Drop for PluginDataSource {
166    fn drop(&mut self) {
167        if let Some(drop_fn) = self.vtable.drop {
168            unsafe { drop_fn(self.instance) };
169        }
170    }
171}
172
173// SAFETY: The instance pointer is only accessed through the vtable functions
174// which are required to be thread-safe by the plugin contract.
175unsafe impl Send for PluginDataSource {}
176unsafe impl Sync for PluginDataSource {}
177
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use shape_abi_v1::ParamType;

    /// Builds a parameter with no default value, no allowed-value list and no
    /// nested schema.
    fn plain_param(
        name: &str,
        description: &str,
        param_type: ParamType,
        required: bool,
    ) -> ParsedQueryParam {
        ParsedQueryParam {
            name: name.to_owned(),
            description: description.to_owned(),
            param_type,
            required,
            default_value: None,
            allowed_values: None,
            nested_schema: None,
        }
    }

    /// An empty query schema has no params and no example query.
    #[test]
    fn test_parsed_query_schema_default() {
        let empty = ParsedQuerySchema {
            params: vec![],
            example_query: None,
        };

        assert!(empty.params.is_empty());
        assert!(empty.example_query.is_none());
    }

    /// A required string parameter keeps its name, flag and type.
    #[test]
    fn test_parsed_query_param_creation() {
        let symbol = plain_param("symbol", "Trading symbol", ParamType::String, true);

        assert_eq!(symbol.name, "symbol");
        assert!(symbol.required);
        assert!(matches!(symbol.param_type, ParamType::String));
    }

    /// An optional parameter can carry a default and a list of allowed values.
    #[test]
    fn test_parsed_query_param_with_defaults() {
        let choices = vec![json!("1m"), json!("1h"), json!("1d")];
        let timeframe = ParsedQueryParam {
            name: "timeframe".to_owned(),
            description: "Data timeframe".to_owned(),
            param_type: ParamType::String,
            required: false,
            default_value: Some(json!("1d")),
            allowed_values: Some(choices),
            nested_schema: None,
        };

        assert!(!timeframe.required);
        assert!(timeframe.default_value.is_some());
        assert_eq!(timeframe.allowed_values.as_ref().map(Vec::len), Some(3));
    }

    /// An object parameter can embed a nested schema.
    #[test]
    fn test_parsed_query_param_with_nested_schema() {
        let inner = ParsedQuerySchema {
            params: vec![plain_param("field", "Nested field", ParamType::String, true)],
            example_query: None,
        };
        let filter = ParsedQueryParam {
            name: "filter".to_owned(),
            description: "Filter object".to_owned(),
            param_type: ParamType::Object,
            required: false,
            default_value: None,
            allowed_values: None,
            nested_schema: Some(Box::new(inner)),
        };

        assert!(filter.nested_schema.is_some());
        let nested_len = filter.nested_schema.as_ref().map(|s| s.params.len());
        assert_eq!(nested_len, Some(1));
    }

    /// Output schema fields are kept in declaration order.
    #[test]
    fn test_parsed_output_schema() {
        let timestamp = ParsedOutputField {
            name: "timestamp".to_owned(),
            field_type: ParamType::String,
            description: "Unix timestamp".to_owned(),
        };
        let value = ParsedOutputField {
            name: "value".to_owned(),
            field_type: ParamType::Number,
            description: "Measurement value".to_owned(),
        };
        let schema = ParsedOutputSchema {
            fields: vec![timestamp, value],
        };

        let names: Vec<&str> = schema.fields.iter().map(|f| f.name.as_str()).collect();
        assert_eq!(names, ["timestamp", "value"]);
    }

    /// A query schema keeps its params in order alongside the example query.
    #[test]
    fn test_parsed_query_schema_with_params() {
        let schema = ParsedQuerySchema {
            params: vec![
                plain_param("symbol", "Symbol", ParamType::String, true),
                plain_param("start_date", "Start date", ParamType::String, false),
            ],
            example_query: Some(json!({"symbol": "AAPL"})),
        };

        assert!(schema.example_query.is_some());

        // First param is required, second is optional.
        let required: Vec<bool> = schema.params.iter().map(|p| p.required).collect();
        assert_eq!(required, [true, false]);
    }

    /// Every `ParamType` variant used by the wrapper can be constructed and matched.
    #[test]
    fn test_param_type_variants() {
        assert!(matches!(ParamType::String, ParamType::String));
        assert!(matches!(ParamType::Number, ParamType::Number));
        assert!(matches!(ParamType::Bool, ParamType::Bool));
        assert!(matches!(ParamType::StringArray, ParamType::StringArray));
        assert!(matches!(ParamType::NumberArray, ParamType::NumberArray));
        assert!(matches!(ParamType::Object, ParamType::Object));
    }
}