Skip to main content

fips_core/peer/
active.rs

1//! Active Peer (Authenticated Phase)
2//!
3//! Represents a fully authenticated peer after successful Noise handshake.
4//! ActivePeer holds tree state, Bloom filter, and routing information.
5
6use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::node::REKEY_JITTER_SECS;
9use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
10use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
11use crate::tree::{ParentDeclaration, TreeCoordinate};
12use crate::utils::index::SessionIndex;
13use crate::{FipsAddress, NodeAddr, PeerIdentity};
14use rand::RngExt;
15use secp256k1::XOnlyPublicKey;
16use std::fmt;
17use std::time::{Duration, Instant};
18
19fn draw_rekey_jitter() -> i64 {
20    rand::rng().random_range(-REKEY_JITTER_SECS..=REKEY_JITTER_SECS)
21}
22
/// Connectivity state for an active peer.
///
/// This is simpler than the full PeerState since authentication is complete.
///
/// `Connected` and `Stale` both still permit sending (see `can_send`);
/// `Disconnected` is the only terminal state (see `is_terminal`); and only
/// `Connected` counts as healthy (see `is_healthy`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectivityState {
    /// Peer is fully connected and responsive.
    Connected,
    /// Peer hasn't been heard from recently (potential timeout).
    Stale,
    /// Connection lost, attempting to reconnect.
    Reconnecting,
    /// Peer has been explicitly disconnected.
    Disconnected,
}
37
38impl ConnectivityState {
39    /// Check if the peer is usable for sending traffic.
40    pub fn can_send(&self) -> bool {
41        matches!(
42            self,
43            ConnectivityState::Connected | ConnectivityState::Stale
44        )
45    }
46
47    /// Check if this is a terminal state requiring cleanup.
48    pub fn is_terminal(&self) -> bool {
49        matches!(self, ConnectivityState::Disconnected)
50    }
51
52    /// Check if peer is fully healthy.
53    pub fn is_healthy(&self) -> bool {
54        matches!(self, ConnectivityState::Connected)
55    }
56}
57
58impl fmt::Display for ConnectivityState {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        let s = match self {
61            ConnectivityState::Connected => "connected",
62            ConnectivityState::Stale => "stale",
63            ConnectivityState::Reconnecting => "reconnecting",
64            ConnectivityState::Disconnected => "disconnected",
65        };
66        write!(f, "{}", s)
67    }
68}
69
/// A fully authenticated remote FIPS node.
///
/// Created only after successful Noise KK handshake. The identity is
/// cryptographically verified at this point.
///
/// Note: ActivePeer intentionally does not implement Clone because it
/// contains NoiseSession, which cannot be safely cloned (cloning would
/// risk nonce reuse, a catastrophic security failure).
///
/// Timestamps in this struct come in two flavours: `u64` fields are Unix
/// milliseconds supplied by the caller, while `Instant` fields are
/// monotonic readings taken locally.
#[derive(Debug)]
pub struct ActivePeer {
    // === Identity (Verified) ===
    /// Cryptographic identity (verified via handshake).
    identity: PeerIdentity,

    // === Connection ===
    /// Link used to reach this peer.
    link_id: LinkId,
    /// Current connectivity state. Both constructors start at `Connected`.
    connectivity: ConnectivityState,

    // === Session (Wire Protocol) ===
    /// Noise session for encryption/decryption (None if legacy peer).
    noise_session: Option<NoiseSession>,
    /// Our session index (they include this when sending TO us).
    our_index: Option<SessionIndex>,
    /// Their session index (we include this when sending TO them).
    their_index: Option<SessionIndex>,
    /// Transport ID for this peer's link.
    transport_id: Option<TransportId>,
    /// Current transport address (for roaming support).
    current_addr: Option<TransportAddr>,

    // === Spanning Tree ===
    /// Their latest parent declaration.
    declaration: Option<ParentDeclaration>,
    /// Their path to root.
    ancestry: Option<TreeCoordinate>,

    // === Tree Announce Rate Limiting ===
    /// Minimum interval between TreeAnnounce messages (milliseconds).
    /// Both constructors initialize this to 500.
    tree_announce_min_interval_ms: u64,
    /// Last time we sent a TreeAnnounce to this peer (Unix milliseconds).
    /// Initialized to 0.
    last_tree_announce_sent_ms: u64,
    /// Whether a tree announce is pending (deferred due to rate limit).
    pending_tree_announce: bool,

    // === Bloom Filter ===
    /// What's reachable through them (inbound filter).
    inbound_filter: Option<BloomFilter>,
    /// Their filter's sequence number.
    filter_sequence: u64,
    /// When we received their last filter (Unix milliseconds).
    /// 0 is treated as "never received" by `filter_is_stale`.
    filter_received_at: u64,
    /// Whether we owe them a filter update.
    /// Starts `true` so a filter is sent on a new connection.
    pending_filter_update: bool,

    // === Timing ===
    /// Session start time for computing session-relative timestamps.
    /// Used as the epoch for the 4-byte inner header timestamp field.
    session_start: Instant,

    // === Statistics ===
    /// Link statistics.
    link_stats: LinkStats,
    /// When this peer was authenticated (Unix milliseconds).
    authenticated_at: u64,
    /// When this peer was last seen (any activity, Unix milliseconds).
    /// Initialized to `authenticated_at`.
    last_seen: u64,

    // === Epoch (Restart Detection) ===
    /// Remote peer's startup epoch (from handshake). Used to detect restarts.
    remote_epoch: Option<[u8; 8]>,

    // === MMP ===
    /// Per-peer MMP state (None for legacy peers without Noise sessions).
    mmp: Option<MmpPeerState>,

    // === Heartbeat ===
    /// When we last sent a heartbeat to this peer.
    last_heartbeat_sent: Option<Instant>,

    // === Handshake Resend ===
    /// Wire-format msg2 for resend on duplicate msg1 (responder only).
    /// Cleared after the handshake timeout window.
    handshake_msg2: Option<Vec<u8>>,

    // === Replay Detection Suppression ===
    /// Number of replay detections suppressed since last session reset.
    replay_suppressed_count: u32,
    /// Consecutive decryption failures (reset on any successful decrypt).
    consecutive_decrypt_failures: u32,

    // === Rekey (Key Rotation) ===
    /// When the current Noise session was established (for rekey timer).
    session_established_at: Instant,
    /// Per-session symmetric jitter applied to the rekey timer trigger.
    /// Drawn via `draw_rekey_jitter` when the peer is constructed.
    rekey_jitter_secs: i64,
    /// Current K-bit epoch value (alternates each rekey). Starts `false`.
    current_k_bit: bool,
    /// Previous session kept alive during drain window after cutover.
    previous_session: Option<NoiseSession>,
    /// Previous session's our_index (for peers_by_index cleanup on drain expiry).
    previous_our_index: Option<SessionIndex>,
    /// When the drain window started (None = no drain in progress).
    drain_started: Option<Instant>,
    /// Pending new session from completed rekey (before K-bit cutover).
    pending_new_session: Option<NoiseSession>,
    /// Pending new session's our_index.
    pending_our_index: Option<SessionIndex>,
    /// Pending new session's their_index.
    pending_their_index: Option<SessionIndex>,
    /// True when this node initiated the pending FMP rekey.
    pending_rekey_initiator: bool,
    /// When the pending FMP rekey completed locally.
    pending_rekey_completed_at: Option<Instant>,
    /// Whether a rekey is currently in progress (handshake sent, not yet complete).
    rekey_in_progress: bool,
    /// When we last received a rekey msg1 from this peer (dampening).
    last_peer_rekey: Option<Instant>,
    /// In-progress rekey: Noise handshake state (initiator only).
    rekey_handshake: Option<NoiseHandshakeState>,
    /// In-progress rekey: our new session index.
    rekey_our_index: Option<SessionIndex>,
    /// In-progress rekey: wire-format msg1 for resend.
    rekey_msg1: Option<Vec<u8>>,
    /// In-progress rekey: next resend timestamp (Unix ms).
    rekey_msg1_next_resend: u64,

    // === Connected Peer UDP Socket (Unix fast path) ===
    /// Per-peer `connect()`-ed UDP socket, opened once we have a
    /// stable kernel `SocketAddr` for the peer (i.e. session
    /// established + transport address known). When `Some`, the
    /// encrypt-worker send path can `sendmsg(2)` on this fd without
    /// per-packet `msg_name` — the kernel-side route + neighbor cache
    /// is pinned by the `connect()` call. On the receive side, Linux
    /// and Darwin UDP demux preferentially route inbound packets from
    /// this peer to this socket (most-specific 5-tuple match via
    /// `SO_REUSEPORT`), so the paired drain thread must keep it empty.
    ///
    /// Closed automatically on Drop. Behind an `Arc` so the
    /// encrypt-worker's send path can hold a refcount without owning
    /// the only handle (rekey / address-change may rotate the socket
    /// while older jobs are still in-flight on the worker channel).
    ///
    /// Only compiled on Linux and macOS targets.
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    connected_udp:
        Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,

    /// Per-peer receive-drain thread. Always paired with
    /// `connected_udp`: while the connected socket is installed, the
    /// kernel UDP demux preferentially routes inbound packets from
    /// this peer to it (via SO_REUSEPORT + 5-tuple match), so the
    /// socket *must* be drained or packets pile up in its kernel
    /// recv buffer. Drop signals the thread to exit via self-pipe.
    ///
    /// Only compiled on Linux and macOS targets.
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
}
226
227impl ActivePeer {
228    /// Create a new active peer from verified identity.
229    ///
230    /// Called after successful authentication handshake.
231    /// For peers with Noise sessions, use `with_session` instead.
232    pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
233        let now = Instant::now();
234        Self {
235            identity,
236            link_id,
237            connectivity: ConnectivityState::Connected,
238            noise_session: None,
239            our_index: None,
240            their_index: None,
241            transport_id: None,
242            current_addr: None,
243            declaration: None,
244            ancestry: None,
245            tree_announce_min_interval_ms: 500,
246            last_tree_announce_sent_ms: 0,
247            pending_tree_announce: false,
248            inbound_filter: None,
249            filter_sequence: 0,
250            filter_received_at: 0,
251            pending_filter_update: true, // Send filter on new connection
252            session_start: now,
253            link_stats: LinkStats::new(),
254            authenticated_at,
255            last_seen: authenticated_at,
256            remote_epoch: None,
257            mmp: None,
258            last_heartbeat_sent: None,
259            handshake_msg2: None,
260            replay_suppressed_count: 0,
261            consecutive_decrypt_failures: 0,
262            session_established_at: now,
263            rekey_jitter_secs: draw_rekey_jitter(),
264            current_k_bit: false,
265            previous_session: None,
266            previous_our_index: None,
267            drain_started: None,
268            pending_new_session: None,
269            pending_our_index: None,
270            pending_their_index: None,
271            pending_rekey_initiator: false,
272            pending_rekey_completed_at: None,
273            rekey_in_progress: false,
274            last_peer_rekey: None,
275            rekey_handshake: None,
276            rekey_our_index: None,
277            rekey_msg1: None,
278            rekey_msg1_next_resend: 0,
279            #[cfg(any(target_os = "linux", target_os = "macos"))]
280            connected_udp: None,
281            #[cfg(any(target_os = "linux", target_os = "macos"))]
282            peer_recv_drain: None,
283        }
284    }
285
286    /// Create from verified identity with existing link stats.
287    ///
288    /// Used when promoting from PeerConnection, preserving handshake stats.
289    /// For peers with Noise sessions, use `with_session` instead.
290    pub fn with_stats(
291        identity: PeerIdentity,
292        link_id: LinkId,
293        authenticated_at: u64,
294        link_stats: LinkStats,
295    ) -> Self {
296        let mut peer = Self::new(identity, link_id, authenticated_at);
297        peer.link_stats = link_stats;
298        peer
299    }
300
301    /// Create from verified identity with Noise session and index tracking.
302    ///
303    /// This is the primary constructor for the wire protocol path.
304    /// The NoiseSession provides encryption/decryption and replay protection.
305    #[allow(clippy::too_many_arguments)]
306    pub fn with_session(
307        identity: PeerIdentity,
308        link_id: LinkId,
309        authenticated_at: u64,
310        noise_session: NoiseSession,
311        our_index: SessionIndex,
312        their_index: SessionIndex,
313        transport_id: TransportId,
314        current_addr: TransportAddr,
315        link_stats: LinkStats,
316        is_initiator: bool,
317        mmp_config: &MmpConfig,
318        remote_epoch: Option<[u8; 8]>,
319    ) -> Self {
320        let now = Instant::now();
321        Self {
322            identity,
323            link_id,
324            connectivity: ConnectivityState::Connected,
325            noise_session: Some(noise_session),
326            our_index: Some(our_index),
327            their_index: Some(their_index),
328            transport_id: Some(transport_id),
329            current_addr: Some(current_addr),
330            declaration: None,
331            ancestry: None,
332            tree_announce_min_interval_ms: 500,
333            last_tree_announce_sent_ms: 0,
334            pending_tree_announce: false,
335            inbound_filter: None,
336            filter_sequence: 0,
337            filter_received_at: 0,
338            pending_filter_update: true,
339            session_start: now,
340            link_stats,
341            authenticated_at,
342            last_seen: authenticated_at,
343            remote_epoch,
344            mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
345            last_heartbeat_sent: None,
346            handshake_msg2: None,
347            replay_suppressed_count: 0,
348            consecutive_decrypt_failures: 0,
349            session_established_at: now,
350            rekey_jitter_secs: draw_rekey_jitter(),
351            current_k_bit: false,
352            previous_session: None,
353            previous_our_index: None,
354            drain_started: None,
355            pending_new_session: None,
356            pending_our_index: None,
357            pending_their_index: None,
358            pending_rekey_initiator: false,
359            pending_rekey_completed_at: None,
360            rekey_in_progress: false,
361            last_peer_rekey: None,
362            rekey_handshake: None,
363            rekey_our_index: None,
364            rekey_msg1: None,
365            rekey_msg1_next_resend: 0,
366            #[cfg(any(target_os = "linux", target_os = "macos"))]
367            connected_udp: None,
368            #[cfg(any(target_os = "linux", target_os = "macos"))]
369            peer_recv_drain: None,
370        }
371    }
372
373    /// Unix UDP fast path: clone the refcount on the per-peer
374    /// `connect()`-ed UDP socket if one has been installed. Encrypt-
375    /// worker send path uses this to bypass the wildcard listen
376    /// socket's per-packet sockaddr handling.
377    #[cfg(any(target_os = "linux", target_os = "macos"))]
378    pub(crate) fn connected_udp(
379        &self,
380    ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
381        self.connected_udp.clone()
382    }
383
384    /// Install a per-peer `connect()`-ed UDP socket **with** its
385    /// paired recv drain thread. The two are owned together: the
386    /// drain thread is the only consumer of packets arriving on this
387    /// socket (Linux UDP demux preferentially routes them away from
388    /// the wildcard listen socket via SO_REUSEPORT 5-tuple match),
389    /// so installing one without the other would silently drop
390    /// inbound packets from this peer.
391    ///
392    /// Replacing an existing pair drops the old drain (its self-pipe
393    /// shutdown signal fires; thread exits within one poll
394    /// iteration) and drops the old socket Arc. Any encrypt-worker
395    /// jobs already in-flight holding the old socket Arc stay valid
396    /// until they complete, at which point the kernel fd closes.
397    #[cfg(any(target_os = "linux", target_os = "macos"))]
398    pub(crate) fn set_connected_udp(
399        &mut self,
400        socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
401        drain: crate::transport::udp::peer_drain::PeerRecvDrain,
402    ) {
403        // Drop order matters: drop the old drain BEFORE the old
404        // socket so the drain thread's last reference to the kernel
405        // fd is released cleanly.
406        self.peer_recv_drain = None;
407        self.connected_udp = None;
408        self.connected_udp = Some(socket);
409        self.peer_recv_drain = Some(drain);
410    }
411
412    /// Clear the per-peer connected UDP socket + drain thread (e.g.
413    /// on rekey or disconnect). The drain thread exits via self-pipe
414    /// signal; the kernel fd closes when the last `Arc` to the
415    /// socket drops.
416    #[cfg(any(target_os = "linux", target_os = "macos"))]
417    pub(crate) fn clear_connected_udp(&mut self) {
418        self.peer_recv_drain = None;
419        self.connected_udp = None;
420    }
421
422    // === Identity Accessors ===
423
    /// Get the peer's verified identity.
    ///
    /// Returns a borrow of the stored `PeerIdentity`.
    pub fn identity(&self) -> &PeerIdentity {
        &self.identity
    }
428
    /// Get the peer's NodeAddr.
    ///
    /// Delegates to `PeerIdentity::node_addr` on the stored identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
433
    /// Get the peer's FIPS address.
    ///
    /// Delegates to `PeerIdentity::address` on the stored identity.
    pub fn address(&self) -> &FipsAddress {
        self.identity.address()
    }
438
    /// Get the peer's public key.
    ///
    /// Returns the x-only key by value, as produced by `PeerIdentity::pubkey`.
    pub fn pubkey(&self) -> XOnlyPublicKey {
        self.identity.pubkey()
    }
443
    /// Get the peer's npub string.
    ///
    /// Delegates to `PeerIdentity::npub`, returning an owned `String`.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
448
449    // === Connection Accessors ===
450
    /// Get the ID of the link used to reach this peer.
    ///
    /// Returned by value; it can be changed with `set_link_id`.
    pub fn link_id(&self) -> LinkId {
        self.link_id
    }
455
    /// Get the current connectivity state (returned by value).
    pub fn connectivity(&self) -> ConnectivityState {
        self.connectivity
    }
460
    /// Check if traffic can be sent to this peer.
    ///
    /// Delegates to `ConnectivityState::can_send`, which is true while the
    /// peer is `Connected` or `Stale`.
    pub fn can_send(&self) -> bool {
        self.connectivity.can_send()
    }
465
    /// Check if peer is fully healthy.
    ///
    /// Delegates to `ConnectivityState::is_healthy`, which is true only for
    /// `Connected`.
    pub fn is_healthy(&self) -> bool {
        self.connectivity.is_healthy()
    }
470
    /// Check if peer is disconnected.
    ///
    /// Delegates to `ConnectivityState::is_terminal`, which is true only for
    /// `Disconnected`.
    pub fn is_disconnected(&self) -> bool {
        self.connectivity.is_terminal()
    }
475
476    // === Session Accessors ===
477
    /// Check if this peer has a Noise session.
    ///
    /// False for peers built with `new` or `with_stats` until a session is
    /// installed (e.g. via `replace_session`).
    pub fn has_session(&self) -> bool {
        self.noise_session.is_some()
    }
482
    /// Get the Noise session, if present.
    ///
    /// Returns `None` when no session is installed (legacy peer).
    pub fn noise_session(&self) -> Option<&NoiseSession> {
        self.noise_session.as_ref()
    }
487
    /// Get mutable access to the Noise session.
    ///
    /// Returns `None` when no session is installed (legacy peer).
    pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
        self.noise_session.as_mut()
    }
492
    /// Get our session index (they use this to send TO us).
    ///
    /// `None` for peers constructed without a session.
    pub fn our_index(&self) -> Option<SessionIndex> {
        self.our_index
    }
497
    /// Get their session index (we use this to send TO them).
    ///
    /// `None` for peers constructed without a session, until
    /// `set_their_index` or `replace_session` stores one.
    pub fn their_index(&self) -> Option<SessionIndex> {
        self.their_index
    }
502
    /// Update their session index (used during cross-connection resolution
    /// when the losing node keeps its inbound session but needs the peer's
    /// outbound index).
    ///
    /// Only `their_index` changes; the Noise session and `our_index` are
    /// left untouched.
    pub fn set_their_index(&mut self, index: SessionIndex) {
        self.their_index = Some(index);
    }
509
510    /// Replace the Noise session and indices during cross-connection resolution.
511    ///
512    /// When both nodes simultaneously initiate, each promotes its inbound
513    /// handshake first. When the peer's msg2 arrives, we learn the correct
514    /// session — the outbound handshake that pairs with the peer's inbound.
515    /// This replaces the entire session so both nodes use matching keys.
516    ///
517    /// Returns the old our_index so the caller can update peers_by_index.
518    /// Also resets the replay suppression counter since the session changed.
519    pub fn replace_session(
520        &mut self,
521        new_session: NoiseSession,
522        new_our_index: SessionIndex,
523        new_their_index: SessionIndex,
524    ) -> Option<SessionIndex> {
525        self.reset_replay_suppressed();
526        let old_our_index = self.our_index;
527        self.noise_session = Some(new_session);
528        self.our_index = Some(new_our_index);
529        self.their_index = Some(new_their_index);
530        old_our_index
531    }
532
    /// Get the transport ID for this peer.
    ///
    /// `None` for peers built without a session until `set_current_addr`
    /// stores one.
    pub fn transport_id(&self) -> Option<TransportId> {
        self.transport_id
    }
537
    /// Get the current transport address.
    ///
    /// `None` for peers built without a session until `set_current_addr`
    /// stores one.
    pub fn current_addr(&self) -> Option<&TransportAddr> {
        self.current_addr.as_ref()
    }
542
543    /// Update the current address (for roaming support).
544    ///
545    /// Called when we receive a valid authenticated packet from a new address.
546    /// Short-circuits when neither the transport_id nor the TransportAddr
547    /// bytes changed — at multi-Gbps the same peer's source 4-tuple is
548    /// stable per session and the overwhelming majority of inbound
549    /// packets hit this fast path. Saves both the redundant
550    /// `Option::take` + Vec drop on the cached side and the caller's
551    /// `.clone()` allocation on the input side: the caller can pass
552    /// `&TransportAddr` and we only `.to_owned()` when storing.
553    ///
554    /// Returns `true` iff the stored `(transport_id, current_addr)` pair
555    /// actually changed. The caller uses this signal to invalidate
556    /// derived caches whose validity is bound to the peer's 5-tuple —
557    /// in particular the Linux per-peer `connect()`-ed UDP socket,
558    /// which is pinned to one kernel route + neighbour entry and goes
559    /// stale the moment the peer roams. (Clearing it here would force
560    /// `&mut self` users into the wrong shape: the policy of when to
561    /// rebuild the connected socket lives on `Node`, not on the peer
562    /// state. Returning a bool keeps that policy where it belongs.)
563    pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
564        if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
565            return false;
566        }
567        self.transport_id = Some(transport_id);
568        self.current_addr = Some(addr.clone());
569        true
570    }
571
572    // === Handshake Resend ===
573
    /// Store wire-format msg2 for resend on duplicate msg1.
    ///
    /// Takes ownership of `msg2` and overwrites any previously stored copy.
    pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
        self.handshake_msg2 = Some(msg2);
    }
578
    /// Get stored msg2 bytes for resend.
    ///
    /// Returns `None` if nothing is stored (never set, or already cleared).
    pub fn handshake_msg2(&self) -> Option<&[u8]> {
        self.handshake_msg2.as_deref()
    }
583
    /// Clear stored msg2 (no longer needed after handshake window).
    ///
    /// Drops the stored buffer; does nothing if none is stored.
    pub fn clear_handshake_msg2(&mut self) {
        self.handshake_msg2 = None;
    }
588
589    // === Replay Detection Suppression ===
590
591    /// Increment replay suppression counter. Returns the new count.
592    pub fn increment_replay_suppressed(&mut self) -> u32 {
593        self.replay_suppressed_count += 1;
594        self.replay_suppressed_count
595    }
596
597    /// Reset replay suppression counter, returning previous count.
598    pub fn reset_replay_suppressed(&mut self) -> u32 {
599        let count = self.replay_suppressed_count;
600        self.replay_suppressed_count = 0;
601        count
602    }
603
    /// Current replay suppression count.
    ///
    /// Read-only; use `reset_replay_suppressed` to read and clear in one step.
    pub fn replay_suppressed_count(&self) -> u32 {
        self.replay_suppressed_count
    }
608
609    // === Decryption Failure Tracking ===
610
611    /// Increment consecutive decryption failure counter, returning new count.
612    pub fn increment_decrypt_failures(&mut self) -> u32 {
613        self.consecutive_decrypt_failures += 1;
614        self.consecutive_decrypt_failures
615    }
616
    /// Reset consecutive decryption failure counter.
    ///
    /// Sets the counter back to 0; the previous value is not returned.
    pub fn reset_decrypt_failures(&mut self) {
        self.consecutive_decrypt_failures = 0;
    }
621
    /// Current consecutive decryption failure count.
    ///
    /// Reflects increments since the last `reset_decrypt_failures` call.
    pub fn consecutive_decrypt_failures(&self) -> u32 {
        self.consecutive_decrypt_failures
    }
626
627    // === Epoch Accessors ===
628
    /// Get the remote peer's startup epoch (from handshake).
    ///
    /// Returns the 8-byte epoch by value, or `None` if none is stored.
    pub fn remote_epoch(&self) -> Option<[u8; 8]> {
        self.remote_epoch
    }
633
    /// Update the remote peer's startup epoch after a successful in-place
    /// rekey. Initial handshakes set this through `with_session`, but recovery
    /// rekeys also exchange epochs and must keep restart detection current.
    ///
    /// The stored value is overwritten unconditionally, so passing `None`
    /// clears it.
    pub(crate) fn set_remote_epoch(&mut self, remote_epoch: Option<[u8; 8]>) {
        self.remote_epoch = remote_epoch;
    }
640
641    // === Tree Accessors ===
642
    /// Get the peer's tree coordinates, if known.
    ///
    /// This is the stored `ancestry` (their path to root); `None` until
    /// `update_tree_position` has been called or after `clear_tree_position`.
    pub fn coords(&self) -> Option<&TreeCoordinate> {
        self.ancestry.as_ref()
    }
647
    /// Get the peer's parent declaration, if known.
    ///
    /// `None` until `update_tree_position` has been called or after
    /// `clear_tree_position`.
    pub fn declaration(&self) -> Option<&ParentDeclaration> {
        self.declaration.as_ref()
    }
652
653    /// Check if this peer has a known tree position.
654    pub fn has_tree_position(&self) -> bool {
655        self.declaration.is_some() && self.ancestry.is_some()
656    }
657
658    // === Filter Accessors ===
659
    /// Get the peer's inbound filter, if known.
    ///
    /// This is the filter describing what is reachable through the peer.
    pub fn inbound_filter(&self) -> Option<&BloomFilter> {
        self.inbound_filter.as_ref()
    }
664
    /// Get the filter sequence number.
    ///
    /// This is the sequence number of the peer's filter; it starts at 0.
    pub fn filter_sequence(&self) -> u64 {
        self.filter_sequence
    }
669
670    /// Check if this peer's filter is stale.
671    pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
672        if self.filter_received_at == 0 {
673            return true;
674        }
675        current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
676    }
677
678    /// Check if a destination might be reachable through this peer.
679    pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
680        match &self.inbound_filter {
681            Some(filter) => filter.contains(node_addr),
682            None => false,
683        }
684    }
685
    /// Check if we need to send this peer a filter update.
    ///
    /// True right after construction, since the pending flag starts set.
    pub fn needs_filter_update(&self) -> bool {
        self.pending_filter_update
    }
690
691    // === Statistics Accessors ===
692
    /// Get link statistics.
    ///
    /// Read-only borrow; use `link_stats_mut` to update them.
    pub fn link_stats(&self) -> &LinkStats {
        &self.link_stats
    }
697
    /// Get mutable link statistics.
    ///
    /// Lets the caller update the stored `LinkStats` in place.
    pub fn link_stats_mut(&mut self) -> &mut LinkStats {
        &mut self.link_stats
    }
702
703    // === MMP Accessors ===
704
    /// Get MMP state (None for legacy peers without sessions).
    ///
    /// Only `with_session` creates MMP state; `new` and `with_stats` leave it
    /// as `None`.
    pub fn mmp(&self) -> Option<&MmpPeerState> {
        self.mmp.as_ref()
    }
709
    /// Get mutable MMP state.
    ///
    /// Returns `None` for peers without MMP state (see `mmp`).
    pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
        self.mmp.as_mut()
    }
714
715    /// Link cost for routing decisions.
716    ///
717    /// Returns a scalar cost where lower is better (1.0 = ideal).
718    /// Computed as RTT-weighted ETX: `etx * (1.0 + srtt_ms / 100.0)`.
719    ///
720    /// Returns 1.0 (optimistic default) when MMP metrics are not yet
721    /// available, matching depth-only parent selection behavior.
722    pub fn link_cost(&self) -> f64 {
723        match self.mmp() {
724            Some(mmp) => {
725                let etx = mmp.metrics.etx;
726                match mmp.metrics.srtt_ms() {
727                    Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
728                    None => 1.0,
729                }
730            }
731            None => 1.0,
732        }
733    }
734
735    /// Whether this peer has at least one MMP RTT measurement.
736    pub fn has_srtt(&self) -> bool {
737        self.mmp()
738            .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
739    }
740
    /// When this peer was authenticated.
    ///
    /// Unix milliseconds, exactly as passed to the constructor.
    pub fn authenticated_at(&self) -> u64 {
        self.authenticated_at
    }
745
    /// When this peer was last seen.
    ///
    /// Unix milliseconds. Starts equal to `authenticated_at` and is advanced
    /// by `touch`, `mark_connected` and `update_tree_position`.
    pub fn last_seen(&self) -> u64 {
        self.last_seen
    }
750
    /// Time since last activity.
    ///
    /// Returns `current_time_ms - last_seen` in milliseconds, or 0 if
    /// `current_time_ms` is earlier than `last_seen`.
    pub fn idle_time(&self, current_time_ms: u64) -> u64 {
        current_time_ms.saturating_sub(self.last_seen)
    }
755
    /// Connection duration since authentication.
    ///
    /// Returns `current_time_ms - authenticated_at` in milliseconds, or 0 if
    /// `current_time_ms` is earlier than `authenticated_at`.
    pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
        current_time_ms.saturating_sub(self.authenticated_at)
    }
760
761    /// Session-relative elapsed time in milliseconds (for inner header timestamp).
762    ///
763    /// Returns milliseconds since session establishment, truncated to u32.
764    /// Wraps at ~49.7 days which is acceptable for session-relative timing.
765    pub fn session_elapsed_ms(&self) -> u32 {
766        self.session_start.elapsed().as_millis() as u32
767    }
768
    /// When this peer's session started (for link-dead fallback timing).
    ///
    /// Monotonic `Instant` captured when the peer was constructed.
    pub fn session_start(&self) -> Instant {
        self.session_start
    }
773
774    // === Heartbeat ===
775
    /// When we last sent a heartbeat to this peer.
    ///
    /// `None` until `mark_heartbeat_sent` has been called.
    pub fn last_heartbeat_sent(&self) -> Option<Instant> {
        self.last_heartbeat_sent
    }
780
    /// Record that we sent a heartbeat.
    ///
    /// Stores the caller-supplied `now`; no clock is read here.
    pub fn mark_heartbeat_sent(&mut self, now: Instant) {
        self.last_heartbeat_sent = Some(now);
    }
785
786    // === State Updates ===
787
788    /// Update last seen timestamp.
789    pub fn touch(&mut self, current_time_ms: u64) {
790        self.last_seen = current_time_ms;
791        // If we were stale, receiving traffic makes us connected again
792        if self.connectivity == ConnectivityState::Stale {
793            self.connectivity = ConnectivityState::Connected;
794        }
795    }
796
797    /// Mark peer as stale (no recent traffic).
798    pub fn mark_stale(&mut self) {
799        if self.connectivity == ConnectivityState::Connected {
800            self.connectivity = ConnectivityState::Stale;
801        }
802    }
803
    /// Mark peer as reconnecting.
    ///
    /// Unconditional: overrides whatever state the peer was in.
    pub fn mark_reconnecting(&mut self) {
        self.connectivity = ConnectivityState::Reconnecting;
    }
808
    /// Mark peer as disconnected.
    ///
    /// Unconditional: overrides whatever state the peer was in. Afterwards
    /// `is_disconnected` returns true.
    pub fn mark_disconnected(&mut self) {
        self.connectivity = ConnectivityState::Disconnected;
    }
813
814    /// Mark peer as connected (e.g., after successful reconnect).
815    pub fn mark_connected(&mut self, current_time_ms: u64) {
816        self.connectivity = ConnectivityState::Connected;
817        self.last_seen = current_time_ms;
818    }
819
820    /// Update the link ID (e.g., on reconnect).
821    pub fn set_link_id(&mut self, link_id: LinkId) {
822        self.link_id = link_id;
823    }
824
825    // === Tree Updates ===
826
827    /// Update peer's tree position.
828    pub fn update_tree_position(
829        &mut self,
830        declaration: ParentDeclaration,
831        ancestry: TreeCoordinate,
832        current_time_ms: u64,
833    ) {
834        self.declaration = Some(declaration);
835        self.ancestry = Some(ancestry);
836        self.last_seen = current_time_ms;
837    }
838
839    /// Clear peer's tree position.
840    pub fn clear_tree_position(&mut self) {
841        self.declaration = None;
842        self.ancestry = None;
843    }
844
845    // === Tree Announce Rate Limiting ===
846
847    /// Set the minimum interval between TreeAnnounce messages (milliseconds).
848    pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
849        self.tree_announce_min_interval_ms = ms;
850    }
851
852    /// Get the last tree announce send timestamp (for carrying across reconnection).
853    pub fn last_tree_announce_sent_ms(&self) -> u64 {
854        self.last_tree_announce_sent_ms
855    }
856
857    /// Set the last tree announce send timestamp (to preserve rate limit across reconnection).
858    pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
859        self.last_tree_announce_sent_ms = ms;
860    }
861
862    /// Check if we can send a TreeAnnounce now (rate limiting).
863    pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
864        now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
865    }
866
867    /// Record that we sent a TreeAnnounce to this peer.
868    pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
869        self.last_tree_announce_sent_ms = now_ms;
870        self.pending_tree_announce = false;
871    }
872
873    /// Mark that a tree announce is pending (deferred due to rate limit).
874    pub fn mark_tree_announce_pending(&mut self) {
875        self.pending_tree_announce = true;
876    }
877
878    /// Check if a deferred tree announce is waiting to be sent.
879    pub fn has_pending_tree_announce(&self) -> bool {
880        self.pending_tree_announce
881    }
882
883    // === Filter Updates ===
884
885    /// Update peer's inbound filter.
886    pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
887        self.inbound_filter = Some(filter);
888        self.filter_sequence = sequence;
889        self.filter_received_at = current_time_ms;
890        self.last_seen = current_time_ms;
891    }
892
893    /// Clear peer's inbound filter.
894    pub fn clear_filter(&mut self) {
895        self.inbound_filter = None;
896        self.filter_sequence = 0;
897        self.filter_received_at = 0;
898    }
899
900    /// Mark that we need to send this peer a filter update.
901    pub fn mark_filter_update_needed(&mut self) {
902        self.pending_filter_update = true;
903    }
904
905    /// Clear the pending filter update flag.
906    pub fn clear_filter_update_needed(&mut self) {
907        self.pending_filter_update = false;
908    }
909
910    // === Rekey (Key Rotation) ===
911
912    /// When the current Noise session was established.
913    pub fn session_established_at(&self) -> Instant {
914        self.session_established_at
915    }
916
917    #[cfg(test)]
918    pub(crate) fn set_session_established_at_for_test(&mut self, instant: Instant) {
919        self.session_established_at = instant;
920    }
921
922    /// Per-session symmetric rekey-timer jitter offset (seconds).
923    pub fn rekey_jitter_secs(&self) -> i64 {
924        self.rekey_jitter_secs
925    }
926
927    /// Current K-bit epoch value.
928    pub fn current_k_bit(&self) -> bool {
929        self.current_k_bit
930    }
931
932    /// Whether a rekey is currently in progress.
933    pub fn rekey_in_progress(&self) -> bool {
934        self.rekey_in_progress
935    }
936
937    /// Mark that a rekey has been initiated.
938    pub fn set_rekey_in_progress(&mut self) {
939        self.rekey_in_progress = true;
940    }
941
942    /// Check if rekey initiation is dampened (peer recently sent us msg1).
943    pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
944        match self.last_peer_rekey {
945            Some(t) => t.elapsed().as_secs() < dampening_secs,
946            None => false,
947        }
948    }
949
950    /// Record that the peer initiated a rekey (for dampening).
951    pub fn record_peer_rekey(&mut self) {
952        self.last_peer_rekey = Some(Instant::now());
953    }
954
955    /// Get the pending new session's our_index.
956    pub fn pending_our_index(&self) -> Option<SessionIndex> {
957        self.pending_our_index
958    }
959
960    /// Get the pending new session's their_index.
961    pub fn pending_their_index(&self) -> Option<SessionIndex> {
962        self.pending_their_index
963    }
964
965    /// Get the previous session's our_index (during drain).
966    pub fn previous_our_index(&self) -> Option<SessionIndex> {
967        self.previous_our_index
968    }
969
970    /// Get the previous session for decryption fallback.
971    pub fn previous_session(&self) -> Option<&NoiseSession> {
972        self.previous_session.as_ref()
973    }
974
975    /// Get mutable access to the previous session for decryption.
976    pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
977        self.previous_session.as_mut()
978    }
979
980    /// Get the pending new session (completed rekey, not yet cut over).
981    pub fn pending_new_session(&self) -> Option<&NoiseSession> {
982        self.pending_new_session.as_ref()
983    }
984
985    /// Whether this node should drive the K-bit cutover for the pending FMP rekey.
986    pub fn pending_rekey_initiator(&self) -> bool {
987        self.pending_rekey_initiator
988    }
989
990    /// Whether the locally initiated pending FMP rekey has waited long enough
991    /// to cut over. Responders cut over only after observing the peer's K-bit.
992    pub fn pending_rekey_cutover_due(&self, delay: Duration) -> bool {
993        self.pending_rekey_initiator
994            && self
995                .pending_rekey_completed_at
996                .is_some_and(|completed| completed.elapsed() >= delay)
997    }
998
999    /// Store a completed rekey session and its indices.
1000    ///
1001    /// Called when the rekey handshake completes. Initiators cut over after a
1002    /// short grace period; responders hold the session pending until they
1003    /// authenticate the peer's K-bit flip.
1004    pub fn set_pending_session(
1005        &mut self,
1006        session: NoiseSession,
1007        our_index: SessionIndex,
1008        their_index: SessionIndex,
1009        initiated_by_local: bool,
1010    ) {
1011        self.pending_new_session = Some(session);
1012        self.pending_our_index = Some(our_index);
1013        self.pending_their_index = Some(their_index);
1014        self.pending_rekey_initiator = initiated_by_local;
1015        self.pending_rekey_completed_at = Some(Instant::now());
1016        self.rekey_in_progress = false;
1017        // Clear initiator handshake state (index now lives in pending_our_index)
1018        self.rekey_our_index = None;
1019        self.rekey_handshake = None;
1020        self.rekey_msg1 = None;
1021        self.rekey_msg1_next_resend = 0;
1022    }
1023
1024    /// Cut over to the pending new session (initiator side).
1025    ///
1026    /// Moves current session to previous (for drain), promotes pending to current,
1027    /// flips the K-bit. Returns the old our_index that should remain in peers_by_index
1028    /// during the drain window.
1029    pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
1030        let new_session = self.pending_new_session.take()?;
1031        let new_our_index = self.pending_our_index.take();
1032        let new_their_index = self.pending_their_index.take();
1033
1034        // Demote current to previous
1035        self.previous_session = self.noise_session.take();
1036        self.previous_our_index = self.our_index;
1037        self.drain_started = Some(Instant::now());
1038
1039        // Promote pending to current
1040        self.noise_session = Some(new_session);
1041        self.our_index = new_our_index;
1042        self.their_index = new_their_index;
1043        self.pending_rekey_initiator = false;
1044        self.pending_rekey_completed_at = None;
1045
1046        // Flip K-bit and reset timing
1047        self.current_k_bit = !self.current_k_bit;
1048        self.session_established_at = Instant::now();
1049        self.session_start = Instant::now();
1050        self.rekey_in_progress = false;
1051        self.rekey_jitter_secs = draw_rekey_jitter();
1052        self.last_heartbeat_sent = None;
1053        self.reset_replay_suppressed();
1054
1055        // Reset MMP counters to avoid metric discontinuity
1056        let now = Instant::now();
1057        if let Some(mmp) = &mut self.mmp {
1058            mmp.reset_for_rekey(now);
1059        }
1060
1061        self.previous_our_index
1062    }
1063
1064    /// Handle receiving a K-bit flip from the peer (responder side).
1065    ///
1066    /// Promotes pending_new_session to current, demotes current to previous.
1067    /// Returns the old our_index for drain tracking.
1068    pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1069        let new_session = self.pending_new_session.take()?;
1070        let new_our_index = self.pending_our_index.take();
1071        let new_their_index = self.pending_their_index.take();
1072
1073        // Demote current to previous
1074        self.previous_session = self.noise_session.take();
1075        self.previous_our_index = self.our_index;
1076        self.drain_started = Some(Instant::now());
1077
1078        // Promote pending to current
1079        self.noise_session = Some(new_session);
1080        self.our_index = new_our_index;
1081        self.their_index = new_their_index;
1082        self.pending_rekey_initiator = false;
1083        self.pending_rekey_completed_at = None;
1084
1085        // Match peer's K-bit
1086        self.current_k_bit = !self.current_k_bit;
1087        self.session_established_at = Instant::now();
1088        self.session_start = Instant::now();
1089        self.rekey_in_progress = false;
1090        self.rekey_jitter_secs = draw_rekey_jitter();
1091        self.last_heartbeat_sent = None;
1092        self.reset_replay_suppressed();
1093
1094        // Reset MMP counters to avoid metric discontinuity
1095        let now = Instant::now();
1096        if let Some(mmp) = &mut self.mmp {
1097            mmp.reset_for_rekey(now);
1098        }
1099
1100        self.previous_our_index
1101    }
1102
1103    /// Check if the drain window has expired.
1104    pub fn drain_expired(&self, drain_secs: u64) -> bool {
1105        match self.drain_started {
1106            Some(t) => t.elapsed().as_secs() >= drain_secs,
1107            None => false,
1108        }
1109    }
1110
1111    /// Whether a drain is in progress.
1112    pub fn is_draining(&self) -> bool {
1113        self.drain_started.is_some()
1114    }
1115
1116    /// Complete the drain: drop previous session and free its index.
1117    ///
1118    /// Returns the previous our_index so the caller can remove it from
1119    /// peers_by_index and free it from the IndexAllocator.
1120    pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1121        self.previous_session = None;
1122        self.drain_started = None;
1123        self.previous_our_index.take()
1124    }
1125
1126    /// Abandon an in-progress rekey.
1127    ///
1128    /// Returns the rekey our_index so the caller can free it.
1129    /// Also clears any pending session state if the handshake was completed
1130    /// but not yet cut over.
1131    pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1132        self.rekey_handshake = None;
1133        self.rekey_msg1 = None;
1134        self.rekey_msg1_next_resend = 0;
1135        self.rekey_in_progress = false;
1136        // Return whichever index needs freeing
1137        self.rekey_our_index.take().or_else(|| {
1138            self.pending_new_session = None;
1139            self.pending_their_index = None;
1140            self.pending_rekey_initiator = false;
1141            self.pending_rekey_completed_at = None;
1142            self.pending_our_index.take()
1143        })
1144    }
1145
1146    // === Rekey Handshake State (Initiator) ===
1147
1148    /// Store rekey handshake state after sending msg1.
1149    pub fn set_rekey_state(
1150        &mut self,
1151        handshake: NoiseHandshakeState,
1152        our_index: SessionIndex,
1153        wire_msg1: Vec<u8>,
1154        next_resend_ms: u64,
1155    ) {
1156        self.rekey_handshake = Some(handshake);
1157        self.rekey_our_index = Some(our_index);
1158        self.rekey_msg1 = Some(wire_msg1);
1159        self.rekey_msg1_next_resend = next_resend_ms;
1160        self.rekey_in_progress = true;
1161    }
1162
1163    /// Get the rekey our_index (for msg2 dispatch lookup).
1164    pub fn rekey_our_index(&self) -> Option<SessionIndex> {
1165        self.rekey_our_index
1166    }
1167
1168    /// Complete the rekey by processing msg2 (initiator side).
1169    ///
1170    /// Takes the stored handshake state, reads msg2, and returns the
1171    /// completed NoiseSession. Clears the handshake-related fields but
1172    /// leaves rekey_our_index for set_pending_session to use.
1173    pub fn complete_rekey_msg2(
1174        &mut self,
1175        msg2_bytes: &[u8],
1176    ) -> Result<(NoiseSession, Option<[u8; 8]>), NoiseError> {
1177        let mut hs = self
1178            .rekey_handshake
1179            .take()
1180            .ok_or_else(|| NoiseError::WrongState {
1181                expected: "rekey handshake in progress".to_string(),
1182                got: "no handshake state".to_string(),
1183            })?;
1184
1185        hs.read_message_2(msg2_bytes)?;
1186        let remote_epoch = hs.remote_epoch();
1187        let session = hs.into_session()?;
1188
1189        // Clear msg1 resend state
1190        self.rekey_msg1 = None;
1191        self.rekey_msg1_next_resend = 0;
1192
1193        Ok((session, remote_epoch))
1194    }
1195
1196    /// Check if msg1 needs resending.
1197    pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1198        self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1199    }
1200
1201    /// Get msg1 bytes for resend (without consuming).
1202    pub fn rekey_msg1(&self) -> Option<&[u8]> {
1203        self.rekey_msg1.as_deref()
1204    }
1205
1206    /// Update next resend timestamp.
1207    pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
1208        self.rekey_msg1_next_resend = next_ms;
1209    }
1210}
1211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Identity;

    /// Peer identity derived from a freshly generated keypair.
    fn make_peer_identity() -> PeerIdentity {
        PeerIdentity::from_pubkey(Identity::generate().pubkey())
    }

    /// Node address whose first byte is `val` and whose other bytes are zero.
    fn make_node_addr(val: u8) -> NodeAddr {
        let mut raw = [0u8; 16];
        raw[0] = val;
        NodeAddr::from_bytes(raw)
    }

    /// Tree coordinate built from one node address per entry of `ids`.
    fn make_coords(ids: &[u8]) -> TreeCoordinate {
        TreeCoordinate::from_addrs(ids.iter().copied().map(make_node_addr).collect()).unwrap()
    }

    /// Peer with a fresh identity, link 1, authenticated at t = 1000.
    fn new_peer() -> ActivePeer {
        ActivePeer::new(make_peer_identity(), LinkId::new(1), 1000)
    }

    #[test]
    fn test_connectivity_state_properties() {
        // (state, expected can_send)
        let sendability = [
            (ConnectivityState::Connected, true),
            (ConnectivityState::Stale, true),
            (ConnectivityState::Reconnecting, false),
            (ConnectivityState::Disconnected, false),
        ];
        for (state, expected) in sendability {
            assert_eq!(state.can_send(), expected);
        }

        assert!(ConnectivityState::Connected.is_healthy());
        assert!(!ConnectivityState::Stale.is_healthy());

        assert!(ConnectivityState::Disconnected.is_terminal());
        assert!(!ConnectivityState::Connected.is_terminal());
    }

    #[test]
    fn test_active_peer_creation() {
        let identity = make_peer_identity();
        let link = LinkId::new(1);
        let peer = ActivePeer::new(identity, link, 1000);

        assert_eq!(peer.identity().node_addr(), identity.node_addr());
        assert_eq!(peer.link_id(), LinkId::new(1));
        assert!(peer.is_healthy());
        assert!(peer.can_send());
        assert_eq!(peer.authenticated_at(), 1000);
        // A brand-new peer still has to be sent our filter.
        assert!(peer.needs_filter_update());
    }

    #[test]
    fn test_connectivity_transitions() {
        let mut peer = new_peer();
        assert!(peer.is_healthy());

        // Connected -> Stale: still allowed to send.
        peer.mark_stale();
        assert_eq!(peer.connectivity(), ConnectivityState::Stale);
        assert!(peer.can_send());

        // Stale -> Connected once traffic arrives.
        peer.touch(2000);
        assert!(peer.is_healthy());

        // Reconnecting blocks sending.
        peer.mark_reconnecting();
        assert!(!peer.can_send());

        // Reconnect succeeded.
        peer.mark_connected(3000);
        assert!(peer.is_healthy());

        // Explicit disconnect.
        peer.mark_disconnected();
        assert!(peer.is_disconnected());
        assert!(!peer.can_send());
    }

    #[test]
    fn test_tree_position() {
        let mut peer = new_peer();

        assert!(!peer.has_tree_position());
        assert!(peer.coords().is_none());

        let declaration = ParentDeclaration::new(make_node_addr(1), make_node_addr(2), 1, 1000);
        let ancestry = make_coords(&[1, 2, 0]);
        peer.update_tree_position(declaration, ancestry, 2000);

        assert!(peer.has_tree_position());
        assert!(peer.coords().is_some());
        assert_eq!(peer.last_seen(), 2000);
    }

    #[test]
    fn test_bloom_filter() {
        let mut peer = new_peer();
        let target = make_node_addr(42);

        // Before any filter is installed.
        assert!(!peer.may_reach(&target));
        assert!(peer.filter_is_stale(2000, 500));

        let mut bloom = BloomFilter::new();
        bloom.insert(&target);
        peer.update_filter(bloom, 1, 1500);

        // After a filter containing the target is installed.
        assert!(peer.may_reach(&target));
        assert!(!peer.filter_is_stale(1800, 500));
        assert!(peer.filter_is_stale(2500, 500));
    }

    #[test]
    fn test_timing() {
        let peer = new_peer();

        assert_eq!(peer.connection_duration(2000), 1000);
        assert_eq!(peer.idle_time(2000), 1000);
    }

    #[test]
    fn test_filter_update_flag() {
        let mut peer = new_peer();

        // Set on creation.
        assert!(peer.needs_filter_update());

        peer.clear_filter_update_needed();
        assert!(!peer.needs_filter_update());

        peer.mark_filter_update_needed();
        assert!(peer.needs_filter_update());
    }

    #[test]
    fn test_with_stats() {
        let mut stats = LinkStats::new();
        stats.record_sent(100);
        stats.record_recv(200, 500);

        let peer = ActivePeer::with_stats(make_peer_identity(), LinkId::new(1), 1000, stats);

        assert_eq!(peer.link_stats().packets_sent, 1);
        assert_eq!(peer.link_stats().packets_recv, 1);
    }

    #[test]
    fn test_replay_suppression_counter() {
        let mut peer = new_peer();

        // Starts at zero.
        assert_eq!(peer.replay_suppressed_count(), 0);

        // Every increment reports the updated count.
        assert_eq!(peer.increment_replay_suppressed(), 1);
        assert_eq!(peer.increment_replay_suppressed(), 2);
        assert_eq!(peer.increment_replay_suppressed(), 3);
        assert_eq!(peer.replay_suppressed_count(), 3);

        // Reset reports the old count and zeroes the counter.
        assert_eq!(peer.reset_replay_suppressed(), 3);
        assert_eq!(peer.replay_suppressed_count(), 0);

        // Counting restarts from one after a reset.
        assert_eq!(peer.increment_replay_suppressed(), 1);
        assert_eq!(peer.replay_suppressed_count(), 1);

        // Resetting an already-zero counter reports zero.
        peer.reset_replay_suppressed();
        assert_eq!(peer.reset_replay_suppressed(), 0);
    }

    #[test]
    fn test_increment_decrypt_failures_monotonic() {
        let mut peer = new_peer();

        // Starts at zero.
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // Each increment must report a strictly larger count.
        let mut previous = 0u32;
        for expected in 1..=25u32 {
            let count = peer.increment_decrypt_failures();
            assert_eq!(count, expected, "increment must return monotonic count");
            assert!(count > previous, "count must strictly increase");
            assert_eq!(peer.consecutive_decrypt_failures(), count);
            previous = count;
        }
    }

    #[test]
    fn test_reset_decrypt_failures_zeroes_counter() {
        let mut peer = new_peer();

        // Push the counter up to seven.
        for _ in 0..7 {
            peer.increment_decrypt_failures();
        }
        assert_eq!(peer.consecutive_decrypt_failures(), 7);

        // Reset brings it back to zero.
        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // A second reset is harmless: still zero, no panic.
        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // Counting resumes from one.
        assert_eq!(peer.increment_decrypt_failures(), 1);
        assert_eq!(peer.consecutive_decrypt_failures(), 1);
    }

    #[test]
    fn test_rekey_jitter_in_range() {
        let allowed = -REKEY_JITTER_SECS..=REKEY_JITTER_SECS;
        for _ in 0..100 {
            let jitter = new_peer().rekey_jitter_secs();
            assert!(
                allowed.contains(&jitter),
                "jitter {} outside [-{}, +{}]",
                jitter,
                REKEY_JITTER_SECS,
                REKEY_JITTER_SECS
            );
        }
    }

    #[test]
    fn test_rekey_jitter_mean_near_zero() {
        let n = 200i64;
        let sum: i64 = (0..n).map(|_| new_peer().rekey_jitter_secs()).sum();

        let mean = sum / n;
        assert!(
            mean.abs() < 5,
            "empirical mean {} not within 5 of 0 over {} samples",
            mean,
            n
        );
    }
}