use crate::activity::ActivityLogger;
use crate::admin::AdminService;
use crate::auth::{AuthService, RbacManager};
use crate::breach::{BreachDetector, WebhookNotifier};
use crate::config::ServerConfig;
use crate::consent::ConsentManager;
use crate::gdpr::GdprService;
use crate::handlers::{MetricsDataPoint, ServerSettings};
use crate::middleware::RateLimiter;
use aegis_document::{Document, DocumentEngine};
use aegis_query::executor::{ExecutionContext, ExecutionContextSnapshot};
use aegis_query::planner::{PlanNode, PlannerSchema};
use aegis_query::{Executor, Parser, Planner, Statement};
use aegis_shield::ShieldEngine;
use aegis_streaming::StreamingEngine;
use aegis_timeseries::TimeSeriesEngine;
use aegis_updates::orchestrator::UpdateOrchestrator;
use aegis_vault::AegisVault;
use chrono::Utc;
use parking_lot::RwLock as SyncRwLock;
use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;

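/// Shared application state handed to every request handler.
/// All engines and services live behind `Arc`, so cloning `AppState` is cheap.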
#[derive(Clone)]
pub struct AppState {
    pub config: Arc<ServerConfig>,
    pub query_engine: Arc<QueryEngine>,
    pub document_engine: Arc<DocumentEngine>,
    pub timeseries_engine: Arc<TimeSeriesEngine>,
    pub streaming_engine: Arc<StreamingEngine>,
    pub kv_store: Arc<KvStore>,
    pub metrics: Arc<RwLock<Metrics>>,
    pub admin: Arc<AdminService>,
    pub auth: Arc<AuthService>,
    pub activity: Arc<ActivityLogger>,
    pub settings: Arc<RwLock<ServerSettings>>,
    pub metrics_history: Arc<RwLock<VecDeque<MetricsDataPoint>>>,
    pub graph_store: Arc<GraphStore>,
    pub rbac: Arc<RbacManager>,
    pub rate_limiter: Arc<RateLimiter>,
    pub login_rate_limiter: Arc<RateLimiter>,
    pub gdpr: Arc<GdprService>,
    pub consent_manager: Arc<ConsentManager>,
    pub breach_detector: Arc<BreachDetector>,
    pub update_orchestrator: Arc<UpdateOrchestrator>,
    pub vault: Arc<AegisVault>,
    pub shield: Arc<ShieldEngine>,
    data_dir: Option<PathBuf>,
}

impl AppState {
    pub fn new(config: ServerConfig) -> Self {
        Self::with_secrets(config, None)
    }

    pub fn with_secrets(
        config: ServerConfig,
        secrets: Option<&dyn crate::secrets::SecretsProvider>,
    ) -> Self {
        let data_dir = config.data_dir.as_ref().map(PathBuf::from);

        let activity = if let Some(ref dir) = data_dir {
            let audit_dir = dir.join("audit_logs");
            match ActivityLogger::with_persistence(audit_dir.clone()) {
                Ok(logger) => {
                    tracing::info!("Audit logging enabled with persistence to {:?}", audit_dir);
                    Arc::new(logger)
                }
                Err(e) => {
                    tracing::error!(
                        "Failed to initialize persistent audit logging: {}. Falling back to in-memory only.",
                        e
                    );
                    Arc::new(ActivityLogger::new())
                }
            }
        } else {
            Arc::new(ActivityLogger::new())
        };

        if let Some(ref dir) = data_dir {
            if let Err(e) = std::fs::create_dir_all(dir) {
                tracing::error!("Failed to create data directory {:?}: {}", dir, e);
            }
        }

        let document_engine = Arc::new(DocumentEngine::new());
        let kv_store = Arc::new(KvStore::with_data_dir(data_dir.clone()));

        if let Some(ref dir) = data_dir {
            let docs_dir = dir.join("documents");
            if docs_dir.exists() {
                if let Ok(entries) = std::fs::read_dir(&docs_dir) {
                    for entry in entries.flatten() {
                        let path = entry.path();
                        if path.extension().is_some_and(|e| e == "json") {
                            if let Some(collection_name) =
                                path.file_stem().and_then(|s| s.to_str())
                            {
                                if let Ok(data) = std::fs::read_to_string(&path) {
                                    if let Ok(docs) =
                                        serde_json::from_str::<Vec<serde_json::Value>>(&data)
                                    {
                                        let _ =
                                            document_engine.create_collection(collection_name);
                                        let mut count = 0;
                                        for doc_json in docs {
                                            let doc = json_to_document(doc_json);
                                            if document_engine
                                                .insert(collection_name, doc)
                                                .is_ok()
                                            {
                                                count += 1;
                                            }
                                        }
                                        tracing::info!(
                                            "Loaded {} documents into collection '{}'",
                                            count,
                                            collection_name
                                        );
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        let node_name_display = config
            .node_name
            .as_ref()
            .map(|n| format!(" ({})", n))
            .unwrap_or_default();
        activity.log_system(&format!(
            "Aegis DB server started - Node: {}{}",
            config.node_id, node_name_display
        ));

        let graph_store = Arc::new(GraphStore::with_data_dir(data_dir.clone()));

        let metrics_history = Arc::new(RwLock::new(VecDeque::new()));

        let metrics_history_clone = metrics_history.clone();
        tokio::spawn(async move {
            Self::collect_metrics_loop(metrics_history_clone).await;
        });

        let admin = Arc::new(AdminService::with_config(
            &config.node_id,
            config.node_name.clone(),
            &config.address(),
            &config.cluster_name,
            config.peers.clone(),
        ));

        let rate_limiter = Arc::new(RateLimiter::new(config.rate_limit_per_minute));
        let login_rate_limiter = Arc::new(RateLimiter::new(config.login_rate_limit_per_minute));

        let query_engine = match &data_dir {
            Some(dir) => Arc::new(QueryEngine::with_persistence(dir)),
            None => Arc::new(QueryEngine::new()),
        };

        if !config.peers.is_empty() {
            query_engine.set_peers(config.peers.clone());
        }

        let breach_detector = Arc::new(BreachDetector::with_data_dir(data_dir.clone()));
        if let Ok(webhook_url) = std::env::var("AEGIS_BREACH_WEBHOOK_URL") {
            if !webhook_url.is_empty() {
                tracing::info!("Breach webhook notification enabled: {}", webhook_url);
                breach_detector.register_notifier(Box::new(WebhookNotifier::new(&webhook_url)));
            }
        }

        let update_orchestrator = {
            let binary_path =
                std::env::current_exe().unwrap_or_else(|_| PathBuf::from("aegis-server"));
            let base_dir = data_dir
                .clone()
                .unwrap_or_else(|| PathBuf::from("/tmp/aegis-updates"));
            Arc::new(UpdateOrchestrator::new(
                binary_path,
                base_dir.join("staging"),
                base_dir.join("backups"),
            ))
        };

        Self {
            config: Arc::new(config),
            query_engine,
            document_engine,
            timeseries_engine: Arc::new({
                let ts_config = aegis_timeseries::engine::EngineConfig {
                    data_path: data_dir.as_ref().map(|d| d.join("timeseries")),
                    ..Default::default()
                };
                TimeSeriesEngine::with_config(ts_config)
            }),
            streaming_engine: Arc::new(StreamingEngine::new()),
            kv_store,
            metrics: Arc::new(RwLock::new(Metrics::default())),
            admin,
            auth: Arc::new(AuthService::with_data_dir_and_secrets(data_dir.clone(), secrets)),
            activity,
            settings: Arc::new(RwLock::new({
                let mut loaded_settings = ServerSettings::default();
                if let Some(ref dir) = data_dir {
                    let settings_path = dir.join("settings.json");
                    if settings_path.exists() {
                        match std::fs::read_to_string(&settings_path) {
                            Ok(contents) => {
                                match serde_json::from_str::<ServerSettings>(&contents) {
                                    Ok(s) => {
                                        tracing::info!("Loaded server settings from disk");
                                        loaded_settings = s;
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            "Failed to parse settings from {}: {}",
                                            settings_path.display(),
                                            e
                                        );
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::error!(
                                    "Failed to read settings from {}: {}",
                                    settings_path.display(),
                                    e
                                );
                            }
                        }
                    }
                }
                loaded_settings
            })),
            metrics_history,
            graph_store,
            rbac: Arc::new(RbacManager::with_data_dir(data_dir.clone())),
            rate_limiter,
            login_rate_limiter,
            gdpr: Arc::new(GdprService::new()),
            consent_manager: Arc::new(ConsentManager::with_data_dir(data_dir.clone())),
            breach_detector,
            update_orchestrator,
            vault: Arc::new(AegisVault::new_auto(
                data_dir.as_ref().map(|d| d.join("vault")),
            )),
            shield: Arc::new(ShieldEngine::new(aegis_shield::ShieldConfig::default())),
            data_dir,
        }
    }

    pub async fn save_settings(&self) {
        let Some(ref dir) = self.data_dir else {
            return;
        };
        let path = dir.join("settings.json");
        let settings = self.settings.read().await;
        match serde_json::to_string_pretty(&*settings) {
            Ok(json) => {
                if let Err(e) = std::fs::write(&path, json) {
                    tracing::error!("Failed to write settings to {}: {}", path.display(), e);
                }
            }
            Err(e) => {
                tracing::error!("Failed to serialize settings: {}", e);
            }
        }
    }

    pub fn flush_collection(&self, collection_name: &str) {
        let Some(ref dir) = self.data_dir else { return };
        let docs_dir = dir.join("documents");
        if let Err(e) = std::fs::create_dir_all(&docs_dir) {
            tracing::error!("Failed to create documents dir: {}", e);
            return;
        }
        let query = aegis_document::Query::new();
        if let Ok(result) = self.document_engine.find(collection_name, &query) {
            let docs: Vec<serde_json::Value> =
                result.documents.iter().map(document_to_json).collect();
            let path = docs_dir.join(format!("{}.json", collection_name));
            match serde_json::to_string_pretty(&docs) {
                Ok(json) => {
                    if let Err(e) = std::fs::write(&path, json) {
                        tracing::error!("Failed to write collection '{}': {}", collection_name, e);
                    }
                }
                Err(e) => tracing::error!(
                    "Failed to serialize collection '{}': {}",
                    collection_name,
                    e
                ),
            }
        }
    }

    pub fn save_to_disk(&self) -> std::io::Result<()> {
        let Some(ref dir) = self.data_dir else {
            return Ok(());
        };

        let kv_path = dir.join("kv_store.json");
        let entries = self.kv_store.list(None, usize::MAX);
        let json = serde_json::to_string(&entries)?;
        std::fs::write(&kv_path, json)?;

        let docs_dir = dir.join("documents");
        std::fs::create_dir_all(&docs_dir)?;
        for collection_name in self.document_engine.list_collections() {
            let query = aegis_document::Query::new();
            if let Ok(result) = self.document_engine.find(&collection_name, &query) {
                let docs: Vec<serde_json::Value> =
                    result.documents.iter().map(document_to_json).collect();
                let json = serde_json::to_string(&docs)?;
                let path = docs_dir.join(format!("{}.json", collection_name));
                std::fs::write(&path, json)?;
            }
        }

        self.query_engine.flush();

        self.timeseries_engine.flush();
        tracing::debug!("Flushed timeseries data to disk");

        if let Err(e) = self.activity.flush() {
            tracing::error!("Failed to flush audit logs: {}", e);
        }
        tracing::debug!("Flushed audit logs to disk");

        Ok(())
    }

    pub async fn execute_query(
        &self,
        sql: &str,
        database: Option<&str>,
    ) -> Result<QueryResult, QueryError> {
        let result = self.query_engine.execute(sql, database)?;
        let db_name = database.unwrap_or("default");
        if QueryEngine::is_mutation(sql) {
            self.query_engine.replicate_to_peers(sql, db_name);
            self.emit_cdc_event(sql, db_name, result.rows_affected);
        }
        Ok(result)
    }

    fn emit_cdc_event(&self, sql: &str, database: &str, rows_affected: u64) {
        use aegis_streaming::cdc::{ChangeEvent, ChangeSource, ChangeType};

        let sql_trimmed = sql.trim();
        if sql_trimmed.len() <= 6 {
            return;
        }
        // Classify the statement from its first few characters. Collecting chars
        // (rather than slicing bytes) avoids panicking on a non-ASCII boundary.
        let upper: String = sql_trimmed
            .chars()
            .take(12)
            .collect::<String>()
            .to_uppercase();
        let (change_type, table_name) = if upper.starts_with("INSERT") {
            (ChangeType::Insert, extract_table_after(sql_trimmed, "INTO"))
        } else if upper.starts_with("UPDATE") {
            (
                ChangeType::Update,
                extract_table_after(sql_trimmed, "UPDATE"),
            )
        } else if upper.starts_with("DELETE") {
            (ChangeType::Delete, extract_table_after(sql_trimmed, "FROM"))
        } else if upper.starts_with("TRUNCATE") {
            (
                ChangeType::Truncate,
                extract_table_after(sql_trimmed, "TRUNCATE"),
            )
        } else {
            return;
        };

        let source = ChangeSource::new(database, &table_name);
        let change = ChangeEvent {
            change_type,
            source,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64,
            key: None,
            before: None,
            after: Some(serde_json::json!({
                "sql": sql,
                "rows_affected": rows_affected,
            })),
            metadata: std::collections::HashMap::new(),
        };

        let channel_name = format!("cdc.{}.{}", database, table_name);
        let channel_id = aegis_streaming::channel::ChannelId::new(&channel_name);

        let _ = self.streaming_engine.create_channel(channel_name.clone());

        if let Err(e) = self.streaming_engine.publish_change(&channel_id, change) {
            tracing::debug!("CDC publish to {}: {}", channel_name, e);
        }
    }

    pub async fn execute_query_replicated(
        &self,
        sql: &str,
        database: Option<&str>,
    ) -> Result<QueryResult, QueryError> {
        self.query_engine.execute(sql, database)
    }

    pub async fn execute_query_with_params(
        &self,
        sql: &str,
        database: Option<&str>,
        params: &[serde_json::Value],
    ) -> Result<QueryResult, QueryError> {
        self.query_engine.execute_with_params(sql, database, params)
    }

    pub async fn record_request(&self, duration_ms: u64, success: bool) {
        let mut metrics = self.metrics.write().await;
        metrics.total_requests += 1;
        metrics.total_duration_ms += duration_ms;
        if !success {
            metrics.failed_requests += 1;
        }
    }

    async fn collect_metrics_loop(metrics_history: Arc<RwLock<VecDeque<MetricsDataPoint>>>) {
        use sysinfo::{Networks, System};

        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
        let mut sys = System::new_all();
        let mut networks = Networks::new_with_refreshed_list();
        let mut last_bytes_in: u64 = 0;
        let mut last_bytes_out: u64 = 0;

        for data in networks.list().values() {
            last_bytes_in += data.total_received();
            last_bytes_out += data.total_transmitted();
        }

        loop {
            interval.tick().await;

            sys.refresh_all();
            networks.refresh();

            let now = Utc::now().timestamp();

            let cpu_percent = sys
                .cpus()
                .iter()
                .map(|cpu| cpu.cpu_usage() as f64)
                .sum::<f64>()
                / sys.cpus().len().max(1) as f64;

            let memory_total = sys.total_memory();
            let memory_used = sys.used_memory();
            let memory_percent = if memory_total > 0 {
                (memory_used as f64 / memory_total as f64) * 100.0
            } else {
                0.0
            };

            let mut current_bytes_in: u64 = 0;
            let mut current_bytes_out: u64 = 0;
            for data in networks.list().values() {
                current_bytes_in += data.total_received();
                current_bytes_out += data.total_transmitted();
            }

            let bytes_in = current_bytes_in.saturating_sub(last_bytes_in);
            let bytes_out = current_bytes_out.saturating_sub(last_bytes_out);
            last_bytes_in = current_bytes_in;
            last_bytes_out = current_bytes_out;

            // Process count stands in for a connection count here.
            let connections = sys.processes().len() as u64;

            let point = MetricsDataPoint {
                timestamp: now,
                cpu_percent,
                memory_percent,
                // Not measured by this loop; reported as zero.
                queries_per_second: 0.0,
                latency_ms: 0.0,
                connections,
                bytes_in,
                bytes_out,
            };

            let mut history = metrics_history.write().await;

            // Cap the history at 43,200 points: 30 days of one-minute samples.
            if history.len() >= 43200 {
                history.pop_front();
            }
            history.push_back(point);
        }
    }

    pub async fn init_metrics_history(&self) {
        let history = self.metrics_history.write().await;
        tracing::info!(
            "Metrics history initialized (currently {} data points)",
            history.len()
        );
    }

    pub fn get_database_stats(&self) -> DatabaseStats {
        let total_keys = self.kv_store.count();

        let collections = self.document_engine.list_collections();
        let total_documents: usize = collections
            .iter()
            .filter_map(|name| self.document_engine.collection_stats(name))
            .map(|stats| stats.document_count)
            .sum();

        let engine_stats = self.document_engine.stats();

        DatabaseStats {
            total_keys,
            total_documents,
            collection_count: collections.len(),
            documents_inserted: engine_stats.documents_inserted,
            documents_updated: engine_stats.documents_updated,
            documents_deleted: engine_stats.documents_deleted,
            queries_executed: engine_stats.queries_executed,
        }
    }
}

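/// Aggregate counters across the key/value and document engines, as returned
/// by `AppState::get_database_stats`.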
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DatabaseStats {
    pub total_keys: usize,
    pub total_documents: usize,
    pub collection_count: usize,
    pub documents_inserted: u64,
    pub documents_updated: u64,
    pub documents_deleted: u64,
    pub queries_executed: u64,
}

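/// In-memory key/value store guarded by a `parking_lot` RwLock. When a data
/// directory is configured, every mutation is flushed to `kv_store.json`.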
pub struct KvStore {
    data: SyncRwLock<HashMap<String, KvEntry>>,
    data_dir: Option<PathBuf>,
}

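/// A single key/value entry with an optional TTL and creation/update timestamps.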
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KvEntry {
    pub key: String,
    pub value: serde_json::Value,
    pub ttl: Option<u64>,
    pub created_at: chrono::DateTime<chrono::Utc>,
    pub updated_at: chrono::DateTime<chrono::Utc>,
}

impl KvStore {
    pub fn new() -> Self {
        Self::with_data_dir(None)
    }

    pub fn with_data_dir(data_dir: Option<PathBuf>) -> Self {
        let mut entries = HashMap::new();

        if let Some(ref dir) = data_dir {
            let kv_path = dir.join("kv_store.json");
            if kv_path.exists() {
                if let Ok(data) = std::fs::read_to_string(&kv_path) {
                    if let Ok(loaded) = serde_json::from_str::<Vec<KvEntry>>(&data) {
                        for entry in loaded {
                            entries.insert(entry.key.clone(), entry);
                        }
                        tracing::info!("Loaded {} KV entries from disk", entries.len());
                    }
                }
            }
        }

        Self {
            data: SyncRwLock::new(entries),
            data_dir,
        }
    }

    fn flush_to_disk(&self) {
        if let Some(ref dir) = self.data_dir {
            let kv_path = dir.join("kv_store.json");
            let data = self.data.read();
            let entries: Vec<&KvEntry> = data.values().collect();
            match serde_json::to_string(&entries) {
                Ok(json) => {
                    if let Err(e) = std::fs::write(&kv_path, json) {
                        tracing::error!("Failed to flush KV store to {:?}: {}", kv_path, e);
                    }
                }
                Err(e) => {
                    tracing::error!("Failed to serialize KV store: {}", e);
                }
            }
        }
    }

    pub fn set(&self, key: String, value: serde_json::Value, ttl: Option<u64>) -> KvEntry {
        let now = chrono::Utc::now();
        let mut data = self.data.write();

        let entry = if let Some(existing) = data.get(&key) {
            KvEntry {
                key: key.clone(),
                value,
                ttl,
                created_at: existing.created_at,
                updated_at: now,
            }
        } else {
            KvEntry {
                key: key.clone(),
                value,
                ttl,
                created_at: now,
                updated_at: now,
            }
        };

        data.insert(key, entry.clone());
        drop(data);
        self.flush_to_disk();
        entry
    }

    pub fn get(&self, key: &str) -> Option<KvEntry> {
        let data = self.data.read();
        data.get(key).cloned()
    }

    pub fn delete(&self, key: &str) -> Option<KvEntry> {
        let mut data = self.data.write();
        let removed = data.remove(key);
        drop(data);
        if removed.is_some() {
            self.flush_to_disk();
        }
        removed
    }

    pub fn list(&self, prefix: Option<&str>, limit: usize) -> Vec<KvEntry> {
        let data = self.data.read();
        let iter = data.values();

        if let Some(p) = prefix {
            iter.filter(|e| e.key.starts_with(p))
                .take(limit)
                .cloned()
                .collect()
        } else {
            iter.take(limit).cloned().collect()
        }
    }

    pub fn count(&self) -> usize {
        self.data.read().len()
    }
}

impl Default for KvStore {
    fn default() -> Self {
        Self::new()
    }
}

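/// A node in the lightweight property graph: a string id, a label, and an
/// arbitrary JSON properties bag.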
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GraphNode {
    pub id: String,
    pub label: String,
    pub properties: serde_json::Value,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GraphEdge {
    pub id: String,
    pub source: String,
    pub target: String,
    pub relationship: String,
}

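/// In-memory graph store with optional JSON snapshot persistence to
/// `graph_store.json` under the configured data directory.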
pub struct GraphStore {
    nodes: SyncRwLock<HashMap<String, GraphNode>>,
    edges: SyncRwLock<HashMap<String, GraphEdge>>,
    node_counter: AtomicU64,
    edge_counter: AtomicU64,
    data_dir: Option<PathBuf>,
}

#[derive(serde::Serialize, serde::Deserialize)]
struct GraphSnapshot {
    nodes: HashMap<String, GraphNode>,
    edges: HashMap<String, GraphEdge>,
    node_counter: u64,
    edge_counter: u64,
}

impl GraphStore {
    pub fn new() -> Self {
        Self::with_data_dir(None)
    }

    pub fn with_data_dir(data_dir: Option<PathBuf>) -> Self {
        if let Some(ref dir) = data_dir {
            let graph_path = dir.join("graph_store.json");
            if graph_path.exists() {
                if let Ok(data) = std::fs::read_to_string(&graph_path) {
                    if let Ok(snapshot) = serde_json::from_str::<GraphSnapshot>(&data) {
                        tracing::info!(
                            "Loaded {} graph nodes and {} edges from disk",
                            snapshot.nodes.len(),
                            snapshot.edges.len()
                        );
                        return Self {
                            nodes: SyncRwLock::new(snapshot.nodes),
                            edges: SyncRwLock::new(snapshot.edges),
                            node_counter: AtomicU64::new(snapshot.node_counter),
                            edge_counter: AtomicU64::new(snapshot.edge_counter),
                            data_dir,
                        };
                    }
                }
            }
        }

        Self {
            nodes: SyncRwLock::new(HashMap::new()),
            edges: SyncRwLock::new(HashMap::new()),
            node_counter: AtomicU64::new(1),
            edge_counter: AtomicU64::new(1),
            data_dir,
        }
    }

    fn flush_to_disk(&self) {
        if let Some(ref dir) = self.data_dir {
            let graph_path = dir.join("graph_store.json");
            let snapshot = GraphSnapshot {
                nodes: self.nodes.read().clone(),
                edges: self.edges.read().clone(),
                node_counter: self.node_counter.load(Ordering::SeqCst),
                edge_counter: self.edge_counter.load(Ordering::SeqCst),
            };
            match serde_json::to_string(&snapshot) {
                Ok(json) => {
                    if let Err(e) = std::fs::write(&graph_path, json) {
                        tracing::error!("Failed to flush graph store to {:?}: {}", graph_path, e);
                    }
                }
                Err(e) => {
                    tracing::error!("Failed to serialize graph store: {}", e);
                }
            }
        }
    }

    pub fn create_node(&self, label: &str, properties: serde_json::Value) -> GraphNode {
        let id = format!(
            "{}:{}",
            label.to_lowercase(),
            self.node_counter.fetch_add(1, Ordering::SeqCst)
        );
        let node = GraphNode {
            id: id.clone(),
            label: label.to_string(),
            properties,
        };
        self.nodes.write().insert(id, node.clone());
        self.flush_to_disk();
        node
    }

    pub fn create_edge(
        &self,
        source: &str,
        target: &str,
        relationship: &str,
    ) -> Result<GraphEdge, String> {
        let nodes = self.nodes.read();
        if !nodes.contains_key(source) {
            return Err(format!("Source node '{}' not found", source));
        }
        if !nodes.contains_key(target) {
            return Err(format!("Target node '{}' not found", target));
        }
        drop(nodes);

        let id = format!("e{}", self.edge_counter.fetch_add(1, Ordering::SeqCst));
        let edge = GraphEdge {
            id: id.clone(),
            source: source.to_string(),
            target: target.to_string(),
            relationship: relationship.to_string(),
        };
        self.edges.write().insert(id, edge.clone());
        self.flush_to_disk();
        Ok(edge)
    }

    pub fn get_node(&self, id: &str) -> Option<GraphNode> {
        self.nodes.read().get(id).cloned()
    }

    pub fn delete_node(&self, id: &str) -> Result<(), String> {
        let mut nodes = self.nodes.write();
        let mut edges = self.edges.write();

        if nodes.remove(id).is_none() {
            return Err(format!("Node '{}' not found", id));
        }
        edges.retain(|_, e| e.source != id && e.target != id);

        drop(edges);
        drop(nodes);
        self.flush_to_disk();
        Ok(())
    }

    pub fn delete_edge(&self, id: &str) -> Result<(), String> {
        if self.edges.write().remove(id).is_none() {
            return Err(format!("Edge '{}' not found", id));
        }
        self.flush_to_disk();
        Ok(())
    }

    pub fn list_nodes(&self) -> Vec<GraphNode> {
        self.nodes.read().values().cloned().collect()
    }

    pub fn list_edges(&self) -> Vec<GraphEdge> {
        self.edges.read().values().cloned().collect()
    }

    pub fn get_all(&self) -> (Vec<GraphNode>, Vec<GraphEdge>) {
        (self.list_nodes(), self.list_edges())
    }

    pub fn find_by_label(&self, label: &str) -> Vec<GraphNode> {
        self.nodes
            .read()
            .values()
            .filter(|n| n.label.to_lowercase() == label.to_lowercase())
            .cloned()
            .collect()
    }

    pub fn get_edges_for_node(&self, node_id: &str) -> Vec<GraphEdge> {
        self.edges
            .read()
            .values()
            .filter(|e| e.source == node_id || e.target == node_id)
            .cloned()
            .collect()
    }
}

impl Default for GraphStore {
    fn default() -> Self {
        Self::new()
    }
}

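/// SQL engine wrapper: parses, plans (with a small plan cache), and executes
/// statements against per-database `ExecutionContext`s, optionally persisting
/// each database to JSON and appending checkpoints to a write-ahead log.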
pub struct QueryEngine {
    parser: Parser,
    planner: Planner,
    contexts: Arc<std::sync::RwLock<HashMap<String, Arc<std::sync::RwLock<ExecutionContext>>>>>,
    data_path: Option<PathBuf>,
    peers: Arc<std::sync::RwLock<Vec<String>>>,
    wal: Option<Arc<aegis_storage::wal::WriteAheadLog>>,
    plan_cache: std::sync::RwLock<HashMap<String, aegis_query::QueryPlan>>,
}

impl QueryEngine {
    pub fn new() -> Self {
        let schema = Arc::new(PlannerSchema::new());
        let mut contexts = HashMap::new();
        contexts.insert(
            "default".to_string(),
            Arc::new(std::sync::RwLock::new(ExecutionContext::new())),
        );
        Self {
            parser: Parser::new(),
            planner: Planner::new(schema),
            contexts: Arc::new(std::sync::RwLock::new(contexts)),
            data_path: None,
            peers: Arc::new(std::sync::RwLock::new(Vec::new())),
            wal: None,
            plan_cache: std::sync::RwLock::new(HashMap::new()),
        }
    }

    pub fn with_persistence(data_dir: &std::path::Path) -> Self {
        let schema = Arc::new(PlannerSchema::new());
        let db_dir = data_dir.join("databases");

        if let Err(e) = std::fs::create_dir_all(&db_dir) {
            tracing::warn!("Failed to create databases directory: {}", e);
        }

        let mut contexts = HashMap::new();

        if let Ok(entries) = std::fs::read_dir(&db_dir) {
            for entry in entries.flatten() {
                let path = entry.path();
                if path.extension().map(|e| e == "json").unwrap_or(false) {
                    if let Some(db_name) = path.file_stem().and_then(|s| s.to_str()) {
                        match ExecutionContext::load_from_file(&path) {
                            Ok(ctx) => {
                                tracing::info!("Loaded database '{}' from {:?}", db_name, path);
                                contexts.insert(
                                    db_name.to_string(),
                                    Arc::new(std::sync::RwLock::new(ctx)),
                                );
                            }
                            Err(e) => {
                                tracing::warn!(
                                    "Failed to load database '{}' from {:?}: {}",
                                    db_name,
                                    path,
                                    e
                                );
                            }
                        }
                    }
                }
            }
        }

        if !contexts.contains_key("default") {
            contexts.insert(
                "default".to_string(),
                Arc::new(std::sync::RwLock::new(ExecutionContext::new())),
            );
        }

        let wal_dir = data_dir.join("wal");
        let wal = match aegis_storage::wal::WriteAheadLog::open_and_recover(wal_dir, true) {
            Ok((w, recovery)) => {
                if recovery.records_processed > 0 {
                    tracing::info!(
                        "WAL recovery: {} records processed, {} segments scanned, {} incomplete transactions",
                        recovery.records_processed,
                        recovery.segments_scanned,
                        recovery.incomplete_transactions.len(),
                    );
                } else {
                    tracing::info!("WAL initialized (clean startup, no recovery needed)");
                }
                Some(Arc::new(w))
            }
            Err(e) => {
                tracing::warn!("Failed to initialize WAL: {}. Continuing without WAL.", e);
                None
            }
        };

        Self {
            parser: Parser::new(),
            planner: Planner::new(schema),
            contexts: Arc::new(std::sync::RwLock::new(contexts)),
            data_path: Some(db_dir),
            peers: Arc::new(std::sync::RwLock::new(Vec::new())),
            wal,
            plan_cache: std::sync::RwLock::new(HashMap::new()),
        }
    }

    pub fn set_peers(&self, peers: Vec<String>) {
        *self.peers.write().unwrap() = peers;
    }

    fn replicate_to_peers(&self, sql: &str, database: &str) {
        let peers = self.peers.read().unwrap().clone();
        if peers.is_empty() {
            return;
        }

        let sql = sql.to_string();
        let db = database.to_string();

        tokio::spawn(async move {
            let client = reqwest::Client::builder()
                .timeout(std::time::Duration::from_secs(5))
                .build()
                .unwrap_or_else(|_| reqwest::Client::new());

            for peer in &peers {
                let url = format!("http://{}/api/v1/query", peer);
                let body = serde_json::json!({
                    "sql": sql,
                    "database": db,
                });

                let mut success = false;
                // Up to three attempts per peer with exponential backoff.
                for attempt in 0..3u32 {
                    if attempt > 0 {
                        tokio::time::sleep(std::time::Duration::from_millis(
                            100 * 2u64.pow(attempt),
                        ))
                        .await;
                    }
                    let resp = client
                        .post(&url)
                        .header("X-Aegis-Replicated", "true")
                        .json(&body)
                        .send()
                        .await;

                    match resp {
                        Ok(r) if r.status().is_success() => {
                            // Log only a char-safe preview of the statement.
                            let preview: String = sql.chars().take(80).collect();
                            tracing::debug!("Replicated to {}: {}", peer, preview);
                            success = true;
                            break;
                        }
                        Ok(r) => {
                            tracing::warn!(
                                "Replication to {} returned {} (attempt {})",
                                peer,
                                r.status(),
                                attempt + 1
                            );
                        }
                        Err(e) => {
                            tracing::warn!(
                                "Replication to {} failed: {} (attempt {})",
                                peer,
                                e,
                                attempt + 1
                            );
                        }
                    }
                }
                if !success {
                    tracing::error!("Replication to {} failed after 3 attempts", peer);
                }
            }
        });
    }

    fn plan_cached(&self, stmt: &Statement) -> Result<aegis_query::QueryPlan, QueryError> {
        // Only read-only statements are cached; mutations always get a fresh plan.
        let cache_key = if !Self::is_mutation_stmt(stmt) {
            Some(format!("{:?}", stmt))
        } else {
            None
        };

        if let Some(ref key) = cache_key {
            if let Ok(cache) = self.plan_cache.read() {
                if let Some(plan) = cache.get(key) {
                    return Ok(plan.clone());
                }
            }
        }

        let plan = self
            .planner
            .plan(stmt)
            .map_err(|e| QueryError::Plan(e.to_string()))?;

        if let Some(key) = cache_key {
            if let Ok(mut cache) = self.plan_cache.write() {
                if cache.len() >= 1024 {
                    cache.clear();
                }
                cache.insert(key, plan.clone());
            }
        }

        Ok(plan)
    }

    fn get_or_create_context(&self, database: &str) -> Arc<std::sync::RwLock<ExecutionContext>> {
        let db_name = if database.is_empty() {
            "default"
        } else {
            database
        };

        {
            let contexts = self.contexts.read().unwrap();
            if let Some(ctx) = contexts.get(db_name) {
                return ctx.clone();
            }
        }

        let mut contexts = self.contexts.write().unwrap();
        // Re-check under the write lock in case another thread created it first.
        if let Some(ctx) = contexts.get(db_name) {
            return ctx.clone();
        }

        tracing::info!("Creating new database: {}", db_name);
        let ctx = Arc::new(std::sync::RwLock::new(ExecutionContext::new()));
        contexts.insert(db_name.to_string(), ctx.clone());
        ctx
    }

    fn persist(&self, database: &str) {
        if let Some(ref base_path) = self.data_path {
            let db_name = if database.is_empty() {
                "default"
            } else {
                database
            };
            let path = base_path.join(format!("{}.json", db_name));

            if let Some(ref wal) = self.wal {
                use aegis_common::TransactionId;
                use aegis_storage::wal::{LogRecord, LogRecordType};

                let lsn = wal.next_lsn();
                let record = LogRecord {
                    lsn,
                    prev_lsn: None,
                    tx_id: TransactionId(0),
                    record_type: LogRecordType::Checkpoint,
                    page_id: None,
                    data: ::bytes::Bytes::from(format!("persist:{}", db_name)),
                };
                if let Err(e) = wal.append(record) {
                    tracing::warn!("WAL append failed: {}", e);
                }
            }

            let contexts = self.contexts.read().unwrap();
            if let Some(ctx) = contexts.get(db_name) {
                if let Ok(ctx_guard) = ctx.read() {
                    if let Err(e) = ctx_guard.save_to_file(&path) {
                        tracing::error!(
                            "Failed to persist database '{}' to {:?}: {}",
                            db_name,
                            path,
                            e
                        );
                    }
                }
            }
        }
    }

    pub fn is_mutation(sql: &str) -> bool {
        let sql_upper = sql.trim().to_uppercase();
        sql_upper.starts_with("CREATE")
            || sql_upper.starts_with("DROP")
            || sql_upper.starts_with("ALTER")
            || sql_upper.starts_with("INSERT")
            || sql_upper.starts_with("UPDATE")
            || sql_upper.starts_with("DELETE")
            || sql_upper.starts_with("TRUNCATE")
    }

    fn is_mutation_stmt(stmt: &Statement) -> bool {
        matches!(
            stmt,
            Statement::Insert(_)
                | Statement::Update(_)
                | Statement::Delete(_)
                | Statement::CreateTable(_)
                | Statement::DropTable(_)
                | Statement::AlterTable(_)
                | Statement::CreateIndex(_)
                | Statement::DropIndex(_)
        )
    }

    fn is_ddl(stmt: &Statement) -> bool {
        matches!(
            stmt,
            Statement::CreateTable(_)
                | Statement::DropTable(_)
                | Statement::AlterTable(_)
                | Statement::CreateIndex(_)
                | Statement::DropIndex(_)
        )
    }

    fn invalidate_plan_cache(&self) {
        if let Ok(mut cache) = self.plan_cache.write() {
            cache.clear();
        }
    }

    pub fn execute(&self, sql: &str, database: Option<&str>) -> Result<QueryResult, QueryError> {
        let db_name = database.unwrap_or("default");

        let statements = self
            .parser
            .parse(sql)
            .map_err(|e| QueryError::Parse(e.to_string()))?;

        if statements.is_empty() {
            return Ok(QueryResult {
                columns: vec![],
                rows: vec![],
                rows_affected: 0,
            });
        }

        let context = self.get_or_create_context(db_name);
        let executor = Executor::with_shared_context(context.clone());

        let mut last_result = QueryResult {
            columns: vec![],
            rows: vec![],
            rows_affected: 0,
        };
        let mut in_transaction = false;
        let mut txn_snapshot: Option<ExecutionContextSnapshot> = None;
        let mut had_mutation = false;

        for statement in &statements {
            let plan = self.plan_cached(statement)?;

            match &plan.root {
                PlanNode::BeginTransaction => {
                    if in_transaction {
                        return Err(QueryError::Execute("Already in a transaction".to_string()));
                    }
                    let mut ctx = context
                        .write()
                        .map_err(|_| QueryError::Execute("Lock poisoned".to_string()))?;
                    txn_snapshot = Some(ctx.to_snapshot());
                    ctx.begin_snapshot();
                    drop(ctx);
                    in_transaction = true;
                    last_result = QueryResult {
                        columns: vec!["status".to_string()],
                        rows: vec![vec![serde_json::Value::String("BEGIN".to_string())]],
                        rows_affected: 0,
                    };
                }
                PlanNode::CommitTransaction => {
                    if !in_transaction {
                        return Err(QueryError::Execute(
                            "No transaction in progress".to_string(),
                        ));
                    }
                    {
                        let mut ctx = context
                            .write()
                            .map_err(|_| QueryError::Execute("Lock poisoned".to_string()))?;
                        ctx.commit_snapshot();
                    }
                    txn_snapshot = None;
                    in_transaction = false;
                    if had_mutation {
                        self.persist(db_name);
                        had_mutation = false;
                    }
                    last_result = QueryResult {
                        columns: vec!["status".to_string()],
                        rows: vec![vec![serde_json::Value::String("COMMIT".to_string())]],
                        rows_affected: 0,
                    };
                }
                PlanNode::RollbackTransaction => {
                    if !in_transaction {
                        return Err(QueryError::Execute(
                            "No transaction in progress".to_string(),
                        ));
                    }
                    if let Some(snapshot) = txn_snapshot.take() {
                        let mut ctx = context
                            .write()
                            .map_err(|_| QueryError::Execute("Lock poisoned".to_string()))?;
                        ctx.restore_from_snapshot(snapshot);
                        ctx.rollback_snapshot();
                    }
                    in_transaction = false;
                    had_mutation = false;
                    last_result = QueryResult {
                        columns: vec!["status".to_string()],
                        rows: vec![vec![serde_json::Value::String("ROLLBACK".to_string())]],
                        rows_affected: 0,
                    };
                }
                _ => {
                    let result = match executor.execute(&plan) {
                        Ok(r) => r,
                        Err(e) => {
                            if let Some(snapshot) = txn_snapshot.take() {
                                if let Ok(mut ctx) = context.write() {
                                    ctx.restore_from_snapshot(snapshot);
                                    ctx.rollback_snapshot();
                                }
                                tracing::warn!("Transaction rolled back due to error: {}", e);
                            }
                            return Err(QueryError::Execute(e.to_string()));
                        }
                    };

                    let is_mut = Self::is_mutation_stmt(statement);
                    if is_mut {
                        had_mutation = true;
                        if Self::is_ddl(statement) {
                            self.invalidate_plan_cache();
                        }
                    }

                    last_result = QueryResult {
                        columns: result.columns,
                        rows: result
                            .rows
                            .into_iter()
                            .map(|r| r.values.into_iter().map(value_to_json).collect())
                            .collect(),
                        rows_affected: result.rows_affected,
                    };

                    if is_mut && !in_transaction {
                        self.persist(db_name);
                    }
                }
            }
        }

        if in_transaction {
            if let Some(snapshot) = txn_snapshot.take() {
                if let Ok(mut ctx) = context.write() {
                    ctx.restore_from_snapshot(snapshot);
                    ctx.rollback_snapshot();
                }
            }
            return Err(QueryError::Execute(
                "Transaction was not committed (missing COMMIT). Changes rolled back.".to_string(),
            ));
        }

        Ok(last_result)
    }

    pub fn execute_with_params(
        &self,
        sql: &str,
        database: Option<&str>,
        params: &[serde_json::Value],
    ) -> Result<QueryResult, QueryError> {
        if params.is_empty() {
            return self.execute(sql, database);
        }

        let db_name = database.unwrap_or("default");

        let statements = self
            .parser
            .parse(sql)
            .map_err(|e| QueryError::Parse(e.to_string()))?;

        if statements.is_empty() {
            return Ok(QueryResult {
                columns: vec![],
                rows: vec![],
                rows_affected: 0,
            });
        }

        let values: Vec<aegis_common::Value> = params.iter().map(json_param_to_value).collect();

        // Parameterized execution handles a single statement only.
        let statement = &statements[0];
        let plan = self
            .planner
            .plan(statement)
            .map_err(|e| QueryError::Plan(e.to_string()))?;

        let context = self.get_or_create_context(db_name);
        let executor = Executor::with_shared_context(context);
        let result = executor
            .execute_with_params(&plan, &values)
            .map_err(|e| QueryError::Execute(e.to_string()))?;

        if Self::is_mutation(sql) {
            self.persist(db_name);
        }

        Ok(QueryResult {
            columns: result.columns,
            rows: result
                .rows
                .into_iter()
                .map(|r| r.values.into_iter().map(value_to_json).collect())
                .collect(),
            rows_affected: result.rows_affected,
        })
    }

    pub fn list_tables(&self, database: Option<&str>) -> Vec<String> {
        let db_name = database.unwrap_or("default");
        let contexts = self.contexts.read().unwrap();
        contexts
            .get(db_name)
            .and_then(|ctx| ctx.read().ok())
            .map(|ctx| ctx.list_tables())
            .unwrap_or_default()
    }

    pub fn list_databases(&self) -> Vec<String> {
        self.contexts
            .read()
            .map(|contexts| contexts.keys().cloned().collect())
            .unwrap_or_default()
    }

    pub fn get_table_info(&self, name: &str, database: Option<&str>) -> Option<TableInfo> {
        let db_name = database.unwrap_or("default");
        let contexts = self.contexts.read().ok()?;
        let ctx_lock = contexts.get(db_name)?;
        let ctx = ctx_lock.read().ok()?;
        let schema = ctx.get_table_schema(name)?;
        let table_data = ctx.get_table(name)?;
        let row_count = table_data.read().ok().map(|t| t.rows.len() as u64);

        Some(TableInfo {
            name: schema.name.clone(),
            columns: schema
                .columns
                .iter()
                .map(|c| ColumnInfo {
                    name: c.name.clone(),
                    data_type: format!("{:?}", c.data_type),
                    nullable: c.nullable,
                })
                .collect(),
            row_count,
        })
    }

    pub fn flush(&self) {
        if let Some(ref base_path) = self.data_path {
            let contexts = self.contexts.read().unwrap();
            for (db_name, ctx) in contexts.iter() {
                let path = base_path.join(format!("{}.json", db_name));
                if let Ok(ctx_guard) = ctx.read() {
                    if let Err(e) = ctx_guard.save_to_file(&path) {
                        tracing::error!(
                            "Failed to persist database '{}' to {:?}: {}",
                            db_name,
                            path,
                            e
                        );
                    }
                }
            }
        }
    }
}

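/// Table schema summary (column names, types, nullability) plus an optional row count.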
#[derive(Debug, Clone, serde::Serialize)]
pub struct TableInfo {
    pub name: String,
    pub columns: Vec<ColumnInfo>,
    pub row_count: Option<u64>,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct ColumnInfo {
    pub name: String,
    pub data_type: String,
    pub nullable: bool,
}

impl Default for QueryEngine {
    fn default() -> Self {
        Self::new()
    }
}

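/// Result of a SQL statement: column names, JSON-encoded rows, and the number
/// of rows affected by a mutation.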
#[derive(Debug, Clone, serde::Serialize)]
pub struct QueryResult {
    pub columns: Vec<String>,
    pub rows: Vec<Vec<serde_json::Value>>,
    pub rows_affected: u64,
}

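/// Converts a JSON query parameter into an `aegis_common::Value`; non-scalar
/// JSON (arrays, objects) is passed through as its string representation.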
fn json_param_to_value(json: &serde_json::Value) -> aegis_common::Value {
    match json {
        serde_json::Value::Null => aegis_common::Value::Null,
        serde_json::Value::Bool(b) => aegis_common::Value::Boolean(*b),
        serde_json::Value::Number(n) => {
            if let Some(i) = n.as_i64() {
                aegis_common::Value::Integer(i)
            } else if let Some(f) = n.as_f64() {
                aegis_common::Value::Float(f)
            } else {
                aegis_common::Value::Null
            }
        }
        serde_json::Value::String(s) => aegis_common::Value::String(s.clone()),
        _ => aegis_common::Value::String(json.to_string()),
    }
}

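/// Best-effort extraction of the table name that follows `keyword` (e.g. the
/// name after INTO, UPDATE, FROM, or TRUNCATE), used for CDC channel naming.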
fn extract_table_after(sql: &str, keyword: &str) -> String {
    // The keyword search runs on an uppercased copy; the byte offsets line up
    // with the original string as long as the SQL text is ASCII.
    let upper = sql.to_uppercase();
    if let Some(pos) = upper.find(keyword) {
        let after = sql[pos + keyword.len()..].trim_start();
        after
            .split(|c: char| c.is_whitespace() || c == '(')
            .next()
            .unwrap_or("unknown")
            .to_string()
    } else {
        "unknown".to_string()
    }
}

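/// Converts an `aegis_common::Value` into JSON; bytes are base64-encoded and
/// timestamps are rendered as RFC 3339 strings.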
fn value_to_json(value: aegis_common::Value) -> serde_json::Value {
    match value {
        aegis_common::Value::Null => serde_json::Value::Null,
        aegis_common::Value::Boolean(b) => serde_json::Value::Bool(b),
        aegis_common::Value::Integer(i) => serde_json::Value::Number(i.into()),
        aegis_common::Value::Float(f) => serde_json::Number::from_f64(f)
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        aegis_common::Value::String(s) => serde_json::Value::String(s),
        aegis_common::Value::Bytes(b) => serde_json::Value::String(base64_encode(&b)),
        aegis_common::Value::Timestamp(t) => serde_json::Value::String(t.to_rfc3339()),
        aegis_common::Value::Array(arr) => {
            serde_json::Value::Array(arr.into_iter().map(value_to_json).collect())
        }
        aegis_common::Value::Object(obj) => {
            let map: serde_json::Map<String, serde_json::Value> = obj
                .into_iter()
                .map(|(k, v)| (k, value_to_json(v)))
                .collect();
            serde_json::Value::Object(map)
        }
    }
}

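/// Minimal dependency-free base64 encoder (standard alphabet, `=` padding)
/// used when serializing byte values to JSON.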
fn base64_encode(data: &[u8]) -> String {
    const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut result = String::new();

    for chunk in data.chunks(3) {
        let b0 = chunk[0] as usize;
        let b1 = chunk.get(1).copied().unwrap_or(0) as usize;
        let b2 = chunk.get(2).copied().unwrap_or(0) as usize;

        result.push(CHARS[b0 >> 2] as char);
        result.push(CHARS[((b0 & 0x03) << 4) | (b1 >> 4)] as char);

        if chunk.len() > 1 {
            result.push(CHARS[((b1 & 0x0f) << 2) | (b2 >> 6)] as char);
        } else {
            result.push('=');
        }

        if chunk.len() > 2 {
            result.push(CHARS[b2 & 0x3f] as char);
        } else {
            result.push('=');
        }
    }

    result
}

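/// Errors surfaced by `QueryEngine`, split by pipeline stage (parse, plan, execute).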
#[derive(Debug, thiserror::Error)]
pub enum QueryError {
    #[error("Parse error: {0}")]
    Parse(String),

    #[error("Planning error: {0}")]
    Plan(String),

    #[error("Execution error: {0}")]
    Execute(String),
}

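/// Rolling request counters used to derive average latency and success rate.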
#[derive(Debug, Default, Clone, serde::Serialize)]
pub struct Metrics {
    pub total_requests: u64,
    pub failed_requests: u64,
    pub total_duration_ms: u64,
}

impl Metrics {
    pub fn avg_duration_ms(&self) -> f64 {
        if self.total_requests == 0 {
            0.0
        } else {
            self.total_duration_ms as f64 / self.total_requests as f64
        }
    }

    pub fn success_rate(&self) -> f64 {
        if self.total_requests == 0 {
            1.0
        } else {
            1.0 - (self.failed_requests as f64 / self.total_requests as f64)
        }
    }
}

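/// Builds a `Document` from a JSON object, honoring an existing `_id` field if present.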
fn json_to_document(json: serde_json::Value) -> Document {
    let mut doc = if let Some(id) = json.get("_id").and_then(|v| v.as_str()) {
        Document::with_id(id)
    } else {
        Document::new()
    };

    if let serde_json::Value::Object(map) = json {
        for (key, value) in map {
            if key != "_id" {
                doc.set(&key, json_to_doc_value(value));
            }
        }
    }
    doc
}

fn document_to_json(doc: &Document) -> serde_json::Value {
    let mut map = serde_json::Map::new();
    map.insert(
        "_id".to_string(),
        serde_json::Value::String(doc.id.to_string()),
    );
    for (key, value) in &doc.data {
        map.insert(key.clone(), doc_value_to_json(value));
    }
    serde_json::Value::Object(map)
}

fn json_to_doc_value(json: serde_json::Value) -> aegis_document::Value {
    match json {
        serde_json::Value::Null => aegis_document::Value::Null,
        serde_json::Value::Bool(b) => aegis_document::Value::Bool(b),
        serde_json::Value::Number(n) => {
            if let Some(i) = n.as_i64() {
                aegis_document::Value::Int(i)
            } else if let Some(f) = n.as_f64() {
                aegis_document::Value::Float(f)
            } else {
                aegis_document::Value::Null
            }
        }
        serde_json::Value::String(s) => aegis_document::Value::String(s),
        serde_json::Value::Array(arr) => {
            aegis_document::Value::Array(arr.into_iter().map(json_to_doc_value).collect())
        }
        serde_json::Value::Object(map) => aegis_document::Value::Object(
            map.into_iter()
                .map(|(k, v)| (k, json_to_doc_value(v)))
                .collect(),
        ),
    }
}

fn doc_value_to_json(value: &aegis_document::Value) -> serde_json::Value {
    match value {
        aegis_document::Value::Null => serde_json::Value::Null,
        aegis_document::Value::Bool(b) => serde_json::Value::Bool(*b),
        aegis_document::Value::Int(i) => serde_json::Value::Number((*i).into()),
        aegis_document::Value::Float(f) => serde_json::Number::from_f64(*f)
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        aegis_document::Value::String(s) => serde_json::Value::String(s.clone()),
        aegis_document::Value::Array(arr) => {
            serde_json::Value::Array(arr.iter().map(doc_value_to_json).collect())
        }
        aegis_document::Value::Object(obj) => {
            let map: serde_json::Map<String, serde_json::Value> = obj
                .iter()
                .map(|(k, v)| (k.clone(), doc_value_to_json(v)))
                .collect();
            serde_json::Value::Object(map)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_calculations() {
        let metrics = Metrics {
            total_requests: 100,
            failed_requests: 10,
            total_duration_ms: 5000,
        };

        assert_eq!(metrics.avg_duration_ms(), 50.0);
        assert!((metrics.success_rate() - 0.9).abs() < 0.001);
    }

    #[test]
    fn test_value_to_json() {
        let value = aegis_common::Value::String("test".to_string());
        let json = value_to_json(value);
        assert_eq!(json, serde_json::Value::String("test".to_string()));
    }

    #[test]
    fn test_kv_store_operations() {
        let store = KvStore::new();

        let entry = store.set("key1".to_string(), serde_json::json!("value1"), None);
        assert_eq!(entry.key, "key1");
        assert_eq!(entry.value, serde_json::json!("value1"));

        let retrieved = store.get("key1").expect("key1 should exist after set");
        assert_eq!(retrieved.value, serde_json::json!("value1"));

        store.set("key2".to_string(), serde_json::json!("value2"), None);
        let all = store.list(None, 100);
        assert_eq!(all.len(), 2);

        let deleted = store.delete("key1");
        assert!(deleted.is_some());
        assert!(store.get("key1").is_none());
    }

    #[test]
    fn test_transaction_commit() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE txn_test (id INT, name VARCHAR(50))", None)
            .unwrap();

        let result = engine.execute(
            "BEGIN; INSERT INTO txn_test VALUES (1, 'Alice'); INSERT INTO txn_test VALUES (2, 'Bob'); COMMIT",
            None,
        ).unwrap();
        assert_eq!(
            result.rows[0][0],
            serde_json::Value::String("COMMIT".to_string())
        );

        let select = engine.execute("SELECT * FROM txn_test", None).unwrap();
        assert_eq!(select.rows.len(), 2);
    }

    #[test]
    fn test_transaction_rollback() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE txn_rb (id INT, name VARCHAR(50))", None)
            .unwrap();
        engine
            .execute("INSERT INTO txn_rb VALUES (1, 'Original')", None)
            .unwrap();

        let result = engine.execute(
            "BEGIN; INSERT INTO txn_rb VALUES (2, 'Should vanish'); INSERT INTO txn_rb VALUES (3, 'Also gone'); ROLLBACK",
            None,
        ).unwrap();
        assert_eq!(
            result.rows[0][0],
            serde_json::Value::String("ROLLBACK".to_string())
        );

        let select = engine.execute("SELECT * FROM txn_rb", None).unwrap();
        assert_eq!(select.rows.len(), 1);
    }

    #[test]
    fn test_transaction_auto_rollback_on_missing_commit() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE txn_nocommit (id INT)", None)
            .unwrap();
        engine
            .execute("INSERT INTO txn_nocommit VALUES (1)", None)
            .unwrap();

        let result = engine.execute("BEGIN; INSERT INTO txn_nocommit VALUES (2)", None);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not committed"));

        let select = engine.execute("SELECT * FROM txn_nocommit", None).unwrap();
        assert_eq!(select.rows.len(), 1);
    }

    #[test]
    fn test_transaction_auto_rollback_on_error() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE txn_err (id INT, name VARCHAR(50))", None)
            .unwrap();
        engine
            .execute("INSERT INTO txn_err VALUES (1, 'Keep')", None)
            .unwrap();

        let result = engine.execute(
            "BEGIN; INSERT INTO txn_err VALUES (2, 'Lose'); INSERT INTO nonexistent VALUES (3, 'Fail'); COMMIT",
            None,
        );
        assert!(result.is_err());

        let select = engine.execute("SELECT * FROM txn_err", None).unwrap();
        assert_eq!(select.rows.len(), 1);
    }

    #[test]
    fn test_single_statement_still_works() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE single (id INT)", None)
            .unwrap();
        engine
            .execute("INSERT INTO single VALUES (1)", None)
            .unwrap();
        engine
            .execute("INSERT INTO single VALUES (2)", None)
            .unwrap();

        let select = engine.execute("SELECT * FROM single", None).unwrap();
        assert_eq!(select.rows.len(), 2);
    }

    #[test]
    fn test_parameterized_insert() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE param_test (id INT, name VARCHAR(50))", None)
            .unwrap();

        engine
            .execute_with_params(
                "INSERT INTO param_test VALUES ($1, $2)",
                None,
                &[serde_json::json!(1), serde_json::json!("Alice")],
            )
            .unwrap();

        engine
            .execute_with_params(
                "INSERT INTO param_test VALUES ($1, $2)",
                None,
                &[serde_json::json!(2), serde_json::json!("Bob")],
            )
            .unwrap();

        let select = engine.execute("SELECT * FROM param_test", None).unwrap();
        assert_eq!(select.rows.len(), 2);
        assert_eq!(
            select.rows[0][1],
            serde_json::Value::String("Alice".to_string())
        );
        assert_eq!(
            select.rows[1][1],
            serde_json::Value::String("Bob".to_string())
        );
    }

    #[test]
    fn test_parameterized_select() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE param_sel (id INT, name VARCHAR(50))", None)
            .unwrap();
        engine
            .execute("INSERT INTO param_sel VALUES (1, 'Alice')", None)
            .unwrap();
        engine
            .execute("INSERT INTO param_sel VALUES (2, 'Bob')", None)
            .unwrap();
        engine
            .execute("INSERT INTO param_sel VALUES (3, 'Charlie')", None)
            .unwrap();

        let result = engine
            .execute_with_params(
                "SELECT * FROM param_sel WHERE id = $1",
                None,
                &[serde_json::json!(2)],
            )
            .unwrap();
        assert_eq!(result.rows.len(), 1);
        assert_eq!(
            result.rows[0][1],
            serde_json::Value::String("Bob".to_string())
        );
    }

    #[test]
    fn test_parameterized_update() {
        let engine = QueryEngine::new();
        engine
            .execute("CREATE TABLE param_upd (id INT, name VARCHAR(50))", None)
            .unwrap();
        engine
            .execute("INSERT INTO param_upd VALUES (1, 'Alice')", None)
            .unwrap();

        engine
            .execute_with_params(
                "UPDATE param_upd SET name = $1 WHERE id = $2",
                None,
                &[serde_json::json!("Alicia"), serde_json::json!(1)],
            )
            .unwrap();

        let result = engine
            .execute("SELECT * FROM param_upd WHERE id = 1", None)
            .unwrap();
        assert_eq!(
            result.rows[0][1],
            serde_json::Value::String("Alicia".to_string())
        );
    }
}