Skip to main content

aimdb_persistence/
builder_ext.rs

1//! Builder extension: `.with_persistence()` on [`AimDbBuilder`].
2
3use std::sync::Arc;
4
5use aimdb_core::builder::AimDbBuilder;
6use aimdb_core::remote::{QueryHandlerFn, QueryHandlerParams};
7use aimdb_executor::{Spawn, TimeOps};
8
9use crate::backend::{PersistenceBackend, QueryParams};
10
11/// State stored in the builder's [`Extensions`](aimdb_core::Extensions) TypeMap.
12///
13/// Both `.persist()` (on `RecordRegistrar`) and `AimDbQueryExt` (on `AimDb<R>`)
14/// retrieve this via `extensions().get::<PersistenceState>()`.
15pub struct PersistenceState {
16    /// The configured persistence backend.
17    pub backend: Arc<dyn PersistenceBackend>,
18    /// How long to keep persisted values before automatic cleanup.
19    pub retention_ms: u64,
20}
21
22/// Extension trait that adds `.with_persistence()` to [`AimDbBuilder`].
23pub trait AimDbBuilderPersistExt<R: Spawn + TimeOps> {
24    /// Configures a persistence backend with a retention window.
25    ///
26    /// Stores the backend in the builder's `Extensions` TypeMap (accessible to
27    /// `.persist()` and `AimDbQueryExt` methods) and registers an `on_start()`
28    /// task that runs an initial cleanup sweep then repeats every 24 hours.
29    ///
30    /// Also registers a `QueryHandlerFn` in extensions so the AimX protocol's
31    /// `record.query` method can delegate to the backend without importing
32    /// persistence types in `aimdb-core`.
33    ///
34    /// # Arguments
35    /// * `backend` - The persistence backend (e.g. `SqliteBackend`)
36    /// * `retention` - How long to keep values (e.g. `Duration::from_secs(7 * 24 * 3600)`)
37    fn with_persistence(
38        self,
39        backend: Arc<dyn PersistenceBackend>,
40        retention: core::time::Duration,
41    ) -> Self;
42}
43
44impl<R> AimDbBuilderPersistExt<R> for AimDbBuilder<R>
45where
46    R: Spawn + TimeOps + 'static,
47{
48    fn with_persistence(
49        mut self,
50        backend: Arc<dyn PersistenceBackend>,
51        retention: core::time::Duration,
52    ) -> Self {
53        let retention_ms = u64::try_from(retention.as_millis()).unwrap_or(u64::MAX);
54
55        // Store backend + retention as a typed entry in the Extensions TypeMap.
56        self.extensions_mut().insert(PersistenceState {
57            backend: backend.clone(),
58            retention_ms,
59        });
60
61        // Register a QueryHandlerFn so AimX record.query can delegate to us.
62        let query_backend = backend.clone();
63        let handler: QueryHandlerFn = Box::new(move |params: QueryHandlerParams| {
64            let backend = query_backend.clone();
65            Box::pin(async move {
66                let query_params = QueryParams {
67                    limit_per_record: params.limit.or(Some(1)),
68                    start_time: params.start,
69                    end_time: params.end,
70                };
71
72                let stored = backend
73                    .query(&params.name, query_params)
74                    .await
75                    .map_err(|e| e.to_string())?;
76
77                let values: Vec<serde_json::Value> = stored
78                    .into_iter()
79                    .map(|sv| {
80                        serde_json::json!({
81                            "record": sv.record_name,
82                            "value": sv.value,
83                            "stored_at": sv.stored_at,
84                        })
85                    })
86                    .collect();
87
88                let count = values.len();
89                Ok(serde_json::json!({
90                    "values": values,
91                    "count": count,
92                }))
93            })
94        });
95        self.extensions_mut().insert(handler);
96
97        // Register a startup task for periodic retention cleanup.
98        let backend_task = backend.clone();
99        self.on_start(move |runtime: Arc<R>| async move {
100            loop {
101                // Calculate the cutoff: now minus retention window.
102                let now = u64::try_from(
103                    std::time::SystemTime::now()
104                        .duration_since(std::time::UNIX_EPOCH)
105                        .unwrap_or_default()
106                        .as_millis(),
107                )
108                .unwrap_or(u64::MAX);
109                let cutoff = now.saturating_sub(retention_ms);
110
111                match backend_task.cleanup(cutoff).await {
112                    Ok(_deleted) => {
113                        #[cfg(feature = "tracing")]
114                        tracing::debug!(
115                            "Persistence cleanup: deleted {} rows older than {}ms",
116                            _deleted,
117                            cutoff
118                        );
119                    }
120                    Err(e) => {
121                        #[cfg(feature = "tracing")]
122                        tracing::warn!("Persistence cleanup failed: {}", e);
123                        #[cfg(not(feature = "tracing"))]
124                        eprintln!("[aimdb-persistence] retention cleanup failed: {e}");
125                    }
126                }
127
128                // Sleep 24 hours using the runtime's TimeOps.
129                let day = runtime.secs(24 * 3600);
130                runtime.sleep(day).await;
131            }
132        });
133
134        self
135    }
136}