Skip to main content

atomr_telemetry/
bus.rs

1//! Typed telemetry bus backed by a `tokio::sync::broadcast` channel.
2//!
3//! Probes publish `TelemetryEvent`s through this bus. Dashboard clients
4//! (WebSocket subscribers), exporters (Prometheus/OpenTelemetry), and
5//! the in-memory `DeadLetterFeed` ring buffer all receive copies.
6
7use std::sync::Arc;
8
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use tokio::sync::broadcast;
12
13use crate::dto::{
14    ActorStatus, ClusterMembershipDiff, DeadLetterRecord, JournalWriteInfo, RemoteAssociationInfo,
15    ShardingEvent,
16};
17use crate::exporters::Exporter;
18
19/// A single telemetry event. Kept deliberately wide so clients can filter
20/// by the `topic` field without deserializing every payload variant.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(tag = "kind", rename_all = "snake_case")]
23pub enum TelemetryEvent {
24    ActorSpawned(ActorStatus),
25    ActorStopped { path: String },
26    MailboxSampled { path: String, depth: u64 },
27    DeadLetter(DeadLetterRecord),
28    ClusterChanged(ClusterMembershipDiff),
29    ShardingChanged(ShardingEvent),
30    JournalWrite(JournalWriteInfo),
31    RemoteAssociation(RemoteAssociationInfo),
32    StreamsGraphStarted { id: u64, name: String },
33    StreamsGraphFinished { id: u64 },
34    DDataUpdated { key: String },
35}
36
37impl TelemetryEvent {
38    /// A short, stable topic string used by WebSocket clients to filter
39    /// the event stream.
40    pub fn topic(&self) -> &'static str {
41        match self {
42            Self::ActorSpawned(_) | Self::ActorStopped { .. } | Self::MailboxSampled { .. } => "actors",
43            Self::DeadLetter(_) => "dead_letters",
44            Self::ClusterChanged(_) => "cluster",
45            Self::ShardingChanged(_) => "sharding",
46            Self::JournalWrite(_) => "persistence",
47            Self::RemoteAssociation(_) => "remote",
48            Self::StreamsGraphStarted { .. } | Self::StreamsGraphFinished { .. } => "streams",
49            Self::DDataUpdated { .. } => "ddata",
50        }
51    }
52}
53
54/// Cheap-to-clone broadcast bus. Wraps a `tokio::sync::broadcast` sender
55/// plus a slot for attached exporters (synchronous callbacks).
56#[derive(Clone)]
57pub struct TelemetryBus {
58    tx: broadcast::Sender<TelemetryEvent>,
59    exporters: Arc<RwLock<Vec<Arc<dyn Exporter>>>>,
60}
61
62impl TelemetryBus {
63    pub fn new(capacity: usize) -> Self {
64        let (tx, _rx) = broadcast::channel(capacity.max(16));
65        Self { tx, exporters: Arc::new(RwLock::new(Vec::new())) }
66    }
67
68    pub fn publish(&self, event: TelemetryEvent) {
69        // Fan out to in-process exporters first (synchronous, cheap).
70        let exporters = self.exporters.read().clone();
71        for exp in &exporters {
72            exp.on_event(&event);
73        }
74        // Then broadcast to async subscribers (WS clients, tests).
75        let _ = self.tx.send(event);
76    }
77
78    pub fn subscribe(&self) -> broadcast::Receiver<TelemetryEvent> {
79        self.tx.subscribe()
80    }
81
82    pub fn receiver_count(&self) -> usize {
83        self.tx.receiver_count()
84    }
85
86    pub(crate) fn attach_exporter(&self, exporter: Arc<dyn Exporter>) {
87        self.exporters.write().push(exporter);
88    }
89}
90
91#[cfg(test)]
92mod tests {
93    use super::*;
94    use crate::dto::ActorStatus;
95
96    #[tokio::test]
97    async fn publish_and_subscribe_roundtrip() {
98        let bus = TelemetryBus::new(8);
99        let mut rx = bus.subscribe();
100        bus.publish(TelemetryEvent::ActorStopped { path: "/user/a".into() });
101        let got = rx.recv().await.unwrap();
102        assert_eq!(got.topic(), "actors");
103    }
104
105    #[tokio::test]
106    async fn topic_labels_correct() {
107        let e = TelemetryEvent::ActorSpawned(ActorStatus {
108            path: "/user/x".into(),
109            parent: Some("/user".into()),
110            actor_type: "Test".into(),
111            mailbox_depth: 0,
112            spawned_at: "now".into(),
113        });
114        assert_eq!(e.topic(), "actors");
115    }
116}