starlang_runtime/
process_handle.rs1use crate::SendError;
7use crate::mailbox::{Envelope, MailboxSender};
8use starlang_core::{ExitReason, Pid, Ref, Term};
9use std::collections::HashSet;
10use std::sync::{Arc, RwLock};
11use tokio::sync::oneshot;
12
13#[derive(Debug)]
15pub struct ProcessState {
16 pub pid: Pid,
18 pub trap_exit: bool,
20 pub links: HashSet<Pid>,
22 pub monitors: std::collections::HashMap<Ref, Pid>,
24 pub monitored_by: std::collections::HashMap<Ref, Pid>,
26 pub terminated: bool,
28 pub exit_reason: Option<ExitReason>,
30}
31
32impl ProcessState {
33 pub fn new(pid: Pid) -> Self {
35 Self {
36 pid,
37 trap_exit: false,
38 links: HashSet::new(),
39 monitors: std::collections::HashMap::new(),
40 monitored_by: std::collections::HashMap::new(),
41 terminated: false,
42 exit_reason: None,
43 }
44 }
45}
46
47#[derive(Clone)]
52pub struct ProcessHandle {
53 pid: Pid,
55 sender: MailboxSender,
57 state: Arc<RwLock<ProcessState>>,
59 #[allow(dead_code)]
61 termination_tx: Arc<Option<oneshot::Sender<ExitReason>>>,
62}
63
64impl ProcessHandle {
65 pub fn new(
67 pid: Pid,
68 sender: MailboxSender,
69 state: Arc<RwLock<ProcessState>>,
70 termination_tx: Option<oneshot::Sender<ExitReason>>,
71 ) -> Self {
72 Self {
73 pid,
74 sender,
75 state,
76 termination_tx: Arc::new(termination_tx),
77 }
78 }
79
80 pub fn pid(&self) -> Pid {
82 self.pid
83 }
84
85 pub fn send_raw(&self, data: Vec<u8>) -> Result<(), SendError> {
87 if self.sender.is_closed() {
88 return Err(SendError::ProcessTerminated);
89 }
90 self.sender
91 .send(Envelope::new(data))
92 .map_err(|_| SendError::ProcessTerminated)
93 }
94
95 pub fn send<M: Term>(&self, msg: &M) -> Result<(), SendError> {
97 self.send_raw(msg.encode())
98 }
99
100 pub fn is_alive(&self) -> bool {
102 let state = self.state.read().unwrap();
103 !state.terminated
104 }
105
106 pub fn is_trapping_exits(&self) -> bool {
108 let state = self.state.read().unwrap();
109 state.trap_exit
110 }
111
112 pub fn set_trap_exit(&self, trap: bool) {
114 let mut state = self.state.write().unwrap();
115 state.trap_exit = trap;
116 }
117
118 pub fn add_link(&self, other: Pid) {
123 let mut state = self.state.write().unwrap();
124 state.links.insert(other);
125 }
126
127 pub fn remove_link(&self, other: Pid) {
129 let mut state = self.state.write().unwrap();
130 state.links.remove(&other);
131 }
132
133 pub fn links(&self) -> Vec<Pid> {
135 let state = self.state.read().unwrap();
136 state.links.iter().copied().collect()
137 }
138
139 pub fn add_monitor(&self, reference: Ref, target: Pid) {
141 let mut state = self.state.write().unwrap();
142 state.monitors.insert(reference, target);
143 }
144
145 pub fn remove_monitor(&self, reference: Ref) -> Option<Pid> {
147 let mut state = self.state.write().unwrap();
148 state.monitors.remove(&reference)
149 }
150
151 pub fn add_monitored_by(&self, reference: Ref, monitoring_pid: Pid) {
153 let mut state = self.state.write().unwrap();
154 state.monitored_by.insert(reference, monitoring_pid);
155 }
156
157 pub fn remove_monitored_by(&self, reference: Ref) -> Option<Pid> {
159 let mut state = self.state.write().unwrap();
160 state.monitored_by.remove(&reference)
161 }
162
163 pub fn monitored_by(&self) -> Vec<(Ref, Pid)> {
165 let state = self.state.read().unwrap();
166 state.monitored_by.iter().map(|(r, p)| (*r, *p)).collect()
167 }
168
169 pub fn mark_terminated(&self, reason: ExitReason) {
171 let mut state = self.state.write().unwrap();
172 state.terminated = true;
173 state.exit_reason = Some(reason);
174 }
175
176 pub fn exit_reason(&self) -> Option<ExitReason> {
178 let state = self.state.read().unwrap();
179 state.exit_reason.clone()
180 }
181}
182
183impl std::fmt::Debug for ProcessHandle {
184 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185 f.debug_struct("ProcessHandle")
186 .field("pid", &self.pid)
187 .field("alive", &self.is_alive())
188 .finish()
189 }
190}
191
192#[cfg(test)]
193mod tests {
194 use super::*;
195 use crate::mailbox::Mailbox;
196
197 fn create_test_handle() -> (ProcessHandle, crate::mailbox::Mailbox) {
198 let pid = Pid::new();
199 let (mailbox, sender) = Mailbox::new();
200 let state = Arc::new(RwLock::new(ProcessState::new(pid)));
201 let handle = ProcessHandle::new(pid, sender, state, None);
202 (handle, mailbox)
203 }
204
205 #[test]
206 fn test_process_handle_pid() {
207 let (handle, _mailbox) = create_test_handle();
208 let pid = handle.pid();
209 assert!(pid.is_local());
210 }
211
212 #[tokio::test]
213 async fn test_send_message() {
214 let (handle, mut mailbox) = create_test_handle();
215
216 handle.send_raw(vec![1, 2, 3]).unwrap();
217
218 let envelope = mailbox.recv().await.unwrap();
219 assert_eq!(envelope.data, vec![1, 2, 3]);
220 }
221
222 #[test]
223 fn test_trap_exit() {
224 let (handle, _mailbox) = create_test_handle();
225
226 assert!(!handle.is_trapping_exits());
227
228 handle.set_trap_exit(true);
229 assert!(handle.is_trapping_exits());
230
231 handle.set_trap_exit(false);
232 assert!(!handle.is_trapping_exits());
233 }
234
235 #[test]
236 fn test_links() {
237 let (handle, _mailbox) = create_test_handle();
238 let other_pid = Pid::new();
239
240 assert!(handle.links().is_empty());
241
242 handle.add_link(other_pid);
243 assert_eq!(handle.links(), vec![other_pid]);
244
245 handle.remove_link(other_pid);
246 assert!(handle.links().is_empty());
247 }
248
249 #[test]
250 fn test_monitors() {
251 let (handle, _mailbox) = create_test_handle();
252 let target_pid = Pid::new();
253 let reference = Ref::new();
254
255 handle.add_monitor(reference, target_pid);
256
257 let removed = handle.remove_monitor(reference);
258 assert_eq!(removed, Some(target_pid));
259
260 let removed_again = handle.remove_monitor(reference);
261 assert_eq!(removed_again, None);
262 }
263
264 #[test]
265 fn test_terminated() {
266 let (handle, _mailbox) = create_test_handle();
267
268 assert!(handle.is_alive());
269 assert!(handle.exit_reason().is_none());
270
271 handle.mark_terminated(ExitReason::Normal);
272
273 assert!(!handle.is_alive());
274 assert_eq!(handle.exit_reason(), Some(ExitReason::Normal));
275 }
276}