code0_flow/flow_health/
mod.rs

1use futures_core::Stream;
2use std::pin::Pin;
3
4use tonic::{Request, Response, Status};
5use tonic_health::pb::{
6    HealthCheckRequest, HealthCheckResponse, health_check_response::ServingStatus,
7    health_server::Health,
8};
9
10#[derive(Debug)]
11pub struct HealthService {
12    nats_url: String,
13}
14
15impl HealthService {
16    pub fn new(nats_url: String) -> Self {
17        Self { nats_url }
18    }
19
20    async fn check_nats_connection(&self) -> bool {
21        match async_nats::connect(&self.nats_url).await {
22            Ok(_client) => {
23                // Successfully connected to NATS
24                true
25            }
26            Err(_) => false,
27        }
28    }
29}
30
31#[tonic::async_trait]
32impl Health for HealthService {
33    async fn check(
34        &self,
35        request: Request<HealthCheckRequest>,
36    ) -> Result<Response<HealthCheckResponse>, Status> {
37        let service = request.into_inner().service.to_lowercase();
38
39        match service.as_str() {
40            "liveness" => Ok(Response::new(HealthCheckResponse {
41                status: ServingStatus::Serving as i32,
42            })),
43            "readiness" => {
44                let nats_healthy = self.check_nats_connection().await;
45                let status = if nats_healthy {
46                    ServingStatus::Serving
47                } else {
48                    ServingStatus::NotServing
49                };
50                Ok(Response::new(HealthCheckResponse {
51                    status: status as i32,
52                }))
53            }
54            _ => Err(Status::invalid_argument(
55                "Unknown service. Only `liveness` and `readiness` are supported.",
56            )),
57        }
58    }
59
60    type WatchStream =
61        Pin<Box<dyn Stream<Item = Result<HealthCheckResponse, Status>> + Send + 'static>>;
62
63    async fn watch(
64        &self,
65        _request: Request<HealthCheckRequest>,
66    ) -> Result<Response<Self::WatchStream>, Status> {
67        Err(Status::unimplemented("Watch is not implemented"))
68    }
69}