Skip to main content

phantom_protocol/transport/
session.rs

1//! Phantom Transport - Session Management
2//!
3//! Virtual association that persists across IP changes.
4//! Manages streams, encryption state, and multi-path scheduling.
5
6use crate::crypto::adaptive_crypto::{CryptoSession, AEAD_MAX_INVOCATIONS};
7use crate::errors::CoreError;
8use crate::security::ReplayWindow;
9use crate::transport::{
10    bandwidth_estimator::{BandwidthEstimator, DeliverySample},
11    fallback::FallbackStateMachine,
12    pacer::Pacer,
13    path::{PathRegistry, PathStateKind, PATH_CHALLENGE_LEN},
14    scheduler::Scheduler,
15    stream::Stream,
16    types::{
17        ControlMessage, PacketFlags, PacketHeader, PhantomPacket, SchedulerMode, SessionId,
18        StreamId,
19    },
20};
21
22use arc_swap::ArcSwap;
23use dashmap::DashMap;
24use parking_lot::{Mutex, RwLock};
25use std::collections::HashMap;
26use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29use zeroize::{Zeroize, ZeroizeOnDrop};
30
/// Lifecycle of a session, from key agreement through teardown.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionState {
    /// Key agreement still in progress; application data cannot flow yet.
    Handshaking,
    /// Keys are established and application data may flow.
    Connected,
    /// The association is moving to a new IP address.
    Migrating,
    /// A graceful shutdown has been started.
    Closing,
    /// Terminal state: the session has been closed.
    Closed,
}
45
46/// Soft high-watermark for automatic mid-session rekey (C1). Once a direction's
47/// AEAD invocation count crosses this, the data pump rotates to a fresh key
48/// *before* the hard [`AEAD_MAX_INVOCATIONS`] ceiling (Invariant 8) so a
49/// long-lived session ratchets keys instead of failing with `NonceExhausted`.
50///
51/// Default is half the ceiling — `2^47` invocations — which leaves a `2^47`
52/// headroom for in-flight packets from the old epoch. It is far larger than any
53/// realistic session lifetime, so production sessions essentially never hit it;
54/// it is a correctness backstop. Tests lower it via
55/// [`Session::set_rekey_threshold`] to exercise the path.
56pub const REKEY_SOFT_LIMIT: u64 = AEAD_MAX_INVOCATIONS / 2;
57
/// Upper bound on the number of epochs the receive path will ratchet forward
/// for a single packet when accepting an authenticated forward rekey (C1).
///
/// Each catch-up step is an HKDF trial that commits nothing unless the AEAD
/// tag verifies. Keeping the bound small limits the work a spoofed packet can
/// force, while still absorbing the small epoch drift that appears when the
/// two directions rekey at slightly different cadences.
///
/// Packets claiming a larger gap are rejected. Over a reliable transport the
/// sender retransmits at its then-current epoch, so nothing is lost. With the
/// production `REKEY_SOFT_LIMIT` of `2^47`, the observed gap is essentially
/// always 0 or 1.
pub const MAX_REKEY_CATCHUP: u8 = 0x10;
68
/// Per-stream sequence span, within one epoch, that forces a mid-session
/// rekey (C1).
///
/// The AEAD nonce is built from `(epoch, stream_id, sequence, path_id)`, and
/// `sequence` is a per-stream `u32` that wraps at `2^32`. One busy stream
/// could wrap, and so reuse a nonce under an unchanged key (the Forbidden
/// Attack on AES-GCM), long before the direction-wide [`REKEY_SOFT_LIMIT`]
/// (`2^47`) fires.
///
/// Therefore, as soon as any stream's sequence has advanced this far inside
/// the current epoch, the send path forces a rekey. The epoch bump gives every
/// later packet a fresh nonce prefix, so no stream can cover the whole `2^32`
/// sequence space within one epoch. Choosing `2^31` keeps another `2^31` of
/// headroom before the wrap for reordered or in-flight packets from the old
/// epoch. Tests lower it through [`Session::set_seq_rekey_watermark`].
pub const SEQ_REKEY_WATERMARK: u32 = 0x8000_0000;
81
/// Crypto state for session encryption.
///
/// Rekeying never mutates an existing instance. Each rekey builds a fresh
/// `CryptoState` through `CryptoState::new` from the next traffic secret and
/// swaps it in whole.
///
/// On drop, `session_key` is zeroed by the `ZeroizeOnDrop` derive. The wrapped
/// [`CryptoSession`] is excluded from that derive (`#[zeroize(skip)]`). Its AEAD
/// keys live in ring's opaque `LessSafeKey`, which cannot be zeroed directly.
/// We rely on the OS reclaiming memory and on the `Arc<CryptoSessionInner>`
/// going out of scope alongside this struct.
// NOTE(review): field declaration order is also drop order — keep it stable.
#[derive(ZeroizeOnDrop)]
pub struct CryptoState {
    /// Bidirectional crypto session (AEAD state for both directions).
    /// Skipped by the zeroize derive; see the struct-level note.
    #[zeroize(skip)]
    pub session: CryptoSession,
    /// Auxiliary 32-byte key for additional derivations. `CryptoState::new`
    /// computes it as `HKDF-Expand(shared_secret, "phantom-transport-key", 32)`.
    /// Zeroed on drop.
    pub session_key: [u8; 32],
}
96
97impl CryptoState {
98    /// Create new crypto state from shared secret
99    pub fn new(shared_secret: &[u8; 32], peer_side: bool) -> Result<Self, CoreError> {
100        let session = if peer_side {
101            CryptoSession::from_shared_secret_peer(shared_secret)?
102        } else {
103            CryptoSession::from_shared_secret(shared_secret)?
104        };
105
106        // Derive additional session keys using HKDF
107        let hk = hkdf::Hkdf::<sha2::Sha256>::from_prk(shared_secret)
108            .map_err(|_| CoreError::CryptoError("HKDF PRK failed".into()))?;
109
110        let mut key_bytes = [0u8; 32];
111        hk.expand(b"phantom-transport-key", &mut key_bytes)
112            .map_err(|_| CoreError::KeyDerivationError)?;
113
114        Ok(Self {
115            session,
116            session_key: key_bytes,
117        })
118    }
119
120    /// Encrypt with a caller-supplied 12-byte nonce. Used by
121    /// `Session::encrypt_packet`, which constructs the nonce from the
122    /// authenticated `(epoch, stream_id, sequence, path_id)` of the packet
123    /// header — so the receiver survives failed decrypts without desyncing.
124    pub fn encrypt_with_nonce(
125        &self,
126        nonce: [u8; 12],
127        aad: &[u8],
128        plaintext: &[u8],
129    ) -> Result<Vec<u8>, CoreError> {
130        self.session
131            .encrypt_with_nonce(nonce, aad, plaintext)
132            .map_err(|e| CoreError::CryptoError(e.to_string()))
133    }
134
135    /// V2-path decrypt: caller supplies the 12-byte nonce explicitly.
136    pub fn decrypt_with_nonce(
137        &self,
138        nonce: [u8; 12],
139        aad: &[u8],
140        ciphertext: &[u8],
141    ) -> Result<Vec<u8>, CoreError> {
142        self.session
143            .decrypt_with_nonce(nonce, aad, ciphertext)
144            .map_err(|e| CoreError::CryptoError(e.to_string()))
145    }
146
147    /// Borrow the 4-byte nonce prefix derived at session establishment.
148    pub fn nonce_prefix(&self) -> [u8; 4] {
149        self.session.nonce_prefix()
150    }
151
152    /// Per-direction send-side AEAD invocation count for this epoch. Resets to
153    /// 0 on rekey (a fresh `CryptoState` is installed). Drives the C1
154    /// automatic-rekey trigger.
155    pub fn send_invocations(&self) -> u64 {
156        self.session.send_invocations()
157    }
158}
159
/// Session - virtual association between two endpoints.
///
/// Owns:
/// - the live AEAD state and the rekey chain (traffic secret + epoch);
/// - the stream table and per-stream replay windows;
/// - path-validation state;
/// - the pacing and bandwidth-estimation hooks consulted by the data pump.
// NOTE(review): field declaration order is also drop order — keep it stable.
pub struct Session {
    /// Unique session identifier (256-bit)
    id: SessionId,
    /// Current lifecycle state. `set_state` replaces it wholesale and does not
    /// validate transitions.
    state: RwLock<SessionState>,
    /// Active `CryptoState`, wrapped in `ArcSwap` so an epoch transition can
    /// swap it in lock-free (Phase 1.5 + Phase 2.7).
    ///
    /// Encrypt/decrypt callsites do `self.crypto.load()`, an atomic pointer
    /// load plus a deref to the inner `CryptoState`, so no lock is taken per
    /// packet. Installing a new epoch is a single `store()` of a freshly derived
    /// `Arc<CryptoState>` (done in `commit_forward_crypto`).
    crypto: ArcSwap<CryptoState>,
    /// Current traffic secret.
    ///
    /// This is a single 32-byte secret; `CryptoState::new` derives both
    /// directions' keys from it. Its initial value is the secret the session
    /// was constructed from:
    /// - the shared secret in `new`;
    /// - the resumption secret in `resume`;
    /// - the caller-supplied master in `from_derived`.
    ///
    /// Each epoch advance replaces it with
    /// `HKDF-Expand(current, "phantom-rekey-v1", 32)` (Phase 1.5), zeroing the
    /// old value first.
    traffic_secret: RwLock<[u8; 32]>,
    /// Rekey generation counter. Starts at 0 at session establishment; each
    /// committed epoch transition advances it (saturating, never wrapping). It
    /// is wire-emitted in `PacketHeader.epoch` so the peer can match the right
    /// key.
    epoch: AtomicU8,
    /// Send-side AEAD-invocation high-watermark that triggers an automatic
    /// mid-session rekey (C1). Defaults to [`REKEY_SOFT_LIMIT`]; tests/embedders
    /// lower it via [`set_rekey_threshold`](Self::set_rekey_threshold).
    rekey_after: AtomicU64,
    /// Per-stream sequence high-watermark that forces a rekey for AEAD nonce
    /// uniqueness (C1). Defaults to [`SEQ_REKEY_WATERMARK`] (`2^31`); tests lower
    /// it via [`set_seq_rekey_watermark`](Self::set_seq_rekey_watermark).
    seq_rekey_watermark: AtomicU32,
    /// Per-stream `(epoch, base_sequence)` checkpoint bounding how far a stream's
    /// sequence may advance within one epoch (C1).
    ///
    /// `base_sequence` is the stream's sequence when it entered the current
    /// epoch. The send path forces a rekey once `sequence - base_sequence`
    /// crosses [`seq_rekey_watermark`](Self::seq_rekey_watermark). The
    /// checkpoint is rebased lazily, per stream, on the first send after an
    /// epoch change.
    seq_epoch_base: DashMap<StreamId, (u8, u32)>,
    /// Serialises every epoch transition (C1).
    ///
    /// The data pump runs the send loop and the receive task concurrently over
    /// one `Arc<Session>`, so a send-side `rekey()` can race a receive-side
    /// ratchet. Both hold this mutex across their derive→install→epoch-bump, so
    /// the installed key depth and the epoch counter never diverge. Without it,
    /// a divergence would wedge the session.
    rekey_lock: Mutex<()>,
    /// Which side of the handshake we are. Carried into every
    /// `CryptoState::new(...)` re-derivation so the per-direction keys are
    /// laid out the same way they were at session establishment.
    is_server: bool,
    /// Active streams, keyed by id. `open_stream` inserts; `close_stream` removes.
    streams: RwLock<HashMap<StreamId, Arc<Stream>>>,
    /// Next stream ID counter (starts at 1; bumped with `fetch_add` per open).
    next_stream_id: AtomicU32,
    /// Control sequence number (starts at 0).
    control_sequence: AtomicU32,
    /// Multi-path scheduler
    scheduler: Arc<Scheduler>,
    /// Resumption secret for 0-RTT. `Some` only for sessions built via `resume`.
    resumption_secret: RwLock<Option<[u8; 32]>>,
    /// Last activity timestamp (initialised to construction time).
    last_activity: RwLock<Instant>,
    /// Fallback state machine
    // NOTE(review): no method in this section reads `fallback`; the allow
    // presumably silences the dead-code lint until it is wired in — confirm.
    #[allow(dead_code)]
    fallback: Arc<FallbackStateMachine>,
    /// Per-stream sliding-window replay protection. Lazily populated as
    /// streams appear on the wire. Sits alongside (not in place of) the AEAD
    /// strict-counter replay protection — see `decrypt_packet`.
    replay_windows: DashMap<StreamId, Mutex<ReplayWindow>>,
    /// Cumulative count of replay rejections (across all streams) — exposed
    /// for metrics/telemetry.
    replay_rejected_total: AtomicU64,
    /// Per-path validation state (Phase 4.2).
    ///
    /// Each `path_id` referenced in a `PacketHeader.path_id` must transit
    /// through `Unvalidated → Validating → Validated` (via a challenge-response
    /// round trip) before the data pump treats it as authoritative.
    /// Constructors pre-populate an entry for `path_id = 0` (the implicit
    /// single path) in the `Validated` state, so legacy single-leg sessions
    /// keep working without any explicit setup.
    path_registry: Arc<PathRegistry>,
    /// Outbound rate-limiter (Phase 2.6).
    ///
    /// Defaults to [`Pacer::unlimited`], so the historical no-pacing behavior
    /// is unchanged unless the caller explicitly sets a rate via
    /// [`Session::pacer`]. The data pump consults this before every outbound
    /// packet. The existing implementation just calls `try_consume` and falls
    /// through if the pacer is disabled, so the integration is zero-overhead in
    /// the default configuration.
    pacer: Arc<Pacer>,
    /// BBR-style bandwidth + RTT estimator (Phase 2.6 / Phase 4.4
    /// foundation). The data pump feeds it via [`Session::on_packet_sent`]
    /// and [`Session::on_packet_acked`]; the resulting `pacing_rate()`
    /// feeds back into the `pacer` to close the loop.
    bandwidth_estimator: parking_lot::Mutex<BandwidthEstimator>,
    /// Outbound-ready signal (Phase 2.4).
    ///
    /// Streams or the application can `notify_one()` this to wake the data
    /// pump immediately, instead of waiting for the next 10 ms `poll_interval`
    /// tick. The pump keeps the tick as a retransmit-timer fallback.
    send_notify: Arc<tokio::sync::Notify>,
}
256
257impl Session {
258    /// Create a new session with given shared secret
259    pub fn new(
260        session_id: SessionId,
261        shared_secret: &[u8; 32],
262        peer_side: bool,
263    ) -> Result<Self, CoreError> {
264        let crypto = CryptoState::new(shared_secret, peer_side)?;
265        let path_registry = Arc::new(PathRegistry::new());
266        // Pre-register `path_id = 0` as the implicit default path — the
267        // handshake itself proved reachability over this path, so no
268        // additional PATH_CHALLENGE is needed (Phase 4.2).
269        path_registry.register_validated(0);
270
271        Ok(Self {
272            id: session_id,
273            state: RwLock::new(SessionState::Handshaking),
274            crypto: ArcSwap::new(Arc::new(crypto)),
275            traffic_secret: RwLock::new(*shared_secret),
276            epoch: AtomicU8::new(0),
277            rekey_after: AtomicU64::new(REKEY_SOFT_LIMIT),
278            seq_rekey_watermark: AtomicU32::new(SEQ_REKEY_WATERMARK),
279            seq_epoch_base: DashMap::new(),
280            rekey_lock: Mutex::new(()),
281            is_server: peer_side,
282            streams: RwLock::new(HashMap::new()),
283            next_stream_id: AtomicU32::new(1),
284            control_sequence: AtomicU32::new(0),
285            scheduler: Arc::new(Scheduler::new(SchedulerMode::LowLatency)),
286            resumption_secret: RwLock::new(None),
287            last_activity: RwLock::new(Instant::now()),
288            fallback: Arc::new(FallbackStateMachine::with_defaults()),
289            replay_windows: DashMap::new(),
290            replay_rejected_total: AtomicU64::new(0),
291            path_registry,
292            pacer: Arc::new(Pacer::unlimited()),
293            bandwidth_estimator: parking_lot::Mutex::new(BandwidthEstimator::new()),
294            send_notify: Arc::new(tokio::sync::Notify::new()),
295        })
296    }
297
298    /// Create session from a pre-derived crypto state (e.g., after handshake).
299    ///
300    /// `traffic_secret` is the master from which the supplied `crypto` was
301    /// derived — it seeds the [`rekey`](Self::rekey) HKDF chain. `is_server`
302    /// records which side of the handshake we are; rekey re-derives keys
303    /// with the same side so per-direction layout is preserved.
304    pub fn from_derived(
305        session_id: SessionId,
306        crypto: CryptoState,
307        scheduler_mode: SchedulerMode,
308        traffic_secret: [u8; 32],
309        is_server: bool,
310    ) -> Self {
311        let path_registry = Arc::new(PathRegistry::new());
312        path_registry.register_validated(0);
313        Self {
314            id: session_id,
315            state: RwLock::new(SessionState::Connected),
316            crypto: ArcSwap::new(Arc::new(crypto)),
317            traffic_secret: RwLock::new(traffic_secret),
318            epoch: AtomicU8::new(0),
319            rekey_after: AtomicU64::new(REKEY_SOFT_LIMIT),
320            seq_rekey_watermark: AtomicU32::new(SEQ_REKEY_WATERMARK),
321            seq_epoch_base: DashMap::new(),
322            rekey_lock: Mutex::new(()),
323            is_server,
324            streams: RwLock::new(HashMap::new()),
325            next_stream_id: AtomicU32::new(1),
326            control_sequence: AtomicU32::new(0),
327            scheduler: Arc::new(Scheduler::new(scheduler_mode)),
328            resumption_secret: RwLock::new(None),
329            last_activity: RwLock::new(Instant::now()),
330            fallback: Arc::new(FallbackStateMachine::with_defaults()),
331            replay_windows: DashMap::new(),
332            replay_rejected_total: AtomicU64::new(0),
333            path_registry,
334            pacer: Arc::new(Pacer::unlimited()),
335            bandwidth_estimator: parking_lot::Mutex::new(BandwidthEstimator::new()),
336            send_notify: Arc::new(tokio::sync::Notify::new()),
337        }
338    }
339
340    /// Resume a session using resumption secret (0-RTT)
341    pub fn resume(
342        session_id: SessionId,
343        resumption_secret: &[u8; 32],
344        peer_side: bool,
345    ) -> Result<Self, CoreError> {
346        let crypto = CryptoState::new(resumption_secret, peer_side)?;
347        let path_registry = Arc::new(PathRegistry::new());
348        path_registry.register_validated(0);
349
350        Ok(Self {
351            id: session_id,
352            state: RwLock::new(SessionState::Connected),
353            crypto: ArcSwap::new(Arc::new(crypto)),
354            traffic_secret: RwLock::new(*resumption_secret),
355            epoch: AtomicU8::new(0),
356            rekey_after: AtomicU64::new(REKEY_SOFT_LIMIT),
357            seq_rekey_watermark: AtomicU32::new(SEQ_REKEY_WATERMARK),
358            seq_epoch_base: DashMap::new(),
359            rekey_lock: Mutex::new(()),
360            is_server: peer_side,
361            streams: RwLock::new(HashMap::new()),
362            next_stream_id: AtomicU32::new(1),
363            control_sequence: AtomicU32::new(0),
364            scheduler: Arc::new(Scheduler::new(SchedulerMode::LowLatency)),
365            resumption_secret: RwLock::new(Some(*resumption_secret)),
366            last_activity: RwLock::new(Instant::now()),
367            fallback: Arc::new(FallbackStateMachine::with_defaults()),
368            replay_windows: DashMap::new(),
369            replay_rejected_total: AtomicU64::new(0),
370            path_registry,
371            pacer: Arc::new(Pacer::unlimited()),
372            bandwidth_estimator: parking_lot::Mutex::new(BandwidthEstimator::new()),
373            send_notify: Arc::new(tokio::sync::Notify::new()),
374        })
375    }
376
377    /// Get session ID
378    pub fn id(&self) -> &SessionId {
379        &self.id
380    }
381
382    /// Get current state
383    pub fn state(&self) -> SessionState {
384        *self.state.read()
385    }
386
387    /// Transition to a new state
388    pub fn set_state(&self, new_state: SessionState) {
389        *self.state.write() = new_state;
390    }
391
392    /// Open a new stream
393    pub fn open_stream(&self) -> Arc<Stream> {
394        let stream_id = self.next_stream_id.fetch_add(1, Ordering::SeqCst) as StreamId;
395        let stream = Arc::new(Stream::new(stream_id));
396
397        self.streams.write().insert(stream_id, stream.clone());
398        stream
399    }
400
401    /// Get an existing stream
402    pub fn get_stream(&self, stream_id: StreamId) -> Option<Arc<Stream>> {
403        self.streams.read().get(&stream_id).cloned()
404    }
405
406    /// Close a stream
407    pub fn close_stream(&self, stream_id: StreamId) -> bool {
408        self.streams.write().remove(&stream_id).is_some()
409    }
410
411    /// Get number of active streams
412    pub fn stream_count(&self) -> u32 {
413        self.streams.read().len() as u32
414    }
415
416    /// Total number of replayed packets rejected by the sliding-window check
417    /// across all streams in this session. Intended for the
418    /// `replay_rejected_total` metric.
419    pub fn replay_rejected_total(&self) -> u64 {
420        self.replay_rejected_total.load(Ordering::Relaxed)
421    }
422
423    /// Current rekey generation (Phase 1.5). Starts at 0; each successful
424    /// [`rekey`](Self::rekey) increments by one. Carried on the wire in
425    /// `PacketHeader.epoch` so the peer can match the right derived key.
426    pub fn current_epoch(&self) -> u8 {
427        self.epoch.load(Ordering::Relaxed)
428    }
429
430    /// Whether this session is acting as the server side. Determined at
431    /// construction; required for re-deriving per-direction keys on rekey.
432    pub fn is_server(&self) -> bool {
433        self.is_server
434    }
435
436    /// Mid-session key rotation (Phase 1.5).
437    ///
438    /// Derives the next traffic secret from the current one via
439    /// `HKDF-Expand(current, "phantom-rekey-v1", 32)` and builds a fresh
440    /// [`CryptoState`] under that secret. The new state is installed via
441    /// an atomic `ArcSwap::store`, so concurrent encrypt/decrypt calls
442    /// observe either the old or the new state — never a partially-written
443    /// in-between. The previous traffic secret is explicitly zeroed before
444    /// being overwritten.
445    ///
446    /// Returns the new epoch (1, 2, 3, ...). Wraps an error if the epoch
447    /// counter has saturated `u8::MAX` (after 255 successful rekeys —
448    /// equivalent to ~5 days at the default 30-minute cadence; long-lived
449    /// sessions are expected to reconnect rather than wrap).
450    ///
451    /// Wire signalling: callers that want the peer to follow this rekey
452    /// emit a V2 packet whose header carries the new epoch (and optionally
453    /// the `PacketFlags::REKEY` flag). Receivers respond by calling
454    /// `rekey()` themselves once they see the bump — keeping both ends in
455    /// lockstep.
456    #[tracing::instrument(name = "phantom.session.rekey", skip_all)]
457    pub fn rekey(&self) -> Result<u8, CoreError> {
458        // Serialise the whole transition (C1): the send loop and the receive
459        // task share this `Session`, so derive+install+epoch-bump must be atomic
460        // w.r.t. a concurrent receive-side ratchet, or the installed key depth
461        // and the epoch counter diverge and wedge the session.
462        let _rekey = self.rekey_lock.lock();
463        let current_epoch = self.epoch.load(Ordering::Relaxed);
464        if current_epoch == u8::MAX {
465            return Err(CoreError::CryptoError(
466                "session epoch saturated (u8::MAX); reconnect required".into(),
467            ));
468        }
469        let (next_secret, new_crypto) = self.derive_forward_crypto(1)?;
470        self.commit_forward_crypto(1, next_secret, new_crypto);
471        Ok(current_epoch + 1)
472    }
473
474    /// Derive the next epoch's traffic secret + [`CryptoState`] from the current
475    /// secret WITHOUT installing them. The HKDF chain step is
476    /// `HKDF-Expand(current, "phantom-rekey-v1", 32)` (Invariant 5 — the label is
477    /// load-bearing; it must match the committing path in `rekey`). Returns the
478    /// derived secret and a fresh per-direction AEAD state under it.
479    ///
480    /// This is the non-committing half used by the receive path to verify a
481    /// claimed-next-epoch packet (trial decrypt) before trusting the epoch bump,
482    /// so a forged, unauthenticated `header.epoch` cannot desync the session.
483    ///
484    /// `steps` ≥ 1 applies the chain that many times (the receive path may need
485    /// to catch up several epochs when both directions rekey at slightly
486    /// different cadences). Intermediate secrets are zeroed as the walk
487    /// proceeds; only the final-epoch secret is returned for the caller to
488    /// commit.
489    fn derive_forward_crypto(&self, steps: u8) -> Result<([u8; 32], CryptoState), CoreError> {
490        use zeroize::Zeroizing;
491        debug_assert!(steps >= 1, "derive_forward_crypto needs at least one step");
492        // `Zeroizing` so every intermediate secret — and the working copy of the
493        // current secret — is wiped on *every* exit path, including the early
494        // `?` returns (an attacker can force this derivation merely by setting
495        // `header.epoch`, so the candidate is genuinely sensitive).
496        let mut secret: Zeroizing<[u8; 32]> = Zeroizing::new(*self.traffic_secret.read());
497        for _ in 0..steps {
498            let mut next: Zeroizing<[u8; 32]> = Zeroizing::new([0u8; 32]);
499            let hk = hkdf::Hkdf::<sha2::Sha256>::from_prk(&*secret)
500                .map_err(|_| CoreError::KeyDerivationError)?;
501            // Invariant 5 — the `phantom-rekey-v1` label is load-bearing and must
502            // match the committing path in `rekey`.
503            hk.expand(b"phantom-rekey-v1", &mut *next)
504                .map_err(|_| CoreError::KeyDerivationError)?;
505            secret = next; // previous-step secret drops → zeroed
506        }
507        let new_crypto = CryptoState::new(&secret, self.is_server)?;
508        // Copy the bytes out for the caller; the `Zeroizing` working copy is
509        // wiped when it drops here. The caller is responsible for the returned
510        // secret (committed into `traffic_secret`, or zeroed on a failed trial).
511        Ok((*secret, new_crypto))
512    }
513
514    /// Install a [`derive_forward_crypto`](Self::derive_forward_crypto)d epoch:
515    /// swap in the new `CryptoState` via the lock-free `ArcSwap`, zero+replace
516    /// the traffic secret under the write lock, and saturatingly advance the
517    /// epoch by `steps` (Invariant 5 — epoch never wraps). The caller MUST have
518    /// authenticated the transition (a successful trial decrypt, or its own
519    /// send-side rekey) — this routine verifies nothing itself.
520    fn commit_forward_crypto(&self, steps: u8, final_secret: [u8; 32], new_crypto: CryptoState) {
521        let mut current = self.traffic_secret.write();
522        // Install the new AEAD state, then the new epoch (SeqCst) so the wire
523        // header the send path stamps matches the key it encrypts under.
524        self.crypto.store(Arc::new(new_crypto));
525        // Zero the old secret before overwriting it so the previous-epoch key
526        // material does not survive in memory.
527        current.zeroize();
528        *current = final_secret;
529        let cur = self.epoch.load(Ordering::Relaxed);
530        self.epoch
531            .store(cur.saturating_add(steps), Ordering::SeqCst);
532    }
533
534    /// Send-side AEAD invocation count for the current epoch (resets to 0 on
535    /// each rekey). Drives [`send_needs_rekey`](Self::send_needs_rekey).
536    pub fn send_invocations(&self) -> u64 {
537        self.crypto.load().send_invocations()
538    }
539
540    /// The send-invocation high-watermark at which the pump auto-rekeys.
541    pub fn rekey_threshold(&self) -> u64 {
542        self.rekey_after.load(Ordering::Relaxed)
543    }
544
545    /// Override the auto-rekey high-watermark (default [`REKEY_SOFT_LIMIT`]).
546    /// Clamped to `>= 1`. Rust-only — primarily for tests/soak harnesses that
547    /// need to exercise mid-session rekey without sending `2^47` packets.
548    pub fn set_rekey_threshold(&self, n: u64) {
549        self.rekey_after.store(n.max(1), Ordering::Relaxed);
550    }
551
552    /// Override the per-stream sequence rekey watermark (default
553    /// [`SEQ_REKEY_WATERMARK`], `2^31`). Clamped to `>= 1`. Rust-only — primarily
554    /// for tests/soak harnesses that need to exercise the per-stream forced rekey
555    /// (C1) without driving a single stream through `2^31` sequence numbers.
556    pub fn set_seq_rekey_watermark(&self, n: u32) {
557        self.seq_rekey_watermark.store(n.max(1), Ordering::Relaxed);
558    }
559
560    /// True once `stream_id`'s sequence has advanced past the per-stream
561    /// watermark within the current epoch (C1). The send path checks this before
562    /// stamping each packet and, when set, forces a [`rekey`](Self::rekey) so a
563    /// per-stream `u32` sequence can never wrap within one epoch — which would
564    /// otherwise repeat the AEAD nonce `(epoch, stream_id, sequence, path_id)`
565    /// under a fixed key (Invariant 8).
566    ///
567    /// The per-stream `(epoch, base)` checkpoint is rebased lazily on the first
568    /// call after an epoch change, so the measured span is always relative to
569    /// where the stream entered the *current* epoch.
570    pub fn stream_seq_needs_rekey(&self, stream_id: StreamId, seq: u32) -> bool {
571        let epoch = self.current_epoch();
572        let mut entry = self.seq_epoch_base.entry(stream_id).or_insert((epoch, seq));
573        let (base_epoch, base_seq) = *entry;
574        if base_epoch != epoch {
575            // First send on this stream since a rekey — rebase to the current
576            // sequence and measure this epoch's span from here.
577            *entry = (epoch, seq);
578            return false;
579        }
580        seq.wrapping_sub(base_seq) >= self.seq_rekey_watermark.load(Ordering::Relaxed)
581    }
582
583    /// True once the send direction has crossed the rekey high-watermark and the
584    /// epoch has room to advance. The data pump checks this before each
585    /// application send and, when set, rekeys + flags the packet `REKEY` so the
586    /// peer follows via the authenticated epoch bump.
587    pub fn send_needs_rekey(&self) -> bool {
588        self.current_epoch() < u8::MAX
589            && self.send_invocations() >= self.rekey_after.load(Ordering::Relaxed)
590    }
591
592    /// Decrypt a packet, transparently following an **authenticated** forward
593    /// rekey of up to [`MAX_REKEY_CATCHUP`] epochs (C1).
594    ///
595    /// - `header.epoch == current`: ordinary [`decrypt_packet`](Self::decrypt_packet).
596    /// - `current < header.epoch <= current + MAX_REKEY_CATCHUP`: derive the
597    ///   candidate key that many epochs ahead and *trial*-decrypt. Only on AEAD
598    ///   success — i.e. once the epoch bump is proven authentic — is the rekey
599    ///   committed and the replay window consulted (Invariant 4 ordering
600    ///   preserved). A forged `header.epoch` fails the AEAD open, nothing is
601    ///   committed, and the session does not desync. The bound caps an attacker
602    ///   to at most `MAX_REKEY_CATCHUP` HKDF steps per spoofed packet.
603    /// - anything else (behind current, more than `MAX_REKEY_CATCHUP` ahead, or
604    ///   epoch saturated): rejected. Over a reliable transport the sender
605    ///   retransmits at the then-current epoch, so no data is lost.
    pub fn decrypt_packet_accepting_rekey(
        &self,
        header: &PacketHeader,
        ciphertext: &[u8],
    ) -> Result<Vec<u8>, CoreError> {
        // Fast paths that need no epoch transition (no lock).
        // Same epoch: delegate to the ordinary `decrypt_packet`, which also
        // applies the per-stream replay window.
        let cur = self.current_epoch();
        if header.epoch == cur {
            return self.decrypt_packet(header, ciphertext);
        }
        // Older epoch: rejected outright — no attempt is made here to decrypt
        // under an older key.
        if header.epoch < cur {
            return Err(CoreError::CryptoError(format!(
                "packet epoch {} is behind the current epoch {}",
                header.epoch, cur
            )));
        }

        // A forward ratchet mutates the epoch + key, so it must be serialised
        // against a concurrent send-side `rekey()` (C1 — both tasks share this
        // `Session`). Hold the rekey lock across the re-check, derive, trial
        // decrypt, and commit so the installed key depth and the epoch stay in
        // lockstep.
        // The guard is bound to a named variable (not `_`) so it stays alive
        // until one of the explicit `drop(_rekey)` calls below, or until an
        // early `return` drops it.
        let _rekey = self.rekey_lock.lock();
        // Re-read under the lock: a concurrent rekey may have already advanced us.
        let cur = self.current_epoch();
        if header.epoch == cur {
            // Another task already ratcheted to this epoch. Release the lock
            // first so it is not held across the ordinary decrypt.
            drop(_rekey);
            return self.decrypt_packet(header, ciphertext);
        }
        if header.epoch < cur {
            return Err(CoreError::CryptoError(format!(
                "packet epoch {} is behind the current epoch {}",
                header.epoch, cur
            )));
        }
        let steps = header.epoch - cur; // > 0, both u8 → no underflow
        // Bound how far a single packet may drag the epoch forward.
        if steps > MAX_REKEY_CATCHUP {
            return Err(CoreError::CryptoError(format!(
                "packet epoch {} is more than {} epochs ahead of current {}",
                header.epoch, MAX_REKEY_CATCHUP, cur
            )));
        }

        // Candidate key `steps` epochs ahead — derived but NOT installed.
        let (mut final_secret, final_crypto) = self.derive_forward_crypto(steps)?;
        // Nonce and AAD are built exactly as in `decrypt_packet`, but from the
        // candidate key's nonce prefix.
        let nonce = Self::build_packet_nonce(final_crypto.nonce_prefix(), header);
        let header_bytes = header.to_wire();
        // AEAD gate: a forged epoch bump fails here and we return without
        // committing — the live epoch is untouched. Zero the (valid, sensitive)
        // candidate secret we are not going to install.
        let plaintext = match final_crypto.decrypt_with_nonce(nonce, &header_bytes, ciphertext) {
            Ok(pt) => pt,
            Err(e) => {
                final_secret.zeroize();
                return Err(e);
            }
        };

        // Authentic forward rekey — commit (still under the rekey lock; `cur` was
        // read under it, so `cur + steps == header.epoch` is the absolute, race-
        // free target), then drop the lock and apply the replay window AFTER the
        // AEAD open (Invariant 4 — the window is per-(stream,sequence) and
        // epoch-independent, so it needs no rekey serialisation).
        // NOTE(review): the commit happens before the replay check below, so an
        // authenticated packet at the new epoch advances the epoch even if the
        // window then rejects it as a duplicate. Presumably intended, since only
        // a holder of the forward key could produce it — confirm.
        self.commit_forward_crypto(steps, final_secret, final_crypto);
        drop(_rekey);
        // This duplicates the replay-window logic in `decrypt_packet`; the two
        // must stay in sync (same error text, same counter).
        let window_entry = self
            .replay_windows
            .entry(header.stream_id)
            .or_insert_with(|| Mutex::new(ReplayWindow::new()));
        let accepted = window_entry.lock().accept(header.sequence);
        if !accepted {
            self.replay_rejected_total.fetch_add(1, Ordering::Relaxed);
            return Err(CoreError::ReplayDetected(format!(
                "stream {} sequence {} already seen or beyond window",
                header.stream_id, header.sequence
            )));
        }
        Ok(plaintext)
    }
685
686    /// Advance to a specific target epoch by repeatedly applying the rekey
687    /// HKDF chain. Used by the receive path to "catch up" when it sees a
688    /// packet from a higher epoch than the locally known one. Refuses to go
689    /// backwards (a lower target than current returns Ok without changes).
690    pub fn ratchet_to_epoch(&self, target: u8) -> Result<(), CoreError> {
691        let mut current = self.epoch.load(Ordering::Relaxed);
692        while current < target {
693            self.rekey()?;
694            current = self.epoch.load(Ordering::Relaxed);
695        }
696        Ok(())
697    }
698
699    // ── Multi-path / migration (Phase 4.2) ────────────────────────────
700
701    /// Snapshot of currently `Validated` path ids. Useful for the
702    /// scheduler when picking an outbound path.
703    pub fn validated_paths(&self) -> Vec<u8> {
704        self.path_registry.validated_paths()
705    }
706
707    /// State of a specific path within this session. Returns `None` for
708    /// path ids the session has never observed.
709    pub fn path_state(&self, path_id: u8) -> Option<PathStateKind> {
710        self.path_registry.state(path_id)
711    }
712
713    /// Register a new path id and immediately issue a 32-byte
714    /// PATH_CHALLENGE for it. Returns the challenge bytes; the caller
715    /// must transmit them in a V2 packet with `PacketFlags::PATH_VALIDATION`
716    /// set on the new path. Subsequent calls on an already-Validating
717    /// path re-issue a fresh challenge.
718    ///
719    /// Returns `None` if the path is in a terminal state (`Validated`
720    /// or `Failed`).
721    #[tracing::instrument(name = "phantom.path.begin_validation", skip_all, fields(path_id = path_id))]
722    pub fn begin_path_validation(&self, path_id: u8) -> Option<[u8; PATH_CHALLENGE_LEN]> {
723        self.path_registry.register(path_id);
724        self.path_registry.issue_challenge(path_id)
725    }
726
727    /// Register `path_id` as `Unvalidated` if the session has never observed it
728    /// (PATH-001). Used by the receive-side gate to start tracking a fresh path
729    /// id seen on inbound (AEAD-authenticated) data, so a later
730    /// challenge/response can promote it to `Validated`. Idempotent — never
731    /// resets an already-known path (e.g. the pre-validated path 0).
732    pub(crate) fn register_unvalidated_path(&self, path_id: u8) {
733        if self.path_registry.state(path_id).is_none() {
734            self.path_registry.register(path_id);
735        }
736    }
737
738    /// Verify a peer's `PATH_VALIDATION` response. Returns `true` if
739    /// the response matches the in-flight challenge (path is now
740    /// `Validated`). Returns `false` otherwise — the path may have
741    /// transitioned to `Failed`.
742    #[tracing::instrument(name = "phantom.path.complete_validation", skip(response), fields(path_id = path_id))]
743    pub fn complete_path_validation(&self, path_id: u8, response: &[u8]) -> bool {
744        self.path_registry.verify_response(path_id, response)
745    }
746
747    /// Record that a packet was observed on the path. Cheap to call
748    /// per-packet — used by the data pump to keep `last_packet_seen`
749    /// fresh for the timeout sweep.
750    pub fn mark_path_seen(&self, path_id: u8) {
751        self.path_registry.mark_seen(path_id);
752    }
753
754    // ── Pacer / BandwidthEstimator (Phase 2.6) ─────────────────────────
755
756    /// Shared handle to this session's outbound rate-limiter. Cheap to
757    /// clone (`Arc`). The data pump consults this before every outbound
758    /// packet; idle by default ([`Pacer::unlimited`]).
759    pub fn pacer(&self) -> Arc<Pacer> {
760        self.pacer.clone()
761    }
762
763    /// Record that a packet of `bytes` length is going on the wire.
764    /// Feeds the BBR-style bandwidth estimator. Cheap (one mutex lock
765    /// + a counter increment).
766    pub fn on_packet_sent(&self, bytes: u64) {
767        self.bandwidth_estimator.lock().on_send(bytes);
768    }
769
770    /// Record that an ACK arrived with delivery sample `sample`. The
771    /// returned `u64` is the updated bottleneck bandwidth estimate; we
772    /// reflect it into the pacer so the outbound rate tracks the
773    /// peer's actual receive throughput.
774    pub fn on_packet_acked(&self, sample: DeliverySample) -> u64 {
775        let bw = self.bandwidth_estimator.lock().on_ack(sample);
776        // Mirror the estimator's pacing decision onto the pacer so the
777        // two stay in lock-step.
778        let rate = self.bandwidth_estimator.lock().pacing_rate();
779        if rate > 0 {
780            self.pacer.set_rate(rate);
781        }
782        bw
783    }
784
785    /// Record that a packet of `bytes` length was lost (no ACK before
786    /// retransmit timer fired). Drives BBR's loss-based feedback.
787    pub fn on_packet_lost(&self, bytes: u64) {
788        self.bandwidth_estimator.lock().on_loss(bytes);
789    }
790
791    /// Current BBR congestion-control state. Observability / test hook — lets
792    /// callers confirm a loss drove the estimator into `FastRecovery`.
793    pub fn bbr_state(&self) -> crate::transport::bandwidth_estimator::BbrState {
794        self.bandwidth_estimator.lock().state()
795    }
796
797    /// Read a snapshot of the bandwidth / pacing estimator. Cheap; held
798    /// over a single mutex lock.
799    pub fn bandwidth_snapshot(&self) -> BandwidthSnapshot {
800        let est = self.bandwidth_estimator.lock();
801        BandwidthSnapshot {
802            bottleneck_bw_bps: est.bottleneck_bandwidth(),
803            min_rtt: est.min_rtt(),
804            pacing_rate_bps: est.pacing_rate(),
805            cwnd_bytes: est.cwnd(),
806            inflight_bytes: est.inflight_bytes(),
807        }
808    }
809
810    // ── Event-driven send-loop wake-up (Phase 2.4) ─────────────────────
811
812    /// Shared handle to the outbound-ready notify. The API-layer data
813    /// pump awaits this via `Notify::notified()`; any task with the
814    /// handle can wake it instantly via [`Self::notify_outbound_ready`].
815    pub fn send_notifier(&self) -> Arc<tokio::sync::Notify> {
816        self.send_notify.clone()
817    }
818
819    /// Wake the data pump's send loop immediately so it can drain newly-
820    /// queued packets instead of waiting for the next 10 ms tick. Cheap
821    /// (a single `notify_one()`); duplicate calls collapse to one wake.
822    pub fn notify_outbound_ready(&self) {
823        self.send_notify.notify_one();
824    }
825
826    /// Build the AEAD nonce from the authenticated header fields.
827    ///
828    /// Layout (12 bytes total):
829    /// ```text
830    ///   [0..4]  : nonce_prefix (from CryptoState; identical for the lifetime
831    ///             of a session, freshly derived per rekey)
832    ///   [4]     : epoch
833    ///   [5..7]  : stream_id (big-endian)
834    ///   [7..11] : sequence  (big-endian)
835    ///   [11]    : path_id
836    /// ```
837    ///
838    /// Uniqueness argument: senders never reuse `(stream_id, sequence)`
839    /// within a single epoch. The path_id distinguishes the same logical
840    /// packet replayed across paths (Phase 4.2 multi-path). Together the
841    /// 12-byte nonce is unique for every `seal_in_place_*` invocation
842    /// under the given key.
843    fn build_packet_nonce(prefix: [u8; 4], header: &PacketHeader) -> [u8; 12] {
844        let mut n = [0u8; 12];
845        n[..4].copy_from_slice(&prefix);
846        n[4] = header.epoch;
847        n[5..7].copy_from_slice(&header.stream_id.to_be_bytes());
848        n[7..11].copy_from_slice(&header.sequence.to_be_bytes());
849        n[11] = header.path_id;
850        n
851    }
852
853    /// Encrypt a packet payload.
854    ///
855    /// The AEAD nonce is derived from the authenticated `(epoch, stream_id,
856    /// sequence, path_id)` fields of the packet header rather than from an
857    /// internal monotonic counter, so a failed peer decrypt never desyncs the
858    /// receiver. The AAD is the 45-byte wire image of the header
859    /// ([`PacketHeader::to_wire`]), so any wire-level mutation invalidates the tag.
860    pub fn encrypt_packet(
861        &self,
862        header: &PacketHeader,
863        plaintext: &[u8],
864    ) -> Result<Vec<u8>, CoreError> {
865        let crypto = self.crypto.load();
866        let nonce = Self::build_packet_nonce(crypto.nonce_prefix(), header);
867        let header_bytes = header.to_wire();
868        crypto.encrypt_with_nonce(nonce, &header_bytes, plaintext)
869    }
870
871    /// Decrypt a packet payload. Performs AEAD verify + per-stream
872    /// sliding-window replay rejection (the window check runs **after** a
873    /// successful AEAD open — Invariant 4 — so we never key off
874    /// un-authenticated sequence numbers).
875    ///
876    /// A failed decrypt does NOT desync future decrypts: the AEAD nonce is
877    /// derived from this packet's authenticated header fields, so the receiver
878    /// stays in lock-step with the sender regardless of intervening bad
879    /// packets.
880    pub fn decrypt_packet(
881        &self,
882        header: &PacketHeader,
883        ciphertext: &[u8],
884    ) -> Result<Vec<u8>, CoreError> {
885        let crypto = self.crypto.load();
886        let nonce = Self::build_packet_nonce(crypto.nonce_prefix(), header);
887        let header_bytes = header.to_wire();
888        let plaintext = crypto.decrypt_with_nonce(nonce, &header_bytes, ciphertext)?;
889
890        // Sliding-window guard. ReplayWindow keys on `(stream_id, sequence)`
891        // only — the `epoch` / `path_id` fields do NOT contribute to the
892        // replay identity because replay is a property of "is this sequence
893        // a duplicate", independent of which path it arrived over or which
894        // rekey generation produced it.
895        let window_entry = self
896            .replay_windows
897            .entry(header.stream_id)
898            .or_insert_with(|| Mutex::new(ReplayWindow::new()));
899        let accepted = window_entry.lock().accept(header.sequence);
900        if !accepted {
901            self.replay_rejected_total.fetch_add(1, Ordering::Relaxed);
902            return Err(CoreError::ReplayDetected(format!(
903                "stream {} sequence {} already seen or beyond window",
904                header.stream_id, header.sequence
905            )));
906        }
907        Ok(plaintext)
908    }
909
910    /// Create a control packet
911    pub fn create_control_packet(
912        &self,
913        _message: ControlMessage,
914        payload: Vec<u8>,
915    ) -> PhantomPacket {
916        let seq = self.control_sequence.fetch_add(1, Ordering::SeqCst);
917        let header = PacketHeader::new(
918            self.id,
919            0, // control stream
920            seq,
921            PacketFlags::new(PacketFlags::CONTROL | PacketFlags::RELIABLE),
922        );
923        // Note: Real implementation would also encrypt control packet
924        PhantomPacket::new(header, payload)
925    }
926
927    /// Get the scheduler
928    pub fn scheduler(&self) -> &Arc<Scheduler> {
929        &self.scheduler
930    }
931
932    /// Set resumption secret for 0-RTT.
933    ///
934    /// If a secret was already set, the previous bytes are explicitly zeroed
935    /// before being replaced — defense in depth in case `set_resumption_secret`
936    /// is called multiple times within a session.
937    pub fn set_resumption_secret(&self, secret: [u8; 32]) {
938        let mut guard = self.resumption_secret.write();
939        if let Some(mut old) = guard.take() {
940            old.zeroize();
941        }
942        *guard = Some(secret);
943    }
944
945    /// The resumption secret for 0-RTT, if one has been installed. Rust-only —
946    /// the FFI surface exposes this via `PhantomSession::resumption_hint()`.
947    pub fn resumption_secret(&self) -> Option<[u8; 32]> {
948        *self.resumption_secret.read()
949    }
950
951    /// Check if session can be resumed (has resumption secret)
952    pub fn can_resume(&self) -> bool {
953        self.resumption_secret.read().is_some()
954    }
955
956    /// Extract the resumption hint needed to attempt 0-RTT resume on
957    /// a future connect (Phase 4.1). Returns `Some((session_id_bytes,
958    /// resumption_secret))` only after a successful handshake — the
959    /// resumption_secret is set by `process_client_hello` /
960    /// `process_server_hello` once shared key material is in place.
961    ///
962    /// The caller is responsible for storing the tuple alongside the
963    /// pinned `HybridVerifyingKey` of the server it was negotiated
964    /// with. Mixing tickets across servers is a configuration bug —
965    /// the resumption_secret is server-pinned.
966    pub fn resumption_hint(&self) -> Option<([u8; 32], [u8; 32])> {
967        let secret = (*self.resumption_secret.read())?;
968        Some((self.id.0, secret))
969    }
970
971    /// Update last activity timestamp
972    pub fn update_activity(&self) {
973        *self.last_activity.write() = Instant::now();
974    }
975
976    /// Check if session is expired
977    pub fn is_expired(&self, timeout: Duration) -> bool {
978        self.last_activity.read().elapsed() > timeout
979    }
980}
981
/// Read-only snapshot of the session's pacing / bandwidth state
/// (Phase 2.6). Returned by [`Session::bandwidth_snapshot`] for
/// telemetry / debugging without exposing the mutable estimator.
///
/// All fields are copied out of the estimator under one lock acquisition,
/// so they describe the same moment.
#[derive(Debug, Clone, Copy)]
pub struct BandwidthSnapshot {
    /// Bottleneck bandwidth estimate, as returned by the estimator's
    /// `bottleneck_bandwidth()`.
    /// NOTE(review): the `_bps` suffix suggests a per-second rate — confirm
    /// whether the unit is bits or bytes.
    pub bottleneck_bw_bps: u64,
    /// Minimum RTT as reported by the estimator's `min_rtt()`.
    pub min_rtt: Duration,
    /// Pacing rate as reported by the estimator's `pacing_rate()`; the same
    /// value `Session::on_packet_acked` mirrors onto the pacer when non-zero.
    pub pacing_rate_bps: u64,
    /// Congestion window as reported by the estimator's `cwnd()`.
    pub cwnd_bytes: u64,
    /// In-flight byte count as reported by the estimator's `inflight_bytes()`.
    pub inflight_bytes: u64,
}
993
994impl std::fmt::Debug for Session {
995    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
996        f.debug_struct("Session").field("id", &self.id).finish()
997    }
998}
999
1000impl Drop for Session {
1001    /// On session drop, explicitly zero the resumption secret. The
1002    /// `CryptoState` inside `crypto` is itself `ZeroizeOnDrop`, so its
1003    /// `session_key` is handled there.
1004    fn drop(&mut self) {
1005        if let Some(mut secret) = self.resumption_secret.write().take() {
1006            secret.zeroize();
1007        }
1008    }
1009}