phantom_protocol/api/session.rs
1//! Client-First Transport Session
2//!
3//! `PhantomSession` provides instant connection establishment with
4//! automatic send queuing during handshake. This is the transport-level
5//! API that sits below MLS and above the raw UDP/TCP transport.
6
7use crate::crypto::hybrid_sign::HybridVerifyingKey;
8use crate::errors::CoreError;
9use crate::observability::attrs::{AeadAlgorithm, ReplayReason};
10use crate::observability::{Observability, ObservabilityConfig};
11use crate::runtime::{Runtime, TokioRuntime};
12use crate::transport::handshake::{
13 HandshakeClient, HelloRetryRequest, ServerHello, ServerReject, EARLY_DATA_MAX_LEN,
14};
15use crate::transport::multiplexer::StreamDemultiplexer;
16use crate::transport::packet_coalescer_codec::unwrap_coalesced_packet;
17use crate::transport::path_validation_codec::build_path_validation_packet;
18use crate::transport::session::Session;
19use crate::transport::stream::Stream;
20use crate::transport::types::{
21 LegType, PacketFlags, PacketHeader, PhantomPacket, SessionId, StreamId as TransportStreamId,
22 WIRE_VERSION,
23};
24use bytes::Bytes;
25use dashmap::DashMap;
26use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
27use std::sync::Arc;
28use tokio::sync::{mpsc, oneshot, Mutex};
29
30/// Generate a fresh 128-bit session identifier from the thread-local CSPRNG.
31///
32/// Replaces the historical `rand::random::<u32>()` (32 bits, insufficient to
33/// avoid birthday collisions at scale and not advertised as cryptographic).
34/// `rand::thread_rng` is seeded from the OS at thread startup and uses a
35/// modern stream cipher (ChaCha) — adequate for non-secret identifiers.
36fn new_session_id() -> String {
37 let bytes: [u8; 16] = rand::random();
38 format!("phantom-{}", hex::encode(bytes))
39}
40
41// ─── Connection State ───────────────────────────────────────────────────────
42
43/// Connection state for `PhantomSession`.
44///
45/// The session is usable from the moment it's created — sends are queued
46/// until the handshake completes.
47#[cfg_attr(feature = "bindings", derive(uniffi::Enum))]
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49#[repr(u8)]
50#[non_exhaustive]
51pub enum ConnectionState {
52 /// Connection initiated, handshake pending
53 Connecting = 0,
54 /// Classical (X25519) channel established — data flows
55 ClassicalReady = 1,
56 /// PQC upgrade in progress
57 PqcUpgrading = 2,
58 /// Full hybrid PQC protection active
59 PqcReady = 3,
60 /// Fully connected and operational
61 Connected = 4,
62 /// Connection failed
63 Failed = 5,
64 /// Gracefully closed
65 Closed = 6,
66}
67
68impl ConnectionState {
69 fn from_u8(v: u8) -> Self {
70 match v {
71 0 => Self::Connecting,
72 1 => Self::ClassicalReady,
73 2 => Self::PqcUpgrading,
74 3 => Self::PqcReady,
75 4 => Self::Connected,
76 5 => Self::Failed,
77 6 => Self::Closed,
78 _ => Self::Failed,
79 }
80 }
81
82 /// Whether data can flow (classical or better).
83 pub fn is_data_ready(&self) -> bool {
84 matches!(
85 self,
86 Self::ClassicalReady | Self::PqcUpgrading | Self::PqcReady | Self::Connected
87 )
88 }
89}
90
91// ─── Resumption Hint ────────────────────────────────────────────────────────
92
93/// 0-RTT resumption material extracted from a completed session.
94///
95/// Produced by [`PhantomSession::resumption_hint`] after a handshake
96/// completes, and fed back into [`connect_pinned_with_resumption`] to
97/// attempt a 0-RTT reconnect to the same server.
98///
99/// Both fields are exactly 32 bytes — this record is the
100/// UniFFI-representable surface for the internal `(session_id,
101/// resumption_secret)` tuple. The fields are `Vec<u8>` because UniFFI
102/// has no fixed-size-array type, so the length is a runtime invariant
103/// checked when the hint is used.
104///
105/// Store the hint alongside the pinned `HybridVerifyingKey` of the
106/// server it was negotiated against: the `resumption_secret` is
107/// server-pinned, and reusing a hint across servers is a configuration
108/// bug.
109#[cfg_attr(feature = "bindings", derive(uniffi::Record))]
110#[derive(Clone)]
111#[non_exhaustive]
112pub struct ResumptionHint {
113 /// The negotiated session id (32 bytes).
114 pub session_id: Vec<u8>,
115 /// The resumption secret (32 bytes) — sensitive; treat like a key.
116 pub resumption_secret: Vec<u8>,
117}
118
119// INFOLEAK-1: hand-written redacting `Debug` (not derived) so a mobile/FFI
120// consumer that logs the hint with `{:?}` cannot leak the 0-RTT `resumption_secret`
121// — the one secret-bearing type that crosses the FFI boundary. Mirrors the
122// REDACTED `Debug` on `HybridSigningKey` / `HybridSecretKey`.
123impl std::fmt::Debug for ResumptionHint {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.debug_struct("ResumptionHint")
126 .field(
127 "session_id",
128 &format_args!("<{} bytes>", self.session_id.len()),
129 )
130 .field("resumption_secret", &"REDACTED")
131 .finish()
132 }
133}
134
135// ─── Transport Abstraction ──────────────────────────────────────────────────
136
137// `SessionTransport` now lives in `crate::transport::session_transport` — a
138// dependency-light module that can compile in a `no_std + alloc` build. It is
139// re-exported here so `crate::api::session::SessionTransport` and the public
140// `phantom_protocol::api::SessionTransport` path stay stable.
141pub use crate::transport::session_transport::{FramePhase, SessionTransport};
142
143/// Transport decorator that records `record_send` / `record_recv` on the
144/// session's [`Observability`] for every frame that crosses the wire — so the
145/// data-plane packet/byte counters reflect a real run without threading the
146/// handle through every send site. Wraps the concrete `SessionTransport` just
147/// before the data pump takes over, so handshake bytes are not counted as
148/// data-plane packets (they have their own handshake metric).
149struct ObservedTransport<T> {
150 inner: T,
151 observability: Arc<Observability>,
152 leg: LegType,
153}
154
155impl<T> ObservedTransport<T> {
156 fn new(inner: T, observability: Arc<Observability>, leg: LegType) -> Self {
157 Self {
158 inner,
159 observability,
160 leg,
161 }
162 }
163}
164
165impl<T: SessionTransport> SessionTransport for ObservedTransport<T> {
166 async fn send_bytes(&self, data: &[u8]) -> Result<(), CoreError> {
167 let result = self.inner.send_bytes(data).await;
168 if result.is_ok() {
169 self.observability.record_send(data.len(), self.leg);
170 }
171 result
172 }
173
174 async fn recv_bytes(&self) -> Result<Bytes, CoreError> {
175 let result = self.inner.recv_bytes().await;
176 if let Ok(ref bytes) = result {
177 self.observability.record_recv(bytes.len(), self.leg);
178 }
179 result
180 }
181}
182
183// ─── Session ────────────────────────────────────────────────────────────────
184
185/// Client-first session — instant `connect()`, non-blocking `send()`.
186///
187/// # Design
188///
189/// ```text
190/// let session = PhantomSession::connect("server:443"); // instant!
191/// session.send(data).await; // queued until handshake completes
192/// session.send(data2).await; // also queued
193/// // ... handshake completes in background ...
194/// // queued data auto-flushed, new sends go directly
195/// ```
196///
197/// The session progresses through states:
198/// `Connecting → ClassicalReady → PqcUpgrading → PqcReady → Connected`
199#[cfg_attr(feature = "bindings", derive(uniffi::Object))]
200pub struct PhantomSession {
201 /// Session identifier
202 id: String,
203 /// Target server address
204 peer_addr: String,
205 /// Connection state (atomic for lock-free reads)
206 state: Arc<AtomicU8>,
207 /// Queued messages before connection is ready
208 send_queue: Arc<Mutex<Vec<Vec<u8>>>>,
209 /// Channel to send commands to the background handshake task
210 cmd_tx: mpsc::Sender<SessionCommand>,
211 /// Command receiver — taken by the background task when spawned
212 #[allow(dead_code)]
213 cmd_rx: Mutex<Option<mpsc::Receiver<SessionCommand>>>,
214 /// Received messages channel. Carries `Bytes` (not `Vec<u8>`) so the recv
215 /// path can fan out via cheap refcount clones to both the stream demux
216 /// and the synchronous `recv()` consumer without deep-copying the payload.
217 recv_rx: Mutex<mpsc::Receiver<Bytes>>,
218 /// Multiplexes incoming packets to independent streams
219 demux: Arc<StreamDemultiplexer>,
220 /// Active outgoing streams (ARQ management)
221 streams: Arc<DashMap<u32, Arc<Stream>>>,
222 /// Negotiated session handle, populated by the background task
223 /// once the handshake completes. Exposed via `resumption_hint`
224 /// for Phase 4.1 0-RTT clients. `None` while still handshaking
225 /// or after a failure.
226 inner_session: Arc<Mutex<Option<Arc<Session>>>>,
227 /// 0-RTT verdict. `None` while handshaking, after a failure, or when the
228 /// client sent no early-data on this connect. `Some(true)` — the server
229 /// consumed the early-data; `Some(false)` — the client sent early-data and
230 /// the server rejected it. Exposed via `early_data_accepted()`.
231 early_data_accepted: Arc<Mutex<Option<bool>>>,
232 /// Session observability handle. Server-accepted sessions share the
233 /// `PhantomListener`'s instance (so its `snapshot()` aggregates every
234 /// session it accepted); client sessions get their own. The data pump
235 /// records send/recv, the security drops, and the session lifecycle
236 /// (open/close) against it. A ZST no-op when `telemetry-otel` is off.
237 observability: Arc<Observability>,
238}
239
240/// Commands for the background session task
241pub enum SessionCommand {
242 /// Queue data for sending
243 Send(Vec<u8>),
244 /// Send data on a specific stream reliably
245 SendStreamReliable { stream_id: u32, data: bytes::Bytes },
246 /// Send data on a specific stream unreliably
247 SendStreamUnreliable { stream_id: u32, data: bytes::Bytes },
248 /// Close a specific stream
249 CloseStream { stream_id: u32 },
250 /// Close the session
251 Close,
252}
253
254impl PhantomSession {
255 /// Create a new session and start the background handshake task.
256 ///
257 /// Requires `expected_server_key` for MITM resistance — the client will
258 /// abort the handshake unless the server presents this exact verifying key.
259 /// Callers obtain this key out-of-band (e.g. from `PhantomListener::verifying_key_bytes`).
260 ///
261 /// The handshake runs in the background:
262 /// 1. Exchange hybrid PQC `ClientHello`/`ServerHello`.
263 /// 2. Verify server identity against `expected_server_key`.
264 /// 3. Derive AEAD keys; flush queued sends as encrypted packets.
265 ///
266 /// All network I/O goes through the provided `SessionTransport`. The
267 /// task that drives the handshake + data pump runs on the default
268 /// [`TokioRuntime`]; use
269 /// [`connect_with_transport_with_runtime`](Self::connect_with_transport_with_runtime)
270 /// to substitute a different `Runtime`.
271 pub fn connect_with_transport<T: SessionTransport>(
272 peer_addr: &str,
273 transport: T,
274 expected_server_key: HybridVerifyingKey,
275 ) -> Self {
276 Self::connect_with_transport_with_runtime(
277 peer_addr,
278 transport,
279 expected_server_key,
280 Arc::new(TokioRuntime),
281 )
282 }
283
284 /// Like [`connect_with_transport`](Self::connect_with_transport) but
285 /// runs the background task on the supplied `Runtime`. Intended for
286 /// WASM / embedded / test backends that don't drive `tokio::spawn`.
287 pub fn connect_with_transport_with_runtime<T: SessionTransport>(
288 peer_addr: &str,
289 transport: T,
290 expected_server_key: HybridVerifyingKey,
291 runtime: Arc<dyn Runtime>,
292 ) -> Self {
293 Self::spawn_client(peer_addr, transport, expected_server_key, runtime, None)
294 }
295
296 /// Connect with a **0-RTT resumption attempt**.
297 ///
298 /// `resumption_hint` is the `(session_id, resumption_secret)` tuple
299 /// from a prior session's [`PhantomSession::resumption_hint`].
300 /// `early_data` (≤ [`EARLY_DATA_MAX_LEN`] bytes) is sealed and carried
301 /// inside the resuming ClientHello so it reaches the server on the very
302 /// first flight — saving a round-trip versus 1-RTT.
303 ///
304 /// Acceptance is best-effort: a stale/unknown ticket or an AEAD failure
305 /// leaves [`early_data_accepted`](Self::early_data_accepted) at
306 /// `Some(false)` and the handshake completes as a normal 1-RTT exchange —
307 /// the caller must then send that payload over the normal channel.
308 /// Returns `Err` only when `early_data` exceeds the cap.
309 ///
310 /// Runs on the default [`TokioRuntime`].
311 pub fn connect_with_resumption<T: SessionTransport>(
312 peer_addr: &str,
313 transport: T,
314 expected_server_key: HybridVerifyingKey,
315 resumption_hint: ([u8; 32], [u8; 32]),
316 early_data: Vec<u8>,
317 ) -> Result<Self, CoreError> {
318 // fips bootstrap POST gate. `connect_with_resumption`
319 // returns `Result`, so unlike the infallible `connect_with_transport*`
320 // entry points we can surface the POST failure directly to the
321 // caller (mirrors the `PhantomListener::bind*` and
322 // `connect_pinned*` convention). The same POST is also checked
323 // in `background_task` as a defense-in-depth backstop.
324 #[cfg(feature = "fips")]
325 crate::crypto::self_tests::ensure_post_passed()
326 .map_err(|e| CoreError::FipsSelfTestFailure(format!("{e:?}")))?;
327
328 if early_data.len() > EARLY_DATA_MAX_LEN {
329 return Err(CoreError::ValidationError(format!(
330 "early_data is {} bytes, exceeds the {}-byte 0-RTT cap",
331 early_data.len(),
332 EARLY_DATA_MAX_LEN
333 )));
334 }
335 let (resume_id, resume_secret) = resumption_hint;
336 Ok(Self::spawn_client(
337 peer_addr,
338 transport,
339 expected_server_key,
340 Arc::new(TokioRuntime),
341 Some((resume_id, resume_secret, early_data)),
342 ))
343 }
344
345 /// Shared constructor body for [`connect_with_transport_with_runtime`]
346 /// and [`connect_with_resumption`]. `resumption_request` is `None`
347 /// for a plain handshake, `Some((id, secret, early_data))` to attempt a
348 /// 0-RTT resumption.
349 fn spawn_client<T: SessionTransport>(
350 peer_addr: &str,
351 transport: T,
352 expected_server_key: HybridVerifyingKey,
353 runtime: Arc<dyn Runtime>,
354 resumption_request: Option<([u8; 32], [u8; 32], Vec<u8>)>,
355 ) -> Self {
356 let (cmd_tx, cmd_rx) = mpsc::channel(256);
357 let (recv_tx, recv_rx) = mpsc::channel(256);
358
359 let state = Arc::new(AtomicU8::new(ConnectionState::Connecting as u8));
360 let send_queue = Arc::new(Mutex::new(Vec::new()));
361 let peer = peer_addr.to_string();
362 let (demux, _ctrl_rx) = StreamDemultiplexer::new(256);
363 let demux = Arc::new(demux);
364
365 let streams = Arc::new(DashMap::new());
366 let inner_session: Arc<Mutex<Option<Arc<Session>>>> = Arc::new(Mutex::new(None));
367 let early_data_accepted: Arc<Mutex<Option<bool>>> = Arc::new(Mutex::new(None));
368 // Client sessions have no listener, so they own their observability
369 // instance (its `snapshot()` reflects just this connection).
370 let observability = Observability::new(ObservabilityConfig::default());
371
372 let session = Self {
373 id: new_session_id(),
374 peer_addr: peer.clone(),
375 state: state.clone(),
376 send_queue: send_queue.clone(),
377 cmd_tx: cmd_tx.clone(),
378 cmd_rx: Mutex::new(None), // taken by background task
379 recv_rx: Mutex::new(recv_rx),
380 demux: demux.clone(),
381 streams: streams.clone(),
382 inner_session: inner_session.clone(),
383 early_data_accepted: early_data_accepted.clone(),
384 observability: observability.clone(),
385 };
386
387 // Spawn the background handshake + data pump task on the supplied
388 // runtime. `SpawnHandle` is detached: dropping it leaves the task
389 // running. The session is owned by the caller for its lifetime
390 // and natural shutdown comes via `SessionCommand::Close`.
391 let runtime_for_pump = runtime.clone();
392 let _detached = runtime.spawn(Box::pin(Self::background_task(
393 state,
394 send_queue,
395 cmd_tx,
396 cmd_rx,
397 recv_tx,
398 transport,
399 peer,
400 demux,
401 streams,
402 expected_server_key,
403 runtime_for_pump,
404 inner_session,
405 early_data_accepted,
406 resumption_request,
407 observability,
408 )));
409
410 session
411 }
412
413 /// Install a server-side `Session` (already derived by `HandshakeServer::process_client_hello`)
414 /// and spawn the data pump on the default [`TokioRuntime`]. Used by
415 /// `PhantomListener::accept` after driving the server handshake.
416 ///
417 /// `PhantomListener::accept` itself now uses
418 /// `from_accepted_server_session_with_runtime` so the listener's
419 /// runtime is honored. This wrapper is preserved for callers that
420 /// do not have a runtime handle and want the default `TokioRuntime`.
421 #[allow(dead_code)]
422 pub(crate) fn from_accepted_server_session<T: SessionTransport>(
423 peer_addr: String,
424 transport: T,
425 server_session: Arc<Session>,
426 ) -> Arc<Self> {
427 Self::from_accepted_server_session_with_runtime(
428 peer_addr,
429 transport,
430 server_session,
431 Arc::new(TokioRuntime),
432 Observability::new(ObservabilityConfig::default()),
433 )
434 }
435
436 /// Runtime-aware variant of [`from_accepted_server_session`].
437 pub(crate) fn from_accepted_server_session_with_runtime<T: SessionTransport>(
438 peer_addr: String,
439 transport: T,
440 server_session: Arc<Session>,
441 runtime: Arc<dyn Runtime>,
442 observability: Arc<Observability>,
443 ) -> Arc<Self> {
444 let (cmd_tx, cmd_rx) = mpsc::channel(256);
445 let (recv_tx, recv_rx) = mpsc::channel(256);
446
447 let state = Arc::new(AtomicU8::new(ConnectionState::Connected as u8));
448 let send_queue = Arc::new(Mutex::new(Vec::new()));
449 let (demux, _ctrl_rx) = StreamDemultiplexer::new(256);
450 let demux = Arc::new(demux);
451 let streams = Arc::new(DashMap::new());
452
453 let inner_session: Arc<Mutex<Option<Arc<Session>>>> =
454 Arc::new(Mutex::new(Some(server_session.clone())));
455
456 let session = Arc::new(Self {
457 id: new_session_id(),
458 peer_addr: peer_addr.clone(),
459 state: state.clone(),
460 send_queue: send_queue.clone(),
461 cmd_tx,
462 cmd_rx: Mutex::new(None),
463 recv_rx: Mutex::new(recv_rx),
464 demux: demux.clone(),
465 streams: streams.clone(),
466 inner_session,
467 // Server side: 0-RTT early-data is delivered via
468 // `AcceptOutcome`, not this client-facing field.
469 early_data_accepted: Arc::new(Mutex::new(None)),
470 // Shares the listener's instance so its `snapshot()` aggregates
471 // every accepted session.
472 observability: observability.clone(),
473 });
474
475 let session_id = *server_session.id();
476 let runtime_for_pump = runtime.clone();
477 // WIRE-001: the server handshake is complete — raise the receive frame
478 // cap from the tight unauthenticated handshake limit to the steady-state
479 // application limit before the data pump takes over.
480 transport.set_frame_phase(FramePhase::Established);
481 let observed = Arc::new(ObservedTransport::new(
482 transport,
483 observability.clone(),
484 LegType::Tcp,
485 ));
486 let _detached = runtime.spawn(Box::pin(run_data_pump(
487 server_session,
488 session_id,
489 observed,
490 state,
491 send_queue,
492 cmd_rx,
493 recv_tx,
494 demux,
495 streams,
496 runtime_for_pump,
497 observability,
498 LegType::Tcp,
499 )));
500
501 session
502 }
503
504 /// Background task: performs handshake, then pumps data.
505 #[allow(clippy::too_many_arguments)]
506 async fn background_task<T: SessionTransport>(
507 state: Arc<AtomicU8>,
508 send_queue: Arc<Mutex<Vec<Vec<u8>>>>,
509 _cmd_tx: mpsc::Sender<SessionCommand>,
510 cmd_rx: mpsc::Receiver<SessionCommand>,
511 recv_tx: mpsc::Sender<Bytes>,
512 transport: T,
513 peer: String,
514 demux: Arc<StreamDemultiplexer>,
515 streams: Arc<DashMap<u32, Arc<Stream>>>,
516 expected_server_key: HybridVerifyingKey,
517 runtime: Arc<dyn Runtime>,
518 inner_session: Arc<Mutex<Option<Arc<Session>>>>,
519 early_data_accepted: Arc<Mutex<Option<bool>>>,
520 resumption_request: Option<([u8; 32], [u8; 32], Vec<u8>)>,
521 observability: Arc<Observability>,
522 ) {
523 // DEBUG: the peer address is correlatable; keep it off default logs.
524 log::debug!("PhantomSession: starting handshake with {}", peer);
525
526 // fips bootstrap POST gate, mirroring the listener and
527 // `connect_pinned*` paths: the synchronous Rust-only entry
528 // points (`connect_with_transport*` / `connect_with_resumption`)
529 // also need to honor FIPS 140-3 §7.7 before any cryptographic
530 // work. Cached `OnceLock` makes the second+ call an atomic
531 // read; the first call runs the full POST battery.
532 //
533 // On failure we cannot return a `CoreError` (the entry points
534 // are infallible by API contract) — instead we transition the
535 // state machine to `Failed` and bail, matching the existing
536 // handshake-failure shape. The error string lands in the log.
537 #[cfg(feature = "fips")]
538 if let Err(e) = crate::crypto::self_tests::ensure_post_passed() {
539 log::error!(
540 "PhantomSession: FIPS POST self-test failed; refusing to handshake: {:?}",
541 e
542 );
543 state.store(ConnectionState::Failed as u8, Ordering::Relaxed);
544 return;
545 }
546
547 // Retain a copy of any 0-RTT early-data so it can be losslessly
548 // re-sent over the established session if the server rejects it (C3 —
549 // the rejection-retransmission contract). `run_client_handshake`
550 // consumes `resumption_request`, so clone the blob first.
551 let pending_early_data: Option<Vec<u8>> = resumption_request
552 .as_ref()
553 .and_then(|(_, _, ed)| (!ed.is_empty()).then(|| ed.clone()));
554
555 // ── Stage 1 & 2: Hybrid Handshake (optionally 0-RTT resumption) ──
556 // HS-02: bound the whole client handshake by a wall-clock deadline so a
557 // silent or stalling server can't hang the connect indefinitely. The
558 // TIMER is `runtime.sleep` (NOT raw tokio::time) so it stays correct
559 // under WasmRuntime/EmbeddedRuntime; `select!` is just the combinator.
560 const CLIENT_HANDSHAKE_DEADLINE: std::time::Duration = std::time::Duration::from_secs(10);
561 // Scoped so the handshake future's borrow of `transport` ends before
562 // `transport` is moved into the data pump below.
563 let handshake_result = {
564 let handshake_fut =
565 run_client_handshake(&transport, &expected_server_key, resumption_request);
566 let handshake_timeout = runtime.sleep(CLIENT_HANDSHAKE_DEADLINE);
567 tokio::pin!(handshake_fut);
568 tokio::select! {
569 r = &mut handshake_fut => r,
570 _ = handshake_timeout => Err(CoreError::Timeout),
571 }
572 };
573 let (crypto_session, ed_accepted) = match handshake_result {
574 Ok((session, accepted)) => (Arc::new(session), accepted),
575 Err(e) => {
576 log::error!("PhantomSession: handshake failed: {}", e);
577 state.store(ConnectionState::Failed as u8, Ordering::Relaxed);
578 return;
579 }
580 };
581 log::info!("PhantomSession: Handshake complete — hybrid channel ready");
582
583 // Phase 4.1 — publish the negotiated Session + the 0-RTT
584 // verdict via the outer PhantomSession so `resumption_hint()`
585 // and `early_data_accepted()` can reach them after the
586 // background task moves the Arc into the pump.
587 {
588 let mut guard = inner_session.lock().await;
589 *guard = Some(crypto_session.clone());
590 }
591 *early_data_accepted.lock().await = ed_accepted;
592
593 // C3 — 0-RTT rejection retransmission contract. If we sent early-data
594 // and the server rejected it (`Some(false)`), it never reached the
595 // application layer, so re-send it losslessly over the now-established
596 // 1-RTT session. Prepend it to the pre-handshake send queue (drained
597 // first by the pump onto the reliable raw-app stream) so it lands
598 // *ahead* of anything the app queued while connecting — preserving the
599 // order in which the bytes were originally offered. `Some(true)` (the
600 // server consumed it) and `None` (none sent) need no action.
601 if ed_accepted == Some(false) {
602 if let Some(ed) = pending_early_data {
603 send_queue.lock().await.insert(0, ed);
604 log::debug!(
605 "PhantomSession: 0-RTT early-data rejected; re-queued for 1-RTT delivery"
606 );
607 }
608 }
609
610 let session_id = *crypto_session.id();
611 state.store(ConnectionState::Connected as u8, Ordering::Relaxed);
612 log::debug!("PhantomSession: fully connected to {}", peer);
613
614 // Wrap the (post-handshake) transport so every data-plane send/recv is
615 // recorded. The connectionless API rides TCP today (KCP / FakeTLS legs
616 // are not session-wired yet), so the leg label is fixed to TCP.
617 // WIRE-001: the handshake is done — raise the frame cap from the tight
618 // unauthenticated handshake limit to the steady-state application limit.
619 transport.set_frame_phase(FramePhase::Established);
620 let observed = Arc::new(ObservedTransport::new(
621 transport,
622 observability.clone(),
623 LegType::Tcp,
624 ));
625 run_data_pump(
626 crypto_session,
627 session_id,
628 observed,
629 state,
630 send_queue,
631 cmd_rx,
632 recv_tx,
633 demux,
634 streams,
635 runtime,
636 observability,
637 LegType::Tcp,
638 )
639 .await;
640 }
641}
642
643/// Drive the client side of the Phantom handshake to completion.
644///
645/// When `resumption` is `Some((resume_id, resume_secret, early_data))` the
646/// first-flight `ClientHello` carries the resume id and, when `early_data` is
647/// non-empty, a sealed 0-RTT blob folded into `ClientHello.early_data` — so it
648/// reaches the server on the first flight. A cookie/PoW `HelloRetryRequest` is
649/// answered in-loop, reusing the same hello (the early-data blob rides along).
650///
651/// Returns the established `Session` and the 0-RTT verdict (resolved
652/// decision 1):
653/// - `Some(true)` — the client sent early-data and the server consumed it
654/// - `Some(false)` — the client sent early-data and the server rejected it
655/// (stale ticket / oversized / AEAD failure)
656/// - `None` — the client sent no early-data on this connect
657async fn run_client_handshake<T: SessionTransport>(
658 transport: &T,
659 expected_server_key: &HybridVerifyingKey,
660 resumption: Option<([u8; 32], [u8; 32], Vec<u8>)>,
661) -> Result<(Session, Option<bool>), CoreError> {
662 let handshake = HandshakeClient::new()?;
663
664 // Build the first-flight ClientHello. A resumption request folds the
665 // resume id and (optionally) a sealed 0-RTT early-data blob into the
666 // single hello; otherwise it is a plain hello.
667 let mut hello = match &resumption {
668 Some((resume_id, resume_secret, early_data)) => {
669 let ed: Option<&[u8]> = if early_data.is_empty() {
670 None
671 } else {
672 Some(early_data.as_slice())
673 };
674 handshake.create_client_hello_with_resume(*resume_id, resume_secret, ed)
675 }
676 None => handshake.create_client_hello(),
677 };
678
679 // HS-02: cap the number of HelloRetryRequest rounds. The legitimate flow
680 // needs at most one cookie round + one PoW round; a bound of 3 leaves slack
681 // for a benign reorder. Without it, a MITM answering every ClientHello with
682 // a fresh cheap HelloRetryRequest could loop the client forever.
683 const MAX_CLIENT_RETRY_ROUNDS: u32 = 3;
684 let mut retry_rounds: u32 = 0;
685
686 loop {
687 let bytes = borsh::to_vec(&hello).map_err(|e| {
688 CoreError::SerializationError(format!("ClientHello encode failed: {}", e))
689 })?;
690 transport.send_bytes(&bytes).await?;
691 let resp = transport.recv_bytes().await?;
692
693 // The reply is one of three shapes: a `ServerHello` (success), a
694 // `HelloRetryRequest` (cookie/PoW demand), or a `ServerReject` (the
695 // server cannot speak our version). Try the success shape first — a
696 // retry/reject blob is far too small to deserialize as a multi-KiB
697 // ServerHello, so the disambiguation is unambiguous.
698 if let Ok(sh) = borsh::from_slice::<ServerHello>(&resp) {
699 let (session, accepted) =
700 handshake.process_server_hello(&hello, &sh, Some(expected_server_key))?;
701 return Ok((session, accepted));
702 } else if let Ok(reject) = borsh::from_slice::<ServerReject>(&resp) {
703 // The marker guard keeps a same-sized non-reject blob from being
704 // mistaken for a reject. We surface this as a hard error and do
705 // NOT auto-downgrade to `reject.supported_version` — a forced
706 // downgrade on an injected reject would defeat the transcript-bound
707 // version pin (Invariant 7).
708 if reject.has_marker() {
709 return Err(CoreError::HandshakeError(format!(
710 "server rejected the handshake: unsupported protocol version \
711 (client speaks v{}, server speaks v{})",
712 hello.version, reject.supported_version
713 )));
714 }
715 return Err(CoreError::HandshakeError(
716 "invalid ServerHello, Retry, or Reject received".into(),
717 ));
718 } else if let Ok(retry) = borsh::from_slice::<HelloRetryRequest>(&resp) {
719 retry_rounds += 1;
720 if retry_rounds > MAX_CLIENT_RETRY_ROUNDS {
721 return Err(CoreError::HandshakeError(format!(
722 "server demanded more than {MAX_CLIENT_RETRY_ROUNDS} HelloRetryRequest rounds"
723 )));
724 }
725 log::info!("PhantomSession: Received HelloRetryRequest, retrying...");
726 hello.cookie = retry.cookie;
727 if let Some(challenge) = retry.challenge {
728 // H3: cap the accepted difficulty and bound the solver, so an
729 // injected/malicious HelloRetryRequest (e.g. difficulty 255)
730 // surfaces a handshake error instead of pinning a CPU core.
731 log::info!("PhantomSession: Solving PoW challenge...");
732 hello.pow_solution = Some(
733 challenge
734 .solve_capped(crate::crypto::pow::MAX_CLIENT_POW_DIFFICULTY)
735 .map_err(|e| CoreError::HandshakeError(e.to_string()))?,
736 );
737 }
738 continue;
739 } else {
740 return Err(CoreError::HandshakeError(
741 "invalid ServerHello, Retry, or Reject received".into(),
742 ));
743 }
744 }
745}
746
747/// Shared client/server data pump.
748///
749/// After the handshake completes (client side) or after the server `Session` is
750/// derived (server side), this loop:
751/// - drains the queued early-data buffer,
752/// - listens for incoming packets and decrypts them,
753/// - encrypts outgoing application/stream packets,
754/// - sends ACKs for reliable packets.
755// The 11 parameters represent the complete session-identity and I/O surface.
756// Grouping them into a struct would require a generic struct (due to `T:
757// SessionTransport`), add indirection with no safety or clarity gain, and
758// constitute a public-API change. The function is private (`async fn`, no
759// `pub`), so the extra arguments are contained here.
760#[allow(clippy::too_many_arguments)]
761async fn run_data_pump<T: SessionTransport>(
762 crypto_session: Arc<Session>,
763 session_id: SessionId,
764 transport: Arc<T>,
765 state: Arc<AtomicU8>,
766 send_queue: Arc<Mutex<Vec<Vec<u8>>>>,
767 mut cmd_rx: mpsc::Receiver<SessionCommand>,
768 recv_tx: mpsc::Sender<Bytes>,
769 demux: Arc<StreamDemultiplexer>,
770 streams: Arc<DashMap<u32, Arc<Stream>>>,
771 runtime: Arc<dyn Runtime>,
772 observability: Arc<Observability>,
773 leg: LegType,
774) {
775 // Session is now established and active — bump the active-session gauge.
776 // The matching `session_closed` at teardown (below) lets the gauge fall,
777 // so it tracks live sessions instead of growing monotonically.
778 observability.session_opened(leg);
779
780 // ── Raw-app session stream (reserved id 1) ──
781 // The connectionless `send()` / `recv()` surface is multiplexed onto one
782 // reserved stream so it gets the same reliable-delivery machinery as
783 // explicitly-opened streams: `drain_streams_priority_ordered` (re)transmits
784 // its buffered segments on the poll tick / outbound-ready notify, and
785 // inbound ACKs for id 1 clear them via `Stream::ack`. The demultiplexer
786 // hands out ids 2+, so this never collides with a user-opened stream.
787 const RAW_APP_STREAM_ID: u32 = 1;
788 let raw_stream = Arc::new(Stream::new(RAW_APP_STREAM_ID as TransportStreamId));
789 streams.insert(RAW_APP_STREAM_ID, raw_stream.clone());
790
791 // ── Flush queued early-data onto the raw-app stream ──
792 // Routed through the stream (not a one-shot direct send) so queued
793 // pre-handshake data is buffered for retransmit just like post-handshake
794 // sends — a dropped early-data frame is recovered, not lost.
795 {
796 let mut queue = send_queue.lock().await;
797 let count = queue.len();
798 for msg in queue.drain(..) {
799 for chunk in msg.chunks(TRANSPORT_MTU) {
800 raw_stream
801 .send_reliable(Bytes::copy_from_slice(chunk))
802 .await;
803 }
804 }
805 if count > 0 {
806 log::info!(
807 "PhantomSession: queued {} early-data message(s) onto the raw-app stream",
808 count
809 );
810 crypto_session.notify_outbound_ready();
811 }
812 }
813
814 // ── Receive-delivery decoupling ──
815 // The reader task hands decrypted application data to a dedicated delivery
816 // task over an UNBOUNDED channel and never blocks on app delivery, so a slow
817 // `recv()` consumer cannot head-of-line-stall inbound ACK / WINDOW_UPDATE /
818 // control processing. The delivery task does the app-paced `recv_tx.send()`
819 // and credits the flow-control window on *real* consumption; enforced
820 // send-side flow control (`Stream::poll_send`) bounds the in-flight backlog
821 // to ~one window, and `undelivered_bytes` + `RECV_DELIVERY_HARD_CAP` guard
822 // against a peer that ignores flow control.
823 let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
824 let undelivered_bytes = Arc::new(AtomicU64::new(0));
825 {
826 let recv_tx_deliver = recv_tx; // move the session recv channel here
827 let demux_deliver = demux.clone();
828 let streams_deliver = streams.clone();
829 let crypto_deliver = crypto_session.clone();
830 let undelivered_deliver = undelivered_bytes.clone();
831 runtime.spawn(Box::pin(async move {
832 while let Some((stream_id, bytes)) = deliver_rx.recv().await {
833 let len = bytes.len() as u64;
834 // Best-effort, non-blocking notification to the (vestigial) demux.
835 demux_deliver.route_data(stream_id, bytes.clone());
836 // Account the item the instant it leaves the UNBOUNDED delivery
837 // queue (which the reader's HARD_CAP guards) — BEFORE the
838 // app-paced `recv_tx.send()` below, which can block for a long
839 // time on a slow consumer. Decrementing (and crediting) only
840 // after a successful send would (a) keep this item counted
841 // against the cap while it sits in the bounded recv pipeline,
842 // inflating `undelivered_bytes`, and (b) leak the count entirely
843 // if the send then fails. The byte is now in the bounded
844 // recv-channel pipeline (capacity-limited, its own backpressure),
845 // so it no longer belongs to the unbounded backlog.
846 undelivered_deliver.fetch_sub(len, Ordering::AcqRel);
847 // Credit the flow-control window: the item has been pulled into
848 // the app-delivery pipeline (matching the inline ACK's "accepted
849 // into my in-memory delivery queue" semantics). The pull rate is
850 // still paced by `recv_tx.send()` completing below, so credit
851 // tracks app consumption (one item of look-ahead) — backpressure
852 // is preserved. Wake the send loop to flush the WINDOW_UPDATE
853 // (emitted there, under rekey ownership, so it is epoch-safe).
854 if let Some(stream) = streams_deliver.get(&stream_id) {
855 if let Some(credit) = stream.record_app_consumed(len as u32) {
856 stream.stage_window_update_credit(credit);
857 crypto_deliver.notify_outbound_ready();
858 }
859 }
860 // Real, app-paced delivery to the session recv channel. A closed
861 // channel means the consumer is gone → session ending; stop. The
862 // item was already removed from the backlog accounting above, so
863 // breaking here leaks nothing.
864 if recv_tx_deliver.send(bytes).await.is_err() {
865 break;
866 }
867 }
868 }));
869 }
870
871 // ── Receive (reader) task: deserialize, decrypt, hand off to delivery ──
872 let transport_recv = transport.clone();
873 let transport_send_ack = transport.clone();
874 let crypto_recv = crypto_session.clone();
875 let demux_recv = demux.clone();
876 let streams_recv = streams.clone();
877 let undelivered_reader = undelivered_bytes.clone();
878 let observability_recv = observability.clone();
879 // Completion signal for the receive task. `SpawnHandle` from the
880 // runtime trait does not expose a `Future` for `.await` directly
881 // (different runtimes provide different join futures), so we wire a
882 // one-shot channel — the recv task sends `()` right before exiting
883 // and the main loop selects on the receiver to detect transport
884 // closure.
885 let (recv_done_tx, mut recv_done_rx) = oneshot::channel::<()>();
886 let transport_for_path = transport.clone();
887 let recv_handle = runtime.spawn(Box::pin(async move {
888 // Reusable buffer for ACK frame serialization. Hoisted out of the
889 // loop (Phase 2.3) so we don't pay a fresh `Vec::new()` allocation
890 // for every ACK we emit on a busy reliable stream. 256 bytes is
891 // comfortably larger than a serialized empty `PhantomPacket` (the
892 // 45-byte header plus a couple of length prefixes), so the underlying
893 // buffer is never reallocated after the first frame.
894 let mut ack_buf: Vec<u8> = Vec::with_capacity(256);
895 // Monotonic sequence space for outbound PATH_VALIDATION packets.
896 // Local to the recv task because that's where
897 // path-validation echoes are emitted in response to incoming
898 // challenges. Wraps via `wrapping_add` — sequence space is the
899 // session's overall stream-0 control space.
900 let mut path_validation_seq: u32 = 0;
901 // Buffering ceiling: the delivery queue is unbounded so the reader
902 // never blocks, but a peer that ignores flow control could flood it.
903 // Compliant senders are bounded by ~one window per stream (enforced
904 // `poll_send`), far below this cap; crossing it means the peer is
905 // misbehaving, so we tear the session down rather than buffer without
906 // limit. 4 MiB tolerates many streams × the 64 KiB window with margin.
907 const RECV_DELIVERY_HARD_CAP: u64 = 4 * 1024 * 1024;
908 loop {
909 // Flow-control / anti-flood gate: if the app-delivery backlog
910 // has blown past the cap, the peer is not honouring the window —
911 // close instead of growing the in-memory queue unboundedly. Cheap
912 // pre-check, before any AEAD work.
913 if undelivered_reader.load(Ordering::Acquire) > RECV_DELIVERY_HARD_CAP {
914 log::warn!(
915 "PhantomSession: receive backlog {} B exceeds cap — peer ignoring flow \
916 control; closing session",
917 undelivered_reader.load(Ordering::Acquire)
918 );
919 break;
920 }
921 let data = match transport_recv.recv_bytes().await {
922 Ok(b) => b,
923 Err(_) => break,
924 };
925
926 // A malformed / unparseable frame (no legitimate peer produces
927 // one) is dropped — never a panic.
928 let packet = match PhantomPacket::from_wire(&data) {
929 Ok(v) => v,
930 Err(_) => continue,
931 };
932 // Pinned wire-version gate: the format is not negotiated, so a
933 // frame carrying any other version byte is dropped.
934 if packet.header.version != WIRE_VERSION {
935 continue;
936 }
937 handle_packet(
938 packet,
939 session_id,
940 &crypto_recv,
941 &streams_recv,
942 &demux_recv,
943 &transport_send_ack,
944 &transport_for_path,
945 &deliver_tx,
946 &undelivered_reader,
947 &mut ack_buf,
948 &mut path_validation_seq,
949 &observability_recv,
950 leg,
951 )
952 .await;
953 }
954 // Reader exiting → drop `deliver_tx` so the delivery task drains any
955 // queued items and then sees the channel closed and exits.
956 drop(deliver_tx);
957 // Signal the main loop that the recv task has exited so it can
958 // also unwind. `send` returns `Err(())` if the receiver was
959 // already dropped — that case is harmless, the main loop has
960 // already shut down.
961 let _ = recv_done_tx.send(());
962 }));
963
964 // MTU for transport packets
965 const TRANSPORT_MTU: usize = 1300;
966 // Phase 2.4: the 10 ms `poll_interval` stays as a retransmit-timer
967 // fallback (streams without an explicit notifier reference still
968 // get swept), but `send_notify.notified()` joins the select! so the
969 // pump wakes immediately when a producer calls
970 // `Session::notify_outbound_ready()`. This drops idle CPU usage to
971 // zero on quiet sessions while keeping the worst-case post-queue
972 // latency at <10 ms even for producers that haven't been wired into
973 // the notifier yet.
974 let mut poll_interval = tokio::time::interval(std::time::Duration::from_millis(10));
975 let send_notify = crypto_session.send_notifier();
976 // Outbound WINDOW_UPDATE control packets are emitted on the send loop — the
977 // single writer that also owns the rekey lock — so the encrypted control
978 // frame is always sealed under a consistent epoch. The delivery task only
979 // stages the relative credit (`Stream::stage_window_update_credit`) and
980 // wakes us; the wire sequence is drawn from the stream's own send-sequence
981 // space inside `flush_pending_window_updates` (no private counter, so it
982 // can never collide with application data on the AEAD nonce).
983
984 loop {
985 tokio::select! {
986 _ = poll_interval.tick() => {
987 flush_pending_window_updates(
988 &transport, &crypto_session, session_id, &streams,
989 )
990 .await;
991 drain_streams_priority_ordered(
992 &transport,
993 &crypto_session,
994 session_id,
995 &streams,
996 )
997 .await;
998 }
999 _ = send_notify.notified() => {
1000 // Same drain logic as the tick arm — fast-wake path. Also flush
1001 // any flow-control credit the delivery task staged.
1002 flush_pending_window_updates(
1003 &transport, &crypto_session, session_id, &streams,
1004 )
1005 .await;
1006 drain_streams_priority_ordered(
1007 &transport,
1008 &crypto_session,
1009 session_id,
1010 &streams,
1011 )
1012 .await;
1013 }
1014 cmd_opt = cmd_rx.recv() => {
1015 match cmd_opt {
1016 Some(SessionCommand::Send(data)) => {
1017 // Route through the raw-app stream so the payload is
1018 // buffered for retransmit until ACKed (drained by
1019 // `drain_streams_priority_ordered`), instead of being
1020 // fired once and forgotten on the wire.
1021 for chunk in data.chunks(TRANSPORT_MTU) {
1022 raw_stream
1023 .send_reliable(Bytes::copy_from_slice(chunk))
1024 .await;
1025 }
1026 crypto_session.notify_outbound_ready();
1027 }
1028 Some(SessionCommand::SendStreamReliable { stream_id, data }) => {
1029 if let Some(stream) = streams.get(&stream_id) {
1030 for chunk in data.chunks(TRANSPORT_MTU) {
1031 stream.send_reliable(Bytes::copy_from_slice(chunk)).await;
1032 }
1033 }
1034 }
1035 Some(SessionCommand::SendStreamUnreliable { stream_id, data }) => {
1036 if let Some(stream) = streams.get(&stream_id) {
1037 for chunk in data.chunks(TRANSPORT_MTU) {
1038 stream.send_unreliable(Bytes::copy_from_slice(chunk)).await;
1039 }
1040 }
1041 }
1042 Some(SessionCommand::CloseStream { stream_id }) => {
1043 if let Some(stream) = streams.get(&stream_id) {
1044 stream.finish().await;
1045 // Same per-stream sequence space as the stream's data
1046 // (and its WINDOW_UPDATEs) so this bare FIN cannot
1047 // collide on the AEAD nonce / replay window.
1048 let seq = stream.next_send_sequence();
1049 let _ = send_app_data(
1050 &transport,
1051 &crypto_session,
1052 session_id,
1053 stream_id as TransportStreamId,
1054 seq,
1055 &[],
1056 PacketFlags::FIN,
1057 ).await;
1058 }
1059 streams.remove(&stream_id);
1060 demux.close_stream(stream_id);
1061 }
1062 Some(SessionCommand::Close) => {
1063 log::info!("PhantomSession: closing");
1064 break;
1065 }
1066 None => {
1067 log::info!("PhantomSession: command channel dropped");
1068 break;
1069 }
1070 }
1071 }
1072 _ = &mut recv_done_rx => {
1073 log::error!("PhantomSession: receive task ended unexpectedly (transport closed)");
1074 break;
1075 }
1076 }
1077 }
1078
1079 // Abort the recv task if it's still running; idempotent on a finished
1080 // handle. Goes through the runtime-agnostic `SpawnHandle::abort`.
1081 recv_handle.abort();
1082 state.store(ConnectionState::Closed as u8, Ordering::Relaxed);
1083 // Session torn down — drop the active-session gauge back down.
1084 observability.session_closed(leg);
1085}
1086
1087/// Emit any flow-control credit the receive **delivery** task staged.
1088///
1089/// The delivery task credits the window on real app consumption and stages the
1090/// relative credit via `Stream::stage_window_update_credit` + a send-loop wake;
1091/// the send loop (this, the single rekey owner) actually encrypts and sends the
1092/// `WINDOW_UPDATE`, so the control frame is always sealed under a consistent
1093/// epoch. The staged credits are snapshotted out of the `DashMap` first so no
1094/// shard lock is held across the `.await` (which would deadlock the delivery /
1095/// reader tasks that also touch `streams`).
1096async fn flush_pending_window_updates<T: SessionTransport>(
1097 transport: &Arc<T>,
1098 crypto_session: &Arc<Session>,
1099 session_id: SessionId,
1100 streams: &Arc<DashMap<u32, Arc<Stream>>>,
1101) {
1102 let pending: Vec<(u32, u32, Arc<Stream>)> = streams
1103 .iter()
1104 .filter_map(|e| {
1105 e.value()
1106 .take_pending_window_update()
1107 .map(|c| (*e.key(), c, e.value().clone()))
1108 })
1109 .collect();
1110 for (stream_id, credit, stream) in pending {
1111 // Draw the control-frame sequence from the SAME per-stream outbound
1112 // space as application data (`Stream::next_send_sequence`) so a
1113 // WINDOW_UPDATE never collides with a data packet on (stream_id,
1114 // sequence) — a collision would reuse an AEAD nonce within the epoch and
1115 // be dropped by the peer's replay window, silently starving flow control.
1116 let seq = stream.next_send_sequence();
1117 if !send_window_update(
1118 transport,
1119 crypto_session,
1120 session_id,
1121 stream_id as TransportStreamId,
1122 seq,
1123 credit,
1124 )
1125 .await
1126 {
1127 // The send failed (transient transport hiccup): re-stage the credit
1128 // so the next send-loop pass — the 10 ms tick at the latest — retries
1129 // it. Dropping it silently would under-credit the peer and could
1130 // eventually stall the sender. Credits accumulate, so a retry simply
1131 // folds back in; a permanently dead transport tears the session down
1132 // via the reader, which ends this loop.
1133 stream.stage_window_update_credit(credit);
1134 }
1135 }
1136}
1137
1138/// Drain every stream with pending data, scheduling them in strict
1139/// priority order (higher `Stream::priority()` wins). Streams of equal
1140/// priority are drained in stream-id order (deterministic so tests
1141/// don't get flaky under DashMap's hash-order shuffle).
1142///
1143/// This is **strict priority**: a stream with priority N never yields
1144/// to a stream with priority < N while it still has data. A future
1145/// weighted-fair scheduler can replace this without changing the
1146/// caller surface. Phase 4.3.
1147async fn drain_streams_priority_ordered<T: SessionTransport>(
1148 transport: &Arc<T>,
1149 crypto_session: &Arc<Session>,
1150 session_id: SessionId,
1151 streams: &Arc<DashMap<u32, Arc<Stream>>>,
1152) {
1153 // Snapshot the stream set so we can sort without holding DashMap
1154 // shard locks across awaits. Each entry is (priority, stream_id,
1155 // stream-Arc) — Arc clones are cheap (refcount bump).
1156 let mut snapshot: Vec<(u32, u32, Arc<Stream>)> = streams
1157 .iter()
1158 .map(|e| (e.value().priority(), *e.key(), e.value().clone()))
1159 .collect();
1160 // Descending priority; ties broken by stream id ascending so the
1161 // order is stable across iterations.
1162 snapshot.sort_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
1163
1164 for (_priority, stream_id, stream) in snapshot {
1165 loop {
1166 // Bytes of new data the congestion window currently permits.
1167 // Recomputed each iteration: every send grows inflight, so the
1168 // budget shrinks and the drain stops once the window is full.
1169 let snap = crypto_session.bandwidth_snapshot();
1170 let budget = snap.cwnd_bytes.saturating_sub(snap.inflight_bytes);
1171 let Some(seg) = stream.poll_send(budget).await else {
1172 break;
1173 };
1174 // A retransmission means the prior send was lost — tell congestion
1175 // control so BBR enters FastRecovery and the pacing rate backs off.
1176 if seg.retransmit {
1177 crypto_session.on_packet_lost(seg.data.len() as u64);
1178 }
1179 let base = if seg.reliable {
1180 PacketFlags::RELIABLE
1181 } else {
1182 PacketFlags::UNRELIABLE
1183 };
1184 if !send_app_data(
1185 transport,
1186 crypto_session,
1187 session_id,
1188 stream_id as TransportStreamId,
1189 seg.seq,
1190 &seg.data,
1191 base,
1192 )
1193 .await
1194 {
1195 log::error!("PhantomSession: priority-ordered drain send failed");
1196 // `poll_send` already stamped `sent_at` on this reliable
1197 // segment, but the bytes never reached the wire. Clear it so the
1198 // next drain re-offers it immediately instead of stalling a full
1199 // RTO before the retransmit pass. Unreliable segments were
1200 // removed by `poll_send` (fire-and-forget) — nothing to reset.
1201 if seg.reliable {
1202 stream.mark_unsent(seg.seq).await;
1203 }
1204 break;
1205 }
1206 }
1207 }
1208}
1209
1210/// Build a `DeliverySample` from a successful Stream ack callback and
1211/// feed it into the session's BBR estimator (Phase 4.4). The BBR loop
1212/// internally re-sets the pacer rate via `Session::on_packet_acked`,
1213/// so the next outbound packet is paced at the freshly-estimated
1214/// bottleneck bandwidth.
1215///
1216/// `ack_delay_us` is the V2 header's `ack_delay` field (microseconds
1217/// the receiver held the ACK before sending) — subtracted from the
1218/// observed RTT to yield the propagation delay. For V1 ACKs there is
1219/// no `ack_delay` field on the wire; pass 0 (the estimator treats
1220/// this as "no peer-side delay reported").
1221fn feed_bbr_on_ack(
1222 crypto_session: &Arc<Session>,
1223 sent_at: tokio::time::Instant,
1224 packet_bytes: u64,
1225 ack_delay_us: u64,
1226) {
1227 let sample = crate::transport::bandwidth_estimator::DeliverySample {
1228 delivered_bytes: 0, // BandwidthEstimator tracks its own counter
1229 sent_at: sent_at.into_std(),
1230 acked_at: std::time::Instant::now(),
1231 packet_bytes,
1232 is_app_limited: false,
1233 ack_delay_us,
1234 };
1235 let _ = crypto_session.on_packet_acked(sample);
1236}
1237
1238/// Wait until the pacer has tokens for `bytes` bytes. No-op when the
1239/// pacer is unlimited (the default until BBR sets a finite rate).
1240async fn pace_send(crypto_session: &Arc<Session>, bytes: u64) {
1241 let pacer = crypto_session.pacer();
1242 if !pacer.is_enabled() {
1243 return;
1244 }
1245 loop {
1246 if pacer.try_consume(bytes) {
1247 return;
1248 }
1249 let wait = pacer.time_until_available(bytes);
1250 if wait.is_zero() {
1251 // Tokens should be available; retry the consume to handle
1252 // a concurrent race with another sender.
1253 continue;
1254 }
1255 // Cap the wait to keep the loop responsive — a stale wait
1256 // estimate from a long-idle pacer is corrected on the next
1257 // iteration.
1258 let cap = std::time::Duration::from_millis(50);
1259 let wait = wait.min(cap);
1260 tokio::time::sleep(wait).await;
1261 }
1262}
1263
1264/// C1: decide whether a rekey is needed before stamping a packet for
1265/// `(stream_id, sequence)` and, if so, perform it. A rekey fires when either the
1266/// direction-wide AEAD high-watermark ([`Session::send_needs_rekey`]) or this
1267/// stream's sequence watermark ([`Session::stream_seq_needs_rekey`]) is crossed
1268/// — the latter prevents a per-stream `u32` sequence from wrapping within one
1269/// epoch and reusing the AEAD nonce `(epoch, stream_id, sequence, path_id)`
1270/// under a fixed key (Invariant 8).
1271///
1272/// Returns the extra flag bits to OR into the header (`PacketFlags::REKEY` on a
1273/// successful rotation, `0` when no rekey was needed), or `None` if a rekey was
1274/// required but failed (epoch saturated at `u8::MAX`) — the caller MUST fail the
1275/// send so the session reconnects rather than reusing a nonce.
1276fn rekey_before_stamp(
1277 crypto_session: &Arc<Session>,
1278 stream_id: TransportStreamId,
1279 sequence: u32,
1280) -> Option<u16> {
1281 if crypto_session.send_needs_rekey()
1282 || crypto_session.stream_seq_needs_rekey(stream_id, sequence)
1283 {
1284 match crypto_session.rekey() {
1285 Ok(_) => Some(PacketFlags::REKEY),
1286 Err(e) => {
1287 log::error!("PhantomSession: mid-session rekey failed: {}", e);
1288 None
1289 }
1290 }
1291 } else {
1292 Some(0)
1293 }
1294}
1295
1296/// V2 send. Builds `PhantomPacket` with `PacketFlags::ENCRYPTED` and
1297/// the negotiated rekey epoch; AEAD nonce derives from the header
1298/// (`Session::encrypt_packet`), so a failed peer decrypt no longer
1299/// desyncs the local counter.
1300async fn send_app_data<T: SessionTransport>(
1301 transport: &Arc<T>,
1302 crypto_session: &Arc<Session>,
1303 session_id: SessionId,
1304 stream_id: TransportStreamId,
1305 sequence: u32,
1306 payload: &[u8],
1307 base_flags: u16,
1308) -> bool {
1309 // Always OR in ENCRYPTED for application data.
1310 let mut flag_bits = base_flags | PacketFlags::ENCRYPTED;
1311 // Mid-session rekey (C1): rotate to a fresh key BEFORE stamping this header
1312 // when either the direction-wide AEAD high-watermark or this stream's
1313 // sequence watermark is crossed, so the header carries the new epoch (+ the
1314 // REKEY flag) and no per-stream sequence can wrap within an epoch and reuse a
1315 // nonce (Invariant 8). The peer follows on the authenticated epoch bump (it
1316 // trial-decrypts under the next key).
1317 match rekey_before_stamp(crypto_session, stream_id, sequence) {
1318 Some(extra) => flag_bits |= extra,
1319 // Epoch saturated (u8::MAX): can't rotate further. Surface as a failed
1320 // send so the caller re-offers; the session reconnects rather than wrap.
1321 None => return false,
1322 }
1323 let header = PacketHeader::new(session_id, stream_id, sequence, PacketFlags::new(flag_bits))
1324 .with_epoch(crypto_session.current_epoch());
1325 let ciphertext = match crypto_session.encrypt_packet(&header, payload) {
1326 Ok(c) => c,
1327 Err(e) => {
1328 log::error!("PhantomSession: encrypt_packet failed: {}", e);
1329 return false;
1330 }
1331 };
1332 let packet = PhantomPacket::new(header, ciphertext);
1333 let buf = packet.to_wire();
1334 let size = buf.len();
1335 // Pacing is a wire-rate limiter, so it consumes the full on-wire size.
1336 pace_send(crypto_session, size as u64).await;
1337 if let Err(e) = transport.send_bytes(&buf[..size]).await {
1338 log::error!("PhantomSession: transport send failed: {}", e);
1339 return false;
1340 }
1341 // Inflight/cwnd accounting MUST use the same unit the ACK and loss paths
1342 // settle in. `Stream::ack` returns and `on_packet_lost` subtracts the
1343 // segment's *payload* length (`seg.data.len()`), so the send side has to add
1344 // the payload length too — adding the full wire size here leaked ~69 bytes
1345 // (header + length prefixes + AEAD tag) of phantom inflight per packet,
1346 // which silently exhausted the congestion window after a few dozen packets
1347 // and stalled long-lived sessions. (Bandwidth/BDP derive from acked bytes,
1348 // so they stay in the same payload unit.)
1349 crypto_session.on_packet_sent(payload.len() as u64);
1350 true
1351}
1352
1353/// Emit a V2 WINDOW_UPDATE packet announcing `new_window` bytes of
1354/// receive capacity for `stream_id`. Encrypted under the current
1355/// session epoch (Phase 4.3 flow control).
1356async fn send_window_update<T: SessionTransport>(
1357 transport: &Arc<T>,
1358 crypto_session: &Arc<Session>,
1359 session_id: SessionId,
1360 stream_id: TransportStreamId,
1361 sequence: u32,
1362 new_window: u32,
1363) -> bool {
1364 let mut flag_bits = PacketFlags::ENCRYPTED | PacketFlags::WINDOW_UPDATE;
1365 // WINDOW_UPDATE shares the per-stream sequence space with application data,
1366 // so it must obey the same C1 rekey discipline before stamping (Invariant 8).
1367 match rekey_before_stamp(crypto_session, stream_id, sequence) {
1368 Some(extra) => flag_bits |= extra,
1369 None => return false,
1370 }
1371 let header = PacketHeader::new(session_id, stream_id, sequence, PacketFlags::new(flag_bits))
1372 .with_epoch(crypto_session.current_epoch());
1373 let payload = new_window.to_be_bytes();
1374 let ciphertext = match crypto_session.encrypt_packet(&header, &payload) {
1375 Ok(c) => c,
1376 Err(e) => {
1377 log::error!("PhantomSession: WINDOW_UPDATE encrypt failed: {}", e);
1378 return false;
1379 }
1380 };
1381 let packet = PhantomPacket::new(header, ciphertext);
1382 let buf = packet.to_wire();
1383 if let Err(e) = transport.send_bytes(&buf).await {
1384 log::error!("PhantomSession: WINDOW_UPDATE send failed: {}", e);
1385 return false;
1386 }
1387 true
1388}
1389
1390/// Emit a V2 PATH_VALIDATION packet on `path_id` carrying the given
1391/// 32-byte challenge or response payload. Encrypted under the current
1392/// session epoch.
1393async fn send_path_validation<T: SessionTransport>(
1394 transport: &Arc<T>,
1395 crypto_session: &Arc<Session>,
1396 session_id: SessionId,
1397 path_id: u8,
1398 sequence: u32,
1399 payload: [u8; crate::transport::path::PATH_CHALLENGE_LEN],
1400) -> bool {
1401 // Build the packet skeleton via the codec, then layer ENCRYPTED
1402 // and epoch on top before the actual encrypt.
1403 let mut packet = build_path_validation_packet(session_id, path_id, sequence, payload);
1404 let flag_bits = packet.header.flags.0 | PacketFlags::ENCRYPTED;
1405 packet.header.flags = PacketFlags::new(flag_bits);
1406 packet.header.epoch = crypto_session.current_epoch();
1407 let plaintext = std::mem::take(&mut packet.payload);
1408 let ciphertext = match crypto_session.encrypt_packet(&packet.header, &plaintext) {
1409 Ok(c) => c,
1410 Err(e) => {
1411 log::error!("PhantomSession: PATH_VALIDATION encrypt failed: {}", e);
1412 return false;
1413 }
1414 };
1415 packet.payload = ciphertext;
1416 let buf = packet.to_wire();
1417 if let Err(e) = transport.send_bytes(&buf).await {
1418 log::error!("PhantomSession: PATH_VALIDATION send failed: {}", e);
1419 return false;
1420 }
1421 true
1422}
1423
1424/// Recv-side handler for a packet:
1425/// - session-id guard → drop any frame not stamped with the negotiated
1426/// session id before touching any state (H1).
1427/// - decrypt (REQUIRED on application data — a non-empty unencrypted
1428/// post-handshake packet is a downgrade indicator and is dropped).
1429/// - ACK (now `ENCRYPTED | ACK`, post-decrypt) → read the authenticated
1430/// 4-byte acked sequence from the payload, feed BBR + route to the
1431/// stream / demux. Forged/plaintext ACKs cannot reach this path (H1).
1432/// - PATH_VALIDATION flag → drive the path registry: verify against an
1433/// outstanding challenge if one exists, otherwise echo the payload
1434/// back as a response.
1435/// - WINDOW_UPDATE flag → apply the peer's announced flow-control window.
1436/// - COALESCED flag → split the decrypted bundle into sub-payloads and
1437/// route each through the demux as an independent application chunk.
1438#[allow(clippy::too_many_arguments)]
1439async fn handle_packet<T: SessionTransport>(
1440 packet: PhantomPacket,
1441 session_id: SessionId,
1442 crypto_recv: &Arc<Session>,
1443 streams_recv: &Arc<DashMap<u32, Arc<Stream>>>,
1444 demux_recv: &Arc<StreamDemultiplexer>,
1445 transport_send_ack: &Arc<T>,
1446 transport_for_path: &Arc<T>,
1447 // The reader hands decrypted application data to the delivery task via
1448 // this unbounded channel instead of blocking on `recv_tx`/the demux — so a
1449 // slow `recv()` consumer can never head-of-line-stall inbound ACK/control.
1450 deliver_tx: &mpsc::UnboundedSender<(u32, Bytes)>,
1451 undelivered_bytes: &AtomicU64,
1452 ack_buf: &mut Vec<u8>,
1453 path_validation_seq: &mut u32,
1454 observability: &Observability,
1455 leg: LegType,
1456) {
1457 let stream_id: u32 = packet.header.stream_id.into();
1458 let path_id = packet.header.path_id;
1459
1460 // Bind every inbound frame to the negotiated session (H1). A frame stamped
1461 // with a different session id is dropped before any state mutation, so
1462 // cross-session / off-path control injection (forged ACK/FIN) can never
1463 // reach the stream table, BBR, or the path registry. Application data also
1464 // binds session_id through the AEAD AAD; this guard extends the same
1465 // protection to the non-AEAD header inspection that follows.
1466 if packet.header.session_id != session_id {
1467 return;
1468 }
1469
1470 // Mark path activity even before decrypt (the path id is plaintext
1471 // header bytes; this is just a liveness signal for the sweep).
1472 crypto_recv.mark_path_seen(path_id);
1473
1474 // NOTE: ACK/FIN are NO LONGER processed here, pre-decrypt. They are
1475 // authenticated `ENCRYPTED | ACK` control frames now (H1) and are handled
1476 // *after* the AEAD gate below — see the ACK branch following the decrypt.
1477
1478 // Decrypt if marked. V2 sessions REQUIRE ENCRYPTED on application
1479 // data — a non-empty unencrypted V2 application-data packet is a
1480 // downgrade indicator and is dropped (same posture as V1).
1481 let plaintext: Vec<u8> = if packet.header.flags.contains(PacketFlags::ENCRYPTED) {
1482 // Accept a single authenticated forward rekey step (C1): if this
1483 // packet's epoch is one ahead, the peer rekeyed — trial-decrypt under
1484 // the next key and only commit the ratchet on AEAD success, so a forged
1485 // epoch can't desync us. Same-epoch packets take the ordinary path.
1486 match crypto_recv.decrypt_packet_accepting_rekey(&packet.header, &packet.payload) {
1487 Ok(pt) => pt,
1488 Err(e) => {
1489 // Distinguish the two drop reasons for the security metrics: a
1490 // post-AEAD sliding-window replay reject vs an AEAD-verify
1491 // failure (Invariant 4 — replay is checked after AEAD opens).
1492 // decrypt_packet doesn't surface old-vs-duplicate, so record the
1493 // representative `Duplicate` reason.
1494 if matches!(e, CoreError::ReplayDetected(_)) {
1495 observability.record_replay_rejected(ReplayReason::Duplicate);
1496 } else {
1497 observability.record_aead_failure(leg, AeadAlgorithm::Aes256Gcm);
1498 }
1499 log::warn!("PhantomSession: V2 decrypt failed (dropping packet): {}", e);
1500 return;
1501 }
1502 }
1503 } else if !packet.payload.is_empty() {
1504 // Stripped-flag downgrade defense (Invariant 2): a non-empty unencrypted
1505 // post-handshake application packet is dropped.
1506 observability.record_unencrypted_dropped(leg);
1507 log::warn!(
1508 "PhantomSession: dropping unencrypted V2 post-handshake data packet (downgrade?)"
1509 );
1510 return;
1511 } else {
1512 Vec::new()
1513 };
1514
1515 // Authenticated ACK (H1). ACKs are `ENCRYPTED | ACK` control frames whose
1516 // AEAD payload carries the acked data sequence as 4 big-endian bytes. We act
1517 // on the ACK only *after* AEAD verify, which authenticates the header
1518 // (including `session_id` and `ack_delay`) and the acked-seq payload — so a
1519 // forged or stripped-flag ACK (dropped above by the downgrade defense, or
1520 // failing this length check) can neither retire a pending segment, restore a
1521 // flow-control permit, poison BBR, nor close a stream.
1522 if packet.header.flags.contains(PacketFlags::ACK) {
1523 if plaintext.len() != 4 {
1524 log::warn!(
1525 "PhantomSession: ACK payload length {} (expected 4)",
1526 plaintext.len()
1527 );
1528 return;
1529 }
1530 let acked_seq =
1531 u32::from_be_bytes([plaintext[0], plaintext[1], plaintext[2], plaintext[3]]);
1532 if let Some(stream) = streams_recv.get(&stream_id) {
1533 if let Some((sent_at, bytes)) = stream.ack(acked_seq).await {
1534 feed_bbr_on_ack(crypto_recv, sent_at, bytes, packet.header.ack_delay as u64);
1535 }
1536 }
1537 // Best-effort, non-blocking: the demux/PhantomStream path is vestigial;
1538 // routing the ACK/close notification to it must never block the reader.
1539 demux_recv.route_ack(stream_id, acked_seq);
1540 if packet.header.flags.contains(PacketFlags::FIN) {
1541 demux_recv.route_close(stream_id);
1542 }
1543 return;
1544 }
1545
1546 // WINDOW_UPDATE dispatch (Phase 4.3 flow control). Payload is a
1547 // big-endian u32 carrying relative flow-control credit — the bytes the
1548 // peer's application just consumed, which we ADD to our send window.
1549 if packet.header.flags.contains(PacketFlags::WINDOW_UPDATE) {
1550 if plaintext.len() != 4 {
1551 log::warn!(
1552 "PhantomSession: WINDOW_UPDATE payload length {} (expected 4)",
1553 plaintext.len()
1554 );
1555 return;
1556 }
1557 let credit = u32::from_be_bytes([plaintext[0], plaintext[1], plaintext[2], plaintext[3]]);
1558 if let Some(stream) = streams_recv.get(&stream_id) {
1559 // Relative-credit flow control — add the granted credit, then
1560 // wake the send loop so a window-blocked sender resumes immediately
1561 // instead of waiting a full poll tick.
1562 stream.apply_peer_window_update(credit);
1563 crypto_recv.notify_outbound_ready();
1564 }
1565 return;
1566 }
1567
1568 // PATH_VALIDATION dispatch (Phase 4.2): the codec inspects the *plaintext*
1569 // because the wire packet was sealed by the AEAD layer.
1570 if packet.header.flags.contains(PacketFlags::PATH_VALIDATION) {
1571 if plaintext.len() != crate::transport::path::PATH_CHALLENGE_LEN {
1572 log::warn!(
1573 "PhantomSession: PATH_VALIDATION plaintext length {} (expected {})",
1574 plaintext.len(),
1575 crate::transport::path::PATH_CHALLENGE_LEN
1576 );
1577 return;
1578 }
1579 let mut payload_buf = [0u8; crate::transport::path::PATH_CHALLENGE_LEN];
1580 payload_buf.copy_from_slice(&plaintext);
1581 // If we have an in-flight challenge on this path, try to
1582 // verify against it. If verification succeeds, the path
1583 // transitions to Validated and we're done. If it fails, the
1584 // registry already transitioned to Failed — also done.
1585 match crypto_recv.path_state(path_id) {
1586 Some(crate::transport::path::PathStateKind::Validating) => {
1587 let _ = crypto_recv.complete_path_validation(path_id, &payload_buf);
1588 return;
1589 }
1590 Some(crate::transport::path::PathStateKind::Validated)
1591 | Some(crate::transport::path::PathStateKind::Failed) => {
1592 // Terminal state — ignore.
1593 return;
1594 }
1595 _ => {
1596 // Unknown or Unvalidated: treat this packet as an
1597 // incoming challenge and echo the payload back as our
1598 // response. The remote will then verify it against its
1599 // own pending challenge.
1600 let seq = *path_validation_seq;
1601 *path_validation_seq = path_validation_seq.wrapping_add(1);
1602 let _ = send_path_validation(
1603 transport_for_path,
1604 crypto_recv,
1605 session_id,
1606 path_id,
1607 seq,
1608 payload_buf,
1609 )
1610 .await;
1611 return;
1612 }
1613 }
1614 }
1615
1616 // PATH-001 (Invariant 6): application data is delivered only on a Validated
1617 // path. Path 0 is pre-validated at session establishment; any other path_id
1618 // must complete a PATH_VALIDATION challenge/response first. The control
1619 // frames (ACK / WINDOW_UPDATE / PATH_VALIDATION) were handled above and
1620 // returned; this gates the app-data delivery branches below. It runs AFTER
1621 // AEAD verify, so it never acts on an attacker-chosen plaintext path_id that
1622 // fails decryption.
1623 if !matches!(
1624 crypto_recv.path_state(path_id),
1625 Some(crate::transport::path::PathStateKind::Validated)
1626 ) {
1627 // Track a first-seen path id so a future challenge/response can promote
1628 // it; drop the data until then.
1629 crypto_recv.register_unvalidated_path(path_id);
1630 log::warn!(
1631 "PhantomSession: dropping application data on non-validated path_id {}",
1632 path_id
1633 );
1634 return;
1635 }
1636
1637 // COALESCED dispatch (Phase 2.5): split the decrypted bundle into
1638 // sub-payloads and hand each, IN ORDER, to the single FIFO delivery task.
1639 // The delivery task drains them in this order, so the bundle's internal
1640 // ordering (and its order relative to later frames) is preserved —
1641 // the reader never blocks on application delivery.
1642 if packet.header.flags.contains(PacketFlags::COALESCED) {
1643 let inner_for_codec = PhantomPacket {
1644 header: packet.header,
1645 payload: plaintext,
1646 extensions: Vec::new(),
1647 };
1648 match unwrap_coalesced_packet(&inner_for_codec) {
1649 Ok(Some(subs)) => {
1650 for sub in subs {
1651 if sub.is_empty() {
1652 continue;
1653 }
1654 // Count toward the backlog only once the item is actually
1655 // enqueued. If the delivery task has exited (consumer gone,
1656 // `deliver_rx` dropped) the send fails and we must not inflate
1657 // `undelivered_bytes` for data that was discarded.
1658 let len = sub.len() as u64;
1659 if deliver_tx.send((stream_id, Bytes::from(sub))).is_ok() {
1660 undelivered_bytes.fetch_add(len, Ordering::AcqRel);
1661 }
1662 }
1663 }
1664 Ok(None) => {
1665 log::warn!("PhantomSession: COALESCED flag set but bundle didn't parse");
1666 }
1667 Err(e) => {
1668 log::warn!("PhantomSession: COALESCED parse error: {}", e);
1669 }
1670 }
1671 // Bundles do not auto-ACK at the outer level — sub-packets are not
1672 // independently sequenced and the outer sequence has already been
1673 // consumed by the replay window.
1674 return;
1675 }
1676
1677 // Reliable application data → emit an authenticated ACK **inline in the
1678 // reader** (H1). The ACK is an `ENCRYPTED | ACK` control frame whose AEAD
1679 // payload carries the acked data sequence (4 big-endian bytes); the peer
1680 // acts on it only after AEAD verify, so it cannot be forged off-path. The
1681 // ACK's own `header.sequence` is drawn from this side's per-stream send
1682 // counter — shared with our data/window-update sends on the same stream — so
1683 // `(epoch, stream_id, sequence, path_id)` is unique and never collides with
1684 // our outbound data (the nonce-reuse trap). It obeys the C1 rekey discipline
1685 // and stays prompt even when the app consumer is slow (encrypt is a lock-free
1686 // ArcSwap load). "ACK" means "received, decrypted, replay-passed, accepted."
1687 if packet.header.flags.contains(PacketFlags::RELIABLE) {
1688 let local = streams_recv
1689 .entry(stream_id)
1690 .or_insert_with(|| Arc::new(Stream::new(stream_id as TransportStreamId)))
1691 .clone();
1692 let ack_seq = local.next_send_sequence();
1693 let mut ack_flag_bits = PacketFlags::ENCRYPTED | PacketFlags::ACK;
1694 match rekey_before_stamp(crypto_recv, stream_id as TransportStreamId, ack_seq) {
1695 Some(extra) => ack_flag_bits |= extra,
1696 // Epoch saturated — drop this ACK rather than reuse a nonce; the
1697 // sender retransmits and the session is expected to reconnect.
1698 None => return,
1699 }
1700 let ack_header = PacketHeader::new(
1701 session_id,
1702 stream_id as TransportStreamId,
1703 ack_seq,
1704 PacketFlags::new(ack_flag_bits),
1705 )
1706 .with_epoch(crypto_recv.current_epoch())
1707 .with_path_id(path_id);
1708 let ack_payload = packet.header.sequence.to_be_bytes();
1709 match crypto_recv.encrypt_packet(&ack_header, &ack_payload) {
1710 Ok(ct) => {
1711 let ack_packet = PhantomPacket::new(ack_header, ct);
1712 ack_buf.clear();
1713 ack_buf.extend_from_slice(&ack_packet.to_wire());
1714 let size = ack_buf.len();
1715 let _ = transport_send_ack.send_bytes(&ack_buf[..size]).await;
1716 }
1717 Err(e) => log::error!("PhantomSession: ACK encrypt failed: {}", e),
1718 }
1719 }
1720
1721 // Hand the decrypted application data to the delivery task: unbounded +
1722 // non-blocking, so the reader never stalls on a slow `recv()` consumer. The
1723 // delivery task drains the app-paced `recv_tx.send()` and credits the
1724 // flow-control window. `undelivered_bytes` is the backlog counter the
1725 // reader's HARD_CAP gate watches — counted only on a successful enqueue, so
1726 // a dead delivery task (consumer gone) can't inflate it for discarded data.
1727 if !plaintext.is_empty() {
1728 let len = plaintext.len() as u64;
1729 if deliver_tx.send((stream_id, Bytes::from(plaintext))).is_ok() {
1730 undelivered_bytes.fetch_add(len, Ordering::AcqRel);
1731 }
1732 }
1733
1734 if packet.header.flags.contains(PacketFlags::FIN) {
1735 demux_recv.route_close(stream_id);
1736 }
1737}
1738
1739// Internal-only methods — deliberately NOT on the `#[uniffi::export]` surface.
1740// `set_state` mutates the connection state machine; a foreign caller forcing
1741// `Connected` mid-handshake would make `is_data_ready()` lie and let `send()`
1742// bypass the queue, or `Closed` without tearing down the pump.
1743impl PhantomSession {
1744 /// Transition to a new connection state. Crate-internal: driven by the
1745 /// handshake task and teardown only.
1746 pub(crate) fn set_state(&self, new_state: ConnectionState) {
1747 self.state.store(new_state as u8, Ordering::Relaxed);
1748 }
1749
1750 /// Session observability handle (Rust-only — `Observability` is not a
1751 /// UniFFI type). For a server-accepted session this is the
1752 /// `PhantomListener`'s shared instance; for a client it is the session's
1753 /// own. Read `.snapshot()` for the lock-free metric counters.
1754 pub fn observability(&self) -> Arc<Observability> {
1755 self.observability.clone()
1756 }
1757}
1758
1759#[cfg_attr(feature = "bindings", uniffi::export(async_runtime = "tokio"))]
1760impl PhantomSession {
1761 /// Create a new session — returns instantly.
1762 ///
1763 /// Handshake is not started until a transport is provided.
1764 /// Use `connect_with_transport()` for full integration.
1765 #[cfg_attr(feature = "bindings", uniffi::constructor)]
1766 pub fn connect(peer_addr: String) -> Arc<Self> {
1767 let (cmd_tx, cmd_rx) = mpsc::channel(256);
1768 let (_recv_tx, recv_rx) = mpsc::channel(256);
1769
1770 let (demux, _ctrl_rx) = StreamDemultiplexer::new(256);
1771 let streams = Arc::new(DashMap::new());
1772 Arc::new(Self {
1773 id: new_session_id(),
1774 peer_addr,
1775 state: Arc::new(AtomicU8::new(ConnectionState::Connecting as u8)),
1776 send_queue: Arc::new(Mutex::new(Vec::new())),
1777 cmd_tx,
1778 cmd_rx: Mutex::new(Some(cmd_rx)),
1779 recv_rx: Mutex::new(recv_rx),
1780 demux: Arc::new(demux),
1781 streams,
1782 inner_session: Arc::new(Mutex::new(None)),
1783 early_data_accepted: Arc::new(Mutex::new(None)),
1784 // Placeholder session (no transport / pump yet); a no-op holder
1785 // until `connect_with_transport` spawns the real pump.
1786 observability: Observability::new(ObservabilityConfig::default()),
1787 })
1788 }
1789
1790 /// Open a new multiplexed stream
1791 pub fn open_stream(&self) -> Arc<crate::api::stream::PhantomStream> {
1792 let handle = self.demux.open_stream(1024);
1793 let stream_id = handle.stream_id;
1794
1795 let transport_stream = Arc::new(Stream::new(stream_id as TransportStreamId));
1796 self.streams.insert(stream_id, transport_stream);
1797
1798 Arc::new(crate::api::stream::PhantomStream::new(
1799 handle,
1800 self.cmd_tx.clone(),
1801 ))
1802 }
1803
1804 /// Send data through the session.
1805 ///
1806 /// - If the session is connected: sends immediately
1807 /// - If still handshaking: queues the data for auto-flush later
1808 pub async fn send(&self, data: Vec<u8>) -> Result<(), CoreError> {
1809 let state = self.connection_state();
1810
1811 if state.is_data_ready() {
1812 // Channel is up — send directly
1813 self.cmd_tx
1814 .send(SessionCommand::Send(data))
1815 .await
1816 .map_err(|_| CoreError::NetworkError("Session closed".into()))?;
1817 } else if state == ConnectionState::Connecting {
1818 // Still handshaking — queue
1819 self.send_queue.lock().await.push(data);
1820 } else {
1821 return Err(CoreError::NetworkError(format!(
1822 "Cannot send in state {:?}",
1823 state
1824 )));
1825 }
1826
1827 Ok(())
1828 }
1829
1830 /// Receive data from the session.
1831 ///
1832 /// Internally the recv pipeline keeps payloads as `Bytes` to avoid the
1833 /// per-packet Vec clone that used to fan out to the stream demux. The
1834 /// FFI surface still hands callers a `Vec<u8>`; if this is the last
1835 /// refcount the Vec is moved out of the underlying buffer, otherwise
1836 /// `Bytes::to_vec` copies.
1837 pub async fn recv(&self) -> Result<Vec<u8>, CoreError> {
1838 let mut rx = self.recv_rx.lock().await;
1839 let bytes = rx
1840 .recv()
1841 .await
1842 .ok_or_else(|| CoreError::NetworkError("Session closed".into()))?;
1843 Ok(bytes.to_vec())
1844 }
1845
1846 /// Get the current connection state (lock-free).
1847 pub fn connection_state(&self) -> ConnectionState {
1848 ConnectionState::from_u8(self.state.load(Ordering::Relaxed))
1849 }
1850
1851 /// Whether the session is ready for data transmission.
1852 pub fn is_data_ready(&self) -> bool {
1853 self.connection_state().is_data_ready()
1854 }
1855
1856 /// Whether the session has full PQC protection.
1857 pub fn is_pqc_ready(&self) -> bool {
1858 matches!(
1859 self.connection_state(),
1860 ConnectionState::PqcReady | ConnectionState::Connected
1861 )
1862 }
1863
1864 /// Flush all queued messages (called when handshake completes).
1865 pub async fn flush_queue(&self) -> Result<u32, CoreError> {
1866 let mut queue = self.send_queue.lock().await;
1867 let count = queue.len() as u32;
1868 for msg in queue.drain(..) {
1869 self.cmd_tx
1870 .send(SessionCommand::Send(msg))
1871 .await
1872 .map_err(|_| CoreError::NetworkError("Session closed during flush".into()))?;
1873 }
1874 Ok(count)
1875 }
1876
1877 /// Number of messages queued (waiting for handshake).
1878 pub async fn queued_count(&self) -> u32 {
1879 self.send_queue.lock().await.len() as u32
1880 }
1881
1882 /// Session identifier.
1883 pub fn id(&self) -> String {
1884 self.id.clone()
1885 }
1886
1887 /// Target peer address.
1888 pub fn peer_addr(&self) -> String {
1889 self.peer_addr.clone()
1890 }
1891
1892 /// The 0-RTT verdict for this session.
1893 ///
1894 /// - `None` — still handshaking, the handshake failed, or the client sent
1895 /// no early-data on this connect.
1896 /// - `Some(true)` — the server consumed the 0-RTT early-data.
1897 /// - `Some(false)` — the client sent early-data and the server rejected it
1898 /// (stale/unknown ticket, oversized blob, or AEAD failure). The caller
1899 /// must re-send that payload over the normal channel.
1900 pub async fn early_data_accepted(&self) -> Option<bool> {
1901 *self.early_data_accepted.lock().await
1902 }
1903
1904 /// Extract a [`ResumptionHint`] for a future 0-RTT reconnect.
1905 ///
1906 /// Returns `Some` after a successful handshake; `None` while still
1907 /// handshaking, after a failure, or before the inner session has
1908 /// been published.
1909 ///
1910 /// Store the hint alongside the pinned `HybridVerifyingKey` of the
1911 /// server it was negotiated against and feed it back to
1912 /// [`connect_pinned_with_resumption`]. Reusing a hint across
1913 /// servers is a configuration bug — the `resumption_secret` is
1914 /// server-pinned.
1915 pub async fn resumption_hint(&self) -> Option<ResumptionHint> {
1916 let guard = self.inner_session.lock().await;
1917 guard
1918 .as_ref()
1919 .and_then(|s| s.resumption_hint())
1920 .map(|(session_id, resumption_secret)| ResumptionHint {
1921 session_id: session_id.to_vec(),
1922 resumption_secret: resumption_secret.to_vec(),
1923 })
1924 }
1925
1926 /// Current rekey epoch of the established session (`None` while still
1927 /// connecting). Rust-only — used by soak / integration tests to confirm
1928 /// that automatic mid-session rekey (C1) advanced the epoch.
1929 pub async fn current_epoch(&self) -> Option<u8> {
1930 self.inner_session
1931 .lock()
1932 .await
1933 .as_ref()
1934 .map(|s| s.current_epoch())
1935 }
1936
1937 /// Override the automatic-rekey send-invocation high-watermark on the
1938 /// established session (default `REKEY_SOFT_LIMIT`). Returns `false` if
1939 /// the session is still connecting. Rust-only — primarily for soak/load
1940 /// harnesses that need to exercise mid-session rekey without sending `2^47`
1941 /// packets.
1942 pub async fn set_rekey_threshold(&self, n: u64) -> bool {
1943 match self.inner_session.lock().await.as_ref() {
1944 Some(s) => {
1945 s.set_rekey_threshold(n);
1946 true
1947 }
1948 None => false,
1949 }
1950 }
1951
1952 /// Send the graceful close frame and shut the session down.
1953 ///
1954 /// Named `disconnect` rather than `close` because UniFFI's Kotlin
1955 /// generator unconditionally adds `AutoCloseable.close()` to every
1956 /// object, and a Rust-side `close` here would conflict with it.
1957 pub async fn disconnect(&self) -> Result<(), CoreError> {
1958 self.set_state(ConnectionState::Closed);
1959 let _ = self.cmd_tx.send(SessionCommand::Close).await;
1960 Ok(())
1961 }
1962}
1963
1964impl PhantomSession {
1965 /// Get the stream demultiplexer (internal use, not exposed to UniFFI)
1966 pub fn demux(&self) -> Arc<StreamDemultiplexer> {
1967 self.demux.clone()
1968 }
1969}
1970
1971impl std::fmt::Debug for PhantomSession {
1972 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1973 f.debug_struct("PhantomSession")
1974 .field("id", &self.id)
1975 .field("peer", &self.peer_addr)
1976 .field("state", &self.connection_state())
1977 .finish()
1978 }
1979}
1980
1981// ─── Pinned-Connect Shim (Phase 7.2 mobile bridge) ──────────────────────────
1982//
1983// `connect_with_transport` itself can't cross the UniFFI boundary directly —
1984// it takes a generic `T: SessionTransport` trait object and a typed
1985// `HybridVerifyingKey`, neither of which is a UniFFI primitive. Mobile
1986// callers (iOS / Android) need a single async entry point that opens a TCP
1987// connection, wraps it in `TcpSessionTransport`, parses the pinned key from
1988// bytes (per security invariant 1 in SECURITY.md), and hands back an
1989// `Arc<PhantomSession>` ready for `send` / `recv`.
1990//
1991// Native-only: `TcpSessionTransport` lives behind `cfg(not(target_arch =
1992// "wasm32"))`, mirroring `crate::api::tcp_transport`. Wasm consumers use
1993// the in-tree `WebSocketLeg` instead.
1994#[cfg(not(target_arch = "wasm32"))]
1995#[cfg_attr(feature = "bindings", uniffi::export(async_runtime = "tokio"))]
1996pub async fn connect_pinned(
1997 host: String,
1998 port: u16,
1999 pinned_key: Vec<u8>,
2000) -> Result<Arc<PhantomSession>, CoreError> {
2001 // fips bootstrap POST gate (same policy as
2002 // `PhantomListener::bind_inner`). A failure here aborts the
2003 // connect before any socket is opened or key material is
2004 // touched.
2005 #[cfg(feature = "fips")]
2006 crate::crypto::self_tests::ensure_post_passed()
2007 .map_err(|e| CoreError::FipsSelfTestFailure(format!("{e:?}")))?;
2008
2009 // Decode the server's hybrid verifying key. A malformed blob is a
2010 // crypto-layer problem (wrong length, wrong encoding) rather than a
2011 // network failure — surface it as `CryptoError`.
2012 let expected_server_key = HybridVerifyingKey::from_bytes(&pinned_key)
2013 .map_err(|e| CoreError::CryptoError(format!("invalid pinned key: {}", e)))?;
2014
2015 // Open the TCP stream. The `format!` is shared between the actual
2016 // connect target and the `peer_addr` recorded inside the session
2017 // (`connect_with_transport` takes it as a free-form string).
2018 let addr = format!("{}:{}", host, port);
2019 let stream = tokio::net::TcpStream::connect(&addr)
2020 .await
2021 .map_err(|e| CoreError::NetworkError(format!("connect {}: {}", addr, e)))?;
2022 let transport = crate::api::tcp_transport::TcpSessionTransport::new(stream);
2023
2024 // The handshake is driven by the background task spawned inside
2025 // `connect_with_transport`; the returned `PhantomSession` is usable
2026 // immediately (state `Connecting`, sends auto-queued until ready).
2027 let session = PhantomSession::connect_with_transport(&addr, transport, expected_server_key);
2028 Ok(Arc::new(session))
2029}
2030
2031/// Connect to a pinned server with a **0-RTT resumption attempt** — the
2032/// resumption-aware analogue of [`connect_pinned`].
2033///
2034/// `hint` is a [`ResumptionHint`] from a prior session's
2035/// [`PhantomSession::resumption_hint`]; both of its fields must be
2036/// exactly 32 bytes or the call fails with `ValidationError` before any
2037/// socket is opened. `early_data` (≤ 16 KiB) is sealed into the resuming
2038/// ClientHello so it reaches the server on the very first flight.
2039///
2040/// Acceptance is best-effort: when the server does not consume the early-data
2041/// (stale/unknown ticket or AEAD failure) the handshake completes 1-RTT — the
2042/// caller checks [`PhantomSession::early_data_accepted`] and re-sends over the
2043/// normal channel when it is not `Some(true)`.
2044///
2045/// Native-only, like [`connect_pinned`]: `TcpSessionTransport` lives
2046/// behind `cfg(not(target_arch = "wasm32"))`.
2047#[cfg(not(target_arch = "wasm32"))]
2048#[cfg_attr(feature = "bindings", uniffi::export(async_runtime = "tokio"))]
2049pub async fn connect_pinned_with_resumption(
2050 host: String,
2051 port: u16,
2052 pinned_key: Vec<u8>,
2053 hint: ResumptionHint,
2054 early_data: Vec<u8>,
2055) -> Result<Arc<PhantomSession>, CoreError> {
2056 // fips bootstrap POST gate (same policy as
2057 // `connect_pinned`).
2058 #[cfg(feature = "fips")]
2059 crate::crypto::self_tests::ensure_post_passed()
2060 .map_err(|e| CoreError::FipsSelfTestFailure(format!("{e:?}")))?;
2061
2062 // Server-key pinning stays mandatory (security invariant 1): a
2063 // malformed blob is a crypto-layer problem, surfaced as `CryptoError`.
2064 let expected_server_key = HybridVerifyingKey::from_bytes(&pinned_key)
2065 .map_err(|e| CoreError::CryptoError(format!("invalid pinned key: {}", e)))?;
2066
2067 // `ResumptionHint` fields are `Vec<u8>` (UniFFI has no fixed-size
2068 // array type) — enforce the 32-byte invariant here, before any
2069 // socket is opened, so a caller bug never becomes a network call.
2070 let session_id: [u8; 32] = hint.session_id.as_slice().try_into().map_err(|_| {
2071 CoreError::ValidationError(format!(
2072 "resumption hint session_id must be 32 bytes, got {}",
2073 hint.session_id.len()
2074 ))
2075 })?;
2076 let resumption_secret: [u8; 32] =
2077 hint.resumption_secret.as_slice().try_into().map_err(|_| {
2078 CoreError::ValidationError(format!(
2079 "resumption hint resumption_secret must be 32 bytes, got {}",
2080 hint.resumption_secret.len()
2081 ))
2082 })?;
2083
2084 // APIFFI-03: reject oversized early-data BEFORE opening a socket, so a caller
2085 // bug (or oversized blob) never wastes a TCP connection establishment. The
2086 // inner `connect_with_resumption` enforces the same cap as defense-in-depth.
2087 if early_data.len() > EARLY_DATA_MAX_LEN {
2088 return Err(CoreError::ValidationError(format!(
2089 "early_data is {} bytes, exceeds the {}-byte 0-RTT cap",
2090 early_data.len(),
2091 EARLY_DATA_MAX_LEN
2092 )));
2093 }
2094
2095 let addr = format!("{}:{}", host, port);
2096 let stream = tokio::net::TcpStream::connect(&addr)
2097 .await
2098 .map_err(|e| CoreError::NetworkError(format!("connect {}: {}", addr, e)))?;
2099 let transport = crate::api::tcp_transport::TcpSessionTransport::new(stream);
2100
2101 // Reuses the Rust-only `connect_with_resumption` — no new crypto and
2102 // no new wire format. That path enforces the `EARLY_DATA_MAX_LEN`
2103 // cap and keeps 0-RTT one-shot / best-effort (security invariant 9).
2104 let session = PhantomSession::connect_with_resumption(
2105 &addr,
2106 transport,
2107 expected_server_key,
2108 (session_id, resumption_secret),
2109 early_data,
2110 )?;
2111 Ok(Arc::new(session))
2112}
2113
2114#[cfg(test)]
2115mod tests {
2116 use super::*;
2117 use crate::transport::handshake::{ClientHello, HandshakeResponse, HandshakeServer};
2118
2119 // ── Mock transport for testing ──
2120
2121 /// In-memory transport using channels (simulates a loopback pipe).
2122 struct ChannelTransport {
2123 tx: mpsc::Sender<Vec<u8>>,
2124 rx: Mutex<mpsc::Receiver<Vec<u8>>>,
2125 }
2126
2127 impl ChannelTransport {
2128 /// Create a pair of connected transports (client ↔ server).
2129 fn pair() -> (Self, Self) {
2130 let (a_tx, b_rx) = mpsc::channel(64);
2131 let (b_tx, a_rx) = mpsc::channel(64);
2132 (
2133 Self {
2134 tx: a_tx,
2135 rx: Mutex::new(a_rx),
2136 },
2137 Self {
2138 tx: b_tx,
2139 rx: Mutex::new(b_rx),
2140 },
2141 )
2142 }
2143 }
2144
2145 impl SessionTransport for ChannelTransport {
2146 async fn send_bytes(&self, data: &[u8]) -> Result<(), CoreError> {
2147 self.tx
2148 .send(data.to_vec())
2149 .await
2150 .map_err(|_| CoreError::NetworkError("channel closed".into()))
2151 }
2152
2153 async fn recv_bytes(&self) -> Result<Bytes, CoreError> {
2154 let mut rx = self.rx.lock().await;
2155 let v = rx
2156 .recv()
2157 .await
2158 .ok_or_else(|| CoreError::NetworkError("channel closed".into()))?;
2159 Ok(Bytes::from(v))
2160 }
2161 }
2162
2163 // ── Tests ──
2164
2165 /// H9 forward-compat (client side): when the server answers a `ClientHello`
2166 /// with a typed `ServerReject` (the version isn't one it speaks), the client
2167 /// surfaces a clear version-mismatch error instead of hanging or returning a
2168 /// generic failure — and crucially does NOT auto-downgrade.
2169 #[tokio::test]
2170 async fn client_surfaces_server_reject_as_version_error() {
2171 use crate::transport::handshake::ServerReject;
2172
2173 let (client_transport, server_transport) = ChannelTransport::pair();
2174 // The reject path errors before any key verification, so any key works.
2175 let (_sk, expected_vk) = crate::crypto::hybrid_sign::HybridSigningKey::generate();
2176
2177 let server = tokio::spawn(async move {
2178 // Consume the ClientHello, then reply with the typed reject.
2179 let _hello = server_transport.recv_bytes().await.unwrap();
2180 let reject = borsh::to_vec(&ServerReject::unsupported_version()).unwrap();
2181 server_transport.send_bytes(&reject).await.unwrap();
2182 });
2183
2184 let result = run_client_handshake(&client_transport, &expected_vk, None).await;
2185 server.await.unwrap();
2186
2187 let err = result.expect_err("client must surface the reject as an error");
2188 let msg = format!("{err:?}");
2189 assert!(
2190 msg.contains("unsupported protocol version"),
2191 "expected a version-mismatch error, got: {msg}"
2192 );
2193 }
2194
2195 /// **HS-02.** A MITM that answers every `ClientHello` with a fresh cheap
2196 /// `HelloRetryRequest` must NOT loop the client forever — `run_client_handshake`
2197 /// caps the retry rounds and returns an error. (Pre-fix this test would hang.)
2198 #[tokio::test]
2199 async fn client_handshake_caps_retry_rounds() {
2200 use crate::transport::handshake::HelloRetryRequest;
2201
2202 let (client_transport, server_transport) = ChannelTransport::pair();
2203 let (_sk, expected_vk) = crate::crypto::hybrid_sign::HybridSigningKey::generate();
2204
2205 // Malicious server: answer EVERY ClientHello with a fresh, cheap
2206 // HelloRetryRequest (no cookie, no PoW) — never converging.
2207 let server = tokio::spawn(async move {
2208 loop {
2209 if server_transport.recv_bytes().await.is_err() {
2210 break;
2211 }
2212 let retry = borsh::to_vec(&HelloRetryRequest {
2213 challenge: None,
2214 cookie: None,
2215 })
2216 .expect("encode retry");
2217 if server_transport.send_bytes(&retry).await.is_err() {
2218 break;
2219 }
2220 }
2221 });
2222
2223 let result = run_client_handshake(&client_transport, &expected_vk, None).await;
2224 drop(client_transport); // close the channel so the server task ends
2225 let _ = server.await;
2226
2227 assert!(
2228 matches!(result, Err(CoreError::HandshakeError(_))),
2229 "client must error after the retry-round cap, not loop forever; got {result:?}"
2230 );
2231 }
2232
2233 /// **INFOLEAK-1.** `ResumptionHint`'s `Debug` must redact the 0-RTT
2234 /// `resumption_secret` — a mobile/FFI consumer logging it with `{:?}` must
2235 /// not leak the key material.
2236 #[test]
2237 fn resumption_hint_debug_redacts_secret() {
2238 let hint = ResumptionHint {
2239 session_id: vec![0xAB; 32],
2240 resumption_secret: vec![0xCD; 32],
2241 };
2242 let dbg = format!("{hint:?}");
2243 assert!(dbg.contains("REDACTED"), "secret must be redacted: {dbg}");
2244 // No representation of the secret bytes (0xCD) leaks — neither hex nor
2245 // the decimal the derived Debug would have printed for a Vec<u8>.
2246 assert!(
2247 !dbg.contains("205"),
2248 "no decimal secret bytes in Debug: {dbg}"
2249 );
2250 assert!(
2251 !dbg.to_lowercase().contains("cd, cd"),
2252 "no hex secret bytes: {dbg}"
2253 );
2254 }
2255
2256 #[tokio::test]
2257 async fn test_phantom_session_instant_connect() {
2258 let session = PhantomSession::connect("example.com:443".to_string());
2259
2260 // Should be in Connecting state immediately
2261 assert_eq!(session.connection_state(), ConnectionState::Connecting);
2262 assert!(!session.is_data_ready());
2263 assert_eq!(session.peer_addr(), "example.com:443");
2264 }
2265
2266 #[tokio::test]
2267 async fn test_phantom_session_send_queue() {
2268 let session = PhantomSession::connect("example.com:443".to_string());
2269
2270 // Send while still connecting — should queue
2271 session.send(vec![1, 2, 3]).await.unwrap();
2272 session.send(vec![4, 5, 6]).await.unwrap();
2273 assert_eq!(session.queued_count().await, 2);
2274
2275 // Simulate handshake completion
2276 session.set_state(ConnectionState::ClassicalReady);
2277 assert!(session.is_data_ready());
2278
2279 // Flush queue
2280 let flushed = session.flush_queue().await.unwrap();
2281 assert_eq!(flushed, 2);
2282 assert_eq!(session.queued_count().await, 0);
2283 }
2284
2285 #[tokio::test]
2286 async fn test_phantom_session_state_progression() {
2287 let session = PhantomSession::connect("example.com:443".to_string());
2288
2289 assert_eq!(session.connection_state(), ConnectionState::Connecting);
2290 assert!(!session.is_data_ready());
2291
2292 session.set_state(ConnectionState::ClassicalReady);
2293 assert!(session.is_data_ready());
2294 assert!(!session.is_pqc_ready());
2295
2296 session.set_state(ConnectionState::PqcUpgrading);
2297 assert!(session.is_data_ready());
2298 assert!(!session.is_pqc_ready());
2299
2300 session.set_state(ConnectionState::PqcReady);
2301 assert!(session.is_data_ready());
2302 assert!(session.is_pqc_ready());
2303
2304 session.set_state(ConnectionState::Connected);
2305 assert!(session.is_data_ready());
2306 assert!(session.is_pqc_ready());
2307 }
2308
2309 #[tokio::test]
2310 async fn test_phantom_session_close() {
2311 let session = PhantomSession::connect("example.com:443".to_string());
2312 session.disconnect().await.unwrap();
2313 assert_eq!(session.connection_state(), ConnectionState::Closed);
2314 assert!(!session.is_data_ready());
2315 }
2316
2317 /// Helper: decrypt an incoming encrypted frame on the test server side.
2318 fn decrypt_incoming(
2319 server_session: &crate::transport::session::Session,
2320 bytes: &[u8],
2321 ) -> Vec<u8> {
2322 let pkt = PhantomPacket::from_wire(bytes).expect("deserialize PhantomPacket");
2323 assert!(
2324 pkt.header.flags.contains(PacketFlags::ENCRYPTED),
2325 "expected ENCRYPTED flag on application data"
2326 );
2327 server_session
2328 .decrypt_packet(&pkt.header, &pkt.payload)
2329 .expect("decrypt application data")
2330 }
2331
2332 /// Helper: build an encrypted reply frame from the test server side.
2333 fn encrypt_outgoing(
2334 server_session: &crate::transport::session::Session,
2335 session_id: SessionId,
2336 stream_id: TransportStreamId,
2337 sequence: u32,
2338 payload: &[u8],
2339 ) -> Vec<u8> {
2340 let flag_bits = PacketFlags::RELIABLE | PacketFlags::ENCRYPTED;
2341 let header =
2342 PacketHeader::new(session_id, stream_id, sequence, PacketFlags::new(flag_bits))
2343 .with_epoch(server_session.current_epoch());
2344 let ct = server_session
2345 .encrypt_packet(&header, payload)
2346 .expect("encrypt reply");
2347 let packet = PhantomPacket::new(header, ct);
2348 packet.to_wire()
2349 }
2350
2351 /// Integration test: Client handshake via ChannelTransport with a
2352 /// simulated server responder.
2353 #[tokio::test]
2354 async fn test_phantom_session_handshake_via_transport() {
2355 let (client_transport, server_transport) = ChannelTransport::pair();
2356 let server_hs = HandshakeServer::new().unwrap();
2357 let server_pinned_key = server_hs.verifying_key().clone();
2358
2359 // Start client session — spawns background handshake (with pinning)
2360 let session = PhantomSession::connect_with_transport(
2361 "test-server:9000",
2362 client_transport,
2363 server_pinned_key,
2364 );
2365
2366 // Queue a message before handshake completes
2367 session.send(b"early-data".to_vec()).await.unwrap();
2368
2369 // Simulate server responder
2370 let server_handle = tokio::spawn(async move {
2371 let client_ip = "127.0.0.1".parse().unwrap();
2372
2373 // 1. Receive the (bare borsh) ClientHello.
2374 let client_hello_bytes = server_transport.recv_bytes().await.unwrap();
2375 let client_hello = borsh::from_slice::<ClientHello>(&client_hello_bytes).unwrap();
2376
2377 // 2. Process — may retry with cookie/PoW.
2378 let server_session = loop {
2379 let response = server_hs.process_client_hello(&client_hello, 0, client_ip);
2380 match response {
2381 HandshakeResponse::Retry(retry) => {
2382 let retry_bytes = borsh::to_vec(&retry).unwrap();
2383 server_transport.send_bytes(&retry_bytes).await.unwrap();
2384 // Receive retried client hello
2385 let next_bytes = server_transport.recv_bytes().await.unwrap();
2386 let next_hello = borsh::from_slice::<ClientHello>(&next_bytes).unwrap();
2387 let resp2 = server_hs.process_client_hello(&next_hello, 0, client_ip);
2388 match resp2 {
2389 HandshakeResponse::Success(server_hello, session, _) => {
2390 let server_hello_bytes = borsh::to_vec(&server_hello).unwrap();
2391 server_transport
2392 .send_bytes(&server_hello_bytes)
2393 .await
2394 .unwrap();
2395 break session;
2396 }
2397 _ => panic!("Expected success after retry"),
2398 }
2399 }
2400 HandshakeResponse::Success(server_hello, session, _) => {
2401 let server_hello_bytes = borsh::to_vec(&server_hello).unwrap();
2402 server_transport
2403 .send_bytes(&server_hello_bytes)
2404 .await
2405 .unwrap();
2406 break session;
2407 }
2408 HandshakeResponse::Reject(r) => panic!("unexpected reject: {:?}", r),
2409 HandshakeResponse::Fail(e) => panic!("handshake failed: {:?}", e),
2410 }
2411 };
2412
2413 let session_id = *server_session.id();
2414
2415 // 3. Receive the flushed early data — must be ENCRYPTED.
2416 let early_frame = server_transport.recv_bytes().await.unwrap();
2417 assert!(
2418 !early_frame
2419 .windows(b"early-data".len())
2420 .any(|w| w == b"early-data"),
2421 "encrypted frame must not contain plaintext early-data"
2422 );
2423 let early_plain = decrypt_incoming(&server_session, &early_frame);
2424 assert_eq!(early_plain, b"early-data");
2425
2426 // 4. Receive a post-handshake message — must be ENCRYPTED.
2427 let post_frame = server_transport.recv_bytes().await.unwrap();
2428 let post_plain = decrypt_incoming(&server_session, &post_frame);
2429 assert_eq!(post_plain, b"after-handshake");
2430
2431 // 5. Send encrypted reply back.
2432 let reply = encrypt_outgoing(&server_session, session_id, 1, 1, b"server-reply");
2433 server_transport.send_bytes(&reply).await.unwrap();
2434 });
2435
2436 // Wait for handshake to progress
2437 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2438
2439 // Should be connected now
2440 assert_eq!(session.connection_state(), ConnectionState::Connected);
2441
2442 // Send after handshake
2443 session.send(b"after-handshake".to_vec()).await.unwrap();
2444
2445 // Receive server reply — now returns DECRYPTED plaintext payload.
2446 let reply = session.recv().await.unwrap();
2447 assert_eq!(reply, b"server-reply");
2448
2449 server_handle.await.unwrap();
2450 session.disconnect().await.unwrap();
2451 }
2452
2453 /// Reliable delivery: a RELIABLE application send must survive a dropped data frame.
2454 ///
2455 /// The client runs over a `LossyTransport`; once the handshake completes we
2456 /// arm a drop of the next frame (the data frame) and send a reliable
2457 /// payload. The first transmission is lost, so the server only sees the
2458 /// payload because the raw-app stream buffers it and the data pump
2459 /// retransmits the timed-out segment.
2460 #[tokio::test]
2461 async fn reliable_send_survives_a_dropped_data_frame() {
2462 use crate::test_harness::fault_transport::{FaultControl, LossyTransport};
2463
2464 let (client_transport, server_transport) = ChannelTransport::pair();
2465 let faults = FaultControl::new();
2466 let lossy_client = LossyTransport::new(client_transport, faults.clone());
2467
2468 let server_hs = HandshakeServer::new().unwrap();
2469 let server_pinned_key = server_hs.verifying_key().clone();
2470
2471 let session = PhantomSession::connect_with_transport(
2472 "test-server:9000",
2473 lossy_client,
2474 server_pinned_key,
2475 );
2476
2477 let server_handle = tokio::spawn(async move {
2478 let client_ip = "127.0.0.1".parse().unwrap();
2479 let client_hello_bytes = server_transport.recv_bytes().await.unwrap();
2480 let client_hello = borsh::from_slice::<ClientHello>(&client_hello_bytes).unwrap();
2481
2482 // Drive the handshake to completion (may take one cookie/PoW retry).
2483 let server_session = loop {
2484 match server_hs.process_client_hello(&client_hello, 0, client_ip) {
2485 HandshakeResponse::Retry(retry) => {
2486 let retry_bytes = borsh::to_vec(&retry).unwrap();
2487 server_transport.send_bytes(&retry_bytes).await.unwrap();
2488 let next_bytes = server_transport.recv_bytes().await.unwrap();
2489 let next_hello = borsh::from_slice::<ClientHello>(&next_bytes).unwrap();
2490 match server_hs.process_client_hello(&next_hello, 0, client_ip) {
2491 HandshakeResponse::Success(server_hello, session, _) => {
2492 let b = borsh::to_vec(&server_hello).unwrap();
2493 server_transport.send_bytes(&b).await.unwrap();
2494 break session;
2495 }
2496 _ => panic!("expected success after retry"),
2497 }
2498 }
2499 HandshakeResponse::Success(server_hello, session, _) => {
2500 let b = borsh::to_vec(&server_hello).unwrap();
2501 server_transport.send_bytes(&b).await.unwrap();
2502 break session;
2503 }
2504 HandshakeResponse::Reject(r) => panic!("unexpected reject: {:?}", r),
2505 HandshakeResponse::Fail(e) => panic!("handshake failed: {:?}", e),
2506 }
2507 };
2508
2509 // The reliable data frame was dropped on first transmission; it can
2510 // only arrive via retransmission. Time-bounded so a missing
2511 // retransmit fails loudly instead of hanging the test forever.
2512 let data_frame = tokio::time::timeout(
2513 std::time::Duration::from_secs(3),
2514 server_transport.recv_bytes(),
2515 )
2516 .await
2517 .expect(
2518 "reliable payload never arrived within 3s — the dropped data frame was not \
2519 retransmitted (loss-recovery regression)",
2520 )
2521 .unwrap();
2522 let plain = decrypt_incoming(&server_session, &data_frame);
2523 assert_eq!(plain, b"reliable-payload");
2524 });
2525
2526 // Wait for the handshake to complete.
2527 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
2528 assert_eq!(session.connection_state(), ConnectionState::Connected);
2529
2530 // Arm a single drop, then send: the next frame on the wire (the data
2531 // frame) is silently lost.
2532 faults.arm_drop_next(1);
2533 session.send(b"reliable-payload".to_vec()).await.unwrap();
2534
2535 server_handle.await.unwrap();
2536 session.disconnect().await.unwrap();
2537 }
2538
2539 /// A retransmission (RTO expiry) must be reported to congestion control as
2540 /// a loss, driving BBR into FastRecovery — proves the drain → on_packet_lost
2541 /// wiring, not just that the retransmit happens.
2542 #[tokio::test]
2543 async fn drain_reports_a_retransmit_as_loss_to_bbr() {
2544 use crate::transport::bandwidth_estimator::BbrState;
2545
2546 tokio::time::pause();
2547 let sid = fixed_session_id();
2548 let (client, _server) = paired_sessions(sid);
2549
2550 let stream = Arc::new(TransportStream::new(1));
2551 stream.send_reliable(Bytes::from("payload")).await;
2552 let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
2553 streams.insert(1u32, stream);
2554
2555 let (client_t, _server_t) = ChannelTransport::pair();
2556 let transport = Arc::new(client_t);
2557
2558 // First drain: the initial transmission — not a loss.
2559 drain_streams_priority_ordered(&transport, &client, sid, &streams).await;
2560 assert_ne!(client.bbr_state(), BbrState::FastRecovery);
2561
2562 // The RTO expires; the next drain retransmits and must report the loss.
2563 tokio::time::advance(std::time::Duration::from_millis(1100)).await;
2564 drain_streams_priority_ordered(&transport, &client, sid, &streams).await;
2565 assert_eq!(
2566 client.bbr_state(),
2567 BbrState::FastRecovery,
2568 "a retransmit must be reported to BBR as a loss"
2569 );
2570 }
2571
2572 /// New data must not be transmitted while inflight already exceeds the
2573 /// congestion window — the drain holds it back until ACKs free the window.
2574 #[tokio::test]
2575 async fn drain_withholds_new_data_when_inflight_exceeds_cwnd() {
2576 let sid = fixed_session_id();
2577 let (client, _server) = paired_sessions(sid);
2578
2579 // Drive inflight far above any plausible initial cwnd, so the window
2580 // has no room for new data.
2581 client.on_packet_sent(100_000_000);
2582 let inflight_before = client.bandwidth_snapshot().inflight_bytes;
2583
2584 let stream = Arc::new(TransportStream::new(1));
2585 stream.send_reliable(Bytes::from("new-data")).await;
2586 let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
2587 streams.insert(1u32, stream);
2588
2589 let (client_t, _server_t) = ChannelTransport::pair();
2590 let transport = Arc::new(client_t);
2591
2592 drain_streams_priority_ordered(&transport, &client, sid, &streams).await;
2593
2594 // No new segment was transmitted — inflight is unchanged (a send would
2595 // have grown it via on_packet_sent).
2596 assert_eq!(
2597 client.bandwidth_snapshot().inflight_bytes,
2598 inflight_before,
2599 "no new data should be sent when inflight >= cwnd"
2600 );
2601 }
2602
2603 // ────────────────────────────────────────────────────────────────────
2604 // V2 wire-routing tests (Phase 4.2 / 2.5 follow-up — data-pump V2)
2605 // ────────────────────────────────────────────────────────────────────
2606
2607 use crate::transport::multiplexer::StreamDemultiplexer;
2608 use crate::transport::session::Session as InnerSession;
2609 use crate::transport::stream::Stream as TransportStream;
2610
2611 /// Build two `InnerSession` instances that share a 32-byte secret —
2612 /// one as the "client" (peer_side=false), one as the "server"
2613 /// (peer_side=true). Mirrors the role split after a real handshake.
2614 fn paired_sessions(session_id: SessionId) -> (Arc<InnerSession>, Arc<InnerSession>) {
2615 let secret = [0x11u8; 32];
2616 let client = Arc::new(InnerSession::new(session_id, &secret, false).unwrap());
2617 let server = Arc::new(InnerSession::new(session_id, &secret, true).unwrap());
2618 (client, server)
2619 }
2620
2621 fn fixed_session_id() -> SessionId {
2622 SessionId::from_bytes([0x88; 32])
2623 }
2624
2625 /// Encrypt a V2 application-data packet from the client side at
2626 /// `stream_id` / `sequence`. The returned bytes are wire-serialised
2627 /// ([`PhantomPacket::to_wire`]) and ready to feed into `handle_packet`.
2628 fn build_app_frame(
2629 client_session: &InnerSession,
2630 session_id: SessionId,
2631 stream_id: TransportStreamId,
2632 sequence: u32,
2633 payload: &[u8],
2634 ) -> Vec<u8> {
2635 let flag_bits = PacketFlags::RELIABLE | PacketFlags::ENCRYPTED;
2636 let header =
2637 PacketHeader::new(session_id, stream_id, sequence, PacketFlags::new(flag_bits))
2638 .with_epoch(client_session.current_epoch());
2639 let ciphertext = client_session
2640 .encrypt_packet(&header, payload)
2641 .expect("encrypt_packet");
2642 let packet = PhantomPacket::new(header, ciphertext);
2643 packet.to_wire()
2644 }
2645
2646 #[tokio::test]
2647 async fn v2_recv_routes_encrypted_app_data_through_recv_channel() {
2648 let session_id = fixed_session_id();
2649 let (client_session, server_session) = paired_sessions(session_id);
2650
2651 // Encrypt a V2 application-data packet on the client side.
2652 let stream_id: TransportStreamId = 1;
2653 let frame = build_app_frame(&client_session, session_id, stream_id, 0, b"hello-v2");
2654
2655 // Receive on the server side: deserialize then drive
2656 // handle_packet, which is the recv-path entry point.
2657 let v2 = PhantomPacket::from_wire(&frame).unwrap();
2658
2659 let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
2660 let demux = Arc::new(demux);
2661 let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
2662 let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
2663 let undelivered = AtomicU64::new(0);
2664 let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(4);
2665 let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
2666 tx: ack_a,
2667 rx: Mutex::new(ack_b),
2668 });
2669
2670 let mut ack_buf = Vec::with_capacity(256);
2671 let mut path_validation_seq: u32 = 0;
2672 let obs = Observability::new(ObservabilityConfig::default());
2673 handle_packet(
2674 v2,
2675 session_id,
2676 &server_session,
2677 &streams,
2678 &demux,
2679 &transport_send,
2680 &transport_send,
2681 &deliver_tx,
2682 &undelivered,
2683 &mut ack_buf,
2684 &mut path_validation_seq,
2685 &obs,
2686 LegType::Tcp,
2687 )
2688 .await;
2689
2690 // The decrypted plaintext must have been handed to the delivery task,
2691 // tagged with its stream id, and counted toward the undelivered backlog.
2692 let (sid, received) = deliver_rx.recv().await.expect("delivery hand-off");
2693 assert_eq!(sid, stream_id as u32);
2694 assert_eq!(&received[..], b"hello-v2");
2695 assert_eq!(
2696 undelivered.load(Ordering::Acquire),
2697 b"hello-v2".len() as u64
2698 );
2699 }
2700
2701 /// Like [`build_app_frame`] but stamps a caller-chosen `path_id` so the
2702 /// receive-side path gate (PATH-001) can be exercised.
2703 fn build_app_frame_on_path(
2704 client_session: &InnerSession,
2705 session_id: SessionId,
2706 stream_id: TransportStreamId,
2707 sequence: u32,
2708 path_id: u8,
2709 payload: &[u8],
2710 ) -> Vec<u8> {
2711 let flag_bits = PacketFlags::RELIABLE | PacketFlags::ENCRYPTED;
2712 let header =
2713 PacketHeader::new(session_id, stream_id, sequence, PacketFlags::new(flag_bits))
2714 .with_epoch(client_session.current_epoch())
2715 .with_path_id(path_id);
2716 let ciphertext = client_session
2717 .encrypt_packet(&header, payload)
2718 .expect("encrypt_packet");
2719 PhantomPacket::new(header, ciphertext).to_wire()
2720 }
2721
2722 #[tokio::test]
2723 async fn app_data_on_non_validated_path_is_dropped() {
2724 // PATH-001 (Invariant 6): application data is delivered only on a
2725 // Validated path. Path 0 is pre-validated; any other path_id must
2726 // complete a PATH_VALIDATION challenge first. A frame on an unvalidated
2727 // path is dropped (even though it decrypts cleanly) and the path is
2728 // registered Unvalidated so a later challenge can promote it.
2729 use crate::transport::path::PathStateKind;
2730 let session_id = fixed_session_id();
2731 let (client_session, server_session) = paired_sessions(session_id);
2732 let stream_id: TransportStreamId = 1;
2733
2734 let bad = build_app_frame_on_path(
2735 &client_session,
2736 session_id,
2737 stream_id,
2738 0,
2739 7, // never validated on the receiver
2740 b"on-bad-path",
2741 );
2742 let bad = PhantomPacket::from_wire(&bad).unwrap();
2743
2744 let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
2745 let demux = Arc::new(demux);
2746 let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
2747 let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
2748 let undelivered = AtomicU64::new(0);
2749 let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(4);
2750 let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
2751 tx: ack_a,
2752 rx: Mutex::new(ack_b),
2753 });
2754 let mut ack_buf = Vec::with_capacity(256);
2755 let mut path_validation_seq: u32 = 0;
2756 let obs = Observability::new(ObservabilityConfig::default());
2757
2758 handle_packet(
2759 bad,
2760 session_id,
2761 &server_session,
2762 &streams,
2763 &demux,
2764 &transport_send,
2765 &transport_send,
2766 &deliver_tx,
2767 &undelivered,
2768 &mut ack_buf,
2769 &mut path_validation_seq,
2770 &obs,
2771 LegType::Tcp,
2772 )
2773 .await;
2774
2775 assert!(
2776 deliver_rx.try_recv().is_err(),
2777 "application data on a non-validated path must be dropped"
2778 );
2779 assert_eq!(
2780 undelivered.load(Ordering::Acquire),
2781 0,
2782 "dropped data must not count toward the backlog"
2783 );
2784 assert_eq!(
2785 server_session.path_state(7),
2786 Some(PathStateKind::Unvalidated),
2787 "the unseen path id must be registered for a later challenge"
2788 );
2789
2790 // Positive control: the SAME stream on the pre-validated path 0 IS
2791 // delivered, proving the gate only blocks non-validated paths.
2792 let good = build_app_frame_on_path(
2793 &client_session,
2794 session_id,
2795 stream_id,
2796 1,
2797 0,
2798 b"on-good-path",
2799 );
2800 let good = PhantomPacket::from_wire(&good).unwrap();
2801 handle_packet(
2802 good,
2803 session_id,
2804 &server_session,
2805 &streams,
2806 &demux,
2807 &transport_send,
2808 &transport_send,
2809 &deliver_tx,
2810 &undelivered,
2811 &mut ack_buf,
2812 &mut path_validation_seq,
2813 &obs,
2814 LegType::Tcp,
2815 )
2816 .await;
2817 let (sid, received) = deliver_rx.recv().await.expect("path-0 delivery");
2818 assert_eq!(sid, stream_id as u32);
2819 assert_eq!(&received[..], b"on-good-path");
2820 }
2821
2822 /// Build an `ENCRYPTED | ACK` frame (H1) from `acker_session` acknowledging
2823 /// `acked_seq` on `stream_id`, with its own header sequence `ack_header_seq`
2824 /// (drawn from the acker's send space, distinct from the acked data
2825 /// sequence). Wire-serialised, ready for `handle_packet`.
2826 fn build_encrypted_ack(
2827 acker_session: &InnerSession,
2828 session_id: SessionId,
2829 stream_id: TransportStreamId,
2830 ack_header_seq: u32,
2831 acked_seq: u32,
2832 ) -> Vec<u8> {
2833 let flag_bits = PacketFlags::ENCRYPTED | PacketFlags::ACK;
2834 let header = PacketHeader::new(
2835 session_id,
2836 stream_id,
2837 ack_header_seq,
2838 PacketFlags::new(flag_bits),
2839 )
2840 .with_epoch(acker_session.current_epoch());
2841 let ct = acker_session
2842 .encrypt_packet(&header, &acked_seq.to_be_bytes())
2843 .expect("encrypt ack");
2844 PhantomPacket::new(header, ct).to_wire()
2845 }
2846
2847 /// Drive a single inbound packet through `handle_packet` against
2848 /// `server_session` with throwaway delivery/transport/observability wiring.
2849 async fn run_recv(
2850 pkt: PhantomPacket,
2851 session_id: SessionId,
2852 server_session: &Arc<InnerSession>,
2853 streams: &Arc<DashMap<u32, Arc<TransportStream>>>,
2854 ) {
2855 let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
2856 let demux = Arc::new(demux);
2857 let (deliver_tx, _deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
2858 let undelivered = AtomicU64::new(0);
2859 let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(4);
2860 let transport: Arc<ChannelTransport> = Arc::new(ChannelTransport {
2861 tx: ack_a,
2862 rx: Mutex::new(ack_b),
2863 });
2864 let mut ack_buf = Vec::with_capacity(64);
2865 let mut path_validation_seq: u32 = 0;
2866 let obs = Observability::new(ObservabilityConfig::default());
2867 handle_packet(
2868 pkt,
2869 session_id,
2870 server_session,
2871 streams,
2872 &demux,
2873 &transport,
2874 &transport,
2875 &deliver_tx,
2876 &undelivered,
2877 &mut ack_buf,
2878 &mut path_validation_seq,
2879 &obs,
2880 LegType::Tcp,
2881 )
2882 .await;
2883 }
2884
2885 /// Stage a stream with one in-flight reliable segment; returns the stream,
2886 /// the shared streams map, and the segment's sequence number.
2887 async fn staged_pending_segment() -> (
2888 Arc<TransportStream>,
2889 Arc<DashMap<u32, Arc<TransportStream>>>,
2890 u32,
2891 ) {
2892 let stream_id: TransportStreamId = 1;
2893 let stream = Arc::new(TransportStream::new(stream_id));
2894 let seq = stream
2895 .send_reliable(Bytes::from_static(b"reliable-payload"))
2896 .await;
2897 let _ = stream.poll_send(u64::MAX).await.expect("segment in-flight");
2898 let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
2899 streams.insert(stream_id as u32, stream.clone());
2900 (stream, streams, seq)
2901 }
2902
2903 /// **H1 (Invariant 2).** A forged *unauthenticated* ACK — whether bare
2904 /// (`ACK` flag, empty payload) or carrying a plaintext 4-byte acked-seq —
2905 /// must NOT retire a pending reliable segment. Pre-fix, the ACK branch ran
2906 /// before the AEAD gate and trusted `header.sequence`, so an off-path
2907 /// attacker could silently drop never-acknowledged segments.
2908 #[tokio::test]
2909 async fn forged_plaintext_ack_does_not_retire_pending_segment() {
2910 let session_id = fixed_session_id();
2911 let (_client, server_session) = paired_sessions(session_id);
2912 let (stream, streams, seq) = staged_pending_segment().await;
2913 let stream_id: TransportStreamId = 1;
2914
2915 // Variant 1: bare ACK, no ENCRYPTED, empty payload, guessed sequence.
2916 run_recv(
2917 PhantomPacket::new(
2918 PacketHeader::new(
2919 session_id,
2920 stream_id,
2921 seq,
2922 PacketFlags::new(PacketFlags::ACK),
2923 ),
2924 Vec::new(),
2925 ),
2926 session_id,
2927 &server_session,
2928 &streams,
2929 )
2930 .await;
2931 // Variant 2: ACK with a plaintext 4-byte acked-seq, no ENCRYPTED.
2932 run_recv(
2933 PhantomPacket::new(
2934 PacketHeader::new(
2935 session_id,
2936 stream_id,
2937 999,
2938 PacketFlags::new(PacketFlags::ACK),
2939 ),
2940 seq.to_be_bytes().to_vec(),
2941 ),
2942 session_id,
2943 &server_session,
2944 &streams,
2945 )
2946 .await;
2947
2948 assert!(
2949 stream.ack(seq).await.is_some(),
2950 "a forged unauthenticated ACK must not retire the pending reliable segment"
2951 );
2952 }
2953
2954 /// **H1 positive control.** A genuine `ENCRYPTED | ACK` frame from the peer,
2955 /// whose AEAD payload carries the acked data sequence, retires the matching
2956 /// pending segment after AEAD verify. The ACK's own `header.sequence`
2957 /// (`ack_header_seq`) is deliberately different from the acked sequence to
2958 /// prove the handler reads the authenticated payload, not the header.
2959 #[tokio::test]
2960 async fn authenticated_ack_retires_pending_segment() {
2961 let session_id = fixed_session_id();
2962 let (client_session, server_session) = paired_sessions(session_id);
2963 let (stream, streams, seq) = staged_pending_segment().await;
2964 let stream_id: TransportStreamId = 1;
2965
2966 let ack_header_seq = seq.wrapping_add(54_321);
2967 let frame =
2968 build_encrypted_ack(&client_session, session_id, stream_id, ack_header_seq, seq);
2969 let ack_pkt = PhantomPacket::from_wire(&frame).expect("parse ack");
2970 run_recv(ack_pkt, session_id, &server_session, &streams).await;
2971
2972 assert!(
2973 stream.ack(seq).await.is_none(),
2974 "an authenticated ACK must retire the acked pending segment"
2975 );
2976 }
2977
2978 /// **H1 session binding.** A frame whose `header.session_id` does not match
2979 /// the negotiated session must be dropped by the per-frame guard before any
2980 /// state mutation — pre-fix the ACK was processed with no session check.
2981 #[tokio::test]
2982 async fn ack_with_wrong_session_id_is_dropped() {
2983 let session_id = fixed_session_id();
2984 let (_client, server_session) = paired_sessions(session_id);
2985 let (stream, streams, seq) = staged_pending_segment().await;
2986 let stream_id: TransportStreamId = 1;
2987
2988 let wrong_id = SessionId::from_bytes([0x11; 32]);
2989 run_recv(
2990 PhantomPacket::new(
2991 PacketHeader::new(wrong_id, stream_id, seq, PacketFlags::new(PacketFlags::ACK)),
2992 Vec::new(),
2993 ),
2994 session_id,
2995 &server_session,
2996 &streams,
2997 )
2998 .await;
2999
3000 assert!(
3001 stream.ack(seq).await.is_some(),
3002 "an ACK for a different session id must not retire the segment"
3003 );
3004 }
3005
3006 #[tokio::test]
3007 async fn v2_recv_drops_unencrypted_non_empty_post_handshake_payload() {
3008 // Downgrade defense: a V2 application-data packet WITHOUT the
3009 // ENCRYPTED flag but with a non-empty plaintext-looking payload
3010 // must be dropped, mirroring the V1 invariant.
3011 let session_id = fixed_session_id();
3012 let (_, server_session) = paired_sessions(session_id);
3013
3014 let stream_id: TransportStreamId = 2;
3015 let bad_header = PacketHeader::new(
3016 session_id,
3017 stream_id,
3018 0,
3019 PacketFlags::new(PacketFlags::RELIABLE), // no ENCRYPTED
3020 );
3021 let bad_packet = PhantomPacket::new(bad_header, b"leaked-cleartext".to_vec());
3022
3023 let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
3024 let demux = Arc::new(demux);
3025 let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3026 let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
3027 let undelivered = AtomicU64::new(0);
3028 let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(4);
3029 let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3030 tx: ack_a,
3031 rx: Mutex::new(ack_b),
3032 });
3033
3034 let mut ack_buf = Vec::with_capacity(256);
3035 let mut path_validation_seq: u32 = 0;
3036 let obs = Observability::new(ObservabilityConfig::default());
3037 handle_packet(
3038 bad_packet,
3039 session_id,
3040 &server_session,
3041 &streams,
3042 &demux,
3043 &transport_send,
3044 &transport_send,
3045 &deliver_tx,
3046 &undelivered,
3047 &mut ack_buf,
3048 &mut path_validation_seq,
3049 &obs,
3050 LegType::Tcp,
3051 )
3052 .await;
3053
3054 // Nothing should have been handed to the delivery task, and the backlog
3055 // counter must stay at zero (the packet was dropped before hand-off).
3056 assert!(
3057 deliver_rx.try_recv().is_err(),
3058 "unencrypted post-handshake payload must NOT be handed off for delivery"
3059 );
3060 assert_eq!(undelivered.load(Ordering::Acquire), 0);
3061 }
3062
3063 #[tokio::test]
3064 async fn v2_recv_handles_coalesced_bundle_and_routes_each_subpayload() {
3065 use crate::transport::packet_coalescer::{CoalescerConfig, PacketCoalescer};
3066
3067 let session_id = fixed_session_id();
3068 let (client_session, server_session) = paired_sessions(session_id);
3069
3070 // Build a COALESCED bundle of three sub-payloads.
3071 let mut coalescer = PacketCoalescer::new(CoalescerConfig::default());
3072 coalescer.push(b"alpha");
3073 coalescer.push(b"bravo");
3074 coalescer.push(b"charlie");
3075 let bundle = coalescer.flush().expect("bundle");
3076
3077 // Encrypt the bundle and wrap it in a V2 packet with
3078 // ENCRYPTED + COALESCED flags.
3079 let stream_id: TransportStreamId = 3;
3080 let flag_bits = PacketFlags::ENCRYPTED | PacketFlags::COALESCED;
3081 let header = PacketHeader::new(session_id, stream_id, 0, PacketFlags::new(flag_bits))
3082 .with_epoch(client_session.current_epoch());
3083 let ciphertext = client_session
3084 .encrypt_packet(&header, &bundle)
3085 .expect("encrypt bundle");
3086 let v2 = PhantomPacket::new(header, ciphertext);
3087
3088 let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
3089 let demux = Arc::new(demux);
3090 let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3091 let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
3092 let undelivered = AtomicU64::new(0);
3093 let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(4);
3094 let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3095 tx: ack_a,
3096 rx: Mutex::new(ack_b),
3097 });
3098
3099 let mut ack_buf = Vec::with_capacity(256);
3100 let mut path_validation_seq: u32 = 0;
3101 let obs = Observability::new(ObservabilityConfig::default());
3102 handle_packet(
3103 v2,
3104 session_id,
3105 &server_session,
3106 &streams,
3107 &demux,
3108 &transport_send,
3109 &transport_send,
3110 &deliver_tx,
3111 &undelivered,
3112 &mut ack_buf,
3113 &mut path_validation_seq,
3114 &obs,
3115 LegType::Tcp,
3116 )
3117 .await;
3118
3119 // Each sub-payload is handed off IN ORDER through the single FIFO
3120 // delivery channel, every one tagged with the outer stream id, and the
3121 // total counted toward the undelivered backlog.
3122 let (sa, a) = deliver_rx.recv().await.expect("alpha");
3123 let (sb, b) = deliver_rx.recv().await.expect("bravo");
3124 let (sc, c) = deliver_rx.recv().await.expect("charlie");
3125 assert_eq!(
3126 (sa, sb, sc),
3127 (stream_id as u32, stream_id as u32, stream_id as u32)
3128 );
3129 assert_eq!(&a[..], b"alpha");
3130 assert_eq!(&b[..], b"bravo");
3131 assert_eq!(&c[..], b"charlie");
3132 assert_eq!(undelivered.load(Ordering::Acquire), (5 + 5 + 7) as u64);
3133 }
3134
3135 /// Ordering across a COALESCED bundle followed by a normal frame: the single
3136 /// FIFO delivery channel must hand the bundle's `[A, B, C]` AND the later
3137 /// `D` to the consumer in exactly `A, B, C, D` — decoupling delivery from
3138 /// the reader must not reorder application bytes.
3139 #[tokio::test]
3140 async fn delivery_preserves_order_across_coalesced_then_normal_frame() {
3141 use crate::transport::packet_coalescer::{CoalescerConfig, PacketCoalescer};
3142
3143 let session_id = fixed_session_id();
3144 let (client_session, server_session) = paired_sessions(session_id);
3145 let stream_id: TransportStreamId = 1;
3146
3147 // Frame 1: COALESCED [A, B, C] at sequence 0.
3148 let mut coalescer = PacketCoalescer::new(CoalescerConfig::default());
3149 coalescer.push(b"A");
3150 coalescer.push(b"B");
3151 coalescer.push(b"C");
3152 let bundle = coalescer.flush().expect("bundle");
3153 let flag_bits = PacketFlags::ENCRYPTED | PacketFlags::COALESCED;
3154 let h1 = PacketHeader::new(session_id, stream_id, 0, PacketFlags::new(flag_bits))
3155 .with_epoch(client_session.current_epoch());
3156 let ct1 = client_session
3157 .encrypt_packet(&h1, &bundle)
3158 .expect("encrypt bundle");
3159 let coalesced = PhantomPacket::new(h1, ct1);
3160
3161 // Frame 2: a normal RELIABLE "D" at sequence 1.
3162 let d_wire = build_app_frame(&client_session, session_id, stream_id, 1, b"D");
3163 let normal = PhantomPacket::from_wire(&d_wire).unwrap();
3164
3165 let (demux, _ctrl) = StreamDemultiplexer::new(16);
3166 let demux = Arc::new(demux);
3167 let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3168 let (deliver_tx, mut deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
3169 let undelivered = AtomicU64::new(0);
3170 let (ack_a, ack_b) = mpsc::channel::<Vec<u8>>(8);
3171 let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3172 tx: ack_a,
3173 rx: Mutex::new(ack_b),
3174 });
3175 let mut ack_buf = Vec::with_capacity(256);
3176 let mut pv_seq: u32 = 0;
3177 let obs = Observability::new(ObservabilityConfig::default());
3178
3179 for pkt in [coalesced, normal] {
3180 handle_packet(
3181 pkt,
3182 session_id,
3183 &server_session,
3184 &streams,
3185 &demux,
3186 &transport_send,
3187 &transport_send,
3188 &deliver_tx,
3189 &undelivered,
3190 &mut ack_buf,
3191 &mut pv_seq,
3192 &obs,
3193 LegType::Tcp,
3194 )
3195 .await;
3196 }
3197
3198 // Drain the FIFO delivery channel — order must be exactly A, B, C, D.
3199 let mut got: Vec<Bytes> = Vec::new();
3200 while let Ok((_sid, b)) = deliver_rx.try_recv() {
3201 got.push(b);
3202 }
3203 let seen: Vec<&[u8]> = got.iter().map(|b| &b[..]).collect();
3204 assert_eq!(seen, vec![&b"A"[..], b"B", b"C", b"D"]);
3205 }
3206
3207 /// A peer that ignores flow control and floods application data faster than
3208 /// the app drains must NOT grow the receive backlog without bound: once the
3209 /// undelivered backlog crosses the reader's hard cap, the session is torn
3210 /// down (state → `Closed`) instead of buffering unboundedly. The app here
3211 /// never calls `recv()`, so the delivery channel fills and the reader's
3212 /// pre-decrypt cap gate fires.
3213 #[tokio::test]
3214 async fn peer_ignoring_flow_control_trips_delivery_hard_cap_and_closes_session() {
3215 let session_id = fixed_session_id();
3216 let (client_inner, server_inner) = paired_sessions(session_id);
3217 let (client_t, server_t) = ChannelTransport::pair();
3218 let client_t = Arc::new(client_t);
3219
3220 // Full server-side session with a running pump; the app NEVER drains it.
3221 let server = PhantomSession::from_accepted_server_session(
3222 "flooder".to_string(),
3223 server_t,
3224 server_inner,
3225 );
3226
3227 // Drain and discard everything the server sends back (ACKs / control)
3228 // so the server reader never blocks on the back channel — a real
3229 // flooding peer likewise keeps emptying its socket. Without this the
3230 // reader would wedge on its own ACK send and the cap could never trip.
3231 let drain_t = client_t.clone();
3232 let drainer = tokio::spawn(async move { while drain_t.recv_bytes().await.is_ok() {} });
3233
3234 // Malicious client: flood valid RELIABLE app packets with unique
3235 // monotonic sequences (so none are replay-dropped) and never honor a
3236 // WINDOW_UPDATE — i.e. ignore flow control entirely.
3237 let payload = vec![0xABu8; 64 * 1024];
3238 let mut seq: u32 = 0;
3239 let mut torn_down = false;
3240 for _ in 0..4000 {
3241 if server.connection_state() == ConnectionState::Closed {
3242 torn_down = true;
3243 break;
3244 }
3245 let flag_bits = PacketFlags::RELIABLE | PacketFlags::ENCRYPTED;
3246 let header = PacketHeader::new(session_id, 1, seq, PacketFlags::new(flag_bits))
3247 .with_epoch(client_inner.current_epoch());
3248 let ct = client_inner
3249 .encrypt_packet(&header, &payload)
3250 .expect("encrypt");
3251 // Bound the send so a torn-down (or wedged) transport can't hang the
3252 // test: a closed channel or a stalled reader both mean the flood is
3253 // no longer absorbed — i.e. the session is being torn down.
3254 let wire = PhantomPacket::new(header, ct).to_wire();
3255 match tokio::time::timeout(
3256 std::time::Duration::from_secs(5),
3257 client_t.send_bytes(&wire),
3258 )
3259 .await
3260 {
3261 Ok(Ok(())) => {}
3262 _ => {
3263 torn_down = true;
3264 break;
3265 }
3266 }
3267 seq = seq.wrapping_add(1);
3268 tokio::task::yield_now().await;
3269 }
3270 assert!(
3271 torn_down,
3272 "a peer flooding past the delivery hard cap must get its session torn down"
3273 );
3274
3275 // Definitive: the session ends up Closed.
3276 let mut closed = false;
3277 for _ in 0..200 {
3278 if server.connection_state() == ConnectionState::Closed {
3279 closed = true;
3280 break;
3281 }
3282 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
3283 }
3284 drainer.abort();
3285 assert!(
3286 closed,
3287 "session state must be Closed after the hard cap trips"
3288 );
3289 }
3290
3291 /// Phase 4.4 — BBR ACK feedback drives the pacer rate. Build a
3292 /// realistic DeliverySample with known sent_at/acked_at timestamps
3293 /// and packet size; assert that calling `on_packet_acked` causes
3294 /// the pacer to leave its default unlimited state with a finite
3295 /// finite positive rate.
3296 #[tokio::test]
3297 async fn bbr_on_ack_drives_pacer_rate() {
3298 use crate::transport::bandwidth_estimator::DeliverySample;
3299 use std::time::{Duration, Instant};
3300
3301 let session_id = fixed_session_id();
3302 let (client_session, _server_session) = paired_sessions(session_id);
3303
3304 // The default Pacer is `unlimited` — track it before/after.
3305 assert!(!client_session.pacer().is_enabled());
3306
3307 // Simulate sending a 1500-byte packet, then receiving an ACK
3308 // 20 ms later. We feed a few samples in a row so the EMA
3309 // estimator has data to work with.
3310 let now = Instant::now();
3311 for i in 0..16 {
3312 let sent_at = now - Duration::from_millis(20 + i * 5);
3313 let acked_at = now - Duration::from_millis(i * 5);
3314 let sample = DeliverySample {
3315 delivered_bytes: 0,
3316 sent_at,
3317 acked_at,
3318 packet_bytes: 1500,
3319 is_app_limited: false,
3320 ack_delay_us: 100,
3321 };
3322 client_session.on_packet_sent(1500);
3323 let _ = client_session.on_packet_acked(sample);
3324 }
3325
3326 // The pacer should now be set to a real rate (still
3327 // "unlimited" handle, but with a finite stored rate). The
3328 // BandwidthEstimator's `pacing_rate()` is what gets pushed
3329 // into the pacer; assert it is non-zero and finite.
3330 let snap = client_session.bandwidth_snapshot();
3331 assert!(
3332 snap.pacing_rate_bps > 0,
3333 "expected pacing_rate to be non-zero, got {}",
3334 snap.pacing_rate_bps,
3335 );
3336 // The pacer's stored rate must match the estimator's view
3337 // (Session.on_packet_acked mirrors them).
3338 assert_eq!(client_session.pacer().rate(), snap.pacing_rate_bps);
3339 }
3340
3341 /// Phase 4.3 — WINDOW_UPDATE round-trip under the relative-credit model.
3342 /// The receive **delivery** task credits the flow-control window on real
3343 /// app consumption and stages the credit; the **send loop** flushes it as a
3344 /// single encrypted WINDOW_UPDATE via `flush_pending_window_updates`. The
3345 /// sender then ADDS the relative credit to its `peer_send_window` — it does
3346 /// not overwrite it with an absolute value.
3347 #[tokio::test]
3348 async fn flow_control_window_update_round_trip() {
3349 use crate::transport::stream::INITIAL_STREAM_WINDOW;
3350
3351 let session_id = fixed_session_id();
3352 let (client_session, server_session) = paired_sessions(session_id);
3353
3354 let stream_id: TransportStreamId = 9;
3355 let server_streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3356 let server_stream = Arc::new(TransportStream::new(stream_id));
3357 server_streams.insert(stream_id as u32, server_stream.clone());
3358
3359 // Client also has a Stream so we can apply the inbound credit.
3360 let client_stream = Arc::new(TransportStream::new(stream_id));
3361
3362 // Pre-drain the client's peer_send_window so the credit has a real
3363 // effect to assert against.
3364 let drain = INITIAL_STREAM_WINDOW - 1000;
3365 assert!(client_stream.try_consume_send_window(drain));
3366 assert_eq!(client_stream.peer_send_window(), 1000);
3367
3368 // The delivery task credits the window on real consumption: model one
3369 // drain that crosses the half-window threshold and stage the credit
3370 // exactly as `run_data_pump`'s delivery task does.
3371 let consumed = INITIAL_STREAM_WINDOW / 2 + 1;
3372 let credit = server_stream
3373 .record_app_consumed(consumed)
3374 .expect("threshold crossed → credit granted");
3375 server_stream.stage_window_update_credit(credit);
3376
3377 // The send loop flushes the staged credit as a single WINDOW_UPDATE.
3378 let (out_tx, mut out_rx) = mpsc::channel::<Vec<u8>>(4);
3379 let (back_tx, back_rx) = mpsc::channel::<Vec<u8>>(4);
3380 let server_outbound: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3381 tx: out_tx,
3382 rx: Mutex::new(back_rx),
3383 });
3384 let _keep = back_tx;
3385 flush_pending_window_updates(
3386 &server_outbound,
3387 &server_session,
3388 session_id,
3389 &server_streams,
3390 )
3391 .await;
3392
3393 // Exactly one WINDOW_UPDATE was emitted; decrypt it and read the credit.
3394 let frame = tokio::time::timeout(std::time::Duration::from_millis(100), out_rx.recv())
3395 .await
3396 .expect("expected a WINDOW_UPDATE frame")
3397 .expect("channel open");
3398 let pv2 = PhantomPacket::from_wire(&frame).unwrap();
3399 assert!(pv2.header.flags.contains(PacketFlags::WINDOW_UPDATE));
3400 // The control frame's sequence comes from the stream's own send space —
3401 // distinct from any data packet so the AEAD nonce never repeats.
3402 let pt = client_session
3403 .decrypt_packet(&pv2.header, &pv2.payload)
3404 .expect("decrypt WINDOW_UPDATE");
3405 assert_eq!(pt.len(), 4);
3406 let announced = u32::from_be_bytes([pt[0], pt[1], pt[2], pt[3]]);
3407 assert_eq!(
3408 announced, credit,
3409 "WINDOW_UPDATE carries the relative credit (bytes consumed since last update)"
3410 );
3411 // Exactly one frame was emitted — nothing else is queued on the wire.
3412 assert!(
3413 out_rx.try_recv().is_err(),
3414 "exactly one WINDOW_UPDATE must be emitted"
3415 );
3416
3417 // The staged slot is now empty — a second flush emits nothing.
3418 flush_pending_window_updates(
3419 &server_outbound,
3420 &server_session,
3421 session_id,
3422 &server_streams,
3423 )
3424 .await;
3425 assert!(
3426 out_rx.try_recv().is_err(),
3427 "no spurious second WINDOW_UPDATE after the credit was already flushed"
3428 );
3429
3430 // Apply the relative credit on the client side: peer_send_window ADDS it
3431 // to the current 1000 (it does not jump to an absolute value).
3432 client_stream.apply_peer_window_update(announced);
3433 assert_eq!(client_stream.peer_send_window(), 1000 + credit);
3434 }
3435
3436 /// Phase 4.3 — priority scheduler ordering. Two streams enqueue
3437 /// data simultaneously; the higher-priority one must be drained
3438 /// first, all of its data before any of the lower one's.
3439 #[tokio::test]
3440 async fn priority_scheduler_drains_higher_priority_stream_first() {
3441 // Build a real Session (any crypto state — we only inspect
3442 // send order, not ciphertext) and an Arc<Stream> per stream.
3443 let session_id = fixed_session_id();
3444 let (client_session, _server_session) = paired_sessions(session_id);
3445
3446 // Capture every outbound packet by stuffing into a channel-
3447 // backed transport whose tx end we can drain after.
3448 let (tx_a, mut rx_a) = mpsc::channel::<Vec<u8>>(32);
3449 let (tx_b, rx_b) = mpsc::channel::<Vec<u8>>(32);
3450 let transport: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3451 tx: tx_a,
3452 rx: Mutex::new(rx_b),
3453 });
3454 let _keep = tx_b; // keep the recv side alive
3455
3456 let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3457
3458 // Stream 11: low priority (1), 3 reliable chunks.
3459 let low = Arc::new(TransportStream::new(11));
3460 low.set_priority(1);
3461 low.send_reliable(Bytes::from_static(b"L0")).await;
3462 low.send_reliable(Bytes::from_static(b"L1")).await;
3463 low.send_reliable(Bytes::from_static(b"L2")).await;
3464 streams.insert(11, low);
3465
3466 // Stream 22: HIGH priority (100), 3 reliable chunks.
3467 let hi = Arc::new(TransportStream::new(22));
3468 hi.set_priority(100);
3469 hi.send_reliable(Bytes::from_static(b"H0")).await;
3470 hi.send_reliable(Bytes::from_static(b"H1")).await;
3471 hi.send_reliable(Bytes::from_static(b"H2")).await;
3472 streams.insert(22, hi);
3473
3474 drain_streams_priority_ordered(&transport, &client_session, session_id, &streams).await;
3475
3476 // Pull all packets off the channel and verify their order:
3477 // the three H* chunks must come before any L* chunk.
3478 let mut order: Vec<&'static str> = Vec::new();
3479 while let Ok(frame) =
3480 tokio::time::timeout(std::time::Duration::from_millis(50), rx_a.recv()).await
3481 {
3482 let bytes = match frame {
3483 Some(b) => b,
3484 None => break,
3485 };
3486 let v2 = PhantomPacket::from_wire(&bytes).unwrap();
3487 // Decrypt under the SERVER role so the per-direction key
3488 // matches the client-side encrypt.
3489 let plaintext = _server_session
3490 .decrypt_packet(&v2.header, &v2.payload)
3491 .expect("decrypt");
3492 let tag: &'static str = match &plaintext[..] {
3493 b"H0" => "H0",
3494 b"H1" => "H1",
3495 b"H2" => "H2",
3496 b"L0" => "L0",
3497 b"L1" => "L1",
3498 b"L2" => "L2",
3499 other => panic!("unexpected payload {:?}", other),
3500 };
3501 order.push(tag);
3502 }
3503
3504 // All H* before any L*.
3505 let first_low = order
3506 .iter()
3507 .position(|s| s.starts_with('L'))
3508 .unwrap_or(order.len());
3509 let last_high = order.iter().rposition(|s| s.starts_with('H')).unwrap();
3510 assert!(
3511 last_high < first_low,
3512 "strict priority violated: order = {:?}",
3513 order
3514 );
3515 }
3516
3517 #[tokio::test]
3518 async fn v2_recv_echoes_path_validation_challenge_back_as_response() {
3519 // Two paired sessions on different IDs (so neither has a
3520 // pending challenge for the path). The "responder" sees a
3521 // PATH_VALIDATION packet on a new path id and must echo the
3522 // 32-byte payload back via the transport.
3523 let session_id = fixed_session_id();
3524 let (client_session, server_session) = paired_sessions(session_id);
3525
3526 // Build a PATH_VALIDATION packet with ENCRYPTED + path_id=7.
3527 let path_id: u8 = 7;
3528 let payload = [0xDEu8; crate::transport::path::PATH_CHALLENGE_LEN];
3529 let flag_bits = PacketFlags::ENCRYPTED | PacketFlags::PATH_VALIDATION;
3530 let header = PacketHeader::new(session_id, 0, 0, PacketFlags::new(flag_bits))
3531 .with_epoch(client_session.current_epoch())
3532 .with_path_id(path_id);
3533 let ciphertext = client_session
3534 .encrypt_packet(&header, &payload)
3535 .expect("encrypt challenge");
3536 let v2 = PhantomPacket::new(header, ciphertext);
3537
3538 let (demux, _ctrl_rx) = StreamDemultiplexer::new(16);
3539 let demux = Arc::new(demux);
3540 let streams: Arc<DashMap<u32, Arc<TransportStream>>> = Arc::new(DashMap::new());
3541 let (deliver_tx, _deliver_rx) = mpsc::unbounded_channel::<(u32, Bytes)>();
3542 let undelivered = AtomicU64::new(0);
3543 // Server's outbound transport — captures the echo back.
3544 let (echo_tx, mut echo_rx) = mpsc::channel::<Vec<u8>>(4);
3545 let (back_tx, back_rx) = mpsc::channel::<Vec<u8>>(4);
3546 let transport_send: Arc<ChannelTransport> = Arc::new(ChannelTransport {
3547 tx: echo_tx,
3548 rx: Mutex::new(back_rx),
3549 });
3550 let _back_tx_keepalive = back_tx; // keep the recv side alive
3551
3552 let mut ack_buf = Vec::with_capacity(256);
3553 let mut path_validation_seq: u32 = 100;
3554 let obs = Observability::new(ObservabilityConfig::default());
3555
3556 handle_packet(
3557 v2,
3558 session_id,
3559 &server_session,
3560 &streams,
3561 &demux,
3562 &transport_send,
3563 &transport_send,
3564 &deliver_tx,
3565 &undelivered,
3566 &mut ack_buf,
3567 &mut path_validation_seq,
3568 &obs,
3569 LegType::Tcp,
3570 )
3571 .await;
3572
3573 // Server should have emitted a PATH_VALIDATION response on the
3574 // outbound transport. Pull it out and verify it carries the
3575 // same payload back.
3576 let echo_bytes =
3577 tokio::time::timeout(std::time::Duration::from_millis(200), echo_rx.recv())
3578 .await
3579 .expect("echo should arrive")
3580 .expect("channel open");
3581
3582 // Decrypt the echo on the original (client) side — server-side
3583 // ciphertext authenticates the round-trip.
3584 let echo_v2 = PhantomPacket::from_wire(&echo_bytes).unwrap();
3585 assert!(echo_v2.header.flags.contains(PacketFlags::PATH_VALIDATION));
3586 assert_eq!(echo_v2.header.path_id, path_id);
3587
3588 // Sequence space advanced by exactly one (we sent one echo).
3589 assert_eq!(path_validation_seq, 101);
3590 }
3591
3592 // ────────────────────────────────────────────────────────────────────
3593 // 0-RTT early-data
3594 // ────────────────────────────────────────────────────────────────────
3595
3596 /// Full 0-RTT round-trip over `ChannelTransport`: a priming handshake
3597 /// populates the server cache and yields a resumption hint; a second
3598 /// connect via `connect_with_resumption` carries application early-data
3599 /// sealed inside the resuming ClientHello, which the server decrypts and
3600 /// surfaces. The client learns the verdict via `early_data_accepted()`.
3601 ///
3602 /// The server side runs inline (not a spawned task) so its
3603 /// `ChannelTransport` halves stay alive in scope — dropping them
3604 /// would close the client's data pump and flip the session to
3605 /// `Closed` before the assertions run.
3606 #[tokio::test]
3607 async fn zero_rtt_early_data_full_round_trip() {
3608 // One HandshakeServer shared across both phases so its session
3609 // cache persists between the priming handshake and the resume.
3610 let server_hs = HandshakeServer::new().unwrap();
3611 let server_pinned_key = server_hs.verifying_key().clone();
3612 let client_ip: std::net::IpAddr = "127.0.0.1".parse().unwrap();
3613
3614 // ── Step 1: prime — a normal handshake fills the cache ──
3615 let (c1, s1) = ChannelTransport::pair();
3616 let phase1_session =
3617 PhantomSession::connect_with_transport("test:9000", c1, server_pinned_key.clone());
3618
3619 let hello_bytes = s1.recv_bytes().await.unwrap();
3620 let ch = borsh::from_slice::<ClientHello>(&hello_bytes).unwrap();
3621 let retry = match server_hs.process_client_hello(&ch, 0, client_ip) {
3622 HandshakeResponse::Retry(r) => r,
3623 _ => panic!("expected Retry"),
3624 };
3625 s1.send_bytes(&borsh::to_vec(&retry).unwrap())
3626 .await
3627 .unwrap();
3628 let next = s1.recv_bytes().await.unwrap();
3629 let ch2 = borsh::from_slice::<ClientHello>(&next).unwrap();
3630 match server_hs.process_client_hello(&ch2, 0, client_ip) {
3631 HandshakeResponse::Success(sh, _session, _) => {
3632 s1.send_bytes(&borsh::to_vec(&sh).unwrap()).await.unwrap();
3633 }
3634 _ => panic!("expected Success"),
3635 }
3636
3637 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3638 assert_eq!(
3639 phase1_session.connection_state(),
3640 ConnectionState::Connected
3641 );
3642 let hint = phase1_session
3643 .resumption_hint()
3644 .await
3645 .expect("phase 1 produced a resumption hint");
3646 // The Rust-only `connect_with_resumption` takes the raw tuple;
3647 // `resumption_hint()` now yields the UniFFI `ResumptionHint`
3648 // record, so rebuild the tuple from its 32-byte fields.
3649 let hint = (
3650 <[u8; 32]>::try_from(hint.session_id.as_slice()).expect("session_id is 32 bytes"),
3651 <[u8; 32]>::try_from(hint.resumption_secret.as_slice())
3652 .expect("resumption_secret is 32 bytes"),
3653 );
3654
3655 // ── Step 2: resume — the ClientHello carries sealed early-data ──
3656 let early_payload = b"zero-rtt application bytes".to_vec();
3657 let (c2, s2) = ChannelTransport::pair();
3658 let phase2_session = PhantomSession::connect_with_resumption(
3659 "test:9000",
3660 c2,
3661 server_pinned_key.clone(),
3662 hint,
3663 early_payload.clone(),
3664 )
3665 .expect("early_data is within the size cap");
3666
3667 let hello_bytes = s2.recv_bytes().await.unwrap();
3668 let ch3 = borsh::from_slice::<ClientHello>(&hello_bytes).unwrap();
3669 assert!(
3670 ch3.early_data.is_some(),
3671 "phase 2 hello carries sealed 0-RTT early-data"
3672 );
3673 match server_hs.process_client_hello(&ch3, 0, client_ip) {
3674 HandshakeResponse::Success(sh, _session, early_data) => {
3675 // The server decrypted exactly what the client sealed.
3676 assert_eq!(early_data.as_deref(), Some(&early_payload[..]));
3677 assert!(sh.early_data_accepted);
3678 s2.send_bytes(&borsh::to_vec(&sh).unwrap()).await.unwrap();
3679 }
3680 _ => {
3681 panic!("expected Success with accepted early-data — the resumption ticket is fresh")
3682 }
3683 }
3684
3685 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
3686 assert_eq!(
3687 phase2_session.connection_state(),
3688 ConnectionState::Connected
3689 );
3690 assert_eq!(
3691 phase2_session.early_data_accepted().await,
3692 Some(true),
3693 "client must see the server accepted its 0-RTT early-data"
3694 );
3695
3696 // Keep the server transports alive until every assertion has
3697 // run — see the doc comment above.
3698 drop((s1, s2));
3699 }
3700
3701 /// `connect_pinned_with_resumption` validates the `ResumptionHint`
3702 /// field lengths *before* opening any socket — a hint whose
3703 /// `session_id` or `resumption_secret` is not exactly 32 bytes is a
3704 /// caller bug and surfaces as `ValidationError`, never a network
3705 /// round-trip.
3706 #[tokio::test]
3707 async fn connect_pinned_with_resumption_rejects_malformed_hint() {
3708 let server_hs = HandshakeServer::new().unwrap();
3709 let pinned = server_hs.verifying_key().to_bytes();
3710
3711 let bad_hint = ResumptionHint {
3712 session_id: vec![0u8; 5], // not 32 bytes
3713 resumption_secret: vec![0u8; 32],
3714 };
3715
3716 let err = connect_pinned_with_resumption(
3717 "127.0.0.1".to_string(),
3718 9,
3719 pinned,
3720 bad_hint,
3721 Vec::new(),
3722 )
3723 .await
3724 .expect_err("a 5-byte session_id must be rejected");
3725
3726 assert!(
3727 matches!(err, CoreError::ValidationError(_)),
3728 "expected ValidationError, got {err:?}"
3729 );
3730 }
3731}