Skip to main content

axon/
storage.rs

1//! Storage Backend — trait abstraction for persistent state in AxonServer.
2//!
3//! Defines `StorageBackend` — an async trait for persisting server state across
4//! restarts. Two implementations:
5//!   - `InMemoryBackend`: No-op — state lives only in memory (development/testing)
6//!   - `PostgresBackend`: Full PostgreSQL persistence (production) — in `storage_postgres.rs`
7//!
8//! Architecture ready for future backends: Oracle, MariaDB, MySQL, etc.
9//! Each backend simply implements this trait.
10//!
11//! The server uses write-through: in-memory state is the source of truth during
12//! process lifetime; the backend provides durability across restarts.
13
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::collections::HashMap;
17
18// ── Error type ─────────────────────────────────────────────────────────────
19
20/// Storage operation error.
21#[derive(Debug, Clone)]
22pub enum StorageError {
23    /// Database connection or pool error.
24    ConnectionError(String),
25    /// Query execution error.
26    QueryError(String),
27    /// Serialization/deserialization error.
28    SerializationError(String),
29    /// Requested entity not found.
30    NotFound(String),
31}
32
33impl std::fmt::Display for StorageError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            StorageError::ConnectionError(s) => write!(f, "connection error: {s}"),
37            StorageError::QueryError(s) => write!(f, "query error: {s}"),
38            StorageError::SerializationError(s) => write!(f, "serialization error: {s}"),
39            StorageError::NotFound(s) => write!(f, "not found: {s}"),
40        }
41    }
42}
43
44// ── Portable row types ─────────────────────────────────────────────────────
45
46/// Portable trace record for storage.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct TraceRow {
49    pub tenant_id: String,
50    pub trace_id: u64,
51    pub flow_name: String,
52    pub status: String,
53    pub steps_executed: u32,
54    pub latency_ms: u64,
55    pub tokens_input: u64,
56    pub tokens_output: u64,
57    pub anchor_checks: u32,
58    pub anchor_breaches: u32,
59    pub errors: u32,
60    pub retries: u32,
61    pub source_file: String,
62    pub backend: String,
63    pub client_key: String,
64    pub replay_of: Option<u64>,
65    pub correlation_id: Option<String>,
66    pub events: Value,
67    pub annotations: Value,
68}
69
70/// Portable session entry for storage.
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct SessionRow {
73    pub tenant_id: String,
74    pub scope: String,
75    pub key: String,
76    pub value: String,
77    pub source_step: String,
78}
79
80/// Portable daemon record for storage.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct DaemonRow {
83    pub tenant_id: String,
84    pub name: String,
85    pub state: String,
86    pub source_file: String,
87    pub flow_name: String,
88    pub event_count: u64,
89    pub restart_count: u32,
90    pub trigger_topic: Option<String>,
91    pub output_topic: Option<String>,
92    pub lifecycle_events: Value,
93}
94
95/// Portable audit entry for storage.
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct AuditRow {
98    pub tenant_id: String,
99    pub action: String,
100    pub actor: String,
101    pub target: String,
102    pub detail: Value,
103}
104
105/// Portable AxonStore instance for storage.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct AxonStoreRow {
108    pub tenant_id: String,
109    pub name: String,
110    pub ontology: String,
111    pub entries: Value,
112    pub created_at: u64,
113    pub total_ops: u64,
114}
115
116/// Portable Dataspace instance for storage.
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct DataspaceRow {
119    pub tenant_id: String,
120    pub name: String,
121    pub ontology: String,
122    pub entries: Value,
123    pub associations: Value,
124    pub created_at: u64,
125    pub total_ops: u64,
126    pub next_id: u64,
127}
128
129/// Portable hibernation session for storage.
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct HibernationRow {
132    pub tenant_id: String,
133    pub id: String,
134    pub name: String,
135    pub operation: String,
136    pub status: String,
137    pub checkpoints: Value,
138    pub resumed_from: Option<i32>,
139    pub created_at: u64,
140    pub last_status_change: u64,
141    pub next_checkpoint_id: u32,
142}
143
144/// Portable event record for storage.
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct EventRow {
147    pub tenant_id: String,
148    pub topic: String,
149    pub source: String,
150    pub payload: Value,
151}
152
153/// Portable cached execution result for storage.
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct CacheRow {
156    pub tenant_id: String,
157    pub flow_name: String,
158    pub cache_key: String,
159    pub result: Value,
160    pub ttl_secs: Option<i32>,
161    pub hit_count: u64,
162}
163
164/// Portable cost record for storage.
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct CostRow {
167    pub tenant_id: String,
168    pub flow_name: String,
169    pub backend: String,
170    pub input_tokens: u64,
171    pub output_tokens: u64,
172    pub cost_usd: f64,
173}
174
175/// Portable schedule entry for storage.
176#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct ScheduleRow {
178    pub tenant_id: String,
179    pub name: String,
180    pub flow_name: String,
181    pub interval_secs: u64,
182    pub enabled: bool,
183    pub backend: String,
184    pub last_run: u64,
185    pub next_run: u64,
186    pub run_count: u64,
187    pub error_count: u64,
188}
189
190// ── Storage Backend Trait ──────────────────────────────────────────────────
191
192/// Async trait for persistent storage backends.
193///
194/// All methods are async and return `Result<T, StorageError>`.
195/// Implementations must be `Send + Sync` for use across async tasks.
196pub trait StorageBackend: Send + Sync {
197    // ── Traces ──
198    fn save_trace(&self, trace: &TraceRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
199    fn load_traces(&self, limit: usize, offset: usize) -> impl std::future::Future<Output = Result<Vec<TraceRow>, StorageError>> + Send;
200    fn get_trace(&self, trace_id: u64) -> impl std::future::Future<Output = Result<Option<TraceRow>, StorageError>> + Send;
201    fn delete_traces(&self, ids: &[u64]) -> impl std::future::Future<Output = Result<u64, StorageError>> + Send;
202
203    // ── Sessions ──
204    fn save_session(&self, entry: &SessionRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
205    fn load_sessions(&self, scope: &str) -> impl std::future::Future<Output = Result<Vec<SessionRow>, StorageError>> + Send;
206    fn delete_session(&self, scope: &str, key: &str) -> impl std::future::Future<Output = Result<bool, StorageError>> + Send;
207
208    // ── Daemons ──
209    fn save_daemon(&self, daemon: &DaemonRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
210    fn load_daemons(&self) -> impl std::future::Future<Output = Result<Vec<DaemonRow>, StorageError>> + Send;
211    fn delete_daemon(&self, name: &str) -> impl std::future::Future<Output = Result<bool, StorageError>> + Send;
212
213    // ── Audit ──
214    fn append_audit(&self, entry: &AuditRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
215    fn query_audit(&self, limit: usize) -> impl std::future::Future<Output = Result<Vec<AuditRow>, StorageError>> + Send;
216
217    // ── AxonStores ──
218    fn save_axon_store(&self, store: &AxonStoreRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
219    fn load_axon_stores(&self) -> impl std::future::Future<Output = Result<Vec<AxonStoreRow>, StorageError>> + Send;
220    fn delete_axon_store(&self, name: &str) -> impl std::future::Future<Output = Result<bool, StorageError>> + Send;
221
222    // ── Dataspaces ──
223    fn save_dataspace(&self, ds: &DataspaceRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
224    fn load_dataspaces(&self) -> impl std::future::Future<Output = Result<Vec<DataspaceRow>, StorageError>> + Send;
225    fn delete_dataspace(&self, name: &str) -> impl std::future::Future<Output = Result<bool, StorageError>> + Send;
226
227    // ── Hibernations ──
228    fn save_hibernation(&self, session: &HibernationRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
229    fn load_hibernations(&self) -> impl std::future::Future<Output = Result<Vec<HibernationRow>, StorageError>> + Send;
230
231    // ── Events ──
232    fn append_event(&self, event: &EventRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
233    fn query_events(&self, topic: Option<&str>, limit: usize) -> impl std::future::Future<Output = Result<Vec<EventRow>, StorageError>> + Send;
234
235    // ── Cache ──
236    fn save_cache_entry(&self, entry: &CacheRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
237    fn load_cache_entries(&self) -> impl std::future::Future<Output = Result<Vec<CacheRow>, StorageError>> + Send;
238    fn evict_expired_cache(&self) -> impl std::future::Future<Output = Result<u64, StorageError>> + Send;
239
240    // ── Cost tracking ──
241    fn record_cost(&self, cost: &CostRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
242    fn query_costs(&self, flow: Option<&str>, limit: usize) -> impl std::future::Future<Output = Result<Vec<CostRow>, StorageError>> + Send;
243
244    // ── Schedules ──
245    fn save_schedule(&self, schedule: &ScheduleRow) -> impl std::future::Future<Output = Result<(), StorageError>> + Send;
246    fn load_schedules(&self) -> impl std::future::Future<Output = Result<Vec<ScheduleRow>, StorageError>> + Send;
247    fn delete_schedule(&self, name: &str) -> impl std::future::Future<Output = Result<bool, StorageError>> + Send;
248
249    // ── Health ──
250    fn is_healthy(&self) -> impl std::future::Future<Output = bool> + Send;
251}
252
253// ── InMemoryBackend ────────────────────────────────────────────────────────
254
255/// No-op storage backend — all writes succeed, all loads return empty.
256/// Used when no DATABASE_URL is configured (development/testing).
257pub struct InMemoryBackend;
258
259impl InMemoryBackend {
260    pub fn new() -> Self {
261        InMemoryBackend
262    }
263}
264
265impl StorageBackend for InMemoryBackend {
266    async fn save_trace(&self, _trace: &TraceRow) -> Result<(), StorageError> { Ok(()) }
267    async fn load_traces(&self, _limit: usize, _offset: usize) -> Result<Vec<TraceRow>, StorageError> { Ok(vec![]) }
268    async fn get_trace(&self, _trace_id: u64) -> Result<Option<TraceRow>, StorageError> { Ok(None) }
269    async fn delete_traces(&self, _ids: &[u64]) -> Result<u64, StorageError> { Ok(0) }
270
271    async fn save_session(&self, _entry: &SessionRow) -> Result<(), StorageError> { Ok(()) }
272    async fn load_sessions(&self, _scope: &str) -> Result<Vec<SessionRow>, StorageError> { Ok(vec![]) }
273    async fn delete_session(&self, _scope: &str, _key: &str) -> Result<bool, StorageError> { Ok(false) }
274
275    async fn save_daemon(&self, _daemon: &DaemonRow) -> Result<(), StorageError> { Ok(()) }
276    async fn load_daemons(&self) -> Result<Vec<DaemonRow>, StorageError> { Ok(vec![]) }
277    async fn delete_daemon(&self, _name: &str) -> Result<bool, StorageError> { Ok(false) }
278
279    async fn append_audit(&self, _entry: &AuditRow) -> Result<(), StorageError> { Ok(()) }
280    async fn query_audit(&self, _limit: usize) -> Result<Vec<AuditRow>, StorageError> { Ok(vec![]) }
281
282    async fn save_axon_store(&self, _store: &AxonStoreRow) -> Result<(), StorageError> { Ok(()) }
283    async fn load_axon_stores(&self) -> Result<Vec<AxonStoreRow>, StorageError> { Ok(vec![]) }
284    async fn delete_axon_store(&self, _name: &str) -> Result<bool, StorageError> { Ok(false) }
285
286    async fn save_dataspace(&self, _ds: &DataspaceRow) -> Result<(), StorageError> { Ok(()) }
287    async fn load_dataspaces(&self) -> Result<Vec<DataspaceRow>, StorageError> { Ok(vec![]) }
288    async fn delete_dataspace(&self, _name: &str) -> Result<bool, StorageError> { Ok(false) }
289
290    async fn save_hibernation(&self, _session: &HibernationRow) -> Result<(), StorageError> { Ok(()) }
291    async fn load_hibernations(&self) -> Result<Vec<HibernationRow>, StorageError> { Ok(vec![]) }
292
293    async fn append_event(&self, _event: &EventRow) -> Result<(), StorageError> { Ok(()) }
294    async fn query_events(&self, _topic: Option<&str>, _limit: usize) -> Result<Vec<EventRow>, StorageError> { Ok(vec![]) }
295
296    async fn save_cache_entry(&self, _entry: &CacheRow) -> Result<(), StorageError> { Ok(()) }
297    async fn load_cache_entries(&self) -> Result<Vec<CacheRow>, StorageError> { Ok(vec![]) }
298    async fn evict_expired_cache(&self) -> Result<u64, StorageError> { Ok(0) }
299
300    async fn record_cost(&self, _cost: &CostRow) -> Result<(), StorageError> { Ok(()) }
301    async fn query_costs(&self, _flow: Option<&str>, _limit: usize) -> Result<Vec<CostRow>, StorageError> { Ok(vec![]) }
302
303    async fn save_schedule(&self, _schedule: &ScheduleRow) -> Result<(), StorageError> { Ok(()) }
304    async fn load_schedules(&self) -> Result<Vec<ScheduleRow>, StorageError> { Ok(vec![]) }
305    async fn delete_schedule(&self, _name: &str) -> Result<bool, StorageError> { Ok(false) }
306
307    async fn is_healthy(&self) -> bool { true }
308}
309
310// ── Storage Dispatcher ─────────────────────────────────────────────────────
311
312/// Concrete dispatcher that delegates to the configured storage backend.
313/// Uses an enum instead of `dyn` trait to avoid dyn-compatibility issues
314/// with async return types.
315pub enum StorageDispatcher {
316    InMemory(InMemoryBackend),
317    Postgres(crate::storage_postgres::PostgresBackend),
318}
319
320impl StorageDispatcher {
321    pub fn in_memory() -> Self {
322        StorageDispatcher::InMemory(InMemoryBackend::new())
323    }
324
325    pub fn postgres(pool: sqlx::PgPool) -> Self {
326        StorageDispatcher::Postgres(crate::storage_postgres::PostgresBackend::new(pool))
327    }
328}
329
330/// Macro to delegate all StorageBackend methods to the inner variant.
331macro_rules! dispatch {
332    ($self:expr, $method:ident $(, $arg:expr)*) => {
333        match $self {
334            StorageDispatcher::InMemory(b) => b.$method($($arg),*).await,
335            StorageDispatcher::Postgres(b) => b.$method($($arg),*).await,
336        }
337    };
338}
339
340impl StorageBackend for StorageDispatcher {
341    async fn save_trace(&self, trace: &TraceRow) -> Result<(), StorageError> { dispatch!(self, save_trace, trace) }
342    async fn load_traces(&self, limit: usize, offset: usize) -> Result<Vec<TraceRow>, StorageError> { dispatch!(self, load_traces, limit, offset) }
343    async fn get_trace(&self, trace_id: u64) -> Result<Option<TraceRow>, StorageError> { dispatch!(self, get_trace, trace_id) }
344    async fn delete_traces(&self, ids: &[u64]) -> Result<u64, StorageError> { dispatch!(self, delete_traces, ids) }
345
346    async fn save_session(&self, entry: &SessionRow) -> Result<(), StorageError> { dispatch!(self, save_session, entry) }
347    async fn load_sessions(&self, scope: &str) -> Result<Vec<SessionRow>, StorageError> { dispatch!(self, load_sessions, scope) }
348    async fn delete_session(&self, scope: &str, key: &str) -> Result<bool, StorageError> { dispatch!(self, delete_session, scope, key) }
349
350    async fn save_daemon(&self, daemon: &DaemonRow) -> Result<(), StorageError> { dispatch!(self, save_daemon, daemon) }
351    async fn load_daemons(&self) -> Result<Vec<DaemonRow>, StorageError> { dispatch!(self, load_daemons) }
352    async fn delete_daemon(&self, name: &str) -> Result<bool, StorageError> { dispatch!(self, delete_daemon, name) }
353
354    async fn append_audit(&self, entry: &AuditRow) -> Result<(), StorageError> { dispatch!(self, append_audit, entry) }
355    async fn query_audit(&self, limit: usize) -> Result<Vec<AuditRow>, StorageError> { dispatch!(self, query_audit, limit) }
356
357    async fn save_axon_store(&self, store: &AxonStoreRow) -> Result<(), StorageError> { dispatch!(self, save_axon_store, store) }
358    async fn load_axon_stores(&self) -> Result<Vec<AxonStoreRow>, StorageError> { dispatch!(self, load_axon_stores) }
359    async fn delete_axon_store(&self, name: &str) -> Result<bool, StorageError> { dispatch!(self, delete_axon_store, name) }
360
361    async fn save_dataspace(&self, ds: &DataspaceRow) -> Result<(), StorageError> { dispatch!(self, save_dataspace, ds) }
362    async fn load_dataspaces(&self) -> Result<Vec<DataspaceRow>, StorageError> { dispatch!(self, load_dataspaces) }
363    async fn delete_dataspace(&self, name: &str) -> Result<bool, StorageError> { dispatch!(self, delete_dataspace, name) }
364
365    async fn save_hibernation(&self, session: &HibernationRow) -> Result<(), StorageError> { dispatch!(self, save_hibernation, session) }
366    async fn load_hibernations(&self) -> Result<Vec<HibernationRow>, StorageError> { dispatch!(self, load_hibernations) }
367
368    async fn append_event(&self, event: &EventRow) -> Result<(), StorageError> { dispatch!(self, append_event, event) }
369    async fn query_events(&self, topic: Option<&str>, limit: usize) -> Result<Vec<EventRow>, StorageError> { dispatch!(self, query_events, topic, limit) }
370
371    async fn save_cache_entry(&self, entry: &CacheRow) -> Result<(), StorageError> { dispatch!(self, save_cache_entry, entry) }
372    async fn load_cache_entries(&self) -> Result<Vec<CacheRow>, StorageError> { dispatch!(self, load_cache_entries) }
373    async fn evict_expired_cache(&self) -> Result<u64, StorageError> { dispatch!(self, evict_expired_cache) }
374
375    async fn record_cost(&self, cost: &CostRow) -> Result<(), StorageError> { dispatch!(self, record_cost, cost) }
376    async fn query_costs(&self, flow: Option<&str>, limit: usize) -> Result<Vec<CostRow>, StorageError> { dispatch!(self, query_costs, flow, limit) }
377
378    async fn save_schedule(&self, schedule: &ScheduleRow) -> Result<(), StorageError> { dispatch!(self, save_schedule, schedule) }
379    async fn load_schedules(&self) -> Result<Vec<ScheduleRow>, StorageError> { dispatch!(self, load_schedules) }
380    async fn delete_schedule(&self, name: &str) -> Result<bool, StorageError> { dispatch!(self, delete_schedule, name) }
381
382    async fn is_healthy(&self) -> bool {
383        match self {
384            StorageDispatcher::InMemory(b) => b.is_healthy().await,
385            StorageDispatcher::Postgres(b) => b.is_healthy().await,
386        }
387    }
388}
389
390// ── Tests ──────────────────────────────────────────────────────────────────
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395
396    #[tokio::test]
397    async fn test_in_memory_trace_round_trip() {
398        let backend = InMemoryBackend::new();
399        let trace = TraceRow {
400            tenant_id: "default".into(),
401            trace_id: 1,
402            flow_name: "test_flow".into(),
403            status: "success".into(),
404            steps_executed: 3,
405            latency_ms: 150,
406            tokens_input: 100,
407            tokens_output: 50,
408            anchor_checks: 2,
409            anchor_breaches: 0,
410            errors: 0,
411            retries: 0,
412            source_file: "test.axon".into(),
413            backend: "stub".into(),
414            client_key: "".into(),
415            replay_of: None,
416            correlation_id: None,
417            events: serde_json::json!([]),
418            annotations: serde_json::json!([]),
419        };
420        assert!(backend.save_trace(&trace).await.is_ok());
421        let loaded = backend.load_traces(10, 0).await.unwrap();
422        assert!(loaded.is_empty()); // InMemory returns empty
423    }
424
425    #[tokio::test]
426    async fn test_in_memory_session_ops() {
427        let backend = InMemoryBackend::new();
428        let session = SessionRow {
429            tenant_id: "default".into(),
430            scope: "default".into(),
431            key: "user_name".into(),
432            value: "Alice".into(),
433            source_step: "step_1".into(),
434        };
435        assert!(backend.save_session(&session).await.is_ok());
436        assert!(backend.load_sessions("default").await.unwrap().is_empty());
437        assert!(!backend.delete_session("default", "user_name").await.unwrap());
438    }
439
440    #[tokio::test]
441    async fn test_in_memory_daemon_ops() {
442        let backend = InMemoryBackend::new();
443        let daemon = DaemonRow {
444            tenant_id: "default".into(),
445            name: "agent_1".into(),
446            state: "running".into(),
447            source_file: "agent.axon".into(),
448            flow_name: "main".into(),
449            event_count: 0,
450            restart_count: 0,
451            trigger_topic: Some("user.input".into()),
452            output_topic: None,
453            lifecycle_events: serde_json::json!([]),
454        };
455        assert!(backend.save_daemon(&daemon).await.is_ok());
456        assert!(backend.load_daemons().await.unwrap().is_empty());
457    }
458
459    #[tokio::test]
460    async fn test_in_memory_audit_ops() {
461        let backend = InMemoryBackend::new();
462        let entry = AuditRow {
463            tenant_id: "default".into(),
464            action: "deploy".into(),
465            actor: "admin".into(),
466            target: "flow_1".into(),
467            detail: serde_json::json!({"version": "1.0"}),
468        };
469        assert!(backend.append_audit(&entry).await.is_ok());
470        assert!(backend.query_audit(10).await.unwrap().is_empty());
471    }
472
473    #[tokio::test]
474    async fn test_in_memory_hibernation_ops() {
475        let backend = InMemoryBackend::new();
476        let hib = HibernationRow {
477            tenant_id: "default".into(),
478            id: "h1".into(),
479            name: "example_agent".into(),
480            operation: "process_document".into(),
481            status: "active".into(),
482            checkpoints: serde_json::json!([]),
483            resumed_from: None,
484            created_at: 1700000000,
485            last_status_change: 1700000000,
486            next_checkpoint_id: 1,
487        };
488        assert!(backend.save_hibernation(&hib).await.is_ok());
489        assert!(backend.load_hibernations().await.unwrap().is_empty());
490    }
491
492    #[tokio::test]
493    async fn test_in_memory_cost_ops() {
494        let backend = InMemoryBackend::new();
495        let cost = CostRow {
496            tenant_id: "default".into(),
497            flow_name: "analysis".into(),
498            backend: "anthropic".into(),
499            input_tokens: 1000,
500            output_tokens: 500,
501            cost_usd: 0.015,
502        };
503        assert!(backend.record_cost(&cost).await.is_ok());
504        assert!(backend.query_costs(None, 10).await.unwrap().is_empty());
505    }
506
507    #[tokio::test]
508    async fn test_in_memory_health() {
509        let backend = InMemoryBackend::new();
510        assert!(backend.is_healthy().await);
511    }
512
513    #[tokio::test]
514    async fn test_in_memory_cache_ops() {
515        let backend = InMemoryBackend::new();
516        let cache = CacheRow {
517            tenant_id: "default".into(),
518            flow_name: "test".into(),
519            cache_key: "k1".into(),
520            result: serde_json::json!({"output": "hello"}),
521            ttl_secs: Some(300),
522            hit_count: 0,
523        };
524        assert!(backend.save_cache_entry(&cache).await.is_ok());
525        assert!(backend.load_cache_entries().await.unwrap().is_empty());
526        assert_eq!(backend.evict_expired_cache().await.unwrap(), 0);
527    }
528
529    #[tokio::test]
530    async fn test_in_memory_schedule_ops() {
531        let backend = InMemoryBackend::new();
532        let schedule = ScheduleRow {
533            tenant_id: "default".into(),
534            name: "daily_report".into(),
535            flow_name: "report".into(),
536            interval_secs: 86400,
537            enabled: true,
538            backend: "anthropic".into(),
539            last_run: 0,
540            next_run: 86400,
541            run_count: 0,
542            error_count: 0,
543        };
544        assert!(backend.save_schedule(&schedule).await.is_ok());
545        assert!(backend.load_schedules().await.unwrap().is_empty());
546        assert!(!backend.delete_schedule("daily_report").await.unwrap());
547    }
548
549    #[tokio::test]
550    async fn test_in_memory_event_ops() {
551        let backend = InMemoryBackend::new();
552        let event = EventRow {
553            tenant_id: "default".into(),
554            topic: "flow.completed".into(),
555            source: "executor".into(),
556            payload: serde_json::json!({"flow": "test"}),
557        };
558        assert!(backend.append_event(&event).await.is_ok());
559        assert!(backend.query_events(None, 10).await.unwrap().is_empty());
560        assert!(backend.query_events(Some("flow.completed"), 10).await.unwrap().is_empty());
561    }
562
563    #[test]
564    fn test_storage_error_display() {
565        assert_eq!(
566            format!("{}", StorageError::ConnectionError("timeout".into())),
567            "connection error: timeout"
568        );
569        assert_eq!(
570            format!("{}", StorageError::QueryError("syntax".into())),
571            "query error: syntax"
572        );
573        assert_eq!(
574            format!("{}", StorageError::NotFound("trace_42".into())),
575            "not found: trace_42"
576        );
577    }
578}