Skip to main content

starlang_runtime/
registry.rs

1//! Process registry for mapping PIDs to process handles.
2//!
3//! The [`ProcessRegistry`] provides thread-safe access to all running processes,
4//! allowing message delivery, process lookup, and name registration.
5
6use crate::SendError;
7use crate::process_handle::ProcessHandle;
8use dashmap::DashMap;
9use starlang_core::{Pid, Term};
10use std::sync::{Arc, OnceLock};
11
/// Signature of the hook used to deliver a message to a non-local PID.
///
/// [`ProcessRegistry::send_raw`] calls the installed hook with the target PID and
/// the raw message bytes whenever `pid.is_local()` is false.
/// Returns `Ok(())` if the message was sent, or a [`SendError`] otherwise.
pub type RemoteSendHook = fn(pid: Pid, data: Vec<u8>) -> Result<(), SendError>;
17
/// Global hook for sending to remote processes.
///
/// Set by the distribution layer when initialized, through
/// [`set_remote_send_hook`]. Stored in a [`OnceLock`], so it can be written at
/// most once; while it is unset, [`ProcessRegistry::send_raw`] reports every
/// non-local PID as `SendError::ProcessNotFound`.
static REMOTE_SEND_HOOK: OnceLock<RemoteSendHook> = OnceLock::new();
22
/// Installs the hook used to send messages to remote processes.
///
/// This should be called by the distribution layer when it's initialized.
///
/// # Errors
///
/// The hook can only be set once. If a hook is already installed it is left in
/// place and the rejected `hook` is handed back as `Err(hook)`.
pub fn set_remote_send_hook(hook: RemoteSendHook) -> Result<(), RemoteSendHook> {
    REMOTE_SEND_HOOK.set(hook)
}
30
/// A thread-safe registry of all running processes.
///
/// The registry maintains mappings from:
/// - PIDs to process handles
/// - Registered names to PIDs
///
/// Both maps live behind an [`Arc`], so cloning a `ProcessRegistry` yields
/// another handle to the same underlying maps rather than an independent copy.
///
/// # Examples
///
/// ```
/// use starlang_runtime::ProcessRegistry;
///
/// let registry = ProcessRegistry::new();
/// // Processes are registered when spawned via the runtime
/// ```
#[derive(Clone)]
pub struct ProcessRegistry {
    /// Map from PID to process handle.
    processes: Arc<DashMap<Pid, ProcessHandle>>,
    /// Map from registered name to PID. Several names may point at the same PID.
    names: Arc<DashMap<String, Pid>>,
}
52
53impl ProcessRegistry {
54    /// Creates a new empty registry.
55    pub fn new() -> Self {
56        Self {
57            processes: Arc::new(DashMap::new()),
58            names: Arc::new(DashMap::new()),
59        }
60    }
61
62    /// Registers a process in the registry.
63    pub fn register(&self, handle: ProcessHandle) {
64        self.processes.insert(handle.pid(), handle);
65    }
66
67    /// Removes a process from the registry.
68    ///
69    /// Also removes any name registrations for this process.
70    pub fn unregister(&self, pid: Pid) -> Option<ProcessHandle> {
71        // Remove any name registrations
72        self.names.retain(|_, &mut p| p != pid);
73        // Remove from processes
74        self.processes.remove(&pid).map(|(_, h)| h)
75    }
76
77    /// Gets a handle to a process by PID.
78    pub fn get(&self, pid: Pid) -> Option<ProcessHandle> {
79        self.processes.get(&pid).map(|r| r.value().clone())
80    }
81
    /// Returns `true` if a process with the given PID is currently registered.
    ///
    /// Only the process map is consulted; registered names are not considered.
    pub fn contains(&self, pid: Pid) -> bool {
        self.processes.contains_key(&pid)
    }
86
    /// Returns the number of registered processes.
    ///
    /// Registered names are not counted.
    pub fn len(&self) -> usize {
        self.processes.len()
    }
91
    /// Returns `true` if no processes are registered.
    ///
    /// Only the process map is checked; the name map does not affect the result.
    pub fn is_empty(&self) -> bool {
        self.processes.is_empty()
    }
96
97    /// Sends a raw message to a process.
98    ///
99    /// If the PID refers to a remote process and distribution is configured,
100    /// the message will be routed through the distribution layer.
101    pub fn send_raw(&self, pid: Pid, data: Vec<u8>) -> Result<(), SendError> {
102        // Check if this is a remote PID
103        if !pid.is_local() {
104            // Try to send via distribution layer
105            if let Some(hook) = REMOTE_SEND_HOOK.get() {
106                return hook(pid, data);
107            } else {
108                // Distribution not configured - can't send to remote
109                return Err(SendError::ProcessNotFound(pid));
110            }
111        }
112
113        // Local PID - send directly
114        match self.processes.get(&pid) {
115            Some(handle) => handle.send_raw(data),
116            None => Err(SendError::ProcessNotFound(pid)),
117        }
118    }
119
    /// Sends a typed message to a process.
    ///
    /// The message is encoded with `msg.encode()` and passed to
    /// [`send_raw`](Self::send_raw), so routing and returned errors are the same
    /// as for raw sends.
    pub fn send<M: Term>(&self, pid: Pid, msg: &M) -> Result<(), SendError> {
        self.send_raw(pid, msg.encode())
    }
124
125    /// Registers a name for a process.
126    ///
127    /// Returns `false` if the name is already registered.
128    pub fn register_name(&self, name: String, pid: Pid) -> bool {
129        if self.names.contains_key(&name) {
130            return false;
131        }
132        self.names.insert(name, pid);
133        true
134    }
135
136    /// Looks up a process by registered name.
137    pub fn whereis(&self, name: &str) -> Option<Pid> {
138        self.names.get(name).map(|r| *r.value())
139    }
140
141    /// Unregisters a name.
142    ///
143    /// Returns the PID that was registered, if any.
144    pub fn unregister_name(&self, name: &str) -> Option<Pid> {
145        self.names.remove(name).map(|(_, pid)| pid)
146    }
147
148    /// Returns all registered names.
149    pub fn registered_names(&self) -> Vec<String> {
150        self.names.iter().map(|r| r.key().clone()).collect()
151    }
152
153    /// Returns all process PIDs.
154    pub fn pids(&self) -> Vec<Pid> {
155        self.processes.iter().map(|r| *r.key()).collect()
156    }
157
158    /// Iterates over all processes, calling the provided function.
159    pub fn for_each<F>(&self, f: F)
160    where
161        F: FnMut(ProcessHandle),
162    {
163        self.processes.iter().map(|r| r.value().clone()).for_each(f);
164    }
165}
166
/// The default registry is an empty one, exactly as built by
/// [`ProcessRegistry::new`].
impl Default for ProcessRegistry {
    fn default() -> Self {
        Self::new()
    }
}
172
173impl std::fmt::Debug for ProcessRegistry {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        f.debug_struct("ProcessRegistry")
176            .field("process_count", &self.processes.len())
177            .field("name_count", &self.names.len())
178            .finish()
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailbox::Mailbox;
    use crate::process_handle::ProcessState;
    use std::sync::RwLock;

    /// Builds a process handle for `pid` backed by a fresh mailbox and state.
    fn create_test_handle(pid: Pid) -> ProcessHandle {
        let (_mailbox, sender) = Mailbox::new();
        let shared_state = Arc::new(RwLock::new(ProcessState::new(pid)));
        ProcessHandle::new(pid, sender, shared_state, None)
    }

    /// A registered process is visible through `contains`, `len` and `get`.
    #[test]
    fn test_register_and_get() {
        let registry = ProcessRegistry::new();
        let pid = Pid::new();
        registry.register(create_test_handle(pid));

        assert!(registry.contains(pid));
        assert_eq!(registry.len(), 1);

        let fetched = registry.get(pid).unwrap();
        assert_eq!(fetched.pid(), pid);
    }

    /// Unregistering returns the handle and leaves the registry empty.
    #[test]
    fn test_unregister() {
        let registry = ProcessRegistry::new();
        let pid = Pid::new();
        registry.register(create_test_handle(pid));
        assert!(registry.contains(pid));

        assert!(registry.unregister(pid).is_some());
        assert!(!registry.contains(pid));
        assert!(registry.is_empty());
    }

    /// Names can be claimed once, looked up, and released again.
    #[test]
    fn test_name_registration() {
        const NAME: &str = "my_process";

        let registry = ProcessRegistry::new();
        let owner = Pid::new();
        registry.register(create_test_handle(owner));

        // First claim succeeds and resolves back to the owner.
        assert!(registry.register_name(NAME.to_string(), owner));
        assert_eq!(registry.whereis(NAME), Some(owner));

        // A second claim on the same name is rejected.
        let intruder = Pid::new();
        assert!(!registry.register_name(NAME.to_string(), intruder));

        // Releasing the name returns the owner and clears the lookup.
        assert_eq!(registry.unregister_name(NAME), Some(owner));
        assert_eq!(registry.whereis(NAME), None);
    }

    /// Removing a process also removes the names bound to it.
    #[test]
    fn test_unregister_removes_names() {
        let registry = ProcessRegistry::new();
        let pid = Pid::new();
        registry.register(create_test_handle(pid));
        registry.register_name("my_process".to_string(), pid);

        registry.unregister(pid);

        assert_eq!(registry.whereis("my_process"), None);
    }

    /// `pids` and `registered_names` list everything that was registered.
    #[test]
    fn test_pids_and_names() {
        let registry = ProcessRegistry::new();
        let first = Pid::new();
        let second = Pid::new();

        registry.register(create_test_handle(first));
        registry.register(create_test_handle(second));
        registry.register_name("proc1".to_string(), first);
        registry.register_name("proc2".to_string(), second);

        let listed_pids = registry.pids();
        assert_eq!(listed_pids.len(), 2);
        assert!(listed_pids.contains(&first));
        assert!(listed_pids.contains(&second));

        let listed_names = registry.registered_names();
        assert_eq!(listed_names.len(), 2);
        assert!(listed_names.iter().any(|name| name == "proc1"));
        assert!(listed_names.iter().any(|name| name == "proc2"));
    }
}