1use std::future::Future;
59use std::pin::Pin;
60use std::sync::atomic::{AtomicU8, Ordering};
61use std::sync::Arc;
62
63use crate::actor::{ActorId, ActorState};
64use crate::channel::mpsc;
65use crate::channel::oneshot;
66use crate::channel::session::{self, TrackedOneshotPermit};
67use crate::cx::Cx;
68use crate::monitor::{DownNotification, DownReason};
69use crate::obligation::graded::{AbortedProof, CommittedProof, SendPermit};
70use crate::runtime::{JoinError, SpawnError};
71use crate::types::{Budget, CancelReason, CxInner, Outcome, TaskId, Time};
72
/// RAII guard that narrows a task's budget for the duration of a lifecycle
/// phase (e.g. `on_start`/`on_stop`) and, optionally, restores the original
/// budget (minus what the phase consumed) when dropped.
struct PhaseBudgetGuard {
    /// Shared task-context state holding the live budget and its baseline.
    inner: Arc<std::sync::RwLock<CxInner>>,
    /// Budget that was in effect before the phase was entered.
    original_budget: Budget,
    /// Baseline that was in effect before the phase was entered.
    original_baseline: Budget,
    /// Budget installed for the phase (meet of original and phase budget).
    phase_baseline: Budget,
    /// When `false`, the phase budget is left in place on drop.
    restore_original: bool,
}
89
90impl PhaseBudgetGuard {
91 fn enter(cx: &Cx, phase_budget: Budget, restore_original: bool) -> Self {
92 let inner = Arc::clone(&cx.inner);
93 let (original_budget, original_baseline, phase_baseline) = {
94 let mut guard = inner.write().expect("lock poisoned");
95 let original_budget = guard.budget;
96 let original_baseline = guard.budget_baseline;
97 let phase_baseline = original_budget.meet(phase_budget);
98 guard.budget = phase_baseline;
99 guard.budget_baseline = phase_baseline;
100 drop(guard);
101 (original_budget, original_baseline, phase_baseline)
102 };
103 Self {
104 inner,
105 original_budget,
106 original_baseline,
107 phase_baseline,
108 restore_original,
109 }
110 }
111}
112
impl Drop for PhaseBudgetGuard {
    fn drop(&mut self) {
        // In non-restoring mode the phase budget simply stays in effect
        // after the guard is gone (used for terminal phases).
        if !self.restore_original {
            return;
        }

        // A poisoned lock means the owning task is already unwinding;
        // restoring the budget would be pointless then.
        let Ok(mut guard) = self.inner.write() else {
            return;
        };

        // Measure how much the phase consumed relative to the phase
        // baseline, so that usage can be charged against the original budget.
        let phase_remaining = guard.budget;
        let polls_used = self
            .phase_baseline
            .poll_quota
            .saturating_sub(phase_remaining.poll_quota);

        // Cost usage is only measurable when both the baseline and the
        // remaining budget carry a quota; otherwise treat the phase as free.
        let cost_used = match (self.phase_baseline.cost_quota, phase_remaining.cost_quota) {
            (Some(base), Some(rem)) => base.saturating_sub(rem),
            _ => 0,
        };

        // An unlimited original cost quota (`None`) stays unlimited.
        let restored_cost_quota = self
            .original_budget
            .cost_quota
            .map(|orig| orig.saturating_sub(cost_used));

        // Reinstall the original budget minus what the phase spent.
        guard.budget = Budget {
            deadline: self.original_budget.deadline,
            poll_quota: self.original_budget.poll_quota.saturating_sub(polls_used),
            cost_quota: restored_cost_quota,
            priority: self.original_budget.priority,
        };
        guard.budget_baseline = self.original_baseline;
    }
}
148
/// RAII guard that raises the context's cancellation-mask depth on creation
/// and lowers it again on drop.
struct AsyncMaskGuard {
    /// Shared task-context state whose `mask_depth` counter is adjusted.
    inner: Arc<std::sync::RwLock<CxInner>>,
}
155
impl AsyncMaskGuard {
    /// Increments the context's cancellation-mask depth; the matching
    /// decrement happens in this guard's `Drop`.
    fn enter(cx: &Cx) -> Self {
        let inner = Arc::clone(&cx.inner);
        {
            let mut guard = inner.write().expect("lock poisoned");
            // Guard against unbounded nesting of masked sections.
            // NOTE(review): the last two macro arguments are both
            // `guard.mask_depth`; presumably one of them was meant to be
            // `MAX_MASK_DEPTH` — confirm against `assert_with_log!`'s
            // expected (actual, limit) argument order.
            crate::assert_with_log!(
                guard.mask_depth < crate::types::task_context::MAX_MASK_DEPTH,
                "mask_depth",
                guard.mask_depth,
                guard.mask_depth
            );
            guard.mask_depth += 1;
        }
        Self { inner }
    }
}
172
173impl Drop for AsyncMaskGuard {
174 fn drop(&mut self) {
175 if let Ok(mut guard) = self.inner.write() {
176 guard.mask_depth = guard.mask_depth.saturating_sub(1);
177 }
178 }
179}
180
/// Policy applied by `try_cast` when the server's mailbox is full.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CastOverflowPolicy {
    /// Reject the new message with `CastError::Full` (the default).
    #[default]
    Reject,

    /// Evict the oldest queued envelope to make room for the new message.
    DropOldest,
}
212
213impl std::fmt::Display for CastOverflowPolicy {
214 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 match self {
216 Self::Reject => write!(f, "Reject"),
217 Self::DropOldest => write!(f, "DropOldest"),
218 }
219 }
220}
221
/// System message reporting that a monitored task went down.
#[derive(Debug, Clone)]
pub struct DownMsg {
    /// Virtual time at which the monitored task completed.
    pub completion_vt: Time,
    /// Details of the down event (which task, why).
    pub notification: DownNotification,
}
237
238impl DownMsg {
239 #[must_use]
241 pub const fn new(completion_vt: Time, notification: DownNotification) -> Self {
242 Self {
243 completion_vt,
244 notification,
245 }
246 }
247}
248
/// System message reporting that a linked task exited.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExitMsg {
    /// Virtual time of the exit.
    pub exit_vt: Time,
    /// Task that exited.
    pub from: TaskId,
    /// Why the task exited.
    pub reason: DownReason,
}
259
260impl ExitMsg {
261 #[must_use]
263 pub const fn new(exit_vt: Time, from: TaskId, reason: DownReason) -> Self {
264 Self {
265 exit_vt,
266 from,
267 reason,
268 }
269 }
270}
271
/// System message reporting that a timer fired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutMsg {
    /// Virtual time of the timer tick.
    pub tick_vt: Time,
    /// Identifier of the timer that fired.
    pub id: u64,
}
280
impl TimeoutMsg {
    /// Builds a `TimeoutMsg` from the tick time and the timer id.
    #[must_use]
    pub const fn new(tick_vt: Time, id: u64) -> Self {
        Self { tick_vt, id }
    }
}
288
/// Union of the out-of-band system messages a gen_server can receive,
/// mirroring [`DownMsg`], [`ExitMsg`], and [`TimeoutMsg`].
#[derive(Debug, Clone)]
pub enum SystemMsg {
    /// A monitored task went down.
    Down {
        /// Virtual time at which the monitored task completed.
        completion_vt: Time,
        /// Details of the down event.
        notification: DownNotification,
    },

    /// A linked task exited.
    Exit {
        /// Virtual time of the exit.
        exit_vt: Time,
        /// Task that exited.
        from: TaskId,
        /// Why the task exited.
        reason: DownReason,
    },

    /// A timer fired.
    Timeout {
        /// Virtual time of the tick.
        tick_vt: Time,
        /// Identifier of the timer.
        id: u64,
    },
}
318
impl SystemMsg {
    /// Wraps a [`DownMsg`] as a system message.
    #[must_use]
    pub fn down(msg: DownMsg) -> Self {
        msg.into()
    }

    /// Wraps an [`ExitMsg`] as a system message.
    #[must_use]
    pub fn exit(msg: ExitMsg) -> Self {
        msg.into()
    }

    /// Wraps a [`TimeoutMsg`] as a system message.
    #[must_use]
    pub fn timeout(msg: TimeoutMsg) -> Self {
        msg.into()
    }

    /// Virtual time carried by the message, whichever variant it is.
    const fn vt(&self) -> Time {
        match self {
            Self::Down { completion_vt, .. } => *completion_vt,
            Self::Exit { exit_vt, .. } => *exit_vt,
            Self::Timeout { tick_vt, .. } => *tick_vt,
        }
    }

    /// Tie-break rank between kinds at equal virtual time:
    /// Down (0) < Exit (1) < Timeout (2).
    const fn kind_rank(&self) -> u8 {
        match self {
            Self::Down { .. } => 0,
            Self::Exit { .. } => 1,
            Self::Timeout { .. } => 2,
        }
    }

    /// Final tie-break: the task (or timeout id) the message concerns.
    const fn subject_key(&self) -> SystemMsgSubjectKey {
        match self {
            Self::Down { notification, .. } => SystemMsgSubjectKey::Task(notification.monitored),
            Self::Exit { from, .. } => SystemMsgSubjectKey::Task(*from),
            Self::Timeout { id, .. } => SystemMsgSubjectKey::TimeoutId(*id),
        }
    }

    /// Deterministic total ordering key used by [`SystemMsgBatch`]:
    /// (virtual time, kind rank, subject).
    #[must_use]
    pub const fn sort_key(&self) -> (Time, u8, SystemMsgSubjectKey) {
        (self.vt(), self.kind_rank(), self.subject_key())
    }
}
376
377impl From<DownMsg> for SystemMsg {
378 fn from(value: DownMsg) -> Self {
379 Self::Down {
380 completion_vt: value.completion_vt,
381 notification: value.notification,
382 }
383 }
384}
385
386impl From<ExitMsg> for SystemMsg {
387 fn from(value: ExitMsg) -> Self {
388 Self::Exit {
389 exit_vt: value.exit_vt,
390 from: value.from,
391 reason: value.reason,
392 }
393 }
394}
395
396impl From<TimeoutMsg> for SystemMsg {
397 fn from(value: TimeoutMsg) -> Self {
398 Self::Timeout {
399 tick_vt: value.tick_vt,
400 id: value.id,
401 }
402 }
403}
404
405impl TryFrom<SystemMsg> for DownMsg {
406 type Error = SystemMsg;
407
408 fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
409 match value {
410 SystemMsg::Down {
411 completion_vt,
412 notification,
413 } => Ok(Self {
414 completion_vt,
415 notification,
416 }),
417 other => Err(other),
418 }
419 }
420}
421
422impl TryFrom<SystemMsg> for ExitMsg {
423 type Error = SystemMsg;
424
425 fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
426 match value {
427 SystemMsg::Exit {
428 exit_vt,
429 from,
430 reason,
431 } => Ok(Self {
432 exit_vt,
433 from,
434 reason,
435 }),
436 other => Err(other),
437 }
438 }
439}
440
441impl TryFrom<SystemMsg> for TimeoutMsg {
442 type Error = SystemMsg;
443
444 fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
445 match value {
446 SystemMsg::Timeout { tick_vt, id } => Ok(Self { tick_vt, id }),
447 other => Err(other),
448 }
449 }
450}
451
/// Subject component of a [`SystemMsg::sort_key`]: which task or timer a
/// system message concerns. `Ord` makes the full sort key totally ordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SystemMsgSubjectKey {
    /// Subject is a task (Down / Exit messages).
    Task(TaskId),
    /// Subject is a timer (Timeout messages).
    TimeoutId(u64),
}
460
/// Accumulator for system messages that are later delivered in a
/// deterministic order (see [`SystemMsgBatch::into_sorted`]).
#[derive(Debug, Default)]
pub struct SystemMsgBatch {
    // Unsorted insertion-order backlog; sorted only on consumption.
    entries: Vec<SystemMsg>,
}
469
470impl SystemMsgBatch {
471 #[must_use]
473 pub fn new() -> Self {
474 Self::default()
475 }
476
477 pub fn push(&mut self, msg: SystemMsg) {
479 self.entries.push(msg);
480 }
481
482 #[must_use]
484 pub fn into_sorted(mut self) -> Vec<SystemMsg> {
485 self.entries.sort_by_key(SystemMsg::sort_key);
486 self.entries
487 }
488}
489
/// Behaviour contract for a gen_server-style actor: typed call/cast/info
/// handlers plus lifecycle hooks, all returning boxed futures.
pub trait GenServer: Send + 'static {
    /// Request type for `call` (request/reply) interactions.
    type Call: Send + 'static;

    /// Response type produced for a `call`.
    type Reply: Send + 'static;

    /// Message type for fire-and-forget `cast` interactions.
    type Cast: Send + 'static;

    /// Out-of-band message type delivered via `info`.
    type Info: Send + 'static;

    /// Handles a `call` request. The implementation decides what to do with
    /// `reply` (send a value, abort it, or drop it).
    fn handle_call(
        &mut self,
        cx: &Cx,
        request: Self::Call,
        reply: Reply<Self::Reply>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    /// Handles a `cast` message; the default implementation ignores it.
    fn handle_cast(
        &mut self,
        _cx: &Cx,
        _msg: Self::Cast,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    /// Handles an `info` message; the default implementation ignores it.
    fn handle_info(
        &mut self,
        _cx: &Cx,
        _msg: Self::Info,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    /// Startup hook run before the message loop; defaults to a no-op.
    fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    /// Budget applied (met with the task budget) while `on_start` runs.
    fn on_start_budget(&self) -> Budget {
        Budget::INFINITE
    }

    /// Shutdown hook run after the message loop exits; defaults to a no-op.
    fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    /// Budget applied (met with the task budget) while `on_stop` runs.
    fn on_stop_budget(&self) -> Budget {
        Budget::MINIMAL
    }

    /// Policy used by `try_cast` when the mailbox is full.
    fn cast_overflow_policy(&self) -> CastOverflowPolicy {
        CastOverflowPolicy::Reject
    }
}
587
/// One-shot reply obligation handed to `handle_call`: the holder must either
/// send a value, abort, or drop it, after which the caller is unblocked.
pub struct Reply<R> {
    // Context used only for trace events on send/abort.
    cx: Cx,
    // Tracked permit for the caller's reply channel.
    permit: TrackedOneshotPermit<R>,
}
601
602impl<R: Send + 'static> Reply<R> {
603 fn new(cx: &Cx, permit: TrackedOneshotPermit<R>) -> Self {
604 Self {
605 cx: cx.clone(),
606 permit,
607 }
608 }
609
610 pub fn send(self, value: R) -> ReplyOutcome {
615 match self.permit.send(value) {
616 Ok(proof) => {
617 self.cx.trace("gen_server::reply_committed");
618 ReplyOutcome::Committed(proof)
619 }
620 Err(_send_err) => {
621 self.cx.trace("gen_server::reply_caller_gone");
624 ReplyOutcome::CallerGone
625 }
626 }
627 }
628
629 #[must_use]
634 pub fn abort(self) -> AbortedProof<SendPermit> {
635 self.cx.trace("gen_server::reply_aborted");
636 self.permit.abort()
637 }
638
639 #[must_use]
641 pub fn is_closed(&self) -> bool {
642 self.permit.is_closed()
643 }
644}
645
646impl<R> std::fmt::Debug for Reply<R> {
647 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
648 f.debug_struct("Reply")
649 .field("pending", &!self.permit.is_closed())
650 .finish_non_exhaustive()
651 }
652}
653
/// Result of [`Reply::send`].
pub enum ReplyOutcome {
    /// The value reached the caller; carries the commit proof.
    Committed(CommittedProof<SendPermit>),
    /// The caller stopped waiting before the value could be delivered.
    CallerGone,
}
661
662impl std::fmt::Debug for ReplyOutcome {
663 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
664 match self {
665 Self::Committed(_) => f.debug_tuple("Committed").finish(),
666 Self::CallerGone => write!(f, "CallerGone"),
667 }
668 }
669}
670
/// Internal mailbox message: one of the three interaction kinds, each
/// carrying its typed payload.
enum Envelope<S: GenServer> {
    /// Request/reply interaction; carries the permit used to answer.
    Call {
        request: S::Call,
        reply_permit: TrackedOneshotPermit<S::Reply>,
    },
    /// Fire-and-forget message.
    Cast {
        msg: S::Cast,
    },
    /// Out-of-band message.
    Info {
        msg: S::Info,
    },
}
688
689impl<S: GenServer> std::fmt::Debug for Envelope<S> {
690 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
691 match self {
692 Self::Call { .. } => f.debug_struct("Envelope::Call").finish_non_exhaustive(),
693 Self::Cast { .. } => f.debug_struct("Envelope::Cast").finish_non_exhaustive(),
694 Self::Info { .. } => f.debug_struct("Envelope::Info").finish_non_exhaustive(),
695 }
696 }
697}
698
/// State owned by the running server task.
struct GenServerCell<S: GenServer> {
    /// Receiving end of the server's mailbox.
    mailbox: mpsc::Receiver<Envelope<S>>,
    /// Shared lifecycle state, also visible through handles/refs.
    state: Arc<GenServerStateCell>,
    /// Sender clone held so the mailbox does not report disconnected while
    /// the loop itself is the only remaining user.
    _keep_alive: mpsc::Sender<Envelope<S>>,
}
708
/// Lock-free cell holding the server's `ActorState`, encoded as a `u8`.
#[derive(Debug)]
struct GenServerStateCell {
    // Encoded via encode_actor_state / decode_actor_state.
    state: AtomicU8,
}
713
714impl GenServerStateCell {
715 fn new(state: ActorState) -> Self {
716 Self {
717 state: AtomicU8::new(encode_actor_state(state)),
718 }
719 }
720
721 fn load(&self) -> ActorState {
722 decode_actor_state(self.state.load(Ordering::Acquire))
723 }
724
725 fn store(&self, state: ActorState) {
726 self.state
727 .store(encode_actor_state(state), Ordering::Release);
728 }
729}
730
/// Encodes an `ActorState` into the compact `u8` representation stored in
/// `GenServerStateCell`; inverse of `decode_actor_state`.
const fn encode_actor_state(state: ActorState) -> u8 {
    match state {
        ActorState::Created => 0,
        ActorState::Running => 1,
        ActorState::Stopping => 2,
        ActorState::Stopped => 3,
    }
}
739
/// Decodes the `u8` produced by `encode_actor_state`. Unknown values map to
/// `Stopped`, the terminal (safest) state.
const fn decode_actor_state(value: u8) -> ActorState {
    match value {
        0 => ActorState::Created,
        1 => ActorState::Running,
        2 => ActorState::Stopping,
        _ => ActorState::Stopped,
    }
}
748
/// Owner-side handle to a spawned gen_server: can send messages, request a
/// stop, and `join` to receive the final server value.
#[derive(Debug)]
pub struct GenServerHandle<S: GenServer> {
    /// Actor identity derived from the task id.
    actor_id: ActorId,
    /// Sending end of the server's mailbox.
    sender: mpsc::Sender<Envelope<S>>,
    /// Shared lifecycle state, updated by the server loop.
    state: Arc<GenServerStateCell>,
    /// Runtime task id of the server task.
    task_id: TaskId,
    /// Receives the final server value (or join error) once the task ends.
    receiver: oneshot::Receiver<Result<S, JoinError>>,
    /// Weak link to the server task's context, used by `stop` to request
    /// cancellation without keeping the context alive.
    inner: std::sync::Weak<std::sync::RwLock<CxInner>>,
    /// Overflow policy captured from the server at spawn time.
    overflow_policy: CastOverflowPolicy,
}
767
/// Errors returned by `call`.
#[derive(Debug)]
pub enum CallError {
    /// The server is stopping, stopped, or its mailbox is gone.
    ServerStopped,
    /// The server consumed the request but never sent a reply.
    NoReply,
    /// The caller's own context was cancelled.
    Cancelled(CancelReason),
}
778
779impl std::fmt::Display for CallError {
780 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
781 match self {
782 Self::ServerStopped => write!(f, "GenServer has stopped"),
783 Self::NoReply => write!(f, "GenServer did not reply"),
784 Self::Cancelled(reason) => write!(f, "GenServer call cancelled: {reason}"),
785 }
786 }
787}
788
789impl std::error::Error for CallError {}
790
/// Errors returned by `cast` / `try_cast`.
#[derive(Debug)]
pub enum CastError {
    /// The server is stopping, stopped, or its mailbox is gone.
    ServerStopped,
    /// The mailbox is full (only under the `Reject` overflow policy).
    Full,
    /// The caller's own context was cancelled.
    Cancelled(CancelReason),
}
801
802impl std::fmt::Display for CastError {
803 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
804 match self {
805 Self::ServerStopped => write!(f, "GenServer has stopped"),
806 Self::Full => write!(f, "GenServer mailbox full"),
807 Self::Cancelled(reason) => write!(f, "GenServer cast cancelled: {reason}"),
808 }
809 }
810}
811
812impl std::error::Error for CastError {}
813
/// Errors returned by `info` / `try_info`.
#[derive(Debug)]
pub enum InfoError {
    /// The server is stopping, stopped, or its mailbox is gone.
    ServerStopped,
    /// The mailbox is full.
    Full,
    /// The caller's own context was cancelled.
    Cancelled(CancelReason),
}
824
825impl std::fmt::Display for InfoError {
826 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
827 match self {
828 Self::ServerStopped => write!(f, "GenServer has stopped"),
829 Self::Full => write!(f, "GenServer mailbox full"),
830 Self::Cancelled(reason) => write!(f, "GenServer info cancelled: {reason}"),
831 }
832 }
833}
834
835impl std::error::Error for InfoError {}
836
837impl<S: GenServer> GenServerHandle<S> {
838 pub async fn call(&self, cx: &Cx, request: S::Call) -> Result<S::Reply, CallError> {
845 if cx.checkpoint().is_err() {
846 cx.trace("gen_server::call_rejected_cancelled");
847 let reason = cx
848 .cancel_reason()
849 .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
850 return Err(CallError::Cancelled(reason));
851 }
852
853 if matches!(
854 self.state.load(),
855 ActorState::Stopping | ActorState::Stopped
856 ) {
857 cx.trace("gen_server::call_rejected_stopped");
858 return Err(CallError::ServerStopped);
859 }
860
861 let (reply_tx, reply_rx) = session::tracked_oneshot::<S::Reply>();
862 let reply_permit = reply_tx.reserve(cx);
863 let envelope = Envelope::Call {
864 request,
865 reply_permit,
866 };
867
868 if let Err(e) = self.sender.send(cx, envelope).await {
869 let (envelope, was_cancelled) = match e {
872 mpsc::SendError::Cancelled(v) => (v, true),
873 mpsc::SendError::Disconnected(v) | mpsc::SendError::Full(v) => (v, false),
874 };
875 if let Envelope::Call { reply_permit, .. } = envelope {
876 let _aborted = reply_permit.abort();
877 }
878 if was_cancelled {
879 cx.trace("gen_server::call_send_cancelled");
880 let reason = cx
881 .cancel_reason()
882 .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
883 return Err(CallError::Cancelled(reason));
884 }
885 cx.trace("gen_server::call_send_failed");
886 return Err(CallError::ServerStopped);
887 }
888
889 cx.trace("gen_server::call_enqueued");
890
891 match reply_rx.recv(cx).await {
892 Ok(v) => Ok(v),
893 Err(oneshot::RecvError::Closed) => {
894 cx.trace("gen_server::call_no_reply");
895 Err(CallError::NoReply)
896 }
897 Err(oneshot::RecvError::Cancelled) => {
898 cx.trace("gen_server::call_reply_cancelled");
899 let reason = cx
900 .cancel_reason()
901 .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
902 Err(CallError::Cancelled(reason))
903 }
904 }
905 }
906
907 pub async fn cast(&self, cx: &Cx, msg: S::Cast) -> Result<(), CastError> {
909 if cx.checkpoint().is_err() {
910 cx.trace("gen_server::cast_rejected_cancelled");
911 let reason = cx
912 .cancel_reason()
913 .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
914 return Err(CastError::Cancelled(reason));
915 }
916
917 if matches!(
918 self.state.load(),
919 ActorState::Stopping | ActorState::Stopped
920 ) {
921 cx.trace("gen_server::cast_rejected_stopped");
922 return Err(CastError::ServerStopped);
923 }
924 let envelope = Envelope::Cast { msg };
925 self.sender.send(cx, envelope).await.map_err(|e| match e {
926 mpsc::SendError::Cancelled(_) => {
927 cx.trace("gen_server::cast_send_cancelled");
928 let reason = cx
929 .cancel_reason()
930 .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
931 CastError::Cancelled(reason)
932 }
933 mpsc::SendError::Disconnected(_) | mpsc::SendError::Full(_) => {
934 cx.trace("gen_server::cast_send_failed");
935 CastError::ServerStopped
936 }
937 })
938 }
939
940 pub fn try_cast(&self, msg: S::Cast) -> Result<(), CastError> {
946 if matches!(
947 self.state.load(),
948 ActorState::Stopping | ActorState::Stopped
949 ) {
950 return Err(CastError::ServerStopped);
951 }
952 let envelope = Envelope::Cast { msg };
953 match self.overflow_policy {
954 CastOverflowPolicy::Reject => self.sender.try_send(envelope).map_err(|e| match e {
955 mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
956 CastError::ServerStopped
957 }
958 mpsc::SendError::Full(_) => CastError::Full,
959 }),
960 CastOverflowPolicy::DropOldest => {
961 match self.sender.send_evict_oldest(envelope) {
962 Ok(Some(evicted)) => {
963 if let Envelope::Call { reply_permit, .. } = evicted {
966 let _aborted = reply_permit.abort();
967 }
968 if let Some(cx) = Cx::current() {
970 cx.trace("gen_server::cast_evicted_oldest");
971 }
972 Ok(())
973 }
974 Ok(None) => Ok(()),
975 Err(mpsc::SendError::Disconnected(_)) => Err(CastError::ServerStopped),
976 Err(mpsc::SendError::Full(_) | mpsc::SendError::Cancelled(_)) => {
977 unreachable!("send_evict_oldest never returns Full or Cancelled")
978 }
979 }
980 }
981 }
982 }
983
984 pub async fn info(&self, cx: &Cx, msg: S::Info) -> Result<(), InfoError> {
986 if cx.checkpoint().is_err() {
987 let reason = cx
988 .cancel_reason()
989 .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
990 return Err(InfoError::Cancelled(reason));
991 }
992
993 if matches!(
994 self.state.load(),
995 ActorState::Stopping | ActorState::Stopped
996 ) {
997 return Err(InfoError::ServerStopped);
998 }
999
1000 let envelope = Envelope::Info { msg };
1001 self.sender.send(cx, envelope).await.map_err(|e| match e {
1002 mpsc::SendError::Cancelled(_) => {
1003 let reason = cx
1004 .cancel_reason()
1005 .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1006 InfoError::Cancelled(reason)
1007 }
1008 mpsc::SendError::Disconnected(_) => InfoError::ServerStopped,
1009 mpsc::SendError::Full(_) => InfoError::Full,
1010 })
1011 }
1012
1013 pub fn try_info(&self, msg: S::Info) -> Result<(), InfoError> {
1015 if matches!(
1016 self.state.load(),
1017 ActorState::Stopping | ActorState::Stopped
1018 ) {
1019 return Err(InfoError::ServerStopped);
1020 }
1021
1022 let envelope = Envelope::Info { msg };
1023 self.sender.try_send(envelope).map_err(|e| match e {
1024 mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
1025 InfoError::ServerStopped
1026 }
1027 mpsc::SendError::Full(_) => InfoError::Full,
1028 })
1029 }
1030
1031 #[must_use]
1033 pub fn cast_overflow_policy(&self) -> CastOverflowPolicy {
1034 self.overflow_policy
1035 }
1036
1037 #[must_use]
1039 pub const fn actor_id(&self) -> ActorId {
1040 self.actor_id
1041 }
1042
1043 #[must_use]
1045 pub fn task_id(&self) -> TaskId {
1046 self.task_id
1047 }
1048
1049 #[must_use]
1051 pub fn is_finished(&self) -> bool {
1052 self.receiver.is_ready()
1053 }
1054
1055 pub fn stop(&self) {
1057 self.state.store(ActorState::Stopping);
1058 if let Some(inner) = self.inner.upgrade() {
1059 if let Ok(mut guard) = inner.write() {
1060 guard.cancel_requested = true;
1061 }
1062 }
1063 self.sender.wake_receiver();
1066 }
1067
1068 pub async fn join(&self, cx: &Cx) -> Result<S, JoinError> {
1070 self.receiver.recv(cx).await.unwrap_or_else(|_| {
1071 let reason = cx
1072 .cancel_reason()
1073 .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
1074 Err(JoinError::Cancelled(reason))
1075 })
1076 }
1077}
1078
/// Cloneable, non-owning reference to a gen_server: can send messages but
/// cannot stop or join the server.
#[derive(Debug)]
pub struct GenServerRef<S: GenServer> {
    /// Actor identity of the server.
    actor_id: ActorId,
    /// Sending end of the server's mailbox.
    sender: mpsc::Sender<Envelope<S>>,
    /// Shared lifecycle state, updated by the server loop.
    state: Arc<GenServerStateCell>,
    /// Overflow policy captured from the server at spawn time.
    overflow_policy: CastOverflowPolicy,
}
1090
1091impl<S: GenServer> Clone for GenServerRef<S> {
1092 fn clone(&self) -> Self {
1093 Self {
1094 actor_id: self.actor_id,
1095 sender: self.sender.clone(),
1096 state: Arc::clone(&self.state),
1097 overflow_policy: self.overflow_policy,
1098 }
1099 }
1100}
1101
impl<S: GenServer> GenServerRef<S> {
    /// Overflow policy captured from the server at spawn time.
    #[must_use]
    pub const fn cast_overflow_policy(&self) -> CastOverflowPolicy {
        self.overflow_policy
    }

    /// Sends `request` and awaits the server's reply.
    ///
    /// # Errors
    /// - [`CallError::Cancelled`] if the caller's context is (or becomes)
    ///   cancelled before the reply arrives.
    /// - [`CallError::ServerStopped`] if the server is stopping/stopped or
    ///   the mailbox send fails.
    /// - [`CallError::NoReply`] if the server dropped or aborted the reply.
    pub async fn call(&self, cx: &Cx, request: S::Call) -> Result<S::Reply, CallError> {
        // Fail fast if the caller is already cancelled.
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::call_rejected_cancelled");
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(CallError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::call_rejected_stopped");
            return Err(CallError::ServerStopped);
        }

        // Reserve the reply channel before enqueuing so the server receives
        // a live permit inside the envelope.
        let (reply_tx, reply_rx) = session::tracked_oneshot::<S::Reply>();
        let reply_permit = reply_tx.reserve(cx);
        let envelope = Envelope::Call {
            request,
            reply_permit,
        };

        if let Err(e) = self.sender.send(cx, envelope).await {
            // The envelope comes back inside the error; abort its permit so
            // the reply obligation is discharged.
            let (envelope, was_cancelled) = match e {
                mpsc::SendError::Cancelled(v) => (v, true),
                mpsc::SendError::Disconnected(v) | mpsc::SendError::Full(v) => (v, false),
            };
            if let Envelope::Call { reply_permit, .. } = envelope {
                let _aborted = reply_permit.abort();
            }
            if was_cancelled {
                cx.trace("gen_server::call_send_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                return Err(CallError::Cancelled(reason));
            }
            cx.trace("gen_server::call_send_failed");
            return Err(CallError::ServerStopped);
        }

        cx.trace("gen_server::call_enqueued");

        match reply_rx.recv(cx).await {
            Ok(v) => Ok(v),
            Err(oneshot::RecvError::Closed) => {
                // Server dropped/aborted the permit without replying.
                cx.trace("gen_server::call_no_reply");
                Err(CallError::NoReply)
            }
            Err(oneshot::RecvError::Cancelled) => {
                cx.trace("gen_server::call_reply_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                Err(CallError::Cancelled(reason))
            }
        }
    }

    /// Enqueues a fire-and-forget message, waiting for mailbox space.
    ///
    /// # Errors
    /// [`CastError::Cancelled`] on caller cancellation,
    /// [`CastError::ServerStopped`] if the server is stopping/stopped or the
    /// send fails.
    pub async fn cast(&self, cx: &Cx, msg: S::Cast) -> Result<(), CastError> {
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::cast_rejected_cancelled");
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(CastError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::cast_rejected_stopped");
            return Err(CastError::ServerStopped);
        }
        let envelope = Envelope::Cast { msg };
        self.sender.send(cx, envelope).await.map_err(|e| match e {
            mpsc::SendError::Cancelled(_) => {
                cx.trace("gen_server::cast_send_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                CastError::Cancelled(reason)
            }
            mpsc::SendError::Disconnected(_) | mpsc::SendError::Full(_) => {
                cx.trace("gen_server::cast_send_failed");
                CastError::ServerStopped
            }
        })
    }

    /// Non-blocking cast, applying the server's overflow policy when the
    /// mailbox is full.
    ///
    /// # Errors
    /// [`CastError::Full`] under the `Reject` policy,
    /// [`CastError::ServerStopped`] when the server is gone.
    pub fn try_cast(&self, msg: S::Cast) -> Result<(), CastError> {
        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(CastError::ServerStopped);
        }
        let envelope = Envelope::Cast { msg };
        match self.overflow_policy {
            CastOverflowPolicy::Reject => self.sender.try_send(envelope).map_err(|e| match e {
                mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
                    CastError::ServerStopped
                }
                mpsc::SendError::Full(_) => CastError::Full,
            }),
            CastOverflowPolicy::DropOldest => match self.sender.send_evict_oldest(envelope) {
                Ok(Some(evicted)) => {
                    // An evicted Call still owes its caller a reply; abort
                    // the permit so the caller sees NoReply.
                    if let Envelope::Call { reply_permit, .. } = evicted {
                        let _aborted = reply_permit.abort();
                    }
                    if let Some(cx) = Cx::current() {
                        cx.trace("gen_server::cast_evicted_oldest");
                    }
                    Ok(())
                }
                Ok(None) => Ok(()),
                Err(mpsc::SendError::Disconnected(_)) => Err(CastError::ServerStopped),
                Err(mpsc::SendError::Full(_) | mpsc::SendError::Cancelled(_)) => {
                    unreachable!("send_evict_oldest never returns Full or Cancelled")
                }
            },
        }
    }

    /// Enqueues an out-of-band message, waiting for mailbox space.
    ///
    /// # Errors
    /// [`InfoError::Cancelled`], [`InfoError::ServerStopped`], or
    /// [`InfoError::Full`] mirroring the underlying send failure.
    pub async fn info(&self, cx: &Cx, msg: S::Info) -> Result<(), InfoError> {
        if cx.checkpoint().is_err() {
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(InfoError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(InfoError::ServerStopped);
        }

        let envelope = Envelope::Info { msg };
        self.sender.send(cx, envelope).await.map_err(|e| match e {
            mpsc::SendError::Cancelled(_) => {
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                InfoError::Cancelled(reason)
            }
            mpsc::SendError::Disconnected(_) => InfoError::ServerStopped,
            mpsc::SendError::Full(_) => InfoError::Full,
        })
    }

    /// Non-blocking variant of [`Self::info`].
    ///
    /// # Errors
    /// [`InfoError::ServerStopped`] or [`InfoError::Full`].
    pub fn try_info(&self, msg: S::Info) -> Result<(), InfoError> {
        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(InfoError::ServerStopped);
        }

        let envelope = Envelope::Info { msg };
        self.sender.try_send(envelope).map_err(|e| match e {
            mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
                InfoError::ServerStopped
            }
            mpsc::SendError::Full(_) => InfoError::Full,
        })
    }

    /// Whether the mailbox sender has been closed.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.sender.is_closed()
    }

    /// Whether the server has not yet reached the terminal `Stopped` state.
    #[must_use]
    pub fn is_alive(&self) -> bool {
        self.state.load() != ActorState::Stopped
    }

    /// Actor identity of the server.
    #[must_use]
    pub const fn actor_id(&self) -> ActorId {
        self.actor_id
    }
}
1308
1309impl<S: GenServer> GenServerHandle<S> {
1310 #[must_use]
1312 pub fn server_ref(&self) -> GenServerRef<S> {
1313 GenServerRef {
1314 actor_id: self.actor_id,
1315 sender: self.sender.clone(),
1316 state: Arc::clone(&self.state),
1317 overflow_policy: self.overflow_policy,
1318 }
1319 }
1320}
1321
/// Default bound on the number of queued envelopes in a gen_server mailbox.
pub const DEFAULT_GENSERVER_MAILBOX_CAPACITY: usize = 64;
1328
/// Drives a `GenServer` through its lifecycle: startup hook, message loop,
/// bounded drain, shutdown hook. Returns the final server value so `join`
/// can hand it back to the owner.
async fn run_gen_server_loop<S: GenServer>(mut server: S, cx: Cx, cell: &GenServerCell<S>) -> S {
    use crate::tracing_compat::debug;

    cell.state.store(ActorState::Running);

    // Skip the startup hook entirely if cancellation already arrived.
    if cx.is_cancel_requested() {
        cx.trace("gen_server::init_skipped_cancelled");
    } else {
        cx.trace("gen_server::init");
        // on_start runs under its own phase budget, restored afterwards.
        let _budget = PhaseBudgetGuard::enter(&cx, server.on_start_budget(), true);
        server.on_start(&cx).await;
    }

    // Main loop: exits on cancellation or any recv failure.
    loop {
        if cx.is_cancel_requested() {
            cx.trace("gen_server::cancel_requested");
            break;
        }

        match cell.mailbox.recv(&cx).await {
            Ok(envelope) => {
                dispatch_envelope(&mut server, &cx, envelope).await;
            }
            Err(crate::channel::mpsc::RecvError::Disconnected) => {
                cx.trace("gen_server::mailbox_disconnected");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Cancelled) => {
                cx.trace("gen_server::recv_cancelled");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Empty) => {
                break;
            }
        }
    }

    cell.state.store(ActorState::Stopping);

    // Shutdown runs under the stop budget (not restored — this phase is
    // terminal) with the cancellation mask raised for the rest of the fn.
    let _budget = PhaseBudgetGuard::enter(&cx, server.on_stop_budget(), false);
    let _mask = AsyncMaskGuard::enter(&cx);

    // Drain at most one mailbox-capacity's worth of leftover envelopes:
    // pending calls are aborted (their callers observe NoReply), while
    // casts and infos still get handled.
    let drain_limit = cell.mailbox.capacity() as u64;
    let mut drained: u64 = 0;
    while let Ok(envelope) = cell.mailbox.try_recv() {
        match envelope {
            Envelope::Call {
                request: _,
                reply_permit,
            } => {
                let _aborted: AbortedProof<SendPermit> = reply_permit.abort();
                cx.trace("gen_server::drain_abort_call");
            }
            Envelope::Cast { msg } => {
                server.handle_cast(&cx, msg).await;
            }
            Envelope::Info { msg } => {
                server.handle_info(&cx, msg).await;
            }
        }
        drained += 1;
        if drained >= drain_limit {
            break;
        }
    }
    if drained > 0 {
        debug!(drained = drained, "gen_server::mailbox_drained");
        cx.trace("gen_server::mailbox_drained");
    }

    cx.trace("gen_server::terminate");
    server.on_stop(&cx).await;

    server
}
1416
1417async fn dispatch_envelope<S: GenServer>(server: &mut S, cx: &Cx, envelope: Envelope<S>) {
1419 match envelope {
1420 Envelope::Call {
1421 request,
1422 reply_permit,
1423 } => {
1424 let reply = Reply::new(cx, reply_permit);
1425 server.handle_call(cx, request, reply).await;
1426 }
1427 Envelope::Cast { msg } => {
1428 server.handle_cast(cx, msg).await;
1429 }
1430 Envelope::Info { msg } => {
1431 server.handle_info(cx, msg).await;
1432 }
1433 }
1434}
1435
1436impl<P: crate::types::Policy> crate::cx::Scope<'_, P> {
1441 pub fn spawn_gen_server<S: GenServer>(
1446 &self,
1447 state: &mut crate::runtime::state::RuntimeState,
1448 cx: &Cx,
1449 server: S,
1450 mailbox_capacity: usize,
1451 ) -> Result<(GenServerHandle<S>, crate::runtime::stored_task::StoredTask), SpawnError> {
1452 use crate::cx::scope::CatchUnwind;
1453 use crate::runtime::stored_task::StoredTask;
1454 use crate::tracing_compat::{debug, debug_span};
1455
1456 let overflow_policy = server.cast_overflow_policy();
1457 let (msg_tx, msg_rx) = mpsc::channel::<Envelope<S>>(mailbox_capacity);
1458 let (result_tx, result_rx) = oneshot::channel::<Result<S, JoinError>>();
1459 let task_id = self.create_task_record(state)?;
1460 let actor_id = ActorId::from_task(task_id);
1461 let server_state = Arc::new(GenServerStateCell::new(ActorState::Created));
1462
1463 let _span = debug_span!(
1464 "gen_server_spawn",
1465 task_id = ?task_id,
1466 region_id = ?self.region_id(),
1467 mailbox_capacity = mailbox_capacity,
1468 )
1469 .entered();
1470 debug!(
1471 task_id = ?task_id,
1472 region_id = ?self.region_id(),
1473 mailbox_capacity = mailbox_capacity,
1474 "gen_server spawned"
1475 );
1476
1477 let child_observability = cx.child_observability(self.region_id(), task_id);
1478 let child_entropy = cx.child_entropy(task_id);
1479 let io_driver = state.io_driver_handle();
1480 let child_cx = Cx::new_with_observability(
1481 self.region_id(),
1482 task_id,
1483 self.budget(),
1484 Some(child_observability),
1485 io_driver,
1486 Some(child_entropy),
1487 )
1488 .with_blocking_pool_handle(cx.blocking_pool_handle());
1489
1490 if let Some(record) = state.task_mut(task_id) {
1491 record.set_cx_inner(child_cx.inner.clone());
1492 record.set_cx(child_cx.clone());
1493 }
1494
1495 let cx_for_send = child_cx.clone();
1496 let inner_weak = Arc::downgrade(&child_cx.inner);
1497 let state_for_task = Arc::clone(&server_state);
1498
1499 let cell = GenServerCell {
1500 mailbox: msg_rx,
1501 state: Arc::clone(&server_state),
1502 _keep_alive: msg_tx.clone(),
1503 };
1504
1505 let wrapped = async move {
1506 let result = CatchUnwind(Box::pin(run_gen_server_loop(server, child_cx, &cell))).await;
1507 match result {
1508 Ok(server_final) => {
1509 let _ = result_tx.send(&cx_for_send, Ok(server_final));
1510 }
1511 Err(payload) => {
1512 let msg = crate::cx::scope::payload_to_string(&payload);
1513 let _ = result_tx.send(
1514 &cx_for_send,
1515 Err(JoinError::Panicked(crate::types::PanicPayload::new(msg))),
1516 );
1517 }
1518 }
1519 state_for_task.store(ActorState::Stopped);
1520 Outcome::Ok(())
1521 };
1522
1523 let stored = StoredTask::new_with_id(wrapped, task_id);
1524
1525 let handle = GenServerHandle {
1526 actor_id,
1527 sender: msg_tx,
1528 state: server_state,
1529 task_id,
1530 receiver: result_rx,
1531 inner: inner_weak,
1532 overflow_policy,
1533 };
1534
1535 Ok((handle, stored))
1536 }
1537
    /// Spawns a gen_server and registers it under `name` in `registry`.
    ///
    /// On success the returned handle owns the name lease. If the name is
    /// already taken, the just-created task record is removed from both the
    /// region and the runtime state before returning
    /// `NamedSpawnError::NameTaken`, so no orphan task is left behind.
    #[allow(clippy::too_many_arguments)]
    pub fn spawn_named_gen_server<S: GenServer>(
        &self,
        state: &mut crate::runtime::state::RuntimeState,
        cx: &Cx,
        registry: &mut crate::cx::NameRegistry,
        name: impl Into<String>,
        server: S,
        mailbox_capacity: usize,
        now: crate::types::Time,
    ) -> Result<
        (
            NamedGenServerHandle<S>,
            crate::runtime::stored_task::StoredTask,
        ),
        NamedSpawnError,
    > {
        let name = name.into();

        // Spawn first; a registration failure below must undo this.
        let (handle, stored) = self
            .spawn_gen_server(state, cx, server, mailbox_capacity)
            .map_err(NamedSpawnError::Spawn)?;

        let task_id = handle.task_id();
        let region = self.region_id();

        match registry.register(name, task_id, region, now) {
            Ok(lease) => {
                let named = NamedGenServerHandle {
                    handle,
                    lease: Some(lease),
                };
                Ok((named, stored))
            }
            Err(e) => {
                // Roll back the spawn: drop the task record from the region
                // and the runtime state so the failed registration leaves no
                // trace. `handle` is dropped at the end of this arm.
                let task_id = handle.task_id();
                if let Some(region_record) = state.region(self.region_id()) {
                    region_record.remove_task(task_id);
                }
                state.remove_task(task_id);
                Err(NamedSpawnError::NameTaken(e))
            }
        }
    }
1607}
1608
/// Error returned by `spawn_named_gen_server`: either the spawn itself or the
/// subsequent name registration failed.
#[derive(Debug)]
pub enum NamedSpawnError {
    /// The underlying gen_server spawn failed; no task was created.
    Spawn(SpawnError),
    /// The server was spawned but the requested name was already registered;
    /// the spawned task has been rolled back.
    NameTaken(crate::cx::NameLeaseError),
}
1621
1622impl std::fmt::Display for NamedSpawnError {
1623 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1624 match self {
1625 Self::Spawn(e) => write!(f, "named server spawn failed: {e}"),
1626 Self::NameTaken(e) => write!(f, "named server registration failed: {e}"),
1627 }
1628 }
1629}
1630
1631impl std::error::Error for NamedSpawnError {}
1632
/// A `GenServerHandle` paired with the name lease acquired at spawn time.
#[derive(Debug)]
pub struct NamedGenServerHandle<S: GenServer> {
    // Underlying handle used for call/cast/stop/join.
    handle: GenServerHandle<S>,
    // `None` once the lease has been taken, released, or aborted.
    lease: Option<crate::cx::NameLease>,
}
1646
1647impl<S: GenServer> NamedGenServerHandle<S> {
1648 #[must_use]
1650 pub fn name(&self) -> &str {
1651 self.lease
1652 .as_ref()
1653 .map_or("(released)", crate::cx::NameLease::name)
1654 }
1655
1656 #[must_use]
1658 pub fn task_id(&self) -> TaskId {
1659 self.handle.task_id()
1660 }
1661
1662 #[must_use]
1664 pub fn actor_id(&self) -> ActorId {
1665 self.handle.actor_id()
1666 }
1667
1668 #[must_use]
1670 pub fn is_finished(&self) -> bool {
1671 self.handle.is_finished()
1672 }
1673
1674 #[must_use]
1676 pub fn server_ref(&self) -> GenServerRef<S> {
1677 self.handle.server_ref()
1678 }
1679
1680 #[must_use]
1682 pub fn inner(&self) -> &GenServerHandle<S> {
1683 &self.handle
1684 }
1685
1686 pub fn stop_and_release(&mut self) -> Result<(), crate::cx::NameLeaseError> {
1695 self.handle.stop();
1696 self.lease
1697 .as_mut()
1698 .expect("lease already resolved")
1699 .release()
1700 .map(|_proof| ())
1701 }
1702
1703 pub fn abort_lease(&mut self) -> Result<(), crate::cx::NameLeaseError> {
1712 self.lease
1713 .as_mut()
1714 .expect("lease already resolved")
1715 .abort()
1716 .map(|_proof| ())
1717 }
1718
1719 pub fn take_lease(&mut self) -> Option<crate::cx::NameLease> {
1724 self.lease.take()
1725 }
1726}
1727
/// Factory used by supervision to (re)start a named gen_server child.
///
/// Holds everything needed to start a fresh server instance under the same
/// name: the shared name registry, the name, the mailbox capacity, and a
/// closure that builds a new server value for each (re)start.
pub struct NamedGenServerStart<S, F>
where
    S: GenServer,
    F: FnMut() -> S + Send + 'static,
{
    registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>>,
    name: String,
    mailbox_capacity: usize,
    make_server: F,
}
1751
1752#[must_use]
1754pub fn named_gen_server_start<S, F>(
1755 registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>>,
1756 name: impl Into<String>,
1757 mailbox_capacity: usize,
1758 make_server: F,
1759) -> NamedGenServerStart<S, F>
1760where
1761 S: GenServer,
1762 F: FnMut() -> S + Send + 'static,
1763{
1764 NamedGenServerStart {
1765 registry,
1766 name: name.into(),
1767 mailbox_capacity,
1768 make_server,
1769 }
1770}
1771
impl<S, F> crate::supervision::ChildStart for NamedGenServerStart<S, F>
where
    S: GenServer,
    F: FnMut() -> S + Send + 'static,
{
    /// Starts one named gen_server child and wires up name cleanup.
    ///
    /// Registers a region finalizer that removes the name binding and
    /// releases the lease when the task ends. If the finalizer cannot be
    /// registered (the region is already closing), the registration is rolled
    /// back and the start fails with `RegionClosed`.
    fn start(
        &mut self,
        scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        state: &mut crate::runtime::RuntimeState,
        cx: &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError> {
        // NOTE(review): registration always uses Time::ZERO here — presumably
        // lease timestamps are irrelevant for supervision restarts; confirm.
        let now = crate::types::Time::ZERO;
        let server = (self.make_server)();
        let (mut named_handle, stored) = scope
            .spawn_named_gen_server(
                state,
                cx,
                &mut self.registry.lock(),
                self.name.clone(),
                server,
                self.mailbox_capacity,
                now,
            )
            .map_err(|err| match err {
                NamedSpawnError::Spawn(spawn_err) => spawn_err,
                NamedSpawnError::NameTaken(name_err) => SpawnError::NameRegistrationFailed {
                    name: self.name.clone(),
                    reason: name_err.to_string(),
                },
            })?;

        let task_id = named_handle.task_id();
        state.store_spawned_task(task_id, stored);

        // Move the lease into a shared slot so exactly one of the finalizer
        // (normal path) or the rollback below can resolve it.
        let lease_slot = Arc::new(parking_lot::Mutex::new(named_handle.take_lease()));
        let lease_slot_for_finalizer = Arc::clone(&lease_slot);
        let registry_for_finalizer = Arc::clone(&self.registry);
        let finalizer_registered = scope.defer_sync(state, move || {
            // Runs when the region winds down: drop the name binding, then
            // release the lease if it is still in the slot.
            let _ = registry_for_finalizer.lock().cleanup_task(task_id);
            let lease_to_resolve = lease_slot_for_finalizer.lock().take();
            if let Some(mut lease) = lease_to_resolve {
                let _ = lease.release();
            }
        });

        if !finalizer_registered {
            // Region refused the finalizer: undo the name registration, abort
            // (rather than release) the lease, and fail the start.
            let _ = self.registry.lock().cleanup_task(task_id);
            let lease_to_abort = lease_slot.lock().take();
            if let Some(mut lease) = lease_to_abort {
                let _ = lease.abort();
            }
            return Err(SpawnError::RegionClosed(scope.region_id()));
        }

        Ok(task_id)
    }
}
1829
1830#[cfg(test)]
1835mod tests {
1836 use super::*;
1837 use crate::runtime::state::RuntimeState;
1838 use crate::supervision::ChildStart;
1839 use crate::types::policy::FailFast;
1840 use crate::types::Budget;
1841 use crate::types::CancelKind;
1842 use crate::util::ArenaIndex;
1843 use std::sync::atomic::{AtomicU64, Ordering};
1844 use std::sync::{Arc, Mutex};
1845
    // Common per-test setup: initialize logging and emit a phase marker.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }
1850
    // Minimal stateful test server: a single u64 counter.
    struct Counter {
        count: u64,
    }
1856
    // Call protocol for `Counter`: read, or add-then-read.
    enum CounterCall {
        Get,
        Add(u64),
    }
1861
    // Cast protocol for `Counter`: fire-and-forget reset to zero.
    enum CounterCast {
        Reset,
    }
1865
1866 impl GenServer for Counter {
1867 type Call = CounterCall;
1868 type Reply = u64;
1869 type Cast = CounterCast;
1870 type Info = SystemMsg;
1871
1872 fn handle_call(
1873 &mut self,
1874 _cx: &Cx,
1875 request: CounterCall,
1876 reply: Reply<u64>,
1877 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1878 match request {
1879 CounterCall::Get => {
1880 let _ = reply.send(self.count);
1881 }
1882 CounterCall::Add(n) => {
1883 self.count += n;
1884 let _ = reply.send(self.count);
1885 }
1886 }
1887 Box::pin(async {})
1888 }
1889
1890 fn handle_cast(
1891 &mut self,
1892 _cx: &Cx,
1893 msg: CounterCast,
1894 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1895 match msg {
1896 CounterCast::Reset => self.count = 0,
1897 }
1898 Box::pin(async {})
1899 }
1900 }
1901
    // Records the budget priority observed in `on_start` vs. the main loop,
    // so tests can verify the start-phase budget is applied and then dropped.
    #[derive(Clone)]
    struct StartBudgetProbe {
        started_priority: Arc<AtomicU8>,
        loop_priority: Arc<AtomicU8>,
    }
1907
    impl GenServer for StartBudgetProbe {
        type Call = CounterCall;
        type Reply = u8;
        type Cast = CounterCast;
        type Info = SystemMsg;

        // Capture the priority in effect during the start phase.
        fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.started_priority
                .store(cx.budget().priority, Ordering::SeqCst);
            Box::pin(async {})
        }

        // Request an elevated priority (200) for the start phase only.
        fn on_start_budget(&self) -> Budget {
            Budget::new().with_priority(200)
        }

        // Capture — and reply with — the priority in effect for the main loop.
        fn handle_call(
            &mut self,
            cx: &Cx,
            _request: CounterCall,
            reply: Reply<u8>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.loop_priority
                .store(cx.budget().priority, Ordering::SeqCst);
            let _ = reply.send(cx.budget().priority);
            Box::pin(async {})
        }
    }
1936
    // Records whether `cx.checkpoint()` succeeds inside `on_stop`, i.e.
    // whether cancellation is masked during the stop phase.
    struct StopMaskProbe {
        stop_checkpoint_ok: Arc<AtomicU8>,
    }
1940
1941 impl GenServer for StopMaskProbe {
1942 type Call = CounterCall;
1943 type Reply = u8;
1944 type Cast = CounterCast;
1945 type Info = SystemMsg;
1946
1947 fn handle_call(
1948 &mut self,
1949 _cx: &Cx,
1950 _request: CounterCall,
1951 reply: Reply<u8>,
1952 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1953 let _ = reply.send(0);
1954 Box::pin(async {})
1955 }
1956
1957 fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
1958 let ok = cx.checkpoint().is_ok();
1959 self.stop_checkpoint_ok
1960 .store(u8::from(ok), Ordering::SeqCst);
1961 Box::pin(async {})
1962 }
1963 }
1964
1965 fn assert_gen_server<S: GenServer>() {}
1966
    #[test]
    fn gen_server_trait_bounds() {
        init_test("gen_server_trait_bounds");
        // Purely a compile-time assertion; nothing executes at runtime.
        assert_gen_server::<Counter>();
        crate::test_complete!("gen_server_trait_bounds");
    }
1973
    #[test]
    fn gen_server_spawn_and_cast() {
        init_test("gen_server_spawn_and_cast");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        // Queue a cast before the server ever runs.
        handle.try_cast(CounterCast::Reset).unwrap();

        // Dropping the handle lets the server loop terminate once the
        // mailbox drains.
        drop(handle);

        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();

        crate::test_complete!("gen_server_spawn_and_cast");
    }
2000
    #[test]
    fn gen_server_spawn_and_call() {
        init_test("gen_server_spawn_and_call");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        // Client task issues a round-trip call (`Add(5)`) against the server.
        let server_ref = handle.server_ref();
        let (client_handle, client_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                server_ref.call(&cx, CounterCall::Add(5)).await.unwrap()
            })
            .unwrap();
        let client_task_id = client_handle.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, client_stored);

        // Schedule both sides and run to completion.
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_quiescent();

        // `Add(5)` on a zeroed counter must reply 5.
        let result =
            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
        assert_eq!(result, 5);

        crate::test_complete!("gen_server_spawn_and_call");
    }
2045
    #[test]
    fn gen_server_call_cancellation_is_deterministic() {
        init_test("gen_server_call_cancellation_is_deterministic");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        // Slot used to smuggle the client's Cx out of the spawned task so the
        // test can cancel it mid-call.
        let client_cx_cell: Arc<Mutex<Option<Cx>>> = Arc::new(Mutex::new(None));
        let client_cx_cell_for_task = Arc::clone(&client_cx_cell);

        let (client_handle, client_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                *client_cx_cell_for_task.lock().expect("lock poisoned") = Some(cx.clone());
                server_ref.call(&cx, CounterCall::Get).await
            })
            .unwrap();
        let client_task_id = client_handle.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, client_stored);

        // Only the client is scheduled; the server never runs, so the call
        // is guaranteed to still be pending when we cancel.
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_idle();

        let client_cx = client_cx_cell
            .lock()
            .expect("lock poisoned")
            .clone()
            .expect("client cx published");
        client_cx.cancel_with(CancelKind::User, Some("gen_server call cancelled"));

        // Re-run the client so it observes the cancellation.
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_idle();

        // The pending call must resolve to Cancelled with the exact kind and
        // message supplied above.
        let result =
            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
        match result {
            Ok(_) => unreachable!("expected cancellation, got Ok"),
            Err(CallError::Cancelled(reason)) => {
                assert_eq!(reason.kind, CancelKind::User);
                assert_eq!(reason.message, Some("gen_server call cancelled"));
            }
            Err(other) => unreachable!("expected CallError::Cancelled, got {other:?}"),
        }

        // Let the server run and shut down cleanly after its handle is gone.
        drop(handle);
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        crate::test_complete!("gen_server_call_cancellation_is_deterministic");
    }
2122
    #[test]
    fn supervised_gen_server_stays_alive() {
        init_test("supervised_gen_server_stays_alive");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
        let registry = Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));

        // Start a named server through the supervision `ChildStart` path.
        let mut starter =
            named_gen_server_start(Arc::clone(&registry), "persistent_service", 32, || {
                Counter { count: 0 }
            });

        let task_id = starter
            .start(&scope, &mut runtime.state, &cx)
            .expect("start ok");

        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_idle();

        // With no stop signal the server must still be running after idle.
        let task = runtime.state.task(task_id).expect("task exists");
        crate::assert_with_log!(
            !task.state.is_terminal(),
            "server should be alive",
            "Running",
            format!("{:?}", task.state)
        );

        // Cancel the whole region and drive everything to quiescence.
        let tasks_to_schedule =
            runtime
                .state
                .cancel_request(region, &CancelReason::user("test done"), None);
        for (tid, priority) in tasks_to_schedule {
            runtime.scheduler.lock().unwrap().schedule(tid, priority);
        }
        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();

        // The finalizer registered by `start` must have removed the name.
        assert!(
            registry.lock().whereis("persistent_service").is_none(),
            "name must be removed after region stop",
        );

        crate::test_complete!("supervised_gen_server_stays_alive");
    }
2173
    #[test]
    fn gen_server_cast_cancellation_is_deterministic() {
        init_test("gen_server_cast_cancellation_is_deterministic");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        // Mailbox capacity of 1 so a single prefilled cast makes the next
        // cast block on backpressure.
        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 1)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        // Fill the single mailbox slot; the server has not run yet.
        futures_lite::future::block_on(handle.cast(&cx, CounterCast::Reset))
            .expect("prefill cast ok");

        // Slot used to smuggle the client's Cx out so the test can cancel
        // the blocked cast.
        let client_cx_cell: Arc<Mutex<Option<Cx>>> = Arc::new(Mutex::new(None));
        let client_cx_cell_for_task = Arc::clone(&client_cx_cell);

        let (client_handle, client_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                *client_cx_cell_for_task.lock().expect("lock poisoned") = Some(cx.clone());
                server_ref.cast(&cx, CounterCast::Reset).await
            })
            .unwrap();
        let client_task_id = client_handle.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, client_stored);

        // Run only the client; its cast parks on the full mailbox.
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_idle();

        let client_cx = client_cx_cell
            .lock()
            .expect("lock poisoned")
            .clone()
            .expect("client cx published");
        client_cx.cancel_with(CancelKind::User, Some("gen_server cast cancelled"));

        // Re-run the client so it observes the cancellation.
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_idle();

        // The blocked cast must resolve to Cancelled with the exact kind and
        // message supplied above.
        let result =
            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
        match result {
            Ok(()) => unreachable!("expected cancellation, got Ok"),
            Err(CastError::Cancelled(reason)) => {
                assert_eq!(reason.kind, CancelKind::User);
                assert_eq!(reason.message, Some("gen_server cast cancelled"));
            }
            Err(other) => unreachable!("expected CastError::Cancelled, got {other:?}"),
        }

        // Let the server drain and shut down cleanly after its handle is gone.
        drop(handle);
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        crate::test_complete!("gen_server_cast_cancellation_is_deterministic");
    }
2254
2255 #[test]
2256 fn gen_server_handle_accessors() {
2257 init_test("gen_server_handle_accessors");
2258
2259 let mut state = RuntimeState::new();
2260 let root = state.create_root_region(Budget::INFINITE);
2261 let cx = Cx::for_testing();
2262 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
2263
2264 let (handle, stored) = scope
2265 .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 32)
2266 .unwrap();
2267 state.store_spawned_task(handle.task_id(), stored);
2268
2269 let _actor_id = handle.actor_id();
2270 let _task_id = handle.task_id();
2271 assert!(!handle.is_finished());
2272
2273 let server_ref = handle.server_ref();
2274 assert_eq!(server_ref.actor_id(), handle.actor_id());
2275 assert!(server_ref.is_alive());
2276 assert!(!server_ref.is_closed());
2277
2278 crate::test_complete!("gen_server_handle_accessors");
2279 }
2280
2281 #[test]
2282 fn gen_server_ref_is_cloneable() {
2283 init_test("gen_server_ref_is_cloneable");
2284
2285 let mut state = RuntimeState::new();
2286 let root = state.create_root_region(Budget::INFINITE);
2287 let cx = Cx::for_testing();
2288 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
2289
2290 let (handle, stored) = scope
2291 .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 32)
2292 .unwrap();
2293 state.store_spawned_task(handle.task_id(), stored);
2294
2295 let ref1 = handle.server_ref();
2296 let ref2 = ref1.clone();
2297 assert_eq!(ref1.actor_id(), ref2.actor_id());
2298
2299 crate::test_complete!("gen_server_ref_is_cloneable");
2300 }
2301
    #[test]
    fn gen_server_stop_transitions() {
        init_test("gen_server_stop_transitions");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        let server_ref = handle.server_ref();
        assert!(server_ref.is_alive());

        // Request stop before the server has ever been scheduled.
        handle.stop();

        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();

        // After quiescence both the handle and the ref must agree the server
        // has terminated.
        assert!(handle.is_finished());
        assert!(!server_ref.is_alive());

        crate::test_complete!("gen_server_stop_transitions");
    }
2330
    #[test]
    fn gen_server_handle_rejects_call_and_cast_after_stop() {
        init_test("gen_server_handle_rejects_call_and_cast_after_stop");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        // Let the server start, then request stop.
        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_idle();
        handle.stop();

        // Both call and cast must fail fast with ServerStopped — even before
        // the stop has actually been processed by the server loop.
        let call_err =
            futures_lite::future::block_on(handle.call(&cx, CounterCall::Get)).unwrap_err();
        assert!(
            matches!(call_err, CallError::ServerStopped),
            "call after stop must return ServerStopped, got {call_err:?}"
        );

        let cast_err =
            futures_lite::future::block_on(handle.cast(&cx, CounterCast::Reset)).unwrap_err();
        assert!(
            matches!(cast_err, CastError::ServerStopped),
            "cast after stop must return ServerStopped, got {cast_err:?}"
        );

        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();
        assert!(handle.is_finished());

        crate::test_complete!("gen_server_handle_rejects_call_and_cast_after_stop");
    }
2371
    #[test]
    fn gen_server_handle_join_returns_final_state_after_stop() {
        init_test("gen_server_handle_join_returns_final_state_after_stop");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        handle.stop();
        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();
        assert!(handle.is_finished());

        // `join` hands back the server value itself, not just an exit status.
        let final_state = futures_lite::future::block_on(handle.join(&cx)).expect("join");
        assert_eq!(
            final_state.count, 0,
            "final server state should be returned"
        );

        crate::test_complete!("gen_server_handle_join_returns_final_state_after_stop");
    }
2400
    #[test]
    fn gen_server_stop_wakes_blocked_mailbox_recv() {
        // Server that records (via the shared flag) that `on_stop` ran.
        #[allow(clippy::items_after_statements)]
        struct StopWakeProbe {
            stop_ran: Arc<AtomicU8>,
        }

        #[allow(clippy::items_after_statements)]
        impl GenServer for StopWakeProbe {
            type Call = CounterCall;
            type Reply = u64;
            type Cast = CounterCast;
            type Info = SystemMsg;

            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.stop_ran.store(1, Ordering::SeqCst);
                Box::pin(async {})
            }

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: CounterCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(0);
                Box::pin(async {})
            }
        }

        init_test("gen_server_stop_wakes_blocked_mailbox_recv");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let stop_ran = Arc::new(AtomicU8::new(0));
        let server = StopWakeProbe {
            stop_ran: Arc::clone(&stop_ran),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        // Run the server until it parks on its empty mailbox.
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_idle();

        // Stop must wake the parked recv on its own — note no explicit
        // re-schedule of the server task here.
        handle.stop();
        runtime.run_until_quiescent();

        assert_eq!(
            stop_ran.load(Ordering::SeqCst),
            1,
            "on_stop should run after stop wakes blocked recv"
        );
        assert!(handle.is_finished(), "server should finish after stop");

        crate::test_complete!("gen_server_stop_wakes_blocked_mailbox_recv");
    }
2470
    // Counter whose final value is published to a shared atomic in `on_stop`,
    // so tests can observe the state the server shut down with.
    struct ObservableCounter {
        count: u64,
        final_count: Arc<AtomicU64>,
    }
2477
2478 impl GenServer for ObservableCounter {
2479 type Call = CounterCall;
2480 type Reply = u64;
2481 type Cast = CounterCast;
2482 type Info = SystemMsg;
2483
2484 fn handle_call(
2485 &mut self,
2486 _cx: &Cx,
2487 request: CounterCall,
2488 reply: Reply<u64>,
2489 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2490 match request {
2491 CounterCall::Get => {
2492 let _ = reply.send(self.count);
2493 }
2494 CounterCall::Add(n) => {
2495 self.count += n;
2496 let _ = reply.send(self.count);
2497 }
2498 }
2499 Box::pin(async {})
2500 }
2501
2502 fn handle_cast(
2503 &mut self,
2504 _cx: &Cx,
2505 msg: CounterCast,
2506 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2507 match msg {
2508 CounterCast::Reset => self.count = 0,
2509 }
2510 Box::pin(async {})
2511 }
2512
2513 fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2514 self.final_count.store(self.count, Ordering::SeqCst);
2515 Box::pin(async {})
2516 }
2517 }
2518
    #[test]
    fn gen_server_processes_casts_before_stop() {
        init_test("gen_server_processes_casts_before_stop");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        // u64::MAX sentinel: distinguishes "on_stop never ran" from a real 0.
        let final_count = Arc::new(AtomicU64::new(u64::MAX));
        let server = ObservableCounter {
            count: 0,
            final_count: final_count.clone(),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_idle();

        // Queue several casts, then a stop — the casts must all be handled
        // before the stop takes effect.
        for _ in 0..5 {
            handle.try_cast(CounterCast::Reset).expect("try_cast ok");
        }

        handle.stop();

        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_quiescent();

        // 0 (not the MAX sentinel) proves on_stop ran after the resets.
        assert_eq!(
            final_count.load(Ordering::SeqCst),
            0,
            "on_stop recorded final count"
        );

        crate::test_complete!("gen_server_processes_casts_before_stop");
    }
2564
    #[test]
    fn gen_server_deterministic_replay() {
        // Runs one full spawn/cast/stop scenario under the given seed and
        // returns the count observed by on_stop.
        fn run_scenario(seed: u64) -> u64 {
            let config = crate::lab::LabConfig::new(seed);
            let mut runtime = crate::lab::LabRuntime::new(config);
            let region = runtime.state.create_root_region(Budget::INFINITE);
            let cx = Cx::for_testing();
            let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

            // u64::MAX sentinel marks "on_stop never ran".
            let final_count = Arc::new(AtomicU64::new(u64::MAX));
            let server = ObservableCounter {
                count: 0,
                final_count: final_count.clone(),
            };

            let (handle, stored) = scope
                .spawn_gen_server(&mut runtime.state, &cx, server, 32)
                .unwrap();
            let task_id = handle.task_id();
            runtime.state.store_spawned_task(task_id, stored);

            runtime.scheduler.lock().unwrap().schedule(task_id, 0);
            runtime.run_until_idle();

            for _ in 0..5 {
                handle.try_cast(CounterCast::Reset).expect("try_cast ok");
            }
            handle.stop();

            runtime.scheduler.lock().unwrap().schedule(task_id, 0);
            runtime.run_until_quiescent();

            final_count.load(Ordering::SeqCst)
        }

        init_test("gen_server_deterministic_replay");

        // Same seed twice must produce an identical observable outcome.
        let result1 = run_scenario(0xCAFE_BABE);
        let result2 = run_scenario(0xCAFE_BABE);
        assert_eq!(result1, result2, "deterministic replay");

        crate::test_complete!("gen_server_deterministic_replay");
    }
2609
    // Server that records the Debug rendering of every info message it sees.
    #[derive(Default)]
    struct InfoRecorder {
        seen: Arc<Mutex<Vec<String>>>,
    }
2616
2617 impl GenServer for InfoRecorder {
2618 type Call = ();
2619 type Reply = ();
2620 type Cast = ();
2621 type Info = SystemMsg;
2622
2623 fn handle_call(
2624 &mut self,
2625 _cx: &Cx,
2626 _request: (),
2627 reply: Reply<()>,
2628 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2629 let _ = reply.send(());
2630 Box::pin(async {})
2631 }
2632
2633 fn handle_info(
2634 &mut self,
2635 _cx: &Cx,
2636 msg: Self::Info,
2637 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2638 let seen = Arc::clone(&self.seen);
2639 Box::pin(async move {
2640 seen.lock().expect("lock poisoned").push(format!("{msg:?}"));
2641 })
2642 }
2643 }
2644
    // Shorthand: TaskId for arena slot `n`, generation 0.
    fn tid(n: u32) -> TaskId {
        TaskId::from_arena(ArenaIndex::new(n, 0))
    }
2648
    // Shorthand: RegionId for arena slot `n`, generation 0.
    fn rid(n: u32) -> crate::types::RegionId {
        crate::types::RegionId::from_arena(ArenaIndex::new(n, 0))
    }
2652
    #[test]
    fn conformance_system_msg_sort_key_orders_shutdown_batch() {
        init_test("conformance_system_msg_sort_key_orders_shutdown_batch");

        let mut monitors = crate::monitor::MonitorSet::new();
        let mref_down_6 = monitors.establish(tid(90), rid(0), tid(6));
        let mref_down_3 = monitors.establish(tid(91), rid(0), tid(3));

        // Push messages deliberately out of order: mixed timestamps, mixed
        // categories (Exit/Timeout/Down), mixed subject ids.
        let mut batch = SystemMsgBatch::new();
        batch.push(SystemMsg::Exit {
            exit_vt: Time::from_secs(10),
            from: tid(6),
            reason: DownReason::Normal,
        });
        batch.push(SystemMsg::Timeout {
            tick_vt: Time::from_secs(10),
            id: 4,
        });
        batch.push(SystemMsg::Down {
            completion_vt: Time::from_secs(10),
            notification: DownNotification {
                monitored: tid(6),
                reason: DownReason::Normal,
                monitor_ref: mref_down_6,
            },
        });
        batch.push(SystemMsg::Timeout {
            tick_vt: Time::from_secs(9),
            id: 99,
        });
        batch.push(SystemMsg::Down {
            completion_vt: Time::from_secs(10),
            notification: DownNotification {
                monitored: tid(3),
                reason: DownReason::Normal,
                monitor_ref: mref_down_3,
            },
        });
        batch.push(SystemMsg::Exit {
            exit_vt: Time::from_secs(10),
            from: tid(2),
            reason: DownReason::Normal,
        });
        batch.push(SystemMsg::Timeout {
            tick_vt: Time::from_secs(10),
            id: 1,
        });

        let sorted = batch.into_sorted();
        let keys: Vec<_> = sorted.iter().map(SystemMsg::sort_key).collect();

        // Expected order: virtual time first, then the category rank
        // (Down = 0, Exit = 1, Timeout = 2), then the subject key.
        assert_eq!(
            keys,
            vec![
                (Time::from_secs(9), 2, SystemMsgSubjectKey::TimeoutId(99)),
                (Time::from_secs(10), 0, SystemMsgSubjectKey::Task(tid(3))),
                (Time::from_secs(10), 0, SystemMsgSubjectKey::Task(tid(6))),
                (Time::from_secs(10), 1, SystemMsgSubjectKey::Task(tid(2))),
                (Time::from_secs(10), 1, SystemMsgSubjectKey::Task(tid(6))),
                (Time::from_secs(10), 2, SystemMsgSubjectKey::TimeoutId(1)),
                (Time::from_secs(10), 2, SystemMsgSubjectKey::TimeoutId(4)),
            ],
            "shutdown system-message ordering must follow SYS-ORDER"
        );

        crate::test_complete!("conformance_system_msg_sort_key_orders_shutdown_batch");
    }
2722
    /// `SystemMsgBatch::into_sorted` must produce the same key order as an
    /// explicit `sort_by_key(SystemMsg::sort_key)` over the same messages.
    #[test]
    fn conformance_system_msg_batch_matches_explicit_sort() {
        init_test("conformance_system_msg_batch_matches_explicit_sort");

        let mut monitors = crate::monitor::MonitorSet::new();
        let mref = monitors.establish(tid(77), rid(0), tid(8));

        // Unsorted input mixing kinds and virtual times.
        let messages = vec![
            SystemMsg::Timeout {
                tick_vt: Time::from_secs(12),
                id: 4,
            },
            SystemMsg::Exit {
                exit_vt: Time::from_secs(11),
                from: tid(8),
                reason: DownReason::Error("boom".to_string()),
            },
            SystemMsg::Down {
                completion_vt: Time::from_secs(11),
                notification: DownNotification {
                    monitored: tid(8),
                    reason: DownReason::Normal,
                    monitor_ref: mref,
                },
            },
            SystemMsg::Timeout {
                tick_vt: Time::from_secs(11),
                id: 2,
            },
        ];

        // Path 1: push into a batch and let it sort.
        let mut batch = SystemMsgBatch::new();
        for msg in messages.clone() {
            batch.push(msg);
        }
        let batched = batch.into_sorted();

        // Path 2: plain Vec sort by the same key function.
        let mut explicit = messages;
        explicit.sort_by_key(SystemMsg::sort_key);

        // Compare key sequences rather than messages themselves.
        let batched_keys: Vec<_> = batched.iter().map(SystemMsg::sort_key).collect();
        let explicit_keys: Vec<_> = explicit.iter().map(SystemMsg::sort_key).collect();
        assert_eq!(batched_keys, explicit_keys);

        crate::test_complete!("conformance_system_msg_batch_matches_explicit_sort");
    }
2771
    /// Each payload type (DownMsg / ExitMsg / TimeoutMsg) must survive a
    /// round trip: payload -> SystemMsg constructor -> TryFrom back.
    #[test]
    fn system_msg_payload_types_roundtrip_via_conversions() {
        init_test("system_msg_payload_types_roundtrip_via_conversions");

        let mut monitors = crate::monitor::MonitorSet::new();
        let mref = monitors.establish(tid(7), rid(0), tid(8));

        // Down: compared field-by-field (presumably DownNotification does not
        // derive PartialEq as a whole — TODO confirm).
        let down = DownMsg::new(
            Time::from_secs(11),
            DownNotification {
                monitored: tid(8),
                reason: DownReason::Normal,
                monitor_ref: mref,
            },
        );
        let down_msg = SystemMsg::down(down.clone());
        let down_back = DownMsg::try_from(down_msg).expect("down conversion");
        assert_eq!(down_back.completion_vt, down.completion_vt);
        assert_eq!(
            down_back.notification.monitored,
            down.notification.monitored
        );
        assert_eq!(down_back.notification.reason, down.notification.reason);
        assert_eq!(
            down_back.notification.monitor_ref,
            down.notification.monitor_ref
        );

        // Exit: whole-value equality.
        let exit = ExitMsg::new(
            Time::from_secs(12),
            tid(9),
            DownReason::Error("boom".into()),
        );
        let exit_msg = SystemMsg::exit(exit.clone());
        let exit_back = ExitMsg::try_from(exit_msg).expect("exit conversion");
        assert_eq!(exit_back, exit);

        // Timeout: whole-value equality (Copy-able payload, no clone needed).
        let timeout = TimeoutMsg::new(Time::from_secs(13), 42);
        let timeout_msg = SystemMsg::timeout(timeout);
        let timeout_back = TimeoutMsg::try_from(timeout_msg).expect("timeout conversion");
        assert_eq!(timeout_back, timeout);

        crate::test_complete!("system_msg_payload_types_roundtrip_via_conversions");
    }
2816
    /// A failed `TryFrom` conversion must hand back the original message
    /// unchanged (as the error value), so nothing is lost on a mismatch.
    #[test]
    fn system_msg_try_from_mismatch_returns_original_variant() {
        init_test("system_msg_try_from_mismatch_returns_original_variant");
        let mut monitors = crate::monitor::MonitorSet::new();
        let mref = monitors.establish(tid(10), rid(0), tid(1));

        // Timeout -> DownMsg must fail and return the Timeout intact.
        let timeout = SystemMsg::Timeout {
            tick_vt: Time::from_secs(5),
            id: 99,
        };
        let err = DownMsg::try_from(timeout).expect_err("timeout is not down");
        assert!(matches!(err, SystemMsg::Timeout { id: 99, .. }));

        // Down -> TimeoutMsg must fail and return the Down intact.
        let down = SystemMsg::Down {
            completion_vt: Time::from_secs(6),
            notification: DownNotification {
                monitored: tid(1),
                reason: DownReason::Normal,
                monitor_ref: mref,
            },
        };
        let err = TimeoutMsg::try_from(down).expect_err("down is not timeout");
        assert!(matches!(err, SystemMsg::Down { .. }));

        crate::test_complete!("system_msg_try_from_mismatch_returns_original_variant");
    }
2843
    /// All three system-message kinds pushed via `try_info` must reach the
    /// server's `handle_info`, in the order they were enqueued.
    #[test]
    fn gen_server_handle_info_receives_system_messages() {
        init_test("gen_server_handle_info_receives_system_messages");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        // InfoRecorder (defined elsewhere in this file) appends a string per
        // received info message into `seen`.
        let seen: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let server = InfoRecorder {
            seen: Arc::clone(&seen),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let mut monitors = crate::monitor::MonitorSet::new();
        let mref = monitors.establish(tid(10), rid(0), tid(11));

        // Enqueue one message of each kind before the server ever runs.
        handle
            .try_info(SystemMsg::Down {
                completion_vt: Time::from_secs(5),
                notification: DownNotification {
                    monitored: tid(11),
                    reason: DownReason::Normal,
                    monitor_ref: mref,
                },
            })
            .unwrap();

        handle
            .try_info(SystemMsg::Exit {
                exit_vt: Time::from_secs(6),
                from: tid(12),
                reason: DownReason::Error("boom".into()),
            })
            .unwrap();

        handle
            .try_info(SystemMsg::Timeout {
                tick_vt: Time::from_secs(7),
                id: 123,
            })
            .unwrap();

        // Dropping the handle lets the server loop exit once the queue drains.
        drop(handle);

        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        // Enqueue order must be preserved: Down, Exit, Timeout.
        let seen = seen.lock().expect("lock poisoned");
        assert_eq!(seen.len(), 3);
        assert!(seen[0].contains("Down"));
        assert!(seen[1].contains("Exit"));
        assert!(seen[2].contains("Timeout"));
        drop(seen);

        crate::test_complete!("gen_server_handle_info_receives_system_messages");
    }
2911
2912 #[test]
2913 fn gen_server_info_ordering_is_deterministic_for_seed() {
2914 fn run_scenario(seed: u64) -> Vec<String> {
2915 let config = crate::lab::LabConfig::new(seed);
2916 let mut runtime = crate::lab::LabRuntime::new(config);
2917 let region = runtime.state.create_root_region(Budget::INFINITE);
2918 let cx = Cx::for_testing();
2919 let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2920
2921 let events: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
2922 let server = InfoRecorder {
2923 seen: Arc::clone(&events),
2924 };
2925
2926 let (handle, stored) = scope
2927 .spawn_gen_server(&mut runtime.state, &cx, server, 32)
2928 .unwrap();
2929 let server_task_id = handle.task_id();
2930 runtime.state.store_spawned_task(server_task_id, stored);
2931
2932 let server_ref = handle.server_ref();
2933
2934 let (client_a, stored_a) = scope
2935 .spawn(&mut runtime.state, &cx, move |cx| async move {
2936 server_ref
2937 .info(
2938 &cx,
2939 SystemMsg::Timeout {
2940 tick_vt: Time::from_secs(2),
2941 id: 1,
2942 },
2943 )
2944 .await
2945 .unwrap();
2946 })
2947 .unwrap();
2948 let task_id_a = client_a.task_id();
2949 runtime.state.store_spawned_task(task_id_a, stored_a);
2950
2951 let server_ref_b = handle.server_ref();
2952 let (client_b, stored_b) = scope
2953 .spawn(&mut runtime.state, &cx, move |cx| async move {
2954 server_ref_b
2955 .info(
2956 &cx,
2957 SystemMsg::Timeout {
2958 tick_vt: Time::from_secs(2),
2959 id: 2,
2960 },
2961 )
2962 .await
2963 .unwrap();
2964 })
2965 .unwrap();
2966 let task_id_b = client_b.task_id();
2967 runtime.state.store_spawned_task(task_id_b, stored_b);
2968
2969 runtime.scheduler.lock().unwrap().schedule(task_id_a, 0);
2971 runtime.scheduler.lock().unwrap().schedule(task_id_b, 0);
2972 runtime
2973 .scheduler
2974 .lock()
2975 .unwrap()
2976 .schedule(server_task_id, 0);
2977
2978 runtime.run_until_quiescent();
2979 drop(handle);
2980 runtime
2981 .scheduler
2982 .lock()
2983 .unwrap()
2984 .schedule(server_task_id, 0);
2985 runtime.run_until_quiescent();
2986
2987 let out = events.lock().expect("lock poisoned").clone();
2988 out
2989 }
2990
2991 init_test("gen_server_info_ordering_is_deterministic_for_seed");
2992
2993 let a = run_scenario(0xD00D_F00D);
2994 let b = run_scenario(0xD00D_F00D);
2995 assert_eq!(
2996 a, b,
2997 "system/info ordering must be deterministic for same seed"
2998 );
2999
3000 crate::test_complete!("gen_server_info_ordering_is_deterministic_for_seed");
3001 }
3002
    /// Test server whose cast queue uses the DropOldest overflow policy
    /// (see its `GenServer` impl below).
    struct DropOldestCounter {
        // Current counter value, read by Get calls and overwritten by casts.
        count: u64,
    }
3009
    /// Cast payload for `DropOldestCounter`: overwrite the counter with a
    /// tagged value so tests can tell which casts survived eviction.
    #[derive(Debug, Clone)]
    enum TaggedCast {
        Set(u64),
    }
3015
3016 impl GenServer for DropOldestCounter {
3017 type Call = CounterCall;
3018 type Reply = u64;
3019 type Cast = TaggedCast;
3020 type Info = SystemMsg;
3021
3022 fn cast_overflow_policy(&self) -> CastOverflowPolicy {
3023 CastOverflowPolicy::DropOldest
3024 }
3025
3026 fn handle_call(
3027 &mut self,
3028 _cx: &Cx,
3029 request: CounterCall,
3030 reply: Reply<u64>,
3031 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3032 match request {
3033 CounterCall::Get => {
3034 let _ = reply.send(self.count);
3035 }
3036 CounterCall::Add(n) => {
3037 self.count += n;
3038 let _ = reply.send(self.count);
3039 }
3040 }
3041 Box::pin(async {})
3042 }
3043
3044 fn handle_cast(
3045 &mut self,
3046 _cx: &Cx,
3047 msg: TaggedCast,
3048 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3049 match msg {
3050 TaggedCast::Set(v) => self.count = v,
3051 }
3052 Box::pin(async {})
3053 }
3054 }
3055
3056 #[test]
3057 fn gen_server_drop_oldest_policy_accessor() {
3058 init_test("gen_server_drop_oldest_policy_accessor");
3059
3060 let mut state = RuntimeState::new();
3061 let root = state.create_root_region(Budget::INFINITE);
3062 let cx = Cx::for_testing();
3063 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3064
3065 let (handle, stored) = scope
3066 .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 4)
3067 .unwrap();
3068 state.store_spawned_task(handle.task_id(), stored);
3069
3070 assert_eq!(
3071 handle.cast_overflow_policy(),
3072 CastOverflowPolicy::DropOldest
3073 );
3074
3075 let server_ref = handle.server_ref();
3076 assert_eq!(
3077 server_ref.cast_overflow_policy(),
3078 CastOverflowPolicy::DropOldest
3079 );
3080
3081 crate::test_complete!("gen_server_drop_oldest_policy_accessor");
3082 }
3083
3084 #[test]
3085 fn gen_server_drop_oldest_evicts_when_full() {
3086 init_test("gen_server_drop_oldest_evicts_when_full");
3087
3088 let mut state = RuntimeState::new();
3089 let root = state.create_root_region(Budget::INFINITE);
3090 let cx = Cx::for_testing();
3091 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3092
3093 let (handle, stored) = scope
3095 .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 2)
3096 .unwrap();
3097 state.store_spawned_task(handle.task_id(), stored);
3098
3099 handle.try_cast(TaggedCast::Set(10)).unwrap();
3101 handle.try_cast(TaggedCast::Set(20)).unwrap();
3102
3103 handle.try_cast(TaggedCast::Set(30)).unwrap();
3105
3106 handle.try_cast(TaggedCast::Set(40)).unwrap();
3108
3109 crate::test_complete!("gen_server_drop_oldest_evicts_when_full");
3110 }
3111
3112 #[test]
3113 fn gen_server_reject_policy_returns_full() {
3114 init_test("gen_server_reject_policy_returns_full");
3115
3116 let mut state = RuntimeState::new();
3117 let root = state.create_root_region(Budget::INFINITE);
3118 let cx = Cx::for_testing();
3119 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3120
3121 let (handle, stored) = scope
3123 .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 2)
3124 .unwrap();
3125 state.store_spawned_task(handle.task_id(), stored);
3126
3127 assert_eq!(handle.cast_overflow_policy(), CastOverflowPolicy::Reject);
3128
3129 handle.try_cast(CounterCast::Reset).unwrap();
3131 handle.try_cast(CounterCast::Reset).unwrap();
3132
3133 let err = handle.try_cast(CounterCast::Reset).unwrap_err();
3135 assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
3136
3137 crate::test_complete!("gen_server_reject_policy_returns_full");
3138 }
3139
3140 #[test]
3141 fn gen_server_drop_oldest_ref_also_evicts() {
3142 init_test("gen_server_drop_oldest_ref_also_evicts");
3143
3144 let mut state = RuntimeState::new();
3145 let root = state.create_root_region(Budget::INFINITE);
3146 let cx = Cx::for_testing();
3147 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3148
3149 let (handle, stored) = scope
3150 .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 2)
3151 .unwrap();
3152 state.store_spawned_task(handle.task_id(), stored);
3153
3154 let server_ref = handle.server_ref();
3155
3156 server_ref.try_cast(TaggedCast::Set(1)).unwrap();
3158 server_ref.try_cast(TaggedCast::Set(2)).unwrap();
3159
3160 server_ref.try_cast(TaggedCast::Set(3)).unwrap();
3162
3163 crate::test_complete!("gen_server_drop_oldest_ref_also_evicts");
3164 }
3165
    /// When DropOldest evicts a queued Call envelope, the reply obligation
    /// carried by its permit must be aborted cleanly (the casts here must
    /// succeed without panicking or leaking the obligation).
    #[test]
    fn gen_server_drop_oldest_evicting_call_aborts_obligation() {
        init_test("gen_server_drop_oldest_evicting_call_aborts_obligation");

        let mut state = RuntimeState::new();
        let root = state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

        // Capacity 1 so a single queued call occupies the whole queue.
        let (handle, stored) = scope
            .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
            .unwrap();
        state.store_spawned_task(handle.task_id(), stored);

        // Hand-build a Call envelope with a tracked reply permit and inject
        // it directly via the raw sender, bypassing the call() helper.
        let (reply_tx, _reply_rx) = session::tracked_oneshot::<u64>();
        let reply_permit = reply_tx.reserve(&cx);
        let call_envelope: Envelope<DropOldestCounter> = Envelope::Call {
            request: CounterCall::Get,
            reply_permit,
        };
        handle.sender.try_send(call_envelope).unwrap();

        // This cast overflows the size-1 queue, evicting the queued call;
        // the eviction is expected to abort the call's reply obligation.
        handle.try_cast(TaggedCast::Set(99)).unwrap();

        crate::test_complete!("gen_server_drop_oldest_evicting_call_aborts_obligation");
    }
3199
3200 #[test]
3201 fn gen_server_default_overflow_policy_is_reject() {
3202 init_test("gen_server_default_overflow_policy_is_reject");
3203
3204 assert_eq!(CastOverflowPolicy::default(), CastOverflowPolicy::Reject);
3205
3206 let counter = Counter { count: 0 };
3208 assert_eq!(counter.cast_overflow_policy(), CastOverflowPolicy::Reject);
3209
3210 crate::test_complete!("gen_server_default_overflow_policy_is_reject");
3211 }
3212
    /// A pending `Reply`'s Debug output names the type and its pending state.
    #[test]
    fn reply_debug_format() {
        init_test("reply_debug_format");

        let cx = Cx::for_testing();
        let (tx, _rx) = session::tracked_oneshot::<u64>();
        let permit = tx.reserve(&cx);
        let reply = Reply::new(&cx, permit);
        let debug_str = format!("{reply:?}");
        assert!(debug_str.contains("Reply"));
        assert!(debug_str.contains("pending"));

        // Discharge the reply obligation so the permit isn't dropped unsent.
        let _ = reply.send(42);

        crate::test_complete!("reply_debug_format");
    }
3230
    /// The on_start phase must run under the probe's start budget (priority
    /// 200 observed), and the main loop must run under the restored scope
    /// budget (priority 10 observed).
    #[test]
    fn gen_server_on_start_budget_priority_applied_and_restored() {
        init_test("gen_server_on_start_budget_priority_applied_and_restored");

        let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        // StartBudgetProbe (defined elsewhere in this file) presumably
        // records cx priority during on_start and handle_call — TODO confirm.
        let started_priority = Arc::new(AtomicU8::new(0));
        let loop_priority = Arc::new(AtomicU8::new(0));
        let server = StartBudgetProbe {
            started_priority: Arc::clone(&started_priority),
            loop_priority: Arc::clone(&loop_priority),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        // A client call forces the server's main loop to run.
        let server_ref = handle.server_ref();
        let (client, stored_client) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let p = server_ref.call(&cx, CounterCall::Get).await.unwrap();
                assert_eq!(p, 10);
            })
            .unwrap();
        let client_task_id = client.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, stored_client);

        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_quiescent();

        // on_start saw the elevated start priority; the loop saw the scope's.
        assert_eq!(started_priority.load(Ordering::SeqCst), 200);
        assert_eq!(loop_priority.load(Ordering::SeqCst), 10);

        // Drop the handle and drive the server once more so it shuts down.
        drop(handle);
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        crate::test_complete!("gen_server_on_start_budget_priority_applied_and_restored");
    }
3291
    /// After `stop()`, the on_stop hook must still run and pass its internal
    /// checkpoint (StopMaskProbe, defined elsewhere, presumably verifies it
    /// runs masked from cancellation — TODO confirm against its definition).
    #[test]
    fn gen_server_on_stop_runs_masked_under_stop() {
        init_test("gen_server_on_stop_runs_masked_under_stop");

        let budget = Budget::new().with_poll_quota(10_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let stop_checkpoint_ok = Arc::new(AtomicU8::new(0));
        let server = StopMaskProbe {
            stop_checkpoint_ok: Arc::clone(&stop_checkpoint_ok),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        // Request stop before the server has ever been scheduled.
        handle.stop();

        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        // 1 means the probe's stop-phase checkpoint succeeded.
        assert_eq!(stop_checkpoint_ok.load(Ordering::SeqCst), 1);

        crate::test_complete!("gen_server_on_stop_runs_masked_under_stop");
    }
3327
    /// Smoke test: with a size-1 queue, the second cast triggers an eviction
    /// path (which is expected to emit a trace) without erroring. No direct
    /// trace assertion here — the test only exercises the path.
    #[test]
    fn cast_drop_oldest_emits_trace_on_eviction() {
        init_test("cast_drop_oldest_emits_trace_on_eviction");

        let budget = Budget::new().with_poll_quota(10_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        // Let the server start up before casting.
        runtime.scheduler.lock().unwrap().schedule(task_id, 0);
        runtime.run_until_idle();

        // Queue holds one; the second cast evicts the first.
        handle.try_cast(TaggedCast::Set(1)).unwrap();
        handle.try_cast(TaggedCast::Set(2)).unwrap();

        crate::test_complete!("cast_drop_oldest_emits_trace_on_eviction");
    }
3361
3362 #[test]
3364 fn cast_to_stopped_server_returns_error() {
3365 init_test("cast_to_stopped_server_returns_error");
3366
3367 let mut state = RuntimeState::new();
3368 let root = state.create_root_region(Budget::INFINITE);
3369 let cx = Cx::for_testing();
3370 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3371
3372 let (handle, stored) = scope
3373 .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 4)
3374 .unwrap();
3375 state.store_spawned_task(handle.task_id(), stored);
3376
3377 handle.stop();
3379
3380 let err = handle.try_cast(CounterCast::Reset).unwrap_err();
3382 assert!(
3383 matches!(err, CastError::ServerStopped),
3384 "expected ServerStopped, got {err:?}"
3385 );
3386
3387 crate::test_complete!("cast_to_stopped_server_returns_error");
3388 }
3389
3390 #[test]
3392 fn cast_overflow_policy_display() {
3393 init_test("cast_overflow_policy_display");
3394
3395 assert_eq!(format!("{}", CastOverflowPolicy::Reject), "Reject");
3396 assert_eq!(format!("{}", CastOverflowPolicy::DropOldest), "DropOldest");
3397
3398 crate::test_complete!("cast_overflow_policy_display");
3399 }
3400
3401 #[test]
3403 fn cast_ref_reject_returns_full() {
3404 init_test("cast_ref_reject_returns_full");
3405
3406 let mut state = RuntimeState::new();
3407 let root = state.create_root_region(Budget::INFINITE);
3408 let cx = Cx::for_testing();
3409 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3410
3411 let (handle, stored) = scope
3412 .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 2)
3413 .unwrap();
3414 state.store_spawned_task(handle.task_id(), stored);
3415
3416 let server_ref = handle.server_ref();
3417
3418 server_ref.try_cast(CounterCast::Reset).unwrap();
3420 server_ref.try_cast(CounterCast::Reset).unwrap();
3421
3422 let err = server_ref.try_cast(CounterCast::Reset).unwrap_err();
3424 assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
3425
3426 crate::test_complete!("cast_ref_reject_returns_full");
3427 }
3428
3429 #[test]
3431 fn cast_drop_oldest_ref_capacity_one() {
3432 init_test("cast_drop_oldest_ref_capacity_one");
3433
3434 let mut state = RuntimeState::new();
3435 let root = state.create_root_region(Budget::INFINITE);
3436 let cx = Cx::for_testing();
3437 let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
3438
3439 let (handle, stored) = scope
3440 .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
3441 .unwrap();
3442 state.store_spawned_task(handle.task_id(), stored);
3443
3444 let server_ref = handle.server_ref();
3445
3446 server_ref.try_cast(TaggedCast::Set(100)).unwrap();
3448 server_ref.try_cast(TaggedCast::Set(200)).unwrap();
3450 server_ref.try_cast(TaggedCast::Set(300)).unwrap();
3451
3452 crate::test_complete!("cast_drop_oldest_ref_capacity_one");
3453 }
3454
3455 #[test]
3458 fn init_default_budget_is_infinite() {
3459 init_test("init_default_budget_is_infinite");
3460 let counter = Counter { count: 0 };
3461 assert_eq!(counter.on_start_budget(), Budget::INFINITE);
3462 crate::test_complete!("init_default_budget_is_infinite");
3463 }
3464
3465 #[test]
3466 fn terminate_default_budget_is_minimal() {
3467 init_test("terminate_default_budget_is_minimal");
3468 let counter = Counter { count: 0 };
3469 assert_eq!(counter.on_stop_budget(), Budget::MINIMAL);
3470 crate::test_complete!("terminate_default_budget_is_minimal");
3471 }
3472
    /// If the server is stopped before it ever runs, `on_start` must be
    /// skipped entirely while `on_stop` must still execute.
    #[test]
    fn init_skipped_when_pre_cancelled_but_stop_runs() {
        // Probe that flags which lifecycle hooks actually ran.
        #[allow(clippy::items_after_statements)]
        struct LifecycleProbe {
            init_ran: Arc<AtomicU8>,
            stop_ran: Arc<AtomicU8>,
        }

        #[allow(clippy::items_after_statements)]
        impl GenServer for LifecycleProbe {
            type Call = CounterCall;
            type Reply = u64;
            type Cast = CounterCast;
            type Info = SystemMsg;

            fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.init_ran.store(1, Ordering::SeqCst);
                Box::pin(async {})
            }

            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.stop_ran.store(1, Ordering::SeqCst);
                Box::pin(async {})
            }

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: CounterCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(0);
                Box::pin(async {})
            }
        }

        init_test("init_skipped_when_pre_cancelled_but_stop_runs");

        let budget = Budget::new().with_poll_quota(10_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let init_ran = Arc::new(AtomicU8::new(0));
        let stop_ran = Arc::new(AtomicU8::new(0));

        let server = LifecycleProbe {
            init_ran: Arc::clone(&init_ran),
            stop_ran: Arc::clone(&stop_ran),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        // Stop BEFORE the first schedule: the server starts pre-cancelled.
        handle.stop();

        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        assert_eq!(
            init_ran.load(Ordering::SeqCst),
            0,
            "init should be skipped when pre-cancelled"
        );
        assert_eq!(
            stop_ran.load(Ordering::SeqCst),
            1,
            "stop must run even when pre-cancelled"
        );

        crate::test_complete!("init_skipped_when_pre_cancelled_but_stop_runs");
    }
3558
    /// Polls consumed during the on_start phase (run under a tight 50-poll
    /// start budget) must be charged against the server's main budget: the
    /// quota observed later in handle_call stays within the original 10_000.
    #[test]
    fn init_budget_consumption_propagates_to_main_budget() {
        // Probe that records the remaining poll quota seen in handle_call.
        #[allow(clippy::items_after_statements)]
        struct BudgetCheckProbe {
            loop_quota: Arc<AtomicU64>,
        }

        #[allow(clippy::items_after_statements)]
        impl GenServer for BudgetCheckProbe {
            type Call = CounterCall;
            type Reply = u64;
            type Cast = CounterCast;
            type Info = SystemMsg;

            fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                // Touch the budget during init so the phase does some work.
                let _ = cx.budget();
                Box::pin(async {})
            }

            fn on_start_budget(&self) -> Budget {
                Budget::new().with_poll_quota(50).with_priority(200)
            }

            fn handle_call(
                &mut self,
                cx: &Cx,
                _request: CounterCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                // Record the main-loop quota for the assertions below.
                self.loop_quota
                    .store(u64::from(cx.budget().poll_quota), Ordering::SeqCst);
                let _ = reply.send(0);
                Box::pin(async {})
            }
        }

        init_test("init_budget_consumption_propagates_to_main_budget");

        let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let loop_quota = Arc::new(AtomicU64::new(0));

        let server = BudgetCheckProbe {
            loop_quota: Arc::clone(&loop_quota),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        // A single client call drives the server into handle_call.
        let server_ref = handle.server_ref();
        let (client, stored_client) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let _ = server_ref.call(&cx, CounterCall::Get).await;
            })
            .unwrap();
        let client_task_id = client.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, stored_client);

        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(client_task_id, 0);
        runtime.run_until_quiescent();

        // Bounds only: the exact remainder depends on poll accounting.
        let remaining = loop_quota.load(Ordering::SeqCst);
        assert!(
            remaining <= 10_000,
            "main budget after init must be <= original ({remaining} <= 10000)"
        );
        assert!(
            remaining > 0,
            "main budget should still have polls remaining"
        );

        // Drop the handle and drive the server once more so it shuts down.
        drop(handle);
        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        crate::test_complete!("init_budget_consumption_propagates_to_main_budget");
    }
3667
    /// The on_stop phase must run under the server's declared stop budget
    /// (42 polls here) rather than the scope's 10_000-poll budget.
    #[test]
    fn stop_budget_constrains_stop_phase() {
        // Probe that records the poll quota visible inside on_stop.
        struct StopBudgetProbe {
            stop_poll_quota: Arc<AtomicU64>,
        }

        impl GenServer for StopBudgetProbe {
            type Call = CounterCall;
            type Reply = u64;
            type Cast = CounterCast;
            type Info = SystemMsg;

            fn on_stop_budget(&self) -> Budget {
                Budget::new().with_poll_quota(42).with_priority(250)
            }

            fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.stop_poll_quota
                    .store(u64::from(cx.budget().poll_quota), Ordering::SeqCst);
                Box::pin(async {})
            }

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: CounterCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(0);
                Box::pin(async {})
            }
        }

        init_test("stop_budget_constrains_stop_phase");

        let budget = Budget::new().with_poll_quota(10_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let stop_poll_quota = Arc::new(AtomicU64::new(0));

        let server = StopBudgetProbe {
            stop_poll_quota: Arc::clone(&stop_poll_quota),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        // Stop immediately so the server goes straight to its stop phase.
        handle.stop();

        runtime
            .scheduler
            .lock()
            .unwrap()
            .schedule(server_task_id, 0);
        runtime.run_until_quiescent();

        // on_stop observed exactly the declared stop-phase quota.
        let stop_quota = stop_poll_quota.load(Ordering::SeqCst);
        assert_eq!(stop_quota, 42, "stop phase should use the tighter budget");

        crate::test_complete!("stop_budget_constrains_stop_phase");
    }
3739
3740 #[test]
3743 fn lifecycle_init_before_stop() {
3744 #[allow(clippy::items_after_statements)]
3745 struct PhaseTracker {
3746 phases: Arc<Mutex<Vec<&'static str>>>,
3747 }
3748
3749 #[allow(clippy::items_after_statements)]
3750 impl GenServer for PhaseTracker {
3751 type Call = CounterCall;
3752 type Reply = u64;
3753 type Cast = CounterCast;
3754 type Info = SystemMsg;
3755
3756 fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3757 self.phases.lock().unwrap().push("init");
3758 Box::pin(async {})
3759 }
3760
3761 fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3762 self.phases.lock().unwrap().push("stop");
3763 Box::pin(async {})
3764 }
3765
3766 fn handle_call(
3767 &mut self,
3768 _cx: &Cx,
3769 _request: CounterCall,
3770 reply: Reply<u64>,
3771 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3772 let _ = reply.send(0);
3773 Box::pin(async {})
3774 }
3775
3776 fn handle_cast(
3777 &mut self,
3778 _cx: &Cx,
3779 _msg: CounterCast,
3780 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
3781 Box::pin(async {})
3782 }
3783 }
3784
3785 init_test("lifecycle_init_before_stop");
3786
3787 let budget = Budget::new().with_poll_quota(10_000);
3788 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
3789 let region = runtime.state.create_root_region(budget);
3790 let cx = Cx::for_testing();
3791 let scope = crate::cx::Scope::<FailFast>::new(region, budget);
3792
3793 let phases = Arc::new(Mutex::new(Vec::<&'static str>::new()));
3794
3795 let server = PhaseTracker {
3796 phases: Arc::clone(&phases),
3797 };
3798
3799 let (handle, stored) = scope
3800 .spawn_gen_server(&mut runtime.state, &cx, server, 32)
3801 .unwrap();
3802 let server_task_id = handle.task_id();
3803 runtime.state.store_spawned_task(server_task_id, stored);
3804
3805 runtime
3807 .scheduler
3808 .lock()
3809 .unwrap()
3810 .schedule(server_task_id, 0);
3811 runtime.run_until_idle();
3812
3813 let phases_clone = Arc::clone(&phases);
3815 handle.stop();
3816 runtime
3817 .scheduler
3818 .lock()
3819 .unwrap()
3820 .schedule(server_task_id, 0);
3821 runtime.run_until_idle();
3822
3823 {
3824 let recorded = phases_clone.lock().unwrap();
3825
3826 assert!(
3828 recorded.contains(&"stop"),
3829 "stop phase must run, got {:?}",
3830 *recorded
3831 );
3832
3833 if let Some(init_pos) = recorded.iter().position(|p| *p == "init") {
3835 let stop_pos = recorded.iter().position(|p| *p == "stop").unwrap();
3836 assert!(
3837 init_pos < stop_pos,
3838 "init must precede stop, got {:?}",
3839 *recorded
3840 );
3841 }
3842
3843 drop(recorded);
3844 }
3845
3846 crate::test_complete!("lifecycle_init_before_stop");
3847 }
3848
#[test]
fn stop_budget_priority_applied() {
    /// Probe server that records the budget priority visible inside `on_stop`.
    #[allow(clippy::items_after_statements)]
    struct StopPriorityProbe {
        stop_priority: Arc<AtomicU8>,
    }

    #[allow(clippy::items_after_statements)]
    impl GenServer for StopPriorityProbe {
        type Call = CounterCall;
        type Reply = u64;
        type Cast = CounterCast;
        type Info = SystemMsg;

        fn on_stop_budget(&self) -> Budget {
            // Ask for an elevated stop-phase budget: poll quota 200, priority 240.
            Budget::new().with_poll_quota(200).with_priority(240)
        }

        fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            // Capture whatever priority the runtime actually applied for the stop phase.
            self.stop_priority
                .store(cx.budget().priority, Ordering::SeqCst);
            Box::pin(async {})
        }

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _request: CounterCall,
            reply: Reply<u64>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _ = reply.send(0);
            Box::pin(async {})
        }
    }

    init_test("stop_budget_priority_applied");

    // Spawn with a deliberately low base priority (10) so the stop-phase value
    // is distinguishable from it.
    let base_budget = Budget::new().with_poll_quota(10_000).with_priority(10);
    let mut lab = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let root = lab.state.create_root_region(base_budget);
    let ctx = Cx::for_testing();
    let test_scope = crate::cx::Scope::<FailFast>::new(root, base_budget);

    let observed = Arc::new(AtomicU8::new(0));
    let probe = StopPriorityProbe {
        stop_priority: Arc::clone(&observed),
    };

    let (probe_handle, spawned) = test_scope
        .spawn_gen_server(&mut lab.state, &ctx, probe, 32)
        .unwrap();
    let probe_task = probe_handle.task_id();
    lab.state.store_spawned_task(probe_task, spawned);

    // Request shutdown before the server ever runs, then drive to quiescence so
    // the stop phase executes and the probe records its priority.
    probe_handle.stop();
    lab.scheduler.lock().unwrap().schedule(probe_task, 0);
    lab.run_until_quiescent();

    let actual_priority = observed.load(Ordering::SeqCst);
    assert!(
        actual_priority >= 10,
        "stop priority should be at least original ({actual_priority} >= 10)"
    );

    crate::test_complete!("stop_budget_priority_applied");
}
3925
#[test]
fn conformance_cancel_propagation_to_queued_calls() {
    // Conformance: stopping a server while calls are still queued must not
    // wedge the callers; the runtime is driven to full quiescence below.
    init_test("conformance_cancel_propagation_to_queued_calls");

    let budget = Budget::new().with_poll_quota(50_000);
    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(budget);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, budget);

    // Mailbox capacity 1 so the second call is forced to queue behind the first.
    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 1)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    let server_ref_1 = handle.server_ref();
    let server_ref_2 = handle.server_ref();

    // Client 1: issues Add(10) and records the outcome.
    let result_1: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
    let result_1_clone = Arc::clone(&result_1);
    let (c1_handle, c1_stored) = scope
        .spawn(&mut runtime.state, &cx, move |cx| async move {
            let r = server_ref_1.call(&cx, CounterCall::Add(10)).await;
            *result_1_clone.lock().unwrap() = Some(r);
        })
        .unwrap();
    let c1_id = c1_handle.task_id();
    runtime.state.store_spawned_task(c1_id, c1_stored);

    // Client 2: issues Add(20); with capacity 1 this call queues behind client 1.
    let result_2: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
    let result_2_clone = Arc::clone(&result_2);
    let (c2_handle, c2_stored) = scope
        .spawn(&mut runtime.state, &cx, move |cx| async move {
            let r = server_ref_2.call(&cx, CounterCall::Add(20)).await;
            *result_2_clone.lock().unwrap() = Some(r);
        })
        .unwrap();
    let c2_id = c2_handle.task_id();
    runtime.state.store_spawned_task(c2_id, c2_stored);

    // Round 1: let the server and both clients make progress.
    {
        let mut sched = runtime.scheduler.lock().unwrap();
        sched.schedule(server_task_id, 0);
        sched.schedule(c1_id, 0);
        sched.schedule(c2_id, 0);
    }
    runtime.run_until_idle();

    // Stop the server while client work may still be queued, then drive the
    // whole system to quiescence.
    handle.stop();
    {
        let mut sched = runtime.scheduler.lock().unwrap();
        sched.schedule(server_task_id, 0);
        sched.schedule(c1_id, 0);
        sched.schedule(c2_id, 0);
    }
    runtime.run_until_quiescent();

    // NOTE(review): this only acquires and releases the result_2 lock without
    // asserting its contents, and result_1 is never inspected at all -- as
    // written the test verifies only that everything reaches quiescence
    // without deadlocking. Consider asserting the recorded call outcomes;
    // confirm the intended contract for queued calls on stop first.
    drop(result_2.lock().unwrap());

    crate::test_complete!("conformance_cancel_propagation_to_queued_calls");
}
4010
#[test]
fn conformance_stopped_server_rejects_new_messages() {
    // Conformance: once the server has been stopped and the runtime is
    // quiescent, casting through a retained reference must be rejected.
    init_test("conformance_stopped_server_rejects_new_messages");

    let mut lab = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let root = lab.state.create_root_region(Budget::INFINITE);
    let ctx = Cx::for_testing();
    let test_scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (server_handle, spawned) = test_scope
        .spawn_gen_server(&mut lab.state, &ctx, Counter { count: 0 }, 32)
        .unwrap();
    let server_id = server_handle.task_id();
    lab.state.store_spawned_task(server_id, spawned);

    // Keep a reference around so we can attempt a cast after shutdown.
    let retained_ref = server_handle.server_ref();

    // Let the server start up.
    lab.scheduler.lock().unwrap().schedule(server_id, 0);
    lab.run_until_idle();

    // Shut it down and drain everything.
    server_handle.stop();
    lab.scheduler.lock().unwrap().schedule(server_id, 0);
    lab.run_until_quiescent();

    assert!(
        retained_ref.try_cast(CounterCast::Reset).is_err(),
        "cast to stopped server must fail"
    );

    crate::test_complete!("conformance_stopped_server_rejects_new_messages");
}
4052
#[test]
fn conformance_full_lifecycle_no_obligation_leaks() {
    // Conformance: cast -> call -> cast -> stop across a full server lifetime,
    // driven to quiescence, must complete cleanly (no leaked obligations).
    init_test("conformance_full_lifecycle_no_obligation_leaks");

    let budget = Budget::new().with_poll_quota(100_000);
    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(budget);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, budget);

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    let server_ref = handle.server_ref();

    server_ref.try_cast(CounterCast::Reset).unwrap();

    // Client task performing a single call; the outcome is recorded for
    // inspection after the scheduling rounds.
    let call_result: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
    let call_result_clone = Arc::clone(&call_result);
    let server_ref_for_call = handle.server_ref();
    let (client, client_stored) = scope
        .spawn(&mut runtime.state, &cx, move |cx| async move {
            let r = server_ref_for_call.call(&cx, CounterCall::Add(42)).await;
            *call_result_clone.lock().unwrap() = Some(r);
        })
        .unwrap();
    let client_id = client.task_id();
    runtime.state.store_spawned_task(client_id, client_stored);

    // Two scheduling rounds: deliver the call, then observe the reply.
    {
        let mut sched = runtime.scheduler.lock().unwrap();
        sched.schedule(server_task_id, 0);
        sched.schedule(client_id, 0);
    }
    runtime.run_until_idle();

    {
        let mut sched = runtime.scheduler.lock().unwrap();
        sched.schedule(server_task_id, 0);
        sched.schedule(client_id, 0);
    }
    runtime.run_until_idle();

    // NOTE(review): the assertion below is conditional -- if the call never
    // completed the test passes vacuously; confirm whether completion should
    // be mandatory here.
    let call_r = call_result.lock().unwrap();
    if let Some(ref r) = *call_r {
        match r {
            Ok(value) => assert_eq!(*value, 42, "counter should be 42 after Add(42)"),
            // This branch is reachable whenever the call path regresses, so it
            // is reported with panic! rather than unreachable! (which is
            // reserved for logically impossible states).
            Err(e) => panic!("unexpected call error: {e:?}"),
        }
    }
    drop(call_r);

    server_ref.try_cast(CounterCast::Reset).unwrap();

    // Shut down and drain; quiescence implies no obligations are outstanding.
    handle.stop();
    runtime
        .scheduler
        .lock()
        .unwrap()
        .schedule(server_task_id, 0);
    runtime.run_until_quiescent();

    crate::test_complete!("conformance_full_lifecycle_no_obligation_leaks");
}
4131
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_deterministic_replay_with_seed() {
    // Conformance: two executions of the same multi-client scenario under the
    // same lab seed must observe identical results.
    init_test("conformance_deterministic_replay_with_seed");

    /// Runs one seeded scenario: three clients each Add(i * 10) against a
    /// shared Counter server; returns the reply values in observation order.
    fn run_scenario(seed: u64) -> Vec<u64> {
        let mut lab = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(seed));
        let budget = Budget::new().with_poll_quota(100_000);
        let root = lab.state.create_root_region(budget);
        let ctx = Cx::for_testing();
        let test_scope = crate::cx::Scope::<FailFast>::new(root, budget);

        let (server_handle, spawned) = test_scope
            .spawn_gen_server(&mut lab.state, &ctx, Counter { count: 0 }, 32)
            .unwrap();
        let server_id = server_handle.task_id();
        lab.state.store_spawned_task(server_id, spawned);

        let observed: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));

        // Spawn three clients, each adding a distinct amount.
        let mut clients = Vec::new();
        for i in 1..=3u64 {
            let server_ref = server_handle.server_ref();
            let sink = Arc::clone(&observed);
            let (client_handle, client_stored) = test_scope
                .spawn(&mut lab.state, &ctx, move |cx| async move {
                    if let Ok(val) = server_ref.call(&cx, CounterCall::Add(i * 10)).await {
                        sink.lock().unwrap().push(val);
                    }
                })
                .unwrap();
            let client_id = client_handle.task_id();
            lab.state.store_spawned_task(client_id, client_stored);
            clients.push(client_id);
        }

        // Two identical scheduling rounds: deliver calls, then observe replies.
        for _round in 0..2 {
            {
                let mut sched = lab.scheduler.lock().unwrap();
                sched.schedule(server_id, 0);
                for &client_id in &clients {
                    sched.schedule(client_id, 0);
                }
            }
            lab.run_until_idle();
        }

        server_handle.stop();
        lab.scheduler.lock().unwrap().schedule(server_id, 0);
        lab.run_until_quiescent();

        let snapshot = observed.lock().unwrap().clone();
        snapshot
    }

    let run_a = run_scenario(42);
    let run_b = run_scenario(42);
    assert_eq!(
        run_a, run_b,
        "same seed must produce identical results: {run_a:?} vs {run_b:?}"
    );

    crate::test_complete!("conformance_deterministic_replay_with_seed");
}
4215
#[test]
fn conformance_mailbox_overflow_reject_deterministic() {
    // Conformance: a Reject-policy mailbox of capacity 2 accepts exactly two
    // casts and deterministically rejects the third with CastError::Full.
    init_test("conformance_mailbox_overflow_reject_deterministic");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    // Capacity 2, so the third cast must overflow.
    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 2)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    let server_ref = handle.server_ref();

    // Fill the mailbox without the server ever running.
    server_ref.try_cast(CounterCast::Reset).unwrap();
    server_ref.try_cast(CounterCast::Reset).unwrap();

    let overflow = server_ref.try_cast(CounterCast::Reset);
    assert!(
        overflow.is_err(),
        "third cast to capacity-2 mailbox must fail with Reject policy"
    );
    match overflow.unwrap_err() {
        CastError::Full => { }
        // This branch is reachable whenever the mailbox policy misbehaves, so
        // report it with panic! rather than unreachable! (which is reserved
        // for logically impossible states).
        other => panic!("expected CastError::Full, got {other:?}"),
    }

    // Drop the handle (instead of calling stop()) and drive to quiescence.
    drop(handle);
    runtime
        .scheduler
        .lock()
        .unwrap()
        .schedule(server_task_id, 0);
    runtime.run_until_quiescent();

    crate::test_complete!("conformance_mailbox_overflow_reject_deterministic");
}
4261
#[test]
fn conformance_mailbox_drop_oldest_preserves_newest() {
    // Conformance: with a DropOldest mailbox of capacity 2, pushing a third
    // cast evicts the oldest entry; the newest value must survive.
    init_test("conformance_mailbox_drop_oldest_preserves_newest");

    let mut lab = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let root = lab.state.create_root_region(Budget::INFINITE);
    let ctx = Cx::for_testing();
    let test_scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (server_handle, spawned) = test_scope
        .spawn_gen_server(&mut lab.state, &ctx, DropOldestCounter { count: 0 }, 2)
        .unwrap();
    let server_id = server_handle.task_id();
    lab.state.store_spawned_task(server_id, spawned);

    let cast_ref = server_handle.server_ref();

    // Fill the capacity-2 mailbox, then push one more to force an eviction.
    cast_ref.try_cast(TaggedCast::Set(1)).unwrap();
    cast_ref.try_cast(TaggedCast::Set(2)).unwrap();
    cast_ref.try_cast(TaggedCast::Set(100)).unwrap();

    lab.scheduler.lock().unwrap().schedule(server_id, 0);
    lab.run_until_idle();

    // Read the counter back through a Get call issued from a client task.
    let call_ref = server_handle.server_ref();
    let observed: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
    let sink = Arc::clone(&observed);
    let (client_handle, client_stored) = test_scope
        .spawn(&mut lab.state, &ctx, move |cx| async move {
            if let Ok(val) = call_ref.call(&cx, CounterCall::Get).await {
                *sink.lock().unwrap() = Some(val);
            }
        })
        .unwrap();
    let client_id = client_handle.task_id();
    lab.state.store_spawned_task(client_id, client_stored);

    // Two scheduling rounds: deliver the call, then observe the reply.
    for _round in 0..2 {
        {
            let mut sched = lab.scheduler.lock().unwrap();
            sched.schedule(server_id, 0);
            sched.schedule(client_id, 0);
        }
        lab.run_until_idle();
    }

    assert_eq!(
        *observed.lock().unwrap(),
        Some(100),
        "DropOldest should evict oldest, keeping newest"
    );

    // Drop the handle (not stop()) and drain to quiescence.
    drop(server_handle);
    lab.scheduler.lock().unwrap().schedule(server_id, 0);
    lab.run_until_quiescent();

    crate::test_complete!("conformance_mailbox_drop_oldest_preserves_newest");
}
4341
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_budget_driven_call_timeout() {
    // Server that never replies: every incoming call is explicitly aborted,
    // so the caller must observe a call error rather than a value.
    struct SlowServer;
    impl GenServer for SlowServer {
        type Call = ();
        type Reply = ();
        type Cast = ();
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _request: (),
            reply: Reply<()>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            // Abort instead of sending; the returned proof discharges the
            // reply obligation.
            let _proof = reply.abort();
            Box::pin(async {})
        }
    }

    init_test("conformance_budget_driven_call_timeout");

    let budget = Budget::new().with_poll_quota(100_000);
    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(budget);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, budget);

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, SlowServer, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    let server_ref = handle.server_ref();

    // Client task records the call outcome for inspection below.
    let call_result: Arc<Mutex<Option<Result<(), CallError>>>> = Arc::new(Mutex::new(None));
    let call_result_clone = Arc::clone(&call_result);
    let (ch, cs) = scope
        .spawn(&mut runtime.state, &cx, move |cx| async move {
            let r = server_ref.call(&cx, ()).await;
            *call_result_clone.lock().unwrap() = Some(r);
        })
        .unwrap();
    let client_id = ch.task_id();
    runtime.state.store_spawned_task(client_id, cs);

    // Two scheduling rounds: deliver the call, then let the client observe
    // the abort.
    {
        let mut sched = runtime.scheduler.lock().unwrap();
        sched.schedule(server_task_id, 0);
        sched.schedule(client_id, 0);
    }
    runtime.run_until_idle();
    {
        let mut sched = runtime.scheduler.lock().unwrap();
        sched.schedule(server_task_id, 0);
        sched.schedule(client_id, 0);
    }
    runtime.run_until_idle();

    // NOTE(review): the assertion is conditional -- if the call never
    // completed, the test passes vacuously; confirm whether completion should
    // be required here.
    if let Some(ref result) = *call_result.lock().unwrap() {
        assert!(result.is_err(), "aborted reply should result in call error");
    }

    handle.stop();
    runtime
        .scheduler
        .lock()
        .unwrap()
        .schedule(server_task_id, 0);
    runtime.run_until_quiescent();

    crate::test_complete!("conformance_budget_driven_call_timeout");
}
4427
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_reply_linearity_send_commits() {
    // Server that doubles the request; records 1 when the reply committed and
    // 2 when the caller had already gone away.
    struct ReplyTracker {
        committed: Arc<AtomicU8>,
    }

    impl GenServer for ReplyTracker {
        type Call = u64;
        type Reply = u64;
        type Cast = ();
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            request: u64,
            reply: Reply<u64>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            match reply.send(request * 2) {
                ReplyOutcome::Committed(_proof) => {
                    self.committed.store(1, Ordering::SeqCst);
                }
                ReplyOutcome::CallerGone => {
                    self.committed.store(2, Ordering::SeqCst);
                }
            }
            Box::pin(async {})
        }
    }

    init_test("conformance_reply_linearity_send_commits");

    let budget = Budget::new().with_poll_quota(100_000);
    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(budget);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, budget);

    let committed = Arc::new(AtomicU8::new(0));
    let server = ReplyTracker {
        committed: Arc::clone(&committed),
    };

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, server, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    let server_ref = handle.server_ref();
    let call_result: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
    let call_result_clone = Arc::clone(&call_result);

    // Client calls with 21 and stays alive to receive the reply.
    let (ch, cs) = scope
        .spawn(&mut runtime.state, &cx, move |cx| async move {
            let r = server_ref.call(&cx, 21).await;
            *call_result_clone.lock().unwrap() = Some(r);
        })
        .unwrap();
    let client_id = ch.task_id();
    runtime.state.store_spawned_task(client_id, cs);

    // Two scheduling rounds: deliver the call, then observe the reply.
    {
        let mut sched = runtime.scheduler.lock().unwrap();
        sched.schedule(server_task_id, 0);
        sched.schedule(client_id, 0);
    }
    runtime.run_until_idle();
    {
        let mut sched = runtime.scheduler.lock().unwrap();
        sched.schedule(server_task_id, 0);
        sched.schedule(client_id, 0);
    }
    runtime.run_until_idle();

    assert_eq!(
        committed.load(Ordering::SeqCst),
        1,
        "reply must be committed when caller is waiting"
    );

    {
        let r = call_result.lock().unwrap();
        match r.as_ref() {
            Some(Ok(value)) => assert_eq!(*value, 42, "21 * 2 = 42"),
            // Reachable when the reply path regresses, so report it with
            // panic! rather than unreachable! (which asserts impossibility).
            other => panic!("expected Ok(42), got {other:?}"),
        }
    }

    handle.stop();
    runtime
        .scheduler
        .lock()
        .unwrap()
        .schedule(server_task_id, 0);
    runtime.run_until_quiescent();

    crate::test_complete!("conformance_reply_linearity_send_commits");
}
4533
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_reply_linearity_abort_is_clean() {
    // Server that aborts every reply and flags that it did so; the caller must
    // observe a call error, and the abort proof discharges the obligation.
    struct AbortServer {
        aborted: Arc<AtomicU8>,
    }

    impl GenServer for AbortServer {
        type Call = ();
        type Reply = ();
        type Cast = ();
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _request: (),
            reply: Reply<()>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _proof = reply.abort();
            self.aborted.store(1, Ordering::SeqCst);
            Box::pin(async {})
        }
    }

    init_test("conformance_reply_linearity_abort_is_clean");

    let budget = Budget::new().with_poll_quota(100_000);
    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(budget);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, budget);

    let aborted = Arc::new(AtomicU8::new(0));
    let server = AbortServer {
        aborted: Arc::clone(&aborted),
    };

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, server, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    let server_ref = handle.server_ref();
    let call_err: Arc<Mutex<Option<Result<(), CallError>>>> = Arc::new(Mutex::new(None));
    let call_err_clone = Arc::clone(&call_err);

    // Client records the call outcome for the assertions below.
    let (ch, cs) = scope
        .spawn(&mut runtime.state, &cx, move |cx| async move {
            let r = server_ref.call(&cx, ()).await;
            *call_err_clone.lock().unwrap() = Some(r);
        })
        .unwrap();
    let client_id = ch.task_id();
    runtime.state.store_spawned_task(client_id, cs);

    // Two scheduling rounds: deliver the call, then observe the abort.
    {
        let mut sched = runtime.scheduler.lock().unwrap();
        sched.schedule(server_task_id, 0);
        sched.schedule(client_id, 0);
    }
    runtime.run_until_idle();
    {
        let mut sched = runtime.scheduler.lock().unwrap();
        sched.schedule(server_task_id, 0);
        sched.schedule(client_id, 0);
    }
    runtime.run_until_idle();

    assert_eq!(
        aborted.load(Ordering::SeqCst),
        1,
        "server must have called abort()"
    );

    {
        let r = call_err.lock().unwrap();
        match r.as_ref() {
            Some(Err(_)) => { }
            // Reachable when the abort path regresses, so report it with
            // panic! rather than unreachable! (which asserts impossibility).
            other => panic!("expected call error after abort, got {other:?}"),
        }
    }

    handle.stop();
    runtime
        .scheduler
        .lock()
        .unwrap()
        .schedule(server_task_id, 0);
    runtime.run_until_quiescent();

    crate::test_complete!("conformance_reply_linearity_abort_is_clean");
}
4632
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_drain_processes_queued_casts_on_stop() {
    /// Sums cast payloads; publishes the final sum from `on_stop`.
    struct AccumulatorServer {
        sum: u64,
        final_sum: Arc<AtomicU64>,
    }

    enum AccumCall {
        GetSum,
    }
    enum AccumCast {
        Add(u64),
    }

    impl GenServer for AccumulatorServer {
        type Call = AccumCall;
        type Reply = u64;
        type Cast = AccumCast;
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _request: AccumCall,
            reply: Reply<u64>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _ = reply.send(self.sum);
            Box::pin(async {})
        }

        fn handle_cast(
            &mut self,
            _cx: &Cx,
            msg: AccumCast,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            // Single-variant enum, so an irrefutable let destructures it.
            let AccumCast::Add(n) = msg;
            self.sum += n;
            Box::pin(async {})
        }

        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            // Publish the sum as observed at shutdown time.
            self.final_sum.store(self.sum, Ordering::SeqCst);
            Box::pin(async {})
        }
    }

    init_test("conformance_drain_processes_queued_casts_on_stop");

    let budget = Budget::new().with_poll_quota(100_000);
    let mut lab = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let root = lab.state.create_root_region(budget);
    let ctx = Cx::for_testing();
    let test_scope = crate::cx::Scope::<FailFast>::new(root, budget);

    let final_sum = Arc::new(AtomicU64::new(0));
    let accumulator = AccumulatorServer {
        sum: 0,
        final_sum: Arc::clone(&final_sum),
    };

    let (server_handle, spawned) = test_scope
        .spawn_gen_server(&mut lab.state, &ctx, accumulator, 32)
        .unwrap();
    let server_id = server_handle.task_id();
    lab.state.store_spawned_task(server_id, spawned);

    let cast_ref = server_handle.server_ref();

    // Queue three casts before the server ever runs.
    cast_ref.try_cast(AccumCast::Add(10)).unwrap();
    cast_ref.try_cast(AccumCast::Add(20)).unwrap();
    cast_ref.try_cast(AccumCast::Add(30)).unwrap();

    lab.scheduler.lock().unwrap().schedule(server_id, 0);
    lab.run_until_idle();

    // Request shutdown and drain to quiescence; on_stop records the sum.
    server_handle.stop();
    lab.scheduler.lock().unwrap().schedule(server_id, 0);
    lab.run_until_quiescent();

    let sum = final_sum.load(Ordering::SeqCst);
    assert_eq!(
        sum, 60,
        "server must drain queued casts before stopping: 10+20+30=60, got {sum}"
    );

    crate::test_complete!("conformance_drain_processes_queued_casts_on_stop");
}
4736
#[test]
fn named_server_register_and_whereis() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("named_server_register_and_whereis");

    let budget = Budget::new().with_poll_quota(100_000);
    let mut lab = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
    let root = lab.state.create_root_region(budget);
    let ctx = Cx::for_testing();
    let test_scope = crate::cx::Scope::<FailFast>::new(root, budget);
    let mut names = crate::cx::NameRegistry::new();

    /// Minimal counting server used only to occupy the registry slot.
    #[allow(clippy::items_after_statements)]
    #[derive(Debug)]
    struct Counter(u64);

    #[allow(clippy::items_after_statements)]
    impl GenServer for Counter {
        type Call = u64;
        type Reply = u64;
        type Cast = ();
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            request: u64,
            reply: Reply<u64>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.0 += request;
            let _ = reply.send(self.0);
            Box::pin(async {})
        }

        fn handle_cast(
            &mut self,
            _cx: &Cx,
            _msg: (),
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            Box::pin(async {})
        }
    }

    let now = crate::types::Time::ZERO;
    let (mut named_handle, spawned) = test_scope
        .spawn_named_gen_server(
            &mut lab.state,
            &ctx,
            &mut names,
            "my_counter",
            Counter(0),
            32,
            now,
        )
        .unwrap();

    let counter_task = named_handle.task_id();
    lab.state.store_spawned_task(counter_task, spawned);

    // Registration must be visible through both the registry and the handle.
    assert_eq!(names.whereis("my_counter"), Some(counter_task));
    assert_eq!(named_handle.name(), "my_counter");

    named_handle.stop_and_release().unwrap();

    crate::test_complete!("named_server_register_and_whereis");
}
4810
#[test]
fn named_server_duplicate_name_rejected() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("named_server_duplicate_name_rejected");

    let budget = Budget::new().with_poll_quota(100_000);
    let mut lab = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
    let root = lab.state.create_root_region(budget);
    let ctx = Cx::for_testing();
    let test_scope = crate::cx::Scope::<FailFast>::new(root, budget);
    let mut names = crate::cx::NameRegistry::new();

    /// Trivial server used only to exercise name registration.
    #[allow(clippy::items_after_statements)]
    #[derive(Debug)]
    struct Dummy;

    #[allow(clippy::items_after_statements)]
    impl GenServer for Dummy {
        type Call = ();
        type Reply = ();
        type Cast = ();
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _request: (),
            reply: Reply<()>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _ = reply.send(());
            Box::pin(async {})
        }

        fn handle_cast(
            &mut self,
            _cx: &Cx,
            _msg: (),
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            Box::pin(async {})
        }
    }

    let now = crate::types::Time::ZERO;

    // First registration of "singleton" must succeed.
    let (mut first, first_stored) = test_scope
        .spawn_named_gen_server(
            &mut lab.state,
            &ctx,
            &mut names,
            "singleton",
            Dummy,
            8,
            now,
        )
        .unwrap();
    lab.state.store_spawned_task(first.task_id(), first_stored);

    // Second registration under the same name must be rejected.
    let second = test_scope.spawn_named_gen_server(
        &mut lab.state,
        &ctx,
        &mut names,
        "singleton",
        Dummy,
        8,
        now,
    );
    assert!(
        matches!(second, Err(NamedSpawnError::NameTaken(_))),
        "duplicate name should be rejected"
    );

    // The registry still points at the original task...
    assert_eq!(names.whereis("singleton"), Some(first.task_id()));

    // ...and the rejected spawn left no orphan behind in the region.
    let region_tasks = lab.state.region(root).unwrap().task_ids();
    assert_eq!(
        region_tasks,
        vec![first.task_id()],
        "region should only have the first task; orphaned task must be removed"
    );

    first.stop_and_release().unwrap();

    crate::test_complete!("named_server_duplicate_name_rejected");
}
4902
#[test]
fn named_server_abort_lease_removes_name() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("named_server_abort_lease_removes_name");

    let budget = Budget::new().with_poll_quota(100_000);
    let mut lab = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
    let root = lab.state.create_root_region(budget);
    let ctx = Cx::for_testing();
    let test_scope = crate::cx::Scope::<FailFast>::new(root, budget);
    let mut names = crate::cx::NameRegistry::new();

    /// Trivial server: replies `()` to every call, ignores casts.
    #[allow(clippy::items_after_statements)]
    #[derive(Debug)]
    struct Noop;

    #[allow(clippy::items_after_statements)]
    impl GenServer for Noop {
        type Call = ();
        type Reply = ();
        type Cast = ();
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _req: (),
            reply: Reply<()>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _ = reply.send(());
            Box::pin(async {})
        }

        fn handle_cast(
            &mut self,
            _cx: &Cx,
            _msg: (),
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            Box::pin(async {})
        }
    }

    let now = crate::types::Time::ZERO;
    let (mut named_handle, spawned) = test_scope
        .spawn_named_gen_server(
            &mut lab.state,
            &ctx,
            &mut names,
            "temp_name",
            Noop,
            8,
            now,
        )
        .unwrap();
    lab.state.store_spawned_task(named_handle.task_id(), spawned);

    // The name is registered immediately after the spawn.
    assert!(names.whereis("temp_name").is_some());

    // Aborting the lease releases the registration.
    named_handle.abort_lease().unwrap();

    crate::test_complete!("named_server_abort_lease_removes_name");
}
4973
#[test]
fn named_server_take_lease_manual_management() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("named_server_take_lease_manual_management");

    let budget = Budget::new().with_poll_quota(100_000);
    let mut lab = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
    let root = lab.state.create_root_region(budget);
    let ctx = Cx::for_testing();
    let test_scope = crate::cx::Scope::<FailFast>::new(root, budget);
    let mut names = crate::cx::NameRegistry::new();

    /// Trivial server used only so there is a lease to take over.
    #[allow(clippy::items_after_statements)]
    #[derive(Debug)]
    struct Noop2;

    #[allow(clippy::items_after_statements)]
    impl GenServer for Noop2 {
        type Call = ();
        type Reply = ();
        type Cast = ();
        type Info = SystemMsg;

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _req: (),
            reply: Reply<()>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _ = reply.send(());
            Box::pin(async {})
        }

        fn handle_cast(
            &mut self,
            _cx: &Cx,
            _msg: (),
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            Box::pin(async {})
        }
    }

    let now = crate::types::Time::ZERO;
    let (mut named_handle, spawned) = test_scope
        .spawn_named_gen_server(
            &mut lab.state,
            &ctx,
            &mut names,
            "manual_name",
            Noop2,
            8,
            now,
        )
        .unwrap();
    lab.state.store_spawned_task(named_handle.task_id(), spawned);

    // Taking the lease transfers ownership to the caller exactly once.
    let mut lease = named_handle.take_lease().unwrap();
    assert!(named_handle.take_lease().is_none(), "second take returns None");

    // After release the handle no longer reports the registered name.
    assert_eq!(named_handle.name(), "(released)");

    // Manually discharge the lease we now own.
    let _ = lease.abort();

    crate::test_complete!("named_server_take_lease_manual_management");
}
5043
5044 #[test]
5045 #[allow(clippy::items_after_statements)]
5046 fn named_start_helper_supervisor_stop_cleans_registry() {
5047 crate::test_utils::init_test_logging();
5048 crate::test_phase!("named_start_helper_supervisor_stop_cleans_registry");
5049
5050 #[derive(Debug)]
5051 struct Noop;
5052
5053 impl GenServer for Noop {
5054 type Call = ();
5055 type Reply = ();
5056 type Cast = ();
5057 type Info = SystemMsg;
5058
5059 fn handle_call(
5060 &mut self,
5061 _cx: &Cx,
5062 _request: (),
5063 reply: Reply<()>,
5064 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5065 let _ = reply.send(());
5066 Box::pin(async {})
5067 }
5068
5069 fn handle_cast(
5070 &mut self,
5071 _cx: &Cx,
5072 _msg: (),
5073 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5074 Box::pin(async {})
5075 }
5076 }
5077
5078 let registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>> =
5079 Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
5080
5081 let budget = Budget::new().with_poll_quota(100_000);
5082 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
5083 let root = runtime.state.create_root_region(budget);
5084 let cx = Cx::for_testing();
5085
5086 let child = crate::supervision::ChildSpec::new(
5087 "svc_child",
5088 named_gen_server_start(Arc::clone(®istry), "svc", 16, || Noop),
5089 );
5090
5091 let compiled = crate::supervision::SupervisorBuilder::new("svc_supervisor")
5092 .child(child)
5093 .compile()
5094 .expect("compile supervisor");
5095
5096 let supervisor = compiled
5097 .spawn(&mut runtime.state, &cx, root, budget)
5098 .expect("spawn supervisor");
5099
5100 assert_eq!(supervisor.started.len(), 1, "exactly one started child");
5101 let child_task = supervisor.started[0].task_id;
5102 assert_eq!(registry.lock().whereis("svc"), Some(child_task));
5103
5104 let tasks_to_schedule = runtime.state.cancel_request(
5106 supervisor.region,
5107 &crate::types::CancelReason::user("stop"),
5108 None,
5109 );
5110 for (task_id, priority) in tasks_to_schedule {
5111 runtime
5112 .scheduler
5113 .lock()
5114 .unwrap()
5115 .schedule(task_id, priority);
5116 }
5117 runtime.run_until_quiescent();
5118
5119 assert!(
5120 registry.lock().whereis("svc").is_none(),
5121 "name must be removed after supervised stop",
5122 );
5123
5124 crate::test_complete!("named_start_helper_supervisor_stop_cleans_registry");
5125 }
5126
5127 #[test]
5128 #[allow(clippy::items_after_statements)]
5129 fn named_start_helper_crash_then_stop_cleans_registry() {
5130 crate::test_utils::init_test_logging();
5131 crate::test_phase!("named_start_helper_crash_then_stop_cleans_registry");
5132
5133 #[derive(Debug)]
5134 struct PanicOnStart;
5135
5136 impl GenServer for PanicOnStart {
5137 type Call = ();
5138 type Reply = ();
5139 type Cast = ();
5140 type Info = SystemMsg;
5141
5142 fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5143 Box::pin(async move {
5144 std::panic::panic_any("intentional start crash for registry cleanup test");
5145 })
5146 }
5147
5148 fn handle_call(
5149 &mut self,
5150 _cx: &Cx,
5151 _request: (),
5152 reply: Reply<()>,
5153 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5154 let _ = reply.send(());
5155 Box::pin(async {})
5156 }
5157
5158 fn handle_cast(
5159 &mut self,
5160 _cx: &Cx,
5161 _msg: (),
5162 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
5163 Box::pin(async {})
5164 }
5165 }
5166
5167 let registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>> =
5168 Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
5169
5170 let budget = Budget::new().with_poll_quota(100_000);
5171 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(7));
5172 let root = runtime.state.create_root_region(budget);
5173 let cx = Cx::for_testing();
5174
5175 let child = crate::supervision::ChildSpec::new(
5176 "panic_child",
5177 named_gen_server_start(Arc::clone(®istry), "panic_svc", 8, || PanicOnStart),
5178 );
5179
5180 let compiled = crate::supervision::SupervisorBuilder::new("panic_supervisor")
5181 .child(child)
5182 .compile()
5183 .expect("compile supervisor");
5184
5185 let supervisor = compiled
5186 .spawn(&mut runtime.state, &cx, root, budget)
5187 .expect("spawn supervisor");
5188
5189 let child_task = supervisor.started[0].task_id;
5190 assert_eq!(registry.lock().whereis("panic_svc"), Some(child_task));
5191
5192 runtime.scheduler.lock().unwrap().schedule(child_task, 0);
5194 runtime.run_until_idle();
5195
5196 let tasks_to_schedule = runtime.state.cancel_request(
5198 supervisor.region,
5199 &crate::types::CancelReason::user("shutdown"),
5200 None,
5201 );
5202 for (task_id, priority) in tasks_to_schedule {
5203 runtime
5204 .scheduler
5205 .lock()
5206 .unwrap()
5207 .schedule(task_id, priority);
5208 }
5209 runtime.run_until_quiescent();
5210
5211 assert!(
5212 registry.lock().whereis("panic_svc").is_none(),
5213 "name must be removed after crash + region stop",
5214 );
5215
5216 crate::test_complete!("named_start_helper_crash_then_stop_cleans_registry");
5217 }
5218}