phantom_protocol/transport/session.rs
1//! Phantom Transport - Session Management
2//!
3//! Virtual association that persists across IP changes.
4//! Manages streams, encryption state, and multi-path scheduling.
5
6use crate::crypto::adaptive_crypto::{CryptoSession, AEAD_MAX_INVOCATIONS};
7use crate::errors::CoreError;
8use crate::security::ReplayWindow;
9use crate::transport::{
10 bandwidth_estimator::{BandwidthEstimator, DeliverySample},
11 fallback::FallbackStateMachine,
12 pacer::Pacer,
13 path::{PathRegistry, PathStateKind, PATH_CHALLENGE_LEN},
14 scheduler::Scheduler,
15 stream::Stream,
16 types::{
17 ControlMessage, PacketFlags, PacketHeader, PhantomPacket, SchedulerMode, SessionId,
18 StreamId,
19 },
20};
21
22use arc_swap::ArcSwap;
23use dashmap::DashMap;
24use parking_lot::{Mutex, RwLock};
25use std::collections::HashMap;
26use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29use zeroize::{Zeroize, ZeroizeOnDrop};
30
/// Lifecycle phase of a session.
///
/// The value is `Copy`, so readers take a snapshot rather than holding a
/// reference into the session.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionState {
    /// The handshake is still running; this is the initial state.
    Handshaking,
    /// The session is fully established and data can flow.
    Connected,
    /// The session is moving to a new IP address.
    Migrating,
    /// A graceful shutdown has started but not finished.
    Closing,
    /// The session has been closed.
    Closed,
}
45
/// Soft high-watermark for automatic mid-session rekey (C1). Once a direction's
/// AEAD invocation count crosses this, the data pump rotates to a fresh key
/// *before* the hard [`AEAD_MAX_INVOCATIONS`] ceiling (Invariant 8) so a
/// long-lived session ratchets keys instead of failing with `NonceExhausted`.
///
/// The value is computed as half the ceiling (`AEAD_MAX_INVOCATIONS / 2`). The
/// figures quoted here — `2^47` invocations, leaving a `2^47` headroom for
/// in-flight packets from the old epoch — assume a `2^48` ceiling; the ceiling
/// itself is defined in `adaptive_crypto`, so confirm there if it changes.
/// It is far larger than any realistic session lifetime, so production sessions
/// essentially never hit it; it is a correctness backstop. Tests lower it via
/// [`Session::set_rekey_threshold`] to exercise the path.
pub const REKEY_SOFT_LIMIT: u64 = AEAD_MAX_INVOCATIONS / 2;
57
/// How many epochs the receive path will catch up in one packet when accepting
/// an authenticated forward rekey (C1). A small bound caps the HKDF work an
/// attacker can force per spoofed packet (each step is a trial that commits
/// nothing unless AEAD verifies) while comfortably absorbing the small epoch
/// divergence that arises when both directions rekey at slightly different
/// cadences. A gap larger than this is rejected; over a reliable transport the
/// sender retransmits at the then-current epoch, so no data is lost. In
/// practice (production `REKEY_SOFT_LIMIT` of `2^47`) the gap is essentially
/// always 0 or 1.
///
/// Enforced by [`Session::decrypt_packet_accepting_rekey`], which returns an
/// error for any packet whose epoch is more than this many epochs ahead of the
/// session's current epoch.
pub const MAX_REKEY_CATCHUP: u8 = 16;
68
/// Per-stream sequence span after which the send path forces a mid-session
/// rekey (C1).
///
/// The AEAD nonce is `(epoch, stream_id, sequence, path_id)` and `sequence` is
/// a per-stream `u32` that wraps at `2^32`. A single hot stream would therefore
/// wrap — repeating a nonce under a fixed key (the Forbidden Attack on AES-GCM)
/// — long before the *direction-wide* [`REKEY_SOFT_LIMIT`] could fire. Forcing
/// a rekey once *any* stream has advanced this far within the current epoch
/// means the epoch bump gives every later packet a fresh nonce prefix, so no
/// stream can cover the whole `2^32` sequence space inside one epoch.
///
/// `2^31` keeps a further `2^31` of headroom below the wrap for reordered or
/// in-flight packets from the old epoch. Tests lower the effective value via
/// [`Session::set_seq_rekey_watermark`].
pub const SEQ_REKEY_WATERMARK: u32 = 1_u32 << 31;
81
/// Crypto state for session encryption.
///
/// On drop, `session_key` is zeroed (via the `ZeroizeOnDrop` derive). The
/// wrapped [`CryptoSession`] holds AEAD keys in ring's opaque `LessSafeKey`
/// (which cannot be zeroed directly — we rely on the OS reclaiming memory and
/// on the `Arc<CryptoSessionInner>` going out of scope alongside this struct).
///
/// Deliberately does not derive `Debug`/`Clone`: the struct carries raw key
/// material in `session_key`.
#[derive(ZeroizeOnDrop)]
pub struct CryptoState {
    /// Bidirectional crypto session. Excluded from the zeroize-on-drop derive
    /// (`#[zeroize(skip)]`); see the struct-level note on why its keys cannot
    /// be wiped from here.
    #[zeroize(skip)]
    pub session: CryptoSession,
    /// Shared session key (for additional derivations). Wiped when the struct
    /// is dropped.
    pub session_key: [u8; 32],
}
96
97impl CryptoState {
98 /// Create new crypto state from shared secret
99 pub fn new(shared_secret: &[u8; 32], peer_side: bool) -> Result<Self, CoreError> {
100 let session = if peer_side {
101 CryptoSession::from_shared_secret_peer(shared_secret)?
102 } else {
103 CryptoSession::from_shared_secret(shared_secret)?
104 };
105
106 // Derive additional session keys using HKDF
107 let hk = hkdf::Hkdf::<sha2::Sha256>::from_prk(shared_secret)
108 .map_err(|_| CoreError::CryptoError("HKDF PRK failed".into()))?;
109
110 let mut key_bytes = [0u8; 32];
111 hk.expand(b"phantom-transport-key", &mut key_bytes)
112 .map_err(|_| CoreError::KeyDerivationError)?;
113
114 Ok(Self {
115 session,
116 session_key: key_bytes,
117 })
118 }
119
120 /// Encrypt with a caller-supplied 12-byte nonce. Used by
121 /// `Session::encrypt_packet`, which constructs the nonce from the
122 /// authenticated `(epoch, stream_id, sequence, path_id)` of the packet
123 /// header — so the receiver survives failed decrypts without desyncing.
124 pub fn encrypt_with_nonce(
125 &self,
126 nonce: [u8; 12],
127 aad: &[u8],
128 plaintext: &[u8],
129 ) -> Result<Vec<u8>, CoreError> {
130 self.session
131 .encrypt_with_nonce(nonce, aad, plaintext)
132 .map_err(|e| CoreError::CryptoError(e.to_string()))
133 }
134
135 /// V2-path decrypt: caller supplies the 12-byte nonce explicitly.
136 pub fn decrypt_with_nonce(
137 &self,
138 nonce: [u8; 12],
139 aad: &[u8],
140 ciphertext: &[u8],
141 ) -> Result<Vec<u8>, CoreError> {
142 self.session
143 .decrypt_with_nonce(nonce, aad, ciphertext)
144 .map_err(|e| CoreError::CryptoError(e.to_string()))
145 }
146
147 /// Borrow the 4-byte nonce prefix derived at session establishment.
148 pub fn nonce_prefix(&self) -> [u8; 4] {
149 self.session.nonce_prefix()
150 }
151
152 /// Per-direction send-side AEAD invocation count for this epoch. Resets to
153 /// 0 on rekey (a fresh `CryptoState` is installed). Drives the C1
154 /// automatic-rekey trigger.
155 pub fn send_invocations(&self) -> u64 {
156 self.session.send_invocations()
157 }
158}
159
/// Session - virtual association between two endpoints.
///
/// All mutable state sits behind interior mutability (locks, atomics,
/// `ArcSwap`, `DashMap`), so every method takes `&self` and one `Session` can
/// be shared between the send loop and the receive task.
pub struct Session {
    /// Unique session identifier (256-bit)
    id: SessionId,
    /// Current state
    state: RwLock<SessionState>,
    /// Active `CryptoState` — wrapped in `ArcSwap` so `rekey()` can swap it
    /// in lock-free (Phase 1.5 + Phase 2.7).
    ///
    /// Encrypt/decrypt callsites do `self.crypto.load()` which is an atomic
    /// pointer load + deref to the inner `CryptoState`. No lock acquisition
    /// per packet. `rekey()` is a single `store()` of a freshly-derived
    /// `Arc<CryptoState>`.
    crypto: ArcSwap<CryptoState>,
    /// Per-direction traffic secret. Initial value is the secret the session
    /// was constructed from (the handshake's shared secret, or the resumption
    /// secret for `resume`); each `rekey()` derives the next via
    /// `HKDF-Expand(current, "phantom-rekey-v1", 32)` (Phase 1.5).
    traffic_secret: RwLock<[u8; 32]>,
    /// Rekey generation counter. Starts at 0 at session establishment; each
    /// successful `rekey()` increments it, and an authenticated receive-side
    /// catch-up may advance it by several epochs at once. Wire-emitted in
    /// `PacketHeader.epoch` so the peer can match the right key.
    epoch: AtomicU8,
    /// Send-side AEAD-invocation high-watermark that triggers an automatic
    /// mid-session rekey (C1). Defaults to [`REKEY_SOFT_LIMIT`]; tests/embedders
    /// lower it via [`set_rekey_threshold`](Self::set_rekey_threshold).
    rekey_after: AtomicU64,
    /// Per-stream sequence high-watermark that forces a rekey for AEAD nonce
    /// uniqueness (C1). Defaults to [`SEQ_REKEY_WATERMARK`] (`2^31`); tests lower
    /// it via [`set_seq_rekey_watermark`](Self::set_seq_rekey_watermark).
    seq_rekey_watermark: AtomicU32,
    /// Per-stream `(epoch, base_sequence)` checkpoint bounding how far a stream's
    /// sequence may advance within one epoch (C1). `base_sequence` is the stream's
    /// sequence when it entered the current epoch; the send path forces a rekey
    /// once `sequence - base_sequence` crosses
    /// [`seq_rekey_watermark`](Self::seq_rekey_watermark). Rebased lazily, per
    /// stream, on the first send after an epoch change.
    seq_epoch_base: DashMap<StreamId, (u8, u32)>,
    /// Serialises every epoch transition (C1). The data pump runs the send loop
    /// and the receive task concurrently over one `Arc<Session>`, so a send-side
    /// `rekey()` can race a receive-side ratchet. Both hold this mutex across
    /// their derive→install→epoch-bump so the installed key depth and the epoch
    /// counter never diverge (the bug would otherwise wedge the session).
    ///
    /// This is a `parking_lot::Mutex`, which is not re-entrant: code that
    /// already holds it must not call `rekey()`.
    rekey_lock: Mutex<()>,
    /// Which side of the handshake we are. Carried into every
    /// `CryptoState::new(...)` re-derivation so the per-direction keys are
    /// laid out the same way they were at session establishment.
    is_server: bool,
    /// Active streams
    streams: RwLock<HashMap<StreamId, Arc<Stream>>>,
    /// Next stream ID counter. Every constructor in this file starts it at 1.
    next_stream_id: AtomicU32,
    /// Control sequence number
    control_sequence: AtomicU32,
    /// Multi-path scheduler
    scheduler: Arc<Scheduler>,
    /// Resumption secret for 0-RTT. `None` unless the session was built by
    /// `resume` (or set later by code outside this view).
    resumption_secret: RwLock<Option<[u8; 32]>>,
    /// Last activity timestamp
    last_activity: RwLock<Instant>,
    /// Fallback state machine.
    // NOTE(review): `allow(dead_code)` suggests nothing reads this field yet —
    // confirm whether it is still planned to be wired in.
    #[allow(dead_code)]
    fallback: Arc<FallbackStateMachine>,
    /// Per-stream sliding-window replay protection. Lazily populated as
    /// streams appear on the wire. Sits alongside (not in place of) the AEAD
    /// strict-counter replay protection — see `decrypt_packet`.
    replay_windows: DashMap<StreamId, Mutex<ReplayWindow>>,
    /// Cumulative count of replay rejections (across all streams) — exposed
    /// for metrics/telemetry.
    replay_rejected_total: AtomicU64,
    /// Per-path validation state (Phase 4.2). Each `path_id` referenced in a
    /// `PacketHeader.path_id` must transit through `Unvalidated →
    /// Validating → Validated` (via a challenge-response round trip)
    /// before the data pump treats it as authoritative. Defaults to a
    /// pre-populated entry for `path_id = 0` (the implicit single-path)
    /// in the `Validated` state so legacy single-leg sessions keep
    /// working without any explicit setup.
    path_registry: Arc<PathRegistry>,
    /// Outbound rate-limiter (Phase 2.6). Defaults to
    /// [`Pacer::unlimited`] so the historical no-pacing behavior is
    /// unchanged unless the caller explicitly sets a rate via
    /// [`Session::pacer`]. The data pump consults this before every
    /// outbound packet — the existing implementation just calls
    /// `try_consume` and falls through if the pacer is disabled, so the
    /// integration is zero-overhead in the default configuration.
    pacer: Arc<Pacer>,
    /// BBR-style bandwidth + RTT estimator (Phase 2.6 / Phase 4.4
    /// foundation). The data pump feeds it via [`Session::on_packet_sent`]
    /// and [`Session::on_packet_acked`]; the resulting `pacing_rate()`
    /// feeds back into the `pacer` to close the loop.
    bandwidth_estimator: parking_lot::Mutex<BandwidthEstimator>,
    /// Outbound-ready signal (Phase 2.4). Streams or the application
    /// can `notify_one()` this to wake the data pump immediately
    /// instead of waiting for the next 10 ms `poll_interval` tick.
    /// The pump keeps the tick as a retransmit-timer fallback.
    send_notify: Arc<tokio::sync::Notify>,
}
256
257impl Session {
258 /// Create a new session with given shared secret
259 pub fn new(
260 session_id: SessionId,
261 shared_secret: &[u8; 32],
262 peer_side: bool,
263 ) -> Result<Self, CoreError> {
264 let crypto = CryptoState::new(shared_secret, peer_side)?;
265 let path_registry = Arc::new(PathRegistry::new());
266 // Pre-register `path_id = 0` as the implicit default path — the
267 // handshake itself proved reachability over this path, so no
268 // additional PATH_CHALLENGE is needed (Phase 4.2).
269 path_registry.register_validated(0);
270
271 Ok(Self {
272 id: session_id,
273 state: RwLock::new(SessionState::Handshaking),
274 crypto: ArcSwap::new(Arc::new(crypto)),
275 traffic_secret: RwLock::new(*shared_secret),
276 epoch: AtomicU8::new(0),
277 rekey_after: AtomicU64::new(REKEY_SOFT_LIMIT),
278 seq_rekey_watermark: AtomicU32::new(SEQ_REKEY_WATERMARK),
279 seq_epoch_base: DashMap::new(),
280 rekey_lock: Mutex::new(()),
281 is_server: peer_side,
282 streams: RwLock::new(HashMap::new()),
283 next_stream_id: AtomicU32::new(1),
284 control_sequence: AtomicU32::new(0),
285 scheduler: Arc::new(Scheduler::new(SchedulerMode::LowLatency)),
286 resumption_secret: RwLock::new(None),
287 last_activity: RwLock::new(Instant::now()),
288 fallback: Arc::new(FallbackStateMachine::with_defaults()),
289 replay_windows: DashMap::new(),
290 replay_rejected_total: AtomicU64::new(0),
291 path_registry,
292 pacer: Arc::new(Pacer::unlimited()),
293 bandwidth_estimator: parking_lot::Mutex::new(BandwidthEstimator::new()),
294 send_notify: Arc::new(tokio::sync::Notify::new()),
295 })
296 }
297
298 /// Create session from a pre-derived crypto state (e.g., after handshake).
299 ///
300 /// `traffic_secret` is the master from which the supplied `crypto` was
301 /// derived — it seeds the [`rekey`](Self::rekey) HKDF chain. `is_server`
302 /// records which side of the handshake we are; rekey re-derives keys
303 /// with the same side so per-direction layout is preserved.
304 pub fn from_derived(
305 session_id: SessionId,
306 crypto: CryptoState,
307 scheduler_mode: SchedulerMode,
308 traffic_secret: [u8; 32],
309 is_server: bool,
310 ) -> Self {
311 let path_registry = Arc::new(PathRegistry::new());
312 path_registry.register_validated(0);
313 Self {
314 id: session_id,
315 state: RwLock::new(SessionState::Connected),
316 crypto: ArcSwap::new(Arc::new(crypto)),
317 traffic_secret: RwLock::new(traffic_secret),
318 epoch: AtomicU8::new(0),
319 rekey_after: AtomicU64::new(REKEY_SOFT_LIMIT),
320 seq_rekey_watermark: AtomicU32::new(SEQ_REKEY_WATERMARK),
321 seq_epoch_base: DashMap::new(),
322 rekey_lock: Mutex::new(()),
323 is_server,
324 streams: RwLock::new(HashMap::new()),
325 next_stream_id: AtomicU32::new(1),
326 control_sequence: AtomicU32::new(0),
327 scheduler: Arc::new(Scheduler::new(scheduler_mode)),
328 resumption_secret: RwLock::new(None),
329 last_activity: RwLock::new(Instant::now()),
330 fallback: Arc::new(FallbackStateMachine::with_defaults()),
331 replay_windows: DashMap::new(),
332 replay_rejected_total: AtomicU64::new(0),
333 path_registry,
334 pacer: Arc::new(Pacer::unlimited()),
335 bandwidth_estimator: parking_lot::Mutex::new(BandwidthEstimator::new()),
336 send_notify: Arc::new(tokio::sync::Notify::new()),
337 }
338 }
339
340 /// Resume a session using resumption secret (0-RTT)
341 pub fn resume(
342 session_id: SessionId,
343 resumption_secret: &[u8; 32],
344 peer_side: bool,
345 ) -> Result<Self, CoreError> {
346 let crypto = CryptoState::new(resumption_secret, peer_side)?;
347 let path_registry = Arc::new(PathRegistry::new());
348 path_registry.register_validated(0);
349
350 Ok(Self {
351 id: session_id,
352 state: RwLock::new(SessionState::Connected),
353 crypto: ArcSwap::new(Arc::new(crypto)),
354 traffic_secret: RwLock::new(*resumption_secret),
355 epoch: AtomicU8::new(0),
356 rekey_after: AtomicU64::new(REKEY_SOFT_LIMIT),
357 seq_rekey_watermark: AtomicU32::new(SEQ_REKEY_WATERMARK),
358 seq_epoch_base: DashMap::new(),
359 rekey_lock: Mutex::new(()),
360 is_server: peer_side,
361 streams: RwLock::new(HashMap::new()),
362 next_stream_id: AtomicU32::new(1),
363 control_sequence: AtomicU32::new(0),
364 scheduler: Arc::new(Scheduler::new(SchedulerMode::LowLatency)),
365 resumption_secret: RwLock::new(Some(*resumption_secret)),
366 last_activity: RwLock::new(Instant::now()),
367 fallback: Arc::new(FallbackStateMachine::with_defaults()),
368 replay_windows: DashMap::new(),
369 replay_rejected_total: AtomicU64::new(0),
370 path_registry,
371 pacer: Arc::new(Pacer::unlimited()),
372 bandwidth_estimator: parking_lot::Mutex::new(BandwidthEstimator::new()),
373 send_notify: Arc::new(tokio::sync::Notify::new()),
374 })
375 }
376
377 /// Get session ID
378 pub fn id(&self) -> &SessionId {
379 &self.id
380 }
381
382 /// Get current state
383 pub fn state(&self) -> SessionState {
384 *self.state.read()
385 }
386
387 /// Transition to a new state
388 pub fn set_state(&self, new_state: SessionState) {
389 *self.state.write() = new_state;
390 }
391
392 /// Open a new stream
393 pub fn open_stream(&self) -> Arc<Stream> {
394 let stream_id = self.next_stream_id.fetch_add(1, Ordering::SeqCst) as StreamId;
395 let stream = Arc::new(Stream::new(stream_id));
396
397 self.streams.write().insert(stream_id, stream.clone());
398 stream
399 }
400
401 /// Get an existing stream
402 pub fn get_stream(&self, stream_id: StreamId) -> Option<Arc<Stream>> {
403 self.streams.read().get(&stream_id).cloned()
404 }
405
406 /// Close a stream
407 pub fn close_stream(&self, stream_id: StreamId) -> bool {
408 self.streams.write().remove(&stream_id).is_some()
409 }
410
411 /// Get number of active streams
412 pub fn stream_count(&self) -> u32 {
413 self.streams.read().len() as u32
414 }
415
416 /// Total number of replayed packets rejected by the sliding-window check
417 /// across all streams in this session. Intended for the
418 /// `replay_rejected_total` metric.
419 pub fn replay_rejected_total(&self) -> u64 {
420 self.replay_rejected_total.load(Ordering::Relaxed)
421 }
422
423 /// Current rekey generation (Phase 1.5). Starts at 0; each successful
424 /// [`rekey`](Self::rekey) increments by one. Carried on the wire in
425 /// `PacketHeader.epoch` so the peer can match the right derived key.
426 pub fn current_epoch(&self) -> u8 {
427 self.epoch.load(Ordering::Relaxed)
428 }
429
430 /// Whether this session is acting as the server side. Determined at
431 /// construction; required for re-deriving per-direction keys on rekey.
432 pub fn is_server(&self) -> bool {
433 self.is_server
434 }
435
436 /// Mid-session key rotation (Phase 1.5).
437 ///
438 /// Derives the next traffic secret from the current one via
439 /// `HKDF-Expand(current, "phantom-rekey-v1", 32)` and builds a fresh
440 /// [`CryptoState`] under that secret. The new state is installed via
441 /// an atomic `ArcSwap::store`, so concurrent encrypt/decrypt calls
442 /// observe either the old or the new state — never a partially-written
443 /// in-between. The previous traffic secret is explicitly zeroed before
444 /// being overwritten.
445 ///
446 /// Returns the new epoch (1, 2, 3, ...). Wraps an error if the epoch
447 /// counter has saturated `u8::MAX` (after 255 successful rekeys —
448 /// equivalent to ~5 days at the default 30-minute cadence; long-lived
449 /// sessions are expected to reconnect rather than wrap).
450 ///
451 /// Wire signalling: callers that want the peer to follow this rekey
452 /// emit a V2 packet whose header carries the new epoch (and optionally
453 /// the `PacketFlags::REKEY` flag). Receivers respond by calling
454 /// `rekey()` themselves once they see the bump — keeping both ends in
455 /// lockstep.
456 #[tracing::instrument(name = "phantom.session.rekey", skip_all)]
457 pub fn rekey(&self) -> Result<u8, CoreError> {
458 // Serialise the whole transition (C1): the send loop and the receive
459 // task share this `Session`, so derive+install+epoch-bump must be atomic
460 // w.r.t. a concurrent receive-side ratchet, or the installed key depth
461 // and the epoch counter diverge and wedge the session.
462 let _rekey = self.rekey_lock.lock();
463 let current_epoch = self.epoch.load(Ordering::Relaxed);
464 if current_epoch == u8::MAX {
465 return Err(CoreError::CryptoError(
466 "session epoch saturated (u8::MAX); reconnect required".into(),
467 ));
468 }
469 let (next_secret, new_crypto) = self.derive_forward_crypto(1)?;
470 self.commit_forward_crypto(1, next_secret, new_crypto);
471 Ok(current_epoch + 1)
472 }
473
474 /// Derive the next epoch's traffic secret + [`CryptoState`] from the current
475 /// secret WITHOUT installing them. The HKDF chain step is
476 /// `HKDF-Expand(current, "phantom-rekey-v1", 32)` (Invariant 5 — the label is
477 /// load-bearing; it must match the committing path in `rekey`). Returns the
478 /// derived secret and a fresh per-direction AEAD state under it.
479 ///
480 /// This is the non-committing half used by the receive path to verify a
481 /// claimed-next-epoch packet (trial decrypt) before trusting the epoch bump,
482 /// so a forged, unauthenticated `header.epoch` cannot desync the session.
483 ///
484 /// `steps` ≥ 1 applies the chain that many times (the receive path may need
485 /// to catch up several epochs when both directions rekey at slightly
486 /// different cadences). Intermediate secrets are zeroed as the walk
487 /// proceeds; only the final-epoch secret is returned for the caller to
488 /// commit.
489 fn derive_forward_crypto(&self, steps: u8) -> Result<([u8; 32], CryptoState), CoreError> {
490 use zeroize::Zeroizing;
491 debug_assert!(steps >= 1, "derive_forward_crypto needs at least one step");
492 // `Zeroizing` so every intermediate secret — and the working copy of the
493 // current secret — is wiped on *every* exit path, including the early
494 // `?` returns (an attacker can force this derivation merely by setting
495 // `header.epoch`, so the candidate is genuinely sensitive).
496 let mut secret: Zeroizing<[u8; 32]> = Zeroizing::new(*self.traffic_secret.read());
497 for _ in 0..steps {
498 let mut next: Zeroizing<[u8; 32]> = Zeroizing::new([0u8; 32]);
499 let hk = hkdf::Hkdf::<sha2::Sha256>::from_prk(&*secret)
500 .map_err(|_| CoreError::KeyDerivationError)?;
501 // Invariant 5 — the `phantom-rekey-v1` label is load-bearing and must
502 // match the committing path in `rekey`.
503 hk.expand(b"phantom-rekey-v1", &mut *next)
504 .map_err(|_| CoreError::KeyDerivationError)?;
505 secret = next; // previous-step secret drops → zeroed
506 }
507 let new_crypto = CryptoState::new(&secret, self.is_server)?;
508 // Copy the bytes out for the caller; the `Zeroizing` working copy is
509 // wiped when it drops here. The caller is responsible for the returned
510 // secret (committed into `traffic_secret`, or zeroed on a failed trial).
511 Ok((*secret, new_crypto))
512 }
513
514 /// Install a [`derive_forward_crypto`](Self::derive_forward_crypto)d epoch:
515 /// swap in the new `CryptoState` via the lock-free `ArcSwap`, zero+replace
516 /// the traffic secret under the write lock, and saturatingly advance the
517 /// epoch by `steps` (Invariant 5 — epoch never wraps). The caller MUST have
518 /// authenticated the transition (a successful trial decrypt, or its own
519 /// send-side rekey) — this routine verifies nothing itself.
520 fn commit_forward_crypto(&self, steps: u8, final_secret: [u8; 32], new_crypto: CryptoState) {
521 let mut current = self.traffic_secret.write();
522 // Install the new AEAD state, then the new epoch (SeqCst) so the wire
523 // header the send path stamps matches the key it encrypts under.
524 self.crypto.store(Arc::new(new_crypto));
525 // Zero the old secret before overwriting it so the previous-epoch key
526 // material does not survive in memory.
527 current.zeroize();
528 *current = final_secret;
529 let cur = self.epoch.load(Ordering::Relaxed);
530 self.epoch
531 .store(cur.saturating_add(steps), Ordering::SeqCst);
532 }
533
534 /// Send-side AEAD invocation count for the current epoch (resets to 0 on
535 /// each rekey). Drives [`send_needs_rekey`](Self::send_needs_rekey).
536 pub fn send_invocations(&self) -> u64 {
537 self.crypto.load().send_invocations()
538 }
539
540 /// The send-invocation high-watermark at which the pump auto-rekeys.
541 pub fn rekey_threshold(&self) -> u64 {
542 self.rekey_after.load(Ordering::Relaxed)
543 }
544
545 /// Override the auto-rekey high-watermark (default [`REKEY_SOFT_LIMIT`]).
546 /// Clamped to `>= 1`. Rust-only — primarily for tests/soak harnesses that
547 /// need to exercise mid-session rekey without sending `2^47` packets.
548 pub fn set_rekey_threshold(&self, n: u64) {
549 self.rekey_after.store(n.max(1), Ordering::Relaxed);
550 }
551
552 /// Override the per-stream sequence rekey watermark (default
553 /// [`SEQ_REKEY_WATERMARK`], `2^31`). Clamped to `>= 1`. Rust-only — primarily
554 /// for tests/soak harnesses that need to exercise the per-stream forced rekey
555 /// (C1) without driving a single stream through `2^31` sequence numbers.
556 pub fn set_seq_rekey_watermark(&self, n: u32) {
557 self.seq_rekey_watermark.store(n.max(1), Ordering::Relaxed);
558 }
559
560 /// True once `stream_id`'s sequence has advanced past the per-stream
561 /// watermark within the current epoch (C1). The send path checks this before
562 /// stamping each packet and, when set, forces a [`rekey`](Self::rekey) so a
563 /// per-stream `u32` sequence can never wrap within one epoch — which would
564 /// otherwise repeat the AEAD nonce `(epoch, stream_id, sequence, path_id)`
565 /// under a fixed key (Invariant 8).
566 ///
567 /// The per-stream `(epoch, base)` checkpoint is rebased lazily on the first
568 /// call after an epoch change, so the measured span is always relative to
569 /// where the stream entered the *current* epoch.
570 pub fn stream_seq_needs_rekey(&self, stream_id: StreamId, seq: u32) -> bool {
571 let epoch = self.current_epoch();
572 let mut entry = self.seq_epoch_base.entry(stream_id).or_insert((epoch, seq));
573 let (base_epoch, base_seq) = *entry;
574 if base_epoch != epoch {
575 // First send on this stream since a rekey — rebase to the current
576 // sequence and measure this epoch's span from here.
577 *entry = (epoch, seq);
578 return false;
579 }
580 seq.wrapping_sub(base_seq) >= self.seq_rekey_watermark.load(Ordering::Relaxed)
581 }
582
583 /// True once the send direction has crossed the rekey high-watermark and the
584 /// epoch has room to advance. The data pump checks this before each
585 /// application send and, when set, rekeys + flags the packet `REKEY` so the
586 /// peer follows via the authenticated epoch bump.
587 pub fn send_needs_rekey(&self) -> bool {
588 self.current_epoch() < u8::MAX
589 && self.send_invocations() >= self.rekey_after.load(Ordering::Relaxed)
590 }
591
592 /// Decrypt a packet, transparently following an **authenticated** forward
593 /// rekey of up to [`MAX_REKEY_CATCHUP`] epochs (C1).
594 ///
595 /// - `header.epoch == current`: ordinary [`decrypt_packet`](Self::decrypt_packet).
596 /// - `current < header.epoch <= current + MAX_REKEY_CATCHUP`: derive the
597 /// candidate key that many epochs ahead and *trial*-decrypt. Only on AEAD
598 /// success — i.e. once the epoch bump is proven authentic — is the rekey
599 /// committed and the replay window consulted (Invariant 4 ordering
600 /// preserved). A forged `header.epoch` fails the AEAD open, nothing is
601 /// committed, and the session does not desync. The bound caps an attacker
602 /// to at most `MAX_REKEY_CATCHUP` HKDF steps per spoofed packet.
603 /// - anything else (behind current, more than `MAX_REKEY_CATCHUP` ahead, or
604 /// epoch saturated): rejected. Over a reliable transport the sender
605 /// retransmits at the then-current epoch, so no data is lost.
606 pub fn decrypt_packet_accepting_rekey(
607 &self,
608 header: &PacketHeader,
609 ciphertext: &[u8],
610 ) -> Result<Vec<u8>, CoreError> {
611 // Fast paths that need no epoch transition (no lock).
612 let cur = self.current_epoch();
613 if header.epoch == cur {
614 return self.decrypt_packet(header, ciphertext);
615 }
616 if header.epoch < cur {
617 return Err(CoreError::CryptoError(format!(
618 "packet epoch {} is behind the current epoch {}",
619 header.epoch, cur
620 )));
621 }
622
623 // A forward ratchet mutates the epoch + key, so it must be serialised
624 // against a concurrent send-side `rekey()` (C1 — both tasks share this
625 // `Session`). Hold the rekey lock across the re-check, derive, trial
626 // decrypt, and commit so the installed key depth and the epoch stay in
627 // lockstep.
628 let _rekey = self.rekey_lock.lock();
629 // Re-read under the lock: a concurrent rekey may have already advanced us.
630 let cur = self.current_epoch();
631 if header.epoch == cur {
632 drop(_rekey);
633 return self.decrypt_packet(header, ciphertext);
634 }
635 if header.epoch < cur {
636 return Err(CoreError::CryptoError(format!(
637 "packet epoch {} is behind the current epoch {}",
638 header.epoch, cur
639 )));
640 }
641 let steps = header.epoch - cur; // > 0, both u8 → no underflow
642 if steps > MAX_REKEY_CATCHUP {
643 return Err(CoreError::CryptoError(format!(
644 "packet epoch {} is more than {} epochs ahead of current {}",
645 header.epoch, MAX_REKEY_CATCHUP, cur
646 )));
647 }
648
649 // Candidate key `steps` epochs ahead — derived but NOT installed.
650 let (mut final_secret, final_crypto) = self.derive_forward_crypto(steps)?;
651 let nonce = Self::build_packet_nonce(final_crypto.nonce_prefix(), header);
652 let header_bytes = header.to_wire();
653 // AEAD gate: a forged epoch bump fails here and we return without
654 // committing — the live epoch is untouched. Zero the (valid, sensitive)
655 // candidate secret we are not going to install.
656 let plaintext = match final_crypto.decrypt_with_nonce(nonce, &header_bytes, ciphertext) {
657 Ok(pt) => pt,
658 Err(e) => {
659 final_secret.zeroize();
660 return Err(e);
661 }
662 };
663
664 // Authentic forward rekey — commit (still under the rekey lock; `cur` was
665 // read under it, so `cur + steps == header.epoch` is the absolute, race-
666 // free target), then drop the lock and apply the replay window AFTER the
667 // AEAD open (Invariant 4 — the window is per-(stream,sequence) and
668 // epoch-independent, so it needs no rekey serialisation).
669 self.commit_forward_crypto(steps, final_secret, final_crypto);
670 drop(_rekey);
671 let window_entry = self
672 .replay_windows
673 .entry(header.stream_id)
674 .or_insert_with(|| Mutex::new(ReplayWindow::new()));
675 let accepted = window_entry.lock().accept(header.sequence);
676 if !accepted {
677 self.replay_rejected_total.fetch_add(1, Ordering::Relaxed);
678 return Err(CoreError::ReplayDetected(format!(
679 "stream {} sequence {} already seen or beyond window",
680 header.stream_id, header.sequence
681 )));
682 }
683 Ok(plaintext)
684 }
685
686 /// Advance to a specific target epoch by repeatedly applying the rekey
687 /// HKDF chain. Used by the receive path to "catch up" when it sees a
688 /// packet from a higher epoch than the locally known one. Refuses to go
689 /// backwards (a lower target than current returns Ok without changes).
690 pub fn ratchet_to_epoch(&self, target: u8) -> Result<(), CoreError> {
691 let mut current = self.epoch.load(Ordering::Relaxed);
692 while current < target {
693 self.rekey()?;
694 current = self.epoch.load(Ordering::Relaxed);
695 }
696 Ok(())
697 }
698
699 // ── Multi-path / migration (Phase 4.2) ────────────────────────────
700
701 /// Snapshot of currently `Validated` path ids. Useful for the
702 /// scheduler when picking an outbound path.
703 pub fn validated_paths(&self) -> Vec<u8> {
704 self.path_registry.validated_paths()
705 }
706
707 /// State of a specific path within this session. Returns `None` for
708 /// path ids the session has never observed.
709 pub fn path_state(&self, path_id: u8) -> Option<PathStateKind> {
710 self.path_registry.state(path_id)
711 }
712
713 /// Register a new path id and immediately issue a 32-byte
714 /// PATH_CHALLENGE for it. Returns the challenge bytes; the caller
715 /// must transmit them in a V2 packet with `PacketFlags::PATH_VALIDATION`
716 /// set on the new path. Subsequent calls on an already-Validating
717 /// path re-issue a fresh challenge.
718 ///
719 /// Returns `None` if the path is in a terminal state (`Validated`
720 /// or `Failed`).
721 #[tracing::instrument(name = "phantom.path.begin_validation", skip_all, fields(path_id = path_id))]
722 pub fn begin_path_validation(&self, path_id: u8) -> Option<[u8; PATH_CHALLENGE_LEN]> {
723 self.path_registry.register(path_id);
724 self.path_registry.issue_challenge(path_id)
725 }
726
727 /// Register `path_id` as `Unvalidated` if the session has never observed it
728 /// (PATH-001). Used by the receive-side gate to start tracking a fresh path
729 /// id seen on inbound (AEAD-authenticated) data, so a later
730 /// challenge/response can promote it to `Validated`. Idempotent — never
731 /// resets an already-known path (e.g. the pre-validated path 0).
732 pub(crate) fn register_unvalidated_path(&self, path_id: u8) {
733 if self.path_registry.state(path_id).is_none() {
734 self.path_registry.register(path_id);
735 }
736 }
737
738 /// Verify a peer's `PATH_VALIDATION` response. Returns `true` if
739 /// the response matches the in-flight challenge (path is now
740 /// `Validated`). Returns `false` otherwise — the path may have
741 /// transitioned to `Failed`.
742 #[tracing::instrument(name = "phantom.path.complete_validation", skip(response), fields(path_id = path_id))]
743 pub fn complete_path_validation(&self, path_id: u8, response: &[u8]) -> bool {
744 self.path_registry.verify_response(path_id, response)
745 }
746
747 /// Record that a packet was observed on the path. Cheap to call
748 /// per-packet — used by the data pump to keep `last_packet_seen`
749 /// fresh for the timeout sweep.
750 pub fn mark_path_seen(&self, path_id: u8) {
751 self.path_registry.mark_seen(path_id);
752 }
753
754 // ── Pacer / BandwidthEstimator (Phase 2.6) ─────────────────────────
755
756 /// Shared handle to this session's outbound rate-limiter. Cheap to
757 /// clone (`Arc`). The data pump consults this before every outbound
758 /// packet; idle by default ([`Pacer::unlimited`]).
759 pub fn pacer(&self) -> Arc<Pacer> {
760 self.pacer.clone()
761 }
762
763 /// Record that a packet of `bytes` length is going on the wire.
764 /// Feeds the BBR-style bandwidth estimator. Cheap (one mutex lock
765 /// + a counter increment).
766 pub fn on_packet_sent(&self, bytes: u64) {
767 self.bandwidth_estimator.lock().on_send(bytes);
768 }
769
770 /// Record that an ACK arrived with delivery sample `sample`. The
771 /// returned `u64` is the updated bottleneck bandwidth estimate; we
772 /// reflect it into the pacer so the outbound rate tracks the
773 /// peer's actual receive throughput.
774 pub fn on_packet_acked(&self, sample: DeliverySample) -> u64 {
775 let bw = self.bandwidth_estimator.lock().on_ack(sample);
776 // Mirror the estimator's pacing decision onto the pacer so the
777 // two stay in lock-step.
778 let rate = self.bandwidth_estimator.lock().pacing_rate();
779 if rate > 0 {
780 self.pacer.set_rate(rate);
781 }
782 bw
783 }
784
785 /// Record that a packet of `bytes` length was lost (no ACK before
786 /// retransmit timer fired). Drives BBR's loss-based feedback.
787 pub fn on_packet_lost(&self, bytes: u64) {
788 self.bandwidth_estimator.lock().on_loss(bytes);
789 }
790
791 /// Current BBR congestion-control state. Observability / test hook — lets
792 /// callers confirm a loss drove the estimator into `FastRecovery`.
793 pub fn bbr_state(&self) -> crate::transport::bandwidth_estimator::BbrState {
794 self.bandwidth_estimator.lock().state()
795 }
796
797 /// Read a snapshot of the bandwidth / pacing estimator. Cheap; held
798 /// over a single mutex lock.
799 pub fn bandwidth_snapshot(&self) -> BandwidthSnapshot {
800 let est = self.bandwidth_estimator.lock();
801 BandwidthSnapshot {
802 bottleneck_bw_bps: est.bottleneck_bandwidth(),
803 min_rtt: est.min_rtt(),
804 pacing_rate_bps: est.pacing_rate(),
805 cwnd_bytes: est.cwnd(),
806 inflight_bytes: est.inflight_bytes(),
807 }
808 }
809
810 // ── Event-driven send-loop wake-up (Phase 2.4) ─────────────────────
811
812 /// Shared handle to the outbound-ready notify. The API-layer data
813 /// pump awaits this via `Notify::notified()`; any task with the
814 /// handle can wake it instantly via [`Self::notify_outbound_ready`].
815 pub fn send_notifier(&self) -> Arc<tokio::sync::Notify> {
816 self.send_notify.clone()
817 }
818
819 /// Wake the data pump's send loop immediately so it can drain newly-
820 /// queued packets instead of waiting for the next 10 ms tick. Cheap
821 /// (a single `notify_one()`); duplicate calls collapse to one wake.
822 pub fn notify_outbound_ready(&self) {
823 self.send_notify.notify_one();
824 }
825
826 /// Build the AEAD nonce from the authenticated header fields.
827 ///
828 /// Layout (12 bytes total):
829 /// ```text
830 /// [0..4] : nonce_prefix (from CryptoState; identical for the lifetime
831 /// of a session, freshly derived per rekey)
832 /// [4] : epoch
833 /// [5..7] : stream_id (big-endian)
834 /// [7..11] : sequence (big-endian)
835 /// [11] : path_id
836 /// ```
837 ///
838 /// Uniqueness argument: senders never reuse `(stream_id, sequence)`
839 /// within a single epoch. The path_id distinguishes the same logical
840 /// packet replayed across paths (Phase 4.2 multi-path). Together the
841 /// 12-byte nonce is unique for every `seal_in_place_*` invocation
842 /// under the given key.
843 fn build_packet_nonce(prefix: [u8; 4], header: &PacketHeader) -> [u8; 12] {
844 let mut n = [0u8; 12];
845 n[..4].copy_from_slice(&prefix);
846 n[4] = header.epoch;
847 n[5..7].copy_from_slice(&header.stream_id.to_be_bytes());
848 n[7..11].copy_from_slice(&header.sequence.to_be_bytes());
849 n[11] = header.path_id;
850 n
851 }
852
853 /// Encrypt a packet payload.
854 ///
855 /// The AEAD nonce is derived from the authenticated `(epoch, stream_id,
856 /// sequence, path_id)` fields of the packet header rather than from an
857 /// internal monotonic counter, so a failed peer decrypt never desyncs the
858 /// receiver. The AAD is the 45-byte wire image of the header
859 /// ([`PacketHeader::to_wire`]), so any wire-level mutation invalidates the tag.
860 pub fn encrypt_packet(
861 &self,
862 header: &PacketHeader,
863 plaintext: &[u8],
864 ) -> Result<Vec<u8>, CoreError> {
865 let crypto = self.crypto.load();
866 let nonce = Self::build_packet_nonce(crypto.nonce_prefix(), header);
867 let header_bytes = header.to_wire();
868 crypto.encrypt_with_nonce(nonce, &header_bytes, plaintext)
869 }
870
871 /// Decrypt a packet payload. Performs AEAD verify + per-stream
872 /// sliding-window replay rejection (the window check runs **after** a
873 /// successful AEAD open — Invariant 4 — so we never key off
874 /// un-authenticated sequence numbers).
875 ///
876 /// A failed decrypt does NOT desync future decrypts: the AEAD nonce is
877 /// derived from this packet's authenticated header fields, so the receiver
878 /// stays in lock-step with the sender regardless of intervening bad
879 /// packets.
880 pub fn decrypt_packet(
881 &self,
882 header: &PacketHeader,
883 ciphertext: &[u8],
884 ) -> Result<Vec<u8>, CoreError> {
885 let crypto = self.crypto.load();
886 let nonce = Self::build_packet_nonce(crypto.nonce_prefix(), header);
887 let header_bytes = header.to_wire();
888 let plaintext = crypto.decrypt_with_nonce(nonce, &header_bytes, ciphertext)?;
889
890 // Sliding-window guard. ReplayWindow keys on `(stream_id, sequence)`
891 // only — the `epoch` / `path_id` fields do NOT contribute to the
892 // replay identity because replay is a property of "is this sequence
893 // a duplicate", independent of which path it arrived over or which
894 // rekey generation produced it.
895 let window_entry = self
896 .replay_windows
897 .entry(header.stream_id)
898 .or_insert_with(|| Mutex::new(ReplayWindow::new()));
899 let accepted = window_entry.lock().accept(header.sequence);
900 if !accepted {
901 self.replay_rejected_total.fetch_add(1, Ordering::Relaxed);
902 return Err(CoreError::ReplayDetected(format!(
903 "stream {} sequence {} already seen or beyond window",
904 header.stream_id, header.sequence
905 )));
906 }
907 Ok(plaintext)
908 }
909
910 /// Create a control packet
911 pub fn create_control_packet(
912 &self,
913 _message: ControlMessage,
914 payload: Vec<u8>,
915 ) -> PhantomPacket {
916 let seq = self.control_sequence.fetch_add(1, Ordering::SeqCst);
917 let header = PacketHeader::new(
918 self.id,
919 0, // control stream
920 seq,
921 PacketFlags::new(PacketFlags::CONTROL | PacketFlags::RELIABLE),
922 );
923 // Note: Real implementation would also encrypt control packet
924 PhantomPacket::new(header, payload)
925 }
926
927 /// Get the scheduler
928 pub fn scheduler(&self) -> &Arc<Scheduler> {
929 &self.scheduler
930 }
931
932 /// Set resumption secret for 0-RTT.
933 ///
934 /// If a secret was already set, the previous bytes are explicitly zeroed
935 /// before being replaced — defense in depth in case `set_resumption_secret`
936 /// is called multiple times within a session.
937 pub fn set_resumption_secret(&self, secret: [u8; 32]) {
938 let mut guard = self.resumption_secret.write();
939 if let Some(mut old) = guard.take() {
940 old.zeroize();
941 }
942 *guard = Some(secret);
943 }
944
945 /// The resumption secret for 0-RTT, if one has been installed. Rust-only —
946 /// the FFI surface exposes this via `PhantomSession::resumption_hint()`.
947 pub fn resumption_secret(&self) -> Option<[u8; 32]> {
948 *self.resumption_secret.read()
949 }
950
951 /// Check if session can be resumed (has resumption secret)
952 pub fn can_resume(&self) -> bool {
953 self.resumption_secret.read().is_some()
954 }
955
956 /// Extract the resumption hint needed to attempt 0-RTT resume on
957 /// a future connect (Phase 4.1). Returns `Some((session_id_bytes,
958 /// resumption_secret))` only after a successful handshake — the
959 /// resumption_secret is set by `process_client_hello` /
960 /// `process_server_hello` once shared key material is in place.
961 ///
962 /// The caller is responsible for storing the tuple alongside the
963 /// pinned `HybridVerifyingKey` of the server it was negotiated
964 /// with. Mixing tickets across servers is a configuration bug —
965 /// the resumption_secret is server-pinned.
966 pub fn resumption_hint(&self) -> Option<([u8; 32], [u8; 32])> {
967 let secret = (*self.resumption_secret.read())?;
968 Some((self.id.0, secret))
969 }
970
971 /// Update last activity timestamp
972 pub fn update_activity(&self) {
973 *self.last_activity.write() = Instant::now();
974 }
975
976 /// Check if session is expired
977 pub fn is_expired(&self, timeout: Duration) -> bool {
978 self.last_activity.read().elapsed() > timeout
979 }
980}
981
/// Point-in-time copy of the session's pacing / bandwidth figures
/// (Phase 2.6).
///
/// Produced by [`Session::bandwidth_snapshot`] so telemetry and debugging code
/// can inspect the numbers without access to the mutable estimator.
#[derive(Clone, Copy, Debug)]
pub struct BandwidthSnapshot {
    /// Value of the estimator's `bottleneck_bandwidth()`.
    pub bottleneck_bw_bps: u64,
    /// Value of the estimator's `min_rtt()`.
    pub min_rtt: Duration,
    /// Value of the estimator's `pacing_rate()`.
    pub pacing_rate_bps: u64,
    /// Value of the estimator's `cwnd()`.
    pub cwnd_bytes: u64,
    /// Value of the estimator's `inflight_bytes()`.
    pub inflight_bytes: u64,
}
993
994impl std::fmt::Debug for Session {
995 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
996 f.debug_struct("Session").field("id", &self.id).finish()
997 }
998}
999
1000impl Drop for Session {
1001 /// On session drop, explicitly zero the resumption secret. The
1002 /// `CryptoState` inside `crypto` is itself `ZeroizeOnDrop`, so its
1003 /// `session_key` is handled there.
1004 fn drop(&mut self) {
1005 if let Some(mut secret) = self.resumption_secret.write().take() {
1006 secret.zeroize();
1007 }
1008 }
1009}