Skip to main content

starlang_runtime/
process_handle.rs

1//! Process handle for interacting with running processes.
2//!
3//! A [`ProcessHandle`] provides an interface for sending messages to a process,
4//! managing links and monitors, and querying process state.
5
6use crate::SendError;
7use crate::mailbox::{Envelope, MailboxSender};
8use starlang_core::{ExitReason, Pid, Ref, Term};
9use std::collections::HashSet;
10use std::sync::{Arc, RwLock};
11use tokio::sync::oneshot;
12
13/// Internal state shared between the process and its handle.
14#[derive(Debug)]
15pub struct ProcessState {
16    /// The process identifier.
17    pub pid: Pid,
18    /// Whether the process is trapping exits.
19    pub trap_exit: bool,
20    /// Processes linked to this one (bidirectional).
21    pub links: HashSet<Pid>,
22    /// Monitors this process has created (ref -> monitored pid).
23    pub monitors: std::collections::HashMap<Ref, Pid>,
24    /// Processes monitoring this one (ref -> monitoring pid).
25    pub monitored_by: std::collections::HashMap<Ref, Pid>,
26    /// Whether the process has terminated.
27    pub terminated: bool,
28    /// The exit reason if terminated.
29    pub exit_reason: Option<ExitReason>,
30}
31
32impl ProcessState {
33    /// Creates a new process state.
34    pub fn new(pid: Pid) -> Self {
35        Self {
36            pid,
37            trap_exit: false,
38            links: HashSet::new(),
39            monitors: std::collections::HashMap::new(),
40            monitored_by: std::collections::HashMap::new(),
41            terminated: false,
42            exit_reason: None,
43        }
44    }
45}
46
47/// A handle to a running process.
48///
49/// This handle can be cloned and shared between threads. It provides
50/// methods for sending messages and managing process relationships.
51#[derive(Clone)]
52pub struct ProcessHandle {
53    /// The process identifier.
54    pid: Pid,
55    /// Channel for sending messages to the process.
56    sender: MailboxSender,
57    /// Shared process state.
58    state: Arc<RwLock<ProcessState>>,
59    /// Channel to signal process termination (for joining).
60    #[allow(dead_code)]
61    termination_tx: Arc<Option<oneshot::Sender<ExitReason>>>,
62}
63
64impl ProcessHandle {
65    /// Creates a new process handle.
66    pub fn new(
67        pid: Pid,
68        sender: MailboxSender,
69        state: Arc<RwLock<ProcessState>>,
70        termination_tx: Option<oneshot::Sender<ExitReason>>,
71    ) -> Self {
72        Self {
73            pid,
74            sender,
75            state,
76            termination_tx: Arc::new(termination_tx),
77        }
78    }
79
80    /// Returns the process identifier.
81    pub fn pid(&self) -> Pid {
82        self.pid
83    }
84
85    /// Sends a raw message (bytes) to the process.
86    pub fn send_raw(&self, data: Vec<u8>) -> Result<(), SendError> {
87        if self.sender.is_closed() {
88            return Err(SendError::ProcessTerminated);
89        }
90        self.sender
91            .send(Envelope::new(data))
92            .map_err(|_| SendError::ProcessTerminated)
93    }
94
95    /// Sends a typed message to the process.
96    pub fn send<M: Term>(&self, msg: &M) -> Result<(), SendError> {
97        self.send_raw(msg.encode())
98    }
99
100    /// Returns `true` if the process is still alive.
101    pub fn is_alive(&self) -> bool {
102        let state = self.state.read().unwrap();
103        !state.terminated
104    }
105
106    /// Returns `true` if the process is trapping exits.
107    pub fn is_trapping_exits(&self) -> bool {
108        let state = self.state.read().unwrap();
109        state.trap_exit
110    }
111
112    /// Sets the trap_exit flag.
113    pub fn set_trap_exit(&self, trap: bool) {
114        let mut state = self.state.write().unwrap();
115        state.trap_exit = trap;
116    }
117
118    /// Adds a link to another process.
119    ///
120    /// Links are bidirectional - this only updates our side.
121    /// The caller must also update the other process.
122    pub fn add_link(&self, other: Pid) {
123        let mut state = self.state.write().unwrap();
124        state.links.insert(other);
125    }
126
127    /// Removes a link to another process.
128    pub fn remove_link(&self, other: Pid) {
129        let mut state = self.state.write().unwrap();
130        state.links.remove(&other);
131    }
132
133    /// Returns all linked processes.
134    pub fn links(&self) -> Vec<Pid> {
135        let state = self.state.read().unwrap();
136        state.links.iter().copied().collect()
137    }
138
139    /// Adds a monitor (we are monitoring `target`).
140    pub fn add_monitor(&self, reference: Ref, target: Pid) {
141        let mut state = self.state.write().unwrap();
142        state.monitors.insert(reference, target);
143    }
144
145    /// Removes a monitor.
146    pub fn remove_monitor(&self, reference: Ref) -> Option<Pid> {
147        let mut state = self.state.write().unwrap();
148        state.monitors.remove(&reference)
149    }
150
151    /// Adds a process that is monitoring us.
152    pub fn add_monitored_by(&self, reference: Ref, monitoring_pid: Pid) {
153        let mut state = self.state.write().unwrap();
154        state.monitored_by.insert(reference, monitoring_pid);
155    }
156
157    /// Removes a process from our monitored_by set.
158    pub fn remove_monitored_by(&self, reference: Ref) -> Option<Pid> {
159        let mut state = self.state.write().unwrap();
160        state.monitored_by.remove(&reference)
161    }
162
163    /// Returns all processes monitoring this one.
164    pub fn monitored_by(&self) -> Vec<(Ref, Pid)> {
165        let state = self.state.read().unwrap();
166        state.monitored_by.iter().map(|(r, p)| (*r, *p)).collect()
167    }
168
169    /// Marks the process as terminated.
170    pub fn mark_terminated(&self, reason: ExitReason) {
171        let mut state = self.state.write().unwrap();
172        state.terminated = true;
173        state.exit_reason = Some(reason);
174    }
175
176    /// Returns the exit reason if the process has terminated.
177    pub fn exit_reason(&self) -> Option<ExitReason> {
178        let state = self.state.read().unwrap();
179        state.exit_reason.clone()
180    }
181}
182
183impl std::fmt::Debug for ProcessHandle {
184    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185        f.debug_struct("ProcessHandle")
186            .field("pid", &self.pid)
187            .field("alive", &self.is_alive())
188            .finish()
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailbox::Mailbox;

    /// Builds a handle over fresh state, together with the receiving mailbox.
    ///
    /// The mailbox is returned so each test can keep it bound for as long as
    /// it uses the handle.
    fn create_test_handle() -> (ProcessHandle, crate::mailbox::Mailbox) {
        let pid = Pid::new();
        let (mailbox, sender) = Mailbox::new();
        let state = Arc::new(RwLock::new(ProcessState::new(pid)));
        let handle = ProcessHandle::new(pid, sender, state, None);
        (handle, mailbox)
    }

    #[test]
    fn test_process_handle_pid() {
        let (handle, _mailbox) = create_test_handle();
        let pid = handle.pid();
        assert!(pid.is_local());
    }

    #[tokio::test]
    async fn test_send_message() {
        let (handle, mut mailbox) = create_test_handle();

        handle.send_raw(vec![1, 2, 3]).unwrap();

        let envelope = mailbox.recv().await.unwrap();
        assert_eq!(envelope.data, vec![1, 2, 3]);
    }

    #[test]
    fn test_trap_exit() {
        let (handle, _mailbox) = create_test_handle();

        assert!(!handle.is_trapping_exits());

        handle.set_trap_exit(true);
        assert!(handle.is_trapping_exits());

        handle.set_trap_exit(false);
        assert!(!handle.is_trapping_exits());
    }

    #[test]
    fn test_links() {
        let (handle, _mailbox) = create_test_handle();
        let other_pid = Pid::new();

        assert!(handle.links().is_empty());

        handle.add_link(other_pid);
        assert_eq!(handle.links(), vec![other_pid]);

        handle.remove_link(other_pid);
        assert!(handle.links().is_empty());
    }

    #[test]
    fn test_duplicate_link_is_recorded_once() {
        let (handle, _mailbox) = create_test_handle();
        let other_pid = Pid::new();

        // Links live in a set, so linking twice must not create two entries.
        handle.add_link(other_pid);
        handle.add_link(other_pid);
        assert_eq!(handle.links(), vec![other_pid]);

        // Removing a link that is already gone is a no-op.
        handle.remove_link(other_pid);
        handle.remove_link(other_pid);
        assert!(handle.links().is_empty());
    }

    #[test]
    fn test_monitors() {
        let (handle, _mailbox) = create_test_handle();
        let target_pid = Pid::new();
        let reference = Ref::new();

        handle.add_monitor(reference, target_pid);

        let removed = handle.remove_monitor(reference);
        assert_eq!(removed, Some(target_pid));

        let removed_again = handle.remove_monitor(reference);
        assert_eq!(removed_again, None);
    }

    #[test]
    fn test_monitored_by() {
        let (handle, _mailbox) = create_test_handle();
        let monitoring_pid = Pid::new();
        let reference = Ref::new();

        assert!(handle.monitored_by().is_empty());

        handle.add_monitored_by(reference, monitoring_pid);
        assert_eq!(handle.monitored_by(), vec![(reference, monitoring_pid)]);

        let removed = handle.remove_monitored_by(reference);
        assert_eq!(removed, Some(monitoring_pid));
        assert!(handle.monitored_by().is_empty());

        let removed_again = handle.remove_monitored_by(reference);
        assert_eq!(removed_again, None);
    }

    #[test]
    fn test_terminated() {
        let (handle, _mailbox) = create_test_handle();

        assert!(handle.is_alive());
        assert!(handle.exit_reason().is_none());

        handle.mark_terminated(ExitReason::Normal);

        assert!(!handle.is_alive());
        assert_eq!(handle.exit_reason(), Some(ExitReason::Normal));
    }

    #[test]
    fn test_clones_share_state() {
        let (handle, _mailbox) = create_test_handle();
        let clone = handle.clone();

        // A change made through one clone must be visible through the other.
        clone.set_trap_exit(true);
        assert!(handle.is_trapping_exits());

        handle.mark_terminated(ExitReason::Normal);
        assert!(!clone.is_alive());
        assert_eq!(clone.pid(), handle.pid());
    }

    #[test]
    fn test_debug_reports_liveness() {
        let (handle, _mailbox) = create_test_handle();

        let rendered = format!("{:?}", handle);
        assert!(rendered.contains("ProcessHandle"));
        assert!(rendered.contains("alive: true"));

        handle.mark_terminated(ExitReason::Normal);
        assert!(format!("{:?}", handle).contains("alive: false"));
    }
}