shape_runtime/plugins/data_source/
mod.rs1mod providers;
6mod query;
7mod schema;
8
9pub use schema::{ParsedOutputField, ParsedOutputSchema, ParsedQueryParam, ParsedQuerySchema};
11
12use std::ffi::c_void;
13
14use serde_json::Value;
15use shape_abi_v1::DataSourceVTable;
16use shape_ast::error::{Result, ShapeError};
17
18pub struct PluginDataSource {
23 name: String,
25 vtable: &'static DataSourceVTable,
27 instance: *mut c_void,
29 query_schema: ParsedQuerySchema,
31 output_schema: ParsedOutputSchema,
33}
34
35impl PluginDataSource {
36 pub fn new(name: String, vtable: &'static DataSourceVTable, config: &Value) -> Result<Self> {
43 let config_bytes = rmp_serde::to_vec(config).map_err(|e| ShapeError::RuntimeError {
45 message: format!("Failed to serialize plugin config: {}", e),
46 location: None,
47 })?;
48
49 let init_fn = vtable.init.ok_or_else(|| ShapeError::RuntimeError {
51 message: format!("Plugin '{}' has no init function", name),
52 location: None,
53 })?;
54
55 let instance = unsafe { init_fn(config_bytes.as_ptr(), config_bytes.len()) };
56 if instance.is_null() {
57 return Err(ShapeError::RuntimeError {
58 message: format!("Plugin '{}' init returned null", name),
59 location: None,
60 });
61 }
62
63 let query_schema = query::parse_query_schema_from_vtable(vtable, instance)?;
65
66 let output_schema = query::parse_output_schema_from_vtable(vtable, instance)?;
68
69 Ok(Self {
70 name,
71 vtable,
72 instance,
73 query_schema,
74 output_schema,
75 })
76 }
77
78 pub fn get_query_schema(&self) -> &ParsedQuerySchema {
80 &self.query_schema
81 }
82
83 pub fn get_output_schema(&self) -> &ParsedOutputSchema {
85 &self.output_schema
86 }
87
88 pub fn name(&self) -> &str {
90 &self.name
91 }
92
93 pub fn validate_query(&self, query: &Value) -> Result<()> {
95 query::validate_query(self.vtable, self.instance, query)
96 }
97
98 pub fn load(&self, query: &Value) -> Result<Value> {
100 providers::load(self.vtable, self.instance, &self.name, query)
101 }
102
103 pub fn subscribe<F>(&self, query: &Value, callback: F) -> Result<u64>
112 where
113 F: Fn(Value) + Send + Sync + 'static,
114 {
115 providers::subscribe(self.vtable, self.instance, &self.name, query, callback)
116 }
117
118 pub fn unsubscribe(&self, subscription_id: u64) -> Result<()> {
120 providers::unsubscribe(self.vtable, self.instance, &self.name, subscription_id)
121 }
122
123 pub fn get_source_schema(&self, source_id: &str) -> Result<shape_abi_v1::PluginSchema> {
134 providers::get_source_schema(self.vtable, self.instance, &self.name, source_id)
135 }
136
137 pub fn supports_schema_discovery(&self) -> bool {
139 self.vtable.get_source_schema.is_some()
140 }
141
142 pub fn supports_binary(&self) -> bool {
144 self.vtable.load_binary.is_some()
145 }
146
147 pub fn load_binary(
149 &self,
150 query: &Value,
151 granularity: crate::progress::ProgressGranularity,
152 progress_handle: Option<&crate::progress::ProgressHandle>,
153 ) -> Result<shape_value::ValueWord> {
154 providers::load_binary(
155 self.vtable,
156 self.instance,
157 &self.name,
158 query,
159 granularity,
160 progress_handle,
161 )
162 }
163}
164
165impl Drop for PluginDataSource {
166 fn drop(&mut self) {
167 if let Some(drop_fn) = self.vtable.drop {
168 unsafe { drop_fn(self.instance) };
169 }
170 }
171}
172
173unsafe impl Send for PluginDataSource {}
176unsafe impl Sync for PluginDataSource {}
177
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use shape_abi_v1::ParamType;

    /// Builds a string-typed parameter with no default value, no allowed
    /// values, and no nested schema.
    fn string_param(name: &str, description: &str, required: bool) -> ParsedQueryParam {
        ParsedQueryParam {
            name: name.to_string(),
            description: description.to_string(),
            param_type: ParamType::String,
            required,
            default_value: None,
            allowed_values: None,
            nested_schema: None,
        }
    }

    /// An empty query schema has no parameters and no example query.
    #[test]
    fn test_parsed_query_schema_default() {
        let empty = ParsedQuerySchema {
            params: vec![],
            example_query: None,
        };

        assert!(empty.params.is_empty());
        assert!(empty.example_query.is_none());
    }

    /// A required string parameter keeps its name, flag, and type.
    #[test]
    fn test_parsed_query_param_creation() {
        let symbol = string_param("symbol", "Trading symbol", true);

        assert_eq!(symbol.name, "symbol");
        assert!(symbol.required);
        assert!(matches!(symbol.param_type, ParamType::String));
    }

    /// An optional parameter can carry a default and a list of allowed values.
    #[test]
    fn test_parsed_query_param_with_defaults() {
        let timeframe = ParsedQueryParam {
            default_value: Some(json!("1d")),
            allowed_values: Some(vec![json!("1m"), json!("1h"), json!("1d")]),
            ..string_param("timeframe", "Data timeframe", false)
        };

        assert!(!timeframe.required);
        assert!(timeframe.default_value.is_some());
        assert_eq!(timeframe.allowed_values.as_deref().map(|v| v.len()), Some(3));
    }

    /// An object parameter can embed a nested query schema.
    #[test]
    fn test_parsed_query_param_with_nested_schema() {
        let inner = ParsedQuerySchema {
            params: vec![string_param("field", "Nested field", true)],
            example_query: None,
        };

        let filter = ParsedQueryParam {
            name: "filter".to_string(),
            description: "Filter object".to_string(),
            param_type: ParamType::Object,
            required: false,
            default_value: None,
            allowed_values: None,
            nested_schema: Some(Box::new(inner)),
        };

        assert!(filter.nested_schema.is_some());
        assert_eq!(
            filter.nested_schema.as_deref().map(|s| s.params.len()),
            Some(1)
        );
    }

    /// Output schema fields are stored in declaration order.
    #[test]
    fn test_parsed_output_schema() {
        let field = |name: &str, field_type: ParamType, description: &str| ParsedOutputField {
            name: name.to_string(),
            field_type,
            description: description.to_string(),
        };

        let schema = ParsedOutputSchema {
            fields: vec![
                field("timestamp", ParamType::String, "Unix timestamp"),
                field("value", ParamType::Number, "Measurement value"),
            ],
        };

        // Comparing the collected names checks both the count and the order.
        let names: Vec<&str> = schema.fields.iter().map(|f| f.name.as_str()).collect();
        assert_eq!(names, ["timestamp", "value"]);
    }

    /// A schema with several parameters keeps each parameter's `required` flag.
    #[test]
    fn test_parsed_query_schema_with_params() {
        let schema = ParsedQuerySchema {
            params: vec![
                string_param("symbol", "Symbol", true),
                string_param("start_date", "Start date", false),
            ],
            example_query: Some(json!({"symbol": "AAPL"})),
        };

        assert_eq!(schema.params.len(), 2);
        assert!(schema.example_query.is_some());

        let required_flags: Vec<bool> = schema.params.iter().map(|p| p.required).collect();
        assert_eq!(required_flags, [true, false]);
    }

    /// Every `ParamType` variant used by this module can be constructed and matched.
    #[test]
    fn test_param_type_variants() {
        assert!(matches!(ParamType::String, ParamType::String));
        assert!(matches!(ParamType::Number, ParamType::Number));
        assert!(matches!(ParamType::Bool, ParamType::Bool));
        assert!(matches!(ParamType::StringArray, ParamType::StringArray));
        assert!(matches!(ParamType::NumberArray, ParamType::NumberArray));
        assert!(matches!(ParamType::Object, ParamType::Object));
    }
}