Skip to main content

fips_core/peer/
active.rs

1//! Active Peer (Authenticated Phase)
2//!
3//! Represents a fully authenticated peer after successful Noise handshake.
4//! ActivePeer holds tree state, Bloom filter, and routing information.
5
6use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
9use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
10use crate::tree::{ParentDeclaration, TreeCoordinate};
11use crate::utils::index::SessionIndex;
12use crate::{FipsAddress, NodeAddr, PeerIdentity};
13use secp256k1::XOnlyPublicKey;
14use std::fmt;
15use std::time::Instant;
16
17/// Connectivity state for an active peer.
18///
19/// This is simpler than the full PeerState since authentication is complete.
20#[derive(Clone, Copy, Debug, PartialEq, Eq)]
21pub enum ConnectivityState {
22    /// Peer is fully connected and responsive.
23    Connected,
24    /// Peer hasn't been heard from recently (potential timeout).
25    Stale,
26    /// Connection lost, attempting to reconnect.
27    Reconnecting,
28    /// Peer has been explicitly disconnected.
29    Disconnected,
30}
31
32impl ConnectivityState {
33    /// Check if the peer is usable for sending traffic.
34    pub fn can_send(&self) -> bool {
35        matches!(
36            self,
37            ConnectivityState::Connected | ConnectivityState::Stale
38        )
39    }
40
41    /// Check if this is a terminal state requiring cleanup.
42    pub fn is_terminal(&self) -> bool {
43        matches!(self, ConnectivityState::Disconnected)
44    }
45
46    /// Check if peer is fully healthy.
47    pub fn is_healthy(&self) -> bool {
48        matches!(self, ConnectivityState::Connected)
49    }
50}
51
52impl fmt::Display for ConnectivityState {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        let s = match self {
55            ConnectivityState::Connected => "connected",
56            ConnectivityState::Stale => "stale",
57            ConnectivityState::Reconnecting => "reconnecting",
58            ConnectivityState::Disconnected => "disconnected",
59        };
60        write!(f, "{}", s)
61    }
62}
63
64/// A fully authenticated remote FIPS node.
65///
66/// Created only after successful Noise KK handshake. The identity is
67/// cryptographically verified at this point.
68///
69/// Note: ActivePeer intentionally does not implement Clone because it
70/// contains NoiseSession, which cannot be safely cloned (cloning would
71/// risk nonce reuse, a catastrophic security failure).
72#[derive(Debug)]
73pub struct ActivePeer {
74    // === Identity (Verified) ===
75    /// Cryptographic identity (verified via handshake).
76    identity: PeerIdentity,
77
78    // === Connection ===
79    /// Link used to reach this peer.
80    link_id: LinkId,
81    /// Current connectivity state.
82    connectivity: ConnectivityState,
83
84    // === Session (Wire Protocol) ===
85    /// Noise session for encryption/decryption (None if legacy peer).
86    noise_session: Option<NoiseSession>,
87    /// Our session index (they include this when sending TO us).
88    our_index: Option<SessionIndex>,
89    /// Their session index (we include this when sending TO them).
90    their_index: Option<SessionIndex>,
91    /// Transport ID for this peer's link.
92    transport_id: Option<TransportId>,
93    /// Current transport address (for roaming support).
94    current_addr: Option<TransportAddr>,
95
96    // === Spanning Tree ===
97    /// Their latest parent declaration.
98    declaration: Option<ParentDeclaration>,
99    /// Their path to root.
100    ancestry: Option<TreeCoordinate>,
101
102    // === Tree Announce Rate Limiting ===
103    /// Minimum interval between TreeAnnounce messages (milliseconds).
104    tree_announce_min_interval_ms: u64,
105    /// Last time we sent a TreeAnnounce to this peer (Unix milliseconds).
106    last_tree_announce_sent_ms: u64,
107    /// Whether a tree announce is pending (deferred due to rate limit).
108    pending_tree_announce: bool,
109
110    // === Bloom Filter ===
111    /// What's reachable through them (inbound filter).
112    inbound_filter: Option<BloomFilter>,
113    /// Their filter's sequence number.
114    filter_sequence: u64,
115    /// When we received their last filter (Unix milliseconds).
116    filter_received_at: u64,
117    /// Whether we owe them a filter update.
118    pending_filter_update: bool,
119
120    // === Timing ===
121    /// Session start time for computing session-relative timestamps.
122    /// Used as the epoch for the 4-byte inner header timestamp field.
123    session_start: Instant,
124
125    // === Statistics ===
126    /// Link statistics.
127    link_stats: LinkStats,
128    /// When this peer was authenticated (Unix milliseconds).
129    authenticated_at: u64,
130    /// When this peer was last seen (any activity, Unix milliseconds).
131    last_seen: u64,
132
133    // === Epoch (Restart Detection) ===
134    /// Remote peer's startup epoch (from handshake). Used to detect restarts.
135    remote_epoch: Option<[u8; 8]>,
136
137    // === MMP ===
138    /// Per-peer MMP state (None for legacy peers without Noise sessions).
139    mmp: Option<MmpPeerState>,
140
141    // === Heartbeat ===
142    /// When we last sent a heartbeat to this peer.
143    last_heartbeat_sent: Option<Instant>,
144
145    // === Handshake Resend ===
146    /// Wire-format msg2 for resend on duplicate msg1 (responder only).
147    /// Cleared after the handshake timeout window.
148    handshake_msg2: Option<Vec<u8>>,
149
150    // === Replay Detection Suppression ===
151    /// Number of replay detections suppressed since last session reset.
152    replay_suppressed_count: u32,
153    /// Consecutive decryption failures (reset on any successful decrypt).
154    consecutive_decrypt_failures: u32,
155
156    // === Rekey (Key Rotation) ===
157    /// When the current Noise session was established (for rekey timer).
158    session_established_at: Instant,
159    /// Current K-bit epoch value (alternates each rekey).
160    current_k_bit: bool,
161    /// Previous session kept alive during drain window after cutover.
162    previous_session: Option<NoiseSession>,
163    /// Previous session's our_index (for peers_by_index cleanup on drain expiry).
164    previous_our_index: Option<SessionIndex>,
165    /// When the drain window started (None = no drain in progress).
166    drain_started: Option<Instant>,
167    /// Pending new session from completed rekey (before K-bit cutover).
168    pending_new_session: Option<NoiseSession>,
169    /// Pending new session's our_index.
170    pending_our_index: Option<SessionIndex>,
171    /// Pending new session's their_index.
172    pending_their_index: Option<SessionIndex>,
173    /// Whether a rekey is currently in progress (handshake sent, not yet complete).
174    rekey_in_progress: bool,
175    /// When we last received a rekey msg1 from this peer (dampening).
176    last_peer_rekey: Option<Instant>,
177    /// In-progress rekey: Noise handshake state (initiator only).
178    rekey_handshake: Option<NoiseHandshakeState>,
179    /// In-progress rekey: our new session index.
180    rekey_our_index: Option<SessionIndex>,
181    /// In-progress rekey: wire-format msg1 for resend.
182    rekey_msg1: Option<Vec<u8>>,
183    /// In-progress rekey: next resend timestamp (Unix ms).
184    rekey_msg1_next_resend: u64,
185
186    // === Connected Peer UDP Socket (Unix fast path) ===
187    /// Per-peer `connect()`-ed UDP socket, opened once we have a
188    /// stable kernel `SocketAddr` for the peer (i.e. session
189    /// established + transport address known). When `Some`, the
190    /// encrypt-worker send path can `sendmsg(2)` on this fd without
191    /// per-packet `msg_name` — the kernel-side route + neighbor cache
192    /// is pinned by the `connect()` call. On the receive side, Linux
193    /// and Darwin UDP demux preferentially route inbound packets from
194    /// this peer to this socket (most-specific 5-tuple match via
195    /// `SO_REUSEPORT`), so the paired drain thread must keep it empty.
196    ///
197    /// Closed automatically on Drop. Behind an `Arc` so the
198    /// encrypt-worker's send path can hold a refcount without owning
199    /// the only handle (rekey / address-change may rotate the socket
200    /// while older jobs are still in-flight on the worker channel).
201    #[cfg(any(target_os = "linux", target_os = "macos"))]
202    connected_udp:
203        Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,
204
205    /// Per-peer receive-drain thread. Always paired with
206    /// `connected_udp`: while the connected socket is installed, the
207    /// kernel UDP demux preferentially routes inbound packets from
208    /// this peer to it (via SO_REUSEPORT + 5-tuple match), so the
209    /// socket *must* be drained or packets pile up in its kernel
210    /// recv buffer. Drop signals the thread to exit via self-pipe.
211    #[cfg(any(target_os = "linux", target_os = "macos"))]
212    peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
213}
214
215impl ActivePeer {
216    /// Create a new active peer from verified identity.
217    ///
218    /// Called after successful authentication handshake.
219    /// For peers with Noise sessions, use `with_session` instead.
220    pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
221        let now = Instant::now();
222        Self {
223            identity,
224            link_id,
225            connectivity: ConnectivityState::Connected,
226            noise_session: None,
227            our_index: None,
228            their_index: None,
229            transport_id: None,
230            current_addr: None,
231            declaration: None,
232            ancestry: None,
233            tree_announce_min_interval_ms: 500,
234            last_tree_announce_sent_ms: 0,
235            pending_tree_announce: false,
236            inbound_filter: None,
237            filter_sequence: 0,
238            filter_received_at: 0,
239            pending_filter_update: true, // Send filter on new connection
240            session_start: now,
241            link_stats: LinkStats::new(),
242            authenticated_at,
243            last_seen: authenticated_at,
244            remote_epoch: None,
245            mmp: None,
246            last_heartbeat_sent: None,
247            handshake_msg2: None,
248            replay_suppressed_count: 0,
249            consecutive_decrypt_failures: 0,
250            session_established_at: now,
251            current_k_bit: false,
252            previous_session: None,
253            previous_our_index: None,
254            drain_started: None,
255            pending_new_session: None,
256            pending_our_index: None,
257            pending_their_index: None,
258            rekey_in_progress: false,
259            last_peer_rekey: None,
260            rekey_handshake: None,
261            rekey_our_index: None,
262            rekey_msg1: None,
263            rekey_msg1_next_resend: 0,
264            #[cfg(any(target_os = "linux", target_os = "macos"))]
265            connected_udp: None,
266            #[cfg(any(target_os = "linux", target_os = "macos"))]
267            peer_recv_drain: None,
268        }
269    }
270
271    /// Create from verified identity with existing link stats.
272    ///
273    /// Used when promoting from PeerConnection, preserving handshake stats.
274    /// For peers with Noise sessions, use `with_session` instead.
275    pub fn with_stats(
276        identity: PeerIdentity,
277        link_id: LinkId,
278        authenticated_at: u64,
279        link_stats: LinkStats,
280    ) -> Self {
281        let mut peer = Self::new(identity, link_id, authenticated_at);
282        peer.link_stats = link_stats;
283        peer
284    }
285
286    /// Create from verified identity with Noise session and index tracking.
287    ///
288    /// This is the primary constructor for the wire protocol path.
289    /// The NoiseSession provides encryption/decryption and replay protection.
290    #[allow(clippy::too_many_arguments)]
291    pub fn with_session(
292        identity: PeerIdentity,
293        link_id: LinkId,
294        authenticated_at: u64,
295        noise_session: NoiseSession,
296        our_index: SessionIndex,
297        their_index: SessionIndex,
298        transport_id: TransportId,
299        current_addr: TransportAddr,
300        link_stats: LinkStats,
301        is_initiator: bool,
302        mmp_config: &MmpConfig,
303        remote_epoch: Option<[u8; 8]>,
304    ) -> Self {
305        let now = Instant::now();
306        Self {
307            identity,
308            link_id,
309            connectivity: ConnectivityState::Connected,
310            noise_session: Some(noise_session),
311            our_index: Some(our_index),
312            their_index: Some(their_index),
313            transport_id: Some(transport_id),
314            current_addr: Some(current_addr),
315            declaration: None,
316            ancestry: None,
317            tree_announce_min_interval_ms: 500,
318            last_tree_announce_sent_ms: 0,
319            pending_tree_announce: false,
320            inbound_filter: None,
321            filter_sequence: 0,
322            filter_received_at: 0,
323            pending_filter_update: true,
324            session_start: now,
325            link_stats,
326            authenticated_at,
327            last_seen: authenticated_at,
328            remote_epoch,
329            mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
330            last_heartbeat_sent: None,
331            handshake_msg2: None,
332            replay_suppressed_count: 0,
333            consecutive_decrypt_failures: 0,
334            session_established_at: now,
335            current_k_bit: false,
336            previous_session: None,
337            previous_our_index: None,
338            drain_started: None,
339            pending_new_session: None,
340            pending_our_index: None,
341            pending_their_index: None,
342            rekey_in_progress: false,
343            last_peer_rekey: None,
344            rekey_handshake: None,
345            rekey_our_index: None,
346            rekey_msg1: None,
347            rekey_msg1_next_resend: 0,
348            #[cfg(any(target_os = "linux", target_os = "macos"))]
349            connected_udp: None,
350            #[cfg(any(target_os = "linux", target_os = "macos"))]
351            peer_recv_drain: None,
352        }
353    }
354
355    /// Unix UDP fast path: clone the refcount on the per-peer
356    /// `connect()`-ed UDP socket if one has been installed. Encrypt-
357    /// worker send path uses this to bypass the wildcard listen
358    /// socket's per-packet sockaddr handling.
359    #[cfg(any(target_os = "linux", target_os = "macos"))]
360    pub(crate) fn connected_udp(
361        &self,
362    ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
363        self.connected_udp.clone()
364    }
365
366    /// Install a per-peer `connect()`-ed UDP socket **with** its
367    /// paired recv drain thread. The two are owned together: the
368    /// drain thread is the only consumer of packets arriving on this
369    /// socket (Linux UDP demux preferentially routes them away from
370    /// the wildcard listen socket via SO_REUSEPORT 5-tuple match),
371    /// so installing one without the other would silently drop
372    /// inbound packets from this peer.
373    ///
374    /// Replacing an existing pair drops the old drain (its self-pipe
375    /// shutdown signal fires; thread exits within one poll
376    /// iteration) and drops the old socket Arc. Any encrypt-worker
377    /// jobs already in-flight holding the old socket Arc stay valid
378    /// until they complete, at which point the kernel fd closes.
379    #[cfg(any(target_os = "linux", target_os = "macos"))]
380    pub(crate) fn set_connected_udp(
381        &mut self,
382        socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
383        drain: crate::transport::udp::peer_drain::PeerRecvDrain,
384    ) {
385        // Drop order matters: drop the old drain BEFORE the old
386        // socket so the drain thread's last reference to the kernel
387        // fd is released cleanly.
388        self.peer_recv_drain = None;
389        self.connected_udp = None;
390        self.connected_udp = Some(socket);
391        self.peer_recv_drain = Some(drain);
392    }
393
394    /// Clear the per-peer connected UDP socket + drain thread (e.g.
395    /// on rekey or disconnect). The drain thread exits via self-pipe
396    /// signal; the kernel fd closes when the last `Arc` to the
397    /// socket drops.
398    #[cfg(any(target_os = "linux", target_os = "macos"))]
399    pub(crate) fn clear_connected_udp(&mut self) {
400        self.peer_recv_drain = None;
401        self.connected_udp = None;
402    }
403
404    // === Identity Accessors ===
405
406    /// Get the peer's verified identity.
407    pub fn identity(&self) -> &PeerIdentity {
408        &self.identity
409    }
410
411    /// Get the peer's NodeAddr.
412    pub fn node_addr(&self) -> &NodeAddr {
413        self.identity.node_addr()
414    }
415
416    /// Get the peer's FIPS address.
417    pub fn address(&self) -> &FipsAddress {
418        self.identity.address()
419    }
420
421    /// Get the peer's public key.
422    pub fn pubkey(&self) -> XOnlyPublicKey {
423        self.identity.pubkey()
424    }
425
426    /// Get the peer's npub string.
427    pub fn npub(&self) -> String {
428        self.identity.npub()
429    }
430
431    // === Connection Accessors ===
432
433    /// Get the link ID.
434    pub fn link_id(&self) -> LinkId {
435        self.link_id
436    }
437
438    /// Get the connectivity state.
439    pub fn connectivity(&self) -> ConnectivityState {
440        self.connectivity
441    }
442
443    /// Check if peer can receive traffic.
444    pub fn can_send(&self) -> bool {
445        self.connectivity.can_send()
446    }
447
448    /// Check if peer is fully healthy.
449    pub fn is_healthy(&self) -> bool {
450        self.connectivity.is_healthy()
451    }
452
453    /// Check if peer is disconnected.
454    pub fn is_disconnected(&self) -> bool {
455        self.connectivity.is_terminal()
456    }
457
458    // === Session Accessors ===
459
460    /// Check if this peer has a Noise session.
461    pub fn has_session(&self) -> bool {
462        self.noise_session.is_some()
463    }
464
465    /// Get the Noise session, if present.
466    pub fn noise_session(&self) -> Option<&NoiseSession> {
467        self.noise_session.as_ref()
468    }
469
470    /// Get mutable access to the Noise session.
471    pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
472        self.noise_session.as_mut()
473    }
474
475    /// Get our session index (they use this to send TO us).
476    pub fn our_index(&self) -> Option<SessionIndex> {
477        self.our_index
478    }
479
480    /// Get their session index (we use this to send TO them).
481    pub fn their_index(&self) -> Option<SessionIndex> {
482        self.their_index
483    }
484
485    /// Update their session index (used during cross-connection resolution
486    /// when the losing node keeps its inbound session but needs the peer's
487    /// outbound index).
488    pub fn set_their_index(&mut self, index: SessionIndex) {
489        self.their_index = Some(index);
490    }
491
492    /// Replace the Noise session and indices during cross-connection resolution.
493    ///
494    /// When both nodes simultaneously initiate, each promotes its inbound
495    /// handshake first. When the peer's msg2 arrives, we learn the correct
496    /// session — the outbound handshake that pairs with the peer's inbound.
497    /// This replaces the entire session so both nodes use matching keys.
498    ///
499    /// Returns the old our_index so the caller can update peers_by_index.
500    /// Also resets the replay suppression counter since the session changed.
501    pub fn replace_session(
502        &mut self,
503        new_session: NoiseSession,
504        new_our_index: SessionIndex,
505        new_their_index: SessionIndex,
506    ) -> Option<SessionIndex> {
507        self.reset_replay_suppressed();
508        let old_our_index = self.our_index;
509        self.noise_session = Some(new_session);
510        self.our_index = Some(new_our_index);
511        self.their_index = Some(new_their_index);
512        old_our_index
513    }
514
515    /// Get the transport ID for this peer.
516    pub fn transport_id(&self) -> Option<TransportId> {
517        self.transport_id
518    }
519
520    /// Get the current transport address.
521    pub fn current_addr(&self) -> Option<&TransportAddr> {
522        self.current_addr.as_ref()
523    }
524
525    /// Update the current address (for roaming support).
526    ///
527    /// Called when we receive a valid authenticated packet from a new address.
528    /// Short-circuits when neither the transport_id nor the TransportAddr
529    /// bytes changed — at multi-Gbps the same peer's source 4-tuple is
530    /// stable per session and the overwhelming majority of inbound
531    /// packets hit this fast path. Saves both the redundant
532    /// `Option::take` + Vec drop on the cached side and the caller's
533    /// `.clone()` allocation on the input side: the caller can pass
534    /// `&TransportAddr` and we only `.to_owned()` when storing.
535    ///
536    /// Returns `true` iff the stored `(transport_id, current_addr)` pair
537    /// actually changed. The caller uses this signal to invalidate
538    /// derived caches whose validity is bound to the peer's 5-tuple —
539    /// in particular the Linux per-peer `connect()`-ed UDP socket,
540    /// which is pinned to one kernel route + neighbour entry and goes
541    /// stale the moment the peer roams. (Clearing it here would force
542    /// `&mut self` users into the wrong shape: the policy of when to
543    /// rebuild the connected socket lives on `Node`, not on the peer
544    /// state. Returning a bool keeps that policy where it belongs.)
545    pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
546        if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
547            return false;
548        }
549        self.transport_id = Some(transport_id);
550        self.current_addr = Some(addr.clone());
551        true
552    }
553
554    // === Handshake Resend ===
555
556    /// Store wire-format msg2 for resend on duplicate msg1.
557    pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
558        self.handshake_msg2 = Some(msg2);
559    }
560
561    /// Get stored msg2 bytes for resend.
562    pub fn handshake_msg2(&self) -> Option<&[u8]> {
563        self.handshake_msg2.as_deref()
564    }
565
566    /// Clear stored msg2 (no longer needed after handshake window).
567    pub fn clear_handshake_msg2(&mut self) {
568        self.handshake_msg2 = None;
569    }
570
571    // === Replay Detection Suppression ===
572
573    /// Increment replay suppression counter. Returns the new count.
574    pub fn increment_replay_suppressed(&mut self) -> u32 {
575        self.replay_suppressed_count += 1;
576        self.replay_suppressed_count
577    }
578
579    /// Reset replay suppression counter, returning previous count.
580    pub fn reset_replay_suppressed(&mut self) -> u32 {
581        let count = self.replay_suppressed_count;
582        self.replay_suppressed_count = 0;
583        count
584    }
585
586    /// Current replay suppression count.
587    pub fn replay_suppressed_count(&self) -> u32 {
588        self.replay_suppressed_count
589    }
590
591    // === Decryption Failure Tracking ===
592
593    /// Increment consecutive decryption failure counter, returning new count.
594    pub fn increment_decrypt_failures(&mut self) -> u32 {
595        self.consecutive_decrypt_failures += 1;
596        self.consecutive_decrypt_failures
597    }
598
599    /// Reset consecutive decryption failure counter.
600    pub fn reset_decrypt_failures(&mut self) {
601        self.consecutive_decrypt_failures = 0;
602    }
603
604    /// Current consecutive decryption failure count.
605    pub fn consecutive_decrypt_failures(&self) -> u32 {
606        self.consecutive_decrypt_failures
607    }
608
609    // === Epoch Accessors ===
610
611    /// Get the remote peer's startup epoch (from handshake).
612    pub fn remote_epoch(&self) -> Option<[u8; 8]> {
613        self.remote_epoch
614    }
615
616    // === Tree Accessors ===
617
618    /// Get the peer's tree coordinates, if known.
619    pub fn coords(&self) -> Option<&TreeCoordinate> {
620        self.ancestry.as_ref()
621    }
622
623    /// Get the peer's parent declaration, if known.
624    pub fn declaration(&self) -> Option<&ParentDeclaration> {
625        self.declaration.as_ref()
626    }
627
628    /// Check if this peer has a known tree position.
629    pub fn has_tree_position(&self) -> bool {
630        self.declaration.is_some() && self.ancestry.is_some()
631    }
632
633    // === Filter Accessors ===
634
635    /// Get the peer's inbound filter, if known.
636    pub fn inbound_filter(&self) -> Option<&BloomFilter> {
637        self.inbound_filter.as_ref()
638    }
639
640    /// Get the filter sequence number.
641    pub fn filter_sequence(&self) -> u64 {
642        self.filter_sequence
643    }
644
645    /// Check if this peer's filter is stale.
646    pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
647        if self.filter_received_at == 0 {
648            return true;
649        }
650        current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
651    }
652
653    /// Check if a destination might be reachable through this peer.
654    pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
655        match &self.inbound_filter {
656            Some(filter) => filter.contains(node_addr),
657            None => false,
658        }
659    }
660
661    /// Check if we need to send this peer a filter update.
662    pub fn needs_filter_update(&self) -> bool {
663        self.pending_filter_update
664    }
665
666    // === Statistics Accessors ===
667
668    /// Get link statistics.
669    pub fn link_stats(&self) -> &LinkStats {
670        &self.link_stats
671    }
672
673    /// Get mutable link statistics.
674    pub fn link_stats_mut(&mut self) -> &mut LinkStats {
675        &mut self.link_stats
676    }
677
678    // === MMP Accessors ===
679
680    /// Get MMP state (None for legacy peers without sessions).
681    pub fn mmp(&self) -> Option<&MmpPeerState> {
682        self.mmp.as_ref()
683    }
684
685    /// Get mutable MMP state.
686    pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
687        self.mmp.as_mut()
688    }
689
690    /// Link cost for routing decisions.
691    ///
692    /// Returns a scalar cost where lower is better (1.0 = ideal).
693    /// Computed as RTT-weighted ETX: `etx * (1.0 + srtt_ms / 100.0)`.
694    ///
695    /// Returns 1.0 (optimistic default) when MMP metrics are not yet
696    /// available, matching depth-only parent selection behavior.
697    pub fn link_cost(&self) -> f64 {
698        match self.mmp() {
699            Some(mmp) => {
700                let etx = mmp.metrics.etx;
701                match mmp.metrics.srtt_ms() {
702                    Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
703                    None => 1.0,
704                }
705            }
706            None => 1.0,
707        }
708    }
709
710    /// Whether this peer has at least one MMP RTT measurement.
711    pub fn has_srtt(&self) -> bool {
712        self.mmp()
713            .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
714    }
715
716    /// When this peer was authenticated.
717    pub fn authenticated_at(&self) -> u64 {
718        self.authenticated_at
719    }
720
721    /// When this peer was last seen.
722    pub fn last_seen(&self) -> u64 {
723        self.last_seen
724    }
725
726    /// Time since last activity.
727    pub fn idle_time(&self, current_time_ms: u64) -> u64 {
728        current_time_ms.saturating_sub(self.last_seen)
729    }
730
731    /// Connection duration since authentication.
732    pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
733        current_time_ms.saturating_sub(self.authenticated_at)
734    }
735
736    /// Session-relative elapsed time in milliseconds (for inner header timestamp).
737    ///
738    /// Returns milliseconds since session establishment, truncated to u32.
739    /// Wraps at ~49.7 days which is acceptable for session-relative timing.
740    pub fn session_elapsed_ms(&self) -> u32 {
741        self.session_start.elapsed().as_millis() as u32
742    }
743
744    /// When this peer's session started (for link-dead fallback timing).
745    pub fn session_start(&self) -> Instant {
746        self.session_start
747    }
748
749    // === Heartbeat ===
750
751    /// When we last sent a heartbeat to this peer.
752    pub fn last_heartbeat_sent(&self) -> Option<Instant> {
753        self.last_heartbeat_sent
754    }
755
756    /// Record that we sent a heartbeat.
757    pub fn mark_heartbeat_sent(&mut self, now: Instant) {
758        self.last_heartbeat_sent = Some(now);
759    }
760
761    // === State Updates ===
762
763    /// Update last seen timestamp.
764    pub fn touch(&mut self, current_time_ms: u64) {
765        self.last_seen = current_time_ms;
766        // If we were stale, receiving traffic makes us connected again
767        if self.connectivity == ConnectivityState::Stale {
768            self.connectivity = ConnectivityState::Connected;
769        }
770    }
771
772    /// Mark peer as stale (no recent traffic).
773    pub fn mark_stale(&mut self) {
774        if self.connectivity == ConnectivityState::Connected {
775            self.connectivity = ConnectivityState::Stale;
776        }
777    }
778
779    /// Mark peer as reconnecting.
780    pub fn mark_reconnecting(&mut self) {
781        self.connectivity = ConnectivityState::Reconnecting;
782    }
783
784    /// Mark peer as disconnected.
785    pub fn mark_disconnected(&mut self) {
786        self.connectivity = ConnectivityState::Disconnected;
787    }
788
789    /// Mark peer as connected (e.g., after successful reconnect).
790    pub fn mark_connected(&mut self, current_time_ms: u64) {
791        self.connectivity = ConnectivityState::Connected;
792        self.last_seen = current_time_ms;
793    }
794
795    /// Update the link ID (e.g., on reconnect).
796    pub fn set_link_id(&mut self, link_id: LinkId) {
797        self.link_id = link_id;
798    }
799
800    // === Tree Updates ===
801
802    /// Update peer's tree position.
803    pub fn update_tree_position(
804        &mut self,
805        declaration: ParentDeclaration,
806        ancestry: TreeCoordinate,
807        current_time_ms: u64,
808    ) {
809        self.declaration = Some(declaration);
810        self.ancestry = Some(ancestry);
811        self.last_seen = current_time_ms;
812    }
813
814    /// Clear peer's tree position.
815    pub fn clear_tree_position(&mut self) {
816        self.declaration = None;
817        self.ancestry = None;
818    }
819
820    // === Tree Announce Rate Limiting ===
821
822    /// Set the minimum interval between TreeAnnounce messages (milliseconds).
823    pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
824        self.tree_announce_min_interval_ms = ms;
825    }
826
827    /// Get the last tree announce send timestamp (for carrying across reconnection).
828    pub fn last_tree_announce_sent_ms(&self) -> u64 {
829        self.last_tree_announce_sent_ms
830    }
831
832    /// Set the last tree announce send timestamp (to preserve rate limit across reconnection).
833    pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
834        self.last_tree_announce_sent_ms = ms;
835    }
836
837    /// Check if we can send a TreeAnnounce now (rate limiting).
838    pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
839        now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
840    }
841
842    /// Record that we sent a TreeAnnounce to this peer.
843    pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
844        self.last_tree_announce_sent_ms = now_ms;
845        self.pending_tree_announce = false;
846    }
847
848    /// Mark that a tree announce is pending (deferred due to rate limit).
849    pub fn mark_tree_announce_pending(&mut self) {
850        self.pending_tree_announce = true;
851    }
852
853    /// Check if a deferred tree announce is waiting to be sent.
854    pub fn has_pending_tree_announce(&self) -> bool {
855        self.pending_tree_announce
856    }
857
858    // === Filter Updates ===
859
860    /// Update peer's inbound filter.
861    pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
862        self.inbound_filter = Some(filter);
863        self.filter_sequence = sequence;
864        self.filter_received_at = current_time_ms;
865        self.last_seen = current_time_ms;
866    }
867
868    /// Clear peer's inbound filter.
869    pub fn clear_filter(&mut self) {
870        self.inbound_filter = None;
871        self.filter_sequence = 0;
872        self.filter_received_at = 0;
873    }
874
875    /// Mark that we need to send this peer a filter update.
876    pub fn mark_filter_update_needed(&mut self) {
877        self.pending_filter_update = true;
878    }
879
880    /// Clear the pending filter update flag.
881    pub fn clear_filter_update_needed(&mut self) {
882        self.pending_filter_update = false;
883    }
884
885    // === Rekey (Key Rotation) ===
886
887    /// When the current Noise session was established.
888    pub fn session_established_at(&self) -> Instant {
889        self.session_established_at
890    }
891
892    /// Current K-bit epoch value.
893    pub fn current_k_bit(&self) -> bool {
894        self.current_k_bit
895    }
896
897    /// Whether a rekey is currently in progress.
898    pub fn rekey_in_progress(&self) -> bool {
899        self.rekey_in_progress
900    }
901
902    /// Mark that a rekey has been initiated.
903    pub fn set_rekey_in_progress(&mut self) {
904        self.rekey_in_progress = true;
905    }
906
907    /// Check if rekey initiation is dampened (peer recently sent us msg1).
908    pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
909        match self.last_peer_rekey {
910            Some(t) => t.elapsed().as_secs() < dampening_secs,
911            None => false,
912        }
913    }
914
915    /// Record that the peer initiated a rekey (for dampening).
916    pub fn record_peer_rekey(&mut self) {
917        self.last_peer_rekey = Some(Instant::now());
918    }
919
920    /// Get the pending new session's our_index.
921    pub fn pending_our_index(&self) -> Option<SessionIndex> {
922        self.pending_our_index
923    }
924
925    /// Get the pending new session's their_index.
926    pub fn pending_their_index(&self) -> Option<SessionIndex> {
927        self.pending_their_index
928    }
929
930    /// Get the previous session's our_index (during drain).
931    pub fn previous_our_index(&self) -> Option<SessionIndex> {
932        self.previous_our_index
933    }
934
935    /// Get the previous session for decryption fallback.
936    pub fn previous_session(&self) -> Option<&NoiseSession> {
937        self.previous_session.as_ref()
938    }
939
940    /// Get mutable access to the previous session for decryption.
941    pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
942        self.previous_session.as_mut()
943    }
944
945    /// Get the pending new session (completed rekey, not yet cut over).
946    pub fn pending_new_session(&self) -> Option<&NoiseSession> {
947        self.pending_new_session.as_ref()
948    }
949
950    /// Store a completed rekey session and its indices.
951    ///
952    /// Called when the rekey handshake completes. The session is held
953    /// as pending until the initiator flips the K-bit on the next outbound packet.
954    pub fn set_pending_session(
955        &mut self,
956        session: NoiseSession,
957        our_index: SessionIndex,
958        their_index: SessionIndex,
959    ) {
960        self.pending_new_session = Some(session);
961        self.pending_our_index = Some(our_index);
962        self.pending_their_index = Some(their_index);
963        self.rekey_in_progress = false;
964        // Clear initiator handshake state (index now lives in pending_our_index)
965        self.rekey_our_index = None;
966        self.rekey_handshake = None;
967        self.rekey_msg1 = None;
968        self.rekey_msg1_next_resend = 0;
969    }
970
971    /// Cut over to the pending new session (initiator side).
972    ///
973    /// Moves current session to previous (for drain), promotes pending to current,
974    /// flips the K-bit. Returns the old our_index that should remain in peers_by_index
975    /// during the drain window.
976    pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
977        let new_session = self.pending_new_session.take()?;
978        let new_our_index = self.pending_our_index.take();
979        let new_their_index = self.pending_their_index.take();
980
981        // Demote current to previous
982        self.previous_session = self.noise_session.take();
983        self.previous_our_index = self.our_index;
984        self.drain_started = Some(Instant::now());
985
986        // Promote pending to current
987        self.noise_session = Some(new_session);
988        self.our_index = new_our_index;
989        self.their_index = new_their_index;
990
991        // Flip K-bit and reset timing
992        self.current_k_bit = !self.current_k_bit;
993        self.session_established_at = Instant::now();
994        self.session_start = Instant::now();
995        self.rekey_in_progress = false;
996        self.reset_replay_suppressed();
997
998        // Reset MMP counters to avoid metric discontinuity
999        let now = Instant::now();
1000        if let Some(mmp) = &mut self.mmp {
1001            mmp.reset_for_rekey(now);
1002        }
1003
1004        self.previous_our_index
1005    }
1006
1007    /// Handle receiving a K-bit flip from the peer (responder side).
1008    ///
1009    /// Promotes pending_new_session to current, demotes current to previous.
1010    /// Returns the old our_index for drain tracking.
1011    pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1012        let new_session = self.pending_new_session.take()?;
1013        let new_our_index = self.pending_our_index.take();
1014        let new_their_index = self.pending_their_index.take();
1015
1016        // Demote current to previous
1017        self.previous_session = self.noise_session.take();
1018        self.previous_our_index = self.our_index;
1019        self.drain_started = Some(Instant::now());
1020
1021        // Promote pending to current
1022        self.noise_session = Some(new_session);
1023        self.our_index = new_our_index;
1024        self.their_index = new_their_index;
1025
1026        // Match peer's K-bit
1027        self.current_k_bit = !self.current_k_bit;
1028        self.session_established_at = Instant::now();
1029        self.session_start = Instant::now();
1030        self.rekey_in_progress = false;
1031        self.reset_replay_suppressed();
1032
1033        // Reset MMP counters to avoid metric discontinuity
1034        let now = Instant::now();
1035        if let Some(mmp) = &mut self.mmp {
1036            mmp.reset_for_rekey(now);
1037        }
1038
1039        self.previous_our_index
1040    }
1041
1042    /// Check if the drain window has expired.
1043    pub fn drain_expired(&self, drain_secs: u64) -> bool {
1044        match self.drain_started {
1045            Some(t) => t.elapsed().as_secs() >= drain_secs,
1046            None => false,
1047        }
1048    }
1049
1050    /// Whether a drain is in progress.
1051    pub fn is_draining(&self) -> bool {
1052        self.drain_started.is_some()
1053    }
1054
1055    /// Complete the drain: drop previous session and free its index.
1056    ///
1057    /// Returns the previous our_index so the caller can remove it from
1058    /// peers_by_index and free it from the IndexAllocator.
1059    pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1060        self.previous_session = None;
1061        self.drain_started = None;
1062        self.previous_our_index.take()
1063    }
1064
1065    /// Abandon an in-progress rekey.
1066    ///
1067    /// Returns the rekey our_index so the caller can free it.
1068    /// Also clears any pending session state if the handshake was completed
1069    /// but not yet cut over.
1070    pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1071        self.rekey_handshake = None;
1072        self.rekey_msg1 = None;
1073        self.rekey_msg1_next_resend = 0;
1074        self.rekey_in_progress = false;
1075        // Return whichever index needs freeing
1076        self.rekey_our_index.take().or_else(|| {
1077            self.pending_new_session = None;
1078            self.pending_their_index = None;
1079            self.pending_our_index.take()
1080        })
1081    }
1082
1083    // === Rekey Handshake State (Initiator) ===
1084
1085    /// Store rekey handshake state after sending msg1.
1086    pub fn set_rekey_state(
1087        &mut self,
1088        handshake: NoiseHandshakeState,
1089        our_index: SessionIndex,
1090        wire_msg1: Vec<u8>,
1091        next_resend_ms: u64,
1092    ) {
1093        self.rekey_handshake = Some(handshake);
1094        self.rekey_our_index = Some(our_index);
1095        self.rekey_msg1 = Some(wire_msg1);
1096        self.rekey_msg1_next_resend = next_resend_ms;
1097        self.rekey_in_progress = true;
1098    }
1099
1100    /// Get the rekey our_index (for msg2 dispatch lookup).
1101    pub fn rekey_our_index(&self) -> Option<SessionIndex> {
1102        self.rekey_our_index
1103    }
1104
1105    /// Complete the rekey by processing msg2 (initiator side).
1106    ///
1107    /// Takes the stored handshake state, reads msg2, and returns the
1108    /// completed NoiseSession. Clears the handshake-related fields but
1109    /// leaves rekey_our_index for set_pending_session to use.
1110    pub fn complete_rekey_msg2(&mut self, msg2_bytes: &[u8]) -> Result<NoiseSession, NoiseError> {
1111        let mut hs = self
1112            .rekey_handshake
1113            .take()
1114            .ok_or_else(|| NoiseError::WrongState {
1115                expected: "rekey handshake in progress".to_string(),
1116                got: "no handshake state".to_string(),
1117            })?;
1118
1119        hs.read_message_2(msg2_bytes)?;
1120        let session = hs.into_session()?;
1121
1122        // Clear msg1 resend state
1123        self.rekey_msg1 = None;
1124        self.rekey_msg1_next_resend = 0;
1125
1126        Ok(session)
1127    }
1128
1129    /// Check if msg1 needs resending.
1130    pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1131        self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1132    }
1133
1134    /// Get msg1 bytes for resend (without consuming).
1135    pub fn rekey_msg1(&self) -> Option<&[u8]> {
1136        self.rekey_msg1.as_deref()
1137    }
1138
1139    /// Update next resend timestamp.
1140    pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
1141        self.rekey_msg1_next_resend = next_ms;
1142    }
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147    use super::*;
1148    use crate::Identity;
1149
1150    fn make_peer_identity() -> PeerIdentity {
1151        let identity = Identity::generate();
1152        PeerIdentity::from_pubkey(identity.pubkey())
1153    }
1154
1155    fn make_node_addr(val: u8) -> NodeAddr {
1156        let mut bytes = [0u8; 16];
1157        bytes[0] = val;
1158        NodeAddr::from_bytes(bytes)
1159    }
1160
1161    fn make_coords(ids: &[u8]) -> TreeCoordinate {
1162        TreeCoordinate::from_addrs(ids.iter().map(|&v| make_node_addr(v)).collect()).unwrap()
1163    }
1164
1165    #[test]
1166    fn test_connectivity_state_properties() {
1167        assert!(ConnectivityState::Connected.can_send());
1168        assert!(ConnectivityState::Stale.can_send());
1169        assert!(!ConnectivityState::Reconnecting.can_send());
1170        assert!(!ConnectivityState::Disconnected.can_send());
1171
1172        assert!(ConnectivityState::Connected.is_healthy());
1173        assert!(!ConnectivityState::Stale.is_healthy());
1174
1175        assert!(ConnectivityState::Disconnected.is_terminal());
1176        assert!(!ConnectivityState::Connected.is_terminal());
1177    }
1178
1179    #[test]
1180    fn test_active_peer_creation() {
1181        let identity = make_peer_identity();
1182        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1183
1184        assert_eq!(peer.identity().node_addr(), identity.node_addr());
1185        assert_eq!(peer.link_id(), LinkId::new(1));
1186        assert!(peer.is_healthy());
1187        assert!(peer.can_send());
1188        assert_eq!(peer.authenticated_at(), 1000);
1189        assert!(peer.needs_filter_update()); // New peers need filter
1190    }
1191
1192    #[test]
1193    fn test_connectivity_transitions() {
1194        let identity = make_peer_identity();
1195        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1196
1197        assert!(peer.is_healthy());
1198
1199        peer.mark_stale();
1200        assert_eq!(peer.connectivity(), ConnectivityState::Stale);
1201        assert!(peer.can_send()); // Stale can still send
1202
1203        // Traffic received brings back to connected
1204        peer.touch(2000);
1205        assert!(peer.is_healthy());
1206
1207        peer.mark_reconnecting();
1208        assert!(!peer.can_send());
1209
1210        peer.mark_connected(3000);
1211        assert!(peer.is_healthy());
1212
1213        peer.mark_disconnected();
1214        assert!(peer.is_disconnected());
1215        assert!(!peer.can_send());
1216    }
1217
1218    #[test]
1219    fn test_tree_position() {
1220        let identity = make_peer_identity();
1221        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1222
1223        assert!(!peer.has_tree_position());
1224        assert!(peer.coords().is_none());
1225
1226        let node = make_node_addr(1);
1227        let parent = make_node_addr(2);
1228        let decl = ParentDeclaration::new(node, parent, 1, 1000);
1229        let coords = make_coords(&[1, 2, 0]);
1230
1231        peer.update_tree_position(decl, coords, 2000);
1232
1233        assert!(peer.has_tree_position());
1234        assert!(peer.coords().is_some());
1235        assert_eq!(peer.last_seen(), 2000);
1236    }
1237
1238    #[test]
1239    fn test_bloom_filter() {
1240        let identity = make_peer_identity();
1241        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1242        let target = make_node_addr(42);
1243
1244        assert!(!peer.may_reach(&target));
1245        assert!(peer.filter_is_stale(2000, 500));
1246
1247        let mut filter = BloomFilter::new();
1248        filter.insert(&target);
1249        peer.update_filter(filter, 1, 1500);
1250
1251        assert!(peer.may_reach(&target));
1252        assert!(!peer.filter_is_stale(1800, 500));
1253        assert!(peer.filter_is_stale(2500, 500));
1254    }
1255
1256    #[test]
1257    fn test_timing() {
1258        let identity = make_peer_identity();
1259        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1260
1261        assert_eq!(peer.connection_duration(2000), 1000);
1262        assert_eq!(peer.idle_time(2000), 1000);
1263    }
1264
1265    #[test]
1266    fn test_filter_update_flag() {
1267        let identity = make_peer_identity();
1268        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1269
1270        assert!(peer.needs_filter_update()); // New peer
1271
1272        peer.clear_filter_update_needed();
1273        assert!(!peer.needs_filter_update());
1274
1275        peer.mark_filter_update_needed();
1276        assert!(peer.needs_filter_update());
1277    }
1278
1279    #[test]
1280    fn test_with_stats() {
1281        let identity = make_peer_identity();
1282        let mut stats = LinkStats::new();
1283        stats.record_sent(100);
1284        stats.record_recv(200, 500);
1285
1286        let peer = ActivePeer::with_stats(identity, LinkId::new(1), 1000, stats);
1287
1288        assert_eq!(peer.link_stats().packets_sent, 1);
1289        assert_eq!(peer.link_stats().packets_recv, 1);
1290    }
1291
1292    #[test]
1293    fn test_replay_suppression_counter() {
1294        let identity = make_peer_identity();
1295        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1296
1297        // Initial count is zero
1298        assert_eq!(peer.replay_suppressed_count(), 0);
1299
1300        // Increment returns new count
1301        assert_eq!(peer.increment_replay_suppressed(), 1);
1302        assert_eq!(peer.increment_replay_suppressed(), 2);
1303        assert_eq!(peer.increment_replay_suppressed(), 3);
1304        assert_eq!(peer.replay_suppressed_count(), 3);
1305
1306        // Reset returns previous count and zeroes it
1307        assert_eq!(peer.reset_replay_suppressed(), 3);
1308        assert_eq!(peer.replay_suppressed_count(), 0);
1309
1310        // Can increment again after reset
1311        assert_eq!(peer.increment_replay_suppressed(), 1);
1312        assert_eq!(peer.replay_suppressed_count(), 1);
1313
1314        // Reset when zero returns zero
1315        peer.reset_replay_suppressed();
1316        assert_eq!(peer.reset_replay_suppressed(), 0);
1317    }
1318
1319    #[test]
1320    fn test_increment_decrypt_failures_monotonic() {
1321        let identity = make_peer_identity();
1322        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1323
1324        // Initial count is zero
1325        assert_eq!(peer.consecutive_decrypt_failures(), 0);
1326
1327        // Each call returns a strictly increasing count
1328        let mut prev = 0u32;
1329        for expected in 1..=25u32 {
1330            let count = peer.increment_decrypt_failures();
1331            assert_eq!(count, expected, "increment must return monotonic count");
1332            assert!(count > prev, "count must strictly increase");
1333            assert_eq!(peer.consecutive_decrypt_failures(), count);
1334            prev = count;
1335        }
1336    }
1337
1338    #[test]
1339    fn test_reset_decrypt_failures_zeroes_counter() {
1340        let identity = make_peer_identity();
1341        let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1342
1343        // Drive counter up
1344        for _ in 0..7 {
1345            peer.increment_decrypt_failures();
1346        }
1347        assert_eq!(peer.consecutive_decrypt_failures(), 7);
1348
1349        // Reset zeroes it
1350        peer.reset_decrypt_failures();
1351        assert_eq!(peer.consecutive_decrypt_failures(), 0);
1352
1353        // Reset on zero is a no-op (still zero, no panic)
1354        peer.reset_decrypt_failures();
1355        assert_eq!(peer.consecutive_decrypt_failures(), 0);
1356
1357        // Counter resumes at 1 after reset
1358        assert_eq!(peer.increment_decrypt_failures(), 1);
1359        assert_eq!(peer.consecutive_decrypt_failures(), 1);
1360    }
1361}