Skip to main content

rust_supervisor/runtime/
control_loop.rs

//! Runtime control loop.
//!
//! This module executes control-plane commands, receives exits from child start attempts,
//! and applies supervisor restart strategy decisions.
5
6use crate::child_runner::run_exit::TaskExit;
7use crate::child_runner::runner::{ChildRunHandle, ChildRunReport, ChildRunner, wait_for_report};
8use crate::control::command::{CommandMeta, CommandResult, ControlCommand, CurrentState};
9use crate::control::outcome::{
10    ChildAttemptStatus, ChildControlFailure, ChildControlFailurePhase, ChildControlOperation,
11    ChildControlResult, ChildLivenessState, ChildStopState, GenerationFenceDecision,
12    GenerationFenceOutcome, GenerationFencePhase, PendingRestart, RestartLimitState,
13    StaleAttemptReport, StaleReportHandling,
14};
15use crate::error::types::SupervisorError;
16use crate::event::payload::{ProtectionAction, SupervisorEvent, ThrottleGateOwner, What, Where};
17use crate::event::time::{CorrelationId, EventSequenceSource, EventTime, When};
18use crate::id::types::{ChildId, ChildStartCount, Generation, SupervisorPath};
19use crate::observe::fairness::FairnessProbe;
20use crate::observe::pipeline::{ObservabilityPipeline, PipelineStageDiagnostic};
21use crate::policy::backoff::BackoffPolicy;
22use crate::policy::decision::{
23    PolicyEngine, RestartDecision, RestartPolicy, TaskExit as PolicyTaskExit,
24};
25use crate::policy::failure_window::FailureWindow;
26use crate::policy::meltdown::MeltdownTracker;
27use crate::policy::task_role_defaults::{EffectivePolicy, OnSuccessAction};
28use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
29use crate::registry::store::RegistryStore;
30use crate::runtime::admission::{AdmissionConflict, AdmissionSet};
31use crate::runtime::child_slot::{
32    ChildExitSummary, ChildSlot, DEFAULT_HEARTBEAT_TIMEOUT_SECS, RuntimeTimeBase,
33};
34use crate::runtime::lifecycle::RuntimeExitReport;
35use crate::runtime::message::{ChildStartMessage, ControlPlaneMessage, RuntimeLoopMessage};
36use crate::runtime::pipeline::{ExitClassification, PipelineContext, SupervisionPipeline};
37use crate::runtime::shutdown::{reconcile_shutdown_slots, shutdown_tree_fanout};
38use crate::runtime::shutdown_pipeline::ShutdownPipeline;
39use crate::shutdown::coordinator::{ShutdownCoordinator, ShutdownResult};
40use crate::shutdown::report::{
41    ChildShutdownOutcome, ChildShutdownOutcomeInput, ChildShutdownStatus, ShutdownPipelineReport,
42    ShutdownReconcileReport,
43};
44use crate::shutdown::stage::{ShutdownCause, ShutdownPhase, ShutdownPolicy};
45use crate::spec::child::{ChildSpec, RestartPolicy as ChildRestartPolicy};
46use crate::spec::child_declaration::{ChildDeclaration, validate_child_declaration};
47use crate::spec::supervisor::{RestartLimit, SupervisorSpec};
48use crate::tree::builder::SupervisorTree;
49use crate::tree::order::{restart_execution_plan, shutdown_order, startup_order};
50use std::collections::HashMap;
51use std::sync::{Arc, Mutex};
52use std::time::{Duration, SystemTime, UNIX_EPOCH};
53use tokio::sync::{broadcast, mpsc};
54use tokio::time::{Instant, timeout};
55
56/// Typed event waiting for emission after mutable state borrows end.
57#[derive(Debug)]
58struct PendingRuntimeEvent {
59    /// Child task identifier related to the event.
60    child_id: ChildId,
61    /// Child task path attached to the event location.
62    path: SupervisorPath,
63    /// Generation number attached to event timing.
64    generation: Option<Generation>,
65    /// Attempt attached to event timing.
66    attempt: Option<ChildStartCount>,
67    /// Correlation identifier attached to the event.
68    correlation_id: CorrelationId,
69    /// Typed event payload.
70    what: What,
71}
72
73/// Mutable state owned by the control loop.
74#[derive(Debug)]
75pub struct RuntimeControlState {
76    /// Shutdown state machine used by tree-level shutdown commands.
77    shutdown: ShutdownCoordinator,
78    /// Runtime-owned shutdown pipeline state and cached report.
79    shutdown_pipeline: ShutdownPipeline,
80    /// Runtime slots for declared children.
81    slots: HashMap<ChildId, ChildSlot>,
82    /// Admission set that enforces at-most-one active attempt per child.
83    #[allow(dead_code)]
84    admission_set: AdmissionSet,
85    /// Runtime time base used for public timestamps.
86    time_base: RuntimeTimeBase,
87    /// Event sequence source for typed observability facts.
88    event_sequences: EventSequenceSource,
89    /// Shared typed observability pipeline.
90    observability: Arc<Mutex<ObservabilityPipeline>>,
91    /// Six-stage supervision pipeline for failure processing.
92    supervision_pipeline: SupervisionPipeline,
93    /// Instance-global concurrent restart throttle gate (FR-003).
94    concurrent_gate: crate::runtime::concurrent_gate::SupervisorInstanceGate,
95    /// Fairness probe that detects scheduling starvation (US1).
96    fairness_probe: FairnessProbe,
97    /// Dynamic child manifests accepted after startup.
98    manifests: Vec<String>,
99    /// Registry that owns declared child runtime records.
100    registry: RegistryStore,
101    /// Built supervisor tree used for order and scope planning.
102    tree: SupervisorTree,
103    /// Supervisor specification that owns strategy and dynamic policies.
104    spec: SupervisorSpec,
105    /// Policy engine used to convert task exits into restart decisions.
106    policy_engine: PolicyEngine,
107    /// Sender used by spawned child start_counts to report runtime messages.
108    command_sender: mpsc::Sender<RuntimeLoopMessage>,
109}
110
111/// Builds initial [`ChildSlot`] records from the registry.
112fn build_initial_slots(registry: &RegistryStore) -> HashMap<ChildId, ChildSlot> {
113    registry
114        .declaration_order()
115        .iter()
116        .filter_map(|child_id| {
117            registry.child(child_id).map(|runtime| {
118                let slot = ChildSlot::new_placeholder(runtime.id.clone(), runtime.path.clone());
119                (child_id.clone(), slot)
120            })
121        })
122        .collect::<HashMap<_, _>>()
123}
124
125#[allow(dead_code)]
126impl RuntimeControlState {
127    /// Creates control state from a supervisor specification.
128    ///
129    /// # Arguments
130    ///
131    /// - `spec`: Supervisor declaration that owns children and strategy.
132    /// - `shutdown_policy`: Policy used by the shutdown coordinator.
133    /// - `command_sender`: Sender used by child start_counts to report exits.
134    ///
135    /// # Returns
136    ///
137    /// Returns a [`RuntimeControlState`] value.
138    pub fn new(
139        spec: SupervisorSpec,
140        shutdown_policy: ShutdownPolicy,
141        command_sender: mpsc::Sender<RuntimeLoopMessage>,
142        observability: Arc<Mutex<ObservabilityPipeline>>,
143    ) -> Result<Self, SupervisorError> {
144        let tree = SupervisorTree::build(&spec)?;
145        let mut registry = RegistryStore::new();
146        registry.register_tree(&tree)?;
147        let time_base = RuntimeTimeBase::new();
148        let slots = build_initial_slots(&registry);
149
150        let meltdown_tracker = MeltdownTracker::new(spec.meltdown_policy);
151        let failure_window = FailureWindow::new(spec.failure_window_config);
152        let supervision_pipeline = SupervisionPipeline::with_backpressure_config(
153            spec.pipeline_journal_capacity,
154            spec.pipeline_subscriber_capacity,
155            meltdown_tracker,
156            failure_window,
157            spec.restart_budget_config.clone(),
158            spec.group_dependencies.clone(),
159            spec.backpressure_config.clone(),
160        );
161
162        let concurrent_gate = crate::runtime::concurrent_gate::SupervisorInstanceGate::new(
163            spec.concurrent_restart_limit,
164        );
165
166        // Initialize fairness probe with current timestamp
167        let now_unix_nanos = SystemTime::now()
168            .duration_since(UNIX_EPOCH)
169            .unwrap_or_default()
170            .as_nanos();
171        let fairness_probe = FairnessProbe::new(now_unix_nanos);
172
173        Ok(Self {
174            shutdown: ShutdownCoordinator::new(shutdown_policy),
175            shutdown_pipeline: ShutdownPipeline::new(),
176            slots,
177            admission_set: AdmissionSet::new(),
178            time_base,
179            event_sequences: EventSequenceSource::new(),
180            observability,
181            supervision_pipeline,
182            concurrent_gate,
183            fairness_probe,
184            manifests: Vec::new(),
185            registry,
186            tree,
187            spec,
188            policy_engine: PolicyEngine::new(),
189            command_sender,
190        })
191    }
192
193    /// Starts every declared child in supervisor startup order.
194    ///
195    /// # Arguments
196    ///
197    /// This function has no arguments.
198    ///
199    /// # Returns
200    ///
201    /// This function does not return a value.
202    pub fn start_declared_children(&mut self) {
203        let child_ids = startup_order(&self.tree)
204            .into_iter()
205            .map(|node| node.child.id.clone())
206            .collect::<Vec<_>>();
207        for child_id in child_ids {
208            self.spawn_child_start(child_id, false, Duration::ZERO);
209        }
210    }
211
212    /// Records an active attempt after `spawn_once` so exit routing matches registry identities.
213    ///
214    /// Immediate spawns run this inline; delayed backoff spawns deliver the same handle through
215    /// [`ChildStartMessage::DelayedSpawnAttached`] so `activate_instance` stays on the control loop.
216    ///
217    /// # Arguments
218    ///
219    /// - `child_id`: Stable child owning the spawned attempt.
220    /// - `path`: Supervisor path used when inserting placeholder runtime records.
221    /// - `generation`: Generation pinned from the registry [`ChildRuntime`] passed to `spawn_once`.
222    /// - `attempt`: Attempt counter pinned from the same registry record.
223    /// - `handle`: Runner handle carrying cancellation and completion endpoints.
224    ///
225    /// # Returns
226    ///
227    /// This function does not return a value.
228    fn attach_spawned_child_handle(
229        &mut self,
230        child_id: ChildId,
231        path: SupervisorPath,
232        generation: Generation,
233        attempt: ChildStartCount,
234        handle: ChildRunHandle,
235    ) {
236        let mut completion_receiver = handle.completion_receiver.clone();
237        let sender = self.command_sender.clone();
238        self.slots
239            .entry(child_id.clone())
240            .or_insert_with(|| ChildSlot::new_placeholder(child_id.clone(), path))
241            .activate(generation, attempt, ChildAttemptStatus::Running, handle);
242        tokio::spawn(async move {
243            let result = wait_for_report(&mut completion_receiver).await;
244            send_child_result(sender, child_id, result).await;
245        });
246    }
247
248    /// Executes one control command.
249    ///
250    /// # Arguments
251    ///
252    /// - `command`: Command received by the runtime.
253    /// - `event_sender`: Event channel used for lifecycle text.
254    ///
255    /// # Returns
256    ///
257    /// Returns a command result.
258    pub async fn execute_control(
259        &mut self,
260        command: ControlCommand,
261        event_sender: &broadcast::Sender<String>,
262    ) -> Result<CommandResult, SupervisorError> {
263        command.validate_audit_metadata()?;
264        self.reconcile_stop_deadlines();
265        match command {
266            ControlCommand::AddChild { child_manifest, .. } => {
267                self.ensure_dynamic_child_allowed()?;
268
269                // Reject add_child when shutdown is in progress.
270                if self.shutdown.phase() != ShutdownPhase::Idle {
271                    return Err(SupervisorError::fatal_config(
272                        "Cannot add child: supervisor is shutting down",
273                    ));
274                }
275
276                // Parse manifest as ChildDeclaration.
277                let declaration: ChildDeclaration =
278                    serde_yaml::from_str(&child_manifest).map_err(|e| {
279                        SupervisorError::fatal_config(format!(
280                            "Failed to parse child manifest: {e}"
281                        ))
282                    })?;
283
284                // Validate declaration against existing children.
285                let all_names: std::collections::HashSet<String> =
286                    self.spec.children.iter().map(|c| c.name.clone()).collect();
287                let mut new_names = all_names.clone();
288                new_names.insert(declaration.name.clone());
289
290                validate_child_declaration(&declaration, &all_names).map_err(|e| {
291                    SupervisorError::fatal_config(format!(
292                        "Child validation failed at {}: {}",
293                        e.field_path, e.reason
294                    ))
295                })?;
296
297                // Staged via begin_transaction — for now register directly
298                // since we operate inside the control loop's mutable state.
299                let child_spec =
300                    crate::spec::child::ChildSpec::try_from(declaration).map_err(|e| {
301                        SupervisorError::fatal_config(format!("Child conversion failed: {e:?}"))
302                    })?;
303
304                self.manifests.push(child_manifest.clone());
305                self.spec.children.push(child_spec);
306                Ok(CommandResult::ChildAdded { child_manifest })
307            }
308            ControlCommand::RemoveChild { meta, child_id } => Ok(self.execute_stop_child_control(
309                child_id,
310                ChildControlOperation::Removed,
311                "remove_child",
312                &meta,
313                event_sender,
314            )),
315            ControlCommand::RestartChild { meta, child_id } => {
316                Ok(self.execute_restart_child_control(child_id, &meta, event_sender))
317            }
318            ControlCommand::PauseChild { meta, child_id } => Ok(self.execute_stop_child_control(
319                child_id,
320                ChildControlOperation::Paused,
321                "pause_child",
322                &meta,
323                event_sender,
324            )),
325            ControlCommand::ResumeChild { child_id, .. } => {
326                Ok(self.set_child_state(child_id, ChildControlOperation::Active))
327            }
328            ControlCommand::QuarantineChild { meta, child_id } => Ok(self
329                .execute_stop_child_control(
330                    child_id,
331                    ChildControlOperation::Quarantined,
332                    "quarantine_child",
333                    &meta,
334                    event_sender,
335                )),
336            ControlCommand::ShutdownTree { meta } => {
337                let result = self
338                    .execute_shutdown(meta.requested_by, meta.reason, event_sender)
339                    .await?;
340                Ok(CommandResult::Shutdown { result })
341            }
342            ControlCommand::CurrentState { .. } => {
343                self.reconcile_stop_deadlines();
344                Ok(CommandResult::CurrentState {
345                    state: self.build_current_state(),
346                })
347            }
348        }
349    }
350
351    /// Applies policy to a completed child child_start_count.
352    ///
353    /// # Arguments
354    ///
355    /// - `report`: Completed child child_start_count report.
356    /// - `event_sender`: Event channel used for lifecycle text.
357    ///
358    /// # Returns
359    ///
360    /// This function does not return a value.
361    pub fn handle_child_exit(
362        &mut self,
363        report: ChildRunReport,
364        event_sender: &broadcast::Sender<String>,
365    ) {
366        // FR-003: Release concurrent gate slot when child exits (only if gate has active slots)
367        if self.concurrent_gate.get_active_count() > 0 {
368            self.concurrent_gate.release();
369        }
370
371        let child_id = report.runtime.id.clone();
372        let generation = report.runtime.generation;
373        let attempt = report.runtime.child_start_count;
374        let exit_kind = report.exit.clone();
375        let mut pending_events = Vec::new();
376        let was_active = self
377            .slots
378            .get(&child_id)
379            .is_some_and(ChildSlot::has_active_attempt);
380        let matches_pending_fence = self
381            .slots
382            .get(&child_id)
383            .and_then(|state| state.generation_fence.pending_restart.as_ref())
384            .is_some_and(|pending_restart| {
385                pending_restart.old_generation == generation
386                    && pending_restart.old_attempt == attempt
387            });
388        let matches_active_attempt = self.slots.get(&child_id).is_some_and(|state| {
389            state.has_active_attempt()
390                && state.generation == Some(generation)
391                && state.attempt == Some(attempt)
392        });
393        let manual_stop_requested = self
394            .slots
395            .get(&child_id)
396            .is_some_and(|state| state.stop_state == ChildStopState::CancelDelivered);
397        let mut stale_idle_report = false;
398        let count_restart_failure = self.slots.get(&child_id).is_some_and(|state| {
399            state.operation == ChildControlOperation::Active
400                && restart_limit_counts_exit(&exit_kind)
401        });
402        let late_report = !was_active && self.shutdown.phase() == ShutdownPhase::Completed;
403        let mut fence_pending_release = None::<PendingRestart>;
404
405        if let Some(runtime_state) = self.slots.get_mut(&child_id) {
406            if matches_pending_fence {
407                if runtime_state.stop_state == ChildStopState::CancelDelivered {
408                    runtime_state.stop_state = ChildStopState::Completed;
409                    pending_events.push(PendingRuntimeEvent {
410                        child_id: child_id.clone(),
411                        path: runtime_state.path.clone(),
412                        generation: Some(generation),
413                        attempt: Some(attempt),
414                        correlation_id: CorrelationId::from_uuid(
415                            runtime_state
416                                .generation_fence
417                                .pending_restart
418                                .as_ref()
419                                .expect("matches pending implies Some")
420                                .command_id,
421                        ),
422                        what: What::ChildControlStopCompleted {
423                            child_id: child_id.clone(),
424                            generation,
425                            attempt,
426                            exit_kind: exit_kind.clone(),
427                        },
428                    });
429                }
430                fence_pending_release = runtime_state.generation_fence.pending_restart.take();
431                if let Some(pending_release) = fence_pending_release.as_ref() {
432                    let drained_correlation_id =
433                        CorrelationId::from_uuid(pending_release.command_id);
434                    pending_events.push(PendingRuntimeEvent {
435                        child_id: child_id.clone(),
436                        path: runtime_state.path.clone(),
437                        generation: Some(generation),
438                        attempt: Some(attempt),
439                        correlation_id: drained_correlation_id,
440                        what: What::ChildRestartFencePendingDrained {
441                            child_id: child_id.clone(),
442                        },
443                    });
444                }
445                runtime_state.generation_fence.phase = GenerationFencePhase::ReadyToStart;
446                runtime_state.status = ChildAttemptStatus::Stopped;
447                runtime_state.clear_instance();
448            } else if matches_active_attempt
449                || late_report
450                || self.shutdown.phase() != ShutdownPhase::Idle
451            {
452                if runtime_state.stop_state == ChildStopState::CancelDelivered {
453                    runtime_state.stop_state = ChildStopState::Completed;
454                    pending_events.push(PendingRuntimeEvent {
455                        child_id: child_id.clone(),
456                        path: runtime_state.path.clone(),
457                        generation: Some(generation),
458                        attempt: Some(attempt),
459                        correlation_id: CorrelationId::new(),
460                        what: What::ChildControlStopCompleted {
461                            child_id: child_id.clone(),
462                            generation,
463                            attempt,
464                            exit_kind: exit_kind.clone(),
465                        },
466                    });
467                }
468                runtime_state.status = ChildAttemptStatus::Stopped;
469                runtime_state.clear_instance();
470            } else {
471                stale_idle_report = true;
472                let observed_at_unix_nanos = self.time_base.now_unix_nanos();
473                let current_generation = runtime_state.generation;
474                let current_attempt = runtime_state.attempt;
475                let stale_fact = StaleAttemptReport::new(
476                    child_id.clone(),
477                    generation,
478                    attempt,
479                    current_generation,
480                    current_attempt,
481                    exit_kind.clone(),
482                    StaleReportHandling::RecordedForAudit,
483                    observed_at_unix_nanos,
484                );
485                runtime_state.generation_fence.last_stale_report = Some(stale_fact);
486                pending_events.push(PendingRuntimeEvent {
487                    child_id: child_id.clone(),
488                    path: runtime_state.path.clone(),
489                    generation: Some(generation),
490                    attempt: Some(attempt),
491                    correlation_id: CorrelationId::new(),
492                    what: What::ChildAttemptStaleReport {
493                        child_id: child_id.clone(),
494                        reported_generation: generation,
495                        reported_attempt: attempt,
496                        current_generation,
497                        current_attempt,
498                        exit_kind: exit_kind.clone(),
499                        handled_as: StaleReportHandling::RecordedForAudit,
500                    },
501                });
502            }
503        }
504
505        if stale_idle_report {
506            let _ignored = event_sender.send(format!("child_exit:{child_id}"));
507            for event in pending_events {
508                self.emit_pending_event(event);
509            }
510            self.reconcile_stop_deadlines();
511            return;
512        }
513
514        self.record_child_exit(report);
515        let restart_limit_refreshed =
516            self.refresh_restart_limit_for_child(&child_id, count_restart_failure);
517        if let Some((path, restart_limit)) = restart_limit_refreshed.clone() {
518            pending_events.push(PendingRuntimeEvent {
519                child_id: child_id.clone(),
520                path,
521                generation: Some(generation),
522                attempt: Some(attempt),
523                correlation_id: CorrelationId::new(),
524                what: What::ChildRuntimeRestartLimitUpdated {
525                    child_id: child_id.clone(),
526                    restart_limit,
527                },
528            });
529        }
530        let _ignored = event_sender.send(format!("child_exit:{child_id}"));
531        if late_report {
532            let _ignored = event_sender.send(format!("child_shutdown_late_report:{child_id}"));
533        }
534
535        // Execute six-stage supervision pipeline for failure processing.
536        let sequence = self.event_sequences.next().value;
537        // T037: Generate a real CorrelationId to link budget→meltdown→escalation events.
538        let correlation_id_str = format!("{}", uuid::Uuid::new_v4());
539        let supervisor_path = self
540            .slots
541            .get(&child_id)
542            .map(|state| state.path.clone())
543            .unwrap_or_else(|| SupervisorPath::root().join(child_id.value.clone()));
544
545        let mut pipeline_ctx = PipelineContext::new(
546            child_id.clone(),
547            supervisor_path,
548            sequence,
549            correlation_id_str,
550        );
551        pipeline_ctx.exit_classification = Some(classify_exit_for_pipeline(
552            &exit_kind,
553            manual_stop_requested,
554        ));
555        pipeline_ctx.effective_policy = self
556            .registry
557            .child(&child_id)
558            .map(|runtime| prepare_effective_policy(&runtime.spec));
559
560        // Convert TaskExit to PolicyTaskExit for pipeline.
561        let policy_exit = policy_task_exit(&exit_kind);
562
563        // Execute the complete six-stage pipeline.
564        let pipeline_result = self.supervision_pipeline.execute_pipeline(
565            pipeline_ctx,
566            policy_exit,
567            &self.spec,
568            &self.tree,
569        );
570        self.record_pipeline_stage_diagnostics(&pipeline_result.stage_diagnostics);
571
572        // T019: Record scheduling opportunity for fairness probe after each child exit.
573        self.fairness_probe.record_opportunity(&child_id);
574
575        // T019: Periodically check fairness probe for scheduling starvation.
576        self.check_fairness_probe(event_sender);
577
578        // T029: Reflect group_fuse_active state when meltdown triggers group-level fuse.
579        if let Some(ref budget_eval) = pipeline_result.budget_evaluation
580            && matches!(
581                budget_eval.meltdown_outcome,
582                crate::policy::meltdown::MeltdownOutcome::GroupFuse
583            )
584            && let Some(ref group_id) = pipeline_result.group_id
585        {
586            let _ignored = event_sender.send(format!("group_fuse_active:{group_id}:{}", child_id));
587            // Mark all children in the affected group as non-restartable.
588            for (_cid, slot) in self.slots.iter_mut() {
589                if slot.path.to_string().contains(group_id) {
590                    slot.last_control_failure = Some(ChildControlFailure::new(
591                        ChildControlFailurePhase::WaitCompletion,
592                        format!("group_fuse_active:{group_id}"),
593                        false,
594                    ));
595                }
596            }
597        }
598
599        if let Some(pending) = fence_pending_release {
600            for event in pending_events {
601                self.emit_pending_event(event);
602            }
603            self.spawn_pending_restart_target(child_id.clone(), pending, exit_kind.clone());
604            self.reconcile_stop_deadlines();
605            return;
606        }
607
608        if !self.should_apply_automatic_policy(&child_id) {
609            if self
610                .slots
611                .get(&child_id)
612                .is_some_and(|state| state.operation == ChildControlOperation::Removed)
613                && let Some(removed) = self.slots.remove(&child_id)
614            {
615                pending_events.push(PendingRuntimeEvent {
616                    child_id: child_id.clone(),
617                    path: removed.path.clone(),
618                    generation: Some(generation),
619                    attempt: Some(attempt),
620                    correlation_id: CorrelationId::new(),
621                    what: What::ChildRuntimeStateRemoved {
622                        child_id: child_id.clone(),
623                        path: removed.path,
624                        final_status: Some(ChildAttemptStatus::Stopped),
625                    },
626                });
627            }
628            for event in pending_events {
629                self.emit_pending_event(event);
630            }
631            self.reconcile_stop_deadlines();
632            return;
633        }
634
635        // Extract action decision from pipeline result.
636        let action_decision = pipeline_result.action_decision.as_ref();
637
638        // Map pipeline protection action to restart decision.
639        let pipeline_driven_decision = if let Some(decision) = action_decision {
640            match decision.action {
641                ProtectionAction::RestartAllowed => {
642                    if role_policy_restarts_success(&pipeline_result) {
643                        Some(RestartDecision::RestartAfter {
644                            delay: Duration::ZERO,
645                        })
646                    } else {
647                        self.restart_decision(&child_id)
648                    }
649                }
650                ProtectionAction::RestartQueued => {
651                    // Queue the restart - for now treat as no immediate restart.
652                    None
653                }
654                ProtectionAction::RestartDenied
655                | ProtectionAction::SupervisionPaused
656                | ProtectionAction::Escalated
657                | ProtectionAction::SupervisedStop => {
658                    // Do not restart - respect pipeline decision.
659                    None
660                }
661            }
662        } else {
663            // Fallback to existing policy engine if pipeline didn't produce a decision.
664            self.restart_decision(&child_id)
665        };
666
667        let Some(decision) = pipeline_driven_decision else {
668            for event in pending_events {
669                self.emit_pending_event(event);
670            }
671            self.reconcile_stop_deadlines();
672            return;
673        };
674
675        if restart_limit_refreshed
676            .as_ref()
677            .is_some_and(|(_path, restart_limit)| {
678                restart_limit.used > restart_limit.limit
679                    && matches!(decision, RestartDecision::RestartAfter { .. })
680            })
681        {
682            let _ignored = event_sender.send(format!("child_restart_limit_exhausted:{child_id}"));
683            for event in pending_events {
684                self.emit_pending_event(event);
685            }
686            self.reconcile_stop_deadlines();
687            return;
688        }
689        self.execute_restart_decision(child_id, decision, event_sender);
690        for event in pending_events {
691            self.emit_pending_event(event);
692        }
693        self.reconcile_stop_deadlines();
694    }
695
696    /// Records a failed child start.
697    ///
698    /// # Arguments
699    ///
700    /// - `child_id`: Child identifier whose child_start_count failed.
701    /// - `message`: Diagnostic error message.
702    /// - `event_sender`: Event channel used for lifecycle text.
703    ///
704    /// # Returns
705    ///
706    /// This function does not return a value.
707    pub fn handle_child_start_failed(
708        &mut self,
709        child_id: ChildId,
710        message: String,
711        event_sender: &broadcast::Sender<String>,
712    ) {
713        let _ignored = event_sender.send(format!("child_start_failed:{child_id}:{message}"));
714
715        let mut fenced_spawn_recovery = Option::<(Generation, ChildStartCount, u64)>::None;
716        let mut repaired_fenced_spawn = false;
717
718        if let Some(runtime_state) = self.slots.get_mut(&child_id)
719            && runtime_state.generation_fence.phase == GenerationFencePhase::ReadyToStart
720        {
721            fenced_spawn_recovery = runtime_state
722                .registry_identity_anchor_for_spawn_attempt
723                .take();
724            repaired_fenced_spawn = true;
725            runtime_state.generation_fence.phase = GenerationFencePhase::Open;
726            runtime_state.last_control_failure = Some(ChildControlFailure::new(
727                ChildControlFailurePhase::WaitCompletion,
728                message,
729                true,
730            ));
731        }
732
733        if repaired_fenced_spawn {
734            if let Some((generation, attempt, restart_count)) = fenced_spawn_recovery
735                && let Some(registry_runtime) = self.registry.child_mut(&child_id)
736            {
737                registry_runtime.generation = generation;
738                registry_runtime.child_start_count = attempt;
739                registry_runtime.restart_count = restart_count;
740            }
741            return;
742        }
743
744        let _result = self.set_child_state(child_id, ChildControlOperation::Quarantined);
745    }
746
747    /// Executes the real shutdown pipeline.
748    ///
749    /// # Arguments
750    ///
751    /// - `requested_by`: Actor that requested shutdown.
752    /// - `reason`: Human-readable shutdown reason.
753    /// - `event_sender`: Event channel used for lifecycle text.
754    ///
755    /// # Returns
756    ///
757    /// Returns a [`ShutdownResult`] with a completed report attached.
758    async fn execute_shutdown(
759        &mut self,
760        requested_by: String,
761        reason: String,
762        event_sender: &broadcast::Sender<String>,
763    ) -> Result<ShutdownResult, SupervisorError> {
764        if let Some(report) = self.shutdown_pipeline.cached_report() {
765            return Ok(self
766                .shutdown
767                .result_with_report(report.as_idempotent(), true));
768        }
769
770        let cause = ShutdownCause::new(requested_by, reason);
771        let requested = self.shutdown.request_stop(cause);
772        let started_at_unix_nanos = unix_epoch_nanos();
773        let wait_order = self.shutdown_wait_order();
774        let mut outcomes = HashMap::<ChildId, ChildShutdownOutcome>::new();
775        let _ignored = event_sender.send(format!(
776            "shutdown_phase_changed:{:?}:{:?}",
777            ShutdownPhase::Idle,
778            self.shutdown.phase()
779        ));
780        self.deliver_shutdown_cancellations(&wait_order, event_sender);
781
782        self.advance_shutdown_phase(event_sender);
783        self.drain_graceful_children(&wait_order, &mut outcomes, event_sender)
784            .await;
785
786        self.advance_shutdown_phase(event_sender);
787        self.abort_remaining_children(&wait_order, &mut outcomes, event_sender)
788            .await;
789
790        self.advance_shutdown_phase(event_sender);
791        self.reconcile_shutdown_outcomes(&wait_order, &mut outcomes);
792        let reconcile = ShutdownReconcileReport::core_runtime_completed();
793
794        let from = self.shutdown.phase();
795        self.shutdown.complete();
796        let _ignored = event_sender.send(format!(
797            "shutdown_phase_changed:{from:?}:{:?}",
798            self.shutdown.phase()
799        ));
800        let ordered_outcomes = wait_order
801            .iter()
802            .filter_map(|child_id| outcomes.remove(child_id))
803            .collect::<Vec<_>>();
804        let report = ShutdownPipelineReport {
805            cause: requested.cause,
806            started_at_unix_nanos,
807            completed_at_unix_nanos: unix_epoch_nanos(),
808            phase: self.shutdown.phase(),
809            outcomes: ordered_outcomes,
810            reconcile,
811            idempotent: false,
812        };
813        let _ignored = event_sender.send(format!("shutdown_completed:{}", report.outcomes.len()));
814        self.shutdown_pipeline.cache_report(report.clone());
815        Ok(self.shutdown.result_with_report(report, false))
816    }
817
818    /// Advances the shutdown phase and emits a phase event.
819    ///
820    /// # Arguments
821    ///
822    /// - `event_sender`: Event channel used for lifecycle text.
823    ///
824    /// # Returns
825    ///
826    /// Returns the phase after advancing.
827    fn advance_shutdown_phase(
828        &mut self,
829        event_sender: &broadcast::Sender<String>,
830    ) -> ShutdownPhase {
831        let from = self.shutdown.phase();
832        let to = self.shutdown.advance();
833        let _ignored = event_sender.send(format!("shutdown_phase_changed:{from:?}:{to:?}"));
834        to
835    }
836
837    /// Returns declared children in shutdown wait order.
838    ///
839    /// # Arguments
840    ///
841    /// This function has no arguments.
842    ///
843    /// # Returns
844    ///
845    /// Returns child identifiers in shutdown order.
846    fn shutdown_wait_order(&self) -> Vec<ChildId> {
847        shutdown_order(&self.tree)
848            .into_iter()
849            .map(|node| node.child.id.clone())
850            .collect()
851    }
852
853    /// Delivers cancellation to every active child child_start_count.
854    ///
855    /// # Arguments
856    ///
857    /// - `wait_order`: Stable shutdown order for declared children.
858    /// - `event_sender`: Event channel used for lifecycle text.
859    ///
860    /// # Returns
861    ///
862    /// This function does not return a value.
863    fn deliver_shutdown_cancellations(
864        &mut self,
865        wait_order: &[ChildId],
866        event_sender: &broadcast::Sender<String>,
867    ) {
868        for child_id in wait_order {
869            let Some(runtime_state) = self.slots.get_mut(child_id) else {
870                continue;
871            };
872            if runtime_state.operation == ChildControlOperation::Removed {
873                continue;
874            }
875            if !runtime_state.has_active_attempt() {
876                continue;
877            };
878            runtime_state.cancel();
879            let _ignored = event_sender.send(format!(
880                "child_shutdown_cancel_delivered:{}:{}:{}",
881                runtime_state.child_id,
882                runtime_state
883                    .generation
884                    .map_or(0, |generation| generation.value),
885                runtime_state.attempt.map_or(0, |attempt| attempt.value)
886            ));
887        }
888    }
889
    /// Drains cooperative child start_counts within the graceful timeout budget.
    ///
    /// A single deadline (`now + graceful_timeout`) is shared by every child in
    /// `wait_order`, so the total drain time is bounded rather than per-child.
    /// Children that already have an outcome are skipped. A slot marked `Removed` gets a
    /// removed-slot outcome. Slots without an active attempt are left for later phases.
    /// Children that do not report before the deadline are put back without an outcome,
    /// so the abort phase can handle them.
    ///
    /// # Arguments
    ///
    /// - `wait_order`: Stable shutdown order for declared children.
    /// - `outcomes`: Output map for completed child outcomes.
    /// - `event_sender`: Event channel used for lifecycle text.
    ///
    /// # Returns
    ///
    /// This function does not return a value.
    async fn drain_graceful_children(
        &mut self,
        wait_order: &[ChildId],
        outcomes: &mut HashMap<ChildId, ChildShutdownOutcome>,
        event_sender: &broadcast::Sender<String>,
    ) {
        // One deadline for the whole drain phase, not one per child.
        let deadline = Instant::now() + self.shutdown.policy.graceful_timeout;
        for child_id in wait_order {
            if outcomes.contains_key(child_id) {
                continue;
            }
            // The slot is taken out of the map while it is awaited and reinserted on every
            // path below. NOTE(review): presumably this lets `self.record_child_exit` borrow
            // `self` mutably while the slot is held; confirm.
            let Some(mut runtime_state) = self.slots.remove(child_id) else {
                continue;
            };
            if runtime_state.operation == ChildControlOperation::Removed {
                outcomes.insert(
                    child_id.clone(),
                    removed_runtime_state_shutdown_outcome(
                        &runtime_state,
                        ShutdownPhase::GracefulDrain,
                    ),
                );
                self.slots.insert(child_id.clone(), runtime_state);
                continue;
            }
            if !runtime_state.has_active_attempt() {
                self.slots.insert(child_id.clone(), runtime_state);
                continue;
            };
            // `None` means "no report this phase": either no budget remained (assumes
            // `remaining_duration` returns `None` once the deadline has passed) or the
            // wait timed out.
            let completed = match remaining_duration(deadline) {
                Some(remaining) => timeout(remaining, runtime_state.wait_for_report())
                    .await
                    .ok(),
                None => None,
            };
            match completed {
                Some(Ok(report)) => {
                    // Build the outcome before `report` is moved into the registry update.
                    let outcome = outcome_from_report(
                        &runtime_state,
                        &report,
                        ChildShutdownStatus::Graceful,
                        ShutdownPhase::GracefulDrain,
                        "child completed during graceful drain",
                    );
                    self.record_child_exit(report);
                    runtime_state.clear_instance();
                    self.slots.insert(child_id.clone(), runtime_state);
                    let _ignored = event_sender.send(format!("child_shutdown_graceful:{child_id}"));
                    outcomes.insert(child_id.clone(), outcome);
                }
                Some(Err(error)) => {
                    outcomes.insert(
                        child_id.clone(),
                        outcome_from_error(
                            &runtime_state,
                            ChildShutdownStatus::Graceful,
                            ShutdownPhase::GracefulDrain,
                            error,
                        ),
                    );
                    self.slots.insert(child_id.clone(), runtime_state);
                }
                None => {
                    // No outcome recorded: the abort phase will pick this child up.
                    self.slots.insert(child_id.clone(), runtime_state);
                }
            }
        }
    }
969
    /// Aborts children that did not complete during graceful drain.
    ///
    /// The method handles each remaining child with an active attempt in one of two ways:
    ///
    /// - If `abort_after_timeout` is disabled, it delegates to `wait_for_late_report`.
    /// - Otherwise it calls `abort()` and then waits up to `abort_wait` for the report.
    ///
    /// Unlike the graceful drain, the `abort_wait` budget applies to each child separately.
    ///
    /// # Arguments
    ///
    /// - `wait_order`: Stable shutdown order for declared children.
    /// - `outcomes`: Output map for completed child outcomes.
    /// - `event_sender`: Event channel used for lifecycle text.
    ///
    /// # Returns
    ///
    /// This function does not return a value.
    async fn abort_remaining_children(
        &mut self,
        wait_order: &[ChildId],
        outcomes: &mut HashMap<ChildId, ChildShutdownOutcome>,
        event_sender: &broadcast::Sender<String>,
    ) {
        // Copy of the shutdown policy, read once for the whole phase.
        let policy = self.shutdown.policy;
        for child_id in wait_order {
            if outcomes.contains_key(child_id) {
                continue;
            }
            // Same remove/reinsert pattern as the graceful drain: the slot is reinserted
            // on every path below.
            let Some(mut runtime_state) = self.slots.remove(child_id) else {
                continue;
            };
            if runtime_state.operation == ChildControlOperation::Removed {
                outcomes.insert(
                    child_id.clone(),
                    removed_runtime_state_shutdown_outcome(
                        &runtime_state,
                        ShutdownPhase::AbortStragglers,
                    ),
                );
                self.slots.insert(child_id.clone(), runtime_state);
                continue;
            }
            if !runtime_state.has_active_attempt() {
                self.slots.insert(child_id.clone(), runtime_state);
                continue;
            };
            if !policy.abort_after_timeout {
                // Abort disabled: hand the slot off; `wait_for_late_report` reinserts it.
                self.wait_for_late_report(
                    child_id,
                    runtime_state,
                    policy.abort_wait,
                    outcomes,
                    event_sender,
                )
                .await;
                continue;
            }
            runtime_state.abort();
            let _ignored = event_sender.send(format!(
                "child_shutdown_abort_requested:{}",
                runtime_state.child_id
            ));
            // Per-child wait budget after the abort request.
            match timeout(policy.abort_wait, runtime_state.wait_for_report()).await {
                Ok(Ok(report)) => {
                    let outcome = outcome_from_report(
                        &runtime_state,
                        &report,
                        ChildShutdownStatus::Aborted,
                        ShutdownPhase::AbortStragglers,
                        "child completed after abort request",
                    );
                    self.record_child_exit(report);
                    runtime_state.clear_instance();
                    self.slots.insert(child_id.clone(), runtime_state);
                    let _ignored = event_sender.send(format!("child_shutdown_aborted:{child_id}"));
                    outcomes.insert(child_id.clone(), outcome);
                }
                Ok(Err(error)) => {
                    outcomes.insert(
                        child_id.clone(),
                        outcome_from_error(
                            &runtime_state,
                            ChildShutdownStatus::AbortFailed,
                            ShutdownPhase::AbortStragglers,
                            error,
                        ),
                    );
                    self.slots.insert(child_id.clone(), runtime_state);
                }
                Err(_elapsed) => {
                    // No report even after abort: record the failure. A missing generation
                    // or attempt falls back to the initial/first value.
                    outcomes.insert(
                        child_id.clone(),
                        ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
                            child_id: runtime_state.child_id.clone(),
                            path: runtime_state.path.clone(),
                            generation: runtime_state
                                .generation
                                .unwrap_or_else(Generation::initial),
                            child_start_count: runtime_state
                                .attempt
                                .unwrap_or_else(ChildStartCount::first),
                            status: ChildShutdownStatus::AbortFailed,
                            cancel_delivered: runtime_state.attempt_cancel_delivered,
                            exit: None,
                            phase: ShutdownPhase::AbortStragglers,
                            reason: "child did not complete after abort request".to_owned(),
                        }),
                    );
                    self.slots.insert(child_id.clone(), runtime_state);
                }
            }
        }
    }
1077
1078    /// Waits for a late report when abort is disabled by policy.
1079    ///
1080    /// # Arguments
1081    ///
1082    /// - `child_id`: Child whose child_start_count is being reconciled.
1083    /// - `runtime_state`: Runtime state removed from runtime tracking.
1084    /// - `wait`: Late report wait budget.
1085    /// - `outcomes`: Output map for completed child outcomes.
1086    /// - `event_sender`: Event channel used for lifecycle text.
1087    ///
1088    /// # Returns
1089    ///
1090    /// This function does not return a value.
1091    async fn wait_for_late_report(
1092        &mut self,
1093        child_id: &ChildId,
1094        mut runtime_state: ChildSlot,
1095        wait: Duration,
1096        outcomes: &mut HashMap<ChildId, ChildShutdownOutcome>,
1097        event_sender: &broadcast::Sender<String>,
1098    ) {
1099        match timeout(wait, runtime_state.wait_for_report()).await {
1100            Ok(Ok(report)) => {
1101                let outcome = outcome_from_report(
1102                    &runtime_state,
1103                    &report,
1104                    ChildShutdownStatus::LateReport,
1105                    ShutdownPhase::AbortStragglers,
1106                    "child reported after graceful timeout",
1107                );
1108                self.record_child_exit(report);
1109                runtime_state.clear_instance();
1110                self.slots.insert(child_id.clone(), runtime_state);
1111                let _ignored = event_sender.send(format!("child_shutdown_late_report:{child_id}"));
1112                outcomes.insert(child_id.clone(), outcome);
1113            }
1114            Ok(Err(error)) => {
1115                outcomes.insert(
1116                    child_id.clone(),
1117                    outcome_from_error(
1118                        &runtime_state,
1119                        ChildShutdownStatus::LateReport,
1120                        ShutdownPhase::AbortStragglers,
1121                        error,
1122                    ),
1123                );
1124                self.slots.insert(child_id.clone(), runtime_state);
1125            }
1126            Err(_elapsed) => {
1127                outcomes.insert(
1128                    child_id.clone(),
1129                    ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
1130                        child_id: runtime_state.child_id.clone(),
1131                        path: runtime_state.path.clone(),
1132                        generation: runtime_state.generation.unwrap_or_else(Generation::initial),
1133                        child_start_count: runtime_state
1134                            .attempt
1135                            .unwrap_or_else(ChildStartCount::first),
1136                        status: ChildShutdownStatus::AbortFailed,
1137                        cancel_delivered: runtime_state.attempt_cancel_delivered,
1138                        exit: None,
1139                        phase: ShutdownPhase::AbortStragglers,
1140                        reason: "abort disabled and child did not report before reconcile"
1141                            .to_owned(),
1142                    }),
1143                );
1144                self.slots.insert(child_id.clone(), runtime_state);
1145            }
1146        }
1147    }
1148
1149    /// Adds already-exited outcomes for declared children with no active task.
1150    ///
1151    /// # Arguments
1152    ///
1153    /// - `wait_order`: Stable shutdown order for declared children.
1154    /// - `outcomes`: Output map for completed child outcomes.
1155    ///
1156    /// # Returns
1157    ///
1158    /// This function does not return a value.
1159    fn reconcile_shutdown_outcomes(
1160        &self,
1161        wait_order: &[ChildId],
1162        outcomes: &mut HashMap<ChildId, ChildShutdownOutcome>,
1163    ) {
1164        for child_id in wait_order {
1165            if outcomes.contains_key(child_id) {
1166                continue;
1167            }
1168            let Some(runtime) = self.registry.child(child_id) else {
1169                continue;
1170            };
1171            outcomes.insert(
1172                child_id.clone(),
1173                ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
1174                    child_id: runtime.id.clone(),
1175                    path: runtime.path.clone(),
1176                    generation: runtime.generation,
1177                    child_start_count: runtime.child_start_count,
1178                    status: ChildShutdownStatus::AlreadyExited,
1179                    cancel_delivered: false,
1180                    exit: runtime.last_exit.clone(),
1181                    phase: ShutdownPhase::Reconcile,
1182                    reason: "child had no active child_start_count during shutdown".to_owned(),
1183                }),
1184            );
1185        }
1186    }
1187
1188    /// Sets a child state and reports whether the operation was idempotent.
1189    ///
1190    /// # Arguments
1191    ///
1192    /// - `child_id`: Target child identifier.
1193    /// - `next`: Requested managed child state.
1194    ///
1195    /// # Returns
1196    ///
1197    /// Returns a [`CommandResult::ChildControl`] value.
1198    fn set_child_state(
1199        &mut self,
1200        child_id: ChildId,
1201        operation: ChildControlOperation,
1202    ) -> CommandResult {
1203        if !self.slots.contains_key(&child_id) {
1204            let placeholder = self
1205                .registry
1206                .child(&child_id)
1207                .map(|runtime| ChildSlot::new_placeholder(runtime.id.clone(), runtime.path.clone()))
1208                .unwrap_or_else(|| {
1209                    ChildSlot::new_placeholder(
1210                        child_id.clone(),
1211                        crate::id::types::SupervisorPath::root().join(child_id.value.clone()),
1212                    )
1213                });
1214            self.slots.insert(child_id.clone(), placeholder);
1215        }
1216        let runtime_state = self
1217            .slots
1218            .get_mut(&child_id)
1219            .expect("child runtime state should exist after insertion");
1220        let operation_before = runtime_state.operation;
1221        runtime_state.operation = operation;
1222        let outcome = ChildControlResult::new(
1223            child_id,
1224            runtime_state.attempt,
1225            runtime_state.generation,
1226            operation_before,
1227            runtime_state.operation,
1228            Some(runtime_state.status),
1229            false,
1230            if runtime_state.has_active_attempt() {
1231                runtime_state.stop_state
1232            } else {
1233                ChildStopState::NoActiveAttempt
1234            },
1235            runtime_state.restart_limit.clone(),
1236            runtime_state.observe_liveness(self.time_base.now_unix_nanos()),
1237            operation_before == operation,
1238            runtime_state.last_control_failure.clone(),
1239            None,
1240        );
1241        CommandResult::ChildControl { outcome }
1242    }
1243
1244    /// Executes a stop-style child control command.
1245    ///
1246    /// # Arguments
1247    ///
1248    /// - `child_id`: Target child identifier.
1249    /// - `target_operation`: Operation requested by the command.
1250    /// - `command_name`: Stable command name used in lifecycle text.
1251    /// - `meta`: Audit metadata attached to the command.
1252    /// - `event_sender`: Event channel used for lifecycle text.
1253    ///
1254    /// # Returns
1255    ///
1256    /// Returns a [`CommandResult::ChildControl`] value.
1257    fn execute_stop_child_control(
1258        &mut self,
1259        child_id: ChildId,
1260        target_operation: ChildControlOperation,
1261        command_name: &'static str,
1262        meta: &CommandMeta,
1263        event_sender: &broadcast::Sender<String>,
1264    ) -> CommandResult {
1265        if !self.slots.contains_key(&child_id) {
1266            let placeholder = self
1267                .registry
1268                .child(&child_id)
1269                .map(|runtime| ChildSlot::new_placeholder(runtime.id.clone(), runtime.path.clone()))
1270                .unwrap_or_else(|| {
1271                    ChildSlot::new_placeholder(
1272                        child_id.clone(),
1273                        crate::id::types::SupervisorPath::root().join(child_id.value.clone()),
1274                    )
1275                });
1276            self.slots.insert(child_id.clone(), placeholder);
1277        }
1278
1279        let remove_after_outcome;
1280        let correlation_id = CorrelationId::from_uuid(meta.command_id.value);
1281        let mut pending_events = Vec::new();
1282        let outcome = {
1283            let runtime_state = self
1284                .slots
1285                .get_mut(&child_id)
1286                .expect("child runtime state should exist after insertion");
1287            let stop = apply_stop_control_to_runtime_state(
1288                runtime_state,
1289                target_operation,
1290                command_name,
1291                &meta.command_id.value.to_string(),
1292                correlation_id,
1293                self.time_base
1294                    .now_unix_nanos()
1295                    .saturating_add(self.shutdown.policy.graceful_timeout.as_nanos()),
1296                event_sender,
1297                &mut pending_events,
1298            );
1299            remove_after_outcome = stop.remove_after_outcome;
1300            build_child_control_outcome(
1301                stop.operation_before,
1302                stop.cancel_delivered,
1303                stop.idempotent,
1304                runtime_state.last_control_failure.clone(),
1305                runtime_state,
1306                &self.time_base,
1307                None,
1308            )
1309        };
1310
1311        for event in pending_events {
1312            self.emit_pending_event(event);
1313        }
1314
1315        let outcome_path = self
1316            .slots
1317            .get(&child_id)
1318            .map(|state| state.path.clone())
1319            .or_else(|| {
1320                self.registry
1321                    .child(&child_id)
1322                    .map(|runtime| runtime.path.clone())
1323            })
1324            .unwrap_or_else(|| SupervisorPath::root().join(child_id.value.clone()));
1325        self.emit_pending_event(PendingRuntimeEvent {
1326            child_id: child_id.clone(),
1327            path: outcome_path,
1328            generation: outcome.generation,
1329            attempt: outcome.attempt,
1330            correlation_id,
1331            what: What::ChildControlCommandCompleted {
1332                child_id: child_id.clone(),
1333                command: command_name.to_owned(),
1334                command_id: meta.command_id.value.to_string(),
1335                requested_by: meta.requested_by.clone(),
1336                reason: meta.reason.clone(),
1337                result: child_control_result_label(&outcome).to_owned(),
1338                outcome: Box::new(outcome.clone()),
1339            },
1340        });
1341
1342        if remove_after_outcome {
1343            if let Some(removed) = self.slots.remove(&child_id) {
1344                self.emit_pending_event(PendingRuntimeEvent {
1345                    child_id: child_id.clone(),
1346                    path: removed.path.clone(),
1347                    generation: removed.generation,
1348                    attempt: removed.attempt,
1349                    correlation_id,
1350                    what: What::ChildRuntimeStateRemoved {
1351                        child_id: child_id.clone(),
1352                        path: removed.path,
1353                        final_status: None,
1354                    },
1355                });
1356            }
1357            let _ignored = event_sender.send(format!("child_runtime_state_removed:{child_id}"));
1358        }
1359
1360        CommandResult::ChildControl { outcome }
1361    }
1362
1363    /// Records the completed child_start_count in the registry.
1364    ///
1365    /// # Arguments
1366    ///
1367    /// - `report`: Completed child child_start_count report.
1368    ///
1369    /// # Returns
1370    ///
1371    /// This function does not return a value.
1372    fn record_child_exit(&mut self, report: ChildRunReport) {
1373        let child_id = report.runtime.id.clone();
1374        if let Some(runtime) = self.registry.child_mut(&child_id) {
1375            runtime.last_exit = Some(report.exit);
1376            runtime.status = ChildRuntimeStatus::Exited;
1377            runtime.generation = report.runtime.generation;
1378            runtime.child_start_count = report.runtime.child_start_count;
1379            runtime.restart_count = report.runtime.restart_count;
1380        }
1381    }
1382
1383    /// Reports whether automatic policy may still act on a child.
1384    ///
1385    /// # Arguments
1386    ///
1387    /// - `child_id`: Child whose latest exit is being evaluated.
1388    ///
1389    /// # Returns
1390    ///
1391    /// Returns `true` when the runtime may restart the child.
1392    fn should_apply_automatic_policy(&self, child_id: &ChildId) -> bool {
1393        if self.shutdown.phase() != crate::shutdown::stage::ShutdownPhase::Idle {
1394            return false;
1395        }
1396        !matches!(
1397            self.slots.get(child_id).map(|state| state.operation),
1398            Some(
1399                ChildControlOperation::Paused
1400                    | ChildControlOperation::Quarantined
1401                    | ChildControlOperation::Removed
1402            )
1403        )
1404    }
1405
1406    /// Calculates a restart decision for the latest child exit.
1407    ///
1408    /// # Arguments
1409    ///
1410    /// - `child_id`: Child whose latest exit is being evaluated.
1411    ///
1412    /// # Returns
1413    ///
1414    /// Returns a restart decision when the child is known.
1415    fn restart_decision(&self, child_id: &ChildId) -> Option<RestartDecision> {
1416        let runtime = self.registry.child(child_id)?;
1417        let exit = runtime.last_exit.as_ref()?;
1418        let policy_exit = policy_task_exit(exit);
1419        let restart_policy = restart_policy(runtime.spec.restart_policy);
1420        let backoff = backoff_policy(runtime.spec.backoff_policy);
1421        Some(self.policy_engine.decide(
1422            restart_policy,
1423            policy_exit,
1424            runtime.child_start_count.value,
1425            &backoff,
1426        ))
1427    }
1428
1429    /// Refreshes restart limit state for one child after an exit.
1430    ///
1431    /// # Arguments
1432    ///
1433    /// - `child_id`: Child whose accounting should be refreshed.
1434    /// - `count_failure`: Whether this exit consumes the restart limit.
1435    ///
1436    /// # Returns
1437    ///
1438    /// Returns the child path and updated restart limit state when the child is tracked.
1439    fn refresh_restart_limit_for_child(
1440        &mut self,
1441        child_id: &ChildId,
1442        count_failure: bool,
1443    ) -> Option<(SupervisorPath, crate::control::outcome::RestartLimitState)> {
1444        let restart_limit = restart_limit_for_child_in_spec(&self.tree, &self.spec, child_id);
1445        let runtime_state = self.slots.get_mut(child_id)?;
1446        let updated = runtime_state.refresh_restart_limit(
1447            restart_limit.window,
1448            restart_limit.max_restarts,
1449            count_failure,
1450            &self.time_base,
1451        );
1452        Some((runtime_state.path.clone(), updated))
1453    }
1454
1455    /// Executes a restart decision after a child exit.
1456    ///
1457    /// # Arguments
1458    ///
1459    /// - `failed_child`: Child whose exit triggered the decision.
1460    /// - `decision`: Restart decision returned by the policy engine.
1461    /// - `event_sender`: Event channel used for lifecycle text.
1462    ///
1463    /// # Returns
1464    ///
1465    /// This function does not return a value.
1466    fn execute_restart_decision(
1467        &mut self,
1468        failed_child: ChildId,
1469        decision: RestartDecision,
1470        event_sender: &broadcast::Sender<String>,
1471    ) {
1472        match decision {
1473            RestartDecision::RestartAfter { delay } => {
1474                self.restart_strategy_scope(failed_child, delay, event_sender);
1475            }
1476            RestartDecision::Quarantine => {
1477                let _result =
1478                    self.set_child_state(failed_child, ChildControlOperation::Quarantined);
1479            }
1480            RestartDecision::ShutdownTree => {
1481                let cause = ShutdownCause::new("runtime", "policy requested tree shutdown");
1482                let _result = self.shutdown.request_stop(cause);
1483            }
1484            RestartDecision::EscalateToParent | RestartDecision::DoNotRestart => {}
1485        }
1486    }
1487
1488    /// Restarts every child selected by the current execution plan.
1489    ///
1490    /// # Arguments
1491    ///
1492    /// - `failed_child`: Child whose exit triggered the restart scope.
1493    /// - `delay`: Delay before every selected child is restarted.
1494    /// - `event_sender`: Event channel used for lifecycle text.
1495    ///
1496    /// # Returns
1497    ///
1498    /// This function does not return a value.
1499    fn restart_strategy_scope(
1500        &mut self,
1501        failed_child: ChildId,
1502        delay: Duration,
1503        event_sender: &broadcast::Sender<String>,
1504    ) {
1505        let plan = restart_execution_plan(&self.tree, &self.spec, &failed_child);
1506        let scope_label = child_scope_label(&plan.scope);
1507        let group_label = plan.group.as_deref().unwrap_or("supervisor");
1508
1509        // FR-003: Check concurrent restart gate before spawning
1510        if !self.concurrent_gate.try_acquire() {
1511            // Gate saturated - emit throttle event and skip restart
1512            let _ignored = event_sender.send(format!(
1513                "restart_throttled:concurrent_gate_saturated:{group_label}:{scope_label}"
1514            ));
1515            self.emit_throttle_gate_event(
1516                &failed_child,
1517                plan.group.as_deref(),
1518                ThrottleGateOwner::SupervisorInstance,
1519            );
1520            return;
1521        }
1522
1523        let _ignored = event_sender.send(format!(
1524            "restart_plan:{:?}:{group_label}:{scope_label}",
1525            plan.strategy
1526        ));
1527        for child_id in plan.scope {
1528            self.spawn_child_start(child_id, true, delay);
1529        }
1530        self.concurrent_gate.release();
1531    }
1532
1533    /// Emits a typed event for a restart throttle gate hit.
1534    ///
1535    /// # Arguments
1536    ///
1537    /// - `child_id`: Child whose restart was throttled.
1538    /// - `group_id`: Optional restart execution group.
1539    /// - `owner`: Gate owner that limited the restart.
1540    ///
1541    /// # Returns
1542    ///
1543    /// This function does not return a value.
1544    fn emit_throttle_gate_event(
1545        &mut self,
1546        child_id: &ChildId,
1547        group_id: Option<&str>,
1548        owner: ThrottleGateOwner,
1549    ) {
1550        let now = Instant::now();
1551        let uptime = now
1552            .duration_since(self.time_base.base_instant)
1553            .as_millis()
1554            .min(u128::from(u64::MAX)) as u64;
1555        let monotonic_nanos = now.duration_since(self.time_base.base_instant).as_nanos();
1556        let path = self
1557            .slots
1558            .get(child_id)
1559            .map(|state| state.path.clone())
1560            .unwrap_or_else(|| SupervisorPath::root().join(child_id.value.clone()));
1561        let child_name = self
1562            .registry
1563            .child(child_id)
1564            .map(|runtime| runtime.spec.name.clone())
1565            .unwrap_or_else(|| child_id.to_string());
1566        let mut event = SupervisorEvent::new(
1567            When::new(EventTime::from_parts(
1568                monotonic_nanos,
1569                uptime,
1570                Generation::initial(),
1571                ChildStartCount::first(),
1572            )),
1573            Where::new(path).with_child(child_id.clone(), child_name),
1574            What::ChildFailed {
1575                failure: crate::error::types::TaskFailure::new(
1576                    crate::error::types::TaskFailureKind::Error,
1577                    "restart_throttled",
1578                    format!(
1579                        "restart denied by throttle gate {} for group {}",
1580                        owner,
1581                        group_id.unwrap_or("supervisor")
1582                    ),
1583                ),
1584            },
1585            self.event_sequences.next(),
1586            CorrelationId::new(),
1587            1,
1588        );
1589        event.effective_protective_action = Some(ProtectionAction::RestartDenied);
1590        event.throttle_gate_owner = owner;
1591        if let Some(runtime) = self.registry.child(child_id) {
1592            let effective_policy = prepare_effective_policy(&runtime.spec);
1593            event.task_role = Some(effective_policy.task_role);
1594            event.used_fallback_default = effective_policy.used_fallback;
1595            event.effective_policy_source = Some(effective_policy.source);
1596        }
1597        if let Ok(mut observability) = self.observability.lock() {
1598            let _lagged = observability.emit(event);
1599        }
1600    }
1601
1602    /// Records six-stage pipeline diagnostics in shared observability.
1603    ///
1604    /// # Arguments
1605    ///
1606    /// - `diagnostics`: Diagnostics produced by the supervision pipeline.
1607    ///
1608    /// # Returns
1609    ///
1610    /// This function does not return a value.
1611    fn record_pipeline_stage_diagnostics(&self, diagnostics: &[PipelineStageDiagnostic]) {
1612        if let Ok(mut observability) = self.observability.lock() {
1613            observability.record_pipeline_stage_diagnostics(diagnostics);
1614        }
1615    }
1616
1617    /// Checks the fairness probe and emits starvation alerts (T019).
1618    ///
1619    /// # Arguments
1620    ///
1621    /// - `event_sender`: Event channel used for lifecycle text.
1622    ///
1623    /// # Returns
1624    ///
1625    /// This function does not return a value.
1626    fn check_fairness_probe(&mut self, event_sender: &broadcast::Sender<String>) {
1627        let now_unix_nanos = self.time_base.now_unix_nanos();
1628        let all_child_ids: Vec<ChildId> = self.slots.keys().cloned().collect();
1629        if let Some(alert) = self.fairness_probe.check(now_unix_nanos, &all_child_ids) {
1630            // Emit typed event for structured observability (ALIGN-003).
1631            let path = self
1632                .slots
1633                .get(&alert.starved_child_id)
1634                .map(|slot| slot.path.clone())
1635                .unwrap_or_else(|| {
1636                    SupervisorPath::root().join(alert.starved_child_id.value.clone())
1637                });
1638            let generation = self
1639                .slots
1640                .get(&alert.starved_child_id)
1641                .and_then(|slot| slot.generation);
1642            let attempt = self
1643                .slots
1644                .get(&alert.starved_child_id)
1645                .and_then(|slot| slot.attempt);
1646            let pending = PendingRuntimeEvent {
1647                child_id: alert.starved_child_id.clone(),
1648                path,
1649                generation,
1650                attempt,
1651                correlation_id: CorrelationId::new(),
1652                what: What::FairnessProbeStarvation {
1653                    starved_child_id: alert.starved_child_id.clone(),
1654                    skip_count: alert.skip_count,
1655                    probe_start_unix_nanos: alert.probe_start_unix_nanos,
1656                    probe_end_unix_nanos: alert.probe_end_unix_nanos,
1657                },
1658            };
1659            self.emit_pending_event(pending);
1660
1661            // Text-based log for audit trail.
1662            let _ignored = event_sender.send(format!(
1663                "fairness_starvation:{}:skip_count={}:window_start={}:window_end={}",
1664                alert.starved_child_id,
1665                alert.skip_count,
1666                alert.probe_start_unix_nanos,
1667                alert.probe_end_unix_nanos,
1668            ));
1669        }
1670    }
1671
1672    /// Ensures that the dynamic supervisor accepts another child manifest.
1673    ///
1674    /// # Arguments
1675    ///
1676    /// This function has no arguments.
1677    ///
1678    /// # Returns
1679    ///
1680    /// Returns `Ok(())` when another dynamic child can be added.
1681    fn ensure_dynamic_child_allowed(&self) -> Result<(), SupervisorError> {
1682        let current_child_count = self.dynamic_child_count();
1683        if self
1684            .spec
1685            .dynamic_supervisor_policy
1686            .allows_addition(current_child_count)
1687        {
1688            return Ok(());
1689        }
1690        Err(SupervisorError::InvalidTransition {
1691            message: "dynamic supervisor child limit reached".to_owned(),
1692        })
1693    }
1694
1695    /// Counts declared and dynamic child records.
1696    ///
1697    /// # Arguments
1698    ///
1699    /// This function has no arguments.
1700    ///
1701    /// # Returns
1702    ///
1703    /// Returns the number of declared children plus accepted dynamic manifests.
1704    fn dynamic_child_count(&self) -> usize {
1705        self.registry
1706            .declaration_order()
1707            .len()
1708            .saturating_add(self.manifests.len())
1709    }
1710
    /// Handles `RestartChild` with generation fencing semantics.
    ///
    /// The command resolves through exactly one of these paths:
    ///
    /// 1. Unknown child (absent from the registry): a completion event built from
    ///    `restart_child_unknown_outcome` is emitted and that outcome is returned.
    /// 2. Shutdown phase is not `Idle`: handling is delegated to
    ///    `restart_child_blocked_by_shutdown`.
    /// 3. A restart is already pending: the request is merged into it.
    ///    - `duplicate_request_count` is incremented.
    ///    - A `ChildRestartConflict` event with decision `already_pending` is queued.
    ///    - The fence decision is `AlreadyPending`.
    /// 4. The child has no active attempt: it is started via `spawn_child_start` with zero
    ///    delay, and the fence decision is `StartedImmediately`.
    /// 5. The child has an active attempt:
    ///    - Cancellation is requested.
    ///    - A stop deadline of now + the shutdown policy's `graceful_timeout` is recorded.
    ///    - A `PendingRestart` targeting the next generation is stored.
    ///    - The fence phase becomes `WaitingForOldStop`.
    ///    - The fence decision is `QueuedAfterStop`.
    ///
    /// In paths 3-5 the queued fence/cancellation events are published before the final
    /// `ChildControlCommandCompleted` event via `emit_restart_child_completed`.
    ///
    /// # Arguments
    ///
    /// - `child_id`: Stable child targeted by restart.
    /// - `meta`: Audit metadata forwarded from the caller.
    /// - `event_sender`: Lifecycle text broadcaster.
    ///
    /// # Returns
    ///
    /// Returns a structured [`CommandResult`] that always uses [`CommandResult::ChildControl`].
    ///
    /// # Panics
    ///
    /// Only on violated internal invariants: an active attempt without a generation or
    /// attempt counter.
    fn execute_restart_child_control(
        &mut self,
        child_id: ChildId,
        meta: &CommandMeta,
        event_sender: &broadcast::Sender<String>,
    ) -> CommandResult {
        // Every typed event for this command is correlated through the command id.
        let correlation_id = CorrelationId::from_uuid(meta.command_id.value);

        // Path 1: the registry does not know this child.
        if self.registry.child(&child_id).is_none() {
            let outcome = restart_child_unknown_outcome(child_id.clone());
            self.emit_restart_child_completed(
                outcome.clone(),
                meta,
                correlation_id,
                event_sender,
                Vec::new(),
            );
            return CommandResult::ChildControl { outcome };
        }

        // Path 2: restarts are rejected once shutdown has left the idle phase.
        if self.shutdown.phase() != ShutdownPhase::Idle {
            return self.restart_child_blocked_by_shutdown(
                &child_id,
                meta,
                correlation_id,
                event_sender,
            );
        }

        // Ensure a runtime slot exists so the fence bookkeeping below has a home. The
        // registry lookup always succeeds here (path 1 returned otherwise), so the
        // root-path fallback is defensive only.
        if !self.slots.contains_key(&child_id) {
            let placeholder = self
                .registry
                .child(&child_id)
                .map(|runtime| ChildSlot::new_placeholder(runtime.id.clone(), runtime.path.clone()))
                .unwrap_or_else(|| {
                    ChildSlot::new_placeholder(
                        child_id.clone(),
                        SupervisorPath::root().join(child_id.value.clone()),
                    )
                });
            self.slots.insert(child_id.clone(), placeholder);
        }

        // Typed events queued while the slot is mutably borrowed; published at the end.
        let mut pending_events = Vec::new();

        // Records which restart branch matched before optional immediate spawn bookkeeping.
        // The spawn happens only after the `runtime_state` borrow of `self.slots` has ended,
        // presumably because `spawn_child_start` needs `self` mutably.
        enum RestartPrep {
            // Outcome resolved without visiting `spawn_child_start`.
            Completed(Box<ChildControlResult>),
            // Child had no activity and should restart immediately via the shared spawn helper.
            DeferredImmediate {
                // Operation captured before spawning.
                operation_before: ChildControlOperation,
            },
        }

        let restart_prep = {
            let runtime_state = self
                .slots
                .get_mut(&child_id)
                .expect("runtime state exists after insertion");
            if runtime_state.generation_fence.pending_restart.is_some() {
                // Path 3: merge this request into the restart that is already pending.
                let pending = runtime_state
                    .generation_fence
                    .pending_restart
                    .as_mut()
                    .expect("checked pending restart");
                pending.duplicate_request_count = pending.duplicate_request_count.saturating_add(1);
                let pending_for_conflict = pending.clone();
                // NOTE(review): the correlation id below is recomputed from the same command
                // id, so it equals `correlation_id`; reusing that binding would be clearer.
                pending_events.push(PendingRuntimeEvent {
                    child_id: child_id.clone(),
                    path: runtime_state.path.clone(),
                    generation: Some(pending_for_conflict.old_generation),
                    attempt: Some(pending_for_conflict.old_attempt),
                    correlation_id: CorrelationId::from_uuid(meta.command_id.value),
                    what: What::ChildRestartConflict {
                        child_id: child_id.clone(),
                        current_generation: Some(pending_for_conflict.old_generation),
                        current_attempt: Some(pending_for_conflict.old_attempt),
                        target_generation: Some(pending_for_conflict.target_generation),
                        command_id: meta.command_id.value.to_string(),
                        decision: "already_pending".to_owned(),
                        reason: "duplicate restart merged into pending restart".to_owned(),
                    },
                });
                // The fence reports the already-pending restart's identities and abort flag.
                let fence = GenerationFenceOutcome::new(
                    GenerationFenceDecision::AlreadyPending,
                    Some(pending_for_conflict.old_generation),
                    Some(pending_for_conflict.old_attempt),
                    Some(pending_for_conflict.target_generation),
                    false,
                    pending_for_conflict.abort_requested,
                    None,
                );
                let operation_before = runtime_state.operation;
                RestartPrep::Completed(Box::new(build_child_control_outcome(
                    operation_before,
                    false,
                    false,
                    runtime_state.last_control_failure.clone(),
                    runtime_state,
                    &self.time_base,
                    Some(fence),
                )))
            } else if !runtime_state.has_active_attempt() {
                // Path 4: nothing is running, so the restart can start right away once the
                // slot borrow is released (see the match below).
                RestartPrep::DeferredImmediate {
                    operation_before: runtime_state.operation,
                }
            } else {
                // Path 5: an attempt is running. Cancel it and queue the restart behind its stop.
                let old_generation = runtime_state
                    .generation
                    .expect("active attempt owns a generation");
                let old_attempt = runtime_state
                    .attempt
                    .expect("active attempt owns an attempt counter");
                let cancel_delivered = runtime_state.cancel();
                // The old attempt gets the shutdown policy's graceful timeout to stop.
                let deadline = self
                    .time_base
                    .now_unix_nanos()
                    .saturating_add(self.shutdown.policy.graceful_timeout.as_nanos());
                runtime_state.stop_deadline_at_unix_nanos = Some(deadline);
                if cancel_delivered {
                    runtime_state.stop_state = ChildStopState::CancelDelivered;
                }
                let target_generation = old_generation.next();
                // NOTE(review): the clock is read again here, so `deadline - requested_at`
                // can be marginally shorter than `graceful_timeout`.
                let requested_at = self.time_base.now_unix_nanos();
                let pending = PendingRestart::new(
                    meta.command_id.value,
                    meta.requested_by.clone(),
                    meta.reason.clone(),
                    old_generation,
                    old_attempt,
                    target_generation,
                    requested_at,
                    deadline,
                    false,
                    0,
                );
                runtime_state.generation_fence.pending_restart = Some(pending.clone());
                runtime_state.generation_fence.phase = GenerationFencePhase::WaitingForOldStop;

                // The cancellation notice is published only when `cancel()` reported delivery.
                if cancel_delivered {
                    pending_events.push(PendingRuntimeEvent {
                        child_id: child_id.clone(),
                        path: runtime_state.path.clone(),
                        generation: Some(old_generation),
                        attempt: Some(old_attempt),
                        correlation_id,
                        what: What::ChildControlCancelDelivered {
                            child_id: child_id.clone(),
                            generation: old_generation,
                            attempt: old_attempt,
                            command: "restart_child".to_owned(),
                            command_id: meta.command_id.value.to_string(),
                        },
                    });
                    let _ignored = event_sender.send(format!(
                        "child_control_cancel_delivered:{child_id}:restart_child"
                    ));
                }

                pending_events.push(PendingRuntimeEvent {
                    child_id: child_id.clone(),
                    path: runtime_state.path.clone(),
                    generation: Some(old_generation),
                    attempt: Some(old_attempt),
                    correlation_id,
                    what: What::ChildRestartFenceEntered {
                        child_id: child_id.clone(),
                        old_generation,
                        old_attempt,
                        target_generation,
                        command_id: meta.command_id.value.to_string(),
                        requested_by: meta.requested_by.clone(),
                        reason: meta.reason.clone(),
                        stop_deadline_at_unix_nanos: deadline,
                    },
                });

                let operation_before = runtime_state.operation;
                let fence = GenerationFenceOutcome::new(
                    GenerationFenceDecision::QueuedAfterStop,
                    Some(old_generation),
                    Some(old_attempt),
                    Some(target_generation),
                    cancel_delivered,
                    false,
                    None,
                );
                RestartPrep::Completed(Box::new(build_child_control_outcome(
                    operation_before,
                    cancel_delivered,
                    false,
                    None,
                    runtime_state,
                    &self.time_base,
                    Some(fence),
                )))
            }
        };

        let outcome = match restart_prep {
            RestartPrep::Completed(outcome) => *outcome,
            RestartPrep::DeferredImmediate { operation_before } => {
                // Path 4 continued: spawn now that the slot borrow has ended.
                self.spawn_child_start(child_id.clone(), true, Duration::ZERO);
                let runtime_state = self.slots.get_mut(&child_id).expect("runtime state exists");
                // The target generation is read from the registry after the spawn has run.
                let target_generation = self
                    .registry
                    .child(&child_id)
                    .map(|runtime| runtime.generation);
                let fence = GenerationFenceOutcome::new(
                    GenerationFenceDecision::StartedImmediately,
                    None,
                    None,
                    target_generation,
                    false,
                    false,
                    None,
                );
                build_child_control_outcome(
                    operation_before,
                    false,
                    false,
                    runtime_state.last_control_failure.clone(),
                    runtime_state,
                    &self.time_base,
                    Some(fence),
                )
            }
        };

        // Publishes the queued events first, then the completion event.
        self.emit_restart_child_completed(
            outcome.clone(),
            meta,
            correlation_id,
            event_sender,
            pending_events,
        );

        CommandResult::ChildControl { outcome }
    }
1963
1964    /// Emits [`What::ChildControlCommandCompleted`] for an explicit restart command.
1965    ///
1966    /// # Arguments
1967    ///
1968    /// - `outcome`: Command outcome returned to the caller.
1969    /// - `meta`: Audit metadata carried from the command.
1970    /// - `correlation_id`: Correlation shared with related fence events.
1971    /// - `event_sender`: Legacy text broadcaster.
1972    /// - `pending_events`: Fence or cancellation events that must publish first.
1973    ///
1974    /// # Returns
1975    ///
1976    /// This function does not return a value.
1977    fn emit_restart_child_completed(
1978        &mut self,
1979        outcome: ChildControlResult,
1980        meta: &CommandMeta,
1981        correlation_id: CorrelationId,
1982        event_sender: &broadcast::Sender<String>,
1983        mut pending_events: Vec<PendingRuntimeEvent>,
1984    ) {
1985        for event in pending_events.drain(..) {
1986            self.emit_pending_event(event);
1987        }
1988        let outcome_identifier = outcome.child_id.clone();
1989        let outcome_path = self
1990            .slots
1991            .get(&outcome.child_id)
1992            .map(|state| state.path.clone())
1993            .or_else(|| {
1994                self.registry
1995                    .child(&outcome.child_id)
1996                    .map(|runtime| runtime.path.clone())
1997            })
1998            .unwrap_or_else(|| SupervisorPath::root().join(outcome.child_id.value.clone()));
1999        self.emit_pending_event(PendingRuntimeEvent {
2000            child_id: outcome.child_id.clone(),
2001            path: outcome_path,
2002            generation: outcome.generation,
2003            attempt: outcome.attempt,
2004            correlation_id,
2005            what: What::ChildControlCommandCompleted {
2006                child_id: outcome.child_id.clone(),
2007                command: "restart_child".to_owned(),
2008                command_id: meta.command_id.value.to_string(),
2009                requested_by: meta.requested_by.clone(),
2010                reason: meta.reason.clone(),
2011                result: child_control_result_label(&outcome).to_owned(),
2012                outcome: Box::new(outcome),
2013            },
2014        });
2015        let _ignored = event_sender.send(format!(
2016            "child_control_command_completed:{}:restart_child",
2017            outcome_identifier
2018        ));
2019    }
2020
2021    /// Blocks restart while the supervisor tree is not idle.
2022    ///
2023    /// # Arguments
2024    ///
2025    /// - `child_id`: Target child identifier.
2026    /// - `meta`: Audit metadata from the command.
2027    /// - `correlation_id`: Correlation binding typed events.
2028    /// - `event_sender`: Legacy text broadcaster.
2029    ///
2030    /// # Returns
2031    ///
2032    /// Returns [`CommandResult::ChildControl`] with [`GenerationFenceDecision::BlockedByShutdown`].
2033    fn restart_child_blocked_by_shutdown(
2034        &mut self,
2035        child_id: &ChildId,
2036        meta: &CommandMeta,
2037        correlation_id: CorrelationId,
2038        event_sender: &broadcast::Sender<String>,
2039    ) -> CommandResult {
2040        if !self.slots.contains_key(child_id) {
2041            let placeholder = self
2042                .registry
2043                .child(child_id)
2044                .map(|runtime| ChildSlot::new_placeholder(runtime.id.clone(), runtime.path.clone()))
2045                .unwrap_or_else(|| {
2046                    ChildSlot::new_placeholder(
2047                        child_id.clone(),
2048                        SupervisorPath::root().join(child_id.value.clone()),
2049                    )
2050                });
2051            self.slots.insert(child_id.clone(), placeholder);
2052        }
2053
2054        let outcome = {
2055            let runtime_state = self.slots.get_mut(child_id).expect("runtime state exists");
2056            runtime_state.generation_fence.phase = GenerationFencePhase::Closed;
2057            let failure = ChildControlFailure::new(
2058                ChildControlFailurePhase::WaitCompletion,
2059                "supervisor tree is shutting down",
2060                false,
2061            );
2062            let fence = GenerationFenceOutcome::new(
2063                GenerationFenceDecision::BlockedByShutdown,
2064                runtime_state.generation,
2065                runtime_state.attempt,
2066                None,
2067                false,
2068                false,
2069                Some(failure.clone()),
2070            );
2071            let operation_before = runtime_state.operation;
2072            runtime_state.last_control_failure = Some(failure);
2073            build_child_control_outcome(
2074                operation_before,
2075                false,
2076                false,
2077                runtime_state.last_control_failure.clone(),
2078                runtime_state,
2079                &self.time_base,
2080                Some(fence),
2081            )
2082        };
2083
2084        let blocked_events = match self.slots.get(child_id).map(|runtime_state| {
2085            (
2086                runtime_state.path.clone(),
2087                runtime_state.generation,
2088                runtime_state.attempt,
2089            )
2090        }) {
2091            Some((path, current_generation, current_attempt)) => {
2092                vec![PendingRuntimeEvent {
2093                    child_id: child_id.clone(),
2094                    path,
2095                    generation: current_generation,
2096                    attempt: current_attempt,
2097                    correlation_id,
2098                    what: What::ChildRestartConflict {
2099                        child_id: child_id.clone(),
2100                        current_generation,
2101                        current_attempt,
2102                        target_generation: None,
2103                        command_id: meta.command_id.value.to_string(),
2104                        decision: "rejected".to_owned(),
2105                        reason: "restart rejected while supervisor tree is shutting down"
2106                            .to_owned(),
2107                    },
2108                }]
2109            }
2110            None => Vec::new(),
2111        };
2112
2113        self.emit_restart_child_completed(
2114            outcome.clone(),
2115            meta,
2116            correlation_id,
2117            event_sender,
2118            blocked_events,
2119        );
2120
2121        CommandResult::ChildControl { outcome }
2122    }
2123
2124    /// Builds the current runtime state report.
2125    ///
2126    /// # Arguments
2127    ///
2128    /// This function has no arguments.
2129    ///
2130    /// # Returns
2131    ///
2132    /// Returns a [`CurrentState`] value.
2133    fn build_current_state(&mut self) -> CurrentState {
2134        let mut child_runtime_records = Vec::new();
2135        let mut pending_events = Vec::new();
2136        let declaration_order = self.registry.declaration_order().to_vec();
2137        for child_id in declaration_order {
2138            if let Some(runtime_state) = self.slots.get_mut(&child_id) {
2139                let liveness = runtime_state.observe_liveness(self.time_base.now_unix_nanos());
2140                if let Some(event) = heartbeat_stale_event(runtime_state, &liveness) {
2141                    pending_events.push(event);
2142                }
2143                child_runtime_records.push(runtime_state.to_record(liveness));
2144            }
2145        }
2146        for (child_id, runtime_state) in &mut self.slots {
2147            if self.registry.child(child_id).is_some() {
2148                continue;
2149            }
2150            let liveness = runtime_state.observe_liveness(self.time_base.now_unix_nanos());
2151            if let Some(event) = heartbeat_stale_event(runtime_state, &liveness) {
2152                pending_events.push(event);
2153            }
2154            child_runtime_records.push(runtime_state.to_record(liveness));
2155        }
2156        for event in pending_events {
2157            self.emit_pending_event(event);
2158        }
2159        CurrentState {
2160            child_count: self.dynamic_child_count(),
2161            shutdown_completed: self.shutdown.phase()
2162                == crate::shutdown::stage::ShutdownPhase::Completed,
2163            child_runtime_records,
2164        }
2165    }
2166
2167    /// Spawns the target generation queued by a pending manual restart once the old attempt exits.
2168    ///
2169    /// # Arguments
2170    ///
2171    /// - `child_id`: Stable child undergoing a fenced restart.
2172    /// - `pending`: Accepted restart bookkeeping that pins the identity triple transition.
2173    /// - `old_exit`: Exit classification observed for the old attempt.
2174    ///
2175    /// # Returns
2176    ///
2177    /// This function does not return a value.
2178    fn spawn_pending_restart_target(
2179        &mut self,
2180        child_id: ChildId,
2181        pending: PendingRestart,
2182        old_exit: TaskExit,
2183    ) {
2184        let Some(registry_identity_anchor) = self.registry.child(&child_id).map(|runtime| {
2185            (
2186                runtime.generation,
2187                runtime.child_start_count,
2188                runtime.restart_count,
2189            )
2190        }) else {
2191            return;
2192        };
2193        let path = self
2194            .slots
2195            .get(&child_id)
2196            .map(|state| state.path.clone())
2197            .unwrap_or_else(|| SupervisorPath::root().join(child_id.value.clone()));
2198        let correlation_id = CorrelationId::from_uuid(pending.command_id);
2199
2200        if let Some(runtime_state) = self.slots.get_mut(&child_id) {
2201            runtime_state.registry_identity_anchor_for_spawn_attempt =
2202                Some(registry_identity_anchor);
2203        }
2204
2205        {
2206            let Some(registry_runtime) = self.registry.child_mut(&child_id) else {
2207                return;
2208            };
2209            registry_runtime.generation = pending.target_generation;
2210            registry_runtime.child_start_count = registry_runtime.child_start_count.next();
2211            registry_runtime.restart_count = registry_runtime.restart_count.saturating_add(1);
2212            registry_runtime.status = ChildRuntimeStatus::Starting;
2213        }
2214
2215        let Some(runtime) = self.registry.child(&child_id).cloned() else {
2216            return;
2217        };
2218        let new_generation = runtime.generation;
2219        let new_attempt = runtime.child_start_count;
2220
2221        let path_for_handles = path.clone();
2222
2223        match ChildRunner::new().spawn_once(runtime) {
2224            Ok(handle) => {
2225                let sender = self.command_sender.clone();
2226                self.emit_pending_event(PendingRuntimeEvent {
2227                    child_id: child_id.clone(),
2228                    path,
2229                    generation: Some(new_generation),
2230                    attempt: Some(new_attempt),
2231                    correlation_id,
2232                    what: What::ChildRestartFenceReleased {
2233                        child_id: child_id.clone(),
2234                        old_generation: pending.old_generation,
2235                        old_attempt: pending.old_attempt,
2236                        target_generation: pending.target_generation,
2237                        exit_kind: old_exit.clone(),
2238                    },
2239                });
2240                let mut completion_receiver = handle.completion_receiver.clone();
2241                self.slots
2242                    .entry(child_id.clone())
2243                    .or_insert_with(|| {
2244                        ChildSlot::new_placeholder(child_id.clone(), path_for_handles)
2245                    })
2246                    .activate(
2247                        new_generation,
2248                        new_attempt,
2249                        ChildAttemptStatus::Running,
2250                        handle,
2251                    );
2252                tokio::spawn(async move {
2253                    let result = wait_for_report(&mut completion_receiver).await;
2254                    send_child_result(sender, child_id, result).await;
2255                });
2256            }
2257            Err(error) => {
2258                let message = error.to_string();
2259                if let Some(runtime_state) = self.slots.get_mut(&child_id) {
2260                    let identity_anchor_triple_opt = runtime_state
2261                        .registry_identity_anchor_for_spawn_attempt
2262                        .take();
2263                    if let Some((generation, attempt, restart_count)) = identity_anchor_triple_opt {
2264                        if let Some(registry_runtime) = self.registry.child_mut(&child_id) {
2265                            registry_runtime.generation = generation;
2266                            registry_runtime.child_start_count = attempt;
2267                            registry_runtime.restart_count = restart_count;
2268                        }
2269                        // Keep the superseded `(generation, attempt)` identity visible alongside the queued target spawn failure diagnostics.
2270                        runtime_state.generation = Some(generation);
2271                        runtime_state.attempt = Some(attempt);
2272                        runtime_state.status = ChildAttemptStatus::Stopped;
2273                    }
2274                    runtime_state.generation_fence.phase = GenerationFencePhase::Open;
2275                    runtime_state.last_control_failure = Some(ChildControlFailure::new(
2276                        ChildControlFailurePhase::WaitCompletion,
2277                        message,
2278                        true,
2279                    ));
2280                }
2281                // Avoid enqueueing a second asynchronous start-failure loop message because `last_control_failure` already records the deterministic spawn diagnostics.
2282            }
2283        }
2284    }
2285
2286    /// Reconciles expired stop deadlines without blocking the control loop.
2287    ///
2288    /// # Arguments
2289    ///
2290    /// This function has no arguments.
2291    ///
2292    /// # Returns
2293    ///
2294    /// This function does not return a value.
2295    fn reconcile_stop_deadlines(&mut self) {
2296        let now = self.time_base.now_unix_nanos();
2297        let mut pending_events = Vec::new();
2298        for runtime_state in self.slots.values_mut() {
2299            let fence_escalation = if let Some(pending_restart) =
2300                runtime_state.generation_fence.pending_restart.as_ref()
2301            {
2302                if pending_restart.abort_requested {
2303                    None
2304                } else if runtime_state.generation_fence.phase
2305                    == GenerationFencePhase::WaitingForOldStop
2306                    && runtime_state.stop_state == ChildStopState::CancelDelivered
2307                    && runtime_state.has_active_attempt()
2308                    && now >= pending_restart.stop_deadline_at_unix_nanos
2309                {
2310                    match (runtime_state.generation, runtime_state.attempt) {
2311                        (Some(old_generation), Some(old_attempt)) => Some((
2312                            pending_restart.command_id,
2313                            pending_restart.target_generation,
2314                            pending_restart.stop_deadline_at_unix_nanos,
2315                            runtime_state.child_id.clone(),
2316                            runtime_state.path.clone(),
2317                            old_generation,
2318                            old_attempt,
2319                        )),
2320                        _ => None,
2321                    }
2322                } else {
2323                    None
2324                }
2325            } else {
2326                None
2327            };
2328
2329            if let Some((
2330                command_id,
2331                target_generation,
2332                deadline_ns,
2333                fence_child_id,
2334                fence_path,
2335                old_generation,
2336                old_attempt,
2337            )) = fence_escalation
2338            {
2339                let delivered = runtime_state.abort();
2340                if delivered {
2341                    if let Some(pending_mut) = &mut runtime_state.generation_fence.pending_restart {
2342                        pending_mut.abort_requested = true;
2343                    }
2344                    runtime_state.generation_fence.phase = GenerationFencePhase::AbortingOld;
2345                    pending_events.push(PendingRuntimeEvent {
2346                        child_id: fence_child_id.clone(),
2347                        path: fence_path,
2348                        generation: Some(old_generation),
2349                        attempt: Some(old_attempt),
2350                        correlation_id: CorrelationId::from_uuid(command_id),
2351                        what: What::ChildRestartFenceAbortRequested {
2352                            child_id: fence_child_id,
2353                            old_generation,
2354                            old_attempt,
2355                            target_generation,
2356                            command_id: command_id.to_string(),
2357                            deadline_unix_nanos: deadline_ns,
2358                        },
2359                    });
2360                }
2361            }
2362
2363            if runtime_state.generation_fence.pending_restart.is_some() {
2364                continue;
2365            }
2366
2367            if matches!(
2368                runtime_state.generation_fence.phase,
2369                GenerationFencePhase::WaitingForOldStop | GenerationFencePhase::AbortingOld
2370            ) {
2371                continue;
2372            }
2373
2374            if runtime_state.stop_state != ChildStopState::CancelDelivered {
2375                continue;
2376            }
2377            let Some(deadline) = runtime_state.stop_deadline_at_unix_nanos else {
2378                continue;
2379            };
2380            if deadline > now || !runtime_state.has_active_attempt() {
2381                continue;
2382            }
2383            let Some(generation) = runtime_state.generation else {
2384                continue;
2385            };
2386            let Some(attempt) = runtime_state.attempt else {
2387                continue;
2388            };
2389            let status = runtime_state.status;
2390            let failure = ChildControlFailure::new(
2391                ChildControlFailurePhase::WaitCompletion,
2392                "child did not complete before stop deadline",
2393                true,
2394            );
2395            runtime_state.status = status;
2396            runtime_state.stop_state = ChildStopState::Failed;
2397            runtime_state.last_control_failure = Some(failure.clone());
2398            pending_events.push(PendingRuntimeEvent {
2399                child_id: runtime_state.child_id.clone(),
2400                path: runtime_state.path.clone(),
2401                generation: Some(generation),
2402                attempt: Some(attempt),
2403                correlation_id: CorrelationId::new(),
2404                what: What::ChildControlStopFailed {
2405                    child_id: runtime_state.child_id.clone(),
2406                    generation,
2407                    attempt,
2408                    status,
2409                    stop_state: ChildStopState::Failed,
2410                    phase: failure.phase,
2411                    reason: failure.reason,
2412                    recoverable: failure.recoverable,
2413                },
2414            });
2415        }
2416        for event in pending_events {
2417            self.emit_pending_event(event);
2418        }
2419    }
2420
2421    /// Emits one pending typed runtime event.
2422    ///
2423    /// # Arguments
2424    ///
2425    /// - `pending`: Event data collected while runtime state was borrowed.
2426    ///
2427    /// # Returns
2428    ///
2429    /// This function does not return a value.
2430    fn emit_pending_event(&mut self, pending: PendingRuntimeEvent) {
2431        let now = Instant::now();
2432        let uptime = now
2433            .duration_since(self.time_base.base_instant)
2434            .as_millis()
2435            .min(u128::from(u64::MAX)) as u64;
2436        let monotonic_nanos = now.duration_since(self.time_base.base_instant).as_nanos();
2437        let child_name = self
2438            .registry
2439            .child(&pending.child_id)
2440            .map(|runtime| runtime.spec.name.clone())
2441            .unwrap_or_else(|| pending.child_id.to_string());
2442        let event = SupervisorEvent::new(
2443            When::new(EventTime::from_parts(
2444                monotonic_nanos,
2445                uptime,
2446                pending.generation.unwrap_or_else(Generation::initial),
2447                pending.attempt.unwrap_or_else(ChildStartCount::first),
2448            )),
2449            Where::new(pending.path).with_child(pending.child_id, child_name),
2450            pending.what,
2451            self.event_sequences.next(),
2452            pending.correlation_id,
2453            1,
2454        );
2455        if let Ok(mut observability) = self.observability.lock() {
2456            let _lagged = observability.emit(event);
2457        }
2458    }
2459
2460    /// Spawns one child child_start_count and reports the exit back to this control loop.
2461    ///
2462    /// # Arguments
2463    ///
2464    /// - `child_id`: Child that should run.
2465    /// - `is_restart`: Whether this child_start_count is a restart child_start_count.
2466    /// - `delay`: Delay before the child_start_count starts.
2467    ///
2468    /// # Returns
2469    ///
2470    /// This function does not return a value.
2471    fn spawn_child_start(&mut self, child_id: ChildId, is_restart: bool, delay: Duration) {
2472        if self.shutdown.phase() != ShutdownPhase::Idle {
2473            return;
2474        }
2475        if let Some(runtime_state) = self.slots.get(&child_id) {
2476            if runtime_state.generation_fence.pending_restart.is_some() {
2477                if is_restart {
2478                    let path = runtime_state.path.clone();
2479                    let generation = runtime_state.generation;
2480                    let attempt = runtime_state.attempt;
2481                    let pending_target = runtime_state
2482                        .generation_fence
2483                        .pending_restart
2484                        .as_ref()
2485                        .map(|pending| pending.target_generation);
2486                    self.emit_pending_event(PendingRuntimeEvent {
2487                        child_id: child_id.clone(),
2488                        path,
2489                        generation,
2490                        attempt,
2491                        correlation_id: CorrelationId::new(),
2492                        what: What::ChildRestartConflict {
2493                            child_id: child_id.clone(),
2494                            current_generation: generation,
2495                            current_attempt: attempt,
2496                            target_generation: pending_target,
2497                            command_id: "runtime-policy".to_owned(),
2498                            decision: "rejected".to_owned(),
2499                            reason: "automatic restart suppressed while pending manual restart holds the fence".to_owned(),
2500                        },
2501                    });
2502                }
2503                return;
2504            }
2505            if matches!(
2506                runtime_state.generation_fence.phase,
2507                GenerationFencePhase::WaitingForOldStop
2508                    | GenerationFencePhase::AbortingOld
2509                    | GenerationFencePhase::Closed
2510                    | GenerationFencePhase::ReadyToStart
2511            ) {
2512                return;
2513            }
2514        }
2515        let Some(runtime) = self.prepare_child_start(&child_id, is_restart) else {
2516            return;
2517        };
2518        let sender = self.command_sender.clone();
2519        if !delay.is_zero() {
2520            tokio::spawn(async move {
2521                tokio::time::sleep(delay).await;
2522                let child_id_for_msg = runtime.id.clone();
2523                let path = runtime.path.clone();
2524                let generation = runtime.generation;
2525                let attempt = runtime.child_start_count;
2526                match ChildRunner::new().spawn_once(runtime) {
2527                    Ok(handle) => {
2528                        let _ignored = sender
2529                            .send(RuntimeLoopMessage::ChildStart(
2530                                ChildStartMessage::DelayedSpawnAttached {
2531                                    child_id: child_id_for_msg,
2532                                    path,
2533                                    generation,
2534                                    attempt,
2535                                    handle,
2536                                },
2537                            ))
2538                            .await;
2539                    }
2540                    Err(error) => {
2541                        tokio::spawn(async move {
2542                            send_child_result(sender, child_id_for_msg, Err(error)).await;
2543                        });
2544                    }
2545                }
2546            });
2547            return;
2548        }
2549
2550        let child_id_cloned = runtime.id.clone();
2551        let path = runtime.path.clone();
2552        let generation = runtime.generation;
2553        let child_start_count = runtime.child_start_count;
2554        match ChildRunner::new().spawn_once(runtime) {
2555            Ok(handle) => {
2556                self.attach_spawned_child_handle(
2557                    child_id_cloned,
2558                    path,
2559                    generation,
2560                    child_start_count,
2561                    handle,
2562                );
2563            }
2564            Err(error) => {
2565                tokio::spawn(async move {
2566                    send_child_result(sender, child_id_cloned, Err(error)).await;
2567                });
2568            }
2569        }
2570    }
2571
2572    /// Prepares registry state for one child child_start_count.
2573    ///
2574    /// # Arguments
2575    ///
2576    /// - `child_id`: Child that should run.
2577    /// - `bump_restart_counters`: Whether this spawn should bump generation accounting like a restart.
2578    ///
2579    /// # Returns
2580    ///
2581    /// Returns a runtime record for the child runner.
2582    fn prepare_child_start(
2583        &mut self,
2584        child_id: &ChildId,
2585        bump_restart_counters: bool,
2586    ) -> Option<ChildRuntime> {
2587        let runtime = self.registry.child_mut(child_id)?;
2588        if bump_restart_counters {
2589            runtime.child_start_count = runtime.child_start_count.next();
2590            runtime.generation = runtime.generation.next();
2591            runtime.restart_count = runtime.restart_count.saturating_add(1);
2592        }
2593        runtime.status = ChildRuntimeStatus::Starting;
2594        if let Some(runtime_state) = self.slots.get_mut(child_id) {
2595            runtime_state.operation = ChildControlOperation::Active;
2596        }
2597        Some(runtime.clone())
2598    }
2599
2600    // ------------------------------------------------------------------
    // Slot-based lifecycle operations (replacing `child_runtime_states`)
2602    // ------------------------------------------------------------------
2603
2604    /// Executes a shutdown on all slots using the real cancellation+join
2605    /// pipeline.
2606    ///
2607    /// # Arguments
2608    ///
2609    /// - `event_sender`: Event channel used for lifecycle text.
2610    ///
2611    /// # Returns
2612    ///
2613    /// Returns a shutdown result.
2614    pub(crate) async fn handle_shutdown_tree(
2615        &mut self,
2616        requested_by: String,
2617        reason: String,
2618        event_sender: &broadcast::Sender<String>,
2619    ) -> Result<ShutdownResult, SupervisorError> {
2620        let policy = self.shutdown.policy;
2621        let reason_copy = reason.clone();
2622        let cause = ShutdownCause::new(requested_by, reason);
2623        let _started = self.shutdown.request_stop(cause);
2624        let _ignored = event_sender.send(format!(
2625            "shutdown_phase_changed:{:?}:{:?}",
2626            ShutdownPhase::Idle,
2627            self.shutdown.phase()
2628        ));
2629        self.advance_shutdown_phase(event_sender);
2630        self.advance_shutdown_phase(event_sender);
2631
2632        let outcomes =
2633            shutdown_tree_fanout(&mut self.slots, &policy, &mut self.admission_set).await;
2634        let reconcile = reconcile_shutdown_slots(&self.slots);
2635
2636        // Emit orphan warning when residual handles remain after shutdown.
2637        if !reconcile.verified_clean {
2638            let _ignored = event_sender.send(format!(
2639                "shutdown_reconcile_warning: orphan_slots={:?}",
2640                reconcile.orphan_slots
2641            ));
2642        }
2643
2644        self.advance_shutdown_phase(event_sender);
2645        self.advance_shutdown_phase(event_sender);
2646        let _completed = self.shutdown.complete();
2647
2648        let report = ShutdownPipelineReport {
2649            cause: ShutdownCause::new("slot-shutdown", reason_copy),
2650            started_at_unix_nanos: unix_epoch_nanos(),
2651            completed_at_unix_nanos: unix_epoch_nanos(),
2652            phase: ShutdownPhase::Completed,
2653            outcomes,
2654            reconcile: ShutdownReconcileReport::core_runtime_completed(),
2655            idempotent: false,
2656        };
2657        self.shutdown_pipeline.cache_report(report.clone());
2658        let _ignored = event_sender.send(format!("shutdown_completed:{}", report.outcomes.len()));
2659        Ok(self.shutdown.result_with_report(report, false))
2660    }
2661
2662    /// Applies a control operation to the slot for the given child.
2663    ///
2664    /// # Arguments
2665    ///
2666    /// - `child_id`: Target child identifier.
2667    /// - `operation`: Desired control operation.
2668    ///
2669    /// # Returns
2670    ///
2671    /// Returns `true` when the slot was found and modified.
2672    pub(crate) fn handle_command_on_slot(
2673        &mut self,
2674        child_id: &ChildId,
2675        operation: ChildControlOperation,
2676    ) -> bool {
2677        let Some(slot) = self.slots.get_mut(child_id) else {
2678            return false;
2679        };
2680        slot.operation = operation;
2681        if matches!(
2682            operation,
2683            ChildControlOperation::Quarantined | ChildControlOperation::Removed
2684        ) && slot.has_active_attempt()
2685        {
2686            slot.cancel();
2687        }
2688        true
2689    }
2690
2691    /// Processes a completed child exit through the slot system.
2692    ///
2693    /// # Arguments
2694    ///
2695    /// - `child_id`: Child that exited.
2696    /// - `report`: Completed child run report.
2697    ///
2698    /// # Returns
2699    ///
2700    /// Returns the exit summary stored in the slot, or `None` when no slot
2701    /// exists for this child.
2702    pub(crate) fn process_child_exit_on_slot(
2703        &mut self,
2704        child_id: &ChildId,
2705        report: &ChildRunReport,
2706    ) -> Option<ChildExitSummary> {
2707        let slot = self.slots.get_mut(child_id)?;
2708        let now_nanos = self.time_base.now_unix_nanos();
2709        let summary = ChildExitSummary::from_report(report, now_nanos);
2710        slot.deactivate(summary.clone());
2711        self.admission_set.release(child_id);
2712        Some(summary)
2713    }
2714
2715    /// Observes liveness for every active slot and emits stale heartbeat
2716    /// events.
2717    ///
2718    /// # Arguments
2719    ///
2720    /// - `event_sender`: Event channel used for lifecycle text.
2721    ///
2722    /// # Returns
2723    ///
2724    /// Returns the count of slots with stale heartbeats.
2725    pub(crate) fn observe_slot_liveness(
2726        &mut self,
2727        event_sender: &broadcast::Sender<String>,
2728    ) -> usize {
2729        let mut stale_count = 0usize;
2730        let threshold_nanos = Duration::from_secs(DEFAULT_HEARTBEAT_TIMEOUT_SECS).as_nanos();
2731        let now_nanos = self.time_base.now_unix_nanos();
2732
2733        for (child_id, slot) in self.slots.iter_mut() {
2734            if !slot.has_active_attempt() {
2735                continue;
2736            }
2737            if let Some(last_hb) = slot.last_heartbeat_at
2738                && now_nanos.saturating_sub(last_hb) >= threshold_nanos
2739            {
2740                stale_count += 1;
2741                let _ignored = event_sender.send(format!(
2742                    "child_liveness_stale: child_id={} last_heartbeat_at={}",
2743                    child_id, last_hb
2744                ));
2745            }
2746        }
2747        stale_count
2748    }
2749
2750    /// Checks whether a child slot is eligible for restart.
2751    ///
2752    /// Returns `Ok(())` when restart may proceed, or `Err(AdmissionConflict)`
2753    /// when the slot has a pending restart or an active attempt.
2754    ///
2755    /// # Arguments
2756    ///
2757    /// - `child_id`: Child to check.
2758    /// - `request_generation`: Generation claimed by the restart request.
2759    /// - `request_attempt`: Attempt number claimed by the restart request.
2760    ///
2761    /// # Returns
2762    ///
2763    /// Returns `Ok(())` or `Err(AdmissionConflict)`.
2764    pub(crate) fn check_slot_restart_eligibility(
2765        &self,
2766        child_id: &ChildId,
2767        request_generation: Generation,
2768        request_attempt: ChildStartCount,
2769    ) -> Result<(), AdmissionConflict> {
2770        let Some(slot) = self.slots.get(child_id) else {
2771            return Ok(());
2772        };
2773        if slot.pending_restart {
2774            return Err(AdmissionConflict::new(
2775                child_id.clone(),
2776                slot.generation.unwrap_or(Generation::initial()),
2777                slot.attempt.unwrap_or(ChildStartCount::first()),
2778                "restart rejected: pending restart already exists",
2779            ));
2780        }
2781        if let (Some(active_gen), Some(active_att)) = (slot.generation, slot.attempt)
2782            && (request_generation != active_gen || request_attempt != active_att)
2783        {
2784            return Err(AdmissionConflict::new(
2785                child_id.clone(),
2786                active_gen,
2787                active_att,
2788                "restart conflicts with active attempt",
2789            ));
2790        }
2791        Ok(())
2792    }
2793
2794    /// Copies a `ChildRuntimeState` entry into a `ChildSlot` when a slot
2795    /// does not yet exist for the child.
2796    ///
2797    /// # Arguments
2798    ///
2799    /// - `child_id`: Child to ensure has a slot.
2800    /// - `path`: Supervisor path for the child.
2801    ///
2802    /// # Returns
2803    ///
2804    /// Returns `true` when a new slot was created.
2805    pub(crate) fn ensure_slot_exists(&mut self, child_id: ChildId, path: SupervisorPath) -> bool {
2806        if self.slots.contains_key(&child_id) {
2807            return false;
2808        }
2809        let slot = ChildSlot::new(
2810            child_id.clone(),
2811            path,
2812            Duration::from_secs(60), // Default restart window.
2813        );
2814        self.slots.insert(child_id, slot);
2815        true
2816    }
2817}
2818
2819/// Builds a child control command outcome from the latest runtime state.
2820///
2821/// # Arguments
2822///
2823/// - `operation_before`: Operation observed before command handling.
2824/// - `cancel_delivered`: Whether this command delivered cancellation.
2825/// - `idempotent`: Whether this command reused existing state.
2826/// - `failure`: Failure observed during command handling.
2827/// - `runtime_state`: Runtime state used as the source of truth.
2828/// - `time_base`: Runtime time base used for liveness timestamps.
2829/// - `generation_fence`: Optional fencing metadata for restart commands only.
2830///
2831/// # Returns
2832///
2833/// Returns a [`ChildControlResult`] value.
2834fn build_child_control_outcome(
2835    operation_before: ChildControlOperation,
2836    cancel_delivered: bool,
2837    idempotent: bool,
2838    failure: Option<ChildControlFailure>,
2839    runtime_state: &mut ChildSlot,
2840    time_base: &RuntimeTimeBase,
2841    generation_fence: Option<GenerationFenceOutcome>,
2842) -> ChildControlResult {
2843    let liveness = runtime_state.observe_liveness(time_base.now_unix_nanos());
2844    // Data model: status is None when there is no active attempt.
2845    let status = if runtime_state.attempt.is_some() {
2846        Some(runtime_state.status)
2847    } else {
2848        None
2849    };
2850    ChildControlResult::new(
2851        runtime_state.child_id.clone(),
2852        runtime_state.attempt,
2853        runtime_state.generation,
2854        operation_before,
2855        runtime_state.operation,
2856        status,
2857        cancel_delivered,
2858        runtime_state.stop_state,
2859        runtime_state.restart_limit.clone(),
2860        liveness,
2861        idempotent,
2862        failure,
2863        generation_fence,
2864    )
2865}
2866
2867/// Result of applying a stop-style control command to a runtime state record.
2868#[derive(Debug, Clone, Copy)]
2869struct StopControlApplication {
2870    /// Operation observed before command handling.
2871    operation_before: ChildControlOperation,
2872    /// Whether this command delivered cancellation.
2873    cancel_delivered: bool,
2874    /// Whether this command reused existing state.
2875    idempotent: bool,
2876    /// Whether the caller should remove the record after building the outcome.
2877    remove_after_outcome: bool,
2878}
2879
/// Applies a stop-style child control command to one runtime state record.
///
/// When the requested operation differs from the current one, the record's operation is
/// switched. Both a typed `ChildControlOperationChanged` event and a text event are
/// produced for that switch.
///
/// When an attempt is active and this target has not already delivered cancellation, the
/// attempt is cancelled. On successful delivery:
///
/// - the stop state becomes `CancelDelivered`;
/// - the stop deadline is recorded;
/// - a `ChildControlCancelDelivered` event is produced (typed only when the generation and
///   attempt are known) alongside a text event.
///
/// Without an active attempt, the stop state becomes `NoActiveAttempt`.
///
/// # Arguments
///
/// - `runtime_state`: Runtime state that owns the target child.
/// - `target_operation`: Operation requested by the command.
/// - `command_name`: Stable command name.
/// - `command_id`: Stable command identifier.
/// - `correlation_id`: Correlation identifier for emitted events.
/// - `stop_deadline_at_unix_nanos`: Deadline written when cancellation is delivered.
/// - `event_sender`: Event channel used for lifecycle text.
/// - `pending_events`: Typed events collected until mutable borrows end.
///
/// # Returns
///
/// Returns the applied stop control facts. The command counts as idempotent in two cases:
///
/// - With an active attempt: the same operation was already applied and cancellation was
///   already delivered.
/// - Without an active attempt: the operation is unchanged and is not `Removed`.
///
/// `remove_after_outcome` is set only for `Removed` when no attempt was active.
// The argument count mirrors the distinct inputs the stop path needs; it is allowed explicitly.
#[allow(clippy::too_many_arguments)]
fn apply_stop_control_to_runtime_state(
    runtime_state: &mut ChildSlot,
    target_operation: ChildControlOperation,
    command_name: &'static str,
    command_id: &str,
    correlation_id: CorrelationId,
    stop_deadline_at_unix_nanos: u128,
    event_sender: &broadcast::Sender<String>,
    pending_events: &mut Vec<PendingRuntimeEvent>,
) -> StopControlApplication {
    let child_id = runtime_state.child_id.clone();
    let operation_before = runtime_state.operation;
    let had_active_attempt = runtime_state.has_active_attempt();
    // A repeat of the same stop-style command after cancellation was already delivered.
    let already_cancelled_for_target = had_active_attempt
        && operation_before == target_operation
        && runtime_state.attempt_cancel_delivered;
    // Repeating `Removed` without an active attempt is never idempotent; the caller removes the record.
    let idempotent = if had_active_attempt {
        already_cancelled_for_target
    } else {
        operation_before == target_operation && target_operation != ChildControlOperation::Removed
    };

    // Operation transition: recorded first so its event precedes any cancellation event.
    if operation_before != target_operation {
        runtime_state.operation = target_operation;
        pending_events.push(PendingRuntimeEvent {
            child_id: child_id.clone(),
            path: runtime_state.path.clone(),
            generation: runtime_state.generation,
            attempt: runtime_state.attempt,
            correlation_id,
            what: What::ChildControlOperationChanged {
                child_id: child_id.clone(),
                from: operation_before,
                to: target_operation,
                command: command_name.to_owned(),
                command_id: command_id.to_owned(),
            },
        });
        let _ignored = event_sender.send(format!(
            "child_control_operation_changed:{child_id}:{operation_before:?}:{target_operation:?}"
        ));
    }

    // Cancellation: skipped when this target already delivered it; stop state is left
    // unchanged if `cancel()` reports it was not delivered.
    let cancel_delivered = if had_active_attempt && !already_cancelled_for_target {
        let delivered = runtime_state.cancel();
        if delivered {
            runtime_state.stop_state = ChildStopState::CancelDelivered;
            runtime_state.stop_deadline_at_unix_nanos = Some(stop_deadline_at_unix_nanos);
            // The typed event needs a concrete identity; the text event below is sent regardless.
            if let (Some(generation), Some(attempt)) =
                (runtime_state.generation, runtime_state.attempt)
            {
                pending_events.push(PendingRuntimeEvent {
                    child_id: child_id.clone(),
                    path: runtime_state.path.clone(),
                    generation: Some(generation),
                    attempt: Some(attempt),
                    correlation_id,
                    what: What::ChildControlCancelDelivered {
                        child_id: child_id.clone(),
                        generation,
                        attempt,
                        command: command_name.to_owned(),
                        command_id: command_id.to_owned(),
                    },
                });
            }
            let _ignored = event_sender.send(format!(
                "child_control_cancel_delivered:{child_id}:{command_name}"
            ));
        }
        delivered
    } else {
        if !had_active_attempt {
            runtime_state.stop_state = ChildStopState::NoActiveAttempt;
        }
        false
    };

    StopControlApplication {
        operation_before,
        cancel_delivered,
        idempotent,
        // Records with a live attempt are kept until the attempt exits.
        remove_after_outcome: target_operation == ChildControlOperation::Removed
            && !had_active_attempt,
    }
}
2983
2984/// Builds a stale heartbeat event when suppression allows emission.
2985///
2986/// # Arguments
2987///
2988/// - `runtime_state`: Runtime state whose heartbeat was observed.
2989/// - `liveness`: Latest liveness state.
2990///
2991/// # Returns
2992///
2993/// Returns a pending event when the stale heartbeat should be emitted.
2994fn heartbeat_stale_event(
2995    runtime_state: &mut ChildSlot,
2996    liveness: &ChildLivenessState,
2997) -> Option<PendingRuntimeEvent> {
2998    let Some(attempt) = runtime_state.attempt else {
2999        runtime_state.stale_event_attempt = None;
3000        return None;
3001    };
3002    if !liveness.heartbeat_stale {
3003        runtime_state.stale_event_attempt = None;
3004        return None;
3005    }
3006    if runtime_state.stale_event_attempt == Some(attempt) {
3007        return None;
3008    }
3009    let since_unix_nanos = liveness.last_heartbeat_at_unix_nanos?;
3010    runtime_state.stale_event_attempt = Some(attempt);
3011    Some(PendingRuntimeEvent {
3012        child_id: runtime_state.child_id.clone(),
3013        path: runtime_state.path.clone(),
3014        generation: runtime_state.generation,
3015        attempt: Some(attempt),
3016        correlation_id: CorrelationId::new(),
3017        what: What::ChildHeartbeatStale {
3018            child_id: runtime_state.child_id.clone(),
3019            attempt,
3020            since_unix_nanos,
3021        },
3022    })
3023}
3024
3025/// Runs the control loop until all command senders are dropped.
3026///
3027/// # Arguments
3028///
3029/// - `state`: Runtime state initialized from the supervisor specification.
3030/// - `receiver`: Runtime command receiver.
3031/// - `event_sender`: Event channel used for audit text.
3032///
3033/// # Returns
3034///
3035/// Returns a [`RuntimeExitReport`] when the control loop ends.
3036pub async fn run_control_loop(
3037    mut state: RuntimeControlState,
3038    mut receiver: mpsc::Receiver<RuntimeLoopMessage>,
3039    event_sender: broadcast::Sender<String>,
3040) -> RuntimeExitReport {
3041    state.start_declared_children();
3042    while let Some(message) = receiver.recv().await {
3043        match message {
3044            RuntimeLoopMessage::Control {
3045                command,
3046                reply_sender,
3047            } => {
3048                let command_name = command_name(&command);
3049                let result = state.execute_control(command, &event_sender).await;
3050                let _ignored = event_sender.send(format!("control_command:{command_name}"));
3051                let _ignored = reply_sender.send(result);
3052            }
3053            RuntimeLoopMessage::ChildStart(ChildStartMessage::Exited { report }) => {
3054                state.handle_child_exit(*report, &event_sender);
3055            }
3056            RuntimeLoopMessage::ChildStart(ChildStartMessage::StartFailed {
3057                child_id,
3058                message,
3059            }) => {
3060                state.handle_child_start_failed(child_id, message, &event_sender);
3061            }
3062            RuntimeLoopMessage::ChildStart(ChildStartMessage::DelayedSpawnAttached {
3063                child_id,
3064                path,
3065                generation,
3066                attempt,
3067                handle,
3068            }) => {
3069                state.attach_spawned_child_handle(child_id, path, generation, attempt, handle);
3070            }
3071            RuntimeLoopMessage::ControlPlane(ControlPlaneMessage::ReplayChildExitForTest {
3072                report,
3073            }) => {
3074                state.handle_child_exit(*report, &event_sender);
3075            }
3076            RuntimeLoopMessage::ControlPlane(ControlPlaneMessage::Shutdown {
3077                meta,
3078                reply_sender,
3079            }) => {
3080                let _ignored = event_sender.send(format!(
3081                    "runtime_control_loop_shutdown_requested:{}:{}",
3082                    meta.requested_by, meta.reason
3083                ));
3084                match meta.validate() {
3085                    Ok(()) => {
3086                        let report = RuntimeExitReport::completed(
3087                            "shutdown",
3088                            format!("runtime control plane shutdown requested: {}", meta.reason),
3089                        );
3090                        let _ignored = reply_sender.send(Ok(report.clone()));
3091                        return report;
3092                    }
3093                    Err(error) => {
3094                        let _ignored = reply_sender.send(Err(error));
3095                        continue;
3096                    }
3097                }
3098            }
3099        }
3100    }
3101    RuntimeExitReport::completed("message_loop", "runtime command channel closed")
3102}
3103
3104/// Returns a stable command name for audit text.
3105///
3106/// # Arguments
3107///
3108/// - `command`: Command being executed.
3109///
3110/// # Returns
3111///
3112/// Returns a static command name.
3113fn command_name(command: &ControlCommand) -> &'static str {
3114    match command {
3115        ControlCommand::AddChild { .. } => "add_child",
3116        ControlCommand::RemoveChild { .. } => "remove_child",
3117        ControlCommand::RestartChild { .. } => "restart_child",
3118        ControlCommand::PauseChild { .. } => "pause_child",
3119        ControlCommand::ResumeChild { .. } => "resume_child",
3120        ControlCommand::QuarantineChild { .. } => "quarantine_child",
3121        ControlCommand::ShutdownTree { .. } => "shutdown_tree",
3122        ControlCommand::CurrentState { .. } => "current_state",
3123    }
3124}
3125
3126/// Sends a child run result back to the control loop.
3127///
3128/// # Arguments
3129///
3130/// - `sender`: Runtime command sender.
3131/// - `child_id`: Child identifier used when the run fails before reporting.
3132/// - `result`: Child run result.
3133///
3134/// # Returns
3135///
3136/// This function does not return a value.
3137async fn send_child_result(
3138    sender: mpsc::Sender<RuntimeLoopMessage>,
3139    child_id: ChildId,
3140    result: Result<ChildRunReport, SupervisorError>,
3141) {
3142    let message = match result {
3143        Ok(report) => RuntimeLoopMessage::ChildStart(ChildStartMessage::Exited {
3144            report: Box::new(report),
3145        }),
3146        Err(error) => RuntimeLoopMessage::ChildStart(ChildStartMessage::StartFailed {
3147            child_id,
3148            message: error.to_string(),
3149        }),
3150    };
3151    let _ignored = sender.send(message).await;
3152}
3153
3154/// Maps child restart policy into policy-engine restart policy.
3155///
3156/// # Arguments
3157///
3158/// - `policy`: Restart policy stored on the child declaration.
3159///
3160/// # Returns
3161///
3162/// Returns the equivalent policy-engine value.
3163fn restart_policy(policy: ChildRestartPolicy) -> RestartPolicy {
3164    match policy {
3165        ChildRestartPolicy::Permanent => RestartPolicy::Permanent,
3166        ChildRestartPolicy::Transient => RestartPolicy::Transient,
3167        ChildRestartPolicy::Temporary => RestartPolicy::Temporary,
3168    }
3169}
3170
3171/// Maps child backoff policy into policy-engine backoff policy.
3172///
3173/// # Arguments
3174///
3175/// Maps a child spec backoff policy into the policy-engine equivalent.
3176///
3177/// # Arguments
3178///
3179/// - `policy`: Backoff policy stored on the child declaration.
3180///
3181/// # Returns
3182///
3183/// Returns the equivalent policy-engine value with full jitter enabled.
3184fn backoff_policy(policy: crate::spec::child::BackoffPolicy) -> BackoffPolicy {
3185    let jitter_percent = (policy.jitter_ratio * 100.0).round().clamp(0.0, 100.0) as u8;
3186    BackoffPolicy::new(
3187        policy.initial_delay,
3188        policy.max_delay,
3189        jitter_percent,
3190        policy.max_delay,
3191    )
3192    .with_full_jitter(42) // Enable full jitter mode per FR-003
3193}
3194
3195/// Maps a child-runner exit into policy-engine task exit.
3196///
3197/// # Arguments
3198///
3199/// - `exit`: Exit reported by the child runner.
3200///
3201/// # Returns
3202///
3203/// Returns the policy-engine exit value.
3204fn policy_task_exit(exit: &TaskExit) -> PolicyTaskExit {
3205    match exit.failure_kind() {
3206        Some(kind) => PolicyTaskExit::Failed { kind: kind.into() },
3207        None => PolicyTaskExit::Succeeded,
3208    }
3209}
3210
3211/// Classifies a child-runner exit into pipeline exit classification.
3212///
3213/// This function maps all six minimum required exit kinds from the specification:
3214/// success, nonzero_exit, panic, timeout, external_cancel, manual_stop.
3215///
3216/// # Arguments
3217///
3218/// - `exit`: Exit reported by the child runner.
3219///
3220/// # Returns
3221///
3222/// Returns the pipeline exit classification value.
3223fn classify_exit_for_pipeline(exit: &TaskExit, manual_stop_requested: bool) -> ExitClassification {
3224    match exit {
3225        TaskExit::Succeeded => ExitClassification::Success,
3226        TaskExit::Cancelled if manual_stop_requested => ExitClassification::ManualStop,
3227        TaskExit::Cancelled => ExitClassification::ExternalCancel,
3228        TaskExit::Failed(failure) => {
3229            // Check if this is an external cancel or timeout based on failure kind.
3230            match failure.kind {
3231                crate::error::types::TaskFailureKind::Cancelled if manual_stop_requested => {
3232                    ExitClassification::ManualStop
3233                }
3234                crate::error::types::TaskFailureKind::Cancelled => {
3235                    ExitClassification::ExternalCancel
3236                }
3237                crate::error::types::TaskFailureKind::Timeout => ExitClassification::Timeout,
3238                _ => ExitClassification::NonZeroExit { exit_code: -1 },
3239            }
3240        }
3241        TaskExit::Panicked(_) => ExitClassification::Crash {
3242            reason: "panic".to_string(),
3243        },
3244        TaskExit::TimedOut => ExitClassification::Timeout,
3245    }
3246}
3247
3248/// Reports whether the role policy should restart a successful exit.
3249///
3250/// # Arguments
3251///
3252/// - `pipeline_result`: Completed supervision pipeline context.
3253///
3254/// # Returns
3255///
3256/// Returns `true` when the effective role treats success as a restartable exit.
3257fn role_policy_restarts_success(pipeline_result: &PipelineContext) -> bool {
3258    pipeline_result.exit_classification == Some(ExitClassification::Success)
3259        && pipeline_result
3260            .effective_policy
3261            .as_ref()
3262            .is_some_and(|policy| policy.policy_pack.on_success_exit == OnSuccessAction::Restart)
3263}
3264
3265/// Builds the effective policy for a child before budget evaluation.
3266///
3267/// # Arguments
3268///
3269/// - `child_spec`: Child specification whose declared role and overrides should be merged.
3270///
3271/// # Returns
3272///
3273/// Returns an [`EffectivePolicy`] ready for the supervision pipeline.
3274fn prepare_effective_policy(child_spec: &ChildSpec) -> EffectivePolicy {
3275    EffectivePolicy::for_child(child_spec)
3276}
3277
3278/// Reports whether an exit should consume restart limit accounting.
3279///
3280/// # Arguments
3281///
3282/// - `exit`: Exit reported by the child runner.
3283///
3284/// # Returns
3285///
3286/// Returns `true` when the exit is an unplanned failure.
3287fn restart_limit_counts_exit(exit: &TaskExit) -> bool {
3288    matches!(
3289        exit,
3290        TaskExit::Failed(_) | TaskExit::Panicked(_) | TaskExit::TimedOut
3291    )
3292}
3293
3294/// Resolves the restart limit for a child from the supervisor strategy layers.
3295///
3296/// # Arguments
3297///
3298/// - `tree`: Supervisor tree used for child group lookup.
3299/// - `spec`: Supervisor specification that owns restart limit layers.
3300/// - `child_id`: Child whose restart limit should be resolved.
3301///
3302/// # Returns
3303///
3304/// Returns the selected restart limit or the runtime default.
3305fn restart_limit_for_child_in_spec(
3306    tree: &SupervisorTree,
3307    spec: &SupervisorSpec,
3308    child_id: &ChildId,
3309) -> RestartLimit {
3310    restart_execution_plan(tree, spec, child_id)
3311        .restart_limit
3312        .unwrap_or_else(default_restart_limit)
3313}
3314
3315/// Returns the runtime default restart limit.
3316///
3317/// # Arguments
3318///
3319/// This function has no arguments.
3320///
3321/// # Returns
3322///
3323/// Returns a conservative effectively-unbounded restart limit.
3324fn default_restart_limit() -> RestartLimit {
3325    RestartLimit::new(u32::MAX, Duration::from_secs(60))
3326}
3327
3328/// Builds a deterministic restart outcome for unknown identifiers.
3329///
3330/// # Arguments
3331///
3332/// - `child_id`: Stable child referenced by the command.
3333///
3334/// # Returns
3335///
3336/// Returns a rejection [`ChildControlResult`] with structured fencing metadata.
3337fn restart_child_unknown_outcome(child_id: ChildId) -> ChildControlResult {
3338    let conflict = ChildControlFailure::new(
3339        ChildControlFailurePhase::WaitCompletion,
3340        "unknown child",
3341        false,
3342    );
3343    let fence = GenerationFenceOutcome::new(
3344        GenerationFenceDecision::Rejected,
3345        None,
3346        None,
3347        None,
3348        false,
3349        false,
3350        Some(conflict.clone()),
3351    );
3352    ChildControlResult::new(
3353        child_id,
3354        None,
3355        None,
3356        ChildControlOperation::Active,
3357        ChildControlOperation::Active,
3358        None,
3359        false,
3360        ChildStopState::NoActiveAttempt,
3361        RestartLimitState::default(),
3362        ChildLivenessState::new(
3363            None,
3364            false,
3365            crate::readiness::signal::ReadinessState::Unreported,
3366        ),
3367        false,
3368        Some(conflict),
3369        Some(fence),
3370    )
3371}
3372
3373/// Classifies a child control command outcome for metrics.
3374///
3375/// # Arguments
3376///
3377/// - `outcome`: Child control command outcome.
3378///
3379/// # Returns
3380///
3381/// Returns `accepted`, `idempotent`, or `failed`.
3382fn child_control_result_label(outcome: &ChildControlResult) -> &'static str {
3383    if outcome.failure.is_some() || outcome.stop_state == ChildStopState::Failed {
3384        "failed"
3385    } else if outcome.idempotent {
3386        "idempotent"
3387    } else {
3388        "accepted"
3389    }
3390}
3391
3392/// Formats a restart scope for lifecycle events.
3393///
3394/// # Arguments
3395///
3396/// - `scope`: Child identifiers selected by strategy.
3397///
3398/// # Returns
3399///
3400/// Returns a comma-separated child identifier list.
3401fn child_scope_label(scope: &[ChildId]) -> String {
3402    scope
3403        .iter()
3404        .map(|child_id| child_id.value.clone())
3405        .collect::<Vec<_>>()
3406        .join(",")
3407}
3408
3409/// Maps managed child state into a control operation.
3410///
3411/// # Arguments
3412///
3413/// - `state`: Managed child state used by the current control loop.
3414///
3415/// # Returns
3416///
3417/// Returns the equivalent child control operation.
3418/// Builds a child shutdown outcome from a completed run report.
3419///
3420/// # Arguments
3421///
3422/// - `runtime_state`: Runtime state that produced the report.
3423/// - `report`: Completed child run report.
3424/// - `status`: Shutdown status assigned to the report.
3425/// - `phase`: Shutdown phase where the report was consumed.
3426/// - `reason`: Human-readable diagnostic reason.
3427///
3428/// # Returns
3429///
3430/// Returns a [`ChildShutdownOutcome`].
3431fn outcome_from_report(
3432    runtime_state: &ChildSlot,
3433    report: &ChildRunReport,
3434    status: ChildShutdownStatus,
3435    phase: ShutdownPhase,
3436    reason: impl Into<String>,
3437) -> ChildShutdownOutcome {
3438    ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
3439        child_id: runtime_state.child_id.clone(),
3440        path: runtime_state.path.clone(),
3441        generation: runtime_state.generation.unwrap_or_else(Generation::initial),
3442        child_start_count: runtime_state.attempt.unwrap_or_else(ChildStartCount::first),
3443        status,
3444        cancel_delivered: runtime_state.attempt_cancel_delivered,
3445        exit: Some(report.exit.clone()),
3446        phase,
3447        reason: reason.into(),
3448    })
3449}
3450
3451/// Builds a shutdown outcome for a removed runtime state record.
3452///
3453/// # Arguments
3454///
3455/// - `runtime_state`: Removed runtime state skipped by shutdown.
3456/// - `phase`: Shutdown phase that observed the removed state.
3457///
3458/// # Returns
3459///
3460/// Returns a [`ChildShutdownOutcome`] marked as already exited.
3461fn removed_runtime_state_shutdown_outcome(
3462    runtime_state: &ChildSlot,
3463    phase: ShutdownPhase,
3464) -> ChildShutdownOutcome {
3465    ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
3466        child_id: runtime_state.child_id.clone(),
3467        path: runtime_state.path.clone(),
3468        generation: runtime_state.generation.unwrap_or_else(Generation::initial),
3469        child_start_count: runtime_state.attempt.unwrap_or_else(ChildStartCount::first),
3470        status: ChildShutdownStatus::AlreadyExited,
3471        cancel_delivered: false,
3472        exit: None,
3473        phase,
3474        reason: "child runtime state was already removed before shutdown".to_owned(),
3475    })
3476}
3477
3478/// Builds a child shutdown outcome from a run report error.
3479///
3480/// # Arguments
3481///
3482/// - `runtime_state`: Runtime state that produced the error.
3483/// - `status`: Shutdown status assigned to the error.
3484/// - `phase`: Shutdown phase where the error was consumed.
3485/// - `error`: Error returned by the child run observer.
3486///
3487/// # Returns
3488///
3489/// Returns a [`ChildShutdownOutcome`].
3490fn outcome_from_error(
3491    runtime_state: &ChildSlot,
3492    status: ChildShutdownStatus,
3493    phase: ShutdownPhase,
3494    error: SupervisorError,
3495) -> ChildShutdownOutcome {
3496    ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
3497        child_id: runtime_state.child_id.clone(),
3498        path: runtime_state.path.clone(),
3499        generation: runtime_state.generation.unwrap_or_else(Generation::initial),
3500        child_start_count: runtime_state.attempt.unwrap_or_else(ChildStartCount::first),
3501        status,
3502        cancel_delivered: runtime_state.attempt_cancel_delivered,
3503        exit: None,
3504        phase,
3505        reason: error.to_string(),
3506    })
3507}
3508
/// Computes how much time is left until `deadline`.
///
/// # Arguments
///
/// - `deadline`: Monotonic deadline.
///
/// # Returns
///
/// Returns `None` once the deadline is in the past. At exactly the deadline this returns
/// `Some(Duration::ZERO)`.
fn remaining_duration(deadline: Instant) -> Option<Duration> {
    let now = Instant::now();
    if deadline >= now {
        Some(deadline - now)
    } else {
        None
    }
}
3521
/// Reads the wall clock as nanoseconds since the Unix epoch.
///
/// # Arguments
///
/// This function has no arguments.
///
/// # Returns
///
/// Returns the nanosecond timestamp. If the system clock reports a time before the epoch,
/// this returns `0`.
fn unix_epoch_nanos() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    }
}