Skip to main content

pingap_health/
lib.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use humantime::format_duration;
16use pingora::lb::health_check::{
17    HealthCheck, HealthObserveCallback, TcpHealthCheck,
18};
19use pingora::upstreams::peer::PeerOptions;
20use snafu::Snafu;
21use std::time::Duration;
22use strum::EnumString;
23use tracing::info;
/// Tracing target attached to the log events emitted in this file.
static LOG_TARGET: &str = "pingap::health";
25
26mod grpc;
27mod http;
28pub use grpc::GrpcHealthCheck;
29pub use http::HealthCheckConf;
30
31/// Creates a new internal error
32fn new_internal_error(status: u16, message: impl ToString) -> pingora::BError {
33    pingora::Error::because(
34        pingora::ErrorType::HTTPStatus(status),
35        message.to_string(),
36        pingora::Error::new(pingora::ErrorType::InternalError),
37    )
38}
39
// Default values for health check settings.
// NOTE(review): only `DEFAULT_CHECK_FREQUENCY` is referenced in this file;
// the others are presumably consumed by the `http`/`grpc` submodules — confirm.

/// Default connection timeout: 3 seconds.
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);
/// Default read timeout: 3 seconds.
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(3);
/// Default interval between two health checks: 10 seconds.
const DEFAULT_CHECK_FREQUENCY: Duration = Duration::from_secs(10);
/// Default consecutive-success count: 1.
const DEFAULT_CONSECUTIVE_SUCCESS: usize = 1;
/// Default consecutive-failure count: 2.
const DEFAULT_CONSECUTIVE_FAILURE: usize = 2;
46
47#[derive(Debug, Snafu)]
48pub enum Error {
49    #[snafu(display("Url parse error {source}, {url}"))]
50    UrlParse {
51        source: url::ParseError,
52        url: String,
53    },
54    #[snafu(display("Invalid health check schema: {schema}, {message}"))]
55    InvalidSchema { schema: String, message: String },
56}
57type Result<T, E = Error> = std::result::Result<T, E>;
58
59fn update_peer_options(
60    conf: &HealthCheckConf,
61    opt: PeerOptions,
62) -> PeerOptions {
63    let mut options = opt;
64    let timeout = Some(conf.connection_timeout);
65    options.verify_hostname = false;
66    options.verify_cert = false;
67    options.connection_timeout = timeout;
68    options.total_connection_timeout = timeout;
69    options.read_timeout = Some(conf.read_timeout);
70    options.write_timeout = Some(conf.read_timeout);
71    // set zero to disable reuse connection
72    options.idle_timeout = Some(Duration::from_secs(0));
73    options
74}
75
76fn new_tcp_health_check(
77    _name: &str,
78    conf: &HealthCheckConf,
79    health_changed_callback: Option<HealthObserveCallback>,
80) -> TcpHealthCheck {
81    let mut check = TcpHealthCheck::default();
82    check.peer_template.options =
83        update_peer_options(conf, check.peer_template.options.clone());
84    check.consecutive_success = conf.consecutive_success;
85    check.consecutive_failure = conf.consecutive_failure;
86    check.health_changed_callback = health_changed_callback;
87
88    check
89}
90
91pub fn new_health_check(
92    name: &str,
93    health_check: &str,
94    health_changed_callback: Option<HealthObserveCallback>,
95) -> Result<(
96    HealthCheckConf,
97    Box<dyn HealthCheck + Send + Sync + 'static>,
98)> {
99    let mut health_check_conf = HealthCheckConf {
100        schema: HealthCheckSchema::Tcp,
101        check_frequency: DEFAULT_CHECK_FREQUENCY,
102        ..Default::default()
103    };
104    let hc: Box<dyn HealthCheck + Send + Sync + 'static> = if health_check
105        .is_empty()
106    {
107        let mut check = TcpHealthCheck::new();
108        check.health_changed_callback = health_changed_callback;
109        check.peer_template.options.connection_timeout =
110            Some(Duration::from_secs(3));
111        info!(
112            target: LOG_TARGET,
113            name,
114            options = %check.peer_template.options,
115            "new health check"
116        );
117        check
118    } else {
119        health_check_conf = health_check.try_into()?;
120        info!(
121            target: LOG_TARGET,
122            name,
123            schema = health_check_conf.schema.to_string(),
124            path = health_check_conf.path,
125            connection_timeout =
126                format_duration(health_check_conf.connection_timeout)
127                    .to_string(),
128            read_timeout =
129                format_duration(health_check_conf.read_timeout).to_string(),
130            check_frequency =
131                format_duration(health_check_conf.check_frequency).to_string(),
132            reuse_connection = health_check_conf.reuse_connection,
133            consecutive_success = health_check_conf.consecutive_success,
134            consecutive_failure = health_check_conf.consecutive_failure,
135            "new http/grpc health check"
136        );
137        match health_check_conf.schema {
138            HealthCheckSchema::Http | HealthCheckSchema::Https => {
139                Box::new(http::new_http_health_check(
140                    name,
141                    &health_check_conf,
142                    health_changed_callback,
143                ))
144            },
145            HealthCheckSchema::Grpc => {
146                let check = GrpcHealthCheck::new(
147                    name,
148                    &health_check_conf,
149                    health_changed_callback,
150                )?;
151                Box::new(check)
152            },
153            _ => Box::new(new_tcp_health_check(
154                name,
155                &health_check_conf,
156                health_changed_callback,
157            )),
158        }
159    };
160    Ok((health_check_conf, hc))
161}
162
163#[derive(PartialEq, Debug, Default, Clone, EnumString, strum::Display)]
164#[strum(serialize_all = "snake_case")]
165pub enum HealthCheckSchema {
166    #[default]
167    Tcp,
168    Http,
169    Https,
170    Grpc,
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use pingora::upstreams::peer::Peer;
    use pretty_assertions::assert_eq;
    use std::time::Duration;

    /// Parses a `tcp://` configuration string, checks its debug output and
    /// verifies that the TCP check built from it picks up the parsed values.
    #[test]
    fn test_health_check_conf() {
        let conf: HealthCheckConf =
            "tcp://upstreamname?connection_timeout=3s&success=2&failure=1&check_frequency=10s"
                .try_into()
                .unwrap();
        let expected_debug = r###"HealthCheckConf { schema: Tcp, host: "upstreamname", path: "", connection_timeout: 3s, read_timeout: 3s, check_frequency: 10s, reuse_connection: false, consecutive_success: 2, consecutive_failure: 1, service: "", tls: false, parallel_check: false }"###;
        assert_eq!(expected_debug, format!("{conf:?}"));

        let check = new_tcp_health_check("", &conf, None);
        assert_eq!(1, check.consecutive_failure);
        assert_eq!(2, check.consecutive_success);

        let connection_timeout =
            check.peer_template.connection_timeout().unwrap();
        assert_eq!(Duration::from_secs(3), connection_timeout);
    }

    /// Builds a health check from an `https://` configuration string and
    /// checks the parsed check frequency.
    #[test]
    fn test_new_health_check() {
        let health_check = "https://upstreamname/ping?connection_timeout=3s&read_timeout=1s&success=2&failure=1&check_frequency=10s&from=nginx&reuse";
        let (conf, _) =
            new_health_check("upstreamname", health_check, None).unwrap();
        assert_eq!(Duration::from_secs(10), conf.check_frequency);
    }

    /// Checks the rendered text of an error built by `new_internal_error`.
    #[test]
    fn test_new_internal_error() {
        let rendered = new_internal_error(500, "test").to_string();
        assert_eq!(
            rendered.trim(),
            "HTTPStatus context: test cause:  InternalError"
        );
    }
}