sentinel_proxy/
grpc_health.rs

1//! gRPC Health Checking Protocol implementation
2//!
3//! Implements the standard gRPC Health Checking Protocol (grpc.health.v1.Health)
4//! for health checking gRPC backends.
5//!
6//! # Protocol
7//!
8//! The gRPC health check protocol uses:
9//! - Service: `grpc.health.v1.Health`
10//! - Method: `Check(HealthCheckRequest) returns (HealthCheckResponse)`
11//! - Request: `{ service: string }` - empty string for server health
12//! - Response: `{ status: ServingStatus }`
13//!   - `SERVING` = healthy
14//!   - `NOT_SERVING`, `UNKNOWN`, `SERVICE_UNKNOWN` = unhealthy
15//!
16//! # Example
17//!
18//! ```ignore
19//! use sentinel_proxy::grpc_health::GrpcHealthCheck;
20//! use std::time::Duration;
21//!
22//! let hc = GrpcHealthCheck::new("my.service.Name".to_string(), Duration::from_secs(5));
23//! // Use with Pingora's health check infrastructure
24//! ```
25
26use async_trait::async_trait;
27use pingora_core::{Error, ErrorType::CustomCode, Result};
28use pingora_load_balancing::health_check::HealthCheck;
29use pingora_load_balancing::Backend;
30use std::time::Duration;
31use tonic::transport::Endpoint;
32use tonic_health::pb::health_check_response::ServingStatus;
33use tonic_health::pb::health_client::HealthClient;
34use tonic_health::pb::HealthCheckRequest;
35use tracing::{debug, trace, warn};
36
/// gRPC health check implementing Pingora's `HealthCheck` trait.
///
/// Each check connects to a gRPC server and calls the standard
/// `grpc.health.v1.Health/Check` method; only a `SERVING` response is
/// treated as healthy.
///
/// `Debug` and `Clone` are derived so the check can be logged and duplicated
/// (for example, one instance per upstream group).
#[derive(Debug, Clone)]
pub struct GrpcHealthCheck {
    /// Service name to check (empty string = overall server health)
    service: String,

    /// Number of consecutive successes required to flip from unhealthy to healthy
    pub consecutive_success: usize,

    /// Number of consecutive failures required to flip from healthy to unhealthy
    pub consecutive_failure: usize,

    /// Timeout applied to both the connection attempt and the health check RPC
    timeout: Duration,
}
54
55impl GrpcHealthCheck {
56    /// Create a new gRPC health check
57    ///
58    /// # Arguments
59    ///
60    /// * `service` - The service name to check. Empty string checks overall server health.
61    /// * `timeout` - Timeout for the health check RPC call
62    pub fn new(service: String, timeout: Duration) -> Self {
63        Self {
64            service,
65            consecutive_success: 1,
66            consecutive_failure: 1,
67            timeout,
68        }
69    }
70
71    /// Perform the gRPC health check against a specific address
72    async fn check_grpc(&self, addr: &str) -> Result<()> {
73        // Build the endpoint URL
74        // The address from Backend is in format "host:port"
75        let url = format!("http://{}", addr);
76
77        trace!(
78            address = %addr,
79            service = %self.service,
80            timeout_ms = self.timeout.as_millis(),
81            "Performing gRPC health check"
82        );
83
84        // Create endpoint with timeout
85        let endpoint = match Endpoint::from_shared(url.clone()) {
86            Ok(ep) => ep.timeout(self.timeout).connect_timeout(self.timeout),
87            Err(e) => {
88                warn!(address = %addr, error = %e, "Invalid gRPC endpoint URL");
89                return Err(Error::explain(
90                    CustomCode("gRPC health check", 1),
91                    format!("Invalid endpoint: {}", e),
92                ));
93            }
94        };
95
96        // Connect to the server
97        let channel = match endpoint.connect().await {
98            Ok(ch) => ch,
99            Err(e) => {
100                debug!(
101                    address = %addr,
102                    error = %e,
103                    "Failed to connect for gRPC health check"
104                );
105                return Err(Error::explain(
106                    CustomCode("gRPC health check", 2),
107                    format!("Connection failed: {}", e),
108                ));
109            }
110        };
111
112        // Create health client and perform check
113        let mut client = HealthClient::new(channel);
114
115        let request = tonic::Request::new(HealthCheckRequest {
116            service: self.service.clone(),
117        });
118
119        let response = match client.check(request).await {
120            Ok(resp) => resp,
121            Err(e) => {
122                debug!(
123                    address = %addr,
124                    service = %self.service,
125                    error = %e,
126                    "gRPC health check RPC failed"
127                );
128                return Err(Error::explain(
129                    CustomCode("gRPC health check", 3),
130                    format!("Health check RPC failed: {}", e),
131                ));
132            }
133        };
134
135        let status = response.into_inner().status();
136
137        match status {
138            ServingStatus::Serving => {
139                trace!(
140                    address = %addr,
141                    service = %self.service,
142                    "gRPC health check passed: SERVING"
143                );
144                Ok(())
145            }
146            ServingStatus::NotServing => {
147                debug!(
148                    address = %addr,
149                    service = %self.service,
150                    "gRPC health check failed: NOT_SERVING"
151                );
152                Err(Error::explain(
153                    CustomCode("gRPC health check", 4),
154                    "Service status: NOT_SERVING",
155                ))
156            }
157            ServingStatus::Unknown => {
158                debug!(
159                    address = %addr,
160                    service = %self.service,
161                    "gRPC health check failed: UNKNOWN"
162                );
163                Err(Error::explain(
164                    CustomCode("gRPC health check", 5),
165                    "Service status: UNKNOWN",
166                ))
167            }
168            ServingStatus::ServiceUnknown => {
169                debug!(
170                    address = %addr,
171                    service = %self.service,
172                    "gRPC health check failed: SERVICE_UNKNOWN"
173                );
174                Err(Error::explain(
175                    CustomCode("gRPC health check", 6),
176                    "Service status: SERVICE_UNKNOWN",
177                ))
178            }
179        }
180    }
181}
182
183#[async_trait]
184impl HealthCheck for GrpcHealthCheck {
185    /// Check if the backend is healthy using gRPC health protocol
186    async fn check(&self, target: &Backend) -> Result<()> {
187        let addr = target.addr.to_string();
188        self.check_grpc(&addr).await
189    }
190
191    /// Return the health threshold for flipping health status
192    ///
193    /// * `success: true` - returns consecutive_success (unhealthy -> healthy)
194    /// * `success: false` - returns consecutive_failure (healthy -> unhealthy)
195    fn health_threshold(&self, success: bool) -> usize {
196        if success {
197            self.consecutive_success
198        } else {
199            self.consecutive_failure
200        }
201    }
202}
203
204impl Default for GrpcHealthCheck {
205    fn default() -> Self {
206        Self::new(String::new(), Duration::from_secs(5))
207    }
208}
209
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores its arguments and starts both thresholds at 1.
    #[test]
    fn test_grpc_health_check_new() {
        let hc = GrpcHealthCheck::new("my.service".to_string(), Duration::from_secs(10));
        assert_eq!(hc.service, "my.service");
        assert_eq!(hc.timeout, Duration::from_secs(10));
        assert_eq!(hc.consecutive_success, 1);
        assert_eq!(hc.consecutive_failure, 1);
    }

    /// `default` checks overall server health with a five-second timeout.
    #[test]
    fn test_grpc_health_check_default() {
        let hc = GrpcHealthCheck::default();
        assert_eq!(hc.service, "");
        assert_eq!(hc.timeout, Duration::from_secs(5));
    }

    /// `health_threshold` selects the field matching the requested direction.
    #[test]
    fn test_health_threshold() {
        let mut hc = GrpcHealthCheck::new("".to_string(), Duration::from_secs(5));
        hc.consecutive_success = 3;
        hc.consecutive_failure = 5;

        assert_eq!(hc.health_threshold(true), 3);
        assert_eq!(hc.health_threshold(false), 5);
    }

    /// Connecting to a closed port must surface the connection-failure error.
    #[tokio::test]
    async fn test_grpc_health_check_connection_refused() {
        let hc = GrpcHealthCheck::new("".to_string(), Duration::from_secs(1));

        // Ask the OS for a free loopback port and release it immediately, so
        // the test does not depend on a hard-coded port that may be in use.
        let closed_addr = {
            let listener = std::net::TcpListener::bind("127.0.0.1:0")
                .expect("binding an ephemeral loopback port should succeed");
            listener
                .local_addr()
                .expect("bound listener should report its address")
                .to_string()
        };

        let err = hc
            .check_grpc(&closed_addr)
            .await
            .expect_err("health check against a closed port must fail");

        // Matching on "gRPC" would accept every error from this module (the
        // error type name contains it), so require the connection-stage
        // message specifically.
        let rendered = err.to_string();
        assert!(
            rendered.contains("Connection failed"),
            "unexpected error: {}",
            rendered
        );
    }
}