atomr_telemetry/
persistence.rs1use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use parking_lot::RwLock;
11
12use crate::bus::{TelemetryBus, TelemetryEvent};
13use crate::dto::{JournalInfo, JournalWriteInfo, PersistenceIdStat, PersistenceSnapshot};
14
15#[async_trait]
19pub trait JournalAdmin: Send + Sync + 'static {
20 fn name(&self) -> &str;
21
22 async fn list_persistence_ids(&self) -> Vec<String> {
23 Vec::new()
24 }
25
26 async fn highest_sequence_nr_for(&self, persistence_id: &str) -> u64 {
27 let _ = persistence_id;
28 0
29 }
30
31 async fn event_count_for(&self, persistence_id: &str) -> u64 {
32 let _ = persistence_id;
33 0
34 }
35}
36
37pub struct PersistenceProbe {
38 bus: TelemetryBus,
39 journals: RwLock<Vec<Arc<dyn JournalAdmin>>>,
40 recent_writes: RwLock<std::collections::VecDeque<JournalWriteInfo>>,
41 total_events: AtomicU64,
42 max_recent: usize,
43}
44
45impl PersistenceProbe {
46 pub fn new(bus: TelemetryBus) -> Self {
47 Self {
48 bus,
49 journals: RwLock::new(Vec::new()),
50 recent_writes: RwLock::new(std::collections::VecDeque::with_capacity(128)),
51 total_events: AtomicU64::new(0),
52 max_recent: 128,
53 }
54 }
55
56 pub fn register_journal(&self, admin: Arc<dyn JournalAdmin>) {
57 self.journals.write().push(admin);
58 }
59
60 pub fn record_write(&self, journal: &str, persistence_id: &str, sequence_nr: u64) {
61 let info = JournalWriteInfo {
62 journal: journal.to_string(),
63 persistence_id: persistence_id.to_string(),
64 sequence_nr,
65 timestamp: chrono::Utc::now().to_rfc3339(),
66 };
67 self.total_events.fetch_add(1, Ordering::Relaxed);
68 {
69 let mut b = self.recent_writes.write();
70 if b.len() == self.max_recent {
71 b.pop_front();
72 }
73 b.push_back(info.clone());
74 }
75 self.bus.publish(TelemetryEvent::JournalWrite(info));
76 }
77
78 pub fn total_events(&self) -> u64 {
79 self.total_events.load(Ordering::Relaxed)
80 }
81
82 pub fn snapshot(&self) -> PersistenceSnapshot {
83 PersistenceSnapshot {
84 journals: Vec::new(),
85 total_events: self.total_events(),
86 recent_writes: self.recent_writes.read().iter().cloned().collect(),
87 }
88 }
89
90 pub async fn snapshot_async(&self) -> PersistenceSnapshot {
94 let journals = self.journals.read().clone();
95 let mut out: Vec<JournalInfo> = Vec::with_capacity(journals.len());
96 for j in journals {
97 let ids = j.list_persistence_ids().await;
98 let mut pids: Vec<PersistenceIdStat> = Vec::with_capacity(ids.len());
99 for id in ids {
100 let seq = j.highest_sequence_nr_for(&id).await;
101 let count = j.event_count_for(&id).await;
102 pids.push(PersistenceIdStat {
103 persistence_id: id,
104 highest_sequence_nr: seq,
105 event_count: count,
106 });
107 }
108 out.push(JournalInfo { name: j.name().to_string(), persistence_ids: pids });
109 }
110 PersistenceSnapshot {
111 journals: out,
112 total_events: self.total_events(),
113 recent_writes: self.recent_writes.read().iter().cloned().collect(),
114 }
115 }
116}
117
118#[cfg(feature = "persistence")]
121pub struct InMemoryJournalAdmin {
122 name: String,
123 inner: Arc<atomr_persistence::InMemoryJournal>,
124}
125
126#[cfg(feature = "persistence")]
127impl InMemoryJournalAdmin {
128 pub fn new(name: impl Into<String>, journal: Arc<atomr_persistence::InMemoryJournal>) -> Self {
129 Self { name: name.into(), inner: journal }
130 }
131}
132
133#[cfg(feature = "persistence")]
134#[async_trait]
135impl JournalAdmin for InMemoryJournalAdmin {
136 fn name(&self) -> &str {
137 &self.name
138 }
139 async fn list_persistence_ids(&self) -> Vec<String> {
140 self.inner.list_persistence_ids()
141 }
142 async fn highest_sequence_nr_for(&self, pid: &str) -> u64 {
143 use atomr_persistence::Journal;
144 self.inner.highest_sequence_nr(pid, 0).await.unwrap_or(0)
145 }
146 async fn event_count_for(&self, pid: &str) -> u64 {
147 self.inner.event_count(pid)
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154
155 struct Dummy;
156 #[async_trait]
157 impl JournalAdmin for Dummy {
158 fn name(&self) -> &str {
159 "dummy"
160 }
161 async fn list_persistence_ids(&self) -> Vec<String> {
162 vec!["p1".into()]
163 }
164 async fn highest_sequence_nr_for(&self, _pid: &str) -> u64 {
165 42
166 }
167 async fn event_count_for(&self, _pid: &str) -> u64 {
168 3
169 }
170 }
171
172 #[tokio::test]
173 async fn records_writes_and_snapshot_async() {
174 let bus = TelemetryBus::new(8);
175 let probe = PersistenceProbe::new(bus);
176 probe.register_journal(Arc::new(Dummy));
177 probe.record_write("j", "p1", 1);
178 probe.record_write("j", "p1", 2);
179 assert_eq!(probe.total_events(), 2);
180 let snap = probe.snapshot_async().await;
181 assert_eq!(snap.journals.len(), 1);
182 assert_eq!(snap.journals[0].persistence_ids[0].highest_sequence_nr, 42);
183 }
184}