aegis_server/
state.rs

1//! Aegis Server State
2//!
3//! Application state shared across request handlers. Provides access to
4//! database engines, query engine, and configuration.
5//!
6//! @version 0.1.0
7//! @author AutomataNexus Development Team
8
9use crate::activity::ActivityLogger;
10use crate::admin::AdminService;
11use crate::auth::AuthService;
12use crate::config::ServerConfig;
13use aegis_document::DocumentEngine;
14use aegis_query::{Executor, Parser, Planner};
15use aegis_query::planner::PlannerSchema;
16use aegis_query::executor::ExecutionContext;
17use aegis_streaming::StreamingEngine;
18use aegis_timeseries::TimeSeriesEngine;
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::RwLock;
22use parking_lot::RwLock as SyncRwLock;
23
24// =============================================================================
25// Application State
26// =============================================================================
27
28/// Shared application state with real engine integrations.
29#[derive(Clone)]
30pub struct AppState {
31    pub config: Arc<ServerConfig>,
32    pub query_engine: Arc<QueryEngine>,
33    pub document_engine: Arc<DocumentEngine>,
34    pub timeseries_engine: Arc<TimeSeriesEngine>,
35    pub streaming_engine: Arc<StreamingEngine>,
36    pub kv_store: Arc<KvStore>,
37    pub metrics: Arc<RwLock<Metrics>>,
38    pub admin: Arc<AdminService>,
39    pub auth: Arc<AuthService>,
40    pub activity: Arc<ActivityLogger>,
41}
42
43impl AppState {
44    /// Create new application state with the given configuration.
45    pub fn new(config: ServerConfig) -> Self {
46        let activity = Arc::new(ActivityLogger::new());
47
48        // Log server startup
49        activity.log_system("Aegis DB server started");
50
51        let document_engine = Arc::new(DocumentEngine::new());
52
53        // Initialize default collections for AxonML
54        Self::init_default_collections(&document_engine, &activity);
55
56        Self {
57            config: Arc::new(config),
58            query_engine: Arc::new(QueryEngine::new()),
59            document_engine,
60            timeseries_engine: Arc::new(TimeSeriesEngine::new()),
61            streaming_engine: Arc::new(StreamingEngine::new()),
62            kv_store: Arc::new(KvStore::new()),
63            metrics: Arc::new(RwLock::new(Metrics::default())),
64            admin: Arc::new(AdminService::new()),
65            auth: Arc::new(AuthService::new()),
66            activity,
67        }
68    }
69
70    /// Initialize default collections for AxonML and other clients
71    fn init_default_collections(engine: &DocumentEngine, activity: &ActivityLogger) {
72        let default_collections = vec![
73            "axonml_users",
74            "axonml_runs",
75            "axonml_models",
76            "axonml_model_versions",
77            "axonml_endpoints",
78        ];
79
80        for collection in default_collections {
81            if !engine.collection_exists(collection) {
82                if let Err(e) = engine.create_collection(collection) {
83                    tracing::warn!("Failed to create default collection {}: {}", collection, e);
84                } else {
85                    tracing::info!("Created default collection: {}", collection);
86                    activity.log_system(&format!("Created default collection: {}", collection));
87                }
88            } else {
89                tracing::debug!("Collection {} already exists", collection);
90            }
91        }
92    }
93
94    /// Execute a SQL query.
95    pub async fn execute_query(&self, sql: &str) -> Result<QueryResult, QueryError> {
96        self.query_engine.execute(sql)
97    }
98
99    /// Record a request metric.
100    pub async fn record_request(&self, duration_ms: u64, success: bool) {
101        let mut metrics = self.metrics.write().await;
102        metrics.total_requests += 1;
103        metrics.total_duration_ms += duration_ms;
104        if !success {
105            metrics.failed_requests += 1;
106        }
107    }
108}
109
110// =============================================================================
111// Key-Value Store
112// =============================================================================
113
114/// In-memory key-value store with real persistence.
115pub struct KvStore {
116    data: SyncRwLock<HashMap<String, KvEntry>>,
117}
118
119/// Key-value entry with metadata.
120#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
121pub struct KvEntry {
122    pub key: String,
123    pub value: serde_json::Value,
124    pub ttl: Option<u64>,
125    pub created_at: chrono::DateTime<chrono::Utc>,
126    pub updated_at: chrono::DateTime<chrono::Utc>,
127}
128
129impl KvStore {
130    pub fn new() -> Self {
131        Self {
132            data: SyncRwLock::new(HashMap::new()),
133        }
134    }
135
136    /// Set a key-value pair.
137    pub fn set(&self, key: String, value: serde_json::Value, ttl: Option<u64>) -> KvEntry {
138        let now = chrono::Utc::now();
139        let mut data = self.data.write();
140
141        let entry = if let Some(existing) = data.get(&key) {
142            KvEntry {
143                key: key.clone(),
144                value,
145                ttl,
146                created_at: existing.created_at,
147                updated_at: now,
148            }
149        } else {
150            KvEntry {
151                key: key.clone(),
152                value,
153                ttl,
154                created_at: now,
155                updated_at: now,
156            }
157        };
158
159        data.insert(key, entry.clone());
160        entry
161    }
162
163    /// Get a value by key.
164    pub fn get(&self, key: &str) -> Option<KvEntry> {
165        let data = self.data.read();
166        data.get(key).cloned()
167    }
168
169    /// Delete a key.
170    pub fn delete(&self, key: &str) -> Option<KvEntry> {
171        let mut data = self.data.write();
172        data.remove(key)
173    }
174
175    /// List all keys with optional prefix filter.
176    pub fn list(&self, prefix: Option<&str>, limit: usize) -> Vec<KvEntry> {
177        let data = self.data.read();
178        let iter = data.values();
179
180        if let Some(p) = prefix {
181            iter.filter(|e| e.key.starts_with(p))
182                .take(limit)
183                .cloned()
184                .collect()
185        } else {
186            iter.take(limit).cloned().collect()
187        }
188    }
189
190    /// Get total count of keys.
191    pub fn count(&self) -> usize {
192        self.data.read().len()
193    }
194}
195
196impl Default for KvStore {
197    fn default() -> Self {
198        Self::new()
199    }
200}
201
202// =============================================================================
203// Query Engine Wrapper
204// =============================================================================
205
206/// Query engine for executing SQL statements.
207pub struct QueryEngine {
208    parser: Parser,
209    planner: Planner,
210    context: SyncRwLock<ExecutionContext>,
211}
212
213impl QueryEngine {
214    pub fn new() -> Self {
215        let schema = Arc::new(PlannerSchema::new());
216        Self {
217            parser: Parser::new(),
218            planner: Planner::new(schema),
219            context: SyncRwLock::new(ExecutionContext::new()),
220        }
221    }
222
223    /// Execute a SQL query.
224    pub fn execute(&self, sql: &str) -> Result<QueryResult, QueryError> {
225        let statements = self.parser.parse(sql)
226            .map_err(|e| QueryError::Parse(e.to_string()))?;
227
228        if statements.is_empty() {
229            return Ok(QueryResult {
230                columns: vec![],
231                rows: vec![],
232                rows_affected: 0,
233            });
234        }
235
236        let statement = &statements[0];
237        let plan = self.planner.plan(statement)
238            .map_err(|e| QueryError::Plan(e.to_string()))?;
239
240        // Execute with mutable access to the context
241        let mut context = self.context.write();
242        let mut executor = Executor::new(context.clone());
243        let result = executor.execute(&plan)
244            .map_err(|e| QueryError::Execute(e.to_string()))?;
245
246        // Update the context with any changes (important for DDL/DML)
247        *context = executor.into_context();
248
249        Ok(QueryResult {
250            columns: result.columns,
251            rows: result.rows.into_iter().map(|r| {
252                r.values.into_iter().map(value_to_json).collect()
253            }).collect(),
254            rows_affected: result.rows_affected,
255        })
256    }
257}
258
259impl Default for QueryEngine {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
265// =============================================================================
266// Query Result
267// =============================================================================
268
269/// Result of a query execution.
270#[derive(Debug, Clone, serde::Serialize)]
271pub struct QueryResult {
272    pub columns: Vec<String>,
273    pub rows: Vec<Vec<serde_json::Value>>,
274    pub rows_affected: u64,
275}
276
277/// Convert aegis Value to JSON.
278fn value_to_json(value: aegis_common::Value) -> serde_json::Value {
279    match value {
280        aegis_common::Value::Null => serde_json::Value::Null,
281        aegis_common::Value::Boolean(b) => serde_json::Value::Bool(b),
282        aegis_common::Value::Integer(i) => serde_json::Value::Number(i.into()),
283        aegis_common::Value::Float(f) => {
284            serde_json::Number::from_f64(f)
285                .map(serde_json::Value::Number)
286                .unwrap_or(serde_json::Value::Null)
287        }
288        aegis_common::Value::String(s) => serde_json::Value::String(s),
289        aegis_common::Value::Bytes(b) => {
290            serde_json::Value::String(base64_encode(&b))
291        }
292        aegis_common::Value::Timestamp(t) => {
293            serde_json::Value::String(t.to_rfc3339())
294        }
295        aegis_common::Value::Array(arr) => {
296            serde_json::Value::Array(arr.into_iter().map(value_to_json).collect())
297        }
298        aegis_common::Value::Object(obj) => {
299            let map: serde_json::Map<String, serde_json::Value> = obj
300                .into_iter()
301                .map(|(k, v)| (k, value_to_json(v)))
302                .collect();
303            serde_json::Value::Object(map)
304        }
305    }
306}
307
308fn base64_encode(data: &[u8]) -> String {
309    const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
310    let mut result = String::new();
311
312    for chunk in data.chunks(3) {
313        let b0 = chunk[0] as usize;
314        let b1 = chunk.get(1).copied().unwrap_or(0) as usize;
315        let b2 = chunk.get(2).copied().unwrap_or(0) as usize;
316
317        result.push(CHARS[b0 >> 2] as char);
318        result.push(CHARS[((b0 & 0x03) << 4) | (b1 >> 4)] as char);
319
320        if chunk.len() > 1 {
321            result.push(CHARS[((b1 & 0x0f) << 2) | (b2 >> 6)] as char);
322        } else {
323            result.push('=');
324        }
325
326        if chunk.len() > 2 {
327            result.push(CHARS[b2 & 0x3f] as char);
328        } else {
329            result.push('=');
330        }
331    }
332
333    result
334}
335
336// =============================================================================
337// Query Error
338// =============================================================================
339
340/// Errors during query execution.
341#[derive(Debug, thiserror::Error)]
342pub enum QueryError {
343    #[error("Parse error: {0}")]
344    Parse(String),
345
346    #[error("Planning error: {0}")]
347    Plan(String),
348
349    #[error("Execution error: {0}")]
350    Execute(String),
351}
352
353// =============================================================================
354// Metrics
355// =============================================================================
356
357/// Server metrics.
358#[derive(Debug, Default, Clone, serde::Serialize)]
359pub struct Metrics {
360    pub total_requests: u64,
361    pub failed_requests: u64,
362    pub total_duration_ms: u64,
363}
364
365impl Metrics {
366    /// Calculate average request duration.
367    pub fn avg_duration_ms(&self) -> f64 {
368        if self.total_requests == 0 {
369            0.0
370        } else {
371            self.total_duration_ms as f64 / self.total_requests as f64
372        }
373    }
374
375    /// Calculate success rate.
376    pub fn success_rate(&self) -> f64 {
377        if self.total_requests == 0 {
378            1.0
379        } else {
380            1.0 - (self.failed_requests as f64 / self.total_requests as f64)
381        }
382    }
383}
384
385// =============================================================================
386// Tests
387// =============================================================================
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392
393    #[test]
394    fn test_metrics_calculations() {
395        let mut metrics = Metrics::default();
396        metrics.total_requests = 100;
397        metrics.failed_requests = 10;
398        metrics.total_duration_ms = 5000;
399
400        assert_eq!(metrics.avg_duration_ms(), 50.0);
401        assert!((metrics.success_rate() - 0.9).abs() < 0.001);
402    }
403
404    #[test]
405    fn test_value_to_json() {
406        let value = aegis_common::Value::String("test".to_string());
407        let json = value_to_json(value);
408        assert_eq!(json, serde_json::Value::String("test".to_string()));
409    }
410
411    #[test]
412    fn test_kv_store_operations() {
413        let store = KvStore::new();
414
415        // Set
416        let entry = store.set("key1".to_string(), serde_json::json!("value1"), None);
417        assert_eq!(entry.key, "key1");
418        assert_eq!(entry.value, serde_json::json!("value1"));
419
420        // Get
421        let retrieved = store.get("key1").unwrap();
422        assert_eq!(retrieved.value, serde_json::json!("value1"));
423
424        // List
425        store.set("key2".to_string(), serde_json::json!("value2"), None);
426        let all = store.list(None, 100);
427        assert_eq!(all.len(), 2);
428
429        // Delete
430        let deleted = store.delete("key1");
431        assert!(deleted.is_some());
432        assert!(store.get("key1").is_none());
433    }
434}