use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::Duration;
use crate::channel::mpsc;
use crate::channel::mpsc::SendError;
use crate::cx::Cx;
use crate::runtime::{JoinError, SpawnError};
use crate::types::{CxInner, Outcome, RegionId, TaskId, Time};
/// Stable identity of an actor, derived from the id of the task that runs it.
///
/// `TaskId` carries a generation component, so two actors that reuse the same
/// slot still compare unequal (see the `actor_id_generation_distinct` test).
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ActorId(TaskId);
impl ActorId {
    /// Wraps a task id as an actor id (zero-cost).
    #[must_use]
    #[inline]
    pub const fn from_task(task_id: TaskId) -> Self {
        Self(task_id)
    }
    /// Returns the underlying task id.
    #[must_use]
    #[inline]
    pub const fn task_id(self) -> TaskId {
        self.0
    }
}
impl std::fmt::Debug for ActorId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("ActorId").field(&self.0).finish()
    }
}
impl std::fmt::Display for ActorId {
    // Display delegates to the inner task id's Display.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
/// Lifecycle phases of an actor; packed into an `AtomicU8` by [`ActorStateCell`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActorState {
    /// Spawned, but the run loop has not started (or observed the cell) yet.
    Created,
    /// The run loop is processing messages.
    Running,
    /// Shutdown requested; the mailbox is sealed, drain/`on_stop` may still run.
    Stopping,
    /// The actor task has fully finished (set after the run loop completes).
    Stopped,
}
/// Lock-free cell holding an [`ActorState`] encoded into an `AtomicU8`,
/// shared between the actor task, its handle, and all `ActorRef`s.
#[derive(Debug)]
struct ActorStateCell {
    state: AtomicU8,
}
impl ActorStateCell {
    #[inline]
    fn new(state: ActorState) -> Self {
        Self {
            state: AtomicU8::new(Self::encode(state)),
        }
    }
    /// Reads the current state; Acquire pairs with `store`'s Release.
    #[inline]
    fn load(&self) -> ActorState {
        Self::decode(self.state.load(Ordering::Acquire))
    }
    /// Publishes a new state; Release pairs with `load`'s Acquire.
    #[inline]
    fn store(&self, state: ActorState) {
        self.state.store(Self::encode(state), Ordering::Release);
    }
    /// State -> wire value; must stay in sync with `decode`.
    #[inline]
    const fn encode(state: ActorState) -> u8 {
        match state {
            ActorState::Created => 0,
            ActorState::Running => 1,
            ActorState::Stopping => 2,
            ActorState::Stopped => 3,
        }
    }
    /// Wire value -> state. Values other than 0..=2 decode as `Stopped`;
    /// only 3 is ever produced by `encode`.
    #[inline]
    const fn decode(value: u8) -> ActorState {
        match value {
            0 => ActorState::Created,
            1 => ActorState::Running,
            2 => ActorState::Stopping,
            _ => ActorState::Stopped,
        }
    }
}
/// Task-side half of an actor: its mailbox receiver plus the shared state cell.
struct ActorCell<M> {
    mailbox: mpsc::Receiver<M>,
    state: Arc<ActorStateCell>,
}
/// A message-driven entity executed by the runtime.
///
/// Hooks return boxed futures; `on_start` and `on_stop` default to no-ops,
/// only `handle` is required.
pub trait Actor: Send + 'static {
    /// Message type delivered to [`Actor::handle`].
    type Message: Send + 'static;
    /// Called once before the first message is processed.
    fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }
    /// Processes a single message.
    fn handle(
        &mut self,
        cx: &Cx,
        msg: Self::Message,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
    /// Called once after the receive loop exits; `run_actor_loop` runs it with
    /// the cancellation mask raised so cleanup is not interrupted.
    fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }
}
/// Owner-side handle to a spawned actor: send messages, stop/abort it, and
/// join for the final actor value.
#[derive(Debug)]
pub struct ActorHandle<A: Actor> {
    actor_id: ActorId,
    // Sending half of the actor's mailbox.
    sender: mpsc::Sender<A::Message>,
    // Lifecycle cell shared with the actor task and all `ActorRef`s.
    state: Arc<ActorStateCell>,
    task_id: TaskId,
    // Receives the final actor value (or a join error) exactly once.
    receiver: crate::channel::oneshot::Receiver<Result<A, JoinError>>,
    // Weak link to the actor task's context; used to request cancellation.
    inner: std::sync::Weak<parking_lot::RwLock<CxInner>>,
    // Set once a join observed a terminal result; guards repeated joins.
    completed: bool,
}
impl<A: Actor> ActorHandle<A> {
pub async fn send(&self, cx: &Cx, msg: A::Message) -> Result<(), SendError<A::Message>> {
self.sender.send(cx, msg).await
}
pub fn try_send(&self, msg: A::Message) -> Result<(), SendError<A::Message>> {
self.sender.try_send(msg)
}
#[must_use]
pub fn sender(&self) -> ActorRef<A::Message> {
ActorRef {
actor_id: self.actor_id,
sender: self.sender.clone(),
state: Arc::clone(&self.state),
}
}
#[must_use]
pub const fn actor_id(&self) -> ActorId {
self.actor_id
}
#[must_use]
pub fn task_id(&self) -> crate::types::TaskId {
self.task_id
}
pub fn stop(&self) {
self.state.store(ActorState::Stopping);
self.sender.close_receiver();
}
#[must_use]
pub fn is_finished(&self) -> bool {
self.completed || self.receiver.is_ready() || self.receiver.is_closed()
}
pub fn join<'a>(&'a mut self, _cx: &'a Cx) -> ActorJoinFuture<'a, A> {
let cx_inner = self.inner.clone();
let receiver = &mut self.receiver;
let terminal_state = &mut self.completed;
ActorJoinFuture {
inner: receiver.recv_uninterruptible(),
cx_inner,
sender: self.sender.clone(),
state: Arc::clone(&self.state),
terminal_state,
drop_abort_defused: false,
}
}
pub fn abort(&self) {
self.state.store(ActorState::Stopping);
self.sender.close_receiver();
if let Some(inner) = self.inner.upgrade() {
let cancel_waker = {
let mut guard = inner.write();
guard.cancel_requested = true;
guard
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
if guard.cancel_reason.is_none() {
guard.cancel_reason = Some(crate::types::CancelReason::user("actor aborted"));
}
guard.cancel_waker.clone()
};
if let Some(waker) = cancel_waker {
waker.wake_by_ref();
}
}
}
}
/// Future returned by [`ActorHandle::join`]; aborts the actor if dropped
/// before a terminal result was observed.
pub struct ActorJoinFuture<'a, A: Actor> {
    // Uninterruptible receive of the actor's final result.
    inner: crate::channel::oneshot::RecvUninterruptibleFuture<'a, Result<A, JoinError>>,
    cx_inner: std::sync::Weak<parking_lot::RwLock<CxInner>>,
    sender: mpsc::Sender<A::Message>,
    state: Arc<ActorStateCell>,
    // Aliases `ActorHandle::completed`; set on any terminal poll result.
    terminal_state: &'a mut bool,
    // Set together with `terminal_state` to disarm the drop-abort path.
    drop_abort_defused: bool,
}
impl<A: Actor> ActorJoinFuture<'_, A> {
    /// Best-effort cancel reason when the result channel closed without a
    /// value: prefer the reason recorded on the task context, else synthesize.
    fn closed_reason(&self) -> crate::types::CancelReason {
        self.cx_inner
            .upgrade()
            .and_then(|inner| inner.read().cancel_reason.clone())
            .unwrap_or_else(|| crate::types::CancelReason::user("join channel closed"))
    }
    /// Seals the mailbox and requests cancellation of the actor task.
    /// Mirrors `ActorHandle::abort`; keep the two in sync.
    fn abort(&self) {
        self.state.store(ActorState::Stopping);
        self.sender.close_receiver();
        if let Some(inner) = self.cx_inner.upgrade() {
            // Record the request under the lock; wake outside it.
            let cancel_waker = {
                let mut guard = inner.write();
                guard.cancel_requested = true;
                guard
                    .fast_cancel
                    .store(true, std::sync::atomic::Ordering::Release);
                if guard.cancel_reason.is_none() {
                    guard.cancel_reason = Some(crate::types::CancelReason::user("actor aborted"));
                }
                guard.cancel_waker.clone()
            };
            if let Some(waker) = cancel_waker {
                waker.wake_by_ref();
            }
        }
    }
}
impl<A: Actor> std::future::Future for ActorJoinFuture<'_, A> {
    type Output = Result<A, JoinError>;
    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let this = &mut *self;
        // Once a terminal result was delivered, further polls are a caller bug.
        if *this.terminal_state {
            return std::task::Poll::Ready(Err(JoinError::PolledAfterCompletion));
        }
        match Pin::new(&mut this.inner).poll(cx) {
            std::task::Poll::Ready(Ok(res)) => {
                // Result delivered: mark terminal and disarm the drop-abort.
                *this.terminal_state = true;
                this.drop_abort_defused = true;
                std::task::Poll::Ready(res)
            }
            std::task::Poll::Ready(Err(crate::channel::oneshot::RecvError::Closed)) => {
                // Sender side dropped without a value: report cancellation
                // with the best-known reason.
                *this.terminal_state = true;
                this.drop_abort_defused = true;
                let reason = this.closed_reason();
                std::task::Poll::Ready(Err(JoinError::Cancelled(reason)))
            }
            std::task::Poll::Ready(Err(crate::channel::oneshot::RecvError::Cancelled)) => {
                unreachable!("RecvUninterruptibleFuture cannot return Cancelled");
            }
            std::task::Poll::Ready(Err(
                crate::channel::oneshot::RecvError::PolledAfterCompletion,
            )) => {
                unreachable!(
                    "JoinFuture guards repolls before polling the inner oneshot recv future"
                )
            }
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}
impl<A: Actor> Drop for ActorJoinFuture<'_, A> {
    /// Dropping an unfinished join aborts the actor. Joins that already saw a
    /// terminal result, were explicitly defused, or whose sender side already
    /// finished are left untouched.
    fn drop(&mut self) {
        if *self.terminal_state || self.drop_abort_defused {
            return;
        }
        if self.inner.receiver_finished() {
            return;
        }
        self.abort();
    }
}
/// Cloneable sending reference to an actor; cheap to clone and share.
#[derive(Debug)]
pub struct ActorRef<M> {
    actor_id: ActorId,
    sender: mpsc::Sender<M>,
    // Lifecycle cell shared with the actor task; backs `is_alive`.
    state: Arc<ActorStateCell>,
}
impl<M> Clone for ActorRef<M> {
    /// Manual impl: `#[derive(Clone)]` would add an unnecessary `M: Clone`
    /// bound, while none of the fields need it.
    fn clone(&self) -> Self {
        let Self {
            actor_id,
            sender,
            state,
        } = self;
        Self {
            actor_id: *actor_id,
            sender: sender.clone(),
            state: Arc::clone(state),
        }
    }
}
impl<M: Send + 'static> ActorRef<M> {
    /// Sends `msg` to the actor, waiting for mailbox capacity.
    pub async fn send(&self, cx: &Cx, msg: M) -> Result<(), SendError<M>> {
        self.sender.send(cx, msg).await
    }
    /// Reserves a mailbox slot ahead of producing the message.
    #[must_use]
    pub fn reserve<'a>(&'a self, cx: &'a Cx) -> mpsc::Reserve<'a, M> {
        self.sender.reserve(cx)
    }
    /// Non-blocking send; errors when the mailbox is full or closed.
    pub fn try_send(&self, msg: M) -> Result<(), SendError<M>> {
        self.sender.try_send(msg)
    }
    /// True once the mailbox no longer accepts messages.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.sender.is_closed()
    }
    /// True until the actor task has fully finished; a stopping actor still
    /// counts as alive.
    #[must_use]
    pub fn is_alive(&self) -> bool {
        !matches!(self.state.load(), ActorState::Stopped)
    }
    /// Identity of the actor this reference points at.
    #[must_use]
    pub const fn actor_id(&self) -> ActorId {
        self.actor_id
    }
}
/// Configuration of an actor mailbox.
#[derive(Debug, Clone, Copy)]
pub struct MailboxConfig {
    // Number of message slots.
    pub capacity: usize,
    // Whether senders wait when the mailbox is full.
    pub backpressure: bool,
}
impl Default for MailboxConfig {
    /// Default mailbox: [`DEFAULT_MAILBOX_CAPACITY`] slots with backpressure
    /// enabled — exactly what `with_capacity` produces.
    fn default() -> Self {
        Self::with_capacity(DEFAULT_MAILBOX_CAPACITY)
    }
}
impl MailboxConfig {
    /// Mailbox with the given capacity and backpressure enabled.
    #[must_use]
    pub const fn with_capacity(capacity: usize) -> Self {
        Self {
            capacity,
            backpressure: true,
        }
    }
}
/// Notifications a child actor sends up to its supervising parent.
#[derive(Debug, Clone)]
pub enum SupervisorMessage {
    /// The child failed; `reason` is a human-readable description.
    ChildFailed {
        child_id: ActorId,
        reason: String,
    },
    /// The child stopped without failing.
    ChildStopped {
        child_id: ActorId,
    },
}
/// View an actor gets of itself while handling messages: its own ref,
/// supervision links, and the underlying [`Cx`] (also reachable via `Deref`).
pub struct ActorContext<'a, M: Send + 'static> {
    cx: &'a Cx,
    self_ref: ActorRef<M>,
    actor_id: ActorId,
    // Supervisor to escalate failures to, if any.
    parent: Option<ActorRef<SupervisorMessage>>,
    // Ids of registered child actors (unordered; see `unregister_child`).
    children: Vec<ActorId>,
    // Set by `stop_self`; honored by `checkpoint`/`is_cancel_requested`.
    stopping: bool,
}
#[allow(clippy::elidable_lifetime_names)]
impl<'a, M: Send + 'static> ActorContext<'a, M> {
    /// Builds a context for one actor invocation.
    #[must_use]
    pub fn new(
        cx: &'a Cx,
        self_ref: ActorRef<M>,
        actor_id: ActorId,
        parent: Option<ActorRef<SupervisorMessage>>,
    ) -> Self {
        Self {
            cx,
            self_ref,
            actor_id,
            parent,
            children: Vec::new(),
            stopping: false,
        }
    }
    /// This actor's id.
    #[must_use]
    pub const fn self_actor_id(&self) -> ActorId {
        self.actor_id
    }
    /// Alias of `self_actor_id`.
    #[must_use]
    pub const fn actor_id(&self) -> ActorId {
        self.actor_id
    }
    /// Records a spawned child for bookkeeping.
    pub fn register_child(&mut self, child_id: ActorId) {
        self.children.push(child_id);
    }
    /// Removes a child; returns whether it was present.
    /// Uses `swap_remove`, so child order is not preserved.
    pub fn unregister_child(&mut self, child_id: ActorId) -> bool {
        if let Some(pos) = self.children.iter().position(|&id| id == child_id) {
            self.children.swap_remove(pos);
            true
        } else {
            false
        }
    }
    /// Registered children (unordered).
    #[must_use]
    pub fn children(&self) -> &[ActorId] {
        &self.children
    }
    /// True if any children are registered.
    #[must_use]
    pub fn has_children(&self) -> bool {
        !self.children.is_empty()
    }
    /// Number of registered children.
    #[must_use]
    pub fn child_count(&self) -> usize {
        self.children.len()
    }
    /// Requests that this actor stop.
    pub fn stop_self(&mut self) {
        self.stopping = true;
    }
    /// True once `stop_self` has been called.
    #[must_use]
    pub fn is_stopping(&self) -> bool {
        self.stopping
    }
    /// Supervising parent, if any.
    #[must_use]
    pub fn parent(&self) -> Option<&ActorRef<SupervisorMessage>> {
        self.parent.as_ref()
    }
    /// True when a supervising parent exists.
    #[must_use]
    pub fn has_parent(&self) -> bool {
        self.parent.is_some()
    }
    /// Reports a failure to the parent supervisor; send errors are
    /// deliberately ignored (best-effort notification).
    pub async fn escalate(&self, reason: String) {
        if let Some(parent) = &self.parent {
            let msg = SupervisorMessage::ChildFailed {
                child_id: self.actor_id,
                reason,
            };
            let _ = parent.send(self.cx, msg).await;
        }
    }
    /// Cancellation checkpoint that also honors `stop_self`: a stopping actor
    /// reports a cancellation error before consulting the task context.
    #[allow(clippy::result_large_err)]
    pub fn checkpoint(&self) -> Result<(), crate::error::Error> {
        if self.stopping {
            let reason = crate::types::CancelReason::user("actor stopping")
                .with_region(self.cx.region_id())
                .with_task(self.cx.task_id());
            return Err(crate::error::Error::cancelled(&reason));
        }
        self.cx.checkpoint()
    }
    /// True when either `stop_self` or task cancellation is pending.
    #[must_use]
    pub fn is_cancel_requested(&self) -> bool {
        self.stopping || self.cx.checkpoint().is_err()
    }
    /// Current budget of the underlying task.
    #[must_use]
    pub fn budget(&self) -> crate::types::Budget {
        self.cx.budget()
    }
    /// Deadline from the task budget, if any.
    #[must_use]
    pub fn deadline(&self) -> Option<Time> {
        self.cx.budget().deadline
    }
    /// Emits a trace event on the task context.
    pub fn trace(&self, event: &str) {
        self.cx.trace(event);
    }
    /// A sending reference to this actor itself.
    #[must_use]
    pub fn self_ref(&self) -> ActorRef<M> {
        self.self_ref.clone()
    }
    /// The underlying task context.
    #[must_use]
    pub const fn cx(&self) -> &Cx {
        self.cx
    }
}
// Lets an `ActorContext` be passed anywhere a `&Cx` is expected.
impl<M: Send + 'static> std::ops::Deref for ActorContext<'_, M> {
    type Target = Cx;
    fn deref(&self) -> &Self::Target {
        self.cx
    }
}
// Manual impl: summarizes children as a count and omits the `Cx` itself.
impl<M: Send + 'static> std::fmt::Debug for ActorContext<'_, M> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActorContext")
            .field("actor_id", &self.actor_id)
            .field("children", &self.children.len())
            .field("stopping", &self.stopping)
            .field("has_parent", &self.parent.is_some())
            .finish()
    }
}
/// Mailbox capacity used by [`MailboxConfig::default`].
pub const DEFAULT_MAILBOX_CAPACITY: usize = 64;
/// RAII guard undoing the mask-depth increment taken around `on_stop`,
/// even if `on_stop` panics.
struct OnStopMaskGuard(Arc<parking_lot::RwLock<CxInner>>);
impl Drop for OnStopMaskGuard {
    fn drop(&mut self) {
        let mut g = self.0.write();
        // saturating_sub guards against underflow should the depth already be 0.
        g.mask_depth = g.mask_depth.saturating_sub(1);
    }
}
/// Drives one actor to completion: `on_start`, the receive loop, a mailbox
/// drain, then `on_stop` under the cancellation mask. Returns the final actor
/// value so it can be delivered over the join channel.
async fn run_actor_loop<A: Actor>(mut actor: A, cx: Cx, cell: &mut ActorCell<A::Message>) -> A {
    use crate::tracing_compat::debug;
    // A stop requested before the task ran leaves the cell at Stopping;
    // don't overwrite that with Running.
    if cell.state.load() != ActorState::Stopping {
        cell.state.store(ActorState::Running);
    }
    cx.trace("actor::on_start");
    actor.on_start(&cx).await;
    loop {
        // Cooperative cancellation check before each receive.
        if cx.checkpoint().is_err() {
            cx.trace("actor::cancel_requested");
            break;
        }
        // A Pending receive while the state is Stopping is turned into
        // Disconnected so the loop exits promptly instead of idling.
        let recv_result = std::future::poll_fn(|task_cx| {
            match cell.mailbox.poll_recv(&cx, task_cx) {
                std::task::Poll::Pending if cell.state.load() == ActorState::Stopping => {
                    std::task::Poll::Ready(Err(crate::channel::mpsc::RecvError::Disconnected))
                }
                other => other,
            }
        })
        .await;
        match recv_result {
            Ok(msg) => {
                actor.handle(&cx, msg).await;
            }
            Err(crate::channel::mpsc::RecvError::Disconnected) => {
                cx.trace("actor::mailbox_disconnected");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Cancelled) => {
                cx.trace("actor::recv_cancelled");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Empty) => {
                break;
            }
        }
    }
    cell.state.store(ActorState::Stopping);
    let is_aborted = cx.checkpoint().is_err();
    cell.mailbox.close();
    if is_aborted {
        // Aborted: discard whatever is left without running handlers.
        while let Ok(_msg) = cell.mailbox.try_recv() {}
    } else {
        // Graceful stop: process every message that was already enqueued.
        let mut drained: u64 = 0;
        while let Ok(msg) = cell.mailbox.try_recv() {
            actor.handle(&cx, msg).await;
            drained += 1;
        }
        if drained > 0 {
            debug!(drained = drained, "actor::mailbox_drained");
            cx.trace("actor::mailbox_drained");
        }
    }
    cx.trace("actor::on_stop");
    // Raise the cancellation mask around `on_stop` so cleanup cannot be
    // interrupted; the guard restores the depth even if `on_stop` panics.
    let inner = cx.inner.clone();
    {
        let mut guard = inner.write();
        assert!(
            guard.mask_depth < crate::types::task_context::MAX_MASK_DEPTH,
            "mask depth exceeded MAX_MASK_DEPTH ({}) in actor::on_stop: \
            this violates INV-MASK-BOUNDED and prevents cancellation from ever \
            being observed. Reduce nesting of masked sections.",
            crate::types::task_context::MAX_MASK_DEPTH
        );
        guard.mask_depth += 1;
    }
    let mask_guard = OnStopMaskGuard(inner);
    actor.on_stop(&cx).await;
    drop(mask_guard);
    actor
}
/// Builds the join error for a cancelled supervised actor, preferring the
/// cancel reason recorded on the task context.
fn actor_cancel_join_error(cx: &Cx) -> JoinError {
    let reason = match cx.cancel_reason() {
        Some(reason) => reason,
        None => crate::types::CancelReason::user("actor supervision cancelled"),
    };
    JoinError::Cancelled(reason)
}
/// Timestamp (in nanoseconds) fed to the supervisor's restart bookkeeping:
/// the installed timer driver when present, wall-clock time otherwise.
fn supervised_restart_timestamp(cx: &Cx) -> u64 {
    if let Some(td) = cx.timer_driver() {
        td.now().as_nanos()
    } else {
        crate::time::wall_now().as_nanos()
    }
}
/// Waits out a restart backoff, failing fast with a cancellation join error
/// if the task is (or becomes) cancelled.
async fn wait_supervised_restart_delay(cx: &Cx, delay: Duration) -> Result<(), JoinError> {
    if cx.checkpoint().is_err() {
        return Err(actor_cancel_join_error(cx));
    }
    if delay.is_zero() {
        return Ok(());
    }
    // Base the sleep on the timer driver when present, else wall-clock time.
    let now = cx
        .timer_driver()
        .map_or_else(crate::time::wall_now, |td| td.now());
    let mut sleeper = crate::time::sleep(now, delay);
    // Re-check cancellation on every poll so the backoff stays abortable.
    std::future::poll_fn(|task_cx| {
        if cx.checkpoint().is_err() {
            return std::task::Poll::Ready(Err(actor_cancel_join_error(cx)));
        }
        Pin::new(&mut sleeper).poll(task_cx).map(|()| Ok(()))
    })
    .await
}
/// Maps an actor's join result onto the task-level outcome reported to the
/// runtime. `PolledAfterCompletion` is never produced by the actor task
/// itself, so reaching it here is a bug.
fn join_result_to_task_outcome<A>(result: &Result<A, JoinError>) -> Outcome<(), ()> {
    if let Err(err) = result {
        match err {
            JoinError::Cancelled(reason) => Outcome::Cancelled(reason.clone()),
            JoinError::Panicked(payload) => Outcome::Panicked(payload.clone()),
            JoinError::PolledAfterCompletion => {
                panic!("actor task produced JoinError::PolledAfterCompletion")
            }
        }
    } else {
        Outcome::Ok(())
    }
}
impl<P: crate::types::Policy> crate::cx::Scope<'_, P> {
    /// Spawns `actor` as a task in this scope.
    ///
    /// Creates the mailbox and join channel, registers a task record, wires a
    /// child `Cx` into that record, and wraps the run loop so panics become
    /// `JoinError::Panicked`. Returns the owner handle plus the stored task
    /// the caller must hand to the runtime for scheduling.
    pub fn spawn_actor<A: Actor>(
        &self,
        state: &mut crate::runtime::state::RuntimeState,
        cx: &Cx,
        actor: A,
        mailbox_capacity: usize,
    ) -> Result<(ActorHandle<A>, crate::runtime::stored_task::StoredTask), SpawnError> {
        use crate::channel::oneshot;
        use crate::cx::scope::CatchUnwind;
        use crate::runtime::stored_task::StoredTask;
        use crate::tracing_compat::{debug, debug_span};
        let (msg_tx, msg_rx) = mpsc::channel::<A::Message>(mailbox_capacity);
        let (result_tx, result_rx) = oneshot::channel::<Result<A, JoinError>>();
        let task_id = self.create_task_record(state)?;
        let actor_id = ActorId::from_task(task_id);
        let actor_state = Arc::new(ActorStateCell::new(ActorState::Created));
        let _span = debug_span!(
            "actor_spawn",
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
        )
        .entered();
        debug!(
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
            "actor spawned"
        );
        // The child Cx is what the actor observes; store it on the task
        // record so the runtime can cancel/inspect the task.
        let (_, child_cx) = self.build_child_task_cx(state, cx, task_id);
        if let Some(record) = state.task_mut(task_id) {
            record.set_cx_inner(child_cx.inner.clone());
            record.set_cx(child_cx.clone());
        }
        let cx_for_send = child_cx.clone();
        let inner_weak = Arc::downgrade(&child_cx.inner);
        let state_for_task = Arc::clone(&actor_state);
        let mut cell = ActorCell {
            mailbox: msg_rx,
            state: Arc::clone(&actor_state),
        };
        let wrapped = async move {
            // Panics must not escape the task: catch and convert to a
            // join error / task outcome.
            let result = CatchUnwind {
                inner: Box::pin(run_actor_loop(actor, child_cx, &mut cell)),
            }
            .await;
            let outcome = match result {
                Ok(actor_final) => {
                    // Send may fail if the handle was already dropped.
                    let _ = result_tx.send(&cx_for_send, Ok(actor_final));
                    Outcome::Ok(())
                }
                Err(payload) => {
                    let msg = crate::cx::scope::payload_to_string(&payload);
                    let panic_payload = crate::types::PanicPayload::new(msg);
                    let _ = result_tx.send(
                        &cx_for_send,
                        Err(JoinError::Panicked(panic_payload.clone())),
                    );
                    Outcome::Panicked(panic_payload)
                }
            };
            // Publish the terminal state only after the result was sent.
            state_for_task.store(ActorState::Stopped);
            outcome
        };
        let stored = StoredTask::new_with_id(wrapped, task_id);
        let handle = ActorHandle {
            actor_id,
            sender: msg_tx,
            state: actor_state,
            task_id,
            receiver: result_rx,
            inner: inner_weak,
            completed: false,
        };
        Ok((handle, stored))
    }
    /// Like `spawn_actor`, but runs the actor under `run_supervised_loop`:
    /// on panic the supervisor consults `strategy` and may rebuild the actor
    /// via `factory` instead of failing the task.
    pub fn spawn_supervised_actor<A, F>(
        &self,
        state: &mut crate::runtime::state::RuntimeState,
        cx: &Cx,
        mut factory: F,
        strategy: crate::supervision::SupervisionStrategy,
        mailbox_capacity: usize,
    ) -> Result<(ActorHandle<A>, crate::runtime::stored_task::StoredTask), SpawnError>
    where
        A: Actor,
        F: FnMut() -> A + Send + 'static,
    {
        use crate::channel::oneshot;
        use crate::runtime::stored_task::StoredTask;
        use crate::supervision::Supervisor;
        use crate::tracing_compat::{debug, debug_span};
        // The first incarnation also comes from the factory, so restarts are
        // indistinguishable from the initial start.
        let actor = factory();
        let (msg_tx, msg_rx) = mpsc::channel::<A::Message>(mailbox_capacity);
        let (result_tx, result_rx) = oneshot::channel::<Result<A, JoinError>>();
        let task_id = self.create_task_record(state)?;
        let actor_id = ActorId::from_task(task_id);
        let actor_state = Arc::new(ActorStateCell::new(ActorState::Created));
        let _span = debug_span!(
            "supervised_actor_spawn",
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
        )
        .entered();
        debug!(
            task_id = ?task_id,
            region_id = ?self.region_id(),
            "supervised actor spawned"
        );
        let (_, child_cx) = self.build_child_task_cx(state, cx, task_id);
        if let Some(record) = state.task_mut(task_id) {
            record.set_cx_inner(child_cx.inner.clone());
            record.set_cx(child_cx.clone());
        }
        let cx_for_send = child_cx.clone();
        let inner_weak = Arc::downgrade(&child_cx.inner);
        let region_id = self.region_id();
        let state_for_task = Arc::clone(&actor_state);
        let mut cell = ActorCell {
            mailbox: msg_rx,
            state: Arc::clone(&actor_state),
        };
        let wrapped = async move {
            let result = run_supervised_loop(
                actor,
                &mut factory,
                child_cx,
                &mut cell,
                Supervisor::new(strategy),
                task_id,
                region_id,
            )
            .await;
            let outcome = join_result_to_task_outcome(&result).map_err(|_| ());
            let _ = result_tx.send(&cx_for_send, result);
            state_for_task.store(ActorState::Stopped);
            outcome
        };
        let stored = StoredTask::new_with_id(wrapped, task_id);
        let handle = ActorHandle {
            actor_id,
            sender: msg_tx,
            state: actor_state,
            task_id,
            receiver: result_rx,
            inner: inner_weak,
            completed: false,
        };
        Ok((handle, stored))
    }
}
/// Terminal outcomes of a supervised actor run.
// NOTE(review): not referenced elsewhere in this file — presumably consumed
// by supervision callers; confirm before removing.
#[derive(Debug)]
pub enum SupervisedOutcome {
    Stopped,
    RestartBudgetExhausted {
        total_restarts: u32,
    },
    Escalated,
}
/// Runs an actor under supervision: panics from the run loop are caught and
/// handed to the supervisor, which decides to restart (rebuilding the actor
/// via `factory`, optionally after a backoff), stop, or escalate.
async fn run_supervised_loop<A, F>(
    initial_actor: A,
    factory: &mut F,
    cx: Cx,
    cell: &mut ActorCell<A::Message>,
    mut supervisor: crate::supervision::Supervisor,
    task_id: TaskId,
    region_id: RegionId,
) -> Result<A, JoinError>
where
    A: Actor,
    F: FnMut() -> A,
{
    use crate::cx::scope::CatchUnwind;
    use crate::supervision::SupervisionDecision;
    use crate::types::Outcome;
    let mut current_actor = initial_actor;
    loop {
        // Catch panics so the supervisor can decide the failure's fate.
        let result = CatchUnwind {
            inner: Box::pin(run_actor_loop(current_actor, cx.clone(), cell)),
        }
        .await;
        match result {
            Ok(actor_final) => {
                return Ok(actor_final);
            }
            Err(payload) => {
                let msg = crate::cx::scope::payload_to_string(&payload);
                let panic_payload = crate::types::PanicPayload::new(msg);
                cx.trace("supervised_actor::failure");
                // Never restart during shutdown or after cancellation.
                if cell.state.load() == ActorState::Stopping || cx.checkpoint().is_err() {
                    cx.trace("supervised_actor::shutdown_panic");
                    return Err(JoinError::Panicked(panic_payload));
                }
                let outcome = Outcome::Err(());
                let now = supervised_restart_timestamp(&cx);
                let decision = supervisor.on_failure(task_id, region_id, None, &outcome, now);
                match decision {
                    SupervisionDecision::Restart { delay, .. } => {
                        cx.trace("supervised_actor::restart");
                        if cell.state.load() == ActorState::Stopping {
                            cx.trace("supervised_actor::restart_suppressed");
                            return Err(JoinError::Panicked(panic_payload));
                        }
                        if let Some(backoff) = delay {
                            wait_supervised_restart_delay(&cx, backoff).await?;
                        }
                        // Re-check after the backoff: a stop or cancellation
                        // may have arrived while sleeping.
                        if cell.state.load() == ActorState::Stopping || cx.checkpoint().is_err() {
                            cx.trace("supervised_actor::restart_suppressed");
                            return Err(JoinError::Panicked(panic_payload));
                        }
                        cell.state.store(ActorState::Created);
                        current_actor = factory();
                    }
                    SupervisionDecision::Stop { .. } => {
                        cx.trace("supervised_actor::stopped");
                        return Err(JoinError::Panicked(panic_payload));
                    }
                    SupervisionDecision::Escalate { .. } => {
                        cx.trace("supervised_actor::escalated");
                        return Err(JoinError::Panicked(panic_payload));
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cx::macaroon::MacaroonToken;
use crate::cx::registry::{RegistryCap, RegistryHandle};
use crate::remote::{NodeId, RemoteCap};
use crate::runtime::state::RuntimeState;
use crate::security::key::AuthKey;
use crate::types::Budget;
use crate::types::SystemPressure;
use crate::types::policy::FailFast;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
    // Shared per-test setup: logging plus a phase marker.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }
    // Waker that counts wake calls instead of scheduling anything.
    fn counting_waker(counter: Arc<std::sync::atomic::AtomicUsize>) -> Waker {
        struct CountingWaker {
            counter: Arc<std::sync::atomic::AtomicUsize>,
        }
        impl std::task::Wake for CountingWaker {
            fn wake(self: Arc<Self>) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
            fn wake_by_ref(self: &Arc<Self>) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
        }
        Waker::from(Arc::new(CountingWaker { counter }))
    }
    // Minimal test actor: sums incoming u64 messages and records lifecycle flags.
    #[derive(Debug)]
    struct Counter {
        count: u64,
        started: bool,
        stopped: bool,
    }
    impl Counter {
        fn new() -> Self {
            Self {
                count: 0,
                started: false,
                stopped: false,
            }
        }
    }
    impl Actor for Counter {
        type Message = u64;
        fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.started = true;
            Box::pin(async {})
        }
        fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.count += msg;
            Box::pin(async {})
        }
        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.stopped = true;
            Box::pin(async {})
        }
    }
    // Compile-time check that a type satisfies the `Actor` bounds.
    fn assert_actor<A: Actor>() {}
    // Flags recording which parent capabilities the child Cx inherited by
    // identity (pointer equality), captured from inside the actor.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct CapabilitySnapshot {
        same_registry: bool,
        same_remote: bool,
        same_io: bool,
        same_pressure: bool,
        same_macaroon: bool,
        has_timer: bool,
    }
    // Actor that snapshots its Cx's capabilities in `on_start`.
    struct CapabilityProbeActor {
        snapshot: Arc<parking_lot::Mutex<Option<CapabilitySnapshot>>>,
        expected_registry: Arc<dyn RegistryCap>,
        expected_remote_node: String,
        expected_io: Arc<dyn crate::io::IoCap>,
        expected_pressure: Arc<SystemPressure>,
        expected_macaroon: Arc<MacaroonToken>,
    }
    impl Actor for CapabilityProbeActor {
        type Message = ();
        fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let child_registry = cx
                .registry_handle()
                .expect("actor child Cx must inherit registry")
                .as_arc();
            let child_io = cx
                .io_cap_handle()
                .expect("actor child Cx must inherit io capability");
            let child_pressure = cx
                .pressure_handle()
                .expect("actor child Cx must inherit system pressure");
            let child_macaroon = cx
                .macaroon_handle()
                .expect("actor child Cx must inherit macaroon");
            let remote_node = cx
                .remote()
                .map(|remote| remote.local_node().as_str().to_owned());
            // Compare by Arc identity so a clone of the capability would fail.
            *self.snapshot.lock() = Some(CapabilitySnapshot {
                same_registry: Arc::ptr_eq(&child_registry, &self.expected_registry),
                same_remote: remote_node.as_deref() == Some(self.expected_remote_node.as_str()),
                same_io: Arc::ptr_eq(&child_io, &self.expected_io),
                same_pressure: Arc::ptr_eq(&child_pressure, &self.expected_pressure),
                same_macaroon: Arc::ptr_eq(&child_macaroon, &self.expected_macaroon),
                has_timer: cx.has_timer(),
            });
            Box::pin(async {})
        }
        fn handle(&mut self, _cx: &Cx, _msg: ()) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            Box::pin(async {})
        }
    }
    // Builds a parent Cx carrying every optional capability (registry, remote,
    // io, pressure, macaroon, timer driver) and returns the originals so tests
    // can compare identities against what the child inherited.
    fn capability_rich_parent_cx(
        runtime: &crate::lab::LabRuntime,
        region: crate::types::RegionId,
    ) -> (
        Cx,
        Arc<dyn RegistryCap>,
        Arc<dyn crate::io::IoCap>,
        Arc<SystemPressure>,
        Arc<MacaroonToken>,
    ) {
        let registry = crate::cx::NameRegistry::new();
        let registry_handle = RegistryHandle::new(Arc::new(registry));
        let registry_arc = registry_handle.as_arc();
        let io_cap: Arc<dyn crate::io::IoCap> = Arc::new(crate::io::LabIoCap::new());
        let pressure = Arc::new(SystemPressure::with_headroom(0.25));
        let macaroon_token =
            MacaroonToken::mint(&AuthKey::from_seed(7), "scope:actor", "actor/tests");
        let parent_cx = Cx::new_with_drivers(
            region,
            crate::types::TaskId::new_for_test(77, 0),
            Budget::INFINITE,
            None,
            None,
            Some(Arc::clone(&io_cap)),
            runtime.state.timer_driver_handle(),
            None,
        )
        .with_registry_handle(Some(registry_handle))
        .with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("actor-origin")))
        .with_pressure(Arc::clone(&pressure))
        .with_macaroon(macaroon_token);
        let macaroon = parent_cx
            .macaroon_handle()
            .expect("parent actor test Cx must retain macaroon");
        (parent_cx, registry_arc, io_cap, pressure, macaroon)
    }
#[test]
fn actor_trait_object_safety() {
init_test("actor_trait_object_safety");
assert_actor::<Counter>();
crate::test_complete!("actor_trait_object_safety");
}
#[test]
fn actor_handle_creation() {
init_test("actor_handle_creation");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let result = scope.spawn_actor(&mut state, &cx, Counter::new(), 32);
assert!(result.is_ok(), "spawn_actor should succeed");
let (handle, stored) = result.unwrap();
state.store_spawned_task(handle.task_id(), stored);
let _tid = handle.task_id();
assert!(!handle.is_finished());
crate::test_complete!("actor_handle_creation");
}
    // The child Cx built for a plain actor must inherit the parent's
    // capabilities by identity.
    #[test]
    fn spawn_actor_inherits_child_cx_capabilities() {
        init_test("spawn_actor_inherits_child_cx_capabilities");
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
        let (parent_cx, registry_arc, io_cap, pressure, macaroon) =
            capability_rich_parent_cx(&runtime, region);
        let snapshot = Arc::new(parking_lot::Mutex::new(None));
        let actor = CapabilityProbeActor {
            snapshot: Arc::clone(&snapshot),
            expected_registry: registry_arc,
            expected_remote_node: "actor-origin".to_string(),
            expected_io: io_cap,
            expected_pressure: pressure,
            expected_macaroon: macaroon,
        };
        let (handle, stored) = scope
            .spawn_actor(&mut runtime.state, &parent_cx, actor, 8)
            .expect("spawn actor");
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);
        runtime.scheduler.lock().schedule(task_id, 0);
        runtime.run_until_idle();
        let observed = snapshot
            .lock()
            .clone()
            .expect("actor on_start should capture inherited capabilities");
        assert_eq!(
            observed,
            CapabilitySnapshot {
                same_registry: true,
                same_remote: true,
                same_io: true,
                same_pressure: true,
                same_macaroon: true,
                has_timer: true,
            }
        );
        drop(handle);
        runtime.run_until_quiescent();
        crate::test_complete!("spawn_actor_inherits_child_cx_capabilities");
    }
    // Same inheritance guarantee for the supervised spawn path.
    #[test]
    fn spawn_supervised_actor_inherits_child_cx_capabilities() {
        init_test("spawn_supervised_actor_inherits_child_cx_capabilities");
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
        let (parent_cx, registry_arc, io_cap, pressure, macaroon) =
            capability_rich_parent_cx(&runtime, region);
        let snapshot = Arc::new(parking_lot::Mutex::new(None));
        let snapshot_for_factory = Arc::clone(&snapshot);
        let strategy = crate::supervision::SupervisionStrategy::Stop;
        let (handle, stored) = scope
            .spawn_supervised_actor(
                &mut runtime.state,
                &parent_cx,
                move || CapabilityProbeActor {
                    snapshot: Arc::clone(&snapshot_for_factory),
                    expected_registry: Arc::clone(&registry_arc),
                    expected_remote_node: "actor-origin".to_string(),
                    expected_io: Arc::clone(&io_cap),
                    expected_pressure: Arc::clone(&pressure),
                    expected_macaroon: Arc::clone(&macaroon),
                },
                strategy,
                8,
            )
            .expect("spawn supervised actor");
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);
        runtime.scheduler.lock().schedule(task_id, 0);
        runtime.run_until_idle();
        let observed = snapshot
            .lock()
            .clone()
            .expect("supervised actor on_start should capture inherited capabilities");
        assert_eq!(
            observed,
            CapabilitySnapshot {
                same_registry: true,
                same_remote: true,
                same_io: true,
                same_pressure: true,
                same_macaroon: true,
                has_timer: true,
            }
        );
        drop(handle);
        runtime.run_until_quiescent();
        crate::test_complete!("spawn_supervised_actor_inherits_child_cx_capabilities");
    }
#[test]
fn actor_id_generation_distinct() {
init_test("actor_id_generation_distinct");
let id1 = ActorId::from_task(TaskId::new_for_test(1, 1));
let id2 = ActorId::from_task(TaskId::new_for_test(1, 2));
assert!(id1 != id2, "generation must distinguish actor reuse");
crate::test_complete!("actor_id_generation_distinct");
}
#[test]
fn actor_ref_is_cloneable() {
init_test("actor_ref_is_cloneable");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_actor(&mut state, &cx, Counter::new(), 32)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
let ref1 = handle.sender();
let ref2 = ref1.clone();
assert_eq!(ref1.actor_id(), handle.actor_id());
assert_eq!(ref2.actor_id(), handle.actor_id());
assert!(ref1.is_alive());
assert!(ref2.is_alive());
assert!(!ref1.is_closed());
assert!(!ref2.is_closed());
crate::test_complete!("actor_ref_is_cloneable");
}
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
    // Counter variant whose lifecycle is observable from outside the task
    // via shared atomics (final count captured in `on_stop`).
    struct ObservableCounter {
        count: u64,
        on_stop_count: Arc<AtomicU64>,
        started: Arc<AtomicBool>,
        stopped: Arc<AtomicBool>,
    }
    impl ObservableCounter {
        fn new(
            on_stop_count: Arc<AtomicU64>,
            started: Arc<AtomicBool>,
            stopped: Arc<AtomicBool>,
        ) -> Self {
            Self {
                count: 0,
                on_stop_count,
                started,
                stopped,
            }
        }
    }
    impl Actor for ObservableCounter {
        type Message = u64;
        fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.started.store(true, Ordering::SeqCst);
            Box::pin(async {})
        }
        fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.count += msg;
            Box::pin(async {})
        }
        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.on_stop_count.store(self.count, Ordering::SeqCst);
            self.stopped.store(true, Ordering::SeqCst);
            Box::pin(async {})
        }
    }
    // Fresh (count, started, stopped) atomics; count starts at MAX as a
    // sentinel so an unwritten value is distinguishable from zero.
    fn observable_state() -> (Arc<AtomicU64>, Arc<AtomicBool>, Arc<AtomicBool>) {
        (
            Arc::new(AtomicU64::new(u64::MAX)),
            Arc::new(AtomicBool::new(false)),
            Arc::new(AtomicBool::new(false)),
        )
    }
#[test]
fn actor_processes_all_messages() {
init_test("actor_processes_all_messages");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (on_stop_count, started, stopped) = observable_state();
let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
let (handle, stored) = scope
.spawn_actor(&mut runtime.state, &cx, actor, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
for _ in 0..5 {
handle.try_send(1).unwrap();
}
drop(handle);
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_quiescent();
assert_eq!(
on_stop_count.load(Ordering::SeqCst),
5,
"all messages processed"
);
assert!(started.load(Ordering::SeqCst), "on_start was called");
assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
crate::test_complete!("actor_processes_all_messages");
}
#[test]
// stop() must seal the mailbox immediately (rejecting new sends) while
// still letting the actor drain every message already queued.
fn actor_drains_mailbox_on_cancel() {
init_test("actor_drains_mailbox_on_cancel");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (on_stop_count, started, stopped) = observable_state();
let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
let (handle, stored) = scope
.spawn_actor(&mut runtime.state, &cx, actor, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
for _ in 0..5 {
handle.try_send(1).unwrap();
}
// Request shutdown before the actor has run at all; the five messages
// above are already in the mailbox.
handle.stop();
let stopped_ref = handle.sender();
assert!(
stopped_ref.is_closed(),
"stop() seals the mailbox immediately"
);
assert!(
matches!(handle.try_send(99), Err(SendError::Disconnected(99))),
"stop() must reject new messages instead of extending shutdown"
);
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_quiescent();
// Only the pre-stop messages count; the rejected `99` must not appear.
assert_eq!(
on_stop_count.load(Ordering::SeqCst),
5,
"drain processed all messages"
);
assert!(started.load(Ordering::SeqCst), "on_start was called");
assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
crate::test_complete!("actor_drains_mailbox_on_cancel");
}
#[test]
// ActorRef::is_alive must track the lifecycle: alive when created, still
// alive while stopping, and dead only once the actor has finished.
fn actor_ref_is_alive_transitions() {
init_test("actor_ref_is_alive_transitions");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (on_stop_count, started, stopped) = observable_state();
let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
let (handle, stored) = scope
.spawn_actor(&mut runtime.state, &cx, actor, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
let actor_ref = handle.sender();
assert!(actor_ref.is_alive(), "created actor should be alive");
assert_eq!(actor_ref.actor_id(), handle.actor_id());
handle.stop();
// Stopping is a transitional state: the actor has not finished yet.
assert!(actor_ref.is_alive(), "stopping actor is still alive");
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_quiescent();
assert!(
handle.is_finished(),
"actor should be finished after stop + run"
);
assert!(!actor_ref.is_alive(), "finished actor is not alive");
assert!(started.load(Ordering::SeqCst), "on_start was called");
assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
// observable_state seeds the counter with u64::MAX, so any other value
// proves on_stop actually wrote the final count.
assert_ne!(
on_stop_count.load(Ordering::SeqCst),
u64::MAX,
"on_stop_count updated"
);
crate::test_complete!("actor_ref_is_alive_transitions");
}
#[test]
// Dropping an un-awaited join future must behave like ActorHandle::abort:
// the actor transitions to Stopping and the mailbox is sealed at once.
fn dropped_join_future_marks_actor_stopping_like_abort() {
init_test("dropped_join_future_marks_actor_stopping_like_abort");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (on_stop_count, started, stopped) = observable_state();
let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
let (mut handle, stored) = scope
.spawn_actor(&mut runtime.state, &cx, actor, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
// Let the actor start and reach Running before exercising the drop path.
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_idle();
assert_eq!(
handle.state.load(),
ActorState::Running,
"actor should be running before join drop requests abort"
);
// Create the join future and drop it immediately; this exercises its
// Drop impl, which is expected to request an abort.
drop(handle.join(&cx));
assert_eq!(
handle.state.load(),
ActorState::Stopping,
"dropping join future should mirror ActorHandle::abort state transition"
);
assert!(
matches!(handle.try_send(1), Err(SendError::Disconnected(1))),
"join-drop abort must seal the mailbox immediately"
);
runtime.run_until_quiescent();
assert!(
handle.is_finished(),
"actor should stop after join future drop"
);
assert!(started.load(Ordering::SeqCst), "on_start should have run");
assert!(stopped.load(Ordering::SeqCst), "on_stop should have run");
// No messages were ever accepted, so the drained count must be zero.
assert_eq!(
on_stop_count.load(Ordering::SeqCst),
0,
"idle actor should stop without processing phantom messages"
);
crate::test_complete!("dropped_join_future_marks_actor_stopping_like_abort");
}
#[test]
// A sender blocked on a full mailbox must be woken by stop() and then
// observe Disconnected rather than hanging forever.
fn actor_stop_unblocks_pending_sender_with_disconnect() {
init_test("actor_stop_unblocks_pending_sender_with_disconnect");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
// Capacity 1 so a single message fills the mailbox.
let (handle, stored) = scope
.spawn_actor(&mut state, &cx, Counter::new(), 1)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
handle.try_send(1).expect("fill mailbox");
let sender = handle.sender();
let mut send_fut = Box::pin(sender.send(&cx, 2));
// counting_waker records how many times the blocked send is woken.
let wake_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let waker = counting_waker(Arc::clone(&wake_count));
let mut task_cx = Context::from_waker(&waker);
let first_poll = send_fut.as_mut().poll(&mut task_cx);
assert!(
matches!(first_poll, Poll::Pending),
"send should wait while the mailbox is full"
);
handle.stop();
assert_eq!(
wake_count.load(Ordering::SeqCst),
1,
"stop() must wake a sender blocked on mailbox capacity"
);
// After the wake, re-polling must complete with the unsent message
// handed back inside the Disconnected error.
let second_poll = send_fut.as_mut().poll(&mut task_cx);
assert!(
matches!(second_poll, Poll::Ready(Err(SendError::Disconnected(2)))),
"pending sender must fail fast once stop seals the mailbox"
);
crate::test_complete!("actor_stop_unblocks_pending_sender_with_disconnect");
}
#[test]
// A panic in handle() under a Restart strategy must rebuild the actor via
// the factory exactly once and let the replacement keep processing.
fn supervised_actor_panic_restarts_under_restart_strategy() {
use std::sync::atomic::AtomicU32;
// Test actor: accumulates messages but panics (via assert!) when it
// receives the configured sentinel value.
#[derive(Debug)]
struct PanickingCounter {
count: u64,
panic_on: u64,
final_count: Arc<AtomicU64>,
}
impl Actor for PanickingCounter {
type Message = u64;
fn handle(
&mut self,
_cx: &Cx,
msg: u64,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
assert!(msg != self.panic_on, "threshold exceeded: {msg}");
self.count += msg;
Box::pin(async {})
}
fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.final_count.store(self.count, Ordering::SeqCst);
Box::pin(async {})
}
}
init_test("supervised_actor_panic_restarts_under_restart_strategy");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let final_count = Arc::new(AtomicU64::new(u64::MAX));
// restart_count counts factory invocations: initial spawn + restarts.
let restart_count = Arc::new(AtomicU32::new(0));
let fc = final_count.clone();
let rc = restart_count.clone();
let strategy = crate::supervision::SupervisionStrategy::Restart(
crate::supervision::RestartConfig::new(3, std::time::Duration::from_secs(60))
.with_backoff(crate::supervision::BackoffStrategy::None),
);
let (mut handle, stored) = scope
.spawn_supervised_actor(
&mut runtime.state,
&cx,
move || {
rc.fetch_add(1, Ordering::SeqCst);
PanickingCounter {
count: 0,
panic_on: 999,
final_count: fc.clone(),
}
},
strategy,
32,
)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
// Queue: one good message, the panic trigger, then one post-crash message.
handle.try_send(1).unwrap();
handle.try_send(999).unwrap(); handle.try_send(1).unwrap();
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_idle();
handle.abort();
runtime.run_until_quiescent();
let join = futures_lite::future::block_on(handle.join(&cx));
let actor = join.expect("aborting the restarted actor should still return final state");
// Two factory calls == initial construction + exactly one restart.
assert_eq!(
restart_count.load(Ordering::SeqCst),
2,
"panic must trigger exactly one supervised restart, got {} factory calls",
restart_count.load(Ordering::SeqCst)
);
// The replacement actor starts from count 0, so only the post-crash
// message (1) is reflected in its state.
assert_eq!(
actor.count, 1,
"restarted actor should keep the post-crash message count"
);
assert_eq!(
final_count.load(Ordering::SeqCst),
1,
"restarted actor should process the queued post-crash message before abort"
);
crate::test_complete!("supervised_actor_panic_restarts_under_restart_strategy");
}
#[test]
// With no timer driver, the supervisor must fall back to wall-clock time
// so the restart window still expires and the attempt counter resets.
fn supervised_restart_window_expires_without_timer_driver() {
use std::thread;
init_test("supervised_restart_window_expires_without_timer_driver");
let cx = Cx::new(
RegionId::testing_default(),
TaskId::new_for_test(1, 1),
Budget::INFINITE,
);
// Max 1 restart within a 2ms window; no backoff between attempts.
let mut supervisor =
crate::supervision::Supervisor::new(crate::supervision::SupervisionStrategy::Restart(
crate::supervision::RestartConfig::new(1, Duration::from_millis(2))
.with_backoff(crate::supervision::BackoffStrategy::None),
));
let outcome = Outcome::Err(());
let task_id = TaskId::new_for_test(2, 1);
let first = supervisor.on_failure(
task_id,
RegionId::testing_default(),
None,
&outcome,
supervised_restart_timestamp(&cx),
);
assert!(
matches!(
first,
crate::supervision::SupervisionDecision::Restart { attempt: 1, .. }
),
"first failure should allow a restart"
);
// Sleep past the 2ms window using real time; the second failure must be
// counted as attempt 1 again, proving the window expired via wall clock.
thread::sleep(Duration::from_millis(5));
let second = supervisor.on_failure(
task_id,
RegionId::testing_default(),
None,
&outcome,
supervised_restart_timestamp(&cx),
);
assert!(
matches!(
second,
crate::supervision::SupervisionDecision::Restart { attempt: 1, .. }
),
"wall-clock fallback must let the restart window expire without a timer driver"
);
crate::test_complete!("supervised_restart_window_expires_without_timer_driver");
}
#[test]
// An explicit stop() must suppress the Restart strategy: a panic that
// happens while draining after stop surfaces to join instead of
// constructing a replacement actor.
fn supervised_actor_stop_prevents_restart_after_panic() {
use std::sync::atomic::AtomicU32;
// Test actor that panics on the first message it handles.
#[derive(Debug)]
struct StopThenPanicActor;
impl Actor for StopThenPanicActor {
type Message = ();
fn handle(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
panic!("panic during shutdown");
}
}
init_test("supervised_actor_stop_prevents_restart_after_panic");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
// Counts factory invocations; a restart would push this past 1.
let restart_count = Arc::new(AtomicU32::new(0));
let rc = Arc::clone(&restart_count);
let strategy = crate::supervision::SupervisionStrategy::Restart(
crate::supervision::RestartConfig::new(3, Duration::from_secs(60))
.with_backoff(crate::supervision::BackoffStrategy::None),
);
let (mut handle, stored) = scope
.spawn_supervised_actor(
&mut runtime.state,
&cx,
move || {
rc.fetch_add(1, Ordering::SeqCst);
StopThenPanicActor
},
strategy,
8,
)
.expect("spawn supervised actor");
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
// Queue the message that will panic, then request a graceful stop before
// the actor has run; the panic fires during the post-stop drain.
handle.try_send(()).expect("queue panic message");
handle.stop();
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_quiescent();
assert_eq!(
restart_count.load(Ordering::SeqCst),
1,
"explicit stop must suppress supervised restarts"
);
let join = futures_lite::future::block_on(handle.join(&cx));
match join {
Err(JoinError::Panicked(payload)) => {
assert_eq!(
payload.message(),
"panic during shutdown",
"shutdown panic should surface without restarting"
);
}
other => panic!("expected shutdown panic without restart, got {other:?}"),
}
crate::test_complete!("supervised_actor_stop_prevents_restart_after_panic");
}
#[test]
// A graceful stop issued while the supervisor is waiting out a restart
// backoff must cancel the pending restart: no replacement actor is built
// and no lifecycle hooks run for a second instance.
fn supervised_actor_stop_during_restart_backoff_prevents_new_instance() {
use std::sync::atomic::AtomicU32;
// Test actor: counts on_start invocations, then panics on first message.
#[derive(Debug)]
struct DelayedRestartActor {
starts: Arc<AtomicU32>,
}
impl Actor for DelayedRestartActor {
type Message = ();
fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let starts = Arc::clone(&self.starts);
Box::pin(async move {
starts.fetch_add(1, Ordering::SeqCst);
})
}
fn handle(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
panic!("panic before delayed restart");
}
}
init_test("supervised_actor_stop_during_restart_backoff_prevents_new_instance");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let factory_count = Arc::new(AtomicU32::new(0));
let starts = Arc::new(AtomicU32::new(0));
let fc = Arc::clone(&factory_count);
let starts_for_factory = Arc::clone(&starts);
// Fixed 5s backoff guarantees the restart is parked on a timer, giving
// stop() a window to land while the supervisor waits.
let strategy = crate::supervision::SupervisionStrategy::Restart(
crate::supervision::RestartConfig::new(3, Duration::from_secs(60)).with_backoff(
crate::supervision::BackoffStrategy::Fixed(Duration::from_secs(5)),
),
);
let (mut handle, stored) = scope
.spawn_supervised_actor(
&mut runtime.state,
&cx,
move || {
fc.fetch_add(1, Ordering::SeqCst);
DelayedRestartActor {
starts: Arc::clone(&starts_for_factory),
}
},
strategy,
8,
)
.expect("spawn supervised actor");
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
handle.try_send(()).expect("queue panic message");
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_idle();
// The panic happened and the supervisor is now parked on the backoff timer.
assert_eq!(
runtime.pending_timer_count(),
1,
"supervised actor should be waiting on restart backoff"
);
handle.stop();
// Auto-advance drives virtual time forward; the run must end quiescent,
// not stuck waiting on a restart that stop() suppressed.
let report = runtime.run_with_auto_advance();
assert!(
matches!(
report.termination,
crate::lab::AutoAdvanceTermination::Quiescent
),
"runtime should quiesce after stop suppresses restart: {report:?}"
);
assert_eq!(
factory_count.load(Ordering::SeqCst),
1,
"graceful stop during backoff must prevent a replacement actor from being constructed"
);
assert_eq!(
starts.load(Ordering::SeqCst),
1,
"graceful stop during backoff must prevent restarted actor lifecycle hooks from running"
);
let join = futures_lite::future::block_on(handle.join(&cx));
match join {
Err(JoinError::Panicked(payload)) => {
assert_eq!(
payload.message(),
"panic before delayed restart",
"original panic should surface when restart is suppressed"
);
}
other => panic!(
"expected original panic when stop suppresses delayed restart, got {other:?}"
),
}
crate::test_complete!("supervised_actor_stop_during_restart_backoff_prevents_new_instance");
}
#[test]
// A panic inside handle() must surface as Outcome::Panicked from the
// stored task future and as JoinError::Panicked from join, with the
// panic payload preserved through both paths.
fn spawn_actor_panic_surfaces_as_task_outcome() {
init_test("spawn_actor_panic_surfaces_as_task_outcome");
#[derive(Debug)]
struct PanicActor;
impl Actor for PanicActor {
type Message = ();
fn handle(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
panic!("actor boom");
}
}
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (mut handle, mut stored) = scope
.spawn_actor(&mut state, &cx, PanicActor, 8)
.expect("spawn actor");
handle.try_send(()).expect("queue panic message");
// Poll the stored actor task directly, without a scheduler.
let waker = counting_waker(Arc::new(std::sync::atomic::AtomicUsize::new(0)));
let mut poll_cx = Context::from_waker(&waker);
match stored.poll(&mut poll_cx) {
Poll::Ready(Outcome::Panicked(payload)) => {
assert_eq!(payload.message(), "actor boom", "panic payload preserved");
}
other => panic!("panicking actor task must return Outcome::Panicked: {other:?}"),
}
// The handle's join future must report the same panic.
let join = std::pin::pin!(handle.join(&cx));
let mut join = join;
match join.as_mut().poll(&mut poll_cx) {
Poll::Ready(Err(JoinError::Panicked(payload))) => {
assert_eq!(
payload.message(),
"actor boom",
"join preserves panic payload"
);
}
other => panic!("join must surface actor panic: {other:?}"),
}
crate::test_complete!("spawn_actor_panic_surfaces_as_task_outcome");
}
#[test]
// Same contract as the unsupervised variant, but under the Stop strategy:
// a panic must surface as Outcome::Panicked / JoinError::Panicked rather
// than being swallowed by the supervision wrapper.
fn spawn_supervised_actor_panic_surfaces_as_task_outcome() {
init_test("spawn_supervised_actor_panic_surfaces_as_task_outcome");
#[derive(Debug)]
struct PanicActor;
impl Actor for PanicActor {
type Message = ();
fn handle(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
panic!("supervised actor boom");
}
}
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (mut handle, mut stored) = scope
.spawn_supervised_actor(
&mut state,
&cx,
|| PanicActor,
crate::supervision::SupervisionStrategy::Stop,
8,
)
.expect("spawn supervised actor");
handle.try_send(()).expect("queue panic message");
// Poll the stored supervised task directly, without a scheduler.
let waker = counting_waker(Arc::new(std::sync::atomic::AtomicUsize::new(0)));
let mut poll_cx = Context::from_waker(&waker);
match stored.poll(&mut poll_cx) {
Poll::Ready(Outcome::Panicked(payload)) => {
assert_eq!(
payload.message(),
"supervised actor boom",
"panic payload preserved"
);
}
other => {
panic!("panicking supervised actor task must return Outcome::Panicked: {other:?}")
}
}
// join must report the identical panic payload.
let join = std::pin::pin!(handle.join(&cx));
let mut join = join;
match join.as_mut().poll(&mut poll_cx) {
Poll::Ready(Err(JoinError::Panicked(payload))) => {
assert_eq!(
payload.message(),
"supervised actor boom",
"join preserves panic payload"
);
}
other => panic!("join must surface supervised actor panic: {other:?}"),
}
crate::test_complete!("spawn_supervised_actor_panic_surfaces_as_task_outcome");
}
#[test]
// The restart-backoff delay must observe cancellation: on an already
// cancelled Cx the very first poll returns JoinError::Cancelled instead
// of waiting out the (here 60s) delay.
fn supervised_restart_delay_honors_cancellation() {
init_test("supervised_restart_delay_honors_cancellation");
let cx = Cx::for_testing();
// Cancel before the delay future is even created.
cx.cancel_fast(crate::types::CancelKind::User);
let mut delay = std::pin::pin!(wait_supervised_restart_delay(
&cx,
std::time::Duration::from_secs(60),
));
let first_poll =
futures_lite::future::block_on(futures_lite::future::poll_once(&mut delay));
match first_poll {
Some(Err(JoinError::Cancelled(reason))) => {
assert_eq!(reason.kind, crate::types::CancelKind::User);
}
other => panic!("expected immediate cancellation, got {other:?}"),
}
crate::test_complete!("supervised_restart_delay_honors_cancellation");
}
#[test]
// Determinism contract of the lab runtime: running the identical actor
// scenario twice with the same seed must yield the identical result.
fn actor_deterministic_replay() {
// One full actor run: queue 1..=10, drain, return the on_stop total.
fn run_scenario(seed: u64) -> u64 {
let config = crate::lab::LabConfig::new(seed);
let mut runtime = crate::lab::LabRuntime::new(config);
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (on_stop_count, started, stopped) = observable_state();
let actor = ObservableCounter::new(on_stop_count.clone(), started, stopped);
let (handle, stored) = scope
.spawn_actor(&mut runtime.state, &cx, actor, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
for i in 1..=10 {
handle.try_send(i).unwrap();
}
// Close the mailbox so the actor drains and stops.
drop(handle);
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_quiescent();
on_stop_count.load(Ordering::SeqCst)
}
init_test("actor_deterministic_replay");
let result1 = run_scenario(0xDEAD_BEEF);
let result2 = run_scenario(0xDEAD_BEEF);
assert_eq!(
result1, result2,
"deterministic replay: same seed → same result"
);
assert_eq!(result1, 55, "sum of 1..=10");
crate::test_complete!("actor_deterministic_replay");
}
#[test]
// ActorContext must report the actor's own id through both accessors
// (self_actor_id and actor_id).
fn actor_context_self_reference() {
init_test("actor_context_self_reference");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx: Cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_actor(&mut state, &cx, Counter::new(), 32)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
// Build a context directly from the spawned handle's ref and id,
// with no parent supervisor.
let actor_ref = handle.sender();
let actor_id = handle.actor_id();
let ctx: ActorContext<'_, u64> = ActorContext::new(&cx, actor_ref, actor_id, None);
assert_eq!(ctx.self_actor_id(), actor_id);
assert_eq!(ctx.actor_id(), actor_id);
crate::test_complete!("actor_context_self_reference");
}
#[test]
fn actor_context_child_management() {
    init_test("actor_context_child_management");
    let cx: Cx = Cx::for_testing();
    let (sender, _receiver) = mpsc::channel::<u64>(32);
    let self_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let self_ref = ActorRef {
        actor_id: self_id,
        sender,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };
    let mut ctx = ActorContext::new(&cx, self_ref, self_id, None);
    // A fresh context tracks no children.
    assert!(!ctx.has_children());
    assert_eq!(ctx.child_count(), 0);
    assert!(ctx.children().is_empty());
    // Registration grows the set; unregistration shrinks it and reports
    // whether anything was actually removed.
    let first_child = ActorId::from_task(TaskId::new_for_test(2, 1));
    let second_child = ActorId::from_task(TaskId::new_for_test(3, 1));
    ctx.register_child(first_child);
    assert!(ctx.has_children());
    assert_eq!(ctx.child_count(), 1);
    ctx.register_child(second_child);
    assert_eq!(ctx.child_count(), 2);
    assert!(ctx.unregister_child(first_child));
    assert_eq!(ctx.child_count(), 1);
    assert!(!ctx.unregister_child(first_child));
    crate::test_complete!("actor_context_child_management");
}
#[test]
// stop_self cancels at the actor level only: the context checkpoint fails
// afterwards while the underlying Cx checkpoint still succeeds.
fn actor_context_stopping() {
init_test("actor_context_stopping");
let cx: Cx = Cx::for_testing();
let (sender, _receiver) = mpsc::channel::<u64>(32);
let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
let actor_ref = ActorRef {
actor_id,
sender,
state: Arc::new(ActorStateCell::new(ActorState::Running)),
};
let mut ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
assert!(!ctx.is_stopping());
assert!(ctx.checkpoint().is_ok());
ctx.stop_self();
assert!(ctx.is_stopping());
// Context-level checkpoint now fails, but the shared Cx is untouched.
assert!(ctx.checkpoint().is_err());
assert!(cx.checkpoint().is_ok());
assert!(ctx.is_cancel_requested());
crate::test_complete!("actor_context_stopping");
}
#[test]
fn actor_context_parent_none() {
    init_test("actor_context_parent_none");
    let cx: Cx = Cx::for_testing();
    let (sender, _receiver) = mpsc::channel::<u64>(32);
    let self_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let self_ref = ActorRef {
        actor_id: self_id,
        sender,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };
    // A root actor constructed without a supervisor reports no parent.
    let ctx = ActorContext::new(&cx, self_ref, self_id, None);
    assert!(!ctx.has_parent());
    assert!(ctx.parent().is_none());
    crate::test_complete!("actor_context_parent_none");
}
#[test]
fn actor_context_cx_delegation() {
    init_test("actor_context_cx_delegation");
    let cx: Cx = Cx::for_testing();
    let (sender, _receiver) = mpsc::channel::<u64>(32);
    let self_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let self_ref = ActorRef {
        actor_id: self_id,
        sender,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };
    let ctx = ActorContext::new(&cx, self_ref, self_id, None);
    // Smoke-test the Cx pass-through accessors; none of these may panic.
    let _budget = ctx.budget();
    ctx.trace("test_event");
    let _cx_ref = ctx.cx();
    crate::test_complete!("actor_context_cx_delegation");
}
#[test]
fn actor_context_debug() {
    init_test("actor_context_debug");
    let cx: Cx = Cx::for_testing();
    let (sender, _receiver) = mpsc::channel::<u64>(32);
    let self_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let self_ref = ActorRef {
        actor_id: self_id,
        sender,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };
    let ctx = ActorContext::new(&cx, self_ref, self_id, None);
    // Debug output must name the type and expose the actor_id field.
    let rendered = format!("{ctx:?}");
    assert!(rendered.contains("ActorContext"));
    assert!(rendered.contains("actor_id"));
    crate::test_complete!("actor_context_debug");
}
#[test]
fn actor_state_cell_encode_decode_roundtrip() {
    init_test("actor_state_cell_encode_decode_roundtrip");
    // Every known state must survive a construct/load roundtrip.
    for state in [
        ActorState::Created,
        ActorState::Running,
        ActorState::Stopping,
        ActorState::Stopped,
    ] {
        let loaded = ActorStateCell::new(state).load();
        crate::assert_with_log!(loaded == state, "roundtrip", state, loaded);
    }
    // Unknown discriminants decode conservatively to Stopped.
    for raw in 4_u8..=10 {
        let is_stopped = ActorStateCell::decode(raw) == ActorState::Stopped;
        crate::assert_with_log!(is_stopped, "unknown u8 -> Stopped", true, is_stopped);
    }
    crate::test_complete!("actor_state_cell_encode_decode_roundtrip");
}
#[test]
fn mailbox_config_defaults() {
    init_test("mailbox_config_defaults");
    // Default config: documented capacity with backpressure enabled.
    let defaults = MailboxConfig::default();
    crate::assert_with_log!(
        defaults.capacity == DEFAULT_MAILBOX_CAPACITY,
        "default capacity",
        DEFAULT_MAILBOX_CAPACITY,
        defaults.capacity
    );
    crate::assert_with_log!(
        defaults.backpressure,
        "backpressure enabled by default",
        true,
        defaults.backpressure
    );
    // with_capacity overrides the size but keeps backpressure on.
    let sized = MailboxConfig::with_capacity(8);
    crate::assert_with_log!(
        sized.capacity == 8,
        "custom capacity",
        8usize,
        sized.capacity
    );
    crate::assert_with_log!(
        sized.backpressure,
        "with_capacity enables backpressure",
        true,
        sized.backpressure
    );
    crate::test_complete!("mailbox_config_defaults");
}
#[test]
fn actor_try_send_full_mailbox_returns_error() {
    init_test("actor_try_send_full_mailbox_returns_error");
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
    // Capacity-2 mailbox: the first two sends fit, the third must not.
    let (handle, stored) = scope
        .spawn_actor(&mut state, &cx, Counter::new(), 2)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);
    let first_ok = handle.try_send(1).is_ok();
    crate::assert_with_log!(first_ok, "first send ok", true, first_ok);
    let second_ok = handle.try_send(2).is_ok();
    crate::assert_with_log!(second_ok, "second send ok", true, second_ok);
    let overflow = handle.try_send(3);
    let rejected = overflow.is_err();
    crate::assert_with_log!(rejected, "third send fails (full)", true, rejected);
    crate::test_complete!("actor_try_send_full_mailbox_returns_error");
}
#[test]
fn actor_context_with_parent_supervisor() {
    init_test("actor_context_with_parent_supervisor");
    let cx: Cx = Cx::for_testing();
    // Build a running parent supervisor reference...
    let (parent_sender, _parent_receiver) = mpsc::channel::<SupervisorMessage>(8);
    let parent_id = ActorId::from_task(TaskId::new_for_test(10, 1));
    let parent_ref = ActorRef {
        actor_id: parent_id,
        sender: parent_sender,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };
    // ...and a child context wired to it.
    let (child_sender, _child_receiver) = mpsc::channel::<u64>(32);
    let child_id = ActorId::from_task(TaskId::new_for_test(20, 1));
    let child_ref = ActorRef {
        actor_id: child_id,
        sender: child_sender,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };
    let ctx = ActorContext::new(&cx, child_ref, child_id, Some(parent_ref));
    let has_parent = ctx.has_parent();
    crate::assert_with_log!(has_parent, "has parent", true, has_parent);
    let supervisor = ctx.parent().expect("parent should be Some");
    let parent_id_matches = supervisor.actor_id() == parent_id;
    crate::assert_with_log!(
        parent_id_matches,
        "parent id matches",
        true,
        parent_id_matches
    );
    crate::test_complete!("actor_context_with_parent_supervisor");
}
#[test]
fn actor_id_debug_format() {
    // Debug output must identify the newtype wrapper, not just the TaskId.
    let rendered = format!("{:?}", ActorId::from_task(TaskId::new_for_test(5, 3)));
    assert!(rendered.contains("ActorId"), "{rendered}");
}
#[test]
fn actor_id_display_delegates_to_task_id() {
    // Display formatting is a pure pass-through to the inner TaskId.
    let task = TaskId::new_for_test(7, 2);
    let actor = ActorId::from_task(task);
    assert_eq!(actor.to_string(), task.to_string());
}
#[test]
fn actor_id_from_task_roundtrip() {
    // from_task followed by task_id must be the identity.
    let task = TaskId::new_for_test(3, 1);
    assert_eq!(ActorId::from_task(task).task_id(), task);
}
#[test]
fn actor_id_copy_clone() {
    // Both the Copy path and the (implicit) Clone path preserve equality.
    let original = ActorId::from_task(TaskId::new_for_test(1, 1));
    let by_copy = original;
    let by_clone = original;
    assert_eq!(original, by_copy);
    assert_eq!(original, by_clone);
}
#[test]
fn actor_id_hash_consistency() {
    use crate::util::DetHasher;
    use std::hash::{Hash, Hasher};
    // Equal ActorIds must produce identical digests under the
    // deterministic hasher (Eq/Hash consistency contract).
    let left = ActorId::from_task(TaskId::new_for_test(4, 2));
    let right = ActorId::from_task(TaskId::new_for_test(4, 2));
    assert_eq!(left, right);
    let digest = |id: &ActorId| {
        let mut hasher = DetHasher::default();
        id.hash(&mut hasher);
        hasher.finish()
    };
    assert_eq!(digest(&left), digest(&right), "equal IDs must hash equal");
}
#[test]
fn actor_state_debug_all_variants() {
    // Each variant's Debug rendering is exactly its name.
    let cases = [
        (ActorState::Created, "Created"),
        (ActorState::Running, "Running"),
        (ActorState::Stopping, "Stopping"),
        (ActorState::Stopped, "Stopped"),
    ];
    for (state, expected) in cases {
        assert_eq!(format!("{state:?}"), expected, "ActorState::{expected}");
    }
}
#[test]
fn actor_state_clone_copy_eq() {
    // Copy and Clone both yield values equal to the source.
    let source = ActorState::Running;
    let by_copy = source;
    let by_clone = source;
    assert_eq!(source, by_copy);
    assert_eq!(source, by_clone);
}
#[test]
fn actor_state_exhaustive_inequality() {
    // PartialEq must hold exactly on the diagonal: a variant equals itself
    // and differs from every other variant.
    let all = [
        ActorState::Created,
        ActorState::Running,
        ActorState::Stopping,
        ActorState::Stopped,
    ];
    for (i, left) in all.iter().enumerate() {
        for (j, right) in all.iter().enumerate() {
            assert_eq!(i == j, left == right, "{left:?} vs {right:?}");
        }
    }
}
#[test]
fn actor_state_cell_sequential_transitions() {
    // Walk the full lifecycle; every store must be observed by the next load.
    let cell = ActorStateCell::new(ActorState::Created);
    assert_eq!(cell.load(), ActorState::Created);
    for next in [ActorState::Running, ActorState::Stopping, ActorState::Stopped] {
        cell.store(next);
        assert_eq!(cell.load(), next);
    }
}
#[test]
fn supervisor_message_debug_child_failed() {
    // Debug output must name the variant and carry the failure reason.
    let rendered = format!(
        "{:?}",
        SupervisorMessage::ChildFailed {
            child_id: ActorId::from_task(TaskId::new_for_test(1, 1)),
            reason: "panicked".to_string(),
        }
    );
    assert!(rendered.contains("ChildFailed"), "{rendered}");
    assert!(rendered.contains("panicked"), "{rendered}");
}
#[test]
fn supervisor_message_debug_child_stopped() {
    // Debug output must name the ChildStopped variant.
    let rendered = format!(
        "{:?}",
        SupervisorMessage::ChildStopped {
            child_id: ActorId::from_task(TaskId::new_for_test(2, 1)),
        }
    );
    assert!(rendered.contains("ChildStopped"), "{rendered}");
}
#[test]
fn supervisor_message_clone() {
    // A clone must render identically to its source.
    let original = SupervisorMessage::ChildFailed {
        child_id: ActorId::from_task(TaskId::new_for_test(1, 1)),
        reason: "boom".to_string(),
    };
    let duplicate = original.clone();
    assert_eq!(format!("{original:?}"), format!("{duplicate:?}"));
}
#[test]
fn supervised_outcome_debug_all_variants() {
    // Every variant renders non-empty Debug output carrying its key detail.
    let stopped = SupervisedOutcome::Stopped;
    let exhausted = SupervisedOutcome::RestartBudgetExhausted { total_restarts: 5 };
    let escalated = SupervisedOutcome::Escalated;
    for rendered in [
        format!("{stopped:?}"),
        format!("{exhausted:?}"),
        format!("{escalated:?}"),
    ] {
        assert!(!rendered.is_empty());
    }
    assert!(format!("{stopped:?}").contains("Stopped"));
    assert!(format!("{exhausted:?}").contains('5'));
    assert!(format!("{escalated:?}").contains("Escalated"));
}
#[test]
fn mailbox_config_debug_clone_copy() {
    let cfg = MailboxConfig::default();
    // Debug output names the type and shows the default capacity (64).
    let rendered = format!("{cfg:?}");
    assert!(rendered.contains("MailboxConfig"), "{rendered}");
    assert!(rendered.contains("64"), "{rendered}");
    // Copy and Clone preserve both fields.
    let by_copy = cfg;
    let by_clone = cfg;
    assert_eq!(by_copy.capacity, cfg.capacity);
    assert_eq!(by_clone.backpressure, cfg.backpressure);
}
#[test]
fn mailbox_config_zero_capacity() {
    // A zero-capacity mailbox is representable and keeps backpressure on.
    let cfg = MailboxConfig::with_capacity(0);
    assert_eq!(cfg.capacity, 0);
    assert!(cfg.backpressure);
}
#[test]
fn mailbox_config_max_capacity() {
    // with_capacity must accept the largest representable capacity.
    let cfg = MailboxConfig::with_capacity(usize::MAX);
    assert_eq!(cfg.capacity, usize::MAX);
}
#[test]
fn default_mailbox_capacity_is_64() {
    // Pin the documented default so a silent change fails this test.
    assert_eq!(DEFAULT_MAILBOX_CAPACITY, 64);
}
#[test]
fn actor_context_duplicate_child_registration() {
    // register_child intentionally does not dedup: registering the same
    // child twice requires two successful unregistrations.
    let cx: Cx = Cx::for_testing();
    let (sender, _receiver) = mpsc::channel::<u64>(32);
    let self_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let self_ref = ActorRef {
        actor_id: self_id,
        sender,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };
    let mut ctx = ActorContext::new(&cx, self_ref, self_id, None);
    let child = ActorId::from_task(TaskId::new_for_test(2, 1));
    ctx.register_child(child);
    ctx.register_child(child);
    assert_eq!(ctx.child_count(), 2, "register_child does not dedup");
    assert!(ctx.unregister_child(child));
    assert_eq!(ctx.child_count(), 1, "one copy remains");
    assert!(ctx.unregister_child(child));
    assert_eq!(ctx.child_count(), 0);
    assert!(!ctx.unregister_child(child), "nothing left to remove");
}
#[test]
fn actor_context_stop_self_is_idempotent() {
    let cx: Cx = Cx::for_testing();
    let (sender, _receiver) = mpsc::channel::<u64>(32);
    let self_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let self_ref = ActorRef {
        actor_id: self_id,
        sender,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };
    let mut ctx = ActorContext::new(&cx, self_ref, self_id, None);
    // Repeated stop_self calls must keep reporting a stopping context
    // without panicking or flipping the flag back.
    for _ in 0..2 {
        ctx.stop_self();
        assert!(ctx.is_stopping());
    }
}
#[test]
fn actor_context_self_ref_returns_working_ref() {
    let cx: Cx = Cx::for_testing();
    let (sender, _receiver) = mpsc::channel::<u64>(32);
    let self_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let handle_ref = ActorRef {
        actor_id: self_id,
        sender,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };
    let ctx = ActorContext::new(&cx, handle_ref, self_id, None);
    // self_ref must hand back a live reference carrying the context's own id.
    let me = ctx.self_ref();
    assert_eq!(me.actor_id(), self_id);
    assert!(me.is_alive());
}
#[test]
fn actor_context_deadline_reflects_budget() {
    let cx: Cx = Cx::for_testing();
    let (sender, _receiver) = mpsc::channel::<u64>(32);
    let self_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let self_ref = ActorRef {
        actor_id: self_id,
        sender,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };
    let ctx = ActorContext::new(&cx, self_ref, self_id, None);
    // The testing Cx carries no deadline, so none should surface here.
    assert!(ctx.deadline().is_none());
}
#[test]
fn actor_handle_debug() {
    // The Debug representation of an ActorHandle should name the type.
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
    let (handle, stored) = scope
        .spawn_actor(&mut state, &cx, Counter::new(), 32)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    let rendered = format!("{handle:?}");
    assert!(rendered.contains("ActorHandle"), "{rendered}");
}
#[test]
fn actor_handle_second_join_fails_closed() {
init_test("actor_handle_second_join_fails_closed");
// Spin up a lab runtime with a root region and spawn a Counter actor
// into it via a FailFast scope.
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (mut handle, stored) = scope
.spawn_actor(&mut runtime.state, &cx, Counter::new(), 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
// Request shutdown, then schedule the task and drain the runtime so the
// actor actually observes the stop and finishes.
handle.stop();
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_quiescent();
assert!(handle.is_finished(), "stopped actor should report finished");
// First join consumes the result: the actor's final state is returned.
let final_state = futures_lite::future::block_on(handle.join(&cx)).expect("first join");
assert_eq!(final_state.count, 0, "join should return final actor state");
// A second join on the same handle must fail closed rather than hang or
// fabricate a result.
let second = futures_lite::future::block_on(handle.join(&cx));
assert!(
matches!(second, Err(JoinError::PolledAfterCompletion)),
"second join must fail closed, got {second:?}"
);
crate::test_complete!("actor_handle_second_join_fails_closed");
}
#[test]
fn actor_ref_debug() {
    // The Debug representation of an ActorRef should name the type.
    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
    let (handle, stored) = scope
        .spawn_actor(&mut state, &cx, Counter::new(), 32)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    let rendered = format!("{:?}", handle.sender());
    assert!(rendered.contains("ActorRef"), "{rendered}");
}
#[test]
fn actor_state_cell_debug() {
    // The Debug representation of an ActorStateCell should name the type.
    let rendered = format!("{:?}", ActorStateCell::new(ActorState::Running));
    assert!(rendered.contains("ActorStateCell"), "{rendered}");
}
#[test]
fn actor_id_clone_copy_eq_hash() {
    use std::collections::HashSet;
    let id = ActorId::from_task(TaskId::new_for_test(1, 0));
    // Debug output names the type.
    assert!(format!("{id:?}").contains("ActorId"));
    // ActorId is Copy: plain assignment duplicates the value and the
    // copies compare equal to the original.
    let copy_a = id;
    assert_eq!(id, copy_a);
    let copy_b = id;
    assert_eq!(id, copy_b);
    // Hash + Eq: distinct ids land in distinct set slots.
    let mut ids = HashSet::new();
    ids.insert(id);
    ids.insert(ActorId::from_task(TaskId::new_for_test(2, 0)));
    assert_eq!(ids.len(), 2);
}
#[test]
fn actor_state_debug_clone_copy_eq() {
    let running = ActorState::Running;
    // Debug output names the variant.
    assert!(format!("{running:?}").contains("Running"));
    // ActorState is Copy: assignment duplicates it and copies compare equal.
    let copy_a = running;
    assert_eq!(running, copy_a);
    let copy_b = running;
    assert_eq!(running, copy_b);
    // Distinct variants are unequal under PartialEq.
    assert_ne!(ActorState::Created, ActorState::Stopped);
}
#[test]
fn mailbox_config_debug_clone_copy_default() {
    // Default construction works and Debug output names the type.
    let cfg = MailboxConfig::default();
    assert!(format!("{cfg:?}").contains("MailboxConfig"));
    // MailboxConfig is Copy: each assignment yields an independent value
    // whose fields match the original.
    let copy_a = cfg;
    assert_eq!(copy_a.capacity, cfg.capacity);
    assert_eq!(copy_a.backpressure, cfg.backpressure);
    let copy_b = cfg;
    assert_eq!(copy_b.capacity, cfg.capacity);
}
}