Skip to main content

phantom_protocol/api/
session.rs

1//! Client-First Transport Session
2//!
3//! `PhantomSession` provides instant connection establishment with
4//! automatic send queuing during handshake. This is the transport-level
5//! API that sits below MLS and above the raw UDP/TCP transport.
6
7use crate::crypto::hybrid_sign::HybridVerifyingKey;
8use crate::errors::CoreError;
9use crate::observability::attrs::{AeadAlgorithm, ReplayReason};
10use crate::observability::{Observability, ObservabilityConfig};
11use crate::runtime::{Runtime, TokioRuntime};
12use crate::transport::handshake::{
13    HandshakeClient, HelloRetryRequest, ServerHello, ServerReject, EARLY_DATA_MAX_LEN,
14};
15use crate::transport::multiplexer::StreamDemultiplexer;
16use crate::transport::packet_coalescer_codec::unwrap_coalesced_packet;
17use crate::transport::path_validation_codec::build_path_validation_packet;
18use crate::transport::session::Session;
19use crate::transport::stream::Stream;
20use crate::transport::types::{
21    LegType, PacketFlags, PacketHeader, PhantomPacket, SessionId, StreamId as TransportStreamId,
22    WIRE_VERSION,
23};
24use bytes::Bytes;
25use dashmap::DashMap;
26use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
27use std::sync::Arc;
28use tokio::sync::{mpsc, oneshot, Mutex};
29
30/// Generate a fresh 128-bit session identifier from the thread-local CSPRNG.
31///
32/// Replaces the historical `rand::random::<u32>()` (32 bits, insufficient to
33/// avoid birthday collisions at scale and not advertised as cryptographic).
34/// `rand::thread_rng` is seeded from the OS at thread startup and uses a
35/// modern stream cipher (ChaCha) — adequate for non-secret identifiers.
36fn new_session_id() -> String {
37    let bytes: [u8; 16] = rand::random();
38    format!("phantom-{}", hex::encode(bytes))
39}
40
41// ─── Connection State ───────────────────────────────────────────────────────
42
/// Lifecycle state of a [`PhantomSession`], stored atomically as its `u8`
/// discriminant.
///
/// A session can be used as soon as it exists: sends issued before the
/// handshake finishes are queued and flushed once it completes.
#[cfg_attr(feature = "bindings", derive(uniffi::Enum))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
#[non_exhaustive]
pub enum ConnectionState {
    /// Handshake started but not yet finished.
    Connecting = 0,
    /// Classical (X25519) channel is up; application data can flow.
    ClassicalReady = 1,
    /// Post-quantum upgrade is underway.
    PqcUpgrading = 2,
    /// Full hybrid post-quantum protection is in effect.
    PqcReady = 3,
    /// Session is fully connected and operational.
    Connected = 4,
    /// Connection attempt failed.
    Failed = 5,
    /// Session was closed gracefully.
    Closed = 6,
}

impl ConnectionState {
    /// Decode a stored discriminant.
    ///
    /// Any byte with no matching variant is treated as corrupt state and
    /// mapped to [`ConnectionState::Failed`].
    fn from_u8(v: u8) -> Self {
        // Position in this table equals the variant's `repr(u8)` discriminant.
        const BY_DISCRIMINANT: [ConnectionState; 7] = [
            ConnectionState::Connecting,
            ConnectionState::ClassicalReady,
            ConnectionState::PqcUpgrading,
            ConnectionState::PqcReady,
            ConnectionState::Connected,
            ConnectionState::Failed,
            ConnectionState::Closed,
        ];
        BY_DISCRIMINANT
            .get(usize::from(v))
            .copied()
            .unwrap_or(ConnectionState::Failed)
    }

    /// `true` for every state in which application data may be exchanged,
    /// i.e. the classical channel or something stronger is established.
    pub fn is_data_ready(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::ClassicalReady | Self::PqcUpgrading | Self::PqcReady | Self::Connected => true,
            Self::Connecting | Self::Failed | Self::Closed => false,
        }
    }
}
90
91// ─── Resumption Hint ────────────────────────────────────────────────────────
92
/// 0-RTT resumption material captured from a session whose handshake
/// completed.
///
/// Obtained from [`PhantomSession::resumption_hint`] and handed back to
/// [`connect_pinned_with_resumption`] to attempt a 0-RTT reconnect to the
/// same server.
///
/// This record is the UniFFI-representable form of the internal
/// `(session_id, resumption_secret)` pair. Both fields are meant to hold
/// exactly 32 bytes. They are `Vec<u8>` only because UniFFI has no
/// fixed-size array type, so the length is a runtime invariant checked when
/// the hint is consumed.
///
/// Keep the hint next to the pinned `HybridVerifyingKey` of the server it was
/// negotiated with. The `resumption_secret` is bound to that server, and
/// presenting the hint to a different server is a configuration bug.
#[cfg_attr(feature = "bindings", derive(uniffi::Record))]
#[derive(Clone)]
#[non_exhaustive]
pub struct ResumptionHint {
    /// Negotiated session id (expected length: 32 bytes).
    pub session_id: Vec<u8>,
    /// Resumption secret (expected length: 32 bytes). Sensitive — handle it
    /// like key material.
    pub resumption_secret: Vec<u8>,
}

// INFOLEAK-1: `Debug` is written by hand instead of derived, so an FFI/mobile
// consumer that logs the hint with `{:?}` cannot leak the 0-RTT
// `resumption_secret`. This is the one secret-bearing type that crosses the
// FFI boundary. It follows the REDACTED `Debug` used on `HybridSigningKey` and
// `HybridSecretKey`.
impl std::fmt::Debug for ResumptionHint {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the id's length is shown; the secret is never formatted.
        let id_len = self.session_id.len();
        let mut out = f.debug_struct("ResumptionHint");
        out.field("session_id", &format_args!("<{} bytes>", id_len));
        out.field("resumption_secret", &"REDACTED");
        out.finish()
    }
}
134
135// ─── Transport Abstraction ──────────────────────────────────────────────────
136
137// `SessionTransport` now lives in `crate::transport::session_transport` — a
138// dependency-light module that can compile in a `no_std + alloc` build. It is
139// re-exported here so `crate::api::session::SessionTransport` and the public
140// `phantom_protocol::api::SessionTransport` path stay stable.
141pub use crate::transport::session_transport::{FramePhase, SessionTransport};
142
143/// Transport decorator that records `record_send` / `record_recv` on the
144/// session's [`Observability`] for every frame that crosses the wire — so the
145/// data-plane packet/byte counters reflect a real run without threading the
146/// handle through every send site. Wraps the concrete `SessionTransport` just
147/// before the data pump takes over, so handshake bytes are not counted as
148/// data-plane packets (they have their own handshake metric).
149struct ObservedTransport<T> {
150    inner: T,
151    observability: Arc<Observability>,
152    leg: LegType,
153}
154
155impl<T> ObservedTransport<T> {
156    fn new(inner: T, observability: Arc<Observability>, leg: LegType) -> Self {
157        Self {
158            inner,
159            observability,
160            leg,
161        }
162    }
163}
164
165impl<T: SessionTransport> SessionTransport for ObservedTransport<T> {
166    async fn send_bytes(&self, data: &[u8]) -> Result<(), CoreError> {
167        let result = self.inner.send_bytes(data).await;
168        if result.is_ok() {
169            self.observability.record_send(data.len(), self.leg);
170        }
171        result
172    }
173
174    async fn recv_bytes(&self) -> Result<Bytes, CoreError> {
175        let result = self.inner.recv_bytes().await;
176        if let Ok(ref bytes) = result {
177            self.observability.record_recv(bytes.len(), self.leg);
178        }
179        result
180    }
181}
182
183// ─── Session ────────────────────────────────────────────────────────────────
184
185/// Client-first session — instant `connect()`, non-blocking `send()`.
186///
187/// # Design
188///
189/// ```text
190///   let session = PhantomSession::connect("server:443");  // instant!
191///   session.send(data).await;   // queued until handshake completes
192///   session.send(data2).await;  // also queued
193///   // ... handshake completes in background ...
194///   // queued data auto-flushed, new sends go directly
195/// ```
196///
197/// The session progresses through states:
198/// `Connecting → ClassicalReady → PqcUpgrading → PqcReady → Connected`
199#[cfg_attr(feature = "bindings", derive(uniffi::Object))]
200pub struct PhantomSession {
201    /// Session identifier
202    id: String,
203    /// Target server address
204    peer_addr: String,
205    /// Connection state (atomic for lock-free reads)
206    state: Arc<AtomicU8>,
207    /// Queued messages before connection is ready
208    send_queue: Arc<Mutex<Vec<Vec<u8>>>>,
209    /// Channel to send commands to the background handshake task
210    cmd_tx: mpsc::Sender<SessionCommand>,
211    /// Command receiver — taken by the background task when spawned
212    #[allow(dead_code)]
213    cmd_rx: Mutex<Option<mpsc::Receiver<SessionCommand>>>,
214    /// Received messages channel. Carries `Bytes` (not `Vec<u8>`) so the recv
215    /// path can fan out via cheap refcount clones to both the stream demux
216    /// and the synchronous `recv()` consumer without deep-copying the payload.
217    recv_rx: Mutex<mpsc::Receiver<Bytes>>,
218    /// Multiplexes incoming packets to independent streams
219    demux: Arc<StreamDemultiplexer>,
220    /// Active outgoing streams (ARQ management)
221    streams: Arc<DashMap<u32, Arc<Stream>>>,
222    /// Negotiated session handle, populated by the background task
223    /// once the handshake completes. Exposed via `resumption_hint`
224    /// for Phase 4.1 0-RTT clients. `None` while still handshaking
225    /// or after a failure.
226    inner_session: Arc<Mutex<Option<Arc<Session>>>>,
227    /// 0-RTT verdict. `None` while handshaking, after a failure, or when the
228    /// client sent no early-data on this connect. `Some(true)` — the server
229    /// consumed the early-data; `Some(false)` — the client sent early-data and
230    /// the server rejected it. Exposed via `early_data_accepted()`.
231    early_data_accepted: Arc<Mutex<Option<bool>>>,
232    /// Session observability handle. Server-accepted sessions share the
233    /// `PhantomListener`'s instance (so its `snapshot()` aggregates every
234    /// session it accepted); client sessions get their own. The data pump
235    /// records send/recv, the security drops, and the session lifecycle
236    /// (open/close) against it. A ZST no-op when `telemetry-otel` is off.
237    observability: Arc<Observability>,
238}
239
240/// Commands for the background session task
241pub enum SessionCommand {
242    /// Queue data for sending
243    Send(Vec<u8>),
244    /// Send data on a specific stream reliably
245    SendStreamReliable { stream_id: u32, data: bytes::Bytes },
246    /// Send data on a specific stream unreliably
247    SendStreamUnreliable { stream_id: u32, data: bytes::Bytes },
248    /// Close a specific stream
249    CloseStream { stream_id: u32 },
250    /// Close the session
251    Close,
252}
253
254impl PhantomSession {
255    /// Create a new session and start the background handshake task.
256    ///
257    /// Requires `expected_server_key` for MITM resistance — the client will
258    /// abort the handshake unless the server presents this exact verifying key.
259    /// Callers obtain this key out-of-band (e.g. from `PhantomListener::verifying_key_bytes`).
260    ///
261    /// The handshake runs in the background:
262    /// 1. Exchange hybrid PQC `ClientHello`/`ServerHello`.
263    /// 2. Verify server identity against `expected_server_key`.
264    /// 3. Derive AEAD keys; flush queued sends as encrypted packets.
265    ///
266    /// All network I/O goes through the provided `SessionTransport`. The
267    /// task that drives the handshake + data pump runs on the default
268    /// [`TokioRuntime`]; use
269    /// [`connect_with_transport_with_runtime`](Self::connect_with_transport_with_runtime)
270    /// to substitute a different `Runtime`.
271    pub fn connect_with_transport<T: SessionTransport>(
272        peer_addr: &str,
273        transport: T,
274        expected_server_key: HybridVerifyingKey,
275    ) -> Self {
276        Self::connect_with_transport_with_runtime(
277            peer_addr,
278            transport,
279            expected_server_key,
280            Arc::new(TokioRuntime),
281        )
282    }
283
284    /// Like [`connect_with_transport`](Self::connect_with_transport) but
285    /// runs the background task on the supplied `Runtime`. Intended for
286    /// WASM / embedded / test backends that don't drive `tokio::spawn`.
287    pub fn connect_with_transport_with_runtime<T: SessionTransport>(
288        peer_addr: &str,
289        transport: T,
290        expected_server_key: HybridVerifyingKey,
291        runtime: Arc<dyn Runtime>,
292    ) -> Self {
293        Self::spawn_client(peer_addr, transport, expected_server_key, runtime, None)
294    }
295
296    /// Connect with a **0-RTT resumption attempt**.
297    ///
298    /// `resumption_hint` is the `(session_id, resumption_secret)` tuple
299    /// from a prior session's [`PhantomSession::resumption_hint`].
300    /// `early_data` (≤ [`EARLY_DATA_MAX_LEN`] bytes) is sealed and carried
301    /// inside the resuming ClientHello so it reaches the server on the very
302    /// first flight — saving a round-trip versus 1-RTT.
303    ///
304    /// Acceptance is best-effort: a stale/unknown ticket or an AEAD failure
305    /// leaves [`early_data_accepted`](Self::early_data_accepted) at
306    /// `Some(false)` and the handshake completes as a normal 1-RTT exchange —
307    /// the caller must then send that payload over the normal channel.
308    /// Returns `Err` only when `early_data` exceeds the cap.
309    ///
310    /// Runs on the default [`TokioRuntime`].
311    pub fn connect_with_resumption<T: SessionTransport>(
312        peer_addr: &str,
313        transport: T,
314        expected_server_key: HybridVerifyingKey,
315        resumption_hint: ([u8; 32], [u8; 32]),
316        early_data: Vec<u8>,
317    ) -> Result<Self, CoreError> {
318        // fips bootstrap POST gate. `connect_with_resumption`
319        // returns `Result`, so unlike the infallible `connect_with_transport*`
320        // entry points we can surface the POST failure directly to the
321        // caller (mirrors the `PhantomListener::bind*` and
322        // `connect_pinned*` convention). The same POST is also checked
323        // in `background_task` as a defense-in-depth backstop.
324        #[cfg(feature = "fips")]
325        crate::crypto::self_tests::ensure_post_passed()
326            .map_err(|e| CoreError::FipsSelfTestFailure(format!("{e:?}")))?;
327
328        if early_data.len() > EARLY_DATA_MAX_LEN {
329            return Err(CoreError::ValidationError(format!(
330                "early_data is {} bytes, exceeds the {}-byte 0-RTT cap",
331                early_data.len(),
332                EARLY_DATA_MAX_LEN
333            )));
334        }
335        let (resume_id, resume_secret) = resumption_hint;
336        Ok(Self::spawn_client(
337            peer_addr,
338            transport,
339            expected_server_key,
340            Arc::new(TokioRuntime),
341            Some((resume_id, resume_secret, early_data)),
342        ))
343    }
344
345    /// Shared constructor body for [`connect_with_transport_with_runtime`]
346    /// and [`connect_with_resumption`]. `resumption_request` is `None`
347    /// for a plain handshake, `Some((id, secret, early_data))` to attempt a
348    /// 0-RTT resumption.
349    fn spawn_client<T: SessionTransport>(
350        peer_addr: &str,
351        transport: T,
352        expected_server_key: HybridVerifyingKey,
353        runtime: Arc<dyn Runtime>,
354        resumption_request: Option<([u8; 32], [u8; 32], Vec<u8>)>,
355    ) -> Self {
356        let (cmd_tx, cmd_rx) = mpsc::channel(256);
357        let (recv_tx, recv_rx) = mpsc::channel(256);
358
359        let state = Arc::new(AtomicU8::new(ConnectionState::Connecting as u8));
360        let send_queue = Arc::new(Mutex::new(Vec::new()));
361        let peer = peer_addr.to_string();
362        let (demux, _ctrl_rx) = StreamDemultiplexer::new(256);
363        let demux = Arc::new(demux);
364
365        let streams = Arc::new(DashMap::new());
366        let inner_session: Arc<Mutex<Option<Arc<Session>>>> = Arc::new(Mutex::new(None));
367        let early_data_accepted: Arc<Mutex<Option<bool>>> = Arc::new(Mutex::new(None));
368        // Client sessions have no listener, so they own their observability
369        // instance (its `snapshot()` reflects just this connection).
370        let observability = Observability::new(ObservabilityConfig::default());
371
372        let session = Self {
373            id: new_session_id(),
374            peer_addr: peer.clone(),
375            state: state.clone(),
376            send_queue: send_queue.clone(),
377            cmd_tx: cmd_tx.clone(),
378            cmd_rx: Mutex::new(None), // taken by background task
379            recv_rx: Mutex::new(recv_rx),
380            demux: demux.clone(),
381            streams: streams.clone(),
382            inner_session: inner_session.clone(),
383            early_data_accepted: early_data_accepted.clone(),
384            observability: observability.clone(),
385        };
386
387        // Spawn the background handshake + data pump task on the supplied
388        // runtime. `SpawnHandle` is detached: dropping it leaves the task
389        // running. The session is owned by the caller for its lifetime
390        // and natural shutdown comes via `SessionCommand::Close`.
391        let runtime_for_pump = runtime.clone();
392        let _detached = runtime.spawn(Box::pin(Self::background_task(
393            state,
394            send_queue,
395            cmd_tx,
396            cmd_rx,
397            recv_tx,
398            transport,
399            peer,
400            demux,
401            streams,
402            expected_server_key,
403            runtime_for_pump,
404            inner_session,
405            early_data_accepted,
406            resumption_request,
407            observability,
408        )));
409
410        session
411    }
412
413    /// Install a server-side `Session` (already derived by `HandshakeServer::process_client_hello`)
414    /// and spawn the data pump on the default [`TokioRuntime`]. Used by
415    /// `PhantomListener::accept` after driving the server handshake.
416    ///
417    /// `PhantomListener::accept` itself now uses
418    /// `from_accepted_server_session_with_runtime` so the listener's
419    /// runtime is honored. This wrapper is preserved for callers that
420    /// do not have a runtime handle and want the default `TokioRuntime`.
421    #[allow(dead_code)]
422    pub(crate) fn from_accepted_server_session<T: SessionTransport>(
423        peer_addr: String,
424        transport: T,
425        server_session: Arc<Session>,
426    ) -> Arc<Self> {
427        Self::from_accepted_server_session_with_runtime(
428            peer_addr,
429            transport,
430            server_session,
431            Arc::new(TokioRuntime),
432            Observability::new(ObservabilityConfig::default()),
433        )
434    }
435
436    /// Runtime-aware variant of [`from_accepted_server_session`].
437    pub(crate) fn from_accepted_server_session_with_runtime<T: SessionTransport>(
438        peer_addr: String,
439        transport: T,
440        server_session: Arc<Session>,
441        runtime: Arc<dyn Runtime>,
442        observability: Arc<Observability>,
443    ) -> Arc<Self> {
444        let (cmd_tx, cmd_rx) = mpsc::channel(256);
445        let (recv_tx, recv_rx) = mpsc::channel(256);
446
447        let state = Arc::new(AtomicU8::new(ConnectionState::Connected as u8));
448        let send_queue = Arc::new(Mutex::new(Vec::new()));
449        let (demux, _ctrl_rx) = StreamDemultiplexer::new(256);
450        let demux = Arc::new(demux);
451        let streams = Arc::new(DashMap::new());
452
453        let inner_session: Arc<Mutex<Option<Arc<Session>>>> =
454            Arc::new(Mutex::new(Some(server_session.clone())));
455
456        let session = Arc::new(Self {
457            id: new_session_id(),
458            peer_addr: peer_addr.clone(),
459            state: state.clone(),
460            send_queue: send_queue.clone(),
461            cmd_tx,
462            cmd_rx: Mutex::new(None),
463            recv_rx: Mutex::new(recv_rx),
464            demux: demux.clone(),
465            streams: streams.clone(),
466            inner_session,
467            // Server side: 0-RTT early-data is delivered via
468            // `AcceptOutcome`, not this client-facing field.
469            early_data_accepted: Arc::new(Mutex::new(None)),
470            // Shares the listener's instance so its `snapshot()` aggregates
471            // every accepted session.
472            observability: observability.clone(),
473        });
474
475        let session_id = *server_session.id();
476        let runtime_for_pump = runtime.clone();
477        // WIRE-001: the server handshake is complete — raise the receive frame
478        // cap from the tight unauthenticated handshake limit to the steady-state
479        // application limit before the data pump takes over.
480        transport.set_frame_phase(FramePhase::Established);
481        let observed = Arc::new(ObservedTransport::new(
482            transport,
483            observability.clone(),
484            LegType::Tcp,
485        ));
486        let _detached = runtime.spawn(Box::pin(run_data_pump(
487            server_session,
488            session_id,
489            observed,
490            state,
491            send_queue,
492            cmd_rx,
493            recv_tx,
494            demux,
495            streams,
496            runtime_for_pump,
497            observability,
498            LegType::Tcp,
499        )));
500
501        session
502    }
503
504    /// Background task: performs handshake, then pumps data.
505    #[allow(clippy::too_many_arguments)]
506    async fn background_task<T: SessionTransport>(
507        state: Arc<AtomicU8>,
508        send_queue: Arc<Mutex<Vec<Vec<u8>>>>,
509        _cmd_tx: mpsc::Sender<SessionCommand>,
510        cmd_rx: mpsc::Receiver<SessionCommand>,
511        recv_tx: mpsc::Sender<Bytes>,
512        transport: T,
513        peer: String,
514        demux: Arc<StreamDemultiplexer>,
515        streams: Arc<DashMap<u32, Arc<Stream>>>,
516        expected_server_key: HybridVerifyingKey,
517        runtime: Arc<dyn Runtime>,
518        inner_session: Arc<Mutex<Option<Arc<Session>>>>,
519        early_data_accepted: Arc<Mutex<Option<bool>>>,
520        resumption_request: Option<([u8; 32], [u8; 32], Vec<u8>)>,
521        observability: Arc<Observability>,
522    ) {
523        // DEBUG: the peer address is correlatable; keep it off default logs.
524        log::debug!("PhantomSession: starting handshake with {}", peer);
525
526        // fips bootstrap POST gate, mirroring the listener and
527        // `connect_pinned*` paths: the synchronous Rust-only entry
528        // points (`connect_with_transport*` / `connect_with_resumption`)
529        // also need to honor FIPS 140-3 §7.7 before any cryptographic
530        // work. Cached `OnceLock` makes the second+ call an atomic
531        // read; the first call runs the full POST battery.
532        //
533        // On failure we cannot return a `CoreError` (the entry points
534        // are infallible by API contract) — instead we transition the
535        // state machine to `Failed` and bail, matching the existing
536        // handshake-failure shape. The error string lands in the log.
537        #[cfg(feature = "fips")]
538        if let Err(e) = crate::crypto::self_tests::ensure_post_passed() {
539            log::error!(
540                "PhantomSession: FIPS POST self-test failed; refusing to handshake: {:?}",
541                e
542            );
543            state.store(ConnectionState::Failed as u8, Ordering::Relaxed);
544            return;
545        }
546
547        // Retain a copy of any 0-RTT early-data so it can be losslessly
548        // re-sent over the established session if the server rejects it (C3 —
549        // the rejection-retransmission contract). `run_client_handshake`
550        // consumes `resumption_request`, so clone the blob first.
551        let pending_early_data: Option<Vec<u8>> = resumption_request
552            .as_ref()
553            .and_then(|(_, _, ed)| (!ed.is_empty()).then(|| ed.clone()));
554
555        // ── Stage 1 & 2: Hybrid Handshake (optionally 0-RTT resumption) ──
556        // HS-02: bound the whole client handshake by a wall-clock deadline so a
557        // silent or stalling server can't hang the connect indefinitely. The
558        // TIMER is `runtime.sleep` (NOT raw tokio::time) so it stays correct
559        // under WasmRuntime/EmbeddedRuntime; `select!` is just the combinator.
560        const CLIENT_HANDSHAKE_DEADLINE: std::time::Duration = std::time::Duration::from_secs(10);
561        // Scoped so the handshake future's borrow of `transport` ends before
562        // `transport` is moved into the data pump below.
563        let handshake_result = {
564            let handshake_fut =
565                run_client_handshake(&transport, &expected_server_key, resumption_request);
566            let handshake_timeout = runtime.sleep(CLIENT_HANDSHAKE_DEADLINE);
567            tokio::pin!(handshake_fut);
568            tokio::select! {
569                r = &mut handshake_fut => r,
570                _ = handshake_timeout => Err(CoreError::Timeout),
571            }
572        };
573        let (crypto_session, ed_accepted) = match handshake_result {
574            Ok((session, accepted)) => (Arc::new(session), accepted),
575            Err(e) => {
576                log::error!("PhantomSession: handshake failed: {}", e);
577                state.store(ConnectionState::Failed as u8, Ordering::Relaxed);
578                return;
579            }
580        };
581        log::info!("PhantomSession: Handshake complete — hybrid channel ready");
582
583        // Phase 4.1 — publish the negotiated Session + the 0-RTT
584        // verdict via the outer PhantomSession so `resumption_hint()`
585        // and `early_data_accepted()` can reach them after the
586        // background task moves the Arc into the pump.
587        {
588            let mut guard = inner_session.lock().await;
589            *guard = Some(crypto_session.clone());
590        }
591        *early_data_accepted.lock().await = ed_accepted;
592
593        // C3 — 0-RTT rejection retransmission contract. If we sent early-data
594        // and the server rejected it (`Some(false)`), it never reached the
595        // application layer, so re-send it losslessly over the now-established
596        // 1-RTT session. Prepend it to the pre-handshake send queue (drained
597        // first by the pump onto the reliable raw-app stream) so it lands
598        // *ahead* of anything the app queued while connecting — preserving the
599        // order in which the bytes were originally offered. `Some(true)` (the
600        // server consumed it) and `None` (none sent) need no action.
601        if ed_accepted == Some(false) {
602            if let Some(ed) = pending_early_data {
603                send_queue.lock().await.insert(0, ed);
604                log::debug!(
605                    "PhantomSession: 0-RTT early-data rejected; re-queued for 1-RTT delivery"
606                );
607            }
608        }
609
610        let session_id = *crypto_session.id();
611        state.store(ConnectionState::Connected as u8, Ordering::Relaxed);
612        log::debug!("PhantomSession: fully connected to {}", peer);
613
614        // Wrap the (post-handshake) transport so every data-plane send/recv is
615        // recorded. The connectionless API rides TCP today (KCP / FakeTLS legs
616        // are not session-wired yet), so the leg label is fixed to TCP.
617        // WIRE-001: the handshake is done — raise the frame cap from the tight
618        // unauthenticated handshake limit to the steady-state application limit.
619        transport.set_frame_phase(FramePhase::Established);
620        let observed = Arc::new(ObservedTransport::new(
621            transport,
622            observability.clone(),
623            LegType::Tcp,
624        ));
625        run_data_pump(
626            crypto_session,
627            session_id,
628            observed,
629            state,
630            send_queue,
631            cmd_rx,
632            recv_tx,
633            demux,
634            streams,
635            runtime,
636            observability,
637            LegType::Tcp,
638        )
639        .await;
640    }
641}
642
643/// Drive the client side of the Phantom handshake to completion.
644///
645/// When `resumption` is `Some((resume_id, resume_secret, early_data))` the
646/// first-flight `ClientHello` carries the resume id and, when `early_data` is
647/// non-empty, a sealed 0-RTT blob folded into `ClientHello.early_data` — so it
648/// reaches the server on the first flight. A cookie/PoW `HelloRetryRequest` is
649/// answered in-loop, reusing the same hello (the early-data blob rides along).
650///
651/// Returns the established `Session` and the 0-RTT verdict (resolved
652/// decision 1):
653/// - `Some(true)`  — the client sent early-data and the server consumed it
654/// - `Some(false)` — the client sent early-data and the server rejected it
655///   (stale ticket / oversized / AEAD failure)
656/// - `None`        — the client sent no early-data on this connect
657async fn run_client_handshake<T: SessionTransport>(
658    transport: &T,
659    expected_server_key: &HybridVerifyingKey,
660    resumption: Option<([u8; 32], [u8; 32], Vec<u8>)>,
661) -> Result<(Session, Option<bool>), CoreError> {
662    let handshake = HandshakeClient::new()?;
663
664    // Build the first-flight ClientHello. A resumption request folds the
665    // resume id and (optionally) a sealed 0-RTT early-data blob into the
666    // single hello; otherwise it is a plain hello.
667    let mut hello = match &resumption {
668        Some((resume_id, resume_secret, early_data)) => {
669            let ed: Option<&[u8]> = if early_data.is_empty() {
670                None
671            } else {
672                Some(early_data.as_slice())
673            };
674            handshake.create_client_hello_with_resume(*resume_id, resume_secret, ed)
675        }
676        None => handshake.create_client_hello(),
677    };
678
679    // HS-02: cap the number of HelloRetryRequest rounds. The legitimate flow
680    // needs at most one cookie round + one PoW round; a bound of 3 leaves slack
681    // for a benign reorder. Without it, a MITM answering every ClientHello with
682    // a fresh cheap HelloRetryRequest could loop the client forever.
683    const MAX_CLIENT_RETRY_ROUNDS: u32 = 3;
684    let mut retry_rounds: u32 = 0;
685
686    loop {
687        let bytes = borsh::to_vec(&hello).map_err(|e| {
688            CoreError::SerializationError(format!("ClientHello encode failed: {}", e))
689        })?;
690        transport.send_bytes(&bytes).await?;
691        let resp = transport.recv_bytes().await?;
692
693        // The reply is one of three shapes: a `ServerHello` (success), a
694        // `HelloRetryRequest` (cookie/PoW demand), or a `ServerReject` (the
695        // server cannot speak our version). Try the success shape first — a
696        // retry/reject blob is far too small to deserialize as a multi-KiB
697        // ServerHello, so the disambiguation is unambiguous.
698        if let Ok(sh) = borsh::from_slice::<ServerHello>(&resp) {
699            let (session, accepted) =
700                handshake.process_server_hello(&hello, &sh, Some(expected_server_key))?;
701            return Ok((session, accepted));
702        } else if let Ok(reject) = borsh::from_slice::<ServerReject>(&resp) {
703            // The marker guard keeps a same-sized non-reject blob from being
704            // mistaken for a reject. We surface this as a hard error and do
705            // NOT auto-downgrade to `reject.supported_version` — a forced
706            // downgrade on an injected reject would defeat the transcript-bound
707            // version pin (Invariant 7).
708            if reject.has_marker() {
709                return Err(CoreError::HandshakeError(format!(
710                    "server rejected the handshake: unsupported protocol version \
711                     (client speaks v{}, server speaks v{})",
712                    hello.version, reject.supported_version
713                )));
714            }
715            return Err(CoreError::HandshakeError(
716                "invalid ServerHello, Retry, or Reject received".into(),
717            ));
718        } else if let Ok(retry) = borsh::from_slice::<HelloRetryRequest>(&resp) {
719            retry_rounds += 1;
720            if retry_rounds > MAX_CLIENT_RETRY_ROUNDS {
721                return Err(CoreError::HandshakeError(format!(
722                    "server demanded more than {MAX_CLIENT_RETRY_ROUNDS} HelloRetryRequest rounds"
723                )));
724            }
725            log::info!("PhantomSession: Received HelloRetryRequest, retrying...");
726            hello.cookie = retry.cookie;
727            if let Some(challenge) = retry.challenge {
728                // H3: cap the accepted difficulty and bound the solver, so an
729                // injected/malicious HelloRetryRequest (e.g. difficulty 255)
730                // surfaces a handshake error instead of pinning a CPU core.
731                log::info!("PhantomSession: Solving PoW challenge...");
732                hello.pow_solution = Some(
733                    challenge
734                        .solve_capped(crate::crypto::pow::MAX_CLIENT_POW_DIFFICULTY)
735                        .map_err(|e| CoreError::HandshakeError(e.to_string()))?,
736                );
737            }
738            continue;
739        } else {
740            return Err(CoreError::HandshakeError(
741                "invalid ServerHello, Retry, or Reject received".into(),
742            ));
743        }
744    }
745}
746
747/// Shared client/server data pump.
748///
749/// After the handshake completes (client side) or after the server `Session` is
750/// derived (server side), this loop:
751///   - drains the queued early-data buffer,
752///   - listens for incoming packets and decrypts them,
753///   - encrypts outgoing application/stream packets,
754///   - sends ACKs for reliable packets.
755// The 11 parameters represent the complete session-identity and I/O surface.
756// Grouping them into a struct would require a generic struct (due to `T:
757// SessionTransport`), add indirection with no safety or clarity gain, and
758// constitute a public-API change. The function is private (`async fn`, no
759// `pub`), so the extra arguments are contained here.
760#[allow(clippy::too_many_arguments)]
761async fn run_data_pump<T: SessionTransport>(
762    crypto_session: Arc<Session>,
763    session_id: SessionId,
764    transport: Arc<T>,
765    state: Arc<AtomicU8>,
766    send_queue: Arc<Mutex<Vec<Vec<u8>>>>,
767    mut cmd_rx: mpsc::Receiver<SessionCommand>,
768    recv_tx: mpsc::Sender<Bytes>,
769    demux: Arc<StreamDemultiplexer>,
770    streams: Arc<DashMap<u32, Arc<Stream>>>,
771    runtime: Arc<dyn Runtime>,
772    observability: Arc<Observability>,
773    leg: LegType,
774) {
775    // Session is now established and active — bump the active-session gauge.
776    // The matching `session_closed` at teardown (below) lets the gauge fall,
777    // so it tracks live sessions instead of growing monotonically.
778    observability.session_opened(leg);
779
780    // ── Raw-app session stream (reserved id 1) ──
781    // The connectionless `send()` / `recv()` surface is multiplexed onto one
782    // reserved stream so it gets the same reliable-delivery machinery as
783    // explicitly-opened streams: `drain_streams_priority_ordered` (re)transmits
784    // its buffered segments on the poll tick / outbound-ready notify, and
785    // inbound ACKs for id 1 clear them via `Stream::ack`. The demultiplexer
786    // hands out ids 2+, so this never collides with a user-opened stream.
787    const RAW_APP_STREAM_ID: u32 = 1;
788    let raw_stream = Arc::new(Stream::new(RAW_APP_STREAM_ID as TransportStreamId));
789    streams.insert(RAW_APP_STREAM_ID, raw_stream.clone());
790
791    // ── Flush queued early-data onto the raw-app stream ──
792    // Routed through the stream (not a one-shot direct send) so queued
793    // pre-handshake data is buffered for retransmit just like post-handshake
794    // sends — a dropped early-data frame is recovered, not lost.
795    {
796        let mut queue = send_queue.lock().await;
797        let count = queue.len();
798        for msg in queue.drain(..) {
799            for chunk in msg.chunks(TRANSPORT_MTU) {
800                raw_stream
801                    .send_reliable(Bytes::copy_from_slice(chunk))
802                    .await;
803            }
804        }
805        if count > 0 {
806            log::info!(
807                "PhantomSession: queued {} early-data message(s) onto the raw-app stream",
808                count
809            );
810            crypto_session.notify_outbound_ready();
811        }
812    }
813
814    // ── Receive-delivery decoupling ──
815    // The reader task hands decrypted application data to a dedicated delivery
816    // task over an UNBOUNDED channel and never blocks on app delivery, so a slow
817    // `recv()` consumer cannot head-of-line-stall inbound ACK / WINDOW_UPDATE /
818    // control processing. The delivery task does the app-paced `recv_tx.send()`
819    // and credits the flow-control window on *real* consumption; enforced
820    // send-side flow control (`Stream::poll_send`) bounds the in-flight backlog
821    // to ~one window, and `undelivered_bytes` + `RECV_DELIVERY_HARD_CAP` guard
822    // against a peer that ignores flow control.
823    let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
824    let undelivered_bytes = Arc::new(AtomicU64::new(0));
825    {
826        let recv_tx_deliver = recv_tx; // move the session recv channel here
827        let demux_deliver = demux.clone();
828        let streams_deliver = streams.clone();
829        let crypto_deliver = crypto_session.clone();
830        let undelivered_deliver = undelivered_bytes.clone();
831        runtime.spawn(Box::pin(async move {
832            while let Some((stream_id, bytes)) = deliver_rx.recv().await {
833                let len = bytes.len() as u64;
834                // Best-effort, non-blocking notification to the (vestigial) demux.
835                demux_deliver.route_data(stream_id, bytes.clone());
836                // Account the item the instant it leaves the UNBOUNDED delivery
837                // queue (which the reader's HARD_CAP guards) — BEFORE the
838                // app-paced `recv_tx.send()` below, which can block for a long
839                // time on a slow consumer. Decrementing (and crediting) only
840                // after a successful send would (a) keep this item counted
841                // against the cap while it sits in the bounded recv pipeline,
842                // inflating `undelivered_bytes`, and (b) leak the count entirely
843                // if the send then fails. The byte is now in the bounded
844                // recv-channel pipeline (capacity-limited, its own backpressure),
845                // so it no longer belongs to the unbounded backlog.
846                undelivered_deliver.fetch_sub(len, Ordering::AcqRel);
847                // Credit the flow-control window: the item has been pulled into
848                // the app-delivery pipeline (matching the inline ACK's "accepted
849                // into my in-memory delivery queue" semantics). The pull rate is
850                // still paced by `recv_tx.send()` completing below, so credit
851                // tracks app consumption (one item of look-ahead) — backpressure
852                // is preserved. Wake the send loop to flush the WINDOW_UPDATE
853                // (emitted there, under rekey ownership, so it is epoch-safe).
854                if let Some(stream) = streams_deliver.get(&stream_id) {
855                    if let Some(credit) = stream.record_app_consumed(len as u32) {
856                        stream.stage_window_update_credit(credit);
857                        crypto_deliver.notify_outbound_ready();
858                    }
859                }
860                // Real, app-paced delivery to the session recv channel. A closed
861                // channel means the consumer is gone → session ending; stop. The
862                // item was already removed from the backlog accounting above, so
863                // breaking here leaks nothing.
864                if recv_tx_deliver.send(bytes).await.is_err() {
865                    break;
866                }
867            }
868        }));
869    }
870
871    // ── Receive (reader) task: deserialize, decrypt, hand off to delivery ──
872    let transport_recv = transport.clone();
873    let transport_send_ack = transport.clone();
874    let crypto_recv = crypto_session.clone();
875    let demux_recv = demux.clone();
876    let streams_recv = streams.clone();
877    let undelivered_reader = undelivered_bytes.clone();
878    let observability_recv = observability.clone();
879    // Completion signal for the receive task. `SpawnHandle` from the
880    // runtime trait does not expose a `Future` for `.await` directly
881    // (different runtimes provide different join futures), so we wire a
882    // one-shot channel — the recv task sends `()` right before exiting
883    // and the main loop selects on the receiver to detect transport
884    // closure.
885    let (recv_done_tx, mut recv_done_rx) = oneshot::channel::<()>();
886    let transport_for_path = transport.clone();
887    let recv_handle = runtime.spawn(Box::pin(async move {
888        // Reusable buffer for ACK frame serialization. Hoisted out of the
889        // loop (Phase 2.3) so we don't pay a fresh `Vec::new()` allocation
890        // for every ACK we emit on a busy reliable stream. 256 bytes is
891        // comfortably larger than a serialized empty `PhantomPacket` (the
892        // 45-byte header plus a couple of length prefixes), so the underlying
893        // buffer is never reallocated after the first frame.
894        let mut ack_buf: Vec<u8> = Vec::with_capacity(256);
895        // Monotonic sequence space for outbound PATH_VALIDATION packets.
896        // Local to the recv task because that's where
897        // path-validation echoes are emitted in response to incoming
898        // challenges. Wraps via `wrapping_add` — sequence space is the
899        // session's overall stream-0 control space.
900        let mut path_validation_seq: u32 = 0;
901        // Buffering ceiling: the delivery queue is unbounded so the reader
902        // never blocks, but a peer that ignores flow control could flood it.
903        // Compliant senders are bounded by ~one window per stream (enforced
904        // `poll_send`), far below this cap; crossing it means the peer is
905        // misbehaving, so we tear the session down rather than buffer without
906        // limit. 4 MiB tolerates many streams × the 64 KiB window with margin.
907        const RECV_DELIVERY_HARD_CAP: u64 = 4 * 1024 * 1024;
908        loop {
909            // Flow-control / anti-flood gate: if the app-delivery backlog
910            // has blown past the cap, the peer is not honouring the window —
911            // close instead of growing the in-memory queue unboundedly. Cheap
912            // pre-check, before any AEAD work.
913            if undelivered_reader.load(Ordering::Acquire) > RECV_DELIVERY_HARD_CAP {
914                log::warn!(
915                    "PhantomSession: receive backlog {} B exceeds cap — peer ignoring flow \
916                     control; closing session",
917                    undelivered_reader.load(Ordering::Acquire)
918                );
919                break;
920            }
921            let data = match transport_recv.recv_bytes().await {
922                Ok(b) => b,
923                Err(_) => break,
924            };
925
926            // A malformed / unparseable frame (no legitimate peer produces
927            // one) is dropped — never a panic.
928            let packet = match PhantomPacket::from_wire(&data) {
929                Ok(v) => v,
930                Err(_) => continue,
931            };
932            // Pinned wire-version gate: the format is not negotiated, so a
933            // frame carrying any other version byte is dropped.
934            if packet.header.version != WIRE_VERSION {
935                continue;
936            }
937            handle_packet(
938                packet,
939                session_id,
940                &crypto_recv,
941                &streams_recv,
942                &demux_recv,
943                &transport_send_ack,
944                &transport_for_path,
945                &deliver_tx,
946                &undelivered_reader,
947                &mut ack_buf,
948                &mut path_validation_seq,
949                &observability_recv,
950                leg,
951            )
952            .await;
953        }
954        // Reader exiting → drop `deliver_tx` so the delivery task drains any
955        // queued items and then sees the channel closed and exits.
956        drop(deliver_tx);
957        // Signal the main loop that the recv task has exited so it can
958        // also unwind. `send` returns `Err(())` if the receiver was
959        // already dropped — that case is harmless, the main loop has
960        // already shut down.
961        let _ = recv_done_tx.send(());
962    }));
963
964    // MTU for transport packets
965    const TRANSPORT_MTU: usize = 1300;
966    // Phase 2.4: the 10 ms `poll_interval` stays as a retransmit-timer
967    // fallback (streams without an explicit notifier reference still
968    // get swept), but `send_notify.notified()` joins the select! so the
969    // pump wakes immediately when a producer calls
970    // `Session::notify_outbound_ready()`. This drops idle CPU usage to
971    // zero on quiet sessions while keeping the worst-case post-queue
972    // latency at <10 ms even for producers that haven't been wired into
973    // the notifier yet.
974    let mut poll_interval = tokio::time::interval(std::time::Duration::from_millis(10));
975    let send_notify = crypto_session.send_notifier();
976    // Outbound WINDOW_UPDATE control packets are emitted on the send loop — the
977    // single writer that also owns the rekey lock — so the encrypted control
978    // frame is always sealed under a consistent epoch. The delivery task only
979    // stages the relative credit (`Stream::stage_window_update_credit`) and
980    // wakes us; the wire sequence is drawn from the stream's own send-sequence
981    // space inside `flush_pending_window_updates` (no private counter, so it
982    // can never collide with application data on the AEAD nonce).
983
984    loop {
985        tokio::select! {
986            _ = poll_interval.tick() => {
987                flush_pending_window_updates(
988                    &transport, &crypto_session, session_id, &streams,
989                )
990                .await;
991                drain_streams_priority_ordered(
992                    &transport,
993                    &crypto_session,
994                    session_id,
995                    &streams,
996                )
997                .await;
998            }
999            _ = send_notify.notified() => {
1000                // Same drain logic as the tick arm — fast-wake path. Also flush
1001                // any flow-control credit the delivery task staged.
1002                flush_pending_window_updates(
1003                    &transport, &crypto_session, session_id, &streams,
1004                )
1005                .await;
1006                drain_streams_priority_ordered(
1007                    &transport,
1008                    &crypto_session,
1009                    session_id,
1010                    &streams,
1011                )
1012                .await;
1013            }
1014            cmd_opt = cmd_rx.recv() => {
1015                match cmd_opt {
1016                    Some(SessionCommand::Send(data)) => {
1017                        // Route through the raw-app stream so the payload is
1018                        // buffered for retransmit until ACKed (drained by
1019                        // `drain_streams_priority_ordered`), instead of being
1020                        // fired once and forgotten on the wire.
1021                        for chunk in data.chunks(TRANSPORT_MTU) {
1022                            raw_stream
1023                                .send_reliable(Bytes::copy_from_slice(chunk))
1024                                .await;
1025                        }
1026                        crypto_session.notify_outbound_ready();
1027                    }
1028                    Some(SessionCommand::SendStreamReliable { stream_id, data }) => {
1029                        if let Some(stream) = streams.get(&stream_id) {
1030                            for chunk in data.chunks(TRANSPORT_MTU) {
1031                                stream.send_reliable(Bytes::copy_from_slice(chunk)).await;
1032                            }
1033                        }
1034                    }
1035                    Some(SessionCommand::SendStreamUnreliable { stream_id, data }) => {
1036                        if let Some(stream) = streams.get(&stream_id) {
1037                            for chunk in data.chunks(TRANSPORT_MTU) {
1038                                stream.send_unreliable(Bytes::copy_from_slice(chunk)).await;
1039                            }
1040                        }
1041                    }
1042                    Some(SessionCommand::CloseStream { stream_id }) => {
1043                        if let Some(stream) = streams.get(&stream_id) {
1044                            stream.finish().await;
1045                            // Same per-stream sequence space as the stream's data
1046                            // (and its WINDOW_UPDATEs) so this bare FIN cannot
1047                            // collide on the AEAD nonce / replay window.
1048                            let seq = stream.next_send_sequence();
1049                            let _ = send_app_data(
1050                                &transport,
1051                                &crypto_session,
1052                                session_id,
1053                                stream_id as TransportStreamId,
1054                                seq,
1055                                &[],
1056                                PacketFlags::FIN,
1057                            ).await;
1058                        }
1059                        streams.remove(&stream_id);
1060                        demux.close_stream(stream_id);
1061                    }
1062                    Some(SessionCommand::Close) => {
1063                        log::info!("PhantomSession: closing");
1064                        break;
1065                    }
1066                    None => {
1067                        log::info!("PhantomSession: command channel dropped");
1068                        break;
1069                    }
1070                }
1071            }
1072            _ = &mut recv_done_rx => {
1073                log::error!("PhantomSession: receive task ended unexpectedly (transport closed)");
1074                break;
1075            }
1076        }
1077    }
1078
1079    // Abort the recv task if it's still running; idempotent on a finished
1080    // handle. Goes through the runtime-agnostic `SpawnHandle::abort`.
1081    recv_handle.abort();
1082    state.store(ConnectionState::Closed as u8, Ordering::Relaxed);
1083    // Session torn down — drop the active-session gauge back down.
1084    observability.session_closed(leg);
1085}
1086
1087/// Emit any flow-control credit the receive **delivery** task staged.
1088///
1089/// The delivery task credits the window on real app consumption and stages the
1090/// relative credit via `Stream::stage_window_update_credit` + a send-loop wake;
1091/// the send loop (this, the single rekey owner) actually encrypts and sends the
1092/// `WINDOW_UPDATE`, so the control frame is always sealed under a consistent
1093/// epoch. The staged credits are snapshotted out of the `DashMap` first so no
1094/// shard lock is held across the `.await` (which would deadlock the delivery /
1095/// reader tasks that also touch `streams`).
1096async fn flush_pending_window_updates<T: SessionTransport>(
1097    transport: &Arc<T>,
1098    crypto_session: &Arc<Session>,
1099    session_id: SessionId,
1100    streams: &Arc<DashMap<u32, Arc<Stream>>>,
1101) {
1102    let pending: Vec<(u32, u32, Arc<Stream>)> = streams
1103        .iter()
1104        .filter_map(|e| {
1105            e.value()
1106                .take_pending_window_update()
1107                .map(|c| (*e.key(), c, e.value().clone()))
1108        })
1109        .collect();
1110    for (stream_id, credit, stream) in pending {
1111        // Draw the control-frame sequence from the SAME per-stream outbound
1112        // space as application data (`Stream::next_send_sequence`) so a
1113        // WINDOW_UPDATE never collides with a data packet on (stream_id,
1114        // sequence) — a collision would reuse an AEAD nonce within the epoch and
1115        // be dropped by the peer's replay window, silently starving flow control.
1116        let seq = stream.next_send_sequence();
1117        if !send_window_update(
1118            transport,
1119            crypto_session,
1120            session_id,
1121            stream_id as TransportStreamId,
1122            seq,
1123            credit,
1124        )
1125        .await
1126        {
1127            // The send failed (transient transport hiccup): re-stage the credit
1128            // so the next send-loop pass — the 10 ms tick at the latest — retries
1129            // it. Dropping it silently would under-credit the peer and could
1130            // eventually stall the sender. Credits accumulate, so a retry simply
1131            // folds back in; a permanently dead transport tears the session down
1132            // via the reader, which ends this loop.
1133            stream.stage_window_update_credit(credit);
1134        }
1135    }
1136}
1137
1138/// Drain every stream with pending data, scheduling them in strict
1139/// priority order (higher `Stream::priority()` wins). Streams of equal
1140/// priority are drained in stream-id order (deterministic so tests
1141/// don't get flaky under DashMap's hash-order shuffle).
1142///
1143/// This is **strict priority**: a stream with priority N never yields
1144/// to a stream with priority < N while it still has data. A future
1145/// weighted-fair scheduler can replace this without changing the
1146/// caller surface. Phase 4.3.
1147async fn drain_streams_priority_ordered<T: SessionTransport>(
1148    transport: &Arc<T>,
1149    crypto_session: &Arc<Session>,
1150    session_id: SessionId,
1151    streams: &Arc<DashMap<u32, Arc<Stream>>>,
1152) {
1153    // Snapshot the stream set so we can sort without holding DashMap
1154    // shard locks across awaits. Each entry is (priority, stream_id,
1155    // stream-Arc) — Arc clones are cheap (refcount bump).
1156    let mut snapshot: Vec<(u32, u32, Arc<Stream>)> = streams
1157        .iter()
1158        .map(|e| (e.value().priority(), *e.key(), e.value().clone()))
1159        .collect();
1160    // Descending priority; ties broken by stream id ascending so the
1161    // order is stable across iterations.
1162    snapshot.sort_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
1163
1164    for (_priority, stream_id, stream) in snapshot {
1165        loop {
1166            // Bytes of new data the congestion window currently permits.
1167            // Recomputed each iteration: every send grows inflight, so the
1168            // budget shrinks and the drain stops once the window is full.
1169            let snap = crypto_session.bandwidth_snapshot();
1170            let budget = snap.cwnd_bytes.saturating_sub(snap.inflight_bytes);
1171            let Some(seg) = stream.poll_send(budget).await else {
1172                break;
1173            };
1174            // A retransmission means the prior send was lost — tell congestion
1175            // control so BBR enters FastRecovery and the pacing rate backs off.
1176            if seg.retransmit {
1177                crypto_session.on_packet_lost(seg.data.len() as u64);
1178            }
1179            let base = if seg.reliable {
1180                PacketFlags::RELIABLE
1181            } else {
1182                PacketFlags::UNRELIABLE
1183            };
1184            if !send_app_data(
1185                transport,
1186                crypto_session,
1187                session_id,
1188                stream_id as TransportStreamId,
1189                seg.seq,
1190                &seg.data,
1191                base,
1192            )
1193            .await
1194            {
1195                log::error!("PhantomSession: priority-ordered drain send failed");
1196                // `poll_send` already stamped `sent_at` on this reliable
1197                // segment, but the bytes never reached the wire. Clear it so the
1198                // next drain re-offers it immediately instead of stalling a full
1199                // RTO before the retransmit pass. Unreliable segments were
1200                // removed by `poll_send` (fire-and-forget) — nothing to reset.
1201                if seg.reliable {
1202                    stream.mark_unsent(seg.seq).await;
1203                }
1204                break;
1205            }
1206        }
1207    }
1208}
1209
1210/// Build a `DeliverySample` from a successful Stream ack callback and
1211/// feed it into the session's BBR estimator (Phase 4.4). The BBR loop
1212/// internally re-sets the pacer rate via `Session::on_packet_acked`,
1213/// so the next outbound packet is paced at the freshly-estimated
1214/// bottleneck bandwidth.
1215///
1216/// `ack_delay_us` is the V2 header's `ack_delay` field (microseconds
1217/// the receiver held the ACK before sending) — subtracted from the
1218/// observed RTT to yield the propagation delay. For V1 ACKs there is
1219/// no `ack_delay` field on the wire; pass 0 (the estimator treats
1220/// this as "no peer-side delay reported").
1221fn feed_bbr_on_ack(
1222    crypto_session: &Arc<Session>,
1223    sent_at: tokio::time::Instant,
1224    packet_bytes: u64,
1225    ack_delay_us: u64,
1226) {
1227    let sample = crate::transport::bandwidth_estimator::DeliverySample {
1228        delivered_bytes: 0, // BandwidthEstimator tracks its own counter
1229        sent_at: sent_at.into_std(),
1230        acked_at: std::time::Instant::now(),
1231        packet_bytes,
1232        is_app_limited: false,
1233        ack_delay_us,
1234    };
1235    let _ = crypto_session.on_packet_acked(sample);
1236}
1237
1238/// Wait until the pacer has tokens for `bytes` bytes. No-op when the
1239/// pacer is unlimited (the default until BBR sets a finite rate).
1240async fn pace_send(crypto_session: &Arc<Session>, bytes: u64) {
1241    let pacer = crypto_session.pacer();
1242    if !pacer.is_enabled() {
1243        return;
1244    }
1245    loop {
1246        if pacer.try_consume(bytes) {
1247            return;
1248        }
1249        let wait = pacer.time_until_available(bytes);
1250        if wait.is_zero() {
1251            // Tokens should be available; retry the consume to handle
1252            // a concurrent race with another sender.
1253            continue;
1254        }
1255        // Cap the wait to keep the loop responsive — a stale wait
1256        // estimate from a long-idle pacer is corrected on the next
1257        // iteration.
1258        let cap = std::time::Duration::from_millis(50);
1259        let wait = wait.min(cap);
1260        tokio::time::sleep(wait).await;
1261    }
1262}
1263
1264/// C1: decide whether a rekey is needed before stamping a packet for
1265/// `(stream_id, sequence)` and, if so, perform it. A rekey fires when either the
1266/// direction-wide AEAD high-watermark ([`Session::send_needs_rekey`]) or this
1267/// stream's sequence watermark ([`Session::stream_seq_needs_rekey`]) is crossed
1268/// — the latter prevents a per-stream `u32` sequence from wrapping within one
1269/// epoch and reusing the AEAD nonce `(epoch, stream_id, sequence, path_id)`
1270/// under a fixed key (Invariant 8).
1271///
1272/// Returns the extra flag bits to OR into the header (`PacketFlags::REKEY` on a
1273/// successful rotation, `0` when no rekey was needed), or `None` if a rekey was
1274/// required but failed (epoch saturated at `u8::MAX`) — the caller MUST fail the
1275/// send so the session reconnects rather than reusing a nonce.
1276fn rekey_before_stamp(
1277    crypto_session: &Arc<Session>,
1278    stream_id: TransportStreamId,
1279    sequence: u32,
1280) -> Option<u16> {
1281    if crypto_session.send_needs_rekey()
1282        || crypto_session.stream_seq_needs_rekey(stream_id, sequence)
1283    {
1284        match crypto_session.rekey() {
1285            Ok(_) => Some(PacketFlags::REKEY),
1286            Err(e) => {
1287                log::error!("PhantomSession: mid-session rekey failed: {}", e);
1288                None
1289            }
1290        }
1291    } else {
1292        Some(0)
1293    }
1294}
1295
1296/// V2 send. Builds `PhantomPacket` with `PacketFlags::ENCRYPTED` and
1297/// the negotiated rekey epoch; AEAD nonce derives from the header
1298/// (`Session::encrypt_packet`), so a failed peer decrypt no longer
1299/// desyncs the local counter.
1300async fn send_app_data<T: SessionTransport>(
1301    transport: &Arc<T>,
1302    crypto_session: &Arc<Session>,
1303    session_id: SessionId,
1304    stream_id: TransportStreamId,
1305    sequence: u32,
1306    payload: &[u8],
1307    base_flags: u16,
1308) -> bool {
1309    // Always OR in ENCRYPTED for application data.
1310    let mut flag_bits = base_flags | PacketFlags::ENCRYPTED;
1311    // Mid-session rekey (C1): rotate to a fresh key BEFORE stamping this header
1312    // when either the direction-wide AEAD high-watermark or this stream's
1313    // sequence watermark is crossed, so the header carries the new epoch (+ the
1314    // REKEY flag) and no per-stream sequence can wrap within an epoch and reuse a
1315    // nonce (Invariant 8). The peer follows on the authenticated epoch bump (it
1316    // trial-decrypts under the next key).
1317    match rekey_before_stamp(crypto_session, stream_id, sequence) {
1318        Some(extra) => flag_bits |= extra,
1319        // Epoch saturated (u8::MAX): can't rotate further. Surface as a failed
1320        // send so the caller re-offers; the session reconnects rather than wrap.
1321        None => return false,
1322    }
1323    let header = PacketHeader::new(session_id, stream_id, sequence, PacketFlags::new(flag_bits))
1324        .with_epoch(crypto_session.current_epoch());
1325    let ciphertext = match crypto_session.encrypt_packet(&header, payload) {
1326        Ok(c) => c,
1327        Err(e) => {
1328            log::error!("PhantomSession: encrypt_packet failed: {}", e);
1329            return false;
1330        }
1331    };
1332    let packet = PhantomPacket::new(header, ciphertext);
1333    let buf = packet.to_wire();
1334    let size = buf.len();
1335    // Pacing is a wire-rate limiter, so it consumes the full on-wire size.
1336    pace_send(crypto_session, size as u64).await;
1337    if let Err(e) = transport.send_bytes(&buf[..size]).await {
1338        log::error!("PhantomSession: transport send failed: {}", e);
1339        return false;
1340    }
1341    // Inflight/cwnd accounting MUST use the same unit the ACK and loss paths
1342    // settle in. `Stream::ack` returns and `on_packet_lost` subtracts the
1343    // segment's *payload* length (`seg.data.len()`), so the send side has to add
1344    // the payload length too — adding the full wire size here leaked ~69 bytes
1345    // (header + length prefixes + AEAD tag) of phantom inflight per packet,
1346    // which silently exhausted the congestion window after a few dozen packets
1347    // and stalled long-lived sessions. (Bandwidth/BDP derive from acked bytes,
1348    // so they stay in the same payload unit.)
1349    crypto_session.on_packet_sent(payload.len() as u64);
1350    true
1351}
1352
1353/// Emit a V2 WINDOW_UPDATE packet announcing `new_window` bytes of
1354/// receive capacity for `stream_id`. Encrypted under the current
1355/// session epoch (Phase 4.3 flow control).
1356async fn send_window_update<T: SessionTransport>(
1357    transport: &Arc<T>,
1358    crypto_session: &Arc<Session>,
1359    session_id: SessionId,
1360    stream_id: TransportStreamId,
1361    sequence: u32,
1362    new_window: u32,
1363) -> bool {
1364    let mut flag_bits = PacketFlags::ENCRYPTED | PacketFlags::WINDOW_UPDATE;
1365    // WINDOW_UPDATE shares the per-stream sequence space with application data,
1366    // so it must obey the same C1 rekey discipline before stamping (Invariant 8).
1367    match rekey_before_stamp(crypto_session, stream_id, sequence) {
1368        Some(extra) => flag_bits |= extra,
1369        None => return false,
1370    }
1371    let header = PacketHeader::new(session_id, stream_id, sequence, PacketFlags::new(flag_bits))
1372        .with_epoch(crypto_session.current_epoch());
1373    let payload = new_window.to_be_bytes();
1374    let ciphertext = match crypto_session.encrypt_packet(&header, &payload) {
1375        Ok(c) => c,
1376        Err(e) => {
1377            log::error!("PhantomSession: WINDOW_UPDATE encrypt failed: {}", e);
1378            return false;
1379        }
1380    };
1381    let packet = PhantomPacket::new(header, ciphertext);
1382    let buf = packet.to_wire();
1383    if let Err(e) = transport.send_bytes(&buf).await {
1384        log::error!("PhantomSession: WINDOW_UPDATE send failed: {}", e);
1385        return false;
1386    }
1387    true
1388}
1389
1390/// Emit a V2 PATH_VALIDATION packet on `path_id` carrying the given
1391/// 32-byte challenge or response payload. Encrypted under the current
1392/// session epoch.
1393async fn send_path_validation<T: SessionTransport>(
1394    transport: &Arc<T>,
1395    crypto_session: &Arc<Session>,
1396    session_id: SessionId,
1397    path_id: u8,
1398    sequence: u32,
1399    payload: [u8; crate::transport::path::PATH_CHALLENGE_LEN],
1400) -> bool {
1401    // Build the packet skeleton via the codec, then layer ENCRYPTED
1402    // and epoch on top before the actual encrypt.
1403    let mut packet = build_path_validation_packet(session_id, path_id, sequence, payload);
1404    let flag_bits = packet.header.flags.0 | PacketFlags::ENCRYPTED;
1405    packet.header.flags = PacketFlags::new(flag_bits);
1406    packet.header.epoch = crypto_session.current_epoch();
1407    let plaintext = std::mem::take(&mut packet.payload);
1408    let ciphertext = match crypto_session.encrypt_packet(&packet.header, &plaintext) {
1409        Ok(c) => c,
1410        Err(e) => {
1411            log::error!("PhantomSession: PATH_VALIDATION encrypt failed: {}", e);
1412            return false;
1413        }
1414    };
1415    packet.payload = ciphertext;
1416    let buf = packet.to_wire();
1417    if let Err(e) = transport.send_bytes(&buf).await {
1418        log::error!("PhantomSession: PATH_VALIDATION send failed: {}", e);
1419        return false;
1420    }
1421    true
1422}
1423
1424/// Recv-side handler for a packet:
1425/// - session-id guard → drop any frame not stamped with the negotiated
1426///   session id before touching any state (H1).
1427/// - decrypt (REQUIRED on application data — a non-empty unencrypted
1428///   post-handshake packet is a downgrade indicator and is dropped).
1429/// - ACK (now `ENCRYPTED | ACK`, post-decrypt) → read the authenticated
1430///   4-byte acked sequence from the payload, feed BBR + route to the
1431///   stream / demux. Forged/plaintext ACKs cannot reach this path (H1).
1432/// - PATH_VALIDATION flag → drive the path registry: verify against an
1433///   outstanding challenge if one exists, otherwise echo the payload
1434///   back as a response.
1435/// - WINDOW_UPDATE flag → apply the peer's announced flow-control window.
1436/// - COALESCED flag → split the decrypted bundle into sub-payloads and
1437///   route each through the demux as an independent application chunk.
1438#[allow(clippy::too_many_arguments)]
1439async fn handle_packet<T: SessionTransport>(
1440    packet: PhantomPacket,
1441    session_id: SessionId,
1442    crypto_recv: &Arc<Session>,
1443    streams_recv: &Arc<DashMap<u32, Arc<Stream>>>,
1444    demux_recv: &Arc<StreamDemultiplexer>,
1445    transport_send_ack: &Arc<T>,
1446    transport_for_path: &Arc<T>,
1447    // The reader hands decrypted application data to the delivery task via
1448    // this unbounded channel instead of blocking on `recv_tx`/the demux — so a
1449    // slow `recv()` consumer can never head-of-line-stall inbound ACK/control.
1450    deliver_tx: &mpsc::UnboundedSender<(u32, Bytes)>,
1451    undelivered_bytes: &AtomicU64,
1452    ack_buf: &mut Vec<u8>,
1453    path_validation_seq: &mut u32,
1454    observability: &Observability,
1455    leg: LegType,
1456) {
1457    let stream_id: u32 = packet.header.stream_id.into();
1458    let path_id = packet.header.path_id;
1459
1460    // Bind every inbound frame to the negotiated session (H1). A frame stamped
1461    // with a different session id is dropped before any state mutation, so
1462    // cross-session / off-path control injection (forged ACK/FIN) can never
1463    // reach the stream table, BBR, or the path registry. Application data also
1464    // binds session_id through the AEAD AAD; this guard extends the same
1465    // protection to the non-AEAD header inspection that follows.
1466    if packet.header.session_id != session_id {
1467        return;
1468    }
1469
1470    // Mark path activity even before decrypt (the path id is plaintext
1471    // header bytes; this is just a liveness signal for the sweep).
1472    crypto_recv.mark_path_seen(path_id);
1473
1474    // NOTE: ACK/FIN are NO LONGER processed here, pre-decrypt. They are
1475    // authenticated `ENCRYPTED | ACK` control frames now (H1) and are handled
1476    // *after* the AEAD gate below — see the ACK branch following the decrypt.
1477
1478    // Decrypt if marked. V2 sessions REQUIRE ENCRYPTED on application
1479    // data — a non-empty unencrypted V2 application-data packet is a
1480    // downgrade indicator and is dropped (same posture as V1).
1481    let plaintext: Vec<u8> = if packet.header.flags.contains(PacketFlags::ENCRYPTED) {
1482        // Accept a single authenticated forward rekey step (C1): if this
1483        // packet's epoch is one ahead, the peer rekeyed — trial-decrypt under
1484        // the next key and only commit the ratchet on AEAD success, so a forged
1485        // epoch can't desync us. Same-epoch packets take the ordinary path.
1486        match crypto_recv.decrypt_packet_accepting_rekey(&packet.header, &packet.payload) {
1487            Ok(pt) => pt,
1488            Err(e) => {
1489                // Distinguish the two drop reasons for the security metrics: a
1490                // post-AEAD sliding-window replay reject vs an AEAD-verify
1491                // failure (Invariant 4 — replay is checked after AEAD opens).
1492                // decrypt_packet doesn't surface old-vs-duplicate, so record the
1493                // representative `Duplicate` reason.
1494                if matches!(e, CoreError::ReplayDetected(_)) {
1495                    observability.record_replay_rejected(ReplayReason::Duplicate);
1496                } else {
1497                    observability.record_aead_failure(leg, AeadAlgorithm::Aes256Gcm);
1498                }
1499                log::warn!("PhantomSession: V2 decrypt failed (dropping packet): {}", e);
1500                return;
1501            }
1502        }
1503    } else if !packet.payload.is_empty() {
1504        // Stripped-flag downgrade defense (Invariant 2): a non-empty unencrypted
1505        // post-handshake application packet is dropped.
1506        observability.record_unencrypted_dropped(leg);
1507        log::warn!(
1508            "PhantomSession: dropping unencrypted V2 post-handshake data packet (downgrade?)"
1509        );
1510        return;
1511    } else {
1512        Vec::new()
1513    };
1514
1515    // Authenticated ACK (H1). ACKs are `ENCRYPTED | ACK` control frames whose
1516    // AEAD payload carries the acked data sequence as 4 big-endian bytes. We act
1517    // on the ACK only *after* AEAD verify, which authenticates the header
1518    // (including `session_id` and `ack_delay`) and the acked-seq payload — so a
1519    // forged or stripped-flag ACK (dropped above by the downgrade defense, or
1520    // failing this length check) can neither retire a pending segment, restore a
1521    // flow-control permit, poison BBR, nor close a stream.
1522    if packet.header.flags.contains(PacketFlags::ACK) {
1523        if plaintext.len() != 4 {
1524            log::warn!(
1525                "PhantomSession: ACK payload length {} (expected 4)",
1526                plaintext.len()
1527            );
1528            return;
1529        }
1530        let acked_seq =
1531            u32::from_be_bytes([plaintext[0], plaintext[1], plaintext[2], plaintext[3]]);
1532        if let Some(stream) = streams_recv.get(&stream_id) {
1533            if let Some((sent_at, bytes)) = stream.ack(acked_seq).await {
1534                feed_bbr_on_ack(crypto_recv, sent_at, bytes, packet.header.ack_delay as u64);
1535            }
1536        }
1537        // Best-effort, non-blocking: the demux/PhantomStream path is vestigial;
1538        // routing the ACK/close notification to it must never block the reader.
1539        demux_recv.route_ack(stream_id, acked_seq);
1540        if packet.header.flags.contains(PacketFlags::FIN) {
1541            demux_recv.route_close(stream_id);
1542        }
1543        return;
1544    }
1545
1546    // WINDOW_UPDATE dispatch (Phase 4.3 flow control). Payload is a
1547    // big-endian u32 carrying relative flow-control credit — the bytes the
1548    // peer's application just consumed, which we ADD to our send window.
1549    if packet.header.flags.contains(PacketFlags::WINDOW_UPDATE) {
1550        if plaintext.len() != 4 {
1551            log::warn!(
1552                "PhantomSession: WINDOW_UPDATE payload length {} (expected 4)",
1553                plaintext.len()
1554            );
1555            return;
1556        }
1557        let credit = u32::from_be_bytes([plaintext[0], plaintext[1], plaintext[2], plaintext[3]]);
1558        if let Some(stream) = streams_recv.get(&stream_id) {
1559            // Relative-credit flow control — add the granted credit, then
1560            // wake the send loop so a window-blocked sender resumes immediately
1561            // instead of waiting a full poll tick.
1562            stream.apply_peer_window_update(credit);
1563            crypto_recv.notify_outbound_ready();
1564        }
1565        return;
1566    }
1567
1568    // PATH_VALIDATION dispatch (Phase 4.2): the codec inspects the *plaintext*
1569    // because the wire packet was sealed by the AEAD layer.
1570    if packet.header.flags.contains(PacketFlags::PATH_VALIDATION) {
1571        if plaintext.len() != crate::transport::path::PATH_CHALLENGE_LEN {
1572            log::warn!(
1573                "PhantomSession: PATH_VALIDATION plaintext length {} (expected {})",
1574                plaintext.len(),
1575                crate::transport::path::PATH_CHALLENGE_LEN
1576            );
1577            return;
1578        }
1579        let mut payload_buf = [0u8; crate::transport::path::PATH_CHALLENGE_LEN];
1580        payload_buf.copy_from_slice(&plaintext);
1581        // If we have an in-flight challenge on this path, try to
1582        // verify against it. If verification succeeds, the path
1583        // transitions to Validated and we're done. If it fails, the
1584        // registry already transitioned to Failed — also done.
1585        match crypto_recv.path_state(path_id) {
1586            Some(crate::transport::path::PathStateKind::Validating) => {
1587                let _ = crypto_recv.complete_path_validation(path_id, &payload_buf);
1588                return;
1589            }
1590            Some(crate::transport::path::PathStateKind::Validated)
1591            | Some(crate::transport::path::PathStateKind::Failed) => {
1592                // Terminal state — ignore.
1593                return;
1594            }
1595            _ => {
1596                // Unknown or Unvalidated: treat this packet as an
1597                // incoming challenge and echo the payload back as our
1598                // response. The remote will then verify it against its
1599                // own pending challenge.
1600                let seq = *path_validation_seq;
1601                *path_validation_seq = path_validation_seq.wrapping_add(1);
1602                let _ = send_path_validation(
1603                    transport_for_path,
1604                    crypto_recv,
1605                    session_id,
1606                    path_id,
1607                    seq,
1608                    payload_buf,
1609                )
1610                .await;
1611                return;
1612            }
1613        }
1614    }
1615
1616    // PATH-001 (Invariant 6): application data is delivered only on a Validated
1617    // path. Path 0 is pre-validated at session establishment; any other path_id
1618    // must complete a PATH_VALIDATION challenge/response first. The control
1619    // frames (ACK / WINDOW_UPDATE / PATH_VALIDATION) were handled above and
1620    // returned; this gates the app-data delivery branches below. It runs AFTER
1621    // AEAD verify, so it never acts on an attacker-chosen plaintext path_id that
1622    // fails decryption.
1623    if !matches!(
1624        crypto_recv.path_state(path_id),
1625        Some(crate::transport::path::PathStateKind::Validated)
1626    ) {
1627        // Track a first-seen path id so a future challenge/response can promote
1628        // it; drop the data until then.
1629        crypto_recv.register_unvalidated_path(path_id);
1630        log::warn!(
1631            "PhantomSession: dropping application data on non-validated path_id {}",
1632            path_id
1633        );
1634        return;
1635    }
1636
1637    // COALESCED dispatch (Phase 2.5): split the decrypted bundle into
1638    // sub-payloads and hand each, IN ORDER, to the single FIFO delivery task.
1639    // The delivery task drains them in this order, so the bundle's internal
1640    // ordering (and its order relative to later frames) is preserved —
1641    // the reader never blocks on application delivery.
1642    if packet.header.flags.contains(PacketFlags::COALESCED) {
1643        let inner_for_codec = PhantomPacket {
1644            header: packet.header,
1645            payload: plaintext,
1646            extensions: Vec::new(),
1647        };
1648        match unwrap_coalesced_packet(&inner_for_codec) {
1649            Ok(Some(subs)) => {
1650                for sub in subs {
1651                    if sub.is_empty() {
1652                        continue;
1653                    }
1654                    // Count toward the backlog only once the item is actually
1655                    // enqueued. If the delivery task has exited (consumer gone,
1656                    // `deliver_rx` dropped) the send fails and we must not inflate
1657                    // `undelivered_bytes` for data that was discarded.
1658                    let len = sub.len() as u64;
1659                    if deliver_tx.send((stream_id, Bytes::from(sub))).is_ok() {
1660                        undelivered_bytes.fetch_add(len, Ordering::AcqRel);
1661                    }
1662                }
1663            }
1664            Ok(None) => {
1665                log::warn!("PhantomSession: COALESCED flag set but bundle didn't parse");
1666            }
1667            Err(e) => {
1668                log::warn!("PhantomSession: COALESCED parse error: {}", e);
1669            }
1670        }
1671        // Bundles do not auto-ACK at the outer level — sub-packets are not
1672        // independently sequenced and the outer sequence has already been
1673        // consumed by the replay window.
1674        return;
1675    }
1676
1677    // Reliable application data → emit an authenticated ACK **inline in the
1678    // reader** (H1). The ACK is an `ENCRYPTED | ACK` control frame whose AEAD
1679    // payload carries the acked data sequence (4 big-endian bytes); the peer
1680    // acts on it only after AEAD verify, so it cannot be forged off-path. The
1681    // ACK's own `header.sequence` is drawn from this side's per-stream send
1682    // counter — shared with our data/window-update sends on the same stream — so
1683    // `(epoch, stream_id, sequence, path_id)` is unique and never collides with
1684    // our outbound data (the nonce-reuse trap). It obeys the C1 rekey discipline
1685    // and stays prompt even when the app consumer is slow (encrypt is a lock-free
1686    // ArcSwap load). "ACK" means "received, decrypted, replay-passed, accepted."
1687    if packet.header.flags.contains(PacketFlags::RELIABLE) {
1688        let local = streams_recv
1689            .entry(stream_id)
1690            .or_insert_with(|| Arc::new(Stream::new(stream_id as TransportStreamId)))
1691            .clone();
1692        let ack_seq = local.next_send_sequence();
1693        let mut ack_flag_bits = PacketFlags::ENCRYPTED | PacketFlags::ACK;
1694        match rekey_before_stamp(crypto_recv, stream_id as TransportStreamId, ack_seq) {
1695            Some(extra) => ack_flag_bits |= extra,
1696            // Epoch saturated — drop this ACK rather than reuse a nonce; the
1697            // sender retransmits and the session is expected to reconnect.
1698            None => return,
1699        }
1700        let ack_header = PacketHeader::new(
1701            session_id,
1702            stream_id as TransportStreamId,
1703            ack_seq,
1704            PacketFlags::new(ack_flag_bits),
1705        )
1706        .with_epoch(crypto_recv.current_epoch())
1707        .with_path_id(path_id);
1708        let ack_payload = packet.header.sequence.to_be_bytes();
1709        match crypto_recv.encrypt_packet(&ack_header, &ack_payload) {
1710            Ok(ct) => {
1711                let ack_packet = PhantomPacket::new(ack_header, ct);
1712                ack_buf.clear();
1713                ack_buf.extend_from_slice(&ack_packet.to_wire());
1714                let size = ack_buf.len();
1715                let _ = transport_send_ack.send_bytes(&ack_buf[..size]).await;
1716            }
1717            Err(e) => log::error!("PhantomSession: ACK encrypt failed: {}", e),
1718        }
1719    }
1720
1721    // Hand the decrypted application data to the delivery task: unbounded +
1722    // non-blocking, so the reader never stalls on a slow `recv()` consumer. The
1723    // delivery task drains the app-paced `recv_tx.send()` and credits the
1724    // flow-control window. `undelivered_bytes` is the backlog counter the
1725    // reader's HARD_CAP gate watches — counted only on a successful enqueue, so
1726    // a dead delivery task (consumer gone) can't inflate it for discarded data.
1727    if !plaintext.is_empty() {
1728        let len = plaintext.len() as u64;
1729        if deliver_tx.send((stream_id, Bytes::from(plaintext))).is_ok() {
1730            undelivered_bytes.fetch_add(len, Ordering::AcqRel);
1731        }
1732    }
1733
1734    if packet.header.flags.contains(PacketFlags::FIN) {
1735        demux_recv.route_close(stream_id);
1736    }
1737}
1738
1739// Internal-only methods — deliberately NOT on the `#[uniffi::export]` surface.
1740// `set_state` mutates the connection state machine; a foreign caller forcing
1741// `Connected` mid-handshake would make `is_data_ready()` lie and let `send()`
1742// bypass the queue, or `Closed` without tearing down the pump.
1743impl PhantomSession {
1744    /// Transition to a new connection state. Crate-internal: driven by the
1745    /// handshake task and teardown only.
1746    pub(crate) fn set_state(&self, new_state: ConnectionState) {
1747        self.state.store(new_state as u8, Ordering::Relaxed);
1748    }
1749
1750    /// Session observability handle (Rust-only — `Observability` is not a
1751    /// UniFFI type). For a server-accepted session this is the
1752    /// `PhantomListener`'s shared instance; for a client it is the session's
1753    /// own. Read `.snapshot()` for the lock-free metric counters.
1754    pub fn observability(&self) -> Arc<Observability> {
1755        self.observability.clone()
1756    }
1757}
1758
1759#[cfg_attr(feature = "bindings", uniffi::export(async_runtime = "tokio"))]
1760impl PhantomSession {
1761    /// Create a new session — returns instantly.
1762    ///
1763    /// Handshake is not started until a transport is provided.
1764    /// Use `connect_with_transport()` for full integration.
1765    #[cfg_attr(feature = "bindings", uniffi::constructor)]
1766    pub fn connect(peer_addr: String) -> Arc<Self> {
1767        let (cmd_tx, cmd_rx) = mpsc::channel(256);
1768        let (_recv_tx, recv_rx) = mpsc::channel(256);
1769
1770        let (demux, _ctrl_rx) = StreamDemultiplexer::new(256);
1771        let streams = Arc::new(DashMap::new());
1772        Arc::new(Self {
1773            id: new_session_id(),
1774            peer_addr,
1775            state: Arc::new(AtomicU8::new(ConnectionState::Connecting as u8)),
1776            send_queue: Arc::new(Mutex::new(Vec::new())),
1777            cmd_tx,
1778            cmd_rx: Mutex::new(Some(cmd_rx)),
1779            recv_rx: Mutex::new(recv_rx),
1780            demux: Arc::new(demux),
1781            streams,
1782            inner_session: Arc::new(Mutex::new(None)),
1783            early_data_accepted: Arc::new(Mutex::new(None)),
1784            // Placeholder session (no transport / pump yet); a no-op holder
1785            // until `connect_with_transport` spawns the real pump.
1786            observability: Observability::new(ObservabilityConfig::default()),
1787        })
1788    }
1789
1790    /// Open a new multiplexed stream
1791    pub fn open_stream(&self) -> Arc<crate::api::stream::PhantomStream> {
1792        let handle = self.demux.open_stream(1024);
1793        let stream_id = handle.stream_id;
1794
1795        let transport_stream = Arc::new(Stream::new(stream_id as TransportStreamId));
1796        self.streams.insert(stream_id, transport_stream);
1797
1798        Arc::new(crate::api::stream::PhantomStream::new(
1799            handle,
1800            self.cmd_tx.clone(),
1801        ))
1802    }
1803
1804    /// Send data through the session.
1805    ///
1806    /// - If the session is connected: sends immediately
1807    /// - If still handshaking: queues the data for auto-flush later
1808    pub async fn send(&self, data: Vec<u8>) -> Result<(), CoreError> {
1809        let state = self.connection_state();
1810
1811        if state.is_data_ready() {
1812            // Channel is up — send directly
1813            self.cmd_tx
1814                .send(SessionCommand::Send(data))
1815                .await
1816                .map_err(|_| CoreError::NetworkError("Session closed".into()))?;
1817        } else if state == ConnectionState::Connecting {
1818            // Still handshaking — queue
1819            self.send_queue.lock().await.push(data);
1820        } else {
1821            return Err(CoreError::NetworkError(format!(
1822                "Cannot send in state {:?}",
1823                state
1824            )));
1825        }
1826
1827        Ok(())
1828    }
1829
1830    /// Receive data from the session.
1831    ///
1832    /// Internally the recv pipeline keeps payloads as `Bytes` to avoid the
1833    /// per-packet Vec clone that used to fan out to the stream demux. The
1834    /// FFI surface still hands callers a `Vec<u8>`; if this is the last
1835    /// refcount the Vec is moved out of the underlying buffer, otherwise
1836    /// `Bytes::to_vec` copies.
1837    pub async fn recv(&self) -> Result<Vec<u8>, CoreError> {
1838        let mut rx = self.recv_rx.lock().await;
1839        let bytes = rx
1840            .recv()
1841            .await
1842            .ok_or_else(|| CoreError::NetworkError("Session closed".into()))?;
1843        Ok(bytes.to_vec())
1844    }
1845
1846    /// Get the current connection state (lock-free).
1847    pub fn connection_state(&self) -> ConnectionState {
1848        ConnectionState::from_u8(self.state.load(Ordering::Relaxed))
1849    }
1850
1851    /// Whether the session is ready for data transmission.
1852    pub fn is_data_ready(&self) -> bool {
1853        self.connection_state().is_data_ready()
1854    }
1855
1856    /// Whether the session has full PQC protection.
1857    pub fn is_pqc_ready(&self) -> bool {
1858        matches!(
1859            self.connection_state(),
1860            ConnectionState::PqcReady | ConnectionState::Connected
1861        )
1862    }
1863
1864    /// Flush all queued messages (called when handshake completes).
1865    pub async fn flush_queue(&self) -> Result<u32, CoreError> {
1866        let mut queue = self.send_queue.lock().await;
1867        let count = queue.len() as u32;
1868        for msg in queue.drain(..) {
1869            self.cmd_tx
1870                .send(SessionCommand::Send(msg))
1871                .await
1872                .map_err(|_| CoreError::NetworkError("Session closed during flush".into()))?;
1873        }
1874        Ok(count)
1875    }
1876
1877    /// Number of messages queued (waiting for handshake).
1878    pub async fn queued_count(&self) -> u32 {
1879        self.send_queue.lock().await.len() as u32
1880    }
1881
1882    /// Session identifier.
1883    pub fn id(&self) -> String {
1884        self.id.clone()
1885    }
1886
1887    /// Target peer address.
1888    pub fn peer_addr(&self) -> String {
1889        self.peer_addr.clone()
1890    }
1891
1892    /// The 0-RTT verdict for this session.
1893    ///
1894    /// - `None` — still handshaking, the handshake failed, or the client sent
1895    ///   no early-data on this connect.
1896    /// - `Some(true)` — the server consumed the 0-RTT early-data.
1897    /// - `Some(false)` — the client sent early-data and the server rejected it
1898    ///   (stale/unknown ticket, oversized blob, or AEAD failure). The caller
1899    ///   must re-send that payload over the normal channel.
1900    pub async fn early_data_accepted(&self) -> Option<bool> {
1901        *self.early_data_accepted.lock().await
1902    }
1903
1904    /// Extract a [`ResumptionHint`] for a future 0-RTT reconnect.
1905    ///
1906    /// Returns `Some` after a successful handshake; `None` while still
1907    /// handshaking, after a failure, or before the inner session has
1908    /// been published.
1909    ///
1910    /// Store the hint alongside the pinned `HybridVerifyingKey` of the
1911    /// server it was negotiated against and feed it back to
1912    /// [`connect_pinned_with_resumption`]. Reusing a hint across
1913    /// servers is a configuration bug — the `resumption_secret` is
1914    /// server-pinned.
1915    pub async fn resumption_hint(&self) -> Option<ResumptionHint> {
1916        let guard = self.inner_session.lock().await;
1917        guard
1918            .as_ref()
1919            .and_then(|s| s.resumption_hint())
1920            .map(|(session_id, resumption_secret)| ResumptionHint {
1921                session_id: session_id.to_vec(),
1922                resumption_secret: resumption_secret.to_vec(),
1923            })
1924    }
1925
1926    /// Current rekey epoch of the established session (`None` while still
1927    /// connecting). Rust-only — used by soak / integration tests to confirm
1928    /// that automatic mid-session rekey (C1) advanced the epoch.
1929    pub async fn current_epoch(&self) -> Option<u8> {
1930        self.inner_session
1931            .lock()
1932            .await
1933            .as_ref()
1934            .map(|s| s.current_epoch())
1935    }
1936
1937    /// Override the automatic-rekey send-invocation high-watermark on the
1938    /// established session (default `REKEY_SOFT_LIMIT`). Returns `false` if
1939    /// the session is still connecting. Rust-only — primarily for soak/load
1940    /// harnesses that need to exercise mid-session rekey without sending `2^47`
1941    /// packets.
1942    pub async fn set_rekey_threshold(&self, n: u64) -> bool {
1943        match self.inner_session.lock().await.as_ref() {
1944            Some(s) => {
1945                s.set_rekey_threshold(n);
1946                true
1947            }
1948            None => false,
1949        }
1950    }
1951
1952    /// Send the graceful close frame and shut the session down.
1953    ///
1954    /// Named `disconnect` rather than `close` because UniFFI's Kotlin
1955    /// generator unconditionally adds `AutoCloseable.close()` to every
1956    /// object, and a Rust-side `close` here would conflict with it.
1957    pub async fn disconnect(&self) -> Result<(), CoreError> {
1958        self.set_state(ConnectionState::Closed);
1959        let _ = self.cmd_tx.send(SessionCommand::Close).await;
1960        Ok(())
1961    }
1962}
1963
1964impl PhantomSession {
1965    /// Get the stream demultiplexer (internal use, not exposed to UniFFI)
1966    pub fn demux(&self) -> Arc<StreamDemultiplexer> {
1967        self.demux.clone()
1968    }
1969}
1970
1971impl std::fmt::Debug for PhantomSession {
1972    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1973        f.debug_struct("PhantomSession")
1974            .field("id", &self.id)
1975            .field("peer", &self.peer_addr)
1976            .field("state", &self.connection_state())
1977            .finish()
1978    }
1979}
1980
1981// ─── Pinned-Connect Shim (Phase 7.2 mobile bridge) ──────────────────────────
1982//
1983// `connect_with_transport` itself can't cross the UniFFI boundary directly —
1984// it takes a generic `T: SessionTransport` trait object and a typed
1985// `HybridVerifyingKey`, neither of which is a UniFFI primitive. Mobile
1986// callers (iOS / Android) need a single async entry point that opens a TCP
1987// connection, wraps it in `TcpSessionTransport`, parses the pinned key from
1988// bytes (per security invariant 1 in SECURITY.md), and hands back an
1989// `Arc<PhantomSession>` ready for `send` / `recv`.
1990//
1991// Native-only: `TcpSessionTransport` lives behind `cfg(not(target_arch =
1992// "wasm32"))`, mirroring `crate::api::tcp_transport`. Wasm consumers use
1993// the in-tree `WebSocketLeg` instead.
1994#[cfg(not(target_arch = "wasm32"))]
1995#[cfg_attr(feature = "bindings", uniffi::export(async_runtime = "tokio"))]
1996pub async fn connect_pinned(
1997    host: String,
1998    port: u16,
1999    pinned_key: Vec<u8>,
2000) -> Result<Arc<PhantomSession>, CoreError> {
2001    // fips bootstrap POST gate (same policy as
2002    // `PhantomListener::bind_inner`). A failure here aborts the
2003    // connect before any socket is opened or key material is
2004    // touched.
2005    #[cfg(feature = "fips")]
2006    crate::crypto::self_tests::ensure_post_passed()
2007        .map_err(|e| CoreError::FipsSelfTestFailure(format!("{e:?}")))?;
2008
2009    // Decode the server's hybrid verifying key. A malformed blob is a
2010    // crypto-layer problem (wrong length, wrong encoding) rather than a
2011    // network failure — surface it as `CryptoError`.
2012    let expected_server_key = HybridVerifyingKey::from_bytes(&pinned_key)
2013        .map_err(|e| CoreError::CryptoError(format!("invalid pinned key: {}", e)))?;
2014
2015    // Open the TCP stream. The `format!` is shared between the actual
2016    // connect target and the `peer_addr` recorded inside the session
2017    // (`connect_with_transport` takes it as a free-form string).
2018    let addr = format!("{}:{}", host, port);
2019    let stream = tokio::net::TcpStream::connect(&addr)
2020        .await
2021        .map_err(|e| CoreError::NetworkError(format!("connect {}: {}", addr, e)))?;
2022    let transport = crate::api::tcp_transport::TcpSessionTransport::new(stream);
2023
2024    // The handshake is driven by the background task spawned inside
2025    // `connect_with_transport`; the returned `PhantomSession` is usable
2026    // immediately (state `Connecting`, sends auto-queued until ready).
2027    let session = PhantomSession::connect_with_transport(&addr, transport, expected_server_key);
2028    Ok(Arc::new(session))
2029}
2030
2031/// Connect to a pinned server with a **0-RTT resumption attempt** — the
2032/// resumption-aware analogue of [`connect_pinned`].
2033///
2034/// `hint` is a [`ResumptionHint`] from a prior session's
2035/// [`PhantomSession::resumption_hint`]; both of its fields must be
2036/// exactly 32 bytes or the call fails with `ValidationError` before any
2037/// socket is opened. `early_data` (≤ 16 KiB) is sealed into the resuming
2038/// ClientHello so it reaches the server on the very first flight.
2039///
2040/// Acceptance is best-effort: when the server does not consume the early-data
2041/// (stale/unknown ticket or AEAD failure) the handshake completes 1-RTT — the
2042/// caller checks [`PhantomSession::early_data_accepted`] and re-sends over the
2043/// normal channel when it is not `Some(true)`.
2044///
2045/// Native-only, like [`connect_pinned`]: `TcpSessionTransport` lives
2046/// behind `cfg(not(target_arch = "wasm32"))`.
2047#[cfg(not(target_arch = "wasm32"))]
2048#[cfg_attr(feature = "bindings", uniffi::export(async_runtime = "tokio"))]
2049pub async fn connect_pinned_with_resumption(
2050    host: String,
2051    port: u16,
2052    pinned_key: Vec<u8>,
2053    hint: ResumptionHint,
2054    early_data: Vec<u8>,
2055) -> Result<Arc<PhantomSession>, CoreError> {
2056    // fips bootstrap POST gate (same policy as
2057    // `connect_pinned`).
2058    #[cfg(feature = "fips")]
2059    crate::crypto::self_tests::ensure_post_passed()
2060        .map_err(|e| CoreError::FipsSelfTestFailure(format!("{e:?}")))?;
2061
2062    // Server-key pinning stays mandatory (security invariant 1): a
2063    // malformed blob is a crypto-layer problem, surfaced as `CryptoError`.
2064    let expected_server_key = HybridVerifyingKey::from_bytes(&pinned_key)
2065        .map_err(|e| CoreError::CryptoError(format!("invalid pinned key: {}", e)))?;
2066
2067    // `ResumptionHint` fields are `Vec<u8>` (UniFFI has no fixed-size
2068    // array type) — enforce the 32-byte invariant here, before any
2069    // socket is opened, so a caller bug never becomes a network call.
2070    let session_id: [u8; 32] = hint.session_id.as_slice().try_into().map_err(|_| {
2071        CoreError::ValidationError(format!(
2072            "resumption hint session_id must be 32 bytes, got {}",
2073            hint.session_id.len()
2074        ))
2075    })?;
2076    let resumption_secret: [u8; 32] =
2077        hint.resumption_secret.as_slice().try_into().map_err(|_| {
2078            CoreError::ValidationError(format!(
2079                "resumption hint resumption_secret must be 32 bytes, got {}",
2080                hint.resumption_secret.len()
2081            ))
2082        })?;
2083
2084    // APIFFI-03: reject oversized early-data BEFORE opening a socket, so a caller
2085    // bug (or oversized blob) never wastes a TCP connection establishment. The
2086    // inner `connect_with_resumption` enforces the same cap as defense-in-depth.
2087    if early_data.len() > EARLY_DATA_MAX_LEN {
2088        return Err(CoreError::ValidationError(format!(
2089            "early_data is {} bytes, exceeds the {}-byte 0-RTT cap",
2090            early_data.len(),
2091            EARLY_DATA_MAX_LEN
2092        )));
2093    }
2094
2095    let addr = format!("{}:{}", host, port);
2096    let stream = tokio::net::TcpStream::connect(&addr)
2097        .await
2098        .map_err(|e| CoreError::NetworkError(format!("connect {}: {}", addr, e)))?;
2099    let transport = crate::api::tcp_transport::TcpSessionTransport::new(stream);
2100
2101    // Reuses the Rust-only `connect_with_resumption` — no new crypto and
2102    // no new wire format. That path enforces the `EARLY_DATA_MAX_LEN`
2103    // cap and keeps 0-RTT one-shot / best-effort (security invariant 9).
2104    let session = PhantomSession::connect_with_resumption(
2105        &addr,
2106        transport,
2107        expected_server_key,
2108        (session_id, resumption_secret),
2109        early_data,
2110    )?;
2111    Ok(Arc::new(session))
2112}
2113
2114#[cfg(test)]
2115mod tests {
2116    use super::*;
2117    use crate::transport::handshake::{ClientHello, HandshakeResponse, HandshakeServer};
2118
2119    // ── Mock transport for testing ──
2120
2121    /// In-memory transport using channels (simulates a loopback pipe).
2122    struct ChannelTransport {
2123        tx: mpsc::Sender<Vec<u8>>,
2124        rx: Mutex<mpsc::Receiver<Vec<u8>>>,
2125    }
2126
2127    impl ChannelTransport {
2128        /// Create a pair of connected transports (client ↔ server).
2129        fn pair() -> (Self, Self) {
2130            let (a_tx, b_rx) = mpsc::channel(64);
2131            let (b_tx, a_rx) = mpsc::channel(64);
2132            (
2133                Self {
2134                    tx: a_tx,
2135                    rx: Mutex::new(a_rx),
2136                },
2137                Self {
2138                    tx: b_tx,
2139                    rx: Mutex::new(b_rx),
2140                },
2141            )
2142        }
2143    }
2144
2145    impl SessionTransport for ChannelTransport {
2146        async fn send_bytes(&self, data: &[u8]) -> Result<(), CoreError> {
2147            self.tx
2148                .send(data.to_vec())
2149                .await
2150                .map_err(|_| CoreError::NetworkError("channel closed".into()))
2151        }
2152
2153        async fn recv_bytes(&self) -> Result<Bytes, CoreError> {
2154            let mut rx = self.rx.lock().await;
2155            let v = rx
2156                .recv()
2157                .await
2158                .ok_or_else(|| CoreError::NetworkError("channel closed".into()))?;
2159            Ok(Bytes::from(v))
2160        }
2161    }
2162
2163    // ── Tests ──
2164
2165    /// H9 forward-compat (client side): when the server answers a `ClientHello`
2166    /// with a typed `ServerReject` (the version isn't one it speaks), the client
2167    /// surfaces a clear version-mismatch error instead of hanging or returning a
2168    /// generic failure — and crucially does NOT auto-downgrade.
2169    #[tokio::test]
2170    async fn client_surfaces_server_reject_as_version_error() {
2171        use crate::transport::handshake::ServerReject;
2172
2173        let (client_transport, server_transport) = ChannelTransport::pair();
2174        // The reject path errors before any key verification, so any key works.
2175        let (_sk, expected_vk) = crate::crypto::hybrid_sign::HybridSigningKey::generate();
2176
2177        let server = tokio::spawn(async move {
2178            // Consume the ClientHello, then reply with the typed reject.
2179            let _hello = server_transport.recv_bytes().await.unwrap();
2180            let reject = borsh::to_vec(&ServerReject::unsupported_version()).unwrap();
2181            server_transport.send_bytes(&reject).await.unwrap();
2182        });
2183
2184        let result = run_client_handshake(&client_transport, &expected_vk, None).await;
2185        server.await.unwrap();
2186
2187        let err = result.expect_err("client must surface the reject as an error");
2188        let msg = format!("{err:?}");
2189        assert!(
2190            msg.contains("unsupported protocol version"),
2191            "expected a version-mismatch error, got: {msg}"
2192        );
2193    }
2194
2195    /// **HS-02.** A MITM that answers every `ClientHello` with a fresh cheap
2196    /// `HelloRetryRequest` must NOT loop the client forever — `run_client_handshake`
2197    /// caps the retry rounds and returns an error. (Pre-fix this test would hang.)
2198    #[tokio::test]
2199    async fn client_handshake_caps_retry_rounds() {
2200        use crate::transport::handshake::HelloRetryRequest;
2201
2202        let (client_transport, server_transport) = ChannelTransport::pair();
2203        let (_sk, expected_vk) = crate::crypto::hybrid_sign::HybridSigningKey::generate();
2204
2205        // Malicious server: answer EVERY ClientHello with a fresh, cheap
2206        // HelloRetryRequest (no cookie, no PoW) — never converging.
2207        let server = tokio::spawn(async move {
2208            loop {
2209                if server_transport.recv_bytes().await.is_err() {
2210                    break;
2211                }
2212                let retry = borsh::to_vec(&HelloRetryRequest {
2213                    challenge: None,
2214                    cookie: None,
2215                })
2216                .expect("encode retry");
2217                if server_transport.send_bytes(&retry).await.is_err() {
2218                    break;
2219                }
2220            }
2221        });
2222
2223        let result = run_client_handshake(&client_transport, &expected_vk, None).await;
2224        drop(client_transport); // close the channel so the server task ends
2225        let _ = server.await;
2226
2227        assert!(
2228            matches!(result, Err(CoreError::HandshakeError(_))),
2229            "client must error after the retry-round cap, not loop forever; got {result:?}"
2230        );
2231    }
2232
2233    /// **INFOLEAK-1.** `ResumptionHint`'s `Debug` must redact the 0-RTT
2234    /// `resumption_secret` — a mobile/FFI consumer logging it with `{:?}` must
2235    /// not leak the key material.
2236    #[test]
2237    fn resumption_hint_debug_redacts_secret() {
2238        let hint = ResumptionHint {
2239            session_id: vec![0xAB; 32],
2240            resumption_secret: vec![0xCD; 32],
2241        };
2242        let dbg = format!("{hint:?}");
2243        assert!(dbg.contains("REDACTED"), "secret must be redacted: {dbg}");
2244        // No representation of the secret bytes (0xCD) leaks — neither hex nor
2245        // the decimal the derived Debug would have printed for a Vec<u8>.
2246        assert!(
2247            !dbg.contains("205"),
2248            "no decimal secret bytes in Debug: {dbg}"
2249        );
2250        assert!(
2251            !dbg.to_lowercase().contains("cd, cd"),
2252            "no hex secret bytes: {dbg}"
2253        );
2254    }
2255
2256    #[tokio::test]
2257    async fn test_phantom_session_instant_connect() {
2258        let session = PhantomSession::connect("example.com:443".to_string());
2259
2260        // Should be in Connecting state immediately
2261        assert_eq!(session.connection_state(), ConnectionState::Connecting);
2262        assert!(!session.is_data_ready());
2263        assert_eq!(session.peer_addr(), "example.com:443");
2264    }
2265
2266    #[tokio::test]
2267    async fn test_phantom_session_send_queue() {
2268        let session = PhantomSession::connect("example.com:443".to_string());
2269
2270        // Send while still connecting — should queue
2271        session.send(vec![1, 2, 3]).await.unwrap();
2272        session.send(vec![4, 5, 6]).await.unwrap();
2273        assert_eq!(session.queued_count().await, 2);
2274
2275        // Simulate handshake completion
2276        session.set_state(ConnectionState::ClassicalReady);
2277        assert!(session.is_data_ready());
2278
2279        // Flush queue
2280        let flushed = session.flush_queue().await.unwrap();
2281        assert_eq!(flushed, 2);
2282        assert_eq!(session.queued_count().await, 0);
2283    }
2284
2285    #[tokio::test]
2286    async fn test_phantom_session_state_progression() {
2287        let session = PhantomSession::connect("example.com:443".to_string());
2288
2289        assert_eq!(session.connection_state(), ConnectionState::Connecting);
2290        assert!(!session.is_data_ready());
2291
2292        session.set_state(ConnectionState::ClassicalReady);
2293        assert!(session.is_data_ready());
2294        assert!(!session.is_pqc_ready());
2295
2296        session.set_state(ConnectionState::PqcUpgrading);
2297        assert!(session.is_data_ready());
2298        assert!(!session.is_pqc_ready());
2299
2300        session.set_state(ConnectionState::PqcReady);
2301        assert!(session.is_data_ready());
2302        assert!(session.is_pqc_ready());
2303
2304        session.set_state(ConnectionState::Connected);
2305        assert!(session.is_data_ready());
2306        assert!(session.is_pqc_ready());
2307    }
2308
2309    #[tokio::test]
2310    async fn test_phantom_session_close() {
2311        let session = PhantomSession::connect("example.com:443".to_string());
2312        session.disconnect().await.unwrap();
2313        assert_eq!(session.connection_state(), ConnectionState::Closed);
2314        assert!(!session.is_data_ready());
2315    }
2316
2317    /// Helper: decrypt an incoming encrypted frame on the test server side.
2318    fn decrypt_incoming(
2319        server_session: &crate::transport::session::Session,
2320        bytes: &[u8],
2321    ) -> Vec<u8> {
2322        let pkt = PhantomPacket::from_wire(bytes).expect("deserialize PhantomPacket");
2323        assert!(
2324            pkt.header.flags.contains(PacketFlags::ENCRYPTED),
2325            "expected ENCRYPTED flag on application data"
2326        );
2327        server_session
2328            .decrypt_packet(&pkt.header, &pkt.payload)
2329            .expect("decrypt application data")
2330    }
2331
2332    /// Helper: build an encrypted reply frame from the test server side.
2333    fn encrypt_outgoing(
2334        server_session: &crate::transport::session::Session,
2335        session_id: SessionId,
2336        stream_id: TransportStreamId,
2337        sequence: u32,
2338        payload: &[u8],
2339    ) -> Vec<u8> {
2340        let flag_bits = PacketFlags::RELIABLE | PacketFlags::ENCRYPTED;
2341        let header =
2342            PacketHeader::new(session_id, stream_id, sequence, PacketFlags::new(flag_bits))
2343                .with_epoch(server_session.current_epoch());
2344        let ct = server_session
2345            .encrypt_packet(&header, payload)
2346            .expect("encrypt reply");
2347        let packet = PhantomPacket::new(header, ct);
2348        packet.to_wire()
2349    }
2350
    /// Integration test: a pinned client handshake over `ChannelTransport`
    /// against a hand-driven `HandshakeServer` responder.
    ///
    /// Verifies that:
    /// * a payload `send` while the session is still connecting is later
    ///   delivered ENCRYPTED, with its plaintext absent from the wire frame;
    /// * a post-handshake `send` is also encrypted;
    /// * `recv` returns the decrypted payload of an encrypted server reply.
    ///
    /// NOTE(review): synchronisation with the background handshake is a fixed
    /// 500 ms sleep before asserting `Connected`. A slow runner (e.g. an
    /// expensive PoW retry) could make this flaky.
    #[tokio::test]
    async fn test_phantom_session_handshake_via_transport() {
        let (client_transport, server_transport) = ChannelTransport::pair();
        let server_hs = HandshakeServer::new().unwrap();
        // The client pins exactly the verifying key of this in-test server.
        let server_pinned_key = server_hs.verifying_key().clone();

        // Start client session — spawns background handshake (with pinning)
        let session = PhantomSession::connect_with_transport(
            "test-server:9000",
            client_transport,
            server_pinned_key,
        );

        // Queue a message before handshake completes. Despite the label, this
        // goes through the send queue, not a 0-RTT path
        // (`connect_with_transport` takes no early-data argument).
        session.send(b"early-data".to_vec()).await.unwrap();

        // Simulate server responder
        let server_handle = tokio::spawn(async move {
            let client_ip = "127.0.0.1".parse().unwrap();

            // 1. Receive the (bare borsh) ClientHello.
            let client_hello_bytes = server_transport.recv_bytes().await.unwrap();
            let client_hello = borsh::from_slice::<ClientHello>(&client_hello_bytes).unwrap();

            // 2. Process — may retry with cookie/PoW.
            // NOTE(review): every arm below either `break`s or panics, so this
            // `loop` body runs exactly once and tolerates at most ONE Retry
            // round; a second Retry panics with "Expected success after retry".
            let server_session = loop {
                let response = server_hs.process_client_hello(&client_hello, 0, client_ip);
                match response {
                    HandshakeResponse::Retry(retry) => {
                        let retry_bytes = borsh::to_vec(&retry).unwrap();
                        server_transport.send_bytes(&retry_bytes).await.unwrap();
                        // Receive retried client hello
                        let next_bytes = server_transport.recv_bytes().await.unwrap();
                        let next_hello = borsh::from_slice::<ClientHello>(&next_bytes).unwrap();
                        let resp2 = server_hs.process_client_hello(&next_hello, 0, client_ip);
                        match resp2 {
                            HandshakeResponse::Success(server_hello, session, _) => {
                                let server_hello_bytes = borsh::to_vec(&server_hello).unwrap();
                                server_transport
                                    .send_bytes(&server_hello_bytes)
                                    .await
                                    .unwrap();
                                break session;
                            }
                            _ => panic!("Expected success after retry"),
                        }
                    }
                    HandshakeResponse::Success(server_hello, session, _) => {
                        let server_hello_bytes = borsh::to_vec(&server_hello).unwrap();
                        server_transport
                            .send_bytes(&server_hello_bytes)
                            .await
                            .unwrap();
                        break session;
                    }
                    HandshakeResponse::Reject(r) => panic!("unexpected reject: {:?}", r),
                    HandshakeResponse::Fail(e) => panic!("handshake failed: {:?}", e),
                }
            };

            let session_id = *server_session.id();

            // 3. Receive the flushed early data — must be ENCRYPTED.
            // The windows() scan proves the plaintext bytes never hit the wire.
            let early_frame = server_transport.recv_bytes().await.unwrap();
            assert!(
                !early_frame
                    .windows(b"early-data".len())
                    .any(|w| w == b"early-data"),
                "encrypted frame must not contain plaintext early-data"
            );
            let early_plain = decrypt_incoming(&server_session, &early_frame);
            assert_eq!(early_plain, b"early-data");

            // 4. Receive a post-handshake message — must be ENCRYPTED.
            let post_frame = server_transport.recv_bytes().await.unwrap();
            let post_plain = decrypt_incoming(&server_session, &post_frame);
            assert_eq!(post_plain, b"after-handshake");

            // 5. Send encrypted reply back (stream id 1, sequence 1).
            let reply = encrypt_outgoing(&server_session, session_id, 1, 1, b"server-reply");
            server_transport.send_bytes(&reply).await.unwrap();
        });

        // Wait for handshake to progress
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // Should be connected now
        assert_eq!(session.connection_state(), ConnectionState::Connected);

        // Send after handshake
        session.send(b"after-handshake".to_vec()).await.unwrap();

        // Receive server reply — now returns DECRYPTED plaintext payload.
        let reply = session.recv().await.unwrap();
        assert_eq!(reply, b"server-reply");

        // Joining the responder re-raises any assertion panic from its side.
        server_handle.await.unwrap();
        session.disconnect().await.unwrap();
    }
2452
    /// Reliable delivery: a RELIABLE application send must survive a dropped data frame.
    ///
    /// The client runs over a `LossyTransport`; once the handshake completes we
    /// arm a drop of the next frame (the data frame) and send a reliable
    /// payload. The first transmission is lost, so the server only sees the
    /// payload because the raw-app stream buffers it and the data pump
    /// retransmits the timed-out segment.
    ///
    /// NOTE(review): handshake completion is awaited with a fixed 500 ms
    /// sleep. That sleep presumably also lets trailing handshake traffic
    /// settle before the drop is armed, so replacing it with state polling
    /// needs care.
    #[tokio::test]
    async fn reliable_send_survives_a_dropped_data_frame() {
        use crate::test_harness::fault_transport::{FaultControl, LossyTransport};

        // Only the client's transport is lossy; `faults` is a shared handle
        // used below to arm drops on it.
        let (client_transport, server_transport) = ChannelTransport::pair();
        let faults = FaultControl::new();
        let lossy_client = LossyTransport::new(client_transport, faults.clone());

        let server_hs = HandshakeServer::new().unwrap();
        let server_pinned_key = server_hs.verifying_key().clone();

        let session = PhantomSession::connect_with_transport(
            "test-server:9000",
            lossy_client,
            server_pinned_key,
        );

        let server_handle = tokio::spawn(async move {
            let client_ip = "127.0.0.1".parse().unwrap();
            let client_hello_bytes = server_transport.recv_bytes().await.unwrap();
            let client_hello = borsh::from_slice::<ClientHello>(&client_hello_bytes).unwrap();

            // Drive the handshake to completion (may take one cookie/PoW retry).
            // NOTE(review): every arm breaks or panics, so this `loop` body
            // runs once; a second consecutive Retry panics.
            let server_session = loop {
                match server_hs.process_client_hello(&client_hello, 0, client_ip) {
                    HandshakeResponse::Retry(retry) => {
                        let retry_bytes = borsh::to_vec(&retry).unwrap();
                        server_transport.send_bytes(&retry_bytes).await.unwrap();
                        let next_bytes = server_transport.recv_bytes().await.unwrap();
                        let next_hello = borsh::from_slice::<ClientHello>(&next_bytes).unwrap();
                        match server_hs.process_client_hello(&next_hello, 0, client_ip) {
                            HandshakeResponse::Success(server_hello, session, _) => {
                                let b = borsh::to_vec(&server_hello).unwrap();
                                server_transport.send_bytes(&b).await.unwrap();
                                break session;
                            }
                            _ => panic!("expected success after retry"),
                        }
                    }
                    HandshakeResponse::Success(server_hello, session, _) => {
                        let b = borsh::to_vec(&server_hello).unwrap();
                        server_transport.send_bytes(&b).await.unwrap();
                        break session;
                    }
                    HandshakeResponse::Reject(r) => panic!("unexpected reject: {:?}", r),
                    HandshakeResponse::Fail(e) => panic!("handshake failed: {:?}", e),
                }
            };

            // The reliable data frame was dropped on first transmission; it can
            // only arrive via retransmission. Time-bounded so a missing
            // retransmit fails loudly instead of hanging the test forever.
            let data_frame = tokio::time::timeout(
                std::time::Duration::from_secs(3),
                server_transport.recv_bytes(),
            )
            .await
            .expect(
                "reliable payload never arrived within 3s — the dropped data frame was not \
                 retransmitted (loss-recovery regression)",
            )
            .unwrap();
            let plain = decrypt_incoming(&server_session, &data_frame);
            assert_eq!(plain, b"reliable-payload");
        });

        // Wait for the handshake to complete.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        assert_eq!(session.connection_state(), ConnectionState::Connected);

        // Arm a single drop, then send: the next frame on the wire (the data
        // frame) is silently lost.
        faults.arm_drop_next(1);
        session.send(b"reliable-payload".to_vec()).await.unwrap();

        // Joining the responder re-raises its timeout/assertion panics here.
        server_handle.await.unwrap();
        session.disconnect().await.unwrap();
    }
2538
    /// A retransmission (RTO expiry) must be reported to congestion control as
    /// a loss, driving BBR into FastRecovery — proves the drain → on_packet_lost
    /// wiring, not just that the retransmit happens.
    ///
    /// Runs on tokio's paused clock, so the RTO expiry is produced by
    /// `tokio::time::advance` rather than by real waiting.
    #[tokio::test]
    async fn drain_reports_a_retransmit_as_loss_to_bbr() {
        use crate::transport::bandwidth_estimator::BbrState;

        // Freeze the tokio clock; from here time only moves via `advance`.
        tokio::time::pause();
        let sid = fixed_session_id();
        // `_server` is bound to a named placeholder, so it is held to the end.
        let (client, _server) = paired_sessions(sid);

        // One stream (id 1) with a single reliable payload queued.
        let stream = Arc::new(TransportStream::new(1));
        stream.send_reliable(Bytes::from("payload")).await;
        let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
        streams.insert(1u32, stream);

        // `_server_t` keeps the receiving end open; if it were dropped,
        // `send_bytes` on the client end would fail with "channel closed".
        let (client_t, _server_t) = ChannelTransport::pair();
        let transport = Arc::new(client_t);

        // First drain: the initial transmission — not a loss.
        drain_streams_priority_ordered(&transport, &client, sid, &streams).await;
        assert_ne!(client.bbr_state(), BbrState::FastRecovery);

        // The RTO expires; the next drain retransmits and must report the loss.
        // NOTE(review): 1100 ms assumes the initial RTO is at most ~1 s —
        // confirm against the stream's RTO configuration if this regresses.
        tokio::time::advance(std::time::Duration::from_millis(1100)).await;
        drain_streams_priority_ordered(&transport, &client, sid, &streams).await;
        assert_eq!(
            client.bbr_state(),
            BbrState::FastRecovery,
            "a retransmit must be reported to BBR as a loss"
        );
    }
2571
2572    /// New data must not be transmitted while inflight already exceeds the
2573    /// congestion window — the drain holds it back until ACKs free the window.
2574    #[tokio::test]
2575    async fn drain_withholds_new_data_when_inflight_exceeds_cwnd() {
2576        let sid = fixed_session_id();
2577        let (client, _server) = paired_sessions(sid);
2578
2579        // Drive inflight far above any plausible initial cwnd, so the window
2580        // has no room for new data.
2581        client.on_packet_sent(100_000_000);
2582        let inflight_before = client.bandwidth_snapshot().inflight_bytes;
2583
2584        let stream = Arc::new(TransportStream::new(1));
2585        stream.send_reliable(Bytes::from("new-data")).await;
2586        let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
2587        streams.insert(1u32, stream);
2588
2589        let (client_t, _server_t) = ChannelTransport::pair();
2590        let transport = Arc::new(client_t);
2591
2592        drain_streams_priority_ordered(&transport, &client, sid, &streams).await;
2593
2594        // No new segment was transmitted — inflight is unchanged (a send would
2595        // have grown it via on_packet_sent).
2596        assert_eq!(
2597            client.bandwidth_snapshot().inflight_bytes,
2598            inflight_before,
2599            "no new data should be sent when inflight >= cwnd"
2600        );
2601    }
2602
2603    // ────────────────────────────────────────────────────────────────────
2604    // V2 wire-routing tests (Phase 4.2 / 2.5 follow-up — data-pump V2)
2605    // ────────────────────────────────────────────────────────────────────
2606
2607    use crate::transport::multiplexer::StreamDemultiplexer;
2608    use crate::transport::session::Session as InnerSession;
2609    use crate::transport::stream::Stream as TransportStream;
2610
2611    /// Build two `InnerSession` instances that share a 32-byte secret —
2612    /// one as the "client" (peer_side=false), one as the "server"
2613    /// (peer_side=true). Mirrors the role split after a real handshake.
2614    fn paired_sessions(session_id: SessionId) -> (Arc<InnerSession>, Arc<InnerSession>) {
2615        let secret = [0x11u8; 32];
2616        let client = Arc::new(InnerSession::new(session_id, &secret, false).unwrap());
2617        let server = Arc::new(InnerSession::new(session_id, &secret, true).unwrap());
2618        (client, server)
2619    }
2620
2621    fn fixed_session_id() -> SessionId {
2622        SessionId::from_bytes([0x88; 32])
2623    }
2624
2625    /// Encrypt a V2 application-data packet from the client side at
2626    /// `stream_id` / `sequence`. The returned bytes are wire-serialised
2627    /// ([`PhantomPacket::to_wire`]) and ready to feed into `handle_packet`.
2628    fn build_app_frame(
2629        client_session: &InnerSession,
2630        session_id: SessionId,
2631        stream_id: TransportStreamId,
2632        sequence: u32,
2633        payload: &[u8],
2634    ) -> Vec<u8> {
2635        let flag_bits = PacketFlags::RELIABLE | PacketFlags::ENCRYPTED;
2636        let header =
2637            PacketHeader::new(session_id, stream_id, sequence, PacketFlags::new(flag_bits))
2638                .with_epoch(client_session.current_epoch());
2639        let ciphertext = client_session
2640            .encrypt_packet(&header, payload)
2641            .expect("encrypt_packet");
2642        let packet = PhantomPacket::new(header, ciphertext);
2643        packet.to_wire()
2644    }
2645
    /// Receive path, V2 framing: an encrypted application-data packet fed to
    /// `handle_packet` is decrypted and handed to the delivery channel, tagged
    /// with its stream id. Its length is added to the undelivered counter.
    #[tokio::test]
    async fn v2_recv_routes_encrypted_app_data_through_recv_channel() {
        let session_id = fixed_session_id();
        let (client_session, server_session) = paired_sessions(session_id);

        // Encrypt a V2 application-data packet on the client side.
        let stream_id: TransportStreamId = 1;
        let frame = build_app_frame(&client_session, session_id, stream_id, 0, b"hello-v2");

        // Receive on the server side: deserialize then drive
        // handle_packet, which is the recv-path entry point.
        let v2 = PhantomPacket::from_wire(&frame).unwrap();

        // Receiver-side plumbing for `handle_packet`. The second value from
        // `StreamDemultiplexer::new` is bound to `_ctrl_rx` (not `_`) so it is
        // not dropped immediately.
        let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
        let demux = Arc::new(demux);
        let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
        let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
        let undelivered = AtomicU64::new(0);
        // A self-looped transport: anything `handle_packet` sends (presumably
        // ACKs, given the names) lands in its own receiver and is never read.
        // NOTE(review): the channel holds only 4 frames; more unread sends
        // would block and hang the test.
        let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(4);
        let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
            tx: ack_a,
            rx: Mutex::new(ack_b),
        });

        // Scratch state threaded through `handle_packet` by mutable reference.
        let mut ack_buf = Vec::with_capacity(256);
        let mut path_validation_seq: u32 = 0;
        let obs = Observability::new(ObservabilityConfig::default());
        // NOTE(review): `transport_send` is passed for two separate transport
        // parameters; their distinct roles are not visible here — confirm
        // against `handle_packet`'s signature.
        handle_packet(
            v2,
            session_id,
            &server_session,
            &streams,
            &demux,
            &transport_send,
            &transport_send,
            &deliver_tx,
            &undelivered,
            &mut ack_buf,
            &mut path_validation_seq,
            &obs,
            LegType::Tcp,
        )
        .await;

        // The decrypted plaintext must have been handed to the delivery task,
        // tagged with its stream id, and counted toward the undelivered backlog.
        let (sid, received) = deliver_rx.recv().await.expect("delivery hand-off");
        assert_eq!(sid, stream_id as u32);
        assert_eq!(&received[..], b"hello-v2");
        assert_eq!(
            undelivered.load(Ordering::Acquire),
            b"hello-v2".len() as u64
        );
    }
2700
2701    /// Like [`build_app_frame`] but stamps a caller-chosen `path_id` so the
2702    /// receive-side path gate (PATH-001) can be exercised.
2703    fn build_app_frame_on_path(
2704        client_session: &InnerSession,
2705        session_id: SessionId,
2706        stream_id: TransportStreamId,
2707        sequence: u32,
2708        path_id: u8,
2709        payload: &[u8],
2710    ) -> Vec<u8> {
2711        let flag_bits = PacketFlags::RELIABLE | PacketFlags::ENCRYPTED;
2712        let header =
2713            PacketHeader::new(session_id, stream_id, sequence, PacketFlags::new(flag_bits))
2714                .with_epoch(client_session.current_epoch())
2715                .with_path_id(path_id);
2716        let ciphertext = client_session
2717            .encrypt_packet(&header, payload)
2718            .expect("encrypt_packet");
2719        PhantomPacket::new(header, ciphertext).to_wire()
2720    }
2721
2722    #[tokio::test]
2723    async fn app_data_on_non_validated_path_is_dropped() {
2724        // PATH-001 (Invariant 6): application data is delivered only on a
2725        // Validated path. Path 0 is pre-validated; any other path_id must
2726        // complete a PATH_VALIDATION challenge first. A frame on an unvalidated
2727        // path is dropped (even though it decrypts cleanly) and the path is
2728        // registered Unvalidated so a later challenge can promote it.
2729        use crate::transport::path::PathStateKind;
2730        let session_id = fixed_session_id();
2731        let (client_session, server_session) = paired_sessions(session_id);
2732        let stream_id: TransportStreamId = 1;
2733
2734        let bad = build_app_frame_on_path(
2735            &client_session,
2736            session_id,
2737            stream_id,
2738            0,
2739            7, // never validated on the receiver
2740            b"on-bad-path",
2741        );
2742        let bad = PhantomPacket::from_wire(&bad).unwrap();
2743
2744        let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
2745        let demux = Arc::new(demux);
2746        let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
2747        let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
2748        let undelivered = AtomicU64::new(0);
2749        let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(4);
2750        let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
2751            tx: ack_a,
2752            rx: Mutex::new(ack_b),
2753        });
2754        let mut ack_buf = Vec::with_capacity(256);
2755        let mut path_validation_seq: u32 = 0;
2756        let obs = Observability::new(ObservabilityConfig::default());
2757
2758        handle_packet(
2759            bad,
2760            session_id,
2761            &server_session,
2762            &streams,
2763            &demux,
2764            &transport_send,
2765            &transport_send,
2766            &deliver_tx,
2767            &undelivered,
2768            &mut ack_buf,
2769            &mut path_validation_seq,
2770            &obs,
2771            LegType::Tcp,
2772        )
2773        .await;
2774
2775        assert!(
2776            deliver_rx.try_recv().is_err(),
2777            "application data on a non-validated path must be dropped"
2778        );
2779        assert_eq!(
2780            undelivered.load(Ordering::Acquire),
2781            0,
2782            "dropped data must not count toward the backlog"
2783        );
2784        assert_eq!(
2785            server_session.path_state(7),
2786            Some(PathStateKind::Unvalidated),
2787            "the unseen path id must be registered for a later challenge"
2788        );
2789
2790        // Positive control: the SAME stream on the pre-validated path 0 IS
2791        // delivered, proving the gate only blocks non-validated paths.
2792        let good = build_app_frame_on_path(
2793            &client_session,
2794            session_id,
2795            stream_id,
2796            1,
2797            0,
2798            b"on-good-path",
2799        );
2800        let good = PhantomPacket::from_wire(&good).unwrap();
2801        handle_packet(
2802            good,
2803            session_id,
2804            &server_session,
2805            &streams,
2806            &demux,
2807            &transport_send,
2808            &transport_send,
2809            &deliver_tx,
2810            &undelivered,
2811            &mut ack_buf,
2812            &mut path_validation_seq,
2813            &obs,
2814            LegType::Tcp,
2815        )
2816        .await;
2817        let (sid, received) = deliver_rx.recv().await.expect("path-0 delivery");
2818        assert_eq!(sid, stream_id as u32);
2819        assert_eq!(&received[..], b"on-good-path");
2820    }
2821
2822    /// Build an `ENCRYPTED | ACK` frame (H1) from `acker_session` acknowledging
2823    /// `acked_seq` on `stream_id`, with its own header sequence `ack_header_seq`
2824    /// (drawn from the acker's send space, distinct from the acked data
2825    /// sequence). Wire-serialised, ready for `handle_packet`.
2826    fn build_encrypted_ack(
2827        acker_session: &InnerSession,
2828        session_id: SessionId,
2829        stream_id: TransportStreamId,
2830        ack_header_seq: u32,
2831        acked_seq: u32,
2832    ) -> Vec<u8> {
2833        let flag_bits = PacketFlags::ENCRYPTED | PacketFlags::ACK;
2834        let header = PacketHeader::new(
2835            session_id,
2836            stream_id,
2837            ack_header_seq,
2838            PacketFlags::new(flag_bits),
2839        )
2840        .with_epoch(acker_session.current_epoch());
2841        let ct = acker_session
2842            .encrypt_packet(&header, &acked_seq.to_be_bytes())
2843            .expect("encrypt ack");
2844        PhantomPacket::new(header, ct).to_wire()
2845    }
2846
2847    /// Drive a single inbound packet through `handle_packet` against
2848    /// `server_session` with throwaway delivery/transport/observability wiring.
2849    async fn run_recv(
2850        pkt: PhantomPacket,
2851        session_id: SessionId,
2852        server_session: &Arc<InnerSession>,
2853        streams: &Arc<DashMap<u32, Arc<TransportStream>>>,
2854    ) {
2855        let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
2856        let demux = Arc::new(demux);
2857        let (deliver_tx, _deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
2858        let undelivered = AtomicU64::new(0);
2859        let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(4);
2860        let transport: Arc<ChannelTransport> = Arc::new(ChannelTransport {
2861            tx: ack_a,
2862            rx: Mutex::new(ack_b),
2863        });
2864        let mut ack_buf = Vec::with_capacity(64);
2865        let mut path_validation_seq: u32 = 0;
2866        let obs = Observability::new(ObservabilityConfig::default());
2867        handle_packet(
2868            pkt,
2869            session_id,
2870            server_session,
2871            streams,
2872            &demux,
2873            &transport,
2874            &transport,
2875            &deliver_tx,
2876            &undelivered,
2877            &mut ack_buf,
2878            &mut path_validation_seq,
2879            &obs,
2880            LegType::Tcp,
2881        )
2882        .await;
2883    }
2884
2885    /// Stage a stream with one in-flight reliable segment; returns the stream,
2886    /// the shared streams map, and the segment's sequence number.
2887    async fn staged_pending_segment() -> (
2888        Arc<TransportStream>,
2889        Arc<DashMap<u32, Arc<TransportStream>>>,
2890        u32,
2891    ) {
2892        let stream_id: TransportStreamId = 1;
2893        let stream = Arc::new(TransportStream::new(stream_id));
2894        let seq = stream
2895            .send_reliable(Bytes::from_static(b"reliable-payload"))
2896            .await;
2897        let _ = stream.poll_send(u64::MAX).await.expect("segment in-flight");
2898        let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
2899        streams.insert(stream_id as u32, stream.clone());
2900        (stream, streams, seq)
2901    }
2902
2903    /// **H1 (Invariant 2).** A forged *unauthenticated* ACK — whether bare
2904    /// (`ACK` flag, empty payload) or carrying a plaintext 4-byte acked-seq —
2905    /// must NOT retire a pending reliable segment. Pre-fix, the ACK branch ran
2906    /// before the AEAD gate and trusted `header.sequence`, so an off-path
2907    /// attacker could silently drop never-acknowledged segments.
2908    #[tokio::test]
2909    async fn forged_plaintext_ack_does_not_retire_pending_segment() {
2910        let session_id = fixed_session_id();
2911        let (_client, server_session) = paired_sessions(session_id);
2912        let (stream, streams, seq) = staged_pending_segment().await;
2913        let stream_id: TransportStreamId = 1;
2914
2915        // Variant 1: bare ACK, no ENCRYPTED, empty payload, guessed sequence.
2916        run_recv(
2917            PhantomPacket::new(
2918                PacketHeader::new(
2919                    session_id,
2920                    stream_id,
2921                    seq,
2922                    PacketFlags::new(PacketFlags::ACK),
2923                ),
2924                Vec::new(),
2925            ),
2926            session_id,
2927            &server_session,
2928            &streams,
2929        )
2930        .await;
2931        // Variant 2: ACK with a plaintext 4-byte acked-seq, no ENCRYPTED.
2932        run_recv(
2933            PhantomPacket::new(
2934                PacketHeader::new(
2935                    session_id,
2936                    stream_id,
2937                    999,
2938                    PacketFlags::new(PacketFlags::ACK),
2939                ),
2940                seq.to_be_bytes().to_vec(),
2941            ),
2942            session_id,
2943            &server_session,
2944            &streams,
2945        )
2946        .await;
2947
2948        assert!(
2949            stream.ack(seq).await.is_some(),
2950            "a forged unauthenticated ACK must not retire the pending reliable segment"
2951        );
2952    }
2953
2954    /// **H1 positive control.** A genuine `ENCRYPTED | ACK` frame from the peer,
2955    /// whose AEAD payload carries the acked data sequence, retires the matching
2956    /// pending segment after AEAD verify. The ACK's own `header.sequence`
2957    /// (`ack_header_seq`) is deliberately different from the acked sequence to
2958    /// prove the handler reads the authenticated payload, not the header.
2959    #[tokio::test]
2960    async fn authenticated_ack_retires_pending_segment() {
2961        let session_id = fixed_session_id();
2962        let (client_session, server_session) = paired_sessions(session_id);
2963        let (stream, streams, seq) = staged_pending_segment().await;
2964        let stream_id: TransportStreamId = 1;
2965
2966        let ack_header_seq = seq.wrapping_add(54_321);
2967        let frame =
2968            build_encrypted_ack(&client_session, session_id, stream_id, ack_header_seq, seq);
2969        let ack_pkt = PhantomPacket::from_wire(&frame).expect("parse ack");
2970        run_recv(ack_pkt, session_id, &server_session, &streams).await;
2971
2972        assert!(
2973            stream.ack(seq).await.is_none(),
2974            "an authenticated ACK must retire the acked pending segment"
2975        );
2976    }
2977
2978    /// **H1 session binding.** A frame whose `header.session_id` does not match
2979    /// the negotiated session must be dropped by the per-frame guard before any
2980    /// state mutation — pre-fix the ACK was processed with no session check.
2981    #[tokio::test]
2982    async fn ack_with_wrong_session_id_is_dropped() {
2983        let session_id = fixed_session_id();
2984        let (_client, server_session) = paired_sessions(session_id);
2985        let (stream, streams, seq) = staged_pending_segment().await;
2986        let stream_id: TransportStreamId = 1;
2987
2988        let wrong_id = SessionId::from_bytes([0x11; 32]);
2989        run_recv(
2990            PhantomPacket::new(
2991                PacketHeader::new(wrong_id, stream_id, seq, PacketFlags::new(PacketFlags::ACK)),
2992                Vec::new(),
2993            ),
2994            session_id,
2995            &server_session,
2996            &streams,
2997        )
2998        .await;
2999
3000        assert!(
3001            stream.ack(seq).await.is_some(),
3002            "an ACK for a different session id must not retire the segment"
3003        );
3004    }
3005
3006    #[tokio::test]
3007    async fn v2_recv_drops_unencrypted_non_empty_post_handshake_payload() {
3008        // Downgrade defense: a V2 application-data packet WITHOUT the
3009        // ENCRYPTED flag but with a non-empty plaintext-looking payload
3010        // must be dropped, mirroring the V1 invariant.
3011        let session_id = fixed_session_id();
3012        let (_, server_session) = paired_sessions(session_id);
3013
3014        let stream_id: TransportStreamId = 2;
3015        let bad_header = PacketHeader::new(
3016            session_id,
3017            stream_id,
3018            0,
3019            PacketFlags::new(PacketFlags::RELIABLE), // no ENCRYPTED
3020        );
3021        let bad_packet = PhantomPacket::new(bad_header, b"leaked-cleartext".to_vec());
3022
3023        let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
3024        let demux = Arc::new(demux);
3025        let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3026        let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
3027        let undelivered = AtomicU64::new(0);
3028        let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(4);
3029        let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3030            tx: ack_a,
3031            rx: Mutex::new(ack_b),
3032        });
3033
3034        let mut ack_buf = Vec::with_capacity(256);
3035        let mut path_validation_seq: u32 = 0;
3036        let obs = Observability::new(ObservabilityConfig::default());
3037        handle_packet(
3038            bad_packet,
3039            session_id,
3040            &server_session,
3041            &streams,
3042            &demux,
3043            &transport_send,
3044            &transport_send,
3045            &deliver_tx,
3046            &undelivered,
3047            &mut ack_buf,
3048            &mut path_validation_seq,
3049            &obs,
3050            LegType::Tcp,
3051        )
3052        .await;
3053
3054        // Nothing should have been handed to the delivery task, and the backlog
3055        // counter must stay at zero (the packet was dropped before hand-off).
3056        assert!(
3057            deliver_rx.try_recv().is_err(),
3058            "unencrypted post-handshake payload must NOT be handed off for delivery"
3059        );
3060        assert_eq!(undelivered.load(Ordering::Acquire), 0);
3061    }
3062
3063    #[tokio::test]
3064    async fn v2_recv_handles_coalesced_bundle_and_routes_each_subpayload() {
3065        use crate::transport::packet_coalescer::{CoalescerConfig, PacketCoalescer};
3066
3067        let session_id = fixed_session_id();
3068        let (client_session, server_session) = paired_sessions(session_id);
3069
3070        // Build a COALESCED bundle of three sub-payloads.
3071        let mut coalescer = PacketCoalescer::new(CoalescerConfig::default());
3072        coalescer.push(b"alpha");
3073        coalescer.push(b"bravo");
3074        coalescer.push(b"charlie");
3075        let bundle = coalescer.flush().expect("bundle");
3076
3077        // Encrypt the bundle and wrap it in a V2 packet with
3078        // ENCRYPTED + COALESCED flags.
3079        let stream_id: TransportStreamId = 3;
3080        let flag_bits = PacketFlags::ENCRYPTED | PacketFlags::COALESCED;
3081        let header = PacketHeader::new(session_id, stream_id, 0, PacketFlags::new(flag_bits))
3082            .with_epoch(client_session.current_epoch());
3083        let ciphertext = client_session
3084            .encrypt_packet(&header, &bundle)
3085            .expect("encrypt bundle");
3086        let v2 = PhantomPacket::new(header, ciphertext);
3087
3088        let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
3089        let demux = Arc::new(demux);
3090        let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3091        let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
3092        let undelivered = AtomicU64::new(0);
3093        let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(4);
3094        let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3095            tx: ack_a,
3096            rx: Mutex::new(ack_b),
3097        });
3098
3099        let mut ack_buf = Vec::with_capacity(256);
3100        let mut path_validation_seq: u32 = 0;
3101        let obs = Observability::new(ObservabilityConfig::default());
3102        handle_packet(
3103            v2,
3104            session_id,
3105            &server_session,
3106            &streams,
3107            &demux,
3108            &transport_send,
3109            &transport_send,
3110            &deliver_tx,
3111            &undelivered,
3112            &mut ack_buf,
3113            &mut path_validation_seq,
3114            &obs,
3115            LegType::Tcp,
3116        )
3117        .await;
3118
3119        // Each sub-payload is handed off IN ORDER through the single FIFO
3120        // delivery channel, every one tagged with the outer stream id, and the
3121        // total counted toward the undelivered backlog.
3122        let (sa, a) = deliver_rx.recv().await.expect("alpha");
3123        let (sb, b) = deliver_rx.recv().await.expect("bravo");
3124        let (sc, c) = deliver_rx.recv().await.expect("charlie");
3125        assert_eq!(
3126            (sa, sb, sc),
3127            (stream_id as u32, stream_id as u32, stream_id as u32)
3128        );
3129        assert_eq!(&a[..], b"alpha");
3130        assert_eq!(&b[..], b"bravo");
3131        assert_eq!(&c[..], b"charlie");
3132        assert_eq!(undelivered.load(Ordering::Acquire), (5 + 5 + 7) as u64);
3133    }
3134
3135    /// Ordering across a COALESCED bundle followed by a normal frame: the single
3136    /// FIFO delivery channel must hand the bundle's `[A, B, C]` AND the later
3137    /// `D` to the consumer in exactly `A, B, C, D` — decoupling delivery from
3138    /// the reader must not reorder application bytes.
3139    #[tokio::test]
3140    async fn delivery_preserves_order_across_coalesced_then_normal_frame() {
3141        use crate::transport::packet_coalescer::{CoalescerConfig, PacketCoalescer};
3142
3143        let session_id = fixed_session_id();
3144        let (client_session, server_session) = paired_sessions(session_id);
3145        let stream_id: TransportStreamId = 1;
3146
3147        // Frame 1: COALESCED [A, B, C] at sequence 0.
3148        let mut coalescer = PacketCoalescer::new(CoalescerConfig::default());
3149        coalescer.push(b"A");
3150        coalescer.push(b"B");
3151        coalescer.push(b"C");
3152        let bundle = coalescer.flush().expect("bundle");
3153        let flag_bits = PacketFlags::ENCRYPTED | PacketFlags::COALESCED;
3154        let h1 = PacketHeader::new(session_id, stream_id, 0, PacketFlags::new(flag_bits))
3155            .with_epoch(client_session.current_epoch());
3156        let ct1 = client_session
3157            .encrypt_packet(&h1, &bundle)
3158            .expect("encrypt bundle");
3159        let coalesced = PhantomPacket::new(h1, ct1);
3160
3161        // Frame 2: a normal RELIABLE "D" at sequence 1.
3162        let d_wire = build_app_frame(&client_session, session_id, stream_id, 1, b"D");
3163        let normal = PhantomPacket::from_wire(&d_wire).unwrap();
3164
3165        let (demux, _ctrl) = StreamDemultiplexer::new(16);
3166        let demux = Arc::new(demux);
3167        let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3168        let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
3169        let undelivered = AtomicU64::new(0);
3170        let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(8);
3171        let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3172            tx: ack_a,
3173            rx: Mutex::new(ack_b),
3174        });
3175        let mut ack_buf = Vec::with_capacity(256);
3176        let mut pv_seq: u32 = 0;
3177        let obs = Observability::new(ObservabilityConfig::default());
3178
3179        for pkt in [coalesced, normal] {
3180            handle_packet(
3181                pkt,
3182                session_id,
3183                &server_session,
3184                &streams,
3185                &demux,
3186                &transport_send,
3187                &transport_send,
3188                &deliver_tx,
3189                &undelivered,
3190                &mut ack_buf,
3191                &mut pv_seq,
3192                &obs,
3193                LegType::Tcp,
3194            )
3195            .await;
3196        }
3197
3198        // Drain the FIFO delivery channel — order must be exactly A, B, C, D.
3199        let mut got: Vec<Bytes> = Vec::new();
3200        while let Ok((_sid, b)) = deliver_rx.try_recv() {
3201            got.push(b);
3202        }
3203        let seen: Vec<&[u8]> = got.iter().map(|b| &b[..]).collect();
3204        assert_eq!(seen, vec![&b"A"[..], b"B", b"C", b"D"]);
3205    }
3206
3207    /// A peer that ignores flow control and floods application data faster than
3208    /// the app drains must NOT grow the receive backlog without bound: once the
3209    /// undelivered backlog crosses the reader's hard cap, the session is torn
3210    /// down (state → `Closed`) instead of buffering unboundedly. The app here
3211    /// never calls `recv()`, so the delivery channel fills and the reader's
3212    /// pre-decrypt cap gate fires.
3213    #[tokio::test]
3214    async fn peer_ignoring_flow_control_trips_delivery_hard_cap_and_closes_session() {
3215        let session_id = fixed_session_id();
3216        let (client_inner, server_inner) = paired_sessions(session_id);
3217        let (client_t, server_t) = ChannelTransport::pair();
3218        let client_t = Arc::new(client_t);
3219
3220        // Full server-side session with a running pump; the app NEVER drains it.
3221        let server = PhantomSession::from_accepted_server_session(
3222            "flooder".to_string(),
3223            server_t,
3224            server_inner,
3225        );
3226
3227        // Drain and discard everything the server sends back (ACKs / control)
3228        // so the server reader never blocks on the back channel — a real
3229        // flooding peer likewise keeps emptying its socket. Without this the
3230        // reader would wedge on its own ACK send and the cap could never trip.
3231        let drain_t = client_t.clone();
3232        let drainer = tokio::spawn(async move { while drain_t.recv_bytes().await.is_ok() {} });
3233
3234        // Malicious client: flood valid RELIABLE app packets with unique
3235        // monotonic sequences (so none are replay-dropped) and never honor a
3236        // WINDOW_UPDATE — i.e. ignore flow control entirely.
3237        let payload = vec![0xABu8; 64 * 1024];
3238        let mut seq: u32 = 0;
3239        let mut torn_down = false;
3240        for _ in 0..4000 {
3241            if server.connection_state() == ConnectionState::Closed {
3242                torn_down = true;
3243                break;
3244            }
3245            let flag_bits = PacketFlags::RELIABLE | PacketFlags::ENCRYPTED;
3246            let header = PacketHeader::new(session_id, 1, seq, PacketFlags::new(flag_bits))
3247                .with_epoch(client_inner.current_epoch());
3248            let ct = client_inner
3249                .encrypt_packet(&header, &payload)
3250                .expect("encrypt");
3251            // Bound the send so a torn-down (or wedged) transport can't hang the
3252            // test: a closed channel or a stalled reader both mean the flood is
3253            // no longer absorbed — i.e. the session is being torn down.
3254            let wire = PhantomPacket::new(header, ct).to_wire();
3255            match tokio::time::timeout(
3256                std::time::Duration::from_secs(5),
3257                client_t.send_bytes(&wire),
3258            )
3259            .await
3260            {
3261                Ok(Ok(())) => {}
3262                _ => {
3263                    torn_down = true;
3264                    break;
3265                }
3266            }
3267            seq = seq.wrapping_add(1);
3268            tokio::task::yield_now().await;
3269        }
3270        assert!(
3271            torn_down,
3272            "a peer flooding past the delivery hard cap must get its session torn down"
3273        );
3274
3275        // Definitive: the session ends up Closed.
3276        let mut closed = false;
3277        for _ in 0..200 {
3278            if server.connection_state() == ConnectionState::Closed {
3279                closed = true;
3280                break;
3281            }
3282            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
3283        }
3284        drainer.abort();
3285        assert!(
3286            closed,
3287            "session state must be Closed after the hard cap trips"
3288        );
3289    }
3290
3291    /// Phase 4.4 — BBR ACK feedback drives the pacer rate. Build a
3292    /// realistic DeliverySample with known sent_at/acked_at timestamps
3293    /// and packet size; assert that calling `on_packet_acked` causes
3294    /// the pacer to leave its default unlimited state with a finite
3295    /// finite positive rate.
3296    #[tokio::test]
3297    async fn bbr_on_ack_drives_pacer_rate() {
3298        use crate::transport::bandwidth_estimator::DeliverySample;
3299        use std::time::{Duration, Instant};
3300
3301        let session_id = fixed_session_id();
3302        let (client_session, _server_session) = paired_sessions(session_id);
3303
3304        // The default Pacer is `unlimited` — track it before/after.
3305        assert!(!client_session.pacer().is_enabled());
3306
3307        // Simulate sending a 1500-byte packet, then receiving an ACK
3308        // 20 ms later. We feed a few samples in a row so the EMA
3309        // estimator has data to work with.
3310        let now = Instant::now();
3311        for i in 0..16 {
3312            let sent_at = now - Duration::from_millis(20 + i * 5);
3313            let acked_at = now - Duration::from_millis(i * 5);
3314            let sample = DeliverySample {
3315                delivered_bytes: 0,
3316                sent_at,
3317                acked_at,
3318                packet_bytes: 1500,
3319                is_app_limited: false,
3320                ack_delay_us: 100,
3321            };
3322            client_session.on_packet_sent(1500);
3323            let _ = client_session.on_packet_acked(sample);
3324        }
3325
3326        // The pacer should now be set to a real rate (still
3327        // "unlimited" handle, but with a finite stored rate). The
3328        // BandwidthEstimator's `pacing_rate()` is what gets pushed
3329        // into the pacer; assert it is non-zero and finite.
3330        let snap = client_session.bandwidth_snapshot();
3331        assert!(
3332            snap.pacing_rate_bps > 0,
3333            "expected pacing_rate to be non-zero, got {}",
3334            snap.pacing_rate_bps,
3335        );
3336        // The pacer's stored rate must match the estimator's view
3337        // (Session.on_packet_acked mirrors them).
3338        assert_eq!(client_session.pacer().rate(), snap.pacing_rate_bps);
3339    }
3340
3341    /// Phase 4.3 — WINDOW_UPDATE round-trip under the relative-credit model.
3342    /// The receive **delivery** task credits the flow-control window on real
3343    /// app consumption and stages the credit; the **send loop** flushes it as a
3344    /// single encrypted WINDOW_UPDATE via `flush_pending_window_updates`. The
3345    /// sender then ADDS the relative credit to its `peer_send_window` — it does
3346    /// not overwrite it with an absolute value.
3347    #[tokio::test]
3348    async fn flow_control_window_update_round_trip() {
3349        use crate::transport::stream::INITIAL_STREAM_WINDOW;
3350
3351        let session_id = fixed_session_id();
3352        let (client_session, server_session) = paired_sessions(session_id);
3353
3354        let stream_id: TransportStreamId = 9;
3355        let server_streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3356        let server_stream = Arc::new(TransportStream::new(stream_id));
3357        server_streams.insert(stream_id as u32, server_stream.clone());
3358
3359        // Client also has a Stream so we can apply the inbound credit.
3360        let client_stream = Arc::new(TransportStream::new(stream_id));
3361
3362        // Pre-drain the client's peer_send_window so the credit has a real
3363        // effect to assert against.
3364        let drain = INITIAL_STREAM_WINDOW - 1000;
3365        assert!(client_stream.try_consume_send_window(drain));
3366        assert_eq!(client_stream.peer_send_window(), 1000);
3367
3368        // The delivery task credits the window on real consumption: model one
3369        // drain that crosses the half-window threshold and stage the credit
3370        // exactly as `run_data_pump`'s delivery task does.
3371        let consumed = INITIAL_STREAM_WINDOW / 2 + 1;
3372        let credit = server_stream
3373            .record_app_consumed(consumed)
3374            .expect("threshold crossed → credit granted");
3375        server_stream.stage_window_update_credit(credit);
3376
3377        // The send loop flushes the staged credit as a single WINDOW_UPDATE.
3378        let (out_tx, mut out_rx) = mpsc::channel::<Vec<u8>>(4);
3379        let (back_tx, back_rx) = mpsc::channel::<Vec<u8>>(4);
3380        let server_outbound: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3381            tx: out_tx,
3382            rx: Mutex::new(back_rx),
3383        });
3384        let _keep = back_tx;
3385        flush_pending_window_updates(
3386            &server_outbound,
3387            &server_session,
3388            session_id,
3389            &server_streams,
3390        )
3391        .await;
3392
3393        // Exactly one WINDOW_UPDATE was emitted; decrypt it and read the credit.
3394        let frame = tokio::time::timeout(std::time::Duration::from_millis(100), out_rx.recv())
3395            .await
3396            .expect("expected a WINDOW_UPDATE frame")
3397            .expect("channel open");
3398        let pv2 = PhantomPacket::from_wire(&frame).unwrap();
3399        assert!(pv2.header.flags.contains(PacketFlags::WINDOW_UPDATE));
3400        // The control frame's sequence comes from the stream's own send space —
3401        // distinct from any data packet so the AEAD nonce never repeats.
3402        let pt = client_session
3403            .decrypt_packet(&pv2.header, &pv2.payload)
3404            .expect("decrypt WINDOW_UPDATE");
3405        assert_eq!(pt.len(), 4);
3406        let announced = u32::from_be_bytes([pt[0], pt[1], pt[2], pt[3]]);
3407        assert_eq!(
3408            announced, credit,
3409            "WINDOW_UPDATE carries the relative credit (bytes consumed since last update)"
3410        );
3411        // Exactly one frame was emitted — nothing else is queued on the wire.
3412        assert!(
3413            out_rx.try_recv().is_err(),
3414            "exactly one WINDOW_UPDATE must be emitted"
3415        );
3416
3417        // The staged slot is now empty — a second flush emits nothing.
3418        flush_pending_window_updates(
3419            &server_outbound,
3420            &server_session,
3421            session_id,
3422            &server_streams,
3423        )
3424        .await;
3425        assert!(
3426            out_rx.try_recv().is_err(),
3427            "no spurious second WINDOW_UPDATE after the credit was already flushed"
3428        );
3429
3430        // Apply the relative credit on the client side: peer_send_window ADDS it
3431        // to the current 1000 (it does not jump to an absolute value).
3432        client_stream.apply_peer_window_update(announced);
3433        assert_eq!(client_stream.peer_send_window(), 1000 + credit);
3434    }
3435
3436    /// Phase 4.3 — priority scheduler ordering. Two streams enqueue
3437    /// data simultaneously; the higher-priority one must be drained
3438    /// first, all of its data before any of the lower one's.
3439    #[tokio::test]
3440    async fn priority_scheduler_drains_higher_priority_stream_first() {
3441        // Build a real Session (any crypto state — we only inspect
3442        // send order, not ciphertext) and an Arc<Stream> per stream.
3443        let session_id = fixed_session_id();
3444        let (client_session, _server_session) = paired_sessions(session_id);
3445
3446        // Capture every outbound packet by stuffing into a channel-
3447        // backed transport whose tx end we can drain after.
3448        let (tx_a, mut rx_a) = mpsc::channel::<Vec<u8>>(32);
3449        let (tx_b, rx_b) = mpsc::channel::<Vec<u8>>(32);
3450        let transport: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3451            tx: tx_a,
3452            rx: Mutex::new(rx_b),
3453        });
3454        let _keep = tx_b; // keep the recv side alive
3455
3456        let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3457
3458        // Stream 11: low priority (1), 3 reliable chunks.
3459        let low = Arc::new(TransportStream::new(11));
3460        low.set_priority(1);
3461        low.send_reliable(Bytes::from_static(b"L0")).await;
3462        low.send_reliable(Bytes::from_static(b"L1")).await;
3463        low.send_reliable(Bytes::from_static(b"L2")).await;
3464        streams.insert(11, low);
3465
3466        // Stream 22: HIGH priority (100), 3 reliable chunks.
3467        let hi = Arc::new(TransportStream::new(22));
3468        hi.set_priority(100);
3469        hi.send_reliable(Bytes::from_static(b"H0")).await;
3470        hi.send_reliable(Bytes::from_static(b"H1")).await;
3471        hi.send_reliable(Bytes::from_static(b"H2")).await;
3472        streams.insert(22, hi);
3473
3474        drain_streams_priority_ordered(&transport, &client_session, session_id, &streams).await;
3475
3476        // Pull all packets off the channel and verify their order:
3477        // the three H* chunks must come before any L* chunk.
3478        let mut order: Vec<&'static str> = Vec::new();
3479        while let Ok(frame) =
3480            tokio::time::timeout(std::time::Duration::from_millis(50), rx_a.recv()).await
3481        {
3482            let bytes = match frame {
3483                Some(b) => b,
3484                None => break,
3485            };
3486            let v2 = PhantomPacket::from_wire(&bytes).unwrap();
3487            // Decrypt under the SERVER role so the per-direction key
3488            // matches the client-side encrypt.
3489            let plaintext = _server_session
3490                .decrypt_packet(&v2.header, &v2.payload)
3491                .expect("decrypt");
3492            let tag: &'static str = match &plaintext[..] {
3493                b"H0" => "H0",
3494                b"H1" => "H1",
3495                b"H2" => "H2",
3496                b"L0" => "L0",
3497                b"L1" => "L1",
3498                b"L2" => "L2",
3499                other => panic!("unexpected payload {:?}", other),
3500            };
3501            order.push(tag);
3502        }
3503
3504        // All H* before any L*.
3505        let first_low = order
3506            .iter()
3507            .position(|s| s.starts_with('L'))
3508            .unwrap_or(order.len());
3509        let last_high = order.iter().rposition(|s| s.starts_with('H')).unwrap();
3510        assert!(
3511            last_high < first_low,
3512            "strict priority violated: order = {:?}",
3513            order
3514        );
3515    }
3516
3517    #[tokio::test]
3518    async fn v2_recv_echoes_path_validation_challenge_back_as_response() {
3519        // Two paired sessions on different IDs (so neither has a
3520        // pending challenge for the path). The "responder" sees a
3521        // PATH_VALIDATION packet on a new path id and must echo the
3522        // 32-byte payload back via the transport.
3523        let session_id = fixed_session_id();
3524        let (client_session, server_session) = paired_sessions(session_id);
3525
3526        // Build a PATH_VALIDATION packet with ENCRYPTED + path_id=7.
3527        let path_id: u8 = 7;
3528        let payload = [0xDEu8; crate::transport::path::PATH_CHALLENGE_LEN];
3529        let flag_bits = PacketFlags::ENCRYPTED | PacketFlags::PATH_VALIDATION;
3530        let header = PacketHeader::new(session_id, 0, 0, PacketFlags::new(flag_bits))
3531            .with_epoch(client_session.current_epoch())
3532            .with_path_id(path_id);
3533        let ciphertext = client_session
3534            .encrypt_packet(&header, &payload)
3535            .expect("encrypt challenge");
3536        let v2 = PhantomPacket::new(header, ciphertext);
3537
3538        let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
3539        let demux = Arc::new(demux);
3540        let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3541        let (deliver_tx, _deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
3542        let undelivered = AtomicU64::new(0);
3543        // Server's outbound transport — captures the echo back.
3544        let (echo_tx, mut echo_rx) = mpsc::channel::<Vec<u8>>(4);
3545        let (back_tx, back_rx) = mpsc::channel::<Vec<u8>>(4);
3546        let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3547            tx: echo_tx,
3548            rx: Mutex::new(back_rx),
3549        });
3550        let _back_tx_keepalive = back_tx; // keep the recv side alive
3551
3552        let mut ack_buf = Vec::with_capacity(256);
3553        let mut path_validation_seq: u32 = 100;
3554        let obs = Observability::new(ObservabilityConfig::default());
3555
3556        handle_packet(
3557            v2,
3558            session_id,
3559            &server_session,
3560            &streams,
3561            &demux,
3562            &transport_send,
3563            &transport_send,
3564            &deliver_tx,
3565            &undelivered,
3566            &mut ack_buf,
3567            &mut path_validation_seq,
3568            &obs,
3569            LegType::Tcp,
3570        )
3571        .await;
3572
3573        // Server should have emitted a PATH_VALIDATION response on the
3574        // outbound transport. Pull it out and verify it carries the
3575        // same payload back.
3576        let echo_bytes =
3577            tokio::time::timeout(std::time::Duration::from_millis(200), echo_rx.recv())
3578                .await
3579                .expect("echo should arrive")
3580                .expect("channel open");
3581
3582        // Decrypt the echo on the original (client) side — server-side
3583        // ciphertext authenticates the round-trip.
3584        let echo_v2 = PhantomPacket::from_wire(&echo_bytes).unwrap();
3585        assert!(echo_v2.header.flags.contains(PacketFlags::PATH_VALIDATION));
3586        assert_eq!(echo_v2.header.path_id, path_id);
3587
3588        // Sequence space advanced by exactly one (we sent one echo).
3589        assert_eq!(path_validation_seq, 101);
3590    }
3591
3592    // ────────────────────────────────────────────────────────────────────
3593    // 0-RTT early-data
3594    // ────────────────────────────────────────────────────────────────────
3595
3596    /// Full 0-RTT round-trip over `ChannelTransport`: a priming handshake
3597    /// populates the server cache and yields a resumption hint; a second
3598    /// connect via `connect_with_resumption` carries application early-data
3599    /// sealed inside the resuming ClientHello, which the server decrypts and
3600    /// surfaces. The client learns the verdict via `early_data_accepted()`.
3601    ///
3602    /// The server side runs inline (not a spawned task) so its
3603    /// `ChannelTransport` halves stay alive in scope — dropping them
3604    /// would close the client's data pump and flip the session to
3605    /// `Closed` before the assertions run.
3606    #[tokio::test]
3607    async fn zero_rtt_early_data_full_round_trip() {
3608        // One HandshakeServer shared across both phases so its session
3609        // cache persists between the priming handshake and the resume.
3610        let server_hs = HandshakeServer::new().unwrap();
3611        let server_pinned_key = server_hs.verifying_key().clone();
3612        let client_ip: std::net::IpAddr = "127.0.0.1".parse().unwrap();
3613
3614        // ── Step 1: prime — a normal handshake fills the cache ──
3615        let (c1, s1) = ChannelTransport::pair();
3616        let phase1_session =
3617            PhantomSession::connect_with_transport("test:9000", c1, server_pinned_key.clone());
3618
3619        let hello_bytes = s1.recv_bytes().await.unwrap();
3620        let ch = borsh::from_slice::<ClientHello>(&hello_bytes).unwrap();
3621        let retry = match server_hs.process_client_hello(&ch, 0, client_ip) {
3622            HandshakeResponse::Retry(r) => r,
3623            _ => panic!("expected Retry"),
3624        };
3625        s1.send_bytes(&borsh::to_vec(&retry).unwrap())
3626            .await
3627            .unwrap();
3628        let next = s1.recv_bytes().await.unwrap();
3629        let ch2 = borsh::from_slice::<ClientHello>(&next).unwrap();
3630        match server_hs.process_client_hello(&ch2, 0, client_ip) {
3631            HandshakeResponse::Success(sh, _session, _) => {
3632                s1.send_bytes(&borsh::to_vec(&sh).unwrap()).await.unwrap();
3633            }
3634            _ => panic!("expected Success"),
3635        }
3636
3637        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3638        assert_eq!(
3639            phase1_session.connection_state(),
3640            ConnectionState::Connected
3641        );
3642        let hint = phase1_session
3643            .resumption_hint()
3644            .await
3645            .expect("phase 1 produced a resumption hint");
3646        // The Rust-only `connect_with_resumption` takes the raw tuple;
3647        // `resumption_hint()` now yields the UniFFI `ResumptionHint`
3648        // record, so rebuild the tuple from its 32-byte fields.
3649        let hint = (
3650            <[u8; 32]>::try_from(hint.session_id.as_slice()).expect("session_id is 32 bytes"),
3651            <[u8; 32]>::try_from(hint.resumption_secret.as_slice())
3652                .expect("resumption_secret is 32 bytes"),
3653        );
3654
3655        // ── Step 2: resume — the ClientHello carries sealed early-data ──
3656        let early_payload = b"zero-rtt application bytes".to_vec();
3657        let (c2, s2) = ChannelTransport::pair();
3658        let phase2_session = PhantomSession::connect_with_resumption(
3659            "test:9000",
3660            c2,
3661            server_pinned_key.clone(),
3662            hint,
3663            early_payload.clone(),
3664        )
3665        .expect("early_data is within the size cap");
3666
3667        let hello_bytes = s2.recv_bytes().await.unwrap();
3668        let ch3 = borsh::from_slice::<ClientHello>(&hello_bytes).unwrap();
3669        assert!(
3670            ch3.early_data.is_some(),
3671            "phase 2 hello carries sealed 0-RTT early-data"
3672        );
3673        match server_hs.process_client_hello(&ch3, 0, client_ip) {
3674            HandshakeResponse::Success(sh, _session, early_data) => {
3675                // The server decrypted exactly what the client sealed.
3676                assert_eq!(early_data.as_deref(), Some(&early_payload[..]));
3677                assert!(sh.early_data_accepted);
3678                s2.send_bytes(&borsh::to_vec(&sh).unwrap()).await.unwrap();
3679            }
3680            _ => {
3681                panic!("expected Success with accepted early-data — the resumption ticket is fresh")
3682            }
3683        }
3684
3685        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3686        assert_eq!(
3687            phase2_session.connection_state(),
3688            ConnectionState::Connected
3689        );
3690        assert_eq!(
3691            phase2_session.early_data_accepted().await,
3692            Some(true),
3693            "client must see the server accepted its 0-RTT early-data"
3694        );
3695
3696        // Keep the server transports alive until every assertion has
3697        // run — see the doc comment above.
3698        drop((s1, s2));
3699    }
3700
3701    /// `connect_pinned_with_resumption` validates the `ResumptionHint`
3702    /// field lengths *before* opening any socket — a hint whose
3703    /// `session_id` or `resumption_secret` is not exactly 32 bytes is a
3704    /// caller bug and surfaces as `ValidationError`, never a network
3705    /// round-trip.
3706    #[tokio::test]
3707    async fn connect_pinned_with_resumption_rejects_malformed_hint() {
3708        let server_hs = HandshakeServer::new().unwrap();
3709        let pinned = server_hs.verifying_key().to_bytes();
3710
3711        let bad_hint = ResumptionHint {
3712            session_id: vec![0u8; 5], // not 32 bytes
3713            resumption_secret: vec![0u8; 32],
3714        };
3715
3716        let err = connect_pinned_with_resumption(
3717            "127.0.0.1".to_string(),
3718            9,
3719            pinned,
3720            bad_hint,
3721            Vec::new(),
3722        )
3723        .await
3724        .expect_err("a 5-byte session_id must be rejected");
3725
3726        assert!(
3727            matches!(err, CoreError::ValidationError(_)),
3728            "expected ValidationError, got {err:?}"
3729        );
3730    }
3731}