1use std::sync::Arc;
4
5use aimdb_core::typed_api::RecordRegistrar;
6use aimdb_executor::Spawn;
7
8use crate::backend::PersistenceBackend;
9use crate::builder_ext::PersistenceState;
10
11pub trait RecordRegistrarPersistExt<'a, T, R>
17where
18 T: serde::Serialize + Send + Sync + Clone + core::fmt::Debug + 'static,
19 R: Spawn + 'static,
20{
21 fn persist(&'a mut self, record_name: impl Into<String>) -> &'a mut RecordRegistrar<'a, T, R>;
27}
28
29impl<'a, T, R> RecordRegistrarPersistExt<'a, T, R> for RecordRegistrar<'a, T, R>
30where
31 T: serde::Serialize + Send + Sync + Clone + core::fmt::Debug + 'static,
32 R: Spawn + 'static,
33{
34 fn persist(&'a mut self, record_name: impl Into<String>) -> &'a mut RecordRegistrar<'a, T, R> {
35 let record_name: String = record_name.into();
36 let backend: Option<Arc<dyn PersistenceBackend>> = self
38 .extensions()
39 .get::<PersistenceState>()
40 .map(|s| s.backend.clone());
41
42 let Some(backend) = backend else {
45 #[cfg(feature = "tracing")]
46 tracing::warn!(
47 "Record '{}' marked for persistence, but no backend is configured via with_persistence(); .persist() will be a no-op",
48 record_name
49 );
50 return self;
51 };
52 self.tap_raw(move |consumer, _ctx| async move {
56 let mut reader = match consumer.subscribe() {
57 Ok(r) => r,
58 Err(e) => {
59 #[cfg(feature = "tracing")]
60 tracing::error!(
61 "Persistence subscriber for '{}' failed to subscribe: {:?}",
62 record_name,
63 e
64 );
65 let _ = e;
66 return;
67 }
68 };
69
70 while let Ok(value) = reader.recv().await {
71 let json = match serde_json::to_value(&value) {
73 Ok(v) => v,
74 Err(e) => {
75 #[cfg(feature = "tracing")]
76 tracing::warn!("Persistence: failed to serialize '{}': {}", record_name, e);
77 let _ = e;
78 continue;
79 }
80 };
81
82 let timestamp = std::time::SystemTime::now()
83 .duration_since(std::time::UNIX_EPOCH)
84 .unwrap_or_default()
85 .as_millis() as u64;
86
87 if let Err(e) = backend.store(&record_name, &json, timestamp).await {
88 #[cfg(feature = "tracing")]
89 tracing::warn!("Persistence: failed to store '{}': {}", record_name, e);
90 let _ = e;
91 }
92 }
93
94 #[cfg(feature = "tracing")]
95 tracing::debug!(
96 "Persistence subscriber for '{}' stopping (buffer closed)",
97 record_name
98 );
99 })
100 }
101}