1use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::node::REKEY_JITTER_SECS;
9use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
10use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
11use crate::tree::{ParentDeclaration, TreeCoordinate};
12use crate::utils::index::SessionIndex;
13use crate::{FipsAddress, NodeAddr, PeerIdentity};
14use rand::RngExt;
15use secp256k1::XOnlyPublicKey;
16use std::fmt;
17use std::time::{Duration, Instant};
18
19fn draw_rekey_jitter() -> i64 {
20 rand::rng().random_range(-REKEY_JITTER_SECS..=REKEY_JITTER_SECS)
21}
22
/// Connectivity status tracked for a peer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectivityState {
    /// The peer is connected; the only state reported as healthy.
    Connected,
    /// The peer has been marked stale; sending is still permitted.
    Stale,
    /// A reconnect is under way; sending is not permitted.
    Reconnecting,
    /// The peer is disconnected; the only state reported as terminal.
    Disconnected,
}

impl ConnectivityState {
    /// Returns `true` for the states in which sending is permitted
    /// (`Connected` and `Stale`).
    pub fn can_send(&self) -> bool {
        match self {
            ConnectivityState::Connected | ConnectivityState::Stale => true,
            ConnectivityState::Reconnecting | ConnectivityState::Disconnected => false,
        }
    }

    /// Returns `true` only for `Disconnected`.
    pub fn is_terminal(&self) -> bool {
        *self == ConnectivityState::Disconnected
    }

    /// Returns `true` only for `Connected`.
    pub fn is_healthy(&self) -> bool {
        *self == ConnectivityState::Connected
    }
}

impl fmt::Display for ConnectivityState {
    /// Writes the lowercase name of the state; formatter width and padding are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            ConnectivityState::Connected => "connected",
            ConnectivityState::Stale => "stale",
            ConnectivityState::Reconnecting => "reconnecting",
            ConnectivityState::Disconnected => "disconnected",
        };
        f.write_str(label)
    }
}
69
/// State kept for a peer: identity, link, current/previous/pending Noise sessions,
/// tree position, inbound Bloom filter, statistics and rekey bookkeeping.
#[derive(Debug)]
pub struct ActivePeer {
    /// Identity of the remote peer (source of the node address, address, pubkey and npub).
    identity: PeerIdentity,

    /// Identifier of the link associated with this peer.
    link_id: LinkId,
    /// Current connectivity state; constructors start at `Connected`.
    connectivity: ConnectivityState,

    /// Current Noise session; `None` when the peer was built without one.
    noise_session: Option<NoiseSession>,
    /// Session index paired with the current session on our side.
    // NOTE(review): presumably the index we allocated locally — confirm against callers.
    our_index: Option<SessionIndex>,
    /// Session index paired with the current session on the peer's side.
    their_index: Option<SessionIndex>,
    /// Transport last recorded for this peer (see `set_current_addr`).
    transport_id: Option<TransportId>,
    /// Transport address last recorded for this peer (see `set_current_addr`).
    current_addr: Option<TransportAddr>,

    /// Parent declaration stored by `update_tree_position`.
    declaration: Option<ParentDeclaration>,
    /// Tree coordinate stored by `update_tree_position`; exposed through `coords()`.
    ancestry: Option<TreeCoordinate>,

    /// Minimum spacing, in milliseconds, enforced by `can_send_tree_announce`.
    tree_announce_min_interval_ms: u64,
    /// Timestamp (ms) recorded when a tree announce was last sent; starts at 0.
    last_tree_announce_sent_ms: u64,
    /// Set by `mark_tree_announce_pending`, cleared by `record_tree_announce_sent`.
    pending_tree_announce: bool,

    /// Bloom filter stored by `update_filter`; queried by `may_reach`.
    inbound_filter: Option<BloomFilter>,
    /// Sequence number stored alongside the inbound filter.
    filter_sequence: u64,
    /// Timestamp (ms) of the last `update_filter`; 0 means no filter has been received.
    filter_received_at: u64,
    /// Whether a filter update is flagged as needed; starts `true`.
    pending_filter_update: bool,

    /// Baseline for `session_elapsed_ms`; reset when a pending session is promoted.
    session_start: Instant,

    /// Link statistics for this peer.
    link_stats: LinkStats,
    /// Caller-supplied authentication timestamp, in the same unit as the
    /// `current_time_ms` arguments it is compared against.
    authenticated_at: u64,
    /// Timestamp of the most recent recorded activity (touch, filter or tree update).
    last_seen: u64,

    /// Opaque 8-byte epoch value associated with the remote peer, if known.
    remote_epoch: Option<[u8; 8]>,

    /// MMP state; only populated by `with_session`.
    mmp: Option<MmpPeerState>,

    /// Instant recorded by `mark_heartbeat_sent`; cleared when a pending session is promoted.
    last_heartbeat_sent: Option<Instant>,

    /// Stored handshake message-2 bytes.
    // NOTE(review): presumably retained so msg2 can be resent — confirm against callers.
    handshake_msg2: Option<Vec<u8>>,

    /// Count maintained by `increment_replay_suppressed` / `reset_replay_suppressed`.
    replay_suppressed_count: u32,
    /// Count maintained by `increment_decrypt_failures` / `reset_decrypt_failures`.
    consecutive_decrypt_failures: u32,

    /// Instant at which the peer was constructed or a pending session was last promoted.
    session_established_at: Instant,
    /// Random offset in `-REKEY_JITTER_SECS..=REKEY_JITTER_SECS`, redrawn on each promotion.
    rekey_jitter_secs: i64,
    /// Flag that starts `false` and is inverted each time a pending session is promoted.
    current_k_bit: bool,
    /// Session that was current before the last promotion; dropped by `complete_drain`.
    previous_session: Option<NoiseSession>,
    /// Our index for `previous_session`.
    previous_our_index: Option<SessionIndex>,
    /// Instant at which the previous session started draining.
    drain_started: Option<Instant>,
    /// Session stored by `set_pending_session`, awaiting promotion.
    pending_new_session: Option<NoiseSession>,
    /// Our index for the pending session.
    pending_our_index: Option<SessionIndex>,
    /// Peer's index for the pending session.
    pending_their_index: Option<SessionIndex>,
    /// `initiated_by_local` value passed to `set_pending_session`.
    pending_rekey_initiator: bool,
    /// Instant at which `set_pending_session` stored the pending session.
    pending_rekey_completed_at: Option<Instant>,
    /// Whether a rekey is flagged as in progress.
    rekey_in_progress: bool,
    /// Instant recorded by `record_peer_rekey`; consulted by `is_rekey_dampened`.
    last_peer_rekey: Option<Instant>,
    /// In-flight rekey handshake state stored by `set_rekey_state`.
    rekey_handshake: Option<NoiseHandshakeState>,
    /// Our index for the in-flight rekey handshake.
    rekey_our_index: Option<SessionIndex>,
    /// Wire bytes of rekey message 1, kept while a resend may be needed.
    rekey_msg1: Option<Vec<u8>>,
    /// Timestamp (ms) at or after which `needs_msg1_resend` reports a resend is due.
    rekey_msg1_next_resend: u64,

    /// Per-peer connected UDP socket (Linux/macOS only).
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    connected_udp:
        Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,

    /// Receive drain paired with `connected_udp` (Linux/macOS only).
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
}
226
227impl ActivePeer {
228 pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
233 let now = Instant::now();
234 Self {
235 identity,
236 link_id,
237 connectivity: ConnectivityState::Connected,
238 noise_session: None,
239 our_index: None,
240 their_index: None,
241 transport_id: None,
242 current_addr: None,
243 declaration: None,
244 ancestry: None,
245 tree_announce_min_interval_ms: 500,
246 last_tree_announce_sent_ms: 0,
247 pending_tree_announce: false,
248 inbound_filter: None,
249 filter_sequence: 0,
250 filter_received_at: 0,
251 pending_filter_update: true, session_start: now,
253 link_stats: LinkStats::new(),
254 authenticated_at,
255 last_seen: authenticated_at,
256 remote_epoch: None,
257 mmp: None,
258 last_heartbeat_sent: None,
259 handshake_msg2: None,
260 replay_suppressed_count: 0,
261 consecutive_decrypt_failures: 0,
262 session_established_at: now,
263 rekey_jitter_secs: draw_rekey_jitter(),
264 current_k_bit: false,
265 previous_session: None,
266 previous_our_index: None,
267 drain_started: None,
268 pending_new_session: None,
269 pending_our_index: None,
270 pending_their_index: None,
271 pending_rekey_initiator: false,
272 pending_rekey_completed_at: None,
273 rekey_in_progress: false,
274 last_peer_rekey: None,
275 rekey_handshake: None,
276 rekey_our_index: None,
277 rekey_msg1: None,
278 rekey_msg1_next_resend: 0,
279 #[cfg(any(target_os = "linux", target_os = "macos"))]
280 connected_udp: None,
281 #[cfg(any(target_os = "linux", target_os = "macos"))]
282 peer_recv_drain: None,
283 }
284 }
285
286 pub fn with_stats(
291 identity: PeerIdentity,
292 link_id: LinkId,
293 authenticated_at: u64,
294 link_stats: LinkStats,
295 ) -> Self {
296 let mut peer = Self::new(identity, link_id, authenticated_at);
297 peer.link_stats = link_stats;
298 peer
299 }
300
301 #[allow(clippy::too_many_arguments)]
306 pub fn with_session(
307 identity: PeerIdentity,
308 link_id: LinkId,
309 authenticated_at: u64,
310 noise_session: NoiseSession,
311 our_index: SessionIndex,
312 their_index: SessionIndex,
313 transport_id: TransportId,
314 current_addr: TransportAddr,
315 link_stats: LinkStats,
316 is_initiator: bool,
317 mmp_config: &MmpConfig,
318 remote_epoch: Option<[u8; 8]>,
319 ) -> Self {
320 let now = Instant::now();
321 Self {
322 identity,
323 link_id,
324 connectivity: ConnectivityState::Connected,
325 noise_session: Some(noise_session),
326 our_index: Some(our_index),
327 their_index: Some(their_index),
328 transport_id: Some(transport_id),
329 current_addr: Some(current_addr),
330 declaration: None,
331 ancestry: None,
332 tree_announce_min_interval_ms: 500,
333 last_tree_announce_sent_ms: 0,
334 pending_tree_announce: false,
335 inbound_filter: None,
336 filter_sequence: 0,
337 filter_received_at: 0,
338 pending_filter_update: true,
339 session_start: now,
340 link_stats,
341 authenticated_at,
342 last_seen: authenticated_at,
343 remote_epoch,
344 mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
345 last_heartbeat_sent: None,
346 handshake_msg2: None,
347 replay_suppressed_count: 0,
348 consecutive_decrypt_failures: 0,
349 session_established_at: now,
350 rekey_jitter_secs: draw_rekey_jitter(),
351 current_k_bit: false,
352 previous_session: None,
353 previous_our_index: None,
354 drain_started: None,
355 pending_new_session: None,
356 pending_our_index: None,
357 pending_their_index: None,
358 pending_rekey_initiator: false,
359 pending_rekey_completed_at: None,
360 rekey_in_progress: false,
361 last_peer_rekey: None,
362 rekey_handshake: None,
363 rekey_our_index: None,
364 rekey_msg1: None,
365 rekey_msg1_next_resend: 0,
366 #[cfg(any(target_os = "linux", target_os = "macos"))]
367 connected_udp: None,
368 #[cfg(any(target_os = "linux", target_os = "macos"))]
369 peer_recv_drain: None,
370 }
371 }
372
373 #[cfg(any(target_os = "linux", target_os = "macos"))]
378 pub(crate) fn connected_udp(
379 &self,
380 ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
381 self.connected_udp.clone()
382 }
383
384 #[cfg(any(target_os = "linux", target_os = "macos"))]
398 pub(crate) fn set_connected_udp(
399 &mut self,
400 socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
401 drain: crate::transport::udp::peer_drain::PeerRecvDrain,
402 ) {
403 self.peer_recv_drain = None;
407 self.connected_udp = None;
408 self.connected_udp = Some(socket);
409 self.peer_recv_drain = Some(drain);
410 }
411
412 #[cfg(any(target_os = "linux", target_os = "macos"))]
417 pub(crate) fn clear_connected_udp(&mut self) {
418 self.peer_recv_drain = None;
419 self.connected_udp = None;
420 }
421
422 pub fn identity(&self) -> &PeerIdentity {
426 &self.identity
427 }
428
429 pub fn node_addr(&self) -> &NodeAddr {
431 self.identity.node_addr()
432 }
433
434 pub fn address(&self) -> &FipsAddress {
436 self.identity.address()
437 }
438
439 pub fn pubkey(&self) -> XOnlyPublicKey {
441 self.identity.pubkey()
442 }
443
444 pub fn npub(&self) -> String {
446 self.identity.npub()
447 }
448
449 pub fn link_id(&self) -> LinkId {
453 self.link_id
454 }
455
456 pub fn connectivity(&self) -> ConnectivityState {
458 self.connectivity
459 }
460
461 pub fn can_send(&self) -> bool {
463 self.connectivity.can_send()
464 }
465
466 pub fn is_healthy(&self) -> bool {
468 self.connectivity.is_healthy()
469 }
470
471 pub fn is_disconnected(&self) -> bool {
473 self.connectivity.is_terminal()
474 }
475
476 pub fn has_session(&self) -> bool {
480 self.noise_session.is_some()
481 }
482
483 pub fn noise_session(&self) -> Option<&NoiseSession> {
485 self.noise_session.as_ref()
486 }
487
488 pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
490 self.noise_session.as_mut()
491 }
492
493 pub fn our_index(&self) -> Option<SessionIndex> {
495 self.our_index
496 }
497
498 pub fn their_index(&self) -> Option<SessionIndex> {
500 self.their_index
501 }
502
503 pub fn set_their_index(&mut self, index: SessionIndex) {
507 self.their_index = Some(index);
508 }
509
510 pub fn replace_session(
520 &mut self,
521 new_session: NoiseSession,
522 new_our_index: SessionIndex,
523 new_their_index: SessionIndex,
524 ) -> Option<SessionIndex> {
525 self.reset_replay_suppressed();
526 let old_our_index = self.our_index;
527 self.noise_session = Some(new_session);
528 self.our_index = Some(new_our_index);
529 self.their_index = Some(new_their_index);
530 old_our_index
531 }
532
533 pub fn transport_id(&self) -> Option<TransportId> {
535 self.transport_id
536 }
537
538 pub fn current_addr(&self) -> Option<&TransportAddr> {
540 self.current_addr.as_ref()
541 }
542
543 pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
564 if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
565 return false;
566 }
567 self.transport_id = Some(transport_id);
568 self.current_addr = Some(addr.clone());
569 true
570 }
571
572 pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
576 self.handshake_msg2 = Some(msg2);
577 }
578
579 pub fn handshake_msg2(&self) -> Option<&[u8]> {
581 self.handshake_msg2.as_deref()
582 }
583
584 pub fn clear_handshake_msg2(&mut self) {
586 self.handshake_msg2 = None;
587 }
588
589 pub fn increment_replay_suppressed(&mut self) -> u32 {
593 self.replay_suppressed_count += 1;
594 self.replay_suppressed_count
595 }
596
597 pub fn reset_replay_suppressed(&mut self) -> u32 {
599 let count = self.replay_suppressed_count;
600 self.replay_suppressed_count = 0;
601 count
602 }
603
604 pub fn replay_suppressed_count(&self) -> u32 {
606 self.replay_suppressed_count
607 }
608
609 pub fn increment_decrypt_failures(&mut self) -> u32 {
613 self.consecutive_decrypt_failures += 1;
614 self.consecutive_decrypt_failures
615 }
616
617 pub fn reset_decrypt_failures(&mut self) {
619 self.consecutive_decrypt_failures = 0;
620 }
621
622 pub fn consecutive_decrypt_failures(&self) -> u32 {
624 self.consecutive_decrypt_failures
625 }
626
627 pub fn remote_epoch(&self) -> Option<[u8; 8]> {
631 self.remote_epoch
632 }
633
634 pub(crate) fn set_remote_epoch(&mut self, remote_epoch: Option<[u8; 8]>) {
638 self.remote_epoch = remote_epoch;
639 }
640
641 pub fn coords(&self) -> Option<&TreeCoordinate> {
645 self.ancestry.as_ref()
646 }
647
648 pub fn declaration(&self) -> Option<&ParentDeclaration> {
650 self.declaration.as_ref()
651 }
652
653 pub fn has_tree_position(&self) -> bool {
655 self.declaration.is_some() && self.ancestry.is_some()
656 }
657
658 pub fn inbound_filter(&self) -> Option<&BloomFilter> {
662 self.inbound_filter.as_ref()
663 }
664
665 pub fn filter_sequence(&self) -> u64 {
667 self.filter_sequence
668 }
669
670 pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
672 if self.filter_received_at == 0 {
673 return true;
674 }
675 current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
676 }
677
678 pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
680 match &self.inbound_filter {
681 Some(filter) => filter.contains(node_addr),
682 None => false,
683 }
684 }
685
686 pub fn needs_filter_update(&self) -> bool {
688 self.pending_filter_update
689 }
690
691 pub fn link_stats(&self) -> &LinkStats {
695 &self.link_stats
696 }
697
698 pub fn link_stats_mut(&mut self) -> &mut LinkStats {
700 &mut self.link_stats
701 }
702
703 pub fn mmp(&self) -> Option<&MmpPeerState> {
707 self.mmp.as_ref()
708 }
709
710 pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
712 self.mmp.as_mut()
713 }
714
715 pub fn link_cost(&self) -> f64 {
723 match self.mmp() {
724 Some(mmp) => {
725 let etx = mmp.metrics.etx;
726 match mmp.metrics.srtt_ms() {
727 Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
728 None => 1.0,
729 }
730 }
731 None => 1.0,
732 }
733 }
734
735 pub fn has_srtt(&self) -> bool {
737 self.mmp()
738 .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
739 }
740
741 pub fn authenticated_at(&self) -> u64 {
743 self.authenticated_at
744 }
745
746 pub fn last_seen(&self) -> u64 {
748 self.last_seen
749 }
750
751 pub fn idle_time(&self, current_time_ms: u64) -> u64 {
753 current_time_ms.saturating_sub(self.last_seen)
754 }
755
756 pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
758 current_time_ms.saturating_sub(self.authenticated_at)
759 }
760
761 pub fn session_elapsed_ms(&self) -> u32 {
766 self.session_start.elapsed().as_millis() as u32
767 }
768
769 pub fn session_start(&self) -> Instant {
771 self.session_start
772 }
773
774 pub fn last_heartbeat_sent(&self) -> Option<Instant> {
778 self.last_heartbeat_sent
779 }
780
781 pub fn mark_heartbeat_sent(&mut self, now: Instant) {
783 self.last_heartbeat_sent = Some(now);
784 }
785
786 pub fn touch(&mut self, current_time_ms: u64) {
790 self.last_seen = current_time_ms;
791 if self.connectivity == ConnectivityState::Stale {
793 self.connectivity = ConnectivityState::Connected;
794 }
795 }
796
797 pub fn mark_stale(&mut self) {
799 if self.connectivity == ConnectivityState::Connected {
800 self.connectivity = ConnectivityState::Stale;
801 }
802 }
803
804 pub fn mark_reconnecting(&mut self) {
806 self.connectivity = ConnectivityState::Reconnecting;
807 }
808
809 pub fn mark_disconnected(&mut self) {
811 self.connectivity = ConnectivityState::Disconnected;
812 }
813
814 pub fn mark_connected(&mut self, current_time_ms: u64) {
816 self.connectivity = ConnectivityState::Connected;
817 self.last_seen = current_time_ms;
818 }
819
820 pub fn set_link_id(&mut self, link_id: LinkId) {
822 self.link_id = link_id;
823 }
824
825 pub fn update_tree_position(
829 &mut self,
830 declaration: ParentDeclaration,
831 ancestry: TreeCoordinate,
832 current_time_ms: u64,
833 ) {
834 self.declaration = Some(declaration);
835 self.ancestry = Some(ancestry);
836 self.last_seen = current_time_ms;
837 }
838
839 pub fn clear_tree_position(&mut self) {
841 self.declaration = None;
842 self.ancestry = None;
843 }
844
845 pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
849 self.tree_announce_min_interval_ms = ms;
850 }
851
852 pub fn last_tree_announce_sent_ms(&self) -> u64 {
854 self.last_tree_announce_sent_ms
855 }
856
857 pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
859 self.last_tree_announce_sent_ms = ms;
860 }
861
862 pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
864 now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
865 }
866
867 pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
869 self.last_tree_announce_sent_ms = now_ms;
870 self.pending_tree_announce = false;
871 }
872
873 pub fn mark_tree_announce_pending(&mut self) {
875 self.pending_tree_announce = true;
876 }
877
878 pub fn has_pending_tree_announce(&self) -> bool {
880 self.pending_tree_announce
881 }
882
883 pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
887 self.inbound_filter = Some(filter);
888 self.filter_sequence = sequence;
889 self.filter_received_at = current_time_ms;
890 self.last_seen = current_time_ms;
891 }
892
893 pub fn clear_filter(&mut self) {
895 self.inbound_filter = None;
896 self.filter_sequence = 0;
897 self.filter_received_at = 0;
898 }
899
900 pub fn mark_filter_update_needed(&mut self) {
902 self.pending_filter_update = true;
903 }
904
905 pub fn clear_filter_update_needed(&mut self) {
907 self.pending_filter_update = false;
908 }
909
910 pub fn session_established_at(&self) -> Instant {
914 self.session_established_at
915 }
916
917 #[cfg(test)]
918 pub(crate) fn set_session_established_at_for_test(&mut self, instant: Instant) {
919 self.session_established_at = instant;
920 }
921
922 pub fn rekey_jitter_secs(&self) -> i64 {
924 self.rekey_jitter_secs
925 }
926
927 pub fn current_k_bit(&self) -> bool {
929 self.current_k_bit
930 }
931
932 pub fn rekey_in_progress(&self) -> bool {
934 self.rekey_in_progress
935 }
936
937 pub fn set_rekey_in_progress(&mut self) {
939 self.rekey_in_progress = true;
940 }
941
942 pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
944 match self.last_peer_rekey {
945 Some(t) => t.elapsed().as_secs() < dampening_secs,
946 None => false,
947 }
948 }
949
950 pub fn record_peer_rekey(&mut self) {
952 self.last_peer_rekey = Some(Instant::now());
953 }
954
955 pub fn pending_our_index(&self) -> Option<SessionIndex> {
957 self.pending_our_index
958 }
959
960 pub fn pending_their_index(&self) -> Option<SessionIndex> {
962 self.pending_their_index
963 }
964
965 pub fn previous_our_index(&self) -> Option<SessionIndex> {
967 self.previous_our_index
968 }
969
970 pub fn previous_session(&self) -> Option<&NoiseSession> {
972 self.previous_session.as_ref()
973 }
974
975 pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
977 self.previous_session.as_mut()
978 }
979
980 pub fn pending_new_session(&self) -> Option<&NoiseSession> {
982 self.pending_new_session.as_ref()
983 }
984
985 pub fn pending_rekey_initiator(&self) -> bool {
987 self.pending_rekey_initiator
988 }
989
990 pub fn pending_rekey_cutover_due(&self, delay: Duration) -> bool {
993 self.pending_rekey_initiator
994 && self
995 .pending_rekey_completed_at
996 .is_some_and(|completed| completed.elapsed() >= delay)
997 }
998
999 pub fn set_pending_session(
1005 &mut self,
1006 session: NoiseSession,
1007 our_index: SessionIndex,
1008 their_index: SessionIndex,
1009 initiated_by_local: bool,
1010 ) {
1011 self.pending_new_session = Some(session);
1012 self.pending_our_index = Some(our_index);
1013 self.pending_their_index = Some(their_index);
1014 self.pending_rekey_initiator = initiated_by_local;
1015 self.pending_rekey_completed_at = Some(Instant::now());
1016 self.rekey_in_progress = false;
1017 self.rekey_our_index = None;
1019 self.rekey_handshake = None;
1020 self.rekey_msg1 = None;
1021 self.rekey_msg1_next_resend = 0;
1022 }
1023
1024 pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
1030 let new_session = self.pending_new_session.take()?;
1031 let new_our_index = self.pending_our_index.take();
1032 let new_their_index = self.pending_their_index.take();
1033
1034 self.previous_session = self.noise_session.take();
1036 self.previous_our_index = self.our_index;
1037 self.drain_started = Some(Instant::now());
1038
1039 self.noise_session = Some(new_session);
1041 self.our_index = new_our_index;
1042 self.their_index = new_their_index;
1043 self.pending_rekey_initiator = false;
1044 self.pending_rekey_completed_at = None;
1045
1046 self.current_k_bit = !self.current_k_bit;
1048 self.session_established_at = Instant::now();
1049 self.session_start = Instant::now();
1050 self.rekey_in_progress = false;
1051 self.rekey_jitter_secs = draw_rekey_jitter();
1052 self.last_heartbeat_sent = None;
1053 self.reset_replay_suppressed();
1054
1055 let now = Instant::now();
1057 if let Some(mmp) = &mut self.mmp {
1058 mmp.reset_for_rekey(now);
1059 }
1060
1061 self.previous_our_index
1062 }
1063
1064 pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1069 let new_session = self.pending_new_session.take()?;
1070 let new_our_index = self.pending_our_index.take();
1071 let new_their_index = self.pending_their_index.take();
1072
1073 self.previous_session = self.noise_session.take();
1075 self.previous_our_index = self.our_index;
1076 self.drain_started = Some(Instant::now());
1077
1078 self.noise_session = Some(new_session);
1080 self.our_index = new_our_index;
1081 self.their_index = new_their_index;
1082 self.pending_rekey_initiator = false;
1083 self.pending_rekey_completed_at = None;
1084
1085 self.current_k_bit = !self.current_k_bit;
1087 self.session_established_at = Instant::now();
1088 self.session_start = Instant::now();
1089 self.rekey_in_progress = false;
1090 self.rekey_jitter_secs = draw_rekey_jitter();
1091 self.last_heartbeat_sent = None;
1092 self.reset_replay_suppressed();
1093
1094 let now = Instant::now();
1096 if let Some(mmp) = &mut self.mmp {
1097 mmp.reset_for_rekey(now);
1098 }
1099
1100 self.previous_our_index
1101 }
1102
1103 pub fn drain_expired(&self, drain_secs: u64) -> bool {
1105 match self.drain_started {
1106 Some(t) => t.elapsed().as_secs() >= drain_secs,
1107 None => false,
1108 }
1109 }
1110
1111 pub fn is_draining(&self) -> bool {
1113 self.drain_started.is_some()
1114 }
1115
1116 pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1121 self.previous_session = None;
1122 self.drain_started = None;
1123 self.previous_our_index.take()
1124 }
1125
1126 pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1132 self.rekey_handshake = None;
1133 self.rekey_msg1 = None;
1134 self.rekey_msg1_next_resend = 0;
1135 self.rekey_in_progress = false;
1136 self.rekey_our_index.take().or_else(|| {
1138 self.pending_new_session = None;
1139 self.pending_their_index = None;
1140 self.pending_rekey_initiator = false;
1141 self.pending_rekey_completed_at = None;
1142 self.pending_our_index.take()
1143 })
1144 }
1145
1146 pub fn set_rekey_state(
1150 &mut self,
1151 handshake: NoiseHandshakeState,
1152 our_index: SessionIndex,
1153 wire_msg1: Vec<u8>,
1154 next_resend_ms: u64,
1155 ) {
1156 self.rekey_handshake = Some(handshake);
1157 self.rekey_our_index = Some(our_index);
1158 self.rekey_msg1 = Some(wire_msg1);
1159 self.rekey_msg1_next_resend = next_resend_ms;
1160 self.rekey_in_progress = true;
1161 }
1162
1163 pub fn rekey_our_index(&self) -> Option<SessionIndex> {
1165 self.rekey_our_index
1166 }
1167
1168 pub fn complete_rekey_msg2(
1174 &mut self,
1175 msg2_bytes: &[u8],
1176 ) -> Result<(NoiseSession, Option<[u8; 8]>), NoiseError> {
1177 let mut hs = self
1178 .rekey_handshake
1179 .take()
1180 .ok_or_else(|| NoiseError::WrongState {
1181 expected: "rekey handshake in progress".to_string(),
1182 got: "no handshake state".to_string(),
1183 })?;
1184
1185 hs.read_message_2(msg2_bytes)?;
1186 let remote_epoch = hs.remote_epoch();
1187 let session = hs.into_session()?;
1188
1189 self.rekey_msg1 = None;
1191 self.rekey_msg1_next_resend = 0;
1192
1193 Ok((session, remote_epoch))
1194 }
1195
1196 pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1198 self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1199 }
1200
1201 pub fn rekey_msg1(&self) -> Option<&[u8]> {
1203 self.rekey_msg1.as_deref()
1204 }
1205
1206 pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
1208 self.rekey_msg1_next_resend = next_ms;
1209 }
1210}
1211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Identity;

    /// Builds a peer identity from a freshly generated keypair.
    fn make_peer_identity() -> PeerIdentity {
        let generated = Identity::generate();
        PeerIdentity::from_pubkey(generated.pubkey())
    }

    /// Builds a node address whose first byte is `val` and whose remaining bytes are zero.
    fn make_node_addr(val: u8) -> NodeAddr {
        let mut raw = [0u8; 16];
        raw[0] = val;
        NodeAddr::from_bytes(raw)
    }

    /// Builds a tree coordinate from one-byte node ids.
    fn make_coords(ids: &[u8]) -> TreeCoordinate {
        let addrs = ids.iter().copied().map(make_node_addr).collect();
        TreeCoordinate::from_addrs(addrs).unwrap()
    }

    /// Builds a session-less peer on link 1, authenticated at t=1000.
    fn fresh_peer() -> ActivePeer {
        ActivePeer::new(make_peer_identity(), LinkId::new(1), 1000)
    }

    #[test]
    fn test_connectivity_state_properties() {
        use ConnectivityState::{Connected, Disconnected, Reconnecting, Stale};

        assert!(Connected.can_send());
        assert!(Stale.can_send());
        assert!(!Reconnecting.can_send());
        assert!(!Disconnected.can_send());

        assert!(Connected.is_healthy());
        assert!(!Stale.is_healthy());

        assert!(Disconnected.is_terminal());
        assert!(!Connected.is_terminal());
    }

    #[test]
    fn test_active_peer_creation() {
        let identity = make_peer_identity();
        let peer = ActivePeer::new(identity, LinkId::new(1), 1000);

        assert_eq!(peer.identity().node_addr(), identity.node_addr());
        assert_eq!(peer.link_id(), LinkId::new(1));
        assert!(peer.is_healthy());
        assert!(peer.can_send());
        assert_eq!(peer.authenticated_at(), 1000);
        // A new peer always starts with a filter update flagged.
        assert!(peer.needs_filter_update());
    }

    #[test]
    fn test_connectivity_transitions() {
        let mut peer = fresh_peer();
        assert!(peer.is_healthy());

        // Connected -> Stale: still allowed to send.
        peer.mark_stale();
        assert_eq!(peer.connectivity(), ConnectivityState::Stale);
        assert!(peer.can_send());

        // Activity revives a stale peer.
        peer.touch(2000);
        assert!(peer.is_healthy());

        peer.mark_reconnecting();
        assert!(!peer.can_send());

        peer.mark_connected(3000);
        assert!(peer.is_healthy());

        peer.mark_disconnected();
        assert!(peer.is_disconnected());
        assert!(!peer.can_send());
    }

    #[test]
    fn test_tree_position() {
        let mut peer = fresh_peer();

        assert!(!peer.has_tree_position());
        assert!(peer.coords().is_none());

        let decl = ParentDeclaration::new(make_node_addr(1), make_node_addr(2), 1, 1000);
        let coords = make_coords(&[1, 2, 0]);
        peer.update_tree_position(decl, coords, 2000);

        assert!(peer.has_tree_position());
        assert!(peer.coords().is_some());
        assert_eq!(peer.last_seen(), 2000);
    }

    #[test]
    fn test_bloom_filter() {
        let mut peer = fresh_peer();
        let target = make_node_addr(42);

        // No filter yet: unreachable and stale.
        assert!(!peer.may_reach(&target));
        assert!(peer.filter_is_stale(2000, 500));

        let mut filter = BloomFilter::new();
        filter.insert(&target);
        peer.update_filter(filter, 1, 1500);

        assert!(peer.may_reach(&target));
        assert!(!peer.filter_is_stale(1800, 500));
        assert!(peer.filter_is_stale(2500, 500));
    }

    #[test]
    fn test_timing() {
        let peer = fresh_peer();

        assert_eq!(peer.connection_duration(2000), 1000);
        assert_eq!(peer.idle_time(2000), 1000);
    }

    #[test]
    fn test_filter_update_flag() {
        let mut peer = fresh_peer();
        assert!(peer.needs_filter_update());

        peer.clear_filter_update_needed();
        assert!(!peer.needs_filter_update());

        peer.mark_filter_update_needed();
        assert!(peer.needs_filter_update());
    }

    #[test]
    fn test_with_stats() {
        let mut stats = LinkStats::new();
        stats.record_sent(100);
        stats.record_recv(200, 500);

        let peer = ActivePeer::with_stats(make_peer_identity(), LinkId::new(1), 1000, stats);

        assert_eq!(peer.link_stats().packets_sent, 1);
        assert_eq!(peer.link_stats().packets_recv, 1);
    }

    #[test]
    fn test_replay_suppression_counter() {
        let mut peer = fresh_peer();
        assert_eq!(peer.replay_suppressed_count(), 0);

        // Each increment reports the running total.
        for expected in 1..=3u32 {
            assert_eq!(peer.increment_replay_suppressed(), expected);
        }
        assert_eq!(peer.replay_suppressed_count(), 3);

        // Reset hands back the old total and zeroes the counter.
        assert_eq!(peer.reset_replay_suppressed(), 3);
        assert_eq!(peer.replay_suppressed_count(), 0);

        // Counting restarts from one.
        assert_eq!(peer.increment_replay_suppressed(), 1);
        assert_eq!(peer.replay_suppressed_count(), 1);

        // A second reset in a row reports zero.
        peer.reset_replay_suppressed();
        assert_eq!(peer.reset_replay_suppressed(), 0);
    }

    #[test]
    fn test_increment_decrypt_failures_monotonic() {
        let mut peer = fresh_peer();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        let mut previous = 0u32;
        for expected in 1..=25u32 {
            let count = peer.increment_decrypt_failures();
            assert_eq!(count, expected, "increment must return monotonic count");
            assert!(count > previous, "count must strictly increase");
            assert_eq!(peer.consecutive_decrypt_failures(), count);
            previous = count;
        }
    }

    #[test]
    fn test_reset_decrypt_failures_zeroes_counter() {
        let mut peer = fresh_peer();

        (0..7).for_each(|_| {
            peer.increment_decrypt_failures();
        });
        assert_eq!(peer.consecutive_decrypt_failures(), 7);

        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        // Resetting an already-zero counter is a no-op.
        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);

        assert_eq!(peer.increment_decrypt_failures(), 1);
        assert_eq!(peer.consecutive_decrypt_failures(), 1);
    }

    #[test]
    fn test_rekey_jitter_in_range() {
        let allowed = -REKEY_JITTER_SECS..=REKEY_JITTER_SECS;
        for _ in 0..100 {
            let jitter = fresh_peer().rekey_jitter_secs();
            assert!(
                allowed.contains(&jitter),
                "jitter {} outside [-{}, +{}]",
                jitter,
                REKEY_JITTER_SECS,
                REKEY_JITTER_SECS
            );
        }
    }

    #[test]
    fn test_rekey_jitter_mean_near_zero() {
        let n = 200i64;
        let sum: i64 = (0..n).map(|_| fresh_peer().rekey_jitter_secs()).sum();

        let mean = sum / n;
        assert!(
            mean.abs() < 5,
            "empirical mean {} not within 5 of 0 over {} samples",
            mean,
            n
        );
    }
}