Skip to main content

asupersync/
actor.rs

1//! Actor abstraction for region-owned, message-driven concurrency.
2//!
3//! Actors in Asupersync are region-owned tasks that process messages from a
4//! bounded mailbox. They integrate with the runtime's structured concurrency
5//! model:
6//!
7//! - **Region-owned**: Actors are spawned within a region and cannot outlive it.
8//! - **Cancel-safe mailbox**: Messages use the two-phase reserve/send pattern.
9//! - **Lifecycle hooks**: `on_start` and `on_stop` for initialization and cleanup.
10//!
11//! # Example
12//!
13//! ```ignore
14//! struct Counter {
15//!     count: u64,
16//! }
17//!
18//! impl Actor for Counter {
19//!     type Message = u64;
20//!
//!     fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
//!         Box::pin(async move {
//!             self.count += msg;
//!         })
//!     }
24//! }
25//!
26//! // In a scope:
//! let (mut handle, stored) = scope.spawn_actor(
28//!     &mut state, &cx, Counter { count: 0 }, 32,
29//! )?;
30//! state.store_spawned_task(handle.task_id(), stored);
31//!
32//! // Send messages:
33//! handle.send(&cx, 5).await?;
34//! handle.send(&cx, 10).await?;
35//!
36//! // Stop the actor:
37//! handle.stop();
38//! let result = (&mut handle).join(&cx).await?;
39//! assert_eq!(result.count, 15);
40//! ```
41
42use std::future::Future;
43use std::pin::Pin;
44use std::sync::Arc;
45use std::sync::atomic::{AtomicU8, Ordering};
46
47use crate::channel::mpsc;
48use crate::channel::mpsc::SendError;
49use crate::cx::Cx;
50use crate::runtime::{JoinError, SpawnError};
51use crate::types::{CxInner, Outcome, RegionId, TaskId, Time};
52
/// Unique identifier for an actor.
///
/// For now this is a thin wrapper around the actor task's `TaskId`, which already
/// provides arena + generation semantics. Keeping a distinct type avoids mixing
/// actor IDs with generic tasks at call sites.
///
/// The type is `Copy`, so it is passed by value throughout this module.
/// `Debug` and `Display` are implemented by hand rather than derived.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct ActorId(TaskId);
60
61impl ActorId {
62    /// Create an actor ID from a task ID.
63    #[must_use]
64    pub const fn from_task(task_id: TaskId) -> Self {
65        Self(task_id)
66    }
67
68    /// Returns the underlying task ID.
69    #[must_use]
70    pub const fn task_id(self) -> TaskId {
71        self.0
72    }
73}
74
75impl std::fmt::Debug for ActorId {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_tuple("ActorId").field(&self.0).finish()
78    }
79}
80
81impl std::fmt::Display for ActorId {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        // Preserve the compact, deterministic formatting of TaskId while keeping
84        // a distinct type at the API level.
85        write!(f, "{}", self.0)
86    }
87}
88
/// Lifecycle state for an actor.
///
/// The state is shared between the actor task and its handles through an
/// atomic cell, so readers may observe it changing at any time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActorState {
    /// Actor constructed but not yet started.
    Created,
    /// Actor is running and processing messages.
    Running,
    /// Actor is stopping (cancellation requested / mailbox closed).
    Stopping,
    /// Actor has stopped and will not process further messages.
    Stopped,
}
101
/// Atomic holder for an [`ActorState`], stored as its `u8` encoding.
#[derive(Debug)]
struct ActorStateCell {
    // Encoded `ActorState`; see `ActorStateCell::encode` / `decode`.
    state: AtomicU8,
}
106
107impl ActorStateCell {
108    fn new(state: ActorState) -> Self {
109        Self {
110            state: AtomicU8::new(Self::encode(state)),
111        }
112    }
113
114    fn load(&self) -> ActorState {
115        Self::decode(self.state.load(Ordering::Acquire))
116    }
117
118    fn store(&self, state: ActorState) {
119        self.state.store(Self::encode(state), Ordering::Release);
120    }
121
122    const fn encode(state: ActorState) -> u8 {
123        match state {
124            ActorState::Created => 0,
125            ActorState::Running => 1,
126            ActorState::Stopping => 2,
127            ActorState::Stopped => 3,
128        }
129    }
130
131    const fn decode(value: u8) -> ActorState {
132        match value {
133            0 => ActorState::Created,
134            1 => ActorState::Running,
135            2 => ActorState::Stopping,
136            _ => ActorState::Stopped,
137        }
138    }
139}
140
/// Internal runtime state for an actor.
///
/// This is intentionally lightweight and non-opinionated; higher-level actor
/// features (mailbox policies, supervision trees, etc.) can extend this.
struct ActorCell<M> {
    // Receiving half of the actor's mailbox; the actor loop polls it for messages.
    mailbox: mpsc::Receiver<M>,
    // Lifecycle state shared with the actor's handle and refs.
    state: Arc<ActorStateCell>,
}
149
150/// A message-driven actor that processes messages from a bounded mailbox.
151///
152/// Actors are the unit of stateful, message-driven concurrency. Each actor:
153/// - Owns mutable state (`self`)
154/// - Receives messages sequentially (no data races)
155/// - Runs inside a region (structured lifetime)
156///
157/// # Cancel Safety
158///
159/// When an actor is cancelled (region close, explicit abort), the runtime:
160/// 1. Closes the mailbox (no new messages accepted)
161/// 2. Calls `on_stop` for cleanup
162/// 3. Returns the actor state to the caller via `ActorHandle::join`
163pub trait Actor: Send + 'static {
164    /// The type of messages this actor can receive.
165    type Message: Send + 'static;
166
167    /// Called once when the actor starts, before processing any messages.
168    ///
169    /// Use this for initialization that requires the capability context.
170    /// The default implementation does nothing.
171    fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
172        Box::pin(async {})
173    }
174
175    /// Handle a single message.
176    ///
177    /// This is called sequentially for each message in the mailbox.
178    /// The actor has exclusive access to its state during handling.
179    fn handle(
180        &mut self,
181        cx: &Cx,
182        msg: Self::Message,
183    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
184
185    /// Called once when the actor is stopping, after the mailbox is drained.
186    ///
187    /// Use this for cleanup. The default implementation does nothing.
188    fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
189        Box::pin(async {})
190    }
191}
192
/// Handle to a running actor, used to send messages and manage its lifecycle.
///
/// The handle owns:
/// - A sender for the actor's mailbox
/// - A task handle for join/abort operations
///
/// When the handle is dropped, the mailbox sender is dropped, which causes
/// the actor loop to exit after processing remaining messages.
/// NOTE(review): senders cloned out via `sender()` presumably keep the mailbox
/// connected after the handle is gone — confirm against the mpsc channel.
#[derive(Debug)]
pub struct ActorHandle<A: Actor> {
    // Identifier derived from the actor's task ID.
    actor_id: ActorId,
    // Sending half of the actor's mailbox.
    sender: mpsc::Sender<A::Message>,
    // Lifecycle state shared with the actor task and every `ActorRef`.
    state: Arc<ActorStateCell>,
    // ID of the task that runs the actor loop.
    task_id: TaskId,
    // Receives the final actor value (or a join error) from the actor task.
    receiver: crate::channel::oneshot::Receiver<Result<A, JoinError>>,
    // Weak link to the actor's context internals; used by `abort` to request
    // cancellation and by `join` to look up the cancel reason.
    inner: std::sync::Weak<parking_lot::RwLock<CxInner>>,
    // Set once `join` has produced a terminal result.
    completed: bool,
}
211
212impl<A: Actor> ActorHandle<A> {
213    /// Send a message to the actor using two-phase reserve/send.
214    ///
215    /// Returns an error if the actor has stopped or the mailbox is full.
216    pub async fn send(&self, cx: &Cx, msg: A::Message) -> Result<(), SendError<A::Message>> {
217        self.sender.send(cx, msg).await
218    }
219
220    /// Try to send a message without blocking.
221    ///
222    /// Returns `Err(SendError::Full(msg))` if the mailbox is full, or
223    /// `Err(SendError::Disconnected(msg))` if the actor has stopped.
224    pub fn try_send(&self, msg: A::Message) -> Result<(), SendError<A::Message>> {
225        self.sender.try_send(msg)
226    }
227
228    /// Returns a lightweight, clonable reference for sending messages.
229    #[must_use]
230    pub fn sender(&self) -> ActorRef<A::Message> {
231        ActorRef {
232            actor_id: self.actor_id,
233            sender: self.sender.clone(),
234            state: Arc::clone(&self.state),
235        }
236    }
237
238    /// Returns the actor's unique identifier.
239    #[must_use]
240    pub const fn actor_id(&self) -> ActorId {
241        self.actor_id
242    }
243
244    /// Returns the task ID of the actor's underlying task.
245    #[must_use]
246    pub fn task_id(&self) -> crate::types::TaskId {
247        self.task_id
248    }
249
250    /// Signals the actor to stop gracefully.
251    ///
252    /// Sets the actor state to `Stopping` and requests cancellation so the
253    /// actor loop will exit after the current message finishes processing.
254    /// The actor will call `on_stop` before returning.
255    ///
256    /// This is identical to [`abort`](Self::abort) — both request cancellation
257    /// and set the Stopping state. A future improvement could differentiate
258    /// them by having `stop()` drain buffered messages first.
259    pub fn stop(&self) {
260        self.state.store(ActorState::Stopping);
261        self.sender.wake_receiver();
262    }
263
264    /// Returns true if the actor has finished.
265    #[must_use]
266    pub fn is_finished(&self) -> bool {
267        self.completed || self.receiver.is_ready() || self.receiver.is_closed()
268    }
269
270    fn closed_reason(&self) -> crate::types::CancelReason {
271        self.inner
272            .upgrade()
273            .and_then(|inner| inner.read().cancel_reason.clone())
274            .unwrap_or_else(|| crate::types::CancelReason::user("join channel closed"))
275    }
276
277    /// Wait for the actor to finish and return its final state.
278    ///
279    /// Blocks until the actor loop completes (mailbox closed or cancelled),
280    /// then returns the actor's final state or a join error.
281    pub async fn join(&mut self, cx: &Cx) -> Result<A, JoinError> {
282        if self.completed {
283            return Err(JoinError::PolledAfterCompletion);
284        }
285
286        match self.receiver.recv(cx).await {
287            Ok(result) => {
288                self.completed = true;
289                result
290            }
291            Err(crate::channel::oneshot::RecvError::Closed) => {
292                self.completed = true;
293                Err(JoinError::Cancelled(self.closed_reason()))
294            }
295            Err(crate::channel::oneshot::RecvError::Cancelled) => {
296                let reason = cx
297                    .cancel_reason()
298                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
299                Err(JoinError::Cancelled(reason))
300            }
301            Err(crate::channel::oneshot::RecvError::PolledAfterCompletion) => {
302                unreachable!("ActorHandle::join awaits a fresh oneshot recv future")
303            }
304        }
305    }
306
307    /// Request the actor to stop immediately by aborting its task.
308    ///
309    /// Sets `cancel_requested` on the actor's context, causing the actor loop
310    /// to exit at the next cancellation check point. The actor will call
311    /// `on_stop` before returning.
312    pub fn abort(&self) {
313        self.state.store(ActorState::Stopping);
314        if let Some(inner) = self.inner.upgrade() {
315            let mut guard = inner.write();
316            guard.cancel_requested = true;
317            guard
318                .fast_cancel
319                .store(true, std::sync::atomic::Ordering::Release);
320        }
321        self.sender.wake_receiver();
322    }
323}
324
/// A lightweight, clonable reference to an actor's mailbox.
///
/// Use this to send messages to an actor from multiple locations without
/// needing to share the `ActorHandle`.
#[derive(Debug)]
pub struct ActorRef<M> {
    // Identifier of the actor this reference points at.
    actor_id: ActorId,
    // Sending half of the actor's mailbox.
    sender: mpsc::Sender<M>,
    // Lifecycle state shared with the actor task and its handle.
    state: Arc<ActorStateCell>,
}
335
336// Manual Clone impl without requiring M: Clone, since all fields are
337// independently clonable (ActorId is Copy, Sender<M> clones without M: Clone,
338// and Arc is always Clone).
339impl<M> Clone for ActorRef<M> {
340    fn clone(&self) -> Self {
341        Self {
342            actor_id: self.actor_id,
343            sender: self.sender.clone(),
344            state: Arc::clone(&self.state),
345        }
346    }
347}
348
349impl<M: Send + 'static> ActorRef<M> {
350    /// Send a message to the actor.
351    pub async fn send(&self, cx: &Cx, msg: M) -> Result<(), SendError<M>> {
352        self.sender.send(cx, msg).await
353    }
354
355    /// Reserve a slot in the mailbox (two-phase send: reserve -> commit).
356    #[must_use]
357    pub fn reserve<'a>(&'a self, cx: &'a Cx) -> mpsc::Reserve<'a, M> {
358        self.sender.reserve(cx)
359    }
360
361    /// Try to send a message without blocking.
362    pub fn try_send(&self, msg: M) -> Result<(), SendError<M>> {
363        self.sender.try_send(msg)
364    }
365
366    /// Returns true if the actor has stopped (mailbox closed).
367    #[must_use]
368    pub fn is_closed(&self) -> bool {
369        self.sender.is_closed()
370    }
371
372    /// Returns true if the actor is still alive (not fully stopped).
373    ///
374    /// Note: This is best-effort. The definitive shutdown signal is `ActorHandle::join()`.
375    #[must_use]
376    pub fn is_alive(&self) -> bool {
377        self.state.load() != ActorState::Stopped
378    }
379
380    /// Returns the actor's unique identifier.
381    #[must_use]
382    pub const fn actor_id(&self) -> ActorId {
383        self.actor_id
384    }
385}
386
387// ============================================================================
388// ActorContext: Actor-Specific Capability Extension
389// ============================================================================
390
/// Configuration for actor mailbox.
///
/// NOTE(review): nothing in the visible part of this file reads `backpressure`;
/// the spawn functions shown take a bare capacity. Confirm where this config
/// is consumed.
#[derive(Debug, Clone, Copy)]
pub struct MailboxConfig {
    /// Maximum number of messages the mailbox can hold.
    pub capacity: usize,
    /// Whether to use backpressure (block senders) or drop oldest messages.
    pub backpressure: bool,
}
399
400impl Default for MailboxConfig {
401    fn default() -> Self {
402        Self {
403            capacity: DEFAULT_MAILBOX_CAPACITY,
404            backpressure: true,
405        }
406    }
407}
408
409impl MailboxConfig {
410    /// Create a mailbox config with the specified capacity.
411    #[must_use]
412    pub const fn with_capacity(capacity: usize) -> Self {
413        Self {
414            capacity,
415            backpressure: true,
416        }
417    }
418}
419
/// Messages that can be sent to a supervisor about child lifecycle events.
///
/// `ActorContext::escalate` sends the `ChildFailed` variant to the parent.
#[derive(Debug, Clone)]
pub enum SupervisorMessage {
    /// A supervised child actor has failed.
    ChildFailed {
        /// The ID of the failed child.
        child_id: ActorId,
        /// Description of the failure.
        reason: String,
    },
    /// A supervised child actor has stopped normally.
    ChildStopped {
        /// The ID of the stopped child.
        child_id: ActorId,
    },
}
436
/// Actor-specific capability context extending [`Cx`].
///
/// Provides actors with access to:
/// - Self-reference for tell() patterns
/// - Child management for supervision
/// - Self-termination controls
/// - Parent reference for escalation
///
/// All [`Cx`] methods are available through [`Deref`].
///
/// # Example
///
/// NOTE(review): the example below passes an `ActorContext` to `handle`, but
/// `Actor::handle` as declared in this file receives a plain `&Cx` — confirm
/// how an `ActorContext` is meant to reach the handler.
///
/// ```ignore
/// async fn handle(&mut self, ctx: &ActorContext<'_, MyMessage>, msg: MyMessage) {
///     // Access Cx methods directly
///     if ctx.is_cancel_requested() {
///         return;
///     }
///
///     // Use actor-specific capabilities
///     let my_id = ctx.self_actor_id();
///     ctx.trace("handling message");
/// }
/// ```
pub struct ActorContext<'a, M: Send + 'static> {
    /// Underlying capability context.
    cx: &'a Cx,
    /// Reference to this actor's mailbox sender.
    self_ref: ActorRef<M>,
    /// This actor's unique identifier.
    actor_id: ActorId,
    /// Parent supervisor reference (None for root actors).
    parent: Option<ActorRef<SupervisorMessage>>,
    /// IDs of children currently supervised by this actor.
    children: Vec<ActorId>,
    /// Whether this actor has been requested to stop.
    ///
    /// This is a flag local to the context value; it is set by `stop_self`.
    stopping: bool,
}
475
476#[allow(clippy::elidable_lifetime_names)]
477impl<'a, M: Send + 'static> ActorContext<'a, M> {
478    /// Create a new actor context.
479    ///
480    /// This is typically called internally by the actor runtime.
481    #[must_use]
482    pub fn new(
483        cx: &'a Cx,
484        self_ref: ActorRef<M>,
485        actor_id: ActorId,
486        parent: Option<ActorRef<SupervisorMessage>>,
487    ) -> Self {
488        Self {
489            cx,
490            self_ref,
491            actor_id,
492            parent,
493            children: Vec::new(),
494            stopping: false,
495        }
496    }
497
498    /// Returns this actor's unique identifier.
499    ///
500    /// Unlike `self_ref()`, this avoids cloning the actor reference and is
501    /// useful for logging, debugging, or identity comparisons.
502    #[must_use]
503    pub const fn self_actor_id(&self) -> ActorId {
504        self.actor_id
505    }
506
507    /// Returns the underlying actor ID (alias for `self_actor_id`).
508    #[must_use]
509    pub const fn actor_id(&self) -> ActorId {
510        self.actor_id
511    }
512
513    // ========================================================================
514    // Child Management Methods
515    // ========================================================================
516
517    /// Register a child actor as supervised by this actor.
518    ///
519    /// Called internally when spawning supervised children.
520    pub fn register_child(&mut self, child_id: ActorId) {
521        self.children.push(child_id);
522    }
523
524    /// Unregister a child actor (after it has stopped).
525    ///
526    /// Returns true if the child was found and removed.
527    pub fn unregister_child(&mut self, child_id: ActorId) -> bool {
528        if let Some(pos) = self.children.iter().position(|&id| id == child_id) {
529            self.children.swap_remove(pos);
530            true
531        } else {
532            false
533        }
534    }
535
536    /// Returns the list of currently supervised child actor IDs.
537    #[must_use]
538    pub fn children(&self) -> &[ActorId] {
539        &self.children
540    }
541
542    /// Returns true if this actor has any supervised children.
543    #[must_use]
544    pub fn has_children(&self) -> bool {
545        !self.children.is_empty()
546    }
547
548    /// Returns the number of supervised children.
549    #[must_use]
550    pub fn child_count(&self) -> usize {
551        self.children.len()
552    }
553
554    // ========================================================================
555    // Self-Termination Methods
556    // ========================================================================
557
558    /// Request this actor to stop gracefully.
559    ///
560    /// Sets the stopping flag. The actor loop will exit after the current
561    /// message is processed and the mailbox is drained.
562    pub fn stop_self(&mut self) {
563        self.stopping = true;
564    }
565
566    /// Returns true if this actor has been requested to stop.
567    #[must_use]
568    pub fn is_stopping(&self) -> bool {
569        self.stopping
570    }
571
572    // ========================================================================
573    // Parent Interaction Methods
574    // ========================================================================
575
576    /// Returns a reference to the parent supervisor, if any.
577    ///
578    /// Root actors spawned without supervision return `None`.
579    #[must_use]
580    pub fn parent(&self) -> Option<&ActorRef<SupervisorMessage>> {
581        self.parent.as_ref()
582    }
583
584    /// Returns true if this actor has a parent supervisor.
585    #[must_use]
586    pub fn has_parent(&self) -> bool {
587        self.parent.is_some()
588    }
589
590    /// Escalate an error to the parent supervisor.
591    ///
592    /// Sends a `SupervisorMessage::ChildFailed` to the parent if one exists.
593    /// Does nothing if this is a root actor.
594    pub async fn escalate(&self, reason: String) {
595        if let Some(parent) = &self.parent {
596            let msg = SupervisorMessage::ChildFailed {
597                child_id: self.actor_id,
598                reason,
599            };
600            // Best-effort: ignore send failures (parent may have stopped)
601            let _ = parent.send(self.cx, msg).await;
602        }
603    }
604
605    // ========================================================================
606    // Cx Delegation Methods
607    // ========================================================================
608
609    /// Check for cancellation and return early if requested.
610    ///
611    /// This is a convenience method that checks both actor stopping
612    /// and Cx cancellation.
613    #[allow(clippy::result_large_err)]
614    pub fn checkpoint(&self) -> Result<(), crate::error::Error> {
615        if self.stopping {
616            let reason = crate::types::CancelReason::user("actor stopping")
617                .with_region(self.cx.region_id())
618                .with_task(self.cx.task_id());
619            return Err(crate::error::Error::cancelled(&reason));
620        }
621        self.cx.checkpoint()
622    }
623
624    /// Returns true if cancellation has been requested.
625    ///
626    /// Checks both actor stopping flag and Cx cancellation.
627    #[must_use]
628    pub fn is_cancel_requested(&self) -> bool {
629        self.stopping || self.cx.is_cancel_requested()
630    }
631
632    /// Returns the current budget.
633    #[must_use]
634    pub fn budget(&self) -> crate::types::Budget {
635        self.cx.budget()
636    }
637
638    /// Returns the deadline from the budget, if set.
639    #[must_use]
640    pub fn deadline(&self) -> Option<Time> {
641        self.cx.budget().deadline
642    }
643
644    /// Emit a trace event.
645    pub fn trace(&self, event: &str) {
646        self.cx.trace(event);
647    }
648
649    /// Returns a clonable reference to this actor's mailbox.
650    ///
651    /// Use this to give other actors a way to send messages to this actor.
652    /// The `ActorRef<M>` type is always Clone regardless of whether M is Clone.
653    #[must_use]
654    pub fn self_ref(&self) -> ActorRef<M> {
655        self.self_ref.clone()
656    }
657
658    /// Returns a reference to the underlying Cx.
659    #[must_use]
660    pub const fn cx(&self) -> &Cx {
661        self.cx
662    }
663}
664
665impl<M: Send + 'static> std::ops::Deref for ActorContext<'_, M> {
666    type Target = Cx;
667
668    fn deref(&self) -> &Self::Target {
669        self.cx
670    }
671}
672
673impl<M: Send + 'static> std::fmt::Debug for ActorContext<'_, M> {
674    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675        f.debug_struct("ActorContext")
676            .field("actor_id", &self.actor_id)
677            .field("children", &self.children.len())
678            .field("stopping", &self.stopping)
679            .field("has_parent", &self.parent.is_some())
680            .finish()
681    }
682}
683
/// The default mailbox capacity for actors.
///
/// Used as the capacity of `MailboxConfig::default()`.
pub const DEFAULT_MAILBOX_CAPACITY: usize = 64;
686
687struct OnStopMaskGuard(Arc<parking_lot::RwLock<CxInner>>);
688
689impl Drop for OnStopMaskGuard {
690    fn drop(&mut self) {
691        let mut g = self.0.write();
692        g.mask_depth = g.mask_depth.saturating_sub(1);
693    }
694}
695
696/// Internal: runs the actor message loop.
697///
698/// This function is the core of the actor runtime. It:
699/// 1. Calls `on_start`
700/// 2. Receives and handles messages until the mailbox is closed or cancelled
701/// 3. Drains remaining buffered messages (no silent drops)
702/// 4. Calls `on_stop`
703/// 5. Returns the actor state
704async fn run_actor_loop<A: Actor>(mut actor: A, cx: Cx, cell: &mut ActorCell<A::Message>) -> A {
705    use crate::tracing_compat::debug;
706
707    // Only transition to Running if stop() wasn't called before the actor started.
708    // stop() sets Stopping before scheduling; we must honour that signal so the
709    // poll_fn guard in the message loop can detect the pre-stop and break.
710    if cell.state.load() != ActorState::Stopping {
711        cell.state.store(ActorState::Running);
712    }
713
714    // Phase 1: Initialization
715    // We always run on_start, even if cancelled or pre-stopped, because
716    // it serves as the actor's initial setup and matches the expectation
717    // that lifecycle hooks are symmetrically executed.
718    cx.trace("actor::on_start");
719    actor.on_start(&cx).await;
720
721    // Phase 2: Message loop
722    loop {
723        // Check for cancellation
724        if cx.is_cancel_requested() {
725            cx.trace("actor::cancel_requested");
726            break;
727        }
728
729        let recv_result = std::future::poll_fn(|task_cx| {
730            match cell.mailbox.poll_recv(&cx, task_cx) {
731                std::task::Poll::Pending if cell.state.load() == ActorState::Stopping => {
732                    // Graceful stop requested and mailbox is empty. Break the loop.
733                    std::task::Poll::Ready(Err(crate::channel::mpsc::RecvError::Disconnected))
734                }
735                other => other,
736            }
737        })
738        .await;
739
740        match recv_result {
741            Ok(msg) => {
742                actor.handle(&cx, msg).await;
743            }
744            Err(crate::channel::mpsc::RecvError::Disconnected) => {
745                // All senders dropped - graceful shutdown
746                cx.trace("actor::mailbox_disconnected");
747                break;
748            }
749            Err(crate::channel::mpsc::RecvError::Cancelled) => {
750                // Cancellation requested
751                cx.trace("actor::recv_cancelled");
752                break;
753            }
754            Err(crate::channel::mpsc::RecvError::Empty) => {
755                // Shouldn't happen with recv() (only try_recv), but handle gracefully
756                break;
757            }
758        }
759    }
760
761    cell.state.store(ActorState::Stopping);
762
763    let is_aborted = cx.is_cancel_requested();
764
765    // Phase 3: Drain remaining buffered messages.
766    // Two-phase mailbox guarantee: no message silently dropped (unless aborted).
767    // We seal the mailbox to prevent any new reservations or commits, then
768    // process remaining messages if gracefully stopped. If aborted, we just
769    // empty the mailbox to drop the messages.
770    cell.mailbox.close();
771
772    if is_aborted {
773        while let Ok(_msg) = cell.mailbox.try_recv() {}
774    } else {
775        let mut drained: u64 = 0;
776        while let Ok(msg) = cell.mailbox.try_recv() {
777            actor.handle(&cx, msg).await;
778            drained += 1;
779        }
780        if drained > 0 {
781            debug!(drained = drained, "actor::mailbox_drained");
782            cx.trace("actor::mailbox_drained");
783        }
784    }
785
786    // Phase 4: Cleanup — mask cancellation so on_stop runs to completion.
787    // Without masking, an aborted actor's on_stop could observe a stale
788    // cancel_requested=true and bail early via cx.checkpoint().
789
790    cx.trace("actor::on_stop");
791    let inner = cx.inner.clone();
792    inner.write().mask_depth += 1;
793    let mask_guard = OnStopMaskGuard(inner);
794    actor.on_stop(&cx).await;
795    drop(mask_guard);
796
797    actor
798}
799
800// Extension for Scope to spawn actors
801impl<P: crate::types::Policy> crate::cx::Scope<'_, P> {
    /// Spawns a new actor in this scope with the given mailbox capacity.
    ///
    /// The actor runs as a region-owned task. Messages are delivered through
    /// a bounded MPSC channel with two-phase send semantics.
    ///
    /// # Arguments
    ///
    /// * `state` - Runtime state for task creation
    /// * `cx` - Capability context
    /// * `actor` - The actor instance
    /// * `mailbox_capacity` - Bounded mailbox size
    ///
    /// # Returns
    ///
    /// A tuple of `(ActorHandle, StoredTask)`. The `StoredTask` must be
    /// registered with the runtime via `state.store_spawned_task()`.
    ///
    /// # Errors
    ///
    /// Returns the `SpawnError` produced by `create_task_record` when the task
    /// record cannot be created.
    pub fn spawn_actor<A: Actor>(
        &self,
        state: &mut crate::runtime::state::RuntimeState,
        cx: &Cx,
        actor: A,
        mailbox_capacity: usize,
    ) -> Result<(ActorHandle<A>, crate::runtime::stored_task::StoredTask), SpawnError> {
        use crate::channel::oneshot;
        use crate::cx::scope::CatchUnwind;
        use crate::runtime::stored_task::StoredTask;
        use crate::tracing_compat::{debug, debug_span};

        // Create the actor's mailbox
        let (msg_tx, msg_rx) = mpsc::channel::<A::Message>(mailbox_capacity);

        // Create oneshot for returning the actor state
        let (result_tx, result_rx) = oneshot::channel::<Result<A, JoinError>>();

        // Create task record
        let task_id = self.create_task_record(state)?;
        // The actor ID is derived from the task ID, so the two always match.
        let actor_id = ActorId::from_task(task_id);
        let actor_state = Arc::new(ActorStateCell::new(ActorState::Created));

        let _span = debug_span!(
            "actor_spawn",
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
        )
        .entered();
        debug!(
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
            "actor spawned"
        );

        // Create child context
        let child_observability = cx.child_observability(self.region_id(), task_id);
        let child_entropy = cx.child_entropy(task_id);
        let io_driver = state.io_driver_handle();
        let child_cx = Cx::new_with_observability(
            self.region_id(),
            task_id,
            self.budget(),
            Some(child_observability),
            io_driver,
            Some(child_entropy),
        )
        .with_blocking_pool_handle(cx.blocking_pool_handle());

        // Link Cx to TaskRecord
        // NOTE(review): a missing record is silently skipped here — confirm
        // that cannot happen right after `create_task_record` succeeded.
        if let Some(record) = state.task_mut(task_id) {
            record.set_cx_inner(child_cx.inner.clone());
            record.set_cx(child_cx.clone());
        }

        // Extra handles taken before `child_cx` moves into the actor future:
        // one context clone for sending the result, one weak link for the
        // handle (abort / cancel-reason lookup), one state handle for marking
        // the actor Stopped.
        let cx_for_send = child_cx.clone();
        let inner_weak = Arc::downgrade(&child_cx.inner);
        let state_for_task = Arc::clone(&actor_state);

        let mut cell = ActorCell {
            mailbox: msg_rx,
            state: Arc::clone(&actor_state),
        };

        // Create the actor loop future
        let wrapped = async move {
            // CatchUnwind turns a panic inside the actor loop into an `Err`
            // payload instead of unwinding through the task.
            let result = CatchUnwind {
                inner: Box::pin(run_actor_loop(actor, child_cx, &mut cell)),
            }
            .await;
            match result {
                Ok(actor_final) => {
                    // Best-effort: the handle may already have been dropped.
                    let _ = result_tx.send(&cx_for_send, Ok(actor_final));
                }
                Err(payload) => {
                    let msg = crate::cx::scope::payload_to_string(&payload);
                    let _ = result_tx.send(
                        &cx_for_send,
                        Err(JoinError::Panicked(crate::types::PanicPayload::new(msg))),
                    );
                }
            }
            // Stopped is published only after the result has been sent.
            state_for_task.store(ActorState::Stopped);
            Outcome::Ok(())
        };

        let stored = StoredTask::new_with_id(wrapped, task_id);

        let handle = ActorHandle {
            actor_id,
            sender: msg_tx,
            state: actor_state,
            task_id,
            receiver: result_rx,
            inner: inner_weak,
            completed: false,
        };

        Ok((handle, stored))
    }
920
921    /// Spawns a supervised actor with automatic restart on failure.
922    ///
923    /// Unlike `spawn_actor`, this method takes a factory closure that can
924    /// produce new actor instances for restarts. The mailbox persists across
925    /// restarts, so messages sent during restart are buffered and processed
926    /// by the new instance.
927    ///
928    /// # Arguments
929    ///
930    /// * `state` - Runtime state for task creation
931    /// * `cx` - Capability context
932    /// * `factory` - Closure that creates actor instances (called on each restart)
933    /// * `strategy` - Supervision strategy (Stop, Restart, Escalate)
934    /// * `mailbox_capacity` - Bounded mailbox size
935    pub fn spawn_supervised_actor<A, F>(
936        &self,
937        state: &mut crate::runtime::state::RuntimeState,
938        cx: &Cx,
939        mut factory: F,
940        strategy: crate::supervision::SupervisionStrategy,
941        mailbox_capacity: usize,
942    ) -> Result<(ActorHandle<A>, crate::runtime::stored_task::StoredTask), SpawnError>
943    where
944        A: Actor,
945        F: FnMut() -> A + Send + 'static,
946    {
947        use crate::channel::oneshot;
948        use crate::runtime::stored_task::StoredTask;
949        use crate::supervision::Supervisor;
950        use crate::tracing_compat::{debug, debug_span};
951
952        let actor = factory();
953        let (msg_tx, msg_rx) = mpsc::channel::<A::Message>(mailbox_capacity);
954        let (result_tx, result_rx) = oneshot::channel::<Result<A, JoinError>>();
955        let task_id = self.create_task_record(state)?;
956        let actor_id = ActorId::from_task(task_id);
957        let actor_state = Arc::new(ActorStateCell::new(ActorState::Created));
958
959        let _span = debug_span!(
960            "supervised_actor_spawn",
961            task_id = ?task_id,
962            region_id = ?self.region_id(),
963            mailbox_capacity = mailbox_capacity,
964        )
965        .entered();
966        debug!(
967            task_id = ?task_id,
968            region_id = ?self.region_id(),
969            "supervised actor spawned"
970        );
971
972        let child_observability = cx.child_observability(self.region_id(), task_id);
973        let child_entropy = cx.child_entropy(task_id);
974        let io_driver = state.io_driver_handle();
975        let child_cx = Cx::new_with_observability(
976            self.region_id(),
977            task_id,
978            self.budget(),
979            Some(child_observability),
980            io_driver,
981            Some(child_entropy),
982        )
983        .with_blocking_pool_handle(cx.blocking_pool_handle());
984
985        if let Some(record) = state.task_mut(task_id) {
986            record.set_cx_inner(child_cx.inner.clone());
987            record.set_cx(child_cx.clone());
988        }
989
990        let cx_for_send = child_cx.clone();
991        let inner_weak = Arc::downgrade(&child_cx.inner);
992        let region_id = self.region_id();
993        let state_for_task = Arc::clone(&actor_state);
994
995        let mut cell = ActorCell {
996            mailbox: msg_rx,
997            state: Arc::clone(&actor_state),
998        };
999
1000        let wrapped = async move {
1001            let result = run_supervised_loop(
1002                actor,
1003                &mut factory,
1004                child_cx,
1005                &mut cell,
1006                Supervisor::new(strategy),
1007                task_id,
1008                region_id,
1009            )
1010            .await;
1011            let _ = result_tx.send(&cx_for_send, result);
1012            state_for_task.store(ActorState::Stopped);
1013            Outcome::Ok(())
1014        };
1015
1016        let stored = StoredTask::new_with_id(wrapped, task_id);
1017
1018        let handle = ActorHandle {
1019            actor_id,
1020            sender: msg_tx,
1021            state: actor_state,
1022            task_id,
1023            receiver: result_rx,
1024            inner: inner_weak,
1025            completed: false,
1026        };
1027
1028        Ok((handle, stored))
1029    }
1030}
1031
/// Outcome of a supervised actor run.
///
/// Besides `Debug`, the type is `Clone`, `PartialEq` and `Eq` so that outcomes
/// can be duplicated and compared directly (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SupervisedOutcome {
    /// Actor stopped normally (no failure).
    Stopped,
    /// Actor stopped after restart budget exhaustion.
    RestartBudgetExhausted {
        /// Total restarts before budget was exhausted.
        total_restarts: u32,
    },
    /// Failure was escalated to parent region.
    Escalated,
}
1045
1046/// Internal: runs a supervised actor loop with restart support.
1047///
1048/// The mailbox receiver is shared across restarts — messages sent while the
1049/// actor is restarting are buffered and processed by the new instance.
1050async fn run_supervised_loop<A, F>(
1051    initial_actor: A,
1052    factory: &mut F,
1053    cx: Cx,
1054    cell: &mut ActorCell<A::Message>,
1055    mut supervisor: crate::supervision::Supervisor,
1056    task_id: TaskId,
1057    region_id: RegionId,
1058) -> Result<A, JoinError>
1059where
1060    A: Actor,
1061    F: FnMut() -> A,
1062{
1063    use crate::cx::scope::CatchUnwind;
1064    use crate::supervision::SupervisionDecision;
1065    use crate::types::Outcome;
1066
1067    let mut current_actor = initial_actor;
1068
1069    loop {
1070        // Run the actor until it finishes (normally or via panic)
1071        let result = CatchUnwind {
1072            inner: Box::pin(run_actor_loop(current_actor, cx.clone(), cell)),
1073        }
1074        .await;
1075
1076        match result {
1077            Ok(actor_final) => {
1078                // Actor completed normally — no supervision needed
1079                return Ok(actor_final);
1080            }
1081            Err(payload) => {
1082                // Actor panicked — consult supervisor.
1083                // We report this as Failed (not Panicked) because actor crashes
1084                // are the expected failure mode for supervision. The Erlang/OTP
1085                // model restarts on crashes; Outcome::Panicked would always Stop.
1086                let msg = crate::cx::scope::payload_to_string(&payload);
1087                cx.trace("supervised_actor::failure");
1088
1089                let outcome = Outcome::err(());
1090                let now = cx.timer_driver().map_or(0, |td| td.now().as_nanos());
1091                let decision = supervisor.on_failure(task_id, region_id, None, &outcome, now);
1092
1093                match decision {
1094                    SupervisionDecision::Restart { delay, .. } => {
1095                        cx.trace("supervised_actor::restart");
1096
1097                        // Reset actor state so the restarted actor enters
1098                        // Running instead of staying in Stopping (which
1099                        // would cause it to exit immediately on empty
1100                        // mailbox).
1101                        cell.state.store(ActorState::Created);
1102
1103                        // Apply backoff delay if the supervisor computed one.
1104                        if let Some(backoff) = delay {
1105                            if !backoff.is_zero() {
1106                                let now = cx
1107                                    .timer_driver()
1108                                    .map_or_else(crate::time::wall_now, |td| td.now());
1109                                crate::time::sleep(now, backoff).await;
1110                            }
1111                        }
1112
1113                        current_actor = factory();
1114                    }
1115                    SupervisionDecision::Stop { .. } => {
1116                        cx.trace("supervised_actor::stopped");
1117                        return Err(JoinError::Panicked(crate::types::PanicPayload::new(msg)));
1118                    }
1119                    SupervisionDecision::Escalate { .. } => {
1120                        cx.trace("supervised_actor::escalated");
1121                        return Err(JoinError::Panicked(crate::types::PanicPayload::new(msg)));
1122                    }
1123                }
1124            }
1125        }
1126    }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131    use super::*;
1132    use crate::runtime::state::RuntimeState;
1133    use crate::types::Budget;
1134    use crate::types::policy::FailFast;
1135
    /// Shared setup for the tests in this module: initializes test logging and
    /// then invokes `test_phase!` with the test's name.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }
1140
    /// Simple counter actor for testing.
    ///
    /// Sums every `u64` message it handles and records, in plain fields,
    /// whether its lifecycle hooks have run.
    #[derive(Debug)]
    struct Counter {
        /// Running sum of all handled messages.
        count: u64,
        /// Set to `true` by `on_start`.
        started: bool,
        /// Set to `true` by `on_stop`.
        stopped: bool,
    }
1148
    impl Counter {
        /// Creates a counter with a zero sum and both lifecycle flags cleared.
        fn new() -> Self {
            Self {
                count: 0,
                started: false,
                stopped: false,
            }
        }
    }
1158
1159    impl Actor for Counter {
1160        type Message = u64;
1161
1162        fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1163            self.started = true;
1164            Box::pin(async {})
1165        }
1166
1167        fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1168            self.count += msg;
1169            Box::pin(async {})
1170        }
1171
1172        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1173            self.stopped = true;
1174            Box::pin(async {})
1175        }
1176    }
1177
    /// Compile-time helper: instantiating it with a type only type-checks if
    /// that type implements `Actor`. The body is empty, so calling it does
    /// nothing at runtime.
    fn assert_actor<A: Actor>() {}
1179
    /// Checks that `Counter` satisfies the `Actor` bound required by
    /// `assert_actor`. The check happens at compile time; at runtime the test
    /// only emits its start/complete markers.
    ///
    /// NOTE(review): despite the name, nothing here builds a `dyn Actor`, so
    /// object safety itself is not exercised — confirm whether that is intended.
    #[test]
    fn actor_trait_object_safety() {
        init_test("actor_trait_object_safety");

        // Verify Counter implements Actor with the right bounds
        assert_actor::<Counter>();

        crate::test_complete!("actor_trait_object_safety");
    }
1189
1190    #[test]
1191    fn actor_handle_creation() {
1192        init_test("actor_handle_creation");
1193
1194        let mut state = RuntimeState::new();
1195        let root = state.create_root_region(Budget::INFINITE);
1196        let cx: Cx = Cx::for_testing();
1197        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
1198
1199        let result = scope.spawn_actor(&mut state, &cx, Counter::new(), 32);
1200        assert!(result.is_ok(), "spawn_actor should succeed");
1201
1202        let (handle, stored) = result.unwrap();
1203        state.store_spawned_task(handle.task_id(), stored);
1204
1205        // Handle should have valid task ID
1206        let _tid = handle.task_id();
1207
1208        // Actor should not be finished yet (not polled)
1209        assert!(!handle.is_finished());
1210
1211        crate::test_complete!("actor_handle_creation");
1212    }
1213
1214    #[test]
1215    fn actor_id_generation_distinct() {
1216        init_test("actor_id_generation_distinct");
1217
1218        let id1 = ActorId::from_task(TaskId::new_for_test(1, 1));
1219        let id2 = ActorId::from_task(TaskId::new_for_test(1, 2));
1220        assert!(id1 != id2, "generation must distinguish actor reuse");
1221
1222        crate::test_complete!("actor_id_generation_distinct");
1223    }
1224
1225    #[test]
1226    fn actor_ref_is_cloneable() {
1227        init_test("actor_ref_is_cloneable");
1228
1229        let mut state = RuntimeState::new();
1230        let root = state.create_root_region(Budget::INFINITE);
1231        let cx: Cx = Cx::for_testing();
1232        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
1233
1234        let (handle, stored) = scope
1235            .spawn_actor(&mut state, &cx, Counter::new(), 32)
1236            .unwrap();
1237        state.store_spawned_task(handle.task_id(), stored);
1238
1239        // Get multiple refs
1240        let ref1 = handle.sender();
1241        let ref2 = ref1.clone();
1242
1243        // Actor identity is preserved across clones
1244        assert_eq!(ref1.actor_id(), handle.actor_id());
1245        assert_eq!(ref2.actor_id(), handle.actor_id());
1246
1247        // Actor is alive at creation time (even before first poll)
1248        assert!(ref1.is_alive());
1249        assert!(ref2.is_alive());
1250
1251        // Both should be open
1252        assert!(!ref1.is_closed());
1253        assert!(!ref2.is_closed());
1254
1255        crate::test_complete!("actor_ref_is_cloneable");
1256    }
1257
1258    // ---- E2E Actor Scenarios ----
1259
1260    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1261
    /// Observable counter actor: writes final count to shared state during on_stop.
    /// Used by E2E tests to verify actor behavior without needing join().
    struct ObservableCounter {
        /// Running sum of all handled messages (private to the actor).
        count: u64,
        /// Shared cell that receives `count` when `on_stop` runs.
        on_stop_count: Arc<AtomicU64>,
        /// Shared flag set to `true` by `on_start`.
        started: Arc<AtomicBool>,
        /// Shared flag set to `true` by `on_stop`.
        stopped: Arc<AtomicBool>,
    }
1270
    impl ObservableCounter {
        /// Creates a counter with a zero sum that reports through the given
        /// shared cells: `on_stop_count` receives the final sum, `started` and
        /// `stopped` are the lifecycle flags.
        fn new(
            on_stop_count: Arc<AtomicU64>,
            started: Arc<AtomicBool>,
            stopped: Arc<AtomicBool>,
        ) -> Self {
            Self {
                count: 0,
                on_stop_count,
                started,
                stopped,
            }
        }
    }
1285
1286    impl Actor for ObservableCounter {
1287        type Message = u64;
1288
1289        fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1290            self.started.store(true, Ordering::SeqCst);
1291            Box::pin(async {})
1292        }
1293
1294        fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1295            self.count += msg;
1296            Box::pin(async {})
1297        }
1298
1299        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1300            self.on_stop_count.store(self.count, Ordering::SeqCst);
1301            self.stopped.store(true, Ordering::SeqCst);
1302            Box::pin(async {})
1303        }
1304    }
1305
1306    fn observable_state() -> (Arc<AtomicU64>, Arc<AtomicBool>, Arc<AtomicBool>) {
1307        (
1308            Arc::new(AtomicU64::new(u64::MAX)),
1309            Arc::new(AtomicBool::new(false)),
1310            Arc::new(AtomicBool::new(false)),
1311        )
1312    }
1313
1314    /// E2E: Actor processes all messages sent before channel disconnect.
1315    /// Verifies: messages delivered, on_start called, on_stop called.
1316    #[test]
1317    fn actor_processes_all_messages() {
1318        init_test("actor_processes_all_messages");
1319
1320        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1321        let region = runtime.state.create_root_region(Budget::INFINITE);
1322        let cx: Cx = Cx::for_testing();
1323        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1324
1325        let (on_stop_count, started, stopped) = observable_state();
1326        let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
1327
1328        let (handle, stored) = scope
1329            .spawn_actor(&mut runtime.state, &cx, actor, 32)
1330            .unwrap();
1331        let task_id = handle.task_id();
1332        runtime.state.store_spawned_task(task_id, stored);
1333
1334        // Pre-fill mailbox with 5 messages (each adding 1)
1335        for _ in 0..5 {
1336            handle.try_send(1).unwrap();
1337        }
1338
1339        // Drop handle to disconnect channel — actor will process buffered
1340        // messages via recv, then see Disconnected and stop gracefully.
1341        drop(handle);
1342
1343        runtime.scheduler.lock().schedule(task_id, 0);
1344        runtime.run_until_quiescent();
1345
1346        assert_eq!(
1347            on_stop_count.load(Ordering::SeqCst),
1348            5,
1349            "all messages processed"
1350        );
1351        assert!(started.load(Ordering::SeqCst), "on_start was called");
1352        assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
1353
1354        crate::test_complete!("actor_processes_all_messages");
1355    }
1356
1357    /// E2E: Mailbox drain on cancellation.
1358    /// Pre-fills mailbox, cancels actor before it runs, verifies all messages
1359    /// are still processed during the drain phase (no silent drops).
1360    #[test]
1361    fn actor_drains_mailbox_on_cancel() {
1362        init_test("actor_drains_mailbox_on_cancel");
1363
1364        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1365        let region = runtime.state.create_root_region(Budget::INFINITE);
1366        let cx: Cx = Cx::for_testing();
1367        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1368
1369        let (on_stop_count, started, stopped) = observable_state();
1370        let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
1371
1372        let (handle, stored) = scope
1373            .spawn_actor(&mut runtime.state, &cx, actor, 32)
1374            .unwrap();
1375        let task_id = handle.task_id();
1376        runtime.state.store_spawned_task(task_id, stored);
1377
1378        // Pre-fill mailbox with 5 messages
1379        for _ in 0..5 {
1380            handle.try_send(1).unwrap();
1381        }
1382
1383        // Cancel the actor BEFORE running.
1384        // The actor loop will: on_start → check cancel → break → drain → on_stop
1385        handle.stop();
1386
1387        runtime.scheduler.lock().schedule(task_id, 0);
1388        runtime.run_until_quiescent();
1389
1390        // All 5 messages processed during drain phase
1391        assert_eq!(
1392            on_stop_count.load(Ordering::SeqCst),
1393            5,
1394            "drain processed all messages"
1395        );
1396        assert!(started.load(Ordering::SeqCst), "on_start was called");
1397        assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
1398
1399        crate::test_complete!("actor_drains_mailbox_on_cancel");
1400    }
1401
1402    /// E2E: ActorRef liveness tracks actor lifecycle (Created -> Stopping -> Stopped).
1403    #[test]
1404    fn actor_ref_is_alive_transitions() {
1405        init_test("actor_ref_is_alive_transitions");
1406
1407        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1408        let region = runtime.state.create_root_region(Budget::INFINITE);
1409        let cx: Cx = Cx::for_testing();
1410        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1411
1412        let (on_stop_count, started, stopped) = observable_state();
1413        let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
1414
1415        let (handle, stored) = scope
1416            .spawn_actor(&mut runtime.state, &cx, actor, 32)
1417            .unwrap();
1418        let task_id = handle.task_id();
1419        runtime.state.store_spawned_task(task_id, stored);
1420
1421        let actor_ref = handle.sender();
1422        assert!(actor_ref.is_alive(), "created actor should be alive");
1423        assert_eq!(actor_ref.actor_id(), handle.actor_id());
1424
1425        handle.stop();
1426        assert!(actor_ref.is_alive(), "stopping actor is still alive");
1427
1428        runtime.scheduler.lock().schedule(task_id, 0);
1429        runtime.run_until_quiescent();
1430
1431        assert!(
1432            handle.is_finished(),
1433            "actor should be finished after stop + run"
1434        );
1435        assert!(!actor_ref.is_alive(), "finished actor is not alive");
1436
1437        // Sanity: the actor ran its hooks.
1438        assert!(started.load(Ordering::SeqCst), "on_start was called");
1439        assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
1440        assert_ne!(
1441            on_stop_count.load(Ordering::SeqCst),
1442            u64::MAX,
1443            "on_stop_count updated"
1444        );
1445
1446        crate::test_complete!("actor_ref_is_alive_transitions");
1447    }
1448
1449    /// E2E: Supervised actor restarts on panic within budget.
1450    /// Actor panics on messages >= threshold, supervisor restarts it.
1451    /// After restart, actor processes subsequent normal messages.
1452    #[test]
1453    fn supervised_actor_restarts_on_panic() {
1454        use std::sync::atomic::AtomicU32;
1455
1456        struct PanickingCounter {
1457            count: u64,
1458            panic_on: u64,
1459            final_count: Arc<AtomicU64>,
1460            restart_count: Arc<AtomicU32>,
1461        }
1462
1463        impl Actor for PanickingCounter {
1464            type Message = u64;
1465
1466            fn handle(
1467                &mut self,
1468                _cx: &Cx,
1469                msg: u64,
1470            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1471                assert!(msg != self.panic_on, "threshold exceeded: {msg}");
1472                self.count += msg;
1473                Box::pin(async {})
1474            }
1475
1476            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1477                self.final_count.store(self.count, Ordering::SeqCst);
1478                Box::pin(async {})
1479            }
1480        }
1481
1482        init_test("supervised_actor_restarts_on_panic");
1483
1484        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1485        let region = runtime.state.create_root_region(Budget::INFINITE);
1486        let cx: Cx = Cx::for_testing();
1487        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1488
1489        let final_count = Arc::new(AtomicU64::new(u64::MAX));
1490        let restart_count = Arc::new(AtomicU32::new(0));
1491        let fc = final_count.clone();
1492        let rc = restart_count.clone();
1493
1494        let strategy = crate::supervision::SupervisionStrategy::Restart(
1495            crate::supervision::RestartConfig::new(3, std::time::Duration::from_mins(1)),
1496        );
1497
1498        let (handle, stored) = scope
1499            .spawn_supervised_actor(
1500                &mut runtime.state,
1501                &cx,
1502                move || {
1503                    rc.fetch_add(1, Ordering::SeqCst);
1504                    PanickingCounter {
1505                        count: 0,
1506                        panic_on: 999,
1507                        final_count: fc.clone(),
1508                        restart_count: rc.clone(),
1509                    }
1510                },
1511                strategy,
1512                32,
1513            )
1514            .unwrap();
1515        let task_id = handle.task_id();
1516        runtime.state.store_spawned_task(task_id, stored);
1517
1518        // Message sequence:
1519        // 1. Normal message (count += 1)
1520        // 2. Panic trigger (actor panics, supervisor restarts)
1521        // 3. Normal message after restart (count += 1 on new instance)
1522        handle.try_send(1).unwrap();
1523        handle.try_send(999).unwrap(); // triggers panic
1524        handle.try_send(1).unwrap(); // processed by restarted actor
1525
1526        // Drop handle to disconnect channel after the restarted actor processes messages
1527        drop(handle);
1528
1529        runtime.scheduler.lock().schedule(task_id, 0);
1530        runtime.run_until_quiescent();
1531
1532        // Factory was called: once for initial + once for restart = fetch_add called twice
1533        // (first call was during spawn_supervised_actor, so count starts at 1;
1534        //  restart increments to 2)
1535        assert!(
1536            restart_count.load(Ordering::SeqCst) >= 2,
1537            "factory should have been called at least twice (initial + restart), got {}",
1538            restart_count.load(Ordering::SeqCst)
1539        );
1540
1541        // After restart, actor processes msg=1, then stops => final_count=1
1542        assert_eq!(
1543            final_count.load(Ordering::SeqCst),
1544            1,
1545            "restarted actor should have processed the post-panic message"
1546        );
1547
1548        crate::test_complete!("supervised_actor_restarts_on_panic");
1549    }
1550
1551    /// E2E: Deterministic replay — same seed produces same actor execution.
1552    #[test]
1553    fn actor_deterministic_replay() {
1554        fn run_scenario(seed: u64) -> u64 {
1555            let config = crate::lab::LabConfig::new(seed);
1556            let mut runtime = crate::lab::LabRuntime::new(config);
1557            let region = runtime.state.create_root_region(Budget::INFINITE);
1558            let cx: Cx = Cx::for_testing();
1559            let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1560
1561            let (on_stop_count, started, stopped) = observable_state();
1562            let actor = ObservableCounter::new(on_stop_count.clone(), started, stopped);
1563
1564            let (handle, stored) = scope
1565                .spawn_actor(&mut runtime.state, &cx, actor, 32)
1566                .unwrap();
1567            let task_id = handle.task_id();
1568            runtime.state.store_spawned_task(task_id, stored);
1569
1570            for i in 1..=10 {
1571                handle.try_send(i).unwrap();
1572            }
1573            drop(handle);
1574
1575            runtime.scheduler.lock().schedule(task_id, 0);
1576            runtime.run_until_quiescent();
1577
1578            on_stop_count.load(Ordering::SeqCst)
1579        }
1580
1581        init_test("actor_deterministic_replay");
1582
1583        // Run the same scenario twice with the same seed
1584        let result1 = run_scenario(0xDEAD_BEEF);
1585        let result2 = run_scenario(0xDEAD_BEEF);
1586
1587        assert_eq!(
1588            result1, result2,
1589            "deterministic replay: same seed → same result"
1590        );
1591        assert_eq!(result1, 55, "sum of 1..=10");
1592
1593        crate::test_complete!("actor_deterministic_replay");
1594    }
1595
1596    // ---- ActorContext Tests ----
1597
1598    #[test]
1599    fn actor_context_self_reference() {
1600        init_test("actor_context_self_reference");
1601
1602        let mut state = RuntimeState::new();
1603        let root = state.create_root_region(Budget::INFINITE);
1604        let cx: Cx = Cx::for_testing();
1605        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
1606
1607        let (handle, stored) = scope
1608            .spawn_actor(&mut state, &cx, Counter::new(), 32)
1609            .unwrap();
1610        state.store_spawned_task(handle.task_id(), stored);
1611
1612        // Create an ActorContext using the handle's sender
1613        let actor_ref = handle.sender();
1614        let actor_id = handle.actor_id();
1615        let ctx: ActorContext<'_, u64> = ActorContext::new(&cx, actor_ref, actor_id, None);
1616
1617        // Test self_actor_id() - doesn't require Clone
1618        assert_eq!(ctx.self_actor_id(), actor_id);
1619        assert_eq!(ctx.actor_id(), actor_id);
1620
1621        crate::test_complete!("actor_context_self_reference");
1622    }
1623
1624    #[test]
1625    fn actor_context_child_management() {
1626        init_test("actor_context_child_management");
1627
1628        let cx: Cx = Cx::for_testing();
1629        let (sender, _receiver) = mpsc::channel::<u64>(32);
1630        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1631        let actor_ref = ActorRef {
1632            actor_id,
1633            sender,
1634            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1635        };
1636
1637        let mut ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1638
1639        // Initially no children
1640        assert!(!ctx.has_children());
1641        assert_eq!(ctx.child_count(), 0);
1642        assert!(ctx.children().is_empty());
1643
1644        // Register children
1645        let child1 = ActorId::from_task(TaskId::new_for_test(2, 1));
1646        let child2 = ActorId::from_task(TaskId::new_for_test(3, 1));
1647
1648        ctx.register_child(child1);
1649        assert!(ctx.has_children());
1650        assert_eq!(ctx.child_count(), 1);
1651
1652        ctx.register_child(child2);
1653        assert_eq!(ctx.child_count(), 2);
1654
1655        // Unregister child
1656        assert!(ctx.unregister_child(child1));
1657        assert_eq!(ctx.child_count(), 1);
1658
1659        // Unregistering non-existent child returns false
1660        assert!(!ctx.unregister_child(child1));
1661
1662        crate::test_complete!("actor_context_child_management");
1663    }
1664
1665    #[test]
1666    fn actor_context_stopping() {
1667        init_test("actor_context_stopping");
1668
1669        let cx: Cx = Cx::for_testing();
1670        let (sender, _receiver) = mpsc::channel::<u64>(32);
1671        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1672        let actor_ref = ActorRef {
1673            actor_id,
1674            sender,
1675            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1676        };
1677
1678        let mut ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1679
1680        // Initially not stopping
1681        assert!(!ctx.is_stopping());
1682        assert!(ctx.checkpoint().is_ok());
1683
1684        // Request stop
1685        ctx.stop_self();
1686        assert!(ctx.is_stopping());
1687        assert!(ctx.checkpoint().is_err());
1688        assert!(ctx.is_cancel_requested());
1689
1690        crate::test_complete!("actor_context_stopping");
1691    }
1692
1693    #[test]
1694    fn actor_context_parent_none() {
1695        init_test("actor_context_parent_none");
1696
1697        let cx: Cx = Cx::for_testing();
1698        let (sender, _receiver) = mpsc::channel::<u64>(32);
1699        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1700        let actor_ref = ActorRef {
1701            actor_id,
1702            sender,
1703            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1704        };
1705
1706        let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1707
1708        // Root actor has no parent
1709        assert!(!ctx.has_parent());
1710        assert!(ctx.parent().is_none());
1711
1712        crate::test_complete!("actor_context_parent_none");
1713    }
1714
1715    #[test]
1716    fn actor_context_cx_delegation() {
1717        init_test("actor_context_cx_delegation");
1718
1719        let cx: Cx = Cx::for_testing();
1720        let (sender, _receiver) = mpsc::channel::<u64>(32);
1721        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1722        let actor_ref = ActorRef {
1723            actor_id,
1724            sender,
1725            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1726        };
1727
1728        let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1729
1730        // Test Cx delegation via Deref
1731        let _budget = ctx.budget();
1732        ctx.trace("test_event");
1733
1734        // Test cx() accessor
1735        let _cx_ref = ctx.cx();
1736
1737        crate::test_complete!("actor_context_cx_delegation");
1738    }
1739
1740    #[test]
1741    fn actor_context_debug() {
1742        init_test("actor_context_debug");
1743
1744        let cx: Cx = Cx::for_testing();
1745        let (sender, _receiver) = mpsc::channel::<u64>(32);
1746        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1747        let actor_ref = ActorRef {
1748            actor_id,
1749            sender,
1750            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1751        };
1752
1753        let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1754
1755        // Debug formatting should work
1756        let debug_str = format!("{ctx:?}");
1757        assert!(debug_str.contains("ActorContext"));
1758        assert!(debug_str.contains("actor_id"));
1759
1760        crate::test_complete!("actor_context_debug");
1761    }
1762
1763    // ---- Invariant Tests ----
1764
1765    /// Invariant: `ActorStateCell` encode/decode roundtrips correctly for all
1766    /// valid states, and unknown u8 values map to `Stopped` (fail-safe).
1767    #[test]
1768    fn actor_state_cell_encode_decode_roundtrip() {
1769        init_test("actor_state_cell_encode_decode_roundtrip");
1770
1771        let states = [
1772            ActorState::Created,
1773            ActorState::Running,
1774            ActorState::Stopping,
1775            ActorState::Stopped,
1776        ];
1777
1778        for &state in &states {
1779            let cell = ActorStateCell::new(state);
1780            let loaded = cell.load();
1781            crate::assert_with_log!(loaded == state, "roundtrip", state, loaded);
1782        }
1783
1784        // Unknown values (4+) should map to Stopped (fail-safe).
1785        for raw in 4_u8..=10 {
1786            let decoded = ActorStateCell::decode(raw);
1787            let is_stopped = decoded == ActorState::Stopped;
1788            crate::assert_with_log!(is_stopped, "unknown u8 -> Stopped", true, is_stopped);
1789        }
1790
1791        crate::test_complete!("actor_state_cell_encode_decode_roundtrip");
1792    }
1793
1794    /// Invariant: `MailboxConfig::default()` has documented capacity and
1795    /// backpressure enabled.
1796    #[test]
1797    fn mailbox_config_defaults() {
1798        init_test("mailbox_config_defaults");
1799
1800        let config = MailboxConfig::default();
1801        crate::assert_with_log!(
1802            config.capacity == DEFAULT_MAILBOX_CAPACITY,
1803            "default capacity",
1804            DEFAULT_MAILBOX_CAPACITY,
1805            config.capacity
1806        );
1807        crate::assert_with_log!(
1808            config.backpressure,
1809            "backpressure enabled by default",
1810            true,
1811            config.backpressure
1812        );
1813
1814        let custom = MailboxConfig::with_capacity(8);
1815        crate::assert_with_log!(
1816            custom.capacity == 8,
1817            "custom capacity",
1818            8usize,
1819            custom.capacity
1820        );
1821        crate::assert_with_log!(
1822            custom.backpressure,
1823            "with_capacity enables backpressure",
1824            true,
1825            custom.backpressure
1826        );
1827
1828        crate::test_complete!("mailbox_config_defaults");
1829    }
1830
1831    /// Invariant: `try_send` on a full mailbox returns an error without
1832    /// blocking, and the message is recoverable from the error.
1833    #[test]
1834    fn actor_try_send_full_mailbox_returns_error() {
1835        init_test("actor_try_send_full_mailbox_returns_error");
1836
1837        let mut state = RuntimeState::new();
1838        let root = state.create_root_region(Budget::INFINITE);
1839        let cx: Cx = Cx::for_testing();
1840        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
1841
1842        // Create actor with capacity=2 mailbox.
1843        let (handle, stored) = scope
1844            .spawn_actor(&mut state, &cx, Counter::new(), 2)
1845            .unwrap();
1846        state.store_spawned_task(handle.task_id(), stored);
1847
1848        // Fill the mailbox.
1849        let ok1 = handle.try_send(1).is_ok();
1850        crate::assert_with_log!(ok1, "first send ok", true, ok1);
1851        let ok2 = handle.try_send(2).is_ok();
1852        crate::assert_with_log!(ok2, "second send ok", true, ok2);
1853
1854        // Third send should fail — mailbox full.
1855        let result = handle.try_send(3);
1856        let is_full = result.is_err();
1857        crate::assert_with_log!(is_full, "third send fails (full)", true, is_full);
1858
1859        crate::test_complete!("actor_try_send_full_mailbox_returns_error");
1860    }
1861
1862    /// Invariant: `ActorContext` with a parent supervisor set exposes it
1863    /// and reports `has_parent() == true`.
1864    #[test]
1865    fn actor_context_with_parent_supervisor() {
1866        init_test("actor_context_with_parent_supervisor");
1867
1868        let cx: Cx = Cx::for_testing();
1869
1870        // Create parent supervisor channel.
1871        let (parent_sender, _parent_receiver) = mpsc::channel::<SupervisorMessage>(8);
1872        let parent_id = ActorId::from_task(TaskId::new_for_test(10, 1));
1873        let parent_ref = ActorRef {
1874            actor_id: parent_id,
1875            sender: parent_sender,
1876            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1877        };
1878
1879        // Create child actor context with parent.
1880        let (child_sender, _child_receiver) = mpsc::channel::<u64>(32);
1881        let child_id = ActorId::from_task(TaskId::new_for_test(20, 1));
1882        let child_ref = ActorRef {
1883            actor_id: child_id,
1884            sender: child_sender,
1885            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1886        };
1887
1888        let ctx = ActorContext::new(&cx, child_ref, child_id, Some(parent_ref));
1889
1890        let has_parent = ctx.has_parent();
1891        crate::assert_with_log!(has_parent, "has parent", true, has_parent);
1892
1893        let parent = ctx.parent().expect("parent should be Some");
1894        let parent_id_matches = parent.actor_id() == parent_id;
1895        crate::assert_with_log!(
1896            parent_id_matches,
1897            "parent id matches",
1898            true,
1899            parent_id_matches
1900        );
1901
1902        crate::test_complete!("actor_context_with_parent_supervisor");
1903    }
1904
1905    // ---- Pure Data Type Tests (no runtime needed) ----
1906
1907    #[test]
1908    fn actor_id_debug_format() {
1909        let id = ActorId::from_task(TaskId::new_for_test(5, 3));
1910        let dbg = format!("{id:?}");
1911        assert!(dbg.contains("ActorId"), "{dbg}");
1912    }
1913
1914    #[test]
1915    fn actor_id_display_delegates_to_task_id() {
1916        let tid = TaskId::new_for_test(7, 2);
1917        let aid = ActorId::from_task(tid);
1918        assert_eq!(format!("{aid}"), format!("{tid}"));
1919    }
1920
1921    #[test]
1922    fn actor_id_from_task_roundtrip() {
1923        let tid = TaskId::new_for_test(3, 1);
1924        let aid = ActorId::from_task(tid);
1925        assert_eq!(aid.task_id(), tid);
1926    }
1927
1928    #[test]
1929    fn actor_id_copy_clone() {
1930        let id = ActorId::from_task(TaskId::new_for_test(1, 1));
1931        let copied = id; // Copy
1932        let cloned = id;
1933        assert_eq!(id, copied);
1934        assert_eq!(id, cloned);
1935    }
1936
1937    #[test]
1938    fn actor_id_hash_consistency() {
1939        use crate::util::DetHasher;
1940        use std::hash::{Hash, Hasher};
1941
1942        let id1 = ActorId::from_task(TaskId::new_for_test(4, 2));
1943        let id2 = ActorId::from_task(TaskId::new_for_test(4, 2));
1944        assert_eq!(id1, id2);
1945
1946        let mut h1 = DetHasher::default();
1947        let mut h2 = DetHasher::default();
1948        id1.hash(&mut h1);
1949        id2.hash(&mut h2);
1950        assert_eq!(h1.finish(), h2.finish(), "equal IDs must hash equal");
1951    }
1952
1953    #[test]
1954    fn actor_state_debug_all_variants() {
1955        for (state, expected) in [
1956            (ActorState::Created, "Created"),
1957            (ActorState::Running, "Running"),
1958            (ActorState::Stopping, "Stopping"),
1959            (ActorState::Stopped, "Stopped"),
1960        ] {
1961            let dbg = format!("{state:?}");
1962            assert_eq!(dbg, expected, "ActorState::{expected}");
1963        }
1964    }
1965
1966    #[test]
1967    fn actor_state_clone_copy_eq() {
1968        let s = ActorState::Running;
1969        let copied = s;
1970        let cloned = s;
1971        assert_eq!(s, copied);
1972        assert_eq!(s, cloned);
1973    }
1974
1975    #[test]
1976    fn actor_state_exhaustive_inequality() {
1977        let all = [
1978            ActorState::Created,
1979            ActorState::Running,
1980            ActorState::Stopping,
1981            ActorState::Stopped,
1982        ];
1983        for (i, a) in all.iter().enumerate() {
1984            for (j, b) in all.iter().enumerate() {
1985                if i == j {
1986                    assert_eq!(a, b);
1987                } else {
1988                    assert_ne!(a, b);
1989                }
1990            }
1991        }
1992    }
1993
1994    #[test]
1995    fn actor_state_cell_sequential_transitions() {
1996        let cell = ActorStateCell::new(ActorState::Created);
1997        assert_eq!(cell.load(), ActorState::Created);
1998
1999        cell.store(ActorState::Running);
2000        assert_eq!(cell.load(), ActorState::Running);
2001
2002        cell.store(ActorState::Stopping);
2003        assert_eq!(cell.load(), ActorState::Stopping);
2004
2005        cell.store(ActorState::Stopped);
2006        assert_eq!(cell.load(), ActorState::Stopped);
2007    }
2008
2009    #[test]
2010    fn supervisor_message_debug_child_failed() {
2011        let msg = SupervisorMessage::ChildFailed {
2012            child_id: ActorId::from_task(TaskId::new_for_test(1, 1)),
2013            reason: "panicked".to_string(),
2014        };
2015        let dbg = format!("{msg:?}");
2016        assert!(dbg.contains("ChildFailed"), "{dbg}");
2017        assert!(dbg.contains("panicked"), "{dbg}");
2018    }
2019
2020    #[test]
2021    fn supervisor_message_debug_child_stopped() {
2022        let msg = SupervisorMessage::ChildStopped {
2023            child_id: ActorId::from_task(TaskId::new_for_test(2, 1)),
2024        };
2025        let dbg = format!("{msg:?}");
2026        assert!(dbg.contains("ChildStopped"), "{dbg}");
2027    }
2028
2029    #[test]
2030    fn supervisor_message_clone() {
2031        let msg = SupervisorMessage::ChildFailed {
2032            child_id: ActorId::from_task(TaskId::new_for_test(1, 1)),
2033            reason: "boom".to_string(),
2034        };
2035        let cloned = msg.clone();
2036        let (a, b) = (format!("{msg:?}"), format!("{cloned:?}"));
2037        assert_eq!(a, b);
2038    }
2039
2040    #[test]
2041    fn supervised_outcome_debug_all_variants() {
2042        let variants: Vec<SupervisedOutcome> = vec![
2043            SupervisedOutcome::Stopped,
2044            SupervisedOutcome::RestartBudgetExhausted { total_restarts: 5 },
2045            SupervisedOutcome::Escalated,
2046        ];
2047        for v in &variants {
2048            let dbg = format!("{v:?}");
2049            assert!(!dbg.is_empty());
2050        }
2051        assert!(format!("{:?}", variants[0]).contains("Stopped"));
2052        assert!(format!("{:?}", variants[1]).contains('5'));
2053        assert!(format!("{:?}", variants[2]).contains("Escalated"));
2054    }
2055
2056    #[test]
2057    fn mailbox_config_debug_clone_copy() {
2058        let cfg = MailboxConfig::default();
2059        let dbg = format!("{cfg:?}");
2060        assert!(dbg.contains("MailboxConfig"), "{dbg}");
2061        assert!(dbg.contains("64"), "{dbg}");
2062
2063        let copied = cfg;
2064        let cloned = cfg;
2065        assert_eq!(copied.capacity, cfg.capacity);
2066        assert_eq!(cloned.backpressure, cfg.backpressure);
2067    }
2068
2069    #[test]
2070    fn mailbox_config_zero_capacity() {
2071        let cfg = MailboxConfig::with_capacity(0);
2072        assert_eq!(cfg.capacity, 0);
2073        assert!(cfg.backpressure);
2074    }
2075
2076    #[test]
2077    fn mailbox_config_max_capacity() {
2078        let cfg = MailboxConfig::with_capacity(usize::MAX);
2079        assert_eq!(cfg.capacity, usize::MAX);
2080    }
2081
2082    #[test]
2083    fn default_mailbox_capacity_is_64() {
2084        assert_eq!(DEFAULT_MAILBOX_CAPACITY, 64);
2085    }
2086
2087    #[test]
2088    fn actor_context_duplicate_child_registration() {
2089        let cx: Cx = Cx::for_testing();
2090        let (sender, _receiver) = mpsc::channel::<u64>(32);
2091        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
2092        let actor_ref = ActorRef {
2093            actor_id,
2094            sender,
2095            state: Arc::new(ActorStateCell::new(ActorState::Running)),
2096        };
2097
2098        let mut ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
2099        let child = ActorId::from_task(TaskId::new_for_test(2, 1));
2100
2101        ctx.register_child(child);
2102        ctx.register_child(child); // duplicate
2103        assert_eq!(ctx.child_count(), 2, "register_child does not dedup");
2104
2105        // Unregister removes first occurrence
2106        assert!(ctx.unregister_child(child));
2107        assert_eq!(ctx.child_count(), 1, "one copy remains");
2108        assert!(ctx.unregister_child(child));
2109        assert_eq!(ctx.child_count(), 0);
2110        assert!(!ctx.unregister_child(child), "nothing left to remove");
2111    }
2112
2113    #[test]
2114    fn actor_context_stop_self_is_idempotent() {
2115        let cx: Cx = Cx::for_testing();
2116        let (sender, _receiver) = mpsc::channel::<u64>(32);
2117        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
2118        let actor_ref = ActorRef {
2119            actor_id,
2120            sender,
2121            state: Arc::new(ActorStateCell::new(ActorState::Running)),
2122        };
2123
2124        let mut ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
2125        ctx.stop_self();
2126        assert!(ctx.is_stopping());
2127        ctx.stop_self(); // idempotent
2128        assert!(ctx.is_stopping());
2129    }
2130
2131    #[test]
2132    fn actor_context_self_ref_returns_working_ref() {
2133        let cx: Cx = Cx::for_testing();
2134        let (sender, _receiver) = mpsc::channel::<u64>(32);
2135        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
2136        let actor_ref = ActorRef {
2137            actor_id,
2138            sender,
2139            state: Arc::new(ActorStateCell::new(ActorState::Running)),
2140        };
2141
2142        let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
2143        let self_ref = ctx.self_ref();
2144        assert_eq!(self_ref.actor_id(), actor_id);
2145        assert!(self_ref.is_alive());
2146    }
2147
2148    #[test]
2149    fn actor_context_deadline_reflects_budget() {
2150        let cx: Cx = Cx::for_testing();
2151        let (sender, _receiver) = mpsc::channel::<u64>(32);
2152        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
2153        let actor_ref = ActorRef {
2154            actor_id,
2155            sender,
2156            state: Arc::new(ActorStateCell::new(ActorState::Running)),
2157        };
2158
2159        let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
2160        // for_testing() Cx has INFINITE budget, which has no deadline
2161        assert!(ctx.deadline().is_none());
2162    }
2163
2164    #[test]
2165    fn actor_handle_debug() {
2166        let mut state = RuntimeState::new();
2167        let root = state.create_root_region(Budget::INFINITE);
2168        let cx: Cx = Cx::for_testing();
2169        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
2170
2171        let (handle, stored) = scope
2172            .spawn_actor(&mut state, &cx, Counter::new(), 32)
2173            .unwrap();
2174        state.store_spawned_task(handle.task_id(), stored);
2175
2176        let dbg = format!("{handle:?}");
2177        assert!(dbg.contains("ActorHandle"), "{dbg}");
2178    }
2179
2180    #[test]
2181    fn actor_handle_second_join_fails_closed() {
2182        init_test("actor_handle_second_join_fails_closed");
2183
2184        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2185        let region = runtime.state.create_root_region(Budget::INFINITE);
2186        let cx = Cx::for_testing();
2187        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2188
2189        let (mut handle, stored) = scope
2190            .spawn_actor(&mut runtime.state, &cx, Counter::new(), 32)
2191            .unwrap();
2192        let task_id = handle.task_id();
2193        runtime.state.store_spawned_task(task_id, stored);
2194
2195        handle.stop();
2196        runtime.scheduler.lock().schedule(task_id, 0);
2197        runtime.run_until_quiescent();
2198        assert!(handle.is_finished(), "stopped actor should report finished");
2199
2200        let final_state = futures_lite::future::block_on(handle.join(&cx)).expect("first join");
2201        assert_eq!(final_state.count, 0, "join should return final actor state");
2202
2203        let second = futures_lite::future::block_on(handle.join(&cx));
2204        assert!(
2205            matches!(second, Err(JoinError::PolledAfterCompletion)),
2206            "second join must fail closed, got {second:?}"
2207        );
2208
2209        crate::test_complete!("actor_handle_second_join_fails_closed");
2210    }
2211
2212    #[test]
2213    fn actor_ref_debug() {
2214        let mut state = RuntimeState::new();
2215        let root = state.create_root_region(Budget::INFINITE);
2216        let cx: Cx = Cx::for_testing();
2217        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
2218
2219        let (handle, stored) = scope
2220            .spawn_actor(&mut state, &cx, Counter::new(), 32)
2221            .unwrap();
2222        state.store_spawned_task(handle.task_id(), stored);
2223
2224        let actor_ref = handle.sender();
2225        let dbg = format!("{actor_ref:?}");
2226        assert!(dbg.contains("ActorRef"), "{dbg}");
2227    }
2228
2229    #[test]
2230    fn actor_state_cell_debug() {
2231        let cell = ActorStateCell::new(ActorState::Running);
2232        let dbg = format!("{cell:?}");
2233        assert!(dbg.contains("ActorStateCell"), "{dbg}");
2234    }
2235
2236    #[test]
2237    fn actor_id_clone_copy_eq_hash() {
2238        use std::collections::HashSet;
2239
2240        let id = ActorId::from_task(TaskId::new_for_test(1, 0));
2241        let dbg = format!("{id:?}");
2242        assert!(dbg.contains("ActorId"));
2243
2244        let id2 = id;
2245        assert_eq!(id, id2);
2246
2247        // Copy
2248        let id3 = id;
2249        assert_eq!(id, id3);
2250
2251        // Hash
2252        let mut set = HashSet::new();
2253        set.insert(id);
2254        set.insert(ActorId::from_task(TaskId::new_for_test(2, 0)));
2255        assert_eq!(set.len(), 2);
2256    }
2257
2258    #[test]
2259    fn actor_state_debug_clone_copy_eq() {
2260        let s = ActorState::Running;
2261        let dbg = format!("{s:?}");
2262        assert!(dbg.contains("Running"));
2263
2264        let s2 = s;
2265        assert_eq!(s, s2);
2266
2267        let s3 = s;
2268        assert_eq!(s, s3);
2269
2270        assert_ne!(ActorState::Created, ActorState::Stopped);
2271    }
2272
2273    #[test]
2274    fn mailbox_config_debug_clone_copy_default() {
2275        let c = MailboxConfig::default();
2276        let dbg = format!("{c:?}");
2277        assert!(dbg.contains("MailboxConfig"));
2278
2279        let c2 = c;
2280        assert_eq!(c2.capacity, c.capacity);
2281        assert_eq!(c2.backpressure, c.backpressure);
2282
2283        // Copy
2284        let c3 = c;
2285        assert_eq!(c3.capacity, c.capacity);
2286    }
2287}