starlang_runtime/
registry.rs1use crate::SendError;
7use crate::process_handle::ProcessHandle;
8use dashmap::DashMap;
9use starlang_core::{Pid, Term};
10use std::sync::{Arc, OnceLock};
11
12pub type RemoteSendHook = fn(pid: Pid, data: Vec<u8>) -> Result<(), SendError>;
17
18static REMOTE_SEND_HOOK: OnceLock<RemoteSendHook> = OnceLock::new();
22
23pub fn set_remote_send_hook(hook: RemoteSendHook) -> Result<(), RemoteSendHook> {
28 REMOTE_SEND_HOOK.set(hook)
29}
30
31#[derive(Clone)]
46pub struct ProcessRegistry {
47 processes: Arc<DashMap<Pid, ProcessHandle>>,
49 names: Arc<DashMap<String, Pid>>,
51}
52
53impl ProcessRegistry {
54 pub fn new() -> Self {
56 Self {
57 processes: Arc::new(DashMap::new()),
58 names: Arc::new(DashMap::new()),
59 }
60 }
61
62 pub fn register(&self, handle: ProcessHandle) {
64 self.processes.insert(handle.pid(), handle);
65 }
66
67 pub fn unregister(&self, pid: Pid) -> Option<ProcessHandle> {
71 self.names.retain(|_, &mut p| p != pid);
73 self.processes.remove(&pid).map(|(_, h)| h)
75 }
76
77 pub fn get(&self, pid: Pid) -> Option<ProcessHandle> {
79 self.processes.get(&pid).map(|r| r.value().clone())
80 }
81
82 pub fn contains(&self, pid: Pid) -> bool {
84 self.processes.contains_key(&pid)
85 }
86
87 pub fn len(&self) -> usize {
89 self.processes.len()
90 }
91
92 pub fn is_empty(&self) -> bool {
94 self.processes.is_empty()
95 }
96
97 pub fn send_raw(&self, pid: Pid, data: Vec<u8>) -> Result<(), SendError> {
102 if !pid.is_local() {
104 if let Some(hook) = REMOTE_SEND_HOOK.get() {
106 return hook(pid, data);
107 } else {
108 return Err(SendError::ProcessNotFound(pid));
110 }
111 }
112
113 match self.processes.get(&pid) {
115 Some(handle) => handle.send_raw(data),
116 None => Err(SendError::ProcessNotFound(pid)),
117 }
118 }
119
120 pub fn send<M: Term>(&self, pid: Pid, msg: &M) -> Result<(), SendError> {
122 self.send_raw(pid, msg.encode())
123 }
124
125 pub fn register_name(&self, name: String, pid: Pid) -> bool {
129 if self.names.contains_key(&name) {
130 return false;
131 }
132 self.names.insert(name, pid);
133 true
134 }
135
136 pub fn whereis(&self, name: &str) -> Option<Pid> {
138 self.names.get(name).map(|r| *r.value())
139 }
140
141 pub fn unregister_name(&self, name: &str) -> Option<Pid> {
145 self.names.remove(name).map(|(_, pid)| pid)
146 }
147
148 pub fn registered_names(&self) -> Vec<String> {
150 self.names.iter().map(|r| r.key().clone()).collect()
151 }
152
153 pub fn pids(&self) -> Vec<Pid> {
155 self.processes.iter().map(|r| *r.key()).collect()
156 }
157
158 pub fn for_each<F>(&self, f: F)
160 where
161 F: FnMut(ProcessHandle),
162 {
163 self.processes.iter().map(|r| r.value().clone()).for_each(f);
164 }
165}
166
167impl Default for ProcessRegistry {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
173impl std::fmt::Debug for ProcessRegistry {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 f.debug_struct("ProcessRegistry")
176 .field("process_count", &self.processes.len())
177 .field("name_count", &self.names.len())
178 .finish()
179 }
180}
181
182#[cfg(test)]
183mod tests {
184 use super::*;
185 use crate::mailbox::Mailbox;
186 use crate::process_handle::ProcessState;
187 use std::sync::RwLock;
188
189 fn create_test_handle(pid: Pid) -> ProcessHandle {
190 let (_mailbox, sender) = Mailbox::new();
191 let state = Arc::new(RwLock::new(ProcessState::new(pid)));
192 ProcessHandle::new(pid, sender, state, None)
193 }
194
195 #[test]
196 fn test_register_and_get() {
197 let registry = ProcessRegistry::new();
198 let pid = Pid::new();
199 let handle = create_test_handle(pid);
200
201 registry.register(handle);
202
203 assert!(registry.contains(pid));
204 assert_eq!(registry.len(), 1);
205
206 let retrieved = registry.get(pid).unwrap();
207 assert_eq!(retrieved.pid(), pid);
208 }
209
210 #[test]
211 fn test_unregister() {
212 let registry = ProcessRegistry::new();
213 let pid = Pid::new();
214 let handle = create_test_handle(pid);
215
216 registry.register(handle);
217 assert!(registry.contains(pid));
218
219 let removed = registry.unregister(pid);
220 assert!(removed.is_some());
221 assert!(!registry.contains(pid));
222 assert!(registry.is_empty());
223 }
224
225 #[test]
226 fn test_name_registration() {
227 let registry = ProcessRegistry::new();
228 let pid = Pid::new();
229 let handle = create_test_handle(pid);
230
231 registry.register(handle);
232
233 assert!(registry.register_name("my_process".to_string(), pid));
235
236 assert_eq!(registry.whereis("my_process"), Some(pid));
238
239 let pid2 = Pid::new();
241 assert!(!registry.register_name("my_process".to_string(), pid2));
242
243 assert_eq!(registry.unregister_name("my_process"), Some(pid));
245 assert_eq!(registry.whereis("my_process"), None);
246 }
247
248 #[test]
249 fn test_unregister_removes_names() {
250 let registry = ProcessRegistry::new();
251 let pid = Pid::new();
252 let handle = create_test_handle(pid);
253
254 registry.register(handle);
255 registry.register_name("my_process".to_string(), pid);
256
257 registry.unregister(pid);
259
260 assert_eq!(registry.whereis("my_process"), None);
261 }
262
263 #[test]
264 fn test_pids_and_names() {
265 let registry = ProcessRegistry::new();
266
267 let pid1 = Pid::new();
268 let pid2 = Pid::new();
269
270 registry.register(create_test_handle(pid1));
271 registry.register(create_test_handle(pid2));
272 registry.register_name("proc1".to_string(), pid1);
273 registry.register_name("proc2".to_string(), pid2);
274
275 let pids = registry.pids();
276 assert_eq!(pids.len(), 2);
277 assert!(pids.contains(&pid1));
278 assert!(pids.contains(&pid2));
279
280 let names = registry.registered_names();
281 assert_eq!(names.len(), 2);
282 assert!(names.contains(&"proc1".to_string()));
283 assert!(names.contains(&"proc2".to_string()));
284 }
285}