1use crate::activity::ActivityLogger;
10use crate::admin::AdminService;
11use crate::auth::AuthService;
12use crate::config::ServerConfig;
13use aegis_document::DocumentEngine;
14use aegis_query::{Executor, Parser, Planner};
15use aegis_query::planner::PlannerSchema;
16use aegis_query::executor::ExecutionContext;
17use aegis_streaming::StreamingEngine;
18use aegis_timeseries::TimeSeriesEngine;
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::RwLock;
22use parking_lot::RwLock as SyncRwLock;
23
24#[derive(Clone)]
30pub struct AppState {
31 pub config: Arc<ServerConfig>,
32 pub query_engine: Arc<QueryEngine>,
33 pub document_engine: Arc<DocumentEngine>,
34 pub timeseries_engine: Arc<TimeSeriesEngine>,
35 pub streaming_engine: Arc<StreamingEngine>,
36 pub kv_store: Arc<KvStore>,
37 pub metrics: Arc<RwLock<Metrics>>,
38 pub admin: Arc<AdminService>,
39 pub auth: Arc<AuthService>,
40 pub activity: Arc<ActivityLogger>,
41}
42
43impl AppState {
44 pub fn new(config: ServerConfig) -> Self {
46 let activity = Arc::new(ActivityLogger::new());
47
48 activity.log_system("Aegis DB server started");
50
51 let document_engine = Arc::new(DocumentEngine::new());
52
53 Self::init_default_collections(&document_engine, &activity);
55
56 Self {
57 config: Arc::new(config),
58 query_engine: Arc::new(QueryEngine::new()),
59 document_engine,
60 timeseries_engine: Arc::new(TimeSeriesEngine::new()),
61 streaming_engine: Arc::new(StreamingEngine::new()),
62 kv_store: Arc::new(KvStore::new()),
63 metrics: Arc::new(RwLock::new(Metrics::default())),
64 admin: Arc::new(AdminService::new()),
65 auth: Arc::new(AuthService::new()),
66 activity,
67 }
68 }
69
70 fn init_default_collections(engine: &DocumentEngine, activity: &ActivityLogger) {
72 let default_collections = vec![
73 "axonml_users",
74 "axonml_runs",
75 "axonml_models",
76 "axonml_model_versions",
77 "axonml_endpoints",
78 ];
79
80 for collection in default_collections {
81 if !engine.collection_exists(collection) {
82 if let Err(e) = engine.create_collection(collection) {
83 tracing::warn!("Failed to create default collection {}: {}", collection, e);
84 } else {
85 tracing::info!("Created default collection: {}", collection);
86 activity.log_system(&format!("Created default collection: {}", collection));
87 }
88 } else {
89 tracing::debug!("Collection {} already exists", collection);
90 }
91 }
92 }
93
94 pub async fn execute_query(&self, sql: &str) -> Result<QueryResult, QueryError> {
96 self.query_engine.execute(sql)
97 }
98
99 pub async fn record_request(&self, duration_ms: u64, success: bool) {
101 let mut metrics = self.metrics.write().await;
102 metrics.total_requests += 1;
103 metrics.total_duration_ms += duration_ms;
104 if !success {
105 metrics.failed_requests += 1;
106 }
107 }
108}
109
110pub struct KvStore {
116 data: SyncRwLock<HashMap<String, KvEntry>>,
117}
118
119#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
121pub struct KvEntry {
122 pub key: String,
123 pub value: serde_json::Value,
124 pub ttl: Option<u64>,
125 pub created_at: chrono::DateTime<chrono::Utc>,
126 pub updated_at: chrono::DateTime<chrono::Utc>,
127}
128
129impl KvStore {
130 pub fn new() -> Self {
131 Self {
132 data: SyncRwLock::new(HashMap::new()),
133 }
134 }
135
136 pub fn set(&self, key: String, value: serde_json::Value, ttl: Option<u64>) -> KvEntry {
138 let now = chrono::Utc::now();
139 let mut data = self.data.write();
140
141 let entry = if let Some(existing) = data.get(&key) {
142 KvEntry {
143 key: key.clone(),
144 value,
145 ttl,
146 created_at: existing.created_at,
147 updated_at: now,
148 }
149 } else {
150 KvEntry {
151 key: key.clone(),
152 value,
153 ttl,
154 created_at: now,
155 updated_at: now,
156 }
157 };
158
159 data.insert(key, entry.clone());
160 entry
161 }
162
163 pub fn get(&self, key: &str) -> Option<KvEntry> {
165 let data = self.data.read();
166 data.get(key).cloned()
167 }
168
169 pub fn delete(&self, key: &str) -> Option<KvEntry> {
171 let mut data = self.data.write();
172 data.remove(key)
173 }
174
175 pub fn list(&self, prefix: Option<&str>, limit: usize) -> Vec<KvEntry> {
177 let data = self.data.read();
178 let iter = data.values();
179
180 if let Some(p) = prefix {
181 iter.filter(|e| e.key.starts_with(p))
182 .take(limit)
183 .cloned()
184 .collect()
185 } else {
186 iter.take(limit).cloned().collect()
187 }
188 }
189
190 pub fn count(&self) -> usize {
192 self.data.read().len()
193 }
194}
195
196impl Default for KvStore {
197 fn default() -> Self {
198 Self::new()
199 }
200}
201
202pub struct QueryEngine {
208 parser: Parser,
209 planner: Planner,
210 context: SyncRwLock<ExecutionContext>,
211}
212
213impl QueryEngine {
214 pub fn new() -> Self {
215 let schema = Arc::new(PlannerSchema::new());
216 Self {
217 parser: Parser::new(),
218 planner: Planner::new(schema),
219 context: SyncRwLock::new(ExecutionContext::new()),
220 }
221 }
222
223 pub fn execute(&self, sql: &str) -> Result<QueryResult, QueryError> {
225 let statements = self.parser.parse(sql)
226 .map_err(|e| QueryError::Parse(e.to_string()))?;
227
228 if statements.is_empty() {
229 return Ok(QueryResult {
230 columns: vec![],
231 rows: vec![],
232 rows_affected: 0,
233 });
234 }
235
236 let statement = &statements[0];
237 let plan = self.planner.plan(statement)
238 .map_err(|e| QueryError::Plan(e.to_string()))?;
239
240 let mut context = self.context.write();
242 let mut executor = Executor::new(context.clone());
243 let result = executor.execute(&plan)
244 .map_err(|e| QueryError::Execute(e.to_string()))?;
245
246 *context = executor.into_context();
248
249 Ok(QueryResult {
250 columns: result.columns,
251 rows: result.rows.into_iter().map(|r| {
252 r.values.into_iter().map(value_to_json).collect()
253 }).collect(),
254 rows_affected: result.rows_affected,
255 })
256 }
257}
258
259impl Default for QueryEngine {
260 fn default() -> Self {
261 Self::new()
262 }
263}
264
265#[derive(Debug, Clone, serde::Serialize)]
271pub struct QueryResult {
272 pub columns: Vec<String>,
273 pub rows: Vec<Vec<serde_json::Value>>,
274 pub rows_affected: u64,
275}
276
277fn value_to_json(value: aegis_common::Value) -> serde_json::Value {
279 match value {
280 aegis_common::Value::Null => serde_json::Value::Null,
281 aegis_common::Value::Boolean(b) => serde_json::Value::Bool(b),
282 aegis_common::Value::Integer(i) => serde_json::Value::Number(i.into()),
283 aegis_common::Value::Float(f) => {
284 serde_json::Number::from_f64(f)
285 .map(serde_json::Value::Number)
286 .unwrap_or(serde_json::Value::Null)
287 }
288 aegis_common::Value::String(s) => serde_json::Value::String(s),
289 aegis_common::Value::Bytes(b) => {
290 serde_json::Value::String(base64_encode(&b))
291 }
292 aegis_common::Value::Timestamp(t) => {
293 serde_json::Value::String(t.to_rfc3339())
294 }
295 aegis_common::Value::Array(arr) => {
296 serde_json::Value::Array(arr.into_iter().map(value_to_json).collect())
297 }
298 aegis_common::Value::Object(obj) => {
299 let map: serde_json::Map<String, serde_json::Value> = obj
300 .into_iter()
301 .map(|(k, v)| (k, value_to_json(v)))
302 .collect();
303 serde_json::Value::Object(map)
304 }
305 }
306}
307
308fn base64_encode(data: &[u8]) -> String {
309 const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
310 let mut result = String::new();
311
312 for chunk in data.chunks(3) {
313 let b0 = chunk[0] as usize;
314 let b1 = chunk.get(1).copied().unwrap_or(0) as usize;
315 let b2 = chunk.get(2).copied().unwrap_or(0) as usize;
316
317 result.push(CHARS[b0 >> 2] as char);
318 result.push(CHARS[((b0 & 0x03) << 4) | (b1 >> 4)] as char);
319
320 if chunk.len() > 1 {
321 result.push(CHARS[((b1 & 0x0f) << 2) | (b2 >> 6)] as char);
322 } else {
323 result.push('=');
324 }
325
326 if chunk.len() > 2 {
327 result.push(CHARS[b2 & 0x3f] as char);
328 } else {
329 result.push('=');
330 }
331 }
332
333 result
334}
335
336#[derive(Debug, thiserror::Error)]
342pub enum QueryError {
343 #[error("Parse error: {0}")]
344 Parse(String),
345
346 #[error("Planning error: {0}")]
347 Plan(String),
348
349 #[error("Execution error: {0}")]
350 Execute(String),
351}
352
353#[derive(Debug, Default, Clone, serde::Serialize)]
359pub struct Metrics {
360 pub total_requests: u64,
361 pub failed_requests: u64,
362 pub total_duration_ms: u64,
363}
364
365impl Metrics {
366 pub fn avg_duration_ms(&self) -> f64 {
368 if self.total_requests == 0 {
369 0.0
370 } else {
371 self.total_duration_ms as f64 / self.total_requests as f64
372 }
373 }
374
375 pub fn success_rate(&self) -> f64 {
377 if self.total_requests == 0 {
378 1.0
379 } else {
380 1.0 - (self.failed_requests as f64 / self.total_requests as f64)
381 }
382 }
383}
384
385#[cfg(test)]
390mod tests {
391 use super::*;
392
393 #[test]
394 fn test_metrics_calculations() {
395 let mut metrics = Metrics::default();
396 metrics.total_requests = 100;
397 metrics.failed_requests = 10;
398 metrics.total_duration_ms = 5000;
399
400 assert_eq!(metrics.avg_duration_ms(), 50.0);
401 assert!((metrics.success_rate() - 0.9).abs() < 0.001);
402 }
403
404 #[test]
405 fn test_value_to_json() {
406 let value = aegis_common::Value::String("test".to_string());
407 let json = value_to_json(value);
408 assert_eq!(json, serde_json::Value::String("test".to_string()));
409 }
410
411 #[test]
412 fn test_kv_store_operations() {
413 let store = KvStore::new();
414
415 let entry = store.set("key1".to_string(), serde_json::json!("value1"), None);
417 assert_eq!(entry.key, "key1");
418 assert_eq!(entry.value, serde_json::json!("value1"));
419
420 let retrieved = store.get("key1").unwrap();
422 assert_eq!(retrieved.value, serde_json::json!("value1"));
423
424 store.set("key2".to_string(), serde_json::json!("value2"), None);
426 let all = store.list(None, 100);
427 assert_eq!(all.len(), 2);
428
429 let deleted = store.delete("key1");
431 assert!(deleted.is_some());
432 assert!(store.get("key1").is_none());
433 }
434}