1use std::sync::Arc;
8
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use tokio::sync::broadcast;
12
13use crate::dto::{
14 ActorStatus, ClusterMembershipDiff, DeadLetterRecord, JournalWriteInfo, RemoteAssociationInfo,
15 ShardingEvent,
16};
17use crate::exporters::Exporter;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(tag = "kind", rename_all = "snake_case")]
23pub enum TelemetryEvent {
24 ActorSpawned(ActorStatus),
25 ActorStopped { path: String },
26 MailboxSampled { path: String, depth: u64 },
27 DeadLetter(DeadLetterRecord),
28 ClusterChanged(ClusterMembershipDiff),
29 ShardingChanged(ShardingEvent),
30 JournalWrite(JournalWriteInfo),
31 RemoteAssociation(RemoteAssociationInfo),
32 StreamsGraphStarted { id: u64, name: String },
33 StreamsGraphFinished { id: u64 },
34 DDataUpdated { key: String },
35}
36
37impl TelemetryEvent {
38 pub fn topic(&self) -> &'static str {
41 match self {
42 Self::ActorSpawned(_) | Self::ActorStopped { .. } | Self::MailboxSampled { .. } => "actors",
43 Self::DeadLetter(_) => "dead_letters",
44 Self::ClusterChanged(_) => "cluster",
45 Self::ShardingChanged(_) => "sharding",
46 Self::JournalWrite(_) => "persistence",
47 Self::RemoteAssociation(_) => "remote",
48 Self::StreamsGraphStarted { .. } | Self::StreamsGraphFinished { .. } => "streams",
49 Self::DDataUpdated { .. } => "ddata",
50 }
51 }
52
53 pub const ALL_TOPICS: &'static [&'static str] =
56 &["actors", "dead_letters", "cluster", "sharding", "persistence", "remote", "streams", "ddata"];
57}
58
59#[derive(Clone)]
62pub struct TelemetryBus {
63 tx: broadcast::Sender<TelemetryEvent>,
64 exporters: Arc<RwLock<Vec<Arc<dyn Exporter>>>>,
65}
66
67impl TelemetryBus {
68 pub fn new(capacity: usize) -> Self {
69 let (tx, _rx) = broadcast::channel(capacity.max(16));
70 Self { tx, exporters: Arc::new(RwLock::new(Vec::new())) }
71 }
72
73 pub fn publish(&self, event: TelemetryEvent) {
74 let exporters = self.exporters.read().clone();
76 for exp in &exporters {
77 exp.on_event(&event);
78 }
79 let _ = self.tx.send(event);
81 }
82
83 pub fn subscribe(&self) -> broadcast::Receiver<TelemetryEvent> {
84 self.tx.subscribe()
85 }
86
87 pub fn subscribe_topic(
92 &self,
93 wanted: &'static str,
94 ) -> tokio::sync::mpsc::UnboundedReceiver<TelemetryEvent> {
95 let mut src = self.tx.subscribe();
96 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
97 tokio::spawn(async move {
98 while let Ok(ev) = src.recv().await {
99 if ev.topic() == wanted && tx.send(ev).is_err() {
100 return;
101 }
102 }
103 });
104 rx
105 }
106
107 pub fn receiver_count(&self) -> usize {
108 self.tx.receiver_count()
109 }
110
111 pub(crate) fn attach_exporter(&self, exporter: Arc<dyn Exporter>) {
112 self.exporters.write().push(exporter);
113 }
114}
115
116#[cfg(test)]
117mod tests {
118 use super::*;
119 use crate::dto::ActorStatus;
120
121 #[tokio::test]
122 async fn publish_and_subscribe_roundtrip() {
123 let bus = TelemetryBus::new(8);
124 let mut rx = bus.subscribe();
125 bus.publish(TelemetryEvent::ActorStopped { path: "/user/a".into() });
126 let got = rx.recv().await.unwrap();
127 assert_eq!(got.topic(), "actors");
128 }
129
130 #[tokio::test]
131 async fn topic_labels_correct() {
132 let e = TelemetryEvent::ActorSpawned(ActorStatus {
133 path: "/user/x".into(),
134 parent: Some("/user".into()),
135 actor_type: "Test".into(),
136 mailbox_depth: 0,
137 spawned_at: "now".into(),
138 });
139 assert_eq!(e.topic(), "actors");
140 }
141
142 #[tokio::test]
143 async fn subscribe_topic_filters_by_topic() {
144 let bus = TelemetryBus::new(16);
145 let mut rx = bus.subscribe_topic("ddata");
146 bus.publish(TelemetryEvent::ActorStopped { path: "/x".into() }); bus.publish(TelemetryEvent::DDataUpdated { key: "k".into() });
148 bus.publish(TelemetryEvent::DDataUpdated { key: "j".into() });
149 let first = rx.recv().await.unwrap();
150 let second = rx.recv().await.unwrap();
151 match (first, second) {
152 (TelemetryEvent::DDataUpdated { key: k1 }, TelemetryEvent::DDataUpdated { key: k2 }) => {
153 assert_eq!(k1, "k");
154 assert_eq!(k2, "j");
155 }
156 other => panic!("unexpected events: {other:?}"),
157 }
158 }
159
160 #[test]
161 fn all_topics_covers_every_variant() {
162 let samples = [
166 TelemetryEvent::ActorStopped { path: "/x".into() }.topic(),
167 TelemetryEvent::DeadLetter(DeadLetterRecord {
168 seq: 0,
169 recipient: "/x".into(),
170 sender: None,
171 message_type: "test".into(),
172 message_preview: "p".into(),
173 timestamp: "now".into(),
174 })
175 .topic(),
176 TelemetryEvent::DDataUpdated { key: "k".into() }.topic(),
177 ];
178 for t in samples {
179 assert!(TelemetryEvent::ALL_TOPICS.contains(&t), "topic {t} missing from ALL_TOPICS");
180 }
181 let mut sorted = TelemetryEvent::ALL_TOPICS.to_vec();
183 sorted.sort();
184 sorted.dedup();
185 assert_eq!(sorted.len(), TelemetryEvent::ALL_TOPICS.len());
186 }
187}