1use std::future::Future;
43use std::pin::Pin;
44use std::sync::Arc;
45use std::sync::atomic::{AtomicU8, Ordering};
46
47use crate::channel::mpsc;
48use crate::channel::mpsc::SendError;
49use crate::cx::Cx;
50use crate::runtime::{JoinError, SpawnError};
51use crate::types::{CxInner, Outcome, RegionId, TaskId, Time};
52
53#[derive(Clone, Copy, PartialEq, Eq, Hash)]
59pub struct ActorId(TaskId);
60
61impl ActorId {
62 #[must_use]
64 pub const fn from_task(task_id: TaskId) -> Self {
65 Self(task_id)
66 }
67
68 #[must_use]
70 pub const fn task_id(self) -> TaskId {
71 self.0
72 }
73}
74
75impl std::fmt::Debug for ActorId {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_tuple("ActorId").field(&self.0).finish()
78 }
79}
80
81impl std::fmt::Display for ActorId {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 write!(f, "{}", self.0)
86 }
87}
88
/// Lifecycle phase of an actor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActorState {
    /// Initial phase (encoded as `0`).
    Created,
    /// Encoded as `1`.
    Running,
    /// Encoded as `2`.
    Stopping,
    /// Terminal phase (encoded as `3`; also the decoding of any unknown value).
    Stopped,
}

/// Atomically updatable holder of an [`ActorState`].
///
/// The state is kept as a `u8` inside an `AtomicU8`; loads use `Acquire`
/// ordering and stores use `Release` ordering.
#[derive(Debug)]
struct ActorStateCell {
    state: AtomicU8,
}

impl ActorStateCell {
    /// Creates a cell holding `state`.
    fn new(state: ActorState) -> Self {
        let state = AtomicU8::new(Self::encode(state));
        Self { state }
    }

    /// Reads the current state (`Acquire`).
    fn load(&self) -> ActorState {
        let raw = self.state.load(Ordering::Acquire);
        Self::decode(raw)
    }

    /// Overwrites the current state (`Release`).
    fn store(&self, state: ActorState) {
        let raw = Self::encode(state);
        self.state.store(raw, Ordering::Release);
    }

    /// Maps a state to its byte representation.
    const fn encode(state: ActorState) -> u8 {
        match state {
            ActorState::Stopped => 3,
            ActorState::Stopping => 2,
            ActorState::Running => 1,
            ActorState::Created => 0,
        }
    }

    /// Maps a byte back to a state; anything outside `0..=2` is `Stopped`.
    const fn decode(value: u8) -> ActorState {
        if value == 0 {
            ActorState::Created
        } else if value == 1 {
            ActorState::Running
        } else if value == 2 {
            ActorState::Stopping
        } else {
            ActorState::Stopped
        }
    }
}
140
/// Task-side half of an actor: the mailbox it receives from plus the
/// lifecycle state it shares with its handles.
struct ActorCell<M> {
    /// Receiving end of the actor's mailbox channel.
    mailbox: mpsc::Receiver<M>,
    /// Lifecycle state, shared through the `Arc` with `ActorHandle`/`ActorRef`.
    state: Arc<ActorStateCell>,
}
149
150pub trait Actor: Send + 'static {
164 type Message: Send + 'static;
166
167 fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
172 Box::pin(async {})
173 }
174
175 fn handle(
180 &mut self,
181 cx: &Cx,
182 msg: Self::Message,
183 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
184
185 fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
189 Box::pin(async {})
190 }
191}
192
/// Owning handle to a spawned actor.
///
/// Besides sending messages, the handle can request a stop or abort and can
/// be joined to obtain the final actor value.
#[derive(Debug)]
pub struct ActorHandle<A: Actor> {
    /// Identifier of the actor (derived from `task_id` when spawned).
    actor_id: ActorId,
    /// Sending end of the actor's mailbox.
    sender: mpsc::Sender<A::Message>,
    /// Lifecycle state shared with the actor task and with `ActorRef`s.
    state: Arc<ActorStateCell>,
    /// Identifier of the task running the actor.
    task_id: TaskId,
    /// Receives the final actor value (or a `JoinError`) from the actor task.
    receiver: crate::channel::oneshot::Receiver<Result<A, JoinError>>,
    /// Weak reference to the actor task's `CxInner`, used by `abort` and to
    /// look up a cancel reason when the join channel is closed.
    inner: std::sync::Weak<parking_lot::RwLock<CxInner>>,
    /// Set once `join` has produced a final result.
    completed: bool,
}
211
212impl<A: Actor> ActorHandle<A> {
213 pub async fn send(&self, cx: &Cx, msg: A::Message) -> Result<(), SendError<A::Message>> {
217 self.sender.send(cx, msg).await
218 }
219
220 pub fn try_send(&self, msg: A::Message) -> Result<(), SendError<A::Message>> {
225 self.sender.try_send(msg)
226 }
227
228 #[must_use]
230 pub fn sender(&self) -> ActorRef<A::Message> {
231 ActorRef {
232 actor_id: self.actor_id,
233 sender: self.sender.clone(),
234 state: Arc::clone(&self.state),
235 }
236 }
237
238 #[must_use]
240 pub const fn actor_id(&self) -> ActorId {
241 self.actor_id
242 }
243
244 #[must_use]
246 pub fn task_id(&self) -> crate::types::TaskId {
247 self.task_id
248 }
249
250 pub fn stop(&self) {
260 self.state.store(ActorState::Stopping);
261 self.sender.wake_receiver();
262 }
263
264 #[must_use]
266 pub fn is_finished(&self) -> bool {
267 self.completed || self.receiver.is_ready() || self.receiver.is_closed()
268 }
269
270 fn closed_reason(&self) -> crate::types::CancelReason {
271 self.inner
272 .upgrade()
273 .and_then(|inner| inner.read().cancel_reason.clone())
274 .unwrap_or_else(|| crate::types::CancelReason::user("join channel closed"))
275 }
276
277 pub async fn join(&mut self, cx: &Cx) -> Result<A, JoinError> {
282 if self.completed {
283 return Err(JoinError::PolledAfterCompletion);
284 }
285
286 match self.receiver.recv(cx).await {
287 Ok(result) => {
288 self.completed = true;
289 result
290 }
291 Err(crate::channel::oneshot::RecvError::Closed) => {
292 self.completed = true;
293 Err(JoinError::Cancelled(self.closed_reason()))
294 }
295 Err(crate::channel::oneshot::RecvError::Cancelled) => {
296 let reason = cx
297 .cancel_reason()
298 .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
299 Err(JoinError::Cancelled(reason))
300 }
301 Err(crate::channel::oneshot::RecvError::PolledAfterCompletion) => {
302 unreachable!("ActorHandle::join awaits a fresh oneshot recv future")
303 }
304 }
305 }
306
307 pub fn abort(&self) {
313 self.state.store(ActorState::Stopping);
314 if let Some(inner) = self.inner.upgrade() {
315 let mut guard = inner.write();
316 guard.cancel_requested = true;
317 guard
318 .fast_cancel
319 .store(true, std::sync::atomic::Ordering::Release);
320 }
321 self.sender.wake_receiver();
322 }
323}
324
/// Lightweight reference to an actor that can send it messages of type `M`
/// and query its lifecycle state.
#[derive(Debug)]
pub struct ActorRef<M> {
    /// Identifier of the referenced actor.
    actor_id: ActorId,
    /// Sending end of the actor's mailbox.
    sender: mpsc::Sender<M>,
    /// Lifecycle state shared with the actor task and its handle.
    state: Arc<ActorStateCell>,
}
335
336impl<M> Clone for ActorRef<M> {
340 fn clone(&self) -> Self {
341 Self {
342 actor_id: self.actor_id,
343 sender: self.sender.clone(),
344 state: Arc::clone(&self.state),
345 }
346 }
347}
348
349impl<M: Send + 'static> ActorRef<M> {
350 pub async fn send(&self, cx: &Cx, msg: M) -> Result<(), SendError<M>> {
352 self.sender.send(cx, msg).await
353 }
354
355 #[must_use]
357 pub fn reserve<'a>(&'a self, cx: &'a Cx) -> mpsc::Reserve<'a, M> {
358 self.sender.reserve(cx)
359 }
360
361 pub fn try_send(&self, msg: M) -> Result<(), SendError<M>> {
363 self.sender.try_send(msg)
364 }
365
366 #[must_use]
368 pub fn is_closed(&self) -> bool {
369 self.sender.is_closed()
370 }
371
372 #[must_use]
376 pub fn is_alive(&self) -> bool {
377 self.state.load() != ActorState::Stopped
378 }
379
380 #[must_use]
382 pub const fn actor_id(&self) -> ActorId {
383 self.actor_id
384 }
385}
386
387#[derive(Debug, Clone, Copy)]
393pub struct MailboxConfig {
394 pub capacity: usize,
396 pub backpressure: bool,
398}
399
400impl Default for MailboxConfig {
401 fn default() -> Self {
402 Self {
403 capacity: DEFAULT_MAILBOX_CAPACITY,
404 backpressure: true,
405 }
406 }
407}
408
409impl MailboxConfig {
410 #[must_use]
412 pub const fn with_capacity(capacity: usize) -> Self {
413 Self {
414 capacity,
415 backpressure: true,
416 }
417 }
418}
419
/// Messages delivered to a supervising (parent) actor about its children.
#[derive(Debug, Clone)]
pub enum SupervisorMessage {
    /// A child reported a failure (sent by `ActorContext::escalate`).
    ChildFailed {
        /// Identifier of the child that failed.
        child_id: ActorId,
        /// Free-form description of the failure.
        reason: String,
    },
    /// A child stopped.
    // NOTE(review): no code in the visible part of this file constructs this
    // variant — confirm where it is sent.
    ChildStopped {
        /// Identifier of the child that stopped.
        child_id: ActorId,
    },
}
436
/// Actor-specific view layered over a borrowed [`Cx`].
///
/// Adds the actor's own reference and id, an optional parent reference, a
/// list of registered child ids and a local "stopping" flag.
pub struct ActorContext<'a, M: Send + 'static> {
    /// Underlying capability context this actor context borrows.
    cx: &'a Cx,
    /// Reference to the actor that owns this context.
    self_ref: ActorRef<M>,
    /// Identifier of the actor that owns this context.
    actor_id: ActorId,
    /// Parent to which failures are escalated, if any.
    parent: Option<ActorRef<SupervisorMessage>>,
    /// Identifiers registered through `register_child`.
    children: Vec<ActorId>,
    /// Set by `stop_self`; makes `checkpoint` fail and
    /// `is_cancel_requested` return `true`.
    stopping: bool,
}
475
476#[allow(clippy::elidable_lifetime_names)]
477impl<'a, M: Send + 'static> ActorContext<'a, M> {
478 #[must_use]
482 pub fn new(
483 cx: &'a Cx,
484 self_ref: ActorRef<M>,
485 actor_id: ActorId,
486 parent: Option<ActorRef<SupervisorMessage>>,
487 ) -> Self {
488 Self {
489 cx,
490 self_ref,
491 actor_id,
492 parent,
493 children: Vec::new(),
494 stopping: false,
495 }
496 }
497
498 #[must_use]
503 pub const fn self_actor_id(&self) -> ActorId {
504 self.actor_id
505 }
506
507 #[must_use]
509 pub const fn actor_id(&self) -> ActorId {
510 self.actor_id
511 }
512
513 pub fn register_child(&mut self, child_id: ActorId) {
521 self.children.push(child_id);
522 }
523
524 pub fn unregister_child(&mut self, child_id: ActorId) -> bool {
528 if let Some(pos) = self.children.iter().position(|&id| id == child_id) {
529 self.children.swap_remove(pos);
530 true
531 } else {
532 false
533 }
534 }
535
536 #[must_use]
538 pub fn children(&self) -> &[ActorId] {
539 &self.children
540 }
541
542 #[must_use]
544 pub fn has_children(&self) -> bool {
545 !self.children.is_empty()
546 }
547
548 #[must_use]
550 pub fn child_count(&self) -> usize {
551 self.children.len()
552 }
553
554 pub fn stop_self(&mut self) {
563 self.stopping = true;
564 }
565
566 #[must_use]
568 pub fn is_stopping(&self) -> bool {
569 self.stopping
570 }
571
572 #[must_use]
580 pub fn parent(&self) -> Option<&ActorRef<SupervisorMessage>> {
581 self.parent.as_ref()
582 }
583
584 #[must_use]
586 pub fn has_parent(&self) -> bool {
587 self.parent.is_some()
588 }
589
590 pub async fn escalate(&self, reason: String) {
595 if let Some(parent) = &self.parent {
596 let msg = SupervisorMessage::ChildFailed {
597 child_id: self.actor_id,
598 reason,
599 };
600 let _ = parent.send(self.cx, msg).await;
602 }
603 }
604
605 #[allow(clippy::result_large_err)]
614 pub fn checkpoint(&self) -> Result<(), crate::error::Error> {
615 if self.stopping {
616 let reason = crate::types::CancelReason::user("actor stopping")
617 .with_region(self.cx.region_id())
618 .with_task(self.cx.task_id());
619 return Err(crate::error::Error::cancelled(&reason));
620 }
621 self.cx.checkpoint()
622 }
623
624 #[must_use]
628 pub fn is_cancel_requested(&self) -> bool {
629 self.stopping || self.cx.is_cancel_requested()
630 }
631
632 #[must_use]
634 pub fn budget(&self) -> crate::types::Budget {
635 self.cx.budget()
636 }
637
638 #[must_use]
640 pub fn deadline(&self) -> Option<Time> {
641 self.cx.budget().deadline
642 }
643
644 pub fn trace(&self, event: &str) {
646 self.cx.trace(event);
647 }
648
649 #[must_use]
654 pub fn self_ref(&self) -> ActorRef<M> {
655 self.self_ref.clone()
656 }
657
658 #[must_use]
660 pub const fn cx(&self) -> &Cx {
661 self.cx
662 }
663}
664
665impl<M: Send + 'static> std::ops::Deref for ActorContext<'_, M> {
666 type Target = Cx;
667
668 fn deref(&self) -> &Self::Target {
669 self.cx
670 }
671}
672
673impl<M: Send + 'static> std::fmt::Debug for ActorContext<'_, M> {
674 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675 f.debug_struct("ActorContext")
676 .field("actor_id", &self.actor_id)
677 .field("children", &self.children.len())
678 .field("stopping", &self.stopping)
679 .field("has_parent", &self.parent.is_some())
680 .finish()
681 }
682}
683
/// Mailbox capacity used by `MailboxConfig::default()`.
pub const DEFAULT_MAILBOX_CAPACITY: usize = 64;
686
687struct OnStopMaskGuard(Arc<parking_lot::RwLock<CxInner>>);
688
689impl Drop for OnStopMaskGuard {
690 fn drop(&mut self) {
691 let mut g = self.0.write();
692 g.mask_depth = g.mask_depth.saturating_sub(1);
693 }
694}
695
/// Drives one actor instance through its whole lifecycle and returns it.
///
/// Phases, in order:
/// 1. mark the cell `Running` (unless a stop was already requested) and await
///    `on_start`;
/// 2. receive and handle messages until cancellation is requested, a stop is
///    requested while the mailbox is idle, or the receive fails;
/// 3. mark the cell `Stopping`, close the mailbox and empty it — discarding
///    messages if cancellation was requested, handling them otherwise;
/// 4. await `on_stop` with `mask_depth` raised, then return the actor.
///
/// The `Stopped` state is not written here; callers store it after this
/// function returns.
async fn run_actor_loop<A: Actor>(mut actor: A, cx: Cx, cell: &mut ActorCell<A::Message>) -> A {
    use crate::tracing_compat::debug;

    // Do not overwrite a stop request that arrived before the actor started.
    if cell.state.load() != ActorState::Stopping {
        cell.state.store(ActorState::Running);
    }

    cx.trace("actor::on_start");
    actor.on_start(&cx).await;

    loop {
        // Cancellation is checked once per iteration, before receiving.
        if cx.is_cancel_requested() {
            cx.trace("actor::cancel_requested");
            break;
        }

        // Receive the next message. If the mailbox has nothing ready and a
        // stop has been requested, report `Disconnected` so the loop exits
        // instead of waiting for more messages.
        let recv_result = std::future::poll_fn(|task_cx| {
            match cell.mailbox.poll_recv(&cx, task_cx) {
                std::task::Poll::Pending if cell.state.load() == ActorState::Stopping => {
                    std::task::Poll::Ready(Err(crate::channel::mpsc::RecvError::Disconnected))
                }
                other => other,
            }
        })
        .await;

        match recv_result {
            Ok(msg) => {
                actor.handle(&cx, msg).await;
            }
            Err(crate::channel::mpsc::RecvError::Disconnected) => {
                cx.trace("actor::mailbox_disconnected");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Cancelled) => {
                cx.trace("actor::recv_cancelled");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Empty) => {
                break;
            }
        }
    }

    cell.state.store(ActorState::Stopping);

    // Sampled before draining: decides whether leftover messages are handled
    // or thrown away.
    let is_aborted = cx.is_cancel_requested();

    cell.mailbox.close();

    if is_aborted {
        // Cancelled: discard whatever is still queued without handling it.
        while let Ok(_msg) = cell.mailbox.try_recv() {}
    } else {
        // Graceful stop: handle every message that is still queued.
        let mut drained: u64 = 0;
        while let Ok(msg) = cell.mailbox.try_recv() {
            actor.handle(&cx, msg).await;
            drained += 1;
        }
        if drained > 0 {
            debug!(drained = drained, "actor::mailbox_drained");
            cx.trace("actor::mailbox_drained");
        }
    }

    cx.trace("actor::on_stop");
    // Raise `mask_depth` for the duration of `on_stop`; the guard lowers it
    // again even if the future is dropped or panics.
    // NOTE(review): presumably a non-zero mask depth suppresses cancellation
    // inside `on_stop` — confirm against `Cx`.
    let inner = cx.inner.clone();
    inner.write().mask_depth += 1;
    let mask_guard = OnStopMaskGuard(inner);
    actor.on_stop(&cx).await;
    drop(mask_guard);

    actor
}
799
impl<P: crate::types::Policy> crate::cx::Scope<'_, P> {
    /// Creates an actor task in this scope's region.
    ///
    /// Returns the [`ActorHandle`] together with the `StoredTask` wrapping the
    /// actor's future. This function does not store or schedule the task; the
    /// caller is expected to hand the `StoredTask` to the runtime.
    ///
    /// * `state` – runtime state in which the task record is created.
    /// * `cx` – parent context from which the child's observability, entropy
    ///   and blocking-pool handle are derived.
    /// * `actor` – the actor value to run.
    /// * `mailbox_capacity` – capacity passed to the mailbox channel.
    ///
    /// # Errors
    ///
    /// Returns the `SpawnError` produced by `create_task_record`.
    pub fn spawn_actor<A: Actor>(
        &self,
        state: &mut crate::runtime::state::RuntimeState,
        cx: &Cx,
        actor: A,
        mailbox_capacity: usize,
    ) -> Result<(ActorHandle<A>, crate::runtime::stored_task::StoredTask), SpawnError> {
        use crate::channel::oneshot;
        use crate::cx::scope::CatchUnwind;
        use crate::runtime::stored_task::StoredTask;
        use crate::tracing_compat::{debug, debug_span};

        // Mailbox: the handle keeps the sender, the actor task the receiver.
        let (msg_tx, msg_rx) = mpsc::channel::<A::Message>(mailbox_capacity);

        // One-shot channel carrying the final actor value (or join error).
        let (result_tx, result_rx) = oneshot::channel::<Result<A, JoinError>>();

        // The actor id is derived from the freshly created task id.
        let task_id = self.create_task_record(state)?;
        let actor_id = ActorId::from_task(task_id);
        let actor_state = Arc::new(ActorStateCell::new(ActorState::Created));

        let _span = debug_span!(
            "actor_spawn",
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
        )
        .entered();
        debug!(
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
            "actor spawned"
        );

        // Build the child context from this scope's region/budget and the
        // parent's observability, entropy and blocking-pool handle.
        let child_observability = cx.child_observability(self.region_id(), task_id);
        let child_entropy = cx.child_entropy(task_id);
        let io_driver = state.io_driver_handle();
        let child_cx = Cx::new_with_observability(
            self.region_id(),
            task_id,
            self.budget(),
            Some(child_observability),
            io_driver,
            Some(child_entropy),
        )
        .with_blocking_pool_handle(cx.blocking_pool_handle());

        // Attach the child context to the task record when the record exists.
        if let Some(record) = state.task_mut(task_id) {
            record.set_cx_inner(child_cx.inner.clone());
            record.set_cx(child_cx.clone());
        }

        let cx_for_send = child_cx.clone();
        // The handle only keeps a weak reference to the child's inner state.
        let inner_weak = Arc::downgrade(&child_cx.inner);
        let state_for_task = Arc::clone(&actor_state);

        let mut cell = ActorCell {
            mailbox: msg_rx,
            state: Arc::clone(&actor_state),
        };

        let wrapped = async move {
            // Catch panics from the actor loop so they surface as
            // `JoinError::Panicked` on the result channel.
            let result = CatchUnwind {
                inner: Box::pin(run_actor_loop(actor, child_cx, &mut cell)),
            }
            .await;
            match result {
                Ok(actor_final) => {
                    // A send failure is ignored (best-effort delivery).
                    let _ = result_tx.send(&cx_for_send, Ok(actor_final));
                }
                Err(payload) => {
                    let msg = crate::cx::scope::payload_to_string(&payload);
                    let _ = result_tx.send(
                        &cx_for_send,
                        Err(JoinError::Panicked(crate::types::PanicPayload::new(msg))),
                    );
                }
            }
            // Terminal state is published after the result has been sent.
            state_for_task.store(ActorState::Stopped);
            Outcome::Ok(())
        };

        let stored = StoredTask::new_with_id(wrapped, task_id);

        let handle = ActorHandle {
            actor_id,
            sender: msg_tx,
            state: actor_state,
            task_id,
            receiver: result_rx,
            inner: inner_weak,
            completed: false,
        };

        Ok((handle, stored))
    }

    /// Creates a supervised actor task in this scope's region.
    ///
    /// Works like [`Self::spawn_actor`], but the actor is produced by
    /// `factory` and run through `run_supervised_loop`, which consults a
    /// `Supervisor` built from `strategy` whenever the actor panics and may
    /// call `factory` again to restart it.
    ///
    /// Note that `factory` is invoked once before the task record is created,
    /// so it runs even when this function then returns an error.
    ///
    /// # Errors
    ///
    /// Returns the `SpawnError` produced by `create_task_record`.
    pub fn spawn_supervised_actor<A, F>(
        &self,
        state: &mut crate::runtime::state::RuntimeState,
        cx: &Cx,
        mut factory: F,
        strategy: crate::supervision::SupervisionStrategy,
        mailbox_capacity: usize,
    ) -> Result<(ActorHandle<A>, crate::runtime::stored_task::StoredTask), SpawnError>
    where
        A: Actor,
        F: FnMut() -> A + Send + 'static,
    {
        use crate::channel::oneshot;
        use crate::runtime::stored_task::StoredTask;
        use crate::supervision::Supervisor;
        use crate::tracing_compat::{debug, debug_span};

        // First actor instance; later instances are created on restart.
        let actor = factory();
        let (msg_tx, msg_rx) = mpsc::channel::<A::Message>(mailbox_capacity);
        let (result_tx, result_rx) = oneshot::channel::<Result<A, JoinError>>();
        let task_id = self.create_task_record(state)?;
        let actor_id = ActorId::from_task(task_id);
        let actor_state = Arc::new(ActorStateCell::new(ActorState::Created));

        let _span = debug_span!(
            "supervised_actor_spawn",
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
        )
        .entered();
        debug!(
            task_id = ?task_id,
            region_id = ?self.region_id(),
            "supervised actor spawned"
        );

        // Same child-context construction as in `spawn_actor`.
        let child_observability = cx.child_observability(self.region_id(), task_id);
        let child_entropy = cx.child_entropy(task_id);
        let io_driver = state.io_driver_handle();
        let child_cx = Cx::new_with_observability(
            self.region_id(),
            task_id,
            self.budget(),
            Some(child_observability),
            io_driver,
            Some(child_entropy),
        )
        .with_blocking_pool_handle(cx.blocking_pool_handle());

        if let Some(record) = state.task_mut(task_id) {
            record.set_cx_inner(child_cx.inner.clone());
            record.set_cx(child_cx.clone());
        }

        let cx_for_send = child_cx.clone();
        let inner_weak = Arc::downgrade(&child_cx.inner);
        let region_id = self.region_id();
        let state_for_task = Arc::clone(&actor_state);

        // The same cell (mailbox + state) is reused across restarts.
        let mut cell = ActorCell {
            mailbox: msg_rx,
            state: Arc::clone(&actor_state),
        };

        let wrapped = async move {
            // Panic handling and restarts happen inside the supervised loop.
            let result = run_supervised_loop(
                actor,
                &mut factory,
                child_cx,
                &mut cell,
                Supervisor::new(strategy),
                task_id,
                region_id,
            )
            .await;
            // A send failure is ignored (best-effort delivery).
            let _ = result_tx.send(&cx_for_send, result);
            state_for_task.store(ActorState::Stopped);
            Outcome::Ok(())
        };

        let stored = StoredTask::new_with_id(wrapped, task_id);

        let handle = ActorHandle {
            actor_id,
            sender: msg_tx,
            state: actor_state,
            task_id,
            receiver: result_rx,
            inner: inner_weak,
            completed: false,
        };

        Ok((handle, stored))
    }
}
1031
/// Ways in which a supervised actor can come to an end.
// NOTE(review): this type is not referenced by the supervised loop in the
// visible part of this file — confirm where it is used.
#[derive(Debug)]
pub enum SupervisedOutcome {
    /// The actor stopped.
    Stopped,
    /// No further restarts were allowed.
    RestartBudgetExhausted {
        /// Number of restarts performed in total.
        total_restarts: u32,
    },
    /// The failure was escalated.
    Escalated,
}
1045
1046async fn run_supervised_loop<A, F>(
1051 initial_actor: A,
1052 factory: &mut F,
1053 cx: Cx,
1054 cell: &mut ActorCell<A::Message>,
1055 mut supervisor: crate::supervision::Supervisor,
1056 task_id: TaskId,
1057 region_id: RegionId,
1058) -> Result<A, JoinError>
1059where
1060 A: Actor,
1061 F: FnMut() -> A,
1062{
1063 use crate::cx::scope::CatchUnwind;
1064 use crate::supervision::SupervisionDecision;
1065 use crate::types::Outcome;
1066
1067 let mut current_actor = initial_actor;
1068
1069 loop {
1070 let result = CatchUnwind {
1072 inner: Box::pin(run_actor_loop(current_actor, cx.clone(), cell)),
1073 }
1074 .await;
1075
1076 match result {
1077 Ok(actor_final) => {
1078 return Ok(actor_final);
1080 }
1081 Err(payload) => {
1082 let msg = crate::cx::scope::payload_to_string(&payload);
1087 cx.trace("supervised_actor::failure");
1088
1089 let outcome = Outcome::err(());
1090 let now = cx.timer_driver().map_or(0, |td| td.now().as_nanos());
1091 let decision = supervisor.on_failure(task_id, region_id, None, &outcome, now);
1092
1093 match decision {
1094 SupervisionDecision::Restart { delay, .. } => {
1095 cx.trace("supervised_actor::restart");
1096
1097 cell.state.store(ActorState::Created);
1102
1103 if let Some(backoff) = delay {
1105 if !backoff.is_zero() {
1106 let now = cx
1107 .timer_driver()
1108 .map_or_else(crate::time::wall_now, |td| td.now());
1109 crate::time::sleep(now, backoff).await;
1110 }
1111 }
1112
1113 current_actor = factory();
1114 }
1115 SupervisionDecision::Stop { .. } => {
1116 cx.trace("supervised_actor::stopped");
1117 return Err(JoinError::Panicked(crate::types::PanicPayload::new(msg)));
1118 }
1119 SupervisionDecision::Escalate { .. } => {
1120 cx.trace("supervised_actor::escalated");
1121 return Err(JoinError::Panicked(crate::types::PanicPayload::new(msg)));
1122 }
1123 }
1124 }
1125 }
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use super::*;
1132 use crate::runtime::state::RuntimeState;
1133 use crate::types::Budget;
1134 use crate::types::policy::FailFast;
1135
/// Common test prologue: initialises test logging and marks the start of the
/// test phase called `name`.
fn init_test(name: &str) {
    crate::test_utils::init_test_logging();
    crate::test_phase!(name);
}
1140
/// Minimal test actor that sums the `u64` messages it receives and records
/// whether its lifecycle hooks ran.
#[derive(Debug)]
struct Counter {
    /// Running sum of handled messages.
    count: u64,
    /// Set by `on_start`.
    started: bool,
    /// Set by `on_stop`.
    stopped: bool,
}

impl Counter {
    /// Creates a counter at zero with both lifecycle flags cleared.
    fn new() -> Self {
        let (started, stopped) = (false, false);
        Self {
            count: 0,
            started,
            stopped,
        }
    }
}
1158
1159 impl Actor for Counter {
1160 type Message = u64;
1161
1162 fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1163 self.started = true;
1164 Box::pin(async {})
1165 }
1166
1167 fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1168 self.count += msg;
1169 Box::pin(async {})
1170 }
1171
1172 fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1173 self.stopped = true;
1174 Box::pin(async {})
1175 }
1176 }
1177
/// Compile-time check: instantiating this function proves `A: Actor`.
fn assert_actor<A: Actor>() {}
1179
/// Checks at compile time that `Counter` satisfies the `Actor` bounds.
#[test]
fn actor_trait_object_safety() {
    init_test("actor_trait_object_safety");

    // Only needs to type-check; there is nothing to assert at runtime.
    assert_actor::<Counter>();

    crate::test_complete!("actor_trait_object_safety");
}
1189
/// Spawning an actor succeeds and yields a handle that is not yet finished.
#[test]
fn actor_handle_creation() {
    init_test("actor_handle_creation");

    let mut state = RuntimeState::new();
    let root_region = state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root_region, Budget::INFINITE);

    let spawned = scope.spawn_actor(&mut state, &cx, Counter::new(), 32);
    assert!(spawned.is_ok(), "spawn_actor should succeed");

    let (handle, stored) = spawned.unwrap();
    let spawned_task = handle.task_id();
    state.store_spawned_task(spawned_task, stored);

    // The task id accessor must be callable on a fresh handle.
    let _tid = handle.task_id();

    // The task has never been run, so it cannot be finished.
    let finished = handle.is_finished();
    assert!(!finished);

    crate::test_complete!("actor_handle_creation");
}
1213
/// Actor ids built from task ids that differ only in their second component
/// must compare unequal.
#[test]
fn actor_id_generation_distinct() {
    init_test("actor_id_generation_distinct");

    let first = ActorId::from_task(TaskId::new_for_test(1, 1));
    let second = ActorId::from_task(TaskId::new_for_test(1, 2));
    let distinct = first != second;
    assert!(distinct, "generation must distinguish actor reuse");

    crate::test_complete!("actor_id_generation_distinct");
}
1224
/// An `ActorRef` and its clone refer to the same actor and report the same
/// liveness and mailbox status.
#[test]
fn actor_ref_is_cloneable() {
    init_test("actor_ref_is_cloneable");

    let mut state = RuntimeState::new();
    let root_region = state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root_region, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_actor(&mut state, &cx, Counter::new(), 32)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    let original = handle.sender();
    let copy = original.clone();
    let expected_id = handle.actor_id();

    // Both references carry the handle's actor id.
    assert_eq!(original.actor_id(), expected_id);
    assert_eq!(copy.actor_id(), expected_id);

    // The actor has not run, so it is alive ...
    assert!(original.is_alive());
    assert!(copy.is_alive());

    // ... and its mailbox is still open.
    assert!(!original.is_closed());
    assert!(!copy.is_closed());

    crate::test_complete!("actor_ref_is_cloneable");
}
1257
1258 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1261
/// Test actor whose lifecycle can be observed from outside through shared
/// atomics.
struct ObservableCounter {
    /// Running sum of handled messages.
    count: u64,
    /// Receives `count` when `on_stop` runs.
    on_stop_count: Arc<AtomicU64>,
    /// Set when `on_start` runs.
    started: Arc<AtomicBool>,
    /// Set when `on_stop` runs.
    stopped: Arc<AtomicBool>,
}

impl ObservableCounter {
    /// Creates a counter at zero that reports through the given atomics.
    fn new(
        on_stop_count: Arc<AtomicU64>,
        started: Arc<AtomicBool>,
        stopped: Arc<AtomicBool>,
    ) -> Self {
        let count = 0;
        Self {
            stopped,
            started,
            on_stop_count,
            count,
        }
    }
}
1285
1286 impl Actor for ObservableCounter {
1287 type Message = u64;
1288
1289 fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1290 self.started.store(true, Ordering::SeqCst);
1291 Box::pin(async {})
1292 }
1293
1294 fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1295 self.count += msg;
1296 Box::pin(async {})
1297 }
1298
1299 fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1300 self.on_stop_count.store(self.count, Ordering::SeqCst);
1301 self.stopped.store(true, Ordering::SeqCst);
1302 Box::pin(async {})
1303 }
1304 }
1305
/// Fresh observation atomics for an `ObservableCounter`:
/// `(on_stop_count, started, stopped)`.
///
/// The count starts at `u64::MAX` so tests can tell "never written" apart
/// from a real sum; both flags start as `false`.
fn observable_state() -> (Arc<AtomicU64>, Arc<AtomicBool>, Arc<AtomicBool>) {
    let on_stop_count = Arc::new(AtomicU64::new(u64::MAX));
    let started = Arc::new(AtomicBool::new(false));
    let stopped = Arc::new(AtomicBool::new(false));
    (on_stop_count, started, stopped)
}
1313
/// Five messages queued before the handle is dropped are all handled, and
/// both lifecycle hooks run.
#[test]
fn actor_processes_all_messages() {
    init_test("actor_processes_all_messages");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let root_region = runtime.state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root_region, Budget::INFINITE);

    let (on_stop_count, started, stopped) = observable_state();
    let actor = ObservableCounter::new(
        Arc::clone(&on_stop_count),
        Arc::clone(&started),
        Arc::clone(&stopped),
    );

    let (handle, stored) = scope
        .spawn_actor(&mut runtime.state, &cx, actor, 32)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    // Queue five messages of value 1 before the actor ever runs.
    (0..5).for_each(|_| handle.try_send(1).unwrap());

    // Dropping the handle drops its mailbox sender.
    drop(handle);

    runtime.scheduler.lock().schedule(task_id, 0);
    runtime.run_until_quiescent();

    let handled = on_stop_count.load(Ordering::SeqCst);
    assert_eq!(handled, 5, "all messages processed");
    assert!(started.load(Ordering::SeqCst), "on_start was called");
    assert!(stopped.load(Ordering::SeqCst), "on_stop was called");

    crate::test_complete!("actor_processes_all_messages");
}
1356
/// A graceful `stop()` issued while messages are still queued must not lose
/// them: every queued message is handled before `on_stop` runs.
#[test]
fn actor_drains_mailbox_on_cancel() {
    init_test("actor_drains_mailbox_on_cancel");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let (on_stop_count, started, stopped) = observable_state();
    let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());

    let (handle, stored) = scope
        .spawn_actor(&mut runtime.state, &cx, actor, 32)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    // Queue five messages of value 1 before the actor ever runs.
    for _ in 0..5 {
        handle.try_send(1).unwrap();
    }

    // Request a stop (not an abort) before the task is first scheduled.
    handle.stop();

    runtime.scheduler.lock().schedule(task_id, 0);
    runtime.run_until_quiescent();

    // `on_stop` publishes the sum of handled messages; 5 means none was lost.
    assert_eq!(
        on_stop_count.load(Ordering::SeqCst),
        5,
        "drain processed all messages"
    );
    assert!(started.load(Ordering::SeqCst), "on_start was called");
    assert!(stopped.load(Ordering::SeqCst), "on_stop was called");

    crate::test_complete!("actor_drains_mailbox_on_cancel");
}
1401
/// `ActorRef::is_alive` stays `true` while the actor is created or stopping
/// and becomes `false` once the actor task has finished.
#[test]
fn actor_ref_is_alive_transitions() {
    init_test("actor_ref_is_alive_transitions");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let (on_stop_count, started, stopped) = observable_state();
    let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());

    let (handle, stored) = scope
        .spawn_actor(&mut runtime.state, &cx, actor, 32)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    // Freshly spawned, never run: alive.
    let actor_ref = handle.sender();
    assert!(actor_ref.is_alive(), "created actor should be alive");
    assert_eq!(actor_ref.actor_id(), handle.actor_id());

    // Stop requested but the task has not run yet: still alive.
    handle.stop();
    assert!(actor_ref.is_alive(), "stopping actor is still alive");

    runtime.scheduler.lock().schedule(task_id, 0);
    runtime.run_until_quiescent();

    // After the task has run to completion the actor is no longer alive.
    assert!(
        handle.is_finished(),
        "actor should be finished after stop + run"
    );
    assert!(!actor_ref.is_alive(), "finished actor is not alive");

    // Both hooks ran, and `on_stop` replaced the `u64::MAX` sentinel.
    assert!(started.load(Ordering::SeqCst), "on_start was called");
    assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
    assert_ne!(
        on_stop_count.load(Ordering::SeqCst),
        u64::MAX,
        "on_stop_count updated"
    );

    crate::test_complete!("actor_ref_is_alive_transitions");
}
1448
/// A supervised actor that panics while handling a message is replaced by a
/// fresh instance from the factory, which then handles the remaining
/// messages from the same mailbox.
#[test]
fn supervised_actor_restarts_on_panic() {
    use std::sync::atomic::AtomicU32;

    /// Counter that panics when it receives the message `panic_on`.
    struct PanickingCounter {
        /// Running sum of handled messages (starts at 0 for every instance).
        count: u64,
        /// Message value that triggers the panic.
        panic_on: u64,
        /// Receives `count` when `on_stop` runs.
        final_count: Arc<AtomicU64>,
        /// Shared counter of factory invocations (not read by the actor).
        restart_count: Arc<AtomicU32>,
    }

    impl Actor for PanickingCounter {
        type Message = u64;

        fn handle(
            &mut self,
            _cx: &Cx,
            msg: u64,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            // The failed assertion is the panic the supervisor reacts to.
            assert!(msg != self.panic_on, "threshold exceeded: {msg}");
            self.count += msg;
            Box::pin(async {})
        }

        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.final_count.store(self.count, Ordering::SeqCst);
            Box::pin(async {})
        }
    }

    init_test("supervised_actor_restarts_on_panic");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let final_count = Arc::new(AtomicU64::new(u64::MAX));
    let restart_count = Arc::new(AtomicU32::new(0));
    let fc = final_count.clone();
    let rc = restart_count.clone();

    let strategy = crate::supervision::SupervisionStrategy::Restart(
        crate::supervision::RestartConfig::new(3, std::time::Duration::from_mins(1)),
    );

    let (handle, stored) = scope
        .spawn_supervised_actor(
            &mut runtime.state,
            &cx,
            move || {
                // Count every factory call: the initial one plus each restart.
                rc.fetch_add(1, Ordering::SeqCst);
                PanickingCounter {
                    count: 0,
                    panic_on: 999,
                    final_count: fc.clone(),
                    restart_count: rc.clone(),
                }
            },
            strategy,
            32,
        )
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    // Queue: a normal message, the poison message, then another normal one.
    handle.try_send(1).unwrap();
    handle.try_send(999).unwrap();
    handle.try_send(1).unwrap();
    drop(handle);

    runtime.scheduler.lock().schedule(task_id, 0);
    runtime.run_until_quiescent();

    assert!(
        restart_count.load(Ordering::SeqCst) >= 2,
        "factory should have been called at least twice (initial + restart), got {}",
        restart_count.load(Ordering::SeqCst)
    );

    // The first instance panicked before reaching `on_stop`; the restarted
    // instance began at 0 and handled only the final message.
    assert_eq!(
        final_count.load(Ordering::SeqCst),
        1,
        "restarted actor should have processed the post-panic message"
    );

    crate::test_complete!("supervised_actor_restarts_on_panic");
}
1550
/// Running the same scenario twice with the same lab seed gives the same
/// result, and that result is the sum of all messages sent.
#[test]
fn actor_deterministic_replay() {
    /// Spawns an `ObservableCounter` in a lab runtime seeded with `seed`,
    /// sends it 1 through 10, runs to quiescence and returns the sum
    /// published by `on_stop`.
    fn run_scenario(seed: u64) -> u64 {
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(seed));
        let root_region = runtime.state.create_root_region(Budget::INFINITE);
        let cx: Cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(root_region, Budget::INFINITE);

        let (on_stop_count, started, stopped) = observable_state();
        let actor = ObservableCounter::new(Arc::clone(&on_stop_count), started, stopped);

        let (handle, stored) = scope
            .spawn_actor(&mut runtime.state, &cx, actor, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        for value in 1..=10 {
            handle.try_send(value).unwrap();
        }
        drop(handle);

        runtime.scheduler.lock().schedule(task_id, 0);
        runtime.run_until_quiescent();

        on_stop_count.load(Ordering::SeqCst)
    }

    init_test("actor_deterministic_replay");

    let first_run = run_scenario(0xDEAD_BEEF);
    let second_run = run_scenario(0xDEAD_BEEF);

    assert_eq!(
        first_run, second_run,
        "deterministic replay: same seed → same result"
    );
    assert_eq!(first_run, 55, "sum of 1..=10");

    crate::test_complete!("actor_deterministic_replay");
}
1595
/// A context built from a spawned actor's own reference and id reports that
/// id from both `self_actor_id()` and `actor_id()`.
#[test]
fn actor_context_self_reference() {
    init_test("actor_context_self_reference");

    let mut runtime_state = RuntimeState::new();
    let root_region = runtime_state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root_region, Budget::INFINITE);

    let (handle, stored_task) = scope
        .spawn_actor(&mut runtime_state, &cx, Counter::new(), 32)
        .unwrap();
    runtime_state.store_spawned_task(handle.task_id(), stored_task);

    let self_ref = handle.sender();
    let expected_id = handle.actor_id();
    let ctx: ActorContext<'_, u64> = ActorContext::new(&cx, self_ref, expected_id, None);

    // Both accessors must agree with the id taken from the handle.
    assert_eq!(ctx.self_actor_id(), expected_id);
    assert_eq!(ctx.actor_id(), expected_id);

    crate::test_complete!("actor_context_self_reference");
}
1623
/// Registering and unregistering child ids is reflected by `has_children`,
/// `child_count`, and `children`; removing an absent child returns `false`.
#[test]
fn actor_context_child_management() {
    init_test("actor_context_child_management");

    let cx: Cx = Cx::for_testing();
    // The receiving half stays bound so it is not dropped before the test ends.
    let (tx, _rx) = mpsc::channel::<u64>(32);
    let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let running = Arc::new(ActorStateCell::new(ActorState::Running));
    let self_ref = ActorRef {
        actor_id,
        sender: tx,
        state: running,
    };

    let mut ctx = ActorContext::new(&cx, self_ref, actor_id, None);

    // Starts out with no children.
    assert!(!ctx.has_children());
    assert_eq!(ctx.child_count(), 0);
    assert!(ctx.children().is_empty());

    let first_child = ActorId::from_task(TaskId::new_for_test(2, 1));
    let second_child = ActorId::from_task(TaskId::new_for_test(3, 1));

    ctx.register_child(first_child);
    assert!(ctx.has_children());
    assert_eq!(ctx.child_count(), 1);

    ctx.register_child(second_child);
    assert_eq!(ctx.child_count(), 2);

    // First removal succeeds and shrinks the count.
    let removed = ctx.unregister_child(first_child);
    assert!(removed);
    assert_eq!(ctx.child_count(), 1);

    // Second removal of the same id finds nothing.
    let removed_again = ctx.unregister_child(first_child);
    assert!(!removed_again);

    crate::test_complete!("actor_context_child_management");
}
1664
/// Before `stop_self()` the context is not stopping and `checkpoint()`
/// succeeds; afterwards it is stopping, `checkpoint()` fails, and
/// cancellation is reported as requested.
#[test]
fn actor_context_stopping() {
    init_test("actor_context_stopping");

    let cx: Cx = Cx::for_testing();
    // The receiving half stays bound so it is not dropped before the test ends.
    let (tx, _rx) = mpsc::channel::<u64>(32);
    let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let running = Arc::new(ActorStateCell::new(ActorState::Running));
    let self_ref = ActorRef {
        actor_id,
        sender: tx,
        state: running,
    };

    let mut ctx = ActorContext::new(&cx, self_ref, actor_id, None);

    // Initial state: running normally.
    assert!(!ctx.is_stopping());
    assert!(ctx.checkpoint().is_ok());

    ctx.stop_self();

    // After the stop request.
    assert!(ctx.is_stopping());
    assert!(ctx.checkpoint().is_err());
    assert!(ctx.is_cancel_requested());

    crate::test_complete!("actor_context_stopping");
}
1692
/// A context constructed with `None` as its parent reports no parent.
#[test]
fn actor_context_parent_none() {
    init_test("actor_context_parent_none");

    let cx: Cx = Cx::for_testing();
    // The receiving half stays bound so it is not dropped before the test ends.
    let (tx, _rx) = mpsc::channel::<u64>(32);
    let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let running = Arc::new(ActorStateCell::new(ActorState::Running));
    let self_ref = ActorRef {
        actor_id,
        sender: tx,
        state: running,
    };

    let orphan = ActorContext::new(&cx, self_ref, actor_id, None);

    assert!(!orphan.has_parent());
    assert!(orphan.parent().is_none());

    crate::test_complete!("actor_context_parent_none");
}
1714
/// Smoke test: `budget()`, `trace()`, and `cx()` can be called on a context
/// without panicking. No return values are inspected.
#[test]
fn actor_context_cx_delegation() {
    init_test("actor_context_cx_delegation");

    let cx: Cx = Cx::for_testing();
    // The receiving half stays bound so it is not dropped before the test ends.
    let (tx, _rx) = mpsc::channel::<u64>(32);
    let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let running = Arc::new(ActorStateCell::new(ActorState::Running));
    let self_ref = ActorRef {
        actor_id,
        sender: tx,
        state: running,
    };

    let ctx = ActorContext::new(&cx, self_ref, actor_id, None);

    // Results are bound (not discarded with `_`) so they live to the end of
    // the test.
    let _current_budget = ctx.budget();
    ctx.trace("test_event");
    let _underlying_cx = ctx.cx();

    crate::test_complete!("actor_context_cx_delegation");
}
1739
/// The `Debug` rendering of a context mentions both the type name and the
/// `actor_id` field.
#[test]
fn actor_context_debug() {
    init_test("actor_context_debug");

    let cx: Cx = Cx::for_testing();
    // The receiving half stays bound so it is not dropped before the test ends.
    let (tx, _rx) = mpsc::channel::<u64>(32);
    let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let running = Arc::new(ActorStateCell::new(ActorState::Running));
    let self_ref = ActorRef {
        actor_id,
        sender: tx,
        state: running,
    };

    let ctx = ActorContext::new(&cx, self_ref, actor_id, None);

    let rendered = format!("{ctx:?}");
    for needle in ["ActorContext", "actor_id"] {
        assert!(rendered.contains(needle));
    }

    crate::test_complete!("actor_context_debug");
}
1762
/// Every `ActorState` survives a trip through `ActorStateCell::new` + `load`,
/// and raw values 4 through 10 decode to `ActorState::Stopped`.
#[test]
fn actor_state_cell_encode_decode_roundtrip() {
    init_test("actor_state_cell_encode_decode_roundtrip");

    for state in [
        ActorState::Created,
        ActorState::Running,
        ActorState::Stopping,
        ActorState::Stopped,
    ] {
        let loaded = ActorStateCell::new(state).load();
        crate::assert_with_log!(loaded == state, "roundtrip", state, loaded);
    }

    // Values past the last defined discriminant fall into the catch-all arm.
    // The iterator is lazy, so each value is decoded right before its check.
    for decoded in (4_u8..=10).map(ActorStateCell::decode) {
        let is_stopped = decoded == ActorState::Stopped;
        crate::assert_with_log!(is_stopped, "unknown u8 -> Stopped", true, is_stopped);
    }

    crate::test_complete!("actor_state_cell_encode_decode_roundtrip");
}
1793
/// `MailboxConfig::default()` uses `DEFAULT_MAILBOX_CAPACITY` with
/// backpressure on; `with_capacity(8)` keeps backpressure on and sets the
/// capacity to 8.
#[test]
fn mailbox_config_defaults() {
    init_test("mailbox_config_defaults");

    // Default configuration.
    {
        let config = MailboxConfig::default();
        crate::assert_with_log!(
            config.capacity == DEFAULT_MAILBOX_CAPACITY,
            "default capacity",
            DEFAULT_MAILBOX_CAPACITY,
            config.capacity
        );
        crate::assert_with_log!(
            config.backpressure,
            "backpressure enabled by default",
            true,
            config.backpressure
        );
    }

    // Explicit capacity.
    {
        let custom = MailboxConfig::with_capacity(8);
        crate::assert_with_log!(
            custom.capacity == 8,
            "custom capacity",
            8usize,
            custom.capacity
        );
        crate::assert_with_log!(
            custom.backpressure,
            "with_capacity enables backpressure",
            true,
            custom.backpressure
        );
    }

    crate::test_complete!("mailbox_config_defaults");
}
1830
/// With an actor spawned using `2` as the final `spawn_actor` argument
/// (presumably the mailbox capacity, going by the test name) and never
/// scheduled, two `try_send` calls succeed and the third returns an error.
#[test]
fn actor_try_send_full_mailbox_returns_error() {
    init_test("actor_try_send_full_mailbox_returns_error");

    let mut runtime_state = RuntimeState::new();
    let root_region = runtime_state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root_region, Budget::INFINITE);

    let (handle, stored_task) = scope
        .spawn_actor(&mut runtime_state, &cx, Counter::new(), 2)
        .unwrap();
    runtime_state.store_spawned_task(handle.task_id(), stored_task);

    // The first two messages are accepted.
    let ok1 = handle.try_send(1).is_ok();
    crate::assert_with_log!(ok1, "first send ok", true, ok1);
    let ok2 = handle.try_send(2).is_ok();
    crate::assert_with_log!(ok2, "second send ok", true, ok2);

    // Nothing has consumed from the mailbox, so the third send is rejected.
    let is_full = handle.try_send(3).is_err();
    crate::assert_with_log!(is_full, "third send fails (full)", true, is_full);

    crate::test_complete!("actor_try_send_full_mailbox_returns_error");
}
1861
/// A context constructed with `Some(parent_ref)` reports that it has a parent
/// and hands back a reference carrying the parent's id.
#[test]
fn actor_context_with_parent_supervisor() {
    init_test("actor_context_with_parent_supervisor");

    let cx: Cx = Cx::for_testing();

    // Parent: a reference whose channel carries `SupervisorMessage`s.
    let parent_id = ActorId::from_task(TaskId::new_for_test(10, 1));
    let (supervisor_tx, _supervisor_rx) = mpsc::channel::<SupervisorMessage>(8);
    let parent_ref = ActorRef {
        actor_id: parent_id,
        sender: supervisor_tx,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };

    // Child: a reference whose channel carries `u64` messages.
    let child_id = ActorId::from_task(TaskId::new_for_test(20, 1));
    let (child_tx, _child_rx) = mpsc::channel::<u64>(32);
    let child_ref = ActorRef {
        actor_id: child_id,
        sender: child_tx,
        state: Arc::new(ActorStateCell::new(ActorState::Running)),
    };

    let ctx = ActorContext::new(&cx, child_ref, child_id, Some(parent_ref));

    let has_parent = ctx.has_parent();
    crate::assert_with_log!(has_parent, "has parent", true, has_parent);

    let parent_id_matches = ctx
        .parent()
        .expect("parent should be Some")
        .actor_id()
        == parent_id;
    crate::assert_with_log!(
        parent_id_matches,
        "parent id matches",
        true,
        parent_id_matches
    );

    crate::test_complete!("actor_context_with_parent_supervisor");
}
1904
/// The `Debug` rendering of an `ActorId` includes the type name.
#[test]
fn actor_id_debug_format() {
    let rendered = format!("{:?}", ActorId::from_task(TaskId::new_for_test(5, 3)));
    assert!(rendered.contains("ActorId"), "{rendered}");
}
1913
/// `Display` for `ActorId` produces the same text as `Display` for the
/// wrapped `TaskId`.
#[test]
fn actor_id_display_delegates_to_task_id() {
    let task = TaskId::new_for_test(7, 2);
    let actor = ActorId::from_task(task);
    // `to_string` goes through `Display`, just like `format!("{}")`.
    assert_eq!(actor.to_string(), task.to_string());
}
1920
/// `ActorId::from_task` followed by `task_id()` returns the original task id.
#[test]
fn actor_id_from_task_roundtrip() {
    let original = TaskId::new_for_test(3, 1);
    let recovered = ActorId::from_task(original).task_id();
    assert_eq!(recovered, original);
}
1927
/// Binding an `ActorId` to new names leaves the original usable, and every
/// binding compares equal to it.
#[test]
fn actor_id_copy_clone() {
    let id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let (copied, cloned) = (id, id);
    assert_eq!(id, copied);
    assert_eq!(id, cloned);
}
1936
/// Two `ActorId`s built from identical task ids compare equal and feed the
/// same digest into a `DetHasher`.
#[test]
fn actor_id_hash_consistency() {
    use crate::util::DetHasher;
    use std::hash::{Hash, Hasher};

    // Hashes one id with a fresh default hasher and returns the digest.
    let digest_of = |id: ActorId| {
        let mut hasher = DetHasher::default();
        id.hash(&mut hasher);
        hasher.finish()
    };

    let left = ActorId::from_task(TaskId::new_for_test(4, 2));
    let right = ActorId::from_task(TaskId::new_for_test(4, 2));
    assert_eq!(left, right);

    assert_eq!(digest_of(left), digest_of(right), "equal IDs must hash equal");
}
1952
/// Each `ActorState` variant's `Debug` output is exactly its variant name.
#[test]
fn actor_state_debug_all_variants() {
    let cases = [
        (ActorState::Created, "Created"),
        (ActorState::Running, "Running"),
        (ActorState::Stopping, "Stopping"),
        (ActorState::Stopped, "Stopped"),
    ];
    for (variant, name) in cases {
        assert_eq!(format!("{variant:?}"), name, "ActorState::{name}");
    }
}
1965
/// Binding an `ActorState` to new names leaves the original usable, and every
/// binding compares equal to it.
#[test]
fn actor_state_clone_copy_eq() {
    let s = ActorState::Running;
    let (copied, cloned) = (s, s);
    assert_eq!(s, copied);
    assert_eq!(s, cloned);
}
1974
/// Compares every ordered pair of `ActorState` variants: a variant equals
/// itself and differs from every other variant.
#[test]
fn actor_state_exhaustive_inequality() {
    let variants = [
        ActorState::Created,
        ActorState::Running,
        ActorState::Stopping,
        ActorState::Stopped,
    ];
    for (row, lhs) in variants.iter().enumerate() {
        for (col, rhs) in variants.iter().enumerate() {
            // Off-diagonal pairs must differ; the diagonal must match.
            if row != col {
                assert_ne!(lhs, rhs);
                continue;
            }
            assert_eq!(lhs, rhs);
        }
    }
}
1993
/// A cell created as `Created` reads back `Created`, and each subsequent
/// `store` is visible to the next `load`.
#[test]
fn actor_state_cell_sequential_transitions() {
    let cell = ActorStateCell::new(ActorState::Created);
    assert_eq!(cell.load(), ActorState::Created);

    // Walk the remaining states in order, checking each write reads back.
    for next in [
        ActorState::Running,
        ActorState::Stopping,
        ActorState::Stopped,
    ] {
        cell.store(next);
        assert_eq!(cell.load(), next);
    }
}
2008
/// `Debug` for `SupervisorMessage::ChildFailed` shows the variant name and
/// the failure reason.
#[test]
fn supervisor_message_debug_child_failed() {
    let child_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let failure = SupervisorMessage::ChildFailed {
        child_id,
        reason: String::from("panicked"),
    };
    let rendered = format!("{failure:?}");
    for needle in ["ChildFailed", "panicked"] {
        assert!(rendered.contains(needle), "{rendered}");
    }
}
2019
/// `Debug` for `SupervisorMessage::ChildStopped` shows the variant name.
#[test]
fn supervisor_message_debug_child_stopped() {
    let child_id = ActorId::from_task(TaskId::new_for_test(2, 1));
    let rendered = format!("{:?}", SupervisorMessage::ChildStopped { child_id });
    assert!(rendered.contains("ChildStopped"), "{rendered}");
}
2028
/// A cloned `SupervisorMessage` has the same `Debug` rendering as the
/// message it was cloned from.
#[test]
fn supervisor_message_clone() {
    let original = SupervisorMessage::ChildFailed {
        child_id: ActorId::from_task(TaskId::new_for_test(1, 1)),
        reason: String::from("boom"),
    };
    let duplicate = original.clone();
    assert_eq!(format!("{original:?}"), format!("{duplicate:?}"));
}
2039
/// Every `SupervisedOutcome` variant has a non-empty `Debug` rendering that
/// contains its distinguishing text.
#[test]
fn supervised_outcome_debug_all_variants() {
    let outcomes: Vec<SupervisedOutcome> = vec![
        SupervisedOutcome::Stopped,
        SupervisedOutcome::RestartBudgetExhausted { total_restarts: 5 },
        SupervisedOutcome::Escalated,
    ];
    let rendered: Vec<String> = outcomes.iter().map(|o| format!("{o:?}")).collect();

    // No variant renders as an empty string.
    for text in &rendered {
        assert!(!text.is_empty());
    }

    // Each variant's rendering carries its own marker.
    assert!(rendered[0].contains("Stopped"));
    assert!(rendered[1].contains('5'));
    assert!(rendered[2].contains("Escalated"));
}
2055
/// `MailboxConfig` renders its type name and its capacity through `Debug`,
/// and binding it to new names leaves the original usable with equal fields.
#[test]
fn mailbox_config_debug_clone_copy() {
    let cfg = MailboxConfig::default();
    let dbg = format!("{cfg:?}");
    assert!(dbg.contains("MailboxConfig"), "{dbg}");
    // Check against the config's own capacity instead of a literal "64", so
    // this test keeps verifying the Debug output if the default capacity is
    // ever changed (the default's value is pinned by a separate test).
    let capacity = cfg.capacity.to_string();
    assert!(dbg.contains(capacity.as_str()), "{dbg}");

    let copied = cfg;
    let cloned = cfg;
    assert_eq!(copied.capacity, cfg.capacity);
    assert_eq!(cloned.backpressure, cfg.backpressure);
}
2068
/// `with_capacity(0)` stores a capacity of zero and leaves backpressure on.
#[test]
fn mailbox_config_zero_capacity() {
    let MailboxConfig {
        capacity,
        backpressure,
        ..
    } = MailboxConfig::with_capacity(0);
    assert_eq!(capacity, 0);
    assert!(backpressure);
}
2075
/// `with_capacity(usize::MAX)` stores the value unchanged.
#[test]
fn mailbox_config_max_capacity() {
    let requested = usize::MAX;
    let MailboxConfig { capacity, .. } = MailboxConfig::with_capacity(requested);
    assert_eq!(capacity, requested);
}
2081
/// Pins the value of `DEFAULT_MAILBOX_CAPACITY` at 64.
#[test]
fn default_mailbox_capacity_is_64() {
    let pinned = 64;
    assert_eq!(DEFAULT_MAILBOX_CAPACITY, pinned);
}
2086
/// Registering the same child id twice records it twice; each
/// `unregister_child` call removes one entry until none remain.
#[test]
fn actor_context_duplicate_child_registration() {
    let cx: Cx = Cx::for_testing();
    // The receiving half stays bound so it is not dropped before the test ends.
    let (tx, _rx) = mpsc::channel::<u64>(32);
    let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let running = Arc::new(ActorStateCell::new(ActorState::Running));
    let self_ref = ActorRef {
        actor_id,
        sender: tx,
        state: running,
    };

    let mut ctx = ActorContext::new(&cx, self_ref, actor_id, None);
    let twin = ActorId::from_task(TaskId::new_for_test(2, 1));

    // Same id registered two times in a row.
    for _ in 0..2 {
        ctx.register_child(twin);
    }
    assert_eq!(ctx.child_count(), 2, "register_child does not dedup");

    // Each removal takes out a single entry.
    assert!(ctx.unregister_child(twin));
    assert_eq!(ctx.child_count(), 1, "one copy remains");
    assert!(ctx.unregister_child(twin));
    assert_eq!(ctx.child_count(), 0);
    assert!(!ctx.unregister_child(twin), "nothing left to remove");
}
2112
/// Calling `stop_self()` a second time leaves the context in the stopping
/// state.
#[test]
fn actor_context_stop_self_is_idempotent() {
    let cx: Cx = Cx::for_testing();
    // The receiving half stays bound so it is not dropped before the test ends.
    let (tx, _rx) = mpsc::channel::<u64>(32);
    let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let running = Arc::new(ActorStateCell::new(ActorState::Running));
    let self_ref = ActorRef {
        actor_id,
        sender: tx,
        state: running,
    };

    let mut ctx = ActorContext::new(&cx, self_ref, actor_id, None);

    // Stop twice; the stopping flag must hold after each call.
    for _ in 0..2 {
        ctx.stop_self();
        assert!(ctx.is_stopping());
    }
}
2130
/// `self_ref()` returns a reference that carries the context's actor id and
/// reports the actor as alive.
#[test]
fn actor_context_self_ref_returns_working_ref() {
    let cx: Cx = Cx::for_testing();
    // The receiving half stays bound so it is not dropped before the test ends.
    let (tx, _rx) = mpsc::channel::<u64>(32);
    let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let running = Arc::new(ActorStateCell::new(ActorState::Running));
    let original_ref = ActorRef {
        actor_id,
        sender: tx,
        state: running,
    };

    let ctx = ActorContext::new(&cx, original_ref, actor_id, None);

    let obtained = ctx.self_ref();
    assert_eq!(obtained.actor_id(), actor_id);
    assert!(obtained.is_alive());
}
2147
/// A context built over `Cx::for_testing()` reports no deadline.
#[test]
fn actor_context_deadline_reflects_budget() {
    let cx: Cx = Cx::for_testing();
    // The receiving half stays bound so it is not dropped before the test ends.
    let (tx, _rx) = mpsc::channel::<u64>(32);
    let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
    let running = Arc::new(ActorStateCell::new(ActorState::Running));
    let self_ref = ActorRef {
        actor_id,
        sender: tx,
        state: running,
    };

    let ctx = ActorContext::new(&cx, self_ref, actor_id, None);

    let deadline = ctx.deadline();
    assert!(deadline.is_none());
}
2163
/// The `Debug` rendering of a spawned actor's handle includes the type name.
#[test]
fn actor_handle_debug() {
    let mut runtime_state = RuntimeState::new();
    let root_region = runtime_state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root_region, Budget::INFINITE);

    let (handle, stored_task) = scope
        .spawn_actor(&mut runtime_state, &cx, Counter::new(), 32)
        .unwrap();
    runtime_state.store_spawned_task(handle.task_id(), stored_task);

    let rendered = format!("{handle:?}");
    assert!(rendered.contains("ActorHandle"), "{rendered}");
}
2179
/// After a stopped actor has been joined once, a second `join` on the same
/// handle yields `JoinError::PolledAfterCompletion`.
#[test]
fn actor_handle_second_join_fails_closed() {
    init_test("actor_handle_second_join_fails_closed");

    let lab_config = crate::lab::LabConfig::default();
    let mut runtime = crate::lab::LabRuntime::new(lab_config);
    let root = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (mut handle, stored_task) = scope
        .spawn_actor(&mut runtime.state, &cx, Counter::new(), 32)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored_task);

    // Request the stop first, then let the runtime drive the task to the end.
    handle.stop();
    runtime.scheduler.lock().schedule(task_id, 0);
    runtime.run_until_quiescent();
    assert!(handle.is_finished(), "stopped actor should report finished");

    // First join hands back the actor; no messages were sent, so count is 0.
    let joined = futures_lite::future::block_on(handle.join(&cx)).expect("first join");
    assert_eq!(joined.count, 0, "join should return final actor state");

    // Joining again must produce the dedicated error, not a value.
    let retry = futures_lite::future::block_on(handle.join(&cx));
    assert!(
        matches!(retry, Err(JoinError::PolledAfterCompletion)),
        "second join must fail closed, got {retry:?}"
    );

    crate::test_complete!("actor_handle_second_join_fails_closed");
}
2211
/// The `Debug` rendering of an `ActorRef` obtained from a handle includes
/// the type name.
#[test]
fn actor_ref_debug() {
    let mut runtime_state = RuntimeState::new();
    let root_region = runtime_state.create_root_region(Budget::INFINITE);
    let cx: Cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root_region, Budget::INFINITE);

    let (handle, stored_task) = scope
        .spawn_actor(&mut runtime_state, &cx, Counter::new(), 32)
        .unwrap();
    runtime_state.store_spawned_task(handle.task_id(), stored_task);

    let rendered = format!("{:?}", handle.sender());
    assert!(rendered.contains("ActorRef"), "{rendered}");
}
2228
/// The `Debug` rendering of an `ActorStateCell` includes the type name.
#[test]
fn actor_state_cell_debug() {
    let rendered = format!("{:?}", ActorStateCell::new(ActorState::Running));
    assert!(rendered.contains("ActorStateCell"), "{rendered}");
}
2235
/// Exercises the traits derived on `ActorId`: `Debug` output names the type,
/// rebound values compare equal, and two distinct ids occupy two `HashSet`
/// slots.
#[test]
fn actor_id_clone_copy_eq_hash() {
    use std::collections::HashSet;

    let id = ActorId::from_task(TaskId::new_for_test(1, 0));
    assert!(format!("{id:?}").contains("ActorId"));

    // Rebinding does not invalidate `id`, and each binding equals it.
    for alias in [id, id] {
        assert_eq!(id, alias);
    }

    // A second id built from a different task id hashes to a separate entry.
    let other = ActorId::from_task(TaskId::new_for_test(2, 0));
    let distinct = HashSet::from([id, other]);
    assert_eq!(distinct.len(), 2);
}
2257
/// Exercises the traits derived on `ActorState`: `Debug` output names the
/// variant, rebound values compare equal, and different variants compare
/// unequal.
#[test]
fn actor_state_debug_clone_copy_eq() {
    let state = ActorState::Running;
    assert!(format!("{state:?}").contains("Running"));

    // Rebinding does not invalidate `state`, and each binding equals it.
    for alias in [state, state] {
        assert_eq!(state, alias);
    }

    assert_ne!(ActorState::Created, ActorState::Stopped);
}
2272
/// The default `MailboxConfig` names its type in `Debug` output, and values
/// rebound from it carry the same fields.
#[test]
fn mailbox_config_debug_clone_copy_default() {
    let base = MailboxConfig::default();
    assert!(format!("{base:?}").contains("MailboxConfig"));

    // First rebinding: both fields must match the source.
    let first = base;
    assert_eq!(first.capacity, base.capacity);
    assert_eq!(first.backpressure, base.backpressure);

    // Second rebinding: only the capacity is compared.
    let second = base;
    assert_eq!(second.capacity, base.capacity);
}
2287}