1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::RwLock;
4use tokio::signal;
5use tokio::task::JoinHandle;
6use tokio::net::TcpListener;
7
8use crate::{
9 engine::{
10 Dispatcher, DefaultDispatcher,
11 QueryParser, DefaultQueryParser,
12 QueryPlanner, DefaultQueryPlanner,
13 QueryExecutor, DefaultQueryExecutor,
14 },
15 protocol::{ProtocolAdapter, ProtocolType},
16 connectors::{ConnectorRegistry, Connector},
17 utils::{
18 config::{EngineConfig, ProtocolConfig, ProtocolType as ConfigProtocolType},
19 error::{NirvResult, NirvError},
20 types::QueryResult,
21 },
22};
23
24pub struct Engine {
26 config: EngineConfig,
28 query_parser: Arc<dyn QueryParser>,
30 query_planner: Arc<dyn QueryPlanner>,
32 query_executor: Arc<RwLock<dyn QueryExecutor>>,
34 dispatcher: Arc<RwLock<dyn Dispatcher>>,
36 protocol_adapters: HashMap<ProtocolType, Arc<dyn ProtocolAdapter>>,
38 server_tasks: Vec<JoinHandle<()>>,
40 shutdown_signal: Option<tokio::sync::broadcast::Sender<()>>,
42}
43
44impl Engine {
45 pub fn new(config: EngineConfig) -> Self {
47 let query_parser = Arc::new(DefaultQueryParser::new().expect("Failed to create query parser"));
48 let query_planner = Arc::new(DefaultQueryPlanner::new());
49 let query_executor = Arc::new(RwLock::new(DefaultQueryExecutor::new()));
50 let dispatcher = Arc::new(RwLock::new(DefaultDispatcher::new()));
51
52 Self {
53 config,
54 query_parser,
55 query_planner,
56 query_executor,
57 dispatcher,
58 protocol_adapters: HashMap::new(),
59 server_tasks: Vec::new(),
60 shutdown_signal: None,
61 }
62 }
63
64 pub fn with_components(
66 config: EngineConfig,
67 query_parser: Arc<dyn QueryParser>,
68 query_planner: Arc<dyn QueryPlanner>,
69 query_executor: Arc<RwLock<dyn QueryExecutor>>,
70 dispatcher: Arc<RwLock<dyn Dispatcher>>,
71 ) -> Self {
72 Self {
73 config,
74 query_parser,
75 query_planner,
76 query_executor,
77 dispatcher,
78 protocol_adapters: HashMap::new(),
79 server_tasks: Vec::new(),
80 shutdown_signal: None,
81 }
82 }
83
84 pub async fn initialize(&mut self) -> NirvResult<()> {
86 let connector_registry = self.initialize_connectors().await?;
88
89 {
91 let mut executor = self.query_executor.write().await;
92 executor.set_connector_registry(connector_registry);
93 }
94
95 self.initialize_protocol_adapters().await?;
97
98 if !self.config.protocol_adapters.is_empty() {
100 self.start_protocol_servers().await?;
101 }
102
103 Ok(())
104 }
105
106 async fn initialize_connectors(&mut self) -> NirvResult<ConnectorRegistry> {
108 let mut registry = ConnectorRegistry::new();
109
110 for (name, connector_config) in &self.config.connectors {
111 let connector = self.create_connector(connector_config)?;
114 registry.register(name.clone(), connector)?;
115 }
116
117 Ok(registry)
121 }
122
123 fn create_connector(&self, _config: &crate::utils::config::ConnectorConfig) -> NirvResult<Box<dyn Connector>> {
125 use crate::connectors::MockConnector;
128 Ok(Box::new(MockConnector::new()))
129 }
130
131 async fn initialize_protocol_adapters(&mut self) -> NirvResult<()> {
133 for protocol_config in &self.config.protocol_adapters {
134 let adapter = self.create_protocol_adapter(protocol_config)?;
135 let protocol_type = match protocol_config.protocol_type {
136 ConfigProtocolType::PostgreSQL => ProtocolType::PostgreSQL,
137 ConfigProtocolType::MySQL => ProtocolType::MySQL,
138 ConfigProtocolType::SQLite => ProtocolType::SQLite,
139 };
140 self.protocol_adapters.insert(protocol_type, adapter);
141 }
142 Ok(())
143 }
144
145 fn create_protocol_adapter(&self, config: &ProtocolConfig) -> NirvResult<Arc<dyn ProtocolAdapter>> {
147 match config.protocol_type {
148 ConfigProtocolType::PostgreSQL => {
149 use crate::protocol::PostgreSQLProtocolAdapter;
150 Ok(Arc::new(PostgreSQLProtocolAdapter::new()))
151 }
152 ConfigProtocolType::MySQL => {
153 use crate::protocol::MySQLProtocolAdapter;
154 Ok(Arc::new(MySQLProtocolAdapter::new()))
155 }
156 ConfigProtocolType::SQLite => {
157 use crate::protocol::SQLiteProtocolAdapter;
158 Ok(Arc::new(SQLiteProtocolAdapter::new()))
159 }
160 }
161 }
162
163 async fn start_protocol_servers(&mut self) -> NirvResult<()> {
165 let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
166 self.shutdown_signal = Some(shutdown_tx.clone());
167
168 for protocol_config in &self.config.protocol_adapters {
169 let protocol_type = match protocol_config.protocol_type {
170 ConfigProtocolType::PostgreSQL => ProtocolType::PostgreSQL,
171 ConfigProtocolType::MySQL => ProtocolType::MySQL,
172 ConfigProtocolType::SQLite => ProtocolType::SQLite,
173 };
174
175 let adapter = self.protocol_adapters
176 .get(&protocol_type)
177 .ok_or_else(|| NirvError::Internal(
178 format!("Protocol adapter not found: {:?}", protocol_config.protocol_type)
179 ))?
180 .clone();
181
182 let bind_address = format!("{}:{}", protocol_config.bind_address, protocol_config.port);
183 let listener = TcpListener::bind(&bind_address).await
184 .map_err(|e| NirvError::Internal(
185 format!("Failed to bind to {}: {}", bind_address, e)
186 ))?;
187
188 let engine_ref = EngineRef {
189 query_parser: self.query_parser.clone(),
190 query_planner: self.query_planner.clone(),
191 query_executor: self.query_executor.clone(),
192 dispatcher: self.dispatcher.clone(),
193 };
194
195 let mut shutdown_rx = shutdown_tx.subscribe();
196 let task = tokio::spawn(async move {
197 loop {
198 tokio::select! {
199 result = listener.accept() => {
200 match result {
201 Ok((stream, _addr)) => {
202 let adapter_clone = adapter.clone();
203 let engine_clone = engine_ref.clone();
204 tokio::spawn(async move {
205 if let Err(e) = Self::handle_client_connection(
206 adapter_clone,
207 engine_clone,
208 stream
209 ).await {
210 eprintln!("Client connection error: {}", e);
211 }
212 });
213 }
214 Err(e) => {
215 eprintln!("Failed to accept connection: {}", e);
216 }
217 }
218 }
219 _ = shutdown_rx.recv() => {
220 break;
221 }
222 }
223 }
224 });
225
226 self.server_tasks.push(task);
227 }
228
229 Ok(())
230 }
231
232 async fn handle_client_connection(
234 adapter: Arc<dyn ProtocolAdapter>,
235 _engine: EngineRef,
236 stream: tokio::net::TcpStream,
237 ) -> NirvResult<()> {
238 let mut connection = adapter.accept_connection(stream).await?;
240
241 loop {
246 break;
249 }
250
251 adapter.terminate_connection(&mut connection).await?;
253
254 Ok(())
255 }
256
257 pub async fn execute_query(&self, query_string: &str) -> NirvResult<QueryResult> {
259 let internal_query = self.query_parser.parse_sql(query_string).await?;
261
262 let dispatcher = self.dispatcher.read().await;
264 let connector_queries = dispatcher.route_query(&internal_query).await?;
265
266 dispatcher.execute_distributed_query(connector_queries).await
268 }
269
270 pub async fn register_connector(&self, object_type: &str, connector: Box<dyn Connector>) -> NirvResult<()> {
272 let mut dispatcher = self.dispatcher.write().await;
273 dispatcher.register_connector(object_type, connector).await
274 }
275
276 pub async fn initialize_for_testing(&mut self) -> NirvResult<()> {
278 let connector_registry = self.initialize_connectors().await?;
280
281 {
283 let mut executor = self.query_executor.write().await;
284 executor.set_connector_registry(connector_registry);
285 }
286
287 self.initialize_protocol_adapters().await?;
289
290 Ok(())
291 }
292
293 pub async fn list_available_types(&self) -> Vec<String> {
295 let dispatcher = self.dispatcher.read().await;
296 dispatcher.list_available_types()
297 }
298
299 pub async fn shutdown(&mut self) -> NirvResult<()> {
301 if let Some(shutdown_tx) = &self.shutdown_signal {
303 let _ = shutdown_tx.send(());
304 }
305
306 for task in self.server_tasks.drain(..) {
308 let _ = task.await;
309 }
310
311 Ok(())
315 }
316
317 pub async fn wait_for_shutdown(&self) -> NirvResult<()> {
319 signal::ctrl_c().await
320 .map_err(|e| NirvError::Internal(format!("Failed to listen for shutdown signal: {}", e)))?;
321 Ok(())
322 }
323}
324
325#[derive(Clone)]
327struct EngineRef {
328 query_parser: Arc<dyn QueryParser>,
329 query_planner: Arc<dyn QueryPlanner>,
330 query_executor: Arc<RwLock<dyn QueryExecutor>>,
331 dispatcher: Arc<RwLock<dyn Dispatcher>>,
332}
333
334pub struct EngineBuilder {
336 config: Option<EngineConfig>,
337 query_parser: Option<Arc<dyn QueryParser>>,
338 query_planner: Option<Arc<dyn QueryPlanner>>,
339 query_executor: Option<Arc<RwLock<dyn QueryExecutor>>>,
340 dispatcher: Option<Arc<RwLock<dyn Dispatcher>>>,
341}
342
343impl EngineBuilder {
344 pub fn new() -> Self {
346 Self {
347 config: None,
348 query_parser: None,
349 query_planner: None,
350 query_executor: None,
351 dispatcher: None,
352 }
353 }
354
355 pub fn with_config(mut self, config: EngineConfig) -> Self {
357 self.config = Some(config);
358 self
359 }
360
361 pub fn with_query_parser(mut self, parser: Arc<dyn QueryParser>) -> Self {
363 self.query_parser = Some(parser);
364 self
365 }
366
367 pub fn with_query_planner(mut self, planner: Arc<dyn QueryPlanner>) -> Self {
369 self.query_planner = Some(planner);
370 self
371 }
372
373 pub fn with_query_executor(mut self, executor: Arc<RwLock<dyn QueryExecutor>>) -> Self {
375 self.query_executor = Some(executor);
376 self
377 }
378
379 pub fn with_dispatcher(mut self, dispatcher: Arc<RwLock<dyn Dispatcher>>) -> Self {
381 self.dispatcher = Some(dispatcher);
382 self
383 }
384
385 pub fn build(self) -> NirvResult<Engine> {
387 let config = self.config.unwrap_or_else(EngineConfig::default);
388
389 if let (Some(parser), Some(planner), Some(executor), Some(dispatcher)) = (
390 self.query_parser,
391 self.query_planner,
392 self.query_executor,
393 self.dispatcher,
394 ) {
395 Ok(Engine::with_components(config, parser, planner, executor, dispatcher))
396 } else {
397 Ok(Engine::new(config))
398 }
399 }
400}
401
402impl Default for EngineBuilder {
403 fn default() -> Self {
404 Self::new()
405 }
406}