aimdb_persistence/builder_ext.rs
1//! Builder extension: `.with_persistence()` on [`AimDbBuilder`].
2
3use std::sync::Arc;
4
5use aimdb_core::builder::AimDbBuilder;
6use aimdb_core::remote::{QueryHandlerFn, QueryHandlerParams};
7use aimdb_executor::{Spawn, TimeOps};
8
9use crate::backend::{PersistenceBackend, QueryParams};
10
11/// State stored in the builder's [`Extensions`](aimdb_core::Extensions) TypeMap.
12///
13/// Both `.persist()` (on `RecordRegistrar`) and `AimDbQueryExt` (on `AimDb<R>`)
14/// retrieve this via `extensions().get::<PersistenceState>()`.
15pub struct PersistenceState {
16 /// The configured persistence backend.
17 pub backend: Arc<dyn PersistenceBackend>,
18 /// How long to keep persisted values before automatic cleanup.
19 pub retention_ms: u64,
20}
21
22/// Extension trait that adds `.with_persistence()` to [`AimDbBuilder`].
23pub trait AimDbBuilderPersistExt<R: Spawn + TimeOps> {
24 /// Configures a persistence backend with a retention window.
25 ///
26 /// Stores the backend in the builder's `Extensions` TypeMap (accessible to
27 /// `.persist()` and `AimDbQueryExt` methods) and registers an `on_start()`
28 /// task that runs an initial cleanup sweep then repeats every 24 hours.
29 ///
30 /// Also registers a `QueryHandlerFn` in extensions so the AimX protocol's
31 /// `record.query` method can delegate to the backend without importing
32 /// persistence types in `aimdb-core`.
33 ///
34 /// # Arguments
35 /// * `backend` - The persistence backend (e.g. `SqliteBackend`)
36 /// * `retention` - How long to keep values (e.g. `Duration::from_secs(7 * 24 * 3600)`)
37 fn with_persistence(
38 self,
39 backend: Arc<dyn PersistenceBackend>,
40 retention: core::time::Duration,
41 ) -> Self;
42}
43
44impl<R> AimDbBuilderPersistExt<R> for AimDbBuilder<R>
45where
46 R: Spawn + TimeOps + 'static,
47{
48 fn with_persistence(
49 mut self,
50 backend: Arc<dyn PersistenceBackend>,
51 retention: core::time::Duration,
52 ) -> Self {
53 let retention_ms = u64::try_from(retention.as_millis()).unwrap_or(u64::MAX);
54
55 // Store backend + retention as a typed entry in the Extensions TypeMap.
56 self.extensions_mut().insert(PersistenceState {
57 backend: backend.clone(),
58 retention_ms,
59 });
60
61 // Register a QueryHandlerFn so AimX record.query can delegate to us.
62 let query_backend = backend.clone();
63 let handler: QueryHandlerFn = Box::new(move |params: QueryHandlerParams| {
64 let backend = query_backend.clone();
65 Box::pin(async move {
66 let query_params = QueryParams {
67 limit_per_record: params.limit.or(Some(1)),
68 start_time: params.start,
69 end_time: params.end,
70 };
71
72 let stored = backend
73 .query(¶ms.name, query_params)
74 .await
75 .map_err(|e| e.to_string())?;
76
77 let values: Vec<serde_json::Value> = stored
78 .into_iter()
79 .map(|sv| {
80 serde_json::json!({
81 "record": sv.record_name,
82 "value": sv.value,
83 "stored_at": sv.stored_at,
84 })
85 })
86 .collect();
87
88 let count = values.len();
89 Ok(serde_json::json!({
90 "values": values,
91 "count": count,
92 }))
93 })
94 });
95 self.extensions_mut().insert(handler);
96
97 // Register a startup task for periodic retention cleanup.
98 let backend_task = backend.clone();
99 self.on_start(move |runtime: Arc<R>| async move {
100 loop {
101 // Calculate the cutoff: now minus retention window.
102 let now = u64::try_from(
103 std::time::SystemTime::now()
104 .duration_since(std::time::UNIX_EPOCH)
105 .unwrap_or_default()
106 .as_millis(),
107 )
108 .unwrap_or(u64::MAX);
109 let cutoff = now.saturating_sub(retention_ms);
110
111 match backend_task.cleanup(cutoff).await {
112 Ok(_deleted) => {
113 #[cfg(feature = "tracing")]
114 tracing::debug!(
115 "Persistence cleanup: deleted {} rows older than {}ms",
116 _deleted,
117 cutoff
118 );
119 }
120 Err(e) => {
121 #[cfg(feature = "tracing")]
122 tracing::warn!("Persistence cleanup failed: {}", e);
123 #[cfg(not(feature = "tracing"))]
124 eprintln!("[aimdb-persistence] retention cleanup failed: {e}");
125 }
126 }
127
128 // Sleep 24 hours using the runtime's TimeOps.
129 let day = runtime.secs(24 * 3600);
130 runtime.sleep(day).await;
131 }
132 });
133
134 self
135 }
136}