1use crate::child_runner::run_exit::TaskExit;
7use crate::child_runner::runner::{ChildRunHandle, ChildRunReport, ChildRunner, wait_for_report};
8use crate::control::command::{CommandMeta, CommandResult, ControlCommand, CurrentState};
9use crate::control::outcome::{
10 ChildAttemptStatus, ChildControlFailure, ChildControlFailurePhase, ChildControlOperation,
11 ChildControlResult, ChildLivenessState, ChildStopState, GenerationFenceDecision,
12 GenerationFenceOutcome, GenerationFencePhase, PendingRestart, RestartLimitState,
13 StaleAttemptReport, StaleReportHandling,
14};
15use crate::error::types::SupervisorError;
16use crate::event::payload::{ProtectionAction, SupervisorEvent, ThrottleGateOwner, What, Where};
17use crate::event::time::{CorrelationId, EventSequenceSource, EventTime, When};
18use crate::id::types::{ChildId, ChildStartCount, Generation, SupervisorPath};
19use crate::observe::fairness::FairnessProbe;
20use crate::observe::pipeline::{ObservabilityPipeline, PipelineStageDiagnostic};
21use crate::policy::backoff::BackoffPolicy;
22use crate::policy::decision::{
23 PolicyEngine, RestartDecision, RestartPolicy, TaskExit as PolicyTaskExit,
24};
25use crate::policy::failure_window::FailureWindow;
26use crate::policy::meltdown::MeltdownTracker;
27use crate::policy::task_role_defaults::{EffectivePolicy, OnSuccessAction};
28use crate::registry::entry::{ChildRuntime, ChildRuntimeStatus};
29use crate::registry::store::RegistryStore;
30use crate::runtime::admission::{AdmissionConflict, AdmissionSet};
31use crate::runtime::child_slot::{
32 ChildExitSummary, ChildSlot, DEFAULT_HEARTBEAT_TIMEOUT_SECS, RuntimeTimeBase,
33};
34use crate::runtime::lifecycle::RuntimeExitReport;
35use crate::runtime::message::{ChildStartMessage, ControlPlaneMessage, RuntimeLoopMessage};
36use crate::runtime::pipeline::{ExitClassification, PipelineContext, SupervisionPipeline};
37use crate::runtime::shutdown::{reconcile_shutdown_slots, shutdown_tree_fanout};
38use crate::runtime::shutdown_pipeline::ShutdownPipeline;
39use crate::shutdown::coordinator::{ShutdownCoordinator, ShutdownResult};
40use crate::shutdown::report::{
41 ChildShutdownOutcome, ChildShutdownOutcomeInput, ChildShutdownStatus, ShutdownPipelineReport,
42 ShutdownReconcileReport,
43};
44use crate::shutdown::stage::{ShutdownCause, ShutdownPhase, ShutdownPolicy};
45use crate::spec::child::{ChildSpec, RestartPolicy as ChildRestartPolicy};
46use crate::spec::child_declaration::{ChildDeclaration, validate_child_declaration};
47use crate::spec::supervisor::{RestartLimit, SupervisorSpec};
48use crate::tree::builder::SupervisorTree;
49use crate::tree::order::{restart_execution_plan, shutdown_order, startup_order};
50use std::collections::HashMap;
51use std::sync::{Arc, Mutex};
52use std::time::{Duration, SystemTime, UNIX_EPOCH};
53use tokio::sync::{broadcast, mpsc};
54use tokio::time::{Instant, timeout};
55
56#[derive(Debug)]
58struct PendingRuntimeEvent {
59 child_id: ChildId,
61 path: SupervisorPath,
63 generation: Option<Generation>,
65 attempt: Option<ChildStartCount>,
67 correlation_id: CorrelationId,
69 what: What,
71}
72
73#[derive(Debug)]
75pub struct RuntimeControlState {
76 shutdown: ShutdownCoordinator,
78 shutdown_pipeline: ShutdownPipeline,
80 slots: HashMap<ChildId, ChildSlot>,
82 #[allow(dead_code)]
84 admission_set: AdmissionSet,
85 time_base: RuntimeTimeBase,
87 event_sequences: EventSequenceSource,
89 observability: Arc<Mutex<ObservabilityPipeline>>,
91 supervision_pipeline: SupervisionPipeline,
93 concurrent_gate: crate::runtime::concurrent_gate::SupervisorInstanceGate,
95 fairness_probe: FairnessProbe,
97 manifests: Vec<String>,
99 registry: RegistryStore,
101 tree: SupervisorTree,
103 spec: SupervisorSpec,
105 policy_engine: PolicyEngine,
107 command_sender: mpsc::Sender<RuntimeLoopMessage>,
109}
110
111fn build_initial_slots(registry: &RegistryStore) -> HashMap<ChildId, ChildSlot> {
113 registry
114 .declaration_order()
115 .iter()
116 .filter_map(|child_id| {
117 registry.child(child_id).map(|runtime| {
118 let slot = ChildSlot::new_placeholder(runtime.id.clone(), runtime.path.clone());
119 (child_id.clone(), slot)
120 })
121 })
122 .collect::<HashMap<_, _>>()
123}
124
125#[allow(dead_code)]
126impl RuntimeControlState {
127 pub fn new(
139 spec: SupervisorSpec,
140 shutdown_policy: ShutdownPolicy,
141 command_sender: mpsc::Sender<RuntimeLoopMessage>,
142 observability: Arc<Mutex<ObservabilityPipeline>>,
143 ) -> Result<Self, SupervisorError> {
144 let tree = SupervisorTree::build(&spec)?;
145 let mut registry = RegistryStore::new();
146 registry.register_tree(&tree)?;
147 let time_base = RuntimeTimeBase::new();
148 let slots = build_initial_slots(®istry);
149
150 let meltdown_tracker = MeltdownTracker::new(spec.meltdown_policy);
151 let failure_window = FailureWindow::new(spec.failure_window_config);
152 let supervision_pipeline = SupervisionPipeline::with_backpressure_config(
153 spec.pipeline_journal_capacity,
154 spec.pipeline_subscriber_capacity,
155 meltdown_tracker,
156 failure_window,
157 spec.restart_budget_config.clone(),
158 spec.group_dependencies.clone(),
159 spec.backpressure_config.clone(),
160 );
161
162 let concurrent_gate = crate::runtime::concurrent_gate::SupervisorInstanceGate::new(
163 spec.concurrent_restart_limit,
164 );
165
166 let now_unix_nanos = SystemTime::now()
168 .duration_since(UNIX_EPOCH)
169 .unwrap_or_default()
170 .as_nanos();
171 let fairness_probe = FairnessProbe::new(now_unix_nanos);
172
173 Ok(Self {
174 shutdown: ShutdownCoordinator::new(shutdown_policy),
175 shutdown_pipeline: ShutdownPipeline::new(),
176 slots,
177 admission_set: AdmissionSet::new(),
178 time_base,
179 event_sequences: EventSequenceSource::new(),
180 observability,
181 supervision_pipeline,
182 concurrent_gate,
183 fairness_probe,
184 manifests: Vec::new(),
185 registry,
186 tree,
187 spec,
188 policy_engine: PolicyEngine::new(),
189 command_sender,
190 })
191 }
192
193 pub fn start_declared_children(&mut self) {
203 let child_ids = startup_order(&self.tree)
204 .into_iter()
205 .map(|node| node.child.id.clone())
206 .collect::<Vec<_>>();
207 for child_id in child_ids {
208 self.spawn_child_start(child_id, false, Duration::ZERO);
209 }
210 }
211
212 fn attach_spawned_child_handle(
229 &mut self,
230 child_id: ChildId,
231 path: SupervisorPath,
232 generation: Generation,
233 attempt: ChildStartCount,
234 handle: ChildRunHandle,
235 ) {
236 let mut completion_receiver = handle.completion_receiver.clone();
237 let sender = self.command_sender.clone();
238 self.slots
239 .entry(child_id.clone())
240 .or_insert_with(|| ChildSlot::new_placeholder(child_id.clone(), path))
241 .activate(generation, attempt, ChildAttemptStatus::Running, handle);
242 tokio::spawn(async move {
243 let result = wait_for_report(&mut completion_receiver).await;
244 send_child_result(sender, child_id, result).await;
245 });
246 }
247
248 pub async fn execute_control(
259 &mut self,
260 command: ControlCommand,
261 event_sender: &broadcast::Sender<String>,
262 ) -> Result<CommandResult, SupervisorError> {
263 command.validate_audit_metadata()?;
264 self.reconcile_stop_deadlines();
265 match command {
266 ControlCommand::AddChild { child_manifest, .. } => {
267 self.ensure_dynamic_child_allowed()?;
268
269 if self.shutdown.phase() != ShutdownPhase::Idle {
271 return Err(SupervisorError::fatal_config(
272 "Cannot add child: supervisor is shutting down",
273 ));
274 }
275
276 let declaration: ChildDeclaration =
278 serde_yaml::from_str(&child_manifest).map_err(|e| {
279 SupervisorError::fatal_config(format!(
280 "Failed to parse child manifest: {e}"
281 ))
282 })?;
283
284 let all_names: std::collections::HashSet<String> =
286 self.spec.children.iter().map(|c| c.name.clone()).collect();
287 let mut new_names = all_names.clone();
288 new_names.insert(declaration.name.clone());
289
290 validate_child_declaration(&declaration, &all_names).map_err(|e| {
291 SupervisorError::fatal_config(format!(
292 "Child validation failed at {}: {}",
293 e.field_path, e.reason
294 ))
295 })?;
296
297 let child_spec =
300 crate::spec::child::ChildSpec::try_from(declaration).map_err(|e| {
301 SupervisorError::fatal_config(format!("Child conversion failed: {e:?}"))
302 })?;
303
304 self.manifests.push(child_manifest.clone());
305 self.spec.children.push(child_spec);
306 Ok(CommandResult::ChildAdded { child_manifest })
307 }
308 ControlCommand::RemoveChild { meta, child_id } => Ok(self.execute_stop_child_control(
309 child_id,
310 ChildControlOperation::Removed,
311 "remove_child",
312 &meta,
313 event_sender,
314 )),
315 ControlCommand::RestartChild { meta, child_id } => {
316 Ok(self.execute_restart_child_control(child_id, &meta, event_sender))
317 }
318 ControlCommand::PauseChild { meta, child_id } => Ok(self.execute_stop_child_control(
319 child_id,
320 ChildControlOperation::Paused,
321 "pause_child",
322 &meta,
323 event_sender,
324 )),
325 ControlCommand::ResumeChild { child_id, .. } => {
326 Ok(self.set_child_state(child_id, ChildControlOperation::Active))
327 }
328 ControlCommand::QuarantineChild { meta, child_id } => Ok(self
329 .execute_stop_child_control(
330 child_id,
331 ChildControlOperation::Quarantined,
332 "quarantine_child",
333 &meta,
334 event_sender,
335 )),
336 ControlCommand::ShutdownTree { meta } => {
337 let result = self
338 .execute_shutdown(meta.requested_by, meta.reason, event_sender)
339 .await?;
340 Ok(CommandResult::Shutdown { result })
341 }
342 ControlCommand::CurrentState { .. } => {
343 self.reconcile_stop_deadlines();
344 Ok(CommandResult::CurrentState {
345 state: self.build_current_state(),
346 })
347 }
348 }
349 }
350
/// Processes the completion report of a child attempt.
///
/// The report is first classified against the child's slot:
/// - it matches the attempt a pending restart is waiting on: the fence is drained,
///   the slot is marked `ReadyToStart` and cleared, and the pending restart target is
///   spawned after the supervision pipeline has run;
/// - it matches the active attempt, is a late report after shutdown completed, or
///   arrives while shutdown is in progress: the slot is stopped and cleared;
/// - otherwise it is recorded on the slot as a stale report for audit, and
///   processing stops early.
///
/// If the child has no slot, none of these branches run and processing continues.
///
/// Non-stale exits are recorded, the child's restart limit is refreshed, and the
/// supervision pipeline runs. A restart decision is then executed unless one of these
/// applies: the child is outside automatic policy (a `Removed` child's slot is dropped
/// in that case), the pipeline's protection action forbids a restart, or the restart
/// limit is exceeded.
///
/// Events collected while the slot was borrowed are emitted on every path. On the
/// normal path they are emitted after the restart decision. Stop deadlines are
/// reconciled before returning.
pub fn handle_child_exit(
    &mut self,
    report: ChildRunReport,
    event_sender: &broadcast::Sender<String>,
) {
    // NOTE(review): a permit is released on every exit while the gate's active count
    // is non-zero, whether or not this attempt acquired one — confirm the accounting.
    if self.concurrent_gate.get_active_count() > 0 {
        self.concurrent_gate.release();
    }

    // Identity of the attempt that produced this report.
    let child_id = report.runtime.id.clone();
    let generation = report.runtime.generation;
    let attempt = report.runtime.child_start_count;
    let exit_kind = report.exit.clone();
    // Events are buffered and emitted only after the mutable slot borrow below ends.
    let mut pending_events = Vec::new();
    // Snapshot slot facts before any mutation so every branch sees the pre-exit state.
    let was_active = self
        .slots
        .get(&child_id)
        .is_some_and(ChildSlot::has_active_attempt);
    // The report belongs to the old attempt that a pending restart is waiting to replace.
    let matches_pending_fence = self
        .slots
        .get(&child_id)
        .and_then(|state| state.generation_fence.pending_restart.as_ref())
        .is_some_and(|pending_restart| {
            pending_restart.old_generation == generation
                && pending_restart.old_attempt == attempt
        });
    // The report belongs to the attempt currently running in the slot.
    let matches_active_attempt = self.slots.get(&child_id).is_some_and(|state| {
        state.has_active_attempt()
            && state.generation == Some(generation)
            && state.attempt == Some(attempt)
    });
    // A cancel had been delivered to this attempt; fed into exit classification below.
    let manual_stop_requested = self
        .slots
        .get(&child_id)
        .is_some_and(|state| state.stop_state == ChildStopState::CancelDelivered);
    let mut stale_idle_report = false;
    // Only exits of children in the `Active` operation can count against the restart limit.
    let count_restart_failure = self.slots.get(&child_id).is_some_and(|state| {
        state.operation == ChildControlOperation::Active
            && restart_limit_counts_exit(&exit_kind)
    });
    // The attempt was no longer active and shutdown has already completed.
    let late_report = !was_active && self.shutdown.phase() == ShutdownPhase::Completed;
    // Holds the drained pending restart, if any, for spawning after the pipeline runs.
    let mut fence_pending_release = None::<PendingRestart>;

    if let Some(runtime_state) = self.slots.get_mut(&child_id) {
        if matches_pending_fence {
            // Restart fence path: finish the stop and release the fence.
            if runtime_state.stop_state == ChildStopState::CancelDelivered {
                runtime_state.stop_state = ChildStopState::Completed;
                pending_events.push(PendingRuntimeEvent {
                    child_id: child_id.clone(),
                    path: runtime_state.path.clone(),
                    generation: Some(generation),
                    attempt: Some(attempt),
                    // Correlate with the command that requested the restart.
                    correlation_id: CorrelationId::from_uuid(
                        runtime_state
                            .generation_fence
                            .pending_restart
                            .as_ref()
                            .expect("matches pending implies Some")
                            .command_id,
                    ),
                    what: What::ChildControlStopCompleted {
                        child_id: child_id.clone(),
                        generation,
                        attempt,
                        exit_kind: exit_kind.clone(),
                    },
                });
            }
            fence_pending_release = runtime_state.generation_fence.pending_restart.take();
            if let Some(pending_release) = fence_pending_release.as_ref() {
                let drained_correlation_id =
                    CorrelationId::from_uuid(pending_release.command_id);
                pending_events.push(PendingRuntimeEvent {
                    child_id: child_id.clone(),
                    path: runtime_state.path.clone(),
                    generation: Some(generation),
                    attempt: Some(attempt),
                    correlation_id: drained_correlation_id,
                    what: What::ChildRestartFencePendingDrained {
                        child_id: child_id.clone(),
                    },
                });
            }
            runtime_state.generation_fence.phase = GenerationFencePhase::ReadyToStart;
            runtime_state.status = ChildAttemptStatus::Stopped;
            runtime_state.clear_instance();
        } else if matches_active_attempt
            || late_report
            || self.shutdown.phase() != ShutdownPhase::Idle
        {
            // Current attempt, late report, or shutdown in progress: stop and clear the slot.
            if runtime_state.stop_state == ChildStopState::CancelDelivered {
                runtime_state.stop_state = ChildStopState::Completed;
                pending_events.push(PendingRuntimeEvent {
                    child_id: child_id.clone(),
                    path: runtime_state.path.clone(),
                    generation: Some(generation),
                    attempt: Some(attempt),
                    correlation_id: CorrelationId::new(),
                    what: What::ChildControlStopCompleted {
                        child_id: child_id.clone(),
                        generation,
                        attempt,
                        exit_kind: exit_kind.clone(),
                    },
                });
            }
            runtime_state.status = ChildAttemptStatus::Stopped;
            runtime_state.clear_instance();
        } else {
            // Report from an attempt the slot no longer tracks: keep it for audit only.
            stale_idle_report = true;
            let observed_at_unix_nanos = self.time_base.now_unix_nanos();
            let current_generation = runtime_state.generation;
            let current_attempt = runtime_state.attempt;
            let stale_fact = StaleAttemptReport::new(
                child_id.clone(),
                generation,
                attempt,
                current_generation,
                current_attempt,
                exit_kind.clone(),
                StaleReportHandling::RecordedForAudit,
                observed_at_unix_nanos,
            );
            runtime_state.generation_fence.last_stale_report = Some(stale_fact);
            pending_events.push(PendingRuntimeEvent {
                child_id: child_id.clone(),
                path: runtime_state.path.clone(),
                generation: Some(generation),
                attempt: Some(attempt),
                correlation_id: CorrelationId::new(),
                what: What::ChildAttemptStaleReport {
                    child_id: child_id.clone(),
                    reported_generation: generation,
                    reported_attempt: attempt,
                    current_generation,
                    current_attempt,
                    exit_kind: exit_kind.clone(),
                    handled_as: StaleReportHandling::RecordedForAudit,
                },
            });
        }
    }

    // Stale reports do not record an exit, run the pipeline, or restart anything.
    if stale_idle_report {
        let _ignored = event_sender.send(format!("child_exit:{child_id}"));
        for event in pending_events {
            self.emit_pending_event(event);
        }
        self.reconcile_stop_deadlines();
        return;
    }

    self.record_child_exit(report);
    // `Some((path, limit))` when the restart limit was refreshed for this child.
    let restart_limit_refreshed =
        self.refresh_restart_limit_for_child(&child_id, count_restart_failure);
    if let Some((path, restart_limit)) = restart_limit_refreshed.clone() {
        pending_events.push(PendingRuntimeEvent {
            child_id: child_id.clone(),
            path,
            generation: Some(generation),
            attempt: Some(attempt),
            correlation_id: CorrelationId::new(),
            what: What::ChildRuntimeRestartLimitUpdated {
                child_id: child_id.clone(),
                restart_limit,
            },
        });
    }
    let _ignored = event_sender.send(format!("child_exit:{child_id}"));
    if late_report {
        let _ignored = event_sender.send(format!("child_shutdown_late_report:{child_id}"));
    }

    // Build the supervision pipeline context for this exit.
    let sequence = self.event_sequences.next().value;
    let correlation_id_str = format!("{}", uuid::Uuid::new_v4());
    // Fall back to `root/<child id>` when the slot is gone.
    let supervisor_path = self
        .slots
        .get(&child_id)
        .map(|state| state.path.clone())
        .unwrap_or_else(|| SupervisorPath::root().join(child_id.value.clone()));

    let mut pipeline_ctx = PipelineContext::new(
        child_id.clone(),
        supervisor_path,
        sequence,
        correlation_id_str,
    );
    pipeline_ctx.exit_classification = Some(classify_exit_for_pipeline(
        &exit_kind,
        manual_stop_requested,
    ));
    pipeline_ctx.effective_policy = self
        .registry
        .child(&child_id)
        .map(|runtime| prepare_effective_policy(&runtime.spec));

    let policy_exit = policy_task_exit(&exit_kind);

    let pipeline_result = self.supervision_pipeline.execute_pipeline(
        pipeline_ctx,
        policy_exit,
        &self.spec,
        &self.tree,
    );
    self.record_pipeline_stage_diagnostics(&pipeline_result.stage_diagnostics);

    self.fairness_probe.record_opportunity(&child_id);

    self.check_fairness_probe(event_sender);

    // A group-level meltdown fuse marks a control failure on every slot in the group.
    if let Some(ref budget_eval) = pipeline_result.budget_evaluation
        && matches!(
            budget_eval.meltdown_outcome,
            crate::policy::meltdown::MeltdownOutcome::GroupFuse
        )
        && let Some(ref group_id) = pipeline_result.group_id
    {
        let _ignored = event_sender.send(format!("group_fuse_active:{group_id}:{}", child_id));
        for (_cid, slot) in self.slots.iter_mut() {
            // NOTE(review): substring match on the rendered path — a group id that is a
            // substring of an unrelated path segment also matches; confirm intended.
            if slot.path.to_string().contains(group_id) {
                slot.last_control_failure = Some(ChildControlFailure::new(
                    ChildControlFailurePhase::WaitCompletion,
                    format!("group_fuse_active:{group_id}"),
                    false,
                ));
            }
        }
    }

    // A drained restart fence takes precedence over automatic restart policy.
    if let Some(pending) = fence_pending_release {
        for event in pending_events {
            self.emit_pending_event(event);
        }
        self.spawn_pending_restart_target(child_id.clone(), pending, exit_kind.clone());
        self.reconcile_stop_deadlines();
        return;
    }

    // Children outside automatic policy are never restarted; removed ones drop their slot.
    if !self.should_apply_automatic_policy(&child_id) {
        if self
            .slots
            .get(&child_id)
            .is_some_and(|state| state.operation == ChildControlOperation::Removed)
            && let Some(removed) = self.slots.remove(&child_id)
        {
            pending_events.push(PendingRuntimeEvent {
                child_id: child_id.clone(),
                path: removed.path.clone(),
                generation: Some(generation),
                attempt: Some(attempt),
                correlation_id: CorrelationId::new(),
                what: What::ChildRuntimeStateRemoved {
                    child_id: child_id.clone(),
                    path: removed.path,
                    final_status: Some(ChildAttemptStatus::Stopped),
                },
            });
        }
        for event in pending_events {
            self.emit_pending_event(event);
        }
        self.reconcile_stop_deadlines();
        return;
    }

    let action_decision = pipeline_result.action_decision.as_ref();

    // Map the pipeline's protection action to a restart decision; `None` means no restart.
    let pipeline_driven_decision = if let Some(decision) = action_decision {
        match decision.action {
            ProtectionAction::RestartAllowed => {
                if role_policy_restarts_success(&pipeline_result) {
                    Some(RestartDecision::RestartAfter {
                        delay: Duration::ZERO,
                    })
                } else {
                    self.restart_decision(&child_id)
                }
            }
            ProtectionAction::RestartQueued => {
                None
            }
            ProtectionAction::RestartDenied
            | ProtectionAction::SupervisionPaused
            | ProtectionAction::Escalated
            | ProtectionAction::SupervisedStop => {
                None
            }
        }
    } else {
        self.restart_decision(&child_id)
    };

    let Some(decision) = pipeline_driven_decision else {
        for event in pending_events {
            self.emit_pending_event(event);
        }
        self.reconcile_stop_deadlines();
        return;
    };

    // Block restarts once `used` exceeds `limit`.
    // NOTE(review): strict `>` — confirm whether `used` already includes this exit.
    if restart_limit_refreshed
        .as_ref()
        .is_some_and(|(_path, restart_limit)| {
            restart_limit.used > restart_limit.limit
                && matches!(decision, RestartDecision::RestartAfter { .. })
        })
    {
        let _ignored = event_sender.send(format!("child_restart_limit_exhausted:{child_id}"));
        for event in pending_events {
            self.emit_pending_event(event);
        }
        self.reconcile_stop_deadlines();
        return;
    }
    self.execute_restart_decision(child_id, decision, event_sender);
    for event in pending_events {
        self.emit_pending_event(event);
    }
    self.reconcile_stop_deadlines();
}
695
696 pub fn handle_child_start_failed(
708 &mut self,
709 child_id: ChildId,
710 message: String,
711 event_sender: &broadcast::Sender<String>,
712 ) {
713 let _ignored = event_sender.send(format!("child_start_failed:{child_id}:{message}"));
714
715 let mut fenced_spawn_recovery = Option::<(Generation, ChildStartCount, u64)>::None;
716 let mut repaired_fenced_spawn = false;
717
718 if let Some(runtime_state) = self.slots.get_mut(&child_id)
719 && runtime_state.generation_fence.phase == GenerationFencePhase::ReadyToStart
720 {
721 fenced_spawn_recovery = runtime_state
722 .registry_identity_anchor_for_spawn_attempt
723 .take();
724 repaired_fenced_spawn = true;
725 runtime_state.generation_fence.phase = GenerationFencePhase::Open;
726 runtime_state.last_control_failure = Some(ChildControlFailure::new(
727 ChildControlFailurePhase::WaitCompletion,
728 message,
729 true,
730 ));
731 }
732
733 if repaired_fenced_spawn {
734 if let Some((generation, attempt, restart_count)) = fenced_spawn_recovery
735 && let Some(registry_runtime) = self.registry.child_mut(&child_id)
736 {
737 registry_runtime.generation = generation;
738 registry_runtime.child_start_count = attempt;
739 registry_runtime.restart_count = restart_count;
740 }
741 return;
742 }
743
744 let _result = self.set_child_state(child_id, ChildControlOperation::Quarantined);
745 }
746
747 async fn execute_shutdown(
759 &mut self,
760 requested_by: String,
761 reason: String,
762 event_sender: &broadcast::Sender<String>,
763 ) -> Result<ShutdownResult, SupervisorError> {
764 if let Some(report) = self.shutdown_pipeline.cached_report() {
765 return Ok(self
766 .shutdown
767 .result_with_report(report.as_idempotent(), true));
768 }
769
770 let cause = ShutdownCause::new(requested_by, reason);
771 let requested = self.shutdown.request_stop(cause);
772 let started_at_unix_nanos = unix_epoch_nanos();
773 let wait_order = self.shutdown_wait_order();
774 let mut outcomes = HashMap::<ChildId, ChildShutdownOutcome>::new();
775 let _ignored = event_sender.send(format!(
776 "shutdown_phase_changed:{:?}:{:?}",
777 ShutdownPhase::Idle,
778 self.shutdown.phase()
779 ));
780 self.deliver_shutdown_cancellations(&wait_order, event_sender);
781
782 self.advance_shutdown_phase(event_sender);
783 self.drain_graceful_children(&wait_order, &mut outcomes, event_sender)
784 .await;
785
786 self.advance_shutdown_phase(event_sender);
787 self.abort_remaining_children(&wait_order, &mut outcomes, event_sender)
788 .await;
789
790 self.advance_shutdown_phase(event_sender);
791 self.reconcile_shutdown_outcomes(&wait_order, &mut outcomes);
792 let reconcile = ShutdownReconcileReport::core_runtime_completed();
793
794 let from = self.shutdown.phase();
795 self.shutdown.complete();
796 let _ignored = event_sender.send(format!(
797 "shutdown_phase_changed:{from:?}:{:?}",
798 self.shutdown.phase()
799 ));
800 let ordered_outcomes = wait_order
801 .iter()
802 .filter_map(|child_id| outcomes.remove(child_id))
803 .collect::<Vec<_>>();
804 let report = ShutdownPipelineReport {
805 cause: requested.cause,
806 started_at_unix_nanos,
807 completed_at_unix_nanos: unix_epoch_nanos(),
808 phase: self.shutdown.phase(),
809 outcomes: ordered_outcomes,
810 reconcile,
811 idempotent: false,
812 };
813 let _ignored = event_sender.send(format!("shutdown_completed:{}", report.outcomes.len()));
814 self.shutdown_pipeline.cache_report(report.clone());
815 Ok(self.shutdown.result_with_report(report, false))
816 }
817
818 fn advance_shutdown_phase(
828 &mut self,
829 event_sender: &broadcast::Sender<String>,
830 ) -> ShutdownPhase {
831 let from = self.shutdown.phase();
832 let to = self.shutdown.advance();
833 let _ignored = event_sender.send(format!("shutdown_phase_changed:{from:?}:{to:?}"));
834 to
835 }
836
837 fn shutdown_wait_order(&self) -> Vec<ChildId> {
847 shutdown_order(&self.tree)
848 .into_iter()
849 .map(|node| node.child.id.clone())
850 .collect()
851 }
852
853 fn deliver_shutdown_cancellations(
864 &mut self,
865 wait_order: &[ChildId],
866 event_sender: &broadcast::Sender<String>,
867 ) {
868 for child_id in wait_order {
869 let Some(runtime_state) = self.slots.get_mut(child_id) else {
870 continue;
871 };
872 if runtime_state.operation == ChildControlOperation::Removed {
873 continue;
874 }
875 if !runtime_state.has_active_attempt() {
876 continue;
877 };
878 runtime_state.cancel();
879 let _ignored = event_sender.send(format!(
880 "child_shutdown_cancel_delivered:{}:{}:{}",
881 runtime_state.child_id,
882 runtime_state
883 .generation
884 .map_or(0, |generation| generation.value),
885 runtime_state.attempt.map_or(0, |attempt| attempt.value)
886 ));
887 }
888 }
889
890 async fn drain_graceful_children(
902 &mut self,
903 wait_order: &[ChildId],
904 outcomes: &mut HashMap<ChildId, ChildShutdownOutcome>,
905 event_sender: &broadcast::Sender<String>,
906 ) {
907 let deadline = Instant::now() + self.shutdown.policy.graceful_timeout;
908 for child_id in wait_order {
909 if outcomes.contains_key(child_id) {
910 continue;
911 }
912 let Some(mut runtime_state) = self.slots.remove(child_id) else {
913 continue;
914 };
915 if runtime_state.operation == ChildControlOperation::Removed {
916 outcomes.insert(
917 child_id.clone(),
918 removed_runtime_state_shutdown_outcome(
919 &runtime_state,
920 ShutdownPhase::GracefulDrain,
921 ),
922 );
923 self.slots.insert(child_id.clone(), runtime_state);
924 continue;
925 }
926 if !runtime_state.has_active_attempt() {
927 self.slots.insert(child_id.clone(), runtime_state);
928 continue;
929 };
930 let completed = match remaining_duration(deadline) {
931 Some(remaining) => timeout(remaining, runtime_state.wait_for_report())
932 .await
933 .ok(),
934 None => None,
935 };
936 match completed {
937 Some(Ok(report)) => {
938 let outcome = outcome_from_report(
939 &runtime_state,
940 &report,
941 ChildShutdownStatus::Graceful,
942 ShutdownPhase::GracefulDrain,
943 "child completed during graceful drain",
944 );
945 self.record_child_exit(report);
946 runtime_state.clear_instance();
947 self.slots.insert(child_id.clone(), runtime_state);
948 let _ignored = event_sender.send(format!("child_shutdown_graceful:{child_id}"));
949 outcomes.insert(child_id.clone(), outcome);
950 }
951 Some(Err(error)) => {
952 outcomes.insert(
953 child_id.clone(),
954 outcome_from_error(
955 &runtime_state,
956 ChildShutdownStatus::Graceful,
957 ShutdownPhase::GracefulDrain,
958 error,
959 ),
960 );
961 self.slots.insert(child_id.clone(), runtime_state);
962 }
963 None => {
964 self.slots.insert(child_id.clone(), runtime_state);
965 }
966 }
967 }
968 }
969
970 async fn abort_remaining_children(
982 &mut self,
983 wait_order: &[ChildId],
984 outcomes: &mut HashMap<ChildId, ChildShutdownOutcome>,
985 event_sender: &broadcast::Sender<String>,
986 ) {
987 let policy = self.shutdown.policy;
988 for child_id in wait_order {
989 if outcomes.contains_key(child_id) {
990 continue;
991 }
992 let Some(mut runtime_state) = self.slots.remove(child_id) else {
993 continue;
994 };
995 if runtime_state.operation == ChildControlOperation::Removed {
996 outcomes.insert(
997 child_id.clone(),
998 removed_runtime_state_shutdown_outcome(
999 &runtime_state,
1000 ShutdownPhase::AbortStragglers,
1001 ),
1002 );
1003 self.slots.insert(child_id.clone(), runtime_state);
1004 continue;
1005 }
1006 if !runtime_state.has_active_attempt() {
1007 self.slots.insert(child_id.clone(), runtime_state);
1008 continue;
1009 };
1010 if !policy.abort_after_timeout {
1011 self.wait_for_late_report(
1012 child_id,
1013 runtime_state,
1014 policy.abort_wait,
1015 outcomes,
1016 event_sender,
1017 )
1018 .await;
1019 continue;
1020 }
1021 runtime_state.abort();
1022 let _ignored = event_sender.send(format!(
1023 "child_shutdown_abort_requested:{}",
1024 runtime_state.child_id
1025 ));
1026 match timeout(policy.abort_wait, runtime_state.wait_for_report()).await {
1027 Ok(Ok(report)) => {
1028 let outcome = outcome_from_report(
1029 &runtime_state,
1030 &report,
1031 ChildShutdownStatus::Aborted,
1032 ShutdownPhase::AbortStragglers,
1033 "child completed after abort request",
1034 );
1035 self.record_child_exit(report);
1036 runtime_state.clear_instance();
1037 self.slots.insert(child_id.clone(), runtime_state);
1038 let _ignored = event_sender.send(format!("child_shutdown_aborted:{child_id}"));
1039 outcomes.insert(child_id.clone(), outcome);
1040 }
1041 Ok(Err(error)) => {
1042 outcomes.insert(
1043 child_id.clone(),
1044 outcome_from_error(
1045 &runtime_state,
1046 ChildShutdownStatus::AbortFailed,
1047 ShutdownPhase::AbortStragglers,
1048 error,
1049 ),
1050 );
1051 self.slots.insert(child_id.clone(), runtime_state);
1052 }
1053 Err(_elapsed) => {
1054 outcomes.insert(
1055 child_id.clone(),
1056 ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
1057 child_id: runtime_state.child_id.clone(),
1058 path: runtime_state.path.clone(),
1059 generation: runtime_state
1060 .generation
1061 .unwrap_or_else(Generation::initial),
1062 child_start_count: runtime_state
1063 .attempt
1064 .unwrap_or_else(ChildStartCount::first),
1065 status: ChildShutdownStatus::AbortFailed,
1066 cancel_delivered: runtime_state.attempt_cancel_delivered,
1067 exit: None,
1068 phase: ShutdownPhase::AbortStragglers,
1069 reason: "child did not complete after abort request".to_owned(),
1070 }),
1071 );
1072 self.slots.insert(child_id.clone(), runtime_state);
1073 }
1074 }
1075 }
1076 }
1077
1078 async fn wait_for_late_report(
1092 &mut self,
1093 child_id: &ChildId,
1094 mut runtime_state: ChildSlot,
1095 wait: Duration,
1096 outcomes: &mut HashMap<ChildId, ChildShutdownOutcome>,
1097 event_sender: &broadcast::Sender<String>,
1098 ) {
1099 match timeout(wait, runtime_state.wait_for_report()).await {
1100 Ok(Ok(report)) => {
1101 let outcome = outcome_from_report(
1102 &runtime_state,
1103 &report,
1104 ChildShutdownStatus::LateReport,
1105 ShutdownPhase::AbortStragglers,
1106 "child reported after graceful timeout",
1107 );
1108 self.record_child_exit(report);
1109 runtime_state.clear_instance();
1110 self.slots.insert(child_id.clone(), runtime_state);
1111 let _ignored = event_sender.send(format!("child_shutdown_late_report:{child_id}"));
1112 outcomes.insert(child_id.clone(), outcome);
1113 }
1114 Ok(Err(error)) => {
1115 outcomes.insert(
1116 child_id.clone(),
1117 outcome_from_error(
1118 &runtime_state,
1119 ChildShutdownStatus::LateReport,
1120 ShutdownPhase::AbortStragglers,
1121 error,
1122 ),
1123 );
1124 self.slots.insert(child_id.clone(), runtime_state);
1125 }
1126 Err(_elapsed) => {
1127 outcomes.insert(
1128 child_id.clone(),
1129 ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
1130 child_id: runtime_state.child_id.clone(),
1131 path: runtime_state.path.clone(),
1132 generation: runtime_state.generation.unwrap_or_else(Generation::initial),
1133 child_start_count: runtime_state
1134 .attempt
1135 .unwrap_or_else(ChildStartCount::first),
1136 status: ChildShutdownStatus::AbortFailed,
1137 cancel_delivered: runtime_state.attempt_cancel_delivered,
1138 exit: None,
1139 phase: ShutdownPhase::AbortStragglers,
1140 reason: "abort disabled and child did not report before reconcile"
1141 .to_owned(),
1142 }),
1143 );
1144 self.slots.insert(child_id.clone(), runtime_state);
1145 }
1146 }
1147 }
1148
1149 fn reconcile_shutdown_outcomes(
1160 &self,
1161 wait_order: &[ChildId],
1162 outcomes: &mut HashMap<ChildId, ChildShutdownOutcome>,
1163 ) {
1164 for child_id in wait_order {
1165 if outcomes.contains_key(child_id) {
1166 continue;
1167 }
1168 let Some(runtime) = self.registry.child(child_id) else {
1169 continue;
1170 };
1171 outcomes.insert(
1172 child_id.clone(),
1173 ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
1174 child_id: runtime.id.clone(),
1175 path: runtime.path.clone(),
1176 generation: runtime.generation,
1177 child_start_count: runtime.child_start_count,
1178 status: ChildShutdownStatus::AlreadyExited,
1179 cancel_delivered: false,
1180 exit: runtime.last_exit.clone(),
1181 phase: ShutdownPhase::Reconcile,
1182 reason: "child had no active child_start_count during shutdown".to_owned(),
1183 }),
1184 );
1185 }
1186 }
1187
1188 fn set_child_state(
1199 &mut self,
1200 child_id: ChildId,
1201 operation: ChildControlOperation,
1202 ) -> CommandResult {
1203 if !self.slots.contains_key(&child_id) {
1204 let placeholder = self
1205 .registry
1206 .child(&child_id)
1207 .map(|runtime| ChildSlot::new_placeholder(runtime.id.clone(), runtime.path.clone()))
1208 .unwrap_or_else(|| {
1209 ChildSlot::new_placeholder(
1210 child_id.clone(),
1211 crate::id::types::SupervisorPath::root().join(child_id.value.clone()),
1212 )
1213 });
1214 self.slots.insert(child_id.clone(), placeholder);
1215 }
1216 let runtime_state = self
1217 .slots
1218 .get_mut(&child_id)
1219 .expect("child runtime state should exist after insertion");
1220 let operation_before = runtime_state.operation;
1221 runtime_state.operation = operation;
1222 let outcome = ChildControlResult::new(
1223 child_id,
1224 runtime_state.attempt,
1225 runtime_state.generation,
1226 operation_before,
1227 runtime_state.operation,
1228 Some(runtime_state.status),
1229 false,
1230 if runtime_state.has_active_attempt() {
1231 runtime_state.stop_state
1232 } else {
1233 ChildStopState::NoActiveAttempt
1234 },
1235 runtime_state.restart_limit.clone(),
1236 runtime_state.observe_liveness(self.time_base.now_unix_nanos()),
1237 operation_before == operation,
1238 runtime_state.last_control_failure.clone(),
1239 None,
1240 );
1241 CommandResult::ChildControl { outcome }
1242 }
1243
1244 fn execute_stop_child_control(
1258 &mut self,
1259 child_id: ChildId,
1260 target_operation: ChildControlOperation,
1261 command_name: &'static str,
1262 meta: &CommandMeta,
1263 event_sender: &broadcast::Sender<String>,
1264 ) -> CommandResult {
1265 if !self.slots.contains_key(&child_id) {
1266 let placeholder = self
1267 .registry
1268 .child(&child_id)
1269 .map(|runtime| ChildSlot::new_placeholder(runtime.id.clone(), runtime.path.clone()))
1270 .unwrap_or_else(|| {
1271 ChildSlot::new_placeholder(
1272 child_id.clone(),
1273 crate::id::types::SupervisorPath::root().join(child_id.value.clone()),
1274 )
1275 });
1276 self.slots.insert(child_id.clone(), placeholder);
1277 }
1278
1279 let remove_after_outcome;
1280 let correlation_id = CorrelationId::from_uuid(meta.command_id.value);
1281 let mut pending_events = Vec::new();
1282 let outcome = {
1283 let runtime_state = self
1284 .slots
1285 .get_mut(&child_id)
1286 .expect("child runtime state should exist after insertion");
1287 let stop = apply_stop_control_to_runtime_state(
1288 runtime_state,
1289 target_operation,
1290 command_name,
1291 &meta.command_id.value.to_string(),
1292 correlation_id,
1293 self.time_base
1294 .now_unix_nanos()
1295 .saturating_add(self.shutdown.policy.graceful_timeout.as_nanos()),
1296 event_sender,
1297 &mut pending_events,
1298 );
1299 remove_after_outcome = stop.remove_after_outcome;
1300 build_child_control_outcome(
1301 stop.operation_before,
1302 stop.cancel_delivered,
1303 stop.idempotent,
1304 runtime_state.last_control_failure.clone(),
1305 runtime_state,
1306 &self.time_base,
1307 None,
1308 )
1309 };
1310
1311 for event in pending_events {
1312 self.emit_pending_event(event);
1313 }
1314
1315 let outcome_path = self
1316 .slots
1317 .get(&child_id)
1318 .map(|state| state.path.clone())
1319 .or_else(|| {
1320 self.registry
1321 .child(&child_id)
1322 .map(|runtime| runtime.path.clone())
1323 })
1324 .unwrap_or_else(|| SupervisorPath::root().join(child_id.value.clone()));
1325 self.emit_pending_event(PendingRuntimeEvent {
1326 child_id: child_id.clone(),
1327 path: outcome_path,
1328 generation: outcome.generation,
1329 attempt: outcome.attempt,
1330 correlation_id,
1331 what: What::ChildControlCommandCompleted {
1332 child_id: child_id.clone(),
1333 command: command_name.to_owned(),
1334 command_id: meta.command_id.value.to_string(),
1335 requested_by: meta.requested_by.clone(),
1336 reason: meta.reason.clone(),
1337 result: child_control_result_label(&outcome).to_owned(),
1338 outcome: Box::new(outcome.clone()),
1339 },
1340 });
1341
1342 if remove_after_outcome {
1343 if let Some(removed) = self.slots.remove(&child_id) {
1344 self.emit_pending_event(PendingRuntimeEvent {
1345 child_id: child_id.clone(),
1346 path: removed.path.clone(),
1347 generation: removed.generation,
1348 attempt: removed.attempt,
1349 correlation_id,
1350 what: What::ChildRuntimeStateRemoved {
1351 child_id: child_id.clone(),
1352 path: removed.path,
1353 final_status: None,
1354 },
1355 });
1356 }
1357 let _ignored = event_sender.send(format!("child_runtime_state_removed:{child_id}"));
1358 }
1359
1360 CommandResult::ChildControl { outcome }
1361 }
1362
1363 fn record_child_exit(&mut self, report: ChildRunReport) {
1373 let child_id = report.runtime.id.clone();
1374 if let Some(runtime) = self.registry.child_mut(&child_id) {
1375 runtime.last_exit = Some(report.exit);
1376 runtime.status = ChildRuntimeStatus::Exited;
1377 runtime.generation = report.runtime.generation;
1378 runtime.child_start_count = report.runtime.child_start_count;
1379 runtime.restart_count = report.runtime.restart_count;
1380 }
1381 }
1382
1383 fn should_apply_automatic_policy(&self, child_id: &ChildId) -> bool {
1393 if self.shutdown.phase() != crate::shutdown::stage::ShutdownPhase::Idle {
1394 return false;
1395 }
1396 !matches!(
1397 self.slots.get(child_id).map(|state| state.operation),
1398 Some(
1399 ChildControlOperation::Paused
1400 | ChildControlOperation::Quarantined
1401 | ChildControlOperation::Removed
1402 )
1403 )
1404 }
1405
1406 fn restart_decision(&self, child_id: &ChildId) -> Option<RestartDecision> {
1416 let runtime = self.registry.child(child_id)?;
1417 let exit = runtime.last_exit.as_ref()?;
1418 let policy_exit = policy_task_exit(exit);
1419 let restart_policy = restart_policy(runtime.spec.restart_policy);
1420 let backoff = backoff_policy(runtime.spec.backoff_policy);
1421 Some(self.policy_engine.decide(
1422 restart_policy,
1423 policy_exit,
1424 runtime.child_start_count.value,
1425 &backoff,
1426 ))
1427 }
1428
1429 fn refresh_restart_limit_for_child(
1440 &mut self,
1441 child_id: &ChildId,
1442 count_failure: bool,
1443 ) -> Option<(SupervisorPath, crate::control::outcome::RestartLimitState)> {
1444 let restart_limit = restart_limit_for_child_in_spec(&self.tree, &self.spec, child_id);
1445 let runtime_state = self.slots.get_mut(child_id)?;
1446 let updated = runtime_state.refresh_restart_limit(
1447 restart_limit.window,
1448 restart_limit.max_restarts,
1449 count_failure,
1450 &self.time_base,
1451 );
1452 Some((runtime_state.path.clone(), updated))
1453 }
1454
1455 fn execute_restart_decision(
1467 &mut self,
1468 failed_child: ChildId,
1469 decision: RestartDecision,
1470 event_sender: &broadcast::Sender<String>,
1471 ) {
1472 match decision {
1473 RestartDecision::RestartAfter { delay } => {
1474 self.restart_strategy_scope(failed_child, delay, event_sender);
1475 }
1476 RestartDecision::Quarantine => {
1477 let _result =
1478 self.set_child_state(failed_child, ChildControlOperation::Quarantined);
1479 }
1480 RestartDecision::ShutdownTree => {
1481 let cause = ShutdownCause::new("runtime", "policy requested tree shutdown");
1482 let _result = self.shutdown.request_stop(cause);
1483 }
1484 RestartDecision::EscalateToParent | RestartDecision::DoNotRestart => {}
1485 }
1486 }
1487
1488 fn restart_strategy_scope(
1500 &mut self,
1501 failed_child: ChildId,
1502 delay: Duration,
1503 event_sender: &broadcast::Sender<String>,
1504 ) {
1505 let plan = restart_execution_plan(&self.tree, &self.spec, &failed_child);
1506 let scope_label = child_scope_label(&plan.scope);
1507 let group_label = plan.group.as_deref().unwrap_or("supervisor");
1508
1509 if !self.concurrent_gate.try_acquire() {
1511 let _ignored = event_sender.send(format!(
1513 "restart_throttled:concurrent_gate_saturated:{group_label}:{scope_label}"
1514 ));
1515 self.emit_throttle_gate_event(
1516 &failed_child,
1517 plan.group.as_deref(),
1518 ThrottleGateOwner::SupervisorInstance,
1519 );
1520 return;
1521 }
1522
1523 let _ignored = event_sender.send(format!(
1524 "restart_plan:{:?}:{group_label}:{scope_label}",
1525 plan.strategy
1526 ));
1527 for child_id in plan.scope {
1528 self.spawn_child_start(child_id, true, delay);
1529 }
1530 self.concurrent_gate.release();
1531 }
1532
1533 fn emit_throttle_gate_event(
1545 &mut self,
1546 child_id: &ChildId,
1547 group_id: Option<&str>,
1548 owner: ThrottleGateOwner,
1549 ) {
1550 let now = Instant::now();
1551 let uptime = now
1552 .duration_since(self.time_base.base_instant)
1553 .as_millis()
1554 .min(u128::from(u64::MAX)) as u64;
1555 let monotonic_nanos = now.duration_since(self.time_base.base_instant).as_nanos();
1556 let path = self
1557 .slots
1558 .get(child_id)
1559 .map(|state| state.path.clone())
1560 .unwrap_or_else(|| SupervisorPath::root().join(child_id.value.clone()));
1561 let child_name = self
1562 .registry
1563 .child(child_id)
1564 .map(|runtime| runtime.spec.name.clone())
1565 .unwrap_or_else(|| child_id.to_string());
1566 let mut event = SupervisorEvent::new(
1567 When::new(EventTime::from_parts(
1568 monotonic_nanos,
1569 uptime,
1570 Generation::initial(),
1571 ChildStartCount::first(),
1572 )),
1573 Where::new(path).with_child(child_id.clone(), child_name),
1574 What::ChildFailed {
1575 failure: crate::error::types::TaskFailure::new(
1576 crate::error::types::TaskFailureKind::Error,
1577 "restart_throttled",
1578 format!(
1579 "restart denied by throttle gate {} for group {}",
1580 owner,
1581 group_id.unwrap_or("supervisor")
1582 ),
1583 ),
1584 },
1585 self.event_sequences.next(),
1586 CorrelationId::new(),
1587 1,
1588 );
1589 event.effective_protective_action = Some(ProtectionAction::RestartDenied);
1590 event.throttle_gate_owner = owner;
1591 if let Some(runtime) = self.registry.child(child_id) {
1592 let effective_policy = prepare_effective_policy(&runtime.spec);
1593 event.task_role = Some(effective_policy.task_role);
1594 event.used_fallback_default = effective_policy.used_fallback;
1595 event.effective_policy_source = Some(effective_policy.source);
1596 }
1597 if let Ok(mut observability) = self.observability.lock() {
1598 let _lagged = observability.emit(event);
1599 }
1600 }
1601
1602 fn record_pipeline_stage_diagnostics(&self, diagnostics: &[PipelineStageDiagnostic]) {
1612 if let Ok(mut observability) = self.observability.lock() {
1613 observability.record_pipeline_stage_diagnostics(diagnostics);
1614 }
1615 }
1616
1617 fn check_fairness_probe(&mut self, event_sender: &broadcast::Sender<String>) {
1627 let now_unix_nanos = self.time_base.now_unix_nanos();
1628 let all_child_ids: Vec<ChildId> = self.slots.keys().cloned().collect();
1629 if let Some(alert) = self.fairness_probe.check(now_unix_nanos, &all_child_ids) {
1630 let path = self
1632 .slots
1633 .get(&alert.starved_child_id)
1634 .map(|slot| slot.path.clone())
1635 .unwrap_or_else(|| {
1636 SupervisorPath::root().join(alert.starved_child_id.value.clone())
1637 });
1638 let generation = self
1639 .slots
1640 .get(&alert.starved_child_id)
1641 .and_then(|slot| slot.generation);
1642 let attempt = self
1643 .slots
1644 .get(&alert.starved_child_id)
1645 .and_then(|slot| slot.attempt);
1646 let pending = PendingRuntimeEvent {
1647 child_id: alert.starved_child_id.clone(),
1648 path,
1649 generation,
1650 attempt,
1651 correlation_id: CorrelationId::new(),
1652 what: What::FairnessProbeStarvation {
1653 starved_child_id: alert.starved_child_id.clone(),
1654 skip_count: alert.skip_count,
1655 probe_start_unix_nanos: alert.probe_start_unix_nanos,
1656 probe_end_unix_nanos: alert.probe_end_unix_nanos,
1657 },
1658 };
1659 self.emit_pending_event(pending);
1660
1661 let _ignored = event_sender.send(format!(
1663 "fairness_starvation:{}:skip_count={}:window_start={}:window_end={}",
1664 alert.starved_child_id,
1665 alert.skip_count,
1666 alert.probe_start_unix_nanos,
1667 alert.probe_end_unix_nanos,
1668 ));
1669 }
1670 }
1671
1672 fn ensure_dynamic_child_allowed(&self) -> Result<(), SupervisorError> {
1682 let current_child_count = self.dynamic_child_count();
1683 if self
1684 .spec
1685 .dynamic_supervisor_policy
1686 .allows_addition(current_child_count)
1687 {
1688 return Ok(());
1689 }
1690 Err(SupervisorError::InvalidTransition {
1691 message: "dynamic supervisor child limit reached".to_owned(),
1692 })
1693 }
1694
1695 fn dynamic_child_count(&self) -> usize {
1705 self.registry
1706 .declaration_order()
1707 .len()
1708 .saturating_add(self.manifests.len())
1709 }
1710
1711 fn execute_restart_child_control(
1723 &mut self,
1724 child_id: ChildId,
1725 meta: &CommandMeta,
1726 event_sender: &broadcast::Sender<String>,
1727 ) -> CommandResult {
1728 let correlation_id = CorrelationId::from_uuid(meta.command_id.value);
1729
1730 if self.registry.child(&child_id).is_none() {
1731 let outcome = restart_child_unknown_outcome(child_id.clone());
1732 self.emit_restart_child_completed(
1733 outcome.clone(),
1734 meta,
1735 correlation_id,
1736 event_sender,
1737 Vec::new(),
1738 );
1739 return CommandResult::ChildControl { outcome };
1740 }
1741
1742 if self.shutdown.phase() != ShutdownPhase::Idle {
1743 return self.restart_child_blocked_by_shutdown(
1744 &child_id,
1745 meta,
1746 correlation_id,
1747 event_sender,
1748 );
1749 }
1750
1751 if !self.slots.contains_key(&child_id) {
1752 let placeholder = self
1753 .registry
1754 .child(&child_id)
1755 .map(|runtime| ChildSlot::new_placeholder(runtime.id.clone(), runtime.path.clone()))
1756 .unwrap_or_else(|| {
1757 ChildSlot::new_placeholder(
1758 child_id.clone(),
1759 SupervisorPath::root().join(child_id.value.clone()),
1760 )
1761 });
1762 self.slots.insert(child_id.clone(), placeholder);
1763 }
1764
1765 let mut pending_events = Vec::new();
1766
1767 enum RestartPrep {
1769 Completed(Box<ChildControlResult>),
1771 DeferredImmediate {
1773 operation_before: ChildControlOperation,
1775 },
1776 }
1777
1778 let restart_prep = {
1779 let runtime_state = self
1780 .slots
1781 .get_mut(&child_id)
1782 .expect("runtime state exists after insertion");
1783 if runtime_state.generation_fence.pending_restart.is_some() {
1784 let pending = runtime_state
1785 .generation_fence
1786 .pending_restart
1787 .as_mut()
1788 .expect("checked pending restart");
1789 pending.duplicate_request_count = pending.duplicate_request_count.saturating_add(1);
1790 let pending_for_conflict = pending.clone();
1791 pending_events.push(PendingRuntimeEvent {
1792 child_id: child_id.clone(),
1793 path: runtime_state.path.clone(),
1794 generation: Some(pending_for_conflict.old_generation),
1795 attempt: Some(pending_for_conflict.old_attempt),
1796 correlation_id: CorrelationId::from_uuid(meta.command_id.value),
1797 what: What::ChildRestartConflict {
1798 child_id: child_id.clone(),
1799 current_generation: Some(pending_for_conflict.old_generation),
1800 current_attempt: Some(pending_for_conflict.old_attempt),
1801 target_generation: Some(pending_for_conflict.target_generation),
1802 command_id: meta.command_id.value.to_string(),
1803 decision: "already_pending".to_owned(),
1804 reason: "duplicate restart merged into pending restart".to_owned(),
1805 },
1806 });
1807 let fence = GenerationFenceOutcome::new(
1808 GenerationFenceDecision::AlreadyPending,
1809 Some(pending_for_conflict.old_generation),
1810 Some(pending_for_conflict.old_attempt),
1811 Some(pending_for_conflict.target_generation),
1812 false,
1813 pending_for_conflict.abort_requested,
1814 None,
1815 );
1816 let operation_before = runtime_state.operation;
1817 RestartPrep::Completed(Box::new(build_child_control_outcome(
1818 operation_before,
1819 false,
1820 false,
1821 runtime_state.last_control_failure.clone(),
1822 runtime_state,
1823 &self.time_base,
1824 Some(fence),
1825 )))
1826 } else if !runtime_state.has_active_attempt() {
1827 RestartPrep::DeferredImmediate {
1828 operation_before: runtime_state.operation,
1829 }
1830 } else {
1831 let old_generation = runtime_state
1832 .generation
1833 .expect("active attempt owns a generation");
1834 let old_attempt = runtime_state
1835 .attempt
1836 .expect("active attempt owns an attempt counter");
1837 let cancel_delivered = runtime_state.cancel();
1838 let deadline = self
1839 .time_base
1840 .now_unix_nanos()
1841 .saturating_add(self.shutdown.policy.graceful_timeout.as_nanos());
1842 runtime_state.stop_deadline_at_unix_nanos = Some(deadline);
1843 if cancel_delivered {
1844 runtime_state.stop_state = ChildStopState::CancelDelivered;
1845 }
1846 let target_generation = old_generation.next();
1847 let requested_at = self.time_base.now_unix_nanos();
1848 let pending = PendingRestart::new(
1849 meta.command_id.value,
1850 meta.requested_by.clone(),
1851 meta.reason.clone(),
1852 old_generation,
1853 old_attempt,
1854 target_generation,
1855 requested_at,
1856 deadline,
1857 false,
1858 0,
1859 );
1860 runtime_state.generation_fence.pending_restart = Some(pending.clone());
1861 runtime_state.generation_fence.phase = GenerationFencePhase::WaitingForOldStop;
1862
1863 if cancel_delivered {
1864 pending_events.push(PendingRuntimeEvent {
1865 child_id: child_id.clone(),
1866 path: runtime_state.path.clone(),
1867 generation: Some(old_generation),
1868 attempt: Some(old_attempt),
1869 correlation_id,
1870 what: What::ChildControlCancelDelivered {
1871 child_id: child_id.clone(),
1872 generation: old_generation,
1873 attempt: old_attempt,
1874 command: "restart_child".to_owned(),
1875 command_id: meta.command_id.value.to_string(),
1876 },
1877 });
1878 let _ignored = event_sender.send(format!(
1879 "child_control_cancel_delivered:{child_id}:restart_child"
1880 ));
1881 }
1882
1883 pending_events.push(PendingRuntimeEvent {
1884 child_id: child_id.clone(),
1885 path: runtime_state.path.clone(),
1886 generation: Some(old_generation),
1887 attempt: Some(old_attempt),
1888 correlation_id,
1889 what: What::ChildRestartFenceEntered {
1890 child_id: child_id.clone(),
1891 old_generation,
1892 old_attempt,
1893 target_generation,
1894 command_id: meta.command_id.value.to_string(),
1895 requested_by: meta.requested_by.clone(),
1896 reason: meta.reason.clone(),
1897 stop_deadline_at_unix_nanos: deadline,
1898 },
1899 });
1900
1901 let operation_before = runtime_state.operation;
1902 let fence = GenerationFenceOutcome::new(
1903 GenerationFenceDecision::QueuedAfterStop,
1904 Some(old_generation),
1905 Some(old_attempt),
1906 Some(target_generation),
1907 cancel_delivered,
1908 false,
1909 None,
1910 );
1911 RestartPrep::Completed(Box::new(build_child_control_outcome(
1912 operation_before,
1913 cancel_delivered,
1914 false,
1915 None,
1916 runtime_state,
1917 &self.time_base,
1918 Some(fence),
1919 )))
1920 }
1921 };
1922
1923 let outcome = match restart_prep {
1924 RestartPrep::Completed(outcome) => *outcome,
1925 RestartPrep::DeferredImmediate { operation_before } => {
1926 self.spawn_child_start(child_id.clone(), true, Duration::ZERO);
1927 let runtime_state = self.slots.get_mut(&child_id).expect("runtime state exists");
1928 let target_generation = self
1929 .registry
1930 .child(&child_id)
1931 .map(|runtime| runtime.generation);
1932 let fence = GenerationFenceOutcome::new(
1933 GenerationFenceDecision::StartedImmediately,
1934 None,
1935 None,
1936 target_generation,
1937 false,
1938 false,
1939 None,
1940 );
1941 build_child_control_outcome(
1942 operation_before,
1943 false,
1944 false,
1945 runtime_state.last_control_failure.clone(),
1946 runtime_state,
1947 &self.time_base,
1948 Some(fence),
1949 )
1950 }
1951 };
1952
1953 self.emit_restart_child_completed(
1954 outcome.clone(),
1955 meta,
1956 correlation_id,
1957 event_sender,
1958 pending_events,
1959 );
1960
1961 CommandResult::ChildControl { outcome }
1962 }
1963
1964 fn emit_restart_child_completed(
1978 &mut self,
1979 outcome: ChildControlResult,
1980 meta: &CommandMeta,
1981 correlation_id: CorrelationId,
1982 event_sender: &broadcast::Sender<String>,
1983 mut pending_events: Vec<PendingRuntimeEvent>,
1984 ) {
1985 for event in pending_events.drain(..) {
1986 self.emit_pending_event(event);
1987 }
1988 let outcome_identifier = outcome.child_id.clone();
1989 let outcome_path = self
1990 .slots
1991 .get(&outcome.child_id)
1992 .map(|state| state.path.clone())
1993 .or_else(|| {
1994 self.registry
1995 .child(&outcome.child_id)
1996 .map(|runtime| runtime.path.clone())
1997 })
1998 .unwrap_or_else(|| SupervisorPath::root().join(outcome.child_id.value.clone()));
1999 self.emit_pending_event(PendingRuntimeEvent {
2000 child_id: outcome.child_id.clone(),
2001 path: outcome_path,
2002 generation: outcome.generation,
2003 attempt: outcome.attempt,
2004 correlation_id,
2005 what: What::ChildControlCommandCompleted {
2006 child_id: outcome.child_id.clone(),
2007 command: "restart_child".to_owned(),
2008 command_id: meta.command_id.value.to_string(),
2009 requested_by: meta.requested_by.clone(),
2010 reason: meta.reason.clone(),
2011 result: child_control_result_label(&outcome).to_owned(),
2012 outcome: Box::new(outcome),
2013 },
2014 });
2015 let _ignored = event_sender.send(format!(
2016 "child_control_command_completed:{}:restart_child",
2017 outcome_identifier
2018 ));
2019 }
2020
2021 fn restart_child_blocked_by_shutdown(
2034 &mut self,
2035 child_id: &ChildId,
2036 meta: &CommandMeta,
2037 correlation_id: CorrelationId,
2038 event_sender: &broadcast::Sender<String>,
2039 ) -> CommandResult {
2040 if !self.slots.contains_key(child_id) {
2041 let placeholder = self
2042 .registry
2043 .child(child_id)
2044 .map(|runtime| ChildSlot::new_placeholder(runtime.id.clone(), runtime.path.clone()))
2045 .unwrap_or_else(|| {
2046 ChildSlot::new_placeholder(
2047 child_id.clone(),
2048 SupervisorPath::root().join(child_id.value.clone()),
2049 )
2050 });
2051 self.slots.insert(child_id.clone(), placeholder);
2052 }
2053
2054 let outcome = {
2055 let runtime_state = self.slots.get_mut(child_id).expect("runtime state exists");
2056 runtime_state.generation_fence.phase = GenerationFencePhase::Closed;
2057 let failure = ChildControlFailure::new(
2058 ChildControlFailurePhase::WaitCompletion,
2059 "supervisor tree is shutting down",
2060 false,
2061 );
2062 let fence = GenerationFenceOutcome::new(
2063 GenerationFenceDecision::BlockedByShutdown,
2064 runtime_state.generation,
2065 runtime_state.attempt,
2066 None,
2067 false,
2068 false,
2069 Some(failure.clone()),
2070 );
2071 let operation_before = runtime_state.operation;
2072 runtime_state.last_control_failure = Some(failure);
2073 build_child_control_outcome(
2074 operation_before,
2075 false,
2076 false,
2077 runtime_state.last_control_failure.clone(),
2078 runtime_state,
2079 &self.time_base,
2080 Some(fence),
2081 )
2082 };
2083
2084 let blocked_events = match self.slots.get(child_id).map(|runtime_state| {
2085 (
2086 runtime_state.path.clone(),
2087 runtime_state.generation,
2088 runtime_state.attempt,
2089 )
2090 }) {
2091 Some((path, current_generation, current_attempt)) => {
2092 vec![PendingRuntimeEvent {
2093 child_id: child_id.clone(),
2094 path,
2095 generation: current_generation,
2096 attempt: current_attempt,
2097 correlation_id,
2098 what: What::ChildRestartConflict {
2099 child_id: child_id.clone(),
2100 current_generation,
2101 current_attempt,
2102 target_generation: None,
2103 command_id: meta.command_id.value.to_string(),
2104 decision: "rejected".to_owned(),
2105 reason: "restart rejected while supervisor tree is shutting down"
2106 .to_owned(),
2107 },
2108 }]
2109 }
2110 None => Vec::new(),
2111 };
2112
2113 self.emit_restart_child_completed(
2114 outcome.clone(),
2115 meta,
2116 correlation_id,
2117 event_sender,
2118 blocked_events,
2119 );
2120
2121 CommandResult::ChildControl { outcome }
2122 }
2123
2124 fn build_current_state(&mut self) -> CurrentState {
2134 let mut child_runtime_records = Vec::new();
2135 let mut pending_events = Vec::new();
2136 let declaration_order = self.registry.declaration_order().to_vec();
2137 for child_id in declaration_order {
2138 if let Some(runtime_state) = self.slots.get_mut(&child_id) {
2139 let liveness = runtime_state.observe_liveness(self.time_base.now_unix_nanos());
2140 if let Some(event) = heartbeat_stale_event(runtime_state, &liveness) {
2141 pending_events.push(event);
2142 }
2143 child_runtime_records.push(runtime_state.to_record(liveness));
2144 }
2145 }
2146 for (child_id, runtime_state) in &mut self.slots {
2147 if self.registry.child(child_id).is_some() {
2148 continue;
2149 }
2150 let liveness = runtime_state.observe_liveness(self.time_base.now_unix_nanos());
2151 if let Some(event) = heartbeat_stale_event(runtime_state, &liveness) {
2152 pending_events.push(event);
2153 }
2154 child_runtime_records.push(runtime_state.to_record(liveness));
2155 }
2156 for event in pending_events {
2157 self.emit_pending_event(event);
2158 }
2159 CurrentState {
2160 child_count: self.dynamic_child_count(),
2161 shutdown_completed: self.shutdown.phase()
2162 == crate::shutdown::stage::ShutdownPhase::Completed,
2163 child_runtime_records,
2164 }
2165 }
2166
2167 fn spawn_pending_restart_target(
2179 &mut self,
2180 child_id: ChildId,
2181 pending: PendingRestart,
2182 old_exit: TaskExit,
2183 ) {
2184 let Some(registry_identity_anchor) = self.registry.child(&child_id).map(|runtime| {
2185 (
2186 runtime.generation,
2187 runtime.child_start_count,
2188 runtime.restart_count,
2189 )
2190 }) else {
2191 return;
2192 };
2193 let path = self
2194 .slots
2195 .get(&child_id)
2196 .map(|state| state.path.clone())
2197 .unwrap_or_else(|| SupervisorPath::root().join(child_id.value.clone()));
2198 let correlation_id = CorrelationId::from_uuid(pending.command_id);
2199
2200 if let Some(runtime_state) = self.slots.get_mut(&child_id) {
2201 runtime_state.registry_identity_anchor_for_spawn_attempt =
2202 Some(registry_identity_anchor);
2203 }
2204
2205 {
2206 let Some(registry_runtime) = self.registry.child_mut(&child_id) else {
2207 return;
2208 };
2209 registry_runtime.generation = pending.target_generation;
2210 registry_runtime.child_start_count = registry_runtime.child_start_count.next();
2211 registry_runtime.restart_count = registry_runtime.restart_count.saturating_add(1);
2212 registry_runtime.status = ChildRuntimeStatus::Starting;
2213 }
2214
2215 let Some(runtime) = self.registry.child(&child_id).cloned() else {
2216 return;
2217 };
2218 let new_generation = runtime.generation;
2219 let new_attempt = runtime.child_start_count;
2220
2221 let path_for_handles = path.clone();
2222
2223 match ChildRunner::new().spawn_once(runtime) {
2224 Ok(handle) => {
2225 let sender = self.command_sender.clone();
2226 self.emit_pending_event(PendingRuntimeEvent {
2227 child_id: child_id.clone(),
2228 path,
2229 generation: Some(new_generation),
2230 attempt: Some(new_attempt),
2231 correlation_id,
2232 what: What::ChildRestartFenceReleased {
2233 child_id: child_id.clone(),
2234 old_generation: pending.old_generation,
2235 old_attempt: pending.old_attempt,
2236 target_generation: pending.target_generation,
2237 exit_kind: old_exit.clone(),
2238 },
2239 });
2240 let mut completion_receiver = handle.completion_receiver.clone();
2241 self.slots
2242 .entry(child_id.clone())
2243 .or_insert_with(|| {
2244 ChildSlot::new_placeholder(child_id.clone(), path_for_handles)
2245 })
2246 .activate(
2247 new_generation,
2248 new_attempt,
2249 ChildAttemptStatus::Running,
2250 handle,
2251 );
2252 tokio::spawn(async move {
2253 let result = wait_for_report(&mut completion_receiver).await;
2254 send_child_result(sender, child_id, result).await;
2255 });
2256 }
2257 Err(error) => {
2258 let message = error.to_string();
2259 if let Some(runtime_state) = self.slots.get_mut(&child_id) {
2260 let identity_anchor_triple_opt = runtime_state
2261 .registry_identity_anchor_for_spawn_attempt
2262 .take();
2263 if let Some((generation, attempt, restart_count)) = identity_anchor_triple_opt {
2264 if let Some(registry_runtime) = self.registry.child_mut(&child_id) {
2265 registry_runtime.generation = generation;
2266 registry_runtime.child_start_count = attempt;
2267 registry_runtime.restart_count = restart_count;
2268 }
2269 runtime_state.generation = Some(generation);
2271 runtime_state.attempt = Some(attempt);
2272 runtime_state.status = ChildAttemptStatus::Stopped;
2273 }
2274 runtime_state.generation_fence.phase = GenerationFencePhase::Open;
2275 runtime_state.last_control_failure = Some(ChildControlFailure::new(
2276 ChildControlFailurePhase::WaitCompletion,
2277 message,
2278 true,
2279 ));
2280 }
2281 }
2283 }
2284 }
2285
2286 fn reconcile_stop_deadlines(&mut self) {
2296 let now = self.time_base.now_unix_nanos();
2297 let mut pending_events = Vec::new();
2298 for runtime_state in self.slots.values_mut() {
2299 let fence_escalation = if let Some(pending_restart) =
2300 runtime_state.generation_fence.pending_restart.as_ref()
2301 {
2302 if pending_restart.abort_requested {
2303 None
2304 } else if runtime_state.generation_fence.phase
2305 == GenerationFencePhase::WaitingForOldStop
2306 && runtime_state.stop_state == ChildStopState::CancelDelivered
2307 && runtime_state.has_active_attempt()
2308 && now >= pending_restart.stop_deadline_at_unix_nanos
2309 {
2310 match (runtime_state.generation, runtime_state.attempt) {
2311 (Some(old_generation), Some(old_attempt)) => Some((
2312 pending_restart.command_id,
2313 pending_restart.target_generation,
2314 pending_restart.stop_deadline_at_unix_nanos,
2315 runtime_state.child_id.clone(),
2316 runtime_state.path.clone(),
2317 old_generation,
2318 old_attempt,
2319 )),
2320 _ => None,
2321 }
2322 } else {
2323 None
2324 }
2325 } else {
2326 None
2327 };
2328
2329 if let Some((
2330 command_id,
2331 target_generation,
2332 deadline_ns,
2333 fence_child_id,
2334 fence_path,
2335 old_generation,
2336 old_attempt,
2337 )) = fence_escalation
2338 {
2339 let delivered = runtime_state.abort();
2340 if delivered {
2341 if let Some(pending_mut) = &mut runtime_state.generation_fence.pending_restart {
2342 pending_mut.abort_requested = true;
2343 }
2344 runtime_state.generation_fence.phase = GenerationFencePhase::AbortingOld;
2345 pending_events.push(PendingRuntimeEvent {
2346 child_id: fence_child_id.clone(),
2347 path: fence_path,
2348 generation: Some(old_generation),
2349 attempt: Some(old_attempt),
2350 correlation_id: CorrelationId::from_uuid(command_id),
2351 what: What::ChildRestartFenceAbortRequested {
2352 child_id: fence_child_id,
2353 old_generation,
2354 old_attempt,
2355 target_generation,
2356 command_id: command_id.to_string(),
2357 deadline_unix_nanos: deadline_ns,
2358 },
2359 });
2360 }
2361 }
2362
2363 if runtime_state.generation_fence.pending_restart.is_some() {
2364 continue;
2365 }
2366
2367 if matches!(
2368 runtime_state.generation_fence.phase,
2369 GenerationFencePhase::WaitingForOldStop | GenerationFencePhase::AbortingOld
2370 ) {
2371 continue;
2372 }
2373
2374 if runtime_state.stop_state != ChildStopState::CancelDelivered {
2375 continue;
2376 }
2377 let Some(deadline) = runtime_state.stop_deadline_at_unix_nanos else {
2378 continue;
2379 };
2380 if deadline > now || !runtime_state.has_active_attempt() {
2381 continue;
2382 }
2383 let Some(generation) = runtime_state.generation else {
2384 continue;
2385 };
2386 let Some(attempt) = runtime_state.attempt else {
2387 continue;
2388 };
2389 let status = runtime_state.status;
2390 let failure = ChildControlFailure::new(
2391 ChildControlFailurePhase::WaitCompletion,
2392 "child did not complete before stop deadline",
2393 true,
2394 );
2395 runtime_state.status = status;
2396 runtime_state.stop_state = ChildStopState::Failed;
2397 runtime_state.last_control_failure = Some(failure.clone());
2398 pending_events.push(PendingRuntimeEvent {
2399 child_id: runtime_state.child_id.clone(),
2400 path: runtime_state.path.clone(),
2401 generation: Some(generation),
2402 attempt: Some(attempt),
2403 correlation_id: CorrelationId::new(),
2404 what: What::ChildControlStopFailed {
2405 child_id: runtime_state.child_id.clone(),
2406 generation,
2407 attempt,
2408 status,
2409 stop_state: ChildStopState::Failed,
2410 phase: failure.phase,
2411 reason: failure.reason,
2412 recoverable: failure.recoverable,
2413 },
2414 });
2415 }
2416 for event in pending_events {
2417 self.emit_pending_event(event);
2418 }
2419 }
2420
2421 fn emit_pending_event(&mut self, pending: PendingRuntimeEvent) {
2431 let now = Instant::now();
2432 let uptime = now
2433 .duration_since(self.time_base.base_instant)
2434 .as_millis()
2435 .min(u128::from(u64::MAX)) as u64;
2436 let monotonic_nanos = now.duration_since(self.time_base.base_instant).as_nanos();
2437 let child_name = self
2438 .registry
2439 .child(&pending.child_id)
2440 .map(|runtime| runtime.spec.name.clone())
2441 .unwrap_or_else(|| pending.child_id.to_string());
2442 let event = SupervisorEvent::new(
2443 When::new(EventTime::from_parts(
2444 monotonic_nanos,
2445 uptime,
2446 pending.generation.unwrap_or_else(Generation::initial),
2447 pending.attempt.unwrap_or_else(ChildStartCount::first),
2448 )),
2449 Where::new(pending.path).with_child(pending.child_id, child_name),
2450 pending.what,
2451 self.event_sequences.next(),
2452 pending.correlation_id,
2453 1,
2454 );
2455 if let Ok(mut observability) = self.observability.lock() {
2456 let _lagged = observability.emit(event);
2457 }
2458 }
2459
2460 fn spawn_child_start(&mut self, child_id: ChildId, is_restart: bool, delay: Duration) {
2472 if self.shutdown.phase() != ShutdownPhase::Idle {
2473 return;
2474 }
2475 if let Some(runtime_state) = self.slots.get(&child_id) {
2476 if runtime_state.generation_fence.pending_restart.is_some() {
2477 if is_restart {
2478 let path = runtime_state.path.clone();
2479 let generation = runtime_state.generation;
2480 let attempt = runtime_state.attempt;
2481 let pending_target = runtime_state
2482 .generation_fence
2483 .pending_restart
2484 .as_ref()
2485 .map(|pending| pending.target_generation);
2486 self.emit_pending_event(PendingRuntimeEvent {
2487 child_id: child_id.clone(),
2488 path,
2489 generation,
2490 attempt,
2491 correlation_id: CorrelationId::new(),
2492 what: What::ChildRestartConflict {
2493 child_id: child_id.clone(),
2494 current_generation: generation,
2495 current_attempt: attempt,
2496 target_generation: pending_target,
2497 command_id: "runtime-policy".to_owned(),
2498 decision: "rejected".to_owned(),
2499 reason: "automatic restart suppressed while pending manual restart holds the fence".to_owned(),
2500 },
2501 });
2502 }
2503 return;
2504 }
2505 if matches!(
2506 runtime_state.generation_fence.phase,
2507 GenerationFencePhase::WaitingForOldStop
2508 | GenerationFencePhase::AbortingOld
2509 | GenerationFencePhase::Closed
2510 | GenerationFencePhase::ReadyToStart
2511 ) {
2512 return;
2513 }
2514 }
2515 let Some(runtime) = self.prepare_child_start(&child_id, is_restart) else {
2516 return;
2517 };
2518 let sender = self.command_sender.clone();
2519 if !delay.is_zero() {
2520 tokio::spawn(async move {
2521 tokio::time::sleep(delay).await;
2522 let child_id_for_msg = runtime.id.clone();
2523 let path = runtime.path.clone();
2524 let generation = runtime.generation;
2525 let attempt = runtime.child_start_count;
2526 match ChildRunner::new().spawn_once(runtime) {
2527 Ok(handle) => {
2528 let _ignored = sender
2529 .send(RuntimeLoopMessage::ChildStart(
2530 ChildStartMessage::DelayedSpawnAttached {
2531 child_id: child_id_for_msg,
2532 path,
2533 generation,
2534 attempt,
2535 handle,
2536 },
2537 ))
2538 .await;
2539 }
2540 Err(error) => {
2541 tokio::spawn(async move {
2542 send_child_result(sender, child_id_for_msg, Err(error)).await;
2543 });
2544 }
2545 }
2546 });
2547 return;
2548 }
2549
2550 let child_id_cloned = runtime.id.clone();
2551 let path = runtime.path.clone();
2552 let generation = runtime.generation;
2553 let child_start_count = runtime.child_start_count;
2554 match ChildRunner::new().spawn_once(runtime) {
2555 Ok(handle) => {
2556 self.attach_spawned_child_handle(
2557 child_id_cloned,
2558 path,
2559 generation,
2560 child_start_count,
2561 handle,
2562 );
2563 }
2564 Err(error) => {
2565 tokio::spawn(async move {
2566 send_child_result(sender, child_id_cloned, Err(error)).await;
2567 });
2568 }
2569 }
2570 }
2571
2572 fn prepare_child_start(
2583 &mut self,
2584 child_id: &ChildId,
2585 bump_restart_counters: bool,
2586 ) -> Option<ChildRuntime> {
2587 let runtime = self.registry.child_mut(child_id)?;
2588 if bump_restart_counters {
2589 runtime.child_start_count = runtime.child_start_count.next();
2590 runtime.generation = runtime.generation.next();
2591 runtime.restart_count = runtime.restart_count.saturating_add(1);
2592 }
2593 runtime.status = ChildRuntimeStatus::Starting;
2594 if let Some(runtime_state) = self.slots.get_mut(child_id) {
2595 runtime_state.operation = ChildControlOperation::Active;
2596 }
2597 Some(runtime.clone())
2598 }
2599
2600 pub(crate) async fn handle_shutdown_tree(
2615 &mut self,
2616 requested_by: String,
2617 reason: String,
2618 event_sender: &broadcast::Sender<String>,
2619 ) -> Result<ShutdownResult, SupervisorError> {
2620 let policy = self.shutdown.policy;
2621 let reason_copy = reason.clone();
2622 let cause = ShutdownCause::new(requested_by, reason);
2623 let _started = self.shutdown.request_stop(cause);
2624 let _ignored = event_sender.send(format!(
2625 "shutdown_phase_changed:{:?}:{:?}",
2626 ShutdownPhase::Idle,
2627 self.shutdown.phase()
2628 ));
2629 self.advance_shutdown_phase(event_sender);
2630 self.advance_shutdown_phase(event_sender);
2631
2632 let outcomes =
2633 shutdown_tree_fanout(&mut self.slots, &policy, &mut self.admission_set).await;
2634 let reconcile = reconcile_shutdown_slots(&self.slots);
2635
2636 if !reconcile.verified_clean {
2638 let _ignored = event_sender.send(format!(
2639 "shutdown_reconcile_warning: orphan_slots={:?}",
2640 reconcile.orphan_slots
2641 ));
2642 }
2643
2644 self.advance_shutdown_phase(event_sender);
2645 self.advance_shutdown_phase(event_sender);
2646 let _completed = self.shutdown.complete();
2647
2648 let report = ShutdownPipelineReport {
2649 cause: ShutdownCause::new("slot-shutdown", reason_copy),
2650 started_at_unix_nanos: unix_epoch_nanos(),
2651 completed_at_unix_nanos: unix_epoch_nanos(),
2652 phase: ShutdownPhase::Completed,
2653 outcomes,
2654 reconcile: ShutdownReconcileReport::core_runtime_completed(),
2655 idempotent: false,
2656 };
2657 self.shutdown_pipeline.cache_report(report.clone());
2658 let _ignored = event_sender.send(format!("shutdown_completed:{}", report.outcomes.len()));
2659 Ok(self.shutdown.result_with_report(report, false))
2660 }
2661
2662 pub(crate) fn handle_command_on_slot(
2673 &mut self,
2674 child_id: &ChildId,
2675 operation: ChildControlOperation,
2676 ) -> bool {
2677 let Some(slot) = self.slots.get_mut(child_id) else {
2678 return false;
2679 };
2680 slot.operation = operation;
2681 if matches!(
2682 operation,
2683 ChildControlOperation::Quarantined | ChildControlOperation::Removed
2684 ) && slot.has_active_attempt()
2685 {
2686 slot.cancel();
2687 }
2688 true
2689 }
2690
2691 pub(crate) fn process_child_exit_on_slot(
2703 &mut self,
2704 child_id: &ChildId,
2705 report: &ChildRunReport,
2706 ) -> Option<ChildExitSummary> {
2707 let slot = self.slots.get_mut(child_id)?;
2708 let now_nanos = self.time_base.now_unix_nanos();
2709 let summary = ChildExitSummary::from_report(report, now_nanos);
2710 slot.deactivate(summary.clone());
2711 self.admission_set.release(child_id);
2712 Some(summary)
2713 }
2714
2715 pub(crate) fn observe_slot_liveness(
2726 &mut self,
2727 event_sender: &broadcast::Sender<String>,
2728 ) -> usize {
2729 let mut stale_count = 0usize;
2730 let threshold_nanos = Duration::from_secs(DEFAULT_HEARTBEAT_TIMEOUT_SECS).as_nanos();
2731 let now_nanos = self.time_base.now_unix_nanos();
2732
2733 for (child_id, slot) in self.slots.iter_mut() {
2734 if !slot.has_active_attempt() {
2735 continue;
2736 }
2737 if let Some(last_hb) = slot.last_heartbeat_at
2738 && now_nanos.saturating_sub(last_hb) >= threshold_nanos
2739 {
2740 stale_count += 1;
2741 let _ignored = event_sender.send(format!(
2742 "child_liveness_stale: child_id={} last_heartbeat_at={}",
2743 child_id, last_hb
2744 ));
2745 }
2746 }
2747 stale_count
2748 }
2749
2750 pub(crate) fn check_slot_restart_eligibility(
2765 &self,
2766 child_id: &ChildId,
2767 request_generation: Generation,
2768 request_attempt: ChildStartCount,
2769 ) -> Result<(), AdmissionConflict> {
2770 let Some(slot) = self.slots.get(child_id) else {
2771 return Ok(());
2772 };
2773 if slot.pending_restart {
2774 return Err(AdmissionConflict::new(
2775 child_id.clone(),
2776 slot.generation.unwrap_or(Generation::initial()),
2777 slot.attempt.unwrap_or(ChildStartCount::first()),
2778 "restart rejected: pending restart already exists",
2779 ));
2780 }
2781 if let (Some(active_gen), Some(active_att)) = (slot.generation, slot.attempt)
2782 && (request_generation != active_gen || request_attempt != active_att)
2783 {
2784 return Err(AdmissionConflict::new(
2785 child_id.clone(),
2786 active_gen,
2787 active_att,
2788 "restart conflicts with active attempt",
2789 ));
2790 }
2791 Ok(())
2792 }
2793
2794 pub(crate) fn ensure_slot_exists(&mut self, child_id: ChildId, path: SupervisorPath) -> bool {
2806 if self.slots.contains_key(&child_id) {
2807 return false;
2808 }
2809 let slot = ChildSlot::new(
2810 child_id.clone(),
2811 path,
2812 Duration::from_secs(60), );
2814 self.slots.insert(child_id, slot);
2815 true
2816 }
2817}
2818
2819fn build_child_control_outcome(
2835 operation_before: ChildControlOperation,
2836 cancel_delivered: bool,
2837 idempotent: bool,
2838 failure: Option<ChildControlFailure>,
2839 runtime_state: &mut ChildSlot,
2840 time_base: &RuntimeTimeBase,
2841 generation_fence: Option<GenerationFenceOutcome>,
2842) -> ChildControlResult {
2843 let liveness = runtime_state.observe_liveness(time_base.now_unix_nanos());
2844 let status = if runtime_state.attempt.is_some() {
2846 Some(runtime_state.status)
2847 } else {
2848 None
2849 };
2850 ChildControlResult::new(
2851 runtime_state.child_id.clone(),
2852 runtime_state.attempt,
2853 runtime_state.generation,
2854 operation_before,
2855 runtime_state.operation,
2856 status,
2857 cancel_delivered,
2858 runtime_state.stop_state,
2859 runtime_state.restart_limit.clone(),
2860 liveness,
2861 idempotent,
2862 failure,
2863 generation_fence,
2864 )
2865}
2866
/// Summary produced by `apply_stop_control_to_runtime_state` for one child.
#[derive(Debug, Clone, Copy)]
struct StopControlApplication {
    /// The slot's control operation before this command was applied.
    operation_before: ChildControlOperation,
    /// Whether this command delivered a cancellation to an active attempt.
    cancel_delivered: bool,
    /// Whether the command only repeated an already-applied state.
    idempotent: bool,
    /// Set for `Removed` when no attempt was active; presumably tells the
    /// caller it may drop the slot after building the outcome. TODO confirm.
    remove_after_outcome: bool,
}
2879
/// Applies a stop-style control operation (`target_operation`) to one child's
/// runtime state.
///
/// Steps, in order:
/// 1. If the slot's operation differs from `target_operation`, switch it.
///    Queue a `ChildControlOperationChanged` event and publish a matching
///    string event on `event_sender`.
/// 2. If an attempt is active and no cancel has been delivered yet for this
///    same target, call `cancel()`. When it is delivered:
///    - set the stop state to `CancelDelivered`;
///    - arm `stop_deadline_at_unix_nanos`;
///    - queue a `ChildControlCancelDelivered` event if generation and attempt
///      are both known.
/// 3. With no active attempt, set the stop state to `NoActiveAttempt`.
///
/// The command counts as idempotent in two cases:
/// - An attempt is active and a cancel was already delivered for the same
///   target.
/// - No attempt is active and the operation already equals
///   `target_operation`, unless the target is `Removed`.
///
/// Structured events are only pushed onto `pending_events`; presumably the
/// caller emits them afterwards.
// Each argument is separate per-command context, hence the lint allowance.
#[allow(clippy::too_many_arguments)]
fn apply_stop_control_to_runtime_state(
    runtime_state: &mut ChildSlot,
    target_operation: ChildControlOperation,
    command_name: &'static str,
    command_id: &str,
    correlation_id: CorrelationId,
    stop_deadline_at_unix_nanos: u128,
    event_sender: &broadcast::Sender<String>,
    pending_events: &mut Vec<PendingRuntimeEvent>,
) -> StopControlApplication {
    let child_id = runtime_state.child_id.clone();
    let operation_before = runtime_state.operation;
    let had_active_attempt = runtime_state.has_active_attempt();
    // A repeated command is a no-op only if a cancel already went out for
    // this same target on the current attempt.
    let already_cancelled_for_target = had_active_attempt
        && operation_before == target_operation
        && runtime_state.attempt_cancel_delivered;
    let idempotent = if had_active_attempt {
        already_cancelled_for_target
    } else {
        operation_before == target_operation && target_operation != ChildControlOperation::Removed
    };

    if operation_before != target_operation {
        runtime_state.operation = target_operation;
        pending_events.push(PendingRuntimeEvent {
            child_id: child_id.clone(),
            path: runtime_state.path.clone(),
            generation: runtime_state.generation,
            attempt: runtime_state.attempt,
            correlation_id,
            what: What::ChildControlOperationChanged {
                child_id: child_id.clone(),
                from: operation_before,
                to: target_operation,
                command: command_name.to_owned(),
                command_id: command_id.to_owned(),
            },
        });
        let _ignored = event_sender.send(format!(
            "child_control_operation_changed:{child_id}:{operation_before:?}:{target_operation:?}"
        ));
    }

    let cancel_delivered = if had_active_attempt && !already_cancelled_for_target {
        let delivered = runtime_state.cancel();
        if delivered {
            // Arm the stop deadline; a later deadline check can then mark the
            // stop as failed if the attempt does not finish in time.
            runtime_state.stop_state = ChildStopState::CancelDelivered;
            runtime_state.stop_deadline_at_unix_nanos = Some(stop_deadline_at_unix_nanos);
            if let (Some(generation), Some(attempt)) =
                (runtime_state.generation, runtime_state.attempt)
            {
                pending_events.push(PendingRuntimeEvent {
                    child_id: child_id.clone(),
                    path: runtime_state.path.clone(),
                    generation: Some(generation),
                    attempt: Some(attempt),
                    correlation_id,
                    what: What::ChildControlCancelDelivered {
                        child_id: child_id.clone(),
                        generation,
                        attempt,
                        command: command_name.to_owned(),
                        command_id: command_id.to_owned(),
                    },
                });
            }
            let _ignored = event_sender.send(format!(
                "child_control_cancel_delivered:{child_id}:{command_name}"
            ));
        }
        delivered
    } else {
        if !had_active_attempt {
            runtime_state.stop_state = ChildStopState::NoActiveAttempt;
        }
        false
    };

    StopControlApplication {
        operation_before,
        cancel_delivered,
        idempotent,
        // Only a slot without a running attempt is flagged for immediate removal.
        remove_after_outcome: target_operation == ChildControlOperation::Removed
            && !had_active_attempt,
    }
}
2983
2984fn heartbeat_stale_event(
2995 runtime_state: &mut ChildSlot,
2996 liveness: &ChildLivenessState,
2997) -> Option<PendingRuntimeEvent> {
2998 let Some(attempt) = runtime_state.attempt else {
2999 runtime_state.stale_event_attempt = None;
3000 return None;
3001 };
3002 if !liveness.heartbeat_stale {
3003 runtime_state.stale_event_attempt = None;
3004 return None;
3005 }
3006 if runtime_state.stale_event_attempt == Some(attempt) {
3007 return None;
3008 }
3009 let since_unix_nanos = liveness.last_heartbeat_at_unix_nanos?;
3010 runtime_state.stale_event_attempt = Some(attempt);
3011 Some(PendingRuntimeEvent {
3012 child_id: runtime_state.child_id.clone(),
3013 path: runtime_state.path.clone(),
3014 generation: runtime_state.generation,
3015 attempt: Some(attempt),
3016 correlation_id: CorrelationId::new(),
3017 what: What::ChildHeartbeatStale {
3018 child_id: runtime_state.child_id.clone(),
3019 attempt,
3020 since_unix_nanos,
3021 },
3022 })
3023}
3024
3025pub async fn run_control_loop(
3037 mut state: RuntimeControlState,
3038 mut receiver: mpsc::Receiver<RuntimeLoopMessage>,
3039 event_sender: broadcast::Sender<String>,
3040) -> RuntimeExitReport {
3041 state.start_declared_children();
3042 while let Some(message) = receiver.recv().await {
3043 match message {
3044 RuntimeLoopMessage::Control {
3045 command,
3046 reply_sender,
3047 } => {
3048 let command_name = command_name(&command);
3049 let result = state.execute_control(command, &event_sender).await;
3050 let _ignored = event_sender.send(format!("control_command:{command_name}"));
3051 let _ignored = reply_sender.send(result);
3052 }
3053 RuntimeLoopMessage::ChildStart(ChildStartMessage::Exited { report }) => {
3054 state.handle_child_exit(*report, &event_sender);
3055 }
3056 RuntimeLoopMessage::ChildStart(ChildStartMessage::StartFailed {
3057 child_id,
3058 message,
3059 }) => {
3060 state.handle_child_start_failed(child_id, message, &event_sender);
3061 }
3062 RuntimeLoopMessage::ChildStart(ChildStartMessage::DelayedSpawnAttached {
3063 child_id,
3064 path,
3065 generation,
3066 attempt,
3067 handle,
3068 }) => {
3069 state.attach_spawned_child_handle(child_id, path, generation, attempt, handle);
3070 }
3071 RuntimeLoopMessage::ControlPlane(ControlPlaneMessage::ReplayChildExitForTest {
3072 report,
3073 }) => {
3074 state.handle_child_exit(*report, &event_sender);
3075 }
3076 RuntimeLoopMessage::ControlPlane(ControlPlaneMessage::Shutdown {
3077 meta,
3078 reply_sender,
3079 }) => {
3080 let _ignored = event_sender.send(format!(
3081 "runtime_control_loop_shutdown_requested:{}:{}",
3082 meta.requested_by, meta.reason
3083 ));
3084 match meta.validate() {
3085 Ok(()) => {
3086 let report = RuntimeExitReport::completed(
3087 "shutdown",
3088 format!("runtime control plane shutdown requested: {}", meta.reason),
3089 );
3090 let _ignored = reply_sender.send(Ok(report.clone()));
3091 return report;
3092 }
3093 Err(error) => {
3094 let _ignored = reply_sender.send(Err(error));
3095 continue;
3096 }
3097 }
3098 }
3099 }
3100 }
3101 RuntimeExitReport::completed("message_loop", "runtime command channel closed")
3102}
3103
3104fn command_name(command: &ControlCommand) -> &'static str {
3114 match command {
3115 ControlCommand::AddChild { .. } => "add_child",
3116 ControlCommand::RemoveChild { .. } => "remove_child",
3117 ControlCommand::RestartChild { .. } => "restart_child",
3118 ControlCommand::PauseChild { .. } => "pause_child",
3119 ControlCommand::ResumeChild { .. } => "resume_child",
3120 ControlCommand::QuarantineChild { .. } => "quarantine_child",
3121 ControlCommand::ShutdownTree { .. } => "shutdown_tree",
3122 ControlCommand::CurrentState { .. } => "current_state",
3123 }
3124}
3125
3126async fn send_child_result(
3138 sender: mpsc::Sender<RuntimeLoopMessage>,
3139 child_id: ChildId,
3140 result: Result<ChildRunReport, SupervisorError>,
3141) {
3142 let message = match result {
3143 Ok(report) => RuntimeLoopMessage::ChildStart(ChildStartMessage::Exited {
3144 report: Box::new(report),
3145 }),
3146 Err(error) => RuntimeLoopMessage::ChildStart(ChildStartMessage::StartFailed {
3147 child_id,
3148 message: error.to_string(),
3149 }),
3150 };
3151 let _ignored = sender.send(message).await;
3152}
3153
3154fn restart_policy(policy: ChildRestartPolicy) -> RestartPolicy {
3164 match policy {
3165 ChildRestartPolicy::Permanent => RestartPolicy::Permanent,
3166 ChildRestartPolicy::Transient => RestartPolicy::Transient,
3167 ChildRestartPolicy::Temporary => RestartPolicy::Temporary,
3168 }
3169}
3170
3171fn backoff_policy(policy: crate::spec::child::BackoffPolicy) -> BackoffPolicy {
3185 let jitter_percent = (policy.jitter_ratio * 100.0).round().clamp(0.0, 100.0) as u8;
3186 BackoffPolicy::new(
3187 policy.initial_delay,
3188 policy.max_delay,
3189 jitter_percent,
3190 policy.max_delay,
3191 )
3192 .with_full_jitter(42) }
3194
3195fn policy_task_exit(exit: &TaskExit) -> PolicyTaskExit {
3205 match exit.failure_kind() {
3206 Some(kind) => PolicyTaskExit::Failed { kind: kind.into() },
3207 None => PolicyTaskExit::Succeeded,
3208 }
3209}
3210
3211fn classify_exit_for_pipeline(exit: &TaskExit, manual_stop_requested: bool) -> ExitClassification {
3224 match exit {
3225 TaskExit::Succeeded => ExitClassification::Success,
3226 TaskExit::Cancelled if manual_stop_requested => ExitClassification::ManualStop,
3227 TaskExit::Cancelled => ExitClassification::ExternalCancel,
3228 TaskExit::Failed(failure) => {
3229 match failure.kind {
3231 crate::error::types::TaskFailureKind::Cancelled if manual_stop_requested => {
3232 ExitClassification::ManualStop
3233 }
3234 crate::error::types::TaskFailureKind::Cancelled => {
3235 ExitClassification::ExternalCancel
3236 }
3237 crate::error::types::TaskFailureKind::Timeout => ExitClassification::Timeout,
3238 _ => ExitClassification::NonZeroExit { exit_code: -1 },
3239 }
3240 }
3241 TaskExit::Panicked(_) => ExitClassification::Crash {
3242 reason: "panic".to_string(),
3243 },
3244 TaskExit::TimedOut => ExitClassification::Timeout,
3245 }
3246}
3247
3248fn role_policy_restarts_success(pipeline_result: &PipelineContext) -> bool {
3258 pipeline_result.exit_classification == Some(ExitClassification::Success)
3259 && pipeline_result
3260 .effective_policy
3261 .as_ref()
3262 .is_some_and(|policy| policy.policy_pack.on_success_exit == OnSuccessAction::Restart)
3263}
3264
3265fn prepare_effective_policy(child_spec: &ChildSpec) -> EffectivePolicy {
3275 EffectivePolicy::for_child(child_spec)
3276}
3277
3278fn restart_limit_counts_exit(exit: &TaskExit) -> bool {
3288 matches!(
3289 exit,
3290 TaskExit::Failed(_) | TaskExit::Panicked(_) | TaskExit::TimedOut
3291 )
3292}
3293
3294fn restart_limit_for_child_in_spec(
3306 tree: &SupervisorTree,
3307 spec: &SupervisorSpec,
3308 child_id: &ChildId,
3309) -> RestartLimit {
3310 restart_execution_plan(tree, spec, child_id)
3311 .restart_limit
3312 .unwrap_or_else(default_restart_limit)
3313}
3314
3315fn default_restart_limit() -> RestartLimit {
3325 RestartLimit::new(u32::MAX, Duration::from_secs(60))
3326}
3327
3328fn restart_child_unknown_outcome(child_id: ChildId) -> ChildControlResult {
3338 let conflict = ChildControlFailure::new(
3339 ChildControlFailurePhase::WaitCompletion,
3340 "unknown child",
3341 false,
3342 );
3343 let fence = GenerationFenceOutcome::new(
3344 GenerationFenceDecision::Rejected,
3345 None,
3346 None,
3347 None,
3348 false,
3349 false,
3350 Some(conflict.clone()),
3351 );
3352 ChildControlResult::new(
3353 child_id,
3354 None,
3355 None,
3356 ChildControlOperation::Active,
3357 ChildControlOperation::Active,
3358 None,
3359 false,
3360 ChildStopState::NoActiveAttempt,
3361 RestartLimitState::default(),
3362 ChildLivenessState::new(
3363 None,
3364 false,
3365 crate::readiness::signal::ReadinessState::Unreported,
3366 ),
3367 false,
3368 Some(conflict),
3369 Some(fence),
3370 )
3371}
3372
3373fn child_control_result_label(outcome: &ChildControlResult) -> &'static str {
3383 if outcome.failure.is_some() || outcome.stop_state == ChildStopState::Failed {
3384 "failed"
3385 } else if outcome.idempotent {
3386 "idempotent"
3387 } else {
3388 "accepted"
3389 }
3390}
3391
3392fn child_scope_label(scope: &[ChildId]) -> String {
3402 scope
3403 .iter()
3404 .map(|child_id| child_id.value.clone())
3405 .collect::<Vec<_>>()
3406 .join(",")
3407}
3408
3409fn outcome_from_report(
3432 runtime_state: &ChildSlot,
3433 report: &ChildRunReport,
3434 status: ChildShutdownStatus,
3435 phase: ShutdownPhase,
3436 reason: impl Into<String>,
3437) -> ChildShutdownOutcome {
3438 ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
3439 child_id: runtime_state.child_id.clone(),
3440 path: runtime_state.path.clone(),
3441 generation: runtime_state.generation.unwrap_or_else(Generation::initial),
3442 child_start_count: runtime_state.attempt.unwrap_or_else(ChildStartCount::first),
3443 status,
3444 cancel_delivered: runtime_state.attempt_cancel_delivered,
3445 exit: Some(report.exit.clone()),
3446 phase,
3447 reason: reason.into(),
3448 })
3449}
3450
3451fn removed_runtime_state_shutdown_outcome(
3462 runtime_state: &ChildSlot,
3463 phase: ShutdownPhase,
3464) -> ChildShutdownOutcome {
3465 ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
3466 child_id: runtime_state.child_id.clone(),
3467 path: runtime_state.path.clone(),
3468 generation: runtime_state.generation.unwrap_or_else(Generation::initial),
3469 child_start_count: runtime_state.attempt.unwrap_or_else(ChildStartCount::first),
3470 status: ChildShutdownStatus::AlreadyExited,
3471 cancel_delivered: false,
3472 exit: None,
3473 phase,
3474 reason: "child runtime state was already removed before shutdown".to_owned(),
3475 })
3476}
3477
3478fn outcome_from_error(
3491 runtime_state: &ChildSlot,
3492 status: ChildShutdownStatus,
3493 phase: ShutdownPhase,
3494 error: SupervisorError,
3495) -> ChildShutdownOutcome {
3496 ChildShutdownOutcome::new(ChildShutdownOutcomeInput {
3497 child_id: runtime_state.child_id.clone(),
3498 path: runtime_state.path.clone(),
3499 generation: runtime_state.generation.unwrap_or_else(Generation::initial),
3500 child_start_count: runtime_state.attempt.unwrap_or_else(ChildStartCount::first),
3501 status,
3502 cancel_delivered: runtime_state.attempt_cancel_delivered,
3503 exit: None,
3504 phase,
3505 reason: error.to_string(),
3506 })
3507}
3508
/// Time left until `deadline`.
///
/// Returns `Some(Duration::ZERO)` exactly at the deadline and `None` once it
/// has passed.
fn remaining_duration(deadline: Instant) -> Option<Duration> {
    let now = Instant::now();
    if deadline >= now {
        Some(deadline - now)
    } else {
        None
    }
}
3521
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn unix_epoch_nanos() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    }
}