atomr_telemetry/
streams.rs1use std::sync::atomic::{AtomicU64, Ordering};
5
6use dashmap::DashMap;
7
8use crate::bus::{TelemetryBus, TelemetryEvent};
9use crate::dto::{StreamGraphInfo, StreamsSnapshot};
10
11pub struct StreamsProbe {
12 bus: TelemetryBus,
13 active: DashMap<u64, StreamGraphInfo>,
14 next_id: AtomicU64,
15 started: AtomicU64,
16 finished: AtomicU64,
17}
18
19impl StreamsProbe {
20 pub fn new(bus: TelemetryBus) -> Self {
21 Self {
22 bus,
23 active: DashMap::new(),
24 next_id: AtomicU64::new(1),
25 started: AtomicU64::new(0),
26 finished: AtomicU64::new(0),
27 }
28 }
29
30 pub fn start_graph(&self, name: impl Into<String>) -> u64 {
31 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
32 let name = name.into();
33 let info = StreamGraphInfo { id, name: name.clone(), started_at: chrono::Utc::now().to_rfc3339() };
34 self.active.insert(id, info);
35 self.started.fetch_add(1, Ordering::Relaxed);
36 self.bus.publish(TelemetryEvent::StreamsGraphStarted { id, name });
37 id
38 }
39
40 pub fn finish_graph(&self, id: u64) {
41 if self.active.remove(&id).is_some() {
42 self.finished.fetch_add(1, Ordering::Relaxed);
43 self.bus.publish(TelemetryEvent::StreamsGraphFinished { id });
44 }
45 }
46
47 pub fn running(&self) -> u64 {
48 self.active.len() as u64
49 }
50
51 pub fn snapshot(&self) -> StreamsSnapshot {
52 StreamsSnapshot {
53 running_graphs: self.running(),
54 total_started: self.started.load(Ordering::Relaxed),
55 total_finished: self.finished.load(Ordering::Relaxed),
56 active: self.active.iter().map(|e| e.value().clone()).collect(),
57 }
58 }
59}
60
#[cfg(test)]
mod tests {
    use super::*;

    /// Starting two graphs and finishing one is reflected both in the live
    /// running count and in the snapshot's lifetime totals.
    #[test]
    fn counts_running_graphs() {
        let probe = StreamsProbe::new(TelemetryBus::new(8));

        let first = probe.start_graph("g1");
        let _second = probe.start_graph("g2");
        assert_eq!(probe.running(), 2);

        probe.finish_graph(first);
        assert_eq!(probe.running(), 1);

        let snap = probe.snapshot();
        assert_eq!(snap.total_started, 2);
        assert_eq!(snap.total_finished, 1);
    }
}