Skip to main content

rust_supervisor/runtime/
pipeline.rs

//! Six-stage supervision pipeline orchestration.
//!
//! This module implements the unified failure-processing pipeline:
//! 1. **classify exit**: classify the exit reason and category.
//! 2. **record failure window**: record the failure into the sliding window.
//! 3. **evaluate budget**: evaluate the restart budget and limits from `restart_execution_plan`.
//! 4. **decide action**: decide the protective action based on the merged verdicts.
//! 5. **emit typed event**: emit a structured supervision event with all diagnostic fields.
//! 6. **execute action**: execute the decided action (restart, queue, deny, etc.).
10
11use crate::error::types::TaskFailure;
12use crate::event::payload::{
13    ColdStartReason, HotLoopReason, MeltdownScope, ProtectionAction, SupervisorEvent,
14    ThrottleGateOwner, What, Where,
15};
16use crate::id::types::{ChildId, SupervisorPath};
17use crate::observe::pipeline::{ObservabilityPipeline, PipelineStage, PipelineStageDiagnostic};
18use crate::policy::backoff::{ColdStartBudget, HotLoopDetector};
19use crate::policy::budget::{BudgetVerdict, RestartBudgetConfig, RestartBudgetTracker};
20use crate::policy::decision::{PolicyFailureKind, TaskExit};
21use crate::policy::failure_window::FailureWindow;
22use crate::policy::group::{GroupDependencyEdge, GroupIsolationPolicy};
23use crate::policy::meltdown::{
24    LocalVerdict, MeltdownOutcome, MeltdownTracker, merge_meltdown_verdicts,
25};
26use crate::policy::task_role_defaults::{
27    EffectivePolicy, OnBudgetExhaustedAction, OnFailureAction, OnSuccessAction, OnTimeoutAction,
28};
29use crate::spec::supervisor::{BackpressureConfig, EscalationPolicy, RestartLimit, SupervisorSpec};
30use crate::tree::builder::SupervisorTree;
31use crate::tree::order::restart_execution_plan;
32use std::time::{Instant, SystemTime};
33
/// Category assigned to a child exit by stage 1 of the pipeline.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExitClassification {
    /// The task completed successfully.
    Success,
    /// The task exited with a non-zero exit code.
    NonZeroExit { exit_code: i32 },
    /// The process crashed or panicked; `reason` describes the crash.
    Crash { reason: String },
    /// The task exceeded its timeout.
    Timeout,
    /// Cancellation was requested from outside the task.
    ExternalCancel,
    /// An operator requested a manual stop.
    ManualStop,
}

impl ExitClassification {
    /// Returns the static diagnostic label for this classification.
    ///
    /// Note that [`ExitClassification::Crash`] is reported as `"panic"`.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Success => "success",
            Self::NonZeroExit { .. } => "nonzero_exit",
            Self::Crash { .. } => "panic",
            Self::Timeout => "timeout",
            Self::ExternalCancel => "external_cancel",
            Self::ManualStop => "manual_stop",
        }
    }

    /// Reports whether this kind of exit should trigger an automatic restart.
    ///
    /// Failures (non-zero exit, crash, timeout) are restartable; a successful
    /// completion and the two deliberate stops are not.
    pub fn should_restart(&self) -> bool {
        match self {
            Self::NonZeroExit { .. } | Self::Crash { .. } | Self::Timeout => true,
            Self::Success | Self::ExternalCancel | Self::ManualStop => false,
        }
    }
}
76
77/// Budget evaluation result from stage 3.
78#[derive(Debug, Clone, PartialEq, Eq)]
79pub struct BudgetEvaluation {
80    /// Remaining restart count before limit is reached.
81    pub remaining_restarts: Option<u32>,
82    /// Whether the restart limit has been exhausted.
83    pub limit_exhausted: bool,
84    /// Escalation policy if defined.
85    pub escalation_policy: Option<String>,
86    /// Effective meltdown outcome after merging local verdicts.
87    pub meltdown_outcome: MeltdownOutcome,
88    /// Budget verdict from the token bucket check (Granted or Exhausted).
89    pub budget_verdict: Option<BudgetVerdict>,
90}
91
92/// Final decision from stage 4.
93#[derive(Debug, Clone, PartialEq, Eq)]
94pub struct ActionDecision {
95    /// The chosen protection action.
96    pub action: ProtectionAction,
97    /// Optional delay before execution.
98    pub delay_ms: Option<u64>,
99    /// Reason for the decision.
100    pub reason: String,
101}
102
103/// Complete pipeline context carrying state through all six stages.
104#[derive(Debug, Clone)]
105pub struct PipelineContext {
106    /// Child identifier being supervised.
107    pub child_id: ChildId,
108    /// Supervisor path owning the scope.
109    pub supervisor_path: SupervisorPath,
110    /// Group identifier if the child belongs to a group.
111    pub group_id: Option<String>,
112    /// Exit classification from stage 1.
113    pub exit_classification: Option<ExitClassification>,
114    /// Failure window state from stage 2.
115    pub failure_window_state: Option<String>,
116    /// Budget evaluation from stage 3.
117    pub budget_evaluation: Option<BudgetEvaluation>,
118    /// Action decision from stage 4.
119    pub action_decision: Option<ActionDecision>,
120    /// Cold start reason determined during evaluation.
121    pub cold_start_reason: ColdStartReason,
122    /// Hot loop reason determined during detection.
123    pub hot_loop_reason: HotLoopReason,
124    /// Throttle gate owner that limited concurrent restarts.
125    pub throttle_gate_owner: ThrottleGateOwner,
126    /// Effective role policy applied to this pipeline run.
127    pub effective_policy: Option<EffectivePolicy>,
128    /// Meltdown scopes that triggered in this pipeline round.
129    pub scopes_triggered: Vec<MeltdownScope>,
130    /// Dominant meltdown scope selected for attribution.
131    pub lead_scope: Option<MeltdownScope>,
132    /// Stage diagnostics emitted by the six-stage pipeline.
133    pub stage_diagnostics: Vec<PipelineStageDiagnostic>,
134    /// Result summary produced by the execute action stage.
135    pub execution_result: Option<String>,
136    /// Event sequence number.
137    pub sequence: u64,
138    /// Correlation identifier.
139    pub correlation_id: String,
140}
141
142impl PipelineContext {
143    /// Creates a new pipeline context.
144    ///
145    /// # Arguments
146    ///
147    /// - `child_id`: Child identifier.
148    /// - `supervisor_path`: Supervisor path.
149    /// - `sequence`: Event sequence number.
150    /// - `correlation_id`: Correlation identifier.
151    ///
152    /// # Returns
153    ///
154    /// Returns a new [`PipelineContext`].
155    pub fn new(
156        child_id: ChildId,
157        supervisor_path: SupervisorPath,
158        sequence: u64,
159        correlation_id: impl Into<String>,
160    ) -> Self {
161        Self {
162            child_id,
163            supervisor_path,
164            group_id: None,
165            exit_classification: None,
166            failure_window_state: None,
167            budget_evaluation: None,
168            action_decision: None,
169            cold_start_reason: ColdStartReason::NotApplicable,
170            hot_loop_reason: HotLoopReason::NotApplicable,
171            throttle_gate_owner: ThrottleGateOwner::None,
172            effective_policy: None,
173            scopes_triggered: Vec::new(),
174            lead_scope: None,
175            stage_diagnostics: Vec::new(),
176            execution_result: None,
177            sequence,
178            correlation_id: correlation_id.into(),
179        }
180    }
181}
182
183/// Six-stage supervision pipeline orchestrator.
184#[derive(Debug)]
185pub struct SupervisionPipeline {
186    /// Observability pipeline for event emission.
187    pub observability: ObservabilityPipeline,
188    /// Meltdown tracker for failure counting.
189    pub meltdown_tracker: MeltdownTracker,
190    /// Failure window for sliding accumulation.
191    pub failure_window: FailureWindow,
192    /// Cold start restart budget for initial startup protection.
193    pub cold_start_budget: ColdStartBudget,
194    /// Hot loop detector for rapid crash-restart cycles.
195    pub hot_loop_detector: HotLoopDetector,
196    /// Restart budget tracker for effective restart rate limiting.
197    pub budget_tracker: RestartBudgetTracker,
198    /// Group isolation policy for cross-group fault boundary enforcement.
199    pub group_isolation: GroupIsolationPolicy,
200}
201
202impl SupervisionPipeline {
203    /// Creates a new supervision pipeline.
204    ///
205    /// # Arguments
206    ///
207    /// - `journal_capacity`: Event journal capacity.
208    /// - `subscriber_capacity`: Subscriber queue capacity.
209    /// - `meltdown_tracker`: Configured meltdown tracker.
210    /// - `failure_window`: Configured failure window.
211    /// - `budget_config`: Restart budget configuration.
212    /// - `group_dependencies`: Declared group dependency edges.
213    ///
214    /// # Returns
215    ///
216    /// Returns a new [`SupervisionPipeline`].
217    pub fn new(
218        journal_capacity: usize,
219        subscriber_capacity: usize,
220        meltdown_tracker: MeltdownTracker,
221        failure_window: FailureWindow,
222        budget_config: RestartBudgetConfig,
223        group_dependencies: Vec<GroupDependencyEdge>,
224    ) -> Self {
225        Self::with_backpressure_config(
226            journal_capacity,
227            subscriber_capacity,
228            meltdown_tracker,
229            failure_window,
230            budget_config,
231            group_dependencies,
232            BackpressureConfig::default(),
233        )
234    }
235
236    /// Creates a new supervision pipeline with explicit backpressure policy.
237    ///
238    /// # Arguments
239    ///
240    /// - `journal_capacity`: Event journal capacity.
241    /// - `subscriber_capacity`: Subscriber queue capacity.
242    /// - `meltdown_tracker`: Configured meltdown tracker.
243    /// - `failure_window`: Configured failure window.
244    /// - `budget_config`: Restart budget configuration.
245    /// - `group_dependencies`: Declared group dependency edges.
246    /// - `backpressure_config`: Backpressure thresholds and strategy.
247    ///
248    /// # Returns
249    ///
250    /// Returns a new [`SupervisionPipeline`].
251    pub fn with_backpressure_config(
252        journal_capacity: usize,
253        subscriber_capacity: usize,
254        meltdown_tracker: MeltdownTracker,
255        failure_window: FailureWindow,
256        budget_config: RestartBudgetConfig,
257        group_dependencies: Vec<GroupDependencyEdge>,
258        backpressure_config: BackpressureConfig,
259    ) -> Self {
260        let started_at_secs = current_unix_secs();
261        let now_unix_nanos = SystemTime::now()
262            .duration_since(SystemTime::UNIX_EPOCH)
263            .unwrap_or_default()
264            .as_nanos();
265        Self {
266            observability: ObservabilityPipeline::with_backpressure_config(
267                journal_capacity,
268                subscriber_capacity,
269                true,
270                true,
271                backpressure_config,
272            ),
273            meltdown_tracker,
274            failure_window,
275            cold_start_budget: ColdStartBudget::new(60, 5, started_at_secs),
276            hot_loop_detector: HotLoopDetector::new(10, 3),
277            budget_tracker: RestartBudgetTracker::new(budget_config, now_unix_nanos),
278            group_isolation: GroupIsolationPolicy::new(group_dependencies),
279        }
280    }
281
282    /// Executes the complete six-stage pipeline for a child exit.
283    ///
284    /// # Arguments
285    ///
286    /// - `ctx`: Pipeline context with child information.
287    /// - `exit`: The task exit to process.
288    /// - `spec`: Supervisor specification for restart_execution_plan.
289    /// - `tree`: Supervisor tree for scope calculation.
290    ///
291    /// # Returns
292    ///
293    /// Returns the updated pipeline context with all stage results.
294    pub fn execute_pipeline(
295        &mut self,
296        mut ctx: PipelineContext,
297        exit: TaskExit,
298        spec: &SupervisorSpec,
299        tree: &SupervisorTree,
300    ) -> PipelineContext {
301        let now = Instant::now();
302        let now_unix_nanos = SystemTime::now()
303            .duration_since(SystemTime::UNIX_EPOCH)
304            .unwrap_or_default()
305            .as_nanos();
306
307        // Stage 1: Classify Exit
308        ctx = self.stage_classify_exit(ctx, &exit, now_unix_nanos);
309
310        // Stage 2: Record Failure Window
311        ctx = self.stage_record_failure_window(ctx, now, now_unix_nanos);
312
313        // Stage 3: Evaluate Budget
314        ctx = self.stage_evaluate_budget(ctx, spec, tree, now, now_unix_nanos);
315
316        // Stage 4: Decide Action
317        ctx = self.stage_decide_action(ctx, now_unix_nanos);
318
319        // Stage 5: Emit Typed Event
320        ctx = self.stage_emit_typed_event(ctx, &exit, now_unix_nanos);
321
322        // Stage 6: Execute Action
323        ctx = self.stage_execute_action(ctx, now_unix_nanos);
324
325        ctx
326    }
327
328    /// Stage 1: Classify the exit reason and category.
329    ///
330    /// # Arguments
331    ///
332    /// - `ctx`: Current pipeline context.
333    /// - `exit`: Task exit to classify.
334    ///
335    /// # Returns
336    ///
337    /// Returns the updated context with exit classification.
338    pub(crate) fn stage_classify_exit(
339        &self,
340        mut ctx: PipelineContext,
341        exit: &TaskExit,
342        completed_at_unix_nanos: u128,
343    ) -> PipelineContext {
344        let classification = ctx
345            .exit_classification
346            .clone()
347            .unwrap_or_else(|| match exit {
348                TaskExit::Succeeded => ExitClassification::Success,
349                TaskExit::Failed { kind, .. } => match kind {
350                    PolicyFailureKind::Cancelled => ExitClassification::ExternalCancel,
351                    PolicyFailureKind::Panic => ExitClassification::Crash {
352                        reason: "panic".to_string(),
353                    },
354                    PolicyFailureKind::Timeout => ExitClassification::Timeout,
355                    _ => ExitClassification::NonZeroExit { exit_code: -1 },
356                },
357            });
358
359        ctx.exit_classification = Some(classification);
360        append_stage_diagnostic(
361            &mut ctx,
362            PipelineStage::ClassifyExit,
363            completed_at_unix_nanos,
364        );
365        ctx
366    }
367
368    /// Stage 2: Record failure into sliding window.
369    ///
370    /// # Arguments
371    ///
372    /// - `ctx`: Current pipeline context.
373    /// - `now`: Current monotonic time.
374    ///
375    /// # Returns
376    ///
377    /// Returns the updated context with failure window state.
378    fn stage_record_failure_window(
379        &mut self,
380        mut ctx: PipelineContext,
381        now: Instant,
382        completed_at_unix_nanos: u128,
383    ) -> PipelineContext {
384        // Only record failures, not successes
385        if let Some(ref classification) = ctx.exit_classification
386            && classification.should_restart()
387        {
388            let state = self.failure_window.record_failure(now);
389            ctx.failure_window_state = Some(format!(
390                "count={}, threshold_reached={}",
391                state.current_count, state.threshold_reached
392            ));
393        }
394        append_stage_diagnostic(
395            &mut ctx,
396            PipelineStage::RecordFailureWindow,
397            completed_at_unix_nanos,
398        );
399        ctx
400    }
401
402    /// Stage 3: Evaluate restart budget and limits.
403    ///
404    /// # Arguments
405    ///
406    /// - `ctx`: Current pipeline context.
407    /// - `spec`: Supervisor specification.
408    /// - `tree`: Supervisor tree.
409    ///
410    /// # Returns
411    ///
412    /// Returns the updated context with budget evaluation.
413    fn stage_evaluate_budget(
414        &mut self,
415        mut ctx: PipelineContext,
416        spec: &SupervisorSpec,
417        tree: &SupervisorTree,
418        now: Instant,
419        completed_at_unix_nanos: u128,
420    ) -> PipelineContext {
421        // Get restart_execution_plan for this child
422        let plan = restart_execution_plan(tree, spec, &ctx.child_id);
423
424        let restart_failure_count = self.failure_window.failure_count() as u32;
425        let restart_limit = effective_restart_limit(&ctx, plan.restart_limit);
426        let escalation_policy = effective_escalation_policy(&ctx, plan.escalation_policy);
427        let remaining =
428            restart_limit.map(|limit| limit.max_restarts.saturating_sub(restart_failure_count));
429
430        let limit_exhausted =
431            restart_limit.is_some_and(|limit| restart_failure_count > limit.max_restarts);
432        let group_id = plan.group.clone();
433        let should_restart = ctx
434            .exit_classification
435            .as_ref()
436            .is_some_and(ExitClassification::should_restart);
437
438        // Budget check (stage 3a): budget → meltdown → backoff order
439        let budget_verdict = if should_restart {
440            Some(self.budget_tracker.try_consume(completed_at_unix_nanos))
441        } else {
442            None
443        };
444
445        // Budget exhaustion overrides should_restart (budget → meltdown order)
446        let budget_exhausted = budget_verdict
447            .as_ref()
448            .is_some_and(|v| matches!(v, BudgetVerdict::Exhausted { .. }));
449        let effective_should_restart = should_restart && !budget_exhausted;
450
451        let now_secs = nanos_to_secs(completed_at_unix_nanos);
452        if should_restart {
453            let exhausted = self.cold_start_budget.record_restart(now_secs);
454            ctx.cold_start_reason = if exhausted {
455                ColdStartReason::BudgetExhausted
456            } else if self.cold_start_budget.is_window_active(now_secs) {
457                ColdStartReason::InitialStartup
458            } else {
459                ColdStartReason::NotApplicable
460            };
461
462            if self.hot_loop_detector.record_crash(now_secs) {
463                ctx.hot_loop_reason = HotLoopReason::RapidCrashDetected;
464            }
465        }
466
467        let meltdown_outcome = if effective_should_restart {
468            self.meltdown_tracker.record_child_restart_with_group(
469                ctx.child_id.clone(),
470                group_id.clone(),
471                now,
472            );
473            let merged = merge_meltdown_verdicts(
474                child_local_verdict(&self.meltdown_tracker, &ctx.child_id),
475                group_local_verdict(&self.meltdown_tracker, group_id.as_deref()),
476                supervisor_local_verdict(&self.meltdown_tracker),
477            );
478            ctx.scopes_triggered = merged.scopes_triggered;
479            ctx.lead_scope = merged.lead_scope;
480            merged.effective_outcome
481        } else {
482            MeltdownOutcome::Continue
483        };
484
485        // Group isolation check: if meltdown triggered for a group,
486        // propagate to dependent groups that declared Full propagation edges.
487        if matches!(meltdown_outcome, MeltdownOutcome::GroupFuse)
488            && let Some(ref gid) = group_id
489        {
490            // Find all groups affected by this group's meltdown
491            let affected: Vec<String> = ctx
492                .group_id
493                .iter()
494                .filter(|g| self.group_isolation.affected_by(g, gid))
495                .cloned()
496                .collect();
497            if !affected.is_empty() {
498                self.meltdown_tracker.propagate_fuse(gid, &affected);
499            }
500        }
501
502        // Severity escalation bifurcation (US3): check EffectivePolicy.severity
503        if let Some(ref policy) = ctx.effective_policy {
504            use crate::policy::task_role_defaults::SeverityClass;
505            match policy.severity {
506                SeverityClass::Critical => {
507                    // Critical path: escalation (emit EscalationBifurcated later in emit stage)
508                    ctx.stage_diagnostics.push(PipelineStageDiagnostic::new(
509                        ctx.sequence,
510                        ctx.correlation_id.clone(),
511                        PipelineStage::EvaluateBudget,
512                        completed_at_unix_nanos,
513                    ));
514                }
515                SeverityClass::Optional => {
516                    // Optional path: noise reduction (no escalation alert)
517                }
518                SeverityClass::Standard => {
519                    // Standard path: follow TaskTask role defaults
520                }
521            }
522        }
523
524        ctx.budget_evaluation = Some(BudgetEvaluation {
525            remaining_restarts: remaining,
526            limit_exhausted,
527            escalation_policy: escalation_policy.map(|policy| format!("{policy:?}")),
528            meltdown_outcome,
529            budget_verdict,
530        });
531
532        // Set group_id from plan if available
533        ctx.group_id = group_id;
534
535        append_stage_diagnostic(
536            &mut ctx,
537            PipelineStage::EvaluateBudget,
538            completed_at_unix_nanos,
539        );
540        ctx
541    }
542
543    /// Stage 4: Decide protective action based on merged verdicts.
544    ///
545    /// # Arguments
546    ///
547    /// - `ctx`: Current pipeline context.
548    ///
549    /// # Returns
550    ///
551    /// Returns the updated context with action decision.
552    pub(crate) fn stage_decide_action(
553        &self,
554        mut ctx: PipelineContext,
555        completed_at_unix_nanos: u128,
556    ) -> PipelineContext {
557        let classification = ctx.exit_classification.as_ref();
558        let budget = ctx.budget_evaluation.as_ref();
559
560        let (mut action, mut reason) = match classification {
561            Some(ExitClassification::ExternalCancel) | Some(ExitClassification::ManualStop) => (
562                ProtectionAction::SupervisedStop,
563                "external_cancel_or_manual_stop".to_string(),
564            ),
565            Some(classification) => {
566                role_or_budget_action(classification, ctx.effective_policy.as_ref(), budget)
567            }
568            None => budget_action(ctx.effective_policy.as_ref(), budget),
569        };
570
571        if let Some(budget_eval) = budget {
572            let meltdown_action = protection_action_for_meltdown(budget_eval.meltdown_outcome);
573            if meltdown_action > action {
574                action = meltdown_action;
575                reason = meltdown_reason(action).to_string();
576            }
577        }
578        if ctx.cold_start_reason == ColdStartReason::BudgetExhausted
579            && ProtectionAction::RestartDenied > action
580        {
581            action = ProtectionAction::RestartDenied;
582            reason = "cold_start_budget_exhausted".to_string();
583        }
584        if ctx.hot_loop_reason != HotLoopReason::NotApplicable
585            && ProtectionAction::SupervisionPaused > action
586        {
587            action = ProtectionAction::SupervisionPaused;
588            reason = "hot_loop_detected".to_string();
589        }
590
591        ctx.action_decision = Some(ActionDecision {
592            action,
593            delay_ms: None,
594            reason,
595        });
596
597        append_stage_diagnostic(
598            &mut ctx,
599            PipelineStage::DecideAction,
600            completed_at_unix_nanos,
601        );
602        ctx
603    }
604
605    /// Stage 5: Emit typed supervision event with all diagnostic fields.
606    ///
607    /// Uses pipeline context data to select the correct `What` variant:
608    /// - `BudgetExhausted` when the budget check failed
609    /// - `GroupFuseTriggered` when a group-level fuse triggered
610    /// - `EscalationBifurcated` for critical/optional bifurcation
611    /// - `ChildFailed` / `ChildRunning` as fallback (existing behavior)
612    ///
613    /// Also uses the pipeline context's `correlation_id` instead of a nil UUID.
614    ///
615    /// # Arguments
616    ///
617    /// - `ctx`: Current pipeline context.
618    /// - `exit`: Original task exit.
619    /// - `now_unix_nanos`: Current timestamp.
620    ///
621    /// # Returns
622    ///
623    /// Returns the updated context.
624    fn stage_emit_typed_event(
625        &mut self,
626        ctx: PipelineContext,
627        exit: &TaskExit,
628        now_unix_nanos: u128,
629    ) -> PipelineContext {
630        // Build the What payload based on pipeline evaluation results.
631        let what = self.build_policy_aware_what(&ctx, exit);
632
633        // Create event with all diagnostic fields populated
634        let location = Where::new(ctx.supervisor_path.clone())
635            .with_child(ctx.child_id.clone(), "pipeline-child");
636
637        let event_correlation_id = crate::event::time::CorrelationId::from_uuid(
638            uuid::Uuid::parse_str(&ctx.correlation_id).unwrap_or(uuid::Uuid::nil()),
639        );
640        let mut event = SupervisorEvent::new(
641            crate::event::time::When::new(crate::event::time::EventTime::deterministic(
642                now_unix_nanos,
643                now_unix_nanos,
644                0,
645                crate::id::types::Generation::initial(),
646                crate::id::types::ChildStartCount::first(),
647            )),
648            location,
649            what,
650            crate::event::time::EventSequence::new(ctx.sequence),
651            event_correlation_id,
652            1,
653        );
654
655        // Populate new fields from pipeline processing
656        event.effective_protective_action = ctx.action_decision.as_ref().map(|d| d.action);
657        event.cold_start_reason = ctx.cold_start_reason.clone();
658        event.hot_loop_reason = ctx.hot_loop_reason.clone();
659        event.throttle_gate_owner = ctx.throttle_gate_owner.clone();
660        event.scopes_triggered = ctx.scopes_triggered.clone();
661        event.lead_scope = ctx.lead_scope;
662        if let Some(effective_policy) = ctx.effective_policy.as_ref() {
663            event.task_role = Some(effective_policy.task_role);
664            event.used_fallback_default = effective_policy.used_fallback;
665            event.effective_policy_source = Some(effective_policy.source);
666        }
667
668        // Emit through observability pipeline
669        let _lagged = self.observability.emit(event);
670
671        let mut ctx = ctx;
672        append_stage_diagnostic(&mut ctx, PipelineStage::EmitTypedEvent, now_unix_nanos);
673        ctx
674    }
675
676    /// Selects the correct `What` variant based on the pipeline evaluation results.
677    ///
678    /// Priority order:
679    /// 1. BudgetExhausted — when the token bucket has no tokens
680    /// 2. GroupFuseTriggered — when a group-level fuse fired
681    /// 3. EscalationBifurcated — for critical/optional severity bifurcation
682    /// 4. ChildFailed / ChildRunning — fallback (original behavior)
683    ///
684    /// # Arguments
685    ///
686    /// - `ctx`: Pipeline context with evaluation results.
687    /// - `exit`: Original task exit.
688    ///
689    /// # Returns
690    ///
691    /// Returns the appropriate [`What`] variant.
692    fn build_policy_aware_what(&self, ctx: &PipelineContext, exit: &TaskExit) -> What {
693        // 1. Check for budget exhaustion
694        if let Some(ref budget_eval) = ctx.budget_evaluation
695            && let Some(ref verdict) = budget_eval.budget_verdict
696            && let BudgetVerdict::Exhausted { retry_after_ns } = verdict
697        {
698            // T048: Emit a warning when BudgetExhausted rate exceeds threshold.
699            // The alert is rate-limited via tracing's built-in filtering;
700            // external monitoring should subscribe to `budget_exhausted` events.
701            tracing::warn!(
702                target: "rust_supervisor::policy::budget",
703                child_id = %ctx.child_id,
704                retry_after_ns = %retry_after_ns,
705                "BudgetExhausted rate alert: check restart_budget configuration \
706                 (threshold: >10 events/minute indicates budget may be too tight)"
707            );
708
709            return What::BudgetExhausted {
710                child_id: ctx.child_id.clone(),
711                retry_after_ns: *retry_after_ns,
712                budget_source_group: ctx.group_id.clone(),
713            };
714        }
715
716        // 2. Check for group-level fuse
717        if let Some(ref budget_eval) = ctx.budget_evaluation
718            && matches!(
719                budget_eval.meltdown_outcome,
720                crate::policy::meltdown::MeltdownOutcome::GroupFuse
721            )
722        {
723            return What::GroupFuseTriggered {
724                group_name: ctx
725                    .group_id
726                    .clone()
727                    .unwrap_or_else(|| "unknown".to_string()),
728                propagated_from_group: None,
729            };
730        }
731
732        // 3. Check for severity bifurcation (critical/optional)
733        if let Some(ref policy) = ctx.effective_policy {
734            use crate::policy::task_role_defaults::SeverityClass;
735            match policy.severity {
736                SeverityClass::Critical | SeverityClass::Optional => {
737                    let budget_verdict_str = ctx
738                        .budget_evaluation
739                        .as_ref()
740                        .and_then(|be| be.budget_verdict.as_ref())
741                        .map(|v| match v {
742                            BudgetVerdict::Granted => "granted".to_string(),
743                            BudgetVerdict::Exhausted { retry_after_ns } => {
744                                format!("exhausted:retry_after_ns={retry_after_ns}")
745                            }
746                        });
747                    let fuse_outcome_str = ctx
748                        .budget_evaluation
749                        .as_ref()
750                        .map(|be| format!("{:?}", be.meltdown_outcome));
751                    return What::EscalationBifurcated {
752                        severity: format!("{:?}", policy.severity),
753                        budget_verdict: budget_verdict_str,
754                        fuse_outcome: fuse_outcome_str,
755                        tie_break_reason: None,
756                    };
757                }
758                SeverityClass::Standard => {
759                    // Standard path: fall through to exit-based classification
760                }
761            }
762        }
763
764        // 4. Fallback: exit-based classification (original behavior)
765        match exit {
766            TaskExit::Succeeded => What::ChildRunning { transition: None },
767            TaskExit::Failed { .. } => What::ChildFailed {
768                failure: TaskFailure::new(
769                    crate::error::types::TaskFailureKind::Error,
770                    "pipeline_exit",
771                    "processed through six-stage pipeline",
772                ),
773            },
774        }
775    }
776
777    /// Stage 6: Execute the decided action.
778    ///
779    /// # Arguments
780    ///
781    /// - `ctx`: Current pipeline context.
782    ///
783    /// # Returns
784    ///
785    /// Returns the updated context with execution result.
786    fn stage_execute_action(
787        &self,
788        mut ctx: PipelineContext,
789        completed_at_unix_nanos: u128,
790    ) -> PipelineContext {
791        ctx.execution_result = if let Some(ref decision) = ctx.action_decision {
792            Some(match decision.action {
793                ProtectionAction::RestartAllowed => "restart_allowed_for_runtime".to_string(),
794                ProtectionAction::RestartQueued => "restart_queued".to_string(),
795                ProtectionAction::RestartDenied => "restart_denied".to_string(),
796                ProtectionAction::SupervisionPaused => "supervision_paused".to_string(),
797                ProtectionAction::Escalated => "escalated".to_string(),
798                ProtectionAction::SupervisedStop => "supervised_stop".to_string(),
799            })
800        } else {
801            Some("no_decision".to_string())
802        };
803
804        append_stage_diagnostic(
805            &mut ctx,
806            PipelineStage::ExecuteAction,
807            completed_at_unix_nanos,
808        );
809        ctx
810    }
811}
812
813/// Selects the restart limit for the current pipeline run.
814///
815/// # Arguments
816///
817/// - `ctx`: Pipeline context carrying the effective role policy.
818/// - `plan_limit`: Restart limit selected by the restart execution plan.
819///
820/// # Returns
821///
822/// Returns the explicit plan limit, or the role default limit when the plan does not define one.
823fn effective_restart_limit(
824    ctx: &PipelineContext,
825    plan_limit: Option<RestartLimit>,
826) -> Option<RestartLimit> {
827    plan_limit.or_else(|| {
828        ctx.effective_policy
829            .as_ref()
830            .and_then(|policy| policy.policy_pack.default_restart_limit)
831    })
832}
833
834/// Selects the escalation policy for the current pipeline run.
835///
836/// # Arguments
837///
838/// - `ctx`: Pipeline context carrying the effective role policy.
839/// - `plan_policy`: Escalation policy selected by the restart execution plan.
840///
841/// # Returns
842///
843/// Returns the explicit plan policy, or the role default policy when the plan does not define one.
844fn effective_escalation_policy(
845    ctx: &PipelineContext,
846    plan_policy: Option<EscalationPolicy>,
847) -> Option<EscalationPolicy> {
848    plan_policy.or_else(|| {
849        ctx.effective_policy
850            .as_ref()
851            .and_then(|policy| policy.policy_pack.default_escalation_policy)
852    })
853}
854
855/// Selects either role-specific action or budget-only action.
856///
857/// # Arguments
858///
859/// - `classification`: Exit classification produced by stage 1.
860/// - `effective_policy`: Optional role policy for the child.
861/// - `budget`: Optional budget evaluation produced by stage 3.
862///
863/// # Returns
864///
865/// Returns the protection action and diagnostic reason.
866fn role_or_budget_action(
867    classification: &ExitClassification,
868    effective_policy: Option<&EffectivePolicy>,
869    budget: Option<&BudgetEvaluation>,
870) -> (ProtectionAction, String) {
871    let Some(effective_policy) = effective_policy else {
872        return budget_action(None, budget);
873    };
874    match classification {
875        ExitClassification::Success => match effective_policy.policy_pack.on_success_exit {
876            OnSuccessAction::Restart => (
877                ProtectionAction::RestartAllowed,
878                "role_success_restart".to_string(),
879            ),
880            OnSuccessAction::Stop | OnSuccessAction::NoOp => (
881                ProtectionAction::SupervisedStop,
882                "role_success_stop".to_string(),
883            ),
884        },
885        ExitClassification::Timeout => match effective_policy.policy_pack.on_timeout {
886            OnTimeoutAction::RestartWithBackoff => budget_action(Some(effective_policy), budget),
887            OnTimeoutAction::StopAndEscalate => (
888                ProtectionAction::Escalated,
889                "role_timeout_escalate".to_string(),
890            ),
891        },
892        ExitClassification::NonZeroExit { .. } | ExitClassification::Crash { .. } => {
893            match effective_policy.policy_pack.on_failure_exit {
894                OnFailureAction::RestartWithBackoff | OnFailureAction::RestartPermanent => {
895                    budget_action(Some(effective_policy), budget)
896                }
897                OnFailureAction::StopAndEscalate => (
898                    ProtectionAction::Escalated,
899                    "role_failure_escalate".to_string(),
900                ),
901            }
902        }
903        ExitClassification::ExternalCancel | ExitClassification::ManualStop => (
904            ProtectionAction::SupervisedStop,
905            "external_cancel_or_manual_stop".to_string(),
906        ),
907    }
908}
909
910/// Selects the budget-only protection action.
911///
912/// # Arguments
913///
914/// - `effective_policy`: Optional role policy used for exhausted budget semantics.
915/// - `budget`: Optional budget evaluation produced by stage 3.
916///
917/// # Returns
918///
919/// Returns the protection action and diagnostic reason.
920fn budget_action(
921    effective_policy: Option<&EffectivePolicy>,
922    budget: Option<&BudgetEvaluation>,
923) -> (ProtectionAction, String) {
924    let Some(budget_eval) = budget else {
925        return (
926            ProtectionAction::RestartAllowed,
927            "within_restart_budget".to_string(),
928        );
929    };
930    if !budget_eval.limit_exhausted {
931        return (
932            ProtectionAction::RestartAllowed,
933            "within_restart_budget".to_string(),
934        );
935    }
936    match effective_policy
937        .map(|policy| policy.policy_pack.on_budget_exhausted)
938        .unwrap_or(OnBudgetExhaustedAction::Quarantine)
939    {
940        OnBudgetExhaustedAction::StopAndEscalate => (
941            ProtectionAction::Escalated,
942            "restart_limit_exhausted".to_string(),
943        ),
944        OnBudgetExhaustedAction::Quarantine => (
945            ProtectionAction::RestartDenied,
946            "restart_limit_exhausted".to_string(),
947        ),
948    }
949}
950
951/// Returns a diagnostic reason for a meltdown action override.
952///
953/// # Arguments
954///
955/// - `action`: Protection action selected from a meltdown verdict.
956///
957/// # Returns
958///
959/// Returns a stable reason label.
960fn meltdown_reason(action: ProtectionAction) -> &'static str {
961    match action {
962        ProtectionAction::RestartDenied => "meltdown_child_fuse",
963        ProtectionAction::SupervisionPaused => "meltdown_group_fuse",
964        ProtectionAction::Escalated => "meltdown_supervisor_fuse",
965        ProtectionAction::RestartAllowed => "within_restart_budget",
966        ProtectionAction::RestartQueued => "restart_queued_by_throttle",
967        ProtectionAction::SupervisedStop => "external_cancel_or_manual_stop",
968    }
969}
970
971/// Appends a diagnostic record for one completed pipeline stage.
972///
973/// # Arguments
974///
975/// - `ctx`: Pipeline context that receives the diagnostic record.
976/// - `stage`: Stage that has completed.
977/// - `completed_at_unix_nanos`: Completion timestamp in Unix epoch nanoseconds.
978///
979/// # Returns
980///
981/// This function returns nothing.
982fn append_stage_diagnostic(
983    ctx: &mut PipelineContext,
984    stage: PipelineStage,
985    completed_at_unix_nanos: u128,
986) {
987    let mut diagnostic = PipelineStageDiagnostic::new(
988        ctx.sequence,
989        ctx.correlation_id.clone(),
990        stage,
991        completed_at_unix_nanos,
992    )
993    .with_child_id(ctx.child_id.value.clone())
994    .with_supervisor_path(ctx.supervisor_path.to_string());
995
996    diagnostic.group_id = ctx.group_id.clone();
997    diagnostic.exit_classification = ctx
998        .exit_classification
999        .as_ref()
1000        .map(|classification| classification.as_str().to_string());
1001    diagnostic.failure_window_state = ctx.failure_window_state.clone();
1002    diagnostic.budget_evaluation = ctx.budget_evaluation.as_ref().map(|budget| {
1003        format!(
1004            "remaining_restarts={:?}, limit_exhausted={}, escalation_policy={:?}, meltdown_outcome={:?}",
1005            budget.remaining_restarts,
1006            budget.limit_exhausted,
1007            budget.escalation_policy,
1008            budget.meltdown_outcome
1009        )
1010    });
1011    diagnostic.decided_action = ctx
1012        .action_decision
1013        .as_ref()
1014        .map(|decision| decision.action.to_string());
1015    diagnostic.event_emitted = stage == PipelineStage::EmitTypedEvent;
1016    diagnostic.execution_result = ctx.execution_result.clone();
1017
1018    ctx.stage_diagnostics.push(diagnostic);
1019}
1020
/// Reads the system clock as whole seconds since the Unix epoch.
///
/// # Arguments
///
/// This function has no arguments.
///
/// # Returns
///
/// Returns the elapsed whole seconds, or zero if system time is before Unix epoch.
fn current_unix_secs() -> u64 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // The clock reports a time before the epoch.
        Err(_) => 0,
    }
}
1035
/// Truncates a Unix nanosecond timestamp to whole seconds.
///
/// # Arguments
///
/// - `nanos`: Unix epoch nanoseconds.
///
/// # Returns
///
/// Returns the whole-second timestamp capped at `u64::MAX`.
fn nanos_to_secs(nanos: u128) -> u64 {
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    let whole_secs = nanos / NANOS_PER_SEC;
    if whole_secs > u128::from(u64::MAX) {
        u64::MAX
    } else {
        // Guarded above: the value fits in 64 bits, so the cast is lossless.
        whole_secs as u64
    }
}
1048
1049/// Builds a child-scope local meltdown verdict from current tracker state.
1050///
1051/// # Arguments
1052///
1053/// - `tracker`: Meltdown tracker containing current counters.
1054/// - `child_id`: Child scope to evaluate.
1055///
1056/// # Returns
1057///
1058/// Returns the local child verdict.
1059fn child_local_verdict(tracker: &MeltdownTracker, child_id: &ChildId) -> LocalVerdict {
1060    let triggered =
1061        tracker.child_failure_count(child_id) >= tracker.policy.child_max_restarts as usize;
1062    LocalVerdict {
1063        triggered,
1064        outcome: if triggered {
1065            MeltdownOutcome::ChildFuse
1066        } else {
1067            MeltdownOutcome::Continue
1068        },
1069    }
1070}
1071
1072/// Builds a group-scope local meltdown verdict from current tracker state.
1073///
1074/// # Arguments
1075///
1076/// - `tracker`: Meltdown tracker containing current counters.
1077/// - `group_id`: Optional group scope to evaluate.
1078///
1079/// # Returns
1080///
1081/// Returns the local group verdict.
1082fn group_local_verdict(tracker: &MeltdownTracker, group_id: Option<&str>) -> LocalVerdict {
1083    let triggered = group_id.is_some_and(|group| {
1084        tracker.group_failure_count(group) >= tracker.policy.group_max_failures as usize
1085    });
1086    LocalVerdict {
1087        triggered,
1088        outcome: if triggered {
1089            MeltdownOutcome::GroupFuse
1090        } else {
1091            MeltdownOutcome::Continue
1092        },
1093    }
1094}
1095
1096/// Builds a supervisor-scope local meltdown verdict from current tracker state.
1097///
1098/// # Arguments
1099///
1100/// - `tracker`: Meltdown tracker containing current counters.
1101///
1102/// # Returns
1103///
1104/// Returns the local supervisor verdict.
1105fn supervisor_local_verdict(tracker: &MeltdownTracker) -> LocalVerdict {
1106    let triggered = tracker.get_supervisor_outcome() == MeltdownOutcome::SupervisorFuse;
1107    LocalVerdict {
1108        triggered,
1109        outcome: if triggered {
1110            MeltdownOutcome::SupervisorFuse
1111        } else {
1112            MeltdownOutcome::Continue
1113        },
1114    }
1115}
1116
1117/// Maps meltdown outcomes onto the protection action ladder.
1118///
1119/// # Arguments
1120///
1121/// - `outcome`: Effective meltdown outcome.
1122///
1123/// # Returns
1124///
1125/// Returns the corresponding protection action.
1126fn protection_action_for_meltdown(outcome: MeltdownOutcome) -> ProtectionAction {
1127    match outcome {
1128        MeltdownOutcome::Continue => ProtectionAction::RestartAllowed,
1129        MeltdownOutcome::ChildFuse => ProtectionAction::RestartDenied,
1130        MeltdownOutcome::GroupFuse => ProtectionAction::SupervisionPaused,
1131        MeltdownOutcome::SupervisorFuse => ProtectionAction::Escalated,
1132    }
1133}