sozu-lib 2.1.0

sozu library to build hot reconfigurable HTTP reverse proxies
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
//! Non-blocking HTTP health checks for backends
//!
//! Health checks run within the single-threaded mio event loop using non-blocking TCP
//! connections. Each check cycle is triggered by a timer. For each backend with a
//! configured health check, we open a non-blocking TCP connection, send a minimal
//! HTTP/1.1 GET request, parse the status line from the response, and update the
//! backend's health state accordingly.

use std::{
    cell::RefCell,
    collections::{HashMap, HashSet},
    hash::{Hash, Hasher},
    io::{Read, Write},
    net::SocketAddr,
    rc::Rc,
    time::{Duration, Instant},
};

use mio::{Interest, Registry, Token, net::TcpStream};
use sozu_command::{
    proto::command::{Event, EventKind, HealthCheckConfig},
    state::ClusterId,
};

use crate::metrics::names;
use crate::{
    backends::BackendMap,
    protocol::mux::{
        parser::{
            FLAG_END_HEADERS, FLAG_PADDED, FLAG_PRIORITY, FRAME_HEADER_SIZE, FrameType,
            frame_header,
        },
        serializer::H2_PRI,
    },
    server::push_event,
};

/// Tag prepended to every log line emitted by the health-checker.
///
/// Follows the same hyphenated, upper-case naming used by the other
/// subsystem tags (`MUX-H2`, `PROXY-RELAY`, `TLS-RESOLVER`). Every logging
/// call in this module starts its format string with `"{} "` and passes
/// `log_context!()` as the first argument; that is the shape the regression
/// guard in `lib/tests/log_layout.rs` recognises.
///
/// The one-argument form must be given a literal (it is spliced into
/// `concat!`) and expands to `"HEALTH-CHECK cluster=<literal>"`.
macro_rules! log_context {
    () => ("HEALTH-CHECK");
    ($cluster_literal:expr) => (concat!("HEALTH-CHECK cluster=", $cluster_literal));
}

/// First mio token of the range reserved for health-check sockets (2^24).
///
/// Probe sockets use tokens in
/// `[HEALTH_CHECK_TOKEN_BASE, HEALTH_CHECK_TOKEN_BASE + HEALTH_CHECK_TOKEN_CAPACITY)`.
/// This keeps them clear of slab-allocated session tokens (capped well below
/// `1 << 24` by `Server::sessions::max_connections`) and of the mux GOAWAY
/// sentinel `Token(usize::MAX)`.
const HEALTH_CHECK_TOKEN_BASE: usize = 0x0100_0000;
/// Number of token slots available to concurrent health-check sockets (2^16).
///
/// `HealthChecker::allocate_token` walks these slots modulo this capacity and
/// skips those still in flight. When every slot is busy it logs at error
/// level and refuses the allocation instead of reusing a live token.
const HEALTH_CHECK_TOKEN_CAPACITY: usize = 0x1_0000;

/// Probe work collected while the backend map is borrowed, consumed once the
/// borrow is released.
///
/// Each tuple is `(cluster_id, config, h2c, backends_to_check)`:
/// - `h2c` copies `cluster.http2`, the backend-capability hint the mux
///   router also relies on, so the probe speaks the wire format the proxy
///   will really use toward those backends;
/// - `backends_to_check` lists `(backend_id, address)` pairs.
type PendingChecks = Vec<(ClusterId, HealthCheckConfig, bool, Vec<(String, SocketAddr)>)>;

/// Tracks an in-flight health check connection
///
/// One record exists per probe whose socket has been opened and registered
/// with mio; it is removed once the probe succeeds, fails or times out.
#[derive(Debug)]
struct InFlightCheck {
    /// Non-blocking TCP connection to the probed backend.
    stream: TcpStream,
    /// mio token the stream is registered under (inside the health-check
    /// token namespace).
    token: Token,
    /// Cluster the probed backend belongs to.
    cluster_id: ClusterId,
    /// Identifier of the probed backend within its cluster.
    backend_id: String,
    /// Address the probe connected to; also used to find the backend again
    /// when recording the result.
    address: SocketAddr,
    /// When the probe was launched; compared against `timeout` on every pass.
    started_at: Instant,
    /// Probe deadline, taken from the cluster's `HealthCheckConfig::timeout`
    /// (seconds).
    timeout: Duration,
    /// Serialized request still to be sent; set to `None` once every byte
    /// has been written.
    request_bytes: Option<Vec<u8>>,
    /// Number of bytes of `request_bytes` already written to the socket.
    write_offset: usize,
    /// Response bytes accumulated so far; the probe fails if this would grow
    /// past the maximum health-response size.
    response_buf: Vec<u8>,
    /// Copy of the cluster's health-check configuration at probe creation
    /// (thresholds and response expectations).
    config: HealthCheckConfig,
    /// Captured at probe-creation time from `BackendMap.cluster_http2`.
    /// The response parser needs to know whether to walk H2 frames or
    /// parse an HTTP/1.1 status line; storing it on the in-flight
    /// record avoids racing the cluster's `http2` flag if the
    /// operator flips it mid-probe.
    h2c: bool,
}

/// Manages health checks across all clusters and backends
///
/// Driven from the server event loop: `poll` launches probes for clusters
/// that are due and advances in-flight ones, while `ready` records socket
/// readiness that mio reported for a health-check token.
#[derive(Debug)]
pub struct HealthChecker {
    /// Probes whose socket is open and registered, awaiting I/O or timeout.
    in_flight: Vec<InFlightCheck>,
    /// Start time of the most recent probe round, per cluster; drives the
    /// interval + jitter scheduling in `initiate_checks`.
    last_check_time: HashMap<ClusterId, Instant>,
    /// Rolling cursor used by `allocate_token`; reduced modulo the token
    /// slot capacity.
    next_token_id: usize,
    /// Tokens reported ready through `ready` since the last
    /// `progress_checks` pass; drained wholesale at the start of each pass.
    ready_tokens: HashSet<Token>,
}

impl Default for HealthChecker {
    fn default() -> Self {
        Self::new()
    }
}

impl HealthChecker {
    pub fn new() -> Self {
        HealthChecker {
            in_flight: Vec::new(),
            last_check_time: HashMap::new(),
            next_token_id: 0,
            ready_tokens: HashSet::new(),
        }
    }

    /// Reserve a free slot in the health-check token namespace.
    ///
    /// Starting at the rolling `next_token_id` cursor, inspects at most
    /// `HEALTH_CHECK_TOKEN_CAPACITY` consecutive slots (wrapping modulo the
    /// capacity) and returns the first one not used by an in-flight probe.
    /// The cursor moves past every slot inspected, so successive calls
    /// rotate through the namespace rather than reusing the lowest free slot.
    ///
    /// Returns `None`, after logging at error level, only when every slot is
    /// occupied. The caller must then fail the probe instead of sharing a
    /// token with another in-flight stream.
    fn allocate_token(&mut self) -> Option<Token> {
        let occupied: HashSet<usize> = self
            .in_flight
            .iter()
            .map(|check| check.token.0 - HEALTH_CHECK_TOKEN_BASE)
            .collect();
        // In-flight tokens always come from this namespace, so the
        // subtraction above cannot underflow and each result is a slot index
        // (the same bound `owns_token` enforces).
        debug_assert!(
            occupied.iter().all(|&slot| slot < HEALTH_CHECK_TOKEN_CAPACITY),
            "every in-flight token offset must fall within the slot capacity"
        );
        debug_assert!(
            occupied.len() <= HEALTH_CHECK_TOKEN_CAPACITY,
            "cannot have more in-flight checks than the token slot capacity"
        );

        let mut remaining = HEALTH_CHECK_TOKEN_CAPACITY;
        while remaining > 0 {
            remaining -= 1;
            let slot = self.next_token_id % HEALTH_CHECK_TOKEN_CAPACITY;
            self.next_token_id = self.next_token_id.wrapping_add(1);
            if occupied.contains(&slot) {
                continue;
            }
            let token = Token(HEALTH_CHECK_TOKEN_BASE + slot);
            debug_assert!(
                self.owns_token(token),
                "allocated token must fall inside the health-check namespace"
            );
            debug_assert!(
                !occupied.contains(&slot),
                "allocated offset must not already be in flight"
            );
            return Some(token);
        }

        // Every slot was inspected and found busy. Fail gracefully (never
        // panic); the caller records the probe as failed.
        debug_assert_eq!(
            occupied.len(),
            HEALTH_CHECK_TOKEN_CAPACITY,
            "allocation only fails when every slot is occupied"
        );
        error!(
            "{} token-table full ({} in-flight checks); refusing to allocate a new probe slot",
            log_context!(),
            occupied.len()
        );
        None
    }

    /// Whether `token` belongs to the range reserved for health-check sockets.
    ///
    /// The range is bounded on both sides. The upper bound matters: without
    /// it the mux GOAWAY sentinel `Token(usize::MAX)` would be claimed as a
    /// health-check socket (a CVE-class regression found during the
    /// post-1209 rebase cross-check).
    pub fn owns_token(&self, token: Token) -> bool {
        let reserved =
            HEALTH_CHECK_TOKEN_BASE..HEALTH_CHECK_TOKEN_BASE + HEALTH_CHECK_TOKEN_CAPACITY;
        let owned = reserved.contains(&token.0);
        // Check the invariants the range encodes instead of repeating it.
        debug_assert!(
            !owned || token.0 - HEALTH_CHECK_TOKEN_BASE < HEALTH_CHECK_TOKEN_CAPACITY,
            "an owned token must map to a valid bounded slot offset"
        );
        debug_assert!(
            owned || token != Token(HEALTH_CHECK_TOKEN_BASE),
            "the base token itself must always be classified as owned"
        );
        owned
    }

    /// Record that mio reported readiness for the health-check socket
    /// registered under `token`.
    ///
    /// No I/O happens here. The token is parked in the readiness set, and
    /// the matching probe is driven by the next `progress_checks` pass,
    /// which drains the whole set. Losing the insert would leave the socket
    /// idle until its timeout.
    pub fn ready(&mut self, token: Token) {
        let _newly_recorded = self.ready_tokens.insert(token);
        debug_assert!(
            self.ready_tokens.contains(&token),
            "ready() must record the token in the readiness set"
        );
    }

    /// Entry point invoked on every event-loop iteration.
    ///
    /// Returns immediately when nothing is in flight and no cluster has a
    /// health check configured. Otherwise it first launches probes for
    /// clusters whose interval has elapsed, then advances the probes
    /// already in flight (timeouts and ready sockets).
    pub fn poll(&mut self, backends: &Rc<RefCell<BackendMap>>, registry: &Registry) {
        let idle = self.in_flight.is_empty() && backends.borrow().health_check_configs.is_empty();
        if !idle {
            self.initiate_checks(backends, registry);
            self.progress_checks(backends, registry);
        }
    }

    fn initiate_checks(&mut self, backends: &Rc<RefCell<BackendMap>>, registry: &Registry) {
        let backend_map = backends.borrow();
        let now = Instant::now();

        let mut to_check: PendingChecks = Vec::new();

        for (cluster_id, config) in &backend_map.health_check_configs {
            let interval = Duration::from_secs(u64::from(config.interval));

            // Add jitter based on cluster_id hash to prevent synchronized health check storms
            let mut hasher = std::collections::hash_map::DefaultHasher::new();
            cluster_id.hash(&mut hasher);
            let jitter_ms = hasher.finish() % (interval.as_millis() as u64 / 5).max(1);
            let jittered_interval = interval + Duration::from_millis(jitter_ms);

            let should_check = match self.last_check_time.get(cluster_id) {
                Some(last) => now.duration_since(*last) >= jittered_interval,
                None => true,
            };

            if !should_check {
                continue;
            }

            if let Some(backend_list) = backend_map.backends.get(cluster_id) {
                let backends_to_check: Vec<(String, SocketAddr)> = backend_list
                    .backends
                    .iter()
                    .filter(|b| {
                        let b = b.borrow();
                        b.status == crate::backends::BackendStatus::Normal
                            && !self.in_flight.iter().any(|f| {
                                f.cluster_id == *cluster_id && f.backend_id == b.backend_id
                            })
                    })
                    .map(|b| {
                        let b = b.borrow();
                        (b.backend_id.to_owned(), b.address)
                    })
                    .collect();

                if !backends_to_check.is_empty() {
                    let h2c = backend_map
                        .cluster_http2
                        .get(cluster_id)
                        .copied()
                        .unwrap_or(false);
                    to_check.push((
                        cluster_id.to_owned(),
                        config.to_owned(),
                        h2c,
                        backends_to_check,
                    ));
                }
            }
        }

        drop(backend_map);

        for (cluster_id, config, h2c, backends_to_check) in to_check {
            self.last_check_time.insert(cluster_id.to_owned(), now);

            // The URI was validated at the worker `SetHealthCheck`
            // boundary by `sozu_command::config::validate_health_check_config`
            // (CR/LF/NUL/C0 rejected, leading `/` enforced). The probe
            // runtime trusts that contract — no second silent strip
            // here, no defense-in-depth divergence between what the
            // operator typed and what hits the wire.
            let probe_uri = config.uri.as_str();

            for (backend_id, address) in backends_to_check {
                match TcpStream::connect(address) {
                    Ok(mut stream) => {
                        let Some(token) = self.allocate_token() else {
                            // Token table exhausted — record the probe as failed
                            // so threshold logic can still fire, then move on.
                            // The allocator already logged at error level.
                            Self::record_check_result(
                                backends,
                                &cluster_id,
                                &backend_id,
                                address,
                                false,
                                &config,
                            );
                            continue;
                        };
                        if let Err(e) = registry.register(
                            &mut stream,
                            token,
                            Interest::READABLE | Interest::WRITABLE,
                        ) {
                            debug!(
                                "{} failed to register socket for {} ({}) in cluster {}: {}",
                                log_context!(),
                                backend_id,
                                address,
                                cluster_id,
                                e
                            );
                            Self::record_check_result(
                                backends,
                                &cluster_id,
                                &backend_id,
                                address,
                                false,
                                &config,
                            );
                            continue;
                        }
                        trace!(
                            "{} initiated connection to {} ({}) for cluster {}",
                            log_context!(),
                            backend_id,
                            address,
                            cluster_id
                        );
                        let request_bytes = if h2c {
                            build_h2c_probe_bytes(probe_uri, address)
                        } else {
                            // RFC 9110 §7.2: `Host` MUST carry the
                            // authority component, including the port
                            // when it differs from the URI scheme's
                            // default. SocketAddr's Display impl emits
                            // `ip:port` (with brackets for IPv6), which
                            // is unambiguous against any non-default
                            // backend port. Backends that demand a
                            // specific virtual-host name should expose
                            // a non-vhost health endpoint (the same
                            // pattern nginx/apache document) — adding
                            // a per-cluster `host` field on
                            // `HealthCheckConfig` is tracked as a
                            // follow-up.
                            format!(
                                "GET {probe_uri} HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                            )
                            .into_bytes()
                        };
                        self.in_flight.push(InFlightCheck {
                            stream,
                            token,
                            cluster_id: cluster_id.to_owned(),
                            backend_id,
                            address,
                            started_at: now,
                            timeout: Duration::from_secs(u64::from(config.timeout)),
                            request_bytes: Some(request_bytes),
                            write_offset: 0,
                            response_buf: Vec::with_capacity(256),
                            config: config.to_owned(),
                            h2c,
                        });
                    }
                    Err(e) => {
                        debug!(
                            "{} failed to connect to {} ({}) for cluster {}: {}",
                            log_context!(),
                            backend_id,
                            address,
                            cluster_id,
                            e
                        );
                        Self::record_check_result(
                            backends,
                            &cluster_id,
                            &backend_id,
                            address,
                            false,
                            &config,
                        );
                    }
                }
            }
        }
    }

    fn progress_checks(&mut self, backends: &Rc<RefCell<BackendMap>>, registry: &Registry) {
        const MAX_HEALTH_RESPONSE_SIZE: usize = 4096;

        let now = Instant::now();
        let mut completed = Vec::new();
        let ready = std::mem::take(&mut self.ready_tokens);
        // Post-condition of the wholesale take: the live readiness set is now
        // empty, so a token re-armed during this pass is handled next loop.
        debug_assert!(
            self.ready_tokens.is_empty(),
            "readiness set must be drained before processing in-flight checks"
        );
        let in_flight_before = self.in_flight.len();

        for (idx, check) in self.in_flight.iter_mut().enumerate() {
            // Index must address a live in-flight slot — the basis for the
            // descending-sort swap_remove below.
            debug_assert!(
                idx < in_flight_before,
                "in-flight index ({idx}) must be within the live slot range ({in_flight_before})"
            );
            // The write cursor never runs past the request it is draining.
            debug_assert!(
                check
                    .request_bytes
                    .as_ref()
                    .is_none_or(|r| check.write_offset <= r.len()),
                "write_offset must never exceed the request length"
            );

            // Always check timeouts regardless of readiness
            if now.duration_since(check.started_at) > check.timeout {
                debug!(
                    "{} timeout for {} ({}) in cluster {}",
                    log_context!(),
                    check.backend_id,
                    check.address,
                    check.cluster_id
                );
                completed.push((idx, false));
                continue;
            }

            // Skip I/O if the socket has not been reported ready by mio
            if !ready.contains(&check.token) {
                continue;
            }

            if let Some(ref request_bytes) = check.request_bytes {
                match check.stream.write(&request_bytes[check.write_offset..]) {
                    Ok(n) => {
                        check.write_offset += n;
                        if check.write_offset >= request_bytes.len() {
                            check.request_bytes = None;
                        } else {
                            continue;
                        }
                    }
                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                        continue;
                    }
                    Err(_e) => {
                        completed.push((idx, false));
                        continue;
                    }
                }
            }

            let mut buf = [0u8; 256];
            match check.stream.read(&mut buf) {
                Ok(0) => {
                    let success =
                        parse_probe_response(&check.response_buf, &check.config, check.h2c)
                            .unwrap_or(false);
                    completed.push((idx, success));
                }
                Ok(n) => {
                    // A successful read never reports more bytes than the
                    // fixed-size stack buffer can hold.
                    debug_assert!(
                        n <= buf.len(),
                        "read reported {n} bytes into a {}-byte buffer",
                        buf.len()
                    );
                    if check.response_buf.len() + n > MAX_HEALTH_RESPONSE_SIZE {
                        completed.push((idx, false));
                        continue;
                    }
                    check.response_buf.extend_from_slice(&buf[..n]);
                    // The accumulator stays within the documented ceiling once
                    // the over-limit guard above has run.
                    debug_assert!(
                        check.response_buf.len() <= MAX_HEALTH_RESPONSE_SIZE,
                        "response buffer must stay within the max health response size"
                    );
                    if let Some(success) =
                        parse_probe_response(&check.response_buf, &check.config, check.h2c)
                    {
                        completed.push((idx, success));
                    }
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(_e) => {
                    completed.push((idx, false));
                }
            }
        }

        // Sort indices in descending order so swap_remove doesn't invalidate
        // later indices — it moves the last element to the removed position,
        // which has already been processed or is beyond our range.
        completed.sort_by(|a, b| b.0.cmp(&a.0));
        // We never schedule the same slot for removal twice, and never more
        // removals than there are in-flight checks.
        debug_assert!(
            completed.len() <= in_flight_before,
            "cannot complete more checks ({}) than were in flight ({in_flight_before})",
            completed.len()
        );
        debug_assert!(
            completed.windows(2).all(|w| w[0].0 > w[1].0),
            "completed indices must be strictly descending and unique for swap_remove safety"
        );
        for (idx, success) in completed {
            let len_before = self.in_flight.len();
            let mut check = self.in_flight.swap_remove(idx);
            // swap_remove drops exactly one in-flight slot.
            debug_assert_eq!(
                self.in_flight.len(),
                len_before - 1,
                "swap_remove must drop exactly one in-flight check"
            );
            let _ = registry.deregister(&mut check.stream);
            Self::record_check_result(
                backends,
                &check.cluster_id,
                &check.backend_id,
                check.address,
                success,
                &check.config,
            );
        }
    }

    fn record_check_result(
        backends: &Rc<RefCell<BackendMap>>,
        cluster_id: &str,
        backend_id: &str,
        address: SocketAddr,
        success: bool,
        config: &HealthCheckConfig,
    ) {
        let mut backend_map = backends.borrow_mut();
        let Some(backend_list) = backend_map.backends.get_mut(cluster_id) else {
            return;
        };

        let Some(backend_ref) = backend_list.find_backend(&address) else {
            return;
        };

        let mut backend = backend_ref.borrow_mut();

        if success {
            // Snapshot the hysteresis status before the counter mutation so we
            // can assert the flip happens exactly at the threshold. Read only
            // inside debug_assert! → dead in release, but must compile.
            let was_healthy = backend.health.is_healthy();
            let transitioned = backend.health.record_success(config.healthy_threshold);
            // A success resets the failure counter and bumps the success
            // counter; the rise counter must stay within [0, threshold] at the
            // transition boundary. `transitioned` is true iff we crossed from
            // unhealthy to healthy at exactly the configured threshold.
            debug_assert!(
                backend.health.consecutive_failures == 0,
                "a recorded success must zero the consecutive-failure counter"
            );
            debug_assert_eq!(
                transitioned,
                !was_healthy && backend.health.is_healthy(),
                "transition flag must be set iff the backend just flipped to healthy"
            );
            debug_assert!(
                !transitioned || backend.health.consecutive_successes >= config.healthy_threshold,
                "an UP transition only fires once the rise counter reaches the healthy threshold"
            );
            debug_assert!(
                !transitioned || backend.health.is_healthy(),
                "after an UP transition the backend must report healthy"
            );
            if transitioned {
                info!(
                    "{} backend {} at {} marked UP (health check passed {} consecutive times) for cluster {}",
                    log_context!(),
                    backend_id,
                    address,
                    config.healthy_threshold,
                    cluster_id
                );
                incr!(names::health_check::UP);
                gauge!(
                    names::backend::AVAILABLE,
                    1,
                    Some(cluster_id),
                    Some(backend_id)
                );
                push_event(Event {
                    kind: EventKind::HealthCheckHealthy as i32,
                    cluster_id: Some(cluster_id.to_owned()),
                    backend_id: Some(backend_id.to_owned()),
                    address: Some(address.into()),
                    metric_detail: None,
                });
            }
            count!(names::health_check::SUCCESS, 1);
        } else {
            let was_healthy = backend.health.is_healthy();
            let transitioned = backend.health.record_failure(config.unhealthy_threshold);
            // A failure resets the success counter and bumps the failure
            // counter; the fall counter must stay within [0, threshold] at the
            // transition boundary. `transitioned` is true iff we crossed from
            // healthy to unhealthy at exactly the configured threshold.
            debug_assert!(
                backend.health.consecutive_successes == 0,
                "a recorded failure must zero the consecutive-success counter"
            );
            debug_assert_eq!(
                transitioned,
                was_healthy && !backend.health.is_healthy(),
                "transition flag must be set iff the backend just flipped to unhealthy"
            );
            debug_assert!(
                !transitioned || backend.health.consecutive_failures >= config.unhealthy_threshold,
                "a DOWN transition only fires once the fall counter reaches the unhealthy threshold"
            );
            debug_assert!(
                !transitioned || !backend.health.is_healthy(),
                "after a DOWN transition the backend must report unhealthy"
            );
            if transitioned {
                warn!(
                    "{} backend {} at {} marked DOWN (health check failed {} consecutive times) for cluster {}",
                    log_context!(),
                    backend_id,
                    address,
                    config.unhealthy_threshold,
                    cluster_id
                );
                incr!(names::health_check::DOWN);
                gauge!(
                    names::backend::AVAILABLE,
                    0,
                    Some(cluster_id),
                    Some(backend_id)
                );
                push_event(Event {
                    kind: EventKind::HealthCheckUnhealthy as i32,
                    cluster_id: Some(cluster_id.to_owned()),
                    backend_id: Some(backend_id.to_owned()),
                    address: Some(address.into()),
                    metric_detail: None,
                });
            }
            count!(names::health_check::FAILURE, 1);
        }

        // Emit the healthy-backend gauge on every result update for clusters
        // with at least one configured backend, including `healthy == 0`. The
        // gauge is the only documented signal that lets dashboards detect
        // universal-outage / fail-open. Previously it was gated on
        // `healthy > 0 && healthy * 2 <= total`, so when all backends went
        // unhealthy the gauge retained its last non-zero value and operators
        // could not see fail-open in dashboards.
        //
        // The "critically low" warning (≤50% healthy) keeps its original
        // condition — only the gauge emission is unconditional.
        drop(backend);
        let total = backend_list.backends.len();
        let healthy = backend_list
            .backends
            .iter()
            .filter(|b| b.borrow().health.is_healthy())
            .count();
        // Gauge pairing: the healthy count is a subset of the total, so the
        // emitted gauge can never exceed the cluster size (it is a usize, but
        // a gauge above `total` would be an accounting bug, not rounding).
        debug_assert!(
            healthy <= total,
            "healthy backend count ({healthy}) must not exceed total ({total})"
        );
        if total > 0 {
            gauge!(
                "health_check.healthy_backends",
                healthy,
                Some(cluster_id),
                None
            );
            if healthy > 0 && healthy * 2 <= total {
                warn!(
                    "{} cluster {} has only {}/{} healthy backends",
                    log_context!(),
                    cluster_id,
                    healthy,
                    total
                );
            }
        }
        // Re-evaluate per-cluster availability so an `Available -> AllDown`
        // or `AllDown -> Available` transition driven purely by health-check
        // results (no failed routing call to observe it) still emits the
        // log + counter + Event. This is the only path that catches
        // passive-only recoveries — a cluster whose backends came back via
        // successful HC probes but whose retry policies haven't been
        // touched by any session.
        // `backend_list` is a `&BackendList` reborrowed from
        // `backend_map.backends.get(cluster_id)` above. The `&backend_map`
        // borrow it depends on lasts until the next use of `backend_map`,
        // so we just stop using `backend_list` and call the helper, which
        // re-takes its own `&self` borrow internally.
        backend_map.record_cluster_availability(cluster_id);
    }

    /// Forget every piece of health-check bookkeeping held for `cluster_id`:
    /// the probes still in flight against it and its last-check timestamp.
    pub fn remove_cluster(&mut self, cluster_id: &str) {
        // The two collections are independent; drop pending probes first,
        // then the scheduling timestamp.
        self.in_flight.retain(|probe| probe.cluster_id != cluster_id);
        self.last_check_time.remove(cluster_id);

        // Post-conditions: nothing tied to the removed cluster survives.
        debug_assert!(
            !self.in_flight.iter().any(|probe| probe.cluster_id == cluster_id),
            "remove_cluster must drop every in-flight check for the cluster"
        );
        debug_assert!(
            !self.last_check_time.contains_key(cluster_id),
            "remove_cluster must forget the cluster's last-check timestamp"
        );
    }
}

/// Top-level dispatch: pick the HTTP/1.1 status-line parser or the
/// HTTP/2 frame walker depending on whether the probe was sent as h2c.
/// `h2c` is captured from `BackendMap.cluster_http2` on the
/// `InFlightCheck` at probe-creation time.
fn parse_probe_response(buf: &[u8], config: &HealthCheckConfig, h2c: bool) -> Option<bool> {
    if h2c {
        try_parse_h2c_status(buf, config)
    } else {
        try_parse_status_line(buf, config)
    }
}

/// Parse the HTTP/1.x status line out of the bytes read so far and decide
/// whether the probe succeeded.
///
/// Returns:
///
/// * `None` — no CRLF has arrived yet, so the status line is still
///   incomplete; the caller should keep reading.
/// * `Some(healthy)` — the status line is complete. Its status code (or `0`
///   when the line is malformed or the code does not parse) is checked with
///   `is_status_healthy` against `config.expected_status`.
///
/// Only the status line itself is UTF-8 validated. Validating the whole
/// buffer would turn any non-UTF-8 byte in the headers or body (or a read
/// that stopped in the middle of a multi-byte character) into "keep
/// reading", so a healthy backend could only ever time out.
fn try_parse_status_line(buf: &[u8], config: &HealthCheckConfig) -> Option<bool> {
    let first_line_end = buf.windows(2).position(|pair| pair == b"\r\n")?;
    let status_line_bytes = &buf[..first_line_end];
    // The status line is the prefix before the first CRLF, so it can never be
    // as long as the buffer it was sliced from.
    debug_assert!(
        status_line_bytes.len() < buf.len(),
        "status line must be a strict prefix ending before the CRLF"
    );

    // From here on the line is complete: anything that fails to parse is a
    // malformed response, not a truncated one, so report it as unhealthy
    // instead of waiting for bytes that will not change the outcome.
    let Ok(status_line) = std::str::from_utf8(status_line_bytes) else {
        return Some(false);
    };
    let status_code = status_line
        .split_once(' ')
        .and_then(|(_, rest)| rest.split(' ').next())
        .and_then(|code| code.parse::<u32>().ok())
        .unwrap_or(0);
    Some(is_status_healthy(status_code, config.expected_status))
}

/// Decide whether an observed HTTP status code satisfies the configured
/// expectation.
///
/// * `expected == 0` means "any 2xx": `actual` must lie in `200..=299`.
/// * Any other `expected` value must be matched exactly.
fn is_status_healthy(actual: u32, expected: u32) -> bool {
    let healthy = match expected {
        0 => (200..=299).contains(&actual),
        required => actual == required,
    };
    // Once a concrete code is configured, health is nothing more than exact
    // equality with that code.
    debug_assert!(
        expected == 0 || healthy == (actual == expected),
        "with a specific expected status, health must be exact equality"
    );
    healthy
}

/// Compose a bare-minimum h2c (HTTP/2 over cleartext, prior-knowledge)
/// probe: the 24-byte connection preface, the client's own empty SETTINGS
/// frame (which must follow the preface, RFC 9113 §3.4), and a `GET <path>`
/// request on stream 1 with END_STREAM set.
///
/// The request header block normally travels in a single HEADERS frame
/// flagged END_STREAM | END_HEADERS. The probe writes before it has seen the
/// peer's SETTINGS, so it must respect the initial SETTINGS_MAX_FRAME_SIZE of
/// 16 384 bytes (RFC 9113 §6.5.2). A larger block (e.g. an unusually long
/// URI) is therefore split across HEADERS + CONTINUATION frames, and only
/// the last frame carries END_HEADERS. Writing it as one frame would produce
/// a frame the peer must reject. Above 2^24 - 1 bytes, the 24-bit length
/// field would also be silently truncated.
///
/// HPACK encoding is delegated to `loona_hpack::Encoder` — the same
/// encoder the H2 mux uses for live traffic (`lib/src/protocol/mux/converter.rs`).
/// The probe inherits whatever static/dynamic-table behaviour the
/// encoder picks, including any future Huffman support. The connection
/// preface comes from `serializer::H2_PRI` so the probe and the live
/// mux stay in lockstep.
///
/// Returns an empty buffer if HPACK encoding fails, so the probe is recorded
/// as failed via the read path instead of panicking.
fn build_h2c_probe_bytes(uri: &str, address: SocketAddr) -> Vec<u8> {
    // Initial SETTINGS_MAX_FRAME_SIZE every peer accepts before advertising
    // anything larger (RFC 9113 §6.5.2).
    const MAX_PROBE_FRAME_PAYLOAD: usize = 16_384;
    const FRAME_TYPE_HEADERS: u8 = 0x01;
    const FRAME_TYPE_CONTINUATION: u8 = 0x09;
    const HEADERS_FLAG_END_STREAM: u8 = 0x01;
    const HEADERS_FLAG_END_HEADERS: u8 = 0x04;

    let authority = address.to_string();

    // Build the HPACK header block first so we know how to frame it. A fresh
    // encoder per probe keeps things stateless — we never reuse the dynamic
    // table across probes.
    let mut encoder = loona_hpack::Encoder::new();
    let mut hpack: Vec<u8> = Vec::new();
    let headers: [(&[u8], &[u8]); 4] = [
        (b":method", b"GET"),
        (b":scheme", b"http"),
        (b":path", uri.as_bytes()),
        (b":authority", authority.as_bytes()),
    ];
    // `encode_into` writes through `io::Write`. Writing into a `Vec<u8>` does
    // not fail in practice, but stay defensive rather than panic.
    if encoder.encode_into(headers, &mut hpack).is_err() {
        return Vec::new();
    }

    // Cut the block into frame-sized fragments. Always emit at least one
    // HEADERS frame, even for an (in practice impossible) empty block.
    let mut fragments: Vec<&[u8]> = hpack.chunks(MAX_PROBE_FRAME_PAYLOAD).collect();
    if fragments.is_empty() {
        fragments.push(&[]);
    }
    let frame_count = fragments.len();
    // Preface + SETTINGS header + one header per HEADERS/CONTINUATION frame
    // + the HPACK block itself.
    let expected_len = H2_PRI.len() + FRAME_HEADER_SIZE * (1 + frame_count) + hpack.len();

    let mut out = Vec::with_capacity(expected_len);
    out.extend_from_slice(H2_PRI.as_bytes());

    // Empty client SETTINGS frame: length=0, type=Settings(0x04), flags=0, sid=0.
    out.extend_from_slice(&[0, 0, 0, 0x04, 0, 0, 0, 0, 0]);

    for (index, fragment) in fragments.iter().enumerate() {
        let is_first = index == 0;
        let is_last = index + 1 == frame_count;
        let frame_type = if is_first {
            FRAME_TYPE_HEADERS
        } else {
            FRAME_TYPE_CONTINUATION
        };
        // END_STREAM belongs on the HEADERS frame (the request has no body).
        // END_HEADERS marks the final fragment of the header block. A
        // single-frame probe therefore gets 0x05 exactly as before.
        let mut flags = 0u8;
        if is_first {
            flags |= HEADERS_FLAG_END_STREAM;
        }
        if is_last {
            flags |= HEADERS_FLAG_END_HEADERS;
        }
        // Fragments never exceed 16 384 bytes, so the 24-bit length is exact.
        let len = fragment.len();
        out.push(((len >> 16) & 0xFF) as u8);
        out.push(((len >> 8) & 0xFF) as u8);
        out.push((len & 0xFF) as u8);
        out.push(frame_type);
        out.push(flags);
        out.extend_from_slice(&[0, 0, 0, 1]); // stream id = 1
        out.extend_from_slice(fragment);
    }

    // Post-condition: a well-formed probe begins with the H2 connection
    // preface and carries exactly preface + frame headers + the HPACK block.
    debug_assert!(
        out.starts_with(H2_PRI.as_bytes()),
        "an h2c probe must begin with the connection preface"
    );
    debug_assert_eq!(
        out.len(),
        expected_len,
        "probe length must be preface + SETTINGS + header-block frame headers + HPACK block"
    );
    out
}

/// Walk the buffered H2 frames looking for a HEADERS frame (plus any
/// CONTINUATION frames until END_HEADERS) on stream 1, then HPACK-decode
/// the assembled block via `loona_hpack::Decoder` and pull `:status`.
///
/// Returns:
///
/// * `Some(true)` — `:status` decoded and matches
///   `config.expected_status` (or any 2xx when `expected_status == 0`).
/// * `Some(false)` — `:status` decoded but does not match, the HPACK
///   block was malformed, a GOAWAY frame arrived, or a fully-received
///   frame is malformed. Malformed frames include a rejected frame header,
///   a CONTINUATION with no preceding HEADERS, and PADDED/PRIORITY fields
///   that do not fit inside the HEADERS payload.
/// * `None` — buffer truncated mid-frame; caller should keep reading.
///
/// Frames with PADDED / PRIORITY flags are normalised via
/// `strip_padded_priority` using the `mux::parser` flag constants
/// (`FLAG_PADDED` / `FLAG_PRIORITY`). Unknown HPACK encodings, Huffman-coded
/// values, and CONTINUATION fragmentation all fall through to the decoder
/// rather than a hand-rolled byte walk.
fn try_parse_h2c_status(buf: &[u8], config: &HealthCheckConfig) -> Option<bool> {
    // RFC 9113 §4.2: the absolute upper bound for SETTINGS_MAX_FRAME_SIZE
    // is 2^24 - 1. The probe never advertises a custom limit, so accept
    // anything within the spec ceiling.
    const MAX_FRAME_SIZE: u32 = (1 << 24) - 1;

    let mut remaining: &[u8] = buf;
    // Buffer accumulating HEADERS + CONTINUATION block fragments for
    // stream 1 until END_HEADERS lands. RFC 9113 §6.10: until the
    // continuation chain ends, no other frames may arrive on the
    // connection, but we still tolerate interleaved control frames
    // for robustness — the decoder only fires on END_HEADERS.
    let mut headers_block: Option<Vec<u8>> = None;

    while !remaining.is_empty() {
        // The `mux::parser::frame_header` is built from `nom::number::complete`
        // primitives which emit `Err::Error` (not `Err::Incomplete`) on short
        // input, so distinguish "header still arriving" from "header arrived
        // but is malformed" by an explicit length check before parsing.
        if remaining.len() < FRAME_HEADER_SIZE {
            return None;
        }
        let consumable = remaining.len();
        let (rest, header) = match frame_header(remaining, MAX_FRAME_SIZE) {
            Ok(parsed) => parsed,
            // Header bytes are present but parser rejected them: malformed
            // framing (oversized payload, invalid stream-id parity, etc.).
            // → probe is unhealthy.
            Err(_) => return Some(false),
        };
        // Parser post-conditions: it consumes exactly the 9-byte header (never
        // grows its input) and honours the size bound it was handed.
        debug_assert!(
            rest.len() < consumable,
            "frame_header must consume at least the fixed frame header"
        );
        debug_assert_eq!(
            consumable - rest.len(),
            FRAME_HEADER_SIZE,
            "frame_header must consume exactly the fixed-size frame header"
        );
        debug_assert!(
            header.payload_len <= MAX_FRAME_SIZE,
            "frame_header must enforce the max-frame-size bound it was given"
        );

        let payload_len = header.payload_len as usize;
        if rest.len() < payload_len {
            // Body of the frame has not arrived yet.
            return None;
        }
        let (payload, after) = rest.split_at(payload_len);
        // Splitting at the declared payload length must not lose bytes: the
        // payload plus the remainder reconstitute `rest`, and the walk makes
        // forward progress so the loop terminates.
        debug_assert_eq!(
            payload.len(),
            payload_len,
            "payload split must yield exactly the declared payload length"
        );
        debug_assert_eq!(
            payload.len() + after.len(),
            rest.len(),
            "payload + remainder must equal the pre-split buffer"
        );
        debug_assert!(
            after.len() < remaining.len(),
            "each iteration must shrink the remaining buffer to guarantee termination"
        );

        match header.frame_type {
            FrameType::Headers if header.stream_id == 1 => {
                // The whole frame is buffered at this point, so padding or
                // priority fields that do not fit are malformed framing, not
                // a short read. Returning `None` here (as a bare `?` would)
                // would make the caller wait for bytes that can never fix
                // the frame, stalling the probe until its timeout.
                let Some(block) = strip_padded_priority(payload, header.flags) else {
                    return Some(false);
                };
                let mut accumulator = headers_block.take().unwrap_or_default();
                accumulator.extend_from_slice(block);
                if header.flags & FLAG_END_HEADERS != 0 {
                    return Some(decode_status_from_block(&accumulator, config));
                }
                headers_block = Some(accumulator);
            }
            FrameType::Continuation if header.stream_id == 1 => {
                // CONTINUATION carries no padding/priority flags; the
                // payload is a raw block fragment.
                let Some(mut accumulator) = headers_block.take() else {
                    // CONTINUATION without a preceding HEADERS is a
                    // protocol error per RFC 9113 §6.10.
                    return Some(false);
                };
                accumulator.extend_from_slice(payload);
                if header.flags & FLAG_END_HEADERS != 0 {
                    return Some(decode_status_from_block(&accumulator, config));
                }
                headers_block = Some(accumulator);
            }
            FrameType::GoAway => return Some(false),
            // SETTINGS, SETTINGS-ACK, DATA, PING, etc. — keep walking
            // until we find HEADERS on stream 1.
            _ => {}
        }

        remaining = after;
    }
    None
}

/// Strip the optional HEADERS framing extras of RFC 9113 §6.2 and return
/// the bare field-block fragment as a sub-slice of `payload`:
///
/// * `FLAG_PADDED` — a leading 1-byte Pad Length, plus that many padding
///   octets at the end of the payload.
/// * `FLAG_PRIORITY` — a 5-byte stream-dependency + weight prefix that
///   follows the Pad Length (when present).
///
/// Returns `None` when the flags announce padding or priority data that the
/// payload is too short to hold, i.e. the frame is malformed.
fn strip_padded_priority(payload: &[u8], flags: u8) -> Option<&[u8]> {
    let padded = flags & FLAG_PADDED != 0;
    let prioritised = flags & FLAG_PRIORITY != 0;

    // Window after removing the Pad Length prefix and the trailing padding.
    let (start, end) = if padded {
        let (&pad_len, after_prefix) = payload.split_first()?;
        let pad = usize::from(pad_len);
        // The padding octets must fit in what follows the 1-byte prefix.
        if pad > after_prefix.len() {
            return None;
        }
        (1usize, payload.len() - pad)
    } else {
        (0usize, payload.len())
    };

    // Then skip the 5-byte priority block, which must fit before the padding.
    let start = if prioritised {
        let shifted = start.checked_add(5)?;
        if shifted > end {
            return None;
        }
        shifted
    } else {
        start
    };

    // Post-condition: the surviving window `[start, end)` stays inside the
    // original payload, so the result is always a sub-slice of it.
    debug_assert!(
        start <= end && end <= payload.len(),
        "stripped header window [{start}, {end}) must lie within the payload ({})",
        payload.len()
    );
    let block = payload.get(start..end)?;
    debug_assert!(
        block.len() <= payload.len(),
        "stripped block must never be larger than the original payload"
    );
    Some(block)
}

/// HPACK-decode a complete response header block with a fresh
/// `loona_hpack::Decoder` and report whether its `:status` passes the
/// health check.
///
/// Only the first `:status` field whose value is valid UTF-8 and parses as a
/// `u32` is considered. The result is `false` — never a panic — when
/// decoding fails, when no usable `:status` is present, or when the code
/// does not satisfy `config.expected_status` (checked by
/// `is_status_healthy`).
fn decode_status_from_block(block: &[u8], config: &HealthCheckConfig) -> bool {
    let mut decoder = loona_hpack::Decoder::new();
    let mut first_status: Option<u32> = None;
    let decoded = decoder.decode_with_cb(block, |name, value| {
        // Latch the first usable `:status`; later fields never override it.
        if first_status.is_none() && name.as_ref() == b":status" {
            first_status = std::str::from_utf8(value.as_ref())
                .ok()
                .and_then(|text| text.parse::<u32>().ok());
        }
    });
    decoded.is_ok()
        && first_status.is_some_and(|code| is_status_healthy(code, config.expected_status))
}

#[cfg(test)]
mod tests {
    // Unit tests for the probe helpers in this file: status-code matching,
    // HTTP/1.1 status-line parsing, `HealthState` rise/fall transitions,
    // h2c probe construction, and the h2c response-frame walker.
    use super::*;
    use crate::backends::HealthState;

    /// With `expected_status == 0`, exactly the 2xx codes are healthy.
    #[test]
    fn test_is_status_healthy_any_2xx() {
        assert!(is_status_healthy(200, 0));
        assert!(is_status_healthy(204, 0));
        assert!(is_status_healthy(299, 0));
        assert!(!is_status_healthy(301, 0));
        assert!(!is_status_healthy(500, 0));
        assert!(!is_status_healthy(0, 0));
    }

    /// With a concrete `expected_status`, only that exact code is healthy.
    #[test]
    fn test_is_status_healthy_specific() {
        assert!(is_status_healthy(200, 200));
        assert!(!is_status_healthy(204, 200));
        assert!(!is_status_healthy(500, 200));
    }

    /// HTTP/1.1 status lines: 2xx passes, 5xx fails, and a line without its
    /// terminating CRLF yields `None` ("keep reading").
    #[test]
    fn test_try_parse_status_line() {
        let config = HealthCheckConfig {
            uri: "/health".to_owned(),
            interval: 10,
            timeout: 5,
            healthy_threshold: 3,
            unhealthy_threshold: 3,
            expected_status: 0,
        };

        let buf = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
        assert_eq!(try_parse_status_line(buf, &config), Some(true));

        let buf = b"HTTP/1.1 500 Internal Server Error\r\n\r\n";
        assert_eq!(try_parse_status_line(buf, &config), Some(false));

        let buf = b"HTTP/1.1 200";
        assert_eq!(try_parse_status_line(buf, &config), None);
    }

    /// `HealthState` starts healthy, flips DOWN on the third consecutive
    /// failure (threshold 3), and flips back UP on the third consecutive
    /// success. `record_*` returns `true` only on the transitioning call.
    #[test]
    fn test_health_state_transitions() {
        let mut state = HealthState::default();
        assert!(state.is_healthy());

        assert!(!state.record_failure(3));
        assert!(!state.record_failure(3));
        assert!(state.is_healthy());

        assert!(state.record_failure(3));
        assert!(!state.is_healthy());

        assert!(!state.record_success(3));
        assert!(!state.record_success(3));
        assert!(!state.is_healthy());

        assert!(state.record_success(3));
        assert!(state.is_healthy());
    }

    /// Health-check config used by the h2c tests; only `expected_status`
    /// varies between them.
    fn h2c_config(expected: u32) -> HealthCheckConfig {
        HealthCheckConfig {
            uri: "/health".to_owned(),
            interval: 10,
            timeout: 5,
            healthy_threshold: 3,
            unhealthy_threshold: 3,
            expected_status: expected,
        }
    }

    /// Build a raw HTTP/2 frame: 3-byte big-endian payload length (only the
    /// low 24 bits of `payload.len()` are written), 1-byte type, 1-byte
    /// flags, 4-byte big-endian stream id, then `payload` verbatim.
    fn frame_with_header(frame_type: u8, flags: u8, sid: u32, payload: &[u8]) -> Vec<u8> {
        let payload_len = payload.len();
        let mut out = Vec::with_capacity(FRAME_HEADER_SIZE + payload_len);
        out.push(((payload_len >> 16) & 0xFF) as u8);
        out.push(((payload_len >> 8) & 0xFF) as u8);
        out.push((payload_len & 0xFF) as u8);
        out.push(frame_type);
        out.push(flags);
        out.extend_from_slice(&sid.to_be_bytes());
        out.extend_from_slice(payload);
        out
    }

    /// Encode `:status <code>` (plus optional extra response headers) into
    /// a fresh HPACK block via the same encoder the live mux uses. This
    /// keeps the tests aligned with whatever loona_hpack picks for static
    /// vs. literal vs. (future) Huffman encoding instead of pinning bytes.
    fn encode_response_headers(headers: &[(&[u8], &[u8])]) -> Vec<u8> {
        let mut encoder = loona_hpack::Encoder::new();
        let mut out = Vec::new();
        encoder
            .encode_into(headers.iter().copied(), &mut out)
            .unwrap();
        out
    }

    /// The probe bytes are: connection preface, empty SETTINGS frame, then
    /// a HEADERS frame on stream 1 whose HPACK block decodes back to the
    /// four request pseudo-headers.
    #[test]
    fn build_h2c_probe_starts_with_preface_and_frames() {
        let bytes = build_h2c_probe_bytes("/health", "127.0.0.1:8080".parse().unwrap());

        // Connection preface (24 bytes).
        assert!(bytes.starts_with(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"));

        // SETTINGS frame after the preface: empty payload, type=0x04, flags=0, sid=0.
        let settings_start = 24;
        assert_eq!(&bytes[settings_start..settings_start + 3], &[0u8, 0, 0]); // length = 0
        assert_eq!(bytes[settings_start + 3], 0x04); // SETTINGS
        assert_eq!(bytes[settings_start + 4], 0); // flags
        assert_eq!(
            &bytes[settings_start + 5..settings_start + 9],
            &[0u8, 0, 0, 0]
        );

        // HEADERS frame on stream 1, END_STREAM | END_HEADERS = 0x05.
        let headers_start = settings_start + 9;
        assert_eq!(bytes[headers_start + 3], 0x01); // HEADERS
        assert_eq!(bytes[headers_start + 4], 0x05);
        assert_eq!(
            &bytes[headers_start + 5..headers_start + 9],
            &[0u8, 0, 0, 1]
        );

        // The HEADERS payload must HPACK-decode back to the four pseudo-headers.
        let payload_start = headers_start + 9;
        let mut decoder = loona_hpack::Decoder::new();
        let mut method = None;
        let mut scheme = None;
        let mut path = None;
        let mut authority = None;
        decoder
            .decode_with_cb(&bytes[payload_start..], |name, value| match name.as_ref() {
                b":method" => method = Some(value.to_vec()),
                b":scheme" => scheme = Some(value.to_vec()),
                b":path" => path = Some(value.to_vec()),
                b":authority" => authority = Some(value.to_vec()),
                _ => {}
            })
            .expect("loona_hpack decodes a freshly-encoded probe");
        assert_eq!(method.as_deref(), Some(b"GET" as &[u8]));
        assert_eq!(scheme.as_deref(), Some(b"http" as &[u8]));
        assert_eq!(path.as_deref(), Some(b"/health" as &[u8]));
        assert_eq!(authority.as_deref(), Some(b"127.0.0.1:8080" as &[u8]));
    }

    /// A single HEADERS frame carrying `:status 200` is healthy under the
    /// default any-2xx rule.
    #[test]
    fn h2c_response_with_status_200_decodes_healthy() {
        let block = encode_response_headers(&[(b":status", b"200")]);
        let buf = frame_with_header(0x01, FLAG_END_HEADERS, 1, &block);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
    }

    /// `:status 500` fails the default any-2xx rule.
    #[test]
    fn h2c_response_with_status_500_fails_default_2xx_check() {
        let block = encode_response_headers(&[(b":status", b"500")]);
        let buf = frame_with_header(0x01, FLAG_END_HEADERS, 1, &block);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(false));
    }

    /// A non-2xx status is healthy when it is exactly the configured
    /// `expected_status`; extra response headers are ignored.
    #[test]
    fn h2c_response_with_status_503_matches_expected_503() {
        let block =
            encode_response_headers(&[(b":status", b"503"), (b"content-type", b"text/plain")]);
        let buf = frame_with_header(0x01, FLAG_END_HEADERS, 1, &block);
        let cfg = h2c_config(503);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
    }

    #[test]
    fn h2c_response_with_continuation_decodes_status_200_healthy() {
        // Build one HPACK block, then split it across HEADERS (no
        // END_HEADERS) + CONTINUATION (END_HEADERS). The parser must
        // reassemble the fragments before decoding; an earlier hand-rolled
        // walker refused to do so and reported such probes as unhealthy.
        let block = encode_response_headers(&[
            (b":status", b"200"),
            (b"x-trace-id", b"abc-123"),
            (b"server", b"sozu-test"),
        ]);
        assert!(block.len() >= 4, "HPACK block needs to be splittable");
        let split = block.len() / 2;
        let (head, tail) = block.split_at(split);

        // HEADERS with no END_HEADERS (flags = 0) — block continues.
        let mut buf = frame_with_header(0x01, 0, 1, head);
        // CONTINUATION (frame type 0x09) carrying the rest, END_HEADERS set.
        buf.extend_from_slice(&frame_with_header(0x09, FLAG_END_HEADERS, 1, tail));

        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
    }

    #[test]
    fn h2c_response_with_padded_priority_headers_decodes_status_200() {
        // Build a HEADERS frame with both PADDED and PRIORITY flags so the
        // parser must strip the 1-byte pad length and 5-byte priority prefix
        // (6 bytes) plus the 3 trailing padding bytes before handing the
        // block to loona_hpack.
        let block = encode_response_headers(&[(b":status", b"200")]);
        let pad_len: u8 = 3;

        let mut payload = Vec::new();
        payload.push(pad_len); // PADDED: 1-byte pad length
        payload.extend_from_slice(&[0u8, 0, 0, 0, 16]); // PRIORITY: stream-dep + weight
        payload.extend_from_slice(&block);
        payload.extend_from_slice(&[0u8; 3]); // padding bytes

        let flags = FLAG_PADDED | FLAG_PRIORITY | FLAG_END_HEADERS;
        let buf = frame_with_header(0x01, flags, 1, &payload);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
    }

    #[test]
    fn h2c_response_after_unrelated_settings_frame_decodes_healthy() {
        // The probe parser must skip over the server's SETTINGS frame
        // and any other connection-scoped frames before locating the
        // HEADERS on stream 1.
        let mut buf = frame_with_header(0x04, 0, 0, &[]); // SETTINGS, empty
        buf.extend_from_slice(&frame_with_header(0x04, 0x01, 0, &[])); // SETTINGS-ACK
        let block = encode_response_headers(&[(b":status", b"200")]);
        buf.extend_from_slice(&frame_with_header(0x01, FLAG_END_HEADERS, 1, &block));

        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
    }

    /// A GOAWAY frame ends the walk with an unhealthy verdict.
    #[test]
    fn h2c_goaway_returns_unhealthy() {
        // GOAWAY: 8-byte payload (last-stream-id + error code).
        let buf = frame_with_header(0x07, 0, 0, &[0u8; 8]);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(false));
    }

    /// A frame whose payload has only partially arrived means "keep
    /// reading", not a verdict.
    #[test]
    fn h2c_truncated_frame_returns_none() {
        // Frame header claims a 10-byte payload but only 5 are present.
        let mut buf: Vec<u8> = vec![
            0,                // length high
            0,                // length mid
            10,               // length low = 10
            0x01,             // HEADERS
            FLAG_END_HEADERS, // flags
        ];
        buf.extend_from_slice(&1u32.to_be_bytes()); // stream id = 1
        buf.extend_from_slice(&[0u8; 5]); // partial payload
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), None);
    }

    #[test]
    fn h2c_partial_frame_header_returns_none() {
        // Fewer than 9 bytes: the frame header has not even fully arrived
        // yet. The probe loop should report `None` so the caller keeps
        // reading rather than recording the backend as unhealthy.
        let cfg = h2c_config(0);
        for partial_len in 0usize..FRAME_HEADER_SIZE {
            let buf = vec![0u8; partial_len];
            assert_eq!(
                try_parse_h2c_status(&buf, &cfg),
                None,
                "partial buffer of {partial_len} byte(s) should be 'keep reading'"
            );
        }
    }

    #[test]
    fn h2c_continuation_without_preceding_headers_returns_unhealthy() {
        // CONTINUATION without a HEADERS predecessor is a protocol
        // error per RFC 9113 §6.10.
        let block = encode_response_headers(&[(b":status", b"200")]);
        let buf = frame_with_header(0x09, FLAG_END_HEADERS, 1, &block);
        let cfg = h2c_config(0);
        assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(false));
    }
}