nirv_engine/engine/
engine.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::RwLock;
4use tokio::signal;
5use tokio::task::JoinHandle;
6use tokio::net::TcpListener;
7
8use crate::{
9    engine::{
10        Dispatcher, DefaultDispatcher,
11        QueryParser, DefaultQueryParser,
12        QueryPlanner, DefaultQueryPlanner,
13        QueryExecutor, DefaultQueryExecutor,
14    },
15    protocol::{ProtocolAdapter, ProtocolType},
16    connectors::{ConnectorRegistry, Connector},
17    utils::{
18        config::{EngineConfig, ProtocolConfig, ProtocolType as ConfigProtocolType},
19        error::{NirvResult, NirvError},
20        types::QueryResult,
21    },
22};
23
24/// Main NIRV Engine that coordinates all components
25pub struct Engine {
26    /// Engine configuration
27    config: EngineConfig,
28    /// Query parser for SQL parsing
29    query_parser: Arc<dyn QueryParser>,
30    /// Query planner for execution planning
31    query_planner: Arc<dyn QueryPlanner>,
32    /// Query executor for plan execution
33    query_executor: Arc<RwLock<dyn QueryExecutor>>,
34    /// Dispatcher for routing queries to connectors
35    dispatcher: Arc<RwLock<dyn Dispatcher>>,
36    /// Protocol adapters for client connections
37    protocol_adapters: HashMap<ProtocolType, Arc<dyn ProtocolAdapter>>,
38    /// Running protocol server tasks
39    server_tasks: Vec<JoinHandle<()>>,
40    /// Shutdown signal
41    shutdown_signal: Option<tokio::sync::broadcast::Sender<()>>,
42}
43
44impl Engine {
45    /// Create a new engine with the given configuration
46    pub fn new(config: EngineConfig) -> Self {
47        let query_parser = Arc::new(DefaultQueryParser::new().expect("Failed to create query parser"));
48        let query_planner = Arc::new(DefaultQueryPlanner::new());
49        let query_executor = Arc::new(RwLock::new(DefaultQueryExecutor::new()));
50        let dispatcher = Arc::new(RwLock::new(DefaultDispatcher::new()));
51        
52        Self {
53            config,
54            query_parser,
55            query_planner,
56            query_executor,
57            dispatcher,
58            protocol_adapters: HashMap::new(),
59            server_tasks: Vec::new(),
60            shutdown_signal: None,
61        }
62    }
63    
64    /// Create an engine with custom components
65    pub fn with_components(
66        config: EngineConfig,
67        query_parser: Arc<dyn QueryParser>,
68        query_planner: Arc<dyn QueryPlanner>,
69        query_executor: Arc<RwLock<dyn QueryExecutor>>,
70        dispatcher: Arc<RwLock<dyn Dispatcher>>,
71    ) -> Self {
72        Self {
73            config,
74            query_parser,
75            query_planner,
76            query_executor,
77            dispatcher,
78            protocol_adapters: HashMap::new(),
79            server_tasks: Vec::new(),
80            shutdown_signal: None,
81        }
82    }
83    
84    /// Initialize the engine and start all services
85    pub async fn initialize(&mut self) -> NirvResult<()> {
86        // Initialize connector registry
87        let connector_registry = self.initialize_connectors().await?;
88        
89        // Set connector registry in query executor
90        {
91            let mut executor = self.query_executor.write().await;
92            executor.set_connector_registry(connector_registry);
93        }
94        
95        // Initialize protocol adapters
96        self.initialize_protocol_adapters().await?;
97        
98        // Start protocol servers (only if we have protocol adapters configured)
99        if !self.config.protocol_adapters.is_empty() {
100            self.start_protocol_servers().await?;
101        }
102        
103        Ok(())
104    }
105    
106    /// Initialize connectors from configuration
107    async fn initialize_connectors(&mut self) -> NirvResult<ConnectorRegistry> {
108        let mut registry = ConnectorRegistry::new();
109        
110        for (name, connector_config) in &self.config.connectors {
111            // For MVP, we'll create mock connectors
112            // In future tasks, we'll create actual connector implementations
113            let connector = self.create_connector(connector_config)?;
114            registry.register(name.clone(), connector)?;
115        }
116        
117        // Also register any connectors that were added manually
118        // This ensures the registry is properly initialized even with empty config
119        
120        Ok(registry)
121    }
122    
123    /// Create a connector based on configuration
124    fn create_connector(&self, _config: &crate::utils::config::ConnectorConfig) -> NirvResult<Box<dyn Connector>> {
125        // For MVP, return a mock connector
126        // This will be expanded in future tasks to create actual connectors
127        use crate::connectors::MockConnector;
128        Ok(Box::new(MockConnector::new()))
129    }
130    
131    /// Initialize protocol adapters
132    async fn initialize_protocol_adapters(&mut self) -> NirvResult<()> {
133        for protocol_config in &self.config.protocol_adapters {
134            let adapter = self.create_protocol_adapter(protocol_config)?;
135            let protocol_type = match protocol_config.protocol_type {
136                ConfigProtocolType::PostgreSQL => ProtocolType::PostgreSQL,
137                ConfigProtocolType::MySQL => ProtocolType::MySQL,
138                ConfigProtocolType::SQLite => ProtocolType::SQLite,
139            };
140            self.protocol_adapters.insert(protocol_type, adapter);
141        }
142        Ok(())
143    }
144    
145    /// Create a protocol adapter based on configuration
146    fn create_protocol_adapter(&self, config: &ProtocolConfig) -> NirvResult<Arc<dyn ProtocolAdapter>> {
147        match config.protocol_type {
148            ConfigProtocolType::PostgreSQL => {
149                use crate::protocol::PostgreSQLProtocolAdapter;
150                Ok(Arc::new(PostgreSQLProtocolAdapter::new()))
151            }
152            ConfigProtocolType::MySQL => {
153                use crate::protocol::MySQLProtocolAdapter;
154                Ok(Arc::new(MySQLProtocolAdapter::new()))
155            }
156            ConfigProtocolType::SQLite => {
157                use crate::protocol::SQLiteProtocolAdapter;
158                Ok(Arc::new(SQLiteProtocolAdapter::new()))
159            }
160        }
161    }
162    
163    /// Start protocol servers for client connections
164    async fn start_protocol_servers(&mut self) -> NirvResult<()> {
165        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
166        self.shutdown_signal = Some(shutdown_tx.clone());
167        
168        for protocol_config in &self.config.protocol_adapters {
169            let protocol_type = match protocol_config.protocol_type {
170                ConfigProtocolType::PostgreSQL => ProtocolType::PostgreSQL,
171                ConfigProtocolType::MySQL => ProtocolType::MySQL,
172                ConfigProtocolType::SQLite => ProtocolType::SQLite,
173            };
174            
175            let adapter = self.protocol_adapters
176                .get(&protocol_type)
177                .ok_or_else(|| NirvError::Internal(
178                    format!("Protocol adapter not found: {:?}", protocol_config.protocol_type)
179                ))?
180                .clone();
181            
182            let bind_address = format!("{}:{}", protocol_config.bind_address, protocol_config.port);
183            let listener = TcpListener::bind(&bind_address).await
184                .map_err(|e| NirvError::Internal(
185                    format!("Failed to bind to {}: {}", bind_address, e)
186                ))?;
187            
188            let engine_ref = EngineRef {
189                query_parser: self.query_parser.clone(),
190                query_planner: self.query_planner.clone(),
191                query_executor: self.query_executor.clone(),
192                dispatcher: self.dispatcher.clone(),
193            };
194            
195            let mut shutdown_rx = shutdown_tx.subscribe();
196            let task = tokio::spawn(async move {
197                loop {
198                    tokio::select! {
199                        result = listener.accept() => {
200                            match result {
201                                Ok((stream, _addr)) => {
202                                    let adapter_clone = adapter.clone();
203                                    let engine_clone = engine_ref.clone();
204                                    tokio::spawn(async move {
205                                        if let Err(e) = Self::handle_client_connection(
206                                            adapter_clone,
207                                            engine_clone,
208                                            stream
209                                        ).await {
210                                            eprintln!("Client connection error: {}", e);
211                                        }
212                                    });
213                                }
214                                Err(e) => {
215                                    eprintln!("Failed to accept connection: {}", e);
216                                }
217                            }
218                        }
219                        _ = shutdown_rx.recv() => {
220                            break;
221                        }
222                    }
223                }
224            });
225            
226            self.server_tasks.push(task);
227        }
228        
229        Ok(())
230    }
231    
232    /// Handle a client connection through a protocol adapter
233    async fn handle_client_connection(
234        adapter: Arc<dyn ProtocolAdapter>,
235        _engine: EngineRef,
236        stream: tokio::net::TcpStream,
237    ) -> NirvResult<()> {
238        // Accept the connection
239        let mut connection = adapter.accept_connection(stream).await?;
240        
241        // For MVP, we'll skip authentication
242        // In future tasks, we'll implement proper authentication
243        
244        // Handle queries in a loop
245        loop {
246            // For MVP, we'll implement a simple query handling loop
247            // In future tasks, we'll implement proper protocol message handling
248            break;
249        }
250        
251        // Terminate the connection
252        adapter.terminate_connection(&mut connection).await?;
253        
254        Ok(())
255    }
256    
257    /// Execute a query through the engine
258    pub async fn execute_query(&self, query_string: &str) -> NirvResult<QueryResult> {
259        // Parse the query
260        let internal_query = self.query_parser.parse_sql(query_string).await?;
261        
262        // Route the query through the dispatcher
263        let dispatcher = self.dispatcher.read().await;
264        let connector_queries = dispatcher.route_query(&internal_query).await?;
265        
266        // Execute the distributed query
267        dispatcher.execute_distributed_query(connector_queries).await
268    }
269    
270    /// Register a connector with the dispatcher
271    pub async fn register_connector(&self, object_type: &str, connector: Box<dyn Connector>) -> NirvResult<()> {
272        let mut dispatcher = self.dispatcher.write().await;
273        dispatcher.register_connector(object_type, connector).await
274    }
275    
276    /// Initialize the engine for testing (without starting protocol servers)
277    pub async fn initialize_for_testing(&mut self) -> NirvResult<()> {
278        // Initialize connector registry
279        let connector_registry = self.initialize_connectors().await?;
280        
281        // Set connector registry in query executor
282        {
283            let mut executor = self.query_executor.write().await;
284            executor.set_connector_registry(connector_registry);
285        }
286        
287        // Initialize protocol adapters but don't start servers
288        self.initialize_protocol_adapters().await?;
289        
290        Ok(())
291    }
292    
293    /// Get available data object types
294    pub async fn list_available_types(&self) -> Vec<String> {
295        let dispatcher = self.dispatcher.read().await;
296        dispatcher.list_available_types()
297    }
298    
299    /// Shutdown the engine gracefully
300    pub async fn shutdown(&mut self) -> NirvResult<()> {
301        // Send shutdown signal to all servers
302        if let Some(shutdown_tx) = &self.shutdown_signal {
303            let _ = shutdown_tx.send(());
304        }
305        
306        // Wait for all server tasks to complete
307        for task in self.server_tasks.drain(..) {
308            let _ = task.await;
309        }
310        
311        // Disconnect all connectors
312        // This would be implemented when we have actual connector implementations
313        
314        Ok(())
315    }
316    
317    /// Wait for shutdown signal (Ctrl+C)
318    pub async fn wait_for_shutdown(&self) -> NirvResult<()> {
319        signal::ctrl_c().await
320            .map_err(|e| NirvError::Internal(format!("Failed to listen for shutdown signal: {}", e)))?;
321        Ok(())
322    }
323}
324
325/// Reference to engine components for use in async tasks
326#[derive(Clone)]
327struct EngineRef {
328    query_parser: Arc<dyn QueryParser>,
329    query_planner: Arc<dyn QueryPlanner>,
330    query_executor: Arc<RwLock<dyn QueryExecutor>>,
331    dispatcher: Arc<RwLock<dyn Dispatcher>>,
332}
333
334/// Builder for creating Engine instances
335pub struct EngineBuilder {
336    config: Option<EngineConfig>,
337    query_parser: Option<Arc<dyn QueryParser>>,
338    query_planner: Option<Arc<dyn QueryPlanner>>,
339    query_executor: Option<Arc<RwLock<dyn QueryExecutor>>>,
340    dispatcher: Option<Arc<RwLock<dyn Dispatcher>>>,
341}
342
343impl EngineBuilder {
344    /// Create a new engine builder
345    pub fn new() -> Self {
346        Self {
347            config: None,
348            query_parser: None,
349            query_planner: None,
350            query_executor: None,
351            dispatcher: None,
352        }
353    }
354    
355    /// Set the engine configuration
356    pub fn with_config(mut self, config: EngineConfig) -> Self {
357        self.config = Some(config);
358        self
359    }
360    
361    /// Set a custom query parser
362    pub fn with_query_parser(mut self, parser: Arc<dyn QueryParser>) -> Self {
363        self.query_parser = Some(parser);
364        self
365    }
366    
367    /// Set a custom query planner
368    pub fn with_query_planner(mut self, planner: Arc<dyn QueryPlanner>) -> Self {
369        self.query_planner = Some(planner);
370        self
371    }
372    
373    /// Set a custom query executor
374    pub fn with_query_executor(mut self, executor: Arc<RwLock<dyn QueryExecutor>>) -> Self {
375        self.query_executor = Some(executor);
376        self
377    }
378    
379    /// Set a custom dispatcher
380    pub fn with_dispatcher(mut self, dispatcher: Arc<RwLock<dyn Dispatcher>>) -> Self {
381        self.dispatcher = Some(dispatcher);
382        self
383    }
384    
385    /// Build the engine
386    pub fn build(self) -> NirvResult<Engine> {
387        let config = self.config.unwrap_or_else(EngineConfig::default);
388        
389        if let (Some(parser), Some(planner), Some(executor), Some(dispatcher)) = (
390            self.query_parser,
391            self.query_planner,
392            self.query_executor,
393            self.dispatcher,
394        ) {
395            Ok(Engine::with_components(config, parser, planner, executor, dispatcher))
396        } else {
397            Ok(Engine::new(config))
398        }
399    }
400}
401
402impl Default for EngineBuilder {
403    fn default() -> Self {
404        Self::new()
405    }
406}