1use crate::error::types::TaskFailure;
12use crate::event::payload::{
13 ColdStartReason, HotLoopReason, MeltdownScope, ProtectionAction, SupervisorEvent,
14 ThrottleGateOwner, What, Where,
15};
16use crate::id::types::{ChildId, SupervisorPath};
17use crate::observe::pipeline::{ObservabilityPipeline, PipelineStage, PipelineStageDiagnostic};
18use crate::policy::backoff::{ColdStartBudget, HotLoopDetector};
19use crate::policy::budget::{BudgetVerdict, RestartBudgetConfig, RestartBudgetTracker};
20use crate::policy::decision::{PolicyFailureKind, TaskExit};
21use crate::policy::failure_window::FailureWindow;
22use crate::policy::group::{GroupDependencyEdge, GroupIsolationPolicy};
23use crate::policy::meltdown::{
24 LocalVerdict, MeltdownOutcome, MeltdownTracker, merge_meltdown_verdicts,
25};
26use crate::policy::task_role_defaults::{
27 EffectivePolicy, OnBudgetExhaustedAction, OnFailureAction, OnSuccessAction, OnTimeoutAction,
28};
29use crate::spec::supervisor::{BackpressureConfig, EscalationPolicy, RestartLimit, SupervisorSpec};
30use crate::tree::builder::SupervisorTree;
31use crate::tree::order::restart_execution_plan;
32use std::time::{Instant, SystemTime};
33
/// Normalised outcome of a child task exit, as seen by the supervision pipeline.
///
/// Produced (or preserved, if already present on the context) by
/// `SupervisionPipeline::stage_classify_exit`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExitClassification {
    /// The task completed successfully (`TaskExit::Succeeded`).
    Success,
    /// The task failed with a failure kind that has no dedicated variant;
    /// `stage_classify_exit` fills `exit_code` with a fixed `-1`.
    NonZeroExit { exit_code: i32 },
    /// The task panicked; `stage_classify_exit` uses the reason `"panic"`.
    Crash { reason: String },
    /// The task failed with `PolicyFailureKind::Timeout`.
    Timeout,
    /// The task was cancelled (`PolicyFailureKind::Cancelled`).
    ExternalCancel,
    /// Deliberate stop. `stage_classify_exit` never derives this variant, so it
    /// is only seen when the caller presets it on the pipeline context.
    ManualStop,
}
50
51impl ExitClassification {
52 pub fn as_str(&self) -> &'static str {
54 match self {
55 Self::Success => "success",
56 Self::NonZeroExit { .. } => "nonzero_exit",
57 Self::Crash { .. } => "panic",
58 Self::Timeout => "timeout",
59 Self::ExternalCancel => "external_cancel",
60 Self::ManualStop => "manual_stop",
61 }
62 }
63
64 pub fn should_restart(&self) -> bool {
66 match self {
67 Self::Success => false,
68 Self::NonZeroExit { .. } => true,
69 Self::Crash { .. } => true,
70 Self::Timeout => true,
71 Self::ExternalCancel => false,
72 Self::ManualStop => false,
73 }
74 }
75}
76
/// Result of the budget-evaluation stage for a single child exit.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BudgetEvaluation {
    /// Restarts left under the effective restart limit (`max_restarts` minus the
    /// current failure-window count, saturating at zero); `None` when no
    /// restart limit applies.
    pub remaining_restarts: Option<u32>,
    /// `true` when the failure-window count is strictly greater than the
    /// effective limit's `max_restarts`.
    pub limit_exhausted: bool,
    /// `Debug` rendering of the effective escalation policy, if any.
    pub escalation_policy: Option<String>,
    /// Merged meltdown outcome; `Continue` whenever no restart was recorded.
    pub meltdown_outcome: MeltdownOutcome,
    /// Verdict from the restart-budget tracker; `None` when the exit was not
    /// restartable and the tracker was not consulted.
    pub budget_verdict: Option<BudgetVerdict>,
}
91
/// Protective action chosen by the decision stage, together with its reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ActionDecision {
    /// Action to apply to the child.
    pub action: ProtectionAction,
    /// Optional delay in milliseconds; `stage_decide_action` currently always
    /// leaves this as `None`.
    pub delay_ms: Option<u64>,
    /// Machine-readable reason label (for example `"hot_loop_detected"`).
    pub reason: String,
}
102
/// State threaded through every stage of the supervision pipeline.
///
/// Each stage takes the context by value, fills in the fields it owns, appends
/// a [`PipelineStageDiagnostic`] and returns the context for the next stage.
#[derive(Debug, Clone)]
pub struct PipelineContext {
    /// Child whose exit is being processed.
    pub child_id: ChildId,
    /// Path of the supervisor that owns the child.
    pub supervisor_path: SupervisorPath,
    /// Restart group of the child; the budget-evaluation stage overwrites it
    /// with the group from the restart execution plan.
    pub group_id: Option<String>,
    /// Exit classification. A value preset by the caller is kept by the
    /// classification stage; otherwise it is derived from the `TaskExit`.
    pub exit_classification: Option<ExitClassification>,
    /// Failure-window snapshot formatted as `"count=…, threshold_reached=…"`;
    /// only set for restartable exits.
    pub failure_window_state: Option<String>,
    /// Output of the budget-evaluation stage.
    pub budget_evaluation: Option<BudgetEvaluation>,
    /// Output of the decision stage.
    pub action_decision: Option<ActionDecision>,
    /// Cold-start budget outcome; stays `NotApplicable` unless a restartable
    /// exit was recorded against the cold-start budget.
    pub cold_start_reason: ColdStartReason,
    /// Set to `RapidCrashDetected` when the hot-loop detector fires.
    pub hot_loop_reason: HotLoopReason,
    /// Throttle gate owner copied onto the emitted event; no stage in this
    /// module modifies it.
    pub throttle_gate_owner: ThrottleGateOwner,
    /// Role-derived policy supplied by the caller; no stage in this module
    /// assigns it.
    pub effective_policy: Option<EffectivePolicy>,
    /// Meltdown scopes that fired during budget evaluation.
    pub scopes_triggered: Vec<MeltdownScope>,
    /// Scope that led the merged meltdown verdict, if any.
    pub lead_scope: Option<MeltdownScope>,
    /// Per-stage diagnostics, appended in execution order.
    pub stage_diagnostics: Vec<PipelineStageDiagnostic>,
    /// Label produced by the execute stage (for example `"restart_denied"`).
    pub execution_result: Option<String>,
    /// Sequence number stamped on diagnostics and on the emitted event.
    pub sequence: u64,
    /// Correlation id. It is parsed as a UUID when the event is emitted, and
    /// the nil UUID is used if parsing fails.
    pub correlation_id: String,
}
141
142impl PipelineContext {
143 pub fn new(
156 child_id: ChildId,
157 supervisor_path: SupervisorPath,
158 sequence: u64,
159 correlation_id: impl Into<String>,
160 ) -> Self {
161 Self {
162 child_id,
163 supervisor_path,
164 group_id: None,
165 exit_classification: None,
166 failure_window_state: None,
167 budget_evaluation: None,
168 action_decision: None,
169 cold_start_reason: ColdStartReason::NotApplicable,
170 hot_loop_reason: HotLoopReason::NotApplicable,
171 throttle_gate_owner: ThrottleGateOwner::None,
172 effective_policy: None,
173 scopes_triggered: Vec::new(),
174 lead_scope: None,
175 stage_diagnostics: Vec::new(),
176 execution_result: None,
177 sequence,
178 correlation_id: correlation_id.into(),
179 }
180 }
181}
182
/// Six-stage supervision pipeline that turns a child exit into a protective
/// action, emits a typed event and records per-stage diagnostics.
#[derive(Debug)]
pub struct SupervisionPipeline {
    /// Event journal and subscriber fan-out that typed events are emitted to.
    pub observability: ObservabilityPipeline,
    /// Tracks child, group and supervisor restart counts for meltdown fuses.
    pub meltdown_tracker: MeltdownTracker,
    /// Window of recent restartable failures; its count drives the restart limit.
    pub failure_window: FailureWindow,
    /// Cold-start restart budget, consulted on every restartable exit.
    pub cold_start_budget: ColdStartBudget,
    /// Rapid-crash detector, consulted on every restartable exit.
    pub hot_loop_detector: HotLoopDetector,
    /// Restart-budget tracker, consumed once per restartable exit.
    pub budget_tracker: RestartBudgetTracker,
    /// Group dependency policy used when propagating group fuses.
    pub group_isolation: GroupIsolationPolicy,
}
201
202impl SupervisionPipeline {
203 pub fn new(
218 journal_capacity: usize,
219 subscriber_capacity: usize,
220 meltdown_tracker: MeltdownTracker,
221 failure_window: FailureWindow,
222 budget_config: RestartBudgetConfig,
223 group_dependencies: Vec<GroupDependencyEdge>,
224 ) -> Self {
225 Self::with_backpressure_config(
226 journal_capacity,
227 subscriber_capacity,
228 meltdown_tracker,
229 failure_window,
230 budget_config,
231 group_dependencies,
232 BackpressureConfig::default(),
233 )
234 }
235
236 pub fn with_backpressure_config(
252 journal_capacity: usize,
253 subscriber_capacity: usize,
254 meltdown_tracker: MeltdownTracker,
255 failure_window: FailureWindow,
256 budget_config: RestartBudgetConfig,
257 group_dependencies: Vec<GroupDependencyEdge>,
258 backpressure_config: BackpressureConfig,
259 ) -> Self {
260 let started_at_secs = current_unix_secs();
261 let now_unix_nanos = SystemTime::now()
262 .duration_since(SystemTime::UNIX_EPOCH)
263 .unwrap_or_default()
264 .as_nanos();
265 Self {
266 observability: ObservabilityPipeline::with_backpressure_config(
267 journal_capacity,
268 subscriber_capacity,
269 true,
270 true,
271 backpressure_config,
272 ),
273 meltdown_tracker,
274 failure_window,
275 cold_start_budget: ColdStartBudget::new(60, 5, started_at_secs),
276 hot_loop_detector: HotLoopDetector::new(10, 3),
277 budget_tracker: RestartBudgetTracker::new(budget_config, now_unix_nanos),
278 group_isolation: GroupIsolationPolicy::new(group_dependencies),
279 }
280 }
281
282 pub fn execute_pipeline(
295 &mut self,
296 mut ctx: PipelineContext,
297 exit: TaskExit,
298 spec: &SupervisorSpec,
299 tree: &SupervisorTree,
300 ) -> PipelineContext {
301 let now = Instant::now();
302 let now_unix_nanos = SystemTime::now()
303 .duration_since(SystemTime::UNIX_EPOCH)
304 .unwrap_or_default()
305 .as_nanos();
306
307 ctx = self.stage_classify_exit(ctx, &exit, now_unix_nanos);
309
310 ctx = self.stage_record_failure_window(ctx, now, now_unix_nanos);
312
313 ctx = self.stage_evaluate_budget(ctx, spec, tree, now, now_unix_nanos);
315
316 ctx = self.stage_decide_action(ctx, now_unix_nanos);
318
319 ctx = self.stage_emit_typed_event(ctx, &exit, now_unix_nanos);
321
322 ctx = self.stage_execute_action(ctx, now_unix_nanos);
324
325 ctx
326 }
327
328 pub(crate) fn stage_classify_exit(
339 &self,
340 mut ctx: PipelineContext,
341 exit: &TaskExit,
342 completed_at_unix_nanos: u128,
343 ) -> PipelineContext {
344 let classification = ctx
345 .exit_classification
346 .clone()
347 .unwrap_or_else(|| match exit {
348 TaskExit::Succeeded => ExitClassification::Success,
349 TaskExit::Failed { kind, .. } => match kind {
350 PolicyFailureKind::Cancelled => ExitClassification::ExternalCancel,
351 PolicyFailureKind::Panic => ExitClassification::Crash {
352 reason: "panic".to_string(),
353 },
354 PolicyFailureKind::Timeout => ExitClassification::Timeout,
355 _ => ExitClassification::NonZeroExit { exit_code: -1 },
356 },
357 });
358
359 ctx.exit_classification = Some(classification);
360 append_stage_diagnostic(
361 &mut ctx,
362 PipelineStage::ClassifyExit,
363 completed_at_unix_nanos,
364 );
365 ctx
366 }
367
368 fn stage_record_failure_window(
379 &mut self,
380 mut ctx: PipelineContext,
381 now: Instant,
382 completed_at_unix_nanos: u128,
383 ) -> PipelineContext {
384 if let Some(ref classification) = ctx.exit_classification
386 && classification.should_restart()
387 {
388 let state = self.failure_window.record_failure(now);
389 ctx.failure_window_state = Some(format!(
390 "count={}, threshold_reached={}",
391 state.current_count, state.threshold_reached
392 ));
393 }
394 append_stage_diagnostic(
395 &mut ctx,
396 PipelineStage::RecordFailureWindow,
397 completed_at_unix_nanos,
398 );
399 ctx
400 }
401
402 fn stage_evaluate_budget(
414 &mut self,
415 mut ctx: PipelineContext,
416 spec: &SupervisorSpec,
417 tree: &SupervisorTree,
418 now: Instant,
419 completed_at_unix_nanos: u128,
420 ) -> PipelineContext {
421 let plan = restart_execution_plan(tree, spec, &ctx.child_id);
423
424 let restart_failure_count = self.failure_window.failure_count() as u32;
425 let restart_limit = effective_restart_limit(&ctx, plan.restart_limit);
426 let escalation_policy = effective_escalation_policy(&ctx, plan.escalation_policy);
427 let remaining =
428 restart_limit.map(|limit| limit.max_restarts.saturating_sub(restart_failure_count));
429
430 let limit_exhausted =
431 restart_limit.is_some_and(|limit| restart_failure_count > limit.max_restarts);
432 let group_id = plan.group.clone();
433 let should_restart = ctx
434 .exit_classification
435 .as_ref()
436 .is_some_and(ExitClassification::should_restart);
437
438 let budget_verdict = if should_restart {
440 Some(self.budget_tracker.try_consume(completed_at_unix_nanos))
441 } else {
442 None
443 };
444
445 let budget_exhausted = budget_verdict
447 .as_ref()
448 .is_some_and(|v| matches!(v, BudgetVerdict::Exhausted { .. }));
449 let effective_should_restart = should_restart && !budget_exhausted;
450
451 let now_secs = nanos_to_secs(completed_at_unix_nanos);
452 if should_restart {
453 let exhausted = self.cold_start_budget.record_restart(now_secs);
454 ctx.cold_start_reason = if exhausted {
455 ColdStartReason::BudgetExhausted
456 } else if self.cold_start_budget.is_window_active(now_secs) {
457 ColdStartReason::InitialStartup
458 } else {
459 ColdStartReason::NotApplicable
460 };
461
462 if self.hot_loop_detector.record_crash(now_secs) {
463 ctx.hot_loop_reason = HotLoopReason::RapidCrashDetected;
464 }
465 }
466
467 let meltdown_outcome = if effective_should_restart {
468 self.meltdown_tracker.record_child_restart_with_group(
469 ctx.child_id.clone(),
470 group_id.clone(),
471 now,
472 );
473 let merged = merge_meltdown_verdicts(
474 child_local_verdict(&self.meltdown_tracker, &ctx.child_id),
475 group_local_verdict(&self.meltdown_tracker, group_id.as_deref()),
476 supervisor_local_verdict(&self.meltdown_tracker),
477 );
478 ctx.scopes_triggered = merged.scopes_triggered;
479 ctx.lead_scope = merged.lead_scope;
480 merged.effective_outcome
481 } else {
482 MeltdownOutcome::Continue
483 };
484
485 if matches!(meltdown_outcome, MeltdownOutcome::GroupFuse)
488 && let Some(ref gid) = group_id
489 {
490 let affected: Vec<String> = ctx
492 .group_id
493 .iter()
494 .filter(|g| self.group_isolation.affected_by(g, gid))
495 .cloned()
496 .collect();
497 if !affected.is_empty() {
498 self.meltdown_tracker.propagate_fuse(gid, &affected);
499 }
500 }
501
502 if let Some(ref policy) = ctx.effective_policy {
504 use crate::policy::task_role_defaults::SeverityClass;
505 match policy.severity {
506 SeverityClass::Critical => {
507 ctx.stage_diagnostics.push(PipelineStageDiagnostic::new(
509 ctx.sequence,
510 ctx.correlation_id.clone(),
511 PipelineStage::EvaluateBudget,
512 completed_at_unix_nanos,
513 ));
514 }
515 SeverityClass::Optional => {
516 }
518 SeverityClass::Standard => {
519 }
521 }
522 }
523
524 ctx.budget_evaluation = Some(BudgetEvaluation {
525 remaining_restarts: remaining,
526 limit_exhausted,
527 escalation_policy: escalation_policy.map(|policy| format!("{policy:?}")),
528 meltdown_outcome,
529 budget_verdict,
530 });
531
532 ctx.group_id = group_id;
534
535 append_stage_diagnostic(
536 &mut ctx,
537 PipelineStage::EvaluateBudget,
538 completed_at_unix_nanos,
539 );
540 ctx
541 }
542
543 pub(crate) fn stage_decide_action(
553 &self,
554 mut ctx: PipelineContext,
555 completed_at_unix_nanos: u128,
556 ) -> PipelineContext {
557 let classification = ctx.exit_classification.as_ref();
558 let budget = ctx.budget_evaluation.as_ref();
559
560 let (mut action, mut reason) = match classification {
561 Some(ExitClassification::ExternalCancel) | Some(ExitClassification::ManualStop) => (
562 ProtectionAction::SupervisedStop,
563 "external_cancel_or_manual_stop".to_string(),
564 ),
565 Some(classification) => {
566 role_or_budget_action(classification, ctx.effective_policy.as_ref(), budget)
567 }
568 None => budget_action(ctx.effective_policy.as_ref(), budget),
569 };
570
571 if let Some(budget_eval) = budget {
572 let meltdown_action = protection_action_for_meltdown(budget_eval.meltdown_outcome);
573 if meltdown_action > action {
574 action = meltdown_action;
575 reason = meltdown_reason(action).to_string();
576 }
577 }
578 if ctx.cold_start_reason == ColdStartReason::BudgetExhausted
579 && ProtectionAction::RestartDenied > action
580 {
581 action = ProtectionAction::RestartDenied;
582 reason = "cold_start_budget_exhausted".to_string();
583 }
584 if ctx.hot_loop_reason != HotLoopReason::NotApplicable
585 && ProtectionAction::SupervisionPaused > action
586 {
587 action = ProtectionAction::SupervisionPaused;
588 reason = "hot_loop_detected".to_string();
589 }
590
591 ctx.action_decision = Some(ActionDecision {
592 action,
593 delay_ms: None,
594 reason,
595 });
596
597 append_stage_diagnostic(
598 &mut ctx,
599 PipelineStage::DecideAction,
600 completed_at_unix_nanos,
601 );
602 ctx
603 }
604
605 fn stage_emit_typed_event(
625 &mut self,
626 ctx: PipelineContext,
627 exit: &TaskExit,
628 now_unix_nanos: u128,
629 ) -> PipelineContext {
630 let what = self.build_policy_aware_what(&ctx, exit);
632
633 let location = Where::new(ctx.supervisor_path.clone())
635 .with_child(ctx.child_id.clone(), "pipeline-child");
636
637 let event_correlation_id = crate::event::time::CorrelationId::from_uuid(
638 uuid::Uuid::parse_str(&ctx.correlation_id).unwrap_or(uuid::Uuid::nil()),
639 );
640 let mut event = SupervisorEvent::new(
641 crate::event::time::When::new(crate::event::time::EventTime::deterministic(
642 now_unix_nanos,
643 now_unix_nanos,
644 0,
645 crate::id::types::Generation::initial(),
646 crate::id::types::ChildStartCount::first(),
647 )),
648 location,
649 what,
650 crate::event::time::EventSequence::new(ctx.sequence),
651 event_correlation_id,
652 1,
653 );
654
655 event.effective_protective_action = ctx.action_decision.as_ref().map(|d| d.action);
657 event.cold_start_reason = ctx.cold_start_reason.clone();
658 event.hot_loop_reason = ctx.hot_loop_reason.clone();
659 event.throttle_gate_owner = ctx.throttle_gate_owner.clone();
660 event.scopes_triggered = ctx.scopes_triggered.clone();
661 event.lead_scope = ctx.lead_scope;
662 if let Some(effective_policy) = ctx.effective_policy.as_ref() {
663 event.task_role = Some(effective_policy.task_role);
664 event.used_fallback_default = effective_policy.used_fallback;
665 event.effective_policy_source = Some(effective_policy.source);
666 }
667
668 let _lagged = self.observability.emit(event);
670
671 let mut ctx = ctx;
672 append_stage_diagnostic(&mut ctx, PipelineStage::EmitTypedEvent, now_unix_nanos);
673 ctx
674 }
675
676 fn build_policy_aware_what(&self, ctx: &PipelineContext, exit: &TaskExit) -> What {
693 if let Some(ref budget_eval) = ctx.budget_evaluation
695 && let Some(ref verdict) = budget_eval.budget_verdict
696 && let BudgetVerdict::Exhausted { retry_after_ns } = verdict
697 {
698 tracing::warn!(
702 target: "rust_supervisor::policy::budget",
703 child_id = %ctx.child_id,
704 retry_after_ns = %retry_after_ns,
705 "BudgetExhausted rate alert: check restart_budget configuration \
706 (threshold: >10 events/minute indicates budget may be too tight)"
707 );
708
709 return What::BudgetExhausted {
710 child_id: ctx.child_id.clone(),
711 retry_after_ns: *retry_after_ns,
712 budget_source_group: ctx.group_id.clone(),
713 };
714 }
715
716 if let Some(ref budget_eval) = ctx.budget_evaluation
718 && matches!(
719 budget_eval.meltdown_outcome,
720 crate::policy::meltdown::MeltdownOutcome::GroupFuse
721 )
722 {
723 return What::GroupFuseTriggered {
724 group_name: ctx
725 .group_id
726 .clone()
727 .unwrap_or_else(|| "unknown".to_string()),
728 propagated_from_group: None,
729 };
730 }
731
732 if let Some(ref policy) = ctx.effective_policy {
734 use crate::policy::task_role_defaults::SeverityClass;
735 match policy.severity {
736 SeverityClass::Critical | SeverityClass::Optional => {
737 let budget_verdict_str = ctx
738 .budget_evaluation
739 .as_ref()
740 .and_then(|be| be.budget_verdict.as_ref())
741 .map(|v| match v {
742 BudgetVerdict::Granted => "granted".to_string(),
743 BudgetVerdict::Exhausted { retry_after_ns } => {
744 format!("exhausted:retry_after_ns={retry_after_ns}")
745 }
746 });
747 let fuse_outcome_str = ctx
748 .budget_evaluation
749 .as_ref()
750 .map(|be| format!("{:?}", be.meltdown_outcome));
751 return What::EscalationBifurcated {
752 severity: format!("{:?}", policy.severity),
753 budget_verdict: budget_verdict_str,
754 fuse_outcome: fuse_outcome_str,
755 tie_break_reason: None,
756 };
757 }
758 SeverityClass::Standard => {
759 }
761 }
762 }
763
764 match exit {
766 TaskExit::Succeeded => What::ChildRunning { transition: None },
767 TaskExit::Failed { .. } => What::ChildFailed {
768 failure: TaskFailure::new(
769 crate::error::types::TaskFailureKind::Error,
770 "pipeline_exit",
771 "processed through six-stage pipeline",
772 ),
773 },
774 }
775 }
776
777 fn stage_execute_action(
787 &self,
788 mut ctx: PipelineContext,
789 completed_at_unix_nanos: u128,
790 ) -> PipelineContext {
791 ctx.execution_result = if let Some(ref decision) = ctx.action_decision {
792 Some(match decision.action {
793 ProtectionAction::RestartAllowed => "restart_allowed_for_runtime".to_string(),
794 ProtectionAction::RestartQueued => "restart_queued".to_string(),
795 ProtectionAction::RestartDenied => "restart_denied".to_string(),
796 ProtectionAction::SupervisionPaused => "supervision_paused".to_string(),
797 ProtectionAction::Escalated => "escalated".to_string(),
798 ProtectionAction::SupervisedStop => "supervised_stop".to_string(),
799 })
800 } else {
801 Some("no_decision".to_string())
802 };
803
804 append_stage_diagnostic(
805 &mut ctx,
806 PipelineStage::ExecuteAction,
807 completed_at_unix_nanos,
808 );
809 ctx
810 }
811}
812
813fn effective_restart_limit(
824 ctx: &PipelineContext,
825 plan_limit: Option<RestartLimit>,
826) -> Option<RestartLimit> {
827 plan_limit.or_else(|| {
828 ctx.effective_policy
829 .as_ref()
830 .and_then(|policy| policy.policy_pack.default_restart_limit)
831 })
832}
833
834fn effective_escalation_policy(
845 ctx: &PipelineContext,
846 plan_policy: Option<EscalationPolicy>,
847) -> Option<EscalationPolicy> {
848 plan_policy.or_else(|| {
849 ctx.effective_policy
850 .as_ref()
851 .and_then(|policy| policy.policy_pack.default_escalation_policy)
852 })
853}
854
855fn role_or_budget_action(
867 classification: &ExitClassification,
868 effective_policy: Option<&EffectivePolicy>,
869 budget: Option<&BudgetEvaluation>,
870) -> (ProtectionAction, String) {
871 let Some(effective_policy) = effective_policy else {
872 return budget_action(None, budget);
873 };
874 match classification {
875 ExitClassification::Success => match effective_policy.policy_pack.on_success_exit {
876 OnSuccessAction::Restart => (
877 ProtectionAction::RestartAllowed,
878 "role_success_restart".to_string(),
879 ),
880 OnSuccessAction::Stop | OnSuccessAction::NoOp => (
881 ProtectionAction::SupervisedStop,
882 "role_success_stop".to_string(),
883 ),
884 },
885 ExitClassification::Timeout => match effective_policy.policy_pack.on_timeout {
886 OnTimeoutAction::RestartWithBackoff => budget_action(Some(effective_policy), budget),
887 OnTimeoutAction::StopAndEscalate => (
888 ProtectionAction::Escalated,
889 "role_timeout_escalate".to_string(),
890 ),
891 },
892 ExitClassification::NonZeroExit { .. } | ExitClassification::Crash { .. } => {
893 match effective_policy.policy_pack.on_failure_exit {
894 OnFailureAction::RestartWithBackoff | OnFailureAction::RestartPermanent => {
895 budget_action(Some(effective_policy), budget)
896 }
897 OnFailureAction::StopAndEscalate => (
898 ProtectionAction::Escalated,
899 "role_failure_escalate".to_string(),
900 ),
901 }
902 }
903 ExitClassification::ExternalCancel | ExitClassification::ManualStop => (
904 ProtectionAction::SupervisedStop,
905 "external_cancel_or_manual_stop".to_string(),
906 ),
907 }
908}
909
910fn budget_action(
921 effective_policy: Option<&EffectivePolicy>,
922 budget: Option<&BudgetEvaluation>,
923) -> (ProtectionAction, String) {
924 let Some(budget_eval) = budget else {
925 return (
926 ProtectionAction::RestartAllowed,
927 "within_restart_budget".to_string(),
928 );
929 };
930 if !budget_eval.limit_exhausted {
931 return (
932 ProtectionAction::RestartAllowed,
933 "within_restart_budget".to_string(),
934 );
935 }
936 match effective_policy
937 .map(|policy| policy.policy_pack.on_budget_exhausted)
938 .unwrap_or(OnBudgetExhaustedAction::Quarantine)
939 {
940 OnBudgetExhaustedAction::StopAndEscalate => (
941 ProtectionAction::Escalated,
942 "restart_limit_exhausted".to_string(),
943 ),
944 OnBudgetExhaustedAction::Quarantine => (
945 ProtectionAction::RestartDenied,
946 "restart_limit_exhausted".to_string(),
947 ),
948 }
949}
950
951fn meltdown_reason(action: ProtectionAction) -> &'static str {
961 match action {
962 ProtectionAction::RestartDenied => "meltdown_child_fuse",
963 ProtectionAction::SupervisionPaused => "meltdown_group_fuse",
964 ProtectionAction::Escalated => "meltdown_supervisor_fuse",
965 ProtectionAction::RestartAllowed => "within_restart_budget",
966 ProtectionAction::RestartQueued => "restart_queued_by_throttle",
967 ProtectionAction::SupervisedStop => "external_cancel_or_manual_stop",
968 }
969}
970
971fn append_stage_diagnostic(
983 ctx: &mut PipelineContext,
984 stage: PipelineStage,
985 completed_at_unix_nanos: u128,
986) {
987 let mut diagnostic = PipelineStageDiagnostic::new(
988 ctx.sequence,
989 ctx.correlation_id.clone(),
990 stage,
991 completed_at_unix_nanos,
992 )
993 .with_child_id(ctx.child_id.value.clone())
994 .with_supervisor_path(ctx.supervisor_path.to_string());
995
996 diagnostic.group_id = ctx.group_id.clone();
997 diagnostic.exit_classification = ctx
998 .exit_classification
999 .as_ref()
1000 .map(|classification| classification.as_str().to_string());
1001 diagnostic.failure_window_state = ctx.failure_window_state.clone();
1002 diagnostic.budget_evaluation = ctx.budget_evaluation.as_ref().map(|budget| {
1003 format!(
1004 "remaining_restarts={:?}, limit_exhausted={}, escalation_policy={:?}, meltdown_outcome={:?}",
1005 budget.remaining_restarts,
1006 budget.limit_exhausted,
1007 budget.escalation_policy,
1008 budget.meltdown_outcome
1009 )
1010 });
1011 diagnostic.decided_action = ctx
1012 .action_decision
1013 .as_ref()
1014 .map(|decision| decision.action.to_string());
1015 diagnostic.event_emitted = stage == PipelineStage::EmitTypedEvent;
1016 diagnostic.execution_result = ctx.execution_result.clone();
1017
1018 ctx.stage_diagnostics.push(diagnostic);
1019}
1020
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn current_unix_secs() -> u64 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1035
/// Converts unix nanoseconds to whole seconds.
///
/// The division truncates toward zero, and results that do not fit in a
/// `u64` saturate at `u64::MAX`.
fn nanos_to_secs(nanos: u128) -> u64 {
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    u64::try_from(nanos / NANOS_PER_SEC).unwrap_or(u64::MAX)
}
1048
1049fn child_local_verdict(tracker: &MeltdownTracker, child_id: &ChildId) -> LocalVerdict {
1060 let triggered =
1061 tracker.child_failure_count(child_id) >= tracker.policy.child_max_restarts as usize;
1062 LocalVerdict {
1063 triggered,
1064 outcome: if triggered {
1065 MeltdownOutcome::ChildFuse
1066 } else {
1067 MeltdownOutcome::Continue
1068 },
1069 }
1070}
1071
1072fn group_local_verdict(tracker: &MeltdownTracker, group_id: Option<&str>) -> LocalVerdict {
1083 let triggered = group_id.is_some_and(|group| {
1084 tracker.group_failure_count(group) >= tracker.policy.group_max_failures as usize
1085 });
1086 LocalVerdict {
1087 triggered,
1088 outcome: if triggered {
1089 MeltdownOutcome::GroupFuse
1090 } else {
1091 MeltdownOutcome::Continue
1092 },
1093 }
1094}
1095
1096fn supervisor_local_verdict(tracker: &MeltdownTracker) -> LocalVerdict {
1106 let triggered = tracker.get_supervisor_outcome() == MeltdownOutcome::SupervisorFuse;
1107 LocalVerdict {
1108 triggered,
1109 outcome: if triggered {
1110 MeltdownOutcome::SupervisorFuse
1111 } else {
1112 MeltdownOutcome::Continue
1113 },
1114 }
1115}
1116
1117fn protection_action_for_meltdown(outcome: MeltdownOutcome) -> ProtectionAction {
1127 match outcome {
1128 MeltdownOutcome::Continue => ProtectionAction::RestartAllowed,
1129 MeltdownOutcome::ChildFuse => ProtectionAction::RestartDenied,
1130 MeltdownOutcome::GroupFuse => ProtectionAction::SupervisionPaused,
1131 MeltdownOutcome::SupervisorFuse => ProtectionAction::Escalated,
1132 }
1133}