sentinel_proxy/
grpc_health.rs1use async_trait::async_trait;
27use pingora_core::{Error, ErrorType::CustomCode, Result};
28use pingora_load_balancing::health_check::HealthCheck;
29use pingora_load_balancing::Backend;
30use std::time::Duration;
31use tonic::transport::Endpoint;
32use tonic_health::pb::health_check_response::ServingStatus;
33use tonic_health::pb::health_client::HealthClient;
34use tonic_health::pb::HealthCheckRequest;
35use tracing::{debug, trace, warn};
36
/// Health check that probes a backend through the standard gRPC health
/// service (`grpc.health.v1.Health/Check`, via `tonic_health`).
///
/// `Debug` and `Clone` are derived so the check can be logged and duplicated
/// like any other plain configuration value; every field is itself
/// `Debug + Clone`.
#[derive(Debug, Clone)]
pub struct GrpcHealthCheck {
    /// Service name placed in the `HealthCheckRequest` sent to the backend.
    service: String,

    /// Value returned by `health_threshold(true)`.
    ///
    /// NOTE(review): per the pingora `HealthCheck` contract this is presumably
    /// the number of consecutive successes needed to mark a backend healthy.
    pub consecutive_success: usize,

    /// Value returned by `health_threshold(false)`.
    ///
    /// NOTE(review): per the pingora `HealthCheck` contract this is presumably
    /// the number of consecutive failures needed to mark a backend unhealthy.
    pub consecutive_failure: usize,

    /// Timeout handed to the tonic endpoint, both as its connect timeout and
    /// as its request timeout.
    timeout: Duration,
}
54
55impl GrpcHealthCheck {
56 pub fn new(service: String, timeout: Duration) -> Self {
63 Self {
64 service,
65 consecutive_success: 1,
66 consecutive_failure: 1,
67 timeout,
68 }
69 }
70
71 async fn check_grpc(&self, addr: &str) -> Result<()> {
73 let url = format!("http://{}", addr);
76
77 trace!(
78 address = %addr,
79 service = %self.service,
80 timeout_ms = self.timeout.as_millis(),
81 "Performing gRPC health check"
82 );
83
84 let endpoint = match Endpoint::from_shared(url.clone()) {
86 Ok(ep) => ep.timeout(self.timeout).connect_timeout(self.timeout),
87 Err(e) => {
88 warn!(address = %addr, error = %e, "Invalid gRPC endpoint URL");
89 return Err(Error::explain(
90 CustomCode("gRPC health check", 1),
91 format!("Invalid endpoint: {}", e),
92 ));
93 }
94 };
95
96 let channel = match endpoint.connect().await {
98 Ok(ch) => ch,
99 Err(e) => {
100 debug!(
101 address = %addr,
102 error = %e,
103 "Failed to connect for gRPC health check"
104 );
105 return Err(Error::explain(
106 CustomCode("gRPC health check", 2),
107 format!("Connection failed: {}", e),
108 ));
109 }
110 };
111
112 let mut client = HealthClient::new(channel);
114
115 let request = tonic::Request::new(HealthCheckRequest {
116 service: self.service.clone(),
117 });
118
119 let response = match client.check(request).await {
120 Ok(resp) => resp,
121 Err(e) => {
122 debug!(
123 address = %addr,
124 service = %self.service,
125 error = %e,
126 "gRPC health check RPC failed"
127 );
128 return Err(Error::explain(
129 CustomCode("gRPC health check", 3),
130 format!("Health check RPC failed: {}", e),
131 ));
132 }
133 };
134
135 let status = response.into_inner().status();
136
137 match status {
138 ServingStatus::Serving => {
139 trace!(
140 address = %addr,
141 service = %self.service,
142 "gRPC health check passed: SERVING"
143 );
144 Ok(())
145 }
146 ServingStatus::NotServing => {
147 debug!(
148 address = %addr,
149 service = %self.service,
150 "gRPC health check failed: NOT_SERVING"
151 );
152 Err(Error::explain(
153 CustomCode("gRPC health check", 4),
154 "Service status: NOT_SERVING",
155 ))
156 }
157 ServingStatus::Unknown => {
158 debug!(
159 address = %addr,
160 service = %self.service,
161 "gRPC health check failed: UNKNOWN"
162 );
163 Err(Error::explain(
164 CustomCode("gRPC health check", 5),
165 "Service status: UNKNOWN",
166 ))
167 }
168 ServingStatus::ServiceUnknown => {
169 debug!(
170 address = %addr,
171 service = %self.service,
172 "gRPC health check failed: SERVICE_UNKNOWN"
173 );
174 Err(Error::explain(
175 CustomCode("gRPC health check", 6),
176 "Service status: SERVICE_UNKNOWN",
177 ))
178 }
179 }
180 }
181}
182
183#[async_trait]
184impl HealthCheck for GrpcHealthCheck {
185 async fn check(&self, target: &Backend) -> Result<()> {
187 let addr = target.addr.to_string();
188 self.check_grpc(&addr).await
189 }
190
191 fn health_threshold(&self, success: bool) -> usize {
196 if success {
197 self.consecutive_success
198 } else {
199 self.consecutive_failure
200 }
201 }
202}
203
204impl Default for GrpcHealthCheck {
205 fn default() -> Self {
206 Self::new(String::new(), Duration::from_secs(5))
207 }
208}
209
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores its arguments and starts both thresholds at 1.
    #[test]
    fn test_grpc_health_check_new() {
        let ten_seconds = Duration::from_secs(10);
        let checker = GrpcHealthCheck::new("my.service".to_string(), ten_seconds);

        assert_eq!(checker.service, "my.service");
        assert_eq!(checker.timeout, Duration::from_secs(10));
        assert_eq!(checker.consecutive_success, 1);
        assert_eq!(checker.consecutive_failure, 1);
    }

    /// The default check has an empty service name and a 5-second timeout.
    #[test]
    fn test_grpc_health_check_default() {
        let checker = GrpcHealthCheck::default();

        assert_eq!(checker.service, "");
        assert_eq!(checker.timeout, Duration::from_secs(5));
    }

    /// `health_threshold` reports the field matching the requested outcome.
    #[test]
    fn test_health_threshold() {
        let mut checker = GrpcHealthCheck::new("".to_string(), Duration::from_secs(5));
        checker.consecutive_success = 3;
        checker.consecutive_failure = 5;

        assert_eq!(checker.health_threshold(true), 3);
        assert_eq!(checker.health_threshold(false), 5);
    }

    /// Probing a local port that is expected to have no listener must fail
    /// with an error that mentions the failed connection or the gRPC check.
    #[tokio::test]
    async fn test_grpc_health_check_connection_refused() {
        let checker = GrpcHealthCheck::new("".to_string(), Duration::from_secs(1));

        let outcome = checker.check_grpc("127.0.0.1:59999").await;
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Connection failed") || message.contains("gRPC"));
    }
}