Skip to main content

actionqueue_actor/
registry.rs

1//! In-memory registry of registered remote actors.
2
3use std::collections::{HashMap, HashSet};
4
5use actionqueue_core::actor::ActorRegistration;
6use actionqueue_core::ids::{ActorId, TenantId};
7use tracing;
8
9/// Entry in the actor registry.
10struct ActorEntry {
11    registration: ActorRegistration,
12    active: bool,
13}
14
15/// In-memory projection of registered remote actors.
16///
17/// Reconstructed from WAL events at bootstrap via [`ActorRegistry::register`]
18/// and [`ActorRegistry::deregister`] calls. Maintains a secondary index of
19/// actors per tenant for O(N_tenant) lookups.
20#[derive(Default)]
21pub struct ActorRegistry {
22    actors: HashMap<ActorId, ActorEntry>,
23    /// Secondary index: tenant_id → set of actor_ids in that tenant.
24    actors_by_tenant: HashMap<TenantId, HashSet<ActorId>>,
25}
26
27impl ActorRegistry {
28    /// Creates an empty registry.
29    pub fn new() -> Self {
30        Self::default()
31    }
32
33    /// Registers an actor. If an actor with this ID already exists, it is replaced.
34    pub fn register(&mut self, registration: ActorRegistration) {
35        let actor_id = registration.actor_id();
36        tracing::debug!(%actor_id, "actor registered");
37        if let Some(tenant_id) = registration.tenant_id() {
38            self.actors_by_tenant.entry(tenant_id).or_default().insert(actor_id);
39        }
40        self.actors.insert(actor_id, ActorEntry { registration, active: true });
41    }
42
43    /// Marks an actor as deregistered (inactive).
44    pub fn deregister(&mut self, actor_id: ActorId) {
45        tracing::debug!(%actor_id, "actor deregistered");
46        if let Some(entry) = self.actors.get_mut(&actor_id) {
47            entry.active = false;
48        }
49    }
50
51    /// Returns `true` if the actor is registered and active.
52    pub fn is_active(&self, actor_id: ActorId) -> bool {
53        self.actors.get(&actor_id).is_some_and(|e| e.active)
54    }
55
56    /// Returns the actor registration, if known.
57    pub fn get(&self, actor_id: ActorId) -> Option<&ActorRegistration> {
58        self.actors.get(&actor_id).map(|e| &e.registration)
59    }
60
61    /// Returns the identity string for the actor (used as WAL lease owner).
62    pub fn identity(&self, actor_id: ActorId) -> Option<&str> {
63        self.actors.get(&actor_id).map(|e| e.registration.identity())
64    }
65
66    /// Returns all active actor IDs for the given tenant.
67    pub fn active_actors_for_tenant(&self, tenant_id: TenantId) -> Vec<ActorId> {
68        let Some(ids) = self.actors_by_tenant.get(&tenant_id) else {
69            return Vec::new();
70        };
71        ids.iter().copied().filter(|&id| self.is_active(id)).collect()
72    }
73
74    /// Returns an iterator over all actor IDs (active and deregistered).
75    pub fn all_actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
76        self.actors.keys().copied()
77    }
78}
79
#[cfg(test)]
mod tests {
    use actionqueue_core::actor::{ActorCapabilities, ActorRegistration};
    use actionqueue_core::ids::{ActorId, TenantId};

    use super::ActorRegistry;

    /// Builds a registration for `actor_id` named `test-actor` with a single
    /// `compute` capability.
    fn make_registration(actor_id: ActorId) -> ActorRegistration {
        let capabilities = ActorCapabilities::new(vec!["compute".to_string()]).unwrap();
        ActorRegistration::new(actor_id, "test-actor", capabilities, 30)
    }

    #[test]
    fn register_and_is_active() {
        let actor = ActorId::new();
        let mut reg = ActorRegistry::new();

        reg.register(make_registration(actor));

        assert!(reg.is_active(actor));
        assert!(reg.get(actor).is_some());
    }

    #[test]
    fn deregister_marks_inactive() {
        let actor = ActorId::new();
        let mut reg = ActorRegistry::new();

        reg.register(make_registration(actor));
        reg.deregister(actor);

        assert!(!reg.is_active(actor));
        // The entry is retained for history; only its active flag is cleared.
        assert!(reg.get(actor).is_some());
    }

    #[test]
    fn active_actors_for_tenant_filters_correctly() {
        let tenant = TenantId::new();
        let active_id = ActorId::new();
        let deregistered_id = ActorId::new();
        let other_tenant_id = ActorId::new();
        let caps = ActorCapabilities::new(vec!["c".to_string()]).unwrap();

        let mut reg = ActorRegistry::new();
        // Two actors under `tenant`, one registered without that tenant.
        let in_tenant_live =
            ActorRegistration::new(active_id, "a", caps.clone(), 30).with_tenant(tenant);
        let in_tenant_gone =
            ActorRegistration::new(deregistered_id, "b", caps.clone(), 30).with_tenant(tenant);
        let outside_tenant = ActorRegistration::new(other_tenant_id, "c", caps, 30);
        reg.register(in_tenant_live);
        reg.register(in_tenant_gone);
        reg.register(outside_tenant);

        reg.deregister(deregistered_id);

        let active = reg.active_actors_for_tenant(tenant);
        assert_eq!(active.len(), 1);
        assert!(active.contains(&active_id));
    }

    #[test]
    fn identity_returns_lease_owner_string() {
        let actor = ActorId::new();
        let caps = ActorCapabilities::new(vec!["c".to_string()]).unwrap();
        let mut reg = ActorRegistry::new();

        reg.register(ActorRegistration::new(actor, "caelum-vessel-1", caps, 30));

        assert_eq!(reg.identity(actor), Some("caelum-vessel-1"));
    }
}