1use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::node::REKEY_JITTER_SECS;
9use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
10use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
11use crate::tree::{ParentDeclaration, TreeCoordinate};
12use crate::utils::index::SessionIndex;
13use crate::{FipsAddress, NodeAddr, PeerIdentity};
14use rand::RngExt;
15use secp256k1::XOnlyPublicKey;
16use std::fmt;
17use std::time::Instant;
18
19fn draw_rekey_jitter() -> i64 {
20 rand::rng().random_range(-REKEY_JITTER_SECS..=REKEY_JITTER_SECS)
21}
22
23#[derive(Clone, Copy, Debug, PartialEq, Eq)]
27pub enum ConnectivityState {
28 Connected,
30 Stale,
32 Reconnecting,
34 Disconnected,
36}
37
38impl ConnectivityState {
39 pub fn can_send(&self) -> bool {
41 matches!(
42 self,
43 ConnectivityState::Connected | ConnectivityState::Stale
44 )
45 }
46
47 pub fn is_terminal(&self) -> bool {
49 matches!(self, ConnectivityState::Disconnected)
50 }
51
52 pub fn is_healthy(&self) -> bool {
54 matches!(self, ConnectivityState::Connected)
55 }
56}
57
58impl fmt::Display for ConnectivityState {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 let s = match self {
61 ConnectivityState::Connected => "connected",
62 ConnectivityState::Stale => "stale",
63 ConnectivityState::Reconnecting => "reconnecting",
64 ConnectivityState::Disconnected => "disconnected",
65 };
66 write!(f, "{}", s)
67 }
68}
69
70#[derive(Debug)]
79pub struct ActivePeer {
80 identity: PeerIdentity,
83
84 link_id: LinkId,
87 connectivity: ConnectivityState,
89
90 noise_session: Option<NoiseSession>,
93 our_index: Option<SessionIndex>,
95 their_index: Option<SessionIndex>,
97 transport_id: Option<TransportId>,
99 current_addr: Option<TransportAddr>,
101
102 declaration: Option<ParentDeclaration>,
105 ancestry: Option<TreeCoordinate>,
107
108 tree_announce_min_interval_ms: u64,
111 last_tree_announce_sent_ms: u64,
113 pending_tree_announce: bool,
115
116 inbound_filter: Option<BloomFilter>,
119 filter_sequence: u64,
121 filter_received_at: u64,
123 pending_filter_update: bool,
125
126 session_start: Instant,
130
131 link_stats: LinkStats,
134 authenticated_at: u64,
136 last_seen: u64,
138
139 remote_epoch: Option<[u8; 8]>,
142
143 mmp: Option<MmpPeerState>,
146
147 last_heartbeat_sent: Option<Instant>,
150
151 handshake_msg2: Option<Vec<u8>>,
155
156 replay_suppressed_count: u32,
159 consecutive_decrypt_failures: u32,
161
162 session_established_at: Instant,
165 rekey_jitter_secs: i64,
167 current_k_bit: bool,
169 previous_session: Option<NoiseSession>,
171 previous_our_index: Option<SessionIndex>,
173 drain_started: Option<Instant>,
175 pending_new_session: Option<NoiseSession>,
177 pending_our_index: Option<SessionIndex>,
179 pending_their_index: Option<SessionIndex>,
181 rekey_in_progress: bool,
183 last_peer_rekey: Option<Instant>,
185 rekey_handshake: Option<NoiseHandshakeState>,
187 rekey_our_index: Option<SessionIndex>,
189 rekey_msg1: Option<Vec<u8>>,
191 rekey_msg1_next_resend: u64,
193
194 #[cfg(any(target_os = "linux", target_os = "macos"))]
210 connected_udp:
211 Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,
212
213 #[cfg(any(target_os = "linux", target_os = "macos"))]
220 peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
221}
222
223impl ActivePeer {
224 pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
229 let now = Instant::now();
230 Self {
231 identity,
232 link_id,
233 connectivity: ConnectivityState::Connected,
234 noise_session: None,
235 our_index: None,
236 their_index: None,
237 transport_id: None,
238 current_addr: None,
239 declaration: None,
240 ancestry: None,
241 tree_announce_min_interval_ms: 500,
242 last_tree_announce_sent_ms: 0,
243 pending_tree_announce: false,
244 inbound_filter: None,
245 filter_sequence: 0,
246 filter_received_at: 0,
247 pending_filter_update: true, session_start: now,
249 link_stats: LinkStats::new(),
250 authenticated_at,
251 last_seen: authenticated_at,
252 remote_epoch: None,
253 mmp: None,
254 last_heartbeat_sent: None,
255 handshake_msg2: None,
256 replay_suppressed_count: 0,
257 consecutive_decrypt_failures: 0,
258 session_established_at: now,
259 rekey_jitter_secs: draw_rekey_jitter(),
260 current_k_bit: false,
261 previous_session: None,
262 previous_our_index: None,
263 drain_started: None,
264 pending_new_session: None,
265 pending_our_index: None,
266 pending_their_index: None,
267 rekey_in_progress: false,
268 last_peer_rekey: None,
269 rekey_handshake: None,
270 rekey_our_index: None,
271 rekey_msg1: None,
272 rekey_msg1_next_resend: 0,
273 #[cfg(any(target_os = "linux", target_os = "macos"))]
274 connected_udp: None,
275 #[cfg(any(target_os = "linux", target_os = "macos"))]
276 peer_recv_drain: None,
277 }
278 }
279
280 pub fn with_stats(
285 identity: PeerIdentity,
286 link_id: LinkId,
287 authenticated_at: u64,
288 link_stats: LinkStats,
289 ) -> Self {
290 let mut peer = Self::new(identity, link_id, authenticated_at);
291 peer.link_stats = link_stats;
292 peer
293 }
294
295 #[allow(clippy::too_many_arguments)]
300 pub fn with_session(
301 identity: PeerIdentity,
302 link_id: LinkId,
303 authenticated_at: u64,
304 noise_session: NoiseSession,
305 our_index: SessionIndex,
306 their_index: SessionIndex,
307 transport_id: TransportId,
308 current_addr: TransportAddr,
309 link_stats: LinkStats,
310 is_initiator: bool,
311 mmp_config: &MmpConfig,
312 remote_epoch: Option<[u8; 8]>,
313 ) -> Self {
314 let now = Instant::now();
315 Self {
316 identity,
317 link_id,
318 connectivity: ConnectivityState::Connected,
319 noise_session: Some(noise_session),
320 our_index: Some(our_index),
321 their_index: Some(their_index),
322 transport_id: Some(transport_id),
323 current_addr: Some(current_addr),
324 declaration: None,
325 ancestry: None,
326 tree_announce_min_interval_ms: 500,
327 last_tree_announce_sent_ms: 0,
328 pending_tree_announce: false,
329 inbound_filter: None,
330 filter_sequence: 0,
331 filter_received_at: 0,
332 pending_filter_update: true,
333 session_start: now,
334 link_stats,
335 authenticated_at,
336 last_seen: authenticated_at,
337 remote_epoch,
338 mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
339 last_heartbeat_sent: None,
340 handshake_msg2: None,
341 replay_suppressed_count: 0,
342 consecutive_decrypt_failures: 0,
343 session_established_at: now,
344 rekey_jitter_secs: draw_rekey_jitter(),
345 current_k_bit: false,
346 previous_session: None,
347 previous_our_index: None,
348 drain_started: None,
349 pending_new_session: None,
350 pending_our_index: None,
351 pending_their_index: None,
352 rekey_in_progress: false,
353 last_peer_rekey: None,
354 rekey_handshake: None,
355 rekey_our_index: None,
356 rekey_msg1: None,
357 rekey_msg1_next_resend: 0,
358 #[cfg(any(target_os = "linux", target_os = "macos"))]
359 connected_udp: None,
360 #[cfg(any(target_os = "linux", target_os = "macos"))]
361 peer_recv_drain: None,
362 }
363 }
364
365 #[cfg(any(target_os = "linux", target_os = "macos"))]
370 pub(crate) fn connected_udp(
371 &self,
372 ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
373 self.connected_udp.clone()
374 }
375
376 #[cfg(any(target_os = "linux", target_os = "macos"))]
390 pub(crate) fn set_connected_udp(
391 &mut self,
392 socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
393 drain: crate::transport::udp::peer_drain::PeerRecvDrain,
394 ) {
395 self.peer_recv_drain = None;
399 self.connected_udp = None;
400 self.connected_udp = Some(socket);
401 self.peer_recv_drain = Some(drain);
402 }
403
404 #[cfg(any(target_os = "linux", target_os = "macos"))]
409 pub(crate) fn clear_connected_udp(&mut self) {
410 self.peer_recv_drain = None;
411 self.connected_udp = None;
412 }
413
414 pub fn identity(&self) -> &PeerIdentity {
418 &self.identity
419 }
420
421 pub fn node_addr(&self) -> &NodeAddr {
423 self.identity.node_addr()
424 }
425
426 pub fn address(&self) -> &FipsAddress {
428 self.identity.address()
429 }
430
431 pub fn pubkey(&self) -> XOnlyPublicKey {
433 self.identity.pubkey()
434 }
435
436 pub fn npub(&self) -> String {
438 self.identity.npub()
439 }
440
441 pub fn link_id(&self) -> LinkId {
445 self.link_id
446 }
447
448 pub fn connectivity(&self) -> ConnectivityState {
450 self.connectivity
451 }
452
453 pub fn can_send(&self) -> bool {
455 self.connectivity.can_send()
456 }
457
458 pub fn is_healthy(&self) -> bool {
460 self.connectivity.is_healthy()
461 }
462
463 pub fn is_disconnected(&self) -> bool {
465 self.connectivity.is_terminal()
466 }
467
468 pub fn has_session(&self) -> bool {
472 self.noise_session.is_some()
473 }
474
475 pub fn noise_session(&self) -> Option<&NoiseSession> {
477 self.noise_session.as_ref()
478 }
479
480 pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
482 self.noise_session.as_mut()
483 }
484
485 pub fn our_index(&self) -> Option<SessionIndex> {
487 self.our_index
488 }
489
490 pub fn their_index(&self) -> Option<SessionIndex> {
492 self.their_index
493 }
494
495 pub fn set_their_index(&mut self, index: SessionIndex) {
499 self.their_index = Some(index);
500 }
501
502 pub fn replace_session(
512 &mut self,
513 new_session: NoiseSession,
514 new_our_index: SessionIndex,
515 new_their_index: SessionIndex,
516 ) -> Option<SessionIndex> {
517 self.reset_replay_suppressed();
518 let old_our_index = self.our_index;
519 self.noise_session = Some(new_session);
520 self.our_index = Some(new_our_index);
521 self.their_index = Some(new_their_index);
522 old_our_index
523 }
524
525 pub fn transport_id(&self) -> Option<TransportId> {
527 self.transport_id
528 }
529
530 pub fn current_addr(&self) -> Option<&TransportAddr> {
532 self.current_addr.as_ref()
533 }
534
535 pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
556 if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
557 return false;
558 }
559 self.transport_id = Some(transport_id);
560 self.current_addr = Some(addr.clone());
561 true
562 }
563
564 pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
568 self.handshake_msg2 = Some(msg2);
569 }
570
571 pub fn handshake_msg2(&self) -> Option<&[u8]> {
573 self.handshake_msg2.as_deref()
574 }
575
576 pub fn clear_handshake_msg2(&mut self) {
578 self.handshake_msg2 = None;
579 }
580
581 pub fn increment_replay_suppressed(&mut self) -> u32 {
585 self.replay_suppressed_count += 1;
586 self.replay_suppressed_count
587 }
588
589 pub fn reset_replay_suppressed(&mut self) -> u32 {
591 let count = self.replay_suppressed_count;
592 self.replay_suppressed_count = 0;
593 count
594 }
595
596 pub fn replay_suppressed_count(&self) -> u32 {
598 self.replay_suppressed_count
599 }
600
601 pub fn increment_decrypt_failures(&mut self) -> u32 {
605 self.consecutive_decrypt_failures += 1;
606 self.consecutive_decrypt_failures
607 }
608
609 pub fn reset_decrypt_failures(&mut self) {
611 self.consecutive_decrypt_failures = 0;
612 }
613
614 pub fn consecutive_decrypt_failures(&self) -> u32 {
616 self.consecutive_decrypt_failures
617 }
618
619 pub fn remote_epoch(&self) -> Option<[u8; 8]> {
623 self.remote_epoch
624 }
625
626 pub fn coords(&self) -> Option<&TreeCoordinate> {
630 self.ancestry.as_ref()
631 }
632
633 pub fn declaration(&self) -> Option<&ParentDeclaration> {
635 self.declaration.as_ref()
636 }
637
638 pub fn has_tree_position(&self) -> bool {
640 self.declaration.is_some() && self.ancestry.is_some()
641 }
642
643 pub fn inbound_filter(&self) -> Option<&BloomFilter> {
647 self.inbound_filter.as_ref()
648 }
649
650 pub fn filter_sequence(&self) -> u64 {
652 self.filter_sequence
653 }
654
655 pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
657 if self.filter_received_at == 0 {
658 return true;
659 }
660 current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
661 }
662
663 pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
665 match &self.inbound_filter {
666 Some(filter) => filter.contains(node_addr),
667 None => false,
668 }
669 }
670
671 pub fn needs_filter_update(&self) -> bool {
673 self.pending_filter_update
674 }
675
676 pub fn link_stats(&self) -> &LinkStats {
680 &self.link_stats
681 }
682
683 pub fn link_stats_mut(&mut self) -> &mut LinkStats {
685 &mut self.link_stats
686 }
687
688 pub fn mmp(&self) -> Option<&MmpPeerState> {
692 self.mmp.as_ref()
693 }
694
695 pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
697 self.mmp.as_mut()
698 }
699
700 pub fn link_cost(&self) -> f64 {
708 match self.mmp() {
709 Some(mmp) => {
710 let etx = mmp.metrics.etx;
711 match mmp.metrics.srtt_ms() {
712 Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
713 None => 1.0,
714 }
715 }
716 None => 1.0,
717 }
718 }
719
720 pub fn has_srtt(&self) -> bool {
722 self.mmp()
723 .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
724 }
725
726 pub fn authenticated_at(&self) -> u64 {
728 self.authenticated_at
729 }
730
731 pub fn last_seen(&self) -> u64 {
733 self.last_seen
734 }
735
736 pub fn idle_time(&self, current_time_ms: u64) -> u64 {
738 current_time_ms.saturating_sub(self.last_seen)
739 }
740
741 pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
743 current_time_ms.saturating_sub(self.authenticated_at)
744 }
745
746 pub fn session_elapsed_ms(&self) -> u32 {
751 self.session_start.elapsed().as_millis() as u32
752 }
753
754 pub fn session_start(&self) -> Instant {
756 self.session_start
757 }
758
759 pub fn last_heartbeat_sent(&self) -> Option<Instant> {
763 self.last_heartbeat_sent
764 }
765
766 pub fn mark_heartbeat_sent(&mut self, now: Instant) {
768 self.last_heartbeat_sent = Some(now);
769 }
770
771 pub fn touch(&mut self, current_time_ms: u64) {
775 self.last_seen = current_time_ms;
776 if self.connectivity == ConnectivityState::Stale {
778 self.connectivity = ConnectivityState::Connected;
779 }
780 }
781
782 pub fn mark_stale(&mut self) {
784 if self.connectivity == ConnectivityState::Connected {
785 self.connectivity = ConnectivityState::Stale;
786 }
787 }
788
789 pub fn mark_reconnecting(&mut self) {
791 self.connectivity = ConnectivityState::Reconnecting;
792 }
793
794 pub fn mark_disconnected(&mut self) {
796 self.connectivity = ConnectivityState::Disconnected;
797 }
798
799 pub fn mark_connected(&mut self, current_time_ms: u64) {
801 self.connectivity = ConnectivityState::Connected;
802 self.last_seen = current_time_ms;
803 }
804
805 pub fn set_link_id(&mut self, link_id: LinkId) {
807 self.link_id = link_id;
808 }
809
810 pub fn update_tree_position(
814 &mut self,
815 declaration: ParentDeclaration,
816 ancestry: TreeCoordinate,
817 current_time_ms: u64,
818 ) {
819 self.declaration = Some(declaration);
820 self.ancestry = Some(ancestry);
821 self.last_seen = current_time_ms;
822 }
823
824 pub fn clear_tree_position(&mut self) {
826 self.declaration = None;
827 self.ancestry = None;
828 }
829
830 pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
834 self.tree_announce_min_interval_ms = ms;
835 }
836
837 pub fn last_tree_announce_sent_ms(&self) -> u64 {
839 self.last_tree_announce_sent_ms
840 }
841
842 pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
844 self.last_tree_announce_sent_ms = ms;
845 }
846
847 pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
849 now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
850 }
851
852 pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
854 self.last_tree_announce_sent_ms = now_ms;
855 self.pending_tree_announce = false;
856 }
857
858 pub fn mark_tree_announce_pending(&mut self) {
860 self.pending_tree_announce = true;
861 }
862
863 pub fn has_pending_tree_announce(&self) -> bool {
865 self.pending_tree_announce
866 }
867
868 pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
872 self.inbound_filter = Some(filter);
873 self.filter_sequence = sequence;
874 self.filter_received_at = current_time_ms;
875 self.last_seen = current_time_ms;
876 }
877
878 pub fn clear_filter(&mut self) {
880 self.inbound_filter = None;
881 self.filter_sequence = 0;
882 self.filter_received_at = 0;
883 }
884
885 pub fn mark_filter_update_needed(&mut self) {
887 self.pending_filter_update = true;
888 }
889
890 pub fn clear_filter_update_needed(&mut self) {
892 self.pending_filter_update = false;
893 }
894
895 pub fn session_established_at(&self) -> Instant {
899 self.session_established_at
900 }
901
902 #[cfg(test)]
903 pub(crate) fn set_session_established_at_for_test(&mut self, instant: Instant) {
904 self.session_established_at = instant;
905 }
906
907 pub fn rekey_jitter_secs(&self) -> i64 {
909 self.rekey_jitter_secs
910 }
911
912 pub fn current_k_bit(&self) -> bool {
914 self.current_k_bit
915 }
916
917 pub fn rekey_in_progress(&self) -> bool {
919 self.rekey_in_progress
920 }
921
922 pub fn set_rekey_in_progress(&mut self) {
924 self.rekey_in_progress = true;
925 }
926
927 pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
929 match self.last_peer_rekey {
930 Some(t) => t.elapsed().as_secs() < dampening_secs,
931 None => false,
932 }
933 }
934
935 pub fn record_peer_rekey(&mut self) {
937 self.last_peer_rekey = Some(Instant::now());
938 }
939
940 pub fn pending_our_index(&self) -> Option<SessionIndex> {
942 self.pending_our_index
943 }
944
945 pub fn pending_their_index(&self) -> Option<SessionIndex> {
947 self.pending_their_index
948 }
949
950 pub fn previous_our_index(&self) -> Option<SessionIndex> {
952 self.previous_our_index
953 }
954
955 pub fn previous_session(&self) -> Option<&NoiseSession> {
957 self.previous_session.as_ref()
958 }
959
960 pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
962 self.previous_session.as_mut()
963 }
964
965 pub fn pending_new_session(&self) -> Option<&NoiseSession> {
967 self.pending_new_session.as_ref()
968 }
969
970 pub fn set_pending_session(
975 &mut self,
976 session: NoiseSession,
977 our_index: SessionIndex,
978 their_index: SessionIndex,
979 ) {
980 self.pending_new_session = Some(session);
981 self.pending_our_index = Some(our_index);
982 self.pending_their_index = Some(their_index);
983 self.rekey_in_progress = false;
984 self.rekey_our_index = None;
986 self.rekey_handshake = None;
987 self.rekey_msg1 = None;
988 self.rekey_msg1_next_resend = 0;
989 }
990
991 pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
997 let new_session = self.pending_new_session.take()?;
998 let new_our_index = self.pending_our_index.take();
999 let new_their_index = self.pending_their_index.take();
1000
1001 self.previous_session = self.noise_session.take();
1003 self.previous_our_index = self.our_index;
1004 self.drain_started = Some(Instant::now());
1005
1006 self.noise_session = Some(new_session);
1008 self.our_index = new_our_index;
1009 self.their_index = new_their_index;
1010
1011 self.current_k_bit = !self.current_k_bit;
1013 self.session_established_at = Instant::now();
1014 self.session_start = Instant::now();
1015 self.rekey_in_progress = false;
1016 self.rekey_jitter_secs = draw_rekey_jitter();
1017 self.reset_replay_suppressed();
1018
1019 let now = Instant::now();
1021 if let Some(mmp) = &mut self.mmp {
1022 mmp.reset_for_rekey(now);
1023 }
1024
1025 self.previous_our_index
1026 }
1027
1028 pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1033 let new_session = self.pending_new_session.take()?;
1034 let new_our_index = self.pending_our_index.take();
1035 let new_their_index = self.pending_their_index.take();
1036
1037 self.previous_session = self.noise_session.take();
1039 self.previous_our_index = self.our_index;
1040 self.drain_started = Some(Instant::now());
1041
1042 self.noise_session = Some(new_session);
1044 self.our_index = new_our_index;
1045 self.their_index = new_their_index;
1046
1047 self.current_k_bit = !self.current_k_bit;
1049 self.session_established_at = Instant::now();
1050 self.session_start = Instant::now();
1051 self.rekey_in_progress = false;
1052 self.rekey_jitter_secs = draw_rekey_jitter();
1053 self.reset_replay_suppressed();
1054
1055 let now = Instant::now();
1057 if let Some(mmp) = &mut self.mmp {
1058 mmp.reset_for_rekey(now);
1059 }
1060
1061 self.previous_our_index
1062 }
1063
1064 pub fn drain_expired(&self, drain_secs: u64) -> bool {
1066 match self.drain_started {
1067 Some(t) => t.elapsed().as_secs() >= drain_secs,
1068 None => false,
1069 }
1070 }
1071
1072 pub fn is_draining(&self) -> bool {
1074 self.drain_started.is_some()
1075 }
1076
1077 pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1082 self.previous_session = None;
1083 self.drain_started = None;
1084 self.previous_our_index.take()
1085 }
1086
1087 pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1093 self.rekey_handshake = None;
1094 self.rekey_msg1 = None;
1095 self.rekey_msg1_next_resend = 0;
1096 self.rekey_in_progress = false;
1097 self.rekey_our_index.take().or_else(|| {
1099 self.pending_new_session = None;
1100 self.pending_their_index = None;
1101 self.pending_our_index.take()
1102 })
1103 }
1104
1105 pub fn set_rekey_state(
1109 &mut self,
1110 handshake: NoiseHandshakeState,
1111 our_index: SessionIndex,
1112 wire_msg1: Vec<u8>,
1113 next_resend_ms: u64,
1114 ) {
1115 self.rekey_handshake = Some(handshake);
1116 self.rekey_our_index = Some(our_index);
1117 self.rekey_msg1 = Some(wire_msg1);
1118 self.rekey_msg1_next_resend = next_resend_ms;
1119 self.rekey_in_progress = true;
1120 }
1121
1122 pub fn rekey_our_index(&self) -> Option<SessionIndex> {
1124 self.rekey_our_index
1125 }
1126
1127 pub fn complete_rekey_msg2(&mut self, msg2_bytes: &[u8]) -> Result<NoiseSession, NoiseError> {
1133 let mut hs = self
1134 .rekey_handshake
1135 .take()
1136 .ok_or_else(|| NoiseError::WrongState {
1137 expected: "rekey handshake in progress".to_string(),
1138 got: "no handshake state".to_string(),
1139 })?;
1140
1141 hs.read_message_2(msg2_bytes)?;
1142 let session = hs.into_session()?;
1143
1144 self.rekey_msg1 = None;
1146 self.rekey_msg1_next_resend = 0;
1147
1148 Ok(session)
1149 }
1150
1151 pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1153 self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1154 }
1155
1156 pub fn rekey_msg1(&self) -> Option<&[u8]> {
1158 self.rekey_msg1.as_deref()
1159 }
1160
1161 pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
1163 self.rekey_msg1_next_resend = next_ms;
1164 }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169 use super::*;
1170 use crate::Identity;
1171
1172 fn make_peer_identity() -> PeerIdentity {
1173 let identity = Identity::generate();
1174 PeerIdentity::from_pubkey(identity.pubkey())
1175 }
1176
1177 fn make_node_addr(val: u8) -> NodeAddr {
1178 let mut bytes = [0u8; 16];
1179 bytes[0] = val;
1180 NodeAddr::from_bytes(bytes)
1181 }
1182
1183 fn make_coords(ids: &[u8]) -> TreeCoordinate {
1184 TreeCoordinate::from_addrs(ids.iter().map(|&v| make_node_addr(v)).collect()).unwrap()
1185 }
1186
1187 #[test]
1188 fn test_connectivity_state_properties() {
1189 assert!(ConnectivityState::Connected.can_send());
1190 assert!(ConnectivityState::Stale.can_send());
1191 assert!(!ConnectivityState::Reconnecting.can_send());
1192 assert!(!ConnectivityState::Disconnected.can_send());
1193
1194 assert!(ConnectivityState::Connected.is_healthy());
1195 assert!(!ConnectivityState::Stale.is_healthy());
1196
1197 assert!(ConnectivityState::Disconnected.is_terminal());
1198 assert!(!ConnectivityState::Connected.is_terminal());
1199 }
1200
1201 #[test]
1202 fn test_active_peer_creation() {
1203 let identity = make_peer_identity();
1204 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1205
1206 assert_eq!(peer.identity().node_addr(), identity.node_addr());
1207 assert_eq!(peer.link_id(), LinkId::new(1));
1208 assert!(peer.is_healthy());
1209 assert!(peer.can_send());
1210 assert_eq!(peer.authenticated_at(), 1000);
1211 assert!(peer.needs_filter_update()); }
1213
1214 #[test]
1215 fn test_connectivity_transitions() {
1216 let identity = make_peer_identity();
1217 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1218
1219 assert!(peer.is_healthy());
1220
1221 peer.mark_stale();
1222 assert_eq!(peer.connectivity(), ConnectivityState::Stale);
1223 assert!(peer.can_send()); peer.touch(2000);
1227 assert!(peer.is_healthy());
1228
1229 peer.mark_reconnecting();
1230 assert!(!peer.can_send());
1231
1232 peer.mark_connected(3000);
1233 assert!(peer.is_healthy());
1234
1235 peer.mark_disconnected();
1236 assert!(peer.is_disconnected());
1237 assert!(!peer.can_send());
1238 }
1239
1240 #[test]
1241 fn test_tree_position() {
1242 let identity = make_peer_identity();
1243 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1244
1245 assert!(!peer.has_tree_position());
1246 assert!(peer.coords().is_none());
1247
1248 let node = make_node_addr(1);
1249 let parent = make_node_addr(2);
1250 let decl = ParentDeclaration::new(node, parent, 1, 1000);
1251 let coords = make_coords(&[1, 2, 0]);
1252
1253 peer.update_tree_position(decl, coords, 2000);
1254
1255 assert!(peer.has_tree_position());
1256 assert!(peer.coords().is_some());
1257 assert_eq!(peer.last_seen(), 2000);
1258 }
1259
1260 #[test]
1261 fn test_bloom_filter() {
1262 let identity = make_peer_identity();
1263 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1264 let target = make_node_addr(42);
1265
1266 assert!(!peer.may_reach(&target));
1267 assert!(peer.filter_is_stale(2000, 500));
1268
1269 let mut filter = BloomFilter::new();
1270 filter.insert(&target);
1271 peer.update_filter(filter, 1, 1500);
1272
1273 assert!(peer.may_reach(&target));
1274 assert!(!peer.filter_is_stale(1800, 500));
1275 assert!(peer.filter_is_stale(2500, 500));
1276 }
1277
1278 #[test]
1279 fn test_timing() {
1280 let identity = make_peer_identity();
1281 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1282
1283 assert_eq!(peer.connection_duration(2000), 1000);
1284 assert_eq!(peer.idle_time(2000), 1000);
1285 }
1286
1287 #[test]
1288 fn test_filter_update_flag() {
1289 let identity = make_peer_identity();
1290 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1291
1292 assert!(peer.needs_filter_update()); peer.clear_filter_update_needed();
1295 assert!(!peer.needs_filter_update());
1296
1297 peer.mark_filter_update_needed();
1298 assert!(peer.needs_filter_update());
1299 }
1300
1301 #[test]
1302 fn test_with_stats() {
1303 let identity = make_peer_identity();
1304 let mut stats = LinkStats::new();
1305 stats.record_sent(100);
1306 stats.record_recv(200, 500);
1307
1308 let peer = ActivePeer::with_stats(identity, LinkId::new(1), 1000, stats);
1309
1310 assert_eq!(peer.link_stats().packets_sent, 1);
1311 assert_eq!(peer.link_stats().packets_recv, 1);
1312 }
1313
1314 #[test]
1315 fn test_replay_suppression_counter() {
1316 let identity = make_peer_identity();
1317 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1318
1319 assert_eq!(peer.replay_suppressed_count(), 0);
1321
1322 assert_eq!(peer.increment_replay_suppressed(), 1);
1324 assert_eq!(peer.increment_replay_suppressed(), 2);
1325 assert_eq!(peer.increment_replay_suppressed(), 3);
1326 assert_eq!(peer.replay_suppressed_count(), 3);
1327
1328 assert_eq!(peer.reset_replay_suppressed(), 3);
1330 assert_eq!(peer.replay_suppressed_count(), 0);
1331
1332 assert_eq!(peer.increment_replay_suppressed(), 1);
1334 assert_eq!(peer.replay_suppressed_count(), 1);
1335
1336 peer.reset_replay_suppressed();
1338 assert_eq!(peer.reset_replay_suppressed(), 0);
1339 }
1340
1341 #[test]
1342 fn test_increment_decrypt_failures_monotonic() {
1343 let identity = make_peer_identity();
1344 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1345
1346 assert_eq!(peer.consecutive_decrypt_failures(), 0);
1348
1349 let mut prev = 0u32;
1351 for expected in 1..=25u32 {
1352 let count = peer.increment_decrypt_failures();
1353 assert_eq!(count, expected, "increment must return monotonic count");
1354 assert!(count > prev, "count must strictly increase");
1355 assert_eq!(peer.consecutive_decrypt_failures(), count);
1356 prev = count;
1357 }
1358 }
1359
1360 #[test]
1361 fn test_reset_decrypt_failures_zeroes_counter() {
1362 let identity = make_peer_identity();
1363 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1364
1365 for _ in 0..7 {
1367 peer.increment_decrypt_failures();
1368 }
1369 assert_eq!(peer.consecutive_decrypt_failures(), 7);
1370
1371 peer.reset_decrypt_failures();
1373 assert_eq!(peer.consecutive_decrypt_failures(), 0);
1374
1375 peer.reset_decrypt_failures();
1377 assert_eq!(peer.consecutive_decrypt_failures(), 0);
1378
1379 assert_eq!(peer.increment_decrypt_failures(), 1);
1381 assert_eq!(peer.consecutive_decrypt_failures(), 1);
1382 }
1383
1384 #[test]
1385 fn test_rekey_jitter_in_range() {
1386 for _ in 0..100 {
1387 let identity = make_peer_identity();
1388 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1389 let jitter = peer.rekey_jitter_secs();
1390 assert!(
1391 (-REKEY_JITTER_SECS..=REKEY_JITTER_SECS).contains(&jitter),
1392 "jitter {} outside [-{}, +{}]",
1393 jitter,
1394 REKEY_JITTER_SECS,
1395 REKEY_JITTER_SECS
1396 );
1397 }
1398 }
1399
1400 #[test]
1401 fn test_rekey_jitter_mean_near_zero() {
1402 let mut sum = 0i64;
1403 let n = 200i64;
1404
1405 for _ in 0..n {
1406 let identity = make_peer_identity();
1407 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1408 sum += peer.rekey_jitter_secs();
1409 }
1410
1411 let mean = sum / n;
1412 assert!(
1413 mean.abs() < 5,
1414 "empirical mean {} not within 5 of 0 over {} samples",
1415 mean,
1416 n
1417 );
1418 }
1419}