Skip to main content

axon/
storage_postgres.rs

1//! PostgreSQL Storage Backend — full persistent storage for AxonServer.
2//!
3//! Every data method:
4//!   1. Reads the active tenant from `crate::tenant::current_tenant_id()` (task-local)
5//!   2. Opens a transaction
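//!   3. Sets the transaction-local setting `axon.current_tenant` to that tenant
//!      (the effect of `SET LOCAL axon.current_tenant = '<tenant>'`) so Postgres RLS
//!      policies activate for the duration of that transaction
//!   4. Runs the actual DML
//!   5. Commits
//!
//! This means tenant isolation holds even if application code forgets to filter
//! by tenant_id: Postgres RLS filters out (or, for writes, rejects) rows belonging to
//! other tenants. That guarantee requires each table's RLS policies to read
//! `axon.current_tenant`, and the connecting role must not bypass RLS. Superusers,
//! roles with BYPASSRLS, and table owners (unless the table has FORCE ROW LEVEL
//! SECURITY) all bypass it.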
13
14use sqlx::PgPool;
15use sqlx::Row;
16use crate::storage::*;
17
18// ── Helpers ───────────────────────────────────────────────────────────────────
19
/// Open a transaction and activate RLS for the current tenant.
/// Every data method must call this instead of `pool.begin()` directly.
///
/// The tenant id is applied with `SELECT set_config('axon.current_tenant', $1, true)`
/// rather than `SET LOCAL axon.current_tenant = $1`. `SET` is a utility statement and
/// cannot take bind parameters, so Postgres rejects the parameterised `SET LOCAL` form.
/// `set_config(name, value, is_local => true)` has the same transaction-scoped effect
/// as `SET LOCAL`. It also receives the tenant id as an ordinary bound text value, so
/// no string interpolation (and no SQL-injection surface) is involved.
///
/// Evaluates to the open transaction. On failure it returns early from the enclosing
/// function with `StorageError::ConnectionError` (begin failed) or
/// `StorageError::QueryError` (setting the tenant failed). Both messages are
/// prefixed with `$op`.
macro_rules! begin_tenant_tx {
    ($pool:expr, $tenant:expr, $op:literal) => {{
        let mut tx = $pool.begin().await.map_err(|e| {
            StorageError::ConnectionError(format!("{}: begin tx: {e}", $op))
        })?;
        // Third argument `true` = is_local: the value reverts at COMMIT/ROLLBACK, so it
        // never carries over to the next transaction on this pooled connection.
        sqlx::query("SELECT set_config('axon.current_tenant', $1, true)")
            .bind($tenant)
            .execute(&mut *tx)
            .await
            .map_err(|e| StorageError::QueryError(format!("{}: set_tenant: {e}", $op)))?;
        tx
    }};
}
35
/// Commits `$tx`.
///
/// If the commit fails, returns early from the enclosing function with
/// `StorageError::QueryError("<op>: commit: <driver error>")`. Otherwise it
/// evaluates to `()`.
macro_rules! commit_tx {
    ($tx:expr, $op:literal) => {
        if let Err(e) = $tx.commit().await {
            return Err(StorageError::QueryError(format!("{}: commit: {e}", $op)));
        }
    };
}
43
44// ── PostgresBackend ───────────────────────────────────────────────────────────
45
46/// PostgreSQL implementation of StorageBackend.
47pub struct PostgresBackend {
48    pub pool: PgPool,
49}
50
51impl PostgresBackend {
52    pub fn new(pool: PgPool) -> Self {
53        Self { pool }
54    }
55}
56
57impl StorageBackend for PostgresBackend {
58
59    // ── Traces ────────────────────────────────────────────────────────────────
60
61    async fn save_trace(&self, trace: &TraceRow) -> Result<(), StorageError> {
62        let tid = crate::tenant::current_tenant_id();
63        let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_trace");
64        sqlx::query(
65            "INSERT INTO traces \
66             (tenant_id, trace_id, flow_name, status, steps_executed, latency_ms, \
67              tokens_input, tokens_output, anchor_checks, anchor_breaches, errors, retries, \
68              source_file, backend, client_key, replay_of, correlation_id, events, annotations) \
69             VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19) \
70             ON CONFLICT (tenant_id, trace_id) DO UPDATE SET \
71             status = EXCLUDED.status, steps_executed = EXCLUDED.steps_executed, \
72             latency_ms = EXCLUDED.latency_ms, tokens_input = EXCLUDED.tokens_input, \
73             tokens_output = EXCLUDED.tokens_output, events = EXCLUDED.events, \
74             annotations = EXCLUDED.annotations"
75        )
76        .bind(&tid)
77        .bind(trace.trace_id as i64)
78        .bind(&trace.flow_name)
79        .bind(&trace.status)
80        .bind(trace.steps_executed as i32)
81        .bind(trace.latency_ms as i64)
82        .bind(trace.tokens_input as i64)
83        .bind(trace.tokens_output as i64)
84        .bind(trace.anchor_checks as i32)
85        .bind(trace.anchor_breaches as i32)
86        .bind(trace.errors as i32)
87        .bind(trace.retries as i32)
88        .bind(&trace.source_file)
89        .bind(&trace.backend)
90        .bind(&trace.client_key)
91        .bind(trace.replay_of.map(|v| v as i64))
92        .bind(&trace.correlation_id)
93        .bind(&trace.events)
94        .bind(&trace.annotations)
95        .execute(&mut *tx)
96        .await
97        .map_err(|e| StorageError::QueryError(format!("save_trace: {e}")))?;
98        commit_tx!(tx, "save_trace");
99        Ok(())
100    }
101
102    async fn load_traces(&self, limit: usize, offset: usize) -> Result<Vec<TraceRow>, StorageError> {
103        let tid = crate::tenant::current_tenant_id();
104        let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_traces");
105        let rows = sqlx::query(
106            "SELECT tenant_id, trace_id, flow_name, status, steps_executed, latency_ms, \
107             tokens_input, tokens_output, anchor_checks, anchor_breaches, errors, retries, \
108             source_file, backend, client_key, replay_of, correlation_id, events, annotations \
109             FROM traces ORDER BY timestamp_utc DESC LIMIT $1 OFFSET $2"
110        )
111        .bind(limit as i64)
112        .bind(offset as i64)
113        .fetch_all(&mut *tx)
114        .await
115        .map_err(|e| StorageError::QueryError(format!("load_traces: {e}")))?;
116        commit_tx!(tx, "load_traces");
117        Ok(rows.iter().map(|r| TraceRow {
118            tenant_id: r.get("tenant_id"),
119            trace_id: r.get::<i64, _>("trace_id") as u64,
120            flow_name: r.get("flow_name"),
121            status: r.get("status"),
122            steps_executed: r.get::<i32, _>("steps_executed") as u32,
123            latency_ms: r.get::<i64, _>("latency_ms") as u64,
124            tokens_input: r.get::<i64, _>("tokens_input") as u64,
125            tokens_output: r.get::<i64, _>("tokens_output") as u64,
126            anchor_checks: r.get::<i32, _>("anchor_checks") as u32,
127            anchor_breaches: r.get::<i32, _>("anchor_breaches") as u32,
128            errors: r.get::<i32, _>("errors") as u32,
129            retries: r.get::<i32, _>("retries") as u32,
130            source_file: r.get("source_file"),
131            backend: r.get("backend"),
132            client_key: r.get("client_key"),
133            replay_of: r.get::<Option<i64>, _>("replay_of").map(|v| v as u64),
134            correlation_id: r.get("correlation_id"),
135            events: r.get("events"),
136            annotations: r.get("annotations"),
137        }).collect())
138    }
139
140    async fn get_trace(&self, trace_id: u64) -> Result<Option<TraceRow>, StorageError> {
141        let tid = crate::tenant::current_tenant_id();
142        let mut tx = begin_tenant_tx!(&self.pool, &tid, "get_trace");
143        let row = sqlx::query(
144            "SELECT tenant_id, trace_id, flow_name, status, steps_executed, latency_ms, \
145             tokens_input, tokens_output, anchor_checks, anchor_breaches, errors, retries, \
146             source_file, backend, client_key, replay_of, correlation_id, events, annotations \
147             FROM traces WHERE trace_id = $1"
148        )
149        .bind(trace_id as i64)
150        .fetch_optional(&mut *tx)
151        .await
152        .map_err(|e| StorageError::QueryError(format!("get_trace: {e}")))?;
153        commit_tx!(tx, "get_trace");
154        Ok(row.map(|r| TraceRow {
155            tenant_id: r.get("tenant_id"),
156            trace_id: r.get::<i64, _>("trace_id") as u64,
157            flow_name: r.get("flow_name"),
158            status: r.get("status"),
159            steps_executed: r.get::<i32, _>("steps_executed") as u32,
160            latency_ms: r.get::<i64, _>("latency_ms") as u64,
161            tokens_input: r.get::<i64, _>("tokens_input") as u64,
162            tokens_output: r.get::<i64, _>("tokens_output") as u64,
163            anchor_checks: r.get::<i32, _>("anchor_checks") as u32,
164            anchor_breaches: r.get::<i32, _>("anchor_breaches") as u32,
165            errors: r.get::<i32, _>("errors") as u32,
166            retries: r.get::<i32, _>("retries") as u32,
167            source_file: r.get("source_file"),
168            backend: r.get("backend"),
169            client_key: r.get("client_key"),
170            replay_of: r.get::<Option<i64>, _>("replay_of").map(|v| v as u64),
171            correlation_id: r.get("correlation_id"),
172            events: r.get("events"),
173            annotations: r.get("annotations"),
174        }))
175    }
176
177    async fn delete_traces(&self, ids: &[u64]) -> Result<u64, StorageError> {
178        let tid = crate::tenant::current_tenant_id();
179        let ids_i64: Vec<i64> = ids.iter().map(|&id| id as i64).collect();
180        let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_traces");
181        let result = sqlx::query("DELETE FROM traces WHERE trace_id = ANY($1)")
182            .bind(&ids_i64)
183            .execute(&mut *tx)
184            .await
185            .map_err(|e| StorageError::QueryError(format!("delete_traces: {e}")))?;
186        commit_tx!(tx, "delete_traces");
187        Ok(result.rows_affected())
188    }
189
190    // ── Sessions ──────────────────────────────────────────────────────────────
191
192    async fn save_session(&self, entry: &SessionRow) -> Result<(), StorageError> {
193        let tid = crate::tenant::current_tenant_id();
194        let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_session");
195        sqlx::query(
196            "INSERT INTO sessions (tenant_id, scope, key, value, source_step) \
197             VALUES ($1,$2,$3,$4,$5) \
198             ON CONFLICT (tenant_id, scope, key) DO UPDATE SET \
199             value = EXCLUDED.value, source_step = EXCLUDED.source_step"
200        )
201        .bind(&tid)
202        .bind(&entry.scope)
203        .bind(&entry.key)
204        .bind(&entry.value)
205        .bind(&entry.source_step)
206        .execute(&mut *tx)
207        .await
208        .map_err(|e| StorageError::QueryError(format!("save_session: {e}")))?;
209        commit_tx!(tx, "save_session");
210        Ok(())
211    }
212
213    async fn load_sessions(&self, scope: &str) -> Result<Vec<SessionRow>, StorageError> {
214        let tid = crate::tenant::current_tenant_id();
215        let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_sessions");
216        let rows = sqlx::query(
217            "SELECT tenant_id, scope, key, value, source_step FROM sessions WHERE scope = $1"
218        )
219        .bind(scope)
220        .fetch_all(&mut *tx)
221        .await
222        .map_err(|e| StorageError::QueryError(format!("load_sessions: {e}")))?;
223        commit_tx!(tx, "load_sessions");
224        Ok(rows.iter().map(|r| SessionRow {
225            tenant_id: r.get("tenant_id"),
226            scope: r.get("scope"),
227            key: r.get("key"),
228            value: r.get("value"),
229            source_step: r.get("source_step"),
230        }).collect())
231    }
232
233    async fn delete_session(&self, scope: &str, key: &str) -> Result<bool, StorageError> {
234        let tid = crate::tenant::current_tenant_id();
235        let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_session");
236        let result = sqlx::query("DELETE FROM sessions WHERE scope = $1 AND key = $2")
237            .bind(scope)
238            .bind(key)
239            .execute(&mut *tx)
240            .await
241            .map_err(|e| StorageError::QueryError(format!("delete_session: {e}")))?;
242        commit_tx!(tx, "delete_session");
243        Ok(result.rows_affected() > 0)
244    }
245
246    // ── Daemons ───────────────────────────────────────────────────────────────
247
248    async fn save_daemon(&self, daemon: &DaemonRow) -> Result<(), StorageError> {
249        let tid = crate::tenant::current_tenant_id();
250        let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_daemon");
251        sqlx::query(
252            "INSERT INTO daemons \
253             (tenant_id, name, state, source_file, flow_name, event_count, restart_count, \
254              trigger_topic, output_topic, lifecycle_events) \
255             VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) \
256             ON CONFLICT (tenant_id, name) DO UPDATE SET \
257             state = EXCLUDED.state, event_count = EXCLUDED.event_count, \
258             restart_count = EXCLUDED.restart_count, lifecycle_events = EXCLUDED.lifecycle_events"
259        )
260        .bind(&tid)
261        .bind(&daemon.name)
262        .bind(&daemon.state)
263        .bind(&daemon.source_file)
264        .bind(&daemon.flow_name)
265        .bind(daemon.event_count as i64)
266        .bind(daemon.restart_count as i32)
267        .bind(&daemon.trigger_topic)
268        .bind(&daemon.output_topic)
269        .bind(&daemon.lifecycle_events)
270        .execute(&mut *tx)
271        .await
272        .map_err(|e| StorageError::QueryError(format!("save_daemon: {e}")))?;
273        commit_tx!(tx, "save_daemon");
274        Ok(())
275    }
276
277    async fn load_daemons(&self) -> Result<Vec<DaemonRow>, StorageError> {
278        let tid = crate::tenant::current_tenant_id();
279        let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_daemons");
280        let rows = sqlx::query(
281            "SELECT tenant_id, name, state, source_file, flow_name, event_count, \
282             restart_count, trigger_topic, output_topic, lifecycle_events FROM daemons"
283        )
284        .fetch_all(&mut *tx)
285        .await
286        .map_err(|e| StorageError::QueryError(format!("load_daemons: {e}")))?;
287        commit_tx!(tx, "load_daemons");
288        Ok(rows.iter().map(|r| DaemonRow {
289            tenant_id: r.get("tenant_id"),
290            name: r.get("name"),
291            state: r.get("state"),
292            source_file: r.get("source_file"),
293            flow_name: r.get("flow_name"),
294            event_count: r.get::<i64, _>("event_count") as u64,
295            restart_count: r.get::<i32, _>("restart_count") as u32,
296            trigger_topic: r.get("trigger_topic"),
297            output_topic: r.get("output_topic"),
298            lifecycle_events: r.get("lifecycle_events"),
299        }).collect())
300    }
301
302    async fn delete_daemon(&self, name: &str) -> Result<bool, StorageError> {
303        let tid = crate::tenant::current_tenant_id();
304        let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_daemon");
305        let result = sqlx::query("DELETE FROM daemons WHERE name = $1")
306            .bind(name)
307            .execute(&mut *tx)
308            .await
309            .map_err(|e| StorageError::QueryError(format!("delete_daemon: {e}")))?;
310        commit_tx!(tx, "delete_daemon");
311        Ok(result.rows_affected() > 0)
312    }
313
314    // ── Audit ─────────────────────────────────────────────────────────────────
315
316    async fn append_audit(&self, entry: &AuditRow) -> Result<(), StorageError> {
317        let tid = crate::tenant::current_tenant_id();
318        let mut tx = begin_tenant_tx!(&self.pool, &tid, "append_audit");
319        sqlx::query(
320            "INSERT INTO audit_log (tenant_id, action, actor, target, detail) \
321             VALUES ($1,$2,$3,$4,$5)"
322        )
323        .bind(&tid)
324        .bind(&entry.action)
325        .bind(&entry.actor)
326        .bind(&entry.target)
327        .bind(&entry.detail)
328        .execute(&mut *tx)
329        .await
330        .map_err(|e| StorageError::QueryError(format!("append_audit: {e}")))?;
331        commit_tx!(tx, "append_audit");
332        Ok(())
333    }
334
335    async fn query_audit(&self, limit: usize) -> Result<Vec<AuditRow>, StorageError> {
336        let tid = crate::tenant::current_tenant_id();
337        let mut tx = begin_tenant_tx!(&self.pool, &tid, "query_audit");
338        let rows = sqlx::query(
339            "SELECT tenant_id, action, actor, target, detail \
340             FROM audit_log ORDER BY timestamp_utc DESC LIMIT $1"
341        )
342        .bind(limit as i64)
343        .fetch_all(&mut *tx)
344        .await
345        .map_err(|e| StorageError::QueryError(format!("query_audit: {e}")))?;
346        commit_tx!(tx, "query_audit");
347        Ok(rows.iter().map(|r| AuditRow {
348            tenant_id: r.get("tenant_id"),
349            action: r.get("action"),
350            actor: r.get("actor"),
351            target: r.get("target"),
352            detail: r.get("detail"),
353        }).collect())
354    }
355
356    // ── AxonStores ────────────────────────────────────────────────────────────
357
358    async fn save_axon_store(&self, store: &AxonStoreRow) -> Result<(), StorageError> {
359        let tid = crate::tenant::current_tenant_id();
360        let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_axon_store");
361        sqlx::query(
362            "INSERT INTO axon_stores (tenant_id, name, ontology, entries, created_at, total_ops) \
363             VALUES ($1,$2,$3,$4,$5,$6) \
364             ON CONFLICT (tenant_id, name) DO UPDATE SET \
365             entries = EXCLUDED.entries, total_ops = EXCLUDED.total_ops"
366        )
367        .bind(&tid)
368        .bind(&store.name)
369        .bind(&store.ontology)
370        .bind(&store.entries)
371        .bind(store.created_at as i64)
372        .bind(store.total_ops as i64)
373        .execute(&mut *tx)
374        .await
375        .map_err(|e| StorageError::QueryError(format!("save_axon_store: {e}")))?;
376        commit_tx!(tx, "save_axon_store");
377        Ok(())
378    }
379
380    async fn load_axon_stores(&self) -> Result<Vec<AxonStoreRow>, StorageError> {
381        let tid = crate::tenant::current_tenant_id();
382        let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_axon_stores");
383        let rows = sqlx::query(
384            "SELECT tenant_id, name, ontology, entries, created_at, total_ops FROM axon_stores"
385        )
386        .fetch_all(&mut *tx)
387        .await
388        .map_err(|e| StorageError::QueryError(format!("load_axon_stores: {e}")))?;
389        commit_tx!(tx, "load_axon_stores");
390        Ok(rows.iter().map(|r| AxonStoreRow {
391            tenant_id: r.get("tenant_id"),
392            name: r.get("name"),
393            ontology: r.get("ontology"),
394            entries: r.get("entries"),
395            created_at: r.get::<i64, _>("created_at") as u64,
396            total_ops: r.get::<i64, _>("total_ops") as u64,
397        }).collect())
398    }
399
400    async fn delete_axon_store(&self, name: &str) -> Result<bool, StorageError> {
401        let tid = crate::tenant::current_tenant_id();
402        let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_axon_store");
403        let result = sqlx::query("DELETE FROM axon_stores WHERE name = $1")
404            .bind(name)
405            .execute(&mut *tx)
406            .await
407            .map_err(|e| StorageError::QueryError(format!("delete_axon_store: {e}")))?;
408        commit_tx!(tx, "delete_axon_store");
409        Ok(result.rows_affected() > 0)
410    }
411
412    // ── Dataspaces ────────────────────────────────────────────────────────────
413
414    async fn save_dataspace(&self, ds: &DataspaceRow) -> Result<(), StorageError> {
415        let tid = crate::tenant::current_tenant_id();
416        let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_dataspace");
417        sqlx::query(
418            "INSERT INTO dataspaces \
419             (tenant_id, name, ontology, entries, associations, created_at, total_ops, next_id) \
420             VALUES ($1,$2,$3,$4,$5,$6,$7,$8) \
421             ON CONFLICT (tenant_id, name) DO UPDATE SET \
422             entries = EXCLUDED.entries, associations = EXCLUDED.associations, \
423             total_ops = EXCLUDED.total_ops, next_id = EXCLUDED.next_id"
424        )
425        .bind(&tid)
426        .bind(&ds.name)
427        .bind(&ds.ontology)
428        .bind(&ds.entries)
429        .bind(&ds.associations)
430        .bind(ds.created_at as i64)
431        .bind(ds.total_ops as i64)
432        .bind(ds.next_id as i64)
433        .execute(&mut *tx)
434        .await
435        .map_err(|e| StorageError::QueryError(format!("save_dataspace: {e}")))?;
436        commit_tx!(tx, "save_dataspace");
437        Ok(())
438    }
439
440    async fn load_dataspaces(&self) -> Result<Vec<DataspaceRow>, StorageError> {
441        let tid = crate::tenant::current_tenant_id();
442        let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_dataspaces");
443        let rows = sqlx::query(
444            "SELECT tenant_id, name, ontology, entries, associations, \
445             created_at, total_ops, next_id FROM dataspaces"
446        )
447        .fetch_all(&mut *tx)
448        .await
449        .map_err(|e| StorageError::QueryError(format!("load_dataspaces: {e}")))?;
450        commit_tx!(tx, "load_dataspaces");
451        Ok(rows.iter().map(|r| DataspaceRow {
452            tenant_id: r.get("tenant_id"),
453            name: r.get("name"),
454            ontology: r.get("ontology"),
455            entries: r.get("entries"),
456            associations: r.get("associations"),
457            created_at: r.get::<i64, _>("created_at") as u64,
458            total_ops: r.get::<i64, _>("total_ops") as u64,
459            next_id: r.get::<i64, _>("next_id") as u64,
460        }).collect())
461    }
462
463    async fn delete_dataspace(&self, name: &str) -> Result<bool, StorageError> {
464        let tid = crate::tenant::current_tenant_id();
465        let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_dataspace");
466        let result = sqlx::query("DELETE FROM dataspaces WHERE name = $1")
467            .bind(name)
468            .execute(&mut *tx)
469            .await
470            .map_err(|e| StorageError::QueryError(format!("delete_dataspace: {e}")))?;
471        commit_tx!(tx, "delete_dataspace");
472        Ok(result.rows_affected() > 0)
473    }
474
475    // ── Hibernations ──────────────────────────────────────────────────────────
476
477    async fn save_hibernation(&self, session: &HibernationRow) -> Result<(), StorageError> {
478        let tid = crate::tenant::current_tenant_id();
479        let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_hibernation");
480        sqlx::query(
481            "INSERT INTO hibernations \
482             (tenant_id, id, name, operation, status, checkpoints, resumed_from, \
483              created_at, last_status_change, next_checkpoint_id) \
484             VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) \
485             ON CONFLICT (tenant_id, id) DO UPDATE SET \
486             status = EXCLUDED.status, checkpoints = EXCLUDED.checkpoints, \
487             resumed_from = EXCLUDED.resumed_from, \
488             last_status_change = EXCLUDED.last_status_change, \
489             next_checkpoint_id = EXCLUDED.next_checkpoint_id"
490        )
491        .bind(&tid)
492        .bind(&session.id)
493        .bind(&session.name)
494        .bind(&session.operation)
495        .bind(&session.status)
496        .bind(&session.checkpoints)
497        .bind(session.resumed_from)
498        .bind(session.created_at as i64)
499        .bind(session.last_status_change as i64)
500        .bind(session.next_checkpoint_id as i32)
501        .execute(&mut *tx)
502        .await
503        .map_err(|e| StorageError::QueryError(format!("save_hibernation: {e}")))?;
504        commit_tx!(tx, "save_hibernation");
505        Ok(())
506    }
507
508    async fn load_hibernations(&self) -> Result<Vec<HibernationRow>, StorageError> {
509        let tid = crate::tenant::current_tenant_id();
510        let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_hibernations");
511        let rows = sqlx::query(
512            "SELECT tenant_id, id, name, operation, status, checkpoints, resumed_from, \
513             created_at, last_status_change, next_checkpoint_id FROM hibernations"
514        )
515        .fetch_all(&mut *tx)
516        .await
517        .map_err(|e| StorageError::QueryError(format!("load_hibernations: {e}")))?;
518        commit_tx!(tx, "load_hibernations");
519        Ok(rows.iter().map(|r| HibernationRow {
520            tenant_id: r.get("tenant_id"),
521            id: r.get("id"),
522            name: r.get("name"),
523            operation: r.get("operation"),
524            status: r.get("status"),
525            checkpoints: r.get("checkpoints"),
526            resumed_from: r.get("resumed_from"),
527            created_at: r.get::<i64, _>("created_at") as u64,
528            last_status_change: r.get::<i64, _>("last_status_change") as u64,
529            next_checkpoint_id: r.get::<i32, _>("next_checkpoint_id") as u32,
530        }).collect())
531    }
532
533    // ── Events ────────────────────────────────────────────────────────────────
534
535    async fn append_event(&self, event: &EventRow) -> Result<(), StorageError> {
536        let tid = crate::tenant::current_tenant_id();
537        let mut tx = begin_tenant_tx!(&self.pool, &tid, "append_event");
538        sqlx::query(
539            "INSERT INTO event_history (tenant_id, topic, source, payload) VALUES ($1,$2,$3,$4)"
540        )
541        .bind(&tid)
542        .bind(&event.topic)
543        .bind(&event.source)
544        .bind(&event.payload)
545        .execute(&mut *tx)
546        .await
547        .map_err(|e| StorageError::QueryError(format!("append_event: {e}")))?;
548        commit_tx!(tx, "append_event");
549        Ok(())
550    }
551
552    async fn query_events(&self, topic: Option<&str>, limit: usize) -> Result<Vec<EventRow>, StorageError> {
553        let tid = crate::tenant::current_tenant_id();
554        let mut tx = begin_tenant_tx!(&self.pool, &tid, "query_events");
555        let rows = match topic {
556            Some(t) => {
557                sqlx::query(
558                    "SELECT tenant_id, topic, source, payload FROM event_history \
559                     WHERE topic = $1 ORDER BY timestamp_utc DESC LIMIT $2"
560                )
561                .bind(t)
562                .bind(limit as i64)
563                .fetch_all(&mut *tx)
564                .await
565            }
566            None => {
567                sqlx::query(
568                    "SELECT tenant_id, topic, source, payload FROM event_history \
569                     ORDER BY timestamp_utc DESC LIMIT $1"
570                )
571                .bind(limit as i64)
572                .fetch_all(&mut *tx)
573                .await
574            }
575        }.map_err(|e| StorageError::QueryError(format!("query_events: {e}")))?;
576        commit_tx!(tx, "query_events");
577        Ok(rows.iter().map(|r| EventRow {
578            tenant_id: r.get("tenant_id"),
579            topic: r.get("topic"),
580            source: r.get("source"),
581            payload: r.get("payload"),
582        }).collect())
583    }
584
585    // ── Cache ─────────────────────────────────────────────────────────────────
586
587    async fn save_cache_entry(&self, entry: &CacheRow) -> Result<(), StorageError> {
588        let tid = crate::tenant::current_tenant_id();
589        let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_cache_entry");
590        sqlx::query(
591            "INSERT INTO execution_cache \
592             (tenant_id, flow_name, cache_key, result, ttl_secs, hit_count) \
593             VALUES ($1,$2,$3,$4,$5,$6) \
594             ON CONFLICT (tenant_id, cache_key) DO UPDATE SET \
595             result = EXCLUDED.result, ttl_secs = EXCLUDED.ttl_secs, \
596             hit_count = EXCLUDED.hit_count"
597        )
598        .bind(&tid)
599        .bind(&entry.flow_name)
600        .bind(&entry.cache_key)
601        .bind(&entry.result)
602        .bind(entry.ttl_secs)
603        .bind(entry.hit_count as i64)
604        .execute(&mut *tx)
605        .await
606        .map_err(|e| StorageError::QueryError(format!("save_cache_entry: {e}")))?;
607        commit_tx!(tx, "save_cache_entry");
608        Ok(())
609    }
610
611    async fn load_cache_entries(&self) -> Result<Vec<CacheRow>, StorageError> {
612        let tid = crate::tenant::current_tenant_id();
613        let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_cache_entries");
614        let rows = sqlx::query(
615            "SELECT tenant_id, flow_name, cache_key, result, ttl_secs, hit_count \
616             FROM execution_cache"
617        )
618        .fetch_all(&mut *tx)
619        .await
620        .map_err(|e| StorageError::QueryError(format!("load_cache_entries: {e}")))?;
621        commit_tx!(tx, "load_cache_entries");
622        Ok(rows.iter().map(|r| CacheRow {
623            tenant_id: r.get("tenant_id"),
624            flow_name: r.get("flow_name"),
625            cache_key: r.get("cache_key"),
626            result: r.get("result"),
627            ttl_secs: r.get("ttl_secs"),
628            hit_count: r.get::<i64, _>("hit_count") as u64,
629        }).collect())
630    }
631
632    async fn evict_expired_cache(&self) -> Result<u64, StorageError> {
633        let tid = crate::tenant::current_tenant_id();
634        let mut tx = begin_tenant_tx!(&self.pool, &tid, "evict_expired_cache");
635        let result = sqlx::query(
636            "DELETE FROM execution_cache WHERE ttl_secs IS NOT NULL AND \
637             created_at + (ttl_secs || ' seconds')::interval < NOW()"
638        )
639        .execute(&mut *tx)
640        .await
641        .map_err(|e| StorageError::QueryError(format!("evict_expired_cache: {e}")))?;
642        commit_tx!(tx, "evict_expired_cache");
643        Ok(result.rows_affected())
644    }
645
646    // ── Cost tracking ─────────────────────────────────────────────────────────
647
648    async fn record_cost(&self, cost: &CostRow) -> Result<(), StorageError> {
649        let tid = crate::tenant::current_tenant_id();
650        let mut tx = begin_tenant_tx!(&self.pool, &tid, "record_cost");
651        sqlx::query(
652            "INSERT INTO cost_tracking \
653             (tenant_id, flow_name, backend, input_tokens, output_tokens, cost_usd) \
654             VALUES ($1,$2,$3,$4,$5,$6)"
655        )
656        .bind(&tid)
657        .bind(&cost.flow_name)
658        .bind(&cost.backend)
659        .bind(cost.input_tokens as i64)
660        .bind(cost.output_tokens as i64)
661        .bind(cost.cost_usd)
662        .execute(&mut *tx)
663        .await
664        .map_err(|e| StorageError::QueryError(format!("record_cost: {e}")))?;
665        commit_tx!(tx, "record_cost");
666        Ok(())
667    }
668
669    async fn query_costs(&self, flow: Option<&str>, limit: usize) -> Result<Vec<CostRow>, StorageError> {
670        let tid = crate::tenant::current_tenant_id();
671        let mut tx = begin_tenant_tx!(&self.pool, &tid, "query_costs");
672        let rows = match flow {
673            Some(f) => {
674                sqlx::query(
675                    "SELECT tenant_id, flow_name, backend, input_tokens, output_tokens, cost_usd \
676                     FROM cost_tracking WHERE flow_name = $1 \
677                     ORDER BY timestamp_utc DESC LIMIT $2"
678                )
679                .bind(f)
680                .bind(limit as i64)
681                .fetch_all(&mut *tx)
682                .await
683            }
684            None => {
685                sqlx::query(
686                    "SELECT tenant_id, flow_name, backend, input_tokens, output_tokens, cost_usd \
687                     FROM cost_tracking ORDER BY timestamp_utc DESC LIMIT $1"
688                )
689                .bind(limit as i64)
690                .fetch_all(&mut *tx)
691                .await
692            }
693        }.map_err(|e| StorageError::QueryError(format!("query_costs: {e}")))?;
694        commit_tx!(tx, "query_costs");
695        Ok(rows.iter().map(|r| CostRow {
696            tenant_id: r.get("tenant_id"),
697            flow_name: r.get("flow_name"),
698            backend: r.get("backend"),
699            input_tokens: r.get::<i64, _>("input_tokens") as u64,
700            output_tokens: r.get::<i64, _>("output_tokens") as u64,
701            cost_usd: r.get("cost_usd"),
702        }).collect())
703    }
704
705    // ── Schedules ─────────────────────────────────────────────────────────────
706
707    async fn save_schedule(&self, schedule: &ScheduleRow) -> Result<(), StorageError> {
708        let tid = crate::tenant::current_tenant_id();
709        let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_schedule");
710        sqlx::query(
711            "INSERT INTO schedules \
712             (tenant_id, name, flow_name, interval_secs, enabled, backend, \
713              last_run, next_run, run_count, error_count) \
714             VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) \
715             ON CONFLICT (tenant_id, name) DO UPDATE SET \
716             enabled = EXCLUDED.enabled, last_run = EXCLUDED.last_run, \
717             next_run = EXCLUDED.next_run, run_count = EXCLUDED.run_count, \
718             error_count = EXCLUDED.error_count"
719        )
720        .bind(&tid)
721        .bind(&schedule.name)
722        .bind(&schedule.flow_name)
723        .bind(schedule.interval_secs as i64)
724        .bind(schedule.enabled)
725        .bind(&schedule.backend)
726        .bind(schedule.last_run as i64)
727        .bind(schedule.next_run as i64)
728        .bind(schedule.run_count as i64)
729        .bind(schedule.error_count as i64)
730        .execute(&mut *tx)
731        .await
732        .map_err(|e| StorageError::QueryError(format!("save_schedule: {e}")))?;
733        commit_tx!(tx, "save_schedule");
734        Ok(())
735    }
736
737    async fn load_schedules(&self) -> Result<Vec<ScheduleRow>, StorageError> {
738        let tid = crate::tenant::current_tenant_id();
739        let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_schedules");
740        let rows = sqlx::query(
741            "SELECT tenant_id, name, flow_name, interval_secs, enabled, backend, \
742             last_run, next_run, run_count, error_count FROM schedules"
743        )
744        .fetch_all(&mut *tx)
745        .await
746        .map_err(|e| StorageError::QueryError(format!("load_schedules: {e}")))?;
747        commit_tx!(tx, "load_schedules");
748        Ok(rows.iter().map(|r| ScheduleRow {
749            tenant_id: r.get("tenant_id"),
750            name: r.get("name"),
751            flow_name: r.get("flow_name"),
752            interval_secs: r.get::<i64, _>("interval_secs") as u64,
753            enabled: r.get("enabled"),
754            backend: r.get("backend"),
755            last_run: r.get::<i64, _>("last_run") as u64,
756            next_run: r.get::<i64, _>("next_run") as u64,
757            run_count: r.get::<i64, _>("run_count") as u64,
758            error_count: r.get::<i64, _>("error_count") as u64,
759        }).collect())
760    }
761
762    async fn delete_schedule(&self, name: &str) -> Result<bool, StorageError> {
763        let tid = crate::tenant::current_tenant_id();
764        let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_schedule");
765        let result = sqlx::query("DELETE FROM schedules WHERE name = $1")
766            .bind(name)
767            .execute(&mut *tx)
768            .await
769            .map_err(|e| StorageError::QueryError(format!("delete_schedule: {e}")))?;
770        commit_tx!(tx, "delete_schedule");
771        Ok(result.rows_affected() > 0)
772    }
773
774    // ── Health ────────────────────────────────────────────────────────────────
775
776    async fn is_healthy(&self) -> bool {
777        crate::db_pool::check_health(&self.pool).await
778    }
779}