1use std::sync::Arc;
8
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use tokio::sync::broadcast;
12
13use crate::dto::{
14 ActorStatus, ClusterMembershipDiff, DeadLetterRecord, JournalWriteInfo, RemoteAssociationInfo,
15 ShardingEvent,
16};
17use crate::exporters::Exporter;
18
/// An event carried on the telemetry bus.
///
/// Serde representation: internally tagged. The `kind` field holds the
/// variant name in `snake_case`, and the variant's payload fields sit next to
/// it in the same object.
///
/// [`TelemetryEvent::topic`] maps every variant to a coarse topic label.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TelemetryEvent {
    /// Actor spawned; carries the actor's [`ActorStatus`]. Topic: `actors`.
    ActorSpawned(ActorStatus),
    /// Actor stopped, identified by `path`. Topic: `actors`.
    ActorStopped { path: String },
    /// Mailbox sample for the actor at `path`. Topic: `actors`.
    ///
    /// NOTE(review): `depth` is presumably the number of queued messages at
    /// sampling time; confirm against the producer.
    MailboxSampled { path: String, depth: u64 },
    /// Dead-letter record. Topic: `dead_letters`.
    DeadLetter(DeadLetterRecord),
    /// Cluster membership diff. Topic: `cluster`.
    ClusterChanged(ClusterMembershipDiff),
    /// Sharding event. Topic: `sharding`.
    ShardingChanged(ShardingEvent),
    /// Journal write information. Topic: `persistence`.
    JournalWrite(JournalWriteInfo),
    /// Remote association information. Topic: `remote`.
    RemoteAssociation(RemoteAssociationInfo),
    /// Streams graph started, identified by `id` and `name`. Topic: `streams`.
    StreamsGraphStarted { id: u64, name: String },
    /// Streams graph finished, identified by `id`. Topic: `streams`.
    StreamsGraphFinished { id: u64 },
    /// Distributed-data entry updated, identified by `key`. Topic: `ddata`.
    DDataUpdated { key: String },
}
36
37impl TelemetryEvent {
38 pub fn topic(&self) -> &'static str {
41 match self {
42 Self::ActorSpawned(_) | Self::ActorStopped { .. } | Self::MailboxSampled { .. } => "actors",
43 Self::DeadLetter(_) => "dead_letters",
44 Self::ClusterChanged(_) => "cluster",
45 Self::ShardingChanged(_) => "sharding",
46 Self::JournalWrite(_) => "persistence",
47 Self::RemoteAssociation(_) => "remote",
48 Self::StreamsGraphStarted { .. } | Self::StreamsGraphFinished { .. } => "streams",
49 Self::DDataUpdated { .. } => "ddata",
50 }
51 }
52}
53
/// Fan-out point for [`TelemetryEvent`]s.
///
/// Events are delivered two ways: to every attached [`Exporter`] and to every
/// receiver subscribed on the broadcast channel.
///
/// Cloning is cheap: a clone holds another handle to the same broadcast
/// sender and the same `Arc`-shared exporter list, so all clones publish to
/// the same subscribers and exporters.
#[derive(Clone)]
pub struct TelemetryBus {
    /// Sending half of the broadcast channel handed out via `subscribe`.
    tx: broadcast::Sender<TelemetryEvent>,
    /// Exporters attached so far, in attachment order, shared across clones.
    exporters: Arc<RwLock<Vec<Arc<dyn Exporter>>>>,
}
61
62impl TelemetryBus {
63 pub fn new(capacity: usize) -> Self {
64 let (tx, _rx) = broadcast::channel(capacity.max(16));
65 Self { tx, exporters: Arc::new(RwLock::new(Vec::new())) }
66 }
67
68 pub fn publish(&self, event: TelemetryEvent) {
69 let exporters = self.exporters.read().clone();
71 for exp in &exporters {
72 exp.on_event(&event);
73 }
74 let _ = self.tx.send(event);
76 }
77
78 pub fn subscribe(&self) -> broadcast::Receiver<TelemetryEvent> {
79 self.tx.subscribe()
80 }
81
82 pub fn receiver_count(&self) -> usize {
83 self.tx.receiver_count()
84 }
85
86 pub(crate) fn attach_exporter(&self, exporter: Arc<dyn Exporter>) {
87 self.exporters.write().push(exporter);
88 }
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dto::ActorStatus;

    /// A subscriber created before `publish` receives the published event.
    #[tokio::test]
    async fn publish_and_subscribe_roundtrip() {
        let bus = TelemetryBus::new(8);
        let mut rx = bus.subscribe();
        bus.publish(TelemetryEvent::ActorStopped { path: "/user/a".into() });

        let got = rx
            .recv()
            .await
            .expect("subscriber should receive the published event");

        // Check the payload as well as the topic, so a mixed-up variant fails.
        assert!(matches!(
            &got,
            TelemetryEvent::ActorStopped { path } if path == "/user/a"
        ));
        assert_eq!(got.topic(), "actors");
    }

    /// `topic()` returns the expected label for each variant that can be
    /// built here without further DTO definitions.
    ///
    /// This is a plain synchronous test: nothing in it awaits, so no async
    /// runtime is needed.
    #[test]
    fn topic_labels_correct() {
        let cases = [
            (
                TelemetryEvent::ActorSpawned(ActorStatus {
                    path: "/user/x".into(),
                    parent: Some("/user".into()),
                    actor_type: "Test".into(),
                    mailbox_depth: 0,
                    spawned_at: "now".into(),
                }),
                "actors",
            ),
            (TelemetryEvent::ActorStopped { path: "/user/x".into() }, "actors"),
            (
                TelemetryEvent::MailboxSampled { path: "/user/x".into(), depth: 3 },
                "actors",
            ),
            (
                TelemetryEvent::StreamsGraphStarted { id: 1, name: "graph".into() },
                "streams",
            ),
            (TelemetryEvent::StreamsGraphFinished { id: 1 }, "streams"),
            (TelemetryEvent::DDataUpdated { key: "counter".into() }, "ddata"),
        ];

        for (event, expected) in &cases {
            assert_eq!(event.topic(), *expected, "wrong topic for {:?}", event);
        }
    }
}