use crate::actor::ActorId;
use crate::supervision::{EscalationPolicy, RestartPolicy};
use crate::types::{RegionId, Time};
use std::collections::{HashMap, HashSet};
use std::fmt;
/// Violation describing a region that was closed while some of its actors
/// had no stop recorded at or before the close time.
///
/// Returned as the error value of `ActorLeakOracle::check`.
#[derive(Debug, Clone)]
pub struct ActorLeakViolation {
/// Region whose close exposed the leak.
pub region: RegionId,
/// Actors spawned in `region` that were not stopped by the close time
/// (sorted by the oracle before being reported).
pub leaked_actors: Vec<ActorId>,
/// Time that was recorded for the region close.
pub region_close_time: Time,
}
impl fmt::Display for ActorLeakViolation {
    /// Renders a one-line summary: the region, its close time, the number of
    /// leaked actors, and the leaked actor list (all ids in `Debug` form).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            region,
            leaked_actors,
            region_close_time,
        } = self;
        let leak_count = leaked_actors.len();
        f.write_fmt(format_args!(
            "Region {:?} closed at {:?} with {} leaked actor(s): {:?}",
            region, region_close_time, leak_count, leaked_actors
        ))
    }
}
// Empty impl: only the default `Error` methods are used, so the violation can
// be handled as a standard error value.
impl std::error::Error for ActorLeakViolation {}
/// Oracle that records actor spawns, actor stops and region closes, and
/// reports regions that closed while some of their actors were not stopped.
#[derive(Debug, Default)]
pub struct ActorLeakOracle {
// Actors recorded by `on_spawn`, grouped by the region they were spawned in.
actors_by_region: HashMap<RegionId, HashSet<ActorId>>,
// Earliest stop time recorded per actor (`on_stop` keeps the minimum).
stopped_actors: HashMap<ActorId, Time>,
// Close time per region; a later `on_region_close` overwrites the earlier one.
region_closes: HashMap<RegionId, Time>,
}
impl ActorLeakOracle {
    /// Creates an oracle with no recorded spawns, stops, or region closes.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records that `actor` was spawned inside `region`.
    ///
    /// The spawn time is currently ignored.
    pub fn on_spawn(&mut self, actor: ActorId, region: RegionId, _time: Time) {
        self.actors_by_region
            .entry(region)
            .or_default()
            .insert(actor);
    }

    /// Records that `actor` stopped at `time`.
    ///
    /// When several stops are reported for the same actor, the earliest time
    /// is the one that is kept.
    pub fn on_stop(&mut self, actor: ActorId, time: Time) {
        self.stopped_actors
            .entry(actor)
            .and_modify(|earliest| {
                if time < *earliest {
                    *earliest = time;
                }
            })
            .or_insert(time);
    }

    /// Records that `region` closed at `time`, replacing any previously
    /// recorded close time for that region.
    pub fn on_region_close(&mut self, region: RegionId, time: Time) {
        self.region_closes.insert(region, time);
    }

    /// Checks every closed region for actors that were not stopped in time.
    ///
    /// An actor counts as leaked when it has no recorded stop, or when its
    /// recorded stop time is strictly later than the region's close time.
    /// Closed regions are visited in ascending `RegionId` order and the leaked
    /// actors are sorted, so the reported violation does not depend on hash-map
    /// iteration order. The `_now` argument is currently ignored.
    ///
    /// # Errors
    ///
    /// Returns the violation for the first (lowest-id) closed region that has
    /// at least one leaked actor.
    pub fn check(&self, _now: Time) -> Result<(), ActorLeakViolation> {
        let mut sorted_regions: Vec<_> = self.region_closes.iter().collect();
        sorted_regions.sort_by_key(|&(&region, _)| region);

        for (&region, &close_time) in sorted_regions {
            // A region that never had an actor spawned in it cannot leak.
            let Some(actors) = self.actors_by_region.get(&region) else {
                continue;
            };

            let mut leaked: Vec<_> = actors
                .iter()
                .copied()
                .filter(|actor| {
                    self.stopped_actors
                        .get(actor)
                        .is_none_or(|stopped_at| *stopped_at > close_time)
                })
                .collect();
            leaked.sort();

            if !leaked.is_empty() {
                return Err(ActorLeakViolation {
                    region,
                    leaked_actors: leaked,
                    region_close_time: close_time,
                });
            }
        }
        Ok(())
    }

    /// Discards all recorded spawns, stops, and region closes.
    pub fn reset(&mut self) {
        self.actors_by_region.clear();
        self.stopped_actors.clear();
        self.region_closes.clear();
    }

    /// Number of distinct (region, actor) spawn records currently tracked.
    #[must_use]
    pub fn actor_count(&self) -> usize {
        self.actors_by_region.values().map(HashSet::len).sum()
    }

    /// Number of actors with a recorded stop.
    #[must_use]
    pub fn stopped_count(&self) -> usize {
        self.stopped_actors.len()
    }

    /// Number of regions with a recorded close.
    #[must_use]
    pub fn closed_region_count(&self) -> usize {
        self.region_closes.len()
    }
}
/// Violation of a supervision rule, as reported by `SupervisionOracle::check`.
#[derive(Debug, Clone)]
pub struct SupervisionViolation {
/// Which rule was broken, with rule-specific details.
pub kind: SupervisionViolationKind,
/// Supervisor the violation is attributed to.
pub supervisor: ActorId,
/// Child involved in the violation, if any (`check` fills in the failed child).
pub child: Option<ActorId>,
/// Time associated with the violation (`check` uses the failure time).
pub time: Time,
}
/// The specific supervision rule that was broken.
#[derive(Debug, Clone)]
pub enum SupervisionViolationKind {
/// A child's restart attempt number went above the supervisor's limit
/// without the supervisor escalating.
RestartLimitExceeded {
/// Highest restart attempt observed for the child in the failure window.
restarts: u32,
/// Restart limit configured for the supervisor.
max_restarts: u32,
/// Escalation policy configured for the supervisor.
expected_escalation: EscalationPolicy,
},
/// Under `OneForAll`, some siblings of the failed child were not restarted.
OneForAllNotFollowed {
/// Child whose failure triggered the check.
failed_actor: ActorId,
/// Siblings with no restart recorded in the failure window.
unrestarted_siblings: Vec<ActorId>,
},
/// Under `RestForOne`, some children registered after the failed child
/// were not restarted.
RestForOneNotFollowed {
/// Child whose failure triggered the check.
failed_actor: ActorId,
/// Later-registered children with no restart recorded in the failure window.
unrestarted_successors: Vec<ActorId>,
},
/// An escalation was not propagated.
// NOTE(review): nothing in the visible part of this file constructs this
// variant; confirm whether it is produced elsewhere.
EscalationNotPropagated {
/// Free-form explanation.
reason: String,
},
}
impl fmt::Display for SupervisionViolation {
    /// Renders a one-line, kind-specific description of the violation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every message mentions the supervisor; two of them also mention the time.
        let supervisor = &self.supervisor;
        let at = &self.time;

        match &self.kind {
            SupervisionViolationKind::RestartLimitExceeded {
                restarts,
                max_restarts,
                expected_escalation,
            } => f.write_fmt(format_args!(
                "Supervisor {:?} exceeded restart limit ({}/{}) at {:?}, expected {:?}",
                supervisor, restarts, max_restarts, at, expected_escalation
            )),
            SupervisionViolationKind::OneForAllNotFollowed {
                failed_actor,
                unrestarted_siblings,
            } => f.write_fmt(format_args!(
                "Supervisor {:?}: OneForAll not followed for {:?}, siblings {:?} not restarted",
                supervisor, failed_actor, unrestarted_siblings
            )),
            SupervisionViolationKind::RestForOneNotFollowed {
                failed_actor,
                unrestarted_successors,
            } => f.write_fmt(format_args!(
                "Supervisor {:?}: RestForOne not followed for {:?}, successors {:?} not restarted",
                supervisor, failed_actor, unrestarted_successors
            )),
            SupervisionViolationKind::EscalationNotPropagated { reason } => {
                f.write_fmt(format_args!(
                    "Supervisor {:?}: escalation not propagated at {:?}: {}",
                    supervisor, at, reason
                ))
            }
        }
    }
}
// Empty impl: only the default `Error` methods are used, so the violation can
// be handled as a standard error value.
impl std::error::Error for SupervisionViolation {}
/// Recorded event: a child of `parent` failed at `time`.
#[derive(Debug, Clone)]
struct ChildFailure {
// Supervisor that the failure was reported for.
parent: ActorId,
// Child that failed.
child: ActorId,
// Time of the failure; also the start of this failure's checking window.
time: Time,
// Stored but not read by the checks in this file, hence the lint allowance.
#[allow(dead_code)] reason: String,
}
/// Recorded event: `actor` was restarted at `time`.
#[derive(Debug, Clone)]
struct RestartEvent {
// Actor that was restarted.
actor: ActorId,
// Attempt number reported by the caller; the checks compare the highest
// attempt in a failure window against the supervisor's restart limit.
attempt: u32,
// Time of the restart.
time: Time,
}
/// Recorded event: supervisor `from` escalated at `time`.
#[derive(Debug, Clone)]
struct EscalationEvent {
// Supervisor that escalated; the only identity the checks look at.
from: ActorId,
// Escalation target; stored but unused by the checks (underscore-prefixed).
_to: ActorId,
// Time of the escalation.
time: Time,
// Escalation reason; stored but unused by the checks (underscore-prefixed).
_reason: String,
}
/// Per-supervisor settings registered with `SupervisionOracle`.
#[derive(Debug, Clone)]
struct SupervisorConfig {
// Policy deciding which children must restart after a child failure.
restart_policy: RestartPolicy,
// Highest restart attempt number tolerated before escalation is expected.
max_restarts: u32,
// What the supervisor is expected to do once the restart limit is exceeded.
escalation_policy: EscalationPolicy,
// Children in registration order; `RestForOne` checks rely on this order.
children: Vec<ActorId>,
}
/// Oracle that records supervisor configuration plus failure, restart and
/// escalation events, and checks them against the configured policies.
#[derive(Debug, Default)]
pub struct SupervisionOracle {
// Registered supervisors and their configuration.
supervisors: HashMap<ActorId, SupervisorConfig>,
// Child failures in the order they were reported.
failures: Vec<ChildFailure>,
// Restart events in the order they were reported.
restarts: Vec<RestartEvent>,
// Escalation events in the order they were reported.
escalations: Vec<EscalationEvent>,
// Pre-recorded violations; `check` returns the first one if no failure-based
// violation is found.
// NOTE(review): no method visible in this file pushes to this list — confirm.
violations: Vec<SupervisionViolation>,
}
impl SupervisionOracle {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn register_supervisor(
&mut self,
supervisor: ActorId,
restart_policy: RestartPolicy,
max_restarts: u32,
escalation_policy: EscalationPolicy,
) {
self.supervisors.insert(
supervisor,
SupervisorConfig {
restart_policy,
max_restarts,
escalation_policy,
children: Vec::new(),
},
);
}
pub fn register_child(&mut self, supervisor: ActorId, child: ActorId) {
if let Some(config) = self.supervisors.get_mut(&supervisor) {
config.children.push(child);
}
}
pub fn on_child_failed(&mut self, parent: ActorId, child: ActorId, time: Time, reason: String) {
self.failures.push(ChildFailure {
parent,
child,
time,
reason,
});
}
pub fn on_restart(&mut self, actor: ActorId, attempt: u32, time: Time) {
self.restarts.push(RestartEvent {
actor,
attempt,
time,
});
}
pub fn on_escalation(&mut self, from: ActorId, to: ActorId, time: Time, reason: String) {
self.escalations.push(EscalationEvent {
from,
_to: to,
time,
_reason: reason,
});
}
pub fn check(&self, _now: Time) -> Result<(), SupervisionViolation> {
for failure in &self.failures {
if let Some(config) = self.supervisors.get(&failure.parent) {
let next_failure_time = self.next_failure_time(failure.parent, failure.time);
let restart_count =
self.restart_attempt_in_window(failure.child, failure.time, next_failure_time);
let escalated =
self.escalated_in_window(failure.parent, failure.time, next_failure_time);
if restart_count > config.max_restarts {
if !escalated && config.escalation_policy != EscalationPolicy::Stop {
return Err(SupervisionViolation {
kind: SupervisionViolationKind::RestartLimitExceeded {
restarts: restart_count,
max_restarts: config.max_restarts,
expected_escalation: config.escalation_policy,
},
supervisor: failure.parent,
child: Some(failure.child),
time: failure.time,
});
}
continue; }
if escalated {
continue; }
if config.restart_policy == RestartPolicy::OneForAll {
let siblings: Vec<_> = config
.children
.iter()
.filter(|&&c| c != failure.child)
.copied()
.collect();
let unrestarted: Vec<_> = siblings
.iter()
.filter(|&&s| !self.restarted_in_window(s, failure.time, next_failure_time))
.copied()
.collect();
if !unrestarted.is_empty() {
return Err(SupervisionViolation {
kind: SupervisionViolationKind::OneForAllNotFollowed {
failed_actor: failure.child,
unrestarted_siblings: unrestarted,
},
supervisor: failure.parent,
child: Some(failure.child),
time: failure.time,
});
}
}
if config.restart_policy == RestartPolicy::RestForOne {
let child_idx = config.children.iter().position(|&c| c == failure.child);
if let Some(idx) = child_idx {
let successors: Vec<_> = config.children[idx + 1..].to_vec();
let unrestarted: Vec<_> = successors
.iter()
.filter(|&&s| {
!self.restarted_in_window(s, failure.time, next_failure_time)
})
.copied()
.collect();
if !unrestarted.is_empty() {
return Err(SupervisionViolation {
kind: SupervisionViolationKind::RestForOneNotFollowed {
failed_actor: failure.child,
unrestarted_successors: unrestarted,
},
supervisor: failure.parent,
child: Some(failure.child),
time: failure.time,
});
}
}
}
}
}
if let Some(violation) = self.violations.first() {
return Err(violation.clone());
}
Ok(())
}
pub fn reset(&mut self) {
self.supervisors.clear();
self.failures.clear();
self.restarts.clear();
self.escalations.clear();
self.violations.clear();
}
#[must_use]
pub fn failure_count(&self) -> usize {
self.failures.len()
}
#[must_use]
pub fn restart_count(&self) -> usize {
self.restarts.len()
}
#[must_use]
pub fn escalation_count(&self) -> usize {
self.escalations.len()
}
fn next_failure_time(&self, parent: ActorId, failure_time: Time) -> Option<Time> {
self.failures
.iter()
.filter(|failure| failure.parent == parent && failure.time > failure_time)
.map(|failure| failure.time)
.min()
}
fn event_in_failure_window(
event_time: Time,
failure_time: Time,
next_failure_time: Option<Time>,
) -> bool {
event_time >= failure_time
&& next_failure_time.is_none_or(|next_time| event_time < next_time)
}
fn restart_attempt_in_window(
&self,
actor: ActorId,
failure_time: Time,
next_failure_time: Option<Time>,
) -> u32 {
self.restarts
.iter()
.filter(|restart| {
restart.actor == actor
&& Self::event_in_failure_window(restart.time, failure_time, next_failure_time)
})
.map(|restart| restart.attempt)
.max()
.unwrap_or(0)
}
fn restarted_in_window(
&self,
actor: ActorId,
failure_time: Time,
next_failure_time: Option<Time>,
) -> bool {
self.restart_attempt_in_window(actor, failure_time, next_failure_time) > 0
}
fn escalated_in_window(
&self,
supervisor: ActorId,
failure_time: Time,
next_failure_time: Option<Time>,
) -> bool {
self.escalations.iter().any(|escalation| {
escalation.from == supervisor
&& Self::event_in_failure_window(escalation.time, failure_time, next_failure_time)
})
}
}
/// Violation of a mailbox invariant, as reported by `MailboxOracle`.
#[derive(Debug, Clone)]
pub struct MailboxViolation {
/// Which invariant was broken, with invariant-specific details.
pub kind: MailboxViolationKind,
/// Actor whose mailbox is affected.
pub actor: ActorId,
/// Time of the offending event, or the `now` passed to `check` for
/// message-loss violations.
pub time: Time,
}
/// The specific mailbox invariant that was broken.
#[derive(Debug, Clone)]
pub enum MailboxViolationKind {
/// A send pushed the tracked queue size above the configured capacity.
CapacityExceeded {
/// Tracked queue size right after the offending send.
current: usize,
/// Configured capacity.
capacity: usize,
},
/// Sent and received totals do not add up for the mailbox.
MessageLost {
/// Total sends recorded for the mailbox.
sent: u64,
/// Total receives recorded for the mailbox.
received: u64,
},
/// Backpressure was enabled and the mailbox was full, but the caller
/// reported that backpressure was not applied.
BackpressureNotApplied,
}
impl fmt::Display for MailboxViolation {
    /// Renders a one-line, kind-specific description of the violation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Actor and time appear in every message.
        let (actor, at) = (&self.actor, &self.time);

        match self.kind {
            MailboxViolationKind::CapacityExceeded { current, capacity } => {
                f.write_fmt(format_args!(
                    "Actor {:?} mailbox capacity exceeded: {}/{} at {:?}",
                    actor, current, capacity, at
                ))
            }
            MailboxViolationKind::MessageLost { sent, received } => f.write_fmt(format_args!(
                "Actor {:?} lost messages: {} sent, {} received at {:?}",
                actor, sent, received, at
            )),
            MailboxViolationKind::BackpressureNotApplied => f.write_fmt(format_args!(
                "Actor {:?} backpressure not applied when mailbox full at {:?}",
                actor, at
            )),
        }
    }
}
// Empty impl: only the default `Error` methods are used, so the violation can
// be handled as a standard error value.
impl std::error::Error for MailboxViolation {}
/// Per-actor mailbox bookkeeping maintained by `MailboxOracle`.
#[derive(Debug, Default)]
struct MailboxStats {
// Configured capacity; `on_send` skips its capacity check when this is 0.
capacity: usize,
// Whether backpressure was declared for this mailbox at configuration time.
backpressure_enabled: bool,
// Tracked queue size: +1 per send, -1 per receive (never below zero).
current_size: usize,
// Total sends recorded.
total_sent: u64,
// Total receives recorded.
total_received: u64,
// Largest `current_size` observed after any send.
high_water_mark: usize,
// Time of the most recently recorded stop, if any.
stopped_at: Option<Time>,
}
/// Oracle that tracks per-actor mailbox traffic and reports capacity,
/// backpressure and message-loss violations.
#[derive(Debug, Default)]
pub struct MailboxOracle {
// Bookkeeping per actor; entries are created on first use with default
// (zero capacity, backpressure disabled) settings.
mailboxes: HashMap<ActorId, MailboxStats>,
// Violations detected eagerly while events were being recorded, in order.
violations: Vec<MailboxViolation>,
}
impl MailboxOracle {
    /// Creates an oracle with no mailboxes and no recorded violations.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the capacity and backpressure flag for `actor`'s mailbox.
    ///
    /// A `capacity` of 0 means "no limit". Any bookkeeping already recorded
    /// for `actor` is replaced by fresh, zeroed statistics.
    pub fn configure_mailbox(&mut self, actor: ActorId, capacity: usize, backpressure: bool) {
        self.mailboxes.insert(
            actor,
            MailboxStats {
                capacity,
                backpressure_enabled: backpressure,
                ..Default::default()
            },
        );
    }

    /// Records a message sent to `actor` at `time`.
    ///
    /// Updates the sent total, queue size and high-water mark, and records a
    /// `CapacityExceeded` violation when a bounded mailbox grows past its
    /// capacity.
    pub fn on_send(&mut self, actor: ActorId, time: Time) {
        let stats = self.mailboxes.entry(actor).or_default();
        stats.total_sent += 1;
        stats.current_size += 1;
        stats.high_water_mark = stats.high_water_mark.max(stats.current_size);

        // Capacity 0 means unbounded, so only bounded mailboxes can overflow.
        if stats.capacity > 0 && stats.current_size > stats.capacity {
            self.violations.push(MailboxViolation {
                kind: MailboxViolationKind::CapacityExceeded {
                    current: stats.current_size,
                    capacity: stats.capacity,
                },
                actor,
                time,
            });
        }
    }

    /// Records a message received by `actor`. The time is currently ignored.
    ///
    /// The queue size never drops below zero; a receive without a matching
    /// send still shows up through the sent/received totals in `check`.
    pub fn on_receive(&mut self, actor: ActorId, _time: Time) {
        let stats = self.mailboxes.entry(actor).or_default();
        stats.total_received += 1;
        stats.current_size = stats.current_size.saturating_sub(1);
    }

    /// Records that `actor` stopped at `time`, replacing any earlier stop time.
    pub fn on_stop(&mut self, actor: ActorId, time: Time) {
        let stats = self.mailboxes.entry(actor).or_default();
        stats.stopped_at = Some(time);
    }

    /// Records whether backpressure was `applied` for `actor` at `time`.
    ///
    /// A `BackpressureNotApplied` violation is recorded when the mailbox has
    /// backpressure enabled, is full, and backpressure was not applied.
    pub fn on_backpressure(&mut self, actor: ActorId, applied: bool, time: Time) {
        let stats = self.mailboxes.entry(actor).or_default();

        // Same convention as `on_send`: capacity 0 means unbounded, and an
        // unbounded mailbox is never full. Without the `> 0` guard the test
        // `current_size >= 0` is always true and flags every such event.
        let mailbox_full = stats.capacity > 0 && stats.current_size >= stats.capacity;

        if stats.backpressure_enabled && !applied && mailbox_full {
            self.violations.push(MailboxViolation {
                kind: MailboxViolationKind::BackpressureNotApplied,
                actor,
                time,
            });
        }
    }

    /// Reports the first mailbox violation, if any.
    ///
    /// Violations recorded while events arrived take precedence, in recording
    /// order. Otherwise mailboxes are examined in ascending `ActorId` order
    /// (so the result does not depend on hash-map iteration order) and a
    /// `MessageLost` violation stamped with `now` is produced for a mailbox
    /// that either
    /// - belongs to a stopped actor and still has queued messages or
    ///   mismatched sent/received totals, or
    /// - is empty but has mismatched sent/received totals.
    ///
    /// # Errors
    ///
    /// Returns the first violation found as described above.
    pub fn check(&self, now: Time) -> Result<(), MailboxViolation> {
        if let Some(violation) = self.violations.first() {
            return Err(violation.clone());
        }

        let mut sorted_actors: Vec<_> = self.mailboxes.iter().collect();
        sorted_actors.sort_by_key(|&(&actor, _)| actor);

        for (&actor, stats) in sorted_actors {
            let unbalanced = stats.total_sent != stats.total_received;
            let lost_on_stop =
                stats.stopped_at.is_some() && (stats.current_size != 0 || unbalanced);
            let lost_while_empty = stats.current_size == 0 && unbalanced;

            if lost_on_stop || lost_while_empty {
                return Err(MailboxViolation {
                    kind: MailboxViolationKind::MessageLost {
                        sent: stats.total_sent,
                        received: stats.total_received,
                    },
                    actor,
                    time: now,
                });
            }
        }
        Ok(())
    }

    /// Discards all mailbox bookkeeping and recorded violations.
    pub fn reset(&mut self) {
        self.mailboxes.clear();
        self.violations.clear();
    }

    /// Returns `(total_sent, total_received, high_water_mark)` for `actor`,
    /// or `None` when no mailbox is tracked for it.
    #[must_use]
    pub fn stats(&self, actor: ActorId) -> Option<(u64, u64, usize)> {
        self.mailboxes
            .get(&actor)
            .map(|s| (s.total_sent, s.total_received, s.high_water_mark))
    }

    /// Number of mailboxes currently tracked.
    #[must_use]
    pub fn mailbox_count(&self) -> usize {
        self.mailboxes.len()
    }
}
// Unit tests for the three oracles, grouped in one sub-module per oracle.
#[cfg(test)]
mod tests {
use super::*;
use crate::types::TaskId;
use crate::util::ArenaIndex;
// Builds an `ActorId` from a task id created from arena index `(n, 0)`.
fn actor(n: u32) -> ActorId {
ActorId::from_task(TaskId::from_arena(ArenaIndex::new(n, 0)))
}
// Builds a `RegionId` from arena index `(n, 0)`.
fn region(n: u32) -> RegionId {
RegionId::from_arena(ArenaIndex::new(n, 0))
}
// Shorthand for a `Time` given in nanoseconds.
fn t(nanos: u64) -> Time {
Time::from_nanos(nanos)
}
// Common test prologue: calls the crate's test-logging initialiser and the
// `test_phase!` macro with the test name.
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
// Tests for `ActorLeakOracle`.
mod actor_leak {
use super::*;
// A fresh oracle with no events reports no violation.
#[test]
fn no_actors_passes() {
init_test("no_actors_passes");
let oracle = ActorLeakOracle::new();
let ok = oracle.check(t(100)).is_ok();
crate::assert_with_log!(ok, "ok", true, ok);
crate::test_complete!("no_actors_passes");
}
// Every actor stopped before its region closed: no violation.
#[test]
fn all_actors_stopped_passes() {
init_test("all_actors_stopped_passes");
let mut oracle = ActorLeakOracle::new();
oracle.on_spawn(actor(1), region(0), t(10));
oracle.on_spawn(actor(2), region(0), t(20));
oracle.on_stop(actor(1), t(50));
oracle.on_stop(actor(2), t(60));
oracle.on_region_close(region(0), t(100));
let ok = oracle.check(t(100)).is_ok();
crate::assert_with_log!(ok, "ok", true, ok);
crate::test_complete!("all_actors_stopped_passes");
}
// Actor 2 is never stopped, so closing the region reports it as leaked.
#[test]
fn leaked_actor_fails() {
init_test("leaked_actor_fails");
let mut oracle = ActorLeakOracle::new();
oracle.on_spawn(actor(1), region(0), t(10));
oracle.on_spawn(actor(2), region(0), t(20));
oracle.on_stop(actor(1), t(50));
oracle.on_region_close(region(0), t(100));
let result = oracle.check(t(100));
let err = result.is_err();
crate::assert_with_log!(err, "err", true, err);
let violation = result.unwrap_err();
crate::assert_with_log!(
violation.region == region(0),
"region",
region(0),
violation.region
);
crate::assert_with_log!(
violation.leaked_actors == vec![actor(2)],
"leaked_actors",
vec![actor(2)],
violation.leaked_actors
);
crate::test_complete!("leaked_actor_fails");
}
// `reset` removes the recorded leak, so the next check passes.
#[test]
fn reset_clears_state() {
init_test("reset_clears_state");
let mut oracle = ActorLeakOracle::new();
oracle.on_spawn(actor(1), region(0), t(10));
oracle.on_region_close(region(0), t(100));
let err = oracle.check(t(100)).is_err();
crate::assert_with_log!(err, "err", true, err);
oracle.reset();
let ok = oracle.check(t(100)).is_ok();
crate::assert_with_log!(ok, "ok", true, ok);
crate::test_complete!("reset_clears_state");
}
}
// Tests for `SupervisionOracle`.
mod supervision {
use super::*;
// A fresh oracle with no events reports no violation.
#[test]
fn no_failures_passes() {
init_test("no_failures_passes");
let oracle = SupervisionOracle::new();
let ok = oracle.check(t(100)).is_ok();
crate::assert_with_log!(ok, "ok", true, ok);
crate::test_complete!("no_failures_passes");
}
// One restart (attempt 1) against a limit of 3 is acceptable.
#[test]
fn restart_within_limit_passes() {
init_test("restart_within_limit_passes");
let mut oracle = SupervisionOracle::new();
oracle.register_supervisor(
actor(0),
RestartPolicy::OneForOne,
3,
EscalationPolicy::Escalate,
);
oracle.register_child(actor(0), actor(1));
oracle.on_child_failed(actor(0), actor(1), t(10), "error".into());
oracle.on_restart(actor(1), 1, t(20));
let ok = oracle.check(t(100)).is_ok();
crate::assert_with_log!(ok, "ok", true, ok);
crate::test_complete!("restart_within_limit_passes");
}
// Attempt 2 exceeds the limit of 1, but the supervisor escalated in the
// same failure window, so no violation is reported.
#[test]
fn restart_limit_escalation_from_supervisor_passes() {
init_test("restart_limit_escalation_from_supervisor_passes");
let mut oracle = SupervisionOracle::new();
oracle.register_supervisor(
actor(0),
RestartPolicy::OneForOne,
1,
EscalationPolicy::Escalate,
);
oracle.register_child(actor(0), actor(1));
oracle.on_child_failed(actor(0), actor(1), t(10), "error".into());
oracle.on_restart(actor(1), 2, t(20));
oracle.on_escalation(actor(0), actor(9), t(30), "restart limit".into());
let ok = oracle.check(t(100)).is_ok();
crate::assert_with_log!(ok, "ok", true, ok);
crate::test_complete!("restart_limit_escalation_from_supervisor_passes");
}
// The escalation at t=50 belongs to the second failure's window (from t=30),
// so it must not hide the OneForAll violation of the first failure (t=10).
#[test]
fn later_escalation_does_not_mask_prior_one_for_all_violation() {
init_test("later_escalation_does_not_mask_prior_one_for_all_violation");
let mut oracle = SupervisionOracle::new();
oracle.register_supervisor(
actor(0),
RestartPolicy::OneForAll,
1,
EscalationPolicy::Escalate,
);
oracle.register_child(actor(0), actor(1));
oracle.register_child(actor(0), actor(2));
oracle.register_child(actor(0), actor(3));
oracle.on_child_failed(actor(0), actor(2), t(10), "first failure".into());
oracle.on_restart(actor(2), 1, t(20));
oracle.on_child_failed(actor(0), actor(3), t(30), "second failure".into());
oracle.on_restart(actor(3), 2, t(40));
oracle.on_escalation(actor(0), actor(9), t(50), "restart limit".into());
let violation = oracle
.check(t(100))
.expect_err("earlier OneForAll violation must still surface");
let kind_matches = matches!(
violation.kind,
SupervisionViolationKind::OneForAllNotFollowed {
failed_actor,
ref unrestarted_siblings,
} if failed_actor == actor(2)
&& unrestarted_siblings == &vec![actor(1), actor(3)]
);
crate::assert_with_log!(
kind_matches,
"kind_matches",
true,
format!("{:?}", violation.kind)
);
crate::test_complete!("later_escalation_does_not_mask_prior_one_for_all_violation");
}
// The restart of actor 4 at t=40 falls in the second failure's window, so it
// does not count as a successor restart for the first failure (t=10).
#[test]
fn later_restart_does_not_mask_prior_rest_for_one_violation() {
init_test("later_restart_does_not_mask_prior_rest_for_one_violation");
let mut oracle = SupervisionOracle::new();
oracle.register_supervisor(
actor(0),
RestartPolicy::RestForOne,
2,
EscalationPolicy::Stop,
);
oracle.register_child(actor(0), actor(1));
oracle.register_child(actor(0), actor(2));
oracle.register_child(actor(0), actor(3));
oracle.register_child(actor(0), actor(4));
oracle.on_child_failed(actor(0), actor(2), t(10), "first failure".into());
oracle.on_restart(actor(2), 1, t(20));
oracle.on_child_failed(actor(0), actor(4), t(30), "second failure".into());
oracle.on_restart(actor(4), 1, t(40));
let violation = oracle
.check(t(100))
.expect_err("earlier RestForOne violation must still surface");
let kind_matches = matches!(
violation.kind,
SupervisionViolationKind::RestForOneNotFollowed {
failed_actor,
ref unrestarted_successors,
} if failed_actor == actor(2)
&& unrestarted_successors == &vec![actor(3), actor(4)]
);
crate::assert_with_log!(
kind_matches,
"kind_matches",
true,
format!("{:?}", violation.kind)
);
crate::test_complete!("later_restart_does_not_mask_prior_rest_for_one_violation");
}
// Under OneForAll, restarting every child after a failure satisfies the policy.
#[test]
fn one_for_all_siblings_restarted_passes() {
init_test("one_for_all_siblings_restarted_passes");
let mut oracle = SupervisionOracle::new();
oracle.register_supervisor(
actor(0),
RestartPolicy::OneForAll,
3,
EscalationPolicy::Stop,
);
oracle.register_child(actor(0), actor(1));
oracle.register_child(actor(0), actor(2));
oracle.register_child(actor(0), actor(3));
oracle.on_child_failed(actor(0), actor(2), t(10), "error".into());
oracle.on_restart(actor(1), 1, t(20));
oracle.on_restart(actor(2), 1, t(20));
oracle.on_restart(actor(3), 1, t(20));
let ok = oracle.check(t(100)).is_ok();
crate::assert_with_log!(ok, "ok", true, ok);
crate::test_complete!("one_for_all_siblings_restarted_passes");
}
// Under RestForOne, only the children registered after the failed one
// (here actor 3) must restart; actor 1 is not required to.
#[test]
fn rest_for_one_successors_restarted_passes() {
init_test("rest_for_one_successors_restarted_passes");
let mut oracle = SupervisionOracle::new();
oracle.register_supervisor(
actor(0),
RestartPolicy::RestForOne,
3,
EscalationPolicy::Stop,
);
oracle.register_child(actor(0), actor(1));
oracle.register_child(actor(0), actor(2));
oracle.register_child(actor(0), actor(3));
oracle.on_child_failed(actor(0), actor(2), t(10), "error".into());
oracle.on_restart(actor(2), 1, t(20));
oracle.on_restart(actor(3), 1, t(20));
let ok = oracle.check(t(100)).is_ok();
crate::assert_with_log!(ok, "ok", true, ok);
crate::test_complete!("rest_for_one_successors_restarted_passes");
}
// `reset` drops recorded failures.
#[test]
fn reset_clears_state() {
init_test("supervision_reset_clears_state");
let mut oracle = SupervisionOracle::new();
oracle.register_supervisor(
actor(0),
RestartPolicy::OneForOne,
1,
EscalationPolicy::Escalate,
);
oracle.on_child_failed(actor(0), actor(1), t(10), "error".into());
oracle.reset();
let count = oracle.failure_count();
crate::assert_with_log!(count == 0, "failure_count", 0, count);
crate::test_complete!("supervision_reset_clears_state");
}
}
// Tests for `MailboxOracle`.
mod mailbox {
use super::*;
// A fresh oracle with no events reports no violation.
#[test]
fn no_messages_passes() {
init_test("no_messages_passes");
let oracle = MailboxOracle::new();
let ok = oracle.check(t(100)).is_ok();
crate::assert_with_log!(ok, "ok", true, ok);
crate::test_complete!("no_messages_passes");
}
// Two sends matched by two receives leave the mailbox balanced.
#[test]
fn balanced_send_receive_passes() {
init_test("balanced_send_receive_passes");
let mut oracle = MailboxOracle::new();
oracle.configure_mailbox(actor(1), 10, true);
oracle.on_send(actor(1), t(10));
oracle.on_send(actor(1), t(20));
oracle.on_receive(actor(1), t(30));
oracle.on_receive(actor(1), t(40));
let ok = oracle.check(t(100)).is_ok();
crate::assert_with_log!(ok, "ok", true, ok);
crate::test_complete!("balanced_send_receive_passes");
}
// A third send into a capacity-2 mailbox records a violation.
#[test]
fn capacity_exceeded_fails() {
init_test("capacity_exceeded_fails");
let mut oracle = MailboxOracle::new();
oracle.configure_mailbox(actor(1), 2, false);
oracle.on_send(actor(1), t(10));
oracle.on_send(actor(1), t(20));
oracle.on_send(actor(1), t(30));
let result = oracle.check(t(100));
let err = result.is_err();
crate::assert_with_log!(err, "err", true, err);
crate::test_complete!("capacity_exceeded_fails");
}
// The high-water mark keeps the peak queue size (3) after the queue drains.
#[test]
fn tracks_high_water_mark() {
init_test("tracks_high_water_mark");
let mut oracle = MailboxOracle::new();
oracle.configure_mailbox(actor(1), 10, true);
oracle.on_send(actor(1), t(10));
oracle.on_send(actor(1), t(20));
oracle.on_send(actor(1), t(30));
oracle.on_receive(actor(1), t(40));
oracle.on_receive(actor(1), t(50));
oracle.on_receive(actor(1), t(60));
let stats = oracle.stats(actor(1));
let hwm = stats.map_or(0, |(_, _, h)| h);
crate::assert_with_log!(hwm == 3, "high_water_mark", 3, hwm);
crate::test_complete!("tracks_high_water_mark");
}
// `reset` drops all tracked mailboxes.
#[test]
fn reset_clears_state() {
init_test("mailbox_reset_clears_state");
let mut oracle = MailboxOracle::new();
oracle.configure_mailbox(actor(1), 10, true);
oracle.on_send(actor(1), t(10));
oracle.reset();
let count = oracle.mailbox_count();
crate::assert_with_log!(count == 0, "mailbox_count", 0, count);
crate::test_complete!("mailbox_reset_clears_state");
}
// Stopping an actor with one unreceived message is reported as message loss.
#[test]
fn stopped_with_pending_messages_fails() {
init_test("stopped_with_pending_messages_fails");
let mut oracle = MailboxOracle::new();
oracle.configure_mailbox(actor(1), 10, true);
oracle.on_send(actor(1), t(10));
oracle.on_stop(actor(1), t(20));
let result = oracle.check(t(100));
let err = result.is_err();
crate::assert_with_log!(err, "err", true, err);
let violation = result.unwrap_err();
match violation.kind {
MailboxViolationKind::MessageLost { sent, received } => {
crate::assert_with_log!(sent == 1, "sent", 1, sent);
crate::assert_with_log!(received == 0, "received", 0, received);
}
other => {
crate::assert_with_log!(false, "kind", "MessageLost", other);
}
}
crate::test_complete!("stopped_with_pending_messages_fails");
}
}
}