Skip to main content

datasynth_core/traits/
registry.rs

1//! Plugin registry for managing registered plugins.
2//!
3//! Thread-safe registry that stores generator, sink, and transform plugins.
4
5use std::collections::HashMap;
6use std::sync::{Arc, RwLock};
7
8use crate::error::SynthError;
9
10use super::plugin::{GeneratorPlugin, PluginInfo, PluginType, SinkPlugin, TransformPlugin};
11
12/// Type alias for sink plugin storage to reduce type complexity.
13type SinkStorage = Arc<RwLock<HashMap<String, Arc<RwLock<Box<dyn SinkPlugin>>>>>>;
14
15/// Thread-safe registry for managing plugins.
16#[derive(Clone)]
17pub struct PluginRegistry {
18    generators: Arc<RwLock<HashMap<String, Arc<dyn GeneratorPlugin>>>>,
19    sinks: SinkStorage,
20    transforms: Arc<RwLock<HashMap<String, Arc<dyn TransformPlugin>>>>,
21}
22
23impl PluginRegistry {
24    /// Create a new empty plugin registry.
25    pub fn new() -> Self {
26        Self {
27            generators: Arc::new(RwLock::new(HashMap::new())),
28            sinks: Arc::new(RwLock::new(HashMap::new())),
29            transforms: Arc::new(RwLock::new(HashMap::new())),
30        }
31    }
32
33    /// Register a generator plugin.
34    ///
35    /// Returns an error if a generator with the same name is already registered.
36    pub fn register_generator(&self, plugin: Box<dyn GeneratorPlugin>) -> Result<(), SynthError> {
37        let name = plugin.name().to_string();
38        let mut generators = self
39            .generators
40            .write()
41            .map_err(|e| SynthError::generation(format!("Failed to acquire write lock: {e}")))?;
42        if generators.contains_key(&name) {
43            return Err(SynthError::generation(format!(
44                "Generator plugin '{name}' is already registered"
45            )));
46        }
47        generators.insert(name, Arc::from(plugin));
48        Ok(())
49    }
50
51    /// Register a sink plugin.
52    ///
53    /// Returns an error if a sink with the same name is already registered.
54    pub fn register_sink(&self, plugin: Box<dyn SinkPlugin>) -> Result<(), SynthError> {
55        let name = plugin.name().to_string();
56        let mut sinks = self
57            .sinks
58            .write()
59            .map_err(|e| SynthError::generation(format!("Failed to acquire write lock: {e}")))?;
60        if sinks.contains_key(&name) {
61            return Err(SynthError::generation(format!(
62                "Sink plugin '{name}' is already registered"
63            )));
64        }
65        sinks.insert(name, Arc::new(RwLock::new(plugin)));
66        Ok(())
67    }
68
69    /// Register a transform plugin.
70    ///
71    /// Returns an error if a transform with the same name is already registered.
72    pub fn register_transform(&self, plugin: Box<dyn TransformPlugin>) -> Result<(), SynthError> {
73        let name = plugin.name().to_string();
74        let mut transforms = self
75            .transforms
76            .write()
77            .map_err(|e| SynthError::generation(format!("Failed to acquire write lock: {e}")))?;
78        if transforms.contains_key(&name) {
79            return Err(SynthError::generation(format!(
80                "Transform plugin '{name}' is already registered"
81            )));
82        }
83        transforms.insert(name, Arc::from(plugin));
84        Ok(())
85    }
86
87    /// Get a generator plugin by name.
88    pub fn get_generator(&self, name: &str) -> Option<Arc<dyn GeneratorPlugin>> {
89        self.generators
90            .read()
91            .ok()
92            .and_then(|g| g.get(name).cloned())
93    }
94
95    /// Get a transform plugin by name.
96    pub fn get_transform(&self, name: &str) -> Option<Arc<dyn TransformPlugin>> {
97        self.transforms
98            .read()
99            .ok()
100            .and_then(|t| t.get(name).cloned())
101    }
102
103    /// List all registered plugins.
104    pub fn list_plugins(&self) -> Vec<PluginInfo> {
105        let mut plugins = Vec::new();
106
107        if let Ok(generators) = self.generators.read() {
108            for gen in generators.values() {
109                plugins.push(PluginInfo {
110                    name: gen.name().to_string(),
111                    version: gen.version().to_string(),
112                    description: gen.description().to_string(),
113                    plugin_type: PluginType::Generator,
114                });
115            }
116        }
117
118        if let Ok(sinks) = self.sinks.read() {
119            for sink_lock in sinks.values() {
120                if let Ok(sink) = sink_lock.read() {
121                    plugins.push(PluginInfo {
122                        name: sink.name().to_string(),
123                        version: sink.version().to_string(),
124                        description: sink.description().to_string(),
125                        plugin_type: PluginType::Sink,
126                    });
127                }
128            }
129        }
130
131        if let Ok(transforms) = self.transforms.read() {
132            for t in transforms.values() {
133                plugins.push(PluginInfo {
134                    name: t.name().to_string(),
135                    version: t.version().to_string(),
136                    description: t.description().to_string(),
137                    plugin_type: PluginType::Transform,
138                });
139            }
140        }
141
142        plugins
143    }
144
145    /// Get the count of registered plugins.
146    pub fn plugin_count(&self) -> usize {
147        let g = self.generators.read().map(|g| g.len()).unwrap_or(0);
148        let s = self.sinks.read().map(|s| s.len()).unwrap_or(0);
149        let t = self.transforms.read().map(|t| t.len()).unwrap_or(0);
150        g + s + t
151    }
152}
153
154impl Default for PluginRegistry {
155    fn default() -> Self {
156        Self::new()
157    }
158}
159
160#[cfg(test)]
161#[allow(clippy::unwrap_used)]
162mod tests {
163    use super::*;
164    use crate::error::SynthError;
165    use crate::traits::plugin::{GeneratedRecord, GenerationContext, SinkSummary};
166
167    // Test generator plugin
168    struct TestGenerator {
169        name: String,
170    }
171
172    impl TestGenerator {
173        fn new(name: &str) -> Self {
174            Self {
175                name: name.to_string(),
176            }
177        }
178    }
179
180    impl GeneratorPlugin for TestGenerator {
181        fn name(&self) -> &str {
182            &self.name
183        }
184        fn version(&self) -> &str {
185            "1.0.0"
186        }
187        fn description(&self) -> &str {
188            "Test generator"
189        }
190        fn config_schema(&self) -> Option<serde_json::Value> {
191            None
192        }
193        fn generate(
194            &self,
195            _config: &serde_json::Value,
196            _context: &GenerationContext,
197        ) -> Result<Vec<GeneratedRecord>, SynthError> {
198            Ok(vec![GeneratedRecord::new("test")])
199        }
200    }
201
202    // Test sink plugin
203    struct TestSink {
204        name: String,
205        count: usize,
206    }
207
208    impl TestSink {
209        fn new(name: &str) -> Self {
210            Self {
211                name: name.to_string(),
212                count: 0,
213            }
214        }
215    }
216
217    impl SinkPlugin for TestSink {
218        fn name(&self) -> &str {
219            &self.name
220        }
221        fn initialize(&mut self, _config: &serde_json::Value) -> Result<(), SynthError> {
222            Ok(())
223        }
224        fn write_records(&mut self, records: &[GeneratedRecord]) -> Result<usize, SynthError> {
225            self.count += records.len();
226            Ok(records.len())
227        }
228        fn finalize(&mut self) -> Result<SinkSummary, SynthError> {
229            Ok(SinkSummary::new(self.count))
230        }
231    }
232
233    // Test transform plugin
234    struct TestTransform;
235
236    impl TransformPlugin for TestTransform {
237        fn name(&self) -> &str {
238            "test_transform"
239        }
240        fn transform(
241            &self,
242            mut records: Vec<GeneratedRecord>,
243        ) -> Result<Vec<GeneratedRecord>, SynthError> {
244            for record in &mut records {
245                record
246                    .fields
247                    .insert("_transformed".to_string(), serde_json::Value::Bool(true));
248            }
249            Ok(records)
250        }
251    }
252
253    #[test]
254    fn test_register_and_retrieve_generator() {
255        let registry = PluginRegistry::new();
256        registry
257            .register_generator(Box::new(TestGenerator::new("gen1")))
258            .expect("should register");
259
260        let gen = registry.get_generator("gen1");
261        assert!(gen.is_some());
262        assert_eq!(gen.as_ref().map(|g| g.name()), Some("gen1"));
263    }
264
265    #[test]
266    fn test_register_duplicate_generator_rejected() {
267        let registry = PluginRegistry::new();
268        registry
269            .register_generator(Box::new(TestGenerator::new("gen1")))
270            .expect("first registration should succeed");
271
272        let result = registry.register_generator(Box::new(TestGenerator::new("gen1")));
273        assert!(result.is_err());
274    }
275
276    #[test]
277    fn test_register_and_retrieve_sink() {
278        let registry = PluginRegistry::new();
279        registry
280            .register_sink(Box::new(TestSink::new("sink1")))
281            .expect("should register");
282
283        let plugins = registry.list_plugins();
284        assert!(plugins.iter().any(|p| p.name == "sink1"));
285    }
286
287    #[test]
288    fn test_register_and_retrieve_transform() {
289        let registry = PluginRegistry::new();
290        registry
291            .register_transform(Box::new(TestTransform))
292            .expect("should register");
293
294        let t = registry.get_transform("test_transform");
295        assert!(t.is_some());
296    }
297
298    #[test]
299    fn test_list_all_plugins() {
300        let registry = PluginRegistry::new();
301        registry
302            .register_generator(Box::new(TestGenerator::new("gen1")))
303            .expect("register gen");
304        registry
305            .register_sink(Box::new(TestSink::new("sink1")))
306            .expect("register sink");
307        registry
308            .register_transform(Box::new(TestTransform))
309            .expect("register transform");
310
311        let plugins = registry.list_plugins();
312        assert_eq!(plugins.len(), 3);
313        assert_eq!(registry.plugin_count(), 3);
314    }
315
316    #[test]
317    fn test_get_nonexistent_plugin() {
318        let registry = PluginRegistry::new();
319        assert!(registry.get_generator("nonexistent").is_none());
320        assert!(registry.get_transform("nonexistent").is_none());
321    }
322
323    #[test]
324    fn test_empty_registry() {
325        let registry = PluginRegistry::new();
326        assert_eq!(registry.plugin_count(), 0);
327        assert!(registry.list_plugins().is_empty());
328    }
329}