use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::signal;
use tokio::task::JoinHandle;
use tokio::net::TcpListener;
use crate::{
engine::{
Dispatcher, DefaultDispatcher,
QueryParser, DefaultQueryParser,
QueryPlanner, DefaultQueryPlanner,
QueryExecutor, DefaultQueryExecutor,
},
protocol::{ProtocolAdapter, ProtocolType},
connectors::{ConnectorRegistry, Connector},
utils::{
config::{EngineConfig, ProtocolConfig, ProtocolType as ConfigProtocolType},
error::{NirvResult, NirvError},
types::QueryResult,
},
};
pub struct Engine {
config: EngineConfig,
query_parser: Arc<dyn QueryParser>,
query_planner: Arc<dyn QueryPlanner>,
query_executor: Arc<RwLock<dyn QueryExecutor>>,
dispatcher: Arc<RwLock<dyn Dispatcher>>,
protocol_adapters: HashMap<ProtocolType, Arc<dyn ProtocolAdapter>>,
server_tasks: Vec<JoinHandle<()>>,
shutdown_signal: Option<tokio::sync::broadcast::Sender<()>>,
}
impl Engine {
pub fn new(config: EngineConfig) -> Self {
let query_parser = Arc::new(DefaultQueryParser::new().expect("Failed to create query parser"));
let query_planner = Arc::new(DefaultQueryPlanner::new());
let query_executor = Arc::new(RwLock::new(DefaultQueryExecutor::new()));
let dispatcher = Arc::new(RwLock::new(DefaultDispatcher::new()));
Self {
config,
query_parser,
query_planner,
query_executor,
dispatcher,
protocol_adapters: HashMap::new(),
server_tasks: Vec::new(),
shutdown_signal: None,
}
}
pub fn with_components(
config: EngineConfig,
query_parser: Arc<dyn QueryParser>,
query_planner: Arc<dyn QueryPlanner>,
query_executor: Arc<RwLock<dyn QueryExecutor>>,
dispatcher: Arc<RwLock<dyn Dispatcher>>,
) -> Self {
Self {
config,
query_parser,
query_planner,
query_executor,
dispatcher,
protocol_adapters: HashMap::new(),
server_tasks: Vec::new(),
shutdown_signal: None,
}
}
pub async fn initialize(&mut self) -> NirvResult<()> {
let connector_registry = self.initialize_connectors().await?;
{
let mut executor = self.query_executor.write().await;
executor.set_connector_registry(connector_registry);
}
self.initialize_protocol_adapters().await?;
if !self.config.protocol_adapters.is_empty() {
self.start_protocol_servers().await?;
}
Ok(())
}
async fn initialize_connectors(&mut self) -> NirvResult<ConnectorRegistry> {
let mut registry = ConnectorRegistry::new();
for (name, connector_config) in &self.config.connectors {
let connector = self.create_connector(connector_config)?;
registry.register(name.clone(), connector)?;
}
Ok(registry)
}
fn create_connector(&self, _config: &crate::utils::config::ConnectorConfig) -> NirvResult<Box<dyn Connector>> {
use crate::connectors::MockConnector;
Ok(Box::new(MockConnector::new()))
}
async fn initialize_protocol_adapters(&mut self) -> NirvResult<()> {
for protocol_config in &self.config.protocol_adapters {
let adapter = self.create_protocol_adapter(protocol_config)?;
let protocol_type = match protocol_config.protocol_type {
ConfigProtocolType::PostgreSQL => ProtocolType::PostgreSQL,
ConfigProtocolType::MySQL => ProtocolType::MySQL,
ConfigProtocolType::SQLite => ProtocolType::SQLite,
};
self.protocol_adapters.insert(protocol_type, adapter);
}
Ok(())
}
fn create_protocol_adapter(&self, config: &ProtocolConfig) -> NirvResult<Arc<dyn ProtocolAdapter>> {
match config.protocol_type {
ConfigProtocolType::PostgreSQL => {
use crate::protocol::PostgreSQLProtocolAdapter;
Ok(Arc::new(PostgreSQLProtocolAdapter::new()))
}
ConfigProtocolType::MySQL => {
use crate::protocol::MySQLProtocolAdapter;
Ok(Arc::new(MySQLProtocolAdapter::new()))
}
ConfigProtocolType::SQLite => {
use crate::protocol::SQLiteProtocolAdapter;
Ok(Arc::new(SQLiteProtocolAdapter::new()))
}
}
}
async fn start_protocol_servers(&mut self) -> NirvResult<()> {
let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
self.shutdown_signal = Some(shutdown_tx.clone());
for protocol_config in &self.config.protocol_adapters {
let protocol_type = match protocol_config.protocol_type {
ConfigProtocolType::PostgreSQL => ProtocolType::PostgreSQL,
ConfigProtocolType::MySQL => ProtocolType::MySQL,
ConfigProtocolType::SQLite => ProtocolType::SQLite,
};
let adapter = self.protocol_adapters
.get(&protocol_type)
.ok_or_else(|| NirvError::Internal(
format!("Protocol adapter not found: {:?}", protocol_config.protocol_type)
))?
.clone();
let bind_address = format!("{}:{}", protocol_config.bind_address, protocol_config.port);
let listener = TcpListener::bind(&bind_address).await
.map_err(|e| NirvError::Internal(
format!("Failed to bind to {}: {}", bind_address, e)
))?;
let engine_ref = EngineRef {
query_parser: self.query_parser.clone(),
query_planner: self.query_planner.clone(),
query_executor: self.query_executor.clone(),
dispatcher: self.dispatcher.clone(),
};
let mut shutdown_rx = shutdown_tx.subscribe();
let task = tokio::spawn(async move {
loop {
tokio::select! {
result = listener.accept() => {
match result {
Ok((stream, _addr)) => {
let adapter_clone = adapter.clone();
let engine_clone = engine_ref.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_client_connection(
adapter_clone,
engine_clone,
stream
).await {
eprintln!("Client connection error: {}", e);
}
});
}
Err(e) => {
eprintln!("Failed to accept connection: {}", e);
}
}
}
_ = shutdown_rx.recv() => {
break;
}
}
}
});
self.server_tasks.push(task);
}
Ok(())
}
async fn handle_client_connection(
adapter: Arc<dyn ProtocolAdapter>,
_engine: EngineRef,
stream: tokio::net::TcpStream,
) -> NirvResult<()> {
let mut connection = adapter.accept_connection(stream).await?;
loop {
break;
}
adapter.terminate_connection(&mut connection).await?;
Ok(())
}
pub async fn execute_query(&self, query_string: &str) -> NirvResult<QueryResult> {
let internal_query = self.query_parser.parse_sql(query_string).await?;
let dispatcher = self.dispatcher.read().await;
let connector_queries = dispatcher.route_query(&internal_query).await?;
dispatcher.execute_distributed_query(connector_queries).await
}
pub async fn register_connector(&self, object_type: &str, connector: Box<dyn Connector>) -> NirvResult<()> {
let mut dispatcher = self.dispatcher.write().await;
dispatcher.register_connector(object_type, connector).await
}
pub async fn initialize_for_testing(&mut self) -> NirvResult<()> {
let connector_registry = self.initialize_connectors().await?;
{
let mut executor = self.query_executor.write().await;
executor.set_connector_registry(connector_registry);
}
self.initialize_protocol_adapters().await?;
Ok(())
}
pub async fn list_available_types(&self) -> Vec<String> {
let dispatcher = self.dispatcher.read().await;
dispatcher.list_available_types()
}
pub async fn shutdown(&mut self) -> NirvResult<()> {
if let Some(shutdown_tx) = &self.shutdown_signal {
let _ = shutdown_tx.send(());
}
for task in self.server_tasks.drain(..) {
let _ = task.await;
}
Ok(())
}
pub async fn wait_for_shutdown(&self) -> NirvResult<()> {
signal::ctrl_c().await
.map_err(|e| NirvError::Internal(format!("Failed to listen for shutdown signal: {}", e)))?;
Ok(())
}
}
#[derive(Clone)]
struct EngineRef {
query_parser: Arc<dyn QueryParser>,
query_planner: Arc<dyn QueryPlanner>,
query_executor: Arc<RwLock<dyn QueryExecutor>>,
dispatcher: Arc<RwLock<dyn Dispatcher>>,
}
pub struct EngineBuilder {
config: Option<EngineConfig>,
query_parser: Option<Arc<dyn QueryParser>>,
query_planner: Option<Arc<dyn QueryPlanner>>,
query_executor: Option<Arc<RwLock<dyn QueryExecutor>>>,
dispatcher: Option<Arc<RwLock<dyn Dispatcher>>>,
}
impl EngineBuilder {
pub fn new() -> Self {
Self {
config: None,
query_parser: None,
query_planner: None,
query_executor: None,
dispatcher: None,
}
}
pub fn with_config(mut self, config: EngineConfig) -> Self {
self.config = Some(config);
self
}
pub fn with_query_parser(mut self, parser: Arc<dyn QueryParser>) -> Self {
self.query_parser = Some(parser);
self
}
pub fn with_query_planner(mut self, planner: Arc<dyn QueryPlanner>) -> Self {
self.query_planner = Some(planner);
self
}
pub fn with_query_executor(mut self, executor: Arc<RwLock<dyn QueryExecutor>>) -> Self {
self.query_executor = Some(executor);
self
}
pub fn with_dispatcher(mut self, dispatcher: Arc<RwLock<dyn Dispatcher>>) -> Self {
self.dispatcher = Some(dispatcher);
self
}
pub fn build(self) -> NirvResult<Engine> {
let config = self.config.unwrap_or_else(EngineConfig::default);
if let (Some(parser), Some(planner), Some(executor), Some(dispatcher)) = (
self.query_parser,
self.query_planner,
self.query_executor,
self.dispatcher,
) {
Ok(Engine::with_components(config, parser, planner, executor, dispatcher))
} else {
Ok(Engine::new(config))
}
}
}
impl Default for EngineBuilder {
fn default() -> Self {
Self::new()
}
}