use crate::child_runner::runner::{ChildRunHandle, ChildRunReport, wait_for_report};
use crate::control::outcome::{
ChildAttemptStatus, ChildControlFailure, ChildControlOperation, ChildLivenessState,
ChildRuntimeRecord, ChildStopState, GenerationFenceState, RestartLimitState,
};
use crate::error::types::SupervisorError;
use crate::id::types::{ChildId, ChildStartCount, Generation, SupervisorPath};
use crate::readiness::signal::ReadinessState;
use serde::Serialize;
use std::collections::VecDeque;
use std::fmt::{Display, Formatter};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::watch;
use tokio::task::AbortHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
/// Heartbeat age, in seconds, at or beyond which `ChildSlot::observe_liveness` reports the
/// child's heartbeat as stale.
pub const DEFAULT_HEARTBEAT_TIMEOUT_SECS: u64 = 5;
/// Sliding-window history of failure timestamps, used to count how many failures
/// happened within a restart window.
///
/// Timestamps are Unix-epoch nanoseconds kept oldest-first. Pruning only inspects the
/// front of the queue, so it relies on callers passing non-decreasing `now` values.
#[derive(Debug, Clone, Default)]
pub struct RestartLimitTracker {
    failure_timestamps: VecDeque<u128>,
}

impl RestartLimitTracker {
    /// Creates a tracker with an empty failure history.
    pub fn new() -> Self {
        Self::default()
    }

    /// Drops failures older than `window` (relative to `now_unix_nanos`). If
    /// `count_failure` is set, it then records a failure at `now_unix_nanos`.
    ///
    /// Returns the number of failures remaining in the window, saturated to `u32::MAX`.
    /// A failure whose age is exactly `window` is still counted.
    pub fn refresh(&mut self, now_unix_nanos: u128, window: Duration, count_failure: bool) -> u32 {
        self.prune(now_unix_nanos, window);
        if count_failure {
            self.failure_timestamps.push_back(now_unix_nanos);
        }
        u32::try_from(self.failure_timestamps.len()).unwrap_or(u32::MAX)
    }

    /// Pops entries from the front while they are strictly older than `window`.
    fn prune(&mut self, now_unix_nanos: u128, window: Duration) {
        let horizon_nanos = window.as_nanos();
        while let Some(&oldest) = self.failure_timestamps.front() {
            let age_nanos = now_unix_nanos.saturating_sub(oldest);
            if age_nanos <= horizon_nanos {
                // Everything behind `oldest` is at least as recent, so stop here.
                break;
            }
            self.failure_timestamps.pop_front();
        }
    }
}
/// Pairs a monotonic `Instant` with the wall-clock time (Unix-epoch nanoseconds) sampled
/// at construction, so that monotonic instants can be reported as Unix timestamps.
#[derive(Debug, Clone, Copy)]
pub struct RuntimeTimeBase {
    /// Monotonic reference point.
    pub base_instant: Instant,
    /// Wall-clock Unix-epoch nanoseconds associated with `base_instant`.
    /// This is 0 if the system clock reads earlier than the epoch.
    pub base_unix_nanos: u128,
}

impl RuntimeTimeBase {
    /// Samples the monotonic clock, then the system clock, to build a new time base.
    pub fn new() -> Self {
        let base_instant = Instant::now();
        let base_unix_nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_nanos(),
            Err(_) => 0,
        };
        Self {
            base_instant,
            base_unix_nanos,
        }
    }

    /// Returns the current time as Unix-epoch nanoseconds, derived from the monotonic clock.
    pub fn now_unix_nanos(&self) -> u128 {
        let current = Instant::now();
        self.instant_to_unix_nanos(current)
    }

    /// Converts `instant` to Unix-epoch nanoseconds relative to this base.
    ///
    /// Instants before `base_instant` map to earlier timestamps, saturating at 0. Later
    /// instants saturate at `u128::MAX`.
    pub fn instant_to_unix_nanos(&self, instant: Instant) -> u128 {
        if instant < self.base_instant {
            let behind_nanos = self.base_instant.duration_since(instant).as_nanos();
            return self.base_unix_nanos.saturating_sub(behind_nanos);
        }
        let ahead_nanos = instant.duration_since(self.base_instant).as_nanos();
        self.base_unix_nanos.saturating_add(ahead_nanos)
    }
}

impl Default for RuntimeTimeBase {
    /// Equivalent to [`RuntimeTimeBase::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Summary of how a child attempt ended. A slot keeps the most recent one as `last_exit`.
#[derive(Debug, Clone, Serialize)]
pub struct ChildExitSummary {
/// Numeric exit code, if one is known. `ChildExitSummary::from_report` always leaves this `None`.
pub exit_code: Option<i32>,
/// Human-readable exit reason derived from the task's exit variant.
pub exit_reason: String,
/// Unix-epoch nanosecond timestamp of the exit, as supplied by the caller.
pub exited_at_unix_nanos: u128,
}
impl ChildExitSummary {
    /// Builds a summary from a finished run's report.
    ///
    /// The exit variant maps to a reason string: `"succeeded"`, `"cancelled"`,
    /// `"timed out"`, the failure's own message, or `"panicked: <payload>"`.
    /// `exit_code` is left as `None`, because the report carries no numeric code here.
    pub fn from_report(report: &ChildRunReport, exited_at_unix_nanos: u128) -> Self {
        use crate::child_runner::run_exit::TaskExit;

        let exit_reason = match &report.exit {
            TaskExit::Succeeded => String::from("succeeded"),
            TaskExit::Cancelled => String::from("cancelled"),
            TaskExit::TimedOut => String::from("timed out"),
            TaskExit::Failed(failure) => failure.message.clone(),
            TaskExit::Panicked(payload) => format!("panicked: {payload}"),
        };
        Self {
            exit_code: None,
            exit_reason,
            exited_at_unix_nanos,
        }
    }
}
impl Display for ChildExitSummary {
    /// Renders `code=<code> reason=<reason>` when an exit code is present, otherwise
    /// `reason=<reason>`. The exit timestamp is not included.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
        if let Some(code) = self.exit_code {
            write!(formatter, "code={code} reason={}", self.exit_reason)
        } else {
            write!(formatter, "reason={}", self.exit_reason)
        }
    }
}
/// Per-child runtime bookkeeping: the identity of the current attempt, the handles used to
/// control and observe it, and the state reported through `ChildSlot::to_record`.
///
/// Serialization skips the live runtime handles (cancellation token, abort handle, watch
/// receivers) and the internal bookkeeping fields marked `#[serde(skip)]`.
#[derive(Debug, Serialize)]
pub struct ChildSlot {
/// Identifier of the supervised child.
pub child_id: ChildId,
/// Supervisor path of the child; copied into runtime records.
pub path: SupervisorPath,
/// Status of the current attempt; `Stopped` when no attempt is installed.
pub status: ChildAttemptStatus,
/// Control operation for this child; starts as `Active` and is copied into runtime records.
pub operation: ChildControlOperation,
/// Generation of the active attempt, if any.
pub generation: Option<Generation>,
/// Start count of the active attempt, if any.
pub attempt: Option<ChildStartCount>,
/// Number of times an attempt has been deactivated (saturating at `u64::MAX`).
pub restart_count: u64,
/// Cancellation token of the active attempt, used by `cancel`.
#[serde(skip)]
pub cancellation_token: Option<CancellationToken>,
/// Abort handle of the active attempt's task, used by `abort`.
#[serde(skip)]
pub abort_handle: Option<AbortHandle>,
/// Delivers the active attempt's final report (or error); awaited by `wait_for_report`.
#[serde(skip)]
pub completion_receiver:
Option<watch::Receiver<Option<Result<ChildRunReport, SupervisorError>>>>,
/// Latest heartbeat instant published for the active attempt, if any.
#[serde(skip)]
pub heartbeat_receiver: Option<watch::Receiver<Option<Instant>>>,
/// Latest readiness state published for the active attempt.
#[serde(skip)]
pub readiness_receiver: Option<watch::Receiver<ReadinessState>>,
/// Summary of the most recent exit; cleared when a new attempt is activated.
pub last_exit: Option<ChildExitSummary>,
/// Unix-epoch nanoseconds at which `observe_liveness` last saw readiness as `Ready`.
pub last_ready_at: Option<u128>,
/// Unix-epoch nanoseconds that `observe_liveness` recorded for the most recent heartbeat.
pub last_heartbeat_at: Option<u128>,
/// Restart window supplied at construction.
pub restart_window: Duration,
/// Pending-restart flag; reset whenever an attempt is activated or deactivated.
/// NOTE(review): set outside this section — confirm exact semantics with the restart path.
pub pending_restart: bool,
/// Whether cancellation has already been delivered to the current attempt's token.
pub attempt_cancel_delivered: bool,
/// Whether `abort` has already been invoked for the current attempt's task.
pub abort_requested: bool,
/// Most recent restart-limit snapshot, written by `update_restart_limit`.
#[serde(skip)]
pub restart_limit: RestartLimitState,
/// Sliding window of failure timestamps backing `refresh_restart_limit`.
#[serde(skip)]
pub restart_limit_tracker: RestartLimitTracker,
/// Stop-progress state; `NoActiveAttempt` when no attempt is installed.
pub stop_state: ChildStopState,
/// Stop deadline in Unix-epoch nanoseconds; cleared on activate/deactivate.
/// NOTE(review): not assigned in this section — presumably set by the stop path.
pub stop_deadline_at_unix_nanos: Option<u128>,
/// Last control failure; cleared on activate but preserved across deactivate.
pub last_control_failure: Option<ChildControlFailure>,
/// NOTE(review): attempt tied to a stale event; only cleared in this section — confirm usage.
pub stale_event_attempt: Option<ChildStartCount>,
/// Generation fence mirroring the active generation/attempt; its phase is copied into records.
#[serde(skip)]
pub generation_fence: GenerationFenceState,
/// Registry identity anchor for the spawn attempt, as (generation, start count, u64).
/// NOTE(review): meaning of the `u64` component is not visible here — confirm.
#[serde(skip)]
pub registry_identity_anchor_for_spawn_attempt: Option<(Generation, ChildStartCount, u64)>,
/// Last observed readiness; reset to `Unreported` on construction and activation.
/// NOTE(review): not written by `observe_liveness` — confirm where it is updated.
#[serde(skip)]
pub last_observed_readiness: ReadinessState,
}
impl ChildSlot {
pub fn new(child_id: ChildId, path: SupervisorPath, restart_window: Duration) -> Self {
Self {
child_id,
path,
status: ChildAttemptStatus::Stopped,
operation: ChildControlOperation::Active,
generation: None,
attempt: None,
restart_count: 0,
cancellation_token: None,
abort_handle: None,
completion_receiver: None,
heartbeat_receiver: None,
readiness_receiver: None,
last_exit: None,
last_ready_at: None,
last_heartbeat_at: None,
restart_window,
pending_restart: false,
attempt_cancel_delivered: false,
abort_requested: false,
restart_limit: RestartLimitState::default(),
restart_limit_tracker: RestartLimitTracker::new(),
stop_state: ChildStopState::NoActiveAttempt,
stop_deadline_at_unix_nanos: None,
last_control_failure: None,
stale_event_attempt: None,
generation_fence: GenerationFenceState::placeholder(),
registry_identity_anchor_for_spawn_attempt: None,
last_observed_readiness: ReadinessState::Unreported,
}
}
pub fn new_placeholder(child_id: ChildId, path: SupervisorPath) -> Self {
Self::new(child_id, path, Duration::from_secs(60))
}
pub fn activate(
&mut self,
generation: Generation,
attempt: ChildStartCount,
status: ChildAttemptStatus,
handle: ChildRunHandle,
) {
self.generation = Some(generation);
self.attempt = Some(attempt);
self.status = status;
self.generation_fence.active_generation = Some(generation);
self.generation_fence.active_attempt = Some(attempt);
self.cancellation_token = Some(handle.cancellation_token);
self.abort_handle = Some(handle.abort_handle);
self.completion_receiver = Some(handle.completion_receiver);
self.heartbeat_receiver = Some(handle.heartbeat_receiver);
self.readiness_receiver = Some(handle.readiness_receiver);
self.last_exit = None;
self.last_ready_at = None;
self.last_heartbeat_at = None;
self.last_observed_readiness = ReadinessState::Unreported;
self.attempt_cancel_delivered = false;
self.abort_requested = false;
self.pending_restart = false;
self.stop_state = ChildStopState::Idle;
self.stop_deadline_at_unix_nanos = None;
self.last_control_failure = None;
self.stale_event_attempt = None;
self.registry_identity_anchor_for_spawn_attempt = None;
self.generation_fence.phase = GenerationFenceState::placeholder().phase;
}
pub fn deactivate(&mut self, exit_summary: ChildExitSummary) {
self.last_exit = Some(exit_summary);
self.restart_count = self.restart_count.saturating_add(1);
self.generation = None;
self.attempt = None;
self.status = ChildAttemptStatus::Stopped;
self.cancellation_token = None;
self.abort_handle = None;
self.completion_receiver = None;
self.heartbeat_receiver = None;
self.readiness_receiver = None;
self.last_ready_at = None;
self.last_heartbeat_at = None;
self.attempt_cancel_delivered = false;
self.abort_requested = false;
self.pending_restart = false;
self.generation_fence.active_generation = None;
self.generation_fence.active_attempt = None;
self.stop_state = ChildStopState::NoActiveAttempt;
self.stop_deadline_at_unix_nanos = None;
self.stale_event_attempt = None;
self.registry_identity_anchor_for_spawn_attempt = None;
}
pub fn clear_instance(&mut self) {
self.generation = None;
self.attempt = None;
self.status = ChildAttemptStatus::Stopped;
self.generation_fence.active_generation = None;
self.generation_fence.active_attempt = None;
self.cancellation_token = None;
self.abort_handle = None;
self.completion_receiver = None;
self.heartbeat_receiver = None;
self.readiness_receiver = None;
self.attempt_cancel_delivered = false;
self.abort_requested = false;
self.stop_deadline_at_unix_nanos = None;
self.stale_event_attempt = None;
self.registry_identity_anchor_for_spawn_attempt = None;
self.stop_state = ChildStopState::NoActiveAttempt;
}
pub fn has_active_attempt(&self) -> bool {
self.attempt.is_some() && self.cancellation_token.is_some()
}
pub fn cancel(&mut self) -> bool {
let Some(token) = &self.cancellation_token else {
return false;
};
if self.attempt_cancel_delivered {
return false;
}
token.cancel();
self.attempt_cancel_delivered = true;
self.status = ChildAttemptStatus::Cancelling;
true
}
pub fn abort(&mut self) -> bool {
let Some(handle) = &self.abort_handle else {
return false;
};
if self.abort_requested {
return false;
}
handle.abort();
self.abort_requested = true;
true
}
pub async fn wait_for_report(&mut self) -> Result<ChildRunReport, SupervisorError> {
let Some(receiver) = &mut self.completion_receiver else {
return Err(SupervisorError::InvalidTransition {
message: "child slot has no active completion receiver".to_owned(),
});
};
wait_for_report(receiver).await
}
pub fn observe_liveness(&mut self, now_unix_nanos: u128) -> ChildLivenessState {
if let Some(receiver) = &self.heartbeat_receiver {
let heartbeat = *receiver.borrow();
if heartbeat.is_some() {
self.last_heartbeat_at = Some(now_unix_nanos);
}
}
let readiness = if let Some(receiver) = &self.readiness_receiver {
let r = *receiver.borrow();
if r == ReadinessState::Ready {
self.last_ready_at = Some(now_unix_nanos);
}
r
} else {
ReadinessState::Unreported
};
let heartbeat_stale = self.last_heartbeat_at.is_some_and(|heartbeat| {
let elapsed_nanos = now_unix_nanos.saturating_sub(heartbeat);
elapsed_nanos >= Duration::from_secs(DEFAULT_HEARTBEAT_TIMEOUT_SECS).as_nanos()
});
ChildLivenessState::new(self.last_heartbeat_at, heartbeat_stale, readiness)
}
pub fn update_restart_limit(
&mut self,
window: Duration,
limit: u32,
used: u32,
time_base: &RuntimeTimeBase,
) -> RestartLimitState {
let mut updated_at = time_base.now_unix_nanos();
if updated_at <= self.restart_limit.updated_at_unix_nanos {
updated_at = self.restart_limit.updated_at_unix_nanos.saturating_add(1);
}
self.restart_limit = RestartLimitState {
window,
limit,
used,
remaining: limit.saturating_sub(used),
exhausted: used >= limit,
updated_at_unix_nanos: updated_at,
};
self.restart_limit.clone()
}
pub fn refresh_restart_limit(
&mut self,
window: Duration,
limit: u32,
count_failure: bool,
time_base: &RuntimeTimeBase,
) -> RestartLimitState {
let now = time_base.now_unix_nanos();
let used = self
.restart_limit_tracker
.refresh(now, window, count_failure);
self.update_restart_limit(window, limit, used, time_base)
}
pub fn to_record(&self, liveness: ChildLivenessState) -> ChildRuntimeRecord {
let status = if self.attempt.is_some() {
Some(self.status)
} else {
None
};
ChildRuntimeRecord::new(
self.child_id.clone(),
self.path.clone(),
self.generation,
self.attempt,
status,
self.operation,
liveness,
self.restart_limit.clone(),
self.stop_state,
self.last_control_failure.clone(),
self.generation_fence.phase,
None, )
}
}