Skip to main content

cellos_supervisor/sni_proxy/
mod.rs

1//! SEC-22 Phase 2 SNI-aware egress proxy.
2//!
3//! Inspects the first bytes of every TCP connection a workload opens and
4//! enforces `dnsAuthority.hostnameAllowlist` at L7 by parsing either:
5//!
6//! - the TLS ClientHello's `server_name` extension ([`sni`]), or
7//! - the HTTP/1.x `Host` header on cleartext requests ([`http`]),
8//!
9//! and matching the extracted hostname against the shared
10//! [`cellos_core::hostname_allowlist::matches_allowlist`] helper. Allowed
11//! flows are forwarded to a declared upstream via
12//! `tokio::io::copy_bidirectional` (the proxy NEVER terminates TLS — the
13//! ClientHello bytes that produced the SNI decision are written through to
14//! the upstream as the first bytes of the forwarded stream). Denied flows
15//! are dropped (TLS) or answered with `HTTP/1.1 403 Forbidden` (cleartext
16//! HTTP). Each decision emits exactly one
17//! `dev.cellos.events.cell.observability.v1.l7_egress_decision` CloudEvent
18//! carrying one of the eight Phase 2 reason codes:
19//!
20//! - `l7_sni_allowlist_match`           — TLS allow
21//! - `l7_sni_allowlist_miss`            — TLS deny (SNI not in allowlist)
22//! - `l7_sni_missing`                   — TLS deny (no SNI in ClientHello)
23//! - `l7_http_host_allowlist_match`     — HTTP allow
24//! - `l7_http_host_allowlist_miss`      — HTTP deny (Host not in allowlist)
25//! - `l7_http_host_missing`             — HTTP deny (no Host header)
26//! - `l7_unknown_protocol`              — first bytes neither TLS nor HTTP/1.x
27//! - `l7_peek_timeout`                  — workload sent no bytes within the
28//!   peek window (defensive shutdown)
29//!
30//! ### Phase 3a (h2c) — additional reason codes
31//!
32//! - `l7_h2_authority_allowlist_match`  — h2c HEADERS-frame `:authority`
33//!   matched the allowlist; bytes forwarded.
34//! - `l7_h2_authority_allowlist_miss`   — h2c HEADERS-frame `:authority`
35//!   did NOT match the allowlist; deny + GOAWAY (PROTOCOL_ERROR).
36//! - `l7_h2_authority_missing`          — well-formed h2c frames but no
37//!   `:authority` pseudo-header in the first HEADERS frame.
38//! - `l7_h2_unparseable_headers`        — HPACK decode failure,
39//!   CONTINUATION-fragmented HEADERS, oversized frame, or any other h2
40//!   parse error Phase 3a does not handle. Deny + GOAWAY.
41//!
42//! The reason-code list extends the existing
43//! `cell-observability-l7-egress-decision-v1.schema.json` `reasonCode`
44//! description without breaking shape — only new string values are added.
45//!
46//! ## Honest scope
47//!
48//! - **TLS not terminated.** SNI ↔ Host alignment cannot be checked in
49//!   Phase 2; that requires MITM termination and a CA the workload trusts.
50//!   Fronting via mismatched SNI vs. Host on the same TLS session remains a
51//!   Phase 3 residual risk per [`docs/sec22-residual-risk.md`].
52//! - **HTTP/2 cleartext (h2c) inspected (Phase 3a).** When a workload
53//!   sends the canonical 24-byte `PRI * HTTP/2.0` preface, the proxy
54//!   peels it + the optional SETTINGS frame and parses the first HEADERS
55//!   frame's `:authority` pseudo-header through the static-table-only
56//!   HPACK decoder in [`h2`]. The same `dnsAuthority.hostnameAllowlist`
57//!   that gates SNI + HTTP/1.x Host gates h2c. Allow path forwards the
58//!   buffered preface + HEADERS verbatim; deny path emits an HTTP/2
59//!   GOAWAY (PROTOCOL_ERROR) and closes.
60//! - **HTTP/2 over TLS still residual.** Inspecting HEADERS inside an
61//!   encrypted stream requires TLS termination — Phase 4. HTTP/3 / QUIC
62//!   are also out of scope.
63//! - **DoH residual.** When an operator allowlists a hostname that itself
64//!   hosts DoH, the workload can tunnel DNS inside an allowed flow. This is
65//!   an operator-policy residual; the proxy enforces what the allowlist
66//!   declares, not the semantic intent.
67//!
68//! ## Module layout
69//!
70//! - [`sni`] — pure-byte ClientHello SNI extractor.
71//! - [`http`] — HTTP/1.x request-line + Host extractor.
72//! - [`run_one_shot`] (in this file) — async per-connection accept/probe/
73//!   forward loop, platform-neutral.
74//! - [`spawn`] — Linux-only `setns(2)` helper that places `run_one_shot`'s
75//!   listener inside the cell's netns; mirrors [`crate::dns_proxy::spawn`].
76//!
77//! Reuses the shared
78//! [`cellos_core::hostname_allowlist::matches_allowlist`] helper that the
79//! W8 commit extracted from the W3 SEAM-1 dataplane proxy.
80
81pub mod h2;
82pub mod http;
83pub mod sni;
84pub mod spawn;
85
86use std::net::SocketAddr;
87use std::sync::atomic::{AtomicBool, Ordering};
88use std::sync::Arc;
89use std::time::Duration;
90
91use cellos_core::{CdnProvider, CloudEventV1, ExecutionCellSpec};
92use serde_json::{json, Map, Value};
93use tokio::io::AsyncWriteExt;
94use tokio::net::{TcpListener, TcpStream};
95
96/// Maximum bytes the proxy peeks before deciding probe/allow/deny.
97///
98/// scope: 16 KiB ceiling. The TLS / HTTP/1.x paths always decide well
99/// under 4 KiB; the ceiling is sized for h2c HEADERS+CONTINUATION
100/// reassembly. RFC 7540 §6.5.2 default `SETTINGS_MAX_FRAME_SIZE` is
101/// 16 KiB so the worst-case single HEADERS-with-no-CONTINUATION frame
102/// plus the leading SETTINGS frame fits in this buffer; longer header
103/// blocks fragment over CONTINUATION up to [`h2::MAX_HEADER_BLOCK_SIZE`]
104/// (64 KiB) — at which point the reassembler returns `Ok(None)` (need
105/// more bytes) and the peek_timeout path triggers, so the connection is
106/// denied with a `peek_timeout` rather than over-buffering memory per
107/// connection.
108pub const PEEK_BUF_LEN: usize = 16 * 1024;
109
110/// Subset of [`ExecutionCellSpec`] that the proxy needs at run-time. Owned
111/// so the supervisor can build it once and pass it through the
112/// async-to-sync-thread boundary without lifetime gymnastics (mirrors
113/// [`crate::dns_proxy::DnsProxyConfig`]).
114#[derive(Debug, Clone)]
115pub struct SniProxyConfig {
116    /// Address the proxy listener is bound to inside the cell's netns.
117    /// Used for diagnostics; the caller passes the actual pre-bound listener.
118    pub bind_addr: SocketAddr,
119    /// Address the proxy forwards allowed connections to. In a real
120    /// deployment this is a transparent next-hop (egress NAT, sidecar) that
121    /// re-emits the bytes onto the wire. In the unit tests it is a
122    /// localhost echo / handshake stub.
123    pub upstream_addr: SocketAddr,
124    /// Hostname allowlist (literal or single-leading-`*.` wildcard). Same
125    /// shape as [`crate::dns_proxy::DnsProxyConfig::hostname_allowlist`] —
126    /// shared matcher in `cellos_core::hostname_allowlist`.
127    pub hostname_allowlist: Vec<String>,
128    /// CDN providers the workload's spec declares (`spec.authority.cdnAuthority.providers`).
129    /// scope: retained for diagnostics and future SNI ↔ Host fronting
130    /// detection; the current decision logic does not branch on it.
131    pub cdn_providers: Vec<CdnProvider>,
132    /// Cell identifier (mirrors `lifecycle.started.cellId`).
133    pub cell_id: String,
134    /// Run identifier (mirrors `lifecycle.started.runId`).
135    pub run_id: String,
136    /// Optional `policyDigest` to bind into emitted events.
137    pub policy_digest: Option<String>,
138    /// Optional `keysetId` to bind into emitted events.
139    pub keyset_id: Option<String>,
140    /// Optional `issuerKid` to bind into emitted events.
141    pub issuer_kid: Option<String>,
142    /// Optional `correlationId` to bind into emitted events.
143    pub correlation_id: Option<String>,
144    /// Resolver / proxy identifier stamped into events for audit trail.
145    pub upstream_resolver_id: String,
146    /// Maximum time to wait for first bytes from the workload. On timeout
147    /// the connection is dropped and one `l7_peek_timeout` event is emitted.
148    pub peek_timeout: Duration,
149}
150
151/// Sink the proxy uses to publish per-decision CloudEvents. Trait-erased
152/// so unit tests can plug in an in-memory collector. Mirrors
153/// [`crate::dns_proxy::DnsQueryEmitter`] — the same fire-and-forget shape
154/// keeps the per-connection task synchronous from the emit-call's POV.
155pub trait L7DecisionEmitter: Send + Sync + 'static {
156    /// Publish a single CloudEvent. Implementations should not block.
157    fn emit(&self, event: CloudEventV1);
158}
159
160/// Aggregate counters returned by [`run_one_shot`] when the loop terminates.
161/// Mirrors [`crate::dns_proxy::DnsProxyStats`]; logged at teardown for
162/// sanity-checking ratios.
163#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
164pub struct ProxyStats {
165    pub connections_total: u64,
166    pub connections_allowed: u64,
167    pub connections_denied: u64,
168    pub peek_timeouts: u64,
169    pub upstream_failures: u64,
170}
171
172/// Reason codes for `l7_egress_decision` events emitted by the SNI proxy.
173/// All twelve values extend the existing schema's open-ended `reasonCode`
174/// description — no schema break.
175mod reason_code {
176    pub const SNI_ALLOWLIST_MATCH: &str = "l7_sni_allowlist_match";
177    pub const SNI_ALLOWLIST_MISS: &str = "l7_sni_allowlist_miss";
178    pub const SNI_MISSING: &str = "l7_sni_missing";
179    pub const HTTP_HOST_ALLOWLIST_MATCH: &str = "l7_http_host_allowlist_match";
180    pub const HTTP_HOST_ALLOWLIST_MISS: &str = "l7_http_host_allowlist_miss";
181    pub const HTTP_HOST_MISSING: &str = "l7_http_host_missing";
182    pub const UNKNOWN_PROTOCOL: &str = "l7_unknown_protocol";
183    pub const PEEK_TIMEOUT: &str = "l7_peek_timeout";
184    // ── Phase 3a (h2c HEADERS-frame :authority) ──────────────────────
185    pub const H2_AUTHORITY_ALLOWLIST_MATCH: &str = "l7_h2_authority_allowlist_match";
186    pub const H2_AUTHORITY_ALLOWLIST_MISS: &str = "l7_h2_authority_allowlist_miss";
187    pub const H2_AUTHORITY_MISSING: &str = "l7_h2_authority_missing";
188    pub const H2_UNPARSEABLE_HEADERS: &str = "l7_h2_unparseable_headers";
189    // ── Phase 3g (full HPACK: dynamic-table state + Huffman) ─────────
190    // Differentiate audit trail by HOW the authority was decoded so
191    // operators can spot adversaries who deliberately use rare encoder
192    // paths (Huffman + dynamic table) to obfuscate.
193    pub const H2_AUTHORITY_ALLOWLIST_MATCH_HUFFMAN: &str =
194        "l7_h2_authority_allowlist_match_huffman";
195    pub const H2_AUTHORITY_ALLOWLIST_MISS_HUFFMAN: &str = "l7_h2_authority_allowlist_miss_huffman";
196    pub const H2_AUTHORITY_ALLOWLIST_MATCH_DYNAMIC_INDEXED: &str =
197        "l7_h2_authority_allowlist_match_dynamic_indexed";
198    pub const H2_AUTHORITY_ALLOWLIST_MISS_DYNAMIC_INDEXED: &str =
199        "l7_h2_authority_allowlist_miss_dynamic_indexed";
200}
201
202/// Map an HPACK [`h2::AuthorityProvenance`] to the right (allow, miss)
203/// reason-code pair so the audit trail differentiates static-table refs
204/// (P3c attacker simplicity), dynamic-table refs (Phase 3g — adversary
205/// established prior context), and Huffman literals (Phase 3g — adversary
206/// used encoder compression).
207fn h2_reason_codes_for(provenance: h2::AuthorityProvenance) -> (&'static str, &'static str) {
208    match provenance {
209        h2::AuthorityProvenance::StaticIndexed | h2::AuthorityProvenance::StaticLiteral => (
210            reason_code::H2_AUTHORITY_ALLOWLIST_MATCH,
211            reason_code::H2_AUTHORITY_ALLOWLIST_MISS,
212        ),
213        h2::AuthorityProvenance::DynamicIndexed => (
214            reason_code::H2_AUTHORITY_ALLOWLIST_MATCH_DYNAMIC_INDEXED,
215            reason_code::H2_AUTHORITY_ALLOWLIST_MISS_DYNAMIC_INDEXED,
216        ),
217        h2::AuthorityProvenance::Huffman => (
218            reason_code::H2_AUTHORITY_ALLOWLIST_MATCH_HUFFMAN,
219            reason_code::H2_AUTHORITY_ALLOWLIST_MISS_HUFFMAN,
220        ),
221    }
222}
223
224/// What the first peeked bytes look like.
225#[derive(Debug, Clone, Copy, PartialEq, Eq)]
226enum ProtocolGuess {
227    Tls,
228    H2c,
229    Http1,
230    Unknown,
231}
232
233fn guess_protocol(buf: &[u8]) -> ProtocolGuess {
234    if buf.is_empty() {
235        return ProtocolGuess::Unknown;
236    }
237    if buf[0] == 22 {
238        return ProtocolGuess::Tls;
239    }
240    // The h2c connection preface starts with `PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n`.
241    // Probe BEFORE the HTTP/1.x method dispatch because `PRI` is a
242    // "request method" only on h2 and any HTTP/1.x parser would mistake
243    // the bytes that follow for a malformed request line.
244    if h2::is_h2c_preface(buf) {
245        return ProtocolGuess::H2c;
246    }
247    // HTTP/1.x request lines start with an ASCII method token. Cover the
248    // common verbs the brief enumerates; anything else is unknown.
249    const METHODS: &[&[u8]] = &[
250        b"GET ",
251        b"POST ",
252        b"HEAD ",
253        b"PUT ",
254        b"DELETE ",
255        b"OPTIONS ",
256        b"PATCH ",
257        b"CONNECT ",
258    ];
259    for m in METHODS {
260        if buf.len() >= m.len() && &buf[..m.len()] == *m {
261            return ProtocolGuess::Http1;
262        }
263    }
264    ProtocolGuess::Unknown
265}
266
267/// Run the SNI proxy accept loop until `shutdown` is set.
268///
269/// Each accepted TCP connection is handed to a dedicated `tokio::spawn`
270/// task that:
271///
272/// 1. Reads up to [`PEEK_BUF_LEN`] bytes within `cfg.peek_timeout`.
273/// 2. Probes the protocol byte (TLS=22 / HTTP method / unknown).
274/// 3. Calls the appropriate parser ([`sni::extract_sni`] or
275///    [`http::extract_http_host`]) on the buffered preamble.
276/// 4. Matches the extracted host against `cfg.hostname_allowlist`.
277/// 5. On allow: connects to `cfg.upstream_addr`, writes the buffered
278///    preamble, then `tokio::io::copy_bidirectional` until either side
279///    closes.
280/// 6. On deny: closes the client (TLS) or writes
281///    `HTTP/1.1 403 Forbidden\r\nContent-Length: 0\r\nConnection: close\r\n\r\n`
282///    (HTTP) and drops the stream.
283/// 7. Emits exactly one `l7_egress_decision` CloudEvent regardless of path.
284///
285/// Each spawned task increments [`ProxyStats`] via shared atomics; the
286/// returned snapshot reflects cumulative counts at loop exit.
287///
288/// Termination: the accept loop awakens at most once per `shutdown` flip
289/// because `accept` blocks. The spawn helper's `signal_sni_proxy_shutdown`
290/// connects-and-drops a TCP socket to wake `accept`; the loop then observes
291/// `shutdown` and returns.
292pub async fn run_one_shot(
293    cfg: &SniProxyConfig,
294    listener: TcpListener,
295    emitter: Arc<dyn L7DecisionEmitter>,
296    shutdown: Arc<AtomicBool>,
297) -> std::io::Result<ProxyStats> {
298    let total = Arc::new(std::sync::atomic::AtomicU64::new(0));
299    let allowed = Arc::new(std::sync::atomic::AtomicU64::new(0));
300    let denied = Arc::new(std::sync::atomic::AtomicU64::new(0));
301    let timeouts = Arc::new(std::sync::atomic::AtomicU64::new(0));
302    let upstream_failures = Arc::new(std::sync::atomic::AtomicU64::new(0));
303
304    // Build the per-event spec stub once; the supervisor passes us
305    // pre-derived fields, but `observability_l7_egress_decision_data_v1`
306    // takes a `&ExecutionCellSpec` for `correlation` and `id`. We synthesise
307    // a minimal spec carrying just the fields the builder reads.
308    let event_spec = build_event_spec(cfg);
309
310    while !shutdown.load(Ordering::SeqCst) {
311        let (stream, peer) = match listener.accept().await {
312            Ok(t) => t,
313            Err(e) => {
314                if shutdown.load(Ordering::SeqCst) {
315                    break;
316                }
317                tracing::warn!(
318                    target: "cellos.supervisor.sni_proxy",
319                    error = %e,
320                    "accept() failed"
321                );
322                continue;
323            }
324        };
325        if shutdown.load(Ordering::SeqCst) {
326            // Wake-connection from `signal_sni_proxy_shutdown` — drop and exit.
327            drop(stream);
328            break;
329        }
330        total.fetch_add(1, Ordering::SeqCst);
331
332        let cfg = cfg.clone();
333        let emitter = emitter.clone();
334        let event_spec = event_spec.clone();
335        let allowed = allowed.clone();
336        let denied = denied.clone();
337        let timeouts = timeouts.clone();
338        let upstream_failures = upstream_failures.clone();
339
340        tokio::spawn(async move {
341            handle_connection(
342                stream,
343                peer,
344                cfg,
345                emitter,
346                event_spec,
347                allowed,
348                denied,
349                timeouts,
350                upstream_failures,
351            )
352            .await;
353        });
354    }
355
356    Ok(ProxyStats {
357        connections_total: total.load(Ordering::SeqCst),
358        connections_allowed: allowed.load(Ordering::SeqCst),
359        connections_denied: denied.load(Ordering::SeqCst),
360        peek_timeouts: timeouts.load(Ordering::SeqCst),
361        upstream_failures: upstream_failures.load(Ordering::SeqCst),
362    })
363}
364
365/// Per-connection worker. Encapsulated so [`run_one_shot`] stays focused on
366/// the accept loop. Updates the shared atomics + emits exactly one event
367/// per call.
368#[allow(clippy::too_many_arguments)]
369async fn handle_connection(
370    mut stream: TcpStream,
371    _peer: SocketAddr,
372    cfg: SniProxyConfig,
373    emitter: Arc<dyn L7DecisionEmitter>,
374    event_spec: ExecutionCellSpec,
375    allowed: Arc<std::sync::atomic::AtomicU64>,
376    denied: Arc<std::sync::atomic::AtomicU64>,
377    timeouts: Arc<std::sync::atomic::AtomicU64>,
378    upstream_failures: Arc<std::sync::atomic::AtomicU64>,
379) {
380    // ── Step 1: peek with a bounded timeout. ──────────────────────────────
381    let mut buf = vec![0u8; PEEK_BUF_LEN];
382    let peek_result = tokio::time::timeout(cfg.peek_timeout, stream.peek(&mut buf)).await;
383
384    let n = match peek_result {
385        Err(_elapsed) => {
386            // Peek timed out before any bytes arrived. Drop the client and
387            // emit a peek_timeout event — this is the fallback path when
388            // the workload opens a TCP connection but never sends bytes.
389            timeouts.fetch_add(1, Ordering::SeqCst);
390            denied.fetch_add(1, Ordering::SeqCst);
391            emit_decision(
392                &emitter,
393                &cfg,
394                &event_spec,
395                "deny",
396                "",
397                reason_code::PEEK_TIMEOUT,
398                None,
399                None,
400            );
401            return;
402        }
403        Ok(Err(e)) => {
404            tracing::debug!(
405                target: "cellos.supervisor.sni_proxy",
406                error = %e,
407                "peek() error before any bytes"
408            );
409            denied.fetch_add(1, Ordering::SeqCst);
410            emit_decision(
411                &emitter,
412                &cfg,
413                &event_spec,
414                "deny",
415                "",
416                reason_code::UNKNOWN_PROTOCOL,
417                None,
418                None,
419            );
420            return;
421        }
422        Ok(Ok(0)) => {
423            // EOF before any data — treat as unknown.
424            denied.fetch_add(1, Ordering::SeqCst);
425            emit_decision(
426                &emitter,
427                &cfg,
428                &event_spec,
429                "deny",
430                "",
431                reason_code::UNKNOWN_PROTOCOL,
432                None,
433                None,
434            );
435            return;
436        }
437        Ok(Ok(n)) => n,
438    };
439    let preamble = &buf[..n];
440
441    // ── Step 2: probe protocol + extract host. ────────────────────────────
442    let guess = guess_protocol(preamble);
443    let (host_opt, allow_reason, miss_reason, missing_reason, deny_response): (
444        Option<String>,
445        &'static str,
446        &'static str,
447        &'static str,
448        DenyResponse,
449    ) = match guess {
450        ProtocolGuess::Tls => match sni::extract_sni(preamble) {
451            Ok(opt) => (
452                opt,
453                reason_code::SNI_ALLOWLIST_MATCH,
454                reason_code::SNI_ALLOWLIST_MISS,
455                reason_code::SNI_MISSING,
456                DenyResponse::Drop,
457            ),
458            Err(_e) => {
459                denied.fetch_add(1, Ordering::SeqCst);
460                emit_decision(
461                    &emitter,
462                    &cfg,
463                    &event_spec,
464                    "deny",
465                    "",
466                    reason_code::UNKNOWN_PROTOCOL,
467                    None,
468                    None,
469                );
470                return;
471            }
472        },
473        ProtocolGuess::H2c => {
474            // scope: per-stream allow/deny. Hand off to a dedicated
475            // frame-by-frame handler that:
476            //   - connects to upstream
477            //   - parses every frame on both directions (frame layer is
478            //     pure-byte; no h2 crate)
479            //   - decides allow/deny per HEADERS+CONTINUATION block via
480            //     H2ConnectionDecoder, and emits one l7_egress_decision
481            //     per stream decision (each event carries streamId)
482            //   - on deny: writes RST_STREAM(REFUSED_STREAM=7) to the
483            //     workload, drops further frames on that stream, but
484            //     keeps the connection alive for OTHER streams
485            //   - allow path forwards verbatim to upstream
486            //
487            // The single-connection allow/deny lock from Phase 3g is
488            // gone; the connection itself is no longer either "allowed"
489            // or "denied" — only individual streams are.
490            handle_h2c_connection(
491                stream,
492                preamble.to_vec(),
493                cfg.clone(),
494                emitter.clone(),
495                event_spec.clone(),
496                allowed.clone(),
497                denied.clone(),
498                upstream_failures.clone(),
499            )
500            .await;
501            return;
502        }
503        ProtocolGuess::Http1 => match http::extract_http_host(preamble) {
504            Ok(opt) => (
505                opt,
506                reason_code::HTTP_HOST_ALLOWLIST_MATCH,
507                reason_code::HTTP_HOST_ALLOWLIST_MISS,
508                reason_code::HTTP_HOST_MISSING,
509                DenyResponse::Http403,
510            ),
511            Err(_e) => {
512                denied.fetch_add(1, Ordering::SeqCst);
513                emit_decision(
514                    &emitter,
515                    &cfg,
516                    &event_spec,
517                    "deny",
518                    "",
519                    reason_code::UNKNOWN_PROTOCOL,
520                    None,
521                    None,
522                );
523                return;
524            }
525        },
526        ProtocolGuess::Unknown => {
527            denied.fetch_add(1, Ordering::SeqCst);
528            emit_decision(
529                &emitter,
530                &cfg,
531                &event_spec,
532                "deny",
533                "",
534                reason_code::UNKNOWN_PROTOCOL,
535                None,
536                None,
537            );
538            return;
539        }
540    };
541
542    let host = match host_opt {
543        Some(h) if !h.is_empty() => h,
544        _ => {
545            // TLS without SNI / HTTP without Host. Deny per protocol.
546            denied.fetch_add(1, Ordering::SeqCst);
547            send_deny_response(&mut stream, deny_response).await;
548            emit_decision(
549                &emitter,
550                &cfg,
551                &event_spec,
552                "deny",
553                "",
554                missing_reason,
555                None,
556                None,
557            );
558            return;
559        }
560    };
561
562    // ── Step 3: match against allowlist. ──────────────────────────────────
563    if !cellos_core::hostname_allowlist::matches_allowlist(&host, &cfg.hostname_allowlist) {
564        denied.fetch_add(1, Ordering::SeqCst);
565        send_deny_response(&mut stream, deny_response).await;
566        emit_decision(
567            &emitter,
568            &cfg,
569            &event_spec,
570            "deny",
571            host.as_str(),
572            miss_reason,
573            None,
574            None,
575        );
576        return;
577    }
578
579    // ── Step 4: allow → forward to upstream. ──────────────────────────────
580    let upstream = match TcpStream::connect(cfg.upstream_addr).await {
581        Ok(s) => s,
582        Err(e) => {
583            tracing::warn!(
584                target: "cellos.supervisor.sni_proxy",
585                error = %e,
586                upstream = %cfg.upstream_addr,
587                "upstream connect failed"
588            );
589            upstream_failures.fetch_add(1, Ordering::SeqCst);
590            denied.fetch_add(1, Ordering::SeqCst);
591            // Allowlist matched but upstream unreachable: send deny response so
592            // the workload sees deterministic failure rather than hanging.
593            send_deny_response(&mut stream, deny_response).await;
594            emit_decision(
595                &emitter,
596                &cfg,
597                &event_spec,
598                "deny",
599                host.as_str(),
600                reason_code::UNKNOWN_PROTOCOL,
601                None,
602                None,
603            );
604            return;
605        }
606    };
607    allowed.fetch_add(1, Ordering::SeqCst);
608    emit_decision(
609        &emitter,
610        &cfg,
611        &event_spec,
612        "allow",
613        host.as_str(),
614        allow_reason,
615        None,
616        None,
617    );
618
619    // The peeked bytes are still in the kernel socket buffer (peek doesn't
620    // consume), so `copy_bidirectional` will replay them to the upstream
621    // automatically. This preserves the ClientHello / Host-bearing request
622    // line so the upstream sees the original wire bytes verbatim.
623    let mut client = stream;
624    let mut up = upstream;
625    if let Err(e) = tokio::io::copy_bidirectional(&mut client, &mut up).await {
626        tracing::debug!(
627            target: "cellos.supervisor.sni_proxy",
628            error = %e,
629            host = %host,
630            "copy_bidirectional ended with error"
631        );
632    }
633}
634
635/// scope: dedicated h2c connection handler with per-stream
636/// allow/deny semantics.
637///
638/// Supersedes a prior `tokio::io::copy_bidirectional` short-circuit
639/// (which locked the entire connection to the FIRST `:authority`
640/// observed) with a frame-by-frame parser:
641///
642/// 1. Connect to upstream; forward the buffered preface bytes.
643/// 2. In one task, copy upstream → client verbatim (we don't inspect
644///    server-emitted HEADERS — server-PUSH inspection is Phase 4).
645/// 3. In the main task, read client; parse frames; for HEADERS or
646///    CONTINUATION feed to `H2ConnectionDecoder.feed_frame`. For each
647///    completed block, match `:authority` against the allowlist and
648///    either forward verbatim (allow) or RST_STREAM(REFUSED_STREAM=7)
649///    plus drop subsequent stream-bound frames (deny). Emit one
650///    `l7_egress_decision` event per stream decision; each event
651///    carries `streamId`.
652/// 4. Control frames (SETTINGS / PING / WINDOW_UPDATE / GOAWAY) are
653///    forwarded verbatim regardless of any per-stream deny — they
654///    govern the connection itself.
655///
656/// Honest scope: server-PUSH'd stream `:authority` (PUSH_PROMISE
657/// HEADERS) and TRAILERS-frame inspection both remain Phase 4 — this
658/// handler reads only client→server HEADERS/CONTINUATION. TLS-encrypted
659/// h2 (h2 over TLS) is still ADR-0004 territory.
660///
661/// CONTINUATION-fragmented HEADERS across MULTIPLE streams on the same
662/// connection: per RFC 7540 §6.10, only one stream may have an open
663/// HEADERS at a time on a connection (the §6.10 PROTOCOL_ERROR contract
664/// is enforced inside `H2StreamReassembler`). For the single-stream
665/// CONTINUATION case the handler buffers the raw HEADERS+CONTINUATION
666/// frame bytes against the stream id and forwards them on allow / drops
667/// them on deny.
668#[allow(clippy::too_many_arguments)]
669/// Lazy upstream connection for the h2c handler.
670///
671/// **Zero-byte invariant on deny.** The handler MUST NOT open a TCP
672/// connection to the upstream until the first `:authority`-bearing
673/// HEADERS frame has been parsed AND the authority decision is `allow`.
674/// Until then, every byte the parse loop wants to forward (the 24-byte
675/// HTTP/2 preface, the workload's SETTINGS, WINDOW_UPDATE, and any
676/// other control frames that arrive on stream 0) is buffered in
677/// memory. If the first authority decision is `deny`, the buffer is
678/// dropped and no connection is ever made — the
679/// `break_attempt_sni_mismatch_h2c::h2c_authority_mismatch_is_denied_with_zero_upstream_bytes`
680/// regression guard locks this in.
681enum UpstreamSink {
682    /// No upstream socket yet. `buffer` accumulates bytes the parse
683    /// loop wanted to forward; on the first allow they are flushed in
684    /// order.
685    Pending { addr: SocketAddr, buffer: Vec<u8> },
686    /// Connected: bytes flow through the split write half; the read
687    /// half is exposed to the bidirectional select! loop.
688    Open {
689        rd: tokio::net::tcp::OwnedReadHalf,
690        wr: tokio::net::tcp::OwnedWriteHalf,
691    },
692}
693
694impl UpstreamSink {
695    fn pending(addr: SocketAddr) -> Self {
696        Self::Pending {
697            addr,
698            buffer: Vec::new(),
699        }
700    }
701
702    /// Buffer or write — never opens the connection.
703    async fn write(&mut self, bytes: &[u8]) -> std::io::Result<()> {
704        match self {
705            Self::Pending { buffer, .. } => {
706                buffer.extend_from_slice(bytes);
707                Ok(())
708            }
709            Self::Open { wr, .. } => wr.write_all(bytes).await,
710        }
711    }
712
713    /// Connect (idempotent). On first call: opens the TCP socket and
714    /// flushes the pending buffer. On subsequent calls: no-op.
715    async fn commit(&mut self) -> std::io::Result<()> {
716        if let Self::Pending { addr, buffer } = self {
717            let stream = TcpStream::connect(*addr).await?;
718            let (rd, mut wr) = stream.into_split();
719            if !buffer.is_empty() {
720                wr.write_all(buffer).await?;
721            }
722            *self = Self::Open { rd, wr };
723        }
724        Ok(())
725    }
726
727    async fn shutdown(&mut self) {
728        if let Self::Open { wr, .. } = self {
729            let _ = wr.shutdown().await;
730        }
731    }
732}
733
734/// `select!`-friendly read: returns the upstream's next bytes when
735/// connected, otherwise an unfullfillable future so the select branch
736/// is effectively dormant until the first allow opens the socket.
737async fn read_upstream(sink: &mut UpstreamSink, buf: &mut [u8]) -> std::io::Result<usize> {
738    use tokio::io::AsyncReadExt;
739    match sink {
740        UpstreamSink::Open { rd, .. } => rd.read(buf).await,
741        UpstreamSink::Pending { .. } => std::future::pending().await,
742    }
743}
744
745#[allow(clippy::too_many_arguments)]
746async fn handle_h2c_connection(
747    stream: TcpStream,
748    preamble: Vec<u8>,
749    cfg: SniProxyConfig,
750    emitter: Arc<dyn L7DecisionEmitter>,
751    event_spec: ExecutionCellSpec,
752    allowed: Arc<std::sync::atomic::AtomicU64>,
753    denied: Arc<std::sync::atomic::AtomicU64>,
754    upstream_failures: Arc<std::sync::atomic::AtomicU64>,
755) {
756    use tokio::io::AsyncReadExt;
757
758    // Step 1: stage the upstream sink in `Pending` mode. The TCP
759    // connection is NOT opened yet — only the first allow decision
760    // commits it. Until then every "to upstream" write goes into the
761    // sink's buffer; on a connection that only ever sees denied
762    // streams we never connect, never leak the preface, never leak
763    // SETTINGS.
764    let mut upstream = UpstreamSink::pending(cfg.upstream_addr);
765
766    // Step 2: consume the buffered preamble from the kernel socket.
767    // The peek didn't consume; do a real read so subsequent reads
768    // start AFTER the preamble.
769    let mut client = stream;
770    let mut consumed = vec![0u8; preamble.len()];
771    if let Err(e) = client.read_exact(&mut consumed).await {
772        tracing::debug!(
773            target: "cellos.supervisor.sni_proxy",
774            error = %e,
775            "h2c read_exact(preamble) failed"
776        );
777        return;
778    }
779    debug_assert_eq!(
780        consumed, preamble,
781        "kernel returned different bytes than peek"
782    );
783
784    // Step 3: split the client side now (we own both halves) and queue
785    // the 24-byte preface against the deferred upstream sink. The
786    // preface is part of every h2 connection's wire shape, but it is
787    // not committed to the upstream until the first allow.
788    let (mut client_rd, mut client_wr) = client.into_split();
789
790    if let Err(e) = upstream.write(&preamble[..h2::HTTP2_PREFACE.len()]).await {
791        tracing::debug!(
792            target: "cellos.supervisor.sni_proxy",
793            error = %e,
794            "h2c buffer(preface) failed"
795        );
796        return;
797    }
798
799    // Bidirectional copy is driven inline via tokio::select! below so
800    // the same task that owns `client_wr` can both forward upstream→
801    // client bytes verbatim AND emit RST_STREAM frames on the workload
802    // side when a per-stream allow/deny decision lands. Server-PUSH
803    // HEADERS inspection is Phase 4.
804    let mut decoder = h2::H2ConnectionDecoder::new();
805    let mut denied_streams: std::collections::HashSet<u32> = std::collections::HashSet::new();
806    // Per-stream pending HEADERS+CONTINUATION raw frame buffer. The
807    // handler buffers raw frame bytes against the stream id and
808    // forwards them on the block-level allow decision (or drops on
809    // deny). RFC 7540 §6.10 forbids interleaving multiple streams'
810    // HEADERS+CONTINUATION sequences, so only ONE entry will ever be
811    // live at a time.
812    let mut pending_headers: std::collections::HashMap<u32, Vec<u8>> =
813        std::collections::HashMap::new();
814    let mut frame_buf: Vec<u8> = preamble[h2::HTTP2_PREFACE.len()..].to_vec();
815    let mut read_chunk = vec![0u8; 16 * 1024];
816    let mut up_read_chunk = vec![0u8; 16 * 1024];
817
818    'outer: loop {
819        // Try to parse as many full frames as the buffer holds.
820        loop {
821            match h2::frame::parse_one_frame(&frame_buf) {
822                Ok(Some((header, payload, _rest))) => {
823                    let frame_total = 9 + header.length as usize;
824                    let frame_bytes = frame_buf[..frame_total].to_vec();
825                    let payload_owned = payload.to_vec();
826                    let frame_type = header.frame_type;
827                    let stream_id = header.stream_id;
828                    let is_stream_bound = stream_id != 0;
829                    let is_headers_or_continuation = frame_type == h2::frame::FRAME_TYPE_HEADERS
830                        || frame_type == h2::frame::FRAME_TYPE_CONTINUATION;
831
832                    if is_headers_or_continuation {
833                        // Buffer the raw frame bytes against stream id;
834                        // they'll be forwarded together on allow.
835                        pending_headers
836                            .entry(stream_id)
837                            .or_default()
838                            .extend_from_slice(&frame_bytes);
839
840                        match decoder.feed_frame(&header, &payload_owned) {
841                            Ok(Some(decoded)) => {
842                                let sid = decoded.stream_id;
843                                let host_norm = decoded.authority.clone();
844                                let provenance = if decoded.via_huffman {
845                                    h2::AuthorityProvenance::Huffman
846                                } else if decoded.via_dynamic_table {
847                                    h2::AuthorityProvenance::DynamicIndexed
848                                } else {
849                                    h2::AuthorityProvenance::StaticLiteral
850                                };
851                                let (allow_r, miss_r) = h2_reason_codes_for(provenance);
852                                let allow = match host_norm.as_deref() {
853                                    Some(h) if !h.is_empty() => {
854                                        cellos_core::hostname_allowlist::matches_allowlist(
855                                            h,
856                                            &cfg.hostname_allowlist,
857                                        )
858                                    }
859                                    _ => false,
860                                };
861                                let pending = pending_headers.remove(&sid).unwrap_or_default();
862                                if host_norm.is_none() {
863                                    denied.fetch_add(1, Ordering::SeqCst);
864                                    denied_streams.insert(sid);
865                                    let rst = build_rst_stream_refused(sid);
866                                    let _ = client_wr.write_all(&rst).await;
867                                    emit_decision(
868                                        &emitter,
869                                        &cfg,
870                                        &event_spec,
871                                        "deny",
872                                        "",
873                                        reason_code::H2_AUTHORITY_MISSING,
874                                        None,
875                                        Some(sid),
876                                    );
877                                } else if allow {
878                                    allowed.fetch_add(1, Ordering::SeqCst);
879                                    // First allow gates the TCP commit:
880                                    // open the upstream socket and flush
881                                    // the preface + any buffered control
882                                    // frames (SETTINGS, WINDOW_UPDATE).
883                                    // Idempotent — second + subsequent
884                                    // allows just write through.
885                                    if let Err(e) = upstream.commit().await {
886                                        tracing::warn!(
887                                            target: "cellos.supervisor.sni_proxy",
888                                            error = %e,
889                                            upstream = %cfg.upstream_addr,
890                                            "h2c upstream connect failed (deferred)"
891                                        );
892                                        upstream_failures.fetch_add(1, Ordering::SeqCst);
893                                        denied.fetch_add(1, Ordering::SeqCst);
894                                        let _ = client_wr.write_all(H2_GOAWAY_PROTOCOL_ERROR).await;
895                                        let _ = client_wr.shutdown().await;
896                                        emit_decision(
897                                            &emitter,
898                                            &cfg,
899                                            &event_spec,
900                                            "deny",
901                                            "",
902                                            reason_code::UNKNOWN_PROTOCOL,
903                                            None,
904                                            None,
905                                        );
906                                        break 'outer;
907                                    }
908                                    if let Err(e) = upstream.write(&pending).await {
909                                        tracing::debug!(
910                                            target: "cellos.supervisor.sni_proxy",
911                                            error = %e,
912                                            "h2c forward(allowed HEADERS block) failed"
913                                        );
914                                        break 'outer;
915                                    }
916                                    emit_decision(
917                                        &emitter,
918                                        &cfg,
919                                        &event_spec,
920                                        "allow",
921                                        host_norm.as_deref().unwrap_or(""),
922                                        allow_r,
923                                        None,
924                                        Some(sid),
925                                    );
926                                } else {
927                                    denied.fetch_add(1, Ordering::SeqCst);
928                                    denied_streams.insert(sid);
929                                    let rst = build_rst_stream_refused(sid);
930                                    let _ = client_wr.write_all(&rst).await;
931                                    emit_decision(
932                                        &emitter,
933                                        &cfg,
934                                        &event_spec,
935                                        "deny",
936                                        host_norm.as_deref().unwrap_or(""),
937                                        miss_r,
938                                        None,
939                                        Some(sid),
940                                    );
941                                }
942                            }
943                            Ok(None) => {
944                                // Block not yet complete; keep buffering.
945                            }
946                            Err(_e) => {
947                                denied.fetch_add(1, Ordering::SeqCst);
948                                denied_streams.insert(stream_id);
949                                pending_headers.remove(&stream_id);
950                                let rst = build_rst_stream_refused(stream_id);
951                                let _ = client_wr.write_all(&rst).await;
952                                emit_decision(
953                                    &emitter,
954                                    &cfg,
955                                    &event_spec,
956                                    "deny",
957                                    "",
958                                    reason_code::H2_UNPARSEABLE_HEADERS,
959                                    None,
960                                    Some(stream_id),
961                                );
962                            }
963                        }
964                    } else if is_stream_bound && denied_streams.contains(&stream_id) {
965                        // Drop stream-bound frames on a denied stream.
966                    } else {
967                        // Control frames (stream id 0) + stream-bound
968                        // on undenied streams: queue or forward. The
969                        // sink buffers if upstream isn't connected yet
970                        // (deferred-connect invariant for the deny path)
971                        // and writes through once the first allow has
972                        // committed the connection.
973                        if let Err(e) = upstream.write(&frame_bytes).await {
974                            tracing::debug!(
975                                target: "cellos.supervisor.sni_proxy",
976                                error = %e,
977                                "h2c forward(frame) failed"
978                            );
979                            break 'outer;
980                        }
981                    }
982
983                    frame_buf.drain(..frame_total);
984                    continue;
985                }
986                Ok(None) => break, // need more bytes
987                Err(_e) => {
988                    // Connection-level frame parse error (oversized
989                    // frame, malformed header). Send GOAWAY and bail.
990                    let _ = client_wr.write_all(H2_GOAWAY_PROTOCOL_ERROR).await;
991                    upstream.shutdown().await;
992                    break 'outer;
993                }
994            }
995        }
996
997        // Bidirectional read: prefer client (so we keep parsing) but
998        // also drain upstream → client to avoid stalling the upstream
999        // when it sends responses (HEADERS / DATA / WINDOW_UPDATE).
1000        tokio::select! {
1001            biased;
1002            r = client_rd.read(&mut read_chunk) => match r {
1003                Ok(0) => break 'outer,
1004                Ok(n) => frame_buf.extend_from_slice(&read_chunk[..n]),
1005                Err(e) => {
1006                    tracing::debug!(
1007                        target: "cellos.supervisor.sni_proxy",
1008                        error = %e,
1009                        "h2c client read error"
1010                    );
1011                    break 'outer;
1012                }
1013            },
1014            r = read_upstream(&mut upstream, &mut up_read_chunk) => match r {
1015                Ok(0) => break 'outer,
1016                Ok(n) => {
1017                    if let Err(e) = client_wr.write_all(&up_read_chunk[..n]).await {
1018                        tracing::debug!(
1019                            target: "cellos.supervisor.sni_proxy",
1020                            error = %e,
1021                            "h2c upstream→client write failed"
1022                        );
1023                        break 'outer;
1024                    }
1025                }
1026                Err(e) => {
1027                    tracing::debug!(
1028                        target: "cellos.supervisor.sni_proxy",
1029                        error = %e,
1030                        "h2c upstream read error"
1031                    );
1032                    break 'outer;
1033                }
1034            },
1035        }
1036    }
1037
1038    upstream.shutdown().await;
1039    let _ = client_wr.shutdown().await;
1040}
1041
1042/// Build a 13-byte RST_STREAM frame (RFC 7540 §6.4) on `stream_id` with
1043/// error code `REFUSED_STREAM` (0x7). Layout:
1044///
1045/// ```text
1046///   00 00 04          length = 4
1047///   03                type = RST_STREAM
1048///   00                flags = 0
1049///   ?? ?? ?? ??       stream id (R bit clear)
1050///   00 00 00 07       error code = REFUSED_STREAM
1051/// ```
1052fn build_rst_stream_refused(stream_id: u32) -> [u8; 13] {
1053    let mut out = [0u8; 13];
1054    out[0] = 0x00;
1055    out[1] = 0x00;
1056    out[2] = 0x04; // length
1057    out[3] = 0x03; // type = RST_STREAM
1058    out[4] = 0x00; // flags
1059    let sid = stream_id & 0x7FFF_FFFF;
1060    out[5..9].copy_from_slice(&sid.to_be_bytes());
1061    out[9..13].copy_from_slice(&7u32.to_be_bytes()); // REFUSED_STREAM
1062    out
1063}
1064
1065#[derive(Debug, Clone, Copy)]
1066enum DenyResponse {
1067    Drop,
1068    Http403,
1069    /// scope: retained for connection-level h2c errors that bail out
1070    /// before the per-stream handler — see [`handle_h2c_connection`].
1071    /// The per-stream allow/deny path uses
1072    /// `RST_STREAM(REFUSED_STREAM=7)` directly via
1073    /// [`build_rst_stream_refused`] instead so the connection stays
1074    /// alive for other streams.
1075    #[allow(dead_code)]
1076    H2Goaway,
1077}
1078
1079/// Minimal HTTP/2 GOAWAY frame (RFC 7540 §6.8) carrying error code
1080/// `PROTOCOL_ERROR` (0x1) on stream 0 with `last-stream-id = 1`. Layout:
1081///
1082/// ```text
1083///   00 00 08    length = 8
1084///   07          type = GOAWAY
1085///   00          flags = 0
1086///   00 00 00 00 stream id = 0 (R bit clear)
1087///   00 00 00 01 last-stream-id = 1
1088///   00 00 00 01 error code = PROTOCOL_ERROR
1089/// ```
1090const H2_GOAWAY_PROTOCOL_ERROR: &[u8; 17] = &[
1091    0x00, 0x00, 0x08, // length
1092    0x07, // type=GOAWAY
1093    0x00, // flags
1094    0x00, 0x00, 0x00, 0x00, // stream id=0
1095    0x00, 0x00, 0x00, 0x01, // last-stream-id=1
1096    0x00, 0x00, 0x00, 0x01, // error code = PROTOCOL_ERROR
1097];
1098
1099async fn send_deny_response(stream: &mut TcpStream, mode: DenyResponse) {
1100    match mode {
1101        DenyResponse::Http403 => {
1102            const RESP: &[u8] =
1103                b"HTTP/1.1 403 Forbidden\r\nContent-Length: 0\r\nConnection: close\r\n\r\n";
1104            if let Err(e) = stream.write_all(RESP).await {
1105                tracing::debug!(
1106                    target: "cellos.supervisor.sni_proxy",
1107                    error = %e,
1108                    "writing 403 response failed"
1109                );
1110            }
1111            let _ = stream.shutdown().await;
1112        }
1113        DenyResponse::H2Goaway => {
1114            if let Err(e) = stream.write_all(H2_GOAWAY_PROTOCOL_ERROR).await {
1115                tracing::debug!(
1116                    target: "cellos.supervisor.sni_proxy",
1117                    error = %e,
1118                    "writing h2 GOAWAY failed"
1119                );
1120            }
1121            let _ = stream.shutdown().await;
1122        }
1123        DenyResponse::Drop => {
1124            // Nothing to write — letting `stream` drop closes it.
1125        }
1126    }
1127}
1128
1129/// Build a synthetic [`ExecutionCellSpec`] carrying just the fields
1130/// [`cellos_core::observability_l7_egress_decision_data_v1`] reads (id +
1131/// correlation). The supervisor's full spec is large and we do not need it
1132/// inside the proxy thread; a thin shim keeps the API surface narrow.
1133fn build_event_spec(cfg: &SniProxyConfig) -> ExecutionCellSpec {
1134    use cellos_core::{AuthorityBundle, Correlation, Lifetime};
1135    let correlation = cfg.correlation_id.as_ref().map(|c| Correlation {
1136        correlation_id: Some(c.clone()),
1137        ..Default::default()
1138    });
1139    ExecutionCellSpec {
1140        id: format!("sni-proxy/{}/{}", cfg.cell_id, cfg.run_id),
1141        correlation,
1142        ingress: None,
1143        environment: None,
1144        placement: None,
1145        policy: None,
1146        identity: None,
1147        run: None,
1148        authority: AuthorityBundle::default(),
1149        lifetime: Lifetime { ttl_seconds: 0 },
1150        export: None,
1151        telemetry: None,
1152    }
1153}
1154
1155/// Build + emit one `l7_egress_decision` CloudEvent.
1156#[allow(clippy::too_many_arguments)]
1157fn emit_decision(
1158    emitter: &Arc<dyn L7DecisionEmitter>,
1159    cfg: &SniProxyConfig,
1160    spec: &ExecutionCellSpec,
1161    action: &str,
1162    sni_host: &str,
1163    reason_code_str: &str,
1164    rule_ref: Option<&str>,
1165    // scope: per-stream correlation id for h2 paths. None for
1166    // SNI / HTTP/1.x / unknown / peek-timeout paths.
1167    stream_id: Option<u32>,
1168) {
1169    let decision_id = uuid::Uuid::new_v4().to_string();
1170    let policy_digest = cfg.policy_digest.clone().unwrap_or_default();
1171    let keyset_id = cfg.keyset_id.clone().unwrap_or_default();
1172    let issuer_kid = cfg.issuer_kid.clone().unwrap_or_default();
1173    let data = match cellos_core::observability_l7_egress_decision_data_v1(
1174        spec,
1175        cfg.cell_id.as_str(),
1176        Some(cfg.run_id.as_str()),
1177        decision_id.as_str(),
1178        action,
1179        // Schema requires sniHost.minLength=1; substitute a conventional
1180        // placeholder when the proxy could not extract a hostname (peek
1181        // timeout, missing SNI / Host, unknown protocol). The reasonCode
1182        // preserves the underlying signal.
1183        if sni_host.is_empty() {
1184            "(unknown)"
1185        } else {
1186            sni_host
1187        },
1188        policy_digest.as_str(),
1189        keyset_id.as_str(),
1190        issuer_kid.as_str(),
1191        reason_code_str,
1192        rule_ref,
1193        stream_id,
1194    ) {
1195        Ok(v) => v,
1196        Err(e) => {
1197            tracing::warn!(
1198                target: "cellos.supervisor.sni_proxy",
1199                error = %e,
1200                "build l7_egress_decision data failed"
1201            );
1202            return;
1203        }
1204    };
1205    let observed_at = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
1206    let event = CloudEventV1 {
1207        specversion: "1.0".into(),
1208        id: uuid::Uuid::new_v4().to_string(),
1209        source: "cellos-sni-proxy".into(),
1210        ty: "dev.cellos.events.cell.observability.v1.l7_egress_decision".into(),
1211        datacontenttype: Some("application/json".into()),
1212        data: Some(data),
1213        time: Some(observed_at),
1214        traceparent: None,
1215    };
1216    emitter.emit(event);
1217}
1218
1219/// Best-effort field-value extractor used by tests + supervisor diagnostics.
1220/// Walks the event's `data` object and returns the JSON value at `key`.
1221#[allow(dead_code)]
1222fn event_data_get<'a>(event: &'a CloudEventV1, key: &str) -> Option<&'a Value> {
1223    event.data.as_ref()?.as_object()?.get(key)
1224}
1225
1226/// Construct an empty `Map<String, Value>` with the standard event-data
1227/// shape; reserved for future use if tests need to fabricate events without
1228/// going through the emitter path.
1229#[allow(dead_code)]
1230fn empty_data_map() -> Map<String, Value> {
1231    let mut m = Map::new();
1232    m.insert("decisionId".into(), json!(uuid::Uuid::new_v4().to_string()));
1233    m
1234}
1235
1236#[cfg(test)]
1237pub(crate) mod test_helpers {
1238    /// Minimal but well-formed TLS ClientHello bytes carrying the given
1239    /// SNI value(s). Mirrors the synth fixture in `sni::tests` so other
1240    /// tests can build records without re-implementing the wire format.
1241    pub fn build_client_hello(snis: &[&str]) -> Vec<u8> {
1242        let mut body = Vec::new();
1243        body.extend_from_slice(&[0x03, 0x03]); // legacy_version TLS 1.2
1244        body.extend_from_slice(&[0u8; 32]); // random
1245        body.push(0); // session id length
1246        body.extend_from_slice(&[0x00, 0x02, 0x13, 0x01]); // 1 cipher suite
1247        body.extend_from_slice(&[0x01, 0x00]); // compression methods
1248
1249        let mut ext_section = Vec::new();
1250        if !snis.is_empty() {
1251            let mut sn_body = Vec::new();
1252            let mut inner = Vec::new();
1253            for s in snis {
1254                inner.push(0u8); // host_name type
1255                inner.extend_from_slice(&(s.len() as u16).to_be_bytes());
1256                inner.extend_from_slice(s.as_bytes());
1257            }
1258            sn_body.extend_from_slice(&(inner.len() as u16).to_be_bytes());
1259            sn_body.extend_from_slice(&inner);
1260            ext_section.extend_from_slice(&[0x00, 0x00]);
1261            ext_section.extend_from_slice(&(sn_body.len() as u16).to_be_bytes());
1262            ext_section.extend_from_slice(&sn_body);
1263        }
1264        body.extend_from_slice(&(ext_section.len() as u16).to_be_bytes());
1265        body.extend_from_slice(&ext_section);
1266
1267        let mut hs = Vec::new();
1268        hs.push(1);
1269        let body_len_bytes = (body.len() as u32).to_be_bytes();
1270        hs.extend_from_slice(&body_len_bytes[1..]);
1271        hs.extend_from_slice(&body);
1272
1273        let mut rec = Vec::new();
1274        rec.push(22);
1275        rec.extend_from_slice(&[0x03, 0x01]);
1276        rec.extend_from_slice(&(hs.len() as u16).to_be_bytes());
1277        rec.extend_from_slice(&hs);
1278        rec
1279    }
1280}
1281
1282#[cfg(test)]
1283mod tests {
1284    use super::test_helpers::build_client_hello;
1285    use super::*;
1286    use std::sync::Mutex;
1287    use tokio::io::{AsyncReadExt, AsyncWriteExt};
1288    use tokio::net::{TcpListener, TcpStream};
1289
1290    /// In-memory event collector for proxy unit tests.
1291    #[derive(Default)]
1292    struct MemEmitter {
1293        events: Mutex<Vec<CloudEventV1>>,
1294    }
1295    impl L7DecisionEmitter for MemEmitter {
1296        fn emit(&self, event: CloudEventV1) {
1297            self.events.lock().unwrap().push(event);
1298        }
1299    }
1300
1301    fn cfg_with(allowlist: &[&str], upstream: SocketAddr, peek_ms: u64) -> SniProxyConfig {
1302        SniProxyConfig {
1303            bind_addr: "127.0.0.1:0".parse().unwrap(),
1304            upstream_addr: upstream,
1305            hostname_allowlist: allowlist.iter().map(|s| s.to_string()).collect(),
1306            cdn_providers: vec![],
1307            cell_id: "test-cell".into(),
1308            run_id: "test-run".into(),
1309            policy_digest: Some("digest-test".into()),
1310            keyset_id: Some("keyset-test".into()),
1311            issuer_kid: Some("kid-test".into()),
1312            correlation_id: None,
1313            upstream_resolver_id: "sni-proxy-test".into(),
1314            peek_timeout: Duration::from_millis(peek_ms),
1315        }
1316    }
1317
1318    /// Spawn a tiny localhost echo server: accepts one connection, reads
1319    /// everything until EOF, echoes it back. Returns the bound address +
1320    /// a join handle. Used as the upstream for allow-path tests.
1321    async fn spawn_echo_upstream() -> (SocketAddr, tokio::task::JoinHandle<Vec<u8>>) {
1322        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1323        let addr = listener.local_addr().unwrap();
1324        let h = tokio::spawn(async move {
1325            let (mut s, _) = listener.accept().await.unwrap();
1326            let mut buf = Vec::new();
1327            // Read until EOF or first 64KiB; the proxy's
1328            // copy_bidirectional may keep the half-open stream alive
1329            // longer than the client wants, so cap.
1330            let mut tmp = [0u8; 4096];
1331            for _ in 0..32 {
1332                match s.read(&mut tmp).await {
1333                    Ok(0) => break,
1334                    Ok(n) => buf.extend_from_slice(&tmp[..n]),
1335                    Err(_) => break,
1336                }
1337            }
1338            buf
1339        });
1340        (addr, h)
1341    }
1342
1343    /// Spawn the proxy; return its listen addr + a shutdown flag + the
1344    /// accept-loop join handle.
1345    async fn spawn_proxy(
1346        cfg: SniProxyConfig,
1347        emitter: Arc<MemEmitter>,
1348    ) -> (
1349        SocketAddr,
1350        Arc<AtomicBool>,
1351        tokio::task::JoinHandle<std::io::Result<ProxyStats>>,
1352    ) {
1353        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1354        let addr = listener.local_addr().unwrap();
1355        let shutdown = Arc::new(AtomicBool::new(false));
1356        let shutdown2 = shutdown.clone();
1357        let h = tokio::spawn(async move {
1358            run_one_shot(
1359                &cfg,
1360                listener,
1361                emitter as Arc<dyn L7DecisionEmitter>,
1362                shutdown2,
1363            )
1364            .await
1365        });
1366        (addr, shutdown, h)
1367    }
1368
1369    fn poke_shutdown(addr: SocketAddr) {
1370        // Connect-and-drop wakes accept().
1371        let _ = std::net::TcpStream::connect_timeout(&addr, Duration::from_millis(200));
1372    }
1373
1374    #[tokio::test]
1375    async fn proxy_allows_tls_with_matching_sni() {
1376        let (upstream, upstream_h) = spawn_echo_upstream().await;
1377        let emitter = Arc::new(MemEmitter::default());
1378        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1379        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1380
1381        let ch = build_client_hello(&["api.example.com"]);
1382        let mut s = TcpStream::connect(listen).await.unwrap();
1383        s.write_all(&ch).await.unwrap();
1384        s.shutdown().await.ok();
1385        // Give upstream time to drain.
1386        let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
1387            .await
1388            .expect("upstream task")
1389            .expect("upstream join");
1390        // Upstream should have received exactly the ClientHello bytes
1391        // (peek didn't consume; copy_bidirectional replayed them).
1392        assert_eq!(
1393            upstream_bytes, ch,
1394            "upstream did not receive forwarded ClientHello bytes verbatim"
1395        );
1396
1397        shutdown.store(true, Ordering::SeqCst);
1398        poke_shutdown(listen);
1399        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1400
1401        let evs = emitter.events.lock().unwrap();
1402        assert_eq!(evs.len(), 1, "expected exactly one event");
1403        let data = evs[0].data.as_ref().unwrap();
1404        assert_eq!(data["action"], "allow");
1405        assert_eq!(data["reasonCode"], "l7_sni_allowlist_match");
1406        assert_eq!(data["sniHost"], "api.example.com");
1407    }
1408
1409    #[tokio::test]
1410    async fn proxy_denies_tls_with_unmatched_sni() {
1411        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1412        let emitter = Arc::new(MemEmitter::default());
1413        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1414        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1415
1416        let ch = build_client_hello(&["evil.example.com"]);
1417        let mut s = TcpStream::connect(listen).await.unwrap();
1418        s.write_all(&ch).await.unwrap();
1419        // For TLS deny, the proxy drops — read should return EOF quickly.
1420        let mut sink = Vec::new();
1421        let read_timeout =
1422            tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
1423        assert!(
1424            read_timeout.is_ok(),
1425            "TLS deny should close the stream promptly; got peek timeout"
1426        );
1427
1428        shutdown.store(true, Ordering::SeqCst);
1429        poke_shutdown(listen);
1430        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1431
1432        let evs = emitter.events.lock().unwrap();
1433        assert_eq!(evs.len(), 1);
1434        let data = evs[0].data.as_ref().unwrap();
1435        assert_eq!(data["action"], "deny");
1436        assert_eq!(data["reasonCode"], "l7_sni_allowlist_miss");
1437        assert_eq!(data["sniHost"], "evil.example.com");
1438    }
1439
1440    #[tokio::test]
1441    async fn proxy_denies_tls_without_sni() {
1442        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1443        let emitter = Arc::new(MemEmitter::default());
1444        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1445        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1446
1447        let ch = build_client_hello(&[]); // no SNI
1448        let mut s = TcpStream::connect(listen).await.unwrap();
1449        s.write_all(&ch).await.unwrap();
1450        let mut sink = Vec::new();
1451        let _ = tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
1452
1453        shutdown.store(true, Ordering::SeqCst);
1454        poke_shutdown(listen);
1455        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1456
1457        let evs = emitter.events.lock().unwrap();
1458        assert_eq!(evs.len(), 1);
1459        let data = evs[0].data.as_ref().unwrap();
1460        assert_eq!(data["action"], "deny");
1461        assert_eq!(data["reasonCode"], "l7_sni_missing");
1462    }
1463
1464    #[tokio::test]
1465    async fn proxy_allows_http_with_matching_host() {
1466        let (upstream, upstream_h) = spawn_echo_upstream().await;
1467        let emitter = Arc::new(MemEmitter::default());
1468        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1469        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1470
1471        let req = b"GET / HTTP/1.1\r\nHost: api.example.com\r\nConnection: close\r\n\r\n";
1472        let mut s = TcpStream::connect(listen).await.unwrap();
1473        s.write_all(req).await.unwrap();
1474        s.shutdown().await.ok();
1475        let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
1476            .await
1477            .expect("upstream task")
1478            .expect("upstream join");
1479        assert_eq!(upstream_bytes, req.to_vec());
1480
1481        shutdown.store(true, Ordering::SeqCst);
1482        poke_shutdown(listen);
1483        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1484
1485        let evs = emitter.events.lock().unwrap();
1486        assert_eq!(evs.len(), 1);
1487        let data = evs[0].data.as_ref().unwrap();
1488        assert_eq!(data["action"], "allow");
1489        assert_eq!(data["reasonCode"], "l7_http_host_allowlist_match");
1490        assert_eq!(data["sniHost"], "api.example.com");
1491    }
1492
1493    #[tokio::test]
1494    async fn proxy_denies_http_with_unmatched_host_and_returns_403() {
1495        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1496        let emitter = Arc::new(MemEmitter::default());
1497        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1498        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1499
1500        let req = b"GET / HTTP/1.1\r\nHost: evil.example.com\r\nConnection: close\r\n\r\n";
1501        let mut s = TcpStream::connect(listen).await.unwrap();
1502        s.write_all(req).await.unwrap();
1503        // Read whatever the proxy sends back. On a clean shutdown we get
1504        // the full 403 response; on platforms where the immediate drop
1505        // races into a TCP RST we may see ConnectionReset before EOF —
1506        // either path is acceptable as long as the buffered bytes start
1507        // with the expected status line.
1508        let mut response = Vec::new();
1509        let read_result =
1510            tokio::time::timeout(Duration::from_secs(2), s.read_to_end(&mut response))
1511                .await
1512                .expect("read deadline");
1513        // Treat ConnectionReset as a successful read of whatever bytes
1514        // were already buffered (kernel will surface RST after the FIN
1515        // bytes are consumed on most platforms — macOS, in particular,
1516        // returns the bytes first then ECONNRESET on the next read).
1517        match read_result {
1518            Ok(_) => {}
1519            Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {}
1520            Err(e) => panic!("unexpected read error: {e}"),
1521        }
1522        let resp_str = String::from_utf8_lossy(&response);
1523        assert!(
1524            resp_str.starts_with("HTTP/1.1 403 Forbidden\r\n"),
1525            "expected 403 response, got: {resp_str:?}"
1526        );
1527
1528        shutdown.store(true, Ordering::SeqCst);
1529        poke_shutdown(listen);
1530        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1531
1532        let evs = emitter.events.lock().unwrap();
1533        assert_eq!(evs.len(), 1);
1534        let data = evs[0].data.as_ref().unwrap();
1535        assert_eq!(data["action"], "deny");
1536        assert_eq!(data["reasonCode"], "l7_http_host_allowlist_miss");
1537        assert_eq!(data["sniHost"], "evil.example.com");
1538    }
1539
1540    #[tokio::test]
1541    async fn proxy_emits_one_event_per_connection() {
1542        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1543        let emitter = Arc::new(MemEmitter::default());
1544        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1545        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1546
1547        // Three independent connections: TLS allow, TLS deny, HTTP deny.
1548        // The TLS allow consumes the only echo upstream slot.
1549        for ch in [build_client_hello(&["api.example.com"])] {
1550            let mut s = TcpStream::connect(listen).await.unwrap();
1551            s.write_all(&ch).await.unwrap();
1552            s.shutdown().await.ok();
1553            tokio::time::sleep(Duration::from_millis(80)).await;
1554        }
1555        for ch in [build_client_hello(&["evil.example.com"])] {
1556            let mut s = TcpStream::connect(listen).await.unwrap();
1557            s.write_all(&ch).await.unwrap();
1558            // TLS deny: drop. read until EOF.
1559            let mut sink = Vec::new();
1560            let _ =
1561                tokio::time::timeout(Duration::from_millis(400), s.read_to_end(&mut sink)).await;
1562        }
1563        let req = b"GET / HTTP/1.1\r\nHost: blocked.example.com\r\n\r\n";
1564        let mut s = TcpStream::connect(listen).await.unwrap();
1565        s.write_all(req).await.unwrap();
1566        let mut sink = Vec::new();
1567        let _ = tokio::time::timeout(Duration::from_millis(400), s.read_to_end(&mut sink)).await;
1568
1569        shutdown.store(true, Ordering::SeqCst);
1570        poke_shutdown(listen);
1571        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1572
1573        let evs = emitter.events.lock().unwrap();
1574        assert_eq!(
1575            evs.len(),
1576            3,
1577            "expected exactly one event per connection, got {}: {:#?}",
1578            evs.len(),
1579            evs.iter().map(|e| &e.data).collect::<Vec<_>>()
1580        );
1581    }
1582
1583    #[tokio::test]
1584    async fn proxy_returns_peek_timeout_when_client_silent() {
1585        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1586        let emitter = Arc::new(MemEmitter::default());
1587        let cfg = cfg_with(&["api.example.com"], upstream, 50); // 50ms peek timeout
1588        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1589
1590        // Open the connection but never send anything.
1591        let s = TcpStream::connect(listen).await.unwrap();
1592        // Wait past the peek_timeout so the proxy fires its timeout path.
1593        tokio::time::sleep(Duration::from_millis(250)).await;
1594        drop(s);
1595
1596        shutdown.store(true, Ordering::SeqCst);
1597        poke_shutdown(listen);
1598        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1599
1600        let evs = emitter.events.lock().unwrap();
1601        assert!(!evs.is_empty(), "expected at least one peek_timeout event");
1602        let data = evs[0].data.as_ref().unwrap();
1603        assert_eq!(data["action"], "deny");
1604        assert_eq!(data["reasonCode"], "l7_peek_timeout");
1605    }
1606
1607    #[test]
1608    fn guess_protocol_classifies_correctly() {
1609        assert_eq!(guess_protocol(&[22, 0x03, 0x03]), ProtocolGuess::Tls);
1610        assert_eq!(guess_protocol(b"GET / HTTP/1.1\r\n"), ProtocolGuess::Http1);
1611        assert_eq!(guess_protocol(b"POST / HTTP/1.1"), ProtocolGuess::Http1);
1612        assert_eq!(guess_protocol(b"\x00\x00\x00\x00"), ProtocolGuess::Unknown);
1613        assert_eq!(guess_protocol(b""), ProtocolGuess::Unknown);
1614        // The h2c preface MUST classify as H2c, not Http1, even though
1615        // it starts with `PRI` (an HTTP-shaped token).
1616        assert_eq!(guess_protocol(h2::HTTP2_PREFACE), ProtocolGuess::H2c);
1617    }
1618
1619    // ── SEC-22 Phase 3a integration tests (h2c HEADERS-frame :authority) ──
1620
1621    /// Build a complete h2c stream prefix: 24-byte preface + SETTINGS +
1622    /// HEADERS frame carrying a literal `:authority` HPACK entry whose
1623    /// name index = 1 (static-table) and value = `authority`.
1624    fn build_h2c_stream(authority: &str) -> Vec<u8> {
1625        let mut out = h2::HTTP2_PREFACE.to_vec();
1626        let block = h2::test_helpers::hpack_literal_indexed_name(1, authority);
1627        out.extend_from_slice(&h2::test_helpers::settings_then_headers(&block));
1628        out
1629    }
1630
1631    /// Build an h2c stream whose first HEADERS frame carries no
1632    /// `:authority` (only a `:method` reference, static index 2 = GET).
1633    fn build_h2c_stream_no_authority() -> Vec<u8> {
1634        let mut out = h2::HTTP2_PREFACE.to_vec();
1635        // Indexed header field, index 2 (`:method: GET`) — present in
1636        // the static table; ignored by the authority extractor.
1637        let block = h2::test_helpers::hpack_indexed(2);
1638        out.extend_from_slice(&h2::test_helpers::settings_then_headers(&block));
1639        out
1640    }
1641
1642    /// Build an h2c stream whose HEADERS frame is fragmented across
1643    /// CONTINUATION (END_HEADERS not set), with the END_HEADERS-bearing
1644    /// CONTINUATION present. Phase 3g reassembles cleanly.
1645    fn build_h2c_stream_continuation_reassemblable(authority: &str) -> Vec<u8> {
1646        let mut out = h2::HTTP2_PREFACE.to_vec();
1647        let block = h2::test_helpers::hpack_literal_indexed_name(1, authority);
1648        let mid = block.len() / 2;
1649        let (a, b) = block.split_at(mid);
1650        let mut sequence = h2::test_helpers::empty_settings_frame();
1651        sequence.extend_from_slice(&h2::test_helpers::continuation_fragmented_headers(a));
1652        sequence.extend_from_slice(&h2::test_helpers::continuation_frame(b, true));
1653        out.extend_from_slice(&sequence);
1654        out
1655    }
1656
1657    /// Build an h2c stream whose first HEADERS frame is genuinely
1658    /// unparseable: an HPACK indexed-header reference at index 200 (well
1659    /// beyond static + empty dynamic — `HpackInvalidIndex`).
1660    fn build_h2c_stream_unparseable() -> Vec<u8> {
1661        let mut out = h2::HTTP2_PREFACE.to_vec();
1662        let block = h2::test_helpers::hpack_indexed(200);
1663        out.extend_from_slice(&h2::test_helpers::settings_then_headers(&block));
1664        out
1665    }
1666
1667    /// Build an h2c stream whose `:authority` value is Huffman-coded.
1668    fn build_h2c_stream_huffman(authority: &str) -> Vec<u8> {
1669        let mut out = h2::HTTP2_PREFACE.to_vec();
1670        let block = h2::test_helpers::hpack_literal_indexed_name_huffman(1, authority);
1671        out.extend_from_slice(&h2::test_helpers::settings_then_headers(&block));
1672        out
1673    }
1674
1675    /// Build an h2c stream whose `:authority` is reachable only through
1676    /// the dynamic table. Strategy: in a SINGLE header block, emit
1677    ///   (1) literal-with-incremental-indexing for `:authority = "<a>"`
1678    ///       — but with the value EMPTY so the decoder's
1679    ///       `!value.is_empty()` skip leaves it unmatched, while STILL
1680    ///       inserting `(":authority", "")` at dynamic index 62. (Empty
1681    ///       value is unusual but valid HPACK.)
1682    ///   (2) literal-with-incremental-indexing for a non-`:authority`
1683    ///       name (`:scheme = "<authority>"` to keep the bytes
1684    ///       interesting) — pushes (1) to dynamic index 63.
1685    ///   (3) literal-with-incremental-indexing referencing dynamic index
1686    ///       63 for the name (`:authority`) and the real authority value
1687    ///       — this is a literal whose NAME is dynamic-indexed, so its
1688    ///       provenance is `DynamicIndexed`. This is the FIRST non-empty
1689    ///       `:authority` and locks the proxy's decision.
1690    fn build_h2c_stream_dynamic_indexed_name(authority: &str) -> Vec<u8> {
1691        let mut out = h2::HTTP2_PREFACE.to_vec();
1692        let mut block: Vec<u8> = Vec::new();
1693        // (1) literal-with-incremental-indexing :authority = "" (empty)
1694        block.extend_from_slice(&h2::test_helpers::hpack_literal_indexed_name(1, ""));
1695        // (2) literal-with-incremental-indexing :scheme = authority
1696        block.extend_from_slice(&h2::test_helpers::hpack_literal_indexed_name(6, authority));
1697        // (3) literal-with-incremental-indexing name=dyn63 ":authority",
1698        //     value = authority. Name index 63 in a 6-bit prefix needs
1699        //     multi-octet integer encoding: 63 = mask(6=0x3F) + 0 → 0x7F
1700        //     in prefix-bits-saturated form; first byte is 0x40 | 0x3F =
1701        //     0x7F, then the multi-octet continuation 0x00.
1702        block.push(0x40 | 0x3F); // 01 + saturated 6-bit prefix
1703        block.push(0x00); // continuation: value = 0 → total = 63
1704        h2::test_helpers::encode_literal_string(&mut block, authority);
1705        out.extend_from_slice(&h2::test_helpers::settings_then_headers(&block));
1706        out
1707    }
1708
1709    #[tokio::test]
1710    async fn proxy_allows_h2c_with_matching_authority() {
1711        let (upstream, upstream_h) = spawn_echo_upstream().await;
1712        let emitter = Arc::new(MemEmitter::default());
1713        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1714        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1715
1716        let stream_bytes = build_h2c_stream("api.example.com");
1717        let mut s = TcpStream::connect(listen).await.unwrap();
1718        s.write_all(&stream_bytes).await.unwrap();
1719        s.shutdown().await.ok();
1720
1721        let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
1722            .await
1723            .expect("upstream task")
1724            .expect("upstream join");
1725        // The proxy must forward the buffered preface + HEADERS verbatim
1726        // — copy_bidirectional replays the peeked bytes after the allow
1727        // decision, just like the SNI / HTTP/1.x paths.
1728        assert_eq!(
1729            upstream_bytes, stream_bytes,
1730            "upstream did not receive the forwarded h2c bytes verbatim"
1731        );
1732
1733        shutdown.store(true, Ordering::SeqCst);
1734        poke_shutdown(listen);
1735        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1736
1737        let evs = emitter.events.lock().unwrap();
1738        assert_eq!(evs.len(), 1, "expected exactly one event");
1739        let data = evs[0].data.as_ref().unwrap();
1740        assert_eq!(data["action"], "allow");
1741        assert_eq!(data["reasonCode"], "l7_h2_authority_allowlist_match");
1742        assert_eq!(data["sniHost"], "api.example.com");
1743    }
1744
1745    #[tokio::test]
1746    async fn proxy_denies_h2c_with_unmatched_authority() {
1747        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1748        let emitter = Arc::new(MemEmitter::default());
1749        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1750        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1751
1752        let stream_bytes = build_h2c_stream("evil.example.com");
1753        let mut s = TcpStream::connect(listen).await.unwrap();
1754        s.write_all(&stream_bytes).await.unwrap();
1755        let mut sink = Vec::new();
1756        let _ = tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
1757
1758        shutdown.store(true, Ordering::SeqCst);
1759        poke_shutdown(listen);
1760        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1761
1762        let evs = emitter.events.lock().unwrap();
1763        assert_eq!(evs.len(), 1);
1764        let data = evs[0].data.as_ref().unwrap();
1765        assert_eq!(data["action"], "deny");
1766        assert_eq!(data["reasonCode"], "l7_h2_authority_allowlist_miss");
1767        assert_eq!(data["sniHost"], "evil.example.com");
1768    }
1769
1770    #[tokio::test]
1771    async fn proxy_emits_event_for_h2_unparseable_headers() {
1772        // scope: CONTINUATION-fragmented HEADERS reassemble cleanly when the
1773        // END_HEADERS-bearing CONTINUATION is present, so the remaining
1774        // "unparseable" cases are HPACK errors (invalid index, invalid
1775        // Huffman, oversized header block, etc.). This test uses an HPACK
1776        // indexed reference at index 200 (well beyond static + empty dynamic).
1777        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1778        let emitter = Arc::new(MemEmitter::default());
1779        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1780        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1781
1782        let stream_bytes = build_h2c_stream_unparseable();
1783        let mut s = TcpStream::connect(listen).await.unwrap();
1784        s.write_all(&stream_bytes).await.unwrap();
1785        let mut sink = Vec::new();
1786        let _ = tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
1787
1788        shutdown.store(true, Ordering::SeqCst);
1789        poke_shutdown(listen);
1790        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1791
1792        let evs = emitter.events.lock().unwrap();
1793        assert_eq!(evs.len(), 1);
1794        let data = evs[0].data.as_ref().unwrap();
1795        assert_eq!(data["action"], "deny");
1796        assert_eq!(data["reasonCode"], "l7_h2_unparseable_headers");
1797    }
1798
1799    // ── SEC-22 Phase 3g integration tests (full HPACK + CONTINUATION) ──
1800
1801    #[tokio::test]
1802    async fn proxy_allows_h2c_with_continuation_fragmented_authority() {
1803        let (upstream, upstream_h) = spawn_echo_upstream().await;
1804        let emitter = Arc::new(MemEmitter::default());
1805        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1806        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1807
1808        let stream_bytes = build_h2c_stream_continuation_reassemblable("api.example.com");
1809        let mut s = TcpStream::connect(listen).await.unwrap();
1810        s.write_all(&stream_bytes).await.unwrap();
1811        s.shutdown().await.ok();
1812
1813        let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
1814            .await
1815            .expect("upstream task")
1816            .expect("upstream join");
1817        assert_eq!(upstream_bytes, stream_bytes);
1818
1819        shutdown.store(true, Ordering::SeqCst);
1820        poke_shutdown(listen);
1821        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1822
1823        let evs = emitter.events.lock().unwrap();
1824        assert_eq!(evs.len(), 1);
1825        let data = evs[0].data.as_ref().unwrap();
1826        assert_eq!(data["action"], "allow");
1827        // CONTINUATION reassembly + literal-indexed name → StaticLiteral.
1828        assert_eq!(data["reasonCode"], "l7_h2_authority_allowlist_match");
1829        assert_eq!(data["sniHost"], "api.example.com");
1830    }
1831
1832    #[tokio::test]
1833    async fn proxy_allows_h2c_with_huffman_encoded_authority() {
1834        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1835        let emitter = Arc::new(MemEmitter::default());
1836        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1837        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1838
1839        let stream_bytes = build_h2c_stream_huffman("api.example.com");
1840        let mut s = TcpStream::connect(listen).await.unwrap();
1841        s.write_all(&stream_bytes).await.unwrap();
1842        s.shutdown().await.ok();
1843
1844        // Allow the proxy worker to settle.
1845        tokio::time::sleep(Duration::from_millis(200)).await;
1846
1847        shutdown.store(true, Ordering::SeqCst);
1848        poke_shutdown(listen);
1849        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1850
1851        let evs = emitter.events.lock().unwrap();
1852        assert_eq!(evs.len(), 1);
1853        let data = evs[0].data.as_ref().unwrap();
1854        assert_eq!(data["action"], "allow");
1855        assert_eq!(
1856            data["reasonCode"], "l7_h2_authority_allowlist_match_huffman",
1857            "Huffman provenance must surface as the differentiated reason code"
1858        );
1859        assert_eq!(data["sniHost"], "api.example.com");
1860    }
1861
1862    #[tokio::test]
1863    async fn proxy_allows_h2c_with_dynamic_table_indexed_authority() {
1864        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1865        let emitter = Arc::new(MemEmitter::default());
1866        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1867        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1868
1869        let stream_bytes = build_h2c_stream_dynamic_indexed_name("api.example.com");
1870        let mut s = TcpStream::connect(listen).await.unwrap();
1871        s.write_all(&stream_bytes).await.unwrap();
1872        s.shutdown().await.ok();
1873
1874        tokio::time::sleep(Duration::from_millis(200)).await;
1875
1876        shutdown.store(true, Ordering::SeqCst);
1877        poke_shutdown(listen);
1878        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1879
1880        let evs = emitter.events.lock().unwrap();
1881        assert_eq!(evs.len(), 1, "expected one event, got {evs:#?}");
1882        let data = evs[0].data.as_ref().unwrap();
1883        assert_eq!(data["action"], "allow");
1884        assert_eq!(
1885            data["reasonCode"], "l7_h2_authority_allowlist_match_dynamic_indexed",
1886            "dynamic-table-name provenance must surface as the differentiated reason code"
1887        );
1888    }
1889
1890    #[tokio::test]
1891    async fn proxy_denies_h2c_with_authority_extracted_from_huffman_literal_not_in_allowlist() {
1892        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1893        let emitter = Arc::new(MemEmitter::default());
1894        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1895        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1896
1897        let stream_bytes = build_h2c_stream_huffman("evil.example.com");
1898        let mut s = TcpStream::connect(listen).await.unwrap();
1899        s.write_all(&stream_bytes).await.unwrap();
1900        let mut sink = Vec::new();
1901        let _ = tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
1902
1903        shutdown.store(true, Ordering::SeqCst);
1904        poke_shutdown(listen);
1905        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1906
1907        let evs = emitter.events.lock().unwrap();
1908        assert_eq!(evs.len(), 1);
1909        let data = evs[0].data.as_ref().unwrap();
1910        assert_eq!(data["action"], "deny");
1911        assert_eq!(
1912            data["reasonCode"], "l7_h2_authority_allowlist_miss_huffman",
1913            "Huffman provenance must surface in the deny reason code"
1914        );
1915        assert_eq!(data["sniHost"], "evil.example.com");
1916    }
1917
1918    #[tokio::test]
1919    async fn proxy_emits_event_with_extracted_authority_when_huffman_used() {
1920        // Defence-in-depth audit signal: even on the allow path, the
1921        // emitted event MUST carry the extracted hostname so operators
1922        // see what got through. Phase 3g delivers this for Huffman where
1923        // P3c could only emit `l7_h2_unparseable_headers` with empty
1924        // sniHost.
1925        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1926        let emitter = Arc::new(MemEmitter::default());
1927        let cfg = cfg_with(&["my-service.example.com"], upstream, 500);
1928        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1929
1930        let stream_bytes = build_h2c_stream_huffman("my-service.example.com");
1931        let mut s = TcpStream::connect(listen).await.unwrap();
1932        s.write_all(&stream_bytes).await.unwrap();
1933        s.shutdown().await.ok();
1934
1935        tokio::time::sleep(Duration::from_millis(200)).await;
1936
1937        shutdown.store(true, Ordering::SeqCst);
1938        poke_shutdown(listen);
1939        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1940
1941        let evs = emitter.events.lock().unwrap();
1942        assert_eq!(evs.len(), 1);
1943        let data = evs[0].data.as_ref().unwrap();
1944        // The hostname is present (no longer redacted to empty as P3c did
1945        // for the `l7_h2_unparseable_headers` Huffman path).
1946        assert_eq!(data["sniHost"], "my-service.example.com");
1947        assert_eq!(data["action"], "allow");
1948        assert_eq!(
1949            data["reasonCode"],
1950            "l7_h2_authority_allowlist_match_huffman",
1951        );
1952    }
1953
1954    #[tokio::test]
1955    async fn proxy_emits_event_for_h2_missing_authority() {
1956        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1957        let emitter = Arc::new(MemEmitter::default());
1958        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1959        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1960
1961        let stream_bytes = build_h2c_stream_no_authority();
1962        let mut s = TcpStream::connect(listen).await.unwrap();
1963        s.write_all(&stream_bytes).await.unwrap();
1964        let mut sink = Vec::new();
1965        let _ = tokio::time::timeout(Duration::from_millis(800), s.read_to_end(&mut sink)).await;
1966
1967        shutdown.store(true, Ordering::SeqCst);
1968        poke_shutdown(listen);
1969        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1970
1971        let evs = emitter.events.lock().unwrap();
1972        assert_eq!(evs.len(), 1);
1973        let data = evs[0].data.as_ref().unwrap();
1974        assert_eq!(data["action"], "deny");
1975        assert_eq!(data["reasonCode"], "l7_h2_authority_missing");
1976    }
1977
1978    /// scope: deny-response shape on a denied h2c stream. A prior shape
1979    /// returned `GOAWAY(PROTOCOL_ERROR)` and closed the whole connection;
1980    /// the current shape keeps the connection alive and emits
1981    /// `RST_STREAM(REFUSED_STREAM=7)` for the offending stream only —
1982    /// see [`build_rst_stream_refused`]. This test asserts the wire shape.
1983    #[tokio::test]
1984    async fn proxy_h2c_deny_responds_with_rst_stream_refused() {
1985        let (upstream, _upstream_h) = spawn_echo_upstream().await;
1986        let emitter = Arc::new(MemEmitter::default());
1987        let cfg = cfg_with(&["api.example.com"], upstream, 500);
1988        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
1989
1990        let stream_bytes = build_h2c_stream("evil.example.com");
1991        let mut s = TcpStream::connect(listen).await.unwrap();
1992        s.write_all(&stream_bytes).await.unwrap();
1993        // The proxy keeps the connection alive after a single-stream deny;
1994        // close from our side so the read_to_end below returns rather
1995        // than hanging on the open connection.
1996        s.shutdown().await.ok();
1997        let mut response = Vec::new();
1998        let read_result =
1999            tokio::time::timeout(Duration::from_secs(2), s.read_to_end(&mut response))
2000                .await
2001                .expect("read deadline");
2002        match read_result {
2003            Ok(_) => {}
2004            Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {}
2005            Err(e) => panic!("unexpected read error: {e}"),
2006        }
2007        // The 13-byte RST_STREAM(REFUSED_STREAM=7) frame must be present.
2008        // Length=4, type=RST_STREAM (0x03), flags=0, stream id=1,
2009        // error code=REFUSED_STREAM (7).
2010        assert!(
2011            response.len() >= 13,
2012            "expected at least one full RST_STREAM frame, got {} bytes: {:02x?}",
2013            response.len(),
2014            response
2015        );
2016        assert_eq!(&response[0..3], &[0x00, 0x00, 0x04], "RST_STREAM length");
2017        assert_eq!(response[3], 0x03, "RST_STREAM frame type");
2018        assert_eq!(response[4], 0x00, "RST_STREAM flags");
2019        assert_eq!(&response[5..9], &[0x00, 0x00, 0x00, 0x01], "stream id 1");
2020        assert_eq!(
2021            &response[9..13],
2022            &[0x00, 0x00, 0x00, 0x07],
2023            "RST_STREAM error code = REFUSED_STREAM"
2024        );
2025        // The deny event MUST carry the streamId for audit correlation.
2026        // Drop the guard before awaiting the proxy join below
2027        // (clippy::await_holding_lock).
2028        {
2029            let evs = emitter.events.lock().unwrap();
2030            assert!(!evs.is_empty(), "expected at least one deny event");
2031            let data = evs[0].data.as_ref().unwrap();
2032            assert_eq!(data["action"], "deny");
2033            assert_eq!(data["streamId"], 1);
2034        }
2035
2036        shutdown.store(true, Ordering::SeqCst);
2037        poke_shutdown(listen);
2038        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2039    }
2040
2041    /// Original GOAWAY assertion structure preserved as a no-op stub
2042    /// — the test is renamed and rewritten above. (No-op helper kept
2043    /// to avoid an artificial diff in the asserted wire-shape match
2044    /// arms below.)
2045    #[allow(dead_code)]
2046    fn _legacy_goaway_assertions(response: &[u8]) {
2047        assert_eq!(&response[0..3], &[0x00, 0x00, 0x08], "GOAWAY length");
2048        assert_eq!(response[3], 0x07, "GOAWAY frame type");
2049        assert_eq!(response[4], 0x00, "GOAWAY flags");
2050        assert_eq!(&response[5..9], &[0x00, 0x00, 0x00, 0x00], "stream id 0");
2051        assert_eq!(
2052            &response[13..17],
2053            &[0x00, 0x00, 0x00, 0x01],
2054            "GOAWAY error code = PROTOCOL_ERROR"
2055        );
2056    }
2057
2058    // ── SEC-22 Phase 3g.1 integration tests (per-stream allow/deny) ──
2059
2060    /// Build the canonical h2c stream prefix carrying TWO HEADERS frames
2061    /// — first on stream 1, then on stream 3 — each with a different
2062    /// `:authority`. Phase 3g.1 must inspect each stream's authority
2063    /// independently rather than locking the connection to the first
2064    /// observed authority.
2065    fn build_h2c_two_streams(authority_a: &str, authority_b: &str) -> Vec<u8> {
2066        let mut out = h2::HTTP2_PREFACE.to_vec();
2067        out.extend_from_slice(&h2::test_helpers::empty_settings_frame());
2068        let block_a = h2::test_helpers::hpack_literal_indexed_name(1, authority_a);
2069        out.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(&block_a, 1));
2070        let block_b = h2::test_helpers::hpack_literal_indexed_name(1, authority_b);
2071        out.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(&block_b, 3));
2072        out
2073    }
2074
2075    /// Build an h2c stream that denies one stream then sends a DATA
2076    /// frame on that denied stream — used to verify the proxy drops
2077    /// the DATA frame instead of forwarding it.
2078    fn build_h2c_denied_stream_then_data(
2079        authority_allowed: &str,
2080        authority_denied: &str,
2081        data_payload: &[u8],
2082    ) -> Vec<u8> {
2083        let mut out = h2::HTTP2_PREFACE.to_vec();
2084        out.extend_from_slice(&h2::test_helpers::empty_settings_frame());
2085        // Stream 1: denied.
2086        let block_d = h2::test_helpers::hpack_literal_indexed_name(1, authority_denied);
2087        out.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(&block_d, 1));
2088        // DATA on stream 1 — must be dropped by proxy.
2089        out.extend_from_slice(&h2::test_helpers::data_frame_on_stream(data_payload, 1));
2090        // Stream 3: allowed.
2091        let block_a = h2::test_helpers::hpack_literal_indexed_name(1, authority_allowed);
2092        out.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(&block_a, 3));
2093        out
2094    }
2095
2096    #[tokio::test]
2097    async fn proxy_allows_two_streams_with_matching_authority_on_same_connection() {
2098        let (upstream, upstream_h) = spawn_echo_upstream().await;
2099        let emitter = Arc::new(MemEmitter::default());
2100        let cfg = cfg_with(&["api.example.com", "api.other.com"], upstream, 500);
2101        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
2102
2103        let stream_bytes = build_h2c_two_streams("api.example.com", "api.other.com");
2104        let mut s = TcpStream::connect(listen).await.unwrap();
2105        s.write_all(&stream_bytes).await.unwrap();
2106        s.shutdown().await.ok();
2107
2108        let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
2109            .await
2110            .expect("upstream task")
2111            .expect("upstream join");
2112        // Upstream sees the preface + SETTINGS + both HEADERS frames
2113        // verbatim — the per-stream handler forwarded each on allow.
2114        assert_eq!(upstream_bytes, stream_bytes);
2115
2116        shutdown.store(true, Ordering::SeqCst);
2117        poke_shutdown(listen);
2118        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2119
2120        let evs = emitter.events.lock().unwrap();
2121        assert_eq!(
2122            evs.len(),
2123            2,
2124            "expected one allow event per stream, got {evs:#?}"
2125        );
2126        let mut stream_ids: Vec<i64> = evs
2127            .iter()
2128            .map(|e| {
2129                e.data
2130                    .as_ref()
2131                    .unwrap()
2132                    .get("streamId")
2133                    .and_then(|v| v.as_i64())
2134                    .expect("event must carry streamId")
2135            })
2136            .collect();
2137        stream_ids.sort_unstable();
2138        assert_eq!(stream_ids, vec![1, 3]);
2139        for ev in evs.iter() {
2140            let data = ev.data.as_ref().unwrap();
2141            assert_eq!(data["action"], "allow");
2142        }
2143    }
2144
2145    #[tokio::test]
2146    async fn proxy_denies_one_stream_keeps_connection_open_for_other_streams() {
2147        let (upstream, _upstream_h) = spawn_echo_upstream().await;
2148        let emitter = Arc::new(MemEmitter::default());
2149        let cfg = cfg_with(&["api.example.com"], upstream, 500);
2150        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
2151
2152        // Stream 1 = evil (deny → RST_STREAM); stream 3 = allowed.
2153        let stream_bytes = build_h2c_two_streams("evil.example.com", "api.example.com");
2154        let mut s = TcpStream::connect(listen).await.unwrap();
2155        s.write_all(&stream_bytes).await.unwrap();
2156        // Read whatever the proxy sends back. We expect the
2157        // RST_STREAM(REFUSED_STREAM=7) for stream 1.
2158        let mut response = vec![0u8; 13];
2159        let read_n = tokio::time::timeout(Duration::from_secs(2), s.read_exact(&mut response))
2160            .await
2161            .expect("read deadline");
2162        assert!(read_n.is_ok(), "expected to read 13 bytes (RST_STREAM)");
2163        assert_eq!(&response[0..3], &[0x00, 0x00, 0x04], "RST_STREAM length");
2164        assert_eq!(response[3], 0x03, "RST_STREAM type");
2165        assert_eq!(&response[5..9], &[0x00, 0x00, 0x00, 0x01], "stream id 1");
2166        assert_eq!(
2167            &response[9..13],
2168            &[0x00, 0x00, 0x00, 0x07],
2169            "REFUSED_STREAM"
2170        );
2171
2172        s.shutdown().await.ok();
2173
2174        shutdown.store(true, Ordering::SeqCst);
2175        poke_shutdown(listen);
2176        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2177
2178        let evs = emitter.events.lock().unwrap();
2179        assert_eq!(evs.len(), 2, "expected one event per stream");
2180        // Stream 1 = deny; stream 3 = allow.
2181        let mut by_stream: std::collections::HashMap<i64, &CloudEventV1> =
2182            std::collections::HashMap::new();
2183        for ev in evs.iter() {
2184            let sid = ev
2185                .data
2186                .as_ref()
2187                .unwrap()
2188                .get("streamId")
2189                .and_then(|v| v.as_i64())
2190                .unwrap();
2191            by_stream.insert(sid, ev);
2192        }
2193        assert_eq!(by_stream[&1].data.as_ref().unwrap()["action"], "deny");
2194        assert_eq!(by_stream[&3].data.as_ref().unwrap()["action"], "allow");
2195    }
2196
2197    #[tokio::test]
2198    async fn proxy_emits_one_event_per_stream_decision() {
2199        let (upstream, _upstream_h) = spawn_echo_upstream().await;
2200        let emitter = Arc::new(MemEmitter::default());
2201        let cfg = cfg_with(&["api.example.com"], upstream, 500);
2202        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
2203
2204        // 3 streams: 1=allow, 3=deny, 5=deny
2205        let mut bytes = h2::HTTP2_PREFACE.to_vec();
2206        bytes.extend_from_slice(&h2::test_helpers::empty_settings_frame());
2207        bytes.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(
2208            &h2::test_helpers::hpack_literal_indexed_name(1, "api.example.com"),
2209            1,
2210        ));
2211        bytes.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(
2212            &h2::test_helpers::hpack_literal_indexed_name(1, "evil1.example.com"),
2213            3,
2214        ));
2215        bytes.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(
2216            &h2::test_helpers::hpack_literal_indexed_name(1, "evil2.example.com"),
2217            5,
2218        ));
2219
2220        let mut s = TcpStream::connect(listen).await.unwrap();
2221        s.write_all(&bytes).await.unwrap();
2222        // Drain the deny frames.
2223        let mut sink = vec![0u8; 64];
2224        let _ = tokio::time::timeout(Duration::from_millis(400), s.read(&mut sink)).await;
2225        s.shutdown().await.ok();
2226
2227        shutdown.store(true, Ordering::SeqCst);
2228        poke_shutdown(listen);
2229        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2230
2231        let evs = emitter.events.lock().unwrap();
2232        assert_eq!(
2233            evs.len(),
2234            3,
2235            "expected exactly one event per stream, got {evs:#?}"
2236        );
2237        let mut per_action: std::collections::HashMap<String, usize> =
2238            std::collections::HashMap::new();
2239        for ev in evs.iter() {
2240            let action = ev.data.as_ref().unwrap()["action"]
2241                .as_str()
2242                .unwrap()
2243                .to_string();
2244            *per_action.entry(action).or_default() += 1;
2245            // Every event must carry a streamId.
2246            assert!(
2247                ev.data
2248                    .as_ref()
2249                    .unwrap()
2250                    .get("streamId")
2251                    .and_then(|v| v.as_i64())
2252                    .is_some(),
2253                "event missing streamId: {ev:?}"
2254            );
2255        }
2256        assert_eq!(per_action.get("allow").copied(), Some(1));
2257        assert_eq!(per_action.get("deny").copied(), Some(2));
2258    }
2259
2260    #[tokio::test]
2261    async fn proxy_drops_data_frames_on_denied_stream() {
2262        let (upstream, upstream_h) = spawn_echo_upstream().await;
2263        let emitter = Arc::new(MemEmitter::default());
2264        let cfg = cfg_with(&["api.example.com"], upstream, 500);
2265        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
2266
2267        // Stream 1 = denied (evil), then a DATA frame on stream 1
2268        // (must be dropped), then stream 3 = allowed.
2269        let secret = b"do-not-exfiltrate";
2270        let stream_bytes =
2271            build_h2c_denied_stream_then_data("api.example.com", "evil.example.com", secret);
2272        let mut s = TcpStream::connect(listen).await.unwrap();
2273        s.write_all(&stream_bytes).await.unwrap();
2274        s.shutdown().await.ok();
2275
2276        let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
2277            .await
2278            .expect("upstream task")
2279            .expect("upstream join");
2280        // The denied DATA payload MUST NOT appear in what upstream
2281        // received.
2282        let upstream_str = String::from_utf8_lossy(&upstream_bytes);
2283        assert!(
2284            !upstream_str.contains("do-not-exfiltrate"),
2285            "denied-stream DATA leaked to upstream: {upstream_str:?}"
2286        );
2287        // The allowed stream's HEADERS DID reach upstream.
2288        // (The simplest end-to-end check: upstream got non-zero bytes
2289        // from the allowed stream.)
2290        assert!(!upstream_bytes.is_empty(), "upstream got no bytes");
2291
2292        shutdown.store(true, Ordering::SeqCst);
2293        poke_shutdown(listen);
2294        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2295    }
2296
2297    #[tokio::test]
2298    async fn proxy_forwards_settings_and_window_update_verbatim() {
2299        let (upstream, upstream_h) = spawn_echo_upstream().await;
2300        let emitter = Arc::new(MemEmitter::default());
2301        let cfg = cfg_with(&["api.example.com"], upstream, 500);
2302        let (listen, shutdown, h) = spawn_proxy(cfg, emitter.clone()).await;
2303
2304        // preface + SETTINGS + HEADERS(allow) + WINDOW_UPDATE on stream 0.
2305        // WINDOW_UPDATE: type=0x08, length=4, flags=0, stream id=0,
2306        // payload = 4-byte window-size-increment (32-bit BE).
2307        let mut bytes = h2::HTTP2_PREFACE.to_vec();
2308        bytes.extend_from_slice(&h2::test_helpers::empty_settings_frame());
2309        bytes.extend_from_slice(&h2::test_helpers::headers_frame_on_stream(
2310            &h2::test_helpers::hpack_literal_indexed_name(1, "api.example.com"),
2311            1,
2312        ));
2313        // WINDOW_UPDATE frame: 4-byte payload (window-size-increment = 1024).
2314        let mut wu = h2::test_helpers::frame_header(4, 0x08, 0x00, 0);
2315        wu.extend_from_slice(&1024u32.to_be_bytes());
2316        bytes.extend_from_slice(&wu);
2317
2318        let mut s = TcpStream::connect(listen).await.unwrap();
2319        s.write_all(&bytes).await.unwrap();
2320        s.shutdown().await.ok();
2321
2322        let upstream_bytes = tokio::time::timeout(Duration::from_secs(2), upstream_h)
2323            .await
2324            .expect("upstream task")
2325            .expect("upstream join");
2326        // Upstream sees the SETTINGS, HEADERS, AND the WINDOW_UPDATE
2327        // verbatim. Asserting full equality is the strongest possible
2328        // shape check: every byte of the input arrived.
2329        assert_eq!(upstream_bytes, bytes);
2330
2331        shutdown.store(true, Ordering::SeqCst);
2332        poke_shutdown(listen);
2333        let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
2334    }
2335}