use crate::id::types::{ChildId, ChildStartCount, Generation};
use crate::runtime::admission::AdmissionSet;
use crate::runtime::child_slot::{ChildExitSummary, ChildSlot};
use crate::shutdown::report::{
ChildShutdownOutcome, ChildShutdownOutcomeInput, ChildShutdownStatus,
};
use crate::shutdown::stage::{ShutdownPhase, ShutdownPolicy};
use std::collections::HashMap;
use std::time::Duration;
use tokio::time::{Instant, timeout};
/// Shuts down every child slot in escalating phases and returns one outcome
/// per slot.
///
/// 1. **Cancel** – `cancel()` is called on every slot that has an active
///    attempt, then the slots are drained one after another until
///    `policy.graceful_timeout` has elapsed since the call started.
/// 2. **Abort** – when `policy.abort_after_timeout` is set, `abort()` is
///    called on every slot that is still active and has no abort requested
///    yet. The remaining slots are then drained until a further
///    `policy.abort_wait` has elapsed (this second drain runs whether or not
///    an abort was requested).
/// 3. **Force-clear** – every slot that is still active is deactivated with a
///    synthetic exit summary.
///
/// `admission.release` is called at most once per child: as soon as its drain
/// reports completion, or during the force-clear sweep otherwise.
///
/// The returned vector follows the iteration order of `slots`, which for a
/// `HashMap` is unspecified.
pub async fn shutdown_tree_fanout(
    slots: &mut HashMap<ChildId, ChildSlot>,
    policy: &ShutdownPolicy,
    admission: &mut AdmissionSet,
) -> Vec<ChildShutdownOutcome> {
    // Both deadlines derive from a single clock reading so the overall budget
    // is exactly `graceful_timeout + abort_wait`.
    let started = Instant::now();
    let graceful_deadline = started + policy.graceful_timeout;
    let abort_deadline = graceful_deadline + policy.abort_wait;

    // Phase 1a: request cooperative cancellation everywhere before waiting on
    // anything, so all children wind down concurrently.
    for slot in slots.values_mut() {
        if slot.has_active_attempt() {
            slot.cancel();
        }
    }

    // Snapshot the keys: the drains below need `&mut` access to the map.
    let child_ids: Vec<ChildId> = slots.keys().cloned().collect();
    // `released[i]` records whether `child_ids[i]` has already been handed
    // back to the admission set, so no child is released twice.
    let mut released = vec![false; child_ids.len()];

    // Phase 1b: graceful drain against the shared graceful deadline.
    for (child_id, was_released) in child_ids.iter().zip(released.iter_mut()) {
        let remaining = remaining_duration(graceful_deadline);
        if drain_one_slot(slots, child_id, remaining).await {
            admission.release(child_id);
            *was_released = true;
        }
    }

    // Phase 2a: escalate to abort for whatever is still running.
    if policy.abort_after_timeout {
        for slot in slots.values_mut() {
            if slot.has_active_attempt() && !slot.abort_requested {
                slot.abort();
            }
        }
    }

    // Phase 2b: drain the children that did not complete in phase 1.
    for (child_id, was_released) in child_ids.iter().zip(released.iter_mut()) {
        if *was_released {
            continue;
        }
        let remaining = remaining_duration(abort_deadline);
        if drain_one_slot(slots, child_id, remaining).await {
            admission.release(child_id);
            *was_released = true;
        }
    }

    // Phase 3: force-clear anything that outlived both deadlines.
    for (child_id, was_released) in child_ids.iter().zip(released.iter_mut()) {
        let Some(slot) = slots.get_mut(child_id) else {
            continue;
        };
        if !slot.has_active_attempt() {
            continue;
        }
        slot.deactivate(ChildExitSummary {
            exit_code: None,
            exit_reason: "shutdown deadline reached; force-cleared".to_owned(),
            // NOTE(review): timestamp is a hard-coded 0 rather than a wall-clock
            // reading — confirm whether this placeholder is intentional.
            exited_at_unix_nanos: 0,
        });
        if !*was_released {
            admission.release(child_id);
            *was_released = true;
        }
    }

    // NOTE(review): if `deactivate` clears the active attempt, no slot is
    // active at this point and `build_slot_outcome` can never report
    // `AbortFailed` for a force-cleared child — confirm that is intended.
    slots
        .iter()
        .map(|(child_id, slot)| build_slot_outcome(child_id, slot))
        .collect()
}
/// Builds the shutdown outcome record for a single slot from its current
/// state.
///
/// The status is picked by precedence: a slot that still has an active attempt
/// is `AbortFailed`; otherwise one with an abort requested is `Aborted`;
/// otherwise one whose cancel was delivered is `Graceful`; anything else is
/// `AlreadyExited`. The reason text comes from the slot's last recorded exit,
/// falling back to a fixed message when there is none.
fn build_slot_outcome(child_id: &ChildId, slot: &ChildSlot) -> ChildShutdownOutcome {
    let cancel_delivered = slot.attempt_cancel_delivered;

    let status = match (
        slot.has_active_attempt(),
        slot.abort_requested,
        cancel_delivered,
    ) {
        (true, _, _) => ChildShutdownStatus::AbortFailed,
        (false, true, _) => ChildShutdownStatus::Aborted,
        (false, false, true) => ChildShutdownStatus::Graceful,
        (false, false, false) => ChildShutdownStatus::AlreadyExited,
    };

    let reason = match slot.last_exit.as_ref() {
        Some(last) => last.exit_reason.clone(),
        None => "no active attempt".to_owned(),
    };

    // Slots that never recorded a generation / start count report the initial
    // values instead.
    let generation = slot.generation.unwrap_or(Generation::initial());
    let child_start_count = slot.attempt.unwrap_or(ChildStartCount::first());

    let input = ChildShutdownOutcomeInput {
        child_id: child_id.clone(),
        path: slot.path.clone(),
        generation,
        child_start_count,
        status,
        cancel_delivered,
        exit: None,
        phase: ShutdownPhase::Completed,
        reason,
    };
    ChildShutdownOutcome::new(input)
}
/// Result of `reconcile_shutdown_slots`: which slots still hold shutdown
/// state and whether the slot map is clean.
#[derive(Debug, Clone)]
pub struct SlotReconcileResult {
/// Ids of slots that still have an active attempt, a cancellation token, or
/// a completion receiver.
pub orphan_slots: Vec<ChildId>,
/// Number of slots in the map that was examined.
pub total_slots_checked: usize,
/// `true` exactly when `orphan_slots` is empty.
pub verified_clean: bool,
}
/// Scans every slot and reports the ones that still hold shutdown-relevant
/// state.
///
/// A slot counts as an orphan when it has an active attempt, or still holds a
/// cancellation token, or still holds a completion receiver. The result is
/// `verified_clean` only when no orphan was found.
pub fn reconcile_shutdown_slots(slots: &HashMap<ChildId, ChildSlot>) -> SlotReconcileResult {
    let is_orphan = |slot: &ChildSlot| {
        slot.has_active_attempt()
            || slot.cancellation_token.is_some()
            || slot.completion_receiver.is_some()
    };

    let orphan_slots: Vec<ChildId> = slots
        .iter()
        .filter(|(_, slot)| is_orphan(slot))
        .map(|(id, _)| id.clone())
        .collect();

    SlotReconcileResult {
        verified_clean: orphan_slots.is_empty(),
        total_slots_checked: slots.len(),
        orphan_slots,
    }
}
/// Returns the time left until `deadline`, or `None` once the deadline has
/// been reached or passed.
fn remaining_duration(deadline: Instant) -> Option<Duration> {
    let current = Instant::now();
    // Strictly-before check: a deadline equal to "now" already counts as expired.
    (current < deadline).then(|| deadline - current)
}
/// Waits for the active attempt of `child_id` to report completion, bounded
/// by `remaining`, and deactivates the slot when it does.
///
/// Returns:
/// * `false` when `child_id` is not in `slots`, or when the wait did not
///   finish within the budget (the completion receiver stays in the slot so a
///   later call can wait again);
/// * `true` when the slot has no active attempt, when it has no completion
///   receiver to wait on, or when the wait finished — either with a report or
///   with a receiver error — and the slot was deactivated.
///
/// A `remaining` of `None` means the deadline has already passed; the wait is
/// still polled once with a zero-length timeout so that a child which has
/// already finished is recorded with its real report instead of being treated
/// as timed out.
///
/// The slot is borrowed in place rather than moved out of the map, so dropping
/// this future mid-wait leaves `slots` and the slot's receiver intact.
async fn drain_one_slot(
    slots: &mut HashMap<ChildId, ChildSlot>,
    child_id: &ChildId,
    remaining: Option<Duration>,
) -> bool {
    let Some(slot) = slots.get_mut(child_id) else {
        return false;
    };
    if !slot.has_active_attempt() {
        return true;
    }
    // NOTE(review): an active slot without a receiver is reported as complete
    // without being deactivated — confirm callers expect that.
    let Some(receiver) = slot.completion_receiver.as_mut() else {
        return true;
    };

    // An expired deadline still gets one poll via the zero-length timeout.
    let budget = remaining.unwrap_or(Duration::ZERO);
    let awaited = timeout(
        budget,
        crate::child_runner::runner::wait_for_report(receiver),
    )
    .await
    .ok();

    let summary = match awaited {
        // NOTE(review): the exit timestamp is a hard-coded 0 in both arms —
        // confirm whether this placeholder is intentional.
        Some(Ok(report)) => ChildExitSummary::from_report(&report, 0u128),
        Some(Err(_)) => ChildExitSummary {
            exit_code: None,
            exit_reason: "completion receiver error".to_owned(),
            exited_at_unix_nanos: 0,
        },
        // Timed out: leave the receiver in place for a later drain.
        None => return false,
    };

    // The attempt is finished; the receiver has served its purpose.
    slot.completion_receiver = None;
    slot.deactivate(summary);
    true
}