1use humantime::format_duration;
16use pingora::lb::health_check::{
17 HealthCheck, HealthObserveCallback, TcpHealthCheck,
18};
19use pingora::upstreams::peer::PeerOptions;
20use snafu::Snafu;
21use std::time::Duration;
22use strum::EnumString;
23use tracing::info;
24static LOG_TARGET: &str = "pingap::health";
25
26mod grpc;
27mod http;
28pub use grpc::GrpcHealthCheck;
29pub use http::HealthCheckConf;
30
31fn new_internal_error(status: u16, message: impl ToString) -> pingora::BError {
33 pingora::Error::because(
34 pingora::ErrorType::HTTPStatus(status),
35 message.to_string(),
36 pingora::Error::new(pingora::ErrorType::InternalError),
37 )
38}
39
/// Default connection timeout for a health check: 3 seconds.
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);
/// Default read timeout for a health check: 3 seconds.
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(3);
/// Default interval between two health checks: 10 seconds.
const DEFAULT_CHECK_FREQUENCY: Duration = Duration::from_secs(10);
/// Default value for the consecutive-success threshold.
const DEFAULT_CONSECUTIVE_SUCCESS: usize = 1;
/// Default value for the consecutive-failure threshold.
const DEFAULT_CONSECUTIVE_FAILURE: usize = 2;
46
47#[derive(Debug, Snafu)]
48pub enum Error {
49 #[snafu(display("Url parse error {source}, {url}"))]
50 UrlParse {
51 source: url::ParseError,
52 url: String,
53 },
54 #[snafu(display("Invalid health check schema: {schema}, {message}"))]
55 InvalidSchema { schema: String, message: String },
56}
57type Result<T, E = Error> = std::result::Result<T, E>;
58
59fn update_peer_options(
60 conf: &HealthCheckConf,
61 opt: PeerOptions,
62) -> PeerOptions {
63 let mut options = opt;
64 let timeout = Some(conf.connection_timeout);
65 options.verify_hostname = false;
66 options.verify_cert = false;
67 options.connection_timeout = timeout;
68 options.total_connection_timeout = timeout;
69 options.read_timeout = Some(conf.read_timeout);
70 options.write_timeout = Some(conf.read_timeout);
71 options.idle_timeout = Some(Duration::from_secs(0));
73 options
74}
75
76fn new_tcp_health_check(
77 _name: &str,
78 conf: &HealthCheckConf,
79 health_changed_callback: Option<HealthObserveCallback>,
80) -> TcpHealthCheck {
81 let mut check = TcpHealthCheck::default();
82 check.peer_template.options =
83 update_peer_options(conf, check.peer_template.options.clone());
84 check.consecutive_success = conf.consecutive_success;
85 check.consecutive_failure = conf.consecutive_failure;
86 check.health_changed_callback = health_changed_callback;
87
88 check
89}
90
91pub fn new_health_check(
92 name: &str,
93 health_check: &str,
94 health_changed_callback: Option<HealthObserveCallback>,
95) -> Result<(
96 HealthCheckConf,
97 Box<dyn HealthCheck + Send + Sync + 'static>,
98)> {
99 let mut health_check_conf = HealthCheckConf {
100 schema: HealthCheckSchema::Tcp,
101 check_frequency: DEFAULT_CHECK_FREQUENCY,
102 ..Default::default()
103 };
104 let hc: Box<dyn HealthCheck + Send + Sync + 'static> = if health_check
105 .is_empty()
106 {
107 let mut check = TcpHealthCheck::new();
108 check.health_changed_callback = health_changed_callback;
109 check.peer_template.options.connection_timeout =
110 Some(Duration::from_secs(3));
111 info!(
112 target: LOG_TARGET,
113 name,
114 options = %check.peer_template.options,
115 "new health check"
116 );
117 check
118 } else {
119 health_check_conf = health_check.try_into()?;
120 info!(
121 target: LOG_TARGET,
122 name,
123 schema = health_check_conf.schema.to_string(),
124 path = health_check_conf.path,
125 connection_timeout =
126 format_duration(health_check_conf.connection_timeout)
127 .to_string(),
128 read_timeout =
129 format_duration(health_check_conf.read_timeout).to_string(),
130 check_frequency =
131 format_duration(health_check_conf.check_frequency).to_string(),
132 reuse_connection = health_check_conf.reuse_connection,
133 consecutive_success = health_check_conf.consecutive_success,
134 consecutive_failure = health_check_conf.consecutive_failure,
135 "new http/grpc health check"
136 );
137 match health_check_conf.schema {
138 HealthCheckSchema::Http | HealthCheckSchema::Https => {
139 Box::new(http::new_http_health_check(
140 name,
141 &health_check_conf,
142 health_changed_callback,
143 ))
144 },
145 HealthCheckSchema::Grpc => {
146 let check = GrpcHealthCheck::new(
147 name,
148 &health_check_conf,
149 health_changed_callback,
150 )?;
151 Box::new(check)
152 },
153 _ => Box::new(new_tcp_health_check(
154 name,
155 &health_check_conf,
156 health_changed_callback,
157 )),
158 }
159 };
160 Ok((health_check_conf, hc))
161}
162
163#[derive(PartialEq, Debug, Default, Clone, EnumString, strum::Display)]
164#[strum(serialize_all = "snake_case")]
165pub enum HealthCheckSchema {
166 #[default]
167 Tcp,
168 Http,
169 Https,
170 Grpc,
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use pingora::upstreams::peer::Peer;
    use pretty_assertions::assert_eq;
    use std::time::Duration;

    /// A `tcp://` configuration string is parsed into the expected
    /// configuration, and the TCP health check built from it carries the
    /// configured thresholds and connection timeout.
    #[test]
    fn test_health_check_conf() {
        let conf_text =
            "tcp://upstreamname?connection_timeout=3s&success=2&failure=1&check_frequency=10s";
        let conf: HealthCheckConf = conf_text.try_into().unwrap();
        let debug_repr = format!("{conf:?}");
        assert_eq!(
            r###"HealthCheckConf { schema: Tcp, host: "upstreamname", path: "", connection_timeout: 3s, read_timeout: 3s, check_frequency: 10s, reuse_connection: false, consecutive_success: 2, consecutive_failure: 1, service: "", tls: false, parallel_check: false }"###,
            debug_repr
        );

        let check = new_tcp_health_check("", &conf, None);
        assert_eq!(1, check.consecutive_failure);
        assert_eq!(2, check.consecutive_success);

        let connection_timeout =
            check.peer_template.connection_timeout().unwrap();
        assert_eq!(Duration::from_secs(3), connection_timeout);
    }

    /// An `https://` configuration string yields a configuration with the
    /// requested check frequency.
    #[test]
    fn test_new_health_check() {
        let conf_text = "https://upstreamname/ping?connection_timeout=3s&read_timeout=1s&success=2&failure=1&check_frequency=10s&from=nginx&reuse";
        let (conf, _) =
            new_health_check("upstreamname", conf_text, None).unwrap();
        assert_eq!(Duration::from_secs(10), conf.check_frequency);
    }

    /// The internal error renders its type, context and cause.
    #[test]
    fn test_new_internal_error() {
        let rendered = new_internal_error(500, "test").to_string();
        assert_eq!(
            rendered.trim(),
            "HTTPStatus context: test cause: InternalError"
        );
    }
}