Skip to main content

liminal_server/health/
checks.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3
/// Process liveness state returned by the liveness probe.
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum HealthState {
    /// The server process is alive.
    Healthy,
    /// The server process is not alive.
    Unhealthy,
}
13
14/// Result of the server liveness probe.
15#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
16pub struct HealthStatus {
17    /// Liveness status for the process.
18    pub status: HealthState,
19    /// Optional operator-facing liveness detail.
20    pub message: Option<String>,
21}
22
23impl HealthStatus {
24    /// Returns the healthy liveness status used while the process is running.
25    #[must_use]
26    pub const fn healthy() -> Self {
27        Self {
28            status: HealthState::Healthy,
29            message: None,
30        }
31    }
32
33    /// Returns an unhealthy liveness status with explanatory detail.
34    #[must_use]
35    pub fn unhealthy(message: impl Into<String>) -> Self {
36        Self {
37            status: HealthState::Unhealthy,
38            message: Some(message.into()),
39        }
40    }
41}
42
43/// Returns the server process liveness status.
44///
45/// This is deliberately a liveness probe, not a readiness probe: if the process
46/// can call this function, the process is alive and the result is healthy.
47#[must_use]
48pub const fn health_check() -> HealthStatus {
49    HealthStatus::healthy()
50}
51
/// Cluster readiness requirement for a startup snapshot.
///
/// The derived default is [`ClusterReadiness::NotConfigured`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ClusterReadiness {
    /// No cluster configuration is present, so membership is not required.
    #[default]
    NotConfigured,
    /// Cluster configuration is present and membership must be established.
    Configured {
        /// Whether beamr distribution membership has been established.
        membership_established: bool,
    },
}
64
65/// Startup state evaluated by the readiness probe.
66#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
67pub struct ReadinessState {
68    /// Whether configuration has loaded, environment overrides applied, and
69    /// validation completed successfully.
70    pub config_loaded: bool,
71    /// Whether the main wire protocol listener is bound and accepting traffic.
72    pub listener_bound: bool,
73    /// Conditional cluster startup state.
74    pub cluster: ClusterReadiness,
75}
76
77impl ReadinessState {
78    /// Creates a startup readiness snapshot.
79    #[must_use]
80    pub const fn new(config_loaded: bool, listener_bound: bool, cluster: ClusterReadiness) -> Self {
81        Self {
82            config_loaded,
83            listener_bound,
84            cluster,
85        }
86    }
87
88    /// Creates a fully ready snapshot for a non-clustered server.
89    #[must_use]
90    pub const fn ready_without_cluster() -> Self {
91        Self::new(true, true, ClusterReadiness::NotConfigured)
92    }
93
94    /// Creates a fully ready snapshot for a clustered server.
95    #[must_use]
96    pub const fn ready_with_cluster() -> Self {
97        Self::new(
98            true,
99            true,
100            ClusterReadiness::Configured {
101                membership_established: true,
102            },
103        )
104    }
105}
106
/// Readiness conditions that can prevent a server from receiving traffic.
///
/// Each variant names a startup gate; it appears in
/// `ReadinessStatus::unmet_conditions` when that gate is not yet satisfied.
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReadinessCondition {
    /// Configuration has not completed loading and validation.
    ConfigLoaded,
    /// The main wire protocol listener is not bound and accepting traffic.
    ListenerBound,
    /// Cluster configuration is present but membership is not established.
    ClusterMembershipEstablished,
}
118
119/// Result of the server readiness probe.
120#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
121pub struct ReadinessStatus {
122    /// True only when all applicable startup gates are satisfied.
123    pub ready: bool,
124    /// Startup gates that are not yet satisfied, in stable evaluation order.
125    pub unmet_conditions: Vec<ReadinessCondition>,
126}
127
128impl ReadinessStatus {
129    /// Creates a readiness status from unmet startup gates.
130    #[must_use]
131    pub fn from_unmet_conditions(unmet_conditions: Vec<ReadinessCondition>) -> Self {
132        Self {
133            ready: unmet_conditions.is_empty(),
134            unmet_conditions,
135        }
136    }
137}
138
139/// Thread-safe readiness state shared with the HTTP endpoint server.
140#[derive(Debug, Clone)]
141pub struct SharedReadinessState {
142    inner: Arc<ReadinessFlags>,
143}
144
145impl SharedReadinessState {
146    /// Creates a shared readiness state from an initial startup snapshot.
147    #[must_use]
148    pub fn new(initial: ReadinessState) -> Self {
149        Self {
150            inner: Arc::new(ReadinessFlags::from_state(initial)),
151        }
152    }
153
154    /// Returns a consistent snapshot of the current readiness flags.
155    #[must_use]
156    pub fn snapshot(&self) -> ReadinessState {
157        let cluster = if self.inner.cluster_configured.load(Ordering::SeqCst) {
158            ClusterReadiness::Configured {
159                membership_established: self
160                    .inner
161                    .cluster_membership_established
162                    .load(Ordering::SeqCst),
163            }
164        } else {
165            ClusterReadiness::NotConfigured
166        };
167
168        ReadinessState::new(
169            self.inner.config_loaded.load(Ordering::SeqCst),
170            self.inner.listener_bound.load(Ordering::SeqCst),
171            cluster,
172        )
173    }
174
175    /// Updates whether configuration loading and validation completed.
176    pub fn set_config_loaded(&self, loaded: bool) {
177        self.inner.config_loaded.store(loaded, Ordering::SeqCst);
178    }
179
180    /// Updates whether the main wire protocol listener is bound.
181    pub fn set_listener_bound(&self, bound: bool) {
182        self.inner.listener_bound.store(bound, Ordering::SeqCst);
183    }
184
185    /// Updates whether cluster configuration is present.
186    pub fn set_cluster_configured(&self, configured: bool) {
187        self.inner
188            .cluster_configured
189            .store(configured, Ordering::SeqCst);
190        if !configured {
191            self.set_cluster_membership_established(false);
192        }
193    }
194
195    /// Updates whether clustered membership is established.
196    pub fn set_cluster_membership_established(&self, established: bool) {
197        self.inner
198            .cluster_membership_established
199            .store(established, Ordering::SeqCst);
200    }
201}
202
203impl Default for SharedReadinessState {
204    fn default() -> Self {
205        Self::new(ReadinessState::default())
206    }
207}
208
209#[derive(Debug)]
210struct ReadinessFlags {
211    config_loaded: AtomicBool,
212    listener_bound: AtomicBool,
213    cluster_configured: AtomicBool,
214    cluster_membership_established: AtomicBool,
215}
216
217impl ReadinessFlags {
218    const fn from_state(state: ReadinessState) -> Self {
219        let (cluster_configured, cluster_membership_established) = match state.cluster {
220            ClusterReadiness::NotConfigured => (false, false),
221            ClusterReadiness::Configured {
222                membership_established,
223            } => (true, membership_established),
224        };
225
226        Self {
227            config_loaded: AtomicBool::new(state.config_loaded),
228            listener_bound: AtomicBool::new(state.listener_bound),
229            cluster_configured: AtomicBool::new(cluster_configured),
230            cluster_membership_established: AtomicBool::new(cluster_membership_established),
231        }
232    }
233}
234
235/// Evaluates whether the server has completed every applicable startup gate.
236#[must_use]
237pub fn readiness_check(state: &ReadinessState) -> ReadinessStatus {
238    let mut unmet_conditions = Vec::new();
239
240    if !state.config_loaded {
241        unmet_conditions.push(ReadinessCondition::ConfigLoaded);
242    }
243
244    if !state.listener_bound {
245        unmet_conditions.push(ReadinessCondition::ListenerBound);
246    }
247
248    if state.cluster
249        == (ClusterReadiness::Configured {
250            membership_established: false,
251        })
252    {
253        unmet_conditions.push(ReadinessCondition::ClusterMembershipEstablished);
254    }
255
256    ReadinessStatus::from_unmet_conditions(unmet_conditions)
257}
258
#[cfg(test)]
mod tests {
    use super::{
        ClusterReadiness, HealthState, ReadinessCondition, ReadinessState, SharedReadinessState,
        health_check, readiness_check,
    };

    /// The liveness probe always answers healthy with no detail message.
    #[test]
    fn health_check_is_always_healthy_liveness() {
        let liveness = health_check();

        assert_eq!(liveness.status, HealthState::Healthy);
        assert_eq!(liveness.message, None);
    }

    /// Only the configuration gate is reported when config is not loaded.
    #[test]
    fn readiness_reports_missing_config() {
        let result = readiness_check(&ReadinessState::new(
            false,
            true,
            ClusterReadiness::NotConfigured,
        ));

        assert!(!result.ready);
        assert_eq!(result.unmet_conditions, [ReadinessCondition::ConfigLoaded]);
    }

    /// Only the listener gate is reported when the listener is not bound.
    #[test]
    fn readiness_reports_missing_listener() {
        let result = readiness_check(&ReadinessState::new(
            true,
            false,
            ClusterReadiness::NotConfigured,
        ));

        assert!(!result.ready);
        assert_eq!(result.unmet_conditions, [ReadinessCondition::ListenerBound]);
    }

    /// A configured cluster without membership blocks readiness.
    #[test]
    fn readiness_requires_cluster_membership_when_configured() {
        let awaiting_membership = ClusterReadiness::Configured {
            membership_established: false,
        };

        let result = readiness_check(&ReadinessState::new(true, true, awaiting_membership));

        assert!(!result.ready);
        assert_eq!(
            result.unmet_conditions,
            [ReadinessCondition::ClusterMembershipEstablished]
        );
    }

    /// Without cluster configuration the membership gate does not apply.
    #[test]
    fn readiness_ignores_cluster_membership_when_not_configured() {
        let result = readiness_check(&ReadinessState::ready_without_cluster());

        assert!(result.ready);
        assert!(result.unmet_conditions.is_empty());
    }

    /// A clustered server with every gate satisfied is ready.
    #[test]
    fn readiness_is_ready_only_when_all_applicable_conditions_are_met() {
        let result = readiness_check(&ReadinessState::ready_with_cluster());

        assert!(result.ready);
        assert!(result.unmet_conditions.is_empty());
    }

    /// Updates made through the setters are visible in later snapshots.
    #[test]
    fn shared_readiness_state_snapshots_updates() {
        let handle = SharedReadinessState::default();

        handle.set_config_loaded(true);
        handle.set_listener_bound(true);
        handle.set_cluster_configured(true);
        handle.set_cluster_membership_established(true);

        let expected = ReadinessState::ready_with_cluster();
        assert_eq!(handle.snapshot(), expected);
    }
}