1use std::sync::atomic::{AtomicU64, Ordering};
4
5use dashmap::DashSet;
6
7use crate::bus::{TelemetryBus, TelemetryEvent};
8use crate::dto::DDataSnapshot;
9
10pub struct DDataProbe {
11 bus: TelemetryBus,
12 keys: DashSet<String>,
13 updates: AtomicU64,
14}
15
16impl DDataProbe {
17 pub fn new(bus: TelemetryBus) -> Self {
18 Self { bus, keys: DashSet::new(), updates: AtomicU64::new(0) }
19 }
20
21 pub fn record_update(&self, key: &str) {
22 self.keys.insert(key.to_string());
23 self.updates.fetch_add(1, Ordering::Relaxed);
24 self.bus.publish(TelemetryEvent::DDataUpdated { key: key.to_string() });
25 }
26
27 pub fn record_delete(&self, key: &str) {
28 self.keys.remove(key);
29 }
30
31 pub fn key_count(&self) -> usize {
32 self.keys.len()
33 }
34
35 pub fn snapshot(&self) -> DDataSnapshot {
36 let mut keys: Vec<String> = self.keys.iter().map(|k| k.clone()).collect();
37 keys.sort();
38 DDataSnapshot { keys, total_updates: self.updates.load(Ordering::Relaxed) }
39 }
40
41 #[cfg(feature = "ddata")]
43 pub fn refresh_from(&self, replicator: &atomr_distributed_data::Replicator) {
44 let current: std::collections::HashSet<String> = self.keys.iter().map(|k| k.clone()).collect();
45 let fresh: std::collections::HashSet<String> = replicator.keys().into_iter().collect();
46 for gone in current.difference(&fresh) {
47 self.keys.remove(gone);
48 }
49 for new in fresh.difference(¤t) {
50 self.keys.insert(new.clone());
51 }
52 }
53}
54
55#[cfg(test)]
56mod tests {
57 use super::*;
58
59 #[test]
60 fn tracks_keys_and_counts() {
61 let bus = TelemetryBus::new(8);
62 let p = DDataProbe::new(bus);
63 p.record_update("counter");
64 p.record_update("set");
65 p.record_update("counter");
66 assert_eq!(p.key_count(), 2);
67 let s = p.snapshot();
68 assert_eq!(s.total_updates, 3);
69 assert_eq!(s.keys.len(), 2);
70 }
71}