// palladium_runtime/registry.rs

use crate::common::LifecycleSignal;
use crate::introspection::{ActorInfo, ActorQuery, ActorState};
use crate::reactor::Reactor;
use crate::sharded_map::ShardedIndexMap;
use palladium_actor::{ActorPath, AddrHash};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
/// A slot in the actor registry.
///
/// One slot exists per registered actor. It bundles everything the runtime
/// needs to deliver messages, signal lifecycle changes, and report metrics
/// for that actor.
pub struct ActorSlot<R: Reactor> {
    /// Hierarchical name used for lookup in `path_to_addr`.
    pub path: ActorPath,
    /// Generational address; the primary key for message delivery.
    pub addr: AddrHash,
    /// Sender side of the actor's user-message mailbox.
    pub mailbox_tx: palladium_transport::MailboxSender,
    /// Sender for lifecycle/control signals to the actor task.
    pub ctrl_tx: UnboundedSender<LifecycleSignal<R>>,
    /// Handle of the spawned actor task; taken and aborted on forced
    /// shutdown (see `ActorRegistry::cancel_all`).
    pub task_handle: Arc<parking_lot::Mutex<Option<Box<dyn palladium_actor::SpawnHandle>>>>,
    /// Address of the supervising actor, if this actor has one.
    pub supervisor_addr: Option<AddrHash>,
    /// Whether the actor is currently running; shared via `Arc` so it can be
    /// flipped without holding any registry lock.
    pub running: Arc<std::sync::atomic::AtomicBool>,
    /// Times this actor path has been restarted by its supervisor.
    pub restart_count: Arc<std::sync::atomic::AtomicU32>,
    /// Total successful `on_message` completions. Shared with the actor task
    /// so the task can increment without holding the registry lock.
    pub message_count: Arc<AtomicU64>,
    /// Total wall-clock nanoseconds spent in lifecycle hooks.
    pub compute_time_ns: Arc<AtomicU64>,
}
/// Central registry for all actors in the engine (REQ-014).
///
/// Uses sharded concurrent maps for deterministic iteration order (shard 0..16,
/// insertion order within each shard) while still supporting concurrent access.
/// Both maps are kept in sync by [`ActorRegistry::insert`] and
/// [`ActorRegistry::remove`].
pub struct ActorRegistry<R: Reactor> {
    /// Incremented on every insert to provide generational safety in `AddrHash`.
    pub(crate) next_generation: AtomicU64,
    /// addr -> slot mapping for message delivery.
    pub(crate) addr_to_slot: ShardedIndexMap<AddrHash, Arc<ActorSlot<R>>>,
    /// path -> addr mapping for name resolution.
    pub(crate) path_to_addr: ShardedIndexMap<ActorPath, AddrHash>,
}
41impl<R: Reactor> Default for ActorRegistry<R> {
42    fn default() -> Self {
43        Self::new()
44    }
45}
47impl<R: Reactor> ActorRegistry<R> {
48    pub fn new() -> Self {
49        Self {
50            next_generation: AtomicU64::new(1),
51            addr_to_slot: ShardedIndexMap::new(),
52            path_to_addr: ShardedIndexMap::new(),
53        }
54    }
55
56    /// Assign a unique generation ID for a new actor.
57    pub fn next_generation(&self) -> u64 {
58        self.next_generation.fetch_add(1, Ordering::Relaxed)
59    }
60
61    pub fn insert(&self, slot: ActorSlot<R>) {
62        let addr = slot.addr;
63        let path = slot.path.clone();
64        let slot = Arc::new(slot);
65        self.addr_to_slot.insert(addr, slot);
66        self.path_to_addr.insert(path, addr);
67    }
68
69    #[allow(dead_code)]
70    pub fn remove(&self, addr: AddrHash) -> Option<Arc<ActorSlot<R>>> {
71        if let Some(slot) = self.addr_to_slot.remove(&addr) {
72            // Only remove from path_to_addr if this is still the CURRENT address
73            // for that path (to prevent a slow 'stop' from removing a fresh 'start').
74            self.path_to_addr
75                .remove_if(&slot.path, |current_addr| *current_addr == addr);
76            return Some(slot);
77        }
78        None
79    }
80
81    pub fn get_by_addr(&self, addr: AddrHash) -> Option<Arc<ActorSlot<R>>> {
82        self.addr_to_slot.get(&addr, |v| v.cloned())
83    }
84
85    pub fn get_by_path(&self, path: &ActorPath) -> Option<Arc<ActorSlot<R>>> {
86        let addr = self.path_to_addr.get(path, |v| v.copied())?;
87        self.get_by_addr(addr)
88    }
89
90    pub fn update_running(&self, addr: AddrHash, running: bool) {
91        self.addr_to_slot.get(&addr, |slot| {
92            if let Some(slot) = slot {
93                slot.running.store(running, Ordering::Relaxed);
94            }
95        });
96    }
97
98    /// Increment the restart counter for an actor (called by supervisor on restart).
99    pub fn increment_restart_count(&self, addr: AddrHash) {
100        self.addr_to_slot.get(&addr, |slot| {
101            if let Some(slot) = slot {
102                slot.restart_count.fetch_add(1, Ordering::Relaxed);
103            }
104        });
105    }
106
107    /// Build an [`ActorInfo`] from a slot.
108    fn slot_to_info(slot: &ActorSlot<R>, core_id: usize) -> ActorInfo {
109        ActorInfo {
110            path: slot.path.clone(),
111            state: if slot.running.load(Ordering::Relaxed) {
112                ActorState::Running
113            } else {
114                ActorState::Stopped
115            },
116            core_id,
117            mailbox_depth: slot.mailbox_tx.len(),
118            mailbox_capacity: slot.mailbox_tx.capacity(),
119            restart_count: slot.restart_count.load(Ordering::Relaxed),
120            message_count: slot.message_count.load(Ordering::Relaxed),
121            total_compute_time_ns: slot.compute_time_ns.load(Ordering::Relaxed),
122        }
123    }
124
125    /// Snapshot all actors, optionally filtered by `query`.
126    ///
127    /// Iteration order is deterministic: shard 0..16, insertion order within
128    /// each shard.
129    pub(crate) fn snapshot(&self, query: &ActorQuery, core_id: usize) -> Vec<ActorInfo> {
130        let mut result = Vec::new();
131        self.addr_to_slot.for_each(|_, slot| {
132            let info = Self::slot_to_info(slot, core_id);
133            if query.matches(&info) {
134                result.push(info);
135            }
136        });
137        result
138    }
139
140    /// Get info for a single actor by path.
141    pub(crate) fn actor_info_by_path(&self, path: &ActorPath, core_id: usize) -> Option<ActorInfo> {
142        self.get_by_path(path)
143            .map(|slot| Self::slot_to_info(&slot, core_id))
144    }
145
146    /// Brutally cancel all running actor tasks (REQ-073).
147    pub(crate) fn cancel_all(&self) {
148        self.addr_to_slot.for_each(|_, slot| {
149            if let Some(mut handle_guard) = slot.task_handle.try_lock() {
150                if let Some(handle) = handle_guard.take() {
151                    handle.abort();
152                }
153            }
154            slot.running.store(false, Ordering::Relaxed);
155        });
156        self.addr_to_slot.clear();
157        self.path_to_addr.clear();
158    }
159}