Skip to main content

rusm_otp/
runtime.rs

1use std::collections::{BTreeMap, HashSet, VecDeque};
2use std::fmt;
3use std::future::Future;
4use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use dashmap::mapref::entry::Entry;
9use dashmap::DashMap;
10use futures_util::future::{AbortHandle, Abortable};
11use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
12use tokio::sync::Notify;
13use tokio::task::JoinHandle;
14
15use crate::exit::{ExitReason, MonitorRef};
16use crate::message::{Message, Received};
17use crate::pid::Pid;
18use crate::stream::StreamHandle;
19
20/// What a process body receives when it starts: its own [`Pid`] and its
21/// **mailbox** — the receiving end of its message queue.
22pub struct Context {
23    pid: Pid,
24    mailbox: UnboundedReceiver<Received>,
25    /// Items pulled from the channel but skipped over by a selective
26    /// [`recv_match`](Context::recv_match), kept in arrival order. A later
27    /// receive sees them before anything still in the channel — the Erlang
28    /// "save queue". Empty (and allocation-free) unless selective receive is used.
29    saved: VecDeque<Received>,
30    /// Optional mailbox-depth counter (decrement side). `None` unless the runtime
31    /// was built with [`Runtime::with_mailbox_depth`] — so the default hot path
32    /// pays no allocation and no atomic.
33    depth: Option<Arc<AtomicUsize>>,
34}
35
36impl Context {
37    pub fn pid(&self) -> Pid {
38        self.pid
39    }
40
41    /// Records that one item left the mailbox (no-op unless depth is tracked).
42    fn note_consumed(&self) {
43        if let Some(depth) = &self.depth {
44            depth.fetch_sub(1, Ordering::Relaxed);
45        }
46    }
47
48    /// Receives the next item, suspending the process until one arrives (FIFO),
49    /// exactly like an Erlang `receive`. The result is usually a user
50    /// [`Received::Message`], but a process that monitors or trap-links others
51    /// also gets [`Received::Down`]/[`Received::Exit`] here, in arrival order. A
52    /// process blocked here parks with zero cost until something arrives or a
53    /// [`kill`](Runtime::kill) wakes it.
54    pub async fn recv(&mut self) -> Received {
55        let item = match self.saved.pop_front() {
56            Some(item) => item,
57            None => self.next_from_mailbox().await,
58        };
59        self.note_consumed();
60        item
61    }
62
63    /// Receives the next item for which `matches` is true, suspending until one
64    /// arrives. Items that don't match are left queued in arrival order for a
65    /// later receive — Erlang's selective `receive`. Already-saved items are
66    /// considered first, so this never reorders the mailbox.
67    pub async fn recv_match<F>(&mut self, mut matches: F) -> Received
68    where
69        F: FnMut(&Received) -> bool,
70    {
71        if let Some(pos) = self.saved.iter().position(&mut matches) {
72            let item = self.saved.remove(pos).expect("position is in bounds");
73            self.note_consumed();
74            return item;
75        }
76        loop {
77            let item = self.next_from_mailbox().await;
78            if matches(&item) {
79                self.note_consumed();
80                return item;
81            }
82            self.saved.push_back(item);
83        }
84    }
85
86    async fn next_from_mailbox(&mut self) -> Received {
87        // The sole sender lives in the process table, which the running task
88        // keeps alive through its own `Arc<Inner>`; it is removed only after this
89        // body returns. So while we are awaiting here the channel cannot close —
90        // a live process always has a message coming or is parked forever.
91        self.mailbox
92            .recv()
93            .await
94            .expect("a live process always holds its own mailbox sender")
95    }
96}
97
98impl fmt::Debug for Context {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        // The mailbox receiver isn't meaningfully printable; the pid identifies it.
101        f.debug_struct("Context").field("pid", &self.pid).finish()
102    }
103}
104
105/// A handle to a spawned process: address it ([`kill`](ProcessHandle::kill)) and
106/// await it ([`join`](ProcessHandle::join)).
107pub struct ProcessHandle {
108    pid: Pid,
109    abort: AbortHandle,
110    join: JoinHandle<()>,
111}
112
113impl ProcessHandle {
114    pub fn pid(&self) -> Pid {
115        self.pid
116    }
117
118    /// Stops the process at its next suspension point. Cleanup still runs (the
119    /// table entry is removed and the process counted finished), because that
120    /// lives on the body's drop path.
121    pub fn kill(&self) {
122        self.abort.abort();
123    }
124
125    /// Waits for the process to terminate (ignores a body panic or a kill).
126    pub async fn join(self) {
127        let _ = self.join.await;
128    }
129}
130
131/// A point-in-time snapshot of a live process for observability — the analogue
132/// of Erlang's `Process.info/1`. Cheap to produce (a single table lookup). Run
133/// vs. suspended *status* is deliberately omitted: Tokio doesn't expose a task's
134/// park state, and faking it would mislead.
135#[derive(Debug, Clone, PartialEq, Eq)]
136pub struct ProcessInfo {
137    pub pid: Pid,
138    /// Number of bidirectionally linked peers.
139    pub links: usize,
140    /// Number of processes monitoring this one.
141    pub monitors: usize,
142    /// Registry names this process holds.
143    pub names: Vec<String>,
144    /// The optional human-readable label (see [`Runtime::set_label`]).
145    pub label: Option<String>,
146    /// Items waiting in the mailbox (channel + save queue), not yet consumed.
147    pub mailbox_depth: usize,
148    /// Whether this process traps exits.
149    pub trap_exit: bool,
150}
151
152/// One process this entry records is monitoring us; on our exit it gets a
153/// [`Received::Down`] tagged with `reference`.
154struct Monitor {
155    watcher: Pid,
156    reference: MonitorRef,
157}
158
159/// What the runtime keeps for each live process. A process needs **only one
160/// channel** — the mailbox; exit signals ride it as [`Received`], and kill rides
161/// a `futures` abort handle (which exists *before* the task is spawned, so the
162/// whole entry is written in a single race-free insert). Erlang runtimes and
163/// Lunatic keep a *second*, signal channel per process; we don't.
164///
165/// `links`, `monitors` and `exit_reason` are empty/false/`None` for an ordinary
166/// process and cost no allocation — only fault-tolerant processes pay for them.
167struct ProcessEntry {
168    abort: AbortHandle,
169    mailbox: UnboundedSender<Received>,
170    /// When set, incoming exit signals arrive as [`Received::Exit`] messages
171    /// instead of killing this process (Erlang's `process_flag(trap_exit, true)`).
172    trap_exit: bool,
173    /// Bidirectionally linked peers — each also lists us.
174    links: Vec<Pid>,
175    /// Processes monitoring us.
176    monitors: Vec<Monitor>,
177    /// Names this process holds in the registry, released on exit.
178    names: Vec<String>,
179    /// Process-group tags this process holds (Erlang `pg`-style: one pid → many tags,
180    /// one tag → many pids), released on exit alongside `names`. Empty (zero-allocation)
181    /// until the process tags itself, so an untagged process costs nothing here.
182    tags: Vec<String>,
183    /// A reason staged by a link cascade, so this process exits with the
184    /// *original* reason rather than the bare `Killed` an abort would imply.
185    exit_reason: Option<ExitReason>,
186    /// An optional human-readable label for observability (Elixir's
187    /// `Process.set_label`), distinct from a registered name. `None` and
188    /// allocation-free until set.
189    label: Option<String>,
190    /// Optional mailbox-depth counter (increment side). `None` unless the runtime
191    /// tracks depth (see [`Runtime::with_mailbox_depth`]); shared with the
192    /// [`Context`] receive side and read by [`ProcessInfo`].
193    depth: Option<Arc<AtomicUsize>>,
194}
195
196impl ProcessEntry {
197    /// Records that one item entered the mailbox (no-op unless depth is tracked).
198    fn note_enqueued(&self) {
199        if let Some(depth) = &self.depth {
200            depth.fetch_add(1, Ordering::Relaxed);
201        }
202    }
203
204    /// Current mailbox depth (0 unless depth is tracked — which it always is when a
205    /// capacity is set, since capacity is enforced against this count).
206    fn depth_value(&self) -> usize {
207        self.depth.as_ref().map_or(0, |d| d.load(Ordering::Relaxed))
208    }
209}
210
211#[derive(Default)]
212struct Inner {
213    /// Whether to track per-process mailbox depth (off by default). Off means a
214    /// spawn allocates no counter and send/recv do no atomics — see
215    /// [`Runtime::with_mailbox_depth`].
216    track_depth: bool,
217    /// Opt-in per-process mailbox capacity (off by default → unbounded). When set,
218    /// `enqueue` sheds *user* messages once a mailbox holds this many — overload
219    /// protection. System signals are never shed. See
220    /// [`Runtime::with_mailbox_capacity`].
221    mailbox_capacity: Option<usize>,
222    /// User messages shed because a bounded mailbox was at capacity.
223    dropped: AtomicU64,
224    /// **Platform lifecycle log** verbosity (a [`LogLevel`] rank; `0` = `Off`, the
225    /// default → zero hot-path cost). When above `Off`, a labeled process's spawn /
226    /// exit / restart is logged to stderr at its event level. Set via
227    /// [`Runtime::set_log_level`].
228    log_level: std::sync::atomic::AtomicU8,
229    next_id: AtomicU64,
230    next_ref: AtomicU64,
231    spawned: AtomicU64,
232    finished: AtomicU64,
233    // Sharded concurrent map: spawners and completers mostly touch different
234    // shards, so the process table isn't a global-lock bottleneck under a storm.
235    table: DashMap<u64, ProcessEntry>,
236    // name -> pid. Sharded too, so name lookups never take a global lock the way
237    // Lunatic's single `RwLock<HashMap>` registry does.
238    registry: DashMap<String, u64>,
239    // tag -> live pids: Erlang `pg`-style process groups (one pid → many tags, one tag →
240    // many pids). Sharded like the registry; touched only by register/unregister/whereis/
241    // kill_tag and the exit reaper — never by spawn or message passing, so the hot path is
242    // unaffected. Members are removed on exit (see `deregister`).
243    tags: DashMap<String, HashSet<u64>>,
244    /// Bumped on each labeled-process state change (a spawn gets named, or a labeled
245    /// process exits) **only when logging is at `Info`+** — the census task reads it to
246    /// tell "something happened since my last line" from "nothing changed". This is what
247    /// distinguishes a real spawn+exit (gen advanced twice, net-same counts → *still*
248    /// logged) from a quiet node (gen unchanged → silent).
249    census_gen: AtomicU64,
250    /// Woken alongside [`census_gen`] so the census task parks at zero cost while idle;
251    /// a `Notify` holds at most one permit, so a burst coalesces to a single wake.
252    census_dirty: Notify,
253    /// Guards the single census task — spawned once, on the first `Info`+ enable.
254    census_started: AtomicBool,
255}
256
/// Settling window for the census task. After the first change it waits this
/// long before printing a single summary, so a burst of spawns/exits (e.g. a
/// fan-out of workers) collapses into one line.
const CENSUS_DEBOUNCE: Duration = Duration::from_millis(2_000);
260
261impl Inner {
262    /// Whether the configured platform-log level is at least `level` — the single
263    /// place the level rank is compared (the gate behind every lifecycle line).
264    fn wants(&self, level: crate::LogLevel) -> bool {
265        self.log_level.load(Ordering::Relaxed) >= level as u8
266    }
267
268    /// Record a labeled-process state change for the census: advance the generation
269    /// (so the task knows something happened, even if counts net out unchanged) and wake
270    /// it. The single place both spawn-naming and exit funnel through; call only when
271    /// [`wants`](Self::wants)`(Info)` (the caller gates it, so off pays nothing).
272    fn note_census(&self) {
273        self.census_gen.fetch_add(1, Ordering::Relaxed);
274        self.census_dirty.notify_one();
275    }
276
277    /// Enqueues `item` into `to`'s mailbox if it is still alive, keeping the
278    /// mailbox-depth counter in step. The single place a mailbox grows — used by
279    /// user sends, stream sends, and system deliveries alike. Returns whether it
280    /// landed. (The exit cascade in [`propagate_exit`] enqueues inline because it
281    /// already holds the entry lock.)
282    fn enqueue(&self, to: Pid, item: Received) -> bool {
283        match self.table.get(&to.0) {
284            Some(entry) => {
285                // Opt-in overload protection: once a bounded mailbox is at
286                // capacity, shed further *user* messages. System signals (exits,
287                // monitor downs — delivered via `propagate_exit`/`deliver`) are
288                // never shed, so back-pressure never breaks supervision.
289                if let Some(cap) = self.mailbox_capacity {
290                    if matches!(item, Received::Message(_)) && entry.depth_value() >= cap {
291                        self.dropped.fetch_add(1, Ordering::Relaxed);
292                        return false;
293                    }
294                }
295                if entry.mailbox.send(item).is_ok() {
296                    entry.note_enqueued();
297                    true
298                } else {
299                    false
300                }
301            }
302            None => false,
303        }
304    }
305
306    /// Delivers a system item to `to`'s mailbox if it is still alive.
307    fn deliver(&self, to: Pid, item: Received) {
308        self.enqueue(to, item);
309    }
310
311    /// Removes `pid`, counts it finished, and fans its exit out to everyone who
312    /// was watching: a [`Received::Down`] to each monitor and a propagated exit to
313    /// each link. A staged cascade reason (see [`ProcessEntry::exit_reason`])
314    /// overrides `reason`.
315    fn deregister(&self, pid: Pid, reason: ExitReason) {
316        let Some((_, entry)) = self.table.remove(&pid.0) else {
317            return;
318        };
319        self.finished.fetch_add(1, Ordering::Relaxed);
320        let reason = entry.exit_reason.unwrap_or(reason);
321
322        // Platform log (opt-in): only labeled processes — i.e. components the host
323        // named — so the stream is signal, not internal plumbing. `for_exit` is the one
324        // place that maps a reason to its level (crash → Error, kill → Warn, clean → Info).
325        if let Some(label) = &entry.label {
326            if self.wants(crate::LogLevel::for_exit(reason)) {
327                crate::lifecycle::log_exit(pid, label, reason);
328            }
329            // A labeled process ended → census activity; recount (debounced).
330            if self.wants(crate::LogLevel::Info) {
331                self.note_census();
332            }
333        }
334
335        for name in &entry.names {
336            self.registry.remove(name);
337        }
338        // Release process-group memberships the same way (and drop a group that empties),
339        // so `whereis_tag`/`kill_tag` only ever see live members.
340        for tag in &entry.tags {
341            if let Some(mut members) = self.tags.get_mut(tag) {
342                members.remove(&pid.0);
343            }
344            self.tags.remove_if(tag, |_, members| members.is_empty());
345        }
346        for monitor in entry.monitors {
347            self.deliver(
348                monitor.watcher,
349                Received::Down {
350                    reference: monitor.reference,
351                    pid,
352                    reason,
353                },
354            );
355        }
356        for peer in entry.links {
357            self.propagate_exit(peer, pid, reason);
358        }
359    }
360
361    /// Applies `from`'s exit to a linked `peer`: a trapping peer gets a
362    /// [`Received::Exit`] message; an ordinary peer is taken down too on an
363    /// abnormal exit (the cascade), carrying the same reason.
364    fn propagate_exit(&self, peer: Pid, from: Pid, reason: ExitReason) {
365        let Some(mut entry) = self.table.get_mut(&peer.0) else {
366            return;
367        };
368        entry.links.retain(|&linked| linked != from);
369        if entry.trap_exit {
370            if entry.mailbox.send(Received::Exit { from, reason }).is_ok() {
371                entry.note_enqueued();
372            }
373        } else if reason.is_abnormal() {
374            entry.exit_reason = Some(reason);
375            entry.abort.abort();
376        }
377    }
378}
379
380/// Spawns and tracks lightweight processes. Cheap to clone — clones share the
381/// same process table and counters.
382#[derive(Clone, Default)]
383pub struct Runtime {
384    inner: Arc<Inner>,
385}
386
387impl Runtime {
388    pub fn new() -> Self {
389        Self::default()
390    }
391
392    /// Like [`new`](Runtime::new) but **tracks per-process mailbox depth**, so
393    /// [`info`](Runtime::info) reports it. This costs a per-spawn counter
394    /// allocation and a relaxed atomic per send/receive, so it's opt-in: enable it
395    /// for an observer/REPL node; leave it off (the default) for peak throughput.
396    pub fn with_mailbox_depth() -> Self {
397        Self {
398            inner: Arc::new(Inner {
399                track_depth: true,
400                ..Default::default()
401            }),
402        }
403    }
404
405    /// Like [`new`](Runtime::new) but **bounds each process mailbox** at `capacity`
406    /// user messages. Once a mailbox holds that many, further *user* messages are
407    /// **shed** — dropped and counted in
408    /// [`dropped_messages`](Runtime::dropped_messages) — rather than growing memory
409    /// without bound under a producer faster than its consumer (Erlang's
410    /// `max_heap_size` / a bounded `GenStage`, in spirit).
411    ///
412    /// **System signals are never shed:** exit and monitor-down messages always
413    /// land (they ride the same mailbox but bypass the capacity check), so overload
414    /// back-pressure can never break links, monitors, or supervision. Enabling this
415    /// also tracks mailbox depth — that count is how capacity is enforced.
416    pub fn with_mailbox_capacity(capacity: usize) -> Self {
417        Self {
418            inner: Arc::new(Inner {
419                track_depth: true,
420                mailbox_capacity: Some(capacity),
421                ..Default::default()
422            }),
423        }
424    }
425
426    /// Spawns a process running `body`, returning a handle to it. The body is a
427    /// plain async closure today; in Phase 6 a Wasm instance becomes another
428    /// kind of body behind the same API.
429    pub fn spawn<F, Fut>(&self, body: F) -> ProcessHandle
430    where
431        F: FnOnce(Context) -> Fut,
432        Fut: Future<Output = ()> + Send + 'static,
433    {
434        self.spawn_entry(Vec::new(), body).0
435    }
436
437    /// Like [`spawn`](Runtime::spawn), but the child is **linked** to `parent`
438    /// before it runs — so the link is in place even if the child exits
439    /// immediately, with no race (Erlang's `spawn_link`).
440    pub fn spawn_link<F, Fut>(&self, parent: Pid, body: F) -> ProcessHandle
441    where
442        F: FnOnce(Context) -> Fut,
443        Fut: Future<Output = ()> + Send + 'static,
444    {
445        let (handle, child) = self.spawn_entry(vec![parent], body);
446        if let Some(mut entry) = self.inner.table.get_mut(&parent.0) {
447            entry.links.push(child);
448        }
449        handle
450    }
451
452    fn spawn_entry<F, Fut>(&self, links: Vec<Pid>, body: F) -> (ProcessHandle, Pid)
453    where
454        F: FnOnce(Context) -> Fut,
455        Fut: Future<Output = ()> + Send + 'static,
456    {
457        let pid = Pid(self.inner.next_id.fetch_add(1, Ordering::Relaxed));
458        let (mailbox, mailbox_rx) = unbounded_channel();
459        let (abort, abort_registration) = AbortHandle::new_pair();
460        // No allocation unless depth tracking is on (default off — see
461        // `with_mailbox_depth`), keeping the spawn hot path allocation-lean.
462        let depth = self
463            .inner
464            .track_depth
465            .then(|| Arc::new(AtomicUsize::new(0)));
466
467        // One write registers the whole process *before* it is spawned: a message
468        // sent the instant after can't be lost, the reaper's remove always
469        // balances this insert, and `kill`/`link` can already reach it.
470        self.inner.table.insert(
471            pid.0,
472            ProcessEntry {
473                abort: abort.clone(),
474                mailbox,
475                trap_exit: false,
476                links,
477                monitors: Vec::new(),
478                names: Vec::new(),
479                tags: Vec::new(),
480                exit_reason: None,
481                label: None,
482                depth: depth.clone(),
483            },
484        );
485        self.inner.spawned.fetch_add(1, Ordering::Relaxed);
486
487        let body = body(Context {
488            pid,
489            mailbox: mailbox_rx,
490            saved: VecDeque::new(),
491            depth,
492        });
493        // The guard is moved *into* the task, so the process is deregistered on
494        // every teardown path: completion, panic (drop runs during unwind), or a
495        // kill (which makes `Abortable` resolve, ending the task).
496        let guard = ProcessGuard {
497            pid,
498            inner: Arc::clone(&self.inner),
499            reason: ExitReason::Killed,
500        };
501        let join = tokio::spawn(run(guard, Abortable::new(body, abort_registration)));
502        (ProcessHandle { pid, abort, join }, pid)
503    }
504
505    /// Delivers `message` to `pid`'s mailbox. Returns `false` if there is no such
506    /// live process — sending to a dead process is a silent no-op, like Erlang.
507    pub fn send(&self, pid: Pid, message: Message) -> bool {
508        self.inner.enqueue(pid, Received::Message(message))
509    }
510
511    /// Delivers a byte `stream` to `pid` as a [`Received::Stream`]. Like
512    /// [`send`](Runtime::send), returns `false` if there's no such live process.
513    /// The recipient reads chunks at its own pace; back-pressure flows to the
514    /// writer (the channel is bounded). The stream itself is the Wasm-free
515    /// substrate the p3 component bridge maps `stream<u8>` onto.
516    pub fn send_stream(&self, pid: Pid, stream: StreamHandle) -> bool {
517        self.inner.enqueue(pid, Received::Stream(stream))
518    }
519
520    /// Number of currently-live processes.
521    pub fn process_count(&self) -> usize {
522        self.inner.table.len()
523    }
524
525    /// Total processes ever spawned.
526    pub fn spawned(&self) -> u64 {
527        self.inner.spawned.load(Ordering::Relaxed)
528    }
529
530    /// Total processes that have terminated (for any reason).
531    pub fn finished(&self) -> u64 {
532        self.inner.finished.load(Ordering::Relaxed)
533    }
534
535    /// Total user messages shed because a bounded mailbox was at capacity — always
536    /// 0 unless built with [`with_mailbox_capacity`](Runtime::with_mailbox_capacity).
537    /// A rising count is the signal that producers are outrunning a consumer.
538    pub fn dropped_messages(&self) -> u64 {
539        self.inner.dropped.load(Ordering::Relaxed)
540    }
541
542    pub fn is_alive(&self, pid: Pid) -> bool {
543        self.inner.table.contains_key(&pid.0)
544    }
545
546    /// A snapshot of every live process's pid — Erlang's `Process.list/0`. Walks
547    /// the sharded table without a global lock; a best-effort view (processes may
548    /// spawn/exit during the walk).
549    pub fn list(&self) -> Vec<Pid> {
550        self.inner
551            .table
552            .iter()
553            .map(|entry| Pid(*entry.key()))
554            .collect()
555    }
556
557    /// A [`ProcessInfo`] snapshot for `pid`, or `None` if it isn't live —
558    /// Erlang's `Process.info/1`. One table lookup; off the messaging hot path.
559    pub fn info(&self, pid: Pid) -> Option<ProcessInfo> {
560        self.inner.table.get(&pid.0).map(|entry| ProcessInfo {
561            pid,
562            links: entry.links.len(),
563            monitors: entry.monitors.len(),
564            names: entry.names.clone(),
565            label: entry.label.clone(),
566            mailbox_depth: entry
567                .depth
568                .as_ref()
569                .map_or(0, |d| d.load(Ordering::Relaxed)),
570            trap_exit: entry.trap_exit,
571        })
572    }
573
574    /// Attaches a human-readable `label` to `pid` for observability (like
575    /// Elixir's `Process.set_label/1`) — distinct from a registered name and
576    /// need not be unique. Returns `false` if `pid` isn't live. One allocation,
577    /// only when called; never touched on the send/receive path.
578    pub fn set_label(&self, pid: Pid, label: impl Into<String>) -> bool {
579        match self.inner.table.get_mut(&pid.0) {
580            Some(mut entry) => {
581                entry.label = Some(label.into());
582                // A newly-named (or relabeled) process is census activity.
583                if self.inner.wants(crate::LogLevel::Info) {
584                    self.inner.note_census();
585                }
586                true
587            }
588            None => false,
589        }
590    }
591
592    /// Set the **platform lifecycle log** level (spawn / exit / restart, coloured to
593    /// stderr — see [`lifecycle`](crate::lifecycle)). `Off` by default; a node sets it
594    /// explicitly (`rusm.toml [log] level = "debug"`). Set once at startup.
595    pub fn set_log_level(&self, level: crate::LogLevel) {
596        self.inner.log_level.store(level as u8, Ordering::Relaxed);
597        // Bring up the single census task the first time logging reaches `Info`+ (the
598        // level its per-component summary belongs to). It then parks on `census_dirty`
599        // at no cost, and quiets itself if the level later drops (no more notifies).
600        // Needs a Tokio runtime — skip cleanly if set outside one (e.g. a sync test).
601        if level >= crate::LogLevel::Info
602            && tokio::runtime::Handle::try_current().is_ok()
603            && !self.inner.census_started.swap(true, Ordering::AcqRel)
604        {
605            self.spawn_census_loop();
606        }
607    }
608
609    /// The per-component live-process **census**: every labeled process grouped by its
610    /// label. Unlabeled processes (internal plumbing — responders, writers) are omitted,
611    /// so the summary is about *components*. The single source of the count, shared by
612    /// the census task and tests.
613    pub(crate) fn census_counts(&self) -> BTreeMap<String, u64> {
614        let mut counts = BTreeMap::new();
615        for entry in self.inner.table.iter() {
616            if let Some(label) = &entry.label {
617                *counts.entry(label.clone()).or_insert(0) += 1;
618            }
619        }
620        counts
621    }
622
623    /// The **census of process groups**: each tag → how many live processes hold it (the
624    /// tag-side companion to [`census_counts`](Self::census_counts)). Emptied groups are
625    /// reaped on their last member's exit, so they never appear.
626    pub(crate) fn tag_counts(&self) -> BTreeMap<String, u64> {
627        self.inner
628            .tags
629            .iter()
630            .filter(|members| !members.is_empty())
631            .map(|members| (members.key().clone(), members.len() as u64))
632            .collect()
633    }
634
635    /// One census step: emit a line iff a labeled-process change has happened **since the
636    /// last emission** — tracked by the change generation, not by comparing counts, so a
637    /// real spawn+exit (gen advanced, counts net-same) still logs, while a quiet stretch
638    /// (gen unchanged) stays silent. `printed` is the caller's last-logged generation,
639    /// updated in place. Returns whether a line was emitted. Factored out of the task so
640    /// the decision is testable without the debounce timing.
641    fn census_step(&self, printed: &mut u64) -> bool {
642        let gen = self.inner.census_gen.load(Ordering::Relaxed);
643        if gen == *printed {
644            return false; // nothing happened since the last line — no duplicate
645        }
646        crate::lifecycle::log_census(&self.census_counts(), &self.tag_counts());
647        *printed = gen;
648        true
649    }
650
651    /// The debounced census task — one per runtime, started on the first `Info`+ enable.
652    /// It parks until a change, lets the burst settle for [`CENSUS_DEBOUNCE`], then takes
653    /// one [`census_step`](Self::census_step). Idle ⇒ parked (zero cost); a spawn storm ⇒
654    /// one line; a spawn+exit ⇒ one line (activity, even if counts net out). `printed` is
655    /// task-local — no shared state.
656    fn spawn_census_loop(&self) {
657        let runtime = self.clone();
658        tokio::spawn(async move {
659            let mut printed = 0u64;
660            loop {
661                runtime.inner.census_dirty.notified().await;
662                tokio::time::sleep(CENSUS_DEBOUNCE).await;
663                runtime.census_step(&mut printed);
664            }
665        });
666    }
667
668    /// Whether the configured level would log an event at `event` — a spawn site checks
669    /// this before building a label/detail it would otherwise not need (off path free).
670    pub fn wants_log(&self, event: crate::LogLevel) -> bool {
671        self.inner.wants(event)
672    }
673
674    /// Emit a platform `spawn` log line (`detail` = the process's effective
675    /// capabilities). The caller gates this with [`wants_log`](Self::wants_log)`(Debug)`
676    /// and separately [`set_label`](Self::set_label)s the process (so the later `exit`
677    /// line can name it even at levels below `Debug`).
678    ///
679    /// A **restart** needs no special event: it reads as the crashed instance's
680    /// abnormal `exit` line followed by a fresh `spawn` line for the same component —
681    /// with the crash reason and the new pid, which a bare "restart" couldn't carry.
682    pub fn log_spawn(&self, pid: Pid, label: &str, detail: &str) {
683        crate::lifecycle::log_spawn(pid, label, detail);
684    }
685
686    /// Stops `pid` at its next suspension point. Returns `false` if there is no
687    /// such live process. Equivalent to `exit(pid, ExitReason::Killed)`. Logs the
688    /// kill (yellow, at `Info`+) — the cause line ahead of the `exit` it triggers.
689    pub fn kill(&self, pid: Pid) -> bool {
690        if !self.terminate(pid) {
691            return false;
692        }
693        if self.inner.wants(crate::LogLevel::Info) {
694            crate::lifecycle::log_kill(pid);
695        }
696        true
697    }
698
699    /// Abort `pid` at its next suspension point **without logging** — the shared core
700    /// of [`kill`](Self::kill) and [`kill_tag`](Self::kill_tag), which log at their own
701    /// granularity (one `kill` line, or one `kill-tag` summary). `false` if not live.
702    fn terminate(&self, pid: Pid) -> bool {
703        match self.inner.table.get(&pid.0) {
704            Some(entry) => {
705                entry.abort.abort();
706                true
707            }
708            None => false,
709        }
710    }
711
712    /// Terminates `pid` with an explicit `reason` (Erlang's `exit/2`) — the
713    /// reason links and monitors will observe. Lets a process "crash" without a
714    /// Rust panic. Returns `false` if there is no such live process.
715    pub fn exit(&self, pid: Pid, reason: ExitReason) -> bool {
716        match self.inner.table.get_mut(&pid.0) {
717            Some(mut entry) => {
718                entry.exit_reason = Some(reason);
719                entry.abort.abort();
720                true
721            }
722            None => false,
723        }
724    }
725
726    /// Sets whether `pid` traps exits. A trapping process receives a linked
727    /// peer's exit as a [`Received::Exit`] message instead of dying with it — how
728    /// a supervisor survives its children. No-op if `pid` is not alive.
729    pub fn set_trap_exit(&self, pid: Pid, trap: bool) {
730        if let Some(mut entry) = self.inner.table.get_mut(&pid.0) {
731            entry.trap_exit = trap;
732        }
733    }
734
735    /// Bidirectionally links two live processes: when either exits abnormally the
736    /// other is taken down too (or, if it traps exits, gets a [`Received::Exit`]).
737    /// A no-op if either is already dead or they are the same process.
738    pub fn link(&self, a: Pid, b: Pid) {
739        if a == b {
740            return;
741        }
742        // Only link if both are live; record on each side. If one vanished
743        // between the checks, undo so we never leave a half-link dangling.
744        if self.add_link(a, b) {
745            if self.add_link(b, a) {
746                return;
747            }
748            self.remove_link(a, b);
749        }
750    }
751
752    /// Removes the link between `a` and `b` in both directions.
753    pub fn unlink(&self, a: Pid, b: Pid) {
754        self.remove_link(a, b);
755        self.remove_link(b, a);
756    }
757
758    fn add_link(&self, owner: Pid, peer: Pid) -> bool {
759        match self.inner.table.get_mut(&owner.0) {
760            Some(mut entry) => {
761                if !entry.links.contains(&peer) {
762                    entry.links.push(peer);
763                }
764                true
765            }
766            None => false,
767        }
768    }
769
770    fn remove_link(&self, owner: Pid, peer: Pid) {
771        if let Some(mut entry) = self.inner.table.get_mut(&owner.0) {
772            entry.links.retain(|&linked| linked != peer);
773        }
774    }
775
776    /// `watcher` starts monitoring `target`: when `target` exits, `watcher`
777    /// receives a [`Received::Down`] carrying the returned reference and the exit
778    /// reason. Monitoring is one-way and never propagates death. If `target` is
779    /// already gone, the `Down` (reason [`ExitReason::NoProc`]) is delivered at
780    /// once, like Erlang.
781    pub fn monitor(&self, watcher: Pid, target: Pid) -> MonitorRef {
782        let reference = MonitorRef(self.inner.next_ref.fetch_add(1, Ordering::Relaxed));
783        match self.inner.table.get_mut(&target.0) {
784            Some(mut entry) => entry.monitors.push(Monitor { watcher, reference }),
785            None => self.inner.deliver(
786                watcher,
787                Received::Down {
788                    reference,
789                    pid: target,
790                    reason: ExitReason::NoProc,
791                },
792            ),
793        }
794        reference
795    }
796
797    /// Registers `name` for `pid`, so it can be reached by name. Returns `false`
798    /// if the name is already taken or `pid` is not alive. A pid may hold several
799    /// names; a name maps to exactly one pid. Names are released automatically
800    /// when the process exits (or via [`unregister`](Runtime::unregister)).
801    pub fn register(&self, name: impl Into<String>, pid: Pid) -> bool {
802        let name = name.into();
803        // Hold the process entry first, then the registry slot — one consistent
804        // lock order, so register can never deadlock against teardown.
805        let Some(mut entry) = self.inner.table.get_mut(&pid.0) else {
806            return false;
807        };
808        match self.inner.registry.entry(name.clone()) {
809            Entry::Occupied(_) => false,
810            Entry::Vacant(slot) => {
811                slot.insert(pid.0);
812                entry.names.push(name);
813                true
814            }
815        }
816    }
817
818    /// Resolves a registered `name` to its (live) pid.
819    pub fn whereis(&self, name: &str) -> Option<Pid> {
820        self.inner.registry.get(name).map(|pid| Pid(*pid))
821    }
822
823    /// Returns the process registered under `name`, or spawns `body` and registers
824    /// it under `name` — race-free. If concurrent callers race, exactly one wins the
825    /// registration; the losers' just-spawned processes are killed and every caller
826    /// gets the winner (Erlang's "whereis-or-start" / a registered singleton). The
827    /// registration is the only synchronization point, so no lock is held across the
828    /// spawn — `body` should be a long-lived process (a service/handler), not a
829    /// one-shot. Use this to stand up a named singleton without a check-then-act race.
830    pub fn whereis_or_spawn<F, Fut>(&self, name: impl Into<String>, body: F) -> Pid
831    where
832        F: FnOnce(Context) -> Fut,
833        Fut: Future<Output = ()> + Send + 'static,
834    {
835        let name = name.into();
836        if let Some(pid) = self.whereis(&name) {
837            return pid;
838        }
839        // Optimistically spawn a candidate, then claim the name atomically.
840        let handle = self.spawn(body);
841        if self.register(name.clone(), handle.pid()) {
842            return handle.pid(); // we won — drop the handle; the registry keeps it live
843        }
844        // Lost the race: reap our candidate so it can't leak, and return the
845        // incumbent the winner registered.
846        handle.kill();
847        self.whereis(&name).unwrap_or(handle.pid())
848    }
849
850    /// Releases `name`. Returns `false` if it wasn't registered.
851    pub fn unregister(&self, name: &str) -> bool {
852        match self.inner.registry.remove(name) {
853            Some((_, pid)) => {
854                if let Some(mut entry) = self.inner.table.get_mut(&pid) {
855                    entry.names.retain(|held| held != name);
856                }
857                true
858            }
859            None => false,
860        }
861    }
862
863    /// Adds `pid` to the process group `tag` (Erlang's `pg`): one pid may hold many tags,
864    /// one tag many pids. Idempotent. Returns `false` only if `pid` is not alive. Tags are
865    /// released automatically on exit (like names) or via
866    /// [`unregister_tag`](Runtime::unregister_tag). This is the unprivileged half of the
867    /// mechanism — a process tags *itself*; terminating a group ([`kill_tag`]) is where
868    /// capability gating belongs, at the host ABI.
869    ///
870    /// [`kill_tag`]: Runtime::kill_tag
871    pub fn register_tag(&self, tag: impl Into<String>, pid: Pid) -> bool {
872        let tag = tag.into();
873        // Same lock order as `register` — process entry first, then the tag map — so the
874        // two can never deadlock against each other or against teardown.
875        let Some(mut entry) = self.inner.table.get_mut(&pid.0) else {
876            return false;
877        };
878        if self
879            .inner
880            .tags
881            .entry(tag.clone())
882            .or_default()
883            .insert(pid.0)
884        {
885            entry.tags.push(tag);
886            drop(entry); // release the table lock before waking the census task
887                         // A new group membership changes the tag census.
888            if self.inner.wants(crate::LogLevel::Info) {
889                self.inner.note_census();
890            }
891        }
892        true
893    }
894
895    /// Live members of process group `tag` (empty if unknown). The set holds only live
896    /// pids — [`deregister`](Inner::deregister) removes a process from its tags on exit.
897    pub fn whereis_tag(&self, tag: &str) -> Vec<Pid> {
898        self.inner.tags.get(tag).map_or_else(Vec::new, |members| {
899            members.iter().map(|&id| Pid(id)).collect()
900        })
901    }
902
903    /// Removes `pid` from process group `tag`. Returns `false` if it wasn't a member.
904    pub fn unregister_tag(&self, tag: &str, pid: Pid) -> bool {
905        // Drop the tag-map lock before touching the table (mirrors `unregister`), so the
906        // order here is never tag→table to clash with `register_tag`'s table→tag.
907        let removed = self
908            .inner
909            .tags
910            .get_mut(tag)
911            .is_some_and(|mut members| members.remove(&pid.0));
912        if removed {
913            if let Some(mut entry) = self.inner.table.get_mut(&pid.0) {
914                entry.tags.retain(|held| held != tag);
915            }
916            self.inner
917                .tags
918                .remove_if(tag, |_, members| members.is_empty());
919            // Leaving a group changes the tag census.
920            if self.inner.wants(crate::LogLevel::Info) {
921                self.inner.note_census();
922            }
923        }
924        removed
925    }
926
927    /// Terminates every live member of process group `tag`; returns how many were killed
928    /// (`0` for an unknown/empty tag). Each member deregisters on exit, emptying the group
929    /// — Erlang's "kill the whole `pg`". Members are snapshotted first, so one that dies
930    /// concurrently is simply not counted, never double-killed.
931    pub fn kill_tag(&self, tag: &str) -> usize {
932        // `terminate` (not `kill`) per member → one `kill-tag` summary line, not N `kill`
933        // lines (each member still logs its own `exit`).
934        let killed = self
935            .whereis_tag(tag)
936            .into_iter()
937            .filter(|&pid| self.terminate(pid))
938            .count();
939        if self.inner.wants(crate::LogLevel::Info) {
940            crate::lifecycle::log_kill_tag(tag, killed);
941        }
942        killed
943    }
944
945    /// Sends to a registered `name`. Returns `false` if the name is unknown (or
946    /// its process just died).
947    pub fn send_named(&self, name: &str, message: Message) -> bool {
948        match self.whereis(name) {
949            Some(pid) => self.send(pid, message),
950            None => false,
951        }
952    }
953
954    /// Delivers `message` to `pid` after `delay`, returning a handle that can
955    /// [`cancel`](TimerRef::cancel) it before it fires. Built on Tokio's timer
956    /// wheel — many pending timers cost little, and cancellation is a free abort.
957    pub fn send_after(&self, pid: Pid, delay: Duration, message: Message) -> TimerRef {
958        let runtime = self.clone();
959        let task = tokio::spawn(async move {
960            tokio::time::sleep(delay).await;
961            runtime.send(pid, message);
962        });
963        TimerRef {
964            abort: task.abort_handle(),
965        }
966    }
967
968    /// Stops every live process (each still runs its normal teardown — links and
969    /// monitors are notified, names released). Returns how many were signalled.
970    /// Teardown is asynchronous; poll [`process_count`](Runtime::process_count)
971    /// to wait for the drain.
972    pub fn shutdown(&self) -> usize {
973        // `abort()` only flips an atomic flag (it never touches the table), so it
974        // is safe — and allocation-free — to signal each process during iteration.
975        let mut stopped = 0;
976        for entry in self.inner.table.iter() {
977            entry.abort.abort();
978            stopped += 1;
979        }
980        stopped
981    }
982}
983
984/// A handle to a pending timer from [`send_after`](Runtime::send_after).
985pub struct TimerRef {
986    abort: tokio::task::AbortHandle,
987}
988
989impl TimerRef {
990    /// Cancels the timer if it hasn't fired yet; a no-op once it has.
991    pub fn cancel(&self) {
992        self.abort.abort();
993    }
994}
995
996/// Deregisters a process — and fans its exit out to links and monitors — on the
997/// **Drop** path, so it runs however the body ends: completion, panic, or kill.
998/// The guard lives inside the task (see [`Runtime::spawn_entry`]).
999struct ProcessGuard {
1000    pid: Pid,
1001    inner: Arc<Inner>,
1002    reason: ExitReason,
1003}
1004
1005impl Drop for ProcessGuard {
1006    fn drop(&mut self) {
1007        // A panic unwinding through the task means the body crashed; otherwise
1008        // `run` has set the reason (Normal on completion, Killed on abort).
1009        let reason = if std::thread::panicking() {
1010            ExitReason::Crashed
1011        } else {
1012            self.reason
1013        };
1014        self.inner.deregister(self.pid, reason);
1015    }
1016}
1017
1018async fn run<Fut>(mut guard: ProcessGuard, body: Abortable<Fut>)
1019where
1020    Fut: Future<Output = ()> + Send + 'static,
1021{
1022    // The guard lives in the task and deregisters the process on every exit path.
1023    // We only need to distinguish completion from a kill here; a panic is caught
1024    // by the guard via `std::thread::panicking()`. No select loop is needed — the
1025    // abort handle is the stop signal, and it drops the inner body future.
1026    guard.reason = match body.await {
1027        Ok(()) => ExitReason::Normal,
1028        Err(_aborted) => ExitReason::Killed,
1029    };
1030}
1031
1032#[cfg(test)]
1033mod tests {
1034    use super::*;
1035
1036    #[test]
1037    fn wants_log_respects_the_configured_threshold() {
1038        let rt = Runtime::new();
1039        // Off by default: nothing is logged.
1040        assert!(!rt.wants_log(crate::LogLevel::Error));
1041        // At Warn: crashes (Error) and kills (Warn) log; clean exits / spawns don't.
1042        rt.set_log_level(crate::LogLevel::Warn);
1043        assert!(rt.wants_log(crate::LogLevel::Error));
1044        assert!(rt.wants_log(crate::LogLevel::Warn));
1045        assert!(!rt.wants_log(crate::LogLevel::Info));
1046        assert!(!rt.wants_log(crate::LogLevel::Debug));
1047        // At Debug: everything logs.
1048        rt.set_log_level(crate::LogLevel::Debug);
1049        assert!(rt.wants_log(crate::LogLevel::Debug));
1050    }
1051
1052    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1053    async fn census_counts_live_processes_by_label() {
1054        let rt = Runtime::new();
1055        // Four parked processes; label two "alpha", one "beta", leave one unlabeled.
1056        let mut procs: Vec<_> = (0..4)
1057            .map(|_| {
1058                rt.spawn(|mut ctx| async move {
1059                    loop {
1060                        ctx.recv().await;
1061                    }
1062                })
1063            })
1064            .collect();
1065        assert!(rt.set_label(procs[0].pid(), "alpha"));
1066        assert!(rt.set_label(procs[1].pid(), "alpha"));
1067        assert!(rt.set_label(procs[2].pid(), "beta"));
1068
1069        let counts = rt.census_counts();
1070        assert_eq!(counts.get("alpha"), Some(&2));
1071        assert_eq!(counts.get("beta"), Some(&1));
1072        assert_eq!(counts.len(), 2, "the unlabeled process is excluded");
1073
1074        // When a labeled process exits, the census reflects it.
1075        let victim = procs.remove(1); // an "alpha"
1076        victim.kill();
1077        victim.join().await;
1078        assert_eq!(
1079            rt.census_counts().get("alpha"),
1080            Some(&1),
1081            "a drained process drops out of the census"
1082        );
1083    }
1084
1085    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1086    async fn census_step_emits_on_activity_not_on_count_equality() {
1087        let rt = Runtime::new();
1088        // The change generation only advances at Info+ (the level the census lives at).
1089        rt.set_log_level(crate::LogLevel::Info);
1090        fn park(mut ctx: Context) -> impl std::future::Future<Output = ()> {
1091            async move {
1092                loop {
1093                    ctx.recv().await;
1094                }
1095            }
1096        }
1097        let mut printed = 0u64;
1098
1099        // A labeled spawn is activity → emits; nothing since → no duplicate.
1100        let a = rt.spawn(park);
1101        rt.set_label(a.pid(), "alpha");
1102        assert!(rt.census_step(&mut printed), "a labeled spawn emits");
1103        assert!(
1104            !rt.census_step(&mut printed),
1105            "nothing happened since → no duplicate line"
1106        );
1107
1108        // A spawn+exit nets back to the same counts ({alpha:1}) — but processes genuinely
1109        // changed, so it must STILL emit (the case a count-comparison dedup wrongly hid).
1110        let b = rt.spawn(park);
1111        rt.set_label(b.pid(), "beta");
1112        b.kill();
1113        b.join().await;
1114        assert_eq!(
1115            rt.census_counts().get("alpha"),
1116            Some(&1),
1117            "counts netted back to the pre-spawn picture"
1118        );
1119        assert!(
1120            rt.census_step(&mut printed),
1121            "a net-zero spawn+exit is real activity → emits"
1122        );
1123        assert!(!rt.census_step(&mut printed), "and then stays quiet");
1124    }
1125
1126    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1127    async fn census_tag_counts_reflect_group_membership() {
1128        let rt = Runtime::new();
1129        let procs: Vec<_> = (0..3)
1130            .map(|_| {
1131                rt.spawn(|mut ctx| async move {
1132                    loop {
1133                        ctx.recv().await;
1134                    }
1135                })
1136            })
1137            .collect();
1138        // Three processes in `plan:abc123`; one of them also in `plan:def456`.
1139        for p in &procs {
1140            assert!(rt.register_tag("plan:abc123", p.pid()));
1141        }
1142        assert!(rt.register_tag("plan:def456", procs[0].pid()));
1143
1144        let tags = rt.tag_counts();
1145        assert_eq!(tags.get("plan:abc123"), Some(&3));
1146        assert_eq!(tags.get("plan:def456"), Some(&1));
1147
1148        // kill_tag terminates the whole group; as each member drains it leaves both its
1149        // groups, so the emptied tags drop out of the census entirely.
1150        assert_eq!(rt.kill_tag("plan:abc123"), 3);
1151        for p in procs {
1152            p.join().await;
1153        }
1154        let tags = rt.tag_counts();
1155        assert_eq!(tags.get("plan:abc123"), None, "an emptied group drops out");
1156        assert_eq!(
1157            tags.get("plan:def456"),
1158            None,
1159            "its lone member died too, so it's gone as well"
1160        );
1161    }
1162
1163    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1164    async fn joining_or_leaving_a_group_is_census_activity() {
1165        let rt = Runtime::new();
1166        rt.set_log_level(crate::LogLevel::Info); // the census gen only advances at Info+
1167        let p = rt.spawn(|mut ctx| async move {
1168            loop {
1169                ctx.recv().await;
1170            }
1171        });
1172        let mut printed = 0u64;
1173        assert!(rt.register_tag("plan:x", p.pid()));
1174        assert!(
1175            rt.census_step(&mut printed),
1176            "joining a group is census activity → emits"
1177        );
1178        assert!(rt.unregister_tag("plan:x", p.pid()));
1179        assert!(
1180            rt.census_step(&mut printed),
1181            "leaving a group is census activity → emits"
1182        );
1183        assert!(!rt.census_step(&mut printed), "and then stays quiet");
1184    }
1185
1186    #[tokio::test]
1187    async fn a_process_receives_a_message_sent_to_its_pid() {
1188        let rt = Runtime::new();
1189        let (tx, rx) = tokio::sync::oneshot::channel();
1190        let handle = rt.spawn(|mut ctx| async move {
1191            let msg = ctx.recv().await.message().unwrap();
1192            let _ = tx.send(msg);
1193        });
1194        assert!(rt.send(handle.pid(), b"hello".to_vec()));
1195        assert_eq!(rx.await.unwrap(), b"hello".to_vec());
1196        handle.join().await;
1197    }
1198
1199    #[tokio::test]
1200    async fn messages_arrive_in_fifo_order() {
1201        let rt = Runtime::new();
1202        let (tx, rx) = tokio::sync::oneshot::channel();
1203        let handle = rt.spawn(|mut ctx| async move {
1204            let mut got = Vec::new();
1205            for _ in 0..3 {
1206                got.push(ctx.recv().await.message().unwrap());
1207            }
1208            let _ = tx.send(got);
1209        });
1210        for byte in [b"a".to_vec(), b"b".to_vec(), b"c".to_vec()] {
1211            assert!(rt.send(handle.pid(), byte));
1212        }
1213        assert_eq!(
1214            rx.await.unwrap(),
1215            vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]
1216        );
1217        handle.join().await;
1218    }
1219
1220    #[tokio::test]
1221    async fn recv_match_takes_a_match_and_leaves_the_rest_in_order() {
1222        let rt = Runtime::new();
1223        let (tx, rx) = tokio::sync::oneshot::channel();
1224        let handle = rt.spawn(|mut ctx| async move {
1225            // Want the "B" message; "A" arrives first and must be left queued.
1226            let matched = ctx
1227                .recv_match(|m| matches!(m, Received::Message(b) if b.first() == Some(&b'B')))
1228                .await
1229                .message()
1230                .unwrap();
1231            let then = ctx.recv().await.message().unwrap(); // the deferred "A"
1232            let last = ctx.recv().await.message().unwrap(); // then "C"
1233            let _ = tx.send((matched, then, last));
1234        });
1235        for m in [b"A".to_vec(), b"B".to_vec(), b"C".to_vec()] {
1236            assert!(rt.send(handle.pid(), m));
1237        }
1238        let (matched, then, last) = rx.await.unwrap();
1239        assert_eq!(matched, b"B".to_vec());
1240        assert_eq!(then, b"A".to_vec());
1241        assert_eq!(last, b"C".to_vec());
1242        handle.join().await;
1243    }
1244
1245    #[tokio::test]
1246    async fn recv_match_finds_a_previously_deferred_message() {
1247        let rt = Runtime::new();
1248        let (tx, rx) = tokio::sync::oneshot::channel();
1249        let handle = rt.spawn(|mut ctx| async move {
1250            // Match "C" first, deferring A and B; then selectively pull B out of
1251            // the save queue, leaving A for an ordinary recv.
1252            let is = |byte: u8| move |m: &Received| matches!(m, Received::Message(b) if b.first() == Some(&byte));
1253            let c = ctx.recv_match(is(b'C')).await.message().unwrap();
1254            let b = ctx.recv_match(is(b'B')).await.message().unwrap();
1255            let a = ctx.recv().await.message().unwrap();
1256            let _ = tx.send((a, b, c));
1257        });
1258        for m in [b"A".to_vec(), b"B".to_vec(), b"C".to_vec()] {
1259            assert!(rt.send(handle.pid(), m));
1260        }
1261        let (a, b, c) = rx.await.unwrap();
1262        assert_eq!((a, b, c), (b"A".to_vec(), b"B".to_vec(), b"C".to_vec()));
1263        handle.join().await;
1264    }
1265
1266    #[tokio::test]
1267    async fn send_to_unknown_pid_returns_false() {
1268        let rt = Runtime::new();
1269        assert!(!rt.send(Pid(424242), b"hi".to_vec()));
1270    }
1271
1272    #[tokio::test]
1273    async fn send_to_a_finished_process_returns_false() {
1274        let rt = Runtime::new();
1275        let handle = rt.spawn(|_| async {});
1276        let pid = handle.pid();
1277        handle.join().await; // finished and reaped — mailbox is gone
1278        assert!(!rt.send(pid, b"too late".to_vec()));
1279    }
1280
1281    #[tokio::test]
1282    async fn tags_group_processes_and_kill_tag_terminates_only_that_group() {
1283        let rt = Runtime::new();
1284        let a = rt.spawn(|_| std::future::pending::<()>());
1285        let b = rt.spawn(|_| std::future::pending::<()>());
1286        let c = rt.spawn(|_| std::future::pending::<()>());
1287        assert!(rt.register_tag("plan:1", a.pid()));
1288        assert!(rt.register_tag("plan:1", b.pid()));
1289        assert!(rt.register_tag("plan:2", c.pid())); // a separate group
1290
1291        let mut g1 = rt.whereis_tag("plan:1");
1292        g1.sort_by_key(|p| p.0);
1293        let mut want = vec![a.pid(), b.pid()];
1294        want.sort_by_key(|p| p.0);
1295        assert_eq!(g1, want);
1296        assert_eq!(rt.whereis_tag("plan:2"), vec![c.pid()]);
1297        assert!(rt.whereis_tag("nope").is_empty());
1298
1299        assert_eq!(rt.kill_tag("plan:1"), 2);
1300        a.join().await;
1301        b.join().await;
1302        assert!(rt.whereis_tag("plan:1").is_empty()); // members reaped to empty
1303        assert!(rt.is_alive(c.pid())); // the other group is untouched
1304        assert_eq!(rt.whereis_tag("plan:2"), vec![c.pid()]);
1305        c.kill();
1306        c.join().await;
1307    }
1308
1309    #[tokio::test]
1310    async fn a_dead_process_leaves_its_tags_and_cannot_be_re_tagged() {
1311        let rt = Runtime::new();
1312        let a = rt.spawn(|_| std::future::pending::<()>());
1313        let pid = a.pid();
1314        assert!(rt.register_tag("g", pid));
1315        a.kill();
1316        a.join().await;
1317        assert!(rt.whereis_tag("g").is_empty()); // membership reaped on exit
1318        assert!(!rt.register_tag("g", pid)); // a dead pid can't be tagged
1319    }
1320
1321    #[tokio::test]
1322    async fn a_process_holds_multiple_tags_and_can_leave_one() {
1323        let rt = Runtime::new();
1324        let a = rt.spawn(|_| std::future::pending::<()>());
1325        assert!(rt.register_tag("x", a.pid()));
1326        assert!(rt.register_tag("y", a.pid()));
1327        assert!(rt.register_tag("x", a.pid())); // idempotent re-tag
1328        assert_eq!(rt.whereis_tag("x"), vec![a.pid()]);
1329
1330        assert!(rt.unregister_tag("x", a.pid()));
1331        assert!(rt.whereis_tag("x").is_empty()); // emptied group is dropped
1332        assert_eq!(rt.whereis_tag("y"), vec![a.pid()]); // the other tag is intact
1333        assert!(!rt.unregister_tag("x", a.pid())); // already gone
1334        a.kill();
1335        a.join().await;
1336    }
1337
1338    #[tokio::test]
1339    async fn kill_tag_of_an_unknown_group_is_zero() {
1340        let rt = Runtime::new();
1341        assert_eq!(rt.kill_tag("ghost"), 0);
1342    }
1343
1344    #[tokio::test]
1345    async fn killing_a_parked_receiver_stops_it_and_cleans_up() {
1346        // A process blocked in recv (no message will ever come) must still be
1347        // killable — abort wakes it at the suspension point and the guard reaps it.
1348        let rt = Runtime::new();
1349        let handle = rt.spawn(|mut ctx| async move {
1350            let _forever = ctx.recv().await;
1351        });
1352        let pid = handle.pid();
1353        assert!(rt.is_alive(pid));
1354        handle.kill();
1355        handle.join().await;
1356        assert!(!rt.is_alive(pid));
1357        assert_eq!(rt.finished(), 1);
1358    }
1359
1360    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1361    async fn two_processes_play_ping_pong() {
1362        // A message carries its sender's pid (its first 8 bytes), so the ponger
1363        // knows whom to reply to — the byte-level analogue of Erlang's
1364        // `send(peer, {self(), :ping})`.
1365        let rt = Runtime::new();
1366        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
1367
1368        let ponger_rt = rt.clone();
1369        let ponger = rt.spawn(move |mut ctx| async move {
1370            let ball = ctx.recv().await.message().unwrap();
1371            let reply_to = Pid::from_raw(u64::from_le_bytes(ball[..8].try_into().unwrap()));
1372            ponger_rt.send(reply_to, b"pong".to_vec());
1373        });
1374        let ponger_pid = ponger.pid();
1375
1376        let pinger_rt = rt.clone();
1377        let pinger = rt.spawn(move |mut ctx| async move {
1378            let mut ball = ctx.pid().raw().to_le_bytes().to_vec();
1379            ball.extend_from_slice(b"ping");
1380            pinger_rt.send(ponger_pid, ball);
1381            let _ = done_tx.send(ctx.recv().await.message().unwrap());
1382        });
1383
1384        assert_eq!(done_rx.await.unwrap(), b"pong".to_vec());
1385        pinger.join().await;
1386        ponger.join().await;
1387    }
1388
1389    #[tokio::test]
1390    async fn a_process_runs_to_completion_and_is_cleaned_up() {
1391        let rt = Runtime::new();
1392        let handle = rt.spawn(|_| async {});
1393        let pid = handle.pid();
1394        handle.join().await;
1395        assert_eq!(rt.spawned(), 1);
1396        assert_eq!(rt.finished(), 1);
1397        assert_eq!(rt.process_count(), 0);
1398        assert!(!rt.is_alive(pid));
1399    }
1400
1401    #[tokio::test]
1402    async fn body_receives_its_own_pid() {
1403        let rt = Runtime::new();
1404        let (tx, rx) = tokio::sync::oneshot::channel();
1405        let handle = rt.spawn(move |ctx| async move {
1406            assert_eq!(
1407                format!("{ctx:?}"),
1408                format!("Context {{ pid: {:?} }}", ctx.pid())
1409            );
1410            let _ = tx.send(ctx.pid());
1411        });
1412        let pid = handle.pid();
1413        assert_eq!(rx.await.unwrap(), pid);
1414        handle.join().await;
1415    }
1416
1417    #[tokio::test]
1418    async fn pids_are_unique_and_increasing() {
1419        let rt = Runtime::new();
1420        let a = rt.spawn(|_| async {});
1421        let b = rt.spawn(|_| async {});
1422        assert_ne!(a.pid(), b.pid());
1423        assert!(b.pid().raw() > a.pid().raw());
1424        a.join().await;
1425        b.join().await;
1426    }
1427
1428    #[tokio::test]
1429    async fn kill_terminates_a_running_process() {
1430        let rt = Runtime::new();
1431        // A body that never completes on its own, so termination can only come
1432        // from the kill — `finished == 1` afterwards proves the kill worked.
1433        let handle = rt.spawn(|_| std::future::pending::<()>());
1434        let pid = handle.pid();
1435        assert!(rt.is_alive(pid));
1436        handle.kill();
1437        handle.join().await;
1438        assert!(!rt.is_alive(pid));
1439        assert_eq!(rt.process_count(), 0);
1440        assert_eq!(rt.finished(), 1);
1441    }
1442
1443    #[tokio::test]
1444    async fn runtime_kill_signals_a_live_process() {
1445        let rt = Runtime::new();
1446        let handle = rt.spawn(|_| std::future::pending::<()>());
1447        let pid = handle.pid();
1448        assert!(rt.kill(pid));
1449        handle.join().await;
1450        assert!(!rt.is_alive(pid));
1451    }
1452
1453    #[tokio::test]
1454    async fn kill_unknown_pid_returns_false() {
1455        let rt = Runtime::new();
1456        assert!(!rt.kill(Pid(999)));
1457    }
1458
1459    #[tokio::test]
1460    async fn a_panicking_body_is_still_cleaned_up() {
1461        let rt = Runtime::new();
1462        let handle = rt.spawn(|_| async { panic!("boom") });
1463        handle.join().await; // join swallows the JoinError
1464        assert_eq!(rt.process_count(), 0);
1465        assert_eq!(rt.finished(), 1);
1466    }
1467
1468    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1469    async fn spawns_many_processes_concurrently() {
1470        let rt = Runtime::new();
1471        let handles: Vec<_> = (0..1000).map(|_| rt.spawn(|_| async {})).collect();
1472        for handle in handles {
1473            handle.join().await;
1474        }
1475        assert_eq!(rt.spawned(), 1000);
1476        assert_eq!(rt.finished(), 1000);
1477        assert_eq!(rt.process_count(), 0);
1478    }
1479
1480    // --- Phase 3: links, monitors, supervision -------------------------------
1481
1482    /// A watcher process that forwards the first thing it receives to the test,
1483    /// then parks (staying alive so it can't race its own teardown). Returns its
1484    /// pid and the receiving end.
1485    fn watch(rt: &Runtime) -> (Pid, tokio::sync::oneshot::Receiver<Received>) {
1486        let (tx, rx) = tokio::sync::oneshot::channel();
1487        let pid = rt
1488            .spawn(move |mut ctx| async move {
1489                let item = ctx.recv().await;
1490                let _ = tx.send(item);
1491                std::future::pending::<()>().await;
1492            })
1493            .pid();
1494        (pid, rx)
1495    }
1496
1497    /// A process that parks until `go` fires, then ends the given way. The gate
1498    /// lets the test wire up links/monitors *before* the exit, with no sleeps.
1499    fn gated<F>(rt: &Runtime, ending: F) -> (Pid, tokio::sync::oneshot::Sender<()>)
1500    where
1501        F: FnOnce() + Send + 'static,
1502    {
1503        let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
1504        let pid = rt
1505            .spawn(move |_| async move {
1506                let _ = go_rx.await;
1507                ending();
1508            })
1509            .pid();
1510        (pid, go_tx)
1511    }
1512
    // A monitor's Down message must carry the reason matching how the target
    // ended: a normal return -> Normal, a panic -> Crashed, `Runtime::kill` ->
    // Killed. In every case the monitor is installed before the target can exit.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn monitor_reports_each_kind_of_exit() {
        let rt = Runtime::new();

        // Normal completion. The gate holds the target until the monitor exists.
        let (w1, d1) = watch(&rt);
        let (t1, go1) = gated(&rt, || {});
        let r1 = rt.monitor(w1, t1);
        let _ = go1.send(());
        assert_eq!(
            d1.await.unwrap(),
            Received::Down {
                reference: r1,
                pid: t1,
                reason: ExitReason::Normal
            }
        );

        // Panic -> Crashed. Same gating, but the body panics once released.
        let (w2, d2) = watch(&rt);
        let (t2, go2) = gated(&rt, || panic!("boom"));
        let r2 = rt.monitor(w2, t2);
        let _ = go2.send(());
        assert_eq!(
            d2.await.unwrap(),
            Received::Down {
                reference: r2,
                pid: t2,
                reason: ExitReason::Crashed
            }
        );

        // Kill -> Killed. The target parks forever, so only the kill can end it;
        // its spawn handle is discarded immediately and only the pid is kept.
        let (w3, d3) = watch(&rt);
        let t3 = rt.spawn(|_| std::future::pending::<()>()).pid();
        let r3 = rt.monitor(w3, t3);
        assert!(rt.kill(t3));
        assert_eq!(
            d3.await.unwrap(),
            Received::Down {
                reference: r3,
                pid: t3,
                reason: ExitReason::Killed
            }
        );
    }
1559
1560    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1561    async fn monitoring_a_dead_process_reports_noproc_at_once() {
1562        let rt = Runtime::new();
1563        let dead = rt.spawn(|_| async {});
1564        let dead_pid = dead.pid();
1565        dead.join().await;
1566
1567        let (watcher, down) = watch(&rt);
1568        let reference = rt.monitor(watcher, dead_pid);
1569        assert_eq!(
1570            down.await.unwrap(),
1571            Received::Down {
1572                reference,
1573                pid: dead_pid,
1574                reason: ExitReason::NoProc
1575            }
1576        );
1577    }
1578
1579    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1580    async fn an_abnormal_exit_cascades_down_links_with_its_reason() {
1581        let rt = Runtime::new();
1582        let peer = rt.spawn(|_| std::future::pending::<()>()).pid();
1583        let (crasher, go) = gated(&rt, || panic!("boom"));
1584        rt.link(peer, crasher);
1585
1586        // Watch the peer: it must go down too, carrying the *crash* reason, not
1587        // the bare Killed an abort would otherwise imply.
1588        let (watcher, down) = watch(&rt);
1589        let reference = rt.monitor(watcher, peer);
1590
1591        let _ = go.send(());
1592        assert_eq!(
1593            down.await.unwrap(),
1594            Received::Down {
1595                reference,
1596                pid: peer,
1597                reason: ExitReason::Crashed
1598            }
1599        );
1600        assert!(!rt.is_alive(peer));
1601    }
1602
1603    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1604    async fn a_normal_exit_does_not_cascade() {
1605        let rt = Runtime::new();
1606        let survivor = rt.spawn(|_| std::future::pending::<()>());
1607        let (quitter, go) = gated(&rt, || {});
1608        rt.link(survivor.pid(), quitter);
1609
1610        let _ = go.send(());
1611        // Drain the quitter to completion; its teardown (and any propagation) has
1612        // run by the time the table no longer lists it.
1613        while rt.is_alive(quitter) {
1614            tokio::task::yield_now().await;
1615        }
1616        assert!(
1617            rt.is_alive(survivor.pid()),
1618            "a normal exit must not kill links"
1619        );
1620        survivor.kill();
1621    }
1622
1623    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1624    async fn a_trapping_process_gets_an_exit_message_instead_of_dying() {
1625        let rt = Runtime::new();
1626        let (tx, rx) = tokio::sync::oneshot::channel();
1627        let trapper = rt.spawn(move |mut ctx| async move {
1628            let item = ctx.recv().await;
1629            let _ = tx.send(item);
1630            std::future::pending::<()>().await; // stay alive to prove we trapped
1631        });
1632        rt.set_trap_exit(trapper.pid(), true);
1633
1634        let (child, go) = gated(&rt, || panic!("boom"));
1635        rt.link(trapper.pid(), child);
1636        let _ = go.send(());
1637
1638        assert_eq!(
1639            rx.await.unwrap(),
1640            Received::Exit {
1641                from: child,
1642                reason: ExitReason::Crashed
1643            }
1644        );
1645        assert!(
1646            rt.is_alive(trapper.pid()),
1647            "a trapping process must survive"
1648        );
1649        trapper.kill();
1650    }
1651
1652    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1653    async fn spawn_link_links_the_child_to_its_parent() {
1654        let rt = Runtime::new();
1655        let (tx, rx) = tokio::sync::oneshot::channel();
1656        let parent = rt.spawn(move |mut ctx| async move {
1657            let item = ctx.recv().await;
1658            let _ = tx.send(item);
1659            std::future::pending::<()>().await;
1660        });
1661        rt.set_trap_exit(parent.pid(), true);
1662
1663        let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
1664        let child = rt
1665            .spawn_link(parent.pid(), move |_| async move {
1666                let _ = go_rx.await;
1667                panic!("boom");
1668            })
1669            .pid();
1670        let _ = go_tx.send(());
1671
1672        assert_eq!(
1673            rx.await.unwrap(),
1674            Received::Exit {
1675                from: child,
1676                reason: ExitReason::Crashed
1677            }
1678        );
1679        parent.kill();
1680    }
1681
1682    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1683    async fn unlinking_stops_propagation() {
1684        let rt = Runtime::new();
1685        let survivor = rt.spawn(|_| std::future::pending::<()>());
1686        let (crasher, go) = gated(&rt, || panic!("boom"));
1687        rt.link(survivor.pid(), crasher);
1688        rt.unlink(survivor.pid(), crasher);
1689
1690        let _ = go.send(());
1691        while rt.is_alive(crasher) {
1692            tokio::task::yield_now().await;
1693        }
1694        assert!(
1695            rt.is_alive(survivor.pid()),
1696            "an unlinked peer must not be taken down"
1697        );
1698        survivor.kill();
1699    }
1700
1701    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1702    async fn linking_a_dead_peer_leaves_no_half_link() {
1703        let rt = Runtime::new();
1704        let alive = rt.spawn(|_| std::future::pending::<()>());
1705        let dead = rt.spawn(|_| async {});
1706        let dead_pid = dead.pid();
1707        dead.join().await;
1708
1709        // The dead side can't be recorded; the half-link on `alive` is undone.
1710        rt.link(alive.pid(), dead_pid);
1711
1712        // Prove `alive`'s link set is intact: a fresh linked crasher still
1713        // cascades to it. (If the undo had corrupted the list this would hang.)
1714        let (crasher, go) = gated(&rt, || panic!("boom"));
1715        rt.link(alive.pid(), crasher);
1716        let _ = go.send(());
1717        while rt.is_alive(alive.pid()) {
1718            tokio::task::yield_now().await;
1719        }
1720        assert!(!rt.is_alive(crasher));
1721    }
1722
1723    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1724    async fn exit_terminates_with_the_chosen_reason() {
1725        let rt = Runtime::new();
1726        let (watcher, down) = watch(&rt);
1727        let target = rt.spawn(|_| std::future::pending::<()>()).pid();
1728        let reference = rt.monitor(watcher, target);
1729
1730        // exit/2 with a custom reason — no panic, yet observed as Crashed.
1731        assert!(rt.exit(target, ExitReason::Crashed));
1732        assert_eq!(
1733            down.await.unwrap(),
1734            Received::Down {
1735                reference,
1736                pid: target,
1737                reason: ExitReason::Crashed
1738            }
1739        );
1740        assert!(!rt.exit(Pid::from_raw(987_654), ExitReason::Normal)); // unknown pid
1741    }
1742
1743    #[tokio::test]
1744    async fn link_and_trap_on_missing_processes_are_no_ops() {
1745        let rt = Runtime::new();
1746        let p = rt.spawn(|_| std::future::pending::<()>());
1747        let dead = Pid::from_raw(999_999);
1748        rt.link(p.pid(), p.pid()); // self-link: ignored
1749        rt.link(dead, p.pid()); // dead first arg: nothing recorded
1750        rt.link(p.pid(), dead); // dead second arg: half-link undone
1751        rt.unlink(dead, p.pid()); // unlink with a dead owner: no-op
1752        rt.set_trap_exit(dead, true); // dead pid: no-op, no panic
1753        assert!(rt.is_alive(p.pid()));
1754        p.kill();
1755    }
1756
1757    // --- Phase 4: registry, timers, shutdown ---------------------------------
1758
1759    #[tokio::test]
1760    async fn register_whereis_send_named_then_auto_release_on_exit() {
1761        let rt = Runtime::new();
1762        let (tx, rx) = tokio::sync::oneshot::channel();
1763        let worker = rt.spawn(move |mut ctx| async move {
1764            let job = ctx.recv().await.message().unwrap();
1765            let _ = tx.send(job);
1766        });
1767        assert!(rt.register("worker", worker.pid()));
1768        assert_eq!(rt.whereis("worker"), Some(worker.pid()));
1769        assert!(!rt.register("worker", worker.pid())); // already taken
1770        assert!(rt.send_named("worker", b"job".to_vec()));
1771        assert_eq!(rx.await.unwrap(), b"job".to_vec());
1772
1773        worker.join().await; // exiting auto-releases the name
1774        assert_eq!(rt.whereis("worker"), None);
1775        assert!(!rt.send_named("worker", b"late".to_vec()));
1776    }
1777
1778    #[tokio::test]
1779    async fn names_are_released_by_unregister_and_reusable_after_death() {
1780        let rt = Runtime::new();
1781        let a = rt.spawn(|_| std::future::pending::<()>());
1782        assert!(rt.register("svc", a.pid()));
1783        assert!(rt.unregister("svc"));
1784        assert_eq!(rt.whereis("svc"), None);
1785        assert!(!rt.unregister("svc")); // already gone
1786
1787        assert!(rt.register("svc", a.pid()));
1788        a.kill();
1789        a.join().await;
1790        assert_eq!(rt.whereis("svc"), None);
1791        let b = rt.spawn(|_| std::future::pending::<()>());
1792        assert!(rt.register("svc", b.pid())); // a dead process's name is reusable
1793        b.kill();
1794    }
1795
1796    #[tokio::test]
1797    async fn register_to_a_dead_pid_fails_and_a_pid_can_hold_several_names() {
1798        let rt = Runtime::new();
1799        let dead = rt.spawn(|_| async {});
1800        let dead_pid = dead.pid();
1801        dead.join().await;
1802        assert!(!rt.register("ghost", dead_pid));
1803
1804        let p = rt.spawn(|_| std::future::pending::<()>());
1805        assert!(rt.register("one", p.pid()));
1806        assert!(rt.register("two", p.pid()));
1807        assert_eq!(rt.whereis("one"), Some(p.pid()));
1808        assert_eq!(rt.whereis("two"), Some(p.pid()));
1809        p.kill();
1810        p.join().await;
1811        assert_eq!(rt.whereis("one"), None); // all of a pid's names go on exit
1812        assert_eq!(rt.whereis("two"), None);
1813    }
1814
1815    #[tokio::test]
1816    async fn whereis_or_spawn_returns_the_incumbent_without_spawning() {
1817        let rt = Runtime::new();
1818        let first = rt.whereis_or_spawn("svc", |_| std::future::pending::<()>());
1819        // A second call finds the registered process and does NOT start a new one.
1820        let before = rt.spawned();
1821        let again = rt.whereis_or_spawn("svc", |_| std::future::pending::<()>());
1822        assert_eq!(again, first, "must return the already-registered pid");
1823        assert_eq!(
1824            rt.spawned(),
1825            before,
1826            "must not spawn when the name is taken"
1827        );
1828        assert_eq!(rt.whereis("svc"), Some(first));
1829        rt.kill(first);
1830    }
1831
1832    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1833    async fn whereis_or_spawn_is_race_free_and_kills_the_loser() {
1834        let rt = Runtime::new();
1835        // Many threads race to get-or-spawn the same name concurrently.
1836        let mut tasks = Vec::new();
1837        for _ in 0..32 {
1838            let rt = rt.clone();
1839            tasks.push(tokio::spawn(async move {
1840                rt.whereis_or_spawn("singleton", |_| std::future::pending::<()>())
1841            }));
1842        }
1843        let mut pids = Vec::new();
1844        for t in tasks {
1845            pids.push(t.await.unwrap());
1846        }
1847        // Exactly one pid wins; everyone sees it.
1848        let winner = rt.whereis("singleton").expect("a winner is registered");
1849        assert!(
1850            pids.iter().all(|&p| p == winner),
1851            "all callers see the winner"
1852        );
1853        // Every loser it spawned was killed — only the winner remains live.
1854        loop {
1855            if rt.process_count() == 1 {
1856                break;
1857            }
1858            tokio::task::yield_now().await;
1859        }
1860        assert_eq!(rt.whereis("singleton"), Some(winner));
1861        rt.kill(winner);
1862    }
1863
1864    #[tokio::test(start_paused = true)]
1865    async fn send_after_delivers_when_the_timer_fires() {
1866        let rt = Runtime::new();
1867        let (tx, rx) = tokio::sync::oneshot::channel();
1868        let target = rt.spawn(move |mut ctx| async move {
1869            let msg = ctx.recv().await.message().unwrap();
1870            let _ = tx.send(msg);
1871        });
1872        rt.send_after(target.pid(), Duration::from_secs(60), b"ding".to_vec());
1873        // Paused time auto-advances to the timer once everything else is idle.
1874        assert_eq!(rx.await.unwrap(), b"ding".to_vec());
1875    }
1876
1877    #[tokio::test(start_paused = true)]
1878    async fn a_cancelled_timer_never_fires() {
1879        let rt = Runtime::new();
1880        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1881        let target = rt.spawn(move |mut ctx| async move {
1882            loop {
1883                let _ = tx.send(ctx.recv().await);
1884            }
1885        });
1886        let timer = rt.send_after(target.pid(), Duration::from_secs(60), b"x".to_vec());
1887        timer.cancel();
1888        tokio::time::advance(Duration::from_secs(120)).await;
1889        tokio::task::yield_now().await; // let any (erroneous) delivery land before we check
1890        assert!(
1891            rx.try_recv().is_err(),
1892            "a cancelled timer must deliver nothing"
1893        );
1894        target.kill();
1895    }
1896
1897    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1898    async fn shutdown_stops_every_process() {
1899        let rt = Runtime::new();
1900        let procs: Vec<_> = (0..5)
1901            .map(|_| rt.spawn(|_| std::future::pending::<()>()))
1902            .collect();
1903        assert_eq!(rt.process_count(), 5);
1904        assert_eq!(rt.shutdown(), 5);
1905        for p in procs {
1906            p.join().await;
1907        }
1908        assert_eq!(rt.process_count(), 0);
1909        assert_eq!(rt.shutdown(), 0); // nothing left to stop
1910    }
1911
1912    // --- Phase 7: introspection & labels -------------------------------------
1913
1914    #[tokio::test]
1915    async fn list_reflects_live_processes() {
1916        use std::collections::HashSet;
1917        let rt = Runtime::new();
1918        assert!(rt.list().is_empty());
1919        let a = rt.spawn(|_| std::future::pending::<()>());
1920        let b = rt.spawn(|_| std::future::pending::<()>());
1921        let live: HashSet<u64> = rt.list().iter().map(|p| p.raw()).collect();
1922        assert_eq!(live, HashSet::from([a.pid().raw(), b.pid().raw()]));
1923        a.kill();
1924        a.join().await;
1925        assert_eq!(rt.list(), vec![b.pid()]);
1926        b.kill();
1927    }
1928
1929    #[tokio::test]
1930    async fn info_reports_links_names_label_and_trap() {
1931        let rt = Runtime::new();
1932        let p = rt.spawn(|_| std::future::pending::<()>());
1933        let peer = rt.spawn(|_| std::future::pending::<()>());
1934        rt.link(p.pid(), peer.pid());
1935        assert!(rt.register("svc", p.pid()));
1936        rt.set_trap_exit(p.pid(), true);
1937        assert!(rt.set_label(p.pid(), "worker #1"));
1938
1939        let info = rt.info(p.pid()).unwrap();
1940        assert_eq!(info.pid, p.pid());
1941        assert_eq!(info.links, 1);
1942        assert_eq!(info.monitors, 0);
1943        assert_eq!(info.names, vec!["svc".to_string()]);
1944        assert_eq!(info.label.as_deref(), Some("worker #1"));
1945        assert!(info.trap_exit);
1946        assert_eq!(info.mailbox_depth, 0);
1947        p.kill();
1948        peer.kill();
1949    }
1950
1951    #[tokio::test]
1952    async fn info_and_set_label_on_a_dead_pid() {
1953        let rt = Runtime::new();
1954        let d = rt.spawn(|_| async {});
1955        let pid = d.pid();
1956        d.join().await;
1957        assert!(rt.info(pid).is_none());
1958        assert!(!rt.set_label(pid, "ghost"));
1959    }
1960
1961    #[tokio::test]
1962    async fn mailbox_depth_tracks_unconsumed_messages() {
1963        let rt = Runtime::with_mailbox_depth();
1964        let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
1965        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
1966        let p = rt.spawn(move |mut ctx| async move {
1967            let _ = go_rx.await; // hold off consuming until the test has filled the box
1968            for _ in 0..3 {
1969                ctx.recv().await;
1970            }
1971            let _ = done_tx.send(());
1972            std::future::pending::<()>().await;
1973        });
1974        // `send` increments depth synchronously, so this is race-free even though
1975        // the process hasn't been polled past its gate yet.
1976        for m in [b"a".to_vec(), b"b".to_vec(), b"c".to_vec()] {
1977            assert!(rt.send(p.pid(), m));
1978        }
1979        assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 3);
1980
1981        let _ = go_tx.send(());
1982        let _ = done_rx.await; // all three consumed by now
1983        assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 0);
1984        p.kill();
1985    }
1986
1987    #[tokio::test]
1988    async fn a_bounded_mailbox_sheds_user_messages_past_capacity() {
1989        let rt = Runtime::with_mailbox_capacity(3);
1990        // A process that holds its mailbox open but never consumes it.
1991        let p = rt.spawn(|ctx| async move {
1992            let _hold = ctx; // keep the receiver alive (don't drain it)
1993            std::future::pending::<()>().await;
1994        });
1995        // `send` is synchronous, so this is race-free: the first three land, the
1996        // rest are shed once depth hits the capacity.
1997        for i in 0..10u8 {
1998            rt.send(p.pid(), vec![i]);
1999        }
2000        assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 3);
2001        assert_eq!(rt.dropped_messages(), 7);
2002        p.kill();
2003    }
2004
    // A mailbox bounded at 2 user messages must still accept a system Down when
    // full. A user send past capacity is refused and counted in
    // `dropped_messages`, while the monitor's Down lands and pushes the depth
    // past the capacity. The watcher then sees exactly the two accepted user
    // messages plus the Down.
    #[tokio::test]
    async fn a_full_mailbox_still_accepts_system_signals() {
        let rt = Runtime::with_mailbox_capacity(2);
        let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
        let (report_tx, report_rx) = tokio::sync::oneshot::channel();
        let watcher = rt.spawn(move |mut ctx| async move {
            let _ = go_rx.await; // stay parked until the test has filled the box
            // Collect exactly three items (two user messages + one Down) and
            // hand them back to the test.
            let mut got = Vec::new();
            for _ in 0..3 {
                got.push(ctx.recv().await);
            }
            let _ = report_tx.send(got);
        });
        let wpid = watcher.pid();

        // A target the watcher monitors; killing it produces a Down for the watcher.
        let target = rt.spawn(|ctx| async move {
            let _hold = ctx; // keep the target's mailbox receiver alive
            std::future::pending::<()>().await;
        });
        let tpid = target.pid();
        // The returned monitor reference isn't needed; the test only counts Downs.
        rt.monitor(wpid, tpid);

        // Fill the mailbox to capacity, then over it — the third is shed.
        assert!(rt.send(wpid, b"a".to_vec()));
        assert!(rt.send(wpid, b"b".to_vec()));
        assert!(!rt.send(wpid, b"c".to_vec()));
        assert_eq!(rt.dropped_messages(), 1);

        // Killing the target delivers a Down — a *system* signal, so it must land
        // even though the mailbox is at capacity. Depth rising to 3 proves it.
        // The Down may not arrive synchronously with `kill`, so poll for it
        // (at most 500 x 2ms) before asserting.
        rt.kill(tpid);
        for _ in 0..500 {
            if rt.info(wpid).map(|i| i.mailbox_depth) == Some(3) {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        }
        assert_eq!(
            rt.info(wpid).unwrap().mailbox_depth,
            3,
            "the Down landed despite the full mailbox"
        );

        // Released, the watcher sees both user messages and the Down (never "c").
        // The report is awaited under a timeout so a stuck watcher fails the test.
        let _ = go_tx.send(());
        let got = tokio::time::timeout(std::time::Duration::from_secs(5), report_rx)
            .await
            .expect("watcher never reported")
            .unwrap();
        let users = got
            .iter()
            .filter(|r| matches!(r, Received::Message(_)))
            .count();
        let downs = got
            .iter()
            .filter(|r| matches!(r, Received::Down { .. }))
            .count();
        assert_eq!(users, 2, "both queued user messages survive");
        assert_eq!(downs, 1, "the system Down was delivered, not shed");
    }
2066
2067    #[tokio::test]
2068    async fn mailbox_depth_counts_messages_deferred_by_selective_receive() {
2069        let rt = Runtime::with_mailbox_depth();
2070        let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
2071        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
2072        let p = rt.spawn(move |mut ctx| async move {
2073            let _ = go_rx.await;
2074            // Consume only "B"; "A" and "C" stay deferred — still unconsumed.
2075            let _ = ctx
2076                .recv_match(|m| matches!(m, Received::Message(b) if b.first() == Some(&b'B')))
2077                .await;
2078            let _ = done_tx.send(());
2079            std::future::pending::<()>().await;
2080        });
2081        for m in [b"A".to_vec(), b"B".to_vec(), b"C".to_vec()] {
2082            assert!(rt.send(p.pid(), m));
2083        }
2084        assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 3);
2085
2086        let _ = go_tx.send(());
2087        let _ = done_rx.await;
2088        // One consumed (B); A and C remain deferred but counted unconsumed.
2089        while rt.info(p.pid()).map_or(false, |i| i.mailbox_depth != 2) {
2090            tokio::task::yield_now().await;
2091        }
2092        assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 2);
2093        p.kill();
2094    }
2095
2096    // --- Phase 7: stream-carrying messages -----------------------------------
2097
2098    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2099    async fn a_stream_is_delivered_and_read_in_order_after_a_message() {
2100        use crate::stream::stream;
2101        let rt = Runtime::new();
2102        let (out_tx, out_rx) = tokio::sync::oneshot::channel();
2103        let p = rt.spawn(move |mut ctx| async move {
2104            // A normal message, then a stream — FIFO across both kinds.
2105            let first = ctx.recv().await.message();
2106            let mut handle = ctx.recv().await.stream().expect("a stream");
2107            let mut chunks = Vec::new();
2108            while let Some(chunk) = handle.read().await {
2109                chunks.push(chunk);
2110            }
2111            let _ = out_tx.send((first, chunks));
2112        });
2113
2114        assert!(rt.send(p.pid(), b"hello".to_vec()));
2115        let (writer, handle) = stream();
2116        assert!(rt.send_stream(p.pid(), handle));
2117        tokio::spawn(async move {
2118            for chunk in [b"a".to_vec(), b"b".to_vec(), b"c".to_vec()] {
2119                writer.write(chunk).await.unwrap();
2120            }
2121            // Dropping the writer here closes the stream so the reader sees the end.
2122        });
2123
2124        let (first, chunks) = out_rx.await.unwrap();
2125        assert_eq!(first, Some(b"hello".to_vec()));
2126        assert_eq!(chunks, vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]);
2127        p.join().await;
2128    }
2129
2130    #[tokio::test]
2131    async fn send_stream_to_a_dead_pid_returns_false() {
2132        use crate::stream::stream;
2133        let rt = Runtime::new();
2134        let (_writer, handle) = stream();
2135        assert!(!rt.send_stream(Pid::from_raw(123_456), handle));
2136    }
2137
2138    #[tokio::test]
2139    async fn a_stream_counts_toward_mailbox_depth_until_consumed() {
2140        use crate::stream::stream;
2141        let rt = Runtime::with_mailbox_depth();
2142        let (go_tx, go_rx) = tokio::sync::oneshot::channel::<()>();
2143        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
2144        let p = rt.spawn(move |mut ctx| async move {
2145            let _ = go_rx.await;
2146            let _ = ctx.recv().await; // consume the stream message
2147            let _ = done_tx.send(());
2148            std::future::pending::<()>().await;
2149        });
2150        let (_writer, handle) = stream();
2151        assert!(rt.send_stream(p.pid(), handle));
2152        assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 1);
2153        let _ = go_tx.send(());
2154        let _ = done_rx.await;
2155        assert_eq!(rt.info(p.pid()).unwrap().mailbox_depth, 0);
2156        p.kill();
2157    }
2158}