Skip to main content

atomr_telemetry/
ddata.rs

1//! Distributed-data probe — tracks replicator key updates.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use dashmap::DashSet;
6
7use crate::bus::{TelemetryBus, TelemetryEvent};
8use crate::dto::DDataSnapshot;
9
10pub struct DDataProbe {
11    bus: TelemetryBus,
12    keys: DashSet<String>,
13    updates: AtomicU64,
14}
15
16impl DDataProbe {
17    pub fn new(bus: TelemetryBus) -> Self {
18        Self { bus, keys: DashSet::new(), updates: AtomicU64::new(0) }
19    }
20
21    pub fn record_update(&self, key: &str) {
22        self.keys.insert(key.to_string());
23        self.updates.fetch_add(1, Ordering::Relaxed);
24        self.bus.publish(TelemetryEvent::DDataUpdated { key: key.to_string() });
25    }
26
27    pub fn record_delete(&self, key: &str) {
28        self.keys.remove(key);
29    }
30
31    pub fn key_count(&self) -> usize {
32        self.keys.len()
33    }
34
35    pub fn snapshot(&self) -> DDataSnapshot {
36        let mut keys: Vec<String> = self.keys.iter().map(|k| k.clone()).collect();
37        keys.sort();
38        DDataSnapshot { keys, total_updates: self.updates.load(Ordering::Relaxed) }
39    }
40
41    /// Refresh key set from a live replicator. Feature-gated.
42    #[cfg(feature = "ddata")]
43    pub fn refresh_from(&self, replicator: &atomr_distributed_data::Replicator) {
44        let current: std::collections::HashSet<String> = self.keys.iter().map(|k| k.clone()).collect();
45        let fresh: std::collections::HashSet<String> = replicator.keys().into_iter().collect();
46        for gone in current.difference(&fresh) {
47            self.keys.remove(gone);
48        }
49        for new in fresh.difference(&current) {
50            self.keys.insert(new.clone());
51        }
52    }
53}
54
55#[cfg(test)]
56mod tests {
57    use super::*;
58
59    #[test]
60    fn tracks_keys_and_counts() {
61        let bus = TelemetryBus::new(8);
62        let p = DDataProbe::new(bus);
63        p.record_update("counter");
64        p.record_update("set");
65        p.record_update("counter");
66        assert_eq!(p.key_count(), 2);
67        let s = p.snapshot();
68        assert_eq!(s.total_updates, 3);
69        assert_eq!(s.keys.len(), 2);
70    }
71}