1use std::sync::Arc;
9#[cfg(feature = "db")]
10use std::sync::RwLock;
11use std::sync::atomic::{AtomicBool, Ordering};
12
13use crate::extract::State;
14use axum::Json;
15use axum::http::StatusCode;
16use axum::response::IntoResponse;
17use serde::Serialize;
18
19pub trait ProvideProbeState {
25 fn probes(&self) -> &ProbeState;
28
29 fn health_detailed(&self) -> bool;
32
33 fn profile(&self) -> &str;
35
36 fn uptime_display(&self) -> String;
39
40 #[cfg(feature = "db")]
43 fn pool(
44 &self,
45 ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>;
46
47 #[cfg(feature = "db")]
49 fn replica_pool(
50 &self,
51 ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
52 {
53 None
54 }
55
56 fn health_indicator_registry(&self) -> Option<&crate::actuator::HealthIndicatorRegistry> {
61 None
62 }
63
64 fn mark_startup_complete(&self) {
89 self.probes().mark_startup_complete();
90 }
91}
92
93#[derive(Clone, Debug, Default)]
95pub struct ProbeState {
96 startup_complete: Arc<AtomicBool>,
97 shutting_down: Arc<AtomicBool>,
98 #[cfg(feature = "db")]
99 replica_dependency: Arc<RwLock<ReplicaDependency>>,
100}
101
102#[cfg(feature = "db")]
103#[derive(Clone, Debug, PartialEq, Eq)]
104pub(crate) struct ReplicaMigrationCheck {
105 pub(crate) primary_url: String,
106 pub(crate) replica_url: String,
107}
108
109#[cfg(feature = "db")]
110#[derive(Clone, Debug, PartialEq, Eq)]
111struct ReplicaDependency {
112 configured: bool,
113 fallback: crate::config::ReplicaFallback,
114 connection_ready: bool,
115 migrations_ready: bool,
116 migration_check: Option<ReplicaMigrationCheck>,
117 detail: Option<String>,
118}
119
120#[cfg(feature = "db")]
121impl Default for ReplicaDependency {
122 fn default() -> Self {
123 Self {
124 configured: false,
125 fallback: crate::config::ReplicaFallback::default(),
126 connection_ready: true,
127 migrations_ready: true,
128 migration_check: None,
129 detail: None,
130 }
131 }
132}
133
134impl ProbeState {
135 #[must_use]
137 pub fn pending_startup() -> Self {
138 Self::default()
139 }
140
141 #[must_use]
143 pub fn starting() -> Self {
144 Self::pending_startup()
145 }
146
147 #[must_use]
149 pub fn ready_for_test() -> Self {
150 let state = Self::pending_startup();
151 state.mark_startup_complete();
152 state
153 }
154
155 pub fn mark_startup_complete(&self) {
157 self.startup_complete.store(true, Ordering::Relaxed);
158 }
159
160 pub fn set_startup_complete(&self, complete: bool) {
162 self.startup_complete.store(complete, Ordering::Relaxed);
163 }
164
165 pub fn begin_shutdown(&self) {
167 self.shutting_down.store(true, Ordering::Relaxed);
168 }
169
170 pub fn begin_draining(&self) {
172 self.begin_shutdown();
173 }
174
175 pub fn set_draining(&self, draining: bool) {
177 self.shutting_down.store(draining, Ordering::Relaxed);
178 }
179
180 #[cfg(feature = "db")]
186 pub fn configure_replica_dependency(&self, fallback: crate::config::ReplicaFallback) {
187 let mut dependency = self
188 .replica_dependency
189 .write()
190 .expect("replica dependency lock poisoned");
191 *dependency = ReplicaDependency {
192 configured: true,
193 fallback,
194 connection_ready: false,
195 migrations_ready: true,
196 migration_check: None,
197 detail: Some("replica has not passed a readiness check".to_owned()),
198 };
199 }
200
201 #[cfg(feature = "db")]
203 pub(crate) fn configure_replica_migration_check(
204 &self,
205 primary_url: impl Into<String>,
206 replica_url: impl Into<String>,
207 ) {
208 let mut dependency = self
209 .replica_dependency
210 .write()
211 .expect("replica dependency lock poisoned");
212 dependency.migration_check = Some(ReplicaMigrationCheck {
213 primary_url: primary_url.into(),
214 replica_url: replica_url.into(),
215 });
216 }
217
218 #[cfg(feature = "db")]
224 pub fn mark_replica_connection_ready(&self) {
225 let mut dependency = self
226 .replica_dependency
227 .write()
228 .expect("replica dependency lock poisoned");
229 dependency.connection_ready = true;
230 if dependency.migrations_ready {
231 dependency.detail = None;
232 }
233 }
234
235 #[cfg(feature = "db")]
241 pub fn mark_replica_connection_unready(&self, detail: impl Into<String>) {
242 let mut dependency = self
243 .replica_dependency
244 .write()
245 .expect("replica dependency lock poisoned");
246 dependency.connection_ready = false;
247 dependency.detail = Some(detail.into());
248 }
249
250 #[cfg(feature = "db")]
256 pub fn mark_replica_migrations_ready(&self) {
257 let mut dependency = self
258 .replica_dependency
259 .write()
260 .expect("replica dependency lock poisoned");
261 dependency.migrations_ready = true;
262 if dependency.connection_ready {
263 dependency.detail = None;
264 }
265 }
266
267 #[cfg(feature = "db")]
269 pub(crate) fn mark_replica_migrations_unready(&self, detail: impl Into<String>) {
270 let mut dependency = self
271 .replica_dependency
272 .write()
273 .expect("replica dependency lock poisoned");
274 dependency.migrations_ready = false;
275 dependency.detail = Some(detail.into());
276 }
277
278 #[cfg(feature = "db")]
284 pub fn mark_replica_ready(&self) {
285 let mut dependency = self
286 .replica_dependency
287 .write()
288 .expect("replica dependency lock poisoned");
289 dependency.connection_ready = true;
290 dependency.migrations_ready = true;
291 dependency.detail = None;
292 }
293
294 #[cfg(feature = "db")]
300 pub fn mark_replica_unready(&self, detail: impl Into<String>) {
301 let mut dependency = self
302 .replica_dependency
303 .write()
304 .expect("replica dependency lock poisoned");
305 dependency.connection_ready = false;
306 dependency.migrations_ready = false;
307 dependency.detail = Some(detail.into());
308 }
309
310 #[must_use]
312 pub fn is_startup_complete(&self) -> bool {
313 self.startup_complete.load(Ordering::Relaxed)
314 }
315
316 #[must_use]
318 pub fn is_shutting_down(&self) -> bool {
319 self.shutting_down.load(Ordering::Relaxed)
320 }
321
322 #[must_use]
324 pub fn draining(&self) -> bool {
325 self.is_shutting_down()
326 }
327
328 #[cfg(feature = "db")]
329 pub(crate) fn replica_allows_readiness(&self) -> bool {
330 let dependency = self
331 .replica_dependency
332 .read()
333 .expect("replica dependency lock poisoned");
334 let ready = dependency.connection_ready && dependency.migrations_ready;
335 !dependency.configured
336 || ready
337 || matches!(dependency.fallback, crate::config::ReplicaFallback::Primary)
338 }
339
340 #[cfg(feature = "db")]
341 pub(crate) fn should_route_reads_to_replica(&self) -> bool {
342 let dependency = self
343 .replica_dependency
344 .read()
345 .expect("replica dependency lock poisoned");
346 !dependency.configured || (dependency.connection_ready && dependency.migrations_ready)
347 }
348
349 #[cfg(feature = "db")]
350 pub(crate) fn should_fallback_reads_to_primary(&self) -> bool {
351 let dependency = self
352 .replica_dependency
353 .read()
354 .expect("replica dependency lock poisoned");
355 dependency.configured
356 && !(dependency.connection_ready && dependency.migrations_ready)
357 && matches!(dependency.fallback, crate::config::ReplicaFallback::Primary)
358 }
359
360 #[cfg(feature = "db")]
361 pub(crate) fn replica_configured(&self) -> bool {
362 self.replica_dependency
363 .read()
364 .expect("replica dependency lock poisoned")
365 .configured
366 }
367
368 #[cfg(feature = "db")]
369 pub(crate) fn replica_migration_check(&self) -> Option<ReplicaMigrationCheck> {
370 self.replica_dependency
371 .read()
372 .expect("replica dependency lock poisoned")
373 .migration_check
374 .clone()
375 }
376}
377
378#[derive(Clone, Copy)]
379enum ProbeKind {
380 Live,
381 Ready,
382 Startup,
383}
384
385#[derive(Serialize)]
386pub(crate) struct ProbeResponse {
387 status: &'static str,
388 #[serde(skip_serializing_if = "Option::is_none")]
389 version: Option<&'static str>,
390 #[serde(skip_serializing_if = "Option::is_none")]
391 profile: Option<String>,
392 #[serde(skip_serializing_if = "Option::is_none")]
393 uptime: Option<String>,
394 #[serde(skip_serializing_if = "Option::is_none")]
395 pool: Option<PoolStatus>,
396}
397
398#[derive(Serialize)]
399pub(crate) struct PoolStatus {
400 size: u64,
401 available: u64,
402 waiting: u64,
403}
404
405#[allow(clippy::missing_const_for_fn, unused_variables)]
406fn dependency_readiness<S: ProvideProbeState>(state: &S) -> (bool, Option<PoolStatus>) {
407 #[cfg(feature = "db")]
408 {
409 let replica_ready_for_policy = state.probes().replica_allows_readiness();
410 let (pool_ready, pool_status) = state.pool().map_or((true, None), |pool| {
411 let status = pool.status();
412 let available = status.available as u64;
413 let size = status.max_size as u64;
414 let waiting = status.waiting as u64;
415
416 (
417 available > 0 || waiting == 0,
418 Some(PoolStatus {
419 size,
420 available,
421 waiting,
422 }),
423 )
424 });
425
426 (pool_ready && replica_ready_for_policy, pool_status)
427 }
428
429 #[cfg(not(feature = "db"))]
430 {
431 (true, None)
432 }
433}
434
435#[cfg(feature = "db")]
436async fn refresh_replica_readiness<S: ProvideProbeState + Sync>(state: &S) {
437 if !state.probes().replica_configured() {
438 return;
439 }
440
441 let Some(replica_pool) = state.replica_pool() else {
442 state
443 .probes()
444 .mark_replica_connection_unready("replica pool is not available");
445 return;
446 };
447
448 match replica_pool.get().await {
449 Ok(conn) => {
450 drop(conn);
451 state.probes().mark_replica_connection_ready();
452 refresh_replica_migration_readiness(state).await;
453 }
454 Err(error) => state
455 .probes()
456 .mark_replica_connection_unready(format!("replica connection failed: {error}")),
457 }
458}
459
460#[cfg(feature = "db")]
461async fn refresh_replica_migration_readiness<S: ProvideProbeState + Sync>(state: &S) {
462 refresh_replica_migration_readiness_with(state, |check| {
463 crate::migrate::check_replica_migration_readiness_blocking(
464 check.primary_url,
465 check.replica_url,
466 )
467 })
468 .await;
469}
470
471#[cfg(feature = "db")]
472async fn refresh_replica_migration_readiness_with<S, F, Fut>(state: &S, check_readiness: F)
473where
474 S: ProvideProbeState + Sync,
475 F: FnOnce(ReplicaMigrationCheck) -> Fut,
476 Fut: std::future::Future<Output = crate::migrate::ReplicaMigrationReadiness>,
477{
478 let Some(check) = state.probes().replica_migration_check() else {
479 return;
480 };
481
482 let readiness = check_readiness(check).await;
483
484 if readiness.is_ready() {
485 state.probes().mark_replica_migrations_ready();
486 } else if let Some(detail) = readiness.detail() {
487 state.probes().mark_replica_migrations_unready(detail);
488 }
489}
490
491fn probe_response<S: ProvideProbeState>(
492 state: &S,
493 kind: ProbeKind,
494 indicator_ready: bool,
495) -> (StatusCode, Json<ProbeResponse>) {
496 let startup_complete = state.probes().is_startup_complete();
497 let shutting_down = state.probes().is_shutting_down();
498 let (dependencies_ready, pool_status) = dependency_readiness(state);
499
500 let (status_code, status) = match kind {
501 ProbeKind::Live => (StatusCode::OK, "ok"),
502 ProbeKind::Startup if startup_complete => (StatusCode::OK, "ok"),
503 ProbeKind::Startup => (StatusCode::SERVICE_UNAVAILABLE, "starting"),
504 ProbeKind::Ready
505 if startup_complete && !shutting_down && dependencies_ready && indicator_ready =>
506 {
507 (StatusCode::OK, "ok")
508 }
509 ProbeKind::Ready => (StatusCode::SERVICE_UNAVAILABLE, "degraded"),
510 };
511
512 let detailed = state.health_detailed();
513 let body = ProbeResponse {
514 status,
515 version: if detailed {
516 Some(env!("CARGO_PKG_VERSION"))
517 } else {
518 None
519 },
520 profile: if detailed {
521 Some(state.profile().to_owned())
522 } else {
523 None
524 },
525 uptime: if detailed {
526 Some(state.uptime_display())
527 } else {
528 None
529 },
530 pool: if detailed { pool_status } else { None },
531 };
532
533 (status_code, Json(body))
534}
535
536fn already_degraded<S: ProvideProbeState>(state: &S) -> bool {
539 let probes = state.probes();
540 !probes.is_startup_complete() || probes.is_shutting_down() || !dependency_readiness(state).0
541}
542
543async fn check_readiness_indicators<S: ProvideProbeState + Sync>(state: &S) -> bool {
546 let Some(registry) = state.health_indicator_registry() else {
547 return true;
548 };
549 let results = registry.run_readiness().await;
550 let statuses: Vec<crate::actuator::HealthStatus> =
551 results.iter().map(|r| r.output.status).collect();
552 crate::actuator::HealthIndicatorRegistry::aggregate_status(&statuses).is_healthy()
553}
554
555pub async fn live_handler<S: ProvideProbeState + Send + Sync + 'static>(
557 State(state): State<S>,
558) -> impl IntoResponse {
559 probe_response(&state, ProbeKind::Live, true)
560}
561
562pub async fn ready_handler<S: ProvideProbeState + Send + Sync + 'static>(
564 State(state): State<S>,
565) -> impl IntoResponse {
566 #[cfg(feature = "db")]
567 refresh_replica_readiness(&state).await;
568 let indicator_ready = if already_degraded(&state) {
570 true
571 } else {
572 check_readiness_indicators(&state).await
573 };
574 probe_response(&state, ProbeKind::Ready, indicator_ready)
575}
576
577pub async fn startup_handler<S: ProvideProbeState + Send + Sync + 'static>(
579 State(state): State<S>,
580) -> impl IntoResponse {
581 probe_response(&state, ProbeKind::Startup, true)
582}
583
584pub(crate) async fn readiness_response<S: ProvideProbeState + Sync>(
586 state: &S,
587) -> (StatusCode, Json<ProbeResponse>) {
588 #[cfg(feature = "db")]
589 refresh_replica_readiness(state).await;
590 let indicator_ready = if already_degraded(state) {
591 true
592 } else {
593 check_readiness_indicators(state).await
594 };
595 probe_response(state, ProbeKind::Ready, indicator_ready)
596}
597
598#[cfg(test)]
599mod tests {
600 use super::*;
601
602 struct TestProbeState {
603 probes: ProbeState,
604 health_detailed: bool,
605 profile: String,
606 }
607
608 impl ProvideProbeState for TestProbeState {
609 fn probes(&self) -> &ProbeState {
610 &self.probes
611 }
612
613 fn health_detailed(&self) -> bool {
614 self.health_detailed
615 }
616
617 fn profile(&self) -> &str {
618 &self.profile
619 }
620
621 fn uptime_display(&self) -> String {
622 "test uptime".to_string()
623 }
624
625 #[cfg(feature = "db")]
626 fn pool(
627 &self,
628 ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
629 {
630 None
631 }
632 }
633
634 impl TestProbeState {
635 fn new() -> Self {
636 Self {
637 probes: ProbeState::pending_startup(),
638 health_detailed: true,
639 profile: "test".to_string(),
640 }
641 }
642 }
643
644 #[test]
645 fn test_live_handler_returns_ok() {
646 let state = TestProbeState::new();
647 let (status, Json(response)) = probe_response(&state, ProbeKind::Live, true);
648 assert_eq!(status, StatusCode::OK);
649 assert_eq!(response.status, "ok");
650 }
651
652 #[tokio::test]
653 async fn test_startup_handler_pending() {
654 let state = TestProbeState::new();
655 let (status, Json(response)) = probe_response(&state, ProbeKind::Startup, true);
656 assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
657 assert_eq!(response.status, "starting");
658 }
659
660 #[tokio::test]
661 async fn test_startup_handler_complete() {
662 let state = TestProbeState::new();
663 state.mark_startup_complete();
664 let (status, Json(response)) = probe_response(&state, ProbeKind::Startup, true);
665 assert_eq!(status, StatusCode::OK);
666 assert_eq!(response.status, "ok");
667 }
668
669 #[tokio::test]
670 async fn test_ready_handler_pending_startup() {
671 let state = TestProbeState::new();
672 let (status, Json(response)) = probe_response(&state, ProbeKind::Ready, true);
673 assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
674 assert_eq!(response.status, "degraded");
675 }
676
677 #[tokio::test]
678 async fn test_ready_handler_complete_startup() {
679 let state = TestProbeState::new();
680 state.mark_startup_complete();
681 let (status, Json(response)) = probe_response(&state, ProbeKind::Ready, true);
682 assert_eq!(status, StatusCode::OK);
683 assert_eq!(response.status, "ok");
684 }
685
686 #[tokio::test]
687 async fn test_ready_handler_shutting_down() {
688 let state = TestProbeState::new();
689 state.mark_startup_complete();
690 state.probes().begin_shutdown();
691 let (status, Json(response)) = probe_response(&state, ProbeKind::Ready, true);
692 assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
693 assert_eq!(response.status, "degraded");
694 }
695
696 #[cfg(feature = "db")]
697 #[tokio::test]
698 async fn ready_fails_when_replica_is_unready_and_policy_is_fail_readiness() {
699 let state = TestProbeState::new();
700 state.mark_startup_complete();
701 state
702 .probes()
703 .configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
704 state
705 .probes()
706 .mark_replica_unready("replica migrations lag primary");
707
708 let (status, Json(response)) = probe_response(&state, ProbeKind::Ready, true);
709
710 assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
711 assert_eq!(response.status, "degraded");
712 }
713
714 #[cfg(feature = "db")]
715 #[tokio::test]
716 async fn ready_fails_when_replica_is_configured_but_not_checked() {
717 let state = TestProbeState::new();
718 state.mark_startup_complete();
719 state
720 .probes()
721 .configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
722
723 let (status, Json(response)) = readiness_response(&state).await;
724
725 assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
726 assert_eq!(response.status, "degraded");
727 }
728
729 #[cfg(feature = "db")]
730 #[test]
731 fn replica_migration_lag_can_recover_without_resetting_connection_readiness() {
732 let probes = ProbeState::ready_for_test();
733 probes.configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
734 probes.mark_replica_connection_ready();
735 probes.mark_replica_migrations_unready("replica migrations lag primary");
736
737 assert!(!probes.replica_allows_readiness());
738 assert!(!probes.should_route_reads_to_replica());
739
740 probes.mark_replica_connection_ready();
741 assert!(!probes.replica_allows_readiness());
742 assert!(!probes.should_route_reads_to_replica());
743
744 probes.mark_replica_migrations_ready();
745 assert!(probes.replica_allows_readiness());
746 assert!(probes.should_route_reads_to_replica());
747 }
748
749 #[cfg(feature = "db")]
750 #[test]
751 fn replica_migration_retry_urls_are_stored_for_readiness_rechecks() {
752 let probes = ProbeState::ready_for_test();
753 probes.configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
754 probes.configure_replica_migration_check(
755 "postgres://localhost/primary",
756 "postgres://localhost/replica",
757 );
758
759 let check = probes
760 .replica_migration_check()
761 .expect("migration check should be configured");
762
763 assert_eq!(check.primary_url, "postgres://localhost/primary");
764 assert_eq!(check.replica_url, "postgres://localhost/replica");
765 }
766
767 #[cfg(feature = "db")]
768 #[tokio::test]
769 async fn replica_migration_readiness_rechecks_after_initial_ready_state() {
770 let state = TestProbeState::new();
771 state.mark_startup_complete();
772 state
773 .probes()
774 .configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
775 state.probes().configure_replica_migration_check(
776 "postgres://localhost/primary",
777 "postgres://localhost/replica",
778 );
779 state.probes().mark_replica_connection_ready();
780 state.probes().mark_replica_migrations_ready();
781
782 let checks = std::sync::atomic::AtomicUsize::new(0);
783 refresh_replica_migration_readiness_with(&state, |check| {
784 checks.fetch_add(1, Ordering::Relaxed);
785 assert_eq!(check.primary_url, "postgres://localhost/primary");
786 assert_eq!(check.replica_url, "postgres://localhost/replica");
787 std::future::ready(crate::migrate::ReplicaMigrationReadiness::Stale {
788 primary_latest: Some("20260511000000".to_owned()),
789 replica_latest: Some("20260510000000".to_owned()),
790 })
791 })
792 .await;
793
794 assert_eq!(checks.load(Ordering::Relaxed), 1);
795 assert!(!state.probes().replica_allows_readiness());
796 assert!(!state.probes().should_route_reads_to_replica());
797 }
798
799 #[cfg(feature = "db")]
800 #[tokio::test]
801 async fn ready_allows_primary_fallback_when_replica_is_unready() {
802 let state = TestProbeState::new();
803 state.mark_startup_complete();
804 state
805 .probes()
806 .configure_replica_dependency(crate::config::ReplicaFallback::Primary);
807 state
808 .probes()
809 .mark_replica_unready("replica migrations lag primary");
810
811 let (status, Json(response)) = probe_response(&state, ProbeKind::Ready, true);
812
813 assert_eq!(status, StatusCode::OK);
814 assert_eq!(response.status, "ok");
815 }
816
817 #[tokio::test]
818 async fn test_probe_state_set_draining() {
819 let state = ProbeState::starting();
820 assert!(!state.draining());
821 state.set_draining(true);
822 assert!(state.draining());
823 }
824
825 #[tokio::test]
826 async fn test_probe_state_set_startup_complete() {
827 let state = ProbeState::starting();
828 assert!(!state.is_startup_complete());
829 state.set_startup_complete(true);
830 assert!(state.is_startup_complete());
831 }
832
833 #[tokio::test]
834 async fn test_ready_for_test() {
835 let state = ProbeState::ready_for_test();
836 assert!(state.is_startup_complete());
837 }
838
839 #[tokio::test]
840 async fn test_health_detailed_false() {
841 let mut state = TestProbeState::new();
842 state.health_detailed = false;
843
844 let (_, Json(response)) = probe_response(&state, ProbeKind::Live, true);
845 assert!(response.version.is_none());
846 assert!(response.profile.is_none());
847 assert!(response.uptime.is_none());
848 assert!(response.pool.is_none());
849 }
850
851 #[tokio::test]
852 async fn test_begin_draining() {
853 let state = ProbeState::ready_for_test();
854 assert!(!state.draining());
855 state.begin_draining();
856 assert!(state.draining());
857 }
858
859 struct TestProbeStateWithIndicators {
862 probes: ProbeState,
863 health_detailed: bool,
864 profile: String,
865 registry: crate::actuator::HealthIndicatorRegistry,
866 }
867
868 impl ProvideProbeState for TestProbeStateWithIndicators {
869 fn probes(&self) -> &ProbeState {
870 &self.probes
871 }
872
873 fn health_detailed(&self) -> bool {
874 self.health_detailed
875 }
876
877 fn profile(&self) -> &str {
878 &self.profile
879 }
880
881 fn uptime_display(&self) -> String {
882 "test uptime".to_string()
883 }
884
885 #[cfg(feature = "db")]
886 fn pool(
887 &self,
888 ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
889 {
890 None
891 }
892
893 fn health_indicator_registry(&self) -> Option<&crate::actuator::HealthIndicatorRegistry> {
894 Some(&self.registry)
895 }
896 }
897
898 impl TestProbeStateWithIndicators {
899 fn new(registry: crate::actuator::HealthIndicatorRegistry) -> Self {
900 let probes = ProbeState::pending_startup();
901 probes.mark_startup_complete();
902 Self {
903 probes,
904 health_detailed: true,
905 profile: "test".to_string(),
906 registry,
907 }
908 }
909 }
910
911 struct AlwaysDown;
912 impl crate::actuator::HealthIndicator for AlwaysDown {
913 fn check(&self) -> futures::future::BoxFuture<'_, crate::actuator::HealthCheckOutput> {
914 Box::pin(async { crate::actuator::HealthCheckOutput::down() })
915 }
916 }
917
918 struct AlwaysUp;
919 impl crate::actuator::HealthIndicator for AlwaysUp {
920 fn check(&self) -> futures::future::BoxFuture<'_, crate::actuator::HealthCheckOutput> {
921 Box::pin(async { crate::actuator::HealthCheckOutput::up() })
922 }
923 }
924
925 #[tokio::test]
926 async fn ready_is_degraded_when_readiness_indicator_is_down() {
927 let registry = crate::actuator::HealthIndicatorRegistry::new();
928 registry
929 .register(
930 "svc",
931 crate::actuator::IndicatorGroup::Readiness,
932 std::sync::Arc::new(AlwaysDown),
933 )
934 .unwrap();
935 let state = TestProbeStateWithIndicators::new(registry);
936 let (status, Json(response)) = readiness_response(&state).await;
937 assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
938 assert_eq!(response.status, "degraded");
939 }
940
941 #[tokio::test]
942 async fn ready_is_ok_when_readiness_indicator_is_up() {
943 let registry = crate::actuator::HealthIndicatorRegistry::new();
944 registry
945 .register(
946 "svc",
947 crate::actuator::IndicatorGroup::Readiness,
948 std::sync::Arc::new(AlwaysUp),
949 )
950 .unwrap();
951 let state = TestProbeStateWithIndicators::new(registry);
952 let (status, Json(response)) = readiness_response(&state).await;
953 assert_eq!(status, StatusCode::OK);
954 assert_eq!(response.status, "ok");
955 }
956
957 #[tokio::test]
958 async fn ready_is_ok_when_health_only_indicator_is_down() {
959 let registry = crate::actuator::HealthIndicatorRegistry::new();
960 registry
961 .register(
962 "svc",
963 crate::actuator::IndicatorGroup::HealthOnly,
964 std::sync::Arc::new(AlwaysDown),
965 )
966 .unwrap();
967 let state = TestProbeStateWithIndicators::new(registry);
968 let (status, Json(response)) = readiness_response(&state).await;
969 assert_eq!(status, StatusCode::OK);
970 assert_eq!(response.status, "ok");
971 }
972}