// nirv_engine/connectors/connector_trait.rs
use std::collections::hash_map::Entry;
use std::collections::HashMap;

use async_trait::async_trait;

use crate::utils::{
    types::{ConnectorType, ConnectorQuery, QueryResult, Schema},
    error::NirvResult,
};
7
/// Settings passed to `Connector::connect`.
///
/// Start from [`ConnectorInitConfig::new`] (or `Default::default()`) and chain
/// the `with_*` methods to adjust individual values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectorInitConfig {
    /// Free-form key/value connection parameters.
    pub connection_params: HashMap<String, String>,
    /// Timeout in seconds, if set. How it is applied is up to the connector
    /// implementation (not visible in this file).
    pub timeout_seconds: Option<u64>,
    /// Connection limit, if set. How it is applied is up to the connector
    /// implementation (not visible in this file).
    pub max_connections: Option<u32>,
}

impl ConnectorInitConfig {
    /// Creates a configuration with no connection parameters, a timeout of
    /// 30 seconds and a maximum of 10 connections.
    #[must_use]
    pub fn new() -> Self {
        Self {
            connection_params: HashMap::new(),
            timeout_seconds: Some(30),
            max_connections: Some(10),
        }
    }

    /// Stores `value` under `key`, replacing any value previously stored under
    /// the same key, and returns the updated configuration.
    #[must_use]
    pub fn with_param(mut self, key: &str, value: &str) -> Self {
        self.connection_params.insert(key.to_string(), value.to_string());
        self
    }

    /// Sets the timeout to `seconds` and returns the updated configuration.
    #[must_use]
    pub fn with_timeout(mut self, seconds: u64) -> Self {
        self.timeout_seconds = Some(seconds);
        self
    }

    /// Sets the connection limit to `max` and returns the updated configuration.
    #[must_use]
    pub fn with_max_connections(mut self, max: u32) -> Self {
        self.max_connections = Some(max);
        self
    }
}

impl Default for ConnectorInitConfig {
    /// Same values as [`ConnectorInitConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
50
51#[async_trait]
53pub trait Connector: Send + Sync {
54 async fn connect(&mut self, config: ConnectorInitConfig) -> NirvResult<()>;
56
57 async fn execute_query(&self, query: ConnectorQuery) -> NirvResult<QueryResult>;
59
60 async fn get_schema(&self, object_name: &str) -> NirvResult<Schema>;
62
63 async fn disconnect(&mut self) -> NirvResult<()>;
65
66 fn get_connector_type(&self) -> ConnectorType;
68
69 fn supports_transactions(&self) -> bool;
71
72 fn is_connected(&self) -> bool;
74
75 fn get_capabilities(&self) -> ConnectorCapabilities;
77}
78
/// Capability flags a connector reports through `Connector::get_capabilities`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectorCapabilities {
    /// Whether joins are supported.
    pub supports_joins: bool,
    /// Whether aggregations are supported.
    pub supports_aggregations: bool,
    /// Whether subqueries are supported.
    pub supports_subqueries: bool,
    /// Whether transactions are supported.
    pub supports_transactions: bool,
    /// Whether schema introspection is supported.
    pub supports_schema_introspection: bool,
    /// Maximum number of concurrent queries, if stated.
    /// NOTE(review): `None` presumably means "no stated limit" — confirm with callers.
    pub max_concurrent_queries: Option<u32>,
}

impl Default for ConnectorCapabilities {
    /// Baseline capabilities: only schema introspection is enabled and at most
    /// one concurrent query is reported.
    fn default() -> Self {
        Self {
            supports_joins: false,
            supports_aggregations: false,
            supports_subqueries: false,
            supports_transactions: false,
            supports_schema_introspection: true,
            max_concurrent_queries: Some(1),
        }
    }
}
102
103pub struct ConnectorRegistry {
105 connectors: HashMap<String, Box<dyn Connector>>,
106}
107
108impl ConnectorRegistry {
109 pub fn new() -> Self {
111 Self {
112 connectors: HashMap::new(),
113 }
114 }
115
116 pub fn register(&mut self, name: String, connector: Box<dyn Connector>) -> NirvResult<()> {
118 if self.connectors.contains_key(&name) {
119 return Err(crate::utils::error::NirvError::Dispatcher(
120 crate::utils::error::DispatcherError::RegistrationFailed(
121 format!("Connector '{}' is already registered", name)
122 )
123 ));
124 }
125
126 self.connectors.insert(name, connector);
127 Ok(())
128 }
129
130 pub fn get(&self, name: &str) -> Option<&dyn Connector> {
132 self.connectors.get(name).map(|c| c.as_ref())
133 }
134
135 pub fn get_mut(&mut self, name: &str) -> Option<&mut Box<dyn Connector>> {
137 self.connectors.get_mut(name)
138 }
139
140 pub fn list_connectors(&self) -> Vec<String> {
142 self.connectors.keys().cloned().collect()
143 }
144
145 pub fn unregister(&mut self, name: &str) -> Option<Box<dyn Connector>> {
147 self.connectors.remove(name)
148 }
149
150 pub fn contains(&self, name: &str) -> bool {
152 self.connectors.contains_key(name)
153 }
154
155 pub fn len(&self) -> usize {
157 self.connectors.len()
158 }
159
160 pub fn is_empty(&self) -> bool {
162 self.connectors.is_empty()
163 }
164}
165
166impl Default for ConnectorRegistry {
167 fn default() -> Self {
168 Self::new()
169 }
170}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::types::ConnectorType;

    /// `new()` yields empty params, a 30 s timeout and 10 max connections.
    #[test]
    fn test_connector_init_config_creation() {
        let config = ConnectorInitConfig::new();

        assert!(config.connection_params.is_empty());
        assert_eq!(config.timeout_seconds, Some(30));
        assert_eq!(config.max_connections, Some(10));
    }

    /// Each chained `with_*` call is reflected in the final config.
    #[test]
    fn test_connector_init_config_builder_pattern() {
        let config = ConnectorInitConfig::new()
            .with_param("host", "localhost")
            .with_param("port", "5432")
            .with_timeout(60)
            .with_max_connections(20);

        assert_eq!(config.connection_params.get("host"), Some(&"localhost".to_string()));
        assert_eq!(config.connection_params.get("port"), Some(&"5432".to_string()));
        assert_eq!(config.timeout_seconds, Some(60));
        assert_eq!(config.max_connections, Some(20));
    }

    /// `Default` produces the same values as `new()`.
    #[test]
    fn test_connector_init_config_default() {
        let config = ConnectorInitConfig::default();

        assert!(config.connection_params.is_empty());
        assert_eq!(config.timeout_seconds, Some(30));
        assert_eq!(config.max_connections, Some(10));
    }

    /// Default capabilities enable only schema introspection, with one
    /// concurrent query.
    #[test]
    fn test_connector_capabilities_default() {
        let capabilities = ConnectorCapabilities::default();

        assert!(!capabilities.supports_joins);
        assert!(!capabilities.supports_aggregations);
        assert!(!capabilities.supports_subqueries);
        assert!(!capabilities.supports_transactions);
        assert!(capabilities.supports_schema_introspection);
        assert_eq!(capabilities.max_concurrent_queries, Some(1));
    }

    /// A new registry holds nothing.
    #[test]
    fn test_connector_registry_creation() {
        let registry = ConnectorRegistry::new();

        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
        assert!(registry.list_connectors().is_empty());
    }

    /// `Default` also yields an empty registry.
    #[test]
    fn test_connector_registry_default() {
        let registry = ConnectorRegistry::default();

        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
    }

    /// Minimal in-memory connector used to exercise the registry.
    struct TestConnector {
        connector_type: ConnectorType,
        connected: bool,
    }

    impl TestConnector {
        /// Builds a disconnected test connector of the given type.
        fn new(connector_type: ConnectorType) -> Self {
            Self {
                connector_type,
                connected: false,
            }
        }
    }

    #[async_trait]
    impl Connector for TestConnector {
        async fn connect(&mut self, _config: ConnectorInitConfig) -> NirvResult<()> {
            self.connected = true;
            Ok(())
        }

        async fn execute_query(&self, _query: ConnectorQuery) -> NirvResult<QueryResult> {
            Ok(QueryResult::new())
        }

        async fn get_schema(&self, _object_name: &str) -> NirvResult<Schema> {
            Ok(Schema {
                name: "test".to_string(),
                columns: vec![],
                primary_key: None,
                indexes: vec![],
            })
        }

        async fn disconnect(&mut self) -> NirvResult<()> {
            self.connected = false;
            Ok(())
        }

        fn get_connector_type(&self) -> ConnectorType {
            self.connector_type.clone()
        }

        fn supports_transactions(&self) -> bool {
            false
        }

        fn is_connected(&self) -> bool {
            self.connected
        }

        fn get_capabilities(&self) -> ConnectorCapabilities {
            ConnectorCapabilities::default()
        }
    }

    /// A registered connector is visible through every read accessor.
    #[test]
    fn test_connector_registry_register_and_get() {
        let mut registry = ConnectorRegistry::new();
        let connector = Box::new(TestConnector::new(ConnectorType::Mock));

        // Registration of a fresh name succeeds.
        let result = registry.register("test_connector".to_string(), connector);
        assert!(result.is_ok());

        // The registry now reports exactly one entry.
        assert!(!registry.is_empty());
        assert_eq!(registry.len(), 1);
        assert!(registry.contains("test_connector"));

        // Lookup returns the connector that was registered.
        let retrieved = registry.get("test_connector");
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().get_connector_type(), ConnectorType::Mock);

        // The name shows up in the listing.
        let connectors = registry.list_connectors();
        assert_eq!(connectors.len(), 1);
        assert!(connectors.contains(&"test_connector".to_string()));
    }

    /// Registering a taken name fails and leaves the first connector in place.
    #[test]
    fn test_connector_registry_duplicate_registration() {
        let mut registry = ConnectorRegistry::new();
        let connector1 = Box::new(TestConnector::new(ConnectorType::Mock));
        let connector2 = Box::new(TestConnector::new(ConnectorType::PostgreSQL));

        // First registration succeeds.
        let result1 = registry.register("test_connector".to_string(), connector1);
        assert!(result1.is_ok());

        // Second registration under the same name is rejected.
        let result2 = registry.register("test_connector".to_string(), connector2);
        assert!(result2.is_err());

        // Still exactly one entry...
        assert_eq!(registry.len(), 1);

        // ...and it is the original connector, not the rejected duplicate.
        let retained = registry
            .get("test_connector")
            .expect("original connector should remain registered");
        assert_eq!(retained.get_connector_type(), ConnectorType::Mock);
    }

    /// `get_mut` finds registered names and returns `None` for unknown ones.
    #[test]
    fn test_connector_registry_get_mut() {
        let mut registry = ConnectorRegistry::new();
        let connector = Box::new(TestConnector::new(ConnectorType::Mock));

        registry.register("test_connector".to_string(), connector).unwrap();

        // Known name: mutable access is granted.
        let connector_mut = registry.get_mut("test_connector");
        assert!(connector_mut.is_some());

        // Unknown name: nothing to borrow.
        let non_existent = registry.get_mut("non_existent");
        assert!(non_existent.is_none());
    }

    /// `unregister` removes a known entry and returns `None` for unknown names.
    #[test]
    fn test_connector_registry_unregister() {
        let mut registry = ConnectorRegistry::new();
        let connector = Box::new(TestConnector::new(ConnectorType::Mock));

        registry.register("test_connector".to_string(), connector).unwrap();
        assert_eq!(registry.len(), 1);

        // Removing the entry hands the connector back and empties the registry.
        let removed = registry.unregister("test_connector");
        assert!(removed.is_some());
        assert_eq!(registry.len(), 0);
        assert!(registry.is_empty());

        // Removing an unknown name yields nothing.
        let non_existent = registry.unregister("non_existent");
        assert!(non_existent.is_none());
    }

    /// Lookups on an empty registry find nothing.
    #[test]
    fn test_connector_registry_get_non_existent() {
        let registry = ConnectorRegistry::new();

        let connector = registry.get("non_existent");
        assert!(connector.is_none());

        assert!(!registry.contains("non_existent"));
    }
}