1use std::sync::Arc;
9#[cfg(feature = "db")]
10use std::sync::RwLock;
11use std::sync::atomic::{AtomicBool, Ordering};
12
13use crate::extract::State;
14use axum::Json;
15use axum::http::StatusCode;
16use axum::response::IntoResponse;
17use serde::Serialize;
18
19pub trait ProvideProbeState {
25 fn probes(&self) -> &ProbeState;
28
29 fn health_detailed(&self) -> bool;
32
33 fn profile(&self) -> &str;
35
36 fn uptime_display(&self) -> String;
39
40 #[cfg(feature = "db")]
43 fn pool(
44 &self,
45 ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>;
46
47 #[cfg(feature = "db")]
49 fn replica_pool(
50 &self,
51 ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
52 {
53 None
54 }
55
56 fn mark_startup_complete(&self) {
81 self.probes().mark_startup_complete();
82 }
83}
84
/// Shared probe flags.
///
/// Every field lives behind an `Arc`, so clones of a `ProbeState` observe and
/// update the same underlying state.
#[derive(Debug, Default, Clone)]
pub struct ProbeState {
    /// Set once startup has been marked complete.
    startup_complete: Arc<AtomicBool>,
    /// Set while the application is shutting down / draining.
    shutting_down: Arc<AtomicBool>,
    /// Tracked read-replica health, consulted by readiness and read routing.
    #[cfg(feature = "db")]
    replica_dependency: Arc<RwLock<ReplicaDependency>>,
}
93
/// Connection URLs used to re-run the replica migration comparison.
///
/// `Debug` is implemented by hand rather than derived so that credentials
/// embedded in the URLs (e.g. `postgres://user:password@host/db`) are not
/// written to logs: `ProbeState` derives `Debug`, and its output includes
/// this struct through the replica dependency.
#[cfg(feature = "db")]
#[derive(Clone, PartialEq, Eq)]
pub(crate) struct ReplicaMigrationCheck {
    /// Primary database URL.
    pub(crate) primary_url: String,
    /// Read-replica database URL.
    pub(crate) replica_url: String,
}

#[cfg(feature = "db")]
impl std::fmt::Debug for ReplicaMigrationCheck {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ReplicaMigrationCheck")
            .field("primary_url", &redact_url_credentials(&self.primary_url))
            .field("replica_url", &redact_url_credentials(&self.replica_url))
            .finish()
    }
}

/// Replaces the userinfo (`user:password@`) section of a `scheme://` URL with
/// `<redacted>` and keeps the host, path and query visible for debugging.
///
/// Strings that are not `scheme://` URLs are redacted entirely, because they
/// may be key/value connection strings that carry a password.
#[cfg(feature = "db")]
fn redact_url_credentials(url: &str) -> String {
    let Some((scheme, rest)) = url.split_once("://") else {
        return "<redacted>".to_owned();
    };
    // The authority (which holds any userinfo) ends at the first path, query
    // or fragment delimiter.
    let authority_len = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    match rest[..authority_len].rfind('@') {
        Some(at) => format!("{scheme}://<redacted>@{}", &rest[at + 1..]),
        None => url.to_owned(),
    }
}
100
/// Tracked health of the optional read replica.
///
/// A configured replica counts as healthy only when both `connection_ready`
/// and `migrations_ready` are set; otherwise `fallback` decides whether
/// readiness may still pass.
#[cfg(feature = "db")]
#[derive(Debug, Clone, PartialEq, Eq)]
struct ReplicaDependency {
    /// Whether a replica has been configured at all.
    configured: bool,
    /// Policy applied while a configured replica is unhealthy.
    fallback: crate::config::ReplicaFallback,
    /// Outcome of the latest connection check.
    connection_ready: bool,
    /// Outcome of the latest migration comparison against the primary.
    migrations_ready: bool,
    /// URLs used to re-run the migration comparison, once configured.
    migration_check: Option<ReplicaMigrationCheck>,
    /// Human-readable reason the replica is unhealthy, if one was recorded.
    detail: Option<String>,
}
111
/// The default describes "no replica configured": both readiness flags start
/// `true` and nothing is recorded in `detail`, so the replica never blocks
/// readiness or routing.
#[cfg(feature = "db")]
impl Default for ReplicaDependency {
    fn default() -> Self {
        let fallback = crate::config::ReplicaFallback::default();
        Self {
            configured: false,
            fallback,
            connection_ready: true,
            migrations_ready: true,
            migration_check: None,
            detail: None,
        }
    }
}
125
126impl ProbeState {
127 #[must_use]
129 pub fn pending_startup() -> Self {
130 Self::default()
131 }
132
133 #[must_use]
135 pub fn starting() -> Self {
136 Self::pending_startup()
137 }
138
139 #[must_use]
141 pub fn ready_for_test() -> Self {
142 let state = Self::pending_startup();
143 state.mark_startup_complete();
144 state
145 }
146
147 pub fn mark_startup_complete(&self) {
149 self.startup_complete.store(true, Ordering::Relaxed);
150 }
151
152 pub fn set_startup_complete(&self, complete: bool) {
154 self.startup_complete.store(complete, Ordering::Relaxed);
155 }
156
157 pub fn begin_shutdown(&self) {
159 self.shutting_down.store(true, Ordering::Relaxed);
160 }
161
162 pub fn begin_draining(&self) {
164 self.begin_shutdown();
165 }
166
167 pub fn set_draining(&self, draining: bool) {
169 self.shutting_down.store(draining, Ordering::Relaxed);
170 }
171
172 #[cfg(feature = "db")]
178 pub fn configure_replica_dependency(&self, fallback: crate::config::ReplicaFallback) {
179 let mut dependency = self
180 .replica_dependency
181 .write()
182 .expect("replica dependency lock poisoned");
183 *dependency = ReplicaDependency {
184 configured: true,
185 fallback,
186 connection_ready: false,
187 migrations_ready: true,
188 migration_check: None,
189 detail: Some("replica has not passed a readiness check".to_owned()),
190 };
191 }
192
193 #[cfg(feature = "db")]
195 pub(crate) fn configure_replica_migration_check(
196 &self,
197 primary_url: impl Into<String>,
198 replica_url: impl Into<String>,
199 ) {
200 let mut dependency = self
201 .replica_dependency
202 .write()
203 .expect("replica dependency lock poisoned");
204 dependency.migration_check = Some(ReplicaMigrationCheck {
205 primary_url: primary_url.into(),
206 replica_url: replica_url.into(),
207 });
208 }
209
210 #[cfg(feature = "db")]
216 pub fn mark_replica_connection_ready(&self) {
217 let mut dependency = self
218 .replica_dependency
219 .write()
220 .expect("replica dependency lock poisoned");
221 dependency.connection_ready = true;
222 if dependency.migrations_ready {
223 dependency.detail = None;
224 }
225 }
226
227 #[cfg(feature = "db")]
233 pub fn mark_replica_connection_unready(&self, detail: impl Into<String>) {
234 let mut dependency = self
235 .replica_dependency
236 .write()
237 .expect("replica dependency lock poisoned");
238 dependency.connection_ready = false;
239 dependency.detail = Some(detail.into());
240 }
241
242 #[cfg(feature = "db")]
248 pub fn mark_replica_migrations_ready(&self) {
249 let mut dependency = self
250 .replica_dependency
251 .write()
252 .expect("replica dependency lock poisoned");
253 dependency.migrations_ready = true;
254 if dependency.connection_ready {
255 dependency.detail = None;
256 }
257 }
258
259 #[cfg(feature = "db")]
261 pub(crate) fn mark_replica_migrations_unready(&self, detail: impl Into<String>) {
262 let mut dependency = self
263 .replica_dependency
264 .write()
265 .expect("replica dependency lock poisoned");
266 dependency.migrations_ready = false;
267 dependency.detail = Some(detail.into());
268 }
269
270 #[cfg(feature = "db")]
276 pub fn mark_replica_ready(&self) {
277 let mut dependency = self
278 .replica_dependency
279 .write()
280 .expect("replica dependency lock poisoned");
281 dependency.connection_ready = true;
282 dependency.migrations_ready = true;
283 dependency.detail = None;
284 }
285
286 #[cfg(feature = "db")]
292 pub fn mark_replica_unready(&self, detail: impl Into<String>) {
293 let mut dependency = self
294 .replica_dependency
295 .write()
296 .expect("replica dependency lock poisoned");
297 dependency.connection_ready = false;
298 dependency.migrations_ready = false;
299 dependency.detail = Some(detail.into());
300 }
301
302 #[must_use]
304 pub fn is_startup_complete(&self) -> bool {
305 self.startup_complete.load(Ordering::Relaxed)
306 }
307
308 #[must_use]
310 pub fn is_shutting_down(&self) -> bool {
311 self.shutting_down.load(Ordering::Relaxed)
312 }
313
314 #[must_use]
316 pub fn draining(&self) -> bool {
317 self.is_shutting_down()
318 }
319
320 #[cfg(feature = "db")]
321 pub(crate) fn replica_allows_readiness(&self) -> bool {
322 let dependency = self
323 .replica_dependency
324 .read()
325 .expect("replica dependency lock poisoned");
326 let ready = dependency.connection_ready && dependency.migrations_ready;
327 !dependency.configured
328 || ready
329 || matches!(dependency.fallback, crate::config::ReplicaFallback::Primary)
330 }
331
332 #[cfg(feature = "db")]
333 pub(crate) fn should_route_reads_to_replica(&self) -> bool {
334 let dependency = self
335 .replica_dependency
336 .read()
337 .expect("replica dependency lock poisoned");
338 !dependency.configured || (dependency.connection_ready && dependency.migrations_ready)
339 }
340
341 #[cfg(feature = "db")]
342 pub(crate) fn should_fallback_reads_to_primary(&self) -> bool {
343 let dependency = self
344 .replica_dependency
345 .read()
346 .expect("replica dependency lock poisoned");
347 dependency.configured
348 && !(dependency.connection_ready && dependency.migrations_ready)
349 && matches!(dependency.fallback, crate::config::ReplicaFallback::Primary)
350 }
351
352 #[cfg(feature = "db")]
353 pub(crate) fn replica_configured(&self) -> bool {
354 self.replica_dependency
355 .read()
356 .expect("replica dependency lock poisoned")
357 .configured
358 }
359
360 #[cfg(feature = "db")]
361 pub(crate) fn replica_migration_check(&self) -> Option<ReplicaMigrationCheck> {
362 self.replica_dependency
363 .read()
364 .expect("replica dependency lock poisoned")
365 .migration_check
366 .clone()
367 }
368}
369
/// The probe endpoint a response is being built for.
#[derive(Copy, Clone)]
enum ProbeKind {
    /// Liveness: always reports `ok`.
    Live,
    /// Readiness: `ok` only after startup, before shutdown, and while the
    /// dependencies are ready.
    Ready,
    /// Startup: `ok` once startup has been marked complete.
    Startup,
}
376
377#[derive(Serialize)]
378pub(crate) struct ProbeResponse {
379 status: &'static str,
380 #[serde(skip_serializing_if = "Option::is_none")]
381 version: Option<&'static str>,
382 #[serde(skip_serializing_if = "Option::is_none")]
383 profile: Option<String>,
384 #[serde(skip_serializing_if = "Option::is_none")]
385 uptime: Option<String>,
386 #[serde(skip_serializing_if = "Option::is_none")]
387 pool: Option<PoolStatus>,
388}
389
390#[derive(Serialize)]
391pub(crate) struct PoolStatus {
392 size: u64,
393 available: u64,
394 waiting: u64,
395}
396
397#[allow(clippy::missing_const_for_fn, unused_variables)]
398fn dependency_readiness<S: ProvideProbeState>(state: &S) -> (bool, Option<PoolStatus>) {
399 #[cfg(feature = "db")]
400 {
401 let replica_ready_for_policy = state.probes().replica_allows_readiness();
402 let (pool_ready, pool_status) = state.pool().map_or((true, None), |pool| {
403 let status = pool.status();
404 let available = status.available as u64;
405 let size = status.max_size as u64;
406 let waiting = status.waiting as u64;
407
408 (
409 available > 0 || waiting == 0,
410 Some(PoolStatus {
411 size,
412 available,
413 waiting,
414 }),
415 )
416 });
417
418 (pool_ready && replica_ready_for_policy, pool_status)
419 }
420
421 #[cfg(not(feature = "db"))]
422 {
423 (true, None)
424 }
425}
426
/// Re-checks the read replica and records the outcome in [`ProbeState`].
///
/// Does nothing when no replica is configured. A missing replica pool or a
/// failed checkout marks the connection unready. A successful checkout marks
/// it ready and then re-runs the migration readiness check.
#[cfg(feature = "db")]
async fn refresh_replica_readiness<S: ProvideProbeState + Sync>(state: &S) {
    if !state.probes().replica_configured() {
        return;
    }

    let Some(replica_pool) = state.replica_pool() else {
        state
            .probes()
            .mark_replica_connection_unready("replica pool is not available");
        return;
    };

    // A successful checkout is the connection check. The connection is
    // returned to the pool at the end of this statement.
    if let Err(error) = replica_pool.get().await {
        state
            .probes()
            .mark_replica_connection_unready(format!("replica connection failed: {error}"));
        return;
    }

    state.probes().mark_replica_connection_ready();
    refresh_replica_migration_readiness(state).await;
}
451
/// Re-runs the replica migration comparison through
/// `crate::migrate::check_replica_migration_readiness_blocking`.
#[cfg(feature = "db")]
async fn refresh_replica_migration_readiness<S: ProvideProbeState + Sync>(state: &S) {
    let run_check = |ReplicaMigrationCheck {
                         primary_url,
                         replica_url,
                     }| {
        crate::migrate::check_replica_migration_readiness_blocking(primary_url, replica_url)
    };
    refresh_replica_migration_readiness_with(state, run_check).await;
}
462
/// Runs `check_readiness` with the stored migration-check URLs and records the
/// outcome in [`ProbeState`].
///
/// Does nothing when no migration check has been configured. Any result whose
/// `is_ready()` is false marks the replica's migrations unready. The result's
/// `detail()` is recorded when present, otherwise a generic reason is used.
/// Previously a not-ready result without detail was ignored. That left a
/// replica that had been ready marked ready, so reads kept going to it; the
/// check now fails closed.
#[cfg(feature = "db")]
async fn refresh_replica_migration_readiness_with<S, F, Fut>(state: &S, check_readiness: F)
where
    S: ProvideProbeState + Sync,
    F: FnOnce(ReplicaMigrationCheck) -> Fut,
    Fut: std::future::Future<Output = crate::migrate::ReplicaMigrationReadiness>,
{
    let Some(check) = state.probes().replica_migration_check() else {
        return;
    };

    let readiness = check_readiness(check).await;

    if readiness.is_ready() {
        state.probes().mark_replica_migrations_ready();
        return;
    }

    match readiness.detail() {
        Some(detail) => state.probes().mark_replica_migrations_unready(detail),
        None => state
            .probes()
            .mark_replica_migrations_unready("replica migration readiness check did not pass"),
    }
}
482
483fn probe_response<S: ProvideProbeState>(
484 state: &S,
485 kind: ProbeKind,
486) -> (StatusCode, Json<ProbeResponse>) {
487 let startup_complete = state.probes().is_startup_complete();
488 let shutting_down = state.probes().is_shutting_down();
489 let (dependencies_ready, pool_status) = dependency_readiness(state);
490
491 let (status_code, status) = match kind {
492 ProbeKind::Live => (StatusCode::OK, "ok"),
493 ProbeKind::Startup if startup_complete => (StatusCode::OK, "ok"),
494 ProbeKind::Startup => (StatusCode::SERVICE_UNAVAILABLE, "starting"),
495 ProbeKind::Ready if startup_complete && !shutting_down && dependencies_ready => {
496 (StatusCode::OK, "ok")
497 }
498 ProbeKind::Ready => (StatusCode::SERVICE_UNAVAILABLE, "degraded"),
499 };
500
501 let detailed = state.health_detailed();
502 let body = ProbeResponse {
503 status,
504 version: if detailed {
505 Some(env!("CARGO_PKG_VERSION"))
506 } else {
507 None
508 },
509 profile: if detailed {
510 Some(state.profile().to_owned())
511 } else {
512 None
513 },
514 uptime: if detailed {
515 Some(state.uptime_display())
516 } else {
517 None
518 },
519 pool: if detailed { pool_status } else { None },
520 };
521
522 (status_code, Json(body))
523}
524
525pub async fn live_handler<S: ProvideProbeState + Send + Sync + 'static>(
527 State(state): State<S>,
528) -> impl IntoResponse {
529 probe_response(&state, ProbeKind::Live)
530}
531
532pub async fn ready_handler<S: ProvideProbeState + Send + Sync + 'static>(
534 State(state): State<S>,
535) -> impl IntoResponse {
536 #[cfg(feature = "db")]
537 refresh_replica_readiness(&state).await;
538 probe_response(&state, ProbeKind::Ready)
539}
540
541pub async fn startup_handler<S: ProvideProbeState + Send + Sync + 'static>(
543 State(state): State<S>,
544) -> impl IntoResponse {
545 probe_response(&state, ProbeKind::Startup)
546}
547
548pub(crate) async fn readiness_response<S: ProvideProbeState + Sync>(
550 state: &S,
551) -> (StatusCode, Json<ProbeResponse>) {
552 #[cfg(feature = "db")]
553 refresh_replica_readiness(state).await;
554 probe_response(state, ProbeKind::Ready)
555}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal `ProvideProbeState` backed by an owned `ProbeState`.
    struct TestProbeState {
        probes: ProbeState,
        health_detailed: bool,
        profile: String,
    }

    impl ProvideProbeState for TestProbeState {
        fn probes(&self) -> &ProbeState {
            &self.probes
        }

        fn health_detailed(&self) -> bool {
            self.health_detailed
        }

        fn profile(&self) -> &str {
            &self.profile
        }

        fn uptime_display(&self) -> String {
            "test uptime".to_string()
        }

        #[cfg(feature = "db")]
        fn pool(
            &self,
        ) -> Option<&diesel_async::pooled_connection::deadpool::Pool<diesel_async::AsyncPgConnection>>
        {
            None
        }
    }

    impl TestProbeState {
        /// Detailed output, `test` profile, startup still pending.
        fn new() -> Self {
            Self {
                probes: ProbeState::pending_startup(),
                health_detailed: true,
                profile: "test".to_string(),
            }
        }

        /// Like `new`, but with startup already marked complete.
        fn started() -> Self {
            let state = Self::new();
            state.mark_startup_complete();
            state
        }
    }

    /// Builds a probe response and checks its status code and body status.
    #[track_caller]
    fn assert_probe(state: &TestProbeState, kind: ProbeKind, code: StatusCode, status: &str) {
        let (actual_code, Json(body)) = probe_response(state, kind);
        assert_eq!(actual_code, code);
        assert_eq!(body.status, status);
    }

    #[test]
    fn test_live_handler_returns_ok() {
        assert_probe(&TestProbeState::new(), ProbeKind::Live, StatusCode::OK, "ok");
    }

    #[tokio::test]
    async fn test_startup_handler_pending() {
        assert_probe(
            &TestProbeState::new(),
            ProbeKind::Startup,
            StatusCode::SERVICE_UNAVAILABLE,
            "starting",
        );
    }

    #[tokio::test]
    async fn test_startup_handler_complete() {
        assert_probe(&TestProbeState::started(), ProbeKind::Startup, StatusCode::OK, "ok");
    }

    #[tokio::test]
    async fn test_ready_handler_pending_startup() {
        assert_probe(
            &TestProbeState::new(),
            ProbeKind::Ready,
            StatusCode::SERVICE_UNAVAILABLE,
            "degraded",
        );
    }

    #[tokio::test]
    async fn test_ready_handler_complete_startup() {
        assert_probe(&TestProbeState::started(), ProbeKind::Ready, StatusCode::OK, "ok");
    }

    #[tokio::test]
    async fn test_ready_handler_shutting_down() {
        let state = TestProbeState::started();
        state.probes().begin_shutdown();
        assert_probe(&state, ProbeKind::Ready, StatusCode::SERVICE_UNAVAILABLE, "degraded");
    }

    #[cfg(feature = "db")]
    #[tokio::test]
    async fn ready_fails_when_replica_is_unready_and_policy_is_fail_readiness() {
        let state = TestProbeState::started();
        let probes = state.probes();
        probes.configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
        probes.mark_replica_unready("replica migrations lag primary");

        assert_probe(&state, ProbeKind::Ready, StatusCode::SERVICE_UNAVAILABLE, "degraded");
    }

    #[cfg(feature = "db")]
    #[tokio::test]
    async fn ready_fails_when_replica_is_configured_but_not_checked() {
        let state = TestProbeState::started();
        state
            .probes()
            .configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);

        let (code, Json(body)) = readiness_response(&state).await;

        assert_eq!(code, StatusCode::SERVICE_UNAVAILABLE);
        assert_eq!(body.status, "degraded");
    }

    #[cfg(feature = "db")]
    #[test]
    fn replica_migration_lag_can_recover_without_resetting_connection_readiness() {
        // (readiness allowed, reads routed to replica)
        let gates = |probes: &ProbeState| {
            (
                probes.replica_allows_readiness(),
                probes.should_route_reads_to_replica(),
            )
        };

        let probes = ProbeState::ready_for_test();
        probes.configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
        probes.mark_replica_connection_ready();
        probes.mark_replica_migrations_unready("replica migrations lag primary");
        assert_eq!(gates(&probes), (false, false));

        probes.mark_replica_connection_ready();
        assert_eq!(gates(&probes), (false, false));

        probes.mark_replica_migrations_ready();
        assert_eq!(gates(&probes), (true, true));
    }

    #[cfg(feature = "db")]
    #[test]
    fn replica_migration_retry_urls_are_stored_for_readiness_rechecks() {
        let probes = ProbeState::ready_for_test();
        probes.configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
        probes.configure_replica_migration_check(
            "postgres://localhost/primary",
            "postgres://localhost/replica",
        );

        let stored = probes
            .replica_migration_check()
            .expect("migration check should be configured");

        assert_eq!(stored.primary_url, "postgres://localhost/primary");
        assert_eq!(stored.replica_url, "postgres://localhost/replica");
    }

    #[cfg(feature = "db")]
    #[tokio::test]
    async fn replica_migration_readiness_rechecks_after_initial_ready_state() {
        let state = TestProbeState::started();
        let probes = state.probes();
        probes.configure_replica_dependency(crate::config::ReplicaFallback::FailReadiness);
        probes.configure_replica_migration_check(
            "postgres://localhost/primary",
            "postgres://localhost/replica",
        );
        probes.mark_replica_connection_ready();
        probes.mark_replica_migrations_ready();

        let calls = std::sync::atomic::AtomicUsize::new(0);
        refresh_replica_migration_readiness_with(&state, |check| {
            calls.fetch_add(1, Ordering::Relaxed);
            assert_eq!(check.primary_url, "postgres://localhost/primary");
            assert_eq!(check.replica_url, "postgres://localhost/replica");
            std::future::ready(crate::migrate::ReplicaMigrationReadiness::Stale {
                primary_latest: Some("20260511000000".to_owned()),
                replica_latest: Some("20260510000000".to_owned()),
            })
        })
        .await;

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert!(!state.probes().replica_allows_readiness());
        assert!(!state.probes().should_route_reads_to_replica());
    }

    #[cfg(feature = "db")]
    #[tokio::test]
    async fn ready_allows_primary_fallback_when_replica_is_unready() {
        let state = TestProbeState::started();
        let probes = state.probes();
        probes.configure_replica_dependency(crate::config::ReplicaFallback::Primary);
        probes.mark_replica_unready("replica migrations lag primary");

        assert_probe(&state, ProbeKind::Ready, StatusCode::OK, "ok");
    }

    #[tokio::test]
    async fn test_probe_state_set_draining() {
        let probes = ProbeState::starting();
        assert!(!probes.draining());
        probes.set_draining(true);
        assert!(probes.draining());
    }

    #[tokio::test]
    async fn test_probe_state_set_startup_complete() {
        let probes = ProbeState::starting();
        assert!(!probes.is_startup_complete());
        probes.set_startup_complete(true);
        assert!(probes.is_startup_complete());
    }

    #[tokio::test]
    async fn test_ready_for_test() {
        assert!(ProbeState::ready_for_test().is_startup_complete());
    }

    #[tokio::test]
    async fn test_health_detailed_false() {
        let mut state = TestProbeState::new();
        state.health_detailed = false;

        let (_, Json(body)) = probe_response(&state, ProbeKind::Live);
        assert!(body.version.is_none());
        assert!(body.profile.is_none());
        assert!(body.uptime.is_none());
        assert!(body.pool.is_none());
    }

    #[tokio::test]
    async fn test_begin_draining() {
        let probes = ProbeState::ready_for_test();
        assert!(!probes.draining());
        probes.begin_draining();
        assert!(probes.draining());
    }
}
817}