use std::collections::BTreeMap;
use std::hash::Hasher;
use std::sync::Arc;
use std::time::Duration;
use crate::runtime::{RegionCreateError, RuntimeState, SpawnError};
use crate::types::{Budget, CancelReason, Outcome, RegionId, TaskId, Time};
/// Interned child name: an immutable, cheaply cloneable string key.
///
/// Backed by `Arc<str>`, so cloning is a reference-count bump and one
/// allocation can be shared between specs, plans, and registries.
/// `Hash` and `PartialEq` are implemented manually elsewhere in this file to
/// delegate to the string contents.
#[derive(Clone, Eq, Ord, PartialOrd)]
pub struct ChildName(Arc<str>);
impl ChildName {
    /// Builds a name from anything convertible into a shared `Arc<str>`.
    pub fn new(name: impl Into<Arc<str>>) -> Self {
        Self(name.into())
    }

    /// Borrows the name as a plain string slice.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_ref()
    }

    /// Number of live `Arc` handles sharing this name's allocation.
    #[must_use]
    pub fn strong_count(&self) -> usize {
        Arc::strong_count(&self.0)
    }
}
// `Deref`/`AsRef`/`Borrow` all expose the inner `str`.
//
// The `Borrow<str>` impl lets `ChildName` keys in maps/sets be looked up by a
// plain `&str`. That contract requires `Hash`, `Eq`, and `Ord` to agree with
// `str`'s, which is why `Hash` and `PartialEq` below delegate to the string
// contents (the derived `Ord`/`PartialOrd` on `Arc<str>` already do).
impl std::ops::Deref for ChildName {
    type Target = str;
    fn deref(&self) -> &str {
        &self.0
    }
}
impl AsRef<str> for ChildName {
    fn as_ref(&self) -> &str {
        &self.0
    }
}
impl std::borrow::Borrow<str> for ChildName {
    fn borrow(&self) -> &str {
        &self.0
    }
}
// Hash the string contents (not the `Arc` pointer) so `&str` lookups work.
impl std::hash::Hash for ChildName {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        (*self.0).hash(state);
    }
}
// Content equality, consistent with the `Hash` impl above.
impl PartialEq for ChildName {
    fn eq(&self, other: &Self) -> bool {
        *self.0 == *other.0
    }
}
// Symmetric string comparisons so `name == "worker"` (and the reverse)
// read naturally at call sites.
impl PartialEq<str> for ChildName {
    fn eq(&self, other: &str) -> bool {
        &*self.0 == other
    }
}
impl PartialEq<&str> for ChildName {
    fn eq(&self, other: &&str) -> bool {
        &*self.0 == *other
    }
}
impl PartialEq<String> for ChildName {
    fn eq(&self, other: &String) -> bool {
        &*self.0 == other.as_str()
    }
}
impl PartialEq<ChildName> for str {
    fn eq(&self, other: &ChildName) -> bool {
        self == &*other.0
    }
}
impl PartialEq<ChildName> for &str {
    fn eq(&self, other: &ChildName) -> bool {
        *self == &*other.0
    }
}
impl PartialEq<ChildName> for String {
    fn eq(&self, other: &ChildName) -> bool {
        self.as_str() == &*other.0
    }
}
// Conversions from the common string shapes; `Arc<str>` is taken as-is.
impl From<&str> for ChildName {
    fn from(s: &str) -> Self {
        Self(Arc::from(s))
    }
}
impl From<String> for ChildName {
    fn from(s: String) -> Self {
        Self(Arc::from(s))
    }
}
impl From<Arc<str>> for ChildName {
    fn from(s: Arc<str>) -> Self {
        Self(s)
    }
}
// Debug prints the quoted string; Display prints it bare.
impl std::fmt::Debug for ChildName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", &*self.0)
    }
}
impl std::fmt::Display for ChildName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
/// What the supervisor does when a child exits abnormally.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum SupervisionStrategy {
    /// Leave the child stopped (default).
    #[default]
    Stop,
    /// Restart the child according to the attached [`RestartConfig`].
    Restart(RestartConfig),
    /// Propagate the failure upward instead of handling it here.
    Escalate,
}
/// Limits and pacing for restarting a single child.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestartConfig {
    /// Maximum restarts allowed inside `window` before giving up.
    pub max_restarts: u32,
    /// Sliding time window over which `max_restarts` is counted.
    pub window: Duration,
    /// Delay schedule between consecutive restart attempts.
    pub backoff: BackoffStrategy,
    /// Cost charged against the budget's cost quota per restart.
    pub restart_cost: u64,
    /// Refuse to restart when less than this much deadline time remains.
    pub min_remaining_for_restart: Option<Duration>,
    /// Refuse to restart when the poll quota is below this value.
    pub min_polls_for_restart: u32,
}
impl Default for RestartConfig {
    /// Defaults: 3 restarts per 60-second window, default exponential
    /// backoff, zero restart cost, and no minimum-budget requirements.
    fn default() -> Self {
        Self {
            max_restarts: 3,
            // `Duration::from_secs(60)` instead of `Duration::from_mins(1)`:
            // the same value, but it avoids the only-recently-stabilized
            // `duration_constructors_lite` API so the crate builds on older
            // stable toolchains.
            window: Duration::from_secs(60),
            backoff: BackoffStrategy::default(),
            restart_cost: 0,
            min_remaining_for_restart: None,
            min_polls_for_restart: 0,
        }
    }
}
impl RestartConfig {
#[must_use]
pub fn new(max_restarts: u32, window: Duration) -> Self {
Self {
max_restarts,
window,
backoff: BackoffStrategy::default(),
restart_cost: 0,
min_remaining_for_restart: None,
min_polls_for_restart: 0,
}
}
#[must_use]
pub fn with_backoff(mut self, backoff: BackoffStrategy) -> Self {
self.backoff = backoff;
self
}
#[must_use]
pub fn with_restart_cost(mut self, cost: u64) -> Self {
self.restart_cost = cost;
self
}
#[must_use]
pub fn with_min_remaining(mut self, min: Duration) -> Self {
self.min_remaining_for_restart = Some(min);
self
}
#[must_use]
pub fn with_min_polls(mut self, min_polls: u32) -> Self {
self.min_polls_for_restart = min_polls;
self
}
}
/// Delay schedule applied between successive restarts of a child.
#[derive(Debug, Clone)]
pub enum BackoffStrategy {
    /// Restart with no delay.
    None,
    /// A constant delay between attempts.
    Fixed(Duration),
    /// Delay grows as `initial * multiplier^attempt`, capped at `max`.
    Exponential {
        initial: Duration,
        max: Duration,
        multiplier: f64,
    },
}
// Manual `PartialEq`: the `f64` multiplier is compared by bit pattern
// (`to_bits`), which keeps the relation reflexive even for NaN and makes the
// `Eq` impl below sound.
impl PartialEq for BackoffStrategy {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::None, Self::None) => true,
            (Self::Fixed(a), Self::Fixed(b)) => a == b,
            (
                Self::Exponential {
                    initial: i1,
                    max: m1,
                    multiplier: mul1,
                },
                Self::Exponential {
                    initial: i2,
                    max: m2,
                    multiplier: mul2,
                },
            ) => i1 == i2 && m1 == m2 && mul1.to_bits() == mul2.to_bits(),
            _ => false,
        }
    }
}
impl Default for BackoffStrategy {
    /// Default: exponential backoff from 100ms, doubling, capped at 10s.
    fn default() -> Self {
        Self::Exponential {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(10),
            multiplier: 2.0,
        }
    }
}
// Sound because the manual `PartialEq` above is total (bitwise float compare).
impl Eq for BackoffStrategy {}
/// Which siblings are recycled when one child fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]
pub enum RestartPolicy {
    /// Restart only the failed child (default).
    #[default]
    OneForOne,
    /// Restart every child in the supervisor.
    OneForAll,
    /// Restart the failed child and every child started after it.
    RestForOne,
}
/// What the supervisor does once its restart budget is exhausted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EscalationPolicy {
    /// Stop supervising (default).
    #[default]
    Stop,
    /// Escalate the failure to the parent.
    Escalate,
    /// Reset the restart counter and keep going.
    ResetCounter,
}
/// Supervisor-wide restart configuration (as opposed to the per-child
/// [`RestartConfig`]).
#[derive(Debug, Clone, PartialEq)]
pub struct SupervisionConfig {
    /// Which siblings restart when one child fails.
    pub restart_policy: RestartPolicy,
    /// Maximum restarts allowed inside `restart_window`.
    pub max_restarts: u32,
    /// Sliding window over which `max_restarts` is counted.
    pub restart_window: Duration,
    /// Delay schedule between restarts.
    pub backoff: BackoffStrategy,
    /// Behavior once the restart budget is exhausted.
    pub escalation: EscalationPolicy,
    /// Optional restarts-per-second level that counts as a restart storm.
    pub storm_threshold: Option<f64>,
}
impl Default for SupervisionConfig {
    /// Defaults: one-for-one restarts, 3 restarts per 60-second window,
    /// default backoff, stop on exhaustion, storm detection off.
    fn default() -> Self {
        Self {
            restart_policy: RestartPolicy::OneForOne,
            max_restarts: 3,
            // `Duration::from_secs(60)` instead of `Duration::from_mins(1)`:
            // identical value, but avoids the only-recently-stabilized
            // `duration_constructors_lite` API so the crate builds on older
            // stable toolchains.
            restart_window: Duration::from_secs(60),
            backoff: BackoffStrategy::default(),
            escalation: EscalationPolicy::Stop,
            storm_threshold: None,
        }
    }
}
impl SupervisionConfig {
    /// Creates a config with the given limits; all other fields take their
    /// default values (one-for-one, default backoff, stop on exhaustion,
    /// no storm threshold).
    #[must_use]
    pub fn new(max_restarts: u32, restart_window: Duration) -> Self {
        Self {
            max_restarts,
            restart_window,
            ..Self::default()
        }
    }

    /// Enables storm detection; the threshold is validated eagerly.
    #[must_use]
    pub fn with_storm_threshold(self, threshold: f64) -> Self {
        validate_storm_threshold(threshold);
        Self {
            storm_threshold: Some(threshold),
            ..self
        }
    }

    /// Sets which siblings restart when one child fails.
    #[must_use]
    pub fn with_restart_policy(self, policy: RestartPolicy) -> Self {
        Self {
            restart_policy: policy,
            ..self
        }
    }

    /// Sets the delay schedule between restarts.
    #[must_use]
    pub fn with_backoff(self, backoff: BackoffStrategy) -> Self {
        Self { backoff, ..self }
    }

    /// Sets what happens once the restart budget is exhausted.
    #[must_use]
    pub fn with_escalation(self, escalation: EscalationPolicy) -> Self {
        Self { escalation, ..self }
    }

    /// Shorthand for a one-for-all configuration.
    #[must_use]
    pub fn one_for_all(max_restarts: u32, restart_window: Duration) -> Self {
        Self::new(max_restarts, restart_window).with_restart_policy(RestartPolicy::OneForAll)
    }

    /// Shorthand for a rest-for-one configuration.
    #[must_use]
    pub fn rest_for_one(max_restarts: u32, restart_window: Duration) -> Self {
        Self::new(max_restarts, restart_window).with_restart_policy(RestartPolicy::RestForOne)
    }

    /// Builds a [`RestartTracker`] mirroring this config, including storm
    /// detection when a threshold is set.
    #[must_use]
    pub fn restart_tracker(&self) -> RestartTracker {
        let restart = RestartConfig::new(self.max_restarts, self.restart_window)
            .with_backoff(self.backoff.clone());
        let base = RestartTrackerConfig::from_restart(restart);
        let tracker_config = match self.storm_threshold {
            Some(threshold) => base.with_storm_detection(threshold),
            None => base,
        };
        RestartTracker::new(tracker_config)
    }
}
// Sound: the only non-`Eq` field is `backoff`, whose manual `PartialEq`
// is total (bitwise float comparison).
impl Eq for SupervisionConfig {}
/// Whether (and how) a child is registered in the name registry on start.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum NameRegistrationPolicy {
    /// Do not register the child (default).
    #[default]
    None,
    /// Register under `name`, resolving collisions per `collision`.
    Register {
        name: String,
        collision: NameCollisionPolicy,
    },
}
/// How a name-registry collision is resolved.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum NameCollisionPolicy {
    /// Fail the registration (default).
    #[default]
    Fail,
    /// Replace the existing registration.
    Replace,
    /// Wait for the existing registration to go away.
    Wait,
}
/// Factory that (re)starts one supervised child task.
///
/// Takes `&mut self` so starters may keep state between restarts; the
/// blanket impl below covers any matching `FnMut` closure.
pub trait ChildStart: Send {
    /// Spawns the child into `scope`, returning its task id.
    ///
    /// # Errors
    /// Propagates the runtime's [`SpawnError`] when the spawn is refused.
    fn start(
        &mut self,
        scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        state: &mut RuntimeState,
        cx: &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError>;
}
// Blanket impl: any `Send` closure with the right signature is a `ChildStart`.
impl<F> ChildStart for F
where
    F: FnMut(
        &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        &mut RuntimeState,
        &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError>
    + Send,
{
    fn start(
        &mut self,
        scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        state: &mut RuntimeState,
        cx: &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError> {
        (self)(scope, state, cx)
    }
}
/// Declarative description of one supervised child.
pub struct ChildSpec {
    /// Unique (within one supervisor) child name.
    pub name: ChildName,
    /// Factory used to (re)start the child.
    pub start: Box<dyn ChildStart>,
    /// Strategy applied when the child fails.
    pub restart: SupervisionStrategy,
    /// Budget used while shutting the child down.
    pub shutdown_budget: Budget,
    /// Names of children that must be started before this one.
    pub depends_on: Vec<ChildName>,
    /// Name-registry behavior on start.
    pub registration: NameRegistrationPolicy,
    /// Start during supervisor boot (`true`) or defer (`false`).
    pub start_immediately: bool,
    /// Whether a start failure aborts the whole supervisor boot.
    pub required: bool,
}
// Manual `Debug`: the boxed `start` factory is not `Debug`, so every other
// field is printed and `finish_non_exhaustive` marks the omission.
impl std::fmt::Debug for ChildSpec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ChildSpec")
            .field("name", &self.name)
            .field("restart", &self.restart)
            .field("shutdown_budget", &self.shutdown_budget)
            .field("depends_on", &self.depends_on)
            .field("registration", &self.registration)
            .field("start_immediately", &self.start_immediately)
            .field("required", &self.required)
            .finish_non_exhaustive()
    }
}
impl ChildSpec {
    /// Creates a spec with defaults: stop-on-failure, infinite shutdown
    /// budget, no dependencies, unregistered, started immediately, required.
    pub fn new<F>(name: impl Into<ChildName>, start: F) -> Self
    where
        F: ChildStart + 'static,
    {
        Self {
            name: name.into(),
            start: Box::new(start),
            restart: SupervisionStrategy::default(),
            shutdown_budget: Budget::INFINITE,
            depends_on: Vec::new(),
            registration: NameRegistrationPolicy::None,
            start_immediately: true,
            required: true,
        }
    }

    /// Sets the supervision strategy applied when this child fails.
    #[must_use]
    pub fn with_restart(self, restart: SupervisionStrategy) -> Self {
        Self { restart, ..self }
    }

    /// Sets the budget used while shutting this child down.
    #[must_use]
    pub fn with_shutdown_budget(self, budget: Budget) -> Self {
        Self {
            shutdown_budget: budget,
            ..self
        }
    }

    /// Declares a start-order dependency on another child.
    #[must_use]
    pub fn depends_on(mut self, name: impl Into<ChildName>) -> Self {
        let dependency = name.into();
        self.depends_on.push(dependency);
        self
    }

    /// Sets the name-registration policy for this child.
    #[must_use]
    pub fn with_registration(self, policy: NameRegistrationPolicy) -> Self {
        Self {
            registration: policy,
            ..self
        }
    }

    /// Chooses whether the child starts during supervisor boot.
    #[must_use]
    pub fn with_start_immediately(self, start: bool) -> Self {
        Self {
            start_immediately: start,
            ..self
        }
    }

    /// Marks whether a start failure aborts the whole supervisor boot.
    #[must_use]
    pub fn with_required(self, required: bool) -> Self {
        Self { required, ..self }
    }

    /// Structural equality over every field except the (uncomparable)
    /// `start` factory.
    #[must_use]
    pub fn spec_eq(&self, other: &Self) -> bool {
        let lhs = (
            &self.name,
            &self.restart,
            &self.shutdown_budget,
            &self.depends_on,
            &self.registration,
            self.start_immediately,
            self.required,
        );
        let rhs = (
            &other.name,
            &other.restart,
            &other.shutdown_budget,
            &other.depends_on,
            &other.registration,
            other.start_immediately,
            other.required,
        );
        lhs == rhs
    }

    /// Deterministic fingerprint over the same fields as [`Self::spec_eq`].
    #[must_use]
    pub fn spec_fingerprint(&self) -> u64 {
        let mut hasher = crate::util::DetHasher::default();
        hash_child_spec_fields(self, &mut hasher);
        std::hash::Hasher::finish(&hasher)
    }
}
/// Tie-break used when several children are simultaneously ready to start.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StartTieBreak {
    /// Prefer the child added to the builder first (default).
    #[default]
    InsertionOrder,
    /// Prefer the lexicographically smallest name.
    NameLex,
}
/// Reasons a [`SupervisorBuilder`] fails to compile into a start plan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SupervisorCompileError {
    /// Two children share the same name.
    DuplicateChildName(ChildName),
    /// `child` depends on a name no child has.
    UnknownDependency {
        child: ChildName,
        depends_on: ChildName,
    },
    /// An immediately-started `child` depends on a deferred one.
    DeferredDependency {
        child: ChildName,
        depends_on: ChildName,
    },
    /// The dependency graph contains a cycle; `remaining` lists the
    /// children that could not be ordered.
    CycleDetected {
        remaining: Vec<ChildName>,
    },
}
impl std::fmt::Display for SupervisorCompileError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DuplicateChildName(name) => write!(f, "duplicate child name: {name}"),
            Self::UnknownDependency { child, depends_on } => {
                write!(f, "child {child} depends on unknown child {depends_on}")
            }
            Self::DeferredDependency { child, depends_on } => {
                write!(
                    f,
                    "child {child} is start_immediately but depends on deferred child {depends_on}"
                )
            }
            Self::CycleDetected { remaining } => {
                // Comma-separated list; identical output to writing each
                // name in turn with ", " separators.
                let names = remaining
                    .iter()
                    .map(|name| name.as_str())
                    .collect::<Vec<_>>()
                    .join(", ");
                write!(f, "dependency cycle detected among children: {names}")
            }
        }
    }
}
impl std::error::Error for SupervisorCompileError {}
/// Failures while booting a compiled supervisor.
#[derive(Debug)]
pub enum SupervisorSpawnError {
    /// Creating the supervisor's own region failed.
    RegionCreate(RegionCreateError),
    /// A required child's `start` factory returned an error.
    ChildStartFailed {
        child: ChildName,
        err: SpawnError,
        region: RegionId,
    },
    /// A required child could not start because a dependency did not start;
    /// `dependency_error` carries the root cause when the dependency itself
    /// failed (as opposed to merely not having started).
    DependencyUnavailable {
        child: ChildName,
        dependency: ChildName,
        dependency_error: Option<SpawnError>,
        region: RegionId,
    },
}
impl std::fmt::Display for SupervisorSpawnError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::RegionCreate(e) => write!(f, "supervisor region create failed: {e}"),
            Self::ChildStartFailed {
                child, err, region, ..
            } => {
                write!(
                    f,
                    "child start failed: child={child} region={region:?} err={err}"
                )
            }
            Self::DependencyUnavailable {
                child,
                dependency,
                dependency_error,
                region,
            } => match dependency_error {
                // Include the root cause when the dependency itself failed.
                Some(err) => write!(
                    f,
                    "child start blocked: child={child} dependency={dependency} region={region:?} cause={err}"
                ),
                None => write!(
                    f,
                    "child start blocked: child={child} dependency={dependency} region={region:?}"
                ),
            },
        }
    }
}
impl std::error::Error for SupervisorSpawnError {}
// Lets `?` convert region-creation failures inside `spawn`.
impl From<RegionCreateError> for SupervisorSpawnError {
    fn from(value: RegionCreateError) -> Self {
        Self::RegionCreate(value)
    }
}
/// Fluent builder for a supervisor; `compile` turns it into a
/// [`CompiledSupervisor`] with a validated start order.
#[derive(Debug)]
pub struct SupervisorBuilder {
    // Supervisor name (used for the handle, not for child lookup).
    name: ChildName,
    // `None` means inherit the parent region's budget at spawn time.
    budget: Option<Budget>,
    // Tie-break for the topological start order.
    tie_break: StartTieBreak,
    // Sibling restart policy.
    restart_policy: RestartPolicy,
    // Child specs in insertion order.
    children: Vec<ChildSpec>,
}
impl SupervisorBuilder {
    /// Creates an empty builder: inherited budget, insertion-order
    /// tie-break, one-for-one restarts, no children.
    #[must_use]
    pub fn new(name: impl Into<ChildName>) -> Self {
        Self {
            name: name.into(),
            budget: None,
            tie_break: StartTieBreak::InsertionOrder,
            restart_policy: RestartPolicy::OneForOne,
            children: Vec::new(),
        }
    }

    /// Gives the supervisor region its own budget instead of inheriting.
    #[must_use]
    pub fn with_budget(self, budget: Budget) -> Self {
        Self {
            budget: Some(budget),
            ..self
        }
    }

    /// Chooses how ties in the topological start order are broken.
    #[must_use]
    pub fn with_tie_break(self, tie_break: StartTieBreak) -> Self {
        Self { tie_break, ..self }
    }

    /// Sets which siblings are recycled when one child fails.
    #[must_use]
    pub fn with_restart_policy(self, restart_policy: RestartPolicy) -> Self {
        Self {
            restart_policy,
            ..self
        }
    }

    /// Appends a child spec (start order is computed at compile time).
    #[must_use]
    pub fn child(mut self, child: ChildSpec) -> Self {
        self.children.push(child);
        self
    }

    /// Structural equality, delegating per-child comparison to
    /// [`ChildSpec::spec_eq`].
    #[must_use]
    pub fn spec_eq(&self, other: &Self) -> bool {
        if self.name != other.name
            || self.budget != other.budget
            || self.tie_break != other.tie_break
            || self.restart_policy != other.restart_policy
            || self.children.len() != other.children.len()
        {
            return false;
        }
        self.children
            .iter()
            .zip(other.children.iter())
            .all(|(left, right)| left.spec_eq(right))
    }

    /// Deterministic fingerprint over the same data as [`Self::spec_eq`].
    #[must_use]
    pub fn spec_fingerprint(&self) -> u64 {
        let mut hasher = crate::util::DetHasher::default();
        hasher.write(self.name.as_str().as_bytes());
        hash_budget_option(&mut hasher, self.budget);
        hash_start_tie_break(&mut hasher, self.tie_break);
        hash_restart_policy(&mut hasher, self.restart_policy);
        // Length prefix keeps child boundaries unambiguous in the stream.
        hasher.write_u64(self.children.len() as u64);
        for child in &self.children {
            hash_child_spec_fields(child, &mut hasher);
        }
        std::hash::Hasher::finish(&hasher)
    }

    /// Validates the child graph and fixes the start order.
    ///
    /// # Errors
    /// Returns a [`SupervisorCompileError`] on duplicate names, unknown or
    /// deferred dependencies, or dependency cycles.
    pub fn compile(self) -> Result<CompiledSupervisor, SupervisorCompileError> {
        CompiledSupervisor::new(self)
    }
}
/// Feeds every comparable `ChildSpec` field (everything except the `start`
/// factory) into `hasher`; must stay in sync with `ChildSpec::spec_eq`.
fn hash_child_spec_fields(spec: &ChildSpec, hasher: &mut crate::util::DetHasher) {
    hasher.write(spec.name.as_str().as_bytes());
    hash_supervision_strategy(hasher, &spec.restart);
    hash_budget(hasher, spec.shutdown_budget);
    // Length prefix disambiguates the variable-length dependency list.
    hasher.write_u64(spec.depends_on.len() as u64);
    for dep in &spec.depends_on {
        hasher.write(dep.as_str().as_bytes());
    }
    hash_registration_policy(hasher, &spec.registration);
    hasher.write_u8(u8::from(spec.start_immediately));
    hasher.write_u8(u8::from(spec.required));
}
/// Hashes an optional budget: tag byte 1 + payload for `Some`, bare 0 for
/// `None`.
fn hash_budget_option(hasher: &mut crate::util::DetHasher, budget: Option<Budget>) {
    if let Some(value) = budget {
        hasher.write_u8(1);
        hash_budget(hasher, value);
    } else {
        hasher.write_u8(0);
    }
}
/// Hashes a budget field by field; optional fields use a 1/0 presence tag
/// so absent and zero values hash differently.
fn hash_budget(hasher: &mut crate::util::DetHasher, budget: Budget) {
    match budget.deadline {
        Some(deadline) => {
            hasher.write_u8(1);
            hasher.write_u64(deadline.as_nanos());
        }
        None => hasher.write_u8(0),
    }
    hasher.write_u32(budget.poll_quota);
    match budget.cost_quota {
        Some(cost) => {
            hasher.write_u8(1);
            hasher.write_u64(cost);
        }
        None => hasher.write_u8(0),
    }
    hasher.write_u8(budget.priority);
}
/// Hashes the strategy as a discriminant tag plus any `Restart` payload.
fn hash_supervision_strategy(hasher: &mut crate::util::DetHasher, strategy: &SupervisionStrategy) {
    match strategy {
        SupervisionStrategy::Stop => hasher.write_u8(0),
        SupervisionStrategy::Restart(config) => {
            hasher.write_u8(1);
            hash_restart_config(hasher, config);
        }
        SupervisionStrategy::Escalate => hasher.write_u8(2),
    }
}
/// Converts a `Duration` to whole nanoseconds, saturating at `u64::MAX`
/// (a `u128` nanosecond count can exceed `u64`).
fn duration_nanos_u64(duration: Duration) -> u64 {
    duration.as_nanos().try_into().unwrap_or(u64::MAX)
}
/// Hashes every `RestartConfig` field; durations are folded in as saturated
/// nanosecond counts, options as 1/0 presence tags.
fn hash_restart_config(hasher: &mut crate::util::DetHasher, config: &RestartConfig) {
    hasher.write_u32(config.max_restarts);
    hasher.write_u64(duration_nanos_u64(config.window));
    hash_backoff_strategy(hasher, &config.backoff);
    hasher.write_u64(config.restart_cost);
    match config.min_remaining_for_restart {
        Some(value) => {
            hasher.write_u8(1);
            hasher.write_u64(duration_nanos_u64(value));
        }
        None => hasher.write_u8(0),
    }
    hasher.write_u32(config.min_polls_for_restart);
}
/// Hashes the backoff as a discriminant tag plus payload; the `f64`
/// multiplier is folded in by bit pattern, matching the `PartialEq` impl.
fn hash_backoff_strategy(hasher: &mut crate::util::DetHasher, strategy: &BackoffStrategy) {
    match strategy {
        BackoffStrategy::None => hasher.write_u8(0),
        BackoffStrategy::Fixed(value) => {
            hasher.write_u8(1);
            hasher.write_u64(duration_nanos_u64(*value));
        }
        BackoffStrategy::Exponential {
            initial,
            max,
            multiplier,
        } => {
            hasher.write_u8(2);
            hasher.write_u64(duration_nanos_u64(*initial));
            hasher.write_u64(duration_nanos_u64(*max));
            hasher.write_u64(multiplier.to_bits());
        }
    }
}
/// Hashes the registration policy as a tag plus the registered name and
/// collision policy for the `Register` variant.
fn hash_registration_policy(hasher: &mut crate::util::DetHasher, policy: &NameRegistrationPolicy) {
    match policy {
        NameRegistrationPolicy::None => hasher.write_u8(0),
        NameRegistrationPolicy::Register { name, collision } => {
            hasher.write_u8(1);
            hasher.write(name.as_bytes());
            hash_collision_policy(hasher, *collision);
        }
    }
}
// The three helpers below encode fieldless enums as explicit, stable tag
// bytes: relying on compiler-assigned discriminants would let the
// fingerprints drift if variants were ever reordered.
fn hash_collision_policy(hasher: &mut crate::util::DetHasher, policy: NameCollisionPolicy) {
    match policy {
        NameCollisionPolicy::Fail => hasher.write_u8(0),
        NameCollisionPolicy::Replace => hasher.write_u8(1),
        NameCollisionPolicy::Wait => hasher.write_u8(2),
    }
}
fn hash_restart_policy(hasher: &mut crate::util::DetHasher, policy: RestartPolicy) {
    match policy {
        RestartPolicy::OneForOne => hasher.write_u8(0),
        RestartPolicy::OneForAll => hasher.write_u8(1),
        RestartPolicy::RestForOne => hasher.write_u8(2),
    }
}
fn hash_start_tie_break(hasher: &mut crate::util::DetHasher, tie_break: StartTieBreak) {
    match tie_break {
        StartTieBreak::InsertionOrder => hasher.write_u8(0),
        StartTieBreak::NameLex => hasher.write_u8(1),
    }
}
/// A validated supervisor: same data as the builder plus a topologically
/// sorted `start_order` (indices into `children`).
#[derive(Debug)]
pub struct CompiledSupervisor {
    pub name: ChildName,
    pub budget: Option<Budget>,
    pub tie_break: StartTieBreak,
    pub restart_policy: RestartPolicy,
    pub children: Vec<ChildSpec>,
    /// Indices into `children`, in the order children must be started.
    pub start_order: Vec<usize>,
}
/// The names to cancel (reverse start order) and restart (start order)
/// after a child failure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SupervisorRestartPlan {
    pub policy: RestartPolicy,
    pub cancel_order: Vec<ChildName>,
    pub restart_order: Vec<ChildName>,
}
/// One step of a lowered restart plan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RegionOp {
    /// Request cancellation of a child, bounded by its shutdown budget.
    CancelChild {
        name: ChildName,
        shutdown_budget: Budget,
    },
    /// Wait for a cancelled child to finish, bounded by its shutdown budget.
    DrainChild {
        name: ChildName,
        shutdown_budget: Budget,
    },
    /// Start the child again.
    RestartChild {
        name: ChildName,
    },
}
/// A restart plan lowered into an ordered op list
/// (see `CompiledSupervisor::compile_restart_ops`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompiledRestartOps {
    pub policy: RestartPolicy,
    pub ops: Vec<RegionOp>,
}
/// Key for the topological-sort ready set: ordered by name first so
/// `StartTieBreak::NameLex` can pop the lexicographically smallest child,
/// with the index as a uniqueness-preserving tiebreaker.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ReadyKey {
    name: ChildName,
    idx: usize,
}
impl Ord for ReadyKey {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.name
            .cmp(&other.name)
            .then_with(|| self.idx.cmp(&other.idx))
    }
}
impl PartialOrd for ReadyKey {
    // Delegates to `Ord` to keep the two orderings consistent.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl CompiledSupervisor {
    /// Validates the builder's child graph and computes a deterministic
    /// start order (topological sort with the configured tie-break).
    ///
    /// # Errors
    /// Duplicate names, unknown dependencies, immediately-started children
    /// depending on deferred ones, and dependency cycles are all rejected.
    fn new(builder: SupervisorBuilder) -> Result<Self, SupervisorCompileError> {
        // Map each name to its index; `insert` returning `Some` reveals a duplicate.
        let mut name_to_idx = std::collections::HashMap::<ChildName, usize>::new();
        for (idx, child) in builder.children.iter().enumerate() {
            if name_to_idx.insert(child.name.clone(), idx).is_some() {
                return Err(SupervisorCompileError::DuplicateChildName(
                    child.name.clone(),
                ));
            }
        }
        // Dependency graph: `indeg[i]` counts unmet deps of child i,
        // `out[d]` lists children that depend on d.
        let mut indeg = vec![0usize; builder.children.len()];
        let mut out = vec![Vec::<usize>::new(); builder.children.len()];
        for (idx, child) in builder.children.iter().enumerate() {
            for dep in &child.depends_on {
                let Some(&dep_idx) = name_to_idx.get(dep) else {
                    return Err(SupervisorCompileError::UnknownDependency {
                        child: child.name.clone(),
                        depends_on: dep.clone(),
                    });
                };
                // An immediately-started child cannot wait on a deferred one.
                if child.start_immediately && !builder.children[dep_idx].start_immediately {
                    return Err(SupervisorCompileError::DeferredDependency {
                        child: child.name.clone(),
                        depends_on: dep.clone(),
                    });
                }
                indeg[idx] += 1;
                out[dep_idx].push(idx);
            }
        }
        // Kahn's algorithm. The ready set is ordered by (name, idx) — see
        // `ReadyKey::cmp` — so `NameLex` can take the first element directly.
        let mut ready = std::collections::BTreeSet::<ReadyKey>::new();
        for (idx, child) in builder.children.iter().enumerate() {
            if indeg[idx] == 0 {
                ready.insert(ReadyKey {
                    name: child.name.clone(),
                    idx,
                });
            }
        }
        let mut order = Vec::with_capacity(builder.children.len());
        while let Some(next) = match builder.tie_break {
            // Insertion order: linear scan for the smallest original index.
            StartTieBreak::InsertionOrder => ready
                .iter()
                .min_by(|a, b| a.idx.cmp(&b.idx).then_with(|| a.name.cmp(&b.name)))
                .cloned(),
            StartTieBreak::NameLex => ready.iter().next().cloned(),
        } {
            ready.take(&next);
            order.push(next.idx);
            for &succ in &out[next.idx] {
                indeg[succ] = indeg[succ].saturating_sub(1);
                if indeg[succ] == 0 {
                    ready.insert(ReadyKey {
                        name: builder.children[succ].name.clone(),
                        idx: succ,
                    });
                }
            }
        }
        // Any child never drained to indegree zero sits on a cycle.
        if order.len() != builder.children.len() {
            let mut remaining = Vec::new();
            for (idx, child) in builder.children.iter().enumerate() {
                if indeg[idx] > 0 {
                    remaining.push(child.name.clone());
                }
            }
            remaining.sort();
            return Err(SupervisorCompileError::CycleDetected { remaining });
        }
        Ok(Self {
            name: builder.name,
            budget: builder.budget,
            tie_break: builder.tie_break,
            restart_policy: builder.restart_policy,
            children: builder.children,
            start_order: order,
        })
    }
    /// Restart plan for a failed child looked up by name, ignoring the
    /// child's own supervision strategy
    /// (cf. [`Self::restart_plan_for_failure`]).
    #[must_use]
    pub fn restart_plan_for(&self, failed_child: &str) -> Option<SupervisorRestartPlan> {
        let failed_idx = self
            .children
            .iter()
            .enumerate()
            .find_map(|(idx, child)| (child.name == failed_child).then_some(idx))?;
        self.restart_plan_for_idx(failed_idx)
    }
    /// Position of `child_name` within the compiled start order, if present.
    #[must_use]
    pub fn child_start_pos(&self, child_name: &str) -> Option<usize> {
        let child_idx = self
            .children
            .iter()
            .enumerate()
            .find_map(|(idx, child)| (child.name == child_name).then_some(idx))?;
        self.start_pos_for_child_idx(child_idx)
    }
    /// Child names in start order.
    #[must_use]
    pub fn child_start_order_names(&self) -> Vec<&str> {
        self.start_order
            .iter()
            .map(|&idx| self.children[idx].name.as_str())
            .collect()
    }
    /// Child names in stop order (reverse of start order).
    #[must_use]
    pub fn child_stop_order_names(&self) -> Vec<&str> {
        self.start_order
            .iter()
            .rev()
            .map(|&idx| self.children[idx].name.as_str())
            .collect()
    }
    /// Maps a child index (into `children`) to its position in `start_order`.
    #[must_use]
    fn start_pos_for_child_idx(&self, child_idx: usize) -> Option<usize> {
        self.start_order.iter().position(|&idx| idx == child_idx)
    }
    /// Restart plan for an observed outcome: only `Outcome::Err` on a child
    /// whose strategy is `Restart` produces a plan.
    #[must_use]
    pub fn restart_plan_for_failure<E>(
        &self,
        failed_child: &str,
        outcome: &Outcome<(), E>,
    ) -> Option<SupervisorRestartPlan> {
        let failed_idx = self
            .children
            .iter()
            .enumerate()
            .find_map(|(idx, child)| (child.name == failed_child).then_some(idx))?;
        if !matches!(outcome, Outcome::Err(_)) {
            return None;
        }
        match self.children[failed_idx].restart {
            SupervisionStrategy::Restart(_) => self.restart_plan_for_failure_idx(failed_idx),
            SupervisionStrategy::Stop | SupervisionStrategy::Escalate => None,
        }
    }
    /// Start-order positions touched by a failure under the configured
    /// restart policy, filtered to immediately-started children (the failed
    /// child itself always qualifies). `None` when the result is empty or
    /// the child is not in the start order.
    #[must_use]
    fn affected_positions_for_idx(&self, failed_child_idx: usize) -> Option<Vec<usize>> {
        let failed_pos = self.start_pos_for_child_idx(failed_child_idx)?;
        let total = self.start_order.len();
        let affected_positions = match self.restart_policy {
            RestartPolicy::OneForOne => vec![failed_pos],
            RestartPolicy::OneForAll => (0..total).collect(),
            RestartPolicy::RestForOne => (failed_pos..total).collect(),
        }
        .into_iter()
        .filter(|&pos| {
            let child_idx = self.start_order[pos];
            let child = &self.children[child_idx];
            child.start_immediately || child_idx == failed_child_idx
        })
        .collect::<Vec<_>>();
        (!affected_positions.is_empty()).then_some(affected_positions)
    }
    /// Plan honoring per-child strategies: cancel every affected child in
    /// reverse start order, but restart only those whose strategy is
    /// `Restart` and whose affected dependencies are themselves scheduled
    /// for restart.
    #[must_use]
    fn restart_plan_for_failure_idx(
        &self,
        failed_child_idx: usize,
    ) -> Option<SupervisorRestartPlan> {
        let affected_positions = self.affected_positions_for_idx(failed_child_idx)?;
        // Cancel in reverse start order (dependents before dependencies).
        let mut cancel_order = Vec::with_capacity(affected_positions.len());
        for &pos in affected_positions.iter().rev() {
            cancel_order.push(self.children[self.start_order[pos]].name.clone());
        }
        let child_index_by_name = self
            .children
            .iter()
            .enumerate()
            .map(|(idx, child)| (child.name.as_str(), idx))
            .collect::<std::collections::HashMap<_, _>>();
        let mut affected_children = vec![false; self.children.len()];
        for &pos in &affected_positions {
            affected_children[self.start_order[pos]] = true;
        }
        // Restart in start order, skipping children that either don't want a
        // restart or depend on an affected child that is not coming back.
        let mut scheduled_restart = vec![false; self.children.len()];
        let mut restart_order = Vec::with_capacity(affected_positions.len());
        for &pos in &affected_positions {
            let child_idx = self.start_order[pos];
            let child = &self.children[child_idx];
            if !matches!(child.restart, SupervisionStrategy::Restart(_)) {
                continue;
            }
            let dependencies_restartable = child.depends_on.iter().all(|dependency| {
                let dep_idx = *child_index_by_name
                    .get(dependency.as_str())
                    .expect("compiled supervisor dependency index missing");
                !affected_children[dep_idx] || scheduled_restart[dep_idx]
            });
            if !dependencies_restartable {
                continue;
            }
            scheduled_restart[child_idx] = true;
            restart_order.push(child.name.clone());
        }
        Some(SupervisorRestartPlan {
            policy: self.restart_policy,
            cancel_order,
            restart_order,
        })
    }
    /// Unconditional plan: cancel affected children in reverse start order
    /// and restart all of them in start order, regardless of strategy.
    #[must_use]
    fn restart_plan_for_idx(&self, failed_child_idx: usize) -> Option<SupervisorRestartPlan> {
        let affected_positions = self.affected_positions_for_idx(failed_child_idx)?;
        let mut cancel_order = Vec::with_capacity(affected_positions.len());
        let mut restart_order = Vec::with_capacity(affected_positions.len());
        for &pos in affected_positions.iter().rev() {
            cancel_order.push(self.children[self.start_order[pos]].name.clone());
        }
        for &pos in &affected_positions {
            restart_order.push(self.children[self.start_order[pos]].name.clone());
        }
        Some(SupervisorRestartPlan {
            policy: self.restart_policy,
            cancel_order,
            restart_order,
        })
    }
    /// Lowers a restart plan into region ops: cancel all affected children,
    /// then drain them, then issue the restarts. Names not found in
    /// `children` (not expected in practice) fall back to an infinite
    /// shutdown budget.
    #[must_use]
    pub fn compile_restart_ops(&self, plan: &SupervisorRestartPlan) -> CompiledRestartOps {
        let child_by_name =
            |name: &str| -> Option<&ChildSpec> { self.children.iter().find(|c| c.name == name) };
        let mut ops = Vec::with_capacity(plan.cancel_order.len() * 2 + plan.restart_order.len());
        for name in &plan.cancel_order {
            let budget = child_by_name(name).map_or(Budget::INFINITE, |c| c.shutdown_budget);
            ops.push(RegionOp::CancelChild {
                name: name.clone(),
                shutdown_budget: budget,
            });
        }
        for name in &plan.cancel_order {
            let budget = child_by_name(name).map_or(Budget::INFINITE, |c| c.shutdown_budget);
            ops.push(RegionOp::DrainChild {
                name: name.clone(),
                shutdown_budget: budget,
            });
        }
        for name in &plan.restart_order {
            ops.push(RegionOp::RestartChild { name: name.clone() });
        }
        CompiledRestartOps {
            policy: plan.policy,
            ops,
        }
    }
    /// Creates the supervisor's region and boots children in start order.
    ///
    /// Deferred children are skipped. A non-required child that fails to
    /// start (or is missing a dependency) is recorded and skipped; a
    /// required child failing tears down the whole region and aborts.
    ///
    /// # Errors
    /// [`SupervisorSpawnError`] when region creation fails or a required
    /// child cannot be started.
    pub fn spawn(
        mut self,
        state: &mut RuntimeState,
        cx: &crate::cx::Cx,
        parent_region: RegionId,
        parent_budget: Budget,
    ) -> Result<SupervisorHandle, SupervisorSpawnError> {
        // The supervisor's own budget wins; otherwise inherit the parent's.
        let budget = self.budget.unwrap_or(parent_budget);
        let region = state.create_child_region(parent_region, budget)?;
        // Read back the budget the runtime actually granted for the region.
        let effective_budget = state
            .region(region)
            .map_or(budget, crate::record::RegionRecord::budget);
        let scope: crate::cx::Scope<'static, crate::types::policy::FailFast> =
            crate::cx::Scope::<crate::types::policy::FailFast>::new(region, effective_budget);
        // Per-child boot progress, indexed like `self.children`.
        #[derive(Clone)]
        enum BootState {
            NotStarted,
            Deferred,
            Started,
            Failed(SpawnError),
            DependencyUnavailable {
                dependency_error: Option<SpawnError>,
            },
        }
        // Best-effort teardown of the half-built region when boot fails.
        fn abort_supervisor_boot(state: &mut RuntimeState, region: RegionId) {
            let _ = state.cancel_request(region, &crate::types::CancelReason::shutdown(), None);
            if let Some(r) = state.region(region) {
                r.begin_close(None);
            }
            state.advance_region_state(region);
        }
        let child_index_by_name = self
            .children
            .iter()
            .enumerate()
            .map(|(idx, child)| (child.name.clone(), idx))
            .collect::<std::collections::HashMap<_, _>>();
        let mut boot_states = vec![BootState::NotStarted; self.children.len()];
        let mut started = Vec::new();
        for &idx in &self.start_order {
            // Clone what is needed up front so `self.children[idx]` can be
            // borrowed mutably for the `start` call below.
            let (child_name, child_required, child_dependencies, start_immediately) = {
                let child = &self.children[idx];
                (
                    child.name.clone(),
                    child.required,
                    child.depends_on.clone(),
                    child.start_immediately,
                )
            };
            if !start_immediately {
                boot_states[idx] = BootState::Deferred;
                continue;
            }
            // First dependency not in `Started` state, with its root cause.
            let dependency_unavailable = child_dependencies.iter().find_map(|dependency| {
                let dep_idx = *child_index_by_name
                    .get(dependency)
                    .expect("compiled supervisor dependency index missing");
                match &boot_states[dep_idx] {
                    BootState::Started => None,
                    BootState::Failed(err) => Some((dependency.clone(), Some(err.clone()))),
                    BootState::DependencyUnavailable { dependency_error } => {
                        Some((dependency.clone(), dependency_error.clone()))
                    }
                    BootState::NotStarted | BootState::Deferred => Some((dependency.clone(), None)),
                }
            });
            if let Some((dependency, dependency_error)) = dependency_unavailable {
                cx.trace("supervisor_child_start_blocked_dependency");
                if child_required {
                    abort_supervisor_boot(state, region);
                    return Err(SupervisorSpawnError::DependencyUnavailable {
                        child: child_name,
                        dependency,
                        dependency_error,
                        region,
                    });
                }
                boot_states[idx] = BootState::DependencyUnavailable { dependency_error };
                continue;
            }
            let child = &mut self.children[idx];
            match child.start.start(&scope, state, cx) {
                Ok(task_id) => started.push(StartedChild {
                    name: child_name.clone(),
                    task_id,
                }),
                Err(err) => {
                    boot_states[idx] = BootState::Failed(err.clone());
                    cx.trace("supervisor_child_start_failed");
                    if child_required {
                        abort_supervisor_boot(state, region);
                        return Err(SupervisorSpawnError::ChildStartFailed {
                            child: child_name,
                            err,
                            region,
                        });
                    }
                }
            }
            // Only the successful path leaves the state at `NotStarted` here,
            // so this marks exactly the children that actually started.
            if matches!(boot_states[idx], BootState::NotStarted) {
                boot_states[idx] = BootState::Started;
            }
        }
        Ok(SupervisorHandle {
            name: self.name,
            region,
            started,
        })
    }
}
/// Result of a successful supervisor boot.
#[derive(Debug)]
pub struct SupervisorHandle {
    pub name: ChildName,
    /// The region all children were spawned into.
    pub region: RegionId,
    /// Children actually started (deferred/skipped ones are absent).
    pub started: Vec<StartedChild>,
}
/// A child that was started during boot, with the task id it received.
#[derive(Debug)]
pub struct StartedChild {
    pub name: ChildName,
    pub task_id: TaskId,
}
impl BackoffStrategy {
    /// Delay before restart attempt number `attempt` (0-based), or `None`
    /// for [`BackoffStrategy::None`].
    ///
    /// The exponential case is hardened against bad inputs: non-finite or
    /// negative multipliers fall back to 2.0, the exponent is clamped to
    /// 30, and the result is capped at `max`.
    #[must_use]
    pub fn delay_for_attempt(&self, attempt: u32) -> Option<Duration> {
        match self {
            Self::None => None,
            Self::Fixed(d) => Some(*d),
            Self::Exponential {
                initial,
                max,
                multiplier,
            } => {
                let safe_multiplier = if multiplier.is_finite() && *multiplier >= 0.0 {
                    *multiplier
                } else {
                    2.0
                };
                // Clamp the exponent: beyond 2^30 the delay is capped at
                // `max` anyway, and huge exponents risk overflow to infinity.
                #[allow(clippy::cast_precision_loss)]
                let exp = i32::try_from(attempt).unwrap_or(30).min(30);
                let base_secs = initial.as_secs_f64() * safe_multiplier.powi(exp);
                // `Duration::from_secs_f64` panics on non-finite or negative
                // input, so sanitize before converting.
                let safe_secs = if base_secs.is_finite() && base_secs >= 0.0 {
                    base_secs
                } else {
                    max.as_secs_f64()
                };
                let delay = Duration::from_secs_f64(safe_secs.min(max.as_secs_f64()));
                Some(delay)
            }
        }
    }
}
/// Sliding-window log of restart timestamps plus the config that governs it.
#[derive(Debug, Clone)]
pub struct RestartHistory {
    // Nanosecond timestamps of recorded restarts, pruned to the window.
    restarts: Vec<u64>,
    config: RestartConfig,
}
impl RestartHistory {
    /// Creates an empty history governed by `config`.
    #[must_use]
    pub fn new(config: RestartConfig) -> Self {
        Self {
            restarts: Vec::new(),
            config,
        }
    }

    /// True when fewer than `max_restarts` restarts occurred within the
    /// sliding window ending at `now` (a nanosecond timestamp).
    #[must_use]
    pub fn can_restart(&self, now: u64) -> bool {
        // Delegate to `recent_restart_count` instead of duplicating the
        // window-filter logic.
        self.recent_restart_count(now) < self.config.max_restarts as usize
    }

    /// Records a restart at `now`, pruning entries older than the window.
    pub fn record_restart(&mut self, now: u64) {
        let window_nanos = duration_nanos_u64(self.config.window);
        let cutoff = now.saturating_sub(window_nanos);
        self.restarts.retain(|&t| t >= cutoff);
        self.restarts.push(now);
    }

    /// Number of recorded restarts within the window ending at `now`.
    #[must_use]
    pub fn recent_restart_count(&self, now: u64) -> usize {
        let window_nanos = duration_nanos_u64(self.config.window);
        let cutoff = now.saturating_sub(window_nanos);
        self.restarts.iter().filter(|&&t| t >= cutoff).count()
    }

    /// Backoff delay for the next restart attempt, if the strategy has one.
    #[must_use]
    pub fn next_delay(&self, now: u64) -> Option<Duration> {
        // Saturate instead of `as`-casting: a truncating cast could wrap the
        // attempt number on a pathological history length.
        let attempt = u32::try_from(self.recent_restart_count(now)).unwrap_or(u32::MAX);
        self.config.backoff.delay_for_attempt(attempt)
    }

    /// Read-only access to the governing config.
    #[must_use]
    pub fn config(&self) -> &RestartConfig {
        &self.config
    }

    /// Like [`Self::can_restart`], but also enforces the budget-related
    /// preconditions (`restart_cost`, `min_remaining_for_restart`,
    /// `min_polls_for_restart`).
    ///
    /// # Errors
    /// Returns the first refusal encountered, checked in order:
    /// window, cost quota, deadline headroom, poll quota.
    pub fn can_restart_with_budget(&self, now: u64, budget: &Budget) -> Result<(), BudgetRefusal> {
        if !self.can_restart(now) {
            return Err(BudgetRefusal::WindowExhausted {
                max_restarts: self.config.max_restarts,
                window: self.config.window,
            });
        }
        if self.config.restart_cost > 0 {
            if let Some(remaining) = budget.cost_quota {
                if remaining < self.config.restart_cost {
                    return Err(BudgetRefusal::InsufficientCost {
                        required: self.config.restart_cost,
                        remaining,
                    });
                }
            }
        }
        if let Some(min_remaining) = self.config.min_remaining_for_restart {
            // The headroom check only applies when a deadline is actually
            // set; testing `is_some()` removes the previous dead `deadline`
            // binding (and its `let _ = deadline;` suppression).
            if budget.deadline.is_some() {
                let now_time = crate::types::id::Time::from_nanos(now);
                match budget.remaining_time(now_time) {
                    // Deadline already passed: report zero headroom.
                    None => {
                        return Err(BudgetRefusal::DeadlineTooClose {
                            min_required: min_remaining,
                            remaining: Duration::ZERO,
                        });
                    }
                    Some(rem) if rem < min_remaining => {
                        return Err(BudgetRefusal::DeadlineTooClose {
                            min_required: min_remaining,
                            remaining: rem,
                        });
                    }
                    _ => {}
                }
            }
        }
        if self.config.min_polls_for_restart > 0
            && budget.poll_quota < self.config.min_polls_for_restart
        {
            return Err(BudgetRefusal::InsufficientPolls {
                min_required: self.config.min_polls_for_restart,
                remaining: budget.poll_quota,
            });
        }
        Ok(())
    }

    /// Restart intensity in restarts per second over the window ending at
    /// `now`; 0.0 for an empty window or a zero-length window.
    #[must_use]
    pub fn intensity(&self, now: u64) -> f64 {
        let count = self.recent_restart_count(now);
        if count == 0 {
            return 0.0;
        }
        let window_secs = self.config.window.as_secs_f64();
        if window_secs <= 0.0 {
            return 0.0;
        }
        #[allow(clippy::cast_precision_loss)]
        let intensity = count as f64 / window_secs;
        intensity
    }
}
/// Reason a restart was refused by the window/budget checks.
#[derive(Debug, Clone, PartialEq)]
pub enum BudgetRefusal {
    /// Too many restarts within the sliding window.
    WindowExhausted {
        max_restarts: u32,
        window: Duration,
    },
    /// Cost quota cannot cover the configured restart cost.
    InsufficientCost {
        required: u64,
        remaining: u64,
    },
    /// Not enough time remains before the budget deadline.
    DeadlineTooClose {
        min_required: Duration,
        remaining: Duration,
    },
    /// Poll quota is below the configured minimum for a restart.
    InsufficientPolls {
        min_required: u32,
        remaining: u32,
    },
}
impl Eq for BudgetRefusal {}
impl std::fmt::Display for BudgetRefusal {
    /// One-line, human-readable description of the refusal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::WindowExhausted { max_restarts, window } => {
                write!(f, "restart window exhausted: {max_restarts} restarts in {window:?}")
            }
            Self::InsufficientCost { required, remaining } => {
                write!(f, "insufficient cost budget: need {required}, have {remaining}")
            }
            Self::DeadlineTooClose { min_required, remaining } => {
                write!(f, "deadline too close: need {min_required:?} remaining, have {remaining:?}")
            }
            Self::InsufficientPolls { min_required, remaining } => {
                write!(f, "insufficient poll budget: need {min_required}, have {remaining}")
            }
        }
    }
}
impl std::error::Error for BudgetRefusal {}
/// Sliding-window restart counter used for intensity (restarts/sec) and
/// simple threshold-based storm detection.
#[derive(Debug, Clone)]
pub struct RestartIntensityWindow {
    // Restart timestamps in nanoseconds; pruned on `record`.
    timestamps: Vec<u64>,
    window: Duration,
    // Intensities strictly above this value (restarts/sec) count as a storm.
    storm_threshold: f64,
}
impl RestartIntensityWindow {
    /// Creates an empty window.
    ///
    /// # Panics
    ///
    /// Panics when `storm_threshold` is not finite and strictly positive.
    #[must_use]
    pub fn new(window: Duration, storm_threshold: f64) -> Self {
        validate_storm_threshold(storm_threshold);
        Self {
            timestamps: Vec::new(),
            window,
            storm_threshold,
        }
    }
    /// Records a restart at `now` (nanoseconds), pruning aged-out entries.
    pub fn record(&mut self, now: u64) {
        let window_nanos = duration_nanos_u64(self.window);
        let cutoff = now.saturating_sub(window_nanos);
        self.timestamps.retain(|&t| t >= cutoff);
        self.timestamps.push(now);
    }
    /// Restarts per second over the window ending at `now`; `0.0` for an
    /// empty window or a zero-width window.
    #[must_use]
    pub fn intensity(&self, now: u64) -> f64 {
        // Reuse `count` instead of duplicating the cutoff/filter logic inline.
        let count = self.count(now);
        if count == 0 {
            return 0.0;
        }
        let window_secs = self.window.as_secs_f64();
        if window_secs <= 0.0 {
            return 0.0;
        }
        #[allow(clippy::cast_precision_loss)]
        let intensity = count as f64 / window_secs;
        intensity
    }
    /// `true` when the current intensity strictly exceeds the threshold.
    #[must_use]
    pub fn is_storm(&self, now: u64) -> bool {
        self.intensity(now) > self.storm_threshold
    }
    /// Number of recorded restarts still inside the window ending at `now`.
    #[must_use]
    pub fn count(&self, now: u64) -> usize {
        let window_nanos = duration_nanos_u64(self.window);
        let cutoff = now.saturating_sub(window_nanos);
        self.timestamps.iter().filter(|&&t| t >= cutoff).count()
    }
    /// Configured storm threshold (restarts/sec).
    #[must_use]
    pub fn storm_threshold(&self) -> f64 {
        self.storm_threshold
    }
    /// Configured window width.
    #[must_use]
    pub fn window(&self) -> Duration {
        self.window
    }
}
/// Tuning knobs for the e-value based restart storm monitor.
#[derive(Debug, Clone, Copy)]
pub struct StormMonitorConfig {
    /// False-alarm level; the alert threshold is `1 / alpha`.
    pub alpha: f64,
    /// Baseline restart intensity (restarts/sec) considered normal.
    pub expected_rate: f64,
    /// Observations required before any non-`Clear` state is reported.
    pub min_observations: u64,
    /// Multiplicative slack (>= 1.0) applied before evidence accumulates.
    pub tolerance: f64,
}
impl Default for StormMonitorConfig {
fn default() -> Self {
Self {
alpha: 0.01,
expected_rate: 0.05, min_observations: 3,
tolerance: 1.2,
}
}
}
/// Sequential (e-value) detector for abnormal restart intensity.
#[derive(Debug)]
pub struct RestartStormMonitor {
    config: StormMonitorConfig,
    // Current e-value; always `exp(log_e_value)`.
    e_value: f64,
    // Alert fires when `e_value >= threshold` (threshold = 1 / alpha).
    threshold: f64,
    observations: u64,
    // Accumulated log-likelihood evidence, floored at 0.
    log_e_value: f64,
    // Largest e-value ever seen (diagnostic only).
    peak_e_value: f64,
    // Number of distinct threshold crossings.
    alert_count: u64,
}
impl RestartStormMonitor {
    /// Creates a monitor with alert threshold `1 / alpha`.
    ///
    /// # Panics
    ///
    /// Panics when `alpha` is outside (0, 1), `expected_rate <= 0`, or
    /// `tolerance < 1.0`.
    #[must_use]
    pub fn new(config: StormMonitorConfig) -> Self {
        assert!(
            config.alpha > 0.0 && config.alpha < 1.0,
            "alpha must be in (0, 1), got {}",
            config.alpha
        );
        assert!(
            config.expected_rate > 0.0,
            "expected_rate must be > 0, got {}",
            config.expected_rate
        );
        assert!(
            config.tolerance >= 1.0,
            "tolerance must be >= 1.0, got {}",
            config.tolerance
        );
        let threshold = 1.0 / config.alpha;
        Self {
            config,
            e_value: 1.0,
            threshold,
            observations: 0,
            log_e_value: 0.0,
            peak_e_value: 1.0,
            alert_count: 0,
        }
    }
    /// Folds one intensity sample (restarts/sec) into the running e-value
    /// and returns the resulting alert state.
    pub fn observe_intensity(&mut self, intensity: f64) -> crate::obligation::eprocess::AlertState {
        let was_alert = self.is_alert();
        self.observations += 1;
        // One-sided likelihood ratio: samples at or below the expected rate
        // contribute ratio 1, and tolerance discounts mild excess.
        let ratio = intensity / self.config.expected_rate;
        let normalizer = self.config.tolerance;
        let lr = ratio.max(1.0) / normalizer;
        self.log_e_value += lr.ln();
        // Floor the log evidence at 0 (e-value at 1): quiet periods reset
        // rather than bank negative evidence, CUSUM-style.
        if self.log_e_value < 0.0 {
            self.log_e_value = 0.0;
        }
        self.e_value = self.log_e_value.exp();
        if self.e_value > self.peak_e_value {
            self.peak_e_value = self.e_value;
        }
        // Count only fresh threshold crossings, and only after warm-up.
        if !was_alert
            && self.e_value >= self.threshold
            && self.observations >= self.config.min_observations
        {
            self.alert_count += 1;
        }
        self.alert_state()
    }
    /// Convenience: observes the window's current intensity at `now`.
    pub fn observe_from_window(
        &mut self,
        window: &RestartIntensityWindow,
        now: u64,
    ) -> crate::obligation::eprocess::AlertState {
        self.observe_intensity(window.intensity(now))
    }
    /// Current state; always `Clear` during the warm-up period.
    #[must_use]
    pub fn alert_state(&self) -> crate::obligation::eprocess::AlertState {
        use crate::obligation::eprocess::AlertState;
        if self.observations < self.config.min_observations {
            return AlertState::Clear;
        }
        if self.e_value >= self.threshold {
            AlertState::Alert
        } else if self.e_value > 1.0 {
            AlertState::Watching
        } else {
            AlertState::Clear
        }
    }
    /// `true` when the monitor is currently in the `Alert` state.
    #[must_use]
    pub fn is_alert(&self) -> bool {
        self.alert_state() == crate::obligation::eprocess::AlertState::Alert
    }
    /// Current e-value (accumulated evidence).
    #[must_use]
    pub fn e_value(&self) -> f64 {
        self.e_value
    }
    /// Alert threshold (`1 / alpha`).
    #[must_use]
    pub fn threshold(&self) -> f64 {
        self.threshold
    }
    /// Total samples observed since creation or last reset.
    #[must_use]
    pub fn observations(&self) -> u64 {
        self.observations
    }
    /// Largest e-value seen since creation or last reset.
    #[must_use]
    pub fn peak_e_value(&self) -> f64 {
        self.peak_e_value
    }
    /// Number of distinct threshold crossings so far.
    #[must_use]
    pub fn alert_count(&self) -> u64 {
        self.alert_count
    }
    /// The configuration this monitor was built with.
    #[must_use]
    pub fn config(&self) -> &StormMonitorConfig {
        &self.config
    }
    /// Clears all accumulated state; configuration is preserved.
    pub fn reset(&mut self) {
        self.e_value = 1.0;
        self.log_e_value = 0.0;
        self.peak_e_value = 1.0;
        self.observations = 0;
        self.alert_count = 0;
    }
    /// Point-in-time copy of the monitor's observable state.
    #[must_use]
    pub fn snapshot(&self) -> StormMonitorSnapshot {
        StormMonitorSnapshot {
            e_value: self.e_value,
            threshold: self.threshold,
            observations: self.observations,
            alert_state: self.alert_state(),
            peak_e_value: self.peak_e_value,
            alert_count: self.alert_count,
        }
    }
}
/// Point-in-time view of a `RestartStormMonitor`, safe to ship elsewhere.
#[derive(Debug, Clone)]
pub struct StormMonitorSnapshot {
    pub e_value: f64,
    pub threshold: f64,
    pub observations: u64,
    pub alert_state: crate::obligation::eprocess::AlertState,
    pub peak_e_value: f64,
    pub alert_count: u64,
}
impl std::fmt::Display for StormMonitorSnapshot {
    /// Compact single-line rendering for log output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "StormMonitor[{state}]: e={e:.4} threshold={t:.1} obs={obs} peak={peak:.4} alerts={alerts}",
            state = self.alert_state,
            e = self.e_value,
            t = self.threshold,
            obs = self.observations,
            peak = self.peak_e_value,
            alerts = self.alert_count,
        )
    }
}
/// Combined configuration for a `RestartTracker`: restart window plus
/// optional storm detection.
#[derive(Debug, Clone)]
pub struct RestartTrackerConfig {
    pub restart: RestartConfig,
    /// When set, intensity-window storm detection is enabled.
    pub storm_threshold: Option<f64>,
    pub storm_monitor: StormMonitorConfig,
    // When true, `RestartTracker::new` derives the monitor's expected rate
    // from the storm threshold instead of using the configured one.
    auto_align_storm_expected_rate: bool,
}
impl RestartTrackerConfig {
    /// Builds a tracker config with storm detection disabled.
    #[must_use]
    pub fn from_restart(restart: RestartConfig) -> Self {
        Self {
            restart,
            storm_threshold: None,
            storm_monitor: StormMonitorConfig::default(),
            auto_align_storm_expected_rate: true,
        }
    }
    /// Enables intensity-based storm detection at `threshold` restarts/sec.
    ///
    /// # Panics
    ///
    /// Panics when `threshold` is not finite and strictly positive.
    #[must_use]
    pub fn with_storm_detection(mut self, threshold: f64) -> Self {
        validate_storm_threshold(threshold);
        self.storm_threshold = Some(threshold);
        self
    }
    /// Supplies an explicit monitor config; disables auto-alignment of the
    /// expected rate to the storm threshold.
    #[must_use]
    pub fn with_storm_monitor(mut self, config: StormMonitorConfig) -> Self {
        self.storm_monitor = config;
        self.auto_align_storm_expected_rate = false;
        self
    }
}
/// Outcome of evaluating whether a restart may proceed.
#[derive(Debug, Clone, PartialEq)]
pub enum RestartVerdict {
    /// Restart may proceed as attempt number `attempt`, after `delay`.
    Allowed {
        attempt: u32,
        delay: Option<Duration>,
    },
    /// Restart is refused for the given reason.
    Denied {
        refusal: BudgetRefusal,
    },
}
impl RestartVerdict {
    /// `true` for the `Allowed` variant.
    #[must_use]
    pub fn is_allowed(&self) -> bool {
        matches!(self, Self::Allowed { .. })
    }
}
impl Eq for RestartVerdict {}
/// Bundles restart history with optional intensity-window and e-value
/// storm detection.
#[derive(Debug)]
pub struct RestartTracker {
    history: RestartHistory,
    // Both are `Some` iff a storm threshold was configured.
    intensity: Option<RestartIntensityWindow>,
    storm: Option<RestartStormMonitor>,
}
impl RestartTracker {
    /// Builds a tracker from `config`; the intensity window and storm
    /// monitor are enabled only when a storm threshold is configured.
    #[must_use]
    pub fn new(config: RestartTrackerConfig) -> Self {
        let window = config.restart.window;
        let (intensity, storm) = match config.storm_threshold {
            Some(threshold) => (
                Some(RestartIntensityWindow::new(window, threshold)),
                Some(RestartStormMonitor::new({
                    let mut storm_monitor = config.storm_monitor;
                    if config.auto_align_storm_expected_rate {
                        // Align the baseline so an intensity exactly at the
                        // threshold sits at the tolerance boundary.
                        storm_monitor.expected_rate = threshold / storm_monitor.tolerance;
                    }
                    storm_monitor
                })),
            ),
            None => (None, None),
        };
        let history = RestartHistory::new(config.restart);
        Self {
            history,
            intensity,
            storm,
        }
    }
    /// Convenience constructor without storm detection.
    #[must_use]
    pub fn from_restart_config(config: RestartConfig) -> Self {
        Self::new(RestartTrackerConfig::from_restart(config))
    }
    /// Shared tail of `evaluate` / `evaluate_with_budget`: the attempt
    /// number and backoff delay for an allowed restart at `now`.
    fn allowed_verdict(&self, now: u64) -> RestartVerdict {
        // Saturating conversion: avoid silent `as` truncation on huge
        // counts, matching how counts are reported by `Supervisor`.
        let attempt = u32::try_from(self.history.recent_restart_count(now))
            .unwrap_or(u32::MAX)
            .saturating_add(1);
        let delay = self.history.next_delay(now);
        RestartVerdict::Allowed { attempt, delay }
    }
    /// Window-only evaluation (budget not consulted).
    #[must_use]
    pub fn evaluate(&self, now: u64) -> RestartVerdict {
        if !self.history.can_restart(now) {
            return RestartVerdict::Denied {
                refusal: BudgetRefusal::WindowExhausted {
                    max_restarts: self.history.config().max_restarts,
                    window: self.history.config().window,
                },
            };
        }
        self.allowed_verdict(now)
    }
    /// Window plus budget evaluation.
    #[must_use]
    pub fn evaluate_with_budget(&self, now: u64, budget: &Budget) -> RestartVerdict {
        if let Err(refusal) = self.history.can_restart_with_budget(now, budget) {
            return RestartVerdict::Denied { refusal };
        }
        self.allowed_verdict(now)
    }
    /// Records a restart at `now` in the history and, when enabled, feeds
    /// the intensity window and storm monitor.
    pub fn record(&mut self, now: u64) {
        self.history.record_restart(now);
        if let Some(ref mut intensity) = self.intensity {
            intensity.record(now);
            if let Some(ref mut storm) = self.storm {
                storm.observe_from_window(intensity, now);
            }
        }
    }
    /// Restarts still inside the history window ending at `now`.
    #[must_use]
    pub fn recent_count(&self, now: u64) -> usize {
        self.history.recent_restart_count(now)
    }
    /// Current intensity (restarts/sec), `None` when detection is disabled.
    #[must_use]
    pub fn intensity(&self, now: u64) -> Option<f64> {
        self.intensity.as_ref().map(|w| w.intensity(now))
    }
    /// `true` when the e-value storm monitor is in alert.
    #[must_use]
    pub fn is_storm(&self) -> bool {
        self.storm
            .as_ref()
            .is_some_and(RestartStormMonitor::is_alert)
    }
    /// `true` when the simple intensity threshold is exceeded at `now`.
    #[must_use]
    pub fn is_intensity_storm(&self, now: u64) -> bool {
        self.intensity.as_ref().is_some_and(|w| w.is_storm(now))
    }
    /// Read-only access to the restart history.
    #[must_use]
    pub fn history(&self) -> &RestartHistory {
        &self.history
    }
    /// Snapshot of the storm monitor, `None` when detection is disabled.
    #[must_use]
    pub fn storm_snapshot(&self) -> Option<StormMonitorSnapshot> {
        self.storm.as_ref().map(RestartStormMonitor::snapshot)
    }
    /// Clears all recorded state while keeping the configuration.
    pub fn reset(&mut self) {
        self.history = RestartHistory::new(self.history.config().clone());
        if let Some(ref mut intensity) = self.intensity {
            *intensity =
                RestartIntensityWindow::new(intensity.window(), intensity.storm_threshold());
        }
        if let Some(ref mut storm) = self.storm {
            storm.reset();
        }
    }
}
/// Panics unless `threshold` is a finite, strictly positive number.
fn validate_storm_threshold(threshold: f64) {
    let valid = threshold.is_finite() && threshold > 0.0;
    assert!(
        valid,
        "storm threshold must be finite and > 0, got {threshold}"
    );
}
/// What the supervisor decided to do about a failed task.
#[derive(Debug, Clone)]
pub enum SupervisionDecision {
    /// Restart the task as attempt `attempt`, optionally after `delay`.
    Restart {
        task_id: TaskId,
        region_id: RegionId,
        attempt: u32,
        delay: Option<Duration>,
    },
    /// Stop the task permanently for `reason`.
    Stop {
        task_id: TaskId,
        region_id: RegionId,
        reason: StopReason,
    },
    /// Hand the failure to the parent region (if any).
    Escalate {
        task_id: TaskId,
        region_id: RegionId,
        parent_region_id: Option<RegionId>,
        outcome: Outcome<(), ()>,
    },
}
/// Why a supervised task was stopped rather than restarted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StopReason {
    /// The strategy is `Stop`, or the outcome was not restartable.
    ExplicitStop,
    /// The restart window allowed no further restarts.
    RestartBudgetExhausted {
        total_restarts: u32,
        window: Duration,
    },
    /// A budget check refused the restart.
    BudgetRefused(BudgetRefusal),
    /// The task was cancelled; cancellation is never restarted.
    Cancelled(CancelReason),
    /// The task panicked; panics are never restarted.
    Panicked,
    /// The owning region is shutting down.
    RegionClosing,
}
/// Observable events emitted over the lifecycle of a supervised task.
#[derive(Debug, Clone)]
pub enum SupervisionEvent {
    /// A supervised task terminated with a non-success outcome.
    ActorFailed {
        task_id: TaskId,
        region_id: RegionId,
        outcome: Outcome<(), ()>,
    },
    /// The supervisor chose a decision for the failure.
    DecisionMade {
        task_id: TaskId,
        region_id: RegionId,
        decision: SupervisionDecision,
    },
    /// A restart attempt is starting.
    RestartBeginning {
        task_id: TaskId,
        region_id: RegionId,
        attempt: u32,
    },
    /// A restart attempt completed successfully.
    RestartComplete {
        task_id: TaskId,
        region_id: RegionId,
        attempt: u32,
    },
    /// A restart attempt itself failed.
    RestartFailed {
        task_id: TaskId,
        region_id: RegionId,
        attempt: u32,
        outcome: Outcome<(), ()>,
    },
    /// The restart window was exhausted.
    BudgetExhausted {
        task_id: TaskId,
        region_id: RegionId,
        total_restarts: u32,
        window: Duration,
    },
    /// The failure is being escalated to a parent region.
    Escalating {
        task_id: TaskId,
        from_region: RegionId,
        to_region: Option<RegionId>,
    },
    /// A budget check refused a restart.
    BudgetRefusedRestart {
        task_id: TaskId,
        region_id: RegionId,
        refusal: BudgetRefusal,
    },
}
/// The single rule that determined a supervision decision, recorded as
/// evidence alongside the decision itself.
#[derive(Debug, Clone, PartialEq)]
pub enum BindingConstraint {
    /// Outcome severity (Ok/Cancelled/Panicked) forbids restarting.
    MonotoneSeverity {
        outcome_kind: &'static str,
    },
    /// Strategy is `Stop`.
    ExplicitStopStrategy,
    /// Strategy is `Escalate`.
    EscalateStrategy,
    /// Restart permitted as attempt `attempt`.
    RestartAllowed {
        attempt: u32,
    },
    /// Restart window exhausted.
    WindowExhausted {
        max_restarts: u32,
        window: Duration,
    },
    /// Cost quota below the restart cost.
    InsufficientCost {
        required: u64,
        remaining: u64,
    },
    /// Deadline headroom below the configured minimum.
    DeadlineTooClose {
        min_required: Duration,
        remaining: Duration,
    },
    /// Poll quota below the configured minimum.
    InsufficientPolls {
        min_required: u32,
        remaining: u32,
    },
}
impl Eq for BindingConstraint {}
impl std::fmt::Display for BindingConstraint {
    /// One-line, human-readable statement of the binding rule.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MonotoneSeverity { outcome_kind } => {
                write!(f, "monotone severity: {outcome_kind} is not restartable")
            }
            Self::ExplicitStopStrategy => write!(f, "strategy is Stop"),
            Self::EscalateStrategy => write!(f, "strategy is Escalate"),
            Self::RestartAllowed { attempt } => {
                write!(f, "restart allowed (attempt {attempt})")
            }
            Self::WindowExhausted {
                max_restarts,
                window,
            } => write!(f, "window exhausted: {max_restarts} restarts in {window:?}"),
            Self::InsufficientCost {
                required,
                remaining,
            } => write!(f, "insufficient cost: need {required}, have {remaining}"),
            Self::DeadlineTooClose {
                min_required,
                remaining,
            } => write!(
                f,
                "deadline too close: need {min_required:?}, have {remaining:?}"
            ),
            Self::InsufficientPolls {
                min_required,
                remaining,
            } => write!(
                f,
                "insufficient polls: need {min_required}, have {remaining}"
            ),
        }
    }
}
/// One supervision decision together with the rule that produced it.
#[derive(Debug, Clone)]
pub struct EvidenceEntry {
    /// Decision time in nanoseconds (same clock as `now` elsewhere).
    pub timestamp: u64,
    pub task_id: TaskId,
    pub region_id: RegionId,
    /// The failing outcome that triggered the decision.
    pub outcome: Outcome<(), ()>,
    /// Strategy name at decision time ("Stop" / "Restart" / "Escalate").
    pub strategy_kind: &'static str,
    pub decision: SupervisionDecision,
    /// The single rule that bound the decision.
    pub binding_constraint: BindingConstraint,
}
impl EvidenceEntry {
    /// Converts this entry into the generalized cross-subsystem evidence
    /// format, mapping each binding constraint to a verdict plus detail.
    #[must_use]
    pub fn to_evidence_record(&self) -> crate::evidence::EvidenceRecord {
        use crate::evidence::{
            EvidenceDetail, EvidenceRecord, Subsystem, SupervisionDetail, Verdict,
        };
        let (verdict, detail) = match &self.binding_constraint {
            BindingConstraint::MonotoneSeverity { outcome_kind } => (
                Verdict::Stop,
                SupervisionDetail::MonotoneSeverity {
                    outcome_kind: outcome_kind.to_string(),
                },
            ),
            BindingConstraint::ExplicitStopStrategy => {
                (Verdict::Stop, SupervisionDetail::ExplicitStop)
            }
            BindingConstraint::EscalateStrategy => {
                (Verdict::Escalate, SupervisionDetail::ExplicitEscalate)
            }
            BindingConstraint::RestartAllowed { attempt } => {
                // The delay lives on the decision, not the constraint;
                // recover it when the decision is in fact a restart.
                let delay = match &self.decision {
                    SupervisionDecision::Restart { delay, .. } => *delay,
                    _ => None,
                };
                (
                    Verdict::Restart,
                    SupervisionDetail::RestartAllowed {
                        attempt: *attempt,
                        delay,
                    },
                )
            }
            BindingConstraint::WindowExhausted {
                max_restarts,
                window,
            } => (
                Verdict::Stop,
                SupervisionDetail::WindowExhausted {
                    max_restarts: *max_restarts,
                    window: *window,
                },
            ),
            // The three budget refusals collapse into a free-text constraint.
            BindingConstraint::InsufficientCost {
                required,
                remaining,
            } => (
                Verdict::Stop,
                SupervisionDetail::BudgetRefused {
                    constraint: format!("insufficient cost: need {required}, have {remaining}"),
                },
            ),
            BindingConstraint::DeadlineTooClose {
                min_required,
                remaining,
            } => (
                Verdict::Stop,
                SupervisionDetail::BudgetRefused {
                    constraint: format!(
                        "deadline too close: need {min_required:?}, have {remaining:?}"
                    ),
                },
            ),
            BindingConstraint::InsufficientPolls {
                min_required,
                remaining,
            } => (
                Verdict::Stop,
                SupervisionDetail::BudgetRefused {
                    constraint: format!(
                        "insufficient polls: need {min_required}, have {remaining}"
                    ),
                },
            ),
        };
        EvidenceRecord {
            timestamp: self.timestamp,
            task_id: self.task_id,
            region_id: self.region_id,
            subsystem: Subsystem::Supervision,
            verdict,
            detail: EvidenceDetail::Supervision(detail),
        }
    }
}
/// Append-only in-memory log of supervision decisions.
#[derive(Debug, Clone, Default)]
pub struct EvidenceLedger {
    entries: Vec<EvidenceEntry>,
}
impl EvidenceLedger {
    /// Creates an empty ledger.
    #[must_use]
    pub fn new() -> Self {
        Self { entries: Vec::new() }
    }
    /// Appends a decision record.
    pub fn push(&mut self, entry: EvidenceEntry) {
        self.entries.push(entry);
    }
    /// All recorded entries, oldest first.
    #[must_use]
    pub fn entries(&self) -> &[EvidenceEntry] {
        &self.entries
    }
    /// Number of recorded entries.
    #[must_use]
    pub fn len(&self) -> usize {
        self.entries.len()
    }
    /// `true` when nothing has been recorded.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
    /// Entries recorded for one task, oldest first.
    pub fn for_task(&self, task_id: TaskId) -> impl Iterator<Item = &EvidenceEntry> {
        self.entries
            .iter()
            .filter(move |entry| entry.task_id == task_id)
    }
    /// Entries whose binding constraint satisfies `predicate`.
    pub fn with_constraint<F>(&self, predicate: F) -> impl Iterator<Item = &EvidenceEntry>
    where
        F: Fn(&BindingConstraint) -> bool,
    {
        self.entries
            .iter()
            .filter(move |entry| predicate(&entry.binding_constraint))
    }
    /// Removes all entries.
    pub fn clear(&mut self) {
        self.entries.clear();
    }
}
/// Applies a `SupervisionStrategy` to task failures and records every
/// decision in two evidence ledgers.
#[derive(Debug)]
pub struct Supervisor {
    strategy: SupervisionStrategy,
    // Present only when the strategy is `Restart`.
    history: Option<RestartHistory>,
    evidence: EvidenceLedger,
    generalized_evidence: crate::evidence::GeneralizedLedger,
}
impl Supervisor {
    /// Creates a supervisor; a restart history is kept only for the
    /// `Restart` strategy.
    #[must_use]
    pub fn new(strategy: SupervisionStrategy) -> Self {
        let history = match &strategy {
            SupervisionStrategy::Restart(config) => Some(RestartHistory::new(config.clone())),
            _ => None,
        };
        Self {
            strategy,
            history,
            evidence: EvidenceLedger::new(),
            generalized_evidence: crate::evidence::GeneralizedLedger::new(),
        }
    }
    /// The configured strategy.
    #[must_use]
    pub fn strategy(&self) -> &SupervisionStrategy {
        &self.strategy
    }
    /// Appends the entry to the typed ledger and its generalized form to
    /// the cross-subsystem ledger.
    fn record_evidence(&mut self, entry: EvidenceEntry) {
        let generalized_record = entry.to_evidence_record();
        self.evidence.push(entry);
        self.generalized_evidence.push(generalized_record);
    }
    /// Decides what to do about an `Err` outcome. When a budget is given,
    /// budget checks run first and an allowed restart charges
    /// `restart_cost` against it. Returns the decision together with the
    /// constraint that bound it (for evidence).
    #[allow(clippy::too_many_lines)]
    fn decide_err_with_budget(
        &mut self,
        task_id: TaskId,
        region_id: RegionId,
        parent_region_id: Option<RegionId>,
        now: u64,
        budget: Option<&mut Budget>,
    ) -> (SupervisionDecision, BindingConstraint) {
        match &mut self.strategy {
            SupervisionStrategy::Stop => (
                SupervisionDecision::Stop {
                    task_id,
                    region_id,
                    reason: StopReason::ExplicitStop,
                },
                BindingConstraint::ExplicitStopStrategy,
            ),
            SupervisionStrategy::Restart(config) => {
                let history = self.history.as_mut().expect("history exists for Restart");
                if let Some(b) = budget {
                    if let Err(refusal) = history.can_restart_with_budget(now, b) {
                        // Mirror the refusal into a binding constraint so the
                        // evidence names the exact failing check.
                        let constraint = match &refusal {
                            BudgetRefusal::WindowExhausted {
                                max_restarts,
                                window,
                            } => BindingConstraint::WindowExhausted {
                                max_restarts: *max_restarts,
                                window: *window,
                            },
                            BudgetRefusal::InsufficientCost {
                                required,
                                remaining,
                            } => BindingConstraint::InsufficientCost {
                                required: *required,
                                remaining: *remaining,
                            },
                            BudgetRefusal::DeadlineTooClose {
                                min_required,
                                remaining,
                            } => BindingConstraint::DeadlineTooClose {
                                min_required: *min_required,
                                remaining: *remaining,
                            },
                            BudgetRefusal::InsufficientPolls {
                                min_required,
                                remaining,
                            } => BindingConstraint::InsufficientPolls {
                                min_required: *min_required,
                                remaining: *remaining,
                            },
                        };
                        // Window exhaustion keeps its dedicated stop reason;
                        // all other refusals are wrapped as BudgetRefused.
                        let decision = match refusal {
                            BudgetRefusal::WindowExhausted { .. } => SupervisionDecision::Stop {
                                task_id,
                                region_id,
                                reason: StopReason::RestartBudgetExhausted {
                                    total_restarts: u32::try_from(
                                        history.recent_restart_count(now),
                                    )
                                    .unwrap_or(u32::MAX),
                                    window: config.window,
                                },
                            },
                            _ => SupervisionDecision::Stop {
                                task_id,
                                region_id,
                                reason: StopReason::BudgetRefused(refusal),
                            },
                        };
                        return (decision, constraint);
                    }
                    // Restart allowed: charge the restart cost to the budget.
                    if config.restart_cost > 0 {
                        b.consume_cost(config.restart_cost);
                    }
                } else if !history.can_restart(now) {
                    return (
                        SupervisionDecision::Stop {
                            task_id,
                            region_id,
                            reason: StopReason::RestartBudgetExhausted {
                                total_restarts: u32::try_from(history.recent_restart_count(now))
                                    .unwrap_or(u32::MAX),
                                window: config.window,
                            },
                        },
                        BindingConstraint::WindowExhausted {
                            max_restarts: config.max_restarts,
                            window: config.window,
                        },
                    );
                }
                // Compute attempt/delay before recording, so this restart
                // does not count toward its own attempt number.
                let attempt = history.recent_restart_count(now) as u32 + 1;
                let delay = history.next_delay(now);
                history.record_restart(now);
                (
                    SupervisionDecision::Restart {
                        task_id,
                        region_id,
                        attempt,
                        delay,
                    },
                    BindingConstraint::RestartAllowed { attempt },
                )
            }
            SupervisionStrategy::Escalate => (
                SupervisionDecision::Escalate {
                    task_id,
                    region_id,
                    parent_region_id,
                    outcome: Outcome::Err(()),
                },
                BindingConstraint::EscalateStrategy,
            ),
        }
    }
    /// Decides and records a supervision decision for `outcome` with no
    /// budget consulted.
    pub fn on_failure(
        &mut self,
        task_id: TaskId,
        region_id: RegionId,
        parent_region_id: Option<RegionId>,
        outcome: &Outcome<(), ()>,
        now: u64,
    ) -> SupervisionDecision {
        self.on_failure_with_budget(task_id, region_id, parent_region_id, outcome, now, None)
    }
    /// Decides and records a supervision decision for `outcome`.
    ///
    /// Ok, Cancelled, and Panicked outcomes always stop (monotone
    /// severity); only `Err` consults the strategy and optional budget.
    pub fn on_failure_with_budget(
        &mut self,
        task_id: TaskId,
        region_id: RegionId,
        parent_region_id: Option<RegionId>,
        outcome: &Outcome<(), ()>,
        now: u64,
        budget: Option<&mut Budget>,
    ) -> SupervisionDecision {
        let strategy_kind = match &self.strategy {
            SupervisionStrategy::Stop => "Stop",
            SupervisionStrategy::Restart(_) => "Restart",
            SupervisionStrategy::Escalate => "Escalate",
        };
        let (decision, constraint) = match outcome {
            Outcome::Ok(()) => (
                SupervisionDecision::Stop {
                    task_id,
                    region_id,
                    reason: StopReason::ExplicitStop,
                },
                BindingConstraint::MonotoneSeverity { outcome_kind: "Ok" },
            ),
            Outcome::Cancelled(reason) => (
                SupervisionDecision::Stop {
                    task_id,
                    region_id,
                    reason: StopReason::Cancelled(reason.clone()),
                },
                BindingConstraint::MonotoneSeverity {
                    outcome_kind: "Cancelled",
                },
            ),
            Outcome::Panicked(_) => (
                SupervisionDecision::Stop {
                    task_id,
                    region_id,
                    reason: StopReason::Panicked,
                },
                BindingConstraint::MonotoneSeverity {
                    outcome_kind: "Panicked",
                },
            ),
            Outcome::Err(()) => {
                self.decide_err_with_budget(task_id, region_id, parent_region_id, now, budget)
            }
        };
        self.record_evidence(EvidenceEntry {
            timestamp: now,
            task_id,
            region_id,
            outcome: outcome.clone(),
            strategy_kind,
            decision: decision.clone(),
            binding_constraint: constraint,
        });
        decision
    }
    /// The restart history, when the strategy is `Restart`.
    #[must_use]
    pub fn history(&self) -> Option<&RestartHistory> {
        self.history.as_ref()
    }
    /// Read-only access to the typed evidence ledger.
    #[must_use]
    pub fn evidence(&self) -> &EvidenceLedger {
        &self.evidence
    }
    /// Takes the typed ledger, leaving an empty one in place.
    pub fn take_evidence(&mut self) -> EvidenceLedger {
        std::mem::take(&mut self.evidence)
    }
    /// Read-only access to the generalized evidence ledger.
    #[must_use]
    pub fn generalized_evidence(&self) -> &crate::evidence::GeneralizedLedger {
        &self.generalized_evidence
    }
    /// Takes the generalized ledger, leaving an empty one in place.
    pub fn take_generalized_evidence(&mut self) -> crate::evidence::GeneralizedLedger {
        std::mem::take(&mut self.generalized_evidence)
    }
}
/// Opaque handle identifying one watcher→monitored relationship.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MonitorRef(u64);
impl MonitorRef {
    /// Test-only constructor; production refs come from `MonitorTable`.
    #[doc(hidden)]
    #[must_use]
    pub const fn new_for_test(id: u64) -> Self {
        Self(id)
    }
    /// Raw numeric identifier.
    #[must_use]
    pub const fn as_u64(self) -> u64 {
        self.0
    }
}
impl std::fmt::Display for MonitorRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Mon{}", self.0)
}
}
/// Notification delivered to a watcher when a monitored task terminates.
#[derive(Debug, Clone)]
pub struct Down {
    /// The task that terminated.
    pub monitored: TaskId,
    /// The terminal outcome of the monitored task.
    pub reason: Outcome<(), ()>,
    /// The monitor relationship that produced this notification.
    pub monitor_ref: MonitorRef,
    /// Virtual time at which the task completed.
    pub completion_vt: Time,
}
impl Down {
    /// Deterministic ordering key: completion time, then task id.
    #[must_use]
    pub fn sort_key(&self) -> (Time, TaskId) {
        (self.completion_vt, self.monitored)
    }
}
// Equality deliberately ignores `reason` — presumably because
// `Outcome<(), ()>` does not support equality; verify if that changes.
impl PartialEq for Down {
    fn eq(&self, other: &Self) -> bool {
        self.monitored == other.monitored
            && self.monitor_ref == other.monitor_ref
            && self.completion_vt == other.completion_vt
    }
}
impl Eq for Down {}
/// Internal record of one watcher→monitored relationship.
#[derive(Debug, Clone)]
struct MonitorEntry {
    watcher: TaskId,
    // Region owning the watcher; used for bulk cleanup on region close.
    watcher_region: RegionId,
    monitored: TaskId,
}
/// Registry of task monitors with secondary indexes by monitored task and
/// by watcher region. BTree maps keep iteration deterministic.
#[derive(Debug)]
pub struct MonitorTable {
    // Next MonitorRef id to hand out; monotonically increasing.
    next_ref: u64,
    monitors: BTreeMap<MonitorRef, MonitorEntry>,
    // Index: monitored task -> sorted refs watching it.
    by_monitored: BTreeMap<TaskId, Vec<MonitorRef>>,
    // Index: watcher region -> sorted refs owned by it.
    by_region: BTreeMap<RegionId, Vec<MonitorRef>>,
}
impl Default for MonitorTable {
    /// Equivalent to `MonitorTable::new()`.
    fn default() -> Self {
        Self::new()
    }
}
impl MonitorTable {
    /// Creates an empty table.
    #[must_use]
    pub fn new() -> Self {
        Self {
            next_ref: 0,
            monitors: BTreeMap::new(),
            by_monitored: BTreeMap::new(),
            by_region: BTreeMap::new(),
        }
    }
    /// Registers `watcher` (owned by `watcher_region`) as a monitor of
    /// `monitored`, returning the new monitor reference.
    pub fn monitor(
        &mut self,
        watcher: TaskId,
        watcher_region: RegionId,
        monitored: TaskId,
    ) -> MonitorRef {
        let mref = MonitorRef(self.next_ref);
        self.next_ref += 1;
        let entry = MonitorEntry {
            watcher,
            watcher_region,
            monitored,
        };
        self.monitors.insert(mref, entry);
        // Both index vectors are kept sorted so binary search stays valid.
        let refs = self.by_monitored.entry(monitored).or_default();
        let pos = refs.binary_search(&mref).unwrap_or_else(|p| p);
        refs.insert(pos, mref);
        let region_refs = self.by_region.entry(watcher_region).or_default();
        let pos = region_refs.binary_search(&mref).unwrap_or_else(|p| p);
        region_refs.insert(pos, mref);
        mref
    }
    /// Removes a monitor; returns `false` when `mref` was not registered.
    pub fn demonitor(&mut self, mref: MonitorRef) -> bool {
        let Some(entry) = self.monitors.remove(&mref) else {
            return false;
        };
        Self::remove_from_index(&mut self.by_monitored, entry.monitored, mref);
        Self::remove_from_index(&mut self.by_region, entry.watcher_region, mref);
        true
    }
    /// Fires and removes all monitors of `task`, returning the resulting
    /// `Down` notifications sorted by `(completion_vt, task)`.
    pub fn notify_down(
        &mut self,
        task: TaskId,
        reason: &Outcome<(), ()>,
        completion_vt: Time,
    ) -> Vec<Down> {
        let refs = self.by_monitored.remove(&task).unwrap_or_default();
        let mut downs = Vec::with_capacity(refs.len());
        for mref in refs {
            if let Some(entry) = self.monitors.remove(&mref) {
                Self::remove_from_index(&mut self.by_region, entry.watcher_region, mref);
                downs.push(Down {
                    monitored: task,
                    reason: reason.clone(),
                    monitor_ref: mref,
                    completion_vt,
                });
            }
        }
        downs.sort_by_key(Down::sort_key);
        downs
    }
    /// Like `notify_down` for several terminations at once; the combined
    /// result is re-sorted into one deterministic order.
    pub fn notify_down_batch(
        &mut self,
        terminations: &[(TaskId, Outcome<(), ()>, Time)],
    ) -> Vec<Down> {
        let mut all_downs = Vec::new();
        for (task, reason, vt) in terminations {
            all_downs.extend(self.notify_down(*task, reason, *vt));
        }
        all_downs.sort_by_key(Down::sort_key);
        all_downs
    }
    /// Silently removes every monitor whose watcher lives in `region`
    /// (no `Down` is produced); returns how many were removed.
    pub fn cleanup_region(&mut self, region: RegionId) -> usize {
        let refs = self.by_region.remove(&region).unwrap_or_default();
        let count = refs.len();
        for mref in refs {
            if let Some(entry) = self.monitors.remove(&mref) {
                Self::remove_from_index(&mut self.by_monitored, entry.monitored, mref);
            }
        }
        count
    }
    /// Number of active monitors.
    #[must_use]
    pub fn len(&self) -> usize {
        self.monitors.len()
    }
    /// `true` when no monitors are registered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.monitors.is_empty()
    }
    /// Sorted refs currently watching `task` (empty slice when none).
    #[must_use]
    pub fn watchers_of(&self, task: TaskId) -> &[MonitorRef] {
        self.by_monitored.get(&task).map_or(&[], Vec::as_slice)
    }
    /// The watcher task behind `mref`, if the monitor is still active.
    #[must_use]
    pub fn watcher_for(&self, mref: MonitorRef) -> Option<TaskId> {
        self.monitors.get(&mref).map(|e| e.watcher)
    }
    /// The monitored task behind `mref`, if the monitor is still active.
    #[must_use]
    pub fn monitored_for(&self, mref: MonitorRef) -> Option<TaskId> {
        self.monitors.get(&mref).map(|e| e.monitored)
    }
    /// Removes `mref` from one index bucket, dropping the bucket entirely
    /// once it becomes empty.
    fn remove_from_index<K>(index: &mut BTreeMap<K, Vec<MonitorRef>>, key: K, mref: MonitorRef)
    where
        K: Ord + Copy,
    {
        let remove_bucket = if let Some(bucket) = index.get_mut(&key) {
            if let Ok(pos) = bucket.binary_search(&mref) {
                bucket.remove(pos);
            }
            bucket.is_empty()
        } else {
            false
        };
        if remove_bucket {
            index.remove(&key);
        }
    }
}
/// Observable events emitted by monitor-table operations.
#[derive(Debug, Clone)]
pub enum MonitorEvent {
    /// A new monitor relationship was registered.
    Established {
        watcher: TaskId,
        monitored: TaskId,
        monitor_ref: MonitorRef,
    },
    /// A monitor was explicitly removed.
    Demonitored {
        monitor_ref: MonitorRef,
    },
    /// A `Down` notification was generated for a watcher.
    DownProduced {
        monitored: TaskId,
        watcher: TaskId,
        monitor_ref: MonitorRef,
        completion_vt: Time,
    },
    /// All monitors owned by a closing region were dropped.
    RegionCleanup {
        region: RegionId,
        count: usize,
    },
}
#[cfg(test)]
mod tests {
use super::*;
use crate::evidence::{EvidenceDetail, SupervisionDetail, Verdict};
use crate::types::PanicPayload;
use crate::util::ArenaIndex;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn test_task_id() -> TaskId {
TaskId::from_arena(ArenaIndex::new(0, 1))
}
fn test_region_id() -> RegionId {
RegionId::from_arena(ArenaIndex::new(0, 0))
}
#[allow(clippy::unnecessary_wraps)]
fn noop_start(
_scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
_state: &mut RuntimeState,
_cx: &crate::cx::Cx,
) -> Result<TaskId, SpawnError> {
Ok(test_task_id())
}
#[allow(clippy::unnecessary_wraps)]
fn noop_start_alt(
_scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
_state: &mut RuntimeState,
_cx: &crate::cx::Cx,
) -> Result<TaskId, SpawnError> {
Ok(test_task_id())
}
fn spawn_registered_child(
scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
cx: &crate::cx::Cx,
) -> Result<TaskId, SpawnError> {
let handle = scope.spawn_registered(state, cx, |_cx| async move { 0u8 })?;
Ok(handle.task_id())
}
use parking_lot::Mutex;
use std::sync::Arc;
struct LoggingStart {
name: &'static str,
log: Arc<Mutex<Vec<String>>>,
}
impl ChildStart for LoggingStart {
fn start(
&mut self,
scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
state: &mut RuntimeState,
cx: &crate::cx::Cx,
) -> Result<TaskId, SpawnError> {
self.log.lock().push(self.name.to_string());
let handle = scope.spawn_registered(state, cx, |_cx| async move { 0u8 })?;
Ok(handle.task_id())
}
}
#[test]
fn stop_strategy_always_stops() {
init_test("stop_strategy_always_stops");
let mut supervisor = Supervisor::new(SupervisionStrategy::Stop);
let decision = supervisor.on_failure(
test_task_id(),
test_region_id(),
None,
&Outcome::Cancelled(CancelReason::user("test")),
0,
);
assert!(matches!(
decision,
SupervisionDecision::Stop {
reason: StopReason::Cancelled(_),
..
}
));
crate::test_complete!("stop_strategy_always_stops");
}
#[test]
fn restart_strategy_allows_restarts() {
init_test("restart_strategy_allows_restarts");
let config = RestartConfig::new(3, Duration::from_mins(1));
let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
let decision =
supervisor.on_failure(test_task_id(), test_region_id(), None, &Outcome::Err(()), 0);
assert!(matches!(
decision,
SupervisionDecision::Restart { attempt: 1, .. }
));
let decision = supervisor.on_failure(
test_task_id(),
test_region_id(),
None,
&Outcome::Err(()),
1_000_000_000, );
assert!(matches!(
decision,
SupervisionDecision::Restart { attempt: 2, .. }
));
crate::test_complete!("restart_strategy_allows_restarts");
}
#[test]
fn restart_strategy_does_not_restart_cancelled() {
init_test("restart_strategy_does_not_restart_cancelled");
let config = RestartConfig::new(3, Duration::from_mins(1));
let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
let decision = supervisor.on_failure(
test_task_id(),
test_region_id(),
None,
&Outcome::Cancelled(CancelReason::user("test")),
0,
);
assert!(matches!(
decision,
SupervisionDecision::Stop {
reason: StopReason::Cancelled(_),
..
}
));
crate::test_complete!("restart_strategy_does_not_restart_cancelled");
}
#[test]
fn restart_budget_exhaustion() {
init_test("restart_budget_exhaustion");
let config = RestartConfig::new(2, Duration::from_mins(1));
let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
supervisor.on_failure(test_task_id(), test_region_id(), None, &Outcome::Err(()), 0);
supervisor.on_failure(
test_task_id(),
test_region_id(),
None,
&Outcome::Err(()),
1_000_000_000,
);
let decision = supervisor.on_failure(
test_task_id(),
test_region_id(),
None,
&Outcome::Err(()),
2_000_000_000,
);
assert!(matches!(
decision,
SupervisionDecision::Stop {
reason: StopReason::RestartBudgetExhausted { .. },
..
}
));
crate::test_complete!("restart_budget_exhaustion");
}
#[test]
fn restart_window_resets() {
init_test("restart_window_resets");
let config = RestartConfig::new(2, Duration::from_secs(1)); let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
supervisor.on_failure(test_task_id(), test_region_id(), None, &Outcome::Err(()), 0);
supervisor.on_failure(
test_task_id(),
test_region_id(),
None,
&Outcome::Err(()),
500_000_000, );
let decision = supervisor.on_failure(
test_task_id(),
test_region_id(),
None,
&Outcome::Err(()),
2_000_000_000, );
assert!(matches!(
decision,
SupervisionDecision::Restart { attempt: 1, .. }
));
crate::test_complete!("restart_window_resets");
}
#[test]
fn escalate_strategy_escalates() {
init_test("escalate_strategy_escalates");
let mut supervisor = Supervisor::new(SupervisionStrategy::Escalate);
let parent = RegionId::from_arena(ArenaIndex::new(0, 99));
let decision = supervisor.on_failure(
test_task_id(),
test_region_id(),
Some(parent),
&Outcome::Err(()),
0,
);
assert!(matches!(
decision,
SupervisionDecision::Escalate {
parent_region_id: Some(_),
..
}
));
crate::test_complete!("escalate_strategy_escalates");
}
#[test]
fn escalate_strategy_does_not_escalate_cancelled() {
init_test("escalate_strategy_does_not_escalate_cancelled");
let mut supervisor = Supervisor::new(SupervisionStrategy::Escalate);
let parent = RegionId::from_arena(ArenaIndex::new(0, 99));
let decision = supervisor.on_failure(
test_task_id(),
test_region_id(),
Some(parent),
&Outcome::Cancelled(CancelReason::user("test")),
0,
);
assert!(matches!(
decision,
SupervisionDecision::Stop {
reason: StopReason::Cancelled(_),
..
}
));
crate::test_complete!("escalate_strategy_does_not_escalate_cancelled");
}
#[test]
fn panics_always_stop() {
init_test("panics_always_stop");
let config = RestartConfig::new(10, Duration::from_mins(1));
let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
let decision = supervisor.on_failure(
test_task_id(),
test_region_id(),
None,
&Outcome::Panicked(PanicPayload::new("test panic")),
0,
);
assert!(matches!(
decision,
SupervisionDecision::Stop {
reason: StopReason::Panicked,
..
}
));
crate::test_complete!("panics_always_stop");
}
#[test]
fn exponential_backoff() {
    init_test("exponential_backoff");
    // Delays double per attempt from 100ms and saturate at the 10s cap.
    let backoff = BackoffStrategy::Exponential {
        initial: Duration::from_millis(100),
        max: Duration::from_secs(10),
        multiplier: 2.0,
    };
    for (attempt, expected_ms) in [(0, 100), (1, 200), (2, 400)] {
        let delay = backoff.delay_for_attempt(attempt).unwrap();
        assert_eq!(delay.as_millis(), expected_ms);
    }
    // Attempt 10 would be 100ms * 2^10 = 102.4s; the cap clamps it to 10s.
    assert_eq!(backoff.delay_for_attempt(10).unwrap().as_secs(), 10);
    crate::test_complete!("exponential_backoff");
}
#[test]
fn fixed_backoff() {
    init_test("fixed_backoff");
    // Fixed backoff yields the same 500ms delay on every attempt.
    let strategy = BackoffStrategy::Fixed(Duration::from_millis(500));
    for n in 0..5 {
        assert_eq!(strategy.delay_for_attempt(n).unwrap().as_millis(), 500);
    }
    crate::test_complete!("fixed_backoff");
}
#[test]
fn no_backoff() {
    init_test("no_backoff");
    // BackoffStrategy::None never produces a delay, for any attempt number.
    let strategy = BackoffStrategy::None;
    for n in 0..5 {
        assert!(strategy.delay_for_attempt(n).is_none());
    }
    crate::test_complete!("no_backoff");
}
#[test]
fn restart_history_tracking() {
    init_test("restart_history_tracking");
    // Three restarts inside the 10s window exhaust a limit of 3; once the
    // window slides past them the count drains and restarts are allowed again.
    let mut hist = RestartHistory::new(RestartConfig::new(3, Duration::from_secs(10)));
    assert!(hist.can_restart(0));
    assert_eq!(hist.recent_restart_count(0), 0);
    for t in [1_000_000_000, 2_000_000_000, 3_000_000_000] {
        hist.record_restart(t);
    }
    assert_eq!(hist.recent_restart_count(3_000_000_000), 3);
    assert!(!hist.can_restart(3_000_000_000));
    assert_eq!(hist.recent_restart_count(15_000_000_000), 0);
    assert!(hist.can_restart(15_000_000_000));
    crate::test_complete!("restart_history_tracking");
}
#[test]
fn restart_policy_defaults_to_one_for_one() {
    init_test("restart_policy_defaults_to_one_for_one");
    // The default restart policy is the most isolated one.
    assert_eq!(RestartPolicy::default(), RestartPolicy::OneForOne);
    crate::test_complete!("restart_policy_defaults_to_one_for_one");
}
#[test]
fn escalation_policy_defaults_to_stop() {
    init_test("escalation_policy_defaults_to_stop");
    // Escalation is opt-in: the default policy is Stop.
    assert_eq!(EscalationPolicy::default(), EscalationPolicy::Stop);
    crate::test_complete!("escalation_policy_defaults_to_stop");
}
#[test]
fn supervision_config_defaults() {
    init_test("supervision_config_defaults");
    // Defaults: OneForOne, 3 restarts per one-minute window, Stop on escalation.
    let cfg = SupervisionConfig::default();
    assert_eq!(cfg.restart_policy, RestartPolicy::OneForOne);
    assert_eq!(cfg.max_restarts, 3);
    assert_eq!(cfg.restart_window, Duration::from_mins(1));
    assert_eq!(cfg.escalation, EscalationPolicy::Stop);
    crate::test_complete!("supervision_config_defaults");
}
#[test]
fn supervision_config_builder() {
    init_test("supervision_config_builder");
    // Every with_* builder call must be reflected in the resulting config.
    let cfg = SupervisionConfig::new(5, Duration::from_secs(30))
        .with_restart_policy(RestartPolicy::OneForAll)
        .with_backoff(BackoffStrategy::Fixed(Duration::from_millis(100)))
        .with_escalation(EscalationPolicy::Escalate);
    assert_eq!(cfg.max_restarts, 5);
    assert_eq!(cfg.restart_window, Duration::from_secs(30));
    assert_eq!(cfg.restart_policy, RestartPolicy::OneForAll);
    assert_eq!(cfg.backoff, BackoffStrategy::Fixed(Duration::from_millis(100)));
    assert_eq!(cfg.escalation, EscalationPolicy::Escalate);
    crate::test_complete!("supervision_config_builder");
}
#[test]
fn supervision_config_one_for_all_helper() {
    init_test("supervision_config_one_for_all_helper");
    // The one_for_all convenience constructor sets policy plus window fields.
    let cfg = SupervisionConfig::one_for_all(5, Duration::from_secs(120));
    assert_eq!(cfg.restart_policy, RestartPolicy::OneForAll);
    assert_eq!(cfg.max_restarts, 5);
    assert_eq!(cfg.restart_window, Duration::from_secs(120));
    crate::test_complete!("supervision_config_one_for_all_helper");
}
#[test]
fn supervision_config_rest_for_one_helper() {
    init_test("supervision_config_rest_for_one_helper");
    // The rest_for_one convenience constructor sets policy plus window fields.
    let cfg = SupervisionConfig::rest_for_one(10, Duration::from_secs(300));
    assert_eq!(cfg.restart_policy, RestartPolicy::RestForOne);
    assert_eq!(cfg.max_restarts, 10);
    assert_eq!(cfg.restart_window, Duration::from_secs(300));
    crate::test_complete!("supervision_config_rest_for_one_helper");
}
#[test]
fn child_spec_builder() {
    init_test("child_spec_builder");
    // Exercise every ChildSpec builder method and read each field back.
    let spec = ChildSpec::new("worker-1", noop_start)
        .with_restart(SupervisionStrategy::Restart(RestartConfig::default()))
        .with_shutdown_budget(Budget::with_deadline_secs(10))
        .with_registration(NameRegistrationPolicy::Register {
            name: "worker-1".to_string(),
            collision: NameCollisionPolicy::Fail,
        })
        .depends_on("db")
        .with_start_immediately(false)
        .with_required(false);
    assert_eq!(spec.name, "worker-1");
    assert!(matches!(spec.restart, SupervisionStrategy::Restart(_)));
    assert_eq!(spec.depends_on, vec!["db".to_string()]);
    assert!(!spec.start_immediately);
    assert!(!spec.required);
    crate::test_complete!("child_spec_builder");
}
#[test]
fn child_spec_defaults() {
    init_test("child_spec_defaults");
    // A freshly-built spec: no restart strategy, infinite shutdown budget,
    // no dependencies, no registration, started eagerly, and required.
    let spec = ChildSpec::new("default-child", noop_start);
    assert_eq!(spec.name, "default-child");
    assert!(matches!(spec.restart, SupervisionStrategy::Stop));
    assert_eq!(spec.shutdown_budget, Budget::INFINITE);
    assert!(spec.depends_on.is_empty());
    assert_eq!(spec.registration, NameRegistrationPolicy::None);
    assert!(spec.start_immediately);
    assert!(spec.required);
    crate::test_complete!("child_spec_defaults");
}
#[test]
fn supervisor_builder_defaults() {
    init_test("supervisor_builder_defaults");
    // Fresh builders carry the documented defaults and compare equal on the
    // pure surface; changing any pure field breaks spec_eq and the fingerprint.
    let base = SupervisorBuilder::new("sup-default");
    assert_eq!(base.name, "sup-default");
    assert_eq!(base.budget, None);
    assert_eq!(base.tie_break, StartTieBreak::InsertionOrder);
    assert_eq!(base.restart_policy, RestartPolicy::OneForOne);
    assert!(base.children.is_empty());
    let twin = SupervisorBuilder::new("sup-default");
    assert!(base.spec_eq(&twin));
    assert_eq!(base.spec_fingerprint(), twin.spec_fingerprint());
    let altered =
        SupervisorBuilder::new("sup-default").with_restart_policy(RestartPolicy::OneForAll);
    assert!(!base.spec_eq(&altered));
    assert_ne!(base.spec_fingerprint(), altered.spec_fingerprint());
    crate::test_complete!("supervisor_builder_defaults");
}
#[test]
fn child_spec_pure_surface_is_comparable_and_hashable() {
    init_test("child_spec_pure_surface_is_comparable_and_hashable");
    // Two specs that differ only in their start functions are equal on the
    // pure (declarative) surface and hash to the same fingerprint.
    let decorate = |spec: ChildSpec| {
        spec.with_restart(SupervisionStrategy::Restart(RestartConfig::new(
            3,
            Duration::from_secs(10),
        )))
        .with_shutdown_budget(Budget::with_deadline_secs(2))
        .depends_on("db")
        .with_start_immediately(false)
        .with_required(true)
    };
    let left = decorate(ChildSpec::new("svc", noop_start));
    let right = decorate(ChildSpec::new("svc", noop_start_alt));
    assert!(left.spec_eq(&right));
    assert_eq!(left.spec_fingerprint(), right.spec_fingerprint());
    crate::test_complete!("child_spec_pure_surface_is_comparable_and_hashable");
}
#[test]
fn supervisor_builder_pure_surface_is_comparable_and_hashable() {
    init_test("supervisor_builder_pure_surface_is_comparable_and_hashable");
    // Builder equality/fingerprint ignore the child start closures.
    let left = SupervisorBuilder::new("sup")
        .with_tie_break(StartTieBreak::NameLex)
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(ChildSpec::new("worker", noop_start));
    let right = SupervisorBuilder::new("sup")
        .with_tie_break(StartTieBreak::NameLex)
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(ChildSpec::new("worker", noop_start_alt));
    assert!(left.spec_eq(&right));
    assert_eq!(left.spec_fingerprint(), right.spec_fingerprint());
    crate::test_complete!("supervisor_builder_pure_surface_is_comparable_and_hashable");
}
#[test]
fn supervisor_builder_compile_order_insertion_tie_break() {
    init_test("supervisor_builder_compile_order_insertion_tie_break");
    // "b" and "c" both depend on "a"; their tie is broken by insertion order.
    let compiled = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("a", noop_start))
        .child(ChildSpec::new("b", noop_start).depends_on("a"))
        .child(ChildSpec::new("c", noop_start).depends_on("a"))
        .compile()
        .expect("compile");
    assert_eq!(compiled.start_order, vec![0, 1, 2]);
    crate::test_complete!("supervisor_builder_compile_order_insertion_tie_break");
}
#[test]
fn supervisor_builder_compile_detects_cycle() {
    init_test("supervisor_builder_compile_detects_cycle");
    // a -> b -> a is a dependency cycle and must fail to compile.
    let err = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("a", noop_start).depends_on("b"))
        .child(ChildSpec::new("b", noop_start).depends_on("a"))
        .compile()
        .expect_err("should detect cycle");
    assert!(matches!(err, SupervisorCompileError::CycleDetected { .. }));
    crate::test_complete!("supervisor_builder_compile_detects_cycle");
}
#[test]
fn compiled_supervisor_spawn_starts_children_in_order() {
init_test("compiled_supervisor_spawn_starts_children_in_order");
// Shared log: each LoggingStart appends its child's name when started.
let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
// Helper: a ChildSpec whose start records `name` into the shared log.
let mk = |name: &'static str, log: &Arc<Mutex<Vec<String>>>| {
ChildSpec::new(
name,
LoggingStart {
name,
log: Arc::clone(log),
},
)
};
// "b" and "c" both depend on "a"; the b/c tie falls to insertion order.
let builder = SupervisorBuilder::new("sup")
.child(mk("a", &log))
.child(mk("b", &log).depends_on("a"))
.child(mk("c", &log).depends_on("a"));
let compiled = builder.compile().expect("compile");
let mut state = RuntimeState::new();
let parent = state.create_root_region(Budget::INFINITE);
let cx: crate::cx::Cx = crate::cx::Cx::for_testing();
let handle = compiled
.spawn(&mut state, &cx, parent, Budget::INFINITE)
.expect("spawn");
// All three children started, and the start log shows dependency order.
assert_eq!(handle.started.len(), 3);
assert_eq!(
*log.lock(),
vec!["a".to_string(), "b".to_string(), "c".to_string()]
);
crate::test_complete!("compiled_supervisor_spawn_starts_children_in_order");
}
#[test]
fn compiled_supervisor_restart_plan_one_for_one() {
    init_test("compiled_supervisor_restart_plan_one_for_one");
    // OneForOne touches only the failed child, even inside a dependency chain.
    let compiled = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForOne)
        .child(ChildSpec::new("a", noop_start))
        .child(ChildSpec::new("b", noop_start).depends_on("a"))
        .child(ChildSpec::new("c", noop_start).depends_on("b"))
        .child(ChildSpec::new("d", noop_start).depends_on("c"))
        .compile()
        .expect("compile");
    let plan = compiled.restart_plan_for("b").expect("plan");
    assert_eq!(plan.policy, RestartPolicy::OneForOne);
    assert_eq!(plan.cancel_order, vec!["b".to_string()]);
    assert_eq!(plan.restart_order, vec!["b".to_string()]);
    crate::test_complete!("compiled_supervisor_restart_plan_one_for_one");
}
#[test]
fn compiled_supervisor_restart_plan_one_for_all() {
    init_test("compiled_supervisor_restart_plan_one_for_all");
    // OneForAll cancels every child in reverse start order, then restarts
    // all of them in forward start order.
    let compiled = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(ChildSpec::new("a", noop_start))
        .child(ChildSpec::new("b", noop_start).depends_on("a"))
        .child(ChildSpec::new("c", noop_start).depends_on("b"))
        .child(ChildSpec::new("d", noop_start).depends_on("c"))
        .compile()
        .expect("compile");
    let plan = compiled.restart_plan_for("b").expect("plan");
    assert_eq!(plan.policy, RestartPolicy::OneForAll);
    let forward: Vec<String> = ["a", "b", "c", "d"].iter().map(|s| s.to_string()).collect();
    let reverse: Vec<String> = forward.iter().rev().cloned().collect();
    assert_eq!(plan.cancel_order, reverse);
    assert_eq!(plan.restart_order, forward);
    crate::test_complete!("compiled_supervisor_restart_plan_one_for_all");
}
#[test]
fn compiled_supervisor_restart_plan_rest_for_one() {
    init_test("compiled_supervisor_restart_plan_rest_for_one");
    // RestForOne affects the failed child and everything started after it:
    // cancel in reverse order, restart forward.
    let compiled = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::RestForOne)
        .child(ChildSpec::new("a", noop_start))
        .child(ChildSpec::new("b", noop_start).depends_on("a"))
        .child(ChildSpec::new("c", noop_start).depends_on("b"))
        .child(ChildSpec::new("d", noop_start).depends_on("c"))
        .compile()
        .expect("compile");
    let plan = compiled.restart_plan_for("b").expect("plan");
    assert_eq!(plan.policy, RestartPolicy::RestForOne);
    let affected: Vec<String> = ["b", "c", "d"].iter().map(|s| s.to_string()).collect();
    let teardown: Vec<String> = affected.iter().rev().cloned().collect();
    assert_eq!(plan.cancel_order, teardown);
    assert_eq!(plan.restart_order, affected);
    crate::test_complete!("compiled_supervisor_restart_plan_rest_for_one");
}
#[test]
fn compiled_supervisor_restart_plan_unknown_child_none() {
    init_test("compiled_supervisor_restart_plan_unknown_child_none");
    // A name that was never registered yields no plan.
    let compiled = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(ChildSpec::new("a", noop_start))
        .child(ChildSpec::new("b", noop_start).depends_on("a"))
        .compile()
        .expect("compile");
    assert!(compiled.restart_plan_for("zzz").is_none());
    crate::test_complete!("compiled_supervisor_restart_plan_unknown_child_none");
}
#[test]
fn compiled_supervisor_restart_plan_for_failure_monotone_severity() {
    init_test("compiled_supervisor_restart_plan_for_failure_monotone_severity");
    // Only the Err outcome triggers a restart plan: success and cancellation
    // are not failures, and panics are never restarted.
    let compiled = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(ChildSpec::new("a", noop_start))
        .child(
            ChildSpec::new("b", noop_start)
                .depends_on("a")
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        )
        .child(ChildSpec::new("c", noop_start).depends_on("b"))
        .compile()
        .expect("compile");
    let non_failures: [Outcome<(), ()>; 3] = [
        Outcome::Ok(()),
        Outcome::Cancelled(CancelReason::user("cancelled")),
        Outcome::Panicked(crate::types::PanicPayload::new("panic")),
    ];
    for outcome in &non_failures {
        assert!(compiled.restart_plan_for_failure("b", outcome).is_none());
    }
    let err: Outcome<(), ()> = Outcome::Err(());
    let plan = compiled
        .restart_plan_for_failure("b", &err)
        .expect("restart plan");
    assert_eq!(plan.policy, RestartPolicy::OneForAll);
    crate::test_complete!("compiled_supervisor_restart_plan_for_failure_monotone_severity");
}
#[test]
fn compiled_supervisor_restart_plan_for_failure_requires_restart_strategy() {
    init_test("compiled_supervisor_restart_plan_for_failure_requires_restart_strategy");
    // Without a Restart strategy on the child itself, even an Err yields no plan.
    let compiled = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForOne)
        .child(ChildSpec::new("a", noop_start))
        .child(ChildSpec::new("b", noop_start).depends_on("a"))
        .compile()
        .expect("compile");
    let err: Outcome<(), ()> = Outcome::Err(());
    assert!(compiled.restart_plan_for_failure("b", &err).is_none());
    crate::test_complete!(
        "compiled_supervisor_restart_plan_for_failure_requires_restart_strategy"
    );
}
#[test]
fn restart_policy_equality() {
    init_test("restart_policy_equality");
    // Policies are equal to themselves and pairwise distinct.
    assert_eq!(RestartPolicy::OneForOne, RestartPolicy::OneForOne);
    assert_ne!(RestartPolicy::OneForOne, RestartPolicy::OneForAll);
    assert_ne!(RestartPolicy::OneForAll, RestartPolicy::RestForOne);
    crate::test_complete!("restart_policy_equality");
}
fn make_restart_child(name: &str, budget: Budget) -> ChildSpec {
ChildSpec {
name: name.into(),
start: Box::new(noop_start),
restart: SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
shutdown_budget: budget,
depends_on: vec![],
registration: NameRegistrationPolicy::None,
start_immediately: true,
required: true,
}
}
#[test]
fn compile_restart_ops_one_for_one() {
    init_test("compile_restart_ops_one_for_one");
    // A OneForOne restart of "b" is exactly: cancel(b), drain(b), restart(b).
    let compiled = SupervisorBuilder::new("test")
        .with_restart_policy(RestartPolicy::OneForOne)
        .child(make_restart_child("a", Budget::INFINITE))
        .child(make_restart_child("b", Budget::INFINITE))
        .child(make_restart_child("c", Budget::INFINITE))
        .compile()
        .unwrap();
    let plan = compiled.restart_plan_for("b").unwrap();
    let ops = compiled.compile_restart_ops(&plan);
    assert_eq!(ops.policy, RestartPolicy::OneForOne);
    assert_eq!(ops.ops.len(), 3);
    assert!(matches!(&ops.ops[0], RegionOp::CancelChild { name, .. } if name == "b"));
    assert!(matches!(&ops.ops[1], RegionOp::DrainChild { name, .. } if name == "b"));
    assert!(matches!(&ops.ops[2], RegionOp::RestartChild { name } if name == "b"));
    crate::test_complete!("compile_restart_ops_one_for_one");
}
#[test]
fn compile_restart_ops_one_for_all() {
    init_test("compile_restart_ops_one_for_all");
    // OneForAll emits three phases over all children: cancels in reverse
    // start order, drains in that same order, restarts in forward order.
    let compiled = SupervisorBuilder::new("test")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(make_restart_child("a", Budget::INFINITE))
        .child(make_restart_child("b", Budget::INFINITE))
        .child(make_restart_child("c", Budget::INFINITE))
        .compile()
        .unwrap();
    let plan = compiled.restart_plan_for("b").unwrap();
    let ops = compiled.compile_restart_ops(&plan);
    assert_eq!(ops.policy, RestartPolicy::OneForAll);
    assert_eq!(ops.ops.len(), 9);
    for (i, expected) in ["c", "b", "a"].iter().copied().enumerate() {
        assert!(matches!(&ops.ops[i], RegionOp::CancelChild { name, .. } if name == expected));
        assert!(matches!(&ops.ops[i + 3], RegionOp::DrainChild { name, .. } if name == expected));
    }
    for (i, expected) in ["a", "b", "c"].iter().copied().enumerate() {
        assert!(matches!(&ops.ops[i + 6], RegionOp::RestartChild { name } if name == expected));
    }
    crate::test_complete!("compile_restart_ops_one_for_all");
}
#[test]
fn compile_restart_ops_rest_for_one() {
    init_test("compile_restart_ops_rest_for_one");
    // RestForOne on "b" touches only "b" and the later sibling "c":
    // cancel and drain in reverse start order, then restart forward.
    let compiled = SupervisorBuilder::new("test")
        .with_restart_policy(RestartPolicy::RestForOne)
        .child(make_restart_child("a", Budget::INFINITE))
        .child(make_restart_child("b", Budget::INFINITE))
        .child(make_restart_child("c", Budget::INFINITE))
        .compile()
        .unwrap();
    let plan = compiled.restart_plan_for("b").unwrap();
    let ops = compiled.compile_restart_ops(&plan);
    assert_eq!(ops.policy, RestartPolicy::RestForOne);
    assert_eq!(ops.ops.len(), 6);
    assert!(matches!(&ops.ops[0], RegionOp::CancelChild { name, .. } if name == "c"));
    assert!(matches!(&ops.ops[1], RegionOp::CancelChild { name, .. } if name == "b"));
    assert!(matches!(&ops.ops[2], RegionOp::DrainChild { name, .. } if name == "c"));
    assert!(matches!(&ops.ops[3], RegionOp::DrainChild { name, .. } if name == "b"));
    assert!(matches!(&ops.ops[4], RegionOp::RestartChild { name } if name == "b"));
    assert!(matches!(&ops.ops[5], RegionOp::RestartChild { name } if name == "c"));
    crate::test_complete!("compile_restart_ops_rest_for_one");
}
#[test]
fn compile_restart_ops_preserves_per_child_budgets() {
init_test("compile_restart_ops_preserves_per_child_budgets");
// Give each child a distinct poll quota so their budgets are distinguishable.
let budget_a = Budget::INFINITE.with_poll_quota(10);
let budget_b = Budget::INFINITE.with_poll_quota(20);
let compiled = SupervisorBuilder::new("test")
.with_restart_policy(RestartPolicy::OneForAll)
.child(make_restart_child("a", budget_a))
.child(make_restart_child("b", budget_b))
.compile()
.unwrap();
let plan = compiled.restart_plan_for("a").unwrap();
let ops = compiled.compile_restart_ops(&plan);
// OneForAll cancels in reverse start order ("b" first); each cancel/drain
// op must carry that specific child's own shutdown budget.
assert!(
matches!(
&ops.ops[0],
RegionOp::CancelChild {
name,
shutdown_budget,
} if name == "b" && *shutdown_budget == budget_b
),
"expected first op to cancel child b with its shutdown budget"
);
assert!(
matches!(
&ops.ops[1],
RegionOp::CancelChild {
name,
shutdown_budget,
} if name == "a" && *shutdown_budget == budget_a
),
"expected second op to cancel child a with its shutdown budget"
);
assert!(
matches!(
&ops.ops[2],
RegionOp::DrainChild {
name,
shutdown_budget,
} if name == "b" && *shutdown_budget == budget_b
),
"expected third op to drain child b with its shutdown budget"
);
crate::test_complete!("compile_restart_ops_preserves_per_child_budgets");
}
#[test]
fn compile_restart_ops_with_dependencies() {
    init_test("compile_restart_ops_with_dependencies");
    // Explicit depends_on edges (a <- b <- c) must drive reverse-order
    // cancellation and forward-order restarts under OneForAll.
    let mut spec_b = make_restart_child("b", Budget::INFINITE);
    spec_b.depends_on = vec!["a".into()];
    let mut spec_c = make_restart_child("c", Budget::INFINITE);
    spec_c.depends_on = vec!["b".into()];
    let compiled = SupervisorBuilder::new("test")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(make_restart_child("a", Budget::INFINITE))
        .child(spec_b)
        .child(spec_c)
        .compile()
        .unwrap();
    let plan = compiled.restart_plan_for("a").unwrap();
    let ops = compiled.compile_restart_ops(&plan);
    assert!(matches!(&ops.ops[0], RegionOp::CancelChild { name, .. } if name == "c"));
    assert!(matches!(&ops.ops[1], RegionOp::CancelChild { name, .. } if name == "b"));
    assert!(matches!(&ops.ops[2], RegionOp::CancelChild { name, .. } if name == "a"));
    assert!(matches!(&ops.ops[6], RegionOp::RestartChild { name } if name == "a"));
    assert!(matches!(&ops.ops[7], RegionOp::RestartChild { name } if name == "b"));
    assert!(matches!(&ops.ops[8], RegionOp::RestartChild { name } if name == "c"));
    crate::test_complete!("compile_restart_ops_with_dependencies");
}
#[test]
fn compile_restart_ops_deterministic() {
    init_test("compile_restart_ops_deterministic");
    // Compiling the same plan twice must yield identical op sequences.
    let compiled = SupervisorBuilder::new("test")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(make_restart_child("a", Budget::INFINITE))
        .child(make_restart_child("b", Budget::INFINITE))
        .child(make_restart_child("c", Budget::INFINITE))
        .compile()
        .unwrap();
    let plan = compiled.restart_plan_for("b").unwrap();
    let first = compiled.compile_restart_ops(&plan);
    let second = compiled.compile_restart_ops(&plan);
    assert_eq!(first, second, "compile_restart_ops must be deterministic");
    crate::test_complete!("compile_restart_ops_deterministic");
}
#[test]
fn compile_restart_ops_three_phase_ordering() {
    init_test("compile_restart_ops_three_phase_ordering");
    // Phases must be non-decreasing over the op stream:
    // all cancels (0), then all drains (1), then all restarts (2).
    let compiled = SupervisorBuilder::new("test")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(make_restart_child("a", Budget::INFINITE))
        .child(make_restart_child("b", Budget::INFINITE))
        .compile()
        .unwrap();
    let plan = compiled.restart_plan_for("a").unwrap();
    let ops = compiled.compile_restart_ops(&plan);
    let mut phase = 0;
    for op in &ops.ops {
        let op_phase = match op {
            RegionOp::CancelChild { .. } => 0,
            RegionOp::DrainChild { .. } => 1,
            RegionOp::RestartChild { .. } => 2,
        };
        assert!(
            op_phase >= phase,
            "ops must be ordered: cancels, then drains, then restarts; got phase {op_phase} after {phase}"
        );
        phase = op_phase;
    }
    crate::test_complete!("compile_restart_ops_three_phase_ordering");
}
#[test]
fn conformance_one_for_one_isolates_failure() {
    init_test("conformance_one_for_one_isolates_failure");
    // Whichever child fails, OneForOne cancels that child and nothing else.
    let compiled = SupervisorBuilder::new("test")
        .with_restart_policy(RestartPolicy::OneForOne)
        .child(make_restart_child("a", Budget::INFINITE))
        .child(make_restart_child("b", Budget::INFINITE))
        .child(make_restart_child("c", Budget::INFINITE))
        .compile()
        .unwrap();
    for name in &["a", "b", "c"] {
        let plan = compiled.restart_plan_for(name).unwrap();
        let ops = compiled.compile_restart_ops(&plan);
        let cancelled: Vec<&str> = ops
            .ops
            .iter()
            .filter_map(|op| match op {
                RegionOp::CancelChild { name, .. } => Some(name.as_str()),
                _ => None,
            })
            .collect();
        assert_eq!(
            cancelled,
            vec![*name],
            "OneForOne must only cancel the failed child"
        );
    }
    crate::test_complete!("conformance_one_for_one_isolates_failure");
}
#[test]
fn conformance_one_for_all_cancels_all() {
    init_test("conformance_one_for_all_cancels_all");
    // Any single failure under OneForAll tears down every child.
    let compiled = SupervisorBuilder::new("test")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(make_restart_child("a", Budget::INFINITE))
        .child(make_restart_child("b", Budget::INFINITE))
        .child(make_restart_child("c", Budget::INFINITE))
        .compile()
        .unwrap();
    for failed in &["a", "b", "c"] {
        let plan = compiled.restart_plan_for(failed).unwrap();
        let ops = compiled.compile_restart_ops(&plan);
        let cancels = ops
            .ops
            .iter()
            .filter(|op| matches!(op, RegionOp::CancelChild { .. }))
            .count();
        assert_eq!(cancels, 3, "OneForAll must cancel all children");
    }
    crate::test_complete!("conformance_one_for_all_cancels_all");
}
#[test]
fn conformance_rest_for_one_cancels_rest() {
    init_test("conformance_rest_for_one_cancels_rest");
    // RestForOne must cancel the failed child plus every later sibling —
    // and never an earlier one. The cancel-counting logic was previously
    // copy-pasted three times; it is factored into a local closure here.
    let compiled = SupervisorBuilder::new("test")
        .with_restart_policy(RestartPolicy::RestForOne)
        .child(make_restart_child("a", Budget::INFINITE))
        .child(make_restart_child("b", Budget::INFINITE))
        .child(make_restart_child("c", Budget::INFINITE))
        .compile()
        .unwrap();
    // Counts the CancelChild ops produced for a failure of `name`.
    let cancel_count = |name: &str| {
        let plan = compiled.restart_plan_for(name).unwrap();
        compiled
            .compile_restart_ops(&plan)
            .ops
            .iter()
            .filter(|op| matches!(op, RegionOp::CancelChild { .. }))
            .count()
    };
    // Failed child + later siblings: 3 for "a", 2 for "b", 1 for "c".
    assert_eq!(cancel_count("a"), 3);
    assert_eq!(cancel_count("b"), 2);
    assert_eq!(cancel_count("c"), 1);
    // A failure of "b" must not touch the earlier child "a".
    let plan_b = compiled.restart_plan_for("b").unwrap();
    let ops_b = compiled.compile_restart_ops(&plan_b);
    let cancels_a = ops_b
        .ops
        .iter()
        .any(|op| matches!(op, RegionOp::CancelChild { name, .. } if name == "a"));
    assert!(!cancels_a, "RestForOne must not cancel earlier children");
    crate::test_complete!("conformance_rest_for_one_cancels_rest");
}
#[test]
fn conformance_cancel_drain_restart_budget_bound() {
    init_test("conformance_cancel_drain_restart_budget_bound");
    // Every cancel/drain op must carry a shutdown budget with a non-zero
    // poll quota; restart ops carry none.
    let budget_a = Budget::INFINITE.with_poll_quota(5);
    let budget_b = Budget::INFINITE.with_poll_quota(10);
    let compiled = SupervisorBuilder::new("test")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(make_restart_child("a", budget_a))
        .child(make_restart_child("b", budget_b))
        .compile()
        .unwrap();
    let plan = compiled.restart_plan_for("a").unwrap();
    let ops = compiled.compile_restart_ops(&plan);
    for op in &ops.ops {
        match op {
            RegionOp::CancelChild { shutdown_budget, .. }
            | RegionOp::DrainChild { shutdown_budget, .. } => {
                assert!(
                    shutdown_budget.poll_quota > 0,
                    "shutdown budget must be present"
                );
            }
            RegionOp::RestartChild { .. } => {}
        }
    }
    crate::test_complete!("conformance_cancel_drain_restart_budget_bound");
}
#[test]
fn escalation_policy_variants() {
    init_test("escalation_policy_variants");
    // The three escalation policies are pairwise distinct.
    let variants = [
        EscalationPolicy::Stop,
        EscalationPolicy::Escalate,
        EscalationPolicy::ResetCounter,
    ];
    assert_ne!(variants[0], variants[1]);
    assert_ne!(variants[1], variants[2]);
    assert_ne!(variants[0], variants[2]);
    crate::test_complete!("escalation_policy_variants");
}
#[test]
fn restart_config_budget_fields_default_to_disabled() {
    init_test("restart_config_budget_fields_default");
    // Budget-aware restart gating is entirely opt-in: all knobs default off.
    let cfg = RestartConfig::default();
    assert_eq!(cfg.restart_cost, 0);
    assert_eq!(cfg.min_remaining_for_restart, None);
    assert_eq!(cfg.min_polls_for_restart, 0);
    crate::test_complete!("restart_config_budget_fields_default");
}
#[test]
fn restart_config_budget_builders() {
    init_test("restart_config_budget_builders");
    // Each budget-gating builder method must land in its corresponding field.
    let cfg = RestartConfig::new(5, Duration::from_secs(30))
        .with_restart_cost(100)
        .with_min_remaining(Duration::from_secs(5))
        .with_min_polls(50);
    assert_eq!(cfg.restart_cost, 100);
    assert_eq!(cfg.min_remaining_for_restart, Some(Duration::from_secs(5)));
    assert_eq!(cfg.min_polls_for_restart, 50);
    crate::test_complete!("restart_config_budget_builders");
}
#[test]
fn budget_aware_restart_allowed_with_sufficient_budget() {
    init_test("budget_aware_restart_sufficient");
    // A budget comfortably above every gate (cost, deadline slack, polls)
    // must not refuse the restart.
    let cfg = RestartConfig::new(3, Duration::from_mins(1))
        .with_restart_cost(10)
        .with_min_remaining(Duration::from_secs(5))
        .with_min_polls(100);
    let hist = RestartHistory::new(cfg);
    let budget = Budget::new()
        .with_deadline(crate::types::id::Time::from_secs(60))
        .with_cost_quota(1000)
        .with_poll_quota(5000);
    assert!(hist.can_restart_with_budget(0, &budget).is_ok());
    crate::test_complete!("budget_aware_restart_sufficient");
}
#[test]
fn budget_aware_restart_refused_insufficient_cost() {
    init_test("budget_aware_restart_insufficient_cost");
    // The restart cost (100) exceeds the remaining cost quota (50): refuse.
    let hist = RestartHistory::new(
        RestartConfig::new(3, Duration::from_mins(1)).with_restart_cost(100),
    );
    let budget = Budget::new().with_cost_quota(50);
    let refusal = hist.can_restart_with_budget(0, &budget);
    assert!(matches!(
        refusal,
        Err(BudgetRefusal::InsufficientCost {
            required: 100,
            remaining: 50
        })
    ));
    crate::test_complete!("budget_aware_restart_insufficient_cost");
}
#[test]
fn budget_aware_restart_refused_deadline_too_close() {
    init_test("budget_aware_restart_deadline_close");
    // At t=12s a 15s deadline leaves 3s — under the 10s minimum slack.
    let hist = RestartHistory::new(
        RestartConfig::new(3, Duration::from_mins(1)).with_min_remaining(Duration::from_secs(10)),
    );
    let budget = Budget::with_deadline_secs(15);
    let refusal = hist.can_restart_with_budget(12_000_000_000u64, &budget);
    assert!(matches!(refusal, Err(BudgetRefusal::DeadlineTooClose { .. })));
    crate::test_complete!("budget_aware_restart_deadline_close");
}
#[test]
fn budget_aware_restart_refused_insufficient_polls() {
    init_test("budget_aware_restart_insufficient_polls");
    // 100 remaining polls is under the configured 500-poll floor: refuse.
    let hist = RestartHistory::new(
        RestartConfig::new(3, Duration::from_mins(1)).with_min_polls(500),
    );
    let budget = Budget::new().with_poll_quota(100);
    let refusal = hist.can_restart_with_budget(0, &budget);
    assert!(matches!(
        refusal,
        Err(BudgetRefusal::InsufficientPolls {
            min_required: 500,
            remaining: 100
        })
    ));
    crate::test_complete!("budget_aware_restart_insufficient_polls");
}
#[test]
fn budget_aware_restart_allowed_no_cost_quota_set() {
    init_test("budget_aware_restart_no_cost_quota");
    // With no cost quota on the budget, restart_cost cannot be exceeded.
    let hist = RestartHistory::new(
        RestartConfig::new(3, Duration::from_mins(1)).with_restart_cost(100),
    );
    assert!(hist.can_restart_with_budget(0, &Budget::INFINITE).is_ok());
    crate::test_complete!("budget_aware_restart_no_cost_quota");
}
#[test]
fn budget_aware_restart_allowed_no_deadline() {
    init_test("budget_aware_restart_no_deadline");
    // min_remaining only applies when the budget actually has a deadline.
    let hist = RestartHistory::new(
        RestartConfig::new(3, Duration::from_mins(1)).with_min_remaining(Duration::from_secs(10)),
    );
    assert!(hist.can_restart_with_budget(0, &Budget::INFINITE).is_ok());
    crate::test_complete!("budget_aware_restart_no_deadline");
}
#[test]
fn supervisor_on_failure_with_budget_refuses_restart() {
    init_test("supervisor_on_failure_with_budget_refuses");
    // Even with window headroom (10 restarts allowed), an unaffordable
    // restart cost converts the decision into a budget-refused Stop.
    let cfg = RestartConfig::new(10, Duration::from_mins(1)).with_restart_cost(100);
    let mut sup = Supervisor::new(SupervisionStrategy::Restart(cfg));
    let mut budget = Budget::new().with_cost_quota(50);
    let got = sup.on_failure_with_budget(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        0,
        Some(&mut budget),
    );
    assert!(matches!(
        got,
        SupervisionDecision::Stop {
            reason: StopReason::BudgetRefused(BudgetRefusal::InsufficientCost { .. }),
            ..
        }
    ));
    crate::test_complete!("supervisor_on_failure_with_budget_refuses");
}
#[test]
fn supervisor_on_failure_with_budget_allows_restart() {
    init_test("supervisor_on_failure_with_budget_allows");
    // With ample cost quota, the budget-aware path must behave like the plain
    // window check: grant the first restart attempt with its backoff delay.
    let config = RestartConfig::new(3, Duration::from_mins(1)).with_restart_cost(10);
    let expected_delay = config.backoff.delay_for_attempt(0);
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    let mut budget = Budget::new().with_cost_quota(1000);
    let decision = supervisor.on_failure_with_budget(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        0,
        Some(&mut budget),
    );
    match decision {
        SupervisionDecision::Restart { attempt, delay, .. } => {
            assert_eq!(attempt, 1);
            assert_eq!(delay, expected_delay);
        }
        // `panic!`, not `unreachable!`: this branch IS reachable when the
        // implementation regresses, so it is a test failure, not dead code.
        other => panic!("expected Restart, got {other:?}"),
    }
    crate::test_complete!("supervisor_on_failure_with_budget_allows");
}
#[test]
fn supervisor_on_failure_without_budget_uses_window_only() {
    init_test("supervisor_on_failure_without_budget");
    // Without a budget argument only the restart window applies: the first
    // two failures restart (max_restarts = 2), the third exhausts the window.
    let cfg = RestartConfig::new(2, Duration::from_mins(1)).with_restart_cost(10);
    let mut sup = Supervisor::new(SupervisionStrategy::Restart(cfg));
    for (now_ns, expected_attempt) in [(0, 1), (1_000_000_000, 2)] {
        let got = sup.on_failure(
            test_task_id(),
            test_region_id(),
            None,
            &Outcome::Err(()),
            now_ns,
        );
        assert!(matches!(
            got,
            SupervisionDecision::Restart { attempt, .. } if attempt == expected_attempt
        ));
    }
    let third = sup.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        2_000_000_000,
    );
    assert!(matches!(
        third,
        SupervisionDecision::Stop {
            reason: StopReason::RestartBudgetExhausted { .. },
            ..
        }
    ));
    crate::test_complete!("supervisor_on_failure_without_budget");
}
#[test]
fn restart_intensity_basic() {
    init_test("restart_intensity_basic");
    let config = RestartConfig::new(10, Duration::from_secs(10));
    let mut history = RestartHistory::new(config);
    // No restarts recorded yet -> intensity is zero.
    assert!(history.intensity(0).abs() < f64::EPSILON);
    // Three restarts at t = 1s, 2s, 3s.
    for t in [1_000_000_000, 2_000_000_000, 3_000_000_000] {
        history.record_restart(t);
    }
    // 3 restarts over a 10s window -> ~0.3 restarts/sec.
    let intensity = history.intensity(5_000_000_000);
    assert!((intensity - 0.3).abs() < 0.01);
    crate::test_complete!("restart_intensity_basic");
}
#[test]
fn intensity_window_basic() {
    init_test("intensity_window_basic");
    let mut window = RestartIntensityWindow::new(Duration::from_secs(10), 1.0);
    // Empty window: zero count, zero intensity, no storm.
    assert_eq!(window.count(0), 0);
    assert!(window.intensity(0).abs() < f64::EPSILON);
    assert!(!window.is_storm(0));
    // Record three events at 1s, 2s, 3s.
    for t in [1_000_000_000, 2_000_000_000, 3_000_000_000] {
        window.record(t);
    }
    assert_eq!(window.count(5_000_000_000), 3);
    // 3 events over a 10s window -> ~0.3 events/sec.
    let intensity = window.intensity(5_000_000_000);
    assert!((intensity - 0.3).abs() < 0.01);
    crate::test_complete!("intensity_window_basic");
}
#[test]
fn intensity_window_storm_detection() {
    init_test("intensity_window_storm_detection");
    // Storm threshold: 2 events/sec over a 5s window.
    let mut window = RestartIntensityWindow::new(Duration::from_secs(5), 2.0);
    // 5 events in the first second: 1/sec over the window -> below threshold.
    for i in 0..5 {
        window.record(i * 200_000_000);
    }
    assert!(!window.is_storm(1_000_000_000));
    // 10 more events in the next second push the rate over the threshold.
    for i in 0..10 {
        window.record(1_000_000_000 + i * 100_000_000);
    }
    let now = 2_000_000_000;
    assert!(window.is_storm(now));
    crate::test_complete!("intensity_window_storm_detection");
}
#[test]
fn intensity_window_prunes_old_entries() {
    init_test("intensity_window_prunes");
    let mut window = RestartIntensityWindow::new(Duration::from_secs(5), 1.0);
    for t in [0, 1_000_000_000, 2_000_000_000] {
        window.record(t);
    }
    assert_eq!(window.count(3_000_000_000), 3);
    // Recording far past the window should drop the stale entries,
    // leaving only the new one.
    window.record(10_000_000_000);
    assert_eq!(window.count(10_000_000_000), 1);
    crate::test_complete!("intensity_window_prunes");
}
#[test]
fn restart_history_huge_window_keeps_entries() {
    init_test("restart_history_huge_window_keeps_entries");
    // With Duration::MAX the window start must saturate instead of
    // underflowing, so every entry stays visible forever.
    let config = RestartConfig::new(10, Duration::MAX);
    let mut history = RestartHistory::new(config);
    for t in [10, 20, u64::MAX] {
        history.record_restart(t);
    }
    assert_eq!(history.recent_restart_count(u64::MAX), 3);
    crate::test_complete!("restart_history_huge_window_keeps_entries");
}
#[test]
fn intensity_window_huge_window_keeps_entries() {
    init_test("intensity_window_huge_window_keeps_entries");
    // Same saturation property as RestartHistory: a Duration::MAX window
    // never prunes.
    let mut window = RestartIntensityWindow::new(Duration::MAX, 1.0);
    for t in [10, 20, u64::MAX] {
        window.record(t);
    }
    assert_eq!(window.count(u64::MAX), 3);
    crate::test_complete!("intensity_window_huge_window_keeps_entries");
}
#[test]
fn budget_refusal_display() {
    init_test("budget_refusal_display");
    // Every refusal variant must render a non-empty Display string.
    let refusals = [
        BudgetRefusal::WindowExhausted {
            max_restarts: 3,
            window: Duration::from_mins(1),
        },
        BudgetRefusal::InsufficientCost {
            required: 100,
            remaining: 50,
        },
        BudgetRefusal::DeadlineTooClose {
            min_required: Duration::from_secs(10),
            remaining: Duration::from_secs(3),
        },
        BudgetRefusal::InsufficientPolls {
            min_required: 500,
            remaining: 100,
        },
    ];
    for refusal in &refusals {
        let rendered = refusal.to_string();
        assert!(!rendered.is_empty());
    }
    crate::test_complete!("budget_refusal_display");
}
#[test]
fn deadline_already_passed_refuses_restart() {
    init_test("deadline_already_passed_refuses_restart");
    let config = RestartConfig::new(3, Duration::from_mins(1))
        .with_min_remaining(Duration::from_secs(1));
    let history = RestartHistory::new(config);
    // Deadline at 5s, but "now" is 10s: the deadline is already behind us,
    // so remaining time is reported as zero.
    let budget = Budget::with_deadline_secs(5);
    let result = history.can_restart_with_budget(10_000_000_000, &budget);
    assert!(matches!(
        result,
        Err(BudgetRefusal::DeadlineTooClose {
            remaining: Duration::ZERO,
            ..
        })
    ));
    crate::test_complete!("deadline_already_passed_refuses_restart");
}
#[test]
fn budget_aware_checks_combined() {
    init_test("budget_aware_checks_combined");
    // Requires: 50 cost units, 10s to deadline, 200 polls.
    let config = RestartConfig::new(5, Duration::from_mins(1))
        .with_restart_cost(50)
        .with_min_remaining(Duration::from_secs(10))
        .with_min_polls(200);
    let history = RestartHistory::new(config);

    // All three constraints satisfied.
    let good_budget = Budget::new()
        .with_deadline(crate::types::id::Time::from_secs(60))
        .with_cost_quota(500)
        .with_poll_quota(1000);
    assert!(history.can_restart_with_budget(0, &good_budget).is_ok());

    // Cost quota (10) below the restart cost (50).
    let bad_cost = Budget::new()
        .with_deadline(crate::types::id::Time::from_secs(60))
        .with_cost_quota(10)
        .with_poll_quota(1000);
    assert!(matches!(
        history.can_restart_with_budget(0, &bad_cost),
        Err(BudgetRefusal::InsufficientCost { .. })
    ));

    // Deadline (5s) closer than the required headroom (10s).
    let bad_deadline = Budget::new()
        .with_deadline(crate::types::id::Time::from_secs(5))
        .with_cost_quota(500)
        .with_poll_quota(1000);
    assert!(matches!(
        history.can_restart_with_budget(0, &bad_deadline),
        Err(BudgetRefusal::DeadlineTooClose { .. })
    ));

    // Poll quota (50) below the minimum (200).
    let bad_polls = Budget::new()
        .with_deadline(crate::types::id::Time::from_secs(60))
        .with_cost_quota(500)
        .with_poll_quota(50);
    assert!(matches!(
        history.can_restart_with_budget(0, &bad_polls),
        Err(BudgetRefusal::InsufficientPolls { .. })
    ));
    crate::test_complete!("budget_aware_checks_combined");
}
/// Test helper: builds a `TaskId` from a raw arena slot `index` and `generation`.
fn task_id(index: u32, generation: u32) -> TaskId {
TaskId::from_arena(ArenaIndex::new(index, generation))
}
/// Test helper: builds a `RegionId` from a raw arena slot `index` and `generation`.
fn region_id(index: u32, generation: u32) -> RegionId {
RegionId::from_arena(ArenaIndex::new(index, generation))
}
#[test]
fn monitor_ref_display() {
    init_test("monitor_ref_display");
    // Display renders as "Mon" + raw id; as_u64 exposes the raw id.
    let monitor_ref = MonitorRef::new_for_test(42);
    assert_eq!(monitor_ref.to_string(), "Mon42");
    assert_eq!(monitor_ref.as_u64(), 42);
    crate::test_complete!("monitor_ref_display");
}
#[test]
fn monitor_table_basic_lifecycle() {
    init_test("monitor_table_basic_lifecycle");
    let mut table = MonitorTable::new();
    assert!(table.is_empty());

    let watcher = task_id(1, 0);
    let target = task_id(2, 0);
    let region = region_id(0, 0);

    // Establishing a monitor populates every index.
    let mref = table.monitor(watcher, region, target);
    assert_eq!(table.len(), 1);
    assert_eq!(table.watchers_of(target), &[mref]);
    assert_eq!(table.watcher_for(mref), Some(watcher));
    assert_eq!(table.monitored_for(mref), Some(target));

    // Demonitoring removes it everywhere; a second demonitor is a no-op.
    assert!(table.demonitor(mref));
    assert!(table.is_empty());
    assert!(table.watchers_of(target).is_empty());
    assert_eq!(table.watcher_for(mref), None);
    assert!(!table.demonitor(mref));
    crate::test_complete!("monitor_table_basic_lifecycle");
}
#[test]
fn monitor_multiple_watchers_single_target() {
    init_test("monitor_multiple_watchers_single_target");
    let mut table = MonitorTable::new();
    let target = task_id(10, 0);
    let region = region_id(0, 0);

    // Two distinct watchers on the same task yield two distinct monitors.
    let ref_a = table.monitor(task_id(1, 0), region, target);
    let ref_b = table.monitor(task_id(2, 0), region, target);
    assert_eq!(table.len(), 2);

    let watchers = table.watchers_of(target);
    assert_eq!(watchers.len(), 2);
    assert!(watchers.contains(&ref_a));
    assert!(watchers.contains(&ref_b));
    crate::test_complete!("monitor_multiple_watchers_single_target");
}
#[test]
fn monitor_same_pair_multiple_times() {
    init_test("monitor_same_pair_multiple_times");
    let mut table = MonitorTable::new();
    let watcher = task_id(1, 0);
    let target = task_id(2, 0);
    let region = region_id(0, 0);

    // Monitoring the same (watcher, target) pair twice creates two
    // independent monitors with distinct refs — no dedup.
    let first = table.monitor(watcher, region, target);
    let second = table.monitor(watcher, region, target);
    assert_ne!(first, second);
    assert_eq!(table.len(), 2);
    crate::test_complete!("monitor_same_pair_multiple_times");
}
#[test]
fn notify_down_basic() {
    init_test("notify_down_basic");
    let mut table = MonitorTable::new();
    let watcher = task_id(1, 0);
    let target = task_id(2, 0);
    let mref = table.monitor(watcher, region_id(0, 0), target);

    // Termination produces exactly one Down and consumes the monitor.
    let downs = table.notify_down(target, &Outcome::Ok(()), Time::from_secs(5));
    assert_eq!(downs.len(), 1);
    let down = &downs[0];
    assert_eq!(down.monitored, target);
    assert_eq!(down.monitor_ref, mref);
    assert_eq!(down.completion_vt, Time::from_secs(5));
    assert!(table.is_empty());
    crate::test_complete!("notify_down_basic");
}
#[test]
fn notify_down_multiple_watchers() {
    init_test("notify_down_multiple_watchers");
    let mut table = MonitorTable::new();
    let target = task_id(10, 0);
    let region = region_id(0, 0);

    // Two watchers -> two Down notifications from one termination.
    let _ref_a = table.monitor(task_id(1, 0), region, target);
    let _ref_b = table.monitor(task_id(2, 0), region, target);
    let downs = table.notify_down(target, &Outcome::Err(()), Time::from_secs(1));
    assert_eq!(downs.len(), 2);
    assert!(table.is_empty());
    crate::test_complete!("notify_down_multiple_watchers");
}
#[test]
fn notify_down_ordering_by_vt_then_tid() {
    init_test("notify_down_ordering_by_vt_then_tid");
    let mut table = MonitorTable::new();
    let watcher = task_id(0, 0);
    let region = region_id(0, 0);
    let t_low = task_id(1, 0);
    let t_mid = task_id(3, 0);
    let t_high = task_id(5, 0);
    for &target in &[t_low, t_high, t_mid] {
        table.monitor(watcher, region, target);
    }

    // All three terminate at the same virtual time; TaskId breaks the tie.
    let terminations = vec![
        (t_high, Outcome::Ok(()), Time::from_secs(10)),
        (t_low, Outcome::Ok(()), Time::from_secs(10)),
        (t_mid, Outcome::Ok(()), Time::from_secs(10)),
    ];
    let downs = table.notify_down_batch(&terminations);
    assert_eq!(downs.len(), 3);
    assert_eq!(downs[0].monitored, t_low);
    assert_eq!(downs[1].monitored, t_mid);
    assert_eq!(downs[2].monitored, t_high);
    crate::test_complete!("notify_down_ordering_by_vt_then_tid");
}
#[test]
fn notify_down_ordering_vt_primary() {
    init_test("notify_down_ordering_vt_primary");
    let mut table = MonitorTable::new();
    let watcher = task_id(0, 0);
    let region = region_id(0, 0);
    // High TaskId but earlier completion time must still sort first:
    // virtual time is the primary key, TaskId only breaks ties.
    let t_early_high_id = task_id(99, 0);
    let t_late_low_id = task_id(1, 0);
    table.monitor(watcher, region, t_early_high_id);
    table.monitor(watcher, region, t_late_low_id);

    let terminations = vec![
        (t_late_low_id, Outcome::Ok(()), Time::from_secs(20)),
        (t_early_high_id, Outcome::Err(()), Time::from_secs(10)),
    ];
    let downs = table.notify_down_batch(&terminations);
    assert_eq!(downs.len(), 2);
    assert_eq!(downs[0].monitored, t_early_high_id);
    assert_eq!(downs[0].completion_vt, Time::from_secs(10));
    assert_eq!(downs[1].monitored, t_late_low_id);
    assert_eq!(downs[1].completion_vt, Time::from_secs(20));
    crate::test_complete!("notify_down_ordering_vt_primary");
}
#[test]
fn cleanup_region_releases_monitors() {
    init_test("cleanup_region_releases_monitors");
    let mut table = MonitorTable::new();
    let region_a = region_id(1, 0);
    let region_b = region_id(2, 0);
    let target = task_id(10, 0);

    // One monitor per region, both watching the same task.
    table.monitor(task_id(1, 0), region_a, target);
    table.monitor(task_id(2, 0), region_b, target);
    assert_eq!(table.len(), 2);

    // Cleaning a region releases only that region's monitors.
    assert_eq!(table.cleanup_region(region_a), 1);
    assert_eq!(table.len(), 1);
    assert_eq!(table.watchers_of(target).len(), 1);

    assert_eq!(table.cleanup_region(region_b), 1);
    assert!(table.is_empty());
    crate::test_complete!("cleanup_region_releases_monitors");
}
#[test]
fn monitor_reverse_indexes_prune_empty_buckets() {
    init_test("monitor_reverse_indexes_prune_empty_buckets");
    let mut table = MonitorTable::new();
    let region = region_id(7, 0);
    let watcher = task_id(3, 0);
    let target = task_id(5, 0);

    // Each removal path must leave no empty bucket behind in either
    // reverse index (by_monitored, by_region).

    // Path 1: explicit demonitor.
    let mref = table.monitor(watcher, region, target);
    assert_eq!(table.by_monitored.len(), 1);
    assert_eq!(table.by_region.len(), 1);
    assert!(table.demonitor(mref));
    assert!(table.by_monitored.is_empty());
    assert!(table.by_region.is_empty());

    // Path 2: target termination.
    let _mref = table.monitor(watcher, region, target);
    let _downs = table.notify_down(target, &Outcome::Ok(()), Time::ZERO);
    assert!(table.by_monitored.is_empty());
    assert!(table.by_region.is_empty());

    // Path 3: region cleanup.
    let _mref = table.monitor(watcher, region, target);
    assert_eq!(table.cleanup_region(region), 1);
    assert!(table.by_monitored.is_empty());
    assert!(table.by_region.is_empty());
    crate::test_complete!("monitor_reverse_indexes_prune_empty_buckets");
}
#[test]
fn cleanup_region_idempotent() {
    init_test("cleanup_region_idempotent");
    let mut table = MonitorTable::new();
    let region = region_id(1, 0);
    table.monitor(task_id(1, 0), region, task_id(2, 0));
    // First cleanup releases the monitor; a repeat releases nothing.
    assert_eq!(table.cleanup_region(region), 1);
    assert_eq!(table.cleanup_region(region), 0);
    crate::test_complete!("cleanup_region_idempotent");
}
#[test]
fn notify_down_no_monitors_returns_empty() {
    init_test("notify_down_no_monitors_returns_empty");
    let mut table = MonitorTable::new();
    // Terminating an unmonitored task produces no Down notifications.
    let unwatched = task_id(99, 0);
    let downs = table.notify_down(unwatched, &Outcome::Ok(()), Time::ZERO);
    assert!(downs.is_empty());
    crate::test_complete!("notify_down_no_monitors_returns_empty");
}
#[test]
fn demonitor_prevents_down_delivery() {
    init_test("demonitor_prevents_down_delivery");
    let mut table = MonitorTable::new();
    let target = task_id(2, 0);
    let mref = table.monitor(task_id(1, 0), region_id(0, 0), target);
    // Once demonitored, the later termination must not generate a Down.
    assert!(table.demonitor(mref));
    let downs = table.notify_down(target, &Outcome::Ok(()), Time::from_secs(1));
    assert!(downs.is_empty());
    crate::test_complete!("demonitor_prevents_down_delivery");
}
#[test]
fn region_cleanup_prevents_down_delivery() {
    init_test("region_cleanup_prevents_down_delivery");
    let mut table = MonitorTable::new();
    let region = region_id(0, 0);
    let target = task_id(2, 0);
    table.monitor(task_id(1, 0), region, target);
    // Cleaning the watcher's region must suppress subsequent Downs.
    table.cleanup_region(region);
    let downs = table.notify_down(target, &Outcome::Ok(()), Time::from_secs(1));
    assert!(downs.is_empty());
    crate::test_complete!("region_cleanup_prevents_down_delivery");
}
#[test]
fn down_sort_key_matches_contract() {
    init_test("down_sort_key_matches_contract");
    // sort_key() must be (completion_vt, monitored) in that order.
    let down = Down {
        monitored: task_id(5, 2),
        reason: Outcome::Ok(()),
        monitor_ref: MonitorRef::new_for_test(0),
        completion_vt: Time::from_secs(42),
    };
    let (vt, tid) = down.sort_key();
    assert_eq!(vt, Time::from_secs(42));
    assert_eq!(tid, task_id(5, 2));
    crate::test_complete!("down_sort_key_matches_contract");
}
#[test]
fn monitor_event_variants() {
    init_test("monitor_event_variants");
    // Construct every MonitorEvent variant so field renames/removals are
    // caught at compile time.
    let _est = MonitorEvent::Established {
        watcher: task_id(1, 0),
        monitored: task_id(2, 0),
        monitor_ref: MonitorRef::new_for_test(0),
    };
    let _demon = MonitorEvent::Demonitored {
        monitor_ref: MonitorRef::new_for_test(0),
    };
    let _down = MonitorEvent::DownProduced {
        monitored: task_id(2, 0),
        watcher: task_id(1, 0),
        monitor_ref: MonitorRef::new_for_test(0),
        completion_vt: Time::from_secs(1),
    };
    let _cleanup = MonitorEvent::RegionCleanup {
        region: region_id(0, 0),
        count: 5,
    };
    crate::test_complete!("monitor_event_variants");
}
#[test]
fn notify_down_batch_merges_and_sorts() {
    init_test("notify_down_batch_merges_and_sorts");
    let mut table = MonitorTable::new();
    let watcher = task_id(0, 0);
    let region = region_id(0, 0);
    let tasks: Vec<TaskId> = (1..=5).map(|i| task_id(i, 0)).collect();
    for &t in &tasks {
        table.monitor(watcher, region, t);
    }

    // Unordered terminations; expected output is sorted by
    // (completion_vt, TaskId).
    let terminations = vec![
        (tasks[4], Outcome::Ok(()), Time::from_secs(3)),
        (tasks[0], Outcome::Err(()), Time::from_secs(1)),
        (tasks[2], Outcome::Ok(()), Time::from_secs(1)),
        (tasks[1], Outcome::Ok(()), Time::from_secs(2)),
        (tasks[3], Outcome::Err(()), Time::from_secs(1)),
    ];
    let downs = table.notify_down_batch(&terminations);
    assert_eq!(downs.len(), 5);
    // vt=1 group ordered by TaskId, then vt=2, then vt=3.
    let observed: Vec<TaskId> = downs.iter().map(|d| d.monitored).collect();
    assert_eq!(
        observed,
        vec![tasks[0], tasks[2], tasks[3], tasks[1], tasks[4]]
    );
    assert!(table.is_empty());
    crate::test_complete!("notify_down_batch_merges_and_sorts");
}
#[test]
fn monitor_ref_ordering_is_monotone() {
    init_test("monitor_ref_ordering_is_monotone");
    let mut table = MonitorTable::new();
    let watcher = task_id(0, 0);
    let region = region_id(0, 0);
    // Successively issued refs must compare strictly increasing.
    let ref1 = table.monitor(watcher, region, task_id(1, 0));
    let ref2 = table.monitor(watcher, region, task_id(2, 0));
    let ref3 = table.monitor(watcher, region, task_id(3, 0));
    assert!(ref1 < ref2 && ref2 < ref3);
    crate::test_complete!("monitor_ref_ordering_is_monotone");
}
#[test]
fn down_equality() {
    init_test("down_equality");
    // d1 and d2 differ only in `reason` and still compare equal, so Down
    // equality ignores the termination reason.
    let d1 = Down {
        monitored: task_id(1, 0),
        reason: Outcome::Ok(()),
        monitor_ref: MonitorRef::new_for_test(5),
        completion_vt: Time::from_secs(10),
    };
    let d2 = Down {
        monitored: task_id(1, 0),
        reason: Outcome::Err(()),
        monitor_ref: MonitorRef::new_for_test(5),
        completion_vt: Time::from_secs(10),
    };
    assert_eq!(d1, d2);

    // A different monitored task breaks equality.
    let d3 = Down {
        monitored: task_id(2, 0),
        reason: Outcome::Ok(()),
        monitor_ref: MonitorRef::new_for_test(5),
        completion_vt: Time::from_secs(10),
    };
    assert_ne!(d1, d3);
    crate::test_complete!("down_equality");
}
// Exhaustive cross-product of every Outcome severity against every
// SupervisionStrategy. The contract under test: outcome severity dominates
// strategy — Panicked and Cancelled always Stop regardless of strategy,
// while Ok/Err defer to the configured strategy (with Ok falling back to
// Stop(ExplicitStop) even under Restart/Escalate).
#[allow(clippy::too_many_lines)]
#[test]
fn conformance_monotone_severity_cross_product() {
init_test("conformance_monotone_severity_cross_product");
// Name each case so assertion messages identify the failing combination.
let outcomes: Vec<(&str, Outcome<(), ()>)> = vec![
("Ok", Outcome::Ok(())),
("Err", Outcome::Err(())),
("Cancelled", Outcome::Cancelled(CancelReason::user("test"))),
(
"Panicked",
Outcome::Panicked(PanicPayload::new("test panic")),
),
];
let strategies: Vec<(&str, SupervisionStrategy)> = vec![
("Stop", SupervisionStrategy::Stop),
(
"Restart",
SupervisionStrategy::Restart(RestartConfig::new(10, Duration::from_mins(1))),
),
("Escalate", SupervisionStrategy::Escalate),
];
let parent = RegionId::from_arena(ArenaIndex::new(0, 99));
for (outcome_name, outcome) in &outcomes {
for (strategy_name, strategy) in &strategies {
// Fresh supervisor per combination so restart history never leaks
// between cases.
let mut supervisor = Supervisor::new(strategy.clone());
let decision = supervisor.on_failure(
test_task_id(),
test_region_id(),
Some(parent),
outcome,
0,
);
// Arms are ordered most-severe-first: panics and cancellations are
// matched before the strategy is even considered.
match (outcome_name, strategy_name) {
(&"Panicked", _) => {
assert!(
matches!(
decision,
SupervisionDecision::Stop {
reason: StopReason::Panicked,
..
}
),
"Panicked + {strategy_name} should Stop(Panicked)"
);
}
(&"Cancelled", _) => {
assert!(
matches!(
decision,
SupervisionDecision::Stop {
reason: StopReason::Cancelled(_),
..
}
),
"Cancelled + {strategy_name} should Stop(Cancelled)"
);
}
(_, &"Stop") => {
assert!(
matches!(
decision,
SupervisionDecision::Stop {
reason: StopReason::ExplicitStop,
..
}
),
"{outcome_name} + Stop should Stop(ExplicitStop)"
);
}
(&"Err", &"Escalate") => {
assert!(
matches!(decision, SupervisionDecision::Escalate { .. }),
"Err + Escalate should Escalate"
);
}
(&"Err", &"Restart") => {
assert!(
matches!(decision, SupervisionDecision::Restart { attempt: 1, .. }),
"Err + Restart should Restart(attempt=1)"
);
}
// Ok is not a failure: Restart/Escalate both fall back to an
// explicit stop rather than acting on a successful outcome.
(&"Ok", &"Escalate") => {
assert!(
matches!(
decision,
SupervisionDecision::Stop {
reason: StopReason::ExplicitStop,
..
}
),
"Ok + {strategy_name} should Stop(ExplicitStop) (fallback)"
);
}
(&"Ok", &"Restart") => {
assert!(
matches!(
decision,
SupervisionDecision::Stop {
reason: StopReason::ExplicitStop,
..
}
),
"Ok + {strategy_name} should Stop(ExplicitStop) (fallback)"
);
}
// All 4x3 combinations are enumerated above.
_ => unreachable!(),
}
}
}
crate::test_complete!("conformance_monotone_severity_cross_product");
}
#[test]
fn conformance_one_for_one_isolates_failed_child() {
    init_test("conformance_one_for_one_isolates_failed_child");
    // db <- cache <- web, one-for-one: only the failed child is recycled,
    // regardless of dependents.
    let builder = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForOne)
        .child(
            ChildSpec::new("db", noop_start)
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        )
        .child(
            ChildSpec::new("cache", noop_start)
                .depends_on("db")
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        )
        .child(
            ChildSpec::new("web", noop_start)
                .depends_on("cache")
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        );
    let compiled = builder.compile().expect("compile");
    let err: Outcome<(), ()> = Outcome::Err(());

    // Whichever child fails, the plan touches exactly that child.
    for failed in ["cache", "db", "web"] {
        let plan = compiled
            .restart_plan_for_failure(failed, &err)
            .expect("restart plan");
        assert_eq!(plan.policy, RestartPolicy::OneForOne);
        assert_eq!(plan.cancel_order, vec![failed]);
        assert_eq!(plan.restart_order, vec![failed]);
    }
    crate::test_complete!("conformance_one_for_one_isolates_failed_child");
}
#[test]
fn conformance_one_for_all_restarts_all_children() {
    init_test("conformance_one_for_all_restarts_all_children");
    // a <- b <- c, one-for-all: any failure recycles every child —
    // cancel in reverse dependency order, restart in forward order.
    let builder = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(
            ChildSpec::new("a", noop_start)
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        )
        .child(
            ChildSpec::new("b", noop_start)
                .depends_on("a")
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        )
        .child(
            ChildSpec::new("c", noop_start)
                .depends_on("b")
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        );
    let compiled = builder.compile().expect("compile");
    let err: Outcome<(), ()> = Outcome::Err(());
    for failed in &["a", "b", "c"] {
        let plan = compiled
            .restart_plan_for_failure(failed, &err)
            .expect("restart plan");
        assert_eq!(plan.policy, RestartPolicy::OneForAll);
        assert_eq!(plan.cancel_order, vec!["c", "b", "a"]);
        assert_eq!(plan.restart_order, vec!["a", "b", "c"]);
    }
    crate::test_complete!("conformance_one_for_all_restarts_all_children");
}
#[test]
fn conformance_rest_for_one_restarts_suffix() {
    init_test("conformance_rest_for_one_restarts_suffix");
    // a <- b <- c <- d, rest-for-one: a failure recycles the failed child
    // plus everything started after it (the suffix), never its predecessors.
    let builder = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::RestForOne)
        .child(
            ChildSpec::new("a", noop_start)
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        )
        .child(
            ChildSpec::new("b", noop_start)
                .depends_on("a")
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        )
        .child(
            ChildSpec::new("c", noop_start)
                .depends_on("b")
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        )
        .child(
            ChildSpec::new("d", noop_start)
                .depends_on("c")
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        );
    let compiled = builder.compile().expect("compile");
    let err: Outcome<(), ()> = Outcome::Err(());

    // Middle failure: b, c, d are recycled; a survives.
    let plan = compiled
        .restart_plan_for_failure("b", &err)
        .expect("restart plan");
    assert_eq!(plan.policy, RestartPolicy::RestForOne);
    assert_eq!(plan.cancel_order, vec!["d", "c", "b"]);
    assert_eq!(plan.restart_order, vec!["b", "c", "d"]);

    // Head failure: everything is recycled.
    let plan = compiled
        .restart_plan_for_failure("a", &err)
        .expect("restart plan");
    assert_eq!(plan.cancel_order, vec!["d", "c", "b", "a"]);
    assert_eq!(plan.restart_order, vec!["a", "b", "c", "d"]);

    // Tail failure: only the last child is recycled.
    let plan = compiled
        .restart_plan_for_failure("d", &err)
        .expect("restart plan");
    assert_eq!(plan.cancel_order, vec!["d"]);
    assert_eq!(plan.restart_order, vec!["d"]);
    crate::test_complete!("conformance_rest_for_one_restarts_suffix");
}
#[test]
fn conformance_escalation_without_parent_region() {
    init_test("conformance_escalation_without_parent_region");
    let mut supervisor = Supervisor::new(SupervisionStrategy::Escalate);
    // No parent region is supplied: escalation must still be produced,
    // carrying parent_region_id = None.
    let decision = supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        0,
    );
    match decision {
        SupervisionDecision::Escalate {
            parent_region_id,
            task_id: tid,
            region_id: rid,
            ..
        } => {
            assert!(parent_region_id.is_none(), "root escalation has no parent");
            assert_eq!(tid, test_task_id());
            assert_eq!(rid, test_region_id());
        }
        other => unreachable!("expected Escalate, got {other:?}"),
    }
    crate::test_complete!("conformance_escalation_without_parent_region");
}
#[test]
fn conformance_budget_exhaustion_idempotent_stop() {
    init_test("conformance_budget_exhaustion_idempotent_stop");
    // One restart allowed per window.
    let config = RestartConfig::new(1, Duration::from_mins(1));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));

    let first =
        supervisor.on_failure(test_task_id(), test_region_id(), None, &Outcome::Err(()), 0);
    assert!(matches!(
        first,
        SupervisionDecision::Restart { attempt: 1, .. }
    ));

    // Once the budget is exhausted, every subsequent failure keeps
    // producing the same Stop decision — exhaustion is stable.
    for now in [1_000_000_000, 2_000_000_000] {
        let decision = supervisor.on_failure(
            test_task_id(),
            test_region_id(),
            None,
            &Outcome::Err(()),
            now,
        );
        assert!(matches!(
            decision,
            SupervisionDecision::Stop {
                reason: StopReason::RestartBudgetExhausted { .. },
                ..
            }
        ));
    }
    crate::test_complete!("conformance_budget_exhaustion_idempotent_stop");
}
#[test]
fn restart_budget_exhaustion_reports_observed_restart_count() {
    init_test("restart_budget_exhaustion_reports_observed_restart_count");
    let config = RestartConfig::new(1, Duration::from_mins(1));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    // Seed history past the limit directly: three restarts inside the window.
    let history = supervisor
        .history
        .as_mut()
        .expect("restart strategy initializes history");
    for t in [0, 1_000_000_000, 2_000_000_000] {
        history.record_restart(t);
    }
    let decision = supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        2_000_000_000,
    );
    // The Stop reason must report what was actually observed (3), not the
    // configured limit (1).
    assert!(matches!(
        decision,
        SupervisionDecision::Stop {
            reason: StopReason::RestartBudgetExhausted {
                total_restarts: 3,
                window,
            },
            ..
        } if window == Duration::from_mins(1)
    ));
    crate::test_complete!("restart_budget_exhaustion_reports_observed_restart_count");
}
#[test]
fn restart_budget_exhaustion_with_budget_reports_observed_restart_count() {
    init_test("restart_budget_exhaustion_with_budget_reports_observed_restart_count");
    let config = RestartConfig::new(1, Duration::from_mins(1));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    // Seed three restarts inside the window, exceeding the limit of 1.
    let history = supervisor
        .history
        .as_mut()
        .expect("restart strategy initializes history");
    for t in [0, 1_000_000_000, 2_000_000_000] {
        history.record_restart(t);
    }
    // Even an infinite cost budget cannot override window exhaustion, and
    // the reported count must be the observed 3, not the configured 1.
    let mut budget = Budget::INFINITE;
    let decision = supervisor.on_failure_with_budget(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        2_000_000_000,
        Some(&mut budget),
    );
    assert!(matches!(
        decision,
        SupervisionDecision::Stop {
            reason: StopReason::RestartBudgetExhausted {
                total_restarts: 3,
                window,
            },
            ..
        } if window == Duration::from_mins(1)
    ));
    crate::test_complete!(
        "restart_budget_exhaustion_with_budget_reports_observed_restart_count"
    );
}
#[test]
fn conformance_budget_refusal_checks_window_first() {
    init_test("conformance_budget_refusal_checks_window_first");
    // The budget would also refuse (cost 10 < 100, polls 50 < 500), but the
    // exhausted window must win the race and be reported first.
    let config = RestartConfig::new(1, Duration::from_mins(1))
        .with_restart_cost(100)
        .with_min_polls(500);
    let mut history = RestartHistory::new(config);
    history.record_restart(0);
    let bad_budget = Budget::new().with_cost_quota(10).with_poll_quota(50);
    let result = history.can_restart_with_budget(1_000_000_000, &bad_budget);
    assert!(
        matches!(result, Err(BudgetRefusal::WindowExhausted { .. })),
        "window exhaustion should be checked before budget constraints"
    );
    crate::test_complete!("conformance_budget_refusal_checks_window_first");
}
#[test]
fn conformance_restart_window_boundary_exact() {
    init_test("conformance_restart_window_boundary_exact");
    // Limit of 2 restarts per 10s; both slots used at t=0s and t=1s.
    let config = RestartConfig::new(2, Duration::from_secs(10));
    let mut history = RestartHistory::new(config);
    history.record_restart(0);
    history.record_restart(1_000_000_000);
    // The t=0 restart leaves the window only strictly after t = 10s:
    // refused at 10s - 1ns and at exactly 10s, allowed at 10s + 1ns.
    assert!(!history.can_restart(9_999_999_999));
    assert!(!history.can_restart(10_000_000_000));
    assert!(history.can_restart(10_000_000_001));
    crate::test_complete!("conformance_restart_window_boundary_exact");
}
// Every SupervisionDecision variant must echo back the exact TaskId /
// RegionId (and parent region, for Escalate) that were passed in.
#[test]
fn conformance_decision_carries_correct_ids() {
init_test("conformance_decision_carries_correct_ids");
// Distinctive index/generation values so an accidental swap is visible.
let task = TaskId::from_arena(ArenaIndex::new(42, 7));
let region = RegionId::from_arena(ArenaIndex::new(10, 3));
let parent = RegionId::from_arena(ArenaIndex::new(0, 1));
// Case 1: Stop carries the ids plus ExplicitStop.
{
let mut sup = Supervisor::new(SupervisionStrategy::Stop);
let decision = sup.on_failure(task, region, Some(parent), &Outcome::Err(()), 0);
match decision {
SupervisionDecision::Stop {
task_id: tid,
region_id: rid,
reason,
} => {
assert_eq!(tid, task);
assert_eq!(rid, region);
assert_eq!(reason, StopReason::ExplicitStop);
}
other => unreachable!("expected Stop, got {other:?}"),
}
}
// Case 2: Restart carries the ids on every attempt, with the attempt
// counter incrementing 1..=3 (failures spaced 1s apart stay in budget).
{
let config = RestartConfig::new(5, Duration::from_mins(1));
let mut sup = Supervisor::new(SupervisionStrategy::Restart(config));
for expected_attempt in 1..=3u32 {
let decision = sup.on_failure(
task,
region,
Some(parent),
&Outcome::Err(()),
u64::from(expected_attempt - 1) * 1_000_000_000,
);
match decision {
SupervisionDecision::Restart {
task_id: tid,
region_id: rid,
attempt,
..
} => {
assert_eq!(tid, task);
assert_eq!(rid, region);
assert_eq!(attempt, expected_attempt);
}
other => {
unreachable!("expected Restart attempt={expected_attempt}, got {other:?}")
}
}
}
}
// Case 3: Escalate carries the ids plus the parent region.
{
let mut sup = Supervisor::new(SupervisionStrategy::Escalate);
let decision = sup.on_failure(task, region, Some(parent), &Outcome::Err(()), 0);
match decision {
SupervisionDecision::Escalate {
task_id: tid,
region_id: rid,
parent_region_id,
..
} => {
assert_eq!(tid, task);
assert_eq!(rid, region);
assert_eq!(parent_region_id, Some(parent));
}
other => unreachable!("expected Escalate, got {other:?}"),
}
}
crate::test_complete!("conformance_decision_carries_correct_ids");
}
#[test]
fn conformance_restart_delay_matches_backoff() {
    init_test("conformance_restart_delay_matches_backoff");
    // Exponential backoff: each restart's delay must equal what the
    // strategy itself computes for that attempt index.
    let config = RestartConfig::new(5, Duration::from_mins(1)).with_backoff(
        BackoffStrategy::Exponential {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(10),
            multiplier: 2.0,
        },
    );
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config.clone()));

    // Three failures 1s apart: attempts 1, 2, 3 with delays for indices 0, 1, 2.
    let cases = [
        (0u64, 1u32, config.backoff.delay_for_attempt(0)),
        (1_000_000_000, 2, config.backoff.delay_for_attempt(1)),
        (2_000_000_000, 3, config.backoff.delay_for_attempt(2)),
    ];
    for (now, expected_attempt, expected_delay) in cases {
        let decision = supervisor.on_failure(
            test_task_id(),
            test_region_id(),
            None,
            &Outcome::Err(()),
            now,
        );
        match decision {
            SupervisionDecision::Restart { delay, attempt, .. } => {
                assert_eq!(attempt, expected_attempt);
                assert_eq!(delay, expected_delay);
            }
            other => unreachable!("expected Restart, got {other:?}"),
        }
    }
    crate::test_complete!("conformance_restart_delay_matches_backoff");
}
#[test]
fn conformance_spawn_dependency_ordered_start() {
    init_test("conformance_spawn_dependency_ordered_start");
    // Each child's start callback appends its name to this shared log, so
    // the log records the actual start order.
    let log = Arc::new(Mutex::new(Vec::new()));
    let logged = |name: &'static str| LoggingStart {
        name,
        log: Arc::clone(&log),
    };
    // db <- cache <- web must start in dependency order; "deferred" depends
    // on db but is marked as not starting immediately.
    let builder = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("db", logged("db")))
        .child(ChildSpec::new("cache", logged("cache")).depends_on("db"))
        .child(ChildSpec::new("web", logged("web")).depends_on("cache"))
        .child(
            ChildSpec::new("deferred", logged("deferred"))
                .depends_on("db")
                .with_start_immediately(false),
        );
    let compiled = builder.compile().expect("compile");
    assert_eq!(compiled.start_order.len(), 4);
    assert_eq!(compiled.children[compiled.start_order[0]].name, "db");
    assert_eq!(compiled.children[compiled.start_order[1]].name, "cache");

    let mut state = RuntimeState::new();
    let parent = state.create_root_region(Budget::INFINITE);
    let cx: crate::cx::Cx = crate::cx::Cx::for_testing();
    let handle = compiled
        .spawn(&mut state, &cx, parent, Budget::INFINITE)
        .expect("spawn");

    // Immediate children started in dependency order; deferred was skipped.
    let started: Vec<String> = log.lock().clone();
    assert_eq!(started, vec!["db", "cache", "web"]);
    assert!(!started.contains(&"deferred".to_string()));
    assert_eq!(handle.started.len(), 3);
    assert_eq!(handle.started[0].name, "db");
    assert_eq!(handle.started[1].name, "cache");
    assert_eq!(handle.started[2].name, "web");
    crate::test_complete!("conformance_spawn_dependency_ordered_start");
}
// A child marked with_start_immediately(false) must (a) be excluded from
// one-for-all restart plans triggered by its started siblings and (b) not
// be started when the supervisor is spawned.
#[test]
fn deferred_children_are_skipped_at_boot_and_stay_out_of_sibling_restart_plans() {
init_test("deferred_children_are_skipped_at_boot_and_stay_out_of_sibling_restart_plans");
// db <- cache <- deferred, one-for-all policy; "deferred" does not start
// at boot.
let compiled = SupervisorBuilder::new("sup")
.with_restart_policy(RestartPolicy::OneForAll)
.child(
ChildSpec::new("db", noop_start)
.with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
)
.child(
ChildSpec::new("cache", noop_start)
.depends_on("db")
.with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
)
.child(
ChildSpec::new("deferred", noop_start)
.depends_on("cache")
.with_restart(SupervisionStrategy::Restart(RestartConfig::default()))
.with_start_immediately(false),
)
.compile()
.expect("compile");
let err: Outcome<(), ()> = Outcome::Err(());
// One-for-all would normally recycle all three children, but the plan
// covers only the started siblings — "deferred" appears in neither order.
let plan = compiled
.restart_plan_for_failure("cache", &err)
.expect("compiled restart planning should cover started siblings");
assert_eq!(
plan.cancel_order,
vec![ChildName::from("cache"), ChildName::from("db")]
);
assert_eq!(
plan.restart_order,
vec![ChildName::from("db"), ChildName::from("cache")]
);
// Spawning starts only the two immediate children, in dependency order.
let mut state = RuntimeState::new();
let parent = state.create_root_region(Budget::INFINITE);
let cx: crate::cx::Cx = crate::cx::Cx::for_testing();
let handle = compiled
.spawn(&mut state, &cx, parent, Budget::INFINITE)
.expect("spawn");
assert_eq!(handle.started.len(), 2);
assert_eq!(handle.started[0].name, "db");
assert_eq!(handle.started[1].name, "cache");
crate::test_complete!(
"deferred_children_are_skipped_at_boot_and_stay_out_of_sibling_restart_plans"
);
}
#[test]
fn failed_deferred_child_remains_in_restart_plan() {
    init_test("failed_deferred_child_remains_in_restart_plan");
    let restartable = |name: &'static str| {
        ChildSpec::new(name, noop_start)
            .with_restart(SupervisionStrategy::Restart(RestartConfig::default()))
    };
    let compiled = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(restartable("db"))
        .child(
            restartable("deferred")
                .depends_on("db")
                .with_start_immediately(false),
        )
        .compile()
        .expect("compile");
    let err: Outcome<(), ()> = Outcome::Err(());
    // Even though "deferred" never starts eagerly, once it has failed it is
    // the subject of the plan and must stay restartable.
    let plan = compiled
        .restart_plan_for_failure("deferred", &err)
        .expect("the failed child itself should remain restartable");
    assert_eq!(
        plan.cancel_order,
        vec![ChildName::from("deferred"), ChildName::from("db")]
    );
    assert_eq!(
        plan.restart_order,
        vec![ChildName::from("db"), ChildName::from("deferred")]
    );
    crate::test_complete!("failed_deferred_child_remains_in_restart_plan");
}
#[test]
fn conformance_spawn_required_vs_optional_child_failure() {
    // Start function that always fails, standing in for a child that cannot boot.
    #[allow(clippy::unnecessary_wraps)]
    fn failing_start(
        _scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        _state: &mut RuntimeState,
        _cx: &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError> {
        Err(SpawnError::RegionClosed(test_region_id()))
    }
    init_test("conformance_spawn_required_vs_optional_child_failure");
    // Case 1: an optional child failing to start must not abort the supervisor.
    {
        let compiled = SupervisorBuilder::new("sup")
            .child(ChildSpec::new("ok_child", noop_start))
            .child(
                ChildSpec::new("optional_fail", failing_start)
                    .with_required(false)
                    .depends_on("ok_child"),
            )
            .compile()
            .expect("compile");
        let mut runtime = RuntimeState::new();
        let root = runtime.create_root_region(Budget::INFINITE);
        let cx: crate::cx::Cx = crate::cx::Cx::for_testing();
        let spawned = compiled.spawn(&mut runtime, &cx, root, Budget::INFINITE);
        assert!(
            spawned.is_ok(),
            "optional child failure should not fail supervisor"
        );
    }
    // Case 2: a required child failing to start aborts the whole boot.
    {
        let compiled = SupervisorBuilder::new("sup")
            .child(ChildSpec::new("ok_child", noop_start))
            .child(
                ChildSpec::new("required_fail", failing_start)
                    .with_required(true)
                    .depends_on("ok_child"),
            )
            .compile()
            .expect("compile");
        let mut runtime = RuntimeState::new();
        let root = runtime.create_root_region(Budget::INFINITE);
        let cx: crate::cx::Cx = crate::cx::Cx::for_testing();
        let spawned = compiled.spawn(&mut runtime, &cx, root, Budget::INFINITE);
        assert!(
            spawned.is_err(),
            "required child failure should fail supervisor"
        );
        match spawned.unwrap_err() {
            SupervisorSpawnError::ChildStartFailed { child, region, .. } => {
                assert_eq!(child, "required_fail");
                // If the region is still tracked it must already be tearing down.
                if let Some(record) = runtime.region(region) {
                    let rs = record.state();
                    assert!(
                        matches!(
                            rs,
                            crate::record::region::RegionState::Closing
                                | crate::record::region::RegionState::Draining
                                | crate::record::region::RegionState::Finalizing
                                | crate::record::region::RegionState::Closed
                        ),
                        "region should not be Open after partial spawn failure, got {rs:?}"
                    );
                }
            }
            SupervisorSpawnError::RegionCreate(_) => {
                unreachable!("expected ChildStartFailed, got RegionCreate");
            }
            SupervisorSpawnError::DependencyUnavailable { .. } => {
                unreachable!("expected ChildStartFailed, got DependencyUnavailable");
            }
        }
    }
    crate::test_complete!("conformance_spawn_required_vs_optional_child_failure");
}
#[test]
fn optional_dependency_failure_skips_eager_dependents() {
    #[allow(clippy::unnecessary_wraps)]
    fn failing_start(
        _scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        _state: &mut RuntimeState,
        _cx: &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError> {
        Err(SpawnError::RegionClosed(test_region_id()))
    }
    init_test("optional_dependency_failure_skips_eager_dependents");
    let events = Arc::new(Mutex::new(Vec::new()));
    let logging = |name: &'static str| LoggingStart {
        name,
        log: Arc::clone(&events),
    };
    // "db" fails but is optional; "worker" depends on it and is optional too,
    // so boot succeeds with only the independent "metrics" child running.
    let compiled = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("db", failing_start).with_required(false))
        .child(
            ChildSpec::new("worker", logging("worker"))
                .depends_on("db")
                .with_required(false),
        )
        .child(ChildSpec::new("metrics", logging("metrics")))
        .compile()
        .expect("compile");
    let mut runtime = RuntimeState::new();
    let root = runtime.create_root_region(Budget::INFINITE);
    let cx: crate::cx::Cx = crate::cx::Cx::for_testing();
    let handle = compiled
        .spawn(&mut runtime, &cx, root, Budget::INFINITE)
        .expect("optional dependency failure should not fail supervisor");
    let started: Vec<String> = events.lock().clone();
    assert_eq!(started, vec!["metrics"]);
    assert_eq!(handle.started.len(), 1);
    assert_eq!(handle.started[0].name, "metrics");
    crate::test_complete!("optional_dependency_failure_skips_eager_dependents");
}
#[test]
fn required_child_with_failed_dependency_fails_supervisor_boot() {
    #[allow(clippy::unnecessary_wraps)]
    fn failing_start(
        _scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        _state: &mut RuntimeState,
        _cx: &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError> {
        Err(SpawnError::RegionClosed(test_region_id()))
    }
    init_test("required_child_with_failed_dependency_fails_supervisor_boot");
    let events = Arc::new(Mutex::new(Vec::new()));
    // "db" is optional and fails; "api" (required) depends on it; "metrics"
    // is independent and should start before boot fails.
    let compiled = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("db", failing_start).with_required(false))
        .child(ChildSpec::new("api", noop_start).depends_on("db"))
        .child(ChildSpec::new(
            "metrics",
            LoggingStart {
                name: "metrics",
                log: Arc::clone(&events),
            },
        ))
        .compile()
        .expect("compile");
    let mut runtime = RuntimeState::new();
    let root = runtime.create_root_region(Budget::INFINITE);
    let cx: crate::cx::Cx = crate::cx::Cx::for_testing();
    let err = compiled
        .spawn(&mut runtime, &cx, root, Budget::INFINITE)
        .expect_err("required dependent with failed dependency should fail boot");
    let (region, dependency_error) = match err {
        SupervisorSpawnError::DependencyUnavailable {
            child,
            dependency,
            dependency_error,
            region,
        } => {
            assert_eq!(child, "api");
            assert_eq!(dependency, "db");
            (region, dependency_error)
        }
        SupervisorSpawnError::RegionCreate(_) => {
            panic!("expected DependencyUnavailable, got RegionCreate")
        }
        SupervisorSpawnError::ChildStartFailed { .. } => {
            panic!("expected DependencyUnavailable, got ChildStartFailed")
        }
    };
    assert!(
        matches!(dependency_error, Some(SpawnError::RegionClosed(_))),
        "dependency root cause should preserve the original start failure"
    );
    // The already-started sibling task must be driven into cancellation.
    let record = runtime
        .region(region)
        .expect("supervisor region should still be tracked after boot failure");
    let started_task = *record
        .task_ids()
        .first()
        .expect("independent child should have started before boot failed");
    let task = runtime
        .task(started_task)
        .expect("started child task should exist");
    assert!(
        task.state.is_cancelling(),
        "already-started sibling should be cancelled when boot fails on dependency availability"
    );
    assert_eq!(events.lock().as_slice(), ["metrics"]);
    crate::test_complete!("required_child_with_failed_dependency_fails_supervisor_boot");
}
#[test]
fn transitive_dependency_unavailable_reports_direct_dependency() {
    #[allow(clippy::unnecessary_wraps)]
    fn failing_start(
        _scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        _state: &mut RuntimeState,
        _cx: &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError> {
        Err(SpawnError::RegionClosed(test_region_id()))
    }
    init_test("transitive_dependency_unavailable_reports_direct_dependency");
    // Chain: db (fails, optional) <- api (optional) <- frontend (required).
    let compiled = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("db", failing_start).with_required(false))
        .child(
            ChildSpec::new("api", noop_start)
                .depends_on("db")
                .with_required(false),
        )
        .child(ChildSpec::new("frontend", noop_start).depends_on("api"))
        .compile()
        .expect("compile");
    let mut runtime = RuntimeState::new();
    let root = runtime.create_root_region(Budget::INFINITE);
    let cx: crate::cx::Cx = crate::cx::Cx::for_testing();
    let err = compiled
        .spawn(&mut runtime, &cx, root, Budget::INFINITE)
        .expect_err("required transitive dependent should fail boot");
    match err {
        SupervisorSpawnError::DependencyUnavailable {
            child,
            dependency,
            dependency_error,
            ..
        } => {
            assert_eq!(child, "frontend");
            assert_eq!(
                dependency, "api",
                "direct blocker should be reported even when the root cause is transitive"
            );
            assert!(
                matches!(dependency_error, Some(SpawnError::RegionClosed(_))),
                "root-cause spawn error should still be preserved"
            );
        }
        SupervisorSpawnError::RegionCreate(_) => {
            panic!("expected DependencyUnavailable, got RegionCreate")
        }
        SupervisorSpawnError::ChildStartFailed { .. } => {
            panic!("expected DependencyUnavailable, got ChildStartFailed")
        }
    }
    crate::test_complete!("transitive_dependency_unavailable_reports_direct_dependency");
}
#[test]
fn required_child_failure_requests_cancel_for_started_task() {
    // Unlike the other failing starters, this one reports the scope's own region.
    fn failing_start(
        scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
        _state: &mut RuntimeState,
        _cx: &crate::cx::Cx,
    ) -> Result<TaskId, SpawnError> {
        Err(SpawnError::RegionClosed(scope.region_id()))
    }
    init_test("required_child_failure_requests_cancel_for_started_task");
    let compiled = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("started", spawn_registered_child))
        .child(
            ChildSpec::new("required_fail", failing_start)
                .with_required(true)
                .depends_on("started"),
        )
        .compile()
        .expect("compile");
    let mut runtime = RuntimeState::new();
    let root = runtime.create_root_region(Budget::INFINITE);
    let cx: crate::cx::Cx = crate::cx::Cx::for_testing();
    let err = compiled
        .spawn(&mut runtime, &cx, root, Budget::INFINITE)
        .expect_err("required child failure should fail supervisor");
    let (region, started_task) = match err {
        SupervisorSpawnError::ChildStartFailed { region, .. } => {
            let record = runtime
                .region(region)
                .expect("supervisor region should exist");
            let started_task = *record
                .task_ids()
                .first()
                .expect("started task should remain tracked");
            (region, started_task)
        }
        SupervisorSpawnError::RegionCreate(_) => {
            panic!("expected ChildStartFailed, got RegionCreate")
        }
        SupervisorSpawnError::DependencyUnavailable { .. } => {
            panic!("expected ChildStartFailed, got DependencyUnavailable")
        }
    };
    // The already-started child must be cancelling and the region must record why.
    let task = runtime.task(started_task).expect("started task should exist");
    assert!(
        task.state.is_cancelling(),
        "started child task should enter cancellation after supervisor boot failure"
    );
    assert!(
        runtime
            .region(region)
            .and_then(crate::record::RegionRecord::cancel_reason)
            .is_some(),
        "failed supervisor region should retain a cancel reason"
    );
    crate::test_complete!("required_child_failure_requests_cancel_for_started_task");
}
#[test]
fn conformance_per_child_strategy_vs_supervisor_policy() {
    init_test("conformance_per_child_strategy_vs_supervisor_policy");
    // Per-child strategies take precedence over the supervisor-wide OneForAll policy.
    let compiled = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(
            ChildSpec::new("restartable", noop_start)
                .with_restart(SupervisionStrategy::Restart(RestartConfig::default())),
        )
        .child(
            ChildSpec::new("stopper", noop_start)
                .depends_on("restartable")
                .with_restart(SupervisionStrategy::Stop),
        )
        .child(
            ChildSpec::new("escalator", noop_start)
                .depends_on("restartable")
                .with_restart(SupervisionStrategy::Escalate),
        )
        .compile()
        .expect("compile");
    let err: Outcome<(), ()> = Outcome::Err(());
    // Only the Restart-configured child yields a plan; Stop/Escalate never do.
    assert!(
        compiled
            .restart_plan_for_failure("restartable", &err)
            .is_some()
    );
    assert!(compiled.restart_plan_for_failure("stopper", &err).is_none());
    assert!(
        compiled
            .restart_plan_for_failure("escalator", &err)
            .is_none()
    );
    crate::test_complete!("conformance_per_child_strategy_vs_supervisor_policy");
}
#[test]
fn conformance_failure_plan_prunes_non_restartable_siblings_and_blocked_dependents() {
    init_test(
        "conformance_failure_plan_prunes_non_restartable_siblings_and_blocked_dependents",
    );
    let restartable = |name: &'static str| {
        ChildSpec::new(name, noop_start)
            .with_restart(SupervisionStrategy::Restart(RestartConfig::default()))
    };
    // "cache" is Stop, so it and its dependent "web" drop out of the restart
    // order even though "web" itself is restart-configured.
    let compiled = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(restartable("db"))
        .child(
            ChildSpec::new("cache", noop_start)
                .depends_on("db")
                .with_restart(SupervisionStrategy::Stop),
        )
        .child(restartable("web").depends_on("cache"))
        .child(restartable("metrics").depends_on("db"))
        .compile()
        .expect("compile");
    let err: Outcome<(), ()> = Outcome::Err(());
    let plan = compiled
        .restart_plan_for_failure("db", &err)
        .expect("restart plan");
    // Everything is cancelled (reverse start order) but only db and metrics restart.
    assert_eq!(
        plan.cancel_order,
        vec![
            ChildName::from("metrics"),
            ChildName::from("web"),
            ChildName::from("cache"),
            ChildName::from("db"),
        ]
    );
    assert_eq!(
        plan.restart_order,
        vec![ChildName::from("db"), ChildName::from("metrics")]
    );
    crate::test_complete!(
        "conformance_failure_plan_prunes_non_restartable_siblings_and_blocked_dependents"
    );
}
#[test]
fn conformance_failure_plan_prunes_escalating_siblings_from_restart_order() {
    init_test("conformance_failure_plan_prunes_escalating_siblings_from_restart_order");
    let restartable = |name: &'static str| {
        ChildSpec::new(name, noop_start)
            .with_restart(SupervisionStrategy::Restart(RestartConfig::default()))
    };
    // "audit" escalates rather than restarts: it is cancelled with its
    // siblings but excluded from the restart order.
    let compiled = SupervisorBuilder::new("sup")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(restartable("db"))
        .child(
            ChildSpec::new("audit", noop_start)
                .depends_on("db")
                .with_restart(SupervisionStrategy::Escalate),
        )
        .child(restartable("metrics").depends_on("db"))
        .compile()
        .expect("compile");
    let err: Outcome<(), ()> = Outcome::Err(());
    let plan = compiled
        .restart_plan_for_failure("db", &err)
        .expect("restart plan");
    assert_eq!(
        plan.cancel_order,
        vec![
            ChildName::from("metrics"),
            ChildName::from("audit"),
            ChildName::from("db"),
        ]
    );
    assert_eq!(
        plan.restart_order,
        vec![ChildName::from("db"), ChildName::from("metrics")]
    );
    crate::test_complete!(
        "conformance_failure_plan_prunes_escalating_siblings_from_restart_order"
    );
}
#[test]
fn conformance_window_expiry_restores_restart_ability() {
    init_test("conformance_window_expiry_restores_restart_ability");
    // One restart allowed per 5-second window.
    let config = RestartConfig::new(1, Duration::from_secs(5));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    let first =
        supervisor.on_failure(test_task_id(), test_region_id(), None, &Outcome::Err(()), 0);
    assert!(matches!(
        first,
        SupervisionDecision::Restart { attempt: 1, .. }
    ));
    // A second failure inside the window exhausts the budget.
    let second = supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        1_000_000_000,
    );
    assert!(matches!(
        second,
        SupervisionDecision::Stop {
            reason: StopReason::RestartBudgetExhausted { .. },
            ..
        }
    ));
    // Once the 5s window has elapsed the attempt counter starts over.
    let third = supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        6_000_000_000,
    );
    assert!(
        matches!(third, SupervisionDecision::Restart { attempt: 1, .. }),
        "window expiry should restore restart ability"
    );
    crate::test_complete!("conformance_window_expiry_restores_restart_ability");
}
#[test]
fn conformance_intensity_storm_threshold_boundary() {
    init_test("conformance_intensity_storm_threshold_boundary");
    // 20 restarts spread evenly over a 10s window sit exactly at the 2.0/s threshold.
    let mut window = RestartIntensityWindow::new(Duration::from_secs(10), 2.0);
    for tick in 0u64..20 {
        window.record(tick * 500_000_000);
    }
    let now = 10_000_000_000;
    let intensity = window.intensity(now);
    assert!(
        (intensity - 2.0).abs() < 0.01,
        "20 restarts in 10s should be ~2.0/s"
    );
    // Sitting at the threshold is not yet a storm; one more restart tips it over.
    assert!(!window.is_storm(now));
    window.record(now);
    assert!(window.is_storm(now));
    crate::test_complete!("conformance_intensity_storm_threshold_boundary");
}
#[test]
#[should_panic(expected = "storm threshold must be finite and > 0")]
fn intensity_window_zero_threshold_panics() {
// A zero storm threshold is a construction-time contract violation: the
// constructor is expected to panic rather than build a window that can
// never classify a storm.
let _window = RestartIntensityWindow::new(Duration::from_secs(1), 0.0);
}
#[test]
fn conformance_compile_rejects_duplicate_names() {
    init_test("conformance_compile_rejects_duplicate_names");
    // Two children may not share the same name.
    let result = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("worker", noop_start))
        .child(ChildSpec::new("worker", noop_start))
        .compile();
    assert!(matches!(
        result,
        Err(SupervisorCompileError::DuplicateChildName(ref name)) if name == "worker"
    ));
    crate::test_complete!("conformance_compile_rejects_duplicate_names");
}
#[test]
fn conformance_compile_rejects_unknown_dependency() {
    init_test("conformance_compile_rejects_unknown_dependency");
    // Depending on a child that was never added is a compile-time error.
    let result = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("a", noop_start).depends_on("nonexistent"))
        .compile();
    assert!(matches!(
        result,
        Err(SupervisorCompileError::UnknownDependency { ref child, ref depends_on })
            if child == "a" && depends_on == "nonexistent"
    ));
    crate::test_complete!("conformance_compile_rejects_unknown_dependency");
}
#[test]
fn conformance_compile_rejects_immediate_child_with_deferred_dependency() {
    init_test("conformance_compile_rejects_immediate_child_with_deferred_dependency");
    // An eagerly-started child cannot depend on one that is deferred at boot.
    let result = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("db", noop_start).with_start_immediately(false))
        .child(ChildSpec::new("api", noop_start).depends_on("db"))
        .compile();
    assert!(matches!(
        result,
        Err(SupervisorCompileError::DeferredDependency { ref child, ref depends_on })
            if child == "api" && depends_on == "db"
    ));
    crate::test_complete!(
        "conformance_compile_rejects_immediate_child_with_deferred_dependency"
    );
}
#[test]
fn conformance_compile_rejects_cycles() {
    init_test("conformance_compile_rejects_cycles");
    // a -> c -> b -> a forms a dependency cycle.
    let result = SupervisorBuilder::new("sup")
        .child(ChildSpec::new("a", noop_start).depends_on("c"))
        .child(ChildSpec::new("b", noop_start).depends_on("a"))
        .child(ChildSpec::new("c", noop_start).depends_on("b"))
        .compile();
    match result {
        Err(SupervisorCompileError::CycleDetected { remaining }) => {
            // All three children are left unresolved by the topological sort.
            assert_eq!(remaining.len(), 3);
            for name in ["a", "b", "c"] {
                assert!(remaining.contains(&ChildName::from(name)));
            }
        }
        other => unreachable!("expected CycleDetected, got {other:?}"),
    }
    crate::test_complete!("conformance_compile_rejects_cycles");
}
#[test]
fn conformance_name_lex_tie_break() {
    init_test("conformance_name_lex_tie_break");
    // With no dependencies, NameLex orders children lexicographically.
    let compiled = SupervisorBuilder::new("sup")
        .with_tie_break(StartTieBreak::NameLex)
        .child(ChildSpec::new("zulu", noop_start))
        .child(ChildSpec::new("alpha", noop_start))
        .child(ChildSpec::new("mike", noop_start))
        .compile()
        .expect("compile");
    assert_eq!(
        compiled.child_start_order_names(),
        vec!["alpha", "mike", "zulu"]
    );
    crate::test_complete!("conformance_name_lex_tie_break");
}
#[test]
fn conformance_child_start_pos_matches_start_order() {
    init_test("conformance_child_start_pos_matches_start_order");
    let compiled = SupervisorBuilder::new("sup")
        .with_tie_break(StartTieBreak::NameLex)
        .child(ChildSpec::new("db", noop_start))
        .child(ChildSpec::new("cache", noop_start).depends_on("db"))
        .child(ChildSpec::new("web", noop_start).depends_on("db"))
        .child(ChildSpec::new("worker", noop_start).depends_on("cache"))
        .compile()
        .expect("compile");
    // child_start_pos must agree with the computed start_order for every child.
    for (pos, &child_idx) in compiled.start_order.iter().enumerate() {
        let child_name = compiled.children[child_idx].name.as_str();
        assert_eq!(compiled.child_start_pos(child_name), Some(pos));
    }
    // Unknown names resolve to no position.
    assert_eq!(compiled.child_start_pos("does_not_exist"), None);
    crate::test_complete!("conformance_child_start_pos_matches_start_order");
}
#[test]
fn conformance_child_stop_order_is_reverse_start_order() {
    init_test("conformance_child_stop_order_is_reverse_start_order");
    let compiled = SupervisorBuilder::new("sup")
        .with_tie_break(StartTieBreak::NameLex)
        .child(ChildSpec::new("db", noop_start))
        .child(ChildSpec::new("cache", noop_start).depends_on("db"))
        .child(ChildSpec::new("web", noop_start).depends_on("db"))
        .child(ChildSpec::new("worker", noop_start).depends_on("cache"))
        .compile()
        .expect("compile");
    let start = compiled.child_start_order_names();
    let stop = compiled.child_stop_order_names();
    // Stop order is exactly the reverse of start order.
    let reversed: Vec<_> = start.iter().rev().cloned().collect();
    assert_eq!(stop, reversed);
    assert_eq!(start, vec!["db", "cache", "web", "worker"]);
    assert_eq!(stop, vec!["worker", "web", "cache", "db"]);
    crate::test_complete!("conformance_child_stop_order_is_reverse_start_order");
}
#[test]
fn conformance_simultaneous_failures_sorted_by_start_pos_then_task_id() {
    init_test("conformance_simultaneous_failures_sorted_by_start_pos_then_task_id");
    let compiled = SupervisorBuilder::new("sup")
        .with_tie_break(StartTieBreak::InsertionOrder)
        .child(ChildSpec::new("a", noop_start))
        .child(ChildSpec::new("b", noop_start))
        .child(ChildSpec::new("c", noop_start))
        .compile()
        .expect("compile");
    let tid = |n: u32| TaskId::from_arena(ArenaIndex::new(n, 1));
    // A batch arriving out of order is sorted by (start position, task id).
    let mut batch = [("c", tid(3)), ("a", tid(1)), ("b", tid(2))];
    batch.sort_by_key(|(name, task_id)| {
        (
            compiled.child_start_pos(name).expect("known child"),
            *task_id,
        )
    });
    let sorted_names: Vec<&str> = batch.iter().map(|(name, _)| *name).collect();
    assert_eq!(sorted_names, vec!["a", "b", "c"]);
    crate::test_complete!("conformance_simultaneous_failures_sorted_by_start_pos_then_task_id");
}
#[test]
fn evidence_ledger_empty_on_creation() {
    init_test("evidence_ledger_empty_on_creation");
    // A freshly built supervisor has recorded no decisions yet.
    let supervisor = Supervisor::new(SupervisionStrategy::Stop);
    let ledger = supervisor.evidence();
    assert!(ledger.is_empty());
    assert_eq!(ledger.len(), 0);
    crate::test_complete!("evidence_ledger_empty_on_creation");
}
#[test]
fn evidence_records_explicit_stop_strategy() {
    init_test("evidence_records_explicit_stop_strategy");
    let mut supervisor = Supervisor::new(SupervisionStrategy::Stop);
    let _decision = supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        1000,
    );
    // A single ledger entry captures the Stop decision and all of its inputs.
    let ledger = supervisor.evidence();
    assert_eq!(ledger.len(), 1);
    let entry = &ledger.entries()[0];
    assert_eq!(entry.timestamp, 1000);
    assert_eq!(entry.task_id, test_task_id());
    assert_eq!(entry.region_id, test_region_id());
    assert_eq!(entry.strategy_kind, "Stop");
    assert_eq!(
        entry.binding_constraint,
        BindingConstraint::ExplicitStopStrategy
    );
    crate::test_complete!("evidence_records_explicit_stop_strategy");
}
#[test]
fn evidence_records_restart_allowed() {
    init_test("evidence_records_restart_allowed");
    let config = RestartConfig::new(3, Duration::from_mins(1));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    // Two failures well inside the window both yield RestartAllowed entries.
    for timestamp in [1000u64, 2000] {
        supervisor.on_failure(
            test_task_id(),
            test_region_id(),
            None,
            &Outcome::Err(()),
            timestamp,
        );
    }
    let ledger = supervisor.evidence();
    assert_eq!(ledger.len(), 2);
    for (idx, (attempt, timestamp)) in [(1u32, 1000u64), (2, 2000)].into_iter().enumerate() {
        let entry = &ledger.entries()[idx];
        assert_eq!(
            entry.binding_constraint,
            BindingConstraint::RestartAllowed { attempt }
        );
        assert_eq!(entry.timestamp, timestamp);
    }
    assert_eq!(ledger.entries()[0].strategy_kind, "Restart");
    crate::test_complete!("evidence_records_restart_allowed");
}
#[test]
fn evidence_records_window_exhaustion() {
    init_test("evidence_records_window_exhaustion");
    let window = Duration::from_secs(10);
    let config = RestartConfig::new(2, window);
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    // Three failures against a budget of two restarts per window.
    for timestamp in [1000u64, 2000, 3000] {
        supervisor.on_failure(
            test_task_id(),
            test_region_id(),
            None,
            &Outcome::Err(()),
            timestamp,
        );
    }
    let ledger = supervisor.evidence();
    assert_eq!(ledger.len(), 3);
    // The first two attempts are allowed; the third records the exhausted window.
    assert_eq!(
        ledger.entries()[0].binding_constraint,
        BindingConstraint::RestartAllowed { attempt: 1 }
    );
    assert_eq!(
        ledger.entries()[1].binding_constraint,
        BindingConstraint::RestartAllowed { attempt: 2 }
    );
    assert_eq!(
        ledger.entries()[2].binding_constraint,
        BindingConstraint::WindowExhausted {
            max_restarts: 2,
            window,
        }
    );
    crate::test_complete!("evidence_records_window_exhaustion");
}
#[test]
fn evidence_records_monotone_severity_panicked() {
    init_test("evidence_records_monotone_severity_panicked");
    let config = RestartConfig::new(5, Duration::from_mins(1));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    // A panic is never restartable, regardless of remaining restart budget.
    supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Panicked(PanicPayload::new("boom")),
        1000,
    );
    let ledger = supervisor.evidence();
    assert_eq!(ledger.len(), 1);
    let entry = &ledger.entries()[0];
    assert_eq!(
        entry.binding_constraint,
        BindingConstraint::MonotoneSeverity {
            outcome_kind: "Panicked",
        }
    );
    assert_eq!(entry.strategy_kind, "Restart");
    crate::test_complete!("evidence_records_monotone_severity_panicked");
}
#[test]
fn evidence_records_monotone_severity_cancelled() {
    init_test("evidence_records_monotone_severity_cancelled");
    let config = RestartConfig::new(5, Duration::from_mins(1));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    // Cancellation is recorded as non-restartable severity, not a restart attempt.
    supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Cancelled(CancelReason::user("test")),
        1000,
    );
    let entry = &supervisor.evidence().entries()[0];
    assert_eq!(
        entry.binding_constraint,
        BindingConstraint::MonotoneSeverity {
            outcome_kind: "Cancelled",
        }
    );
    crate::test_complete!("evidence_records_monotone_severity_cancelled");
}
#[test]
fn evidence_records_monotone_severity_ok() {
    init_test("evidence_records_monotone_severity_ok");
    let config = RestartConfig::new(5, Duration::from_mins(1));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    // Even an Ok outcome routed through on_failure is classified by severity.
    supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Ok(()),
        1000,
    );
    let entry = &supervisor.evidence().entries()[0];
    assert_eq!(
        entry.binding_constraint,
        BindingConstraint::MonotoneSeverity { outcome_kind: "Ok" }
    );
    crate::test_complete!("evidence_records_monotone_severity_ok");
}
#[test]
fn evidence_records_escalate_strategy() {
    init_test("evidence_records_escalate_strategy");
    let parent = RegionId::from_arena(ArenaIndex::new(0, 5));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Escalate);
    supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        Some(parent),
        &Outcome::Err(()),
        1000,
    );
    // The ledger names the Escalate strategy as the binding constraint.
    let entry = &supervisor.evidence().entries()[0];
    assert_eq!(entry.strategy_kind, "Escalate");
    assert_eq!(entry.binding_constraint, BindingConstraint::EscalateStrategy);
    crate::test_complete!("evidence_records_escalate_strategy");
}
#[test]
fn evidence_records_budget_insufficient_cost() {
    init_test("evidence_records_budget_insufficient_cost");
    // Restarting costs 100, but only 50 cost units remain in the budget.
    let config = RestartConfig::new(5, Duration::from_mins(1)).with_restart_cost(100);
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    let mut budget = Budget {
        cost_quota: Some(50),
        ..Budget::INFINITE
    };
    supervisor.on_failure_with_budget(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        1000,
        Some(&mut budget),
    );
    let entry = &supervisor.evidence().entries()[0];
    assert_eq!(
        entry.binding_constraint,
        BindingConstraint::InsufficientCost {
            required: 100,
            remaining: 50,
        }
    );
    crate::test_complete!("evidence_records_budget_insufficient_cost");
}
#[test]
fn evidence_records_budget_insufficient_polls() {
    init_test("evidence_records_budget_insufficient_polls");
    // A restart needs at least 10 polls, but only 5 remain in the budget.
    let config = RestartConfig::new(5, Duration::from_mins(1)).with_min_polls(10);
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    let mut budget = Budget {
        poll_quota: 5,
        ..Budget::INFINITE
    };
    supervisor.on_failure_with_budget(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        1000,
        Some(&mut budget),
    );
    let entry = &supervisor.evidence().entries()[0];
    assert_eq!(
        entry.binding_constraint,
        BindingConstraint::InsufficientPolls {
            min_required: 10,
            remaining: 5,
        }
    );
    crate::test_complete!("evidence_records_budget_insufficient_polls");
}
#[test]
fn evidence_records_budget_deadline_too_close() {
    init_test("evidence_records_budget_deadline_too_close");
    let config = RestartConfig::new(5, Duration::from_mins(1))
        .with_min_remaining(Duration::from_secs(10));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    // Only 5s remain before the deadline, under the 10s minimum for a restart.
    let now_nanos = 1_000_000_000u64;
    let deadline_nanos = 6_000_000_000u64;
    let mut budget = Budget {
        deadline: Some(Time::from_nanos(deadline_nanos)),
        ..Budget::INFINITE
    };
    supervisor.on_failure_with_budget(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        now_nanos,
        Some(&mut budget),
    );
    let entry = &supervisor.evidence().entries()[0];
    assert!(matches!(
        entry.binding_constraint,
        BindingConstraint::DeadlineTooClose { .. }
    ));
    crate::test_complete!("evidence_records_budget_deadline_too_close");
}
#[test]
fn evidence_full_lifecycle_restart_to_exhaustion() {
    init_test("evidence_full_lifecycle_restart_to_exhaustion");
    let window = Duration::from_mins(1);
    let config = RestartConfig::new(3, window);
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    // Five failures, one per second, against a budget of three per window.
    for second in 0u64..5 {
        supervisor.on_failure(
            test_task_id(),
            test_region_id(),
            None,
            &Outcome::Err(()),
            second * 1_000_000_000,
        );
    }
    let ledger = supervisor.evidence();
    assert_eq!(ledger.len(), 5);
    // The first three attempts are granted...
    for (idx, expected_attempt) in [(0, 1u32), (1, 2), (2, 3)] {
        assert_eq!(
            ledger.entries()[idx].binding_constraint,
            BindingConstraint::RestartAllowed {
                attempt: expected_attempt,
            }
        );
    }
    // ...and the remaining two hit the exhausted window.
    for idx in 3..5 {
        assert_eq!(
            ledger.entries()[idx].binding_constraint,
            BindingConstraint::WindowExhausted {
                max_restarts: 3,
                window,
            }
        );
    }
    crate::test_complete!("evidence_full_lifecycle_restart_to_exhaustion");
}
#[test]
fn evidence_for_task_filter() {
    init_test("evidence_for_task_filter");
    let config = RestartConfig::new(5, Duration::from_mins(1));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    let task_a = TaskId::from_arena(ArenaIndex::new(0, 1));
    let task_b = TaskId::from_arena(ArenaIndex::new(0, 2));
    // Interleave failures from two distinct tasks.
    supervisor.on_failure(task_a, test_region_id(), None, &Outcome::Err(()), 1000);
    supervisor.on_failure(task_b, test_region_id(), None, &Outcome::Err(()), 2000);
    supervisor.on_failure(task_a, test_region_id(), None, &Outcome::Err(()), 3000);
    // for_task yields only task_a's entries, in chronological order.
    let a_entries: Vec<_> = supervisor.evidence().for_task(task_a).collect();
    assert_eq!(a_entries.len(), 2);
    assert_eq!(a_entries[0].timestamp, 1000);
    assert_eq!(a_entries[1].timestamp, 3000);
    assert_eq!(supervisor.evidence().for_task(task_b).count(), 1);
    crate::test_complete!("evidence_for_task_filter");
}
#[test]
fn evidence_with_constraint_filter() {
    init_test("evidence_with_constraint_filter");
    let config = RestartConfig::new(2, Duration::from_mins(1));
    let mut supervisor = Supervisor::new(SupervisionStrategy::Restart(config));
    // Two plain errors (restartable) followed by one panic (monotone severity).
    for timestamp in [1000u64, 2000] {
        supervisor.on_failure(
            test_task_id(),
            test_region_id(),
            None,
            &Outcome::Err(()),
            timestamp,
        );
    }
    supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Panicked(PanicPayload::new("oops")),
        3000,
    );
    let restart_allowed = supervisor
        .evidence()
        .with_constraint(|c| matches!(c, BindingConstraint::RestartAllowed { .. }))
        .count();
    assert_eq!(restart_allowed, 2);
    let monotone = supervisor
        .evidence()
        .with_constraint(|c| matches!(c, BindingConstraint::MonotoneSeverity { .. }))
        .count();
    assert_eq!(monotone, 1);
    crate::test_complete!("evidence_with_constraint_filter");
}
#[test]
fn evidence_take_drains_ledger() {
    init_test("evidence_take_drains_ledger");
    let mut supervisor = Supervisor::new(SupervisionStrategy::Stop);
    supervisor.on_failure(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        1000,
    );
    assert_eq!(supervisor.evidence().len(), 1);
    // take_evidence moves the entries out and leaves the ledger empty.
    let taken = supervisor.take_evidence();
    assert_eq!(taken.len(), 1);
    assert!(supervisor.evidence().is_empty());
    crate::test_complete!("evidence_take_drains_ledger");
}
#[test]
fn evidence_deterministic_across_strategies() {
    init_test("evidence_deterministic_across_strategies");
    // Feeding the same outcome sequence to two supervisors with the same
    // strategy must produce entry-for-entry identical ledgers.
    let outcomes = [
        Outcome::Ok(()),
        Outcome::Err(()),
        Outcome::Cancelled(CancelReason::user("test")),
        Outcome::Panicked(PanicPayload::new("boom")),
    ];
    let strategies = [
        SupervisionStrategy::Stop,
        SupervisionStrategy::Restart(RestartConfig::new(5, Duration::from_mins(1))),
        SupervisionStrategy::Escalate,
    ];
    for strategy in strategies {
        let mut first = Supervisor::new(strategy.clone());
        let mut second = Supervisor::new(strategy);
        for (i, outcome) in outcomes.iter().enumerate() {
            let t = (i as u64) * 1000;
            first.on_failure(test_task_id(), test_region_id(), None, outcome, t);
            second.on_failure(test_task_id(), test_region_id(), None, outcome, t);
        }
        let lhs = first.evidence();
        let rhs = second.evidence();
        assert_eq!(lhs.len(), rhs.len());
        for (ea, eb) in lhs.entries().iter().zip(rhs.entries().iter()) {
            assert_eq!(ea.timestamp, eb.timestamp);
            assert_eq!(ea.strategy_kind, eb.strategy_kind);
            assert_eq!(ea.binding_constraint, eb.binding_constraint);
        }
    }
    crate::test_complete!("evidence_deterministic_across_strategies");
}
#[test]
fn evidence_binding_constraint_display() {
    init_test("evidence_binding_constraint_display");
    // Pin the human-readable Display output of every BindingConstraint
    // variant. A plain array suffices here; the original `vec!` heap-allocated
    // a list that is iterated once and dropped (clippy::useless_vec).
    let cases = [
        (
            BindingConstraint::MonotoneSeverity {
                outcome_kind: "Panicked",
            },
            "monotone severity: Panicked is not restartable",
        ),
        (BindingConstraint::ExplicitStopStrategy, "strategy is Stop"),
        (BindingConstraint::EscalateStrategy, "strategy is Escalate"),
        (
            BindingConstraint::RestartAllowed { attempt: 3 },
            "restart allowed (attempt 3)",
        ),
        (
            BindingConstraint::WindowExhausted {
                max_restarts: 5,
                window: Duration::from_mins(1),
            },
            "window exhausted: 5 restarts in 60s",
        ),
        (
            BindingConstraint::InsufficientCost {
                required: 100,
                remaining: 42,
            },
            "insufficient cost: need 100, have 42",
        ),
        (
            BindingConstraint::InsufficientPolls {
                min_required: 10,
                remaining: 3,
            },
            "insufficient polls: need 10, have 3",
        ),
    ];
    for (constraint, expected) in cases {
        assert_eq!(format!("{constraint}"), expected);
    }
    crate::test_complete!("evidence_binding_constraint_display");
}
#[test]
fn evidence_window_exhaustion_with_budget_vs_without() {
    init_test("evidence_window_exhaustion_with_budget_vs_without");
    // With max_restarts = 1, the second failure exhausts the window whether
    // or not a (non-limiting) budget is threaded through the call.
    let window = Duration::from_mins(1);
    let config = RestartConfig::new(1, window);
    let mut sup_no_budget = Supervisor::new(SupervisionStrategy::Restart(config.clone()));
    sup_no_budget.on_failure(test_task_id(), test_region_id(), None, &Outcome::Err(()), 1000);
    sup_no_budget.on_failure(test_task_id(), test_region_id(), None, &Outcome::Err(()), 2000);
    let mut sup_budget = Supervisor::new(SupervisionStrategy::Restart(config));
    let mut budget = Budget::INFINITE;
    sup_budget.on_failure_with_budget(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        1000,
        Some(&mut budget),
    );
    sup_budget.on_failure_with_budget(
        test_task_id(),
        test_region_id(),
        None,
        &Outcome::Err(()),
        2000,
        Some(&mut budget),
    );
    // Both supervisors must record the same exhaustion constraint as entry 1.
    assert_eq!(
        sup_no_budget.evidence().entries()[1].binding_constraint,
        BindingConstraint::WindowExhausted {
            max_restarts: 1,
            window,
        }
    );
    assert_eq!(
        sup_budget.evidence().entries()[1].binding_constraint,
        BindingConstraint::WindowExhausted {
            max_restarts: 1,
            window,
        }
    );
    crate::test_complete!("evidence_window_exhaustion_with_budget_vs_without");
}
#[test]
fn emission_wiring_restart_produces_generalized_record() {
    init_test("emission_wiring_restart_produces_generalized_record");
    // A restartable failure must land in both the domain ledger and the
    // generalized ledger as a supervision Restart verdict.
    let config = RestartConfig {
        max_restarts: 3,
        window: Duration::from_mins(1),
        ..Default::default()
    };
    let mut sup = Supervisor::new(SupervisionStrategy::Restart(config));
    let task = TaskId::from_arena(ArenaIndex::new(0, 1));
    let region = RegionId::from_arena(ArenaIndex::new(0, 0));
    sup.on_failure(task, region, None, &Outcome::Err(()), 1_000);
    assert_eq!(sup.evidence().len(), 1);
    let evidence = sup.generalized_evidence();
    assert_eq!(evidence.len(), 1);
    let record = &evidence.entries()[0];
    assert_eq!(record.subsystem, crate::evidence::Subsystem::Supervision);
    assert_eq!(record.verdict, Verdict::Restart);
    assert_eq!(record.task_id, task);
    assert_eq!(record.region_id, region);
    assert_eq!(record.timestamp, 1_000);
    assert!(matches!(
        record.detail,
        EvidenceDetail::Supervision(SupervisionDetail::RestartAllowed { attempt: 1, .. })
    ));
    crate::test_complete!("emission_wiring_restart_produces_generalized_record");
}
#[test]
fn emission_wiring_stop_produces_generalized_record() {
    init_test("emission_wiring_stop_produces_generalized_record");
    // A Stop-strategy failure must emit a generalized Stop verdict carrying
    // the ExplicitStop supervision detail.
    let mut sup = Supervisor::new(SupervisionStrategy::Stop);
    let task = TaskId::from_arena(ArenaIndex::new(0, 1));
    let region = RegionId::from_arena(ArenaIndex::new(0, 0));
    sup.on_failure(task, region, None, &Outcome::Err(()), 2_000);
    let evidence = sup.generalized_evidence();
    assert_eq!(evidence.len(), 1);
    let record = &evidence.entries()[0];
    assert_eq!(record.verdict, Verdict::Stop);
    assert!(matches!(
        record.detail,
        EvidenceDetail::Supervision(SupervisionDetail::ExplicitStop)
    ));
    crate::test_complete!("emission_wiring_stop_produces_generalized_record");
}
#[test]
fn emission_wiring_escalate_produces_generalized_record() {
    init_test("emission_wiring_escalate_produces_generalized_record");
    // An Escalate-strategy failure must emit a generalized Escalate verdict
    // carrying the ExplicitEscalate supervision detail.
    let mut sup = Supervisor::new(SupervisionStrategy::Escalate);
    let task = TaskId::from_arena(ArenaIndex::new(0, 1));
    let region = RegionId::from_arena(ArenaIndex::new(0, 0));
    sup.on_failure(task, region, None, &Outcome::Err(()), 3_000);
    let evidence = sup.generalized_evidence();
    assert_eq!(evidence.len(), 1);
    let record = &evidence.entries()[0];
    assert_eq!(record.verdict, Verdict::Escalate);
    assert!(matches!(
        record.detail,
        EvidenceDetail::Supervision(SupervisionDetail::ExplicitEscalate)
    ));
    crate::test_complete!("emission_wiring_escalate_produces_generalized_record");
}
#[test]
fn emission_wiring_monotone_severity_produces_generalized_record() {
    init_test("emission_wiring_monotone_severity_produces_generalized_record");
    // Panics are never restartable: even under a Restart strategy the record
    // must carry a Stop verdict with a MonotoneSeverity detail for "Panicked".
    let config = RestartConfig {
        max_restarts: 3,
        window: Duration::from_mins(1),
        ..Default::default()
    };
    let mut sup = Supervisor::new(SupervisionStrategy::Restart(config));
    let task = TaskId::from_arena(ArenaIndex::new(0, 1));
    let region = RegionId::from_arena(ArenaIndex::new(0, 0));
    sup.on_failure(
        task,
        region,
        None,
        &Outcome::Panicked(PanicPayload::new("oops")),
        4_000,
    );
    let evidence = sup.generalized_evidence();
    let record = &evidence.entries()[0];
    assert_eq!(record.verdict, Verdict::Stop);
    assert!(matches!(
        record.detail,
        EvidenceDetail::Supervision(SupervisionDetail::MonotoneSeverity {
            outcome_kind: ref kind
        }) if kind == "Panicked"
    ));
    crate::test_complete!("emission_wiring_monotone_severity_produces_generalized_record");
}
#[test]
fn emission_wiring_window_exhaustion_produces_generalized_record() {
    init_test("emission_wiring_window_exhaustion_produces_generalized_record");
    // The first failure restarts; the second exhausts a max_restarts = 1
    // window and must be recorded as Stop with a WindowExhausted detail.
    let config = RestartConfig {
        max_restarts: 1,
        window: Duration::from_mins(1),
        ..Default::default()
    };
    let mut sup = Supervisor::new(SupervisionStrategy::Restart(config));
    let task = TaskId::from_arena(ArenaIndex::new(0, 1));
    let region = RegionId::from_arena(ArenaIndex::new(0, 0));
    sup.on_failure(task, region, None, &Outcome::Err(()), 5_000);
    sup.on_failure(task, region, None, &Outcome::Err(()), 6_000);
    let evidence = sup.generalized_evidence();
    assert_eq!(evidence.len(), 2);
    assert_eq!(evidence.entries()[0].verdict, Verdict::Restart);
    let record = &evidence.entries()[1];
    assert_eq!(record.verdict, Verdict::Stop);
    assert!(matches!(
        record.detail,
        EvidenceDetail::Supervision(SupervisionDetail::WindowExhausted {
            max_restarts: 1,
            ..
        })
    ));
    crate::test_complete!("emission_wiring_window_exhaustion_produces_generalized_record");
}
#[test]
fn emission_wiring_budget_refused_produces_generalized_record() {
    init_test("emission_wiring_budget_refused_produces_generalized_record");
    // A restart whose cost (100) exceeds the remaining budget quota (10) must
    // be refused and surfaced as a BudgetRefused supervision detail.
    let config = RestartConfig {
        max_restarts: 5,
        window: Duration::from_mins(1),
        restart_cost: 100,
        ..Default::default()
    };
    let mut sup = Supervisor::new(SupervisionStrategy::Restart(config));
    let task = TaskId::from_arena(ArenaIndex::new(0, 1));
    let region = RegionId::from_arena(ArenaIndex::new(0, 0));
    let mut budget = Budget::new().with_cost_quota(10);
    sup.on_failure_with_budget(
        task,
        region,
        None,
        &Outcome::Err(()),
        7_000,
        Some(&mut budget),
    );
    let evidence = sup.generalized_evidence();
    assert_eq!(evidence.len(), 1);
    let record = &evidence.entries()[0];
    assert_eq!(record.verdict, Verdict::Stop);
    assert!(matches!(
        record.detail,
        EvidenceDetail::Supervision(SupervisionDetail::BudgetRefused { .. })
    ));
    // When the detail is present, its rendered constraint names the reason.
    if let EvidenceDetail::Supervision(SupervisionDetail::BudgetRefused { constraint }) =
        &record.detail
    {
        assert!(constraint.contains("insufficient cost"));
    }
    crate::test_complete!("emission_wiring_budget_refused_produces_generalized_record");
}
#[test]
fn emission_wiring_ledgers_stay_in_sync() {
    init_test("emission_wiring_ledgers_stay_in_sync");
    // Every domain-ledger entry must have a generalized twin with matching
    // timestamp, task, and region.
    let config = RestartConfig {
        max_restarts: 5,
        window: Duration::from_mins(1),
        ..Default::default()
    };
    let mut sup = Supervisor::new(SupervisionStrategy::Restart(config));
    let task = TaskId::from_arena(ArenaIndex::new(0, 1));
    let region = RegionId::from_arena(ArenaIndex::new(0, 0));
    for i in 0..3 {
        sup.on_failure(task, region, None, &Outcome::Err(()), (i + 1) * 1_000);
    }
    assert_eq!(sup.evidence().len(), 3);
    assert_eq!(sup.generalized_evidence().len(), 3);
    let pairs = sup
        .evidence()
        .entries()
        .iter()
        .zip(sup.generalized_evidence().entries().iter());
    for (domain, generalized) in pairs {
        assert_eq!(domain.timestamp, generalized.timestamp);
        assert_eq!(domain.task_id, generalized.task_id);
        assert_eq!(domain.region_id, generalized.region_id);
    }
    crate::test_complete!("emission_wiring_ledgers_stay_in_sync");
}
#[test]
fn emission_wiring_take_generalized_drains() {
    init_test("emission_wiring_take_generalized_drains");
    // Draining the generalized ledger must not disturb the domain ledger.
    let mut sup = Supervisor::new(SupervisionStrategy::Stop);
    let task = TaskId::from_arena(ArenaIndex::new(0, 1));
    let region = RegionId::from_arena(ArenaIndex::new(0, 0));
    sup.on_failure(task, region, None, &Outcome::Err(()), 8_000);
    assert_eq!(sup.generalized_evidence().len(), 1);
    let drained = sup.take_generalized_evidence();
    assert_eq!(drained.len(), 1);
    assert!(sup.generalized_evidence().is_empty());
    assert_eq!(sup.evidence().len(), 1);
    crate::test_complete!("emission_wiring_take_generalized_drains");
}
#[test]
fn emission_wiring_render_is_deterministic() {
    init_test("emission_wiring_render_is_deterministic");
    // Identical failure histories must render to identical evidence text.
    let make = || {
        Supervisor::new(SupervisionStrategy::Restart(RestartConfig {
            max_restarts: 2,
            window: Duration::from_mins(1),
            ..Default::default()
        }))
    };
    let mut sup_a = make();
    let mut sup_b = make();
    let task = TaskId::from_arena(ArenaIndex::new(0, 1));
    let region = RegionId::from_arena(ArenaIndex::new(0, 0));
    for t in [1_000u64, 2_000, 3_000] {
        sup_a.on_failure(task, region, None, &Outcome::Err(()), t);
        sup_b.on_failure(task, region, None, &Outcome::Err(()), t);
    }
    assert_eq!(
        sup_a.generalized_evidence().render(),
        sup_b.generalized_evidence().render()
    );
    let rendered = sup_a.generalized_evidence().render();
    assert!(rendered.contains("supervision"));
    assert!(rendered.contains("RESTART"));
    crate::test_complete!("emission_wiring_render_is_deterministic");
}
#[test]
fn storm_monitor_starts_clear() {
    init_test("storm_monitor_starts_clear");
    // A fresh monitor reports Clear, a unit e-value, and zero observations.
    let monitor = RestartStormMonitor::new(StormMonitorConfig::default());
    assert_eq!(
        monitor.alert_state(),
        crate::obligation::eprocess::AlertState::Clear
    );
    assert!((monitor.e_value() - 1.0).abs() < f64::EPSILON);
    assert_eq!(monitor.observations(), 0);
    crate::test_complete!("storm_monitor_starts_clear");
}
#[test]
#[should_panic(expected = "alpha must be in (0, 1)")]
fn storm_monitor_alpha_zero_panics() {
    // alpha = 0 lies outside the open interval and must be rejected eagerly.
    let config = StormMonitorConfig {
        alpha: 0.0,
        ..Default::default()
    };
    let _monitor = RestartStormMonitor::new(config);
}
#[test]
#[should_panic(expected = "expected_rate must be > 0")]
fn storm_monitor_zero_rate_panics() {
    // A zero expected rate is meaningless and must be rejected eagerly.
    let config = StormMonitorConfig {
        expected_rate: 0.0,
        ..Default::default()
    };
    let _monitor = RestartStormMonitor::new(config);
}
#[test]
#[should_panic(expected = "storm threshold must be finite and > 0")]
fn tracker_config_nan_threshold_panics() {
    // NaN is not a usable storm threshold; the builder must reject it.
    let base = RestartTrackerConfig::from_restart(RestartConfig::default());
    let _config = base.with_storm_detection(f64::NAN);
}
#[test]
fn storm_monitor_normal_intensity_stays_clear() {
    init_test("storm_monitor_normal_intensity_stays_clear");
    // Intensity well below the expected rate must never trip the monitor,
    // no matter how long it runs.
    let mut monitor = RestartStormMonitor::new(StormMonitorConfig {
        alpha: 0.01,
        expected_rate: 1.0,
        min_observations: 3,
        tolerance: 1.2,
    });
    for _ in 0..100 {
        monitor.observe_intensity(0.5);
    }
    assert!(!monitor.is_alert());
    assert_eq!(
        monitor.alert_state(),
        crate::obligation::eprocess::AlertState::Clear
    );
    crate::test_complete!("storm_monitor_normal_intensity_stays_clear");
}
#[test]
fn storm_monitor_high_intensity_triggers_alert() {
    init_test("storm_monitor_high_intensity_triggers_alert");
    // Intensity far above the expected rate must push the e-value past the
    // threshold and raise an alert.
    let mut monitor = RestartStormMonitor::new(StormMonitorConfig {
        alpha: 0.01,
        expected_rate: 0.05,
        min_observations: 3,
        tolerance: 1.2,
    });
    for _ in 0..10 {
        monitor.observe_intensity(5.0);
    }
    assert!(monitor.is_alert());
    assert!(monitor.alert_count() > 0);
    assert!(monitor.e_value() >= monitor.threshold());
    crate::test_complete!("storm_monitor_high_intensity_triggers_alert");
}
#[test]
fn storm_monitor_alert_count_tracks_transitions_not_samples() {
    init_test("storm_monitor_alert_count_tracks_transitions_not_samples");
    // alert_count must count Clear -> Alert transitions, not alert samples.
    let mut monitor = RestartStormMonitor::new(StormMonitorConfig {
        alpha: 0.01,
        expected_rate: 0.05,
        min_observations: 3,
        tolerance: 1.2,
    });
    // Helper: feed ten high-intensity samples (a sustained storm burst).
    let storm = |m: &mut RestartStormMonitor| {
        for _ in 0..10 {
            m.observe_intensity(5.0);
        }
    };
    storm(&mut monitor);
    assert!(
        monitor.is_alert(),
        "sustained storm should enter alert state"
    );
    assert_eq!(
        monitor.alert_count(),
        1,
        "alert_count should increment once when the monitor first crosses into alert"
    );
    storm(&mut monitor);
    assert_eq!(
        monitor.alert_count(),
        1,
        "additional samples while already alert must not inflate alert_count"
    );
    monitor.reset();
    storm(&mut monitor);
    assert_eq!(
        monitor.alert_count(),
        1,
        "after reset, the next alert transition should be counted once again"
    );
    crate::test_complete!("storm_monitor_alert_count_tracks_transitions_not_samples");
}
#[test]
fn storm_monitor_gated_by_min_observations() {
    init_test("storm_monitor_gated_by_min_observations");
    // Even absurd intensities must not alert before min_observations samples.
    let mut monitor = RestartStormMonitor::new(StormMonitorConfig {
        alpha: 0.01,
        expected_rate: 0.01,
        min_observations: 5,
        tolerance: 1.2,
    });
    monitor.observe_intensity(1000.0);
    monitor.observe_intensity(1000.0);
    assert_eq!(
        monitor.alert_state(),
        crate::obligation::eprocess::AlertState::Clear
    );
    // Crossing the observation gate with the same intensity must alert.
    for _ in 0..5 {
        monitor.observe_intensity(1000.0);
    }
    assert!(monitor.is_alert());
    crate::test_complete!("storm_monitor_gated_by_min_observations");
}
#[test]
fn storm_monitor_observe_from_window() {
    init_test("storm_monitor_observe_from_window");
    // Restarts every 50ms inside a 10s window yield an intensity far above
    // the 0.1 expected rate, so sampling from the window must trip the alert.
    let mut window = RestartIntensityWindow::new(Duration::from_secs(10), 1.0);
    let mut monitor = RestartStormMonitor::new(StormMonitorConfig {
        alpha: 0.01,
        expected_rate: 0.1,
        min_observations: 3,
        tolerance: 1.2,
    });
    let base = 1_000_000_000u64;
    for i in 0..20 {
        let now = base + i * 50_000_000;
        window.record(now);
        monitor.observe_from_window(&window, now);
    }
    assert!(monitor.is_alert());
    crate::test_complete!("storm_monitor_observe_from_window");
}
#[test]
fn storm_monitor_reset_clears_state() {
    init_test("storm_monitor_reset_clears_state");
    // reset() must drop the alert, zero the observation count, and restore
    // the e-value to its unit starting point.
    let mut monitor = RestartStormMonitor::new(StormMonitorConfig {
        alpha: 0.01,
        expected_rate: 0.01,
        min_observations: 3,
        tolerance: 1.2,
    });
    for _ in 0..10 {
        monitor.observe_intensity(100.0);
    }
    assert!(monitor.is_alert());
    monitor.reset();
    assert!(!monitor.is_alert());
    assert_eq!(monitor.observations(), 0);
    assert!((monitor.e_value() - 1.0).abs() < f64::EPSILON);
    crate::test_complete!("storm_monitor_reset_clears_state");
}
#[test]
fn storm_monitor_snapshot_display() {
    init_test("storm_monitor_snapshot_display");
    // The snapshot captures observation count and threshold, and its Display
    // output names the monitor.
    let mut monitor = RestartStormMonitor::new(StormMonitorConfig::default());
    monitor.observe_intensity(0.01);
    let snap = monitor.snapshot();
    assert_eq!(snap.observations, 1);
    assert!(snap.threshold > 0.0);
    let display = format!("{snap}");
    assert!(display.contains("StormMonitor"));
    crate::test_complete!("storm_monitor_snapshot_display");
}
#[test]
fn storm_monitor_supermartingale_under_null() {
    init_test("storm_monitor_supermartingale_under_null");
    // Under the null (intensities cycling at or below the expected rate) the
    // e-process should stay bounded and never alert.
    let mut monitor = RestartStormMonitor::new(StormMonitorConfig {
        alpha: 0.01,
        expected_rate: 1.0,
        min_observations: 3,
        tolerance: 1.2,
    });
    for i in 0u32..1000 {
        // Intensities cycle through 0.1, 0.2, ..., 1.0.
        let intensity = f64::from((i % 10) + 1) * 0.1;
        monitor.observe_intensity(intensity);
    }
    assert!(monitor.e_value() <= 2.0);
    assert!(!monitor.is_alert());
    crate::test_complete!("storm_monitor_supermartingale_under_null");
}
#[test]
fn storm_monitor_deterministic_across_runs() {
    init_test("storm_monitor_deterministic_across_runs");
    // Same config + same inputs => bit-identical e-values.
    let config = StormMonitorConfig::default();
    let intensities = [0.01, 0.05, 0.1, 0.5, 1.0];
    let mut first = RestartStormMonitor::new(config);
    let mut second = RestartStormMonitor::new(config);
    for &intensity in &intensities {
        first.observe_intensity(intensity);
        second.observe_intensity(intensity);
    }
    assert!((first.e_value() - second.e_value()).abs() < f64::EPSILON);
    crate::test_complete!("storm_monitor_deterministic_across_runs");
}
#[test]
fn storm_monitor_config_default() {
    init_test("storm_monitor_config_default");
    // Pin the default tuning so accidental changes are caught.
    let config = StormMonitorConfig::default();
    assert!((config.alpha - 0.01).abs() < f64::EPSILON);
    assert!((config.expected_rate - 0.05).abs() < f64::EPSILON);
    assert_eq!(config.min_observations, 3);
    crate::test_complete!("storm_monitor_config_default");
}
#[test]
fn restart_tracker_aligns_default_storm_monitor_with_threshold() {
    init_test("restart_tracker_aligns_default_storm_monitor_with_threshold");
    // Three restarts inside one second exceed the 2.0 intensity threshold;
    // the tracker's default e-process monitor must agree with that verdict.
    let restart = RestartConfig::new(10, Duration::from_secs(1));
    let config = RestartTrackerConfig::from_restart(restart).with_storm_detection(2.0);
    let mut tracker = RestartTracker::new(config);
    for t in [0, 500_000_000, 900_000_000] {
        tracker.record(t);
    }
    assert!(
        tracker.is_intensity_storm(900_000_000),
        "intensity threshold should trip once three restarts land inside one second"
    );
    assert!(
        tracker.is_storm(),
        "default e-process monitor should align with the configured threshold"
    );
    crate::test_complete!("restart_tracker_aligns_default_storm_monitor_with_threshold");
}
#[test]
fn restart_tracker_preserves_explicit_monitor_rate_across_builder_order() {
    init_test("restart_tracker_preserves_explicit_monitor_rate_across_builder_order");
    // An explicitly supplied monitor config must survive regardless of
    // whether with_storm_detection is applied before or after it.
    let explicit_monitor = StormMonitorConfig {
        alpha: 0.01,
        expected_rate: StormMonitorConfig::default().expected_rate,
        min_observations: 1,
        tolerance: 1.2,
    };
    let e_value_for = |threshold_first: bool| {
        let base = RestartTrackerConfig::from_restart(RestartConfig::new(
            10,
            Duration::from_secs(10),
        ));
        let config = if threshold_first {
            base.with_storm_detection(2.0)
                .with_storm_monitor(explicit_monitor)
        } else {
            base.with_storm_monitor(explicit_monitor)
                .with_storm_detection(2.0)
        };
        let mut tracker = RestartTracker::new(config);
        tracker.record(0);
        tracker
            .storm_snapshot()
            .expect("storm detection enabled")
            .e_value
    };
    let threshold_then_monitor = e_value_for(true);
    let monitor_then_threshold = e_value_for(false);
    assert!(
        threshold_then_monitor > 1.0,
        "explicit expected_rate=0.05 should be preserved instead of being rewritten from threshold"
    );
    assert!(
        (threshold_then_monitor - monitor_then_threshold).abs() < f64::EPSILON,
        "builder order must not change explicit storm monitor behavior"
    );
    crate::test_complete!(
        "restart_tracker_preserves_explicit_monitor_rate_across_builder_order"
    );
}
#[test]
fn obs_evidence_ledger_determinism_mixed_outcomes() {
    init_test("obs_evidence_ledger_determinism_mixed_outcomes");
    // Running the same mixed outcome script twice must yield identical
    // decisions, identical domain ledgers, and identical rendered evidence.
    let config = RestartConfig::new(3, Duration::from_mins(1));
    let task = test_task_id();
    let region = test_region_id();
    let execute = || {
        let mut sup = Supervisor::new(SupervisionStrategy::Restart(config.clone()));
        let script = [
            Outcome::Err(()),
            Outcome::Err(()),
            Outcome::Ok(()),
            Outcome::Cancelled(CancelReason::user("test")),
            Outcome::Err(()),
            Outcome::Panicked(PanicPayload::new("boom")),
        ];
        let mut decisions = Vec::new();
        for (i, outcome) in script.iter().enumerate() {
            let t = (i as u64 + 1) * 1_000;
            decisions.push(sup.on_failure(task, region, None, outcome, t));
        }
        (sup, decisions)
    };
    let (sup_a, dec_a) = execute();
    let (sup_b, dec_b) = execute();
    assert_eq!(dec_a.len(), dec_b.len());
    for (a, b) in dec_a.iter().zip(dec_b.iter()) {
        assert_eq!(format!("{a:?}"), format!("{b:?}"));
    }
    let ev_a = sup_a.evidence();
    let ev_b = sup_b.evidence();
    assert_eq!(ev_a.len(), ev_b.len());
    for (a, b) in ev_a.entries().iter().zip(ev_b.entries().iter()) {
        assert_eq!(a.timestamp, b.timestamp);
        assert_eq!(
            format!("{:?}", a.binding_constraint),
            format!("{:?}", b.binding_constraint)
        );
    }
    assert_eq!(
        sup_a.generalized_evidence().render(),
        sup_b.generalized_evidence().render()
    );
    crate::test_complete!("obs_evidence_ledger_determinism_mixed_outcomes");
}
#[test]
fn obs_storm_monitor_intensity_window_integration_deterministic() {
    init_test("obs_storm_monitor_intensity_window_integration_deterministic");
    // Window-fed monitoring must be fully deterministic: two identical runs
    // agree on every e-value and every alert state.
    let trace = || {
        let mut window = RestartIntensityWindow::new(Duration::from_secs(10), 1.0);
        let mut monitor = RestartStormMonitor::new(StormMonitorConfig {
            alpha: 0.01,
            expected_rate: 0.1,
            min_observations: 3,
            tolerance: 1.2,
        });
        let base = 1_000_000_000u64;
        let mut states = Vec::new();
        for i in 0..30 {
            let now = base + i * 100_000_000;
            window.record(now);
            let state = monitor.observe_from_window(&window, now);
            states.push((monitor.e_value(), state));
        }
        states
    };
    let first = trace();
    let second = trace();
    assert_eq!(first.len(), second.len());
    for ((e_a, s_a), (e_b, s_b)) in first.iter().zip(second.iter()) {
        assert!((e_a - e_b).abs() < f64::EPSILON, "e-values diverged");
        assert_eq!(s_a, s_b, "alert states diverged");
    }
    crate::test_complete!("obs_storm_monitor_intensity_window_integration_deterministic");
}
#[test]
fn obs_eprocess_alert_transitions_monotone() {
    init_test("obs_eprocess_alert_transitions_monotone");
    // As intensity ramps up, the monitor may pass through Watching but must
    // reach Alert, and Watching (when seen at all) must come first.
    use crate::obligation::eprocess::AlertState;
    let mut monitor = RestartStormMonitor::new(StormMonitorConfig {
        alpha: 0.01,
        expected_rate: 0.05,
        min_observations: 3,
        tolerance: 1.2,
    });
    // Record each state the first time it is entered, in entry order.
    let mut transitions = Vec::new();
    for i in 0..50 {
        let intensity = f64::from(i).mul_add(0.1, 0.01);
        match monitor.observe_intensity(intensity) {
            AlertState::Watching if !transitions.contains(&"watching") => {
                transitions.push("watching");
            }
            AlertState::Alert if !transitions.contains(&"alert") => {
                transitions.push("alert");
            }
            _ => {}
        }
    }
    assert!(
        transitions.contains(&"alert"),
        "monitor should reach alert state"
    );
    let watching_pos = transitions.iter().position(|t| *t == "watching");
    let alert_pos = transitions.iter().position(|t| *t == "alert");
    if let (Some(w), Some(a)) = (watching_pos, alert_pos) {
        assert!(w < a, "watching must precede alert");
    }
    crate::test_complete!("obs_eprocess_alert_transitions_monotone");
}
#[test]
fn obs_supervisor_storm_combined_determinism() {
    init_test("obs_supervisor_storm_combined_determinism");
    // Supervisor decisions, window intensities, and monitor state must evolve
    // deterministically when driven by the same failure timeline.
    let trace = || {
        let config = RestartConfig::new(5, Duration::from_secs(10));
        let mut sup = Supervisor::new(SupervisionStrategy::Restart(config));
        let mut window = RestartIntensityWindow::new(Duration::from_secs(10), 1.0);
        let mut monitor = RestartStormMonitor::new(StormMonitorConfig {
            alpha: 0.05,
            expected_rate: 0.5,
            min_observations: 3,
            tolerance: 1.2,
        });
        let task = test_task_id();
        let region = test_region_id();
        let mut snapshots = Vec::new();
        for i in 0..8u64 {
            let now = i * 500_000_000;
            let decision = sup.on_failure(task, region, None, &Outcome::Err(()), now);
            window.record(now);
            let alert = monitor.observe_from_window(&window, now);
            snapshots.push((
                format!("{decision:?}"),
                monitor.e_value(),
                alert,
                window.intensity(now),
            ));
        }
        (sup.evidence().len(), snapshots)
    };
    let (len_a, snap_a) = trace();
    let (len_b, snap_b) = trace();
    assert_eq!(len_a, len_b);
    assert_eq!(snap_a.len(), snap_b.len());
    for ((dec_a, e_a, alert_a, int_a), (dec_b, e_b, alert_b, int_b)) in
        snap_a.iter().zip(snap_b.iter())
    {
        assert_eq!(dec_a, dec_b, "decisions diverged");
        assert!((e_a - e_b).abs() < f64::EPSILON, "e-values diverged");
        assert_eq!(alert_a, alert_b, "alerts diverged");
        assert!((int_a - int_b).abs() < f64::EPSILON, "intensities diverged");
    }
    crate::test_complete!("obs_supervisor_storm_combined_determinism");
}
#[test]
fn obs_evidence_ledger_binding_constraints_cover_all_paths() {
    init_test("obs_evidence_ledger_binding_constraints_cover_all_paths");
    // Drive one supervisor through every decision path: two allowed restarts,
    // window exhaustion, then the three non-restartable outcome kinds.
    let task = test_task_id();
    let region = test_region_id();
    let config = RestartConfig::new(2, Duration::from_mins(1)).with_restart_cost(100);
    let mut sup = Supervisor::new(SupervisionStrategy::Restart(config));
    sup.on_failure(task, region, None, &Outcome::Err(()), 1_000);
    sup.on_failure(task, region, None, &Outcome::Err(()), 2_000);
    sup.on_failure(task, region, None, &Outcome::Err(()), 3_000);
    sup.on_failure(task, region, None, &Outcome::Ok(()), 4_000);
    sup.on_failure(
        task,
        region,
        None,
        &Outcome::Cancelled(CancelReason::user("test")),
        5_000,
    );
    sup.on_failure(
        task,
        region,
        None,
        &Outcome::Panicked(PanicPayload::new("x")),
        6_000,
    );
    let entries = sup.evidence().entries();
    assert_eq!(entries.len(), 6);
    assert!(matches!(
        entries[0].binding_constraint,
        BindingConstraint::RestartAllowed { attempt: 1 }
    ));
    assert!(matches!(
        entries[1].binding_constraint,
        BindingConstraint::RestartAllowed { attempt: 2 }
    ));
    assert!(matches!(
        entries[2].binding_constraint,
        BindingConstraint::WindowExhausted { .. }
    ));
    assert!(matches!(
        entries[3].binding_constraint,
        BindingConstraint::MonotoneSeverity { outcome_kind: "Ok" }
    ));
    assert!(matches!(
        entries[4].binding_constraint,
        BindingConstraint::MonotoneSeverity {
            outcome_kind: "Cancelled"
        }
    ));
    assert!(matches!(
        entries[5].binding_constraint,
        BindingConstraint::MonotoneSeverity {
            outcome_kind: "Panicked"
        }
    ));
    crate::test_complete!("obs_evidence_ledger_binding_constraints_cover_all_paths");
}
#[test]
fn gate_child_name_clone_is_arc_bump() {
    init_test("gate_child_name_clone_is_arc_bump");
    // Cloning a ChildName must only bump the shared Arc's refcount, and the
    // clone going out of scope must release it.
    let name = ChildName::from("test_worker");
    assert_eq!(name.strong_count(), 1);
    {
        let cloned = name.clone();
        assert_eq!(name.strong_count(), 2);
        assert_eq!(cloned.strong_count(), 2);
        assert_eq!(name, cloned);
    }
    assert_eq!(name.strong_count(), 1);
    crate::test_complete!("gate_child_name_clone_is_arc_bump");
}
#[test]
fn gate_restart_plan_shares_arcs_with_children() {
    init_test("gate_restart_plan_shares_arcs_with_children");
    // A OneForAll restart plan must reference child names by Arc sharing:
    // building it bumps each name's refcount (once for cancel_order, once for
    // restart_order), and dropping it restores the original count.
    let compiled = SupervisorBuilder::new("alloc_gate")
        .with_restart_policy(RestartPolicy::OneForAll)
        .child(ChildSpec::new("alpha", noop_start))
        .child(ChildSpec::new("bravo", noop_start))
        .child(ChildSpec::new("charlie", noop_start).with_restart(
            SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
        ))
        .compile()
        .unwrap();
    // Refcount probe for the first child in start order ("alpha").
    let alpha_count = || {
        compiled.children[compiled.start_order[0]]
            .name
            .strong_count()
    };
    let before = alpha_count();
    let err: Outcome<(), ()> = Outcome::Err(());
    let plan = compiled
        .restart_plan_for_failure("charlie", &err)
        .expect("plan");
    assert_eq!(
        alpha_count(),
        before + 2,
        "plan names must share Arc with children (refcount bump, not copy)"
    );
    assert_eq!(plan.cancel_order.len(), 3);
    assert_eq!(plan.restart_order.len(), 3);
    drop(plan);
    assert_eq!(alpha_count(), before);
    crate::test_complete!("gate_restart_plan_shares_arcs_with_children");
}
#[test]
fn gate_compiled_ops_share_arcs_with_plan() {
    init_test("gate_compiled_ops_share_arcs_with_plan");
    // Compiling restart ops from a plan must clone names via Arc (one bump
    // per op), and dropping the ops releases them again.
    let compiled =
        SupervisorBuilder::new("ops_gate")
            .with_restart_policy(RestartPolicy::OneForOne)
            .child(ChildSpec::new("svc", noop_start).with_restart(
                SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
            ))
            .compile()
            .unwrap();
    let err: Outcome<(), ()> = Outcome::Err(());
    let plan = compiled
        .restart_plan_for_failure("svc", &err)
        .expect("plan");
    let before = plan.cancel_order[0].strong_count();
    let ops = compiled.compile_restart_ops(&plan);
    assert_eq!(
        plan.cancel_order[0].strong_count(),
        before + 3,
        "ops names must share Arc with plan (refcount bump per op)"
    );
    assert_eq!(ops.ops.len(), 3);
    drop(ops);
    assert_eq!(plan.cancel_order[0].strong_count(), before);
    crate::test_complete!("gate_compiled_ops_share_arcs_with_plan");
}
#[test]
fn supervision_strategy_debug_clone_eq_default() {
    // Smoke-test the derived trait surface: Default is Stop, Clone
    // round-trips, Eq distinguishes variants, Debug names the variant.
    let strategy = SupervisionStrategy::default();
    assert_eq!(strategy, SupervisionStrategy::Stop);
    let duplicate = strategy.clone();
    assert_eq!(strategy, duplicate);
    assert_ne!(strategy, SupervisionStrategy::Escalate);
    assert!(format!("{strategy:?}").contains("Stop"));
}
#[test]
fn restart_policy_debug_clone_copy_eq_default() {
    // RestartPolicy is Copy: two independent copies must compare equal, the
    // default is OneForOne, and Debug names the variant.
    let policy = RestartPolicy::default();
    assert_eq!(policy, RestartPolicy::OneForOne);
    let first = policy;
    let second = policy;
    assert_eq!(first, second);
    assert_ne!(policy, RestartPolicy::OneForAll);
    assert!(format!("{policy:?}").contains("OneForOne"));
}
#[test]
fn escalation_policy_debug_clone_copy_eq_default() {
    // EscalationPolicy defaults to Stop; Copy, Eq, and Debug behave sanely.
    let policy = EscalationPolicy::default();
    assert_eq!(policy, EscalationPolicy::Stop);
    let duplicate = policy;
    assert_eq!(policy, duplicate);
    assert_ne!(policy, EscalationPolicy::Escalate);
    assert!(format!("{policy:?}").contains("Stop"));
}
#[test]
fn name_collision_policy_debug_clone_copy_eq_default() {
    // NameCollisionPolicy defaults to Fail; Copy, Eq, and Debug behave sanely.
    let policy = NameCollisionPolicy::default();
    assert_eq!(policy, NameCollisionPolicy::Fail);
    let duplicate = policy;
    assert_eq!(policy, duplicate);
    assert_ne!(policy, NameCollisionPolicy::Replace);
    assert!(format!("{policy:?}").contains("Fail"));
}
#[test]
fn start_tie_break_debug_clone_copy_eq_default() {
    // StartTieBreak defaults to InsertionOrder; Copy, Eq, and Debug behave
    // sanely.
    let tie_break = StartTieBreak::default();
    assert_eq!(tie_break, StartTieBreak::InsertionOrder);
    let duplicate = tie_break;
    assert_eq!(tie_break, duplicate);
    assert_ne!(tie_break, StartTieBreak::NameLex);
    assert!(format!("{tie_break:?}").contains("InsertionOrder"));
}
// Tunable knobs for the metamorphic supervision tests below.
#[derive(Debug, Clone)]
struct SupervisionMetamorphicConfig {
// Number of children registered with each supervisor under test.
child_count: usize,
// Per-child restart budget allowed inside `restart_window`.
max_restarts: u32,
// Sliding window over which `max_restarts` is counted.
restart_window: Duration,
}
impl Default for SupervisionMetamorphicConfig {
// Defaults chosen to keep the metamorphic tests small but non-trivial.
fn default() -> Self {
Self {
child_count: 5,
max_restarts: 3,
restart_window: Duration::from_mins(1),
}
}
}
// Minimal deterministic-RNG helpers for the metamorphic tests, kept as a
// local extension trait so the tests do not need a full `rand`-style API.
trait SupervisionDetRngExt {
// Pseudo-random value in `range`; an empty range yields `range.start`.
fn gen_range(&mut self, range: std::ops::Range<usize>) -> usize;
// Borrow a pseudo-randomly chosen element of `items` (panics if empty).
fn choose<'a, T>(&mut self, items: &'a [T]) -> &'a T;
}
impl SupervisionDetRngExt for crate::util::det_rng::DetRng {
    fn gen_range(&mut self, range: std::ops::Range<usize>) -> usize {
        // Degenerate ranges collapse to their start; otherwise reduce the
        // next raw draw into the span (modulo bias is acceptable for tests).
        if range.is_empty() {
            return range.start;
        }
        let span = range.end - range.start;
        range.start + (self.next_u64() as usize % span)
    }
    fn choose<'a, T>(&mut self, items: &'a [T]) -> &'a T {
        // Index via gen_range; indexing panics if `items` is empty.
        &items[self.gen_range(0..items.len())]
    }
}
// Start callback that spawns nothing: hands back a fixed, fabricated TaskId
// so supervisor compilation paths can run without a live runtime.
fn noop_start_metamorphic(
    _scope: &crate::cx::Scope<'static, crate::types::policy::FailFast>,
    _state: &mut crate::runtime::RuntimeState,
    _cx: &crate::cx::Cx,
) -> Result<TaskId, SpawnError> {
    use crate::util::ArenaIndex;
    Ok(TaskId::from_arena(ArenaIndex::new(42, 0)))
}
// Build a supervisor with `child_count` children named `child_0..child_N`,
// each carrying a Restart strategy with a small fixed backoff whose delay is
// drawn from `rng`, so metamorphic runs can vary timing without changing
// plan structure.
fn create_test_supervisor_builder(
    name: &str,
    child_count: usize,
    restart_policy: RestartPolicy,
    rng: &mut crate::util::det_rng::DetRng,
) -> SupervisorBuilder {
    let mut builder = SupervisorBuilder::new(name).with_restart_policy(restart_policy);
    for i in 0..child_count {
        // Inline format args, matching the style used elsewhere in this file.
        let child_name = format!("child_{i}");
        let backoff =
            BackoffStrategy::Fixed(Duration::from_millis(rng.gen_range(10..100) as u64));
        let restart_config = RestartConfig::new(3, Duration::from_mins(1)).with_backoff(backoff);
        builder = builder.child(
            ChildSpec::new(child_name.as_str(), noop_start_metamorphic)
                .with_restart(SupervisionStrategy::Restart(restart_config)),
        );
    }
    builder
}
// Metamorphic property: for the same failing child, OneForOne plans touch
// exactly that child while OneForAll plans touch the entire child set.
#[test]
fn metamorphic_one_for_one_vs_one_for_all_restart_scope() {
init_test("metamorphic_one_for_one_vs_one_for_all_restart_scope");
// Fixed seed keeps the RNG-chosen backoffs reproducible run to run.
const SEED: u64 = 0xA11C_E001_0000_0001;
let mut rng = crate::util::det_rng::DetRng::new(SEED);
let config = SupervisionMetamorphicConfig::default();
// Two supervisors over identical child sets, differing only in policy.
let one_for_one = create_test_supervisor_builder(
"one_for_one_sup",
config.child_count,
RestartPolicy::OneForOne,
&mut rng,
)
.compile()
.unwrap();
let one_for_all = create_test_supervisor_builder(
"one_for_all_sup",
config.child_count,
RestartPolicy::OneForAll,
&mut rng,
)
.compile()
.unwrap();
let err_outcome: Outcome<(), ()> = Outcome::Err(());
// Fail each child in turn and compare the two policies' blast radii.
for child_idx in 0..config.child_count {
let failed_child_name = format!("child_{}", child_idx);
let one_for_one_plan = one_for_one
.restart_plan_for_failure(&failed_child_name, &err_outcome)
.expect("OneForOne plan");
let one_for_all_plan = one_for_all
.restart_plan_for_failure(&failed_child_name, &err_outcome)
.expect("OneForAll plan");
// OneForOne: exactly one cancel and one restart, both the failed child.
assert_eq!(
one_for_one_plan.cancel_order.len(),
1,
"OneForOne should cancel only failed child {}",
child_idx
);
assert_eq!(
one_for_one_plan.restart_order.len(),
1,
"OneForOne should restart only failed child {}",
child_idx
);
assert_eq!(
one_for_one_plan.cancel_order[0].as_str(),
failed_child_name,
"OneForOne should cancel the failed child {}",
child_idx
);
// OneForAll: every child is cancelled and restarted.
assert_eq!(
one_for_all_plan.cancel_order.len(),
config.child_count,
"OneForAll should cancel all {} children when child {} fails",
config.child_count,
child_idx
);
assert_eq!(
one_for_all_plan.restart_order.len(),
config.child_count,
"OneForAll should restart all {} children when child {} fails",
config.child_count,
child_idx
);
// Each plan must also carry the policy it was generated under.
assert_eq!(one_for_one_plan.policy, RestartPolicy::OneForOne);
assert_eq!(one_for_all_plan.policy, RestartPolicy::OneForAll);
}
crate::test_complete!("metamorphic_one_for_one_vs_one_for_all_restart_scope");
}
// Metamorphic property: the shape of a restart plan (who is cancelled and
// restarted) must not depend on the ORDER in which children fail — only on
// the policy. Plans are Options because the budget may deny a restart.
#[test]
fn metamorphic_restart_budget_exhaustion_invariance() {
init_test("metamorphic_restart_budget_exhaustion_invariance");
let config = SupervisionMetamorphicConfig {
child_count: 3,
max_restarts: 2, restart_window: Duration::from_secs(60),
};
for &restart_policy in &[RestartPolicy::OneForOne, RestartPolicy::OneForAll] {
// Three identical children sharing the same restart budget settings.
let supervisor = SupervisorBuilder::new("budget_test")
.with_restart_policy(restart_policy)
.child(
ChildSpec::new("child_0", noop_start_metamorphic).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(
config.max_restarts,
config.restart_window,
)),
),
)
.child(
ChildSpec::new("child_1", noop_start_metamorphic).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(
config.max_restarts,
config.restart_window,
)),
),
)
.child(
ChildSpec::new("child_2", noop_start_metamorphic).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(
config.max_restarts,
config.restart_window,
)),
),
)
.compile()
.unwrap();
let err_outcome: Outcome<(), ()> = Outcome::Err(());
// Three permutations of the same failure set.
let failure_sequences = vec![
vec!["child_0", "child_1", "child_2"],
vec!["child_2", "child_1", "child_0"],
vec!["child_1", "child_0", "child_2"],
];
for sequence in &failure_sequences {
for &child_name in sequence {
let plan = supervisor.restart_plan_for_failure(child_name, &err_outcome);
match restart_policy {
RestartPolicy::OneForOne => {
// Whenever a plan exists it must include the failed child.
if let Some(plan) = plan {
assert!(plan.cancel_order.contains(&ChildName::new(child_name)));
assert!(plan.restart_order.contains(&ChildName::new(child_name)));
}
}
RestartPolicy::OneForAll => {
// Whenever a plan exists it must cover the full child set.
if let Some(plan) = plan {
assert_eq!(plan.cancel_order.len(), config.child_count);
assert_eq!(plan.restart_order.len(), config.child_count);
}
}
// RestForOne is not exercised by this test.
RestartPolicy::RestForOne => {}
}
}
}
}
crate::test_complete!("metamorphic_restart_budget_exhaustion_invariance");
}
// Under OneForOne, a child's failure must be fully isolated: the plan for
// each of three siblings names exactly that sibling and nobody else.
#[test]
fn metamorphic_child_failure_isolation() {
init_test("metamorphic_child_failure_isolation");
let supervisor = SupervisorBuilder::new("isolation_test")
.with_restart_policy(RestartPolicy::OneForOne)
.child(ChildSpec::new("child_a", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
))
.child(ChildSpec::new("child_b", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
))
.child(ChildSpec::new("child_c", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
))
.compile()
.unwrap();
let err_outcome: Outcome<(), ()> = Outcome::Err(());
// Plan once per child, explicitly.
let plan_a = supervisor
.restart_plan_for_failure("child_a", &err_outcome)
.unwrap();
let plan_b = supervisor
.restart_plan_for_failure("child_b", &err_outcome)
.unwrap();
let plan_c = supervisor
.restart_plan_for_failure("child_c", &err_outcome)
.unwrap();
assert_eq!(plan_a.cancel_order.len(), 1);
assert_eq!(plan_a.cancel_order[0].as_str(), "child_a");
assert_eq!(plan_a.restart_order[0].as_str(), "child_a");
assert_eq!(plan_b.cancel_order.len(), 1);
assert_eq!(plan_b.cancel_order[0].as_str(), "child_b");
assert_eq!(plan_b.restart_order[0].as_str(), "child_b");
assert_eq!(plan_c.cancel_order.len(), 1);
assert_eq!(plan_c.cancel_order[0].as_str(), "child_c");
assert_eq!(plan_c.restart_order[0].as_str(), "child_c");
// Second pass over the same children; also checks the returned policy.
// NOTE(review): this loop re-asserts what the explicit plans above already
// covered — it is redundant coverage, kept as written.
for &failed_child in &["child_a", "child_b", "child_c"] {
let isolated_plan = supervisor
.restart_plan_for_failure(failed_child, &err_outcome)
.unwrap();
assert_eq!(isolated_plan.cancel_order.len(), 1);
assert_eq!(isolated_plan.restart_order.len(), 1);
assert_eq!(isolated_plan.cancel_order[0].as_str(), failed_child);
assert_eq!(isolated_plan.restart_order[0].as_str(), failed_child);
assert_eq!(isolated_plan.policy, RestartPolicy::OneForOne);
}
crate::test_complete!("metamorphic_child_failure_isolation");
}
// Under OneForAll, plans generated for different failing children must be
// equivalent in shape: same policy, same cardinality, and each plan covers
// the complete child set regardless of which child triggered it.
#[test]
fn metamorphic_restart_policy_commutativity() {
init_test("metamorphic_restart_policy_commutativity");
// Config is built for documentation value only; the builder below
// hard-codes the equivalent four children.
let _config = SupervisionMetamorphicConfig {
child_count: 4,
..SupervisionMetamorphicConfig::default()
};
let supervisor =
SupervisorBuilder::new("commutativity_test")
.with_restart_policy(RestartPolicy::OneForAll)
.child(ChildSpec::new("alpha", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
))
.child(ChildSpec::new("beta", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
))
.child(ChildSpec::new("gamma", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
))
.child(ChildSpec::new("delta", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
))
.compile()
.unwrap();
let err_outcome: Outcome<(), ()> = Outcome::Err(());
let child_names = ["alpha", "beta", "gamma", "delta"];
// One plan per possible failing child.
let mut plans = Vec::new();
for &child_name in &child_names {
let plan = supervisor
.restart_plan_for_failure(child_name, &err_outcome)
.unwrap();
plans.push(plan);
}
// All plans must agree with plan 0 in policy and cardinality.
for i in 1..plans.len() {
assert_eq!(
plans[0].policy, plans[i].policy,
"Plan {} has different policy than plan 0",
i
);
assert_eq!(
plans[0].cancel_order.len(),
plans[i].cancel_order.len(),
"Plan {} has different cancel count than plan 0",
i
);
assert_eq!(
plans[0].restart_order.len(),
plans[i].restart_order.len(),
"Plan {} has different restart count than plan 0",
i
);
assert_eq!(plans[i].cancel_order.len(), child_names.len());
assert_eq!(plans[i].restart_order.len(), child_names.len());
}
// Every plan must mention every child in both orders.
for plan in &plans {
for &expected_child in &child_names {
assert!(
plan.cancel_order
.iter()
.any(|name| name.as_str() == expected_child),
"Plan missing {} in cancel_order",
expected_child
);
assert!(
plan.restart_order
.iter()
.any(|name| name.as_str() == expected_child),
"Plan missing {} in restart_order",
expected_child
);
}
}
crate::test_complete!("metamorphic_restart_policy_commutativity");
}
// Sweeps (escalation policy x restart policy x child count) and checks that
// restart plans keep their policy-specific shape in every combination.
// NOTE(review): `_escalation_policy` is iterated but never applied to the
// builder, so the escalation dimension currently adds no coverage — confirm
// whether SupervisorBuilder should receive it.
#[test]
fn metamorphic_escalation_policy_consistency() {
init_test("metamorphic_escalation_policy_consistency");
let escalation_policies = [EscalationPolicy::Stop, EscalationPolicy::Escalate];
let restart_policies = [RestartPolicy::OneForOne, RestartPolicy::OneForAll];
let child_counts = [1, 3, 5];
for &_escalation_policy in &escalation_policies {
for &restart_policy in &restart_policies {
for &child_count in &child_counts {
let mut builder = SupervisorBuilder::new("escalation_test")
.with_restart_policy(restart_policy);
// Tight budget: one restart per second-long window per child.
for i in 0..child_count {
let child_name = format!("child_{}", i);
builder =
builder.child(ChildSpec::new(&*child_name, noop_start).with_restart(
SupervisionStrategy::Restart(
RestartConfig::new(1, Duration::from_secs(1)), ),
));
}
let supervisor = builder.compile().unwrap();
let err_outcome: Outcome<(), ()> = Outcome::Err(());
for i in 0..child_count {
let child_name = format!("child_{}", i);
let plan_result =
supervisor.restart_plan_for_failure(&child_name, &err_outcome);
match plan_result {
Some(plan) => {
match restart_policy {
RestartPolicy::OneForOne => {
// Only the failed child is touched.
assert_eq!(plan.cancel_order.len(), 1);
assert_eq!(plan.restart_order.len(), 1);
assert_eq!(plan.cancel_order[0].as_str(), child_name);
}
RestartPolicy::OneForAll => {
// Full sibling set is touched.
assert_eq!(plan.cancel_order.len(), child_count);
assert_eq!(plan.restart_order.len(), child_count);
}
RestartPolicy::RestForOne => {
// Not in the sweep; shape-only sanity check.
assert!(!plan.cancel_order.is_empty());
assert!(!plan.restart_order.is_empty());
}
}
assert_eq!(plan.policy, restart_policy);
}
// A denied plan (e.g. budget exhausted) is acceptable here.
None => {
}
}
}
}
}
}
crate::test_complete!("metamorphic_escalation_policy_consistency");
}
// Replay determinism: three independent runs seeded with the same DetRng
// seed must pick the same failure sequence and produce identical plan
// summaries. Any divergence means hidden nondeterminism in planning.
#[test]
fn metamorphic_lab_runtime_replay_determinism() {
init_test("metamorphic_lab_runtime_replay_determinism");
const SEED: u64 = 0xDEADBEEF_CAFEBABE;
let results: Vec<Vec<String>> = (0..3)
.map(|_| {
// Fresh RNG and fresh supervisor per run — only SEED is shared.
let mut rng = crate::util::det_rng::DetRng::new(SEED);
let mut plan_summaries = Vec::new();
let supervisor = SupervisorBuilder::new("determinism_test")
.with_restart_policy(RestartPolicy::OneForOne)
.child(ChildSpec::new("service_a", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
))
.child(ChildSpec::new("service_b", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(3, Duration::from_mins(1))),
))
.compile()
.unwrap();
let err_outcome: Outcome<(), ()> = Outcome::Err(());
let services = ["service_a", "service_b"];
// Ten RNG-driven failures; record a textual summary of each plan.
for _ in 0..10 {
let chosen_service = rng.choose(&services);
if let Some(plan) =
supervisor.restart_plan_for_failure(chosen_service, &err_outcome)
{
plan_summaries.push(format!(
"fail:{} policy:{:?} cancel:{} restart:{}",
chosen_service,
plan.policy,
plan.cancel_order.len(),
plan.restart_order.len()
));
}
}
plan_summaries
})
.collect();
// Every run must match run 0 exactly.
for i in 1..results.len() {
assert_eq!(
results[0], results[i],
"Run {} produced different results than run 0 - determinism broken",
i
);
}
// Guard against the vacuous pass where no plans were produced at all.
assert!(
!results[0].is_empty(),
"Should have generated some supervision plans"
);
crate::test_complete!("metamorphic_lab_runtime_replay_determinism");
}
// Composite invariants under OneForOne: isolation holds even when children
// carry heterogeneous backoff strategies, and planning for the same input
// is idempotent (repeated calls yield equal plans or consistently None).
#[test]
fn metamorphic_composite_supervision_invariants() {
init_test("metamorphic_composite_supervision_invariants");
let config = SupervisionMetamorphicConfig {
child_count: 3,
max_restarts: 2,
restart_window: Duration::from_secs(30),
};
// "primary" uses exponential backoff; the other two use the default.
let supervisor = SupervisorBuilder::new("composite_test")
.with_restart_policy(RestartPolicy::OneForOne)
.child(ChildSpec::new("primary", noop_start).with_restart(
SupervisionStrategy::Restart(
RestartConfig::new(config.max_restarts, config.restart_window).with_backoff(
BackoffStrategy::Exponential {
initial: Duration::from_millis(100),
max: Duration::from_secs(5),
multiplier: 2.0,
},
),
),
))
.child(ChildSpec::new("secondary", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(
config.max_restarts,
config.restart_window,
)),
))
.child(ChildSpec::new("tertiary", noop_start).with_restart(
SupervisionStrategy::Restart(RestartConfig::new(
config.max_restarts,
config.restart_window,
)),
))
.compile()
.unwrap();
let err_outcome: Outcome<(), ()> = Outcome::Err(());
let children = ["primary", "secondary", "tertiary"];
// Isolation: any plan names exactly the failed child, once, in both orders.
for &child_name in &children {
let plan = supervisor.restart_plan_for_failure(child_name, &err_outcome);
if let Some(plan) = plan {
assert_eq!(plan.cancel_order.len(), 1);
assert_eq!(plan.restart_order.len(), 1);
assert_eq!(plan.cancel_order[0].as_str(), child_name);
assert_eq!(plan.restart_order[0].as_str(), child_name);
assert_eq!(plan.policy, RestartPolicy::OneForOne);
assert!(!plan.cancel_order.is_empty());
assert!(!plan.restart_order.is_empty());
assert_eq!(plan.cancel_order.len(), plan.restart_order.len());
}
}
// Idempotence: back-to-back plans for the same failure must agree —
// either both Some with equal contents, or both None.
for _ in 0..5 {
let primary_plan1 = supervisor.restart_plan_for_failure("primary", &err_outcome);
let primary_plan2 = supervisor.restart_plan_for_failure("primary", &err_outcome);
match (primary_plan1, primary_plan2) {
(Some(plan1), Some(plan2)) => {
assert_eq!(plan1.policy, plan2.policy);
assert_eq!(plan1.cancel_order, plan2.cancel_order);
assert_eq!(plan1.restart_order, plan2.restart_order);
}
(None, None) => {
}
_ => {
panic!("Inconsistent restart plan generation for same input");
}
}
}
crate::test_complete!("metamorphic_composite_supervision_invariants");
}
}