1use std::collections::{BTreeMap, HashSet, VecDeque};
2use std::fmt;
3use std::future::Future;
4use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use dashmap::mapref::entry::Entry;
9use dashmap::DashMap;
10use futures_util::future::{AbortHandle, Abortable};
11use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
12use tokio::sync::Notify;
13use tokio::task::JoinHandle;
14
15use crate::exit::{ExitReason, MonitorRef};
16use crate::message::{Message, Received};
17use crate::pid::Pid;
18use crate::stream::StreamHandle;
19
20pub struct Context {
23 pid: Pid,
24 mailbox: UnboundedReceiver<Received>,
25 saved: VecDeque<Received>,
30 depth: Option<Arc<AtomicUsize>>,
34}
35
36impl Context {
37 pub fn pid(&self) -> Pid {
38 self.pid
39 }
40
41 fn note_consumed(&self) {
43 if let Some(depth) = &self.depth {
44 depth.fetch_sub(1, Ordering::Relaxed);
45 }
46 }
47
48 pub async fn recv(&mut self) -> Received {
55 let item = match self.saved.pop_front() {
56 Some(item) => item,
57 None => self.next_from_mailbox().await,
58 };
59 self.note_consumed();
60 item
61 }
62
63 pub async fn recv_match<F>(&mut self, mut matches: F) -> Received
68 where
69 F: FnMut(&Received) -> bool,
70 {
71 if let Some(pos) = self.saved.iter().position(&mut matches) {
72 let item = self.saved.remove(pos).expect("position is in bounds");
73 self.note_consumed();
74 return item;
75 }
76 loop {
77 let item = self.next_from_mailbox().await;
78 if matches(&item) {
79 self.note_consumed();
80 return item;
81 }
82 self.saved.push_back(item);
83 }
84 }
85
86 async fn next_from_mailbox(&mut self) -> Received {
87 self.mailbox
92 .recv()
93 .await
94 .expect("a live process always holds its own mailbox sender")
95 }
96}
97
98impl fmt::Debug for Context {
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 f.debug_struct("Context").field("pid", &self.pid).finish()
102 }
103}
104
105pub struct ProcessHandle {
108 pid: Pid,
109 abort: AbortHandle,
110 join: JoinHandle<()>,
111}
112
113impl ProcessHandle {
114 pub fn pid(&self) -> Pid {
115 self.pid
116 }
117
118 pub fn kill(&self) {
122 self.abort.abort();
123 }
124
125 pub async fn join(self) {
127 let _ = self.join.await;
128 }
129}
130
131#[derive(Debug, Clone, PartialEq, Eq)]
136pub struct ProcessInfo {
137 pub pid: Pid,
138 pub links: usize,
140 pub monitors: usize,
142 pub names: Vec<String>,
144 pub label: Option<String>,
146 pub mailbox_depth: usize,
148 pub trap_exit: bool,
150}
151
152struct Monitor {
155 watcher: Pid,
156 reference: MonitorRef,
157}
158
159struct ProcessEntry {
168 abort: AbortHandle,
169 mailbox: UnboundedSender<Received>,
170 trap_exit: bool,
173 links: Vec<Pid>,
175 monitors: Vec<Monitor>,
177 names: Vec<String>,
179 tags: Vec<String>,
183 exit_reason: Option<ExitReason>,
186 label: Option<String>,
190 depth: Option<Arc<AtomicUsize>>,
194}
195
196impl ProcessEntry {
197 fn note_enqueued(&self) {
199 if let Some(depth) = &self.depth {
200 depth.fetch_add(1, Ordering::Relaxed);
201 }
202 }
203
204 fn depth_value(&self) -> usize {
207 self.depth.as_ref().map_or(0, |d| d.load(Ordering::Relaxed))
208 }
209}
210
211#[derive(Default)]
212struct Inner {
213 track_depth: bool,
217 mailbox_capacity: Option<usize>,
222 dropped: AtomicU64,
224 log_level: std::sync::atomic::AtomicU8,
229 next_id: AtomicU64,
230 next_ref: AtomicU64,
231 spawned: AtomicU64,
232 finished: AtomicU64,
233 table: DashMap<u64, ProcessEntry>,
236 registry: DashMap<String, u64>,
239 tags: DashMap<String, HashSet<u64>>,
244 census_gen: AtomicU64,
250 census_dirty: Notify,
253 census_started: AtomicBool,
255}
256
257const CENSUS_DEBOUNCE: Duration = Duration::from_secs(2);
260
261impl Inner {
262 fn wants(&self, level: crate::LogLevel) -> bool {
265 self.log_level.load(Ordering::Relaxed) >= level as u8
266 }
267
268 fn note_census(&self) {
273 self.census_gen.fetch_add(1, Ordering::Relaxed);
274 self.census_dirty.notify_one();
275 }
276
277 fn enqueue(&self, to: Pid, item: Received) -> bool {
283 match self.table.get(&to.0) {
284 Some(entry) => {
285 if let Some(cap) = self.mailbox_capacity {
290 if matches!(item, Received::Message(_)) && entry.depth_value() >= cap {
291 self.dropped.fetch_add(1, Ordering::Relaxed);
292 return false;
293 }
294 }
295 if entry.mailbox.send(item).is_ok() {
296 entry.note_enqueued();
297 true
298 } else {
299 false
300 }
301 }
302 None => false,
303 }
304 }
305
306 fn deliver(&self, to: Pid, item: Received) {
308 self.enqueue(to, item);
309 }
310
311 fn deregister(&self, pid: Pid, reason: ExitReason) {
316 let Some((_, entry)) = self.table.remove(&pid.0) else {
317 return;
318 };
319 self.finished.fetch_add(1, Ordering::Relaxed);
320 let reason = entry.exit_reason.unwrap_or(reason);
321
322 if let Some(label) = &entry.label {
326 if self.wants(crate::LogLevel::for_exit(reason)) {
327 crate::lifecycle::log_exit(pid, label, reason);
328 }
329 if self.wants(crate::LogLevel::Info) {
331 self.note_census();
332 }
333 }
334
335 for name in &entry.names {
336 self.registry.remove(name);
337 }
338 for tag in &entry.tags {
341 if let Some(mut members) = self.tags.get_mut(tag) {
342 members.remove(&pid.0);
343 }
344 self.tags.remove_if(tag, |_, members| members.is_empty());
345 }
346 for monitor in entry.monitors {
347 self.deliver(
348 monitor.watcher,
349 Received::Down {
350 reference: monitor.reference,
351 pid,
352 reason,
353 },
354 );
355 }
356 for peer in entry.links {
357 self.propagate_exit(peer, pid, reason);
358 }
359 }
360
361 fn propagate_exit(&self, peer: Pid, from: Pid, reason: ExitReason) {
365 let Some(mut entry) = self.table.get_mut(&peer.0) else {
366 return;
367 };
368 entry.links.retain(|&linked| linked != from);
369 if entry.trap_exit {
370 if entry.mailbox.send(Received::Exit { from, reason }).is_ok() {
371 entry.note_enqueued();
372 }
373 } else if reason.is_abnormal() {
374 entry.exit_reason = Some(reason);
375 entry.abort.abort();
376 }
377 }
378}
379
380#[derive(Clone, Default)]
383pub struct Runtime {
384 inner: Arc<Inner>,
385}
386
387impl Runtime {
388 pub fn new() -> Self {
389 Self::default()
390 }
391
392 pub fn with_mailbox_depth() -> Self {
397 Self {
398 inner: Arc::new(Inner {
399 track_depth: true,
400 ..Default::default()
401 }),
402 }
403 }
404
405 pub fn with_mailbox_capacity(capacity: usize) -> Self {
417 Self {
418 inner: Arc::new(Inner {
419 track_depth: true,
420 mailbox_capacity: Some(capacity),
421 ..Default::default()
422 }),
423 }
424 }
425
426 pub fn spawn<F, Fut>(&self, body: F) -> ProcessHandle
430 where
431 F: FnOnce(Context) -> Fut,
432 Fut: Future<Output = ()> + Send + 'static,
433 {
434 self.spawn_entry(Vec::new(), body).0
435 }
436
437 pub fn spawn_link<F, Fut>(&self, parent: Pid, body: F) -> ProcessHandle
441 where
442 F: FnOnce(Context) -> Fut,
443 Fut: Future<Output = ()> + Send + 'static,
444 {
445 let (handle, child) = self.spawn_entry(vec![parent], body);
446 if let Some(mut entry) = self.inner.table.get_mut(&parent.0) {
447 entry.links.push(child);
448 }
449 handle
450 }
451
452 fn spawn_entry<F, Fut>(&self, links: Vec<Pid>, body: F) -> (ProcessHandle, Pid)
453 where
454 F: FnOnce(Context) -> Fut,
455 Fut: Future<Output = ()> + Send + 'static,
456 {
457 let pid = Pid(self.inner.next_id.fetch_add(1, Ordering::Relaxed));
458 let (mailbox, mailbox_rx) = unbounded_channel();
459 let (abort, abort_registration) = AbortHandle::new_pair();
460 let depth = self
463 .inner
464 .track_depth
465 .then(|| Arc::new(AtomicUsize::new(0)));
466
467 self.inner.table.insert(
471 pid.0,
472 ProcessEntry {
473 abort: abort.clone(),
474 mailbox,
475 trap_exit: false,
476 links,
477 monitors: Vec::new(),
478 names: Vec::new(),
479 tags: Vec::new(),
480 exit_reason: None,
481 label: None,
482 depth: depth.clone(),
483 },
484 );
485 self.inner.spawned.fetch_add(1, Ordering::Relaxed);
486
487 let body = body(Context {
488 pid,
489 mailbox: mailbox_rx,
490 saved: VecDeque::new(),
491 depth,
492 });
493 let guard = ProcessGuard {
497 pid,
498 inner: Arc::clone(&self.inner),
499 reason: ExitReason::Killed,
500 };
501 let join = tokio::spawn(run(guard, Abortable::new(body, abort_registration)));
502 (ProcessHandle { pid, abort, join }, pid)
503 }
504
505 pub fn send(&self, pid: Pid, message: Message) -> bool {
508 self.inner.enqueue(pid, Received::Message(message))
509 }
510
511 pub fn send_stream(&self, pid: Pid, stream: StreamHandle) -> bool {
517 self.inner.enqueue(pid, Received::Stream(stream))
518 }
519
520 pub fn process_count(&self) -> usize {
522 self.inner.table.len()
523 }
524
525 pub fn spawned(&self) -> u64 {
527 self.inner.spawned.load(Ordering::Relaxed)
528 }
529
530 pub fn finished(&self) -> u64 {
532 self.inner.finished.load(Ordering::Relaxed)
533 }
534
535 pub fn dropped_messages(&self) -> u64 {
539 self.inner.dropped.load(Ordering::Relaxed)
540 }
541
542 pub fn is_alive(&self, pid: Pid) -> bool {
543 self.inner.table.contains_key(&pid.0)
544 }
545
546 pub fn list(&self) -> Vec<Pid> {
550 self.inner
551 .table
552 .iter()
553 .map(|entry| Pid(*entry.key()))
554 .collect()
555 }
556
557 pub fn info(&self, pid: Pid) -> Option<ProcessInfo> {
560 self.inner.table.get(&pid.0).map(|entry| ProcessInfo {
561 pid,
562 links: entry.links.len(),
563 monitors: entry.monitors.len(),
564 names: entry.names.clone(),
565 label: entry.label.clone(),
566 mailbox_depth: entry
567 .depth
568 .as_ref()
569 .map_or(0, |d| d.load(Ordering::Relaxed)),
570 trap_exit: entry.trap_exit,
571 })
572 }
573
574 pub fn set_label(&self, pid: Pid, label: impl Into<String>) -> bool {
579 match self.inner.table.get_mut(&pid.0) {
580 Some(mut entry) => {
581 entry.label = Some(label.into());
582 if self.inner.wants(crate::LogLevel::Info) {
584 self.inner.note_census();
585 }
586 true
587 }
588 None => false,
589 }
590 }
591
592 pub fn set_log_level(&self, level: crate::LogLevel) {
596 self.inner.log_level.store(level as u8, Ordering::Relaxed);
597 if level >= crate::LogLevel::Info
602 && tokio::runtime::Handle::try_current().is_ok()
603 && !self.inner.census_started.swap(true, Ordering::AcqRel)
604 {
605 self.spawn_census_loop();
606 }
607 }
608
609 pub(crate) fn census_counts(&self) -> BTreeMap<String, u64> {
614 let mut counts = BTreeMap::new();
615 for entry in self.inner.table.iter() {
616 if let Some(label) = &entry.label {
617 *counts.entry(label.clone()).or_insert(0) += 1;
618 }
619 }
620 counts
621 }
622
623 pub(crate) fn tag_counts(&self) -> BTreeMap<String, u64> {
627 self.inner
628 .tags
629 .iter()
630 .filter(|members| !members.is_empty())
631 .map(|members| (members.key().clone(), members.len() as u64))
632 .collect()
633 }
634
635 fn census_step(&self, printed: &mut u64) -> bool {
642 let gen = self.inner.census_gen.load(Ordering::Relaxed);
643 if gen == *printed {
644 return false; }
646 crate::lifecycle::log_census(&self.census_counts(), &self.tag_counts());
647 *printed = gen;
648 true
649 }
650
651 fn spawn_census_loop(&self) {
657 let runtime = self.clone();
658 tokio::spawn(async move {
659 let mut printed = 0u64;
660 loop {
661 runtime.inner.census_dirty.notified().await;
662 tokio::time::sleep(CENSUS_DEBOUNCE).await;
663 runtime.census_step(&mut printed);
664 }
665 });
666 }
667
668 pub fn wants_log(&self, event: crate::LogLevel) -> bool {
671 self.inner.wants(event)
672 }
673
674 pub fn log_spawn(&self, pid: Pid, label: &str, detail: &str) {
683 crate::lifecycle::log_spawn(pid, label, detail);
684 }
685
686 pub fn kill(&self, pid: Pid) -> bool {
690 if !self.terminate(pid) {
691 return false;
692 }
693 if self.inner.wants(crate::LogLevel::Info) {
694 crate::lifecycle::log_kill(pid);
695 }
696 true
697 }
698
699 fn terminate(&self, pid: Pid) -> bool {
703 match self.inner.table.get(&pid.0) {
704 Some(entry) => {
705 entry.abort.abort();
706 true
707 }
708 None => false,
709 }
710 }
711
712 pub fn exit(&self, pid: Pid, reason: ExitReason) -> bool {
716 match self.inner.table.get_mut(&pid.0) {
717 Some(mut entry) => {
718 entry.exit_reason = Some(reason);
719 entry.abort.abort();
720 true
721 }
722 None => false,
723 }
724 }
725
726 pub fn set_trap_exit(&self, pid: Pid, trap: bool) {
730 if let Some(mut entry) = self.inner.table.get_mut(&pid.0) {
731 entry.trap_exit = trap;
732 }
733 }
734
735 pub fn link(&self, a: Pid, b: Pid) {
739 if a == b {
740 return;
741 }
742 if self.add_link(a, b) {
745 if self.add_link(b, a) {
746 return;
747 }
748 self.remove_link(a, b);
749 }
750 }
751
752 pub fn unlink(&self, a: Pid, b: Pid) {
754 self.remove_link(a, b);
755 self.remove_link(b, a);
756 }
757
758 fn add_link(&self, owner: Pid, peer: Pid) -> bool {
759 match self.inner.table.get_mut(&owner.0) {
760 Some(mut entry) => {
761 if !entry.links.contains(&peer) {
762 entry.links.push(peer);
763 }
764 true
765 }
766 None => false,
767 }
768 }
769
770 fn remove_link(&self, owner: Pid, peer: Pid) {
771 if let Some(mut entry) = self.inner.table.get_mut(&owner.0) {
772 entry.links.retain(|&linked| linked != peer);
773 }
774 }
775
776 pub fn monitor(&self, watcher: Pid, target: Pid) -> MonitorRef {
782 let reference = MonitorRef(self.inner.next_ref.fetch_add(1, Ordering::Relaxed));
783 match self.inner.table.get_mut(&target.0) {
784 Some(mut entry) => entry.monitors.push(Monitor { watcher, reference }),
785 None => self.inner.deliver(
786 watcher,
787 Received::Down {
788 reference,
789 pid: target,
790 reason: ExitReason::NoProc,
791 },
792 ),
793 }
794 reference
795 }
796
797 pub fn register(&self, name: impl Into<String>, pid: Pid) -> bool {
802 let name = name.into();
803 let Some(mut entry) = self.inner.table.get_mut(&pid.0) else {
806 return false;
807 };
808 match self.inner.registry.entry(name.clone()) {
809 Entry::Occupied(_) => false,
810 Entry::Vacant(slot) => {
811 slot.insert(pid.0);
812 entry.names.push(name);
813 true
814 }
815 }
816 }
817
818 pub fn whereis(&self, name: &str) -> Option<Pid> {
820 self.inner.registry.get(name).map(|pid| Pid(*pid))
821 }
822
823 pub fn whereis_or_spawn<F, Fut>(&self, name: impl Into<String>, body: F) -> Pid
831 where
832 F: FnOnce(Context) -> Fut,
833 Fut: Future<Output = ()> + Send + 'static,
834 {
835 let name = name.into();
836 if let Some(pid) = self.whereis(&name) {
837 return pid;
838 }
839 let handle = self.spawn(body);
841 if self.register(name.clone(), handle.pid()) {
842 return handle.pid(); }
844 handle.kill();
847 self.whereis(&name).unwrap_or(handle.pid())
848 }
849
850 pub fn unregister(&self, name: &str) -> bool {
852 match self.inner.registry.remove(name) {
853 Some((_, pid)) => {
854 if let Some(mut entry) = self.inner.table.get_mut(&pid) {
855 entry.names.retain(|held| held != name);
856 }
857 true
858 }
859 None => false,
860 }
861 }
862
863 pub fn register_tag(&self, tag: impl Into<String>, pid: Pid) -> bool {
872 let tag = tag.into();
873 let Some(mut entry) = self.inner.table.get_mut(&pid.0) else {
876 return false;
877 };
878 if self
879 .inner
880 .tags
881 .entry(tag.clone())
882 .or_default()
883 .insert(pid.0)
884 {
885 entry.tags.push(tag);
886 drop(entry); if self.inner.wants(crate::LogLevel::Info) {
889 self.inner.note_census();
890 }
891 }
892 true
893 }
894
895 pub fn whereis_tag(&self, tag: &str) -> Vec<Pid> {
898 self.inner.tags.get(tag).map_or_else(Vec::new, |members| {
899 members.iter().map(|&id| Pid(id)).collect()
900 })
901 }
902
903 pub fn unregister_tag(&self, tag: &str, pid: Pid) -> bool {
905 let removed = self
908 .inner
909 .tags
910 .get_mut(tag)
911 .is_some_and(|mut members| members.remove(&pid.0));
912 if removed {
913 if let Some(mut entry) = self.inner.table.get_mut(&pid.0) {
914 entry.tags.retain(|held| held != tag);
915 }
916 self.inner
917 .tags
918 .remove_if(tag, |_, members| members.is_empty());
919 if self.inner.wants(crate::LogLevel::Info) {
921 self.inner.note_census();
922 }
923 }
924 removed
925 }
926
927 pub fn kill_tag(&self, tag: &str) -> usize {
932 let killed = self
935 .whereis_tag(tag)
936 .into_iter()
937 .filter(|&pid| self.terminate(pid))
938 .count();
939 if self.inner.wants(crate::LogLevel::Info) {
940 crate::lifecycle::log_kill_tag(tag, killed);
941 }
942 killed
943 }
944
945 pub fn send_named(&self, name: &str, message: Message) -> bool {
948 match self.whereis(name) {
949 Some(pid) => self.send(pid, message),
950 None => false,
951 }
952 }
953
954 pub fn send_after(&self, pid: Pid, delay: Duration, message: Message) -> TimerRef {
958 let runtime = self.clone();
959 let task = tokio::spawn(async move {
960 tokio::time::sleep(delay).await;
961 runtime.send(pid, message);
962 });
963 TimerRef {
964 abort: task.abort_handle(),
965 }
966 }
967
968 pub fn shutdown(&self) -> usize {
973 let mut stopped = 0;
976 for entry in self.inner.table.iter() {
977 entry.abort.abort();
978 stopped += 1;
979 }
980 stopped
981 }
982}
983
984pub struct TimerRef {
986 abort: tokio::task::AbortHandle,
987}
988
989impl TimerRef {
990 pub fn cancel(&self) {
992 self.abort.abort();
993 }
994}
995
996struct ProcessGuard {
1000 pid: Pid,
1001 inner: Arc<Inner>,
1002 reason: ExitReason,
1003}
1004
1005impl Drop for ProcessGuard {
1006 fn drop(&mut self) {
1007 let reason = if std::thread::panicking() {
1010 ExitReason::Crashed
1011 } else {
1012 self.reason
1013 };
1014 self.inner.deregister(self.pid, reason);
1015 }
1016}
1017
1018async fn run<Fut>(mut guard: ProcessGuard, body: Abortable<Fut>)
1019where
1020 Fut: Future<Output = ()> + Send + 'static,
1021{
1022 guard.reason = match body.await {
1027 Ok(()) => ExitReason::Normal,
1028 Err(_aborted) => ExitReason::Killed,
1029 };
1030}
1031
1032#[cfg(test)]
1033mod tests {
1034 use super::*;
1035
1036 #[test]
1037 fn wants_log_respects_the_configured_threshold() {
1038 let rt = Runtime::new();
1039 assert!(!rt.wants_log(crate::LogLevel::Error));
1041 rt.set_log_level(crate::LogLevel::Warn);
1043 assert!(rt.wants_log(crate::LogLevel::Error));
1044 assert!(rt.wants_log(crate::LogLevel::Warn));
1045 assert!(!rt.wants_log(crate::LogLevel::Info));
1046 assert!(!rt.wants_log(crate::LogLevel::Debug));
1047 rt.set_log_level(crate::LogLevel::Debug);
1049 assert!(rt.wants_log(crate::LogLevel::Debug));
1050 }
1051
1052 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1053 async fn census_counts_live_processes_by_label() {
1054 let rt = Runtime::new();
1055 let mut procs: Vec<_> = (0..4)
1057 .map(|_| {
1058 rt.spawn(|mut ctx| async move {
1059 loop {
1060 ctx.recv().await;
1061 }
1062 })
1063 })
1064 .collect();
1065 assert!(rt.set_label(procs[0].pid(), "alpha"));
1066 assert!(rt.set_label(procs[1].pid(), "alpha"));
1067 assert!(rt.set_label(procs[2].pid(), "beta"));
1068
1069 let counts = rt.census_counts();
1070 assert_eq!(counts.get("alpha"), Some(&2));
1071 assert_eq!(counts.get("beta"), Some(&1));
1072 assert_eq!(counts.len(), 2, "the unlabeled process is excluded");
1073
1074 let victim = procs.remove(1); victim.kill();
1077 victim.join().await;
1078 assert_eq!(
1079 rt.census_counts().get("alpha"),
1080 Some(&1),
1081 "a drained process drops out of the census"
1082 );
1083 }
1084
1085 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1086 async fn census_step_emits_on_activity_not_on_count_equality() {
1087 let rt = Runtime::new();
1088 rt.set_log_level(crate::LogLevel::Info);
1090 fn park(mut ctx: Context) -> impl std::future::Future<Output = ()> {
1091 async move {
1092 loop {
1093 ctx.recv().await;
1094 }
1095 }
1096 }
1097 let mut printed = 0u64;
1098
1099 let a = rt.spawn(park);
1101 rt.set_label(a.pid(), "alpha");
1102 assert!(rt.census_step(&mut printed), "a labeled spawn emits");
1103 assert!(
1104 !rt.census_step(&mut printed),
1105 "nothing happened since → no duplicate line"
1106 );
1107
1108 let b = rt.spawn(park);
1111 rt.set_label(b.pid(), "beta");
1112 b.kill();
1113 b.join().await;
1114 assert_eq!(
1115 rt.census_counts().get("alpha"),
1116 Some(&1),
1117 "counts netted back to the pre-spawn picture"
1118 );
1119 assert!(
1120 rt.census_step(&mut printed),
1121 "a net-zero spawn+exit is real activity → emits"
1122 );
1123 assert!(!rt.census_step(&mut printed), "and then stays quiet");
1124 }
1125
1126 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1127 async fn census_tag_counts_reflect_group_membership() {
1128 let rt = Runtime::new();
1129 let procs: Vec<_> = (0..3)
1130 .map(|_| {
1131 rt.spawn(|mut ctx| async move {
1132 loop {
1133 ctx.recv().await;
1134 }
1135 })
1136 })
1137 .collect();
1138 for p in &procs {
1140 assert!(rt.register_tag("plan:abc123", p.pid()));
1141 }
1142 assert!(rt.register_tag("plan:def456", procs[0].pid()));
1143
1144 let tags = rt.tag_counts();
1145 assert_eq!(tags.get("plan:abc123"), Some(&3));
1146 assert_eq!(tags.get("plan:def456"), Some(&1));
1147
1148 assert_eq!(rt.kill_tag("plan:abc123"), 3);
1151 for p in procs {
1152 p.join().await;
1153 }
1154 let tags = rt.tag_counts();
1155 assert_eq!(tags.get("plan:abc123"), None, "an emptied group drops out");
1156 assert_eq!(
1157 tags.get("plan:def456"),
1158 None,
1159 "its lone member died too, so it's gone as well"
1160 );
1161 }
1162
1163 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1164 async fn joining_or_leaving_a_group_is_census_activity() {
1165 let rt = Runtime::new();
1166 rt.set_log_level(crate::LogLevel::Info); let p = rt.spawn(|mut ctx| async move {
1168 loop {
1169 ctx.recv().await;
1170 }
1171 });
1172 let mut printed = 0u64;
1173 assert!(rt.register_tag("plan:x", p.pid()));
1174 assert!(
1175 rt.census_step(&mut printed),
1176 "joining a group is census activity → emits"
1177 );
1178 assert!(rt.unregister_tag("plan:x", p.pid()));
1179 assert!(
1180 rt.census_step(&mut printed),
1181 "leaving a group is census activity → emits"
1182 );
1183 assert!(!rt.census_step(&mut printed), "and then stays quiet");
1184 }
1185
1186 #[tokio::test]
1187 async fn a_process_receives_a_message_sent_to_its_pid() {
1188 let rt = Runtime::new();
1189 let (tx, rx) = tokio::sync::oneshot::channel();
1190 let handle = rt.spawn(|mut ctx| async move {
1191 let msg = ctx.recv().await.message().unwrap();
1192 let _ = tx.send(msg);
1193 });
1194 assert!(rt.send(handle.pid(), b"hello".to_vec()));
1195 assert_eq!(rx.await.unwrap(), b"hello".to_vec());
1196 handle.join().await;
1197 }
1198
1199 #[tokio::test]
1200 async fn messages_arrive_in_fifo_order() {
1201 let rt = Runtime::new();
1202 let (tx, rx) = tokio::sync::oneshot::channel();
1203 let handle = rt.spawn(|mut ctx| async move {
1204 let mut got = Vec::new();
1205 for _ in 0..3 {
1206 got.push(ctx.recv().await.message().unwrap());
1207 }
1208 let _ = tx.send(got);
1209 });
1210 for byte in [b"a".to_vec(), b"b".to_vec(), b"c".to_vec()] {
1211 assert!(rt.send(handle.pid(), byte));
1212 }
1213 assert_eq!(
1214 rx.await.unwrap(),
1215 vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]
1216 );
1217 handle.join().await;
1218 }
1219
1220 #[tokio::test]
1221 async fn recv_match_takes_a_match_and_leaves_the_rest_in_order() {
1222 let rt = Runtime::new();
1223 let (tx, rx) = tokio::sync::oneshot::channel();
1224 let handle = rt.spawn(|mut ctx| async move {
1225 let matched = ctx
1227 .recv_match(|m| matches!(m, Received::Message(b) if b.first() == Some(&b'B')))
1228 .await
1229 .message()
1230 .unwrap();
1231 let then = ctx.recv().await.message().unwrap(); let last = ctx.recv().await.message().unwrap(); let _ = tx.send((matched, then, last));
1234 });
1235 for m in [b"A".to_vec(), b"B".to_vec(), b"C".to_vec()] {
1236 assert!(rt.send(handle.pid(), m));
1237 }
1238 let (matched, then, last) = rx.await.unwrap();
1239 assert_eq!(matched, b"B".to_vec());
1240 assert_eq!(then, b"A".to_vec());
1241 assert_eq!(last, b"C".to_vec());
1242 handle.join().await;
1243 }
1244
1245 #[tokio::test]
1246 async fn recv_match_finds_a_previously_deferred_message() {
1247 let rt = Runtime::new();
1248 let (tx, rx) = tokio::sync::oneshot::channel();
1249 let handle = rt.spawn(|mut ctx| async move {
1250 let is = |byte: u8| move |m: &Received| matches!(m, Received::Message(b) if b.first() == Some(&byte));
1253 let c = ctx.recv_match(is(b'C')).await.message().unwrap();
1254 let b = ctx.recv_match(is(b'B')).await.message().unwrap();
1255 let a = ctx.recv().await.message().unwrap();
1256 let _ = tx.send((a, b, c));
1257 });
1258 for m in [b"A".to_vec(), b"B".to_vec(), b"C".to_vec()] {
1259 assert!(rt.send(handle.pid(), m));
1260 }
1261 let (a, b, c) = rx.await.unwrap();
1262 assert_eq!((a, b, c), (b"A".to_vec(), b"B".to_vec(), b"C".to_vec()));
1263 handle.join().await;
1264 }
1265
1266 #[tokio::test]
1267 async fn send_to_unknown_pid_returns_false() {
1268 let rt = Runtime::new();
1269 assert!(!rt.send(Pid(424242), b"hi".to_vec()));
1270 }
1271
1272 #[tokio::test]
1273 async fn send_to_a_finished_process_returns_false() {
1274 let rt = Runtime::new();
1275 let handle = rt.spawn(|_| async {});
1276 let pid = handle.pid();
1277 handle.join().await; assert!(!rt.send(pid, b"too late".to_vec()));
1279 }
1280
1281 #[tokio::test]
1282 async fn tags_group_processes_and_kill_tag_terminates_only_that_group() {
1283 let rt = Runtime::new();
1284 let a = rt.spawn(|_| std::future::pending::<()>());
1285 let b = rt.spawn(|_| std::future::pending::<()>());
1286 let c = rt.spawn(|_| std::future::pending::<()>());
1287 assert!(rt.register_tag("plan:1", a.pid()));
1288 assert!(rt.register_tag("plan:1", b.pid()));
1289 assert!(rt.register_tag("plan:2", c.pid())); let mut g1 = rt.whereis_tag("plan:1");
1292 g1.sort_by_key(|p| p.0);
1293 let mut want = vec![a.pid(), b.pid()];
1294 want.sort_by_key(|p| p.0);
1295 assert_eq!(g1, want);
1296 assert_eq!(rt.whereis_tag("plan:2"), vec![c.pid()]);
1297 assert!(rt.whereis_tag("nope").is_empty());
1298
1299 assert_eq!(rt.kill_tag("plan:1"), 2);
1300 a.join().await;
1301 b.join().await;
1302 assert!(rt.whereis_tag("plan:1").is_empty()); assert!(rt.is_alive(c.pid())); assert_eq!(rt.whereis_tag("plan:2"), vec![c.pid()]);
1305 c.kill();
1306 c.join().await;
1307 }
1308
1309 #[tokio::test]
1310 async fn a_dead_process_leaves_its_tags_and_cannot_be_re_tagged() {
1311 let rt = Runtime::new();
1312 let a = rt.spawn(|_| std::future::pending::<()>());
1313 let pid = a.pid();
1314 assert!(rt.register_tag("g", pid));
1315 a.kill();
1316 a.join().await;
1317 assert!(rt.whereis_tag("g").is_empty()); assert!(!rt.register_tag("g", pid)); }
1320
1321 #[tokio::test]
1322 async fn a_process_holds_multiple_tags_and_can_leave_one() {
1323 let rt = Runtime::new();
1324 let a = rt.spawn(|_| std::future::pending::<()>());
1325 assert!(rt.register_tag("x", a.pid()));
1326 assert!(rt.register_tag("y", a.pid()));
1327 assert!(rt.register_tag("x", a.pid())); assert_eq!(rt.whereis_tag("x"), vec![a.pid()]);
1329
1330 assert!(rt.unregister_tag("x", a.pid()));
1331 assert!(rt.whereis_tag("x").is_empty()); assert_eq!(rt.whereis_tag("y"), vec![a.pid()]); assert!(!rt.unregister_tag("x", a.pid())); a.kill();
1335 a.join().await;
1336 }
1337
1338 #[tokio::test]
1339 async fn kill_tag_of_an_unknown_group_is_zero() {
1340 let rt = Runtime::new();
1341 assert_eq!(rt.kill_tag("ghost"), 0);
1342 }
1343
1344 #[tokio::test]
1345 async fn killing_a_parked_receiver_stops_it_and_cleans_up() {
1346 let rt = Runtime::new();
1349 let handle = rt.spawn(|mut ctx| async move {
1350 let _forever = ctx.recv().await;
1351 });
1352 let pid = handle.pid();
1353 assert!(rt.is_alive(pid));
1354 handle.kill();
1355 handle.join().await;
1356 assert!(!rt.is_alive(pid));
1357 assert_eq!(rt.finished(), 1);
1358 }
1359
1360 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1361 async fn two_processes_play_ping_pong() {
1362 let rt = Runtime::new();
1366 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
1367
1368 let ponger_rt = rt.clone();
1369 let ponger = rt.spawn(move |mut ctx| async move {
1370 let ball = ctx.recv().await.message().unwrap();
1371 let reply_to = Pid::from_raw(u64::from_le_bytes(ball[..8].try_into().unwrap()));
1372 ponger_rt.send(reply_to, b"pong".to_vec());
1373 });
1374 let ponger_pid = ponger.pid();
1375
1376 let pinger_rt = rt.clone();
1377 let pinger = rt.spawn(move |mut ctx| async move {
1378 let mut ball = ctx.pid().raw().to_le_bytes().to_vec();
1379 ball.extend_from_slice(b"ping");
1380 pinger_rt.send(ponger_pid, ball);
1381 let _ = done_tx.send(ctx.recv().await.message().unwrap());
1382 });
1383
1384 assert_eq!(done_rx.await.unwrap(), b"pong".to_vec());
1385 pinger.join().await;
1386 ponger.join().await;
1387 }
1388
1389 #[tokio::test]
1390 async fn a_process_runs_to_completion_and_is_cleaned_up() {
1391 let rt = Runtime::new();
1392 let handle = rt.spawn(|_| async {});
1393 let pid = handle.pid();
1394 handle.join().await;
1395 assert_eq!(rt.spawned(), 1);
1396 assert_eq!(rt.finished(), 1);
1397 assert_eq!(rt.process_count(), 0);
1398 assert!(!rt.is_alive(pid));
1399 }
1400
1401 #[tokio::test]
1402 async fn body_receives_its_own_pid() {
1403 let rt = Runtime::new();
1404 let (tx, rx) = tokio::sync::oneshot::channel();
1405 let handle = rt.spawn(move |ctx| async move {
1406 assert_eq!(
1407 format!("{ctx:?}"),
1408 format!("Context {{ pid: {:?} }}", ctx.pid())
1409 );
1410 let _ = tx.send(ctx.pid());
1411 });
1412 let pid = handle.pid();
1413 assert_eq!(rx.await.unwrap(), pid);
1414 handle.join().await;
1415 }
1416
1417 #[tokio::test]
1418 async fn pids_are_unique_and_increasing() {
1419 let rt = Runtime::new();
1420 let a = rt.spawn(|_| async {});
1421 let b = rt.spawn(|_| async {});
1422 assert_ne!(a.pid(), b.pid());
1423 assert!(b.pid().raw() > a.pid().raw());
1424 a.join().await;
1425 b.join().await;
1426 }
1427
1428 #[tokio::test]
1429 async fn kill_terminates_a_running_process() {
1430 let rt = Runtime::new();
1431 let handle = rt.spawn(|_| std::future::pending::<()>());
1434 let pid = handle.pid();
1435 assert!(rt.is_alive(pid));
1436 handle.kill();
1437 handle.join().await;
1438 assert!(!rt.is_alive(pid));
1439 assert_eq!(rt.process_count(), 0);
1440 assert_eq!(rt.finished(), 1);
1441 }
1442
1443 #[tokio::test]
1444 async fn runtime_kill_signals_a_live_process() {
1445 let rt = Runtime::new();
1446 let handle = rt.spawn(|_| std::future::pending::<()>());
1447 let pid = handle.pid();
1448 assert!(rt.kill(pid));
1449 handle.join().await;
1450 assert!(!rt.is_alive(pid));
1451 }
1452
1453 #[tokio::test]
1454 async fn kill_unknown_pid_returns_false() {
1455 let rt = Runtime::new();
1456 assert!(!rt.kill(Pid(999)));
1457 }
1458
1459 #[tokio::test]
1460 async fn a_panicking_body_is_still_cleaned_up() {
1461 let rt = Runtime::new();
1462 let handle = rt.spawn(|_| async { panic!("boom") });
1463 handle.join().await; assert_eq!(rt.process_count(), 0);
1465 assert_eq!(rt.finished(), 1);
1466 }
1467
1468 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1469 async fn spawns_many_processes_concurrently() {
1470 let rt = Runtime::new();
1471 let handles: Vec<_> = (0..1000).map(|_| rt.spawn(|_| async {})).collect();
1472 for handle in handles {
1473 handle.join().await;
1474 }
1475 assert_eq!(rt.spawned(), 1000);
1476 assert_eq!(rt.finished(), 1000);
1477 assert_eq!(rt.process_count(), 0);
1478 }
1479
1480 fn watch(rt: &Runtime) -> (Pid, tokio::sync::oneshot::Receiver<Received>) {
1486 let (tx, rx) = tokio::sync::oneshot::channel();
1487 let pid = rt
1488 .spawn(move |mut ctx| async move {
1489 let item = ctx.recv().await;
1490 let _ = tx.send(item);
1491 std::future::pending::<()>().await;
1492 })
1493 .pid();
1494 (pid, rx)
1495 }
1496
1497 fn gated<F>(rt: &Runtime, ending: F) -> (Pid, tokio::sync::oneshot::Sender<()>)
1500 where
1501 F: FnOnce() + Send + 'static,
1502 {
1503 let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
1504 let pid = rt
1505 .spawn(move |_| async move {
1506 let _ = go_rx.await;
1507 ending();
1508 })
1509 .pid();
1510 (pid, go_tx)
1511 }
1512
1513 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1514 async fn monitor_reports_each_kind_of_exit() {
1515 let rt = Runtime::new();
1516
1517 let (w1, d1) = watch(&rt);
1519 let (t1, go1) = gated(&rt, || {});
1520 let r1 = rt.monitor(w1, t1);
1521 let _ = go1.send(());
1522 assert_eq!(
1523 d1.await.unwrap(),
1524 Received::Down {
1525 reference: r1,
1526 pid: t1,
1527 reason: ExitReason::Normal
1528 }
1529 );
1530
1531 let (w2, d2) = watch(&rt);
1533 let (t2, go2) = gated(&rt, || panic!("boom"));
1534 let r2 = rt.monitor(w2, t2);
1535 let _ = go2.send(());
1536 assert_eq!(
1537 d2.await.unwrap(),
1538 Received::Down {
1539 reference: r2,
1540 pid: t2,
1541 reason: ExitReason::Crashed
1542 }
1543 );
1544
1545 let (w3, d3) = watch(&rt);
1547 let t3 = rt.spawn(|_| std::future::pending::<()>()).pid();
1548 let r3 = rt.monitor(w3, t3);
1549 assert!(rt.kill(t3));
1550 assert_eq!(
1551 d3.await.unwrap(),
1552 Received::Down {
1553 reference: r3,
1554 pid: t3,
1555 reason: ExitReason::Killed
1556 }
1557 );
1558 }
1559
1560 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1561 async fn monitoring_a_dead_process_reports_noproc_at_once() {
1562 let rt = Runtime::new();
1563 let dead = rt.spawn(|_| async {});
1564 let dead_pid = dead.pid();
1565 dead.join().await;
1566
1567 let (watcher, down) = watch(&rt);
1568 let reference = rt.monitor(watcher, dead_pid);
1569 assert_eq!(
1570 down.await.unwrap(),
1571 Received::Down {
1572 reference,
1573 pid: dead_pid,
1574 reason: ExitReason::NoProc
1575 }
1576 );
1577 }
1578
1579 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1580 async fn an_abnormal_exit_cascades_down_links_with_its_reason() {
1581 let rt = Runtime::new();
1582 let peer = rt.spawn(|_| std::future::pending::<()>()).pid();
1583 let (crasher, go) = gated(&rt, || panic!("boom"));
1584 rt.link(peer, crasher);
1585
1586 let (watcher, down) = watch(&rt);
1589 let reference = rt.monitor(watcher, peer);
1590
1591 let _ = go.send(());
1592 assert_eq!(
1593 down.await.unwrap(),
1594 Received::Down {
1595 reference,
1596 pid: peer,
1597 reason: ExitReason::Crashed
1598 }
1599 );
1600 assert!(!rt.is_alive(peer));
1601 }
1602
1603 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1604 async fn a_normal_exit_does_not_cascade() {
1605 let rt = Runtime::new();
1606 let survivor = rt.spawn(|_| std::future::pending::<()>());
1607 let (quitter, go) = gated(&rt, || {});
1608 rt.link(survivor.pid(), quitter);
1609
1610 let _ = go.send(());
1611 while rt.is_alive(quitter) {
1614 tokio::task::yield_now().await;
1615 }
1616 assert!(
1617 rt.is_alive(survivor.pid()),
1618 "a normal exit must not kill links"
1619 );
1620 survivor.kill();
1621 }
1622
1623 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1624 async fn a_trapping_process_gets_an_exit_message_instead_of_dying() {
1625 let rt = Runtime::new();
1626 let (tx, rx) = tokio::sync::oneshot::channel();
1627 let trapper = rt.spawn(move |mut ctx| async move {
1628 let item = ctx.recv().await;
1629 let _ = tx.send(item);
1630 std::future::pending::<()>().await; });
1632 rt.set_trap_exit(trapper.pid(), true);
1633
1634 let (child, go) = gated(&rt, || panic!("boom"));
1635 rt.link(trapper.pid(), child);
1636 let _ = go.send(());
1637
1638 assert_eq!(
1639 rx.await.unwrap(),
1640 Received::Exit {
1641 from: child,
1642 reason: ExitReason::Crashed
1643 }
1644 );
1645 assert!(
1646 rt.is_alive(trapper.pid()),
1647 "a trapping process must survive"
1648 );
1649 trapper.kill();
1650 }
1651
1652 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1653 async fn spawn_link_links_the_child_to_its_parent() {
1654 let rt = Runtime::new();
1655 let (tx, rx) = tokio::sync::oneshot::channel();
1656 let parent = rt.spawn(move |mut ctx| async move {
1657 let item = ctx.recv().await;
1658 let _ = tx.send(item);
1659 std::future::pending::<()>().await;
1660 });
1661 rt.set_trap_exit(parent.pid(), true);
1662
1663 let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
1664 let child = rt
1665 .spawn_link(parent.pid(), move |_| async move {
1666 let _ = go_rx.await;
1667 panic!("boom");
1668 })
1669 .pid();
1670 let _ = go_tx.send(());
1671
1672 assert_eq!(
1673 rx.await.unwrap(),
1674 Received::Exit {
1675 from: child,
1676 reason: ExitReason::Crashed
1677 }
1678 );
1679 parent.kill();
1680 }
1681
1682 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1683 async fn unlinking_stops_propagation() {
1684 let rt = Runtime::new();
1685 let survivor = rt.spawn(|_| std::future::pending::<()>());
1686 let (crasher, go) = gated(&rt, || panic!("boom"));
1687 rt.link(survivor.pid(), crasher);
1688 rt.unlink(survivor.pid(), crasher);
1689
1690 let _ = go.send(());
1691 while rt.is_alive(crasher) {
1692 tokio::task::yield_now().await;
1693 }
1694 assert!(
1695 rt.is_alive(survivor.pid()),
1696 "an unlinked peer must not be taken down"
1697 );
1698 survivor.kill();
1699 }
1700
1701 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1702 async fn linking_a_dead_peer_leaves_no_half_link() {
1703 let rt = Runtime::new();
1704 let alive = rt.spawn(|_| std::future::pending::<()>());
1705 let dead = rt.spawn(|_| async {});
1706 let dead_pid = dead.pid();
1707 dead.join().await;
1708
1709 rt.link(alive.pid(), dead_pid);
1711
1712 let (crasher, go) = gated(&rt, || panic!("boom"));
1715 rt.link(alive.pid(), crasher);
1716 let _ = go.send(());
1717 while rt.is_alive(alive.pid()) {
1718 tokio::task::yield_now().await;
1719 }
1720 assert!(!rt.is_alive(crasher));
1721 }
1722
1723 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1724 async fn exit_terminates_with_the_chosen_reason() {
1725 let rt = Runtime::new();
1726 let (watcher, down) = watch(&rt);
1727 let target = rt.spawn(|_| std::future::pending::<()>()).pid();
1728 let reference = rt.monitor(watcher, target);
1729
1730 assert!(rt.exit(target, ExitReason::Crashed));
1732 assert_eq!(
1733 down.await.unwrap(),
1734 Received::Down {
1735 reference,
1736 pid: target,
1737 reason: ExitReason::Crashed
1738 }
1739 );
1740 assert!(!rt.exit(Pid::from_raw(987_654), ExitReason::Normal)); }
1742
1743 #[tokio::test]
1744 async fn link_and_trap_on_missing_processes_are_no_ops() {
1745 let rt = Runtime::new();
1746 let p = rt.spawn(|_| std::future::pending::<()>());
1747 let dead = Pid::from_raw(999_999);
1748 rt.link(p.pid(), p.pid()); rt.link(dead, p.pid()); rt.link(p.pid(), dead); rt.unlink(dead, p.pid()); rt.set_trap_exit(dead, true); assert!(rt.is_alive(p.pid()));
1754 p.kill();
1755 }
1756
1757 #[tokio::test]
1760 async fn register_whereis_send_named_then_auto_release_on_exit() {
1761 let rt = Runtime::new();
1762 let (tx, rx) = tokio::sync::oneshot::channel();
1763 let worker = rt.spawn(move |mut ctx| async move {
1764 let job = ctx.recv().await.message().unwrap();
1765 let _ = tx.send(job);
1766 });
1767 assert!(rt.register("worker", worker.pid()));
1768 assert_eq!(rt.whereis("worker"), Some(worker.pid()));
1769 assert!(!rt.register("worker", worker.pid())); assert!(rt.send_named("worker", b"job".to_vec()));
1771 assert_eq!(rx.await.unwrap(), b"job".to_vec());
1772
1773 worker.join().await; assert_eq!(rt.whereis("worker"), None);
1775 assert!(!rt.send_named("worker", b"late".to_vec()));
1776 }
1777
1778 #[tokio::test]
1779 async fn names_are_released_by_unregister_and_reusable_after_death() {
1780 let rt = Runtime::new();
1781 let a = rt.spawn(|_| std::future::pending::<()>());
1782 assert!(rt.register("svc", a.pid()));
1783 assert!(rt.unregister("svc"));
1784 assert_eq!(rt.whereis("svc"), None);
1785 assert!(!rt.unregister("svc")); assert!(rt.register("svc", a.pid()));
1788 a.kill();
1789 a.join().await;
1790 assert_eq!(rt.whereis("svc"), None);
1791 let b = rt.spawn(|_| std::future::pending::<()>());
1792 assert!(rt.register("svc", b.pid())); b.kill();
1794 }
1795
1796 #[tokio::test]
1797 async fn register_to_a_dead_pid_fails_and_a_pid_can_hold_several_names() {
1798 let rt = Runtime::new();
1799 let dead = rt.spawn(|_| async {});
1800 let dead_pid = dead.pid();
1801 dead.join().await;
1802 assert!(!rt.register("ghost", dead_pid));
1803
1804 let p = rt.spawn(|_| std::future::pending::<()>());
1805 assert!(rt.register("one", p.pid()));
1806 assert!(rt.register("two", p.pid()));
1807 assert_eq!(rt.whereis("one"), Some(p.pid()));
1808 assert_eq!(rt.whereis("two"), Some(p.pid()));
1809 p.kill();
1810 p.join().await;
1811 assert_eq!(rt.whereis("one"), None); assert_eq!(rt.whereis("two"), None);
1813 }
1814
1815 #[tokio::test]
1816 async fn whereis_or_spawn_returns_the_incumbent_without_spawning() {
1817 let rt = Runtime::new();
1818 let first = rt.whereis_or_spawn("svc", |_| std::future::pending::<()>());
1819 let before = rt.spawned();
1821 let again = rt.whereis_or_spawn("svc", |_| std::future::pending::<()>());
1822 assert_eq!(again, first, "must return the already-registered pid");
1823 assert_eq!(
1824 rt.spawned(),
1825 before,
1826 "must not spawn when the name is taken"
1827 );
1828 assert_eq!(rt.whereis("svc"), Some(first));
1829 rt.kill(first);
1830 }
1831
1832 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1833 async fn whereis_or_spawn_is_race_free_and_kills_the_loser() {
1834 let rt = Runtime::new();
1835 let mut tasks = Vec::new();
1837 for _ in 0..32 {
1838 let rt = rt.clone();
1839 tasks.push(tokio::spawn(async move {
1840 rt.whereis_or_spawn("singleton", |_| std::future::pending::<()>())
1841 }));
1842 }
1843 let mut pids = Vec::new();
1844 for t in tasks {
1845 pids.push(t.await.unwrap());
1846 }
1847 let winner = rt.whereis("singleton").expect("a winner is registered");
1849 assert!(
1850 pids.iter().all(|&p| p == winner),
1851 "all callers see the winner"
1852 );
1853 loop {
1855 if rt.process_count() == 1 {
1856 break;
1857 }
1858 tokio::task::yield_now().await;
1859 }
1860 assert_eq!(rt.whereis("singleton"), Some(winner));
1861 rt.kill(winner);
1862 }
1863
1864 #[tokio::test(start_paused = true)]
1865 async fn send_after_delivers_when_the_timer_fires() {
1866 let rt = Runtime::new();
1867 let (tx, rx) = tokio::sync::oneshot::channel();
1868 let target = rt.spawn(move |mut ctx| async move {
1869 let msg = ctx.recv().await.message().unwrap();
1870 let _ = tx.send(msg);
1871 });
1872 rt.send_after(target.pid(), Duration::from_secs(60), b"ding".to_vec());
1873 assert_eq!(rx.await.unwrap(), b"ding".to_vec());
1875 }
1876
1877 #[tokio::test(start_paused = true)]
1878 async fn a_cancelled_timer_never_fires() {
1879 let rt = Runtime::new();
1880 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1881 let target = rt.spawn(move |mut ctx| async move {
1882 loop {
1883 let _ = tx.send(ctx.recv().await);
1884 }
1885 });
1886 let timer = rt.send_after(target.pid(), Duration::from_secs(60), b"x".to_vec());
1887 timer.cancel();
1888 tokio::time::advance(Duration::from_secs(120)).await;
1889 tokio::task::yield_now().await; assert!(
1891 rx.try_recv().is_err(),
1892 "a cancelled timer must deliver nothing"
1893 );
1894 target.kill();
1895 }
1896
1897 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1898 async fn shutdown_stops_every_process() {
1899 let rt = Runtime::new();
1900 let procs: Vec<_> = (0..5)
1901 .map(|_| rt.spawn(|_| std::future::pending::<()>()))
1902 .collect();
1903 assert_eq!(rt.process_count(), 5);
1904 assert_eq!(rt.shutdown(), 5);
1905 for p in procs {
1906 p.join().await;
1907 }
1908 assert_eq!(rt.process_count(), 0);
1909 assert_eq!(rt.shutdown(), 0); }
1911
1912 #[tokio::test]
1915 async fn list_reflects_live_processes() {
1916 use std::collections::HashSet;
1917 let rt = Runtime::new();
1918 assert!(rt.list().is_empty());
1919 let a = rt.spawn(|_| std::future::pending::<()>());
1920 let b = rt.spawn(|_| std::future::pending::<()>());
1921 let live: HashSet<u64> = rt.list().iter().map(|p| p.raw()).collect();
1922 assert_eq!(live, HashSet::from([a.pid().raw(), b.pid().raw()]));
1923 a.kill();
1924 a.join().await;
1925 assert_eq!(rt.list(), vec![b.pid()]);
1926 b.kill();
1927 }
1928
1929 #[tokio::test]
1930 async fn info_reports_links_names_label_and_trap() {
1931 let rt = Runtime::new();
1932 let p = rt.spawn(|_| std::future::pending::<()>());
1933 let peer = rt.spawn(|_| std::future::pending::<()>());
1934 rt.link(p.pid(), peer.pid());
1935 assert!(rt.register("svc", p.pid()));
1936 rt.set_trap_exit(p.pid(), true);
1937 assert!(rt.set_label(p.pid(), "worker #1"));
1938
1939 let info = rt.info(p.pid()).unwrap();
1940 assert_eq!(info.pid, p.pid());
1941 assert_eq!(info.links, 1);
1942 assert_eq!(info.monitors, 0);
1943 assert_eq!(info.names, vec!["svc".to_string()]);
1944 assert_eq!(info.label.as_deref(), Some("worker #1"));
1945 assert!(info.trap_exit);
1946 assert_eq!(info.mailbox_depth, 0);
1947 p.kill();
1948 peer.kill();
1949 }
1950
1951 #[tokio::test]
1952 async fn info_and_set_label_on_a_dead_pid() {
1953 let rt = Runtime::new();
1954 let d = rt.spawn(|_| async {});
1955 let pid = d.pid();
1956 d.join().await;
1957 assert!(rt.info(pid).is_none());
1958 assert!(!rt.set_label(pid, "ghost"));
1959 }
1960
1961 #[tokio::test]
1962 async fn mailbox_depth_tracks_unconsumed_messages() {
1963 let rt = Runtime::with_mailbox_depth();
1964 let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
1965 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
1966 let p = rt.spawn(move |mut ctx| async move {
1967 let _ = go_rx.await; for _ in 0..3 {
1969 ctx.recv().await;
1970 }
1971 let _ = done_tx.send(());
1972 std::future::pending::<()>().await;
1973 });
1974 for m in [b"a".to_vec(), b"b".to_vec(), b"c".to_vec()] {
1977 assert!(rt.send(p.pid(), m));
1978 }
1979 assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 3);
1980
1981 let _ = go_tx.send(());
1982 let _ = done_rx.await; assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 0);
1984 p.kill();
1985 }
1986
1987 #[tokio::test]
1988 async fn a_bounded_mailbox_sheds_user_messages_past_capacity() {
1989 let rt = Runtime::with_mailbox_capacity(3);
1990 let p = rt.spawn(|ctx| async move {
1992 let _hold = ctx; std::future::pending::<()>().await;
1994 });
1995 for i in 0..10u8 {
1998 rt.send(p.pid(), vec![i]);
1999 }
2000 assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 3);
2001 assert_eq!(rt.dropped_messages(), 7);
2002 p.kill();
2003 }
2004
2005 #[tokio::test]
2006 async fn a_full_mailbox_still_accepts_system_signals() {
2007 let rt = Runtime::with_mailbox_capacity(2);
2008 let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
2009 let (report_tx, report_rx) = tokio::sync::oneshot::channel();
2010 let watcher = rt.spawn(move |mut ctx| async move {
2011 let _ = go_rx.await; let mut got = Vec::new();
2013 for _ in 0..3 {
2014 got.push(ctx.recv().await);
2015 }
2016 let _ = report_tx.send(got);
2017 });
2018 let wpid = watcher.pid();
2019
2020 let target = rt.spawn(|ctx| async move {
2022 let _hold = ctx;
2023 std::future::pending::<()>().await;
2024 });
2025 let tpid = target.pid();
2026 rt.monitor(wpid, tpid);
2027
2028 assert!(rt.send(wpid, b"a".to_vec()));
2030 assert!(rt.send(wpid, b"b".to_vec()));
2031 assert!(!rt.send(wpid, b"c".to_vec()));
2032 assert_eq!(rt.dropped_messages(), 1);
2033
2034 rt.kill(tpid);
2037 for _ in 0..500 {
2038 if rt.info(wpid).map(|i| i.mailbox_depth) == Some(3) {
2039 break;
2040 }
2041 tokio::time::sleep(std::time::Duration::from_millis(2)).await;
2042 }
2043 assert_eq!(
2044 rt.info(wpid).unwrap().mailbox_depth,
2045 3,
2046 "the Down landed despite the full mailbox"
2047 );
2048
2049 let _ = go_tx.send(());
2051 let got = tokio::time::timeout(std::time::Duration::from_secs(5), report_rx)
2052 .await
2053 .expect("watcher never reported")
2054 .unwrap();
2055 let users = got
2056 .iter()
2057 .filter(|r| matches!(r, Received::Message(_)))
2058 .count();
2059 let downs = got
2060 .iter()
2061 .filter(|r| matches!(r, Received::Down { .. }))
2062 .count();
2063 assert_eq!(users, 2, "both queued user messages survive");
2064 assert_eq!(downs, 1, "the system Down was delivered, not shed");
2065 }
2066
2067 #[tokio::test]
2068 async fn mailbox_depth_counts_messages_deferred_by_selective_receive() {
2069 let rt = Runtime::with_mailbox_depth();
2070 let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
2071 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
2072 let p = rt.spawn(move |mut ctx| async move {
2073 let _ = go_rx.await;
2074 let _ = ctx
2076 .recv_match(|m| matches!(m, Received::Message(b) if b.first() == Some(&b'B')))
2077 .await;
2078 let _ = done_tx.send(());
2079 std::future::pending::<()>().await;
2080 });
2081 for m in [b"A".to_vec(), b"B".to_vec(), b"C".to_vec()] {
2082 assert!(rt.send(p.pid(), m));
2083 }
2084 assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 3);
2085
2086 let _ = go_tx.send(());
2087 let _ = done_rx.await;
2088 while rt.info(p.pid()).map_or(false, |i| i.mailbox_depth != 2) {
2090 tokio::task::yield_now().await;
2091 }
2092 assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 2);
2093 p.kill();
2094 }
2095
2096 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2099 async fn a_stream_is_delivered_and_read_in_order_after_a_message() {
2100 use crate::stream::stream;
2101 let rt = Runtime::new();
2102 let (out_tx, out_rx) = tokio::sync::oneshot::channel();
2103 let p = rt.spawn(move |mut ctx| async move {
2104 let first = ctx.recv().await.message();
2106 let mut handle = ctx.recv().await.stream().expect("a stream");
2107 let mut chunks = Vec::new();
2108 while let Some(chunk) = handle.read().await {
2109 chunks.push(chunk);
2110 }
2111 let _ = out_tx.send((first, chunks));
2112 });
2113
2114 assert!(rt.send(p.pid(), b"hello".to_vec()));
2115 let (writer, handle) = stream();
2116 assert!(rt.send_stream(p.pid(), handle));
2117 tokio::spawn(async move {
2118 for chunk in [b"a".to_vec(), b"b".to_vec(), b"c".to_vec()] {
2119 writer.write(chunk).await.unwrap();
2120 }
2121 });
2123
2124 let (first, chunks) = out_rx.await.unwrap();
2125 assert_eq!(first, Some(b"hello".to_vec()));
2126 assert_eq!(chunks, vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]);
2127 p.join().await;
2128 }
2129
2130 #[tokio::test]
2131 async fn send_stream_to_a_dead_pid_returns_false() {
2132 use crate::stream::stream;
2133 let rt = Runtime::new();
2134 let (_writer, handle) = stream();
2135 assert!(!rt.send_stream(Pid::from_raw(123_456), handle));
2136 }
2137
2138 #[tokio::test]
2139 async fn a_stream_counts_toward_mailbox_depth_until_consumed() {
2140 use crate::stream::stream;
2141 let rt = Runtime::with_mailbox_depth();
2142 let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
2143 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
2144 let p = rt.spawn(move |mut ctx| async move {
2145 let _ = go_rx.await;
2146 let _ = ctx.recv().await; let _ = done_tx.send(());
2148 std::future::pending::<()>().await;
2149 });
2150 let (_writer, handle) = stream();
2151 assert!(rt.send_stream(p.pid(), handle));
2152 assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 1);
2153 let _ = go_tx.send(());
2154 let _ = done_rx.await;
2155 assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 0);
2156 p.kill();
2157 }
2158}