Skip to main content

specter/transport/h3/
path.rs

1//! Native QUIC path validation, connection-ID inventory, and anti-amplification
2//! primitives.
3//!
4//! Implements the RFC 9000 § 5.1 / § 8.1 / § 9 state needed to support path
5//! migration beyond the existing `QuicPathValidator` token tracker:
6//!
7//! - `QuicAntiAmplificationLimit` enforces the RFC 9000 § 8.1 3x send budget
8//!   that protects unvalidated peer addresses from being used to amplify
9//!   traffic toward third parties.
10//! - `QuicConnectionIdInventory` tracks locally issued connection IDs (RFC 9000
11//!   § 5.1.1) and peer-issued connection IDs (RFC 9000 § 5.1.2), processes
12//!   NEW_CONNECTION_ID (RFC 9000 § 19.15) and RETIRE_CONNECTION_ID (RFC 9000
13//!   § 19.16) frames, enforces `active_connection_id_limit` (RFC 9000 § 18.2),
14//!   and surfaces retire-prior-to obligations as outbound frame work.
15//! - `QuicPathState` / `QuicPath` / `QuicPathSet` track the RFC 9000 § 9
16//!   primary path and any probing paths during a migration attempt, including
17//!   per-path anti-amplification accounting and pending PATH_CHALLENGE tokens.
18//!
19//! Driver / handshake integration (issuing NEW_CONNECTION_ID after handshake
20//! completion, switching the active path on validation success, gating
21//! outbound packet builders on the anti-amplification budget) is layered on
22//! top of these primitives.
23
24use std::collections::{BTreeMap, VecDeque};
25use std::net::SocketAddr;
26use std::time::Instant;
27
28use bytes::Bytes;
29
30use crate::error::{Error, Result};
31use crate::transport::h3::quic::ConnectionId;
32
/// RFC 9000 § 18.2 minimum value for `active_connection_id_limit`. The limit
/// counts every connection ID an endpoint is willing to store from its peer,
/// *including* the one received during the handshake, so the floor of two
/// leaves room for the handshake connection ID plus one additional ID.
/// `QuicConnectionIdInventory::new` raises smaller values to this floor.
pub const MIN_ACTIVE_CONNECTION_ID_LIMIT: u64 = 2;
37
/// RFC 9000 § 8.1 anti-amplification factor. Until a peer address is
/// validated, an endpoint must not send more than three times the amount of
/// data it has received from that address. Used as the multiplier in
/// `QuicAntiAmplificationLimit::remaining_send_budget`.
pub const ANTI_AMPLIFICATION_FACTOR: u64 = 3;
42
/// RFC 9000 § 8.1 per-path send budget tracker. Until the path is validated,
/// the endpoint may not send more than `ANTI_AMPLIFICATION_FACTOR * bytes_received`.
///
/// The derived `Default` is an unvalidated tracker with both counters at
/// zero, so nothing may be sent until something has been received.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct QuicAntiAmplificationLimit {
    /// Running total credited through `on_received` (saturating).
    bytes_received: u64,
    /// Running total debited through `on_sent` (saturating).
    bytes_sent: u64,
    /// Once set by `mark_validated`, the 3x cap no longer applies.
    validated: bool,
}
51
52impl QuicAntiAmplificationLimit {
53    pub fn new() -> Self {
54        Self::default()
55    }
56
57    pub fn bytes_received(&self) -> u64 {
58        self.bytes_received
59    }
60
61    pub fn bytes_sent(&self) -> u64 {
62        self.bytes_sent
63    }
64
65    pub fn validated(&self) -> bool {
66        self.validated
67    }
68
69    /// Mark the path as validated, removing the 3x cap (RFC 9000 § 8.1).
70    pub fn mark_validated(&mut self) {
71        self.validated = true;
72    }
73
74    pub fn on_received(&mut self, len: usize) {
75        self.bytes_received = self.bytes_received.saturating_add(len as u64);
76    }
77
78    pub fn on_sent(&mut self, len: usize) {
79        self.bytes_sent = self.bytes_sent.saturating_add(len as u64);
80    }
81
82    /// Remaining send budget before the 3x cap is hit. Returns `u64::MAX` once
83    /// the path is validated.
84    pub fn remaining_send_budget(&self) -> u64 {
85        if self.validated {
86            return u64::MAX;
87        }
88        let allowance = self
89            .bytes_received
90            .saturating_mul(ANTI_AMPLIFICATION_FACTOR);
91        allowance.saturating_sub(self.bytes_sent)
92    }
93
94    /// Whether sending `additional_bytes` is permitted under the current
95    /// anti-amplification accounting.
96    pub fn may_send(&self, additional_bytes: usize) -> bool {
97        if self.validated {
98            return true;
99        }
100        self.remaining_send_budget() >= additional_bytes as u64
101    }
102}
103
/// RFC 9000 § 5.1.1: a connection ID issued by this endpoint. We accept
/// packets destined to any non-retired local CID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LocalConnectionIdEntry {
    /// Sequence number this connection ID is stored under in the inventory.
    pub sequence_number: u64,
    /// The locally issued connection ID.
    pub connection_id: ConnectionId,
    /// 16-byte stateless reset token paired with this connection ID.
    pub stateless_reset_token: [u8; 16],
    /// Set once the peer retires this ID
    /// (`QuicConnectionIdInventory::observe_peer_retire_connection_id`).
    pub retired: bool,
}
113
/// RFC 9000 § 5.1.2: a connection ID issued by the peer. We use one of these
/// as the destination CID on outbound packets.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerConnectionIdEntry {
    /// Sequence number carried by the frame (or 0 for the handshake CID).
    pub sequence_number: u64,
    /// The peer-issued connection ID bytes.
    pub connection_id: Bytes,
    /// 16-byte stateless reset token paired with this connection ID.
    pub stateless_reset_token: [u8; 16],
    /// Set when the sequence number falls below the highest `retire_prior_to`
    /// the inventory has observed, either on arrival or when the threshold
    /// later advances.
    pub retired: bool,
}
123
/// Per-connection CID inventory tracking both locally issued and peer-issued
/// connection IDs (RFC 9000 § 5.1). Enforces the `active_connection_id_limit`
/// transport parameter (RFC 9000 § 18.2) and surfaces retire-prior-to
/// obligations from incoming NEW_CONNECTION_ID frames (RFC 9000 § 19.15).
#[derive(Debug, Clone)]
pub struct QuicConnectionIdInventory {
    /// Limit applied to the number of unretired entries, clamped to
    /// `MIN_ACTIVE_CONNECTION_ID_LIMIT` at construction. The same value bounds
    /// both `locals` and `peers`.
    active_connection_id_limit: u64,
    /// Sequence number the next locally issued connection ID will receive.
    next_local_sequence: u64,
    /// Locally issued connection IDs keyed by sequence number. Retired
    /// entries are flagged, not removed, by the methods on this type.
    locals: BTreeMap<u64, LocalConnectionIdEntry>,
    /// Highest `retire_prior_to` value observed from the peer so far.
    peer_retire_prior_to: u64,
    /// One past the highest peer sequence number stored so far.
    next_peer_sequence: u64,
    /// Peer-issued connection IDs keyed by sequence number. Retired entries
    /// are flagged, not removed, by the methods on this type.
    peers: BTreeMap<u64, PeerConnectionIdEntry>,
    /// Peer sequence numbers awaiting an outbound RETIRE_CONNECTION_ID frame;
    /// emptied by `drain_pending_peer_retires`.
    pending_peer_retires: VecDeque<u64>,
    /// Sequence number of the peer CID currently selected as active, if any.
    active_peer_sequence: Option<u64>,
    /// Sequence number of the local CID currently selected as active, if any.
    active_local_sequence: Option<u64>,
}
140
141impl QuicConnectionIdInventory {
142    /// Create an inventory with the negotiated `active_connection_id_limit`.
143    /// RFC 9000 § 18.2 requires the value be at least 2; we clamp to
144    /// `MIN_ACTIVE_CONNECTION_ID_LIMIT`.
145    pub fn new(active_connection_id_limit: u64) -> Self {
146        Self {
147            active_connection_id_limit: active_connection_id_limit
148                .max(MIN_ACTIVE_CONNECTION_ID_LIMIT),
149            next_local_sequence: 0,
150            locals: BTreeMap::new(),
151            peer_retire_prior_to: 0,
152            next_peer_sequence: 0,
153            peers: BTreeMap::new(),
154            pending_peer_retires: VecDeque::new(),
155            active_peer_sequence: None,
156            active_local_sequence: None,
157        }
158    }
159
160    pub fn active_connection_id_limit(&self) -> u64 {
161        self.active_connection_id_limit
162    }
163
164    /// Install the connection ID negotiated during the handshake as local
165    /// sequence number 0 (RFC 9000 § 5.1.1).
166    pub fn install_initial_local(
167        &mut self,
168        connection_id: ConnectionId,
169        stateless_reset_token: [u8; 16],
170    ) -> u64 {
171        let sequence_number = self.next_local_sequence;
172        self.locals.insert(
173            sequence_number,
174            LocalConnectionIdEntry {
175                sequence_number,
176                connection_id,
177                stateless_reset_token,
178                retired: false,
179            },
180        );
181        self.next_local_sequence = self.next_local_sequence.saturating_add(1);
182        if self.active_local_sequence.is_none() {
183            self.active_local_sequence = Some(sequence_number);
184        }
185        sequence_number
186    }
187
188    /// Install the peer-issued connection ID from the handshake as peer
189    /// sequence number 0 (RFC 9000 § 5.1.2).
190    pub fn install_initial_peer(
191        &mut self,
192        connection_id: Bytes,
193        stateless_reset_token: [u8; 16],
194    ) -> u64 {
195        let sequence_number = self.next_peer_sequence;
196        self.peers.insert(
197            sequence_number,
198            PeerConnectionIdEntry {
199                sequence_number,
200                connection_id,
201                stateless_reset_token,
202                retired: false,
203            },
204        );
205        self.next_peer_sequence = self.next_peer_sequence.saturating_add(1);
206        if self.active_peer_sequence.is_none() {
207            self.active_peer_sequence = Some(sequence_number);
208        }
209        sequence_number
210    }
211
212    /// Allocate the next outbound NEW_CONNECTION_ID frame, respecting the
213    /// peer's `active_connection_id_limit` (RFC 9000 § 18.2). Returns `None`
214    /// when issuing another local CID would exceed the negotiated limit.
215    pub fn allocate_next_local_to_issue(
216        &mut self,
217        connection_id: ConnectionId,
218        stateless_reset_token: [u8; 16],
219    ) -> Option<LocalConnectionIdEntry> {
220        if self.unretired_local_count() >= self.active_connection_id_limit as usize {
221            return None;
222        }
223        let sequence_number = self.next_local_sequence;
224        let entry = LocalConnectionIdEntry {
225            sequence_number,
226            connection_id,
227            stateless_reset_token,
228            retired: false,
229        };
230        self.locals.insert(sequence_number, entry.clone());
231        self.next_local_sequence = self.next_local_sequence.saturating_add(1);
232        Some(entry)
233    }
234
235    /// Register a locally issued connection ID at an explicit sequence number.
236    pub fn register_local_issued(
237        &mut self,
238        sequence_number: u64,
239        connection_id: ConnectionId,
240        stateless_reset_token: [u8; 16],
241    ) -> Result<()> {
242        if connection_id.as_bytes().is_empty() {
243            return Err(Error::quic(
244                "RFC9000 19.15: NEW_CONNECTION_ID cannot carry an empty connection id",
245            ));
246        }
247        if let Some(existing) = self.locals.get(&sequence_number) {
248            if existing.connection_id != connection_id
249                || existing.stateless_reset_token != stateless_reset_token
250            {
251                return Err(Error::quic(
252                    "RFC9000 19.15: NEW_CONNECTION_ID reuses sequence number with different CID",
253                ));
254            }
255            return Ok(());
256        }
257        if self.unretired_local_count() >= self.active_connection_id_limit as usize {
258            return Err(Error::quic(
259                "RFC9000 18.2: exceeded active_connection_id_limit",
260            ));
261        }
262        self.locals.insert(
263            sequence_number,
264            LocalConnectionIdEntry {
265                sequence_number,
266                connection_id,
267                stateless_reset_token,
268                retired: false,
269            },
270        );
271        if self.next_local_sequence <= sequence_number {
272            self.next_local_sequence = sequence_number.saturating_add(1);
273        }
274        Ok(())
275    }
276
277    /// Process an inbound NEW_CONNECTION_ID frame (RFC 9000 § 19.15). Validates
278    /// that the sequence number is novel, that `retire_prior_to` does not
279    /// exceed `sequence_number`, and enforces the `active_connection_id_limit`.
280    pub fn observe_peer_new_connection_id(
281        &mut self,
282        sequence_number: u64,
283        retire_prior_to: u64,
284        connection_id: Bytes,
285        stateless_reset_token: [u8; 16],
286    ) -> Result<()> {
287        if retire_prior_to > sequence_number {
288            return Err(Error::quic(
289                "RFC9000 19.15: NEW_CONNECTION_ID retire_prior_to exceeds sequence_number",
290            ));
291        }
292        if let Some(existing) = self.peers.get(&sequence_number) {
293            if existing.connection_id != connection_id
294                || existing.stateless_reset_token != stateless_reset_token
295            {
296                return Err(Error::quic(
297                    "RFC9000 19.15: NEW_CONNECTION_ID reuses sequence number with different CID",
298                ));
299            }
300            return Ok(());
301        }
302        if retire_prior_to > self.peer_retire_prior_to {
303            self.peer_retire_prior_to = retire_prior_to;
304            self.retire_peer_below(retire_prior_to);
305        }
306        let entry = PeerConnectionIdEntry {
307            sequence_number,
308            connection_id,
309            stateless_reset_token,
310            retired: sequence_number < self.peer_retire_prior_to,
311        };
312        if entry.retired {
313            self.pending_peer_retires.push_back(sequence_number);
314        }
315        self.peers.insert(sequence_number, entry);
316        if self.next_peer_sequence <= sequence_number {
317            self.next_peer_sequence = sequence_number.saturating_add(1);
318        }
319        if self.unretired_peer_count() > self.active_connection_id_limit as usize {
320            return Err(Error::quic(
321                "RFC9000 18.2: peer exceeded active_connection_id_limit",
322            ));
323        }
324        if self.active_peer_sequence.is_none() {
325            self.active_peer_sequence = Some(sequence_number);
326        } else if self
327            .active_peer_sequence
328            .is_some_and(|active| active < self.peer_retire_prior_to)
329        {
330            self.active_peer_sequence =
331                self.peers.iter().find_map(
332                    |(seq, entry)| {
333                        if entry.retired {
334                            None
335                        } else {
336                            Some(*seq)
337                        }
338                    },
339                );
340        }
341        Ok(())
342    }
343
344    /// Process an inbound RETIRE_CONNECTION_ID frame (RFC 9000 § 19.16): the
345    /// peer is retiring one of the connection IDs we previously issued.
346    pub fn observe_peer_retire_connection_id(&mut self, sequence_number: u64) -> Result<()> {
347        {
348            let entry = self.locals.get_mut(&sequence_number).ok_or_else(|| {
349                Error::quic("RFC9000 19.16: RETIRE_CONNECTION_ID for unknown local sequence")
350            })?;
351            entry.retired = true;
352        }
353        if Some(sequence_number) == self.active_local_sequence {
354            self.active_local_sequence = self.locals.iter().find_map(|(seq, value)| {
355                if !value.retired && *seq != sequence_number {
356                    Some(*seq)
357                } else {
358                    None
359                }
360            });
361        }
362        Ok(())
363    }
364
365    /// Drain the queue of peer sequence numbers we need to retire via outbound
366    /// RETIRE_CONNECTION_ID frames (driven by retire_prior_to obligations from
367    /// previously observed NEW_CONNECTION_ID frames).
368    pub fn drain_pending_peer_retires(&mut self) -> Vec<u64> {
369        self.pending_peer_retires.drain(..).collect()
370    }
371
372    pub fn active_local(&self) -> Option<&LocalConnectionIdEntry> {
373        self.active_local_sequence
374            .and_then(|seq| self.locals.get(&seq))
375    }
376
377    pub fn active_peer(&self) -> Option<&PeerConnectionIdEntry> {
378        self.active_peer_sequence
379            .and_then(|seq| self.peers.get(&seq))
380    }
381
382    /// Promote a non-retired peer CID to active, for example when migrating
383    /// to a probed path (RFC 9000 § 9.5).
384    pub fn promote_peer_to_active(&mut self, sequence_number: u64) -> Result<()> {
385        let entry = self
386            .peers
387            .get(&sequence_number)
388            .ok_or_else(|| Error::quic("RFC9000 9.5: cannot promote unknown peer connection ID"))?;
389        if entry.retired {
390            return Err(Error::quic(
391                "RFC9000 9.5: cannot promote a retired peer connection ID",
392            ));
393        }
394        self.active_peer_sequence = Some(sequence_number);
395        Ok(())
396    }
397
398    pub fn unretired_local_count(&self) -> usize {
399        self.locals.values().filter(|entry| !entry.retired).count()
400    }
401
402    pub fn unretired_peer_count(&self) -> usize {
403        self.peers.values().filter(|entry| !entry.retired).count()
404    }
405
406    pub fn unretired_locals(&self) -> impl Iterator<Item = &LocalConnectionIdEntry> {
407        self.locals.values().filter(|entry| !entry.retired)
408    }
409
410    fn retire_peer_below(&mut self, threshold: u64) {
411        for (sequence, entry) in self.peers.iter_mut() {
412            if *sequence < threshold && !entry.retired {
413                entry.retired = true;
414                self.pending_peer_retires.push_back(*sequence);
415            }
416        }
417    }
418}
419
420/// Match an inbound short-header packet's destination CID against locally
421/// issued connection IDs. Returns the matched CID and its byte length using
422/// longest-prefix selection when multiple local CIDs match.
423pub fn match_local_connection_id<'a>(
424    packet: &[u8],
425    locals: impl Iterator<Item = &'a LocalConnectionIdEntry>,
426) -> Option<(ConnectionId, usize)> {
427    if packet.first().is_some_and(|byte| byte & 0x80 != 0) {
428        return None;
429    }
430    let mut best: Option<(ConnectionId, usize)> = None;
431    for entry in locals {
432        let cid = entry.connection_id.as_bytes();
433        if packet.len() > cid.len()
434            && packet[1..1 + cid.len()] == cid[..]
435            && best.as_ref().is_none_or(|(_, len)| cid.len() > *len)
436        {
437            best = Some((entry.connection_id.clone(), cid.len()));
438        }
439    }
440    best
441}
442
/// RFC 9000 § 9 lifecycle of a single path for a QUIC connection. A
/// connection always has one `Primary` path; `Probing` paths are validated
/// before promotion to `Primary`.
///
/// `QuicPathSet` drives the transitions: addresses first seen on an inbound
/// packet start as `Probing`, `issue_challenge` moves `Probing` to
/// `Validating`, a matching PATH_RESPONSE or an explicit `mark_validated`
/// yields `Validated`, and `promote_to_primary` turns a `Validated` path into
/// `Primary` while marking the previous primary `Abandoned`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QuicPathState {
    /// Newly observed peer address, no challenge in flight yet.
    Probing,
    /// PATH_CHALLENGE issued, awaiting matching PATH_RESPONSE (RFC 9000 § 8.2).
    Validating,
    /// PATH_RESPONSE matched; eligible for promotion (RFC 9000 § 9.4).
    Validated,
    /// Primary path for the connection.
    Primary,
    /// Abandoned after migration or validation failure.
    Abandoned,
}
459
/// Per-path state used by the migration state machine.
#[derive(Debug, Clone)]
pub struct QuicPath {
    /// Remote address identifying this path; `QuicPathSet` looks paths up by
    /// comparing against this field.
    pub peer_addr: SocketAddr,
    /// Current lifecycle state of the path.
    pub state: QuicPathState,
    /// Send-budget accounting for this path's peer address.
    pub anti_amplification: QuicAntiAmplificationLimit,
    /// PATH_CHALLENGE tokens issued on this path that have not yet been
    /// answered by a matching PATH_RESPONSE.
    pub pending_challenges: Vec<[u8; 8]>,
    /// Timestamp of the most recent inbound packet recorded for this path;
    /// `None` until `QuicPathSet::observe_packet_from` sees one.
    pub last_activity: Option<Instant>,
}
469
470impl QuicPath {
471    fn new(peer_addr: SocketAddr, state: QuicPathState) -> Self {
472        let mut anti_amplification = QuicAntiAmplificationLimit::default();
473        if matches!(state, QuicPathState::Primary | QuicPathState::Validated) {
474            anti_amplification.mark_validated();
475        }
476        Self {
477            peer_addr,
478            state,
479            anti_amplification,
480            pending_challenges: Vec::new(),
481            last_activity: None,
482        }
483    }
484}
485
/// RFC 9000 § 9 container for the primary path plus any concurrent probing
/// paths during a migration attempt. Sends and receives are tracked per path
/// so anti-amplification accounting follows the actual peer address.
///
/// The derived `Default` is an empty set with no primary path.
#[derive(Debug, Default)]
pub struct QuicPathSet {
    /// Every path seen so far, at most one per peer address; paths are
    /// appended and never removed by the methods on this type.
    paths: Vec<QuicPath>,
    /// Index into `paths` of the current primary path, if one is installed.
    primary_index: Option<usize>,
}
494
495impl QuicPathSet {
496    pub fn new() -> Self {
497        Self::default()
498    }
499
500    /// Install the handshake peer address as the primary path. Already
501    /// validated per RFC 9000 § 8.1 (the handshake itself validates the path).
502    pub fn install_primary(&mut self, peer_addr: SocketAddr) -> &QuicPath {
503        if let Some(existing) = self
504            .paths
505            .iter()
506            .position(|path| path.peer_addr == peer_addr)
507        {
508            self.primary_index = Some(existing);
509            let path = &mut self.paths[existing];
510            path.state = QuicPathState::Primary;
511            path.anti_amplification.mark_validated();
512            return &self.paths[existing];
513        }
514        self.paths
515            .push(QuicPath::new(peer_addr, QuicPathState::Primary));
516        let index = self.paths.len() - 1;
517        self.primary_index = Some(index);
518        &self.paths[index]
519    }
520
521    /// Observe an inbound packet from `peer_addr`. If the address is new it is
522    /// added as a `Probing` path; in either case the per-path
523    /// received-byte counter is incremented for anti-amplification accounting.
524    pub fn observe_packet_from(&mut self, peer_addr: SocketAddr, len: usize, now: Instant) {
525        if let Some(index) = self
526            .paths
527            .iter()
528            .position(|path| path.peer_addr == peer_addr)
529        {
530            let path = &mut self.paths[index];
531            path.anti_amplification.on_received(len);
532            path.last_activity = Some(now);
533            return;
534        }
535        let mut path = QuicPath::new(peer_addr, QuicPathState::Probing);
536        path.anti_amplification.on_received(len);
537        path.last_activity = Some(now);
538        self.paths.push(path);
539    }
540
541    /// Record bytes sent to `peer_addr` so anti-amplification accounting
542    /// stays accurate. Returns the per-path remaining send budget after the
543    /// accounting update.
544    pub fn record_sent_to(&mut self, peer_addr: SocketAddr, len: usize) -> Option<u64> {
545        let path = self
546            .paths
547            .iter_mut()
548            .find(|path| path.peer_addr == peer_addr)?;
549        path.anti_amplification.on_sent(len);
550        Some(path.anti_amplification.remaining_send_budget())
551    }
552
553    /// Whether the endpoint may send `additional_bytes` to `peer_addr` under
554    /// the current anti-amplification budget.
555    pub fn may_send_to(&self, peer_addr: SocketAddr, additional_bytes: usize) -> bool {
556        self.paths
557            .iter()
558            .find(|path| path.peer_addr == peer_addr)
559            .map(|path| path.anti_amplification.may_send(additional_bytes))
560            .unwrap_or(false)
561    }
562
563    /// Issue a PATH_CHALLENGE token for `peer_addr`. Returns true if a probing
564    /// or primary path exists for that address.
565    pub fn issue_challenge(&mut self, peer_addr: SocketAddr, token: [u8; 8]) -> bool {
566        if let Some(path) = self
567            .paths
568            .iter_mut()
569            .find(|path| path.peer_addr == peer_addr)
570        {
571            path.pending_challenges.push(token);
572            if path.state == QuicPathState::Probing {
573                path.state = QuicPathState::Validating;
574            }
575            true
576        } else {
577            false
578        }
579    }
580
581    /// Observe a PATH_RESPONSE from `peer_addr`. Returns true if the token
582    /// matched an outstanding challenge on that path; the path transitions to
583    /// `Validated` and its anti-amplification budget is removed.
584    pub fn observe_path_response(&mut self, peer_addr: SocketAddr, token: [u8; 8]) -> bool {
585        let Some(path) = self
586            .paths
587            .iter_mut()
588            .find(|path| path.peer_addr == peer_addr)
589        else {
590            return false;
591        };
592        let initial = path.pending_challenges.len();
593        path.pending_challenges.retain(|pending| pending != &token);
594        if path.pending_challenges.len() == initial {
595            return false;
596        }
597        path.state = QuicPathState::Validated;
598        path.anti_amplification.mark_validated();
599        true
600    }
601
602    /// Promote `peer_addr` from `Validated` to `Primary` and demote any
603    /// existing primary to `Abandoned` (RFC 9000 § 9.5).
604    pub fn promote_to_primary(&mut self, peer_addr: SocketAddr) -> bool {
605        let Some(target_index) = self
606            .paths
607            .iter()
608            .position(|path| path.peer_addr == peer_addr)
609        else {
610            return false;
611        };
612        if !matches!(
613            self.paths[target_index].state,
614            QuicPathState::Validated | QuicPathState::Primary
615        ) {
616            return false;
617        }
618        if let Some(previous) = self.primary_index {
619            if previous != target_index {
620                self.paths[previous].state = QuicPathState::Abandoned;
621            }
622        }
623        self.paths[target_index].state = QuicPathState::Primary;
624        self.paths[target_index].anti_amplification.mark_validated();
625        self.primary_index = Some(target_index);
626        true
627    }
628
629    pub fn primary(&self) -> Option<&QuicPath> {
630        self.primary_index.and_then(|index| self.paths.get(index))
631    }
632
633    pub fn path(&self, peer_addr: SocketAddr) -> Option<&QuicPath> {
634        self.paths.iter().find(|path| path.peer_addr == peer_addr)
635    }
636
637    pub fn paths(&self) -> &[QuicPath] {
638        &self.paths
639    }
640
641    /// Mark a known path as validated without requiring a PATH_RESPONSE token
642    /// match. Used when handshake-level validation state is authoritative.
643    pub fn mark_validated(&mut self, peer_addr: SocketAddr) -> bool {
644        let Some(path) = self
645            .paths
646            .iter_mut()
647            .find(|path| path.peer_addr == peer_addr)
648        else {
649            return false;
650        };
651        path.state = QuicPathState::Validated;
652        path.anti_amplification.mark_validated();
653        true
654    }
655
656    pub fn is_known_address(&self, peer_addr: SocketAddr) -> bool {
657        self.path(peer_addr).is_some()
658    }
659
660    pub fn is_probing_address(&self, peer_addr: SocketAddr) -> bool {
661        self.path(peer_addr).is_some_and(|path| {
662            matches!(
663                path.state,
664                QuicPathState::Probing | QuicPathState::Validating
665            )
666        })
667    }
668}
669
/// Server-side path migration runtime: tracks primary vs probing peer
/// addresses and anti-amplification budgets during active migration.
#[derive(Debug)]
pub struct QuicServerPathRuntime {
    /// Address of the current primary path; updated by `install_primary` and
    /// by a successful `promote_primary`.
    primary_peer: SocketAddr,
    /// Per-address path state and anti-amplification accounting.
    path_set: QuicPathSet,
    /// Address currently being probed, if any; recorded via
    /// `set_probing_peer` / `issue_challenge` and cleared by
    /// `clear_probing_peer`, `install_primary`, and a successful
    /// `promote_primary`.
    probing_peer: Option<SocketAddr>,
}
678
679impl QuicServerPathRuntime {
680    pub fn new(primary_peer: SocketAddr) -> Self {
681        let mut path_set = QuicPathSet::new();
682        path_set.install_primary(primary_peer);
683        Self {
684            primary_peer,
685            path_set,
686            probing_peer: None,
687        }
688    }
689
690    pub fn install_primary(&mut self, peer: SocketAddr) {
691        self.path_set.install_primary(peer);
692        self.primary_peer = peer;
693        self.probing_peer = None;
694    }
695
696    pub fn process_inbound(&mut self, remote: SocketAddr, len: usize, now: Instant) {
697        self.path_set.observe_packet_from(remote, len, now);
698    }
699
700    pub fn primary_peer(&self) -> SocketAddr {
701        self.primary_peer
702    }
703
704    pub fn probing_peer(&self) -> Option<SocketAddr> {
705        self.probing_peer
706    }
707
708    pub fn set_probing_peer(&mut self, addr: SocketAddr) {
709        self.probing_peer = Some(addr);
710    }
711
712    pub fn clear_probing_peer(&mut self) {
713        self.probing_peer = None;
714    }
715
716    pub fn may_send_to(&self, remote: SocketAddr, bytes: usize) -> bool {
717        self.path_set.may_send_to(remote, bytes)
718    }
719
720    pub fn record_sent_to(&mut self, remote: SocketAddr, len: usize) {
721        self.path_set.record_sent_to(remote, len);
722    }
723
724    pub fn issue_challenge(&mut self, remote: SocketAddr, token: [u8; 8]) -> bool {
725        self.set_probing_peer(remote);
726        self.path_set.issue_challenge(remote, token)
727    }
728
729    pub fn on_path_response(&mut self, remote: SocketAddr, token: [u8; 8]) -> bool {
730        self.path_set.observe_path_response(remote, token)
731    }
732
733    pub fn promote_primary(&mut self, remote: SocketAddr) -> bool {
734        if self.path_set.promote_to_primary(remote) {
735            self.primary_peer = remote;
736            self.clear_probing_peer();
737            true
738        } else {
739            false
740        }
741    }
742
743    pub fn mark_validated(&mut self, remote: SocketAddr) -> bool {
744        self.path_set.mark_validated(remote)
745    }
746
747    pub fn is_new_address(&self, remote: SocketAddr) -> bool {
748        !self.path_set.is_known_address(remote)
749    }
750
751    pub fn path_set(&self) -> &QuicPathSet {
752        &self.path_set
753    }
754
755    pub fn path_set_mut(&mut self) -> &mut QuicPathSet {
756        &mut self.path_set
757    }
758}
759
760#[cfg(test)]
761mod tests {
762    use super::*;
763
764    fn local_entry(seq: u64) -> LocalConnectionIdEntry {
765        LocalConnectionIdEntry {
766            sequence_number: seq,
767            connection_id: ConnectionId::from_slice(&[seq as u8; 8]),
768            stateless_reset_token: [seq as u8; 16],
769            retired: false,
770        }
771    }
772
773    #[test]
774    fn anti_amplification_blocks_send_beyond_three_times_received() {
775        let mut limit = QuicAntiAmplificationLimit::new();
776        limit.on_received(1200);
777        assert_eq!(limit.remaining_send_budget(), 3600);
778        assert!(limit.may_send(3600));
779        assert!(!limit.may_send(3601));
780
781        limit.on_sent(1200);
782        assert_eq!(limit.remaining_send_budget(), 2400);
783        assert!(limit.may_send(2400));
784        assert!(!limit.may_send(2401));
785    }
786
787    #[test]
788    fn anti_amplification_validation_removes_cap() {
789        let mut limit = QuicAntiAmplificationLimit::new();
790        limit.on_received(100);
791        assert!(!limit.may_send(1_000_000));
792        limit.mark_validated();
793        assert!(limit.may_send(1_000_000));
794        assert_eq!(limit.remaining_send_budget(), u64::MAX);
795    }
796
797    #[test]
798    fn inventory_installs_initial_local_and_peer_at_sequence_zero() {
799        let mut inventory = QuicConnectionIdInventory::new(4);
800        let local_seq =
801            inventory.install_initial_local(ConnectionId::from_slice(&[1; 8]), [0xAA; 16]);
802        let peer_seq = inventory.install_initial_peer(Bytes::from_static(&[2; 8]), [0xBB; 16]);
803        assert_eq!(local_seq, 0);
804        assert_eq!(peer_seq, 0);
805        assert_eq!(inventory.active_local().map(|e| e.sequence_number), Some(0));
806        assert_eq!(inventory.active_peer().map(|e| e.sequence_number), Some(0));
807        assert_eq!(inventory.unretired_local_count(), 1);
808        assert_eq!(inventory.unretired_peer_count(), 1);
809    }
810
811    #[test]
812    fn inventory_observes_peer_new_connection_id_within_active_limit() {
813        let mut inventory = QuicConnectionIdInventory::new(4);
814        inventory.install_initial_peer(Bytes::from_static(&[0; 8]), [0xBB; 16]);
815
816        inventory
817            .observe_peer_new_connection_id(1, 0, Bytes::from_static(&[1; 8]), [0xCC; 16])
818            .expect("novel sequence accepted");
819        inventory
820            .observe_peer_new_connection_id(2, 0, Bytes::from_static(&[2; 8]), [0xDD; 16])
821            .expect("novel sequence accepted");
822        assert_eq!(inventory.unretired_peer_count(), 3);
823    }
824
825    #[test]
826    fn inventory_rejects_peer_new_connection_id_above_active_limit() {
827        let mut inventory = QuicConnectionIdInventory::new(2);
828        inventory.install_initial_peer(Bytes::from_static(&[0; 8]), [0xBB; 16]);
829        inventory
830            .observe_peer_new_connection_id(1, 0, Bytes::from_static(&[1; 8]), [0xCC; 16])
831            .expect("first novel sequence accepted");
832
833        let err = inventory
834            .observe_peer_new_connection_id(2, 0, Bytes::from_static(&[2; 8]), [0xDD; 16])
835            .expect_err("third unretired CID must violate active_connection_id_limit=2");
836        match err {
837            Error::Quic(msg) => {
838                assert!(msg.contains("active_connection_id_limit"), "{msg}");
839            }
840            other => panic!("unexpected error variant: {other:?}"),
841        }
842    }
843
844    #[test]
845    fn inventory_rejects_retire_prior_to_above_sequence_number() {
846        let mut inventory = QuicConnectionIdInventory::new(4);
847        inventory.install_initial_peer(Bytes::from_static(&[0; 8]), [0xBB; 16]);
848
849        let err = inventory
850            .observe_peer_new_connection_id(1, 2, Bytes::from_static(&[1; 8]), [0xCC; 16])
851            .expect_err("retire_prior_to > sequence_number is a protocol violation");
852        match err {
853            Error::Quic(msg) => {
854                assert!(msg.contains("retire_prior_to"), "{msg}");
855            }
856            other => panic!("unexpected error variant: {other:?}"),
857        }
858    }
859
860    #[test]
861    fn inventory_queues_peer_retires_when_retire_prior_to_advances() {
862        let mut inventory = QuicConnectionIdInventory::new(4);
863        inventory.install_initial_peer(Bytes::from_static(&[0; 8]), [0xBB; 16]);
864        inventory
865            .observe_peer_new_connection_id(1, 0, Bytes::from_static(&[1; 8]), [0xCC; 16])
866            .expect("first novel sequence accepted");
867        inventory
868            .observe_peer_new_connection_id(2, 2, Bytes::from_static(&[2; 8]), [0xDD; 16])
869            .expect("retire_prior_to=2 retires sequences 0 and 1");
870
871        let retired = inventory.drain_pending_peer_retires();
872        assert_eq!(retired, vec![0, 1]);
873        assert_eq!(inventory.unretired_peer_count(), 1);
874        assert_eq!(inventory.active_peer().map(|e| e.sequence_number), Some(2));
875    }
876
877    #[test]
878    fn inventory_retires_local_on_peer_retire_connection_id() {
879        let mut inventory = QuicConnectionIdInventory::new(4);
880        inventory.install_initial_local(ConnectionId::from_slice(&[1; 8]), [0xAA; 16]);
881        let issued = inventory
882            .allocate_next_local_to_issue(ConnectionId::from_slice(&[2; 8]), [0xBB; 16])
883            .expect("allocation within active_connection_id_limit");
884        assert_eq!(issued.sequence_number, 1);
885        assert_eq!(inventory.unretired_local_count(), 2);
886
887        inventory
888            .observe_peer_retire_connection_id(0)
889            .expect("peer retire of issued local sequence");
890        assert_eq!(inventory.unretired_local_count(), 1);
891        assert_eq!(
892            inventory.active_local().map(|e| e.sequence_number),
893            Some(1),
894            "active local shifts to the surviving sequence"
895        );
896    }
897
898    #[test]
899    fn inventory_rejects_retire_of_unknown_local_sequence() {
900        let mut inventory = QuicConnectionIdInventory::new(4);
901        inventory.install_initial_local(ConnectionId::from_slice(&[1; 8]), [0xAA; 16]);
902        let err = inventory
903            .observe_peer_retire_connection_id(99)
904            .expect_err("unknown sequence retire must error");
905        match err {
906            Error::Quic(msg) => {
907                assert!(msg.contains("RETIRE_CONNECTION_ID"), "{msg}");
908            }
909            other => panic!("unexpected error variant: {other:?}"),
910        }
911    }
912
913    #[test]
914    fn inventory_allocation_caps_at_active_connection_id_limit() {
915        let mut inventory = QuicConnectionIdInventory::new(2);
916        inventory.install_initial_local(ConnectionId::from_slice(&[1; 8]), [0xAA; 16]);
917        assert!(inventory
918            .allocate_next_local_to_issue(ConnectionId::from_slice(&[2; 8]), [0xBB; 16])
919            .is_some());
920        assert!(
921            inventory
922                .allocate_next_local_to_issue(ConnectionId::from_slice(&[3; 8]), [0xCC; 16])
923                .is_none(),
924            "third allocation must be rejected at limit=2"
925        );
926    }
927
928    #[test]
929    fn inventory_active_connection_id_limit_clamps_to_two() {
930        let inventory = QuicConnectionIdInventory::new(0);
931        assert_eq!(inventory.active_connection_id_limit(), 2);
932        let inventory = QuicConnectionIdInventory::new(1);
933        assert_eq!(inventory.active_connection_id_limit(), 2);
934    }
935
936    #[test]
937    fn inventory_promote_peer_to_active_requires_unretired_sequence() {
938        let mut inventory = QuicConnectionIdInventory::new(4);
939        inventory.install_initial_peer(Bytes::from_static(&[0; 8]), [0xBB; 16]);
940        inventory
941            .observe_peer_new_connection_id(1, 0, Bytes::from_static(&[1; 8]), [0xCC; 16])
942            .unwrap();
943        inventory.promote_peer_to_active(1).expect("known sequence");
944        assert_eq!(inventory.active_peer().map(|e| e.sequence_number), Some(1));
945
946        inventory
947            .observe_peer_new_connection_id(2, 2, Bytes::from_static(&[2; 8]), [0xDD; 16])
948            .expect("retire_prior_to=2 retires sequences 0 and 1");
949        let err = inventory
950            .promote_peer_to_active(1)
951            .expect_err("promoting a retired sequence must fail");
952        match err {
953            Error::Quic(msg) => assert!(msg.contains("retired"), "{msg}"),
954            other => panic!("unexpected error variant: {other:?}"),
955        }
956    }
957
958    #[test]
959    fn local_entries_default_to_active_local() {
960        let mut inventory = QuicConnectionIdInventory::new(4);
961        let seq = inventory.install_initial_local(local_entry(0).connection_id, [0xAA; 16]);
962        assert_eq!(
963            inventory.active_local().map(|e| e.sequence_number),
964            Some(seq)
965        );
966    }
967
968    fn addr(port: u16) -> SocketAddr {
969        SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), port)
970    }
971
972    #[test]
973    fn pathset_install_primary_is_already_validated() {
974        let mut set = QuicPathSet::new();
975        let primary = set.install_primary(addr(7000));
976        assert_eq!(primary.state, QuicPathState::Primary);
977        assert!(primary.anti_amplification.validated());
978        assert!(set.may_send_to(addr(7000), 1_000_000));
979    }
980
981    #[test]
982    fn pathset_observes_probing_path_from_new_address() {
983        let mut set = QuicPathSet::new();
984        set.install_primary(addr(7000));
985        set.observe_packet_from(addr(7001), 1200, Instant::now());
986        let probing = set.path(addr(7001)).expect("path tracked");
987        assert_eq!(probing.state, QuicPathState::Probing);
988        assert!(!probing.anti_amplification.validated());
989        assert_eq!(probing.anti_amplification.bytes_received(), 1200);
990        assert!(set.may_send_to(addr(7001), 3600));
991        assert!(!set.may_send_to(addr(7001), 3601));
992    }
993
994    #[test]
995    fn pathset_challenge_validation_promotes_path_and_unblocks_send_budget() {
996        let mut set = QuicPathSet::new();
997        set.install_primary(addr(7000));
998        set.observe_packet_from(addr(7001), 1200, Instant::now());
999        let token = [0xAB; 8];
1000        assert!(set.issue_challenge(addr(7001), token));
1001        assert!(matches!(
1002            set.path(addr(7001)).map(|p| p.state),
1003            Some(QuicPathState::Validating)
1004        ));
1005        assert!(set.observe_path_response(addr(7001), token));
1006        let validated = set.path(addr(7001)).expect("path still tracked");
1007        assert_eq!(validated.state, QuicPathState::Validated);
1008        assert!(validated.anti_amplification.validated());
1009        assert!(set.may_send_to(addr(7001), 1_000_000));
1010    }
1011
1012    #[test]
1013    fn pathset_promote_to_primary_demotes_previous_primary() {
1014        let mut set = QuicPathSet::new();
1015        set.install_primary(addr(7000));
1016        set.observe_packet_from(addr(7001), 1200, Instant::now());
1017        let token = [0xCD; 8];
1018        set.issue_challenge(addr(7001), token);
1019        set.observe_path_response(addr(7001), token);
1020        assert!(set.promote_to_primary(addr(7001)));
1021        assert_eq!(
1022            set.primary().map(|p| p.peer_addr),
1023            Some(addr(7001)),
1024            "new primary path is the validated address"
1025        );
1026        assert_eq!(
1027            set.path(addr(7000)).map(|p| p.state),
1028            Some(QuicPathState::Abandoned)
1029        );
1030    }
1031
1032    #[test]
1033    fn pathset_observe_path_response_ignores_unknown_token() {
1034        let mut set = QuicPathSet::new();
1035        set.install_primary(addr(7000));
1036        set.observe_packet_from(addr(7001), 1200, Instant::now());
1037        set.issue_challenge(addr(7001), [0xAA; 8]);
1038        assert!(
1039            !set.observe_path_response(addr(7001), [0xBB; 8]),
1040            "non-matching token must be ignored"
1041        );
1042        assert!(
1043            !set.path(addr(7001)).unwrap().anti_amplification.validated(),
1044            "validation must not be claimed on a bad token"
1045        );
1046    }
1047
1048    #[test]
1049    fn pathset_mark_validated_clears_anti_amplification_cap() {
1050        let mut set = QuicPathSet::new();
1051        set.install_primary(addr(7000));
1052        set.observe_packet_from(addr(7001), 1200, Instant::now());
1053        assert!(!set.may_send_to(addr(7001), 3601));
1054        assert!(set.mark_validated(addr(7001)));
1055        assert!(set.may_send_to(addr(7001), 1_000_000));
1056    }
1057
1058    #[test]
1059    fn match_local_connection_id_prefers_longest_prefix() {
1060        let short = LocalConnectionIdEntry {
1061            sequence_number: 0,
1062            connection_id: ConnectionId::from_slice(b"abc"),
1063            stateless_reset_token: [0; 16],
1064            retired: false,
1065        };
1066        let long = LocalConnectionIdEntry {
1067            sequence_number: 1,
1068            connection_id: ConnectionId::from_slice(b"abcdefgh"),
1069            stateless_reset_token: [0; 16],
1070            retired: false,
1071        };
1072        let mut packet = vec![0x40];
1073        packet.extend_from_slice(b"abcdefgh");
1074        packet.push(0x42);
1075
1076        let matched = match_local_connection_id(&packet, [short, long].iter())
1077            .expect("longest local CID should match");
1078        assert_eq!(matched.0.as_bytes(), b"abcdefgh");
1079        assert_eq!(matched.1, b"abcdefgh".len());
1080    }
1081
1082    #[test]
1083    fn server_path_runtime_promotes_primary_after_validation() {
1084        let primary = addr(7000);
1085        let migrated = addr(7001);
1086        let mut runtime = QuicServerPathRuntime::new(primary);
1087        runtime.process_inbound(migrated, 1200, Instant::now());
1088        let token = [0xEF; 8];
1089        assert!(runtime.issue_challenge(migrated, token));
1090        assert_eq!(runtime.probing_peer(), Some(migrated));
1091        assert!(runtime.on_path_response(migrated, token));
1092        assert!(runtime.promote_primary(migrated));
1093        assert_eq!(runtime.primary_peer(), migrated);
1094        assert_eq!(runtime.probing_peer(), None);
1095        assert!(runtime.may_send_to(migrated, 1_000_000));
1096    }
1097}