atomr_telemetry/
actor_registry.rs1use std::sync::atomic::{AtomicU64, Ordering};
8
9use atomr_core::actor::{ActorPath, SpawnObserver};
10use dashmap::DashMap;
11
12use crate::bus::{TelemetryBus, TelemetryEvent};
13use crate::dto::{ActorSnapshot, ActorStatus, ActorTreeNode};
14
15pub struct ActorRegistry {
16 entries: DashMap<String, ActorStatus>,
17 bus: TelemetryBus,
18 spawned: AtomicU64,
19 stopped: AtomicU64,
20}
21
22impl ActorRegistry {
23 pub fn new(bus: TelemetryBus) -> Self {
24 Self { entries: DashMap::new(), bus, spawned: AtomicU64::new(0), stopped: AtomicU64::new(0) }
25 }
26
27 pub fn record_spawn(&self, status: ActorStatus) {
28 self.spawned.fetch_add(1, Ordering::Relaxed);
29 self.entries.insert(status.path.clone(), status.clone());
30 self.bus.publish(TelemetryEvent::ActorSpawned(status));
31 }
32
33 pub fn record_stop(&self, path: &str) {
34 if self.entries.remove(path).is_some() {
35 self.stopped.fetch_add(1, Ordering::Relaxed);
36 self.bus.publish(TelemetryEvent::ActorStopped { path: path.to_string() });
37 }
38 }
39
40 pub fn record_mailbox_depth(&self, path: &str, depth: u64) {
41 if let Some(mut e) = self.entries.get_mut(path) {
42 e.mailbox_depth = depth;
43 }
44 self.bus.publish(TelemetryEvent::MailboxSampled { path: path.to_string(), depth });
45 }
46
47 pub fn total_spawned(&self) -> u64 {
48 self.spawned.load(Ordering::Relaxed)
49 }
50
51 pub fn total_stopped(&self) -> u64 {
52 self.stopped.load(Ordering::Relaxed)
53 }
54
55 pub fn live_count(&self) -> u64 {
56 self.entries.len() as u64
57 }
58
59 pub fn snapshot(&self) -> ActorSnapshot {
60 let flat: Vec<ActorStatus> = self.entries.iter().map(|e| e.value().clone()).collect();
61 let roots = build_tree(&flat);
62 ActorSnapshot { total: flat.len() as u64, roots, flat }
63 }
64}
65
66impl SpawnObserver for ActorRegistry {
67 fn on_spawn(&self, path: &ActorPath, parent: Option<&ActorPath>, actor_type: &'static str) {
68 self.record_spawn(ActorStatus {
69 path: path.to_string(),
70 parent: parent.map(|p| p.to_string()),
71 actor_type: actor_type.to_string(),
72 mailbox_depth: 0,
73 spawned_at: chrono::Utc::now().to_rfc3339(),
74 });
75 }
76
77 fn on_stop(&self, path: &ActorPath) {
78 self.record_stop(&path.to_string());
79 }
80
81 fn on_mailbox_depth(&self, path: &ActorPath, depth: u64) {
82 self.record_mailbox_depth(&path.to_string(), depth);
83 }
84}
85
86fn build_tree(flat: &[ActorStatus]) -> Vec<ActorTreeNode> {
87 use std::collections::HashMap;
88 let mut children_of: HashMap<String, Vec<&ActorStatus>> = HashMap::new();
89 let mut roots: Vec<&ActorStatus> = Vec::new();
90
91 for s in flat {
92 match &s.parent {
93 Some(p) if flat.iter().any(|x| &x.path == p) => {
94 children_of.entry(p.clone()).or_default().push(s);
95 }
96 _ => roots.push(s),
97 }
98 }
99
100 fn to_node(
101 s: &ActorStatus,
102 children_of: &std::collections::HashMap<String, Vec<&ActorStatus>>,
103 ) -> ActorTreeNode {
104 let name = s.path.rsplit('/').next().unwrap_or(&s.path).to_string();
105 let children = children_of
106 .get(&s.path)
107 .map(|v| v.iter().map(|c| to_node(c, children_of)).collect())
108 .unwrap_or_default();
109 ActorTreeNode {
110 path: s.path.clone(),
111 name,
112 actor_type: s.actor_type.clone(),
113 mailbox_depth: s.mailbox_depth,
114 children,
115 }
116 }
117
118 roots.iter().map(|r| to_node(r, &children_of)).collect()
119}
120
121#[cfg(test)]
122mod tests {
123 use super::*;
124
125 fn status(path: &str, parent: Option<&str>) -> ActorStatus {
126 ActorStatus {
127 path: path.into(),
128 parent: parent.map(|s| s.into()),
129 actor_type: "Test".into(),
130 mailbox_depth: 0,
131 spawned_at: "now".into(),
132 }
133 }
134
135 #[test]
136 fn tracks_spawn_and_stop() {
137 let bus = TelemetryBus::new(8);
138 let reg = ActorRegistry::new(bus);
139 reg.record_spawn(status("/user/a", Some("/user")));
140 reg.record_spawn(status("/user/b", Some("/user")));
141 assert_eq!(reg.live_count(), 2);
142 reg.record_stop("/user/a");
143 assert_eq!(reg.live_count(), 1);
144 assert_eq!(reg.total_spawned(), 2);
145 assert_eq!(reg.total_stopped(), 1);
146 }
147
148 #[test]
149 fn builds_tree_with_known_parents() {
150 let bus = TelemetryBus::new(8);
151 let reg = ActorRegistry::new(bus);
152 reg.record_spawn(status("/user", None));
153 reg.record_spawn(status("/user/a", Some("/user")));
154 reg.record_spawn(status("/user/a/aa", Some("/user/a")));
155 let snap = reg.snapshot();
156 assert_eq!(snap.total, 3);
157 assert_eq!(snap.roots.len(), 1);
158 assert_eq!(snap.roots[0].children.len(), 1);
159 assert_eq!(snap.roots[0].children[0].children.len(), 1);
160 }
161
162 #[test]
163 fn mailbox_depth_updates_status() {
164 let bus = TelemetryBus::new(8);
165 let reg = ActorRegistry::new(bus);
166 reg.record_spawn(status("/user/a", Some("/user")));
167 reg.record_mailbox_depth("/user/a", 42);
168 let snap = reg.snapshot();
169 assert_eq!(snap.flat[0].mailbox_depth, 42);
170 }
171}