use crate::id::types::ChildId;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
/// Thresholds and sliding windows that decide when repeated failures escalate into a fuse.
///
/// Each scope (child, group, supervisor) has a count threshold and a time window. A scope
/// reports a fuse once the number of its failures still inside the window is greater than or
/// equal to the threshold. Because the comparison is `>=`, a threshold of `0` is already met
/// with no recorded failures.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct MeltdownPolicy {
/// Restart count for a single child, within `child_window`, at which `ChildFuse` is reported.
pub child_max_restarts: u32,
/// Sliding window for per-child restarts; entries strictly older than this are pruned.
pub child_window: Duration,
/// Failure count for a single group, within `group_window`, at which `GroupFuse` is reported.
pub group_max_failures: u32,
/// Sliding window for per-group failures; entries strictly older than this are pruned.
pub group_window: Duration,
/// Supervisor-wide failure count, within `supervisor_window`, at which `SupervisorFuse` is
/// reported.
pub supervisor_max_failures: u32,
/// Sliding window for supervisor-wide failures; entries strictly older than this are pruned.
pub supervisor_window: Duration,
/// Quiet period since the last recorded restart after which
/// `MeltdownTracker::reset_if_stable` clears the tracked history.
pub reset_after: Duration,
}
impl MeltdownPolicy {
pub fn new(
child_max_restarts: u32,
child_window: Duration,
group_max_failures: u32,
group_window: Duration,
supervisor_max_failures: u32,
supervisor_window: Duration,
reset_after: Duration,
) -> Self {
Self {
child_max_restarts,
child_window,
group_max_failures,
group_window,
supervisor_max_failures,
supervisor_window,
reset_after,
}
}
}
/// Result of checking recorded failures against a [`MeltdownPolicy`].
///
/// `outcome_severity` in this module ranks the variants
/// `Continue < ChildFuse < GroupFuse < SupervisorFuse`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MeltdownOutcome {
/// No scope reached its threshold.
Continue,
/// A child's restart count reached `child_max_restarts`.
ChildFuse,
/// A group's failure count reached `group_max_failures`.
GroupFuse,
/// The supervisor-wide failure count reached `supervisor_max_failures`.
SupervisorFuse,
}
/// Sliding-window failure history used to decide whether restarts escalate into a child,
/// group or supervisor fuse under a [`MeltdownPolicy`].
#[derive(Debug, Clone)]
pub struct MeltdownTracker {
/// Thresholds and windows applied by every check.
pub policy: MeltdownPolicy,
/// Restart timestamps per child, appended in call order and pruned to `child_window`.
child_failures: HashMap<ChildId, VecDeque<Instant>>,
/// Restart timestamps per group recorded through `record_child_restart_with_group`,
/// pruned to `group_window`.
group_failures: HashMap<String, VecDeque<Instant>>,
/// Separate per-group counters and fuse flags driven by `track_group_failure` and
/// `propagate_fuse`; `clear` does not touch this map.
group_counters: HashMap<String, GroupCounter>,
/// Every restart recorded through `record_child_restart_with_group`, regardless of child or
/// group, pruned to `supervisor_window`.
supervisor_failures: VecDeque<Instant>,
/// Timestamp of the most recent `record_child_restart_with_group` call; `None` initially
/// and after `clear`.
last_failure: Option<Instant>,
}
/// Per-group state maintained by `MeltdownTracker::track_group_failure` and
/// `MeltdownTracker::propagate_fuse`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GroupCounter {
/// Failure timestamps appended by `track_group_failure`; pruned to the policy's
/// `group_window` each time a failure is tracked.
pub failures: VecDeque<Instant>,
/// Set when the group reaches `group_max_failures` or is fused via `propagate_fuse`.
/// NOTE(review): no code shown here sets it back to `false` after construction.
pub fuse_active: bool,
}
impl GroupCounter {
pub fn new() -> Self {
Self {
failures: VecDeque::new(),
fuse_active: false,
}
}
}
impl Default for GroupCounter {
fn default() -> Self {
Self::new()
}
}
impl MeltdownTracker {
pub fn new(policy: MeltdownPolicy) -> Self {
Self {
policy,
child_failures: HashMap::new(),
group_failures: HashMap::new(),
group_counters: HashMap::new(),
supervisor_failures: VecDeque::new(),
last_failure: None,
}
}
#[allow(dead_code)]
pub fn record_child_restart(&mut self, now: Instant) -> MeltdownOutcome {
let synthetic_child = ChildId::new("_default".to_string());
self.record_child_restart_with_group(synthetic_child, None, now)
}
pub fn record_child_restart_with_group(
&mut self,
child_id: ChildId,
group_id: Option<String>,
now: Instant,
) -> MeltdownOutcome {
self.prune(now);
let child_queue = self.child_failures.entry(child_id.clone()).or_default();
child_queue.push_back(now);
if let Some(ref gid) = group_id {
let group_queue = self.group_failures.entry(gid.clone()).or_default();
group_queue.push_back(now);
}
self.supervisor_failures.push_back(now);
self.last_failure = Some(now);
self.evaluate_outcome_for_scopes(&child_id, group_id.as_deref())
}
fn evaluate_outcome_for_scopes(
&self,
child_id: &ChildId,
group_id: Option<&str>,
) -> MeltdownOutcome {
if self.supervisor_failures.len() >= self.policy.supervisor_max_failures as usize {
return MeltdownOutcome::SupervisorFuse;
}
if let Some(gid) = group_id {
let group_count = self.group_failures.get(gid).map_or(0, |q| q.len());
if group_count >= self.policy.group_max_failures as usize {
return MeltdownOutcome::GroupFuse;
}
}
let child_count = self.child_failures.get(child_id).map_or(0, |q| q.len());
if child_count >= self.policy.child_max_restarts as usize {
return MeltdownOutcome::ChildFuse;
}
MeltdownOutcome::Continue
}
pub fn reset_if_stable(&mut self, now: Instant) -> bool {
let Some(last_failure) = self.last_failure else {
return false;
};
if now.duration_since(last_failure) < self.policy.reset_after {
return false;
}
self.clear();
true
}
pub fn clear(&mut self) {
self.child_failures.clear();
self.group_failures.clear();
self.supervisor_failures.clear();
self.last_failure = None;
}
pub fn child_failure_count(&self, child_id: &ChildId) -> usize {
self.child_failures.get(child_id).map_or(0, |q| q.len())
}
pub fn group_failure_count(&self, group_id: &str) -> usize {
self.group_failures.get(group_id).map_or(0, |q| q.len())
}
pub fn track_group_failure(&mut self, group_name: &str, now: Instant) -> MeltdownOutcome {
let counter = self
.group_counters
.entry(group_name.to_string())
.or_default();
counter.failures.push_back(now);
prune_window(&mut counter.failures, now, self.policy.group_window);
let count = counter.failures.len();
if count >= self.policy.group_max_failures as usize {
counter.fuse_active = true;
MeltdownOutcome::GroupFuse
} else {
MeltdownOutcome::Continue
}
}
pub fn group_fuse_active(&self, group_name: &str) -> bool {
self.group_counters
.get(group_name)
.is_some_and(|c| c.fuse_active)
}
pub fn propagate_fuse(
&mut self,
failed_group: &str,
affected_groups: &[String],
) -> Vec<String> {
let mut newly_affected = Vec::new();
for group in affected_groups {
if group == failed_group {
continue;
}
let counter = self.group_counters.entry(group.clone()).or_default();
if !counter.fuse_active {
counter.fuse_active = true;
newly_affected.push(group.clone());
}
}
newly_affected
}
fn prune(&mut self, now: Instant) {
for queue in self.child_failures.values_mut() {
prune_window(queue, now, self.policy.child_window);
}
self.child_failures.retain(|_, v| !v.is_empty());
for queue in self.group_failures.values_mut() {
prune_window(queue, now, self.policy.group_window);
}
self.group_failures.retain(|_, v| !v.is_empty());
prune_window(
&mut self.supervisor_failures,
now,
self.policy.supervisor_window,
);
}
fn evaluate_outcome(&self) -> MeltdownOutcome {
if self.supervisor_failures.len() >= self.policy.supervisor_max_failures as usize {
return MeltdownOutcome::SupervisorFuse;
}
for queue in self.group_failures.values() {
if queue.len() >= self.policy.group_max_failures as usize {
return MeltdownOutcome::GroupFuse;
}
}
for queue in self.child_failures.values() {
if queue.len() >= self.policy.child_max_restarts as usize {
return MeltdownOutcome::ChildFuse;
}
}
MeltdownOutcome::Continue
}
pub fn current_outcome_for_test(&self) -> MeltdownOutcome {
self.evaluate_outcome()
}
pub fn get_group_outcome(&self, group_id: &str) -> MeltdownOutcome {
let count = self.group_failures.get(group_id).map_or(0, |q| q.len());
if count >= self.policy.group_max_failures as usize {
MeltdownOutcome::GroupFuse
} else {
MeltdownOutcome::Continue
}
}
pub fn get_supervisor_outcome(&self) -> MeltdownOutcome {
if self.supervisor_failures.len() >= self.policy.supervisor_max_failures as usize {
MeltdownOutcome::SupervisorFuse
} else {
MeltdownOutcome::Continue
}
}
}
/// Pops entries from the front of `entries` while they are strictly older than `window`
/// relative to `now`.
///
/// Only the front is inspected and pruning stops at the first entry still inside the window,
/// so entries are expected to have been pushed in non-decreasing time order.
fn prune_window(entries: &mut VecDeque<Instant>, now: Instant, window: Duration) {
    while let Some(&oldest) = entries.front() {
        if now.duration_since(oldest) <= window {
            break;
        }
        entries.pop_front();
    }
}
/// One scope's input to `merge_meltdown_verdicts`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LocalVerdict {
/// Whether this scope counts as tripped.
///
/// Only triggered verdicts appear in `MergedVerdict::scopes_triggered`, and only they can
/// become the lead scope.
pub triggered: bool,
/// Outcome proposed by this scope.
///
/// It competes for the merged `effective_outcome` even when `triggered` is `false`.
pub outcome: MeltdownOutcome,
}
/// Combined decision produced by `merge_meltdown_verdicts`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergedVerdict {
/// Most severe outcome among the child, group and supervisor verdicts.
pub effective_outcome: MeltdownOutcome,
/// Scopes whose verdict was triggered, always in child, group, supervisor order.
pub scopes_triggered: Vec<crate::event::payload::MeltdownScope>,
/// First triggered scope (child, then group, then supervisor) whose outcome equals
/// `effective_outcome`; `None` if no triggered verdict carries that outcome.
pub lead_scope: Option<crate::event::payload::MeltdownScope>,
}
/// Combines the child-, group- and supervisor-scope verdicts into one decision.
///
/// * `effective_outcome` is the most severe of the three outcomes, as ranked by
///   `outcome_severity`. The verdicts' `triggered` flags are ignored for this.
/// * `scopes_triggered` lists the scopes whose verdict is `triggered`, in child, group,
///   supervisor order.
/// * `lead_scope` is the first triggered scope, in that same order, whose outcome equals
///   `effective_outcome`. It is `None` when no triggered verdict carries that outcome.
pub fn merge_meltdown_verdicts(
    child_verdict: LocalVerdict,
    group_verdict: LocalVerdict,
    supervisor_verdict: LocalVerdict,
) -> MergedVerdict {
    use crate::event::payload::MeltdownScope;

    // Severity is distinct per variant, so ties can only occur between equal outcomes and the
    // order of this array does not affect the result.
    let effective_outcome = [
        child_verdict.outcome,
        group_verdict.outcome,
        supervisor_verdict.outcome,
    ]
    .iter()
    .copied()
    .max_by_key(|&outcome| outcome_severity(outcome))
    .unwrap_or(MeltdownOutcome::Continue);

    // Each triggered flag becomes an optional scope; chaining keeps child → group → supervisor.
    let scopes_triggered: Vec<MeltdownScope> = child_verdict
        .triggered
        .then_some(MeltdownScope::Child)
        .into_iter()
        .chain(group_verdict.triggered.then_some(MeltdownScope::Group))
        .chain(supervisor_verdict.triggered.then_some(MeltdownScope::Supervisor))
        .collect();

    let leads = |verdict: LocalVerdict| verdict.triggered && verdict.outcome == effective_outcome;
    let lead_scope = if leads(child_verdict) {
        Some(MeltdownScope::Child)
    } else if leads(group_verdict) {
        Some(MeltdownScope::Group)
    } else if leads(supervisor_verdict) {
        Some(MeltdownScope::Supervisor)
    } else {
        None
    };

    MergedVerdict {
        effective_outcome,
        scopes_triggered,
        lead_scope,
    }
}
/// Numeric rank of an outcome: higher means a wider scope is fused.
///
/// The ranking is `Continue` (0) < `ChildFuse` (1) < `GroupFuse` (2) < `SupervisorFuse` (3).
/// The match is exhaustive on purpose, so a new variant forces an explicit ranking.
fn outcome_severity(outcome: MeltdownOutcome) -> u8 {
    match outcome {
        MeltdownOutcome::SupervisorFuse => 3,
        MeltdownOutcome::GroupFuse => 2,
        MeltdownOutcome::ChildFuse => 1,
        MeltdownOutcome::Continue => 0,
    }
}
#[cfg(test)]
mod merge_tests {
    use crate::event::payload::MeltdownScope;
    use crate::policy::meltdown::{LocalVerdict, MeltdownOutcome, merge_meltdown_verdicts};

    /// Shorthand for building a [`LocalVerdict`] in the cases below.
    fn verdict(triggered: bool, outcome: MeltdownOutcome) -> LocalVerdict {
        LocalVerdict { triggered, outcome }
    }

    /// Verdict for a scope that did not trip.
    fn quiet() -> LocalVerdict {
        verdict(false, MeltdownOutcome::Continue)
    }

    #[test]
    fn test_merge_child_only() {
        let merged = merge_meltdown_verdicts(
            verdict(true, MeltdownOutcome::ChildFuse),
            quiet(),
            quiet(),
        );
        assert_eq!(merged.effective_outcome, MeltdownOutcome::ChildFuse);
        assert_eq!(merged.scopes_triggered, vec![MeltdownScope::Child]);
        assert_eq!(merged.lead_scope, Some(MeltdownScope::Child));
    }

    #[test]
    fn test_merge_all_three_uses_strictest_scope() {
        let merged = merge_meltdown_verdicts(
            verdict(true, MeltdownOutcome::ChildFuse),
            verdict(true, MeltdownOutcome::GroupFuse),
            verdict(true, MeltdownOutcome::SupervisorFuse),
        );
        assert_eq!(merged.effective_outcome, MeltdownOutcome::SupervisorFuse);
        assert_eq!(merged.scopes_triggered.len(), 3);
        assert_eq!(merged.lead_scope, Some(MeltdownScope::Supervisor));
    }

    #[test]
    fn test_merge_tie_breaks_matching_effective_outcome() {
        let merged = merge_meltdown_verdicts(
            verdict(true, MeltdownOutcome::GroupFuse),
            verdict(true, MeltdownOutcome::GroupFuse),
            quiet(),
        );
        assert_eq!(merged.effective_outcome, MeltdownOutcome::GroupFuse);
        assert_eq!(merged.lead_scope, Some(MeltdownScope::Child));
    }

    #[test]
    fn test_merge_group_and_supervisor() {
        let merged = merge_meltdown_verdicts(
            quiet(),
            verdict(true, MeltdownOutcome::GroupFuse),
            verdict(true, MeltdownOutcome::SupervisorFuse),
        );
        assert_eq!(merged.effective_outcome, MeltdownOutcome::SupervisorFuse);
        assert_eq!(merged.scopes_triggered.len(), 2);
        assert_eq!(merged.lead_scope, Some(MeltdownScope::Supervisor));
    }

    #[test]
    fn test_merge_none_triggered() {
        let merged = merge_meltdown_verdicts(quiet(), quiet(), quiet());
        assert_eq!(merged.effective_outcome, MeltdownOutcome::Continue);
        assert!(merged.scopes_triggered.is_empty());
        assert_eq!(merged.lead_scope, None);
    }
}