Skip to main content

atomr_telemetry/
bus.rs

1//! Typed telemetry bus backed by a `tokio::sync::broadcast` channel.
2//!
3//! Probes publish `TelemetryEvent`s through this bus. Dashboard clients
4//! (WebSocket subscribers), exporters (Prometheus/OpenTelemetry), and
5//! the in-memory `DeadLetterFeed` ring buffer all receive copies.
6
7use std::sync::Arc;
8
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use tokio::sync::broadcast;
12
13use crate::dto::{
14    ActorStatus, ClusterMembershipDiff, DeadLetterRecord, JournalWriteInfo, RemoteAssociationInfo,
15    ShardingEvent,
16};
17use crate::exporters::Exporter;
18
19/// A single telemetry event. Kept deliberately wide so clients can filter
20/// by the `topic` field without deserializing every payload variant.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(tag = "kind", rename_all = "snake_case")]
23pub enum TelemetryEvent {
24    ActorSpawned(ActorStatus),
25    ActorStopped { path: String },
26    MailboxSampled { path: String, depth: u64 },
27    DeadLetter(DeadLetterRecord),
28    ClusterChanged(ClusterMembershipDiff),
29    ShardingChanged(ShardingEvent),
30    JournalWrite(JournalWriteInfo),
31    RemoteAssociation(RemoteAssociationInfo),
32    StreamsGraphStarted { id: u64, name: String },
33    StreamsGraphFinished { id: u64 },
34    DDataUpdated { key: String },
35}
36
37impl TelemetryEvent {
38    /// A short, stable topic string used by WebSocket clients to filter
39    /// the event stream.
40    pub fn topic(&self) -> &'static str {
41        match self {
42            Self::ActorSpawned(_) | Self::ActorStopped { .. } | Self::MailboxSampled { .. } => "actors",
43            Self::DeadLetter(_) => "dead_letters",
44            Self::ClusterChanged(_) => "cluster",
45            Self::ShardingChanged(_) => "sharding",
46            Self::JournalWrite(_) => "persistence",
47            Self::RemoteAssociation(_) => "remote",
48            Self::StreamsGraphStarted { .. } | Self::StreamsGraphFinished { .. } => "streams",
49            Self::DDataUpdated { .. } => "ddata",
50        }
51    }
52
53    /// All telemetry topics the bus emits. Used by the dashboard /
54    /// spec parity tests to ensure every probe surface is wired.
55    pub const ALL_TOPICS: &'static [&'static str] =
56        &["actors", "dead_letters", "cluster", "sharding", "persistence", "remote", "streams", "ddata"];
57}
58
59/// Cheap-to-clone broadcast bus. Wraps a `tokio::sync::broadcast` sender
60/// plus a slot for attached exporters (synchronous callbacks).
61#[derive(Clone)]
62pub struct TelemetryBus {
63    tx: broadcast::Sender<TelemetryEvent>,
64    exporters: Arc<RwLock<Vec<Arc<dyn Exporter>>>>,
65}
66
67impl TelemetryBus {
68    pub fn new(capacity: usize) -> Self {
69        let (tx, _rx) = broadcast::channel(capacity.max(16));
70        Self { tx, exporters: Arc::new(RwLock::new(Vec::new())) }
71    }
72
73    pub fn publish(&self, event: TelemetryEvent) {
74        // Fan out to in-process exporters first (synchronous, cheap).
75        let exporters = self.exporters.read().clone();
76        for exp in &exporters {
77            exp.on_event(&event);
78        }
79        // Then broadcast to async subscribers (WS clients, tests).
80        let _ = self.tx.send(event);
81    }
82
83    pub fn subscribe(&self) -> broadcast::Receiver<TelemetryEvent> {
84        self.tx.subscribe()
85    }
86
87    /// Subscribe to a single topic. The returned receiver yields only
88    /// events whose `topic()` matches `wanted`. Backed by a forwarder
89    /// task that filters the broadcast stream — drop the receiver to
90    /// stop the forwarder.
91    pub fn subscribe_topic(
92        &self,
93        wanted: &'static str,
94    ) -> tokio::sync::mpsc::UnboundedReceiver<TelemetryEvent> {
95        let mut src = self.tx.subscribe();
96        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
97        tokio::spawn(async move {
98            while let Ok(ev) = src.recv().await {
99                if ev.topic() == wanted && tx.send(ev).is_err() {
100                    return;
101                }
102            }
103        });
104        rx
105    }
106
107    pub fn receiver_count(&self) -> usize {
108        self.tx.receiver_count()
109    }
110
111    pub(crate) fn attach_exporter(&self, exporter: Arc<dyn Exporter>) {
112        self.exporters.write().push(exporter);
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dto::ActorStatus;

    /// Minimal `ActorStatus` fixture shared by the tests below.
    fn sample_actor_status() -> ActorStatus {
        ActorStatus {
            path: "/user/x".into(),
            parent: Some("/user".into()),
            actor_type: "Test".into(),
            mailbox_depth: 0,
            spawned_at: "now".into(),
        }
    }

    /// Minimal `DeadLetterRecord` fixture shared by the tests below.
    fn sample_dead_letter() -> DeadLetterRecord {
        DeadLetterRecord {
            seq: 0,
            recipient: "/x".into(),
            sender: None,
            message_type: "test".into(),
            message_preview: "p".into(),
            timestamp: "now".into(),
        }
    }

    #[tokio::test]
    async fn publish_and_subscribe_roundtrip() {
        let bus = TelemetryBus::new(8);
        let mut rx = bus.subscribe();
        bus.publish(TelemetryEvent::ActorStopped { path: "/user/a".into() });
        let got = rx.recv().await.unwrap();
        assert_eq!(got.topic(), "actors");
        match got {
            TelemetryEvent::ActorStopped { path } => assert_eq!(path, "/user/a"),
            other => panic!("unexpected event: {other:?}"),
        }
    }

    #[test]
    fn topic_labels_correct() {
        let e = TelemetryEvent::ActorSpawned(sample_actor_status());
        assert_eq!(e.topic(), "actors");
    }

    #[tokio::test]
    async fn subscribe_topic_filters_by_topic() {
        let bus = TelemetryBus::new(16);
        let mut rx = bus.subscribe_topic("ddata");
        bus.publish(TelemetryEvent::ActorStopped { path: "/x".into() }); // actors — skipped
        bus.publish(TelemetryEvent::DDataUpdated { key: "k".into() });
        bus.publish(TelemetryEvent::DDataUpdated { key: "j".into() });
        let first = rx.recv().await.unwrap();
        let second = rx.recv().await.unwrap();
        match (first, second) {
            (TelemetryEvent::DDataUpdated { key: k1 }, TelemetryEvent::DDataUpdated { key: k2 }) => {
                assert_eq!(k1, "k");
                assert_eq!(k2, "j");
            }
            other => panic!("unexpected events: {other:?}"),
        }
    }

    #[test]
    fn all_topics_covers_every_variant() {
        // Guards against drift between `TelemetryEvent::topic` and the
        // dashboard topic enumeration. Each sampled variant must report its
        // expected topic, and that topic must be listed in ALL_TOPICS.
        //
        // TODO: ClusterChanged, ShardingChanged, JournalWrite and
        // RemoteAssociation are not sampled because this test does not build
        // their DTOs. Add them so "cluster", "sharding", "persistence" and
        // "remote" are covered as well.
        let cases = [
            (TelemetryEvent::ActorSpawned(sample_actor_status()), "actors"),
            (TelemetryEvent::ActorStopped { path: "/x".into() }, "actors"),
            (TelemetryEvent::MailboxSampled { path: "/x".into(), depth: 3 }, "actors"),
            (TelemetryEvent::DeadLetter(sample_dead_letter()), "dead_letters"),
            (TelemetryEvent::StreamsGraphStarted { id: 1, name: "g".into() }, "streams"),
            (TelemetryEvent::StreamsGraphFinished { id: 1 }, "streams"),
            (TelemetryEvent::DDataUpdated { key: "k".into() }, "ddata"),
        ];
        for (event, expected) in &cases {
            assert_eq!(event.topic(), *expected, "wrong topic for {event:?}");
            assert!(
                TelemetryEvent::ALL_TOPICS.contains(expected),
                "topic {expected} missing from ALL_TOPICS"
            );
        }
        // No duplicates.
        let mut sorted = TelemetryEvent::ALL_TOPICS.to_vec();
        sorted.sort_unstable();
        sorted.dedup();
        assert_eq!(sorted.len(), TelemetryEvent::ALL_TOPICS.len());
    }
}