Skip to main content

autumn_web/
probe.rs

1//! Liveness, readiness, and startup probes.
2//!
3//! Autumn exposes explicit cloud-native probe contracts:
4//! - liveness ignores startup and dependency state
5//! - readiness reflects startup completion, shutdown draining, and core dependencies
6//! - startup stays unavailable until startup hooks complete
7
8use std::sync::Arc;
9#[cfg(feature = "db")]
10use std::sync::RwLock;
11use std::sync::atomic::{AtomicBool, Ordering};
12
13use crate::extract::State;
14use axum::Json;
15use axum::http::StatusCode;
16use axum::response::IntoResponse;
17use serde::Serialize;
18
19/// Trait to abstract the state requirements for probe handlers.
20///
21/// Implement this trait on your application's state type to provide
22/// the necessary dependencies for health/liveness probes.
23/// This prevents tight coupling between probe handlers and the specific `AppState`.
24pub trait ProvideProbeState {
25    /// Returns a reference to the shared [`ProbeState`] that tracks
26    /// lifecycle phases (startup, ready, draining).
27    fn probes(&self) -> &ProbeState;
28
29    /// Returns whether detailed health information (e.g., uptime, pool stats)
30    /// should be included in the response.
31    fn health_detailed(&self) -> bool;
32
33    /// Returns the currently active execution profile (e.g. "dev", "prod").
34    fn profile(&self) -> &str;
35
36    /// Returns a human-readable string displaying how long the application
37    /// has been running (e.g., "2d 4h 13m").
38    fn uptime_display(&self) -> String;
39
40    /// Returns an optional reference to the database connection pool,
41    /// used to evaluate database connectivity during a readiness check.
42    #[cfg(feature = "db")]
43    fn pool(
44        &self,
45    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>;
46
47    /// Returns an optional read-replica pool for readiness checks.
48    #[cfg(feature = "db")]
49    fn replica_pool(
50        &self,
51    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
52    {
53        None
54    }
55
56    /// Helper method to mark the application startup as complete.
57    ///
58    /// Delegates to [`ProbeState::mark_startup_complete`].
59    ///
60    /// # Examples
61    ///
62    /// ```
63    /// use autumn_web::probe::{ProvideProbeState, ProbeState};
64    ///
65    /// struct MyState { probes: ProbeState }
66    /// impl ProvideProbeState for MyState {
67    ///     fn probes(&self) -> &ProbeState { &self.probes }
68    ///     fn health_detailed(&self) -> bool { false }
69    ///     fn profile(&self) -> &str { "dev" }
70    ///     fn uptime_display(&self) -> String { String::new() }
71    ///     #[cfg(feature = "db")]
72    ///     fn pool(&self) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>> { None }
73    /// }
74    ///
75    /// let state = MyState { probes: ProbeState::pending_startup() };
76    /// assert!(!state.probes().is_startup_complete());
77    /// state.mark_startup_complete();
78    /// assert!(state.probes().is_startup_complete());
79    /// ```
80    fn mark_startup_complete(&self) {
81        self.probes().mark_startup_complete();
82    }
83}
84
85/// Shared probe lifecycle state stored in `AppState`.
86#[derive(Clone, Debug, Default)]
87pub struct ProbeState {
88    startup_complete: Arc<AtomicBool>,
89    shutting_down: Arc<AtomicBool>,
90    #[cfg(feature = "db")]
91    replica_dependency: Arc<RwLock<ReplicaDependency>>,
92}
93
94#[cfg(feature = "db")]
95#[derive(Clone, Debug, PartialEq, Eq)]
96pub(crate) struct ReplicaMigrationCheck {
97    pub(crate) primary_url: String,
98    pub(crate) replica_url: String,
99}
100
101#[cfg(feature = "db")]
102#[derive(Clone, Debug, PartialEq, Eq)]
103struct ReplicaDependency {
104    configured: bool,
105    fallback: crate::config::ReplicaFallback,
106    connection_ready: bool,
107    migrations_ready: bool,
108    migration_check: Option<ReplicaMigrationCheck>,
109    detail: Option<String>,
110}
111
112#[cfg(feature = "db")]
113impl Default for ReplicaDependency {
114    fn default() -> Self {
115        Self {
116            configured: false,
117            fallback: crate::config::ReplicaFallback::default(),
118            connection_ready: true,
119            migrations_ready: true,
120            migration_check: None,
121            detail: None,
122        }
123    }
124}
125
126impl ProbeState {
127    /// Create a probe state that starts in pending-startup mode.
128    #[must_use]
129    pub fn pending_startup() -> Self {
130        Self::default()
131    }
132
133    /// Alias for pending startup used by application bootstrapping.
134    #[must_use]
135    pub fn starting() -> Self {
136        Self::pending_startup()
137    }
138
139    /// Create a probe state that is immediately ready.
140    #[must_use]
141    pub fn ready_for_test() -> Self {
142        let state = Self::pending_startup();
143        state.mark_startup_complete();
144        state
145    }
146
147    /// Mark startup as complete and readiness eligible.
148    pub fn mark_startup_complete(&self) {
149        self.startup_complete.store(true, Ordering::Relaxed);
150    }
151
152    /// Override startup completion for tests.
153    pub fn set_startup_complete(&self, complete: bool) {
154        self.startup_complete.store(complete, Ordering::Relaxed);
155    }
156
157    /// Mark the application as shutting down so readiness flips false.
158    pub fn begin_shutdown(&self) {
159        self.shutting_down.store(true, Ordering::Relaxed);
160    }
161
162    /// Alias for readiness drain used during graceful shutdown.
163    pub fn begin_draining(&self) {
164        self.begin_shutdown();
165    }
166
167    /// Override shutdown-draining state for tests.
168    pub fn set_draining(&self, draining: bool) {
169        self.shutting_down.store(draining, Ordering::Relaxed);
170    }
171
172    /// Configure runtime readiness behavior for a read replica.
173    ///
174    /// # Panics
175    ///
176    /// Panics if the replica dependency lock is poisoned.
177    #[cfg(feature = "db")]
178    pub fn configure_replica_dependency(&self, fallback: crate::config::ReplicaFallback) {
179        let mut dependency = self
180            .replica_dependency
181            .write()
182            .expect("replica dependency lock poisoned");
183        *dependency = ReplicaDependency {
184            configured: true,
185            fallback,
186            connection_ready: false,
187            migrations_ready: true,
188            migration_check: None,
189            detail: Some("replica has not passed a readiness check".to_owned()),
190        };
191    }
192
193    /// Store URLs needed to retry replica migration readiness checks.
194    #[cfg(feature = "db")]
195    pub(crate) fn configure_replica_migration_check(
196        &self,
197        primary_url: impl Into<String>,
198        replica_url: impl Into<String>,
199    ) {
200        let mut dependency = self
201            .replica_dependency
202            .write()
203            .expect("replica dependency lock poisoned");
204        dependency.migration_check = Some(ReplicaMigrationCheck {
205            primary_url: primary_url.into(),
206            replica_url: replica_url.into(),
207        });
208    }
209
210    /// Mark the configured read replica as reachable.
211    ///
212    /// # Panics
213    ///
214    /// Panics if the replica dependency lock is poisoned.
215    #[cfg(feature = "db")]
216    pub fn mark_replica_connection_ready(&self) {
217        let mut dependency = self
218            .replica_dependency
219            .write()
220            .expect("replica dependency lock poisoned");
221        dependency.connection_ready = true;
222        if dependency.migrations_ready {
223            dependency.detail = None;
224        }
225    }
226
227    /// Mark the configured read replica as unreachable.
228    ///
229    /// # Panics
230    ///
231    /// Panics if the replica dependency lock is poisoned.
232    #[cfg(feature = "db")]
233    pub fn mark_replica_connection_unready(&self, detail: impl Into<String>) {
234        let mut dependency = self
235            .replica_dependency
236            .write()
237            .expect("replica dependency lock poisoned");
238        dependency.connection_ready = false;
239        dependency.detail = Some(detail.into());
240    }
241
242    /// Mark the configured read replica's migration state as current.
243    ///
244    /// # Panics
245    ///
246    /// Panics if the replica dependency lock is poisoned.
247    #[cfg(feature = "db")]
248    pub fn mark_replica_migrations_ready(&self) {
249        let mut dependency = self
250            .replica_dependency
251            .write()
252            .expect("replica dependency lock poisoned");
253        dependency.migrations_ready = true;
254        if dependency.connection_ready {
255            dependency.detail = None;
256        }
257    }
258
259    /// Mark the configured read replica's migration state as stale.
260    #[cfg(feature = "db")]
261    pub(crate) fn mark_replica_migrations_unready(&self, detail: impl Into<String>) {
262        let mut dependency = self
263            .replica_dependency
264            .write()
265            .expect("replica dependency lock poisoned");
266        dependency.migrations_ready = false;
267        dependency.detail = Some(detail.into());
268    }
269
270    /// Mark the configured read replica as ready.
271    ///
272    /// # Panics
273    ///
274    /// Panics if the replica dependency lock is poisoned.
275    #[cfg(feature = "db")]
276    pub fn mark_replica_ready(&self) {
277        let mut dependency = self
278            .replica_dependency
279            .write()
280            .expect("replica dependency lock poisoned");
281        dependency.connection_ready = true;
282        dependency.migrations_ready = true;
283        dependency.detail = None;
284    }
285
286    /// Mark the configured read replica as unavailable or stale.
287    ///
288    /// # Panics
289    ///
290    /// Panics if the replica dependency lock is poisoned.
291    #[cfg(feature = "db")]
292    pub fn mark_replica_unready(&self, detail: impl Into<String>) {
293        let mut dependency = self
294            .replica_dependency
295            .write()
296            .expect("replica dependency lock poisoned");
297        dependency.connection_ready = false;
298        dependency.migrations_ready = false;
299        dependency.detail = Some(detail.into());
300    }
301
302    /// Returns whether startup completed successfully.
303    #[must_use]
304    pub fn is_startup_complete(&self) -> bool {
305        self.startup_complete.load(Ordering::Relaxed)
306    }
307
308    /// Returns whether graceful shutdown has started.
309    #[must_use]
310    pub fn is_shutting_down(&self) -> bool {
311        self.shutting_down.load(Ordering::Relaxed)
312    }
313
314    /// Returns whether readiness is currently draining.
315    #[must_use]
316    pub fn draining(&self) -> bool {
317        self.is_shutting_down()
318    }
319
320    #[cfg(feature = "db")]
321    pub(crate) fn replica_allows_readiness(&self) -> bool {
322        let dependency = self
323            .replica_dependency
324            .read()
325            .expect("replica dependency lock poisoned");
326        let ready = dependency.connection_ready && dependency.migrations_ready;
327        !dependency.configured
328            || ready
329            || matches!(dependency.fallback, crate::config::ReplicaFallback::Primary)
330    }
331
332    #[cfg(feature = "db")]
333    pub(crate) fn should_route_reads_to_replica(&self) -> bool {
334        let dependency = self
335            .replica_dependency
336            .read()
337            .expect("replica dependency lock poisoned");
338        !dependency.configured || (dependency.connection_ready && dependency.migrations_ready)
339    }
340
341    #[cfg(feature = "db")]
342    pub(crate) fn should_fallback_reads_to_primary(&self) -> bool {
343        let dependency = self
344            .replica_dependency
345            .read()
346            .expect("replica dependency lock poisoned");
347        dependency.configured
348            && !(dependency.connection_ready && dependency.migrations_ready)
349            && matches!(dependency.fallback, crate::config::ReplicaFallback::Primary)
350    }
351
352    #[cfg(feature = "db")]
353    pub(crate) fn replica_configured(&self) -> bool {
354        self.replica_dependency
355            .read()
356            .expect("replica dependency lock poisoned")
357            .configured
358    }
359
360    #[cfg(feature = "db")]
361    pub(crate) fn replica_migration_check(&self) -> Option<ReplicaMigrationCheck> {
362        self.replica_dependency
363            .read()
364            .expect("replica dependency lock poisoned")
365            .migration_check
366            .clone()
367    }
368}
369
370#[derive(Clone, Copy)]
371enum ProbeKind {
372    Live,
373    Ready,
374    Startup,
375}
376
377#[derive(Serialize)]
378pub(crate) struct ProbeResponse {
379    status: &'static str,
380    #[serde(skip_serializing_if = "Option::is_none")]
381    version: Option<&'static str>,
382    #[serde(skip_serializing_if = "Option::is_none")]
383    profile: Option<String>,
384    #[serde(skip_serializing_if = "Option::is_none")]
385    uptime: Option<String>,
386    #[serde(skip_serializing_if = "Option::is_none")]
387    pool: Option<PoolStatus>,
388}
389
390#[derive(Serialize)]
391pub(crate) struct PoolStatus {
392    size: u64,
393    available: u64,
394    waiting: u64,
395}
396
397#[allow(clippy::missing_const_for_fn, unused_variables)]
398fn dependency_readiness<S: ProvideProbeState>(state: &S) -> (bool, Option<PoolStatus>) {
399    #[cfg(feature = "db")]
400    {
401        let replica_ready_for_policy = state.probes().replica_allows_readiness();
402        let (pool_ready, pool_status) = state.pool().map_or((true, None), |pool| {
403            let status = pool.status();
404            let available = status.available as u64;
405            let size = status.max_size as u64;
406            let waiting = status.waiting as u64;
407
408            (
409                available > 0 || waiting == 0,
410                Some(PoolStatus {
411                    size,
412                    available,
413                    waiting,
414                }),
415            )
416        });
417
418        (pool_ready && replica_ready_for_policy, pool_status)
419    }
420
421    #[cfg(not(feature = "db"))]
422    {
423        (true, None)
424    }
425}
426
427#[cfg(feature = "db")]
428async fn refresh_replica_readiness<S: ProvideProbeState + Sync>(state: &S) {
429    if !state.probes().replica_configured() {
430        return;
431    }
432
433    let Some(replica_pool) = state.replica_pool() else {
434        state
435            .probes()
436            .mark_replica_connection_unready("replica pool is not available");
437        return;
438    };
439
440    match replica_pool.get().await {
441        Ok(conn) => {
442            drop(conn);
443            state.probes().mark_replica_connection_ready();
444            refresh_replica_migration_readiness(state).await;
445        }
446        Err(error) => state
447            .probes()
448            .mark_replica_connection_unready(format!("replica connection failed: {error}")),
449    }
450}
451
452#[cfg(feature = "db")]
453async fn refresh_replica_migration_readiness<S: ProvideProbeState + Sync>(state: &S) {
454    refresh_replica_migration_readiness_with(state, |check| {
455        crate::migrate::check_replica_migration_readiness_blocking(
456            check.primary_url,
457            check.replica_url,
458        )
459    })
460    .await;
461}
462
463#[cfg(feature = "db")]
464async fn refresh_replica_migration_readiness_with<S, F, Fut>(state: &S, check_readiness: F)
465where
466    S: ProvideProbeState + Sync,
467    F: FnOnce(ReplicaMigrationCheck) -> Fut,
468    Fut: std::future::Future<Output = crate::migrate::ReplicaMigrationReadiness>,
469{
470    let Some(check) = state.probes().replica_migration_check() else {
471        return;
472    };
473
474    let readiness = check_readiness(check).await;
475
476    if readiness.is_ready() {
477        state.probes().mark_replica_migrations_ready();
478    } else if let Some(detail) = readiness.detail() {
479        state.probes().mark_replica_migrations_unready(detail);
480    }
481}
482
483fn probe_response<S: ProvideProbeState>(
484    state: &S,
485    kind: ProbeKind,
486) -> (StatusCode, Json<ProbeResponse>) {
487    let startup_complete = state.probes().is_startup_complete();
488    let shutting_down = state.probes().is_shutting_down();
489    let (dependencies_ready, pool_status) = dependency_readiness(state);
490
491    let (status_code, status) = match kind {
492        ProbeKind::Live => (StatusCode::OK, "ok"),
493        ProbeKind::Startup if startup_complete => (StatusCode::OK, "ok"),
494        ProbeKind::Startup => (StatusCode::SERVICE_UNAVAILABLE, "starting"),
495        ProbeKind::Ready if startup_complete && !shutting_down && dependencies_ready => {
496            (StatusCode::OK, "ok")
497        }
498        ProbeKind::Ready => (StatusCode::SERVICE_UNAVAILABLE, "degraded"),
499    };
500
501    let detailed = state.health_detailed();
502    let body = ProbeResponse {
503        status,
504        version: if detailed {
505            Some(env!("CARGO_PKG_VERSION"))
506        } else {
507            None
508        },
509        profile: if detailed {
510            Some(state.profile().to_owned())
511        } else {
512            None
513        },
514        uptime: if detailed {
515            Some(state.uptime_display())
516        } else {
517            None
518        },
519        pool: if detailed { pool_status } else { None },
520    };
521
522    (status_code, Json(body))
523}
524
525/// `GET /live`
526pub async fn live_handler<S: ProvideProbeState + Send + Sync + 'static>(
527    State(state): State<S>,
528) -> impl IntoResponse {
529    probe_response(&state, ProbeKind::Live)
530}
531
532/// `GET /ready`
533pub async fn ready_handler<S: ProvideProbeState + Send + Sync + 'static>(
534    State(state): State<S>,
535) -> impl IntoResponse {
536    #[cfg(feature = "db")]
537    refresh_replica_readiness(&state).await;
538    probe_response(&state, ProbeKind::Ready)
539}
540
541/// `GET /startup`
542pub async fn startup_handler<S: ProvideProbeState + Send + Sync + 'static>(
543    State(state): State<S>,
544) -> impl IntoResponse {
545    probe_response(&state, ProbeKind::Startup)
546}
547
548/// Compatibility alias for the legacy `/health` endpoint.
549pub(crate) async fn readiness_response<S: ProvideProbeState + Sync>(
550    state: &S,
551) -> (StatusCode, Json<ProbeResponse>) {
552    #[cfg(feature = "db")]
553    refresh_replica_readiness(state).await;
554    probe_response(state, ProbeKind::Ready)
555}
556
557#[cfg(test)]
558mod tests {
559    use super::*;
560
561    struct TestProbeState {
562        probes: ProbeState,
563        health_detailed: bool,
564        profile: String,
565    }
566
567    impl ProvideProbeState for TestProbeState {
568        fn probes(&self) -> &ProbeState {
569            &self.probes
570        }
571
572        fn health_detailed(&self) -> bool {
573            self.health_detailed
574        }
575
576        fn profile(&self) -> &str {
577            &self.profile
578        }
579
580        fn uptime_display(&self) -> String {
581            "test uptime".to_string()
582        }
583
584        #[cfg(feature = "db")]
585        fn pool(
586            &self,
587        ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
588        {
589            None
590        }
591    }
592
593    impl TestProbeState {
594        fn new() -> Self {
595            Self {
596                probes: ProbeState::pending_startup(),
597                health_detailed: true,
598                profile: "test".to_string(),
599            }
600        }
601    }
602
603    #[test]
604    fn test_live_handler_returns_ok() {
605        let state = TestProbeState::new();
606        let (status, Json(response)) = probe_response(&state, ProbeKind::Live);
607        assert_eq!(status, StatusCode::OK);
608        assert_eq!(response.status, "ok");
609    }
610
611    #[tokio::test]
612    async fn test_startup_handler_pending() {
613        let state = TestProbeState::new();
614        let (status, Json(response)) = probe_response(&state, ProbeKind::Startup);
615        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
616        assert_eq!(response.status, "starting");
617    }
618
619    #[tokio::test]
620    async fn test_startup_handler_complete() {
621        let state = TestProbeState::new();
622        state.mark_startup_complete();
623        let (status, Json(response)) = probe_response(&state, ProbeKind::Startup);
624        assert_eq!(status, StatusCode::OK);
625        assert_eq!(response.status, "ok");
626    }
627
628    #[tokio::test]
629    async fn test_ready_handler_pending_startup() {
630        let state = TestProbeState::new();
631        let (status, Json(response)) = probe_response(&state, ProbeKind::Ready);
632        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
633        assert_eq!(response.status, "degraded");
634    }
635
636    #[tokio::test]
637    async fn test_ready_handler_complete_startup() {
638        let state = TestProbeState::new();
639        state.mark_startup_complete();
640        let (status, Json(response)) = probe_response(&state, ProbeKind::Ready);
641        assert_eq!(status, StatusCode::OK);
642        assert_eq!(response.status, "ok");
643    }
644
645    #[tokio::test]
646    async fn test_ready_handler_shutting_down() {
647        let state = TestProbeState::new();
648        state.mark_startup_complete();
649        state.probes().begin_shutdown();
650        let (status, Json(response)) = probe_response(&state, ProbeKind::Ready);
651        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
652        assert_eq!(response.status, "degraded");
653    }
654
655    #[cfg(feature = "db")]
656    #[tokio::test]
657    async fn ready_fails_when_replica_is_unready_and_policy_is_fail_readiness() {
658        let state = TestProbeState::new();
659        state.mark_startup_complete();
660        state
661            .probes()
662            .configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
663        state
664            .probes()
665            .mark_replica_unready("replica migrations lag primary");
666
667        let (status, Json(response)) = probe_response(&state, ProbeKind::Ready);
668
669        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
670        assert_eq!(response.status, "degraded");
671    }
672
673    #[cfg(feature = "db")]
674    #[tokio::test]
675    async fn ready_fails_when_replica_is_configured_but_not_checked() {
676        let state = TestProbeState::new();
677        state.mark_startup_complete();
678        state
679            .probes()
680            .configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
681
682        let (status, Json(response)) = readiness_response(&state).await;
683
684        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
685        assert_eq!(response.status, "degraded");
686    }
687
688    #[cfg(feature = "db")]
689    #[test]
690    fn replica_migration_lag_can_recover_without_resetting_connection_readiness() {
691        let probes = ProbeState::ready_for_test();
692        probes.configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
693        probes.mark_replica_connection_ready();
694        probes.mark_replica_migrations_unready("replica migrations lag primary");
695
696        assert!(!probes.replica_allows_readiness());
697        assert!(!probes.should_route_reads_to_replica());
698
699        probes.mark_replica_connection_ready();
700        assert!(!probes.replica_allows_readiness());
701        assert!(!probes.should_route_reads_to_replica());
702
703        probes.mark_replica_migrations_ready();
704        assert!(probes.replica_allows_readiness());
705        assert!(probes.should_route_reads_to_replica());
706    }
707
708    #[cfg(feature = "db")]
709    #[test]
710    fn replica_migration_retry_urls_are_stored_for_readiness_rechecks() {
711        let probes = ProbeState::ready_for_test();
712        probes.configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
713        probes.configure_replica_migration_check(
714            "postgres://localhost/primary",
715            "postgres://localhost/replica",
716        );
717
718        let check = probes
719            .replica_migration_check()
720            .expect("migration check should be configured");
721
722        assert_eq!(check.primary_url, "postgres://localhost/primary");
723        assert_eq!(check.replica_url, "postgres://localhost/replica");
724    }
725
726    #[cfg(feature = "db")]
727    #[tokio::test]
728    async fn replica_migration_readiness_rechecks_after_initial_ready_state() {
729        let state = TestProbeState::new();
730        state.mark_startup_complete();
731        state
732            .probes()
733            .configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
734        state.probes().configure_replica_migration_check(
735            "postgres://localhost/primary",
736            "postgres://localhost/replica",
737        );
738        state.probes().mark_replica_connection_ready();
739        state.probes().mark_replica_migrations_ready();
740
741        let checks = std::sync::atomic::AtomicUsize::new(0);
742        refresh_replica_migration_readiness_with(&state, |check| {
743            checks.fetch_add(1, Ordering::Relaxed);
744            assert_eq!(check.primary_url, "postgres://localhost/primary");
745            assert_eq!(check.replica_url, "postgres://localhost/replica");
746            std::future::ready(crate::migrate::ReplicaMigrationReadiness::Stale {
747                primary_latest: Some("20260511000000".to_owned()),
748                replica_latest: Some("20260510000000".to_owned()),
749            })
750        })
751        .await;
752
753        assert_eq!(checks.load(Ordering::Relaxed), 1);
754        assert!(!state.probes().replica_allows_readiness());
755        assert!(!state.probes().should_route_reads_to_replica());
756    }
757
758    #[cfg(feature = "db")]
759    #[tokio::test]
760    async fn ready_allows_primary_fallback_when_replica_is_unready() {
761        let state = TestProbeState::new();
762        state.mark_startup_complete();
763        state
764            .probes()
765            .configure_replica_dependency(crate::config::ReplicaFallback::Primary);
766        state
767            .probes()
768            .mark_replica_unready("replica migrations lag primary");
769
770        let (status, Json(response)) = probe_response(&state, ProbeKind::Ready);
771
772        assert_eq!(status, StatusCode::OK);
773        assert_eq!(response.status, "ok");
774    }
775
776    #[tokio::test]
777    async fn test_probe_state_set_draining() {
778        let state = ProbeState::starting();
779        assert!(!state.draining());
780        state.set_draining(true);
781        assert!(state.draining());
782    }
783
784    #[tokio::test]
785    async fn test_probe_state_set_startup_complete() {
786        let state = ProbeState::starting();
787        assert!(!state.is_startup_complete());
788        state.set_startup_complete(true);
789        assert!(state.is_startup_complete());
790    }
791
792    #[tokio::test]
793    async fn test_ready_for_test() {
794        let state = ProbeState::ready_for_test();
795        assert!(state.is_startup_complete());
796    }
797
798    #[tokio::test]
799    async fn test_health_detailed_false() {
800        let mut state = TestProbeState::new();
801        state.health_detailed = false;
802
803        let (_, Json(response)) = probe_response(&state, ProbeKind::Live);
804        assert!(response.version.is_none());
805        assert!(response.profile.is_none());
806        assert!(response.uptime.is_none());
807        assert!(response.pool.is_none());
808    }
809
810    #[tokio::test]
811    async fn test_begin_draining() {
812        let state = ProbeState::ready_for_test();
813        assert!(!state.draining());
814        state.begin_draining();
815        assert!(state.draining());
816    }
817}