Skip to main content

asupersync/
gen_server.rs

1//! GenServer: typed request-response and actor-adjacent message loop.
2//!
3//! GenServer extends the actor model with three message types:
4//!
5//! - **Call**: synchronous request-response. The caller blocks until the server
6//!   replies. A reply obligation is created: the server *must* reply or the
7//!   obligation is detected as leaked.
8//! - **Cast**: asynchronous fire-and-forget. The sender does not wait for a reply.
9//! - **Info**: system/out-of-band notifications (Down/Exit/Timeout), delivered
10//!   via [`GenServer::handle_info`].
11//!
12//! GenServers are region-owned, cancel-safe, and deterministic under the lab
13//! runtime. They build on the same two-phase mailbox and supervision infrastructure
14//! as plain actors.
15//!
16//! # Example
17//!
18//! ```ignore
19//! struct Counter {
20//!     count: u64,
21//! }
22//!
23//! enum Request {
24//!     Get,
25//!     Add(u64),
26//! }
27//!
28//! enum Cast {
29//!     Reset,
30//! }
31//!
32//! impl GenServer for Counter {
33//!     type Call = Request;
34//!     type Reply = u64;
//!     type Cast = Cast;
//!     type Info = SystemMsg;
36//!
37//!     fn handle_call(&mut self, _cx: &Cx, msg: Request, reply: Reply<u64>)
38//!         -> Pin<Box<dyn Future<Output = ()> + Send + '_>>
39//!     {
40//!         match msg {
41//!             Request::Get => { let _ = reply.send(self.count); }
42//!             Request::Add(n) => { self.count += n; let _ = reply.send(self.count); }
43//!         }
44//!         Box::pin(async {})
45//!     }
46//!
47//!     fn handle_cast(&mut self, _cx: &Cx, msg: Cast)
48//!         -> Pin<Box<dyn Future<Output = ()> + Send + '_>>
49//!     {
50//!         match msg {
51//!             Cast::Reset => { self.count = 0; }
52//!         }
53//!         Box::pin(async {})
54//!     }
55//! }
56//! ```
57
58use std::future::Future;
59use std::pin::Pin;
60use std::sync::Arc;
61use std::sync::atomic::{AtomicU8, Ordering};
62
63use crate::actor::{ActorId, ActorState};
64use crate::channel::mpsc;
65use crate::channel::oneshot;
66use crate::channel::session::{self, TrackedOneshotPermit};
67use crate::cx::Cx;
68use crate::monitor::{DownNotification, DownReason};
69use crate::obligation::graded::{AbortedProof, CommittedProof, SendPermit};
70use crate::runtime::{JoinError, SpawnError};
71use crate::types::{Budget, CancelReason, CxInner, Outcome, TaskId, Time};
72
73// ============================================================================
74// Lifecycle helpers (init/terminate budgets + masking) (bd-3ejoi)
75// ============================================================================
76
77/// Temporarily tightens the current task budget for an async phase.
78///
79/// Budgets in `CxInner` represent remaining budget; to avoid "refunding" budget,
80/// we restore the original budget minus any consumption that occurred while the
81/// phase budget was active.
82struct PhaseBudgetGuard {
83    inner: Arc<parking_lot::RwLock<CxInner>>,
84    original_budget: Budget,
85    original_baseline: Budget,
86    phase_baseline: Budget,
87    restore_original: bool,
88}
89
90impl PhaseBudgetGuard {
91    fn enter(cx: &Cx, phase_budget: Budget, restore_original: bool) -> Self {
92        let inner = Arc::clone(&cx.inner);
93        let (original_budget, original_baseline, phase_baseline) = {
94            let mut guard = inner.write();
95            let original_budget = guard.budget;
96            let original_baseline = guard.budget_baseline;
97            let mut phase_baseline = original_budget.meet(phase_budget);
98            phase_baseline.priority = original_budget.priority.max(phase_budget.priority);
99            guard.budget = phase_baseline;
100            guard.budget_baseline = phase_baseline;
101            drop(guard);
102            (original_budget, original_baseline, phase_baseline)
103        };
104        Self {
105            inner,
106            original_budget,
107            original_baseline,
108            phase_baseline,
109            restore_original,
110        }
111    }
112}
113
114impl Drop for PhaseBudgetGuard {
115    fn drop(&mut self) {
116        if !self.restore_original {
117            return;
118        }
119
120        let mut guard = self.inner.write();
121
122        let phase_remaining = guard.budget;
123        let polls_used = self
124            .phase_baseline
125            .poll_quota
126            .saturating_sub(phase_remaining.poll_quota);
127
128        let cost_used = match (self.phase_baseline.cost_quota, phase_remaining.cost_quota) {
129            (Some(base), Some(rem)) => base.saturating_sub(rem),
130            _ => 0,
131        };
132
133        let restored_cost_quota = self
134            .original_budget
135            .cost_quota
136            .map(|orig| orig.saturating_sub(cost_used));
137
138        guard.budget = Budget {
139            deadline: self.original_budget.deadline,
140            poll_quota: self.original_budget.poll_quota.saturating_sub(polls_used),
141            cost_quota: restored_cost_quota,
142            priority: self.original_budget.priority,
143        };
144        guard.budget_baseline = self.original_baseline;
145    }
146}
147
148/// Async-friendly cancellation mask guard.
149///
150/// `Cx::masked(..)` is synchronous-only; GenServer lifecycle hooks are async.
151struct AsyncMaskGuard {
152    inner: Arc<parking_lot::RwLock<CxInner>>,
153}
154
155impl AsyncMaskGuard {
156    fn enter(cx: &Cx) -> Self {
157        let inner = Arc::clone(&cx.inner);
158        {
159            let mut guard = inner.write();
160            assert!(
161                guard.mask_depth < crate::types::task_context::MAX_MASK_DEPTH,
162                "mask depth exceeded MAX_MASK_DEPTH ({}) in AsyncMaskGuard::enter: \
163                 this violates INV-MASK-BOUNDED and prevents cancellation from ever \
164                 being observed. Reduce nesting of masked sections.",
165                crate::types::task_context::MAX_MASK_DEPTH
166            );
167            guard.mask_depth += 1;
168        }
169        Self { inner }
170    }
171}
172
173impl Drop for AsyncMaskGuard {
174    fn drop(&mut self) {
175        let mut guard = self.inner.write();
176        guard.mask_depth = guard.mask_depth.saturating_sub(1);
177    }
178}
179
180// ============================================================================
181// Cast overflow policy
182// ============================================================================
183
/// What a cast send does when the GenServer's bounded mailbox is at capacity.
///
/// Lossy policies (`DropOldest`) are trace-visible: every dropped message
/// emits a trace event.
///
/// # Default
///
/// `Reject` is the default. It reports `CastError::Full` to the sender, which
/// is the safest choice because callers must handle backpressure explicitly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CastOverflowPolicy {
    /// Refuse the new cast while the mailbox is full.
    ///
    /// The sender gets `CastError::Full` and decides what to do next (retry,
    /// drop, log, ...). Nothing is lost silently.
    #[default]
    Reject,

    /// Evict the oldest queued cast so the new one fits.
    ///
    /// The evicted message is traced for observability. This suits
    /// "latest-value-wins" traffic (sensor readings, UI state updates) where a
    /// stale cast is worth less than a fresh one. Calls and info messages are
    /// never evicted by cast backpressure.
    DropOldest,
}

impl std::fmt::Display for CastOverflowPolicy {
    /// Writes the variant name (`Reject` or `DropOldest`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Reject => "Reject",
            Self::DropOldest => "DropOldest",
        };
        f.write_str(label)
    }
}
221
222// ============================================================================
223// System messages (bd-188ey)
224// ============================================================================
225
226/// Typed system messages delivered to a GenServer via [`GenServer::handle_info`].
227///
228/// These messages are intended to model OTP-style "out-of-band" notifications
229/// (Down/Exit/Timeout) in a cancel-correct, deterministic way.
230#[derive(Debug, Clone)]
231pub struct DownMsg {
232    /// Virtual time at which the monitored task completed (for deterministic ordering).
233    pub completion_vt: Time,
234    /// The monitor notification payload.
235    pub notification: DownNotification,
236}
237
238impl DownMsg {
239    /// Create a new down system message payload.
240    #[must_use]
241    pub const fn new(completion_vt: Time, notification: DownNotification) -> Self {
242        Self {
243            completion_vt,
244            notification,
245        }
246    }
247}
248
249/// Payload for an OTP-style `Exit` system message.
250#[derive(Debug, Clone, PartialEq, Eq)]
251pub struct ExitMsg {
252    /// Virtual time at which the exit was observed/emitted.
253    pub exit_vt: Time,
254    /// The task that triggered the exit.
255    pub from: TaskId,
256    /// Why it exited.
257    pub reason: DownReason,
258}
259
260impl ExitMsg {
261    /// Create a new exit system message payload.
262    #[must_use]
263    pub const fn new(exit_vt: Time, from: TaskId, reason: DownReason) -> Self {
264        Self {
265            exit_vt,
266            from,
267            reason,
268        }
269    }
270}
271
272/// Payload for a deterministic timeout tick system message.
273#[derive(Debug, Clone, Copy, PartialEq, Eq)]
274pub struct TimeoutMsg {
275    /// Virtual time of the tick.
276    pub tick_vt: Time,
277    /// Timeout identifier (user-defined semantics).
278    pub id: u64,
279}
280
281impl TimeoutMsg {
282    /// Create a new timeout system message payload.
283    #[must_use]
284    pub const fn new(tick_vt: Time, id: u64) -> Self {
285        Self { tick_vt, id }
286    }
287}
288
289/// Typed system messages delivered to a GenServer via [`GenServer::handle_info`].
290#[derive(Debug, Clone)]
291pub enum SystemMsg {
292    /// OTP-style `Down` notification (monitor fired).
293    Down {
294        /// Virtual time at which the monitored task completed (for deterministic ordering).
295        completion_vt: Time,
296        /// The notification payload.
297        notification: DownNotification,
298    },
299
300    /// OTP-style exit signal (link propagation).
301    Exit {
302        /// Virtual time at which the exit was observed/emitted.
303        exit_vt: Time,
304        /// The task that triggered the exit.
305        from: TaskId,
306        /// Why it exited.
307        reason: DownReason,
308    },
309
310    /// A deterministic timeout tick.
311    Timeout {
312        /// Virtual time of the tick.
313        tick_vt: Time,
314        /// Timeout identifier (user-defined semantics).
315        id: u64,
316    },
317}
318
319impl SystemMsg {
320    /// Construct a down-system message.
321    #[must_use]
322    pub fn down(msg: DownMsg) -> Self {
323        msg.into()
324    }
325
326    /// Construct an exit-system message.
327    #[must_use]
328    pub fn exit(msg: ExitMsg) -> Self {
329        msg.into()
330    }
331
332    /// Construct a timeout-system message.
333    #[must_use]
334    pub fn timeout(msg: TimeoutMsg) -> Self {
335        msg.into()
336    }
337
338    const fn vt(&self) -> Time {
339        match self {
340            Self::Down { completion_vt, .. } => *completion_vt,
341            Self::Exit { exit_vt, .. } => *exit_vt,
342            Self::Timeout { tick_vt, .. } => *tick_vt,
343        }
344    }
345
346    const fn kind_rank(&self) -> u8 {
347        match self {
348            Self::Down { .. } => 0,
349            Self::Exit { .. } => 1,
350            Self::Timeout { .. } => 2,
351        }
352    }
353
354    const fn subject_key(&self) -> SystemMsgSubjectKey {
355        match self {
356            Self::Down { notification, .. } => SystemMsgSubjectKey::Task(notification.monitored),
357            Self::Exit { from, .. } => SystemMsgSubjectKey::Task(*from),
358            Self::Timeout { id, .. } => SystemMsgSubjectKey::TimeoutId(*id),
359        }
360    }
361
362    /// Deterministic ordering key for batched system-message delivery.
363    ///
364    /// Order is:
365    /// 1. virtual time (`vt`)
366    /// 2. message kind rank (`Down < Exit < Timeout`)
367    /// 3. stable subject key (`TaskId` or timeout id)
368    ///
369    /// This key underpins the app shutdown ordering contract in
370    /// `docs/spork_deterministic_ordering.md`.
371    #[must_use]
372    pub const fn sort_key(&self) -> (Time, u8, SystemMsgSubjectKey) {
373        (self.vt(), self.kind_rank(), self.subject_key())
374    }
375}
376
377impl From<DownMsg> for SystemMsg {
378    fn from(value: DownMsg) -> Self {
379        Self::Down {
380            completion_vt: value.completion_vt,
381            notification: value.notification,
382        }
383    }
384}
385
386impl From<ExitMsg> for SystemMsg {
387    fn from(value: ExitMsg) -> Self {
388        Self::Exit {
389            exit_vt: value.exit_vt,
390            from: value.from,
391            reason: value.reason,
392        }
393    }
394}
395
396impl From<TimeoutMsg> for SystemMsg {
397    fn from(value: TimeoutMsg) -> Self {
398        Self::Timeout {
399            tick_vt: value.tick_vt,
400            id: value.id,
401        }
402    }
403}
404
405impl TryFrom<SystemMsg> for DownMsg {
406    type Error = SystemMsg;
407
408    fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
409        match value {
410            SystemMsg::Down {
411                completion_vt,
412                notification,
413            } => Ok(Self {
414                completion_vt,
415                notification,
416            }),
417            other => Err(other),
418        }
419    }
420}
421
422impl TryFrom<SystemMsg> for ExitMsg {
423    type Error = SystemMsg;
424
425    fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
426        match value {
427            SystemMsg::Exit {
428                exit_vt,
429                from,
430                reason,
431            } => Ok(Self {
432                exit_vt,
433                from,
434                reason,
435            }),
436            other => Err(other),
437        }
438    }
439}
440
441impl TryFrom<SystemMsg> for TimeoutMsg {
442    type Error = SystemMsg;
443
444    fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
445        match value {
446            SystemMsg::Timeout { tick_vt, id } => Ok(Self { tick_vt, id }),
447            other => Err(other),
448        }
449    }
450}
451
452/// Stable subject key used by [`SystemMsg::sort_key`].
453#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
454pub enum SystemMsgSubjectKey {
455    /// Messages keyed by task identity (`Down`, `Exit`).
456    Task(TaskId),
457    /// Timeout tick keyed by timeout id.
458    TimeoutId(u64),
459}
460
461/// Batched system messages with deterministic sort for shutdown drain paths.
462///
463/// This is used when a runtime layer accumulates multiple `Down` / `Exit` /
464/// `Timeout` messages in one scheduler step and needs replay-stable delivery.
465#[derive(Debug, Default)]
466pub struct SystemMsgBatch {
467    entries: Vec<SystemMsg>,
468}
469
470impl SystemMsgBatch {
471    /// Creates an empty batch.
472    #[must_use]
473    pub fn new() -> Self {
474        Self::default()
475    }
476
477    /// Adds a message to the batch.
478    pub fn push(&mut self, msg: SystemMsg) {
479        self.entries.push(msg);
480    }
481
482    /// Consumes the batch and returns deterministically ordered messages.
483    #[must_use]
484    pub fn into_sorted(mut self) -> Vec<SystemMsg> {
485        self.entries.sort_by_key(SystemMsg::sort_key);
486        self.entries
487    }
488}
489
490/// A GenServer processes calls (request-response) and casts (fire-and-forget).
491///
492/// # Cancel Safety
493///
494/// When a GenServer is cancelled:
495/// 1. The mailbox closes (no new messages accepted)
496/// 2. Buffered messages are drained (calls receive errors, casts are processed)
497/// 3. `on_stop` runs for cleanup
498/// 4. The server state is returned via `GenServerHandle::join`
499pub trait GenServer: Send + 'static {
500    /// Request type for calls (synchronous request-response).
501    type Call: Send + 'static;
502
503    /// Reply type returned to callers.
504    type Reply: Send + 'static;
505
506    /// Message type for casts (asynchronous fire-and-forget).
507    type Cast: Send + 'static;
508
509    /// Message type for `info` (system/out-of-band notifications).
510    ///
511    /// Recommended default is [`SystemMsg`]. Servers that want their own info messages
512    /// can define an enum that contains `SystemMsg` plus app-specific variants.
513    ///
514    /// Note: associated type defaults are unstable on Rust stable; implementors
515    /// should write `type Info = SystemMsg;` if they only need system messages.
516    type Info: Send + 'static;
517
518    /// Handle a call (request-response).
519    ///
520    /// The `reply` handle **must** be consumed by calling `reply.send(value)`.
521    /// Dropping it without sending is detected as an obligation leak in lab mode.
522    fn handle_call(
523        &mut self,
524        cx: &Cx,
525        request: Self::Call,
526        reply: Reply<Self::Reply>,
527    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
528
529    /// Handle a cast (fire-and-forget).
530    ///
531    /// No reply is expected. The default implementation does nothing.
532    fn handle_cast(
533        &mut self,
534        _cx: &Cx,
535        _msg: Self::Cast,
536    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
537        Box::pin(async {})
538    }
539
540    /// Handle an info message (system/out-of-band).
541    ///
542    /// Defaults to a no-op.
543    fn handle_info(
544        &mut self,
545        _cx: &Cx,
546        _msg: Self::Info,
547    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
548        Box::pin(async {})
549    }
550
551    /// Called once when the server starts, before processing any messages.
552    fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
553        Box::pin(async {})
554    }
555
556    /// Returns the budget used for the init (`on_start`) phase.
557    ///
558    /// This budget is met with the task/region budget and applied only for the
559    /// duration of `on_start`. Budget consumption during `on_start` is preserved
560    /// when restoring the original budget for the message loop.
561    fn on_start_budget(&self) -> Budget {
562        Budget::INFINITE
563    }
564
565    /// Called once when the server stops, after the mailbox is drained.
566    fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
567        Box::pin(async {})
568    }
569
570    /// Returns the budget used for drain + stop (`on_stop`).
571    ///
572    /// The default is [`Budget::MINIMAL`] for bounded cleanup.
573    fn on_stop_budget(&self) -> Budget {
574        Budget::MINIMAL
575    }
576
577    /// Returns the overflow policy for cast messages when the mailbox is full.
578    ///
579    /// The default is [`CastOverflowPolicy::Reject`], which returns
580    /// `CastError::Full` to the sender.
581    ///
582    /// Override this to use `DropOldest` for "latest-value-wins" patterns.
583    fn cast_overflow_policy(&self) -> CastOverflowPolicy {
584        CastOverflowPolicy::Reject
585    }
586}
587
588/// Handle for sending a reply to a call.
589///
590/// This is a **linear obligation token**: it **must** be consumed by calling
591/// [`send()`](Self::send) or [`abort()`](Self::abort). Dropping without
592/// consuming triggers a panic via [`ObligationToken<SendPermit>`].
593///
594/// Backed by [`TrackedOneshotPermit`](session::TrackedOneshotPermit) from
595/// `channel::session`,
596/// making "silent reply drop" structurally impossible.
597pub struct Reply<R> {
598    cx: Cx,
599    permit: Option<TrackedOneshotPermit<R>>,
600}
601
602impl<R: Send + 'static> Reply<R> {
603    fn new(cx: &Cx, permit: TrackedOneshotPermit<R>) -> Self {
604        Self {
605            cx: cx.clone(),
606            permit: Some(permit),
607        }
608    }
609
610    /// Send the reply value to the caller, returning a [`CommittedProof`].
611    ///
612    /// Consumes the reply handle. If the caller has dropped (e.g., timed out),
613    /// the obligation is aborted cleanly (no panic).
614    pub fn send(mut self, value: R) -> ReplyOutcome {
615        let permit = self
616            .permit
617            .take()
618            .expect("Reply::send called after reply was already consumed");
619        match permit.send(value) {
620            Ok(proof) => {
621                self.cx.trace("gen_server::reply_committed");
622                ReplyOutcome::Committed(proof)
623            }
624            Err(_send_err) => {
625                // Receiver (caller) dropped — e.g., timed out. The tracked
626                // permit aborts the obligation cleanly in this case.
627                self.cx.trace("gen_server::reply_caller_gone");
628                ReplyOutcome::CallerGone
629            }
630        }
631    }
632
633    /// Explicitly abort the reply obligation without sending a value.
634    ///
635    /// Use this when the server intentionally chooses not to reply (e.g.,
636    /// delegating to another process). Returns an [`AbortedProof`].
637    #[must_use]
638    pub fn abort(mut self) -> AbortedProof<SendPermit> {
639        self.cx.trace("gen_server::reply_aborted");
640        self.permit
641            .take()
642            .expect("Reply::abort called after reply was already consumed")
643            .abort()
644    }
645
646    /// Check if the caller is still waiting for a reply.
647    #[inline]
648    #[must_use]
649    pub fn is_closed(&self) -> bool {
650        self.permit
651            .as_ref()
652            .is_some_and(TrackedOneshotPermit::is_closed)
653    }
654}
655
656impl<R> Drop for Reply<R> {
657    fn drop(&mut self) {
658        let Some(permit) = self.permit.take() else {
659            return;
660        };
661
662        if std::thread::panicking() {
663            // Preserve the original panic instead of detonating the reply
664            // drop-bomb during unwind.
665            let _ = permit.abort();
666        } else {
667            drop(permit);
668        }
669    }
670}
671
672impl<R> std::fmt::Debug for Reply<R> {
673    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
674        f.debug_struct("Reply")
675            .field("pending", &self.permit.is_some())
676            .finish_non_exhaustive()
677    }
678}
679
680/// Outcome of sending a reply.
681pub enum ReplyOutcome {
682    /// Reply was successfully delivered, obligation committed.
683    Committed(CommittedProof<SendPermit>),
684    /// Caller has already gone (e.g., timed out). Obligation was aborted.
685    CallerGone,
686}
687
688impl std::fmt::Debug for ReplyOutcome {
689    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690        match self {
691            Self::Committed(_) => f.debug_tuple("Committed").finish(),
692            Self::CallerGone => write!(f, "CallerGone"),
693        }
694    }
695}
696
697// ============================================================================
698// Internal message envelope
699// ============================================================================
700
701/// Internal message type wrapping calls/casts/info.
702enum Envelope<S: GenServer> {
703    Call {
704        request: S::Call,
705        reply_permit: TrackedOneshotPermit<S::Reply>,
706    },
707    Cast {
708        msg: S::Cast,
709    },
710    Info {
711        msg: S::Info,
712    },
713}
714
715impl<S: GenServer> std::fmt::Debug for Envelope<S> {
716    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
717        match self {
718            Self::Call { .. } => f.debug_struct("Envelope::Call").finish_non_exhaustive(),
719            Self::Cast { .. } => f.debug_struct("Envelope::Cast").finish_non_exhaustive(),
720            Self::Info { .. } => f.debug_struct("Envelope::Info").finish_non_exhaustive(),
721        }
722    }
723}
724
725// ============================================================================
726// GenServer cell (internal runtime state)
727// ============================================================================
728
729struct GenServerCell<S: GenServer> {
730    mailbox: mpsc::Receiver<Envelope<S>>,
731    state: Arc<GenServerStateCell>,
732    _keep_alive: mpsc::Sender<Envelope<S>>,
733}
734
/// Atomic cell holding an `ActorState` encoded as a `u8`.
#[derive(Debug)]
struct GenServerStateCell {
    /// Encoded state; see `encode_actor_state` / `decode_actor_state`.
    state: AtomicU8,
}
739
740impl GenServerStateCell {
741    fn new(state: ActorState) -> Self {
742        Self {
743            state: AtomicU8::new(encode_actor_state(state)),
744        }
745    }
746
747    fn load(&self) -> ActorState {
748        decode_actor_state(self.state.load(Ordering::Acquire))
749    }
750
751    fn store(&self, state: ActorState) {
752        self.state
753            .store(encode_actor_state(state), Ordering::Release);
754    }
755}
756
757const fn encode_actor_state(state: ActorState) -> u8 {
758    match state {
759        ActorState::Created => 0,
760        ActorState::Running => 1,
761        ActorState::Stopping => 2,
762        ActorState::Stopped => 3,
763    }
764}
765
766const fn decode_actor_state(value: u8) -> ActorState {
767    match value {
768        0 => ActorState::Created,
769        1 => ActorState::Running,
770        2 => ActorState::Stopping,
771        _ => ActorState::Stopped,
772    }
773}
774
775// ============================================================================
776// GenServerHandle: external handle for calls and casts
777// ============================================================================
778
779/// Handle to a running GenServer.
780///
781/// Provides typed `call()` and `cast()` methods. The handle owns a sender to
782/// the server's mailbox and a oneshot receiver for join.
783#[derive(Debug)]
784pub struct GenServerHandle<S: GenServer> {
785    actor_id: ActorId,
786    sender: mpsc::Sender<Envelope<S>>,
787    state: Arc<GenServerStateCell>,
788    task_id: TaskId,
789    receiver: oneshot::Receiver<Result<S, JoinError>>,
790    inner: std::sync::Weak<parking_lot::RwLock<CxInner>>,
791    completed: bool,
792    overflow_policy: CastOverflowPolicy,
793}
794
795/// Error returned when a call fails.
796#[derive(Debug)]
797pub enum CallError {
798    /// The server has stopped (mailbox disconnected).
799    ServerStopped,
800    /// The server did not reply (oneshot dropped).
801    NoReply,
802    /// The call was cancelled.
803    Cancelled(CancelReason),
804}
805
806impl std::fmt::Display for CallError {
807    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
808        match self {
809            Self::ServerStopped => write!(f, "GenServer has stopped"),
810            Self::NoReply => write!(f, "GenServer did not reply"),
811            Self::Cancelled(reason) => write!(f, "GenServer call cancelled: {reason}"),
812        }
813    }
814}
815
816impl std::error::Error for CallError {}
817
818/// Error returned when a cast fails.
819#[derive(Debug)]
820pub enum CastError {
821    /// The server has stopped (mailbox disconnected).
822    ServerStopped,
823    /// The mailbox is full.
824    Full,
825    /// The cast was cancelled.
826    Cancelled(CancelReason),
827}
828
829impl std::fmt::Display for CastError {
830    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
831        match self {
832            Self::ServerStopped => write!(f, "GenServer has stopped"),
833            Self::Full => write!(f, "GenServer mailbox full"),
834            Self::Cancelled(reason) => write!(f, "GenServer cast cancelled: {reason}"),
835        }
836    }
837}
838
839impl std::error::Error for CastError {}
840
841/// Error returned when sending an info message fails.
842#[derive(Debug)]
843pub enum InfoError {
844    /// The server has stopped (mailbox disconnected).
845    ServerStopped,
846    /// The mailbox is full.
847    Full,
848    /// The send was cancelled.
849    Cancelled(CancelReason),
850}
851
852impl std::fmt::Display for InfoError {
853    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
854        match self {
855            Self::ServerStopped => write!(f, "GenServer has stopped"),
856            Self::Full => write!(f, "GenServer mailbox full"),
857            Self::Cancelled(reason) => write!(f, "GenServer info cancelled: {reason}"),
858        }
859    }
860}
861
862impl std::error::Error for InfoError {}
863
864impl<S: GenServer> GenServerHandle<S> {
865    /// Send a call (request-response) to the server.
866    ///
867    /// Blocks until the server replies or the server stops. The reply channel
868    /// uses obligation-tracked oneshot from `channel::session`, ensuring that
869    /// if the server drops the reply without sending, the obligation token
870    /// panics rather than silently losing the reply.
871    pub async fn call(&self, cx: &Cx, request: S::Call) -> Result<S::Reply, CallError> {
872        if cx.checkpoint().is_err() {
873            cx.trace("gen_server::call_rejected_cancelled");
874            let reason = cx
875                .cancel_reason()
876                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
877            return Err(CallError::Cancelled(reason));
878        }
879
880        if matches!(
881            self.state.load(),
882            ActorState::Stopping | ActorState::Stopped
883        ) {
884            cx.trace("gen_server::call_rejected_stopped");
885            return Err(CallError::ServerStopped);
886        }
887
888        let send_permit = match self.sender.reserve(cx).await {
889            Ok(p) => p,
890            Err(e) => {
891                let was_cancelled = matches!(e, mpsc::SendError::Cancelled(()));
892                if was_cancelled {
893                    cx.trace("gen_server::call_send_cancelled");
894                    let reason = cx
895                        .cancel_reason()
896                        .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
897                    return Err(CallError::Cancelled(reason));
898                }
899                cx.trace("gen_server::call_send_failed");
900                return Err(CallError::ServerStopped);
901            }
902        };
903
904        let (reply_tx, mut reply_rx) = session::tracked_oneshot::<S::Reply>();
905        let reply_permit: session::TrackedOneshotPermit<S::Reply> = reply_tx.reserve(cx);
906        let envelope: Envelope<S> = Envelope::Call {
907            request,
908            reply_permit,
909        };
910
911        // Use try_send so that if the receiver was dropped between reserve()
912        // and now, we can extract the envelope and explicitly abort the
913        // reply_permit obligation.  Calling SendPermit::send would silently
914        // discard the value on disconnection, dropping the still-armed
915        // obligation token and panicking.
916        if let Err(e) = send_permit.try_send(envelope) {
917            let envelope = match e {
918                mpsc::SendError::Disconnected(v)
919                | mpsc::SendError::Full(v)
920                | mpsc::SendError::Cancelled(v) => v,
921            };
922            if let Envelope::Call { reply_permit, .. } = envelope {
923                let _aborted = session::TrackedOneshotPermit::abort(reply_permit);
924            }
925            cx.trace("gen_server::call_send_failed");
926            return Err(CallError::ServerStopped);
927        }
928
929        cx.trace("gen_server::call_enqueued");
930
931        match reply_rx.recv(cx).await {
932            Ok(v) => Ok(v),
933            Err(oneshot::RecvError::Closed) => {
934                cx.trace("gen_server::call_no_reply");
935                Err(CallError::NoReply)
936            }
937            Err(oneshot::RecvError::Cancelled) => {
938                cx.trace("gen_server::call_reply_cancelled");
939                let reason = cx
940                    .cancel_reason()
941                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
942                Err(CallError::Cancelled(reason))
943            }
944            Err(oneshot::RecvError::PolledAfterCompletion) => {
945                unreachable!("GenServer call awaits a fresh reply oneshot recv future")
946            }
947        }
948    }
949
950    /// Send a cast (fire-and-forget) to the server.
951    pub async fn cast(&self, cx: &Cx, msg: S::Cast) -> Result<(), CastError> {
952        if cx.checkpoint().is_err() {
953            cx.trace("gen_server::cast_rejected_cancelled");
954            let reason = cx
955                .cancel_reason()
956                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
957            return Err(CastError::Cancelled(reason));
958        }
959
960        if matches!(
961            self.state.load(),
962            ActorState::Stopping | ActorState::Stopped
963        ) {
964            cx.trace("gen_server::cast_rejected_stopped");
965            return Err(CastError::ServerStopped);
966        }
967        let envelope: Envelope<S> = Envelope::Cast { msg };
968        self.sender.send(cx, envelope).await.map_err(|e| match e {
969            mpsc::SendError::Cancelled(_) => {
970                cx.trace("gen_server::cast_send_cancelled");
971                let reason = cx
972                    .cancel_reason()
973                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
974                CastError::Cancelled(reason)
975            }
976            mpsc::SendError::Disconnected(_) | mpsc::SendError::Full(_) => {
977                cx.trace("gen_server::cast_send_failed");
978                CastError::ServerStopped
979            }
980        })
981    }
982
983    /// Try to send a cast without blocking.
984    ///
985    /// Applies the server's [`CastOverflowPolicy`] when the mailbox is full:
986    /// - `Reject`: returns `CastError::Full`
987    /// - `DropOldest`: evicts the oldest queued cast and enqueues the new one
988    pub fn try_cast(&self, msg: S::Cast) -> Result<(), CastError> {
989        if matches!(
990            self.state.load(),
991            ActorState::Stopping | ActorState::Stopped
992        ) {
993            return Err(CastError::ServerStopped);
994        }
995        let envelope: Envelope<S> = Envelope::Cast { msg };
996        match self.overflow_policy {
997            CastOverflowPolicy::Reject => self.sender.try_send(envelope).map_err(|e| match e {
998                mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
999                    CastError::ServerStopped
1000                }
1001                mpsc::SendError::Full(_) => CastError::Full,
1002            }),
1003            CastOverflowPolicy::DropOldest => {
1004                match self.sender.send_evict_oldest_where(envelope, |queued| {
1005                    matches!(queued, Envelope::Cast { .. })
1006                }) {
1007                    Ok(Some(_evicted)) => {
1008                        // Trace the eviction so lossy drops are observable.
1009                        if let Some(cx) = Cx::current() {
1010                            cx.trace("gen_server::cast_evicted_oldest");
1011                        }
1012                        Ok(())
1013                    }
1014                    Ok(None) => Ok(()),
1015                    Err(mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_)) => {
1016                        Err(CastError::ServerStopped)
1017                    }
1018                    Err(mpsc::SendError::Full(_)) => Err(CastError::Full),
1019                }
1020            }
1021        }
1022    }
1023
1024    /// Send an info message (system/out-of-band) to the server.
1025    pub async fn info(&self, cx: &Cx, msg: S::Info) -> Result<(), InfoError> {
1026        if cx.checkpoint().is_err() {
1027            let reason = cx
1028                .cancel_reason()
1029                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1030            return Err(InfoError::Cancelled(reason));
1031        }
1032
1033        if matches!(
1034            self.state.load(),
1035            ActorState::Stopping | ActorState::Stopped
1036        ) {
1037            cx.trace("gen_server::info_rejected_stopped");
1038            return Err(InfoError::ServerStopped);
1039        }
1040
1041        let envelope: Envelope<S> = Envelope::Info { msg };
1042        self.sender.send(cx, envelope).await.map_err(|e| match e {
1043            mpsc::SendError::Cancelled(_) => {
1044                let reason = cx
1045                    .cancel_reason()
1046                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1047                InfoError::Cancelled(reason)
1048            }
1049            mpsc::SendError::Disconnected(_) => InfoError::ServerStopped,
1050            mpsc::SendError::Full(_) => InfoError::Full,
1051        })
1052    }
1053
1054    /// Try to send an info message without blocking.
1055    pub fn try_info(&self, msg: S::Info) -> Result<(), InfoError> {
1056        if matches!(
1057            self.state.load(),
1058            ActorState::Stopping | ActorState::Stopped
1059        ) {
1060            return Err(InfoError::ServerStopped);
1061        }
1062
1063        let envelope: Envelope<S> = Envelope::Info { msg };
1064        self.sender.try_send(envelope).map_err(|e| match e {
1065            mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
1066                InfoError::ServerStopped
1067            }
1068            mpsc::SendError::Full(_) => InfoError::Full,
1069        })
1070    }
1071
1072    /// Returns the server's overflow policy for cast messages.
1073    #[inline]
1074    #[must_use]
1075    pub fn cast_overflow_policy(&self) -> CastOverflowPolicy {
1076        self.overflow_policy
1077    }
1078
1079    /// Returns the server's actor ID.
1080    #[inline]
1081    #[must_use]
1082    pub const fn actor_id(&self) -> ActorId {
1083        self.actor_id
1084    }
1085
1086    /// Returns the server's task ID.
1087    #[inline]
1088    #[must_use]
1089    pub fn task_id(&self) -> TaskId {
1090        self.task_id
1091    }
1092
1093    /// Returns true if the server has finished.
1094    #[inline]
1095    #[must_use]
1096    pub fn is_finished(&self) -> bool {
1097        self.completed || self.receiver.is_ready() || self.receiver.is_closed()
1098    }
1099
1100    /// Signals the server to stop gracefully.
1101    ///
1102    /// Closes the mailbox and waits for the server to process remaining messages.
1103    pub fn stop(&self) {
1104        self.state.store(ActorState::Stopping);
1105        // Ensure a server blocked in `mailbox.recv()` is woken so it can observe
1106        // the state change and run drain/on_stop deterministically.
1107        self.sender.wake_receiver();
1108    }
1109
1110    /// Request the server to stop immediately by aborting its task.
1111    ///
1112    /// Sets `cancel_requested` on the server's context, causing the loop
1113    /// to exit at the next cancellation check point.
1114    pub fn abort(&self) {
1115        self.state.store(ActorState::Stopping);
1116        if let Some(inner) = self.inner.upgrade() {
1117            let cancel_waker = {
1118                let mut guard = inner.write();
1119                guard.cancel_requested = true;
1120                guard
1121                    .fast_cancel
1122                    .store(true, std::sync::atomic::Ordering::Release);
1123                if guard.cancel_reason.is_none() {
1124                    guard.cancel_reason = Some(crate::types::CancelReason::user("server aborted"));
1125                }
1126                guard.cancel_waker.clone()
1127            };
1128            if let Some(waker) = cancel_waker {
1129                waker.wake_by_ref();
1130            }
1131        }
1132        self.sender.wake_receiver();
1133    }
1134
1135    /// Wait for the server to finish and return its final state.
1136    pub fn join<'a>(&'a mut self, _cx: &'a Cx) -> GenServerJoinFuture<'a, S> {
1137        let cx_inner = self.inner.clone();
1138        let receiver = &mut self.receiver;
1139        let terminal_state = &mut self.completed;
1140        GenServerJoinFuture {
1141            inner: receiver.recv_uninterruptible(),
1142            cx_inner,
1143            sender: self.sender.clone(),
1144            state: Arc::clone(&self.state),
1145            terminal_state,
1146            drop_abort_defused: false,
1147        }
1148    }
1149}
1150
/// Future returned by [`GenServerHandle::join`].
///
/// This future aborts the server if dropped before completion, ensuring correct
/// cleanup in races and timeouts.
pub struct GenServerJoinFuture<'a, S: GenServer> {
    /// Receive future on the result channel; yields the server's join result.
    inner: oneshot::RecvUninterruptibleFuture<'a, Result<S, JoinError>>,
    /// Weak reference to the server context's inner state, used to read the
    /// cancel reason and to request cancellation on abort.
    cx_inner: std::sync::Weak<parking_lot::RwLock<CxInner>>,
    /// Mailbox sender, used to wake the server's receiver on abort.
    sender: mpsc::Sender<Envelope<S>>,
    /// Shared lifecycle state cell of the server.
    state: Arc<GenServerStateCell>,
    /// Borrowed `completed` flag of the handle; set once this future resolves.
    terminal_state: &'a mut bool,
    /// Set when the future resolves so that `Drop` does not abort the server.
    drop_abort_defused: bool,
}
1163
1164impl<S: GenServer> GenServerJoinFuture<'_, S> {
1165    fn closed_reason(&self) -> crate::types::CancelReason {
1166        self.cx_inner
1167            .upgrade()
1168            .and_then(|inner| inner.read().cancel_reason.clone())
1169            .unwrap_or_else(|| crate::types::CancelReason::user("join channel closed"))
1170    }
1171
1172    fn abort(&self) {
1173        self.state.store(ActorState::Stopping);
1174        if let Some(inner) = self.cx_inner.upgrade() {
1175            let cancel_waker = {
1176                let mut guard = inner.write();
1177                guard.cancel_requested = true;
1178                guard
1179                    .fast_cancel
1180                    .store(true, std::sync::atomic::Ordering::Release);
1181                if guard.cancel_reason.is_none() {
1182                    guard.cancel_reason = Some(crate::types::CancelReason::user("server aborted"));
1183                }
1184                guard.cancel_waker.clone()
1185            };
1186            if let Some(waker) = cancel_waker {
1187                waker.wake_by_ref();
1188            }
1189        }
1190        self.sender.wake_receiver();
1191    }
1192}
1193
1194impl<S: GenServer> std::future::Future for GenServerJoinFuture<'_, S> {
1195    type Output = Result<S, JoinError>;
1196
1197    fn poll(
1198        mut self: std::pin::Pin<&mut Self>,
1199        cx: &mut std::task::Context<'_>,
1200    ) -> std::task::Poll<Self::Output> {
1201        let this = &mut *self;
1202        if *this.terminal_state {
1203            return std::task::Poll::Ready(Err(JoinError::PolledAfterCompletion));
1204        }
1205
1206        match std::pin::Pin::new(&mut this.inner).poll(cx) {
1207            std::task::Poll::Ready(Ok(res)) => {
1208                *this.terminal_state = true;
1209                this.drop_abort_defused = true;
1210                std::task::Poll::Ready(res)
1211            }
1212            std::task::Poll::Ready(Err(oneshot::RecvError::Closed)) => {
1213                *this.terminal_state = true;
1214                this.drop_abort_defused = true;
1215                let reason = this.closed_reason();
1216                std::task::Poll::Ready(Err(JoinError::Cancelled(reason)))
1217            }
1218            std::task::Poll::Ready(Err(oneshot::RecvError::Cancelled)) => {
1219                unreachable!("RecvUninterruptibleFuture cannot return Cancelled");
1220            }
1221            std::task::Poll::Ready(Err(oneshot::RecvError::PolledAfterCompletion)) => {
1222                unreachable!(
1223                    "JoinFuture guards repolls before polling the inner oneshot recv future"
1224                )
1225            }
1226            std::task::Poll::Pending => std::task::Poll::Pending,
1227        }
1228    }
1229}
1230
1231impl<S: GenServer> Drop for GenServerJoinFuture<'_, S> {
1232    fn drop(&mut self) {
1233        if !*self.terminal_state && !self.drop_abort_defused {
1234            if self.inner.receiver_finished() {
1235                return;
1236            }
1237            self.abort();
1238        }
1239    }
1240}
1241
/// A lightweight, clonable reference for sending messages to a GenServer.
///
/// Supports `call()`, `cast()`/`try_cast()` and `info()`/`try_info()`; it does
/// not support `join()` (use [`GenServerHandle`] for waiting on the final
/// server state).
#[derive(Debug)]
pub struct GenServerRef<S: GenServer> {
    /// Identifier of the server this reference points to.
    actor_id: ActorId,
    /// Sending half of the server's mailbox.
    sender: mpsc::Sender<Envelope<S>>,
    /// Lifecycle state cell shared with the handle this reference came from.
    state: Arc<GenServerStateCell>,
    /// Policy applied by `try_cast` when the mailbox is full.
    overflow_policy: CastOverflowPolicy,
}
1253
1254impl<S: GenServer> Clone for GenServerRef<S> {
1255    fn clone(&self) -> Self {
1256        Self {
1257            actor_id: self.actor_id,
1258            sender: self.sender.clone(),
1259            state: Arc::clone(&self.state),
1260            overflow_policy: self.overflow_policy,
1261        }
1262    }
1263}
1264
1265impl<S: GenServer> GenServerRef<S> {
1266    /// Returns the configured cast overflow policy for this server.
1267    #[must_use]
1268    pub const fn cast_overflow_policy(&self) -> CastOverflowPolicy {
1269        self.overflow_policy
1270    }
1271
1272    /// Send a call to the server.
1273    pub async fn call(&self, cx: &Cx, request: S::Call) -> Result<S::Reply, CallError> {
1274        if cx.checkpoint().is_err() {
1275            cx.trace("gen_server::call_rejected_cancelled");
1276            let reason = cx
1277                .cancel_reason()
1278                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1279            return Err(CallError::Cancelled(reason));
1280        }
1281
1282        if matches!(
1283            self.state.load(),
1284            ActorState::Stopping | ActorState::Stopped
1285        ) {
1286            cx.trace("gen_server::call_rejected_stopped");
1287            return Err(CallError::ServerStopped);
1288        }
1289
1290        let send_permit = match self.sender.reserve(cx).await {
1291            Ok(p) => p,
1292            Err(e) => {
1293                let was_cancelled = matches!(e, mpsc::SendError::Cancelled(()));
1294                if was_cancelled {
1295                    cx.trace("gen_server::call_send_cancelled");
1296                    let reason = cx
1297                        .cancel_reason()
1298                        .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1299                    return Err(CallError::Cancelled(reason));
1300                }
1301                cx.trace("gen_server::call_send_failed");
1302                return Err(CallError::ServerStopped);
1303            }
1304        };
1305
1306        let (reply_tx, mut reply_rx) = session::tracked_oneshot::<S::Reply>();
1307        let reply_permit: session::TrackedOneshotPermit<S::Reply> = reply_tx.reserve(cx);
1308        let envelope: Envelope<S> = Envelope::Call {
1309            request,
1310            reply_permit,
1311        };
1312
1313        // Use try_send so that if the receiver was dropped between reserve()
1314        // and now, we can extract the envelope and explicitly abort the
1315        // reply_permit obligation.  Calling SendPermit::send would silently
1316        // discard the value on disconnection, dropping the still-armed
1317        // obligation token and panicking.
1318        if let Err(e) = send_permit.try_send(envelope) {
1319            let envelope = match e {
1320                mpsc::SendError::Disconnected(v)
1321                | mpsc::SendError::Full(v)
1322                | mpsc::SendError::Cancelled(v) => v,
1323            };
1324            if let Envelope::Call { reply_permit, .. } = envelope {
1325                let _aborted = session::TrackedOneshotPermit::abort(reply_permit);
1326            }
1327            cx.trace("gen_server::call_send_failed");
1328            return Err(CallError::ServerStopped);
1329        }
1330
1331        cx.trace("gen_server::call_enqueued");
1332
1333        match reply_rx.recv(cx).await {
1334            Ok(v) => Ok(v),
1335            Err(oneshot::RecvError::Closed) => {
1336                cx.trace("gen_server::call_no_reply");
1337                Err(CallError::NoReply)
1338            }
1339            Err(oneshot::RecvError::Cancelled) => {
1340                cx.trace("gen_server::call_reply_cancelled");
1341                let reason = cx
1342                    .cancel_reason()
1343                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1344                Err(CallError::Cancelled(reason))
1345            }
1346            Err(oneshot::RecvError::PolledAfterCompletion) => {
1347                unreachable!("GenServerRef::call awaits a fresh reply oneshot recv future")
1348            }
1349        }
1350    }
1351
1352    /// Send a cast to the server.
1353    pub async fn cast(&self, cx: &Cx, msg: S::Cast) -> Result<(), CastError> {
1354        if cx.checkpoint().is_err() {
1355            cx.trace("gen_server::cast_rejected_cancelled");
1356            let reason = cx
1357                .cancel_reason()
1358                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1359            return Err(CastError::Cancelled(reason));
1360        }
1361
1362        if matches!(
1363            self.state.load(),
1364            ActorState::Stopping | ActorState::Stopped
1365        ) {
1366            cx.trace("gen_server::cast_rejected_stopped");
1367            return Err(CastError::ServerStopped);
1368        }
1369        let envelope: Envelope<S> = Envelope::Cast { msg };
1370        self.sender.send(cx, envelope).await.map_err(|e| match e {
1371            mpsc::SendError::Cancelled(_) => {
1372                cx.trace("gen_server::cast_send_cancelled");
1373                let reason = cx
1374                    .cancel_reason()
1375                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1376                CastError::Cancelled(reason)
1377            }
1378            mpsc::SendError::Disconnected(_) | mpsc::SendError::Full(_) => {
1379                cx.trace("gen_server::cast_send_failed");
1380                CastError::ServerStopped
1381            }
1382        })
1383    }
1384
1385    /// Try to send a cast without blocking.
1386    ///
1387    /// Applies the server's [`CastOverflowPolicy`] when the mailbox is full.
1388    pub fn try_cast(&self, msg: S::Cast) -> Result<(), CastError> {
1389        if matches!(
1390            self.state.load(),
1391            ActorState::Stopping | ActorState::Stopped
1392        ) {
1393            return Err(CastError::ServerStopped);
1394        }
1395        let envelope: Envelope<S> = Envelope::Cast { msg };
1396        match self.overflow_policy {
1397            CastOverflowPolicy::Reject => self.sender.try_send(envelope).map_err(|e| match e {
1398                mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
1399                    CastError::ServerStopped
1400                }
1401                mpsc::SendError::Full(_) => CastError::Full,
1402            }),
1403            CastOverflowPolicy::DropOldest => match self
1404                .sender
1405                .send_evict_oldest_where(envelope, |queued| matches!(queued, Envelope::Cast { .. }))
1406            {
1407                Ok(Some(evicted)) => {
1408                    debug_assert!(matches!(evicted, Envelope::Cast { .. }));
1409                    if let Some(cx) = Cx::current() {
1410                        cx.trace("gen_server::cast_evicted_oldest");
1411                    }
1412                    Ok(())
1413                }
1414                Ok(None) => Ok(()),
1415                Err(mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_)) => {
1416                    Err(CastError::ServerStopped)
1417                }
1418                Err(mpsc::SendError::Full(_)) => Err(CastError::Full),
1419            },
1420        }
1421    }
1422
1423    /// Send an info message (system/out-of-band) to the server.
1424    pub async fn info(&self, cx: &Cx, msg: S::Info) -> Result<(), InfoError> {
1425        if cx.checkpoint().is_err() {
1426            let reason = cx
1427                .cancel_reason()
1428                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1429            return Err(InfoError::Cancelled(reason));
1430        }
1431
1432        if matches!(
1433            self.state.load(),
1434            ActorState::Stopping | ActorState::Stopped
1435        ) {
1436            cx.trace("gen_server::info_rejected_stopped");
1437            return Err(InfoError::ServerStopped);
1438        }
1439
1440        let envelope: Envelope<S> = Envelope::Info { msg };
1441        self.sender.send(cx, envelope).await.map_err(|e| match e {
1442            mpsc::SendError::Cancelled(_) => {
1443                let reason = cx
1444                    .cancel_reason()
1445                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1446                InfoError::Cancelled(reason)
1447            }
1448            mpsc::SendError::Disconnected(_) => InfoError::ServerStopped,
1449            mpsc::SendError::Full(_) => InfoError::Full,
1450        })
1451    }
1452
1453    /// Try to send an info message without blocking.
1454    pub fn try_info(&self, msg: S::Info) -> Result<(), InfoError> {
1455        if matches!(
1456            self.state.load(),
1457            ActorState::Stopping | ActorState::Stopped
1458        ) {
1459            return Err(InfoError::ServerStopped);
1460        }
1461
1462        let envelope: Envelope<S> = Envelope::Info { msg };
1463        self.sender.try_send(envelope).map_err(|e| match e {
1464            mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
1465                InfoError::ServerStopped
1466            }
1467            mpsc::SendError::Full(_) => InfoError::Full,
1468        })
1469    }
1470
1471    /// Returns true if the server has stopped.
1472    #[inline]
1473    #[must_use]
1474    pub fn is_closed(&self) -> bool {
1475        self.sender.is_closed()
1476    }
1477
1478    /// Returns true if the server is still alive.
1479    #[inline]
1480    #[must_use]
1481    pub fn is_alive(&self) -> bool {
1482        self.state.load() != ActorState::Stopped
1483    }
1484
1485    /// Returns the server's actor ID.
1486    #[inline]
1487    #[must_use]
1488    pub const fn actor_id(&self) -> ActorId {
1489        self.actor_id
1490    }
1491}
1492
1493impl<S: GenServer> GenServerHandle<S> {
1494    /// Returns a lightweight, clonable reference for casting.
1495    #[inline]
1496    #[must_use]
1497    pub fn server_ref(&self) -> GenServerRef<S> {
1498        GenServerRef {
1499            actor_id: self.actor_id,
1500            sender: self.sender.clone(),
1501            state: Arc::clone(&self.state),
1502            overflow_policy: self.overflow_policy,
1503        }
1504    }
1505}
1506
1507// ============================================================================
1508// GenServer runtime loop
1509// ============================================================================
1510
/// Default mailbox capacity for GenServers.
///
/// NOTE(review): presumably meant as the `mailbox_capacity` argument when
/// spawning a GenServer without specific sizing needs; no use of this
/// constant is visible in this part of the file, so confirm where it applies.
pub const DEFAULT_GENSERVER_MAILBOX_CAPACITY: usize = 64;
1513
/// Runs the GenServer message loop.
///
/// Drives `server` through four phases on the task context `cx`:
///
/// 1. **Init**: calls `on_start` under the `on_start_budget()` phase budget,
///    unless `cx` already fails its checkpoint or the state is `Stopping`.
/// 2. **Message loop**: receives envelopes from `cell.mailbox` and dispatches
///    them until the checkpoint fails, the receive reports an error, or a
///    stop was requested and the mailbox has nothing pending.
/// 3. **Drain**: closes the mailbox, aborts the reply permit of every queued
///    call, and runs queued casts/infos unless the checkpoint reported
///    cancellation.
/// 4. **Stop hook**: calls `on_stop`.
///
/// Returns the server value after `on_stop` has completed.
async fn run_gen_server_loop<S: GenServer>(
    mut server: S,
    cx: Cx,
    cell: &mut GenServerCell<S>,
) -> S {
    use crate::tracing_compat::debug;

    // Only transition to Running if stop() wasn't called before the server started.
    // stop() sets Stopping before scheduling; we must honour that signal so the
    // poll_fn guard in the message loop can detect the pre-stop and break.
    if cell.state.load() != ActorState::Stopping {
        cell.state.store(ActorState::Running);
    }

    // Phase 1: Initialization
    // Skip init when either the Cx is cancelled or the server was pre-stopped
    // (stop() sets Stopping before scheduling, but does not cancel the Cx).
    if cx.checkpoint().is_err() || cell.state.load() == ActorState::Stopping {
        cx.trace("gen_server::init_skipped_cancelled");
    } else {
        cx.trace("gen_server::init");
        // The budget guard is held until the end of this block, i.e. for the
        // whole `on_start` call.
        let _budget = PhaseBudgetGuard::enter(&cx, server.on_start_budget(), true);
        server.on_start(&cx).await;
    }

    // Phase 2: Message loop
    loop {
        // Re-check cancellation before every receive.
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::cancel_requested");
            break;
        }

        // Wrap `poll_recv` so that a pending receive is turned into a
        // `Disconnected` result once a stop has been requested.
        let recv_result = std::future::poll_fn(|task_cx| {
            match cell.mailbox.poll_recv(&cx, task_cx) {
                std::task::Poll::Pending if cell.state.load() == ActorState::Stopping => {
                    // Graceful stop requested and mailbox is empty. Break the loop.
                    std::task::Poll::Ready(Err(crate::channel::mpsc::RecvError::Disconnected))
                }
                other => other,
            }
        })
        .await;

        match recv_result {
            Ok(envelope) => {
                dispatch_envelope(&mut server, &cx, envelope).await;
            }
            Err(crate::channel::mpsc::RecvError::Disconnected) => {
                cx.trace("gen_server::mailbox_disconnected");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Cancelled) => {
                cx.trace("gen_server::recv_cancelled");
                break;
            }
            // `Empty` ends the loop as well, without emitting a trace.
            Err(crate::channel::mpsc::RecvError::Empty) => {
                break;
            }
        }
    }

    // From here on senders that check the state see the server as stopping.
    cell.state.store(ActorState::Stopping);

    // Phase 3+4: Drain + stop hook.
    //
    // Drain+on_stop are cleanup phases. We:
    // - tighten budget to a bounded stop budget
    // - mask cancellation so cleanup can run deterministically
    // Both guards are held until the function returns.
    let _budget = PhaseBudgetGuard::enter(&cx, server.on_stop_budget(), false);
    let _mask = AsyncMaskGuard::enter(&cx);

    // Phase 3: Drain remaining messages.
    // Calls during drain: reply with error (caller should not depend on drain).
    // Casts during drain: process normally if gracefully stopped, skip if aborted.
    // NOTE(review): this checkpoint runs after `AsyncMaskGuard::enter`. If
    // `checkpoint()` honours the mask it would always succeed here and
    // `is_aborted` would stay `false` even after an abort; confirm against
    // the `Cx::checkpoint` implementation.
    let is_aborted = cx.checkpoint().is_err();

    // Close the mailbox before draining it.
    cell.mailbox.close();

    // Counts every drained envelope, including skipped casts/infos.
    let mut drained: u64 = 0;
    while let Ok(envelope) = cell.mailbox.try_recv() {
        match envelope {
            Envelope::Call {
                request: _,
                reply_permit,
            } => {
                // Resolve the reply permit explicitly instead of dropping it.
                let _aborted = session::TrackedOneshotPermit::abort(reply_permit);
                cx.trace("gen_server::drain_abort_call");
            }
            Envelope::Cast { msg } => {
                if !is_aborted {
                    server.handle_cast(&cx, msg).await;
                }
            }
            Envelope::Info { msg } => {
                if !is_aborted {
                    server.handle_info(&cx, msg).await;
                }
            }
        }
        drained += 1;
    }
    if drained > 0 {
        debug!(drained = drained, "gen_server::mailbox_drained");
        cx.trace("gen_server::mailbox_drained");
    }

    // Phase 4: Cleanup
    cx.trace("gen_server::terminate");
    server.on_stop(&cx).await;

    server
}
1627
1628/// Dispatch a single envelope to the appropriate handler.
1629async fn dispatch_envelope<S: GenServer>(server: &mut S, cx: &Cx, envelope: Envelope<S>) {
1630    match envelope {
1631        Envelope::Call {
1632            request,
1633            reply_permit,
1634        } => {
1635            let reply = Reply::<S::Reply>::new(cx, reply_permit);
1636            server.handle_call(cx, request, reply).await;
1637        }
1638        Envelope::Cast { msg } => {
1639            server.handle_cast(cx, msg).await;
1640        }
1641        Envelope::Info { msg } => {
1642            server.handle_info(cx, msg).await;
1643        }
1644    }
1645}
1646
1647// ============================================================================
1648// Spawn integration
1649// ============================================================================
1650
1651impl<P: crate::types::Policy> crate::cx::Scope<'_, P> {
    /// Spawns a new GenServer in this scope.
    ///
    /// The server runs as a region-owned task. Calls and casts are delivered
    /// through a bounded MPSC channel with two-phase send semantics.
    ///
    /// Returns the [`GenServerHandle`] together with the `StoredTask` that
    /// wraps the server's future. NOTE(review): nothing in this function
    /// schedules the stored task; presumably the caller hands it to the
    /// runtime. Confirm at the call sites.
    ///
    /// # Errors
    ///
    /// Returns the [`SpawnError`] produced by `create_task_record` when the
    /// task record cannot be created.
    pub fn spawn_gen_server<S: GenServer>(
        &self,
        state: &mut crate::runtime::state::RuntimeState,
        cx: &Cx,
        server: S,
        mailbox_capacity: usize,
    ) -> Result<(GenServerHandle<S>, crate::runtime::stored_task::StoredTask), SpawnError> {
        use crate::cx::scope::CatchUnwind;
        use crate::runtime::stored_task::StoredTask;
        use crate::tracing_compat::{debug, debug_span};

        // Read the policy now; `server` is moved into the task future below.
        let overflow_policy = server.cast_overflow_policy();
        // Mailbox for envelopes, plus a oneshot that carries the join result.
        let (msg_tx, msg_rx) = mpsc::channel::<Envelope<S>>(mailbox_capacity);
        let (result_tx, result_rx) = oneshot::channel::<Result<S, JoinError>>();
        let task_id = self.create_task_record(state)?;
        let actor_id = ActorId::from_task(task_id);
        let server_state = Arc::new(GenServerStateCell::new(ActorState::Created));

        // The entered span guard is held until this function returns.
        let _span = debug_span!(
            "gen_server_spawn",
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
        )
        .entered();
        debug!(
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
            "gen_server spawned"
        );

        let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);

        // Attach the child context to the task record when the record exists.
        if let Some(record) = state.task_mut(task_id) {
            record.set_cx_inner(child_cx.inner.clone());
            record.set_cx(child_cx_full.clone());
        }

        let cx_for_send = child_cx_full;
        // The handle keeps only a weak reference to the context's inner state.
        let inner_weak = Arc::downgrade(&child_cx.inner);
        let state_for_task = Arc::clone(&server_state);

        // The cell owns the mailbox receiver and a sender clone.
        // NOTE(review): `_keep_alive` presumably keeps the channel from
        // disconnecting while the server runs; confirm against `GenServerCell`.
        let mut cell = GenServerCell {
            mailbox: msg_rx,
            state: Arc::clone(&server_state),
            _keep_alive: msg_tx.clone(),
        };

        let wrapped = async move {
            // Run the loop under `CatchUnwind` so a panic becomes a join error.
            let result = CatchUnwind {
                inner: Box::pin(run_gen_server_loop(server, child_cx, &mut cell)),
            }
            .await;
            match result {
                Ok(server_final) => {
                    // A failed send (no one is joining) is deliberately ignored.
                    let _ = result_tx.send(&cx_for_send, Ok(server_final));
                }
                Err(payload) => {
                    let msg = crate::cx::scope::payload_to_string(&payload);
                    let _ = result_tx.send(
                        &cx_for_send,
                        Err(JoinError::Panicked(crate::types::PanicPayload::new(msg))),
                    );
                }
            }
            // The state becomes `Stopped` only after the result has been sent.
            state_for_task.store(ActorState::Stopped);
            Outcome::Ok(())
        };

        let stored = StoredTask::new_with_id(wrapped, task_id);

        let handle = GenServerHandle {
            actor_id,
            sender: msg_tx,
            state: server_state,
            task_id,
            receiver: result_rx,
            inner: inner_weak,
            completed: false,
            overflow_policy,
        };

        Ok((handle, stored))
    }
1741
1742    /// Spawns a named GenServer in this scope, registering it in the given
1743    /// [`NameRegistry`].
1744    ///
1745    /// This combines [`spawn_gen_server`](Self::spawn_gen_server) with
1746    /// [`NameRegistry::register`] into a single atomic operation: the name is
1747    /// acquired *after* the server task is created but *before* it starts
1748    /// processing messages.
1749    ///
1750    /// On success, the returned [`NamedGenServerHandle`] holds both the server
1751    /// handle and the name lease. The lease is resolved when the handle is
1752    /// released via [`NamedGenServerHandle::release_name`] or aborted via
1753    /// [`NamedGenServerHandle::abort_lease`].
1754    ///
1755    /// # Errors
1756    ///
1757    /// Returns [`NamedSpawnError::Spawn`] if the underlying task spawn fails,
1758    /// or [`NamedSpawnError::NameTaken`] if the name is already registered.
1759    /// In the name-taken case, the server task is *not* spawned (it is
1760    /// abandoned before being stored).
1761    #[allow(clippy::too_many_arguments)]
1762    pub fn spawn_named_gen_server<S: GenServer>(
1763        &self,
1764        state: &mut crate::runtime::state::RuntimeState,
1765        cx: &Cx,
1766        registry: &mut crate::cx::NameRegistry,
1767        name: impl Into<String>,
1768        server: S,
1769        mailbox_capacity: usize,
1770        now: crate::types::Time,
1771    ) -> Result<
1772        (
1773            NamedGenServerHandle<S>,
1774            crate::runtime::stored_task::StoredTask,
1775        ),
1776        NamedSpawnError,
1777    > {
1778        let name = name.into();
1779
1780        // Phase 1: Spawn the server (creates task record + handle).
1781        let (handle, stored) = self
1782            .spawn_gen_server(state, cx, server, mailbox_capacity)
1783            .map_err(NamedSpawnError::Spawn)?;
1784
1785        // Phase 2: Register the name under the new task's ID.
1786        let task_id = handle.task_id();
1787        let region = self.region_id();
1788
1789        match registry.register(name, task_id, region, now) {
1790            Ok(lease) => {
1791                let named = NamedGenServerHandle {
1792                    handle,
1793                    lease: Some(lease),
1794                };
1795                Ok((named, stored))
1796            }
1797            Err(e) => {
1798                // Registration failed: clean up the task record that was created
1799                // by spawn_gen_server to prevent a region quiescence leak.
1800                // Without this cleanup, the region would have a child task that
1801                // is never scheduled and can never complete, blocking region close.
1802                let task_id = handle.task_id();
1803                if let Some(region_record) = state.region(self.region_id()) {
1804                    region_record.remove_task(task_id);
1805                }
1806                state.remove_task(task_id);
1807                Err(NamedSpawnError::NameTaken(e))
1808            }
1809        }
1810    }
1811}
1812
1813// ============================================================================
1814// Named GenServer handle
1815// ============================================================================
1816
1817/// Error from [`Scope::spawn_named_gen_server`].
1818#[derive(Debug)]
1819pub enum NamedSpawnError {
1820    /// The underlying task spawn failed.
1821    Spawn(SpawnError),
1822    /// The name was already taken in the registry.
1823    NameTaken(crate::cx::NameLeaseError),
1824}
1825
1826impl std::fmt::Display for NamedSpawnError {
1827    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1828        match self {
1829            Self::Spawn(e) => write!(f, "named server spawn failed: {e}"),
1830            Self::NameTaken(e) => write!(f, "named server registration failed: {e}"),
1831        }
1832    }
1833}
1834
1835impl std::error::Error for NamedSpawnError {}
1836
1837/// Error returned when releasing a stopped named server's registry entry fails.
1838#[derive(Debug)]
1839pub enum ReleaseNameError {
1840    /// The server is still running; stop + drain or join it first.
1841    StillRunning,
1842    /// Resolving the name lease failed.
1843    Lease(crate::cx::NameLeaseError),
1844}
1845
1846impl std::fmt::Display for ReleaseNameError {
1847    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1848        match self {
1849            Self::StillRunning => write!(f, "named server is still running"),
1850            Self::Lease(err) => write!(f, "named server lease resolution failed: {err}"),
1851        }
1852    }
1853}
1854
1855impl std::error::Error for ReleaseNameError {}
1856
1857/// Handle to a running **named** GenServer.
1858///
1859/// Wraps a [`GenServerHandle`] together with a [`NameLease`] from the
1860/// registry. The lease is an obligation (drop bomb): callers must resolve it
1861/// by calling [`release_name`](Self::release_name) or
1862/// [`abort_lease`](Self::abort_lease) before dropping.
1863///
1864/// All `call`, `cast`, `info` methods delegate to the inner handle.
1865#[derive(Debug)]
1866pub struct NamedGenServerHandle<S: GenServer> {
1867    handle: GenServerHandle<S>,
1868    lease: Option<crate::cx::NameLease>,
1869}
1870
1871impl<S: GenServer> NamedGenServerHandle<S> {
1872    /// The registered name of this server.
1873    #[inline]
1874    #[must_use]
1875    pub fn name(&self) -> &str {
1876        self.lease
1877            .as_ref()
1878            .map_or("(released)", crate::cx::NameLease::name)
1879    }
1880
1881    /// The underlying task ID.
1882    #[inline]
1883    #[must_use]
1884    pub fn task_id(&self) -> TaskId {
1885        self.handle.task_id()
1886    }
1887
1888    /// The actor ID of this server.
1889    #[inline]
1890    #[must_use]
1891    pub fn actor_id(&self) -> ActorId {
1892        self.handle.actor_id()
1893    }
1894
1895    /// Whether the server has finished execution.
1896    #[inline]
1897    #[must_use]
1898    pub fn is_finished(&self) -> bool {
1899        self.handle.is_finished()
1900    }
1901
1902    /// Create a lightweight server reference for sending messages.
1903    #[must_use]
1904    pub fn server_ref(&self) -> GenServerRef<S> {
1905        self.handle.server_ref()
1906    }
1907
1908    /// Access the inner (unnamed) handle.
1909    #[must_use]
1910    pub fn inner(&self) -> &GenServerHandle<S> {
1911        &self.handle
1912    }
1913
1914    /// Signal the server to stop gracefully.
1915    pub fn stop(&self) {
1916        self.handle.stop();
1917    }
1918
1919    /// Release the name lease (commit) after the server has already stopped.
1920    ///
1921    /// Callers should first request graceful shutdown via [`stop`](Self::stop)
1922    /// and then drive or join the server to completion. This method only
1923    /// removes the registry entry and resolves the lease once the underlying
1924    /// server task is no longer live, preserving the invariant that a running
1925    /// server keeps its name.
1926    ///
1927    /// # Errors
1928    ///
1929    /// Returns [`ReleaseNameError::StillRunning`] if the underlying server
1930    /// task has not finished yet. Returns [`ReleaseNameError::Lease`] if
1931    /// the lease was already resolved or moved out via
1932    /// [`take_lease`](Self::take_lease), or if lease resolution fails.
1933    pub fn release_name(
1934        &mut self,
1935        registry: &mut crate::cx::NameRegistry,
1936        now: Time,
1937    ) -> Result<(), ReleaseNameError> {
1938        let Some(lease) = self.lease.as_ref() else {
1939            return Err(ReleaseNameError::Lease(
1940                crate::cx::NameLeaseError::AlreadyResolved,
1941            ));
1942        };
1943        if !lease.is_active() {
1944            return Err(ReleaseNameError::Lease(
1945                crate::cx::NameLeaseError::AlreadyResolved,
1946            ));
1947        }
1948
1949        if !self.handle.is_finished() {
1950            return Err(ReleaseNameError::StillRunning);
1951        }
1952
1953        registry
1954            .unregister_owned_and_grant(lease, now)
1955            .map(|_proof| ())
1956            .map_err(ReleaseNameError::Lease)
1957            .and_then(|()| {
1958                self.lease
1959                    .take()
1960                    .ok_or(ReleaseNameError::Lease(
1961                        crate::cx::NameLeaseError::AlreadyResolved,
1962                    ))?
1963                    .release()
1964                    .map(|_proof| ())
1965                    .map_err(ReleaseNameError::Lease)
1966            })
1967    }
1968
1969    /// Abort the name lease without stopping the server.
1970    ///
1971    /// Use this for cancellation / error paths where the name registration
1972    /// itself should be rolled back.
1973    ///
1974    /// # Errors
1975    ///
1976    /// Returns [`crate::cx::NameLeaseError::AlreadyResolved`] if the lease was
1977    /// already resolved or moved out via [`take_lease`](Self::take_lease).
1978    pub fn abort_lease(
1979        &mut self,
1980        registry: &mut crate::cx::NameRegistry,
1981        now: Time,
1982    ) -> Result<(), crate::cx::NameLeaseError> {
1983        let Some(lease) = self.lease.as_ref() else {
1984            return Err(crate::cx::NameLeaseError::AlreadyResolved);
1985        };
1986        if !lease.is_active() {
1987            return Err(crate::cx::NameLeaseError::AlreadyResolved);
1988        }
1989        registry.unregister_owned_and_grant(lease, now)?;
1990        self.lease
1991            .take()
1992            .ok_or(crate::cx::NameLeaseError::AlreadyResolved)?
1993            .abort()
1994            .map(|_proof| ())
1995    }
1996
1997    /// Take ownership of the lease (for manual lifecycle management).
1998    ///
1999    /// After this call, the handle no longer owns the lease; the caller is
2000    /// responsible for removing the matching registry entry (for example via
2001    /// [`crate::cx::NameRegistry::unregister_owned_and_grant`]) and then
2002    /// resolving the lease obligation.
2003    pub fn take_lease(&mut self) -> Option<crate::cx::NameLease> {
2004        self.lease.take()
2005    }
2006}
2007
2008// ============================================================================
2009// Supervised named-start helper (bd-1hvy1)
2010// ============================================================================
2011
2012/// Child-start helper for running a named GenServer under supervision.
2013///
2014/// This adapter wires together:
2015/// 1. Name lease acquisition (`spawn_named_gen_server`)
2016/// 2. Server task storage in runtime state
2017/// 3. Deterministic lease/name cleanup on region stop via a sync finalizer
2018///
2019/// Use this when building [`crate::supervision::ChildSpec`] entries for named
2020/// services.
2021pub struct NamedGenServerStart<S, F>
2022where
2023    S: GenServer,
2024    F: FnMut() -> S + Send + 'static,
2025{
2026    registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>>,
2027    name: String,
2028    mailbox_capacity: usize,
2029    make_server: F,
2030}
2031
2032/// Construct a [`NamedGenServerStart`] helper for supervised named services.
2033#[must_use]
2034pub fn named_gen_server_start<S, F>(
2035    registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>>,
2036    name: impl Into<String>,
2037    mailbox_capacity: usize,
2038    make_server: F,
2039) -> NamedGenServerStart<S, F>
2040where
2041    S: GenServer,
2042    F: FnMut() -> S + Send + 'static,
2043{
2044    NamedGenServerStart {
2045        registry,
2046        name: name.into(),
2047        mailbox_capacity,
2048        make_server,
2049    }
2050}
2051
2052impl<S, F> crate::supervision::ChildStart for NamedGenServerStart<S, F>
2053where
2054    S: GenServer,
2055    F: FnMut() -> S + Send + 'static,
2056{
2057    fn start(
2058        &mut self,
2059        scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
2060        state: &mut crate::runtime::RuntimeState,
2061        cx: &crate::cx::Cx,
2062    ) -> Result<TaskId, SpawnError> {
2063        let now = state.now;
2064        let server = (self.make_server)();
2065        let (mut named_handle, stored) = scope
2066            .spawn_named_gen_server(
2067                state,
2068                cx,
2069                &mut self.registry.lock(),
2070                self.name.clone(),
2071                server,
2072                self.mailbox_capacity,
2073                now,
2074            )
2075            .map_err(|err| match err {
2076                NamedSpawnError::Spawn(spawn_err) => spawn_err,
2077                NamedSpawnError::NameTaken(name_err) => SpawnError::NameRegistrationFailed {
2078                    name: self.name.clone(),
2079                    reason: name_err.to_string(),
2080                },
2081            })?;
2082
2083        let task_id = named_handle.task_id();
2084        state.store_spawned_task(task_id, stored);
2085
2086        let lease_slot = Arc::new(parking_lot::Mutex::new(named_handle.take_lease()));
2087        let lease_slot_for_finalizer = Arc::clone(&lease_slot);
2088        let registry_for_finalizer = Arc::clone(&self.registry);
2089        let finalizer_registered = scope.defer_sync(state, move || {
2090            let lease_to_resolve = lease_slot_for_finalizer.lock().take();
2091            if let Some(mut lease) = lease_to_resolve {
2092                let _ = registry_for_finalizer
2093                    .lock()
2094                    .unregister_owned_and_grant(&lease, Time::ZERO);
2095                let _ = lease.release();
2096            }
2097        });
2098
2099        if !finalizer_registered {
2100            let lease_to_abort = lease_slot.lock().take();
2101            if let Some(mut lease) = lease_to_abort {
2102                let _ = self.registry.lock().unregister_owned_and_grant(&lease, now);
2103                let _ = lease.abort();
2104            }
2105            if let Some(region_record) = state.region(scope.region_id()) {
2106                region_record.remove_task(task_id);
2107            }
2108            state.remove_task(task_id);
2109            return Err(SpawnError::RegionClosed(scope.region_id()));
2110        }
2111
2112        Ok(task_id)
2113    }
2114}
2115
2116// ============================================================================
2117// Tests
2118// ============================================================================
2119
2120#[cfg(test)]
2121mod tests {
2122    use super::*;
2123    use crate::runtime::state::RuntimeState;
2124    use crate::runtime::yield_now;
2125    use crate::supervision::ChildStart;
2126    use crate::types::Budget;
2127    use crate::types::CancelKind;
2128    use crate::types::policy::FailFast;
2129    use crate::util::ArenaIndex;
2130    use parking_lot::Mutex;
2131    use std::sync::Arc;
2132    use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
2133
    /// Shared prologue for the tests in this module: calls
    /// `init_test_logging` and then invokes `test_phase!` with the test's
    /// `name`.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }
2138
2139    // ---- Simple Counter GenServer ----
2140
2141    #[derive(Debug)]
2142    struct Counter {
2143        count: u64,
2144    }
2145
2146    enum CounterCall {
2147        Get,
2148        Add(u64),
2149    }
2150
2151    enum CounterCast {
2152        Reset,
2153    }
2154
2155    impl GenServer for Counter {
2156        type Call = CounterCall;
2157        type Reply = u64;
2158        type Cast = CounterCast;
2159        type Info = SystemMsg;
2160
2161        fn handle_call(
2162            &mut self,
2163            _cx: &Cx,
2164            request: CounterCall,
2165            reply: Reply<u64>,
2166        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2167            match request {
2168                CounterCall::Get => {
2169                    let _ = reply.send(self.count);
2170                }
2171                CounterCall::Add(n) => {
2172                    self.count += n;
2173                    let _ = reply.send(self.count);
2174                }
2175            }
2176            Box::pin(async {})
2177        }
2178
2179        fn handle_cast(
2180            &mut self,
2181            _cx: &Cx,
2182            msg: CounterCast,
2183        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2184            match msg {
2185                CounterCast::Reset => self.count = 0,
2186            }
2187            Box::pin(async {})
2188        }
2189    }
2190
2191    #[derive(Clone)]
2192    struct StartBudgetProbe {
2193        started_priority: Arc<AtomicU8>,
2194        loop_priority: Arc<AtomicU8>,
2195    }
2196
2197    impl GenServer for StartBudgetProbe {
2198        type Call = CounterCall;
2199        type Reply = u8;
2200        type Cast = CounterCast;
2201        type Info = SystemMsg;
2202
2203        fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2204            self.started_priority
2205                .store(cx.budget().priority, Ordering::SeqCst);
2206            Box::pin(async {})
2207        }
2208
2209        fn on_start_budget(&self) -> Budget {
2210            Budget::new().with_priority(200)
2211        }
2212
2213        fn handle_call(
2214            &mut self,
2215            cx: &Cx,
2216            _request: CounterCall,
2217            reply: Reply<u8>,
2218        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2219            self.loop_priority
2220                .store(cx.budget().priority, Ordering::SeqCst);
2221            let _ = reply.send(cx.budget().priority);
2222            Box::pin(async {})
2223        }
2224    }
2225
2226    struct StopMaskProbe {
2227        stop_checkpoint_ok: Arc<AtomicU8>,
2228    }
2229
2230    impl GenServer for StopMaskProbe {
2231        type Call = CounterCall;
2232        type Reply = u8;
2233        type Cast = CounterCast;
2234        type Info = SystemMsg;
2235
2236        fn handle_call(
2237            &mut self,
2238            _cx: &Cx,
2239            _request: CounterCall,
2240            reply: Reply<u8>,
2241        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2242            let _ = reply.send(0);
2243            Box::pin(async {})
2244        }
2245
2246        fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2247            let ok = cx.checkpoint().is_ok();
2248            self.stop_checkpoint_ok
2249                .store(u8::from(ok), Ordering::SeqCst);
2250            Box::pin(async {})
2251        }
2252    }
2253
2254    enum InitProbeCall {
2255        GetStarted,
2256    }
2257
2258    struct InitProbe {
2259        started: Arc<AtomicU8>,
2260        checkpoints: Arc<Mutex<Vec<serde_json::Value>>>,
2261    }
2262
2263    impl GenServer for InitProbe {
2264        type Call = InitProbeCall;
2265        type Reply = bool;
2266        type Cast = ();
2267        type Info = SystemMsg;
2268
2269        fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2270            self.started.store(1, Ordering::SeqCst);
2271            let started = Arc::clone(&self.started);
2272            let checkpoints = Arc::clone(&self.checkpoints);
2273            Box::pin(async move {
2274                let event = serde_json::json!({
2275                    "phase": "on_start",
2276                    "started": started.load(Ordering::SeqCst),
2277                });
2278                tracing::info!(event = %event, "gen_server_lab_checkpoint");
2279                checkpoints.lock().push(event);
2280                yield_now().await;
2281            })
2282        }
2283
2284        fn handle_call(
2285            &mut self,
2286            _cx: &Cx,
2287            request: InitProbeCall,
2288            reply: Reply<bool>,
2289        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2290            match request {
2291                InitProbeCall::GetStarted => {
2292                    let started = self.started.load(Ordering::SeqCst) == 1;
2293                    let event = serde_json::json!({
2294                        "phase": "handle_call",
2295                        "started": started,
2296                    });
2297                    tracing::info!(event = %event, "gen_server_lab_checkpoint");
2298                    self.checkpoints.lock().push(event);
2299                    let _ = reply.send(started);
2300                }
2301            }
2302            Box::pin(async {})
2303        }
2304    }
2305
    /// Compile-time check: instantiating this with `S` only type-checks when
    /// `S` implements [`GenServer`]. The body is empty.
    fn assert_gen_server<S: GenServer>() {}
2307
    /// `Counter` must satisfy the `GenServer` bound; the check happens at
    /// compile time through `assert_gen_server`.
    #[test]
    fn gen_server_trait_bounds() {
        init_test("gen_server_trait_bounds");
        assert_gen_server::<Counter>();
        crate::test_complete!("gen_server_trait_bounds");
    }
2314
2315    #[test]
2316    fn gen_server_spawn_and_cast() {
2317        init_test("gen_server_spawn_and_cast");
2318
2319        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2320        let region = runtime.state.create_root_region(Budget::INFINITE);
2321        let cx = Cx::for_testing();
2322        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2323
2324        let (handle, stored) = scope
2325            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
2326            .unwrap();
2327        let task_id = handle.task_id();
2328        runtime.state.store_spawned_task(task_id, stored);
2329
2330        // Cast a reset (fire-and-forget)
2331        handle.try_cast(CounterCast::Reset).unwrap();
2332
2333        // Drop handle to disconnect
2334        drop(handle);
2335
2336        {
2337            runtime.scheduler.lock().schedule(task_id, 0);
2338        }
2339        runtime.run_until_quiescent();
2340
2341        crate::test_complete!("gen_server_spawn_and_cast");
2342    }
2343
2344    #[test]
2345    fn gen_server_spawn_and_call() {
2346        init_test("gen_server_spawn_and_call");
2347
2348        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2349        let region = runtime.state.create_root_region(Budget::INFINITE);
2350        let cx = Cx::for_testing();
2351        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2352
2353        let (handle, stored) = scope
2354            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
2355            .unwrap();
2356        let server_task_id = handle.task_id();
2357        runtime.state.store_spawned_task(server_task_id, stored);
2358
2359        let server_ref = handle.server_ref();
2360        let (mut client_handle, client_stored) = scope
2361            .spawn(&mut runtime.state, &cx, move |cx| async move {
2362                server_ref.call(&cx, CounterCall::Add(5)).await.unwrap()
2363            })
2364            .unwrap();
2365        let client_task_id = client_handle.task_id();
2366        runtime
2367            .state
2368            .store_spawned_task(client_task_id, client_stored);
2369
2370        {
2371            runtime.scheduler.lock().schedule(server_task_id, 0);
2372        }
2373        {
2374            runtime.scheduler.lock().schedule(client_task_id, 0);
2375        }
2376        runtime.run_until_quiescent();
2377
2378        let result =
2379            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
2380        assert_eq!(result, 5);
2381
2382        crate::test_complete!("gen_server_spawn_and_call");
2383    }
2384
    /// Checks that a call issued before the server task has ever run is still
    /// answered only after `on_start`: the reply must report the started flag
    /// as set, and the first recorded checkpoint must be the `on_start` one.
    #[test]
    fn gen_server_init_runs_before_queued_call_under_lab_runtime() {
        init_test("gen_server_init_runs_before_queued_call_under_lab_runtime");

        // Fixed seed for the lab runtime configuration.
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(0x6E57_1001));
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
        // Shared with the probe so the test can inspect what the server saw.
        let started = Arc::new(AtomicU8::new(0));
        let checkpoints = Arc::new(Mutex::new(Vec::new()));

        let (mut handle, stored) = scope
            .spawn_gen_server(
                &mut runtime.state,
                &cx,
                InitProbe {
                    started: Arc::clone(&started),
                    checkpoints: Arc::clone(&checkpoints),
                },
                8,
            )
            .expect("spawn should succeed");
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        // Client task: performs one call and records a checkpoint once the
        // reply has arrived.
        let server_ref = handle.server_ref();
        let checkpoints_for_client = Arc::clone(&checkpoints);
        let (mut client_handle, client_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let started = server_ref
                    .call(&cx, InitProbeCall::GetStarted)
                    .await
                    .expect("init probe call should succeed");
                let event = serde_json::json!({
                    "phase": "client_completed",
                    "started": started,
                });
                tracing::info!(event = %event, "gen_server_lab_checkpoint");
                checkpoints_for_client.lock().push(event);
                started
            })
            .expect("client spawn should succeed");
        let client_task_id = client_handle.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, client_stored);

        // Phase 1: run only the client. The server task has not been scheduled
        // yet, so the call is issued before `on_start` has had a chance to run
        // (presumably it waits in the mailbox; confirm against `call`).
        {
            runtime.scheduler.lock().schedule(client_task_id, 0);
        }
        runtime.run_until_idle();
        // Phase 2: now let the server run.
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_idle();
        // Phase 3: schedule the client again so it can finish.
        {
            runtime.scheduler.lock().schedule(client_task_id, 0);
        }
        runtime.run_until_idle();

        // The value returned by the client is the reply to its call.
        let call_saw_initialized =
            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
        crate::assert_with_log!(
            call_saw_initialized,
            "queued call observes completed gen_server init",
            true,
            call_saw_initialized
        );
        crate::assert_with_log!(
            started.load(Ordering::SeqCst) == 1,
            "on_start marks server initialized",
            1,
            started.load(Ordering::SeqCst)
        );

        // Shut the server down and drive the runtime to quiescence.
        handle.stop();
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        // Joining the server handle yields the final `InitProbe` value.
        let server = futures_lite::future::block_on(handle.join(&cx)).expect("server join ok");
        crate::assert_with_log!(
            server.started.load(Ordering::SeqCst) == 1,
            "joined server preserves initialized state",
            1,
            server.started.load(Ordering::SeqCst)
        );

        // Exactly three checkpoints are expected (on_start, handle_call,
        // client_completed); only the position of the first one is asserted.
        let checkpoint_snapshot = checkpoints.lock().clone();
        crate::assert_with_log!(
            checkpoint_snapshot.len() == 3,
            "lab runtime emits init/call/client checkpoints",
            3,
            checkpoint_snapshot.len()
        );
        crate::assert_with_log!(
            checkpoint_snapshot[0]["phase"] == "on_start",
            "on_start checkpoint recorded first",
            "on_start",
            checkpoint_snapshot[0]["phase"].clone()
        );
        crate::assert_with_log!(
            runtime.is_quiescent(),
            "gen_server init test reaches lab quiescence",
            true,
            runtime.is_quiescent()
        );

        crate::test_complete!("gen_server_init_runs_before_queued_call_under_lab_runtime");
    }
2496
2497    #[test]
2498    #[allow(clippy::items_after_statements, clippy::too_many_lines)]
2499    fn gen_server_spawn_inherits_full_child_cx_capabilities() {
2500        use crate::cx::registry::RegistryHandle;
2501        use crate::evidence_sink::{CollectorSink, EvidenceSink};
2502        use crate::observability::{LogCollector, LogLevel};
2503        use crate::remote::{NodeId, RemoteCap};
2504        use franken_evidence::EvidenceLedgerBuilder;
2505
2506        init_test("gen_server_spawn_inherits_full_child_cx_capabilities");
2507
2508        #[derive(Debug, Default)]
2509        #[allow(clippy::struct_excessive_bools)]
2510        struct CapabilityProbe {
2511            has_timer: bool,
2512            has_io_driver: bool,
2513            has_registry: bool,
2514            has_remote: bool,
2515            has_blocking_pool: bool,
2516            has_log_collector: bool,
2517            remote_origin: Option<String>,
2518            logical_tick_advanced: bool,
2519        }
2520
2521        impl GenServer for CapabilityProbe {
2522            type Call = ();
2523            type Reply = ();
2524            type Cast = ();
2525            type Info = SystemMsg;
2526
2527            fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2528                self.has_timer = cx.has_timer();
2529                self.has_io_driver = cx.io_driver_handle().is_some();
2530                self.has_registry = cx.registry_handle().is_some();
2531                self.has_remote = cx.has_remote();
2532                self.has_blocking_pool = cx.blocking_pool_handle().is_some();
2533                self.has_log_collector = cx.log_collector().is_some();
2534                self.remote_origin = cx.remote().map(|remote| remote.local_node().to_string());
2535                let before = cx.logical_now();
2536                let after = cx.logical_tick();
2537                self.logical_tick_advanced = after > before;
2538                cx.trace("gen_server_capability_probe_trace");
2539                let entry = EvidenceLedgerBuilder::new()
2540                    .ts_unix_ms(1_700_000_000_000)
2541                    .component("gen_server_capability_probe")
2542                    .action("on_start")
2543                    .posterior(vec![0.6, 0.4])
2544                    .chosen_expected_loss(0.1)
2545                    .calibration_score(0.85)
2546                    .build()
2547                    .expect("evidence entry");
2548                cx.emit_evidence(&entry);
2549                Box::pin(async {})
2550            }
2551
2552            fn handle_call(
2553                &mut self,
2554                _cx: &Cx,
2555                _request: (),
2556                reply: Reply<()>,
2557            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2558                let _ = reply.send(());
2559                Box::pin(async {})
2560            }
2561        }
2562
2563        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2564        let clock = Arc::new(crate::time::VirtualClock::new());
2565        runtime
2566            .state
2567            .set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));
2568
2569        let registry = crate::cx::NameRegistry::new();
2570        let registry_handle = RegistryHandle::new(Arc::new(registry));
2571        let sink = Arc::new(CollectorSink::new());
2572        let collector = LogCollector::new(16).with_min_level(LogLevel::Trace);
2573        let blocking_pool = crate::runtime::blocking_pool::BlockingPool::new(1, 1);
2574        let cx = Cx::for_testing()
2575            .with_registry_handle(Some(registry_handle))
2576            .with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("origin-test")))
2577            .with_blocking_pool_handle(Some(blocking_pool.handle()))
2578            .with_evidence_sink(Some(sink.clone() as Arc<dyn EvidenceSink>));
2579        cx.set_log_collector(collector.clone());
2580
2581        let region = runtime.state.create_root_region(Budget::INFINITE);
2582        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2583
2584        let (mut handle, stored) = scope
2585            .spawn_gen_server(&mut runtime.state, &cx, CapabilityProbe::default(), 8)
2586            .expect("spawn should succeed");
2587        let task_id = handle.task_id();
2588        runtime.state.store_spawned_task(task_id, stored);
2589
2590        {
2591            runtime.scheduler.lock().schedule(task_id, 0);
2592        }
2593        runtime.run_until_idle();
2594
2595        handle.stop();
2596        {
2597            runtime.scheduler.lock().schedule(task_id, 0);
2598        }
2599        runtime.run_until_quiescent();
2600
2601        let server = futures_lite::future::block_on(handle.join(&cx)).expect("join ok");
2602        assert!(
2603            server.has_timer,
2604            "gen server child cx must inherit timer driver"
2605        );
2606        assert!(
2607            server.has_io_driver,
2608            "gen server child cx must inherit the runtime I/O driver",
2609        );
2610        assert!(
2611            server.has_registry,
2612            "gen server child cx must inherit registry handle",
2613        );
2614        assert!(
2615            server.has_remote,
2616            "gen server child cx must inherit remote cap"
2617        );
2618        assert!(
2619            server.has_blocking_pool,
2620            "gen server child cx must inherit blocking-pool capability",
2621        );
2622        assert!(
2623            server.has_log_collector,
2624            "gen server child cx must inherit observability collector state",
2625        );
2626        assert_eq!(server.remote_origin.as_deref(), Some("Node(origin-test)"));
2627        assert!(
2628            server.logical_tick_advanced,
2629            "gen server child cx must inherit a live logical clock",
2630        );
2631        let entries = sink.entries();
2632        assert_eq!(
2633            entries.len(),
2634            1,
2635            "gen server child cx must inherit evidence sink"
2636        );
2637        assert_eq!(entries[0].component, "gen_server_capability_probe");
2638        assert!(
2639            collector
2640                .peek()
2641                .iter()
2642                .any(|entry| entry.message() == "gen_server_capability_probe_trace"),
2643            "gen server child cx must inherit trace/log collector wiring",
2644        );
2645
2646        crate::test_complete!("gen_server_spawn_inherits_full_child_cx_capabilities");
2647    }
2648
2649    #[test]
2650    fn gen_server_call_cancellation_is_deterministic() {
2651        init_test("gen_server_call_cancellation_is_deterministic");
2652
2653        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2654        let region = runtime.state.create_root_region(Budget::INFINITE);
2655        let cx = Cx::for_testing();
2656        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2657
2658        let (handle, stored) = scope
2659            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
2660            .unwrap();
2661        let server_task_id = handle.task_id();
2662        runtime.state.store_spawned_task(server_task_id, stored);
2663
2664        let server_ref = handle.server_ref();
2665
2666        let client_cx_cell: Arc<Mutex<Option<Cx>>> = Arc::new(Mutex::new(None));
2667        let client_cx_cell_for_task = Arc::clone(&client_cx_cell);
2668
2669        let (mut client_handle, client_stored) = scope
2670            .spawn(&mut runtime.state, &cx, move |cx| async move {
2671                {
2672                    *client_cx_cell_for_task.lock() = Some(cx.clone());
2673                }
2674                server_ref.call(&cx, CounterCall::Get).await
2675            })
2676            .unwrap();
2677        let client_task_id = client_handle.task_id();
2678        runtime
2679            .state
2680            .store_spawned_task(client_task_id, client_stored);
2681
2682        // Poll the client once: it should enqueue the call and then block waiting for reply.
2683        {
2684            runtime.scheduler.lock().schedule(client_task_id, 0);
2685        }
2686        runtime.run_until_idle();
2687
2688        // Cancel the client deterministically, then poll it again to observe the cancellation.
2689        let client_cx = client_cx_cell
2690            .lock()
2691            .as_ref()
2692            .expect("client cx published")
2693            .clone();
2694        client_cx.cancel_with(CancelKind::User, Some("gen_server call cancelled"));
2695
2696        {
2697            runtime.scheduler.lock().schedule(client_task_id, 0);
2698        }
2699        runtime.run_until_idle();
2700
2701        let result =
2702            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
2703        match result {
2704            Ok(_) => unreachable!("expected cancellation, got Ok"),
2705            Err(CallError::Cancelled(reason)) => {
2706                assert_eq!(reason.kind, CancelKind::User);
2707                assert_eq!(
2708                    reason.message,
2709                    Some("gen_server call cancelled".to_string())
2710                );
2711            }
2712            Err(other) => unreachable!("expected CallError::Cancelled, got {other:?}"),
2713        }
2714
2715        // Cleanup: disconnect the server and let it drain the queued call.
2716        drop(handle);
2717        {
2718            runtime.scheduler.lock().schedule(server_task_id, 0);
2719        }
2720        runtime.run_until_quiescent();
2721
2722        crate::test_complete!("gen_server_call_cancellation_is_deterministic");
2723    }
2724
2725    #[test]
2726    fn supervised_gen_server_stays_alive() {
2727        init_test("supervised_gen_server_stays_alive");
2728
2729        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2730        let region = runtime.state.create_root_region(Budget::INFINITE);
2731        let cx = Cx::for_testing();
2732        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2733        let registry = Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
2734
2735        let mut starter =
2736            named_gen_server_start(Arc::clone(&registry), "persistent_service", 32, || {
2737                Counter { count: 0 }
2738            });
2739
2740        let task_id = starter
2741            .start(&scope, &mut runtime.state, &cx)
2742            .expect("start ok");
2743
2744        // Run runtime. The server should start, init, and enter loop.
2745        // It should NOT exit just because the starter dropped the handle.
2746        {
2747            runtime.scheduler.lock().schedule(task_id, 0);
2748        }
2749        runtime.run_until_idle();
2750
2751        let task = runtime.state.task(task_id).expect("task exists");
2752        crate::assert_with_log!(
2753            !task.state.is_terminal(),
2754            "server should be alive",
2755            "Running",
2756            format!("{:?}", task.state)
2757        );
2758
2759        // Cleanup: cancel the region and drive the cancellation to quiescence.
2760        let tasks_to_schedule =
2761            runtime
2762                .state
2763                .cancel_request(region, &CancelReason::user("test done"), None);
2764        for (tid, priority) in tasks_to_schedule {
2765            runtime.scheduler.lock().schedule(tid, priority);
2766        }
2767        {
2768            runtime.scheduler.lock().schedule(task_id, 0);
2769        }
2770        runtime.run_until_quiescent();
2771
2772        assert!(
2773            registry.lock().whereis("persistent_service").is_none(),
2774            "name must be removed after region stop",
2775        );
2776
2777        crate::test_complete!("supervised_gen_server_stays_alive");
2778    }
2779
2780    #[test]
2781    fn gen_server_cast_cancellation_is_deterministic() {
2782        init_test("gen_server_cast_cancellation_is_deterministic");
2783
2784        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2785        let region = runtime.state.create_root_region(Budget::INFINITE);
2786        let cx = Cx::for_testing();
2787        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2788
2789        // Use a tiny mailbox and pre-fill it so the next cast blocks and is cancelable.
2790        let (handle, stored) = scope
2791            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 1)
2792            .unwrap();
2793        let server_task_id = handle.task_id();
2794        runtime.state.store_spawned_task(server_task_id, stored);
2795
2796        let server_ref = handle.server_ref();
2797
2798        futures_lite::future::block_on(handle.cast(&cx, CounterCast::Reset))
2799            .expect("prefill cast ok");
2800
2801        let client_cx_cell: Arc<Mutex<Option<Cx>>> = Arc::new(Mutex::new(None));
2802        let client_cx_cell_for_task = Arc::clone(&client_cx_cell);
2803
2804        let (mut client_handle, client_stored) = scope
2805            .spawn(&mut runtime.state, &cx, move |cx| async move {
2806                {
2807                    *client_cx_cell_for_task.lock() = Some(cx.clone());
2808                }
2809                server_ref.cast(&cx, CounterCast::Reset).await
2810            })
2811            .unwrap();
2812        let client_task_id = client_handle.task_id();
2813        runtime
2814            .state
2815            .store_spawned_task(client_task_id, client_stored);
2816
2817        // Poll the client once: it should block waiting for mailbox capacity.
2818        {
2819            runtime.scheduler.lock().schedule(client_task_id, 0);
2820        }
2821        runtime.run_until_idle();
2822
2823        // Cancel the client deterministically, then poll it again to observe the cancellation.
2824        let client_cx = client_cx_cell
2825            .lock()
2826            .as_ref()
2827            .expect("client cx published")
2828            .clone();
2829        client_cx.cancel_with(CancelKind::User, Some("gen_server cast cancelled"));
2830
2831        {
2832            runtime.scheduler.lock().schedule(client_task_id, 0);
2833        }
2834        runtime.run_until_idle();
2835
2836        let result =
2837            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
2838        match result {
2839            Ok(()) => unreachable!("expected cancellation, got Ok"),
2840            Err(CastError::Cancelled(reason)) => {
2841                assert_eq!(reason.kind, CancelKind::User);
2842                assert_eq!(
2843                    reason.message,
2844                    Some("gen_server cast cancelled".to_string())
2845                );
2846            }
2847            Err(other) => unreachable!("expected CastError::Cancelled, got {other:?}"),
2848        }
2849
2850        // Cleanup: disconnect the server and let it drain the mailbox.
2851        drop(handle);
2852        {
2853            runtime.scheduler.lock().schedule(server_task_id, 0);
2854        }
2855        runtime.run_until_quiescent();
2856
2857        crate::test_complete!("gen_server_cast_cancellation_is_deterministic");
2858    }
2859
2860    #[test]
2861    fn gen_server_handle_accessors() {
2862        init_test("gen_server_handle_accessors");
2863
2864        let mut state = RuntimeState::new();
2865        let root = state.create_root_region(Budget::INFINITE);
2866        let cx = Cx::for_testing();
2867        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
2868
2869        let (handle, stored) = scope
2870            .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 32)
2871            .unwrap();
2872        state.store_spawned_task(handle.task_id(), stored);
2873
2874        let _actor_id = handle.actor_id();
2875        let _task_id = handle.task_id();
2876        assert!(!handle.is_finished());
2877
2878        let server_ref = handle.server_ref();
2879        assert_eq!(server_ref.actor_id(), handle.actor_id());
2880        assert!(server_ref.is_alive());
2881        assert!(!server_ref.is_closed());
2882
2883        crate::test_complete!("gen_server_handle_accessors");
2884    }
2885
2886    #[test]
2887    fn gen_server_ref_is_cloneable() {
2888        init_test("gen_server_ref_is_cloneable");
2889
2890        let mut state = RuntimeState::new();
2891        let root = state.create_root_region(Budget::INFINITE);
2892        let cx = Cx::for_testing();
2893        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
2894
2895        let (handle, stored) = scope
2896            .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 32)
2897            .unwrap();
2898        state.store_spawned_task(handle.task_id(), stored);
2899
2900        let ref1 = handle.server_ref();
2901        let ref2 = ref1.clone();
2902        assert_eq!(ref1.actor_id(), ref2.actor_id());
2903
2904        crate::test_complete!("gen_server_ref_is_cloneable");
2905    }
2906
2907    #[test]
2908    fn gen_server_stop_transitions() {
2909        init_test("gen_server_stop_transitions");
2910
2911        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2912        let region = runtime.state.create_root_region(Budget::INFINITE);
2913        let cx = Cx::for_testing();
2914        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2915
2916        let (handle, stored) = scope
2917            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
2918            .unwrap();
2919        let task_id = handle.task_id();
2920        runtime.state.store_spawned_task(task_id, stored);
2921
2922        let server_ref = handle.server_ref();
2923        assert!(server_ref.is_alive());
2924
2925        handle.stop();
2926
2927        {
2928            runtime.scheduler.lock().schedule(task_id, 0);
2929        }
2930        runtime.run_until_quiescent();
2931
2932        assert!(handle.is_finished());
2933        assert!(!server_ref.is_alive());
2934
2935        crate::test_complete!("gen_server_stop_transitions");
2936    }
2937
2938    #[test]
2939    fn gen_server_handle_rejects_call_and_cast_after_stop() {
2940        init_test("gen_server_handle_rejects_call_and_cast_after_stop");
2941
2942        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2943        let region = runtime.state.create_root_region(Budget::INFINITE);
2944        let cx = Cx::for_testing();
2945        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2946
2947        let (handle, stored) = scope
2948            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
2949            .unwrap();
2950        let task_id = handle.task_id();
2951        runtime.state.store_spawned_task(task_id, stored);
2952
2953        // Let the server start, then request stop.
2954        {
2955            runtime.scheduler.lock().schedule(task_id, 0);
2956        }
2957        runtime.run_until_idle();
2958        handle.stop();
2959
2960        let call_err =
2961            futures_lite::future::block_on(handle.call(&cx, CounterCall::Get)).unwrap_err();
2962        assert!(
2963            matches!(call_err, CallError::ServerStopped),
2964            "call after stop must return ServerStopped, got {call_err:?}"
2965        );
2966
2967        let cast_err =
2968            futures_lite::future::block_on(handle.cast(&cx, CounterCast::Reset)).unwrap_err();
2969        assert!(
2970            matches!(cast_err, CastError::ServerStopped),
2971            "cast after stop must return ServerStopped, got {cast_err:?}"
2972        );
2973
2974        {
2975            runtime.scheduler.lock().schedule(task_id, 0);
2976        }
2977        runtime.run_until_quiescent();
2978        assert!(handle.is_finished());
2979
2980        crate::test_complete!("gen_server_handle_rejects_call_and_cast_after_stop");
2981    }
2982
2983    #[test]
2984    fn gen_server_handle_join_returns_final_state_after_stop() {
2985        init_test("gen_server_handle_join_returns_final_state_after_stop");
2986
2987        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2988        let region = runtime.state.create_root_region(Budget::INFINITE);
2989        let cx = Cx::for_testing();
2990        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2991
2992        let (mut handle, stored) = scope
2993            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
2994            .unwrap();
2995        let task_id = handle.task_id();
2996        runtime.state.store_spawned_task(task_id, stored);
2997
2998        handle.stop();
2999        {
3000            runtime.scheduler.lock().schedule(task_id, 0);
3001        }
3002        runtime.run_until_quiescent();
3003        assert!(handle.is_finished());
3004
3005        let final_state = futures_lite::future::block_on(handle.join(&cx)).expect("join");
3006        assert_eq!(
3007            final_state.count, 0,
3008            "final server state should be returned"
3009        );
3010
3011        crate::test_complete!("gen_server_handle_join_returns_final_state_after_stop");
3012    }
3013
3014    #[test]
3015    fn gen_server_handle_second_join_fails_closed() {
3016        init_test("gen_server_handle_second_join_fails_closed");
3017
3018        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3019        let region = runtime.state.create_root_region(Budget::INFINITE);
3020        let cx = Cx::for_testing();
3021        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
3022
3023        let (mut handle, stored) = scope
3024            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
3025            .unwrap();
3026        let task_id = handle.task_id();
3027        runtime.state.store_spawned_task(task_id, stored);
3028
3029        handle.stop();
3030        {
3031            runtime.scheduler.lock().schedule(task_id, 0);
3032        }
3033        runtime.run_until_quiescent();
3034        assert!(
3035            handle.is_finished(),
3036            "stopped server should report finished"
3037        );
3038
3039        let final_state = futures_lite::future::block_on(handle.join(&cx)).expect("first join");
3040        assert_eq!(
3041            final_state.count, 0,
3042            "join should return final server state"
3043        );
3044
3045        let second = futures_lite::future::block_on(handle.join(&cx));
3046        assert!(
3047            matches!(second, Err(JoinError::PolledAfterCompletion)),
3048            "second join must fail closed, got {second:?}"
3049        );
3050
3051        crate::test_complete!("gen_server_handle_second_join_fails_closed");
3052    }
3053
3054    #[test]
3055    fn gen_server_stop_wakes_blocked_mailbox_recv() {
3056        #[allow(clippy::items_after_statements)]
3057        struct StopWakeProbe {
3058            stop_ran: Arc<AtomicU8>,
3059        }
3060
3061        #[allow(clippy::items_after_statements)]
3062        impl GenServer for StopWakeProbe {
3063            type Call = CounterCall;
3064            type Reply = u64;
3065            type Cast = CounterCast;
3066            type Info = SystemMsg;
3067
3068            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3069                self.stop_ran.store(1, Ordering::SeqCst);
3070                Box::pin(async {})
3071            }
3072
3073            fn handle_call(
3074                &mut self,
3075                _cx: &Cx,
3076                _request: CounterCall,
3077                reply: Reply<u64>,
3078            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3079                let _ = reply.send(0);
3080                Box::pin(async {})
3081            }
3082        }
3083
3084        init_test("gen_server_stop_wakes_blocked_mailbox_recv");
3085
3086        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3087        let region = runtime.state.create_root_region(Budget::INFINITE);
3088        let cx = Cx::for_testing();
3089        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
3090
3091        let stop_ran = Arc::new(AtomicU8::new(0));
3092        let server = StopWakeProbe {
3093            stop_ran: Arc::clone(&stop_ran),
3094        };
3095
3096        let (handle, stored) = scope
3097            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3098            .unwrap();
3099        let server_task_id = handle.task_id();
3100        runtime.state.store_spawned_task(server_task_id, stored);
3101
3102        // Start server and let it park waiting on mailbox.recv().
3103        {
3104            runtime.scheduler.lock().schedule(server_task_id, 0);
3105        }
3106        runtime.run_until_idle();
3107
3108        // Stop should wake the blocked recv waiter. No manual reschedule here.
3109        handle.stop();
3110        runtime.run_until_quiescent();
3111
3112        assert_eq!(
3113            stop_ran.load(Ordering::SeqCst),
3114            1,
3115            "on_stop should run after stop wakes blocked recv"
3116        );
3117        assert!(handle.is_finished(), "server should finish after stop");
3118
3119        crate::test_complete!("gen_server_stop_wakes_blocked_mailbox_recv");
3120    }
3121
3122    #[test]
3123    fn dropped_join_future_marks_server_stopping_like_abort() {
3124        init_test("dropped_join_future_marks_server_stopping_like_abort");
3125
3126        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3127        let region = runtime.state.create_root_region(Budget::INFINITE);
3128        let cx = Cx::for_testing();
3129        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
3130
3131        let final_count = Arc::new(AtomicU64::new(u64::MAX));
3132        let server = ObservableCounter {
3133            count: 0,
3134            final_count: Arc::clone(&final_count),
3135        };
3136
3137        let (mut handle, stored) = scope
3138            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3139            .unwrap();
3140        let server_task_id = handle.task_id();
3141        runtime.state.store_spawned_task(server_task_id, stored);
3142
3143        {
3144            runtime.scheduler.lock().schedule(server_task_id, 0);
3145        }
3146        runtime.run_until_idle();
3147        assert_eq!(
3148            handle.state.load(),
3149            ActorState::Running,
3150            "server should be running before join drop requests abort"
3151        );
3152
3153        drop(handle.join(&cx));
3154
3155        assert_eq!(
3156            handle.state.load(),
3157            ActorState::Stopping,
3158            "dropping join future should mirror GenServerHandle::abort state transition"
3159        );
3160
3161        runtime.run_until_quiescent();
3162        assert!(
3163            handle.is_finished(),
3164            "server should stop after join future drop"
3165        );
3166        assert_eq!(
3167            final_count.load(Ordering::SeqCst),
3168            0,
3169            "idle server should stop without processing phantom work"
3170        );
3171
3172        crate::test_complete!("dropped_join_future_marks_server_stopping_like_abort");
3173    }
3174
3175    // ---- Observable GenServer for E2E ----
3176
3177    struct ObservableCounter {
3178        count: u64,
3179        final_count: Arc<AtomicU64>,
3180    }
3181
3182    impl GenServer for ObservableCounter {
3183        type Call = CounterCall;
3184        type Reply = u64;
3185        type Cast = CounterCast;
3186        type Info = SystemMsg;
3187
3188        fn handle_call(
3189            &mut self,
3190            _cx: &Cx,
3191            request: CounterCall,
3192            reply: Reply<u64>,
3193        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3194            match request {
3195                CounterCall::Get => {
3196                    let _ = reply.send(self.count);
3197                }
3198                CounterCall::Add(n) => {
3199                    self.count += n;
3200                    let _ = reply.send(self.count);
3201                }
3202            }
3203            Box::pin(async {})
3204        }
3205
3206        fn handle_cast(
3207            &mut self,
3208            _cx: &Cx,
3209            msg: CounterCast,
3210        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3211            match msg {
3212                CounterCast::Reset => self.count = 0,
3213            }
3214            Box::pin(async {})
3215        }
3216
3217        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3218            self.final_count.store(self.count, Ordering::SeqCst);
3219            Box::pin(async {})
3220        }
3221    }
3222
3223    #[test]
3224    fn gen_server_processes_casts_before_stop() {
3225        init_test("gen_server_processes_casts_before_stop");
3226
3227        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3228        let region = runtime.state.create_root_region(Budget::INFINITE);
3229        let cx = Cx::for_testing();
3230        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
3231
3232        let final_count = Arc::new(AtomicU64::new(u64::MAX));
3233        let server = ObservableCounter {
3234            count: 0,
3235            final_count: final_count.clone(),
3236        };
3237
3238        let (handle, stored) = scope
3239            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3240            .unwrap();
3241        let task_id = handle.task_id();
3242        runtime.state.store_spawned_task(task_id, stored);
3243
3244        // Start the server so casts are accepted.
3245        {
3246            runtime.scheduler.lock().schedule(task_id, 0);
3247        }
3248        runtime.run_until_idle();
3249
3250        // Queue a handful of casts, then disconnect. Shutdown must drain the mailbox
3251        // before running on_stop, so the final count reflects the cast effects.
3252        for _ in 0..5 {
3253            handle.try_cast(CounterCast::Reset).expect("try_cast ok");
3254        }
3255
3256        handle.stop();
3257
3258        {
3259            runtime.scheduler.lock().schedule(task_id, 0);
3260        }
3261        runtime.run_until_quiescent();
3262
3263        // Final count should be 0 (5 resets)
3264        assert_eq!(
3265            final_count.load(Ordering::SeqCst),
3266            0,
3267            "on_stop recorded final count"
3268        );
3269
3270        crate::test_complete!("gen_server_processes_casts_before_stop");
3271    }
3272
3273    #[test]
3274    fn gen_server_deterministic_replay() {
3275        fn run_scenario(seed: u64) -> u64 {
3276            let config = crate::lab::LabConfig::new(seed);
3277            let mut runtime = crate::lab::LabRuntime::new(config);
3278            let region = runtime.state.create_root_region(Budget::INFINITE);
3279            let cx = Cx::for_testing();
3280            let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
3281
3282            let final_count = Arc::new(AtomicU64::new(u64::MAX));
3283            let server = ObservableCounter {
3284                count: 0,
3285                final_count: final_count.clone(),
3286            };
3287
3288            let (handle, stored) = scope
3289                .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3290                .unwrap();
3291            let task_id = handle.task_id();
3292            runtime.state.store_spawned_task(task_id, stored);
3293
3294            {
3295                runtime.scheduler.lock().schedule(task_id, 0);
3296            }
3297            runtime.run_until_idle();
3298
3299            // 5 resets then disconnect
3300            for _ in 0..5 {
3301                handle.try_cast(CounterCast::Reset).expect("try_cast ok");
3302            }
3303            handle.stop();
3304
3305            {
3306                runtime.scheduler.lock().schedule(task_id, 0);
3307            }
3308            runtime.run_until_quiescent();
3309
3310            final_count.load(Ordering::SeqCst)
3311        }
3312
3313        init_test("gen_server_deterministic_replay");
3314
3315        let result1 = run_scenario(0xCAFE_BABE);
3316        let result2 = run_scenario(0xCAFE_BABE);
3317        assert_eq!(result1, result2, "deterministic replay");
3318
3319        crate::test_complete!("gen_server_deterministic_replay");
3320    }
3321
3322    // ---- System/info message tests (bd-188ey) ----
3323
3324    #[derive(Default)]
3325    struct InfoRecorder {
3326        seen: Arc<Mutex<Vec<String>>>,
3327    }
3328
3329    impl GenServer for InfoRecorder {
3330        type Call = ();
3331        type Reply = ();
3332        type Cast = ();
3333        type Info = SystemMsg;
3334
3335        fn handle_call(
3336            &mut self,
3337            _cx: &Cx,
3338            _request: (),
3339            reply: Reply<()>,
3340        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3341            let _ = reply.send(());
3342            Box::pin(async {})
3343        }
3344
3345        fn handle_info(
3346            &mut self,
3347            _cx: &Cx,
3348            msg: Self::Info,
3349        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3350            let seen = Arc::clone(&self.seen);
3351            Box::pin(async move {
3352                seen.lock().push(format!("{msg:?}"));
3353            })
3354        }
3355    }
3356
3357    fn tid(n: u32) -> TaskId {
3358        TaskId::from_arena(ArenaIndex::new(n, 0))
3359    }
3360
3361    fn rid(n: u32) -> crate::types::RegionId {
3362        crate::types::RegionId::from_arena(ArenaIndex::new(n, 0))
3363    }
3364
3365    /// Conformance: app shutdown batches use SYS-ORDER
3366    /// (`vt`, `Down < Exit < Timeout`, stable subject key).
3367    #[test]
3368    fn conformance_system_msg_sort_key_orders_shutdown_batch() {
3369        init_test("conformance_system_msg_sort_key_orders_shutdown_batch");
3370
3371        let mut monitors = crate::monitor::MonitorSet::new();
3372        let mref_down_6 = monitors.establish(tid(90), rid(0), tid(6));
3373        let mref_down_3 = monitors.establish(tid(91), rid(0), tid(3));
3374
3375        let mut batch = SystemMsgBatch::new();
3376        batch.push(SystemMsg::Exit {
3377            exit_vt: Time::from_secs(10),
3378            from: tid(6),
3379            reason: DownReason::Normal,
3380        });
3381        batch.push(SystemMsg::Timeout {
3382            tick_vt: Time::from_secs(10),
3383            id: 4,
3384        });
3385        batch.push(SystemMsg::Down {
3386            completion_vt: Time::from_secs(10),
3387            notification: DownNotification {
3388                monitored: tid(6),
3389                reason: DownReason::Normal,
3390                monitor_ref: mref_down_6,
3391            },
3392        });
3393        batch.push(SystemMsg::Timeout {
3394            tick_vt: Time::from_secs(9),
3395            id: 99,
3396        });
3397        batch.push(SystemMsg::Down {
3398            completion_vt: Time::from_secs(10),
3399            notification: DownNotification {
3400                monitored: tid(3),
3401                reason: DownReason::Normal,
3402                monitor_ref: mref_down_3,
3403            },
3404        });
3405        batch.push(SystemMsg::Exit {
3406            exit_vt: Time::from_secs(10),
3407            from: tid(2),
3408            reason: DownReason::Normal,
3409        });
3410        batch.push(SystemMsg::Timeout {
3411            tick_vt: Time::from_secs(10),
3412            id: 1,
3413        });
3414
3415        let sorted = batch.into_sorted();
3416        let keys: Vec<_> = sorted.iter().map(SystemMsg::sort_key).collect();
3417
3418        assert_eq!(
3419            keys,
3420            vec![
3421                (Time::from_secs(9), 2, SystemMsgSubjectKey::TimeoutId(99)),
3422                (Time::from_secs(10), 0, SystemMsgSubjectKey::Task(tid(3))),
3423                (Time::from_secs(10), 0, SystemMsgSubjectKey::Task(tid(6))),
3424                (Time::from_secs(10), 1, SystemMsgSubjectKey::Task(tid(2))),
3425                (Time::from_secs(10), 1, SystemMsgSubjectKey::Task(tid(6))),
3426                (Time::from_secs(10), 2, SystemMsgSubjectKey::TimeoutId(1)),
3427                (Time::from_secs(10), 2, SystemMsgSubjectKey::TimeoutId(4)),
3428            ],
3429            "shutdown system-message ordering must follow SYS-ORDER"
3430        );
3431
3432        crate::test_complete!("conformance_system_msg_sort_key_orders_shutdown_batch");
3433    }
3434
3435    /// Conformance: `SystemMsgBatch::into_sorted` is equivalent to explicit
3436    /// `sort_by_key(SystemMsg::sort_key)`.
3437    #[test]
3438    fn conformance_system_msg_batch_matches_explicit_sort() {
3439        init_test("conformance_system_msg_batch_matches_explicit_sort");
3440
3441        let mut monitors = crate::monitor::MonitorSet::new();
3442        let mref = monitors.establish(tid(77), rid(0), tid(8));
3443
3444        let messages = vec![
3445            SystemMsg::Timeout {
3446                tick_vt: Time::from_secs(12),
3447                id: 4,
3448            },
3449            SystemMsg::Exit {
3450                exit_vt: Time::from_secs(11),
3451                from: tid(8),
3452                reason: DownReason::Error("boom".to_string()),
3453            },
3454            SystemMsg::Down {
3455                completion_vt: Time::from_secs(11),
3456                notification: DownNotification {
3457                    monitored: tid(8),
3458                    reason: DownReason::Normal,
3459                    monitor_ref: mref,
3460                },
3461            },
3462            SystemMsg::Timeout {
3463                tick_vt: Time::from_secs(11),
3464                id: 2,
3465            },
3466        ];
3467
3468        let mut batch = SystemMsgBatch::new();
3469        for msg in messages.clone() {
3470            batch.push(msg);
3471        }
3472        let batched = batch.into_sorted();
3473
3474        let mut explicit = messages;
3475        explicit.sort_by_key(SystemMsg::sort_key);
3476
3477        let batched_keys: Vec<_> = batched.iter().map(SystemMsg::sort_key).collect();
3478        let explicit_keys: Vec<_> = explicit.iter().map(SystemMsg::sort_key).collect();
3479        assert_eq!(batched_keys, explicit_keys);
3480
3481        crate::test_complete!("conformance_system_msg_batch_matches_explicit_sort");
3482    }
3483
3484    #[test]
3485    fn system_msg_payload_types_roundtrip_via_conversions() {
3486        init_test("system_msg_payload_types_roundtrip_via_conversions");
3487
3488        let mut monitors = crate::monitor::MonitorSet::new();
3489        let mref = monitors.establish(tid(7), rid(0), tid(8));
3490
3491        let down = DownMsg::new(
3492            Time::from_secs(11),
3493            DownNotification {
3494                monitored: tid(8),
3495                reason: DownReason::Normal,
3496                monitor_ref: mref,
3497            },
3498        );
3499        let down_msg = SystemMsg::down(down.clone());
3500        let down_back = DownMsg::try_from(down_msg).expect("down conversion");
3501        assert_eq!(down_back.completion_vt, down.completion_vt);
3502        assert_eq!(
3503            down_back.notification.monitored,
3504            down.notification.monitored
3505        );
3506        assert_eq!(down_back.notification.reason, down.notification.reason);
3507        assert_eq!(
3508            down_back.notification.monitor_ref,
3509            down.notification.monitor_ref
3510        );
3511
3512        let exit = ExitMsg::new(
3513            Time::from_secs(12),
3514            tid(9),
3515            DownReason::Error("boom".into()),
3516        );
3517        let exit_msg = SystemMsg::exit(exit.clone());
3518        let exit_back = ExitMsg::try_from(exit_msg).expect("exit conversion");
3519        assert_eq!(exit_back, exit);
3520
3521        let timeout = TimeoutMsg::new(Time::from_secs(13), 42);
3522        let timeout_msg = SystemMsg::timeout(timeout);
3523        let timeout_back = TimeoutMsg::try_from(timeout_msg).expect("timeout conversion");
3524        assert_eq!(timeout_back, timeout);
3525
3526        crate::test_complete!("system_msg_payload_types_roundtrip_via_conversions");
3527    }
3528
3529    #[test]
3530    fn system_msg_try_from_mismatch_returns_original_variant() {
3531        init_test("system_msg_try_from_mismatch_returns_original_variant");
3532        let mut monitors = crate::monitor::MonitorSet::new();
3533        let mref = monitors.establish(tid(10), rid(0), tid(1));
3534
3535        let timeout = SystemMsg::Timeout {
3536            tick_vt: Time::from_secs(5),
3537            id: 99,
3538        };
3539        let err = DownMsg::try_from(timeout).expect_err("timeout is not down");
3540        assert!(matches!(err, SystemMsg::Timeout { id: 99, .. }));
3541
3542        let down = SystemMsg::Down {
3543            completion_vt: Time::from_secs(6),
3544            notification: DownNotification {
3545                monitored: tid(1),
3546                reason: DownReason::Normal,
3547                monitor_ref: mref,
3548            },
3549        };
3550        let err = TimeoutMsg::try_from(down).expect_err("down is not timeout");
3551        assert!(matches!(err, SystemMsg::Down { .. }));
3552
3553        crate::test_complete!("system_msg_try_from_mismatch_returns_original_variant");
3554    }
3555
3556    #[test]
3557    fn gen_server_handle_info_receives_system_messages() {
3558        init_test("gen_server_handle_info_receives_system_messages");
3559
3560        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3561        let region = runtime.state.create_root_region(Budget::INFINITE);
3562        let cx = Cx::for_testing();
3563        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
3564
3565        let seen: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
3566        let server = InfoRecorder {
3567            seen: Arc::clone(&seen),
3568        };
3569
3570        let (handle, stored) = scope
3571            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3572            .unwrap();
3573        let server_task_id = handle.task_id();
3574        runtime.state.store_spawned_task(server_task_id, stored);
3575
3576        let mut monitors = crate::monitor::MonitorSet::new();
3577        let mref = monitors.establish(tid(10), rid(0), tid(11));
3578
3579        handle
3580            .try_info(SystemMsg::Down {
3581                completion_vt: Time::from_secs(5),
3582                notification: DownNotification {
3583                    monitored: tid(11),
3584                    reason: DownReason::Normal,
3585                    monitor_ref: mref,
3586                },
3587            })
3588            .unwrap();
3589
3590        handle
3591            .try_info(SystemMsg::Exit {
3592                exit_vt: Time::from_secs(6),
3593                from: tid(12),
3594                reason: DownReason::Error("boom".into()),
3595            })
3596            .unwrap();
3597
3598        handle
3599            .try_info(SystemMsg::Timeout {
3600                tick_vt: Time::from_secs(7),
3601                id: 123,
3602            })
3603            .unwrap();
3604
3605        drop(handle);
3606
3607        {
3608            runtime.scheduler.lock().schedule(server_task_id, 0);
3609        }
3610        runtime.run_until_quiescent();
3611
3612        let seen = seen.lock();
3613        assert_eq!(seen.len(), 3);
3614        assert!(seen[0].contains("Down"));
3615        assert!(seen[1].contains("Exit"));
3616        assert!(seen[2].contains("Timeout"));
3617        drop(seen);
3618
3619        crate::test_complete!("gen_server_handle_info_receives_system_messages");
3620    }
3621
3622    #[test]
3623    fn gen_server_info_ordering_is_deterministic_for_seed() {
3624        fn run_scenario(seed: u64) -> Vec<String> {
3625            let config = crate::lab::LabConfig::new(seed);
3626            let mut runtime = crate::lab::LabRuntime::new(config);
3627            let region = runtime.state.create_root_region(Budget::INFINITE);
3628            let cx = Cx::for_testing();
3629            let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
3630
3631            let events: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
3632            let server = InfoRecorder {
3633                seen: Arc::clone(&events),
3634            };
3635
3636            let (handle, stored) = scope
3637                .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3638                .unwrap();
3639            let server_task_id = handle.task_id();
3640            runtime.state.store_spawned_task(server_task_id, stored);
3641
3642            let server_ref = handle.server_ref();
3643
3644            let (client_a, stored_a) = scope
3645                .spawn(&mut runtime.state, &cx, move |cx| async move {
3646                    server_ref
3647                        .info(
3648                            &cx,
3649                            SystemMsg::Timeout {
3650                                tick_vt: Time::from_secs(2),
3651                                id: 1,
3652                            },
3653                        )
3654                        .await
3655                        .unwrap();
3656                })
3657                .unwrap();
3658            let task_id_a = client_a.task_id();
3659            runtime.state.store_spawned_task(task_id_a, stored_a);
3660
3661            let server_ref_b = handle.server_ref();
3662            let (client_b, stored_b) = scope
3663                .spawn(&mut runtime.state, &cx, move |cx| async move {
3664                    server_ref_b
3665                        .info(
3666                            &cx,
3667                            SystemMsg::Timeout {
3668                                tick_vt: Time::from_secs(2),
3669                                id: 2,
3670                            },
3671                        )
3672                        .await
3673                        .unwrap();
3674                })
3675                .unwrap();
3676            let task_id_b = client_b.task_id();
3677            runtime.state.store_spawned_task(task_id_b, stored_b);
3678
3679            // Let clients enqueue, then let the server drain.
3680            {
3681                runtime.scheduler.lock().schedule(task_id_a, 0);
3682            }
3683            {
3684                runtime.scheduler.lock().schedule(task_id_b, 0);
3685            }
3686            {
3687                runtime.scheduler.lock().schedule(server_task_id, 0);
3688            }
3689
3690            runtime.run_until_quiescent();
3691            drop(handle);
3692            {
3693                runtime.scheduler.lock().schedule(server_task_id, 0);
3694            }
3695            runtime.run_until_quiescent();
3696
3697            events.lock().clone()
3698        }
3699
3700        init_test("gen_server_info_ordering_is_deterministic_for_seed");
3701
3702        let a = run_scenario(0xD00D_F00D);
3703        let b = run_scenario(0xD00D_F00D);
3704        assert_eq!(
3705            a, b,
3706            "system/info ordering must be deterministic for same seed"
3707        );
3708
3709        crate::test_complete!("gen_server_info_ordering_is_deterministic_for_seed");
3710    }
3711
3712    // ---- DropOldest GenServer for backpressure tests ----
3713
3714    /// A counter that uses DropOldest overflow policy.
3715    struct DropOldestCounter {
3716        count: u64,
3717    }
3718
3719    /// Cast type that carries an identifiable value for eviction testing.
3720    #[derive(Debug, Clone)]
3721    enum TaggedCast {
3722        Set(u64),
3723    }
3724
3725    impl GenServer for DropOldestCounter {
3726        type Call = CounterCall;
3727        type Reply = u64;
3728        type Cast = TaggedCast;
3729        type Info = SystemMsg;
3730
3731        fn cast_overflow_policy(&self) -> CastOverflowPolicy {
3732            CastOverflowPolicy::DropOldest
3733        }
3734
3735        fn handle_call(
3736            &mut self,
3737            _cx: &Cx,
3738            request: CounterCall,
3739            reply: Reply<u64>,
3740        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3741            match request {
3742                CounterCall::Get => {
3743                    let _ = reply.send(self.count);
3744                }
3745                CounterCall::Add(n) => {
3746                    self.count += n;
3747                    let _ = reply.send(self.count);
3748                }
3749            }
3750            Box::pin(async {})
3751        }
3752
3753        fn handle_cast(
3754            &mut self,
3755            _cx: &Cx,
3756            msg: TaggedCast,
3757        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3758            match msg {
3759                TaggedCast::Set(v) => self.count = v,
3760            }
3761            Box::pin(async {})
3762        }
3763    }
3764
3765    #[test]
3766    fn gen_server_drop_oldest_policy_accessor() {
3767        init_test("gen_server_drop_oldest_policy_accessor");
3768
3769        let mut state = RuntimeState::new();
3770        let root = state.create_root_region(Budget::INFINITE);
3771        let cx = Cx::for_testing();
3772        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3773
3774        let (handle, stored) = scope
3775            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 4)
3776            .unwrap();
3777        state.store_spawned_task(handle.task_id(), stored);
3778
3779        assert_eq!(
3780            handle.cast_overflow_policy(),
3781            CastOverflowPolicy::DropOldest
3782        );
3783
3784        let server_ref = handle.server_ref();
3785        assert_eq!(
3786            server_ref.cast_overflow_policy(),
3787            CastOverflowPolicy::DropOldest
3788        );
3789
3790        crate::test_complete!("gen_server_drop_oldest_policy_accessor");
3791    }
3792
3793    #[test]
3794    fn gen_server_drop_oldest_evicts_when_full() {
3795        init_test("gen_server_drop_oldest_evicts_when_full");
3796
3797        let mut state = RuntimeState::new();
3798        let root = state.create_root_region(Budget::INFINITE);
3799        let cx = Cx::for_testing();
3800        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3801
3802        // Mailbox capacity = 2
3803        let (handle, stored) = scope
3804            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 2)
3805            .unwrap();
3806        state.store_spawned_task(handle.task_id(), stored);
3807
3808        // Fill the mailbox (capacity 2)
3809        handle.try_cast(TaggedCast::Set(10)).unwrap();
3810        handle.try_cast(TaggedCast::Set(20)).unwrap();
3811
3812        // This should succeed by evicting the oldest (Set(10))
3813        handle.try_cast(TaggedCast::Set(30)).unwrap();
3814
3815        // And again — evicts Set(20), mailbox now has [Set(30), Set(40)]
3816        handle.try_cast(TaggedCast::Set(40)).unwrap();
3817
3818        crate::test_complete!("gen_server_drop_oldest_evicts_when_full");
3819    }
3820
3821    #[test]
3822    fn gen_server_reject_policy_returns_full() {
3823        init_test("gen_server_reject_policy_returns_full");
3824
3825        let mut state = RuntimeState::new();
3826        let root = state.create_root_region(Budget::INFINITE);
3827        let cx = Cx::for_testing();
3828        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3829
3830        // Default policy (Reject), capacity = 2
3831        let (handle, stored) = scope
3832            .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 2)
3833            .unwrap();
3834        state.store_spawned_task(handle.task_id(), stored);
3835
3836        assert_eq!(handle.cast_overflow_policy(), CastOverflowPolicy::Reject);
3837
3838        // Fill the mailbox
3839        handle.try_cast(CounterCast::Reset).unwrap();
3840        handle.try_cast(CounterCast::Reset).unwrap();
3841
3842        // Third should fail with Full
3843        let err = handle.try_cast(CounterCast::Reset).unwrap_err();
3844        assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
3845
3846        crate::test_complete!("gen_server_reject_policy_returns_full");
3847    }
3848
3849    #[test]
3850    fn gen_server_drop_oldest_ref_also_evicts() {
3851        init_test("gen_server_drop_oldest_ref_also_evicts");
3852
3853        let mut state = RuntimeState::new();
3854        let root = state.create_root_region(Budget::INFINITE);
3855        let cx = Cx::for_testing();
3856        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3857
3858        let (handle, stored) = scope
3859            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 2)
3860            .unwrap();
3861        state.store_spawned_task(handle.task_id(), stored);
3862
3863        let server_ref = handle.server_ref();
3864
3865        // Fill via ref
3866        server_ref.try_cast(TaggedCast::Set(1)).unwrap();
3867        server_ref.try_cast(TaggedCast::Set(2)).unwrap();
3868
3869        // Evict oldest via ref — should succeed
3870        server_ref.try_cast(TaggedCast::Set(3)).unwrap();
3871
3872        crate::test_complete!("gen_server_drop_oldest_ref_also_evicts");
3873    }
3874
3875    #[test]
3876    fn gen_server_drop_oldest_reserved_slots_returns_full() {
3877        init_test("gen_server_drop_oldest_reserved_slots_returns_full");
3878
3879        let mut state = RuntimeState::new();
3880        let root = state.create_root_region(Budget::INFINITE);
3881        let cx = Cx::for_testing();
3882        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3883
3884        let (handle, stored) = scope
3885            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
3886            .unwrap();
3887        state.store_spawned_task(handle.task_id(), stored);
3888
3889        // Reserve the only mailbox slot without committing a message.
3890        let _permit = futures_lite::future::block_on(handle.sender.reserve(&cx)).unwrap();
3891
3892        // DropOldest cannot evict reserved slots, so this must be a recoverable Full.
3893        let err = handle.try_cast(TaggedCast::Set(1)).unwrap_err();
3894        assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
3895
3896        crate::test_complete!("gen_server_drop_oldest_reserved_slots_returns_full");
3897    }
3898
3899    #[test]
3900    fn gen_server_ref_drop_oldest_reserved_slots_returns_full() {
3901        init_test("gen_server_ref_drop_oldest_reserved_slots_returns_full");
3902
3903        let mut state = RuntimeState::new();
3904        let root = state.create_root_region(Budget::INFINITE);
3905        let cx = Cx::for_testing();
3906        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3907
3908        let (handle, stored) = scope
3909            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
3910            .unwrap();
3911        state.store_spawned_task(handle.task_id(), stored);
3912        let server_ref = handle.server_ref();
3913
3914        // Reserve the only mailbox slot without committing a message.
3915        let _permit = futures_lite::future::block_on(handle.sender.reserve(&cx)).unwrap();
3916
3917        // Mirror behavior through GenServerRef::try_cast.
3918        let err = server_ref.try_cast(TaggedCast::Set(1)).unwrap_err();
3919        assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
3920
3921        crate::test_complete!("gen_server_ref_drop_oldest_reserved_slots_returns_full");
3922    }
3923
3924    /// DropOldest is cast-scoped: a queued Call must not be evicted by a later cast.
3925    #[test]
3926    fn gen_server_drop_oldest_preserves_queued_call_and_returns_full() {
3927        init_test("gen_server_drop_oldest_preserves_queued_call_and_returns_full");
3928
3929        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3930        let root = runtime.state.create_root_region(Budget::INFINITE);
3931        let cx = Cx::for_testing();
3932        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3933
3934        let (handle, stored) = scope
3935            .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
3936            .unwrap();
3937        let task_id = handle.task_id();
3938        runtime.state.store_spawned_task(task_id, stored);
3939
3940        let (reply_tx, mut reply_rx) = session::tracked_oneshot::<u64>();
3941        let reply_permit = reply_tx.reserve(&cx);
3942        let call_envelope: Envelope<DropOldestCounter> = Envelope::Call {
3943            request: CounterCall::Get,
3944            reply_permit,
3945        };
3946        handle.sender.try_send(call_envelope).unwrap();
3947
3948        let err = handle.try_cast(TaggedCast::Set(99)).unwrap_err();
3949        assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
3950
3951        handle.stop();
3952        {
3953            runtime.scheduler.lock().schedule(task_id, 0);
3954        }
3955        runtime.run_until_quiescent();
3956
3957        let recv = futures_lite::future::block_on(reply_rx.recv(&cx));
3958        assert_eq!(
3959            recv,
3960            Ok(0),
3961            "preserved queued call should still be serviced, got {recv:?}"
3962        );
3963
3964        crate::test_complete!("gen_server_drop_oldest_preserves_queued_call_and_returns_full");
3965    }
3966
3967    #[test]
3968    fn gen_server_drop_oldest_preserves_queued_info_and_returns_full() {
3969        init_test("gen_server_drop_oldest_preserves_queued_info_and_returns_full");
3970
3971        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3972        let root = runtime.state.create_root_region(Budget::INFINITE);
3973        let cx = Cx::for_testing();
3974        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3975
3976        let (handle, stored) = scope
3977            .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
3978            .unwrap();
3979        let task_id = handle.task_id();
3980        runtime.state.store_spawned_task(task_id, stored);
3981
3982        let info = SystemMsg::timeout(TimeoutMsg::new(Time::from_secs(1), 7));
3983        handle
3984            .sender
3985            .try_send(Envelope::Info { msg: info })
3986            .expect("queue info");
3987
3988        let err = handle.try_cast(TaggedCast::Set(99)).unwrap_err();
3989        assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
3990
3991        handle.stop();
3992        {
3993            runtime.scheduler.lock().schedule(task_id, 0);
3994        }
3995        runtime.run_until_quiescent();
3996
3997        crate::test_complete!("gen_server_drop_oldest_preserves_queued_info_and_returns_full");
3998    }
3999
4000    #[test]
4001    fn gen_server_default_overflow_policy_is_reject() {
4002        init_test("gen_server_default_overflow_policy_is_reject");
4003
4004        assert_eq!(CastOverflowPolicy::default(), CastOverflowPolicy::Reject);
4005
4006        // Verify Counter (which doesn't override) uses Reject
4007        let counter = Counter { count: 0 };
4008        assert_eq!(counter.cast_overflow_policy(), CastOverflowPolicy::Reject);
4009
4010        crate::test_complete!("gen_server_default_overflow_policy_is_reject");
4011    }
4012
4013    #[test]
4014    fn reply_debug_format() {
4015        init_test("reply_debug_format");
4016
4017        let cx = Cx::for_testing();
4018        let (tx, _rx) = session::tracked_oneshot::<u64>();
4019        let permit = tx.reserve(&cx);
4020        let reply = Reply::new(&cx, permit);
4021        let debug_str = format!("{reply:?}");
4022        assert!(debug_str.contains("Reply"));
4023        assert!(debug_str.contains("pending"));
4024
4025        // Consume the reply to avoid the obligation panic
4026        let _ = reply.send(42);
4027
4028        crate::test_complete!("reply_debug_format");
4029    }
4030
4031    #[test]
4032    fn gen_server_on_start_budget_priority_applied_and_restored() {
4033        init_test("gen_server_on_start_budget_priority_applied_and_restored");
4034
4035        let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
4036        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4037        let region = runtime.state.create_root_region(budget);
4038        let cx = Cx::for_testing();
4039        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4040
4041        let started_priority = Arc::new(AtomicU8::new(0));
4042        let loop_priority = Arc::new(AtomicU8::new(0));
4043        let server = StartBudgetProbe {
4044            started_priority: Arc::clone(&started_priority),
4045            loop_priority: Arc::clone(&loop_priority),
4046        };
4047
4048        let (handle, stored) = scope
4049            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
4050            .unwrap();
4051        let server_task_id = handle.task_id();
4052        runtime.state.store_spawned_task(server_task_id, stored);
4053
4054        let server_ref = handle.server_ref();
4055        let (client, stored_client) = scope
4056            .spawn(&mut runtime.state, &cx, move |cx| async move {
4057                let p = server_ref.call(&cx, CounterCall::Get).await.unwrap();
4058                assert_eq!(p, 10);
4059            })
4060            .unwrap();
4061        let client_task_id = client.task_id();
4062        runtime
4063            .state
4064            .store_spawned_task(client_task_id, stored_client);
4065
4066        {
4067            runtime.scheduler.lock().schedule(server_task_id, 0);
4068        }
4069        {
4070            runtime.scheduler.lock().schedule(client_task_id, 0);
4071        }
4072        runtime.run_until_quiescent();
4073
4074        assert_eq!(started_priority.load(Ordering::SeqCst), 200);
4075        assert_eq!(loop_priority.load(Ordering::SeqCst), 10);
4076
4077        drop(handle);
4078        {
4079            runtime.scheduler.lock().schedule(server_task_id, 0);
4080        }
4081        runtime.run_until_quiescent();
4082
4083        crate::test_complete!("gen_server_on_start_budget_priority_applied_and_restored");
4084    }
4085
4086    #[test]
4087    fn gen_server_on_stop_runs_masked_under_stop() {
4088        init_test("gen_server_on_stop_runs_masked_under_stop");
4089
4090        let budget = Budget::new().with_poll_quota(10_000);
4091        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4092        let region = runtime.state.create_root_region(budget);
4093        let cx = Cx::for_testing();
4094        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4095
4096        let stop_checkpoint_ok = Arc::new(AtomicU8::new(0));
4097        let server = StopMaskProbe {
4098            stop_checkpoint_ok: Arc::clone(&stop_checkpoint_ok),
4099        };
4100
4101        let (handle, stored) = scope
4102            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
4103            .unwrap();
4104        let server_task_id = handle.task_id();
4105        runtime.state.store_spawned_task(server_task_id, stored);
4106
4107        // Request stop: sets cancel_requested. on_stop must run masked.
4108        handle.stop();
4109
4110        {
4111            runtime.scheduler.lock().schedule(server_task_id, 0);
4112        }
4113        runtime.run_until_quiescent();
4114
4115        assert_eq!(stop_checkpoint_ok.load(Ordering::SeqCst), 1);
4116
4117        crate::test_complete!("gen_server_on_stop_runs_masked_under_stop");
4118    }
4119
4120    // ── Cast overflow policy tests (bd-2o5hg) ────────────────────────
4121
4122    /// Verify that DropOldest eviction emits a trace event.
4123    #[test]
4124    fn cast_drop_oldest_emits_trace_on_eviction() {
4125        init_test("cast_drop_oldest_emits_trace_on_eviction");
4126
4127        let budget = Budget::new().with_poll_quota(10_000);
4128        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4129        let region = runtime.state.create_root_region(budget);
4130        let cx = Cx::for_testing();
4131        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4132
4133        // Capacity=1 so the second cast evicts the first
4134        let (handle, stored) = scope
4135            .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
4136            .unwrap();
4137        let task_id = handle.task_id();
4138        runtime.state.store_spawned_task(task_id, stored);
4139
4140        // Schedule so Cx::current() is set
4141        {
4142            runtime.scheduler.lock().schedule(task_id, 0);
4143        }
4144        runtime.run_until_idle();
4145
4146        // First cast fills the mailbox
4147        handle.try_cast(TaggedCast::Set(1)).unwrap();
4148        // Second cast evicts the first
4149        handle.try_cast(TaggedCast::Set(2)).unwrap();
4150
4151        // The eviction trace is emitted via Cx::current() (set during task poll).
4152        // We confirm it succeeded (no panic/error).
4153        crate::test_complete!("cast_drop_oldest_emits_trace_on_eviction");
4154    }
4155
4156    /// Casting to a stopped server returns ServerStopped.
4157    #[test]
4158    fn cast_to_stopped_server_returns_error() {
4159        init_test("cast_to_stopped_server_returns_error");
4160
4161        let mut state = RuntimeState::new();
4162        let root = state.create_root_region(Budget::INFINITE);
4163        let cx = Cx::for_testing();
4164        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
4165
4166        let (handle, stored) = scope
4167            .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 4)
4168            .unwrap();
4169        state.store_spawned_task(handle.task_id(), stored);
4170
4171        // Stop the server
4172        handle.stop();
4173
4174        // try_cast should return ServerStopped
4175        let err = handle.try_cast(CounterCast::Reset).unwrap_err();
4176        assert!(
4177            matches!(err, CastError::ServerStopped),
4178            "expected ServerStopped, got {err:?}"
4179        );
4180
4181        crate::test_complete!("cast_to_stopped_server_returns_error");
4182    }
4183
4184    /// CastOverflowPolicy Display is correct.
4185    #[test]
4186    fn cast_overflow_policy_display() {
4187        init_test("cast_overflow_policy_display");
4188
4189        assert_eq!(format!("{}", CastOverflowPolicy::Reject), "Reject");
4190        assert_eq!(format!("{}", CastOverflowPolicy::DropOldest), "DropOldest");
4191
4192        crate::test_complete!("cast_overflow_policy_display");
4193    }
4194
4195    /// Reject policy on GenServerRef returns Full when mailbox is full.
4196    #[test]
4197    fn cast_ref_reject_returns_full() {
4198        init_test("cast_ref_reject_returns_full");
4199
4200        let mut state = RuntimeState::new();
4201        let root = state.create_root_region(Budget::INFINITE);
4202        let cx = Cx::for_testing();
4203        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
4204
4205        let (handle, stored) = scope
4206            .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 2)
4207            .unwrap();
4208        state.store_spawned_task(handle.task_id(), stored);
4209
4210        let server_ref = handle.server_ref();
4211
4212        // Fill the mailbox via server_ref
4213        server_ref.try_cast(CounterCast::Reset).unwrap();
4214        server_ref.try_cast(CounterCast::Reset).unwrap();
4215
4216        // Third should fail with Full
4217        let err = server_ref.try_cast(CounterCast::Reset).unwrap_err();
4218        assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
4219
4220        crate::test_complete!("cast_ref_reject_returns_full");
4221    }
4222
4223    /// DropOldest via GenServerRef with capacity=1 evicts correctly.
4224    #[test]
4225    fn cast_drop_oldest_ref_capacity_one() {
4226        init_test("cast_drop_oldest_ref_capacity_one");
4227
4228        let mut state = RuntimeState::new();
4229        let root = state.create_root_region(Budget::INFINITE);
4230        let cx = Cx::for_testing();
4231        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
4232
4233        let (handle, stored) = scope
4234            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
4235            .unwrap();
4236        state.store_spawned_task(handle.task_id(), stored);
4237
4238        let server_ref = handle.server_ref();
4239
4240        // Fill with one message
4241        server_ref.try_cast(TaggedCast::Set(100)).unwrap();
4242        // Evict and replace — should succeed with capacity=1
4243        server_ref.try_cast(TaggedCast::Set(200)).unwrap();
4244        server_ref.try_cast(TaggedCast::Set(300)).unwrap();
4245
4246        crate::test_complete!("cast_drop_oldest_ref_capacity_one");
4247    }
4248
4249    // ── Init/Terminate semantics (bd-3ejoi) ──────────────────────────
4250
4251    #[test]
4252    fn init_default_budget_is_infinite() {
4253        init_test("init_default_budget_is_infinite");
4254        let counter = Counter { count: 0 };
4255        assert_eq!(counter.on_start_budget(), Budget::INFINITE);
4256        crate::test_complete!("init_default_budget_is_infinite");
4257    }
4258
4259    #[test]
4260    fn terminate_default_budget_is_minimal() {
4261        init_test("terminate_default_budget_is_minimal");
4262        let counter = Counter { count: 0 };
4263        assert_eq!(counter.on_stop_budget(), Budget::MINIMAL);
4264        crate::test_complete!("terminate_default_budget_is_minimal");
4265    }
4266
4267    /// If the task is cancelled before init, on_start is skipped but on_stop
4268    /// still runs (deterministic cleanup guarantee).
4269    #[test]
4270    fn init_skipped_when_pre_cancelled_but_stop_runs() {
4271        #[allow(clippy::items_after_statements)]
4272        struct LifecycleProbe {
4273            init_ran: Arc<AtomicU8>,
4274            stop_ran: Arc<AtomicU8>,
4275        }
4276
4277        #[allow(clippy::items_after_statements)]
4278        impl GenServer for LifecycleProbe {
4279            type Call = CounterCall;
4280            type Reply = u64;
4281            type Cast = CounterCast;
4282            type Info = SystemMsg;
4283
4284            fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4285                self.init_ran.store(1, Ordering::SeqCst);
4286                Box::pin(async {})
4287            }
4288
4289            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4290                self.stop_ran.store(1, Ordering::SeqCst);
4291                Box::pin(async {})
4292            }
4293
4294            fn handle_call(
4295                &mut self,
4296                _cx: &Cx,
4297                _request: CounterCall,
4298                reply: Reply<u64>,
4299            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4300                let _ = reply.send(0);
4301                Box::pin(async {})
4302            }
4303        }
4304
4305        init_test("init_skipped_when_pre_cancelled_but_stop_runs");
4306
4307        let budget = Budget::new().with_poll_quota(10_000);
4308        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4309        let region = runtime.state.create_root_region(budget);
4310        let cx = Cx::for_testing();
4311        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4312
4313        let init_ran = Arc::new(AtomicU8::new(0));
4314        let stop_ran = Arc::new(AtomicU8::new(0));
4315
4316        let server = LifecycleProbe {
4317            init_ran: Arc::clone(&init_ran),
4318            stop_ran: Arc::clone(&stop_ran),
4319        };
4320
4321        let (handle, stored) = scope
4322            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
4323            .unwrap();
4324        let server_task_id = handle.task_id();
4325        runtime.state.store_spawned_task(server_task_id, stored);
4326
4327        // Cancel BEFORE scheduling (pre-cancel)
4328        handle.stop();
4329
4330        {
4331            runtime.scheduler.lock().schedule(server_task_id, 0);
4332        }
4333        runtime.run_until_quiescent();
4334
4335        // Init should be skipped
4336        assert_eq!(
4337            init_ran.load(Ordering::SeqCst),
4338            0,
4339            "init should be skipped when pre-cancelled"
4340        );
4341        // Stop should still run
4342        assert_eq!(
4343            stop_ran.load(Ordering::SeqCst),
4344            1,
4345            "stop must run even when pre-cancelled"
4346        );
4347
4348        crate::test_complete!("init_skipped_when_pre_cancelled_but_stop_runs");
4349    }
4350
4351    /// Verify that budget consumed during on_start is subtracted from the main
4352    /// task budget when the guard restores.
4353    #[test]
4354    fn init_budget_consumption_propagates_to_main_budget() {
4355        #[allow(clippy::items_after_statements)]
4356        struct BudgetCheckProbe {
4357            loop_quota: Arc<AtomicU64>,
4358        }
4359
4360        #[allow(clippy::items_after_statements)]
4361        impl GenServer for BudgetCheckProbe {
4362            type Call = CounterCall;
4363            type Reply = u64;
4364            type Cast = CounterCast;
4365            type Info = SystemMsg;
4366
4367            fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4368                // "Consume" some budget by using polls
4369                // In practice, the poll_quota is decremented by the runtime
4370                // but we can verify the budget baseline is properly set.
4371                let _ = cx.budget();
4372                Box::pin(async {})
4373            }
4374
4375            fn on_start_budget(&self) -> Budget {
4376                // Tight init budget
4377                Budget::new().with_poll_quota(50).with_priority(200)
4378            }
4379
4380            fn handle_call(
4381                &mut self,
4382                cx: &Cx,
4383                _request: CounterCall,
4384                reply: Reply<u64>,
4385            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4386                // After init, check the remaining budget
4387                self.loop_quota
4388                    .store(u64::from(cx.budget().poll_quota), Ordering::SeqCst);
4389                let _ = reply.send(0);
4390                Box::pin(async {})
4391            }
4392        }
4393
4394        init_test("init_budget_consumption_propagates_to_main_budget");
4395
4396        let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
4397        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4398        let region = runtime.state.create_root_region(budget);
4399        let cx = Cx::for_testing();
4400        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4401
4402        let loop_quota = Arc::new(AtomicU64::new(0));
4403
4404        let server = BudgetCheckProbe {
4405            loop_quota: Arc::clone(&loop_quota),
4406        };
4407
4408        let (handle, stored) = scope
4409            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
4410            .unwrap();
4411        let server_task_id = handle.task_id();
4412        runtime.state.store_spawned_task(server_task_id, stored);
4413
4414        let server_ref = handle.server_ref();
4415        let (client, stored_client) = scope
4416            .spawn(&mut runtime.state, &cx, move |cx| async move {
4417                let _ = server_ref.call(&cx, CounterCall::Get).await;
4418            })
4419            .unwrap();
4420        let client_task_id = client.task_id();
4421        runtime
4422            .state
4423            .store_spawned_task(client_task_id, stored_client);
4424
4425        {
4426            runtime.scheduler.lock().schedule(server_task_id, 0);
4427        }
4428        {
4429            runtime.scheduler.lock().schedule(client_task_id, 0);
4430        }
4431        runtime.run_until_quiescent();
4432
4433        // After init, the main budget should have the original quota minus
4434        // whatever was consumed during init. It should be <= 10_000.
4435        let remaining = loop_quota.load(Ordering::SeqCst);
4436        assert!(
4437            remaining <= 10_000,
4438            "main budget after init must be <= original ({remaining} <= 10000)"
4439        );
4440        assert!(
4441            remaining > 0,
4442            "main budget should still have polls remaining"
4443        );
4444
4445        drop(handle);
4446        {
4447            runtime.scheduler.lock().schedule(server_task_id, 0);
4448        }
4449        runtime.run_until_quiescent();
4450
4451        crate::test_complete!("init_budget_consumption_propagates_to_main_budget");
4452    }
4453
4454    /// Verify on_stop_budget tightens the budget during the stop phase.
4455    #[test]
4456    fn stop_budget_constrains_stop_phase() {
4457        struct StopBudgetProbe {
4458            stop_poll_quota: Arc<AtomicU64>,
4459        }
4460
4461        impl GenServer for StopBudgetProbe {
4462            type Call = CounterCall;
4463            type Reply = u64;
4464            type Cast = CounterCast;
4465            type Info = SystemMsg;
4466
4467            fn on_stop_budget(&self) -> Budget {
4468                Budget::new().with_poll_quota(42).with_priority(250)
4469            }
4470
4471            fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4472                self.stop_poll_quota
4473                    .store(u64::from(cx.budget().poll_quota), Ordering::SeqCst);
4474                Box::pin(async {})
4475            }
4476
4477            fn handle_call(
4478                &mut self,
4479                _cx: &Cx,
4480                _request: CounterCall,
4481                reply: Reply<u64>,
4482            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4483                let _ = reply.send(0);
4484                Box::pin(async {})
4485            }
4486        }
4487
4488        init_test("stop_budget_constrains_stop_phase");
4489
4490        let budget = Budget::new().with_poll_quota(10_000);
4491        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4492        let region = runtime.state.create_root_region(budget);
4493        let cx = Cx::for_testing();
4494        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4495
4496        let stop_poll_quota = Arc::new(AtomicU64::new(0));
4497
4498        let server = StopBudgetProbe {
4499            stop_poll_quota: Arc::clone(&stop_poll_quota),
4500        };
4501
4502        let (handle, stored) = scope
4503            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
4504            .unwrap();
4505        let server_task_id = handle.task_id();
4506        runtime.state.store_spawned_task(server_task_id, stored);
4507
4508        // Trigger stop
4509        handle.stop();
4510
4511        {
4512            runtime.scheduler.lock().schedule(server_task_id, 0);
4513        }
4514        runtime.run_until_quiescent();
4515
4516        let stop_quota = stop_poll_quota.load(Ordering::SeqCst);
4517        // The stop budget is meet(original, on_stop_budget), so
4518        // poll_quota should be min(10_000, 42) = 42
4519        assert_eq!(stop_quota, 42, "stop phase should use the tighter budget");
4520
4521        crate::test_complete!("stop_budget_constrains_stop_phase");
4522    }
4523
4524    /// Verify that init runs before stop, and stop always runs even on
4525    /// immediate shutdown.
4526    #[test]
4527    fn lifecycle_init_before_stop() {
4528        #[allow(clippy::items_after_statements)]
4529        struct PhaseTracker {
4530            phases: Arc<Mutex<Vec<&'static str>>>,
4531        }
4532
4533        #[allow(clippy::items_after_statements)]
4534        impl GenServer for PhaseTracker {
4535            type Call = CounterCall;
4536            type Reply = u64;
4537            type Cast = CounterCast;
4538            type Info = SystemMsg;
4539
4540            fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4541                self.phases.lock().push("init");
4542                Box::pin(async {})
4543            }
4544
4545            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4546                self.phases.lock().push("stop");
4547                Box::pin(async {})
4548            }
4549
4550            fn handle_call(
4551                &mut self,
4552                _cx: &Cx,
4553                _request: CounterCall,
4554                reply: Reply<u64>,
4555            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4556                let _ = reply.send(0);
4557                Box::pin(async {})
4558            }
4559
4560            fn handle_cast(
4561                &mut self,
4562                _cx: &Cx,
4563                _msg: CounterCast,
4564            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4565                Box::pin(async {})
4566            }
4567        }
4568
4569        init_test("lifecycle_init_before_stop");
4570
4571        let budget = Budget::new().with_poll_quota(10_000);
4572        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4573        let region = runtime.state.create_root_region(budget);
4574        let cx = Cx::for_testing();
4575        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4576
4577        let phases = Arc::new(Mutex::new(Vec::<&'static str>::new()));
4578
4579        let server = PhaseTracker {
4580            phases: Arc::clone(&phases),
4581        };
4582
4583        let (handle, stored) = scope
4584            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
4585            .unwrap();
4586        let server_task_id = handle.task_id();
4587        runtime.state.store_spawned_task(server_task_id, stored);
4588
4589        // Schedule the server so init runs, then idle on recv
4590        {
4591            runtime.scheduler.lock().schedule(server_task_id, 0);
4592        }
4593        runtime.run_until_idle();
4594
4595        // Stop the server and reschedule so on_stop runs
4596        let phases_clone = Arc::clone(&phases);
4597        handle.stop();
4598        {
4599            runtime.scheduler.lock().schedule(server_task_id, 0);
4600        }
4601        runtime.run_until_idle();
4602
4603        {
4604            let recorded = phases_clone.lock();
4605
4606            // Stop must always run
4607            assert!(
4608                recorded.contains(&"stop"),
4609                "stop phase must run, got {:?}",
4610                *recorded
4611            );
4612
4613            // If init ran, it must precede stop
4614            if let Some(init_pos) = recorded.iter().position(|p| *p == "init") {
4615                let stop_pos = recorded.iter().position(|p| *p == "stop").unwrap();
4616                assert!(
4617                    init_pos < stop_pos,
4618                    "init must precede stop, got {:?}",
4619                    *recorded
4620                );
4621            }
4622
4623            drop(recorded);
4624        }
4625
4626        crate::test_complete!("lifecycle_init_before_stop");
4627    }
4628
4629    /// Verify that on_stop_budget with a custom priority overrides the
4630    /// budget priority during the stop phase.
4631    #[test]
4632    fn stop_budget_priority_applied() {
4633        #[allow(clippy::items_after_statements)]
4634        struct StopPriorityProbe {
4635            stop_priority: Arc<AtomicU8>,
4636        }
4637
4638        #[allow(clippy::items_after_statements)]
4639        impl GenServer for StopPriorityProbe {
4640            type Call = CounterCall;
4641            type Reply = u64;
4642            type Cast = CounterCast;
4643            type Info = SystemMsg;
4644
4645            fn on_stop_budget(&self) -> Budget {
4646                Budget::new().with_poll_quota(200).with_priority(240)
4647            }
4648
4649            fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4650                self.stop_priority
4651                    .store(cx.budget().priority, Ordering::SeqCst);
4652                Box::pin(async {})
4653            }
4654
4655            fn handle_call(
4656                &mut self,
4657                _cx: &Cx,
4658                _request: CounterCall,
4659                reply: Reply<u64>,
4660            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4661                let _ = reply.send(0);
4662                Box::pin(async {})
4663            }
4664        }
4665
4666        init_test("stop_budget_priority_applied");
4667
4668        let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
4669        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4670        let region = runtime.state.create_root_region(budget);
4671        let cx = Cx::for_testing();
4672        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4673
4674        let stop_priority = Arc::new(AtomicU8::new(0));
4675
4676        let server = StopPriorityProbe {
4677            stop_priority: Arc::clone(&stop_priority),
4678        };
4679
4680        let (handle, stored) = scope
4681            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
4682            .unwrap();
4683        let server_task_id = handle.task_id();
4684        runtime.state.store_spawned_task(server_task_id, stored);
4685
4686        handle.stop();
4687
4688        {
4689            runtime.scheduler.lock().schedule(server_task_id, 0);
4690        }
4691        runtime.run_until_quiescent();
4692
4693        // priority = max(original=10, stop_budget=240) after meet
4694        // meet takes min for quotas but max for priority
4695        let actual_priority = stop_priority.load(Ordering::SeqCst);
4696        assert!(
4697            actual_priority >= 10,
4698            "stop priority should be at least original ({actual_priority} >= 10)"
4699        );
4700
4701        crate::test_complete!("stop_budget_priority_applied");
4702    }
4703
4704    // ========================================================================
4705    // Conformance + Lab Tests (bd-l6b71)
4706    //
4707    // These tests verify the GenServer conformance suite:
4708    //   - reply linearity (obligation enforcement)
4709    //   - cancel propagation through call/cast
4710    //   - mailbox overflow determinism
4711    //   - full lifecycle with no obligation leaks
4712    //   - deterministic replay (same seed = same outcome)
4713    // ========================================================================
4714
4715    /// Multiple queued calls all receive `Cancelled` when the server's region
4716    /// is cancelled. Verifies cancel propagation to pending call waiters.
4717    #[test]
4718    fn conformance_cancel_propagation_to_queued_calls() {
4719        init_test("conformance_cancel_propagation_to_queued_calls");
4720
4721        let budget = Budget::new().with_poll_quota(50_000);
4722        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4723        let region = runtime.state.create_root_region(budget);
4724        let cx = Cx::for_testing();
4725        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4726
4727        // Capacity-1 mailbox: second call will block waiting for capacity.
4728        let (handle, stored) = scope
4729            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 1)
4730            .unwrap();
4731        let server_task_id = handle.task_id();
4732        runtime.state.store_spawned_task(server_task_id, stored);
4733
4734        let server_ref_1 = handle.server_ref();
4735        let server_ref_2 = handle.server_ref();
4736
4737        // Client 1: sends a call that the server will process.
4738        let result_1: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
4739        let result_1_clone = Arc::clone(&result_1);
4740        let (c1_handle, c1_stored) = scope
4741            .spawn(&mut runtime.state, &cx, move |cx| async move {
4742                let r = server_ref_1.call(&cx, CounterCall::Add(10)).await;
4743                *result_1_clone.lock() = Some(r);
4744            })
4745            .unwrap();
4746        let c1_id = c1_handle.task_id();
4747        runtime.state.store_spawned_task(c1_id, c1_stored);
4748
4749        // Client 2: sends a call that will queue behind client 1.
4750        let result_2: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
4751        let result_2_clone = Arc::clone(&result_2);
4752        let (c2_handle, c2_stored) = scope
4753            .spawn(&mut runtime.state, &cx, move |cx| async move {
4754                let r = server_ref_2.call(&cx, CounterCall::Add(20)).await;
4755                *result_2_clone.lock() = Some(r);
4756            })
4757            .unwrap();
4758        let c2_id = c2_handle.task_id();
4759        runtime.state.store_spawned_task(c2_id, c2_stored);
4760
4761        // Schedule server + clients, let them make progress.
4762        {
4763            let mut sched = runtime.scheduler.lock();
4764            sched.schedule(server_task_id, 0);
4765            sched.schedule(c1_id, 0);
4766            sched.schedule(c2_id, 0);
4767        }
4768        runtime.run_until_idle();
4769
4770        // Stop the server (triggers cancellation of pending calls).
4771        handle.stop();
4772        {
4773            let mut sched = runtime.scheduler.lock();
4774            sched.schedule(server_task_id, 0);
4775            sched.schedule(c1_id, 0);
4776            sched.schedule(c2_id, 0);
4777        }
4778        runtime.run_until_quiescent();
4779
4780        // At least one client should have seen an error (ServerStopped or Cancelled)
4781        // because the server shut down. The first call may have succeeded before stop.
4782        // All outcomes are acceptable: Ok (processed before stop), ServerStopped,
4783        // Cancelled, or NoReply.
4784        drop(result_2.lock());
4785
4786        crate::test_complete!("conformance_cancel_propagation_to_queued_calls");
4787    }
4788
4789    /// After stop(), new calls and casts are rejected immediately.
4790    #[test]
4791    fn conformance_stopped_server_rejects_new_messages() {
4792        init_test("conformance_stopped_server_rejects_new_messages");
4793
4794        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4795        let region = runtime.state.create_root_region(Budget::INFINITE);
4796        let cx = Cx::for_testing();
4797        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
4798
4799        let (handle, stored) = scope
4800            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
4801            .unwrap();
4802        let server_task_id = handle.task_id();
4803        runtime.state.store_spawned_task(server_task_id, stored);
4804
4805        let server_ref = handle.server_ref();
4806
4807        // Start the server so init runs.
4808        {
4809            runtime.scheduler.lock().schedule(server_task_id, 0);
4810        }
4811        runtime.run_until_idle();
4812
4813        // Stop the server and drain.
4814        handle.stop();
4815        {
4816            runtime.scheduler.lock().schedule(server_task_id, 0);
4817        }
4818        runtime.run_until_quiescent();
4819
4820        // try_cast to a stopped server should fail.
4821        let cast_result = server_ref.try_cast(CounterCast::Reset);
4822        assert!(cast_result.is_err(), "cast to stopped server must fail");
4823
4824        crate::test_complete!("conformance_stopped_server_rejects_new_messages");
4825    }
4826
4827    /// Full lifecycle test: start, send calls+casts, stop, verify no leaked
4828    /// obligations or unprocessed messages. This exercises the complete
4829    /// GenServer protocol end-to-end.
4830    #[test]
4831    fn conformance_full_lifecycle_no_obligation_leaks() {
4832        init_test("conformance_full_lifecycle_no_obligation_leaks");
4833
4834        let budget = Budget::new().with_poll_quota(100_000);
4835        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4836        let region = runtime.state.create_root_region(budget);
4837        let cx = Cx::for_testing();
4838        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4839
4840        let (handle, stored) = scope
4841            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
4842            .unwrap();
4843        let server_task_id = handle.task_id();
4844        runtime.state.store_spawned_task(server_task_id, stored);
4845
4846        let server_ref = handle.server_ref();
4847
4848        // Phase 1: Fire off a mix of casts and then a call.
4849        server_ref.try_cast(CounterCast::Reset).unwrap();
4850
4851        let call_result: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
4852        let call_result_clone = Arc::clone(&call_result);
4853        let server_ref_for_call = handle.server_ref();
4854        let (client, client_stored) = scope
4855            .spawn(&mut runtime.state, &cx, move |cx| async move {
4856                let r = server_ref_for_call.call(&cx, CounterCall::Add(42)).await;
4857                *call_result_clone.lock() = Some(r);
4858            })
4859            .unwrap();
4860        let client_id = client.task_id();
4861        runtime.state.store_spawned_task(client_id, client_stored);
4862
4863        // Schedule both and let them process.
4864        {
4865            let mut sched = runtime.scheduler.lock();
4866            sched.schedule(server_task_id, 0);
4867            sched.schedule(client_id, 0);
4868        }
4869        runtime.run_until_idle();
4870
4871        // Re-schedule for message processing.
4872        {
4873            let mut sched = runtime.scheduler.lock();
4874            sched.schedule(server_task_id, 0);
4875            sched.schedule(client_id, 0);
4876        }
4877        runtime.run_until_idle();
4878
4879        // Phase 2: Verify the call result.
4880        let call_r = call_result.lock();
4881        if let Some(ref r) = *call_r {
4882            match r {
4883                Ok(value) => assert_eq!(*value, 42, "counter should be 42 after Add(42)"),
4884                Err(e) => unreachable!("unexpected call error: {e:?}"),
4885            }
4886        }
4887        drop(call_r);
4888
4889        // Phase 3: More casts to exercise the mailbox.
4890        server_ref.try_cast(CounterCast::Reset).unwrap();
4891
4892        // Phase 4: Graceful stop.
4893        handle.stop();
4894        {
4895            runtime.scheduler.lock().schedule(server_task_id, 0);
4896        }
4897        runtime.run_until_quiescent();
4898
4899        // If we get here without panics, no obligations were leaked.
4900        // TrackedOneshotPermit panics on drop if not consumed.
4901        crate::test_complete!("conformance_full_lifecycle_no_obligation_leaks");
4902    }
4903
4904    /// Deterministic replay: running the same GenServer scenario with the
4905    /// same seed must produce identical state transitions.
4906    #[test]
4907    #[allow(clippy::items_after_statements)]
4908    fn conformance_deterministic_replay_with_seed() {
4909        init_test("conformance_deterministic_replay_with_seed");
4910
4911        fn run_scenario(seed: u64) -> Vec<u64> {
4912            let config = crate::lab::LabConfig::new(seed);
4913            let mut runtime = crate::lab::LabRuntime::new(config);
4914            let budget = Budget::new().with_poll_quota(100_000);
4915            let region = runtime.state.create_root_region(budget);
4916            let cx = Cx::for_testing();
4917            let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4918
4919            let (handle, stored) = scope
4920                .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
4921                .unwrap();
4922            let server_task_id = handle.task_id();
4923            runtime.state.store_spawned_task(server_task_id, stored);
4924
4925            let results: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));
4926
4927            // Spawn 3 clients that each Add different amounts.
4928            let mut client_ids = Vec::new();
4929            for i in 1..=3u64 {
4930                let server_ref = handle.server_ref();
4931                let results_clone = Arc::clone(&results);
4932                let (ch, cs) = scope
4933                    .spawn(&mut runtime.state, &cx, move |cx| async move {
4934                        if let Ok(val) = server_ref.call(&cx, CounterCall::Add(i * 10)).await {
4935                            results_clone.lock().push(val);
4936                        }
4937                    })
4938                    .unwrap();
4939                let cid = ch.task_id();
4940                runtime.state.store_spawned_task(cid, cs);
4941                client_ids.push(cid);
4942            }
4943
4944            // Schedule all tasks.
4945            {
4946                let mut sched = runtime.scheduler.lock();
4947                sched.schedule(server_task_id, 0);
4948                for &cid in &client_ids {
4949                    sched.schedule(cid, 0);
4950                }
4951            }
4952            runtime.run_until_idle();
4953
4954            // Re-schedule to process enqueued calls.
4955            {
4956                let mut sched = runtime.scheduler.lock();
4957                sched.schedule(server_task_id, 0);
4958                for &cid in &client_ids {
4959                    sched.schedule(cid, 0);
4960                }
4961            }
4962            runtime.run_until_idle();
4963
4964            // Stop and drain.
4965            handle.stop();
4966            {
4967                runtime.scheduler.lock().schedule(server_task_id, 0);
4968            }
4969            runtime.run_until_quiescent();
4970
4971            results.lock().clone()
4972        }
4973
4974        // Same seed must produce identical results.
4975        let run_a = run_scenario(42);
4976        let run_b = run_scenario(42);
4977        assert_eq!(
4978            run_a, run_b,
4979            "same seed must produce identical results: {run_a:?} vs {run_b:?}"
4980        );
4981
4982        crate::test_complete!("conformance_deterministic_replay_with_seed");
4983    }
4984
4985    /// Mailbox overflow with Reject policy: deterministic rejection when full.
4986    #[test]
4987    fn conformance_mailbox_overflow_reject_deterministic() {
4988        init_test("conformance_mailbox_overflow_reject_deterministic");
4989
4990        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4991        let region = runtime.state.create_root_region(Budget::INFINITE);
4992        let cx = Cx::for_testing();
4993        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
4994
4995        // Capacity-2 mailbox with default Reject policy.
4996        let (handle, stored) = scope
4997            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 2)
4998            .unwrap();
4999        let server_task_id = handle.task_id();
5000        runtime.state.store_spawned_task(server_task_id, stored);
5001
5002        let server_ref = handle.server_ref();
5003
5004        // Fill the mailbox to capacity.
5005        server_ref.try_cast(CounterCast::Reset).unwrap();
5006        server_ref.try_cast(CounterCast::Reset).unwrap();
5007
5008        // Third cast must be rejected (mailbox full, Reject policy).
5009        let overflow = server_ref.try_cast(CounterCast::Reset);
5010        assert!(
5011            overflow.is_err(),
5012            "third cast to capacity-2 mailbox must fail with Reject policy"
5013        );
5014        match overflow.unwrap_err() {
5015            CastError::Full => { /* expected */ }
5016            other => unreachable!("expected CastError::Full, got {other:?}"),
5017        }
5018
5019        // Drain and cleanup.
5020        drop(handle);
5021        {
5022            runtime.scheduler.lock().schedule(server_task_id, 0);
5023        }
5024        runtime.run_until_quiescent();
5025
5026        crate::test_complete!("conformance_mailbox_overflow_reject_deterministic");
5027    }
5028
5029    /// DropOldest eviction preserves the newest messages when full.
5030    #[test]
5031    fn conformance_mailbox_drop_oldest_preserves_newest() {
5032        init_test("conformance_mailbox_drop_oldest_preserves_newest");
5033
5034        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
5035        let region = runtime.state.create_root_region(Budget::INFINITE);
5036        let cx = Cx::for_testing();
5037        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
5038
5039        // DropOldest counter with capacity 2.
5040        let (handle, stored) = scope
5041            .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 2)
5042            .unwrap();
5043        let server_task_id = handle.task_id();
5044        runtime.state.store_spawned_task(server_task_id, stored);
5045
5046        let server_ref = handle.server_ref();
5047
5048        // Fill mailbox with Set(1) and Set(2).
5049        server_ref.try_cast(TaggedCast::Set(1)).unwrap();
5050        server_ref.try_cast(TaggedCast::Set(2)).unwrap();
5051
5052        // Overflow with Set(100) — should evict Set(1), keeping Set(2) and Set(100).
5053        server_ref.try_cast(TaggedCast::Set(100)).unwrap();
5054
5055        // Process all messages.
5056        {
5057            runtime.scheduler.lock().schedule(server_task_id, 0);
5058        }
5059        runtime.run_until_idle();
5060
5061        // The final value should be 100 (last Set wins).
5062        let result_ref = handle.server_ref();
5063        let result: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
5064        let result_clone = Arc::clone(&result);
5065        let (ch, cs) = scope
5066            .spawn(&mut runtime.state, &cx, move |cx| async move {
5067                if let Ok(val) = result_ref.call(&cx, CounterCall::Get).await {
5068                    *result_clone.lock() = Some(val);
5069                }
5070            })
5071            .unwrap();
5072        let cid = ch.task_id();
5073        runtime.state.store_spawned_task(cid, cs);
5074
5075        {
5076            let mut sched = runtime.scheduler.lock();
5077            sched.schedule(server_task_id, 0);
5078            sched.schedule(cid, 0);
5079        }
5080        runtime.run_until_idle();
5081        {
5082            let mut sched = runtime.scheduler.lock();
5083            sched.schedule(server_task_id, 0);
5084            sched.schedule(cid, 0);
5085        }
5086        runtime.run_until_idle();
5087
5088        // The server should have processed Set(2), then Set(100).
5089        // Set(1) was evicted. Final count = 100.
5090        assert_eq!(
5091            *result.lock(),
5092            Some(100),
5093            "DropOldest should evict oldest, keeping newest"
5094        );
5095
5096        drop(handle);
5097        {
5098            runtime.scheduler.lock().schedule(server_task_id, 0);
5099        }
5100        runtime.run_until_quiescent();
5101
5102        crate::test_complete!("conformance_mailbox_drop_oldest_preserves_newest");
5103    }
5104
5105    /// Budget-driven timeout: a call with a tight poll_quota budget
5106    /// must terminate deterministically without wall-clock dependence.
5107    #[test]
5108    #[allow(clippy::items_after_statements)]
5109    fn conformance_budget_driven_call_timeout() {
5110        // Server that never replies (intentionally leaves reply unconsumed
5111        // by aborting it, which is the correct way to not reply).
5112        struct SlowServer;
5113        impl GenServer for SlowServer {
5114            type Call = ();
5115            type Reply = ();
5116            type Cast = ();
5117            type Info = SystemMsg;
5118
5119            fn handle_call(
5120                &mut self,
5121                _cx: &Cx,
5122                _request: (),
5123                reply: Reply<()>,
5124            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5125                // Abort the reply obligation (correct: no leak).
5126                let _proof = reply.abort();
5127                Box::pin(async {})
5128            }
5129        }
5130
5131        init_test("conformance_budget_driven_call_timeout");
5132
5133        let budget = Budget::new().with_poll_quota(100_000);
5134        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
5135        let region = runtime.state.create_root_region(budget);
5136        let cx = Cx::for_testing();
5137        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
5138
5139        let (handle, stored) = scope
5140            .spawn_gen_server(&mut runtime.state, &cx, SlowServer, 32)
5141            .unwrap();
5142        let server_task_id = handle.task_id();
5143        runtime.state.store_spawned_task(server_task_id, stored);
5144
5145        let server_ref = handle.server_ref();
5146
5147        // Client calls the server. The server aborts the reply, so the
5148        // client should see a channel close / error.
5149        let call_result: Arc<Mutex<Option<Result<(), CallError>>>> = Arc::new(Mutex::new(None));
5150        let call_result_clone = Arc::clone(&call_result);
5151        let (ch, cs) = scope
5152            .spawn(&mut runtime.state, &cx, move |cx| async move {
5153                let r = server_ref.call(&cx, ()).await;
5154                *call_result_clone.lock() = Some(r);
5155            })
5156            .unwrap();
5157        let client_id = ch.task_id();
5158        runtime.state.store_spawned_task(client_id, cs);
5159
5160        // Run everything.
5161        {
5162            let mut sched = runtime.scheduler.lock();
5163            sched.schedule(server_task_id, 0);
5164            sched.schedule(client_id, 0);
5165        }
5166        runtime.run_until_idle();
5167        {
5168            let mut sched = runtime.scheduler.lock();
5169            sched.schedule(server_task_id, 0);
5170            sched.schedule(client_id, 0);
5171        }
5172        runtime.run_until_idle();
5173
5174        // The client should have received an error since the server aborted.
5175        if let Some(ref result) = *call_result.lock() {
5176            assert!(result.is_err(), "aborted reply should result in call error");
5177        }
5178
5179        // Clean up.
5180        handle.stop();
5181        {
5182            runtime.scheduler.lock().schedule(server_task_id, 0);
5183        }
5184        runtime.run_until_quiescent();
5185
5186        crate::test_complete!("conformance_budget_driven_call_timeout");
5187    }
5188
5189    /// Reply linearity: verify that Reply::send commits the obligation
5190    /// and the committed proof is returned.
5191    #[test]
5192    #[allow(clippy::items_after_statements)]
5193    fn conformance_reply_linearity_send_commits() {
5194        // Server that tracks whether reply was committed.
5195        struct ReplyTracker {
5196            committed: Arc<AtomicU8>,
5197        }
5198
5199        impl GenServer for ReplyTracker {
5200            type Call = u64;
5201            type Reply = u64;
5202            type Cast = ();
5203            type Info = SystemMsg;
5204
5205            fn handle_call(
5206                &mut self,
5207                _cx: &Cx,
5208                request: u64,
5209                reply: Reply<u64>,
5210            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5211                match reply.send(request * 2) {
5212                    ReplyOutcome::Committed(_proof) => {
5213                        self.committed.store(1, Ordering::SeqCst);
5214                    }
5215                    ReplyOutcome::CallerGone => {
5216                        self.committed.store(2, Ordering::SeqCst);
5217                    }
5218                }
5219                Box::pin(async {})
5220            }
5221        }
5222
5223        init_test("conformance_reply_linearity_send_commits");
5224
5225        let budget = Budget::new().with_poll_quota(100_000);
5226        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
5227        let region = runtime.state.create_root_region(budget);
5228        let cx = Cx::for_testing();
5229        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
5230
5231        let committed = Arc::new(AtomicU8::new(0));
5232        let server = ReplyTracker {
5233            committed: Arc::clone(&committed),
5234        };
5235
5236        let (handle, stored) = scope
5237            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
5238            .unwrap();
5239        let server_task_id = handle.task_id();
5240        runtime.state.store_spawned_task(server_task_id, stored);
5241
5242        let server_ref = handle.server_ref();
5243        let call_result: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
5244        let call_result_clone = Arc::clone(&call_result);
5245
5246        let (ch, cs) = scope
5247            .spawn(&mut runtime.state, &cx, move |cx| async move {
5248                let r = server_ref.call(&cx, 21).await;
5249                *call_result_clone.lock() = Some(r);
5250            })
5251            .unwrap();
5252        let client_id = ch.task_id();
5253        runtime.state.store_spawned_task(client_id, cs);
5254
5255        {
5256            let mut sched = runtime.scheduler.lock();
5257            sched.schedule(server_task_id, 0);
5258            sched.schedule(client_id, 0);
5259        }
5260        runtime.run_until_idle();
5261        {
5262            let mut sched = runtime.scheduler.lock();
5263            sched.schedule(server_task_id, 0);
5264            sched.schedule(client_id, 0);
5265        }
5266        runtime.run_until_idle();
5267
5268        // Verify reply was committed (not CallerGone).
5269        assert_eq!(
5270            committed.load(Ordering::SeqCst),
5271            1,
5272            "reply must be committed when caller is waiting"
5273        );
5274
5275        // Verify the caller received the correct value.
5276        {
5277            let r = call_result.lock();
5278            match r.as_ref() {
5279                Some(Ok(value)) => assert_eq!(*value, 42, "21 * 2 = 42"),
5280                other => unreachable!("expected Ok(42), got {other:?}"),
5281            }
5282        }
5283
5284        handle.stop();
5285        {
5286            runtime.scheduler.lock().schedule(server_task_id, 0);
5287        }
5288        runtime.run_until_quiescent();
5289
5290        crate::test_complete!("conformance_reply_linearity_send_commits");
5291    }
5292
5293    /// Reply linearity: verify that Reply::abort produces an AbortedProof
5294    /// and the caller receives an error (not a value).
5295    #[test]
5296    #[allow(clippy::items_after_statements)]
5297    fn conformance_reply_linearity_abort_is_clean() {
5298        struct AbortServer {
5299            aborted: Arc<AtomicU8>,
5300        }
5301
5302        impl GenServer for AbortServer {
5303            type Call = ();
5304            type Reply = ();
5305            type Cast = ();
5306            type Info = SystemMsg;
5307
5308            fn handle_call(
5309                &mut self,
5310                _cx: &Cx,
5311                _request: (),
5312                reply: Reply<()>,
5313            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5314                let _proof = reply.abort();
5315                self.aborted.store(1, Ordering::SeqCst);
5316                Box::pin(async {})
5317            }
5318        }
5319
5320        init_test("conformance_reply_linearity_abort_is_clean");
5321
5322        let budget = Budget::new().with_poll_quota(100_000);
5323        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
5324        let region = runtime.state.create_root_region(budget);
5325        let cx = Cx::for_testing();
5326        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
5327
5328        let aborted = Arc::new(AtomicU8::new(0));
5329        let server = AbortServer {
5330            aborted: Arc::clone(&aborted),
5331        };
5332
5333        let (handle, stored) = scope
5334            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
5335            .unwrap();
5336        let server_task_id = handle.task_id();
5337        runtime.state.store_spawned_task(server_task_id, stored);
5338
5339        let server_ref = handle.server_ref();
5340        let call_err: Arc<Mutex<Option<Result<(), CallError>>>> = Arc::new(Mutex::new(None));
5341        let call_err_clone = Arc::clone(&call_err);
5342
5343        let (ch, cs) = scope
5344            .spawn(&mut runtime.state, &cx, move |cx| async move {
5345                let r = server_ref.call(&cx, ()).await;
5346                *call_err_clone.lock() = Some(r);
5347            })
5348            .unwrap();
5349        let client_id = ch.task_id();
5350        runtime.state.store_spawned_task(client_id, cs);
5351
5352        {
5353            let mut sched = runtime.scheduler.lock();
5354            sched.schedule(server_task_id, 0);
5355            sched.schedule(client_id, 0);
5356        }
5357        runtime.run_until_idle();
5358        {
5359            let mut sched = runtime.scheduler.lock();
5360            sched.schedule(server_task_id, 0);
5361            sched.schedule(client_id, 0);
5362        }
5363        runtime.run_until_idle();
5364
5365        // Server should have aborted.
5366        assert_eq!(
5367            aborted.load(Ordering::SeqCst),
5368            1,
5369            "server must have called abort()"
5370        );
5371
5372        // Caller should see an error, not Ok.
5373        {
5374            let r = call_err.lock();
5375            match r.as_ref() {
5376                Some(Err(_)) => { /* expected: aborted reply -> error */ }
5377                other => unreachable!("expected call error after abort, got {other:?}"),
5378            }
5379        }
5380
5381        handle.stop();
5382        {
5383            runtime.scheduler.lock().schedule(server_task_id, 0);
5384        }
5385        runtime.run_until_quiescent();
5386
5387        crate::test_complete!("conformance_reply_linearity_abort_is_clean");
5388    }
5389
5390    #[test]
5391    #[allow(clippy::items_after_statements)]
5392    fn conformance_panicking_handle_call_returns_join_error_without_double_panic() {
5393        #[derive(Debug)]
5394        struct PanicOnCall;
5395
5396        impl GenServer for PanicOnCall {
5397            type Call = ();
5398            type Reply = ();
5399            type Cast = ();
5400            type Info = SystemMsg;
5401
5402            fn handle_call(
5403                &mut self,
5404                _cx: &Cx,
5405                _request: (),
5406                _reply: Reply<()>,
5407            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5408                std::panic::panic_any("intentional handle_call panic");
5409            }
5410        }
5411
5412        init_test("conformance_panicking_handle_call_returns_join_error_without_double_panic");
5413
5414        let budget = Budget::new().with_poll_quota(100_000);
5415        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
5416        let region = runtime.state.create_root_region(budget);
5417        let cx = Cx::for_testing();
5418        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
5419
5420        let (mut handle, stored) = scope
5421            .spawn_gen_server(&mut runtime.state, &cx, PanicOnCall, 32)
5422            .unwrap();
5423        let server_task_id = handle.task_id();
5424        runtime.state.store_spawned_task(server_task_id, stored);
5425
5426        let server_ref = handle.server_ref();
5427        let (mut client_handle, client_stored) = scope
5428            .spawn(&mut runtime.state, &cx, move |cx| async move {
5429                server_ref.call(&cx, ()).await
5430            })
5431            .unwrap();
5432        let client_task_id = client_handle.task_id();
5433        runtime
5434            .state
5435            .store_spawned_task(client_task_id, client_stored);
5436
5437        {
5438            let mut sched = runtime.scheduler.lock();
5439            sched.schedule(server_task_id, 0);
5440            sched.schedule(client_task_id, 0);
5441        }
5442        runtime.run_until_idle();
5443        {
5444            let mut sched = runtime.scheduler.lock();
5445            sched.schedule(server_task_id, 0);
5446            sched.schedule(client_task_id, 0);
5447        }
5448        runtime.run_until_idle();
5449
5450        let join = futures_lite::future::block_on(handle.join(&cx));
5451        assert!(
5452            matches!(join, Err(JoinError::Panicked(_))),
5453            "panicking call handler should surface JoinError::Panicked"
5454        );
5455
5456        let client_result =
5457            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
5458        assert!(
5459            matches!(client_result, Err(CallError::NoReply)),
5460            "caller should observe closed reply after panic, got {client_result:?}"
5461        );
5462
5463        crate::test_complete!(
5464            "conformance_panicking_handle_call_returns_join_error_without_double_panic"
5465        );
5466    }
5467
5468    /// On-stop processes remaining casts before completing (drain semantics).
5469    /// Verifies that queued casts are not silently dropped on shutdown.
5470    #[test]
5471    #[allow(clippy::items_after_statements)]
5472    fn conformance_drain_processes_queued_casts_on_stop() {
5473        struct AccumulatorServer {
5474            sum: u64,
5475            final_sum: Arc<AtomicU64>,
5476        }
5477
5478        enum AccumCall {
5479            #[allow(dead_code)]
5480            GetSum,
5481        }
5482        enum AccumCast {
5483            Add(u64),
5484        }
5485
5486        impl GenServer for AccumulatorServer {
5487            type Call = AccumCall;
5488            type Reply = u64;
5489            type Cast = AccumCast;
5490            type Info = SystemMsg;
5491
5492            fn handle_call(
5493                &mut self,
5494                _cx: &Cx,
5495                _request: AccumCall,
5496                reply: Reply<u64>,
5497            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5498                let _ = reply.send(self.sum);
5499                Box::pin(async {})
5500            }
5501
5502            fn handle_cast(
5503                &mut self,
5504                _cx: &Cx,
5505                msg: AccumCast,
5506            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5507                match msg {
5508                    AccumCast::Add(n) => self.sum += n,
5509                }
5510                Box::pin(async {})
5511            }
5512
5513            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5514                self.final_sum.store(self.sum, Ordering::SeqCst);
5515                Box::pin(async {})
5516            }
5517        }
5518
5519        init_test("conformance_drain_processes_queued_casts_on_stop");
5520
5521        let budget = Budget::new().with_poll_quota(100_000);
5522        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
5523        let region = runtime.state.create_root_region(budget);
5524        let cx = Cx::for_testing();
5525        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
5526
5527        let final_sum = Arc::new(AtomicU64::new(0));
5528        let server = AccumulatorServer {
5529            sum: 0,
5530            final_sum: Arc::clone(&final_sum),
5531        };
5532
5533        let (handle, stored) = scope
5534            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
5535            .unwrap();
5536        let server_task_id = handle.task_id();
5537        runtime.state.store_spawned_task(server_task_id, stored);
5538
5539        let server_ref = handle.server_ref();
5540
5541        // Queue up several casts.
5542        server_ref.try_cast(AccumCast::Add(10)).unwrap();
5543        server_ref.try_cast(AccumCast::Add(20)).unwrap();
5544        server_ref.try_cast(AccumCast::Add(30)).unwrap();
5545
5546        // Start the server (init runs, then it will process casts).
5547        {
5548            runtime.scheduler.lock().schedule(server_task_id, 0);
5549        }
5550        runtime.run_until_idle();
5551
5552        // Stop and let it drain remaining messages.
5553        handle.stop();
5554        {
5555            runtime.scheduler.lock().schedule(server_task_id, 0);
5556        }
5557        runtime.run_until_quiescent();
5558
5559        // The server should have processed all casts before stopping.
5560        let sum = final_sum.load(Ordering::SeqCst);
5561        assert_eq!(
5562            sum, 60,
5563            "server must drain queued casts before stopping: 10+20+30=60, got {sum}"
5564        );
5565
5566        crate::test_complete!("conformance_drain_processes_queued_casts_on_stop");
5567    }
5568
5569    // =========================================================================
5570    // Named GenServer integration tests (bd-23az1)
5571    // =========================================================================
5572
5573    /// Named server: spawn registers name, whereis finds it.
5574    #[test]
5575    fn named_server_register_and_whereis() {
5576        crate::test_utils::init_test_logging();
5577        crate::test_phase!("named_server_register_and_whereis");
5578
5579        let budget = Budget::new().with_poll_quota(100_000);
5580        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
5581        let region = runtime.state.create_root_region(budget);
5582        let cx = Cx::for_testing();
5583        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
5584        let mut registry = crate::cx::NameRegistry::new();
5585
5586        #[allow(clippy::items_after_statements)]
5587        #[derive(Debug)]
5588        struct Counter(u64);
5589
5590        #[allow(clippy::items_after_statements)]
5591        impl GenServer for Counter {
5592            type Call = u64;
5593            type Reply = u64;
5594            type Cast = ();
5595            type Info = SystemMsg;
5596
5597            fn handle_call(
5598                &mut self,
5599                _cx: &Cx,
5600                request: u64,
5601                reply: Reply<u64>,
5602            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5603                self.0 += request;
5604                let _ = reply.send(self.0);
5605                Box::pin(async {})
5606            }
5607
5608            fn handle_cast(
5609                &mut self,
5610                _cx: &Cx,
5611                _msg: (),
5612            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5613                Box::pin(async {})
5614            }
5615        }
5616
5617        let now = crate::types::Time::ZERO;
5618        let (mut named_handle, stored) = scope
5619            .spawn_named_gen_server(
5620                &mut runtime.state,
5621                &cx,
5622                &mut registry,
5623                "my_counter",
5624                Counter(0),
5625                32,
5626                now,
5627            )
5628            .unwrap();
5629
5630        let task_id = named_handle.task_id();
5631        runtime.state.store_spawned_task(task_id, stored);
5632
5633        // Name should be visible via whereis.
5634        assert_eq!(registry.whereis("my_counter"), Some(task_id));
5635        assert_eq!(named_handle.name(), "my_counter");
5636
5637        // Clean up: stop the task, drive it to completion, then release the name.
5638        named_handle.stop();
5639        {
5640            runtime.scheduler.lock().schedule(task_id, 0);
5641        }
5642        runtime.run_until_quiescent();
5643        let release_now = runtime.state.now;
5644        named_handle
5645            .release_name(&mut registry, release_now)
5646            .expect("release ok");
5647        assert!(
5648            registry.whereis("my_counter").is_none(),
5649            "name must be removed once shutdown completes",
5650        );
5651
5652        let (mut restarted, stored) = scope
5653            .spawn_named_gen_server(
5654                &mut runtime.state,
5655                &cx,
5656                &mut registry,
5657                "my_counter",
5658                Counter(0),
5659                32,
5660                now,
5661            )
5662            .expect("name should be reusable after release_name");
5663        runtime
5664            .state
5665            .store_spawned_task(restarted.task_id(), stored);
5666        restarted.stop();
5667        {
5668            runtime.scheduler.lock().schedule(restarted.task_id(), 0);
5669        }
5670        runtime.run_until_quiescent();
5671        let restart_release_now = runtime.state.now;
5672        restarted
5673            .release_name(&mut registry, restart_release_now)
5674            .expect("restart cleanup ok");
5675
5676        crate::test_complete!("named_server_register_and_whereis");
5677    }
5678
5679    /// Named server: release_name must not remove the name until shutdown fully drains.
5680    #[test]
5681    fn named_server_release_name_requires_stopped_server() {
5682        crate::test_utils::init_test_logging();
5683        crate::test_phase!("named_server_release_name_requires_stopped_server");
5684
5685        let budget = Budget::new().with_poll_quota(100_000);
5686        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
5687        let region = runtime.state.create_root_region(budget);
5688        let cx = Cx::for_testing();
5689        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
5690        let mut registry = crate::cx::NameRegistry::new();
5691
5692        #[allow(clippy::items_after_statements)]
5693        #[derive(Debug)]
5694        struct Noop;
5695
5696        #[allow(clippy::items_after_statements)]
5697        impl GenServer for Noop {
5698            type Call = ();
5699            type Reply = ();
5700            type Cast = ();
5701            type Info = SystemMsg;
5702
5703            fn handle_call(
5704                &mut self,
5705                _cx: &Cx,
5706                _request: (),
5707                reply: Reply<()>,
5708            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5709                let _ = reply.send(());
5710                Box::pin(async {})
5711            }
5712        }
5713
5714        let now = crate::types::Time::ZERO;
5715        let (mut handle, stored) = scope
5716            .spawn_named_gen_server(
5717                &mut runtime.state,
5718                &cx,
5719                &mut registry,
5720                "still_running",
5721                Noop,
5722                8,
5723                now,
5724            )
5725            .unwrap();
5726        runtime.state.store_spawned_task(handle.task_id(), stored);
5727
5728        assert!(
5729            matches!(
5730                handle.release_name(&mut registry, now),
5731                Err(ReleaseNameError::StillRunning)
5732            ),
5733            "release_name must fail closed while the server is still live",
5734        );
5735        assert_eq!(
5736            registry.whereis("still_running"),
5737            Some(handle.task_id()),
5738            "premature release_name must not remove the registered name",
5739        );
5740
5741        handle.stop();
5742        assert!(
5743            matches!(
5744                handle.release_name(&mut registry, now),
5745                Err(ReleaseNameError::StillRunning)
5746            ),
5747            "release_name must keep failing closed after stop() until the task actually finishes",
5748        );
5749        assert_eq!(
5750            registry.whereis("still_running"),
5751            Some(handle.task_id()),
5752            "release_name during shutdown drain must not remove the registered name",
5753        );
5754        {
5755            runtime.scheduler.lock().schedule(handle.task_id(), 0);
5756        }
5757        runtime.run_until_quiescent();
5758        let release_now = runtime.state.now;
5759        handle
5760            .release_name(&mut registry, release_now)
5761            .expect("release after shutdown should succeed");
5762
5763        crate::test_complete!("named_server_release_name_requires_stopped_server");
5764    }
5765
5766    /// Named server: duplicate name is rejected.
5767    #[test]
5768    fn named_server_duplicate_name_rejected() {
5769        crate::test_utils::init_test_logging();
5770        crate::test_phase!("named_server_duplicate_name_rejected");
5771
5772        let budget = Budget::new().with_poll_quota(100_000);
5773        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
5774        let region = runtime.state.create_root_region(budget);
5775        let cx = Cx::for_testing();
5776        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
5777        let mut registry = crate::cx::NameRegistry::new();
5778
5779        #[allow(clippy::items_after_statements)]
5780        #[derive(Debug)]
5781        struct Dummy;
5782
5783        #[allow(clippy::items_after_statements)]
5784        impl GenServer for Dummy {
5785            type Call = ();
5786            type Reply = ();
5787            type Cast = ();
5788            type Info = SystemMsg;
5789
5790            fn handle_call(
5791                &mut self,
5792                _cx: &Cx,
5793                _request: (),
5794                reply: Reply<()>,
5795            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5796                let _ = reply.send(());
5797                Box::pin(async {})
5798            }
5799
5800            fn handle_cast(
5801                &mut self,
5802                _cx: &Cx,
5803                _msg: (),
5804            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5805                Box::pin(async {})
5806            }
5807        }
5808
5809        let now = crate::types::Time::ZERO;
5810
5811        // First spawn succeeds.
5812        let (mut h1, s1) = scope
5813            .spawn_named_gen_server(
5814                &mut runtime.state,
5815                &cx,
5816                &mut registry,
5817                "singleton",
5818                Dummy,
5819                8,
5820                now,
5821            )
5822            .unwrap();
5823        runtime.state.store_spawned_task(h1.task_id(), s1);
5824
5825        // Second spawn with same name fails.
5826        let result = scope.spawn_named_gen_server(
5827            &mut runtime.state,
5828            &cx,
5829            &mut registry,
5830            "singleton",
5831            Dummy,
5832            8,
5833            now,
5834        );
5835        assert!(
5836            matches!(result, Err(NamedSpawnError::NameTaken(_))),
5837            "duplicate name should be rejected"
5838        );
5839
5840        // Original is still registered.
5841        assert_eq!(registry.whereis("singleton"), Some(h1.task_id()));
5842
5843        // Verify the orphaned task record from the failed spawn was cleaned up.
5844        // The region should only contain the first task; no leaked task record
5845        // that would prevent region quiescence.
5846        let region_tasks = runtime.state.region(region).unwrap().task_ids();
5847        assert_eq!(
5848            region_tasks,
5849            vec![h1.task_id()],
5850            "region should only have the first task; orphaned task must be removed"
5851        );
5852
5853        h1.stop();
5854        runtime.scheduler.lock().schedule(h1.task_id(), 0);
5855        runtime.run_until_quiescent();
5856        let release_now = runtime.state.now;
5857        h1.release_name(&mut registry, release_now)
5858            .expect("release ok");
5859
5860        crate::test_complete!("named_server_duplicate_name_rejected");
5861    }
5862
5863    /// Named server: abort_lease removes name from registry.
5864    #[test]
5865    fn named_server_abort_lease_removes_name() {
5866        crate::test_utils::init_test_logging();
5867        crate::test_phase!("named_server_abort_lease_removes_name");
5868
5869        let budget = Budget::new().with_poll_quota(100_000);
5870        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
5871        let region = runtime.state.create_root_region(budget);
5872        let cx = Cx::for_testing();
5873        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
5874        let mut registry = crate::cx::NameRegistry::new();
5875
5876        #[allow(clippy::items_after_statements)]
5877        #[derive(Debug)]
5878        struct Noop;
5879
5880        #[allow(clippy::items_after_statements)]
5881        impl GenServer for Noop {
5882            type Call = ();
5883            type Reply = ();
5884            type Cast = ();
5885            type Info = SystemMsg;
5886
5887            fn handle_call(
5888                &mut self,
5889                _cx: &Cx,
5890                _req: (),
5891                reply: Reply<()>,
5892            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5893                let _ = reply.send(());
5894                Box::pin(async {})
5895            }
5896
5897            fn handle_cast(
5898                &mut self,
5899                _cx: &Cx,
5900                _msg: (),
5901            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5902                Box::pin(async {})
5903            }
5904        }
5905
5906        let now = crate::types::Time::ZERO;
5907        let (mut handle, stored) = scope
5908            .spawn_named_gen_server(
5909                &mut runtime.state,
5910                &cx,
5911                &mut registry,
5912                "temp_name",
5913                Noop,
5914                8,
5915                now,
5916            )
5917            .unwrap();
5918        runtime.state.store_spawned_task(handle.task_id(), stored);
5919        let mut alias = registry
5920            .register("temp_alias", handle.task_id(), scope.region_id(), now)
5921            .expect("second alias should register for same task");
5922
5923        // Name is registered.
5924        assert!(registry.whereis("temp_name").is_some());
5925        assert_eq!(registry.whereis("temp_alias"), Some(handle.task_id()));
5926
5927        // Abort the lease (simulating cancellation).
5928        handle.abort_lease(&mut registry, now).unwrap();
5929        assert!(
5930            registry.whereis("temp_name").is_none(),
5931            "aborting the lease must remove the registry entry",
5932        );
5933        assert_eq!(
5934            registry.whereis("temp_alias"),
5935            Some(handle.task_id()),
5936            "aborting one named handle must not drop other names owned by the same task",
5937        );
5938        registry
5939            .unregister_owned_and_grant(&alias, now)
5940            .expect("manual alias cleanup should succeed");
5941        alias.abort().unwrap();
5942
5943        crate::test_complete!("named_server_abort_lease_removes_name");
5944    }
5945
5946    /// Named server: take_lease allows manual lifecycle management.
5947    #[test]
5948    fn named_server_take_lease_manual_management() {
5949        crate::test_utils::init_test_logging();
5950        crate::test_phase!("named_server_take_lease_manual_management");
5951
5952        let budget = Budget::new().with_poll_quota(100_000);
5953        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
5954        let region = runtime.state.create_root_region(budget);
5955        let cx = Cx::for_testing();
5956        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
5957        let mut registry = crate::cx::NameRegistry::new();
5958
5959        #[allow(clippy::items_after_statements)]
5960        #[derive(Debug)]
5961        struct Noop2;
5962
5963        #[allow(clippy::items_after_statements)]
5964        impl GenServer for Noop2 {
5965            type Call = ();
5966            type Reply = ();
5967            type Cast = ();
5968            type Info = SystemMsg;
5969
5970            fn handle_call(
5971                &mut self,
5972                _cx: &Cx,
5973                _req: (),
5974                reply: Reply<()>,
5975            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5976                let _ = reply.send(());
5977                Box::pin(async {})
5978            }
5979
5980            fn handle_cast(
5981                &mut self,
5982                _cx: &Cx,
5983                _msg: (),
5984            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5985                Box::pin(async {})
5986            }
5987        }
5988
5989        let now = crate::types::Time::ZERO;
5990        let (mut handle, stored) = scope
5991            .spawn_named_gen_server(
5992                &mut runtime.state,
5993                &cx,
5994                &mut registry,
5995                "manual_name",
5996                Noop2,
5997                8,
5998                now,
5999            )
6000            .unwrap();
6001        runtime.state.store_spawned_task(handle.task_id(), stored);
6002
6003        // Take the lease for manual management.
6004        let mut lease = handle.take_lease().unwrap();
6005        assert!(handle.take_lease().is_none(), "second take returns None");
6006
6007        // name() returns placeholder when lease is taken.
6008        assert_eq!(handle.name(), "(released)");
6009
6010        // Resolve the full manual lifecycle: remove the matching registry entry,
6011        // then resolve the lease token.
6012        registry
6013            .unregister_owned_and_grant(&lease, now)
6014            .expect("manual lease cleanup should remove the matching name");
6015        lease
6016            .abort()
6017            .expect("manual lease abort should resolve the token");
6018        assert!(
6019            registry.whereis("manual_name").is_none(),
6020            "manual lifecycle management must remove the registry entry as well as resolve the token",
6021        );
6022
6023        crate::test_complete!("named_server_take_lease_manual_management");
6024    }
6025
6026    /// Named server: release_name fails closed after take_lease removed the lease.
6027    #[test]
6028    fn named_server_release_name_after_take_lease_returns_already_resolved() {
6029        crate::test_utils::init_test_logging();
6030        crate::test_phase!("named_server_release_name_after_take_lease_returns_already_resolved");
6031
6032        let budget = Budget::new().with_poll_quota(100_000);
6033        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
6034        let region = runtime.state.create_root_region(budget);
6035        let cx = Cx::for_testing();
6036        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
6037        let mut registry = crate::cx::NameRegistry::new();
6038
6039        #[allow(clippy::items_after_statements)]
6040        #[derive(Debug)]
6041        struct Noop3;
6042
6043        #[allow(clippy::items_after_statements)]
6044        impl GenServer for Noop3 {
6045            type Call = ();
6046            type Reply = ();
6047            type Cast = ();
6048            type Info = SystemMsg;
6049
6050            fn handle_call(
6051                &mut self,
6052                _cx: &Cx,
6053                _req: (),
6054                reply: Reply<()>,
6055            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6056                let _ = reply.send(());
6057                Box::pin(async {})
6058            }
6059
6060            fn handle_cast(
6061                &mut self,
6062                _cx: &Cx,
6063                _msg: (),
6064            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6065                Box::pin(async {})
6066            }
6067        }
6068
6069        let now = crate::types::Time::ZERO;
6070        let (mut handle, stored) = scope
6071            .spawn_named_gen_server(
6072                &mut runtime.state,
6073                &cx,
6074                &mut registry,
6075                "take_then_stop",
6076                Noop3,
6077                8,
6078                now,
6079            )
6080            .unwrap();
6081        runtime.state.store_spawned_task(handle.task_id(), stored);
6082
6083        let mut lease = handle.take_lease().expect("lease should be present");
6084        assert_eq!(
6085            handle.handle.state.load(),
6086            ActorState::Created,
6087            "taking the lease alone must not stop the server"
6088        );
6089        assert!(
6090            matches!(
6091                handle.release_name(&mut registry, now),
6092                Err(ReleaseNameError::Lease(
6093                    crate::cx::NameLeaseError::AlreadyResolved
6094                ))
6095            ),
6096            "release_name after take_lease must fail closed with AlreadyResolved",
6097        );
6098        assert_eq!(
6099            handle.handle.state.load(),
6100            ActorState::Created,
6101            "failed release_name after take_lease must not mutate actor state"
6102        );
6103        assert_eq!(
6104            registry.whereis("take_then_stop"),
6105            Some(handle.task_id()),
6106            "failed release_name after take_lease must not unregister the live name",
6107        );
6108        registry
6109            .unregister_owned_and_grant(&lease, now)
6110            .expect("manual cleanup should still be possible after failed release_name");
6111        lease.abort().unwrap();
6112
6113        crate::test_complete!(
6114            "named_server_release_name_after_take_lease_returns_already_resolved"
6115        );
6116    }
6117
6118    /// Named server: abort_lease fails closed after take_lease removed the lease.
6119    #[test]
6120    fn named_server_abort_lease_after_take_lease_returns_already_resolved() {
6121        crate::test_utils::init_test_logging();
6122        crate::test_phase!("named_server_abort_lease_after_take_lease_returns_already_resolved");
6123
6124        let budget = Budget::new().with_poll_quota(100_000);
6125        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
6126        let region = runtime.state.create_root_region(budget);
6127        let cx = Cx::for_testing();
6128        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
6129        let mut registry = crate::cx::NameRegistry::new();
6130
6131        #[allow(clippy::items_after_statements)]
6132        #[derive(Debug)]
6133        struct Noop4;
6134
6135        #[allow(clippy::items_after_statements)]
6136        impl GenServer for Noop4 {
6137            type Call = ();
6138            type Reply = ();
6139            type Cast = ();
6140            type Info = SystemMsg;
6141
6142            fn handle_call(
6143                &mut self,
6144                _cx: &Cx,
6145                _req: (),
6146                reply: Reply<()>,
6147            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6148                let _ = reply.send(());
6149                Box::pin(async {})
6150            }
6151
6152            fn handle_cast(
6153                &mut self,
6154                _cx: &Cx,
6155                _msg: (),
6156            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6157                Box::pin(async {})
6158            }
6159        }
6160
6161        let now = crate::types::Time::ZERO;
6162        let (mut handle, stored) = scope
6163            .spawn_named_gen_server(
6164                &mut runtime.state,
6165                &cx,
6166                &mut registry,
6167                "take_then_abort",
6168                Noop4,
6169                8,
6170                now,
6171            )
6172            .unwrap();
6173        runtime.state.store_spawned_task(handle.task_id(), stored);
6174
6175        let mut lease = handle.take_lease().expect("lease should be present");
6176        let abort_err = handle.abort_lease(&mut registry, now).unwrap_err();
6177        assert_eq!(abort_err, crate::cx::NameLeaseError::AlreadyResolved);
6178        assert_eq!(
6179            registry.whereis("take_then_abort"),
6180            Some(handle.task_id()),
6181            "failed abort_lease after take_lease must not unregister the live name",
6182        );
6183
6184        registry
6185            .unregister_owned_and_grant(&lease, now)
6186            .expect("manual cleanup should still be possible after failed abort_lease");
6187        lease.abort().unwrap();
6188
6189        crate::test_complete!("named_server_abort_lease_after_take_lease_returns_already_resolved");
6190    }
6191
6192    /// Named server: abort_lease fails closed after release_name resolved the lease.
6193    #[test]
6194    fn named_server_abort_lease_after_release_name_returns_already_resolved() {
6195        crate::test_utils::init_test_logging();
6196        crate::test_phase!("named_server_abort_lease_after_release_name_returns_already_resolved");
6197
6198        let budget = Budget::new().with_poll_quota(100_000);
6199        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
6200        let region = runtime.state.create_root_region(budget);
6201        let cx = Cx::for_testing();
6202        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
6203        let mut registry = crate::cx::NameRegistry::new();
6204
6205        #[allow(clippy::items_after_statements)]
6206        #[derive(Debug)]
6207        struct Noop4;
6208
6209        #[allow(clippy::items_after_statements)]
6210        impl GenServer for Noop4 {
6211            type Call = ();
6212            type Reply = ();
6213            type Cast = ();
6214            type Info = SystemMsg;
6215
6216            fn handle_call(
6217                &mut self,
6218                _cx: &Cx,
6219                _req: (),
6220                reply: Reply<()>,
6221            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6222                let _ = reply.send(());
6223                Box::pin(async {})
6224            }
6225
6226            fn handle_cast(
6227                &mut self,
6228                _cx: &Cx,
6229                _msg: (),
6230            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6231                Box::pin(async {})
6232            }
6233        }
6234
6235        let now = crate::types::Time::ZERO;
6236        let (mut handle, stored) = scope
6237            .spawn_named_gen_server(
6238                &mut runtime.state,
6239                &cx,
6240                &mut registry,
6241                "stop_then_abort",
6242                Noop4,
6243                8,
6244                now,
6245            )
6246            .unwrap();
6247        runtime.state.store_spawned_task(handle.task_id(), stored);
6248
6249        handle.stop();
6250        {
6251            runtime.scheduler.lock().schedule(handle.task_id(), 0);
6252        }
6253        runtime.run_until_quiescent();
6254        let release_now = runtime.state.now;
6255        handle
6256            .release_name(&mut registry, release_now)
6257            .expect("initial release should succeed");
6258        let abort_err = handle.abort_lease(&mut registry, now).unwrap_err();
6259        assert_eq!(abort_err, crate::cx::NameLeaseError::AlreadyResolved);
6260        assert!(
6261            registry.whereis("stop_then_abort").is_none(),
6262            "failed abort_lease after release_name must not mutate the registry entry",
6263        );
6264
6265        crate::test_complete!(
6266            "named_server_abort_lease_after_release_name_returns_already_resolved"
6267        );
6268    }
6269
6270    /// Named server: release_name only removes the targeted name, not every name on the task.
6271    #[test]
6272    fn named_server_release_name_preserves_other_names_on_same_task() {
6273        crate::test_utils::init_test_logging();
6274        crate::test_phase!("named_server_release_name_preserves_other_names_on_same_task");
6275
6276        let budget = Budget::new().with_poll_quota(100_000);
6277        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
6278        let region = runtime.state.create_root_region(budget);
6279        let cx = Cx::for_testing();
6280        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
6281        let mut registry = crate::cx::NameRegistry::new();
6282
6283        #[allow(clippy::items_after_statements)]
6284        #[derive(Debug)]
6285        struct Noop5;
6286
6287        #[allow(clippy::items_after_statements)]
6288        impl GenServer for Noop5 {
6289            type Call = ();
6290            type Reply = ();
6291            type Cast = ();
6292            type Info = SystemMsg;
6293
6294            fn handle_call(
6295                &mut self,
6296                _cx: &Cx,
6297                _req: (),
6298                reply: Reply<()>,
6299            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6300                let _ = reply.send(());
6301                Box::pin(async {})
6302            }
6303
6304            fn handle_cast(
6305                &mut self,
6306                _cx: &Cx,
6307                _msg: (),
6308            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6309                Box::pin(async {})
6310            }
6311        }
6312
6313        let now = crate::types::Time::ZERO;
6314        let (mut handle, stored) = scope
6315            .spawn_named_gen_server(
6316                &mut runtime.state,
6317                &cx,
6318                &mut registry,
6319                "primary_name",
6320                Noop5,
6321                8,
6322                now,
6323            )
6324            .unwrap();
6325        runtime.state.store_spawned_task(handle.task_id(), stored);
6326        let mut alias = registry
6327            .register("secondary_name", handle.task_id(), scope.region_id(), now)
6328            .expect("second alias should register for same task");
6329
6330        handle.stop();
6331        {
6332            runtime.scheduler.lock().schedule(handle.task_id(), 0);
6333        }
6334        runtime.run_until_quiescent();
6335
6336        let release_now = runtime.state.now;
6337        handle
6338            .release_name(&mut registry, release_now)
6339            .expect("targeted release should succeed");
6340        assert!(
6341            registry.whereis("primary_name").is_none(),
6342            "release_name must remove the targeted registry entry",
6343        );
6344        assert_eq!(
6345            registry.whereis("secondary_name"),
6346            Some(handle.task_id()),
6347            "release_name must not remove unrelated names on the same task",
6348        );
6349
6350        registry
6351            .unregister_owned_and_grant(&alias, release_now)
6352            .expect("manual alias cleanup should succeed");
6353        alias.release().unwrap();
6354
6355        crate::test_complete!("named_server_release_name_preserves_other_names_on_same_task");
6356    }
6357
6358    #[test]
6359    #[allow(clippy::items_after_statements)]
6360    fn named_start_helper_supervisor_stop_cleans_registry() {
6361        crate::test_utils::init_test_logging();
6362        crate::test_phase!("named_start_helper_supervisor_stop_cleans_registry");
6363
6364        #[derive(Debug)]
6365        struct Noop;
6366
6367        impl GenServer for Noop {
6368            type Call = ();
6369            type Reply = ();
6370            type Cast = ();
6371            type Info = SystemMsg;
6372
6373            fn handle_call(
6374                &mut self,
6375                _cx: &Cx,
6376                _request: (),
6377                reply: Reply<()>,
6378            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6379                let _ = reply.send(());
6380                Box::pin(async {})
6381            }
6382
6383            fn handle_cast(
6384                &mut self,
6385                _cx: &Cx,
6386                _msg: (),
6387            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6388                Box::pin(async {})
6389            }
6390        }
6391
6392        let registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>> =
6393            Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
6394
6395        let budget = Budget::new().with_poll_quota(100_000);
6396        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
6397        let root = runtime.state.create_root_region(budget);
6398        let cx = Cx::for_testing();
6399
6400        let child = crate::supervision::ChildSpec::new(
6401            "svc_child",
6402            named_gen_server_start(Arc::clone(&registry), "svc", 16, || Noop),
6403        );
6404
6405        let compiled = crate::supervision::SupervisorBuilder::new("svc_supervisor")
6406            .child(child)
6407            .compile()
6408            .expect("compile supervisor");
6409
6410        let supervisor = compiled
6411            .spawn(&mut runtime.state, &cx, root, budget)
6412            .expect("spawn supervisor");
6413
6414        assert_eq!(supervisor.started.len(), 1, "exactly one started child");
6415        let child_task = supervisor.started[0].task_id;
6416        assert_eq!(registry.lock().whereis("svc"), Some(child_task));
6417
6418        // Stop the supervisor region and drive cancellation/finalization.
6419        let tasks_to_schedule = runtime.state.cancel_request(
6420            supervisor.region,
6421            &crate::types::CancelReason::user("stop"),
6422            None,
6423        );
6424        for (task_id, priority) in tasks_to_schedule {
6425            runtime.scheduler.lock().schedule(task_id, priority);
6426        }
6427        runtime.run_until_quiescent();
6428
6429        assert!(
6430            registry.lock().whereis("svc").is_none(),
6431            "name must be removed after supervised stop",
6432        );
6433
6434        crate::test_complete!("named_start_helper_supervisor_stop_cleans_registry");
6435    }
6436
6437    #[test]
6438    #[allow(clippy::items_after_statements)]
6439    fn named_start_helper_crash_then_stop_cleans_registry() {
6440        crate::test_utils::init_test_logging();
6441        crate::test_phase!("named_start_helper_crash_then_stop_cleans_registry");
6442
6443        #[derive(Debug)]
6444        struct PanicOnStart;
6445
6446        impl GenServer for PanicOnStart {
6447            type Call = ();
6448            type Reply = ();
6449            type Cast = ();
6450            type Info = SystemMsg;
6451
6452            fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6453                Box::pin(async move {
6454                    std::panic::panic_any("intentional start crash for registry cleanup test");
6455                })
6456            }
6457
6458            fn handle_call(
6459                &mut self,
6460                _cx: &Cx,
6461                _request: (),
6462                reply: Reply<()>,
6463            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6464                let _ = reply.send(());
6465                Box::pin(async {})
6466            }
6467
6468            fn handle_cast(
6469                &mut self,
6470                _cx: &Cx,
6471                _msg: (),
6472            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6473                Box::pin(async {})
6474            }
6475        }
6476
6477        let registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>> =
6478            Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
6479
6480        let budget = Budget::new().with_poll_quota(100_000);
6481        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(7));
6482        let root = runtime.state.create_root_region(budget);
6483        let cx = Cx::for_testing();
6484
6485        let child = crate::supervision::ChildSpec::new(
6486            "panic_child",
6487            named_gen_server_start(Arc::clone(&registry), "panic_svc", 8, || PanicOnStart),
6488        );
6489
6490        let compiled = crate::supervision::SupervisorBuilder::new("panic_supervisor")
6491            .child(child)
6492            .compile()
6493            .expect("compile supervisor");
6494
6495        let supervisor = compiled
6496            .spawn(&mut runtime.state, &cx, root, budget)
6497            .expect("spawn supervisor");
6498
6499        let child_task = supervisor.started[0].task_id;
6500        assert_eq!(registry.lock().whereis("panic_svc"), Some(child_task));
6501
6502        // Drive the child once so it crashes in on_start.
6503        {
6504            runtime.scheduler.lock().schedule(child_task, 0);
6505        }
6506        runtime.run_until_idle();
6507
6508        // Region stop must still clean the registry + resolve the lease.
6509        let tasks_to_schedule = runtime.state.cancel_request(
6510            supervisor.region,
6511            &crate::types::CancelReason::user("shutdown"),
6512            None,
6513        );
6514        for (task_id, priority) in tasks_to_schedule {
6515            runtime.scheduler.lock().schedule(task_id, priority);
6516        }
6517        runtime.run_until_quiescent();
6518
6519        assert!(
6520            registry.lock().whereis("panic_svc").is_none(),
6521            "name must be removed after crash + region stop",
6522        );
6523
6524        crate::test_complete!("named_start_helper_crash_then_stop_cleans_registry");
6525    }
6526
6527    // ========================================================================
6528    // Pure data-type tests (wave 9 – CyanBarn)
6529    // ========================================================================
6530
6531    #[test]
6532    fn cast_overflow_policy_default_is_reject() {
6533        let policy = CastOverflowPolicy::default();
6534        assert!(matches!(policy, CastOverflowPolicy::Reject));
6535    }
6536
6537    #[test]
6538    fn cast_overflow_policy_debug() {
6539        let dbg = format!("{:?}", CastOverflowPolicy::Reject);
6540        assert!(dbg.contains("Reject"), "{dbg}");
6541        let dbg2 = format!("{:?}", CastOverflowPolicy::DropOldest);
6542        assert!(dbg2.contains("DropOldest"), "{dbg2}");
6543    }
6544
6545    #[test]
6546    fn cast_overflow_policy_eq_clone_copy() {
6547        let a = CastOverflowPolicy::Reject;
6548        let b = a; // Copy
6549        let c = a;
6550        assert_eq!(a, b);
6551        assert_eq!(a, c);
6552        assert_ne!(CastOverflowPolicy::Reject, CastOverflowPolicy::DropOldest);
6553    }
6554
6555    #[test]
6556    fn down_msg_constructor_and_debug() {
6557        let mut monitors = crate::monitor::MonitorSet::new();
6558        let mref = monitors.establish(tid(50), rid(0), tid(51));
6559        let msg = DownMsg::new(
6560            Time::from_secs(7),
6561            DownNotification {
6562                monitored: tid(51),
6563                reason: DownReason::Normal,
6564                monitor_ref: mref,
6565            },
6566        );
6567        assert_eq!(msg.completion_vt, Time::from_secs(7));
6568        assert_eq!(msg.notification.monitored, tid(51));
6569        let dbg = format!("{msg:?}");
6570        assert!(dbg.contains("DownMsg"), "{dbg}");
6571    }
6572
6573    #[test]
6574    fn down_msg_clone() {
6575        let mut monitors = crate::monitor::MonitorSet::new();
6576        let mref = monitors.establish(tid(60), rid(0), tid(61));
6577        let msg = DownMsg::new(
6578            Time::from_secs(8),
6579            DownNotification {
6580                monitored: tid(61),
6581                reason: DownReason::Normal,
6582                monitor_ref: mref,
6583            },
6584        );
6585        let cloned = msg.clone();
6586        assert_eq!(cloned.completion_vt, msg.completion_vt);
6587        assert_eq!(cloned.notification.monitored, msg.notification.monitored);
6588    }
6589
6590    #[test]
6591    fn exit_msg_constructor_and_eq() {
6592        let a = ExitMsg::new(Time::from_secs(5), tid(10), DownReason::Normal);
6593        let b = ExitMsg::new(Time::from_secs(5), tid(10), DownReason::Normal);
6594        assert_eq!(a, b);
6595    }
6596
6597    #[test]
6598    fn exit_msg_debug_and_clone() {
6599        let msg = ExitMsg::new(
6600            Time::from_secs(6),
6601            tid(11),
6602            DownReason::Error("oops".into()),
6603        );
6604        let dbg = format!("{msg:?}");
6605        assert!(dbg.contains("ExitMsg"), "{dbg}");
6606        let cloned = msg.clone();
6607        assert_eq!(cloned, msg);
6608    }
6609
6610    #[test]
6611    fn exit_msg_inequality() {
6612        let a = ExitMsg::new(Time::from_secs(1), tid(1), DownReason::Normal);
6613        let b = ExitMsg::new(Time::from_secs(2), tid(1), DownReason::Normal);
6614        assert_ne!(a, b);
6615    }
6616
6617    #[test]
6618    fn timeout_msg_constructor_eq_copy() {
6619        let a = TimeoutMsg::new(Time::from_secs(10), 42);
6620        let b = a; // Copy
6621        let c = a;
6622        assert_eq!(a, b);
6623        assert_eq!(a, c);
6624        assert_eq!(a.tick_vt, Time::from_secs(10));
6625        assert_eq!(a.id, 42);
6626    }
6627
6628    #[test]
6629    fn timeout_msg_debug() {
6630        let msg = TimeoutMsg::new(Time::from_secs(1), 99);
6631        let dbg = format!("{msg:?}");
6632        assert!(dbg.contains("TimeoutMsg"), "{dbg}");
6633    }
6634
6635    #[test]
6636    fn timeout_msg_inequality() {
6637        let a = TimeoutMsg::new(Time::from_secs(1), 1);
6638        let b = TimeoutMsg::new(Time::from_secs(1), 2);
6639        assert_ne!(a, b);
6640    }
6641
6642    #[test]
6643    fn system_msg_debug_all_variants() {
6644        let mut monitors = crate::monitor::MonitorSet::new();
6645        let mref = monitors.establish(tid(70), rid(0), tid(71));
6646
6647        let down = SystemMsg::Down {
6648            completion_vt: Time::from_secs(1),
6649            notification: DownNotification {
6650                monitored: tid(71),
6651                reason: DownReason::Normal,
6652                monitor_ref: mref,
6653            },
6654        };
6655        let exit = SystemMsg::Exit {
6656            exit_vt: Time::from_secs(2),
6657            from: tid(72),
6658            reason: DownReason::Normal,
6659        };
6660        let timeout = SystemMsg::Timeout {
6661            tick_vt: Time::from_secs(3),
6662            id: 55,
6663        };
6664
6665        let d = format!("{down:?}");
6666        assert!(d.contains("Down"), "{d}");
6667        let e = format!("{exit:?}");
6668        assert!(e.contains("Exit"), "{e}");
6669        let t = format!("{timeout:?}");
6670        assert!(t.contains("Timeout"), "{t}");
6671    }
6672
6673    #[test]
6674    fn system_msg_clone() {
6675        let msg = SystemMsg::Timeout {
6676            tick_vt: Time::from_secs(5),
6677            id: 7,
6678        };
6679        let cloned = msg.clone();
6680        assert_eq!(cloned.sort_key(), msg.sort_key());
6681    }
6682
6683    #[test]
6684    fn system_msg_convenience_constructors() {
6685        let mut monitors = crate::monitor::MonitorSet::new();
6686        let mref = monitors.establish(tid(80), rid(0), tid(81));
6687
6688        let down_payload = DownMsg::new(
6689            Time::from_secs(1),
6690            DownNotification {
6691                monitored: tid(81),
6692                reason: DownReason::Normal,
6693                monitor_ref: mref,
6694            },
6695        );
6696        let msg = SystemMsg::down(down_payload);
6697        assert!(matches!(msg, SystemMsg::Down { .. }));
6698
6699        let exit_payload = ExitMsg::new(Time::from_secs(2), tid(82), DownReason::Normal);
6700        let msg = SystemMsg::exit(exit_payload);
6701        assert!(matches!(msg, SystemMsg::Exit { .. }));
6702
6703        let timeout_payload = TimeoutMsg::new(Time::from_secs(3), 44);
6704        let msg = SystemMsg::timeout(timeout_payload);
6705        assert!(matches!(msg, SystemMsg::Timeout { .. }));
6706    }
6707
6708    #[test]
6709    fn system_msg_sort_key_kind_rank_ordering() {
6710        let mut monitors = crate::monitor::MonitorSet::new();
6711        let mref = monitors.establish(tid(85), rid(0), tid(86));
6712
6713        let same_vt = Time::from_secs(100);
6714        let down = SystemMsg::Down {
6715            completion_vt: same_vt,
6716            notification: DownNotification {
6717                monitored: tid(86),
6718                reason: DownReason::Normal,
6719                monitor_ref: mref,
6720            },
6721        };
6722        let exit = SystemMsg::Exit {
6723            exit_vt: same_vt,
6724            from: tid(86),
6725            reason: DownReason::Normal,
6726        };
6727        let timeout = SystemMsg::Timeout {
6728            tick_vt: same_vt,
6729            id: 1,
6730        };
6731
6732        // Down < Exit < Timeout (kind ranks 0, 1, 2)
6733        assert!(down.sort_key() < exit.sort_key());
6734        assert!(exit.sort_key() < timeout.sort_key());
6735    }
6736
6737    #[test]
6738    fn system_msg_subject_key_debug_eq_ord() {
6739        let a = SystemMsgSubjectKey::Task(tid(1));
6740        let b = SystemMsgSubjectKey::Task(tid(1));
6741        let c = SystemMsgSubjectKey::TimeoutId(1);
6742
6743        assert_eq!(a, b);
6744        assert_ne!(a, c);
6745
6746        let dbg = format!("{a:?}");
6747        assert!(dbg.contains("Task"), "{dbg}");
6748        let dbg2 = format!("{c:?}");
6749        assert!(dbg2.contains("TimeoutId"), "{dbg2}");
6750
6751        // Copy + Clone
6752        let copied = a;
6753        let cloned = a;
6754        assert_eq!(copied, cloned);
6755
6756        // Ord consistency
6757        assert!(a <= b);
6758    }
6759
6760    #[test]
6761    fn system_msg_batch_default_and_empty() {
6762        let batch = SystemMsgBatch::new();
6763        let sorted = batch.into_sorted();
6764        assert!(sorted.is_empty());
6765    }
6766
6767    #[test]
6768    fn system_msg_batch_debug() {
6769        let batch = SystemMsgBatch::new();
6770        let dbg = format!("{batch:?}");
6771        assert!(dbg.contains("SystemMsgBatch"), "{dbg}");
6772    }
6773
6774    #[test]
6775    fn system_msg_batch_single_element() {
6776        let mut batch = SystemMsgBatch::new();
6777        batch.push(SystemMsg::Timeout {
6778            tick_vt: Time::from_secs(42),
6779            id: 1,
6780        });
6781        let sorted = batch.into_sorted();
6782        assert_eq!(sorted.len(), 1);
6783        assert!(matches!(sorted[0], SystemMsg::Timeout { id: 1, .. }));
6784    }
6785
6786    #[test]
6787    fn call_error_display_server_stopped() {
6788        let err = CallError::ServerStopped;
6789        let disp = format!("{err}");
6790        assert_eq!(disp, "GenServer has stopped");
6791    }
6792
6793    #[test]
6794    fn call_error_display_no_reply() {
6795        let err = CallError::NoReply;
6796        let disp = format!("{err}");
6797        assert_eq!(disp, "GenServer did not reply");
6798    }
6799
6800    #[test]
6801    fn call_error_display_cancelled() {
6802        let reason = CancelReason::user("test cancel");
6803        let err = CallError::Cancelled(reason);
6804        let disp = format!("{err}");
6805        assert!(disp.contains("cancelled"), "{disp}");
6806    }
6807
6808    #[test]
6809    fn call_error_debug_all_variants() {
6810        let dbg1 = format!("{:?}", CallError::ServerStopped);
6811        assert!(dbg1.contains("ServerStopped"), "{dbg1}");
6812        let dbg2 = format!("{:?}", CallError::NoReply);
6813        assert!(dbg2.contains("NoReply"), "{dbg2}");
6814        let dbg3 = format!("{:?}", CallError::Cancelled(CancelReason::user("x")));
6815        assert!(dbg3.contains("Cancelled"), "{dbg3}");
6816    }
6817
6818    #[test]
6819    fn call_error_is_std_error() {
6820        let err = CallError::ServerStopped;
6821        let _: &dyn std::error::Error = &err;
6822        // source() defaults to None
6823        assert!(std::error::Error::source(&err).is_none());
6824    }
6825
6826    #[test]
6827    fn cast_error_display_all_variants() {
6828        assert_eq!(
6829            format!("{}", CastError::ServerStopped),
6830            "GenServer has stopped"
6831        );
6832        assert_eq!(format!("{}", CastError::Full), "GenServer mailbox full");
6833        let cancelled = CastError::Cancelled(CancelReason::user("t"));
6834        let disp = format!("{cancelled}");
6835        assert!(disp.contains("cancelled"), "{disp}");
6836    }
6837
6838    #[test]
6839    fn cast_error_debug_all_variants() {
6840        let dbg1 = format!("{:?}", CastError::ServerStopped);
6841        assert!(dbg1.contains("ServerStopped"), "{dbg1}");
6842        let dbg2 = format!("{:?}", CastError::Full);
6843        assert!(dbg2.contains("Full"), "{dbg2}");
6844    }
6845
6846    #[test]
6847    fn cast_error_is_std_error() {
6848        let err = CastError::Full;
6849        let _: &dyn std::error::Error = &err;
6850        assert!(std::error::Error::source(&err).is_none());
6851    }
6852
6853    #[test]
6854    fn info_error_display_all_variants() {
6855        assert_eq!(
6856            format!("{}", InfoError::ServerStopped),
6857            "GenServer has stopped"
6858        );
6859        assert_eq!(format!("{}", InfoError::Full), "GenServer mailbox full");
6860        let cancelled = InfoError::Cancelled(CancelReason::user("u"));
6861        let disp = format!("{cancelled}");
6862        assert!(disp.contains("cancelled"), "{disp}");
6863    }
6864
6865    #[test]
6866    fn info_error_debug_all_variants() {
6867        let dbg1 = format!("{:?}", InfoError::ServerStopped);
6868        assert!(dbg1.contains("ServerStopped"), "{dbg1}");
6869        let dbg2 = format!("{:?}", InfoError::Full);
6870        assert!(dbg2.contains("Full"), "{dbg2}");
6871    }
6872
6873    #[test]
6874    fn info_error_is_std_error() {
6875        let err = InfoError::ServerStopped;
6876        let _: &dyn std::error::Error = &err;
6877        assert!(std::error::Error::source(&err).is_none());
6878    }
6879
6880    #[test]
6881    fn system_msg_try_from_exit_rejects_timeout() {
6882        let timeout = SystemMsg::Timeout {
6883            tick_vt: Time::from_secs(1),
6884            id: 7,
6885        };
6886        let err = ExitMsg::try_from(timeout).expect_err("timeout is not exit");
6887        assert!(matches!(err, SystemMsg::Timeout { id: 7, .. }));
6888    }
6889
6890    #[test]
6891    fn system_msg_try_from_exit_succeeds() {
6892        let exit = SystemMsg::Exit {
6893            exit_vt: Time::from_secs(3),
6894            from: tid(15),
6895            reason: DownReason::Normal,
6896        };
6897        let result = ExitMsg::try_from(exit).expect("exit conversion");
6898        assert_eq!(result.exit_vt, Time::from_secs(3));
6899        assert_eq!(result.from, tid(15));
6900    }
6901
6902    #[test]
6903    fn system_msg_try_from_timeout_rejects_exit() {
6904        let exit = SystemMsg::Exit {
6905            exit_vt: Time::from_secs(1),
6906            from: tid(1),
6907            reason: DownReason::Normal,
6908        };
6909        let err = TimeoutMsg::try_from(exit).expect_err("exit is not timeout");
6910        assert!(matches!(err, SystemMsg::Exit { .. }));
6911    }
6912}