1use std::{
10 cell::RefCell,
11 collections::{HashMap, HashSet},
12 hash::{Hash, Hasher},
13 io::{Read, Write},
14 net::SocketAddr,
15 rc::Rc,
16 time::{Duration, Instant},
17};
18
19use mio::{Interest, Registry, Token, net::TcpStream};
20use sozu_command::{
21 proto::command::{Event, EventKind, HealthCheckConfig},
22 state::ClusterId,
23};
24
25use crate::metrics::names;
26use crate::{
27 backends::BackendMap,
28 protocol::mux::{
29 parser::{
30 FLAG_END_HEADERS, FLAG_PADDED, FLAG_PRIORITY, FRAME_HEADER_SIZE, FrameType,
31 frame_header,
32 },
33 serializer::H2_PRI,
34 },
35 server::push_event,
36};
37
38macro_rules! log_context {
45 () => {
46 "HEALTH-CHECK"
47 };
48 ($cluster:expr) => {
49 concat!("HEALTH-CHECK cluster=", $cluster)
50 };
51}
52
53const HEALTH_CHECK_TOKEN_BASE: usize = 1 << 24;
60const HEALTH_CHECK_TOKEN_CAPACITY: usize = 1 << 16;
65
66type PendingChecks = Vec<(
71 ClusterId,
72 HealthCheckConfig,
73 bool,
74 Vec<(String, SocketAddr)>,
75)>;
76
77#[derive(Debug)]
79struct InFlightCheck {
80 stream: TcpStream,
81 token: Token,
82 cluster_id: ClusterId,
83 backend_id: String,
84 address: SocketAddr,
85 started_at: Instant,
86 timeout: Duration,
87 request_bytes: Option<Vec<u8>>,
88 write_offset: usize,
89 response_buf: Vec<u8>,
90 config: HealthCheckConfig,
91 h2c: bool,
97}
98
99#[derive(Debug)]
101pub struct HealthChecker {
102 in_flight: Vec<InFlightCheck>,
103 last_check_time: HashMap<ClusterId, Instant>,
104 next_token_id: usize,
105 ready_tokens: HashSet<Token>,
106}
107
108impl Default for HealthChecker {
109 fn default() -> Self {
110 Self::new()
111 }
112}
113
114impl HealthChecker {
115 pub fn new() -> Self {
116 HealthChecker {
117 in_flight: Vec::new(),
118 last_check_time: HashMap::new(),
119 next_token_id: 0,
120 ready_tokens: HashSet::new(),
121 }
122 }
123
124 fn allocate_token(&mut self) -> Option<Token> {
130 let in_flight: HashSet<usize> = self
131 .in_flight
132 .iter()
133 .map(|c| c.token.0 - HEALTH_CHECK_TOKEN_BASE)
134 .collect();
135
136 for _ in 0..HEALTH_CHECK_TOKEN_CAPACITY {
137 let offset = self.next_token_id % HEALTH_CHECK_TOKEN_CAPACITY;
138 self.next_token_id = self.next_token_id.wrapping_add(1);
139 if !in_flight.contains(&offset) {
140 return Some(Token(HEALTH_CHECK_TOKEN_BASE + offset));
141 }
142 }
143 error!(
144 "{} token-table full ({} in-flight checks); refusing to allocate a new probe slot",
145 log_context!(),
146 in_flight.len()
147 );
148 None
149 }
150
151 pub fn owns_token(&self, token: Token) -> bool {
157 token.0 >= HEALTH_CHECK_TOKEN_BASE
158 && token.0 < HEALTH_CHECK_TOKEN_BASE + HEALTH_CHECK_TOKEN_CAPACITY
159 }
160
161 pub fn ready(&mut self, token: Token) {
163 self.ready_tokens.insert(token);
164 }
165
166 pub fn poll(&mut self, backends: &Rc<RefCell<BackendMap>>, registry: &Registry) {
169 if self.in_flight.is_empty() && backends.borrow().health_check_configs.is_empty() {
170 return;
171 }
172 self.initiate_checks(backends, registry);
173 self.progress_checks(backends, registry);
174 }
175
176 fn initiate_checks(&mut self, backends: &Rc<RefCell<BackendMap>>, registry: &Registry) {
177 let backend_map = backends.borrow();
178 let now = Instant::now();
179
180 let mut to_check: PendingChecks = Vec::new();
181
182 for (cluster_id, config) in &backend_map.health_check_configs {
183 let interval = Duration::from_secs(u64::from(config.interval));
184
185 let mut hasher = std::collections::hash_map::DefaultHasher::new();
187 cluster_id.hash(&mut hasher);
188 let jitter_ms = hasher.finish() % (interval.as_millis() as u64 / 5).max(1);
189 let jittered_interval = interval + Duration::from_millis(jitter_ms);
190
191 let should_check = match self.last_check_time.get(cluster_id) {
192 Some(last) => now.duration_since(*last) >= jittered_interval,
193 None => true,
194 };
195
196 if !should_check {
197 continue;
198 }
199
200 if let Some(backend_list) = backend_map.backends.get(cluster_id) {
201 let backends_to_check: Vec<(String, SocketAddr)> = backend_list
202 .backends
203 .iter()
204 .filter(|b| {
205 let b = b.borrow();
206 b.status == crate::backends::BackendStatus::Normal
207 && !self.in_flight.iter().any(|f| {
208 f.cluster_id == *cluster_id && f.backend_id == b.backend_id
209 })
210 })
211 .map(|b| {
212 let b = b.borrow();
213 (b.backend_id.to_owned(), b.address)
214 })
215 .collect();
216
217 if !backends_to_check.is_empty() {
218 let h2c = backend_map
219 .cluster_http2
220 .get(cluster_id)
221 .copied()
222 .unwrap_or(false);
223 to_check.push((
224 cluster_id.to_owned(),
225 config.to_owned(),
226 h2c,
227 backends_to_check,
228 ));
229 }
230 }
231 }
232
233 drop(backend_map);
234
235 for (cluster_id, config, h2c, backends_to_check) in to_check {
236 self.last_check_time.insert(cluster_id.to_owned(), now);
237
238 let probe_uri = config.uri.as_str();
245
246 for (backend_id, address) in backends_to_check {
247 match TcpStream::connect(address) {
248 Ok(mut stream) => {
249 let Some(token) = self.allocate_token() else {
250 Self::record_check_result(
254 backends,
255 &cluster_id,
256 &backend_id,
257 address,
258 false,
259 &config,
260 );
261 continue;
262 };
263 if let Err(e) = registry.register(
264 &mut stream,
265 token,
266 Interest::READABLE | Interest::WRITABLE,
267 ) {
268 debug!(
269 "{} failed to register socket for {} ({}) in cluster {}: {}",
270 log_context!(),
271 backend_id,
272 address,
273 cluster_id,
274 e
275 );
276 Self::record_check_result(
277 backends,
278 &cluster_id,
279 &backend_id,
280 address,
281 false,
282 &config,
283 );
284 continue;
285 }
286 trace!(
287 "{} initiated connection to {} ({}) for cluster {}",
288 log_context!(),
289 backend_id,
290 address,
291 cluster_id
292 );
293 let request_bytes = if h2c {
294 build_h2c_probe_bytes(probe_uri, address)
295 } else {
296 format!(
310 "GET {probe_uri} HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
311 )
312 .into_bytes()
313 };
314 self.in_flight.push(InFlightCheck {
315 stream,
316 token,
317 cluster_id: cluster_id.to_owned(),
318 backend_id,
319 address,
320 started_at: now,
321 timeout: Duration::from_secs(u64::from(config.timeout)),
322 request_bytes: Some(request_bytes),
323 write_offset: 0,
324 response_buf: Vec::with_capacity(256),
325 config: config.to_owned(),
326 h2c,
327 });
328 }
329 Err(e) => {
330 debug!(
331 "{} failed to connect to {} ({}) for cluster {}: {}",
332 log_context!(),
333 backend_id,
334 address,
335 cluster_id,
336 e
337 );
338 Self::record_check_result(
339 backends,
340 &cluster_id,
341 &backend_id,
342 address,
343 false,
344 &config,
345 );
346 }
347 }
348 }
349 }
350 }
351
352 fn progress_checks(&mut self, backends: &Rc<RefCell<BackendMap>>, registry: &Registry) {
353 const MAX_HEALTH_RESPONSE_SIZE: usize = 4096;
354
355 let now = Instant::now();
356 let mut completed = Vec::new();
357 let ready = std::mem::take(&mut self.ready_tokens);
358
359 for (idx, check) in self.in_flight.iter_mut().enumerate() {
360 if now.duration_since(check.started_at) > check.timeout {
362 debug!(
363 "{} timeout for {} ({}) in cluster {}",
364 log_context!(),
365 check.backend_id,
366 check.address,
367 check.cluster_id
368 );
369 completed.push((idx, false));
370 continue;
371 }
372
373 if !ready.contains(&check.token) {
375 continue;
376 }
377
378 if let Some(ref request_bytes) = check.request_bytes {
379 match check.stream.write(&request_bytes[check.write_offset..]) {
380 Ok(n) => {
381 check.write_offset += n;
382 if check.write_offset >= request_bytes.len() {
383 check.request_bytes = None;
384 } else {
385 continue;
386 }
387 }
388 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
389 continue;
390 }
391 Err(_e) => {
392 completed.push((idx, false));
393 continue;
394 }
395 }
396 }
397
398 let mut buf = [0u8; 256];
399 match check.stream.read(&mut buf) {
400 Ok(0) => {
401 let success =
402 parse_probe_response(&check.response_buf, &check.config, check.h2c)
403 .unwrap_or(false);
404 completed.push((idx, success));
405 }
406 Ok(n) => {
407 if check.response_buf.len() + n > MAX_HEALTH_RESPONSE_SIZE {
408 completed.push((idx, false));
409 continue;
410 }
411 check.response_buf.extend_from_slice(&buf[..n]);
412 if let Some(success) =
413 parse_probe_response(&check.response_buf, &check.config, check.h2c)
414 {
415 completed.push((idx, success));
416 }
417 }
418 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
419 Err(_e) => {
420 completed.push((idx, false));
421 }
422 }
423 }
424
425 completed.sort_by(|a, b| b.0.cmp(&a.0));
429 for (idx, success) in completed {
430 let mut check = self.in_flight.swap_remove(idx);
431 let _ = registry.deregister(&mut check.stream);
432 Self::record_check_result(
433 backends,
434 &check.cluster_id,
435 &check.backend_id,
436 check.address,
437 success,
438 &check.config,
439 );
440 }
441 }
442
443 fn record_check_result(
444 backends: &Rc<RefCell<BackendMap>>,
445 cluster_id: &str,
446 backend_id: &str,
447 address: SocketAddr,
448 success: bool,
449 config: &HealthCheckConfig,
450 ) {
451 let mut backend_map = backends.borrow_mut();
452 let Some(backend_list) = backend_map.backends.get_mut(cluster_id) else {
453 return;
454 };
455
456 let Some(backend_ref) = backend_list.find_backend(&address) else {
457 return;
458 };
459
460 let mut backend = backend_ref.borrow_mut();
461
462 if success {
463 let transitioned = backend.health.record_success(config.healthy_threshold);
464 if transitioned {
465 info!(
466 "{} backend {} at {} marked UP (health check passed {} consecutive times) for cluster {}",
467 log_context!(),
468 backend_id,
469 address,
470 config.healthy_threshold,
471 cluster_id
472 );
473 incr!(names::health_check::UP);
474 gauge!(
475 names::backend::AVAILABLE,
476 1,
477 Some(cluster_id),
478 Some(backend_id)
479 );
480 push_event(Event {
481 kind: EventKind::HealthCheckHealthy as i32,
482 cluster_id: Some(cluster_id.to_owned()),
483 backend_id: Some(backend_id.to_owned()),
484 address: Some(address.into()),
485 metric_detail: None,
486 });
487 }
488 count!(names::health_check::SUCCESS, 1);
489 } else {
490 let transitioned = backend.health.record_failure(config.unhealthy_threshold);
491 if transitioned {
492 warn!(
493 "{} backend {} at {} marked DOWN (health check failed {} consecutive times) for cluster {}",
494 log_context!(),
495 backend_id,
496 address,
497 config.unhealthy_threshold,
498 cluster_id
499 );
500 incr!(names::health_check::DOWN);
501 gauge!(
502 names::backend::AVAILABLE,
503 0,
504 Some(cluster_id),
505 Some(backend_id)
506 );
507 push_event(Event {
508 kind: EventKind::HealthCheckUnhealthy as i32,
509 cluster_id: Some(cluster_id.to_owned()),
510 backend_id: Some(backend_id.to_owned()),
511 address: Some(address.into()),
512 metric_detail: None,
513 });
514 }
515 count!(names::health_check::FAILURE, 1);
516 }
517
518 drop(backend);
529 let total = backend_list.backends.len();
530 let healthy = backend_list
531 .backends
532 .iter()
533 .filter(|b| b.borrow().health.is_healthy())
534 .count();
535 if total > 0 {
536 gauge!(
537 "health_check.healthy_backends",
538 healthy,
539 Some(cluster_id),
540 None
541 );
542 if healthy > 0 && healthy * 2 <= total {
543 warn!(
544 "{} cluster {} has only {}/{} healthy backends",
545 log_context!(),
546 cluster_id,
547 healthy,
548 total
549 );
550 }
551 }
552 backend_map.record_cluster_availability(cluster_id);
565 }
566
567 pub fn remove_cluster(&mut self, cluster_id: &str) {
568 self.last_check_time.remove(cluster_id);
569 self.in_flight
570 .retain(|check| check.cluster_id != cluster_id);
571 }
572}
573
574fn parse_probe_response(buf: &[u8], config: &HealthCheckConfig, h2c: bool) -> Option<bool> {
579 if h2c {
580 try_parse_h2c_status(buf, config)
581 } else {
582 try_parse_status_line(buf, config)
583 }
584}
585
586fn try_parse_status_line(buf: &[u8], config: &HealthCheckConfig) -> Option<bool> {
587 let response = std::str::from_utf8(buf).ok()?;
588 let first_line_end = response.find("\r\n")?;
589 let status_line = &response[..first_line_end];
590
591 let (_, rest) = status_line.split_once(' ')?;
592 let status_str = rest.split(' ').next()?;
593 let status_code: u32 = status_str.parse().unwrap_or(0);
594 Some(is_status_healthy(status_code, config.expected_status))
595}
596
597fn is_status_healthy(actual: u32, expected: u32) -> bool {
598 if expected == 0 {
599 (200..300).contains(&actual)
600 } else {
601 actual == expected
602 }
603}
604
605fn build_h2c_probe_bytes(uri: &str, address: SocketAddr) -> Vec<u8> {
618 let authority = address.to_string();
619
620 let mut encoder = loona_hpack::Encoder::new();
624 let mut hpack: Vec<u8> = Vec::new();
625 let headers: [(&[u8], &[u8]); 4] = [
626 (b":method", b"GET"),
627 (b":scheme", b"http"),
628 (b":path", uri.as_bytes()),
629 (b":authority", authority.as_bytes()),
630 ];
631 if encoder.encode_into(headers, &mut hpack).is_err() {
634 return Vec::new();
637 }
638
639 let mut out = Vec::with_capacity(H2_PRI.len() + FRAME_HEADER_SIZE * 2 + hpack.len());
641 out.extend_from_slice(H2_PRI.as_bytes());
642
643 out.extend_from_slice(&[0, 0, 0, 0x04, 0, 0, 0, 0, 0]);
645
646 let len = hpack.len() as u32;
649 out.push(((len >> 16) & 0xFF) as u8);
650 out.push(((len >> 8) & 0xFF) as u8);
651 out.push((len & 0xFF) as u8);
652 out.push(0x01); out.push(0x05); out.extend_from_slice(&[0, 0, 0, 1]); out.extend_from_slice(&hpack);
656 out
657}
658
659fn try_parse_h2c_status(buf: &[u8], config: &HealthCheckConfig) -> Option<bool> {
677 const MAX_FRAME_SIZE: u32 = (1 << 24) - 1;
681
682 let mut remaining: &[u8] = buf;
683 let mut headers_block: Option<Vec<u8>> = None;
689
690 while !remaining.is_empty() {
691 if remaining.len() < FRAME_HEADER_SIZE {
696 return None;
697 }
698 let (rest, header) = match frame_header(remaining, MAX_FRAME_SIZE) {
699 Ok(parsed) => parsed,
700 Err(_) => return Some(false),
704 };
705
706 let payload_len = header.payload_len as usize;
707 if rest.len() < payload_len {
708 return None;
710 }
711 let (payload, after) = rest.split_at(payload_len);
712
713 match header.frame_type {
714 FrameType::Headers if header.stream_id == 1 => {
715 let block = strip_padded_priority(payload, header.flags)?;
716 let mut accumulator = headers_block.take().unwrap_or_default();
717 accumulator.extend_from_slice(block);
718 if header.flags & FLAG_END_HEADERS != 0 {
719 return Some(decode_status_from_block(&accumulator, config));
720 }
721 headers_block = Some(accumulator);
722 }
723 FrameType::Continuation if header.stream_id == 1 => {
724 let Some(mut accumulator) = headers_block.take() else {
727 return Some(false);
730 };
731 accumulator.extend_from_slice(payload);
732 if header.flags & FLAG_END_HEADERS != 0 {
733 return Some(decode_status_from_block(&accumulator, config));
734 }
735 headers_block = Some(accumulator);
736 }
737 FrameType::GoAway => return Some(false),
738 _ => {}
741 }
742
743 remaining = after;
744 }
745 None
746}
747
748fn strip_padded_priority(payload: &[u8], flags: u8) -> Option<&[u8]> {
753 let mut start = 0usize;
754 let mut end = payload.len();
755
756 if flags & FLAG_PADDED != 0 {
757 let &pad_len = payload.first()?;
758 start = 1;
759 let pad = pad_len as usize;
760 let available = end.checked_sub(start)?;
764 if pad > available {
765 return None;
766 }
767 end -= pad;
768 }
769 if flags & FLAG_PRIORITY != 0 {
770 let new_start = start.checked_add(5)?;
771 if new_start > end {
772 return None;
773 }
774 start = new_start;
775 }
776 payload.get(start..end)
777}
778
779fn decode_status_from_block(block: &[u8], config: &HealthCheckConfig) -> bool {
785 let mut decoder = loona_hpack::Decoder::new();
786 let mut status: Option<u32> = None;
787 let decode_result = decoder.decode_with_cb(block, |name, value| {
788 if status.is_some() {
789 return;
790 }
791 if name.as_ref() == b":status"
792 && let Ok(s) = std::str::from_utf8(value.as_ref())
793 && let Ok(parsed) = s.parse::<u32>()
794 {
795 status = Some(parsed);
796 }
797 });
798 if decode_result.is_err() {
799 return false;
800 }
801 match status {
802 Some(code) => is_status_healthy(code, config.expected_status),
803 None => false,
804 }
805}
806
807#[cfg(test)]
808mod tests {
809 use super::*;
810 use crate::backends::HealthState;
811
812 #[test]
813 fn test_is_status_healthy_any_2xx() {
814 assert!(is_status_healthy(200, 0));
815 assert!(is_status_healthy(204, 0));
816 assert!(is_status_healthy(299, 0));
817 assert!(!is_status_healthy(301, 0));
818 assert!(!is_status_healthy(500, 0));
819 assert!(!is_status_healthy(0, 0));
820 }
821
822 #[test]
823 fn test_is_status_healthy_specific() {
824 assert!(is_status_healthy(200, 200));
825 assert!(!is_status_healthy(204, 200));
826 assert!(!is_status_healthy(500, 200));
827 }
828
829 #[test]
830 fn test_try_parse_status_line() {
831 let config = HealthCheckConfig {
832 uri: "/health".to_owned(),
833 interval: 10,
834 timeout: 5,
835 healthy_threshold: 3,
836 unhealthy_threshold: 3,
837 expected_status: 0,
838 };
839
840 let buf = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
841 assert_eq!(try_parse_status_line(buf, &config), Some(true));
842
843 let buf = b"HTTP/1.1 500 Internal Server Error\r\n\r\n";
844 assert_eq!(try_parse_status_line(buf, &config), Some(false));
845
846 let buf = b"HTTP/1.1 200";
847 assert_eq!(try_parse_status_line(buf, &config), None);
848 }
849
850 #[test]
851 fn test_health_state_transitions() {
852 let mut state = HealthState::default();
853 assert!(state.is_healthy());
854
855 assert!(!state.record_failure(3));
856 assert!(!state.record_failure(3));
857 assert!(state.is_healthy());
858
859 assert!(state.record_failure(3));
860 assert!(!state.is_healthy());
861
862 assert!(!state.record_success(3));
863 assert!(!state.record_success(3));
864 assert!(!state.is_healthy());
865
866 assert!(state.record_success(3));
867 assert!(state.is_healthy());
868 }
869
870 fn h2c_config(expected: u32) -> HealthCheckConfig {
871 HealthCheckConfig {
872 uri: "/health".to_owned(),
873 interval: 10,
874 timeout: 5,
875 healthy_threshold: 3,
876 unhealthy_threshold: 3,
877 expected_status: expected,
878 }
879 }
880
881 fn frame_with_header(frame_type: u8, flags: u8, sid: u32, payload: &[u8]) -> Vec<u8> {
885 let payload_len = payload.len();
886 let mut out = Vec::with_capacity(FRAME_HEADER_SIZE + payload_len);
887 out.push(((payload_len >> 16) & 0xFF) as u8);
888 out.push(((payload_len >> 8) & 0xFF) as u8);
889 out.push((payload_len & 0xFF) as u8);
890 out.push(frame_type);
891 out.push(flags);
892 out.extend_from_slice(&sid.to_be_bytes());
893 out.extend_from_slice(payload);
894 out
895 }
896
897 fn encode_response_headers(headers: &[(&[u8], &[u8])]) -> Vec<u8> {
902 let mut encoder = loona_hpack::Encoder::new();
903 let mut out = Vec::new();
904 encoder
905 .encode_into(headers.iter().copied(), &mut out)
906 .unwrap();
907 out
908 }
909
910 #[test]
911 fn build_h2c_probe_starts_with_preface_and_frames() {
912 let bytes = build_h2c_probe_bytes("/health", "127.0.0.1:8080".parse().unwrap());
913
914 assert!(bytes.starts_with(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"));
916
917 let settings_start = 24;
919 assert_eq!(&bytes[settings_start..settings_start + 3], &[0u8, 0, 0]); assert_eq!(bytes[settings_start + 3], 0x04); assert_eq!(bytes[settings_start + 4], 0); assert_eq!(
923 &bytes[settings_start + 5..settings_start + 9],
924 &[0u8, 0, 0, 0]
925 );
926
927 let headers_start = settings_start + 9;
929 assert_eq!(bytes[headers_start + 3], 0x01); assert_eq!(bytes[headers_start + 4], 0x05);
931 assert_eq!(
932 &bytes[headers_start + 5..headers_start + 9],
933 &[0u8, 0, 0, 1]
934 );
935
936 let payload_start = headers_start + 9;
938 let mut decoder = loona_hpack::Decoder::new();
939 let mut method = None;
940 let mut scheme = None;
941 let mut path = None;
942 let mut authority = None;
943 decoder
944 .decode_with_cb(&bytes[payload_start..], |name, value| match name.as_ref() {
945 b":method" => method = Some(value.to_vec()),
946 b":scheme" => scheme = Some(value.to_vec()),
947 b":path" => path = Some(value.to_vec()),
948 b":authority" => authority = Some(value.to_vec()),
949 _ => {}
950 })
951 .expect("loona_hpack decodes a freshly-encoded probe");
952 assert_eq!(method.as_deref(), Some(b"GET" as &[u8]));
953 assert_eq!(scheme.as_deref(), Some(b"http" as &[u8]));
954 assert_eq!(path.as_deref(), Some(b"/health" as &[u8]));
955 assert_eq!(authority.as_deref(), Some(b"127.0.0.1:8080" as &[u8]));
956 }
957
958 #[test]
959 fn h2c_response_with_status_200_decodes_healthy() {
960 let block = encode_response_headers(&[(b":status", b"200")]);
961 let buf = frame_with_header(0x01, FLAG_END_HEADERS, 1, &block);
962 let cfg = h2c_config(0);
963 assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
964 }
965
966 #[test]
967 fn h2c_response_with_status_500_fails_default_2xx_check() {
968 let block = encode_response_headers(&[(b":status", b"500")]);
969 let buf = frame_with_header(0x01, FLAG_END_HEADERS, 1, &block);
970 let cfg = h2c_config(0);
971 assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(false));
972 }
973
974 #[test]
975 fn h2c_response_with_status_503_matches_expected_503() {
976 let block =
977 encode_response_headers(&[(b":status", b"503"), (b"content-type", b"text/plain")]);
978 let buf = frame_with_header(0x01, FLAG_END_HEADERS, 1, &block);
979 let cfg = h2c_config(503);
980 assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
981 }
982
983 #[test]
984 fn h2c_response_with_continuation_decodes_status_200_healthy() {
985 let block = encode_response_headers(&[
990 (b":status", b"200"),
991 (b"x-trace-id", b"abc-123"),
992 (b"server", b"sozu-test"),
993 ]);
994 assert!(block.len() >= 4, "HPACK block needs to be splittable");
995 let split = block.len() / 2;
996 let (head, tail) = block.split_at(split);
997
998 let mut buf = frame_with_header(0x01, 0, 1, head);
1000 buf.extend_from_slice(&frame_with_header(0x09, FLAG_END_HEADERS, 1, tail));
1002
1003 let cfg = h2c_config(0);
1004 assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
1005 }
1006
1007 #[test]
1008 fn h2c_response_with_padded_priority_headers_decodes_status_200() {
1009 let block = encode_response_headers(&[(b":status", b"200")]);
1013 let pad_len: u8 = 3;
1014
1015 let mut payload = Vec::new();
1016 payload.push(pad_len); payload.extend_from_slice(&[0u8, 0, 0, 0, 16]); payload.extend_from_slice(&block);
1019 payload.extend_from_slice(&[0u8; 3]); let flags = FLAG_PADDED | FLAG_PRIORITY | FLAG_END_HEADERS;
1022 let buf = frame_with_header(0x01, flags, 1, &payload);
1023 let cfg = h2c_config(0);
1024 assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
1025 }
1026
1027 #[test]
1028 fn h2c_response_after_unrelated_settings_frame_decodes_healthy() {
1029 let mut buf = frame_with_header(0x04, 0, 0, &[]); buf.extend_from_slice(&frame_with_header(0x04, 0x01, 0, &[])); let block = encode_response_headers(&[(b":status", b"200")]);
1035 buf.extend_from_slice(&frame_with_header(0x01, FLAG_END_HEADERS, 1, &block));
1036
1037 let cfg = h2c_config(0);
1038 assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(true));
1039 }
1040
1041 #[test]
1042 fn h2c_goaway_returns_unhealthy() {
1043 let buf = frame_with_header(0x07, 0, 0, &[0u8; 8]);
1045 let cfg = h2c_config(0);
1046 assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(false));
1047 }
1048
1049 #[test]
1050 fn h2c_truncated_frame_returns_none() {
1051 let mut buf: Vec<u8> = vec![
1053 0, 0, 10, 0x01, FLAG_END_HEADERS, ];
1059 buf.extend_from_slice(&1u32.to_be_bytes()); buf.extend_from_slice(&[0u8; 5]); let cfg = h2c_config(0);
1062 assert_eq!(try_parse_h2c_status(&buf, &cfg), None);
1063 }
1064
1065 #[test]
1066 fn h2c_partial_frame_header_returns_none() {
1067 let cfg = h2c_config(0);
1071 for partial_len in 0usize..FRAME_HEADER_SIZE {
1072 let buf = vec![0u8; partial_len];
1073 assert_eq!(
1074 try_parse_h2c_status(&buf, &cfg),
1075 None,
1076 "partial buffer of {partial_len} byte(s) should be 'keep reading'"
1077 );
1078 }
1079 }
1080
1081 #[test]
1082 fn h2c_continuation_without_preceding_headers_returns_unhealthy() {
1083 let block = encode_response_headers(&[(b":status", b"200")]);
1086 let buf = frame_with_header(0x09, FLAG_END_HEADERS, 1, &block);
1087 let cfg = h2c_config(0);
1088 assert_eq!(try_parse_h2c_status(&buf, &cfg), Some(false));
1089 }
1090}