1use std::future::Future;
43use std::pin::Pin;
44use std::sync::atomic::{AtomicU8, Ordering};
45use std::sync::Arc;
46
47use crate::channel::mpsc;
48use crate::channel::mpsc::SendError;
49use crate::cx::Cx;
50use crate::runtime::{JoinError, SpawnError};
51use crate::types::{CxInner, Outcome, RegionId, TaskId, Time};
52
53#[derive(Clone, Copy, PartialEq, Eq, Hash)]
59pub struct ActorId(TaskId);
60
61impl ActorId {
62 #[must_use]
64 pub const fn from_task(task_id: TaskId) -> Self {
65 Self(task_id)
66 }
67
68 #[must_use]
70 pub const fn task_id(self) -> TaskId {
71 self.0
72 }
73}
74
75impl std::fmt::Debug for ActorId {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_tuple("ActorId").field(&self.0).finish()
78 }
79}
80
81impl std::fmt::Display for ActorId {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 write!(f, "{}", self.0)
86 }
87}
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq)]
91pub enum ActorState {
92 Created,
94 Running,
96 Stopping,
98 Stopped,
100}
101
102#[derive(Debug)]
103struct ActorStateCell {
104 state: AtomicU8,
105}
106
107impl ActorStateCell {
108 fn new(state: ActorState) -> Self {
109 Self {
110 state: AtomicU8::new(Self::encode(state)),
111 }
112 }
113
114 fn load(&self) -> ActorState {
115 Self::decode(self.state.load(Ordering::Acquire))
116 }
117
118 fn store(&self, state: ActorState) {
119 self.state.store(Self::encode(state), Ordering::Release);
120 }
121
122 const fn encode(state: ActorState) -> u8 {
123 match state {
124 ActorState::Created => 0,
125 ActorState::Running => 1,
126 ActorState::Stopping => 2,
127 ActorState::Stopped => 3,
128 }
129 }
130
131 const fn decode(value: u8) -> ActorState {
132 match value {
133 0 => ActorState::Created,
134 1 => ActorState::Running,
135 2 => ActorState::Stopping,
136 _ => ActorState::Stopped,
137 }
138 }
139}
140
141struct ActorCell<M> {
146 mailbox: mpsc::Receiver<M>,
147 state: Arc<ActorStateCell>,
148}
149
150pub trait Actor: Send + 'static {
164 type Message: Send + 'static;
166
167 fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
172 Box::pin(async {})
173 }
174
175 fn handle(
180 &mut self,
181 cx: &Cx,
182 msg: Self::Message,
183 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
184
185 fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
189 Box::pin(async {})
190 }
191}
192
193#[derive(Debug)]
202pub struct ActorHandle<A: Actor> {
203 actor_id: ActorId,
204 sender: mpsc::Sender<A::Message>,
205 state: Arc<ActorStateCell>,
206 task_id: TaskId,
207 receiver: crate::channel::oneshot::Receiver<Result<A, JoinError>>,
208 inner: std::sync::Weak<std::sync::RwLock<CxInner>>,
209}
210
211impl<A: Actor> ActorHandle<A> {
212 pub async fn send(&self, cx: &Cx, msg: A::Message) -> Result<(), SendError<A::Message>> {
216 self.sender.send(cx, msg).await
217 }
218
219 pub fn try_send(&self, msg: A::Message) -> Result<(), SendError<A::Message>> {
224 self.sender.try_send(msg)
225 }
226
227 #[must_use]
229 pub fn sender(&self) -> ActorRef<A::Message> {
230 ActorRef {
231 actor_id: self.actor_id,
232 sender: self.sender.clone(),
233 state: Arc::clone(&self.state),
234 }
235 }
236
237 #[must_use]
239 pub const fn actor_id(&self) -> ActorId {
240 self.actor_id
241 }
242
243 #[must_use]
245 pub fn task_id(&self) -> crate::types::TaskId {
246 self.task_id
247 }
248
249 pub fn stop(&self) {
259 self.state.store(ActorState::Stopping);
260 if let Some(inner) = self.inner.upgrade() {
261 if let Ok(mut guard) = inner.write() {
262 guard.cancel_requested = true;
263 }
264 }
265 }
266
267 #[must_use]
269 pub fn is_finished(&self) -> bool {
270 self.receiver.is_ready()
271 }
272
273 pub async fn join(&self, cx: &Cx) -> Result<A, JoinError> {
278 self.receiver.recv(cx).await.unwrap_or_else(|_| {
279 let reason = cx
284 .cancel_reason()
285 .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
286 Err(JoinError::Cancelled(reason))
287 })
288 }
289
290 pub fn abort(&self) {
300 self.state.store(ActorState::Stopping);
301 if let Some(inner) = self.inner.upgrade() {
302 if let Ok(mut guard) = inner.write() {
303 guard.cancel_requested = true;
304 }
305 }
306 }
307}
308
309#[derive(Debug)]
314pub struct ActorRef<M> {
315 actor_id: ActorId,
316 sender: mpsc::Sender<M>,
317 state: Arc<ActorStateCell>,
318}
319
320impl<M> Clone for ActorRef<M> {
324 fn clone(&self) -> Self {
325 Self {
326 actor_id: self.actor_id,
327 sender: self.sender.clone(),
328 state: Arc::clone(&self.state),
329 }
330 }
331}
332
333impl<M: Send + 'static> ActorRef<M> {
334 pub async fn send(&self, cx: &Cx, msg: M) -> Result<(), SendError<M>> {
336 self.sender.send(cx, msg).await
337 }
338
339 #[must_use]
341 pub fn reserve<'a>(&'a self, cx: &'a Cx) -> mpsc::Reserve<'a, M> {
342 self.sender.reserve(cx)
343 }
344
345 pub fn try_send(&self, msg: M) -> Result<(), SendError<M>> {
347 self.sender.try_send(msg)
348 }
349
350 #[must_use]
352 pub fn is_closed(&self) -> bool {
353 self.sender.is_closed()
354 }
355
356 #[must_use]
360 pub fn is_alive(&self) -> bool {
361 self.state.load() != ActorState::Stopped
362 }
363
364 #[must_use]
366 pub const fn actor_id(&self) -> ActorId {
367 self.actor_id
368 }
369}
370
371#[derive(Debug, Clone, Copy)]
377pub struct MailboxConfig {
378 pub capacity: usize,
380 pub backpressure: bool,
382}
383
384impl Default for MailboxConfig {
385 fn default() -> Self {
386 Self {
387 capacity: DEFAULT_MAILBOX_CAPACITY,
388 backpressure: true,
389 }
390 }
391}
392
393impl MailboxConfig {
394 #[must_use]
396 pub const fn with_capacity(capacity: usize) -> Self {
397 Self {
398 capacity,
399 backpressure: true,
400 }
401 }
402}
403
404#[derive(Debug, Clone)]
406pub enum SupervisorMessage {
407 ChildFailed {
409 child_id: ActorId,
411 reason: String,
413 },
414 ChildStopped {
416 child_id: ActorId,
418 },
419}
420
421pub struct ActorContext<'a, M: Send + 'static> {
446 cx: &'a Cx,
448 self_ref: ActorRef<M>,
450 actor_id: ActorId,
452 parent: Option<ActorRef<SupervisorMessage>>,
454 children: Vec<ActorId>,
456 stopping: bool,
458}
459
460#[allow(clippy::elidable_lifetime_names)]
461impl<'a, M: Send + 'static> ActorContext<'a, M> {
462 #[must_use]
466 pub fn new(
467 cx: &'a Cx,
468 self_ref: ActorRef<M>,
469 actor_id: ActorId,
470 parent: Option<ActorRef<SupervisorMessage>>,
471 ) -> Self {
472 Self {
473 cx,
474 self_ref,
475 actor_id,
476 parent,
477 children: Vec::new(),
478 stopping: false,
479 }
480 }
481
482 #[must_use]
487 pub const fn self_actor_id(&self) -> ActorId {
488 self.actor_id
489 }
490
491 #[must_use]
493 pub const fn actor_id(&self) -> ActorId {
494 self.actor_id
495 }
496
497 pub fn register_child(&mut self, child_id: ActorId) {
505 self.children.push(child_id);
506 }
507
508 pub fn unregister_child(&mut self, child_id: ActorId) -> bool {
512 if let Some(pos) = self.children.iter().position(|&id| id == child_id) {
513 self.children.swap_remove(pos);
514 true
515 } else {
516 false
517 }
518 }
519
520 #[must_use]
522 pub fn children(&self) -> &[ActorId] {
523 &self.children
524 }
525
526 #[must_use]
528 pub fn has_children(&self) -> bool {
529 !self.children.is_empty()
530 }
531
532 #[must_use]
534 pub fn child_count(&self) -> usize {
535 self.children.len()
536 }
537
538 pub fn stop_self(&mut self) {
547 self.stopping = true;
548 }
549
550 #[must_use]
552 pub fn is_stopping(&self) -> bool {
553 self.stopping
554 }
555
556 #[must_use]
564 pub fn parent(&self) -> Option<&ActorRef<SupervisorMessage>> {
565 self.parent.as_ref()
566 }
567
568 #[must_use]
570 pub fn has_parent(&self) -> bool {
571 self.parent.is_some()
572 }
573
574 pub async fn escalate(&self, reason: String) {
579 if let Some(parent) = &self.parent {
580 let msg = SupervisorMessage::ChildFailed {
581 child_id: self.actor_id,
582 reason,
583 };
584 let _ = parent.send(self.cx, msg).await;
586 }
587 }
588
589 #[allow(clippy::result_large_err)]
598 pub fn checkpoint(&self) -> Result<(), crate::error::Error> {
599 if self.stopping {
600 let reason = crate::types::CancelReason::user("actor stopping")
601 .with_region(self.cx.region_id())
602 .with_task(self.cx.task_id());
603 return Err(crate::error::Error::cancelled(&reason));
604 }
605 self.cx.checkpoint()
606 }
607
608 #[must_use]
612 pub fn is_cancel_requested(&self) -> bool {
613 self.stopping || self.cx.is_cancel_requested()
614 }
615
616 #[must_use]
618 pub fn budget(&self) -> crate::types::Budget {
619 self.cx.budget()
620 }
621
622 #[must_use]
624 pub fn deadline(&self) -> Option<Time> {
625 self.cx.budget().deadline
626 }
627
628 pub fn trace(&self, event: &str) {
630 self.cx.trace(event);
631 }
632
633 #[must_use]
638 pub fn self_ref(&self) -> ActorRef<M> {
639 self.self_ref.clone()
640 }
641
642 #[must_use]
644 pub const fn cx(&self) -> &Cx {
645 self.cx
646 }
647}
648
649impl<M: Send + 'static> std::ops::Deref for ActorContext<'_, M> {
650 type Target = Cx;
651
652 fn deref(&self) -> &Self::Target {
653 self.cx
654 }
655}
656
657impl<M: Send + 'static> std::fmt::Debug for ActorContext<'_, M> {
658 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
659 f.debug_struct("ActorContext")
660 .field("actor_id", &self.actor_id)
661 .field("children", &self.children.len())
662 .field("stopping", &self.stopping)
663 .field("has_parent", &self.parent.is_some())
664 .finish()
665 }
666}
667
668pub const DEFAULT_MAILBOX_CAPACITY: usize = 64;
670
671async fn run_actor_loop<A: Actor>(mut actor: A, cx: Cx, cell: &ActorCell<A::Message>) -> A {
680 use crate::tracing_compat::debug;
681
682 cell.state.store(ActorState::Running);
683
684 cx.trace("actor::on_start");
686 actor.on_start(&cx).await;
687
688 loop {
690 if cx.is_cancel_requested() {
692 cx.trace("actor::cancel_requested");
693 break;
694 }
695
696 match cell.mailbox.recv(&cx).await {
697 Ok(msg) => {
698 actor.handle(&cx, msg).await;
699 }
700 Err(crate::channel::mpsc::RecvError::Disconnected) => {
701 cx.trace("actor::mailbox_disconnected");
703 break;
704 }
705 Err(crate::channel::mpsc::RecvError::Cancelled) => {
706 cx.trace("actor::recv_cancelled");
708 break;
709 }
710 Err(crate::channel::mpsc::RecvError::Empty) => {
711 break;
713 }
714 }
715 }
716
717 cell.state.store(ActorState::Stopping);
718
719 let drain_limit = cell.mailbox.capacity() as u64;
726 let mut drained: u64 = 0;
727 while let Ok(msg) = cell.mailbox.try_recv() {
728 actor.handle(&cx, msg).await;
729 drained += 1;
730 if drained >= drain_limit {
731 break;
732 }
733 }
734 if drained > 0 {
735 debug!(drained = drained, "actor::mailbox_drained");
736 cx.trace("actor::mailbox_drained");
737 }
738
739 cx.trace("actor::on_stop");
741 actor.on_stop(&cx).await;
742
743 actor
744}
745
746impl<P: crate::types::Policy> crate::cx::Scope<'_, P> {
748 pub fn spawn_actor<A: Actor>(
765 &self,
766 state: &mut crate::runtime::state::RuntimeState,
767 cx: &Cx,
768 actor: A,
769 mailbox_capacity: usize,
770 ) -> Result<(ActorHandle<A>, crate::runtime::stored_task::StoredTask), SpawnError> {
771 use crate::channel::oneshot;
772 use crate::cx::scope::CatchUnwind;
773 use crate::runtime::stored_task::StoredTask;
774 use crate::tracing_compat::{debug, debug_span};
775
776 let (msg_tx, msg_rx) = mpsc::channel::<A::Message>(mailbox_capacity);
778
779 let (result_tx, result_rx) = oneshot::channel::<Result<A, JoinError>>();
781
782 let task_id = self.create_task_record(state)?;
784 let actor_id = ActorId::from_task(task_id);
785 let actor_state = Arc::new(ActorStateCell::new(ActorState::Created));
786
787 let _span = debug_span!(
788 "actor_spawn",
789 task_id = ?task_id,
790 region_id = ?self.region_id(),
791 mailbox_capacity = mailbox_capacity,
792 )
793 .entered();
794 debug!(
795 task_id = ?task_id,
796 region_id = ?self.region_id(),
797 mailbox_capacity = mailbox_capacity,
798 "actor spawned"
799 );
800
801 let child_observability = cx.child_observability(self.region_id(), task_id);
803 let child_entropy = cx.child_entropy(task_id);
804 let io_driver = state.io_driver_handle();
805 let child_cx = Cx::new_with_observability(
806 self.region_id(),
807 task_id,
808 self.budget(),
809 Some(child_observability),
810 io_driver,
811 Some(child_entropy),
812 )
813 .with_blocking_pool_handle(cx.blocking_pool_handle());
814
815 if let Some(record) = state.task_mut(task_id) {
817 record.set_cx_inner(child_cx.inner.clone());
818 record.set_cx(child_cx.clone());
819 }
820
821 let cx_for_send = child_cx.clone();
822 let inner_weak = Arc::downgrade(&child_cx.inner);
823 let state_for_task = Arc::clone(&actor_state);
824
825 let cell = ActorCell {
826 mailbox: msg_rx,
827 state: Arc::clone(&actor_state),
828 };
829
830 let wrapped = async move {
832 let result = CatchUnwind(Box::pin(run_actor_loop(actor, child_cx, &cell))).await;
833 match result {
834 Ok(actor_final) => {
835 let _ = result_tx.send(&cx_for_send, Ok(actor_final));
836 }
837 Err(payload) => {
838 let msg = crate::cx::scope::payload_to_string(&payload);
839 let _ = result_tx.send(
840 &cx_for_send,
841 Err(JoinError::Panicked(crate::types::PanicPayload::new(msg))),
842 );
843 }
844 }
845 state_for_task.store(ActorState::Stopped);
846 Outcome::Ok(())
847 };
848
849 let stored = StoredTask::new_with_id(wrapped, task_id);
850
851 let handle = ActorHandle {
852 actor_id,
853 sender: msg_tx,
854 state: actor_state,
855 task_id,
856 receiver: result_rx,
857 inner: inner_weak,
858 };
859
860 Ok((handle, stored))
861 }
862
863 pub fn spawn_supervised_actor<A, F>(
878 &self,
879 state: &mut crate::runtime::state::RuntimeState,
880 cx: &Cx,
881 mut factory: F,
882 strategy: crate::supervision::SupervisionStrategy,
883 mailbox_capacity: usize,
884 ) -> Result<(ActorHandle<A>, crate::runtime::stored_task::StoredTask), SpawnError>
885 where
886 A: Actor,
887 F: FnMut() -> A + Send + 'static,
888 {
889 use crate::channel::oneshot;
890 use crate::runtime::stored_task::StoredTask;
891 use crate::supervision::Supervisor;
892 use crate::tracing_compat::{debug, debug_span};
893
894 let actor = factory();
895 let (msg_tx, msg_rx) = mpsc::channel::<A::Message>(mailbox_capacity);
896 let (result_tx, result_rx) = oneshot::channel::<Result<A, JoinError>>();
897 let task_id = self.create_task_record(state)?;
898 let actor_id = ActorId::from_task(task_id);
899 let actor_state = Arc::new(ActorStateCell::new(ActorState::Created));
900
901 let _span = debug_span!(
902 "supervised_actor_spawn",
903 task_id = ?task_id,
904 region_id = ?self.region_id(),
905 mailbox_capacity = mailbox_capacity,
906 )
907 .entered();
908 debug!(
909 task_id = ?task_id,
910 region_id = ?self.region_id(),
911 "supervised actor spawned"
912 );
913
914 let child_observability = cx.child_observability(self.region_id(), task_id);
915 let child_entropy = cx.child_entropy(task_id);
916 let io_driver = state.io_driver_handle();
917 let child_cx = Cx::new_with_observability(
918 self.region_id(),
919 task_id,
920 self.budget(),
921 Some(child_observability),
922 io_driver,
923 Some(child_entropy),
924 );
925
926 if let Some(record) = state.task_mut(task_id) {
927 record.set_cx_inner(child_cx.inner.clone());
928 record.set_cx(child_cx.clone());
929 }
930
931 let cx_for_send = child_cx.clone();
932 let inner_weak = Arc::downgrade(&child_cx.inner);
933 let region_id = self.region_id();
934 let state_for_task = Arc::clone(&actor_state);
935
936 let cell = ActorCell {
937 mailbox: msg_rx,
938 state: Arc::clone(&actor_state),
939 };
940
941 let wrapped = async move {
942 let result = run_supervised_loop(
943 actor,
944 &mut factory,
945 child_cx,
946 &cell,
947 Supervisor::new(strategy),
948 task_id,
949 region_id,
950 )
951 .await;
952 let _ = result_tx.send(&cx_for_send, result);
953 state_for_task.store(ActorState::Stopped);
954 Outcome::Ok(())
955 };
956
957 let stored = StoredTask::new_with_id(wrapped, task_id);
958
959 let handle = ActorHandle {
960 actor_id,
961 sender: msg_tx,
962 state: actor_state,
963 task_id,
964 receiver: result_rx,
965 inner: inner_weak,
966 };
967
968 Ok((handle, stored))
969 }
970}
971
972#[derive(Debug)]
974pub enum SupervisedOutcome {
975 Stopped,
977 RestartBudgetExhausted {
979 total_restarts: u32,
981 },
982 Escalated,
984}
985
986async fn run_supervised_loop<A, F>(
991 initial_actor: A,
992 factory: &mut F,
993 cx: Cx,
994 cell: &ActorCell<A::Message>,
995 mut supervisor: crate::supervision::Supervisor,
996 task_id: TaskId,
997 region_id: RegionId,
998) -> Result<A, JoinError>
999where
1000 A: Actor,
1001 F: FnMut() -> A,
1002{
1003 use crate::cx::scope::CatchUnwind;
1004 use crate::supervision::SupervisionDecision;
1005 use crate::types::Outcome;
1006
1007 let mut current_actor = initial_actor;
1008
1009 loop {
1010 let result = CatchUnwind(Box::pin(run_actor_loop(current_actor, cx.clone(), cell))).await;
1012
1013 match result {
1014 Ok(actor_final) => {
1015 return Ok(actor_final);
1017 }
1018 Err(payload) => {
1019 let msg = crate::cx::scope::payload_to_string(&payload);
1024 cx.trace("supervised_actor::failure");
1025
1026 let outcome = Outcome::err(());
1027 let now = cx.timer_driver().map_or(0, |td| td.now().as_nanos());
1028 let decision = supervisor.on_failure(task_id, region_id, None, &outcome, now);
1029
1030 match decision {
1031 SupervisionDecision::Restart { .. } => {
1032 cx.trace("supervised_actor::restart");
1033 current_actor = factory();
1034 }
1035 SupervisionDecision::Stop { .. } => {
1036 cx.trace("supervised_actor::stopped");
1037 return Err(JoinError::Panicked(crate::types::PanicPayload::new(msg)));
1038 }
1039 SupervisionDecision::Escalate { .. } => {
1040 cx.trace("supervised_actor::escalated");
1041 return Err(JoinError::Panicked(crate::types::PanicPayload::new(msg)));
1042 }
1043 }
1044 }
1045 }
1046 }
1047}
1048
1049#[cfg(test)]
1050mod tests {
1051 use super::*;
1052 use crate::runtime::state::RuntimeState;
1053 use crate::types::policy::FailFast;
1054 use crate::types::Budget;
1055
1056 fn init_test(name: &str) {
1057 crate::test_utils::init_test_logging();
1058 crate::test_phase!(name);
1059 }
1060
1061 struct Counter {
1063 count: u64,
1064 started: bool,
1065 stopped: bool,
1066 }
1067
1068 impl Counter {
1069 fn new() -> Self {
1070 Self {
1071 count: 0,
1072 started: false,
1073 stopped: false,
1074 }
1075 }
1076 }
1077
1078 impl Actor for Counter {
1079 type Message = u64;
1080
1081 fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1082 self.started = true;
1083 Box::pin(async {})
1084 }
1085
1086 fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1087 self.count += msg;
1088 Box::pin(async {})
1089 }
1090
1091 fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1092 self.stopped = true;
1093 Box::pin(async {})
1094 }
1095 }
1096
1097 fn assert_actor<A: Actor>() {}
1098
1099 #[test]
1100 fn actor_trait_object_safety() {
1101 init_test("actor_trait_object_safety");
1102
1103 assert_actor::<Counter>();
1105
1106 crate::test_complete!("actor_trait_object_safety");
1107 }
1108
1109 #[test]
1110 fn actor_handle_creation() {
1111 init_test("actor_handle_creation");
1112
1113 let mut state = RuntimeState::new();
1114 let root = state.create_root_region(Budget::INFINITE);
1115 let cx: Cx = Cx::for_testing();
1116 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
1117
1118 let result = scope.spawn_actor(&mut state, &cx, Counter::new(), 32);
1119 assert!(result.is_ok(), "spawn_actor should succeed");
1120
1121 let (handle, stored) = result.unwrap();
1122 state.store_spawned_task(handle.task_id(), stored);
1123
1124 let _tid = handle.task_id();
1126
1127 assert!(!handle.is_finished());
1129
1130 crate::test_complete!("actor_handle_creation");
1131 }
1132
1133 #[test]
1134 fn actor_id_generation_distinct() {
1135 init_test("actor_id_generation_distinct");
1136
1137 let id1 = ActorId::from_task(TaskId::new_for_test(1, 1));
1138 let id2 = ActorId::from_task(TaskId::new_for_test(1, 2));
1139 assert!(id1 != id2, "generation must distinguish actor reuse");
1140
1141 crate::test_complete!("actor_id_generation_distinct");
1142 }
1143
1144 #[test]
1145 fn actor_ref_is_cloneable() {
1146 init_test("actor_ref_is_cloneable");
1147
1148 let mut state = RuntimeState::new();
1149 let root = state.create_root_region(Budget::INFINITE);
1150 let cx: Cx = Cx::for_testing();
1151 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
1152
1153 let (handle, stored) = scope
1154 .spawn_actor(&mut state, &cx, Counter::new(), 32)
1155 .unwrap();
1156 state.store_spawned_task(handle.task_id(), stored);
1157
1158 let ref1 = handle.sender();
1160 let ref2 = ref1.clone();
1161
1162 assert_eq!(ref1.actor_id(), handle.actor_id());
1164 assert_eq!(ref2.actor_id(), handle.actor_id());
1165
1166 assert!(ref1.is_alive());
1168 assert!(ref2.is_alive());
1169
1170 assert!(!ref1.is_closed());
1172 assert!(!ref2.is_closed());
1173
1174 crate::test_complete!("actor_ref_is_cloneable");
1175 }
1176
1177 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1180
1181 struct ObservableCounter {
1184 count: u64,
1185 on_stop_count: Arc<AtomicU64>,
1186 started: Arc<AtomicBool>,
1187 stopped: Arc<AtomicBool>,
1188 }
1189
1190 impl ObservableCounter {
1191 fn new(
1192 on_stop_count: Arc<AtomicU64>,
1193 started: Arc<AtomicBool>,
1194 stopped: Arc<AtomicBool>,
1195 ) -> Self {
1196 Self {
1197 count: 0,
1198 on_stop_count,
1199 started,
1200 stopped,
1201 }
1202 }
1203 }
1204
1205 impl Actor for ObservableCounter {
1206 type Message = u64;
1207
1208 fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1209 self.started.store(true, Ordering::SeqCst);
1210 Box::pin(async {})
1211 }
1212
1213 fn handle(&mut self, _cx: &Cx, msg: u64) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1214 self.count += msg;
1215 Box::pin(async {})
1216 }
1217
1218 fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1219 self.on_stop_count.store(self.count, Ordering::SeqCst);
1220 self.stopped.store(true, Ordering::SeqCst);
1221 Box::pin(async {})
1222 }
1223 }
1224
1225 fn observable_state() -> (Arc<AtomicU64>, Arc<AtomicBool>, Arc<AtomicBool>) {
1226 (
1227 Arc::new(AtomicU64::new(u64::MAX)),
1228 Arc::new(AtomicBool::new(false)),
1229 Arc::new(AtomicBool::new(false)),
1230 )
1231 }
1232
1233 #[test]
1236 fn actor_processes_all_messages() {
1237 init_test("actor_processes_all_messages");
1238
1239 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1240 let region = runtime.state.create_root_region(Budget::INFINITE);
1241 let cx: Cx = Cx::for_testing();
1242 let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1243
1244 let (on_stop_count, started, stopped) = observable_state();
1245 let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
1246
1247 let (handle, stored) = scope
1248 .spawn_actor(&mut runtime.state, &cx, actor, 32)
1249 .unwrap();
1250 let task_id = handle.task_id();
1251 runtime.state.store_spawned_task(task_id, stored);
1252
1253 for _ in 0..5 {
1255 handle.try_send(1).unwrap();
1256 }
1257
1258 drop(handle);
1261
1262 runtime.scheduler.lock().unwrap().schedule(task_id, 0);
1263 runtime.run_until_quiescent();
1264
1265 assert_eq!(
1266 on_stop_count.load(Ordering::SeqCst),
1267 5,
1268 "all messages processed"
1269 );
1270 assert!(started.load(Ordering::SeqCst), "on_start was called");
1271 assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
1272
1273 crate::test_complete!("actor_processes_all_messages");
1274 }
1275
1276 #[test]
1280 fn actor_drains_mailbox_on_cancel() {
1281 init_test("actor_drains_mailbox_on_cancel");
1282
1283 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1284 let region = runtime.state.create_root_region(Budget::INFINITE);
1285 let cx: Cx = Cx::for_testing();
1286 let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1287
1288 let (on_stop_count, started, stopped) = observable_state();
1289 let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
1290
1291 let (handle, stored) = scope
1292 .spawn_actor(&mut runtime.state, &cx, actor, 32)
1293 .unwrap();
1294 let task_id = handle.task_id();
1295 runtime.state.store_spawned_task(task_id, stored);
1296
1297 for _ in 0..5 {
1299 handle.try_send(1).unwrap();
1300 }
1301
1302 handle.stop();
1305
1306 runtime.scheduler.lock().unwrap().schedule(task_id, 0);
1307 runtime.run_until_quiescent();
1308
1309 assert_eq!(
1311 on_stop_count.load(Ordering::SeqCst),
1312 5,
1313 "drain processed all messages"
1314 );
1315 assert!(started.load(Ordering::SeqCst), "on_start was called");
1316 assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
1317
1318 crate::test_complete!("actor_drains_mailbox_on_cancel");
1319 }
1320
1321 #[test]
1323 fn actor_ref_is_alive_transitions() {
1324 init_test("actor_ref_is_alive_transitions");
1325
1326 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1327 let region = runtime.state.create_root_region(Budget::INFINITE);
1328 let cx: Cx = Cx::for_testing();
1329 let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1330
1331 let (on_stop_count, started, stopped) = observable_state();
1332 let actor = ObservableCounter::new(on_stop_count.clone(), started.clone(), stopped.clone());
1333
1334 let (handle, stored) = scope
1335 .spawn_actor(&mut runtime.state, &cx, actor, 32)
1336 .unwrap();
1337 let task_id = handle.task_id();
1338 runtime.state.store_spawned_task(task_id, stored);
1339
1340 let actor_ref = handle.sender();
1341 assert!(actor_ref.is_alive(), "created actor should be alive");
1342 assert_eq!(actor_ref.actor_id(), handle.actor_id());
1343
1344 handle.stop();
1345 assert!(actor_ref.is_alive(), "stopping actor is still alive");
1346
1347 runtime.scheduler.lock().unwrap().schedule(task_id, 0);
1348 runtime.run_until_quiescent();
1349
1350 assert!(
1351 handle.is_finished(),
1352 "actor should be finished after stop + run"
1353 );
1354 assert!(!actor_ref.is_alive(), "finished actor is not alive");
1355
1356 assert!(started.load(Ordering::SeqCst), "on_start was called");
1358 assert!(stopped.load(Ordering::SeqCst), "on_stop was called");
1359 assert_ne!(
1360 on_stop_count.load(Ordering::SeqCst),
1361 u64::MAX,
1362 "on_stop_count updated"
1363 );
1364
1365 crate::test_complete!("actor_ref_is_alive_transitions");
1366 }
1367
1368 #[test]
1372 fn supervised_actor_restarts_on_panic() {
1373 use std::sync::atomic::AtomicU32;
1374
1375 struct PanickingCounter {
1376 count: u64,
1377 panic_on: u64,
1378 final_count: Arc<AtomicU64>,
1379 restart_count: Arc<AtomicU32>,
1380 }
1381
1382 impl Actor for PanickingCounter {
1383 type Message = u64;
1384
1385 fn handle(
1386 &mut self,
1387 _cx: &Cx,
1388 msg: u64,
1389 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1390 assert!(msg != self.panic_on, "threshold exceeded: {msg}");
1391 self.count += msg;
1392 Box::pin(async {})
1393 }
1394
1395 fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1396 self.final_count.store(self.count, Ordering::SeqCst);
1397 Box::pin(async {})
1398 }
1399 }
1400
1401 init_test("supervised_actor_restarts_on_panic");
1402
1403 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
1404 let region = runtime.state.create_root_region(Budget::INFINITE);
1405 let cx: Cx = Cx::for_testing();
1406 let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1407
1408 let final_count = Arc::new(AtomicU64::new(u64::MAX));
1409 let restart_count = Arc::new(AtomicU32::new(0));
1410 let fc = final_count.clone();
1411 let rc = restart_count.clone();
1412
1413 let strategy = crate::supervision::SupervisionStrategy::Restart(
1414 crate::supervision::RestartConfig::new(3, std::time::Duration::from_mins(1)),
1415 );
1416
1417 let (handle, stored) = scope
1418 .spawn_supervised_actor(
1419 &mut runtime.state,
1420 &cx,
1421 move || {
1422 rc.fetch_add(1, Ordering::SeqCst);
1423 PanickingCounter {
1424 count: 0,
1425 panic_on: 999,
1426 final_count: fc.clone(),
1427 restart_count: rc.clone(),
1428 }
1429 },
1430 strategy,
1431 32,
1432 )
1433 .unwrap();
1434 let task_id = handle.task_id();
1435 runtime.state.store_spawned_task(task_id, stored);
1436
1437 handle.try_send(1).unwrap();
1442 handle.try_send(999).unwrap(); handle.try_send(1).unwrap(); drop(handle);
1447
1448 runtime.scheduler.lock().unwrap().schedule(task_id, 0);
1449 runtime.run_until_quiescent();
1450
1451 assert!(
1455 restart_count.load(Ordering::SeqCst) >= 2,
1456 "factory should have been called at least twice (initial + restart), got {}",
1457 restart_count.load(Ordering::SeqCst)
1458 );
1459
1460 assert_eq!(
1462 final_count.load(Ordering::SeqCst),
1463 1,
1464 "restarted actor should have processed the post-panic message"
1465 );
1466
1467 crate::test_complete!("supervised_actor_restarts_on_panic");
1468 }
1469
1470 #[test]
1472 fn actor_deterministic_replay() {
1473 fn run_scenario(seed: u64) -> u64 {
1474 let config = crate::lab::LabConfig::new(seed);
1475 let mut runtime = crate::lab::LabRuntime::new(config);
1476 let region = runtime.state.create_root_region(Budget::INFINITE);
1477 let cx: Cx = Cx::for_testing();
1478 let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
1479
1480 let (on_stop_count, started, stopped) = observable_state();
1481 let actor = ObservableCounter::new(on_stop_count.clone(), started, stopped);
1482
1483 let (handle, stored) = scope
1484 .spawn_actor(&mut runtime.state, &cx, actor, 32)
1485 .unwrap();
1486 let task_id = handle.task_id();
1487 runtime.state.store_spawned_task(task_id, stored);
1488
1489 for i in 1..=10 {
1490 handle.try_send(i).unwrap();
1491 }
1492 drop(handle);
1493
1494 runtime.scheduler.lock().unwrap().schedule(task_id, 0);
1495 runtime.run_until_quiescent();
1496
1497 on_stop_count.load(Ordering::SeqCst)
1498 }
1499
1500 init_test("actor_deterministic_replay");
1501
1502 let result1 = run_scenario(0xDEAD_BEEF);
1504 let result2 = run_scenario(0xDEAD_BEEF);
1505
1506 assert_eq!(
1507 result1, result2,
1508 "deterministic replay: same seed → same result"
1509 );
1510 assert_eq!(result1, 55, "sum of 1..=10");
1511
1512 crate::test_complete!("actor_deterministic_replay");
1513 }
1514
1515 #[test]
1518 fn actor_context_self_reference() {
1519 init_test("actor_context_self_reference");
1520
1521 let mut state = RuntimeState::new();
1522 let root = state.create_root_region(Budget::INFINITE);
1523 let cx: Cx = Cx::for_testing();
1524 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
1525
1526 let (handle, stored) = scope
1527 .spawn_actor(&mut state, &cx, Counter::new(), 32)
1528 .unwrap();
1529 state.store_spawned_task(handle.task_id(), stored);
1530
1531 let actor_ref = handle.sender();
1533 let actor_id = handle.actor_id();
1534 let ctx: ActorContext<'_, u64> = ActorContext::new(&cx, actor_ref, actor_id, None);
1535
1536 assert_eq!(ctx.self_actor_id(), actor_id);
1538 assert_eq!(ctx.actor_id(), actor_id);
1539
1540 crate::test_complete!("actor_context_self_reference");
1541 }
1542
1543 #[test]
1544 fn actor_context_child_management() {
1545 init_test("actor_context_child_management");
1546
1547 let cx: Cx = Cx::for_testing();
1548 let (sender, _receiver) = mpsc::channel::<u64>(32);
1549 let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1550 let actor_ref = ActorRef {
1551 actor_id,
1552 sender,
1553 state: Arc::new(ActorStateCell::new(ActorState::Running)),
1554 };
1555
1556 let mut ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1557
1558 assert!(!ctx.has_children());
1560 assert_eq!(ctx.child_count(), 0);
1561 assert!(ctx.children().is_empty());
1562
1563 let child1 = ActorId::from_task(TaskId::new_for_test(2, 1));
1565 let child2 = ActorId::from_task(TaskId::new_for_test(3, 1));
1566
1567 ctx.register_child(child1);
1568 assert!(ctx.has_children());
1569 assert_eq!(ctx.child_count(), 1);
1570
1571 ctx.register_child(child2);
1572 assert_eq!(ctx.child_count(), 2);
1573
1574 assert!(ctx.unregister_child(child1));
1576 assert_eq!(ctx.child_count(), 1);
1577
1578 assert!(!ctx.unregister_child(child1));
1580
1581 crate::test_complete!("actor_context_child_management");
1582 }
1583
1584 #[test]
1585 fn actor_context_stopping() {
1586 init_test("actor_context_stopping");
1587
1588 let cx: Cx = Cx::for_testing();
1589 let (sender, _receiver) = mpsc::channel::<u64>(32);
1590 let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1591 let actor_ref = ActorRef {
1592 actor_id,
1593 sender,
1594 state: Arc::new(ActorStateCell::new(ActorState::Running)),
1595 };
1596
1597 let mut ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1598
1599 assert!(!ctx.is_stopping());
1601 assert!(ctx.checkpoint().is_ok());
1602
1603 ctx.stop_self();
1605 assert!(ctx.is_stopping());
1606 assert!(ctx.checkpoint().is_err());
1607 assert!(ctx.is_cancel_requested());
1608
1609 crate::test_complete!("actor_context_stopping");
1610 }
1611
1612 #[test]
1613 fn actor_context_parent_none() {
1614 init_test("actor_context_parent_none");
1615
1616 let cx: Cx = Cx::for_testing();
1617 let (sender, _receiver) = mpsc::channel::<u64>(32);
1618 let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1619 let actor_ref = ActorRef {
1620 actor_id,
1621 sender,
1622 state: Arc::new(ActorStateCell::new(ActorState::Running)),
1623 };
1624
1625 let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1626
1627 assert!(!ctx.has_parent());
1629 assert!(ctx.parent().is_none());
1630
1631 crate::test_complete!("actor_context_parent_none");
1632 }
1633
1634 #[test]
1635 fn actor_context_cx_delegation() {
1636 init_test("actor_context_cx_delegation");
1637
1638 let cx: Cx = Cx::for_testing();
1639 let (sender, _receiver) = mpsc::channel::<u64>(32);
1640 let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1641 let actor_ref = ActorRef {
1642 actor_id,
1643 sender,
1644 state: Arc::new(ActorStateCell::new(ActorState::Running)),
1645 };
1646
1647 let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1648
1649 let _budget = ctx.budget();
1651 ctx.trace("test_event");
1652
1653 let _cx_ref = ctx.cx();
1655
1656 crate::test_complete!("actor_context_cx_delegation");
1657 }
1658
1659 #[test]
1660 fn actor_context_debug() {
1661 init_test("actor_context_debug");
1662
1663 let cx: Cx = Cx::for_testing();
1664 let (sender, _receiver) = mpsc::channel::<u64>(32);
1665 let actor_id = ActorId::from_task(TaskId::new_for_test(1, 1));
1666 let actor_ref = ActorRef {
1667 actor_id,
1668 sender,
1669 state: Arc::new(ActorStateCell::new(ActorState::Running)),
1670 };
1671
1672 let ctx = ActorContext::new(&cx, actor_ref, actor_id, None);
1673
1674 let debug_str = format!("{ctx:?}");
1676 assert!(debug_str.contains("ActorContext"));
1677 assert!(debug_str.contains("actor_id"));
1678
1679 crate::test_complete!("actor_context_debug");
1680 }
1681}