// fips_core/peer/active.rs
1//! Active Peer (Authenticated Phase)
2//!
3//! Represents a fully authenticated peer after successful Noise handshake.
4//! ActivePeer holds tree state, Bloom filter, and routing information.
5
6use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::node::REKEY_JITTER_SECS;
9use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
10use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
11use crate::tree::{ParentDeclaration, TreeCoordinate};
12use crate::utils::index::SessionIndex;
13use crate::{FipsAddress, NodeAddr, PeerIdentity};
14use rand::RngExt;
15use secp256k1::XOnlyPublicKey;
16use std::fmt;
17use std::time::Instant;
18
/// Draw a fresh per-session rekey jitter, in seconds.
///
/// Uniform in `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` (inclusive), so the
/// offset is symmetric around the nominal rekey trigger time. Drawn once per
/// session (see `rekey_jitter_secs` on `ActivePeer`).
fn draw_rekey_jitter() -> i64 {
    rand::rng().random_range(-REKEY_JITTER_SECS..=REKEY_JITTER_SECS)
}
22
/// Connectivity state for an active peer.
///
/// This is simpler than the full PeerState since authentication is complete.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectivityState {
    /// Peer is fully connected and responsive.
    Connected,
    /// Peer hasn't been heard from recently (potential timeout).
    Stale,
    /// Connection lost, attempting to reconnect.
    Reconnecting,
    /// Peer has been explicitly disconnected.
    Disconnected,
}

impl ConnectivityState {
    /// Check if the peer is usable for sending traffic.
    ///
    /// A `Stale` peer is still sendable — it may merely be quiet.
    pub fn can_send(&self) -> bool {
        match self {
            ConnectivityState::Connected | ConnectivityState::Stale => true,
            ConnectivityState::Reconnecting | ConnectivityState::Disconnected => false,
        }
    }

    /// Check if this is a terminal state requiring cleanup.
    pub fn is_terminal(&self) -> bool {
        *self == ConnectivityState::Disconnected
    }

    /// Check if peer is fully healthy.
    pub fn is_healthy(&self) -> bool {
        *self == ConnectivityState::Connected
    }
}

impl fmt::Display for ConnectivityState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Lowercase log/display form for each variant.
        f.write_str(match self {
            ConnectivityState::Connected => "connected",
            ConnectivityState::Stale => "stale",
            ConnectivityState::Reconnecting => "reconnecting",
            ConnectivityState::Disconnected => "disconnected",
        })
    }
}
69
/// A fully authenticated remote FIPS node.
///
/// Created only after successful Noise KK handshake. The identity is
/// cryptographically verified at this point.
///
/// Note: ActivePeer intentionally does not implement Clone because it
/// contains NoiseSession, which cannot be safely cloned (cloning would
/// risk nonce reuse, a catastrophic security failure).
#[derive(Debug)]
pub struct ActivePeer {
    // === Identity (Verified) ===
    /// Cryptographic identity (verified via handshake).
    identity: PeerIdentity,

    // === Connection ===
    /// Link used to reach this peer.
    link_id: LinkId,
    /// Current connectivity state.
    connectivity: ConnectivityState,

    // === Session (Wire Protocol) ===
    /// Noise session for encryption/decryption (None if legacy peer).
    noise_session: Option<NoiseSession>,
    /// Our session index (they include this when sending TO us).
    our_index: Option<SessionIndex>,
    /// Their session index (we include this when sending TO them).
    their_index: Option<SessionIndex>,
    /// Transport ID for this peer's link.
    transport_id: Option<TransportId>,
    /// Current transport address (for roaming support).
    current_addr: Option<TransportAddr>,

    // === Spanning Tree ===
    /// Their latest parent declaration.
    declaration: Option<ParentDeclaration>,
    /// Their path to root.
    ancestry: Option<TreeCoordinate>,

    // === Tree Announce Rate Limiting ===
    /// Minimum interval between TreeAnnounce messages (milliseconds).
    tree_announce_min_interval_ms: u64,
    /// Last time we sent a TreeAnnounce to this peer (Unix milliseconds).
    /// 0 = never sent.
    last_tree_announce_sent_ms: u64,
    /// Whether a tree announce is pending (deferred due to rate limit).
    pending_tree_announce: bool,

    // === Bloom Filter ===
    /// What's reachable through them (inbound filter).
    inbound_filter: Option<BloomFilter>,
    /// Their filter's sequence number.
    filter_sequence: u64,
    /// When we received their last filter (Unix milliseconds).
    /// 0 = never received; `filter_is_stale` treats that as stale.
    filter_received_at: u64,
    /// Whether we owe them a filter update.
    pending_filter_update: bool,

    // === Timing ===
    /// Session start time for computing session-relative timestamps.
    /// Used as the epoch for the 4-byte inner header timestamp field.
    session_start: Instant,

    // === Statistics ===
    /// Link statistics.
    link_stats: LinkStats,
    /// When this peer was authenticated (Unix milliseconds).
    authenticated_at: u64,
    /// When this peer was last seen (any activity, Unix milliseconds).
    last_seen: u64,

    // === Epoch (Restart Detection) ===
    /// Remote peer's startup epoch (from handshake). Used to detect restarts.
    remote_epoch: Option<[u8; 8]>,

    // === MMP ===
    /// Per-peer MMP state (None for legacy peers without Noise sessions).
    mmp: Option<MmpPeerState>,

    // === Heartbeat ===
    /// When we last sent a heartbeat to this peer.
    last_heartbeat_sent: Option<Instant>,

    // === Handshake Resend ===
    /// Wire-format msg2 for resend on duplicate msg1 (responder only).
    /// Cleared after the handshake timeout window.
    handshake_msg2: Option<Vec<u8>>,

    // === Replay Detection Suppression ===
    /// Number of replay detections suppressed since last session reset.
    replay_suppressed_count: u32,
    /// Consecutive decryption failures (reset on any successful decrypt).
    consecutive_decrypt_failures: u32,

    // === Rekey (Key Rotation) ===
    /// When the current Noise session was established (for rekey timer).
    session_established_at: Instant,
    /// Per-session symmetric jitter applied to the rekey timer trigger.
    /// Drawn once at construction via `draw_rekey_jitter`.
    rekey_jitter_secs: i64,
    /// Current K-bit epoch value (alternates each rekey).
    current_k_bit: bool,
    /// Previous session kept alive during drain window after cutover.
    previous_session: Option<NoiseSession>,
    /// Previous session's our_index (for peers_by_index cleanup on drain expiry).
    previous_our_index: Option<SessionIndex>,
    /// When the drain window started (None = no drain in progress).
    drain_started: Option<Instant>,
    /// Pending new session from completed rekey (before K-bit cutover).
    pending_new_session: Option<NoiseSession>,
    /// Pending new session's our_index.
    pending_our_index: Option<SessionIndex>,
    /// Pending new session's their_index.
    pending_their_index: Option<SessionIndex>,
    /// Whether a rekey is currently in progress (handshake sent, not yet complete).
    rekey_in_progress: bool,
    /// When we last received a rekey msg1 from this peer (dampening).
    last_peer_rekey: Option<Instant>,
    /// In-progress rekey: Noise handshake state (initiator only).
    rekey_handshake: Option<NoiseHandshakeState>,
    /// In-progress rekey: our new session index.
    rekey_our_index: Option<SessionIndex>,
    /// In-progress rekey: wire-format msg1 for resend.
    rekey_msg1: Option<Vec<u8>>,
    /// In-progress rekey: next resend timestamp (Unix ms).
    rekey_msg1_next_resend: u64,

    // === Connected Peer UDP Socket (Unix fast path) ===
    /// Per-peer `connect()`-ed UDP socket, opened once we have a
    /// stable kernel `SocketAddr` for the peer (i.e. session
    /// established + transport address known). When `Some`, the
    /// encrypt-worker send path can `sendmsg(2)` on this fd without
    /// per-packet `msg_name` — the kernel-side route + neighbor cache
    /// is pinned by the `connect()` call. On the receive side, Linux
    /// and Darwin UDP demux preferentially route inbound packets from
    /// this peer to this socket (most-specific 5-tuple match via
    /// `SO_REUSEPORT`), so the paired drain thread must keep it empty.
    ///
    /// Closed automatically on Drop. Behind an `Arc` so the
    /// encrypt-worker's send path can hold a refcount without owning
    /// the only handle (rekey / address-change may rotate the socket
    /// while older jobs are still in-flight on the worker channel).
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    connected_udp:
        Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,

    /// Per-peer receive-drain thread. Always paired with
    /// `connected_udp`: while the connected socket is installed, the
    /// kernel UDP demux preferentially routes inbound packets from
    /// this peer to it (via SO_REUSEPORT + 5-tuple match), so the
    /// socket *must* be drained or packets pile up in its kernel
    /// recv buffer. Drop signals the thread to exit via self-pipe.
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
}
222
223impl ActivePeer {
    /// Create a new active peer from verified identity.
    ///
    /// Called after successful authentication handshake.
    /// For peers with Noise sessions, use `with_session` instead.
    pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
        // One timestamp shared by `session_start` and `session_established_at`
        // so both epochs agree at construction.
        let now = Instant::now();
        Self {
            identity,
            link_id,
            connectivity: ConnectivityState::Connected,
            noise_session: None,
            our_index: None,
            their_index: None,
            transport_id: None,
            current_addr: None,
            declaration: None,
            ancestry: None,
            tree_announce_min_interval_ms: 500, // default; see set_tree_announce_min_interval_ms
            last_tree_announce_sent_ms: 0,
            pending_tree_announce: false,
            inbound_filter: None,
            filter_sequence: 0,
            filter_received_at: 0,
            pending_filter_update: true, // Send filter on new connection
            session_start: now,
            link_stats: LinkStats::new(),
            authenticated_at,
            last_seen: authenticated_at,
            remote_epoch: None,
            mmp: None,
            last_heartbeat_sent: None,
            handshake_msg2: None,
            replay_suppressed_count: 0,
            consecutive_decrypt_failures: 0,
            session_established_at: now,
            rekey_jitter_secs: draw_rekey_jitter(), // fresh per-session jitter
            current_k_bit: false,
            previous_session: None,
            previous_our_index: None,
            drain_started: None,
            pending_new_session: None,
            pending_our_index: None,
            pending_their_index: None,
            rekey_in_progress: false,
            last_peer_rekey: None,
            rekey_handshake: None,
            rekey_our_index: None,
            rekey_msg1: None,
            rekey_msg1_next_resend: 0,
            #[cfg(any(target_os = "linux", target_os = "macos"))]
            connected_udp: None,
            #[cfg(any(target_os = "linux", target_os = "macos"))]
            peer_recv_drain: None,
        }
    }
279
    /// Create from verified identity with existing link stats.
    ///
    /// Used when promoting from PeerConnection, preserving handshake stats.
    /// For peers with Noise sessions, use `with_session` instead.
    pub fn with_stats(
        identity: PeerIdentity,
        link_id: LinkId,
        authenticated_at: u64,
        link_stats: LinkStats,
    ) -> Self {
        let mut peer = Self::new(identity, link_id, authenticated_at);
        // Overwrite the fresh stats so counters accumulated during the
        // handshake carry across the promotion.
        peer.link_stats = link_stats;
        peer
    }
294
295    /// Create from verified identity with Noise session and index tracking.
296    ///
297    /// This is the primary constructor for the wire protocol path.
298    /// The NoiseSession provides encryption/decryption and replay protection.
299    #[allow(clippy::too_many_arguments)]
300    pub fn with_session(
301        identity: PeerIdentity,
302        link_id: LinkId,
303        authenticated_at: u64,
304        noise_session: NoiseSession,
305        our_index: SessionIndex,
306        their_index: SessionIndex,
307        transport_id: TransportId,
308        current_addr: TransportAddr,
309        link_stats: LinkStats,
310        is_initiator: bool,
311        mmp_config: &MmpConfig,
312        remote_epoch: Option<[u8; 8]>,
313    ) -> Self {
314        let now = Instant::now();
315        Self {
316            identity,
317            link_id,
318            connectivity: ConnectivityState::Connected,
319            noise_session: Some(noise_session),
320            our_index: Some(our_index),
321            their_index: Some(their_index),
322            transport_id: Some(transport_id),
323            current_addr: Some(current_addr),
324            declaration: None,
325            ancestry: None,
326            tree_announce_min_interval_ms: 500,
327            last_tree_announce_sent_ms: 0,
328            pending_tree_announce: false,
329            inbound_filter: None,
330            filter_sequence: 0,
331            filter_received_at: 0,
332            pending_filter_update: true,
333            session_start: now,
334            link_stats,
335            authenticated_at,
336            last_seen: authenticated_at,
337            remote_epoch,
338            mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
339            last_heartbeat_sent: None,
340            handshake_msg2: None,
341            replay_suppressed_count: 0,
342            consecutive_decrypt_failures: 0,
343            session_established_at: now,
344            rekey_jitter_secs: draw_rekey_jitter(),
345            current_k_bit: false,
346            previous_session: None,
347            previous_our_index: None,
348            drain_started: None,
349            pending_new_session: None,
350            pending_our_index: None,
351            pending_their_index: None,
352            rekey_in_progress: false,
353            last_peer_rekey: None,
354            rekey_handshake: None,
355            rekey_our_index: None,
356            rekey_msg1: None,
357            rekey_msg1_next_resend: 0,
358            #[cfg(any(target_os = "linux", target_os = "macos"))]
359            connected_udp: None,
360            #[cfg(any(target_os = "linux", target_os = "macos"))]
361            peer_recv_drain: None,
362        }
363    }
364
365    /// Unix UDP fast path: clone the refcount on the per-peer
366    /// `connect()`-ed UDP socket if one has been installed. Encrypt-
367    /// worker send path uses this to bypass the wildcard listen
368    /// socket's per-packet sockaddr handling.
369    #[cfg(any(target_os = "linux", target_os = "macos"))]
370    pub(crate) fn connected_udp(
371        &self,
372    ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
373        self.connected_udp.clone()
374    }
375
376    /// Install a per-peer `connect()`-ed UDP socket **with** its
377    /// paired recv drain thread. The two are owned together: the
378    /// drain thread is the only consumer of packets arriving on this
379    /// socket (Linux UDP demux preferentially routes them away from
380    /// the wildcard listen socket via SO_REUSEPORT 5-tuple match),
381    /// so installing one without the other would silently drop
382    /// inbound packets from this peer.
383    ///
384    /// Replacing an existing pair drops the old drain (its self-pipe
385    /// shutdown signal fires; thread exits within one poll
386    /// iteration) and drops the old socket Arc. Any encrypt-worker
387    /// jobs already in-flight holding the old socket Arc stay valid
388    /// until they complete, at which point the kernel fd closes.
389    #[cfg(any(target_os = "linux", target_os = "macos"))]
390    pub(crate) fn set_connected_udp(
391        &mut self,
392        socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
393        drain: crate::transport::udp::peer_drain::PeerRecvDrain,
394    ) {
395        // Drop order matters: drop the old drain BEFORE the old
396        // socket so the drain thread's last reference to the kernel
397        // fd is released cleanly.
398        self.peer_recv_drain = None;
399        self.connected_udp = None;
400        self.connected_udp = Some(socket);
401        self.peer_recv_drain = Some(drain);
402    }
403
404    /// Clear the per-peer connected UDP socket + drain thread (e.g.
405    /// on rekey or disconnect). The drain thread exits via self-pipe
406    /// signal; the kernel fd closes when the last `Arc` to the
407    /// socket drops.
408    #[cfg(any(target_os = "linux", target_os = "macos"))]
409    pub(crate) fn clear_connected_udp(&mut self) {
410        self.peer_recv_drain = None;
411        self.connected_udp = None;
412    }
413
    // === Identity Accessors ===

    /// Get the peer's verified identity.
    pub fn identity(&self) -> &PeerIdentity {
        &self.identity
    }

    /// Get the peer's NodeAddr.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }

    /// Get the peer's FIPS address.
    pub fn address(&self) -> &FipsAddress {
        self.identity.address()
    }

    /// Get the peer's public key.
    pub fn pubkey(&self) -> XOnlyPublicKey {
        self.identity.pubkey()
    }

    /// Get the peer's npub string.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }

    // === Connection Accessors ===

    /// Get the link ID.
    pub fn link_id(&self) -> LinkId {
        self.link_id
    }

    /// Get the connectivity state.
    pub fn connectivity(&self) -> ConnectivityState {
        self.connectivity
    }

    /// Check if peer can receive traffic.
    ///
    /// Delegates to [`ConnectivityState::can_send`] (Connected or Stale).
    pub fn can_send(&self) -> bool {
        self.connectivity.can_send()
    }

    /// Check if peer is fully healthy (Connected).
    pub fn is_healthy(&self) -> bool {
        self.connectivity.is_healthy()
    }

    /// Check if peer is disconnected (terminal state).
    pub fn is_disconnected(&self) -> bool {
        self.connectivity.is_terminal()
    }
467
    // === Session Accessors ===

    /// Check if this peer has a Noise session (false for legacy peers).
    pub fn has_session(&self) -> bool {
        self.noise_session.is_some()
    }

    /// Get the Noise session, if present.
    pub fn noise_session(&self) -> Option<&NoiseSession> {
        self.noise_session.as_ref()
    }

    /// Get mutable access to the Noise session.
    pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
        self.noise_session.as_mut()
    }

    /// Get our session index (they use this to send TO us).
    pub fn our_index(&self) -> Option<SessionIndex> {
        self.our_index
    }

    /// Get their session index (we use this to send TO them).
    pub fn their_index(&self) -> Option<SessionIndex> {
        self.their_index
    }

    /// Update their session index (used during cross-connection resolution
    /// when the losing node keeps its inbound session but needs the peer's
    /// outbound index).
    pub fn set_their_index(&mut self, index: SessionIndex) {
        self.their_index = Some(index);
    }
501
502    /// Replace the Noise session and indices during cross-connection resolution.
503    ///
504    /// When both nodes simultaneously initiate, each promotes its inbound
505    /// handshake first. When the peer's msg2 arrives, we learn the correct
506    /// session — the outbound handshake that pairs with the peer's inbound.
507    /// This replaces the entire session so both nodes use matching keys.
508    ///
509    /// Returns the old our_index so the caller can update peers_by_index.
510    /// Also resets the replay suppression counter since the session changed.
511    pub fn replace_session(
512        &mut self,
513        new_session: NoiseSession,
514        new_our_index: SessionIndex,
515        new_their_index: SessionIndex,
516    ) -> Option<SessionIndex> {
517        self.reset_replay_suppressed();
518        let old_our_index = self.our_index;
519        self.noise_session = Some(new_session);
520        self.our_index = Some(new_our_index);
521        self.their_index = Some(new_their_index);
522        old_our_index
523    }
524
    /// Get the transport ID for this peer.
    pub fn transport_id(&self) -> Option<TransportId> {
        self.transport_id
    }

    /// Get the current transport address.
    ///
    /// `None` until an address is learned (peers built via `new` /
    /// `with_stats` start without a transport binding).
    pub fn current_addr(&self) -> Option<&TransportAddr> {
        self.current_addr.as_ref()
    }
534
535    /// Update the current address (for roaming support).
536    ///
537    /// Called when we receive a valid authenticated packet from a new address.
538    /// Short-circuits when neither the transport_id nor the TransportAddr
539    /// bytes changed — at multi-Gbps the same peer's source 4-tuple is
540    /// stable per session and the overwhelming majority of inbound
541    /// packets hit this fast path. Saves both the redundant
542    /// `Option::take` + Vec drop on the cached side and the caller's
543    /// `.clone()` allocation on the input side: the caller can pass
544    /// `&TransportAddr` and we only `.to_owned()` when storing.
545    ///
546    /// Returns `true` iff the stored `(transport_id, current_addr)` pair
547    /// actually changed. The caller uses this signal to invalidate
548    /// derived caches whose validity is bound to the peer's 5-tuple —
549    /// in particular the Linux per-peer `connect()`-ed UDP socket,
550    /// which is pinned to one kernel route + neighbour entry and goes
551    /// stale the moment the peer roams. (Clearing it here would force
552    /// `&mut self` users into the wrong shape: the policy of when to
553    /// rebuild the connected socket lives on `Node`, not on the peer
554    /// state. Returning a bool keeps that policy where it belongs.)
555    pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
556        if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
557            return false;
558        }
559        self.transport_id = Some(transport_id);
560        self.current_addr = Some(addr.clone());
561        true
562    }
563
    // === Handshake Resend ===

    /// Store wire-format msg2 for resend on duplicate msg1 (responder side).
    pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
        self.handshake_msg2 = Some(msg2);
    }

    /// Get stored msg2 bytes for resend.
    pub fn handshake_msg2(&self) -> Option<&[u8]> {
        self.handshake_msg2.as_deref()
    }

    /// Clear stored msg2 (no longer needed after handshake window).
    pub fn clear_handshake_msg2(&mut self) {
        self.handshake_msg2 = None;
    }
580
581    // === Replay Detection Suppression ===
582
583    /// Increment replay suppression counter. Returns the new count.
584    pub fn increment_replay_suppressed(&mut self) -> u32 {
585        self.replay_suppressed_count += 1;
586        self.replay_suppressed_count
587    }
588
589    /// Reset replay suppression counter, returning previous count.
590    pub fn reset_replay_suppressed(&mut self) -> u32 {
591        let count = self.replay_suppressed_count;
592        self.replay_suppressed_count = 0;
593        count
594    }
595
596    /// Current replay suppression count.
597    pub fn replay_suppressed_count(&self) -> u32 {
598        self.replay_suppressed_count
599    }
600
601    // === Decryption Failure Tracking ===
602
603    /// Increment consecutive decryption failure counter, returning new count.
604    pub fn increment_decrypt_failures(&mut self) -> u32 {
605        self.consecutive_decrypt_failures += 1;
606        self.consecutive_decrypt_failures
607    }
608
609    /// Reset consecutive decryption failure counter.
610    pub fn reset_decrypt_failures(&mut self) {
611        self.consecutive_decrypt_failures = 0;
612    }
613
614    /// Current consecutive decryption failure count.
615    pub fn consecutive_decrypt_failures(&self) -> u32 {
616        self.consecutive_decrypt_failures
617    }
618
    // === Epoch Accessors ===

    /// Get the remote peer's startup epoch (from handshake).
    ///
    /// Used to detect peer restarts.
    pub fn remote_epoch(&self) -> Option<[u8; 8]> {
        self.remote_epoch
    }

    // === Tree Accessors ===

    /// Get the peer's tree coordinates (their path to root), if known.
    pub fn coords(&self) -> Option<&TreeCoordinate> {
        self.ancestry.as_ref()
    }

    /// Get the peer's parent declaration, if known.
    pub fn declaration(&self) -> Option<&ParentDeclaration> {
        self.declaration.as_ref()
    }

    /// Check if this peer has a known tree position.
    ///
    /// True only when both the declaration and the ancestry are present.
    pub fn has_tree_position(&self) -> bool {
        self.declaration.is_some() && self.ancestry.is_some()
    }
642
643    // === Filter Accessors ===
644
645    /// Get the peer's inbound filter, if known.
646    pub fn inbound_filter(&self) -> Option<&BloomFilter> {
647        self.inbound_filter.as_ref()
648    }
649
650    /// Get the filter sequence number.
651    pub fn filter_sequence(&self) -> u64 {
652        self.filter_sequence
653    }
654
655    /// Check if this peer's filter is stale.
656    pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
657        if self.filter_received_at == 0 {
658            return true;
659        }
660        current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
661    }
662
663    /// Check if a destination might be reachable through this peer.
664    pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
665        match &self.inbound_filter {
666            Some(filter) => filter.contains(node_addr),
667            None => false,
668        }
669    }
670
671    /// Check if we need to send this peer a filter update.
672    pub fn needs_filter_update(&self) -> bool {
673        self.pending_filter_update
674    }
675
    // === Statistics Accessors ===

    /// Get link statistics.
    pub fn link_stats(&self) -> &LinkStats {
        &self.link_stats
    }

    /// Get mutable link statistics.
    pub fn link_stats_mut(&mut self) -> &mut LinkStats {
        &mut self.link_stats
    }

    // === MMP Accessors ===

    /// Get MMP state (None for legacy peers without sessions).
    ///
    /// Populated only by `with_session`; `new`/`with_stats` leave it unset.
    pub fn mmp(&self) -> Option<&MmpPeerState> {
        self.mmp.as_ref()
    }

    /// Get mutable MMP state.
    pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
        self.mmp.as_mut()
    }
699
700    /// Link cost for routing decisions.
701    ///
702    /// Returns a scalar cost where lower is better (1.0 = ideal).
703    /// Computed as RTT-weighted ETX: `etx * (1.0 + srtt_ms / 100.0)`.
704    ///
705    /// Returns 1.0 (optimistic default) when MMP metrics are not yet
706    /// available, matching depth-only parent selection behavior.
707    pub fn link_cost(&self) -> f64 {
708        match self.mmp() {
709            Some(mmp) => {
710                let etx = mmp.metrics.etx;
711                match mmp.metrics.srtt_ms() {
712                    Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
713                    None => 1.0,
714                }
715            }
716            None => 1.0,
717        }
718    }
719
720    /// Whether this peer has at least one MMP RTT measurement.
721    pub fn has_srtt(&self) -> bool {
722        self.mmp()
723            .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
724    }
725
    /// When this peer was authenticated (Unix milliseconds).
    pub fn authenticated_at(&self) -> u64 {
        self.authenticated_at
    }

    /// When this peer was last seen (Unix milliseconds).
    pub fn last_seen(&self) -> u64 {
        self.last_seen
    }

    /// Time since last activity (saturating, so never underflows).
    pub fn idle_time(&self, current_time_ms: u64) -> u64 {
        current_time_ms.saturating_sub(self.last_seen)
    }

    /// Connection duration since authentication.
    pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
        current_time_ms.saturating_sub(self.authenticated_at)
    }

    /// Session-relative elapsed time in milliseconds (for inner header timestamp).
    ///
    /// Returns milliseconds since session establishment, truncated to u32.
    /// Wraps at ~49.7 days which is acceptable for session-relative timing.
    /// Measured from `session_start` (fixed at construction).
    pub fn session_elapsed_ms(&self) -> u32 {
        self.session_start.elapsed().as_millis() as u32
    }

    /// When this peer's session started (for link-dead fallback timing).
    pub fn session_start(&self) -> Instant {
        self.session_start
    }

    // === Heartbeat ===

    /// When we last sent a heartbeat to this peer.
    pub fn last_heartbeat_sent(&self) -> Option<Instant> {
        self.last_heartbeat_sent
    }

    /// Record that we sent a heartbeat.
    pub fn mark_heartbeat_sent(&mut self, now: Instant) {
        self.last_heartbeat_sent = Some(now);
    }
770
771    // === State Updates ===
772
773    /// Update last seen timestamp.
774    pub fn touch(&mut self, current_time_ms: u64) {
775        self.last_seen = current_time_ms;
776        // If we were stale, receiving traffic makes us connected again
777        if self.connectivity == ConnectivityState::Stale {
778            self.connectivity = ConnectivityState::Connected;
779        }
780    }
781
782    /// Mark peer as stale (no recent traffic).
783    pub fn mark_stale(&mut self) {
784        if self.connectivity == ConnectivityState::Connected {
785            self.connectivity = ConnectivityState::Stale;
786        }
787    }
788
789    /// Mark peer as reconnecting.
790    pub fn mark_reconnecting(&mut self) {
791        self.connectivity = ConnectivityState::Reconnecting;
792    }
793
794    /// Mark peer as disconnected.
795    pub fn mark_disconnected(&mut self) {
796        self.connectivity = ConnectivityState::Disconnected;
797    }
798
799    /// Mark peer as connected (e.g., after successful reconnect).
800    pub fn mark_connected(&mut self, current_time_ms: u64) {
801        self.connectivity = ConnectivityState::Connected;
802        self.last_seen = current_time_ms;
803    }
804
805    /// Update the link ID (e.g., on reconnect).
806    pub fn set_link_id(&mut self, link_id: LinkId) {
807        self.link_id = link_id;
808    }
809
    // === Tree Updates ===

    /// Update peer's tree position.
    ///
    /// Also refreshes `last_seen` with the provided timestamp.
    pub fn update_tree_position(
        &mut self,
        declaration: ParentDeclaration,
        ancestry: TreeCoordinate,
        current_time_ms: u64,
    ) {
        self.declaration = Some(declaration);
        self.ancestry = Some(ancestry);
        self.last_seen = current_time_ms;
    }

    /// Clear peer's tree position (both declaration and ancestry).
    pub fn clear_tree_position(&mut self) {
        self.declaration = None;
        self.ancestry = None;
    }
829
830    // === Tree Announce Rate Limiting ===
831
    /// Set the minimum interval between TreeAnnounce messages (milliseconds).
    ///
    /// Consulted by `can_send_tree_announce` as the rate-limit window.
    pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
        self.tree_announce_min_interval_ms = ms;
    }
836
    /// Get the last tree announce send timestamp (for carrying across reconnection).
    ///
    /// Milliseconds timestamp recorded by `record_tree_announce_sent` or
    /// `set_last_tree_announce_sent_ms`.
    pub fn last_tree_announce_sent_ms(&self) -> u64 {
        self.last_tree_announce_sent_ms
    }
841
    /// Set the last tree announce send timestamp (to preserve rate limit across reconnection).
    ///
    /// Allows a replacement `ActivePeer` to inherit the previous instance's
    /// rate-limit clock instead of starting from zero.
    pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
        self.last_tree_announce_sent_ms = ms;
    }
846
847    /// Check if we can send a TreeAnnounce now (rate limiting).
848    pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
849        now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
850    }
851
    /// Record that we sent a TreeAnnounce to this peer.
    ///
    /// Stamps the send time for rate limiting and clears any deferred
    /// (pending) announce flag.
    pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
        self.last_tree_announce_sent_ms = now_ms;
        self.pending_tree_announce = false;
    }
857
    /// Mark that a tree announce is pending (deferred due to rate limit).
    ///
    /// The flag is cleared by `record_tree_announce_sent`.
    pub fn mark_tree_announce_pending(&mut self) {
        self.pending_tree_announce = true;
    }
862
    /// Check if a deferred tree announce is waiting to be sent.
    ///
    /// True after `mark_tree_announce_pending` until
    /// `record_tree_announce_sent` clears it.
    pub fn has_pending_tree_announce(&self) -> bool {
        self.pending_tree_announce
    }
867
868    // === Filter Updates ===
869
    /// Update peer's inbound filter.
    ///
    /// Replaces the stored Bloom filter, records its sequence number and
    /// receive time, and refreshes `last_seen` (a filter update is traffic).
    pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
        self.inbound_filter = Some(filter);
        self.filter_sequence = sequence;
        self.filter_received_at = current_time_ms;
        self.last_seen = current_time_ms;
    }
877
    /// Clear peer's inbound filter.
    ///
    /// Drops the filter and resets its sequence number and receive
    /// timestamp to zero.
    pub fn clear_filter(&mut self) {
        self.inbound_filter = None;
        self.filter_sequence = 0;
        self.filter_received_at = 0;
    }
884
    /// Mark that we need to send this peer a filter update.
    ///
    /// Cleared by `clear_filter_update_needed` once the update is sent.
    pub fn mark_filter_update_needed(&mut self) {
        self.pending_filter_update = true;
    }
889
    /// Clear the pending filter update flag.
    ///
    /// Called after the filter update has been sent to the peer.
    pub fn clear_filter_update_needed(&mut self) {
        self.pending_filter_update = false;
    }
894
895    // === Rekey (Key Rotation) ===
896
    /// When the current Noise session was established.
    ///
    /// Reset to `Instant::now()` on every rekey cutover, so this always
    /// refers to the active session's age.
    pub fn session_established_at(&self) -> Instant {
        self.session_established_at
    }
901
    /// Test-only override of the session establishment instant
    /// (e.g., to simulate an aged session without sleeping).
    #[cfg(test)]
    pub(crate) fn set_session_established_at_for_test(&mut self, instant: Instant) {
        self.session_established_at = instant;
    }
906
    /// Per-session symmetric rekey-timer jitter offset (seconds).
    ///
    /// Drawn uniformly from `-REKEY_JITTER_SECS..=REKEY_JITTER_SECS` and
    /// redrawn at every rekey cutover.
    pub fn rekey_jitter_secs(&self) -> i64 {
        self.rekey_jitter_secs
    }
911
    /// Current K-bit epoch value.
    ///
    /// Toggled on every session cutover (see `cutover_to_new_session` /
    /// `handle_peer_kbit_flip`).
    pub fn current_k_bit(&self) -> bool {
        self.current_k_bit
    }
916
    /// Whether a rekey is currently in progress.
    ///
    /// True from msg1 send (`set_rekey_state`) until the pending session is
    /// stored, the cutover completes, or the rekey is abandoned.
    pub fn rekey_in_progress(&self) -> bool {
        self.rekey_in_progress
    }
921
    /// Mark that a rekey has been initiated.
    ///
    /// Cleared again on pending-session storage, cutover, or abandonment.
    pub fn set_rekey_in_progress(&mut self) {
        self.rekey_in_progress = true;
    }
926
927    /// Check if rekey initiation is dampened (peer recently sent us msg1).
928    pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
929        match self.last_peer_rekey {
930            Some(t) => t.elapsed().as_secs() < dampening_secs,
931            None => false,
932        }
933    }
934
    /// Record that the peer initiated a rekey (for dampening).
    ///
    /// Stamps the current instant; consulted by `is_rekey_dampened`.
    pub fn record_peer_rekey(&mut self) {
        self.last_peer_rekey = Some(Instant::now());
    }
939
    /// Get the pending new session's our_index.
    ///
    /// Set by `set_pending_session`, consumed at cutover.
    pub fn pending_our_index(&self) -> Option<SessionIndex> {
        self.pending_our_index
    }
944
    /// Get the pending new session's their_index.
    ///
    /// Set by `set_pending_session`, consumed at cutover.
    pub fn pending_their_index(&self) -> Option<SessionIndex> {
        self.pending_their_index
    }
949
    /// Get the previous session's our_index (during drain).
    ///
    /// Present only between a cutover and `complete_drain`.
    pub fn previous_our_index(&self) -> Option<SessionIndex> {
        self.previous_our_index
    }
954
    /// Get the previous session for decryption fallback.
    ///
    /// Present only during the drain window following a rekey cutover.
    pub fn previous_session(&self) -> Option<&NoiseSession> {
        self.previous_session.as_ref()
    }
959
    /// Get mutable access to the previous session for decryption.
    ///
    /// Mutable because decrypting advances the session's internal state.
    pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
        self.previous_session.as_mut()
    }
964
    /// Get the pending new session (completed rekey, not yet cut over).
    ///
    /// Populated by `set_pending_session`; taken by the cutover paths.
    pub fn pending_new_session(&self) -> Option<&NoiseSession> {
        self.pending_new_session.as_ref()
    }
969
    /// Store a completed rekey session and its indices.
    ///
    /// Called when the rekey handshake completes. The session is held
    /// as pending until the initiator flips the K-bit on the next outbound packet.
    ///
    /// Note: `rekey_in_progress` is cleared here because the *handshake*
    /// phase is over, even though the cutover has not yet happened.
    pub fn set_pending_session(
        &mut self,
        session: NoiseSession,
        our_index: SessionIndex,
        their_index: SessionIndex,
    ) {
        self.pending_new_session = Some(session);
        self.pending_our_index = Some(our_index);
        self.pending_their_index = Some(their_index);
        self.rekey_in_progress = false;
        // Clear initiator handshake state (index now lives in pending_our_index)
        self.rekey_our_index = None;
        self.rekey_handshake = None;
        self.rekey_msg1 = None;
        self.rekey_msg1_next_resend = 0;
    }
990
991    /// Cut over to the pending new session (initiator side).
992    ///
993    /// Moves current session to previous (for drain), promotes pending to current,
994    /// flips the K-bit. Returns the old our_index that should remain in peers_by_index
995    /// during the drain window.
996    pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
997        let new_session = self.pending_new_session.take()?;
998        let new_our_index = self.pending_our_index.take();
999        let new_their_index = self.pending_their_index.take();
1000
1001        // Demote current to previous
1002        self.previous_session = self.noise_session.take();
1003        self.previous_our_index = self.our_index;
1004        self.drain_started = Some(Instant::now());
1005
1006        // Promote pending to current
1007        self.noise_session = Some(new_session);
1008        self.our_index = new_our_index;
1009        self.their_index = new_their_index;
1010
1011        // Flip K-bit and reset timing
1012        self.current_k_bit = !self.current_k_bit;
1013        self.session_established_at = Instant::now();
1014        self.session_start = Instant::now();
1015        self.rekey_in_progress = false;
1016        self.rekey_jitter_secs = draw_rekey_jitter();
1017        self.reset_replay_suppressed();
1018
1019        // Reset MMP counters to avoid metric discontinuity
1020        let now = Instant::now();
1021        if let Some(mmp) = &mut self.mmp {
1022            mmp.reset_for_rekey(now);
1023        }
1024
1025        self.previous_our_index
1026    }
1027
1028    /// Handle receiving a K-bit flip from the peer (responder side).
1029    ///
1030    /// Promotes pending_new_session to current, demotes current to previous.
1031    /// Returns the old our_index for drain tracking.
1032    pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1033        let new_session = self.pending_new_session.take()?;
1034        let new_our_index = self.pending_our_index.take();
1035        let new_their_index = self.pending_their_index.take();
1036
1037        // Demote current to previous
1038        self.previous_session = self.noise_session.take();
1039        self.previous_our_index = self.our_index;
1040        self.drain_started = Some(Instant::now());
1041
1042        // Promote pending to current
1043        self.noise_session = Some(new_session);
1044        self.our_index = new_our_index;
1045        self.their_index = new_their_index;
1046
1047        // Match peer's K-bit
1048        self.current_k_bit = !self.current_k_bit;
1049        self.session_established_at = Instant::now();
1050        self.session_start = Instant::now();
1051        self.rekey_in_progress = false;
1052        self.rekey_jitter_secs = draw_rekey_jitter();
1053        self.reset_replay_suppressed();
1054
1055        // Reset MMP counters to avoid metric discontinuity
1056        let now = Instant::now();
1057        if let Some(mmp) = &mut self.mmp {
1058            mmp.reset_for_rekey(now);
1059        }
1060
1061        self.previous_our_index
1062    }
1063
1064    /// Check if the drain window has expired.
1065    pub fn drain_expired(&self, drain_secs: u64) -> bool {
1066        match self.drain_started {
1067            Some(t) => t.elapsed().as_secs() >= drain_secs,
1068            None => false,
1069        }
1070    }
1071
    /// Whether a drain is in progress.
    ///
    /// True between a session cutover and `complete_drain`.
    pub fn is_draining(&self) -> bool {
        self.drain_started.is_some()
    }
1076
1077    /// Complete the drain: drop previous session and free its index.
1078    ///
1079    /// Returns the previous our_index so the caller can remove it from
1080    /// peers_by_index and free it from the IndexAllocator.
1081    pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1082        self.previous_session = None;
1083        self.drain_started = None;
1084        self.previous_our_index.take()
1085    }
1086
1087    /// Abandon an in-progress rekey.
1088    ///
1089    /// Returns the rekey our_index so the caller can free it.
1090    /// Also clears any pending session state if the handshake was completed
1091    /// but not yet cut over.
1092    pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1093        self.rekey_handshake = None;
1094        self.rekey_msg1 = None;
1095        self.rekey_msg1_next_resend = 0;
1096        self.rekey_in_progress = false;
1097        // Return whichever index needs freeing
1098        self.rekey_our_index.take().or_else(|| {
1099            self.pending_new_session = None;
1100            self.pending_their_index = None;
1101            self.pending_our_index.take()
1102        })
1103    }
1104
1105    // === Rekey Handshake State (Initiator) ===
1106
    /// Store rekey handshake state after sending msg1.
    ///
    /// Keeps the wire-encoded msg1 bytes and a resend deadline so the
    /// initiator can retransmit until msg2 arrives, and marks the rekey
    /// as in progress.
    pub fn set_rekey_state(
        &mut self,
        handshake: NoiseHandshakeState,
        our_index: SessionIndex,
        wire_msg1: Vec<u8>,
        next_resend_ms: u64,
    ) {
        self.rekey_handshake = Some(handshake);
        self.rekey_our_index = Some(our_index);
        self.rekey_msg1 = Some(wire_msg1);
        self.rekey_msg1_next_resend = next_resend_ms;
        self.rekey_in_progress = true;
    }
1121
    /// Get the rekey our_index (for msg2 dispatch lookup).
    ///
    /// Set by `set_rekey_state`; cleared by `set_pending_session` or
    /// `abandon_rekey`.
    pub fn rekey_our_index(&self) -> Option<SessionIndex> {
        self.rekey_our_index
    }
1126
1127    /// Complete the rekey by processing msg2 (initiator side).
1128    ///
1129    /// Takes the stored handshake state, reads msg2, and returns the
1130    /// completed NoiseSession. Clears the handshake-related fields but
1131    /// leaves rekey_our_index for set_pending_session to use.
1132    pub fn complete_rekey_msg2(&mut self, msg2_bytes: &[u8]) -> Result<NoiseSession, NoiseError> {
1133        let mut hs = self
1134            .rekey_handshake
1135            .take()
1136            .ok_or_else(|| NoiseError::WrongState {
1137                expected: "rekey handshake in progress".to_string(),
1138                got: "no handshake state".to_string(),
1139            })?;
1140
1141        hs.read_message_2(msg2_bytes)?;
1142        let session = hs.into_session()?;
1143
1144        // Clear msg1 resend state
1145        self.rekey_msg1 = None;
1146        self.rekey_msg1_next_resend = 0;
1147
1148        Ok(session)
1149    }
1150
1151    /// Check if msg1 needs resending.
1152    pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1153        self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1154    }
1155
    /// Get msg1 bytes for resend (without consuming).
    ///
    /// Returns the wire-encoded message stored by `set_rekey_state`.
    pub fn rekey_msg1(&self) -> Option<&[u8]> {
        self.rekey_msg1.as_deref()
    }
1160
    /// Update next resend timestamp.
    ///
    /// Called after each retransmission to schedule the following one
    /// (consulted by `needs_msg1_resend`).
    pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
        self.rekey_msg1_next_resend = next_ms;
    }
1165}
1166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Identity;

    // --- Test fixtures ---

    /// Build a fresh random peer identity.
    fn make_peer_identity() -> PeerIdentity {
        let identity = Identity::generate();
        PeerIdentity::from_pubkey(identity.pubkey())
    }

    /// Build a deterministic NodeAddr whose first byte is `val`.
    fn make_node_addr(val: u8) -> NodeAddr {
        let mut bytes = [0u8; 16];
        bytes[0] = val;
        NodeAddr::from_bytes(bytes)
    }

    /// Build a TreeCoordinate from single-byte node address values.
    fn make_coords(ids: &[u8]) -> TreeCoordinate {
        TreeCoordinate::from_addrs(ids.iter().map(|&v| make_node_addr(v)).collect()).unwrap()
    }

    #[test]
    fn test_connectivity_state_properties() {
        assert!(ConnectivityState::Connected.can_send());
        assert!(ConnectivityState::Stale.can_send());
        assert!(!ConnectivityState::Reconnecting.can_send());
        assert!(!ConnectivityState::Disconnected.can_send());

        assert!(ConnectivityState::Connected.is_healthy());
        assert!(!ConnectivityState::Stale.is_healthy());

        assert!(ConnectivityState::Disconnected.is_terminal());
        assert!(!ConnectivityState::Connected.is_terminal());
    }

    #[test]
    fn test_active_peer_creation() {
        let identity = make_peer_identity();
        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert_eq!(peer.identity().node_addr(), identity.node_addr());
        assert_eq!(peer.link_id(), LinkId::new(1));
        assert!(peer.is_healthy());
        assert!(peer.can_send());
        assert_eq!(peer.authenticated_at(), 1000);
        assert!(peer.needs_filter_update()); // New peers need filter
    }

    // Exercises the full Connected -> Stale -> Connected -> Reconnecting
    // -> Connected -> Disconnected lifecycle.
    #[test]
    fn test_connectivity_transitions() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert!(peer.is_healthy());

        peer.mark_stale();
        assert_eq!(peer.connectivity(), ConnectivityState::Stale);
        assert!(peer.can_send()); // Stale can still send

        // Traffic received brings back to connected
        peer.touch(2000);
        assert!(peer.is_healthy());

        peer.mark_reconnecting();
        assert!(!peer.can_send());

        peer.mark_connected(3000);
        assert!(peer.is_healthy());

        peer.mark_disconnected();
        assert!(peer.is_disconnected());
        assert!(!peer.can_send());
    }

    #[test]
    fn test_tree_position() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert!(!peer.has_tree_position());
        assert!(peer.coords().is_none());

        let node = make_node_addr(1);
        let parent = make_node_addr(2);
        let decl = ParentDeclaration::new(node, parent, 1, 1000);
        let coords = make_coords(&[1, 2, 0]);

        peer.update_tree_position(decl, coords, 2000);

        assert!(peer.has_tree_position());
        assert!(peer.coords().is_some());
        assert_eq!(peer.last_seen(), 2000);
    }

    #[test]
    fn test_bloom_filter() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
        let target = make_node_addr(42);

        assert!(!peer.may_reach(&target));
        assert!(peer.filter_is_stale(2000, 500));

        let mut filter = BloomFilter::new();
        filter.insert(&target);
        peer.update_filter(filter, 1, 1500);

        assert!(peer.may_reach(&target));
        assert!(!peer.filter_is_stale(1800, 500));
        assert!(peer.filter_is_stale(2500, 500));
    }

    #[test]
    fn test_timing() {
        let identity = make_peer_identity();
        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert_eq!(peer.connection_duration(2000), 1000);
        assert_eq!(peer.idle_time(2000), 1000);
    }

    #[test]
    fn test_filter_update_flag() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert!(peer.needs_filter_update()); // New peer

        peer.clear_filter_update_needed();
        assert!(!peer.needs_filter_update());

        peer.mark_filter_update_needed();
        assert!(peer.needs_filter_update());
    }

    #[test]
    fn test_with_stats() {
        let identity = make_peer_identity();
        let mut stats = LinkStats::new();
        stats.record_sent(100);
        stats.record_recv(200, 500);

        let peer = ActivePeer::with_stats(identity, LinkId::new(1), 1000, stats);

        assert_eq!(peer.link_stats().packets_sent, 1);
        assert_eq!(peer.link_stats().packets_recv, 1);
    }

    #[test]
    fn test_replay_suppression_counter() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        // Initial count is zero
        assert_eq!(peer.replay_suppressed_count(), 0);

        // Increment returns new count
        assert_eq!(peer.increment_replay_suppressed(), 1);
        assert_eq!(peer.increment_replay_suppressed(), 2);
        assert_eq!(peer.increment_replay_suppressed(), 3);
        assert_eq!(peer.replay_suppressed_count(), 3);

        // Reset returns previous count and zeroes it
        assert_eq!(peer.reset_replay_suppressed(), 3);
        assert_eq!(peer.replay_suppressed_count(), 0);

        // Can increment again after reset
        assert_eq!(peer.increment_replay_suppressed(), 1);
        assert_eq!(peer.replay_suppressed_count(), 1);

        // Reset when zero returns zero
        peer.reset_replay_suppressed();
        assert_eq!(peer.reset_replay_suppressed(), 0);
    }

    #[test]
    fn test_increment_decrypt_failures_monotonic() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        // Initial count is zero
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // Each call returns a strictly increasing count
        let mut prev = 0u32;
        for expected in 1..=25u32 {
            let count = peer.increment_decrypt_failures();
            assert_eq!(count, expected, "increment must return monotonic count");
            assert!(count > prev, "count must strictly increase");
            assert_eq!(peer.consecutive_decrypt_failures(), count);
            prev = count;
        }
    }

    #[test]
    fn test_reset_decrypt_failures_zeroes_counter() {
        let identity = make_peer_identity();
        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        // Drive counter up
        for _ in 0..7 {
            peer.increment_decrypt_failures();
        }
        assert_eq!(peer.consecutive_decrypt_failures(), 7);

        // Reset zeroes it
        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // Reset on zero is a no-op (still zero, no panic)
        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // Counter resumes at 1 after reset
        assert_eq!(peer.increment_decrypt_failures(), 1);
        assert_eq!(peer.consecutive_decrypt_failures(), 1);
    }

    // Statistical check: every drawn jitter stays inside the configured bound.
    #[test]
    fn test_rekey_jitter_in_range() {
        for _ in 0..100 {
            let identity = make_peer_identity();
            let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
            let jitter = peer.rekey_jitter_secs();
            assert!(
                (-REKEY_JITTER_SECS..=REKEY_JITTER_SECS).contains(&jitter),
                "jitter {} outside [-{}, +{}]",
                jitter,
                REKEY_JITTER_SECS,
                REKEY_JITTER_SECS
            );
        }
    }

    // Statistical check: the jitter distribution is roughly centered on zero.
    #[test]
    fn test_rekey_jitter_mean_near_zero() {
        let mut sum = 0i64;
        let n = 200i64;

        for _ in 0..n {
            let identity = make_peer_identity();
            let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
            sum += peer.rekey_jitter_secs();
        }

        let mean = sum / n;
        assert!(
            mean.abs() < 5,
            "empirical mean {} not within 5 of 0 over {} samples",
            mean,
            n
        );
    }
}