use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};

use crate::actor::{ActorId, ActorState};
use crate::channel::mpsc;
use crate::channel::oneshot;
use crate::channel::session::{self, TrackedOneshotPermit};
use crate::cx::Cx;
use crate::monitor::{DownNotification, DownReason};
use crate::obligation::graded::{AbortedProof, CommittedProof, SendPermit};
use crate::runtime::{JoinError, SpawnError};
use crate::types::{Budget, CancelReason, CxInner, Outcome, TaskId, Time};

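/// RAII guard that installs a phase-scoped budget (used for the `on_start`
/// and `on_stop` hooks) by meeting the phase budget with the task's current
/// budget, and on drop optionally restores the original budget minus the
/// polls and cost consumed during the phase.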
struct PhaseBudgetGuard {
    inner: Arc<parking_lot::RwLock<CxInner>>,
    original_budget: Budget,
    original_baseline: Budget,
    phase_baseline: Budget,
    restore_original: bool,
}

impl PhaseBudgetGuard {
    fn enter(cx: &Cx, phase_budget: Budget, restore_original: bool) -> Self {
        let inner = Arc::clone(&cx.inner);
        let (original_budget, original_baseline, phase_baseline) = {
            let mut guard = inner.write();
            let original_budget = guard.budget;
            let original_baseline = guard.budget_baseline;
            let mut phase_baseline = original_budget.meet(phase_budget);
            phase_baseline.priority = original_budget.priority.max(phase_budget.priority);
            guard.budget = phase_baseline;
            guard.budget_baseline = phase_baseline;
            drop(guard);
            (original_budget, original_baseline, phase_baseline)
        };
        Self {
            inner,
            original_budget,
            original_baseline,
            phase_baseline,
            restore_original,
        }
    }
}

impl Drop for PhaseBudgetGuard {
    fn drop(&mut self) {
        if !self.restore_original {
            return;
        }

        let mut guard = self.inner.write();

        let phase_remaining = guard.budget;
        let polls_used = self
            .phase_baseline
            .poll_quota
            .saturating_sub(phase_remaining.poll_quota);

        let cost_used = match (self.phase_baseline.cost_quota, phase_remaining.cost_quota) {
            (Some(base), Some(rem)) => base.saturating_sub(rem),
            _ => 0,
        };

        let restored_cost_quota = self
            .original_budget
            .cost_quota
            .map(|orig| orig.saturating_sub(cost_used));

        guard.budget = Budget {
            deadline: self.original_budget.deadline,
            poll_quota: self.original_budget.poll_quota.saturating_sub(polls_used),
            cost_quota: restored_cost_quota,
            priority: self.original_budget.priority,
        };
        guard.budget_baseline = self.original_baseline;
    }
}

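/// RAII guard that increments the context's mask depth so cancellation is
/// not observed while a critical section (such as the shutdown drain) runs;
/// the depth is decremented on drop. Depth is bounded by `MAX_MASK_DEPTH`
/// (INV-MASK-BOUNDED).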
struct AsyncMaskGuard {
    inner: Arc<parking_lot::RwLock<CxInner>>,
}

impl AsyncMaskGuard {
    fn enter(cx: &Cx) -> Self {
        let inner = Arc::clone(&cx.inner);
        {
            let mut guard = inner.write();
            assert!(
                guard.mask_depth < crate::types::task_context::MAX_MASK_DEPTH,
                "mask depth exceeded MAX_MASK_DEPTH ({}) in AsyncMaskGuard::enter: \
                 this violates INV-MASK-BOUNDED and prevents cancellation from ever \
                 being observed. Reduce nesting of masked sections.",
                crate::types::task_context::MAX_MASK_DEPTH
            );
            guard.mask_depth += 1;
        }
        Self { inner }
    }
}

impl Drop for AsyncMaskGuard {
    fn drop(&mut self) {
        let mut guard = self.inner.write();
        guard.mask_depth = guard.mask_depth.saturating_sub(1);
    }
}

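/// Policy applied when a `try_cast` meets a full mailbox: `Reject` fails the
/// send with `CastError::Full`, while `DropOldest` evicts the oldest queued
/// cast envelope to make room.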
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CastOverflowPolicy {
    #[default]
    Reject,

    DropOldest,
}

impl std::fmt::Display for CastOverflowPolicy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Reject => write!(f, "Reject"),
            Self::DropOldest => write!(f, "DropOldest"),
        }
    }
}

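/// Monitor notification delivered when a monitored task completes, stamped
/// with the virtual time of completion.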
#[derive(Debug, Clone)]
pub struct DownMsg {
    pub completion_vt: Time,
    pub notification: DownNotification,
}

impl DownMsg {
    #[must_use]
    pub const fn new(completion_vt: Time, notification: DownNotification) -> Self {
        Self {
            completion_vt,
            notification,
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExitMsg {
    pub exit_vt: Time,
    pub from: TaskId,
    pub reason: DownReason,
}

impl ExitMsg {
    #[must_use]
    pub const fn new(exit_vt: Time, from: TaskId, reason: DownReason) -> Self {
        Self {
            exit_vt,
            from,
            reason,
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutMsg {
    pub tick_vt: Time,
    pub id: u64,
}

impl TimeoutMsg {
    #[must_use]
    pub const fn new(tick_vt: Time, id: u64) -> Self {
        Self { tick_vt, id }
    }
}

#[derive(Debug, Clone)]
pub enum SystemMsg {
    Down {
        completion_vt: Time,
        notification: DownNotification,
    },

    Exit {
        exit_vt: Time,
        from: TaskId,
        reason: DownReason,
    },

    Timeout {
        tick_vt: Time,
        id: u64,
    },
}

impl SystemMsg {
    #[must_use]
    pub fn down(msg: DownMsg) -> Self {
        msg.into()
    }

    #[must_use]
    pub fn exit(msg: ExitMsg) -> Self {
        msg.into()
    }

    #[must_use]
    pub fn timeout(msg: TimeoutMsg) -> Self {
        msg.into()
    }

    const fn vt(&self) -> Time {
        match self {
            Self::Down { completion_vt, .. } => *completion_vt,
            Self::Exit { exit_vt, .. } => *exit_vt,
            Self::Timeout { tick_vt, .. } => *tick_vt,
        }
    }

    const fn kind_rank(&self) -> u8 {
        match self {
            Self::Down { .. } => 0,
            Self::Exit { .. } => 1,
            Self::Timeout { .. } => 2,
        }
    }

    const fn subject_key(&self) -> SystemMsgSubjectKey {
        match self {
            Self::Down { notification, .. } => SystemMsgSubjectKey::Task(notification.monitored),
            Self::Exit { from, .. } => SystemMsgSubjectKey::Task(*from),
            Self::Timeout { id, .. } => SystemMsgSubjectKey::TimeoutId(*id),
        }
    }

    #[must_use]
    pub const fn sort_key(&self) -> (Time, u8, SystemMsgSubjectKey) {
        (self.vt(), self.kind_rank(), self.subject_key())
    }
}

impl From<DownMsg> for SystemMsg {
    fn from(value: DownMsg) -> Self {
        Self::Down {
            completion_vt: value.completion_vt,
            notification: value.notification,
        }
    }
}

impl From<ExitMsg> for SystemMsg {
    fn from(value: ExitMsg) -> Self {
        Self::Exit {
            exit_vt: value.exit_vt,
            from: value.from,
            reason: value.reason,
        }
    }
}

impl From<TimeoutMsg> for SystemMsg {
    fn from(value: TimeoutMsg) -> Self {
        Self::Timeout {
            tick_vt: value.tick_vt,
            id: value.id,
        }
    }
}

impl TryFrom<SystemMsg> for DownMsg {
    type Error = SystemMsg;

    fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
        match value {
            SystemMsg::Down {
                completion_vt,
                notification,
            } => Ok(Self {
                completion_vt,
                notification,
            }),
            other => Err(other),
        }
    }
}

impl TryFrom<SystemMsg> for ExitMsg {
    type Error = SystemMsg;

    fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
        match value {
            SystemMsg::Exit {
                exit_vt,
                from,
                reason,
            } => Ok(Self {
                exit_vt,
                from,
                reason,
            }),
            other => Err(other),
        }
    }
}

impl TryFrom<SystemMsg> for TimeoutMsg {
    type Error = SystemMsg;

    fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
        match value {
            SystemMsg::Timeout { tick_vt, id } => Ok(Self { tick_vt, id }),
            other => Err(other),
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SystemMsgSubjectKey {
    Task(TaskId),
    TimeoutId(u64),
}

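/// Accumulates system messages and yields them in the deterministic order
/// defined by `SystemMsg::sort_key`: virtual time, then kind
/// (`Down` < `Exit` < `Timeout`), then subject key.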
#[derive(Debug, Default)]
pub struct SystemMsgBatch {
    entries: Vec<SystemMsg>,
}

impl SystemMsgBatch {
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    pub fn push(&mut self, msg: SystemMsg) {
        self.entries.push(msg);
    }

    #[must_use]
    pub fn into_sorted(mut self) -> Vec<SystemMsg> {
        self.entries.sort_by_key(SystemMsg::sort_key);
        self.entries
    }
}

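/// Behaviour trait for a generic server actor. `handle_call` receives a
/// request plus a single-use [`Reply`]; `handle_cast` and `handle_info`
/// consume one-way messages; `on_start`/`on_stop` run under the budgets
/// returned by `on_start_budget`/`on_stop_budget`. A minimal sketch of an
/// implementor (a hypothetical `Echo` type, not compiled as a doctest):
///
/// ```ignore
/// struct Echo;
///
/// impl GenServer for Echo {
///     type Call = String;
///     type Reply = String;
///     type Cast = ();
///     type Info = SystemMsg;
///
///     fn handle_call(
///         &mut self,
///         _cx: &Cx,
///         request: String,
///         reply: Reply<String>,
///     ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
///         // Echo the request back; `send` consumes the one-shot reply.
///         let _ = reply.send(request);
///         Box::pin(async {})
///     }
/// }
/// ```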
pub trait GenServer: Send + 'static {
    type Call: Send + 'static;

    type Reply: Send + 'static;

    type Cast: Send + 'static;

    type Info: Send + 'static;

    fn handle_call(
        &mut self,
        cx: &Cx,
        request: Self::Call,
        reply: Reply<Self::Reply>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;

    fn handle_cast(
        &mut self,
        _cx: &Cx,
        _msg: Self::Cast,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    fn handle_info(
        &mut self,
        _cx: &Cx,
        _msg: Self::Info,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    fn on_start_budget(&self) -> Budget {
        Budget::INFINITE
    }

    fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async {})
    }

    fn on_stop_budget(&self) -> Budget {
        Budget::MINIMAL
    }

    fn cast_overflow_policy(&self) -> CastOverflowPolicy {
        CastOverflowPolicy::Reject
    }
}

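/// Single-use reply capability handed to `handle_call`. Exactly one of
/// `send` or `abort` should consume it; dropping it unconsumed releases the
/// permit (aborting it first if the thread is panicking).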
pub struct Reply<R> {
    cx: Cx,
    permit: Option<TrackedOneshotPermit<R>>,
}

impl<R: Send + 'static> Reply<R> {
    fn new(cx: &Cx, permit: TrackedOneshotPermit<R>) -> Self {
        Self {
            cx: cx.clone(),
            permit: Some(permit),
        }
    }

    pub fn send(mut self, value: R) -> ReplyOutcome {
        let permit = self
            .permit
            .take()
            .expect("Reply::send called after reply was already consumed");
        match permit.send(value) {
            Ok(proof) => {
                self.cx.trace("gen_server::reply_committed");
                ReplyOutcome::Committed(proof)
            }
            Err(_send_err) => {
                self.cx.trace("gen_server::reply_caller_gone");
                ReplyOutcome::CallerGone
            }
        }
    }

    #[must_use]
    pub fn abort(mut self) -> AbortedProof<SendPermit> {
        self.cx.trace("gen_server::reply_aborted");
        self.permit
            .take()
            .expect("Reply::abort called after reply was already consumed")
            .abort()
    }

    #[inline]
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.permit
            .as_ref()
            .is_some_and(TrackedOneshotPermit::is_closed)
    }
}

impl<R> Drop for Reply<R> {
    fn drop(&mut self) {
        let Some(permit) = self.permit.take() else {
            return;
        };

        if std::thread::panicking() {
            let _ = permit.abort();
        } else {
            drop(permit);
        }
    }
}

impl<R> std::fmt::Debug for Reply<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Reply")
            .field("pending", &self.permit.is_some())
            .finish_non_exhaustive()
    }
}

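/// Result of `Reply::send`: either a commit proof for the delivered reply
/// or `CallerGone` when the caller dropped its receiver first.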
pub enum ReplyOutcome {
    Committed(CommittedProof<SendPermit>),
    CallerGone,
}

impl std::fmt::Debug for ReplyOutcome {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Committed(_) => f.debug_tuple("Committed").finish(),
            Self::CallerGone => write!(f, "CallerGone"),
        }
    }
}

enum Envelope<S: GenServer> {
    Call {
        request: S::Call,
        reply_permit: TrackedOneshotPermit<S::Reply>,
    },
    Cast {
        msg: S::Cast,
    },
    Info {
        msg: S::Info,
    },
}

impl<S: GenServer> std::fmt::Debug for Envelope<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Call { .. } => f.debug_struct("Envelope::Call").finish_non_exhaustive(),
            Self::Cast { .. } => f.debug_struct("Envelope::Cast").finish_non_exhaustive(),
            Self::Info { .. } => f.debug_struct("Envelope::Info").finish_non_exhaustive(),
        }
    }
}

struct GenServerCell<S: GenServer> {
    mailbox: mpsc::Receiver<Envelope<S>>,
    state: Arc<GenServerStateCell>,
    _keep_alive: mpsc::Sender<Envelope<S>>,
}

#[derive(Debug)]
struct GenServerStateCell {
    state: AtomicU8,
}

impl GenServerStateCell {
    fn new(state: ActorState) -> Self {
        Self {
            state: AtomicU8::new(encode_actor_state(state)),
        }
    }

    fn load(&self) -> ActorState {
        decode_actor_state(self.state.load(Ordering::Acquire))
    }

    fn store(&self, state: ActorState) {
        self.state
            .store(encode_actor_state(state), Ordering::Release);
    }
}

const fn encode_actor_state(state: ActorState) -> u8 {
    match state {
        ActorState::Created => 0,
        ActorState::Running => 1,
        ActorState::Stopping => 2,
        ActorState::Stopped => 3,
    }
}

const fn decode_actor_state(value: u8) -> ActorState {
    match value {
        0 => ActorState::Created,
        1 => ActorState::Running,
        2 => ActorState::Stopping,
        _ => ActorState::Stopped,
    }
}

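/// Owning handle returned by `spawn_gen_server`: sends calls, casts, and
/// infos, signals `stop` or `abort`, and can `join` to recover the final
/// server state.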
#[derive(Debug)]
pub struct GenServerHandle<S: GenServer> {
    actor_id: ActorId,
    sender: mpsc::Sender<Envelope<S>>,
    state: Arc<GenServerStateCell>,
    task_id: TaskId,
    receiver: oneshot::Receiver<Result<S, JoinError>>,
    inner: std::sync::Weak<parking_lot::RwLock<CxInner>>,
    completed: bool,
    overflow_policy: CastOverflowPolicy,
}

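/// Error returned by `call`: the server has stopped, the server consumed the
/// request without replying, or the call was cancelled.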
#[derive(Debug)]
pub enum CallError {
    ServerStopped,
    NoReply,
    Cancelled(CancelReason),
}

impl std::fmt::Display for CallError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ServerStopped => write!(f, "GenServer has stopped"),
            Self::NoReply => write!(f, "GenServer did not reply"),
            Self::Cancelled(reason) => write!(f, "GenServer call cancelled: {reason}"),
        }
    }
}

impl std::error::Error for CallError {}

#[derive(Debug)]
pub enum CastError {
    ServerStopped,
    Full,
    Cancelled(CancelReason),
}

impl std::fmt::Display for CastError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ServerStopped => write!(f, "GenServer has stopped"),
            Self::Full => write!(f, "GenServer mailbox full"),
            Self::Cancelled(reason) => write!(f, "GenServer cast cancelled: {reason}"),
        }
    }
}

impl std::error::Error for CastError {}

#[derive(Debug)]
pub enum InfoError {
    ServerStopped,
    Full,
    Cancelled(CancelReason),
}

impl std::fmt::Display for InfoError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ServerStopped => write!(f, "GenServer has stopped"),
            Self::Full => write!(f, "GenServer mailbox full"),
            Self::Cancelled(reason) => write!(f, "GenServer info cancelled: {reason}"),
        }
    }
}

impl std::error::Error for InfoError {}

impl<S: GenServer> GenServerHandle<S> {
    pub async fn call(&self, cx: &Cx, request: S::Call) -> Result<S::Reply, CallError> {
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::call_rejected_cancelled");
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(CallError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::call_rejected_stopped");
            return Err(CallError::ServerStopped);
        }

        let send_permit = match self.sender.reserve(cx).await {
            Ok(p) => p,
            Err(e) => {
                let was_cancelled = matches!(e, mpsc::SendError::Cancelled(()));
                if was_cancelled {
                    cx.trace("gen_server::call_send_cancelled");
                    let reason = cx
                        .cancel_reason()
                        .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                    return Err(CallError::Cancelled(reason));
                }
                cx.trace("gen_server::call_send_failed");
                return Err(CallError::ServerStopped);
            }
        };

        let (reply_tx, mut reply_rx) = session::tracked_oneshot::<S::Reply>();
        let reply_permit: session::TrackedOneshotPermit<S::Reply> = reply_tx.reserve(cx);
        let envelope: Envelope<S> = Envelope::Call {
            request,
            reply_permit,
        };

        if let Err(e) = send_permit.try_send(envelope) {
            let envelope = match e {
                mpsc::SendError::Disconnected(v)
                | mpsc::SendError::Full(v)
                | mpsc::SendError::Cancelled(v) => v,
            };
            if let Envelope::Call { reply_permit, .. } = envelope {
                let _aborted = session::TrackedOneshotPermit::abort(reply_permit);
            }
            cx.trace("gen_server::call_send_failed");
            return Err(CallError::ServerStopped);
        }

        cx.trace("gen_server::call_enqueued");

        match reply_rx.recv(cx).await {
            Ok(v) => Ok(v),
            Err(oneshot::RecvError::Closed) => {
                cx.trace("gen_server::call_no_reply");
                Err(CallError::NoReply)
            }
            Err(oneshot::RecvError::Cancelled) => {
                cx.trace("gen_server::call_reply_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                Err(CallError::Cancelled(reason))
            }
            Err(oneshot::RecvError::PolledAfterCompletion) => {
                unreachable!("GenServer call awaits a fresh reply oneshot recv future")
            }
        }
    }

    pub async fn cast(&self, cx: &Cx, msg: S::Cast) -> Result<(), CastError> {
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::cast_rejected_cancelled");
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(CastError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::cast_rejected_stopped");
            return Err(CastError::ServerStopped);
        }
        let envelope: Envelope<S> = Envelope::Cast { msg };
        self.sender.send(cx, envelope).await.map_err(|e| match e {
            mpsc::SendError::Cancelled(_) => {
                cx.trace("gen_server::cast_send_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                CastError::Cancelled(reason)
            }
            mpsc::SendError::Disconnected(_) | mpsc::SendError::Full(_) => {
                cx.trace("gen_server::cast_send_failed");
                CastError::ServerStopped
            }
        })
    }

    pub fn try_cast(&self, msg: S::Cast) -> Result<(), CastError> {
        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(CastError::ServerStopped);
        }
        let envelope: Envelope<S> = Envelope::Cast { msg };
        match self.overflow_policy {
            CastOverflowPolicy::Reject => self.sender.try_send(envelope).map_err(|e| match e {
                mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
                    CastError::ServerStopped
                }
                mpsc::SendError::Full(_) => CastError::Full,
            }),
            CastOverflowPolicy::DropOldest => {
                match self.sender.send_evict_oldest_where(envelope, |queued| {
                    matches!(queued, Envelope::Cast { .. })
                }) {
                    Ok(Some(_evicted)) => {
                        if let Some(cx) = Cx::current() {
                            cx.trace("gen_server::cast_evicted_oldest");
                        }
                        Ok(())
                    }
                    Ok(None) => Ok(()),
                    Err(mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_)) => {
                        Err(CastError::ServerStopped)
                    }
                    Err(mpsc::SendError::Full(_)) => Err(CastError::Full),
                }
            }
        }
    }

    pub async fn info(&self, cx: &Cx, msg: S::Info) -> Result<(), InfoError> {
        if cx.checkpoint().is_err() {
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(InfoError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::info_rejected_stopped");
            return Err(InfoError::ServerStopped);
        }

        let envelope: Envelope<S> = Envelope::Info { msg };
        self.sender.send(cx, envelope).await.map_err(|e| match e {
            mpsc::SendError::Cancelled(_) => {
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                InfoError::Cancelled(reason)
            }
            mpsc::SendError::Disconnected(_) => InfoError::ServerStopped,
            mpsc::SendError::Full(_) => InfoError::Full,
        })
    }

    pub fn try_info(&self, msg: S::Info) -> Result<(), InfoError> {
        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(InfoError::ServerStopped);
        }

        let envelope: Envelope<S> = Envelope::Info { msg };
        self.sender.try_send(envelope).map_err(|e| match e {
            mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
                InfoError::ServerStopped
            }
            mpsc::SendError::Full(_) => InfoError::Full,
        })
    }

    #[inline]
    #[must_use]
    pub fn cast_overflow_policy(&self) -> CastOverflowPolicy {
        self.overflow_policy
    }

    #[inline]
    #[must_use]
    pub const fn actor_id(&self) -> ActorId {
        self.actor_id
    }

    #[inline]
    #[must_use]
    pub fn task_id(&self) -> TaskId {
        self.task_id
    }

    #[inline]
    #[must_use]
    pub fn is_finished(&self) -> bool {
        self.completed || self.receiver.is_ready() || self.receiver.is_closed()
    }

    pub fn stop(&self) {
        self.state.store(ActorState::Stopping);
        self.sender.wake_receiver();
    }

    pub fn abort(&self) {
        self.state.store(ActorState::Stopping);
        if let Some(inner) = self.inner.upgrade() {
            let cancel_waker = {
                let mut guard = inner.write();
                guard.cancel_requested = true;
                guard
                    .fast_cancel
                    .store(true, std::sync::atomic::Ordering::Release);
                if guard.cancel_reason.is_none() {
                    guard.cancel_reason = Some(crate::types::CancelReason::user("server aborted"));
                }
                guard.cancel_waker.clone()
            };
            if let Some(waker) = cancel_waker {
                waker.wake_by_ref();
            }
        }
        self.sender.wake_receiver();
    }

    pub fn join<'a>(&'a mut self, _cx: &'a Cx) -> GenServerJoinFuture<'a, S> {
        let cx_inner = self.inner.clone();
        let receiver = &mut self.receiver;
        let terminal_state = &mut self.completed;
        GenServerJoinFuture {
            inner: receiver.recv_uninterruptible(),
            cx_inner,
            sender: self.sender.clone(),
            state: Arc::clone(&self.state),
            terminal_state,
            drop_abort_defused: false,
        }
    }
}

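/// Future returned by `GenServerHandle::join`. Resolves with the final
/// server state (or a `JoinError`); dropping it before completion aborts
/// the server so an abandoned join cannot leak a running task.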
pub struct GenServerJoinFuture<'a, S: GenServer> {
    inner: oneshot::RecvUninterruptibleFuture<'a, Result<S, JoinError>>,
    cx_inner: std::sync::Weak<parking_lot::RwLock<CxInner>>,
    sender: mpsc::Sender<Envelope<S>>,
    state: Arc<GenServerStateCell>,
    terminal_state: &'a mut bool,
    drop_abort_defused: bool,
}

impl<S: GenServer> GenServerJoinFuture<'_, S> {
    fn closed_reason(&self) -> crate::types::CancelReason {
        self.cx_inner
            .upgrade()
            .and_then(|inner| inner.read().cancel_reason.clone())
            .unwrap_or_else(|| crate::types::CancelReason::user("join channel closed"))
    }

    fn abort(&self) {
        self.state.store(ActorState::Stopping);
        if let Some(inner) = self.cx_inner.upgrade() {
            let cancel_waker = {
                let mut guard = inner.write();
                guard.cancel_requested = true;
                guard
                    .fast_cancel
                    .store(true, std::sync::atomic::Ordering::Release);
                if guard.cancel_reason.is_none() {
                    guard.cancel_reason = Some(crate::types::CancelReason::user("server aborted"));
                }
                guard.cancel_waker.clone()
            };
            if let Some(waker) = cancel_waker {
                waker.wake_by_ref();
            }
        }
        self.sender.wake_receiver();
    }
}

impl<S: GenServer> std::future::Future for GenServerJoinFuture<'_, S> {
    type Output = Result<S, JoinError>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let this = &mut *self;
        if *this.terminal_state {
            return std::task::Poll::Ready(Err(JoinError::PolledAfterCompletion));
        }

        match std::pin::Pin::new(&mut this.inner).poll(cx) {
            std::task::Poll::Ready(Ok(res)) => {
                *this.terminal_state = true;
                this.drop_abort_defused = true;
                std::task::Poll::Ready(res)
            }
            std::task::Poll::Ready(Err(oneshot::RecvError::Closed)) => {
                *this.terminal_state = true;
                this.drop_abort_defused = true;
                let reason = this.closed_reason();
                std::task::Poll::Ready(Err(JoinError::Cancelled(reason)))
            }
            std::task::Poll::Ready(Err(oneshot::RecvError::Cancelled)) => {
                unreachable!("RecvUninterruptibleFuture cannot return Cancelled");
            }
            std::task::Poll::Ready(Err(oneshot::RecvError::PolledAfterCompletion)) => {
                unreachable!(
                    "GenServerJoinFuture guards against repolls before polling the inner oneshot recv future"
                )
            }
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}

impl<S: GenServer> Drop for GenServerJoinFuture<'_, S> {
    fn drop(&mut self) {
        if !*self.terminal_state && !self.drop_abort_defused {
            if self.inner.receiver_finished() {
                return;
            }
            self.abort();
        }
    }
}

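/// Cloneable, non-owning reference to a running server. Supports the same
/// call/cast/info operations as `GenServerHandle`, but cannot join, stop,
/// or abort the server.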
#[derive(Debug)]
pub struct GenServerRef<S: GenServer> {
    actor_id: ActorId,
    sender: mpsc::Sender<Envelope<S>>,
    state: Arc<GenServerStateCell>,
    overflow_policy: CastOverflowPolicy,
}

impl<S: GenServer> Clone for GenServerRef<S> {
    fn clone(&self) -> Self {
        Self {
            actor_id: self.actor_id,
            sender: self.sender.clone(),
            state: Arc::clone(&self.state),
            overflow_policy: self.overflow_policy,
        }
    }
}

impl<S: GenServer> GenServerRef<S> {
    #[must_use]
    pub const fn cast_overflow_policy(&self) -> CastOverflowPolicy {
        self.overflow_policy
    }

    pub async fn call(&self, cx: &Cx, request: S::Call) -> Result<S::Reply, CallError> {
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::call_rejected_cancelled");
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(CallError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::call_rejected_stopped");
            return Err(CallError::ServerStopped);
        }

        let send_permit = match self.sender.reserve(cx).await {
            Ok(p) => p,
            Err(e) => {
                let was_cancelled = matches!(e, mpsc::SendError::Cancelled(()));
                if was_cancelled {
                    cx.trace("gen_server::call_send_cancelled");
                    let reason = cx
                        .cancel_reason()
                        .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                    return Err(CallError::Cancelled(reason));
                }
                cx.trace("gen_server::call_send_failed");
                return Err(CallError::ServerStopped);
            }
        };

        let (reply_tx, mut reply_rx) = session::tracked_oneshot::<S::Reply>();
        let reply_permit: session::TrackedOneshotPermit<S::Reply> = reply_tx.reserve(cx);
        let envelope: Envelope<S> = Envelope::Call {
            request,
            reply_permit,
        };

        if let Err(e) = send_permit.try_send(envelope) {
            let envelope = match e {
                mpsc::SendError::Disconnected(v)
                | mpsc::SendError::Full(v)
                | mpsc::SendError::Cancelled(v) => v,
            };
            if let Envelope::Call { reply_permit, .. } = envelope {
                let _aborted = session::TrackedOneshotPermit::abort(reply_permit);
            }
            cx.trace("gen_server::call_send_failed");
            return Err(CallError::ServerStopped);
        }

        cx.trace("gen_server::call_enqueued");

        match reply_rx.recv(cx).await {
            Ok(v) => Ok(v),
            Err(oneshot::RecvError::Closed) => {
                cx.trace("gen_server::call_no_reply");
                Err(CallError::NoReply)
            }
            Err(oneshot::RecvError::Cancelled) => {
                cx.trace("gen_server::call_reply_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                Err(CallError::Cancelled(reason))
            }
            Err(oneshot::RecvError::PolledAfterCompletion) => {
                unreachable!("GenServerRef::call awaits a fresh reply oneshot recv future")
            }
        }
    }

    pub async fn cast(&self, cx: &Cx, msg: S::Cast) -> Result<(), CastError> {
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::cast_rejected_cancelled");
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(CastError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::cast_rejected_stopped");
            return Err(CastError::ServerStopped);
        }
        let envelope: Envelope<S> = Envelope::Cast { msg };
        self.sender.send(cx, envelope).await.map_err(|e| match e {
            mpsc::SendError::Cancelled(_) => {
                cx.trace("gen_server::cast_send_cancelled");
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                CastError::Cancelled(reason)
            }
            mpsc::SendError::Disconnected(_) | mpsc::SendError::Full(_) => {
                cx.trace("gen_server::cast_send_failed");
                CastError::ServerStopped
            }
        })
    }

    pub fn try_cast(&self, msg: S::Cast) -> Result<(), CastError> {
        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(CastError::ServerStopped);
        }
        let envelope: Envelope<S> = Envelope::Cast { msg };
        match self.overflow_policy {
            CastOverflowPolicy::Reject => self.sender.try_send(envelope).map_err(|e| match e {
                mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
                    CastError::ServerStopped
                }
                mpsc::SendError::Full(_) => CastError::Full,
            }),
            CastOverflowPolicy::DropOldest => match self
                .sender
                .send_evict_oldest_where(envelope, |queued| matches!(queued, Envelope::Cast { .. }))
            {
                Ok(Some(evicted)) => {
                    debug_assert!(matches!(evicted, Envelope::Cast { .. }));
                    if let Some(cx) = Cx::current() {
                        cx.trace("gen_server::cast_evicted_oldest");
                    }
                    Ok(())
                }
                Ok(None) => Ok(()),
                Err(mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_)) => {
                    Err(CastError::ServerStopped)
                }
                Err(mpsc::SendError::Full(_)) => Err(CastError::Full),
            },
        }
    }

    pub async fn info(&self, cx: &Cx, msg: S::Info) -> Result<(), InfoError> {
        if cx.checkpoint().is_err() {
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
            return Err(InfoError::Cancelled(reason));
        }

        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            cx.trace("gen_server::info_rejected_stopped");
            return Err(InfoError::ServerStopped);
        }

        let envelope: Envelope<S> = Envelope::Info { msg };
        self.sender.send(cx, envelope).await.map_err(|e| match e {
            mpsc::SendError::Cancelled(_) => {
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(crate::types::CancelReason::parent_cancelled);
                InfoError::Cancelled(reason)
            }
            mpsc::SendError::Disconnected(_) => InfoError::ServerStopped,
            mpsc::SendError::Full(_) => InfoError::Full,
        })
    }

    pub fn try_info(&self, msg: S::Info) -> Result<(), InfoError> {
        if matches!(
            self.state.load(),
            ActorState::Stopping | ActorState::Stopped
        ) {
            return Err(InfoError::ServerStopped);
        }

        let envelope: Envelope<S> = Envelope::Info { msg };
        self.sender.try_send(envelope).map_err(|e| match e {
            mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
                InfoError::ServerStopped
            }
            mpsc::SendError::Full(_) => InfoError::Full,
        })
    }

    #[inline]
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.sender.is_closed()
    }

    #[inline]
    #[must_use]
    pub fn is_alive(&self) -> bool {
        self.state.load() != ActorState::Stopped
    }

    #[inline]
    #[must_use]
    pub const fn actor_id(&self) -> ActorId {
        self.actor_id
    }
}

impl<S: GenServer> GenServerHandle<S> {
    #[inline]
    #[must_use]
    pub fn server_ref(&self) -> GenServerRef<S> {
        GenServerRef {
            actor_id: self.actor_id,
            sender: self.sender.clone(),
            state: Arc::clone(&self.state),
            overflow_policy: self.overflow_policy,
        }
    }
}

pub const DEFAULT_GENSERVER_MAILBOX_CAPACITY: usize = 64;

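/// Drives one server: marks it `Running`, runs `on_start` under its phase
/// budget, then dispatches envelopes until cancellation, stop, or mailbox
/// disconnect. Shutdown drains the mailbox under a cancellation mask
/// (aborting queued calls) before awaiting `on_stop`.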
async fn run_gen_server_loop<S: GenServer>(
    mut server: S,
    cx: Cx,
    cell: &mut GenServerCell<S>,
) -> S {
    use crate::tracing_compat::debug;

    if cell.state.load() != ActorState::Stopping {
        cell.state.store(ActorState::Running);
    }

    if cx.checkpoint().is_err() || cell.state.load() == ActorState::Stopping {
        cx.trace("gen_server::init_skipped_cancelled");
    } else {
        cx.trace("gen_server::init");
        let _budget = PhaseBudgetGuard::enter(&cx, server.on_start_budget(), true);
        server.on_start(&cx).await;
    }

    loop {
        if cx.checkpoint().is_err() {
            cx.trace("gen_server::cancel_requested");
            break;
        }

        let recv_result = std::future::poll_fn(|task_cx| {
            match cell.mailbox.poll_recv(&cx, task_cx) {
                std::task::Poll::Pending if cell.state.load() == ActorState::Stopping => {
                    std::task::Poll::Ready(Err(crate::channel::mpsc::RecvError::Disconnected))
                }
                other => other,
            }
        })
        .await;

        match recv_result {
            Ok(envelope) => {
                dispatch_envelope(&mut server, &cx, envelope).await;
            }
            Err(crate::channel::mpsc::RecvError::Disconnected) => {
                cx.trace("gen_server::mailbox_disconnected");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Cancelled) => {
                cx.trace("gen_server::recv_cancelled");
                break;
            }
            Err(crate::channel::mpsc::RecvError::Empty) => {
                break;
            }
        }
    }

    cell.state.store(ActorState::Stopping);

    let _budget = PhaseBudgetGuard::enter(&cx, server.on_stop_budget(), false);
    let _mask = AsyncMaskGuard::enter(&cx);

    let is_aborted = cx.checkpoint().is_err();

    cell.mailbox.close();

    let mut drained: u64 = 0;
    while let Ok(envelope) = cell.mailbox.try_recv() {
        match envelope {
            Envelope::Call {
                request: _,
                reply_permit,
            } => {
                let _aborted = session::TrackedOneshotPermit::abort(reply_permit);
                cx.trace("gen_server::drain_abort_call");
            }
            Envelope::Cast { msg } => {
                if !is_aborted {
                    server.handle_cast(&cx, msg).await;
                }
            }
            Envelope::Info { msg } => {
                if !is_aborted {
                    server.handle_info(&cx, msg).await;
                }
            }
        }
        drained += 1;
    }
    if drained > 0 {
        debug!(drained = drained, "gen_server::mailbox_drained");
        cx.trace("gen_server::mailbox_drained");
    }

    cx.trace("gen_server::terminate");
    server.on_stop(&cx).await;

    server
}

async fn dispatch_envelope<S: GenServer>(server: &mut S, cx: &Cx, envelope: Envelope<S>) {
    match envelope {
        Envelope::Call {
            request,
            reply_permit,
        } => {
            let reply = Reply::<S::Reply>::new(cx, reply_permit);
            server.handle_call(cx, request, reply).await;
        }
        Envelope::Cast { msg } => {
            server.handle_cast(cx, msg).await;
        }
        Envelope::Info { msg } => {
            server.handle_info(cx, msg).await;
        }
    }
}

impl<P: crate::types::Policy> crate::cx::Scope<'_, P> {
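    /// Spawns `server` as a task in this scope and returns the owning handle
    /// plus the `StoredTask`, which the caller hands to the runtime (the
    /// tests pass it to `store_spawned_task` before scheduling).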
    pub fn spawn_gen_server<S: GenServer>(
        &self,
        state: &mut crate::runtime::state::RuntimeState,
        cx: &Cx,
        server: S,
        mailbox_capacity: usize,
    ) -> Result<(GenServerHandle<S>, crate::runtime::stored_task::StoredTask), SpawnError> {
        use crate::cx::scope::CatchUnwind;
        use crate::runtime::stored_task::StoredTask;
        use crate::tracing_compat::{debug, debug_span};

        let overflow_policy = server.cast_overflow_policy();
        let (msg_tx, msg_rx) = mpsc::channel::<Envelope<S>>(mailbox_capacity);
        let (result_tx, result_rx) = oneshot::channel::<Result<S, JoinError>>();
        let task_id = self.create_task_record(state)?;
        let actor_id = ActorId::from_task(task_id);
        let server_state = Arc::new(GenServerStateCell::new(ActorState::Created));

        let _span = debug_span!(
            "gen_server_spawn",
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
        )
        .entered();
        debug!(
            task_id = ?task_id,
            region_id = ?self.region_id(),
            mailbox_capacity = mailbox_capacity,
            "gen_server spawned"
        );

        let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);

        if let Some(record) = state.task_mut(task_id) {
            record.set_cx_inner(child_cx.inner.clone());
            record.set_cx(child_cx_full.clone());
        }

        let cx_for_send = child_cx_full;
        let inner_weak = Arc::downgrade(&child_cx.inner);
        let state_for_task = Arc::clone(&server_state);

        let mut cell = GenServerCell {
            mailbox: msg_rx,
            state: Arc::clone(&server_state),
            _keep_alive: msg_tx.clone(),
        };

        let wrapped = async move {
            let result = CatchUnwind {
                inner: Box::pin(run_gen_server_loop(server, child_cx, &mut cell)),
            }
            .await;
            match result {
                Ok(server_final) => {
                    let _ = result_tx.send(&cx_for_send, Ok(server_final));
                }
                Err(payload) => {
                    let msg = crate::cx::scope::payload_to_string(&payload);
                    let _ = result_tx.send(
                        &cx_for_send,
                        Err(JoinError::Panicked(crate::types::PanicPayload::new(msg))),
                    );
                }
            }
            state_for_task.store(ActorState::Stopped);
            Outcome::Ok(())
        };

        let stored = StoredTask::new_with_id(wrapped, task_id);

        let handle = GenServerHandle {
            actor_id,
            sender: msg_tx,
            state: server_state,
            task_id,
            receiver: result_rx,
            inner: inner_weak,
            completed: false,
            overflow_policy,
        };

        Ok((handle, stored))
    }

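    /// Like `spawn_gen_server`, but also registers `name` for the new task in
    /// `registry`; on registration failure the freshly created task record is
    /// rolled back and `NamedSpawnError::NameTaken` is returned.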
    #[allow(clippy::too_many_arguments)]
    pub fn spawn_named_gen_server<S: GenServer>(
        &self,
        state: &mut crate::runtime::state::RuntimeState,
        cx: &Cx,
        registry: &mut crate::cx::NameRegistry,
        name: impl Into<String>,
        server: S,
        mailbox_capacity: usize,
        now: crate::types::Time,
    ) -> Result<
        (
            NamedGenServerHandle<S>,
            crate::runtime::stored_task::StoredTask,
        ),
        NamedSpawnError,
    > {
        let name = name.into();

        let (handle, stored) = self
            .spawn_gen_server(state, cx, server, mailbox_capacity)
            .map_err(NamedSpawnError::Spawn)?;

        let task_id = handle.task_id();
        let region = self.region_id();

        match registry.register(name, task_id, region, now) {
            Ok(lease) => {
                let named = NamedGenServerHandle {
                    handle,
                    lease: Some(lease),
                };
                Ok((named, stored))
            }
            Err(e) => {
                let task_id = handle.task_id();
                if let Some(region_record) = state.region(self.region_id()) {
                    region_record.remove_task(task_id);
                }
                state.remove_task(task_id);
                Err(NamedSpawnError::NameTaken(e))
            }
        }
    }
}

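/// Error from `spawn_named_gen_server`: either the underlying spawn failed
/// or the requested name was already registered.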
#[derive(Debug)]
pub enum NamedSpawnError {
    Spawn(SpawnError),
    NameTaken(crate::cx::NameLeaseError),
}

impl std::fmt::Display for NamedSpawnError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Spawn(e) => write!(f, "named server spawn failed: {e}"),
            Self::NameTaken(e) => write!(f, "named server registration failed: {e}"),
        }
    }
}

impl std::error::Error for NamedSpawnError {}

#[derive(Debug)]
pub enum ReleaseNameError {
    StillRunning,
    Lease(crate::cx::NameLeaseError),
}

impl std::fmt::Display for ReleaseNameError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::StillRunning => write!(f, "named server is still running"),
            Self::Lease(err) => write!(f, "named server lease resolution failed: {err}"),
        }
    }
}

impl std::error::Error for ReleaseNameError {}

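/// Handle to a gen server registered under a name: wraps a `GenServerHandle`
/// together with the name lease, which must be released (once the server has
/// finished) or aborted to free the name.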
#[derive(Debug)]
pub struct NamedGenServerHandle<S: GenServer> {
    handle: GenServerHandle<S>,
    lease: Option<crate::cx::NameLease>,
}

impl<S: GenServer> NamedGenServerHandle<S> {
    #[inline]
    #[must_use]
    pub fn name(&self) -> &str {
        self.lease
            .as_ref()
            .map_or("(released)", crate::cx::NameLease::name)
    }

    #[inline]
    #[must_use]
    pub fn task_id(&self) -> TaskId {
        self.handle.task_id()
    }

    #[inline]
    #[must_use]
    pub fn actor_id(&self) -> ActorId {
        self.handle.actor_id()
    }

    #[inline]
    #[must_use]
    pub fn is_finished(&self) -> bool {
        self.handle.is_finished()
    }

    #[must_use]
    pub fn server_ref(&self) -> GenServerRef<S> {
        self.handle.server_ref()
    }

    #[must_use]
    pub fn inner(&self) -> &GenServerHandle<S> {
        &self.handle
    }

    pub fn stop(&self) {
        self.handle.stop();
    }

    pub fn release_name(
        &mut self,
        registry: &mut crate::cx::NameRegistry,
        now: Time,
    ) -> Result<(), ReleaseNameError> {
        let Some(lease) = self.lease.as_ref() else {
            return Err(ReleaseNameError::Lease(
                crate::cx::NameLeaseError::AlreadyResolved,
            ));
        };
        if !lease.is_active() {
            return Err(ReleaseNameError::Lease(
                crate::cx::NameLeaseError::AlreadyResolved,
            ));
        }

        if !self.handle.is_finished() {
            return Err(ReleaseNameError::StillRunning);
        }

        registry
            .unregister_owned_and_grant(lease, now)
            .map(|_proof| ())
            .map_err(ReleaseNameError::Lease)
            .and_then(|()| {
                self.lease
                    .take()
                    .ok_or(ReleaseNameError::Lease(
                        crate::cx::NameLeaseError::AlreadyResolved,
                    ))?
                    .release()
                    .map(|_proof| ())
                    .map_err(ReleaseNameError::Lease)
            })
    }

    pub fn abort_lease(
        &mut self,
        registry: &mut crate::cx::NameRegistry,
        now: Time,
    ) -> Result<(), crate::cx::NameLeaseError> {
        let Some(lease) = self.lease.as_ref() else {
            return Err(crate::cx::NameLeaseError::AlreadyResolved);
        };
        if !lease.is_active() {
            return Err(crate::cx::NameLeaseError::AlreadyResolved);
        }
        registry.unregister_owned_and_grant(lease, now)?;
        self.lease
            .take()
            .ok_or(crate::cx::NameLeaseError::AlreadyResolved)?
            .abort()
            .map(|_proof| ())
    }

    pub fn take_lease(&mut self) -> Option<crate::cx::NameLease> {
        self.lease.take()
    }
}

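/// Factory for supervised, named gen servers: see the `ChildStart` impl
/// below, where each start builds a fresh server via `make_server`,
/// registers `name`, and defers lease cleanup to a finalizer registered
/// with `defer_sync`.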
pub struct NamedGenServerStart<S, F>
where
    S: GenServer,
    F: FnMut() -> S + Send + 'static,
{
    registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>>,
    name: String,
    mailbox_capacity: usize,
    make_server: F,
}

#[must_use]
pub fn named_gen_server_start<S, F>(
    registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>>,
    name: impl Into<String>,
    mailbox_capacity: usize,
    make_server: F,
) -> NamedGenServerStart<S, F>
where
    S: GenServer,
    F: FnMut() -> S + Send + 'static,
{
    NamedGenServerStart {
        registry,
        name: name.into(),
        mailbox_capacity,
        make_server,
    }
}

impl<S, F> crate::supervision::ChildStart for NamedGenServerStart<S, F>
where
    S: GenServer,
    F: FnMut() -> S + Send + 'static,
{
    fn start(
        &mut self,
        scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        state: &mut crate::runtime::RuntimeState,
        cx: &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError> {
        let now = state.now;
        let server = (self.make_server)();
        let (mut named_handle, stored) = scope
            .spawn_named_gen_server(
                state,
                cx,
                &mut self.registry.lock(),
                self.name.clone(),
                server,
                self.mailbox_capacity,
                now,
            )
            .map_err(|err| match err {
                NamedSpawnError::Spawn(spawn_err) => spawn_err,
                NamedSpawnError::NameTaken(name_err) => SpawnError::NameRegistrationFailed {
                    name: self.name.clone(),
                    reason: name_err.to_string(),
                },
            })?;

        let task_id = named_handle.task_id();
        state.store_spawned_task(task_id, stored);

        let lease_slot = Arc::new(parking_lot::Mutex::new(named_handle.take_lease()));
        let lease_slot_for_finalizer = Arc::clone(&lease_slot);
        let registry_for_finalizer = Arc::clone(&self.registry);
        let finalizer_registered = scope.defer_sync(state, move || {
            let lease_to_resolve = lease_slot_for_finalizer.lock().take();
            if let Some(mut lease) = lease_to_resolve {
                let _ = registry_for_finalizer
                    .lock()
                    .unregister_owned_and_grant(&lease, Time::ZERO);
                let _ = lease.release();
            }
        });

        if !finalizer_registered {
            let lease_to_abort = lease_slot.lock().take();
            if let Some(mut lease) = lease_to_abort {
                let _ = self.registry.lock().unregister_owned_and_grant(&lease, now);
                let _ = lease.abort();
            }
            if let Some(region_record) = state.region(scope.region_id()) {
                region_record.remove_task(task_id);
            }
            state.remove_task(task_id);
            return Err(SpawnError::RegionClosed(scope.region_id()));
        }

        Ok(task_id)
    }
}

2116#[cfg(test)]
2121mod tests {
2122 use super::*;
2123 use crate::runtime::state::RuntimeState;
2124 use crate::runtime::yield_now;
2125 use crate::supervision::ChildStart;
2126 use crate::types::Budget;
2127 use crate::types::CancelKind;
2128 use crate::types::policy::FailFast;
2129 use crate::util::ArenaIndex;
2130 use parking_lot::Mutex;
2131 use std::sync::Arc;
2132 use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
2133
2134 fn init_test(name: &str) {
2135 crate::test_utils::init_test_logging();
2136 crate::test_phase!(name);
2137 }
2138
2139 #[derive(Debug)]
2142 struct Counter {
2143 count: u64,
2144 }
2145
2146 enum CounterCall {
2147 Get,
2148 Add(u64),
2149 }
2150
2151 enum CounterCast {
2152 Reset,
2153 }
2154
2155 impl GenServer for Counter {
2156 type Call = CounterCall;
2157 type Reply = u64;
2158 type Cast = CounterCast;
2159 type Info = SystemMsg;
2160
2161 fn handle_call(
2162 &mut self,
2163 _cx: &Cx,
2164 request: CounterCall,
2165 reply: Reply<u64>,
2166 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2167 match request {
2168 CounterCall::Get => {
2169 let _ = reply.send(self.count);
2170 }
2171 CounterCall::Add(n) => {
2172 self.count += n;
2173 let _ = reply.send(self.count);
2174 }
2175 }
2176 Box::pin(async {})
2177 }
2178
2179 fn handle_cast(
2180 &mut self,
2181 _cx: &Cx,
2182 msg: CounterCast,
2183 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2184 match msg {
2185 CounterCast::Reset => self.count = 0,
2186 }
2187 Box::pin(async {})
2188 }
2189 }
2190
2191 #[derive(Clone)]
2192 struct StartBudgetProbe {
2193 started_priority: Arc<AtomicU8>,
2194 loop_priority: Arc<AtomicU8>,
2195 }
2196
2197 impl GenServer for StartBudgetProbe {
2198 type Call = CounterCall;
2199 type Reply = u8;
2200 type Cast = CounterCast;
2201 type Info = SystemMsg;
2202
2203 fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2204 self.started_priority
2205 .store(cx.budget().priority, Ordering::SeqCst);
2206 Box::pin(async {})
2207 }
2208
2209 fn on_start_budget(&self) -> Budget {
2210 Budget::new().with_priority(200)
2211 }
2212
2213 fn handle_call(
2214 &mut self,
2215 cx: &Cx,
2216 _request: CounterCall,
2217 reply: Reply<u8>,
2218 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2219 self.loop_priority
2220 .store(cx.budget().priority, Ordering::SeqCst);
2221 let _ = reply.send(cx.budget().priority);
2222 Box::pin(async {})
2223 }
2224 }
2225
2226 struct StopMaskProbe {
2227 stop_checkpoint_ok: Arc<AtomicU8>,
2228 }
2229
2230 impl GenServer for StopMaskProbe {
2231 type Call = CounterCall;
2232 type Reply = u8;
2233 type Cast = CounterCast;
2234 type Info = SystemMsg;
2235
2236 fn handle_call(
2237 &mut self,
2238 _cx: &Cx,
2239 _request: CounterCall,
2240 reply: Reply<u8>,
2241 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2242 let _ = reply.send(0);
2243 Box::pin(async {})
2244 }
2245
2246 fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2247 let ok = cx.checkpoint().is_ok();
2248 self.stop_checkpoint_ok
2249 .store(u8::from(ok), Ordering::SeqCst);
2250 Box::pin(async {})
2251 }
2252 }
2253
2254 enum InitProbeCall {
2255 GetStarted,
2256 }
2257
2258 struct InitProbe {
2259 started: Arc<AtomicU8>,
2260 checkpoints: Arc<Mutex<Vec<serde_json::Value>>>,
2261 }
2262
2263 impl GenServer for InitProbe {
2264 type Call = InitProbeCall;
2265 type Reply = bool;
2266 type Cast = ();
2267 type Info = SystemMsg;
2268
2269 fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2270 self.started.store(1, Ordering::SeqCst);
2271 let started = Arc::clone(&self.started);
2272 let checkpoints = Arc::clone(&self.checkpoints);
2273 Box::pin(async move {
2274 let event = serde_json::json!({
2275 "phase": "on_start",
2276 "started": started.load(Ordering::SeqCst),
2277 });
2278 tracing::info!(event = %event, "gen_server_lab_checkpoint");
2279 checkpoints.lock().push(event);
2280 yield_now().await;
2281 })
2282 }
2283
2284 fn handle_call(
2285 &mut self,
2286 _cx: &Cx,
2287 request: InitProbeCall,
2288 reply: Reply<bool>,
2289 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
2290 match request {
2291 InitProbeCall::GetStarted => {
2292 let started = self.started.load(Ordering::SeqCst) == 1;
2293 let event = serde_json::json!({
2294 "phase": "handle_call",
2295 "started": started,
2296 });
2297 tracing::info!(event = %event, "gen_server_lab_checkpoint");
2298 self.checkpoints.lock().push(event);
2299 let _ = reply.send(started);
2300 }
2301 }
2302 Box::pin(async {})
2303 }
2304 }
2305
2306 fn assert_gen_server<S: GenServer>() {}
2307
2308 #[test]
2309 fn gen_server_trait_bounds() {
2310 init_test("gen_server_trait_bounds");
2311 assert_gen_server::<Counter>();
2312 crate::test_complete!("gen_server_trait_bounds");
2313 }
2314
2315 #[test]
2316 fn gen_server_spawn_and_cast() {
2317 init_test("gen_server_spawn_and_cast");
2318
2319 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2320 let region = runtime.state.create_root_region(Budget::INFINITE);
2321 let cx = Cx::for_testing();
2322 let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2323
2324 let (handle, stored) = scope
2325 .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
2326 .unwrap();
2327 let task_id = handle.task_id();
2328 runtime.state.store_spawned_task(task_id, stored);
2329
2330 handle.try_cast(CounterCast::Reset).unwrap();
2332
2333 drop(handle);
2335
2336 {
2337 runtime.scheduler.lock().schedule(task_id, 0);
2338 }
2339 runtime.run_until_quiescent();
2340
2341 crate::test_complete!("gen_server_spawn_and_cast");
2342 }
2343
2344 #[test]
2345 fn gen_server_spawn_and_call() {
2346 init_test("gen_server_spawn_and_call");
2347
2348 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
2349 let region = runtime.state.create_root_region(Budget::INFINITE);
2350 let cx = Cx::for_testing();
2351 let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2352
2353 let (handle, stored) = scope
2354 .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
2355 .unwrap();
2356 let server_task_id = handle.task_id();
2357 runtime.state.store_spawned_task(server_task_id, stored);
2358
2359 let server_ref = handle.server_ref();
2360 let (mut client_handle, client_stored) = scope
2361 .spawn(&mut runtime.state, &cx, move |cx| async move {
2362 server_ref.call(&cx, CounterCall::Add(5)).await.unwrap()
2363 })
2364 .unwrap();
2365 let client_task_id = client_handle.task_id();
2366 runtime
2367 .state
2368 .store_spawned_task(client_task_id, client_stored);
2369
2370 {
2371 runtime.scheduler.lock().schedule(server_task_id, 0);
2372 }
2373 {
2374 runtime.scheduler.lock().schedule(client_task_id, 0);
2375 }
2376 runtime.run_until_quiescent();
2377
2378 let result =
2379 futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
2380 assert_eq!(result, 5);
2381
2382 crate::test_complete!("gen_server_spawn_and_call");
2383 }
2384
2385 #[test]
2386 fn gen_server_init_runs_before_queued_call_under_lab_runtime() {
2387 init_test("gen_server_init_runs_before_queued_call_under_lab_runtime");
2388
2389 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(0x6E57_1001));
2390 let region = runtime.state.create_root_region(Budget::INFINITE);
2391 let cx = Cx::for_testing();
2392 let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
2393 let started = Arc::new(AtomicU8::new(0));
2394 let checkpoints = Arc::new(Mutex::new(Vec::new()));
2395
2396 let (mut handle, stored) = scope
2397 .spawn_gen_server(
2398 &mut runtime.state,
2399 &cx,
2400 InitProbe {
2401 started: Arc::clone(&started),
2402 checkpoints: Arc::clone(&checkpoints),
2403 },
2404 8,
2405 )
2406 .expect("spawn should succeed");
2407 let server_task_id = handle.task_id();
2408 runtime.state.store_spawned_task(server_task_id, stored);
2409
2410 let server_ref = handle.server_ref();
2411 let checkpoints_for_client = Arc::clone(&checkpoints);
2412 let (mut client_handle, client_stored) = scope
2413 .spawn(&mut runtime.state, &cx, move |cx| async move {
2414 let started = server_ref
2415 .call(&cx, InitProbeCall::GetStarted)
2416 .await
2417 .expect("init probe call should succeed");
2418 let event = serde_json::json!({
2419 "phase": "client_completed",
2420 "started": started,
2421 });
2422 tracing::info!(event = %event, "gen_server_lab_checkpoint");
2423 checkpoints_for_client.lock().push(event);
2424 started
2425 })
2426 .expect("client spawn should succeed");
2427 let client_task_id = client_handle.task_id();
2428 runtime
2429 .state
2430 .store_spawned_task(client_task_id, client_stored);
2431
2432 {
2433 runtime.scheduler.lock().schedule(client_task_id, 0);
2434 }
2435 runtime.run_until_idle();
2436 {
2437 runtime.scheduler.lock().schedule(server_task_id, 0);
2438 }
2439 runtime.run_until_idle();
2440 {
2441 runtime.scheduler.lock().schedule(client_task_id, 0);
2442 }
2443 runtime.run_until_idle();
2444
2445 let call_saw_initialized =
2446 futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
2447 crate::assert_with_log!(
2448 call_saw_initialized,
2449 "queued call observes completed gen_server init",
2450 true,
2451 call_saw_initialized
2452 );
2453 crate::assert_with_log!(
2454 started.load(Ordering::SeqCst) == 1,
2455 "on_start marks server initialized",
2456 1,
2457 started.load(Ordering::SeqCst)
2458 );
2459
2460 handle.stop();
2461 {
2462 runtime.scheduler.lock().schedule(server_task_id, 0);
2463 }
2464 runtime.run_until_quiescent();
2465
2466 let server = futures_lite::future::block_on(handle.join(&cx)).expect("server join ok");
2467 crate::assert_with_log!(
2468 server.started.load(Ordering::SeqCst) == 1,
2469 "joined server preserves initialized state",
2470 1,
2471 server.started.load(Ordering::SeqCst)
2472 );
2473
2474 let checkpoint_snapshot = checkpoints.lock().clone();
2475 crate::assert_with_log!(
2476 checkpoint_snapshot.len() == 3,
2477 "lab runtime emits init/call/client checkpoints",
2478 3,
2479 checkpoint_snapshot.len()
2480 );
2481 crate::assert_with_log!(
2482 checkpoint_snapshot[0]["phase"] == "on_start",
2483 "on_start checkpoint recorded first",
2484 "on_start",
2485 checkpoint_snapshot[0]["phase"].clone()
2486 );
2487 crate::assert_with_log!(
2488 runtime.is_quiescent(),
2489 "gen_server init test reaches lab quiescence",
2490 true,
2491 runtime.is_quiescent()
2492 );
2493
2494 crate::test_complete!("gen_server_init_runs_before_queued_call_under_lab_runtime");
2495 }

#[test]
#[allow(clippy::items_after_statements, clippy::too_many_lines)]
fn gen_server_spawn_inherits_full_child_cx_capabilities() {
    use crate::cx::registry::RegistryHandle;
    use crate::evidence_sink::{CollectorSink, EvidenceSink};
    use crate::observability::{LogCollector, LogLevel};
    use crate::remote::{NodeId, RemoteCap};
    use franken_evidence::EvidenceLedgerBuilder;

    init_test("gen_server_spawn_inherits_full_child_cx_capabilities");

    #[derive(Debug, Default)]
    #[allow(clippy::struct_excessive_bools)]
    struct CapabilityProbe {
        has_timer: bool,
        has_io_driver: bool,
        has_registry: bool,
        has_remote: bool,
        has_blocking_pool: bool,
        has_log_collector: bool,
        remote_origin: Option<String>,
        logical_tick_advanced: bool,
    }

    impl GenServer for CapabilityProbe {
        type Call = ();
        type Reply = ();
        type Cast = ();
        type Info = SystemMsg;

        fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.has_timer = cx.has_timer();
            self.has_io_driver = cx.io_driver_handle().is_some();
            self.has_registry = cx.registry_handle().is_some();
            self.has_remote = cx.has_remote();
            self.has_blocking_pool = cx.blocking_pool_handle().is_some();
            self.has_log_collector = cx.log_collector().is_some();
            self.remote_origin = cx.remote().map(|remote| remote.local_node().to_string());
            let before = cx.logical_now();
            let after = cx.logical_tick();
            self.logical_tick_advanced = after > before;
            cx.trace("gen_server_capability_probe_trace");
            let entry = EvidenceLedgerBuilder::new()
                .ts_unix_ms(1_700_000_000_000)
                .component("gen_server_capability_probe")
                .action("on_start")
                .posterior(vec![0.6, 0.4])
                .chosen_expected_loss(0.1)
                .calibration_score(0.85)
                .build()
                .expect("evidence entry");
            cx.emit_evidence(&entry);
            Box::pin(async {})
        }

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _request: (),
            reply: Reply<()>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _ = reply.send(());
            Box::pin(async {})
        }
    }

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let clock = Arc::new(crate::time::VirtualClock::new());
    runtime
        .state
        .set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));

    let registry = crate::cx::NameRegistry::new();
    let registry_handle = RegistryHandle::new(Arc::new(registry));
    let sink = Arc::new(CollectorSink::new());
    let collector = LogCollector::new(16).with_min_level(LogLevel::Trace);
    let blocking_pool = crate::runtime::blocking_pool::BlockingPool::new(1, 1);
    let cx = Cx::for_testing()
        .with_registry_handle(Some(registry_handle))
        .with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("origin-test")))
        .with_blocking_pool_handle(Some(blocking_pool.handle()))
        .with_evidence_sink(Some(sink.clone() as Arc<dyn EvidenceSink>));
    cx.set_log_collector(collector.clone());

    let region = runtime.state.create_root_region(Budget::INFINITE);
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let (mut handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, CapabilityProbe::default(), 8)
        .expect("spawn should succeed");
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_idle();

    handle.stop();
    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_quiescent();

    let server = futures_lite::future::block_on(handle.join(&cx)).expect("join ok");
    assert!(
        server.has_timer,
        "gen server child cx must inherit timer driver"
    );
    assert!(
        server.has_io_driver,
        "gen server child cx must inherit the runtime I/O driver",
    );
    assert!(
        server.has_registry,
        "gen server child cx must inherit registry handle",
    );
    assert!(
        server.has_remote,
        "gen server child cx must inherit remote cap"
    );
    assert!(
        server.has_blocking_pool,
        "gen server child cx must inherit blocking-pool capability",
    );
    assert!(
        server.has_log_collector,
        "gen server child cx must inherit observability collector state",
    );
    assert_eq!(server.remote_origin.as_deref(), Some("Node(origin-test)"));
    assert!(
        server.logical_tick_advanced,
        "gen server child cx must inherit a live logical clock",
    );
    let entries = sink.entries();
    assert_eq!(
        entries.len(),
        1,
        "gen server child cx must inherit evidence sink"
    );
    assert_eq!(entries[0].component, "gen_server_capability_probe");
    assert!(
        collector
            .peek()
            .iter()
            .any(|entry| entry.message() == "gen_server_capability_probe_trace"),
        "gen server child cx must inherit trace/log collector wiring",
    );

    crate::test_complete!("gen_server_spawn_inherits_full_child_cx_capabilities");
}
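
// Capability-inheritance note: the probe records each capability from the
// child Cx handed to on_start, so a regression in child-Cx construction
// shows up as one failed assert here rather than a hang elsewhere. A hedged
// sketch of the probing pattern for any future capability (the
// `has_foo`/`foo_handle` names are illustrative, not real APIs):
//
//     // inside on_start(&mut self, cx: &Cx):
//     self.has_foo = cx.foo_handle().is_some();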

#[test]
fn gen_server_call_cancellation_is_deterministic() {
    init_test("gen_server_call_cancellation_is_deterministic");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    let server_ref = handle.server_ref();

    let client_cx_cell: Arc<Mutex<Option<Cx>>> = Arc::new(Mutex::new(None));
    let client_cx_cell_for_task = Arc::clone(&client_cx_cell);

    let (mut client_handle, client_stored) = scope
        .spawn(&mut runtime.state, &cx, move |cx| async move {
            {
                *client_cx_cell_for_task.lock() = Some(cx.clone());
            }
            server_ref.call(&cx, CounterCall::Get).await
        })
        .unwrap();
    let client_task_id = client_handle.task_id();
    runtime
        .state
        .store_spawned_task(client_task_id, client_stored);

    // Run the client until its call is parked waiting on the server.
    {
        runtime.scheduler.lock().schedule(client_task_id, 0);
    }
    runtime.run_until_idle();

    // Cancel the parked call through the client's own Cx.
    let client_cx = client_cx_cell
        .lock()
        .as_ref()
        .expect("client cx published")
        .clone();
    client_cx.cancel_with(CancelKind::User, Some("gen_server call cancelled"));

    {
        runtime.scheduler.lock().schedule(client_task_id, 0);
    }
    runtime.run_until_idle();

    let result =
        futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
    match result {
        Ok(_) => unreachable!("expected cancellation, got Ok"),
        Err(CallError::Cancelled(reason)) => {
            assert_eq!(reason.kind, CancelKind::User);
            assert_eq!(
                reason.message,
                Some("gen_server call cancelled".to_string())
            );
        }
        Err(other) => unreachable!("expected CallError::Cancelled, got {other:?}"),
    }

    // Dropping the handle stops the server; drain it to quiescence.
    drop(handle);
    {
        runtime.scheduler.lock().schedule(server_task_id, 0);
    }
    runtime.run_until_quiescent();

    crate::test_complete!("gen_server_call_cancellation_is_deterministic");
}
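
// Cancellation of an in-flight call surfaces as CallError::Cancelled with
// the exact CancelKind and message passed to cancel_with, so callers can
// tell user-initiated cancellation apart from other shutdown paths. A
// hedged caller-side sketch (the helper names are illustrative only):
//
//     match server_ref.call(&cx, CounterCall::Get).await {
//         Ok(v) => handle_value(v),                            // illustrative
//         Err(CallError::Cancelled(reason)) => log_cancel(&reason), // ditto
//         Err(other) => return Err(other.into()),
//     }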

#[test]
fn supervised_gen_server_stays_alive() {
    init_test("supervised_gen_server_stays_alive");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
    let registry = Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));

    let mut starter =
        named_gen_server_start(Arc::clone(&registry), "persistent_service", 32, || {
            Counter { count: 0 }
        });

    let task_id = starter
        .start(&scope, &mut runtime.state, &cx)
        .expect("start ok");

    // Let the server initialize; it should then stay parked on its mailbox.
    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_idle();

    let task = runtime.state.task(task_id).expect("task exists");
    crate::assert_with_log!(
        !task.state.is_terminal(),
        "server should be alive",
        "Running",
        format!("{:?}", task.state)
    );

    // Cancel the owning region and drain; the registered name must go away.
    let tasks_to_schedule =
        runtime
            .state
            .cancel_request(region, &CancelReason::user("test done"), None);
    for (tid, priority) in tasks_to_schedule {
        runtime.scheduler.lock().schedule(tid, priority);
    }
    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_quiescent();

    assert!(
        registry.lock().whereis("persistent_service").is_none(),
        "name must be removed after region stop",
    );

    crate::test_complete!("supervised_gen_server_stays_alive");
}

#[test]
fn gen_server_cast_cancellation_is_deterministic() {
    init_test("gen_server_cast_cancellation_is_deterministic");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    // Capacity 1 so the prefill cast below leaves the mailbox full.
    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 1)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    let server_ref = handle.server_ref();

    futures_lite::future::block_on(handle.cast(&cx, CounterCast::Reset))
        .expect("prefill cast ok");

    let client_cx_cell: Arc<Mutex<Option<Cx>>> = Arc::new(Mutex::new(None));
    let client_cx_cell_for_task = Arc::clone(&client_cx_cell);

    let (mut client_handle, client_stored) = scope
        .spawn(&mut runtime.state, &cx, move |cx| async move {
            {
                *client_cx_cell_for_task.lock() = Some(cx.clone());
            }
            server_ref.cast(&cx, CounterCast::Reset).await
        })
        .unwrap();
    let client_task_id = client_handle.task_id();
    runtime
        .state
        .store_spawned_task(client_task_id, client_stored);

    // Run the client until its cast is parked on the full mailbox.
    {
        runtime.scheduler.lock().schedule(client_task_id, 0);
    }
    runtime.run_until_idle();

    let client_cx = client_cx_cell
        .lock()
        .as_ref()
        .expect("client cx published")
        .clone();
    client_cx.cancel_with(CancelKind::User, Some("gen_server cast cancelled"));

    {
        runtime.scheduler.lock().schedule(client_task_id, 0);
    }
    runtime.run_until_idle();

    let result =
        futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
    match result {
        Ok(()) => unreachable!("expected cancellation, got Ok"),
        Err(CastError::Cancelled(reason)) => {
            assert_eq!(reason.kind, CancelKind::User);
            assert_eq!(
                reason.message,
                Some("gen_server cast cancelled".to_string())
            );
        }
        Err(other) => unreachable!("expected CastError::Cancelled, got {other:?}"),
    }

    drop(handle);
    {
        runtime.scheduler.lock().schedule(server_task_id, 0);
    }
    runtime.run_until_quiescent();

    crate::test_complete!("gen_server_cast_cancellation_is_deterministic");
}

#[test]
fn gen_server_handle_accessors() {
    init_test("gen_server_handle_accessors");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 32)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    let _actor_id = handle.actor_id();
    let _task_id = handle.task_id();
    assert!(!handle.is_finished());

    let server_ref = handle.server_ref();
    assert_eq!(server_ref.actor_id(), handle.actor_id());
    assert!(server_ref.is_alive());
    assert!(!server_ref.is_closed());

    crate::test_complete!("gen_server_handle_accessors");
}

#[test]
fn gen_server_ref_is_cloneable() {
    init_test("gen_server_ref_is_cloneable");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 32)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    let ref1 = handle.server_ref();
    let ref2 = ref1.clone();
    assert_eq!(ref1.actor_id(), ref2.actor_id());

    crate::test_complete!("gen_server_ref_is_cloneable");
}

#[test]
fn gen_server_stop_transitions() {
    init_test("gen_server_stop_transitions");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    let server_ref = handle.server_ref();
    assert!(server_ref.is_alive());

    handle.stop();

    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_quiescent();

    assert!(handle.is_finished());
    assert!(!server_ref.is_alive());

    crate::test_complete!("gen_server_stop_transitions");
}
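
// Liveness is observable from both sides: the owning handle reports
// is_finished(), and every cloned GenServerRef flips is_alive() to false
// once the stop completes, so refs held by other tasks can detect
// shutdown without ever joining the handle.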

#[test]
fn gen_server_handle_rejects_call_and_cast_after_stop() {
    init_test("gen_server_handle_rejects_call_and_cast_after_stop");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    // Let the server start before stopping it.
    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_idle();
    handle.stop();

    let call_err =
        futures_lite::future::block_on(handle.call(&cx, CounterCall::Get)).unwrap_err();
    assert!(
        matches!(call_err, CallError::ServerStopped),
        "call after stop must return ServerStopped, got {call_err:?}"
    );

    let cast_err =
        futures_lite::future::block_on(handle.cast(&cx, CounterCast::Reset)).unwrap_err();
    assert!(
        matches!(cast_err, CastError::ServerStopped),
        "cast after stop must return ServerStopped, got {cast_err:?}"
    );

    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_quiescent();
    assert!(handle.is_finished());

    crate::test_complete!("gen_server_handle_rejects_call_and_cast_after_stop");
}

#[test]
fn gen_server_handle_join_returns_final_state_after_stop() {
    init_test("gen_server_handle_join_returns_final_state_after_stop");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let (mut handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    handle.stop();
    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_quiescent();
    assert!(handle.is_finished());

    let final_state = futures_lite::future::block_on(handle.join(&cx)).expect("join");
    assert_eq!(
        final_state.count, 0,
        "final server state should be returned"
    );

    crate::test_complete!("gen_server_handle_join_returns_final_state_after_stop");
}

#[test]
fn gen_server_handle_second_join_fails_closed() {
    init_test("gen_server_handle_second_join_fails_closed");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let (mut handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    handle.stop();
    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_quiescent();
    assert!(
        handle.is_finished(),
        "stopped server should report finished"
    );

    let final_state = futures_lite::future::block_on(handle.join(&cx)).expect("first join");
    assert_eq!(
        final_state.count, 0,
        "join should return final server state"
    );

    let second = futures_lite::future::block_on(handle.join(&cx));
    assert!(
        matches!(second, Err(JoinError::PolledAfterCompletion)),
        "second join must fail closed, got {second:?}"
    );

    crate::test_complete!("gen_server_handle_second_join_fails_closed");
}
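
// Join-after-stop semantics in two steps: the first join transfers
// ownership of the final server state to the caller; the second has
// nothing left to return and must fail closed with
// JoinError::PolledAfterCompletion rather than blocking or panicking.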

#[test]
fn gen_server_stop_wakes_blocked_mailbox_recv() {
    #[allow(clippy::items_after_statements)]
    struct StopWakeProbe {
        stop_ran: Arc<AtomicU8>,
    }

    #[allow(clippy::items_after_statements)]
    impl GenServer for StopWakeProbe {
        type Call = CounterCall;
        type Reply = u64;
        type Cast = CounterCast;
        type Info = SystemMsg;

        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.stop_ran.store(1, Ordering::SeqCst);
            Box::pin(async {})
        }

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _request: CounterCall,
            reply: Reply<u64>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _ = reply.send(0);
            Box::pin(async {})
        }
    }

    init_test("gen_server_stop_wakes_blocked_mailbox_recv");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let stop_ran = Arc::new(AtomicU8::new(0));
    let server = StopWakeProbe {
        stop_ran: Arc::clone(&stop_ran),
    };

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, server, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    // Park the server on an empty mailbox recv.
    {
        runtime.scheduler.lock().schedule(server_task_id, 0);
    }
    runtime.run_until_idle();

    handle.stop();
    // No reschedule here: stop itself must wake the parked recv.
    runtime.run_until_quiescent();

    assert_eq!(
        stop_ran.load(Ordering::SeqCst),
        1,
        "on_stop should run after stop wakes blocked recv"
    );
    assert!(handle.is_finished(), "server should finish after stop");

    crate::test_complete!("gen_server_stop_wakes_blocked_mailbox_recv");
}
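
// If stop failed to wake the recv-parked task, the run above would never
// reach quiescence; waking blocked mailbox receives on stop is what keeps
// shutdown deterministic instead of timing-dependent.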

#[test]
fn dropped_join_future_marks_server_stopping_like_abort() {
    init_test("dropped_join_future_marks_server_stopping_like_abort");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let final_count = Arc::new(AtomicU64::new(u64::MAX));
    let server = ObservableCounter {
        count: 0,
        final_count: Arc::clone(&final_count),
    };

    let (mut handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, server, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    {
        runtime.scheduler.lock().schedule(server_task_id, 0);
    }
    runtime.run_until_idle();
    assert_eq!(
        handle.state.load(),
        ActorState::Running,
        "server should be running before join drop requests abort"
    );

    drop(handle.join(&cx));

    assert_eq!(
        handle.state.load(),
        ActorState::Stopping,
        "dropping join future should mirror GenServerHandle::abort state transition"
    );

    runtime.run_until_quiescent();
    assert!(
        handle.is_finished(),
        "server should stop after join future drop"
    );
    assert_eq!(
        final_count.load(Ordering::SeqCst),
        0,
        "idle server should stop without processing phantom work"
    );

    crate::test_complete!("dropped_join_future_marks_server_stopping_like_abort");
}

// Counter that records its final count from on_stop, so tests can observe
// what state the server held when it shut down.
struct ObservableCounter {
    count: u64,
    final_count: Arc<AtomicU64>,
}

impl GenServer for ObservableCounter {
    type Call = CounterCall;
    type Reply = u64;
    type Cast = CounterCast;
    type Info = SystemMsg;

    fn handle_call(
        &mut self,
        _cx: &Cx,
        request: CounterCall,
        reply: Reply<u64>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        match request {
            CounterCall::Get => {
                let _ = reply.send(self.count);
            }
            CounterCall::Add(n) => {
                self.count += n;
                let _ = reply.send(self.count);
            }
        }
        Box::pin(async {})
    }

    fn handle_cast(
        &mut self,
        _cx: &Cx,
        msg: CounterCast,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        match msg {
            CounterCast::Reset => self.count = 0,
        }
        Box::pin(async {})
    }

    fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        self.final_count.store(self.count, Ordering::SeqCst);
        Box::pin(async {})
    }
}
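
// The on_stop snapshot pattern generalizes: any state a test needs after
// shutdown can be mirrored into a shared Arc in on_stop, since join() may
// not be available once the handle is gone. A hedged sketch for a server
// with richer state (`final_snapshot` and `state` are hypothetical fields):
//
//     fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
//         *self.final_snapshot.lock() = Some(self.state.clone());
//         Box::pin(async {})
//     }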

#[test]
fn gen_server_processes_casts_before_stop() {
    init_test("gen_server_processes_casts_before_stop");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let final_count = Arc::new(AtomicU64::new(u64::MAX));
    let server = ObservableCounter {
        count: 0,
        final_count: final_count.clone(),
    };

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, server, 32)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_idle();

    // Queue casts, then stop: the queued casts must drain before on_stop.
    for _ in 0..5 {
        handle.try_cast(CounterCast::Reset).expect("try_cast ok");
    }

    handle.stop();

    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_quiescent();

    assert_eq!(
        final_count.load(Ordering::SeqCst),
        0,
        "on_stop recorded final count"
    );

    crate::test_complete!("gen_server_processes_casts_before_stop");
}

#[test]
fn gen_server_deterministic_replay() {
    fn run_scenario(seed: u64) -> u64 {
        let config = crate::lab::LabConfig::new(seed);
        let mut runtime = crate::lab::LabRuntime::new(config);
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let final_count = Arc::new(AtomicU64::new(u64::MAX));
        let server = ObservableCounter {
            count: 0,
            final_count: final_count.clone(),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let task_id = handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        {
            runtime.scheduler.lock().schedule(task_id, 0);
        }
        runtime.run_until_idle();

        for _ in 0..5 {
            handle.try_cast(CounterCast::Reset).expect("try_cast ok");
        }
        handle.stop();

        {
            runtime.scheduler.lock().schedule(task_id, 0);
        }
        runtime.run_until_quiescent();

        final_count.load(Ordering::SeqCst)
    }

    init_test("gen_server_deterministic_replay");

    let result1 = run_scenario(0xCAFE_BABE);
    let result2 = run_scenario(0xCAFE_BABE);
    assert_eq!(result1, result2, "deterministic replay");

    crate::test_complete!("gen_server_deterministic_replay");
}
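
// The replay harness pattern: build the whole scenario inside a fn(seed)
// and assert that two runs agree. Any nondeterminism (map iteration order,
// real time, OS scheduling) shows up as a seed-stable diff here long before
// it becomes a flaky integration test. A hedged extension is to sweep
// several seeds:
//
//     for seed in [0xCAFE_BABE, 0xD00D_F00D, 42] {
//         assert_eq!(run_scenario(seed), run_scenario(seed));
//     }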

// Records every info message it handles, as a Debug string, for assertions.
#[derive(Default)]
struct InfoRecorder {
    seen: Arc<Mutex<Vec<String>>>,
}

impl GenServer for InfoRecorder {
    type Call = ();
    type Reply = ();
    type Cast = ();
    type Info = SystemMsg;

    fn handle_call(
        &mut self,
        _cx: &Cx,
        _request: (),
        reply: Reply<()>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        let _ = reply.send(());
        Box::pin(async {})
    }

    fn handle_info(
        &mut self,
        _cx: &Cx,
        msg: Self::Info,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        let seen = Arc::clone(&self.seen);
        Box::pin(async move {
            seen.lock().push(format!("{msg:?}"));
        })
    }
}

fn tid(n: u32) -> TaskId {
    TaskId::from_arena(ArenaIndex::new(n, 0))
}

fn rid(n: u32) -> crate::types::RegionId {
    crate::types::RegionId::from_arena(ArenaIndex::new(n, 0))
}

// Conformance: a mixed shutdown batch must sort by the SYS-ORDER key.
#[test]
fn conformance_system_msg_sort_key_orders_shutdown_batch() {
    init_test("conformance_system_msg_sort_key_orders_shutdown_batch");

    let mut monitors = crate::monitor::MonitorSet::new();
    let mref_down_6 = monitors.establish(tid(90), rid(0), tid(6));
    let mref_down_3 = monitors.establish(tid(91), rid(0), tid(3));

    let mut batch = SystemMsgBatch::new();
    batch.push(SystemMsg::Exit {
        exit_vt: Time::from_secs(10),
        from: tid(6),
        reason: DownReason::Normal,
    });
    batch.push(SystemMsg::Timeout {
        tick_vt: Time::from_secs(10),
        id: 4,
    });
    batch.push(SystemMsg::Down {
        completion_vt: Time::from_secs(10),
        notification: DownNotification {
            monitored: tid(6),
            reason: DownReason::Normal,
            monitor_ref: mref_down_6,
        },
    });
    batch.push(SystemMsg::Timeout {
        tick_vt: Time::from_secs(9),
        id: 99,
    });
    batch.push(SystemMsg::Down {
        completion_vt: Time::from_secs(10),
        notification: DownNotification {
            monitored: tid(3),
            reason: DownReason::Normal,
            monitor_ref: mref_down_3,
        },
    });
    batch.push(SystemMsg::Exit {
        exit_vt: Time::from_secs(10),
        from: tid(2),
        reason: DownReason::Normal,
    });
    batch.push(SystemMsg::Timeout {
        tick_vt: Time::from_secs(10),
        id: 1,
    });

    let sorted = batch.into_sorted();
    let keys: Vec<_> = sorted.iter().map(SystemMsg::sort_key).collect();

    assert_eq!(
        keys,
        vec![
            (Time::from_secs(9), 2, SystemMsgSubjectKey::TimeoutId(99)),
            (Time::from_secs(10), 0, SystemMsgSubjectKey::Task(tid(3))),
            (Time::from_secs(10), 0, SystemMsgSubjectKey::Task(tid(6))),
            (Time::from_secs(10), 1, SystemMsgSubjectKey::Task(tid(2))),
            (Time::from_secs(10), 1, SystemMsgSubjectKey::Task(tid(6))),
            (Time::from_secs(10), 2, SystemMsgSubjectKey::TimeoutId(1)),
            (Time::from_secs(10), 2, SystemMsgSubjectKey::TimeoutId(4)),
        ],
        "shutdown system-message ordering must follow SYS-ORDER"
    );

    crate::test_complete!("conformance_system_msg_sort_key_orders_shutdown_batch");
}
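
// Reading the expected keys: the sort key is (virtual time, class, subject),
// where class 0 = Down, 1 = Exit, 2 = Timeout. Earlier virtual time wins
// outright (the t=9 timeout leads despite being class 2); within a tick,
// Down precedes Exit precedes Timeout; and within a class, ties break on
// the subject key (task id or timeout id), which is why Task(3) sorts
// before Task(6).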

// Conformance: SystemMsgBatch::into_sorted must agree with an explicit
// sort_by_key over the same messages.
#[test]
fn conformance_system_msg_batch_matches_explicit_sort() {
    init_test("conformance_system_msg_batch_matches_explicit_sort");

    let mut monitors = crate::monitor::MonitorSet::new();
    let mref = monitors.establish(tid(77), rid(0), tid(8));

    let messages = vec![
        SystemMsg::Timeout {
            tick_vt: Time::from_secs(12),
            id: 4,
        },
        SystemMsg::Exit {
            exit_vt: Time::from_secs(11),
            from: tid(8),
            reason: DownReason::Error("boom".to_string()),
        },
        SystemMsg::Down {
            completion_vt: Time::from_secs(11),
            notification: DownNotification {
                monitored: tid(8),
                reason: DownReason::Normal,
                monitor_ref: mref,
            },
        },
        SystemMsg::Timeout {
            tick_vt: Time::from_secs(11),
            id: 2,
        },
    ];

    let mut batch = SystemMsgBatch::new();
    for msg in messages.clone() {
        batch.push(msg);
    }
    let batched = batch.into_sorted();

    let mut explicit = messages;
    explicit.sort_by_key(SystemMsg::sort_key);

    let batched_keys: Vec<_> = batched.iter().map(SystemMsg::sort_key).collect();
    let explicit_keys: Vec<_> = explicit.iter().map(SystemMsg::sort_key).collect();
    assert_eq!(batched_keys, explicit_keys);

    crate::test_complete!("conformance_system_msg_batch_matches_explicit_sort");
}

#[test]
fn system_msg_payload_types_roundtrip_via_conversions() {
    init_test("system_msg_payload_types_roundtrip_via_conversions");

    let mut monitors = crate::monitor::MonitorSet::new();
    let mref = monitors.establish(tid(7), rid(0), tid(8));

    let down = DownMsg::new(
        Time::from_secs(11),
        DownNotification {
            monitored: tid(8),
            reason: DownReason::Normal,
            monitor_ref: mref,
        },
    );
    let down_msg = SystemMsg::down(down.clone());
    let down_back = DownMsg::try_from(down_msg).expect("down conversion");
    assert_eq!(down_back.completion_vt, down.completion_vt);
    assert_eq!(
        down_back.notification.monitored,
        down.notification.monitored
    );
    assert_eq!(down_back.notification.reason, down.notification.reason);
    assert_eq!(
        down_back.notification.monitor_ref,
        down.notification.monitor_ref
    );

    let exit = ExitMsg::new(
        Time::from_secs(12),
        tid(9),
        DownReason::Error("boom".into()),
    );
    let exit_msg = SystemMsg::exit(exit.clone());
    let exit_back = ExitMsg::try_from(exit_msg).expect("exit conversion");
    assert_eq!(exit_back, exit);

    let timeout = TimeoutMsg::new(Time::from_secs(13), 42);
    let timeout_msg = SystemMsg::timeout(timeout);
    let timeout_back = TimeoutMsg::try_from(timeout_msg).expect("timeout conversion");
    assert_eq!(timeout_back, timeout);

    crate::test_complete!("system_msg_payload_types_roundtrip_via_conversions");
}

#[test]
fn system_msg_try_from_mismatch_returns_original_variant() {
    init_test("system_msg_try_from_mismatch_returns_original_variant");
    let mut monitors = crate::monitor::MonitorSet::new();
    let mref = monitors.establish(tid(10), rid(0), tid(1));

    let timeout = SystemMsg::Timeout {
        tick_vt: Time::from_secs(5),
        id: 99,
    };
    let err = DownMsg::try_from(timeout).expect_err("timeout is not down");
    assert!(matches!(err, SystemMsg::Timeout { id: 99, .. }));

    let down = SystemMsg::Down {
        completion_vt: Time::from_secs(6),
        notification: DownNotification {
            monitored: tid(1),
            reason: DownReason::Normal,
            monitor_ref: mref,
        },
    };
    let err = TimeoutMsg::try_from(down).expect_err("down is not timeout");
    assert!(matches!(err, SystemMsg::Down { .. }));

    crate::test_complete!("system_msg_try_from_mismatch_returns_original_variant");
}
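
// The TryFrom conversions use the mismatched SystemMsg itself as the error
// type, so a failed downcast hands the original message back intact and
// nothing is dropped on a variant mismatch:
//
//     match DownMsg::try_from(msg) {
//         Ok(down) => process_down(down),      // illustrative helper
//         Err(original) => requeue(original),  // ditto; message preserved
//     }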

#[test]
fn gen_server_handle_info_receives_system_messages() {
    init_test("gen_server_handle_info_receives_system_messages");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

    let seen: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let server = InfoRecorder {
        seen: Arc::clone(&seen),
    };

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, server, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    let mut monitors = crate::monitor::MonitorSet::new();
    let mref = monitors.establish(tid(10), rid(0), tid(11));

    handle
        .try_info(SystemMsg::Down {
            completion_vt: Time::from_secs(5),
            notification: DownNotification {
                monitored: tid(11),
                reason: DownReason::Normal,
                monitor_ref: mref,
            },
        })
        .unwrap();

    handle
        .try_info(SystemMsg::Exit {
            exit_vt: Time::from_secs(6),
            from: tid(12),
            reason: DownReason::Error("boom".into()),
        })
        .unwrap();

    handle
        .try_info(SystemMsg::Timeout {
            tick_vt: Time::from_secs(7),
            id: 123,
        })
        .unwrap();

    drop(handle);

    {
        runtime.scheduler.lock().schedule(server_task_id, 0);
    }
    runtime.run_until_quiescent();

    let seen = seen.lock();
    assert_eq!(seen.len(), 3);
    assert!(seen[0].contains("Down"));
    assert!(seen[1].contains("Exit"));
    assert!(seen[2].contains("Timeout"));
    drop(seen);

    crate::test_complete!("gen_server_handle_info_receives_system_messages");
}

#[test]
fn gen_server_info_ordering_is_deterministic_for_seed() {
    fn run_scenario(seed: u64) -> Vec<String> {
        let config = crate::lab::LabConfig::new(seed);
        let mut runtime = crate::lab::LabRuntime::new(config);
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let events: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let server = InfoRecorder {
            seen: Arc::clone(&events),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        let (client_a, stored_a) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                server_ref
                    .info(
                        &cx,
                        SystemMsg::Timeout {
                            tick_vt: Time::from_secs(2),
                            id: 1,
                        },
                    )
                    .await
                    .unwrap();
            })
            .unwrap();
        let task_id_a = client_a.task_id();
        runtime.state.store_spawned_task(task_id_a, stored_a);

        let server_ref_b = handle.server_ref();
        let (client_b, stored_b) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                server_ref_b
                    .info(
                        &cx,
                        SystemMsg::Timeout {
                            tick_vt: Time::from_secs(2),
                            id: 2,
                        },
                    )
                    .await
                    .unwrap();
            })
            .unwrap();
        let task_id_b = client_b.task_id();
        runtime.state.store_spawned_task(task_id_b, stored_b);

        // Schedule both clients and the server in a fixed order; the
        // seeded scheduler decides the actual interleaving.
        {
            runtime.scheduler.lock().schedule(task_id_a, 0);
        }
        {
            runtime.scheduler.lock().schedule(task_id_b, 0);
        }
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }

        runtime.run_until_quiescent();
        drop(handle);
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        events.lock().clone()
    }

    init_test("gen_server_info_ordering_is_deterministic_for_seed");

    let a = run_scenario(0xD00D_F00D);
    let b = run_scenario(0xD00D_F00D);
    assert_eq!(
        a, b,
        "system/info ordering must be deterministic for same seed"
    );

    crate::test_complete!("gen_server_info_ordering_is_deterministic_for_seed");
}
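
// Unlike the conformance tests above, this one does not pin a specific
// info order; it only requires that whatever interleaving the seeded
// scheduler picks is reproduced bit-for-bit on replay of the same seed.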

// Counter whose mailbox evicts the oldest pending cast instead of
// rejecting new ones when full.
struct DropOldestCounter {
    count: u64,
}

// Cast payload carrying a distinguishable value, so eviction order is
// observable from the final state.
#[derive(Debug, Clone)]
enum TaggedCast {
    Set(u64),
}

impl GenServer for DropOldestCounter {
    type Call = CounterCall;
    type Reply = u64;
    type Cast = TaggedCast;
    type Info = SystemMsg;

    fn cast_overflow_policy(&self) -> CastOverflowPolicy {
        CastOverflowPolicy::DropOldest
    }

    fn handle_call(
        &mut self,
        _cx: &Cx,
        request: CounterCall,
        reply: Reply<u64>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        match request {
            CounterCall::Get => {
                let _ = reply.send(self.count);
            }
            CounterCall::Add(n) => {
                self.count += n;
                let _ = reply.send(self.count);
            }
        }
        Box::pin(async {})
    }

    fn handle_cast(
        &mut self,
        _cx: &Cx,
        msg: TaggedCast,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        match msg {
            TaggedCast::Set(v) => self.count = v,
        }
        Box::pin(async {})
    }
}
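
// Opting into DropOldest is a one-method override: the default
// cast_overflow_policy() returns Reject (asserted in
// gen_server_default_overflow_policy_is_reject below), and a server flips
// the lossy behavior on for its own mailbox only. Calls and info messages
// are never evicted; the policy applies to casts alone, as the
// preservation tests below demonstrate.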

#[test]
fn gen_server_drop_oldest_policy_accessor() {
    init_test("gen_server_drop_oldest_policy_accessor");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 4)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    assert_eq!(
        handle.cast_overflow_policy(),
        CastOverflowPolicy::DropOldest
    );

    let server_ref = handle.server_ref();
    assert_eq!(
        server_ref.cast_overflow_policy(),
        CastOverflowPolicy::DropOldest
    );

    crate::test_complete!("gen_server_drop_oldest_policy_accessor");
}

#[test]
fn gen_server_drop_oldest_evicts_when_full() {
    init_test("gen_server_drop_oldest_evicts_when_full");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    // Capacity 2: two casts fill the mailbox.
    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 2)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    // Fill to capacity.
    handle.try_cast(TaggedCast::Set(10)).unwrap();
    handle.try_cast(TaggedCast::Set(20)).unwrap();

    // Each further cast succeeds by evicting the oldest queued cast.
    handle.try_cast(TaggedCast::Set(30)).unwrap();

    handle.try_cast(TaggedCast::Set(40)).unwrap();

    crate::test_complete!("gen_server_drop_oldest_evicts_when_full");
}

#[test]
fn gen_server_reject_policy_returns_full() {
    init_test("gen_server_reject_policy_returns_full");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    // Counter uses the default Reject policy; capacity 2.
    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 2)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    assert_eq!(handle.cast_overflow_policy(), CastOverflowPolicy::Reject);

    // Fill to capacity.
    handle.try_cast(CounterCast::Reset).unwrap();
    handle.try_cast(CounterCast::Reset).unwrap();

    // The third cast is rejected rather than evicting anything.
    let err = handle.try_cast(CounterCast::Reset).unwrap_err();
    assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");

    crate::test_complete!("gen_server_reject_policy_returns_full");
}
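
// Under Reject, CastError::Full is the backpressure signal: the sender
// decides whether to retry, drop, or escalate. A hedged sender-side sketch
// (the retry helpers are ours, not crate APIs):
//
//     match handle.try_cast(CounterCast::Reset) {
//         Ok(()) => {}
//         Err(CastError::Full) => schedule_retry(),    // illustrative
//         Err(CastError::ServerStopped) => give_up(),  // illustrative
//         Err(other) => panic!("unexpected: {other:?}"),
//     }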

#[test]
fn gen_server_drop_oldest_ref_also_evicts() {
    init_test("gen_server_drop_oldest_ref_also_evicts");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 2)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    let server_ref = handle.server_ref();

    // Fill to capacity through the ref.
    server_ref.try_cast(TaggedCast::Set(1)).unwrap();
    server_ref.try_cast(TaggedCast::Set(2)).unwrap();

    // Overflow through the ref must evict, same as through the handle.
    server_ref.try_cast(TaggedCast::Set(3)).unwrap();

    crate::test_complete!("gen_server_drop_oldest_ref_also_evicts");
}

#[test]
fn gen_server_drop_oldest_reserved_slots_returns_full() {
    init_test("gen_server_drop_oldest_reserved_slots_returns_full");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    // A reserved (not yet sent) slot is not an evictable message.
    let _permit = futures_lite::future::block_on(handle.sender.reserve(&cx)).unwrap();

    let err = handle.try_cast(TaggedCast::Set(1)).unwrap_err();
    assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");

    crate::test_complete!("gen_server_drop_oldest_reserved_slots_returns_full");
}

#[test]
fn gen_server_ref_drop_oldest_reserved_slots_returns_full() {
    init_test("gen_server_ref_drop_oldest_reserved_slots_returns_full");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);
    let server_ref = handle.server_ref();

    // Same invariant via the ref: a reserved slot cannot be evicted.
    let _permit = futures_lite::future::block_on(handle.sender.reserve(&cx)).unwrap();

    let err = server_ref.try_cast(TaggedCast::Set(1)).unwrap_err();
    assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");

    crate::test_complete!("gen_server_ref_drop_oldest_reserved_slots_returns_full");
}

// DropOldest may only evict casts: a queued call must survive overflow.
#[test]
fn gen_server_drop_oldest_preserves_queued_call_and_returns_full() {
    init_test("gen_server_drop_oldest_preserves_queued_call_and_returns_full");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let root = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    let (reply_tx, mut reply_rx) = session::tracked_oneshot::<u64>();
    let reply_permit = reply_tx.reserve(&cx);
    let call_envelope: Envelope<DropOldestCounter> = Envelope::Call {
        request: CounterCall::Get,
        reply_permit,
    };
    handle.sender.try_send(call_envelope).unwrap();

    let err = handle.try_cast(TaggedCast::Set(99)).unwrap_err();
    assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");

    handle.stop();
    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_quiescent();

    let recv = futures_lite::future::block_on(reply_rx.recv(&cx));
    assert_eq!(
        recv,
        Ok(0),
        "preserved queued call should still be serviced, got {recv:?}"
    );

    crate::test_complete!("gen_server_drop_oldest_preserves_queued_call_and_returns_full");
}
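
// The eviction rule in one line: DropOldest may consume only messages that
// are (a) already enqueued and (b) casts. Reserved permits, queued calls,
// and queued info messages all force the fallback to CastError::Full.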

#[test]
fn gen_server_drop_oldest_preserves_queued_info_and_returns_full() {
    init_test("gen_server_drop_oldest_preserves_queued_info_and_returns_full");

    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let root = runtime.state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    let info = SystemMsg::timeout(TimeoutMsg::new(Time::from_secs(1), 7));
    handle
        .sender
        .try_send(Envelope::Info { msg: info })
        .expect("queue info");

    let err = handle.try_cast(TaggedCast::Set(99)).unwrap_err();
    assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");

    handle.stop();
    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_quiescent();

    crate::test_complete!("gen_server_drop_oldest_preserves_queued_info_and_returns_full");
}

#[test]
fn gen_server_default_overflow_policy_is_reject() {
    init_test("gen_server_default_overflow_policy_is_reject");

    assert_eq!(CastOverflowPolicy::default(), CastOverflowPolicy::Reject);

    // The trait default matches the enum default.
    let counter = Counter { count: 0 };
    assert_eq!(counter.cast_overflow_policy(), CastOverflowPolicy::Reject);

    crate::test_complete!("gen_server_default_overflow_policy_is_reject");
}

#[test]
fn reply_debug_format() {
    init_test("reply_debug_format");

    let cx = Cx::for_testing();
    let (tx, _rx) = session::tracked_oneshot::<u64>();
    let permit = tx.reserve(&cx);
    let reply = Reply::new(&cx, permit);
    let debug_str = format!("{reply:?}");
    assert!(debug_str.contains("Reply"));
    assert!(debug_str.contains("pending"));

    // Consume the reply so the tracked permit is not dropped unused.
    let _ = reply.send(42);

    crate::test_complete!("reply_debug_format");
}

#[test]
fn gen_server_on_start_budget_priority_applied_and_restored() {
    init_test("gen_server_on_start_budget_priority_applied_and_restored");

    let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(budget);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, budget);

    let started_priority = Arc::new(AtomicU8::new(0));
    let loop_priority = Arc::new(AtomicU8::new(0));
    let server = StartBudgetProbe {
        started_priority: Arc::clone(&started_priority),
        loop_priority: Arc::clone(&loop_priority),
    };

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, server, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    let server_ref = handle.server_ref();
    let (client, stored_client) = scope
        .spawn(&mut runtime.state, &cx, move |cx| async move {
            let p = server_ref.call(&cx, CounterCall::Get).await.unwrap();
            assert_eq!(p, 10);
        })
        .unwrap();
    let client_task_id = client.task_id();
    runtime
        .state
        .store_spawned_task(client_task_id, stored_client);

    {
        runtime.scheduler.lock().schedule(server_task_id, 0);
    }
    {
        runtime.scheduler.lock().schedule(client_task_id, 0);
    }
    runtime.run_until_quiescent();

    assert_eq!(started_priority.load(Ordering::SeqCst), 200);
    assert_eq!(loop_priority.load(Ordering::SeqCst), 10);

    drop(handle);
    {
        runtime.scheduler.lock().schedule(server_task_id, 0);
    }
    runtime.run_until_quiescent();

    crate::test_complete!("gen_server_on_start_budget_priority_applied_and_restored");
}
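
// Two priorities are visible to the probe: 200 while on_start runs under
// the server's declared start budget, and the region's baseline 10 once
// the main receive loop takes over, confirming the phase budget is both
// applied and fully restored.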

#[test]
fn gen_server_on_stop_runs_masked_under_stop() {
    init_test("gen_server_on_stop_runs_masked_under_stop");

    let budget = Budget::new().with_poll_quota(10_000);
    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(budget);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, budget);

    let stop_checkpoint_ok = Arc::new(AtomicU8::new(0));
    let server = StopMaskProbe {
        stop_checkpoint_ok: Arc::clone(&stop_checkpoint_ok),
    };

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, server, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    // Stop before the server ever runs; on_stop must still execute masked.
    handle.stop();

    {
        runtime.scheduler.lock().schedule(server_task_id, 0);
    }
    runtime.run_until_quiescent();

    assert_eq!(stop_checkpoint_ok.load(Ordering::SeqCst), 1);

    crate::test_complete!("gen_server_on_stop_runs_masked_under_stop");
}

// Eviction under DropOldest should emit a trace event; this test drives
// the eviction path (the trace itself is not asserted here).
#[test]
fn cast_drop_oldest_emits_trace_on_eviction() {
    init_test("cast_drop_oldest_emits_trace_on_eviction");

    let budget = Budget::new().with_poll_quota(10_000);
    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(budget);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, budget);

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
        .unwrap();
    let task_id = handle.task_id();
    runtime.state.store_spawned_task(task_id, stored);

    // Let the server start and park on its mailbox.
    {
        runtime.scheduler.lock().schedule(task_id, 0);
    }
    runtime.run_until_idle();

    // The first cast fills the capacity-1 mailbox; the second evicts it.
    handle.try_cast(TaggedCast::Set(1)).unwrap();
    handle.try_cast(TaggedCast::Set(2)).unwrap();

    crate::test_complete!("cast_drop_oldest_emits_trace_on_eviction");
}

#[test]
fn cast_to_stopped_server_returns_error() {
    init_test("cast_to_stopped_server_returns_error");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 4)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    handle.stop();

    // Even try_cast (which never blocks) must observe the stop.
    let err = handle.try_cast(CounterCast::Reset).unwrap_err();
    assert!(
        matches!(err, CastError::ServerStopped),
        "expected ServerStopped, got {err:?}"
    );

    crate::test_complete!("cast_to_stopped_server_returns_error");
}

#[test]
fn cast_overflow_policy_display() {
    init_test("cast_overflow_policy_display");

    assert_eq!(format!("{}", CastOverflowPolicy::Reject), "Reject");
    assert_eq!(format!("{}", CastOverflowPolicy::DropOldest), "DropOldest");

    crate::test_complete!("cast_overflow_policy_display");
}

#[test]
fn cast_ref_reject_returns_full() {
    init_test("cast_ref_reject_returns_full");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 2)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    let server_ref = handle.server_ref();

    // Fill to capacity through the ref.
    server_ref.try_cast(CounterCast::Reset).unwrap();
    server_ref.try_cast(CounterCast::Reset).unwrap();

    let err = server_ref.try_cast(CounterCast::Reset).unwrap_err();
    assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");

    crate::test_complete!("cast_ref_reject_returns_full");
}

#[test]
fn cast_drop_oldest_ref_capacity_one() {
    init_test("cast_drop_oldest_ref_capacity_one");

    let mut state = RuntimeState::new();
    let root = state.create_root_region(Budget::INFINITE);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);

    let (handle, stored) = scope
        .spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
        .unwrap();
    state.store_spawned_task(handle.task_id(), stored);

    let server_ref = handle.server_ref();

    // With capacity 1, every cast after the first evicts its predecessor.
    server_ref.try_cast(TaggedCast::Set(100)).unwrap();
    server_ref.try_cast(TaggedCast::Set(200)).unwrap();
    server_ref.try_cast(TaggedCast::Set(300)).unwrap();

    crate::test_complete!("cast_drop_oldest_ref_capacity_one");
}

// Default lifecycle budgets: unbounded start, minimal stop.
#[test]
fn init_default_budget_is_infinite() {
    init_test("init_default_budget_is_infinite");
    let counter = Counter { count: 0 };
    assert_eq!(counter.on_start_budget(), Budget::INFINITE);
    crate::test_complete!("init_default_budget_is_infinite");
}

#[test]
fn terminate_default_budget_is_minimal() {
    init_test("terminate_default_budget_is_minimal");
    let counter = Counter { count: 0 };
    assert_eq!(counter.on_stop_budget(), Budget::MINIMAL);
    crate::test_complete!("terminate_default_budget_is_minimal");
}

// Pre-cancellation skips on_start but never on_stop.
#[test]
fn init_skipped_when_pre_cancelled_but_stop_runs() {
    #[allow(clippy::items_after_statements)]
    struct LifecycleProbe {
        init_ran: Arc<AtomicU8>,
        stop_ran: Arc<AtomicU8>,
    }

    #[allow(clippy::items_after_statements)]
    impl GenServer for LifecycleProbe {
        type Call = CounterCall;
        type Reply = u64;
        type Cast = CounterCast;
        type Info = SystemMsg;

        fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.init_ran.store(1, Ordering::SeqCst);
            Box::pin(async {})
        }

        fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            self.stop_ran.store(1, Ordering::SeqCst);
            Box::pin(async {})
        }

        fn handle_call(
            &mut self,
            _cx: &Cx,
            _request: CounterCall,
            reply: Reply<u64>,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
            let _ = reply.send(0);
            Box::pin(async {})
        }
    }

    init_test("init_skipped_when_pre_cancelled_but_stop_runs");

    let budget = Budget::new().with_poll_quota(10_000);
    let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
    let region = runtime.state.create_root_region(budget);
    let cx = Cx::for_testing();
    let scope = crate::cx::Scope::<FailFast>::new(region, budget);

    let init_ran = Arc::new(AtomicU8::new(0));
    let stop_ran = Arc::new(AtomicU8::new(0));

    let server = LifecycleProbe {
        init_ran: Arc::clone(&init_ran),
        stop_ran: Arc::clone(&stop_ran),
    };

    let (handle, stored) = scope
        .spawn_gen_server(&mut runtime.state, &cx, server, 32)
        .unwrap();
    let server_task_id = handle.task_id();
    runtime.state.store_spawned_task(server_task_id, stored);

    // Stop before the server's first poll: it starts pre-cancelled.
    handle.stop();

    {
        runtime.scheduler.lock().schedule(server_task_id, 0);
    }
    runtime.run_until_quiescent();

    assert_eq!(
        init_ran.load(Ordering::SeqCst),
        0,
        "init should be skipped when pre-cancelled"
    );
    assert_eq!(
        stop_ran.load(Ordering::SeqCst),
        1,
        "stop must run even when pre-cancelled"
    );

    crate::test_complete!("init_skipped_when_pre_cancelled_but_stop_runs");
}
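
// This asymmetry is the lifecycle contract: on_start is skippable work,
// while on_stop is a cleanup obligation that runs regardless of how the
// server reached shutdown (explicit stop, abort, or pre-cancellation).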
4350
4351 #[test]
4354 fn init_budget_consumption_propagates_to_main_budget() {
4355 #[allow(clippy::items_after_statements)]
4356 struct BudgetCheckProbe {
4357 loop_quota: Arc<AtomicU64>,
4358 }
4359
4360 #[allow(clippy::items_after_statements)]
4361 impl GenServer for BudgetCheckProbe {
4362 type Call = CounterCall;
4363 type Reply = u64;
4364 type Cast = CounterCast;
4365 type Info = SystemMsg;
4366
4367 fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
4368 let _ = cx.budget();
                Box::pin(async {})
            }

            fn on_start_budget(&self) -> Budget {
                Budget::new().with_poll_quota(50).with_priority(200)
            }

            fn handle_call(
                &mut self,
                cx: &Cx,
                _request: CounterCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.loop_quota
                    .store(u64::from(cx.budget().poll_quota), Ordering::SeqCst);
                let _ = reply.send(0);
                Box::pin(async {})
            }
        }

        init_test("init_budget_consumption_propagates_to_main_budget");

        let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let loop_quota = Arc::new(AtomicU64::new(0));

        let server = BudgetCheckProbe {
            loop_quota: Arc::clone(&loop_quota),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();
        let (client, stored_client) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let _ = server_ref.call(&cx, CounterCall::Get).await;
            })
            .unwrap();
        let client_task_id = client.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, stored_client);

        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        {
            runtime.scheduler.lock().schedule(client_task_id, 0);
        }
        runtime.run_until_quiescent();

        let remaining = loop_quota.load(Ordering::SeqCst);
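        // The handler recorded the main-loop quota it saw after init; init's
        // consumption is debited from the original 10_000, so the observed
        // value can never exceed it.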
        assert!(
            remaining <= 10_000,
            "main budget after init must be <= original ({remaining} <= 10000)"
        );
        assert!(
            remaining > 0,
            "main budget should still have polls remaining"
        );

        drop(handle);
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        crate::test_complete!("init_budget_consumption_propagates_to_main_budget");
    }

    #[test]
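    // The on_stop phase must run under the tighter budget returned by
    // on_stop_budget(), not the server's main-loop budget.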
    fn stop_budget_constrains_stop_phase() {
        struct StopBudgetProbe {
            stop_poll_quota: Arc<AtomicU64>,
        }

        impl GenServer for StopBudgetProbe {
            type Call = CounterCall;
            type Reply = u64;
            type Cast = CounterCast;
            type Info = SystemMsg;

            fn on_stop_budget(&self) -> Budget {
                Budget::new().with_poll_quota(42).with_priority(250)
            }

            fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.stop_poll_quota
                    .store(u64::from(cx.budget().poll_quota), Ordering::SeqCst);
                Box::pin(async {})
            }

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: CounterCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(0);
                Box::pin(async {})
            }
        }

        init_test("stop_budget_constrains_stop_phase");

        let budget = Budget::new().with_poll_quota(10_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let stop_poll_quota = Arc::new(AtomicU64::new(0));

        let server = StopBudgetProbe {
            stop_poll_quota: Arc::clone(&stop_poll_quota),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        handle.stop();
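        // Drive the shutdown; on_stop records the quota it actually ran with.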

        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        let stop_quota = stop_poll_quota.load(Ordering::SeqCst);
        assert_eq!(stop_quota, 42, "stop phase should use the tighter budget");

        crate::test_complete!("stop_budget_constrains_stop_phase");
    }

    #[test]
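    // Lifecycle ordering: whenever both phases run, on_start must be
    // observed strictly before on_stop.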
    fn lifecycle_init_before_stop() {
        #[allow(clippy::items_after_statements)]
        struct PhaseTracker {
            phases: Arc<Mutex<Vec<&'static str>>>,
        }

        #[allow(clippy::items_after_statements)]
        impl GenServer for PhaseTracker {
            type Call = CounterCall;
            type Reply = u64;
            type Cast = CounterCast;
            type Info = SystemMsg;

            fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.phases.lock().push("init");
                Box::pin(async {})
            }

            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.phases.lock().push("stop");
                Box::pin(async {})
            }

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: CounterCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(0);
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: CounterCast,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        init_test("lifecycle_init_before_stop");

        let budget = Budget::new().with_poll_quota(10_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let phases = Arc::new(Mutex::new(Vec::<&'static str>::new()));

        let server = PhaseTracker {
            phases: Arc::clone(&phases),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        {
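            // First run: the init phase executes.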
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_idle();

        let phases_clone = Arc::clone(&phases);
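        // Second run: request shutdown and drive the stop phase.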
        handle.stop();
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_idle();

        {
            let recorded = phases_clone.lock();

            assert!(
                recorded.contains(&"stop"),
                "stop phase must run, got {:?}",
                *recorded
            );

            if let Some(init_pos) = recorded.iter().position(|p| *p == "init") {
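                // init may be skipped entirely, but if it ran it must have
                // been recorded before stop.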
                let stop_pos = recorded.iter().position(|p| *p == "stop").unwrap();
                assert!(
                    init_pos < stop_pos,
                    "init must precede stop, got {:?}",
                    *recorded
                );
            }

            drop(recorded);
        }

        crate::test_complete!("lifecycle_init_before_stop");
    }

    #[test]
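    // Phase budgets take the maximum priority, so the stop phase must run
    // with at least the server's original priority.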
    fn stop_budget_priority_applied() {
        #[allow(clippy::items_after_statements)]
        struct StopPriorityProbe {
            stop_priority: Arc<AtomicU8>,
        }

        #[allow(clippy::items_after_statements)]
        impl GenServer for StopPriorityProbe {
            type Call = CounterCall;
            type Reply = u64;
            type Cast = CounterCast;
            type Info = SystemMsg;

            fn on_stop_budget(&self) -> Budget {
                Budget::new().with_poll_quota(200).with_priority(240)
            }

            fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.stop_priority
                    .store(cx.budget().priority, Ordering::SeqCst);
                Box::pin(async {})
            }

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: CounterCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(0);
                Box::pin(async {})
            }
        }

        init_test("stop_budget_priority_applied");

        let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let stop_priority = Arc::new(AtomicU8::new(0));

        let server = StopPriorityProbe {
            stop_priority: Arc::clone(&stop_priority),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        handle.stop();

        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        let actual_priority = stop_priority.load(Ordering::SeqCst);
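        // on_stop_budget raised the priority to 240; at minimum the original
        // priority of 10 must survive into the stop phase.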
        assert!(
            actual_priority >= 10,
            "stop priority must be at least the original ({actual_priority} >= 10)"
        );

        crate::test_complete!("stop_budget_priority_applied");
    }

    #[test]
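    // Conformance: stopping a server with calls still queued must propagate
    // cancellation to the queued callers rather than leaving them waiting.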
    fn conformance_cancel_propagation_to_queued_calls() {
        init_test("conformance_cancel_propagation_to_queued_calls");

        let budget = Budget::new().with_poll_quota(50_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 1)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref_1 = handle.server_ref();
        let server_ref_2 = handle.server_ref();

        let result_1: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
        let result_1_clone = Arc::clone(&result_1);
        let (c1_handle, c1_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let r = server_ref_1.call(&cx, CounterCall::Add(10)).await;
                *result_1_clone.lock() = Some(r);
            })
            .unwrap();
        let c1_id = c1_handle.task_id();
        runtime.state.store_spawned_task(c1_id, c1_stored);

        let result_2: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
        let result_2_clone = Arc::clone(&result_2);
        let (c2_handle, c2_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let r = server_ref_2.call(&cx, CounterCall::Add(20)).await;
                *result_2_clone.lock() = Some(r);
            })
            .unwrap();
        let c2_id = c2_handle.task_id();
        runtime.state.store_spawned_task(c2_id, c2_stored);

        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(c1_id, 0);
            sched.schedule(c2_id, 0);
        }
        runtime.run_until_idle();

        handle.stop();
        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(c1_id, 0);
            sched.schedule(c2_id, 0);
        }
        runtime.run_until_quiescent();

        drop(result_2.lock());

        crate::test_complete!("conformance_cancel_propagation_to_queued_calls");
    }

    #[test]
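    // Once a server has fully stopped, new casts must be rejected rather
    // than silently queued.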
    fn conformance_stopped_server_rejects_new_messages() {
        init_test("conformance_stopped_server_rejects_new_messages");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_idle();

        handle.stop();
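        // Drain the shutdown to completion before probing the mailbox.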
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        let cast_result = server_ref.try_cast(CounterCast::Reset);
        assert!(cast_result.is_err(), "cast to stopped server must fail");

        crate::test_complete!("conformance_stopped_server_rejects_new_messages");
    }

    #[test]
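    // Full lifecycle smoke test: casts and a call against a live server,
    // then a clean stop; no reply or send obligation may leak.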
    fn conformance_full_lifecycle_no_obligation_leaks() {
        init_test("conformance_full_lifecycle_no_obligation_leaks");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        server_ref.try_cast(CounterCast::Reset).unwrap();

        let call_result: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
        let call_result_clone = Arc::clone(&call_result);
        let server_ref_for_call = handle.server_ref();
        let (client, client_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let r = server_ref_for_call.call(&cx, CounterCall::Add(42)).await;
                *call_result_clone.lock() = Some(r);
            })
            .unwrap();
        let client_id = client.task_id();
        runtime.state.store_spawned_task(client_id, client_stored);

        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();

        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();

        let call_r = call_result.lock();
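        // Completion depends on interleaving; assert on the value only if
        // the call actually finished.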
        if let Some(ref r) = *call_r {
            match r {
                Ok(value) => assert_eq!(*value, 42, "counter should be 42 after Add(42)"),
                Err(e) => unreachable!("unexpected call error: {e:?}"),
            }
        }
        drop(call_r);

        server_ref.try_cast(CounterCast::Reset).unwrap();

        handle.stop();
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        crate::test_complete!("conformance_full_lifecycle_no_obligation_leaks");
    }

    #[test]
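    // Replay determinism: two runs with the same seed must produce
    // identical observable results.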
    #[allow(clippy::items_after_statements)]
    fn conformance_deterministic_replay_with_seed() {
        init_test("conformance_deterministic_replay_with_seed");

        fn run_scenario(seed: u64) -> Vec<u64> {
            let config = crate::lab::LabConfig::new(seed);
            let mut runtime = crate::lab::LabRuntime::new(config);
            let budget = Budget::new().with_poll_quota(100_000);
            let region = runtime.state.create_root_region(budget);
            let cx = Cx::for_testing();
            let scope = crate::cx::Scope::<FailFast>::new(region, budget);

            let (handle, stored) = scope
                .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
                .unwrap();
            let server_task_id = handle.task_id();
            runtime.state.store_spawned_task(server_task_id, stored);

            let results: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));

            let mut client_ids = Vec::new();
            for i in 1..=3u64 {
                let server_ref = handle.server_ref();
                let results_clone = Arc::clone(&results);
                let (ch, cs) = scope
                    .spawn(&mut runtime.state, &cx, move |cx| async move {
                        if let Ok(val) = server_ref.call(&cx, CounterCall::Add(i * 10)).await {
                            results_clone.lock().push(val);
                        }
                    })
                    .unwrap();
                let cid = ch.task_id();
                runtime.state.store_spawned_task(cid, cs);
                client_ids.push(cid);
            }

            {
                let mut sched = runtime.scheduler.lock();
                sched.schedule(server_task_id, 0);
                for &cid in &client_ids {
                    sched.schedule(cid, 0);
                }
            }
            runtime.run_until_idle();

            {
                let mut sched = runtime.scheduler.lock();
                sched.schedule(server_task_id, 0);
                for &cid in &client_ids {
                    sched.schedule(cid, 0);
                }
            }
            runtime.run_until_idle();

            handle.stop();
            {
                runtime.scheduler.lock().schedule(server_task_id, 0);
            }
            runtime.run_until_quiescent();

            results.lock().clone()
        }

        let run_a = run_scenario(42);
        let run_b = run_scenario(42);
        assert_eq!(
            run_a, run_b,
            "same seed must produce identical results: {run_a:?} vs {run_b:?}"
        );

        crate::test_complete!("conformance_deterministic_replay_with_seed");
    }

    #[test]
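    // Reject policy: a capacity-2 mailbox accepts exactly two casts and
    // deterministically refuses the third.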
    fn conformance_mailbox_overflow_reject_deterministic() {
        init_test("conformance_mailbox_overflow_reject_deterministic");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 2)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        server_ref.try_cast(CounterCast::Reset).unwrap();
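        // The second cast fills the two-slot mailbox to capacity.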
        server_ref.try_cast(CounterCast::Reset).unwrap();

        let overflow = server_ref.try_cast(CounterCast::Reset);
        assert!(
            overflow.is_err(),
            "third cast to capacity-2 mailbox must fail with Reject policy"
        );
        match overflow.unwrap_err() {
            CastError::Full => {}
            other => unreachable!("expected CastError::Full, got {other:?}"),
        }

        drop(handle);
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        crate::test_complete!("conformance_mailbox_overflow_reject_deterministic");
    }

    #[test]
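    // DropOldest policy: overflow evicts the oldest queued cast so the
    // newest message is the one that survives.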
    fn conformance_mailbox_drop_oldest_preserves_newest() {
        init_test("conformance_mailbox_drop_oldest_preserves_newest");

        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 2)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        server_ref.try_cast(TaggedCast::Set(1)).unwrap();
        server_ref.try_cast(TaggedCast::Set(2)).unwrap();

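        // A third cast overflows the two-slot mailbox; DropOldest evicts Set(1).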
        server_ref.try_cast(TaggedCast::Set(100)).unwrap();

        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_idle();

        let result_ref = handle.server_ref();
        let result: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
        let result_clone = Arc::clone(&result);
        let (ch, cs) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                if let Ok(val) = result_ref.call(&cx, CounterCall::Get).await {
                    *result_clone.lock() = Some(val);
                }
            })
            .unwrap();
        let cid = ch.task_id();
        runtime.state.store_spawned_task(cid, cs);

        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(cid, 0);
        }
        runtime.run_until_idle();
        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(cid, 0);
        }
        runtime.run_until_idle();

        assert_eq!(
            *result.lock(),
            Some(100),
            "DropOldest should evict oldest, keeping newest"
        );

        drop(handle);
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        crate::test_complete!("conformance_mailbox_drop_oldest_preserves_newest");
    }

    #[test]
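    // The server below aborts every reply, so a caller's call must fail
    // promptly instead of blocking until its budget-driven timeout.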
    #[allow(clippy::items_after_statements)]
    fn conformance_budget_driven_call_timeout() {
        struct SlowServer;
        impl GenServer for SlowServer {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _proof = reply.abort();
                Box::pin(async {})
            }
        }

        init_test("conformance_budget_driven_call_timeout");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, SlowServer, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        let call_result: Arc<Mutex<Option<Result<(), CallError>>>> = Arc::new(Mutex::new(None));
        let call_result_clone = Arc::clone(&call_result);
        let (ch, cs) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let r = server_ref.call(&cx, ()).await;
                *call_result_clone.lock() = Some(r);
            })
            .unwrap();
        let client_id = ch.task_id();
        runtime.state.store_spawned_task(client_id, cs);

        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();
        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();

        if let Some(ref result) = *call_result.lock() {
            assert!(result.is_err(), "aborted reply should result in call error");
        }

        handle.stop();
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        crate::test_complete!("conformance_budget_driven_call_timeout");
    }

    #[test]
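    // Reply linearity, commit side: send() must hand back a committed proof
    // exactly when a caller is still waiting for the reply.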
    #[allow(clippy::items_after_statements)]
    fn conformance_reply_linearity_send_commits() {
        struct ReplyTracker {
            committed: Arc<AtomicU8>,
        }

        impl GenServer for ReplyTracker {
            type Call = u64;
            type Reply = u64;
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                request: u64,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                match reply.send(request * 2) {
                    ReplyOutcome::Committed(_proof) => {
                        self.committed.store(1, Ordering::SeqCst);
                    }
                    ReplyOutcome::CallerGone => {
                        self.committed.store(2, Ordering::SeqCst);
                    }
                }
                Box::pin(async {})
            }
        }

        init_test("conformance_reply_linearity_send_commits");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let committed = Arc::new(AtomicU8::new(0));
        let server = ReplyTracker {
            committed: Arc::clone(&committed),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();
        let call_result: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
        let call_result_clone = Arc::clone(&call_result);

        let (ch, cs) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let r = server_ref.call(&cx, 21).await;
                *call_result_clone.lock() = Some(r);
            })
            .unwrap();
        let client_id = ch.task_id();
        runtime.state.store_spawned_task(client_id, cs);

        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();
        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();

        assert_eq!(
            committed.load(Ordering::SeqCst),
            1,
            "reply must be committed when caller is waiting"
        );

        {
            let r = call_result.lock();
            match r.as_ref() {
                Some(Ok(value)) => assert_eq!(*value, 42, "21 * 2 = 42"),
                other => unreachable!("expected Ok(42), got {other:?}"),
            }
        }

        handle.stop();
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        crate::test_complete!("conformance_reply_linearity_send_commits");
    }

    #[test]
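    // Reply linearity, abort side: abort() resolves the obligation cleanly
    // and the caller observes an error rather than a hang.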
    #[allow(clippy::items_after_statements)]
    fn conformance_reply_linearity_abort_is_clean() {
        struct AbortServer {
            aborted: Arc<AtomicU8>,
        }

        impl GenServer for AbortServer {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _proof = reply.abort();
                self.aborted.store(1, Ordering::SeqCst);
                Box::pin(async {})
            }
        }

        init_test("conformance_reply_linearity_abort_is_clean");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let aborted = Arc::new(AtomicU8::new(0));
        let server = AbortServer {
            aborted: Arc::clone(&aborted),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();
        let call_err: Arc<Mutex<Option<Result<(), CallError>>>> = Arc::new(Mutex::new(None));
        let call_err_clone = Arc::clone(&call_err);

        let (ch, cs) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                let r = server_ref.call(&cx, ()).await;
                *call_err_clone.lock() = Some(r);
            })
            .unwrap();
        let client_id = ch.task_id();
        runtime.state.store_spawned_task(client_id, cs);

        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();
        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_id, 0);
        }
        runtime.run_until_idle();

        assert_eq!(
            aborted.load(Ordering::SeqCst),
            1,
            "server must have called abort()"
        );

        {
            let r = call_err.lock();
            match r.as_ref() {
                Some(Err(_)) => {}
                other => unreachable!("expected call error after abort, got {other:?}"),
            }
        }

        handle.stop();
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        crate::test_complete!("conformance_reply_linearity_abort_is_clean");
    }

    #[test]
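    // A panic inside handle_call must surface as JoinError::Panicked on the
    // server handle and a closed reply on the caller, with no double panic.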
    #[allow(clippy::items_after_statements)]
    fn conformance_panicking_handle_call_returns_join_error_without_double_panic() {
        #[derive(Debug)]
        struct PanicOnCall;

        impl GenServer for PanicOnCall {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: (),
                _reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                std::panic::panic_any("intentional handle_call panic");
            }
        }

        init_test("conformance_panicking_handle_call_returns_join_error_without_double_panic");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let (mut handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, PanicOnCall, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();
        let (mut client_handle, client_stored) = scope
            .spawn(&mut runtime.state, &cx, move |cx| async move {
                server_ref.call(&cx, ()).await
            })
            .unwrap();
        let client_task_id = client_handle.task_id();
        runtime
            .state
            .store_spawned_task(client_task_id, client_stored);

        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_task_id, 0);
        }
        runtime.run_until_idle();
        {
            let mut sched = runtime.scheduler.lock();
            sched.schedule(server_task_id, 0);
            sched.schedule(client_task_id, 0);
        }
        runtime.run_until_idle();

        let join = futures_lite::future::block_on(handle.join(&cx));
        assert!(
            matches!(join, Err(JoinError::Panicked(_))),
            "panicking call handler should surface JoinError::Panicked"
        );

        let client_result =
            futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
        assert!(
            matches!(client_result, Err(CallError::NoReply)),
            "caller should observe closed reply after panic, got {client_result:?}"
        );

        crate::test_complete!(
            "conformance_panicking_handle_call_returns_join_error_without_double_panic"
        );
    }

    #[test]
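    // Casts accepted before a stop request must still be processed; on_stop
    // then observes the fully drained state.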
    #[allow(clippy::items_after_statements)]
    fn conformance_drain_processes_queued_casts_on_stop() {
        struct AccumulatorServer {
            sum: u64,
            final_sum: Arc<AtomicU64>,
        }

        enum AccumCall {
            #[allow(dead_code)]
            GetSum,
        }
        enum AccumCast {
            Add(u64),
        }

        impl GenServer for AccumulatorServer {
            type Call = AccumCall;
            type Reply = u64;
            type Cast = AccumCast;
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: AccumCall,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(self.sum);
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                msg: AccumCast,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                match msg {
                    AccumCast::Add(n) => self.sum += n,
                }
                Box::pin(async {})
            }

            fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.final_sum.store(self.sum, Ordering::SeqCst);
                Box::pin(async {})
            }
        }

        init_test("conformance_drain_processes_queued_casts_on_stop");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);

        let final_sum = Arc::new(AtomicU64::new(0));
        let server = AccumulatorServer {
            sum: 0,
            final_sum: Arc::clone(&final_sum),
        };

        let (handle, stored) = scope
            .spawn_gen_server(&mut runtime.state, &cx, server, 32)
            .unwrap();
        let server_task_id = handle.task_id();
        runtime.state.store_spawned_task(server_task_id, stored);

        let server_ref = handle.server_ref();

        server_ref.try_cast(AccumCast::Add(10)).unwrap();
        server_ref.try_cast(AccumCast::Add(20)).unwrap();
        server_ref.try_cast(AccumCast::Add(30)).unwrap();

        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_idle();

        handle.stop();
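        // Stop; on_stop must see the sum from every accepted cast.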
        {
            runtime.scheduler.lock().schedule(server_task_id, 0);
        }
        runtime.run_until_quiescent();

        let sum = final_sum.load(Ordering::SeqCst);
        assert_eq!(
            sum, 60,
            "server must drain queued casts before stopping: 10+20+30=60, got {sum}"
        );

        crate::test_complete!("conformance_drain_processes_queued_casts_on_stop");
    }

    #[test]
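    // Named servers: spawn_named_gen_server registers the name, whereis
    // resolves it while the server lives, and release_name frees it after
    // shutdown so the name can be reused.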
    fn named_server_register_and_whereis() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_register_and_whereis");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Counter(u64);

        #[allow(clippy::items_after_statements)]
        impl GenServer for Counter {
            type Call = u64;
            type Reply = u64;
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                request: u64,
                reply: Reply<u64>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                self.0 += request;
                let _ = reply.send(self.0);
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut named_handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "my_counter",
                Counter(0),
                32,
                now,
            )
            .unwrap();

        let task_id = named_handle.task_id();
        runtime.state.store_spawned_task(task_id, stored);

        assert_eq!(registry.whereis("my_counter"), Some(task_id));
        assert_eq!(named_handle.name(), "my_counter");

        named_handle.stop();
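        // Drain the shutdown, then release the name at the post-shutdown time.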
        {
            runtime.scheduler.lock().schedule(task_id, 0);
        }
        runtime.run_until_quiescent();
        let release_now = runtime.state.now;
        named_handle
            .release_name(&mut registry, release_now)
            .expect("release ok");
        assert!(
            registry.whereis("my_counter").is_none(),
            "name must be removed once shutdown completes",
        );

        let (mut restarted, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "my_counter",
                Counter(0),
                32,
                now,
            )
            .expect("name should be reusable after release_name");
        runtime
            .state
            .store_spawned_task(restarted.task_id(), stored);
        restarted.stop();
        {
            runtime.scheduler.lock().schedule(restarted.task_id(), 0);
        }
        runtime.run_until_quiescent();
        let restart_release_now = runtime.state.now;
        restarted
            .release_name(&mut registry, restart_release_now)
            .expect("restart cleanup ok");

        crate::test_complete!("named_server_register_and_whereis");
    }

    #[test]
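    // release_name must fail closed with StillRunning until the task has
    // actually finished, even after stop() has been requested.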
    fn named_server_release_name_requires_stopped_server() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_release_name_requires_stopped_server");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Noop;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Noop {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "still_running",
                Noop,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(handle.task_id(), stored);

        assert!(
            matches!(
                handle.release_name(&mut registry, now),
                Err(ReleaseNameError::StillRunning)
            ),
            "release_name must fail closed while the server is still live",
        );
        assert_eq!(
            registry.whereis("still_running"),
            Some(handle.task_id()),
            "premature release_name must not remove the registered name",
        );

        handle.stop();
        assert!(
            matches!(
                handle.release_name(&mut registry, now),
                Err(ReleaseNameError::StillRunning)
            ),
            "release_name must keep failing closed after stop() until the task actually finishes",
        );
        assert_eq!(
            registry.whereis("still_running"),
            Some(handle.task_id()),
            "release_name during shutdown drain must not remove the registered name",
        );
        {
            runtime.scheduler.lock().schedule(handle.task_id(), 0);
        }
        runtime.run_until_quiescent();
        let release_now = runtime.state.now;
        handle
            .release_name(&mut registry, release_now)
            .expect("release after shutdown should succeed");

        crate::test_complete!("named_server_release_name_requires_stopped_server");
    }

    #[test]
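    // A second spawn under an already-registered name must be rejected and
    // must not leave an orphaned task in the region.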
    fn named_server_duplicate_name_rejected() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_duplicate_name_rejected");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Dummy;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Dummy {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;

        let (mut h1, s1) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "singleton",
                Dummy,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(h1.task_id(), s1);

        let result = scope.spawn_named_gen_server(
            &mut runtime.state,
            &cx,
            &mut registry,
            "singleton",
            Dummy,
            8,
            now,
        );
        assert!(
            matches!(result, Err(NamedSpawnError::NameTaken(_))),
            "duplicate name should be rejected"
        );

        assert_eq!(registry.whereis("singleton"), Some(h1.task_id()));

        let region_tasks = runtime.state.region(region).unwrap().task_ids();
        assert_eq!(
            region_tasks,
            vec![h1.task_id()],
            "region should only have the first task; orphaned task must be removed"
        );

        h1.stop();
        runtime.scheduler.lock().schedule(h1.task_id(), 0);
        runtime.run_until_quiescent();
        let release_now = runtime.state.now;
        h1.release_name(&mut registry, release_now)
            .expect("release ok");

        crate::test_complete!("named_server_duplicate_name_rejected");
    }

    #[test]
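    // abort_lease removes exactly the aborted name; other names registered
    // for the same task must survive.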
    fn named_server_abort_lease_removes_name() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_abort_lease_removes_name");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Noop;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Noop {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _req: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "temp_name",
                Noop,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(handle.task_id(), stored);
        let mut alias = registry
            .register("temp_alias", handle.task_id(), scope.region_id(), now)
            .expect("second alias should register for same task");

        assert!(registry.whereis("temp_name").is_some());
        assert_eq!(registry.whereis("temp_alias"), Some(handle.task_id()));

        handle.abort_lease(&mut registry, now).unwrap();
        assert!(
            registry.whereis("temp_name").is_none(),
            "aborting the lease must remove the registry entry",
        );
        assert_eq!(
            registry.whereis("temp_alias"),
            Some(handle.task_id()),
            "aborting one named handle must not drop other names owned by the same task",
        );
        registry
            .unregister_owned_and_grant(&alias, now)
            .expect("manual alias cleanup should succeed");
        alias.abort().unwrap();

        crate::test_complete!("named_server_abort_lease_removes_name");
    }

    #[test]
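    // take_lease hands the raw name lease to the caller, who then owns the
    // full unregister-and-resolve lifecycle.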
    fn named_server_take_lease_manual_management() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_take_lease_manual_management");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Noop2;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Noop2 {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _req: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "manual_name",
                Noop2,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(handle.task_id(), stored);

        let mut lease = handle.take_lease().unwrap();
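        // A lease can be taken exactly once; afterwards the handle reports
        // the name as released.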
        assert!(handle.take_lease().is_none(), "second take returns None");

        assert_eq!(handle.name(), "(released)");

        registry
            .unregister_owned_and_grant(&lease, now)
            .expect("manual lease cleanup should remove the matching name");
        lease
            .abort()
            .expect("manual lease abort should resolve the token");
        assert!(
            registry.whereis("manual_name").is_none(),
            "manual lifecycle management must remove the registry entry as well as resolve the token",
        );

        crate::test_complete!("named_server_take_lease_manual_management");
    }

    #[test]
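    // Once the lease has been taken, release_name has nothing left to
    // resolve and must fail closed with AlreadyResolved.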
    fn named_server_release_name_after_take_lease_returns_already_resolved() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_release_name_after_take_lease_returns_already_resolved");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Noop3;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Noop3 {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _req: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "take_then_stop",
                Noop3,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(handle.task_id(), stored);

        let mut lease = handle.take_lease().expect("lease should be present");
        assert_eq!(
            handle.handle.state.load(),
            ActorState::Created,
            "taking the lease alone must not stop the server"
        );
        assert!(
            matches!(
                handle.release_name(&mut registry, now),
                Err(ReleaseNameError::Lease(
                    crate::cx::NameLeaseError::AlreadyResolved
                ))
            ),
            "release_name after take_lease must fail closed with AlreadyResolved",
        );
        assert_eq!(
            handle.handle.state.load(),
            ActorState::Created,
            "failed release_name after take_lease must not mutate actor state"
        );
        assert_eq!(
            registry.whereis("take_then_stop"),
            Some(handle.task_id()),
            "failed release_name after take_lease must not unregister the live name",
        );
        registry
            .unregister_owned_and_grant(&lease, now)
            .expect("manual cleanup should still be possible after failed release_name");
        lease.abort().unwrap();

        crate::test_complete!(
            "named_server_release_name_after_take_lease_returns_already_resolved"
        );
    }

    #[test]
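    // Same fail-closed contract for abort_lease after the lease was taken.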
    fn named_server_abort_lease_after_take_lease_returns_already_resolved() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_abort_lease_after_take_lease_returns_already_resolved");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Noop4;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Noop4 {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _req: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "take_then_abort",
                Noop4,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(handle.task_id(), stored);

        let mut lease = handle.take_lease().expect("lease should be present");
        let abort_err = handle.abort_lease(&mut registry, now).unwrap_err();
        assert_eq!(abort_err, crate::cx::NameLeaseError::AlreadyResolved);
        assert_eq!(
            registry.whereis("take_then_abort"),
            Some(handle.task_id()),
            "failed abort_lease after take_lease must not unregister the live name",
        );

        registry
            .unregister_owned_and_grant(&lease, now)
            .expect("manual cleanup should still be possible after failed abort_lease");
        lease.abort().unwrap();

        crate::test_complete!("named_server_abort_lease_after_take_lease_returns_already_resolved");
    }

    #[test]
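    // And after a successful release_name, a later abort_lease must report
    // AlreadyResolved without touching the registry.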
    fn named_server_abort_lease_after_release_name_returns_already_resolved() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_abort_lease_after_release_name_returns_already_resolved");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Noop4;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Noop4 {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _req: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "stop_then_abort",
                Noop4,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(handle.task_id(), stored);

        handle.stop();
        {
            runtime.scheduler.lock().schedule(handle.task_id(), 0);
        }
        runtime.run_until_quiescent();
        let release_now = runtime.state.now;
        handle
            .release_name(&mut registry, release_now)
            .expect("initial release should succeed");
        let abort_err = handle.abort_lease(&mut registry, now).unwrap_err();
        assert_eq!(abort_err, crate::cx::NameLeaseError::AlreadyResolved);
        assert!(
            registry.whereis("stop_then_abort").is_none(),
            "failed abort_lease after release_name must not mutate the registry entry",
        );

        crate::test_complete!(
            "named_server_abort_lease_after_release_name_returns_already_resolved"
        );
    }

    #[test]
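    // release_name targets a single entry: sibling names registered for the
    // same task must remain resolvable.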
    fn named_server_release_name_preserves_other_names_on_same_task() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_server_release_name_preserves_other_names_on_same_task");

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
        let region = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();
        let scope = crate::cx::Scope::<FailFast>::new(region, budget);
        let mut registry = crate::cx::NameRegistry::new();

        #[allow(clippy::items_after_statements)]
        #[derive(Debug)]
        struct Noop5;

        #[allow(clippy::items_after_statements)]
        impl GenServer for Noop5 {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _req: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let now = crate::types::Time::ZERO;
        let (mut handle, stored) = scope
            .spawn_named_gen_server(
                &mut runtime.state,
                &cx,
                &mut registry,
                "primary_name",
                Noop5,
                8,
                now,
            )
            .unwrap();
        runtime.state.store_spawned_task(handle.task_id(), stored);
        let mut alias = registry
            .register("secondary_name", handle.task_id(), scope.region_id(), now)
            .expect("second alias should register for same task");

        handle.stop();
        {
            runtime.scheduler.lock().schedule(handle.task_id(), 0);
        }
        runtime.run_until_quiescent();

        let release_now = runtime.state.now;
        handle
            .release_name(&mut registry, release_now)
            .expect("targeted release should succeed");
        assert!(
            registry.whereis("primary_name").is_none(),
            "release_name must remove the targeted registry entry",
        );
        assert_eq!(
            registry.whereis("secondary_name"),
            Some(handle.task_id()),
            "release_name must not remove unrelated names on the same task",
        );

        registry
            .unregister_owned_and_grant(&alias, release_now)
            .expect("manual alias cleanup should succeed");
        alias.release().unwrap();

        crate::test_complete!("named_server_release_name_preserves_other_names_on_same_task");
    }

    #[test]
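    // The named_gen_server_start helper ties the name's lifetime to the
    // supervised child: a supervised stop must clean the registry.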
6359 #[allow(clippy::items_after_statements)]
6360 fn named_start_helper_supervisor_stop_cleans_registry() {
6361 crate::test_utils::init_test_logging();
6362 crate::test_phase!("named_start_helper_supervisor_stop_cleans_registry");
6363
6364 #[derive(Debug)]
6365 struct Noop;
6366
6367 impl GenServer for Noop {
6368 type Call = ();
6369 type Reply = ();
6370 type Cast = ();
6371 type Info = SystemMsg;
6372
6373 fn handle_call(
6374 &mut self,
6375 _cx: &Cx,
6376 _request: (),
6377 reply: Reply<()>,
6378 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6379 let _ = reply.send(());
6380 Box::pin(async {})
6381 }
6382
6383 fn handle_cast(
6384 &mut self,
6385 _cx: &Cx,
6386 _msg: (),
6387 ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
6388 Box::pin(async {})
6389 }
6390 }
6391
6392 let registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>> =
6393 Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
6394
6395 let budget = Budget::new().with_poll_quota(100_000);
6396 let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
6397 let root = runtime.state.create_root_region(budget);
6398 let cx = Cx::for_testing();
6399
6400 let child = crate::supervision::ChildSpec::new(
6401 "svc_child",
6402 named_gen_server_start(Arc::clone(®istry), "svc", 16, || Noop),
6403 );
6404
6405 let compiled = crate::supervision::SupervisorBuilder::new("svc_supervisor")
6406 .child(child)
6407 .compile()
6408 .expect("compile supervisor");
6409
6410 let supervisor = compiled
6411 .spawn(&mut runtime.state, &cx, root, budget)
6412 .expect("spawn supervisor");
6413
6414 assert_eq!(supervisor.started.len(), 1, "exactly one started child");
6415 let child_task = supervisor.started[0].task_id;
6416 assert_eq!(registry.lock().whereis("svc"), Some(child_task));
6417
6418 let tasks_to_schedule = runtime.state.cancel_request(
6420 supervisor.region,
6421 &crate::types::CancelReason::user("stop"),
6422 None,
6423 );
6424 for (task_id, priority) in tasks_to_schedule {
6425 runtime.scheduler.lock().schedule(task_id, priority);
6426 }
6427 runtime.run_until_quiescent();
6428
6429 assert!(
6430 registry.lock().whereis("svc").is_none(),
6431 "name must be removed after supervised stop",
6432 );
6433
6434 crate::test_complete!("named_start_helper_supervisor_stop_cleans_registry");
6435 }
6436
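    // A named GenServer whose on_start panics must still have its registry
    // entry cleaned up once the supervisor's region is cancelled.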
    #[test]
    #[allow(clippy::items_after_statements)]
    fn named_start_helper_crash_then_stop_cleans_registry() {
        crate::test_utils::init_test_logging();
        crate::test_phase!("named_start_helper_crash_then_stop_cleans_registry");

        #[derive(Debug)]
        struct PanicOnStart;

        impl GenServer for PanicOnStart {
            type Call = ();
            type Reply = ();
            type Cast = ();
            type Info = SystemMsg;

            fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async move {
                    std::panic::panic_any("intentional start crash for registry cleanup test");
                })
            }

            fn handle_call(
                &mut self,
                _cx: &Cx,
                _request: (),
                reply: Reply<()>,
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                let _ = reply.send(());
                Box::pin(async {})
            }

            fn handle_cast(
                &mut self,
                _cx: &Cx,
                _msg: (),
            ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>> =
            Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));

        let budget = Budget::new().with_poll_quota(100_000);
        let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(7));
        let root = runtime.state.create_root_region(budget);
        let cx = Cx::for_testing();

        let child = crate::supervision::ChildSpec::new(
            "panic_child",
            named_gen_server_start(Arc::clone(&registry), "panic_svc", 8, || PanicOnStart),
        );

        let compiled = crate::supervision::SupervisorBuilder::new("panic_supervisor")
            .child(child)
            .compile()
            .expect("compile supervisor");

        let supervisor = compiled
            .spawn(&mut runtime.state, &cx, root, budget)
            .expect("spawn supervisor");

        let child_task = supervisor.started[0].task_id;
        assert_eq!(registry.lock().whereis("panic_svc"), Some(child_task));

        {
            runtime.scheduler.lock().schedule(child_task, 0);
        }
        runtime.run_until_idle();

        let tasks_to_schedule = runtime.state.cancel_request(
            supervisor.region,
            &crate::types::CancelReason::user("shutdown"),
            None,
        );
        for (task_id, priority) in tasks_to_schedule {
            runtime.scheduler.lock().schedule(task_id, priority);
        }
        runtime.run_until_quiescent();

        assert!(
            registry.lock().whereis("panic_svc").is_none(),
            "name must be removed after crash + region stop",
        );

        crate::test_complete!("named_start_helper_crash_then_stop_cleans_registry");
    }

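    // Trait-surface checks for CastOverflowPolicy: Default, Debug output,
    // and the Eq/Clone/Copy derives.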
    #[test]
    fn cast_overflow_policy_default_is_reject() {
        let policy = CastOverflowPolicy::default();
        assert!(matches!(policy, CastOverflowPolicy::Reject));
    }

    #[test]
    fn cast_overflow_policy_debug() {
        let dbg = format!("{:?}", CastOverflowPolicy::Reject);
        assert!(dbg.contains("Reject"), "{dbg}");
        let dbg2 = format!("{:?}", CastOverflowPolicy::DropOldest);
        assert!(dbg2.contains("DropOldest"), "{dbg2}");
    }

    #[test]
    fn cast_overflow_policy_eq_clone_copy() {
        let a = CastOverflowPolicy::Reject;
        let b = a;
        let c = a;
        assert_eq!(a, b);
        assert_eq!(a, c);
        assert_ne!(CastOverflowPolicy::Reject, CastOverflowPolicy::DropOldest);
    }

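    // DownMsg: constructor populates both fields, and the Debug and Clone
    // derives behave as expected.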
    #[test]
    fn down_msg_constructor_and_debug() {
        let mut monitors = crate::monitor::MonitorSet::new();
        let mref = monitors.establish(tid(50), rid(0), tid(51));
        let msg = DownMsg::new(
            Time::from_secs(7),
            DownNotification {
                monitored: tid(51),
                reason: DownReason::Normal,
                monitor_ref: mref,
            },
        );
        assert_eq!(msg.completion_vt, Time::from_secs(7));
        assert_eq!(msg.notification.monitored, tid(51));
        let dbg = format!("{msg:?}");
        assert!(dbg.contains("DownMsg"), "{dbg}");
    }

    #[test]
    fn down_msg_clone() {
        let mut monitors = crate::monitor::MonitorSet::new();
        let mref = monitors.establish(tid(60), rid(0), tid(61));
        let msg = DownMsg::new(
            Time::from_secs(8),
            DownNotification {
                monitored: tid(61),
                reason: DownReason::Normal,
                monitor_ref: mref,
            },
        );
        let cloned = msg.clone();
        assert_eq!(cloned.completion_vt, msg.completion_vt);
        assert_eq!(cloned.notification.monitored, msg.notification.monitored);
    }

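    // ExitMsg: construction, structural equality, Debug, and Clone.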
    #[test]
    fn exit_msg_constructor_and_eq() {
        let a = ExitMsg::new(Time::from_secs(5), tid(10), DownReason::Normal);
        let b = ExitMsg::new(Time::from_secs(5), tid(10), DownReason::Normal);
        assert_eq!(a, b);
    }

    #[test]
    fn exit_msg_debug_and_clone() {
        let msg = ExitMsg::new(
            Time::from_secs(6),
            tid(11),
            DownReason::Error("oops".into()),
        );
        let dbg = format!("{msg:?}");
        assert!(dbg.contains("ExitMsg"), "{dbg}");
        let cloned = msg.clone();
        assert_eq!(cloned, msg);
    }

    #[test]
    fn exit_msg_inequality() {
        let a = ExitMsg::new(Time::from_secs(1), tid(1), DownReason::Normal);
        let b = ExitMsg::new(Time::from_secs(2), tid(1), DownReason::Normal);
        assert_ne!(a, b);
    }

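    // TimeoutMsg: construction, Copy semantics, Debug, and inequality on id.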
    #[test]
    fn timeout_msg_constructor_eq_copy() {
        let a = TimeoutMsg::new(Time::from_secs(10), 42);
        let b = a;
        let c = a;
        assert_eq!(a, b);
        assert_eq!(a, c);
        assert_eq!(a.tick_vt, Time::from_secs(10));
        assert_eq!(a.id, 42);
    }

    #[test]
    fn timeout_msg_debug() {
        let msg = TimeoutMsg::new(Time::from_secs(1), 99);
        let dbg = format!("{msg:?}");
        assert!(dbg.contains("TimeoutMsg"), "{dbg}");
    }

    #[test]
    fn timeout_msg_inequality() {
        let a = TimeoutMsg::new(Time::from_secs(1), 1);
        let b = TimeoutMsg::new(Time::from_secs(1), 2);
        assert_ne!(a, b);
    }

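    // SystemMsg variants: Debug output, Clone, and the per-variant
    // convenience constructors (down/exit/timeout).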
    #[test]
    fn system_msg_debug_all_variants() {
        let mut monitors = crate::monitor::MonitorSet::new();
        let mref = monitors.establish(tid(70), rid(0), tid(71));

        let down = SystemMsg::Down {
            completion_vt: Time::from_secs(1),
            notification: DownNotification {
                monitored: tid(71),
                reason: DownReason::Normal,
                monitor_ref: mref,
            },
        };
        let exit = SystemMsg::Exit {
            exit_vt: Time::from_secs(2),
            from: tid(72),
            reason: DownReason::Normal,
        };
        let timeout = SystemMsg::Timeout {
            tick_vt: Time::from_secs(3),
            id: 55,
        };

        let d = format!("{down:?}");
        assert!(d.contains("Down"), "{d}");
        let e = format!("{exit:?}");
        assert!(e.contains("Exit"), "{e}");
        let t = format!("{timeout:?}");
        assert!(t.contains("Timeout"), "{t}");
    }

    #[test]
    fn system_msg_clone() {
        let msg = SystemMsg::Timeout {
            tick_vt: Time::from_secs(5),
            id: 7,
        };
        let cloned = msg.clone();
        assert_eq!(cloned.sort_key(), msg.sort_key());
    }

    #[test]
    fn system_msg_convenience_constructors() {
        let mut monitors = crate::monitor::MonitorSet::new();
        let mref = monitors.establish(tid(80), rid(0), tid(81));

        let down_payload = DownMsg::new(
            Time::from_secs(1),
            DownNotification {
                monitored: tid(81),
                reason: DownReason::Normal,
                monitor_ref: mref,
            },
        );
        let msg = SystemMsg::down(down_payload);
        assert!(matches!(msg, SystemMsg::Down { .. }));

        let exit_payload = ExitMsg::new(Time::from_secs(2), tid(82), DownReason::Normal);
        let msg = SystemMsg::exit(exit_payload);
        assert!(matches!(msg, SystemMsg::Exit { .. }));

        let timeout_payload = TimeoutMsg::new(Time::from_secs(3), 44);
        let msg = SystemMsg::timeout(timeout_payload);
        assert!(matches!(msg, SystemMsg::Timeout { .. }));
    }

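    // At equal virtual time, sort_key() must order variants by kind rank:
    // Down before Exit before Timeout.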
    #[test]
    fn system_msg_sort_key_kind_rank_ordering() {
        let mut monitors = crate::monitor::MonitorSet::new();
        let mref = monitors.establish(tid(85), rid(0), tid(86));

        let same_vt = Time::from_secs(100);
        let down = SystemMsg::Down {
            completion_vt: same_vt,
            notification: DownNotification {
                monitored: tid(86),
                reason: DownReason::Normal,
                monitor_ref: mref,
            },
        };
        let exit = SystemMsg::Exit {
            exit_vt: same_vt,
            from: tid(86),
            reason: DownReason::Normal,
        };
        let timeout = SystemMsg::Timeout {
            tick_vt: same_vt,
            id: 1,
        };

        assert!(down.sort_key() < exit.sort_key());
        assert!(exit.sort_key() < timeout.sort_key());
    }

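    // SystemMsgSubjectKey: equality across variants, Debug output,
    // Copy/Clone, and the derived ordering.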
    #[test]
    fn system_msg_subject_key_debug_eq_ord() {
        let a = SystemMsgSubjectKey::Task(tid(1));
        let b = SystemMsgSubjectKey::Task(tid(1));
        let c = SystemMsgSubjectKey::TimeoutId(1);

        assert_eq!(a, b);
        assert_ne!(a, c);

        let dbg = format!("{a:?}");
        assert!(dbg.contains("Task"), "{dbg}");
        let dbg2 = format!("{c:?}");
        assert!(dbg2.contains("TimeoutId"), "{dbg2}");

        let copied = a;
        let cloned = a;
        assert_eq!(copied, cloned);

        assert!(a <= b);
    }

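    // SystemMsgBatch: an empty batch sorts to nothing, Debug names the type,
    // and a single pushed element survives into_sorted().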
    #[test]
    fn system_msg_batch_default_and_empty() {
        let batch = SystemMsgBatch::new();
        let sorted = batch.into_sorted();
        assert!(sorted.is_empty());
    }

    #[test]
    fn system_msg_batch_debug() {
        let batch = SystemMsgBatch::new();
        let dbg = format!("{batch:?}");
        assert!(dbg.contains("SystemMsgBatch"), "{dbg}");
    }

    #[test]
    fn system_msg_batch_single_element() {
        let mut batch = SystemMsgBatch::new();
        batch.push(SystemMsg::Timeout {
            tick_vt: Time::from_secs(42),
            id: 1,
        });
        let sorted = batch.into_sorted();
        assert_eq!(sorted.len(), 1);
        assert!(matches!(sorted[0], SystemMsg::Timeout { id: 1, .. }));
    }

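    // Display, Debug, and std::error::Error coverage for the CallError,
    // CastError, and InfoError enums.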
    #[test]
    fn call_error_display_server_stopped() {
        let err = CallError::ServerStopped;
        let disp = format!("{err}");
        assert_eq!(disp, "GenServer has stopped");
    }

    #[test]
    fn call_error_display_no_reply() {
        let err = CallError::NoReply;
        let disp = format!("{err}");
        assert_eq!(disp, "GenServer did not reply");
    }

    #[test]
    fn call_error_display_cancelled() {
        let reason = CancelReason::user("test cancel");
        let err = CallError::Cancelled(reason);
        let disp = format!("{err}");
        assert!(disp.contains("cancelled"), "{disp}");
    }

    #[test]
    fn call_error_debug_all_variants() {
        let dbg1 = format!("{:?}", CallError::ServerStopped);
        assert!(dbg1.contains("ServerStopped"), "{dbg1}");
        let dbg2 = format!("{:?}", CallError::NoReply);
        assert!(dbg2.contains("NoReply"), "{dbg2}");
        let dbg3 = format!("{:?}", CallError::Cancelled(CancelReason::user("x")));
        assert!(dbg3.contains("Cancelled"), "{dbg3}");
    }

    #[test]
    fn call_error_is_std_error() {
        let err = CallError::ServerStopped;
        let _: &dyn std::error::Error = &err;
        assert!(std::error::Error::source(&err).is_none());
    }

    #[test]
    fn cast_error_display_all_variants() {
        assert_eq!(
            format!("{}", CastError::ServerStopped),
            "GenServer has stopped"
        );
        assert_eq!(format!("{}", CastError::Full), "GenServer mailbox full");
        let cancelled = CastError::Cancelled(CancelReason::user("t"));
        let disp = format!("{cancelled}");
        assert!(disp.contains("cancelled"), "{disp}");
    }

    #[test]
    fn cast_error_debug_all_variants() {
        let dbg1 = format!("{:?}", CastError::ServerStopped);
        assert!(dbg1.contains("ServerStopped"), "{dbg1}");
        let dbg2 = format!("{:?}", CastError::Full);
        assert!(dbg2.contains("Full"), "{dbg2}");
    }

    #[test]
    fn cast_error_is_std_error() {
        let err = CastError::Full;
        let _: &dyn std::error::Error = &err;
        assert!(std::error::Error::source(&err).is_none());
    }

    #[test]
    fn info_error_display_all_variants() {
        assert_eq!(
            format!("{}", InfoError::ServerStopped),
            "GenServer has stopped"
        );
        assert_eq!(format!("{}", InfoError::Full), "GenServer mailbox full");
        let cancelled = InfoError::Cancelled(CancelReason::user("u"));
        let disp = format!("{cancelled}");
        assert!(disp.contains("cancelled"), "{disp}");
    }

    #[test]
    fn info_error_debug_all_variants() {
        let dbg1 = format!("{:?}", InfoError::ServerStopped);
        assert!(dbg1.contains("ServerStopped"), "{dbg1}");
        let dbg2 = format!("{:?}", InfoError::Full);
        assert!(dbg2.contains("Full"), "{dbg2}");
    }

    #[test]
    fn info_error_is_std_error() {
        let err = InfoError::ServerStopped;
        let _: &dyn std::error::Error = &err;
        assert!(std::error::Error::source(&err).is_none());
    }

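    // TryFrom conversions between SystemMsg and the per-variant payload
    // types return the original message as the error on a variant mismatch.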
    #[test]
    fn system_msg_try_from_exit_rejects_timeout() {
        let timeout = SystemMsg::Timeout {
            tick_vt: Time::from_secs(1),
            id: 7,
        };
        let err = ExitMsg::try_from(timeout).expect_err("timeout is not exit");
        assert!(matches!(err, SystemMsg::Timeout { id: 7, .. }));
    }

    #[test]
    fn system_msg_try_from_exit_succeeds() {
        let exit = SystemMsg::Exit {
            exit_vt: Time::from_secs(3),
            from: tid(15),
            reason: DownReason::Normal,
        };
        let result = ExitMsg::try_from(exit).expect("exit conversion");
        assert_eq!(result.exit_vt, Time::from_secs(3));
        assert_eq!(result.from, tid(15));
    }

    #[test]
    fn system_msg_try_from_timeout_rejects_exit() {
        let exit = SystemMsg::Exit {
            exit_vt: Time::from_secs(1),
            from: tid(1),
            reason: DownReason::Normal,
        };
        let err = TimeoutMsg::try_from(exit).expect_err("exit is not timeout");
        assert!(matches!(err, SystemMsg::Exit { .. }));
    }
}