Skip to main content

supervised/
readiness.rs

1//! Startup readiness primitives.
2use std::{
3    fmt,
4    sync::{
5        atomic::{AtomicBool, AtomicUsize, Ordering},
6        Arc,
7    },
8};
9
10use tokio::sync::watch;
11
/// Startup readiness of a running [`Supervisor`](crate::Supervisor), taken
/// across all of its services.
///
/// This is a narrower notion than liveness. The only question it answers is
/// whether every service registered with [`ReadinessMode::Explicit`] has
/// passed its startup gate at least once, which happens when the service
/// signals readiness or completes successfully.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SupervisorReadiness {
    /// One or more startup-gated services have not yet passed their startup
    /// gate.
    Pending,
    /// All startup-gated services have passed their startup gate.
    Ready,
}
26
/// How a single registration takes part in startup readiness.
///
/// [`ReadinessMode::Immediate`] is the default and keeps ordinary services
/// out of the readiness path altogether. Choose [`ReadinessMode::Explicit`]
/// when the supervisor should wait until the service has done its meaningful
/// startup work, which the service reports by calling
/// [`ReadySignal::mark_ready`] or by returning
/// [`ServiceOutcome::Completed`](crate::ServiceOutcome::Completed).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ReadinessMode {
    /// The service is treated as ready from the moment it is spawned.
    #[default]
    Immediate,
    /// The service has to call [`ReadySignal::mark_ready`] or complete
    /// successfully before it reaches any other terminal outcome.
    Explicit,
}
43
44/// Service-local handle used to declare startup readiness.
45///
46/// The handle is intentionally tiny and idempotent. It does not expose
47/// snapshots, failure marking, partitions, or policy decisions; those remain
48/// supervisor-owned concepts.
49#[derive(Clone)]
50pub struct ReadySignal {
51    inner: ReadySignalInner,
52}
53
54impl ReadySignal {
55    pub(crate) fn immediate() -> Self {
56        Self {
57            inner: ReadySignalInner::Immediate,
58        }
59    }
60
61    pub(crate) fn explicit(tracker: ReadinessTracker, index: usize) -> Self {
62        Self {
63            inner: ReadySignalInner::Explicit { tracker, index },
64        }
65    }
66
67    /// Marks this service ready if it participates in explicit startup
68    /// readiness. Calling this more than once is harmless.
69    pub fn mark_ready(&self) {
70        match &self.inner {
71            ReadySignalInner::Immediate => {},
72            ReadySignalInner::Explicit { tracker, index } => {
73                tracker.mark_ready(*index);
74            },
75        }
76    }
77}
78
79impl fmt::Debug for ReadySignal {
80    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
81        match &self.inner {
82            ReadySignalInner::Immediate => formatter.write_str("ReadySignal::Immediate"),
83            ReadySignalInner::Explicit { index, .. } => formatter
84                .debug_struct("ReadySignal::Explicit")
85                .field("index", index)
86                .finish_non_exhaustive(),
87        }
88    }
89}
90
91#[derive(Clone)]
92enum ReadySignalInner {
93    Immediate,
94    Explicit {
95        tracker: ReadinessTracker,
96        index: usize,
97    },
98}
99
100#[derive(Clone)]
101pub(crate) struct ReadinessTracker {
102    inner: Arc<ReadinessInner>,
103}
104
105impl ReadinessTracker {
106    pub(crate) fn new(modes: impl IntoIterator<Item = ReadinessMode>) -> Self {
107        let mut pending = 0;
108        let members = modes
109            .into_iter()
110            .map(|mode| match mode {
111                ReadinessMode::Immediate => MemberReadiness::Immediate,
112                ReadinessMode::Explicit => {
113                    pending += 1;
114                    MemberReadiness::Explicit {
115                        ready: AtomicBool::new(false),
116                    }
117                },
118            })
119            .collect::<Vec<_>>();
120        let state = readiness_state(pending);
121        let (sender, _) = watch::channel(state);
122
123        Self {
124            inner: Arc::new(ReadinessInner {
125                members,
126                pending: AtomicUsize::new(pending),
127                sender,
128            }),
129        }
130    }
131
132    pub(crate) fn signal(&self, index: usize, mode: ReadinessMode) -> ReadySignal {
133        match mode {
134            ReadinessMode::Immediate => ReadySignal::immediate(),
135            ReadinessMode::Explicit => ReadySignal::explicit(self.clone(), index),
136        }
137    }
138
139    pub(crate) fn subscribe(&self) -> watch::Receiver<SupervisorReadiness> {
140        self.inner.sender.subscribe()
141    }
142
143    pub(crate) fn state(&self) -> SupervisorReadiness {
144        readiness_state(self.inner.pending.load(Ordering::Acquire))
145    }
146
147    pub(crate) fn is_ready(&self, index: usize) -> bool {
148        self.inner
149            .members
150            .get(index)
151            .is_some_and(MemberReadiness::is_ready)
152    }
153
154    pub(crate) fn mark_ready(&self, index: usize) {
155        let Some(member) = self.inner.members.get(index) else {
156            return;
157        };
158
159        if member.mark_ready() {
160            let previous = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
161            if previous == 1 {
162                let _ = self.inner.sender.send(SupervisorReadiness::Ready);
163            }
164        }
165    }
166}
167
168struct ReadinessInner {
169    members: Vec<MemberReadiness>,
170    pending: AtomicUsize,
171    sender: watch::Sender<SupervisorReadiness>,
172}
173
/// Readiness slot for one registered service.
enum MemberReadiness {
    /// Always reported as ready; can never be marked.
    Immediate,
    /// Ready once `ready` has been flipped to `true`.
    Explicit { ready: AtomicBool },
}

impl MemberReadiness {
    /// Reports whether this member currently counts as ready.
    pub(crate) fn is_ready(&self) -> bool {
        match self {
            Self::Explicit { ready } => ready.load(Ordering::Acquire),
            Self::Immediate => true,
        }
    }

    /// Flips an explicit member from not-ready to ready.
    ///
    /// Returns `true` only for the call that performs the flip. Immediate
    /// members and explicit members that were already ready return `false`.
    pub(crate) fn mark_ready(&self) -> bool {
        let Self::Explicit { ready } = self else {
            return false;
        };

        // Exactly one caller can win the false -> true exchange.
        let flipped = ready.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire);
        flipped.is_ok()
    }
}
196
197fn readiness_state(pending: usize) -> SupervisorReadiness {
198    if pending == 0 {
199        SupervisorReadiness::Ready
200    } else {
201        SupervisorReadiness::Pending
202    }
203}