//! GenServer: typed request-response and actor-adjacent message loop.
//!
//! GenServer extends the actor model with three message types:
//!
//! - **Call**: synchronous request-response. The caller waits until the server
//!   replies. A reply obligation is created: the server *must* reply or the
//!   obligation is detected as leaked.
//! - **Cast**: asynchronous fire-and-forget. The sender does not wait for a reply.
//! - **Info**: system/out-of-band notifications (Down/Exit/Timeout), delivered
//!   via [`GenServer::handle_info`].
//!
//! GenServers are region-owned, cancel-safe, and deterministic under the lab
//! runtime. They build on the same two-phase mailbox and supervision
//! infrastructure as plain actors.
//!
//! # Example
//!
//! ```ignore
//! struct Counter {
//!     count: u64,
//! }
//!
//! enum Request {
//!     Get,
//!     Add(u64),
//! }
//!
//! enum Cast {
//!     Reset,
//! }
//!
//! impl GenServer for Counter {
//!     type Call = Request;
//!     type Reply = u64;
//!     type Cast = Cast;
//!     type Info = SystemMsg;
//!
//!     fn handle_call(&mut self, _cx: &Cx, msg: Request, reply: Reply<u64>)
//!         -> Pin<Box<dyn Future<Output = ()> + Send + '_>>
//!     {
//!         match msg {
//!             Request::Get => { let _ = reply.send(self.count); }
//!             Request::Add(n) => { self.count += n; let _ = reply.send(self.count); }
//!         }
//!         Box::pin(async {})
//!     }
//!
//!     fn handle_cast(&mut self, _cx: &Cx, msg: Cast)
//!         -> Pin<Box<dyn Future<Output = ()> + Send + '_>>
//!     {
//!         match msg {
//!             Cast::Reset => { self.count = 0; }
//!         }
//!         Box::pin(async {})
//!     }
//! }
//! ```

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;

use crate::actor::{ActorId, ActorState};
use crate::channel::mpsc;
use crate::channel::oneshot;
use crate::channel::session::{self, TrackedOneshotPermit};
use crate::cx::Cx;
use crate::monitor::{DownNotification, DownReason};
use crate::obligation::graded::{AbortedProof, CommittedProof, SendPermit};
use crate::runtime::{JoinError, SpawnError};
use crate::types::{Budget, CancelReason, CxInner, Outcome, TaskId, Time};

// ============================================================================
// Lifecycle helpers (init/terminate budgets + masking) (bd-3ejoi)
// ============================================================================

/// Temporarily tightens the current task budget for an async phase.
///
/// Budgets in `CxInner` represent remaining budget; to avoid "refunding" budget,
/// we restore the original budget minus any consumption that occurred while the
/// phase budget was active.
struct PhaseBudgetGuard {
    inner: Arc<std::sync::RwLock<CxInner>>,
    original_budget: Budget,
    original_baseline: Budget,
    phase_baseline: Budget,
    restore_original: bool,
}

impl PhaseBudgetGuard {
    fn enter(cx: &Cx, phase_budget: Budget, restore_original: bool) -> Self {
        let inner = Arc::clone(&cx.inner);
        let (original_budget, original_baseline, phase_baseline) = {
            let mut guard = inner.write().expect("lock poisoned");
            let original_budget = guard.budget;
            let original_baseline = guard.budget_baseline;
            let phase_baseline = original_budget.meet(phase_budget);
            guard.budget = phase_baseline;
            guard.budget_baseline = phase_baseline;
            drop(guard);
            (original_budget, original_baseline, phase_baseline)
        };
        Self {
            inner,
            original_budget,
            original_baseline,
            phase_baseline,
            restore_original,
        }
    }
}

impl Drop for PhaseBudgetGuard {
    fn drop(&mut self) {
        if !self.restore_original {
            return;
        }

        let Ok(mut guard) = self.inner.write() else {
            return;
        };

        let phase_remaining = guard.budget;
        let polls_used = self
            .phase_baseline
            .poll_quota
            .saturating_sub(phase_remaining.poll_quota);

        let cost_used = match (self.phase_baseline.cost_quota, phase_remaining.cost_quota) {
            (Some(base), Some(rem)) => base.saturating_sub(rem),
            _ => 0,
        };

        let restored_cost_quota = self
            .original_budget
            .cost_quota
            .map(|orig| orig.saturating_sub(cost_used));

        guard.budget = Budget {
            deadline: self.original_budget.deadline,
            poll_quota: self.original_budget.poll_quota.saturating_sub(polls_used),
            cost_quota: restored_cost_quota,
            priority: self.original_budget.priority,
        };
        guard.budget_baseline = self.original_baseline;
    }
}

/// Async-friendly cancellation mask guard.
///
/// `Cx::masked(..)` is synchronous-only; GenServer lifecycle hooks are async.
struct AsyncMaskGuard {
    inner: Arc<std::sync::RwLock<CxInner>>,
}

impl AsyncMaskGuard {
    fn enter(cx: &Cx) -> Self {
        let inner = Arc::clone(&cx.inner);
        {
            let mut guard = inner.write().expect("lock poisoned");
            crate::assert_with_log!(
                guard.mask_depth < crate::types::task_context::MAX_MASK_DEPTH,
                "mask_depth",
                guard.mask_depth,
                guard.mask_depth
            );
            guard.mask_depth += 1;
        }
        Self { inner }
    }
}

impl Drop for AsyncMaskGuard {
    fn drop(&mut self) {
        if let Ok(mut guard) = self.inner.write() {
            guard.mask_depth = guard.mask_depth.saturating_sub(1);
        }
    }
}

// ============================================================================
// Cast overflow policy
// ============================================================================

/// Policy for handling cast sends when the GenServer mailbox is full.
///
/// When a bounded mailbox reaches capacity, the overflow policy determines
/// what happens to new cast messages. Lossy policies (`DropOldest`) are
/// trace-visible: every dropped message emits a trace event.
///
/// # Default
///
/// The default policy is `Reject`, which returns `CastError::Full` to the
/// sender. This is the safest option and forces callers to handle backpressure
/// explicitly.
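///
/// # Example
///
/// A minimal sketch of opting into `DropOldest`; the `Telemetry` server type
/// and its elided items are hypothetical:
///
/// ```ignore
/// impl GenServer for Telemetry {
///     // ...associated types and other handlers elided...
///
///     fn cast_overflow_policy(&self) -> CastOverflowPolicy {
///         // Latest-value-wins: a full mailbox evicts the stalest reading.
///         CastOverflowPolicy::DropOldest
///     }
/// }
/// ```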
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CastOverflowPolicy {
    /// Reject the new cast when the mailbox is full.
    ///
    /// The sender receives `CastError::Full` and can decide what to do
    /// (retry, drop, log, etc.). No messages are lost silently.
    #[default]
    Reject,

    /// Drop the oldest unprocessed message to make room for the new one.
    ///
    /// The dropped message is traced for observability. This is useful for
    /// "latest-value-wins" patterns (e.g., sensor readings, UI state updates)
    /// where stale data is less valuable than fresh data.
    DropOldest,
}

impl std::fmt::Display for CastOverflowPolicy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Reject => write!(f, "Reject"),
            Self::DropOldest => write!(f, "DropOldest"),
        }
    }
}

// ============================================================================
// System messages (bd-188ey)
// ============================================================================

/// Payload for an OTP-style `Down` system message (a monitor fired).
///
/// Carries the monitor notification plus the virtual time at which the
/// monitored task completed, so delivery can be ordered deterministically.
#[derive(Debug, Clone)]
pub struct DownMsg {
    /// Virtual time at which the monitored task completed (for deterministic ordering).
    pub completion_vt: Time,
    /// The monitor notification payload.
    pub notification: DownNotification,
}

impl DownMsg {
    /// Create a new down system message payload.
    #[must_use]
    pub const fn new(completion_vt: Time, notification: DownNotification) -> Self {
        Self {
            completion_vt,
            notification,
        }
    }
}

/// Payload for an OTP-style `Exit` system message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExitMsg {
    /// Virtual time at which the exit was observed/emitted.
    pub exit_vt: Time,
    /// The task that triggered the exit.
    pub from: TaskId,
    /// Why it exited.
    pub reason: DownReason,
}

impl ExitMsg {
    /// Create a new exit system message payload.
    #[must_use]
    pub const fn new(exit_vt: Time, from: TaskId, reason: DownReason) -> Self {
        Self {
            exit_vt,
            from,
            reason,
        }
    }
}

/// Payload for a deterministic timeout tick system message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutMsg {
    /// Virtual time of the tick.
    pub tick_vt: Time,
    /// Timeout identifier (user-defined semantics).
    pub id: u64,
}

impl TimeoutMsg {
    /// Create a new timeout system message payload.
    #[must_use]
    pub const fn new(tick_vt: Time, id: u64) -> Self {
        Self { tick_vt, id }
    }
}

/// Typed system messages delivered to a GenServer via [`GenServer::handle_info`].
///
/// These model OTP-style "out-of-band" notifications (Down/Exit/Timeout) in a
/// cancel-correct, deterministic way.
#[derive(Debug, Clone)]
pub enum SystemMsg {
    /// OTP-style `Down` notification (monitor fired).
    Down {
        /// Virtual time at which the monitored task completed (for deterministic ordering).
        completion_vt: Time,
        /// The notification payload.
        notification: DownNotification,
    },

    /// OTP-style exit signal (link propagation).
    Exit {
        /// Virtual time at which the exit was observed/emitted.
        exit_vt: Time,
        /// The task that triggered the exit.
        from: TaskId,
        /// Why it exited.
        reason: DownReason,
    },

    /// A deterministic timeout tick.
    Timeout {
        /// Virtual time of the tick.
        tick_vt: Time,
        /// Timeout identifier (user-defined semantics).
        id: u64,
    },
}

impl SystemMsg {
    /// Construct a down-system message.
    #[must_use]
    pub fn down(msg: DownMsg) -> Self {
        msg.into()
    }

    /// Construct an exit-system message.
    #[must_use]
    pub fn exit(msg: ExitMsg) -> Self {
        msg.into()
    }

    /// Construct a timeout-system message.
    #[must_use]
    pub fn timeout(msg: TimeoutMsg) -> Self {
        msg.into()
    }

    const fn vt(&self) -> Time {
        match self {
            Self::Down { completion_vt, .. } => *completion_vt,
            Self::Exit { exit_vt, .. } => *exit_vt,
            Self::Timeout { tick_vt, .. } => *tick_vt,
        }
    }

    const fn kind_rank(&self) -> u8 {
        match self {
            Self::Down { .. } => 0,
            Self::Exit { .. } => 1,
            Self::Timeout { .. } => 2,
        }
    }

    const fn subject_key(&self) -> SystemMsgSubjectKey {
        match self {
            Self::Down { notification, .. } => SystemMsgSubjectKey::Task(notification.monitored),
            Self::Exit { from, .. } => SystemMsgSubjectKey::Task(*from),
            Self::Timeout { id, .. } => SystemMsgSubjectKey::TimeoutId(*id),
        }
    }

    /// Deterministic ordering key for batched system-message delivery.
    ///
    /// Order is:
    /// 1. virtual time (`vt`)
    /// 2. message kind rank (`Down < Exit < Timeout`)
    /// 3. stable subject key (`TaskId` or timeout id)
    ///
    /// This key underpins the app shutdown ordering contract in
    /// `docs/spork_deterministic_ordering.md`.
    #[must_use]
    pub const fn sort_key(&self) -> (Time, u8, SystemMsgSubjectKey) {
        (self.vt(), self.kind_rank(), self.subject_key())
    }
}

impl From<DownMsg> for SystemMsg {
    fn from(value: DownMsg) -> Self {
        Self::Down {
            completion_vt: value.completion_vt,
            notification: value.notification,
        }
    }
}

impl From<ExitMsg> for SystemMsg {
    fn from(value: ExitMsg) -> Self {
        Self::Exit {
            exit_vt: value.exit_vt,
            from: value.from,
            reason: value.reason,
        }
    }
}

impl From<TimeoutMsg> for SystemMsg {
    fn from(value: TimeoutMsg) -> Self {
        Self::Timeout {
            tick_vt: value.tick_vt,
            id: value.id,
        }
    }
}

impl TryFrom<SystemMsg> for DownMsg {
    type Error = SystemMsg;

    fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
        match value {
            SystemMsg::Down {
                completion_vt,
                notification,
            } => Ok(Self {
                completion_vt,
                notification,
            }),
            other => Err(other),
        }
    }
}

impl TryFrom<SystemMsg> for ExitMsg {
    type Error = SystemMsg;

    fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
        match value {
            SystemMsg::Exit {
                exit_vt,
                from,
                reason,
            } => Ok(Self {
                exit_vt,
                from,
                reason,
            }),
            other => Err(other),
        }
    }
}

impl TryFrom<SystemMsg> for TimeoutMsg {
    type Error = SystemMsg;

    fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
        match value {
            SystemMsg::Timeout { tick_vt, id } => Ok(Self { tick_vt, id }),
            other => Err(other),
        }
    }
}

/// Stable subject key used by [`SystemMsg::sort_key`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SystemMsgSubjectKey {
    /// Messages keyed by task identity (`Down`, `Exit`).
    Task(TaskId),
    /// Timeout tick keyed by timeout id.
    TimeoutId(u64),
}

/// Batched system messages with deterministic sort for shutdown drain paths.
///
/// This is used when a runtime layer accumulates multiple `Down` / `Exit` /
/// `Timeout` messages in one scheduler step and needs replay-stable delivery.
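///
/// # Example
///
/// A sketch of drain-time usage; `t1`, `t2` (with `t1 < t2`), `task`, and
/// `reason` are assumed bindings:
///
/// ```ignore
/// let mut batch = SystemMsgBatch::new();
/// batch.push(SystemMsg::timeout(TimeoutMsg::new(t2, 7)));
/// batch.push(SystemMsg::exit(ExitMsg::new(t1, task, reason)));
/// // `into_sorted` orders by (vt, kind rank, subject key), so the Exit at
/// // t1 is delivered before the Timeout at t2 regardless of push order.
/// for msg in batch.into_sorted() {
///     // deliver, e.g. via `handle.try_info(msg)`
/// }
/// ```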
#[derive(Debug, Default)]
pub struct SystemMsgBatch {
    entries: Vec<SystemMsg>,
}

impl SystemMsgBatch {
    /// Creates an empty batch.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds a message to the batch.
    pub fn push(&mut self, msg: SystemMsg) {
        self.entries.push(msg);
    }

    /// Consumes the batch and returns deterministically ordered messages.
    #[must_use]
    pub fn into_sorted(mut self) -> Vec<SystemMsg> {
        self.entries.sort_by_key(SystemMsg::sort_key);
        self.entries
    }
}

/// A GenServer processes calls (request-response) and casts (fire-and-forget).
///
/// # Cancel Safety
///
/// When a GenServer is cancelled:
/// 1. The mailbox closes (no new messages accepted)
/// 2. Buffered messages are drained (calls receive errors, casts are processed)
/// 3. `on_stop` runs for cleanup
/// 4. The server state is returned via `GenServerHandle::join`
pub trait GenServer: Send + 'static {
    /// Request type for calls (synchronous request-response).
    type Call: Send + 'static;

    /// Reply type returned to callers.
    type Reply: Send + 'static;

    /// Message type for casts (asynchronous fire-and-forget).
    type Cast: Send + 'static;

    /// Message type for `info` (system/out-of-band notifications).
    ///
    /// Recommended default is [`SystemMsg`]. Servers that want their own info messages
    /// can define an enum that contains `SystemMsg` plus app-specific variants.
    ///
    /// Note: associated type defaults are not yet available on stable Rust;
    /// implementors that only need system messages should write
    /// `type Info = SystemMsg;` explicitly.
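    ///
    /// A sketch of a custom info enum that embeds [`SystemMsg`] alongside
    /// app-specific variants (`MyInfo` and `ConfigReloaded` are hypothetical):
    ///
    /// ```ignore
    /// enum MyInfo {
    ///     System(SystemMsg),
    ///     ConfigReloaded,
    /// }
    /// ```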
    type Info: Send + 'static;

    /// Handle a call (request-response).
    ///
    /// The `reply` handle **must** be consumed by calling `reply.send(value)`
    /// (or explicitly released via `reply.abort()`). Dropping it without
    /// consuming is detected as an obligation leak in lab mode.
    fn handle_call(
        &mut self,
        cx: &Cx,
        request: Self::Call,
        reply: Reply<Self::Reply>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Handle a cast (fire-and-forget).
    ///
    /// No reply is expected. The default implementation does nothing.
    fn handle_cast(
        &mut self,
        _cx: &Cx,
        _msg: Self::Cast,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    /// Handle an info message (system/out-of-band).
    ///
    /// Defaults to a no-op.
    fn handle_info(
        &mut self,
        _cx: &Cx,
        _msg: Self::Info,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    /// Called once when the server starts, before processing any messages.
    fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    /// Returns the budget used for the init (`on_start`) phase.
    ///
    /// This budget is met with the task/region budget and applied only for the
    /// duration of `on_start`. Budget consumption during `on_start` is preserved
    /// when restoring the original budget for the message loop.
    fn on_start_budget(&self) -> Budget {
        Budget::INFINITE
    }

    /// Called once when the server stops, after the mailbox is drained.
    fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    /// Returns the budget used for drain + stop (`on_stop`).
    ///
    /// The default is [`Budget::MINIMAL`] for bounded cleanup.
    fn on_stop_budget(&self) -> Budget {
        Budget::MINIMAL
    }

    /// Returns the overflow policy for cast messages when the mailbox is full.
    ///
    /// The default is [`CastOverflowPolicy::Reject`], which returns
    /// `CastError::Full` to the sender.
    ///
    /// Override this to use `DropOldest` for "latest-value-wins" patterns.
    fn cast_overflow_policy(&self) -> CastOverflowPolicy {
        CastOverflowPolicy::Reject
    }
}

/// Handle for sending a reply to a call.
///
/// This is a **linear obligation token**: it **must** be consumed by calling
/// [`send()`](Self::send) or [`abort()`](Self::abort). Dropping without
/// consuming triggers a panic via [`ObligationToken<SendPermit>`].
///
/// Backed by [`TrackedOneshotPermit`](session::TrackedOneshotPermit) from
/// `channel::session`, making "silent reply drop" structurally impossible.
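///
/// # Example
///
/// A sketch of the two legal ways to discharge the obligation
/// (`should_reply` is a stand-in condition):
///
/// ```ignore
/// fn discharge(reply: Reply<u64>, should_reply: bool) {
///     if should_reply {
///         match reply.send(42) {
///             ReplyOutcome::Committed(_proof) => { /* delivered */ }
///             ReplyOutcome::CallerGone => { /* caller timed out; aborted cleanly */ }
///         }
///     } else {
///         // Intentionally not replying (e.g., delegating elsewhere).
///         let _aborted = reply.abort();
///     }
/// }
/// ```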
pub struct Reply<R> {
    cx: Cx,
    permit: TrackedOneshotPermit<R>,
}

impl<R: Send + 'static> Reply<R> {
    fn new(cx: &Cx, permit: TrackedOneshotPermit<R>) -> Self {
        Self {
            cx: cx.clone(),
            permit,
        }
    }

    /// Send the reply value to the caller.
    ///
    /// Consumes the reply handle and returns a [`ReplyOutcome`]: `Committed`
    /// (carrying a [`CommittedProof`]) on delivery, or `CallerGone` if the
    /// caller has dropped (e.g., timed out), in which case the obligation is
    /// aborted cleanly (no panic).
    pub fn send(self, value: R) -> ReplyOutcome {
        match self.permit.send(value) {
            Ok(proof) => {
                self.cx.trace("gen_server::reply_committed");
                ReplyOutcome::Committed(proof)
            }
            Err(_send_err) => {
                // Receiver (caller) dropped — e.g., timed out. The tracked
                // permit aborts the obligation cleanly in this case.
                self.cx.trace("gen_server::reply_caller_gone");
                ReplyOutcome::CallerGone
            }
        }
    }

    /// Explicitly abort the reply obligation without sending a value.
    ///
    /// Use this when the server intentionally chooses not to reply (e.g.,
    /// delegating to another process). Returns an [`AbortedProof`].
    #[must_use]
    pub fn abort(self) -> AbortedProof<SendPermit> {
        self.cx.trace("gen_server::reply_aborted");
        self.permit.abort()
    }

    /// Check if the caller is still waiting for a reply.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.permit.is_closed()
    }
}

impl<R> std::fmt::Debug for Reply<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Reply")
            .field("pending", &!self.permit.is_closed())
            .finish_non_exhaustive()
    }
}

/// Outcome of sending a reply.
pub enum ReplyOutcome {
    /// Reply was successfully delivered, obligation committed.
    Committed(CommittedProof<SendPermit>),
    /// Caller has already gone (e.g., timed out). Obligation was aborted.
    CallerGone,
}

impl std::fmt::Debug for ReplyOutcome {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Committed(_) => f.debug_tuple("Committed").finish(),
            Self::CallerGone => write!(f, "CallerGone"),
        }
    }
}

// ============================================================================
// Internal message envelope
// ============================================================================

/// Internal message type wrapping calls/casts/info.
enum Envelope<S: GenServer> {
    Call {
        request: S::Call,
        reply_permit: TrackedOneshotPermit<S::Reply>,
    },
    Cast {
        msg: S::Cast,
    },
    Info {
        msg: S::Info,
    },
}

impl<S: GenServer> std::fmt::Debug for Envelope<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Call { .. } => f.debug_struct("Envelope::Call").finish_non_exhaustive(),
            Self::Cast { .. } => f.debug_struct("Envelope::Cast").finish_non_exhaustive(),
            Self::Info { .. } => f.debug_struct("Envelope::Info").finish_non_exhaustive(),
        }
    }
}

// ============================================================================
// GenServer cell (internal runtime state)
// ============================================================================

struct GenServerCell<S: GenServer> {
    mailbox: mpsc::Receiver<Envelope<S>>,
    state: Arc<GenServerStateCell>,
    _keep_alive: mpsc::Sender<Envelope<S>>,
}

#[derive(Debug)]
struct GenServerStateCell {
    state: AtomicU8,
}

impl GenServerStateCell {
    fn new(state: ActorState) -> Self {
        Self {
            state: AtomicU8::new(encode_actor_state(state)),
        }
    }

    fn load(&self) -> ActorState {
        decode_actor_state(self.state.load(Ordering::Acquire))
    }

    fn store(&self, state: ActorState) {
        self.state
            .store(encode_actor_state(state), Ordering::Release);
    }
}

const fn encode_actor_state(state: ActorState) -> u8 {
    match state {
        ActorState::Created => 0,
        ActorState::Running => 1,
        ActorState::Stopping => 2,
        ActorState::Stopped => 3,
    }
}

const fn decode_actor_state(value: u8) -> ActorState {
    match value {
        0 => ActorState::Created,
        1 => ActorState::Running,
        2 => ActorState::Stopping,
        _ => ActorState::Stopped,
    }
}

// ============================================================================
// GenServerHandle: external handle for calls and casts
// ============================================================================

/// Handle to a running GenServer.
///
/// Provides typed `call()` and `cast()` methods. The handle owns a sender to
/// the server's mailbox and a oneshot receiver for join.
#[derive(Debug)]
pub struct GenServerHandle<S: GenServer> {
    actor_id: ActorId,
    sender: mpsc::Sender<Envelope<S>>,
    state: Arc<GenServerStateCell>,
    task_id: TaskId,
    receiver: oneshot::Receiver<Result<S, JoinError>>,
    inner: std::sync::Weak<std::sync::RwLock<CxInner>>,
    overflow_policy: CastOverflowPolicy,
}

/// Error returned when a call fails.
#[derive(Debug)]
pub enum CallError {
    /// The server has stopped (mailbox disconnected).
    ServerStopped,
    /// The server did not reply (oneshot dropped).
    NoReply,
    /// The call was cancelled.
    Cancelled(CancelReason),
}

impl std::fmt::Display for CallError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ServerStopped => write!(f, "GenServer has stopped"),
            Self::NoReply => write!(f, "GenServer did not reply"),
            Self::Cancelled(reason) => write!(f, "GenServer call cancelled: {reason}"),
        }
    }
}

impl std::error::Error for CallError {}

/// Error returned when a cast fails.
#[derive(Debug)]
pub enum CastError {
    /// The server has stopped (mailbox disconnected).
    ServerStopped,
    /// The mailbox is full.
    Full,
    /// The cast was cancelled.
    Cancelled(CancelReason),
}

impl std::fmt::Display for CastError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ServerStopped => write!(f, "GenServer has stopped"),
            Self::Full => write!(f, "GenServer mailbox full"),
            Self::Cancelled(reason) => write!(f, "GenServer cast cancelled: {reason}"),
        }
    }
}

impl std::error::Error for CastError {}

/// Error returned when sending an info message fails.
#[derive(Debug)]
pub enum InfoError {
    /// The server has stopped (mailbox disconnected).
    ServerStopped,
    /// The mailbox is full.
    Full,
    /// The send was cancelled.
    Cancelled(CancelReason),
}

impl std::fmt::Display for InfoError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ServerStopped => write!(f, "GenServer has stopped"),
            Self::Full => write!(f, "GenServer mailbox full"),
            Self::Cancelled(reason) => write!(f, "GenServer info cancelled: {reason}"),
        }
    }
}

impl std::error::Error for InfoError {}

impl<S: GenServer> GenServerHandle<S> {
    /// Send a call (request-response) to the server.
    ///
    /// Waits until the server replies or the server stops. The reply channel
    /// uses an obligation-tracked oneshot from `channel::session`, ensuring
    /// that if the server drops the reply without sending, the obligation
    /// token panics rather than silently losing the reply.
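    ///
    /// # Example
    ///
    /// A sketch, reusing the `Counter` server from the module docs:
    ///
    /// ```ignore
    /// let total = handle.call(&cx, Request::Add(5)).await?;
    /// assert_eq!(total, 5);
    /// ```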
    pub async fn call(&self, cx: &Cx, request: S::Call) -> Result<S::Reply, CallError> {
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::call_rejected_cancelled");
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(CallError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::call_rejected_stopped");
            return Err(CallError::ServerStopped);
        }

        let (reply_tx, reply_rx) = session::tracked_oneshot::<S::Reply>();
        let reply_permit = reply_tx.reserve(cx);
        let envelope = Envelope::Call {
            request,
            reply_permit,
        };

        if let Err(e) = self.sender.send(cx, envelope).await {
            // If the envelope couldn't be enqueued, we must abort the reply
            // obligation to avoid an obligation-token leak.
            let (envelope, was_cancelled) = match e {
                mpsc::SendError::Cancelled(v) => (v, true),
                mpsc::SendError::Disconnected(v) | mpsc::SendError::Full(v) => (v, false),
            };
            if let Envelope::Call { reply_permit, .. } = envelope {
                let _aborted = reply_permit.abort();
            }
            if was_cancelled {
                cx.trace("gen_server::call_send_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                return Err(CallError::Cancelled(reason));
            }
            cx.trace("gen_server::call_send_failed");
            return Err(CallError::ServerStopped);
        }

        cx.trace("gen_server::call_enqueued");

        match reply_rx.recv(cx).await {
            Ok(v) => Ok(v),
            Err(oneshot::RecvError::Closed) => {
                cx.trace("gen_server::call_no_reply");
                Err(CallError::NoReply)
            }
            Err(oneshot::RecvError::Cancelled) => {
                cx.trace("gen_server::call_reply_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                Err(CallError::Cancelled(reason))
            }
        }
    }

    /// Send a cast (fire-and-forget) to the server.
    pub async fn cast(&self, cx: &Cx, msg: S::Cast) -> Result<(), CastError> {
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::cast_rejected_cancelled");
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(CastError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::cast_rejected_stopped");
            return Err(CastError::ServerStopped);
        }
        let envelope = Envelope::Cast { msg };
        self.sender.send(cx, envelope).await.map_err(|e| match e {
            mpsc::SendError::Cancelled(_) => {
                cx.trace("gen_server::cast_send_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                CastError::Cancelled(reason)
            }
            mpsc::SendError::Disconnected(_) | mpsc::SendError::Full(_) => {
                cx.trace("gen_server::cast_send_failed");
                CastError::ServerStopped
            }
        })
    }

    /// Try to send a cast without blocking.
    ///
    /// Applies the server's [`CastOverflowPolicy`] when the mailbox is full:
    /// - `Reject`: returns `CastError::Full`
    /// - `DropOldest`: evicts the oldest message and enqueues the new one
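    ///
    /// A sketch, reusing the `Cast` enum from the module docs:
    ///
    /// ```ignore
    /// match handle.try_cast(Cast::Reset) {
    ///     Ok(()) => {}
    ///     Err(CastError::Full) => { /* only under `Reject` */ }
    ///     Err(_) => { /* server stopping or stopped */ }
    /// }
    /// ```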
    pub fn try_cast(&self, msg: S::Cast) -> Result<(), CastError> {
        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(CastError::ServerStopped);
        }
        let envelope = Envelope::Cast { msg };
        match self.overflow_policy {
            CastOverflowPolicy::Reject => self.sender.try_send(envelope).map_err(|e| match e {
                mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
                    CastError::ServerStopped
                }
                mpsc::SendError::Full(_) => CastError::Full,
            }),
            CastOverflowPolicy::DropOldest => {
                match self.sender.send_evict_oldest(envelope) {
                    Ok(Some(evicted)) => {
                        // If the evicted message was a Call, abort the reply
                        // obligation to prevent an obligation-token-leak panic.
                        if let Envelope::Call { reply_permit, .. } = evicted {
                            let _aborted = reply_permit.abort();
                        }
                        // Trace the eviction so lossy drops are observable.
                        if let Some(cx) = Cx::current() {
                            cx.trace("gen_server::cast_evicted_oldest");
                        }
                        Ok(())
                    }
                    Ok(None) => Ok(()),
                    Err(mpsc::SendError::Disconnected(_)) => Err(CastError::ServerStopped),
                    Err(mpsc::SendError::Full(_) | mpsc::SendError::Cancelled(_)) => {
                        unreachable!("send_evict_oldest never returns Full or Cancelled")
                    }
                }
            }
        }
    }

    /// Send an info message (system/out-of-band) to the server.
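    ///
    /// A sketch, assuming the server declares `type Info = SystemMsg;` and
    /// `now` is the current virtual time:
    ///
    /// ```ignore
    /// handle.info(&cx, SystemMsg::timeout(TimeoutMsg::new(now, 1))).await?;
    /// ```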
    pub async fn info(&self, cx: &Cx, msg: S::Info) -> Result<(), InfoError> {
        if cx.checkpoint().is_err() {
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(InfoError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(InfoError::ServerStopped);
        }

        let envelope = Envelope::Info { msg };
        self.sender.send(cx, envelope).await.map_err(|e| match e {
            mpsc::SendError::Cancelled(_) => {
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                InfoError::Cancelled(reason)
            }
            mpsc::SendError::Disconnected(_) => InfoError::ServerStopped,
            mpsc::SendError::Full(_) => InfoError::Full,
        })
    }

    /// Try to send an info message without blocking.
    pub fn try_info(&self, msg: S::Info) -> Result<(), InfoError> {
        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(InfoError::ServerStopped);
        }

        let envelope = Envelope::Info { msg };
        self.sender.try_send(envelope).map_err(|e| match e {
            mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
                InfoError::ServerStopped
            }
            mpsc::SendError::Full(_) => InfoError::Full,
        })
    }

    /// Returns the server's overflow policy for cast messages.
    #[must_use]
    pub fn cast_overflow_policy(&self) -> CastOverflowPolicy {
        self.overflow_policy
    }

    /// Returns the server's actor ID.
    #[must_use]
    pub const fn actor_id(&self) -> ActorId {
        self.actor_id
    }

    /// Returns the server's task ID.
    #[must_use]
    pub fn task_id(&self) -> TaskId {
        self.task_id
    }

    /// Returns true if the server has finished.
    #[must_use]
    pub fn is_finished(&self) -> bool {
        self.receiver.is_ready()
    }

    /// Signals the server to stop gracefully.
    pub fn stop(&self) {
        self.state.store(ActorState::Stopping);
        if let Some(inner) = self.inner.upgrade() {
            if let Ok(mut guard) = inner.write() {
                guard.cancel_requested = true;
            }
        }
        // Ensure a server blocked in `mailbox.recv()` is woken so it can observe
        // the cancellation request and run drain/on_stop deterministically.
        self.sender.wake_receiver();
    }

    /// Wait for the server to finish and return its final state.
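    ///
    /// A sketch of a graceful shutdown, reusing `Counter` from the module docs:
    ///
    /// ```ignore
    /// handle.stop();
    /// let final_state: Counter = handle.join(&cx).await?;
    /// ```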
    pub async fn join(&self, cx: &Cx) -> Result<S, JoinError> {
        self.receiver.recv(cx).await.unwrap_or_else(|_| {
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            Err(JoinError::Cancelled(reason))
        })
    }
}

/// A lightweight, clonable reference for sending calls, casts, and info
/// messages to a GenServer.
///
/// It does not support `join()`; use [`GenServerHandle`] to wait on the final
/// server state.
#[derive(Debug)]
pub struct GenServerRef<S: GenServer> {
    actor_id: ActorId,
    sender: mpsc::Sender<Envelope<S>>,
    state: Arc<GenServerStateCell>,
    overflow_policy: CastOverflowPolicy,
}

impl<S: GenServer> Clone for GenServerRef<S> {
    fn clone(&self) -> Self {
        Self {
            actor_id: self.actor_id,
            sender: self.sender.clone(),
            state: Arc::clone(&self.state),
            overflow_policy: self.overflow_policy,
        }
    }
}

impl<S: GenServer> GenServerRef<S> {
    /// Returns the configured cast overflow policy for this server.
    #[must_use]
    pub const fn cast_overflow_policy(&self) -> CastOverflowPolicy {
        self.overflow_policy
    }

    /// Send a call to the server.
    pub async fn call(&self, cx: &Cx, request: S::Call) -> Result<S::Reply, CallError> {
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::call_rejected_cancelled");
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(CallError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::call_rejected_stopped");
            return Err(CallError::ServerStopped);
        }

        let (reply_tx, reply_rx) = session::tracked_oneshot::<S::Reply>();
        let reply_permit = reply_tx.reserve(cx);
        let envelope = Envelope::Call {
            request,
            reply_permit,
        };

        if let Err(e) = self.sender.send(cx, envelope).await {
            let (envelope, was_cancelled) = match e {
                mpsc::SendError::Cancelled(v) => (v, true),
                mpsc::SendError::Disconnected(v) | mpsc::SendError::Full(v) => (v, false),
            };
            if let Envelope::Call { reply_permit, .. } = envelope {
                let _aborted = reply_permit.abort();
            }
            if was_cancelled {
                cx.trace("gen_server::call_send_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                return Err(CallError::Cancelled(reason));
            }
            cx.trace("gen_server::call_send_failed");
            return Err(CallError::ServerStopped);
        }

        cx.trace("gen_server::call_enqueued");

        match reply_rx.recv(cx).await {
            Ok(v) => Ok(v),
            Err(oneshot::RecvError::Closed) => {
                cx.trace("gen_server::call_no_reply");
                Err(CallError::NoReply)
            }
            Err(oneshot::RecvError::Cancelled) => {
                cx.trace("gen_server::call_reply_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                Err(CallError::Cancelled(reason))
            }
        }
    }

    /// Send a cast to the server.
    pub async fn cast(&self, cx: &Cx, msg: S::Cast) -> Result<(), CastError> {
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::cast_rejected_cancelled");
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(CastError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::cast_rejected_stopped");
            return Err(CastError::ServerStopped);
        }
        let envelope = Envelope::Cast { msg };
        self.sender.send(cx, envelope).await.map_err(|e| match e {
            mpsc::SendError::Cancelled(_) => {
                cx.trace("gen_server::cast_send_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                CastError::Cancelled(reason)
            }
            mpsc::SendError::Disconnected(_) | mpsc::SendError::Full(_) => {
                cx.trace("gen_server::cast_send_failed");
                CastError::ServerStopped
            }
        })
    }

    /// Try to send a cast without blocking.
    ///
    /// Applies the server's [`CastOverflowPolicy`] when the mailbox is full.
    pub fn try_cast(&self, msg: S::Cast) -> Result<(), CastError> {
        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(CastError::ServerStopped);
        }
        let envelope = Envelope::Cast { msg };
        match self.overflow_policy {
            CastOverflowPolicy::Reject => self.sender.try_send(envelope).map_err(|e| match e {
                mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
                    CastError::ServerStopped
                }
                mpsc::SendError::Full(_) => CastError::Full,
            }),
            CastOverflowPolicy::DropOldest => match self.sender.send_evict_oldest(envelope) {
                Ok(Some(evicted)) => {
                    // If the evicted message was a Call, abort the reply
                    // obligation to prevent an obligation-token-leak panic.
                    if let Envelope::Call { reply_permit, .. } = evicted {
                        let _aborted = reply_permit.abort();
                    }
                    if let Some(cx) = Cx::current() {
                        cx.trace("gen_server::cast_evicted_oldest");
                    }
                    Ok(())
                }
                Ok(None) => Ok(()),
                Err(mpsc::SendError::Disconnected(_)) => Err(CastError::ServerStopped),
                Err(mpsc::SendError::Full(_) | mpsc::SendError::Cancelled(_)) => {
                    unreachable!("send_evict_oldest never returns Full or Cancelled")
                }
            },
        }
    }

    /// Send an info message (system/out-of-band) to the server.
    pub async fn info(&self, cx: &Cx, msg: S::Info) -> Result<(), InfoError> {
        if cx.checkpoint().is_err() {
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(InfoError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(InfoError::ServerStopped);
        }

        let envelope = Envelope::Info { msg };
        self.sender.send(cx, envelope).await.map_err(|e| match e {
            mpsc::SendError::Cancelled(_) => {
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                InfoError::Cancelled(reason)
            }
            mpsc::SendError::Disconnected(_) => InfoError::ServerStopped,
            mpsc::SendError::Full(_) => InfoError::Full,
        })
    }

    /// Try to send an info message without blocking.
    pub fn try_info(&self, msg: S::Info) -> Result<(), InfoError> {
        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(InfoError::ServerStopped);
        }

        let envelope = Envelope::Info { msg };
        self.sender.try_send(envelope).map_err(|e| match e {
            mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
                InfoError::ServerStopped
            }
            mpsc::SendError::Full(_) => InfoError::Full,
        })
    }

    /// Returns true if the server has stopped.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.sender.is_closed()
    }

    /// Returns true if the server is still alive.
    #[must_use]
    pub fn is_alive(&self) -> bool {
        self.state.load() != ActorState::Stopped
    }

    /// Returns the server's actor ID.
    #[must_use]
    pub const fn actor_id(&self) -> ActorId {
        self.actor_id
    }
}

impl<S: GenServer> GenServerHandle<S> {
    /// Returns a lightweight, clonable reference for casting.
    #[must_use]
    pub fn server_ref(&self) -> GenServerRef<S> {
        GenServerRef {
            actor_id: self.actor_id,
            sender: self.sender.clone(),
            state: Arc::clone(&self.state),
            overflow_policy: self.overflow_policy,
        }
    }
}

// ============================================================================
// GenServer runtime loop
// ============================================================================

/// Default mailbox capacity for GenServers.
pub const DEFAULT_GENSERVER_MAILBOX_CAPACITY: usize = 64;

/// Runs the GenServer message loop.
async fn run_gen_server_loop<S: GenServer>(mut server: S, cx: Cx, cell: &GenServerCell<S>) -> S {
    use crate::tracing_compat::debug;

    cell.state.store(ActorState::Running);

    // Phase 1: Initialization
    if cx.is_cancel_requested() {
        cx.trace("gen_server::init_skipped_cancelled");
    } else {
        cx.trace("gen_server::init");
        let _budget = PhaseBudgetGuard::enter(&cx, server.on_start_budget(), true);
        server.on_start(&cx).await;
    }

    // Phase 2: Message loop
    loop {
        if cx.is_cancel_requested() {
            cx.trace("gen_server::cancel_requested");
            break;
        }

        match cell.mailbox.recv(&cx).await {
            Ok(envelope) => {
                dispatch_envelope(&mut server, &cx, envelope).await;
            }
            Err(crate::channel::mpsc::RecvError::Disconnected) => {
                cx.trace("gen_server::mailbox_disconnected");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Cancelled) => {
                cx.trace("gen_server::recv_cancelled");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Empty) => {
                break;
            }
        }
    }

    cell.state.store(ActorState::Stopping);

    // Phase 3+4: Drain + stop hook.
    //
    // Drain+on_stop are cleanup phases. We:
    // - tighten budget to a bounded stop budget
    // - mask cancellation so cleanup can run deterministically
    let _budget = PhaseBudgetGuard::enter(&cx, server.on_stop_budget(), false);
    let _mask = AsyncMaskGuard::enter(&cx);

    // Phase 3: Drain remaining messages.
    // Calls during drain: reply with error (caller should not depend on drain).
    // Casts during drain: process normally.
    let drain_limit = cell.mailbox.capacity() as u64;
    let mut drained: u64 = 0;
    while let Ok(envelope) = cell.mailbox.try_recv() {
        match envelope {
            Envelope::Call {
                request: _,
                reply_permit,
            } => {
                let _aborted: AbortedProof<SendPermit> = reply_permit.abort();
                cx.trace("gen_server::drain_abort_call");
            }
            Envelope::Cast { msg } => {
                server.handle_cast(&cx, msg).await;
            }
            Envelope::Info { msg } => {
                server.handle_info(&cx, msg).await;
            }
        }
        drained += 1;
        if drained >= drain_limit {
            break;
        }
    }
    if drained > 0 {
        debug!(drained = drained, "gen_server::mailbox_drained");
        cx.trace("gen_server::mailbox_drained");
    }

    // Phase 4: Cleanup
    cx.trace("gen_server::terminate");
    server.on_stop(&cx).await;

    server
}

/// Dispatch a single envelope to the appropriate handler.
async fn dispatch_envelope<S: GenServer>(server: &mut S, cx: &Cx, envelope: Envelope<S>) {
    match envelope {
        Envelope::Call {
            request,
            reply_permit,
        } => {
            let reply = Reply::new(cx, reply_permit);
            server.handle_call(cx, request, reply).await;
        }
        Envelope::Cast { msg } => {
            server.handle_cast(cx, msg).await;
        }
        Envelope::Info { msg } => {
            server.handle_info(cx, msg).await;
        }
    }
}

// ============================================================================
// Spawn integration
// ============================================================================

impl<P: crate::types::Policy> crate::cx::Scope<'_, P> {
    /// Spawns a new GenServer in this scope.
    ///
    /// The server runs as a region-owned task. Calls and casts are delivered
    /// through a bounded MPSC channel with two-phase send semantics.
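    ///
    /// # Example
    ///
    /// A sketch (assumes `state` and `cx` are already in hand, e.g. inside the
    /// lab runtime, and reuses `Counter` from the module docs):
    ///
    /// ```ignore
    /// let (handle, stored) = scope.spawn_gen_server(
    ///     &mut state,
    ///     &cx,
    ///     Counter { count: 0 },
    ///     DEFAULT_GENSERVER_MAILBOX_CAPACITY,
    /// )?;
    /// ```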
1445    pub fn spawn_gen_server<S: GenServer>(
1446        &self,
1447        state: &mut crate::runtime::state::RuntimeState,
1448        cx: &Cx,
1449        server: S,
1450        mailbox_capacity: usize,
1451    ) -> Result<(GenServerHandle<S>, crate::runtime::stored_task::StoredTask), SpawnError> {
1452        use crate::cx::scope::CatchUnwind;
1453        use crate::runtime::stored_task::StoredTask;
1454        use crate::tracing_compat::{debug, debug_span};
1455
1456        let overflow_policy = server.cast_overflow_policy();
1457        let (msg_tx, msg_rx) = mpsc::channel::<Envelope<S>>(mailbox_capacity);
1458        let (result_tx, result_rx) = oneshot::channel::<Result<S, JoinError>>();
1459        let task_id = self.create_task_record(state)?;
1460        let actor_id = ActorId::from_task(task_id);
1461        let server_state = Arc::new(GenServerStateCell::new(ActorState::Created));
1462
1463        let _span = debug_span!(
1464            "gen_server_spawn",
1465            task_id = ?task_id,
1466            region_id = ?self.region_id(),
1467            mailbox_capacity = mailbox_capacity,
1468        )
1469        .entered();
1470        debug!(
1471            task_id = ?task_id,
1472            region_id = ?self.region_id(),
1473            mailbox_capacity = mailbox_capacity,
1474            "gen_server spawned"
1475        );
1476
1477        let child_observability = cx.child_observability(self.region_id(), task_id);
1478        let child_entropy = cx.child_entropy(task_id);
1479        let io_driver = state.io_driver_handle();
1480        let child_cx = Cx::new_with_observability(
1481            self.region_id(),
1482            task_id,
1483            self.budget(),
1484            Some(child_observability),
1485            io_driver,
1486            Some(child_entropy),
1487        )
1488        .with_blocking_pool_handle(cx.blocking_pool_handle());
1489
1490        if let Some(record) = state.task_mut(task_id) {
1491            record.set_cx_inner(child_cx.inner.clone());
1492            record.set_cx(child_cx.clone());
1493        }
1494
1495        let cx_for_send = child_cx.clone();
1496        let inner_weak = Arc::downgrade(&child_cx.inner);
1497        let state_for_task = Arc::clone(&server_state);
1498
1499        let cell = GenServerCell {
1500            mailbox: msg_rx,
1501            state: Arc::clone(&server_state),
1502            _keep_alive: msg_tx.clone(),
1503        };
1504
1505        let wrapped = async move {
1506            let result = CatchUnwind(Box::pin(run_gen_server_loop(server, child_cx, &cell))).await;
1507            match result {
1508                Ok(server_final) => {
1509                    let _ = result_tx.send(&cx_for_send, Ok(server_final));
1510                }
1511                Err(payload) => {
1512                    let msg = crate::cx::scope::payload_to_string(&payload);
1513                    let _ = result_tx.send(
1514                        &cx_for_send,
1515                        Err(JoinError::Panicked(crate::types::PanicPayload::new(msg))),
1516                    );
1517                }
1518            }
1519            state_for_task.store(ActorState::Stopped);
1520            Outcome::Ok(())
1521        };
1522
1523        let stored = StoredTask::new_with_id(wrapped, task_id);
1524
1525        let handle = GenServerHandle {
1526            actor_id,
1527            sender: msg_tx,
1528            state: server_state,
1529            task_id,
1530            receiver: result_rx,
1531            inner: inner_weak,
1532            overflow_policy,
1533        };
1534
1535        Ok((handle, stored))
1536    }
1537
    /// Spawns a named GenServer in this scope, registering it in the given
    /// [`NameRegistry`].
    ///
    /// This combines [`spawn_gen_server`](Self::spawn_gen_server) with
    /// [`NameRegistry::register`] into a single all-or-nothing operation: the
    /// name is acquired *after* the server task is created but *before* it
    /// starts processing messages.
    ///
    /// On success, the returned [`NamedGenServerHandle`] holds both the server
    /// handle and the name lease. The lease is released when the handle is
    /// stopped via [`NamedGenServerHandle::stop_and_release`] or aborted via
    /// [`NamedGenServerHandle::abort_lease`].
    ///
    /// # Errors
    ///
    /// Returns [`NamedSpawnError::Spawn`] if the underlying task spawn fails,
    /// or [`NamedSpawnError::NameTaken`] if the name is already registered.
    /// In the name-taken case, the server never runs: the task record created
    /// in phase 1 is removed again so it cannot block region quiescence.
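    ///
    /// # Example
    ///
    /// A minimal sketch (compile-skipped; `Counter` is the test server type
    /// from this module, and `runtime`, `scope`, and `cx` are assumed to be
    /// set up as in the tests below):
    ///
    /// ```ignore
    /// let mut registry = NameRegistry::new();
    /// let (mut named, stored) = scope.spawn_named_gen_server(
    ///     &mut runtime.state,
    ///     &cx,
    ///     &mut registry,
    ///     "counter",
    ///     Counter { count: 0 },
    ///     32,         // mailbox capacity
    ///     Time::ZERO, // registration time
    /// )?;
    /// runtime.state.store_spawned_task(named.task_id(), stored);
    ///
    /// // ... drive the runtime; message the server via named.server_ref() ...
    ///
    /// // Resolve the lease obligation before dropping the handle.
    /// named.stop_and_release()?;
    /// ```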
    #[allow(clippy::too_many_arguments)]
    pub fn spawn_named_gen_server<S: GenServer>(
        &self,
        state: &mut crate::runtime::state::RuntimeState,
        cx: &Cx,
        registry: &mut crate::cx::NameRegistry,
        name: impl Into<String>,
        server: S,
        mailbox_capacity: usize,
        now: crate::types::Time,
    ) -> Result<
        (
            NamedGenServerHandle<S>,
            crate::runtime::stored_task::StoredTask,
        ),
        NamedSpawnError,
    > {
        let name = name.into();

        // Phase 1: Spawn the server (creates task record + handle).
        let (handle, stored) = self
            .spawn_gen_server(state, cx, server, mailbox_capacity)
            .map_err(NamedSpawnError::Spawn)?;

        // Phase 2: Register the name under the new task's ID.
        let task_id = handle.task_id();
        let region = self.region_id();

        match registry.register(name, task_id, region, now) {
            Ok(lease) => {
                let named = NamedGenServerHandle {
                    handle,
                    lease: Some(lease),
                };
                Ok((named, stored))
            }
            Err(e) => {
                // Registration failed: clean up the task record that was created
                // by spawn_gen_server to prevent a region quiescence leak.
                // Without this cleanup, the region would have a child task that
                // is never scheduled and can never complete, blocking region close.
                if let Some(region_record) = state.region(self.region_id()) {
                    region_record.remove_task(task_id);
                }
                state.remove_task(task_id);
                Err(NamedSpawnError::NameTaken(e))
            }
        }
    }
}

// ============================================================================
// Named GenServer handle
// ============================================================================

/// Error from [`Scope::spawn_named_gen_server`].
#[derive(Debug)]
pub enum NamedSpawnError {
    /// The underlying task spawn failed.
    Spawn(SpawnError),
    /// The name was already taken in the registry.
    NameTaken(crate::cx::NameLeaseError),
}

impl std::fmt::Display for NamedSpawnError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Spawn(e) => write!(f, "named server spawn failed: {e}"),
            Self::NameTaken(e) => write!(f, "named server registration failed: {e}"),
        }
    }
}

impl std::error::Error for NamedSpawnError {}

/// Handle to a running **named** GenServer.
///
/// Wraps a [`GenServerHandle`] together with a [`NameLease`] from the
/// registry. The lease is an obligation (drop bomb): callers must resolve it
/// by calling [`stop_and_release`](Self::stop_and_release) or
/// [`abort_lease`](Self::abort_lease) before dropping the handle.
///
/// All `call`, `cast`, and `info` operations delegate to the inner handle.
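///
/// # Example
///
/// A sketch of the three resolution paths (compile-skipped; `named` is a
/// handle obtained from [`Scope::spawn_named_gen_server`]):
///
/// ```ignore
/// // Normal shutdown: stop the server and free the name for reuse.
/// named.stop_and_release()?;
///
/// // Error path: roll back the registration, leaving the server running.
/// // named.abort_lease()?;
///
/// // Manual management: take the lease and resolve it yourself.
/// // if let Some(mut lease) = named.take_lease() {
/// //     let _ = lease.release();
/// // }
/// ```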
#[derive(Debug)]
pub struct NamedGenServerHandle<S: GenServer> {
    handle: GenServerHandle<S>,
    lease: Option<crate::cx::NameLease>,
}

impl<S: GenServer> NamedGenServerHandle<S> {
    /// The registered name of this server.
    #[must_use]
    pub fn name(&self) -> &str {
        self.lease
            .as_ref()
            .map_or("(released)", crate::cx::NameLease::name)
    }

    /// The underlying task ID.
    #[must_use]
    pub fn task_id(&self) -> TaskId {
        self.handle.task_id()
    }

    /// The actor ID of this server.
    #[must_use]
    pub fn actor_id(&self) -> ActorId {
        self.handle.actor_id()
    }

    /// Whether the server has finished execution.
    #[must_use]
    pub fn is_finished(&self) -> bool {
        self.handle.is_finished()
    }

    /// Create a lightweight server reference for sending messages.
    #[must_use]
    pub fn server_ref(&self) -> GenServerRef<S> {
        self.handle.server_ref()
    }

    /// Access the inner (unnamed) handle.
    #[must_use]
    pub fn inner(&self) -> &GenServerHandle<S> {
        &self.handle
    }

    /// Stop the server and release the name lease (commit).
    ///
    /// This is the normal shutdown path: the name becomes available for
    /// re-registration after this call.
    ///
    /// # Panics
    ///
    /// Panics if the lease was already resolved or taken via
    /// [`take_lease`](Self::take_lease).
    pub fn stop_and_release(&mut self) -> Result<(), crate::cx::NameLeaseError> {
        self.handle.stop();
        self.lease
            .as_mut()
            .expect("lease already resolved")
            .release()
            .map(|_proof| ())
    }

    /// Abort the name lease without stopping the server.
    ///
    /// Use this for cancellation / error paths where the name registration
    /// itself should be rolled back.
    ///
    /// # Panics
    ///
    /// Panics if the lease was already resolved or taken via
    /// [`take_lease`](Self::take_lease).
    pub fn abort_lease(&mut self) -> Result<(), crate::cx::NameLeaseError> {
        self.lease
            .as_mut()
            .expect("lease already resolved")
            .abort()
            .map(|_proof| ())
    }

    /// Take ownership of the lease (for manual lifecycle management).
    ///
    /// After this call, the handle no longer owns the lease; the caller is
    /// responsible for resolving it.
    pub fn take_lease(&mut self) -> Option<crate::cx::NameLease> {
        self.lease.take()
    }
}

// ============================================================================
// Supervised named-start helper (bd-1hvy1)
// ============================================================================

/// Child-start helper for running a named GenServer under supervision.
///
/// This adapter wires together:
/// 1. Name lease acquisition (`spawn_named_gen_server`)
/// 2. Server task storage in runtime state
/// 3. Deterministic lease/name cleanup on region stop via a sync finalizer
///
/// Use this when building [`crate::supervision::ChildSpec`] entries for named
/// services.
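///
/// # Example
///
/// A sketch of building the starter (compile-skipped; `Counter` is the test
/// server type from this module, and `scope`, `runtime`, and `cx` are assumed
/// to be set up as in the tests below):
///
/// ```ignore
/// let registry = Arc::new(parking_lot::Mutex::new(NameRegistry::new()));
/// let mut starter = named_gen_server_start(
///     Arc::clone(&registry),
///     "persistent_service",
///     32, // mailbox capacity
///     || Counter { count: 0 },
/// );
/// let task_id = starter.start(&scope, &mut runtime.state, &cx)?;
/// ```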
pub struct NamedGenServerStart<S, F>
where
    S: GenServer,
    F: FnMut() -> S + Send + 'static,
{
    registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>>,
    name: String,
    mailbox_capacity: usize,
    make_server: F,
}

/// Construct a [`NamedGenServerStart`] helper for supervised named services.
#[must_use]
pub fn named_gen_server_start<S, F>(
    registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>>,
    name: impl Into<String>,
    mailbox_capacity: usize,
    make_server: F,
) -> NamedGenServerStart<S, F>
where
    S: GenServer,
    F: FnMut() -> S + Send + 'static,
{
    NamedGenServerStart {
        registry,
        name: name.into(),
        mailbox_capacity,
        make_server,
    }
}

impl<S, F> crate::supervision::ChildStart for NamedGenServerStart<S, F>
where
    S: GenServer,
    F: FnMut() -> S + Send + 'static,
{
    fn start(
        &mut self,
        scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        state: &mut crate::runtime::RuntimeState,
        cx: &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError> {
        let now = crate::types::Time::ZERO;
        let server = (self.make_server)();
        let (mut named_handle, stored) = scope
            .spawn_named_gen_server(
                state,
                cx,
                &mut self.registry.lock(),
                self.name.clone(),
                server,
                self.mailbox_capacity,
                now,
            )
            .map_err(|err| match err {
                NamedSpawnError::Spawn(spawn_err) => spawn_err,
                NamedSpawnError::NameTaken(name_err) => SpawnError::NameRegistrationFailed {
                    name: self.name.clone(),
                    reason: name_err.to_string(),
                },
            })?;

        let task_id = named_handle.task_id();
        state.store_spawned_task(task_id, stored);

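        // Park the lease in a shared slot so the region-stop finalizer can
        // resolve it deterministically; the finalizer also removes this
        // task's entry from the registry.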
        let lease_slot = Arc::new(parking_lot::Mutex::new(named_handle.take_lease()));
        let lease_slot_for_finalizer = Arc::clone(&lease_slot);
        let registry_for_finalizer = Arc::clone(&self.registry);
        let finalizer_registered = scope.defer_sync(state, move || {
            let _ = registry_for_finalizer.lock().cleanup_task(task_id);
            let lease_to_resolve = lease_slot_for_finalizer.lock().take();
            if let Some(mut lease) = lease_to_resolve {
                let _ = lease.release();
            }
        });

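        // The finalizer could not be registered (the region is already
        // closing): undo the registration and abort the lease so neither the
        // name nor the obligation leaks.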
        if !finalizer_registered {
            let _ = self.registry.lock().cleanup_task(task_id);
            let lease_to_abort = lease_slot.lock().take();
            if let Some(mut lease) = lease_to_abort {
                let _ = lease.abort();
            }
            return Err(SpawnError::RegionClosed(scope.region_id()));
        }

        Ok(task_id)
    }
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::state::RuntimeState;
    use crate::supervision::ChildStart;
    use crate::types::policy::FailFast;
    use crate::types::Budget;
    use crate::types::CancelKind;
    use crate::util::ArenaIndex;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Mutex};

    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }

    // ---- Simple Counter GenServer ----

    struct Counter {
        count: u64,
    }

    enum CounterCall {
        Get,
        Add(u64),
    }

    enum CounterCast {
        Reset,
    }

    impl GenServer for Counter {
        type Call = CounterCall;
        type Reply = u64;
        type Cast = CounterCast;
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            request: CounterCall,
            reply: Reply<u64>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            match request {
                CounterCall::Get => {
                    let _ = reply.send(self.count);
                }
                CounterCall::Add(n) => {
                    self.count += n;
                    let _ = reply.send(self.count);
                }
            }
            Box::pin(async {})
        }

        fn handle_cast(
            &mut self,
            _cx: &Cx,
            msg: CounterCast,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            match msg {
                CounterCast::Reset => self.count = 0,
            }
            Box::pin(async {})
        }
    }

    #[derive(Clone)]
    struct StartBudgetProbe {
        started_priority: Arc<AtomicU8>,
        loop_priority: Arc<AtomicU8>,
    }

    impl GenServer for StartBudgetProbe {
        type Call = CounterCall;
        type Reply = u8;
        type Cast = CounterCast;
        type Info = SystemMsg;

        fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.started_priority
                .store(cx.budget().priority, Ordering::SeqCst);
            Box::pin(async {})
        }

        fn on_start_budget(&self) -> Budget {
            Budget::new().with_priority(200)
        }

        fn handle_call(
            &mut self,
            cx: &Cx,
            _request: CounterCall,
            reply: Reply<u8>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.loop_priority
                .store(cx.budget().priority, Ordering::SeqCst);
            let _ = reply.send(cx.budget().priority);
            Box::pin(async {})
        }
    }

    struct StopMaskProbe {
        stop_checkpoint_ok: Arc<AtomicU8>,
    }

    impl GenServer for StopMaskProbe {
        type Call = CounterCall;
        type Reply = u8;
        type Cast = CounterCast;
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _request: CounterCall,
            reply: Reply<u8>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _ = reply.send(0);
            Box::pin(async {})
        }

        fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let ok = cx.checkpoint().is_ok();
            self.stop_checkpoint_ok
                .store(u8::from(ok), Ordering::SeqCst);
            Box::pin(async {})
        }
    }

    fn assert_gen_server<S: GenServer>() {}

    #[test]
    fn gen_server_trait_bounds() {
        init_test("gen_server_trait_bounds");
        assert_gen_server::<Counter>();
        crate::test_complete!("gen_server_trait_bounds");
    }

    #[test]
    fn gen_server_spawn_and_cast() {
        init_test("gen_server_spawn_and_cast");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        // Cast a reset (fire-and-forget)
        handle.try_cast(CounterCast::Reset).unwrap();

        // Drop handle to disconnect
        drop(handle);

        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();

        crate::test_complete!("gen_server_spawn_and_cast");
    }

    #[test]
    fn gen_server_spawn_and_call() {
        init_test("gen_server_spawn_and_call");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();
        let (client_handle, client_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                server_ref.call(&cx, CounterCall::Add(5)).await.unwrap()
            })
            .unwrap();
        let client_task_id = client_handle.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, client_stored);

        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_quiescent();

        let result =
            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
        assert_eq!(result, 5);

        crate::test_complete!("gen_server_spawn_and_call");
    }

    #[test]
    fn gen_server_call_cancellation_is_deterministic() {
        init_test("gen_server_call_cancellation_is_deterministic");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        let client_cx_cell: Arc<Mutex<Option<Cx>>> = Arc::new(Mutex::new(None));
        let client_cx_cell_for_task = Arc::clone(&client_cx_cell);

        let (client_handle, client_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                *client_cx_cell_for_task.lock().expect("lock poisoned") = Some(cx.clone());
                server_ref.call(&cx, CounterCall::Get).await
            })
            .unwrap();
        let client_task_id = client_handle.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, client_stored);

        // Poll the client once: it should enqueue the call and then block waiting for reply.
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_idle();

        // Cancel the client deterministically, then poll it again to observe the cancellation.
        let client_cx = client_cx_cell
            .lock()
            .expect("lock poisoned")
            .clone()
            .expect("client cx published");
        client_cx.cancel_with(CancelKind::User, Some("gen_server call cancelled"));

        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_idle();

        let result =
            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
        match result {
            Ok(_) => unreachable!("expected cancellation, got Ok"),
            Err(CallError::Cancelled(reason)) => {
                assert_eq!(reason.kind, CancelKind::User);
                assert_eq!(reason.message, Some("gen_server call cancelled"));
            }
            Err(other) => unreachable!("expected CallError::Cancelled, got {other:?}"),
        }

        // Cleanup: disconnect the server and let it drain the queued call.
        drop(handle);
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        crate::test_complete!("gen_server_call_cancellation_is_deterministic");
    }

    #[test]
    fn supervised_gen_server_stays_alive() {
        init_test("supervised_gen_server_stays_alive");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
        let registry = Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));

        let mut starter =
            named_gen_server_start(Arc::clone(&registry), "persistent_service", 32, || {
                Counter { count: 0 }
            });

        let task_id = starter
            .start(&scope, &mut runtime.state, &cx)
            .expect("start ok");

        // Run the runtime. The server should start, init, and enter its loop.
        // It should NOT exit just because the starter dropped the handle.
        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_idle();

        let task = runtime.state.task(task_id).expect("task exists");
        crate::assert_with_log!(
            !task.state.is_terminal(),
            "server should be alive",
            "Running",
            format!("{:?}", task.state)
        );

        // Cleanup: cancel the region and drive the cancellation to quiescence.
        let tasks_to_schedule =
            runtime
                .state
                .cancel_request(region, &CancelReason::user("test done"), None);
        for (tid, priority) in tasks_to_schedule {
            runtime.scheduler.lock().unwrap().schedule(tid, priority);
        }
        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();

        assert!(
            registry.lock().whereis("persistent_service").is_none(),
            "name must be removed after region stop",
        );

        crate::test_complete!("supervised_gen_server_stays_alive");
    }

    #[test]
    fn gen_server_cast_cancellation_is_deterministic() {
        init_test("gen_server_cast_cancellation_is_deterministic");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        // Use a tiny mailbox and pre-fill it so the next cast blocks and is cancelable.
        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 1)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        futures_lite::future::block_on(handle.cast(&cx, CounterCast::Reset))
            .expect("prefill cast ok");

        let client_cx_cell: Arc<Mutex<Option<Cx>>> = Arc::new(Mutex::new(None));
        let client_cx_cell_for_task = Arc::clone(&client_cx_cell);

        let (client_handle, client_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                *client_cx_cell_for_task.lock().expect("lock poisoned") = Some(cx.clone());
                server_ref.cast(&cx, CounterCast::Reset).await
            })
            .unwrap();
        let client_task_id = client_handle.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, client_stored);

        // Poll the client once: it should block waiting for mailbox capacity.
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_idle();

        // Cancel the client deterministically, then poll it again to observe the cancellation.
        let client_cx = client_cx_cell
            .lock()
            .expect("lock poisoned")
            .clone()
            .expect("client cx published");
        client_cx.cancel_with(CancelKind::User, Some("gen_server cast cancelled"));

        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_idle();

        let result =
            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
        match result {
            Ok(()) => unreachable!("expected cancellation, got Ok"),
            Err(CastError::Cancelled(reason)) => {
                assert_eq!(reason.kind, CancelKind::User);
                assert_eq!(reason.message, Some("gen_server cast cancelled"));
            }
            Err(other) => unreachable!("expected CastError::Cancelled, got {other:?}"),
        }

        // Cleanup: disconnect the server and let it drain the mailbox.
        drop(handle);
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        crate::test_complete!("gen_server_cast_cancellation_is_deterministic");
    }

    #[test]
    fn gen_server_handle_accessors() {
        init_test("gen_server_handle_accessors");

        let mut state = RuntimeState::new();
        let root = state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        state.store_spawned_task(handle.task_id(), stored);

        let _actor_id = handle.actor_id();
        let _task_id = handle.task_id();
        assert!(!handle.is_finished());

        let server_ref = handle.server_ref();
        assert_eq!(server_ref.actor_id(), handle.actor_id());
        assert!(server_ref.is_alive());
        assert!(!server_ref.is_closed());

        crate::test_complete!("gen_server_handle_accessors");
    }

    #[test]
    fn gen_server_ref_is_cloneable() {
        init_test("gen_server_ref_is_cloneable");

        let mut state = RuntimeState::new();
        let root = state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        state.store_spawned_task(handle.task_id(), stored);

        let ref1 = handle.server_ref();
        let ref2 = ref1.clone();
        assert_eq!(ref1.actor_id(), ref2.actor_id());

        crate::test_complete!("gen_server_ref_is_cloneable");
    }

    #[test]
    fn gen_server_stop_transitions() {
        init_test("gen_server_stop_transitions");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        let server_ref = handle.server_ref();
        assert!(server_ref.is_alive());

        handle.stop();

        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();

        assert!(handle.is_finished());
        assert!(!server_ref.is_alive());

        crate::test_complete!("gen_server_stop_transitions");
    }

    #[test]
    fn gen_server_handle_rejects_call_and_cast_after_stop() {
        init_test("gen_server_handle_rejects_call_and_cast_after_stop");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        // Let the server start, then request stop.
        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_idle();
        handle.stop();

        let call_err =
            futures_lite::future::block_on(handle.call(&cx, CounterCall::Get)).unwrap_err();
        assert!(
            matches!(call_err, CallError::ServerStopped),
            "call after stop must return ServerStopped, got {call_err:?}"
        );

        let cast_err =
            futures_lite::future::block_on(handle.cast(&cx, CounterCast::Reset)).unwrap_err();
        assert!(
            matches!(cast_err, CastError::ServerStopped),
            "cast after stop must return ServerStopped, got {cast_err:?}"
        );

        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();
        assert!(handle.is_finished());

        crate::test_complete!("gen_server_handle_rejects_call_and_cast_after_stop");
    }

    #[test]
    fn gen_server_handle_join_returns_final_state_after_stop() {
        init_test("gen_server_handle_join_returns_final_state_after_stop");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        handle.stop();
        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();
        assert!(handle.is_finished());

        let final_state = futures_lite::future::block_on(handle.join(&cx)).expect("join");
        assert_eq!(
            final_state.count, 0,
            "final server state should be returned"
        );

        crate::test_complete!("gen_server_handle_join_returns_final_state_after_stop");
    }

    #[test]
    fn gen_server_stop_wakes_blocked_mailbox_recv() {
        #[allow(clippy::items_after_statements)]
        struct StopWakeProbe {
            stop_ran: Arc<AtomicU8>,
        }

        #[allow(clippy::items_after_statements)]
        impl GenServer for StopWakeProbe {
            type Call = CounterCall;
            type Reply = u64;
            type Cast = CounterCast;
            type Info = SystemMsg;

            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.stop_ran.store(1, Ordering::SeqCst);
                Box::pin(async {})
            }

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: CounterCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(0);
                Box::pin(async {})
            }
        }

        init_test("gen_server_stop_wakes_blocked_mailbox_recv");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let stop_ran = Arc::new(AtomicU8::new(0));
        let server = StopWakeProbe {
            stop_ran: Arc::clone(&stop_ran),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        // Start server and let it park waiting on mailbox.recv().
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_idle();

        // Stop should wake the blocked recv waiter. No manual reschedule here.
        handle.stop();
        runtime.run_until_quiescent();

        assert_eq!(
            stop_ran.load(Ordering::SeqCst),
            1,
            "on_stop should run after stop wakes blocked recv"
        );
        assert!(handle.is_finished(), "server should finish after stop");

        crate::test_complete!("gen_server_stop_wakes_blocked_mailbox_recv");
    }

    // ---- Observable GenServer for E2E ----

    struct ObservableCounter {
        count: u64,
        final_count: Arc<AtomicU64>,
    }

    impl GenServer for ObservableCounter {
        type Call = CounterCall;
        type Reply = u64;
        type Cast = CounterCast;
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            request: CounterCall,
            reply: Reply<u64>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            match request {
                CounterCall::Get => {
                    let _ = reply.send(self.count);
                }
                CounterCall::Add(n) => {
                    self.count += n;
                    let _ = reply.send(self.count);
                }
            }
            Box::pin(async {})
        }

        fn handle_cast(
            &mut self,
            _cx: &Cx,
            msg: CounterCast,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            match msg {
                CounterCast::Reset => self.count = 0,
            }
            Box::pin(async {})
        }

        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.final_count.store(self.count, Ordering::SeqCst);
            Box::pin(async {})
        }
    }

    #[test]
    fn gen_server_processes_casts_before_stop() {
        init_test("gen_server_processes_casts_before_stop");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let final_count = Arc::new(AtomicU64::new(u64::MAX));
        let server = ObservableCounter {
            count: 0,
            final_count: final_count.clone(),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        // Start the server so casts are accepted.
        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_idle();

        // Queue a handful of casts, then stop. Shutdown must drain the mailbox
        // before running on_stop, so the final count reflects the cast effects.
        for _ in 0..5 {
            handle.try_cast(CounterCast::Reset).expect("try_cast ok");
        }

        handle.stop();

        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();

        // Final count should be 0 (5 resets)
        assert_eq!(
            final_count.load(Ordering::SeqCst),
            0,
            "on_stop recorded final count"
        );

        crate::test_complete!("gen_server_processes_casts_before_stop");
    }

    #[test]
    fn gen_server_deterministic_replay() {
        fn run_scenario(seed: u64) -> u64 {
            let config = crate::lab::LabConfig::new(seed);
            let mut runtime = crate::lab::LabRuntime::new(config);
            let region = runtime.state.create_root_region(Budget::INFINITE);
            let cx = Cx::for_testing();
            let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

            let final_count = Arc::new(AtomicU64::new(u64::MAX));
            let server = ObservableCounter {
                count: 0,
                final_count: final_count.clone(),
            };

            let (handle, stored) = scope
                .spawn_gen_server(&mut runtime.state, &cx, server, 32)
                .unwrap();
            let task_id = handle.task_id();
            runtime.state.store_spawned_task(task_id, stored);

            runtime.scheduler.lock().unwrap().schedule(task_id, 0);
            runtime.run_until_idle();

            // 5 resets, then stop the server
            for _ in 0..5 {
                handle.try_cast(CounterCast::Reset).expect("try_cast ok");
            }
            handle.stop();

            runtime.scheduler.lock().unwrap().schedule(task_id, 0);
            runtime.run_until_quiescent();

            final_count.load(Ordering::SeqCst)
        }

        init_test("gen_server_deterministic_replay");

        let result1 = run_scenario(0xCAFE_BABE);
        let result2 = run_scenario(0xCAFE_BABE);
        assert_eq!(result1, result2, "deterministic replay");

        crate::test_complete!("gen_server_deterministic_replay");
    }

    // ---- System/info message tests (bd-188ey) ----

    #[derive(Default)]
    struct InfoRecorder {
        seen: Arc<Mutex<Vec<String>>>,
    }

    impl GenServer for InfoRecorder {
        type Call = ();
        type Reply = ();
        type Cast = ();
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _request: (),
            reply: Reply<()>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _ = reply.send(());
            Box::pin(async {})
        }

        fn handle_info(
            &mut self,
            _cx: &Cx,
            msg: Self::Info,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let seen = Arc::clone(&self.seen);
            Box::pin(async move {
                seen.lock().expect("lock poisoned").push(format!("{msg:?}"));
            })
        }
    }

    fn tid(n: u32) -> TaskId {
        TaskId::from_arena(ArenaIndex::new(n, 0))
    }

    fn rid(n: u32) -> crate::types::RegionId {
        crate::types::RegionId::from_arena(ArenaIndex::new(n, 0))
    }

    /// Conformance: app shutdown batches use SYS-ORDER
    /// (`vt`, `Down < Exit < Timeout`, stable subject key).
    #[test]
    fn conformance_system_msg_sort_key_orders_shutdown_batch() {
        init_test("conformance_system_msg_sort_key_orders_shutdown_batch");

        let mut monitors = crate::monitor::MonitorSet::new();
        let mref_down_6 = monitors.establish(tid(90), rid(0), tid(6));
        let mref_down_3 = monitors.establish(tid(91), rid(0), tid(3));

        let mut batch = SystemMsgBatch::new();
        batch.push(SystemMsg::Exit {
            exit_vt: Time::from_secs(10),
            from: tid(6),
            reason: DownReason::Normal,
        });
        batch.push(SystemMsg::Timeout {
            tick_vt: Time::from_secs(10),
            id: 4,
        });
        batch.push(SystemMsg::Down {
            completion_vt: Time::from_secs(10),
            notification: DownNotification {
                monitored: tid(6),
                reason: DownReason::Normal,
                monitor_ref: mref_down_6,
            },
        });
        batch.push(SystemMsg::Timeout {
            tick_vt: Time::from_secs(9),
            id: 99,
        });
        batch.push(SystemMsg::Down {
            completion_vt: Time::from_secs(10),
            notification: DownNotification {
                monitored: tid(3),
                reason: DownReason::Normal,
                monitor_ref: mref_down_3,
            },
        });
        batch.push(SystemMsg::Exit {
            exit_vt: Time::from_secs(10),
            from: tid(2),
            reason: DownReason::Normal,
        });
        batch.push(SystemMsg::Timeout {
            tick_vt: Time::from_secs(10),
            id: 1,
        });

        let sorted = batch.into_sorted();
        let keys: Vec<_> = sorted.iter().map(SystemMsg::sort_key).collect();

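        // Expected SYS-ORDER key: virtual time first, then the variant rank
        // (Down = 0, Exit = 1, Timeout = 2), then the stable subject key.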
2706        assert_eq!(
2707            keys,
2708            vec![
2709                (Time::from_secs(9), 2, SystemMsgSubjectKey::TimeoutId(99)),
2710                (Time::from_secs(10), 0, SystemMsgSubjectKey::Task(tid(3))),
2711                (Time::from_secs(10), 0, SystemMsgSubjectKey::Task(tid(6))),
2712                (Time::from_secs(10), 1, SystemMsgSubjectKey::Task(tid(2))),
2713                (Time::from_secs(10), 1, SystemMsgSubjectKey::Task(tid(6))),
2714                (Time::from_secs(10), 2, SystemMsgSubjectKey::TimeoutId(1)),
2715                (Time::from_secs(10), 2, SystemMsgSubjectKey::TimeoutId(4)),
2716            ],
2717            "shutdown system-message ordering must follow SYS-ORDER"
2718        );
2719
2720        crate::test_complete!("conformance_system_msg_sort_key_orders_shutdown_batch");
2721    }
2722
2723    /// Conformance: `SystemMsgBatch::into_sorted` is equivalent to explicit
2724    /// `sort_by_key(SystemMsg::sort_key)`.
2725    #[test]
2726    fn conformance_system_msg_batch_matches_explicit_sort() {
2727        init_test("conformance_system_msg_batch_matches_explicit_sort");
2728
2729        let mut monitors = crate::monitor::MonitorSet::new();
2730        let mref = monitors.establish(tid(77), rid(0), tid(8));
2731
2732        let messages = vec![
2733            SystemMsg::Timeout {
2734                tick_vt: Time::from_secs(12),
2735                id: 4,
2736            },
2737            SystemMsg::Exit {
2738                exit_vt: Time::from_secs(11),
2739                from: tid(8),
2740                reason: DownReason::Error("boom".to_string()),
2741            },
2742            SystemMsg::Down {
2743                completion_vt: Time::from_secs(11),
2744                notification: DownNotification {
2745                    monitored: tid(8),
2746                    reason: DownReason::Normal,
2747                    monitor_ref: mref,
2748                },
2749            },
2750            SystemMsg::Timeout {
2751                tick_vt: Time::from_secs(11),
2752                id: 2,
2753            },
2754        ];
2755
2756        let mut batch = SystemMsgBatch::new();
2757        for msg in messages.clone() {
2758            batch.push(msg);
2759        }
2760        let batched = batch.into_sorted();
2761
2762        let mut explicit = messages;
2763        explicit.sort_by_key(SystemMsg::sort_key);
2764
2765        let batched_keys: Vec<_> = batched.iter().map(SystemMsg::sort_key).collect();
2766        let explicit_keys: Vec<_> = explicit.iter().map(SystemMsg::sort_key).collect();
2767        assert_eq!(batched_keys, explicit_keys);
2768
2769        crate::test_complete!("conformance_system_msg_batch_matches_explicit_sort");
2770    }
2771
2772    #[test]
2773    fn system_msg_payload_types_roundtrip_via_conversions() {
2774        init_test("system_msg_payload_types_roundtrip_via_conversions");
2775
2776        let mut monitors = crate::monitor::MonitorSet::new();
2777        let mref = monitors.establish(tid(7), rid(0), tid(8));
2778
2779        let down = DownMsg::new(
2780            Time::from_secs(11),
2781            DownNotification {
2782                monitored: tid(8),
2783                reason: DownReason::Normal,
2784                monitor_ref: mref,
2785            },
2786        );
2787        let down_msg = SystemMsg::down(down.clone());
2788        let down_back = DownMsg::try_from(down_msg).expect("down conversion");
2789        assert_eq!(down_back.completion_vt, down.completion_vt);
2790        assert_eq!(
2791            down_back.notification.monitored,
2792            down.notification.monitored
2793        );
2794        assert_eq!(down_back.notification.reason, down.notification.reason);
2795        assert_eq!(
2796            down_back.notification.monitor_ref,
2797            down.notification.monitor_ref
2798        );
2799
2800        let exit = ExitMsg::new(
2801            Time::from_secs(12),
2802            tid(9),
2803            DownReason::Error("boom".into()),
2804        );
2805        let exit_msg = SystemMsg::exit(exit.clone());
2806        let exit_back = ExitMsg::try_from(exit_msg).expect("exit conversion");
2807        assert_eq!(exit_back, exit);
2808
2809        let timeout = TimeoutMsg::new(Time::from_secs(13), 42);
2810        let timeout_msg = SystemMsg::timeout(timeout);
2811        let timeout_back = TimeoutMsg::try_from(timeout_msg).expect("timeout conversion");
2812        assert_eq!(timeout_back, timeout);
2813
2814        crate::test_complete!("system_msg_payload_types_roundtrip_via_conversions");
2815    }
2816
2817    #[test]
2818    fn system_msg_try_from_mismatch_returns_original_variant() {
2819        init_test("system_msg_try_from_mismatch_returns_original_variant");
2820        let mut monitors = crate::monitor::MonitorSet::new();
2821        let mref = monitors.establish(tid(10), rid(0), tid(1));
2822
2823        let timeout = SystemMsg::Timeout {
2824            tick_vt: Time::from_secs(5),
2825            id: 99,
2826        };
2827        let err = DownMsg::try_from(timeout).expect_err("timeout is not down");
2828        assert!(matches!(err, SystemMsg::Timeout { id: 99, .. }));
2829
2830        let down = SystemMsg::Down {
2831            completion_vt: Time::from_secs(6),
2832            notification: DownNotification {
2833                monitored: tid(1),
2834                reason: DownReason::Normal,
2835                monitor_ref: mref,
2836            },
2837        };
2838        let err = TimeoutMsg::try_from(down).expect_err("down is not timeout");
2839        assert!(matches!(err, SystemMsg::Down { .. }));
2840
2841        crate::test_complete!("system_msg_try_from_mismatch_returns_original_variant");
2842    }
2843
2844    #[test]
2845    fn gen_server_handle_info_receives_system_messages() {
2846        init_test("gen_server_handle_info_receives_system_messages");
2847
2848        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2849        let region = runtime.state.create_root_region(Budget::INFINITE);
2850        let cx = Cx::for_testing();
2851        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2852
2853        let seen: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
2854        let server = InfoRecorder {
2855            seen: Arc::clone(&seen),
2856        };
2857
2858        let (handle, stored) = scope
2859            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
2860            .unwrap();
2861        let server_task_id = handle.task_id();
2862        runtime.state.store_spawned_task(server_task_id, stored);
2863
2864        let mut monitors = crate::monitor::MonitorSet::new();
2865        let mref = monitors.establish(tid(10), rid(0), tid(11));
2866
2867        handle
2868            .try_info(SystemMsg::Down {
2869                completion_vt: Time::from_secs(5),
2870                notification: DownNotification {
2871                    monitored: tid(11),
2872                    reason: DownReason::Normal,
2873                    monitor_ref: mref,
2874                },
2875            })
2876            .unwrap();
2877
2878        handle
2879            .try_info(SystemMsg::Exit {
2880                exit_vt: Time::from_secs(6),
2881                from: tid(12),
2882                reason: DownReason::Error("boom".into()),
2883            })
2884            .unwrap();
2885
2886        handle
2887            .try_info(SystemMsg::Timeout {
2888                tick_vt: Time::from_secs(7),
2889                id: 123,
2890            })
2891            .unwrap();
2892
2893        drop(handle);
2894
2895        runtime
2896            .scheduler
2897            .lock()
2898            .unwrap()
2899            .schedule(server_task_id, 0);
2900        runtime.run_until_quiescent();
2901
2902        let seen = seen.lock().expect("lock poisoned");
2903        assert_eq!(seen.len(), 3);
2904        assert!(seen[0].contains("Down"));
2905        assert!(seen[1].contains("Exit"));
2906        assert!(seen[2].contains("Timeout"));
2907        drop(seen);
2908
2909        crate::test_complete!("gen_server_handle_info_receives_system_messages");
2910    }
2911
2912    #[test]
2913    fn gen_server_info_ordering_is_deterministic_for_seed() {
2914        fn run_scenario(seed: u64) -> Vec<String> {
2915            let config = crate::lab::LabConfig::new(seed);
2916            let mut runtime = crate::lab::LabRuntime::new(config);
2917            let region = runtime.state.create_root_region(Budget::INFINITE);
2918            let cx = Cx::for_testing();
2919            let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2920
2921            let events: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
2922            let server = InfoRecorder {
2923                seen: Arc::clone(&events),
2924            };
2925
2926            let (handle, stored) = scope
2927                .spawn_gen_server(&mut runtime.state, &cx, server, 32)
2928                .unwrap();
2929            let server_task_id = handle.task_id();
2930            runtime.state.store_spawned_task(server_task_id, stored);
2931
2932            let server_ref = handle.server_ref();
2933
2934            let (client_a, stored_a) = scope
2935                .spawn(&mut runtime.state, &cx, move |cx| async move {
2936                    server_ref
2937                        .info(
2938                            &cx,
2939                            SystemMsg::Timeout {
2940                                tick_vt: Time::from_secs(2),
2941                                id: 1,
2942                            },
2943                        )
2944                        .await
2945                        .unwrap();
2946                })
2947                .unwrap();
2948            let task_id_a = client_a.task_id();
2949            runtime.state.store_spawned_task(task_id_a, stored_a);
2950
2951            let server_ref_b = handle.server_ref();
2952            let (client_b, stored_b) = scope
2953                .spawn(&mut runtime.state, &cx, move |cx| async move {
2954                    server_ref_b
2955                        .info(
2956                            &cx,
2957                            SystemMsg::Timeout {
2958                                tick_vt: Time::from_secs(2),
2959                                id: 2,
2960                            },
2961                        )
2962                        .await
2963                        .unwrap();
2964                })
2965                .unwrap();
2966            let task_id_b = client_b.task_id();
2967            runtime.state.store_spawned_task(task_id_b, stored_b);
2968
2969            // Let clients enqueue, then let the server drain.
2970            runtime.scheduler.lock().unwrap().schedule(task_id_a, 0);
2971            runtime.scheduler.lock().unwrap().schedule(task_id_b, 0);
2972            runtime
2973                .scheduler
2974                .lock()
2975                .unwrap()
2976                .schedule(server_task_id, 0);
2977
2978            runtime.run_until_quiescent();
2979            drop(handle);
2980            runtime
2981                .scheduler
2982                .lock()
2983                .unwrap()
2984                .schedule(server_task_id, 0);
2985            runtime.run_until_quiescent();
2986
2987            let out = events.lock().expect("lock poisoned").clone();
2988            out
2989        }
2990
2991        init_test("gen_server_info_ordering_is_deterministic_for_seed");
2992
2993        let a = run_scenario(0xD00D_F00D);
2994        let b = run_scenario(0xD00D_F00D);
2995        assert_eq!(
2996            a, b,
2997            "system/info ordering must be deterministic for same seed"
2998        );
2999
3000        crate::test_complete!("gen_server_info_ordering_is_deterministic_for_seed");
3001    }
3002
3003    // ---- DropOldest GenServer for backpressure tests ----
3004
3005    /// A counter that uses DropOldest overflow policy.
3006    struct DropOldestCounter {
3007        count: u64,
3008    }
3009
3010    /// Cast type that carries an identifiable value for eviction testing.
3011    #[derive(Debug, Clone)]
3012    enum TaggedCast {
3013        Set(u64),
3014    }
3015
3016    impl GenServer for DropOldestCounter {
3017        type Call = CounterCall;
3018        type Reply = u64;
3019        type Cast = TaggedCast;
3020        type Info = SystemMsg;
3021
3022        fn cast_overflow_policy(&self) -> CastOverflowPolicy {
3023            CastOverflowPolicy::DropOldest
3024        }
3025
3026        fn handle_call(
3027            &mut self,
3028            _cx: &Cx,
3029            request: CounterCall,
3030            reply: Reply<u64>,
3031        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3032            match request {
3033                CounterCall::Get => {
3034                    let _ = reply.send(self.count);
3035                }
3036                CounterCall::Add(n) => {
3037                    self.count += n;
3038                    let _ = reply.send(self.count);
3039                }
3040            }
3041            Box::pin(async {})
3042        }
3043
3044        fn handle_cast(
3045            &mut self,
3046            _cx: &Cx,
3047            msg: TaggedCast,
3048        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3049            match msg {
3050                TaggedCast::Set(v) => self.count = v,
3051            }
3052            Box::pin(async {})
3053        }
3054    }
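
    // With DropOldest, try_cast into a full mailbox succeeds by evicting the
    // oldest queued envelope rather than returning CastError::Full; the tests
    // below exercise this through both the handle and the GenServerRef.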
3055
3056    #[test]
3057    fn gen_server_drop_oldest_policy_accessor() {
3058        init_test("gen_server_drop_oldest_policy_accessor");
3059
3060        let mut state = RuntimeState::new();
3061        let root = state.create_root_region(Budget::INFINITE);
3062        let cx = Cx::for_testing();
3063        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3064
3065        let (handle, stored) = scope
3066            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 4)
3067            .unwrap();
3068        state.store_spawned_task(handle.task_id(), stored);
3069
3070        assert_eq!(
3071            handle.cast_overflow_policy(),
3072            CastOverflowPolicy::DropOldest
3073        );
3074
3075        let server_ref = handle.server_ref();
3076        assert_eq!(
3077            server_ref.cast_overflow_policy(),
3078            CastOverflowPolicy::DropOldest
3079        );
3080
3081        crate::test_complete!("gen_server_drop_oldest_policy_accessor");
3082    }
3083
3084    #[test]
3085    fn gen_server_drop_oldest_evicts_when_full() {
3086        init_test("gen_server_drop_oldest_evicts_when_full");
3087
3088        let mut state = RuntimeState::new();
3089        let root = state.create_root_region(Budget::INFINITE);
3090        let cx = Cx::for_testing();
3091        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3092
3093        // Mailbox capacity = 2
3094        let (handle, stored) = scope
3095            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 2)
3096            .unwrap();
3097        state.store_spawned_task(handle.task_id(), stored);
3098
3099        // Fill the mailbox (capacity 2)
3100        handle.try_cast(TaggedCast::Set(10)).unwrap();
3101        handle.try_cast(TaggedCast::Set(20)).unwrap();
3102
3103        // This should succeed by evicting the oldest (Set(10))
3104        handle.try_cast(TaggedCast::Set(30)).unwrap();
3105
3106        // And again — evicts Set(20), mailbox now has [Set(30), Set(40)]
3107        handle.try_cast(TaggedCast::Set(40)).unwrap();
3108
3109        crate::test_complete!("gen_server_drop_oldest_evicts_when_full");
3110    }
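
    // The eviction order claimed above is verified behaviorally in
    // conformance_mailbox_drop_oldest_preserves_newest further down.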
3111
3112    #[test]
3113    fn gen_server_reject_policy_returns_full() {
3114        init_test("gen_server_reject_policy_returns_full");
3115
3116        let mut state = RuntimeState::new();
3117        let root = state.create_root_region(Budget::INFINITE);
3118        let cx = Cx::for_testing();
3119        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3120
3121        // Default policy (Reject), capacity = 2
3122        let (handle, stored) = scope
3123            .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 2)
3124            .unwrap();
3125        state.store_spawned_task(handle.task_id(), stored);
3126
3127        assert_eq!(handle.cast_overflow_policy(), CastOverflowPolicy::Reject);
3128
3129        // Fill the mailbox
3130        handle.try_cast(CounterCast::Reset).unwrap();
3131        handle.try_cast(CounterCast::Reset).unwrap();
3132
3133        // Third should fail with Full
3134        let err = handle.try_cast(CounterCast::Reset).unwrap_err();
3135        assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
3136
3137        crate::test_complete!("gen_server_reject_policy_returns_full");
3138    }
3139
3140    #[test]
3141    fn gen_server_drop_oldest_ref_also_evicts() {
3142        init_test("gen_server_drop_oldest_ref_also_evicts");
3143
3144        let mut state = RuntimeState::new();
3145        let root = state.create_root_region(Budget::INFINITE);
3146        let cx = Cx::for_testing();
3147        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3148
3149        let (handle, stored) = scope
3150            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 2)
3151            .unwrap();
3152        state.store_spawned_task(handle.task_id(), stored);
3153
3154        let server_ref = handle.server_ref();
3155
3156        // Fill via ref
3157        server_ref.try_cast(TaggedCast::Set(1)).unwrap();
3158        server_ref.try_cast(TaggedCast::Set(2)).unwrap();
3159
3160        // Evict oldest via ref — should succeed
3161        server_ref.try_cast(TaggedCast::Set(3)).unwrap();
3162
3163        crate::test_complete!("gen_server_drop_oldest_ref_also_evicts");
3164    }
3165
3166    /// DropOldest eviction of a Call envelope must abort the reply obligation
3167    /// instead of leaking it. Before the fix, dropping an evicted Call caused
3168    /// an "OBLIGATION TOKEN LEAKED" panic.
3169    #[test]
3170    fn gen_server_drop_oldest_evicting_call_aborts_obligation() {
3171        init_test("gen_server_drop_oldest_evicting_call_aborts_obligation");
3172
3173        let mut state = RuntimeState::new();
3174        let root = state.create_root_region(Budget::INFINITE);
3175        let cx = Cx::for_testing();
3176        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3177
3178        // Capacity 1 so we can fill it with one message.
3179        let (handle, stored) = scope
3180            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
3181            .unwrap();
3182        state.store_spawned_task(handle.task_id(), stored);
3183
3184        // Manually enqueue a Call envelope so it sits in the mailbox.
3185        let (reply_tx, _reply_rx) = session::tracked_oneshot::<u64>();
3186        let reply_permit = reply_tx.reserve(&cx);
3187        let call_envelope: Envelope<DropOldestCounter> = Envelope::Call {
3188            request: CounterCall::Get,
3189            reply_permit,
3190        };
3191        handle.sender.try_send(call_envelope).unwrap();
3192
3193        // Now try_cast with DropOldest should evict the Call and abort its
3194        // reply obligation cleanly — no panic.
3195        handle.try_cast(TaggedCast::Set(99)).unwrap();
3196
3197        crate::test_complete!("gen_server_drop_oldest_evicting_call_aborts_obligation");
3198    }
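
    // The invariant: a reply permit is linear and must be either committed
    // (reply sent) or aborted exactly once. Eviction therefore aborts the
    // permit; silently dropping it would trip the obligation-leak detector.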
3199
3200    #[test]
3201    fn gen_server_default_overflow_policy_is_reject() {
3202        init_test("gen_server_default_overflow_policy_is_reject");
3203
3204        assert_eq!(CastOverflowPolicy::default(), CastOverflowPolicy::Reject);
3205
3206        // Verify Counter (which doesn't override) uses Reject
3207        let counter = Counter { count: 0 };
3208        assert_eq!(counter.cast_overflow_policy(), CastOverflowPolicy::Reject);
3209
3210        crate::test_complete!("gen_server_default_overflow_policy_is_reject");
3211    }
3212
3213    #[test]
3214    fn reply_debug_format() {
3215        init_test("reply_debug_format");
3216
3217        let cx = Cx::for_testing();
3218        let (tx, _rx) = session::tracked_oneshot::<u64>();
3219        let permit = tx.reserve(&cx);
3220        let reply = Reply::new(&cx, permit);
3221        let debug_str = format!("{reply:?}");
3222        assert!(debug_str.contains("Reply"));
3223        assert!(debug_str.contains("pending"));
3224
3225        // Consume the reply to avoid the obligation panic
3226        let _ = reply.send(42);
3227
3228        crate::test_complete!("reply_debug_format");
3229    }
3230
3231    #[test]
3232    fn gen_server_on_start_budget_priority_applied_and_restored() {
3233        init_test("gen_server_on_start_budget_priority_applied_and_restored");
3234
3235        let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
3236        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3237        let region = runtime.state.create_root_region(budget);
3238        let cx = Cx::for_testing();
3239        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
3240
3241        let started_priority = Arc::new(AtomicU8::new(0));
3242        let loop_priority = Arc::new(AtomicU8::new(0));
3243        let server = StartBudgetProbe {
3244            started_priority: Arc::clone(&started_priority),
3245            loop_priority: Arc::clone(&loop_priority),
3246        };
3247
3248        let (handle, stored) = scope
3249            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3250            .unwrap();
3251        let server_task_id = handle.task_id();
3252        runtime.state.store_spawned_task(server_task_id, stored);
3253
3254        let server_ref = handle.server_ref();
3255        let (client, stored_client) = scope
3256            .spawn(&mut runtime.state, &cx, move |cx| async move {
3257                let p = server_ref.call(&cx, CounterCall::Get).await.unwrap();
3258                assert_eq!(p, 10);
3259            })
3260            .unwrap();
3261        let client_task_id = client.task_id();
3262        runtime
3263            .state
3264            .store_spawned_task(client_task_id, stored_client);
3265
3266        runtime
3267            .scheduler
3268            .lock()
3269            .unwrap()
3270            .schedule(server_task_id, 0);
3271        runtime
3272            .scheduler
3273            .lock()
3274            .unwrap()
3275            .schedule(client_task_id, 0);
3276        runtime.run_until_quiescent();
3277
3278        assert_eq!(started_priority.load(Ordering::SeqCst), 200);
3279        assert_eq!(loop_priority.load(Ordering::SeqCst), 10);
3280
3281        drop(handle);
3282        runtime
3283            .scheduler
3284            .lock()
3285            .unwrap()
3286            .schedule(server_task_id, 0);
3287        runtime.run_until_quiescent();
3288
3289        crate::test_complete!("gen_server_on_start_budget_priority_applied_and_restored");
3290    }
3291
3292    #[test]
3293    fn gen_server_on_stop_runs_masked_under_stop() {
3294        init_test("gen_server_on_stop_runs_masked_under_stop");
3295
3296        let budget = Budget::new().with_poll_quota(10_000);
3297        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3298        let region = runtime.state.create_root_region(budget);
3299        let cx = Cx::for_testing();
3300        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
3301
3302        let stop_checkpoint_ok = Arc::new(AtomicU8::new(0));
3303        let server = StopMaskProbe {
3304            stop_checkpoint_ok: Arc::clone(&stop_checkpoint_ok),
3305        };
3306
3307        let (handle, stored) = scope
3308            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3309            .unwrap();
3310        let server_task_id = handle.task_id();
3311        runtime.state.store_spawned_task(server_task_id, stored);
3312
3313        // Request stop: sets cancel_requested. on_stop must run masked.
3314        handle.stop();
3315
3316        runtime
3317            .scheduler
3318            .lock()
3319            .unwrap()
3320            .schedule(server_task_id, 0);
3321        runtime.run_until_quiescent();
3322
3323        assert_eq!(stop_checkpoint_ok.load(Ordering::SeqCst), 1);
3324
3325        crate::test_complete!("gen_server_on_stop_runs_masked_under_stop");
3326    }
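
    // Masked here means on_stop observes cancel_requested but still runs to
    // completion under its own stop budget; cleanup itself is not cancellable.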
3327
3328    // ── Cast overflow policy tests (bd-2o5hg) ────────────────────────
3329
3330    /// Verify that DropOldest eviction emits a trace event.
3331    #[test]
3332    fn cast_drop_oldest_emits_trace_on_eviction() {
3333        init_test("cast_drop_oldest_emits_trace_on_eviction");
3334
3335        let budget = Budget::new().with_poll_quota(10_000);
3336        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3337        let region = runtime.state.create_root_region(budget);
3338        let cx = Cx::for_testing();
3339        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
3340
3341        // Capacity=1 so the second cast evicts the first
3342        let (handle, stored) = scope
3343            .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
3344            .unwrap();
3345        let task_id = handle.task_id();
3346        runtime.state.store_spawned_task(task_id, stored);
3347
3348        // Schedule so Cx::current() is set
3349        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
3350        runtime.run_until_idle();
3351
3352        // First cast fills the mailbox
3353        handle.try_cast(TaggedCast::Set(1)).unwrap();
3354        // Second cast evicts the first
3355        handle.try_cast(TaggedCast::Set(2)).unwrap();
3356
3357        // The eviction trace is emitted via Cx::current() (set during task poll).
3358        // Here we only assert that the eviction path completes without panic or error.
3359        crate::test_complete!("cast_drop_oldest_emits_trace_on_eviction");
3360    }
3361
3362    /// Casting to a stopped server returns ServerStopped.
3363    #[test]
3364    fn cast_to_stopped_server_returns_error() {
3365        init_test("cast_to_stopped_server_returns_error");
3366
3367        let mut state = RuntimeState::new();
3368        let root = state.create_root_region(Budget::INFINITE);
3369        let cx = Cx::for_testing();
3370        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3371
3372        let (handle, stored) = scope
3373            .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 4)
3374            .unwrap();
3375        state.store_spawned_task(handle.task_id(), stored);
3376
3377        // Stop the server
3378        handle.stop();
3379
3380        // try_cast should return ServerStopped
3381        let err = handle.try_cast(CounterCast::Reset).unwrap_err();
3382        assert!(
3383            matches!(err, CastError::ServerStopped),
3384            "expected ServerStopped, got {err:?}"
3385        );
3386
3387        crate::test_complete!("cast_to_stopped_server_returns_error");
3388    }
3389
3390    /// CastOverflowPolicy Display is correct.
3391    #[test]
3392    fn cast_overflow_policy_display() {
3393        init_test("cast_overflow_policy_display");
3394
3395        assert_eq!(format!("{}", CastOverflowPolicy::Reject), "Reject");
3396        assert_eq!(format!("{}", CastOverflowPolicy::DropOldest), "DropOldest");
3397
3398        crate::test_complete!("cast_overflow_policy_display");
3399    }
3400
3401    /// Reject policy on GenServerRef returns Full when mailbox is full.
3402    #[test]
3403    fn cast_ref_reject_returns_full() {
3404        init_test("cast_ref_reject_returns_full");
3405
3406        let mut state = RuntimeState::new();
3407        let root = state.create_root_region(Budget::INFINITE);
3408        let cx = Cx::for_testing();
3409        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3410
3411        let (handle, stored) = scope
3412            .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 2)
3413            .unwrap();
3414        state.store_spawned_task(handle.task_id(), stored);
3415
3416        let server_ref = handle.server_ref();
3417
3418        // Fill the mailbox via server_ref
3419        server_ref.try_cast(CounterCast::Reset).unwrap();
3420        server_ref.try_cast(CounterCast::Reset).unwrap();
3421
3422        // Third should fail with Full
3423        let err = server_ref.try_cast(CounterCast::Reset).unwrap_err();
3424        assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
3425
3426        crate::test_complete!("cast_ref_reject_returns_full");
3427    }
3428
3429    /// DropOldest via GenServerRef with capacity=1 evicts correctly.
3430    #[test]
3431    fn cast_drop_oldest_ref_capacity_one() {
3432        init_test("cast_drop_oldest_ref_capacity_one");
3433
3434        let mut state = RuntimeState::new();
3435        let root = state.create_root_region(Budget::INFINITE);
3436        let cx = Cx::for_testing();
3437        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3438
3439        let (handle, stored) = scope
3440            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
3441            .unwrap();
3442        state.store_spawned_task(handle.task_id(), stored);
3443
3444        let server_ref = handle.server_ref();
3445
3446        // Fill with one message
3447        server_ref.try_cast(TaggedCast::Set(100)).unwrap();
3448        // Evict and replace — should succeed with capacity=1
3449        server_ref.try_cast(TaggedCast::Set(200)).unwrap();
3450        server_ref.try_cast(TaggedCast::Set(300)).unwrap();
3451
3452        crate::test_complete!("cast_drop_oldest_ref_capacity_one");
3453    }
3454
3455    // ── Init/Terminate semantics (bd-3ejoi) ──────────────────────────
3456
3457    #[test]
3458    fn init_default_budget_is_infinite() {
3459        init_test("init_default_budget_is_infinite");
3460        let counter = Counter { count: 0 };
3461        assert_eq!(counter.on_start_budget(), Budget::INFINITE);
3462        crate::test_complete!("init_default_budget_is_infinite");
3463    }
3464
3465    #[test]
3466    fn terminate_default_budget_is_minimal() {
3467        init_test("terminate_default_budget_is_minimal");
3468        let counter = Counter { count: 0 };
3469        assert_eq!(counter.on_stop_budget(), Budget::MINIMAL);
3470        crate::test_complete!("terminate_default_budget_is_minimal");
3471    }
3472
3473    /// If the task is cancelled before init, on_start is skipped but on_stop
3474    /// still runs (deterministic cleanup guarantee).
3475    #[test]
3476    fn init_skipped_when_pre_cancelled_but_stop_runs() {
3477        #[allow(clippy::items_after_statements)]
3478        struct LifecycleProbe {
3479            init_ran: Arc<AtomicU8>,
3480            stop_ran: Arc<AtomicU8>,
3481        }
3482
3483        #[allow(clippy::items_after_statements)]
3484        impl GenServer for LifecycleProbe {
3485            type Call = CounterCall;
3486            type Reply = u64;
3487            type Cast = CounterCast;
3488            type Info = SystemMsg;
3489
3490            fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3491                self.init_ran.store(1, Ordering::SeqCst);
3492                Box::pin(async {})
3493            }
3494
3495            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3496                self.stop_ran.store(1, Ordering::SeqCst);
3497                Box::pin(async {})
3498            }
3499
3500            fn handle_call(
3501                &mut self,
3502                _cx: &Cx,
3503                _request: CounterCall,
3504                reply: Reply<u64>,
3505            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3506                let _ = reply.send(0);
3507                Box::pin(async {})
3508            }
3509        }
3510
3511        init_test("init_skipped_when_pre_cancelled_but_stop_runs");
3512
3513        let budget = Budget::new().with_poll_quota(10_000);
3514        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3515        let region = runtime.state.create_root_region(budget);
3516        let cx = Cx::for_testing();
3517        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
3518
3519        let init_ran = Arc::new(AtomicU8::new(0));
3520        let stop_ran = Arc::new(AtomicU8::new(0));
3521
3522        let server = LifecycleProbe {
3523            init_ran: Arc::clone(&init_ran),
3524            stop_ran: Arc::clone(&stop_ran),
3525        };
3526
3527        let (handle, stored) = scope
3528            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3529            .unwrap();
3530        let server_task_id = handle.task_id();
3531        runtime.state.store_spawned_task(server_task_id, stored);
3532
3533        // Cancel BEFORE scheduling (pre-cancel)
3534        handle.stop();
3535
3536        runtime
3537            .scheduler
3538            .lock()
3539            .unwrap()
3540            .schedule(server_task_id, 0);
3541        runtime.run_until_quiescent();
3542
3543        // Init should be skipped
3544        assert_eq!(
3545            init_ran.load(Ordering::SeqCst),
3546            0,
3547            "init should be skipped when pre-cancelled"
3548        );
3549        // Stop should still run
3550        assert_eq!(
3551            stop_ran.load(Ordering::SeqCst),
3552            1,
3553            "stop must run even when pre-cancelled"
3554        );
3555
3556        crate::test_complete!("init_skipped_when_pre_cancelled_but_stop_runs");
3557    }
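
    // Pre-cancel skips only the startup work; the cleanup guarantee is
    // unconditional, so on_stop runs no matter when cancellation arrived.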
3558
3559    /// Verify that budget consumed during on_start is subtracted from the main
3560    /// task budget when the guard restores.
3561    #[test]
3562    fn init_budget_consumption_propagates_to_main_budget() {
3563        #[allow(clippy::items_after_statements)]
3564        struct BudgetCheckProbe {
3565            loop_quota: Arc<AtomicU64>,
3566        }
3567
3568        #[allow(clippy::items_after_statements)]
3569        impl GenServer for BudgetCheckProbe {
3570            type Call = CounterCall;
3571            type Reply = u64;
3572            type Cast = CounterCast;
3573            type Info = SystemMsg;
3574
3575            fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3576                // "Consume" some budget by observing it. In practice the runtime
3577                // decrements poll_quota as the task is polled; reading the budget
3578                // here confirms the phase baseline is in effect.
3579                let _ = cx.budget();
3580                Box::pin(async {})
3581            }
3582
3583            fn on_start_budget(&self) -> Budget {
3584                // Tight init budget
3585                Budget::new().with_poll_quota(50).with_priority(200)
3586            }
3587
3588            fn handle_call(
3589                &mut self,
3590                cx: &Cx,
3591                _request: CounterCall,
3592                reply: Reply<u64>,
3593            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3594                // After init, check the remaining budget
3595                self.loop_quota
3596                    .store(u64::from(cx.budget().poll_quota), Ordering::SeqCst);
3597                let _ = reply.send(0);
3598                Box::pin(async {})
3599            }
3600        }
3601
3602        init_test("init_budget_consumption_propagates_to_main_budget");
3603
3604        let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
3605        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3606        let region = runtime.state.create_root_region(budget);
3607        let cx = Cx::for_testing();
3608        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
3609
3610        let loop_quota = Arc::new(AtomicU64::new(0));
3611
3612        let server = BudgetCheckProbe {
3613            loop_quota: Arc::clone(&loop_quota),
3614        };
3615
3616        let (handle, stored) = scope
3617            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3618            .unwrap();
3619        let server_task_id = handle.task_id();
3620        runtime.state.store_spawned_task(server_task_id, stored);
3621
3622        let server_ref = handle.server_ref();
3623        let (client, stored_client) = scope
3624            .spawn(&mut runtime.state, &cx, move |cx| async move {
3625                let _ = server_ref.call(&cx, CounterCall::Get).await;
3626            })
3627            .unwrap();
3628        let client_task_id = client.task_id();
3629        runtime
3630            .state
3631            .store_spawned_task(client_task_id, stored_client);
3632
3633        runtime
3634            .scheduler
3635            .lock()
3636            .unwrap()
3637            .schedule(server_task_id, 0);
3638        runtime
3639            .scheduler
3640            .lock()
3641            .unwrap()
3642            .schedule(client_task_id, 0);
3643        runtime.run_until_quiescent();
3644
3645        // After init, the main budget should have the original quota minus
3646        // whatever was consumed during init. It should be <= 10_000.
3647        let remaining = loop_quota.load(Ordering::SeqCst);
3648        assert!(
3649            remaining <= 10_000,
3650            "main budget after init must be <= original ({remaining} <= 10000)"
3651        );
3652        assert!(
3653            remaining > 0,
3654            "main budget should still have polls remaining"
3655        );
3656
3657        drop(handle);
3658        runtime
3659            .scheduler
3660            .lock()
3661            .unwrap()
3662            .schedule(server_task_id, 0);
3663        runtime.run_until_quiescent();
3664
3665        crate::test_complete!("init_budget_consumption_propagates_to_main_budget");
3666    }
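
    // Consumption is charged, not refunded: polls spent inside on_start are
    // subtracted from the main loop's remaining budget when the guard restores.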
3667
3668    /// Verify on_stop_budget tightens the budget during the stop phase.
3669    #[test]
3670    fn stop_budget_constrains_stop_phase() {
3671        struct StopBudgetProbe {
3672            stop_poll_quota: Arc<AtomicU64>,
3673        }
3674
3675        impl GenServer for StopBudgetProbe {
3676            type Call = CounterCall;
3677            type Reply = u64;
3678            type Cast = CounterCast;
3679            type Info = SystemMsg;
3680
3681            fn on_stop_budget(&self) -> Budget {
3682                Budget::new().with_poll_quota(42).with_priority(250)
3683            }
3684
3685            fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3686                self.stop_poll_quota
3687                    .store(u64::from(cx.budget().poll_quota), Ordering::SeqCst);
3688                Box::pin(async {})
3689            }
3690
3691            fn handle_call(
3692                &mut self,
3693                _cx: &Cx,
3694                _request: CounterCall,
3695                reply: Reply<u64>,
3696            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3697                let _ = reply.send(0);
3698                Box::pin(async {})
3699            }
3700        }
3701
3702        init_test("stop_budget_constrains_stop_phase");
3703
3704        let budget = Budget::new().with_poll_quota(10_000);
3705        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3706        let region = runtime.state.create_root_region(budget);
3707        let cx = Cx::for_testing();
3708        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
3709
3710        let stop_poll_quota = Arc::new(AtomicU64::new(0));
3711
3712        let server = StopBudgetProbe {
3713            stop_poll_quota: Arc::clone(&stop_poll_quota),
3714        };
3715
3716        let (handle, stored) = scope
3717            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3718            .unwrap();
3719        let server_task_id = handle.task_id();
3720        runtime.state.store_spawned_task(server_task_id, stored);
3721
3722        // Trigger stop
3723        handle.stop();
3724
3725        runtime
3726            .scheduler
3727            .lock()
3728            .unwrap()
3729            .schedule(server_task_id, 0);
3730        runtime.run_until_quiescent();
3731
3732        let stop_quota = stop_poll_quota.load(Ordering::SeqCst);
3733        // The stop budget is meet(original, on_stop_budget), so
3734        // poll_quota should be min(10_000, 42) = 42
3735        assert_eq!(stop_quota, 42, "stop phase should use the tighter budget");
3736
3737        crate::test_complete!("stop_budget_constrains_stop_phase");
3738    }
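
    // Worked example of the quota meet rule: min(10_000, 42) = 42 here, and a
    // task with only 10 polls left would get min(10, 42) = 10. A stop budget
    // can tighten the quota but never extend it.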
3739
3740    /// Verify that stop always runs, even on immediate shutdown, and that
3741    /// if init ran at all it preceded stop.
3742    #[test]
3743    fn lifecycle_init_before_stop() {
3744        #[allow(clippy::items_after_statements)]
3745        struct PhaseTracker {
3746            phases: Arc<Mutex<Vec<&'static str>>>,
3747        }
3748
3749        #[allow(clippy::items_after_statements)]
3750        impl GenServer for PhaseTracker {
3751            type Call = CounterCall;
3752            type Reply = u64;
3753            type Cast = CounterCast;
3754            type Info = SystemMsg;
3755
3756            fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3757                self.phases.lock().unwrap().push("init");
3758                Box::pin(async {})
3759            }
3760
3761            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3762                self.phases.lock().unwrap().push("stop");
3763                Box::pin(async {})
3764            }
3765
3766            fn handle_call(
3767                &mut self,
3768                _cx: &Cx,
3769                _request: CounterCall,
3770                reply: Reply<u64>,
3771            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3772                let _ = reply.send(0);
3773                Box::pin(async {})
3774            }
3775
3776            fn handle_cast(
3777                &mut self,
3778                _cx: &Cx,
3779                _msg: CounterCast,
3780            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3781                Box::pin(async {})
3782            }
3783        }
3784
3785        init_test("lifecycle_init_before_stop");
3786
3787        let budget = Budget::new().with_poll_quota(10_000);
3788        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3789        let region = runtime.state.create_root_region(budget);
3790        let cx = Cx::for_testing();
3791        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
3792
3793        let phases = Arc::new(Mutex::new(Vec::<&'static str>::new()));
3794
3795        let server = PhaseTracker {
3796            phases: Arc::clone(&phases),
3797        };
3798
3799        let (handle, stored) = scope
3800            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3801            .unwrap();
3802        let server_task_id = handle.task_id();
3803        runtime.state.store_spawned_task(server_task_id, stored);
3804
3805        // Schedule the server so init runs, then idle on recv
3806        runtime
3807            .scheduler
3808            .lock()
3809            .unwrap()
3810            .schedule(server_task_id, 0);
3811        runtime.run_until_idle();
3812
3813        // Stop the server and reschedule so on_stop runs
3814        let phases_clone = Arc::clone(&phases);
3815        handle.stop();
3816        runtime
3817            .scheduler
3818            .lock()
3819            .unwrap()
3820            .schedule(server_task_id, 0);
3821        runtime.run_until_idle();
3822
3823        {
3824            let recorded = phases_clone.lock().unwrap();
3825
3826            // Stop must always run
3827            assert!(
3828                recorded.contains(&"stop"),
3829                "stop phase must run, got {:?}",
3830                *recorded
3831            );
3832
3833            // If init ran, it must precede stop
3834            if let Some(init_pos) = recorded.iter().position(|p| *p == "init") {
3835                let stop_pos = recorded.iter().position(|p| *p == "stop").unwrap();
3836                assert!(
3837                    init_pos < stop_pos,
3838                    "init must precede stop, got {:?}",
3839                    *recorded
3840                );
3841            }
3842
3843            drop(recorded);
3844        }
3845
3846        crate::test_complete!("lifecycle_init_before_stop");
3847    }
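
    // The conditional init check matters: run_until_idle gives no guarantee
    // about how far the server got before stop() was requested, so the test
    // asserts only the relative order init-before-stop, never that init ran.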
3848
3849    /// Verify that on_stop_budget's priority feeds into the effective
3850    /// priority during the stop phase.
3851    #[test]
3852    fn stop_budget_priority_applied() {
3853        #[allow(clippy::items_after_statements)]
3854        struct StopPriorityProbe {
3855            stop_priority: Arc<AtomicU8>,
3856        }
3857
3858        #[allow(clippy::items_after_statements)]
3859        impl GenServer for StopPriorityProbe {
3860            type Call = CounterCall;
3861            type Reply = u64;
3862            type Cast = CounterCast;
3863            type Info = SystemMsg;
3864
3865            fn on_stop_budget(&self) -> Budget {
3866                Budget::new().with_poll_quota(200).with_priority(240)
3867            }
3868
3869            fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3870                self.stop_priority
3871                    .store(cx.budget().priority, Ordering::SeqCst);
3872                Box::pin(async {})
3873            }
3874
3875            fn handle_call(
3876                &mut self,
3877                _cx: &Cx,
3878                _request: CounterCall,
3879                reply: Reply<u64>,
3880            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3881                let _ = reply.send(0);
3882                Box::pin(async {})
3883            }
3884        }
3885
3886        init_test("stop_budget_priority_applied");
3887
3888        let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
3889        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3890        let region = runtime.state.create_root_region(budget);
3891        let cx = Cx::for_testing();
3892        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
3893
3894        let stop_priority = Arc::new(AtomicU8::new(0));
3895
3896        let server = StopPriorityProbe {
3897            stop_priority: Arc::clone(&stop_priority),
3898        };
3899
3900        let (handle, stored) = scope
3901            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3902            .unwrap();
3903        let server_task_id = handle.task_id();
3904        runtime.state.store_spawned_task(server_task_id, stored);
3905
3906        handle.stop();
3907
3908        runtime
3909            .scheduler
3910            .lock()
3911            .unwrap()
3912            .schedule(server_task_id, 0);
3913        runtime.run_until_quiescent();
3914
3915        // meet takes the min for quotas but the max for priority, so the stop phase
3916        // should see priority max(10, 240) = 240; we assert only a conservative floor.
3917        let actual_priority = stop_priority.load(Ordering::SeqCst);
3918        assert!(
3919            actual_priority >= 10,
3920            "stop priority should be at least original ({actual_priority} >= 10)"
3921        );
3922
3923        crate::test_complete!("stop_budget_priority_applied");
3924    }
3925
3926    // ========================================================================
3927    // Conformance + Lab Tests (bd-l6b71)
3928    //
3929    // These tests verify the GenServer conformance suite:
3930    //   - reply linearity (obligation enforcement)
3931    //   - cancel propagation through call/cast
3932    //   - mailbox overflow determinism
3933    //   - full lifecycle with no obligation leaks
3934    //   - deterministic replay (same seed = same outcome)
3935    // ========================================================================
3936
3937    /// Queued calls must resolve rather than hang when the server is stopped
3938    /// while they are pending. Exercises cancel propagation to call waiters.
3939    #[test]
3940    fn conformance_cancel_propagation_to_queued_calls() {
3941        init_test("conformance_cancel_propagation_to_queued_calls");
3942
3943        let budget = Budget::new().with_poll_quota(50_000);
3944        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3945        let region = runtime.state.create_root_region(budget);
3946        let cx = Cx::for_testing();
3947        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
3948
3949        // Capacity-1 mailbox: second call will block waiting for capacity.
3950        let (handle, stored) = scope
3951            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 1)
3952            .unwrap();
3953        let server_task_id = handle.task_id();
3954        runtime.state.store_spawned_task(server_task_id, stored);
3955
3956        let server_ref_1 = handle.server_ref();
3957        let server_ref_2 = handle.server_ref();
3958
3959        // Client 1: sends a call that the server will process.
3960        let result_1: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
3961        let result_1_clone = Arc::clone(&result_1);
3962        let (c1_handle, c1_stored) = scope
3963            .spawn(&mut runtime.state, &cx, move |cx| async move {
3964                let r = server_ref_1.call(&cx, CounterCall::Add(10)).await;
3965                *result_1_clone.lock().unwrap() = Some(r);
3966            })
3967            .unwrap();
3968        let c1_id = c1_handle.task_id();
3969        runtime.state.store_spawned_task(c1_id, c1_stored);
3970
3971        // Client 2: sends a call that will queue behind client 1.
3972        let result_2: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
3973        let result_2_clone = Arc::clone(&result_2);
3974        let (c2_handle, c2_stored) = scope
3975            .spawn(&mut runtime.state, &cx, move |cx| async move {
3976                let r = server_ref_2.call(&cx, CounterCall::Add(20)).await;
3977                *result_2_clone.lock().unwrap() = Some(r);
3978            })
3979            .unwrap();
3980        let c2_id = c2_handle.task_id();
3981        runtime.state.store_spawned_task(c2_id, c2_stored);
3982
3983        // Schedule server + clients, let them make progress.
3984        {
3985            let mut sched = runtime.scheduler.lock().unwrap();
3986            sched.schedule(server_task_id, 0);
3987            sched.schedule(c1_id, 0);
3988            sched.schedule(c2_id, 0);
3989        }
3990        runtime.run_until_idle();
3991
3992        // Stop the server (triggers cancellation of pending calls).
3993        handle.stop();
3994        {
3995            let mut sched = runtime.scheduler.lock().unwrap();
3996            sched.schedule(server_task_id, 0);
3997            sched.schedule(c1_id, 0);
3998            sched.schedule(c2_id, 0);
3999        }
4000        runtime.run_until_quiescent();
4001
4002        // The server shut down while calls may still have been in flight, so no
4003        // single outcome is guaranteed here. All of the following are acceptable:
4004        // Ok (processed before stop), ServerStopped, Cancelled, or NoReply. The
4005        // property under test is that shutdown neither panics nor leaks obligations.
4006        drop(result_2.lock().unwrap()); // unwrap fails here only if a client panicked (poisoned lock)
4007
4008        crate::test_complete!("conformance_cancel_propagation_to_queued_calls");
4009    }
4010
4011    /// After stop(), new calls and casts are rejected immediately.
4012    /// After stop(), new messages are rejected immediately (verified here for casts).
4013    fn conformance_stopped_server_rejects_new_messages() {
4014        init_test("conformance_stopped_server_rejects_new_messages");
4015
4016        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4017        let region = runtime.state.create_root_region(Budget::INFINITE);
4018        let cx = Cx::for_testing();
4019        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
4020
4021        let (handle, stored) = scope
4022            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
4023            .unwrap();
4024        let server_task_id = handle.task_id();
4025        runtime.state.store_spawned_task(server_task_id, stored);
4026
4027        let server_ref = handle.server_ref();
4028
4029        // Start the server so init runs.
4030        runtime
4031            .scheduler
4032            .lock()
4033            .unwrap()
4034            .schedule(server_task_id, 0);
4035        runtime.run_until_idle();
4036
4037        // Stop the server and drain.
4038        handle.stop();
4039        runtime
4040            .scheduler
4041            .lock()
4042            .unwrap()
4043            .schedule(server_task_id, 0);
4044        runtime.run_until_quiescent();
4045
4046        // try_cast to a stopped server should fail.
4047        let cast_result = server_ref.try_cast(CounterCast::Reset);
4048        assert!(cast_result.is_err(), "cast to stopped server must fail");
4049
4050        crate::test_complete!("conformance_stopped_server_rejects_new_messages");
4051    }
4052
4053    /// Full lifecycle test: start, send calls+casts, stop, verify no leaked
4054    /// obligations or unprocessed messages. This exercises the complete
4055    /// GenServer protocol end-to-end.
4056    #[test]
4057    fn conformance_full_lifecycle_no_obligation_leaks() {
4058        init_test("conformance_full_lifecycle_no_obligation_leaks");
4059
4060        let budget = Budget::new().with_poll_quota(100_000);
4061        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4062        let region = runtime.state.create_root_region(budget);
4063        let cx = Cx::for_testing();
4064        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4065
4066        let (handle, stored) = scope
4067            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
4068            .unwrap();
4069        let server_task_id = handle.task_id();
4070        runtime.state.store_spawned_task(server_task_id, stored);
4071
4072        let server_ref = handle.server_ref();
4073
4074        // Phase 1: Fire off a mix of casts and then a call.
4075        server_ref.try_cast(CounterCast::Reset).unwrap();
4076
4077        let call_result: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
4078        let call_result_clone = Arc::clone(&call_result);
4079        let server_ref_for_call = handle.server_ref();
4080        let (client, client_stored) = scope
4081            .spawn(&mut runtime.state, &cx, move |cx| async move {
4082                let r = server_ref_for_call.call(&cx, CounterCall::Add(42)).await;
4083                *call_result_clone.lock().unwrap() = Some(r);
4084            })
4085            .unwrap();
4086        let client_id = client.task_id();
4087        runtime.state.store_spawned_task(client_id, client_stored);
4088
4089        // Schedule both and let them process.
4090        {
4091            let mut sched = runtime.scheduler.lock().unwrap();
4092            sched.schedule(server_task_id, 0);
4093            sched.schedule(client_id, 0);
4094        }
4095        runtime.run_until_idle();
4096
4097        // Re-schedule for message processing.
4098        {
4099            let mut sched = runtime.scheduler.lock().unwrap();
4100            sched.schedule(server_task_id, 0);
4101            sched.schedule(client_id, 0);
4102        }
4103        runtime.run_until_idle();
4104
4105        // Phase 2: Verify the call result.
4106        let call_r = call_result.lock().unwrap();
4107        if let Some(ref r) = *call_r {
4108            match r {
4109                Ok(value) => assert_eq!(*value, 42, "counter should be 42 after Add(42)"),
4110                Err(e) => unreachable!("unexpected call error: {e:?}"),
4111            }
4112        }
4113        drop(call_r);
4114
4115        // Phase 3: More casts to exercise the mailbox.
4116        server_ref.try_cast(CounterCast::Reset).unwrap();
4117
4118        // Phase 4: Graceful stop.
4119        handle.stop();
4120        runtime
4121            .scheduler
4122            .lock()
4123            .unwrap()
4124            .schedule(server_task_id, 0);
4125        runtime.run_until_quiescent();
4126
4127        // If we get here without panics, no obligations were leaked.
4128        // TrackedOneshotPermit panics on drop if not consumed.
4129        crate::test_complete!("conformance_full_lifecycle_no_obligation_leaks");
4130    }
4131
4132    /// Deterministic replay: running the same GenServer scenario with the
4133    /// same seed must produce identical state transitions.
4134    #[test]
4135    #[allow(clippy::items_after_statements)]
4136    fn conformance_deterministic_replay_with_seed() {
4137        init_test("conformance_deterministic_replay_with_seed");
4138
4139        fn run_scenario(seed: u64) -> Vec<u64> {
4140            let config = crate::lab::LabConfig::new(seed);
4141            let mut runtime = crate::lab::LabRuntime::new(config);
4142            let budget = Budget::new().with_poll_quota(100_000);
4143            let region = runtime.state.create_root_region(budget);
4144            let cx = Cx::for_testing();
4145            let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4146
4147            let (handle, stored) = scope
4148                .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
4149                .unwrap();
4150            let server_task_id = handle.task_id();
4151            runtime.state.store_spawned_task(server_task_id, stored);
4152
4153            let results: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));
4154
4155            // Spawn 3 clients that each Add different amounts.
4156            let mut client_ids = Vec::new();
4157            for i in 1..=3u64 {
4158                let server_ref = handle.server_ref();
4159                let results_clone = Arc::clone(&results);
4160                let (ch, cs) = scope
4161                    .spawn(&mut runtime.state, &cx, move |cx| async move {
4162                        if let Ok(val) = server_ref.call(&cx, CounterCall::Add(i * 10)).await {
4163                            results_clone.lock().unwrap().push(val);
4164                        }
4165                    })
4166                    .unwrap();
4167                let cid = ch.task_id();
4168                runtime.state.store_spawned_task(cid, cs);
4169                client_ids.push(cid);
4170            }
4171
4172            // Schedule all tasks.
4173            {
4174                let mut sched = runtime.scheduler.lock().unwrap();
4175                sched.schedule(server_task_id, 0);
4176                for &cid in &client_ids {
4177                    sched.schedule(cid, 0);
4178                }
4179            }
4180            runtime.run_until_idle();
4181
4182            // Re-schedule to process enqueued calls.
4183            {
4184                let mut sched = runtime.scheduler.lock().unwrap();
4185                sched.schedule(server_task_id, 0);
4186                for &cid in &client_ids {
4187                    sched.schedule(cid, 0);
4188                }
4189            }
4190            runtime.run_until_idle();
4191
4192            // Stop and drain.
4193            handle.stop();
4194            runtime
4195                .scheduler
4196                .lock()
4197                .unwrap()
4198                .schedule(server_task_id, 0);
4199            runtime.run_until_quiescent();
4200
4201            results.lock().unwrap().clone()
4203        }
4204
4205        // Same seed must produce identical results.
4206        let run_a = run_scenario(42);
4207        let run_b = run_scenario(42);
4208        assert_eq!(
4209            run_a, run_b,
4210            "same seed must produce identical results: {run_a:?} vs {run_b:?}"
4211        );
4212
4213        crate::test_complete!("conformance_deterministic_replay_with_seed");
4214    }
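
    // Determinism is claimed per seed only: a different seed may legally yield
    // a different interleaving of the three Add calls, and hence different
    // observed intermediate values.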
4215
4216    /// Mailbox overflow with Reject policy: deterministic rejection when full.
4217    #[test]
4218    fn conformance_mailbox_overflow_reject_deterministic() {
4219        init_test("conformance_mailbox_overflow_reject_deterministic");
4220
4221        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4222        let region = runtime.state.create_root_region(Budget::INFINITE);
4223        let cx = Cx::for_testing();
4224        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
4225
4226        // Capacity-2 mailbox with default Reject policy.
4227        let (handle, stored) = scope
4228            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 2)
4229            .unwrap();
4230        let server_task_id = handle.task_id();
4231        runtime.state.store_spawned_task(server_task_id, stored);
4232
4233        let server_ref = handle.server_ref();
4234
4235        // Fill the mailbox to capacity.
4236        server_ref.try_cast(CounterCast::Reset).unwrap();
4237        server_ref.try_cast(CounterCast::Reset).unwrap();
4238
4239        // Third cast must be rejected (mailbox full, Reject policy).
4240        let overflow = server_ref.try_cast(CounterCast::Reset);
4241        assert!(
4242            overflow.is_err(),
4243            "third cast to capacity-2 mailbox must fail with Reject policy"
4244        );
4245        match overflow.unwrap_err() {
4246            CastError::Full => { /* expected */ }
4247            other => unreachable!("expected CastError::Full, got {other:?}"),
4248        }
4249
4250        // Drain and cleanup.
4251        drop(handle);
4252        runtime
4253            .scheduler
4254            .lock()
4255            .unwrap()
4256            .schedule(server_task_id, 0);
4257        runtime.run_until_quiescent();
4258
4259        crate::test_complete!("conformance_mailbox_overflow_reject_deterministic");
4260    }
4261
4262    /// DropOldest eviction preserves the newest messages when full.
4263    #[test]
4264    fn conformance_mailbox_drop_oldest_preserves_newest() {
4265        init_test("conformance_mailbox_drop_oldest_preserves_newest");
4266
4267        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4268        let region = runtime.state.create_root_region(Budget::INFINITE);
4269        let cx = Cx::for_testing();
4270        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
4271
4272        // DropOldest counter with capacity 2.
4273        let (handle, stored) = scope
4274            .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 2)
4275            .unwrap();
4276        let server_task_id = handle.task_id();
4277        runtime.state.store_spawned_task(server_task_id, stored);
4278
4279        let server_ref = handle.server_ref();
4280
4281        // Fill mailbox with Set(1) and Set(2).
4282        server_ref.try_cast(TaggedCast::Set(1)).unwrap();
4283        server_ref.try_cast(TaggedCast::Set(2)).unwrap();
4284
4285        // Overflow with Set(100) — should evict Set(1), keeping Set(2) and Set(100).
4286        server_ref.try_cast(TaggedCast::Set(100)).unwrap();
4287
4288        // Process all messages.
4289        runtime
4290            .scheduler
4291            .lock()
4292            .unwrap()
4293            .schedule(server_task_id, 0);
4294        runtime.run_until_idle();
4295
4296        // The final value should be 100 (last Set wins).
4297        let result_ref = handle.server_ref();
4298        let result: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
4299        let result_clone = Arc::clone(&result);
4300        let (ch, cs) = scope
4301            .spawn(&mut runtime.state, &cx, move |cx| async move {
4302                if let Ok(val) = result_ref.call(&cx, CounterCall::Get).await {
4303                    *result_clone.lock().unwrap() = Some(val);
4304                }
4305            })
4306            .unwrap();
4307        let cid = ch.task_id();
4308        runtime.state.store_spawned_task(cid, cs);
4309
4310        {
4311            let mut sched = runtime.scheduler.lock().unwrap();
4312            sched.schedule(server_task_id, 0);
4313            sched.schedule(cid, 0);
4314        }
4315        runtime.run_until_idle();
4316        {
4317            let mut sched = runtime.scheduler.lock().unwrap();
4318            sched.schedule(server_task_id, 0);
4319            sched.schedule(cid, 0);
4320        }
4321        runtime.run_until_idle();
4322
4323        // The server should have processed Set(2), then Set(100).
4324        // Set(1) was evicted. Final count = 100.
4325        assert_eq!(
4326            *result.lock().unwrap(),
4327            Some(100),
4328            "DropOldest should evict oldest, keeping newest"
4329        );
4330
4331        drop(handle);
4332        runtime
4333            .scheduler
4334            .lock()
4335            .unwrap()
4336            .schedule(server_task_id, 0);
4337        runtime.run_until_quiescent();
4338
4339        crate::test_complete!("conformance_mailbox_drop_oldest_preserves_newest");
4340    }
4341
4342    /// Budget-driven call termination: the server aborts the reply obligation,
4343    /// so the call fails deterministically without wall-clock dependence.
4344    #[test]
4345    #[allow(clippy::items_after_statements)]
4346    fn conformance_budget_driven_call_timeout() {
4347    // Server that never replies: it aborts the reply obligation instead,
4348    // which is the correct way to decline a call without leaking.
4349        struct SlowServer;
4350        impl GenServer for SlowServer {
4351            type Call = ();
4352            type Reply = ();
4353            type Cast = ();
4354            type Info = SystemMsg;
4355
4356            fn handle_call(
4357                &mut self,
4358                _cx: &Cx,
4359                _request: (),
4360                reply: Reply<()>,
4361            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4362                // Abort the reply obligation (correct: no leak).
4363                let _proof = reply.abort();
4364                Box::pin(async {})
4365            }
4366        }
4367
4368        init_test("conformance_budget_driven_call_timeout");
4369
4370        let budget = Budget::new().with_poll_quota(100_000);
4371        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
4372        let region = runtime.state.create_root_region(budget);
4373        let cx = Cx::for_testing();
4374        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
4375
4376        let (handle, stored) = scope
4377            .spawn_gen_server(&mut runtime.state, &cx, SlowServer, 32)
4378            .unwrap();
4379        let server_task_id = handle.task_id();
4380        runtime.state.store_spawned_task(server_task_id, stored);
4381
4382        let server_ref = handle.server_ref();
4383
4384        // Client calls the server. The server aborts the reply, so the
4385        // client should see a channel close / error.
4386        let call_result: Arc<Mutex<Option<Result<(), CallError>>>> = Arc::new(Mutex::new(None));
4387        let call_result_clone = Arc::clone(&call_result);
4388        let (ch, cs) = scope
4389            .spawn(&mut runtime.state, &cx, move |cx| async move {
4390                let r = server_ref.call(&cx, ()).await;
4391                *call_result_clone.lock().unwrap() = Some(r);
4392            })
4393            .unwrap();
4394        let client_id = ch.task_id();
4395        runtime.state.store_spawned_task(client_id, cs);
4396
4397        // Run everything.
4398        {
4399            let mut sched = runtime.scheduler.lock().unwrap();
4400            sched.schedule(server_task_id, 0);
4401            sched.schedule(client_id, 0);
4402        }
4403        runtime.run_until_idle();
4404        {
4405            let mut sched = runtime.scheduler.lock().unwrap();
4406            sched.schedule(server_task_id, 0);
4407            sched.schedule(client_id, 0);
4408        }
4409        runtime.run_until_idle();
4410
4411        // The client should have received an error since the server aborted.
4412        if let Some(ref result) = *call_result.lock().unwrap() {
4413            assert!(result.is_err(), "aborted reply should result in call error");
4414        }
4415
4416        // Clean up.
4417        handle.stop();
4418        runtime
4419            .scheduler
4420            .lock()
4421            .unwrap()
4422            .schedule(server_task_id, 0);
4423        runtime.run_until_quiescent();
4424
4425        crate::test_complete!("conformance_budget_driven_call_timeout");
4426    }

    /// Reply linearity: verify that Reply::send commits the obligation
    /// and the committed proof is returned.
    #[test]
    #[allow(clippy::items_after_statements)]
    fn conformance_reply_linearity_send_commits() {
        // Server that tracks whether reply was committed.
        struct ReplyTracker {
            committed: Arc<AtomicU8>,
        }

        impl GenServer for ReplyTracker {
            type Call = u64;
            type Reply = u64;
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                request: u64,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                match reply.send(request * 2) {
                    ReplyOutcome::Committed(_proof) => {
                        self.committed.store(1, Ordering::SeqCst);
                    }
                    ReplyOutcome::CallerGone => {
                        self.committed.store(2, Ordering::SeqCst);
                    }
                }
                Box::pin(async {})
            }
        }

        init_test("conformance_reply_linearity_send_commits");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let committed = Arc::new(AtomicU8::new(0));
        let server = ReplyTracker {
            committed: Arc::clone(&committed),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();
        let call_result: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
        let call_result_clone = Arc::clone(&call_result);

        let (ch, cs) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let r = server_ref.call(&cx, 21).await;
                *call_result_clone.lock().unwrap() = Some(r);
            })
            .unwrap();
        let client_id = ch.task_id();
        runtime.state.store_spawned_task(client_id, cs);

        {
            let mut sched = runtime.scheduler.lock().unwrap();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();
        {
            let mut sched = runtime.scheduler.lock().unwrap();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();

        // Verify reply was committed (not CallerGone).
        assert_eq!(
            committed.load(Ordering::SeqCst),
            1,
            "reply must be committed when caller is waiting"
        );

        // Verify the caller received the correct value.
        {
            let r = call_result.lock().unwrap();
            match r.as_ref() {
                Some(Ok(value)) => assert_eq!(*value, 42, "21 * 2 = 42"),
                other => unreachable!("expected Ok(42), got {other:?}"),
            }
        }

        handle.stop();
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        crate::test_complete!("conformance_reply_linearity_send_commits");
    }
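
    // Sketch of the linearity contract verified above (hedged: `answer` and
    // `log_caller_gone` are illustrative names; the `ReplyOutcome` variants
    // are the ones matched in the test). `Reply::send` consumes the reply
    // exactly once and reports how the obligation was discharged:
    //
    //     match reply.send(answer) {
    //         // The caller was still waiting; a proof witnesses the commit.
    //         ReplyOutcome::Committed(_proof) => {}
    //         // The caller went away first; the obligation is resolved, not leaked.
    //         ReplyOutcome::CallerGone => log_caller_gone(),
    //     }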

    /// Reply linearity: verify that Reply::abort produces an AbortedProof
    /// and the caller receives an error (not a value).
    #[test]
    #[allow(clippy::items_after_statements)]
    fn conformance_reply_linearity_abort_is_clean() {
        struct AbortServer {
            aborted: Arc<AtomicU8>,
        }

        impl GenServer for AbortServer {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _proof = reply.abort();
                self.aborted.store(1, Ordering::SeqCst);
                Box::pin(async {})
            }
        }

        init_test("conformance_reply_linearity_abort_is_clean");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let aborted = Arc::new(AtomicU8::new(0));
        let server = AbortServer {
            aborted: Arc::clone(&aborted),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();
        let call_err: Arc<Mutex<Option<Result<(), CallError>>>> = Arc::new(Mutex::new(None));
        let call_err_clone = Arc::clone(&call_err);

        let (ch, cs) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let r = server_ref.call(&cx, ()).await;
                *call_err_clone.lock().unwrap() = Some(r);
            })
            .unwrap();
        let client_id = ch.task_id();
        runtime.state.store_spawned_task(client_id, cs);

        {
            let mut sched = runtime.scheduler.lock().unwrap();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();
        {
            let mut sched = runtime.scheduler.lock().unwrap();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();

        // Server should have aborted.
        assert_eq!(
            aborted.load(Ordering::SeqCst),
            1,
            "server must have called abort()"
        );

        // Caller should see an error, not Ok.
        {
            let r = call_err.lock().unwrap();
            match r.as_ref() {
                Some(Err(_)) => { /* expected: aborted reply -> error */ }
                other => unreachable!("expected call error after abort, got {other:?}"),
            }
        }

        handle.stop();
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        crate::test_complete!("conformance_reply_linearity_abort_is_clean");
    }
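
    // Taken together, the two linearity tests above cover the two clean ways
    // to discharge a reply obligation (a sketch, not exhaustive):
    //
    //     let _outcome = reply.send(value);  // caller's call() resolves to Ok(value)
    //     // ...or...
    //     let _proof = reply.abort();        // caller's call() resolves to Err(..)
    //
    // Dropping `reply` without doing either is exactly the leaked obligation
    // the runtime detects.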

    /// On-stop processes remaining casts before completing (drain semantics).
    /// Verifies that queued casts are not silently dropped on shutdown.
    #[test]
    #[allow(clippy::items_after_statements)]
    fn conformance_drain_processes_queued_casts_on_stop() {
        struct AccumulatorServer {
            sum: u64,
            final_sum: Arc<AtomicU64>,
        }

        enum AccumCall {
            GetSum,
        }
        enum AccumCast {
            Add(u64),
        }

        impl GenServer for AccumulatorServer {
            type Call = AccumCall;
            type Reply = u64;
            type Cast = AccumCast;
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: AccumCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(self.sum);
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                msg: AccumCast,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                match msg {
                    AccumCast::Add(n) => self.sum += n,
                }
                Box::pin(async {})
            }

            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.final_sum.store(self.sum, Ordering::SeqCst);
                Box::pin(async {})
            }
        }

        init_test("conformance_drain_processes_queued_casts_on_stop");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let final_sum = Arc::new(AtomicU64::new(0));
        let server = AccumulatorServer {
            sum: 0,
            final_sum: Arc::clone(&final_sum),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        // Queue up several casts.
        server_ref.try_cast(AccumCast::Add(10)).unwrap();
        server_ref.try_cast(AccumCast::Add(20)).unwrap();
        server_ref.try_cast(AccumCast::Add(30)).unwrap();

        // Start the server (init runs, then it will process casts).
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_idle();

        // Stop and let it drain remaining messages.
        handle.stop();
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        // The server should have processed all casts before stopping.
        let sum = final_sum.load(Ordering::SeqCst);
        assert_eq!(
            sum, 60,
            "server must drain queued casts before stopping: 10+20+30=60, got {sum}"
        );

        crate::test_complete!("conformance_drain_processes_queued_casts_on_stop");
    }
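
    // Usage sketch for the drain semantics above (hedged: the sequence is the
    // one exercised by this test; error handling elided). Casts enqueued
    // before `stop` are applied before `on_stop` observes the final state:
    //
    //     server_ref.try_cast(AccumCast::Add(1)).unwrap();  // queued
    //     handle.stop();                                    // drain, then on_stop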

    // =========================================================================
    // Named GenServer integration tests (bd-23az1)
    // =========================================================================

    /// Named server: spawn registers name, whereis finds it.
    #[test]
    fn named_server_register_and_whereis() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_register_and_whereis");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Counter(u64);

        #[allow(clippy::items_after_statements)]
        impl GenServer for Counter {
            type Call = u64;
            type Reply = u64;
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                request: u64,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.0 += request;
                let _ = reply.send(self.0);
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut named_handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "my_counter",
                Counter(0),
                32,
                now,
            )
            .unwrap();

        let task_id = named_handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        // Name should be visible via whereis.
        assert_eq!(registry.whereis("my_counter"), Some(task_id));
        assert_eq!(named_handle.name(), "my_counter");

        // Clean up: release lease.
        named_handle.stop_and_release().unwrap();

        crate::test_complete!("named_server_register_and_whereis");
    }
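
    // Lookup sketch (hedged: `send_to` is a hypothetical helper). `whereis`
    // resolves a registered name to the live task id, or `None` once the
    // name is free:
    //
    //     if let Some(task_id) = registry.whereis("my_counter") {
    //         send_to(task_id);
    //     }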

    /// Named server: duplicate name is rejected.
    #[test]
    fn named_server_duplicate_name_rejected() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_duplicate_name_rejected");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Dummy;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Dummy {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;

        // First spawn succeeds.
        let (mut h1, s1) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "singleton",
                Dummy,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(h1.task_id(), s1);

        // Second spawn with same name fails.
        let result = scope.spawn_named_gen_server(
            &mut runtime.state,
            &cx,
            &mut registry,
            "singleton",
            Dummy,
            8,
            now,
        );
        assert!(
            matches!(result, Err(NamedSpawnError::NameTaken(_))),
            "duplicate name should be rejected"
        );

        // Original is still registered.
        assert_eq!(registry.whereis("singleton"), Some(h1.task_id()));

        // Verify the orphaned task record from the failed spawn was cleaned up.
        // The region should only contain the first task; no leaked task record
        // that would prevent region quiescence.
        let region_tasks = runtime.state.region(region).unwrap().task_ids();
        assert_eq!(
            region_tasks,
            vec![h1.task_id()],
            "region should only have the first task; orphaned task must be removed"
        );

        h1.stop_and_release().unwrap();

        crate::test_complete!("named_server_duplicate_name_rejected");
    }
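
    // Sketch of recovering from a name collision (hedged: the arm bodies are
    // illustrative; only the `NamedSpawnError::NameTaken` variant and the
    // Ok tuple shape come from the test above):
    //
    //     match scope.spawn_named_gen_server(/* ... */) {
    //         Ok((handle, stored)) => { /* store the task and run it */ }
    //         Err(NamedSpawnError::NameTaken(_)) => { /* back off or reuse the holder */ }
    //         Err(other) => return Err(other),
    //     }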
4902
    /// Named server: abort_lease resolves the lease obligation (no drop bomb);
    /// the name entry itself stays until explicitly unregistered.
    #[test]
    fn named_server_abort_lease_removes_name() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_abort_lease_removes_name");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Noop;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Noop {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _req: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "temp_name",
                Noop,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(handle.task_id(), stored);

        // Name is registered.
        assert!(registry.whereis("temp_name").is_some());

        // Abort the lease (simulating cancellation).
        handle.abort_lease().unwrap();

        // Note: abort_lease resolves the lease obligation (no drop bomb), but
        // it does not auto-unregister; the name entry stays in the registry
        // until it is explicitly unregistered.

        crate::test_complete!("named_server_abort_lease_removes_name");
    }
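
    // Lease lifecycle summary drawn from the tests above (a sketch, hedged;
    // error handling elided):
    //
    //     handle.stop_and_release().unwrap();  // stop the server and release the name lease
    //     handle.abort_lease().unwrap();       // resolve the lease obligation only;
    //                                          // the registry entry persists until
    //                                          // explicitly unregistered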

    /// Named server: take_lease allows manual lifecycle management.
    #[test]
    fn named_server_take_lease_manual_management() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_take_lease_manual_management");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Noop2;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Noop2 {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _req: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "manual_name",
                Noop2,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(handle.task_id(), stored);

        // Take the lease for manual management.
        let mut lease = handle.take_lease().unwrap();
        assert!(handle.take_lease().is_none(), "second take returns None");

        // name() returns placeholder when lease is taken.
        assert_eq!(handle.name(), "(released)");

        // Resolve the lease manually.
        let _ = lease.abort();

        crate::test_complete!("named_server_take_lease_manual_management");
    }
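
    // Manual-management sketch based on the test above: `take_lease` moves the
    // lease out of the handle (a second take yields `None`, and `name()`
    // reports the "(released)" placeholder), after which the holder must
    // resolve the lease explicitly, e.g. the way this test does:
    //
    //     let mut lease = handle.take_lease().expect("first take");
    //     let _ = lease.abort();  // the holder must resolve the lease itself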

    #[test]
    #[allow(clippy::items_after_statements)]
    fn named_start_helper_supervisor_stop_cleans_registry() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_start_helper_supervisor_stop_cleans_registry");

        #[derive(Debug)]
        struct Noop;

        impl GenServer for Noop {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>> =
            Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let root = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();

        let child = crate::supervision::ChildSpec::new(
            "svc_child",
            named_gen_server_start(Arc::clone(&registry), "svc", 16, || Noop),
        );

        let compiled = crate::supervision::SupervisorBuilder::new("svc_supervisor")
            .child(child)
            .compile()
            .expect("compile supervisor");

        let supervisor = compiled
            .spawn(&mut runtime.state, &cx, root, budget)
            .expect("spawn supervisor");

        assert_eq!(supervisor.started.len(), 1, "exactly one started child");
        let child_task = supervisor.started[0].task_id;
        assert_eq!(registry.lock().whereis("svc"), Some(child_task));

        // Stop the supervisor region and drive cancellation/finalization.
        let tasks_to_schedule = runtime.state.cancel_request(
            supervisor.region,
            &crate::types::CancelReason::user("stop"),
            None,
        );
        for (task_id, priority) in tasks_to_schedule {
            runtime
                .scheduler
                .lock()
                .unwrap()
                .schedule(task_id, priority);
        }
        runtime.run_until_quiescent();

        assert!(
            registry.lock().whereis("svc").is_none(),
            "name must be removed after supervised stop",
        );

        crate::test_complete!("named_start_helper_supervisor_stop_cleans_registry");
    }
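
    // Supervision wiring sketch, mirroring the test above (hedged: "worker",
    // "worker_svc", "sup", and Worker are illustrative names; the helper and
    // builder calls are the ones exercised here):
    //
    //     let child = crate::supervision::ChildSpec::new(
    //         "worker",
    //         named_gen_server_start(Arc::clone(&registry), "worker_svc", 16, || Worker),
    //     );
    //     let compiled = crate::supervision::SupervisorBuilder::new("sup")
    //         .child(child)
    //         .compile()
    //         .expect("compile supervisor");
    //
    // Cancelling the supervisor's region then unregisters "worker_svc" and
    // resolves the name lease, as asserted above.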

    #[test]
    #[allow(clippy::items_after_statements)]
    fn named_start_helper_crash_then_stop_cleans_registry() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_start_helper_crash_then_stop_cleans_registry");

        #[derive(Debug)]
        struct PanicOnStart;

        impl GenServer for PanicOnStart {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async move {
                    std::panic::panic_any("intentional start crash for registry cleanup test");
                })
            }

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>> =
            Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(7));
        let root = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();

        let child = crate::supervision::ChildSpec::new(
            "panic_child",
            named_gen_server_start(Arc::clone(&registry), "panic_svc", 8, || PanicOnStart),
        );

        let compiled = crate::supervision::SupervisorBuilder::new("panic_supervisor")
            .child(child)
            .compile()
            .expect("compile supervisor");

        let supervisor = compiled
            .spawn(&mut runtime.state, &cx, root, budget)
            .expect("spawn supervisor");

        let child_task = supervisor.started[0].task_id;
        assert_eq!(registry.lock().whereis("panic_svc"), Some(child_task));

        // Drive the child once so it crashes in on_start.
        runtime.scheduler.lock().unwrap().schedule(child_task, 0);
        runtime.run_until_idle();

        // Region stop must still clean the registry and resolve the lease.
        let tasks_to_schedule = runtime.state.cancel_request(
            supervisor.region,
            &crate::types::CancelReason::user("shutdown"),
            None,
        );
        for (task_id, priority) in tasks_to_schedule {
            runtime
                .scheduler
                .lock()
                .unwrap()
                .schedule(task_id, priority);
        }
        runtime.run_until_quiescent();

        assert!(
            registry.lock().whereis("panic_svc").is_none(),
            "name must be removed after crash + region stop",
        );

        crate::test_complete!("named_start_helper_crash_then_stop_cleans_registry");
    }
}