actionqueue_actor/
registry.rs1use std::collections::{HashMap, HashSet};
4
5use actionqueue_core::actor::ActorRegistration;
6use actionqueue_core::ids::{ActorId, TenantId};
7use tracing;
8
9struct ActorEntry {
11 registration: ActorRegistration,
12 active: bool,
13}
14
15#[derive(Default)]
21pub struct ActorRegistry {
22 actors: HashMap<ActorId, ActorEntry>,
23 actors_by_tenant: HashMap<TenantId, HashSet<ActorId>>,
25}
26
27impl ActorRegistry {
28 pub fn new() -> Self {
30 Self::default()
31 }
32
33 pub fn register(&mut self, registration: ActorRegistration) {
35 let actor_id = registration.actor_id();
36 tracing::debug!(%actor_id, "actor registered");
37 if let Some(tenant_id) = registration.tenant_id() {
38 self.actors_by_tenant.entry(tenant_id).or_default().insert(actor_id);
39 }
40 self.actors.insert(actor_id, ActorEntry { registration, active: true });
41 }
42
43 pub fn deregister(&mut self, actor_id: ActorId) {
45 tracing::debug!(%actor_id, "actor deregistered");
46 if let Some(entry) = self.actors.get_mut(&actor_id) {
47 entry.active = false;
48 }
49 }
50
51 pub fn is_active(&self, actor_id: ActorId) -> bool {
53 self.actors.get(&actor_id).is_some_and(|e| e.active)
54 }
55
56 pub fn get(&self, actor_id: ActorId) -> Option<&ActorRegistration> {
58 self.actors.get(&actor_id).map(|e| &e.registration)
59 }
60
61 pub fn identity(&self, actor_id: ActorId) -> Option<&str> {
63 self.actors.get(&actor_id).map(|e| e.registration.identity())
64 }
65
66 pub fn active_actors_for_tenant(&self, tenant_id: TenantId) -> Vec<ActorId> {
68 let Some(ids) = self.actors_by_tenant.get(&tenant_id) else {
69 return Vec::new();
70 };
71 ids.iter().copied().filter(|&id| self.is_active(id)).collect()
72 }
73
74 pub fn all_actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
76 self.actors.keys().copied()
77 }
78}
79
#[cfg(test)]
mod tests {
    use actionqueue_core::actor::{ActorCapabilities, ActorRegistration};
    use actionqueue_core::ids::{ActorId, TenantId};

    use super::ActorRegistry;

    /// Builds a registration for `actor_id` with a single "compute" capability
    /// and no tenant attached via `with_tenant`.
    fn make_registration(actor_id: ActorId) -> ActorRegistration {
        let capabilities = ActorCapabilities::new(vec![String::from("compute")]).unwrap();
        ActorRegistration::new(actor_id, "test-actor", capabilities, 30)
    }

    #[test]
    fn register_and_is_active() {
        let actor = ActorId::new();
        let mut reg = ActorRegistry::new();

        reg.register(make_registration(actor));

        assert!(reg.is_active(actor));
        assert!(reg.get(actor).is_some());
    }

    #[test]
    fn deregister_marks_inactive() {
        let actor = ActorId::new();
        let mut reg = ActorRegistry::new();
        reg.register(make_registration(actor));

        reg.deregister(actor);

        assert!(!reg.is_active(actor));
        // The registration itself must remain retrievable after deregistration.
        assert!(reg.get(actor).is_some());
    }

    #[test]
    fn active_actors_for_tenant_filters_correctly() {
        let tenant = TenantId::new();
        let active_id = ActorId::new();
        let deregistered_id = ActorId::new();
        let other_tenant_id = ActorId::new();

        let caps = ActorCapabilities::new(vec![String::from("c")]).unwrap();
        let kept = ActorRegistration::new(active_id, "a", caps.clone(), 30).with_tenant(tenant);
        let dropped =
            ActorRegistration::new(deregistered_id, "b", caps.clone(), 30).with_tenant(tenant);
        // Registered without `with_tenant`, so it must not show up for `tenant`.
        let unrelated = ActorRegistration::new(other_tenant_id, "c", caps, 30);

        let mut reg = ActorRegistry::new();
        reg.register(kept);
        reg.register(dropped);
        reg.register(unrelated);
        reg.deregister(deregistered_id);

        let active = reg.active_actors_for_tenant(tenant);
        assert_eq!(active.len(), 1);
        assert!(active.contains(&active_id));
    }

    #[test]
    fn identity_returns_lease_owner_string() {
        let actor = ActorId::new();
        let caps = ActorCapabilities::new(vec![String::from("c")]).unwrap();
        let mut reg = ActorRegistry::new();

        reg.register(ActorRegistration::new(actor, "caelum-vessel-1", caps, 30));

        assert_eq!(reg.identity(actor), Some("caelum-vessel-1"));
    }
}
143}