Skip to main content

rust_supervisor/runtime/
child_slot.rs

//! Runtime slot kept for each supervised child identity.
//!
//! A [`ChildSlot`] owns the live handles (cancellation token, abort handle, and
//! the completion, heartbeat, and readiness receivers) for at most one active
//! attempt at any moment. The control loop manipulates slots through the
//! methods defined here rather than rewriting in-memory labels.
6
7use crate::child_runner::runner::{ChildRunHandle, ChildRunReport, wait_for_report};
8use crate::control::outcome::{
9    ChildAttemptStatus, ChildControlFailure, ChildControlOperation, ChildLivenessState,
10    ChildRuntimeRecord, ChildStopState, GenerationFenceState, RestartLimitState,
11};
12use crate::error::types::SupervisorError;
13use crate::id::types::{ChildId, ChildStartCount, Generation, SupervisorPath};
14use crate::readiness::signal::ReadinessState;
15use serde::Serialize;
16use std::collections::VecDeque;
17use std::fmt::{Display, Formatter};
18use std::time::{Duration, SystemTime, UNIX_EPOCH};
19use tokio::sync::watch;
20use tokio::task::AbortHandle;
21use tokio::time::Instant;
22use tokio_util::sync::CancellationToken;
23
24// ---------------------------------------------------------------------------
25// Shared types (migrated from child_runtime_state)
26// ---------------------------------------------------------------------------
27
28/// Default heartbeat stale threshold in seconds.
29pub const DEFAULT_HEARTBEAT_TIMEOUT_SECS: u64 = 5;
30
31/// Restart accounting history for one child runtime slot.
32#[derive(Debug, Clone, Default)]
33pub struct RestartLimitTracker {
34    /// Failure timestamps that are still relevant to the restart window.
35    failure_timestamps: VecDeque<u128>,
36}
37
38impl RestartLimitTracker {
39    /// Creates an empty restart limit tracker.
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    /// Refreshes accounting and optionally records the current failed exit.
45    ///
46    /// # Arguments
47    ///
48    /// - `now_unix_nanos`: Current Unix timestamp in nanoseconds.
49    /// - `window`: Restart accounting window.
50    /// - `count_failure`: Whether the current exit should consume the limit.
51    ///
52    /// # Returns
53    ///
54    /// Returns the number of failures inside the active window.
55    pub fn refresh(&mut self, now_unix_nanos: u128, window: Duration, count_failure: bool) -> u32 {
56        self.prune(now_unix_nanos, window);
57        if count_failure {
58            self.failure_timestamps.push_back(now_unix_nanos);
59        }
60        self.failure_timestamps.len().min(u32::MAX as usize) as u32
61    }
62
63    /// Removes failure timestamps outside the accounting window.
64    fn prune(&mut self, now_unix_nanos: u128, window: Duration) {
65        let window_nanos = window.as_nanos();
66        while self
67            .failure_timestamps
68            .front()
69            .is_some_and(|timestamp| now_unix_nanos.saturating_sub(*timestamp) > window_nanos)
70        {
71            self.failure_timestamps.pop_front();
72        }
73    }
74}
75
76/// Runtime time base used to convert monotonic instants into Unix timestamps.
77#[derive(Debug, Clone, Copy)]
78pub struct RuntimeTimeBase {
79    /// Monotonic instant captured when the runtime starts.
80    pub base_instant: Instant,
81    /// Unix epoch timestamp in nanoseconds captured when the runtime starts.
82    pub base_unix_nanos: u128,
83}
84
85impl RuntimeTimeBase {
86    /// Creates a runtime time base.
87    pub fn new() -> Self {
88        Self {
89            base_instant: Instant::now(),
90            base_unix_nanos: SystemTime::now()
91                .duration_since(UNIX_EPOCH)
92                .map_or(0, |duration| duration.as_nanos()),
93        }
94    }
95
96    /// Returns the current Unix epoch timestamp in nanoseconds.
97    pub fn now_unix_nanos(&self) -> u128 {
98        self.instant_to_unix_nanos(Instant::now())
99    }
100
101    /// Converts a monotonic instant into a Unix epoch timestamp in nanoseconds.
102    ///
103    /// # Arguments
104    ///
105    /// - `instant`: Monotonic instant to convert.
106    pub fn instant_to_unix_nanos(&self, instant: Instant) -> u128 {
107        if instant >= self.base_instant {
108            self.base_unix_nanos
109                .saturating_add(instant.duration_since(self.base_instant).as_nanos())
110        } else {
111            self.base_unix_nanos
112                .saturating_sub(self.base_instant.duration_since(instant).as_nanos())
113        }
114    }
115}
116
117impl Default for RuntimeTimeBase {
118    /// Creates the default runtime time base.
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
124// ---------------------------------------------------------------------------
125// ChildExitSummary
126// ---------------------------------------------------------------------------
127
128/// Summary recorded when a child attempt exits.
129#[derive(Debug, Clone, Serialize)]
130pub struct ChildExitSummary {
131    /// Process exit code when available.
132    pub exit_code: Option<i32>,
133    /// Human-readable exit reason.
134    pub exit_reason: String,
135    /// Unix epoch timestamp in nanoseconds when the exit was recorded.
136    pub exited_at_unix_nanos: u128,
137}
138
139impl ChildExitSummary {
140    /// Creates an exit summary from a [`ChildRunReport`].
141    ///
142    /// # Arguments
143    ///
144    /// - `report`: Completed child run report.
145    /// - `exited_at_unix_nanos`: Timestamp when the exit was observed.
146    ///
147    /// # Returns
148    ///
149    /// Returns a [`ChildExitSummary`].
150    pub fn from_report(report: &ChildRunReport, exited_at_unix_nanos: u128) -> Self {
151        let exit_reason = match &report.exit {
152            crate::child_runner::run_exit::TaskExit::Succeeded => "succeeded".to_owned(),
153            crate::child_runner::run_exit::TaskExit::Cancelled => "cancelled".to_owned(),
154            crate::child_runner::run_exit::TaskExit::Failed(f) => f.message.clone(),
155            crate::child_runner::run_exit::TaskExit::Panicked(msg) => format!("panicked: {msg}"),
156            crate::child_runner::run_exit::TaskExit::TimedOut => "timed out".to_owned(),
157        };
158        Self {
159            exit_code: None,
160            exit_reason,
161            exited_at_unix_nanos,
162        }
163    }
164}
165
166impl Display for ChildExitSummary {
167    /// Formats the exit summary as `code=<code> reason=<reason>`.
168    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
169        match self.exit_code {
170            Some(code) => write!(formatter, "code={} reason={}", code, self.exit_reason),
171            None => write!(formatter, "reason={}", self.exit_reason),
172        }
173    }
174}
175
176// ---------------------------------------------------------------------------
177// ChildSlot
178// ---------------------------------------------------------------------------
179
180/// Runtime slot for one supervised child.
181///
182/// At most one active attempt may occupy the slot at any moment. The slot owns
183/// the cancellation token, abort handle, and completion/health receivers for
184/// the active attempt.
185#[derive(Debug, Serialize)]
186pub struct ChildSlot {
187    /// Stable child identifier.
188    pub child_id: ChildId,
189    /// Child path in the supervisor tree.
190    pub path: SupervisorPath,
191    /// Current active attempt status.
192    pub status: ChildAttemptStatus,
193    /// Current control operation requested by the operator.
194    pub operation: ChildControlOperation,
195    /// Generation of the active attempt.
196    pub generation: Option<Generation>,
197    /// Monotonic attempt number for the active attempt.
198    pub attempt: Option<ChildStartCount>,
199    /// Cumulative restart count across all generations.
200    pub restart_count: u64,
201    /// Cancellation token for the active attempt (runtime-only, not serialized).
202    #[serde(skip)]
203    pub cancellation_token: Option<CancellationToken>,
204    /// Abort handle for the active attempt (runtime-only, not serialized).
205    #[serde(skip)]
206    pub abort_handle: Option<AbortHandle>,
207    /// Completion receiver for the active attempt (runtime-only, not serialized).
208    #[serde(skip)]
209    pub completion_receiver:
210        Option<watch::Receiver<Option<Result<ChildRunReport, SupervisorError>>>>,
211    /// Heartbeat receiver for the active attempt (runtime-only, not serialized).
212    #[serde(skip)]
213    pub heartbeat_receiver: Option<watch::Receiver<Option<Instant>>>,
214    /// Readiness receiver for the active attempt (runtime-only, not serialized).
215    #[serde(skip)]
216    pub readiness_receiver: Option<watch::Receiver<ReadinessState>>,
217    /// Summary of the most recent exit, if any.
218    pub last_exit: Option<ChildExitSummary>,
219    /// Unix epoch timestamp in nanoseconds when the child last reported ready.
220    pub last_ready_at: Option<u128>,
221    /// Unix epoch timestamp in nanoseconds of the last observed heartbeat.
222    pub last_heartbeat_at: Option<u128>,
223    /// Restart accounting window duration.
224    pub restart_window: Duration,
225    /// Whether a restart is pending but not yet activated.
226    pub pending_restart: bool,
227    /// Whether cancellation has been delivered to the active attempt.
228    pub attempt_cancel_delivered: bool,
229    /// Whether abort has been requested for the active attempt.
230    pub abort_requested: bool,
231    // --- Fields migrated from ChildRuntimeState for compatibility ---
232    /// Current restart limit state.
233    #[serde(skip)]
234    pub restart_limit: RestartLimitState,
235    /// Runtime-side restart accounting history.
236    #[serde(skip)]
237    pub restart_limit_tracker: RestartLimitTracker,
238    /// Current stop progress.
239    pub stop_state: ChildStopState,
240    /// Stop deadline in Unix epoch nanoseconds.
241    pub stop_deadline_at_unix_nanos: Option<u128>,
242    /// Most recent control failure.
243    pub last_control_failure: Option<ChildControlFailure>,
244    /// Attempt for the most recent stale heartbeat event.
245    pub stale_event_attempt: Option<ChildStartCount>,
246    /// Generation fencing state for restart coordination.
247    #[serde(skip)]
248    pub generation_fence: GenerationFenceState,
249    /// Registry identity anchor captured before a fenced restart.
250    #[serde(skip)]
251    pub registry_identity_anchor_for_spawn_attempt: Option<(Generation, ChildStartCount, u64)>,
252    /// Last observed readiness state.
253    #[serde(skip)]
254    pub last_observed_readiness: ReadinessState,
255}
256
257impl ChildSlot {
258    /// Creates an empty slot with no active attempt.
259    ///
260    /// # Arguments
261    ///
262    /// - `child_id`: Stable child identifier.
263    /// - `path`: Child path in the supervisor tree.
264    /// - `restart_window`: Restart accounting window duration.
265    ///
266    /// # Returns
267    ///
268    /// Returns a [`ChildSlot`] in idle state.
269    pub fn new(child_id: ChildId, path: SupervisorPath, restart_window: Duration) -> Self {
270        Self {
271            child_id,
272            path,
273            status: ChildAttemptStatus::Stopped,
274            operation: ChildControlOperation::Active,
275            generation: None,
276            attempt: None,
277            restart_count: 0,
278            cancellation_token: None,
279            abort_handle: None,
280            completion_receiver: None,
281            heartbeat_receiver: None,
282            readiness_receiver: None,
283            last_exit: None,
284            last_ready_at: None,
285            last_heartbeat_at: None,
286            restart_window,
287            pending_restart: false,
288            attempt_cancel_delivered: false,
289            abort_requested: false,
290            restart_limit: RestartLimitState::default(),
291            restart_limit_tracker: RestartLimitTracker::new(),
292            stop_state: ChildStopState::NoActiveAttempt,
293            stop_deadline_at_unix_nanos: None,
294            last_control_failure: None,
295            stale_event_attempt: None,
296            generation_fence: GenerationFenceState::placeholder(),
297            registry_identity_anchor_for_spawn_attempt: None,
298            last_observed_readiness: ReadinessState::Unreported,
299        }
300    }
301
302    /// Creates an empty slot with a default 60-second restart window.
303    ///
304    /// Convenience constructor for [`ChildSlot::new`] when the restart window
305    /// is not yet known.
306    ///
307    /// # Arguments
308    ///
309    /// - `child_id`: Stable child identifier.
310    /// - `path`: Child path in the supervisor tree.
311    ///
312    /// # Returns
313    ///
314    /// Returns a [`ChildSlot`] in idle state.
315    pub fn new_placeholder(child_id: ChildId, path: SupervisorPath) -> Self {
316        Self::new(child_id, path, Duration::from_secs(60))
317    }
318
319    /// Activates an attempt on this slot.
320    ///
321    /// # Arguments
322    ///
323    /// - `generation`: Generation owned by the active attempt.
324    /// - `attempt`: Monotonic attempt number.
325    /// - `status`: Initial active attempt status.
326    /// - `handle`: Child run handle carrying cancellation token and receivers.
327    ///
328    /// # Returns
329    ///
330    /// This function does not return a value.
331    pub fn activate(
332        &mut self,
333        generation: Generation,
334        attempt: ChildStartCount,
335        status: ChildAttemptStatus,
336        handle: ChildRunHandle,
337    ) {
338        self.generation = Some(generation);
339        self.attempt = Some(attempt);
340        self.status = status;
341        self.generation_fence.active_generation = Some(generation);
342        self.generation_fence.active_attempt = Some(attempt);
343        self.cancellation_token = Some(handle.cancellation_token);
344        self.abort_handle = Some(handle.abort_handle);
345        self.completion_receiver = Some(handle.completion_receiver);
346        self.heartbeat_receiver = Some(handle.heartbeat_receiver);
347        self.readiness_receiver = Some(handle.readiness_receiver);
348        self.last_exit = None;
349        self.last_ready_at = None;
350        self.last_heartbeat_at = None;
351        self.last_observed_readiness = ReadinessState::Unreported;
352        self.attempt_cancel_delivered = false;
353        self.abort_requested = false;
354        self.pending_restart = false;
355        self.stop_state = ChildStopState::Idle;
356        self.stop_deadline_at_unix_nanos = None;
357        self.last_control_failure = None;
358        self.stale_event_attempt = None;
359        self.registry_identity_anchor_for_spawn_attempt = None;
360        self.generation_fence.phase = GenerationFenceState::placeholder().phase;
361    }
362
363    /// Deactivates the current attempt and records its exit summary.
364    ///
365    /// The caller must have already awaited or consumed the completion
366    /// receiver. This method clears handles and advances the restart counter.
367    ///
368    /// # Arguments
369    ///
370    /// - `exit_summary`: Summary captured from the completed child run.
371    ///
372    /// # Returns
373    ///
374    /// This function does not return a value.
375    pub fn deactivate(&mut self, exit_summary: ChildExitSummary) {
376        self.last_exit = Some(exit_summary);
377        self.restart_count = self.restart_count.saturating_add(1);
378        self.generation = None;
379        self.attempt = None;
380        self.status = ChildAttemptStatus::Stopped;
381        self.cancellation_token = None;
382        self.abort_handle = None;
383        self.completion_receiver = None;
384        self.heartbeat_receiver = None;
385        self.readiness_receiver = None;
386        self.last_ready_at = None;
387        self.last_heartbeat_at = None;
388        self.attempt_cancel_delivered = false;
389        self.abort_requested = false;
390        self.pending_restart = false;
391        self.generation_fence.active_generation = None;
392        self.generation_fence.active_attempt = None;
393        self.stop_state = ChildStopState::NoActiveAttempt;
394        self.stop_deadline_at_unix_nanos = None;
395        self.stale_event_attempt = None;
396        self.registry_identity_anchor_for_spawn_attempt = None;
397    }
398
399    /// Clears the active instance without recording an exit.
400    pub fn clear_instance(&mut self) {
401        self.generation = None;
402        self.attempt = None;
403        self.status = ChildAttemptStatus::Stopped;
404        self.generation_fence.active_generation = None;
405        self.generation_fence.active_attempt = None;
406        self.cancellation_token = None;
407        self.abort_handle = None;
408        self.completion_receiver = None;
409        self.heartbeat_receiver = None;
410        self.readiness_receiver = None;
411        self.attempt_cancel_delivered = false;
412        self.abort_requested = false;
413        self.stop_deadline_at_unix_nanos = None;
414        self.stale_event_attempt = None;
415        self.registry_identity_anchor_for_spawn_attempt = None;
416        self.stop_state = ChildStopState::NoActiveAttempt;
417    }
418
419    /// Returns whether the slot currently holds an active attempt.
420    ///
421    /// # Arguments
422    ///
423    /// This function has no arguments.
424    ///
425    /// # Returns
426    ///
427    /// Returns `true` when an active attempt exists.
428    pub fn has_active_attempt(&self) -> bool {
429        self.attempt.is_some() && self.cancellation_token.is_some()
430    }
431
432    /// Delivers cancellation to the active attempt.
433    ///
434    /// # Arguments
435    ///
436    /// This function has no arguments.
437    ///
438    /// # Returns
439    ///
440    /// Returns `true` when this call delivered cancellation (first delivery).
441    pub fn cancel(&mut self) -> bool {
442        let Some(token) = &self.cancellation_token else {
443            return false;
444        };
445        if self.attempt_cancel_delivered {
446            return false;
447        }
448        token.cancel();
449        self.attempt_cancel_delivered = true;
450        self.status = ChildAttemptStatus::Cancelling;
451        true
452    }
453
454    /// Requests abort for the active attempt.
455    ///
456    /// # Arguments
457    ///
458    /// This function has no arguments.
459    ///
460    /// # Returns
461    ///
462    /// Returns `true` when this call requested abort (first request).
463    pub fn abort(&mut self) -> bool {
464        let Some(handle) = &self.abort_handle else {
465            return false;
466        };
467        if self.abort_requested {
468            return false;
469        }
470        handle.abort();
471        self.abort_requested = true;
472        true
473    }
474
475    /// Waits for the active attempt report.
476    ///
477    /// # Arguments
478    ///
479    /// This function has no arguments.
480    ///
481    /// # Returns
482    ///
483    /// Returns the completed child run report.
484    pub async fn wait_for_report(&mut self) -> Result<ChildRunReport, SupervisorError> {
485        let Some(receiver) = &mut self.completion_receiver else {
486            return Err(SupervisorError::InvalidTransition {
487                message: "child slot has no active completion receiver".to_owned(),
488            });
489        };
490        wait_for_report(receiver).await
491    }
492
493    /// Observes current readiness and heartbeat from the active attempt.
494    ///
495    /// # Arguments
496    ///
497    /// - `now_unix_nanos`: Current Unix epoch timestamp in nanoseconds.
498    ///
499    /// # Returns
500    ///
501    /// Returns the latest [`ChildLivenessState`].
502    pub fn observe_liveness(&mut self, now_unix_nanos: u128) -> ChildLivenessState {
503        if let Some(receiver) = &self.heartbeat_receiver {
504            let heartbeat = *receiver.borrow();
505            if heartbeat.is_some() {
506                self.last_heartbeat_at = Some(now_unix_nanos);
507            }
508        }
509        let readiness = if let Some(receiver) = &self.readiness_receiver {
510            let r = *receiver.borrow();
511            if r == ReadinessState::Ready {
512                self.last_ready_at = Some(now_unix_nanos);
513            }
514            r
515        } else {
516            ReadinessState::Unreported
517        };
518        let heartbeat_stale = self.last_heartbeat_at.is_some_and(|heartbeat| {
519            let elapsed_nanos = now_unix_nanos.saturating_sub(heartbeat);
520            elapsed_nanos >= Duration::from_secs(DEFAULT_HEARTBEAT_TIMEOUT_SECS).as_nanos()
521        });
522        ChildLivenessState::new(self.last_heartbeat_at, heartbeat_stale, readiness)
523    }
524
525    /// Updates restart limit state (migration compatibility with
526    /// [`ChildRuntimeState::update_restart_limit`]).
527    ///
528    /// # Arguments
529    ///
530    /// - `window`: Restart accounting window.
531    /// - `limit`: Restart limit inside the window.
532    /// - `used`: Restart count used so far.
533    /// - `time_base`: Runtime time base.
534    ///
535    /// # Returns
536    ///
537    /// Returns the updated [`RestartLimitState`].
538    pub fn update_restart_limit(
539        &mut self,
540        window: Duration,
541        limit: u32,
542        used: u32,
543        time_base: &RuntimeTimeBase,
544    ) -> RestartLimitState {
545        let mut updated_at = time_base.now_unix_nanos();
546        if updated_at <= self.restart_limit.updated_at_unix_nanos {
547            updated_at = self.restart_limit.updated_at_unix_nanos.saturating_add(1);
548        }
549        self.restart_limit = RestartLimitState {
550            window,
551            limit,
552            used,
553            remaining: limit.saturating_sub(used),
554            exhausted: used >= limit,
555            updated_at_unix_nanos: updated_at,
556        };
557        self.restart_limit.clone()
558    }
559
560    /// Refreshes the restart limit tracker and updates the state.
561    pub fn refresh_restart_limit(
562        &mut self,
563        window: Duration,
564        limit: u32,
565        count_failure: bool,
566        time_base: &RuntimeTimeBase,
567    ) -> RestartLimitState {
568        let now = time_base.now_unix_nanos();
569        let used = self
570            .restart_limit_tracker
571            .refresh(now, window, count_failure);
572        self.update_restart_limit(window, limit, used, time_base)
573    }
574
575    /// Builds a public runtime state record.
576    pub fn to_record(&self, liveness: ChildLivenessState) -> ChildRuntimeRecord {
577        // Data model: status is None when there is no active attempt.
578        let status = if self.attempt.is_some() {
579            Some(self.status)
580        } else {
581            None
582        };
583        ChildRuntimeRecord::new(
584            self.child_id.clone(),
585            self.path.clone(),
586            self.generation,
587            self.attempt,
588            status,
589            self.operation,
590            liveness,
591            self.restart_limit.clone(),
592            self.stop_state,
593            self.last_control_failure.clone(),
594            self.generation_fence.phase,
595            None, // pending_restart
596        )
597    }
598}