Skip to main content

autumn_web/
probe.rs

1//! Liveness, readiness, and startup probes.
2//!
3//! Autumn exposes explicit cloud-native probe contracts:
4//! - liveness ignores startup and dependency state
5//! - readiness reflects startup completion, shutdown draining, and core dependencies
6//! - startup stays unavailable until startup hooks complete
7
8use std::sync::Arc;
9#[cfg(feature = "db")]
10use std::sync::RwLock;
11use std::sync::atomic::{AtomicBool, Ordering};
12
13use crate::extract::State;
14use axum::Json;
15use axum::http::StatusCode;
16use axum::response::IntoResponse;
17use serde::Serialize;
18
19/// Trait to abstract the state requirements for probe handlers.
20///
21/// Implement this trait on your application's state type to provide
22/// the necessary dependencies for health/liveness probes.
23/// This prevents tight coupling between probe handlers and the specific `AppState`.
24pub trait ProvideProbeState {
25    /// Returns a reference to the shared [`ProbeState`] that tracks
26    /// lifecycle phases (startup, ready, draining).
27    fn probes(&self) -> &ProbeState;
28
29    /// Returns whether detailed health information (e.g., uptime, pool stats)
30    /// should be included in the response.
31    fn health_detailed(&self) -> bool;
32
33    /// Returns the currently active execution profile (e.g. "dev", "prod").
34    fn profile(&self) -> &str;
35
36    /// Returns a human-readable string displaying how long the application
37    /// has been running (e.g., "2d 4h 13m").
38    fn uptime_display(&self) -> String;
39
40    /// Returns an optional reference to the database connection pool,
41    /// used to evaluate database connectivity during a readiness check.
42    #[cfg(feature = "db")]
43    fn pool(
44        &self,
45    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>;
46
47    /// Returns an optional read-replica pool for readiness checks.
48    #[cfg(feature = "db")]
49    fn replica_pool(
50        &self,
51    ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
52    {
53        None
54    }
55
56    /// Returns the registry of [`crate::actuator::HealthIndicator`] implementations.
57    ///
58    /// The default returns `None`. [`crate::AppState`] overrides this to return
59    /// its registry so that `/ready` can run readiness-group indicators.
60    fn health_indicator_registry(&self) -> Option<&crate::actuator::HealthIndicatorRegistry> {
61        None
62    }
63
64    /// Helper method to mark the application startup as complete.
65    ///
66    /// Delegates to [`ProbeState::mark_startup_complete`].
67    ///
68    /// # Examples
69    ///
70    /// ```
71    /// use autumn_web::probe::{ProvideProbeState, ProbeState};
72    ///
73    /// struct MyState { probes: ProbeState }
74    /// impl ProvideProbeState for MyState {
75    ///     fn probes(&self) -> &ProbeState { &self.probes }
76    ///     fn health_detailed(&self) -> bool { false }
77    ///     fn profile(&self) -> &str { "dev" }
78    ///     fn uptime_display(&self) -> String { String::new() }
79    ///     #[cfg(feature = "db")]
80    ///     fn pool(&self) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>> { None }
81    /// }
82    ///
83    /// let state = MyState { probes: ProbeState::pending_startup() };
84    /// assert!(!state.probes().is_startup_complete());
85    /// state.mark_startup_complete();
86    /// assert!(state.probes().is_startup_complete());
87    /// ```
88    fn mark_startup_complete(&self) {
89        self.probes().mark_startup_complete();
90    }
91}
92
/// Lifecycle flags shared by every clone of the application state.
///
/// Each field sits behind an [`Arc`], so all clones observe and publish the
/// same state. The derived `Default` starts with startup incomplete and
/// shutdown not yet begun.
#[derive(Clone, Debug, Default)]
pub struct ProbeState {
    /// Set by `mark_startup_complete`; gates `/startup` and `/ready`.
    startup_complete: Arc<AtomicBool>,
    /// Set when graceful shutdown begins; `/ready` degrades from then on.
    shutting_down: Arc<AtomicBool>,
    /// Runtime readiness bookkeeping for an optional read replica.
    #[cfg(feature = "db")]
    replica_dependency: Arc<RwLock<ReplicaDependency>>,
}
101
/// Connection URLs needed to re-run the replica migration comparison.
#[cfg(feature = "db")]
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct ReplicaMigrationCheck {
    /// URL of the primary database.
    pub(crate) primary_url: String,
    /// URL of the read replica.
    pub(crate) replica_url: String,
}
108
/// Readiness bookkeeping for a read replica.
#[cfg(feature = "db")]
#[derive(Clone, Debug, PartialEq, Eq)]
struct ReplicaDependency {
    /// Whether a replica was configured at all. An unconfigured replica never
    /// blocks readiness and never diverts read routing.
    configured: bool,
    /// Policy applied while the replica is unhealthy; `Primary` keeps the
    /// instance ready and falls reads back to the primary.
    fallback: crate::config::ReplicaFallback,
    /// Whether the most recent connection check succeeded.
    connection_ready: bool,
    /// Whether the replica's migrations were last seen as current.
    migrations_ready: bool,
    /// URLs for re-running the migration check, if one was registered.
    migration_check: Option<ReplicaMigrationCheck>,
    /// Reason the replica was last marked unready; cleared once it is healthy.
    detail: Option<String>,
}
119
#[cfg(feature = "db")]
impl Default for ReplicaDependency {
    /// An unconfigured replica.
    ///
    /// Both readiness flags start `true`, so the replica cannot block
    /// readiness, and no migration check or detail is recorded.
    fn default() -> Self {
        Self {
            configured: false,
            fallback: crate::config::ReplicaFallback::default(),
            connection_ready: true,
            migrations_ready: true,
            migration_check: None,
            detail: None,
        }
    }
}
133
134impl ProbeState {
135    /// Create a probe state that starts in pending-startup mode.
136    #[must_use]
137    pub fn pending_startup() -> Self {
138        Self::default()
139    }
140
141    /// Alias for pending startup used by application bootstrapping.
142    #[must_use]
143    pub fn starting() -> Self {
144        Self::pending_startup()
145    }
146
147    /// Create a probe state that is immediately ready.
148    #[must_use]
149    pub fn ready_for_test() -> Self {
150        let state = Self::pending_startup();
151        state.mark_startup_complete();
152        state
153    }
154
155    /// Mark startup as complete and readiness eligible.
156    pub fn mark_startup_complete(&self) {
157        self.startup_complete.store(true, Ordering::Relaxed);
158    }
159
160    /// Override startup completion for tests.
161    pub fn set_startup_complete(&self, complete: bool) {
162        self.startup_complete.store(complete, Ordering::Relaxed);
163    }
164
165    /// Mark the application as shutting down so readiness flips false.
166    pub fn begin_shutdown(&self) {
167        self.shutting_down.store(true, Ordering::Relaxed);
168    }
169
170    /// Alias for readiness drain used during graceful shutdown.
171    pub fn begin_draining(&self) {
172        self.begin_shutdown();
173    }
174
175    /// Override shutdown-draining state for tests.
176    pub fn set_draining(&self, draining: bool) {
177        self.shutting_down.store(draining, Ordering::Relaxed);
178    }
179
180    /// Configure runtime readiness behavior for a read replica.
181    ///
182    /// # Panics
183    ///
184    /// Panics if the replica dependency lock is poisoned.
185    #[cfg(feature = "db")]
186    pub fn configure_replica_dependency(&self, fallback: crate::config::ReplicaFallback) {
187        let mut dependency = self
188            .replica_dependency
189            .write()
190            .expect("replica dependency lock poisoned");
191        *dependency = ReplicaDependency {
192            configured: true,
193            fallback,
194            connection_ready: false,
195            migrations_ready: true,
196            migration_check: None,
197            detail: Some("replica has not passed a readiness check".to_owned()),
198        };
199    }
200
201    /// Store URLs needed to retry replica migration readiness checks.
202    #[cfg(feature = "db")]
203    pub(crate) fn configure_replica_migration_check(
204        &self,
205        primary_url: impl Into<String>,
206        replica_url: impl Into<String>,
207    ) {
208        let mut dependency = self
209            .replica_dependency
210            .write()
211            .expect("replica dependency lock poisoned");
212        dependency.migration_check = Some(ReplicaMigrationCheck {
213            primary_url: primary_url.into(),
214            replica_url: replica_url.into(),
215        });
216    }
217
218    /// Mark the configured read replica as reachable.
219    ///
220    /// # Panics
221    ///
222    /// Panics if the replica dependency lock is poisoned.
223    #[cfg(feature = "db")]
224    pub fn mark_replica_connection_ready(&self) {
225        let mut dependency = self
226            .replica_dependency
227            .write()
228            .expect("replica dependency lock poisoned");
229        dependency.connection_ready = true;
230        if dependency.migrations_ready {
231            dependency.detail = None;
232        }
233    }
234
235    /// Mark the configured read replica as unreachable.
236    ///
237    /// # Panics
238    ///
239    /// Panics if the replica dependency lock is poisoned.
240    #[cfg(feature = "db")]
241    pub fn mark_replica_connection_unready(&self, detail: impl Into<String>) {
242        let mut dependency = self
243            .replica_dependency
244            .write()
245            .expect("replica dependency lock poisoned");
246        dependency.connection_ready = false;
247        dependency.detail = Some(detail.into());
248    }
249
250    /// Mark the configured read replica's migration state as current.
251    ///
252    /// # Panics
253    ///
254    /// Panics if the replica dependency lock is poisoned.
255    #[cfg(feature = "db")]
256    pub fn mark_replica_migrations_ready(&self) {
257        let mut dependency = self
258            .replica_dependency
259            .write()
260            .expect("replica dependency lock poisoned");
261        dependency.migrations_ready = true;
262        if dependency.connection_ready {
263            dependency.detail = None;
264        }
265    }
266
267    /// Mark the configured read replica's migration state as stale.
268    #[cfg(feature = "db")]
269    pub(crate) fn mark_replica_migrations_unready(&self, detail: impl Into<String>) {
270        let mut dependency = self
271            .replica_dependency
272            .write()
273            .expect("replica dependency lock poisoned");
274        dependency.migrations_ready = false;
275        dependency.detail = Some(detail.into());
276    }
277
278    /// Mark the configured read replica as ready.
279    ///
280    /// # Panics
281    ///
282    /// Panics if the replica dependency lock is poisoned.
283    #[cfg(feature = "db")]
284    pub fn mark_replica_ready(&self) {
285        let mut dependency = self
286            .replica_dependency
287            .write()
288            .expect("replica dependency lock poisoned");
289        dependency.connection_ready = true;
290        dependency.migrations_ready = true;
291        dependency.detail = None;
292    }
293
294    /// Mark the configured read replica as unavailable or stale.
295    ///
296    /// # Panics
297    ///
298    /// Panics if the replica dependency lock is poisoned.
299    #[cfg(feature = "db")]
300    pub fn mark_replica_unready(&self, detail: impl Into<String>) {
301        let mut dependency = self
302            .replica_dependency
303            .write()
304            .expect("replica dependency lock poisoned");
305        dependency.connection_ready = false;
306        dependency.migrations_ready = false;
307        dependency.detail = Some(detail.into());
308    }
309
310    /// Returns whether startup completed successfully.
311    #[must_use]
312    pub fn is_startup_complete(&self) -> bool {
313        self.startup_complete.load(Ordering::Relaxed)
314    }
315
316    /// Returns whether graceful shutdown has started.
317    #[must_use]
318    pub fn is_shutting_down(&self) -> bool {
319        self.shutting_down.load(Ordering::Relaxed)
320    }
321
322    /// Returns whether readiness is currently draining.
323    #[must_use]
324    pub fn draining(&self) -> bool {
325        self.is_shutting_down()
326    }
327
328    #[cfg(feature = "db")]
329    pub(crate) fn replica_allows_readiness(&self) -> bool {
330        let dependency = self
331            .replica_dependency
332            .read()
333            .expect("replica dependency lock poisoned");
334        let ready = dependency.connection_ready && dependency.migrations_ready;
335        !dependency.configured
336            || ready
337            || matches!(dependency.fallback, crate::config::ReplicaFallback::Primary)
338    }
339
340    #[cfg(feature = "db")]
341    pub(crate) fn should_route_reads_to_replica(&self) -> bool {
342        let dependency = self
343            .replica_dependency
344            .read()
345            .expect("replica dependency lock poisoned");
346        !dependency.configured || (dependency.connection_ready && dependency.migrations_ready)
347    }
348
349    #[cfg(feature = "db")]
350    pub(crate) fn should_fallback_reads_to_primary(&self) -> bool {
351        let dependency = self
352            .replica_dependency
353            .read()
354            .expect("replica dependency lock poisoned");
355        dependency.configured
356            && !(dependency.connection_ready && dependency.migrations_ready)
357            && matches!(dependency.fallback, crate::config::ReplicaFallback::Primary)
358    }
359
360    #[cfg(feature = "db")]
361    pub(crate) fn replica_configured(&self) -> bool {
362        self.replica_dependency
363            .read()
364            .expect("replica dependency lock poisoned")
365            .configured
366    }
367
368    #[cfg(feature = "db")]
369    pub(crate) fn replica_migration_check(&self) -> Option<ReplicaMigrationCheck> {
370        self.replica_dependency
371            .read()
372            .expect("replica dependency lock poisoned")
373            .migration_check
374            .clone()
375    }
376}
377
/// Which probe endpoint a response is being built for.
#[derive(Clone, Copy)]
enum ProbeKind {
    /// `GET /live`: always healthy.
    Live,
    /// `GET /ready`: startup, draining, dependencies, and indicators.
    Ready,
    /// `GET /startup`: startup completion only.
    Startup,
}
384
385#[derive(Serialize)]
386pub(crate) struct ProbeResponse {
387    status: &'static str,
388    #[serde(skip_serializing_if = "Option::is_none")]
389    version: Option<&'static str>,
390    #[serde(skip_serializing_if = "Option::is_none")]
391    profile: Option<String>,
392    #[serde(skip_serializing_if = "Option::is_none")]
393    uptime: Option<String>,
394    #[serde(skip_serializing_if = "Option::is_none")]
395    pool: Option<PoolStatus>,
396}
397
398#[derive(Serialize)]
399pub(crate) struct PoolStatus {
400    size: u64,
401    available: u64,
402    waiting: u64,
403}
404
405#[allow(clippy::missing_const_for_fn, unused_variables)]
406fn dependency_readiness<S: ProvideProbeState>(state: &S) -> (bool, Option<PoolStatus>) {
407    #[cfg(feature = "db")]
408    {
409        let replica_ready_for_policy = state.probes().replica_allows_readiness();
410        let (pool_ready, pool_status) = state.pool().map_or((true, None), |pool| {
411            let status = pool.status();
412            let available = status.available as u64;
413            let size = status.max_size as u64;
414            let waiting = status.waiting as u64;
415
416            (
417                available > 0 || waiting == 0,
418                Some(PoolStatus {
419                    size,
420                    available,
421                    waiting,
422                }),
423            )
424        });
425
426        (pool_ready && replica_ready_for_policy, pool_status)
427    }
428
429    #[cfg(not(feature = "db"))]
430    {
431        (true, None)
432    }
433}
434
/// Re-check a configured replica by checking out one connection.
///
/// On success, the migration readiness check also runs. Does nothing when no
/// replica is configured.
#[cfg(feature = "db")]
async fn refresh_replica_readiness<S: ProvideProbeState + Sync>(state: &S) {
    let probes = state.probes();
    if !probes.replica_configured() {
        return;
    }

    let Some(pool) = state.replica_pool() else {
        probes.mark_replica_connection_unready("replica pool is not available");
        return;
    };

    match pool.get().await {
        Err(error) => {
            probes.mark_replica_connection_unready(format!("replica connection failed: {error}"));
        }
        Ok(connection) => {
            // Hand the probe connection back to the pool before the migration
            // check runs.
            drop(connection);
            probes.mark_replica_connection_ready();
            refresh_replica_migration_readiness(state).await;
        }
    }
}
459
/// Re-run the replica migration check using the real migration comparison.
#[cfg(feature = "db")]
async fn refresh_replica_migration_readiness<S: ProvideProbeState + Sync>(state: &S) {
    let run_check = |ReplicaMigrationCheck {
                         primary_url,
                         replica_url,
                     }| {
        crate::migrate::check_replica_migration_readiness_blocking(primary_url, replica_url)
    };
    refresh_replica_migration_readiness_with(state, run_check).await;
}
470
/// Re-run the replica migration check, if one is registered, and record the outcome.
///
/// `check_readiness` is injected so tests can substitute a canned result. Any
/// result that is not ready marks migrations unready. When the result carries
/// no detail, a generic reason is recorded instead. Previously such a result
/// was ignored, leaving an earlier "ready" flag in place for a stale replica.
#[cfg(feature = "db")]
async fn refresh_replica_migration_readiness_with<S, F, Fut>(state: &S, check_readiness: F)
where
    S: ProvideProbeState + Sync,
    F: FnOnce(ReplicaMigrationCheck) -> Fut,
    Fut: std::future::Future<Output = crate::migrate::ReplicaMigrationReadiness>,
{
    let Some(check) = state.probes().replica_migration_check() else {
        return;
    };

    let readiness = check_readiness(check).await;

    if readiness.is_ready() {
        state.probes().mark_replica_migrations_ready();
        return;
    }

    match readiness.detail() {
        Some(detail) => state.probes().mark_replica_migrations_unready(detail),
        None => state
            .probes()
            .mark_replica_migrations_unready("replica migration check did not report ready"),
    }
}
490
491fn probe_response<S: ProvideProbeState>(
492    state: &S,
493    kind: ProbeKind,
494    indicator_ready: bool,
495) -> (StatusCode, Json<ProbeResponse>) {
496    let startup_complete = state.probes().is_startup_complete();
497    let shutting_down = state.probes().is_shutting_down();
498    let (dependencies_ready, pool_status) = dependency_readiness(state);
499
500    let (status_code, status) = match kind {
501        ProbeKind::Live => (StatusCode::OK, "ok"),
502        ProbeKind::Startup if startup_complete => (StatusCode::OK, "ok"),
503        ProbeKind::Startup => (StatusCode::SERVICE_UNAVAILABLE, "starting"),
504        ProbeKind::Ready
505            if startup_complete && !shutting_down && dependencies_ready && indicator_ready =>
506        {
507            (StatusCode::OK, "ok")
508        }
509        ProbeKind::Ready => (StatusCode::SERVICE_UNAVAILABLE, "degraded"),
510    };
511
512    let detailed = state.health_detailed();
513    let body = ProbeResponse {
514        status,
515        version: if detailed {
516            Some(env!("CARGO_PKG_VERSION"))
517        } else {
518            None
519        },
520        profile: if detailed {
521            Some(state.profile().to_owned())
522        } else {
523            None
524        },
525        uptime: if detailed {
526            Some(state.uptime_display())
527        } else {
528            None
529        },
530        pool: if detailed { pool_status } else { None },
531    };
532
533    (status_code, Json(body))
534}
535
536/// Return `true` when the `/ready` response will be 503 regardless of indicator
537/// results — avoids running potentially slow indicators unnecessarily.
538fn already_degraded<S: ProvideProbeState>(state: &S) -> bool {
539    let probes = state.probes();
540    !probes.is_startup_complete() || probes.is_shutting_down() || !dependency_readiness(state).0
541}
542
543/// Run all readiness-group [`HealthIndicator`]s and return `false` if any are
544/// `Down` or `OutOfService`.
545async fn check_readiness_indicators<S: ProvideProbeState + Sync>(state: &S) -> bool {
546    let Some(registry) = state.health_indicator_registry() else {
547        return true;
548    };
549    let results = registry.run_readiness().await;
550    let statuses: Vec<crate::actuator::HealthStatus> =
551        results.iter().map(|r| r.output.status).collect();
552    crate::actuator::HealthIndicatorRegistry::aggregate_status(&statuses).is_healthy()
553}
554
555/// `GET /live`
556pub async fn live_handler<S: ProvideProbeState + Send + Sync + 'static>(
557    State(state): State<S>,
558) -> impl IntoResponse {
559    probe_response(&state, ProbeKind::Live, true)
560}
561
562/// `GET /ready`
563pub async fn ready_handler<S: ProvideProbeState + Send + Sync + 'static>(
564    State(state): State<S>,
565) -> impl IntoResponse {
566    #[cfg(feature = "db")]
567    refresh_replica_readiness(&state).await;
568    // Skip slow indicator checks when the probe will be 503 regardless.
569    let indicator_ready = if already_degraded(&state) {
570        true
571    } else {
572        check_readiness_indicators(&state).await
573    };
574    probe_response(&state, ProbeKind::Ready, indicator_ready)
575}
576
577/// `GET /startup`
578pub async fn startup_handler<S: ProvideProbeState + Send + Sync + 'static>(
579    State(state): State<S>,
580) -> impl IntoResponse {
581    probe_response(&state, ProbeKind::Startup, true)
582}
583
584/// Compatibility alias for the legacy `/health` endpoint.
585pub(crate) async fn readiness_response<S: ProvideProbeState + Sync>(
586    state: &S,
587) -> (StatusCode, Json<ProbeResponse>) {
588    #[cfg(feature = "db")]
589    refresh_replica_readiness(state).await;
590    let indicator_ready = if already_degraded(state) {
591        true
592    } else {
593        check_readiness_indicators(state).await
594    };
595    probe_response(state, ProbeKind::Ready, indicator_ready)
596}
597
// Unit tests for probe response logic, `ProbeState` transitions, replica
// readiness policy (with the `db` feature), and readiness-indicator integration.
#[cfg(test)]
mod tests {
    use super::*;

    // Minimal `ProvideProbeState` implementation with no indicator registry
    // (so `health_indicator_registry` uses the trait default of `None`).
    struct TestProbeState {
        probes: ProbeState,
        health_detailed: bool,
        profile: String,
    }

    impl ProvideProbeState for TestProbeState {
        fn probes(&self) -> &ProbeState {
            &self.probes
        }

        fn health_detailed(&self) -> bool {
            self.health_detailed
        }

        fn profile(&self) -> &str {
            &self.profile
        }

        fn uptime_display(&self) -> String {
            "test uptime".to_string()
        }

        // No primary pool, so pool readiness is vacuously true.
        #[cfg(feature = "db")]
        fn pool(
            &self,
        ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
        {
            None
        }
    }

    impl TestProbeState {
        // Pending-startup state with detailed output enabled.
        fn new() -> Self {
            Self {
                probes: ProbeState::pending_startup(),
                health_detailed: true,
                profile: "test".to_string(),
            }
        }
    }

    #[test]
    fn test_live_handler_returns_ok() {
        let state = TestProbeState::new();
        let (status, Json(response)) = probe_response(&state, ProbeKind::Live, true);
        assert_eq!(status, StatusCode::OK);
        assert_eq!(response.status, "ok");
    }

    #[tokio::test]
    async fn test_startup_handler_pending() {
        let state = TestProbeState::new();
        let (status, Json(response)) = probe_response(&state, ProbeKind::Startup, true);
        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
        assert_eq!(response.status, "starting");
    }

    #[tokio::test]
    async fn test_startup_handler_complete() {
        let state = TestProbeState::new();
        state.mark_startup_complete();
        let (status, Json(response)) = probe_response(&state, ProbeKind::Startup, true);
        assert_eq!(status, StatusCode::OK);
        assert_eq!(response.status, "ok");
    }

    #[tokio::test]
    async fn test_ready_handler_pending_startup() {
        let state = TestProbeState::new();
        let (status, Json(response)) = probe_response(&state, ProbeKind::Ready, true);
        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
        assert_eq!(response.status, "degraded");
    }

    #[tokio::test]
    async fn test_ready_handler_complete_startup() {
        let state = TestProbeState::new();
        state.mark_startup_complete();
        let (status, Json(response)) = probe_response(&state, ProbeKind::Ready, true);
        assert_eq!(status, StatusCode::OK);
        assert_eq!(response.status, "ok");
    }

    #[tokio::test]
    async fn test_ready_handler_shutting_down() {
        let state = TestProbeState::new();
        state.mark_startup_complete();
        state.probes().begin_shutdown();
        let (status, Json(response)) = probe_response(&state, ProbeKind::Ready, true);
        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
        assert_eq!(response.status, "degraded");
    }

    #[cfg(feature = "db")]
    #[tokio::test]
    async fn ready_fails_when_replica_is_unready_and_policy_is_fail_readiness() {
        let state = TestProbeState::new();
        state.mark_startup_complete();
        state
            .probes()
            .configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
        state
            .probes()
            .mark_replica_unready("replica migrations lag primary");

        let (status, Json(response)) = probe_response(&state, ProbeKind::Ready, true);

        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
        assert_eq!(response.status, "degraded");
    }

    // `readiness_response` refreshes replica readiness; `TestProbeState` has no
    // replica pool, so the replica stays "not connected" and readiness fails.
    #[cfg(feature = "db")]
    #[tokio::test]
    async fn ready_fails_when_replica_is_configured_but_not_checked() {
        let state = TestProbeState::new();
        state.mark_startup_complete();
        state
            .probes()
            .configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);

        let (status, Json(response)) = readiness_response(&state).await;

        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
        assert_eq!(response.status, "degraded");
    }

    // Connection readiness and migration readiness are tracked independently:
    // re-marking the connection ready must not clear a migration lag.
    #[cfg(feature = "db")]
    #[test]
    fn replica_migration_lag_can_recover_without_resetting_connection_readiness() {
        let probes = ProbeState::ready_for_test();
        probes.configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
        probes.mark_replica_connection_ready();
        probes.mark_replica_migrations_unready("replica migrations lag primary");

        assert!(!probes.replica_allows_readiness());
        assert!(!probes.should_route_reads_to_replica());

        probes.mark_replica_connection_ready();
        assert!(!probes.replica_allows_readiness());
        assert!(!probes.should_route_reads_to_replica());

        probes.mark_replica_migrations_ready();
        assert!(probes.replica_allows_readiness());
        assert!(probes.should_route_reads_to_replica());
    }

    // The migration check must be stored after `configure_replica_dependency`,
    // which resets it.
    #[cfg(feature = "db")]
    #[test]
    fn replica_migration_retry_urls_are_stored_for_readiness_rechecks() {
        let probes = ProbeState::ready_for_test();
        probes.configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
        probes.configure_replica_migration_check(
            "postgres://localhost/primary",
            "postgres://localhost/replica",
        );

        let check = probes
            .replica_migration_check()
            .expect("migration check should be configured");

        assert_eq!(check.primary_url, "postgres://localhost/primary");
        assert_eq!(check.replica_url, "postgres://localhost/replica");
    }

    // Uses the injectable checker to return a canned `Stale` result and
    // confirms a previously-ready replica is demoted.
    #[cfg(feature = "db")]
    #[tokio::test]
    async fn replica_migration_readiness_rechecks_after_initial_ready_state() {
        let state = TestProbeState::new();
        state.mark_startup_complete();
        state
            .probes()
            .configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
        state.probes().configure_replica_migration_check(
            "postgres://localhost/primary",
            "postgres://localhost/replica",
        );
        state.probes().mark_replica_connection_ready();
        state.probes().mark_replica_migrations_ready();

        let checks = std::sync::atomic::AtomicUsize::new(0);
        refresh_replica_migration_readiness_with(&state, |check| {
            checks.fetch_add(1, Ordering::Relaxed);
            assert_eq!(check.primary_url, "postgres://localhost/primary");
            assert_eq!(check.replica_url, "postgres://localhost/replica");
            std::future::ready(crate::migrate::ReplicaMigrationReadiness::Stale {
                primary_latest: Some("20260511000000".to_owned()),
                replica_latest: Some("20260510000000".to_owned()),
            })
        })
        .await;

        assert_eq!(checks.load(Ordering::Relaxed), 1);
        assert!(!state.probes().replica_allows_readiness());
        assert!(!state.probes().should_route_reads_to_replica());
    }

    // With the `Primary` fallback policy, an unready replica does not block readiness.
    #[cfg(feature = "db")]
    #[tokio::test]
    async fn ready_allows_primary_fallback_when_replica_is_unready() {
        let state = TestProbeState::new();
        state.mark_startup_complete();
        state
            .probes()
            .configure_replica_dependency(crate::config::ReplicaFallback::Primary);
        state
            .probes()
            .mark_replica_unready("replica migrations lag primary");

        let (status, Json(response)) = probe_response(&state, ProbeKind::Ready, true);

        assert_eq!(status, StatusCode::OK);
        assert_eq!(response.status, "ok");
    }

    #[tokio::test]
    async fn test_probe_state_set_draining() {
        let state = ProbeState::starting();
        assert!(!state.draining());
        state.set_draining(true);
        assert!(state.draining());
    }

    #[tokio::test]
    async fn test_probe_state_set_startup_complete() {
        let state = ProbeState::starting();
        assert!(!state.is_startup_complete());
        state.set_startup_complete(true);
        assert!(state.is_startup_complete());
    }

    #[tokio::test]
    async fn test_ready_for_test() {
        let state = ProbeState::ready_for_test();
        assert!(state.is_startup_complete());
    }

    // With detailed output disabled, only `status` is populated.
    #[tokio::test]
    async fn test_health_detailed_false() {
        let mut state = TestProbeState::new();
        state.health_detailed = false;

        let (_, Json(response)) = probe_response(&state, ProbeKind::Live, true);
        assert!(response.version.is_none());
        assert!(response.profile.is_none());
        assert!(response.uptime.is_none());
        assert!(response.pool.is_none());
    }

    #[tokio::test]
    async fn test_begin_draining() {
        let state = ProbeState::ready_for_test();
        assert!(!state.draining());
        state.begin_draining();
        assert!(state.draining());
    }

    // ── HealthIndicator integration with /ready ──────────────────

    // Like `TestProbeState`, but exposes a health indicator registry.
    struct TestProbeStateWithIndicators {
        probes: ProbeState,
        health_detailed: bool,
        profile: String,
        registry: crate::actuator::HealthIndicatorRegistry,
    }

    impl ProvideProbeState for TestProbeStateWithIndicators {
        fn probes(&self) -> &ProbeState {
            &self.probes
        }

        fn health_detailed(&self) -> bool {
            self.health_detailed
        }

        fn profile(&self) -> &str {
            &self.profile
        }

        fn uptime_display(&self) -> String {
            "test uptime".to_string()
        }

        #[cfg(feature = "db")]
        fn pool(
            &self,
        ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
        {
            None
        }

        fn health_indicator_registry(&self) -> Option<&crate::actuator::HealthIndicatorRegistry> {
            Some(&self.registry)
        }
    }

    impl TestProbeStateWithIndicators {
        // Startup is already complete, so indicator results decide readiness.
        fn new(registry: crate::actuator::HealthIndicatorRegistry) -> Self {
            let probes = ProbeState::pending_startup();
            probes.mark_startup_complete();
            Self {
                probes,
                health_detailed: true,
                profile: "test".to_string(),
                registry,
            }
        }
    }

    // Indicator that always reports `down`.
    struct AlwaysDown;
    impl crate::actuator::HealthIndicator for AlwaysDown {
        fn check(&self) -> futures::future::BoxFuture<'_, crate::actuator::HealthCheckOutput> {
            Box::pin(async { crate::actuator::HealthCheckOutput::down() })
        }
    }

    // Indicator that always reports `up`.
    struct AlwaysUp;
    impl crate::actuator::HealthIndicator for AlwaysUp {
        fn check(&self) -> futures::future::BoxFuture<'_, crate::actuator::HealthCheckOutput> {
            Box::pin(async { crate::actuator::HealthCheckOutput::up() })
        }
    }

    #[tokio::test]
    async fn ready_is_degraded_when_readiness_indicator_is_down() {
        let registry = crate::actuator::HealthIndicatorRegistry::new();
        registry
            .register(
                "svc",
                crate::actuator::IndicatorGroup::Readiness,
                std::sync::Arc::new(AlwaysDown),
            )
            .unwrap();
        let state = TestProbeStateWithIndicators::new(registry);
        let (status, Json(response)) = readiness_response(&state).await;
        assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
        assert_eq!(response.status, "degraded");
    }

    #[tokio::test]
    async fn ready_is_ok_when_readiness_indicator_is_up() {
        let registry = crate::actuator::HealthIndicatorRegistry::new();
        registry
            .register(
                "svc",
                crate::actuator::IndicatorGroup::Readiness,
                std::sync::Arc::new(AlwaysUp),
            )
            .unwrap();
        let state = TestProbeStateWithIndicators::new(registry);
        let (status, Json(response)) = readiness_response(&state).await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(response.status, "ok");
    }

    // Indicators in the `HealthOnly` group must not affect `/ready`.
    #[tokio::test]
    async fn ready_is_ok_when_health_only_indicator_is_down() {
        let registry = crate::actuator::HealthIndicatorRegistry::new();
        registry
            .register(
                "svc",
                crate::actuator::IndicatorGroup::HealthOnly,
                std::sync::Arc::new(AlwaysDown),
            )
            .unwrap();
        let state = TestProbeStateWithIndicators::new(registry);
        let (status, Json(response)) = readiness_response(&state).await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(response.status, "ok");
    }
}