// aegis_server/state.rs

1//! Aegis Server State
2//!
3//! Application state shared across request handlers. Provides access to
4//! database engines, query engine, and configuration.
5//!
6//! @version 0.1.0
7//! @author AutomataNexus Development Team
8
9use crate::activity::ActivityLogger;
10use crate::admin::AdminService;
11use crate::auth::{AuthService, RbacManager};
12use crate::breach::{BreachDetector, WebhookNotifier};
13use crate::config::ServerConfig;
14use crate::consent::ConsentManager;
15use crate::gdpr::GdprService;
16use crate::handlers::{MetricsDataPoint, ServerSettings};
17use crate::middleware::RateLimiter;
18use aegis_document::{Document, DocumentEngine};
19use aegis_query::executor::{ExecutionContext, ExecutionContextSnapshot};
20use aegis_query::planner::{PlanNode, PlannerSchema};
21use aegis_query::{Executor, Parser, Planner, Statement};
22use aegis_shield::ShieldEngine;
23use aegis_streaming::StreamingEngine;
24use aegis_timeseries::TimeSeriesEngine;
25use aegis_updates::orchestrator::UpdateOrchestrator;
26use aegis_vault::AegisVault;
27use chrono::Utc;
28use parking_lot::RwLock as SyncRwLock;
29use std::collections::{HashMap, VecDeque};
30use std::path::PathBuf;
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::sync::Arc;
33use tokio::sync::RwLock;
34
35// =============================================================================
36// Application State
37// =============================================================================
38
/// Shared application state with real engine integrations.
///
/// Cloning is cheap: every field is reference-counted (`Arc`), so each
/// request handler can hold its own `AppState` clone.
#[derive(Clone)]
pub struct AppState {
    /// Server configuration resolved at startup (immutable thereafter).
    pub config: Arc<ServerConfig>,
    /// SQL query engine, optionally persistent (see `with_secrets`).
    pub query_engine: Arc<QueryEngine>,
    /// Document (collection) storage engine.
    pub document_engine: Arc<DocumentEngine>,
    /// Time-series storage engine.
    pub timeseries_engine: Arc<TimeSeriesEngine>,
    /// Streaming engine; also receives CDC events for SQL mutations.
    pub streaming_engine: Arc<StreamingEngine>,
    /// JSON key-value store with best-effort disk persistence.
    pub kv_store: Arc<KvStore>,
    /// Aggregate request counters (see `record_request`).
    pub metrics: Arc<RwLock<Metrics>>,
    /// Admin/cluster service (node identity, peers).
    pub admin: Arc<AdminService>,
    /// Authentication service.
    pub auth: Arc<AuthService>,
    /// Audit/activity logger (persistent when a data dir is configured).
    pub activity: Arc<ActivityLogger>,
    /// Mutable server settings, persisted to `settings.json` on save.
    pub settings: Arc<RwLock<ServerSettings>>,
    /// Rolling minute-resolution system metrics (bounded deque).
    pub metrics_history: Arc<RwLock<VecDeque<MetricsDataPoint>>>,
    /// Graph node/edge store with best-effort disk persistence.
    pub graph_store: Arc<GraphStore>,
    /// Role-based access control manager.
    pub rbac: Arc<RbacManager>,
    /// General request rate limiter.
    pub rate_limiter: Arc<RateLimiter>,
    /// Separate rate limiter for login attempts.
    pub login_rate_limiter: Arc<RateLimiter>,
    /// GDPR service.
    pub gdpr: Arc<GdprService>,
    /// Consent record manager.
    pub consent_manager: Arc<ConsentManager>,
    /// Breach detector with optional webhook notification.
    pub breach_detector: Arc<BreachDetector>,
    /// Self-update orchestration (staging/backup directories).
    pub update_orchestrator: Arc<UpdateOrchestrator>,
    /// Vault for secret storage.
    pub vault: Arc<AegisVault>,
    /// Shield engine.
    pub shield: Arc<ShieldEngine>,
    /// Optional on-disk data directory; `None` means fully in-memory.
    data_dir: Option<PathBuf>,
}
66
67impl AppState {
68    /// Create new application state with the given configuration.
69    /// Admin credentials are resolved from environment variables only.
70    /// For vault-backed credentials, use `with_secrets` instead.
71    pub fn new(config: ServerConfig) -> Self {
72        Self::with_secrets(config, None)
73    }
74
    /// Create new application state with secrets provider for admin credentials.
    ///
    /// Wires up every engine and service, restoring persisted state
    /// (documents, settings, KV, graph, audit logs) from `config.data_dir`
    /// when one is configured. Persistence restore is best-effort: failures
    /// are logged and the corresponding subsystem starts empty/in-memory.
    ///
    /// # Panics
    ///
    /// Calls `tokio::spawn` for the metrics collection task, so this must be
    /// invoked from within a Tokio runtime.
    pub fn with_secrets(
        config: ServerConfig,
        secrets: Option<&dyn crate::secrets::SecretsProvider>,
    ) -> Self {
        let data_dir = config.data_dir.as_ref().map(PathBuf::from);

        // Create activity logger with persistence if data directory is configured
        let activity = if let Some(ref dir) = data_dir {
            let audit_dir = dir.join("audit_logs");
            match ActivityLogger::with_persistence(audit_dir.clone()) {
                Ok(logger) => {
                    tracing::info!("Audit logging enabled with persistence to {:?}", audit_dir);
                    Arc::new(logger)
                }
                Err(e) => {
                    // Degrade gracefully: audit events remain in memory only.
                    tracing::error!("Failed to initialize persistent audit logging: {}. Falling back to in-memory only.", e);
                    Arc::new(ActivityLogger::new())
                }
            }
        } else {
            Arc::new(ActivityLogger::new())
        };

        // Create data directory if specified
        if let Err(e) = std::fs::create_dir_all(dir) {
                tracing::error!("Failed to create data directory {:?}: {}", dir, e);
            }
        }

        // Initialize engines
        let document_engine = Arc::new(DocumentEngine::new());
        let kv_store = Arc::new(KvStore::with_data_dir(data_dir.clone()));

        // Load persisted data if data directory is specified
        if let Some(ref dir) = data_dir {
            // Load document collections: one `<collection>.json` file per
            // collection, each holding a JSON array of documents.
            let docs_dir = dir.join("documents");
            if docs_dir.exists() {
                if let Ok(entries) = std::fs::read_dir(&docs_dir) {
                    for entry in entries.flatten() {
                        let path = entry.path();
                        if path.extension().is_some_and(|e| e == "json") {
                            if let Some(collection_name) = path.file_stem().and_then(|s| s.to_str())
                            {
                                if let Ok(data) = std::fs::read_to_string(&path) {
                                    if let Ok(docs) =
                                        serde_json::from_str::<Vec<serde_json::Value>>(&data)
                                    {
                                        let _ = document_engine.create_collection(collection_name);
                                        let mut count = 0;
                                        for doc_json in docs {
                                            let doc = json_to_document(doc_json);
                                            if document_engine.insert(collection_name, doc).is_ok()
                                            {
                                                count += 1;
                                            }
                                        }
                                        tracing::info!(
                                            "Loaded {} documents into collection '{}'",
                                            count,
                                            collection_name
                                        );
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        // Log server startup
        let node_name_display = config
            .node_name
            .as_ref()
            .map(|n| format!(" ({})", n))
            .unwrap_or_default();
        activity.log_system(&format!(
            "Aegis DB server started - Node: {}{}",
            config.node_id, node_name_display
        ));

        // Create graph store with persistence
        let graph_store = Arc::new(GraphStore::with_data_dir(data_dir.clone()));

        // Metrics history starts empty; the background task below appends
        // real samples once per minute.
        let metrics_history = Arc::new(RwLock::new(VecDeque::new()));

        // Start metrics collection background task (requires Tokio runtime).
        let metrics_history_clone = metrics_history.clone();
        tokio::spawn(async move {
            Self::collect_metrics_loop(metrics_history_clone).await;
        });

        // Create admin service with cluster config
        let admin = Arc::new(AdminService::with_config(
            &config.node_id,
            config.node_name.clone(),
            &config.address(),
            &config.cluster_name,
            config.peers.clone(),
        ));

        // Create rate limiters using config values
        let rate_limiter = Arc::new(RateLimiter::new(config.rate_limit_per_minute));
        let login_rate_limiter = Arc::new(RateLimiter::new(config.login_rate_limit_per_minute));

        // Create query engine with persistence if data_dir is configured
        let query_engine = match &data_dir {
            Some(dir) => Arc::new(QueryEngine::with_persistence(dir)),
            None => Arc::new(QueryEngine::new()),
        };

        // Set peer addresses for mutation replication
        if !config.peers.is_empty() {
            query_engine.set_peers(config.peers.clone());
        }

        // Create breach detector with optional webhook notifier
        let breach_detector = Arc::new(BreachDetector::with_data_dir(data_dir.clone()));
        if let Ok(webhook_url) = std::env::var("AEGIS_BREACH_WEBHOOK_URL") {
            if !webhook_url.is_empty() {
                tracing::info!("Breach webhook notification enabled: {}", webhook_url);
                breach_detector.register_notifier(Box::new(WebhookNotifier::new(&webhook_url)));
            }
        }

        // Initialize update orchestrator
        let update_orchestrator = {
            let binary_path =
                std::env::current_exe().unwrap_or_else(|_| PathBuf::from("aegis-server"));
            let base_dir = data_dir
                .as_ref()
                .map(|d| d.clone())
                .unwrap_or_else(|| PathBuf::from("/tmp/aegis-updates"));
            Arc::new(UpdateOrchestrator::new(
                binary_path,
                base_dir.join("staging"),
                base_dir.join("backups"),
            ))
        };

        Self {
            config: Arc::new(config),
            query_engine,
            document_engine,
            timeseries_engine: Arc::new({
                let ts_config = aegis_timeseries::engine::EngineConfig {
                    data_path: data_dir.as_ref().map(|d| d.join("timeseries")),
                    ..Default::default()
                };
                TimeSeriesEngine::with_config(ts_config)
            }),
            streaming_engine: Arc::new(StreamingEngine::new()),
            kv_store,
            metrics: Arc::new(RwLock::new(Metrics::default())),
            admin,
            auth: Arc::new(AuthService::with_data_dir_and_secrets(data_dir.clone(), secrets)),
            activity,
            settings: Arc::new(RwLock::new({
                // Load persisted settings if present; otherwise defaults.
                let mut loaded_settings = ServerSettings::default();
                if let Some(ref dir) = data_dir {
                    let settings_path = dir.join("settings.json");
                    if settings_path.exists() {
                        match std::fs::read_to_string(&settings_path) {
                            Ok(contents) => {
                                match serde_json::from_str::<ServerSettings>(&contents) {
                                    Ok(s) => {
                                        tracing::info!("Loaded server settings from disk");
                                        loaded_settings = s;
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            "Failed to parse settings from {}: {}",
                                            settings_path.display(),
                                            e
                                        );
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::error!(
                                    "Failed to read settings from {}: {}",
                                    settings_path.display(),
                                    e
                                );
                            }
                        }
                    }
                }
                loaded_settings
            })),
            metrics_history,
            graph_store,
            rbac: Arc::new(RbacManager::with_data_dir(data_dir.clone())),
            rate_limiter,
            login_rate_limiter,
            gdpr: Arc::new(GdprService::new()),
            consent_manager: Arc::new(ConsentManager::with_data_dir(data_dir.clone())),
            breach_detector,
            update_orchestrator,
            vault: Arc::new(AegisVault::new_auto(
                data_dir.as_ref().map(|d| d.join("vault")),
            )),
            shield: Arc::new(ShieldEngine::new(aegis_shield::ShieldConfig::default())),
            data_dir,
        }
    }
285
286    /// Save server settings to disk (if data_dir is configured).
287    pub async fn save_settings(&self) {
288        let Some(ref dir) = self.data_dir else {
289            return;
290        };
291        let path = dir.join("settings.json");
292        let settings = self.settings.read().await;
293        match serde_json::to_string_pretty(&*settings) {
294            Ok(json) => {
295                if let Err(e) = std::fs::write(&path, json) {
296                    tracing::error!("Failed to write settings to {}: {}", path.display(), e);
297                }
298            }
299            Err(e) => {
300                tracing::error!("Failed to serialize settings: {}", e);
301            }
302        }
303    }
304
305    /// Flush a single document collection to disk (if data_dir is configured).
306    pub fn flush_collection(&self, collection_name: &str) {
307        let Some(ref dir) = self.data_dir else { return };
308        let docs_dir = dir.join("documents");
309        if let Err(e) = std::fs::create_dir_all(&docs_dir) {
310            tracing::error!("Failed to create documents dir: {}", e);
311            return;
312        }
313        let query = aegis_document::Query::new();
314        if let Ok(result) = self.document_engine.find(collection_name, &query) {
315            let docs: Vec<serde_json::Value> =
316                result.documents.iter().map(document_to_json).collect();
317            let path = docs_dir.join(format!("{}.json", collection_name));
318            match serde_json::to_string_pretty(&docs) {
319                Ok(json) => {
320                    if let Err(e) = std::fs::write(&path, json) {
321                        tracing::error!("Failed to write collection '{}': {}", collection_name, e);
322                    }
323                }
324                Err(e) => tracing::error!(
325                    "Failed to serialize collection '{}': {}",
326                    collection_name,
327                    e
328                ),
329            }
330        }
331    }
332
    /// Save all data to disk (if data_dir is configured).
    ///
    /// Phases: (1) KV snapshot to `kv_store.json`, (2) one JSON file per
    /// document collection, (3) SQL, timeseries, and audit-log flushes.
    /// Returns the first I/O or serialization error from phases 1–2; the
    /// engine flushes in phase 3 log failures instead of propagating them.
    pub fn save_to_disk(&self) -> std::io::Result<()> {
        let Some(ref dir) = self.data_dir else {
            return Ok(());
        };

        // KV and documents already flush per-mutation, so periodic save
        // uses compact JSON as a consistency checkpoint
        let kv_path = dir.join("kv_store.json");
        let entries = self.kv_store.list(None, usize::MAX);
        let json = serde_json::to_string(&entries)?;
        std::fs::write(&kv_path, json)?;

        let docs_dir = dir.join("documents");
        std::fs::create_dir_all(&docs_dir)?;
        for collection_name in self.document_engine.list_collections() {
            // Empty query selects all documents in the collection.
            let query = aegis_document::Query::new();
            if let Ok(result) = self.document_engine.find(&collection_name, &query) {
                let docs: Vec<serde_json::Value> =
                    result.documents.iter().map(document_to_json).collect();
                let json = serde_json::to_string(&docs)?;
                let path = docs_dir.join(format!("{}.json", collection_name));
                std::fs::write(&path, json)?;
            }
        }

        // SQL tables — flush all contexts
        self.query_engine.flush();

        // Flush timeseries data
        self.timeseries_engine.flush();
        tracing::debug!("Flushed timeseries data to disk");

        // Flush audit logs (best-effort: log the error, keep going)
        if let Err(e) = self.activity.flush() {
            tracing::error!("Failed to flush audit logs: {}", e);
        }
        tracing::debug!("Flushed audit logs to disk");

        Ok(())
    }
374
375    /// Execute a SQL query against the specified database.
376    pub async fn execute_query(
377        &self,
378        sql: &str,
379        database: Option<&str>,
380    ) -> Result<QueryResult, QueryError> {
381        let result = self.query_engine.execute(sql, database)?;
382        let db_name = database.unwrap_or("default");
383        // Replicate mutations to peers and emit CDC events
384        if QueryEngine::is_mutation(sql) {
385            self.query_engine.replicate_to_peers(sql, db_name);
386            self.emit_cdc_event(sql, db_name, result.rows_affected);
387        }
388        Ok(result)
389    }
390
391    /// Emit a CDC event to the streaming engine for a SQL mutation.
392    fn emit_cdc_event(&self, sql: &str, database: &str, rows_affected: u64) {
393        use aegis_streaming::cdc::{ChangeEvent, ChangeSource, ChangeType};
394
395        // Extract table name and change type from SQL
396        let sql_trimmed = sql.trim();
397        let (change_type, table_name) = if sql_trimmed.len() > 6 {
398            let upper = sql_trimmed[..12.min(sql_trimmed.len())].to_uppercase();
399            if upper.starts_with("INSERT") {
400                (ChangeType::Insert, extract_table_after(sql_trimmed, "INTO"))
401            } else if upper.starts_with("UPDATE") {
402                (
403                    ChangeType::Update,
404                    extract_table_after(sql_trimmed, "UPDATE"),
405                )
406            } else if upper.starts_with("DELETE") {
407                (ChangeType::Delete, extract_table_after(sql_trimmed, "FROM"))
408            } else if upper.starts_with("TRUNCATE") {
409                (
410                    ChangeType::Truncate,
411                    extract_table_after(sql_trimmed, "TRUNCATE"),
412                )
413            } else {
414                return; // DDL — skip CDC
415            }
416        } else {
417            return;
418        };
419
420        let source = ChangeSource::new(database, &table_name);
421        let change = ChangeEvent {
422            change_type,
423            source,
424            timestamp: std::time::SystemTime::now()
425                .duration_since(std::time::UNIX_EPOCH)
426                .unwrap_or_default()
427                .as_millis() as u64,
428            key: None,
429            before: None,
430            after: Some(serde_json::json!({
431                "sql": sql,
432                "rows_affected": rows_affected,
433            })),
434            metadata: std::collections::HashMap::new(),
435        };
436
437        // Publish to a CDC channel named "cdc.{database}.{table}"
438        let channel_name = format!("cdc.{}.{}", database, table_name);
439        let channel_id = aegis_streaming::channel::ChannelId::new(&channel_name);
440
441        // Auto-create channel if it doesn't exist
442        let _ = self.streaming_engine.create_channel(channel_name.clone());
443
444        if let Err(e) = self.streaming_engine.publish_change(&channel_id, change) {
445            tracing::debug!("CDC publish to {}: {}", channel_name, e);
446        }
447    }
448
449    /// Execute a query that was received via replication (skip re-replication).
450    pub async fn execute_query_replicated(
451        &self,
452        sql: &str,
453        database: Option<&str>,
454    ) -> Result<QueryResult, QueryError> {
455        self.query_engine.execute(sql, database)
456    }
457
458    /// Execute a SQL query with bound parameters.
459    pub async fn execute_query_with_params(
460        &self,
461        sql: &str,
462        database: Option<&str>,
463        params: &[serde_json::Value],
464    ) -> Result<QueryResult, QueryError> {
465        self.query_engine.execute_with_params(sql, database, params)
466    }
467
468    /// Record a request metric.
469    pub async fn record_request(&self, duration_ms: u64, success: bool) {
470        let mut metrics = self.metrics.write().await;
471        metrics.total_requests += 1;
472        metrics.total_duration_ms += duration_ms;
473        if !success {
474            metrics.failed_requests += 1;
475        }
476    }
477
    /// Background task to collect real system metrics periodically.
    ///
    /// Runs forever: every 60 seconds it samples CPU, memory, and network
    /// counters via `sysinfo` and appends one data point to the shared
    /// history, which is capped at 30 days of minute-resolution samples.
    async fn collect_metrics_loop(metrics_history: Arc<RwLock<VecDeque<MetricsDataPoint>>>) {
        use sysinfo::{Networks, System};

        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
        let mut sys = System::new_all();
        let mut networks = Networks::new_with_refreshed_list();
        let mut last_bytes_in: u64 = 0;
        let mut last_bytes_out: u64 = 0;

        // Initialize network bytes so the first tick reports a delta, not
        // the machine's lifetime totals.
        for data in networks.list().values() {
            last_bytes_in += data.total_received();
            last_bytes_out += data.total_transmitted();
        }

        loop {
            interval.tick().await;

            // Refresh system info
            sys.refresh_all();
            networks.refresh();

            let now = Utc::now().timestamp();

            // Calculate CPU usage (average across all CPUs);
            // .max(1) avoids division by zero if no CPUs are reported.
            let cpu_percent = sys
                .cpus()
                .iter()
                .map(|cpu| cpu.cpu_usage() as f64)
                .sum::<f64>()
                / sys.cpus().len().max(1) as f64;

            // Calculate memory usage
            let memory_total = sys.total_memory();
            let memory_used = sys.used_memory();
            let memory_percent = if memory_total > 0 {
                (memory_used as f64 / memory_total as f64) * 100.0
            } else {
                0.0
            };

            // Calculate network throughput (cumulative totals this tick)
            let mut current_bytes_in: u64 = 0;
            let mut current_bytes_out: u64 = 0;
            for data in networks.list().values() {
                current_bytes_in += data.total_received();
                current_bytes_out += data.total_transmitted();
            }

            // Delta since previous tick; saturating_sub guards against
            // counter resets (e.g. an interface disappearing).
            let bytes_in = current_bytes_in.saturating_sub(last_bytes_in);
            let bytes_out = current_bytes_out.saturating_sub(last_bytes_out);
            last_bytes_in = current_bytes_in;
            last_bytes_out = current_bytes_out;

            // Get process count as proxy for connections
            let connections = sys.processes().len() as u64;

            let point = MetricsDataPoint {
                timestamp: now,
                cpu_percent,
                memory_percent,
                queries_per_second: 0.0, // Will be updated by actual query tracking
                latency_ms: 0.0,         // Will be updated by actual query tracking
                connections,
                bytes_in,
                bytes_out,
            };

            let mut history = metrics_history.write().await;

            // Keep last 30 days of minute-resolution data (43200 points)
            if history.len() >= 43200 {
                history.pop_front();
            }
            history.push_back(point);
        }
    }
556
557    /// Initialize metrics history - starts empty, will be populated by real metrics collection.
558    pub async fn init_metrics_history(&self) {
559        // Metrics history starts empty and is populated by the background collection task
560        // with real system metrics. No fake historical data is generated.
561        let history = self.metrics_history.write().await;
562        tracing::info!(
563            "Metrics history initialized (currently {} data points)",
564            history.len()
565        );
566    }
567
568    /// Get comprehensive database statistics.
569    pub fn get_database_stats(&self) -> DatabaseStats {
570        // Count KV entries
571        let total_keys = self.kv_store.count();
572
573        // Count documents across all collections
574        let collections = self.document_engine.list_collections();
575        let total_documents: usize = collections
576            .iter()
577            .filter_map(|name| self.document_engine.collection_stats(name))
578            .map(|stats| stats.document_count)
579            .sum();
580
581        // Get engine stats
582        let engine_stats = self.document_engine.stats();
583
584        DatabaseStats {
585            total_keys,
586            total_documents,
587            collection_count: collections.len(),
588            documents_inserted: engine_stats.documents_inserted,
589            documents_updated: engine_stats.documents_updated,
590            documents_deleted: engine_stats.documents_deleted,
591            queries_executed: engine_stats.queries_executed,
592        }
593    }
594}
595
/// Comprehensive database statistics.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DatabaseStats {
    /// Number of keys currently in the KV store.
    pub total_keys: usize,
    /// Total documents across all collections.
    pub total_documents: usize,
    /// Number of document collections.
    pub collection_count: usize,
    /// Lifetime insert counter from the document engine.
    pub documents_inserted: u64,
    /// Lifetime update counter from the document engine.
    pub documents_updated: u64,
    /// Lifetime delete counter from the document engine.
    pub documents_deleted: u64,
    /// Lifetime query counter from the document engine.
    pub queries_executed: u64,
}
607
608// =============================================================================
609// Key-Value Store
610// =============================================================================
611
/// In-memory key-value store with real persistence.
///
/// Entries live in a map behind a synchronous (`parking_lot`) RwLock.
/// When `data_dir` is set, every mutation rewrites `kv_store.json` on disk
/// (see `flush_to_disk`).
pub struct KvStore {
    // Key -> entry map; the entire map is serialized on each flush.
    data: SyncRwLock<HashMap<String, KvEntry>>,
    // When `Some`, mutations are persisted under this directory.
    data_dir: Option<PathBuf>,
}
617
/// Key-value entry with metadata.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KvEntry {
    /// The entry's key (duplicated so entries serialize standalone).
    pub key: String,
    /// Arbitrary JSON value.
    pub value: serde_json::Value,
    /// Optional time-to-live. NOTE(review): stored but not enforced by
    /// `KvStore::get` in this file — confirm expiry is handled elsewhere.
    pub ttl: Option<u64>,
    /// Creation timestamp; preserved across overwrites by `KvStore::set`.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Last modification timestamp.
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
627
628impl KvStore {
629    pub fn new() -> Self {
630        Self::with_data_dir(None)
631    }
632
633    pub fn with_data_dir(data_dir: Option<PathBuf>) -> Self {
634        let mut entries = HashMap::new();
635
636        if let Some(ref dir) = data_dir {
637            let kv_path = dir.join("kv_store.json");
638            if kv_path.exists() {
639                if let Ok(data) = std::fs::read_to_string(&kv_path) {
640                    if let Ok(loaded) = serde_json::from_str::<Vec<KvEntry>>(&data) {
641                        for entry in loaded {
642                            entries.insert(entry.key.clone(), entry);
643                        }
644                        tracing::info!("Loaded {} KV entries from disk", entries.len());
645                    }
646                }
647            }
648        }
649
650        Self {
651            data: SyncRwLock::new(entries),
652            data_dir,
653        }
654    }
655
656    /// Flush all KV entries to disk as JSON.
657    fn flush_to_disk(&self) {
658        if let Some(ref dir) = self.data_dir {
659            let kv_path = dir.join("kv_store.json");
660            let data = self.data.read();
661            let entries: Vec<&KvEntry> = data.values().collect();
662            match serde_json::to_string(&entries) {
663                Ok(json) => {
664                    if let Err(e) = std::fs::write(&kv_path, json) {
665                        tracing::error!("Failed to flush KV store to {:?}: {}", kv_path, e);
666                    }
667                }
668                Err(e) => {
669                    tracing::error!("Failed to serialize KV store: {}", e);
670                }
671            }
672        }
673    }
674
675    /// Set a key-value pair.
676    pub fn set(&self, key: String, value: serde_json::Value, ttl: Option<u64>) -> KvEntry {
677        let now = chrono::Utc::now();
678        let mut data = self.data.write();
679
680        let entry = if let Some(existing) = data.get(&key) {
681            KvEntry {
682                key: key.clone(),
683                value,
684                ttl,
685                created_at: existing.created_at,
686                updated_at: now,
687            }
688        } else {
689            KvEntry {
690                key: key.clone(),
691                value,
692                ttl,
693                created_at: now,
694                updated_at: now,
695            }
696        };
697
698        data.insert(key, entry.clone());
699        drop(data);
700        self.flush_to_disk();
701        entry
702    }
703
704    /// Get a value by key.
705    pub fn get(&self, key: &str) -> Option<KvEntry> {
706        let data = self.data.read();
707        data.get(key).cloned()
708    }
709
710    /// Delete a key.
711    pub fn delete(&self, key: &str) -> Option<KvEntry> {
712        let mut data = self.data.write();
713        let removed = data.remove(key);
714        drop(data);
715        if removed.is_some() {
716            self.flush_to_disk();
717        }
718        removed
719    }
720
721    /// List all keys with optional prefix filter.
722    pub fn list(&self, prefix: Option<&str>, limit: usize) -> Vec<KvEntry> {
723        let data = self.data.read();
724        let iter = data.values();
725
726        if let Some(p) = prefix {
727            iter.filter(|e| e.key.starts_with(p))
728                .take(limit)
729                .cloned()
730                .collect()
731        } else {
732            iter.take(limit).cloned().collect()
733        }
734    }
735
736    /// Get total count of keys.
737    pub fn count(&self) -> usize {
738        self.data.read().len()
739    }
740}
741
742impl Default for KvStore {
743    fn default() -> Self {
744        Self::new()
745    }
746}
747
748// =============================================================================
749// Graph Store
750// =============================================================================
751
/// Graph node for visualization.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GraphNode {
    /// Unique node identifier.
    pub id: String,
    /// Display label.
    pub label: String,
    /// Arbitrary JSON property bag.
    pub properties: serde_json::Value,
}
759
/// Graph edge for visualization.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GraphEdge {
    /// Unique edge identifier.
    pub id: String,
    /// Source node id.
    pub source: String,
    /// Target node id.
    pub target: String,
    /// Relationship type/name for this edge.
    pub relationship: String,
}
768
/// In-memory graph store.
///
/// Nodes and edges live in separate id-keyed maps behind synchronous
/// RwLocks. With a `data_dir`, the whole graph is snapshotted to
/// `graph_store.json` (see `flush_to_disk`).
pub struct GraphStore {
    // Node id -> node.
    nodes: SyncRwLock<HashMap<String, GraphNode>>,
    // Edge id -> edge.
    edges: SyncRwLock<HashMap<String, GraphEdge>>,
    // Counter persisted in snapshots; restored on load.
    node_counter: AtomicU64,
    // Counter persisted in snapshots; restored on load.
    edge_counter: AtomicU64,
    // When `Some`, snapshots are written under this directory.
    data_dir: Option<PathBuf>,
}
777
/// Serializable on-disk representation of the entire graph store.
#[derive(serde::Serialize, serde::Deserialize)]
struct GraphSnapshot {
    // Full node map as persisted.
    nodes: HashMap<String, GraphNode>,
    // Full edge map as persisted.
    edges: HashMap<String, GraphEdge>,
    // Counter values captured so a restored store resumes its sequences.
    node_counter: u64,
    edge_counter: u64,
}
785
786impl GraphStore {
787    pub fn new() -> Self {
788        Self::with_data_dir(None)
789    }
790
791    pub fn with_data_dir(data_dir: Option<PathBuf>) -> Self {
792        if let Some(ref dir) = data_dir {
793            let graph_path = dir.join("graph_store.json");
794            if graph_path.exists() {
795                if let Ok(data) = std::fs::read_to_string(&graph_path) {
796                    if let Ok(snapshot) = serde_json::from_str::<GraphSnapshot>(&data) {
797                        tracing::info!(
798                            "Loaded {} graph nodes and {} edges from disk",
799                            snapshot.nodes.len(),
800                            snapshot.edges.len()
801                        );
802                        return Self {
803                            nodes: SyncRwLock::new(snapshot.nodes),
804                            edges: SyncRwLock::new(snapshot.edges),
805                            node_counter: AtomicU64::new(snapshot.node_counter),
806                            edge_counter: AtomicU64::new(snapshot.edge_counter),
807                            data_dir,
808                        };
809                    }
810                }
811            }
812        }
813
814        Self {
815            nodes: SyncRwLock::new(HashMap::new()),
816            edges: SyncRwLock::new(HashMap::new()),
817            node_counter: AtomicU64::new(1),
818            edge_counter: AtomicU64::new(1),
819            data_dir,
820        }
821    }
822
823    /// Flush graph data to disk as JSON.
824    fn flush_to_disk(&self) {
825        if let Some(ref dir) = self.data_dir {
826            let graph_path = dir.join("graph_store.json");
827            let snapshot = GraphSnapshot {
828                nodes: self.nodes.read().clone(),
829                edges: self.edges.read().clone(),
830                node_counter: self.node_counter.load(Ordering::SeqCst),
831                edge_counter: self.edge_counter.load(Ordering::SeqCst),
832            };
833            match serde_json::to_string(&snapshot) {
834                Ok(json) => {
835                    if let Err(e) = std::fs::write(&graph_path, json) {
836                        tracing::error!("Failed to flush graph store to {:?}: {}", graph_path, e);
837                    }
838                }
839                Err(e) => {
840                    tracing::error!("Failed to serialize graph store: {}", e);
841                }
842            }
843        }
844    }
845
846    /// Create a new node.
847    pub fn create_node(&self, label: &str, properties: serde_json::Value) -> GraphNode {
848        let id = format!(
849            "{}:{}",
850            label.to_lowercase(),
851            self.node_counter.fetch_add(1, Ordering::SeqCst)
852        );
853        let node = GraphNode {
854            id: id.clone(),
855            label: label.to_string(),
856            properties,
857        };
858        self.nodes.write().insert(id, node.clone());
859        self.flush_to_disk();
860        node
861    }
862
863    /// Create a new edge between nodes.
864    pub fn create_edge(
865        &self,
866        source: &str,
867        target: &str,
868        relationship: &str,
869    ) -> Result<GraphEdge, String> {
870        let nodes = self.nodes.read();
871        if !nodes.contains_key(source) {
872            return Err(format!("Source node '{}' not found", source));
873        }
874        if !nodes.contains_key(target) {
875            return Err(format!("Target node '{}' not found", target));
876        }
877        drop(nodes);
878
879        let id = format!("e{}", self.edge_counter.fetch_add(1, Ordering::SeqCst));
880        let edge = GraphEdge {
881            id: id.clone(),
882            source: source.to_string(),
883            target: target.to_string(),
884            relationship: relationship.to_string(),
885        };
886        self.edges.write().insert(id, edge.clone());
887        self.flush_to_disk();
888        Ok(edge)
889    }
890
891    /// Get a node by ID.
892    pub fn get_node(&self, id: &str) -> Option<GraphNode> {
893        self.nodes.read().get(id).cloned()
894    }
895
896    /// Delete a node and its edges.
897    /// Lock order: nodes first, then edges (consistent with create_edge).
898    pub fn delete_node(&self, id: &str) -> Result<(), String> {
899        // Acquire both locks in consistent order: nodes → edges
900        let mut nodes = self.nodes.write();
901        let mut edges = self.edges.write();
902
903        if nodes.remove(id).is_none() {
904            return Err(format!("Node '{}' not found", id));
905        }
906        edges.retain(|_, e| e.source != id && e.target != id);
907
908        drop(edges);
909        drop(nodes);
910        self.flush_to_disk();
911        Ok(())
912    }
913
914    /// Delete an edge.
915    pub fn delete_edge(&self, id: &str) -> Result<(), String> {
916        if self.edges.write().remove(id).is_none() {
917            return Err(format!("Edge '{}' not found", id));
918        }
919        self.flush_to_disk();
920        Ok(())
921    }
922
923    /// List all nodes.
924    pub fn list_nodes(&self) -> Vec<GraphNode> {
925        self.nodes.read().values().cloned().collect()
926    }
927
928    /// List all edges.
929    pub fn list_edges(&self) -> Vec<GraphEdge> {
930        self.edges.read().values().cloned().collect()
931    }
932
933    /// Get all nodes and edges.
934    pub fn get_all(&self) -> (Vec<GraphNode>, Vec<GraphEdge>) {
935        (self.list_nodes(), self.list_edges())
936    }
937
938    /// Search nodes by label.
939    pub fn find_by_label(&self, label: &str) -> Vec<GraphNode> {
940        self.nodes
941            .read()
942            .values()
943            .filter(|n| n.label.to_lowercase() == label.to_lowercase())
944            .cloned()
945            .collect()
946    }
947
948    /// Get edges for a node.
949    pub fn get_edges_for_node(&self, node_id: &str) -> Vec<GraphEdge> {
950        self.edges
951            .read()
952            .values()
953            .filter(|e| e.source == node_id || e.target == node_id)
954            .cloned()
955            .collect()
956    }
957}
958
959impl Default for GraphStore {
960    fn default() -> Self {
961        Self::new()
962    }
963}
964
965// =============================================================================
966// Query Engine Wrapper
967// =============================================================================
968
/// Query engine for executing SQL statements.
/// Maintains separate ExecutionContexts per database for multi-tenancy.
/// Now with disk persistence support for crash recovery.
pub struct QueryEngine {
    /// SQL text -> AST parser.
    parser: Parser,
    /// AST -> QueryPlan planner, built over a shared `PlannerSchema`.
    planner: Planner,
    /// Map of database name -> ExecutionContext
    contexts: Arc<std::sync::RwLock<HashMap<String, Arc<std::sync::RwLock<ExecutionContext>>>>>,
    /// Path to persist SQL tables (if set, enables persistence)
    data_path: Option<PathBuf>,
    /// Peer addresses for mutation replication (e.g. ["127.0.0.1:9091", "127.0.0.1:7001"])
    peers: Arc<std::sync::RwLock<Vec<String>>>,
    /// Write-ahead log for crash recovery
    wal: Option<Arc<aegis_storage::wal::WriteAheadLog>>,
    /// Query plan cache: statement Debug string -> planned QueryPlan.
    /// Bounded at 1024 entries and cleared wholesale when full — not
    /// true LRU (see `plan_cached`).
    plan_cache: std::sync::RwLock<HashMap<String, aegis_query::QueryPlan>>,
}
986
987impl QueryEngine {
988    pub fn new() -> Self {
989        let schema = Arc::new(PlannerSchema::new());
990        let mut contexts = HashMap::new();
991        // Create default database
992        contexts.insert(
993            "default".to_string(),
994            Arc::new(std::sync::RwLock::new(ExecutionContext::new())),
995        );
996        Self {
997            parser: Parser::new(),
998            planner: Planner::new(schema),
999            contexts: Arc::new(std::sync::RwLock::new(contexts)),
1000            data_path: None,
1001            peers: Arc::new(std::sync::RwLock::new(Vec::new())),
1002            wal: None,
1003            plan_cache: std::sync::RwLock::new(HashMap::new()),
1004        }
1005    }
1006
    /// Create a QueryEngine with persistence to the specified directory.
    ///
    /// Layout under `data_dir`:
    /// - `databases/<name>.json` — one snapshot per database, loaded at
    ///   startup (the file stem becomes the database name)
    /// - `wal/` — write-ahead log segments, replayed on startup
    ///
    /// Unreadable snapshots are logged and skipped rather than aborting
    /// startup; a "default" database is always present. WAL
    /// initialization failure is non-fatal — the engine then runs with
    /// snapshot-only persistence.
    pub fn with_persistence(data_dir: &std::path::Path) -> Self {
        let schema = Arc::new(PlannerSchema::new());
        let db_dir = data_dir.join("databases");

        // Create databases directory if it doesn't exist
        if let Err(e) = std::fs::create_dir_all(&db_dir) {
            tracing::warn!("Failed to create databases directory: {}", e);
        }

        let mut contexts = HashMap::new();

        // Load all existing databases from the directory
        if let Ok(entries) = std::fs::read_dir(&db_dir) {
            for entry in entries.flatten() {
                let path = entry.path();
                if path.extension().map(|e| e == "json").unwrap_or(false) {
                    if let Some(db_name) = path.file_stem().and_then(|s| s.to_str()) {
                        match ExecutionContext::load_from_file(&path) {
                            Ok(ctx) => {
                                tracing::info!("Loaded database '{}' from {:?}", db_name, path);
                                contexts.insert(
                                    db_name.to_string(),
                                    Arc::new(std::sync::RwLock::new(ctx)),
                                );
                            }
                            Err(e) => {
                                // Corrupt/unreadable snapshot: skip this
                                // database, keep the rest of startup going.
                                tracing::warn!(
                                    "Failed to load database '{}' from {:?}: {}",
                                    db_name,
                                    path,
                                    e
                                );
                            }
                        }
                    }
                }
            }
        }

        // Ensure default database exists
        if !contexts.contains_key("default") {
            contexts.insert(
                "default".to_string(),
                Arc::new(std::sync::RwLock::new(ExecutionContext::new())),
            );
        }

        // Initialize WAL with crash recovery — replay any committed records
        let wal_dir = data_dir.join("wal");
        let wal = match aegis_storage::wal::WriteAheadLog::open_and_recover(wal_dir, true) {
            Ok((w, recovery)) => {
                if recovery.records_processed > 0 {
                    tracing::info!(
                        "WAL recovery: {} records processed, {} segments scanned, {} incomplete transactions",
                        recovery.records_processed,
                        recovery.segments_scanned,
                        recovery.incomplete_transactions.len(),
                    );
                } else {
                    tracing::info!("WAL initialized (clean startup, no recovery needed)");
                }
                Some(Arc::new(w))
            }
            Err(e) => {
                // Non-fatal: snapshot files still provide durability at
                // persist() granularity.
                tracing::warn!("Failed to initialize WAL: {}. Continuing without WAL.", e);
                None
            }
        };

        Self {
            parser: Parser::new(),
            planner: Planner::new(schema),
            contexts: Arc::new(std::sync::RwLock::new(contexts)),
            data_path: Some(db_dir),
            peers: Arc::new(std::sync::RwLock::new(Vec::new())),
            wal,
            plan_cache: std::sync::RwLock::new(HashMap::new()),
        }
    }
1087
1088    /// Set peer addresses for mutation replication.
1089    pub fn set_peers(&self, peers: Vec<String>) {
1090        *self.peers.write().unwrap() = peers;
1091    }
1092
1093    /// Asynchronously replicate a SQL mutation to all peer nodes.
1094    fn replicate_to_peers(&self, sql: &str, database: &str) {
1095        let peers = self.peers.read().unwrap().clone();
1096        if peers.is_empty() {
1097            return;
1098        }
1099
1100        let sql = sql.to_string();
1101        let db = database.to_string();
1102
1103        tokio::spawn(async move {
1104            let client = reqwest::Client::builder()
1105                .timeout(std::time::Duration::from_secs(5))
1106                .build()
1107                .unwrap_or_else(|_| reqwest::Client::new());
1108
1109            for peer in &peers {
1110                let url = format!("http://{}/api/v1/query", peer);
1111                let body = serde_json::json!({
1112                    "sql": sql,
1113                    "database": db,
1114                });
1115
1116                // Retry up to 3 times with exponential backoff
1117                let mut success = false;
1118                for attempt in 0..3u32 {
1119                    if attempt > 0 {
1120                        tokio::time::sleep(std::time::Duration::from_millis(
1121                            100 * 2u64.pow(attempt),
1122                        ))
1123                        .await;
1124                    }
1125                    let resp = client
1126                        .post(&url)
1127                        .header("X-Aegis-Replicated", "true")
1128                        .json(&body)
1129                        .send()
1130                        .await;
1131
1132                    match resp {
1133                        Ok(r) if r.status().is_success() => {
1134                            tracing::debug!(
1135                                "Replicated to {}: {}",
1136                                peer,
1137                                &sql[..sql.len().min(80)]
1138                            );
1139                            success = true;
1140                            break;
1141                        }
1142                        Ok(r) => {
1143                            tracing::warn!(
1144                                "Replication to {} returned {} (attempt {})",
1145                                peer,
1146                                r.status(),
1147                                attempt + 1
1148                            );
1149                        }
1150                        Err(e) => {
1151                            tracing::warn!(
1152                                "Replication to {} failed: {} (attempt {})",
1153                                peer,
1154                                e,
1155                                attempt + 1
1156                            );
1157                        }
1158                    }
1159                }
1160                if !success {
1161                    tracing::error!("Replication to {} failed after 3 attempts", peer);
1162                }
1163            }
1164        });
1165    }
1166
    /// Plan a statement, using cache for SELECT queries.
    ///
    /// The cache key is the statement's `Debug` representation, so SQL
    /// strings that parse to identical ASTs share one entry. Mutations
    /// are never cached. A poisoned cache lock degrades gracefully: it
    /// is treated as a cache miss on read and skipped on write.
    fn plan_cached(&self, stmt: &Statement) -> Result<aegis_query::QueryPlan, QueryError> {
        // Only cache read-only queries
        let cache_key = if !Self::is_mutation_stmt(stmt) {
            Some(format!("{:?}", stmt))
        } else {
            None
        };

        // Check cache
        if let Some(ref key) = cache_key {
            if let Ok(cache) = self.plan_cache.read() {
                if let Some(plan) = cache.get(key) {
                    return Ok(plan.clone());
                }
            }
        }

        // Plan and cache
        let plan = self
            .planner
            .plan(stmt)
            .map_err(|e| QueryError::Plan(e.to_string()))?;

        if let Some(key) = cache_key {
            if let Ok(mut cache) = self.plan_cache.write() {
                // Bounded cache: when full, drop everything. This is NOT
                // true LRU — it trades an occasional cold cache for zero
                // per-entry recency bookkeeping.
                if cache.len() >= 1024 {
                    cache.clear();
                }
                cache.insert(key, plan.clone());
            }
        }

        Ok(plan)
    }
1203
1204    /// Get or create an ExecutionContext for the specified database.
1205    fn get_or_create_context(&self, database: &str) -> Arc<std::sync::RwLock<ExecutionContext>> {
1206        let db_name = if database.is_empty() {
1207            "default"
1208        } else {
1209            database
1210        };
1211
1212        // Try to get existing context
1213        {
1214            let contexts = self.contexts.read().unwrap();
1215            if let Some(ctx) = contexts.get(db_name) {
1216                return ctx.clone();
1217            }
1218        }
1219
1220        // Create new context for this database
1221        let mut contexts = self.contexts.write().unwrap();
1222        // Double-check after acquiring write lock
1223        if let Some(ctx) = contexts.get(db_name) {
1224            return ctx.clone();
1225        }
1226
1227        tracing::info!("Creating new database: {}", db_name);
1228        let ctx = Arc::new(std::sync::RwLock::new(ExecutionContext::new()));
1229        contexts.insert(db_name.to_string(), ctx.clone());
1230        ctx
1231    }
1232
    /// Persist a specific database to disk.
    ///
    /// No-op unless persistence is enabled (`data_path` set). A WAL
    /// checkpoint record is appended *before* writing the JSON snapshot
    /// (write-ahead guarantee); WAL append failure is logged but does
    /// not block the snapshot write.
    fn persist(&self, database: &str) {
        if let Some(ref base_path) = self.data_path {
            let db_name = if database.is_empty() {
                "default"
            } else {
                database
            };
            let path = base_path.join(format!("{}.json", db_name));

            // Write to WAL before persisting snapshot (write-ahead guarantee)
            if let Some(ref wal) = self.wal {
                use aegis_common::TransactionId;
                use aegis_storage::wal::{LogRecord, LogRecordType};

                let lsn = wal.next_lsn();
                // Checkpoint record tagged with tx id 0 (not tied to a
                // user transaction); payload names the database.
                let record = LogRecord {
                    lsn,
                    prev_lsn: None,
                    tx_id: TransactionId(0),
                    record_type: LogRecordType::Checkpoint,
                    page_id: None,
                    data: ::bytes::Bytes::from(format!("persist:{}", db_name)),
                };
                if let Err(e) = wal.append(record) {
                    tracing::warn!("WAL append failed: {}", e);
                }
            }

            let contexts = self.contexts.read().unwrap();
            if let Some(ctx) = contexts.get(db_name) {
                // NOTE(review): a poisoned context lock silently skips the
                // snapshot write — presumably acceptable as best-effort,
                // but confirm this is intended.
                if let Ok(ctx_guard) = ctx.read() {
                    if let Err(e) = ctx_guard.save_to_file(&path) {
                        tracing::error!(
                            "Failed to persist database '{}' to {:?}: {}",
                            db_name,
                            path,
                            e
                        );
                    }
                }
            }
        }
    }
1277
1278    /// Check if a SQL statement is a mutation (DDL/DML that modifies data).
1279    pub fn is_mutation(sql: &str) -> bool {
1280        let sql_upper = sql.trim().to_uppercase();
1281        sql_upper.starts_with("CREATE")
1282            || sql_upper.starts_with("DROP")
1283            || sql_upper.starts_with("ALTER")
1284            || sql_upper.starts_with("INSERT")
1285            || sql_upper.starts_with("UPDATE")
1286            || sql_upper.starts_with("DELETE")
1287            || sql_upper.starts_with("TRUNCATE")
1288    }
1289
1290    /// Check if a parsed statement is a mutation (DDL/DML).
1291    fn is_mutation_stmt(stmt: &Statement) -> bool {
1292        matches!(
1293            stmt,
1294            Statement::Insert(_)
1295                | Statement::Update(_)
1296                | Statement::Delete(_)
1297                | Statement::CreateTable(_)
1298                | Statement::DropTable(_)
1299                | Statement::AlterTable(_)
1300                | Statement::CreateIndex(_)
1301                | Statement::DropIndex(_)
1302        )
1303    }
1304
1305    /// Check if a statement is DDL (schema-changing) — invalidates plan cache.
1306    fn is_ddl(stmt: &Statement) -> bool {
1307        matches!(
1308            stmt,
1309            Statement::CreateTable(_)
1310                | Statement::DropTable(_)
1311                | Statement::AlterTable(_)
1312                | Statement::CreateIndex(_)
1313                | Statement::DropIndex(_)
1314        )
1315    }
1316
1317    /// Invalidate the plan cache (called after DDL changes).
1318    fn invalidate_plan_cache(&self) {
1319        if let Ok(mut cache) = self.plan_cache.write() {
1320            cache.clear();
1321        }
1322    }
1323
    /// Execute a SQL query against the specified database.
    /// Supports multi-statement input with transaction control:
    /// `BEGIN; INSERT ...; INSERT ...; COMMIT;` executes atomically.
    /// On ROLLBACK (explicit or on error inside a transaction), all
    /// changes since BEGIN are undone.
    ///
    /// Returns the result of the LAST statement, or the first error.
    /// Rollback works by snapshotting the full ExecutionContext at
    /// BEGIN and restoring it; `begin/commit/rollback_snapshot` on the
    /// context additionally drive MVCC snapshot isolation. Disk
    /// persistence happens at COMMIT, or after each mutation when no
    /// transaction is open. A BEGIN left open at end of input is rolled
    /// back and reported as an error.
    pub fn execute(&self, sql: &str, database: Option<&str>) -> Result<QueryResult, QueryError> {
        let db_name = database.unwrap_or("default");

        let statements = self
            .parser
            .parse(sql)
            .map_err(|e| QueryError::Parse(e.to_string()))?;

        if statements.is_empty() {
            return Ok(QueryResult {
                columns: vec![],
                rows: vec![],
                rows_affected: 0,
            });
        }

        let context = self.get_or_create_context(db_name);
        let executor = Executor::with_shared_context(context.clone());

        let mut last_result = QueryResult {
            columns: vec![],
            rows: vec![],
            rows_affected: 0,
        };
        // Per-call transaction state: whether a BEGIN is open, the
        // rollback snapshot taken at BEGIN, and whether any mutation ran
        // (so COMMIT knows it must persist).
        let mut in_transaction = false;
        let mut txn_snapshot: Option<ExecutionContextSnapshot> = None;
        let mut had_mutation = false;

        for statement in &statements {
            let plan = self.plan_cached(statement)?;

            match &plan.root {
                PlanNode::BeginTransaction => {
                    if in_transaction {
                        return Err(QueryError::Execute("Already in a transaction".to_string()));
                    }
                    // Snapshot current state for rollback + MVCC snapshot isolation
                    let mut ctx = context
                        .write()
                        .map_err(|_| QueryError::Execute("Lock poisoned".to_string()))?;
                    txn_snapshot = Some(ctx.to_snapshot());
                    ctx.begin_snapshot();
                    drop(ctx);
                    in_transaction = true;
                    last_result = QueryResult {
                        columns: vec!["status".to_string()],
                        rows: vec![vec![serde_json::Value::String("BEGIN".to_string())]],
                        rows_affected: 0,
                    };
                }
                PlanNode::CommitTransaction => {
                    if !in_transaction {
                        return Err(QueryError::Execute(
                            "No transaction in progress".to_string(),
                        ));
                    }
                    // Commit: release MVCC snapshot, advance version, discard rollback snapshot, persist
                    {
                        let mut ctx = context
                            .write()
                            .map_err(|_| QueryError::Execute("Lock poisoned".to_string()))?;
                        ctx.commit_snapshot();
                    }
                    txn_snapshot = None;
                    in_transaction = false;
                    if had_mutation {
                        self.persist(db_name);
                        had_mutation = false;
                    }
                    last_result = QueryResult {
                        columns: vec!["status".to_string()],
                        rows: vec![vec![serde_json::Value::String("COMMIT".to_string())]],
                        rows_affected: 0,
                    };
                }
                PlanNode::RollbackTransaction => {
                    if !in_transaction {
                        return Err(QueryError::Execute(
                            "No transaction in progress".to_string(),
                        ));
                    }
                    // Rollback: restore snapshot and release MVCC snapshot
                    if let Some(snapshot) = txn_snapshot.take() {
                        let mut ctx = context
                            .write()
                            .map_err(|_| QueryError::Execute("Lock poisoned".to_string()))?;
                        ctx.restore_from_snapshot(snapshot);
                        ctx.rollback_snapshot();
                    }
                    in_transaction = false;
                    had_mutation = false;
                    last_result = QueryResult {
                        columns: vec!["status".to_string()],
                        rows: vec![vec![serde_json::Value::String("ROLLBACK".to_string())]],
                        rows_affected: 0,
                    };
                }
                _ => {
                    // Regular statement — execute it
                    let result = match executor.execute(&plan) {
                        Ok(r) => r,
                        Err(e) => {
                            // If inside a transaction, auto-rollback on error
                            if let Some(snapshot) = txn_snapshot.take() {
                                if let Ok(mut ctx) = context.write() {
                                    ctx.restore_from_snapshot(snapshot);
                                    ctx.rollback_snapshot();
                                }
                                tracing::warn!("Transaction rolled back due to error: {}", e);
                            }
                            return Err(QueryError::Execute(e.to_string()));
                        }
                    };

                    let is_mut = Self::is_mutation_stmt(statement);
                    if is_mut {
                        had_mutation = true;
                        // DDL changes the schema, so cached plans are stale.
                        if Self::is_ddl(statement) {
                            self.invalidate_plan_cache();
                        }
                    }

                    last_result = QueryResult {
                        columns: result.columns,
                        rows: result
                            .rows
                            .into_iter()
                            .map(|r| r.values.into_iter().map(value_to_json).collect())
                            .collect(),
                        rows_affected: result.rows_affected,
                    };

                    // Persist immediately if not inside a transaction
                    if is_mut && !in_transaction {
                        self.persist(db_name);
                    }
                }
            }
        }

        // If transaction was started but never committed, auto-rollback
        if in_transaction {
            if let Some(snapshot) = txn_snapshot.take() {
                if let Ok(mut ctx) = context.write() {
                    ctx.restore_from_snapshot(snapshot);
                    ctx.rollback_snapshot();
                }
            }
            return Err(QueryError::Execute(
                "Transaction was not committed (missing COMMIT). Changes rolled back.".to_string(),
            ));
        }

        Ok(last_result)
    }
1484
    /// Execute a parameterized SQL query.
    /// Params are JSON values that replace $1, $2, ... placeholders.
    ///
    /// With an empty `params` slice this delegates to `execute` (full
    /// multi-statement and transaction support). With params, only the
    /// FIRST parsed statement is executed — any additional statements
    /// are silently ignored — and the plan cache is bypassed.
    pub fn execute_with_params(
        &self,
        sql: &str,
        database: Option<&str>,
        params: &[serde_json::Value],
    ) -> Result<QueryResult, QueryError> {
        if params.is_empty() {
            return self.execute(sql, database);
        }

        let db_name = database.unwrap_or("default");

        let statements = self
            .parser
            .parse(sql)
            .map_err(|e| QueryError::Parse(e.to_string()))?;

        if statements.is_empty() {
            return Ok(QueryResult {
                columns: vec![],
                rows: vec![],
                rows_affected: 0,
            });
        }

        // Convert JSON params to aegis Values
        let values: Vec<aegis_common::Value> = params.iter().map(json_param_to_value).collect();

        // Parameterized path handles a single statement only.
        let statement = &statements[0];
        let plan = self
            .planner
            .plan(statement)
            .map_err(|e| QueryError::Plan(e.to_string()))?;

        let context = self.get_or_create_context(db_name);
        let executor = Executor::with_shared_context(context);
        let result = executor
            .execute_with_params(&plan, &values)
            .map_err(|e| QueryError::Execute(e.to_string()))?;

        // Text-based mutation check here (raw SQL, not the parsed AST).
        if Self::is_mutation(sql) {
            self.persist(db_name);
        }

        Ok(QueryResult {
            columns: result.columns,
            rows: result
                .rows
                .into_iter()
                .map(|r| r.values.into_iter().map(value_to_json).collect())
                .collect(),
            rows_affected: result.rows_affected,
        })
    }
1542
1543    pub fn list_tables(&self, database: Option<&str>) -> Vec<String> {
1544        let db_name = database.unwrap_or("default");
1545        let contexts = self.contexts.read().unwrap();
1546        contexts
1547            .get(db_name)
1548            .and_then(|ctx| ctx.read().ok())
1549            .map(|ctx| ctx.list_tables())
1550            .unwrap_or_default()
1551    }
1552
1553    /// List all databases.
1554    pub fn list_databases(&self) -> Vec<String> {
1555        self.contexts
1556            .read()
1557            .map(|contexts| contexts.keys().cloned().collect())
1558            .unwrap_or_default()
1559    }
1560
1561    /// Get table schema information from the specified database.
1562    pub fn get_table_info(&self, name: &str, database: Option<&str>) -> Option<TableInfo> {
1563        let db_name = database.unwrap_or("default");
1564        let contexts = self.contexts.read().ok()?;
1565        let ctx_lock = contexts.get(db_name)?;
1566        let ctx = ctx_lock.read().ok()?;
1567        let schema = ctx.get_table_schema(name)?;
1568        let table_data = ctx.get_table(name)?;
1569        let row_count = table_data.read().ok().map(|t| t.rows.len() as u64);
1570
1571        Some(TableInfo {
1572            name: schema.name.clone(),
1573            columns: schema
1574                .columns
1575                .iter()
1576                .map(|c| ColumnInfo {
1577                    name: c.name.clone(),
1578                    data_type: format!("{:?}", c.data_type),
1579                    nullable: c.nullable,
1580                })
1581                .collect(),
1582            row_count,
1583        })
1584    }
1585
1586    /// Force persist all databases to disk (for graceful shutdown).
1587    pub fn flush(&self) {
1588        if let Some(ref base_path) = self.data_path {
1589            let contexts = self.contexts.read().unwrap();
1590            for (db_name, ctx) in contexts.iter() {
1591                let path = base_path.join(format!("{}.json", db_name));
1592                if let Ok(ctx_guard) = ctx.read() {
1593                    if let Err(e) = ctx_guard.save_to_file(&path) {
1594                        tracing::error!(
1595                            "Failed to persist database '{}' to {:?}: {}",
1596                            db_name,
1597                            path,
1598                            e
1599                        );
1600                    }
1601                }
1602            }
1603        }
1604    }
1605}
1606
/// Table information for API response.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TableInfo {
    /// Table name as recorded in the schema.
    pub name: String,
    /// Per-column metadata, in schema order.
    pub columns: Vec<ColumnInfo>,
    /// Row count, if the table data could be read; `None` when the
    /// table's lock was unavailable.
    pub row_count: Option<u64>,
}
1614
/// Column information for API response.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ColumnInfo {
    /// Column name.
    pub name: String,
    /// Data type rendered via its `Debug` formatting.
    pub data_type: String,
    /// Whether the column accepts NULL values.
    pub nullable: bool,
}
1622
/// `Default` simply delegates to [`QueryEngine::new`], allowing the engine
/// to be constructed via `QueryEngine::default()` or struct-update syntax.
impl Default for QueryEngine {
    fn default() -> Self {
        Self::new()
    }
}
1628
1629// =============================================================================
1630// Query Result
1631// =============================================================================
1632
/// Result of a query execution.
#[derive(Debug, Clone, serde::Serialize)]
pub struct QueryResult {
    /// Names of the columns in the result set.
    pub columns: Vec<String>,
    /// Result rows; each inner vector holds the JSON-encoded values in
    /// column order.
    pub rows: Vec<Vec<serde_json::Value>>,
    /// Number of rows affected by the statement.
    pub rows_affected: u64,
}
1640
/// Convert a JSON parameter value to an aegis Value for query binding.
///
/// Scalars map directly; arrays and objects fall back to their JSON text
/// form (see the catch-all arm below).
fn json_param_to_value(json: &serde_json::Value) -> aegis_common::Value {
    match json {
        serde_json::Value::Null => aegis_common::Value::Null,
        serde_json::Value::Bool(b) => aegis_common::Value::Boolean(*b),
        serde_json::Value::Number(n) => {
            // Prefer an exact integer; otherwise use a float. Numbers
            // representable as neither become Null.
            if let Some(i) = n.as_i64() {
                aegis_common::Value::Integer(i)
            } else if let Some(f) = n.as_f64() {
                aegis_common::Value::Float(f)
            } else {
                aegis_common::Value::Null
            }
        }
        serde_json::Value::String(s) => aegis_common::Value::String(s.clone()),
        // Arrays/objects: bind as their serialized JSON string.
        _ => aegis_common::Value::String(json.to_string()),
    }
}
1660
/// Extract the table name following a keyword in SQL (e.g., "INTO" in INSERT INTO).
///
/// The search is case-insensitive. Returns "unknown" when the keyword is
/// absent or no identifier follows it.
fn extract_table_after(sql: &str, keyword: &str) -> String {
    // ASCII-only uppercasing preserves byte offsets relative to `sql`.
    // The previous `to_uppercase()` could change the string's byte length
    // for non-ASCII input (e.g. 'ı' -> 'I'), making `pos` point at the
    // wrong bytes of `sql` or slice in the middle of a character.
    let upper = sql.to_ascii_uppercase();
    match upper.find(&keyword.to_ascii_uppercase()) {
        Some(pos) => sql[pos + keyword.len()..]
            .trim_start()
            // Identifier ends at whitespace or an opening parenthesis.
            .split(|c: char| c.is_whitespace() || c == '(')
            .find(|token| !token.is_empty())
            .map(str::to_string)
            .unwrap_or_else(|| "unknown".to_string()),
        None => "unknown".to_string(),
    }
}
1675
1676fn value_to_json(value: aegis_common::Value) -> serde_json::Value {
1677    match value {
1678        aegis_common::Value::Null => serde_json::Value::Null,
1679        aegis_common::Value::Boolean(b) => serde_json::Value::Bool(b),
1680        aegis_common::Value::Integer(i) => serde_json::Value::Number(i.into()),
1681        aegis_common::Value::Float(f) => serde_json::Number::from_f64(f)
1682            .map(serde_json::Value::Number)
1683            .unwrap_or(serde_json::Value::Null),
1684        aegis_common::Value::String(s) => serde_json::Value::String(s),
1685        aegis_common::Value::Bytes(b) => serde_json::Value::String(base64_encode(&b)),
1686        aegis_common::Value::Timestamp(t) => serde_json::Value::String(t.to_rfc3339()),
1687        aegis_common::Value::Array(arr) => {
1688            serde_json::Value::Array(arr.into_iter().map(value_to_json).collect())
1689        }
1690        aegis_common::Value::Object(obj) => {
1691            let map: serde_json::Map<String, serde_json::Value> = obj
1692                .into_iter()
1693                .map(|(k, v)| (k, value_to_json(v)))
1694                .collect();
1695            serde_json::Value::Object(map)
1696        }
1697    }
1698}
1699
/// Encode bytes as standard (RFC 4648) base64 with '=' padding.
///
/// Hand-rolled to avoid pulling in a base64 crate for this one use.
fn base64_encode(data: &[u8]) -> String {
    const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Output is exactly 4 chars per (possibly partial) 3-byte chunk,
    // so preallocate instead of growing repeatedly.
    let mut result = String::with_capacity((data.len() + 2) / 3 * 4);

    for chunk in data.chunks(3) {
        let b0 = chunk[0] as usize;
        let b1 = chunk.get(1).copied().unwrap_or(0) as usize;
        let b2 = chunk.get(2).copied().unwrap_or(0) as usize;

        // The first two output characters are always present.
        result.push(CHARS[b0 >> 2] as char);
        result.push(CHARS[((b0 & 0x03) << 4) | (b1 >> 4)] as char);

        // Third/fourth characters become '=' padding when the chunk is short.
        if chunk.len() > 1 {
            result.push(CHARS[((b1 & 0x0f) << 2) | (b2 >> 6)] as char);
        } else {
            result.push('=');
        }

        if chunk.len() > 2 {
            result.push(CHARS[b2 & 0x3f] as char);
        } else {
            result.push('=');
        }
    }

    result
}
1727
1728// =============================================================================
1729// Query Error
1730// =============================================================================
1731
/// Errors during query execution, one variant per pipeline stage.
#[derive(Debug, thiserror::Error)]
pub enum QueryError {
    /// The SQL text could not be parsed.
    #[error("Parse error: {0}")]
    Parse(String),

    /// A parsed statement could not be planned.
    #[error("Planning error: {0}")]
    Plan(String),

    /// The plan failed during execution.
    #[error("Execution error: {0}")]
    Execute(String),
}
1744
1745// =============================================================================
1746// Metrics
1747// =============================================================================
1748
/// Server metrics.
///
/// Plain counters; derived figures (average duration, success rate) are
/// computed on demand by the methods on this type.
#[derive(Debug, Default, Clone, serde::Serialize)]
pub struct Metrics {
    /// Total number of requests observed.
    pub total_requests: u64,
    /// Number of requests that failed.
    pub failed_requests: u64,
    /// Sum of all request durations, in milliseconds.
    pub total_duration_ms: u64,
}
1756
1757impl Metrics {
1758    /// Calculate average request duration.
1759    pub fn avg_duration_ms(&self) -> f64 {
1760        if self.total_requests == 0 {
1761            0.0
1762        } else {
1763            self.total_duration_ms as f64 / self.total_requests as f64
1764        }
1765    }
1766
1767    /// Calculate success rate.
1768    pub fn success_rate(&self) -> f64 {
1769        if self.total_requests == 0 {
1770            1.0
1771        } else {
1772            1.0 - (self.failed_requests as f64 / self.total_requests as f64)
1773        }
1774    }
1775}
1776
1777// =============================================================================
1778// Document Persistence Helpers
1779// =============================================================================
1780
1781/// Convert JSON to Document for loading from disk.
1782fn json_to_document(json: serde_json::Value) -> Document {
1783    let mut doc = if let Some(id) = json.get("_id").and_then(|v| v.as_str()) {
1784        Document::with_id(id)
1785    } else {
1786        Document::new()
1787    };
1788
1789    if let serde_json::Value::Object(map) = json {
1790        for (key, value) in map {
1791            if key != "_id" {
1792                doc.set(&key, json_to_doc_value(value));
1793            }
1794        }
1795    }
1796    doc
1797}
1798
1799/// Convert Document to JSON for saving to disk.
1800fn document_to_json(doc: &Document) -> serde_json::Value {
1801    let mut map = serde_json::Map::new();
1802    map.insert(
1803        "_id".to_string(),
1804        serde_json::Value::String(doc.id.to_string()),
1805    );
1806    for (key, value) in &doc.data {
1807        map.insert(key.clone(), doc_value_to_json(value));
1808    }
1809    serde_json::Value::Object(map)
1810}
1811
1812/// Convert JSON value to aegis_document::Value.
1813fn json_to_doc_value(json: serde_json::Value) -> aegis_document::Value {
1814    match json {
1815        serde_json::Value::Null => aegis_document::Value::Null,
1816        serde_json::Value::Bool(b) => aegis_document::Value::Bool(b),
1817        serde_json::Value::Number(n) => {
1818            if let Some(i) = n.as_i64() {
1819                aegis_document::Value::Int(i)
1820            } else if let Some(f) = n.as_f64() {
1821                aegis_document::Value::Float(f)
1822            } else {
1823                aegis_document::Value::Null
1824            }
1825        }
1826        serde_json::Value::String(s) => aegis_document::Value::String(s),
1827        serde_json::Value::Array(arr) => {
1828            aegis_document::Value::Array(arr.into_iter().map(json_to_doc_value).collect())
1829        }
1830        serde_json::Value::Object(map) => aegis_document::Value::Object(
1831            map.into_iter()
1832                .map(|(k, v)| (k, json_to_doc_value(v)))
1833                .collect(),
1834        ),
1835    }
1836}
1837
1838/// Convert aegis_document::Value to JSON.
1839fn doc_value_to_json(value: &aegis_document::Value) -> serde_json::Value {
1840    match value {
1841        aegis_document::Value::Null => serde_json::Value::Null,
1842        aegis_document::Value::Bool(b) => serde_json::Value::Bool(*b),
1843        aegis_document::Value::Int(i) => serde_json::Value::Number((*i).into()),
1844        aegis_document::Value::Float(f) => serde_json::Number::from_f64(*f)
1845            .map(serde_json::Value::Number)
1846            .unwrap_or(serde_json::Value::Null),
1847        aegis_document::Value::String(s) => serde_json::Value::String(s.clone()),
1848        aegis_document::Value::Array(arr) => {
1849            serde_json::Value::Array(arr.iter().map(doc_value_to_json).collect())
1850        }
1851        aegis_document::Value::Object(obj) => {
1852            let map: serde_json::Map<String, serde_json::Value> = obj
1853                .iter()
1854                .map(|(k, v)| (k.clone(), doc_value_to_json(v)))
1855                .collect();
1856            serde_json::Value::Object(map)
1857        }
1858    }
1859}
1860
1861// =============================================================================
1862// Tests
1863// =============================================================================
1864
#[cfg(test)]
mod tests {
    //! Unit tests for metrics math, value conversion, the KV store, and the
    //! query engine's transaction and parameter-binding behavior.
    use super::*;

    // Derived metrics (average duration, success rate) from raw counters.
    #[test]
    fn test_metrics_calculations() {
        let mut metrics = Metrics::default();
        metrics.total_requests = 100;
        metrics.failed_requests = 10;
        metrics.total_duration_ms = 5000;

        assert_eq!(metrics.avg_duration_ms(), 50.0);
        assert!((metrics.success_rate() - 0.9).abs() < 0.001);
    }

    // String round-trip through the aegis -> JSON conversion.
    #[test]
    fn test_value_to_json() {
        let value = aegis_common::Value::String("test".to_string());
        let json = value_to_json(value);
        assert_eq!(json, serde_json::Value::String("test".to_string()));
    }

    // Basic KV lifecycle: set, get, list, delete.
    #[test]
    fn test_kv_store_operations() {
        let store = KvStore::new();

        // Set
        let entry = store.set("key1".to_string(), serde_json::json!("value1"), None);
        assert_eq!(entry.key, "key1");
        assert_eq!(entry.value, serde_json::json!("value1"));

        // Get
        let retrieved = store.get("key1").expect("key1 should exist after set");
        assert_eq!(retrieved.value, serde_json::json!("value1"));

        // List
        store.set("key2".to_string(), serde_json::json!("value2"), None);
        let all = store.list(None, 100);
        assert_eq!(all.len(), 2);

        // Delete
        let deleted = store.delete("key1");
        assert!(deleted.is_some());
        assert!(store.get("key1").is_none());
    }

    // Committed multi-statement transaction: result reports COMMIT and the
    // inserted rows are visible afterwards.
    #[test]
    fn test_transaction_commit() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE txn_test (id INT, name VARCHAR(50))", None)
            .unwrap();

        // Multi-statement transaction with COMMIT
        let result = engine.execute(
            "BEGIN; INSERT INTO txn_test VALUES (1, 'Alice'); INSERT INTO txn_test VALUES (2, 'Bob'); COMMIT",
            None,
        ).unwrap();
        assert_eq!(
            result.rows[0][0],
            serde_json::Value::String("COMMIT".to_string())
        );

        // Data should be visible after commit
        let select = engine.execute("SELECT * FROM txn_test", None).unwrap();
        assert_eq!(select.rows.len(), 2);
    }

    // Explicit ROLLBACK discards the transaction's inserts.
    #[test]
    fn test_transaction_rollback() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE txn_rb (id INT, name VARCHAR(50))", None)
            .unwrap();
        engine
            .execute("INSERT INTO txn_rb VALUES (1, 'Original')", None)
            .unwrap();

        // Transaction with ROLLBACK — inserts should be undone
        let result = engine.execute(
            "BEGIN; INSERT INTO txn_rb VALUES (2, 'Should vanish'); INSERT INTO txn_rb VALUES (3, 'Also gone'); ROLLBACK",
            None,
        ).unwrap();
        assert_eq!(
            result.rows[0][0],
            serde_json::Value::String("ROLLBACK".to_string())
        );

        // Only the original row should remain
        let select = engine.execute("SELECT * FROM txn_rb", None).unwrap();
        assert_eq!(select.rows.len(), 1);
    }

    // A BEGIN batch lacking COMMIT/ROLLBACK is an error ("not committed")
    // and its effects are rolled back automatically.
    #[test]
    fn test_transaction_auto_rollback_on_missing_commit() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE txn_nocommit (id INT)", None)
            .unwrap();
        engine
            .execute("INSERT INTO txn_nocommit VALUES (1)", None)
            .unwrap();

        // BEGIN without COMMIT should error and auto-rollback
        let result = engine.execute("BEGIN; INSERT INTO txn_nocommit VALUES (2)", None);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not committed"));

        // Original row should still be there, uncommitted row should not
        let select = engine.execute("SELECT * FROM txn_nocommit", None).unwrap();
        assert_eq!(select.rows.len(), 1);
    }

    // A failing statement inside a transaction rolls back the earlier
    // statements of that transaction.
    #[test]
    fn test_transaction_auto_rollback_on_error() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE txn_err (id INT, name VARCHAR(50))", None)
            .unwrap();
        engine
            .execute("INSERT INTO txn_err VALUES (1, 'Keep')", None)
            .unwrap();

        // Transaction with an error mid-way — should auto-rollback
        let result = engine.execute(
            "BEGIN; INSERT INTO txn_err VALUES (2, 'Lose'); INSERT INTO nonexistent VALUES (3, 'Fail'); COMMIT",
            None,
        );
        assert!(result.is_err());

        // Only the pre-transaction row should remain
        let select = engine.execute("SELECT * FROM txn_err", None).unwrap();
        assert_eq!(select.rows.len(), 1);
    }

    // Plain (non-transactional) statements keep auto-committing.
    #[test]
    fn test_single_statement_still_works() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE single (id INT)", None)
            .unwrap();
        engine
            .execute("INSERT INTO single VALUES (1)", None)
            .unwrap();
        engine
            .execute("INSERT INTO single VALUES (2)", None)
            .unwrap();

        let select = engine.execute("SELECT * FROM single", None).unwrap();
        assert_eq!(select.rows.len(), 2);
    }

    // $n placeholders bind positionally for INSERT.
    #[test]
    fn test_parameterized_insert() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE param_test (id INT, name VARCHAR(50))", None)
            .unwrap();

        // Insert with $1, $2 placeholders
        engine
            .execute_with_params(
                "INSERT INTO param_test VALUES ($1, $2)",
                None,
                &[serde_json::json!(1), serde_json::json!("Alice")],
            )
            .unwrap();

        engine
            .execute_with_params(
                "INSERT INTO param_test VALUES ($1, $2)",
                None,
                &[serde_json::json!(2), serde_json::json!("Bob")],
            )
            .unwrap();

        let select = engine.execute("SELECT * FROM param_test", None).unwrap();
        assert_eq!(select.rows.len(), 2);
        assert_eq!(
            select.rows[0][1],
            serde_json::Value::String("Alice".to_string())
        );
        assert_eq!(
            select.rows[1][1],
            serde_json::Value::String("Bob".to_string())
        );
    }

    // $n placeholders bind in a WHERE clause.
    #[test]
    fn test_parameterized_select() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE param_sel (id INT, name VARCHAR(50))", None)
            .unwrap();
        engine
            .execute("INSERT INTO param_sel VALUES (1, 'Alice')", None)
            .unwrap();
        engine
            .execute("INSERT INTO param_sel VALUES (2, 'Bob')", None)
            .unwrap();
        engine
            .execute("INSERT INTO param_sel VALUES (3, 'Charlie')", None)
            .unwrap();

        // SELECT with parameterized WHERE
        let result = engine
            .execute_with_params(
                "SELECT * FROM param_sel WHERE id = $1",
                None,
                &[serde_json::json!(2)],
            )
            .unwrap();
        assert_eq!(result.rows.len(), 1);
        assert_eq!(
            result.rows[0][1],
            serde_json::Value::String("Bob".to_string())
        );
    }

    // $n placeholders bind in both SET and WHERE of an UPDATE.
    #[test]
    fn test_parameterized_update() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE param_upd (id INT, name VARCHAR(50))", None)
            .unwrap();
        engine
            .execute("INSERT INTO param_upd VALUES (1, 'Alice')", None)
            .unwrap();

        engine
            .execute_with_params(
                "UPDATE param_upd SET name = $1 WHERE id = $2",
                None,
                &[serde_json::json!("Alicia"), serde_json::json!(1)],
            )
            .unwrap();

        let result = engine
            .execute("SELECT * FROM param_upd WHERE id = 1", None)
            .unwrap();
        assert_eq!(
            result.rows[0][1],
            serde_json::Value::String("Alicia".to_string())
        );
    }
}