1use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
9use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
10use crate::tree::{ParentDeclaration, TreeCoordinate};
11use crate::utils::index::SessionIndex;
12use crate::{FipsAddress, NodeAddr, PeerIdentity};
13use secp256k1::XOnlyPublicKey;
14use std::fmt;
15use std::time::Instant;
16
17#[derive(Clone, Copy, Debug, PartialEq, Eq)]
21pub enum ConnectivityState {
22 Connected,
24 Stale,
26 Reconnecting,
28 Disconnected,
30}
31
32impl ConnectivityState {
33 pub fn can_send(&self) -> bool {
35 matches!(
36 self,
37 ConnectivityState::Connected | ConnectivityState::Stale
38 )
39 }
40
41 pub fn is_terminal(&self) -> bool {
43 matches!(self, ConnectivityState::Disconnected)
44 }
45
46 pub fn is_healthy(&self) -> bool {
48 matches!(self, ConnectivityState::Connected)
49 }
50}
51
52impl fmt::Display for ConnectivityState {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 let s = match self {
55 ConnectivityState::Connected => "connected",
56 ConnectivityState::Stale => "stale",
57 ConnectivityState::Reconnecting => "reconnecting",
58 ConnectivityState::Disconnected => "disconnected",
59 };
60 write!(f, "{}", s)
61 }
62}
63
64#[derive(Debug)]
73pub struct ActivePeer {
74 identity: PeerIdentity,
77
78 link_id: LinkId,
81 connectivity: ConnectivityState,
83
84 noise_session: Option<NoiseSession>,
87 our_index: Option<SessionIndex>,
89 their_index: Option<SessionIndex>,
91 transport_id: Option<TransportId>,
93 current_addr: Option<TransportAddr>,
95
96 declaration: Option<ParentDeclaration>,
99 ancestry: Option<TreeCoordinate>,
101
102 tree_announce_min_interval_ms: u64,
105 last_tree_announce_sent_ms: u64,
107 pending_tree_announce: bool,
109
110 inbound_filter: Option<BloomFilter>,
113 filter_sequence: u64,
115 filter_received_at: u64,
117 pending_filter_update: bool,
119
120 session_start: Instant,
124
125 link_stats: LinkStats,
128 authenticated_at: u64,
130 last_seen: u64,
132
133 remote_epoch: Option<[u8; 8]>,
136
137 mmp: Option<MmpPeerState>,
140
141 last_heartbeat_sent: Option<Instant>,
144
145 handshake_msg2: Option<Vec<u8>>,
149
150 replay_suppressed_count: u32,
153 consecutive_decrypt_failures: u32,
155
156 session_established_at: Instant,
159 current_k_bit: bool,
161 previous_session: Option<NoiseSession>,
163 previous_our_index: Option<SessionIndex>,
165 drain_started: Option<Instant>,
167 pending_new_session: Option<NoiseSession>,
169 pending_our_index: Option<SessionIndex>,
171 pending_their_index: Option<SessionIndex>,
173 rekey_in_progress: bool,
175 last_peer_rekey: Option<Instant>,
177 rekey_handshake: Option<NoiseHandshakeState>,
179 rekey_our_index: Option<SessionIndex>,
181 rekey_msg1: Option<Vec<u8>>,
183 rekey_msg1_next_resend: u64,
185
186 #[cfg(any(target_os = "linux", target_os = "macos"))]
202 connected_udp:
203 Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,
204
205 #[cfg(any(target_os = "linux", target_os = "macos"))]
212 peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
213}
214
215impl ActivePeer {
216 pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
221 let now = Instant::now();
222 Self {
223 identity,
224 link_id,
225 connectivity: ConnectivityState::Connected,
226 noise_session: None,
227 our_index: None,
228 their_index: None,
229 transport_id: None,
230 current_addr: None,
231 declaration: None,
232 ancestry: None,
233 tree_announce_min_interval_ms: 500,
234 last_tree_announce_sent_ms: 0,
235 pending_tree_announce: false,
236 inbound_filter: None,
237 filter_sequence: 0,
238 filter_received_at: 0,
239 pending_filter_update: true, session_start: now,
241 link_stats: LinkStats::new(),
242 authenticated_at,
243 last_seen: authenticated_at,
244 remote_epoch: None,
245 mmp: None,
246 last_heartbeat_sent: None,
247 handshake_msg2: None,
248 replay_suppressed_count: 0,
249 consecutive_decrypt_failures: 0,
250 session_established_at: now,
251 current_k_bit: false,
252 previous_session: None,
253 previous_our_index: None,
254 drain_started: None,
255 pending_new_session: None,
256 pending_our_index: None,
257 pending_their_index: None,
258 rekey_in_progress: false,
259 last_peer_rekey: None,
260 rekey_handshake: None,
261 rekey_our_index: None,
262 rekey_msg1: None,
263 rekey_msg1_next_resend: 0,
264 #[cfg(any(target_os = "linux", target_os = "macos"))]
265 connected_udp: None,
266 #[cfg(any(target_os = "linux", target_os = "macos"))]
267 peer_recv_drain: None,
268 }
269 }
270
271 pub fn with_stats(
276 identity: PeerIdentity,
277 link_id: LinkId,
278 authenticated_at: u64,
279 link_stats: LinkStats,
280 ) -> Self {
281 let mut peer = Self::new(identity, link_id, authenticated_at);
282 peer.link_stats = link_stats;
283 peer
284 }
285
286 #[allow(clippy::too_many_arguments)]
291 pub fn with_session(
292 identity: PeerIdentity,
293 link_id: LinkId,
294 authenticated_at: u64,
295 noise_session: NoiseSession,
296 our_index: SessionIndex,
297 their_index: SessionIndex,
298 transport_id: TransportId,
299 current_addr: TransportAddr,
300 link_stats: LinkStats,
301 is_initiator: bool,
302 mmp_config: &MmpConfig,
303 remote_epoch: Option<[u8; 8]>,
304 ) -> Self {
305 let now = Instant::now();
306 Self {
307 identity,
308 link_id,
309 connectivity: ConnectivityState::Connected,
310 noise_session: Some(noise_session),
311 our_index: Some(our_index),
312 their_index: Some(their_index),
313 transport_id: Some(transport_id),
314 current_addr: Some(current_addr),
315 declaration: None,
316 ancestry: None,
317 tree_announce_min_interval_ms: 500,
318 last_tree_announce_sent_ms: 0,
319 pending_tree_announce: false,
320 inbound_filter: None,
321 filter_sequence: 0,
322 filter_received_at: 0,
323 pending_filter_update: true,
324 session_start: now,
325 link_stats,
326 authenticated_at,
327 last_seen: authenticated_at,
328 remote_epoch,
329 mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
330 last_heartbeat_sent: None,
331 handshake_msg2: None,
332 replay_suppressed_count: 0,
333 consecutive_decrypt_failures: 0,
334 session_established_at: now,
335 current_k_bit: false,
336 previous_session: None,
337 previous_our_index: None,
338 drain_started: None,
339 pending_new_session: None,
340 pending_our_index: None,
341 pending_their_index: None,
342 rekey_in_progress: false,
343 last_peer_rekey: None,
344 rekey_handshake: None,
345 rekey_our_index: None,
346 rekey_msg1: None,
347 rekey_msg1_next_resend: 0,
348 #[cfg(any(target_os = "linux", target_os = "macos"))]
349 connected_udp: None,
350 #[cfg(any(target_os = "linux", target_os = "macos"))]
351 peer_recv_drain: None,
352 }
353 }
354
355 #[cfg(any(target_os = "linux", target_os = "macos"))]
360 pub(crate) fn connected_udp(
361 &self,
362 ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
363 self.connected_udp.clone()
364 }
365
366 #[cfg(any(target_os = "linux", target_os = "macos"))]
380 pub(crate) fn set_connected_udp(
381 &mut self,
382 socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
383 drain: crate::transport::udp::peer_drain::PeerRecvDrain,
384 ) {
385 self.peer_recv_drain = None;
389 self.connected_udp = None;
390 self.connected_udp = Some(socket);
391 self.peer_recv_drain = Some(drain);
392 }
393
394 #[cfg(any(target_os = "linux", target_os = "macos"))]
399 pub(crate) fn clear_connected_udp(&mut self) {
400 self.peer_recv_drain = None;
401 self.connected_udp = None;
402 }
403
404 pub fn identity(&self) -> &PeerIdentity {
408 &self.identity
409 }
410
411 pub fn node_addr(&self) -> &NodeAddr {
413 self.identity.node_addr()
414 }
415
416 pub fn address(&self) -> &FipsAddress {
418 self.identity.address()
419 }
420
421 pub fn pubkey(&self) -> XOnlyPublicKey {
423 self.identity.pubkey()
424 }
425
426 pub fn npub(&self) -> String {
428 self.identity.npub()
429 }
430
431 pub fn link_id(&self) -> LinkId {
435 self.link_id
436 }
437
438 pub fn connectivity(&self) -> ConnectivityState {
440 self.connectivity
441 }
442
443 pub fn can_send(&self) -> bool {
445 self.connectivity.can_send()
446 }
447
448 pub fn is_healthy(&self) -> bool {
450 self.connectivity.is_healthy()
451 }
452
453 pub fn is_disconnected(&self) -> bool {
455 self.connectivity.is_terminal()
456 }
457
458 pub fn has_session(&self) -> bool {
462 self.noise_session.is_some()
463 }
464
465 pub fn noise_session(&self) -> Option<&NoiseSession> {
467 self.noise_session.as_ref()
468 }
469
470 pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
472 self.noise_session.as_mut()
473 }
474
475 pub fn our_index(&self) -> Option<SessionIndex> {
477 self.our_index
478 }
479
480 pub fn their_index(&self) -> Option<SessionIndex> {
482 self.their_index
483 }
484
485 pub fn set_their_index(&mut self, index: SessionIndex) {
489 self.their_index = Some(index);
490 }
491
492 pub fn replace_session(
502 &mut self,
503 new_session: NoiseSession,
504 new_our_index: SessionIndex,
505 new_their_index: SessionIndex,
506 ) -> Option<SessionIndex> {
507 self.reset_replay_suppressed();
508 let old_our_index = self.our_index;
509 self.noise_session = Some(new_session);
510 self.our_index = Some(new_our_index);
511 self.their_index = Some(new_their_index);
512 old_our_index
513 }
514
515 pub fn transport_id(&self) -> Option<TransportId> {
517 self.transport_id
518 }
519
520 pub fn current_addr(&self) -> Option<&TransportAddr> {
522 self.current_addr.as_ref()
523 }
524
525 pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
546 if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
547 return false;
548 }
549 self.transport_id = Some(transport_id);
550 self.current_addr = Some(addr.clone());
551 true
552 }
553
554 pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
558 self.handshake_msg2 = Some(msg2);
559 }
560
561 pub fn handshake_msg2(&self) -> Option<&[u8]> {
563 self.handshake_msg2.as_deref()
564 }
565
566 pub fn clear_handshake_msg2(&mut self) {
568 self.handshake_msg2 = None;
569 }
570
571 pub fn increment_replay_suppressed(&mut self) -> u32 {
575 self.replay_suppressed_count += 1;
576 self.replay_suppressed_count
577 }
578
579 pub fn reset_replay_suppressed(&mut self) -> u32 {
581 let count = self.replay_suppressed_count;
582 self.replay_suppressed_count = 0;
583 count
584 }
585
586 pub fn replay_suppressed_count(&self) -> u32 {
588 self.replay_suppressed_count
589 }
590
591 pub fn increment_decrypt_failures(&mut self) -> u32 {
595 self.consecutive_decrypt_failures += 1;
596 self.consecutive_decrypt_failures
597 }
598
599 pub fn reset_decrypt_failures(&mut self) {
601 self.consecutive_decrypt_failures = 0;
602 }
603
604 pub fn consecutive_decrypt_failures(&self) -> u32 {
606 self.consecutive_decrypt_failures
607 }
608
609 pub fn remote_epoch(&self) -> Option<[u8; 8]> {
613 self.remote_epoch
614 }
615
616 pub fn coords(&self) -> Option<&TreeCoordinate> {
620 self.ancestry.as_ref()
621 }
622
623 pub fn declaration(&self) -> Option<&ParentDeclaration> {
625 self.declaration.as_ref()
626 }
627
628 pub fn has_tree_position(&self) -> bool {
630 self.declaration.is_some() && self.ancestry.is_some()
631 }
632
633 pub fn inbound_filter(&self) -> Option<&BloomFilter> {
637 self.inbound_filter.as_ref()
638 }
639
640 pub fn filter_sequence(&self) -> u64 {
642 self.filter_sequence
643 }
644
645 pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
647 if self.filter_received_at == 0 {
648 return true;
649 }
650 current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
651 }
652
653 pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
655 match &self.inbound_filter {
656 Some(filter) => filter.contains(node_addr),
657 None => false,
658 }
659 }
660
661 pub fn needs_filter_update(&self) -> bool {
663 self.pending_filter_update
664 }
665
666 pub fn link_stats(&self) -> &LinkStats {
670 &self.link_stats
671 }
672
673 pub fn link_stats_mut(&mut self) -> &mut LinkStats {
675 &mut self.link_stats
676 }
677
678 pub fn mmp(&self) -> Option<&MmpPeerState> {
682 self.mmp.as_ref()
683 }
684
685 pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
687 self.mmp.as_mut()
688 }
689
690 pub fn link_cost(&self) -> f64 {
698 match self.mmp() {
699 Some(mmp) => {
700 let etx = mmp.metrics.etx;
701 match mmp.metrics.srtt_ms() {
702 Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
703 None => 1.0,
704 }
705 }
706 None => 1.0,
707 }
708 }
709
710 pub fn has_srtt(&self) -> bool {
712 self.mmp()
713 .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
714 }
715
716 pub fn authenticated_at(&self) -> u64 {
718 self.authenticated_at
719 }
720
721 pub fn last_seen(&self) -> u64 {
723 self.last_seen
724 }
725
726 pub fn idle_time(&self, current_time_ms: u64) -> u64 {
728 current_time_ms.saturating_sub(self.last_seen)
729 }
730
731 pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
733 current_time_ms.saturating_sub(self.authenticated_at)
734 }
735
736 pub fn session_elapsed_ms(&self) -> u32 {
741 self.session_start.elapsed().as_millis() as u32
742 }
743
744 pub fn session_start(&self) -> Instant {
746 self.session_start
747 }
748
749 pub fn last_heartbeat_sent(&self) -> Option<Instant> {
753 self.last_heartbeat_sent
754 }
755
756 pub fn mark_heartbeat_sent(&mut self, now: Instant) {
758 self.last_heartbeat_sent = Some(now);
759 }
760
761 pub fn touch(&mut self, current_time_ms: u64) {
765 self.last_seen = current_time_ms;
766 if self.connectivity == ConnectivityState::Stale {
768 self.connectivity = ConnectivityState::Connected;
769 }
770 }
771
772 pub fn mark_stale(&mut self) {
774 if self.connectivity == ConnectivityState::Connected {
775 self.connectivity = ConnectivityState::Stale;
776 }
777 }
778
779 pub fn mark_reconnecting(&mut self) {
781 self.connectivity = ConnectivityState::Reconnecting;
782 }
783
784 pub fn mark_disconnected(&mut self) {
786 self.connectivity = ConnectivityState::Disconnected;
787 }
788
789 pub fn mark_connected(&mut self, current_time_ms: u64) {
791 self.connectivity = ConnectivityState::Connected;
792 self.last_seen = current_time_ms;
793 }
794
795 pub fn set_link_id(&mut self, link_id: LinkId) {
797 self.link_id = link_id;
798 }
799
800 pub fn update_tree_position(
804 &mut self,
805 declaration: ParentDeclaration,
806 ancestry: TreeCoordinate,
807 current_time_ms: u64,
808 ) {
809 self.declaration = Some(declaration);
810 self.ancestry = Some(ancestry);
811 self.last_seen = current_time_ms;
812 }
813
814 pub fn clear_tree_position(&mut self) {
816 self.declaration = None;
817 self.ancestry = None;
818 }
819
820 pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
824 self.tree_announce_min_interval_ms = ms;
825 }
826
827 pub fn last_tree_announce_sent_ms(&self) -> u64 {
829 self.last_tree_announce_sent_ms
830 }
831
832 pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
834 self.last_tree_announce_sent_ms = ms;
835 }
836
837 pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
839 now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
840 }
841
842 pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
844 self.last_tree_announce_sent_ms = now_ms;
845 self.pending_tree_announce = false;
846 }
847
848 pub fn mark_tree_announce_pending(&mut self) {
850 self.pending_tree_announce = true;
851 }
852
853 pub fn has_pending_tree_announce(&self) -> bool {
855 self.pending_tree_announce
856 }
857
858 pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
862 self.inbound_filter = Some(filter);
863 self.filter_sequence = sequence;
864 self.filter_received_at = current_time_ms;
865 self.last_seen = current_time_ms;
866 }
867
868 pub fn clear_filter(&mut self) {
870 self.inbound_filter = None;
871 self.filter_sequence = 0;
872 self.filter_received_at = 0;
873 }
874
875 pub fn mark_filter_update_needed(&mut self) {
877 self.pending_filter_update = true;
878 }
879
880 pub fn clear_filter_update_needed(&mut self) {
882 self.pending_filter_update = false;
883 }
884
885 pub fn session_established_at(&self) -> Instant {
889 self.session_established_at
890 }
891
892 pub fn current_k_bit(&self) -> bool {
894 self.current_k_bit
895 }
896
897 pub fn rekey_in_progress(&self) -> bool {
899 self.rekey_in_progress
900 }
901
902 pub fn set_rekey_in_progress(&mut self) {
904 self.rekey_in_progress = true;
905 }
906
907 pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
909 match self.last_peer_rekey {
910 Some(t) => t.elapsed().as_secs() < dampening_secs,
911 None => false,
912 }
913 }
914
915 pub fn record_peer_rekey(&mut self) {
917 self.last_peer_rekey = Some(Instant::now());
918 }
919
920 pub fn pending_our_index(&self) -> Option<SessionIndex> {
922 self.pending_our_index
923 }
924
925 pub fn pending_their_index(&self) -> Option<SessionIndex> {
927 self.pending_their_index
928 }
929
930 pub fn previous_our_index(&self) -> Option<SessionIndex> {
932 self.previous_our_index
933 }
934
935 pub fn previous_session(&self) -> Option<&NoiseSession> {
937 self.previous_session.as_ref()
938 }
939
940 pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
942 self.previous_session.as_mut()
943 }
944
945 pub fn pending_new_session(&self) -> Option<&NoiseSession> {
947 self.pending_new_session.as_ref()
948 }
949
950 pub fn set_pending_session(
955 &mut self,
956 session: NoiseSession,
957 our_index: SessionIndex,
958 their_index: SessionIndex,
959 ) {
960 self.pending_new_session = Some(session);
961 self.pending_our_index = Some(our_index);
962 self.pending_their_index = Some(their_index);
963 self.rekey_in_progress = false;
964 self.rekey_our_index = None;
966 self.rekey_handshake = None;
967 self.rekey_msg1 = None;
968 self.rekey_msg1_next_resend = 0;
969 }
970
971 pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
977 let new_session = self.pending_new_session.take()?;
978 let new_our_index = self.pending_our_index.take();
979 let new_their_index = self.pending_their_index.take();
980
981 self.previous_session = self.noise_session.take();
983 self.previous_our_index = self.our_index;
984 self.drain_started = Some(Instant::now());
985
986 self.noise_session = Some(new_session);
988 self.our_index = new_our_index;
989 self.their_index = new_their_index;
990
991 self.current_k_bit = !self.current_k_bit;
993 self.session_established_at = Instant::now();
994 self.session_start = Instant::now();
995 self.rekey_in_progress = false;
996 self.reset_replay_suppressed();
997
998 let now = Instant::now();
1000 if let Some(mmp) = &mut self.mmp {
1001 mmp.reset_for_rekey(now);
1002 }
1003
1004 self.previous_our_index
1005 }
1006
1007 pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1012 let new_session = self.pending_new_session.take()?;
1013 let new_our_index = self.pending_our_index.take();
1014 let new_their_index = self.pending_their_index.take();
1015
1016 self.previous_session = self.noise_session.take();
1018 self.previous_our_index = self.our_index;
1019 self.drain_started = Some(Instant::now());
1020
1021 self.noise_session = Some(new_session);
1023 self.our_index = new_our_index;
1024 self.their_index = new_their_index;
1025
1026 self.current_k_bit = !self.current_k_bit;
1028 self.session_established_at = Instant::now();
1029 self.session_start = Instant::now();
1030 self.rekey_in_progress = false;
1031 self.reset_replay_suppressed();
1032
1033 let now = Instant::now();
1035 if let Some(mmp) = &mut self.mmp {
1036 mmp.reset_for_rekey(now);
1037 }
1038
1039 self.previous_our_index
1040 }
1041
1042 pub fn drain_expired(&self, drain_secs: u64) -> bool {
1044 match self.drain_started {
1045 Some(t) => t.elapsed().as_secs() >= drain_secs,
1046 None => false,
1047 }
1048 }
1049
1050 pub fn is_draining(&self) -> bool {
1052 self.drain_started.is_some()
1053 }
1054
1055 pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1060 self.previous_session = None;
1061 self.drain_started = None;
1062 self.previous_our_index.take()
1063 }
1064
1065 pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1071 self.rekey_handshake = None;
1072 self.rekey_msg1 = None;
1073 self.rekey_msg1_next_resend = 0;
1074 self.rekey_in_progress = false;
1075 self.rekey_our_index.take().or_else(|| {
1077 self.pending_new_session = None;
1078 self.pending_their_index = None;
1079 self.pending_our_index.take()
1080 })
1081 }
1082
1083 pub fn set_rekey_state(
1087 &mut self,
1088 handshake: NoiseHandshakeState,
1089 our_index: SessionIndex,
1090 wire_msg1: Vec<u8>,
1091 next_resend_ms: u64,
1092 ) {
1093 self.rekey_handshake = Some(handshake);
1094 self.rekey_our_index = Some(our_index);
1095 self.rekey_msg1 = Some(wire_msg1);
1096 self.rekey_msg1_next_resend = next_resend_ms;
1097 self.rekey_in_progress = true;
1098 }
1099
1100 pub fn rekey_our_index(&self) -> Option<SessionIndex> {
1102 self.rekey_our_index
1103 }
1104
1105 pub fn complete_rekey_msg2(&mut self, msg2_bytes: &[u8]) -> Result<NoiseSession, NoiseError> {
1111 let mut hs = self
1112 .rekey_handshake
1113 .take()
1114 .ok_or_else(|| NoiseError::WrongState {
1115 expected: "rekey handshake in progress".to_string(),
1116 got: "no handshake state".to_string(),
1117 })?;
1118
1119 hs.read_message_2(msg2_bytes)?;
1120 let session = hs.into_session()?;
1121
1122 self.rekey_msg1 = None;
1124 self.rekey_msg1_next_resend = 0;
1125
1126 Ok(session)
1127 }
1128
1129 pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1131 self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1132 }
1133
1134 pub fn rekey_msg1(&self) -> Option<&[u8]> {
1136 self.rekey_msg1.as_deref()
1137 }
1138
1139 pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
1141 self.rekey_msg1_next_resend = next_ms;
1142 }
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147 use super::*;
1148 use crate::Identity;
1149
1150 fn make_peer_identity() -> PeerIdentity {
1151 let identity = Identity::generate();
1152 PeerIdentity::from_pubkey(identity.pubkey())
1153 }
1154
1155 fn make_node_addr(val: u8) -> NodeAddr {
1156 let mut bytes = [0u8; 16];
1157 bytes[0] = val;
1158 NodeAddr::from_bytes(bytes)
1159 }
1160
1161 fn make_coords(ids: &[u8]) -> TreeCoordinate {
1162 TreeCoordinate::from_addrs(ids.iter().map(|&v| make_node_addr(v)).collect()).unwrap()
1163 }
1164
1165 #[test]
1166 fn test_connectivity_state_properties() {
1167 assert!(ConnectivityState::Connected.can_send());
1168 assert!(ConnectivityState::Stale.can_send());
1169 assert!(!ConnectivityState::Reconnecting.can_send());
1170 assert!(!ConnectivityState::Disconnected.can_send());
1171
1172 assert!(ConnectivityState::Connected.is_healthy());
1173 assert!(!ConnectivityState::Stale.is_healthy());
1174
1175 assert!(ConnectivityState::Disconnected.is_terminal());
1176 assert!(!ConnectivityState::Connected.is_terminal());
1177 }
1178
1179 #[test]
1180 fn test_active_peer_creation() {
1181 let identity = make_peer_identity();
1182 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1183
1184 assert_eq!(peer.identity().node_addr(), identity.node_addr());
1185 assert_eq!(peer.link_id(), LinkId::new(1));
1186 assert!(peer.is_healthy());
1187 assert!(peer.can_send());
1188 assert_eq!(peer.authenticated_at(), 1000);
1189 assert!(peer.needs_filter_update()); }
1191
1192 #[test]
1193 fn test_connectivity_transitions() {
1194 let identity = make_peer_identity();
1195 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1196
1197 assert!(peer.is_healthy());
1198
1199 peer.mark_stale();
1200 assert_eq!(peer.connectivity(), ConnectivityState::Stale);
1201 assert!(peer.can_send()); peer.touch(2000);
1205 assert!(peer.is_healthy());
1206
1207 peer.mark_reconnecting();
1208 assert!(!peer.can_send());
1209
1210 peer.mark_connected(3000);
1211 assert!(peer.is_healthy());
1212
1213 peer.mark_disconnected();
1214 assert!(peer.is_disconnected());
1215 assert!(!peer.can_send());
1216 }
1217
1218 #[test]
1219 fn test_tree_position() {
1220 let identity = make_peer_identity();
1221 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1222
1223 assert!(!peer.has_tree_position());
1224 assert!(peer.coords().is_none());
1225
1226 let node = make_node_addr(1);
1227 let parent = make_node_addr(2);
1228 let decl = ParentDeclaration::new(node, parent, 1, 1000);
1229 let coords = make_coords(&[1, 2, 0]);
1230
1231 peer.update_tree_position(decl, coords, 2000);
1232
1233 assert!(peer.has_tree_position());
1234 assert!(peer.coords().is_some());
1235 assert_eq!(peer.last_seen(), 2000);
1236 }
1237
1238 #[test]
1239 fn test_bloom_filter() {
1240 let identity = make_peer_identity();
1241 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1242 let target = make_node_addr(42);
1243
1244 assert!(!peer.may_reach(&target));
1245 assert!(peer.filter_is_stale(2000, 500));
1246
1247 let mut filter = BloomFilter::new();
1248 filter.insert(&target);
1249 peer.update_filter(filter, 1, 1500);
1250
1251 assert!(peer.may_reach(&target));
1252 assert!(!peer.filter_is_stale(1800, 500));
1253 assert!(peer.filter_is_stale(2500, 500));
1254 }
1255
1256 #[test]
1257 fn test_timing() {
1258 let identity = make_peer_identity();
1259 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1260
1261 assert_eq!(peer.connection_duration(2000), 1000);
1262 assert_eq!(peer.idle_time(2000), 1000);
1263 }
1264
1265 #[test]
1266 fn test_filter_update_flag() {
1267 let identity = make_peer_identity();
1268 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1269
1270 assert!(peer.needs_filter_update()); peer.clear_filter_update_needed();
1273 assert!(!peer.needs_filter_update());
1274
1275 peer.mark_filter_update_needed();
1276 assert!(peer.needs_filter_update());
1277 }
1278
1279 #[test]
1280 fn test_with_stats() {
1281 let identity = make_peer_identity();
1282 let mut stats = LinkStats::new();
1283 stats.record_sent(100);
1284 stats.record_recv(200, 500);
1285
1286 let peer = ActivePeer::with_stats(identity, LinkId::new(1), 1000, stats);
1287
1288 assert_eq!(peer.link_stats().packets_sent, 1);
1289 assert_eq!(peer.link_stats().packets_recv, 1);
1290 }
1291
1292 #[test]
1293 fn test_replay_suppression_counter() {
1294 let identity = make_peer_identity();
1295 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1296
1297 assert_eq!(peer.replay_suppressed_count(), 0);
1299
1300 assert_eq!(peer.increment_replay_suppressed(), 1);
1302 assert_eq!(peer.increment_replay_suppressed(), 2);
1303 assert_eq!(peer.increment_replay_suppressed(), 3);
1304 assert_eq!(peer.replay_suppressed_count(), 3);
1305
1306 assert_eq!(peer.reset_replay_suppressed(), 3);
1308 assert_eq!(peer.replay_suppressed_count(), 0);
1309
1310 assert_eq!(peer.increment_replay_suppressed(), 1);
1312 assert_eq!(peer.replay_suppressed_count(), 1);
1313
1314 peer.reset_replay_suppressed();
1316 assert_eq!(peer.reset_replay_suppressed(), 0);
1317 }
1318
1319 #[test]
1320 fn test_increment_decrypt_failures_monotonic() {
1321 let identity = make_peer_identity();
1322 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1323
1324 assert_eq!(peer.consecutive_decrypt_failures(), 0);
1326
1327 let mut prev = 0u32;
1329 for expected in 1..=25u32 {
1330 let count = peer.increment_decrypt_failures();
1331 assert_eq!(count, expected, "increment must return monotonic count");
1332 assert!(count > prev, "count must strictly increase");
1333 assert_eq!(peer.consecutive_decrypt_failures(), count);
1334 prev = count;
1335 }
1336 }
1337
1338 #[test]
1339 fn test_reset_decrypt_failures_zeroes_counter() {
1340 let identity = make_peer_identity();
1341 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1342
1343 for _ in 0..7 {
1345 peer.increment_decrypt_failures();
1346 }
1347 assert_eq!(peer.consecutive_decrypt_failures(), 7);
1348
1349 peer.reset_decrypt_failures();
1351 assert_eq!(peer.consecutive_decrypt_failures(), 0);
1352
1353 peer.reset_decrypt_failures();
1355 assert_eq!(peer.consecutive_decrypt_failures(), 0);
1356
1357 assert_eq!(peer.increment_decrypt_failures(), 1);
1359 assert_eq!(peer.consecutive_decrypt_failures(), 1);
1360 }
1361}