Skip to main content

fips_core/peer/
active.rs

1//! Active Peer (Authenticated Phase)
2//!
3//! Represents a fully authenticated peer after successful Noise handshake.
4//! ActivePeer holds tree state, Bloom filter, and routing information.
5
6use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::node::REKEY_JITTER_SECS;
9use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
10use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
11use crate::tree::{ParentDeclaration, TreeCoordinate};
12use crate::utils::index::SessionIndex;
13use crate::{FipsAddress, NodeAddr, PeerIdentity};
14use rand::RngExt;
15use secp256k1::XOnlyPublicKey;
16use std::fmt;
17use std::time::{Duration, Instant};
18
19fn draw_rekey_jitter() -> i64 {
20    rand::rng().random_range(-REKEY_JITTER_SECS..=REKEY_JITTER_SECS)
21}
22
23/// Connectivity state for an active peer.
24///
25/// This is simpler than the full PeerState since authentication is complete.
26#[derive(Clone, Copy, Debug, PartialEq, Eq)]
27pub enum ConnectivityState {
28    /// Peer is fully connected and responsive.
29    Connected,
30    /// Peer hasn't been heard from recently (potential timeout).
31    Stale,
32    /// Connection lost, attempting to reconnect.
33    Reconnecting,
34    /// Peer has been explicitly disconnected.
35    Disconnected,
36}
37
38impl ConnectivityState {
39    /// Check if the peer is usable for sending traffic.
40    pub fn can_send(&self) -> bool {
41        matches!(
42            self,
43            ConnectivityState::Connected | ConnectivityState::Stale
44        )
45    }
46
47    /// Check if this is a terminal state requiring cleanup.
48    pub fn is_terminal(&self) -> bool {
49        matches!(self, ConnectivityState::Disconnected)
50    }
51
52    /// Check if peer is fully healthy.
53    pub fn is_healthy(&self) -> bool {
54        matches!(self, ConnectivityState::Connected)
55    }
56}
57
58impl fmt::Display for ConnectivityState {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        let s = match self {
61            ConnectivityState::Connected => "connected",
62            ConnectivityState::Stale => "stale",
63            ConnectivityState::Reconnecting => "reconnecting",
64            ConnectivityState::Disconnected => "disconnected",
65        };
66        write!(f, "{}", s)
67    }
68}
69
70/// A fully authenticated remote FIPS node.
71///
72/// Created only after successful Noise KK handshake. The identity is
73/// cryptographically verified at this point.
74///
75/// Note: ActivePeer intentionally does not implement Clone because it
76/// contains NoiseSession, which cannot be safely cloned (cloning would
77/// risk nonce reuse, a catastrophic security failure).
78#[derive(Debug)]
79pub struct ActivePeer {
80    // === Identity (Verified) ===
81    /// Cryptographic identity (verified via handshake).
82    identity: PeerIdentity,
83
84    // === Connection ===
85    /// Link used to reach this peer.
86    link_id: LinkId,
87    /// Current connectivity state.
88    connectivity: ConnectivityState,
89
90    // === Session (Wire Protocol) ===
91    /// Noise session for encryption/decryption (None if legacy peer).
92    noise_session: Option<NoiseSession>,
93    /// Our session index (they include this when sending TO us).
94    our_index: Option<SessionIndex>,
95    /// Their session index (we include this when sending TO them).
96    their_index: Option<SessionIndex>,
97    /// Transport ID for this peer's link.
98    transport_id: Option<TransportId>,
99    /// Current transport address (for roaming support).
100    current_addr: Option<TransportAddr>,
101
102    // === Spanning Tree ===
103    /// Their latest parent declaration.
104    declaration: Option<ParentDeclaration>,
105    /// Their path to root.
106    ancestry: Option<TreeCoordinate>,
107
108    // === Tree Announce Rate Limiting ===
109    /// Minimum interval between TreeAnnounce messages (milliseconds).
110    tree_announce_min_interval_ms: u64,
111    /// Last time we sent a TreeAnnounce to this peer (Unix milliseconds).
112    last_tree_announce_sent_ms: u64,
113    /// Whether a tree announce is pending (deferred due to rate limit).
114    pending_tree_announce: bool,
115
116    // === Bloom Filter ===
117    /// What's reachable through them (inbound filter).
118    inbound_filter: Option<BloomFilter>,
119    /// Their filter's sequence number.
120    filter_sequence: u64,
121    /// When we received their last filter (Unix milliseconds).
122    filter_received_at: u64,
123    /// Whether we owe them a filter update.
124    pending_filter_update: bool,
125
126    // === Timing ===
127    /// Session start time for computing session-relative timestamps.
128    /// Used as the epoch for the 4-byte inner header timestamp field.
129    session_start: Instant,
130
131    // === Statistics ===
132    /// Link statistics.
133    link_stats: LinkStats,
134    /// When this peer was authenticated (Unix milliseconds).
135    authenticated_at: u64,
136    /// When this peer was last seen (any activity, Unix milliseconds).
137    last_seen: u64,
138
139    // === Epoch (Restart Detection) ===
140    /// Remote peer's startup epoch (from handshake). Used to detect restarts.
141    remote_epoch: Option<[u8; 8]>,
142
143    // === MMP ===
144    /// Per-peer MMP state (None for legacy peers without Noise sessions).
145    mmp: Option<MmpPeerState>,
146
147    // === Heartbeat ===
148    /// When we last sent a heartbeat to this peer.
149    last_heartbeat_sent: Option<Instant>,
150
151    // === Handshake Resend ===
152    /// Wire-format msg2 for resend on duplicate msg1 (responder only).
153    /// Cleared after the handshake timeout window.
154    handshake_msg2: Option<Vec<u8>>,
155
156    // === Replay Detection Suppression ===
157    /// Number of replay detections suppressed since last session reset.
158    replay_suppressed_count: u32,
159    /// Consecutive decryption failures (reset on any successful decrypt).
160    consecutive_decrypt_failures: u32,
161
162    // === Rekey (Key Rotation) ===
163    /// When the current Noise session was established (for rekey timer).
164    session_established_at: Instant,
165    /// Per-session symmetric jitter applied to the rekey timer trigger.
166    rekey_jitter_secs: i64,
167    /// Current K-bit epoch value (alternates each rekey).
168    current_k_bit: bool,
169    /// Previous session kept alive during drain window after cutover.
170    previous_session: Option<NoiseSession>,
171    /// Previous session's our_index (for peers_by_index cleanup on drain expiry).
172    previous_our_index: Option<SessionIndex>,
173    /// When the drain window started (None = no drain in progress).
174    drain_started: Option<Instant>,
175    /// Pending new session from completed rekey (before K-bit cutover).
176    pending_new_session: Option<NoiseSession>,
177    /// Pending new session's our_index.
178    pending_our_index: Option<SessionIndex>,
179    /// Pending new session's their_index.
180    pending_their_index: Option<SessionIndex>,
181    /// True when this node initiated the pending FMP rekey.
182    pending_rekey_initiator: bool,
183    /// When the pending FMP rekey completed locally.
184    pending_rekey_completed_at: Option<Instant>,
185    /// Whether a rekey is currently in progress (handshake sent, not yet complete).
186    rekey_in_progress: bool,
187    /// When we last received a rekey msg1 from this peer (dampening).
188    last_peer_rekey: Option<Instant>,
189    /// In-progress rekey: Noise handshake state (initiator only).
190    rekey_handshake: Option<NoiseHandshakeState>,
191    /// In-progress rekey: our new session index.
192    rekey_our_index: Option<SessionIndex>,
193    /// In-progress rekey: wire-format msg1 for resend.
194    rekey_msg1: Option<Vec<u8>>,
195    /// In-progress rekey: next resend timestamp (Unix ms).
196    rekey_msg1_next_resend: u64,
197
198    // === Connected Peer UDP Socket (Unix fast path) ===
199    /// Per-peer `connect()`-ed UDP socket, opened once we have a
200    /// stable kernel `SocketAddr` for the peer (i.e. session
201    /// established + transport address known). When `Some`, the
202    /// encrypt-worker send path can `sendmsg(2)` on this fd without
203    /// per-packet `msg_name` — the kernel-side route + neighbor cache
204    /// is pinned by the `connect()` call. On the receive side, Linux
205    /// and Darwin UDP demux preferentially route inbound packets from
206    /// this peer to this socket (most-specific 5-tuple match via
207    /// `SO_REUSEPORT`), so the paired drain thread must keep it empty.
208    ///
209    /// Closed automatically on Drop. Behind an `Arc` so the
210    /// encrypt-worker's send path can hold a refcount without owning
211    /// the only handle (rekey / address-change may rotate the socket
212    /// while older jobs are still in-flight on the worker channel).
213    #[cfg(any(target_os = "linux", target_os = "macos"))]
214    connected_udp:
215        Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,
216
217    /// Per-peer receive-drain thread. Always paired with
218    /// `connected_udp`: while the connected socket is installed, the
219    /// kernel UDP demux preferentially routes inbound packets from
220    /// this peer to it (via SO_REUSEPORT + 5-tuple match), so the
221    /// socket *must* be drained or packets pile up in its kernel
222    /// recv buffer. Drop signals the thread to exit via self-pipe.
223    #[cfg(any(target_os = "linux", target_os = "macos"))]
224    peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
225}
226
227impl ActivePeer {
228    /// Create a new active peer from verified identity.
229    ///
230    /// Called after successful authentication handshake.
231    /// For peers with Noise sessions, use `with_session` instead.
232    pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
233        let now = Instant::now();
234        Self {
235            identity,
236            link_id,
237            connectivity: ConnectivityState::Connected,
238            noise_session: None,
239            our_index: None,
240            their_index: None,
241            transport_id: None,
242            current_addr: None,
243            declaration: None,
244            ancestry: None,
245            tree_announce_min_interval_ms: 500,
246            last_tree_announce_sent_ms: 0,
247            pending_tree_announce: false,
248            inbound_filter: None,
249            filter_sequence: 0,
250            filter_received_at: 0,
251            pending_filter_update: true, // Send filter on new connection
252            session_start: now,
253            link_stats: LinkStats::new(),
254            authenticated_at,
255            last_seen: authenticated_at,
256            remote_epoch: None,
257            mmp: None,
258            last_heartbeat_sent: None,
259            handshake_msg2: None,
260            replay_suppressed_count: 0,
261            consecutive_decrypt_failures: 0,
262            session_established_at: now,
263            rekey_jitter_secs: draw_rekey_jitter(),
264            current_k_bit: false,
265            previous_session: None,
266            previous_our_index: None,
267            drain_started: None,
268            pending_new_session: None,
269            pending_our_index: None,
270            pending_their_index: None,
271            pending_rekey_initiator: false,
272            pending_rekey_completed_at: None,
273            rekey_in_progress: false,
274            last_peer_rekey: None,
275            rekey_handshake: None,
276            rekey_our_index: None,
277            rekey_msg1: None,
278            rekey_msg1_next_resend: 0,
279            #[cfg(any(target_os = "linux", target_os = "macos"))]
280            connected_udp: None,
281            #[cfg(any(target_os = "linux", target_os = "macos"))]
282            peer_recv_drain: None,
283        }
284    }
285
286    /// Create from verified identity with existing link stats.
287    ///
288    /// Used when promoting from PeerConnection, preserving handshake stats.
289    /// For peers with Noise sessions, use `with_session` instead.
290    pub fn with_stats(
291        identity: PeerIdentity,
292        link_id: LinkId,
293        authenticated_at: u64,
294        link_stats: LinkStats,
295    ) -> Self {
296        let mut peer = Self::new(identity, link_id, authenticated_at);
297        peer.link_stats = link_stats;
298        peer
299    }
300
301    /// Create from verified identity with Noise session and index tracking.
302    ///
303    /// This is the primary constructor for the wire protocol path.
304    /// The NoiseSession provides encryption/decryption and replay protection.
305    #[allow(clippy::too_many_arguments)]
306    pub fn with_session(
307        identity: PeerIdentity,
308        link_id: LinkId,
309        authenticated_at: u64,
310        noise_session: NoiseSession,
311        our_index: SessionIndex,
312        their_index: SessionIndex,
313        transport_id: TransportId,
314        current_addr: TransportAddr,
315        link_stats: LinkStats,
316        is_initiator: bool,
317        mmp_config: &MmpConfig,
318        remote_epoch: Option<[u8; 8]>,
319    ) -> Self {
320        let now = Instant::now();
321        Self {
322            identity,
323            link_id,
324            connectivity: ConnectivityState::Connected,
325            noise_session: Some(noise_session),
326            our_index: Some(our_index),
327            their_index: Some(their_index),
328            transport_id: Some(transport_id),
329            current_addr: Some(current_addr),
330            declaration: None,
331            ancestry: None,
332            tree_announce_min_interval_ms: 500,
333            last_tree_announce_sent_ms: 0,
334            pending_tree_announce: false,
335            inbound_filter: None,
336            filter_sequence: 0,
337            filter_received_at: 0,
338            pending_filter_update: true,
339            session_start: now,
340            link_stats,
341            authenticated_at,
342            last_seen: authenticated_at,
343            remote_epoch,
344            mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
345            last_heartbeat_sent: None,
346            handshake_msg2: None,
347            replay_suppressed_count: 0,
348            consecutive_decrypt_failures: 0,
349            session_established_at: now,
350            rekey_jitter_secs: draw_rekey_jitter(),
351            current_k_bit: false,
352            previous_session: None,
353            previous_our_index: None,
354            drain_started: None,
355            pending_new_session: None,
356            pending_our_index: None,
357            pending_their_index: None,
358            pending_rekey_initiator: false,
359            pending_rekey_completed_at: None,
360            rekey_in_progress: false,
361            last_peer_rekey: None,
362            rekey_handshake: None,
363            rekey_our_index: None,
364            rekey_msg1: None,
365            rekey_msg1_next_resend: 0,
366            #[cfg(any(target_os = "linux", target_os = "macos"))]
367            connected_udp: None,
368            #[cfg(any(target_os = "linux", target_os = "macos"))]
369            peer_recv_drain: None,
370        }
371    }
372
373    /// Unix UDP fast path: clone the refcount on the per-peer
374    /// `connect()`-ed UDP socket if one has been installed. Encrypt-
375    /// worker send path uses this to bypass the wildcard listen
376    /// socket's per-packet sockaddr handling.
377    #[cfg(any(target_os = "linux", target_os = "macos"))]
378    pub(crate) fn connected_udp(
379        &self,
380    ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
381        self.connected_udp.clone()
382    }
383
384    /// Install a per-peer `connect()`-ed UDP socket **with** its
385    /// paired recv drain thread. The two are owned together: the
386    /// drain thread is the only consumer of packets arriving on this
387    /// socket (Linux UDP demux preferentially routes them away from
388    /// the wildcard listen socket via SO_REUSEPORT 5-tuple match),
389    /// so installing one without the other would silently drop
390    /// inbound packets from this peer.
391    ///
392    /// Replacing an existing pair drops the old drain (its self-pipe
393    /// shutdown signal fires; thread exits within one poll
394    /// iteration) and drops the old socket Arc. Any encrypt-worker
395    /// jobs already in-flight holding the old socket Arc stay valid
396    /// until they complete, at which point the kernel fd closes.
397    #[cfg(any(target_os = "linux", target_os = "macos"))]
398    pub(crate) fn set_connected_udp(
399        &mut self,
400        socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
401        drain: crate::transport::udp::peer_drain::PeerRecvDrain,
402    ) {
403        // Drop order matters: drop the old drain BEFORE the old
404        // socket so the drain thread's last reference to the kernel
405        // fd is released cleanly.
406        self.peer_recv_drain = None;
407        self.connected_udp = None;
408        self.connected_udp = Some(socket);
409        self.peer_recv_drain = Some(drain);
410    }
411
412    /// Clear the per-peer connected UDP socket + drain thread (e.g.
413    /// on rekey or disconnect). The drain thread exits via self-pipe
414    /// signal; the kernel fd closes when the last `Arc` to the
415    /// socket drops.
416    #[cfg(any(target_os = "linux", target_os = "macos"))]
417    pub(crate) fn clear_connected_udp(&mut self) {
418        self.peer_recv_drain = None;
419        self.connected_udp = None;
420    }
421
422    // === Identity Accessors ===
423
424    /// Get the peer's verified identity.
425    pub fn identity(&self) -> &PeerIdentity {
426        &self.identity
427    }
428
429    /// Get the peer's NodeAddr.
430    pub fn node_addr(&self) -> &NodeAddr {
431        self.identity.node_addr()
432    }
433
434    /// Get the peer's FIPS address.
435    pub fn address(&self) -> &FipsAddress {
436        self.identity.address()
437    }
438
439    /// Get the peer's public key.
440    pub fn pubkey(&self) -> XOnlyPublicKey {
441        self.identity.pubkey()
442    }
443
444    /// Get the peer's npub string.
445    pub fn npub(&self) -> String {
446        self.identity.npub()
447    }
448
449    // === Connection Accessors ===
450
451    /// Get the link ID.
452    pub fn link_id(&self) -> LinkId {
453        self.link_id
454    }
455
456    /// Get the connectivity state.
457    pub fn connectivity(&self) -> ConnectivityState {
458        self.connectivity
459    }
460
461    /// Check if peer can receive traffic.
462    pub fn can_send(&self) -> bool {
463        self.connectivity.can_send()
464    }
465
466    /// Check if peer is fully healthy.
467    pub fn is_healthy(&self) -> bool {
468        self.connectivity.is_healthy()
469    }
470
471    /// Check if peer is disconnected.
472    pub fn is_disconnected(&self) -> bool {
473        self.connectivity.is_terminal()
474    }
475
476    // === Session Accessors ===
477
478    /// Check if this peer has a Noise session.
479    pub fn has_session(&self) -> bool {
480        self.noise_session.is_some()
481    }
482
483    /// Get the Noise session, if present.
484    pub fn noise_session(&self) -> Option<&NoiseSession> {
485        self.noise_session.as_ref()
486    }
487
488    /// Get mutable access to the Noise session.
489    pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
490        self.noise_session.as_mut()
491    }
492
493    /// Get our session index (they use this to send TO us).
494    pub fn our_index(&self) -> Option<SessionIndex> {
495        self.our_index
496    }
497
498    /// Get their session index (we use this to send TO them).
499    pub fn their_index(&self) -> Option<SessionIndex> {
500        self.their_index
501    }
502
503    /// Update their session index (used during cross-connection resolution
504    /// when the losing node keeps its inbound session but needs the peer's
505    /// outbound index).
506    pub fn set_their_index(&mut self, index: SessionIndex) {
507        self.their_index = Some(index);
508    }
509
510    /// Replace the Noise session and indices during cross-connection resolution.
511    ///
512    /// When both nodes simultaneously initiate, each promotes its inbound
513    /// handshake first. When the peer's msg2 arrives, we learn the correct
514    /// session — the outbound handshake that pairs with the peer's inbound.
515    /// This replaces the entire session so both nodes use matching keys.
516    ///
517    /// Returns the old our_index so the caller can update peers_by_index.
518    /// Also resets the replay suppression counter since the session changed.
519    pub fn replace_session(
520        &mut self,
521        new_session: NoiseSession,
522        new_our_index: SessionIndex,
523        new_their_index: SessionIndex,
524    ) -> Option<SessionIndex> {
525        self.reset_replay_suppressed();
526        let old_our_index = self.our_index;
527        self.noise_session = Some(new_session);
528        self.our_index = Some(new_our_index);
529        self.their_index = Some(new_their_index);
530        old_our_index
531    }
532
533    /// Get the transport ID for this peer.
534    pub fn transport_id(&self) -> Option<TransportId> {
535        self.transport_id
536    }
537
538    /// Get the current transport address.
539    pub fn current_addr(&self) -> Option<&TransportAddr> {
540        self.current_addr.as_ref()
541    }
542
543    /// Update the current address (for roaming support).
544    ///
545    /// Called when we receive a valid authenticated packet from a new address.
546    /// Short-circuits when neither the transport_id nor the TransportAddr
547    /// bytes changed — at multi-Gbps the same peer's source 4-tuple is
548    /// stable per session and the overwhelming majority of inbound
549    /// packets hit this fast path. Saves both the redundant
550    /// `Option::take` + Vec drop on the cached side and the caller's
551    /// `.clone()` allocation on the input side: the caller can pass
552    /// `&TransportAddr` and we only `.to_owned()` when storing.
553    ///
554    /// Returns `true` iff the stored `(transport_id, current_addr)` pair
555    /// actually changed. The caller uses this signal to invalidate
556    /// derived caches whose validity is bound to the peer's 5-tuple —
557    /// in particular the Linux per-peer `connect()`-ed UDP socket,
558    /// which is pinned to one kernel route + neighbour entry and goes
559    /// stale the moment the peer roams. (Clearing it here would force
560    /// `&mut self` users into the wrong shape: the policy of when to
561    /// rebuild the connected socket lives on `Node`, not on the peer
562    /// state. Returning a bool keeps that policy where it belongs.)
563    pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
564        if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
565            return false;
566        }
567        self.transport_id = Some(transport_id);
568        self.current_addr = Some(addr.clone());
569        true
570    }
571
572    // === Handshake Resend ===
573
574    /// Store wire-format msg2 for resend on duplicate msg1.
575    pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
576        self.handshake_msg2 = Some(msg2);
577    }
578
579    /// Get stored msg2 bytes for resend.
580    pub fn handshake_msg2(&self) -> Option<&[u8]> {
581        self.handshake_msg2.as_deref()
582    }
583
584    /// Clear stored msg2 (no longer needed after handshake window).
585    pub fn clear_handshake_msg2(&mut self) {
586        self.handshake_msg2 = None;
587    }
588
589    // === Replay Detection Suppression ===
590
591    /// Increment replay suppression counter. Returns the new count.
592    pub fn increment_replay_suppressed(&mut self) -> u32 {
593        self.replay_suppressed_count += 1;
594        self.replay_suppressed_count
595    }
596
597    /// Reset replay suppression counter, returning previous count.
598    pub fn reset_replay_suppressed(&mut self) -> u32 {
599        let count = self.replay_suppressed_count;
600        self.replay_suppressed_count = 0;
601        count
602    }
603
604    /// Current replay suppression count.
605    pub fn replay_suppressed_count(&self) -> u32 {
606        self.replay_suppressed_count
607    }
608
609    // === Decryption Failure Tracking ===
610
611    /// Increment consecutive decryption failure counter, returning new count.
612    pub fn increment_decrypt_failures(&mut self) -> u32 {
613        self.consecutive_decrypt_failures += 1;
614        self.consecutive_decrypt_failures
615    }
616
617    /// Reset consecutive decryption failure counter.
618    pub fn reset_decrypt_failures(&mut self) {
619        self.consecutive_decrypt_failures = 0;
620    }
621
622    /// Current consecutive decryption failure count.
623    pub fn consecutive_decrypt_failures(&self) -> u32 {
624        self.consecutive_decrypt_failures
625    }
626
627    // === Epoch Accessors ===
628
629    /// Get the remote peer's startup epoch (from handshake).
630    pub fn remote_epoch(&self) -> Option<[u8; 8]> {
631        self.remote_epoch
632    }
633
634    /// Update the remote peer's startup epoch after a successful in-place
635    /// rekey. Initial handshakes set this through `with_session`, but recovery
636    /// rekeys also exchange epochs and must keep restart detection current.
637    pub(crate) fn set_remote_epoch(&mut self, remote_epoch: Option<[u8; 8]>) {
638        self.remote_epoch = remote_epoch;
639    }
640
641    // === Tree Accessors ===
642
643    /// Get the peer's tree coordinates, if known.
644    pub fn coords(&self) -> Option<&TreeCoordinate> {
645        self.ancestry.as_ref()
646    }
647
648    /// Get the peer's parent declaration, if known.
649    pub fn declaration(&self) -> Option<&ParentDeclaration> {
650        self.declaration.as_ref()
651    }
652
653    /// Check if this peer has a known tree position.
654    pub fn has_tree_position(&self) -> bool {
655        self.declaration.is_some() && self.ancestry.is_some()
656    }
657
658    // === Filter Accessors ===
659
660    /// Get the peer's inbound filter, if known.
661    pub fn inbound_filter(&self) -> Option<&BloomFilter> {
662        self.inbound_filter.as_ref()
663    }
664
665    /// Get the filter sequence number.
666    pub fn filter_sequence(&self) -> u64 {
667        self.filter_sequence
668    }
669
670    /// Check if this peer's filter is stale.
671    pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
672        if self.filter_received_at == 0 {
673            return true;
674        }
675        current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
676    }
677
678    /// Check if a destination might be reachable through this peer.
679    pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
680        match &self.inbound_filter {
681            Some(filter) => filter.contains(node_addr),
682            None => false,
683        }
684    }
685
686    /// Check if we need to send this peer a filter update.
687    pub fn needs_filter_update(&self) -> bool {
688        self.pending_filter_update
689    }
690
691    // === Statistics Accessors ===
692
693    /// Get link statistics.
694    pub fn link_stats(&self) -> &LinkStats {
695        &self.link_stats
696    }
697
698    /// Get mutable link statistics.
699    pub fn link_stats_mut(&mut self) -> &mut LinkStats {
700        &mut self.link_stats
701    }
702
703    // === MMP Accessors ===
704
705    /// Get MMP state (None for legacy peers without sessions).
706    pub fn mmp(&self) -> Option<&MmpPeerState> {
707        self.mmp.as_ref()
708    }
709
710    /// Get mutable MMP state.
711    pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
712        self.mmp.as_mut()
713    }
714
715    /// Link cost for routing decisions.
716    ///
717    /// Returns a scalar cost where lower is better (1.0 = ideal).
718    /// Computed as RTT-weighted ETX: `etx * (1.0 + srtt_ms / 100.0)`.
719    ///
720    /// Returns 1.0 (optimistic default) when MMP metrics are not yet
721    /// available, matching depth-only parent selection behavior.
722    pub fn link_cost(&self) -> f64 {
723        match self.mmp() {
724            Some(mmp) => {
725                let etx = mmp.metrics.etx;
726                match mmp.metrics.srtt_ms() {
727                    Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
728                    None => 1.0,
729                }
730            }
731            None => 1.0,
732        }
733    }
734
735    /// Whether this peer has at least one MMP RTT measurement.
736    pub fn has_srtt(&self) -> bool {
737        self.mmp()
738            .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
739    }
740
741    /// When this peer was authenticated.
742    pub fn authenticated_at(&self) -> u64 {
743        self.authenticated_at
744    }
745
746    /// When this peer was last seen.
747    pub fn last_seen(&self) -> u64 {
748        self.last_seen
749    }
750
751    /// Time since last activity.
752    pub fn idle_time(&self, current_time_ms: u64) -> u64 {
753        current_time_ms.saturating_sub(self.last_seen)
754    }
755
756    /// Connection duration since authentication.
757    pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
758        current_time_ms.saturating_sub(self.authenticated_at)
759    }
760
761    /// Session-relative elapsed time in milliseconds (for inner header timestamp).
762    ///
763    /// Returns milliseconds since session establishment, truncated to u32.
764    /// Wraps at ~49.7 days which is acceptable for session-relative timing.
765    pub fn session_elapsed_ms(&self) -> u32 {
766        self.session_start.elapsed().as_millis() as u32
767    }
768
769    /// When this peer's session started (for link-dead fallback timing).
770    pub fn session_start(&self) -> Instant {
771        self.session_start
772    }
773
774    // === Heartbeat ===
775
776    /// When we last sent a heartbeat to this peer.
777    pub fn last_heartbeat_sent(&self) -> Option<Instant> {
778        self.last_heartbeat_sent
779    }
780
781    /// Record that we sent a heartbeat.
782    pub fn mark_heartbeat_sent(&mut self, now: Instant) {
783        self.last_heartbeat_sent = Some(now);
784    }
785
786    // === State Updates ===
787
788    /// Update last seen timestamp.
789    pub fn touch(&mut self, current_time_ms: u64) {
790        self.last_seen = current_time_ms;
791        // Stale links are still sendable, so authenticated traffic refreshes
792        // them. Reconnecting links were declared link-dead and need a fresh
793        // handshake/reprobe before they can carry traffic again.
794        if self.connectivity == ConnectivityState::Stale {
795            self.connectivity = ConnectivityState::Connected;
796        }
797    }
798
799    /// Mark peer as stale (no recent traffic).
800    pub fn mark_stale(&mut self) {
801        if self.connectivity == ConnectivityState::Connected {
802            self.connectivity = ConnectivityState::Stale;
803        }
804    }
805
806    /// Mark peer as reconnecting.
807    pub fn mark_reconnecting(&mut self) {
808        self.connectivity = ConnectivityState::Reconnecting;
809    }
810
811    /// Mark peer as disconnected.
812    pub fn mark_disconnected(&mut self) {
813        self.connectivity = ConnectivityState::Disconnected;
814    }
815
816    /// Mark peer as connected (e.g., after successful reconnect).
817    pub fn mark_connected(&mut self, current_time_ms: u64) {
818        self.connectivity = ConnectivityState::Connected;
819        self.last_seen = current_time_ms;
820    }
821
822    /// Update the link ID (e.g., on reconnect).
823    pub fn set_link_id(&mut self, link_id: LinkId) {
824        self.link_id = link_id;
825    }
826
827    // === Tree Updates ===
828
829    /// Update peer's tree position.
830    pub fn update_tree_position(
831        &mut self,
832        declaration: ParentDeclaration,
833        ancestry: TreeCoordinate,
834        current_time_ms: u64,
835    ) {
836        self.declaration = Some(declaration);
837        self.ancestry = Some(ancestry);
838        self.last_seen = current_time_ms;
839    }
840
841    /// Clear peer's tree position.
842    pub fn clear_tree_position(&mut self) {
843        self.declaration = None;
844        self.ancestry = None;
845    }
846
847    // === Tree Announce Rate Limiting ===
848
849    /// Set the minimum interval between TreeAnnounce messages (milliseconds).
850    pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
851        self.tree_announce_min_interval_ms = ms;
852    }
853
854    /// Get the last tree announce send timestamp (for carrying across reconnection).
855    pub fn last_tree_announce_sent_ms(&self) -> u64 {
856        self.last_tree_announce_sent_ms
857    }
858
859    /// Set the last tree announce send timestamp (to preserve rate limit across reconnection).
860    pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
861        self.last_tree_announce_sent_ms = ms;
862    }
863
864    /// Check if we can send a TreeAnnounce now (rate limiting).
865    pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
866        now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
867    }
868
869    /// Record that we sent a TreeAnnounce to this peer.
870    pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
871        self.last_tree_announce_sent_ms = now_ms;
872        self.pending_tree_announce = false;
873    }
874
875    /// Mark that a tree announce is pending (deferred due to rate limit).
876    pub fn mark_tree_announce_pending(&mut self) {
877        self.pending_tree_announce = true;
878    }
879
880    /// Check if a deferred tree announce is waiting to be sent.
881    pub fn has_pending_tree_announce(&self) -> bool {
882        self.pending_tree_announce
883    }
884
885    // === Filter Updates ===
886
887    /// Update peer's inbound filter.
888    pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
889        self.inbound_filter = Some(filter);
890        self.filter_sequence = sequence;
891        self.filter_received_at = current_time_ms;
892        self.last_seen = current_time_ms;
893    }
894
895    /// Clear peer's inbound filter.
896    pub fn clear_filter(&mut self) {
897        self.inbound_filter = None;
898        self.filter_sequence = 0;
899        self.filter_received_at = 0;
900    }
901
902    /// Mark that we need to send this peer a filter update.
903    pub fn mark_filter_update_needed(&mut self) {
904        self.pending_filter_update = true;
905    }
906
907    /// Clear the pending filter update flag.
908    pub fn clear_filter_update_needed(&mut self) {
909        self.pending_filter_update = false;
910    }
911
912    // === Rekey (Key Rotation) ===
913
914    /// When the current Noise session was established.
915    pub fn session_established_at(&self) -> Instant {
916        self.session_established_at
917    }
918
919    #[cfg(test)]
920    pub(crate) fn set_session_established_at_for_test(&mut self, instant: Instant) {
921        self.session_established_at = instant;
922    }
923
924    /// Per-session symmetric rekey-timer jitter offset (seconds).
925    pub fn rekey_jitter_secs(&self) -> i64 {
926        self.rekey_jitter_secs
927    }
928
929    /// Current K-bit epoch value.
930    pub fn current_k_bit(&self) -> bool {
931        self.current_k_bit
932    }
933
934    /// Whether a rekey is currently in progress.
935    pub fn rekey_in_progress(&self) -> bool {
936        self.rekey_in_progress
937    }
938
939    /// Mark that a rekey has been initiated.
940    pub fn set_rekey_in_progress(&mut self) {
941        self.rekey_in_progress = true;
942    }
943
944    /// Check if rekey initiation is dampened (peer recently sent us msg1).
945    pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
946        match self.last_peer_rekey {
947            Some(t) => t.elapsed().as_secs() < dampening_secs,
948            None => false,
949        }
950    }
951
952    /// Record that the peer initiated a rekey (for dampening).
953    pub fn record_peer_rekey(&mut self) {
954        self.last_peer_rekey = Some(Instant::now());
955    }
956
957    /// Get the pending new session's our_index.
958    pub fn pending_our_index(&self) -> Option<SessionIndex> {
959        self.pending_our_index
960    }
961
962    /// Get the pending new session's their_index.
963    pub fn pending_their_index(&self) -> Option<SessionIndex> {
964        self.pending_their_index
965    }
966
967    /// Get the previous session's our_index (during drain).
968    pub fn previous_our_index(&self) -> Option<SessionIndex> {
969        self.previous_our_index
970    }
971
972    /// Get the previous session for decryption fallback.
973    pub fn previous_session(&self) -> Option<&NoiseSession> {
974        self.previous_session.as_ref()
975    }
976
977    /// Get mutable access to the previous session for decryption.
978    pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
979        self.previous_session.as_mut()
980    }
981
982    /// Get the pending new session (completed rekey, not yet cut over).
983    pub fn pending_new_session(&self) -> Option<&NoiseSession> {
984        self.pending_new_session.as_ref()
985    }
986
987    /// Whether this node should drive the K-bit cutover for the pending FMP rekey.
988    pub fn pending_rekey_initiator(&self) -> bool {
989        self.pending_rekey_initiator
990    }
991
992    /// Whether the locally initiated pending FMP rekey has waited long enough
993    /// to cut over. Responders cut over only after observing the peer's K-bit.
994    pub fn pending_rekey_cutover_due(&self, delay: Duration) -> bool {
995        self.pending_rekey_initiator
996            && self
997                .pending_rekey_completed_at
998                .is_some_and(|completed| completed.elapsed() >= delay)
999    }
1000
1001    /// Store a completed rekey session and its indices.
1002    ///
1003    /// Called when the rekey handshake completes. Initiators cut over after a
1004    /// short grace period; responders hold the session pending until they
1005    /// authenticate the peer's K-bit flip.
1006    pub fn set_pending_session(
1007        &mut self,
1008        session: NoiseSession,
1009        our_index: SessionIndex,
1010        their_index: SessionIndex,
1011        initiated_by_local: bool,
1012    ) {
1013        self.pending_new_session = Some(session);
1014        self.pending_our_index = Some(our_index);
1015        self.pending_their_index = Some(their_index);
1016        self.pending_rekey_initiator = initiated_by_local;
1017        self.pending_rekey_completed_at = Some(Instant::now());
1018        self.rekey_in_progress = false;
1019        // Clear initiator handshake state (index now lives in pending_our_index)
1020        self.rekey_our_index = None;
1021        self.rekey_handshake = None;
1022        self.rekey_msg1 = None;
1023        self.rekey_msg1_next_resend = 0;
1024    }
1025
1026    /// Cut over to the pending new session (initiator side).
1027    ///
1028    /// Moves current session to previous (for drain), promotes pending to current,
1029    /// flips the K-bit. Returns the old our_index that should remain in peers_by_index
1030    /// during the drain window.
1031    pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
1032        let new_session = self.pending_new_session.take()?;
1033        let new_our_index = self.pending_our_index.take();
1034        let new_their_index = self.pending_their_index.take();
1035
1036        // Demote current to previous
1037        self.previous_session = self.noise_session.take();
1038        self.previous_our_index = self.our_index;
1039        self.drain_started = Some(Instant::now());
1040
1041        // Promote pending to current
1042        self.noise_session = Some(new_session);
1043        self.our_index = new_our_index;
1044        self.their_index = new_their_index;
1045        self.pending_rekey_initiator = false;
1046        self.pending_rekey_completed_at = None;
1047
1048        // Flip K-bit and reset timing
1049        self.current_k_bit = !self.current_k_bit;
1050        self.session_established_at = Instant::now();
1051        self.session_start = Instant::now();
1052        self.rekey_in_progress = false;
1053        self.rekey_jitter_secs = draw_rekey_jitter();
1054        self.last_heartbeat_sent = None;
1055        self.reset_replay_suppressed();
1056
1057        // Reset MMP counters to avoid metric discontinuity
1058        let now = Instant::now();
1059        if let Some(mmp) = &mut self.mmp {
1060            mmp.reset_for_rekey(now);
1061        }
1062
1063        self.previous_our_index
1064    }
1065
1066    /// Handle receiving a K-bit flip from the peer (responder side).
1067    ///
1068    /// Promotes pending_new_session to current, demotes current to previous.
1069    /// Returns the old our_index for drain tracking.
1070    pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1071        let new_session = self.pending_new_session.take()?;
1072        let new_our_index = self.pending_our_index.take();
1073        let new_their_index = self.pending_their_index.take();
1074
1075        // Demote current to previous
1076        self.previous_session = self.noise_session.take();
1077        self.previous_our_index = self.our_index;
1078        self.drain_started = Some(Instant::now());
1079
1080        // Promote pending to current
1081        self.noise_session = Some(new_session);
1082        self.our_index = new_our_index;
1083        self.their_index = new_their_index;
1084        self.pending_rekey_initiator = false;
1085        self.pending_rekey_completed_at = None;
1086
1087        // Match peer's K-bit
1088        self.current_k_bit = !self.current_k_bit;
1089        self.session_established_at = Instant::now();
1090        self.session_start = Instant::now();
1091        self.rekey_in_progress = false;
1092        self.rekey_jitter_secs = draw_rekey_jitter();
1093        self.last_heartbeat_sent = None;
1094        self.reset_replay_suppressed();
1095
1096        // Reset MMP counters to avoid metric discontinuity
1097        let now = Instant::now();
1098        if let Some(mmp) = &mut self.mmp {
1099            mmp.reset_for_rekey(now);
1100        }
1101
1102        self.previous_our_index
1103    }
1104
1105    /// Check if the drain window has expired.
1106    pub fn drain_expired(&self, drain_secs: u64) -> bool {
1107        match self.drain_started {
1108            Some(t) => t.elapsed().as_secs() >= drain_secs,
1109            None => false,
1110        }
1111    }
1112
1113    /// Whether a drain is in progress.
1114    pub fn is_draining(&self) -> bool {
1115        self.drain_started.is_some()
1116    }
1117
1118    /// Complete the drain: drop previous session and free its index.
1119    ///
1120    /// Returns the previous our_index so the caller can remove it from
1121    /// peers_by_index and free it from the IndexAllocator.
1122    pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1123        self.previous_session = None;
1124        self.drain_started = None;
1125        self.previous_our_index.take()
1126    }
1127
1128    /// Abandon an in-progress rekey.
1129    ///
1130    /// Returns the rekey our_index so the caller can free it.
1131    /// Also clears any pending session state if the handshake was completed
1132    /// but not yet cut over.
1133    pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1134        self.rekey_handshake = None;
1135        self.rekey_msg1 = None;
1136        self.rekey_msg1_next_resend = 0;
1137        self.rekey_in_progress = false;
1138        // Return whichever index needs freeing
1139        self.rekey_our_index.take().or_else(|| {
1140            self.pending_new_session = None;
1141            self.pending_their_index = None;
1142            self.pending_rekey_initiator = false;
1143            self.pending_rekey_completed_at = None;
1144            self.pending_our_index.take()
1145        })
1146    }
1147
1148    // === Rekey Handshake State (Initiator) ===
1149
1150    /// Store rekey handshake state after sending msg1.
1151    pub fn set_rekey_state(
1152        &mut self,
1153        handshake: NoiseHandshakeState,
1154        our_index: SessionIndex,
1155        wire_msg1: Vec<u8>,
1156        next_resend_ms: u64,
1157    ) {
1158        self.rekey_handshake = Some(handshake);
1159        self.rekey_our_index = Some(our_index);
1160        self.rekey_msg1 = Some(wire_msg1);
1161        self.rekey_msg1_next_resend = next_resend_ms;
1162        self.rekey_in_progress = true;
1163    }
1164
1165    /// Get the rekey our_index (for msg2 dispatch lookup).
1166    pub fn rekey_our_index(&self) -> Option<SessionIndex> {
1167        self.rekey_our_index
1168    }
1169
1170    /// Complete the rekey by processing msg2 (initiator side).
1171    ///
1172    /// Takes the stored handshake state, reads msg2, and returns the
1173    /// completed NoiseSession. Clears the handshake-related fields but
1174    /// leaves rekey_our_index for set_pending_session to use.
1175    pub fn complete_rekey_msg2(
1176        &mut self,
1177        msg2_bytes: &[u8],
1178    ) -> Result<(NoiseSession, Option<[u8; 8]>), NoiseError> {
1179        let mut hs = self
1180            .rekey_handshake
1181            .take()
1182            .ok_or_else(|| NoiseError::WrongState {
1183                expected: "rekey handshake in progress".to_string(),
1184                got: "no handshake state".to_string(),
1185            })?;
1186
1187        hs.read_message_2(msg2_bytes)?;
1188        let remote_epoch = hs.remote_epoch();
1189        let session = hs.into_session()?;
1190
1191        // Clear msg1 resend state
1192        self.rekey_msg1 = None;
1193        self.rekey_msg1_next_resend = 0;
1194
1195        Ok((session, remote_epoch))
1196    }
1197
1198    /// Check if msg1 needs resending.
1199    pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1200        self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1201    }
1202
1203    /// Get msg1 bytes for resend (without consuming).
1204    pub fn rekey_msg1(&self) -> Option<&[u8]> {
1205        self.rekey_msg1.as_deref()
1206    }
1207
1208    /// Update next resend timestamp.
1209    pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
1210        self.rekey_msg1_next_resend = next_ms;
1211    }
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216    use super::*;
1217    use crate::Identity;
1218
1219    fn make_peer_identity() -> PeerIdentity {
1220        let identity = Identity::generate();
1221        PeerIdentity::from_pubkey(identity.pubkey())
1222    }
1223
1224    fn make_node_addr(val: u8) -> NodeAddr {
1225        let mut bytes = [0u8; 16];
1226        bytes[0] = val;
1227        NodeAddr::from_bytes(bytes)
1228    }
1229
1230    fn make_coords(ids: &[u8]) -> TreeCoordinate {
1231        TreeCoordinate::from_addrs(ids.iter().map(|&v| make_node_addr(v)).collect()).unwrap()
1232    }
1233
1234    #[test]
1235    fn test_connectivity_state_properties() {
1236        assert!(ConnectivityState::Connected.can_send());
1237        assert!(ConnectivityState::Stale.can_send());
1238        assert!(!ConnectivityState::Reconnecting.can_send());
1239        assert!(!ConnectivityState::Disconnected.can_send());
1240
1241        assert!(ConnectivityState::Connected.is_healthy());
1242        assert!(!ConnectivityState::Stale.is_healthy());
1243
1244        assert!(ConnectivityState::Disconnected.is_terminal());
1245        assert!(!ConnectivityState::Connected.is_terminal());
1246    }
1247
1248    #[test]
1249    fn test_active_peer_creation() {
1250        let identity = make_peer_identity();
1251        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1252
1253        assert_eq!(peer.identity().node_addr(), identity.node_addr());
1254        assert_eq!(peer.link_id(), LinkId::new(1));
1255        assert!(peer.is_healthy());
1256        assert!(peer.can_send());
1257        assert_eq!(peer.authenticated_at(), 1000);
1258        assert!(peer.needs_filter_update()); // New peers need filter
1259    }
1260
1261    #[test]
1262    fn test_connectivity_transitions() {
1263        let identity = make_peer_identity();
1264        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1265
1266        assert!(peer.is_healthy());
1267
1268        peer.mark_stale();
1269        assert_eq!(peer.connectivity(), ConnectivityState::Stale);
1270        assert!(peer.can_send()); // Stale can still send
1271
1272        // Traffic received brings back to connected
1273        peer.touch(2000);
1274        assert!(peer.is_healthy());
1275
1276        peer.mark_reconnecting();
1277        assert!(!peer.can_send());
1278        peer.touch(2500);
1279        assert_eq!(peer.connectivity(), ConnectivityState::Reconnecting);
1280        assert!(!peer.can_send());
1281
1282        peer.mark_connected(3000);
1283        assert!(peer.is_healthy());
1284
1285        peer.mark_disconnected();
1286        assert!(peer.is_disconnected());
1287        assert!(!peer.can_send());
1288    }
1289
1290    #[test]
1291    fn test_tree_position() {
1292        let identity = make_peer_identity();
1293        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1294
1295        assert!(!peer.has_tree_position());
1296        assert!(peer.coords().is_none());
1297
1298        let node = make_node_addr(1);
1299        let parent = make_node_addr(2);
1300        let decl = ParentDeclaration::new(node, parent, 1, 1000);
1301        let coords = make_coords(&[1, 2, 0]);
1302
1303        peer.update_tree_position(decl, coords, 2000);
1304
1305        assert!(peer.has_tree_position());
1306        assert!(peer.coords().is_some());
1307        assert_eq!(peer.last_seen(), 2000);
1308    }
1309
1310    #[test]
1311    fn test_bloom_filter() {
1312        let identity = make_peer_identity();
1313        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1314        let target = make_node_addr(42);
1315
1316        assert!(!peer.may_reach(&target));
1317        assert!(peer.filter_is_stale(2000, 500));
1318
1319        let mut filter = BloomFilter::new();
1320        filter.insert(&target);
1321        peer.update_filter(filter, 1, 1500);
1322
1323        assert!(peer.may_reach(&target));
1324        assert!(!peer.filter_is_stale(1800, 500));
1325        assert!(peer.filter_is_stale(2500, 500));
1326    }
1327
1328    #[test]
1329    fn test_timing() {
1330        let identity = make_peer_identity();
1331        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1332
1333        assert_eq!(peer.connection_duration(2000), 1000);
1334        assert_eq!(peer.idle_time(2000), 1000);
1335    }
1336
1337    #[test]
1338    fn test_filter_update_flag() {
1339        let identity = make_peer_identity();
1340        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1341
1342        assert!(peer.needs_filter_update()); // New peer
1343
1344        peer.clear_filter_update_needed();
1345        assert!(!peer.needs_filter_update());
1346
1347        peer.mark_filter_update_needed();
1348        assert!(peer.needs_filter_update());
1349    }
1350
1351    #[test]
1352    fn test_with_stats() {
1353        let identity = make_peer_identity();
1354        let mut stats = LinkStats::new();
1355        stats.record_sent(100);
1356        stats.record_recv(200, 500);
1357
1358        let peer = ActivePeer::with_stats(identity, LinkId::new(1), 1000, stats);
1359
1360        assert_eq!(peer.link_stats().packets_sent, 1);
1361        assert_eq!(peer.link_stats().packets_recv, 1);
1362    }
1363
1364    #[test]
1365    fn test_replay_suppression_counter() {
1366        let identity = make_peer_identity();
1367        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1368
1369        // Initial count is zero
1370        assert_eq!(peer.replay_suppressed_count(), 0);
1371
1372        // Increment returns new count
1373        assert_eq!(peer.increment_replay_suppressed(), 1);
1374        assert_eq!(peer.increment_replay_suppressed(), 2);
1375        assert_eq!(peer.increment_replay_suppressed(), 3);
1376        assert_eq!(peer.replay_suppressed_count(), 3);
1377
1378        // Reset returns previous count and zeroes it
1379        assert_eq!(peer.reset_replay_suppressed(), 3);
1380        assert_eq!(peer.replay_suppressed_count(), 0);
1381
1382        // Can increment again after reset
1383        assert_eq!(peer.increment_replay_suppressed(), 1);
1384        assert_eq!(peer.replay_suppressed_count(), 1);
1385
1386        // Reset when zero returns zero
1387        peer.reset_replay_suppressed();
1388        assert_eq!(peer.reset_replay_suppressed(), 0);
1389    }
1390
1391    #[test]
1392    fn test_increment_decrypt_failures_monotonic() {
1393        let identity = make_peer_identity();
1394        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1395
1396        // Initial count is zero
1397        assert_eq!(peer.consecutive_decrypt_failures(), 0);
1398
1399        // Each call returns a strictly increasing count
1400        let mut prev = 0u32;
1401        for expected in 1..=25u32 {
1402            let count = peer.increment_decrypt_failures();
1403            assert_eq!(count, expected, "increment must return monotonic count");
1404            assert!(count > prev, "count must strictly increase");
1405            assert_eq!(peer.consecutive_decrypt_failures(), count);
1406            prev = count;
1407        }
1408    }
1409
1410    #[test]
1411    fn test_reset_decrypt_failures_zeroes_counter() {
1412        let identity = make_peer_identity();
1413        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1414
1415        // Drive counter up
1416        for _ in 0..7 {
1417            peer.increment_decrypt_failures();
1418        }
1419        assert_eq!(peer.consecutive_decrypt_failures(), 7);
1420
1421        // Reset zeroes it
1422        peer.reset_decrypt_failures();
1423        assert_eq!(peer.consecutive_decrypt_failures(), 0);
1424
1425        // Reset on zero is a no-op (still zero, no panic)
1426        peer.reset_decrypt_failures();
1427        assert_eq!(peer.consecutive_decrypt_failures(), 0);
1428
1429        // Counter resumes at 1 after reset
1430        assert_eq!(peer.increment_decrypt_failures(), 1);
1431        assert_eq!(peer.consecutive_decrypt_failures(), 1);
1432    }
1433
1434    #[test]
1435    fn test_rekey_jitter_in_range() {
1436        for _ in 0..100 {
1437            let identity = make_peer_identity();
1438            let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1439            let jitter = peer.rekey_jitter_secs();
1440            assert!(
1441                (-REKEY_JITTER_SECS..=REKEY_JITTER_SECS).contains(&jitter),
1442                "jitter {} outside [-{}, +{}]",
1443                jitter,
1444                REKEY_JITTER_SECS,
1445                REKEY_JITTER_SECS
1446            );
1447        }
1448    }
1449
1450    #[test]
1451    fn test_rekey_jitter_mean_near_zero() {
1452        let mut sum = 0i64;
1453        let n = 200i64;
1454
1455        for _ in 0..n {
1456            let identity = make_peer_identity();
1457            let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1458            sum += peer.rekey_jitter_secs();
1459        }
1460
1461        let mean = sum / n;
1462        assert!(
1463            mean.abs() < 5,
1464            "empirical mean {} not within 5 of 0 over {} samples",
1465            mean,
1466            n
1467        );
1468    }
1469}