switchboard_node_health/
lib.rs

1use log::info;
2use std::convert::Infallible;
3use std::sync::Arc;
4use tokio::sync::OnceCell;
5use tokio::sync::RwLock;
6use warp::Filter;
7
8pub static SWITCHBOARD_HEALTH: OnceCell<SwitchboardHealth> = OnceCell::const_new();
9
10// Define an enum for health status
11#[derive(Default, Debug, Clone, PartialEq, Eq)]
12pub enum HealthStatus {
13    #[default]
14    NotReady,
15    Ready,
16}
17
18// Define the SwitchboardHealth struct
19#[derive(Default, Debug, Clone)]
20pub struct SwitchboardHealth {
21    pub health: Arc<RwLock<HealthStatus>>,
22}
23
24impl SwitchboardHealth {
25    pub async fn get_or_init() -> &'static SwitchboardHealth {
26        SWITCHBOARD_HEALTH
27            .get_or_init(|| async { SwitchboardHealth::initialize().await })
28            .await
29    }
30
31    pub async fn initialize() -> Self {
32        let health = Arc::new(RwLock::new(HealthStatus::NotReady));
33
34        // Clone the Arc to move into the async block
35        let server_health = health.clone();
36
37        tokio::spawn(async move {
38            // Create a filter for the shared state
39            let health_status_filter = warp::any().map(move || server_health.clone());
40
41            // Define the health check route
42            let health_route = warp::path("healthz")
43                .and(warp::get())
44                .and(health_status_filter)
45                .and_then(SwitchboardHealth::respond_with_health);
46
47            warp::serve(health_route).run(([0, 0, 0, 0], 8080)).await; // Bind to 0.0.0.0:8080
48        });
49
50        info!("Health: {:?}", HealthStatus::NotReady);
51
52        // Return struct
53        SwitchboardHealth {
54            health: health.clone(),
55        }
56    }
57
58    pub async fn set_is_ready(&self) {
59        info!("Health: {:?}", HealthStatus::Ready);
60        *self.health.write().await = HealthStatus::Ready;
61    }
62
63    pub async fn set_is_not_ready(&self) {
64        info!("Health: {:?}", HealthStatus::NotReady);
65        *self.health.write().await = HealthStatus::NotReady;
66    }
67
68    async fn respond_with_health(
69        health_status: Arc<RwLock<HealthStatus>>,
70    ) -> Result<impl warp::Reply, Infallible> {
71        match *health_status.read().await {
72            HealthStatus::Ready => Ok(warp::reply::with_status("Ok", warp::http::StatusCode::OK)),
73            HealthStatus::NotReady => Ok(warp::reply::with_status(
74                "Service Unavailable",
75                warp::http::StatusCode::SERVICE_UNAVAILABLE,
76            )),
77        }
78    }
79}
80
81#[cfg(test)]
82mod tests {
83    use super::*;
84
85    use reqwest::StatusCode;
86
87    #[tokio::test]
88    async fn test_health_check() {
89        let health = SwitchboardHealth::get_or_init().await;
90
91        // Test initial state
92        assert_eq!(*health.health.read().await, HealthStatus::NotReady);
93
94        // Test state transitions
95        health.set_is_ready().await;
96        assert_eq!(*health.health.read().await, HealthStatus::Ready);
97
98        health.set_is_not_ready().await;
99        assert_eq!(*health.health.read().await, HealthStatus::NotReady);
100
101        // Test http server
102        let client = reqwest::Client::new();
103
104        let not_ready_resp = client
105            .get("http://localhost:8080/healthz")
106            .send()
107            .await
108            .unwrap();
109        assert_eq!(not_ready_resp.status(), StatusCode::SERVICE_UNAVAILABLE);
110
111        health.set_is_ready().await;
112        let ready_resp = client
113            .get("http://localhost:8080/healthz")
114            .send()
115            .await
116            .unwrap();
117        assert_eq!(ready_resp.status(), StatusCode::OK);
118        let ready_resp_body: String = ready_resp.text().await.unwrap();
119        assert_eq!(ready_resp_body, "Ok".to_string());
120    }
121}