camber/http/health.rs
1use crate::RuntimeError;
2use crate::resource::{MIN_HEALTH_INTERVAL, Resource};
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(5);
8
9/// Probe a URL and return `true` if the response is a success status.
10async fn probe_url(client: &reqwest::Client, url: &str) -> bool {
11 client
12 .get(url)
13 .timeout(HEALTH_CHECK_TIMEOUT)
14 .send()
15 .await
16 .is_ok_and(|r| r.status().is_success())
17}
18
19impl std::fmt::Debug for ProxyHealthResource {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 f.debug_struct("ProxyHealthResource")
22 .field("name", &self.name)
23 .field("url", &self.url)
24 .finish()
25 }
26}
27
28/// A proxy backend health check that implements [`Resource`] for lifecycle
29/// integration.
30///
31/// Create with [`ProxyHealthResource::new`], pass the [`routing_flag`] to
32/// [`Router::proxy_checked`], and register the resource with
33/// [`RuntimeBuilder::resource`]. The runtime manages the health check interval.
34///
35/// [`routing_flag`]: ProxyHealthResource::routing_flag
36/// [`Router::proxy_checked`]: super::Router::proxy_checked
37/// [`RuntimeBuilder::resource`]: crate::RuntimeBuilder::resource
38pub struct ProxyHealthResource {
39 name: Box<str>,
40 url: Box<str>,
41 routing_flag: Arc<AtomicBool>,
42}
43
44impl ProxyHealthResource {
45 /// Create a new proxy health resource.
46 ///
47 /// `backend` is the base URL (e.g., `"http://localhost:8080"`).
48 /// `path` is the health endpoint path (e.g., `"/health"`).
49 ///
50 /// The routing flag starts `true` (healthy).
51 pub fn new(backend: &str, path: &str) -> Self {
52 Self {
53 name: Box::from(backend),
54 url: format!("{backend}{path}").into_boxed_str(),
55 routing_flag: Arc::new(AtomicBool::new(true)),
56 }
57 }
58
59 /// Get the routing flag for use with [`Router::proxy_checked`].
60 ///
61 /// The runtime updates this flag based on health check results.
62 /// The proxy router reads it to decide whether to forward requests.
63 ///
64 /// [`Router::proxy_checked`]: super::Router::proxy_checked
65 pub fn routing_flag(&self) -> Arc<AtomicBool> {
66 Arc::clone(&self.routing_flag)
67 }
68}
69
70impl Resource for ProxyHealthResource {
71 fn name(&self) -> &str {
72 &self.name
73 }
74
75 fn health_check(&self) -> Result<(), RuntimeError> {
76 let client = super::async_proxy::proxy_client()?;
77 let handle = tokio::runtime::Handle::current();
78 let url: &str = &self.url;
79 let ok = handle.block_on(probe_url(client, url));
80 self.routing_flag.store(ok, Ordering::Release);
81 match ok {
82 true => Ok(()),
83 false => Err(RuntimeError::Http("backend health check failed".into())),
84 }
85 }
86
87 fn shutdown(&self) -> Result<(), RuntimeError> {
88 Ok(())
89 }
90}
91
92/// Spawn a background Tokio task that polls a backend health endpoint.
93///
94/// Performs an initial probe before returning, so the flag reflects the
95/// backend's real state immediately. The background loop then continues
96/// polling at `interval`.
97///
98/// Returns an `Arc<AtomicBool>` that reflects the backend's health state.
99/// On poll failure it flips to `false`; on success it flips back to `true`.
100///
101/// For lifecycle integration (health reporting via `/health`, structured
102/// shutdown), use [`ProxyHealthResource`] with [`RuntimeBuilder::resource`]
103/// instead.
104///
105/// # Errors
106///
107/// Returns `RuntimeError::InvalidArgument` if `interval` is less than 1 second.
108///
109/// [`RuntimeBuilder::resource`]: crate::RuntimeBuilder::resource
110pub async fn spawn_health_checker(
111 backend: &str,
112 path: &str,
113 interval: Duration,
114) -> Result<Arc<AtomicBool>, RuntimeError> {
115 if interval < MIN_HEALTH_INTERVAL {
116 return Err(RuntimeError::InvalidArgument(
117 "health check interval must be at least 1 second".into(),
118 ));
119 }
120 let url = format!("{backend}{path}");
121
122 // Reuse the shared no-proxy client from async_proxy to avoid a second
123 // connection pool and TLS session cache.
124 let client = super::async_proxy::proxy_client()?.clone();
125
126 // Initial probe before spawning the background loop.
127 let initial_ok = probe_url(&client, &url).await;
128 let healthy = Arc::new(AtomicBool::new(initial_ok));
129 let flag = Arc::clone(&healthy);
130
131 tokio::spawn(async move {
132 loop {
133 tokio::time::sleep(interval).await;
134 let ok = probe_url(&client, &url).await;
135 flag.store(ok, Ordering::Release);
136 }
137 });
138
139 Ok(healthy)
140}