// nirv_engine/connectors/connector_trait.rs

1use async_trait::async_trait;
2use std::collections::HashMap;
3use crate::utils::{
4    types::{ConnectorType, ConnectorQuery, QueryResult, Schema},
5    error::NirvResult,
6};
7
/// Configuration handed to a connector when it is initialized.
///
/// A configuration is assembled with the chainable `with_*` methods, each of
/// which consumes the value and returns the updated one:
///
/// ```ignore
/// let config = ConnectorInitConfig::new()
///     .with_param("host", "localhost")
///     .with_timeout(60);
/// ```
///
/// This type only stores the values; how a connector interprets them is
/// decided by that connector's `connect` implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectorInitConfig {
    /// Free-form key/value connection parameters (for example `"host"` or `"port"`).
    pub connection_params: HashMap<String, String>,
    /// Timeout in seconds, or `None` when no timeout is configured.
    pub timeout_seconds: Option<u64>,
    /// Maximum number of connections, or `None` when no limit is configured.
    pub max_connections: Option<u32>,
}

impl ConnectorInitConfig {
    /// Creates a configuration with no connection parameters, a 30 second
    /// timeout and a maximum of 10 connections.
    #[must_use]
    pub fn new() -> Self {
        Self {
            connection_params: HashMap::new(),
            timeout_seconds: Some(30),
            max_connections: Some(10),
        }
    }

    /// Adds a connection parameter and returns the updated configuration.
    ///
    /// If `key` is already present, its previous value is replaced.
    #[must_use]
    pub fn with_param(mut self, key: &str, value: &str) -> Self {
        self.connection_params
            .insert(key.to_owned(), value.to_owned());
        self
    }

    /// Sets the timeout, in seconds, and returns the updated configuration.
    #[must_use]
    pub fn with_timeout(mut self, seconds: u64) -> Self {
        self.timeout_seconds = Some(seconds);
        self
    }

    /// Sets the maximum number of connections and returns the updated configuration.
    #[must_use]
    pub fn with_max_connections(mut self, max: u32) -> Self {
        self.max_connections = Some(max);
        self
    }
}

impl Default for ConnectorInitConfig {
    /// Equivalent to [`ConnectorInitConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
50
51/// Base trait for all data source connectors
52#[async_trait]
53pub trait Connector: Send + Sync {
54    /// Establish connection to the backend data source
55    async fn connect(&mut self, config: ConnectorInitConfig) -> NirvResult<()>;
56    
57    /// Execute a query against the connected data source
58    async fn execute_query(&self, query: ConnectorQuery) -> NirvResult<QueryResult>;
59    
60    /// Retrieve schema information for a specific data object
61    async fn get_schema(&self, object_name: &str) -> NirvResult<Schema>;
62    
63    /// Close connection and cleanup resources
64    async fn disconnect(&mut self) -> NirvResult<()>;
65    
66    /// Get the type of this connector
67    fn get_connector_type(&self) -> ConnectorType;
68    
69    /// Check if this connector supports transactions
70    fn supports_transactions(&self) -> bool;
71    
72    /// Check if the connector is currently connected
73    fn is_connected(&self) -> bool;
74    
75    /// Get connector-specific capabilities
76    fn get_capabilities(&self) -> ConnectorCapabilities;
77}
78
/// Capabilities supported by a connector.
///
/// Derives `PartialEq`/`Eq` so capability sets can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectorCapabilities {
    /// Whether joins are supported.
    pub supports_joins: bool,
    /// Whether aggregations are supported.
    pub supports_aggregations: bool,
    /// Whether subqueries are supported.
    pub supports_subqueries: bool,
    /// Whether transactions are supported.
    pub supports_transactions: bool,
    /// Whether schema introspection is supported.
    pub supports_schema_introspection: bool,
    /// Maximum number of concurrent queries.
    // NOTE(review): `None` presumably means "no declared limit" - confirm with consumers.
    pub max_concurrent_queries: Option<u32>,
}

impl Default for ConnectorCapabilities {
    /// Conservative defaults: joins, aggregations, subqueries and transactions
    /// are off, schema introspection is on, and at most one query runs at a time.
    fn default() -> Self {
        Self {
            supports_joins: false,
            supports_aggregations: false,
            supports_subqueries: false,
            supports_transactions: false,
            supports_schema_introspection: true,
            max_concurrent_queries: Some(1),
        }
    }
}
102
103/// Registry for managing connector instances
104pub struct ConnectorRegistry {
105    connectors: HashMap<String, Box<dyn Connector>>,
106}
107
108impl ConnectorRegistry {
109    /// Create a new empty registry
110    pub fn new() -> Self {
111        Self {
112            connectors: HashMap::new(),
113        }
114    }
115    
116    /// Register a connector with a given name
117    pub fn register(&mut self, name: String, connector: Box<dyn Connector>) -> NirvResult<()> {
118        if self.connectors.contains_key(&name) {
119            return Err(crate::utils::error::NirvError::Dispatcher(
120                crate::utils::error::DispatcherError::RegistrationFailed(
121                    format!("Connector '{}' is already registered", name)
122                )
123            ));
124        }
125        
126        self.connectors.insert(name, connector);
127        Ok(())
128    }
129    
130    /// Get a connector by name
131    pub fn get(&self, name: &str) -> Option<&dyn Connector> {
132        self.connectors.get(name).map(|c| c.as_ref())
133    }
134    
135    /// Get a mutable reference to a connector by name
136    pub fn get_mut(&mut self, name: &str) -> Option<&mut Box<dyn Connector>> {
137        self.connectors.get_mut(name)
138    }
139    
140    /// List all registered connector names
141    pub fn list_connectors(&self) -> Vec<String> {
142        self.connectors.keys().cloned().collect()
143    }
144    
145    /// Remove a connector from the registry
146    pub fn unregister(&mut self, name: &str) -> Option<Box<dyn Connector>> {
147        self.connectors.remove(name)
148    }
149    
150    /// Check if a connector is registered
151    pub fn contains(&self, name: &str) -> bool {
152        self.connectors.contains_key(name)
153    }
154    
155    /// Get the number of registered connectors
156    pub fn len(&self) -> usize {
157        self.connectors.len()
158    }
159    
160    /// Check if the registry is empty
161    pub fn is_empty(&self) -> bool {
162        self.connectors.is_empty()
163    }
164}
165
166impl Default for ConnectorRegistry {
167    fn default() -> Self {
168        Self::new()
169    }
170}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::types::ConnectorType;

    /// Name under which the registry tests store their connector.
    const NAME: &str = "test_connector";

    /// Minimal in-memory connector used to exercise the registry.
    struct TestConnector {
        connector_type: ConnectorType,
        connected: bool,
    }

    impl TestConnector {
        /// Creates a disconnected mock that reports `connector_type`.
        fn new(connector_type: ConnectorType) -> Self {
            Self {
                connector_type,
                connected: false,
            }
        }
    }

    #[async_trait]
    impl Connector for TestConnector {
        async fn connect(&mut self, _config: ConnectorInitConfig) -> NirvResult<()> {
            self.connected = true;
            Ok(())
        }

        async fn execute_query(&self, _query: ConnectorQuery) -> NirvResult<QueryResult> {
            Ok(QueryResult::new())
        }

        async fn get_schema(&self, _object_name: &str) -> NirvResult<Schema> {
            Ok(Schema {
                name: "test".to_string(),
                columns: vec![],
                primary_key: None,
                indexes: vec![],
            })
        }

        async fn disconnect(&mut self) -> NirvResult<()> {
            self.connected = false;
            Ok(())
        }

        fn get_connector_type(&self) -> ConnectorType {
            self.connector_type.clone()
        }

        fn supports_transactions(&self) -> bool {
            false
        }

        fn is_connected(&self) -> bool {
            self.connected
        }

        fn get_capabilities(&self) -> ConnectorCapabilities {
            ConnectorCapabilities::default()
        }
    }

    /// Boxed [`TestConnector`] ready to hand to [`ConnectorRegistry::register`].
    fn boxed(connector_type: ConnectorType) -> Box<dyn Connector> {
        Box::new(TestConnector::new(connector_type))
    }

    /// Type reported by the connector stored under `name`, if one is registered.
    fn registered_type(registry: &ConnectorRegistry, name: &str) -> Option<ConnectorType> {
        registry
            .get(name)
            .map(|connector| connector.get_connector_type())
    }

    #[test]
    fn test_connector_init_config_creation() {
        let config = ConnectorInitConfig::new();

        assert!(config.connection_params.is_empty());
        assert_eq!(config.timeout_seconds, Some(30));
        assert_eq!(config.max_connections, Some(10));
    }

    #[test]
    fn test_connector_init_config_builder_pattern() {
        let config = ConnectorInitConfig::new()
            .with_param("host", "localhost")
            .with_param("port", "5432")
            .with_timeout(60)
            .with_max_connections(20);

        assert_eq!(
            config.connection_params.get("host").map(String::as_str),
            Some("localhost")
        );
        assert_eq!(
            config.connection_params.get("port").map(String::as_str),
            Some("5432")
        );
        assert_eq!(config.connection_params.len(), 2);
        assert_eq!(config.timeout_seconds, Some(60));
        assert_eq!(config.max_connections, Some(20));
    }

    #[test]
    fn test_connector_init_config_default() {
        let config = ConnectorInitConfig::default();

        assert!(config.connection_params.is_empty());
        assert_eq!(config.timeout_seconds, Some(30));
        assert_eq!(config.max_connections, Some(10));
    }

    #[test]
    fn test_connector_capabilities_default() {
        let capabilities = ConnectorCapabilities::default();

        assert!(!capabilities.supports_joins);
        assert!(!capabilities.supports_aggregations);
        assert!(!capabilities.supports_subqueries);
        assert!(!capabilities.supports_transactions);
        assert!(capabilities.supports_schema_introspection);
        assert_eq!(capabilities.max_concurrent_queries, Some(1));
    }

    #[test]
    fn test_connector_registry_creation() {
        let registry = ConnectorRegistry::new();

        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
        assert!(registry.list_connectors().is_empty());
    }

    #[test]
    fn test_connector_registry_default() {
        let registry = ConnectorRegistry::default();

        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
    }

    #[test]
    fn test_connector_registry_register_and_get() {
        let mut registry = ConnectorRegistry::new();

        // Register connector
        assert!(registry
            .register(NAME.to_string(), boxed(ConnectorType::Mock))
            .is_ok());

        // Check registry state
        assert!(!registry.is_empty());
        assert_eq!(registry.len(), 1);
        assert!(registry.contains(NAME));

        // Get connector
        assert_eq!(registered_type(&registry, NAME), Some(ConnectorType::Mock));

        // List connectors
        assert_eq!(registry.list_connectors(), vec![NAME.to_string()]);
    }

    #[test]
    fn test_connector_registry_duplicate_registration() {
        let mut registry = ConnectorRegistry::new();

        // Register first connector
        assert!(registry
            .register(NAME.to_string(), boxed(ConnectorType::Mock))
            .is_ok());

        // A second registration under the same name must fail with the
        // dedicated registration error, naming the offending connector.
        match registry.register(NAME.to_string(), boxed(ConnectorType::PostgreSQL)) {
            Err(crate::utils::error::NirvError::Dispatcher(
                crate::utils::error::DispatcherError::RegistrationFailed(message),
            )) => assert!(message.contains(NAME)),
            _ => panic!("expected a RegistrationFailed error for a duplicate name"),
        }

        // Still exactly one connector, and it is the first one, not the rejected one.
        assert_eq!(registry.len(), 1);
        assert_eq!(registered_type(&registry, NAME), Some(ConnectorType::Mock));
    }

    #[test]
    fn test_connector_registry_get_mut() {
        let mut registry = ConnectorRegistry::new();
        registry
            .register(NAME.to_string(), boxed(ConnectorType::Mock))
            .unwrap();

        // Actually mutate through the handle: swap the stored connector in place.
        let slot = registry
            .get_mut(NAME)
            .expect("connector was just registered");
        assert_eq!(slot.get_connector_type(), ConnectorType::Mock);
        let replacement: Box<dyn Connector> = boxed(ConnectorType::PostgreSQL);
        *slot = replacement;

        assert_eq!(registry.len(), 1);
        assert_eq!(
            registered_type(&registry, NAME),
            Some(ConnectorType::PostgreSQL)
        );

        // Get non-existent connector
        assert!(registry.get_mut("non_existent").is_none());
    }

    #[test]
    fn test_connector_registry_unregister() {
        let mut registry = ConnectorRegistry::new();
        registry
            .register(NAME.to_string(), boxed(ConnectorType::Mock))
            .unwrap();
        assert_eq!(registry.len(), 1);

        // Unregister connector and check that the registered instance is handed back.
        let removed = registry.unregister(NAME);
        assert_eq!(
            removed.map(|connector| connector.get_connector_type()),
            Some(ConnectorType::Mock)
        );
        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());
        assert!(!registry.contains(NAME));

        // Try to unregister non-existent connector
        assert!(registry.unregister("non_existent").is_none());
    }

    #[test]
    fn test_connector_registry_get_non_existent() {
        let registry = ConnectorRegistry::new();

        assert!(registry.get("non_existent").is_none());
        assert!(!registry.contains("non_existent"));
    }
}