// fips_core/peer/active.rs
1//! Active Peer (Authenticated Phase)
2//!
3//! Represents a fully authenticated peer after successful Noise handshake.
4//! ActivePeer holds tree state, Bloom filter, and routing information.
5
6use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::node::REKEY_JITTER_SECS;
9use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
10use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
11use crate::tree::{ParentDeclaration, TreeCoordinate};
12use crate::utils::index::SessionIndex;
13use crate::{FipsAddress, NodeAddr, PeerIdentity};
14use rand::RngExt;
15use secp256k1::XOnlyPublicKey;
16use std::fmt;
17use std::time::Instant;
18
19fn draw_rekey_jitter() -> i64 {
20    rand::rng().random_range(-REKEY_JITTER_SECS..=REKEY_JITTER_SECS)
21}
22
/// Connectivity state for an active peer.
///
/// This is simpler than the full PeerState since authentication is complete.
/// Transitions are driven by the `mark_*`/`touch` methods on `ActivePeer`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectivityState {
    /// Peer is fully connected and responsive.
    Connected,
    /// Peer hasn't been heard from recently (potential timeout).
    /// Still usable for sending; promoted back to Connected on any traffic.
    Stale,
    /// Connection lost, attempting to reconnect.
    Reconnecting,
    /// Peer has been explicitly disconnected. Terminal state.
    Disconnected,
}
37
38impl ConnectivityState {
39    /// Check if the peer is usable for sending traffic.
40    pub fn can_send(&self) -> bool {
41        matches!(
42            self,
43            ConnectivityState::Connected | ConnectivityState::Stale
44        )
45    }
46
47    /// Check if this is a terminal state requiring cleanup.
48    pub fn is_terminal(&self) -> bool {
49        matches!(self, ConnectivityState::Disconnected)
50    }
51
52    /// Check if peer is fully healthy.
53    pub fn is_healthy(&self) -> bool {
54        matches!(self, ConnectivityState::Connected)
55    }
56}
57
58impl fmt::Display for ConnectivityState {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        let s = match self {
61            ConnectivityState::Connected => "connected",
62            ConnectivityState::Stale => "stale",
63            ConnectivityState::Reconnecting => "reconnecting",
64            ConnectivityState::Disconnected => "disconnected",
65        };
66        write!(f, "{}", s)
67    }
68}
69
/// A fully authenticated remote FIPS node.
///
/// Created only after successful Noise KK handshake. The identity is
/// cryptographically verified at this point.
///
/// Note: ActivePeer intentionally does not implement Clone because it
/// contains NoiseSession, which cannot be safely cloned (cloning would
/// risk nonce reuse, a catastrophic security failure).
#[derive(Debug)]
pub struct ActivePeer {
    // === Identity (Verified) ===
    /// Cryptographic identity (verified via handshake).
    identity: PeerIdentity,

    // === Connection ===
    /// Link used to reach this peer.
    link_id: LinkId,
    /// Current connectivity state.
    connectivity: ConnectivityState,

    // === Session (Wire Protocol) ===
    /// Noise session for encryption/decryption (None if legacy peer).
    noise_session: Option<NoiseSession>,
    /// Our session index (they include this when sending TO us).
    our_index: Option<SessionIndex>,
    /// Their session index (we include this when sending TO them).
    their_index: Option<SessionIndex>,
    /// Transport ID for this peer's link.
    transport_id: Option<TransportId>,
    /// Current transport address (for roaming support).
    current_addr: Option<TransportAddr>,

    // === Spanning Tree ===
    /// Their latest parent declaration.
    declaration: Option<ParentDeclaration>,
    /// Their path to root.
    ancestry: Option<TreeCoordinate>,

    // === Tree Announce Rate Limiting ===
    /// Minimum interval between TreeAnnounce messages (milliseconds).
    /// Constructors default this to 500 ms.
    tree_announce_min_interval_ms: u64,
    /// Last time we sent a TreeAnnounce to this peer (Unix milliseconds).
    last_tree_announce_sent_ms: u64,
    /// Whether a tree announce is pending (deferred due to rate limit).
    pending_tree_announce: bool,

    // === Bloom Filter ===
    /// What's reachable through them (inbound filter).
    inbound_filter: Option<BloomFilter>,
    /// Their filter's sequence number.
    filter_sequence: u64,
    /// When we received their last filter (Unix milliseconds).
    /// 0 means no filter has been received yet (see `filter_is_stale`).
    filter_received_at: u64,
    /// Whether we owe them a filter update.
    pending_filter_update: bool,

    // === Timing ===
    /// Session start time for computing session-relative timestamps.
    /// Used as the epoch for the 4-byte inner header timestamp field.
    session_start: Instant,

    // === Statistics ===
    /// Link statistics.
    link_stats: LinkStats,
    /// When this peer was authenticated (Unix milliseconds).
    authenticated_at: u64,
    /// When this peer was last seen (any activity, Unix milliseconds).
    last_seen: u64,

    // === Epoch (Restart Detection) ===
    /// Remote peer's startup epoch (from handshake). Used to detect restarts.
    remote_epoch: Option<[u8; 8]>,

    // === MMP ===
    /// Per-peer MMP state (None for legacy peers without Noise sessions).
    mmp: Option<MmpPeerState>,

    // === Heartbeat ===
    /// When we last sent a heartbeat to this peer.
    last_heartbeat_sent: Option<Instant>,

    // === Handshake Resend ===
    /// Wire-format msg2 for resend on duplicate msg1 (responder only).
    /// Cleared after the handshake timeout window.
    handshake_msg2: Option<Vec<u8>>,

    // === Replay Detection Suppression ===
    /// Number of replay detections suppressed since last session reset.
    replay_suppressed_count: u32,
    /// Consecutive decryption failures (reset on any successful decrypt).
    consecutive_decrypt_failures: u32,

    // === Rekey (Key Rotation) ===
    /// When the current Noise session was established (for rekey timer).
    session_established_at: Instant,
    /// Per-session symmetric jitter applied to the rekey timer trigger.
    rekey_jitter_secs: i64,
    /// Current K-bit epoch value (alternates each rekey).
    current_k_bit: bool,
    /// Previous session kept alive during drain window after cutover.
    previous_session: Option<NoiseSession>,
    /// Previous session's our_index (for peers_by_index cleanup on drain expiry).
    previous_our_index: Option<SessionIndex>,
    /// When the drain window started (None = no drain in progress).
    drain_started: Option<Instant>,
    /// Pending new session from completed rekey (before K-bit cutover).
    pending_new_session: Option<NoiseSession>,
    /// Pending new session's our_index.
    pending_our_index: Option<SessionIndex>,
    /// Pending new session's their_index.
    pending_their_index: Option<SessionIndex>,
    /// Whether a rekey is currently in progress (handshake sent, not yet complete).
    rekey_in_progress: bool,
    /// When we last received a rekey msg1 from this peer (dampening).
    last_peer_rekey: Option<Instant>,
    /// In-progress rekey: Noise handshake state (initiator only).
    rekey_handshake: Option<NoiseHandshakeState>,
    /// In-progress rekey: our new session index.
    rekey_our_index: Option<SessionIndex>,
    /// In-progress rekey: wire-format msg1 for resend.
    rekey_msg1: Option<Vec<u8>>,
    /// In-progress rekey: next resend timestamp (Unix ms).
    rekey_msg1_next_resend: u64,

    // === Connected Peer UDP Socket (Unix fast path) ===
    /// Per-peer `connect()`-ed UDP socket, opened once we have a
    /// stable kernel `SocketAddr` for the peer (i.e. session
    /// established + transport address known). When `Some`, the
    /// encrypt-worker send path can `sendmsg(2)` on this fd without
    /// per-packet `msg_name` — the kernel-side route + neighbor cache
    /// is pinned by the `connect()` call. On the receive side, Linux
    /// and Darwin UDP demux preferentially route inbound packets from
    /// this peer to this socket (most-specific 5-tuple match via
    /// `SO_REUSEPORT`), so the paired drain thread must keep it empty.
    ///
    /// Closed automatically on Drop. Behind an `Arc` so the
    /// encrypt-worker's send path can hold a refcount without owning
    /// the only handle (rekey / address-change may rotate the socket
    /// while older jobs are still in-flight on the worker channel).
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    connected_udp:
        Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,

    /// Per-peer receive-drain thread. Always paired with
    /// `connected_udp`: while the connected socket is installed, the
    /// kernel UDP demux preferentially routes inbound packets from
    /// this peer to it (via SO_REUSEPORT + 5-tuple match), so the
    /// socket *must* be drained or packets pile up in its kernel
    /// recv buffer. Drop signals the thread to exit via self-pipe.
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
}
222
223impl ActivePeer {
    /// Create a new active peer from verified identity.
    ///
    /// Called after successful authentication handshake.
    /// For peers with Noise sessions, use `with_session` instead.
    ///
    /// All session, tree, filter, and rekey state starts empty; timing
    /// fields are anchored to a single `Instant::now()` so that
    /// `session_start` and `session_established_at` agree.
    pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
        let now = Instant::now();
        Self {
            identity,
            link_id,
            connectivity: ConnectivityState::Connected,
            noise_session: None,
            our_index: None,
            their_index: None,
            transport_id: None,
            current_addr: None,
            declaration: None,
            ancestry: None,
            tree_announce_min_interval_ms: 500,
            last_tree_announce_sent_ms: 0,
            pending_tree_announce: false,
            inbound_filter: None,
            filter_sequence: 0,
            filter_received_at: 0,
            pending_filter_update: true, // Send filter on new connection
            session_start: now,
            link_stats: LinkStats::new(),
            authenticated_at,
            last_seen: authenticated_at, // authentication counts as activity
            remote_epoch: None,
            mmp: None,
            last_heartbeat_sent: None,
            handshake_msg2: None,
            replay_suppressed_count: 0,
            consecutive_decrypt_failures: 0,
            session_established_at: now,
            rekey_jitter_secs: draw_rekey_jitter(), // fresh symmetric jitter per session
            current_k_bit: false,
            previous_session: None,
            previous_our_index: None,
            drain_started: None,
            pending_new_session: None,
            pending_our_index: None,
            pending_their_index: None,
            rekey_in_progress: false,
            last_peer_rekey: None,
            rekey_handshake: None,
            rekey_our_index: None,
            rekey_msg1: None,
            rekey_msg1_next_resend: 0,
            #[cfg(any(target_os = "linux", target_os = "macos"))]
            connected_udp: None,
            #[cfg(any(target_os = "linux", target_os = "macos"))]
            peer_recv_drain: None,
        }
    }
279
280    /// Create from verified identity with existing link stats.
281    ///
282    /// Used when promoting from PeerConnection, preserving handshake stats.
283    /// For peers with Noise sessions, use `with_session` instead.
284    pub fn with_stats(
285        identity: PeerIdentity,
286        link_id: LinkId,
287        authenticated_at: u64,
288        link_stats: LinkStats,
289    ) -> Self {
290        let mut peer = Self::new(identity, link_id, authenticated_at);
291        peer.link_stats = link_stats;
292        peer
293    }
294
295    /// Create from verified identity with Noise session and index tracking.
296    ///
297    /// This is the primary constructor for the wire protocol path.
298    /// The NoiseSession provides encryption/decryption and replay protection.
299    #[allow(clippy::too_many_arguments)]
300    pub fn with_session(
301        identity: PeerIdentity,
302        link_id: LinkId,
303        authenticated_at: u64,
304        noise_session: NoiseSession,
305        our_index: SessionIndex,
306        their_index: SessionIndex,
307        transport_id: TransportId,
308        current_addr: TransportAddr,
309        link_stats: LinkStats,
310        is_initiator: bool,
311        mmp_config: &MmpConfig,
312        remote_epoch: Option<[u8; 8]>,
313    ) -> Self {
314        let now = Instant::now();
315        Self {
316            identity,
317            link_id,
318            connectivity: ConnectivityState::Connected,
319            noise_session: Some(noise_session),
320            our_index: Some(our_index),
321            their_index: Some(their_index),
322            transport_id: Some(transport_id),
323            current_addr: Some(current_addr),
324            declaration: None,
325            ancestry: None,
326            tree_announce_min_interval_ms: 500,
327            last_tree_announce_sent_ms: 0,
328            pending_tree_announce: false,
329            inbound_filter: None,
330            filter_sequence: 0,
331            filter_received_at: 0,
332            pending_filter_update: true,
333            session_start: now,
334            link_stats,
335            authenticated_at,
336            last_seen: authenticated_at,
337            remote_epoch,
338            mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
339            last_heartbeat_sent: None,
340            handshake_msg2: None,
341            replay_suppressed_count: 0,
342            consecutive_decrypt_failures: 0,
343            session_established_at: now,
344            rekey_jitter_secs: draw_rekey_jitter(),
345            current_k_bit: false,
346            previous_session: None,
347            previous_our_index: None,
348            drain_started: None,
349            pending_new_session: None,
350            pending_our_index: None,
351            pending_their_index: None,
352            rekey_in_progress: false,
353            last_peer_rekey: None,
354            rekey_handshake: None,
355            rekey_our_index: None,
356            rekey_msg1: None,
357            rekey_msg1_next_resend: 0,
358            #[cfg(any(target_os = "linux", target_os = "macos"))]
359            connected_udp: None,
360            #[cfg(any(target_os = "linux", target_os = "macos"))]
361            peer_recv_drain: None,
362        }
363    }
364
365    /// Unix UDP fast path: clone the refcount on the per-peer
366    /// `connect()`-ed UDP socket if one has been installed. Encrypt-
367    /// worker send path uses this to bypass the wildcard listen
368    /// socket's per-packet sockaddr handling.
369    #[cfg(any(target_os = "linux", target_os = "macos"))]
370    pub(crate) fn connected_udp(
371        &self,
372    ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
373        self.connected_udp.clone()
374    }
375
376    /// Install a per-peer `connect()`-ed UDP socket **with** its
377    /// paired recv drain thread. The two are owned together: the
378    /// drain thread is the only consumer of packets arriving on this
379    /// socket (Linux UDP demux preferentially routes them away from
380    /// the wildcard listen socket via SO_REUSEPORT 5-tuple match),
381    /// so installing one without the other would silently drop
382    /// inbound packets from this peer.
383    ///
384    /// Replacing an existing pair drops the old drain (its self-pipe
385    /// shutdown signal fires; thread exits within one poll
386    /// iteration) and drops the old socket Arc. Any encrypt-worker
387    /// jobs already in-flight holding the old socket Arc stay valid
388    /// until they complete, at which point the kernel fd closes.
389    #[cfg(any(target_os = "linux", target_os = "macos"))]
390    pub(crate) fn set_connected_udp(
391        &mut self,
392        socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
393        drain: crate::transport::udp::peer_drain::PeerRecvDrain,
394    ) {
395        // Drop order matters: drop the old drain BEFORE the old
396        // socket so the drain thread's last reference to the kernel
397        // fd is released cleanly.
398        self.peer_recv_drain = None;
399        self.connected_udp = None;
400        self.connected_udp = Some(socket);
401        self.peer_recv_drain = Some(drain);
402    }
403
404    /// Clear the per-peer connected UDP socket + drain thread (e.g.
405    /// on rekey or disconnect). The drain thread exits via self-pipe
406    /// signal; the kernel fd closes when the last `Arc` to the
407    /// socket drops.
408    #[cfg(any(target_os = "linux", target_os = "macos"))]
409    pub(crate) fn clear_connected_udp(&mut self) {
410        self.peer_recv_drain = None;
411        self.connected_udp = None;
412    }
413
    // === Identity Accessors ===
    // All identity data below was cryptographically verified during the
    // handshake; these are simple delegating getters.

    /// Get the peer's verified identity.
    pub fn identity(&self) -> &PeerIdentity {
        &self.identity
    }

    /// Get the peer's NodeAddr.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }

    /// Get the peer's FIPS address.
    pub fn address(&self) -> &FipsAddress {
        self.identity.address()
    }

    /// Get the peer's public key.
    pub fn pubkey(&self) -> XOnlyPublicKey {
        self.identity.pubkey()
    }

    /// Get the peer's npub string (allocates a fresh String per call).
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
440
    // === Connection Accessors ===

    /// Get the link ID.
    pub fn link_id(&self) -> LinkId {
        self.link_id
    }

    /// Get the connectivity state.
    pub fn connectivity(&self) -> ConnectivityState {
        self.connectivity
    }

    /// Check if peer can receive traffic (Connected or Stale).
    pub fn can_send(&self) -> bool {
        self.connectivity.can_send()
    }

    /// Check if peer is fully healthy (Connected).
    pub fn is_healthy(&self) -> bool {
        self.connectivity.is_healthy()
    }

    /// Check if peer is disconnected (terminal state).
    pub fn is_disconnected(&self) -> bool {
        self.connectivity.is_terminal()
    }
467
    // === Session Accessors ===

    /// Check if this peer has a Noise session (false for legacy peers).
    pub fn has_session(&self) -> bool {
        self.noise_session.is_some()
    }

    /// Get the Noise session, if present.
    pub fn noise_session(&self) -> Option<&NoiseSession> {
        self.noise_session.as_ref()
    }

    /// Get mutable access to the Noise session.
    pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
        self.noise_session.as_mut()
    }

    /// Get our session index (they use this to send TO us).
    pub fn our_index(&self) -> Option<SessionIndex> {
        self.our_index
    }

    /// Get their session index (we use this to send TO them).
    pub fn their_index(&self) -> Option<SessionIndex> {
        self.their_index
    }

    /// Update their session index (used during cross-connection resolution
    /// when the losing node keeps its inbound session but needs the peer's
    /// outbound index).
    pub fn set_their_index(&mut self, index: SessionIndex) {
        self.their_index = Some(index);
    }
501
502    /// Replace the Noise session and indices during cross-connection resolution.
503    ///
504    /// When both nodes simultaneously initiate, each promotes its inbound
505    /// handshake first. When the peer's msg2 arrives, we learn the correct
506    /// session — the outbound handshake that pairs with the peer's inbound.
507    /// This replaces the entire session so both nodes use matching keys.
508    ///
509    /// Returns the old our_index so the caller can update peers_by_index.
510    /// Also resets the replay suppression counter since the session changed.
511    pub fn replace_session(
512        &mut self,
513        new_session: NoiseSession,
514        new_our_index: SessionIndex,
515        new_their_index: SessionIndex,
516    ) -> Option<SessionIndex> {
517        self.reset_replay_suppressed();
518        let old_our_index = self.our_index;
519        self.noise_session = Some(new_session);
520        self.our_index = Some(new_our_index);
521        self.their_index = Some(new_their_index);
522        old_our_index
523    }
524
    /// Get the transport ID for this peer (None until a session/address is known).
    pub fn transport_id(&self) -> Option<TransportId> {
        self.transport_id
    }

    /// Get the current transport address (tracks roaming; see `set_current_addr`).
    pub fn current_addr(&self) -> Option<&TransportAddr> {
        self.current_addr.as_ref()
    }
534
535    /// Update the current address (for roaming support).
536    ///
537    /// Called when we receive a valid authenticated packet from a new address.
538    /// Short-circuits when neither the transport_id nor the TransportAddr
539    /// bytes changed — at multi-Gbps the same peer's source 4-tuple is
540    /// stable per session and the overwhelming majority of inbound
541    /// packets hit this fast path. Saves both the redundant
542    /// `Option::take` + Vec drop on the cached side and the caller's
543    /// `.clone()` allocation on the input side: the caller can pass
544    /// `&TransportAddr` and we only `.to_owned()` when storing.
545    ///
546    /// Returns `true` iff the stored `(transport_id, current_addr)` pair
547    /// actually changed. The caller uses this signal to invalidate
548    /// derived caches whose validity is bound to the peer's 5-tuple —
549    /// in particular the Linux per-peer `connect()`-ed UDP socket,
550    /// which is pinned to one kernel route + neighbour entry and goes
551    /// stale the moment the peer roams. (Clearing it here would force
552    /// `&mut self` users into the wrong shape: the policy of when to
553    /// rebuild the connected socket lives on `Node`, not on the peer
554    /// state. Returning a bool keeps that policy where it belongs.)
555    pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
556        if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
557            return false;
558        }
559        self.transport_id = Some(transport_id);
560        self.current_addr = Some(addr.clone());
561        true
562    }
563
564    // === Handshake Resend ===
565
566    /// Store wire-format msg2 for resend on duplicate msg1.
567    pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
568        self.handshake_msg2 = Some(msg2);
569    }
570
571    /// Get stored msg2 bytes for resend.
572    pub fn handshake_msg2(&self) -> Option<&[u8]> {
573        self.handshake_msg2.as_deref()
574    }
575
576    /// Clear stored msg2 (no longer needed after handshake window).
577    pub fn clear_handshake_msg2(&mut self) {
578        self.handshake_msg2 = None;
579    }
580
581    // === Replay Detection Suppression ===
582
583    /// Increment replay suppression counter. Returns the new count.
584    pub fn increment_replay_suppressed(&mut self) -> u32 {
585        self.replay_suppressed_count += 1;
586        self.replay_suppressed_count
587    }
588
589    /// Reset replay suppression counter, returning previous count.
590    pub fn reset_replay_suppressed(&mut self) -> u32 {
591        let count = self.replay_suppressed_count;
592        self.replay_suppressed_count = 0;
593        count
594    }
595
596    /// Current replay suppression count.
597    pub fn replay_suppressed_count(&self) -> u32 {
598        self.replay_suppressed_count
599    }
600
601    // === Decryption Failure Tracking ===
602
603    /// Increment consecutive decryption failure counter, returning new count.
604    pub fn increment_decrypt_failures(&mut self) -> u32 {
605        self.consecutive_decrypt_failures += 1;
606        self.consecutive_decrypt_failures
607    }
608
609    /// Reset consecutive decryption failure counter.
610    pub fn reset_decrypt_failures(&mut self) {
611        self.consecutive_decrypt_failures = 0;
612    }
613
614    /// Current consecutive decryption failure count.
615    pub fn consecutive_decrypt_failures(&self) -> u32 {
616        self.consecutive_decrypt_failures
617    }
618
    // === Epoch Accessors ===

    /// Get the remote peer's startup epoch (from handshake).
    pub fn remote_epoch(&self) -> Option<[u8; 8]> {
        self.remote_epoch
    }

    /// Update the remote peer's startup epoch after a successful in-place
    /// rekey. Initial handshakes set this through `with_session`, but recovery
    /// rekeys also exchange epochs and must keep restart detection current.
    pub(crate) fn set_remote_epoch(&mut self, remote_epoch: Option<[u8; 8]>) {
        self.remote_epoch = remote_epoch;
    }
632
    // === Tree Accessors ===

    /// Get the peer's tree coordinates (path to root), if known.
    pub fn coords(&self) -> Option<&TreeCoordinate> {
        self.ancestry.as_ref()
    }

    /// Get the peer's parent declaration, if known.
    pub fn declaration(&self) -> Option<&ParentDeclaration> {
        self.declaration.as_ref()
    }

    /// Check if this peer has a known tree position (both declaration
    /// and ancestry present — they are always set together by
    /// `update_tree_position`).
    pub fn has_tree_position(&self) -> bool {
        self.declaration.is_some() && self.ancestry.is_some()
    }
649
650    // === Filter Accessors ===
651
652    /// Get the peer's inbound filter, if known.
653    pub fn inbound_filter(&self) -> Option<&BloomFilter> {
654        self.inbound_filter.as_ref()
655    }
656
657    /// Get the filter sequence number.
658    pub fn filter_sequence(&self) -> u64 {
659        self.filter_sequence
660    }
661
662    /// Check if this peer's filter is stale.
663    pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
664        if self.filter_received_at == 0 {
665            return true;
666        }
667        current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
668    }
669
670    /// Check if a destination might be reachable through this peer.
671    pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
672        match &self.inbound_filter {
673            Some(filter) => filter.contains(node_addr),
674            None => false,
675        }
676    }
677
678    /// Check if we need to send this peer a filter update.
679    pub fn needs_filter_update(&self) -> bool {
680        self.pending_filter_update
681    }
682
    // === Statistics Accessors ===

    /// Get link statistics.
    pub fn link_stats(&self) -> &LinkStats {
        &self.link_stats
    }

    /// Get mutable link statistics (for updating counters on traffic).
    pub fn link_stats_mut(&mut self) -> &mut LinkStats {
        &mut self.link_stats
    }
694
695    // === MMP Accessors ===
696
697    /// Get MMP state (None for legacy peers without sessions).
698    pub fn mmp(&self) -> Option<&MmpPeerState> {
699        self.mmp.as_ref()
700    }
701
702    /// Get mutable MMP state.
703    pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
704        self.mmp.as_mut()
705    }
706
707    /// Link cost for routing decisions.
708    ///
709    /// Returns a scalar cost where lower is better (1.0 = ideal).
710    /// Computed as RTT-weighted ETX: `etx * (1.0 + srtt_ms / 100.0)`.
711    ///
712    /// Returns 1.0 (optimistic default) when MMP metrics are not yet
713    /// available, matching depth-only parent selection behavior.
714    pub fn link_cost(&self) -> f64 {
715        match self.mmp() {
716            Some(mmp) => {
717                let etx = mmp.metrics.etx;
718                match mmp.metrics.srtt_ms() {
719                    Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
720                    None => 1.0,
721                }
722            }
723            None => 1.0,
724        }
725    }
726
727    /// Whether this peer has at least one MMP RTT measurement.
728    pub fn has_srtt(&self) -> bool {
729        self.mmp()
730            .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
731    }
732
    /// When this peer was authenticated (Unix milliseconds).
    pub fn authenticated_at(&self) -> u64 {
        self.authenticated_at
    }

    /// When this peer was last seen (Unix milliseconds).
    pub fn last_seen(&self) -> u64 {
        self.last_seen
    }

    /// Time since last activity (saturates to 0 if the clock went backwards).
    pub fn idle_time(&self, current_time_ms: u64) -> u64 {
        current_time_ms.saturating_sub(self.last_seen)
    }

    /// Connection duration since authentication (saturating).
    pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
        current_time_ms.saturating_sub(self.authenticated_at)
    }

    /// Session-relative elapsed time in milliseconds (for inner header timestamp).
    ///
    /// Returns milliseconds since session establishment, truncated to u32.
    /// Wraps at ~49.7 days which is acceptable for session-relative timing.
    pub fn session_elapsed_ms(&self) -> u32 {
        self.session_start.elapsed().as_millis() as u32
    }

    /// When this peer's session started (for link-dead fallback timing).
    pub fn session_start(&self) -> Instant {
        self.session_start
    }
765
    // === Heartbeat ===

    /// When we last sent a heartbeat to this peer (None if never).
    pub fn last_heartbeat_sent(&self) -> Option<Instant> {
        self.last_heartbeat_sent
    }

    /// Record that we sent a heartbeat.
    pub fn mark_heartbeat_sent(&mut self, now: Instant) {
        self.last_heartbeat_sent = Some(now);
    }
777
778    // === State Updates ===
779
780    /// Update last seen timestamp.
781    pub fn touch(&mut self, current_time_ms: u64) {
782        self.last_seen = current_time_ms;
783        // If we were stale, receiving traffic makes us connected again
784        if self.connectivity == ConnectivityState::Stale {
785            self.connectivity = ConnectivityState::Connected;
786        }
787    }
788
789    /// Mark peer as stale (no recent traffic).
790    pub fn mark_stale(&mut self) {
791        if self.connectivity == ConnectivityState::Connected {
792            self.connectivity = ConnectivityState::Stale;
793        }
794    }
795
796    /// Mark peer as reconnecting.
797    pub fn mark_reconnecting(&mut self) {
798        self.connectivity = ConnectivityState::Reconnecting;
799    }
800
801    /// Mark peer as disconnected.
802    pub fn mark_disconnected(&mut self) {
803        self.connectivity = ConnectivityState::Disconnected;
804    }
805
806    /// Mark peer as connected (e.g., after successful reconnect).
807    pub fn mark_connected(&mut self, current_time_ms: u64) {
808        self.connectivity = ConnectivityState::Connected;
809        self.last_seen = current_time_ms;
810    }
811
812    /// Update the link ID (e.g., on reconnect).
813    pub fn set_link_id(&mut self, link_id: LinkId) {
814        self.link_id = link_id;
815    }
816
817    // === Tree Updates ===
818
819    /// Update peer's tree position.
820    pub fn update_tree_position(
821        &mut self,
822        declaration: ParentDeclaration,
823        ancestry: TreeCoordinate,
824        current_time_ms: u64,
825    ) {
826        self.declaration = Some(declaration);
827        self.ancestry = Some(ancestry);
828        self.last_seen = current_time_ms;
829    }
830
831    /// Clear peer's tree position.
832    pub fn clear_tree_position(&mut self) {
833        self.declaration = None;
834        self.ancestry = None;
835    }
836
837    // === Tree Announce Rate Limiting ===
838
839    /// Set the minimum interval between TreeAnnounce messages (milliseconds).
840    pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
841        self.tree_announce_min_interval_ms = ms;
842    }
843
844    /// Get the last tree announce send timestamp (for carrying across reconnection).
845    pub fn last_tree_announce_sent_ms(&self) -> u64 {
846        self.last_tree_announce_sent_ms
847    }
848
849    /// Set the last tree announce send timestamp (to preserve rate limit across reconnection).
850    pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
851        self.last_tree_announce_sent_ms = ms;
852    }
853
854    /// Check if we can send a TreeAnnounce now (rate limiting).
855    pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
856        now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
857    }
858
859    /// Record that we sent a TreeAnnounce to this peer.
860    pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
861        self.last_tree_announce_sent_ms = now_ms;
862        self.pending_tree_announce = false;
863    }
864
865    /// Mark that a tree announce is pending (deferred due to rate limit).
866    pub fn mark_tree_announce_pending(&mut self) {
867        self.pending_tree_announce = true;
868    }
869
870    /// Check if a deferred tree announce is waiting to be sent.
871    pub fn has_pending_tree_announce(&self) -> bool {
872        self.pending_tree_announce
873    }
874
875    // === Filter Updates ===
876
877    /// Update peer's inbound filter.
878    pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
879        self.inbound_filter = Some(filter);
880        self.filter_sequence = sequence;
881        self.filter_received_at = current_time_ms;
882        self.last_seen = current_time_ms;
883    }
884
885    /// Clear peer's inbound filter.
886    pub fn clear_filter(&mut self) {
887        self.inbound_filter = None;
888        self.filter_sequence = 0;
889        self.filter_received_at = 0;
890    }
891
892    /// Mark that we need to send this peer a filter update.
893    pub fn mark_filter_update_needed(&mut self) {
894        self.pending_filter_update = true;
895    }
896
897    /// Clear the pending filter update flag.
898    pub fn clear_filter_update_needed(&mut self) {
899        self.pending_filter_update = false;
900    }
901
902    // === Rekey (Key Rotation) ===
903
904    /// When the current Noise session was established.
905    pub fn session_established_at(&self) -> Instant {
906        self.session_established_at
907    }
908
909    #[cfg(test)]
910    pub(crate) fn set_session_established_at_for_test(&mut self, instant: Instant) {
911        self.session_established_at = instant;
912    }
913
914    /// Per-session symmetric rekey-timer jitter offset (seconds).
915    pub fn rekey_jitter_secs(&self) -> i64 {
916        self.rekey_jitter_secs
917    }
918
919    /// Current K-bit epoch value.
920    pub fn current_k_bit(&self) -> bool {
921        self.current_k_bit
922    }
923
924    /// Whether a rekey is currently in progress.
925    pub fn rekey_in_progress(&self) -> bool {
926        self.rekey_in_progress
927    }
928
929    /// Mark that a rekey has been initiated.
930    pub fn set_rekey_in_progress(&mut self) {
931        self.rekey_in_progress = true;
932    }
933
934    /// Check if rekey initiation is dampened (peer recently sent us msg1).
935    pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
936        match self.last_peer_rekey {
937            Some(t) => t.elapsed().as_secs() < dampening_secs,
938            None => false,
939        }
940    }
941
942    /// Record that the peer initiated a rekey (for dampening).
943    pub fn record_peer_rekey(&mut self) {
944        self.last_peer_rekey = Some(Instant::now());
945    }
946
947    /// Get the pending new session's our_index.
948    pub fn pending_our_index(&self) -> Option<SessionIndex> {
949        self.pending_our_index
950    }
951
952    /// Get the pending new session's their_index.
953    pub fn pending_their_index(&self) -> Option<SessionIndex> {
954        self.pending_their_index
955    }
956
957    /// Get the previous session's our_index (during drain).
958    pub fn previous_our_index(&self) -> Option<SessionIndex> {
959        self.previous_our_index
960    }
961
962    /// Get the previous session for decryption fallback.
963    pub fn previous_session(&self) -> Option<&NoiseSession> {
964        self.previous_session.as_ref()
965    }
966
967    /// Get mutable access to the previous session for decryption.
968    pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
969        self.previous_session.as_mut()
970    }
971
972    /// Get the pending new session (completed rekey, not yet cut over).
973    pub fn pending_new_session(&self) -> Option<&NoiseSession> {
974        self.pending_new_session.as_ref()
975    }
976
977    /// Store a completed rekey session and its indices.
978    ///
979    /// Called when the rekey handshake completes. The session is held
980    /// as pending until the initiator flips the K-bit on the next outbound packet.
981    pub fn set_pending_session(
982        &mut self,
983        session: NoiseSession,
984        our_index: SessionIndex,
985        their_index: SessionIndex,
986    ) {
987        self.pending_new_session = Some(session);
988        self.pending_our_index = Some(our_index);
989        self.pending_their_index = Some(their_index);
990        self.rekey_in_progress = false;
991        // Clear initiator handshake state (index now lives in pending_our_index)
992        self.rekey_our_index = None;
993        self.rekey_handshake = None;
994        self.rekey_msg1 = None;
995        self.rekey_msg1_next_resend = 0;
996    }
997
998    /// Cut over to the pending new session (initiator side).
999    ///
1000    /// Moves current session to previous (for drain), promotes pending to current,
1001    /// flips the K-bit. Returns the old our_index that should remain in peers_by_index
1002    /// during the drain window.
1003    pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
1004        let new_session = self.pending_new_session.take()?;
1005        let new_our_index = self.pending_our_index.take();
1006        let new_their_index = self.pending_their_index.take();
1007
1008        // Demote current to previous
1009        self.previous_session = self.noise_session.take();
1010        self.previous_our_index = self.our_index;
1011        self.drain_started = Some(Instant::now());
1012
1013        // Promote pending to current
1014        self.noise_session = Some(new_session);
1015        self.our_index = new_our_index;
1016        self.their_index = new_their_index;
1017
1018        // Flip K-bit and reset timing
1019        self.current_k_bit = !self.current_k_bit;
1020        self.session_established_at = Instant::now();
1021        self.session_start = Instant::now();
1022        self.rekey_in_progress = false;
1023        self.rekey_jitter_secs = draw_rekey_jitter();
1024        self.reset_replay_suppressed();
1025
1026        // Reset MMP counters to avoid metric discontinuity
1027        let now = Instant::now();
1028        if let Some(mmp) = &mut self.mmp {
1029            mmp.reset_for_rekey(now);
1030        }
1031
1032        self.previous_our_index
1033    }
1034
1035    /// Handle receiving a K-bit flip from the peer (responder side).
1036    ///
1037    /// Promotes pending_new_session to current, demotes current to previous.
1038    /// Returns the old our_index for drain tracking.
1039    pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1040        let new_session = self.pending_new_session.take()?;
1041        let new_our_index = self.pending_our_index.take();
1042        let new_their_index = self.pending_their_index.take();
1043
1044        // Demote current to previous
1045        self.previous_session = self.noise_session.take();
1046        self.previous_our_index = self.our_index;
1047        self.drain_started = Some(Instant::now());
1048
1049        // Promote pending to current
1050        self.noise_session = Some(new_session);
1051        self.our_index = new_our_index;
1052        self.their_index = new_their_index;
1053
1054        // Match peer's K-bit
1055        self.current_k_bit = !self.current_k_bit;
1056        self.session_established_at = Instant::now();
1057        self.session_start = Instant::now();
1058        self.rekey_in_progress = false;
1059        self.rekey_jitter_secs = draw_rekey_jitter();
1060        self.reset_replay_suppressed();
1061
1062        // Reset MMP counters to avoid metric discontinuity
1063        let now = Instant::now();
1064        if let Some(mmp) = &mut self.mmp {
1065            mmp.reset_for_rekey(now);
1066        }
1067
1068        self.previous_our_index
1069    }
1070
1071    /// Check if the drain window has expired.
1072    pub fn drain_expired(&self, drain_secs: u64) -> bool {
1073        match self.drain_started {
1074            Some(t) => t.elapsed().as_secs() >= drain_secs,
1075            None => false,
1076        }
1077    }
1078
1079    /// Whether a drain is in progress.
1080    pub fn is_draining(&self) -> bool {
1081        self.drain_started.is_some()
1082    }
1083
1084    /// Complete the drain: drop previous session and free its index.
1085    ///
1086    /// Returns the previous our_index so the caller can remove it from
1087    /// peers_by_index and free it from the IndexAllocator.
1088    pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1089        self.previous_session = None;
1090        self.drain_started = None;
1091        self.previous_our_index.take()
1092    }
1093
1094    /// Abandon an in-progress rekey.
1095    ///
1096    /// Returns the rekey our_index so the caller can free it.
1097    /// Also clears any pending session state if the handshake was completed
1098    /// but not yet cut over.
1099    pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1100        self.rekey_handshake = None;
1101        self.rekey_msg1 = None;
1102        self.rekey_msg1_next_resend = 0;
1103        self.rekey_in_progress = false;
1104        // Return whichever index needs freeing
1105        self.rekey_our_index.take().or_else(|| {
1106            self.pending_new_session = None;
1107            self.pending_their_index = None;
1108            self.pending_our_index.take()
1109        })
1110    }
1111
1112    // === Rekey Handshake State (Initiator) ===
1113
1114    /// Store rekey handshake state after sending msg1.
1115    pub fn set_rekey_state(
1116        &mut self,
1117        handshake: NoiseHandshakeState,
1118        our_index: SessionIndex,
1119        wire_msg1: Vec<u8>,
1120        next_resend_ms: u64,
1121    ) {
1122        self.rekey_handshake = Some(handshake);
1123        self.rekey_our_index = Some(our_index);
1124        self.rekey_msg1 = Some(wire_msg1);
1125        self.rekey_msg1_next_resend = next_resend_ms;
1126        self.rekey_in_progress = true;
1127    }
1128
1129    /// Get the rekey our_index (for msg2 dispatch lookup).
1130    pub fn rekey_our_index(&self) -> Option<SessionIndex> {
1131        self.rekey_our_index
1132    }
1133
1134    /// Complete the rekey by processing msg2 (initiator side).
1135    ///
1136    /// Takes the stored handshake state, reads msg2, and returns the
1137    /// completed NoiseSession. Clears the handshake-related fields but
1138    /// leaves rekey_our_index for set_pending_session to use.
1139    pub fn complete_rekey_msg2(
1140        &mut self,
1141        msg2_bytes: &[u8],
1142    ) -> Result<(NoiseSession, Option<[u8; 8]>), NoiseError> {
1143        let mut hs = self
1144            .rekey_handshake
1145            .take()
1146            .ok_or_else(|| NoiseError::WrongState {
1147                expected: "rekey handshake in progress".to_string(),
1148                got: "no handshake state".to_string(),
1149            })?;
1150
1151        hs.read_message_2(msg2_bytes)?;
1152        let remote_epoch = hs.remote_epoch();
1153        let session = hs.into_session()?;
1154
1155        // Clear msg1 resend state
1156        self.rekey_msg1 = None;
1157        self.rekey_msg1_next_resend = 0;
1158
1159        Ok((session, remote_epoch))
1160    }
1161
1162    /// Check if msg1 needs resending.
1163    pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1164        self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1165    }
1166
1167    /// Get msg1 bytes for resend (without consuming).
1168    pub fn rekey_msg1(&self) -> Option<&[u8]> {
1169        self.rekey_msg1.as_deref()
1170    }
1171
1172    /// Update next resend timestamp.
1173    pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
1174        self.rekey_msg1_next_resend = next_ms;
1175    }
1176}
1177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Identity;

    /// Build a `PeerIdentity` from a freshly generated keypair.
    fn make_peer_identity() -> PeerIdentity {
        let identity = Identity::generate();
        PeerIdentity::from_pubkey(identity.pubkey())
    }

    /// Build a deterministic `NodeAddr` whose first byte is `val`.
    fn make_node_addr(val: u8) -> NodeAddr {
        let mut bytes = [0u8; 16];
        bytes[0] = val;
        NodeAddr::from_bytes(bytes)
    }

    /// Build a `TreeCoordinate` from a list of first-byte node addresses.
    fn make_coords(ids: &[u8]) -> TreeCoordinate {
        TreeCoordinate::from_addrs(ids.iter().map(|&v| make_node_addr(v)).collect()).unwrap()
    }

    // can_send / is_healthy / is_terminal predicates per connectivity state.
    #[test]
    fn test_connectivity_state_properties() {
        assert!(ConnectivityState::Connected.can_send());
        assert!(ConnectivityState::Stale.can_send());
        assert!(!ConnectivityState::Reconnecting.can_send());
        assert!(!ConnectivityState::Disconnected.can_send());

        assert!(ConnectivityState::Connected.is_healthy());
        assert!(!ConnectivityState::Stale.is_healthy());

        assert!(ConnectivityState::Disconnected.is_terminal());
        assert!(!ConnectivityState::Connected.is_terminal());
    }

    // A newly constructed ActivePeer starts Connected and needs a filter.
    #[test]
    fn test_active_peer_creation() {
        let identity = make_peer_identity();
        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert_eq!(peer.identity().node_addr(), identity.node_addr());
        assert_eq!(peer.link_id(), LinkId::new(1));
        assert!(peer.is_healthy());
        assert!(peer.can_send());
        assert_eq!(peer.authenticated_at(), 1000);
        assert!(peer.needs_filter_update()); // New peers need filter
    }

    // Full state-machine walk: Connected -> Stale -> (touch) Connected ->
    // Reconnecting -> Connected -> Disconnected.
    #[test]
    fn test_connectivity_transitions() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert!(peer.is_healthy());

        peer.mark_stale();
        assert_eq!(peer.connectivity(), ConnectivityState::Stale);
        assert!(peer.can_send()); // Stale can still send

        // Traffic received brings back to connected
        peer.touch(2000);
        assert!(peer.is_healthy());

        peer.mark_reconnecting();
        assert!(!peer.can_send());

        peer.mark_connected(3000);
        assert!(peer.is_healthy());

        peer.mark_disconnected();
        assert!(peer.is_disconnected());
        assert!(!peer.can_send());
    }

    // update_tree_position installs coords and refreshes last_seen.
    #[test]
    fn test_tree_position() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert!(!peer.has_tree_position());
        assert!(peer.coords().is_none());

        let node = make_node_addr(1);
        let parent = make_node_addr(2);
        let decl = ParentDeclaration::new(node, parent, 1, 1000);
        let coords = make_coords(&[1, 2, 0]);

        peer.update_tree_position(decl, coords, 2000);

        assert!(peer.has_tree_position());
        assert!(peer.coords().is_some());
        assert_eq!(peer.last_seen(), 2000);
    }

    // update_filter makes may_reach() positive and resets staleness.
    #[test]
    fn test_bloom_filter() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
        let target = make_node_addr(42);

        assert!(!peer.may_reach(&target));
        assert!(peer.filter_is_stale(2000, 500));

        let mut filter = BloomFilter::new();
        filter.insert(&target);
        peer.update_filter(filter, 1, 1500);

        assert!(peer.may_reach(&target));
        assert!(!peer.filter_is_stale(1800, 500));
        assert!(peer.filter_is_stale(2500, 500));
    }

    // connection_duration / idle_time measured relative to construction time.
    #[test]
    fn test_timing() {
        let identity = make_peer_identity();
        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert_eq!(peer.connection_duration(2000), 1000);
        assert_eq!(peer.idle_time(2000), 1000);
    }

    // pending_filter_update flag: set on creation, clearable, re-settable.
    #[test]
    fn test_filter_update_flag() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert!(peer.needs_filter_update()); // New peer

        peer.clear_filter_update_needed();
        assert!(!peer.needs_filter_update());

        peer.mark_filter_update_needed();
        assert!(peer.needs_filter_update());
    }

    // with_stats constructor preserves pre-seeded LinkStats counters.
    #[test]
    fn test_with_stats() {
        let identity = make_peer_identity();
        let mut stats = LinkStats::new();
        stats.record_sent(100);
        stats.record_recv(200, 500);

        let peer = ActivePeer::with_stats(identity, LinkId::new(1), 1000, stats);

        assert_eq!(peer.link_stats().packets_sent, 1);
        assert_eq!(peer.link_stats().packets_recv, 1);
    }

    // Replay-suppression counter: increment returns the new count, reset
    // returns the old count and zeroes.
    #[test]
    fn test_replay_suppression_counter() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        // Initial count is zero
        assert_eq!(peer.replay_suppressed_count(), 0);

        // Increment returns new count
        assert_eq!(peer.increment_replay_suppressed(), 1);
        assert_eq!(peer.increment_replay_suppressed(), 2);
        assert_eq!(peer.increment_replay_suppressed(), 3);
        assert_eq!(peer.replay_suppressed_count(), 3);

        // Reset returns previous count and zeroes it
        assert_eq!(peer.reset_replay_suppressed(), 3);
        assert_eq!(peer.replay_suppressed_count(), 0);

        // Can increment again after reset
        assert_eq!(peer.increment_replay_suppressed(), 1);
        assert_eq!(peer.replay_suppressed_count(), 1);

        // Reset when zero returns zero
        peer.reset_replay_suppressed();
        assert_eq!(peer.reset_replay_suppressed(), 0);
    }

    // Decrypt-failure counter must be strictly monotonic across increments.
    #[test]
    fn test_increment_decrypt_failures_monotonic() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        // Initial count is zero
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // Each call returns a strictly increasing count
        let mut prev = 0u32;
        for expected in 1..=25u32 {
            let count = peer.increment_decrypt_failures();
            assert_eq!(count, expected, "increment must return monotonic count");
            assert!(count > prev, "count must strictly increase");
            assert_eq!(peer.consecutive_decrypt_failures(), count);
            prev = count;
        }
    }

    // reset_decrypt_failures zeroes the counter and is a no-op at zero.
    #[test]
    fn test_reset_decrypt_failures_zeroes_counter() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        // Drive counter up
        for _ in 0..7 {
            peer.increment_decrypt_failures();
        }
        assert_eq!(peer.consecutive_decrypt_failures(), 7);

        // Reset zeroes it
        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // Reset on zero is a no-op (still zero, no panic)
        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // Counter resumes at 1 after reset
        assert_eq!(peer.increment_decrypt_failures(), 1);
        assert_eq!(peer.consecutive_decrypt_failures(), 1);
    }

    // Every freshly drawn jitter must land in [-REKEY_JITTER_SECS, +REKEY_JITTER_SECS].
    #[test]
    fn test_rekey_jitter_in_range() {
        for _ in 0..100 {
            let identity = make_peer_identity();
            let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
            let jitter = peer.rekey_jitter_secs();
            assert!(
                (-REKEY_JITTER_SECS..=REKEY_JITTER_SECS).contains(&jitter),
                "jitter {} outside [-{}, +{}]",
                jitter,
                REKEY_JITTER_SECS,
                REKEY_JITTER_SECS
            );
        }
    }

    // Coarse distribution check: the empirical mean over 200 draws should
    // sit near zero (integer division suffices for this tolerance).
    #[test]
    fn test_rekey_jitter_mean_near_zero() {
        let mut sum = 0i64;
        let n = 200i64;

        for _ in 0..n {
            let identity = make_peer_identity();
            let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
            sum += peer.rekey_jitter_secs();
        }

        let mean = sum / n;
        assert!(
            mean.abs() < 5,
            "empirical mean {} not within 5 of 0 over {} samples",
            mean,
            n
        );
    }
}