Skip to main content

sozu_lib/
health_check.rs

//! Non-blocking HTTP health checks for backends
//!
//! Health checks run within the single-threaded mio event loop using non-blocking TCP
//! connections. Each check cycle is triggered by a timer. For each backend with a
//! configured health check, we open a non-blocking TCP connection, send a minimal
//! HTTP/1.1 GET request (or an h2c prior-knowledge probe when the cluster is flagged
//! as HTTP/2), parse the status from the response, and update the backend's health
//! state accordingly.
8
9use std::{
10    cell::RefCell,
11    collections::{HashMap, HashSet},
12    hash::{Hash, Hasher},
13    io::{Read, Write},
14    net::SocketAddr,
15    rc::Rc,
16    time::{Duration, Instant},
17};
18
19use mio::{Interest, Registry, Token, net::TcpStream};
20use sozu_command::{
21    proto::command::{Event, EventKind, HealthCheckConfig},
22    state::ClusterId,
23};
24
25use crate::metrics::names;
26use crate::{
27    backends::BackendMap,
28    protocol::mux::{
29        parser::{
30            FLAG_END_HEADERS, FLAG_PADDED, FLAG_PRIORITY, FRAME_HEADER_SIZE, FrameType,
31            frame_header,
32        },
33        serializer::H2_PRI,
34    },
35    server::push_event,
36};
37
/// Log envelope tag used by every log line in the health-checker.
///
/// Follows the hyphenated, all-caps convention shared with `MUX-H2`,
/// `PROXY-RELAY` and `TLS-RESOLVER`. Each `info!`/`warn!`/`error!`/
/// `debug!`/`trace!` call in this module starts its format string with
/// `"{} ", log_context!()` so that the regression guard in
/// `lib/tests/log_layout.rs` recognises the tag.
///
/// * `log_context!()` expands to the bare tag.
/// * `log_context!(<literal>)` expands to the tag followed by
///   `cluster=<literal>`; the argument goes through `concat!`, so it
///   has to be a literal.
macro_rules! log_context {
    ($cluster:expr) => {
        concat!("HEALTH-CHECK cluster=", $cluster)
    };
    () => {
        "HEALTH-CHECK"
    };
}
52
/// First mio token of the namespace reserved for health-check sockets.
///
/// Health-check tokens live in
/// `[HEALTH_CHECK_TOKEN_BASE, HEALTH_CHECK_TOKEN_BASE + HEALTH_CHECK_TOKEN_CAPACITY)`.
/// That range sits above slab-allocated session tokens (capped well
/// below `1 << 24` by `Server::sessions::max_connections`) and below
/// the mux GOAWAY sentinel `Token(usize::MAX)`, so neither can collide
/// with a probe socket.
const HEALTH_CHECK_TOKEN_BASE: usize = 0x0100_0000; // 1 << 24
/// Upper bound on concurrently in-flight health-check sockets.
///
/// `HealthChecker::allocate_token` hands out offsets modulo this
/// capacity and skips offsets still in flight. Running out of slots is
/// treated as a programmer error and reported through `error!` instead
/// of silently reusing a live token.
const HEALTH_CHECK_TOKEN_CAPACITY: usize = 0x0001_0000; // 1 << 16
65
66/// Each pending entry is `(cluster_id, config, h2c, backends_to_check)`.
67/// `h2c` mirrors `cluster.http2` (the same backend-capability hint the
68/// mux router uses) so the probe wire format matches what the proxy
69/// will actually use to reach those backends.
70type PendingChecks = Vec<(
71    ClusterId,
72    HealthCheckConfig,
73    bool,
74    Vec<(String, SocketAddr)>,
75)>;
76
77/// Tracks an in-flight health check connection
78#[derive(Debug)]
79struct InFlightCheck {
80    stream: TcpStream,
81    token: Token,
82    cluster_id: ClusterId,
83    backend_id: String,
84    address: SocketAddr,
85    started_at: Instant,
86    timeout: Duration,
87    request_bytes: Option<Vec<u8>>,
88    write_offset: usize,
89    response_buf: Vec<u8>,
90    config: HealthCheckConfig,
91    /// Captured at probe-creation time from `BackendMap.cluster_http2`.
92    /// The response parser needs to know whether to walk H2 frames or
93    /// parse an HTTP/1.1 status line; storing it on the in-flight
94    /// record avoids racing the cluster's `http2` flag if the
95    /// operator flips it mid-probe.
96    h2c: bool,
97}
98
99/// Manages health checks across all clusters and backends
100#[derive(Debug)]
101pub struct HealthChecker {
102    in_flight: Vec<InFlightCheck>,
103    last_check_time: HashMap<ClusterId, Instant>,
104    next_token_id: usize,
105    ready_tokens: HashSet<Token>,
106}
107
108impl Default for HealthChecker {
109    fn default() -> Self {
110        Self::new()
111    }
112}
113
114impl HealthChecker {
115    pub fn new() -> Self {
116        HealthChecker {
117            in_flight: Vec::new(),
118            last_check_time: HashMap::new(),
119            next_token_id: 0,
120            ready_tokens: HashSet::new(),
121        }
122    }
123
124    /// Pick the next free slot offset modulo `HEALTH_CHECK_TOKEN_CAPACITY`,
125    /// skipping offsets that match an in-flight check. Returns `None` when
126    /// the entire capacity is occupied — caller must surface the error
127    /// rather than silently colliding with another in-flight stream's
128    /// readiness events.
129    fn allocate_token(&mut self) -> Option<Token> {
130        let in_flight: HashSet<usize> = self
131            .in_flight
132            .iter()
133            .map(|c| c.token.0 - HEALTH_CHECK_TOKEN_BASE)
134            .collect();
135
136        for _ in 0..HEALTH_CHECK_TOKEN_CAPACITY {
137            let offset = self.next_token_id % HEALTH_CHECK_TOKEN_CAPACITY;
138            self.next_token_id = self.next_token_id.wrapping_add(1);
139            if !in_flight.contains(&offset) {
140                return Some(Token(HEALTH_CHECK_TOKEN_BASE + offset));
141            }
142        }
143        error!(
144            "{} token-table full ({} in-flight checks); refusing to allocate a new probe slot",
145            log_context!(),
146            in_flight.len()
147        );
148        None
149    }
150
151    /// Returns true iff `token` falls in the bounded range reserved for
152    /// health-check sockets. Critically, the upper bound prevents this
153    /// helper from falsely claiming the mux GOAWAY sentinel
154    /// `Token(usize::MAX)` (CVE-class regression caught during the
155    /// post-1209 rebase cross-check).
156    pub fn owns_token(&self, token: Token) -> bool {
157        token.0 >= HEALTH_CHECK_TOKEN_BASE
158            && token.0 < HEALTH_CHECK_TOKEN_BASE + HEALTH_CHECK_TOKEN_CAPACITY
159    }
160
161    /// Called by the server event loop when mio reports readiness for a health check socket.
162    pub fn ready(&mut self, token: Token) {
163        self.ready_tokens.insert(token);
164    }
165
166    /// Called on each event loop iteration. Initiates new health checks when intervals
167    /// have elapsed, and progresses in-flight checks.
168    pub fn poll(&mut self, backends: &Rc<RefCell<BackendMap>>, registry: &Registry) {
169        if self.in_flight.is_empty() && backends.borrow().health_check_configs.is_empty() {
170            return;
171        }
172        self.initiate_checks(backends, registry);
173        self.progress_checks(backends, registry);
174    }
175
176    fn initiate_checks(&mut self, backends: &Rc<RefCell<BackendMap>>, registry: &Registry) {
177        let backend_map = backends.borrow();
178        let now = Instant::now();
179
180        let mut to_check: PendingChecks = Vec::new();
181
182        for (cluster_id, config) in &backend_map.health_check_configs {
183            let interval = Duration::from_secs(u64::from(config.interval));
184
185            // Add jitter based on cluster_id hash to prevent synchronized health check storms
186            let mut hasher = std::collections::hash_map::DefaultHasher::new();
187            cluster_id.hash(&mut hasher);
188            let jitter_ms = hasher.finish() % (interval.as_millis() as u64 / 5).max(1);
189            let jittered_interval = interval + Duration::from_millis(jitter_ms);
190
191            let should_check = match self.last_check_time.get(cluster_id) {
192                Some(last) => now.duration_since(*last) >= jittered_interval,
193                None => true,
194            };
195
196            if !should_check {
197                continue;
198            }
199
200            if let Some(backend_list) = backend_map.backends.get(cluster_id) {
201                let backends_to_check: Vec<(String, SocketAddr)> = backend_list
202                    .backends
203                    .iter()
204                    .filter(|b| {
205                        let b = b.borrow();
206                        b.status == crate::backends::BackendStatus::Normal
207                            && !self.in_flight.iter().any(|f| {
208                                f.cluster_id == *cluster_id && f.backend_id == b.backend_id
209                            })
210                    })
211                    .map(|b| {
212                        let b = b.borrow();
213                        (b.backend_id.to_owned(), b.address)
214                    })
215                    .collect();
216
217                if !backends_to_check.is_empty() {
218                    let h2c = backend_map
219                        .cluster_http2
220                        .get(cluster_id)
221                        .copied()
222                        .unwrap_or(false);
223                    to_check.push((
224                        cluster_id.to_owned(),
225                        config.to_owned(),
226                        h2c,
227                        backends_to_check,
228                    ));
229                }
230            }
231        }
232
233        drop(backend_map);
234
235        for (cluster_id, config, h2c, backends_to_check) in to_check {
236            self.last_check_time.insert(cluster_id.to_owned(), now);
237
238            // The URI was validated at the worker `SetHealthCheck`
239            // boundary by `sozu_command::config::validate_health_check_config`
240            // (CR/LF/NUL/C0 rejected, leading `/` enforced). The probe
241            // runtime trusts that contract — no second silent strip
242            // here, no defense-in-depth divergence between what the
243            // operator typed and what hits the wire.
244            let probe_uri = config.uri.as_str();
245
246            for (backend_id, address) in backends_to_check {
247                match TcpStream::connect(address) {
248                    Ok(mut stream) => {
249                        let Some(token) = self.allocate_token() else {
250                            // Token table exhausted — record the probe as failed
251                            // so threshold logic can still fire, then move on.
252                            // The allocator already logged at error level.
253                            Self::record_check_result(
254                                backends,
255                                &cluster_id,
256                                &backend_id,
257                                address,
258                                false,
259                                &config,
260                            );
261                            continue;
262                        };
263                        if let Err(e) = registry.register(
264                            &mut stream,
265                            token,
266                            Interest::READABLE | Interest::WRITABLE,
267                        ) {
268                            debug!(
269                                "{} failed to register socket for {} ({}) in cluster {}: {}",
270                                log_context!(),
271                                backend_id,
272                                address,
273                                cluster_id,
274                                e
275                            );
276                            Self::record_check_result(
277                                backends,
278                                &cluster_id,
279                                &backend_id,
280                                address,
281                                false,
282                                &config,
283                            );
284                            continue;
285                        }
286                        trace!(
287                            "{} initiated connection to {} ({}) for cluster {}",
288                            log_context!(),
289                            backend_id,
290                            address,
291                            cluster_id
292                        );
293                        let request_bytes = if h2c {
294                            build_h2c_probe_bytes(probe_uri, address)
295                        } else {
296                            // RFC 9110 §7.2: `Host` MUST carry the
297                            // authority component, including the port
298                            // when it differs from the URI scheme's
299                            // default. SocketAddr's Display impl emits
300                            // `ip:port` (with brackets for IPv6), which
301                            // is unambiguous against any non-default
302                            // backend port. Backends that demand a
303                            // specific virtual-host name should expose
304                            // a non-vhost health endpoint (the same
305                            // pattern nginx/apache document) — adding
306                            // a per-cluster `host` field on
307                            // `HealthCheckConfig` is tracked as a
308                            // follow-up.
309                            format!(
310                                "GET {probe_uri} HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
311                            )
312                            .into_bytes()
313                        };
314                        self.in_flight.push(InFlightCheck {
315                            stream,
316                            token,
317                            cluster_id: cluster_id.to_owned(),
318                            backend_id,
319                            address,
320                            started_at: now,
321                            timeout: Duration::from_secs(u64::from(config.timeout)),
322                            request_bytes: Some(request_bytes),
323                            write_offset: 0,
324                            response_buf: Vec::with_capacity(256),
325                            config: config.to_owned(),
326                            h2c,
327                        });
328                    }
329                    Err(e) => {
330                        debug!(
331                            "{} failed to connect to {} ({}) for cluster {}: {}",
332                            log_context!(),
333                            backend_id,
334                            address,
335                            cluster_id,
336                            e
337                        );
338                        Self::record_check_result(
339                            backends,
340                            &cluster_id,
341                            &backend_id,
342                            address,
343                            false,
344                            &config,
345                        );
346                    }
347                }
348            }
349        }
350    }
351
352    fn progress_checks(&mut self, backends: &Rc<RefCell<BackendMap>>, registry: &Registry) {
353        const MAX_HEALTH_RESPONSE_SIZE: usize = 4096;
354
355        let now = Instant::now();
356        let mut completed = Vec::new();
357        let ready = std::mem::take(&mut self.ready_tokens);
358
359        for (idx, check) in self.in_flight.iter_mut().enumerate() {
360            // Always check timeouts regardless of readiness
361            if now.duration_since(check.started_at) > check.timeout {
362                debug!(
363                    "{} timeout for {} ({}) in cluster {}",
364                    log_context!(),
365                    check.backend_id,
366                    check.address,
367                    check.cluster_id
368                );
369                completed.push((idx, false));
370                continue;
371            }
372
373            // Skip I/O if the socket has not been reported ready by mio
374            if !ready.contains(&check.token) {
375                continue;
376            }
377
378            if let Some(ref request_bytes) = check.request_bytes {
379                match check.stream.write(&request_bytes[check.write_offset..]) {
380                    Ok(n) => {
381                        check.write_offset += n;
382                        if check.write_offset >= request_bytes.len() {
383                            check.request_bytes = None;
384                        } else {
385                            continue;
386                        }
387                    }
388                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
389                        continue;
390                    }
391                    Err(_e) => {
392                        completed.push((idx, false));
393                        continue;
394                    }
395                }
396            }
397
398            let mut buf = [0u8; 256];
399            match check.stream.read(&mut buf) {
400                Ok(0) => {
401                    let success =
402                        parse_probe_response(&check.response_buf, &check.config, check.h2c)
403                            .unwrap_or(false);
404                    completed.push((idx, success));
405                }
406                Ok(n) => {
407                    if check.response_buf.len() + n > MAX_HEALTH_RESPONSE_SIZE {
408                        completed.push((idx, false));
409                        continue;
410                    }
411                    check.response_buf.extend_from_slice(&buf[..n]);
412                    if let Some(success) =
413                        parse_probe_response(&check.response_buf, &check.config, check.h2c)
414                    {
415                        completed.push((idx, success));
416                    }
417                }
418                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
419                Err(_e) => {
420                    completed.push((idx, false));
421                }
422            }
423        }
424
425        // Sort indices in descending order so swap_remove doesn't invalidate
426        // later indices — it moves the last element to the removed position,
427        // which has already been processed or is beyond our range.
428        completed.sort_by(|a, b| b.0.cmp(&a.0));
429        for (idx, success) in completed {
430            let mut check = self.in_flight.swap_remove(idx);
431            let _ = registry.deregister(&mut check.stream);
432            Self::record_check_result(
433                backends,
434                &check.cluster_id,
435                &check.backend_id,
436                check.address,
437                success,
438                &check.config,
439            );
440        }
441    }
442
443    fn record_check_result(
444        backends: &Rc<RefCell<BackendMap>>,
445        cluster_id: &str,
446        backend_id: &str,
447        address: SocketAddr,
448        success: bool,
449        config: &HealthCheckConfig,
450    ) {
451        let mut backend_map = backends.borrow_mut();
452        let Some(backend_list) = backend_map.backends.get_mut(cluster_id) else {
453            return;
454        };
455
456        let Some(backend_ref) = backend_list.find_backend(&address) else {
457            return;
458        };
459
460        let mut backend = backend_ref.borrow_mut();
461
462        if success {
463            let transitioned = backend.health.record_success(config.healthy_threshold);
464            if transitioned {
465                info!(
466                    "{} backend {} at {} marked UP (health check passed {} consecutive times) for cluster {}",
467                    log_context!(),
468                    backend_id,
469                    address,
470                    config.healthy_threshold,
471                    cluster_id
472                );
473                incr!(names::health_check::UP);
474                gauge!(
475                    names::backend::AVAILABLE,
476                    1,
477                    Some(cluster_id),
478                    Some(backend_id)
479                );
480                push_event(Event {
481                    kind: EventKind::HealthCheckHealthy as i32,
482                    cluster_id: Some(cluster_id.to_owned()),
483                    backend_id: Some(backend_id.to_owned()),
484                    address: Some(address.into()),
485                    metric_detail: None,
486                });
487            }
488            count!(names::health_check::SUCCESS, 1);
489        } else {
490            let transitioned = backend.health.record_failure(config.unhealthy_threshold);
491            if transitioned {
492                warn!(
493                    "{} backend {} at {} marked DOWN (health check failed {} consecutive times) for cluster {}",
494                    log_context!(),
495                    backend_id,
496                    address,
497                    config.unhealthy_threshold,
498                    cluster_id
499                );
500                incr!(names::health_check::DOWN);
501                gauge!(
502                    names::backend::AVAILABLE,
503                    0,
504                    Some(cluster_id),
505                    Some(backend_id)
506                );
507                push_event(Event {
508                    kind: EventKind::HealthCheckUnhealthy as i32,
509                    cluster_id: Some(cluster_id.to_owned()),
510                    backend_id: Some(backend_id.to_owned()),
511                    address: Some(address.into()),
512                    metric_detail: None,
513                });
514            }
515            count!(names::health_check::FAILURE, 1);
516        }
517
518        // Emit the healthy-backend gauge on every result update for clusters
519        // with at least one configured backend, including `healthy == 0`. The
520        // gauge is the only documented signal that lets dashboards detect
521        // universal-outage / fail-open. Previously it was gated on
522        // `healthy > 0 && healthy * 2 <= total`, so when all backends went
523        // unhealthy the gauge retained its last non-zero value and operators
524        // could not see fail-open in dashboards.
525        //
526        // The "critically low" warning (≤50% healthy) keeps its original
527        // condition — only the gauge emission is unconditional.
528        drop(backend);
529        let total = backend_list.backends.len();
530        let healthy = backend_list
531            .backends
532            .iter()
533            .filter(|b| b.borrow().health.is_healthy())
534            .count();
535        if total > 0 {
536            gauge!(
537                "health_check.healthy_backends",
538                healthy,
539                Some(cluster_id),
540                None
541            );
542            if healthy > 0 && healthy * 2 <= total {
543                warn!(
544                    "{} cluster {} has only {}/{} healthy backends",
545                    log_context!(),
546                    cluster_id,
547                    healthy,
548                    total
549                );
550            }
551        }
552        // Re-evaluate per-cluster availability so an `Available -> AllDown`
553        // or `AllDown -> Available` transition driven purely by health-check
554        // results (no failed routing call to observe it) still emits the
555        // log + counter + Event. This is the only path that catches
556        // passive-only recoveries — a cluster whose backends came back via
557        // successful HC probes but whose retry policies haven't been
558        // touched by any session.
559        // `backend_list` is a `&BackendList` reborrowed from
560        // `backend_map.backends.get(cluster_id)` above. The `&backend_map`
561        // borrow it depends on lasts until the next use of `backend_map`,
562        // so we just stop using `backend_list` and call the helper, which
563        // re-takes its own `&self` borrow internally.
564        backend_map.record_cluster_availability(cluster_id);
565    }
566
567    pub fn remove_cluster(&mut self, cluster_id: &str) {
568        self.last_check_time.remove(cluster_id);
569        self.in_flight
570            .retain(|check| check.cluster_id != cluster_id);
571    }
572}
573
574/// Top-level dispatch: pick the HTTP/1.1 status-line parser or the
575/// HTTP/2 frame walker depending on whether the probe was sent as h2c.
576/// `h2c` is captured from `BackendMap.cluster_http2` on the
577/// `InFlightCheck` at probe-creation time.
578fn parse_probe_response(buf: &[u8], config: &HealthCheckConfig, h2c: bool) -> Option<bool> {
579    if h2c {
580        try_parse_h2c_status(buf, config)
581    } else {
582        try_parse_status_line(buf, config)
583    }
584}
585
586fn try_parse_status_line(buf: &[u8], config: &HealthCheckConfig) -> Option<bool> {
587    let response = std::str::from_utf8(buf).ok()?;
588    let first_line_end = response.find("\r\n")?;
589    let status_line = &response[..first_line_end];
590
591    let (_, rest) = status_line.split_once(' ')?;
592    let status_str = rest.split(' ').next()?;
593    let status_code: u32 = status_str.parse().unwrap_or(0);
594    Some(is_status_healthy(status_code, config.expected_status))
595}
596
/// Decides whether `actual` is an acceptable probe status.
///
/// An `expected` of `0` means "any 2xx"; any other value has to be
/// matched exactly.
fn is_status_healthy(actual: u32, expected: u32) -> bool {
    match expected {
        0 => (200..=299).contains(&actual),
        wanted => actual == wanted,
    }
}
604
605/// Compose a bare-minimum h2c (HTTP/2 over cleartext, prior-knowledge)
606/// probe: the 24-byte connection preface, an empty client SETTINGS
607/// frame (acknowledging the spec-mandated handshake), and a single
608/// HEADERS frame on stream 1 carrying `GET <path>` with
609/// END_STREAM | END_HEADERS.
610///
611/// HPACK encoding is delegated to `loona_hpack::Encoder` — the same
612/// encoder the H2 mux uses for live traffic (`lib/src/protocol/mux/converter.rs`).
613/// The probe inherits whatever static/dynamic-table behaviour the
614/// encoder picks, including any future Huffman support. The connection
615/// preface comes from `serializer::H2_PRI` so the probe and the live
616/// mux stay in lockstep.
617fn build_h2c_probe_bytes(uri: &str, address: SocketAddr) -> Vec<u8> {
618    let authority = address.to_string();
619
620    // Build the HPACK header block first so we know its length for the
621    // HEADERS frame header. A fresh encoder per probe keeps things
622    // stateless — we never reuse the dynamic table across probes.
623    let mut encoder = loona_hpack::Encoder::new();
624    let mut hpack: Vec<u8> = Vec::new();
625    let headers: [(&[u8], &[u8]); 4] = [
626        (b":method", b"GET"),
627        (b":scheme", b"http"),
628        (b":path", uri.as_bytes()),
629        (b":authority", authority.as_bytes()),
630    ];
631    // Encoder::encode_into writes to an io::Write. Vec<u8> implements
632    // it infallibly, so the ? cannot fire in practice.
633    if encoder.encode_into(headers, &mut hpack).is_err() {
634        // Defensive: return an empty buffer so the probe records as
635        // failed via the read path instead of panicking.
636        return Vec::new();
637    }
638
639    // Pre-allocate: preface + SETTINGS (9) + HEADERS header (9) + block.
640    let mut out = Vec::with_capacity(H2_PRI.len() + FRAME_HEADER_SIZE * 2 + hpack.len());
641    out.extend_from_slice(H2_PRI.as_bytes());
642
643    // Empty client SETTINGS frame: length=0, type=Settings(0x04), flags=0, sid=0.
644    out.extend_from_slice(&[0, 0, 0, 0x04, 0, 0, 0, 0, 0]);
645
646    // HEADERS frame: length=hpack.len(), type=Headers(0x01),
647    // flags=END_STREAM|END_HEADERS=0x05, stream=1.
648    let len = hpack.len() as u32;
649    out.push(((len >> 16) & 0xFF) as u8);
650    out.push(((len >> 8) & 0xFF) as u8);
651    out.push((len & 0xFF) as u8);
652    out.push(0x01); // HEADERS
653    out.push(0x05); // END_STREAM | END_HEADERS
654    out.extend_from_slice(&[0, 0, 0, 1]); // stream id = 1
655    out.extend_from_slice(&hpack);
656    out
657}
658
659/// Walk the buffered H2 frames looking for a HEADERS frame (plus any
660/// CONTINUATION frames until END_HEADERS) on stream 1, then HPACK-decode
661/// the assembled block via `loona_hpack::Decoder` and pull `:status`.
662///
663/// Returns:
664///
665/// * `Some(true)` — `:status` decoded and matches
666///   `config.expected_status` (or any 2xx when `expected_status == 0`).
667/// * `Some(false)` — `:status` decoded but does not match, the HPACK
668///   block was malformed, or a GOAWAY frame arrived.
669/// * `None` — buffer truncated mid-frame; caller should keep reading.
670///
671/// Frames with PADDED / PRIORITY flags are normalised correctly via the
672/// `mux::parser` flag constants (`FLAG_PADDED` / `FLAG_PRIORITY`). Unknown
673/// HPACK encodings, Huffman-coded values, and CONTINUATION fragmentation
674/// all fall through to the decoder rather than the hand-rolled byte walk
675/// the previous implementation used.
676fn try_parse_h2c_status(buf: &[u8], config: &HealthCheckConfig) -> Option<bool> {
677    // RFC 9113 §4.2: the absolute upper bound for SETTINGS_MAX_FRAME_SIZE
678    // is 2^24 - 1. The probe never advertises a custom limit, so accept
679    // anything within the spec ceiling.
680    const MAX_FRAME_SIZE: u32 = (1 << 24) - 1;
681
682    let mut remaining: &[u8] = buf;
683    // Buffer accumulating HEADERS + CONTINUATION block fragments for
684    // stream 1 until END_HEADERS lands. RFC 9113 §6.10: until the
685    // continuation chain ends, no other frames may arrive on the
686    // connection, but we still tolerate interleaved control frames
687    // for robustness — the decoder only fires on END_HEADERS.
688    let mut headers_block: Option<Vec<u8>> = None;
689
690    while !remaining.is_empty() {
691        // The `mux::parser::frame_header` is built from `nom::number::complete`
692        // primitives which emit `Err::Error` (not `Err::Incomplete`) on short
693        // input, so distinguish "header still arriving" from "header arrived
694        // but is malformed" by an explicit length check before parsing.
695        if remaining.len() < FRAME_HEADER_SIZE {
696            return None;
697        }
698        let (rest, header) = match frame_header(remaining, MAX_FRAME_SIZE) {
699            Ok(parsed) => parsed,
700            // Header bytes are present but parser rejected them: malformed
701            // framing (oversized payload, invalid stream-id parity, etc.).
702            // → probe is unhealthy.
703            Err(_) => return Some(false),
704        };
705
706        let payload_len = header.payload_len as usize;
707        if rest.len() < payload_len {
708            // Body of the frame has not arrived yet.
709            return None;
710        }
711        let (payload, after) = rest.split_at(payload_len);
712
713        match header.frame_type {
714            FrameType::Headers if header.stream_id == 1 => {
715                let block = strip_padded_priority(payload, header.flags)?;
716                let mut accumulator = headers_block.take().unwrap_or_default();
717                accumulator.extend_from_slice(block);
718                if header.flags & FLAG_END_HEADERS != 0 {
719                    return Some(decode_status_from_block(&accumulator, config));
720                }
721                headers_block = Some(accumulator);
722            }
723            FrameType::Continuation if header.stream_id == 1 => {
724                // CONTINUATION carries no padding/priority flags; the
725                // payload is a raw block fragment.
726                let Some(mut accumulator) = headers_block.take() else {
727                    // CONTINUATION without a preceding HEADERS is a
728                    // protocol error per RFC 9113 §6.10.
729                    return Some(false);
730                };
731                accumulator.extend_from_slice(payload);
732                if header.flags & FLAG_END_HEADERS != 0 {
733                    return Some(decode_status_from_block(&accumulator, config));
734                }
735                headers_block = Some(accumulator);
736            }
737            FrameType::GoAway => return Some(false),
738            // SETTINGS, SETTINGS-ACK, DATA, PING, etc. — keep walking
739            // until we find HEADERS on stream 1.
740            _ => {}
741        }
742
743        remaining = after;
744    }
745    None
746}
747
748/// Trim the optional 1-byte pad-length prefix and the 5-byte priority
749/// dependency (RFC 9113 §6.2). Returns `None` when the flags claim
750/// padding/priority but the payload is too short to satisfy them — the
751/// caller turns that into `Some(false)` (probe unhealthy).
752fn strip_padded_priority(payload: &[u8], flags: u8) -> Option<&[u8]> {
753    let mut start = 0usize;
754    let mut end = payload.len();
755
756    if flags & FLAG_PADDED != 0 {
757        let &pad_len = payload.first()?;
758        start = 1;
759        let pad = pad_len as usize;
760        // Trailing padding bytes must fit within the remaining payload
761        // (after dropping the 1-byte pad-length prefix) — otherwise the
762        // frame is malformed.
763        let available = end.checked_sub(start)?;
764        if pad > available {
765            return None;
766        }
767        end -= pad;
768    }
769    if flags & FLAG_PRIORITY != 0 {
770        let new_start = start.checked_add(5)?;
771        if new_start > end {
772            return None;
773        }
774        start = new_start;
775    }
776    payload.get(start..end)
777}
778
779/// Run `loona_hpack::Decoder` over the assembled HEADERS block and
780/// return whether `:status` matches `config.expected_status`. Unknown
781/// HPACK encodings, malformed integers, Huffman fallbacks, and
782/// `:status` values that fail UTF-8 / numeric parsing all collapse to
783/// `false` — the probe is recorded as unhealthy, never as a panic.
784fn decode_status_from_block(block: &[u8], config: &HealthCheckConfig) -> bool {
785    let mut decoder = loona_hpack::Decoder::new();
786    let mut status: Option<u32> = None;
787    let decode_result = decoder.decode_with_cb(block, |name, value| {
788        if status.is_some() {
789            return;
790        }
791        if name.as_ref() == b":status"
792            && let Ok(s) = std::str::from_utf8(value.as_ref())
793            && let Ok(parsed) = s.parse::<u32>()
794        {
795            status = Some(parsed);
796        }
797    });
798    if decode_result.is_err() {
799        return false;
800    }
801    match status {
802        Some(code) => is_status_healthy(code, config.expected_status),
803        None => false,
804    }
805}
806
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::HealthState;

    // HTTP/2 frame type codes (RFC 9113 §6) used when hand-assembling
    // frames, so the tests do not carry bare magic numbers.
    const FRAME_TYPE_HEADERS: u8 = 0x01;
    const FRAME_TYPE_SETTINGS: u8 = 0x04;
    const FRAME_TYPE_GOAWAY: u8 = 0x07;
    const FRAME_TYPE_CONTINUATION: u8 = 0x09;
    // ACK flag of a SETTINGS frame (RFC 9113 §6.5).
    const SETTINGS_ACK_FLAG: u8 = 0x01;
    // HTTP/2 client connection preface (RFC 9113 §3.4), pinned as literal
    // bytes so the test does not depend on the constant it verifies.
    const CONNECTION_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

    /// Health-check configuration shared by the tests; only
    /// `expected_status` varies (0 means "any 2xx", per the
    /// `is_status_healthy` tests below).
    fn h2c_config(expected: u32) -> HealthCheckConfig {
        HealthCheckConfig {
            uri: "/health".to_owned(),
            interval: 10,
            timeout: 5,
            healthy_threshold: 3,
            unhealthy_threshold: 3,
            expected_status: expected,
        }
    }

    /// Wrap `payload` in an H2 frame header with the given type, flags and
    /// stream id. Panics if the payload does not fit the 24-bit length
    /// field instead of silently truncating it.
    fn frame_with_header(frame_type: u8, flags: u8, sid: u32, payload: &[u8]) -> Vec<u8> {
        let declared_len =
            u32::try_from(payload.len()).expect("test payload length fits in u32");
        assert!(
            declared_len < (1 << 24),
            "H2 frame length is a 24-bit field"
        );

        let mut out = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
        // 24-bit big-endian length: drop the most significant byte of the u32.
        out.extend_from_slice(&declared_len.to_be_bytes()[1..]);
        out.push(frame_type);
        out.push(flags);
        out.extend_from_slice(&sid.to_be_bytes());
        out.extend_from_slice(payload);
        out
    }

    /// Encode response headers into a fresh HPACK block with
    /// `loona_hpack::Encoder`, so the tests follow whatever representation
    /// the encoder picks instead of pinning bytes.
    fn encode_response_headers(headers: &[(&[u8], &[u8])]) -> Vec<u8> {
        let mut encoder = loona_hpack::Encoder::new();
        let mut out = Vec::new();
        encoder
            .encode_into(headers.iter().copied(), &mut out)
            .expect("encoding into a Vec cannot fail");
        out
    }

    #[test]
    fn test_is_status_healthy_any_2xx() {
        assert!(is_status_healthy(200, 0));
        assert!(is_status_healthy(204, 0));
        assert!(is_status_healthy(299, 0));
        assert!(!is_status_healthy(301, 0));
        assert!(!is_status_healthy(500, 0));
        assert!(!is_status_healthy(0, 0));
    }

    #[test]
    fn test_is_status_healthy_specific() {
        assert!(is_status_healthy(200, 200));
        assert!(!is_status_healthy(204, 200));
        assert!(!is_status_healthy(500, 200));
    }

    #[test]
    fn test_try_parse_status_line() {
        let config = h2c_config(0);

        let buf = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
        assert_eq!(try_parse_status_line(buf, &config), Some(true));

        let buf = b"HTTP/1.1 500 Internal Server Error\r\n\r\n";
        assert_eq!(try_parse_status_line(buf, &config), Some(false));

        // Status line not terminated yet: keep reading.
        let buf = b"HTTP/1.1 200";
        assert_eq!(try_parse_status_line(buf, &config), None);
    }

    #[test]
    fn test_health_state_transitions() {
        let mut state = HealthState::default();
        assert!(state.is_healthy());

        // Two failures stay below the threshold of three.
        assert!(!state.record_failure(3));
        assert!(!state.record_failure(3));
        assert!(state.is_healthy());

        // The third failure flips the state.
        assert!(state.record_failure(3));
        assert!(!state.is_healthy());

        // Recovery needs three successes as well.
        assert!(!state.record_success(3));
        assert!(!state.record_success(3));
        assert!(!state.is_healthy());

        assert!(state.record_success(3));
        assert!(state.is_healthy());
    }

    #[test]
    fn build_h2c_probe_starts_with_preface_and_frames() {
        let bytes = build_h2c_probe_bytes("/health", "127.0.0.1:8080".parse().unwrap());

        // Connection preface (24 bytes).
        assert!(bytes.starts_with(CONNECTION_PREFACE));

        // SETTINGS frame after the preface: empty payload, flags=0, sid=0.
        let settings_start = CONNECTION_PREFACE.len();
        assert_eq!(&bytes[settings_start..settings_start + 3], &[0u8, 0, 0]); // length = 0
        assert_eq!(bytes[settings_start + 3], FRAME_TYPE_SETTINGS);
        assert_eq!(bytes[settings_start + 4], 0); // flags
        assert_eq!(
            &bytes[settings_start + 5..settings_start + FRAME_HEADER_SIZE],
            &[0u8, 0, 0, 0]
        );

        // HEADERS frame on stream 1, END_STREAM | END_HEADERS = 0x05.
        let headers_start = settings_start + FRAME_HEADER_SIZE;
        assert_eq!(bytes[headers_start + 3], FRAME_TYPE_HEADERS);
        assert_eq!(bytes[headers_start + 4], 0x05);
        assert_eq!(
            &bytes[headers_start + 5..headers_start + FRAME_HEADER_SIZE],
            &[0u8, 0, 0, 1]
        );

        // The HEADERS payload must HPACK-decode back to the four pseudo-headers.
        let payload_start = headers_start + FRAME_HEADER_SIZE;
        let mut decoder = loona_hpack::Decoder::new();
        let mut method = None;
        let mut scheme = None;
        let mut path = None;
        let mut authority = None;
        decoder
            .decode_with_cb(&bytes[payload_start..], |name, value| match name.as_ref() {
                b":method" => method = Some(value.to_vec()),
                b":scheme" => scheme = Some(value.to_vec()),
                b":path" => path = Some(value.to_vec()),
                b":authority" => authority = Some(value.to_vec()),
                _ => {}
            })
            .expect("loona_hpack decodes a freshly-encoded probe");
        assert_eq!(method.as_deref(), Some(b"GET" as &[u8]));
        assert_eq!(scheme.as_deref(), Some(b"http" as &[u8]));
        assert_eq!(path.as_deref(), Some(b"/health" as &[u8]));
        assert_eq!(authority.as_deref(), Some(b"127.0.0.1:8080" as &[u8]));
    }

    #[test]
    fn h2c_response_with_status_200_decodes_healthy() {
        let block = encode_response_headers(&[(b":status", b"200")]);
        let buf = frame_with_header(FRAME_TYPE_HEADERS, FLAG_END_HEADERS, 1, &block);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
    }

    #[test]
    fn h2c_response_with_status_500_fails_default_2xx_check() {
        let block = encode_response_headers(&[(b":status", b"500")]);
        let buf = frame_with_header(FRAME_TYPE_HEADERS, FLAG_END_HEADERS, 1, &block);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(false));
    }

    #[test]
    fn h2c_response_with_status_503_matches_expected_503() {
        let block =
            encode_response_headers(&[(b":status", b"503"), (b"content-type", b"text/plain")]);
        let buf = frame_with_header(FRAME_TYPE_HEADERS, FLAG_END_HEADERS, 1, &block);
        let cfg = h2c_config(503);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
    }

    #[test]
    fn h2c_response_with_continuation_decodes_status_200_healthy() {
        // Build one HPACK block, then split it across HEADERS (no
        // END_HEADERS) + CONTINUATION (END_HEADERS). The walker has to
        // reassemble both fragments before it can decode `:status`.
        let block = encode_response_headers(&[
            (b":status", b"200"),
            (b"x-trace-id", b"abc-123"),
            (b"server", b"sozu-test"),
        ]);
        assert!(block.len() >= 4, "HPACK block needs to be splittable");
        let (head, tail) = block.split_at(block.len() / 2);

        // HEADERS with no END_HEADERS (flags = 0) — block continues.
        let mut buf = frame_with_header(FRAME_TYPE_HEADERS, 0, 1, head);
        // CONTINUATION carrying the rest, END_HEADERS set.
        buf.extend_from_slice(&frame_with_header(
            FRAME_TYPE_CONTINUATION,
            FLAG_END_HEADERS,
            1,
            tail,
        ));

        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
    }

    #[test]
    fn h2c_response_with_padded_priority_headers_decodes_status_200() {
        // Build a HEADERS frame with both PADDED and PRIORITY flags so the
        // parser must strip 1 + 5 = 6 leading bytes (and the trailing
        // padding) before handing the block to loona_hpack.
        let block = encode_response_headers(&[(b":status", b"200")]);
        let pad_len: u8 = 3;

        let mut payload = Vec::new();
        payload.push(pad_len); // PADDED: 1-byte pad length
        payload.extend_from_slice(&[0u8, 0, 0, 0, 16]); // PRIORITY: stream-dep + weight
        payload.extend_from_slice(&block);
        // Trailing padding, sized from `pad_len` so the two cannot drift apart.
        payload.resize(payload.len() + usize::from(pad_len), 0);

        let flags = FLAG_PADDED | FLAG_PRIORITY | FLAG_END_HEADERS;
        let buf = frame_with_header(FRAME_TYPE_HEADERS, flags, 1, &payload);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
    }

    #[test]
    fn h2c_response_after_unrelated_settings_frame_decodes_healthy() {
        // The probe parser must skip over the server's SETTINGS frame
        // and any other connection-scoped frames before locating the
        // HEADERS on stream 1.
        let mut buf = frame_with_header(FRAME_TYPE_SETTINGS, 0, 0, &[]); // SETTINGS, empty
        buf.extend_from_slice(&frame_with_header(
            FRAME_TYPE_SETTINGS,
            SETTINGS_ACK_FLAG,
            0,
            &[],
        )); // SETTINGS-ACK
        let block = encode_response_headers(&[(b":status", b"200")]);
        buf.extend_from_slice(&frame_with_header(
            FRAME_TYPE_HEADERS,
            FLAG_END_HEADERS,
            1,
            &block,
        ));

        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
    }

    #[test]
    fn h2c_goaway_returns_unhealthy() {
        // GOAWAY: 8-byte payload (last-stream-id + error code).
        let buf = frame_with_header(FRAME_TYPE_GOAWAY, 0, 0, &[0u8; 8]);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(false));
    }

    #[test]
    fn h2c_truncated_frame_returns_none() {
        // Frame header claims a 10-byte payload but only 5 are present.
        let mut buf = frame_with_header(FRAME_TYPE_HEADERS, FLAG_END_HEADERS, 1, &[0u8; 10]);
        buf.truncate(FRAME_HEADER_SIZE + 5);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), None);
    }

    #[test]
    fn h2c_partial_frame_header_returns_none() {
        // Fewer than 9 bytes: the frame header has not even fully arrived
        // yet. The probe loop should report `None` so the caller keeps
        // reading rather than recording the backend as unhealthy.
        let cfg = h2c_config(0);
        for partial_len in 0usize..FRAME_HEADER_SIZE {
            let buf = vec![0u8; partial_len];
            assert_eq!(
                try_parse_h2c_status(&buf, &cfg),
                None,
                "partial buffer of {partial_len} byte(s) should be 'keep reading'"
            );
        }
    }

    #[test]
    fn h2c_continuation_without_preceding_headers_returns_unhealthy() {
        // CONTINUATION without a HEADERS predecessor is a protocol
        // error per RFC 9113 §6.10.
        let block = encode_response_headers(&[(b":status", b"200")]);
        let buf = frame_with_header(FRAME_TYPE_CONTINUATION, FLAG_END_HEADERS, 1, &block);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(false));
    }

    #[test]
    fn strip_padded_priority_passes_through_unflagged_payload() {
        let payload = [1u8, 2, 3];
        assert_eq!(strip_padded_priority(&payload, 0), Some(&payload[..]));
    }

    #[test]
    fn strip_padded_priority_trims_prefixes_and_trailing_padding() {
        // pad length 1, 5 priority bytes, one block byte, one padding byte.
        let payload = [1u8, 0, 0, 0, 0, 16, 0x88, 0];
        assert_eq!(
            strip_padded_priority(&payload, FLAG_PADDED | FLAG_PRIORITY),
            Some(&[0x88u8][..])
        );
    }

    #[test]
    fn strip_padded_priority_rejects_malformed_padding() {
        // PADDED set but the pad-length byte itself is missing.
        assert_eq!(strip_padded_priority(&[], FLAG_PADDED), None);
        // Pad length (3) exceeds the two bytes that follow it.
        assert_eq!(strip_padded_priority(&[3u8, 0xAA, 0xBB], FLAG_PADDED), None);
    }

    #[test]
    fn strip_padded_priority_accepts_padding_that_fills_the_frame() {
        // Padding consumes everything after the pad-length byte: the block
        // is empty but well-formed.
        let empty: &[u8] = &[];
        assert_eq!(strip_padded_priority(&[2u8, 0, 0], FLAG_PADDED), Some(empty));
    }

    #[test]
    fn strip_padded_priority_rejects_short_priority_block() {
        // PRIORITY needs 5 bytes; only 4 are present.
        assert_eq!(strip_padded_priority(&[0u8; 4], FLAG_PRIORITY), None);
        // After removing the pad-length byte and 1 byte of padding only 4
        // bytes remain, which is still too short for the priority block.
        assert_eq!(
            strip_padded_priority(&[1u8, 0, 0, 0, 0, 0], FLAG_PADDED | FLAG_PRIORITY),
            None
        );
    }
}