Skip to main content

atomr_telemetry/
persistence.rs

1//! Persistence probe — tracks a roll-up of journal activity and provides
2//! an admin trait for listing `persistence_id`s + highest sequence
3//! numbers. Default impls are feature-gated so the telemetry crate works
4//! even when persistence is disabled.
5
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use parking_lot::RwLock;
11
12use crate::bus::{TelemetryBus, TelemetryEvent};
13use crate::dto::{JournalInfo, JournalWriteInfo, PersistenceIdStat, PersistenceSnapshot};
14
15/// Admin surface journals can implement to expose their contents to the
16/// dashboard. Default methods return empty vectors so implementing it is
17/// opt-in per backend.
18#[async_trait]
19pub trait JournalAdmin: Send + Sync + 'static {
20    fn name(&self) -> &str;
21
22    async fn list_persistence_ids(&self) -> Vec<String> {
23        Vec::new()
24    }
25
26    async fn highest_sequence_nr_for(&self, persistence_id: &str) -> u64 {
27        let _ = persistence_id;
28        0
29    }
30
31    async fn event_count_for(&self, persistence_id: &str) -> u64 {
32        let _ = persistence_id;
33        0
34    }
35}
36
37pub struct PersistenceProbe {
38    bus: TelemetryBus,
39    journals: RwLock<Vec<Arc<dyn JournalAdmin>>>,
40    recent_writes: RwLock<std::collections::VecDeque<JournalWriteInfo>>,
41    total_events: AtomicU64,
42    max_recent: usize,
43}
44
45impl PersistenceProbe {
46    pub fn new(bus: TelemetryBus) -> Self {
47        Self {
48            bus,
49            journals: RwLock::new(Vec::new()),
50            recent_writes: RwLock::new(std::collections::VecDeque::with_capacity(128)),
51            total_events: AtomicU64::new(0),
52            max_recent: 128,
53        }
54    }
55
56    pub fn register_journal(&self, admin: Arc<dyn JournalAdmin>) {
57        self.journals.write().push(admin);
58    }
59
60    pub fn record_write(&self, journal: &str, persistence_id: &str, sequence_nr: u64) {
61        let info = JournalWriteInfo {
62            journal: journal.to_string(),
63            persistence_id: persistence_id.to_string(),
64            sequence_nr,
65            timestamp: chrono::Utc::now().to_rfc3339(),
66        };
67        self.total_events.fetch_add(1, Ordering::Relaxed);
68        {
69            let mut b = self.recent_writes.write();
70            if b.len() == self.max_recent {
71                b.pop_front();
72            }
73            b.push_back(info.clone());
74        }
75        self.bus.publish(TelemetryEvent::JournalWrite(info));
76    }
77
78    pub fn total_events(&self) -> u64 {
79        self.total_events.load(Ordering::Relaxed)
80    }
81
82    pub fn snapshot(&self) -> PersistenceSnapshot {
83        PersistenceSnapshot {
84            journals: Vec::new(),
85            total_events: self.total_events(),
86            recent_writes: self.recent_writes.read().iter().cloned().collect(),
87        }
88    }
89
90    /// Same as [`Self::snapshot`] but populates per-journal persistence
91    /// ids + sequence numbers by calling into every registered
92    /// [`JournalAdmin`]. Awaits each admin sequentially.
93    pub async fn snapshot_async(&self) -> PersistenceSnapshot {
94        let journals = self.journals.read().clone();
95        let mut out: Vec<JournalInfo> = Vec::with_capacity(journals.len());
96        for j in journals {
97            let ids = j.list_persistence_ids().await;
98            let mut pids: Vec<PersistenceIdStat> = Vec::with_capacity(ids.len());
99            for id in ids {
100                let seq = j.highest_sequence_nr_for(&id).await;
101                let count = j.event_count_for(&id).await;
102                pids.push(PersistenceIdStat {
103                    persistence_id: id,
104                    highest_sequence_nr: seq,
105                    event_count: count,
106                });
107            }
108            out.push(JournalInfo { name: j.name().to_string(), persistence_ids: pids });
109        }
110        PersistenceSnapshot {
111            journals: out,
112            total_events: self.total_events(),
113            recent_writes: self.recent_writes.read().iter().cloned().collect(),
114        }
115    }
116}
117
118/// Admin wrapper around [`atomr_persistence::InMemoryJournal`].
119/// Feature-gated.
120#[cfg(feature = "persistence")]
121pub struct InMemoryJournalAdmin {
122    name: String,
123    inner: Arc<atomr_persistence::InMemoryJournal>,
124}
125
126#[cfg(feature = "persistence")]
127impl InMemoryJournalAdmin {
128    pub fn new(name: impl Into<String>, journal: Arc<atomr_persistence::InMemoryJournal>) -> Self {
129        Self { name: name.into(), inner: journal }
130    }
131}
132
133#[cfg(feature = "persistence")]
134#[async_trait]
135impl JournalAdmin for InMemoryJournalAdmin {
136    fn name(&self) -> &str {
137        &self.name
138    }
139    async fn list_persistence_ids(&self) -> Vec<String> {
140        self.inner.list_persistence_ids()
141    }
142    async fn highest_sequence_nr_for(&self, pid: &str) -> u64 {
143        use atomr_persistence::Journal;
144        self.inner.highest_sequence_nr(pid, 0).await.unwrap_or(0)
145    }
146    async fn event_count_for(&self, pid: &str) -> u64 {
147        self.inner.event_count(pid)
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154
155    struct Dummy;
156    #[async_trait]
157    impl JournalAdmin for Dummy {
158        fn name(&self) -> &str {
159            "dummy"
160        }
161        async fn list_persistence_ids(&self) -> Vec<String> {
162            vec!["p1".into()]
163        }
164        async fn highest_sequence_nr_for(&self, _pid: &str) -> u64 {
165            42
166        }
167        async fn event_count_for(&self, _pid: &str) -> u64 {
168            3
169        }
170    }
171
172    #[tokio::test]
173    async fn records_writes_and_snapshot_async() {
174        let bus = TelemetryBus::new(8);
175        let probe = PersistenceProbe::new(bus);
176        probe.register_journal(Arc::new(Dummy));
177        probe.record_write("j", "p1", 1);
178        probe.record_write("j", "p1", 2);
179        assert_eq!(probe.total_events(), 2);
180        let snap = probe.snapshot_async().await;
181        assert_eq!(snap.journals.len(), 1);
182        assert_eq!(snap.journals[0].persistence_ids[0].highest_sequence_nr, 42);
183    }
184}