palladium_runtime/
registry.rs1use crate::common::LifecycleSignal;
2use crate::introspection::{ActorInfo, ActorQuery, ActorState};
3use crate::reactor::Reactor;
4use crate::sharded_map::ShardedIndexMap;
5use palladium_actor::{ActorPath, AddrHash};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use tokio::sync::mpsc::UnboundedSender;
9
10pub struct ActorSlot<R: Reactor> {
12 pub path: ActorPath,
13 pub addr: AddrHash,
14 pub mailbox_tx: palladium_transport::MailboxSender,
15 pub ctrl_tx: UnboundedSender<LifecycleSignal<R>>,
16 pub task_handle: Arc<parking_lot::Mutex<Option<Box<dyn palladium_actor::SpawnHandle>>>>,
17 pub supervisor_addr: Option<AddrHash>,
18 pub running: Arc<std::sync::atomic::AtomicBool>,
19 pub restart_count: Arc<std::sync::atomic::AtomicU32>,
21 pub message_count: Arc<AtomicU64>,
24 pub compute_time_ns: Arc<AtomicU64>,
26}
27
28pub struct ActorRegistry<R: Reactor> {
33 pub(crate) next_generation: AtomicU64,
35 pub(crate) addr_to_slot: ShardedIndexMap<AddrHash, Arc<ActorSlot<R>>>,
37 pub(crate) path_to_addr: ShardedIndexMap<ActorPath, AddrHash>,
39}
40
41impl<R: Reactor> Default for ActorRegistry<R> {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl<R: Reactor> ActorRegistry<R> {
48 pub fn new() -> Self {
49 Self {
50 next_generation: AtomicU64::new(1),
51 addr_to_slot: ShardedIndexMap::new(),
52 path_to_addr: ShardedIndexMap::new(),
53 }
54 }
55
56 pub fn next_generation(&self) -> u64 {
58 self.next_generation.fetch_add(1, Ordering::Relaxed)
59 }
60
61 pub fn insert(&self, slot: ActorSlot<R>) {
62 let addr = slot.addr;
63 let path = slot.path.clone();
64 let slot = Arc::new(slot);
65 self.addr_to_slot.insert(addr, slot);
66 self.path_to_addr.insert(path, addr);
67 }
68
69 #[allow(dead_code)]
70 pub fn remove(&self, addr: AddrHash) -> Option<Arc<ActorSlot<R>>> {
71 if let Some(slot) = self.addr_to_slot.remove(&addr) {
72 self.path_to_addr
75 .remove_if(&slot.path, |current_addr| *current_addr == addr);
76 return Some(slot);
77 }
78 None
79 }
80
81 pub fn get_by_addr(&self, addr: AddrHash) -> Option<Arc<ActorSlot<R>>> {
82 self.addr_to_slot.get(&addr, |v| v.cloned())
83 }
84
85 pub fn get_by_path(&self, path: &ActorPath) -> Option<Arc<ActorSlot<R>>> {
86 let addr = self.path_to_addr.get(path, |v| v.copied())?;
87 self.get_by_addr(addr)
88 }
89
90 pub fn update_running(&self, addr: AddrHash, running: bool) {
91 self.addr_to_slot.get(&addr, |slot| {
92 if let Some(slot) = slot {
93 slot.running.store(running, Ordering::Relaxed);
94 }
95 });
96 }
97
98 pub fn increment_restart_count(&self, addr: AddrHash) {
100 self.addr_to_slot.get(&addr, |slot| {
101 if let Some(slot) = slot {
102 slot.restart_count.fetch_add(1, Ordering::Relaxed);
103 }
104 });
105 }
106
107 fn slot_to_info(slot: &ActorSlot<R>, core_id: usize) -> ActorInfo {
109 ActorInfo {
110 path: slot.path.clone(),
111 state: if slot.running.load(Ordering::Relaxed) {
112 ActorState::Running
113 } else {
114 ActorState::Stopped
115 },
116 core_id,
117 mailbox_depth: slot.mailbox_tx.len(),
118 mailbox_capacity: slot.mailbox_tx.capacity(),
119 restart_count: slot.restart_count.load(Ordering::Relaxed),
120 message_count: slot.message_count.load(Ordering::Relaxed),
121 total_compute_time_ns: slot.compute_time_ns.load(Ordering::Relaxed),
122 }
123 }
124
125 pub(crate) fn snapshot(&self, query: &ActorQuery, core_id: usize) -> Vec<ActorInfo> {
130 let mut result = Vec::new();
131 self.addr_to_slot.for_each(|_, slot| {
132 let info = Self::slot_to_info(slot, core_id);
133 if query.matches(&info) {
134 result.push(info);
135 }
136 });
137 result
138 }
139
140 pub(crate) fn actor_info_by_path(&self, path: &ActorPath, core_id: usize) -> Option<ActorInfo> {
142 self.get_by_path(path)
143 .map(|slot| Self::slot_to_info(&slot, core_id))
144 }
145
146 pub(crate) fn cancel_all(&self) {
148 self.addr_to_slot.for_each(|_, slot| {
149 if let Some(mut handle_guard) = slot.task_handle.try_lock() {
150 if let Some(handle) = handle_guard.take() {
151 handle.abort();
152 }
153 }
154 slot.running.store(false, Ordering::Relaxed);
155 });
156 self.addr_to_slot.clear();
157 self.path_to_addr.clear();
158 }
159}