Skip to main content

aimdb_persistence/
ext.rs

1//! Record registration extension: `.persist()` on [`RecordRegistrar`].
2
3use std::sync::Arc;
4
5use aimdb_core::typed_api::RecordRegistrar;
6use aimdb_executor::Spawn;
7
8use crate::backend::PersistenceBackend;
9use crate::builder_ext::PersistenceState;
10
11/// Extension trait that adds `.persist()` to [`RecordRegistrar`].
12///
13/// `T: Serialize` is required so values can be converted to JSON for storage.
14/// `.with_remote_access()` is **not** required — persistence subscribes to the
15/// typed buffer directly.
16pub trait RecordRegistrarPersistExt<'a, T, R>
17where
18    T: serde::Serialize + Send + Sync + Clone + core::fmt::Debug + 'static,
19    R: Spawn + 'static,
20{
21    /// Opt this record into persistence.
22    ///
23    /// Spawns a background subscriber (via `tap_raw`) that serializes each
24    /// value to JSON and writes it to the configured backend. Retention is
25    /// managed by the cleanup task registered during `with_persistence()`.
26    fn persist(&'a mut self, record_name: impl Into<String>) -> &'a mut RecordRegistrar<'a, T, R>;
27}
28
29impl<'a, T, R> RecordRegistrarPersistExt<'a, T, R> for RecordRegistrar<'a, T, R>
30where
31    T: serde::Serialize + Send + Sync + Clone + core::fmt::Debug + 'static,
32    R: Spawn + 'static,
33{
34    fn persist(&'a mut self, record_name: impl Into<String>) -> &'a mut RecordRegistrar<'a, T, R> {
35        let record_name: String = record_name.into();
36        // Retrieve the backend from the builder's Extensions TypeMap, if configured.
37        let backend: Option<Arc<dyn PersistenceBackend>> = self
38            .extensions()
39            .get::<PersistenceState>()
40            .map(|s| s.backend.clone());
41
42        // If no backend is configured, treat `.persist()` as a no-op so that
43        // persistence remains optional and does not cause runtime panics.
44        let Some(backend) = backend else {
45            #[cfg(feature = "tracing")]
46            tracing::warn!(
47                "Record '{}' marked for persistence, but no backend is configured via with_persistence(); .persist() will be a no-op",
48                record_name
49            );
50            return self;
51        };
52        // Subscribe to the typed buffer as a tap (side-effect observer).
53        // The second closure argument is the runtime context (Arc<dyn Any>),
54        // which we don't need — persistence is runtime-agnostic.
55        self.tap_raw(move |consumer, _ctx| async move {
56            let mut reader = match consumer.subscribe() {
57                Ok(r) => r,
58                Err(e) => {
59                    #[cfg(feature = "tracing")]
60                    tracing::error!(
61                        "Persistence subscriber for '{}' failed to subscribe: {:?}",
62                        record_name,
63                        e
64                    );
65                    let _ = e;
66                    return;
67                }
68            };
69
70            while let Ok(value) = reader.recv().await {
71                // T is known here — serialize directly, no with_remote_access() needed.
72                let json = match serde_json::to_value(&value) {
73                    Ok(v) => v,
74                    Err(e) => {
75                        #[cfg(feature = "tracing")]
76                        tracing::warn!("Persistence: failed to serialize '{}': {}", record_name, e);
77                        let _ = e;
78                        continue;
79                    }
80                };
81
82                let timestamp = std::time::SystemTime::now()
83                    .duration_since(std::time::UNIX_EPOCH)
84                    .unwrap_or_default()
85                    .as_millis() as u64;
86
87                if let Err(e) = backend.store(&record_name, &json, timestamp).await {
88                    #[cfg(feature = "tracing")]
89                    tracing::warn!("Persistence: failed to store '{}': {}", record_name, e);
90                    let _ = e;
91                }
92            }
93
94            #[cfg(feature = "tracing")]
95            tracing::debug!(
96                "Persistence subscriber for '{}' stopping (buffer closed)",
97                record_name
98            );
99        })
100    }
101}