Skip to main content

asupersync/
actor.rs

1//! Actor abstraction for region-owned, message-driven concurrency.
2//!
3//! Actors in Asupersync are region-owned tasks that process messages from a
4//! bounded mailbox. They integrate with the runtime's structured concurrency
5//! model:
6//!
7//! - **Region-owned**: Actors are spawned within a region and cannot outlive it.
8//! - **Cancel-safe mailbox**: Messages use the two-phase reserve/send pattern.
9//! - **Lifecycle hooks**: `on_start` and `on_stop` for initialization and cleanup.
10//!
11//! # Example
12//!
13//! ```ignore
14//! struct Counter {
15//!     count: u64,
16//! }
17//!
18//! impl Actor for Counter {
19//!     type Message = u64;
20//!
21//!     async fn handle(&mut self, _cx: &Cx, msg: u64) {
22//!         self.count += msg;
23//!     }
24//! }
25//!
26//! // In a scope:
27//! let (handle, stored) = scope.spawn_actor(
28//!     &mut state, &cx, Counter { count: 0 }, 32,
29//! )?;
30//! state.store_spawned_task(handle.task_id(), stored);
31//!
32//! // Send messages:
33//! handle.send(&cx, 5).await?;
34//! handle.send(&cx, 10).await?;
35//!
36//! // Stop the actor:
37//! handle.stop();
38//! let result = handle.join(&cx).await?;
39//! assert_eq!(result.count, 15);
40//! ```
41
42use std::future::Future;
43use std::pin::Pin;
44use std::sync::atomic::{AtomicU8, Ordering};
45use std::sync::Arc;
46
47use crate::channel::mpsc;
48use crate::channel::mpsc::SendError;
49use crate::cx::Cx;
50use crate::runtime::{JoinError, SpawnError};
51use crate::types::{CxInner, Outcome, RegionId, TaskId, Time};
52
53/// Unique identifier for an actor.
54///
55/// For now this is a thin wrapper around the actor task's `TaskId`, which already
56/// provides arena + generation semantics. Keeping a distinct type avoids mixing
57/// actor IDs with generic tasks at call sites.
58#[derive(Clone, Copy, PartialEq, Eq, Hash)]
59pub struct ActorId(TaskId);
60
61impl ActorId {
62    /// Create an actor ID from a task ID.
63    #[must_use]
64    pub const fn from_task(task_id: TaskId) -> Self {
65        Self(task_id)
66    }
67
68    /// Returns the underlying task ID.
69    #[must_use]
70    pub const fn task_id(self) -> TaskId {
71        self.0
72    }
73}
74
75impl std::fmt::Debug for ActorId {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_tuple("ActorId").field(&self.0).finish()
78    }
79}
80
81impl std::fmt::Display for ActorId {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        // Preserve the compact, deterministic formatting of TaskId while keeping
84        // a distinct type at the API level.
85        write!(f, "{}", self.0)
86    }
87}
88
89/// Lifecycle state for an actor.
90#[derive(Debug, Clone, Copy, PartialEq, Eq)]
91pub enum ActorState {
92    /// Actor constructed but not yet started.
93    Created,
94    /// Actor is running and processing messages.
95    Running,
96    /// Actor is stopping (cancellation requested / mailbox closed).
97    Stopping,
98    /// Actor has stopped and will not process further messages.
99    Stopped,
100}
101
102#[derive(Debug)]
103struct ActorStateCell {
104    state: AtomicU8,
105}
106
107impl ActorStateCell {
108    fn new(state: ActorState) -> Self {
109        Self {
110            state: AtomicU8::new(Self::encode(state)),
111        }
112    }
113
114    fn load(&self) -> ActorState {
115        Self::decode(self.state.load(Ordering::Acquire))
116    }
117
118    fn store(&self, state: ActorState) {
119        self.state.store(Self::encode(state), Ordering::Release);
120    }
121
122    const fn encode(state: ActorState) -> u8 {
123        match state {
124            ActorState::Created => 0,
125            ActorState::Running => 1,
126            ActorState::Stopping => 2,
127            ActorState::Stopped => 3,
128        }
129    }
130
131    const fn decode(value: u8) -> ActorState {
132        match value {
133            0 => ActorState::Created,
134            1 => ActorState::Running,
135            2 => ActorState::Stopping,
136            _ => ActorState::Stopped,
137        }
138    }
139}
140
141/// Internal runtime state for an actor.
142///
143/// This is intentionally lightweight and non-opinionated; higher-level actor
144/// features (mailbox policies, supervision trees, etc.) can extend this.
145struct ActorCell<M> {
146    mailbox: mpsc::Receiver<M>,
147    state: Arc<ActorStateCell>,
148}
149
150/// A message-driven actor that processes messages from a bounded mailbox.
151///
152/// Actors are the unit of stateful, message-driven concurrency. Each actor:
153/// - Owns mutable state (`self`)
154/// - Receives messages sequentially (no data races)
155/// - Runs inside a region (structured lifetime)
156///
157/// # Cancel Safety
158///
159/// When an actor is cancelled (region close, explicit abort), the runtime:
160/// 1. Closes the mailbox (no new messages accepted)
161/// 2. Calls `on_stop` for cleanup
162/// 3. Returns the actor state to the caller via `ActorHandle::join`
163pub trait Actor: Send + 'static {
164    /// The type of messages this actor can receive.
165    type Message: Send + 'static;
166
167    /// Called once when the actor starts, before processing any messages.
168    ///
169    /// Use this for initialization that requires the capability context.
170    /// The default implementation does nothing.
171    fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
172        Box::pin(async {})
173    }
174
175    /// Handle a single message.
176    ///
177    /// This is called sequentially for each message in the mailbox.
178    /// The actor has exclusive access to its state during handling.
179    fn handle(
180        &mut self,
181        cx: &Cx,
182        msg: Self::Message,
183    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
184
185    /// Called once when the actor is stopping, after the mailbox is drained.
186    ///
187    /// Use this for cleanup. The default implementation does nothing.
188    fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
189        Box::pin(async {})
190    }
191}
192
193/// Handle to a running actor, used to send messages and manage its lifecycle.
194///
195/// The handle owns:
196/// - A sender for the actor's mailbox
197/// - A task handle for join/abort operations
198///
199/// When the handle is dropped, the mailbox sender is dropped, which causes
200/// the actor loop to exit after processing remaining messages.
201#[derive(Debug)]
202pub struct ActorHandle<A: Actor> {
203    actor_id: ActorId,
204    sender: mpsc::Sender<A::Message>,
205    state: Arc<ActorStateCell>,
206    task_id: TaskId,
207    receiver: crate::channel::oneshot::Receiver<Result<A, JoinError>>,
208    inner: std::sync::Weak<std::sync::RwLock<CxInner>>,
209}
210
211impl<A: Actor> ActorHandle<A> {
212    /// Send a message to the actor using two-phase reserve/send.
213    ///
214    /// Returns an error if the actor has stopped or the mailbox is full.
215    pub async fn send(&self, cx: &Cx, msg: A::Message) -> Result<(), SendError<A::Message>> {
216        self.sender.send(cx, msg).await
217    }
218
219    /// Try to send a message without blocking.
220    ///
221    /// Returns `Err(SendError::Full(msg))` if the mailbox is full, or
222    /// `Err(SendError::Disconnected(msg))` if the actor has stopped.
223    pub fn try_send(&self, msg: A::Message) -> Result<(), SendError<A::Message>> {
224        self.sender.try_send(msg)
225    }
226
227    /// Returns a lightweight, clonable reference for sending messages.
228    #[must_use]
229    pub fn sender(&self) -> ActorRef<A::Message> {
230        ActorRef {
231            actor_id: self.actor_id,
232            sender: self.sender.clone(),
233            state: Arc::clone(&self.state),
234        }
235    }
236
237    /// Returns the actor's unique identifier.
238    #[must_use]
239    pub const fn actor_id(&self) -> ActorId {
240        self.actor_id
241    }
242
243    /// Returns the task ID of the actor's underlying task.
244    #[must_use]
245    pub fn task_id(&self) -> crate::types::TaskId {
246        self.task_id
247    }
248
249    /// Signals the actor to stop gracefully.
250    ///
251    /// Sets the actor state to `Stopping` and requests cancellation so the
252    /// actor loop will exit after the current message finishes processing.
253    /// The actor will call `on_stop` before returning.
254    ///
255    /// This is identical to [`abort`](Self::abort) — both request cancellation
256    /// and set the Stopping state. A future improvement could differentiate
257    /// them by having `stop()` drain buffered messages first.
258    pub fn stop(&self) {
259        self.state.store(ActorState::Stopping);
260        if let Some(inner) = self.inner.upgrade() {
261            if let Ok(mut guard) = inner.write() {
262                guard.cancel_requested = true;
263            }
264        }
265    }
266
267    /// Returns true if the actor has finished.
268    #[must_use]
269    pub fn is_finished(&self) -> bool {
270        self.receiver.is_ready()
271    }
272
273    /// Wait for the actor to finish and return its final state.
274    ///
275    /// Blocks until the actor loop completes (mailbox closed or cancelled),
276    /// then returns the actor's final state or a join error.
277    pub async fn join(&self, cx: &Cx) -> Result<A, JoinError> {
278        self.receiver.recv(cx).await.unwrap_or_else(|_| {
279            // The oneshot was dropped without sending — the actor task was
280            // cancelled or the runtime shut down. Propagate the actual
281            // cancel reason from the Cx if available; fall back to
282            // parent-cancelled since this is typically a scope teardown.
283            let reason = cx
284                .cancel_reason()
285                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
286            Err(JoinError::Cancelled(reason))
287        })
288    }
289
290    /// Request the actor to stop immediately by aborting its task.
291    ///
292    /// Sets `cancel_requested` on the actor's context, causing the actor loop
293    /// to exit at the next cancellation check point. The actor will call
294    /// `on_stop` before returning.
295    ///
296    /// Currently identical to [`stop`](Self::stop). A future improvement
297    /// should differentiate them: `stop()` would drain buffered messages
298    /// while `abort()` exits immediately.
299    pub fn abort(&self) {
300        self.state.store(ActorState::Stopping);
301        if let Some(inner) = self.inner.upgrade() {
302            if let Ok(mut guard) = inner.write() {
303                guard.cancel_requested = true;
304            }
305        }
306    }
307}
308
309/// A lightweight, clonable reference to an actor's mailbox.
310///
311/// Use this to send messages to an actor from multiple locations without
312/// needing to share the `ActorHandle`.
313#[derive(Debug)]
314pub struct ActorRef<M> {
315    actor_id: ActorId,
316    sender: mpsc::Sender<M>,
317    state: Arc<ActorStateCell>,
318}
319
320// Manual Clone impl without requiring M: Clone, since all fields are
321// independently clonable (ActorId is Copy, Sender<M> clones without M: Clone,
322// and Arc is always Clone).
323impl<M> Clone for ActorRef<M> {
324    fn clone(&self) -> Self {
325        Self {
326            actor_id: self.actor_id,
327            sender: self.sender.clone(),
328            state: Arc::clone(&self.state),
329        }
330    }
331}
332
333impl<M: Send + 'static> ActorRef<M> {
334    /// Send a message to the actor.
335    pub async fn send(&self, cx: &Cx, msg: M) -> Result<(), SendError<M>> {
336        self.sender.send(cx, msg).await
337    }
338
339    /// Reserve a slot in the mailbox (two-phase send: reserve -> commit).
340    #[must_use]
341    pub fn reserve<'a>(&'a self, cx: &'a Cx) -> mpsc::Reserve<'a, M> {
342        self.sender.reserve(cx)
343    }
344
345    /// Try to send a message without blocking.
346    pub fn try_send(&self, msg: M) -> Result<(), SendError<M>> {
347        self.sender.try_send(msg)
348    }
349
350    /// Returns true if the actor has stopped (mailbox closed).
351    #[must_use]
352    pub fn is_closed(&self) -> bool {
353        self.sender.is_closed()
354    }
355
356    /// Returns true if the actor is still alive (not fully stopped).
357    ///
358    /// Note: This is best-effort. The definitive shutdown signal is `ActorHandle::join()`.
359    #[must_use]
360    pub fn is_alive(&self) -> bool {
361        self.state.load() != ActorState::Stopped
362    }
363
364    /// Returns the actor's unique identifier.
365    #[must_use]
366    pub const fn actor_id(&self) -> ActorId {
367        self.actor_id
368    }
369}
370
371// ============================================================================
372// ActorContext: Actor-Specific Capability Extension
373// ============================================================================
374
375/// Configuration for actor mailbox.
376#[derive(Debug, Clone, Copy)]
377pub struct MailboxConfig {
378    /// Maximum number of messages the mailbox can hold.
379    pub capacity: usize,
380    /// Whether to use backpressure (block senders) or drop oldest messages.
381    pub backpressure: bool,
382}
383
384impl Default for MailboxConfig {
385    fn default() -> Self {
386        Self {
387            capacity: DEFAULT_MAILBOX_CAPACITY,
388            backpressure: true,
389        }
390    }
391}
392
393impl MailboxConfig {
394    /// Create a mailbox config with the specified capacity.
395    #[must_use]
396    pub const fn with_capacity(capacity: usize) -> Self {
397        Self {
398            capacity,
399            backpressure: true,
400        }
401    }
402}
403
404/// Messages that can be sent to a supervisor about child lifecycle events.
405#[derive(Debug, Clone)]
406pub enum SupervisorMessage {
407    /// A supervised child actor has failed.
408    ChildFailed {
409        /// The ID of the failed child.
410        child_id: ActorId,
411        /// Description of the failure.
412        reason: String,
413    },
414    /// A supervised child actor has stopped normally.
415    ChildStopped {
416        /// The ID of the stopped child.
417        child_id: ActorId,
418    },
419}
420
421/// Actor-specific capability context extending [`Cx`].
422///
423/// Provides actors with access to:
424/// - Self-reference for tell() patterns
425/// - Child management for supervision
426/// - Self-termination controls
427/// - Parent reference for escalation
428///
429/// All [`Cx`] methods are available through [`Deref`].
430///
431/// # Example
432///
433/// ```ignore
434/// async fn handle(&mut self, ctx: &ActorContext<'_, MyMessage>, msg: MyMessage) {
435///     // Access Cx methods directly
436///     if ctx.is_cancel_requested() {
437///         return;
438///     }
439///
440///     // Use actor-specific capabilities
441///     let my_id = ctx.self_actor_id();
442///     ctx.trace("handling message");
443/// }
444/// ```
445pub struct ActorContext<'a, M: Send + 'static> {
446    /// Underlying capability context.
447    cx: &'a Cx,
448    /// Reference to this actor's mailbox sender.
449    self_ref: ActorRef<M>,
450    /// This actor's unique identifier.
451    actor_id: ActorId,
452    /// Parent supervisor reference (None for root actors).
453    parent: Option<ActorRef<SupervisorMessage>>,
454    /// IDs of children currently supervised by this actor.
455    children: Vec<ActorId>,
456    /// Whether this actor has been requested to stop.
457    stopping: bool,
458}
459
460#[allow(clippy::elidable_lifetime_names)]
461impl<'a, M: Send + 'static> ActorContext<'a, M> {
462    /// Create a new actor context.
463    ///
464    /// This is typically called internally by the actor runtime.
465    #[must_use]
466    pub fn new(
467        cx: &'a Cx,
468        self_ref: ActorRef<M>,
469        actor_id: ActorId,
470        parent: Option<ActorRef<SupervisorMessage>>,
471    ) -> Self {
472        Self {
473            cx,
474            self_ref,
475            actor_id,
476            parent,
477            children: Vec::new(),
478            stopping: false,
479        }
480    }
481
482    /// Returns this actor's unique identifier.
483    ///
484    /// Unlike `self_ref()`, this avoids cloning the actor reference and is
485    /// useful for logging, debugging, or identity comparisons.
486    #[must_use]
487    pub const fn self_actor_id(&self) -> ActorId {
488        self.actor_id
489    }
490
491    /// Returns the underlying actor ID (alias for `self_actor_id`).
492    #[must_use]
493    pub const fn actor_id(&self) -> ActorId {
494        self.actor_id
495    }
496
497    // ========================================================================
498    // Child Management Methods
499    // ========================================================================
500
501    /// Register a child actor as supervised by this actor.
502    ///
503    /// Called internally when spawning supervised children.
504    pub fn register_child(&mut self, child_id: ActorId) {
505        self.children.push(child_id);
506    }
507
508    /// Unregister a child actor (after it has stopped).
509    ///
510    /// Returns true if the child was found and removed.
511    pub fn unregister_child(&mut self, child_id: ActorId) -> bool {
512        if let Some(pos) = self.children.iter().position(|&id| id == child_id) {
513            self.children.swap_remove(pos);
514            true
515        } else {
516            false
517        }
518    }
519
520    /// Returns the list of currently supervised child actor IDs.
521    #[must_use]
522    pub fn children(&self) -> &[ActorId] {
523        &self.children
524    }
525
526    /// Returns true if this actor has any supervised children.
527    #[must_use]
528    pub fn has_children(&self) -> bool {
529        !self.children.is_empty()
530    }
531
532    /// Returns the number of supervised children.
533    #[must_use]
534    pub fn child_count(&self) -> usize {
535        self.children.len()
536    }
537
538    // ========================================================================
539    // Self-Termination Methods
540    // ========================================================================
541
542    /// Request this actor to stop gracefully.
543    ///
544    /// Sets the stopping flag. The actor loop will exit after the current
545    /// message is processed and the mailbox is drained.
546    pub fn stop_self(&mut self) {
547        self.stopping = true;
548    }
549
550    /// Returns true if this actor has been requested to stop.
551    #[must_use]
552    pub fn is_stopping(&self) -> bool {
553        self.stopping
554    }
555
556    // ========================================================================
557    // Parent Interaction Methods
558    // ========================================================================
559
560    /// Returns a reference to the parent supervisor, if any.
561    ///
562    /// Root actors spawned without supervision return `None`.
563    #[must_use]
564    pub fn parent(&self) -> Option<&ActorRef<SupervisorMessage>> {
565        self.parent.as_ref()
566    }
567
568    /// Returns true if this actor has a parent supervisor.
569    #[must_use]
570    pub fn has_parent(&self) -> bool {
571        self.parent.is_some()
572    }
573
574    /// Escalate an error to the parent supervisor.
575    ///
576    /// Sends a `SupervisorMessage::ChildFailed` to the parent if one exists.
577    /// Does nothing if this is a root actor.
578    pub async fn escalate(&self, reason: String) {
579        if let Some(parent) = &self.parent {
580            let msg = SupervisorMessage::ChildFailed {
581                child_id: self.actor_id,
582                reason,
583            };
584            // Best-effort: ignore send failures (parent may have stopped)
585            let _ = parent.send(self.cx, msg).await;
586        }
587    }
588
589    // ========================================================================
590    // Cx Delegation Methods
591    // ========================================================================
592
593    /// Check for cancellation and return early if requested.
594    ///
595    /// This is a convenience method that checks both actor stopping
596    /// and Cx cancellation.
597    #[allow(clippy::result_large_err)]
598    pub fn checkpoint(&self) -> Result<(), crate::error::Error> {
599        if self.stopping {
600            let reason = crate::types::CancelReason::user("actor stopping")
601                .with_region(self.cx.region_id())
602                .with_task(self.cx.task_id());
603            return Err(crate::error::Error::cancelled(&reason));
604        }
605        self.cx.checkpoint()
606    }
607
608    /// Returns true if cancellation has been requested.
609    ///
610    /// Checks both actor stopping flag and Cx cancellation.
611    #[must_use]
612    pub fn is_cancel_requested(&self) -> bool {
613        self.stopping || self.cx.is_cancel_requested()
614    }
615
616    /// Returns the current budget.
617    #[must_use]
618    pub fn budget(&self) -> crate::types::Budget {
619        self.cx.budget()
620    }
621
622    /// Returns the deadline from the budget, if set.
623    #[must_use]
624    pub fn deadline(&self) -> Option<Time> {
625        self.cx.budget().deadline
626    }
627
628    /// Emit a trace event.
629    pub fn trace(&self, event: &str) {
630        self.cx.trace(event);
631    }
632
633    /// Returns a clonable reference to this actor's mailbox.
634    ///
635    /// Use this to give other actors a way to send messages to this actor.
636    /// The `ActorRef<M>` type is always Clone regardless of whether M is Clone.
637    #[must_use]
638    pub fn self_ref(&self) -> ActorRef<M> {
639        self.self_ref.clone()
640    }
641
642    /// Returns a reference to the underlying Cx.
643    #[must_use]
644    pub const fn cx(&self) -> &Cx {
645        self.cx
646    }
647}
648
649impl<M: Send + 'static> std::ops::Deref for ActorContext<'_, M> {
650    type Target = Cx;
651
652    fn deref(&self) -> &Self::Target {
653        self.cx
654    }
655}
656
657impl<M: Send + 'static> std::fmt::Debug for ActorContext<'_, M> {
658    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
659        f.debug_struct("ActorContext")
660            .field("actor_id", &self.actor_id)
661            .field("children", &self.children.len())
662            .field("stopping", &self.stopping)
663            .field("has_parent", &self.parent.is_some())
664            .finish()
665    }
666}
667
668/// The default mailbox capacity for actors.
669pub const DEFAULT_MAILBOX_CAPACITY: usize = 64;
670
671/// Internal: runs the actor message loop.
672///
673/// This function is the core of the actor runtime. It:
674/// 1. Calls `on_start`
675/// 2. Receives and handles messages until the mailbox is closed or cancelled
676/// 3. Drains remaining buffered messages (no silent drops)
677/// 4. Calls `on_stop`
678/// 5. Returns the actor state
679async fn run_actor_loop<A: Actor>(mut actor: A, cx: Cx, cell: &ActorCell<A::Message>) -> A {
680    use crate::tracing_compat::debug;
681
682    cell.state.store(ActorState::Running);
683
684    // Phase 1: Initialization
685    cx.trace("actor::on_start");
686    actor.on_start(&cx).await;
687
688    // Phase 2: Message loop
689    loop {
690        // Check for cancellation
691        if cx.is_cancel_requested() {
692            cx.trace("actor::cancel_requested");
693            break;
694        }
695
696        match cell.mailbox.recv(&cx).await {
697            Ok(msg) => {
698                actor.handle(&cx, msg).await;
699            }
700            Err(crate::channel::mpsc::RecvError::Disconnected) => {
701                // All senders dropped - graceful shutdown
702                cx.trace("actor::mailbox_disconnected");
703                break;
704            }
705            Err(crate::channel::mpsc::RecvError::Cancelled) => {
706                // Cancellation requested
707                cx.trace("actor::recv_cancelled");
708                break;
709            }
710            Err(crate::channel::mpsc::RecvError::Empty) => {
711                // Shouldn't happen with recv() (only try_recv), but handle gracefully
712                break;
713            }
714        }
715    }
716
717    cell.state.store(ActorState::Stopping);
718
719    // Phase 3: Drain remaining buffered messages.
720    // Two-phase mailbox guarantee: no message silently dropped. Every message
721    // that was successfully sent (committed) into the mailbox will be handled
722    // before the actor's on_stop runs. We cap the drain to the mailbox
723    // capacity to avoid unbounded work if the mailbox is being filled
724    // concurrently during shutdown.
725    let drain_limit = cell.mailbox.capacity() as u64;
726    let mut drained: u64 = 0;
727    while let Ok(msg) = cell.mailbox.try_recv() {
728        actor.handle(&cx, msg).await;
729        drained += 1;
730        if drained >= drain_limit {
731            break;
732        }
733    }
734    if drained > 0 {
735        debug!(drained = drained, "actor::mailbox_drained");
736        cx.trace("actor::mailbox_drained");
737    }
738
739    // Phase 4: Cleanup
740    cx.trace("actor::on_stop");
741    actor.on_stop(&cx).await;
742
743    actor
744}
745
746// Extension for Scope to spawn actors
747impl<P: crate::types::Policy> crate::cx::Scope<'_, P> {
748    /// Spawns a new actor in this scope with the given mailbox capacity.
749    ///
750    /// The actor runs as a region-owned task. Messages are delivered through
751    /// a bounded MPSC channel with two-phase send semantics.
752    ///
753    /// # Arguments
754    ///
755    /// * `state` - Runtime state for task creation
756    /// * `cx` - Capability context
757    /// * `actor` - The actor instance
758    /// * `mailbox_capacity` - Bounded mailbox size
759    ///
760    /// # Returns
761    ///
762    /// A tuple of `(ActorHandle, StoredTask)`. The `StoredTask` must be
763    /// registered with the runtime via `state.store_spawned_task()`.
764    pub fn spawn_actor<A: Actor>(
765        &self,
766        state: &mut crate::runtime::state::RuntimeState,
767        cx: &Cx,
768        actor: A,
769        mailbox_capacity: usize,
770    ) -> Result<(ActorHandle<A>, crate::runtime::stored_task::StoredTask), SpawnError> {
771        use crate::channel::oneshot;
772        use crate::cx::scope::CatchUnwind;
773        use crate::runtime::stored_task::StoredTask;
774        use crate::tracing_compat::{debug, debug_span};
775
776        // Create the actor's mailbox
777        let (msg_tx, msg_rx) = mpsc::channel::<A::Message>(mailbox_capacity);
778
779        // Create oneshot for returning the actor state
780        let (result_tx, result_rx) = oneshot::channel::<Result<A, JoinError>>();
781
782        // Create task record
783        let task_id = self.create_task_record(state)?;
784        let actor_id = ActorId::from_task(task_id);
785        let actor_state = Arc::new(ActorStateCell::new(ActorState::Created));
786
787        let _span = debug_span!(
788            "actor_spawn",
789            task_id = ?task_id,
790            region_id = ?self.region_id(),
791            mailbox_capacity = mailbox_capacity,
792        )
793        .entered();
794        debug!(
795            task_id = ?task_id,
796            region_id = ?self.region_id(),
797            mailbox_capacity = mailbox_capacity,
798            "actor spawned"
799        );
800
801        // Create child context
802        let child_observability = cx.child_observability(self.region_id(), task_id);
803        let child_entropy = cx.child_entropy(task_id);
804        let io_driver = state.io_driver_handle();
805        let child_cx = Cx::new_with_observability(
806            self.region_id(),
807            task_id,
808            self.budget(),
809            Some(child_observability),
810            io_driver,
811            Some(child_entropy),
812        )
813        .with_blocking_pool_handle(cx.blocking_pool_handle());
814
815        // Link Cx to TaskRecord
816        if let Some(record) = state.task_mut(task_id) {
817            record.set_cx_inner(child_cx.inner.clone());
818            record.set_cx(child_cx.clone());
819        }
820
821        let cx_for_send = child_cx.clone();
822        let inner_weak = Arc::downgrade(&child_cx.inner);
823        let state_for_task = Arc::clone(&actor_state);
824
825        let cell = ActorCell {
826            mailbox: msg_rx,
827            state: Arc::clone(&actor_state),
828        };
829
830        // Create the actor loop future
831        let wrapped = async move {
832            let result = CatchUnwind(Box::pin(run_actor_loop(actor, child_cx, &cell))).await;
833            match result {
834                Ok(actor_final) => {
835                    let _ = result_tx.send(&cx_for_send, Ok(actor_final));
836                }
837                Err(payload) => {
838                    let msg = crate::cx::scope::payload_to_string(&payload);
839                    let _ = result_tx.send(
840                        &cx_for_send,
841                        Err(JoinError::Panicked(crate::types::PanicPayload::new(msg))),
842                    );
843                }
844            }
845            state_for_task.store(ActorState::Stopped);
846            Outcome::Ok(())
847        };
848
849        let stored = StoredTask::new_with_id(wrapped, task_id);
850
851        let handle = ActorHandle {
852            actor_id,
853            sender: msg_tx,
854            state: actor_state,
855            task_id,
856            receiver: result_rx,
857            inner: inner_weak,
858        };
859
860        Ok((handle, stored))
861    }
862
863    /// Spawns a supervised actor with automatic restart on failure.
864    ///
865    /// Unlike `spawn_actor`, this method takes a factory closure that can
866    /// produce new actor instances for restarts. The mailbox persists across
867    /// restarts, so messages sent during restart are buffered and processed
868    /// by the new instance.
869    ///
870    /// # Arguments
871    ///
872    /// * `state` - Runtime state for task creation
873    /// * `cx` - Capability context
874    /// * `factory` - Closure that creates actor instances (called on each restart)
875    /// * `strategy` - Supervision strategy (Stop, Restart, Escalate)
876    /// * `mailbox_capacity` - Bounded mailbox size
877    pub fn spawn_supervised_actor<A, F>(
878        &self,
879        state: &mut crate::runtime::state::RuntimeState,
880        cx: &Cx,
881        mut factory: F,
882        strategy: crate::supervision::SupervisionStrategy,
883        mailbox_capacity: usize,
884    ) -> Result<(ActorHandle<A>, crate::runtime::stored_task::StoredTask), SpawnError>
885    where
886        A: Actor,
887        F: FnMut() -> A + Send + 'static,
888    {
889        use crate::channel::oneshot;
890        use crate::runtime::stored_task::StoredTask;
891        use crate::supervision::Supervisor;
892        use crate::tracing_compat::{debug, debug_span};
893
894        let actor = factory();
895        let (msg_tx, msg_rx) = mpsc::channel::<A::Message>(mailbox_capacity);
896        let (result_tx, result_rx) = oneshot::channel::<Result<A, JoinError>>();
897        let task_id = self.create_task_record(state)?;
898        let actor_id = ActorId::from_task(task_id);
899        let actor_state = Arc::new(ActorStateCell::new(ActorState::Created));
900
901        let _span = debug_span!(
902            "supervised_actor_spawn",
903            task_id = ?task_id,
904            region_id = ?self.region_id(),
905            mailbox_capacity = mailbox_capacity,
906        )
907        .entered();
908        debug!(
909            task_id = ?task_id,
910            region_id = ?self.region_id(),
911            "supervised actor spawned"
912        );
913
914        let child_observability = cx.child_observability(self.region_id(), task_id);
915        let child_entropy = cx.child_entropy(task_id);
916        let io_driver = state.io_driver_handle();
917        let child_cx = Cx::new_with_observability(
918            self.region_id(),
919            task_id,
920            self.budget(),
921            Some(child_observability),
922            io_driver,
923            Some(child_entropy),
924        );
925
926        if let Some(record) = state.task_mut(task_id) {
927            record.set_cx_inner(child_cx.inner.clone());
928            record.set_cx(child_cx.clone());
929        }
930
931        let cx_for_send = child_cx.clone();
932        let inner_weak = Arc::downgrade(&child_cx.inner);
933        let region_id = self.region_id();
934        let state_for_task = Arc::clone(&actor_state);
935
936        let cell = ActorCell {
937            mailbox: msg_rx,
938            state: Arc::clone(&actor_state),
939        };
940
941        let wrapped = async move {
942            let result = run_supervised_loop(
943                actor,
944                &mut factory,
945                child_cx,
946                &cell,
947                Supervisor::new(strategy),
948                task_id,
949                region_id,
950            )
951            .await;
952            let _ = result_tx.send(&cx_for_send, result);
953            state_for_task.store(ActorState::Stopped);
954            Outcome::Ok(())
955        };
956
957        let stored = StoredTask::new_with_id(wrapped, task_id);
958
959        let handle = ActorHandle {
960            actor_id,
961            sender: msg_tx,
962            state: actor_state,
963            task_id,
964            receiver: result_rx,
965            inner: inner_weak,
966        };
967
968        Ok((handle, stored))
969    }
970}
971
972/// Outcome of a supervised actor run.
973#[derive(Debug)]
974pub enum SupervisedOutcome {
975    /// Actor stopped normally (no failure).
976    Stopped,
977    /// Actor stopped after restart budget exhaustion.
978    RestartBudgetExhausted {
979        /// Total restarts before budget was exhausted.
980        total_restarts: u32,
981    },
982    /// Failure was escalated to parent region.
983    Escalated,
984}
985
986/// Internal: runs a supervised actor loop with restart support.
987///
988/// The mailbox receiver is shared across restarts — messages sent while the
989/// actor is restarting are buffered and processed by the new instance.
990async fn run_supervised_loop<A, F>(
991    initial_actor: A,
992    factory: &mut F,
993    cx: Cx,
994    cell: &ActorCell<A::Message>,
995    mut supervisor: crate::supervision::Supervisor,
996    task_id: TaskId,
997    region_id: RegionId,
998) -> Result<A, JoinError>
999where
1000    A: Actor,
1001    F: FnMut() -> A,
1002{
1003    use crate::cx::scope::CatchUnwind;
1004    use crate::supervision::SupervisionDecision;
1005    use crate::types::Outcome;
1006
1007    let mut current_actor = initial_actor;
1008
1009    loop {
1010        // Run the actor until it finishes (normally or via panic)
1011        let result = CatchUnwind(Box::pin(run_actor_loop(current_actor, cx.clone(), cell))).await;
1012
1013        match result {
1014            Ok(actor_final) => {
1015                // Actor completed normally — no supervision needed
1016                return Ok(actor_final);
1017            }
1018            Err(payload) => {
1019                // Actor panicked — consult supervisor.
1020                // We report this as Failed (not Panicked) because actor crashes
1021                // are the expected failure mode for supervision. The Erlang/OTP
1022                // model restarts on crashes; Outcome::Panicked would always Stop.
1023                let msg = crate::cx::scope::payload_to_string(&payload);
1024                cx.trace("supervised_actor::failure");
1025
1026                let outcome = Outcome::err(());
1027                let now = cx.timer_driver().map_or(0, |td| td.now().as_nanos());
1028                let decision = supervisor.on_failure(task_id, region_id, None, &outcome, now);
1029
1030                match decision {
1031                    SupervisionDecision::Restart { .. } => {
1032                        cx.trace("supervised_actor::restart");
1033                        current_actor = factory();
1034                    }
1035                    SupervisionDecision::Stop { .. } => {
1036                        cx.trace("supervised_actor::stopped");
1037                        return Err(JoinError::Panicked(crate::types::PanicPayload::new(msg)));
1038                    }
1039                    SupervisionDecision::Escalate { .. } => {
1040                        cx.trace("supervised_actor::escalated");
1041                        return Err(JoinError::Panicked(crate::types::PanicPayload::new(msg)));
1042                    }
1043                }
1044            }
1045        }
1046    }
1047}
1048
1049#[cfg(test)]
1050mod tests {
1051    use super::*;
1052    use crate::runtime::state::RuntimeState;
1053    use crate::types::policy::FailFast;
1054    use crate::types::Budget;
1055
1056    fn init_test(name: &str) {
1057        crate::test_utils::init_test_logging();
1058        crate::test_phase!(name);
1059    }
1060
1061    /// Simple counter actor for testing.
1062    struct Counter {
1063        count: u64,
1064        started: bool,
1065        stopped: bool,
1066    }
1067
1068    impl Counter {
1069        fn new() -> Self {
1070            Self {
1071                count: 0,
1072                started: false,
1073                stopped: false,
1074            }
1075        }
1076    }
1077
1078    impl Actor for Counter {
1079        type Message = u64;
1080
1081        fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1082            self.started = true;
1083            Box::pin(async {})
1084        }
1085
1086        fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1087            self.count += msg;
1088            Box::pin(async {})
1089        }
1090
1091        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1092            self.stopped = true;
1093            Box::pin(async {})
1094        }
1095    }
1096
1097    fn assert_actor<A: Actor>() {}
1098
1099    #[test]
1100    fn actor_trait_object_safety() {
1101        init_test("actor_trait_object_safety");
1102
1103        // Verify Counter implements Actor with the right bounds
1104        assert_actor::<Counter>();
1105
1106        crate::test_complete!("actor_trait_object_safety");
1107    }
1108
1109    #[test]
1110    fn actor_handle_creation() {
1111        init_test("actor_handle_creation");
1112
1113        let mut state = RuntimeState::new();
1114        let root = state.create_root_region(Budget::INFINITE);
1115        let cx: Cx = Cx::for_testing();
1116        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
1117
1118        let result = scope.spawn_actor(&mut state, &cx, Counter::new(), 32);
1119        assert!(result.is_ok(), "spawn_actor should succeed");
1120
1121        let (handle, stored) = result.unwrap();
1122        state.store_spawned_task(handle.task_id(), stored);
1123
1124        // Handle should have valid task ID
1125        let _tid = handle.task_id();
1126
1127        // Actor should not be finished yet (not polled)
1128        assert!(!handle.is_finished());
1129
1130        crate::test_complete!("actor_handle_creation");
1131    }
1132
1133    #[test]
1134    fn actor_id_generation_distinct() {
1135        init_test("actor_id_generation_distinct");
1136
1137        let id1 = ActorId::from_task(TaskId::new_for_test(1, 1));
1138        let id2 = ActorId::from_task(TaskId::new_for_test(1, 2));
1139        assert!(id1 != id2, "generation must distinguish actor reuse");
1140
1141        crate::test_complete!("actor_id_generation_distinct");
1142    }
1143
1144    #[test]
1145    fn actor_ref_is_cloneable() {
1146        init_test("actor_ref_is_cloneable");
1147
1148        let mut state = RuntimeState::new();
1149        let root = state.create_root_region(Budget::INFINITE);
1150        let cx: Cx = Cx::for_testing();
1151        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
1152
1153        let (handle, stored) = scope
1154            .spawn_actor(&mut state, &cx, Counter::new(), 32)
1155            .unwrap();
1156        state.store_spawned_task(handle.task_id(), stored);
1157
1158        // Get multiple refs
1159        let ref1 = handle.sender();
1160        let ref2 = ref1.clone();
1161
1162        // Actor identity is preserved across clones
1163        assert_eq!(ref1.actor_id(), handle.actor_id());
1164        assert_eq!(ref2.actor_id(), handle.actor_id());
1165
1166        // Actor is alive at creation time (even before first poll)
1167        assert!(ref1.is_alive());
1168        assert!(ref2.is_alive());
1169
1170        // Both should be open
1171        assert!(!ref1.is_closed());
1172        assert!(!ref2.is_closed());
1173
1174        crate::test_complete!("actor_ref_is_cloneable");
1175    }
1176
1177    // ---- E2E Actor Scenarios ----
1178
1179    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1180
1181    /// Observable counter actor: writes final count to shared state during on_stop.
1182    /// Used by E2E tests to verify actor behavior without needing join().
1183    struct ObservableCounter {
1184        count: u64,
1185        on_stop_count: Arc<AtomicU64>,
1186        started: Arc<AtomicBool>,
1187        stopped: Arc<AtomicBool>,
1188    }
1189
1190    impl ObservableCounter {
1191        fn new(
1192            on_stop_count: Arc<AtomicU64>,
1193            started: Arc<AtomicBool>,
1194            stopped: Arc<AtomicBool>,
1195        ) -> Self {
1196            Self {
1197                count: 0,
1198                on_stop_count,
1199                started,
1200                stopped,
1201            }
1202        }
1203    }
1204
1205    impl Actor for ObservableCounter {
1206        type Message = u64;
1207
1208        fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1209            self.started.store(true, Ordering::SeqCst);
1210            Box::pin(async {})
1211        }
1212
1213        fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1214            self.count += msg;
1215            Box::pin(async {})
1216        }
1217
1218        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1219            self.on_stop_count.store(self.count, Ordering::SeqCst);
1220            self.stopped.store(true, Ordering::SeqCst);
1221            Box::pin(async {})
1222        }
1223    }
1224
1225    fn observable_state() -> (Arc<AtomicU64>, Arc<AtomicBool>, Arc<AtomicBool>) {
1226        (
1227            Arc::new(AtomicU64::new(u64::MAX)),
1228            Arc::new(AtomicBool::new(false)),
1229            Arc::new(AtomicBool::new(false)),
1230        )
1231    }
1232
1233    /// E2E: Actor processes all messages sent before channel disconnect.
1234    /// Verifies: messages delivered, on_start called, on_stop called.
1235    #[test]
1236    fn actor_processes_all_messages() {
1237        init_test("actor_processes_all_messages");
1238
1239        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1240        let region = runtime.state.create_root_region(Budget::INFINITE);
1241        let cx: Cx = Cx::for_testing();
1242        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1243
1244        let (on_stop_count, started, stopped) = observable_state();
1245        let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
1246
1247        let (handle, stored) = scope
1248            .spawn_actor(&mut runtime.state, &cx, actor, 32)
1249            .unwrap();
1250        let task_id = handle.task_id();
1251        runtime.state.store_spawned_task(task_id, stored);
1252
1253        // Pre-fill mailbox with 5 messages (each adding 1)
1254        for _ in 0..5 {
1255            handle.try_send(1).unwrap();
1256        }
1257
1258        // Drop handle to disconnect channel — actor will process buffered
1259        // messages via recv, then see Disconnected and stop gracefully.
1260        drop(handle);
1261
1262        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
1263        runtime.run_until_quiescent();
1264
1265        assert_eq!(
1266            on_stop_count.load(Ordering::SeqCst),
1267            5,
1268            "all messages processed"
1269        );
1270        assert!(started.load(Ordering::SeqCst), "on_start was called");
1271        assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
1272
1273        crate::test_complete!("actor_processes_all_messages");
1274    }
1275
1276    /// E2E: Mailbox drain on cancellation.
1277    /// Pre-fills mailbox, cancels actor before it runs, verifies all messages
1278    /// are still processed during the drain phase (no silent drops).
1279    #[test]
1280    fn actor_drains_mailbox_on_cancel() {
1281        init_test("actor_drains_mailbox_on_cancel");
1282
1283        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1284        let region = runtime.state.create_root_region(Budget::INFINITE);
1285        let cx: Cx = Cx::for_testing();
1286        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1287
1288        let (on_stop_count, started, stopped) = observable_state();
1289        let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
1290
1291        let (handle, stored) = scope
1292            .spawn_actor(&mut runtime.state, &cx, actor, 32)
1293            .unwrap();
1294        let task_id = handle.task_id();
1295        runtime.state.store_spawned_task(task_id, stored);
1296
1297        // Pre-fill mailbox with 5 messages
1298        for _ in 0..5 {
1299            handle.try_send(1).unwrap();
1300        }
1301
1302        // Cancel the actor BEFORE running.
1303        // The actor loop will: on_start → check cancel → break → drain → on_stop
1304        handle.stop();
1305
1306        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
1307        runtime.run_until_quiescent();
1308
1309        // All 5 messages processed during drain phase
1310        assert_eq!(
1311            on_stop_count.load(Ordering::SeqCst),
1312            5,
1313            "drain processed all messages"
1314        );
1315        assert!(started.load(Ordering::SeqCst), "on_start was called");
1316        assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
1317
1318        crate::test_complete!("actor_drains_mailbox_on_cancel");
1319    }
1320
1321    /// E2E: ActorRef liveness tracks actor lifecycle (Created -> Stopping -> Stopped).
1322    #[test]
1323    fn actor_ref_is_alive_transitions() {
1324        init_test("actor_ref_is_alive_transitions");
1325
1326        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1327        let region = runtime.state.create_root_region(Budget::INFINITE);
1328        let cx: Cx = Cx::for_testing();
1329        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1330
1331        let (on_stop_count, started, stopped) = observable_state();
1332        let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
1333
1334        let (handle, stored) = scope
1335            .spawn_actor(&mut runtime.state, &cx, actor, 32)
1336            .unwrap();
1337        let task_id = handle.task_id();
1338        runtime.state.store_spawned_task(task_id, stored);
1339
1340        let actor_ref = handle.sender();
1341        assert!(actor_ref.is_alive(), "created actor should be alive");
1342        assert_eq!(actor_ref.actor_id(), handle.actor_id());
1343
1344        handle.stop();
1345        assert!(actor_ref.is_alive(), "stopping actor is still alive");
1346
1347        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
1348        runtime.run_until_quiescent();
1349
1350        assert!(
1351            handle.is_finished(),
1352            "actor should be finished after stop + run"
1353        );
1354        assert!(!actor_ref.is_alive(), "finished actor is not alive");
1355
1356        // Sanity: the actor ran its hooks.
1357        assert!(started.load(Ordering::SeqCst), "on_start was called");
1358        assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
1359        assert_ne!(
1360            on_stop_count.load(Ordering::SeqCst),
1361            u64::MAX,
1362            "on_stop_count updated"
1363        );
1364
1365        crate::test_complete!("actor_ref_is_alive_transitions");
1366    }
1367
1368    /// E2E: Supervised actor restarts on panic within budget.
1369    /// Actor panics on messages >= threshold, supervisor restarts it.
1370    /// After restart, actor processes subsequent normal messages.
1371    #[test]
1372    fn supervised_actor_restarts_on_panic() {
1373        use std::sync::atomic::AtomicU32;
1374
1375        struct PanickingCounter {
1376            count: u64,
1377            panic_on: u64,
1378            final_count: Arc<AtomicU64>,
1379            restart_count: Arc<AtomicU32>,
1380        }
1381
1382        impl Actor for PanickingCounter {
1383            type Message = u64;
1384
1385            fn handle(
1386                &mut self,
1387                _cx: &Cx,
1388                msg: u64,
1389            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1390                assert!(msg != self.panic_on, "threshold exceeded: {msg}");
1391                self.count += msg;
1392                Box::pin(async {})
1393            }
1394
1395            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1396                self.final_count.store(self.count, Ordering::SeqCst);
1397                Box::pin(async {})
1398            }
1399        }
1400
1401        init_test("supervised_actor_restarts_on_panic");
1402
1403        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1404        let region = runtime.state.create_root_region(Budget::INFINITE);
1405        let cx: Cx = Cx::for_testing();
1406        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1407
1408        let final_count = Arc::new(AtomicU64::new(u64::MAX));
1409        let restart_count = Arc::new(AtomicU32::new(0));
1410        let fc = final_count.clone();
1411        let rc = restart_count.clone();
1412
1413        let strategy = crate::supervision::SupervisionStrategy::Restart(
1414            crate::supervision::RestartConfig::new(3, std::time::Duration::from_mins(1)),
1415        );
1416
1417        let (handle, stored) = scope
1418            .spawn_supervised_actor(
1419                &mut runtime.state,
1420                &cx,
1421                move || {
1422                    rc.fetch_add(1, Ordering::SeqCst);
1423                    PanickingCounter {
1424                        count: 0,
1425                        panic_on: 999,
1426                        final_count: fc.clone(),
1427                        restart_count: rc.clone(),
1428                    }
1429                },
1430                strategy,
1431                32,
1432            )
1433            .unwrap();
1434        let task_id = handle.task_id();
1435        runtime.state.store_spawned_task(task_id, stored);
1436
1437        // Message sequence:
1438        // 1. Normal message (count += 1)
1439        // 2. Panic trigger (actor panics, supervisor restarts)
1440        // 3. Normal message after restart (count += 1 on new instance)
1441        handle.try_send(1).unwrap();
1442        handle.try_send(999).unwrap(); // triggers panic
1443        handle.try_send(1).unwrap(); // processed by restarted actor
1444
1445        // Drop handle to disconnect channel after the restarted actor processes messages
1446        drop(handle);
1447
1448        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
1449        runtime.run_until_quiescent();
1450
1451        // Factory was called: once for initial + once for restart = fetch_add called twice
1452        // (first call was during spawn_supervised_actor, so count starts at 1;
1453        //  restart increments to 2)
1454        assert!(
1455            restart_count.load(Ordering::SeqCst) >= 2,
1456            "factory should have been called at least twice (initial + restart), got {}",
1457            restart_count.load(Ordering::SeqCst)
1458        );
1459
1460        // After restart, actor processes msg=1, then stops => final_count=1
1461        assert_eq!(
1462            final_count.load(Ordering::SeqCst),
1463            1,
1464            "restarted actor should have processed the post-panic message"
1465        );
1466
1467        crate::test_complete!("supervised_actor_restarts_on_panic");
1468    }
1469
1470    /// E2E: Deterministic replay — same seed produces same actor execution.
1471    #[test]
1472    fn actor_deterministic_replay() {
1473        fn run_scenario(seed: u64) -> u64 {
1474            let config = crate::lab::LabConfig::new(seed);
1475            let mut runtime = crate::lab::LabRuntime::new(config);
1476            let region = runtime.state.create_root_region(Budget::INFINITE);
1477            let cx: Cx = Cx::for_testing();
1478            let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1479
1480            let (on_stop_count, started, stopped) = observable_state();
1481            let actor = ObservableCounter::new(on_stop_count.clone(), started, stopped);
1482
1483            let (handle, stored) = scope
1484                .spawn_actor(&mut runtime.state, &cx, actor, 32)
1485                .unwrap();
1486            let task_id = handle.task_id();
1487            runtime.state.store_spawned_task(task_id, stored);
1488
1489            for i in 1..=10 {
1490                handle.try_send(i).unwrap();
1491            }
1492            drop(handle);
1493
1494            runtime.scheduler.lock().unwrap().schedule(task_id, 0);
1495            runtime.run_until_quiescent();
1496
1497            on_stop_count.load(Ordering::SeqCst)
1498        }
1499
1500        init_test("actor_deterministic_replay");
1501
1502        // Run the same scenario twice with the same seed
1503        let result1 = run_scenario(0xDEAD_BEEF);
1504        let result2 = run_scenario(0xDEAD_BEEF);
1505
1506        assert_eq!(
1507            result1, result2,
1508            "deterministic replay: same seed → same result"
1509        );
1510        assert_eq!(result1, 55, "sum of 1..=10");
1511
1512        crate::test_complete!("actor_deterministic_replay");
1513    }
1514
1515    // ---- ActorContext Tests ----
1516
1517    #[test]
1518    fn actor_context_self_reference() {
1519        init_test("actor_context_self_reference");
1520
1521        let mut state = RuntimeState::new();
1522        let root = state.create_root_region(Budget::INFINITE);
1523        let cx: Cx = Cx::for_testing();
1524        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
1525
1526        let (handle, stored) = scope
1527            .spawn_actor(&mut state, &cx, Counter::new(), 32)
1528            .unwrap();
1529        state.store_spawned_task(handle.task_id(), stored);
1530
1531        // Create an ActorContext using the handle's sender
1532        let actor_ref = handle.sender();
1533        let actor_id = handle.actor_id();
1534        let ctx: ActorContext<'_, u64> = ActorContext::new(&cx, actor_ref, actor_id, None);
1535
1536        // Test self_actor_id() - doesn't require Clone
1537        assert_eq!(ctx.self_actor_id(), actor_id);
1538        assert_eq!(ctx.actor_id(), actor_id);
1539
1540        crate::test_complete!("actor_context_self_reference");
1541    }
1542
1543    #[test]
1544    fn actor_context_child_management() {
1545        init_test("actor_context_child_management");
1546
1547        let cx: Cx = Cx::for_testing();
1548        let (sender, _receiver) = mpsc::channel::<u64>(32);
1549        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1550        let actor_ref = ActorRef {
1551            actor_id,
1552            sender,
1553            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1554        };
1555
1556        let mut ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1557
1558        // Initially no children
1559        assert!(!ctx.has_children());
1560        assert_eq!(ctx.child_count(), 0);
1561        assert!(ctx.children().is_empty());
1562
1563        // Register children
1564        let child1 = ActorId::from_task(TaskId::new_for_test(2, 1));
1565        let child2 = ActorId::from_task(TaskId::new_for_test(3, 1));
1566
1567        ctx.register_child(child1);
1568        assert!(ctx.has_children());
1569        assert_eq!(ctx.child_count(), 1);
1570
1571        ctx.register_child(child2);
1572        assert_eq!(ctx.child_count(), 2);
1573
1574        // Unregister child
1575        assert!(ctx.unregister_child(child1));
1576        assert_eq!(ctx.child_count(), 1);
1577
1578        // Unregistering non-existent child returns false
1579        assert!(!ctx.unregister_child(child1));
1580
1581        crate::test_complete!("actor_context_child_management");
1582    }
1583
1584    #[test]
1585    fn actor_context_stopping() {
1586        init_test("actor_context_stopping");
1587
1588        let cx: Cx = Cx::for_testing();
1589        let (sender, _receiver) = mpsc::channel::<u64>(32);
1590        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1591        let actor_ref = ActorRef {
1592            actor_id,
1593            sender,
1594            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1595        };
1596
1597        let mut ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1598
1599        // Initially not stopping
1600        assert!(!ctx.is_stopping());
1601        assert!(ctx.checkpoint().is_ok());
1602
1603        // Request stop
1604        ctx.stop_self();
1605        assert!(ctx.is_stopping());
1606        assert!(ctx.checkpoint().is_err());
1607        assert!(ctx.is_cancel_requested());
1608
1609        crate::test_complete!("actor_context_stopping");
1610    }
1611
1612    #[test]
1613    fn actor_context_parent_none() {
1614        init_test("actor_context_parent_none");
1615
1616        let cx: Cx = Cx::for_testing();
1617        let (sender, _receiver) = mpsc::channel::<u64>(32);
1618        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1619        let actor_ref = ActorRef {
1620            actor_id,
1621            sender,
1622            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1623        };
1624
1625        let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1626
1627        // Root actor has no parent
1628        assert!(!ctx.has_parent());
1629        assert!(ctx.parent().is_none());
1630
1631        crate::test_complete!("actor_context_parent_none");
1632    }
1633
1634    #[test]
1635    fn actor_context_cx_delegation() {
1636        init_test("actor_context_cx_delegation");
1637
1638        let cx: Cx = Cx::for_testing();
1639        let (sender, _receiver) = mpsc::channel::<u64>(32);
1640        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1641        let actor_ref = ActorRef {
1642            actor_id,
1643            sender,
1644            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1645        };
1646
1647        let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1648
1649        // Test Cx delegation via Deref
1650        let _budget = ctx.budget();
1651        ctx.trace("test_event");
1652
1653        // Test cx() accessor
1654        let _cx_ref = ctx.cx();
1655
1656        crate::test_complete!("actor_context_cx_delegation");
1657    }
1658
1659    #[test]
1660    fn actor_context_debug() {
1661        init_test("actor_context_debug");
1662
1663        let cx: Cx = Cx::for_testing();
1664        let (sender, _receiver) = mpsc::channel::<u64>(32);
1665        let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1666        let actor_ref = ActorRef {
1667            actor_id,
1668            sender,
1669            state: Arc::new(ActorStateCell::new(ActorState::Running)),
1670        };
1671
1672        let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1673
1674        // Debug formatting should work
1675        let debug_str = format!("{ctx:?}");
1676        assert!(debug_str.contains("ActorContext"));
1677        assert!(debug_str.contains("actor_id"));
1678
1679        crate::test_complete!("actor_context_debug");
1680    }
1681}