1use crate::child_runner::runner::{ChildRunHandle, ChildRunReport, wait_for_report};
8use crate::control::outcome::{
9 ChildAttemptStatus, ChildControlFailure, ChildControlOperation, ChildLivenessState,
10 ChildRuntimeRecord, ChildStopState, GenerationFenceState, RestartLimitState,
11};
12use crate::error::types::SupervisorError;
13use crate::id::types::{ChildId, ChildStartCount, Generation, SupervisorPath};
14use crate::readiness::signal::ReadinessState;
15use serde::Serialize;
16use std::collections::VecDeque;
17use std::fmt::{Display, Formatter};
18use std::time::{Duration, SystemTime, UNIX_EPOCH};
19use tokio::sync::watch;
20use tokio::task::AbortHandle;
21use tokio::time::Instant;
22use tokio_util::sync::CancellationToken;
23
/// Number of seconds after the last recorded heartbeat at which
/// [`ChildSlot::observe_liveness`] reports the heartbeat as stale
/// (the comparison is `elapsed >= timeout`).
pub const DEFAULT_HEARTBEAT_TIMEOUT_SECS: u64 = 5;
30
/// Sliding-window record of failure timestamps used to count how much of a
/// restart budget has been consumed.
///
/// Timestamps are caller-supplied Unix-epoch nanoseconds kept in insertion
/// order; entries that fall outside the window are discarded lazily on each
/// [`RestartLimitTracker::refresh`].
#[derive(Debug, Clone, Default)]
pub struct RestartLimitTracker {
    /// Recorded failure timestamps, oldest at the front.
    failure_timestamps: VecDeque<u128>,
}

impl RestartLimitTracker {
    /// Creates a tracker with no recorded failures.
    pub fn new() -> Self {
        Self::default()
    }

    /// Expires failures older than `window`, optionally records a new failure
    /// at `now_unix_nanos`, and returns the number of failures still tracked.
    ///
    /// A failure is retained while `now_unix_nanos - timestamp <= window`.
    /// The returned count saturates at `u32::MAX`.
    pub fn refresh(&mut self, now_unix_nanos: u128, window: Duration, count_failure: bool) -> u32 {
        self.prune(now_unix_nanos, window);
        if count_failure {
            self.failure_timestamps.push_back(now_unix_nanos);
        }
        u32::try_from(self.failure_timestamps.len()).unwrap_or(u32::MAX)
    }

    /// Pops failures off the front while they are strictly older than `window`.
    ///
    /// Only the front entry is inspected, so this assumes timestamps were
    /// recorded in non-decreasing order.
    fn prune(&mut self, now_unix_nanos: u128, window: Duration) {
        let limit = window.as_nanos();
        while let Some(&oldest) = self.failure_timestamps.front() {
            if now_unix_nanos.saturating_sub(oldest) <= limit {
                break;
            }
            self.failure_timestamps.pop_front();
        }
    }
}
75
76#[derive(Debug, Clone, Copy)]
78pub struct RuntimeTimeBase {
79 pub base_instant: Instant,
81 pub base_unix_nanos: u128,
83}
84
85impl RuntimeTimeBase {
86 pub fn new() -> Self {
88 Self {
89 base_instant: Instant::now(),
90 base_unix_nanos: SystemTime::now()
91 .duration_since(UNIX_EPOCH)
92 .map_or(0, |duration| duration.as_nanos()),
93 }
94 }
95
96 pub fn now_unix_nanos(&self) -> u128 {
98 self.instant_to_unix_nanos(Instant::now())
99 }
100
101 pub fn instant_to_unix_nanos(&self, instant: Instant) -> u128 {
107 if instant >= self.base_instant {
108 self.base_unix_nanos
109 .saturating_add(instant.duration_since(self.base_instant).as_nanos())
110 } else {
111 self.base_unix_nanos
112 .saturating_sub(self.base_instant.duration_since(instant).as_nanos())
113 }
114 }
115}
116
117impl Default for RuntimeTimeBase {
118 fn default() -> Self {
120 Self::new()
121 }
122}
123
124#[derive(Debug, Clone, Serialize)]
130pub struct ChildExitSummary {
131 pub exit_code: Option<i32>,
133 pub exit_reason: String,
135 pub exited_at_unix_nanos: u128,
137}
138
139impl ChildExitSummary {
140 pub fn from_report(report: &ChildRunReport, exited_at_unix_nanos: u128) -> Self {
151 let exit_reason = match &report.exit {
152 crate::child_runner::run_exit::TaskExit::Succeeded => "succeeded".to_owned(),
153 crate::child_runner::run_exit::TaskExit::Cancelled => "cancelled".to_owned(),
154 crate::child_runner::run_exit::TaskExit::Failed(f) => f.message.clone(),
155 crate::child_runner::run_exit::TaskExit::Panicked(msg) => format!("panicked: {msg}"),
156 crate::child_runner::run_exit::TaskExit::TimedOut => "timed out".to_owned(),
157 };
158 Self {
159 exit_code: None,
160 exit_reason,
161 exited_at_unix_nanos,
162 }
163 }
164}
165
166impl Display for ChildExitSummary {
167 fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
169 match self.exit_code {
170 Some(code) => write!(formatter, "code={} reason={}", code, self.exit_reason),
171 None => write!(formatter, "reason={}", self.exit_reason),
172 }
173 }
174}
175
/// Per-child runtime bookkeeping: the child's identity, the handles of its
/// currently running attempt (if any), liveness observations, restart-limit
/// accounting, and stop/control state.
///
/// Fields marked `#[serde(skip)]` are runtime handles or internal state that
/// are omitted when the slot is serialized.
#[derive(Debug, Serialize)]
pub struct ChildSlot {
    /// Identifier of the child this slot tracks.
    pub child_id: ChildId,
    /// Supervisor path of the child.
    pub path: SupervisorPath,
    /// Status of the current attempt; `Stopped` whenever no attempt is active.
    pub status: ChildAttemptStatus,
    /// Control operation for this child (initialised to `Active` by `new`).
    pub operation: ChildControlOperation,
    /// Generation of the active attempt, or `None` when idle.
    pub generation: Option<Generation>,
    /// Start count of the active attempt, or `None` when idle.
    pub attempt: Option<ChildStartCount>,
    /// Saturating count of `deactivate` calls on this slot.
    pub restart_count: u64,
    /// Cancellation token of the active attempt.
    #[serde(skip)]
    pub cancellation_token: Option<CancellationToken>,
    /// Abort handle of the active attempt's task.
    #[serde(skip)]
    pub abort_handle: Option<AbortHandle>,
    /// Watch receiver through which the active attempt's final report is published.
    #[serde(skip)]
    pub completion_receiver:
        Option<watch::Receiver<Option<Result<ChildRunReport, SupervisorError>>>>,
    /// Watch receiver for the active attempt's heartbeat; presumably holds the
    /// instant of the latest heartbeat (TODO confirm against the sender).
    #[serde(skip)]
    pub heartbeat_receiver: Option<watch::Receiver<Option<Instant>>>,
    /// Watch receiver for the active attempt's reported readiness.
    #[serde(skip)]
    pub readiness_receiver: Option<watch::Receiver<ReadinessState>>,
    /// Summary of the most recent attempt exit recorded by `deactivate`.
    pub last_exit: Option<ChildExitSummary>,
    /// Unix-epoch nanoseconds at which readiness was last observed as `Ready`.
    pub last_ready_at: Option<u128>,
    /// Unix-epoch nanoseconds recorded for the latest observed heartbeat.
    pub last_heartbeat_at: Option<u128>,
    /// Restart-accounting window configured for this child.
    pub restart_window: Duration,
    /// Whether a restart is pending; cleared by `activate` and `deactivate`.
    pub pending_restart: bool,
    /// Whether cancellation was already signalled to the active attempt.
    pub attempt_cancel_delivered: bool,
    /// Whether an abort was already requested for the active attempt.
    pub abort_requested: bool,
    /// Latest restart-limit snapshot.
    #[serde(skip)]
    pub restart_limit: RestartLimitState,
    /// Sliding-window failure counter that feeds `restart_limit`.
    #[serde(skip)]
    pub restart_limit_tracker: RestartLimitTracker,
    /// Stop-progress state of the current attempt.
    pub stop_state: ChildStopState,
    /// Stop deadline in Unix-epoch nanoseconds, if one is set.
    pub stop_deadline_at_unix_nanos: Option<u128>,
    /// Most recent control failure, if any.
    pub last_control_failure: Option<ChildControlFailure>,
    /// Attempt associated with a stale event, if recorded (set outside this
    /// file — TODO confirm semantics).
    pub stale_event_attempt: Option<ChildStartCount>,
    /// Generation fence mirroring the active generation/attempt.
    #[serde(skip)]
    pub generation_fence: GenerationFenceState,
    /// Registry identity anchor for the spawn attempt; cleared on every
    /// activation, deactivation and `clear_instance`.
    #[serde(skip)]
    pub registry_identity_anchor_for_spawn_attempt: Option<(Generation, ChildStartCount, u64)>,
    /// Readiness last observed for the active attempt; reset to `Unreported`
    /// by `new` and `activate`.
    #[serde(skip)]
    pub last_observed_readiness: ReadinessState,
}
256
257impl ChildSlot {
258 pub fn new(child_id: ChildId, path: SupervisorPath, restart_window: Duration) -> Self {
270 Self {
271 child_id,
272 path,
273 status: ChildAttemptStatus::Stopped,
274 operation: ChildControlOperation::Active,
275 generation: None,
276 attempt: None,
277 restart_count: 0,
278 cancellation_token: None,
279 abort_handle: None,
280 completion_receiver: None,
281 heartbeat_receiver: None,
282 readiness_receiver: None,
283 last_exit: None,
284 last_ready_at: None,
285 last_heartbeat_at: None,
286 restart_window,
287 pending_restart: false,
288 attempt_cancel_delivered: false,
289 abort_requested: false,
290 restart_limit: RestartLimitState::default(),
291 restart_limit_tracker: RestartLimitTracker::new(),
292 stop_state: ChildStopState::NoActiveAttempt,
293 stop_deadline_at_unix_nanos: None,
294 last_control_failure: None,
295 stale_event_attempt: None,
296 generation_fence: GenerationFenceState::placeholder(),
297 registry_identity_anchor_for_spawn_attempt: None,
298 last_observed_readiness: ReadinessState::Unreported,
299 }
300 }
301
302 pub fn new_placeholder(child_id: ChildId, path: SupervisorPath) -> Self {
316 Self::new(child_id, path, Duration::from_secs(60))
317 }
318
319 pub fn activate(
332 &mut self,
333 generation: Generation,
334 attempt: ChildStartCount,
335 status: ChildAttemptStatus,
336 handle: ChildRunHandle,
337 ) {
338 self.generation = Some(generation);
339 self.attempt = Some(attempt);
340 self.status = status;
341 self.generation_fence.active_generation = Some(generation);
342 self.generation_fence.active_attempt = Some(attempt);
343 self.cancellation_token = Some(handle.cancellation_token);
344 self.abort_handle = Some(handle.abort_handle);
345 self.completion_receiver = Some(handle.completion_receiver);
346 self.heartbeat_receiver = Some(handle.heartbeat_receiver);
347 self.readiness_receiver = Some(handle.readiness_receiver);
348 self.last_exit = None;
349 self.last_ready_at = None;
350 self.last_heartbeat_at = None;
351 self.last_observed_readiness = ReadinessState::Unreported;
352 self.attempt_cancel_delivered = false;
353 self.abort_requested = false;
354 self.pending_restart = false;
355 self.stop_state = ChildStopState::Idle;
356 self.stop_deadline_at_unix_nanos = None;
357 self.last_control_failure = None;
358 self.stale_event_attempt = None;
359 self.registry_identity_anchor_for_spawn_attempt = None;
360 self.generation_fence.phase = GenerationFenceState::placeholder().phase;
361 }
362
363 pub fn deactivate(&mut self, exit_summary: ChildExitSummary) {
376 self.last_exit = Some(exit_summary);
377 self.restart_count = self.restart_count.saturating_add(1);
378 self.generation = None;
379 self.attempt = None;
380 self.status = ChildAttemptStatus::Stopped;
381 self.cancellation_token = None;
382 self.abort_handle = None;
383 self.completion_receiver = None;
384 self.heartbeat_receiver = None;
385 self.readiness_receiver = None;
386 self.last_ready_at = None;
387 self.last_heartbeat_at = None;
388 self.attempt_cancel_delivered = false;
389 self.abort_requested = false;
390 self.pending_restart = false;
391 self.generation_fence.active_generation = None;
392 self.generation_fence.active_attempt = None;
393 self.stop_state = ChildStopState::NoActiveAttempt;
394 self.stop_deadline_at_unix_nanos = None;
395 self.stale_event_attempt = None;
396 self.registry_identity_anchor_for_spawn_attempt = None;
397 }
398
399 pub fn clear_instance(&mut self) {
401 self.generation = None;
402 self.attempt = None;
403 self.status = ChildAttemptStatus::Stopped;
404 self.generation_fence.active_generation = None;
405 self.generation_fence.active_attempt = None;
406 self.cancellation_token = None;
407 self.abort_handle = None;
408 self.completion_receiver = None;
409 self.heartbeat_receiver = None;
410 self.readiness_receiver = None;
411 self.attempt_cancel_delivered = false;
412 self.abort_requested = false;
413 self.stop_deadline_at_unix_nanos = None;
414 self.stale_event_attempt = None;
415 self.registry_identity_anchor_for_spawn_attempt = None;
416 self.stop_state = ChildStopState::NoActiveAttempt;
417 }
418
419 pub fn has_active_attempt(&self) -> bool {
429 self.attempt.is_some() && self.cancellation_token.is_some()
430 }
431
432 pub fn cancel(&mut self) -> bool {
442 let Some(token) = &self.cancellation_token else {
443 return false;
444 };
445 if self.attempt_cancel_delivered {
446 return false;
447 }
448 token.cancel();
449 self.attempt_cancel_delivered = true;
450 self.status = ChildAttemptStatus::Cancelling;
451 true
452 }
453
454 pub fn abort(&mut self) -> bool {
464 let Some(handle) = &self.abort_handle else {
465 return false;
466 };
467 if self.abort_requested {
468 return false;
469 }
470 handle.abort();
471 self.abort_requested = true;
472 true
473 }
474
475 pub async fn wait_for_report(&mut self) -> Result<ChildRunReport, SupervisorError> {
485 let Some(receiver) = &mut self.completion_receiver else {
486 return Err(SupervisorError::InvalidTransition {
487 message: "child slot has no active completion receiver".to_owned(),
488 });
489 };
490 wait_for_report(receiver).await
491 }
492
493 pub fn observe_liveness(&mut self, now_unix_nanos: u128) -> ChildLivenessState {
503 if let Some(receiver) = &self.heartbeat_receiver {
504 let heartbeat = *receiver.borrow();
505 if heartbeat.is_some() {
506 self.last_heartbeat_at = Some(now_unix_nanos);
507 }
508 }
509 let readiness = if let Some(receiver) = &self.readiness_receiver {
510 let r = *receiver.borrow();
511 if r == ReadinessState::Ready {
512 self.last_ready_at = Some(now_unix_nanos);
513 }
514 r
515 } else {
516 ReadinessState::Unreported
517 };
518 let heartbeat_stale = self.last_heartbeat_at.is_some_and(|heartbeat| {
519 let elapsed_nanos = now_unix_nanos.saturating_sub(heartbeat);
520 elapsed_nanos >= Duration::from_secs(DEFAULT_HEARTBEAT_TIMEOUT_SECS).as_nanos()
521 });
522 ChildLivenessState::new(self.last_heartbeat_at, heartbeat_stale, readiness)
523 }
524
525 pub fn update_restart_limit(
539 &mut self,
540 window: Duration,
541 limit: u32,
542 used: u32,
543 time_base: &RuntimeTimeBase,
544 ) -> RestartLimitState {
545 let mut updated_at = time_base.now_unix_nanos();
546 if updated_at <= self.restart_limit.updated_at_unix_nanos {
547 updated_at = self.restart_limit.updated_at_unix_nanos.saturating_add(1);
548 }
549 self.restart_limit = RestartLimitState {
550 window,
551 limit,
552 used,
553 remaining: limit.saturating_sub(used),
554 exhausted: used >= limit,
555 updated_at_unix_nanos: updated_at,
556 };
557 self.restart_limit.clone()
558 }
559
560 pub fn refresh_restart_limit(
562 &mut self,
563 window: Duration,
564 limit: u32,
565 count_failure: bool,
566 time_base: &RuntimeTimeBase,
567 ) -> RestartLimitState {
568 let now = time_base.now_unix_nanos();
569 let used = self
570 .restart_limit_tracker
571 .refresh(now, window, count_failure);
572 self.update_restart_limit(window, limit, used, time_base)
573 }
574
575 pub fn to_record(&self, liveness: ChildLivenessState) -> ChildRuntimeRecord {
577 let status = if self.attempt.is_some() {
579 Some(self.status)
580 } else {
581 None
582 };
583 ChildRuntimeRecord::new(
584 self.child_id.clone(),
585 self.path.clone(),
586 self.generation,
587 self.attempt,
588 status,
589 self.operation,
590 liveness,
591 self.restart_limit.clone(),
592 self.stop_state,
593 self.last_control_failure.clone(),
594 self.generation_fence.phase,
595 None, )
597 }
598}