Skip to main content

fips_core/peer/
active.rs

1//! Active Peer (Authenticated Phase)
2//!
3//! Represents a fully authenticated peer after successful Noise handshake.
4//! ActivePeer holds tree state, Bloom filter, and routing information.
5
6use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::node::REKEY_JITTER_SECS;
9use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
10use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
11use crate::tree::{ParentDeclaration, TreeCoordinate};
12use crate::utils::index::SessionIndex;
13use crate::{FipsAddress, NodeAddr, PeerIdentity};
14use rand::RngExt;
15use secp256k1::XOnlyPublicKey;
16use std::fmt;
17use std::time::{Duration, Instant};
18
19fn draw_rekey_jitter() -> i64 {
20    rand::rng().random_range(-REKEY_JITTER_SECS..=REKEY_JITTER_SECS)
21}
22
23/// Connectivity state for an active peer.
24///
25/// This is simpler than the full PeerState since authentication is complete.
26#[derive(Clone, Copy, Debug, PartialEq, Eq)]
27pub enum ConnectivityState {
28    /// Peer is fully connected and responsive.
29    Connected,
30    /// Peer hasn't been heard from recently (potential timeout).
31    Stale,
32    /// Connection lost, attempting to reconnect.
33    Reconnecting,
34    /// Peer has been explicitly disconnected.
35    Disconnected,
36}
37
38impl ConnectivityState {
39    /// Check if the peer is usable for sending traffic.
40    pub fn can_send(&self) -> bool {
41        matches!(
42            self,
43            ConnectivityState::Connected | ConnectivityState::Stale
44        )
45    }
46
47    /// Check if this is a terminal state requiring cleanup.
48    pub fn is_terminal(&self) -> bool {
49        matches!(self, ConnectivityState::Disconnected)
50    }
51
52    /// Check if peer is fully healthy.
53    pub fn is_healthy(&self) -> bool {
54        matches!(self, ConnectivityState::Connected)
55    }
56}
57
58impl fmt::Display for ConnectivityState {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        let s = match self {
61            ConnectivityState::Connected => "connected",
62            ConnectivityState::Stale => "stale",
63            ConnectivityState::Reconnecting => "reconnecting",
64            ConnectivityState::Disconnected => "disconnected",
65        };
66        write!(f, "{}", s)
67    }
68}
69
70/// A fully authenticated remote FIPS node.
71///
72/// Created only after successful Noise KK handshake. The identity is
73/// cryptographically verified at this point.
74///
75/// Note: ActivePeer intentionally does not implement Clone because it
76/// contains NoiseSession, which cannot be safely cloned (cloning would
77/// risk nonce reuse, a catastrophic security failure).
78#[derive(Debug)]
79pub struct ActivePeer {
80    // === Identity (Verified) ===
81    /// Cryptographic identity (verified via handshake).
82    identity: PeerIdentity,
83
84    // === Connection ===
85    /// Link used to reach this peer.
86    link_id: LinkId,
87    /// Current connectivity state.
88    connectivity: ConnectivityState,
89
90    // === Session (Wire Protocol) ===
91    /// Noise session for encryption/decryption (None if legacy peer).
92    noise_session: Option<NoiseSession>,
93    /// Our session index (they include this when sending TO us).
94    our_index: Option<SessionIndex>,
95    /// Their session index (we include this when sending TO them).
96    their_index: Option<SessionIndex>,
97    /// Transport ID for this peer's link.
98    transport_id: Option<TransportId>,
99    /// Current transport address (for roaming support).
100    current_addr: Option<TransportAddr>,
101
102    // === Spanning Tree ===
103    /// Their latest parent declaration.
104    declaration: Option<ParentDeclaration>,
105    /// Their path to root.
106    ancestry: Option<TreeCoordinate>,
107
108    // === Tree Announce Rate Limiting ===
109    /// Minimum interval between TreeAnnounce messages (milliseconds).
110    tree_announce_min_interval_ms: u64,
111    /// Last time we sent a TreeAnnounce to this peer (Unix milliseconds).
112    last_tree_announce_sent_ms: u64,
113    /// Whether a tree announce is pending (deferred due to rate limit).
114    pending_tree_announce: bool,
115
116    // === Bloom Filter ===
117    /// What's reachable through them (inbound filter).
118    inbound_filter: Option<BloomFilter>,
119    /// Their filter's sequence number.
120    filter_sequence: u64,
121    /// When we received their last filter (Unix milliseconds).
122    filter_received_at: u64,
123    /// Whether we owe them a filter update.
124    pending_filter_update: bool,
125
126    // === Timing ===
127    /// Session start time for computing session-relative timestamps.
128    /// Used as the epoch for the 4-byte inner header timestamp field.
129    session_start: Instant,
130
131    // === Statistics ===
132    /// Link statistics.
133    link_stats: LinkStats,
134    /// When this peer was authenticated (Unix milliseconds).
135    authenticated_at: u64,
136    /// When this peer was last seen (any activity, Unix milliseconds).
137    last_seen: u64,
138
139    // === Epoch (Restart Detection) ===
140    /// Remote peer's startup epoch (from handshake). Used to detect restarts.
141    remote_epoch: Option<[u8; 8]>,
142
143    // === MMP ===
144    /// Per-peer MMP state (None for legacy peers without Noise sessions).
145    mmp: Option<MmpPeerState>,
146
147    // === Heartbeat ===
148    /// When we last sent a heartbeat to this peer.
149    last_heartbeat_sent: Option<Instant>,
150
151    // === Handshake Resend ===
152    /// Wire-format msg2 for resend on duplicate msg1 (responder only).
153    /// Cleared after the handshake timeout window.
154    handshake_msg2: Option<Vec<u8>>,
155
156    // === Replay Detection Suppression ===
157    /// Number of replay detections suppressed since last session reset.
158    replay_suppressed_count: u32,
159    /// Consecutive decryption failures (reset on any successful decrypt).
160    consecutive_decrypt_failures: u32,
161
162    // === Rekey (Key Rotation) ===
163    /// When the current Noise session was established (for rekey timer).
164    session_established_at: Instant,
165    /// Per-session symmetric jitter applied to the rekey timer trigger.
166    rekey_jitter_secs: i64,
167    /// Current K-bit epoch value (alternates each rekey).
168    current_k_bit: bool,
169    /// Previous session kept alive during drain window after cutover.
170    previous_session: Option<NoiseSession>,
171    /// Previous session's our_index (for peers_by_index cleanup on drain expiry).
172    previous_our_index: Option<SessionIndex>,
173    /// When the drain window started (None = no drain in progress).
174    drain_started: Option<Instant>,
175    /// Pending new session from completed rekey (before K-bit cutover).
176    pending_new_session: Option<NoiseSession>,
177    /// Pending new session's our_index.
178    pending_our_index: Option<SessionIndex>,
179    /// Pending new session's their_index.
180    pending_their_index: Option<SessionIndex>,
181    /// True when this node initiated the pending FMP rekey.
182    pending_rekey_initiator: bool,
183    /// When the pending FMP rekey completed locally.
184    pending_rekey_completed_at: Option<Instant>,
185    /// Whether a rekey is currently in progress (handshake sent, not yet complete).
186    rekey_in_progress: bool,
187    /// When we last received a rekey msg1 from this peer (dampening).
188    last_peer_rekey: Option<Instant>,
189    /// In-progress rekey: Noise handshake state (initiator only).
190    rekey_handshake: Option<NoiseHandshakeState>,
191    /// In-progress rekey: our new session index.
192    rekey_our_index: Option<SessionIndex>,
193    /// In-progress rekey: wire-format msg1 for resend.
194    rekey_msg1: Option<Vec<u8>>,
195    /// In-progress rekey: next resend timestamp (Unix ms).
196    rekey_msg1_next_resend: u64,
197    /// In-progress rekey: number of msg1 retransmissions performed so far.
198    rekey_msg1_resend_count: u32,
199
200    // === Connected Peer UDP Socket (Unix fast path) ===
201    /// Per-peer `connect()`-ed UDP socket, opened once we have a
202    /// stable kernel `SocketAddr` for the peer (i.e. session
203    /// established + transport address known). When `Some`, the
204    /// encrypt-worker send path can `sendmsg(2)` on this fd without
205    /// per-packet `msg_name` — the kernel-side route + neighbor cache
206    /// is pinned by the `connect()` call. On the receive side, Linux
207    /// and Darwin UDP demux preferentially route inbound packets from
208    /// this peer to this socket (most-specific 5-tuple match via
209    /// `SO_REUSEPORT`), so the paired drain thread must keep it empty.
210    ///
211    /// Closed automatically on Drop. Behind an `Arc` so the
212    /// encrypt-worker's send path can hold a refcount without owning
213    /// the only handle (rekey / address-change may rotate the socket
214    /// while older jobs are still in-flight on the worker channel).
215    #[cfg(any(target_os = "linux", target_os = "macos"))]
216    connected_udp:
217        Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,
218
219    /// Per-peer receive-drain thread. Always paired with
220    /// `connected_udp`: while the connected socket is installed, the
221    /// kernel UDP demux preferentially routes inbound packets from
222    /// this peer to it (via SO_REUSEPORT + 5-tuple match), so the
223    /// socket *must* be drained or packets pile up in its kernel
224    /// recv buffer. Drop signals the thread to exit via self-pipe.
225    #[cfg(any(target_os = "linux", target_os = "macos"))]
226    peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
227}
228
229impl ActivePeer {
230    /// Create a new active peer from verified identity.
231    ///
232    /// Called after successful authentication handshake.
233    /// For peers with Noise sessions, use `with_session` instead.
234    pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
235        let now = Instant::now();
236        Self {
237            identity,
238            link_id,
239            connectivity: ConnectivityState::Connected,
240            noise_session: None,
241            our_index: None,
242            their_index: None,
243            transport_id: None,
244            current_addr: None,
245            declaration: None,
246            ancestry: None,
247            tree_announce_min_interval_ms: 500,
248            last_tree_announce_sent_ms: 0,
249            pending_tree_announce: false,
250            inbound_filter: None,
251            filter_sequence: 0,
252            filter_received_at: 0,
253            pending_filter_update: true, // Send filter on new connection
254            session_start: now,
255            link_stats: LinkStats::new(),
256            authenticated_at,
257            last_seen: authenticated_at,
258            remote_epoch: None,
259            mmp: None,
260            last_heartbeat_sent: None,
261            handshake_msg2: None,
262            replay_suppressed_count: 0,
263            consecutive_decrypt_failures: 0,
264            session_established_at: now,
265            rekey_jitter_secs: draw_rekey_jitter(),
266            current_k_bit: false,
267            previous_session: None,
268            previous_our_index: None,
269            drain_started: None,
270            pending_new_session: None,
271            pending_our_index: None,
272            pending_their_index: None,
273            pending_rekey_initiator: false,
274            pending_rekey_completed_at: None,
275            rekey_in_progress: false,
276            last_peer_rekey: None,
277            rekey_handshake: None,
278            rekey_our_index: None,
279            rekey_msg1: None,
280            rekey_msg1_next_resend: 0,
281            rekey_msg1_resend_count: 0,
282            #[cfg(any(target_os = "linux", target_os = "macos"))]
283            connected_udp: None,
284            #[cfg(any(target_os = "linux", target_os = "macos"))]
285            peer_recv_drain: None,
286        }
287    }
288
289    /// Create from verified identity with existing link stats.
290    ///
291    /// Used when promoting from PeerConnection, preserving handshake stats.
292    /// For peers with Noise sessions, use `with_session` instead.
293    pub fn with_stats(
294        identity: PeerIdentity,
295        link_id: LinkId,
296        authenticated_at: u64,
297        link_stats: LinkStats,
298    ) -> Self {
299        let mut peer = Self::new(identity, link_id, authenticated_at);
300        peer.link_stats = link_stats;
301        peer
302    }
303
304    /// Create from verified identity with Noise session and index tracking.
305    ///
306    /// This is the primary constructor for the wire protocol path.
307    /// The NoiseSession provides encryption/decryption and replay protection.
308    #[allow(clippy::too_many_arguments)]
309    pub fn with_session(
310        identity: PeerIdentity,
311        link_id: LinkId,
312        authenticated_at: u64,
313        noise_session: NoiseSession,
314        our_index: SessionIndex,
315        their_index: SessionIndex,
316        transport_id: TransportId,
317        current_addr: TransportAddr,
318        link_stats: LinkStats,
319        is_initiator: bool,
320        mmp_config: &MmpConfig,
321        remote_epoch: Option<[u8; 8]>,
322    ) -> Self {
323        let now = Instant::now();
324        Self {
325            identity,
326            link_id,
327            connectivity: ConnectivityState::Connected,
328            noise_session: Some(noise_session),
329            our_index: Some(our_index),
330            their_index: Some(their_index),
331            transport_id: Some(transport_id),
332            current_addr: Some(current_addr),
333            declaration: None,
334            ancestry: None,
335            tree_announce_min_interval_ms: 500,
336            last_tree_announce_sent_ms: 0,
337            pending_tree_announce: false,
338            inbound_filter: None,
339            filter_sequence: 0,
340            filter_received_at: 0,
341            pending_filter_update: true,
342            session_start: now,
343            link_stats,
344            authenticated_at,
345            last_seen: authenticated_at,
346            remote_epoch,
347            mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
348            last_heartbeat_sent: None,
349            handshake_msg2: None,
350            replay_suppressed_count: 0,
351            consecutive_decrypt_failures: 0,
352            session_established_at: now,
353            rekey_jitter_secs: draw_rekey_jitter(),
354            current_k_bit: false,
355            previous_session: None,
356            previous_our_index: None,
357            drain_started: None,
358            pending_new_session: None,
359            pending_our_index: None,
360            pending_their_index: None,
361            pending_rekey_initiator: false,
362            pending_rekey_completed_at: None,
363            rekey_in_progress: false,
364            last_peer_rekey: None,
365            rekey_handshake: None,
366            rekey_our_index: None,
367            rekey_msg1: None,
368            rekey_msg1_next_resend: 0,
369            rekey_msg1_resend_count: 0,
370            #[cfg(any(target_os = "linux", target_os = "macos"))]
371            connected_udp: None,
372            #[cfg(any(target_os = "linux", target_os = "macos"))]
373            peer_recv_drain: None,
374        }
375    }
376
377    /// Unix UDP fast path: clone the refcount on the per-peer
378    /// `connect()`-ed UDP socket if one has been installed. Encrypt-
379    /// worker send path uses this to bypass the wildcard listen
380    /// socket's per-packet sockaddr handling.
381    #[cfg(any(target_os = "linux", target_os = "macos"))]
382    pub(crate) fn connected_udp(
383        &self,
384    ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
385        self.connected_udp.clone()
386    }
387
388    /// Install a per-peer `connect()`-ed UDP socket **with** its
389    /// paired recv drain thread. The two are owned together: the
390    /// drain thread is the only consumer of packets arriving on this
391    /// socket (Linux UDP demux preferentially routes them away from
392    /// the wildcard listen socket via SO_REUSEPORT 5-tuple match),
393    /// so installing one without the other would silently drop
394    /// inbound packets from this peer.
395    ///
396    /// Replacing an existing pair drops the old drain (its self-pipe
397    /// shutdown signal fires; thread exits within one poll
398    /// iteration) and drops the old socket Arc. Any encrypt-worker
399    /// jobs already in-flight holding the old socket Arc stay valid
400    /// until they complete, at which point the kernel fd closes.
401    #[cfg(any(target_os = "linux", target_os = "macos"))]
402    pub(crate) fn set_connected_udp(
403        &mut self,
404        socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
405        drain: crate::transport::udp::peer_drain::PeerRecvDrain,
406    ) {
407        // Drop order matters: drop the old drain BEFORE the old
408        // socket so the drain thread's last reference to the kernel
409        // fd is released cleanly.
410        self.peer_recv_drain = None;
411        self.connected_udp = None;
412        self.connected_udp = Some(socket);
413        self.peer_recv_drain = Some(drain);
414    }
415
416    /// Clear the per-peer connected UDP socket + drain thread (e.g.
417    /// on rekey or disconnect). The drain thread exits via self-pipe
418    /// signal; the kernel fd closes when the last `Arc` to the
419    /// socket drops.
420    #[cfg(any(target_os = "linux", target_os = "macos"))]
421    pub(crate) fn clear_connected_udp(&mut self) {
422        self.peer_recv_drain = None;
423        self.connected_udp = None;
424    }
425
426    // === Identity Accessors ===
427
428    /// Get the peer's verified identity.
429    pub fn identity(&self) -> &PeerIdentity {
430        &self.identity
431    }
432
433    /// Get the peer's NodeAddr.
434    pub fn node_addr(&self) -> &NodeAddr {
435        self.identity.node_addr()
436    }
437
438    /// Get the peer's FIPS address.
439    pub fn address(&self) -> &FipsAddress {
440        self.identity.address()
441    }
442
443    /// Get the peer's public key.
444    pub fn pubkey(&self) -> XOnlyPublicKey {
445        self.identity.pubkey()
446    }
447
448    /// Get the peer's npub string.
449    pub fn npub(&self) -> String {
450        self.identity.npub()
451    }
452
453    // === Connection Accessors ===
454
455    /// Get the link ID.
456    pub fn link_id(&self) -> LinkId {
457        self.link_id
458    }
459
460    /// Get the connectivity state.
461    pub fn connectivity(&self) -> ConnectivityState {
462        self.connectivity
463    }
464
465    /// Check if peer can receive traffic.
466    pub fn can_send(&self) -> bool {
467        self.connectivity.can_send()
468    }
469
470    /// Check if peer is fully healthy.
471    pub fn is_healthy(&self) -> bool {
472        self.connectivity.is_healthy()
473    }
474
475    /// Check if peer is disconnected.
476    pub fn is_disconnected(&self) -> bool {
477        self.connectivity.is_terminal()
478    }
479
480    // === Session Accessors ===
481
482    /// Check if this peer has a Noise session.
483    pub fn has_session(&self) -> bool {
484        self.noise_session.is_some()
485    }
486
487    /// Get the Noise session, if present.
488    pub fn noise_session(&self) -> Option<&NoiseSession> {
489        self.noise_session.as_ref()
490    }
491
492    /// Get mutable access to the Noise session.
493    pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
494        self.noise_session.as_mut()
495    }
496
497    /// Get our session index (they use this to send TO us).
498    pub fn our_index(&self) -> Option<SessionIndex> {
499        self.our_index
500    }
501
502    /// Get their session index (we use this to send TO them).
503    pub fn their_index(&self) -> Option<SessionIndex> {
504        self.their_index
505    }
506
507    /// Update their session index (used during cross-connection resolution
508    /// when the losing node keeps its inbound session but needs the peer's
509    /// outbound index).
510    pub fn set_their_index(&mut self, index: SessionIndex) {
511        self.their_index = Some(index);
512    }
513
514    /// Replace the Noise session and indices during cross-connection resolution.
515    ///
516    /// When both nodes simultaneously initiate, each promotes its inbound
517    /// handshake first. When the peer's msg2 arrives, we learn the correct
518    /// session — the outbound handshake that pairs with the peer's inbound.
519    /// This replaces the entire session so both nodes use matching keys.
520    ///
521    /// Returns the old our_index so the caller can update peers_by_index.
522    /// Also resets the replay suppression counter since the session changed.
523    pub fn replace_session(
524        &mut self,
525        new_session: NoiseSession,
526        new_our_index: SessionIndex,
527        new_their_index: SessionIndex,
528    ) -> Option<SessionIndex> {
529        self.reset_replay_suppressed();
530        let old_our_index = self.our_index;
531        self.noise_session = Some(new_session);
532        self.our_index = Some(new_our_index);
533        self.their_index = Some(new_their_index);
534        old_our_index
535    }
536
537    /// Get the transport ID for this peer.
538    pub fn transport_id(&self) -> Option<TransportId> {
539        self.transport_id
540    }
541
542    /// Get the current transport address.
543    pub fn current_addr(&self) -> Option<&TransportAddr> {
544        self.current_addr.as_ref()
545    }
546
547    /// Update the current address (for roaming support).
548    ///
549    /// Called when we receive a valid authenticated packet from a new address.
550    /// Short-circuits when neither the transport_id nor the TransportAddr
551    /// bytes changed — at multi-Gbps the same peer's source 4-tuple is
552    /// stable per session and the overwhelming majority of inbound
553    /// packets hit this fast path. Saves both the redundant
554    /// `Option::take` + Vec drop on the cached side and the caller's
555    /// `.clone()` allocation on the input side: the caller can pass
556    /// `&TransportAddr` and we only `.to_owned()` when storing.
557    ///
558    /// Returns `true` iff the stored `(transport_id, current_addr)` pair
559    /// actually changed. The caller uses this signal to invalidate
560    /// derived caches whose validity is bound to the peer's 5-tuple —
561    /// in particular the Linux per-peer `connect()`-ed UDP socket,
562    /// which is pinned to one kernel route + neighbour entry and goes
563    /// stale the moment the peer roams. (Clearing it here would force
564    /// `&mut self` users into the wrong shape: the policy of when to
565    /// rebuild the connected socket lives on `Node`, not on the peer
566    /// state. Returning a bool keeps that policy where it belongs.)
567    pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
568        if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
569            return false;
570        }
571        self.transport_id = Some(transport_id);
572        self.current_addr = Some(addr.clone());
573        true
574    }
575
576    // === Handshake Resend ===
577
578    /// Store wire-format msg2 for resend on duplicate msg1.
579    pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
580        self.handshake_msg2 = Some(msg2);
581    }
582
583    /// Get stored msg2 bytes for resend.
584    pub fn handshake_msg2(&self) -> Option<&[u8]> {
585        self.handshake_msg2.as_deref()
586    }
587
588    /// Clear stored msg2 (no longer needed after handshake window).
589    pub fn clear_handshake_msg2(&mut self) {
590        self.handshake_msg2 = None;
591    }
592
593    // === Replay Detection Suppression ===
594
595    /// Increment replay suppression counter. Returns the new count.
596    pub fn increment_replay_suppressed(&mut self) -> u32 {
597        self.replay_suppressed_count += 1;
598        self.replay_suppressed_count
599    }
600
601    /// Reset replay suppression counter, returning previous count.
602    pub fn reset_replay_suppressed(&mut self) -> u32 {
603        let count = self.replay_suppressed_count;
604        self.replay_suppressed_count = 0;
605        count
606    }
607
608    /// Current replay suppression count.
609    pub fn replay_suppressed_count(&self) -> u32 {
610        self.replay_suppressed_count
611    }
612
613    // === Decryption Failure Tracking ===
614
615    /// Increment consecutive decryption failure counter, returning new count.
616    pub fn increment_decrypt_failures(&mut self) -> u32 {
617        self.consecutive_decrypt_failures += 1;
618        self.consecutive_decrypt_failures
619    }
620
621    /// Reset consecutive decryption failure counter.
622    pub fn reset_decrypt_failures(&mut self) {
623        self.consecutive_decrypt_failures = 0;
624    }
625
626    /// Current consecutive decryption failure count.
627    pub fn consecutive_decrypt_failures(&self) -> u32 {
628        self.consecutive_decrypt_failures
629    }
630
631    // === Epoch Accessors ===
632
633    /// Get the remote peer's startup epoch (from handshake).
634    pub fn remote_epoch(&self) -> Option<[u8; 8]> {
635        self.remote_epoch
636    }
637
638    /// Update the remote peer's startup epoch after a successful in-place
639    /// rekey. Initial handshakes set this through `with_session`, but recovery
640    /// rekeys also exchange epochs and must keep restart detection current.
641    pub(crate) fn set_remote_epoch(&mut self, remote_epoch: Option<[u8; 8]>) {
642        self.remote_epoch = remote_epoch;
643    }
644
645    // === Tree Accessors ===
646
647    /// Get the peer's tree coordinates, if known.
648    pub fn coords(&self) -> Option<&TreeCoordinate> {
649        self.ancestry.as_ref()
650    }
651
652    /// Get the peer's parent declaration, if known.
653    pub fn declaration(&self) -> Option<&ParentDeclaration> {
654        self.declaration.as_ref()
655    }
656
657    /// Check if this peer has a known tree position.
658    pub fn has_tree_position(&self) -> bool {
659        self.declaration.is_some() && self.ancestry.is_some()
660    }
661
662    // === Filter Accessors ===
663
664    /// Get the peer's inbound filter, if known.
665    pub fn inbound_filter(&self) -> Option<&BloomFilter> {
666        self.inbound_filter.as_ref()
667    }
668
669    /// Get the filter sequence number.
670    pub fn filter_sequence(&self) -> u64 {
671        self.filter_sequence
672    }
673
674    /// Check if this peer's filter is stale.
675    pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
676        if self.filter_received_at == 0 {
677            return true;
678        }
679        current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
680    }
681
682    /// Check if a destination might be reachable through this peer.
683    pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
684        match &self.inbound_filter {
685            Some(filter) => filter.contains(node_addr),
686            None => false,
687        }
688    }
689
690    /// Check if we need to send this peer a filter update.
691    pub fn needs_filter_update(&self) -> bool {
692        self.pending_filter_update
693    }
694
695    // === Statistics Accessors ===
696
697    /// Get link statistics.
698    pub fn link_stats(&self) -> &LinkStats {
699        &self.link_stats
700    }
701
702    /// Get mutable link statistics.
703    pub fn link_stats_mut(&mut self) -> &mut LinkStats {
704        &mut self.link_stats
705    }
706
707    // === MMP Accessors ===
708
709    /// Get MMP state (None for legacy peers without sessions).
710    pub fn mmp(&self) -> Option<&MmpPeerState> {
711        self.mmp.as_ref()
712    }
713
714    /// Get mutable MMP state.
715    pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
716        self.mmp.as_mut()
717    }
718
719    /// Link cost for routing decisions.
720    ///
721    /// Returns a scalar cost where lower is better (1.0 = ideal).
722    /// Computed as RTT-weighted ETX: `etx * (1.0 + srtt_ms / 100.0)`.
723    ///
724    /// Returns 1.0 (optimistic default) when MMP metrics are not yet
725    /// available, matching depth-only parent selection behavior.
726    pub fn link_cost(&self) -> f64 {
727        match self.mmp() {
728            Some(mmp) => {
729                let etx = mmp.metrics.etx;
730                match mmp.metrics.srtt_ms() {
731                    Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
732                    None => 1.0,
733                }
734            }
735            None => 1.0,
736        }
737    }
738
739    /// Whether this peer has at least one MMP RTT measurement.
740    pub fn has_srtt(&self) -> bool {
741        self.mmp()
742            .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
743    }
744
745    /// When this peer was authenticated.
746    pub fn authenticated_at(&self) -> u64 {
747        self.authenticated_at
748    }
749
750    /// When this peer was last seen.
751    pub fn last_seen(&self) -> u64 {
752        self.last_seen
753    }
754
755    /// Time since last activity.
756    pub fn idle_time(&self, current_time_ms: u64) -> u64 {
757        current_time_ms.saturating_sub(self.last_seen)
758    }
759
760    /// Connection duration since authentication.
761    pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
762        current_time_ms.saturating_sub(self.authenticated_at)
763    }
764
765    /// Session-relative elapsed time in milliseconds (for inner header timestamp).
766    ///
767    /// Returns milliseconds since session establishment, truncated to u32.
768    /// Wraps at ~49.7 days which is acceptable for session-relative timing.
769    pub fn session_elapsed_ms(&self) -> u32 {
770        self.session_start.elapsed().as_millis() as u32
771    }
772
773    /// When this peer's session started (for link-dead fallback timing).
774    pub fn session_start(&self) -> Instant {
775        self.session_start
776    }
777
778    // === Heartbeat ===
779
780    /// When we last sent a heartbeat to this peer.
781    pub fn last_heartbeat_sent(&self) -> Option<Instant> {
782        self.last_heartbeat_sent
783    }
784
785    /// Record that we sent a heartbeat.
786    pub fn mark_heartbeat_sent(&mut self, now: Instant) {
787        self.last_heartbeat_sent = Some(now);
788    }
789
790    // === State Updates ===
791
792    /// Update last seen timestamp.
793    pub fn touch(&mut self, current_time_ms: u64) {
794        self.last_seen = current_time_ms;
795        // Stale links are still sendable, so authenticated traffic refreshes
796        // them. Reconnecting links were declared link-dead and need a fresh
797        // handshake/reprobe before they can carry traffic again.
798        if self.connectivity == ConnectivityState::Stale {
799            self.connectivity = ConnectivityState::Connected;
800        }
801    }
802
803    /// Mark peer as stale (no recent traffic).
804    pub fn mark_stale(&mut self) {
805        if self.connectivity == ConnectivityState::Connected {
806            self.connectivity = ConnectivityState::Stale;
807        }
808    }
809
810    /// Mark peer as reconnecting.
811    pub fn mark_reconnecting(&mut self) {
812        self.connectivity = ConnectivityState::Reconnecting;
813    }
814
815    /// Mark peer as disconnected.
816    pub fn mark_disconnected(&mut self) {
817        self.connectivity = ConnectivityState::Disconnected;
818    }
819
820    /// Mark peer as connected (e.g., after successful reconnect).
821    pub fn mark_connected(&mut self, current_time_ms: u64) {
822        self.connectivity = ConnectivityState::Connected;
823        self.last_seen = current_time_ms;
824    }
825
826    /// Update the link ID (e.g., on reconnect).
827    pub fn set_link_id(&mut self, link_id: LinkId) {
828        self.link_id = link_id;
829    }
830
831    // === Tree Updates ===
832
833    /// Update peer's tree position.
834    pub fn update_tree_position(
835        &mut self,
836        declaration: ParentDeclaration,
837        ancestry: TreeCoordinate,
838        current_time_ms: u64,
839    ) {
840        self.declaration = Some(declaration);
841        self.ancestry = Some(ancestry);
842        self.last_seen = current_time_ms;
843    }
844
845    /// Clear peer's tree position.
846    pub fn clear_tree_position(&mut self) {
847        self.declaration = None;
848        self.ancestry = None;
849    }
850
851    // === Tree Announce Rate Limiting ===
852
853    /// Set the minimum interval between TreeAnnounce messages (milliseconds).
854    pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
855        self.tree_announce_min_interval_ms = ms;
856    }
857
858    /// Get the last tree announce send timestamp (for carrying across reconnection).
859    pub fn last_tree_announce_sent_ms(&self) -> u64 {
860        self.last_tree_announce_sent_ms
861    }
862
863    /// Set the last tree announce send timestamp (to preserve rate limit across reconnection).
864    pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
865        self.last_tree_announce_sent_ms = ms;
866    }
867
868    /// Check if we can send a TreeAnnounce now (rate limiting).
869    pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
870        now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
871    }
872
873    /// Record that we sent a TreeAnnounce to this peer.
874    pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
875        self.last_tree_announce_sent_ms = now_ms;
876        self.pending_tree_announce = false;
877    }
878
879    /// Mark that a tree announce is pending (deferred due to rate limit).
880    pub fn mark_tree_announce_pending(&mut self) {
881        self.pending_tree_announce = true;
882    }
883
884    /// Check if a deferred tree announce is waiting to be sent.
885    pub fn has_pending_tree_announce(&self) -> bool {
886        self.pending_tree_announce
887    }
888
889    // === Filter Updates ===
890
891    /// Update peer's inbound filter.
892    pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
893        self.inbound_filter = Some(filter);
894        self.filter_sequence = sequence;
895        self.filter_received_at = current_time_ms;
896        self.last_seen = current_time_ms;
897    }
898
899    /// Clear peer's inbound filter.
900    pub fn clear_filter(&mut self) {
901        self.inbound_filter = None;
902        self.filter_sequence = 0;
903        self.filter_received_at = 0;
904    }
905
906    /// Mark that we need to send this peer a filter update.
907    pub fn mark_filter_update_needed(&mut self) {
908        self.pending_filter_update = true;
909    }
910
911    /// Clear the pending filter update flag.
912    pub fn clear_filter_update_needed(&mut self) {
913        self.pending_filter_update = false;
914    }
915
916    // === Rekey (Key Rotation) ===
917
918    /// When the current Noise session was established.
919    pub fn session_established_at(&self) -> Instant {
920        self.session_established_at
921    }
922
923    #[cfg(test)]
924    pub(crate) fn set_session_established_at_for_test(&mut self, instant: Instant) {
925        self.session_established_at = instant;
926    }
927
928    /// Per-session symmetric rekey-timer jitter offset (seconds).
929    pub fn rekey_jitter_secs(&self) -> i64 {
930        self.rekey_jitter_secs
931    }
932
933    /// Current K-bit epoch value.
934    pub fn current_k_bit(&self) -> bool {
935        self.current_k_bit
936    }
937
938    /// Whether a rekey is currently in progress.
939    pub fn rekey_in_progress(&self) -> bool {
940        self.rekey_in_progress
941    }
942
943    /// Mark that a rekey has been initiated.
944    pub fn set_rekey_in_progress(&mut self) {
945        self.rekey_in_progress = true;
946    }
947
948    /// Check if rekey initiation is dampened (peer recently sent us msg1).
949    pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
950        match self.last_peer_rekey {
951            Some(t) => t.elapsed().as_secs() < dampening_secs,
952            None => false,
953        }
954    }
955
956    /// Record that the peer initiated a rekey (for dampening).
957    pub fn record_peer_rekey(&mut self) {
958        self.last_peer_rekey = Some(Instant::now());
959    }
960
961    /// Get the pending new session's our_index.
962    pub fn pending_our_index(&self) -> Option<SessionIndex> {
963        self.pending_our_index
964    }
965
966    /// Get the pending new session's their_index.
967    pub fn pending_their_index(&self) -> Option<SessionIndex> {
968        self.pending_their_index
969    }
970
971    /// Get the previous session's our_index (during drain).
972    pub fn previous_our_index(&self) -> Option<SessionIndex> {
973        self.previous_our_index
974    }
975
976    /// Get the previous session for decryption fallback.
977    pub fn previous_session(&self) -> Option<&NoiseSession> {
978        self.previous_session.as_ref()
979    }
980
981    /// Get mutable access to the previous session for decryption.
982    pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
983        self.previous_session.as_mut()
984    }
985
986    /// Get the pending new session (completed rekey, not yet cut over).
987    pub fn pending_new_session(&self) -> Option<&NoiseSession> {
988        self.pending_new_session.as_ref()
989    }
990
991    /// Trial-decrypt a peer K-bit flip frame against the pending FMP session.
992    ///
993    /// The peer's K-bit is only a hint that it may have cut over. The
994    /// authenticated decrypt against `pending_new_session` is the real cutover
995    /// signal; failed trials leave the pending replay window untouched.
996    pub(crate) fn trial_decrypt_pending_new_session(
997        &mut self,
998        ciphertext: &[u8],
999        counter: u64,
1000        aad: &[u8],
1001    ) -> Option<Vec<u8>> {
1002        self.pending_new_session.as_mut().and_then(|session| {
1003            session
1004                .decrypt_with_replay_check_and_aad(ciphertext, counter, aad)
1005                .ok()
1006        })
1007    }
1008
1009    /// Whether this node should drive the K-bit cutover for the pending FMP rekey.
1010    pub fn pending_rekey_initiator(&self) -> bool {
1011        self.pending_rekey_initiator
1012    }
1013
1014    /// Whether the locally initiated pending FMP rekey has waited long enough
1015    /// to cut over. Responders cut over only after observing the peer's K-bit.
1016    pub fn pending_rekey_cutover_due(&self, delay: Duration) -> bool {
1017        self.pending_rekey_initiator
1018            && self
1019                .pending_rekey_completed_at
1020                .is_some_and(|completed| completed.elapsed() >= delay)
1021    }
1022
1023    /// Store a completed rekey session and its indices.
1024    ///
1025    /// Called when the rekey handshake completes. Initiators cut over after a
1026    /// short grace period; responders hold the session pending until they
1027    /// authenticate the peer's K-bit flip.
1028    pub fn set_pending_session(
1029        &mut self,
1030        session: NoiseSession,
1031        our_index: SessionIndex,
1032        their_index: SessionIndex,
1033        initiated_by_local: bool,
1034    ) {
1035        self.pending_new_session = Some(session);
1036        self.pending_our_index = Some(our_index);
1037        self.pending_their_index = Some(their_index);
1038        self.pending_rekey_initiator = initiated_by_local;
1039        self.pending_rekey_completed_at = Some(Instant::now());
1040        self.rekey_in_progress = false;
1041        // Clear initiator handshake state (index now lives in pending_our_index)
1042        self.rekey_our_index = None;
1043        self.rekey_handshake = None;
1044        self.rekey_msg1 = None;
1045        self.rekey_msg1_next_resend = 0;
1046        self.rekey_msg1_resend_count = 0;
1047    }
1048
1049    /// Cut over to the pending new session (initiator side).
1050    ///
1051    /// Moves current session to previous (for drain), promotes pending to current,
1052    /// flips the K-bit. Returns the old our_index that should remain in peers_by_index
1053    /// during the drain window.
1054    pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
1055        let new_session = self.pending_new_session.take()?;
1056        let new_our_index = self.pending_our_index.take();
1057        let new_their_index = self.pending_their_index.take();
1058
1059        // Demote current to previous
1060        self.previous_session = self.noise_session.take();
1061        self.previous_our_index = self.our_index;
1062        self.drain_started = Some(Instant::now());
1063
1064        // Promote pending to current
1065        self.noise_session = Some(new_session);
1066        self.our_index = new_our_index;
1067        self.their_index = new_their_index;
1068        self.pending_rekey_initiator = false;
1069        self.pending_rekey_completed_at = None;
1070
1071        // Flip K-bit and reset timing
1072        self.current_k_bit = !self.current_k_bit;
1073        self.session_established_at = Instant::now();
1074        self.session_start = Instant::now();
1075        self.rekey_in_progress = false;
1076        self.rekey_msg1_resend_count = 0;
1077        self.rekey_jitter_secs = draw_rekey_jitter();
1078        self.last_heartbeat_sent = None;
1079        self.reset_replay_suppressed();
1080
1081        // Reset MMP counters to avoid metric discontinuity
1082        let now = Instant::now();
1083        if let Some(mmp) = &mut self.mmp {
1084            mmp.reset_for_rekey(now);
1085        }
1086
1087        self.previous_our_index
1088    }
1089
1090    /// Handle receiving a K-bit flip from the peer (responder side).
1091    ///
1092    /// Promotes pending_new_session to current, demotes current to previous.
1093    /// Returns the old our_index for drain tracking.
1094    pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1095        let new_session = self.pending_new_session.take()?;
1096        let new_our_index = self.pending_our_index.take();
1097        let new_their_index = self.pending_their_index.take();
1098
1099        // Demote current to previous
1100        self.previous_session = self.noise_session.take();
1101        self.previous_our_index = self.our_index;
1102        self.drain_started = Some(Instant::now());
1103
1104        // Promote pending to current
1105        self.noise_session = Some(new_session);
1106        self.our_index = new_our_index;
1107        self.their_index = new_their_index;
1108        self.pending_rekey_initiator = false;
1109        self.pending_rekey_completed_at = None;
1110
1111        // Match peer's K-bit
1112        self.current_k_bit = !self.current_k_bit;
1113        self.session_established_at = Instant::now();
1114        self.session_start = Instant::now();
1115        self.rekey_in_progress = false;
1116        self.rekey_msg1_resend_count = 0;
1117        self.rekey_jitter_secs = draw_rekey_jitter();
1118        self.last_heartbeat_sent = None;
1119        self.reset_replay_suppressed();
1120
1121        // Reset MMP counters to avoid metric discontinuity
1122        let now = Instant::now();
1123        if let Some(mmp) = &mut self.mmp {
1124            mmp.reset_for_rekey(now);
1125        }
1126
1127        self.previous_our_index
1128    }
1129
1130    /// Check if the drain window has expired.
1131    pub fn drain_expired(&self, drain_secs: u64) -> bool {
1132        match self.drain_started {
1133            Some(t) => t.elapsed().as_secs() >= drain_secs,
1134            None => false,
1135        }
1136    }
1137
1138    /// Whether a drain is in progress.
1139    pub fn is_draining(&self) -> bool {
1140        self.drain_started.is_some()
1141    }
1142
1143    /// Complete the drain: drop previous session and free its index.
1144    ///
1145    /// Returns the previous our_index so the caller can remove it from
1146    /// peers_by_index and free it from the IndexAllocator.
1147    pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1148        self.previous_session = None;
1149        self.drain_started = None;
1150        self.previous_our_index.take()
1151    }
1152
1153    /// Abandon an in-progress rekey.
1154    ///
1155    /// Returns the rekey our_index so the caller can free it.
1156    /// Also clears any pending session state if the handshake was completed
1157    /// but not yet cut over.
1158    pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1159        self.rekey_handshake = None;
1160        self.rekey_msg1 = None;
1161        self.rekey_msg1_next_resend = 0;
1162        self.rekey_msg1_resend_count = 0;
1163        self.rekey_in_progress = false;
1164        // Return whichever index needs freeing
1165        self.rekey_our_index.take().or_else(|| {
1166            self.pending_new_session = None;
1167            self.pending_their_index = None;
1168            self.pending_rekey_initiator = false;
1169            self.pending_rekey_completed_at = None;
1170            self.pending_our_index.take()
1171        })
1172    }
1173
1174    // === Rekey Handshake State (Initiator) ===
1175
1176    /// Store rekey handshake state after sending msg1.
1177    pub fn set_rekey_state(
1178        &mut self,
1179        handshake: NoiseHandshakeState,
1180        our_index: SessionIndex,
1181        wire_msg1: Vec<u8>,
1182        next_resend_ms: u64,
1183    ) {
1184        self.rekey_handshake = Some(handshake);
1185        self.rekey_our_index = Some(our_index);
1186        self.rekey_msg1 = Some(wire_msg1);
1187        self.rekey_msg1_next_resend = next_resend_ms;
1188        self.rekey_msg1_resend_count = 0;
1189        self.rekey_in_progress = true;
1190    }
1191
1192    /// Get the rekey our_index (for msg2 dispatch lookup).
1193    pub fn rekey_our_index(&self) -> Option<SessionIndex> {
1194        self.rekey_our_index
1195    }
1196
1197    /// Complete the rekey by processing msg2 (initiator side).
1198    ///
1199    /// Takes the stored handshake state, reads msg2, and returns the
1200    /// completed NoiseSession. Clears the handshake-related fields but
1201    /// leaves rekey_our_index for set_pending_session to use.
1202    pub fn complete_rekey_msg2(
1203        &mut self,
1204        msg2_bytes: &[u8],
1205    ) -> Result<(NoiseSession, Option<[u8; 8]>), NoiseError> {
1206        let mut hs = self
1207            .rekey_handshake
1208            .take()
1209            .ok_or_else(|| NoiseError::WrongState {
1210                expected: "rekey handshake in progress".to_string(),
1211                got: "no handshake state".to_string(),
1212            })?;
1213
1214        hs.read_message_2(msg2_bytes)?;
1215        let remote_epoch = hs.remote_epoch();
1216        let session = hs.into_session()?;
1217
1218        // Clear msg1 resend state
1219        self.rekey_msg1 = None;
1220        self.rekey_msg1_next_resend = 0;
1221        self.rekey_msg1_resend_count = 0;
1222
1223        Ok((session, remote_epoch))
1224    }
1225
1226    /// Check if msg1 needs resending.
1227    pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1228        self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1229    }
1230
1231    /// Get msg1 bytes for resend (without consuming).
1232    pub fn rekey_msg1(&self) -> Option<&[u8]> {
1233        self.rekey_msg1.as_deref()
1234    }
1235
1236    /// Update next resend timestamp.
1237    pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
1238        self.rekey_msg1_next_resend = next_ms;
1239    }
1240
1241    /// Number of rekey msg1 retransmissions performed so far.
1242    pub fn rekey_msg1_resend_count(&self) -> u32 {
1243        self.rekey_msg1_resend_count
1244    }
1245
1246    /// Record a rekey msg1 retransmission and schedule the next one.
1247    pub fn record_rekey_msg1_resend(&mut self, next_ms: u64) {
1248        self.rekey_msg1_resend_count = self.rekey_msg1_resend_count.saturating_add(1);
1249        self.rekey_msg1_next_resend = next_ms;
1250    }
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255    use super::*;
1256    use crate::Identity;
1257
1258    fn make_peer_identity() -> PeerIdentity {
1259        let identity = Identity::generate();
1260        PeerIdentity::from_pubkey(identity.pubkey())
1261    }
1262
1263    fn make_node_addr(val: u8) -> NodeAddr {
1264        let mut bytes = [0u8; 16];
1265        bytes[0] = val;
1266        NodeAddr::from_bytes(bytes)
1267    }
1268
1269    fn make_coords(ids: &[u8]) -> TreeCoordinate {
1270        TreeCoordinate::from_addrs(ids.iter().map(|&v| make_node_addr(v)).collect()).unwrap()
1271    }
1272
1273    fn ik_session_pair() -> (NoiseSession, NoiseSession) {
1274        let initiator_id = Identity::generate();
1275        let responder_id = Identity::generate();
1276        let mut initiator =
1277            NoiseHandshakeState::new_initiator(initiator_id.keypair(), responder_id.pubkey_full());
1278        let mut responder = NoiseHandshakeState::new_responder(responder_id.keypair());
1279        initiator.set_local_epoch([0xA1, 0xB2, 0xC3, 0xD4, 0x11, 0x22, 0x33, 0x44]);
1280        responder.set_local_epoch([0xD4, 0xC3, 0xB2, 0xA1, 0x44, 0x33, 0x22, 0x11]);
1281
1282        let msg1 = initiator.write_message_1().unwrap();
1283        responder.read_message_1(&msg1).unwrap();
1284        let msg2 = responder.write_message_2().unwrap();
1285        initiator.read_message_2(&msg2).unwrap();
1286
1287        (
1288            initiator.into_session().unwrap(),
1289            responder.into_session().unwrap(),
1290        )
1291    }
1292
1293    fn seal_fmp(
1294        sender: &mut NoiseSession,
1295        receiver_idx: SessionIndex,
1296        plaintext: &[u8],
1297        k_bit: bool,
1298    ) -> (Vec<u8>, u64, [u8; 16]) {
1299        use crate::node::wire::{FLAG_KEY_EPOCH, build_established_header};
1300
1301        let counter = sender.current_send_counter();
1302        let flags = if k_bit { FLAG_KEY_EPOCH } else { 0 };
1303        let header = build_established_header(receiver_idx, counter, flags, plaintext.len() as u16);
1304        let ciphertext = sender.encrypt_with_aad(plaintext, &header).unwrap();
1305        (ciphertext, counter, header)
1306    }
1307
1308    fn peer_with_current(current_recv: NoiseSession) -> ActivePeer {
1309        let identity = make_peer_identity();
1310        ActivePeer::with_session(
1311            identity,
1312            LinkId::new(1),
1313            1_000,
1314            current_recv,
1315            SessionIndex::new(1),
1316            SessionIndex::new(2),
1317            TransportId::new(1),
1318            TransportAddr::from_string("hci0/AA:BB:CC:DD:EE:01"),
1319            LinkStats::new(),
1320            true,
1321            &MmpConfig::default(),
1322            None,
1323        )
1324    }
1325
1326    #[test]
1327    fn test_connectivity_state_properties() {
1328        assert!(ConnectivityState::Connected.can_send());
1329        assert!(ConnectivityState::Stale.can_send());
1330        assert!(!ConnectivityState::Reconnecting.can_send());
1331        assert!(!ConnectivityState::Disconnected.can_send());
1332
1333        assert!(ConnectivityState::Connected.is_healthy());
1334        assert!(!ConnectivityState::Stale.is_healthy());
1335
1336        assert!(ConnectivityState::Disconnected.is_terminal());
1337        assert!(!ConnectivityState::Connected.is_terminal());
1338    }
1339
1340    #[test]
1341    fn test_active_peer_creation() {
1342        let identity = make_peer_identity();
1343        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1344
1345        assert_eq!(peer.identity().node_addr(), identity.node_addr());
1346        assert_eq!(peer.link_id(), LinkId::new(1));
1347        assert!(peer.is_healthy());
1348        assert!(peer.can_send());
1349        assert_eq!(peer.authenticated_at(), 1000);
1350        assert!(peer.needs_filter_update()); // New peers need filter
1351    }
1352
1353    #[test]
1354    fn test_connectivity_transitions() {
1355        let identity = make_peer_identity();
1356        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1357
1358        assert!(peer.is_healthy());
1359
1360        peer.mark_stale();
1361        assert_eq!(peer.connectivity(), ConnectivityState::Stale);
1362        assert!(peer.can_send()); // Stale can still send
1363
1364        // Traffic received brings back to connected
1365        peer.touch(2000);
1366        assert!(peer.is_healthy());
1367
1368        peer.mark_reconnecting();
1369        assert!(!peer.can_send());
1370        peer.touch(2500);
1371        assert_eq!(peer.connectivity(), ConnectivityState::Reconnecting);
1372        assert!(!peer.can_send());
1373
1374        peer.mark_connected(3000);
1375        assert!(peer.is_healthy());
1376
1377        peer.mark_disconnected();
1378        assert!(peer.is_disconnected());
1379        assert!(!peer.can_send());
1380    }
1381
1382    #[test]
1383    fn test_tree_position() {
1384        let identity = make_peer_identity();
1385        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1386
1387        assert!(!peer.has_tree_position());
1388        assert!(peer.coords().is_none());
1389
1390        let node = make_node_addr(1);
1391        let parent = make_node_addr(2);
1392        let decl = ParentDeclaration::new(node, parent, 1, 1000);
1393        let coords = make_coords(&[1, 2, 0]);
1394
1395        peer.update_tree_position(decl, coords, 2000);
1396
1397        assert!(peer.has_tree_position());
1398        assert!(peer.coords().is_some());
1399        assert_eq!(peer.last_seen(), 2000);
1400    }
1401
1402    #[test]
1403    fn test_bloom_filter() {
1404        let identity = make_peer_identity();
1405        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1406        let target = make_node_addr(42);
1407
1408        assert!(!peer.may_reach(&target));
1409        assert!(peer.filter_is_stale(2000, 500));
1410
1411        let mut filter = BloomFilter::new();
1412        filter.insert(&target);
1413        peer.update_filter(filter, 1, 1500);
1414
1415        assert!(peer.may_reach(&target));
1416        assert!(!peer.filter_is_stale(1800, 500));
1417        assert!(peer.filter_is_stale(2500, 500));
1418    }
1419
1420    #[test]
1421    fn test_timing() {
1422        let identity = make_peer_identity();
1423        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1424
1425        assert_eq!(peer.connection_duration(2000), 1000);
1426        assert_eq!(peer.idle_time(2000), 1000);
1427    }
1428
1429    #[test]
1430    fn test_filter_update_flag() {
1431        let identity = make_peer_identity();
1432        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1433
1434        assert!(peer.needs_filter_update()); // New peer
1435
1436        peer.clear_filter_update_needed();
1437        assert!(!peer.needs_filter_update());
1438
1439        peer.mark_filter_update_needed();
1440        assert!(peer.needs_filter_update());
1441    }
1442
1443    #[test]
1444    fn test_with_stats() {
1445        let identity = make_peer_identity();
1446        let mut stats = LinkStats::new();
1447        stats.record_sent(100);
1448        stats.record_recv(200, 500);
1449
1450        let peer = ActivePeer::with_stats(identity, LinkId::new(1), 1000, stats);
1451
1452        assert_eq!(peer.link_stats().packets_sent, 1);
1453        assert_eq!(peer.link_stats().packets_recv, 1);
1454    }
1455
1456    #[test]
1457    fn test_replay_suppression_counter() {
1458        let identity = make_peer_identity();
1459        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1460
1461        // Initial count is zero
1462        assert_eq!(peer.replay_suppressed_count(), 0);
1463
1464        // Increment returns new count
1465        assert_eq!(peer.increment_replay_suppressed(), 1);
1466        assert_eq!(peer.increment_replay_suppressed(), 2);
1467        assert_eq!(peer.increment_replay_suppressed(), 3);
1468        assert_eq!(peer.replay_suppressed_count(), 3);
1469
1470        // Reset returns previous count and zeroes it
1471        assert_eq!(peer.reset_replay_suppressed(), 3);
1472        assert_eq!(peer.replay_suppressed_count(), 0);
1473
1474        // Can increment again after reset
1475        assert_eq!(peer.increment_replay_suppressed(), 1);
1476        assert_eq!(peer.replay_suppressed_count(), 1);
1477
1478        // Reset when zero returns zero
1479        peer.reset_replay_suppressed();
1480        assert_eq!(peer.reset_replay_suppressed(), 0);
1481    }
1482
1483    #[test]
1484    fn test_increment_decrypt_failures_monotonic() {
1485        let identity = make_peer_identity();
1486        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1487
1488        // Initial count is zero
1489        assert_eq!(peer.consecutive_decrypt_failures(), 0);
1490
1491        // Each call returns a strictly increasing count
1492        let mut prev = 0u32;
1493        for expected in 1..=25u32 {
1494            let count = peer.increment_decrypt_failures();
1495            assert_eq!(count, expected, "increment must return monotonic count");
1496            assert!(count > prev, "count must strictly increase");
1497            assert_eq!(peer.consecutive_decrypt_failures(), count);
1498            prev = count;
1499        }
1500    }
1501
1502    #[test]
1503    fn test_reset_decrypt_failures_zeroes_counter() {
1504        let identity = make_peer_identity();
1505        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1506
1507        // Drive counter up
1508        for _ in 0..7 {
1509            peer.increment_decrypt_failures();
1510        }
1511        assert_eq!(peer.consecutive_decrypt_failures(), 7);
1512
1513        // Reset zeroes it
1514        peer.reset_decrypt_failures();
1515        assert_eq!(peer.consecutive_decrypt_failures(), 0);
1516
1517        // Reset on zero is a no-op (still zero, no panic)
1518        peer.reset_decrypt_failures();
1519        assert_eq!(peer.consecutive_decrypt_failures(), 0);
1520
1521        // Counter resumes at 1 after reset
1522        assert_eq!(peer.increment_decrypt_failures(), 1);
1523        assert_eq!(peer.consecutive_decrypt_failures(), 1);
1524    }
1525
1526    #[test]
1527    fn test_rekey_jitter_in_range() {
1528        for _ in 0..100 {
1529            let identity = make_peer_identity();
1530            let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1531            let jitter = peer.rekey_jitter_secs();
1532            assert!(
1533                (-REKEY_JITTER_SECS..=REKEY_JITTER_SECS).contains(&jitter),
1534                "jitter {} outside [-{}, +{}]",
1535                jitter,
1536                REKEY_JITTER_SECS,
1537                REKEY_JITTER_SECS
1538            );
1539        }
1540    }
1541
1542    #[test]
1543    fn test_rekey_jitter_mean_near_zero() {
1544        let mut sum = 0i64;
1545        let n = 200i64;
1546
1547        for _ in 0..n {
1548            let identity = make_peer_identity();
1549            let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1550            sum += peer.rekey_jitter_secs();
1551        }
1552
1553        let mean = sum / n;
1554        assert!(
1555            mean.abs() < 5,
1556            "empirical mean {} not within 5 of 0 over {} samples",
1557            mean,
1558            n
1559        );
1560    }
1561
1562    #[test]
1563    fn fmp_pending_trial_authenticates_then_promotes() {
1564        let (_current_sender, current_receiver) = ik_session_pair();
1565        let (mut pending_sender, pending_receiver) = ik_session_pair();
1566        let mut peer = peer_with_current(current_receiver);
1567        let k_before = peer.current_k_bit();
1568        peer.set_pending_session(
1569            pending_receiver,
1570            SessionIndex::new(3),
1571            SessionIndex::new(4),
1572            false,
1573        );
1574
1575        let (ciphertext, counter, header) = seal_fmp(
1576            &mut pending_sender,
1577            SessionIndex::new(3),
1578            b"new-epoch",
1579            !k_before,
1580        );
1581        let plaintext = peer
1582            .trial_decrypt_pending_new_session(&ciphertext, counter, &header)
1583            .expect("pending frame must authenticate");
1584
1585        assert_eq!(plaintext, b"new-epoch");
1586        assert!(peer.handle_peer_kbit_flip().is_some());
1587        assert!(peer.pending_new_session().is_none());
1588        assert_eq!(peer.current_k_bit(), !k_before);
1589        assert!(peer.previous_session().is_some());
1590    }
1591
1592    #[test]
1593    fn fmp_pending_trial_failure_leaves_pending_replay_intact() {
1594        let (_current_sender, current_receiver) = ik_session_pair();
1595        let (mut pending_sender, pending_receiver) = ik_session_pair();
1596        let (mut stale_sender, _stale_receiver) = ik_session_pair();
1597        let mut peer = peer_with_current(current_receiver);
1598        let k_before = peer.current_k_bit();
1599        peer.set_pending_session(
1600            pending_receiver,
1601            SessionIndex::new(3),
1602            SessionIndex::new(4),
1603            false,
1604        );
1605
1606        let (stale_ciphertext, stale_counter, stale_header) =
1607            seal_fmp(&mut stale_sender, SessionIndex::new(3), b"stale", !k_before);
1608        assert!(
1609            peer.trial_decrypt_pending_new_session(&stale_ciphertext, stale_counter, &stale_header)
1610                .is_none()
1611        );
1612        assert!(peer.pending_new_session().is_some());
1613        assert_eq!(peer.current_k_bit(), k_before);
1614
1615        let (pending_ciphertext, pending_counter, pending_header) = seal_fmp(
1616            &mut pending_sender,
1617            SessionIndex::new(3),
1618            b"new-epoch",
1619            !k_before,
1620        );
1621        let plaintext = peer
1622            .trial_decrypt_pending_new_session(
1623                &pending_ciphertext,
1624                pending_counter,
1625                &pending_header,
1626            )
1627            .expect("failed stale trial must not consume pending replay window");
1628        assert_eq!(plaintext, b"new-epoch");
1629    }
1630}