code0_flow/flow_health/
mod.rs1use futures_core::Stream;
2use std::pin::Pin;
3
4use tonic::{Request, Response, Status};
5use tonic_health::pb::{
6 HealthCheckRequest, HealthCheckResponse, health_check_response::ServingStatus,
7 health_server::Health,
8};
9
10#[derive(Debug)]
11pub struct HealthService {
12 nats_url: String,
13}
14
15impl HealthService {
16 pub fn new(nats_url: String) -> Self {
17 Self { nats_url }
18 }
19
20 async fn check_nats_connection(&self) -> bool {
21 match async_nats::connect(&self.nats_url).await {
22 Ok(_client) => {
23 true
25 }
26 Err(_) => false,
27 }
28 }
29}
30
31#[tonic::async_trait]
32impl Health for HealthService {
33 async fn check(
34 &self,
35 request: Request<HealthCheckRequest>,
36 ) -> Result<Response<HealthCheckResponse>, Status> {
37 let service = request.into_inner().service.to_lowercase();
38
39 match service.as_str() {
40 "liveness" => Ok(Response::new(HealthCheckResponse {
41 status: ServingStatus::Serving as i32,
42 })),
43 "readiness" => {
44 let nats_healthy = self.check_nats_connection().await;
45 let status = if nats_healthy {
46 ServingStatus::Serving
47 } else {
48 ServingStatus::NotServing
49 };
50 Ok(Response::new(HealthCheckResponse {
51 status: status as i32,
52 }))
53 }
54 _ => Err(Status::invalid_argument(
55 "Unknown service. Only `liveness` and `readiness` are supported.",
56 )),
57 }
58 }
59
60 type WatchStream =
61 Pin<Box<dyn Stream<Item = Result<HealthCheckResponse, Status>> + Send + 'static>>;
62
63 async fn watch(
64 &self,
65 _request: Request<HealthCheckRequest>,
66 ) -> Result<Response<Self::WatchStream>, Status> {
67 Err(Status::unimplemented("Watch is not implemented"))
68 }
69}