1pub mod h2;
82pub mod http;
83pub mod sni;
84pub mod spawn;
85
86use std::net::SocketAddr;
87use std::sync::atomic::{AtomicBool, Ordering};
88use std::sync::Arc;
89use std::time::Duration;
90
91use cellos_core::{CdnProvider, CloudEventV1, ExecutionCellSpec};
92use serde_json::{json, Map, Value};
93use tokio::io::AsyncWriteExt;
94use tokio::net::{TcpListener, TcpStream};
95
/// Size, in bytes, of the buffer used to peek at the first bytes of each
/// accepted connection before a protocol guess is made (16 KiB).
pub const PEEK_BUF_LEN: usize = 16 * 1024;
109
/// Configuration for one proxy instance.
///
/// The value is cloned into every per-connection task, so it should stay
/// cheap to clone.
#[derive(Debug, Clone)]
pub struct SniProxyConfig {
    /// Address the proxy is meant to listen on.
    /// NOTE(review): not read in this part of the file — `run_one_shot`
    /// receives an already-bound listener; confirm where this is consumed.
    pub bind_addr: SocketAddr,
    /// Address every allowed connection is forwarded to.
    pub upstream_addr: SocketAddr,
    /// Patterns handed to `cellos_core::hostname_allowlist::matches_allowlist`
    /// together with the extracted SNI / Host / `:authority` value.
    pub hostname_allowlist: Vec<String>,
    /// NOTE(review): not read in this part of the file; presumably consumed
    /// elsewhere — confirm.
    pub cdn_providers: Vec<CdnProvider>,
    /// Cell identifier; part of the event spec id and of every decision event.
    pub cell_id: String,
    /// Run identifier; part of the event spec id and of every decision event.
    pub run_id: String,
    /// Policy digest recorded in decision events (empty string when `None`).
    pub policy_digest: Option<String>,
    /// Keyset id recorded in decision events (empty string when `None`).
    pub keyset_id: Option<String>,
    /// Issuer key id recorded in decision events (empty string when `None`).
    pub issuer_kid: Option<String>,
    /// Optional correlation id copied into the event spec's `Correlation`.
    pub correlation_id: Option<String>,
    /// NOTE(review): not read in this part of the file; presumably consumed
    /// elsewhere — confirm.
    pub upstream_resolver_id: String,
    /// Maximum time to wait for the first bytes of a connection to arrive
    /// before the connection is denied with a peek-timeout reason.
    pub peek_timeout: Duration,
}
150
/// Sink for the L7 egress decision events produced by the proxy.
///
/// Implementations are shared across connection tasks behind an `Arc`, hence
/// the `Send + Sync + 'static` bounds.
pub trait L7DecisionEmitter: Send + Sync + 'static {
    /// Receives one decision event. Called synchronously from the connection
    /// task that made the decision.
    fn emit(&self, event: CloudEventV1);
}
159
/// Snapshot of the proxy counters returned by `run_one_shot`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ProxyStats {
    /// Connections accepted by the listener loop.
    pub connections_total: u64,
    /// Allow decisions (per connection for TLS/HTTP/1, per stream for h2c).
    pub connections_allowed: u64,
    /// Deny decisions (per connection for TLS/HTTP/1, per stream for h2c).
    pub connections_denied: u64,
    /// Connections whose initial peek did not complete within `peek_timeout`.
    pub peek_timeouts: u64,
    /// Failed attempts to connect to the upstream address.
    pub upstream_failures: u64,
}
171
/// Reason-code strings attached to emitted decision events.
///
/// These values are emitted at runtime and asserted on by tests; treat them
/// as part of the event contract.
mod reason_code {
    // TLS (SNI) decisions.
    pub const SNI_ALLOWLIST_MATCH: &str = "l7_sni_allowlist_match";
    pub const SNI_ALLOWLIST_MISS: &str = "l7_sni_allowlist_miss";
    pub const SNI_MISSING: &str = "l7_sni_missing";
    // HTTP/1 (Host header) decisions.
    pub const HTTP_HOST_ALLOWLIST_MATCH: &str = "l7_http_host_allowlist_match";
    pub const HTTP_HOST_ALLOWLIST_MISS: &str = "l7_http_host_allowlist_miss";
    pub const HTTP_HOST_MISSING: &str = "l7_http_host_missing";
    // Protocol-independent denials.
    pub const UNKNOWN_PROTOCOL: &str = "l7_unknown_protocol";
    pub const PEEK_TIMEOUT: &str = "l7_peek_timeout";
    // h2c (`:authority`) decisions.
    pub const H2_AUTHORITY_ALLOWLIST_MATCH: &str = "l7_h2_authority_allowlist_match";
    pub const H2_AUTHORITY_ALLOWLIST_MISS: &str = "l7_h2_authority_allowlist_miss";
    pub const H2_AUTHORITY_MISSING: &str = "l7_h2_authority_missing";
    pub const H2_UNPARSEABLE_HEADERS: &str = "l7_h2_unparseable_headers";
    // h2c decisions where the authority value was Huffman-coded.
    pub const H2_AUTHORITY_ALLOWLIST_MATCH_HUFFMAN: &str =
        "l7_h2_authority_allowlist_match_huffman";
    pub const H2_AUTHORITY_ALLOWLIST_MISS_HUFFMAN: &str = "l7_h2_authority_allowlist_miss_huffman";
    // h2c decisions where the authority came from the HPACK dynamic table.
    pub const H2_AUTHORITY_ALLOWLIST_MATCH_DYNAMIC_INDEXED: &str =
        "l7_h2_authority_allowlist_match_dynamic_indexed";
    pub const H2_AUTHORITY_ALLOWLIST_MISS_DYNAMIC_INDEXED: &str =
        "l7_h2_authority_allowlist_miss_dynamic_indexed";
}
201
202fn h2_reason_codes_for(provenance: h2::AuthorityProvenance) -> (&'static str, &'static str) {
208 match provenance {
209 h2::AuthorityProvenance::StaticIndexed | h2::AuthorityProvenance::StaticLiteral => (
210 reason_code::H2_AUTHORITY_ALLOWLIST_MATCH,
211 reason_code::H2_AUTHORITY_ALLOWLIST_MISS,
212 ),
213 h2::AuthorityProvenance::DynamicIndexed => (
214 reason_code::H2_AUTHORITY_ALLOWLIST_MATCH_DYNAMIC_INDEXED,
215 reason_code::H2_AUTHORITY_ALLOWLIST_MISS_DYNAMIC_INDEXED,
216 ),
217 h2::AuthorityProvenance::Huffman => (
218 reason_code::H2_AUTHORITY_ALLOWLIST_MATCH_HUFFMAN,
219 reason_code::H2_AUTHORITY_ALLOWLIST_MISS_HUFFMAN,
220 ),
221 }
222}
223
/// Result of sniffing the first bytes of a connection (see `guess_protocol`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProtocolGuess {
    /// First byte is 22, the TLS handshake record content type.
    Tls,
    /// Bytes satisfy `h2::is_h2c_preface`.
    H2c,
    /// Bytes start with one of the recognised HTTP/1 request methods.
    Http1,
    /// Empty input or anything else.
    Unknown,
}
232
233fn guess_protocol(buf: &[u8]) -> ProtocolGuess {
234 if buf.is_empty() {
235 return ProtocolGuess::Unknown;
236 }
237 if buf[0] == 22 {
238 return ProtocolGuess::Tls;
239 }
240 if h2::is_h2c_preface(buf) {
245 return ProtocolGuess::H2c;
246 }
247 const METHODS: &[&[u8]] = &[
250 b"GET ",
251 b"POST ",
252 b"HEAD ",
253 b"PUT ",
254 b"DELETE ",
255 b"OPTIONS ",
256 b"PATCH ",
257 b"CONNECT ",
258 ];
259 for m in METHODS {
260 if buf.len() >= m.len() && &buf[..m.len()] == *m {
261 return ProtocolGuess::Http1;
262 }
263 }
264 ProtocolGuess::Unknown
265}
266
267pub async fn run_one_shot(
293 cfg: &SniProxyConfig,
294 listener: TcpListener,
295 emitter: Arc<dyn L7DecisionEmitter>,
296 shutdown: Arc<AtomicBool>,
297) -> std::io::Result<ProxyStats> {
298 let total = Arc::new(std::sync::atomic::AtomicU64::new(0));
299 let allowed = Arc::new(std::sync::atomic::AtomicU64::new(0));
300 let denied = Arc::new(std::sync::atomic::AtomicU64::new(0));
301 let timeouts = Arc::new(std::sync::atomic::AtomicU64::new(0));
302 let upstream_failures = Arc::new(std::sync::atomic::AtomicU64::new(0));
303
304 let event_spec = build_event_spec(cfg);
309
310 while !shutdown.load(Ordering::SeqCst) {
311 let (stream, peer) = match listener.accept().await {
312 Ok(t) => t,
313 Err(e) => {
314 if shutdown.load(Ordering::SeqCst) {
315 break;
316 }
317 tracing::warn!(
318 target: "cellos.supervisor.sni_proxy",
319 error = %e,
320 "accept() failed"
321 );
322 continue;
323 }
324 };
325 if shutdown.load(Ordering::SeqCst) {
326 drop(stream);
328 break;
329 }
330 total.fetch_add(1, Ordering::SeqCst);
331
332 let cfg = cfg.clone();
333 let emitter = emitter.clone();
334 let event_spec = event_spec.clone();
335 let allowed = allowed.clone();
336 let denied = denied.clone();
337 let timeouts = timeouts.clone();
338 let upstream_failures = upstream_failures.clone();
339
340 tokio::spawn(async move {
341 handle_connection(
342 stream,
343 peer,
344 cfg,
345 emitter,
346 event_spec,
347 allowed,
348 denied,
349 timeouts,
350 upstream_failures,
351 )
352 .await;
353 });
354 }
355
356 Ok(ProxyStats {
357 connections_total: total.load(Ordering::SeqCst),
358 connections_allowed: allowed.load(Ordering::SeqCst),
359 connections_denied: denied.load(Ordering::SeqCst),
360 peek_timeouts: timeouts.load(Ordering::SeqCst),
361 upstream_failures: upstream_failures.load(Ordering::SeqCst),
362 })
363}
364
/// Handles one accepted client connection from first byte to close.
///
/// Flow:
/// 1. Peek (without consuming) up to `PEEK_BUF_LEN` bytes, bounded by
///    `cfg.peek_timeout`.
/// 2. Guess the protocol from the peeked bytes. h2c connections are handed to
///    `handle_h2c_connection`, which does its own per-stream accounting.
/// 3. For TLS and HTTP/1, extract the host name (SNI or `Host` header) and
///    check it against `cfg.hostname_allowlist`.
/// 4. On allow, connect to `cfg.upstream_addr` and splice both directions.
///    Because the client bytes were only peeked, the upstream receives them
///    from the very first byte.
///
/// Every TLS/HTTP-1 path increments exactly one of `allowed` / `denied` and
/// emits exactly one decision event. `timeouts` and `upstream_failures` are
/// incremented in addition to `denied` on their respective paths.
// The argument list mirrors the counters owned by `run_one_shot`.
#[allow(clippy::too_many_arguments)]
async fn handle_connection(
    mut stream: TcpStream,
    _peer: SocketAddr,
    cfg: SniProxyConfig,
    emitter: Arc<dyn L7DecisionEmitter>,
    event_spec: ExecutionCellSpec,
    allowed: Arc<std::sync::atomic::AtomicU64>,
    denied: Arc<std::sync::atomic::AtomicU64>,
    timeouts: Arc<std::sync::atomic::AtomicU64>,
    upstream_failures: Arc<std::sync::atomic::AtomicU64>,
) {
    let mut buf = vec![0u8; PEEK_BUF_LEN];
    // peek() leaves the bytes in the socket so they can still be forwarded.
    let peek_result = tokio::time::timeout(cfg.peek_timeout, stream.peek(&mut buf)).await;

    let n = match peek_result {
        // Nothing arrived within the peek window.
        Err(_elapsed) => {
            timeouts.fetch_add(1, Ordering::SeqCst);
            denied.fetch_add(1, Ordering::SeqCst);
            emit_decision(
                &emitter,
                &cfg,
                &event_spec,
                "deny",
                "",
                reason_code::PEEK_TIMEOUT,
                None,
                None,
            );
            return;
        }
        // The socket failed before any byte could be inspected.
        Ok(Err(e)) => {
            tracing::debug!(
                target: "cellos.supervisor.sni_proxy",
                error = %e,
                "peek() error before any bytes"
            );
            denied.fetch_add(1, Ordering::SeqCst);
            emit_decision(
                &emitter,
                &cfg,
                &event_spec,
                "deny",
                "",
                reason_code::UNKNOWN_PROTOCOL,
                None,
                None,
            );
            return;
        }
        // Zero bytes peeked: the client closed without sending anything.
        Ok(Ok(0)) => {
            denied.fetch_add(1, Ordering::SeqCst);
            emit_decision(
                &emitter,
                &cfg,
                &event_spec,
                "deny",
                "",
                reason_code::UNKNOWN_PROTOCOL,
                None,
                None,
            );
            return;
        }
        Ok(Ok(n)) => n,
    };
    let preamble = &buf[..n];

    let guess = guess_protocol(preamble);
    // Per-protocol tuple: extracted host (if any), the reason codes for
    // match / miss / missing-host, and how a denial is signalled to the client.
    let (host_opt, allow_reason, miss_reason, missing_reason, deny_response): (
        Option<String>,
        &'static str,
        &'static str,
        &'static str,
        DenyResponse,
    ) = match guess {
        ProtocolGuess::Tls => match sni::extract_sni(preamble) {
            Ok(opt) => (
                opt,
                reason_code::SNI_ALLOWLIST_MATCH,
                reason_code::SNI_ALLOWLIST_MISS,
                reason_code::SNI_MISSING,
                DenyResponse::Drop,
            ),
            // Looked like TLS but could not be parsed as a ClientHello.
            Err(_e) => {
                denied.fetch_add(1, Ordering::SeqCst);
                emit_decision(
                    &emitter,
                    &cfg,
                    &event_spec,
                    "deny",
                    "",
                    reason_code::UNKNOWN_PROTOCOL,
                    None,
                    None,
                );
                return;
            }
        },
        ProtocolGuess::H2c => {
            // h2c decisions are made per stream; the dedicated handler owns
            // the connection from here on (it does not receive `timeouts`).
            handle_h2c_connection(
                stream,
                preamble.to_vec(),
                cfg.clone(),
                emitter.clone(),
                event_spec.clone(),
                allowed.clone(),
                denied.clone(),
                upstream_failures.clone(),
            )
            .await;
            return;
        }
        ProtocolGuess::Http1 => match http::extract_http_host(preamble) {
            Ok(opt) => (
                opt,
                reason_code::HTTP_HOST_ALLOWLIST_MATCH,
                reason_code::HTTP_HOST_ALLOWLIST_MISS,
                reason_code::HTTP_HOST_MISSING,
                DenyResponse::Http403,
            ),
            // Started with an HTTP method but the request could not be parsed.
            Err(_e) => {
                denied.fetch_add(1, Ordering::SeqCst);
                emit_decision(
                    &emitter,
                    &cfg,
                    &event_spec,
                    "deny",
                    "",
                    reason_code::UNKNOWN_PROTOCOL,
                    None,
                    None,
                );
                return;
            }
        },
        ProtocolGuess::Unknown => {
            denied.fetch_add(1, Ordering::SeqCst);
            emit_decision(
                &emitter,
                &cfg,
                &event_spec,
                "deny",
                "",
                reason_code::UNKNOWN_PROTOCOL,
                None,
                None,
            );
            return;
        }
    };

    // An absent or empty host name is denied with the protocol's
    // "missing" reason code.
    let host = match host_opt {
        Some(h) if !h.is_empty() => h,
        _ => {
            denied.fetch_add(1, Ordering::SeqCst);
            send_deny_response(&mut stream, deny_response).await;
            emit_decision(
                &emitter,
                &cfg,
                &event_spec,
                "deny",
                "",
                missing_reason,
                None,
                None,
            );
            return;
        }
    };

    if !cellos_core::hostname_allowlist::matches_allowlist(&host, &cfg.hostname_allowlist) {
        denied.fetch_add(1, Ordering::SeqCst);
        send_deny_response(&mut stream, deny_response).await;
        emit_decision(
            &emitter,
            &cfg,
            &event_spec,
            "deny",
            host.as_str(),
            miss_reason,
            None,
            None,
        );
        return;
    }

    // The host is allowed; only now is the upstream connection opened.
    let upstream = match TcpStream::connect(cfg.upstream_addr).await {
        Ok(s) => s,
        Err(e) => {
            tracing::warn!(
                target: "cellos.supervisor.sni_proxy",
                error = %e,
                upstream = %cfg.upstream_addr,
                "upstream connect failed"
            );
            upstream_failures.fetch_add(1, Ordering::SeqCst);
            denied.fetch_add(1, Ordering::SeqCst);
            send_deny_response(&mut stream, deny_response).await;
            // NOTE(review): an upstream failure is reported with the
            // UNKNOWN_PROTOCOL reason code; no dedicated code exists in
            // `reason_code` — confirm this is intended.
            emit_decision(
                &emitter,
                &cfg,
                &event_spec,
                "deny",
                host.as_str(),
                reason_code::UNKNOWN_PROTOCOL,
                None,
                None,
            );
            return;
        }
    };
    allowed.fetch_add(1, Ordering::SeqCst);
    emit_decision(
        &emitter,
        &cfg,
        &event_spec,
        "allow",
        host.as_str(),
        allow_reason,
        None,
        None,
    );

    // Splice both directions until either side closes or errors.
    let mut client = stream;
    let mut up = upstream;
    if let Err(e) = tokio::io::copy_bidirectional(&mut client, &mut up).await {
        tracing::debug!(
            target: "cellos.supervisor.sni_proxy",
            error = %e,
            host = %host,
            "copy_bidirectional ended with error"
        );
    }
}
634
/// Upstream side of an h2c connection, connected lazily.
///
/// Starts as `Pending` and becomes `Open` the first time `commit` succeeds.
// NOTE(review): `clippy::too_many_arguments` is a function lint and has no
// effect on an enum; this attribute looks like a leftover — confirm and drop.
#[allow(clippy::too_many_arguments)]
enum UpstreamSink {
    /// Not connected yet: bytes passed to `write` accumulate in `buffer` and
    /// are sent to `addr` once `commit` connects.
    Pending { addr: SocketAddr, buffer: Vec<u8> },
    /// Connected: the split halves of the upstream TCP stream.
    Open {
        rd: tokio::net::tcp::OwnedReadHalf,
        wr: tokio::net::tcp::OwnedWriteHalf,
    },
}
693
694impl UpstreamSink {
695 fn pending(addr: SocketAddr) -> Self {
696 Self::Pending {
697 addr,
698 buffer: Vec::new(),
699 }
700 }
701
702 async fn write(&mut self, bytes: &[u8]) -> std::io::Result<()> {
704 match self {
705 Self::Pending { buffer, .. } => {
706 buffer.extend_from_slice(bytes);
707 Ok(())
708 }
709 Self::Open { wr, .. } => wr.write_all(bytes).await,
710 }
711 }
712
713 async fn commit(&mut self) -> std::io::Result<()> {
716 if let Self::Pending { addr, buffer } = self {
717 let stream = TcpStream::connect(*addr).await?;
718 let (rd, mut wr) = stream.into_split();
719 if !buffer.is_empty() {
720 wr.write_all(buffer).await?;
721 }
722 *self = Self::Open { rd, wr };
723 }
724 Ok(())
725 }
726
727 async fn shutdown(&mut self) {
728 if let Self::Open { wr, .. } = self {
729 let _ = wr.shutdown().await;
730 }
731 }
732}
733
734async fn read_upstream(sink: &mut UpstreamSink, buf: &mut [u8]) -> std::io::Result<usize> {
738 use tokio::io::AsyncReadExt;
739 match sink {
740 UpstreamSink::Open { rd, .. } => rd.read(buf).await,
741 UpstreamSink::Pending { .. } => std::future::pending().await,
742 }
743}
744
745#[allow(clippy::too_many_arguments)]
746async fn handle_h2c_connection(
747 stream: TcpStream,
748 preamble: Vec<u8>,
749 cfg: SniProxyConfig,
750 emitter: Arc<dyn L7DecisionEmitter>,
751 event_spec: ExecutionCellSpec,
752 allowed: Arc<std::sync::atomic::AtomicU64>,
753 denied: Arc<std::sync::atomic::AtomicU64>,
754 upstream_failures: Arc<std::sync::atomic::AtomicU64>,
755) {
756 use tokio::io::AsyncReadExt;
757
758 let mut upstream = UpstreamSink::pending(cfg.upstream_addr);
765
766 let mut client = stream;
770 let mut consumed = vec![0u8; preamble.len()];
771 if let Err(e) = client.read_exact(&mut consumed).await {
772 tracing::debug!(
773 target: "cellos.supervisor.sni_proxy",
774 error = %e,
775 "h2c read_exact(preamble) failed"
776 );
777 return;
778 }
779 debug_assert_eq!(
780 consumed, preamble,
781 "kernel returned different bytes than peek"
782 );
783
784 let (mut client_rd, mut client_wr) = client.into_split();
789
790 if let Err(e) = upstream.write(&preamble[..h2::HTTP2_PREFACE.len()]).await {
791 tracing::debug!(
792 target: "cellos.supervisor.sni_proxy",
793 error = %e,
794 "h2c buffer(preface) failed"
795 );
796 return;
797 }
798
799 let mut decoder = h2::H2ConnectionDecoder::new();
805 let mut denied_streams: std::collections::HashSet<u32> = std::collections::HashSet::new();
806 let mut pending_headers: std::collections::HashMap<u32, Vec<u8>> =
813 std::collections::HashMap::new();
814 let mut frame_buf: Vec<u8> = preamble[h2::HTTP2_PREFACE.len()..].to_vec();
815 let mut read_chunk = vec![0u8; 16 * 1024];
816 let mut up_read_chunk = vec![0u8; 16 * 1024];
817
818 'outer: loop {
819 loop {
821 match h2::frame::parse_one_frame(&frame_buf) {
822 Ok(Some((header, payload, _rest))) => {
823 let frame_total = 9 + header.length as usize;
824 let frame_bytes = frame_buf[..frame_total].to_vec();
825 let payload_owned = payload.to_vec();
826 let frame_type = header.frame_type;
827 let stream_id = header.stream_id;
828 let is_stream_bound = stream_id != 0;
829 let is_headers_or_continuation = frame_type == h2::frame::FRAME_TYPE_HEADERS
830 || frame_type == h2::frame::FRAME_TYPE_CONTINUATION;
831
832 if is_headers_or_continuation {
833 pending_headers
836 .entry(stream_id)
837 .or_default()
838 .extend_from_slice(&frame_bytes);
839
840 match decoder.feed_frame(&header, &payload_owned) {
841 Ok(Some(decoded)) => {
842 let sid = decoded.stream_id;
843 let host_norm = decoded.authority.clone();
844 let provenance = if decoded.via_huffman {
845 h2::AuthorityProvenance::Huffman
846 } else if decoded.via_dynamic_table {
847 h2::AuthorityProvenance::DynamicIndexed
848 } else {
849 h2::AuthorityProvenance::StaticLiteral
850 };
851 let (allow_r, miss_r) = h2_reason_codes_for(provenance);
852 let allow = match host_norm.as_deref() {
853 Some(h) if !h.is_empty() => {
854 cellos_core::hostname_allowlist::matches_allowlist(
855 h,
856 &cfg.hostname_allowlist,
857 )
858 }
859 _ => false,
860 };
861 let pending = pending_headers.remove(&sid).unwrap_or_default();
862 if host_norm.is_none() {
863 denied.fetch_add(1, Ordering::SeqCst);
864 denied_streams.insert(sid);
865 let rst = build_rst_stream_refused(sid);
866 let _ = client_wr.write_all(&rst).await;
867 emit_decision(
868 &emitter,
869 &cfg,
870 &event_spec,
871 "deny",
872 "",
873 reason_code::H2_AUTHORITY_MISSING,
874 None,
875 Some(sid),
876 );
877 } else if allow {
878 allowed.fetch_add(1, Ordering::SeqCst);
879 if let Err(e) = upstream.commit().await {
886 tracing::warn!(
887 target: "cellos.supervisor.sni_proxy",
888 error = %e,
889 upstream = %cfg.upstream_addr,
890 "h2c upstream connect failed (deferred)"
891 );
892 upstream_failures.fetch_add(1, Ordering::SeqCst);
893 denied.fetch_add(1, Ordering::SeqCst);
894 let _ = client_wr.write_all(H2_GOAWAY_PROTOCOL_ERROR).await;
895 let _ = client_wr.shutdown().await;
896 emit_decision(
897 &emitter,
898 &cfg,
899 &event_spec,
900 "deny",
901 "",
902 reason_code::UNKNOWN_PROTOCOL,
903 None,
904 None,
905 );
906 break 'outer;
907 }
908 if let Err(e) = upstream.write(&pending).await {
909 tracing::debug!(
910 target: "cellos.supervisor.sni_proxy",
911 error = %e,
912 "h2c forward(allowed HEADERS block) failed"
913 );
914 break 'outer;
915 }
916 emit_decision(
917 &emitter,
918 &cfg,
919 &event_spec,
920 "allow",
921 host_norm.as_deref().unwrap_or(""),
922 allow_r,
923 None,
924 Some(sid),
925 );
926 } else {
927 denied.fetch_add(1, Ordering::SeqCst);
928 denied_streams.insert(sid);
929 let rst = build_rst_stream_refused(sid);
930 let _ = client_wr.write_all(&rst).await;
931 emit_decision(
932 &emitter,
933 &cfg,
934 &event_spec,
935 "deny",
936 host_norm.as_deref().unwrap_or(""),
937 miss_r,
938 None,
939 Some(sid),
940 );
941 }
942 }
943 Ok(None) => {
944 }
946 Err(_e) => {
947 denied.fetch_add(1, Ordering::SeqCst);
948 denied_streams.insert(stream_id);
949 pending_headers.remove(&stream_id);
950 let rst = build_rst_stream_refused(stream_id);
951 let _ = client_wr.write_all(&rst).await;
952 emit_decision(
953 &emitter,
954 &cfg,
955 &event_spec,
956 "deny",
957 "",
958 reason_code::H2_UNPARSEABLE_HEADERS,
959 None,
960 Some(stream_id),
961 );
962 }
963 }
964 } else if is_stream_bound && denied_streams.contains(&stream_id) {
965 } else {
967 if let Err(e) = upstream.write(&frame_bytes).await {
974 tracing::debug!(
975 target: "cellos.supervisor.sni_proxy",
976 error = %e,
977 "h2c forward(frame) failed"
978 );
979 break 'outer;
980 }
981 }
982
983 frame_buf.drain(..frame_total);
984 continue;
985 }
986 Ok(None) => break, Err(_e) => {
988 let _ = client_wr.write_all(H2_GOAWAY_PROTOCOL_ERROR).await;
991 upstream.shutdown().await;
992 break 'outer;
993 }
994 }
995 }
996
997 tokio::select! {
1001 biased;
1002 r = client_rd.read(&mut read_chunk) => match r {
1003 Ok(0) => break 'outer,
1004 Ok(n) => frame_buf.extend_from_slice(&read_chunk[..n]),
1005 Err(e) => {
1006 tracing::debug!(
1007 target: "cellos.supervisor.sni_proxy",
1008 error = %e,
1009 "h2c client read error"
1010 );
1011 break 'outer;
1012 }
1013 },
1014 r = read_upstream(&mut upstream, &mut up_read_chunk) => match r {
1015 Ok(0) => break 'outer,
1016 Ok(n) => {
1017 if let Err(e) = client_wr.write_all(&up_read_chunk[..n]).await {
1018 tracing::debug!(
1019 target: "cellos.supervisor.sni_proxy",
1020 error = %e,
1021 "h2c upstream→client write failed"
1022 );
1023 break 'outer;
1024 }
1025 }
1026 Err(e) => {
1027 tracing::debug!(
1028 target: "cellos.supervisor.sni_proxy",
1029 error = %e,
1030 "h2c upstream read error"
1031 );
1032 break 'outer;
1033 }
1034 },
1035 }
1036 }
1037
1038 upstream.shutdown().await;
1039 let _ = client_wr.shutdown().await;
1040}
1041
/// Builds a complete HTTP/2 RST_STREAM frame refusing `stream_id`.
///
/// Layout (13 bytes): 24-bit payload length `4`, frame type `0x03`
/// (RST_STREAM), flags `0`, the 31-bit stream id (the reserved top bit is
/// cleared), then the 32-bit error code `7` (REFUSED_STREAM).
fn build_rst_stream_refused(stream_id: u32) -> [u8; 13] {
    /// Length (3 bytes), type and flags of the frame header.
    const HEADER_PREFIX: [u8; 5] = [0x00, 0x00, 0x04, 0x03, 0x00];
    /// HTTP/2 error code REFUSED_STREAM.
    const REFUSED_STREAM: u32 = 7;

    let masked_id = stream_id & 0x7FFF_FFFF;

    let mut frame = [0u8; 13];
    frame[..5].copy_from_slice(&HEADER_PREFIX);
    frame[5..9].copy_from_slice(&masked_id.to_be_bytes());
    frame[9..].copy_from_slice(&REFUSED_STREAM.to_be_bytes());
    frame
}
1064
/// How a denial is signalled to the client (see `send_deny_response`).
#[derive(Debug, Clone, Copy)]
enum DenyResponse {
    /// Write nothing. Used for TLS.
    Drop,
    /// Write a minimal `403 Forbidden` response, then shut the stream down.
    /// Used for HTTP/1.
    Http403,
    /// Write an HTTP/2 GOAWAY frame, then shut the stream down.
    // Never constructed in the visible code, hence the allow.
    #[allow(dead_code)]
    H2Goaway,
}
1078
/// Pre-encoded HTTP/2 GOAWAY frame carrying error code PROTOCOL_ERROR.
///
/// Decoded per the HTTP/2 frame layout: payload length 8, type `0x07`
/// (GOAWAY), flags 0, stream id 0, last-stream-id 1, error code 1.
/// NOTE(review): the last-stream-id field is 1 rather than 0 — confirm this
/// is intended.
const H2_GOAWAY_PROTOCOL_ERROR: &[u8; 17] = &[
    // length (3) | type | flags
    0x00, 0x00, 0x08, 0x07, 0x00,
    // stream identifier
    0x00, 0x00, 0x00, 0x00,
    // last-stream-id
    0x00, 0x00, 0x00, 0x01,
    // error code
    0x00, 0x00, 0x00, 0x01,
];
1098
1099async fn send_deny_response(stream: &mut TcpStream, mode: DenyResponse) {
1100 match mode {
1101 DenyResponse::Http403 => {
1102 const RESP: &[u8] =
1103 b"HTTP/1.1 403 Forbidden\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";
1104 if let Err(e) = stream.write_all(RESP).await {
1105 tracing::debug!(
1106 target: "cellos.supervisor.sni_proxy",
1107 error = %e,
1108 "writing 403 response failed"
1109 );
1110 }
1111 let _ = stream.shutdown().await;
1112 }
1113 DenyResponse::H2Goaway => {
1114 if let Err(e) = stream.write_all(H2_GOAWAY_PROTOCOL_ERROR).await {
1115 tracing::debug!(
1116 target: "cellos.supervisor.sni_proxy",
1117 error = %e,
1118 "writing h2 GOAWAY failed"
1119 );
1120 }
1121 let _ = stream.shutdown().await;
1122 }
1123 DenyResponse::Drop => {
1124 }
1126 }
1127}
1128
1129fn build_event_spec(cfg: &SniProxyConfig) -> ExecutionCellSpec {
1134 use cellos_core::{AuthorityBundle, Correlation, Lifetime};
1135 let correlation = cfg.correlation_id.as_ref().map(|c| Correlation {
1136 correlation_id: Some(c.clone()),
1137 ..Default::default()
1138 });
1139 ExecutionCellSpec {
1140 id: format!("sni-proxy/{}/{}", cfg.cell_id, cfg.run_id),
1141 correlation,
1142 ingress: None,
1143 environment: None,
1144 placement: None,
1145 policy: None,
1146 identity: None,
1147 run: None,
1148 authority: AuthorityBundle::default(),
1149 lifetime: Lifetime { ttl_seconds: 0 },
1150 export: None,
1151 telemetry: None,
1152 }
1153}
1154
1155#[allow(clippy::too_many_arguments)]
1157fn emit_decision(
1158 emitter: &Arc<dyn L7DecisionEmitter>,
1159 cfg: &SniProxyConfig,
1160 spec: &ExecutionCellSpec,
1161 action: &str,
1162 sni_host: &str,
1163 reason_code_str: &str,
1164 rule_ref: Option<&str>,
1165 stream_id: Option<u32>,
1168) {
1169 let decision_id = uuid::Uuid::new_v4().to_string();
1170 let policy_digest = cfg.policy_digest.clone().unwrap_or_default();
1171 let keyset_id = cfg.keyset_id.clone().unwrap_or_default();
1172 let issuer_kid = cfg.issuer_kid.clone().unwrap_or_default();
1173 let data = match cellos_core::observability_l7_egress_decision_data_v1(
1174 spec,
1175 cfg.cell_id.as_str(),
1176 Some(cfg.run_id.as_str()),
1177 decision_id.as_str(),
1178 action,
1179 if sni_host.is_empty() {
1184 "(unknown)"
1185 } else {
1186 sni_host
1187 },
1188 policy_digest.as_str(),
1189 keyset_id.as_str(),
1190 issuer_kid.as_str(),
1191 reason_code_str,
1192 rule_ref,
1193 stream_id,
1194 ) {
1195 Ok(v) => v,
1196 Err(e) => {
1197 tracing::warn!(
1198 target: "cellos.supervisor.sni_proxy",
1199 error = %e,
1200 "build l7_egress_decision data failed"
1201 );
1202 return;
1203 }
1204 };
1205 let observed_at = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
1206 let event = CloudEventV1 {
1207 specversion: "1.0".into(),
1208 id: uuid::Uuid::new_v4().to_string(),
1209 source: "cellos-sni-proxy".into(),
1210 ty: "dev.cellos.events.cell.observability.v1.l7_egress_decision".into(),
1211 datacontenttype: Some("application/json".into()),
1212 data: Some(data),
1213 time: Some(observed_at),
1214 traceparent: None,
1215 };
1216 emitter.emit(event);
1217}
1218
1219#[allow(dead_code)]
1222fn event_data_get<'a>(event: &'a CloudEventV1, key: &str) -> Option<&'a Value> {
1223 event.data.as_ref()?.as_object()?.get(key)
1224}
1225
1226#[allow(dead_code)]
1230fn empty_data_map() -> Map<String, Value> {
1231 let mut m = Map::new();
1232 m.insert("decisionId".into(), json!(uuid::Uuid::new_v4().to_string()));
1233 m
1234}
1235
#[cfg(test)]
pub(crate) mod test_helpers {
    /// Builds a minimal TLS ClientHello record for tests.
    ///
    /// The hello advertises version 0x0303, an all-zero random, an empty
    /// session id, the single cipher suite 0x1301 and null compression. When
    /// `snis` is non-empty, one `server_name` extension (type 0) is added,
    /// listing every entry as a host name (name type 0); otherwise the
    /// extensions block is empty.
    ///
    /// Lengths are truncated to their wire width, so inputs are expected to
    /// be small.
    pub fn build_client_hello(snis: &[&str]) -> Vec<u8> {
        /// Encodes a length as a big-endian u16 prefix.
        fn be16(len: usize) -> [u8; 2] {
            (len as u16).to_be_bytes()
        }

        // Extensions block: at most one server_name extension.
        let mut extensions = Vec::new();
        if !snis.is_empty() {
            let mut name_list = Vec::new();
            for name in snis {
                name_list.push(0u8); // name type: host_name
                name_list.extend_from_slice(&be16(name.len()));
                name_list.extend_from_slice(name.as_bytes());
            }
            let mut extension_data = be16(name_list.len()).to_vec();
            extension_data.extend_from_slice(&name_list);

            extensions.extend_from_slice(&[0x00, 0x00]); // extension type: server_name
            extensions.extend_from_slice(&be16(extension_data.len()));
            extensions.extend_from_slice(&extension_data);
        }

        // ClientHello body.
        let mut hello = vec![0x03, 0x03]; // client version
        hello.extend_from_slice(&[0u8; 32]); // random
        hello.push(0); // session id length
        hello.extend_from_slice(&[0x00, 0x02, 0x13, 0x01]); // cipher suites
        hello.extend_from_slice(&[0x01, 0x00]); // compression methods
        hello.extend_from_slice(&be16(extensions.len()));
        hello.extend_from_slice(&extensions);

        // Handshake header: type 1 (ClientHello) + 24-bit length.
        let mut handshake = vec![1u8];
        handshake.extend_from_slice(&(hello.len() as u32).to_be_bytes()[1..]);
        handshake.extend_from_slice(&hello);

        // Record header: content type 22 (handshake), version 0x0301, length.
        let mut record = vec![22u8, 0x03, 0x01];
        record.extend_from_slice(&be16(handshake.len()));
        record.extend_from_slice(&handshake);
        record
    }
}
1281
1282#[cfg(test)]
1283mod tests {
1284 use super::test_helpers::build_client_hello;
1285 use super::*;
1286 use std::sync::Mutex;
1287 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1288 use tokio::net::{TcpListener, TcpStream};
1289
    /// Emitter that records every event in memory so tests can inspect them.
    #[derive(Default)]
    struct MemEmitter {
        /// Events in the order they were emitted.
        events: Mutex<Vec<CloudEventV1>>,
    }
    impl L7DecisionEmitter for MemEmitter {
        fn emit(&self, event: CloudEventV1) {
            // Panics only if another thread panicked while holding the lock.
            self.events.lock().unwrap().push(event);
        }
    }
1300
    /// Builds a test configuration with the given allowlist, upstream address
    /// and peek timeout (in milliseconds); all other fields are fixed values.
    fn cfg_with(allowlist: &[&str], upstream: SocketAddr, peek_ms: u64) -> SniProxyConfig {
        SniProxyConfig {
            // Not used for binding in these tests: `spawn_proxy` binds its own listener.
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            upstream_addr: upstream,
            hostname_allowlist: allowlist.iter().map(|s| s.to_string()).collect(),
            cdn_providers: vec![],
            cell_id: "test-cell".into(),
            run_id: "test-run".into(),
            policy_digest: Some("digest-test".into()),
            keyset_id: Some("keyset-test".into()),
            issuer_kid: Some("kid-test".into()),
            correlation_id: None,
            upstream_resolver_id: "sni-proxy-test".into(),
            peek_timeout: Duration::from_millis(peek_ms),
        }
    }
1317
    /// Starts a one-connection upstream on an ephemeral loopback port.
    ///
    /// Returns its address and a task handle resolving to every byte the
    /// upstream received. Despite the name, nothing is written back to the
    /// peer: the task only records what it reads.
    async fn spawn_echo_upstream() -> (SocketAddr, tokio::task::JoinHandle<Vec<u8>>) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let h = tokio::spawn(async move {
            let (mut s, _) = listener.accept().await.unwrap();
            let mut buf = Vec::new();
            let mut tmp = [0u8; 4096];
            // Bounded number of reads so the task cannot loop forever.
            for _ in 0..32 {
                match s.read(&mut tmp).await {
                    Ok(0) => break,
                    Ok(n) => buf.extend_from_slice(&tmp[..n]),
                    Err(_) => break,
                }
            }
            buf
        });
        (addr, h)
    }
1342
    /// Runs `run_one_shot` in a background task on an ephemeral loopback port.
    ///
    /// Returns the listening address, the shutdown flag to set when done, and
    /// the task handle resolving to the proxy's final stats.
    async fn spawn_proxy(
        cfg: SniProxyConfig,
        emitter: Arc<MemEmitter>,
    ) -> (
        SocketAddr,
        Arc<AtomicBool>,
        tokio::task::JoinHandle<std::io::Result<ProxyStats>>,
    ) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let shutdown = Arc::new(AtomicBool::new(false));
        let shutdown2 = shutdown.clone();
        let h = tokio::spawn(async move {
            run_one_shot(
                &cfg,
                listener,
                emitter as Arc<dyn L7DecisionEmitter>,
                shutdown2,
            )
            .await
        });
        (addr, shutdown, h)
    }
1368
    /// Opens (and immediately drops) a connection to `addr` so that a proxy
    /// blocked in `accept()` wakes up and notices its shutdown flag.
    /// Connection errors are ignored.
    fn poke_shutdown(addr: SocketAddr) {
        let _ = std::net::TcpStream::connect_timeout(&addr, Duration::from_millis(200));
    }
1373
    /// A ClientHello whose SNI is on the allowlist is forwarded verbatim to
    /// the upstream and produces exactly one allow event.
    #[tokio::test]
    async fn proxy_allows_tls_with_matching_sni() {
        let (upstream, upstream_h) = spawn_echo_upstream().await;
        let emitter = Arc::new(MemEmitter::default());
        let cfg = cfg_with(&["api.example.com"], upstream, 500);
        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;

        let ch = build_client_hello(&["api.example.com"]);
        let mut s = TcpStream::connect(listen).await.unwrap();
        s.write_all(&ch).await.unwrap();
        // Half-close so the upstream sees EOF and its task completes.
        s.shutdown().await.ok();
        let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
            .await
            .expect("upstream task")
            .expect("upstream join");
        assert_eq!(
            upstream_bytes, ch,
            "upstream did not receive forwarded ClientHello bytes verbatim"
        );

        // Stop the proxy: set the flag, then wake the blocked accept().
        shutdown.store(true, Ordering::SeqCst);
        poke_shutdown(listen);
        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;

        let evs = emitter.events.lock().unwrap();
        assert_eq!(evs.len(), 1, "expected exactly one event");
        let data = evs[0].data.as_ref().unwrap();
        assert_eq!(data["action"], "allow");
        assert_eq!(data["reasonCode"], "l7_sni_allowlist_match");
        assert_eq!(data["sniHost"], "api.example.com");
    }
1408
    /// A ClientHello whose SNI is not on the allowlist gets its connection
    /// closed promptly and produces exactly one deny event naming the host.
    #[tokio::test]
    async fn proxy_denies_tls_with_unmatched_sni() {
        let (upstream, _upstream_h) = spawn_echo_upstream().await;
        let emitter = Arc::new(MemEmitter::default());
        let cfg = cfg_with(&["api.example.com"], upstream, 500);
        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;

        let ch = build_client_hello(&["evil.example.com"]);
        let mut s = TcpStream::connect(listen).await.unwrap();
        s.write_all(&ch).await.unwrap();
        let mut sink = Vec::new();
        // The read must finish (EOF or error) well before the deadline.
        let read_timeout =
            tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
        assert!(
            read_timeout.is_ok(),
            "TLS deny should close the stream promptly; got peek timeout"
        );

        // Stop the proxy: set the flag, then wake the blocked accept().
        shutdown.store(true, Ordering::SeqCst);
        poke_shutdown(listen);
        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;

        let evs = emitter.events.lock().unwrap();
        assert_eq!(evs.len(), 1);
        let data = evs[0].data.as_ref().unwrap();
        assert_eq!(data["action"], "deny");
        assert_eq!(data["reasonCode"], "l7_sni_allowlist_miss");
        assert_eq!(data["sniHost"], "evil.example.com");
    }
1439
    /// A ClientHello without a server_name extension is denied with the
    /// "SNI missing" reason code.
    #[tokio::test]
    async fn proxy_denies_tls_without_sni() {
        let (upstream, _upstream_h) = spawn_echo_upstream().await;
        let emitter = Arc::new(MemEmitter::default());
        let cfg = cfg_with(&["api.example.com"], upstream, 500);
        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;

        // An empty name list yields a hello with no extensions at all.
        let ch = build_client_hello(&[]);
        let mut s = TcpStream::connect(listen).await.unwrap();
        s.write_all(&ch).await.unwrap();
        let mut sink = Vec::new();
        // Wait for the proxy to close the connection; the outcome is not asserted.
        let _ = tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;

        // Stop the proxy: set the flag, then wake the blocked accept().
        shutdown.store(true, Ordering::SeqCst);
        poke_shutdown(listen);
        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;

        let evs = emitter.events.lock().unwrap();
        assert_eq!(evs.len(), 1);
        let data = evs[0].data.as_ref().unwrap();
        assert_eq!(data["action"], "deny");
        assert_eq!(data["reasonCode"], "l7_sni_missing");
    }
1463
    /// An HTTP/1 request whose Host header is on the allowlist is forwarded
    /// verbatim to the upstream and produces exactly one allow event.
    #[tokio::test]
    async fn proxy_allows_http_with_matching_host() {
        let (upstream, upstream_h) = spawn_echo_upstream().await;
        let emitter = Arc::new(MemEmitter::default());
        let cfg = cfg_with(&["api.example.com"], upstream, 500);
        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;

        let req = b"GET / HTTP/1.1\r\nHost: api.example.com\r\nConnection: close\r\n\r\n";
        let mut s = TcpStream::connect(listen).await.unwrap();
        s.write_all(req).await.unwrap();
        // Half-close so the upstream sees EOF and its task completes.
        s.shutdown().await.ok();
        let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
            .await
            .expect("upstream task")
            .expect("upstream join");
        assert_eq!(upstream_bytes, req.to_vec());

        // Stop the proxy: set the flag, then wake the blocked accept().
        shutdown.store(true, Ordering::SeqCst);
        poke_shutdown(listen);
        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;

        let evs = emitter.events.lock().unwrap();
        assert_eq!(evs.len(), 1);
        let data = evs[0].data.as_ref().unwrap();
        assert_eq!(data["action"], "allow");
        assert_eq!(data["reasonCode"], "l7_http_host_allowlist_match");
        assert_eq!(data["sniHost"], "api.example.com");
    }
1492
    /// An HTTP/1 request whose Host header is not on the allowlist receives a
    /// 403 response and produces exactly one deny event naming the host.
    #[tokio::test]
    async fn proxy_denies_http_with_unmatched_host_and_returns_403() {
        let (upstream, _upstream_h) = spawn_echo_upstream().await;
        let emitter = Arc::new(MemEmitter::default());
        let cfg = cfg_with(&["api.example.com"], upstream, 500);
        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;

        let req = b"GET / HTTP/1.1\r\nHost: evil.example.com\r\nConnection: close\r\n\r\n";
        let mut s = TcpStream::connect(listen).await.unwrap();
        s.write_all(req).await.unwrap();
        let mut response = Vec::new();
        let read_result =
            tokio::time::timeout(Duration::from_secs(2), s.read_to_end(&mut response))
                .await
                .expect("read deadline");
        match read_result {
            Ok(_) => {}
            // A reset after the response is tolerated; bytes read before the
            // reset are still in `response`.
            Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {}
            Err(e) => panic!("unexpected read error: {e}"),
        }
        let resp_str = String::from_utf8_lossy(&response);
        assert!(
            resp_str.starts_with("HTTP/1.1 403 Forbidden\r\n"),
            "expected 403 response, got: {resp_str:?}"
        );

        // Stop the proxy: set the flag, then wake the blocked accept().
        shutdown.store(true, Ordering::SeqCst);
        poke_shutdown(listen);
        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;

        let evs = emitter.events.lock().unwrap();
        assert_eq!(evs.len(), 1);
        let data = evs[0].data.as_ref().unwrap();
        assert_eq!(data["action"], "deny");
        assert_eq!(data["reasonCode"], "l7_http_host_allowlist_miss");
        assert_eq!(data["sniHost"], "evil.example.com");
    }
1539
1540 #[tokio::test]
1541 async fn proxy_emits_one_event_per_connection() {
1542 let (upstream, _upstream_h) = spawn_echo_upstream().await;
1543 let emitter = Arc::new(MemEmitter::default());
1544 let cfg = cfg_with(&["api.example.com"], upstream, 500);
1545 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1546
1547 for ch in [build_client_hello(&["api.example.com"])] {
1550 let mut s = TcpStream::connect(listen).await.unwrap();
1551 s.write_all(&ch).await.unwrap();
1552 s.shutdown().await.ok();
1553 tokio::time::sleep(Duration::from_millis(80)).await;
1554 }
1555 for ch in [build_client_hello(&["evil.example.com"])] {
1556 let mut s = TcpStream::connect(listen).await.unwrap();
1557 s.write_all(&ch).await.unwrap();
1558 let mut sink = Vec::new();
1560 let _ =
1561 tokio::time::timeout(Duration::from_millis(400), s.read_to_end(&mut sink)).await;
1562 }
1563 let req = b"GET / HTTP/1.1\r\nHost: blocked.example.com\r\n\r\n";
1564 let mut s = TcpStream::connect(listen).await.unwrap();
1565 s.write_all(req).await.unwrap();
1566 let mut sink = Vec::new();
1567 let _ = tokio::time::timeout(Duration::from_millis(400), s.read_to_end(&mut sink)).await;
1568
1569 shutdown.store(true, Ordering::SeqCst);
1570 poke_shutdown(listen);
1571 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1572
1573 let evs = emitter.events.lock().unwrap();
1574 assert_eq!(
1575 evs.len(),
1576 3,
1577 "expected exactly one event per connection, got {}: {:#?}",
1578 evs.len(),
1579 evs.iter().map(|e| &e.data).collect::<Vec<_>>()
1580 );
1581 }
1582
1583 #[tokio::test]
1584 async fn proxy_returns_peek_timeout_when_client_silent() {
1585 let (upstream, _upstream_h) = spawn_echo_upstream().await;
1586 let emitter = Arc::new(MemEmitter::default());
1587 let cfg = cfg_with(&["api.example.com"], upstream, 50); let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1589
1590 let s = TcpStream::connect(listen).await.unwrap();
1592 tokio::time::sleep(Duration::from_millis(250)).await;
1594 drop(s);
1595
1596 shutdown.store(true, Ordering::SeqCst);
1597 poke_shutdown(listen);
1598 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1599
1600 let evs = emitter.events.lock().unwrap();
1601 assert!(!evs.is_empty(), "expected at least one peek_timeout event");
1602 let data = evs[0].data.as_ref().unwrap();
1603 assert_eq!(data["action"], "deny");
1604 assert_eq!(data["reasonCode"], "l7_peek_timeout");
1605 }
1606
/// Table-driven check of `guess_protocol`: each leading-byte sample is paired
/// with the `ProtocolGuess` variant it must map to.
#[test]
fn guess_protocol_classifies_correctly() {
    let cases: [(&[u8], ProtocolGuess); 6] = [
        // First byte 22 followed by 0x03 0x03.
        (&[22, 0x03, 0x03], ProtocolGuess::Tls),
        (b"GET / HTTP/1.1\r\n", ProtocolGuess::Http1),
        (b"POST / HTTP/1.1", ProtocolGuess::Http1),
        (b"\x00\x00\x00\x00", ProtocolGuess::Unknown),
        // Empty input must not be mistaken for any protocol.
        (b"", ProtocolGuess::Unknown),
        (h2::HTTP2_PREFACE, ProtocolGuess::H2c),
    ];
    for (input, expected) in cases {
        assert_eq!(guess_protocol(input), expected);
    }
}
1618
1619 fn build_h2c_stream(authority: &str) -> Vec<u8> {
1625 let mut out = h2::HTTP2_PREFACE.to_vec();
1626 let block = h2::test_helpers::hpack_literal_indexed_name(1, authority);
1627 out.extend_from_slice(&h2::test_helpers::settings_then_headers(&block));
1628 out
1629 }
1630
1631 fn build_h2c_stream_no_authority() -> Vec<u8> {
1634 let mut out = h2::HTTP2_PREFACE.to_vec();
1635 let block = h2::test_helpers::hpack_indexed(2);
1638 out.extend_from_slice(&h2::test_helpers::settings_then_headers(&block));
1639 out
1640 }
1641
1642 fn build_h2c_stream_continuation_reassemblable(authority: &str) -> Vec<u8> {
1646 let mut out = h2::HTTP2_PREFACE.to_vec();
1647 let block = h2::test_helpers::hpack_literal_indexed_name(1, authority);
1648 let mid = block.len() / 2;
1649 let (a, b) = block.split_at(mid);
1650 let mut sequence = h2::test_helpers::empty_settings_frame();
1651 sequence.extend_from_slice(&h2::test_helpers::continuation_fragmented_headers(a));
1652 sequence.extend_from_slice(&h2::test_helpers::continuation_frame(b, true));
1653 out.extend_from_slice(&sequence);
1654 out
1655 }
1656
1657 fn build_h2c_stream_unparseable() -> Vec<u8> {
1661 let mut out = h2::HTTP2_PREFACE.to_vec();
1662 let block = h2::test_helpers::hpack_indexed(200);
1663 out.extend_from_slice(&h2::test_helpers::settings_then_headers(&block));
1664 out
1665 }
1666
1667 fn build_h2c_stream_huffman(authority: &str) -> Vec<u8> {
1669 let mut out = h2::HTTP2_PREFACE.to_vec();
1670 let block = h2::test_helpers::hpack_literal_indexed_name_huffman(1, authority);
1671 out.extend_from_slice(&h2::test_helpers::settings_then_headers(&block));
1672 out
1673 }
1674
1675 fn build_h2c_stream_dynamic_indexed_name(authority: &str) -> Vec<u8> {
1691 let mut out = h2::HTTP2_PREFACE.to_vec();
1692 let mut block: Vec<u8> = Vec::new();
1693 block.extend_from_slice(&h2::test_helpers::hpack_literal_indexed_name(1, ""));
1695 block.extend_from_slice(&h2::test_helpers::hpack_literal_indexed_name(6, authority));
1697 block.push(0x40 | 0x3F); block.push(0x00); h2::test_helpers::encode_literal_string(&mut block, authority);
1705 out.extend_from_slice(&h2::test_helpers::settings_then_headers(&block));
1706 out
1707 }
1708
1709 #[tokio::test]
1710 async fn proxy_allows_h2c_with_matching_authority() {
1711 let (upstream, upstream_h) = spawn_echo_upstream().await;
1712 let emitter = Arc::new(MemEmitter::default());
1713 let cfg = cfg_with(&["api.example.com"], upstream, 500);
1714 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1715
1716 let stream_bytes = build_h2c_stream("api.example.com");
1717 let mut s = TcpStream::connect(listen).await.unwrap();
1718 s.write_all(&stream_bytes).await.unwrap();
1719 s.shutdown().await.ok();
1720
1721 let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
1722 .await
1723 .expect("upstream task")
1724 .expect("upstream join");
1725 assert_eq!(
1729 upstream_bytes, stream_bytes,
1730 "upstream did not receive the forwarded h2c bytes verbatim"
1731 );
1732
1733 shutdown.store(true, Ordering::SeqCst);
1734 poke_shutdown(listen);
1735 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1736
1737 let evs = emitter.events.lock().unwrap();
1738 assert_eq!(evs.len(), 1, "expected exactly one event");
1739 let data = evs[0].data.as_ref().unwrap();
1740 assert_eq!(data["action"], "allow");
1741 assert_eq!(data["reasonCode"], "l7_h2_authority_allowlist_match");
1742 assert_eq!(data["sniHost"], "api.example.com");
1743 }
1744
1745 #[tokio::test]
1746 async fn proxy_denies_h2c_with_unmatched_authority() {
1747 let (upstream, _upstream_h) = spawn_echo_upstream().await;
1748 let emitter = Arc::new(MemEmitter::default());
1749 let cfg = cfg_with(&["api.example.com"], upstream, 500);
1750 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1751
1752 let stream_bytes = build_h2c_stream("evil.example.com");
1753 let mut s = TcpStream::connect(listen).await.unwrap();
1754 s.write_all(&stream_bytes).await.unwrap();
1755 let mut sink = Vec::new();
1756 let _ = tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
1757
1758 shutdown.store(true, Ordering::SeqCst);
1759 poke_shutdown(listen);
1760 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1761
1762 let evs = emitter.events.lock().unwrap();
1763 assert_eq!(evs.len(), 1);
1764 let data = evs[0].data.as_ref().unwrap();
1765 assert_eq!(data["action"], "deny");
1766 assert_eq!(data["reasonCode"], "l7_h2_authority_allowlist_miss");
1767 assert_eq!(data["sniHost"], "evil.example.com");
1768 }
1769
1770 #[tokio::test]
1771 async fn proxy_emits_event_for_h2_unparseable_headers() {
1772 let (upstream, _upstream_h) = spawn_echo_upstream().await;
1778 let emitter = Arc::new(MemEmitter::default());
1779 let cfg = cfg_with(&["api.example.com"], upstream, 500);
1780 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1781
1782 let stream_bytes = build_h2c_stream_unparseable();
1783 let mut s = TcpStream::connect(listen).await.unwrap();
1784 s.write_all(&stream_bytes).await.unwrap();
1785 let mut sink = Vec::new();
1786 let _ = tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
1787
1788 shutdown.store(true, Ordering::SeqCst);
1789 poke_shutdown(listen);
1790 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1791
1792 let evs = emitter.events.lock().unwrap();
1793 assert_eq!(evs.len(), 1);
1794 let data = evs[0].data.as_ref().unwrap();
1795 assert_eq!(data["action"], "deny");
1796 assert_eq!(data["reasonCode"], "l7_h2_unparseable_headers");
1797 }
1798
1799 #[tokio::test]
1802 async fn proxy_allows_h2c_with_continuation_fragmented_authority() {
1803 let (upstream, upstream_h) = spawn_echo_upstream().await;
1804 let emitter = Arc::new(MemEmitter::default());
1805 let cfg = cfg_with(&["api.example.com"], upstream, 500);
1806 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1807
1808 let stream_bytes = build_h2c_stream_continuation_reassemblable("api.example.com");
1809 let mut s = TcpStream::connect(listen).await.unwrap();
1810 s.write_all(&stream_bytes).await.unwrap();
1811 s.shutdown().await.ok();
1812
1813 let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
1814 .await
1815 .expect("upstream task")
1816 .expect("upstream join");
1817 assert_eq!(upstream_bytes, stream_bytes);
1818
1819 shutdown.store(true, Ordering::SeqCst);
1820 poke_shutdown(listen);
1821 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1822
1823 let evs = emitter.events.lock().unwrap();
1824 assert_eq!(evs.len(), 1);
1825 let data = evs[0].data.as_ref().unwrap();
1826 assert_eq!(data["action"], "allow");
1827 assert_eq!(data["reasonCode"], "l7_h2_authority_allowlist_match");
1829 assert_eq!(data["sniHost"], "api.example.com");
1830 }
1831
1832 #[tokio::test]
1833 async fn proxy_allows_h2c_with_huffman_encoded_authority() {
1834 let (upstream, _upstream_h) = spawn_echo_upstream().await;
1835 let emitter = Arc::new(MemEmitter::default());
1836 let cfg = cfg_with(&["api.example.com"], upstream, 500);
1837 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1838
1839 let stream_bytes = build_h2c_stream_huffman("api.example.com");
1840 let mut s = TcpStream::connect(listen).await.unwrap();
1841 s.write_all(&stream_bytes).await.unwrap();
1842 s.shutdown().await.ok();
1843
1844 tokio::time::sleep(Duration::from_millis(200)).await;
1846
1847 shutdown.store(true, Ordering::SeqCst);
1848 poke_shutdown(listen);
1849 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1850
1851 let evs = emitter.events.lock().unwrap();
1852 assert_eq!(evs.len(), 1);
1853 let data = evs[0].data.as_ref().unwrap();
1854 assert_eq!(data["action"], "allow");
1855 assert_eq!(
1856 data["reasonCode"], "l7_h2_authority_allowlist_match_huffman",
1857 "Huffman provenance must surface as the differentiated reason code"
1858 );
1859 assert_eq!(data["sniHost"], "api.example.com");
1860 }
1861
1862 #[tokio::test]
1863 async fn proxy_allows_h2c_with_dynamic_table_indexed_authority() {
1864 let (upstream, _upstream_h) = spawn_echo_upstream().await;
1865 let emitter = Arc::new(MemEmitter::default());
1866 let cfg = cfg_with(&["api.example.com"], upstream, 500);
1867 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1868
1869 let stream_bytes = build_h2c_stream_dynamic_indexed_name("api.example.com");
1870 let mut s = TcpStream::connect(listen).await.unwrap();
1871 s.write_all(&stream_bytes).await.unwrap();
1872 s.shutdown().await.ok();
1873
1874 tokio::time::sleep(Duration::from_millis(200)).await;
1875
1876 shutdown.store(true, Ordering::SeqCst);
1877 poke_shutdown(listen);
1878 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1879
1880 let evs = emitter.events.lock().unwrap();
1881 assert_eq!(evs.len(), 1, "expected one event, got {evs:#?}");
1882 let data = evs[0].data.as_ref().unwrap();
1883 assert_eq!(data["action"], "allow");
1884 assert_eq!(
1885 data["reasonCode"], "l7_h2_authority_allowlist_match_dynamic_indexed",
1886 "dynamic-table-name provenance must surface as the differentiated reason code"
1887 );
1888 }
1889
1890 #[tokio::test]
1891 async fn proxy_denies_h2c_with_authority_extracted_from_huffman_literal_not_in_allowlist() {
1892 let (upstream, _upstream_h) = spawn_echo_upstream().await;
1893 let emitter = Arc::new(MemEmitter::default());
1894 let cfg = cfg_with(&["api.example.com"], upstream, 500);
1895 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1896
1897 let stream_bytes = build_h2c_stream_huffman("evil.example.com");
1898 let mut s = TcpStream::connect(listen).await.unwrap();
1899 s.write_all(&stream_bytes).await.unwrap();
1900 let mut sink = Vec::new();
1901 let _ = tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
1902
1903 shutdown.store(true, Ordering::SeqCst);
1904 poke_shutdown(listen);
1905 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1906
1907 let evs = emitter.events.lock().unwrap();
1908 assert_eq!(evs.len(), 1);
1909 let data = evs[0].data.as_ref().unwrap();
1910 assert_eq!(data["action"], "deny");
1911 assert_eq!(
1912 data["reasonCode"], "l7_h2_authority_allowlist_miss_huffman",
1913 "Huffman provenance must surface in the deny reason code"
1914 );
1915 assert_eq!(data["sniHost"], "evil.example.com");
1916 }
1917
1918 #[tokio::test]
1919 async fn proxy_emits_event_with_extracted_authority_when_huffman_used() {
1920 let (upstream, _upstream_h) = spawn_echo_upstream().await;
1926 let emitter = Arc::new(MemEmitter::default());
1927 let cfg = cfg_with(&["my-service.example.com"], upstream, 500);
1928 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1929
1930 let stream_bytes = build_h2c_stream_huffman("my-service.example.com");
1931 let mut s = TcpStream::connect(listen).await.unwrap();
1932 s.write_all(&stream_bytes).await.unwrap();
1933 s.shutdown().await.ok();
1934
1935 tokio::time::sleep(Duration::from_millis(200)).await;
1936
1937 shutdown.store(true, Ordering::SeqCst);
1938 poke_shutdown(listen);
1939 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1940
1941 let evs = emitter.events.lock().unwrap();
1942 assert_eq!(evs.len(), 1);
1943 let data = evs[0].data.as_ref().unwrap();
1944 assert_eq!(data["sniHost"], "my-service.example.com");
1947 assert_eq!(data["action"], "allow");
1948 assert_eq!(
1949 data["reasonCode"],
1950 "l7_h2_authority_allowlist_match_huffman",
1951 );
1952 }
1953
1954 #[tokio::test]
1955 async fn proxy_emits_event_for_h2_missing_authority() {
1956 let (upstream, _upstream_h) = spawn_echo_upstream().await;
1957 let emitter = Arc::new(MemEmitter::default());
1958 let cfg = cfg_with(&["api.example.com"], upstream, 500);
1959 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1960
1961 let stream_bytes = build_h2c_stream_no_authority();
1962 let mut s = TcpStream::connect(listen).await.unwrap();
1963 s.write_all(&stream_bytes).await.unwrap();
1964 let mut sink = Vec::new();
1965 let _ = tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
1966
1967 shutdown.store(true, Ordering::SeqCst);
1968 poke_shutdown(listen);
1969 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1970
1971 let evs = emitter.events.lock().unwrap();
1972 assert_eq!(evs.len(), 1);
1973 let data = evs[0].data.as_ref().unwrap();
1974 assert_eq!(data["action"], "deny");
1975 assert_eq!(data["reasonCode"], "l7_h2_authority_missing");
1976 }
1977
1978 #[tokio::test]
1984 async fn proxy_h2c_deny_responds_with_rst_stream_refused() {
1985 let (upstream, _upstream_h) = spawn_echo_upstream().await;
1986 let emitter = Arc::new(MemEmitter::default());
1987 let cfg = cfg_with(&["api.example.com"], upstream, 500);
1988 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1989
1990 let stream_bytes = build_h2c_stream("evil.example.com");
1991 let mut s = TcpStream::connect(listen).await.unwrap();
1992 s.write_all(&stream_bytes).await.unwrap();
1993 s.shutdown().await.ok();
1997 let mut response = Vec::new();
1998 let read_result =
1999 tokio::time::timeout(Duration::from_secs(2), s.read_to_end(&mut response))
2000 .await
2001 .expect("read deadline");
2002 match read_result {
2003 Ok(_) => {}
2004 Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {}
2005 Err(e) => panic!("unexpected read error: {e}"),
2006 }
2007 assert!(
2011 response.len() >= 13,
2012 "expected at least one full RST_STREAM frame, got {} bytes: {:02x?}",
2013 response.len(),
2014 response
2015 );
2016 assert_eq!(&response[0..3], &[0x00, 0x00, 0x04], "RST_STREAM length");
2017 assert_eq!(response[3], 0x03, "RST_STREAM frame type");
2018 assert_eq!(response[4], 0x00, "RST_STREAM flags");
2019 assert_eq!(&response[5..9], &[0x00, 0x00, 0x00, 0x01], "stream id 1");
2020 assert_eq!(
2021 &response[9..13],
2022 &[0x00, 0x00, 0x00, 0x07],
2023 "RST_STREAM error code = REFUSED_STREAM"
2024 );
2025 {
2029 let evs = emitter.events.lock().unwrap();
2030 assert!(!evs.is_empty(), "expected at least one deny event");
2031 let data = evs[0].data.as_ref().unwrap();
2032 assert_eq!(data["action"], "deny");
2033 assert_eq!(data["streamId"], 1);
2034 }
2035
2036 shutdown.store(true, Ordering::SeqCst);
2037 poke_shutdown(listen);
2038 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2039 }
2040
/// Asserts that `response` starts with an HTTP/2 GOAWAY frame on stream 0
/// whose error code is PROTOCOL_ERROR (0x1).
///
/// The last-stream-id field (bytes 9..13) is intentionally not checked.
///
/// # Panics
///
/// Panics with a descriptive message if `response` is shorter than one full
/// GOAWAY frame or if any checked field differs from the expected value.
// Appears unused by the tests visible here; the lint allowance keeps it warning-free.
#[allow(dead_code)]
fn _legacy_goaway_assertions(response: &[u8]) {
    // 9-byte frame header + 4-byte last-stream-id + 4-byte error code.
    const GOAWAY_FRAME_LEN: usize = 17;
    // Guard first so a truncated reply fails with context rather than an
    // opaque slice-index panic.
    assert!(
        response.len() >= GOAWAY_FRAME_LEN,
        "expected a full GOAWAY frame ({GOAWAY_FRAME_LEN} bytes), got {} bytes: {:02x?}",
        response.len(),
        response
    );
    assert_eq!(&response[0..3], &[0x00, 0x00, 0x08], "GOAWAY length");
    assert_eq!(response[3], 0x07, "GOAWAY frame type");
    assert_eq!(response[4], 0x00, "GOAWAY flags");
    assert_eq!(&response[5..9], &[0x00, 0x00, 0x00, 0x00], "stream id 0");
    assert_eq!(
        &response[13..17],
        &[0x00, 0x00, 0x00, 0x01],
        "GOAWAY error code = PROTOCOL_ERROR"
    );
}
2057
2058 fn build_h2c_two_streams(authority_a: &str, authority_b: &str) -> Vec<u8> {
2066 let mut out = h2::HTTP2_PREFACE.to_vec();
2067 out.extend_from_slice(&h2::test_helpers::empty_settings_frame());
2068 let block_a = h2::test_helpers::hpack_literal_indexed_name(1, authority_a);
2069 out.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(&block_a, 1));
2070 let block_b = h2::test_helpers::hpack_literal_indexed_name(1, authority_b);
2071 out.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(&block_b, 3));
2072 out
2073 }
2074
2075 fn build_h2c_denied_stream_then_data(
2079 authority_allowed: &str,
2080 authority_denied: &str,
2081 data_payload: &[u8],
2082 ) -> Vec<u8> {
2083 let mut out = h2::HTTP2_PREFACE.to_vec();
2084 out.extend_from_slice(&h2::test_helpers::empty_settings_frame());
2085 let block_d = h2::test_helpers::hpack_literal_indexed_name(1, authority_denied);
2087 out.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(&block_d, 1));
2088 out.extend_from_slice(&h2::test_helpers::data_frame_on_stream(data_payload, 1));
2090 let block_a = h2::test_helpers::hpack_literal_indexed_name(1, authority_allowed);
2092 out.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(&block_a, 3));
2093 out
2094 }
2095
2096 #[tokio::test]
2097 async fn proxy_allows_two_streams_with_matching_authority_on_same_connection() {
2098 let (upstream, upstream_h) = spawn_echo_upstream().await;
2099 let emitter = Arc::new(MemEmitter::default());
2100 let cfg = cfg_with(&["api.example.com", "api.other.com"], upstream, 500);
2101 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
2102
2103 let stream_bytes = build_h2c_two_streams("api.example.com", "api.other.com");
2104 let mut s = TcpStream::connect(listen).await.unwrap();
2105 s.write_all(&stream_bytes).await.unwrap();
2106 s.shutdown().await.ok();
2107
2108 let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
2109 .await
2110 .expect("upstream task")
2111 .expect("upstream join");
2112 assert_eq!(upstream_bytes, stream_bytes);
2115
2116 shutdown.store(true, Ordering::SeqCst);
2117 poke_shutdown(listen);
2118 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2119
2120 let evs = emitter.events.lock().unwrap();
2121 assert_eq!(
2122 evs.len(),
2123 2,
2124 "expected one allow event per stream, got {evs:#?}"
2125 );
2126 let mut stream_ids: Vec<i64> = evs
2127 .iter()
2128 .map(|e| {
2129 e.data
2130 .as_ref()
2131 .unwrap()
2132 .get("streamId")
2133 .and_then(|v| v.as_i64())
2134 .expect("event must carry streamId")
2135 })
2136 .collect();
2137 stream_ids.sort_unstable();
2138 assert_eq!(stream_ids, vec![1, 3]);
2139 for ev in evs.iter() {
2140 let data = ev.data.as_ref().unwrap();
2141 assert_eq!(data["action"], "allow");
2142 }
2143 }
2144
2145 #[tokio::test]
2146 async fn proxy_denies_one_stream_keeps_connection_open_for_other_streams() {
2147 let (upstream, _upstream_h) = spawn_echo_upstream().await;
2148 let emitter = Arc::new(MemEmitter::default());
2149 let cfg = cfg_with(&["api.example.com"], upstream, 500);
2150 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
2151
2152 let stream_bytes = build_h2c_two_streams("evil.example.com", "api.example.com");
2154 let mut s = TcpStream::connect(listen).await.unwrap();
2155 s.write_all(&stream_bytes).await.unwrap();
2156 let mut response = vec![0u8; 13];
2159 let read_n = tokio::time::timeout(Duration::from_secs(2), s.read_exact(&mut response))
2160 .await
2161 .expect("read deadline");
2162 assert!(read_n.is_ok(), "expected to read 13 bytes (RST_STREAM)");
2163 assert_eq!(&response[0..3], &[0x00, 0x00, 0x04], "RST_STREAM length");
2164 assert_eq!(response[3], 0x03, "RST_STREAM type");
2165 assert_eq!(&response[5..9], &[0x00, 0x00, 0x00, 0x01], "stream id 1");
2166 assert_eq!(
2167 &response[9..13],
2168 &[0x00, 0x00, 0x00, 0x07],
2169 "REFUSED_STREAM"
2170 );
2171
2172 s.shutdown().await.ok();
2173
2174 shutdown.store(true, Ordering::SeqCst);
2175 poke_shutdown(listen);
2176 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2177
2178 let evs = emitter.events.lock().unwrap();
2179 assert_eq!(evs.len(), 2, "expected one event per stream");
2180 let mut by_stream: std::collections::HashMap<i64, &CloudEventV1> =
2182 std::collections::HashMap::new();
2183 for ev in evs.iter() {
2184 let sid = ev
2185 .data
2186 .as_ref()
2187 .unwrap()
2188 .get("streamId")
2189 .and_then(|v| v.as_i64())
2190 .unwrap();
2191 by_stream.insert(sid, ev);
2192 }
2193 assert_eq!(by_stream[&1].data.as_ref().unwrap()["action"], "deny");
2194 assert_eq!(by_stream[&3].data.as_ref().unwrap()["action"], "allow");
2195 }
2196
2197 #[tokio::test]
2198 async fn proxy_emits_one_event_per_stream_decision() {
2199 let (upstream, _upstream_h) = spawn_echo_upstream().await;
2200 let emitter = Arc::new(MemEmitter::default());
2201 let cfg = cfg_with(&["api.example.com"], upstream, 500);
2202 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
2203
2204 let mut bytes = h2::HTTP2_PREFACE.to_vec();
2206 bytes.extend_from_slice(&h2::test_helpers::empty_settings_frame());
2207 bytes.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(
2208 &h2::test_helpers::hpack_literal_indexed_name(1, "api.example.com"),
2209 1,
2210 ));
2211 bytes.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(
2212 &h2::test_helpers::hpack_literal_indexed_name(1, "evil1.example.com"),
2213 3,
2214 ));
2215 bytes.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(
2216 &h2::test_helpers::hpack_literal_indexed_name(1, "evil2.example.com"),
2217 5,
2218 ));
2219
2220 let mut s = TcpStream::connect(listen).await.unwrap();
2221 s.write_all(&bytes).await.unwrap();
2222 let mut sink = vec![0u8; 64];
2224 let _ = tokio::time::timeout(Duration::from_millis(400), s.read(&mut sink)).await;
2225 s.shutdown().await.ok();
2226
2227 shutdown.store(true, Ordering::SeqCst);
2228 poke_shutdown(listen);
2229 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2230
2231 let evs = emitter.events.lock().unwrap();
2232 assert_eq!(
2233 evs.len(),
2234 3,
2235 "expected exactly one event per stream, got {evs:#?}"
2236 );
2237 let mut per_action: std::collections::HashMap<String, usize> =
2238 std::collections::HashMap::new();
2239 for ev in evs.iter() {
2240 let action = ev.data.as_ref().unwrap()["action"]
2241 .as_str()
2242 .unwrap()
2243 .to_string();
2244 *per_action.entry(action).or_default() += 1;
2245 assert!(
2247 ev.data
2248 .as_ref()
2249 .unwrap()
2250 .get("streamId")
2251 .and_then(|v| v.as_i64())
2252 .is_some(),
2253 "event missing streamId: {ev:?}"
2254 );
2255 }
2256 assert_eq!(per_action.get("allow").copied(), Some(1));
2257 assert_eq!(per_action.get("deny").copied(), Some(2));
2258 }
2259
2260 #[tokio::test]
2261 async fn proxy_drops_data_frames_on_denied_stream() {
2262 let (upstream, upstream_h) = spawn_echo_upstream().await;
2263 let emitter = Arc::new(MemEmitter::default());
2264 let cfg = cfg_with(&["api.example.com"], upstream, 500);
2265 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
2266
2267 let secret = b"do-not-exfiltrate";
2270 let stream_bytes =
2271 build_h2c_denied_stream_then_data("api.example.com", "evil.example.com", secret);
2272 let mut s = TcpStream::connect(listen).await.unwrap();
2273 s.write_all(&stream_bytes).await.unwrap();
2274 s.shutdown().await.ok();
2275
2276 let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
2277 .await
2278 .expect("upstream task")
2279 .expect("upstream join");
2280 let upstream_str = String::from_utf8_lossy(&upstream_bytes);
2283 assert!(
2284 !upstream_str.contains("do-not-exfiltrate"),
2285 "denied-stream DATA leaked to upstream: {upstream_str:?}"
2286 );
2287 assert!(!upstream_bytes.is_empty(), "upstream got no bytes");
2291
2292 shutdown.store(true, Ordering::SeqCst);
2293 poke_shutdown(listen);
2294 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2295 }
2296
2297 #[tokio::test]
2298 async fn proxy_forwards_settings_and_window_update_verbatim() {
2299 let (upstream, upstream_h) = spawn_echo_upstream().await;
2300 let emitter = Arc::new(MemEmitter::default());
2301 let cfg = cfg_with(&["api.example.com"], upstream, 500);
2302 let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
2303
2304 let mut bytes = h2::HTTP2_PREFACE.to_vec();
2308 bytes.extend_from_slice(&h2::test_helpers::empty_settings_frame());
2309 bytes.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(
2310 &h2::test_helpers::hpack_literal_indexed_name(1, "api.example.com"),
2311 1,
2312 ));
2313 let mut wu = h2::test_helpers::frame_header(4, 0x08, 0x00, 0);
2315 wu.extend_from_slice(&1024u32.to_be_bytes());
2316 bytes.extend_from_slice(&wu);
2317
2318 let mut s = TcpStream::connect(listen).await.unwrap();
2319 s.write_all(&bytes).await.unwrap();
2320 s.shutdown().await.ok();
2321
2322 let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
2323 .await
2324 .expect("upstream task")
2325 .expect("upstream join");
2326 assert_eq!(upstream_bytes, bytes);
2330
2331 shutdown.store(true, Ordering::SeqCst);
2332 poke_shutdown(listen);
2333 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2334 }
2335}