Skip to main content

starlang_runtime/
context.rs

1//! Process execution context.
2//!
3//! The [`Context`] provides a process with access to runtime services
4//! like sending messages, creating links/monitors, and spawning new processes.
5
6use crate::SendError;
7use crate::mailbox::Mailbox;
8use crate::process_handle::{ProcessHandle, ProcessState};
9use crate::registry::ProcessRegistry;
10use starlang_core::{ExitReason, Pid, Ref, SystemMessage, Term};
11use std::sync::{Arc, RwLock};
12use std::time::Duration;
13
14/// The execution context for a process.
15///
16/// A `Context` is passed to each process and provides access to:
17/// - The process's own PID
18/// - The process's mailbox for receiving messages
19/// - Methods for sending messages to other processes
20/// - Methods for creating links and monitors
21///
22/// # Examples
23///
24/// ```ignore
25/// async fn my_process(ctx: Context) {
26///     let my_pid = ctx.pid();
27///
28///     // Receive a message
29///     if let Some(envelope) = ctx.recv().await {
30///         // Handle message
31///     }
32///
33///     // Send a message to another process
34///     ctx.send(other_pid, &MyMessage { data: 42 }).unwrap();
35/// }
36/// ```
37pub struct Context {
38    /// Our process ID.
39    pid: Pid,
40    /// Our mailbox for receiving messages.
41    mailbox: Mailbox,
42    /// Our process state.
43    state: Arc<RwLock<ProcessState>>,
44    /// Reference to the process registry.
45    registry: ProcessRegistry,
46}
47
48impl Context {
49    /// Creates a new context for a process.
50    pub fn new(
51        pid: Pid,
52        mailbox: Mailbox,
53        state: Arc<RwLock<ProcessState>>,
54        registry: ProcessRegistry,
55    ) -> Self {
56        Self {
57            pid,
58            mailbox,
59            state,
60            registry,
61        }
62    }
63
64    /// Returns this process's PID.
65    pub fn pid(&self) -> Pid {
66        self.pid
67    }
68
69    /// Receives the next message from the mailbox.
70    ///
71    /// Returns `None` if the mailbox is closed.
72    pub async fn recv(&mut self) -> Option<Vec<u8>> {
73        self.mailbox.recv().await.map(|e| e.data)
74    }
75
76    /// Receives the next message with a timeout.
77    ///
78    /// Returns `Ok(Some(data))` if a message was received,
79    /// `Ok(None)` if the mailbox was closed,
80    /// or `Err(())` if the timeout elapsed.
81    pub async fn recv_timeout(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, ()> {
82        self.mailbox
83            .recv_timeout(timeout)
84            .await
85            .map(|opt| opt.map(|e| e.data))
86    }
87
88    /// Tries to receive a message without blocking.
89    pub fn try_recv(&mut self) -> Option<Vec<u8>> {
90        self.mailbox.try_recv().ok().map(|e| e.data)
91    }
92
93    /// Sends a raw message to another process.
94    pub fn send_raw(&self, pid: Pid, data: Vec<u8>) -> Result<(), SendError> {
95        self.registry.send_raw(pid, data)
96    }
97
98    /// Sends a typed message to another process.
99    pub fn send<M: Term>(&self, pid: Pid, msg: &M) -> Result<(), SendError> {
100        self.registry.send(pid, msg)
101    }
102
103    /// Sets the trap_exit flag for this process.
104    ///
105    /// When `true`, exit signals from linked processes are delivered
106    /// as `SystemMessage::Exit` messages instead of terminating this process.
107    ///
108    /// Returns the previous value.
109    pub fn set_trap_exit(&self, trap: bool) -> bool {
110        let mut state = self.state.write().unwrap();
111        let prev = state.trap_exit;
112        state.trap_exit = trap;
113        prev
114    }
115
116    /// Returns whether this process is trapping exits.
117    pub fn is_trapping_exits(&self) -> bool {
118        let state = self.state.read().unwrap();
119        state.trap_exit
120    }
121
122    /// Creates a bidirectional link with another process.
123    ///
124    /// If either process terminates abnormally, the other will receive
125    /// an exit signal.
126    pub fn link(&self, other: Pid) -> Result<(), SendError> {
127        // Get our handle to update our links
128        {
129            let mut state = self.state.write().unwrap();
130            state.links.insert(other);
131        }
132
133        // Get the other process's handle to update their links
134        if let Some(other_handle) = self.registry.get(other) {
135            other_handle.add_link(self.pid);
136            Ok(())
137        } else {
138            // Other process doesn't exist - remove our link and error
139            let mut state = self.state.write().unwrap();
140            state.links.remove(&other);
141            Err(SendError::ProcessNotFound(other))
142        }
143    }
144
145    /// Removes a link with another process.
146    pub fn unlink(&self, other: Pid) {
147        // Remove from our links
148        {
149            let mut state = self.state.write().unwrap();
150            state.links.remove(&other);
151        }
152
153        // Remove from their links
154        if let Some(other_handle) = self.registry.get(other) {
155            other_handle.remove_link(self.pid);
156        }
157    }
158
159    /// Creates a monitor on another process.
160    ///
161    /// Returns a reference that will be included in the `DOWN` message
162    /// when the monitored process terminates.
163    pub fn monitor(&self, target: Pid) -> Result<Ref, SendError> {
164        let reference = Ref::new();
165
166        // Record that we're monitoring the target
167        {
168            let mut state = self.state.write().unwrap();
169            state.monitors.insert(reference, target);
170        }
171
172        // Tell the target they're being monitored
173        if let Some(target_handle) = self.registry.get(target) {
174            target_handle.add_monitored_by(reference, self.pid);
175            Ok(reference)
176        } else {
177            // Target doesn't exist - send immediate DOWN message
178            let mut state = self.state.write().unwrap();
179            state.monitors.remove(&reference);
180
181            // Queue a DOWN message for ourselves
182            let down = SystemMessage::down(reference, target, ExitReason::error("noproc"));
183            let _ = self.registry.send(self.pid, &down);
184
185            Ok(reference)
186        }
187    }
188
189    /// Removes a monitor.
190    ///
191    /// The reference will no longer be valid and no `DOWN` message
192    /// will be sent for this monitor.
193    pub fn demonitor(&self, reference: Ref) {
194        // Get the target PID and remove from our monitors
195        let target = {
196            let mut state = self.state.write().unwrap();
197            state.monitors.remove(&reference)
198        };
199
200        // Remove from target's monitored_by
201        if let Some(target_pid) = target
202            && let Some(target_handle) = self.registry.get(target_pid)
203        {
204            target_handle.remove_monitored_by(reference);
205        }
206    }
207
208    /// Sends an exit signal to another process.
209    ///
210    /// If `reason` is `ExitReason::Killed`, the target will terminate
211    /// unconditionally. Otherwise, the behavior depends on whether
212    /// the target is trapping exits.
213    pub fn exit(&self, target: Pid, reason: ExitReason) -> Result<(), SendError> {
214        if let Some(handle) = self.registry.get(target) {
215            if reason.is_killed() {
216                // Killed is unconditional - mark terminated directly
217                handle.mark_terminated(reason);
218            } else if handle.is_trapping_exits() {
219                // Send as a message
220                let exit_msg = SystemMessage::exit(self.pid, reason);
221                handle.send(&exit_msg)?;
222            } else if reason.is_abnormal() {
223                // Propagate the exit
224                handle.mark_terminated(reason);
225            }
226            // Normal exits from other processes are ignored if not trapping
227            Ok(())
228        } else {
229            Err(SendError::ProcessNotFound(target))
230        }
231    }
232
233    /// Looks up a process by registered name.
234    pub fn whereis(&self, name: &str) -> Option<Pid> {
235        self.registry.whereis(name)
236    }
237
238    /// Registers a name for this process.
239    ///
240    /// Returns `false` if the name is already taken.
241    pub fn register(&self, name: String) -> bool {
242        self.registry.register_name(name, self.pid)
243    }
244
245    /// Unregisters a name.
246    pub fn unregister(&self, name: &str) -> Option<Pid> {
247        self.registry.unregister_name(name)
248    }
249
250    /// Returns `true` if the given process is alive.
251    pub fn is_alive(&self, pid: Pid) -> bool {
252        self.registry
253            .get(pid)
254            .map(|h| h.is_alive())
255            .unwrap_or(false)
256    }
257
258    /// Returns a handle to our own process.
259    pub(crate) fn handle(&self) -> ProcessHandle {
260        self.registry.get(self.pid).unwrap()
261    }
262}
263
264impl std::fmt::Debug for Context {
265    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266        f.debug_struct("Context").field("pid", &self.pid).finish()
267    }
268}
269
270#[cfg(test)]
271mod tests {
272    use super::*;
273    use crate::mailbox::MailboxSender;
274
275    fn create_test_context(registry: &ProcessRegistry) -> (Context, MailboxSender) {
276        let pid = Pid::new();
277        let (mailbox, sender) = Mailbox::new();
278        let state = Arc::new(RwLock::new(ProcessState::new(pid)));
279
280        // Register the process
281        let handle = ProcessHandle::new(pid, sender.clone(), state.clone(), None);
282        registry.register(handle);
283
284        let ctx = Context::new(pid, mailbox, state, registry.clone());
285        (ctx, sender)
286    }
287
288    #[test]
289    fn test_context_pid() {
290        let registry = ProcessRegistry::new();
291        let (ctx, _sender) = create_test_context(&registry);
292        assert!(ctx.pid().is_local());
293    }
294
295    #[test]
296    fn test_trap_exit() {
297        let registry = ProcessRegistry::new();
298        let (ctx, _sender) = create_test_context(&registry);
299
300        assert!(!ctx.is_trapping_exits());
301
302        let prev = ctx.set_trap_exit(true);
303        assert!(!prev);
304        assert!(ctx.is_trapping_exits());
305    }
306
307    #[tokio::test]
308    async fn test_recv() {
309        let registry = ProcessRegistry::new();
310        let (mut ctx, sender) = create_test_context(&registry);
311
312        // Send a message via the sender
313        sender
314            .send(crate::mailbox::Envelope::new(vec![1, 2, 3]))
315            .unwrap();
316
317        let msg = ctx.recv().await.unwrap();
318        assert_eq!(msg, vec![1, 2, 3]);
319    }
320
321    #[test]
322    fn test_link() {
323        let registry = ProcessRegistry::new();
324        let (ctx1, _sender1) = create_test_context(&registry);
325        let (ctx2, _sender2) = create_test_context(&registry);
326
327        ctx1.link(ctx2.pid()).unwrap();
328
329        // Both should have links
330        let state1 = ctx1.state.read().unwrap();
331        assert!(state1.links.contains(&ctx2.pid()));
332
333        let state2 = ctx2.state.read().unwrap();
334        assert!(state2.links.contains(&ctx1.pid()));
335    }
336
337    #[test]
338    fn test_unlink() {
339        let registry = ProcessRegistry::new();
340        let (ctx1, _sender1) = create_test_context(&registry);
341        let (ctx2, _sender2) = create_test_context(&registry);
342
343        ctx1.link(ctx2.pid()).unwrap();
344        ctx1.unlink(ctx2.pid());
345
346        let state1 = ctx1.state.read().unwrap();
347        assert!(!state1.links.contains(&ctx2.pid()));
348
349        let state2 = ctx2.state.read().unwrap();
350        assert!(!state2.links.contains(&ctx1.pid()));
351    }
352
353    #[test]
354    fn test_monitor() {
355        let registry = ProcessRegistry::new();
356        let (ctx1, _sender1) = create_test_context(&registry);
357        let (ctx2, _sender2) = create_test_context(&registry);
358
359        let reference = ctx1.monitor(ctx2.pid()).unwrap();
360
361        // ctx1 should have the monitor recorded
362        let state1 = ctx1.state.read().unwrap();
363        assert_eq!(state1.monitors.get(&reference), Some(&ctx2.pid()));
364
365        // ctx2 should know it's being monitored
366        let state2 = ctx2.state.read().unwrap();
367        assert_eq!(state2.monitored_by.get(&reference), Some(&ctx1.pid()));
368    }
369
370    #[test]
371    fn test_demonitor() {
372        let registry = ProcessRegistry::new();
373        let (ctx1, _sender1) = create_test_context(&registry);
374        let (ctx2, _sender2) = create_test_context(&registry);
375
376        let reference = ctx1.monitor(ctx2.pid()).unwrap();
377        ctx1.demonitor(reference);
378
379        let state1 = ctx1.state.read().unwrap();
380        assert!(!state1.monitors.contains_key(&reference));
381
382        let state2 = ctx2.state.read().unwrap();
383        assert!(!state2.monitored_by.contains_key(&reference));
384    }
385
386    #[test]
387    fn test_register_name() {
388        let registry = ProcessRegistry::new();
389        let (ctx, _sender) = create_test_context(&registry);
390
391        assert!(ctx.register("test_proc".to_string()));
392        assert_eq!(ctx.whereis("test_proc"), Some(ctx.pid()));
393
394        // Can't register same name twice
395        assert!(!ctx.register("test_proc".to_string()));
396    }
397}