Skip to main content

atomr_telemetry/
streams.rs

1//! Streams probe — running-graph counter + list hooked into
2//! `ActorMaterializer`.
3
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use dashmap::DashMap;
7
8use crate::bus::{TelemetryBus, TelemetryEvent};
9use crate::dto::{StreamGraphInfo, StreamsSnapshot};
10
11pub struct StreamsProbe {
12    bus: TelemetryBus,
13    active: DashMap<u64, StreamGraphInfo>,
14    next_id: AtomicU64,
15    started: AtomicU64,
16    finished: AtomicU64,
17}
18
19impl StreamsProbe {
20    pub fn new(bus: TelemetryBus) -> Self {
21        Self {
22            bus,
23            active: DashMap::new(),
24            next_id: AtomicU64::new(1),
25            started: AtomicU64::new(0),
26            finished: AtomicU64::new(0),
27        }
28    }
29
30    pub fn start_graph(&self, name: impl Into<String>) -> u64 {
31        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
32        let name = name.into();
33        let info = StreamGraphInfo { id, name: name.clone(), started_at: chrono::Utc::now().to_rfc3339() };
34        self.active.insert(id, info);
35        self.started.fetch_add(1, Ordering::Relaxed);
36        self.bus.publish(TelemetryEvent::StreamsGraphStarted { id, name });
37        id
38    }
39
40    pub fn finish_graph(&self, id: u64) {
41        if self.active.remove(&id).is_some() {
42            self.finished.fetch_add(1, Ordering::Relaxed);
43            self.bus.publish(TelemetryEvent::StreamsGraphFinished { id });
44        }
45    }
46
47    pub fn running(&self) -> u64 {
48        self.active.len() as u64
49    }
50
51    pub fn snapshot(&self) -> StreamsSnapshot {
52        StreamsSnapshot {
53            running_graphs: self.running(),
54            total_started: self.started.load(Ordering::Relaxed),
55            total_finished: self.finished.load(Ordering::Relaxed),
56            active: self.active.iter().map(|e| e.value().clone()).collect(),
57        }
58    }
59}
60
61#[cfg(test)]
62mod tests {
63    use super::*;
64
65    #[test]
66    fn counts_running_graphs() {
67        let bus = TelemetryBus::new(8);
68        let p = StreamsProbe::new(bus);
69        let a = p.start_graph("g1");
70        let _b = p.start_graph("g2");
71        assert_eq!(p.running(), 2);
72        p.finish_graph(a);
73        assert_eq!(p.running(), 1);
74        let s = p.snapshot();
75        assert_eq!(s.total_started, 2);
76        assert_eq!(s.total_finished, 1);
77    }
78}