use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};
use crate::actor::{ActorId, ActorState};
use crate::channel::mpsc;
use crate::channel::oneshot;
use crate::channel::session::{self, TrackedOneshotPermit};
use crate::cx::Cx;
use crate::monitor::{DownNotification, DownReason};
use crate::obligation::graded::{AbortedProof, CommittedProof, SendPermit};
use crate::runtime::{JoinError, SpawnError};
use crate::types::{Budget, CancelReason, CxInner, Outcome, TaskId, Time};
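/// Scoped budget for a lifecycle phase such as `on_start` or `on_stop`.
///
/// On entry the current budget is met with the phase budget (keeping the
/// higher priority of the two) and installed as both the active budget and
/// the baseline. On drop, when `restore_original` is set, the original
/// budget is restored minus the polls and cost the phase consumed.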
struct PhaseBudgetGuard {
inner: Arc<parking_lot::RwLock<CxInner>>,
original_budget: Budget,
original_baseline: Budget,
phase_baseline: Budget,
restore_original: bool,
}
impl PhaseBudgetGuard {
fn enter(cx: &Cx, phase_budget: Budget, restore_original: bool) -> Self {
let inner = Arc::clone(&cx.inner);
let (original_budget, original_baseline, phase_baseline) = {
let mut guard = inner.write();
let original_budget = guard.budget;
let original_baseline = guard.budget_baseline;
let mut phase_baseline = original_budget.meet(phase_budget);
phase_baseline.priority = original_budget.priority.max(phase_budget.priority);
guard.budget = phase_baseline;
guard.budget_baseline = phase_baseline;
drop(guard);
(original_budget, original_baseline, phase_baseline)
};
Self {
inner,
original_budget,
original_baseline,
phase_baseline,
restore_original,
}
}
}
impl Drop for PhaseBudgetGuard {
fn drop(&mut self) {
if !self.restore_original {
return;
}
let mut guard = self.inner.write();
let phase_remaining = guard.budget;
let polls_used = self
.phase_baseline
.poll_quota
.saturating_sub(phase_remaining.poll_quota);
let cost_used = match (self.phase_baseline.cost_quota, phase_remaining.cost_quota) {
(Some(base), Some(rem)) => base.saturating_sub(rem),
_ => 0,
};
let restored_cost_quota = self
.original_budget
.cost_quota
.map(|orig| orig.saturating_sub(cost_used));
guard.budget = Budget {
deadline: self.original_budget.deadline,
poll_quota: self.original_budget.poll_quota.saturating_sub(polls_used),
cost_quota: restored_cost_quota,
priority: self.original_budget.priority,
};
guard.budget_baseline = self.original_baseline;
}
}
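/// Masks cancellation while held by bumping the context's mask depth; the
/// depth is decremented on drop. Entry asserts the `MAX_MASK_DEPTH` bound
/// (INV-MASK-BOUNDED) instead of silently saturating.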
struct AsyncMaskGuard {
inner: Arc<parking_lot::RwLock<CxInner>>,
}
impl AsyncMaskGuard {
fn enter(cx: &Cx) -> Self {
let inner = Arc::clone(&cx.inner);
{
let mut guard = inner.write();
assert!(
guard.mask_depth < crate::types::task_context::MAX_MASK_DEPTH,
"mask depth exceeded MAX_MASK_DEPTH ({}) in AsyncMaskGuard::enter: \
this violates INV-MASK-BOUNDED and prevents cancellation from ever \
being observed. Reduce nesting of masked sections.",
crate::types::task_context::MAX_MASK_DEPTH
);
guard.mask_depth += 1;
}
Self { inner }
}
}
impl Drop for AsyncMaskGuard {
fn drop(&mut self) {
let mut guard = self.inner.write();
guard.mask_depth = guard.mask_depth.saturating_sub(1);
}
}
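/// What to do when a cast arrives at a full mailbox: `Reject` fails the cast
/// with `CastError::Full`, while `DropOldest` evicts the oldest queued cast
/// (never a call or info envelope) to make room.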
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CastOverflowPolicy {
#[default]
Reject,
DropOldest,
}
impl std::fmt::Display for CastOverflowPolicy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Reject => write!(f, "Reject"),
Self::DropOldest => write!(f, "DropOldest"),
}
}
}
#[derive(Debug, Clone)]
pub struct DownMsg {
pub completion_vt: Time,
pub notification: DownNotification,
}
impl DownMsg {
#[must_use]
pub const fn new(completion_vt: Time, notification: DownNotification) -> Self {
Self {
completion_vt,
notification,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExitMsg {
pub exit_vt: Time,
pub from: TaskId,
pub reason: DownReason,
}
impl ExitMsg {
#[must_use]
pub const fn new(exit_vt: Time, from: TaskId, reason: DownReason) -> Self {
Self {
exit_vt,
from,
reason,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutMsg {
pub tick_vt: Time,
pub id: u64,
}
impl TimeoutMsg {
#[must_use]
pub const fn new(tick_vt: Time, id: u64) -> Self {
Self { tick_vt, id }
}
}
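/// System-level event delivered on a server's `Info` channel. Each variant
/// carries the virtual time of the underlying event so that batches can be
/// ordered deterministically via [`SystemMsg::sort_key`].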
#[derive(Debug, Clone)]
pub enum SystemMsg {
Down {
completion_vt: Time,
notification: DownNotification,
},
Exit {
exit_vt: Time,
from: TaskId,
reason: DownReason,
},
Timeout {
tick_vt: Time,
id: u64,
},
}
impl SystemMsg {
#[must_use]
pub fn down(msg: DownMsg) -> Self {
msg.into()
}
#[must_use]
pub fn exit(msg: ExitMsg) -> Self {
msg.into()
}
#[must_use]
pub fn timeout(msg: TimeoutMsg) -> Self {
msg.into()
}
const fn vt(&self) -> Time {
match self {
Self::Down { completion_vt, .. } => *completion_vt,
Self::Exit { exit_vt, .. } => *exit_vt,
Self::Timeout { tick_vt, .. } => *tick_vt,
}
}
const fn kind_rank(&self) -> u8 {
match self {
Self::Down { .. } => 0,
Self::Exit { .. } => 1,
Self::Timeout { .. } => 2,
}
}
const fn subject_key(&self) -> SystemMsgSubjectKey {
match self {
Self::Down { notification, .. } => SystemMsgSubjectKey::Task(notification.monitored),
Self::Exit { from, .. } => SystemMsgSubjectKey::Task(*from),
Self::Timeout { id, .. } => SystemMsgSubjectKey::TimeoutId(*id),
}
}
#[must_use]
pub const fn sort_key(&self) -> (Time, u8, SystemMsgSubjectKey) {
(self.vt(), self.kind_rank(), self.subject_key())
}
}
impl From<DownMsg> for SystemMsg {
fn from(value: DownMsg) -> Self {
Self::Down {
completion_vt: value.completion_vt,
notification: value.notification,
}
}
}
impl From<ExitMsg> for SystemMsg {
fn from(value: ExitMsg) -> Self {
Self::Exit {
exit_vt: value.exit_vt,
from: value.from,
reason: value.reason,
}
}
}
impl From<TimeoutMsg> for SystemMsg {
fn from(value: TimeoutMsg) -> Self {
Self::Timeout {
tick_vt: value.tick_vt,
id: value.id,
}
}
}
impl TryFrom<SystemMsg> for DownMsg {
type Error = SystemMsg;
fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
match value {
SystemMsg::Down {
completion_vt,
notification,
} => Ok(Self {
completion_vt,
notification,
}),
other => Err(other),
}
}
}
impl TryFrom<SystemMsg> for ExitMsg {
type Error = SystemMsg;
fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
match value {
SystemMsg::Exit {
exit_vt,
from,
reason,
} => Ok(Self {
exit_vt,
from,
reason,
}),
other => Err(other),
}
}
}
impl TryFrom<SystemMsg> for TimeoutMsg {
type Error = SystemMsg;
fn try_from(value: SystemMsg) -> Result<Self, Self::Error> {
match value {
SystemMsg::Timeout { tick_vt, id } => Ok(Self { tick_vt, id }),
other => Err(other),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SystemMsgSubjectKey {
Task(TaskId),
TimeoutId(u64),
}
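/// Accumulates system messages and yields them in deterministic order:
/// ascending virtual time, then kind (`Down` < `Exit` < `Timeout`), then
/// subject key.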
#[derive(Debug, Default)]
pub struct SystemMsgBatch {
entries: Vec<SystemMsg>,
}
impl SystemMsgBatch {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn push(&mut self, msg: SystemMsg) {
self.entries.push(msg);
}
#[must_use]
pub fn into_sorted(mut self) -> Vec<SystemMsg> {
self.entries.sort_by_key(SystemMsg::sort_key);
self.entries
}
}
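/// An OTP-style generic server. `handle_call` receives a single-use
/// [`Reply`] that it must `send` or `abort`; `handle_cast` and `handle_info`
/// are fire-and-forget and default to no-ops. The `on_start`/`on_stop` hooks
/// run under the phase budgets from `on_start_budget`/`on_stop_budget`.
///
/// A minimal illustrative sketch (the `Adder` type is hypothetical; compare
/// the `Counter` server in this module's tests):
///
/// ```ignore
/// struct Adder { total: u64 }
///
/// impl GenServer for Adder {
///     type Call = u64;   // amount to add
///     type Reply = u64;  // running total after the add
///     type Cast = ();
///     type Info = SystemMsg;
///
///     fn handle_call(
///         &mut self,
///         _cx: &Cx,
///         amount: u64,
///         reply: Reply<u64>,
///     ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
///         self.total += amount;
///         let _ = reply.send(self.total);
///         Box::pin(async {})
///     }
/// }
/// ```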
pub trait GenServer: Send + 'static {
type Call: Send + 'static;
type Reply: Send + 'static;
type Cast: Send + 'static;
type Info: Send + 'static;
fn handle_call(
&mut self,
cx: &Cx,
request: Self::Call,
reply: Reply<Self::Reply>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: Self::Cast,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
fn handle_info(
&mut self,
_cx: &Cx,
_msg: Self::Info,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
fn on_start_budget(&self) -> Budget {
Budget::INFINITE
}
fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
fn on_stop_budget(&self) -> Budget {
Budget::MINIMAL
}
fn cast_overflow_policy(&self) -> CastOverflowPolicy {
CastOverflowPolicy::Reject
}
}
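/// Single-use reply capability passed to `handle_call`. Consume it with
/// [`Reply::send`] or [`Reply::abort`]; dropping it without replying closes
/// the channel, so the caller observes `CallError::NoReply` (a drop during a
/// panic aborts the permit instead).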
pub struct Reply<R> {
cx: Cx,
permit: Option<TrackedOneshotPermit<R>>,
}
impl<R: Send + 'static> Reply<R> {
fn new(cx: &Cx, permit: TrackedOneshotPermit<R>) -> Self {
Self {
cx: cx.clone(),
permit: Some(permit),
}
}
pub fn send(mut self, value: R) -> ReplyOutcome {
let permit = self
.permit
.take()
.expect("Reply::send called after reply was already consumed");
match permit.send(value) {
Ok(proof) => {
self.cx.trace("gen_server::reply_committed");
ReplyOutcome::Committed(proof)
}
Err(_send_err) => {
self.cx.trace("gen_server::reply_caller_gone");
ReplyOutcome::CallerGone
}
}
}
#[must_use]
pub fn abort(mut self) -> AbortedProof<SendPermit> {
self.cx.trace("gen_server::reply_aborted");
self.permit
.take()
.expect("Reply::abort called after reply was already consumed")
.abort()
}
#[inline]
#[must_use]
pub fn is_closed(&self) -> bool {
self.permit
.as_ref()
.is_some_and(TrackedOneshotPermit::is_closed)
}
}
impl<R> Drop for Reply<R> {
fn drop(&mut self) {
let Some(permit) = self.permit.take() else {
return;
};
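// During a panic, abort the permit explicitly so the reply obligation is
// recorded as aborted; a plain drop just closes the channel and the caller
// sees `NoReply`.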
if std::thread::panicking() {
let _ = permit.abort();
} else {
drop(permit);
}
}
}
impl<R> std::fmt::Debug for Reply<R> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Reply")
.field("pending", &self.permit.is_some())
.finish_non_exhaustive()
}
}
pub enum ReplyOutcome {
Committed(CommittedProof<SendPermit>),
CallerGone,
}
impl std::fmt::Debug for ReplyOutcome {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Committed(_) => f.debug_tuple("Committed").finish(),
Self::CallerGone => write!(f, "CallerGone"),
}
}
}
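/// Internal mailbox message: a call (carrying its reply permit), a cast, or
/// an info message.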
enum Envelope<S: GenServer> {
Call {
request: S::Call,
reply_permit: TrackedOneshotPermit<S::Reply>,
},
Cast {
msg: S::Cast,
},
Info {
msg: S::Info,
},
}
impl<S: GenServer> std::fmt::Debug for Envelope<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Call { .. } => f.debug_struct("Envelope::Call").finish_non_exhaustive(),
Self::Cast { .. } => f.debug_struct("Envelope::Cast").finish_non_exhaustive(),
Self::Info { .. } => f.debug_struct("Envelope::Info").finish_non_exhaustive(),
}
}
}
struct GenServerCell<S: GenServer> {
mailbox: mpsc::Receiver<Envelope<S>>,
state: Arc<GenServerStateCell>,
_keep_alive: mpsc::Sender<Envelope<S>>,
}
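/// Lock-free actor-state cell shared between the server task and its
/// handles; the state is encoded as a `u8` with acquire/release ordering.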
#[derive(Debug)]
struct GenServerStateCell {
state: AtomicU8,
}
impl GenServerStateCell {
fn new(state: ActorState) -> Self {
Self {
state: AtomicU8::new(encode_actor_state(state)),
}
}
fn load(&self) -> ActorState {
decode_actor_state(self.state.load(Ordering::Acquire))
}
fn store(&self, state: ActorState) {
self.state
.store(encode_actor_state(state), Ordering::Release);
}
}
const fn encode_actor_state(state: ActorState) -> u8 {
match state {
ActorState::Created => 0,
ActorState::Running => 1,
ActorState::Stopping => 2,
ActorState::Stopped => 3,
}
}
const fn decode_actor_state(value: u8) -> ActorState {
match value {
0 => ActorState::Created,
1 => ActorState::Running,
2 => ActorState::Stopping,
_ => ActorState::Stopped,
}
}
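/// Owning handle returned by `spawn_gen_server`: issues calls, casts, and
/// infos, can request `stop`/`abort`, and is joined once to recover the
/// final server state. Dropping the handle does not by itself stop the
/// server, since the server task holds a keep-alive sender for its own
/// mailbox.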
#[derive(Debug)]
pub struct GenServerHandle<S: GenServer> {
actor_id: ActorId,
sender: mpsc::Sender<Envelope<S>>,
state: Arc<GenServerStateCell>,
task_id: TaskId,
receiver: oneshot::Receiver<Result<S, JoinError>>,
inner: std::sync::Weak<parking_lot::RwLock<CxInner>>,
completed: bool,
overflow_policy: CastOverflowPolicy,
}
#[derive(Debug)]
pub enum CallError {
ServerStopped,
NoReply,
Cancelled(CancelReason),
}
impl std::fmt::Display for CallError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ServerStopped => write!(f, "GenServer has stopped"),
Self::NoReply => write!(f, "GenServer did not reply"),
Self::Cancelled(reason) => write!(f, "GenServer call cancelled: {reason}"),
}
}
}
impl std::error::Error for CallError {}
#[derive(Debug)]
pub enum CastError {
ServerStopped,
Full,
Cancelled(CancelReason),
}
impl std::fmt::Display for CastError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ServerStopped => write!(f, "GenServer has stopped"),
Self::Full => write!(f, "GenServer mailbox full"),
Self::Cancelled(reason) => write!(f, "GenServer cast cancelled: {reason}"),
}
}
}
impl std::error::Error for CastError {}
#[derive(Debug)]
pub enum InfoError {
ServerStopped,
Full,
Cancelled(CancelReason),
}
impl std::fmt::Display for InfoError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ServerStopped => write!(f, "GenServer has stopped"),
Self::Full => write!(f, "GenServer mailbox full"),
Self::Cancelled(reason) => write!(f, "GenServer info cancelled: {reason}"),
}
}
}
impl std::error::Error for InfoError {}
impl<S: GenServer> GenServerHandle<S> {
pub async fn call(&self, cx: &Cx, request: S::Call) -> Result<S::Reply, CallError> {
if cx.checkpoint().is_err() {
cx.trace("gen_server::call_rejected_cancelled");
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
return Err(CallError::Cancelled(reason));
}
if matches!(
self.state.load(),
ActorState::Stopping | ActorState::Stopped
) {
cx.trace("gen_server::call_rejected_stopped");
return Err(CallError::ServerStopped);
}
let send_permit = match self.sender.reserve(cx).await {
Ok(p) => p,
Err(e) => {
let was_cancelled = matches!(e, mpsc::SendError::Cancelled(()));
if was_cancelled {
cx.trace("gen_server::call_send_cancelled");
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
return Err(CallError::Cancelled(reason));
}
cx.trace("gen_server::call_send_failed");
return Err(CallError::ServerStopped);
}
};
let (reply_tx, mut reply_rx) = session::tracked_oneshot::<S::Reply>();
let reply_permit: session::TrackedOneshotPermit<S::Reply> = reply_tx.reserve(cx);
let envelope: Envelope<S> = Envelope::Call {
request,
reply_permit,
};
if let Err(e) = send_permit.try_send(envelope) {
let envelope = match e {
mpsc::SendError::Disconnected(v)
| mpsc::SendError::Full(v)
| mpsc::SendError::Cancelled(v) => v,
};
if let Envelope::Call { reply_permit, .. } = envelope {
let _aborted = session::TrackedOneshotPermit::abort(reply_permit);
}
cx.trace("gen_server::call_send_failed");
return Err(CallError::ServerStopped);
}
cx.trace("gen_server::call_enqueued");
match reply_rx.recv(cx).await {
Ok(v) => Ok(v),
Err(oneshot::RecvError::Closed) => {
cx.trace("gen_server::call_no_reply");
Err(CallError::NoReply)
}
Err(oneshot::RecvError::Cancelled) => {
cx.trace("gen_server::call_reply_cancelled");
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
Err(CallError::Cancelled(reason))
}
Err(oneshot::RecvError::PolledAfterCompletion) => {
unreachable!("GenServer call awaits a fresh reply oneshot recv future")
}
}
}
pub async fn cast(&self, cx: &Cx, msg: S::Cast) -> Result<(), CastError> {
if cx.checkpoint().is_err() {
cx.trace("gen_server::cast_rejected_cancelled");
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
return Err(CastError::Cancelled(reason));
}
if matches!(
self.state.load(),
ActorState::Stopping | ActorState::Stopped
) {
cx.trace("gen_server::cast_rejected_stopped");
return Err(CastError::ServerStopped);
}
let envelope: Envelope<S> = Envelope::Cast { msg };
self.sender.send(cx, envelope).await.map_err(|e| match e {
mpsc::SendError::Cancelled(_) => {
cx.trace("gen_server::cast_send_cancelled");
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
CastError::Cancelled(reason)
}
mpsc::SendError::Disconnected(_) | mpsc::SendError::Full(_) => {
cx.trace("gen_server::cast_send_failed");
CastError::ServerStopped
}
})
}
pub fn try_cast(&self, msg: S::Cast) -> Result<(), CastError> {
if matches!(
self.state.load(),
ActorState::Stopping | ActorState::Stopped
) {
return Err(CastError::ServerStopped);
}
let envelope: Envelope<S> = Envelope::Cast { msg };
match self.overflow_policy {
CastOverflowPolicy::Reject => self.sender.try_send(envelope).map_err(|e| match e {
mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
CastError::ServerStopped
}
mpsc::SendError::Full(_) => CastError::Full,
}),
CastOverflowPolicy::DropOldest => {
match self.sender.send_evict_oldest_where(envelope, |queued| {
matches!(queued, Envelope::Cast { .. })
}) {
Ok(Some(evicted)) => {
debug_assert!(matches!(evicted, Envelope::Cast { .. }));
if let Some(cx) = Cx::current() {
cx.trace("gen_server::cast_evicted_oldest");
}
Ok(())
}
Ok(None) => Ok(()),
Err(mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_)) => {
Err(CastError::ServerStopped)
}
Err(mpsc::SendError::Full(_)) => Err(CastError::Full),
}
}
}
}
pub async fn info(&self, cx: &Cx, msg: S::Info) -> Result<(), InfoError> {
if cx.checkpoint().is_err() {
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
return Err(InfoError::Cancelled(reason));
}
if matches!(
self.state.load(),
ActorState::Stopping | ActorState::Stopped
) {
cx.trace("gen_server::info_rejected_stopped");
return Err(InfoError::ServerStopped);
}
let envelope: Envelope<S> = Envelope::Info { msg };
self.sender.send(cx, envelope).await.map_err(|e| match e {
mpsc::SendError::Cancelled(_) => {
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
InfoError::Cancelled(reason)
}
mpsc::SendError::Disconnected(_) => InfoError::ServerStopped,
mpsc::SendError::Full(_) => InfoError::Full,
})
}
pub fn try_info(&self, msg: S::Info) -> Result<(), InfoError> {
if matches!(
self.state.load(),
ActorState::Stopping | ActorState::Stopped
) {
return Err(InfoError::ServerStopped);
}
let envelope: Envelope<S> = Envelope::Info { msg };
self.sender.try_send(envelope).map_err(|e| match e {
mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
InfoError::ServerStopped
}
mpsc::SendError::Full(_) => InfoError::Full,
})
}
#[inline]
#[must_use]
pub fn cast_overflow_policy(&self) -> CastOverflowPolicy {
self.overflow_policy
}
#[inline]
#[must_use]
pub const fn actor_id(&self) -> ActorId {
self.actor_id
}
#[inline]
#[must_use]
pub fn task_id(&self) -> TaskId {
self.task_id
}
#[inline]
#[must_use]
pub fn is_finished(&self) -> bool {
self.completed || self.receiver.is_ready() || self.receiver.is_closed()
}
pub fn stop(&self) {
self.state.store(ActorState::Stopping);
self.sender.wake_receiver();
}
pub fn abort(&self) {
self.state.store(ActorState::Stopping);
if let Some(inner) = self.inner.upgrade() {
let cancel_waker = {
let mut guard = inner.write();
guard.cancel_requested = true;
guard
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
if guard.cancel_reason.is_none() {
guard.cancel_reason = Some(crate::types::CancelReason::user("server aborted"));
}
guard.cancel_waker.clone()
};
if let Some(waker) = cancel_waker {
waker.wake_by_ref();
}
}
self.sender.wake_receiver();
}
pub fn join<'a>(&'a mut self, _cx: &'a Cx) -> GenServerJoinFuture<'a, S> {
let cx_inner = self.inner.clone();
let receiver = &mut self.receiver;
let terminal_state = &mut self.completed;
GenServerJoinFuture {
inner: receiver.recv_uninterruptible(),
cx_inner,
sender: self.sender.clone(),
state: Arc::clone(&self.state),
terminal_state,
drop_abort_defused: false,
}
}
}
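/// Future returned by [`GenServerHandle::join`]. It polls the result channel
/// uninterruptibly; dropping it before completion aborts the server
/// (abort-on-drop) unless the result has already been delivered.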
pub struct GenServerJoinFuture<'a, S: GenServer> {
inner: oneshot::RecvUninterruptibleFuture<'a, Result<S, JoinError>>,
cx_inner: std::sync::Weak<parking_lot::RwLock<CxInner>>,
sender: mpsc::Sender<Envelope<S>>,
state: Arc<GenServerStateCell>,
terminal_state: &'a mut bool,
drop_abort_defused: bool,
}
impl<S: GenServer> GenServerJoinFuture<'_, S> {
fn closed_reason(&self) -> crate::types::CancelReason {
self.cx_inner
.upgrade()
.and_then(|inner| inner.read().cancel_reason.clone())
.unwrap_or_else(|| crate::types::CancelReason::user("join channel closed"))
}
fn abort(&self) {
self.state.store(ActorState::Stopping);
if let Some(inner) = self.cx_inner.upgrade() {
let cancel_waker = {
let mut guard = inner.write();
guard.cancel_requested = true;
guard
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
if guard.cancel_reason.is_none() {
guard.cancel_reason = Some(crate::types::CancelReason::user("server aborted"));
}
guard.cancel_waker.clone()
};
if let Some(waker) = cancel_waker {
waker.wake_by_ref();
}
}
self.sender.wake_receiver();
}
}
impl<S: GenServer> std::future::Future for GenServerJoinFuture<'_, S> {
type Output = Result<S, JoinError>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = &mut *self;
if *this.terminal_state {
return std::task::Poll::Ready(Err(JoinError::PolledAfterCompletion));
}
match std::pin::Pin::new(&mut this.inner).poll(cx) {
std::task::Poll::Ready(Ok(res)) => {
*this.terminal_state = true;
this.drop_abort_defused = true;
std::task::Poll::Ready(res)
}
std::task::Poll::Ready(Err(oneshot::RecvError::Closed)) => {
*this.terminal_state = true;
this.drop_abort_defused = true;
let reason = this.closed_reason();
std::task::Poll::Ready(Err(JoinError::Cancelled(reason)))
}
std::task::Poll::Ready(Err(oneshot::RecvError::Cancelled)) => {
unreachable!("RecvUninterruptibleFuture cannot return Cancelled");
}
std::task::Poll::Ready(Err(oneshot::RecvError::PolledAfterCompletion)) => {
unreachable!(
"JoinFuture guards repolls before polling the inner oneshot recv future"
)
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
impl<S: GenServer> Drop for GenServerJoinFuture<'_, S> {
fn drop(&mut self) {
// Abort-on-drop: dropping an unfinished join future aborts the server,
// unless the result already arrived or a terminal poll defused the abort.
if !*self.terminal_state && !self.drop_abort_defused && !self.inner.receiver_finished() {
self.abort();
}
}
}
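/// Cloneable, non-owning reference to a running server: supports call, cast,
/// and info, but cannot stop, abort, or join it.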
#[derive(Debug)]
pub struct GenServerRef<S: GenServer> {
actor_id: ActorId,
sender: mpsc::Sender<Envelope<S>>,
state: Arc<GenServerStateCell>,
overflow_policy: CastOverflowPolicy,
}
impl<S: GenServer> Clone for GenServerRef<S> {
fn clone(&self) -> Self {
Self {
actor_id: self.actor_id,
sender: self.sender.clone(),
state: Arc::clone(&self.state),
overflow_policy: self.overflow_policy,
}
}
}
impl<S: GenServer> GenServerRef<S> {
#[must_use]
pub const fn cast_overflow_policy(&self) -> CastOverflowPolicy {
self.overflow_policy
}
pub async fn call(&self, cx: &Cx, request: S::Call) -> Result<S::Reply, CallError> {
if cx.checkpoint().is_err() {
cx.trace("gen_server::call_rejected_cancelled");
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
return Err(CallError::Cancelled(reason));
}
if matches!(
self.state.load(),
ActorState::Stopping | ActorState::Stopped
) {
cx.trace("gen_server::call_rejected_stopped");
return Err(CallError::ServerStopped);
}
let send_permit = match self.sender.reserve(cx).await {
Ok(p) => p,
Err(e) => {
let was_cancelled = matches!(e, mpsc::SendError::Cancelled(()));
if was_cancelled {
cx.trace("gen_server::call_send_cancelled");
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
return Err(CallError::Cancelled(reason));
}
cx.trace("gen_server::call_send_failed");
return Err(CallError::ServerStopped);
}
};
let (reply_tx, mut reply_rx) = session::tracked_oneshot::<S::Reply>();
let reply_permit: session::TrackedOneshotPermit<S::Reply> = reply_tx.reserve(cx);
let envelope: Envelope<S> = Envelope::Call {
request,
reply_permit,
};
if let Err(e) = send_permit.try_send(envelope) {
let envelope = match e {
mpsc::SendError::Disconnected(v)
| mpsc::SendError::Full(v)
| mpsc::SendError::Cancelled(v) => v,
};
if let Envelope::Call { reply_permit, .. } = envelope {
let _aborted = session::TrackedOneshotPermit::abort(reply_permit);
}
cx.trace("gen_server::call_send_failed");
return Err(CallError::ServerStopped);
}
cx.trace("gen_server::call_enqueued");
match reply_rx.recv(cx).await {
Ok(v) => Ok(v),
Err(oneshot::RecvError::Closed) => {
cx.trace("gen_server::call_no_reply");
Err(CallError::NoReply)
}
Err(oneshot::RecvError::Cancelled) => {
cx.trace("gen_server::call_reply_cancelled");
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
Err(CallError::Cancelled(reason))
}
Err(oneshot::RecvError::PolledAfterCompletion) => {
unreachable!("GenServerRef::call awaits a fresh reply oneshot recv future")
}
}
}
pub async fn cast(&self, cx: &Cx, msg: S::Cast) -> Result<(), CastError> {
if cx.checkpoint().is_err() {
cx.trace("gen_server::cast_rejected_cancelled");
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
return Err(CastError::Cancelled(reason));
}
if matches!(
self.state.load(),
ActorState::Stopping | ActorState::Stopped
) {
cx.trace("gen_server::cast_rejected_stopped");
return Err(CastError::ServerStopped);
}
let envelope: Envelope<S> = Envelope::Cast { msg };
self.sender.send(cx, envelope).await.map_err(|e| match e {
mpsc::SendError::Cancelled(_) => {
cx.trace("gen_server::cast_send_cancelled");
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
CastError::Cancelled(reason)
}
mpsc::SendError::Disconnected(_) | mpsc::SendError::Full(_) => {
cx.trace("gen_server::cast_send_failed");
CastError::ServerStopped
}
})
}
pub fn try_cast(&self, msg: S::Cast) -> Result<(), CastError> {
if matches!(
self.state.load(),
ActorState::Stopping | ActorState::Stopped
) {
return Err(CastError::ServerStopped);
}
let envelope: Envelope<S> = Envelope::Cast { msg };
match self.overflow_policy {
CastOverflowPolicy::Reject => self.sender.try_send(envelope).map_err(|e| match e {
mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
CastError::ServerStopped
}
mpsc::SendError::Full(_) => CastError::Full,
}),
CastOverflowPolicy::DropOldest => match self
.sender
.send_evict_oldest_where(envelope, |queued| matches!(queued, Envelope::Cast { .. }))
{
Ok(Some(evicted)) => {
debug_assert!(matches!(evicted, Envelope::Cast { .. }));
if let Some(cx) = Cx::current() {
cx.trace("gen_server::cast_evicted_oldest");
}
Ok(())
}
Ok(None) => Ok(()),
Err(mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_)) => {
Err(CastError::ServerStopped)
}
Err(mpsc::SendError::Full(_)) => Err(CastError::Full),
},
}
}
pub async fn info(&self, cx: &Cx, msg: S::Info) -> Result<(), InfoError> {
if cx.checkpoint().is_err() {
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
return Err(InfoError::Cancelled(reason));
}
if matches!(
self.state.load(),
ActorState::Stopping | ActorState::Stopped
) {
cx.trace("gen_server::info_rejected_stopped");
return Err(InfoError::ServerStopped);
}
let envelope: Envelope<S> = Envelope::Info { msg };
self.sender.send(cx, envelope).await.map_err(|e| match e {
mpsc::SendError::Cancelled(_) => {
let reason = cx
.cancel_reason()
.unwrap_or_else(crate::types::CancelReason::parent_cancelled);
InfoError::Cancelled(reason)
}
mpsc::SendError::Disconnected(_) => InfoError::ServerStopped,
mpsc::SendError::Full(_) => InfoError::Full,
})
}
pub fn try_info(&self, msg: S::Info) -> Result<(), InfoError> {
if matches!(
self.state.load(),
ActorState::Stopping | ActorState::Stopped
) {
return Err(InfoError::ServerStopped);
}
let envelope: Envelope<S> = Envelope::Info { msg };
self.sender.try_send(envelope).map_err(|e| match e {
mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_) => {
InfoError::ServerStopped
}
mpsc::SendError::Full(_) => InfoError::Full,
})
}
#[inline]
#[must_use]
pub fn is_closed(&self) -> bool {
self.sender.is_closed()
}
#[inline]
#[must_use]
pub fn is_alive(&self) -> bool {
self.state.load() != ActorState::Stopped
}
#[inline]
#[must_use]
pub const fn actor_id(&self) -> ActorId {
self.actor_id
}
}
impl<S: GenServer> GenServerHandle<S> {
#[inline]
#[must_use]
pub fn server_ref(&self) -> GenServerRef<S> {
GenServerRef {
actor_id: self.actor_id,
sender: self.sender.clone(),
state: Arc::clone(&self.state),
overflow_policy: self.overflow_policy,
}
}
}
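/// Default mailbox bound for callers without better sizing information.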
pub const DEFAULT_GENSERVER_MAILBOX_CAPACITY: usize = 64;
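/// Drives the full server lifecycle on its task: `on_start` under the start
/// phase budget, the receive/dispatch loop, then a masked shutdown that
/// closes the mailbox, aborts pending calls, drains remaining casts and
/// infos (skipped when the task was aborted), and finally runs `on_stop`
/// under the stop phase budget.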
async fn run_gen_server_loop<S: GenServer>(
mut server: S,
cx: Cx,
cell: &mut GenServerCell<S>,
) -> S {
use crate::tracing_compat::debug;
if cell.state.load() != ActorState::Stopping {
cell.state.store(ActorState::Running);
}
if cx.checkpoint().is_err() || cell.state.load() == ActorState::Stopping {
cx.trace("gen_server::init_skipped_cancelled");
} else {
cx.trace("gen_server::init");
let _budget = PhaseBudgetGuard::enter(&cx, server.on_start_budget(), true);
server.on_start(&cx).await;
}
loop {
if cx.checkpoint().is_err() {
cx.trace("gen_server::cancel_requested");
break;
}
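// Receive with a stop override: `Pending` while the state is `Stopping` is
// treated as disconnection so the loop exits instead of parking forever.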
let recv_result = std::future::poll_fn(|task_cx| {
match cell.mailbox.poll_recv(&cx, task_cx) {
std::task::Poll::Pending if cell.state.load() == ActorState::Stopping => {
std::task::Poll::Ready(Err(crate::channel::mpsc::RecvError::Disconnected))
}
other => other,
}
})
.await;
match recv_result {
Ok(envelope) => {
dispatch_envelope(&mut server, &cx, envelope).await;
}
Err(crate::channel::mpsc::RecvError::Disconnected) => {
cx.trace("gen_server::mailbox_disconnected");
break;
}
Err(crate::channel::mpsc::RecvError::Cancelled) => {
cx.trace("gen_server::recv_cancelled");
break;
}
Err(crate::channel::mpsc::RecvError::Empty) => {
break;
}
}
}
cell.state.store(ActorState::Stopping);
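// Shutdown: enter the stop budget and mask cancellation for the drain and
// `on_stop`. `is_aborted` snapshots the checkpoint result so drained casts
// and infos are only handled on a clean stop.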
let _budget = PhaseBudgetGuard::enter(&cx, server.on_stop_budget(), false);
let _mask = AsyncMaskGuard::enter(&cx);
let is_aborted = cx.checkpoint().is_err();
cell.mailbox.close();
let mut drained: u64 = 0;
while let Ok(envelope) = cell.mailbox.try_recv() {
match envelope {
Envelope::Call {
request: _,
reply_permit,
} => {
let _aborted = session::TrackedOneshotPermit::abort(reply_permit);
cx.trace("gen_server::drain_abort_call");
}
Envelope::Cast { msg } => {
if !is_aborted {
server.handle_cast(&cx, msg).await;
}
}
Envelope::Info { msg } => {
if !is_aborted {
server.handle_info(&cx, msg).await;
}
}
}
drained += 1;
}
if drained > 0 {
debug!(drained = drained, "gen_server::mailbox_drained");
cx.trace("gen_server::mailbox_drained");
}
cx.trace("gen_server::terminate");
server.on_stop(&cx).await;
server
}
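/// Routes one envelope to its handler, wrapping call permits in a [`Reply`].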
async fn dispatch_envelope<S: GenServer>(server: &mut S, cx: &Cx, envelope: Envelope<S>) {
match envelope {
Envelope::Call {
request,
reply_permit,
} => {
let reply = Reply::<S::Reply>::new(cx, reply_permit);
server.handle_call(cx, request, reply).await;
}
Envelope::Cast { msg } => {
server.handle_cast(cx, msg).await;
}
Envelope::Info { msg } => {
server.handle_info(cx, msg).await;
}
}
}
impl<P: crate::types::Policy> crate::cx::Scope<'_, P> {
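/// Spawns `server` as a child task of this scope: allocates the mailbox and
/// result channel, builds a child `Cx`, and returns the owning handle plus
/// the `StoredTask`, which the caller must register with the runtime (via
/// `store_spawned_task`) before scheduling the task.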
pub fn spawn_gen_server<S: GenServer>(
&self,
state: &mut crate::runtime::state::RuntimeState,
cx: &Cx,
server: S,
mailbox_capacity: usize,
) -> Result<(GenServerHandle<S>, crate::runtime::stored_task::StoredTask), SpawnError> {
use crate::cx::scope::CatchUnwind;
use crate::runtime::stored_task::StoredTask;
use crate::tracing_compat::{debug, debug_span};
let overflow_policy = server.cast_overflow_policy();
let (msg_tx, msg_rx) = mpsc::channel::<Envelope<S>>(mailbox_capacity);
let (result_tx, result_rx) = oneshot::channel::<Result<S, JoinError>>();
let task_id = self.create_task_record(state)?;
let actor_id = ActorId::from_task(task_id);
let server_state = Arc::new(GenServerStateCell::new(ActorState::Created));
let _span = debug_span!(
"gen_server_spawn",
task_id = ?task_id,
region_id = ?self.region_id(),
mailbox_capacity = mailbox_capacity,
)
.entered();
debug!(
task_id = ?task_id,
region_id = ?self.region_id(),
mailbox_capacity = mailbox_capacity,
"gen_server spawned"
);
let (child_cx, child_cx_full) = self.build_child_task_cx(state, cx, task_id);
if let Some(record) = state.task_mut(task_id) {
record.set_cx_inner(child_cx.inner.clone());
record.set_cx(child_cx_full.clone());
}
let cx_for_send = child_cx_full;
let inner_weak = Arc::downgrade(&child_cx.inner);
let state_for_task = Arc::clone(&server_state);
let mut cell = GenServerCell {
mailbox: msg_rx,
state: Arc::clone(&server_state),
_keep_alive: msg_tx.clone(),
};
let wrapped = async move {
let result = CatchUnwind {
inner: Box::pin(run_gen_server_loop(server, child_cx, &mut cell)),
}
.await;
match result {
Ok(server_final) => {
let _ = result_tx.send(&cx_for_send, Ok(server_final));
}
Err(payload) => {
let msg = crate::cx::scope::payload_to_string(&payload);
let _ = result_tx.send(
&cx_for_send,
Err(JoinError::Panicked(crate::types::PanicPayload::new(msg))),
);
}
}
state_for_task.store(ActorState::Stopped);
Outcome::Ok(())
};
let stored = StoredTask::new_with_id(wrapped, task_id);
let handle = GenServerHandle {
actor_id,
sender: msg_tx,
state: server_state,
task_id,
receiver: result_rx,
inner: inner_weak,
completed: false,
overflow_policy,
};
Ok((handle, stored))
}
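/// Like [`Self::spawn_gen_server`], but additionally registers `name` for
/// the new task in `registry`. If registration fails, the freshly created
/// task record is removed before returning `NamedSpawnError::NameTaken`.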
#[allow(clippy::too_many_arguments)]
pub fn spawn_named_gen_server<S: GenServer>(
&self,
state: &mut crate::runtime::state::RuntimeState,
cx: &Cx,
registry: &mut crate::cx::NameRegistry,
name: impl Into<String>,
server: S,
mailbox_capacity: usize,
now: crate::types::Time,
) -> Result<
(
NamedGenServerHandle<S>,
crate::runtime::stored_task::StoredTask,
),
NamedSpawnError,
> {
let name = name.into();
let (handle, stored) = self
.spawn_gen_server(state, cx, server, mailbox_capacity)
.map_err(NamedSpawnError::Spawn)?;
let task_id = handle.task_id();
let region = self.region_id();
match registry.register(name, task_id, region, now) {
Ok(lease) => {
let named = NamedGenServerHandle {
handle,
lease: Some(lease),
};
Ok((named, stored))
}
Err(e) => {
let task_id = handle.task_id();
if let Some(region_record) = state.region(self.region_id()) {
region_record.remove_task(task_id);
}
state.remove_task(task_id);
Err(NamedSpawnError::NameTaken(e))
}
}
}
}
#[derive(Debug)]
pub enum NamedSpawnError {
Spawn(SpawnError),
NameTaken(crate::cx::NameLeaseError),
}
impl std::fmt::Display for NamedSpawnError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Spawn(e) => write!(f, "named server spawn failed: {e}"),
Self::NameTaken(e) => write!(f, "named server registration failed: {e}"),
}
}
}
impl std::error::Error for NamedSpawnError {}
#[derive(Debug)]
pub enum ReleaseNameError {
StillRunning,
Lease(crate::cx::NameLeaseError),
}
impl std::fmt::Display for ReleaseNameError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::StillRunning => write!(f, "named server is still running"),
Self::Lease(err) => write!(f, "named server lease resolution failed: {err}"),
}
}
}
impl std::error::Error for ReleaseNameError {}
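/// Handle to a named server: wraps a [`GenServerHandle`] together with its
/// name lease, which must be resolved (released or aborted) once the server
/// is done.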
#[derive(Debug)]
pub struct NamedGenServerHandle<S: GenServer> {
handle: GenServerHandle<S>,
lease: Option<crate::cx::NameLease>,
}
impl<S: GenServer> NamedGenServerHandle<S> {
#[inline]
#[must_use]
pub fn name(&self) -> &str {
self.lease
.as_ref()
.map_or("(released)", crate::cx::NameLease::name)
}
#[inline]
#[must_use]
pub fn task_id(&self) -> TaskId {
self.handle.task_id()
}
#[inline]
#[must_use]
pub fn actor_id(&self) -> ActorId {
self.handle.actor_id()
}
#[inline]
#[must_use]
pub fn is_finished(&self) -> bool {
self.handle.is_finished()
}
#[must_use]
pub fn server_ref(&self) -> GenServerRef<S> {
self.handle.server_ref()
}
#[must_use]
pub fn inner(&self) -> &GenServerHandle<S> {
&self.handle
}
pub fn stop(&self) {
self.handle.stop();
}
pub fn release_name(
&mut self,
registry: &mut crate::cx::NameRegistry,
now: Time,
) -> Result<(), ReleaseNameError> {
let Some(lease) = self.lease.as_ref() else {
return Err(ReleaseNameError::Lease(
crate::cx::NameLeaseError::AlreadyResolved,
));
};
if !lease.is_active() {
return Err(ReleaseNameError::Lease(
crate::cx::NameLeaseError::AlreadyResolved,
));
}
if !self.handle.is_finished() {
return Err(ReleaseNameError::StillRunning);
}
registry
.unregister_owned_and_grant(lease, now)
.map_err(ReleaseNameError::Lease)?;
self.lease
.take()
.ok_or(ReleaseNameError::Lease(
crate::cx::NameLeaseError::AlreadyResolved,
))?
.release()
.map(|_proof| ())
.map_err(ReleaseNameError::Lease)
}
pub fn abort_lease(
&mut self,
registry: &mut crate::cx::NameRegistry,
now: Time,
) -> Result<(), crate::cx::NameLeaseError> {
let Some(lease) = self.lease.as_ref() else {
return Err(crate::cx::NameLeaseError::AlreadyResolved);
};
if !lease.is_active() {
return Err(crate::cx::NameLeaseError::AlreadyResolved);
}
registry.unregister_owned_and_grant(lease, now)?;
self.lease
.take()
.ok_or(crate::cx::NameLeaseError::AlreadyResolved)?
.abort()
.map(|_proof| ())
}
pub fn take_lease(&mut self) -> Option<crate::cx::NameLease> {
self.lease.take()
}
}
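/// Restartable child-start recipe for supervision: each `start` builds a
/// fresh server via `make_server`, registers it under `name`, and parks the
/// name lease in a region finalizer that resolves it when the owning region
/// closes.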
pub struct NamedGenServerStart<S, F>
where
S: GenServer,
F: FnMut() -> S + Send + 'static,
{
registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>>,
name: String,
mailbox_capacity: usize,
make_server: F,
}
#[must_use]
pub fn named_gen_server_start<S, F>(
registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>>,
name: impl Into<String>,
mailbox_capacity: usize,
make_server: F,
) -> NamedGenServerStart<S, F>
where
S: GenServer,
F: FnMut() -> S + Send + 'static,
{
NamedGenServerStart {
registry,
name: name.into(),
mailbox_capacity,
make_server,
}
}
impl<S, F> crate::supervision::ChildStart for NamedGenServerStart<S, F>
where
S: GenServer,
F: FnMut() -> S + Send + 'static,
{
fn start(
&mut self,
scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut crate::runtime::RuntimeState,
cx: &crate::cx::Cx,
) -> Result<TaskId, SpawnError> {
let now = state.now;
let server = (self.make_server)();
let (mut named_handle, stored) = scope
.spawn_named_gen_server(
state,
cx,
&mut self.registry.lock(),
self.name.clone(),
server,
self.mailbox_capacity,
now,
)
.map_err(|err| match err {
NamedSpawnError::Spawn(spawn_err) => spawn_err,
NamedSpawnError::NameTaken(name_err) => SpawnError::NameRegistrationFailed {
name: self.name.clone(),
reason: name_err.to_string(),
},
})?;
let task_id = named_handle.task_id();
state.store_spawned_task(task_id, stored);
let lease_slot = Arc::new(parking_lot::Mutex::new(named_handle.take_lease()));
let lease_slot_for_finalizer = Arc::clone(&lease_slot);
let registry_for_finalizer = Arc::clone(&self.registry);
let finalizer_registered = scope.defer_sync(state, move || {
let lease_to_resolve = lease_slot_for_finalizer.lock().take();
if let Some(mut lease) = lease_to_resolve {
let _ = registry_for_finalizer
.lock()
.unregister_owned_and_grant(&lease, Time::ZERO);
let _ = lease.release();
}
});
if !finalizer_registered {
let lease_to_abort = lease_slot.lock().take();
if let Some(mut lease) = lease_to_abort {
let _ = self.registry.lock().unregister_owned_and_grant(&lease, now);
let _ = lease.abort();
}
if let Some(region_record) = state.region(scope.region_id()) {
region_record.remove_task(task_id);
}
state.remove_task(task_id);
return Err(SpawnError::RegionClosed(scope.region_id()));
}
Ok(task_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::state::RuntimeState;
use crate::runtime::yield_now;
use crate::supervision::ChildStart;
use crate::types::Budget;
use crate::types::CancelKind;
use crate::types::policy::FailFast;
use crate::util::ArenaIndex;
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
#[derive(Debug)]
struct Counter {
count: u64,
}
enum CounterCall {
Get,
Add(u64),
}
enum CounterCast {
Reset,
}
impl GenServer for Counter {
type Call = CounterCall;
type Reply = u64;
type Cast = CounterCast;
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
request: CounterCall,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match request {
CounterCall::Get => {
let _ = reply.send(self.count);
}
CounterCall::Add(n) => {
self.count += n;
let _ = reply.send(self.count);
}
}
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
msg: CounterCast,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match msg {
CounterCast::Reset => self.count = 0,
}
Box::pin(async {})
}
}
#[derive(Clone)]
struct StartBudgetProbe {
started_priority: Arc<AtomicU8>,
loop_priority: Arc<AtomicU8>,
}
impl GenServer for StartBudgetProbe {
type Call = CounterCall;
type Reply = u8;
type Cast = CounterCast;
type Info = SystemMsg;
fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.started_priority
.store(cx.budget().priority, Ordering::SeqCst);
Box::pin(async {})
}
fn on_start_budget(&self) -> Budget {
Budget::new().with_priority(200)
}
fn handle_call(
&mut self,
cx: &Cx,
_request: CounterCall,
reply: Reply<u8>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.loop_priority
.store(cx.budget().priority, Ordering::SeqCst);
let _ = reply.send(cx.budget().priority);
Box::pin(async {})
}
}
struct StopMaskProbe {
stop_checkpoint_ok: Arc<AtomicU8>,
}
impl GenServer for StopMaskProbe {
type Call = CounterCall;
type Reply = u8;
type Cast = CounterCast;
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_request: CounterCall,
reply: Reply<u8>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(0);
Box::pin(async {})
}
fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let ok = cx.checkpoint().is_ok();
self.stop_checkpoint_ok
.store(u8::from(ok), Ordering::SeqCst);
Box::pin(async {})
}
}
enum InitProbeCall {
GetStarted,
}
struct InitProbe {
started: Arc<AtomicU8>,
checkpoints: Arc<Mutex<Vec<serde_json::Value>>>,
}
impl GenServer for InitProbe {
type Call = InitProbeCall;
type Reply = bool;
type Cast = ();
type Info = SystemMsg;
fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.started.store(1, Ordering::SeqCst);
let started = Arc::clone(&self.started);
let checkpoints = Arc::clone(&self.checkpoints);
Box::pin(async move {
let event = serde_json::json!({
"phase": "on_start",
"started": started.load(Ordering::SeqCst),
});
tracing::info!(event = %event, "gen_server_lab_checkpoint");
checkpoints.lock().push(event);
yield_now().await;
})
}
fn handle_call(
&mut self,
_cx: &Cx,
request: InitProbeCall,
reply: Reply<bool>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match request {
InitProbeCall::GetStarted => {
let started = self.started.load(Ordering::SeqCst) == 1;
let event = serde_json::json!({
"phase": "handle_call",
"started": started,
});
tracing::info!(event = %event, "gen_server_lab_checkpoint");
self.checkpoints.lock().push(event);
let _ = reply.send(started);
}
}
Box::pin(async {})
}
}
fn assert_gen_server<S: GenServer>() {}
#[test]
fn gen_server_trait_bounds() {
init_test("gen_server_trait_bounds");
assert_gen_server::<Counter>();
crate::test_complete!("gen_server_trait_bounds");
}
#[test]
fn gen_server_spawn_and_cast() {
init_test("gen_server_spawn_and_cast");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
handle.try_cast(CounterCast::Reset).unwrap();
drop(handle);
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("gen_server_spawn_and_cast");
}
#[test]
fn gen_server_spawn_and_call() {
init_test("gen_server_spawn_and_call");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
let (mut client_handle, client_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
server_ref.call(&cx, CounterCall::Add(5)).await.unwrap()
})
.unwrap();
let client_task_id = client_handle.task_id();
runtime
.state
.store_spawned_task(client_task_id, client_stored);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
{
runtime.scheduler.lock().schedule(client_task_id, 0);
}
runtime.run_until_quiescent();
let result =
futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
assert_eq!(result, 5);
crate::test_complete!("gen_server_spawn_and_call");
}
#[test]
fn gen_server_init_runs_before_queued_call_under_lab_runtime() {
init_test("gen_server_init_runs_before_queued_call_under_lab_runtime");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(0x6E57_1001));
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let started = Arc::new(AtomicU8::new(0));
let checkpoints = Arc::new(Mutex::new(Vec::new()));
let (mut handle, stored) = scope
.spawn_gen_server(
&mut runtime.state,
&cx,
InitProbe {
started: Arc::clone(&started),
checkpoints: Arc::clone(&checkpoints),
},
8,
)
.expect("spawn should succeed");
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
let checkpoints_for_client = Arc::clone(&checkpoints);
let (mut client_handle, client_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
let started = server_ref
.call(&cx, InitProbeCall::GetStarted)
.await
.expect("init probe call should succeed");
let event = serde_json::json!({
"phase": "client_completed",
"started": started,
});
tracing::info!(event = %event, "gen_server_lab_checkpoint");
checkpoints_for_client.lock().push(event);
started
})
.expect("client spawn should succeed");
let client_task_id = client_handle.task_id();
runtime
.state
.store_spawned_task(client_task_id, client_stored);
{
runtime.scheduler.lock().schedule(client_task_id, 0);
}
runtime.run_until_idle();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_idle();
{
runtime.scheduler.lock().schedule(client_task_id, 0);
}
runtime.run_until_idle();
let call_saw_initialized =
futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
crate::assert_with_log!(
call_saw_initialized,
"queued call observes completed gen_server init",
true,
call_saw_initialized
);
crate::assert_with_log!(
started.load(Ordering::SeqCst) == 1,
"on_start marks server initialized",
1,
started.load(Ordering::SeqCst)
);
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
let server = futures_lite::future::block_on(handle.join(&cx)).expect("server join ok");
crate::assert_with_log!(
server.started.load(Ordering::SeqCst) == 1,
"joined server preserves initialized state",
1,
server.started.load(Ordering::SeqCst)
);
let checkpoint_snapshot = checkpoints.lock().clone();
crate::assert_with_log!(
checkpoint_snapshot.len() == 3,
"lab runtime emits init/call/client checkpoints",
3,
checkpoint_snapshot.len()
);
crate::assert_with_log!(
checkpoint_snapshot[0]["phase"] == "on_start",
"on_start checkpoint recorded first",
"on_start",
checkpoint_snapshot[0]["phase"].clone()
);
crate::assert_with_log!(
runtime.is_quiescent(),
"gen_server init test reaches lab quiescence",
true,
runtime.is_quiescent()
);
crate::test_complete!("gen_server_init_runs_before_queued_call_under_lab_runtime");
}
#[test]
#[allow(clippy::items_after_statements, clippy::too_many_lines)]
fn gen_server_spawn_inherits_full_child_cx_capabilities() {
use crate::cx::registry::RegistryHandle;
use crate::evidence_sink::{CollectorSink, EvidenceSink};
use crate::observability::{LogCollector, LogLevel};
use crate::remote::{NodeId, RemoteCap};
use franken_evidence::EvidenceLedgerBuilder;
init_test("gen_server_spawn_inherits_full_child_cx_capabilities");
#[derive(Debug, Default)]
#[allow(clippy::struct_excessive_bools)]
struct CapabilityProbe {
has_timer: bool,
has_io_driver: bool,
has_registry: bool,
has_remote: bool,
has_blocking_pool: bool,
has_log_collector: bool,
remote_origin: Option<String>,
logical_tick_advanced: bool,
}
impl GenServer for CapabilityProbe {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.has_timer = cx.has_timer();
self.has_io_driver = cx.io_driver_handle().is_some();
self.has_registry = cx.registry_handle().is_some();
self.has_remote = cx.has_remote();
self.has_blocking_pool = cx.blocking_pool_handle().is_some();
self.has_log_collector = cx.log_collector().is_some();
self.remote_origin = cx.remote().map(|remote| remote.local_node().to_string());
let before = cx.logical_now();
let after = cx.logical_tick();
self.logical_tick_advanced = after > before;
cx.trace("gen_server_capability_probe_trace");
let entry = EvidenceLedgerBuilder::new()
.ts_unix_ms(1_700_000_000_000)
.component("gen_server_capability_probe")
.action("on_start")
.posterior(vec![0.6, 0.4])
.chosen_expected_loss(0.1)
.calibration_score(0.85)
.build()
.expect("evidence entry");
cx.emit_evidence(&entry);
Box::pin(async {})
}
fn handle_call(
&mut self,
_cx: &Cx,
_request: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
}
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let clock = Arc::new(crate::time::VirtualClock::new());
runtime
.state
.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(clock));
let registry = crate::cx::NameRegistry::new();
let registry_handle = RegistryHandle::new(Arc::new(registry));
let sink = Arc::new(CollectorSink::new());
let collector = LogCollector::new(16).with_min_level(LogLevel::Trace);
let blocking_pool = crate::runtime::blocking_pool::BlockingPool::new(1, 1);
let cx = Cx::for_testing()
.with_registry_handle(Some(registry_handle))
.with_remote_cap(RemoteCap::new().with_local_node(NodeId::new("origin-test")))
.with_blocking_pool_handle(Some(blocking_pool.handle()))
.with_evidence_sink(Some(sink.clone() as Arc<dyn EvidenceSink>));
cx.set_log_collector(collector.clone());
let region = runtime.state.create_root_region(Budget::INFINITE);
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (mut handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, CapabilityProbe::default(), 8)
.expect("spawn should succeed");
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_idle();
handle.stop();
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
let server = futures_lite::future::block_on(handle.join(&cx)).expect("join ok");
assert!(
server.has_timer,
"gen server child cx must inherit timer driver"
);
assert!(
server.has_io_driver,
"gen server child cx must inherit the runtime I/O driver",
);
assert!(
server.has_registry,
"gen server child cx must inherit registry handle",
);
assert!(
server.has_remote,
"gen server child cx must inherit remote cap"
);
assert!(
server.has_blocking_pool,
"gen server child cx must inherit blocking-pool capability",
);
assert!(
server.has_log_collector,
"gen server child cx must inherit observability collector state",
);
assert_eq!(server.remote_origin.as_deref(), Some("Node(origin-test)"));
assert!(
server.logical_tick_advanced,
"gen server child cx must inherit a live logical clock",
);
let entries = sink.entries();
assert_eq!(
entries.len(),
1,
"gen server child cx must inherit evidence sink"
);
assert_eq!(entries[0].component, "gen_server_capability_probe");
assert!(
collector
.peek()
.iter()
.any(|entry| entry.message() == "gen_server_capability_probe_trace"),
"gen server child cx must inherit trace/log collector wiring",
);
crate::test_complete!("gen_server_spawn_inherits_full_child_cx_capabilities");
}
#[test]
fn gen_server_call_cancellation_is_deterministic() {
init_test("gen_server_call_cancellation_is_deterministic");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
let client_cx_cell: Arc<Mutex<Option<Cx>>> = Arc::new(Mutex::new(None));
let client_cx_cell_for_task = Arc::clone(&client_cx_cell);
let (mut client_handle, client_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
{
*client_cx_cell_for_task.lock() = Some(cx.clone());
}
server_ref.call(&cx, CounterCall::Get).await
})
.unwrap();
let client_task_id = client_handle.task_id();
runtime
.state
.store_spawned_task(client_task_id, client_stored);
{
runtime.scheduler.lock().schedule(client_task_id, 0);
}
runtime.run_until_idle();
let client_cx = client_cx_cell
.lock()
.as_ref()
.expect("client cx published")
.clone();
client_cx.cancel_with(CancelKind::User, Some("gen_server call cancelled"));
{
runtime.scheduler.lock().schedule(client_task_id, 0);
}
runtime.run_until_idle();
let result =
futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
match result {
Ok(_) => unreachable!("expected cancellation, got Ok"),
Err(CallError::Cancelled(reason)) => {
assert_eq!(reason.kind, CancelKind::User);
assert_eq!(
reason.message,
Some("gen_server call cancelled".to_string())
);
}
Err(other) => unreachable!("expected CallError::Cancelled, got {other:?}"),
}
drop(handle);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("gen_server_call_cancellation_is_deterministic");
}
#[test]
fn supervised_gen_server_stays_alive() {
init_test("supervised_gen_server_stays_alive");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let registry = Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
let mut starter =
named_gen_server_start(Arc::clone(®istry), "persistent_service", 32, || {
Counter { count: 0 }
});
let task_id = starter
.start(&scope, &mut runtime.state, &cx)
.expect("start ok");
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_idle();
let task = runtime.state.task(task_id).expect("task exists");
crate::assert_with_log!(
!task.state.is_terminal(),
"server should be alive",
"Running",
format!("{:?}", task.state)
);
let tasks_to_schedule =
runtime
.state
.cancel_request(region, &CancelReason::user("test done"), None);
for (tid, priority) in tasks_to_schedule {
runtime.scheduler.lock().schedule(tid, priority);
}
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
assert!(
registry.lock().whereis("persistent_service").is_none(),
"name must be removed after region stop",
);
crate::test_complete!("supervised_gen_server_stays_alive");
}
#[test]
fn gen_server_cast_cancellation_is_deterministic() {
init_test("gen_server_cast_cancellation_is_deterministic");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 1)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
futures_lite::future::block_on(handle.cast(&cx, CounterCast::Reset))
.expect("prefill cast ok");
let client_cx_cell: Arc<Mutex<Option<Cx>>> = Arc::new(Mutex::new(None));
let client_cx_cell_for_task = Arc::clone(&client_cx_cell);
let (mut client_handle, client_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
{
*client_cx_cell_for_task.lock() = Some(cx.clone());
}
server_ref.cast(&cx, CounterCast::Reset).await
})
.unwrap();
let client_task_id = client_handle.task_id();
runtime
.state
.store_spawned_task(client_task_id, client_stored);
{
runtime.scheduler.lock().schedule(client_task_id, 0);
}
runtime.run_until_idle();
let client_cx = client_cx_cell
.lock()
.as_ref()
.expect("client cx published")
.clone();
client_cx.cancel_with(CancelKind::User, Some("gen_server cast cancelled"));
{
runtime.scheduler.lock().schedule(client_task_id, 0);
}
runtime.run_until_idle();
let result =
futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
match result {
Ok(()) => unreachable!("expected cancellation, got Ok"),
Err(CastError::Cancelled(reason)) => {
assert_eq!(reason.kind, CancelKind::User);
assert_eq!(
reason.message,
Some("gen_server cast cancelled".to_string())
);
}
Err(other) => unreachable!("expected CastError::Cancelled, got {other:?}"),
}
drop(handle);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("gen_server_cast_cancellation_is_deterministic");
}
#[test]
fn gen_server_handle_accessors() {
init_test("gen_server_handle_accessors");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 32)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
let _actor_id = handle.actor_id();
let _task_id = handle.task_id();
assert!(!handle.is_finished());
let server_ref = handle.server_ref();
assert_eq!(server_ref.actor_id(), handle.actor_id());
assert!(server_ref.is_alive());
assert!(!server_ref.is_closed());
crate::test_complete!("gen_server_handle_accessors");
}
#[test]
fn gen_server_ref_is_cloneable() {
init_test("gen_server_ref_is_cloneable");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 32)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
let ref1 = handle.server_ref();
let ref2 = ref1.clone();
assert_eq!(ref1.actor_id(), ref2.actor_id());
crate::test_complete!("gen_server_ref_is_cloneable");
}
#[test]
fn gen_server_stop_transitions() {
init_test("gen_server_stop_transitions");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
let server_ref = handle.server_ref();
assert!(server_ref.is_alive());
handle.stop();
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
assert!(handle.is_finished());
assert!(!server_ref.is_alive());
crate::test_complete!("gen_server_stop_transitions");
}
#[test]
fn gen_server_handle_rejects_call_and_cast_after_stop() {
init_test("gen_server_handle_rejects_call_and_cast_after_stop");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_idle();
handle.stop();
let call_err =
futures_lite::future::block_on(handle.call(&cx, CounterCall::Get)).unwrap_err();
assert!(
matches!(call_err, CallError::ServerStopped),
"call after stop must return ServerStopped, got {call_err:?}"
);
let cast_err =
futures_lite::future::block_on(handle.cast(&cx, CounterCast::Reset)).unwrap_err();
assert!(
matches!(cast_err, CastError::ServerStopped),
"cast after stop must return ServerStopped, got {cast_err:?}"
);
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
assert!(handle.is_finished());
crate::test_complete!("gen_server_handle_rejects_call_and_cast_after_stop");
}
#[test]
fn gen_server_handle_join_returns_final_state_after_stop() {
init_test("gen_server_handle_join_returns_final_state_after_stop");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (mut handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
handle.stop();
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
assert!(handle.is_finished());
let final_state = futures_lite::future::block_on(handle.join(&cx)).expect("join");
assert_eq!(
final_state.count, 0,
"final server state should be returned"
);
crate::test_complete!("gen_server_handle_join_returns_final_state_after_stop");
}
#[test]
fn gen_server_handle_second_join_fails_closed() {
init_test("gen_server_handle_second_join_fails_closed");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (mut handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
handle.stop();
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
assert!(
handle.is_finished(),
"stopped server should report finished"
);
let final_state = futures_lite::future::block_on(handle.join(&cx)).expect("first join");
assert_eq!(
final_state.count, 0,
"join should return final server state"
);
let second = futures_lite::future::block_on(handle.join(&cx));
assert!(
matches!(second, Err(JoinError::PolledAfterCompletion)),
"second join must fail closed, got {second:?}"
);
crate::test_complete!("gen_server_handle_second_join_fails_closed");
}
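// stop() must wake a server parked in mailbox recv so on_stop still runs;
// StopWakeProbe records whether the on_stop hook executed.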
#[test]
fn gen_server_stop_wakes_blocked_mailbox_recv() {
#[allow(clippy::items_after_statements)]
struct StopWakeProbe {
stop_ran: Arc<AtomicU8>,
}
#[allow(clippy::items_after_statements)]
impl GenServer for StopWakeProbe {
type Call = CounterCall;
type Reply = u64;
type Cast = CounterCast;
type Info = SystemMsg;
fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.stop_ran.store(1, Ordering::SeqCst);
Box::pin(async {})
}
fn handle_call(
&mut self,
_cx: &Cx,
_request: CounterCall,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(0);
Box::pin(async {})
}
}
init_test("gen_server_stop_wakes_blocked_mailbox_recv");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let stop_ran = Arc::new(AtomicU8::new(0));
let server = StopWakeProbe {
stop_ran: Arc::clone(&stop_ran),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_idle();
handle.stop();
runtime.run_until_quiescent();
assert_eq!(
stop_ran.load(Ordering::SeqCst),
1,
"on_stop should run after stop wakes blocked recv"
);
assert!(handle.is_finished(), "server should finish after stop");
crate::test_complete!("gen_server_stop_wakes_blocked_mailbox_recv");
}
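// Dropping an un-awaited join future should behave like abort(): the handle
// transitions to Stopping and the server shuts down without processing any
// further work.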
#[test]
fn dropped_join_future_marks_server_stopping_like_abort() {
init_test("dropped_join_future_marks_server_stopping_like_abort");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let final_count = Arc::new(AtomicU64::new(u64::MAX));
let server = ObservableCounter {
count: 0,
final_count: Arc::clone(&final_count),
};
let (mut handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_idle();
assert_eq!(
handle.state.load(),
ActorState::Running,
"server should be running before join drop requests abort"
);
drop(handle.join(&cx));
assert_eq!(
handle.state.load(),
ActorState::Stopping,
"dropping join future should mirror GenServerHandle::abort state transition"
);
runtime.run_until_quiescent();
assert!(
handle.is_finished(),
"server should stop after join future drop"
);
assert_eq!(
final_count.load(Ordering::SeqCst),
0,
"idle server should stop without processing phantom work"
);
crate::test_complete!("dropped_join_future_marks_server_stopping_like_abort");
}
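// Counter variant that publishes its final count from on_stop, letting tests
// observe the state the server held at shutdown.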
struct ObservableCounter {
count: u64,
final_count: Arc<AtomicU64>,
}
impl GenServer for ObservableCounter {
type Call = CounterCall;
type Reply = u64;
type Cast = CounterCast;
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
request: CounterCall,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match request {
CounterCall::Get => {
let _ = reply.send(self.count);
}
CounterCall::Add(n) => {
self.count += n;
let _ = reply.send(self.count);
}
}
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
msg: CounterCast,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match msg {
CounterCast::Reset => self.count = 0,
}
Box::pin(async {})
}
fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.final_count.store(self.count, Ordering::SeqCst);
Box::pin(async {})
}
}
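// Casts queued before stop() must be drained before on_stop runs; on_stop then
// records the count the queued Resets produced.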
#[test]
fn gen_server_processes_casts_before_stop() {
init_test("gen_server_processes_casts_before_stop");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let final_count = Arc::new(AtomicU64::new(u64::MAX));
let server = ObservableCounter {
count: 0,
final_count: final_count.clone(),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_idle();
for _ in 0..5 {
handle.try_cast(CounterCast::Reset).expect("try_cast ok");
}
handle.stop();
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
assert_eq!(
final_count.load(Ordering::SeqCst),
0,
"on_stop should record the final count after queued casts drain"
);
crate::test_complete!("gen_server_processes_casts_before_stop");
}
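// Running the identical cast/stop scenario twice with the same seed must yield
// the same final count recorded by on_stop.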
#[test]
fn gen_server_deterministic_replay() {
fn run_scenario(seed: u64) -> u64 {
let config = crate::lab::LabConfig::new(seed);
let mut runtime = crate::lab::LabRuntime::new(config);
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let final_count = Arc::new(AtomicU64::new(u64::MAX));
let server = ObservableCounter {
count: 0,
final_count: final_count.clone(),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_idle();
for _ in 0..5 {
handle.try_cast(CounterCast::Reset).expect("try_cast ok");
}
handle.stop();
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
final_count.load(Ordering::SeqCst)
}
init_test("gen_server_deterministic_replay");
let result1 = run_scenario(0xCAFE_BABE);
let result2 = run_scenario(0xCAFE_BABE);
assert_eq!(result1, result2, "same seed must produce the same final count");
crate::test_complete!("gen_server_deterministic_replay");
}
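// Records the Debug rendering of every info message it receives.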
#[derive(Default)]
struct InfoRecorder {
seen: Arc<Mutex<Vec<String>>>,
}
impl GenServer for InfoRecorder {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_request: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
fn handle_info(
&mut self,
_cx: &Cx,
msg: Self::Info,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let seen = Arc::clone(&self.seen);
Box::pin(async move {
seen.lock().push(format!("{msg:?}"));
})
}
}
fn tid(n: u32) -> TaskId {
TaskId::from_arena(ArenaIndex::new(n, 0))
}
fn rid(n: u32) -> crate::types::RegionId {
crate::types::RegionId::from_arena(ArenaIndex::new(n, 0))
}
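// SYS-ORDER: sort_key orders a shutdown batch by (virtual time, message class,
// subject); the expected keys below show Down (class 0) before Exit (class 1)
// before Timeout (class 2) at equal times.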
#[test]
fn conformance_system_msg_sort_key_orders_shutdown_batch() {
init_test("conformance_system_msg_sort_key_orders_shutdown_batch");
let mut monitors = crate::monitor::MonitorSet::new();
let mref_down_6 = monitors.establish(tid(90), rid(0), tid(6));
let mref_down_3 = monitors.establish(tid(91), rid(0), tid(3));
let mut batch = SystemMsgBatch::new();
batch.push(SystemMsg::Exit {
exit_vt: Time::from_secs(10),
from: tid(6),
reason: DownReason::Normal,
});
batch.push(SystemMsg::Timeout {
tick_vt: Time::from_secs(10),
id: 4,
});
batch.push(SystemMsg::Down {
completion_vt: Time::from_secs(10),
notification: DownNotification {
monitored: tid(6),
reason: DownReason::Normal,
monitor_ref: mref_down_6,
},
});
batch.push(SystemMsg::Timeout {
tick_vt: Time::from_secs(9),
id: 99,
});
batch.push(SystemMsg::Down {
completion_vt: Time::from_secs(10),
notification: DownNotification {
monitored: tid(3),
reason: DownReason::Normal,
monitor_ref: mref_down_3,
},
});
batch.push(SystemMsg::Exit {
exit_vt: Time::from_secs(10),
from: tid(2),
reason: DownReason::Normal,
});
batch.push(SystemMsg::Timeout {
tick_vt: Time::from_secs(10),
id: 1,
});
let sorted = batch.into_sorted();
let keys: Vec<_> = sorted.iter().map(SystemMsg::sort_key).collect();
assert_eq!(
keys,
vec![
(Time::from_secs(9), 2, SystemMsgSubjectKey::TimeoutId(99)),
(Time::from_secs(10), 0, SystemMsgSubjectKey::Task(tid(3))),
(Time::from_secs(10), 0, SystemMsgSubjectKey::Task(tid(6))),
(Time::from_secs(10), 1, SystemMsgSubjectKey::Task(tid(2))),
(Time::from_secs(10), 1, SystemMsgSubjectKey::Task(tid(6))),
(Time::from_secs(10), 2, SystemMsgSubjectKey::TimeoutId(1)),
(Time::from_secs(10), 2, SystemMsgSubjectKey::TimeoutId(4)),
],
"shutdown system-message ordering must follow SYS-ORDER"
);
crate::test_complete!("conformance_system_msg_sort_key_orders_shutdown_batch");
}
#[test]
fn conformance_system_msg_batch_matches_explicit_sort() {
init_test("conformance_system_msg_batch_matches_explicit_sort");
let mut monitors = crate::monitor::MonitorSet::new();
let mref = monitors.establish(tid(77), rid(0), tid(8));
let messages = vec![
SystemMsg::Timeout {
tick_vt: Time::from_secs(12),
id: 4,
},
SystemMsg::Exit {
exit_vt: Time::from_secs(11),
from: tid(8),
reason: DownReason::Error("boom".to_string()),
},
SystemMsg::Down {
completion_vt: Time::from_secs(11),
notification: DownNotification {
monitored: tid(8),
reason: DownReason::Normal,
monitor_ref: mref,
},
},
SystemMsg::Timeout {
tick_vt: Time::from_secs(11),
id: 2,
},
];
let mut batch = SystemMsgBatch::new();
for msg in messages.clone() {
batch.push(msg);
}
let batched = batch.into_sorted();
let mut explicit = messages;
explicit.sort_by_key(SystemMsg::sort_key);
let batched_keys: Vec<_> = batched.iter().map(SystemMsg::sort_key).collect();
let explicit_keys: Vec<_> = explicit.iter().map(SystemMsg::sort_key).collect();
assert_eq!(batched_keys, explicit_keys);
crate::test_complete!("conformance_system_msg_batch_matches_explicit_sort");
}
#[test]
fn system_msg_payload_types_roundtrip_via_conversions() {
init_test("system_msg_payload_types_roundtrip_via_conversions");
let mut monitors = crate::monitor::MonitorSet::new();
let mref = monitors.establish(tid(7), rid(0), tid(8));
let down = DownMsg::new(
Time::from_secs(11),
DownNotification {
monitored: tid(8),
reason: DownReason::Normal,
monitor_ref: mref,
},
);
let down_msg = SystemMsg::down(down.clone());
let down_back = DownMsg::try_from(down_msg).expect("down conversion");
assert_eq!(down_back.completion_vt, down.completion_vt);
assert_eq!(
down_back.notification.monitored,
down.notification.monitored
);
assert_eq!(down_back.notification.reason, down.notification.reason);
assert_eq!(
down_back.notification.monitor_ref,
down.notification.monitor_ref
);
let exit = ExitMsg::new(
Time::from_secs(12),
tid(9),
DownReason::Error("boom".into()),
);
let exit_msg = SystemMsg::exit(exit.clone());
let exit_back = ExitMsg::try_from(exit_msg).expect("exit conversion");
assert_eq!(exit_back, exit);
let timeout = TimeoutMsg::new(Time::from_secs(13), 42);
let timeout_msg = SystemMsg::timeout(timeout);
let timeout_back = TimeoutMsg::try_from(timeout_msg).expect("timeout conversion");
assert_eq!(timeout_back, timeout);
crate::test_complete!("system_msg_payload_types_roundtrip_via_conversions");
}
#[test]
fn system_msg_try_from_mismatch_returns_original_variant() {
init_test("system_msg_try_from_mismatch_returns_original_variant");
let mut monitors = crate::monitor::MonitorSet::new();
let mref = monitors.establish(tid(10), rid(0), tid(1));
let timeout = SystemMsg::Timeout {
tick_vt: Time::from_secs(5),
id: 99,
};
let err = DownMsg::try_from(timeout).expect_err("timeout is not down");
assert!(matches!(err, SystemMsg::Timeout { id: 99, .. }));
let down = SystemMsg::Down {
completion_vt: Time::from_secs(6),
notification: DownNotification {
monitored: tid(1),
reason: DownReason::Normal,
monitor_ref: mref,
},
};
let err = TimeoutMsg::try_from(down).expect_err("down is not timeout");
assert!(matches!(err, SystemMsg::Down { .. }));
crate::test_complete!("system_msg_try_from_mismatch_returns_original_variant");
}
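// try_info delivers Down, Exit, and Timeout system messages to handle_info in
// enqueue order.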
#[test]
fn gen_server_handle_info_receives_system_messages() {
init_test("gen_server_handle_info_receives_system_messages");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let seen: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let server = InfoRecorder {
seen: Arc::clone(&seen),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let mut monitors = crate::monitor::MonitorSet::new();
let mref = monitors.establish(tid(10), rid(0), tid(11));
handle
.try_info(SystemMsg::Down {
completion_vt: Time::from_secs(5),
notification: DownNotification {
monitored: tid(11),
reason: DownReason::Normal,
monitor_ref: mref,
},
})
.unwrap();
handle
.try_info(SystemMsg::Exit {
exit_vt: Time::from_secs(6),
from: tid(12),
reason: DownReason::Error("boom".into()),
})
.unwrap();
handle
.try_info(SystemMsg::Timeout {
tick_vt: Time::from_secs(7),
id: 123,
})
.unwrap();
drop(handle);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
let seen = seen.lock();
assert_eq!(seen.len(), 3);
assert!(seen[0].contains("Down"));
assert!(seen[1].contains("Exit"));
assert!(seen[2].contains("Timeout"));
drop(seen);
crate::test_complete!("gen_server_handle_info_receives_system_messages");
}
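// Two clients race to send info messages; for a fixed seed the observed
// delivery order must replay identically.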
#[test]
fn gen_server_info_ordering_is_deterministic_for_seed() {
fn run_scenario(seed: u64) -> Vec<String> {
let config = crate::lab::LabConfig::new(seed);
let mut runtime = crate::lab::LabRuntime::new(config);
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let events: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let server = InfoRecorder {
seen: Arc::clone(&events),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
let (client_a, stored_a) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
server_ref
.info(
&cx,
SystemMsg::Timeout {
tick_vt: Time::from_secs(2),
id: 1,
},
)
.await
.unwrap();
})
.unwrap();
let task_id_a = client_a.task_id();
runtime.state.store_spawned_task(task_id_a, stored_a);
let server_ref_b = handle.server_ref();
let (client_b, stored_b) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
server_ref_b
.info(
&cx,
SystemMsg::Timeout {
tick_vt: Time::from_secs(2),
id: 2,
},
)
.await
.unwrap();
})
.unwrap();
let task_id_b = client_b.task_id();
runtime.state.store_spawned_task(task_id_b, stored_b);
{
runtime.scheduler.lock().schedule(task_id_a, 0);
}
{
runtime.scheduler.lock().schedule(task_id_b, 0);
}
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
drop(handle);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
events.lock().clone()
}
init_test("gen_server_info_ordering_is_deterministic_for_seed");
let a = run_scenario(0xD00D_F00D);
let b = run_scenario(0xD00D_F00D);
assert_eq!(
a, b,
"system/info ordering must be deterministic for same seed"
);
crate::test_complete!("gen_server_info_ordering_is_deterministic_for_seed");
}
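// Counter variant whose cast mailbox uses the DropOldest overflow policy.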
struct DropOldestCounter {
count: u64,
}
#[derive(Debug, Clone)]
enum TaggedCast {
Set(u64),
}
impl GenServer for DropOldestCounter {
type Call = CounterCall;
type Reply = u64;
type Cast = TaggedCast;
type Info = SystemMsg;
fn cast_overflow_policy(&self) -> CastOverflowPolicy {
CastOverflowPolicy::DropOldest
}
fn handle_call(
&mut self,
_cx: &Cx,
request: CounterCall,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match request {
CounterCall::Get => {
let _ = reply.send(self.count);
}
CounterCall::Add(n) => {
self.count += n;
let _ = reply.send(self.count);
}
}
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
msg: TaggedCast,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match msg {
TaggedCast::Set(v) => self.count = v,
}
Box::pin(async {})
}
}
#[test]
fn gen_server_drop_oldest_policy_accessor() {
init_test("gen_server_drop_oldest_policy_accessor");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 4)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
assert_eq!(
handle.cast_overflow_policy(),
CastOverflowPolicy::DropOldest
);
let server_ref = handle.server_ref();
assert_eq!(
server_ref.cast_overflow_policy(),
CastOverflowPolicy::DropOldest
);
crate::test_complete!("gen_server_drop_oldest_policy_accessor");
}
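// Under DropOldest, casting past capacity must succeed rather than return
// Full; eviction is the expected overflow behavior.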
#[test]
fn gen_server_drop_oldest_evicts_when_full() {
init_test("gen_server_drop_oldest_evicts_when_full");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 2)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
handle.try_cast(TaggedCast::Set(10)).unwrap();
handle.try_cast(TaggedCast::Set(20)).unwrap();
handle.try_cast(TaggedCast::Set(30)).unwrap();
handle.try_cast(TaggedCast::Set(40)).unwrap();
crate::test_complete!("gen_server_drop_oldest_evicts_when_full");
}
#[test]
fn gen_server_reject_policy_returns_full() {
init_test("gen_server_reject_policy_returns_full");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 2)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
assert_eq!(handle.cast_overflow_policy(), CastOverflowPolicy::Reject);
handle.try_cast(CounterCast::Reset).unwrap();
handle.try_cast(CounterCast::Reset).unwrap();
let err = handle.try_cast(CounterCast::Reset).unwrap_err();
assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
crate::test_complete!("gen_server_reject_policy_returns_full");
}
#[test]
fn gen_server_drop_oldest_ref_also_evicts() {
init_test("gen_server_drop_oldest_ref_also_evicts");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 2)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
let server_ref = handle.server_ref();
server_ref.try_cast(TaggedCast::Set(1)).unwrap();
server_ref.try_cast(TaggedCast::Set(2)).unwrap();
server_ref.try_cast(TaggedCast::Set(3)).unwrap();
crate::test_complete!("gen_server_drop_oldest_ref_also_evicts");
}
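// DropOldest cannot evict a slot held by an outstanding reserve permit, so
// try_cast on a full capacity-1 mailbox reports Full instead of evicting.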
#[test]
fn gen_server_drop_oldest_reserved_slots_returns_full() {
init_test("gen_server_drop_oldest_reserved_slots_returns_full");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
let _permit = futures_lite::future::block_on(handle.sender.reserve(&cx)).unwrap();
let err = handle.try_cast(TaggedCast::Set(1)).unwrap_err();
assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
crate::test_complete!("gen_server_drop_oldest_reserved_slots_returns_full");
}
#[test]
fn gen_server_ref_drop_oldest_reserved_slots_returns_full() {
init_test("gen_server_ref_drop_oldest_reserved_slots_returns_full");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
let server_ref = handle.server_ref();
let _permit = futures_lite::future::block_on(handle.sender.reserve(&cx)).unwrap();
let err = server_ref.try_cast(TaggedCast::Set(1)).unwrap_err();
assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
crate::test_complete!("gen_server_ref_drop_oldest_reserved_slots_returns_full");
}
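// DropOldest eviction only targets casts: a queued Call envelope survives the
// overflow (the cast is rejected with Full) and is still serviced with its
// reply after stop.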
#[test]
fn gen_server_drop_oldest_preserves_queued_call_and_returns_full() {
init_test("gen_server_drop_oldest_preserves_queued_call_and_returns_full");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let root = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
let (reply_tx, mut reply_rx) = session::tracked_oneshot::<u64>();
let reply_permit = reply_tx.reserve(&cx);
let call_envelope: Envelope<DropOldestCounter> = Envelope::Call {
request: CounterCall::Get,
reply_permit,
};
handle.sender.try_send(call_envelope).unwrap();
let err = handle.try_cast(TaggedCast::Set(99)).unwrap_err();
assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
handle.stop();
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
let recv = futures_lite::future::block_on(reply_rx.recv(&cx));
assert_eq!(
recv,
Ok(0),
"preserved queued call should still be serviced, got {recv:?}"
);
crate::test_complete!("gen_server_drop_oldest_preserves_queued_call_and_returns_full");
}
#[test]
fn gen_server_drop_oldest_preserves_queued_info_and_returns_full() {
init_test("gen_server_drop_oldest_preserves_queued_info_and_returns_full");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let root = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
let info = SystemMsg::timeout(TimeoutMsg::new(Time::from_secs(1), 7));
handle
.sender
.try_send(Envelope::Info { msg: info })
.expect("queue info");
let err = handle.try_cast(TaggedCast::Set(99)).unwrap_err();
assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
handle.stop();
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("gen_server_drop_oldest_preserves_queued_info_and_returns_full");
}
#[test]
fn gen_server_default_overflow_policy_is_reject() {
init_test("gen_server_default_overflow_policy_is_reject");
assert_eq!(CastOverflowPolicy::default(), CastOverflowPolicy::Reject);
let counter = Counter { count: 0 };
assert_eq!(counter.cast_overflow_policy(), CastOverflowPolicy::Reject);
crate::test_complete!("gen_server_default_overflow_policy_is_reject");
}
#[test]
fn reply_debug_format() {
init_test("reply_debug_format");
let cx = Cx::for_testing();
let (tx, _rx) = session::tracked_oneshot::<u64>();
let permit = tx.reserve(&cx);
let reply = Reply::new(&cx, permit);
let debug_str = format!("{reply:?}");
assert!(debug_str.contains("Reply"));
assert!(debug_str.contains("pending"));
let _ = reply.send(42);
crate::test_complete!("reply_debug_format");
}
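// on_start is expected to observe the probe's elevated start priority (200)
// while the main loop afterwards observes the scope's original priority (10).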
#[test]
fn gen_server_on_start_budget_priority_applied_and_restored() {
init_test("gen_server_on_start_budget_priority_applied_and_restored");
let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let started_priority = Arc::new(AtomicU8::new(0));
let loop_priority = Arc::new(AtomicU8::new(0));
let server = StartBudgetProbe {
started_priority: Arc::clone(&started_priority),
loop_priority: Arc::clone(&loop_priority),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
let (client, stored_client) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
let p = server_ref.call(&cx, CounterCall::Get).await.unwrap();
assert_eq!(p, 10);
})
.unwrap();
let client_task_id = client.task_id();
runtime
.state
.store_spawned_task(client_task_id, stored_client);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
{
runtime.scheduler.lock().schedule(client_task_id, 0);
}
runtime.run_until_quiescent();
assert_eq!(started_priority.load(Ordering::SeqCst), 200);
assert_eq!(loop_priority.load(Ordering::SeqCst), 10);
drop(handle);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("gen_server_on_start_budget_priority_applied_and_restored");
}
#[test]
fn gen_server_on_stop_runs_masked_under_stop() {
init_test("gen_server_on_stop_runs_masked_under_stop");
let budget = Budget::new().with_poll_quota(10_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let stop_checkpoint_ok = Arc::new(AtomicU8::new(0));
let server = StopMaskProbe {
stop_checkpoint_ok: Arc::clone(&stop_checkpoint_ok),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
assert_eq!(stop_checkpoint_ok.load(Ordering::SeqCst), 1);
crate::test_complete!("gen_server_on_stop_runs_masked_under_stop");
}
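// Per the test name, evicting from the capacity-1 DropOldest mailbox is
// expected to emit a trace event; the assertion exercised here is only that
// both try_casts succeed.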
#[test]
fn cast_drop_oldest_emits_trace_on_eviction() {
init_test("cast_drop_oldest_emits_trace_on_eviction");
let budget = Budget::new().with_poll_quota(10_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 1)
.unwrap();
let task_id = handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_idle();
handle.try_cast(TaggedCast::Set(1)).unwrap();
handle.try_cast(TaggedCast::Set(2)).unwrap();
crate::test_complete!("cast_drop_oldest_emits_trace_on_eviction");
}
#[test]
fn cast_to_stopped_server_returns_error() {
init_test("cast_to_stopped_server_returns_error");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 4)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
handle.stop();
let err = handle.try_cast(CounterCast::Reset).unwrap_err();
assert!(
matches!(err, CastError::ServerStopped),
"expected ServerStopped, got {err:?}"
);
crate::test_complete!("cast_to_stopped_server_returns_error");
}
#[test]
fn cast_overflow_policy_display() {
init_test("cast_overflow_policy_display");
assert_eq!(format!("{}", CastOverflowPolicy::Reject), "Reject");
assert_eq!(format!("{}", CastOverflowPolicy::DropOldest), "DropOldest");
crate::test_complete!("cast_overflow_policy_display");
}
#[test]
fn cast_ref_reject_returns_full() {
init_test("cast_ref_reject_returns_full");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, Counter { count: 0 }, 2)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
let server_ref = handle.server_ref();
server_ref.try_cast(CounterCast::Reset).unwrap();
server_ref.try_cast(CounterCast::Reset).unwrap();
let err = server_ref.try_cast(CounterCast::Reset).unwrap_err();
assert!(matches!(err, CastError::Full), "expected Full, got {err:?}");
crate::test_complete!("cast_ref_reject_returns_full");
}
#[test]
fn cast_drop_oldest_ref_capacity_one() {
init_test("cast_drop_oldest_ref_capacity_one");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(root, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut state, &cx, DropOldestCounter { count: 0 }, 1)
.unwrap();
state.store_spawned_task(handle.task_id(), stored);
let server_ref = handle.server_ref();
server_ref.try_cast(TaggedCast::Set(100)).unwrap();
server_ref.try_cast(TaggedCast::Set(200)).unwrap();
server_ref.try_cast(TaggedCast::Set(300)).unwrap();
crate::test_complete!("cast_drop_oldest_ref_capacity_one");
}
#[test]
fn init_default_budget_is_infinite() {
init_test("init_default_budget_is_infinite");
let counter = Counter { count: 0 };
assert_eq!(counter.on_start_budget(), Budget::INFINITE);
crate::test_complete!("init_default_budget_is_infinite");
}
#[test]
fn terminate_default_budget_is_minimal() {
init_test("terminate_default_budget_is_minimal");
let counter = Counter { count: 0 };
assert_eq!(counter.on_stop_budget(), Budget::MINIMAL);
crate::test_complete!("terminate_default_budget_is_minimal");
}
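// Stopping before the server is ever polled must skip on_start but still run
// on_stop, so cleanup is unconditional.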
#[test]
fn init_skipped_when_pre_cancelled_but_stop_runs() {
#[allow(clippy::items_after_statements)]
struct LifecycleProbe {
init_ran: Arc<AtomicU8>,
stop_ran: Arc<AtomicU8>,
}
#[allow(clippy::items_after_statements)]
impl GenServer for LifecycleProbe {
type Call = CounterCall;
type Reply = u64;
type Cast = CounterCast;
type Info = SystemMsg;
fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.init_ran.store(1, Ordering::SeqCst);
Box::pin(async {})
}
fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.stop_ran.store(1, Ordering::SeqCst);
Box::pin(async {})
}
fn handle_call(
&mut self,
_cx: &Cx,
_request: CounterCall,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(0);
Box::pin(async {})
}
}
init_test("init_skipped_when_pre_cancelled_but_stop_runs");
let budget = Budget::new().with_poll_quota(10_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let init_ran = Arc::new(AtomicU8::new(0));
let stop_ran = Arc::new(AtomicU8::new(0));
let server = LifecycleProbe {
init_ran: Arc::clone(&init_ran),
stop_ran: Arc::clone(&stop_ran),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
assert_eq!(
init_ran.load(Ordering::SeqCst),
0,
"init should be skipped when pre-cancelled"
);
assert_eq!(
stop_ran.load(Ordering::SeqCst),
1,
"stop must run even when pre-cancelled"
);
crate::test_complete!("init_skipped_when_pre_cancelled_but_stop_runs");
}
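// Polls spent inside on_start are charged against the main budget: the loop's
// remaining poll quota must stay at or below the original 10_000 while still
// being positive.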
#[test]
fn init_budget_consumption_propagates_to_main_budget() {
#[allow(clippy::items_after_statements)]
struct BudgetCheckProbe {
loop_quota: Arc<AtomicU64>,
}
#[allow(clippy::items_after_statements)]
impl GenServer for BudgetCheckProbe {
type Call = CounterCall;
type Reply = u64;
type Cast = CounterCast;
type Info = SystemMsg;
fn on_start(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = cx.budget();
Box::pin(async {})
}
fn on_start_budget(&self) -> Budget {
Budget::new().with_poll_quota(50).with_priority(200)
}
fn handle_call(
&mut self,
cx: &Cx,
_request: CounterCall,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.loop_quota
.store(u64::from(cx.budget().poll_quota), Ordering::SeqCst);
let _ = reply.send(0);
Box::pin(async {})
}
}
init_test("init_budget_consumption_propagates_to_main_budget");
let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let loop_quota = Arc::new(AtomicU64::new(0));
let server = BudgetCheckProbe {
loop_quota: Arc::clone(&loop_quota),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
let (client, stored_client) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
let _ = server_ref.call(&cx, CounterCall::Get).await;
})
.unwrap();
let client_task_id = client.task_id();
runtime
.state
.store_spawned_task(client_task_id, stored_client);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
{
runtime.scheduler.lock().schedule(client_task_id, 0);
}
runtime.run_until_quiescent();
let remaining = loop_quota.load(Ordering::SeqCst);
assert!(
remaining <= 10_000,
"main budget after init must be <= original ({remaining} <= 10000)"
);
assert!(
remaining > 0,
"main budget should still have polls remaining"
);
drop(handle);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("init_budget_consumption_propagates_to_main_budget");
}
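// on_stop must observe the tighter on_stop_budget poll quota (42), not the
// region's 10_000.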
#[test]
fn stop_budget_constrains_stop_phase() {
struct StopBudgetProbe {
stop_poll_quota: Arc<AtomicU64>,
}
impl GenServer for StopBudgetProbe {
type Call = CounterCall;
type Reply = u64;
type Cast = CounterCast;
type Info = SystemMsg;
fn on_stop_budget(&self) -> Budget {
Budget::new().with_poll_quota(42).with_priority(250)
}
fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.stop_poll_quota
.store(u64::from(cx.budget().poll_quota), Ordering::SeqCst);
Box::pin(async {})
}
fn handle_call(
&mut self,
_cx: &Cx,
_request: CounterCall,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(0);
Box::pin(async {})
}
}
init_test("stop_budget_constrains_stop_phase");
let budget = Budget::new().with_poll_quota(10_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let stop_poll_quota = Arc::new(AtomicU64::new(0));
let server = StopBudgetProbe {
stop_poll_quota: Arc::clone(&stop_poll_quota),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
let stop_quota = stop_poll_quota.load(Ordering::SeqCst);
assert_eq!(stop_quota, 42, "stop phase should use the tighter budget");
crate::test_complete!("stop_budget_constrains_stop_phase");
}
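// When both lifecycle hooks run, on_start must be recorded before on_stop.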
#[test]
fn lifecycle_init_before_stop() {
#[allow(clippy::items_after_statements)]
struct PhaseTracker {
phases: Arc<Mutex<Vec<&'static str>>>,
}
#[allow(clippy::items_after_statements)]
impl GenServer for PhaseTracker {
type Call = CounterCall;
type Reply = u64;
type Cast = CounterCast;
type Info = SystemMsg;
fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.phases.lock().push("init");
Box::pin(async {})
}
fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.phases.lock().push("stop");
Box::pin(async {})
}
fn handle_call(
&mut self,
_cx: &Cx,
_request: CounterCall,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(0);
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: CounterCast,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
init_test("lifecycle_init_before_stop");
let budget = Budget::new().with_poll_quota(10_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let phases = Arc::new(Mutex::new(Vec::<&'static str>::new()));
let server = PhaseTracker {
phases: Arc::clone(&phases),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_idle();
let phases_clone = Arc::clone(&phases);
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_idle();
{
let recorded = phases_clone.lock();
assert!(
recorded.contains(&"stop"),
"stop phase must run, got {:?}",
*recorded
);
if let Some(init_pos) = recorded.iter().position(|p| *p == "init") {
let stop_pos = recorded.iter().position(|p| *p == "stop").unwrap();
assert!(
init_pos < stop_pos,
"init must precede stop, got {:?}",
*recorded
);
}
drop(recorded);
}
crate::test_complete!("lifecycle_init_before_stop");
}
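// on_stop must run at a priority no lower than the task's original priority
// (10); the probe requests 240 via on_stop_budget.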
#[test]
fn stop_budget_priority_applied() {
#[allow(clippy::items_after_statements)]
struct StopPriorityProbe {
stop_priority: Arc<AtomicU8>,
}
#[allow(clippy::items_after_statements)]
impl GenServer for StopPriorityProbe {
type Call = CounterCall;
type Reply = u64;
type Cast = CounterCast;
type Info = SystemMsg;
fn on_stop_budget(&self) -> Budget {
Budget::new().with_poll_quota(200).with_priority(240)
}
fn on_stop(&mut self, cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.stop_priority
.store(cx.budget().priority, Ordering::SeqCst);
Box::pin(async {})
}
fn handle_call(
&mut self,
_cx: &Cx,
_request: CounterCall,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(0);
Box::pin(async {})
}
}
init_test("stop_budget_priority_applied");
let budget = Budget::new().with_poll_quota(10_000).with_priority(10);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let stop_priority = Arc::new(AtomicU8::new(0));
let server = StopPriorityProbe {
stop_priority: Arc::clone(&stop_priority),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
let actual_priority = stop_priority.load(Ordering::SeqCst);
assert!(
actual_priority >= 10,
"stop priority should be at least original ({actual_priority} >= 10)"
);
crate::test_complete!("stop_budget_priority_applied");
}
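// With a capacity-1 mailbox, one call queues behind another; stopping the
// server must resolve both queued calls and still reach quiescence cleanly.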
#[test]
fn conformance_cancel_propagation_to_queued_calls() {
init_test("conformance_cancel_propagation_to_queued_calls");
let budget = Budget::new().with_poll_quota(50_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 1)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref_1 = handle.server_ref();
let server_ref_2 = handle.server_ref();
let result_1: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
let result_1_clone = Arc::clone(&result_1);
let (c1_handle, c1_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
let r = server_ref_1.call(&cx, CounterCall::Add(10)).await;
*result_1_clone.lock() = Some(r);
})
.unwrap();
let c1_id = c1_handle.task_id();
runtime.state.store_spawned_task(c1_id, c1_stored);
let result_2: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
let result_2_clone = Arc::clone(&result_2);
let (c2_handle, c2_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
let r = server_ref_2.call(&cx, CounterCall::Add(20)).await;
*result_2_clone.lock() = Some(r);
})
.unwrap();
let c2_id = c2_handle.task_id();
runtime.state.store_spawned_task(c2_id, c2_stored);
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(c1_id, 0);
sched.schedule(c2_id, 0);
}
runtime.run_until_idle();
handle.stop();
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(c1_id, 0);
sched.schedule(c2_id, 0);
}
runtime.run_until_quiescent();
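// The call results are intentionally not asserted; completing
// run_until_quiescent without deadlock is the property under test.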
drop(result_2.lock());
crate::test_complete!("conformance_cancel_propagation_to_queued_calls");
}
#[test]
fn conformance_stopped_server_rejects_new_messages() {
init_test("conformance_stopped_server_rejects_new_messages");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_idle();
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
let cast_result = server_ref.try_cast(CounterCast::Reset);
assert!(cast_result.is_err(), "cast to stopped server must fail");
crate::test_complete!("conformance_stopped_server_rejects_new_messages");
}
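// Drives a full cast/call/stop lifecycle through one server; any leaked
// obligations are expected to surface while driving the runtime to
// quiescence.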
#[test]
fn conformance_full_lifecycle_no_obligation_leaks() {
init_test("conformance_full_lifecycle_no_obligation_leaks");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
server_ref.try_cast(CounterCast::Reset).unwrap();
let call_result: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
let call_result_clone = Arc::clone(&call_result);
let server_ref_for_call = handle.server_ref();
let (client, client_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
let r = server_ref_for_call.call(&cx, CounterCall::Add(42)).await;
*call_result_clone.lock() = Some(r);
})
.unwrap();
let client_id = client.task_id();
runtime.state.store_spawned_task(client_id, client_stored);
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(client_id, 0);
}
runtime.run_until_idle();
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(client_id, 0);
}
runtime.run_until_idle();
let call_r = call_result.lock();
if let Some(ref r) = *call_r {
match r {
Ok(value) => assert_eq!(*value, 42, "counter should be 42 after Add(42)"),
Err(e) => unreachable!("unexpected call error: {e:?}"),
}
}
drop(call_r);
server_ref.try_cast(CounterCast::Reset).unwrap();
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("conformance_full_lifecycle_no_obligation_leaks");
}
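// Three concurrent Add calls against one server: the sequence of observed
// counter values must be identical across two runs with the same seed.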
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_deterministic_replay_with_seed() {
init_test("conformance_deterministic_replay_with_seed");
fn run_scenario(seed: u64) -> Vec<u64> {
let config = crate::lab::LabConfig::new(seed);
let mut runtime = crate::lab::LabRuntime::new(config);
let budget = Budget::new().with_poll_quota(100_000);
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let results: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));
let mut client_ids = Vec::new();
for i in 1..=3u64 {
let server_ref = handle.server_ref();
let results_clone = Arc::clone(&results);
let (ch, cs) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
if let Ok(val) = server_ref.call(&cx, CounterCall::Add(i * 10)).await {
results_clone.lock().push(val);
}
})
.unwrap();
let cid = ch.task_id();
runtime.state.store_spawned_task(cid, cs);
client_ids.push(cid);
}
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
for &cid in &client_ids {
sched.schedule(cid, 0);
}
}
runtime.run_until_idle();
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
for &cid in &client_ids {
sched.schedule(cid, 0);
}
}
runtime.run_until_idle();
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
results.lock().clone()
}
let run_a = run_scenario(42);
let run_b = run_scenario(42);
assert_eq!(
run_a, run_b,
"same seed must produce identical results: {run_a:?} vs {run_b:?}"
);
crate::test_complete!("conformance_deterministic_replay_with_seed");
}
#[test]
fn conformance_mailbox_overflow_reject_deterministic() {
init_test("conformance_mailbox_overflow_reject_deterministic");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, Counter { count: 0 }, 2)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
server_ref.try_cast(CounterCast::Reset).unwrap();
server_ref.try_cast(CounterCast::Reset).unwrap();
let overflow = server_ref.try_cast(CounterCast::Reset);
assert!(
overflow.is_err(),
"third cast to capacity-2 mailbox must fail with Reject policy"
);
match overflow.unwrap_err() {
CastError::Full => { }
other => unreachable!("expected CastError::Full, got {other:?}"),
}
drop(handle);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("conformance_mailbox_overflow_reject_deterministic");
}
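// Overflowing a capacity-2 DropOldest mailbox with Set(1), Set(2), Set(100)
// must evict the oldest cast and leave the newest value (100) observable via
// a follow-up Get call.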
#[test]
fn conformance_mailbox_drop_oldest_preserves_newest() {
init_test("conformance_mailbox_drop_oldest_preserves_newest");
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(Budget::INFINITE);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, Budget::INFINITE);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, DropOldestCounter { count: 0 }, 2)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
server_ref.try_cast(TaggedCast::Set(1)).unwrap();
server_ref.try_cast(TaggedCast::Set(2)).unwrap();
server_ref.try_cast(TaggedCast::Set(100)).unwrap();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_idle();
let result_ref = handle.server_ref();
let result: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
let result_clone = Arc::clone(&result);
let (ch, cs) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
if let Ok(val) = result_ref.call(&cx, CounterCall::Get).await {
*result_clone.lock() = Some(val);
}
})
.unwrap();
let cid = ch.task_id();
runtime.state.store_spawned_task(cid, cs);
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(cid, 0);
}
runtime.run_until_idle();
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(cid, 0);
}
runtime.run_until_idle();
assert_eq!(
*result.lock(),
Some(100),
"DropOldest should evict oldest, keeping newest"
);
drop(handle);
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("conformance_mailbox_drop_oldest_preserves_newest");
}
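// SlowServer aborts every reply, standing in for a handler that never answers
// (e.g. its budget expires); the caller must see a CallError rather than hang.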
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_budget_driven_call_timeout() {
struct SlowServer;
impl GenServer for SlowServer {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_request: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _proof = reply.abort();
Box::pin(async {})
}
}
init_test("conformance_budget_driven_call_timeout");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, SlowServer, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
let call_result: Arc<Mutex<Option<Result<(), CallError>>>> = Arc::new(Mutex::new(None));
let call_result_clone = Arc::clone(&call_result);
let (ch, cs) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
let r = server_ref.call(&cx, ()).await;
*call_result_clone.lock() = Some(r);
})
.unwrap();
let client_id = ch.task_id();
runtime.state.store_spawned_task(client_id, cs);
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(client_id, 0);
}
runtime.run_until_idle();
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(client_id, 0);
}
runtime.run_until_idle();
{
let guard = call_result.lock();
let result = guard
.as_ref()
.expect("call should have completed before the assertion");
assert!(result.is_err(), "aborted reply should result in call error");
}
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("conformance_budget_driven_call_timeout");
}
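// Conformance: Reply::send must yield ReplyOutcome::Committed while the
// caller is still waiting, and the caller must observe the committed value.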
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_reply_linearity_send_commits() {
struct ReplyTracker {
committed: Arc<AtomicU8>,
}
impl GenServer for ReplyTracker {
type Call = u64;
type Reply = u64;
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
request: u64,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match reply.send(request * 2) {
ReplyOutcome::Committed(_proof) => {
self.committed.store(1, Ordering::SeqCst);
}
ReplyOutcome::CallerGone => {
self.committed.store(2, Ordering::SeqCst);
}
}
Box::pin(async {})
}
}
init_test("conformance_reply_linearity_send_commits");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let committed = Arc::new(AtomicU8::new(0));
let server = ReplyTracker {
committed: Arc::clone(&committed),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
let call_result: Arc<Mutex<Option<Result<u64, CallError>>>> = Arc::new(Mutex::new(None));
let call_result_clone = Arc::clone(&call_result);
let (ch, cs) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
let r = server_ref.call(&cx, 21).await;
*call_result_clone.lock() = Some(r);
})
.unwrap();
let client_id = ch.task_id();
runtime.state.store_spawned_task(client_id, cs);
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(client_id, 0);
}
runtime.run_until_idle();
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(client_id, 0);
}
runtime.run_until_idle();
assert_eq!(
committed.load(Ordering::SeqCst),
1,
"reply must be committed when caller is waiting"
);
{
let r = call_result.lock();
match r.as_ref() {
Some(Ok(value)) => assert_eq!(*value, 42, "21 * 2 = 42"),
other => unreachable!("expected Ok(42), got {other:?}"),
}
}
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("conformance_reply_linearity_send_commits");
}
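// Conformance: Reply::abort is the clean refusal path; the caller observes a
// call error instead of waiting forever, and nothing panics.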
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_reply_linearity_abort_is_clean() {
struct AbortServer {
aborted: Arc<AtomicU8>,
}
impl GenServer for AbortServer {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_request: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _proof = reply.abort();
self.aborted.store(1, Ordering::SeqCst);
Box::pin(async {})
}
}
init_test("conformance_reply_linearity_abort_is_clean");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let aborted = Arc::new(AtomicU8::new(0));
let server = AbortServer {
aborted: Arc::clone(&aborted),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
let call_err: Arc<Mutex<Option<Result<(), CallError>>>> = Arc::new(Mutex::new(None));
let call_err_clone = Arc::clone(&call_err);
let (ch, cs) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
let r = server_ref.call(&cx, ()).await;
*call_err_clone.lock() = Some(r);
})
.unwrap();
let client_id = ch.task_id();
runtime.state.store_spawned_task(client_id, cs);
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(client_id, 0);
}
runtime.run_until_idle();
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(client_id, 0);
}
runtime.run_until_idle();
assert_eq!(
aborted.load(Ordering::SeqCst),
1,
"server must have called abort()"
);
{
let r = call_err.lock();
match r.as_ref() {
Some(Err(_)) => {}
other => unreachable!("expected call error after abort, got {other:?}"),
}
}
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
crate::test_complete!("conformance_reply_linearity_abort_is_clean");
}
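// Conformance: a panic inside handle_call must surface as
// JoinError::Panicked on the server handle while the caller sees
// CallError::NoReply, with no double panic during unwinding.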
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_panicking_handle_call_returns_join_error_without_double_panic() {
#[derive(Debug)]
struct PanicOnCall;
impl GenServer for PanicOnCall {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_request: (),
_reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
std::panic::panic_any("intentional handle_call panic");
}
}
init_test("conformance_panicking_handle_call_returns_join_error_without_double_panic");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let (mut handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, PanicOnCall, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
let (mut client_handle, client_stored) = scope
.spawn(&mut runtime.state, &cx, move |cx| async move {
server_ref.call(&cx, ()).await
})
.unwrap();
let client_task_id = client_handle.task_id();
runtime
.state
.store_spawned_task(client_task_id, client_stored);
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(client_task_id, 0);
}
runtime.run_until_idle();
{
let mut sched = runtime.scheduler.lock();
sched.schedule(server_task_id, 0);
sched.schedule(client_task_id, 0);
}
runtime.run_until_idle();
let join = futures_lite::future::block_on(handle.join(&cx));
assert!(
matches!(join, Err(JoinError::Panicked(_))),
"panicking call handler should surface JoinError::Panicked"
);
let client_result =
futures_lite::future::block_on(client_handle.join(&cx)).expect("client join ok");
assert!(
matches!(client_result, Err(CallError::NoReply)),
"caller should observe closed reply after panic, got {client_result:?}"
);
crate::test_complete!(
"conformance_panicking_handle_call_returns_join_error_without_double_panic"
);
}
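// Conformance: stop() drains casts already queued in the mailbox before
// on_stop runs, so no accepted message is silently dropped.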
#[test]
#[allow(clippy::items_after_statements)]
fn conformance_drain_processes_queued_casts_on_stop() {
struct AccumulatorServer {
sum: u64,
final_sum: Arc<AtomicU64>,
}
enum AccumCall {
#[allow(dead_code)]
GetSum,
}
enum AccumCast {
Add(u64),
}
impl GenServer for AccumulatorServer {
type Call = AccumCall;
type Reply = u64;
type Cast = AccumCast;
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_request: AccumCall,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(self.sum);
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
msg: AccumCast,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
match msg {
AccumCast::Add(n) => self.sum += n,
}
Box::pin(async {})
}
fn on_stop(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.final_sum.store(self.sum, Ordering::SeqCst);
Box::pin(async {})
}
}
init_test("conformance_drain_processes_queued_casts_on_stop");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::default());
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let final_sum = Arc::new(AtomicU64::new(0));
let server = AccumulatorServer {
sum: 0,
final_sum: Arc::clone(&final_sum),
};
let (handle, stored) = scope
.spawn_gen_server(&mut runtime.state, &cx, server, 32)
.unwrap();
let server_task_id = handle.task_id();
runtime.state.store_spawned_task(server_task_id, stored);
let server_ref = handle.server_ref();
server_ref.try_cast(AccumCast::Add(10)).unwrap();
server_ref.try_cast(AccumCast::Add(20)).unwrap();
server_ref.try_cast(AccumCast::Add(30)).unwrap();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_idle();
handle.stop();
{
runtime.scheduler.lock().schedule(server_task_id, 0);
}
runtime.run_until_quiescent();
let sum = final_sum.load(Ordering::SeqCst);
assert_eq!(
sum, 60,
"server must drain queued casts before stopping: 10+20+30=60, got {sum}"
);
crate::test_complete!("conformance_drain_processes_queued_casts_on_stop");
}
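// Registration lifecycle: a named server resolves via whereis while live,
// and its name becomes reusable only after release_name succeeds
// post-shutdown.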
#[test]
fn named_server_register_and_whereis() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_server_register_and_whereis");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let mut registry = crate::cx::NameRegistry::new();
#[allow(clippy::items_after_statements)]
#[derive(Debug)]
struct Counter(u64);
#[allow(clippy::items_after_statements)]
impl GenServer for Counter {
type Call = u64;
type Reply = u64;
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
request: u64,
reply: Reply<u64>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
self.0 += request;
let _ = reply.send(self.0);
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
let now = crate::types::Time::ZERO;
let (mut named_handle, stored) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"my_counter",
Counter(0),
32,
now,
)
.unwrap();
let task_id = named_handle.task_id();
runtime.state.store_spawned_task(task_id, stored);
assert_eq!(registry.whereis("my_counter"), Some(task_id));
assert_eq!(named_handle.name(), "my_counter");
named_handle.stop();
{
runtime.scheduler.lock().schedule(task_id, 0);
}
runtime.run_until_quiescent();
let release_now = runtime.state.now;
named_handle
.release_name(&mut registry, release_now)
.expect("release ok");
assert!(
registry.whereis("my_counter").is_none(),
"name must be removed once shutdown completes",
);
let (mut restarted, stored) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"my_counter",
Counter(0),
32,
now,
)
.expect("name should be reusable after release_name");
runtime
.state
.store_spawned_task(restarted.task_id(), stored);
restarted.stop();
{
runtime.scheduler.lock().schedule(restarted.task_id(), 0);
}
runtime.run_until_quiescent();
let restart_release_now = runtime.state.now;
restarted
.release_name(&mut registry, restart_release_now)
.expect("restart cleanup ok");
crate::test_complete!("named_server_register_and_whereis");
}
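// release_name must fail with StillRunning both before stop() and during the
// shutdown drain; only a fully finished task can release its name.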
#[test]
fn named_server_release_name_requires_stopped_server() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_server_release_name_requires_stopped_server");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let mut registry = crate::cx::NameRegistry::new();
#[allow(clippy::items_after_statements)]
#[derive(Debug)]
struct Noop;
#[allow(clippy::items_after_statements)]
impl GenServer for Noop {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_request: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
}
let now = crate::types::Time::ZERO;
let (mut handle, stored) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"still_running",
Noop,
8,
now,
)
.unwrap();
runtime.state.store_spawned_task(handle.task_id(), stored);
assert!(
matches!(
handle.release_name(&mut registry, now),
Err(ReleaseNameError::StillRunning)
),
"release_name must fail closed while the server is still live",
);
assert_eq!(
registry.whereis("still_running"),
Some(handle.task_id()),
"premature release_name must not remove the registered name",
);
handle.stop();
assert!(
matches!(
handle.release_name(&mut registry, now),
Err(ReleaseNameError::StillRunning)
),
"release_name must keep failing closed after stop() until the task actually finishes",
);
assert_eq!(
registry.whereis("still_running"),
Some(handle.task_id()),
"release_name during shutdown drain must not remove the registered name",
);
{
runtime.scheduler.lock().schedule(handle.task_id(), 0);
}
runtime.run_until_quiescent();
let release_now = runtime.state.now;
handle
.release_name(&mut registry, release_now)
.expect("release after shutdown should succeed");
crate::test_complete!("named_server_release_name_requires_stopped_server");
}
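// Spawning a second server under an already-taken name must fail with
// NameTaken and must not leave an orphaned task behind in the region.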
#[test]
fn named_server_duplicate_name_rejected() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_server_duplicate_name_rejected");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let mut registry = crate::cx::NameRegistry::new();
#[allow(clippy::items_after_statements)]
#[derive(Debug)]
struct Dummy;
#[allow(clippy::items_after_statements)]
impl GenServer for Dummy {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_request: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
let now = crate::types::Time::ZERO;
let (mut h1, s1) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"singleton",
Dummy,
8,
now,
)
.unwrap();
runtime.state.store_spawned_task(h1.task_id(), s1);
let result = scope.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"singleton",
Dummy,
8,
now,
);
assert!(
matches!(result, Err(NamedSpawnError::NameTaken(_))),
"duplicate name should be rejected"
);
assert_eq!(registry.whereis("singleton"), Some(h1.task_id()));
let region_tasks = runtime.state.region(region).unwrap().task_ids();
assert_eq!(
region_tasks,
vec![h1.task_id()],
"region should only have the first task; orphaned task must be removed"
);
h1.stop();
runtime.scheduler.lock().schedule(h1.task_id(), 0);
runtime.run_until_quiescent();
let release_now = runtime.state.now;
h1.release_name(&mut registry, release_now)
.expect("release ok");
crate::test_complete!("named_server_duplicate_name_rejected");
}
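// abort_lease removes only the handle's own registry entry; other names
// registered for the same task remain intact.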
#[test]
fn named_server_abort_lease_removes_name() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_server_abort_lease_removes_name");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let mut registry = crate::cx::NameRegistry::new();
#[allow(clippy::items_after_statements)]
#[derive(Debug)]
struct Noop;
#[allow(clippy::items_after_statements)]
impl GenServer for Noop {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_req: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
let now = crate::types::Time::ZERO;
let (mut handle, stored) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"temp_name",
Noop,
8,
now,
)
.unwrap();
runtime.state.store_spawned_task(handle.task_id(), stored);
let mut alias = registry
.register("temp_alias", handle.task_id(), scope.region_id(), now)
.expect("second alias should register for same task");
assert!(registry.whereis("temp_name").is_some());
assert_eq!(registry.whereis("temp_alias"), Some(handle.task_id()));
handle.abort_lease(&mut registry, now).unwrap();
assert!(
registry.whereis("temp_name").is_none(),
"aborting the lease must remove the registry entry",
);
assert_eq!(
registry.whereis("temp_alias"),
Some(handle.task_id()),
"aborting one named handle must not drop other names owned by the same task",
);
registry
.unregister_owned_and_grant(&alias, now)
.expect("manual alias cleanup should succeed");
alias.abort().unwrap();
crate::test_complete!("named_server_abort_lease_removes_name");
}
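// take_lease transfers name ownership out of the handle for manual
// management: the caller must unregister and resolve the lease itself.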
#[test]
fn named_server_take_lease_manual_management() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_server_take_lease_manual_management");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let mut registry = crate::cx::NameRegistry::new();
#[allow(clippy::items_after_statements)]
#[derive(Debug)]
struct Noop2;
#[allow(clippy::items_after_statements)]
impl GenServer for Noop2 {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_req: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
let now = crate::types::Time::ZERO;
let (mut handle, stored) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"manual_name",
Noop2,
8,
now,
)
.unwrap();
runtime.state.store_spawned_task(handle.task_id(), stored);
let mut lease = handle.take_lease().unwrap();
assert!(handle.take_lease().is_none(), "second take returns None");
assert_eq!(handle.name(), "(released)");
registry
.unregister_owned_and_grant(&lease, now)
.expect("manual lease cleanup should remove the matching name");
lease
.abort()
.expect("manual lease abort should resolve the token");
assert!(
registry.whereis("manual_name").is_none(),
"manual lifecycle management must remove the registry entry as well as resolve the token",
);
crate::test_complete!("named_server_take_lease_manual_management");
}
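// After take_lease, the handle no longer owns the name: release_name fails
// closed with AlreadyResolved and mutates neither actor state nor the
// registry.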
#[test]
fn named_server_release_name_after_take_lease_returns_already_resolved() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_server_release_name_after_take_lease_returns_already_resolved");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let mut registry = crate::cx::NameRegistry::new();
#[allow(clippy::items_after_statements)]
#[derive(Debug)]
struct Noop3;
#[allow(clippy::items_after_statements)]
impl GenServer for Noop3 {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_req: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
let now = crate::types::Time::ZERO;
let (mut handle, stored) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"take_then_stop",
Noop3,
8,
now,
)
.unwrap();
runtime.state.store_spawned_task(handle.task_id(), stored);
let mut lease = handle.take_lease().expect("lease should be present");
assert_eq!(
handle.handle.state.load(),
ActorState::Created,
"taking the lease alone must not stop the server"
);
assert!(
matches!(
handle.release_name(&mut registry, now),
Err(ReleaseNameError::Lease(
crate::cx::NameLeaseError::AlreadyResolved
))
),
"release_name after take_lease must fail closed with AlreadyResolved",
);
assert_eq!(
handle.handle.state.load(),
ActorState::Created,
"failed release_name after take_lease must not mutate actor state"
);
assert_eq!(
registry.whereis("take_then_stop"),
Some(handle.task_id()),
"failed release_name after take_lease must not unregister the live name",
);
registry
.unregister_owned_and_grant(&lease, now)
.expect("manual cleanup should still be possible after failed release_name");
lease.abort().unwrap();
crate::test_complete!(
"named_server_release_name_after_take_lease_returns_already_resolved"
);
}
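// Symmetric case: abort_lease after take_lease also fails closed with
// AlreadyResolved and leaves the live registration in place.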
#[test]
fn named_server_abort_lease_after_take_lease_returns_already_resolved() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_server_abort_lease_after_take_lease_returns_already_resolved");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let mut registry = crate::cx::NameRegistry::new();
#[allow(clippy::items_after_statements)]
#[derive(Debug)]
struct Noop4;
#[allow(clippy::items_after_statements)]
impl GenServer for Noop4 {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_req: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
let now = crate::types::Time::ZERO;
let (mut handle, stored) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"take_then_abort",
Noop4,
8,
now,
)
.unwrap();
runtime.state.store_spawned_task(handle.task_id(), stored);
let mut lease = handle.take_lease().expect("lease should be present");
let abort_err = handle.abort_lease(&mut registry, now).unwrap_err();
assert_eq!(abort_err, crate::cx::NameLeaseError::AlreadyResolved);
assert_eq!(
registry.whereis("take_then_abort"),
Some(handle.task_id()),
"failed abort_lease after take_lease must not unregister the live name",
);
registry
.unregister_owned_and_grant(&lease, now)
.expect("manual cleanup should still be possible after failed abort_lease");
lease.abort().unwrap();
crate::test_complete!("named_server_abort_lease_after_take_lease_returns_already_resolved");
}
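// Once release_name has resolved the lease, a later abort_lease reports
// AlreadyResolved without touching the already-removed registry entry.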
#[test]
fn named_server_abort_lease_after_release_name_returns_already_resolved() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_server_abort_lease_after_release_name_returns_already_resolved");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let mut registry = crate::cx::NameRegistry::new();
#[allow(clippy::items_after_statements)]
#[derive(Debug)]
struct Noop4;
#[allow(clippy::items_after_statements)]
impl GenServer for Noop4 {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_req: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
let now = crate::types::Time::ZERO;
let (mut handle, stored) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"stop_then_abort",
Noop4,
8,
now,
)
.unwrap();
runtime.state.store_spawned_task(handle.task_id(), stored);
handle.stop();
{
runtime.scheduler.lock().schedule(handle.task_id(), 0);
}
runtime.run_until_quiescent();
let release_now = runtime.state.now;
handle
.release_name(&mut registry, release_now)
.expect("initial release should succeed");
let abort_err = handle.abort_lease(&mut registry, now).unwrap_err();
assert_eq!(abort_err, crate::cx::NameLeaseError::AlreadyResolved);
assert!(
registry.whereis("stop_then_abort").is_none(),
"failed abort_lease after release_name must not mutate the registry entry",
);
crate::test_complete!(
"named_server_abort_lease_after_release_name_returns_already_resolved"
);
}
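// release_name is targeted: it removes the handle's own name but preserves
// other aliases registered for the same task.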
#[test]
fn named_server_release_name_preserves_other_names_on_same_task() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_server_release_name_preserves_other_names_on_same_task");
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let region = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let scope = crate::cx::Scope::<FailFast>::new(region, budget);
let mut registry = crate::cx::NameRegistry::new();
#[allow(clippy::items_after_statements)]
#[derive(Debug)]
struct Noop5;
#[allow(clippy::items_after_statements)]
impl GenServer for Noop5 {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_req: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
let now = crate::types::Time::ZERO;
let (mut handle, stored) = scope
.spawn_named_gen_server(
&mut runtime.state,
&cx,
&mut registry,
"primary_name",
Noop5,
8,
now,
)
.unwrap();
runtime.state.store_spawned_task(handle.task_id(), stored);
let mut alias = registry
.register("secondary_name", handle.task_id(), scope.region_id(), now)
.expect("second alias should register for same task");
handle.stop();
{
runtime.scheduler.lock().schedule(handle.task_id(), 0);
}
runtime.run_until_quiescent();
let release_now = runtime.state.now;
handle
.release_name(&mut registry, release_now)
.expect("targeted release should succeed");
assert!(
registry.whereis("primary_name").is_none(),
"release_name must remove the targeted registry entry",
);
assert_eq!(
registry.whereis("secondary_name"),
Some(handle.task_id()),
"release_name must not remove unrelated names on the same task",
);
registry
.unregister_owned_and_grant(&alias, release_now)
.expect("manual alias cleanup should succeed");
alias.release().unwrap();
crate::test_complete!("named_server_release_name_preserves_other_names_on_same_task");
}
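// Supervisor integration: the named_gen_server_start helper ties name
// cleanup to region shutdown, so cancelling the supervisor's region removes
// the registration.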
#[test]
#[allow(clippy::items_after_statements)]
fn named_start_helper_supervisor_stop_cleans_registry() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_start_helper_supervisor_stop_cleans_registry");
#[derive(Debug)]
struct Noop;
impl GenServer for Noop {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn handle_call(
&mut self,
_cx: &Cx,
_request: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
let registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>> =
Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(42));
let root = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let child = crate::supervision::ChildSpec::new(
"svc_child",
named_gen_server_start(Arc::clone(®istry), "svc", 16, || Noop),
);
let compiled = crate::supervision::SupervisorBuilder::new("svc_supervisor")
.child(child)
.compile()
.expect("compile supervisor");
let supervisor = compiled
.spawn(&mut runtime.state, &cx, root, budget)
.expect("spawn supervisor");
assert_eq!(supervisor.started.len(), 1, "exactly one started child");
let child_task = supervisor.started[0].task_id;
assert_eq!(registry.lock().whereis("svc"), Some(child_task));
let tasks_to_schedule = runtime.state.cancel_request(
supervisor.region,
&crate::types::CancelReason::user("stop"),
None,
);
for (task_id, priority) in tasks_to_schedule {
runtime.scheduler.lock().schedule(task_id, priority);
}
runtime.run_until_quiescent();
assert!(
registry.lock().whereis("svc").is_none(),
"name must be removed after supervised stop",
);
crate::test_complete!("named_start_helper_supervisor_stop_cleans_registry");
}
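// Crash path: even when the child panics in on_start, the name helper must
// still clean up the registry once the region is stopped.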
#[test]
#[allow(clippy::items_after_statements)]
fn named_start_helper_crash_then_stop_cleans_registry() {
crate::test_utils::init_test_logging();
crate::test_phase!("named_start_helper_crash_then_stop_cleans_registry");
#[derive(Debug)]
struct PanicOnStart;
impl GenServer for PanicOnStart {
type Call = ();
type Reply = ();
type Cast = ();
type Info = SystemMsg;
fn on_start(&mut self, _cx: &Cx) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async move {
std::panic::panic_any("intentional start crash for registry cleanup test");
})
}
fn handle_call(
&mut self,
_cx: &Cx,
_request: (),
reply: Reply<()>,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
let _ = reply.send(());
Box::pin(async {})
}
fn handle_cast(
&mut self,
_cx: &Cx,
_msg: (),
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
let registry: Arc<parking_lot::Mutex<crate::cx::NameRegistry>> =
Arc::new(parking_lot::Mutex::new(crate::cx::NameRegistry::new()));
let budget = Budget::new().with_poll_quota(100_000);
let mut runtime = crate::lab::LabRuntime::new(crate::lab::LabConfig::new(7));
let root = runtime.state.create_root_region(budget);
let cx = Cx::for_testing();
let child = crate::supervision::ChildSpec::new(
"panic_child",
named_gen_server_start(Arc::clone(®istry), "panic_svc", 8, || PanicOnStart),
);
let compiled = crate::supervision::SupervisorBuilder::new("panic_supervisor")
.child(child)
.compile()
.expect("compile supervisor");
let supervisor = compiled
.spawn(&mut runtime.state, &cx, root, budget)
.expect("spawn supervisor");
let child_task = supervisor.started[0].task_id;
assert_eq!(registry.lock().whereis("panic_svc"), Some(child_task));
{
runtime.scheduler.lock().schedule(child_task, 0);
}
runtime.run_until_idle();
let tasks_to_schedule = runtime.state.cancel_request(
supervisor.region,
&crate::types::CancelReason::user("shutdown"),
None,
);
for (task_id, priority) in tasks_to_schedule {
runtime.scheduler.lock().schedule(task_id, priority);
}
runtime.run_until_quiescent();
assert!(
registry.lock().whereis("panic_svc").is_none(),
"name must be removed after crash + region stop",
);
crate::test_complete!("named_start_helper_crash_then_stop_cleans_registry");
}
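// The remaining tests cover the plain-data message and error types: Debug,
// Clone, and Eq behavior, Display strings, sort keys, and conversions.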
#[test]
fn cast_overflow_policy_default_is_reject() {
let policy = CastOverflowPolicy::default();
assert!(matches!(policy, CastOverflowPolicy::Reject));
}
#[test]
fn cast_overflow_policy_debug() {
let dbg = format!("{:?}", CastOverflowPolicy::Reject);
assert!(dbg.contains("Reject"), "{dbg}");
let dbg2 = format!("{:?}", CastOverflowPolicy::DropOldest);
assert!(dbg2.contains("DropOldest"), "{dbg2}");
}
#[test]
fn cast_overflow_policy_eq_clone_copy() {
let a = CastOverflowPolicy::Reject;
let b = a;
let c = a;
assert_eq!(a, b);
assert_eq!(a, c);
assert_ne!(CastOverflowPolicy::Reject, CastOverflowPolicy::DropOldest);
}
#[test]
fn down_msg_constructor_and_debug() {
let mut monitors = crate::monitor::MonitorSet::new();
let mref = monitors.establish(tid(50), rid(0), tid(51));
let msg = DownMsg::new(
Time::from_secs(7),
DownNotification {
monitored: tid(51),
reason: DownReason::Normal,
monitor_ref: mref,
},
);
assert_eq!(msg.completion_vt, Time::from_secs(7));
assert_eq!(msg.notification.monitored, tid(51));
let dbg = format!("{msg:?}");
assert!(dbg.contains("DownMsg"), "{dbg}");
}
#[test]
fn down_msg_clone() {
let mut monitors = crate::monitor::MonitorSet::new();
let mref = monitors.establish(tid(60), rid(0), tid(61));
let msg = DownMsg::new(
Time::from_secs(8),
DownNotification {
monitored: tid(61),
reason: DownReason::Normal,
monitor_ref: mref,
},
);
let cloned = msg.clone();
assert_eq!(cloned.completion_vt, msg.completion_vt);
assert_eq!(cloned.notification.monitored, msg.notification.monitored);
}
#[test]
fn exit_msg_constructor_and_eq() {
let a = ExitMsg::new(Time::from_secs(5), tid(10), DownReason::Normal);
let b = ExitMsg::new(Time::from_secs(5), tid(10), DownReason::Normal);
assert_eq!(a, b);
}
#[test]
fn exit_msg_debug_and_clone() {
let msg = ExitMsg::new(
Time::from_secs(6),
tid(11),
DownReason::Error("oops".into()),
);
let dbg = format!("{msg:?}");
assert!(dbg.contains("ExitMsg"), "{dbg}");
let cloned = msg.clone();
assert_eq!(cloned, msg);
}
#[test]
fn exit_msg_inequality() {
let a = ExitMsg::new(Time::from_secs(1), tid(1), DownReason::Normal);
let b = ExitMsg::new(Time::from_secs(2), tid(1), DownReason::Normal);
assert_ne!(a, b);
}
#[test]
fn timeout_msg_constructor_eq_copy() {
let a = TimeoutMsg::new(Time::from_secs(10), 42);
let b = a;
let c = a;
assert_eq!(a, b);
assert_eq!(a, c);
assert_eq!(a.tick_vt, Time::from_secs(10));
assert_eq!(a.id, 42);
}
#[test]
fn timeout_msg_debug() {
let msg = TimeoutMsg::new(Time::from_secs(1), 99);
let dbg = format!("{msg:?}");
assert!(dbg.contains("TimeoutMsg"), "{dbg}");
}
#[test]
fn timeout_msg_inequality() {
let a = TimeoutMsg::new(Time::from_secs(1), 1);
let b = TimeoutMsg::new(Time::from_secs(1), 2);
assert_ne!(a, b);
}
#[test]
fn system_msg_debug_all_variants() {
let mut monitors = crate::monitor::MonitorSet::new();
let mref = monitors.establish(tid(70), rid(0), tid(71));
let down = SystemMsg::Down {
completion_vt: Time::from_secs(1),
notification: DownNotification {
monitored: tid(71),
reason: DownReason::Normal,
monitor_ref: mref,
},
};
let exit = SystemMsg::Exit {
exit_vt: Time::from_secs(2),
from: tid(72),
reason: DownReason::Normal,
};
let timeout = SystemMsg::Timeout {
tick_vt: Time::from_secs(3),
id: 55,
};
let d = format!("{down:?}");
assert!(d.contains("Down"), "{d}");
let e = format!("{exit:?}");
assert!(e.contains("Exit"), "{e}");
let t = format!("{timeout:?}");
assert!(t.contains("Timeout"), "{t}");
}
#[test]
fn system_msg_clone() {
let msg = SystemMsg::Timeout {
tick_vt: Time::from_secs(5),
id: 7,
};
let cloned = msg.clone();
assert_eq!(cloned.sort_key(), msg.sort_key());
}
#[test]
fn system_msg_convenience_constructors() {
let mut monitors = crate::monitor::MonitorSet::new();
let mref = monitors.establish(tid(80), rid(0), tid(81));
let down_payload = DownMsg::new(
Time::from_secs(1),
DownNotification {
monitored: tid(81),
reason: DownReason::Normal,
monitor_ref: mref,
},
);
let msg = SystemMsg::down(down_payload);
assert!(matches!(msg, SystemMsg::Down { .. }));
let exit_payload = ExitMsg::new(Time::from_secs(2), tid(82), DownReason::Normal);
let msg = SystemMsg::exit(exit_payload);
assert!(matches!(msg, SystemMsg::Exit { .. }));
let timeout_payload = TimeoutMsg::new(Time::from_secs(3), 44);
let msg = SystemMsg::timeout(timeout_payload);
assert!(matches!(msg, SystemMsg::Timeout { .. }));
}
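// At equal virtual times the kind rank breaks ties: Down < Exit < Timeout.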
#[test]
fn system_msg_sort_key_kind_rank_ordering() {
let mut monitors = crate::monitor::MonitorSet::new();
let mref = monitors.establish(tid(85), rid(0), tid(86));
let same_vt = Time::from_secs(100);
let down = SystemMsg::Down {
completion_vt: same_vt,
notification: DownNotification {
monitored: tid(86),
reason: DownReason::Normal,
monitor_ref: mref,
},
};
let exit = SystemMsg::Exit {
exit_vt: same_vt,
from: tid(86),
reason: DownReason::Normal,
};
let timeout = SystemMsg::Timeout {
tick_vt: same_vt,
id: 1,
};
assert!(down.sort_key() < exit.sort_key());
assert!(exit.sort_key() < timeout.sort_key());
}
#[test]
fn system_msg_subject_key_debug_eq_ord() {
let a = SystemMsgSubjectKey::Task(tid(1));
let b = SystemMsgSubjectKey::Task(tid(1));
let c = SystemMsgSubjectKey::TimeoutId(1);
assert_eq!(a, b);
assert_ne!(a, c);
let dbg = format!("{a:?}");
assert!(dbg.contains("Task"), "{dbg}");
let dbg2 = format!("{c:?}");
assert!(dbg2.contains("TimeoutId"), "{dbg2}");
let copied = a;
let cloned = a;
assert_eq!(copied, cloned);
assert!(a <= b);
}
#[test]
fn system_msg_batch_default_and_empty() {
let batch = SystemMsgBatch::new();
let sorted = batch.into_sorted();
assert!(sorted.is_empty());
}
#[test]
fn system_msg_batch_debug() {
let batch = SystemMsgBatch::new();
let dbg = format!("{batch:?}");
assert!(dbg.contains("SystemMsgBatch"), "{dbg}");
}
#[test]
fn system_msg_batch_single_element() {
let mut batch = SystemMsgBatch::new();
batch.push(SystemMsg::Timeout {
tick_vt: Time::from_secs(42),
id: 1,
});
let sorted = batch.into_sorted();
assert_eq!(sorted.len(), 1);
assert!(matches!(sorted[0], SystemMsg::Timeout { id: 1, .. }));
}
#[test]
fn call_error_display_server_stopped() {
let err = CallError::ServerStopped;
let disp = format!("{err}");
assert_eq!(disp, "GenServer has stopped");
}
#[test]
fn call_error_display_no_reply() {
let err = CallError::NoReply;
let disp = format!("{err}");
assert_eq!(disp, "GenServer did not reply");
}
#[test]
fn call_error_display_cancelled() {
let reason = CancelReason::user("test cancel");
let err = CallError::Cancelled(reason);
let disp = format!("{err}");
assert!(disp.contains("cancelled"), "{disp}");
}
#[test]
fn call_error_debug_all_variants() {
let dbg1 = format!("{:?}", CallError::ServerStopped);
assert!(dbg1.contains("ServerStopped"), "{dbg1}");
let dbg2 = format!("{:?}", CallError::NoReply);
assert!(dbg2.contains("NoReply"), "{dbg2}");
let dbg3 = format!("{:?}", CallError::Cancelled(CancelReason::user("x")));
assert!(dbg3.contains("Cancelled"), "{dbg3}");
}
#[test]
fn call_error_is_std_error() {
let err = CallError::ServerStopped;
let _: &dyn std::error::Error = &err;
assert!(std::error::Error::source(&err).is_none());
}
#[test]
fn cast_error_display_all_variants() {
assert_eq!(
format!("{}", CastError::ServerStopped),
"GenServer has stopped"
);
assert_eq!(format!("{}", CastError::Full), "GenServer mailbox full");
let cancelled = CastError::Cancelled(CancelReason::user("t"));
let disp = format!("{cancelled}");
assert!(disp.contains("cancelled"), "{disp}");
}
#[test]
fn cast_error_debug_all_variants() {
let dbg1 = format!("{:?}", CastError::ServerStopped);
assert!(dbg1.contains("ServerStopped"), "{dbg1}");
let dbg2 = format!("{:?}", CastError::Full);
assert!(dbg2.contains("Full"), "{dbg2}");
}
#[test]
fn cast_error_is_std_error() {
let err = CastError::Full;
let _: &dyn std::error::Error = &err;
assert!(std::error::Error::source(&err).is_none());
}
#[test]
fn info_error_display_all_variants() {
assert_eq!(
format!("{}", InfoError::ServerStopped),
"GenServer has stopped"
);
assert_eq!(format!("{}", InfoError::Full), "GenServer mailbox full");
let cancelled = InfoError::Cancelled(CancelReason::user("u"));
let disp = format!("{cancelled}");
assert!(disp.contains("cancelled"), "{disp}");
}
#[test]
fn info_error_debug_all_variants() {
let dbg1 = format!("{:?}", InfoError::ServerStopped);
assert!(dbg1.contains("ServerStopped"), "{dbg1}");
let dbg2 = format!("{:?}", InfoError::Full);
assert!(dbg2.contains("Full"), "{dbg2}");
}
#[test]
fn info_error_is_std_error() {
let err = InfoError::ServerStopped;
let _: &dyn std::error::Error = &err;
assert!(std::error::Error::source(&err).is_none());
}
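// The TryFrom conversions hand back the original SystemMsg on a variant
// mismatch so the caller can re-dispatch it.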
#[test]
fn system_msg_try_from_exit_rejects_timeout() {
let timeout = SystemMsg::Timeout {
tick_vt: Time::from_secs(1),
id: 7,
};
let err = ExitMsg::try_from(timeout).expect_err("timeout is not exit");
assert!(matches!(err, SystemMsg::Timeout { id: 7, .. }));
}
#[test]
fn system_msg_try_from_exit_succeeds() {
let exit = SystemMsg::Exit {
exit_vt: Time::from_secs(3),
from: tid(15),
reason: DownReason::Normal,
};
let result = ExitMsg::try_from(exit).expect("exit conversion");
assert_eq!(result.exit_vt, Time::from_secs(3));
assert_eq!(result.from, tid(15));
}
#[test]
fn system_msg_try_from_timeout_rejects_exit() {
let exit = SystemMsg::Exit {
exit_vt: Time::from_secs(1),
from: tid(1),
reason: DownReason::Normal,
};
let err = TimeoutMsg::try_from(exit).expect_err("exit is not timeout");
assert!(matches!(err, SystemMsg::Exit { .. }));
}
}