1use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::node::REKEY_JITTER_SECS;
9use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
10use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
11use crate::tree::{ParentDeclaration, TreeCoordinate};
12use crate::utils::index::SessionIndex;
13use crate::{FipsAddress, NodeAddr, PeerIdentity};
14use rand::RngExt;
15use secp256k1::XOnlyPublicKey;
16use std::fmt;
17use std::time::{Duration, Instant};
18
19fn draw_rekey_jitter() -> i64 {
20 rand::rng().random_range(-REKEY_JITTER_SECS..=REKEY_JITTER_SECS)
21}
22
23#[derive(Clone, Copy, Debug, PartialEq, Eq)]
27pub enum ConnectivityState {
28 Connected,
30 Stale,
32 Reconnecting,
34 Disconnected,
36}
37
38impl ConnectivityState {
39 pub fn can_send(&self) -> bool {
41 matches!(
42 self,
43 ConnectivityState::Connected | ConnectivityState::Stale
44 )
45 }
46
47 pub fn is_terminal(&self) -> bool {
49 matches!(self, ConnectivityState::Disconnected)
50 }
51
52 pub fn is_healthy(&self) -> bool {
54 matches!(self, ConnectivityState::Connected)
55 }
56}
57
58impl fmt::Display for ConnectivityState {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 let s = match self {
61 ConnectivityState::Connected => "connected",
62 ConnectivityState::Stale => "stale",
63 ConnectivityState::Reconnecting => "reconnecting",
64 ConnectivityState::Disconnected => "disconnected",
65 };
66 write!(f, "{}", s)
67 }
68}
69
70#[derive(Debug)]
79pub struct ActivePeer {
80 identity: PeerIdentity,
83
84 link_id: LinkId,
87 connectivity: ConnectivityState,
89
90 noise_session: Option<NoiseSession>,
93 our_index: Option<SessionIndex>,
95 their_index: Option<SessionIndex>,
97 transport_id: Option<TransportId>,
99 current_addr: Option<TransportAddr>,
101
102 declaration: Option<ParentDeclaration>,
105 ancestry: Option<TreeCoordinate>,
107
108 tree_announce_min_interval_ms: u64,
111 last_tree_announce_sent_ms: u64,
113 pending_tree_announce: bool,
115
116 inbound_filter: Option<BloomFilter>,
119 filter_sequence: u64,
121 filter_received_at: u64,
123 pending_filter_update: bool,
125
126 session_start: Instant,
130
131 link_stats: LinkStats,
134 authenticated_at: u64,
136 last_seen: u64,
138
139 remote_epoch: Option<[u8; 8]>,
142
143 mmp: Option<MmpPeerState>,
146
147 last_heartbeat_sent: Option<Instant>,
150
151 handshake_msg2: Option<Vec<u8>>,
155
156 replay_suppressed_count: u32,
159 consecutive_decrypt_failures: u32,
161
162 session_established_at: Instant,
165 rekey_jitter_secs: i64,
167 current_k_bit: bool,
169 previous_session: Option<NoiseSession>,
171 previous_our_index: Option<SessionIndex>,
173 drain_started: Option<Instant>,
175 pending_new_session: Option<NoiseSession>,
177 pending_our_index: Option<SessionIndex>,
179 pending_their_index: Option<SessionIndex>,
181 pending_rekey_initiator: bool,
183 pending_rekey_completed_at: Option<Instant>,
185 rekey_in_progress: bool,
187 last_peer_rekey: Option<Instant>,
189 rekey_handshake: Option<NoiseHandshakeState>,
191 rekey_our_index: Option<SessionIndex>,
193 rekey_msg1: Option<Vec<u8>>,
195 rekey_msg1_next_resend: u64,
197
198 #[cfg(any(target_os = "linux", target_os = "macos"))]
214 connected_udp:
215 Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,
216
217 #[cfg(any(target_os = "linux", target_os = "macos"))]
224 peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
225}
226
227impl ActivePeer {
228 pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
233 let now = Instant::now();
234 Self {
235 identity,
236 link_id,
237 connectivity: ConnectivityState::Connected,
238 noise_session: None,
239 our_index: None,
240 their_index: None,
241 transport_id: None,
242 current_addr: None,
243 declaration: None,
244 ancestry: None,
245 tree_announce_min_interval_ms: 500,
246 last_tree_announce_sent_ms: 0,
247 pending_tree_announce: false,
248 inbound_filter: None,
249 filter_sequence: 0,
250 filter_received_at: 0,
251 pending_filter_update: true, session_start: now,
253 link_stats: LinkStats::new(),
254 authenticated_at,
255 last_seen: authenticated_at,
256 remote_epoch: None,
257 mmp: None,
258 last_heartbeat_sent: None,
259 handshake_msg2: None,
260 replay_suppressed_count: 0,
261 consecutive_decrypt_failures: 0,
262 session_established_at: now,
263 rekey_jitter_secs: draw_rekey_jitter(),
264 current_k_bit: false,
265 previous_session: None,
266 previous_our_index: None,
267 drain_started: None,
268 pending_new_session: None,
269 pending_our_index: None,
270 pending_their_index: None,
271 pending_rekey_initiator: false,
272 pending_rekey_completed_at: None,
273 rekey_in_progress: false,
274 last_peer_rekey: None,
275 rekey_handshake: None,
276 rekey_our_index: None,
277 rekey_msg1: None,
278 rekey_msg1_next_resend: 0,
279 #[cfg(any(target_os = "linux", target_os = "macos"))]
280 connected_udp: None,
281 #[cfg(any(target_os = "linux", target_os = "macos"))]
282 peer_recv_drain: None,
283 }
284 }
285
286 pub fn with_stats(
291 identity: PeerIdentity,
292 link_id: LinkId,
293 authenticated_at: u64,
294 link_stats: LinkStats,
295 ) -> Self {
296 let mut peer = Self::new(identity, link_id, authenticated_at);
297 peer.link_stats = link_stats;
298 peer
299 }
300
301 #[allow(clippy::too_many_arguments)]
306 pub fn with_session(
307 identity: PeerIdentity,
308 link_id: LinkId,
309 authenticated_at: u64,
310 noise_session: NoiseSession,
311 our_index: SessionIndex,
312 their_index: SessionIndex,
313 transport_id: TransportId,
314 current_addr: TransportAddr,
315 link_stats: LinkStats,
316 is_initiator: bool,
317 mmp_config: &MmpConfig,
318 remote_epoch: Option<[u8; 8]>,
319 ) -> Self {
320 let now = Instant::now();
321 Self {
322 identity,
323 link_id,
324 connectivity: ConnectivityState::Connected,
325 noise_session: Some(noise_session),
326 our_index: Some(our_index),
327 their_index: Some(their_index),
328 transport_id: Some(transport_id),
329 current_addr: Some(current_addr),
330 declaration: None,
331 ancestry: None,
332 tree_announce_min_interval_ms: 500,
333 last_tree_announce_sent_ms: 0,
334 pending_tree_announce: false,
335 inbound_filter: None,
336 filter_sequence: 0,
337 filter_received_at: 0,
338 pending_filter_update: true,
339 session_start: now,
340 link_stats,
341 authenticated_at,
342 last_seen: authenticated_at,
343 remote_epoch,
344 mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
345 last_heartbeat_sent: None,
346 handshake_msg2: None,
347 replay_suppressed_count: 0,
348 consecutive_decrypt_failures: 0,
349 session_established_at: now,
350 rekey_jitter_secs: draw_rekey_jitter(),
351 current_k_bit: false,
352 previous_session: None,
353 previous_our_index: None,
354 drain_started: None,
355 pending_new_session: None,
356 pending_our_index: None,
357 pending_their_index: None,
358 pending_rekey_initiator: false,
359 pending_rekey_completed_at: None,
360 rekey_in_progress: false,
361 last_peer_rekey: None,
362 rekey_handshake: None,
363 rekey_our_index: None,
364 rekey_msg1: None,
365 rekey_msg1_next_resend: 0,
366 #[cfg(any(target_os = "linux", target_os = "macos"))]
367 connected_udp: None,
368 #[cfg(any(target_os = "linux", target_os = "macos"))]
369 peer_recv_drain: None,
370 }
371 }
372
373 #[cfg(any(target_os = "linux", target_os = "macos"))]
378 pub(crate) fn connected_udp(
379 &self,
380 ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
381 self.connected_udp.clone()
382 }
383
384 #[cfg(any(target_os = "linux", target_os = "macos"))]
398 pub(crate) fn set_connected_udp(
399 &mut self,
400 socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
401 drain: crate::transport::udp::peer_drain::PeerRecvDrain,
402 ) {
403 self.peer_recv_drain = None;
407 self.connected_udp = None;
408 self.connected_udp = Some(socket);
409 self.peer_recv_drain = Some(drain);
410 }
411
412 #[cfg(any(target_os = "linux", target_os = "macos"))]
417 pub(crate) fn clear_connected_udp(&mut self) {
418 self.peer_recv_drain = None;
419 self.connected_udp = None;
420 }
421
422 pub fn identity(&self) -> &PeerIdentity {
426 &self.identity
427 }
428
429 pub fn node_addr(&self) -> &NodeAddr {
431 self.identity.node_addr()
432 }
433
434 pub fn address(&self) -> &FipsAddress {
436 self.identity.address()
437 }
438
439 pub fn pubkey(&self) -> XOnlyPublicKey {
441 self.identity.pubkey()
442 }
443
444 pub fn npub(&self) -> String {
446 self.identity.npub()
447 }
448
449 pub fn link_id(&self) -> LinkId {
453 self.link_id
454 }
455
456 pub fn connectivity(&self) -> ConnectivityState {
458 self.connectivity
459 }
460
461 pub fn can_send(&self) -> bool {
463 self.connectivity.can_send()
464 }
465
466 pub fn is_healthy(&self) -> bool {
468 self.connectivity.is_healthy()
469 }
470
471 pub fn is_disconnected(&self) -> bool {
473 self.connectivity.is_terminal()
474 }
475
476 pub fn has_session(&self) -> bool {
480 self.noise_session.is_some()
481 }
482
483 pub fn noise_session(&self) -> Option<&NoiseSession> {
485 self.noise_session.as_ref()
486 }
487
488 pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
490 self.noise_session.as_mut()
491 }
492
493 pub fn our_index(&self) -> Option<SessionIndex> {
495 self.our_index
496 }
497
498 pub fn their_index(&self) -> Option<SessionIndex> {
500 self.their_index
501 }
502
503 pub fn set_their_index(&mut self, index: SessionIndex) {
507 self.their_index = Some(index);
508 }
509
510 pub fn replace_session(
520 &mut self,
521 new_session: NoiseSession,
522 new_our_index: SessionIndex,
523 new_their_index: SessionIndex,
524 ) -> Option<SessionIndex> {
525 self.reset_replay_suppressed();
526 let old_our_index = self.our_index;
527 self.noise_session = Some(new_session);
528 self.our_index = Some(new_our_index);
529 self.their_index = Some(new_their_index);
530 old_our_index
531 }
532
533 pub fn transport_id(&self) -> Option<TransportId> {
535 self.transport_id
536 }
537
538 pub fn current_addr(&self) -> Option<&TransportAddr> {
540 self.current_addr.as_ref()
541 }
542
543 pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
564 if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
565 return false;
566 }
567 self.transport_id = Some(transport_id);
568 self.current_addr = Some(addr.clone());
569 true
570 }
571
572 pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
576 self.handshake_msg2 = Some(msg2);
577 }
578
579 pub fn handshake_msg2(&self) -> Option<&[u8]> {
581 self.handshake_msg2.as_deref()
582 }
583
584 pub fn clear_handshake_msg2(&mut self) {
586 self.handshake_msg2 = None;
587 }
588
589 pub fn increment_replay_suppressed(&mut self) -> u32 {
593 self.replay_suppressed_count += 1;
594 self.replay_suppressed_count
595 }
596
597 pub fn reset_replay_suppressed(&mut self) -> u32 {
599 let count = self.replay_suppressed_count;
600 self.replay_suppressed_count = 0;
601 count
602 }
603
604 pub fn replay_suppressed_count(&self) -> u32 {
606 self.replay_suppressed_count
607 }
608
609 pub fn increment_decrypt_failures(&mut self) -> u32 {
613 self.consecutive_decrypt_failures += 1;
614 self.consecutive_decrypt_failures
615 }
616
617 pub fn reset_decrypt_failures(&mut self) {
619 self.consecutive_decrypt_failures = 0;
620 }
621
622 pub fn consecutive_decrypt_failures(&self) -> u32 {
624 self.consecutive_decrypt_failures
625 }
626
627 pub fn remote_epoch(&self) -> Option<[u8; 8]> {
631 self.remote_epoch
632 }
633
634 pub(crate) fn set_remote_epoch(&mut self, remote_epoch: Option<[u8; 8]>) {
638 self.remote_epoch = remote_epoch;
639 }
640
641 pub fn coords(&self) -> Option<&TreeCoordinate> {
645 self.ancestry.as_ref()
646 }
647
648 pub fn declaration(&self) -> Option<&ParentDeclaration> {
650 self.declaration.as_ref()
651 }
652
653 pub fn has_tree_position(&self) -> bool {
655 self.declaration.is_some() && self.ancestry.is_some()
656 }
657
658 pub fn inbound_filter(&self) -> Option<&BloomFilter> {
662 self.inbound_filter.as_ref()
663 }
664
665 pub fn filter_sequence(&self) -> u64 {
667 self.filter_sequence
668 }
669
670 pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
672 if self.filter_received_at == 0 {
673 return true;
674 }
675 current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
676 }
677
678 pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
680 match &self.inbound_filter {
681 Some(filter) => filter.contains(node_addr),
682 None => false,
683 }
684 }
685
686 pub fn needs_filter_update(&self) -> bool {
688 self.pending_filter_update
689 }
690
691 pub fn link_stats(&self) -> &LinkStats {
695 &self.link_stats
696 }
697
698 pub fn link_stats_mut(&mut self) -> &mut LinkStats {
700 &mut self.link_stats
701 }
702
703 pub fn mmp(&self) -> Option<&MmpPeerState> {
707 self.mmp.as_ref()
708 }
709
710 pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
712 self.mmp.as_mut()
713 }
714
715 pub fn link_cost(&self) -> f64 {
723 match self.mmp() {
724 Some(mmp) => {
725 let etx = mmp.metrics.etx;
726 match mmp.metrics.srtt_ms() {
727 Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
728 None => 1.0,
729 }
730 }
731 None => 1.0,
732 }
733 }
734
735 pub fn has_srtt(&self) -> bool {
737 self.mmp()
738 .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
739 }
740
741 pub fn authenticated_at(&self) -> u64 {
743 self.authenticated_at
744 }
745
746 pub fn last_seen(&self) -> u64 {
748 self.last_seen
749 }
750
751 pub fn idle_time(&self, current_time_ms: u64) -> u64 {
753 current_time_ms.saturating_sub(self.last_seen)
754 }
755
756 pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
758 current_time_ms.saturating_sub(self.authenticated_at)
759 }
760
761 pub fn session_elapsed_ms(&self) -> u32 {
766 self.session_start.elapsed().as_millis() as u32
767 }
768
769 pub fn session_start(&self) -> Instant {
771 self.session_start
772 }
773
774 pub fn last_heartbeat_sent(&self) -> Option<Instant> {
778 self.last_heartbeat_sent
779 }
780
781 pub fn mark_heartbeat_sent(&mut self, now: Instant) {
783 self.last_heartbeat_sent = Some(now);
784 }
785
786 pub fn touch(&mut self, current_time_ms: u64) {
790 self.last_seen = current_time_ms;
791 if self.connectivity == ConnectivityState::Stale {
795 self.connectivity = ConnectivityState::Connected;
796 }
797 }
798
799 pub fn mark_stale(&mut self) {
801 if self.connectivity == ConnectivityState::Connected {
802 self.connectivity = ConnectivityState::Stale;
803 }
804 }
805
806 pub fn mark_reconnecting(&mut self) {
808 self.connectivity = ConnectivityState::Reconnecting;
809 }
810
811 pub fn mark_disconnected(&mut self) {
813 self.connectivity = ConnectivityState::Disconnected;
814 }
815
816 pub fn mark_connected(&mut self, current_time_ms: u64) {
818 self.connectivity = ConnectivityState::Connected;
819 self.last_seen = current_time_ms;
820 }
821
822 pub fn set_link_id(&mut self, link_id: LinkId) {
824 self.link_id = link_id;
825 }
826
827 pub fn update_tree_position(
831 &mut self,
832 declaration: ParentDeclaration,
833 ancestry: TreeCoordinate,
834 current_time_ms: u64,
835 ) {
836 self.declaration = Some(declaration);
837 self.ancestry = Some(ancestry);
838 self.last_seen = current_time_ms;
839 }
840
841 pub fn clear_tree_position(&mut self) {
843 self.declaration = None;
844 self.ancestry = None;
845 }
846
847 pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
851 self.tree_announce_min_interval_ms = ms;
852 }
853
854 pub fn last_tree_announce_sent_ms(&self) -> u64 {
856 self.last_tree_announce_sent_ms
857 }
858
859 pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
861 self.last_tree_announce_sent_ms = ms;
862 }
863
864 pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
866 now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
867 }
868
869 pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
871 self.last_tree_announce_sent_ms = now_ms;
872 self.pending_tree_announce = false;
873 }
874
875 pub fn mark_tree_announce_pending(&mut self) {
877 self.pending_tree_announce = true;
878 }
879
880 pub fn has_pending_tree_announce(&self) -> bool {
882 self.pending_tree_announce
883 }
884
885 pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
889 self.inbound_filter = Some(filter);
890 self.filter_sequence = sequence;
891 self.filter_received_at = current_time_ms;
892 self.last_seen = current_time_ms;
893 }
894
895 pub fn clear_filter(&mut self) {
897 self.inbound_filter = None;
898 self.filter_sequence = 0;
899 self.filter_received_at = 0;
900 }
901
902 pub fn mark_filter_update_needed(&mut self) {
904 self.pending_filter_update = true;
905 }
906
907 pub fn clear_filter_update_needed(&mut self) {
909 self.pending_filter_update = false;
910 }
911
912 pub fn session_established_at(&self) -> Instant {
916 self.session_established_at
917 }
918
919 #[cfg(test)]
920 pub(crate) fn set_session_established_at_for_test(&mut self, instant: Instant) {
921 self.session_established_at = instant;
922 }
923
924 pub fn rekey_jitter_secs(&self) -> i64 {
926 self.rekey_jitter_secs
927 }
928
929 pub fn current_k_bit(&self) -> bool {
931 self.current_k_bit
932 }
933
934 pub fn rekey_in_progress(&self) -> bool {
936 self.rekey_in_progress
937 }
938
939 pub fn set_rekey_in_progress(&mut self) {
941 self.rekey_in_progress = true;
942 }
943
944 pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
946 match self.last_peer_rekey {
947 Some(t) => t.elapsed().as_secs() < dampening_secs,
948 None => false,
949 }
950 }
951
952 pub fn record_peer_rekey(&mut self) {
954 self.last_peer_rekey = Some(Instant::now());
955 }
956
957 pub fn pending_our_index(&self) -> Option<SessionIndex> {
959 self.pending_our_index
960 }
961
962 pub fn pending_their_index(&self) -> Option<SessionIndex> {
964 self.pending_their_index
965 }
966
967 pub fn previous_our_index(&self) -> Option<SessionIndex> {
969 self.previous_our_index
970 }
971
972 pub fn previous_session(&self) -> Option<&NoiseSession> {
974 self.previous_session.as_ref()
975 }
976
977 pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
979 self.previous_session.as_mut()
980 }
981
982 pub fn pending_new_session(&self) -> Option<&NoiseSession> {
984 self.pending_new_session.as_ref()
985 }
986
987 pub fn pending_rekey_initiator(&self) -> bool {
989 self.pending_rekey_initiator
990 }
991
992 pub fn pending_rekey_cutover_due(&self, delay: Duration) -> bool {
995 self.pending_rekey_initiator
996 && self
997 .pending_rekey_completed_at
998 .is_some_and(|completed| completed.elapsed() >= delay)
999 }
1000
1001 pub fn set_pending_session(
1007 &mut self,
1008 session: NoiseSession,
1009 our_index: SessionIndex,
1010 their_index: SessionIndex,
1011 initiated_by_local: bool,
1012 ) {
1013 self.pending_new_session = Some(session);
1014 self.pending_our_index = Some(our_index);
1015 self.pending_their_index = Some(their_index);
1016 self.pending_rekey_initiator = initiated_by_local;
1017 self.pending_rekey_completed_at = Some(Instant::now());
1018 self.rekey_in_progress = false;
1019 self.rekey_our_index = None;
1021 self.rekey_handshake = None;
1022 self.rekey_msg1 = None;
1023 self.rekey_msg1_next_resend = 0;
1024 }
1025
1026 pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
1032 let new_session = self.pending_new_session.take()?;
1033 let new_our_index = self.pending_our_index.take();
1034 let new_their_index = self.pending_their_index.take();
1035
1036 self.previous_session = self.noise_session.take();
1038 self.previous_our_index = self.our_index;
1039 self.drain_started = Some(Instant::now());
1040
1041 self.noise_session = Some(new_session);
1043 self.our_index = new_our_index;
1044 self.their_index = new_their_index;
1045 self.pending_rekey_initiator = false;
1046 self.pending_rekey_completed_at = None;
1047
1048 self.current_k_bit = !self.current_k_bit;
1050 self.session_established_at = Instant::now();
1051 self.session_start = Instant::now();
1052 self.rekey_in_progress = false;
1053 self.rekey_jitter_secs = draw_rekey_jitter();
1054 self.last_heartbeat_sent = None;
1055 self.reset_replay_suppressed();
1056
1057 let now = Instant::now();
1059 if let Some(mmp) = &mut self.mmp {
1060 mmp.reset_for_rekey(now);
1061 }
1062
1063 self.previous_our_index
1064 }
1065
1066 pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1071 let new_session = self.pending_new_session.take()?;
1072 let new_our_index = self.pending_our_index.take();
1073 let new_their_index = self.pending_their_index.take();
1074
1075 self.previous_session = self.noise_session.take();
1077 self.previous_our_index = self.our_index;
1078 self.drain_started = Some(Instant::now());
1079
1080 self.noise_session = Some(new_session);
1082 self.our_index = new_our_index;
1083 self.their_index = new_their_index;
1084 self.pending_rekey_initiator = false;
1085 self.pending_rekey_completed_at = None;
1086
1087 self.current_k_bit = !self.current_k_bit;
1089 self.session_established_at = Instant::now();
1090 self.session_start = Instant::now();
1091 self.rekey_in_progress = false;
1092 self.rekey_jitter_secs = draw_rekey_jitter();
1093 self.last_heartbeat_sent = None;
1094 self.reset_replay_suppressed();
1095
1096 let now = Instant::now();
1098 if let Some(mmp) = &mut self.mmp {
1099 mmp.reset_for_rekey(now);
1100 }
1101
1102 self.previous_our_index
1103 }
1104
1105 pub fn drain_expired(&self, drain_secs: u64) -> bool {
1107 match self.drain_started {
1108 Some(t) => t.elapsed().as_secs() >= drain_secs,
1109 None => false,
1110 }
1111 }
1112
1113 pub fn is_draining(&self) -> bool {
1115 self.drain_started.is_some()
1116 }
1117
1118 pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1123 self.previous_session = None;
1124 self.drain_started = None;
1125 self.previous_our_index.take()
1126 }
1127
1128 pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1134 self.rekey_handshake = None;
1135 self.rekey_msg1 = None;
1136 self.rekey_msg1_next_resend = 0;
1137 self.rekey_in_progress = false;
1138 self.rekey_our_index.take().or_else(|| {
1140 self.pending_new_session = None;
1141 self.pending_their_index = None;
1142 self.pending_rekey_initiator = false;
1143 self.pending_rekey_completed_at = None;
1144 self.pending_our_index.take()
1145 })
1146 }
1147
1148 pub fn set_rekey_state(
1152 &mut self,
1153 handshake: NoiseHandshakeState,
1154 our_index: SessionIndex,
1155 wire_msg1: Vec<u8>,
1156 next_resend_ms: u64,
1157 ) {
1158 self.rekey_handshake = Some(handshake);
1159 self.rekey_our_index = Some(our_index);
1160 self.rekey_msg1 = Some(wire_msg1);
1161 self.rekey_msg1_next_resend = next_resend_ms;
1162 self.rekey_in_progress = true;
1163 }
1164
1165 pub fn rekey_our_index(&self) -> Option<SessionIndex> {
1167 self.rekey_our_index
1168 }
1169
1170 pub fn complete_rekey_msg2(
1176 &mut self,
1177 msg2_bytes: &[u8],
1178 ) -> Result<(NoiseSession, Option<[u8; 8]>), NoiseError> {
1179 let mut hs = self
1180 .rekey_handshake
1181 .take()
1182 .ok_or_else(|| NoiseError::WrongState {
1183 expected: "rekey handshake in progress".to_string(),
1184 got: "no handshake state".to_string(),
1185 })?;
1186
1187 hs.read_message_2(msg2_bytes)?;
1188 let remote_epoch = hs.remote_epoch();
1189 let session = hs.into_session()?;
1190
1191 self.rekey_msg1 = None;
1193 self.rekey_msg1_next_resend = 0;
1194
1195 Ok((session, remote_epoch))
1196 }
1197
1198 pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1200 self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1201 }
1202
1203 pub fn rekey_msg1(&self) -> Option<&[u8]> {
1205 self.rekey_msg1.as_deref()
1206 }
1207
1208 pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
1210 self.rekey_msg1_next_resend = next_ms;
1211 }
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216 use super::*;
1217 use crate::Identity;
1218
1219 fn make_peer_identity() -> PeerIdentity {
1220 let identity = Identity::generate();
1221 PeerIdentity::from_pubkey(identity.pubkey())
1222 }
1223
1224 fn make_node_addr(val: u8) -> NodeAddr {
1225 let mut bytes = [0u8; 16];
1226 bytes[0] = val;
1227 NodeAddr::from_bytes(bytes)
1228 }
1229
1230 fn make_coords(ids: &[u8]) -> TreeCoordinate {
1231 TreeCoordinate::from_addrs(ids.iter().map(|&v| make_node_addr(v)).collect()).unwrap()
1232 }
1233
1234 #[test]
1235 fn test_connectivity_state_properties() {
1236 assert!(ConnectivityState::Connected.can_send());
1237 assert!(ConnectivityState::Stale.can_send());
1238 assert!(!ConnectivityState::Reconnecting.can_send());
1239 assert!(!ConnectivityState::Disconnected.can_send());
1240
1241 assert!(ConnectivityState::Connected.is_healthy());
1242 assert!(!ConnectivityState::Stale.is_healthy());
1243
1244 assert!(ConnectivityState::Disconnected.is_terminal());
1245 assert!(!ConnectivityState::Connected.is_terminal());
1246 }
1247
1248 #[test]
1249 fn test_active_peer_creation() {
1250 let identity = make_peer_identity();
1251 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1252
1253 assert_eq!(peer.identity().node_addr(), identity.node_addr());
1254 assert_eq!(peer.link_id(), LinkId::new(1));
1255 assert!(peer.is_healthy());
1256 assert!(peer.can_send());
1257 assert_eq!(peer.authenticated_at(), 1000);
1258 assert!(peer.needs_filter_update()); }
1260
1261 #[test]
1262 fn test_connectivity_transitions() {
1263 let identity = make_peer_identity();
1264 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1265
1266 assert!(peer.is_healthy());
1267
1268 peer.mark_stale();
1269 assert_eq!(peer.connectivity(), ConnectivityState::Stale);
1270 assert!(peer.can_send()); peer.touch(2000);
1274 assert!(peer.is_healthy());
1275
1276 peer.mark_reconnecting();
1277 assert!(!peer.can_send());
1278 peer.touch(2500);
1279 assert_eq!(peer.connectivity(), ConnectivityState::Reconnecting);
1280 assert!(!peer.can_send());
1281
1282 peer.mark_connected(3000);
1283 assert!(peer.is_healthy());
1284
1285 peer.mark_disconnected();
1286 assert!(peer.is_disconnected());
1287 assert!(!peer.can_send());
1288 }
1289
1290 #[test]
1291 fn test_tree_position() {
1292 let identity = make_peer_identity();
1293 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1294
1295 assert!(!peer.has_tree_position());
1296 assert!(peer.coords().is_none());
1297
1298 let node = make_node_addr(1);
1299 let parent = make_node_addr(2);
1300 let decl = ParentDeclaration::new(node, parent, 1, 1000);
1301 let coords = make_coords(&[1, 2, 0]);
1302
1303 peer.update_tree_position(decl, coords, 2000);
1304
1305 assert!(peer.has_tree_position());
1306 assert!(peer.coords().is_some());
1307 assert_eq!(peer.last_seen(), 2000);
1308 }
1309
1310 #[test]
1311 fn test_bloom_filter() {
1312 let identity = make_peer_identity();
1313 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1314 let target = make_node_addr(42);
1315
1316 assert!(!peer.may_reach(&target));
1317 assert!(peer.filter_is_stale(2000, 500));
1318
1319 let mut filter = BloomFilter::new();
1320 filter.insert(&target);
1321 peer.update_filter(filter, 1, 1500);
1322
1323 assert!(peer.may_reach(&target));
1324 assert!(!peer.filter_is_stale(1800, 500));
1325 assert!(peer.filter_is_stale(2500, 500));
1326 }
1327
1328 #[test]
1329 fn test_timing() {
1330 let identity = make_peer_identity();
1331 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1332
1333 assert_eq!(peer.connection_duration(2000), 1000);
1334 assert_eq!(peer.idle_time(2000), 1000);
1335 }
1336
1337 #[test]
1338 fn test_filter_update_flag() {
1339 let identity = make_peer_identity();
1340 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1341
1342 assert!(peer.needs_filter_update()); peer.clear_filter_update_needed();
1345 assert!(!peer.needs_filter_update());
1346
1347 peer.mark_filter_update_needed();
1348 assert!(peer.needs_filter_update());
1349 }
1350
1351 #[test]
1352 fn test_with_stats() {
1353 let identity = make_peer_identity();
1354 let mut stats = LinkStats::new();
1355 stats.record_sent(100);
1356 stats.record_recv(200, 500);
1357
1358 let peer = ActivePeer::with_stats(identity, LinkId::new(1), 1000, stats);
1359
1360 assert_eq!(peer.link_stats().packets_sent, 1);
1361 assert_eq!(peer.link_stats().packets_recv, 1);
1362 }
1363
1364 #[test]
1365 fn test_replay_suppression_counter() {
1366 let identity = make_peer_identity();
1367 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1368
1369 assert_eq!(peer.replay_suppressed_count(), 0);
1371
1372 assert_eq!(peer.increment_replay_suppressed(), 1);
1374 assert_eq!(peer.increment_replay_suppressed(), 2);
1375 assert_eq!(peer.increment_replay_suppressed(), 3);
1376 assert_eq!(peer.replay_suppressed_count(), 3);
1377
1378 assert_eq!(peer.reset_replay_suppressed(), 3);
1380 assert_eq!(peer.replay_suppressed_count(), 0);
1381
1382 assert_eq!(peer.increment_replay_suppressed(), 1);
1384 assert_eq!(peer.replay_suppressed_count(), 1);
1385
1386 peer.reset_replay_suppressed();
1388 assert_eq!(peer.reset_replay_suppressed(), 0);
1389 }
1390
1391 #[test]
1392 fn test_increment_decrypt_failures_monotonic() {
1393 let identity = make_peer_identity();
1394 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1395
1396 assert_eq!(peer.consecutive_decrypt_failures(), 0);
1398
1399 let mut prev = 0u32;
1401 for expected in 1..=25u32 {
1402 let count = peer.increment_decrypt_failures();
1403 assert_eq!(count, expected, "increment must return monotonic count");
1404 assert!(count > prev, "count must strictly increase");
1405 assert_eq!(peer.consecutive_decrypt_failures(), count);
1406 prev = count;
1407 }
1408 }
1409
1410 #[test]
1411 fn test_reset_decrypt_failures_zeroes_counter() {
1412 let identity = make_peer_identity();
1413 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1414
1415 for _ in 0..7 {
1417 peer.increment_decrypt_failures();
1418 }
1419 assert_eq!(peer.consecutive_decrypt_failures(), 7);
1420
1421 peer.reset_decrypt_failures();
1423 assert_eq!(peer.consecutive_decrypt_failures(), 0);
1424
1425 peer.reset_decrypt_failures();
1427 assert_eq!(peer.consecutive_decrypt_failures(), 0);
1428
1429 assert_eq!(peer.increment_decrypt_failures(), 1);
1431 assert_eq!(peer.consecutive_decrypt_failures(), 1);
1432 }
1433
1434 #[test]
1435 fn test_rekey_jitter_in_range() {
1436 for _ in 0..100 {
1437 let identity = make_peer_identity();
1438 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1439 let jitter = peer.rekey_jitter_secs();
1440 assert!(
1441 (-REKEY_JITTER_SECS..=REKEY_JITTER_SECS).contains(&jitter),
1442 "jitter {} outside [-{}, +{}]",
1443 jitter,
1444 REKEY_JITTER_SECS,
1445 REKEY_JITTER_SECS
1446 );
1447 }
1448 }
1449
1450 #[test]
1451 fn test_rekey_jitter_mean_near_zero() {
1452 let mut sum = 0i64;
1453 let n = 200i64;
1454
1455 for _ in 0..n {
1456 let identity = make_peer_identity();
1457 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1458 sum += peer.rekey_jitter_secs();
1459 }
1460
1461 let mean = sum / n;
1462 assert!(
1463 mean.abs() < 5,
1464 "empirical mean {} not within 5 of 0 over {} samples",
1465 mean,
1466 n
1467 );
1468 }
1469}