1use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::collections::HashMap;
17
/// Errors returned by storage operations in this module.
///
/// Every variant wraps a free-form message. The `Display` implementation
/// renders it as `<prefix>: <message>`, where the prefix depends on the variant.
#[derive(Debug, Clone)]
pub enum StorageError {
    /// Rendered as `connection error: <message>`.
    ConnectionError(String),
    /// Rendered as `query error: <message>`.
    QueryError(String),
    /// Rendered as `serialization error: <message>`.
    SerializationError(String),
    /// Rendered as `not found: <message>`.
    NotFound(String),
}

impl std::fmt::Display for StorageError {
    /// Writes the variant-specific prefix followed by the wrapped message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (prefix, message) = match self {
            Self::ConnectionError(message) => ("connection error", message),
            Self::QueryError(message) => ("query error", message),
            Self::SerializationError(message) => ("serialization error", message),
            Self::NotFound(message) => ("not found", message),
        };
        write!(f, "{prefix}: {message}")
    }
}

// Implementing `Error` lets callers box this type as `dyn std::error::Error`
// and use it with `?` in functions returning boxed errors. The wrapped
// messages are plain strings, so there is no underlying `source()` to expose.
impl std::error::Error for StorageError {}
43
/// Row type for trace storage.
///
/// Passed to `StorageBackend::save_trace` and returned by
/// `StorageBackend::load_traces` / `StorageBackend::get_trace` (the latter
/// looks a row up by a `u64` trace id). Derives serde `Serialize` and
/// `Deserialize`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceRow {
    pub tenant_id: String,
    pub trace_id: u64,
    pub flow_name: String,
    pub status: String,
    pub steps_executed: u32,
    // NOTE(review): unit presumably milliseconds, going by the field name — confirm.
    pub latency_ms: u64,
    pub tokens_input: u64,
    pub tokens_output: u64,
    pub anchor_checks: u32,
    pub anchor_breaches: u32,
    pub errors: u32,
    pub retries: u32,
    pub source_file: String,
    pub backend: String,
    pub client_key: String,
    // NOTE(review): presumably the `trace_id` of the trace this one replays — confirm.
    pub replay_of: Option<u64>,
    pub correlation_id: Option<String>,
    // Arbitrary JSON (`serde_json::Value`); no schema is defined in this file.
    pub events: Value,
    // Arbitrary JSON (`serde_json::Value`); no schema is defined in this file.
    pub annotations: Value,
}
69
/// Row type for session storage.
///
/// Passed to `StorageBackend::save_session` and returned by
/// `StorageBackend::load_sessions`, which takes a scope string.
/// `StorageBackend::delete_session` takes a scope and a key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionRow {
    pub tenant_id: String,
    pub scope: String,
    pub key: String,
    pub value: String,
    pub source_step: String,
}
79
/// Row type for daemon storage.
///
/// Passed to `StorageBackend::save_daemon` and returned by
/// `StorageBackend::load_daemons`. `StorageBackend::delete_daemon` takes a
/// name string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaemonRow {
    pub tenant_id: String,
    pub name: String,
    pub state: String,
    pub source_file: String,
    pub flow_name: String,
    pub event_count: u64,
    pub restart_count: u32,
    pub trigger_topic: Option<String>,
    pub output_topic: Option<String>,
    // Arbitrary JSON (`serde_json::Value`); no schema is defined in this file.
    pub lifecycle_events: Value,
}
94
/// Row type for audit storage.
///
/// Passed to `StorageBackend::append_audit` and returned by
/// `StorageBackend::query_audit`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditRow {
    pub tenant_id: String,
    pub action: String,
    pub actor: String,
    pub target: String,
    // Arbitrary JSON (`serde_json::Value`); no schema is defined in this file.
    pub detail: Value,
}
104
/// Row type for axon-store storage.
///
/// Passed to `StorageBackend::save_axon_store` and returned by
/// `StorageBackend::load_axon_stores`. `StorageBackend::delete_axon_store`
/// takes a name string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AxonStoreRow {
    pub tenant_id: String,
    pub name: String,
    pub ontology: String,
    // Arbitrary JSON (`serde_json::Value`); no schema is defined in this file.
    pub entries: Value,
    // NOTE(review): unit/epoch of this timestamp is not established here — confirm.
    pub created_at: u64,
    pub total_ops: u64,
}
115
/// Row type for dataspace storage.
///
/// Passed to `StorageBackend::save_dataspace` and returned by
/// `StorageBackend::load_dataspaces`. `StorageBackend::delete_dataspace`
/// takes a name string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataspaceRow {
    pub tenant_id: String,
    pub name: String,
    pub ontology: String,
    // Arbitrary JSON (`serde_json::Value`); no schema is defined in this file.
    pub entries: Value,
    // Arbitrary JSON (`serde_json::Value`); no schema is defined in this file.
    pub associations: Value,
    // NOTE(review): unit/epoch of this timestamp is not established here — confirm.
    pub created_at: u64,
    pub total_ops: u64,
    pub next_id: u64,
}
128
/// Row type for hibernation storage.
///
/// Passed to `StorageBackend::save_hibernation` and returned by
/// `StorageBackend::load_hibernations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HibernationRow {
    pub tenant_id: String,
    pub id: String,
    pub name: String,
    pub operation: String,
    pub status: String,
    // Arbitrary JSON (`serde_json::Value`); no schema is defined in this file.
    pub checkpoints: Value,
    // NOTE(review): signed `i32` here while `next_checkpoint_id` is `u32`; if this
    // holds a checkpoint id the two types disagree — confirm the intended range.
    pub resumed_from: Option<i32>,
    // NOTE(review): unit/epoch of the two timestamps is not established here — confirm.
    pub created_at: u64,
    pub last_status_change: u64,
    pub next_checkpoint_id: u32,
}
143
/// Row type for event storage.
///
/// Passed to `StorageBackend::append_event` and returned by
/// `StorageBackend::query_events`, which accepts an optional topic string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventRow {
    pub tenant_id: String,
    pub topic: String,
    pub source: String,
    // Arbitrary JSON (`serde_json::Value`); no schema is defined in this file.
    pub payload: Value,
}
152
/// Row type for cache storage.
///
/// Passed to `StorageBackend::save_cache_entry` and returned by
/// `StorageBackend::load_cache_entries`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheRow {
    pub tenant_id: String,
    pub flow_name: String,
    pub cache_key: String,
    // Arbitrary JSON (`serde_json::Value`); no schema is defined in this file.
    pub result: Value,
    // NOTE(review): signed type permits negative values; presumably seconds with
    // `None` meaning "no expiry" — confirm against the backend that evicts entries.
    pub ttl_secs: Option<i32>,
    pub hit_count: u64,
}
163
/// Row type for cost storage.
///
/// Passed to `StorageBackend::record_cost` and returned by
/// `StorageBackend::query_costs`, which accepts an optional flow string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostRow {
    pub tenant_id: String,
    pub flow_name: String,
    pub backend: String,
    pub input_tokens: u64,
    pub output_tokens: u64,
    // Stored as a binary floating-point value (`f64`).
    pub cost_usd: f64,
}
174
/// Row type for schedule storage.
///
/// Passed to `StorageBackend::save_schedule` and returned by
/// `StorageBackend::load_schedules`. `StorageBackend::delete_schedule` takes
/// a name string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleRow {
    pub tenant_id: String,
    pub name: String,
    pub flow_name: String,
    pub interval_secs: u64,
    pub enabled: bool,
    pub backend: String,
    // NOTE(review): unit/epoch of `last_run` / `next_run` is not established here — confirm.
    pub last_run: u64,
    pub next_run: u64,
    pub run_count: u64,
    pub error_count: u64,
}
189
/// Asynchronous storage interface.
///
/// Implementors must be `Send + Sync`. Every method returns an
/// `impl Future + Send`, so the returned futures can be moved across threads;
/// implementations may write the methods as `async fn` (as the impls in this
/// file do). All methods except `is_healthy` resolve to a
/// `Result<_, StorageError>`.
pub trait StorageBackend: Send + Sync {
    // --- Traces ---
    // `delete_traces` resolves to a `u64`; NOTE(review): presumably the number
    // of rows removed — confirm against a concrete backend.
    fn save_trace(&self, trace: &TraceRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn load_traces(&self, limit: usize, offset: usize) -> impl std::future::Future<Output = Result<Vec<TraceRow>, StorageError>> + Send;
    fn get_trace(&self, trace_id: u64) -> impl std::future::Future<Output = Result<Option<TraceRow>, StorageError>> + Send;
    fn delete_traces(&self, ids: &[u64]) -> impl std::future::Future<Output = Result<u64, StorageError>> + Send;

    // --- Sessions ---
    fn save_session(&self, entry: &SessionRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn load_sessions(&self, scope: &str) -> impl std::future::Future<Output = Result<Vec<SessionRow>, StorageError>> + Send;
    fn delete_session(&self, scope: &str, key: &str) -> impl std::future::Future<Output = Result<bool, StorageError>> + Send;

    // --- Daemons ---
    fn save_daemon(&self, daemon: &DaemonRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn load_daemons(&self) -> impl std::future::Future<Output = Result<Vec<DaemonRow>, StorageError>> + Send;
    fn delete_daemon(&self, name: &str) -> impl std::future::Future<Output = Result<bool, StorageError>> + Send;

    // --- Audit log ---
    fn append_audit(&self, entry: &AuditRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn query_audit(&self, limit: usize) -> impl std::future::Future<Output = Result<Vec<AuditRow>, StorageError>> + Send;

    // --- Axon stores ---
    fn save_axon_store(&self, store: &AxonStoreRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn load_axon_stores(&self) -> impl std::future::Future<Output = Result<Vec<AxonStoreRow>, StorageError>> + Send;
    fn delete_axon_store(&self, name: &str) -> impl std::future::Future<Output = Result<bool, StorageError>> + Send;

    // --- Dataspaces ---
    fn save_dataspace(&self, ds: &DataspaceRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn load_dataspaces(&self) -> impl std::future::Future<Output = Result<Vec<DataspaceRow>, StorageError>> + Send;
    fn delete_dataspace(&self, name: &str) -> impl std::future::Future<Output = Result<bool, StorageError>> + Send;

    // --- Hibernations ---
    fn save_hibernation(&self, session: &HibernationRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn load_hibernations(&self) -> impl std::future::Future<Output = Result<Vec<HibernationRow>, StorageError>> + Send;

    // --- Events ---
    fn append_event(&self, event: &EventRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn query_events(&self, topic: Option<&str>, limit: usize) -> impl std::future::Future<Output = Result<Vec<EventRow>, StorageError>> + Send;

    // --- Cache ---
    // `evict_expired_cache` resolves to a `u64`; NOTE(review): presumably the
    // number of evicted entries — confirm against a concrete backend.
    fn save_cache_entry(&self, entry: &CacheRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn load_cache_entries(&self) -> impl std::future::Future<Output = Result<Vec<CacheRow>, StorageError>> + Send;
    fn evict_expired_cache(&self) -> impl std::future::Future<Output = Result<u64, StorageError>> + Send;

    // --- Costs ---
    fn record_cost(&self, cost: &CostRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn query_costs(&self, flow: Option<&str>, limit: usize) -> impl std::future::Future<Output = Result<Vec<CostRow>, StorageError>> + Send;

    // --- Schedules ---
    fn save_schedule(&self, schedule: &ScheduleRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
    fn load_schedules(&self) -> impl std::future::Future<Output = Result<Vec<ScheduleRow>, StorageError>> + Send;
    fn delete_schedule(&self, name: &str) -> impl std::future::Future<Output = Result<bool, StorageError>> + Send;

    // --- Health ---
    // The only method that resolves to a bare `bool` rather than a `Result`.
    fn is_healthy(&self) -> impl std::future::Future<Output = bool> + Send;
}
252
/// Field-less, stateless backend.
///
/// Being a unit struct it holds no data; `new()` and `Default::default()`
/// produce the same (only) value.
#[derive(Debug, Default)]
pub struct InMemoryBackend;

impl InMemoryBackend {
    /// Creates the backend. Equivalent to `InMemoryBackend::default()`.
    pub fn new() -> Self {
        Self
    }
}
264
265impl StorageBackend for InMemoryBackend {
266 async fn save_trace(&self, _trace: &TraceRow) -> Result<(), StorageError> { Ok(()) }
267 async fn load_traces(&self, _limit: usize, _offset: usize) -> Result<Vec<TraceRow>, StorageError> { Ok(vec![]) }
268 async fn get_trace(&self, _trace_id: u64) -> Result<Option<TraceRow>, StorageError> { Ok(None) }
269 async fn delete_traces(&self, _ids: &[u64]) -> Result<u64, StorageError> { Ok(0) }
270
271 async fn save_session(&self, _entry: &SessionRow) -> Result<(), StorageError> { Ok(()) }
272 async fn load_sessions(&self, _scope: &str) -> Result<Vec<SessionRow>, StorageError> { Ok(vec![]) }
273 async fn delete_session(&self, _scope: &str, _key: &str) -> Result<bool, StorageError> { Ok(false) }
274
275 async fn save_daemon(&self, _daemon: &DaemonRow) -> Result<(), StorageError> { Ok(()) }
276 async fn load_daemons(&self) -> Result<Vec<DaemonRow>, StorageError> { Ok(vec![]) }
277 async fn delete_daemon(&self, _name: &str) -> Result<bool, StorageError> { Ok(false) }
278
279 async fn append_audit(&self, _entry: &AuditRow) -> Result<(), StorageError> { Ok(()) }
280 async fn query_audit(&self, _limit: usize) -> Result<Vec<AuditRow>, StorageError> { Ok(vec![]) }
281
282 async fn save_axon_store(&self, _store: &AxonStoreRow) -> Result<(), StorageError> { Ok(()) }
283 async fn load_axon_stores(&self) -> Result<Vec<AxonStoreRow>, StorageError> { Ok(vec![]) }
284 async fn delete_axon_store(&self, _name: &str) -> Result<bool, StorageError> { Ok(false) }
285
286 async fn save_dataspace(&self, _ds: &DataspaceRow) -> Result<(), StorageError> { Ok(()) }
287 async fn load_dataspaces(&self) -> Result<Vec<DataspaceRow>, StorageError> { Ok(vec![]) }
288 async fn delete_dataspace(&self, _name: &str) -> Result<bool, StorageError> { Ok(false) }
289
290 async fn save_hibernation(&self, _session: &HibernationRow) -> Result<(), StorageError> { Ok(()) }
291 async fn load_hibernations(&self) -> Result<Vec<HibernationRow>, StorageError> { Ok(vec![]) }
292
293 async fn append_event(&self, _event: &EventRow) -> Result<(), StorageError> { Ok(()) }
294 async fn query_events(&self, _topic: Option<&str>, _limit: usize) -> Result<Vec<EventRow>, StorageError> { Ok(vec![]) }
295
296 async fn save_cache_entry(&self, _entry: &CacheRow) -> Result<(), StorageError> { Ok(()) }
297 async fn load_cache_entries(&self) -> Result<Vec<CacheRow>, StorageError> { Ok(vec![]) }
298 async fn evict_expired_cache(&self) -> Result<u64, StorageError> { Ok(0) }
299
300 async fn record_cost(&self, _cost: &CostRow) -> Result<(), StorageError> { Ok(()) }
301 async fn query_costs(&self, _flow: Option<&str>, _limit: usize) -> Result<Vec<CostRow>, StorageError> { Ok(vec![]) }
302
303 async fn save_schedule(&self, _schedule: &ScheduleRow) -> Result<(), StorageError> { Ok(()) }
304 async fn load_schedules(&self) -> Result<Vec<ScheduleRow>, StorageError> { Ok(vec![]) }
305 async fn delete_schedule(&self, _name: &str) -> Result<bool, StorageError> { Ok(false) }
306
307 async fn is_healthy(&self) -> bool { true }
308}
309
/// Closed set of storage backends selectable at runtime.
///
/// Each variant owns one concrete backend; the `StorageBackend` impl for this
/// enum forwards every call to whichever backend the value holds.
pub enum StorageDispatcher {
    /// Wraps the [`InMemoryBackend`] defined in this file.
    InMemory(InMemoryBackend),
    /// Wraps the `PostgresBackend` from the crate's `storage_postgres` module.
    Postgres(crate::storage_postgres::PostgresBackend),
}
319
320impl StorageDispatcher {
321 pub fn in_memory() -> Self {
322 StorageDispatcher::InMemory(InMemoryBackend::new())
323 }
324
325 pub fn postgres(pool: sqlx::PgPool) -> Self {
326 StorageDispatcher::Postgres(crate::storage_postgres::PostgresBackend::new(pool))
327 }
328}
329
// Forwards one method call to the backend held by a `StorageDispatcher`.
//
// Usage: `dispatch!(self, method_name, arg1, arg2, ...)`.
// Expands to a `match` over both variants that invokes `method_name` with the
// given arguments on the wrapped backend and awaits the returned future, so
// it may only be used inside an async context.
macro_rules! dispatch {
    ($target:expr, $op:ident $(, $param:expr)*) => {
        match $target {
            StorageDispatcher::InMemory(backend) => backend.$op($($param),*).await,
            StorageDispatcher::Postgres(backend) => backend.$op($($param),*).await,
        }
    };
}
339
340impl StorageBackend for StorageDispatcher {
341 async fn save_trace(&self, trace: &TraceRow) -> Result<(), StorageError> { dispatch!(self, save_trace, trace) }
342 async fn load_traces(&self, limit: usize, offset: usize) -> Result<Vec<TraceRow>, StorageError> { dispatch!(self, load_traces, limit, offset) }
343 async fn get_trace(&self, trace_id: u64) -> Result<Option<TraceRow>, StorageError> { dispatch!(self, get_trace, trace_id) }
344 async fn delete_traces(&self, ids: &[u64]) -> Result<u64, StorageError> { dispatch!(self, delete_traces, ids) }
345
346 async fn save_session(&self, entry: &SessionRow) -> Result<(), StorageError> { dispatch!(self, save_session, entry) }
347 async fn load_sessions(&self, scope: &str) -> Result<Vec<SessionRow>, StorageError> { dispatch!(self, load_sessions, scope) }
348 async fn delete_session(&self, scope: &str, key: &str) -> Result<bool, StorageError> { dispatch!(self, delete_session, scope, key) }
349
350 async fn save_daemon(&self, daemon: &DaemonRow) -> Result<(), StorageError> { dispatch!(self, save_daemon, daemon) }
351 async fn load_daemons(&self) -> Result<Vec<DaemonRow>, StorageError> { dispatch!(self, load_daemons) }
352 async fn delete_daemon(&self, name: &str) -> Result<bool, StorageError> { dispatch!(self, delete_daemon, name) }
353
354 async fn append_audit(&self, entry: &AuditRow) -> Result<(), StorageError> { dispatch!(self, append_audit, entry) }
355 async fn query_audit(&self, limit: usize) -> Result<Vec<AuditRow>, StorageError> { dispatch!(self, query_audit, limit) }
356
357 async fn save_axon_store(&self, store: &AxonStoreRow) -> Result<(), StorageError> { dispatch!(self, save_axon_store, store) }
358 async fn load_axon_stores(&self) -> Result<Vec<AxonStoreRow>, StorageError> { dispatch!(self, load_axon_stores) }
359 async fn delete_axon_store(&self, name: &str) -> Result<bool, StorageError> { dispatch!(self, delete_axon_store, name) }
360
361 async fn save_dataspace(&self, ds: &DataspaceRow) -> Result<(), StorageError> { dispatch!(self, save_dataspace, ds) }
362 async fn load_dataspaces(&self) -> Result<Vec<DataspaceRow>, StorageError> { dispatch!(self, load_dataspaces) }
363 async fn delete_dataspace(&self, name: &str) -> Result<bool, StorageError> { dispatch!(self, delete_dataspace, name) }
364
365 async fn save_hibernation(&self, session: &HibernationRow) -> Result<(), StorageError> { dispatch!(self, save_hibernation, session) }
366 async fn load_hibernations(&self) -> Result<Vec<HibernationRow>, StorageError> { dispatch!(self, load_hibernations) }
367
368 async fn append_event(&self, event: &EventRow) -> Result<(), StorageError> { dispatch!(self, append_event, event) }
369 async fn query_events(&self, topic: Option<&str>, limit: usize) -> Result<Vec<EventRow>, StorageError> { dispatch!(self, query_events, topic, limit) }
370
371 async fn save_cache_entry(&self, entry: &CacheRow) -> Result<(), StorageError> { dispatch!(self, save_cache_entry, entry) }
372 async fn load_cache_entries(&self) -> Result<Vec<CacheRow>, StorageError> { dispatch!(self, load_cache_entries) }
373 async fn evict_expired_cache(&self) -> Result<u64, StorageError> { dispatch!(self, evict_expired_cache) }
374
375 async fn record_cost(&self, cost: &CostRow) -> Result<(), StorageError> { dispatch!(self, record_cost, cost) }
376 async fn query_costs(&self, flow: Option<&str>, limit: usize) -> Result<Vec<CostRow>, StorageError> { dispatch!(self, query_costs, flow, limit) }
377
378 async fn save_schedule(&self, schedule: &ScheduleRow) -> Result<(), StorageError> { dispatch!(self, save_schedule, schedule) }
379 async fn load_schedules(&self) -> Result<Vec<ScheduleRow>, StorageError> { dispatch!(self, load_schedules) }
380 async fn delete_schedule(&self, name: &str) -> Result<bool, StorageError> { dispatch!(self, delete_schedule, name) }
381
382 async fn is_healthy(&self) -> bool {
383 match self {
384 StorageDispatcher::InMemory(b) => b.is_healthy().await,
385 StorageDispatcher::Postgres(b) => b.is_healthy().await,
386 }
387 }
388}
389
#[cfg(test)]
mod tests {
    //! Unit tests for the in-memory backend, the dispatcher and error display.
    //!
    //! `InMemoryBackend` retains nothing, so each test checks that a write
    //! succeeds and that the following read still comes back empty.

    use super::*;

    #[tokio::test]
    async fn test_in_memory_trace_round_trip() {
        let backend = InMemoryBackend::new();
        let trace = TraceRow {
            tenant_id: "default".into(),
            trace_id: 1,
            flow_name: "test_flow".into(),
            status: "success".into(),
            steps_executed: 3,
            latency_ms: 150,
            tokens_input: 100,
            tokens_output: 50,
            anchor_checks: 2,
            anchor_breaches: 0,
            errors: 0,
            retries: 0,
            source_file: "test.axon".into(),
            backend: "stub".into(),
            client_key: "".into(),
            replay_of: None,
            correlation_id: None,
            events: serde_json::json!([]),
            annotations: serde_json::json!([]),
        };
        assert!(backend.save_trace(&trace).await.is_ok());
        // The backend discards writes, so nothing is read back.
        let loaded = backend.load_traces(10, 0).await.unwrap();
        assert!(loaded.is_empty());
    }

    #[tokio::test]
    async fn test_in_memory_session_ops() {
        let backend = InMemoryBackend::new();
        let session = SessionRow {
            tenant_id: "default".into(),
            scope: "default".into(),
            key: "user_name".into(),
            value: "Alice".into(),
            source_step: "step_1".into(),
        };
        assert!(backend.save_session(&session).await.is_ok());
        assert!(backend.load_sessions("default").await.unwrap().is_empty());
        assert!(!backend.delete_session("default", "user_name").await.unwrap());
    }

    #[tokio::test]
    async fn test_in_memory_daemon_ops() {
        let backend = InMemoryBackend::new();
        let daemon = DaemonRow {
            tenant_id: "default".into(),
            name: "agent_1".into(),
            state: "running".into(),
            source_file: "agent.axon".into(),
            flow_name: "main".into(),
            event_count: 0,
            restart_count: 0,
            trigger_topic: Some("user.input".into()),
            output_topic: None,
            lifecycle_events: serde_json::json!([]),
        };
        assert!(backend.save_daemon(&daemon).await.is_ok());
        assert!(backend.load_daemons().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_in_memory_audit_ops() {
        let backend = InMemoryBackend::new();
        let entry = AuditRow {
            tenant_id: "default".into(),
            action: "deploy".into(),
            actor: "admin".into(),
            target: "flow_1".into(),
            detail: serde_json::json!({"version": "1.0"}),
        };
        assert!(backend.append_audit(&entry).await.is_ok());
        assert!(backend.query_audit(10).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_in_memory_hibernation_ops() {
        let backend = InMemoryBackend::new();
        let hib = HibernationRow {
            tenant_id: "default".into(),
            id: "h1".into(),
            name: "example_agent".into(),
            operation: "process_document".into(),
            status: "active".into(),
            checkpoints: serde_json::json!([]),
            resumed_from: None,
            created_at: 1700000000,
            last_status_change: 1700000000,
            next_checkpoint_id: 1,
        };
        assert!(backend.save_hibernation(&hib).await.is_ok());
        assert!(backend.load_hibernations().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_in_memory_cost_ops() {
        let backend = InMemoryBackend::new();
        let cost = CostRow {
            tenant_id: "default".into(),
            flow_name: "analysis".into(),
            backend: "anthropic".into(),
            input_tokens: 1000,
            output_tokens: 500,
            cost_usd: 0.015,
        };
        assert!(backend.record_cost(&cost).await.is_ok());
        assert!(backend.query_costs(None, 10).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_in_memory_health() {
        let backend = InMemoryBackend::new();
        assert!(backend.is_healthy().await);
    }

    #[tokio::test]
    async fn test_in_memory_cache_ops() {
        let backend = InMemoryBackend::new();
        let cache = CacheRow {
            tenant_id: "default".into(),
            flow_name: "test".into(),
            cache_key: "k1".into(),
            result: serde_json::json!({"output": "hello"}),
            ttl_secs: Some(300),
            hit_count: 0,
        };
        assert!(backend.save_cache_entry(&cache).await.is_ok());
        assert!(backend.load_cache_entries().await.unwrap().is_empty());
        assert_eq!(backend.evict_expired_cache().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_in_memory_schedule_ops() {
        let backend = InMemoryBackend::new();
        let schedule = ScheduleRow {
            tenant_id: "default".into(),
            name: "daily_report".into(),
            flow_name: "report".into(),
            interval_secs: 86400,
            enabled: true,
            backend: "anthropic".into(),
            last_run: 0,
            next_run: 86400,
            run_count: 0,
            error_count: 0,
        };
        assert!(backend.save_schedule(&schedule).await.is_ok());
        assert!(backend.load_schedules().await.unwrap().is_empty());
        assert!(!backend.delete_schedule("daily_report").await.unwrap());
    }

    #[tokio::test]
    async fn test_in_memory_event_ops() {
        let backend = InMemoryBackend::new();
        let event = EventRow {
            tenant_id: "default".into(),
            topic: "flow.completed".into(),
            source: "executor".into(),
            payload: serde_json::json!({"flow": "test"}),
        };
        assert!(backend.append_event(&event).await.is_ok());
        assert!(backend.query_events(None, 10).await.unwrap().is_empty());
        assert!(backend.query_events(Some("flow.completed"), 10).await.unwrap().is_empty());
    }

    /// The dispatcher's in-memory variant must forward to `InMemoryBackend`,
    /// so it shows the same no-op results.
    #[tokio::test]
    async fn test_dispatcher_in_memory_delegates() {
        let dispatcher = StorageDispatcher::in_memory();
        assert!(dispatcher.is_healthy().await);
        assert!(dispatcher.load_traces(10, 0).await.unwrap().is_empty());
        assert!(dispatcher.get_trace(1).await.unwrap().is_none());
        assert!(!dispatcher.delete_daemon("agent_1").await.unwrap());
        assert_eq!(dispatcher.evict_expired_cache().await.unwrap(), 0);
    }

    /// Covers the `Display` prefix of every `StorageError` variant.
    #[test]
    fn test_storage_error_display() {
        assert_eq!(
            StorageError::ConnectionError("timeout".into()).to_string(),
            "connection error: timeout"
        );
        assert_eq!(
            StorageError::QueryError("syntax".into()).to_string(),
            "query error: syntax"
        );
        assert_eq!(
            StorageError::SerializationError("bad json".into()).to_string(),
            "serialization error: bad json"
        );
        assert_eq!(
            StorageError::NotFound("trace_42".into()).to_string(),
            "not found: trace_42"
        );
    }
}