1use sqlx::PgPool;
15use sqlx::Row;
16use crate::storage::*;
17
/// Opens a transaction on `$pool` and scopes it to `$tenant`.
///
/// Expands to an expression yielding the open transaction. The expansion uses
/// `.await` and `?`, so it must be invoked inside an `async` function whose
/// error type is `StorageError` (or convertible from it).
///
/// * `$pool`   – expression yielding something with an async `begin()` (the pool).
/// * `$tenant` – tenant identifier stored in the `axon.current_tenant` setting.
/// * `$op`     – literal operation name used as a prefix in error messages.
///
/// # Errors
/// * `StorageError::ConnectionError` when the transaction cannot be started.
/// * `StorageError::QueryError` when the tenant setting cannot be applied.
macro_rules! begin_tenant_tx {
    ($pool:expr, $tenant:expr, $op:literal) => {{
        let mut tx = $pool.begin().await.map_err(|e| {
            StorageError::ConnectionError(format!("{}: begin tx: {e}", $op))
        })?;
        // `SET LOCAL name = $1` is a utility statement and PostgreSQL rejects bind
        // parameters in it. `set_config(name, value, is_local => true)` is the
        // parameterizable equivalent and is likewise limited to this transaction.
        // The `::text` cast keeps the call valid when the tenant id is bound as a
        // non-text type.
        sqlx::query("SELECT set_config('axon.current_tenant', $1::text, true)")
            .bind($tenant)
            .execute(&mut *tx)
            .await
            .map_err(|e| StorageError::QueryError(format!("{}: set_tenant: {e}", $op)))?;
        tx
    }};
}
35
/// Commits the transaction `$tx`.
///
/// A commit failure is propagated with `?` as `StorageError::QueryError`,
/// prefixed with the literal operation name `$op`. Must be invoked inside an
/// `async` function because the expansion awaits the commit.
macro_rules! commit_tx {
    ($tx:expr, $op:literal) => {{
        let committed = $tx.commit().await;
        committed.map_err(|e| StorageError::QueryError(format!("{}: commit: {e}", $op)))?
    }};
}
43
/// `StorageBackend` implementation that persists data through a PostgreSQL
/// connection pool.
pub struct PostgresBackend {
    /// Pool from which every operation of this backend opens its transaction.
    pub pool: PgPool,
}
50
51impl PostgresBackend {
52 pub fn new(pool: PgPool) -> Self {
53 Self { pool }
54 }
55}
56
57impl StorageBackend for PostgresBackend {
58
59 async fn save_trace(&self, trace: &TraceRow) -> Result<(), StorageError> {
62 let tid = crate::tenant::current_tenant_id();
63 let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_trace");
64 sqlx::query(
65 "INSERT INTO traces \
66 (tenant_id, trace_id, flow_name, status, steps_executed, latency_ms, \
67 tokens_input, tokens_output, anchor_checks, anchor_breaches, errors, retries, \
68 source_file, backend, client_key, replay_of, correlation_id, events, annotations) \
69 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19) \
70 ON CONFLICT (tenant_id, trace_id) DO UPDATE SET \
71 status = EXCLUDED.status, steps_executed = EXCLUDED.steps_executed, \
72 latency_ms = EXCLUDED.latency_ms, tokens_input = EXCLUDED.tokens_input, \
73 tokens_output = EXCLUDED.tokens_output, events = EXCLUDED.events, \
74 annotations = EXCLUDED.annotations"
75 )
76 .bind(&tid)
77 .bind(trace.trace_id as i64)
78 .bind(&trace.flow_name)
79 .bind(&trace.status)
80 .bind(trace.steps_executed as i32)
81 .bind(trace.latency_ms as i64)
82 .bind(trace.tokens_input as i64)
83 .bind(trace.tokens_output as i64)
84 .bind(trace.anchor_checks as i32)
85 .bind(trace.anchor_breaches as i32)
86 .bind(trace.errors as i32)
87 .bind(trace.retries as i32)
88 .bind(&trace.source_file)
89 .bind(&trace.backend)
90 .bind(&trace.client_key)
91 .bind(trace.replay_of.map(|v| v as i64))
92 .bind(&trace.correlation_id)
93 .bind(&trace.events)
94 .bind(&trace.annotations)
95 .execute(&mut *tx)
96 .await
97 .map_err(|e| StorageError::QueryError(format!("save_trace: {e}")))?;
98 commit_tx!(tx, "save_trace");
99 Ok(())
100 }
101
102 async fn load_traces(&self, limit: usize, offset: usize) -> Result<Vec<TraceRow>, StorageError> {
103 let tid = crate::tenant::current_tenant_id();
104 let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_traces");
105 let rows = sqlx::query(
106 "SELECT tenant_id, trace_id, flow_name, status, steps_executed, latency_ms, \
107 tokens_input, tokens_output, anchor_checks, anchor_breaches, errors, retries, \
108 source_file, backend, client_key, replay_of, correlation_id, events, annotations \
109 FROM traces ORDER BY timestamp_utc DESC LIMIT $1 OFFSET $2"
110 )
111 .bind(limit as i64)
112 .bind(offset as i64)
113 .fetch_all(&mut *tx)
114 .await
115 .map_err(|e| StorageError::QueryError(format!("load_traces: {e}")))?;
116 commit_tx!(tx, "load_traces");
117 Ok(rows.iter().map(|r| TraceRow {
118 tenant_id: r.get("tenant_id"),
119 trace_id: r.get::<i64, _>("trace_id") as u64,
120 flow_name: r.get("flow_name"),
121 status: r.get("status"),
122 steps_executed: r.get::<i32, _>("steps_executed") as u32,
123 latency_ms: r.get::<i64, _>("latency_ms") as u64,
124 tokens_input: r.get::<i64, _>("tokens_input") as u64,
125 tokens_output: r.get::<i64, _>("tokens_output") as u64,
126 anchor_checks: r.get::<i32, _>("anchor_checks") as u32,
127 anchor_breaches: r.get::<i32, _>("anchor_breaches") as u32,
128 errors: r.get::<i32, _>("errors") as u32,
129 retries: r.get::<i32, _>("retries") as u32,
130 source_file: r.get("source_file"),
131 backend: r.get("backend"),
132 client_key: r.get("client_key"),
133 replay_of: r.get::<Option<i64>, _>("replay_of").map(|v| v as u64),
134 correlation_id: r.get("correlation_id"),
135 events: r.get("events"),
136 annotations: r.get("annotations"),
137 }).collect())
138 }
139
140 async fn get_trace(&self, trace_id: u64) -> Result<Option<TraceRow>, StorageError> {
141 let tid = crate::tenant::current_tenant_id();
142 let mut tx = begin_tenant_tx!(&self.pool, &tid, "get_trace");
143 let row = sqlx::query(
144 "SELECT tenant_id, trace_id, flow_name, status, steps_executed, latency_ms, \
145 tokens_input, tokens_output, anchor_checks, anchor_breaches, errors, retries, \
146 source_file, backend, client_key, replay_of, correlation_id, events, annotations \
147 FROM traces WHERE trace_id = $1"
148 )
149 .bind(trace_id as i64)
150 .fetch_optional(&mut *tx)
151 .await
152 .map_err(|e| StorageError::QueryError(format!("get_trace: {e}")))?;
153 commit_tx!(tx, "get_trace");
154 Ok(row.map(|r| TraceRow {
155 tenant_id: r.get("tenant_id"),
156 trace_id: r.get::<i64, _>("trace_id") as u64,
157 flow_name: r.get("flow_name"),
158 status: r.get("status"),
159 steps_executed: r.get::<i32, _>("steps_executed") as u32,
160 latency_ms: r.get::<i64, _>("latency_ms") as u64,
161 tokens_input: r.get::<i64, _>("tokens_input") as u64,
162 tokens_output: r.get::<i64, _>("tokens_output") as u64,
163 anchor_checks: r.get::<i32, _>("anchor_checks") as u32,
164 anchor_breaches: r.get::<i32, _>("anchor_breaches") as u32,
165 errors: r.get::<i32, _>("errors") as u32,
166 retries: r.get::<i32, _>("retries") as u32,
167 source_file: r.get("source_file"),
168 backend: r.get("backend"),
169 client_key: r.get("client_key"),
170 replay_of: r.get::<Option<i64>, _>("replay_of").map(|v| v as u64),
171 correlation_id: r.get("correlation_id"),
172 events: r.get("events"),
173 annotations: r.get("annotations"),
174 }))
175 }
176
177 async fn delete_traces(&self, ids: &[u64]) -> Result<u64, StorageError> {
178 let tid = crate::tenant::current_tenant_id();
179 let ids_i64: Vec<i64> = ids.iter().map(|&id| id as i64).collect();
180 let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_traces");
181 let result = sqlx::query("DELETE FROM traces WHERE trace_id = ANY($1)")
182 .bind(&ids_i64)
183 .execute(&mut *tx)
184 .await
185 .map_err(|e| StorageError::QueryError(format!("delete_traces: {e}")))?;
186 commit_tx!(tx, "delete_traces");
187 Ok(result.rows_affected())
188 }
189
190 async fn save_session(&self, entry: &SessionRow) -> Result<(), StorageError> {
193 let tid = crate::tenant::current_tenant_id();
194 let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_session");
195 sqlx::query(
196 "INSERT INTO sessions (tenant_id, scope, key, value, source_step) \
197 VALUES ($1,$2,$3,$4,$5) \
198 ON CONFLICT (tenant_id, scope, key) DO UPDATE SET \
199 value = EXCLUDED.value, source_step = EXCLUDED.source_step"
200 )
201 .bind(&tid)
202 .bind(&entry.scope)
203 .bind(&entry.key)
204 .bind(&entry.value)
205 .bind(&entry.source_step)
206 .execute(&mut *tx)
207 .await
208 .map_err(|e| StorageError::QueryError(format!("save_session: {e}")))?;
209 commit_tx!(tx, "save_session");
210 Ok(())
211 }
212
213 async fn load_sessions(&self, scope: &str) -> Result<Vec<SessionRow>, StorageError> {
214 let tid = crate::tenant::current_tenant_id();
215 let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_sessions");
216 let rows = sqlx::query(
217 "SELECT tenant_id, scope, key, value, source_step FROM sessions WHERE scope = $1"
218 )
219 .bind(scope)
220 .fetch_all(&mut *tx)
221 .await
222 .map_err(|e| StorageError::QueryError(format!("load_sessions: {e}")))?;
223 commit_tx!(tx, "load_sessions");
224 Ok(rows.iter().map(|r| SessionRow {
225 tenant_id: r.get("tenant_id"),
226 scope: r.get("scope"),
227 key: r.get("key"),
228 value: r.get("value"),
229 source_step: r.get("source_step"),
230 }).collect())
231 }
232
233 async fn delete_session(&self, scope: &str, key: &str) -> Result<bool, StorageError> {
234 let tid = crate::tenant::current_tenant_id();
235 let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_session");
236 let result = sqlx::query("DELETE FROM sessions WHERE scope = $1 AND key = $2")
237 .bind(scope)
238 .bind(key)
239 .execute(&mut *tx)
240 .await
241 .map_err(|e| StorageError::QueryError(format!("delete_session: {e}")))?;
242 commit_tx!(tx, "delete_session");
243 Ok(result.rows_affected() > 0)
244 }
245
246 async fn save_daemon(&self, daemon: &DaemonRow) -> Result<(), StorageError> {
249 let tid = crate::tenant::current_tenant_id();
250 let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_daemon");
251 sqlx::query(
252 "INSERT INTO daemons \
253 (tenant_id, name, state, source_file, flow_name, event_count, restart_count, \
254 trigger_topic, output_topic, lifecycle_events) \
255 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) \
256 ON CONFLICT (tenant_id, name) DO UPDATE SET \
257 state = EXCLUDED.state, event_count = EXCLUDED.event_count, \
258 restart_count = EXCLUDED.restart_count, lifecycle_events = EXCLUDED.lifecycle_events"
259 )
260 .bind(&tid)
261 .bind(&daemon.name)
262 .bind(&daemon.state)
263 .bind(&daemon.source_file)
264 .bind(&daemon.flow_name)
265 .bind(daemon.event_count as i64)
266 .bind(daemon.restart_count as i32)
267 .bind(&daemon.trigger_topic)
268 .bind(&daemon.output_topic)
269 .bind(&daemon.lifecycle_events)
270 .execute(&mut *tx)
271 .await
272 .map_err(|e| StorageError::QueryError(format!("save_daemon: {e}")))?;
273 commit_tx!(tx, "save_daemon");
274 Ok(())
275 }
276
277 async fn load_daemons(&self) -> Result<Vec<DaemonRow>, StorageError> {
278 let tid = crate::tenant::current_tenant_id();
279 let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_daemons");
280 let rows = sqlx::query(
281 "SELECT tenant_id, name, state, source_file, flow_name, event_count, \
282 restart_count, trigger_topic, output_topic, lifecycle_events FROM daemons"
283 )
284 .fetch_all(&mut *tx)
285 .await
286 .map_err(|e| StorageError::QueryError(format!("load_daemons: {e}")))?;
287 commit_tx!(tx, "load_daemons");
288 Ok(rows.iter().map(|r| DaemonRow {
289 tenant_id: r.get("tenant_id"),
290 name: r.get("name"),
291 state: r.get("state"),
292 source_file: r.get("source_file"),
293 flow_name: r.get("flow_name"),
294 event_count: r.get::<i64, _>("event_count") as u64,
295 restart_count: r.get::<i32, _>("restart_count") as u32,
296 trigger_topic: r.get("trigger_topic"),
297 output_topic: r.get("output_topic"),
298 lifecycle_events: r.get("lifecycle_events"),
299 }).collect())
300 }
301
302 async fn delete_daemon(&self, name: &str) -> Result<bool, StorageError> {
303 let tid = crate::tenant::current_tenant_id();
304 let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_daemon");
305 let result = sqlx::query("DELETE FROM daemons WHERE name = $1")
306 .bind(name)
307 .execute(&mut *tx)
308 .await
309 .map_err(|e| StorageError::QueryError(format!("delete_daemon: {e}")))?;
310 commit_tx!(tx, "delete_daemon");
311 Ok(result.rows_affected() > 0)
312 }
313
314 async fn append_audit(&self, entry: &AuditRow) -> Result<(), StorageError> {
317 let tid = crate::tenant::current_tenant_id();
318 let mut tx = begin_tenant_tx!(&self.pool, &tid, "append_audit");
319 sqlx::query(
320 "INSERT INTO audit_log (tenant_id, action, actor, target, detail) \
321 VALUES ($1,$2,$3,$4,$5)"
322 )
323 .bind(&tid)
324 .bind(&entry.action)
325 .bind(&entry.actor)
326 .bind(&entry.target)
327 .bind(&entry.detail)
328 .execute(&mut *tx)
329 .await
330 .map_err(|e| StorageError::QueryError(format!("append_audit: {e}")))?;
331 commit_tx!(tx, "append_audit");
332 Ok(())
333 }
334
335 async fn query_audit(&self, limit: usize) -> Result<Vec<AuditRow>, StorageError> {
336 let tid = crate::tenant::current_tenant_id();
337 let mut tx = begin_tenant_tx!(&self.pool, &tid, "query_audit");
338 let rows = sqlx::query(
339 "SELECT tenant_id, action, actor, target, detail \
340 FROM audit_log ORDER BY timestamp_utc DESC LIMIT $1"
341 )
342 .bind(limit as i64)
343 .fetch_all(&mut *tx)
344 .await
345 .map_err(|e| StorageError::QueryError(format!("query_audit: {e}")))?;
346 commit_tx!(tx, "query_audit");
347 Ok(rows.iter().map(|r| AuditRow {
348 tenant_id: r.get("tenant_id"),
349 action: r.get("action"),
350 actor: r.get("actor"),
351 target: r.get("target"),
352 detail: r.get("detail"),
353 }).collect())
354 }
355
356 async fn save_axon_store(&self, store: &AxonStoreRow) -> Result<(), StorageError> {
359 let tid = crate::tenant::current_tenant_id();
360 let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_axon_store");
361 sqlx::query(
362 "INSERT INTO axon_stores (tenant_id, name, ontology, entries, created_at, total_ops) \
363 VALUES ($1,$2,$3,$4,$5,$6) \
364 ON CONFLICT (tenant_id, name) DO UPDATE SET \
365 entries = EXCLUDED.entries, total_ops = EXCLUDED.total_ops"
366 )
367 .bind(&tid)
368 .bind(&store.name)
369 .bind(&store.ontology)
370 .bind(&store.entries)
371 .bind(store.created_at as i64)
372 .bind(store.total_ops as i64)
373 .execute(&mut *tx)
374 .await
375 .map_err(|e| StorageError::QueryError(format!("save_axon_store: {e}")))?;
376 commit_tx!(tx, "save_axon_store");
377 Ok(())
378 }
379
380 async fn load_axon_stores(&self) -> Result<Vec<AxonStoreRow>, StorageError> {
381 let tid = crate::tenant::current_tenant_id();
382 let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_axon_stores");
383 let rows = sqlx::query(
384 "SELECT tenant_id, name, ontology, entries, created_at, total_ops FROM axon_stores"
385 )
386 .fetch_all(&mut *tx)
387 .await
388 .map_err(|e| StorageError::QueryError(format!("load_axon_stores: {e}")))?;
389 commit_tx!(tx, "load_axon_stores");
390 Ok(rows.iter().map(|r| AxonStoreRow {
391 tenant_id: r.get("tenant_id"),
392 name: r.get("name"),
393 ontology: r.get("ontology"),
394 entries: r.get("entries"),
395 created_at: r.get::<i64, _>("created_at") as u64,
396 total_ops: r.get::<i64, _>("total_ops") as u64,
397 }).collect())
398 }
399
400 async fn delete_axon_store(&self, name: &str) -> Result<bool, StorageError> {
401 let tid = crate::tenant::current_tenant_id();
402 let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_axon_store");
403 let result = sqlx::query("DELETE FROM axon_stores WHERE name = $1")
404 .bind(name)
405 .execute(&mut *tx)
406 .await
407 .map_err(|e| StorageError::QueryError(format!("delete_axon_store: {e}")))?;
408 commit_tx!(tx, "delete_axon_store");
409 Ok(result.rows_affected() > 0)
410 }
411
412 async fn save_dataspace(&self, ds: &DataspaceRow) -> Result<(), StorageError> {
415 let tid = crate::tenant::current_tenant_id();
416 let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_dataspace");
417 sqlx::query(
418 "INSERT INTO dataspaces \
419 (tenant_id, name, ontology, entries, associations, created_at, total_ops, next_id) \
420 VALUES ($1,$2,$3,$4,$5,$6,$7,$8) \
421 ON CONFLICT (tenant_id, name) DO UPDATE SET \
422 entries = EXCLUDED.entries, associations = EXCLUDED.associations, \
423 total_ops = EXCLUDED.total_ops, next_id = EXCLUDED.next_id"
424 )
425 .bind(&tid)
426 .bind(&ds.name)
427 .bind(&ds.ontology)
428 .bind(&ds.entries)
429 .bind(&ds.associations)
430 .bind(ds.created_at as i64)
431 .bind(ds.total_ops as i64)
432 .bind(ds.next_id as i64)
433 .execute(&mut *tx)
434 .await
435 .map_err(|e| StorageError::QueryError(format!("save_dataspace: {e}")))?;
436 commit_tx!(tx, "save_dataspace");
437 Ok(())
438 }
439
440 async fn load_dataspaces(&self) -> Result<Vec<DataspaceRow>, StorageError> {
441 let tid = crate::tenant::current_tenant_id();
442 let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_dataspaces");
443 let rows = sqlx::query(
444 "SELECT tenant_id, name, ontology, entries, associations, \
445 created_at, total_ops, next_id FROM dataspaces"
446 )
447 .fetch_all(&mut *tx)
448 .await
449 .map_err(|e| StorageError::QueryError(format!("load_dataspaces: {e}")))?;
450 commit_tx!(tx, "load_dataspaces");
451 Ok(rows.iter().map(|r| DataspaceRow {
452 tenant_id: r.get("tenant_id"),
453 name: r.get("name"),
454 ontology: r.get("ontology"),
455 entries: r.get("entries"),
456 associations: r.get("associations"),
457 created_at: r.get::<i64, _>("created_at") as u64,
458 total_ops: r.get::<i64, _>("total_ops") as u64,
459 next_id: r.get::<i64, _>("next_id") as u64,
460 }).collect())
461 }
462
463 async fn delete_dataspace(&self, name: &str) -> Result<bool, StorageError> {
464 let tid = crate::tenant::current_tenant_id();
465 let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_dataspace");
466 let result = sqlx::query("DELETE FROM dataspaces WHERE name = $1")
467 .bind(name)
468 .execute(&mut *tx)
469 .await
470 .map_err(|e| StorageError::QueryError(format!("delete_dataspace: {e}")))?;
471 commit_tx!(tx, "delete_dataspace");
472 Ok(result.rows_affected() > 0)
473 }
474
475 async fn save_hibernation(&self, session: &HibernationRow) -> Result<(), StorageError> {
478 let tid = crate::tenant::current_tenant_id();
479 let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_hibernation");
480 sqlx::query(
481 "INSERT INTO hibernations \
482 (tenant_id, id, name, operation, status, checkpoints, resumed_from, \
483 created_at, last_status_change, next_checkpoint_id) \
484 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) \
485 ON CONFLICT (tenant_id, id) DO UPDATE SET \
486 status = EXCLUDED.status, checkpoints = EXCLUDED.checkpoints, \
487 resumed_from = EXCLUDED.resumed_from, \
488 last_status_change = EXCLUDED.last_status_change, \
489 next_checkpoint_id = EXCLUDED.next_checkpoint_id"
490 )
491 .bind(&tid)
492 .bind(&session.id)
493 .bind(&session.name)
494 .bind(&session.operation)
495 .bind(&session.status)
496 .bind(&session.checkpoints)
497 .bind(session.resumed_from)
498 .bind(session.created_at as i64)
499 .bind(session.last_status_change as i64)
500 .bind(session.next_checkpoint_id as i32)
501 .execute(&mut *tx)
502 .await
503 .map_err(|e| StorageError::QueryError(format!("save_hibernation: {e}")))?;
504 commit_tx!(tx, "save_hibernation");
505 Ok(())
506 }
507
508 async fn load_hibernations(&self) -> Result<Vec<HibernationRow>, StorageError> {
509 let tid = crate::tenant::current_tenant_id();
510 let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_hibernations");
511 let rows = sqlx::query(
512 "SELECT tenant_id, id, name, operation, status, checkpoints, resumed_from, \
513 created_at, last_status_change, next_checkpoint_id FROM hibernations"
514 )
515 .fetch_all(&mut *tx)
516 .await
517 .map_err(|e| StorageError::QueryError(format!("load_hibernations: {e}")))?;
518 commit_tx!(tx, "load_hibernations");
519 Ok(rows.iter().map(|r| HibernationRow {
520 tenant_id: r.get("tenant_id"),
521 id: r.get("id"),
522 name: r.get("name"),
523 operation: r.get("operation"),
524 status: r.get("status"),
525 checkpoints: r.get("checkpoints"),
526 resumed_from: r.get("resumed_from"),
527 created_at: r.get::<i64, _>("created_at") as u64,
528 last_status_change: r.get::<i64, _>("last_status_change") as u64,
529 next_checkpoint_id: r.get::<i32, _>("next_checkpoint_id") as u32,
530 }).collect())
531 }
532
533 async fn append_event(&self, event: &EventRow) -> Result<(), StorageError> {
536 let tid = crate::tenant::current_tenant_id();
537 let mut tx = begin_tenant_tx!(&self.pool, &tid, "append_event");
538 sqlx::query(
539 "INSERT INTO event_history (tenant_id, topic, source, payload) VALUES ($1,$2,$3,$4)"
540 )
541 .bind(&tid)
542 .bind(&event.topic)
543 .bind(&event.source)
544 .bind(&event.payload)
545 .execute(&mut *tx)
546 .await
547 .map_err(|e| StorageError::QueryError(format!("append_event: {e}")))?;
548 commit_tx!(tx, "append_event");
549 Ok(())
550 }
551
552 async fn query_events(&self, topic: Option<&str>, limit: usize) -> Result<Vec<EventRow>, StorageError> {
553 let tid = crate::tenant::current_tenant_id();
554 let mut tx = begin_tenant_tx!(&self.pool, &tid, "query_events");
555 let rows = match topic {
556 Some(t) => {
557 sqlx::query(
558 "SELECT tenant_id, topic, source, payload FROM event_history \
559 WHERE topic = $1 ORDER BY timestamp_utc DESC LIMIT $2"
560 )
561 .bind(t)
562 .bind(limit as i64)
563 .fetch_all(&mut *tx)
564 .await
565 }
566 None => {
567 sqlx::query(
568 "SELECT tenant_id, topic, source, payload FROM event_history \
569 ORDER BY timestamp_utc DESC LIMIT $1"
570 )
571 .bind(limit as i64)
572 .fetch_all(&mut *tx)
573 .await
574 }
575 }.map_err(|e| StorageError::QueryError(format!("query_events: {e}")))?;
576 commit_tx!(tx, "query_events");
577 Ok(rows.iter().map(|r| EventRow {
578 tenant_id: r.get("tenant_id"),
579 topic: r.get("topic"),
580 source: r.get("source"),
581 payload: r.get("payload"),
582 }).collect())
583 }
584
585 async fn save_cache_entry(&self, entry: &CacheRow) -> Result<(), StorageError> {
588 let tid = crate::tenant::current_tenant_id();
589 let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_cache_entry");
590 sqlx::query(
591 "INSERT INTO execution_cache \
592 (tenant_id, flow_name, cache_key, result, ttl_secs, hit_count) \
593 VALUES ($1,$2,$3,$4,$5,$6) \
594 ON CONFLICT (tenant_id, cache_key) DO UPDATE SET \
595 result = EXCLUDED.result, ttl_secs = EXCLUDED.ttl_secs, \
596 hit_count = EXCLUDED.hit_count"
597 )
598 .bind(&tid)
599 .bind(&entry.flow_name)
600 .bind(&entry.cache_key)
601 .bind(&entry.result)
602 .bind(entry.ttl_secs)
603 .bind(entry.hit_count as i64)
604 .execute(&mut *tx)
605 .await
606 .map_err(|e| StorageError::QueryError(format!("save_cache_entry: {e}")))?;
607 commit_tx!(tx, "save_cache_entry");
608 Ok(())
609 }
610
611 async fn load_cache_entries(&self) -> Result<Vec<CacheRow>, StorageError> {
612 let tid = crate::tenant::current_tenant_id();
613 let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_cache_entries");
614 let rows = sqlx::query(
615 "SELECT tenant_id, flow_name, cache_key, result, ttl_secs, hit_count \
616 FROM execution_cache"
617 )
618 .fetch_all(&mut *tx)
619 .await
620 .map_err(|e| StorageError::QueryError(format!("load_cache_entries: {e}")))?;
621 commit_tx!(tx, "load_cache_entries");
622 Ok(rows.iter().map(|r| CacheRow {
623 tenant_id: r.get("tenant_id"),
624 flow_name: r.get("flow_name"),
625 cache_key: r.get("cache_key"),
626 result: r.get("result"),
627 ttl_secs: r.get("ttl_secs"),
628 hit_count: r.get::<i64, _>("hit_count") as u64,
629 }).collect())
630 }
631
632 async fn evict_expired_cache(&self) -> Result<u64, StorageError> {
633 let tid = crate::tenant::current_tenant_id();
634 let mut tx = begin_tenant_tx!(&self.pool, &tid, "evict_expired_cache");
635 let result = sqlx::query(
636 "DELETE FROM execution_cache WHERE ttl_secs IS NOT NULL AND \
637 created_at + (ttl_secs || ' seconds')::interval < NOW()"
638 )
639 .execute(&mut *tx)
640 .await
641 .map_err(|e| StorageError::QueryError(format!("evict_expired_cache: {e}")))?;
642 commit_tx!(tx, "evict_expired_cache");
643 Ok(result.rows_affected())
644 }
645
646 async fn record_cost(&self, cost: &CostRow) -> Result<(), StorageError> {
649 let tid = crate::tenant::current_tenant_id();
650 let mut tx = begin_tenant_tx!(&self.pool, &tid, "record_cost");
651 sqlx::query(
652 "INSERT INTO cost_tracking \
653 (tenant_id, flow_name, backend, input_tokens, output_tokens, cost_usd) \
654 VALUES ($1,$2,$3,$4,$5,$6)"
655 )
656 .bind(&tid)
657 .bind(&cost.flow_name)
658 .bind(&cost.backend)
659 .bind(cost.input_tokens as i64)
660 .bind(cost.output_tokens as i64)
661 .bind(cost.cost_usd)
662 .execute(&mut *tx)
663 .await
664 .map_err(|e| StorageError::QueryError(format!("record_cost: {e}")))?;
665 commit_tx!(tx, "record_cost");
666 Ok(())
667 }
668
669 async fn query_costs(&self, flow: Option<&str>, limit: usize) -> Result<Vec<CostRow>, StorageError> {
670 let tid = crate::tenant::current_tenant_id();
671 let mut tx = begin_tenant_tx!(&self.pool, &tid, "query_costs");
672 let rows = match flow {
673 Some(f) => {
674 sqlx::query(
675 "SELECT tenant_id, flow_name, backend, input_tokens, output_tokens, cost_usd \
676 FROM cost_tracking WHERE flow_name = $1 \
677 ORDER BY timestamp_utc DESC LIMIT $2"
678 )
679 .bind(f)
680 .bind(limit as i64)
681 .fetch_all(&mut *tx)
682 .await
683 }
684 None => {
685 sqlx::query(
686 "SELECT tenant_id, flow_name, backend, input_tokens, output_tokens, cost_usd \
687 FROM cost_tracking ORDER BY timestamp_utc DESC LIMIT $1"
688 )
689 .bind(limit as i64)
690 .fetch_all(&mut *tx)
691 .await
692 }
693 }.map_err(|e| StorageError::QueryError(format!("query_costs: {e}")))?;
694 commit_tx!(tx, "query_costs");
695 Ok(rows.iter().map(|r| CostRow {
696 tenant_id: r.get("tenant_id"),
697 flow_name: r.get("flow_name"),
698 backend: r.get("backend"),
699 input_tokens: r.get::<i64, _>("input_tokens") as u64,
700 output_tokens: r.get::<i64, _>("output_tokens") as u64,
701 cost_usd: r.get("cost_usd"),
702 }).collect())
703 }
704
705 async fn save_schedule(&self, schedule: &ScheduleRow) -> Result<(), StorageError> {
708 let tid = crate::tenant::current_tenant_id();
709 let mut tx = begin_tenant_tx!(&self.pool, &tid, "save_schedule");
710 sqlx::query(
711 "INSERT INTO schedules \
712 (tenant_id, name, flow_name, interval_secs, enabled, backend, \
713 last_run, next_run, run_count, error_count) \
714 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) \
715 ON CONFLICT (tenant_id, name) DO UPDATE SET \
716 enabled = EXCLUDED.enabled, last_run = EXCLUDED.last_run, \
717 next_run = EXCLUDED.next_run, run_count = EXCLUDED.run_count, \
718 error_count = EXCLUDED.error_count"
719 )
720 .bind(&tid)
721 .bind(&schedule.name)
722 .bind(&schedule.flow_name)
723 .bind(schedule.interval_secs as i64)
724 .bind(schedule.enabled)
725 .bind(&schedule.backend)
726 .bind(schedule.last_run as i64)
727 .bind(schedule.next_run as i64)
728 .bind(schedule.run_count as i64)
729 .bind(schedule.error_count as i64)
730 .execute(&mut *tx)
731 .await
732 .map_err(|e| StorageError::QueryError(format!("save_schedule: {e}")))?;
733 commit_tx!(tx, "save_schedule");
734 Ok(())
735 }
736
737 async fn load_schedules(&self) -> Result<Vec<ScheduleRow>, StorageError> {
738 let tid = crate::tenant::current_tenant_id();
739 let mut tx = begin_tenant_tx!(&self.pool, &tid, "load_schedules");
740 let rows = sqlx::query(
741 "SELECT tenant_id, name, flow_name, interval_secs, enabled, backend, \
742 last_run, next_run, run_count, error_count FROM schedules"
743 )
744 .fetch_all(&mut *tx)
745 .await
746 .map_err(|e| StorageError::QueryError(format!("load_schedules: {e}")))?;
747 commit_tx!(tx, "load_schedules");
748 Ok(rows.iter().map(|r| ScheduleRow {
749 tenant_id: r.get("tenant_id"),
750 name: r.get("name"),
751 flow_name: r.get("flow_name"),
752 interval_secs: r.get::<i64, _>("interval_secs") as u64,
753 enabled: r.get("enabled"),
754 backend: r.get("backend"),
755 last_run: r.get::<i64, _>("last_run") as u64,
756 next_run: r.get::<i64, _>("next_run") as u64,
757 run_count: r.get::<i64, _>("run_count") as u64,
758 error_count: r.get::<i64, _>("error_count") as u64,
759 }).collect())
760 }
761
762 async fn delete_schedule(&self, name: &str) -> Result<bool, StorageError> {
763 let tid = crate::tenant::current_tenant_id();
764 let mut tx = begin_tenant_tx!(&self.pool, &tid, "delete_schedule");
765 let result = sqlx::query("DELETE FROM schedules WHERE name = $1")
766 .bind(name)
767 .execute(&mut *tx)
768 .await
769 .map_err(|e| StorageError::QueryError(format!("delete_schedule: {e}")))?;
770 commit_tx!(tx, "delete_schedule");
771 Ok(result.rows_affected() > 0)
772 }
773
774 async fn is_healthy(&self) -> bool {
777 crate::db_pool::check_health(&self.pool).await
778 }
779}