Skip to main content

atomr_telemetry/
actor_registry.rs

//! Actor registry probe — tracks path/parent/mailbox-depth for every live
//! actor in an `ActorSystem`.
//!
//! Populated by the `atomr-core` spawn/stop hooks when a
//! [`crate::TelemetryExtension`] is registered on the actor system.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use atomr_core::actor::{ActorPath, SpawnObserver};
10use dashmap::DashMap;
11
12use crate::bus::{TelemetryBus, TelemetryEvent};
13use crate::dto::{ActorSnapshot, ActorStatus, ActorTreeNode};
14
15pub struct ActorRegistry {
16    entries: DashMap<String, ActorStatus>,
17    bus: TelemetryBus,
18    spawned: AtomicU64,
19    stopped: AtomicU64,
20}
21
22impl ActorRegistry {
23    pub fn new(bus: TelemetryBus) -> Self {
24        Self { entries: DashMap::new(), bus, spawned: AtomicU64::new(0), stopped: AtomicU64::new(0) }
25    }
26
27    pub fn record_spawn(&self, status: ActorStatus) {
28        self.spawned.fetch_add(1, Ordering::Relaxed);
29        self.entries.insert(status.path.clone(), status.clone());
30        self.bus.publish(TelemetryEvent::ActorSpawned(status));
31    }
32
33    pub fn record_stop(&self, path: &str) {
34        if self.entries.remove(path).is_some() {
35            self.stopped.fetch_add(1, Ordering::Relaxed);
36            self.bus.publish(TelemetryEvent::ActorStopped { path: path.to_string() });
37        }
38    }
39
40    pub fn record_mailbox_depth(&self, path: &str, depth: u64) {
41        if let Some(mut e) = self.entries.get_mut(path) {
42            e.mailbox_depth = depth;
43        }
44        self.bus.publish(TelemetryEvent::MailboxSampled { path: path.to_string(), depth });
45    }
46
47    pub fn total_spawned(&self) -> u64 {
48        self.spawned.load(Ordering::Relaxed)
49    }
50
51    pub fn total_stopped(&self) -> u64 {
52        self.stopped.load(Ordering::Relaxed)
53    }
54
55    pub fn live_count(&self) -> u64 {
56        self.entries.len() as u64
57    }
58
59    pub fn snapshot(&self) -> ActorSnapshot {
60        let flat: Vec<ActorStatus> = self.entries.iter().map(|e| e.value().clone()).collect();
61        let roots = build_tree(&flat);
62        ActorSnapshot { total: flat.len() as u64, roots, flat }
63    }
64}
65
66impl SpawnObserver for ActorRegistry {
67    fn on_spawn(&self, path: &ActorPath, parent: Option<&ActorPath>, actor_type: &'static str) {
68        self.record_spawn(ActorStatus {
69            path: path.to_string(),
70            parent: parent.map(|p| p.to_string()),
71            actor_type: actor_type.to_string(),
72            mailbox_depth: 0,
73            spawned_at: chrono::Utc::now().to_rfc3339(),
74        });
75    }
76
77    fn on_stop(&self, path: &ActorPath) {
78        self.record_stop(&path.to_string());
79    }
80
81    fn on_mailbox_depth(&self, path: &ActorPath, depth: u64) {
82        self.record_mailbox_depth(&path.to_string(), depth);
83    }
84}
85
86fn build_tree(flat: &[ActorStatus]) -> Vec<ActorTreeNode> {
87    use std::collections::HashMap;
88    let mut children_of: HashMap<String, Vec<&ActorStatus>> = HashMap::new();
89    let mut roots: Vec<&ActorStatus> = Vec::new();
90
91    for s in flat {
92        match &s.parent {
93            Some(p) if flat.iter().any(|x| &x.path == p) => {
94                children_of.entry(p.clone()).or_default().push(s);
95            }
96            _ => roots.push(s),
97        }
98    }
99
100    fn to_node(
101        s: &ActorStatus,
102        children_of: &std::collections::HashMap<String, Vec<&ActorStatus>>,
103    ) -> ActorTreeNode {
104        let name = s.path.rsplit('/').next().unwrap_or(&s.path).to_string();
105        let children = children_of
106            .get(&s.path)
107            .map(|v| v.iter().map(|c| to_node(c, children_of)).collect())
108            .unwrap_or_default();
109        ActorTreeNode {
110            path: s.path.clone(),
111            name,
112            actor_type: s.actor_type.clone(),
113            mailbox_depth: s.mailbox_depth,
114            children,
115        }
116    }
117
118    roots.iter().map(|r| to_node(r, &children_of)).collect()
119}
120
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `ActorStatus` of type "Test" with zero mailbox depth for the
    /// given path and optional parent path.
    fn status(path: &str, parent: Option<&str>) -> ActorStatus {
        let parent = parent.map(str::to_owned);
        ActorStatus {
            path: path.to_owned(),
            parent,
            actor_type: String::from("Test"),
            mailbox_depth: 0,
            spawned_at: String::from("now"),
        }
    }

    /// Creates a registry backed by a bus constructed with `TelemetryBus::new(8)`.
    fn registry() -> ActorRegistry {
        ActorRegistry::new(TelemetryBus::new(8))
    }

    #[test]
    fn tracks_spawn_and_stop() {
        let registry = registry();

        registry.record_spawn(status("/user/a", Some("/user")));
        registry.record_spawn(status("/user/b", Some("/user")));
        assert_eq!(registry.live_count(), 2);

        registry.record_stop("/user/a");
        assert_eq!(registry.live_count(), 1);
        assert_eq!(registry.total_spawned(), 2);
        assert_eq!(registry.total_stopped(), 1);
    }

    #[test]
    fn builds_tree_with_known_parents() {
        let registry = registry();
        // A three-level chain: /user -> /user/a -> /user/a/aa.
        for (path, parent) in
            [("/user", None), ("/user/a", Some("/user")), ("/user/a/aa", Some("/user/a"))]
        {
            registry.record_spawn(status(path, parent));
        }

        let snapshot = registry.snapshot();
        assert_eq!(snapshot.total, 3);
        assert_eq!(snapshot.roots.len(), 1);

        let root = &snapshot.roots[0];
        assert_eq!(root.children.len(), 1);
        assert_eq!(root.children[0].children.len(), 1);
    }

    #[test]
    fn mailbox_depth_updates_status() {
        let registry = registry();
        registry.record_spawn(status("/user/a", Some("/user")));
        registry.record_mailbox_depth("/user/a", 42);

        let snapshot = registry.snapshot();
        assert_eq!(snapshot.flat[0].mailbox_depth, 42);
    }
}
171}