1use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::node::REKEY_JITTER_SECS;
9use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
10use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
11use crate::tree::{ParentDeclaration, TreeCoordinate};
12use crate::utils::index::SessionIndex;
13use crate::{FipsAddress, NodeAddr, PeerIdentity};
14use rand::RngExt;
15use secp256k1::XOnlyPublicKey;
16use std::fmt;
17use std::time::Instant;
18
19fn draw_rekey_jitter() -> i64 {
20 rand::rng().random_range(-REKEY_JITTER_SECS..=REKEY_JITTER_SECS)
21}
22
/// Connectivity state of the link to an authenticated peer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectivityState {
    /// Healthy; the only state for which [`is_healthy`](Self::is_healthy) holds.
    Connected,
    /// No longer healthy, but sending is still permitted.
    Stale,
    /// Sending is not permitted; not a terminal state.
    Reconnecting,
    /// Terminal state; sending is not permitted.
    Disconnected,
}

impl ConnectivityState {
    /// Returns `true` for `Connected` and `Stale`, `false` otherwise.
    pub fn can_send(&self) -> bool {
        match self {
            Self::Connected | Self::Stale => true,
            Self::Reconnecting | Self::Disconnected => false,
        }
    }

    /// Returns `true` only for `Disconnected`.
    pub fn is_terminal(&self) -> bool {
        *self == Self::Disconnected
    }

    /// Returns `true` only for `Connected`.
    pub fn is_healthy(&self) -> bool {
        *self == Self::Connected
    }
}

impl fmt::Display for ConnectivityState {
    /// Writes the lowercase name of the state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `write_str` emits the label verbatim, exactly like formatting it
        // through a bare `{}` placeholder.
        f.write_str(match self {
            Self::Connected => "connected",
            Self::Stale => "stale",
            Self::Reconnecting => "reconnecting",
            Self::Disconnected => "disconnected",
        })
    }
}
69
/// State kept for a peer that has been authenticated.
///
/// Bundles the peer's identity, link and transport addressing, Noise session
/// material, tree position, inbound Bloom filter, link statistics, and the
/// bookkeeping used while rekeying.
#[derive(Debug)]
pub struct ActivePeer {
    /// Identity of the peer; source of its node address, address and key.
    identity: PeerIdentity,

    /// Link the peer is attached to; replaceable via `set_link_id`.
    link_id: LinkId,
    /// Current connectivity state; constructors start at `Connected`.
    connectivity: ConnectivityState,

    /// Current Noise session, if one is installed.
    noise_session: Option<NoiseSession>,
    /// Our session index for the current session.
    our_index: Option<SessionIndex>,
    /// The peer's session index for the current session.
    their_index: Option<SessionIndex>,
    /// Transport last recorded for the peer (set together with `current_addr`).
    transport_id: Option<TransportId>,
    /// Transport address last recorded for the peer.
    current_addr: Option<TransportAddr>,

    /// Parent declaration last stored by `update_tree_position`.
    declaration: Option<ParentDeclaration>,
    /// Tree coordinates last stored by `update_tree_position`.
    ancestry: Option<TreeCoordinate>,

    /// Minimum spacing between tree announces, in ms (constructors use 500).
    tree_announce_min_interval_ms: u64,
    /// Timestamp (ms) recorded for the last tree announce sent; 0 initially.
    last_tree_announce_sent_ms: u64,
    /// Set when a tree announce is waiting to be sent.
    pending_tree_announce: bool,

    /// Bloom filter received from the peer, if any.
    inbound_filter: Option<BloomFilter>,
    /// Sequence number stored with the current inbound filter.
    filter_sequence: u64,
    /// Timestamp (ms) at which the filter was stored; 0 is treated as "never".
    filter_received_at: u64,
    /// Set when a filter update is owed to the peer; constructors start at `true`.
    pending_filter_update: bool,

    /// Reference instant for `session_elapsed_ms`; reset on session cutover.
    session_start: Instant,

    /// Link statistics for this peer.
    link_stats: LinkStats,
    /// Caller-supplied timestamp at which the peer was authenticated.
    authenticated_at: u64,
    /// Caller-supplied timestamp of the most recent activity.
    last_seen: u64,

    /// Epoch value reported by the remote side, if known.
    /// NOTE(review): presumably identifies a peer restart — confirm.
    remote_epoch: Option<[u8; 8]>,

    /// MMP state; populated only by `with_session`.
    mmp: Option<MmpPeerState>,

    /// Instant of the last heartbeat recorded via `mark_heartbeat_sent`.
    last_heartbeat_sent: Option<Instant>,

    /// Stored handshake message 2 bytes.
    /// NOTE(review): presumably retained for retransmission — confirm.
    handshake_msg2: Option<Vec<u8>>,

    /// Count of suppressed replays since the last reset.
    replay_suppressed_count: u32,
    /// Count of decrypt failures since the last reset.
    consecutive_decrypt_failures: u32,

    /// Instant the current session was established; reset on cutover.
    session_established_at: Instant,
    /// Random per-session offset in seconds, redrawn on cutover.
    rekey_jitter_secs: i64,
    /// Current K bit; toggled on every cutover.
    current_k_bit: bool,
    /// Session that was current before the last cutover, kept while draining.
    previous_session: Option<NoiseSession>,
    /// Our index for `previous_session`.
    previous_our_index: Option<SessionIndex>,
    /// Instant the drain of `previous_session` started.
    drain_started: Option<Instant>,
    /// Completed new session waiting for cutover.
    pending_new_session: Option<NoiseSession>,
    /// Our index for `pending_new_session`.
    pending_our_index: Option<SessionIndex>,
    /// The peer's index for `pending_new_session`.
    pending_their_index: Option<SessionIndex>,
    /// True while a rekey handshake is in flight.
    rekey_in_progress: bool,
    /// Instant recorded by `record_peer_rekey`; drives `is_rekey_dampened`.
    last_peer_rekey: Option<Instant>,
    /// In-flight rekey handshake state.
    rekey_handshake: Option<NoiseHandshakeState>,
    /// Our index allocated for the in-flight rekey handshake.
    rekey_our_index: Option<SessionIndex>,
    /// Wire bytes of rekey message 1, kept for resends.
    rekey_msg1: Option<Vec<u8>>,
    /// Timestamp (ms) at or after which rekey message 1 is due for resend.
    rekey_msg1_next_resend: u64,

    /// Shared handle to a connected UDP socket for this peer (Linux/macOS only).
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    connected_udp:
        Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,

    /// Receive drain paired with `connected_udp` (Linux/macOS only).
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
}
222
223impl ActivePeer {
224 pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
229 let now = Instant::now();
230 Self {
231 identity,
232 link_id,
233 connectivity: ConnectivityState::Connected,
234 noise_session: None,
235 our_index: None,
236 their_index: None,
237 transport_id: None,
238 current_addr: None,
239 declaration: None,
240 ancestry: None,
241 tree_announce_min_interval_ms: 500,
242 last_tree_announce_sent_ms: 0,
243 pending_tree_announce: false,
244 inbound_filter: None,
245 filter_sequence: 0,
246 filter_received_at: 0,
247 pending_filter_update: true, session_start: now,
249 link_stats: LinkStats::new(),
250 authenticated_at,
251 last_seen: authenticated_at,
252 remote_epoch: None,
253 mmp: None,
254 last_heartbeat_sent: None,
255 handshake_msg2: None,
256 replay_suppressed_count: 0,
257 consecutive_decrypt_failures: 0,
258 session_established_at: now,
259 rekey_jitter_secs: draw_rekey_jitter(),
260 current_k_bit: false,
261 previous_session: None,
262 previous_our_index: None,
263 drain_started: None,
264 pending_new_session: None,
265 pending_our_index: None,
266 pending_their_index: None,
267 rekey_in_progress: false,
268 last_peer_rekey: None,
269 rekey_handshake: None,
270 rekey_our_index: None,
271 rekey_msg1: None,
272 rekey_msg1_next_resend: 0,
273 #[cfg(any(target_os = "linux", target_os = "macos"))]
274 connected_udp: None,
275 #[cfg(any(target_os = "linux", target_os = "macos"))]
276 peer_recv_drain: None,
277 }
278 }
279
280 pub fn with_stats(
285 identity: PeerIdentity,
286 link_id: LinkId,
287 authenticated_at: u64,
288 link_stats: LinkStats,
289 ) -> Self {
290 let mut peer = Self::new(identity, link_id, authenticated_at);
291 peer.link_stats = link_stats;
292 peer
293 }
294
295 #[allow(clippy::too_many_arguments)]
300 pub fn with_session(
301 identity: PeerIdentity,
302 link_id: LinkId,
303 authenticated_at: u64,
304 noise_session: NoiseSession,
305 our_index: SessionIndex,
306 their_index: SessionIndex,
307 transport_id: TransportId,
308 current_addr: TransportAddr,
309 link_stats: LinkStats,
310 is_initiator: bool,
311 mmp_config: &MmpConfig,
312 remote_epoch: Option<[u8; 8]>,
313 ) -> Self {
314 let now = Instant::now();
315 Self {
316 identity,
317 link_id,
318 connectivity: ConnectivityState::Connected,
319 noise_session: Some(noise_session),
320 our_index: Some(our_index),
321 their_index: Some(their_index),
322 transport_id: Some(transport_id),
323 current_addr: Some(current_addr),
324 declaration: None,
325 ancestry: None,
326 tree_announce_min_interval_ms: 500,
327 last_tree_announce_sent_ms: 0,
328 pending_tree_announce: false,
329 inbound_filter: None,
330 filter_sequence: 0,
331 filter_received_at: 0,
332 pending_filter_update: true,
333 session_start: now,
334 link_stats,
335 authenticated_at,
336 last_seen: authenticated_at,
337 remote_epoch,
338 mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
339 last_heartbeat_sent: None,
340 handshake_msg2: None,
341 replay_suppressed_count: 0,
342 consecutive_decrypt_failures: 0,
343 session_established_at: now,
344 rekey_jitter_secs: draw_rekey_jitter(),
345 current_k_bit: false,
346 previous_session: None,
347 previous_our_index: None,
348 drain_started: None,
349 pending_new_session: None,
350 pending_our_index: None,
351 pending_their_index: None,
352 rekey_in_progress: false,
353 last_peer_rekey: None,
354 rekey_handshake: None,
355 rekey_our_index: None,
356 rekey_msg1: None,
357 rekey_msg1_next_resend: 0,
358 #[cfg(any(target_os = "linux", target_os = "macos"))]
359 connected_udp: None,
360 #[cfg(any(target_os = "linux", target_os = "macos"))]
361 peer_recv_drain: None,
362 }
363 }
364
365 #[cfg(any(target_os = "linux", target_os = "macos"))]
370 pub(crate) fn connected_udp(
371 &self,
372 ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
373 self.connected_udp.clone()
374 }
375
376 #[cfg(any(target_os = "linux", target_os = "macos"))]
390 pub(crate) fn set_connected_udp(
391 &mut self,
392 socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
393 drain: crate::transport::udp::peer_drain::PeerRecvDrain,
394 ) {
395 self.peer_recv_drain = None;
399 self.connected_udp = None;
400 self.connected_udp = Some(socket);
401 self.peer_recv_drain = Some(drain);
402 }
403
404 #[cfg(any(target_os = "linux", target_os = "macos"))]
409 pub(crate) fn clear_connected_udp(&mut self) {
410 self.peer_recv_drain = None;
411 self.connected_udp = None;
412 }
413
    /// Returns the peer's identity.
    pub fn identity(&self) -> &PeerIdentity {
        &self.identity
    }
420
    /// Returns the node address taken from the peer's identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
425
    /// Returns the FIPS address taken from the peer's identity.
    pub fn address(&self) -> &FipsAddress {
        self.identity.address()
    }
430
    /// Returns the x-only public key taken from the peer's identity.
    pub fn pubkey(&self) -> XOnlyPublicKey {
        self.identity.pubkey()
    }
435
    /// Returns the string produced by the identity's `npub()`.
    /// NOTE(review): presumably the bech32 `npub` encoding of the key — confirm.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
440
    /// Returns the id of the link this peer is attached to.
    pub fn link_id(&self) -> LinkId {
        self.link_id
    }
447
    /// Returns the current connectivity state.
    pub fn connectivity(&self) -> ConnectivityState {
        self.connectivity
    }
452
    /// Returns whether the current connectivity state permits sending.
    pub fn can_send(&self) -> bool {
        self.connectivity.can_send()
    }
457
    /// Returns whether the current connectivity state is healthy.
    pub fn is_healthy(&self) -> bool {
        self.connectivity.is_healthy()
    }
462
    /// Returns whether the peer is in the terminal connectivity state.
    pub fn is_disconnected(&self) -> bool {
        self.connectivity.is_terminal()
    }
467
    /// Returns `true` when a current Noise session is installed.
    pub fn has_session(&self) -> bool {
        self.noise_session.is_some()
    }
474
    /// Borrows the current Noise session, if any.
    pub fn noise_session(&self) -> Option<&NoiseSession> {
        self.noise_session.as_ref()
    }
479
    /// Mutably borrows the current Noise session, if any.
    pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
        self.noise_session.as_mut()
    }
484
    /// Returns our session index for the current session, if set.
    pub fn our_index(&self) -> Option<SessionIndex> {
        self.our_index
    }
489
    /// Returns the peer's session index for the current session, if set.
    pub fn their_index(&self) -> Option<SessionIndex> {
        self.their_index
    }
494
    /// Stores `index` as the peer's session index for the current session.
    pub fn set_their_index(&mut self, index: SessionIndex) {
        self.their_index = Some(index);
    }
501
502 pub fn replace_session(
512 &mut self,
513 new_session: NoiseSession,
514 new_our_index: SessionIndex,
515 new_their_index: SessionIndex,
516 ) -> Option<SessionIndex> {
517 self.reset_replay_suppressed();
518 let old_our_index = self.our_index;
519 self.noise_session = Some(new_session);
520 self.our_index = Some(new_our_index);
521 self.their_index = Some(new_their_index);
522 old_our_index
523 }
524
    /// Returns the transport last recorded for this peer, if any.
    pub fn transport_id(&self) -> Option<TransportId> {
        self.transport_id
    }
529
    /// Borrows the transport address last recorded for this peer, if any.
    pub fn current_addr(&self) -> Option<&TransportAddr> {
        self.current_addr.as_ref()
    }
534
535 pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
556 if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
557 return false;
558 }
559 self.transport_id = Some(transport_id);
560 self.current_addr = Some(addr.clone());
561 true
562 }
563
    /// Stores the bytes of handshake message 2, replacing any stored copy.
    pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
        self.handshake_msg2 = Some(msg2);
    }
570
    /// Borrows the stored handshake message 2 bytes, if any.
    pub fn handshake_msg2(&self) -> Option<&[u8]> {
        self.handshake_msg2.as_deref()
    }
575
    /// Discards the stored handshake message 2 bytes.
    pub fn clear_handshake_msg2(&mut self) {
        self.handshake_msg2 = None;
    }
580
581 pub fn increment_replay_suppressed(&mut self) -> u32 {
585 self.replay_suppressed_count += 1;
586 self.replay_suppressed_count
587 }
588
589 pub fn reset_replay_suppressed(&mut self) -> u32 {
591 let count = self.replay_suppressed_count;
592 self.replay_suppressed_count = 0;
593 count
594 }
595
    /// Returns the number of suppressed replays counted since the last reset.
    pub fn replay_suppressed_count(&self) -> u32 {
        self.replay_suppressed_count
    }
600
601 pub fn increment_decrypt_failures(&mut self) -> u32 {
605 self.consecutive_decrypt_failures += 1;
606 self.consecutive_decrypt_failures
607 }
608
    /// Resets the decrypt-failure counter to zero.
    pub fn reset_decrypt_failures(&mut self) {
        self.consecutive_decrypt_failures = 0;
    }
613
    /// Returns the number of decrypt failures counted since the last reset.
    pub fn consecutive_decrypt_failures(&self) -> u32 {
        self.consecutive_decrypt_failures
    }
618
    /// Returns the stored remote epoch, if known.
    pub fn remote_epoch(&self) -> Option<[u8; 8]> {
        self.remote_epoch
    }
625
    /// Overwrites the stored remote epoch (`None` clears it).
    pub(crate) fn set_remote_epoch(&mut self, remote_epoch: Option<[u8; 8]>) {
        self.remote_epoch = remote_epoch;
    }
632
    /// Borrows the peer's stored tree coordinates, if any.
    pub fn coords(&self) -> Option<&TreeCoordinate> {
        self.ancestry.as_ref()
    }
639
    /// Borrows the peer's stored parent declaration, if any.
    pub fn declaration(&self) -> Option<&ParentDeclaration> {
        self.declaration.as_ref()
    }
644
645 pub fn has_tree_position(&self) -> bool {
647 self.declaration.is_some() && self.ancestry.is_some()
648 }
649
    /// Borrows the Bloom filter received from the peer, if any.
    pub fn inbound_filter(&self) -> Option<&BloomFilter> {
        self.inbound_filter.as_ref()
    }
656
    /// Returns the sequence number stored with the inbound filter
    /// (0 when no filter has been stored or after `clear_filter`).
    pub fn filter_sequence(&self) -> u64 {
        self.filter_sequence
    }
661
662 pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
664 if self.filter_received_at == 0 {
665 return true;
666 }
667 current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
668 }
669
670 pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
672 match &self.inbound_filter {
673 Some(filter) => filter.contains(node_addr),
674 None => false,
675 }
676 }
677
    /// Returns whether a filter update is pending for this peer.
    pub fn needs_filter_update(&self) -> bool {
        self.pending_filter_update
    }
682
    /// Borrows the link statistics for this peer.
    pub fn link_stats(&self) -> &LinkStats {
        &self.link_stats
    }
689
    /// Mutably borrows the link statistics for this peer.
    pub fn link_stats_mut(&mut self) -> &mut LinkStats {
        &mut self.link_stats
    }
694
    /// Borrows the MMP state, if this peer has any.
    pub fn mmp(&self) -> Option<&MmpPeerState> {
        self.mmp.as_ref()
    }
701
    /// Mutably borrows the MMP state, if this peer has any.
    pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
        self.mmp.as_mut()
    }
706
707 pub fn link_cost(&self) -> f64 {
715 match self.mmp() {
716 Some(mmp) => {
717 let etx = mmp.metrics.etx;
718 match mmp.metrics.srtt_ms() {
719 Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
720 None => 1.0,
721 }
722 }
723 None => 1.0,
724 }
725 }
726
727 pub fn has_srtt(&self) -> bool {
729 self.mmp()
730 .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
731 }
732
    /// Returns the timestamp supplied when the peer was constructed.
    pub fn authenticated_at(&self) -> u64 {
        self.authenticated_at
    }
737
    /// Returns the timestamp of the most recent recorded activity.
    pub fn last_seen(&self) -> u64 {
        self.last_seen
    }
742
    /// Returns how long ago the peer was last seen, relative to
    /// `current_time_ms`; saturates at 0 if `last_seen` is in the future.
    pub fn idle_time(&self, current_time_ms: u64) -> u64 {
        current_time_ms.saturating_sub(self.last_seen)
    }
747
    /// Returns the time since authentication, relative to `current_time_ms`;
    /// saturates at 0 if `authenticated_at` is in the future.
    pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
        current_time_ms.saturating_sub(self.authenticated_at)
    }
752
753 pub fn session_elapsed_ms(&self) -> u32 {
758 self.session_start.elapsed().as_millis() as u32
759 }
760
    /// Returns the instant used as the base for `session_elapsed_ms`.
    pub fn session_start(&self) -> Instant {
        self.session_start
    }
765
    /// Returns the instant of the last recorded heartbeat, if any.
    pub fn last_heartbeat_sent(&self) -> Option<Instant> {
        self.last_heartbeat_sent
    }
772
    /// Records `now` as the time the last heartbeat was sent.
    pub fn mark_heartbeat_sent(&mut self, now: Instant) {
        self.last_heartbeat_sent = Some(now);
    }
777
778 pub fn touch(&mut self, current_time_ms: u64) {
782 self.last_seen = current_time_ms;
783 if self.connectivity == ConnectivityState::Stale {
785 self.connectivity = ConnectivityState::Connected;
786 }
787 }
788
789 pub fn mark_stale(&mut self) {
791 if self.connectivity == ConnectivityState::Connected {
792 self.connectivity = ConnectivityState::Stale;
793 }
794 }
795
    /// Unconditionally moves the peer to `Reconnecting`.
    pub fn mark_reconnecting(&mut self) {
        self.connectivity = ConnectivityState::Reconnecting;
    }
800
    /// Unconditionally moves the peer to `Disconnected`.
    pub fn mark_disconnected(&mut self) {
        self.connectivity = ConnectivityState::Disconnected;
    }
805
    /// Unconditionally moves the peer to `Connected` and records
    /// `current_time_ms` as the last-seen time.
    pub fn mark_connected(&mut self, current_time_ms: u64) {
        self.connectivity = ConnectivityState::Connected;
        self.last_seen = current_time_ms;
    }
811
    /// Re-associates the peer with a different link.
    pub fn set_link_id(&mut self, link_id: LinkId) {
        self.link_id = link_id;
    }
816
    /// Stores the peer's parent declaration and tree coordinates, and records
    /// `current_time_ms` as the last-seen time.
    pub fn update_tree_position(
        &mut self,
        declaration: ParentDeclaration,
        ancestry: TreeCoordinate,
        current_time_ms: u64,
    ) {
        self.declaration = Some(declaration);
        self.ancestry = Some(ancestry);
        self.last_seen = current_time_ms;
    }
830
    /// Forgets the peer's parent declaration and tree coordinates.
    pub fn clear_tree_position(&mut self) {
        self.declaration = None;
        self.ancestry = None;
    }
836
    /// Sets the minimum spacing, in milliseconds, between tree announces to
    /// this peer.
    pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
        self.tree_announce_min_interval_ms = ms;
    }
843
    /// Returns the timestamp (ms) recorded for the last tree announce sent.
    pub fn last_tree_announce_sent_ms(&self) -> u64 {
        self.last_tree_announce_sent_ms
    }
848
    /// Overwrites the timestamp (ms) of the last tree announce sent, without
    /// touching the pending flag.
    pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
        self.last_tree_announce_sent_ms = ms;
    }
853
854 pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
856 now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
857 }
858
    /// Records that a tree announce was sent at `now_ms` and clears the
    /// pending-announce flag.
    pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
        self.last_tree_announce_sent_ms = now_ms;
        self.pending_tree_announce = false;
    }
864
    /// Flags that a tree announce is waiting to be sent to this peer.
    pub fn mark_tree_announce_pending(&mut self) {
        self.pending_tree_announce = true;
    }
869
    /// Returns whether a tree announce is waiting to be sent to this peer.
    pub fn has_pending_tree_announce(&self) -> bool {
        self.pending_tree_announce
    }
874
    /// Stores a Bloom filter received from the peer together with its
    /// sequence number, stamping both the filter receive time and the
    /// last-seen time with `current_time_ms`.
    pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
        self.inbound_filter = Some(filter);
        self.filter_sequence = sequence;
        self.filter_received_at = current_time_ms;
        self.last_seen = current_time_ms;
    }
884
    /// Drops the inbound filter and resets its sequence number and receive
    /// time to 0, so `filter_is_stale` reports stale afterwards.
    pub fn clear_filter(&mut self) {
        self.inbound_filter = None;
        self.filter_sequence = 0;
        self.filter_received_at = 0;
    }
891
    /// Flags that a filter update is owed to this peer.
    pub fn mark_filter_update_needed(&mut self) {
        self.pending_filter_update = true;
    }
896
    /// Clears the pending filter-update flag.
    pub fn clear_filter_update_needed(&mut self) {
        self.pending_filter_update = false;
    }
901
    /// Returns the instant the current session was established (set at
    /// construction and refreshed on every cutover).
    pub fn session_established_at(&self) -> Instant {
        self.session_established_at
    }
908
    /// Test-only hook that overrides the session establishment instant.
    #[cfg(test)]
    pub(crate) fn set_session_established_at_for_test(&mut self, instant: Instant) {
        self.session_established_at = instant;
    }
913
    /// Returns the random rekey offset, in seconds, drawn for the current
    /// session.
    pub fn rekey_jitter_secs(&self) -> i64 {
        self.rekey_jitter_secs
    }
918
    /// Returns the current K bit (starts `false`, flips on every cutover).
    pub fn current_k_bit(&self) -> bool {
        self.current_k_bit
    }
923
    /// Returns whether a rekey is currently flagged as in progress.
    pub fn rekey_in_progress(&self) -> bool {
        self.rekey_in_progress
    }
928
    /// Flags that a rekey is in progress.
    pub fn set_rekey_in_progress(&mut self) {
        self.rekey_in_progress = true;
    }
933
934 pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
936 match self.last_peer_rekey {
937 Some(t) => t.elapsed().as_secs() < dampening_secs,
938 None => false,
939 }
940 }
941
    /// Records the current instant as the time of the latest peer rekey,
    /// which starts the window checked by `is_rekey_dampened`.
    pub fn record_peer_rekey(&mut self) {
        self.last_peer_rekey = Some(Instant::now());
    }
946
    /// Returns our index for the pending (not yet cut over) session, if any.
    pub fn pending_our_index(&self) -> Option<SessionIndex> {
        self.pending_our_index
    }
951
    /// Returns the peer's index for the pending session, if any.
    pub fn pending_their_index(&self) -> Option<SessionIndex> {
        self.pending_their_index
    }
956
    /// Returns our index for the previous (draining) session, if any.
    pub fn previous_our_index(&self) -> Option<SessionIndex> {
        self.previous_our_index
    }
961
    /// Borrows the previous (draining) session, if any.
    pub fn previous_session(&self) -> Option<&NoiseSession> {
        self.previous_session.as_ref()
    }
966
    /// Mutably borrows the previous (draining) session, if any.
    pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
        self.previous_session.as_mut()
    }
971
    /// Borrows the pending session that is waiting for cutover, if any.
    pub fn pending_new_session(&self) -> Option<&NoiseSession> {
        self.pending_new_session.as_ref()
    }
976
    /// Stores a completed new session and its indices as pending, to be
    /// promoted later by a cutover.
    ///
    /// All in-flight rekey handshake state is cleared at the same time: the
    /// in-progress flag, the handshake index, the handshake state, and the
    /// stored message 1 with its resend deadline.
    pub fn set_pending_session(
        &mut self,
        session: NoiseSession,
        our_index: SessionIndex,
        their_index: SessionIndex,
    ) {
        self.pending_new_session = Some(session);
        self.pending_our_index = Some(our_index);
        self.pending_their_index = Some(their_index);
        self.rekey_in_progress = false;
        // The handshake index is only cleared here, not returned.
        // NOTE(review): presumably it now lives on as `pending_our_index` — confirm.
        self.rekey_our_index = None;
        self.rekey_handshake = None;
        self.rekey_msg1 = None;
        self.rekey_msg1_next_resend = 0;
    }
997
998 pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
1004 let new_session = self.pending_new_session.take()?;
1005 let new_our_index = self.pending_our_index.take();
1006 let new_their_index = self.pending_their_index.take();
1007
1008 self.previous_session = self.noise_session.take();
1010 self.previous_our_index = self.our_index;
1011 self.drain_started = Some(Instant::now());
1012
1013 self.noise_session = Some(new_session);
1015 self.our_index = new_our_index;
1016 self.their_index = new_their_index;
1017
1018 self.current_k_bit = !self.current_k_bit;
1020 self.session_established_at = Instant::now();
1021 self.session_start = Instant::now();
1022 self.rekey_in_progress = false;
1023 self.rekey_jitter_secs = draw_rekey_jitter();
1024 self.reset_replay_suppressed();
1025
1026 let now = Instant::now();
1028 if let Some(mmp) = &mut self.mmp {
1029 mmp.reset_for_rekey(now);
1030 }
1031
1032 self.previous_our_index
1033 }
1034
1035 pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1040 let new_session = self.pending_new_session.take()?;
1041 let new_our_index = self.pending_our_index.take();
1042 let new_their_index = self.pending_their_index.take();
1043
1044 self.previous_session = self.noise_session.take();
1046 self.previous_our_index = self.our_index;
1047 self.drain_started = Some(Instant::now());
1048
1049 self.noise_session = Some(new_session);
1051 self.our_index = new_our_index;
1052 self.their_index = new_their_index;
1053
1054 self.current_k_bit = !self.current_k_bit;
1056 self.session_established_at = Instant::now();
1057 self.session_start = Instant::now();
1058 self.rekey_in_progress = false;
1059 self.rekey_jitter_secs = draw_rekey_jitter();
1060 self.reset_replay_suppressed();
1061
1062 let now = Instant::now();
1064 if let Some(mmp) = &mut self.mmp {
1065 mmp.reset_for_rekey(now);
1066 }
1067
1068 self.previous_our_index
1069 }
1070
1071 pub fn drain_expired(&self, drain_secs: u64) -> bool {
1073 match self.drain_started {
1074 Some(t) => t.elapsed().as_secs() >= drain_secs,
1075 None => false,
1076 }
1077 }
1078
    /// Returns whether a previous session is currently being drained.
    pub fn is_draining(&self) -> bool {
        self.drain_started.is_some()
    }
1083
1084 pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1089 self.previous_session = None;
1090 self.drain_started = None;
1091 self.previous_our_index.take()
1092 }
1093
1094 pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1100 self.rekey_handshake = None;
1101 self.rekey_msg1 = None;
1102 self.rekey_msg1_next_resend = 0;
1103 self.rekey_in_progress = false;
1104 self.rekey_our_index.take().or_else(|| {
1106 self.pending_new_session = None;
1107 self.pending_their_index = None;
1108 self.pending_our_index.take()
1109 })
1110 }
1111
    /// Records the state of a rekey handshake that has just been started.
    ///
    /// Stores the handshake state, the index reserved for it, the wire bytes
    /// of message 1 and the time (ms) at which message 1 becomes due for
    /// resend, and flags the rekey as in progress.
    pub fn set_rekey_state(
        &mut self,
        handshake: NoiseHandshakeState,
        our_index: SessionIndex,
        wire_msg1: Vec<u8>,
        next_resend_ms: u64,
    ) {
        self.rekey_handshake = Some(handshake);
        self.rekey_our_index = Some(our_index);
        self.rekey_msg1 = Some(wire_msg1);
        self.rekey_msg1_next_resend = next_resend_ms;
        self.rekey_in_progress = true;
    }
1128
    /// Returns the index reserved for the in-flight rekey handshake, if any.
    pub fn rekey_our_index(&self) -> Option<SessionIndex> {
        self.rekey_our_index
    }
1133
1134 pub fn complete_rekey_msg2(
1140 &mut self,
1141 msg2_bytes: &[u8],
1142 ) -> Result<(NoiseSession, Option<[u8; 8]>), NoiseError> {
1143 let mut hs = self
1144 .rekey_handshake
1145 .take()
1146 .ok_or_else(|| NoiseError::WrongState {
1147 expected: "rekey handshake in progress".to_string(),
1148 got: "no handshake state".to_string(),
1149 })?;
1150
1151 hs.read_message_2(msg2_bytes)?;
1152 let remote_epoch = hs.remote_epoch();
1153 let session = hs.into_session()?;
1154
1155 self.rekey_msg1 = None;
1157 self.rekey_msg1_next_resend = 0;
1158
1159 Ok((session, remote_epoch))
1160 }
1161
1162 pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1164 self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1165 }
1166
    /// Borrows the stored wire bytes of rekey message 1, if any.
    pub fn rekey_msg1(&self) -> Option<&[u8]> {
        self.rekey_msg1.as_deref()
    }
1171
    /// Sets the time (ms) at which rekey message 1 next becomes due for resend.
    pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
        self.rekey_msg1_next_resend = next_ms;
    }
1176}
1177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Identity;

    /// Builds a `PeerIdentity` from a freshly generated key.
    fn make_peer_identity() -> PeerIdentity {
        PeerIdentity::from_pubkey(Identity::generate().pubkey())
    }

    /// Builds a node address whose first byte is `val` and the rest zero.
    fn make_node_addr(val: u8) -> NodeAddr {
        let mut raw = [0u8; 16];
        raw[0] = val;
        NodeAddr::from_bytes(raw)
    }

    /// Builds tree coordinates from a list of first-byte values.
    fn make_coords(ids: &[u8]) -> TreeCoordinate {
        TreeCoordinate::from_addrs(ids.iter().copied().map(make_node_addr).collect()).unwrap()
    }

    /// Builds a session-less peer on link 1, authenticated at t=1000.
    fn make_peer() -> ActivePeer {
        ActivePeer::new(make_peer_identity(), LinkId::new(1), 1000)
    }

    #[test]
    fn test_connectivity_state_properties() {
        // Sending is allowed only while connected or stale.
        assert!(ConnectivityState::Connected.can_send());
        assert!(ConnectivityState::Stale.can_send());
        assert!(!ConnectivityState::Reconnecting.can_send());
        assert!(!ConnectivityState::Disconnected.can_send());

        // Only `Connected` is healthy.
        assert!(ConnectivityState::Connected.is_healthy());
        assert!(!ConnectivityState::Stale.is_healthy());

        // Only `Disconnected` is terminal.
        assert!(ConnectivityState::Disconnected.is_terminal());
        assert!(!ConnectivityState::Connected.is_terminal());
    }

    #[test]
    fn test_active_peer_creation() {
        let identity = make_peer_identity();
        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert_eq!(peer.identity().node_addr(), identity.node_addr());
        assert_eq!(peer.link_id(), LinkId::new(1));
        assert!(peer.is_healthy());
        assert!(peer.can_send());
        assert_eq!(peer.authenticated_at(), 1000);
        // A new peer is owed a filter update from the start.
        assert!(peer.needs_filter_update());
    }

    #[test]
    fn test_connectivity_transitions() {
        let mut peer = make_peer();
        assert!(peer.is_healthy());

        // Connected -> Stale: still allowed to send.
        peer.mark_stale();
        assert_eq!(peer.connectivity(), ConnectivityState::Stale);
        assert!(peer.can_send());

        // Activity promotes Stale back to Connected.
        peer.touch(2000);
        assert!(peer.is_healthy());

        peer.mark_reconnecting();
        assert!(!peer.can_send());

        peer.mark_connected(3000);
        assert!(peer.is_healthy());

        peer.mark_disconnected();
        assert!(peer.is_disconnected());
        assert!(!peer.can_send());
    }

    #[test]
    fn test_tree_position() {
        let mut peer = make_peer();

        assert!(!peer.has_tree_position());
        assert!(peer.coords().is_none());

        let decl = ParentDeclaration::new(make_node_addr(1), make_node_addr(2), 1, 1000);
        let coords = make_coords(&[1, 2, 0]);
        peer.update_tree_position(decl, coords, 2000);

        assert!(peer.has_tree_position());
        assert!(peer.coords().is_some());
        assert_eq!(peer.last_seen(), 2000);
    }

    #[test]
    fn test_bloom_filter() {
        let mut peer = make_peer();
        let target = make_node_addr(42);

        // Without a filter nothing is reachable and the filter is stale.
        assert!(!peer.may_reach(&target));
        assert!(peer.filter_is_stale(2000, 500));

        let mut filter = BloomFilter::new();
        filter.insert(&target);
        peer.update_filter(filter, 1, 1500);

        assert!(peer.may_reach(&target));
        assert!(!peer.filter_is_stale(1800, 500));
        assert!(peer.filter_is_stale(2500, 500));
    }

    #[test]
    fn test_timing() {
        let peer = make_peer();

        assert_eq!(peer.connection_duration(2000), 1000);
        assert_eq!(peer.idle_time(2000), 1000);
    }

    #[test]
    fn test_filter_update_flag() {
        let mut peer = make_peer();
        assert!(peer.needs_filter_update());

        peer.clear_filter_update_needed();
        assert!(!peer.needs_filter_update());

        peer.mark_filter_update_needed();
        assert!(peer.needs_filter_update());
    }

    #[test]
    fn test_with_stats() {
        let mut stats = LinkStats::new();
        stats.record_sent(100);
        stats.record_recv(200, 500);

        let peer = ActivePeer::with_stats(make_peer_identity(), LinkId::new(1), 1000, stats);

        assert_eq!(peer.link_stats().packets_sent, 1);
        assert_eq!(peer.link_stats().packets_recv, 1);
    }

    #[test]
    fn test_replay_suppression_counter() {
        let mut peer = make_peer();
        assert_eq!(peer.replay_suppressed_count(), 0);

        // Each increment reports the running total.
        assert_eq!(peer.increment_replay_suppressed(), 1);
        assert_eq!(peer.increment_replay_suppressed(), 2);
        assert_eq!(peer.increment_replay_suppressed(), 3);
        assert_eq!(peer.replay_suppressed_count(), 3);

        // Reset hands back the old total and zeroes the counter.
        assert_eq!(peer.reset_replay_suppressed(), 3);
        assert_eq!(peer.replay_suppressed_count(), 0);

        // Counting restarts from one after a reset.
        assert_eq!(peer.increment_replay_suppressed(), 1);
        assert_eq!(peer.replay_suppressed_count(), 1);

        // A second reset in a row reports zero.
        peer.reset_replay_suppressed();
        assert_eq!(peer.reset_replay_suppressed(), 0);
    }

    #[test]
    fn test_increment_decrypt_failures_monotonic() {
        let mut peer = make_peer();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        let mut prev = 0u32;
        for expected in 1..=25u32 {
            let count = peer.increment_decrypt_failures();
            assert_eq!(count, expected, "increment must return monotonic count");
            assert!(count > prev, "count must strictly increase");
            assert_eq!(peer.consecutive_decrypt_failures(), count);
            prev = count;
        }
    }

    #[test]
    fn test_reset_decrypt_failures_zeroes_counter() {
        let mut peer = make_peer();

        for _ in 0..7 {
            peer.increment_decrypt_failures();
        }
        assert_eq!(peer.consecutive_decrypt_failures(), 7);

        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // Resetting an already-zero counter is a no-op.
        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        assert_eq!(peer.increment_decrypt_failures(), 1);
        assert_eq!(peer.consecutive_decrypt_failures(), 1);
    }

    #[test]
    fn test_rekey_jitter_in_range() {
        let allowed = -REKEY_JITTER_SECS..=REKEY_JITTER_SECS;
        for _ in 0..100 {
            let jitter = make_peer().rekey_jitter_secs();
            assert!(
                allowed.contains(&jitter),
                "jitter {} outside [-{}, +{}]",
                jitter,
                REKEY_JITTER_SECS,
                REKEY_JITTER_SECS
            );
        }
    }

    #[test]
    fn test_rekey_jitter_mean_near_zero() {
        let n = 200i64;
        let sum: i64 = (0..n).map(|_| make_peer().rekey_jitter_secs()).sum();

        // Integer division truncates toward zero, as in a plain `sum / n`.
        let mean = sum / n;
        assert!(
            mean.abs() < 5,
            "empirical mean {} not within 5 of 0 over {} samples",
            mean,
            n
        );
    }
}