// starlang_runtime/context.rs
use crate::SendError;
7use crate::mailbox::Mailbox;
8use crate::process_handle::{ProcessHandle, ProcessState};
9use crate::registry::ProcessRegistry;
10use starlang_core::{ExitReason, Pid, Ref, SystemMessage, Term};
11use std::sync::{Arc, RwLock};
12use std::time::Duration;
13
14pub struct Context {
38 pid: Pid,
40 mailbox: Mailbox,
42 state: Arc<RwLock<ProcessState>>,
44 registry: ProcessRegistry,
46}
47
48impl Context {
49 pub fn new(
51 pid: Pid,
52 mailbox: Mailbox,
53 state: Arc<RwLock<ProcessState>>,
54 registry: ProcessRegistry,
55 ) -> Self {
56 Self {
57 pid,
58 mailbox,
59 state,
60 registry,
61 }
62 }
63
64 pub fn pid(&self) -> Pid {
66 self.pid
67 }
68
69 pub async fn recv(&mut self) -> Option<Vec<u8>> {
73 self.mailbox.recv().await.map(|e| e.data)
74 }
75
76 pub async fn recv_timeout(&mut self, timeout: Duration) -> Result<Option<Vec<u8>>, ()> {
82 self.mailbox
83 .recv_timeout(timeout)
84 .await
85 .map(|opt| opt.map(|e| e.data))
86 }
87
88 pub fn try_recv(&mut self) -> Option<Vec<u8>> {
90 self.mailbox.try_recv().ok().map(|e| e.data)
91 }
92
93 pub fn send_raw(&self, pid: Pid, data: Vec<u8>) -> Result<(), SendError> {
95 self.registry.send_raw(pid, data)
96 }
97
98 pub fn send<M: Term>(&self, pid: Pid, msg: &M) -> Result<(), SendError> {
100 self.registry.send(pid, msg)
101 }
102
103 pub fn set_trap_exit(&self, trap: bool) -> bool {
110 let mut state = self.state.write().unwrap();
111 let prev = state.trap_exit;
112 state.trap_exit = trap;
113 prev
114 }
115
116 pub fn is_trapping_exits(&self) -> bool {
118 let state = self.state.read().unwrap();
119 state.trap_exit
120 }
121
122 pub fn link(&self, other: Pid) -> Result<(), SendError> {
127 {
129 let mut state = self.state.write().unwrap();
130 state.links.insert(other);
131 }
132
133 if let Some(other_handle) = self.registry.get(other) {
135 other_handle.add_link(self.pid);
136 Ok(())
137 } else {
138 let mut state = self.state.write().unwrap();
140 state.links.remove(&other);
141 Err(SendError::ProcessNotFound(other))
142 }
143 }
144
145 pub fn unlink(&self, other: Pid) {
147 {
149 let mut state = self.state.write().unwrap();
150 state.links.remove(&other);
151 }
152
153 if let Some(other_handle) = self.registry.get(other) {
155 other_handle.remove_link(self.pid);
156 }
157 }
158
159 pub fn monitor(&self, target: Pid) -> Result<Ref, SendError> {
164 let reference = Ref::new();
165
166 {
168 let mut state = self.state.write().unwrap();
169 state.monitors.insert(reference, target);
170 }
171
172 if let Some(target_handle) = self.registry.get(target) {
174 target_handle.add_monitored_by(reference, self.pid);
175 Ok(reference)
176 } else {
177 let mut state = self.state.write().unwrap();
179 state.monitors.remove(&reference);
180
181 let down = SystemMessage::down(reference, target, ExitReason::error("noproc"));
183 let _ = self.registry.send(self.pid, &down);
184
185 Ok(reference)
186 }
187 }
188
189 pub fn demonitor(&self, reference: Ref) {
194 let target = {
196 let mut state = self.state.write().unwrap();
197 state.monitors.remove(&reference)
198 };
199
200 if let Some(target_pid) = target
202 && let Some(target_handle) = self.registry.get(target_pid)
203 {
204 target_handle.remove_monitored_by(reference);
205 }
206 }
207
208 pub fn exit(&self, target: Pid, reason: ExitReason) -> Result<(), SendError> {
214 if let Some(handle) = self.registry.get(target) {
215 if reason.is_killed() {
216 handle.mark_terminated(reason);
218 } else if handle.is_trapping_exits() {
219 let exit_msg = SystemMessage::exit(self.pid, reason);
221 handle.send(&exit_msg)?;
222 } else if reason.is_abnormal() {
223 handle.mark_terminated(reason);
225 }
226 Ok(())
228 } else {
229 Err(SendError::ProcessNotFound(target))
230 }
231 }
232
233 pub fn whereis(&self, name: &str) -> Option<Pid> {
235 self.registry.whereis(name)
236 }
237
238 pub fn register(&self, name: String) -> bool {
242 self.registry.register_name(name, self.pid)
243 }
244
245 pub fn unregister(&self, name: &str) -> Option<Pid> {
247 self.registry.unregister_name(name)
248 }
249
250 pub fn is_alive(&self, pid: Pid) -> bool {
252 self.registry
253 .get(pid)
254 .map(|h| h.is_alive())
255 .unwrap_or(false)
256 }
257
258 pub(crate) fn handle(&self) -> ProcessHandle {
260 self.registry.get(self.pid).unwrap()
261 }
262}
263
264impl std::fmt::Debug for Context {
265 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266 f.debug_struct("Context").field("pid", &self.pid).finish()
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailbox::MailboxSender;

    /// Registers a fresh process in `registry` and returns its context
    /// together with the sending half of its mailbox.
    fn create_test_context(registry: &ProcessRegistry) -> (Context, MailboxSender) {
        let pid = Pid::new();
        let (mailbox, sender) = Mailbox::new();
        let state = Arc::new(RwLock::new(ProcessState::new(pid)));

        // The handle shares the same state as the context so that links and
        // monitors set through either side are visible to both.
        let handle = ProcessHandle::new(pid, sender.clone(), state.clone(), None);
        registry.register(handle);

        let ctx = Context::new(pid, mailbox, state, registry.clone());
        (ctx, sender)
    }

    #[test]
    fn test_context_pid() {
        let registry = ProcessRegistry::new();
        let (ctx, _sender) = create_test_context(&registry);
        assert!(ctx.pid().is_local());
    }

    #[test]
    fn test_trap_exit() {
        let registry = ProcessRegistry::new();
        let (ctx, _sender) = create_test_context(&registry);

        assert!(!ctx.is_trapping_exits());

        let prev = ctx.set_trap_exit(true);
        assert!(!prev);
        assert!(ctx.is_trapping_exits());

        // Switching back reports the value that was just set.
        let prev = ctx.set_trap_exit(false);
        assert!(prev);
        assert!(!ctx.is_trapping_exits());
    }

    #[tokio::test]
    async fn test_recv() {
        let registry = ProcessRegistry::new();
        let (mut ctx, sender) = create_test_context(&registry);

        sender
            .send(crate::mailbox::Envelope::new(vec![1, 2, 3]))
            .unwrap();

        let msg = ctx.recv().await.unwrap();
        assert_eq!(msg, vec![1, 2, 3]);
    }

    #[test]
    fn test_link() {
        let registry = ProcessRegistry::new();
        let (ctx1, _sender1) = create_test_context(&registry);
        let (ctx2, _sender2) = create_test_context(&registry);

        ctx1.link(ctx2.pid()).unwrap();

        // Links are recorded on both sides.
        let state1 = ctx1.state.read().unwrap();
        assert!(state1.links.contains(&ctx2.pid()));

        let state2 = ctx2.state.read().unwrap();
        assert!(state2.links.contains(&ctx1.pid()));
    }

    #[test]
    fn test_link_unknown_process() {
        let registry = ProcessRegistry::new();
        let (ctx, _sender) = create_test_context(&registry);
        let ghost = Pid::new();

        assert!(ctx.link(ghost).is_err());

        // The optimistic local insert must have been rolled back.
        let state = ctx.state.read().unwrap();
        assert!(!state.links.contains(&ghost));
    }

    #[test]
    fn test_unlink() {
        let registry = ProcessRegistry::new();
        let (ctx1, _sender1) = create_test_context(&registry);
        let (ctx2, _sender2) = create_test_context(&registry);

        ctx1.link(ctx2.pid()).unwrap();
        ctx1.unlink(ctx2.pid());

        let state1 = ctx1.state.read().unwrap();
        assert!(!state1.links.contains(&ctx2.pid()));

        let state2 = ctx2.state.read().unwrap();
        assert!(!state2.links.contains(&ctx1.pid()));
    }

    #[test]
    fn test_monitor() {
        let registry = ProcessRegistry::new();
        let (ctx1, _sender1) = create_test_context(&registry);
        let (ctx2, _sender2) = create_test_context(&registry);

        let reference = ctx1.monitor(ctx2.pid()).unwrap();

        // The monitoring side remembers the target ...
        let state1 = ctx1.state.read().unwrap();
        assert_eq!(state1.monitors.get(&reference), Some(&ctx2.pid()));

        // ... and the target remembers who is watching.
        let state2 = ctx2.state.read().unwrap();
        assert_eq!(state2.monitored_by.get(&reference), Some(&ctx1.pid()));
    }

    #[test]
    fn test_demonitor() {
        let registry = ProcessRegistry::new();
        let (ctx1, _sender1) = create_test_context(&registry);
        let (ctx2, _sender2) = create_test_context(&registry);

        let reference = ctx1.monitor(ctx2.pid()).unwrap();
        ctx1.demonitor(reference);

        let state1 = ctx1.state.read().unwrap();
        assert!(!state1.monitors.contains_key(&reference));

        let state2 = ctx2.state.read().unwrap();
        assert!(!state2.monitored_by.contains_key(&reference));
    }

    #[test]
    fn test_register_name() {
        let registry = ProcessRegistry::new();
        let (ctx, _sender) = create_test_context(&registry);

        assert!(ctx.register("test_proc".to_string()));
        assert_eq!(ctx.whereis("test_proc"), Some(ctx.pid()));

        // A name that is already taken cannot be registered again.
        assert!(!ctx.register("test_proc".to_string()));
    }
}