1use crate::bloom::BloomFilter;
7use crate::mmp::{MmpConfig, MmpPeerState};
8use crate::node::REKEY_JITTER_SECS;
9use crate::noise::{HandshakeState as NoiseHandshakeState, NoiseError, NoiseSession};
10use crate::transport::{LinkId, LinkStats, TransportAddr, TransportId};
11use crate::tree::{ParentDeclaration, TreeCoordinate};
12use crate::utils::index::SessionIndex;
13use crate::{FipsAddress, NodeAddr, PeerIdentity};
14use rand::RngExt;
15use secp256k1::XOnlyPublicKey;
16use std::fmt;
17use std::time::{Duration, Instant};
18
19fn draw_rekey_jitter() -> i64 {
20 rand::rng().random_range(-REKEY_JITTER_SECS..=REKEY_JITTER_SECS)
21}
22
/// Coarse connectivity status of a peer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectivityState {
    /// The only state reported as healthy; sending is permitted.
    Connected,
    /// Not healthy, but sending is still permitted.
    Stale,
    /// Sending is not permitted; not a terminal state.
    Reconnecting,
    /// The terminal state; sending is not permitted.
    Disconnected,
}

impl ConnectivityState {
    /// Returns `true` when traffic may be sent in this state
    /// (`Connected` or `Stale`).
    pub fn can_send(&self) -> bool {
        match self {
            Self::Connected | Self::Stale => true,
            Self::Reconnecting | Self::Disconnected => false,
        }
    }

    /// Returns `true` only for `Disconnected`.
    pub fn is_terminal(&self) -> bool {
        *self == Self::Disconnected
    }

    /// Returns `true` only for `Connected`.
    pub fn is_healthy(&self) -> bool {
        *self == Self::Connected
    }
}

impl fmt::Display for ConnectivityState {
    /// Writes the lowercase name of the state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Connected => "connected",
            Self::Stale => "stale",
            Self::Reconnecting => "reconnecting",
            Self::Disconnected => "disconnected",
        };
        f.write_str(label)
    }
}
69
/// Per-peer state kept for an authenticated peer.
///
/// Groups the peer's identity and link, the current Noise session and its
/// indices, tree and Bloom-filter data, statistics, and the bookkeeping for
/// session rekeying (in-flight handshake, pending session, and the previous
/// session that is being drained).
#[derive(Debug)]
pub struct ActivePeer {
    /// Identity of the remote peer (source of its node address, address,
    /// public key and npub).
    identity: PeerIdentity,

    /// Link this peer is associated with; replaceable via `set_link_id`.
    link_id: LinkId,
    /// Current connectivity state; both constructors start at `Connected`.
    connectivity: ConnectivityState,

    /// Current Noise session; `None` when the peer was built without one.
    noise_session: Option<NoiseSession>,
    /// Local session index for the current session.
    our_index: Option<SessionIndex>,
    /// Remote session index for the current session.
    their_index: Option<SessionIndex>,
    /// Transport last recorded for the peer; updated together with `current_addr`.
    transport_id: Option<TransportId>,
    /// Transport address last recorded for the peer.
    current_addr: Option<TransportAddr>,

    /// Parent declaration last stored by `update_tree_position`.
    declaration: Option<ParentDeclaration>,
    /// Tree coordinates last stored by `update_tree_position`.
    ancestry: Option<TreeCoordinate>,

    /// Minimum spacing between tree announces (constructors use 500).
    tree_announce_min_interval_ms: u64,
    /// Time of the last tree announce sent; 0 until one is recorded.
    last_tree_announce_sent_ms: u64,
    /// Set when a tree announce is waiting to be sent.
    pending_tree_announce: bool,

    /// Bloom filter stored by `update_filter`; queried by `may_reach`.
    inbound_filter: Option<BloomFilter>,
    /// Sequence number stored alongside `inbound_filter`.
    filter_sequence: u64,
    /// Time `inbound_filter` was stored; 0 is treated as "never received".
    filter_received_at: u64,
    /// Set when a filter update is still owed; constructors start with `true`.
    pending_filter_update: bool,

    /// Reference instant for `session_elapsed_ms`; reset on session cutover.
    session_start: Instant,

    /// Link statistics for this peer.
    link_stats: LinkStats,
    /// Caller-supplied authentication timestamp.
    /// NOTE(review): presumably milliseconds, like the `current_time_ms`
    /// arguments it is compared against — confirm.
    authenticated_at: u64,
    /// Timestamp of the most recent activity recorded for the peer.
    last_seen: u64,

    /// 8-byte epoch value reported by the remote side, when known.
    remote_epoch: Option<[u8; 8]>,

    /// MMP state; only populated by `with_session`.
    mmp: Option<MmpPeerState>,

    /// Instant recorded by `mark_heartbeat_sent`; cleared on session cutover.
    last_heartbeat_sent: Option<Instant>,

    /// Stored handshake message 2 bytes.
    /// NOTE(review): presumably kept for retransmission — confirm with callers.
    handshake_msg2: Option<Vec<u8>>,

    /// Running count of suppressed replays; reset when the session changes.
    replay_suppressed_count: u32,
    /// Running count of decrypt failures since the last reset.
    consecutive_decrypt_failures: u32,

    /// Instant the current session was installed.
    session_established_at: Instant,
    /// Jitter (seconds) drawn by `draw_rekey_jitter`; redrawn on cutover.
    rekey_jitter_secs: i64,
    /// Key-epoch bit; toggled on every session cutover.
    current_k_bit: bool,
    /// Session that was current before the last cutover, kept while draining.
    previous_session: Option<NoiseSession>,
    /// Local index that belonged to `previous_session`.
    previous_our_index: Option<SessionIndex>,
    /// Instant the current drain window began, if one is open.
    drain_started: Option<Instant>,
    /// Rekeyed session waiting for cutover.
    pending_new_session: Option<NoiseSession>,
    /// Local index of the pending session.
    pending_our_index: Option<SessionIndex>,
    /// Remote index of the pending session.
    pending_their_index: Option<SessionIndex>,
    /// Whether the pending rekey was initiated locally.
    pending_rekey_initiator: bool,
    /// Instant the pending session was stored.
    pending_rekey_completed_at: Option<Instant>,
    /// Set while a rekey is being negotiated.
    rekey_in_progress: bool,
    /// Instant recorded by `record_peer_rekey`; consulted by `is_rekey_dampened`.
    last_peer_rekey: Option<Instant>,
    /// Handshake state of the in-flight rekey.
    rekey_handshake: Option<NoiseHandshakeState>,
    /// Local index reserved for the in-flight rekey.
    rekey_our_index: Option<SessionIndex>,
    /// Wire bytes of rekey message 1, kept for resending.
    rekey_msg1: Option<Vec<u8>>,
    /// Time at or after which rekey message 1 should be resent.
    rekey_msg1_next_resend: u64,
    /// Number of resends of rekey message 1 recorded so far.
    rekey_msg1_resend_count: u32,

    /// Connected UDP socket installed for this peer (Linux/macOS only).
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    connected_udp:
        Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>>,

    /// Receive drain paired with `connected_udp` (Linux/macOS only).
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    peer_recv_drain: Option<crate::transport::udp::peer_drain::PeerRecvDrain>,
}
228
229impl ActivePeer {
230 pub fn new(identity: PeerIdentity, link_id: LinkId, authenticated_at: u64) -> Self {
235 let now = Instant::now();
236 Self {
237 identity,
238 link_id,
239 connectivity: ConnectivityState::Connected,
240 noise_session: None,
241 our_index: None,
242 their_index: None,
243 transport_id: None,
244 current_addr: None,
245 declaration: None,
246 ancestry: None,
247 tree_announce_min_interval_ms: 500,
248 last_tree_announce_sent_ms: 0,
249 pending_tree_announce: false,
250 inbound_filter: None,
251 filter_sequence: 0,
252 filter_received_at: 0,
253 pending_filter_update: true, session_start: now,
255 link_stats: LinkStats::new(),
256 authenticated_at,
257 last_seen: authenticated_at,
258 remote_epoch: None,
259 mmp: None,
260 last_heartbeat_sent: None,
261 handshake_msg2: None,
262 replay_suppressed_count: 0,
263 consecutive_decrypt_failures: 0,
264 session_established_at: now,
265 rekey_jitter_secs: draw_rekey_jitter(),
266 current_k_bit: false,
267 previous_session: None,
268 previous_our_index: None,
269 drain_started: None,
270 pending_new_session: None,
271 pending_our_index: None,
272 pending_their_index: None,
273 pending_rekey_initiator: false,
274 pending_rekey_completed_at: None,
275 rekey_in_progress: false,
276 last_peer_rekey: None,
277 rekey_handshake: None,
278 rekey_our_index: None,
279 rekey_msg1: None,
280 rekey_msg1_next_resend: 0,
281 rekey_msg1_resend_count: 0,
282 #[cfg(any(target_os = "linux", target_os = "macos"))]
283 connected_udp: None,
284 #[cfg(any(target_os = "linux", target_os = "macos"))]
285 peer_recv_drain: None,
286 }
287 }
288
289 pub fn with_stats(
294 identity: PeerIdentity,
295 link_id: LinkId,
296 authenticated_at: u64,
297 link_stats: LinkStats,
298 ) -> Self {
299 let mut peer = Self::new(identity, link_id, authenticated_at);
300 peer.link_stats = link_stats;
301 peer
302 }
303
304 #[allow(clippy::too_many_arguments)]
309 pub fn with_session(
310 identity: PeerIdentity,
311 link_id: LinkId,
312 authenticated_at: u64,
313 noise_session: NoiseSession,
314 our_index: SessionIndex,
315 their_index: SessionIndex,
316 transport_id: TransportId,
317 current_addr: TransportAddr,
318 link_stats: LinkStats,
319 is_initiator: bool,
320 mmp_config: &MmpConfig,
321 remote_epoch: Option<[u8; 8]>,
322 ) -> Self {
323 let now = Instant::now();
324 Self {
325 identity,
326 link_id,
327 connectivity: ConnectivityState::Connected,
328 noise_session: Some(noise_session),
329 our_index: Some(our_index),
330 their_index: Some(their_index),
331 transport_id: Some(transport_id),
332 current_addr: Some(current_addr),
333 declaration: None,
334 ancestry: None,
335 tree_announce_min_interval_ms: 500,
336 last_tree_announce_sent_ms: 0,
337 pending_tree_announce: false,
338 inbound_filter: None,
339 filter_sequence: 0,
340 filter_received_at: 0,
341 pending_filter_update: true,
342 session_start: now,
343 link_stats,
344 authenticated_at,
345 last_seen: authenticated_at,
346 remote_epoch,
347 mmp: Some(MmpPeerState::new(mmp_config, is_initiator)),
348 last_heartbeat_sent: None,
349 handshake_msg2: None,
350 replay_suppressed_count: 0,
351 consecutive_decrypt_failures: 0,
352 session_established_at: now,
353 rekey_jitter_secs: draw_rekey_jitter(),
354 current_k_bit: false,
355 previous_session: None,
356 previous_our_index: None,
357 drain_started: None,
358 pending_new_session: None,
359 pending_our_index: None,
360 pending_their_index: None,
361 pending_rekey_initiator: false,
362 pending_rekey_completed_at: None,
363 rekey_in_progress: false,
364 last_peer_rekey: None,
365 rekey_handshake: None,
366 rekey_our_index: None,
367 rekey_msg1: None,
368 rekey_msg1_next_resend: 0,
369 rekey_msg1_resend_count: 0,
370 #[cfg(any(target_os = "linux", target_os = "macos"))]
371 connected_udp: None,
372 #[cfg(any(target_os = "linux", target_os = "macos"))]
373 peer_recv_drain: None,
374 }
375 }
376
377 #[cfg(any(target_os = "linux", target_os = "macos"))]
382 pub(crate) fn connected_udp(
383 &self,
384 ) -> Option<std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>> {
385 self.connected_udp.clone()
386 }
387
    /// Installs a connected UDP socket together with its receive drain,
    /// replacing any pair that was installed before.
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    pub(crate) fn set_connected_udp(
        &mut self,
        socket: std::sync::Arc<crate::transport::udp::connected_peer::ConnectedPeerSocket>,
        drain: crate::transport::udp::peer_drain::PeerRecvDrain,
    ) {
        // Release the old pair explicitly, drain first and socket second,
        // before installing the new one.
        // NOTE(review): presumably the drain must stop before its socket
        // handle is released — confirm before reordering these lines.
        self.peer_recv_drain = None;
        self.connected_udp = None;
        self.connected_udp = Some(socket);
        self.peer_recv_drain = Some(drain);
    }
415
    /// Removes the connected UDP socket and its receive drain, if installed.
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    pub(crate) fn clear_connected_udp(&mut self) {
        // Same order as in `set_connected_udp`: the drain is released before
        // the socket.
        self.peer_recv_drain = None;
        self.connected_udp = None;
    }
425
    /// Returns the peer's identity.
    pub fn identity(&self) -> &PeerIdentity {
        &self.identity
    }
432
    /// Returns the node address taken from the peer's identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
437
    /// Returns the FIPS address taken from the peer's identity.
    pub fn address(&self) -> &FipsAddress {
        self.identity.address()
    }
442
    /// Returns the x-only public key taken from the peer's identity.
    pub fn pubkey(&self) -> XOnlyPublicKey {
        self.identity.pubkey()
    }
447
    /// Returns the peer's npub string as produced by its identity.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
452
    /// Returns the link this peer is associated with.
    pub fn link_id(&self) -> LinkId {
        self.link_id
    }
459
    /// Returns the current connectivity state.
    pub fn connectivity(&self) -> ConnectivityState {
        self.connectivity
    }
464
    /// Returns `true` when the current connectivity state permits sending.
    pub fn can_send(&self) -> bool {
        self.connectivity.can_send()
    }
469
    /// Returns `true` when the current connectivity state is healthy.
    pub fn is_healthy(&self) -> bool {
        self.connectivity.is_healthy()
    }
474
    /// Returns `true` when the peer is in the terminal connectivity state.
    pub fn is_disconnected(&self) -> bool {
        self.connectivity.is_terminal()
    }
479
    /// Returns `true` when a current Noise session is attached.
    pub fn has_session(&self) -> bool {
        self.noise_session.is_some()
    }
486
    /// Borrows the current Noise session, if any.
    pub fn noise_session(&self) -> Option<&NoiseSession> {
        self.noise_session.as_ref()
    }
491
    /// Mutably borrows the current Noise session, if any.
    pub fn noise_session_mut(&mut self) -> Option<&mut NoiseSession> {
        self.noise_session.as_mut()
    }
496
    /// Returns the local index of the current session, if set.
    pub fn our_index(&self) -> Option<SessionIndex> {
        self.our_index
    }
501
    /// Returns the remote index of the current session, if set.
    pub fn their_index(&self) -> Option<SessionIndex> {
        self.their_index
    }
506
    /// Overwrites the remote index of the current session.
    pub fn set_their_index(&mut self, index: SessionIndex) {
        self.their_index = Some(index);
    }
513
514 pub fn replace_session(
524 &mut self,
525 new_session: NoiseSession,
526 new_our_index: SessionIndex,
527 new_their_index: SessionIndex,
528 ) -> Option<SessionIndex> {
529 self.reset_replay_suppressed();
530 let old_our_index = self.our_index;
531 self.noise_session = Some(new_session);
532 self.our_index = Some(new_our_index);
533 self.their_index = Some(new_their_index);
534 old_our_index
535 }
536
    /// Returns the transport last recorded for this peer, if any.
    pub fn transport_id(&self) -> Option<TransportId> {
        self.transport_id
    }
541
    /// Borrows the transport address last recorded for this peer, if any.
    pub fn current_addr(&self) -> Option<&TransportAddr> {
        self.current_addr.as_ref()
    }
546
547 pub fn set_current_addr(&mut self, transport_id: TransportId, addr: &TransportAddr) -> bool {
568 if self.transport_id == Some(transport_id) && self.current_addr.as_ref() == Some(addr) {
569 return false;
570 }
571 self.transport_id = Some(transport_id);
572 self.current_addr = Some(addr.clone());
573 true
574 }
575
    /// Stores the bytes of handshake message 2, replacing any stored copy.
    pub fn set_handshake_msg2(&mut self, msg2: Vec<u8>) {
        self.handshake_msg2 = Some(msg2);
    }
582
    /// Borrows the stored handshake message 2 bytes, if any.
    pub fn handshake_msg2(&self) -> Option<&[u8]> {
        self.handshake_msg2.as_deref()
    }
587
    /// Discards the stored handshake message 2 bytes.
    pub fn clear_handshake_msg2(&mut self) {
        self.handshake_msg2 = None;
    }
592
593 pub fn increment_replay_suppressed(&mut self) -> u32 {
597 self.replay_suppressed_count += 1;
598 self.replay_suppressed_count
599 }
600
601 pub fn reset_replay_suppressed(&mut self) -> u32 {
603 let count = self.replay_suppressed_count;
604 self.replay_suppressed_count = 0;
605 count
606 }
607
    /// Returns the current replay-suppression count.
    pub fn replay_suppressed_count(&self) -> u32 {
        self.replay_suppressed_count
    }
612
613 pub fn increment_decrypt_failures(&mut self) -> u32 {
617 self.consecutive_decrypt_failures += 1;
618 self.consecutive_decrypt_failures
619 }
620
    /// Zeroes the decrypt-failure counter.
    pub fn reset_decrypt_failures(&mut self) {
        self.consecutive_decrypt_failures = 0;
    }
625
    /// Returns the number of decrypt failures counted since the last reset.
    pub fn consecutive_decrypt_failures(&self) -> u32 {
        self.consecutive_decrypt_failures
    }
630
    /// Returns the remote side's 8-byte epoch value, when known.
    pub fn remote_epoch(&self) -> Option<[u8; 8]> {
        self.remote_epoch
    }
637
    /// Overwrites the stored remote epoch (`None` clears it).
    pub(crate) fn set_remote_epoch(&mut self, remote_epoch: Option<[u8; 8]>) {
        self.remote_epoch = remote_epoch;
    }
644
    /// Borrows the peer's stored tree coordinates, if any.
    pub fn coords(&self) -> Option<&TreeCoordinate> {
        self.ancestry.as_ref()
    }
651
    /// Borrows the peer's stored parent declaration, if any.
    pub fn declaration(&self) -> Option<&ParentDeclaration> {
        self.declaration.as_ref()
    }
656
657 pub fn has_tree_position(&self) -> bool {
659 self.declaration.is_some() && self.ancestry.is_some()
660 }
661
    /// Borrows the Bloom filter stored for this peer, if any.
    pub fn inbound_filter(&self) -> Option<&BloomFilter> {
        self.inbound_filter.as_ref()
    }
668
    /// Returns the sequence number stored with the peer's filter (0 if none).
    pub fn filter_sequence(&self) -> u64 {
        self.filter_sequence
    }
673
674 pub fn filter_is_stale(&self, current_time_ms: u64, stale_threshold_ms: u64) -> bool {
676 if self.filter_received_at == 0 {
677 return true;
678 }
679 current_time_ms.saturating_sub(self.filter_received_at) > stale_threshold_ms
680 }
681
682 pub fn may_reach(&self, node_addr: &NodeAddr) -> bool {
684 match &self.inbound_filter {
685 Some(filter) => filter.contains(node_addr),
686 None => false,
687 }
688 }
689
    /// Returns `true` while a filter update is flagged as pending.
    pub fn needs_filter_update(&self) -> bool {
        self.pending_filter_update
    }
694
    /// Borrows the link statistics.
    pub fn link_stats(&self) -> &LinkStats {
        &self.link_stats
    }
701
    /// Mutably borrows the link statistics.
    pub fn link_stats_mut(&mut self) -> &mut LinkStats {
        &mut self.link_stats
    }
706
    /// Borrows the MMP state, if the peer has one.
    pub fn mmp(&self) -> Option<&MmpPeerState> {
        self.mmp.as_ref()
    }
713
    /// Mutably borrows the MMP state, if the peer has one.
    pub fn mmp_mut(&mut self) -> Option<&mut MmpPeerState> {
        self.mmp.as_mut()
    }
718
719 pub fn link_cost(&self) -> f64 {
727 match self.mmp() {
728 Some(mmp) => {
729 let etx = mmp.metrics.etx;
730 match mmp.metrics.srtt_ms() {
731 Some(srtt_ms) => etx * (1.0 + srtt_ms / 100.0),
732 None => 1.0,
733 }
734 }
735 None => 1.0,
736 }
737 }
738
739 pub fn has_srtt(&self) -> bool {
741 self.mmp()
742 .is_some_and(|mmp| mmp.metrics.srtt_ms().is_some())
743 }
744
    /// Returns the authentication timestamp supplied at construction.
    pub fn authenticated_at(&self) -> u64 {
        self.authenticated_at
    }
749
    /// Returns the timestamp of the most recent recorded activity.
    pub fn last_seen(&self) -> u64 {
        self.last_seen
    }
754
    /// Returns how long ago the peer was last seen, relative to
    /// `current_time_ms`; 0 if `last_seen` lies in the future.
    pub fn idle_time(&self, current_time_ms: u64) -> u64 {
        current_time_ms.saturating_sub(self.last_seen)
    }
759
    /// Returns the time elapsed since authentication, relative to
    /// `current_time_ms`; 0 if `authenticated_at` lies in the future.
    pub fn connection_duration(&self, current_time_ms: u64) -> u64 {
        current_time_ms.saturating_sub(self.authenticated_at)
    }
764
    /// Returns the milliseconds elapsed since `session_start`, as a `u32`.
    ///
    /// The `as u32` cast keeps only the low 32 bits, so the value wraps
    /// around after roughly 49.7 days.
    /// NOTE(review): presumably the wrap is acceptable to consumers of this
    /// value — confirm before relying on it for long-lived sessions.
    pub fn session_elapsed_ms(&self) -> u32 {
        self.session_start.elapsed().as_millis() as u32
    }
772
    /// Returns the instant used as the base for `session_elapsed_ms`.
    pub fn session_start(&self) -> Instant {
        self.session_start
    }
777
    /// Returns the instant of the last recorded heartbeat, if any.
    pub fn last_heartbeat_sent(&self) -> Option<Instant> {
        self.last_heartbeat_sent
    }
784
    /// Records `now` as the instant the last heartbeat was sent.
    pub fn mark_heartbeat_sent(&mut self, now: Instant) {
        self.last_heartbeat_sent = Some(now);
    }
789
790 pub fn touch(&mut self, current_time_ms: u64) {
794 self.last_seen = current_time_ms;
795 if self.connectivity == ConnectivityState::Stale {
799 self.connectivity = ConnectivityState::Connected;
800 }
801 }
802
803 pub fn mark_stale(&mut self) {
805 if self.connectivity == ConnectivityState::Connected {
806 self.connectivity = ConnectivityState::Stale;
807 }
808 }
809
    /// Unconditionally moves the peer to `Reconnecting`.
    pub fn mark_reconnecting(&mut self) {
        self.connectivity = ConnectivityState::Reconnecting;
    }
814
    /// Unconditionally moves the peer to `Disconnected`.
    pub fn mark_disconnected(&mut self) {
        self.connectivity = ConnectivityState::Disconnected;
    }
819
820 pub fn mark_connected(&mut self, current_time_ms: u64) {
822 self.connectivity = ConnectivityState::Connected;
823 self.last_seen = current_time_ms;
824 }
825
    /// Re-associates the peer with a different link.
    pub fn set_link_id(&mut self, link_id: LinkId) {
        self.link_id = link_id;
    }
830
831 pub fn update_tree_position(
835 &mut self,
836 declaration: ParentDeclaration,
837 ancestry: TreeCoordinate,
838 current_time_ms: u64,
839 ) {
840 self.declaration = Some(declaration);
841 self.ancestry = Some(ancestry);
842 self.last_seen = current_time_ms;
843 }
844
845 pub fn clear_tree_position(&mut self) {
847 self.declaration = None;
848 self.ancestry = None;
849 }
850
    /// Sets the minimum spacing between tree announces, in milliseconds.
    pub fn set_tree_announce_min_interval_ms(&mut self, ms: u64) {
        self.tree_announce_min_interval_ms = ms;
    }
857
    /// Returns the time the last tree announce was recorded as sent
    /// (0 if none has been recorded).
    pub fn last_tree_announce_sent_ms(&self) -> u64 {
        self.last_tree_announce_sent_ms
    }
862
    /// Overwrites the time of the last tree announce without touching the
    /// pending flag.
    pub fn set_last_tree_announce_sent_ms(&mut self, ms: u64) {
        self.last_tree_announce_sent_ms = ms;
    }
867
868 pub fn can_send_tree_announce(&self, now_ms: u64) -> bool {
870 now_ms.saturating_sub(self.last_tree_announce_sent_ms) >= self.tree_announce_min_interval_ms
871 }
872
873 pub fn record_tree_announce_sent(&mut self, now_ms: u64) {
875 self.last_tree_announce_sent_ms = now_ms;
876 self.pending_tree_announce = false;
877 }
878
    /// Flags that a tree announce is waiting to be sent.
    pub fn mark_tree_announce_pending(&mut self) {
        self.pending_tree_announce = true;
    }
883
    /// Returns `true` while a tree announce is flagged as pending.
    pub fn has_pending_tree_announce(&self) -> bool {
        self.pending_tree_announce
    }
888
889 pub fn update_filter(&mut self, filter: BloomFilter, sequence: u64, current_time_ms: u64) {
893 self.inbound_filter = Some(filter);
894 self.filter_sequence = sequence;
895 self.filter_received_at = current_time_ms;
896 self.last_seen = current_time_ms;
897 }
898
899 pub fn clear_filter(&mut self) {
901 self.inbound_filter = None;
902 self.filter_sequence = 0;
903 self.filter_received_at = 0;
904 }
905
    /// Flags that a filter update is pending.
    pub fn mark_filter_update_needed(&mut self) {
        self.pending_filter_update = true;
    }
910
    /// Clears the pending filter-update flag.
    pub fn clear_filter_update_needed(&mut self) {
        self.pending_filter_update = false;
    }
915
    /// Returns the instant the current session was installed.
    pub fn session_established_at(&self) -> Instant {
        self.session_established_at
    }
922
    /// Test-only hook that overrides the session establishment instant.
    #[cfg(test)]
    pub(crate) fn set_session_established_at_for_test(&mut self, instant: Instant) {
        self.session_established_at = instant;
    }
927
    /// Returns the rekey jitter, in seconds, drawn for the current session.
    pub fn rekey_jitter_secs(&self) -> i64 {
        self.rekey_jitter_secs
    }
932
    /// Returns the current key-epoch bit (toggled on every session cutover).
    pub fn current_k_bit(&self) -> bool {
        self.current_k_bit
    }
937
    /// Returns `true` while a rekey is flagged as in progress.
    pub fn rekey_in_progress(&self) -> bool {
        self.rekey_in_progress
    }
942
    /// Flags that a rekey is in progress.
    pub fn set_rekey_in_progress(&mut self) {
        self.rekey_in_progress = true;
    }
947
948 pub fn is_rekey_dampened(&self, dampening_secs: u64) -> bool {
950 match self.last_peer_rekey {
951 Some(t) => t.elapsed().as_secs() < dampening_secs,
952 None => false,
953 }
954 }
955
    /// Records the current instant as the time of the latest peer rekey.
    pub fn record_peer_rekey(&mut self) {
        self.last_peer_rekey = Some(Instant::now());
    }
960
    /// Returns the local index of the pending session, if any.
    pub fn pending_our_index(&self) -> Option<SessionIndex> {
        self.pending_our_index
    }
965
    /// Returns the remote index of the pending session, if any.
    pub fn pending_their_index(&self) -> Option<SessionIndex> {
        self.pending_their_index
    }
970
    /// Returns the local index of the previous (draining) session, if any.
    pub fn previous_our_index(&self) -> Option<SessionIndex> {
        self.previous_our_index
    }
975
    /// Borrows the previous (draining) session, if any.
    pub fn previous_session(&self) -> Option<&NoiseSession> {
        self.previous_session.as_ref()
    }
980
    /// Mutably borrows the previous (draining) session, if any.
    pub fn previous_session_mut(&mut self) -> Option<&mut NoiseSession> {
        self.previous_session.as_mut()
    }
985
    /// Borrows the pending (not yet cut over) session, if any.
    pub fn pending_new_session(&self) -> Option<&NoiseSession> {
        self.pending_new_session.as_ref()
    }
990
991 pub(crate) fn trial_decrypt_pending_new_session(
997 &mut self,
998 ciphertext: &[u8],
999 counter: u64,
1000 aad: &[u8],
1001 ) -> Option<Vec<u8>> {
1002 self.pending_new_session.as_mut().and_then(|session| {
1003 session
1004 .decrypt_with_replay_check_and_aad(ciphertext, counter, aad)
1005 .ok()
1006 })
1007 }
1008
    /// Returns `true` when the pending rekey was initiated locally.
    pub fn pending_rekey_initiator(&self) -> bool {
        self.pending_rekey_initiator
    }
1013
1014 pub fn pending_rekey_cutover_due(&self, delay: Duration) -> bool {
1017 self.pending_rekey_initiator
1018 && self
1019 .pending_rekey_completed_at
1020 .is_some_and(|completed| completed.elapsed() >= delay)
1021 }
1022
1023 pub fn set_pending_session(
1029 &mut self,
1030 session: NoiseSession,
1031 our_index: SessionIndex,
1032 their_index: SessionIndex,
1033 initiated_by_local: bool,
1034 ) {
1035 self.pending_new_session = Some(session);
1036 self.pending_our_index = Some(our_index);
1037 self.pending_their_index = Some(their_index);
1038 self.pending_rekey_initiator = initiated_by_local;
1039 self.pending_rekey_completed_at = Some(Instant::now());
1040 self.rekey_in_progress = false;
1041 self.rekey_our_index = None;
1043 self.rekey_handshake = None;
1044 self.rekey_msg1 = None;
1045 self.rekey_msg1_next_resend = 0;
1046 self.rekey_msg1_resend_count = 0;
1047 }
1048
1049 pub fn cutover_to_new_session(&mut self) -> Option<SessionIndex> {
1055 let new_session = self.pending_new_session.take()?;
1056 let new_our_index = self.pending_our_index.take();
1057 let new_their_index = self.pending_their_index.take();
1058
1059 self.previous_session = self.noise_session.take();
1061 self.previous_our_index = self.our_index;
1062 self.drain_started = Some(Instant::now());
1063
1064 self.noise_session = Some(new_session);
1066 self.our_index = new_our_index;
1067 self.their_index = new_their_index;
1068 self.pending_rekey_initiator = false;
1069 self.pending_rekey_completed_at = None;
1070
1071 self.current_k_bit = !self.current_k_bit;
1073 self.session_established_at = Instant::now();
1074 self.session_start = Instant::now();
1075 self.rekey_in_progress = false;
1076 self.rekey_msg1_resend_count = 0;
1077 self.rekey_jitter_secs = draw_rekey_jitter();
1078 self.last_heartbeat_sent = None;
1079 self.reset_replay_suppressed();
1080
1081 let now = Instant::now();
1083 if let Some(mmp) = &mut self.mmp {
1084 mmp.reset_for_rekey(now);
1085 }
1086
1087 self.previous_our_index
1088 }
1089
1090 pub fn handle_peer_kbit_flip(&mut self) -> Option<SessionIndex> {
1095 let new_session = self.pending_new_session.take()?;
1096 let new_our_index = self.pending_our_index.take();
1097 let new_their_index = self.pending_their_index.take();
1098
1099 self.previous_session = self.noise_session.take();
1101 self.previous_our_index = self.our_index;
1102 self.drain_started = Some(Instant::now());
1103
1104 self.noise_session = Some(new_session);
1106 self.our_index = new_our_index;
1107 self.their_index = new_their_index;
1108 self.pending_rekey_initiator = false;
1109 self.pending_rekey_completed_at = None;
1110
1111 self.current_k_bit = !self.current_k_bit;
1113 self.session_established_at = Instant::now();
1114 self.session_start = Instant::now();
1115 self.rekey_in_progress = false;
1116 self.rekey_msg1_resend_count = 0;
1117 self.rekey_jitter_secs = draw_rekey_jitter();
1118 self.last_heartbeat_sent = None;
1119 self.reset_replay_suppressed();
1120
1121 let now = Instant::now();
1123 if let Some(mmp) = &mut self.mmp {
1124 mmp.reset_for_rekey(now);
1125 }
1126
1127 self.previous_our_index
1128 }
1129
1130 pub fn drain_expired(&self, drain_secs: u64) -> bool {
1132 match self.drain_started {
1133 Some(t) => t.elapsed().as_secs() >= drain_secs,
1134 None => false,
1135 }
1136 }
1137
    /// Returns `true` while a drain window is open.
    pub fn is_draining(&self) -> bool {
        self.drain_started.is_some()
    }
1142
1143 pub fn complete_drain(&mut self) -> Option<SessionIndex> {
1148 self.previous_session = None;
1149 self.drain_started = None;
1150 self.previous_our_index.take()
1151 }
1152
1153 pub fn abandon_rekey(&mut self) -> Option<SessionIndex> {
1159 self.rekey_handshake = None;
1160 self.rekey_msg1 = None;
1161 self.rekey_msg1_next_resend = 0;
1162 self.rekey_msg1_resend_count = 0;
1163 self.rekey_in_progress = false;
1164 self.rekey_our_index.take().or_else(|| {
1166 self.pending_new_session = None;
1167 self.pending_their_index = None;
1168 self.pending_rekey_initiator = false;
1169 self.pending_rekey_completed_at = None;
1170 self.pending_our_index.take()
1171 })
1172 }
1173
1174 pub fn set_rekey_state(
1178 &mut self,
1179 handshake: NoiseHandshakeState,
1180 our_index: SessionIndex,
1181 wire_msg1: Vec<u8>,
1182 next_resend_ms: u64,
1183 ) {
1184 self.rekey_handshake = Some(handshake);
1185 self.rekey_our_index = Some(our_index);
1186 self.rekey_msg1 = Some(wire_msg1);
1187 self.rekey_msg1_next_resend = next_resend_ms;
1188 self.rekey_msg1_resend_count = 0;
1189 self.rekey_in_progress = true;
1190 }
1191
    /// Returns the local index reserved for the in-flight rekey, if any.
    pub fn rekey_our_index(&self) -> Option<SessionIndex> {
        self.rekey_our_index
    }
1196
1197 pub fn complete_rekey_msg2(
1203 &mut self,
1204 msg2_bytes: &[u8],
1205 ) -> Result<(NoiseSession, Option<[u8; 8]>), NoiseError> {
1206 let mut hs = self
1207 .rekey_handshake
1208 .take()
1209 .ok_or_else(|| NoiseError::WrongState {
1210 expected: "rekey handshake in progress".to_string(),
1211 got: "no handshake state".to_string(),
1212 })?;
1213
1214 hs.read_message_2(msg2_bytes)?;
1215 let remote_epoch = hs.remote_epoch();
1216 let session = hs.into_session()?;
1217
1218 self.rekey_msg1 = None;
1220 self.rekey_msg1_next_resend = 0;
1221 self.rekey_msg1_resend_count = 0;
1222
1223 Ok((session, remote_epoch))
1224 }
1225
1226 pub fn needs_msg1_resend(&self, now_ms: u64) -> bool {
1228 self.rekey_in_progress && self.rekey_msg1.is_some() && now_ms >= self.rekey_msg1_next_resend
1229 }
1230
    /// Borrows the stored wire bytes of rekey message 1, if any.
    pub fn rekey_msg1(&self) -> Option<&[u8]> {
        self.rekey_msg1.as_deref()
    }
1235
    /// Sets the time of the next message 1 resend without counting a resend.
    pub fn set_msg1_next_resend(&mut self, next_ms: u64) {
        self.rekey_msg1_next_resend = next_ms;
    }
1240
    /// Returns how many resends of rekey message 1 have been recorded.
    pub fn rekey_msg1_resend_count(&self) -> u32 {
        self.rekey_msg1_resend_count
    }
1245
1246 pub fn record_rekey_msg1_resend(&mut self, next_ms: u64) {
1248 self.rekey_msg1_resend_count = self.rekey_msg1_resend_count.saturating_add(1);
1249 self.rekey_msg1_next_resend = next_ms;
1250 }
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255 use super::*;
1256 use crate::Identity;
1257
1258 fn make_peer_identity() -> PeerIdentity {
1259 let identity = Identity::generate();
1260 PeerIdentity::from_pubkey(identity.pubkey())
1261 }
1262
1263 fn make_node_addr(val: u8) -> NodeAddr {
1264 let mut bytes = [0u8; 16];
1265 bytes[0] = val;
1266 NodeAddr::from_bytes(bytes)
1267 }
1268
1269 fn make_coords(ids: &[u8]) -> TreeCoordinate {
1270 TreeCoordinate::from_addrs(ids.iter().map(|&v| make_node_addr(v)).collect()).unwrap()
1271 }
1272
1273 fn ik_session_pair() -> (NoiseSession, NoiseSession) {
1274 let initiator_id = Identity::generate();
1275 let responder_id = Identity::generate();
1276 let mut initiator =
1277 NoiseHandshakeState::new_initiator(initiator_id.keypair(), responder_id.pubkey_full());
1278 let mut responder = NoiseHandshakeState::new_responder(responder_id.keypair());
1279 initiator.set_local_epoch([0xA1, 0xB2, 0xC3, 0xD4, 0x11, 0x22, 0x33, 0x44]);
1280 responder.set_local_epoch([0xD4, 0xC3, 0xB2, 0xA1, 0x44, 0x33, 0x22, 0x11]);
1281
1282 let msg1 = initiator.write_message_1().unwrap();
1283 responder.read_message_1(&msg1).unwrap();
1284 let msg2 = responder.write_message_2().unwrap();
1285 initiator.read_message_2(&msg2).unwrap();
1286
1287 (
1288 initiator.into_session().unwrap(),
1289 responder.into_session().unwrap(),
1290 )
1291 }
1292
1293 fn seal_fmp(
1294 sender: &mut NoiseSession,
1295 receiver_idx: SessionIndex,
1296 plaintext: &[u8],
1297 k_bit: bool,
1298 ) -> (Vec<u8>, u64, [u8; 16]) {
1299 use crate::node::wire::{FLAG_KEY_EPOCH, build_established_header};
1300
1301 let counter = sender.current_send_counter();
1302 let flags = if k_bit { FLAG_KEY_EPOCH } else { 0 };
1303 let header = build_established_header(receiver_idx, counter, flags, plaintext.len() as u16);
1304 let ciphertext = sender.encrypt_with_aad(plaintext, &header).unwrap();
1305 (ciphertext, counter, header)
1306 }
1307
1308 fn peer_with_current(current_recv: NoiseSession) -> ActivePeer {
1309 let identity = make_peer_identity();
1310 ActivePeer::with_session(
1311 identity,
1312 LinkId::new(1),
1313 1_000,
1314 current_recv,
1315 SessionIndex::new(1),
1316 SessionIndex::new(2),
1317 TransportId::new(1),
1318 TransportAddr::from_string("hci0/AA:BB:CC:DD:EE:01"),
1319 LinkStats::new(),
1320 true,
1321 &MmpConfig::default(),
1322 None,
1323 )
1324 }
1325
1326 #[test]
1327 fn test_connectivity_state_properties() {
1328 assert!(ConnectivityState::Connected.can_send());
1329 assert!(ConnectivityState::Stale.can_send());
1330 assert!(!ConnectivityState::Reconnecting.can_send());
1331 assert!(!ConnectivityState::Disconnected.can_send());
1332
1333 assert!(ConnectivityState::Connected.is_healthy());
1334 assert!(!ConnectivityState::Stale.is_healthy());
1335
1336 assert!(ConnectivityState::Disconnected.is_terminal());
1337 assert!(!ConnectivityState::Connected.is_terminal());
1338 }
1339
1340 #[test]
1341 fn test_active_peer_creation() {
1342 let identity = make_peer_identity();
1343 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1344
1345 assert_eq!(peer.identity().node_addr(), identity.node_addr());
1346 assert_eq!(peer.link_id(), LinkId::new(1));
1347 assert!(peer.is_healthy());
1348 assert!(peer.can_send());
1349 assert_eq!(peer.authenticated_at(), 1000);
1350 assert!(peer.needs_filter_update()); }
1352
1353 #[test]
1354 fn test_connectivity_transitions() {
1355 let identity = make_peer_identity();
1356 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1357
1358 assert!(peer.is_healthy());
1359
1360 peer.mark_stale();
1361 assert_eq!(peer.connectivity(), ConnectivityState::Stale);
1362 assert!(peer.can_send()); peer.touch(2000);
1366 assert!(peer.is_healthy());
1367
1368 peer.mark_reconnecting();
1369 assert!(!peer.can_send());
1370 peer.touch(2500);
1371 assert_eq!(peer.connectivity(), ConnectivityState::Reconnecting);
1372 assert!(!peer.can_send());
1373
1374 peer.mark_connected(3000);
1375 assert!(peer.is_healthy());
1376
1377 peer.mark_disconnected();
1378 assert!(peer.is_disconnected());
1379 assert!(!peer.can_send());
1380 }
1381
1382 #[test]
1383 fn test_tree_position() {
1384 let identity = make_peer_identity();
1385 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1386
1387 assert!(!peer.has_tree_position());
1388 assert!(peer.coords().is_none());
1389
1390 let node = make_node_addr(1);
1391 let parent = make_node_addr(2);
1392 let decl = ParentDeclaration::new(node, parent, 1, 1000);
1393 let coords = make_coords(&[1, 2, 0]);
1394
1395 peer.update_tree_position(decl, coords, 2000);
1396
1397 assert!(peer.has_tree_position());
1398 assert!(peer.coords().is_some());
1399 assert_eq!(peer.last_seen(), 2000);
1400 }
1401
1402 #[test]
1403 fn test_bloom_filter() {
1404 let identity = make_peer_identity();
1405 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1406 let target = make_node_addr(42);
1407
1408 assert!(!peer.may_reach(&target));
1409 assert!(peer.filter_is_stale(2000, 500));
1410
1411 let mut filter = BloomFilter::new();
1412 filter.insert(&target);
1413 peer.update_filter(filter, 1, 1500);
1414
1415 assert!(peer.may_reach(&target));
1416 assert!(!peer.filter_is_stale(1800, 500));
1417 assert!(peer.filter_is_stale(2500, 500));
1418 }
1419
1420 #[test]
1421 fn test_timing() {
1422 let identity = make_peer_identity();
1423 let peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1424
1425 assert_eq!(peer.connection_duration(2000), 1000);
1426 assert_eq!(peer.idle_time(2000), 1000);
1427 }
1428
1429 #[test]
1430 fn test_filter_update_flag() {
1431 let identity = make_peer_identity();
1432 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1433
1434 assert!(peer.needs_filter_update()); peer.clear_filter_update_needed();
1437 assert!(!peer.needs_filter_update());
1438
1439 peer.mark_filter_update_needed();
1440 assert!(peer.needs_filter_update());
1441 }
1442
1443 #[test]
1444 fn test_with_stats() {
1445 let identity = make_peer_identity();
1446 let mut stats = LinkStats::new();
1447 stats.record_sent(100);
1448 stats.record_recv(200, 500);
1449
1450 let peer = ActivePeer::with_stats(identity, LinkId::new(1), 1000, stats);
1451
1452 assert_eq!(peer.link_stats().packets_sent, 1);
1453 assert_eq!(peer.link_stats().packets_recv, 1);
1454 }
1455
1456 #[test]
1457 fn test_replay_suppression_counter() {
1458 let identity = make_peer_identity();
1459 let mut peer = ActivePeer::new(identity, LinkId::new(1), 1000);
1460
1461 assert_eq!(peer.replay_suppressed_count(), 0);
1463
1464 assert_eq!(peer.increment_replay_suppressed(), 1);
1466 assert_eq!(peer.increment_replay_suppressed(), 2);
1467 assert_eq!(peer.increment_replay_suppressed(), 3);
1468 assert_eq!(peer.replay_suppressed_count(), 3);
1469
1470 assert_eq!(peer.reset_replay_suppressed(), 3);
1472 assert_eq!(peer.replay_suppressed_count(), 0);
1473
1474 assert_eq!(peer.increment_replay_suppressed(), 1);
1476 assert_eq!(peer.replay_suppressed_count(), 1);
1477
1478 peer.reset_replay_suppressed();
1480 assert_eq!(peer.reset_replay_suppressed(), 0);
1481 }
1482
/// Every call to `increment_decrypt_failures` returns a count one higher than the last.
#[test]
fn test_increment_decrypt_failures_monotonic() {
    let mut peer = ActivePeer::new(make_peer_identity(), LinkId::new(1), 1000);

    // No failures have been recorded on a new peer.
    assert_eq!(peer.consecutive_decrypt_failures(), 0);

    let mut last_seen = 0u32;
    for want in 1..=25u32 {
        let got = peer.increment_decrypt_failures();

        // The returned value, the previous value, and the getter must all agree.
        assert_eq!(got, want, "increment must return monotonic count");
        assert!(got > last_seen, "count must strictly increase");
        assert_eq!(peer.consecutive_decrypt_failures(), got);

        last_seen = got;
    }
}
1501
/// `reset_decrypt_failures` zeroes the counter, is safe to repeat, and allows recounting.
#[test]
fn test_reset_decrypt_failures_zeroes_counter() {
    let mut peer = ActivePeer::new(make_peer_identity(), LinkId::new(1), 1000);

    // Accumulate seven consecutive failures.
    (0..7).for_each(|_| {
        peer.increment_decrypt_failures();
    });
    assert_eq!(peer.consecutive_decrypt_failures(), 7);

    // The first pass clears the counter; the second shows a repeated reset keeps it at zero.
    for _ in 0..2 {
        peer.reset_decrypt_failures();
        assert_eq!(peer.consecutive_decrypt_failures(), 0);
    }

    // Counting resumes from one after the reset.
    assert_eq!(peer.increment_decrypt_failures(), 1);
    assert_eq!(peer.consecutive_decrypt_failures(), 1);
}
1525
/// The rekey jitter of every new peer lies within `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]`.
#[test]
fn test_rekey_jitter_in_range() {
    let allowed = -REKEY_JITTER_SECS..=REKEY_JITTER_SECS;

    // Sample 100 independently constructed peers.
    for _ in 0..100 {
        let jitter =
            ActivePeer::new(make_peer_identity(), LinkId::new(1), 1000).rekey_jitter_secs();

        assert!(
            allowed.contains(&jitter),
            "jitter {} outside [-{}, +{}]",
            jitter,
            REKEY_JITTER_SECS,
            REKEY_JITTER_SECS
        );
    }
}
1541
/// The average rekey jitter over many new peers stays close to zero.
#[test]
fn test_rekey_jitter_mean_near_zero() {
    let n = 200i64;

    // Sum the jitter of `n` independently constructed peers.
    let sum: i64 = (0..n)
        .map(|_| ActivePeer::new(make_peer_identity(), LinkId::new(1), 1000).rekey_jitter_secs())
        .sum();

    // Integer division truncates toward zero, so the reported mean is a whole number.
    let mean = sum / n;
    assert!(
        mean.abs() < 5,
        "empirical mean {} not within 5 of 0 over {} samples",
        mean,
        n
    );
}
1561
/// A frame from the pending session's sender decrypts via the trial path, after which
/// handling the K-bit flip promotes the pending session.
#[test]
fn fmp_pending_trial_authenticates_then_promotes() {
    let (_old_tx, old_rx) = ik_session_pair();
    let (mut new_tx, new_rx) = ik_session_pair();

    // Peer starts on the first session, with the second one installed as pending.
    let mut peer = peer_with_current(old_rx);
    let initial_k = peer.current_k_bit();
    peer.set_pending_session(new_rx, SessionIndex::new(3), SessionIndex::new(4), false);

    // Build a frame with the pending session's sender half, carrying the inverted K bit.
    let (ciphertext, counter, header) =
        seal_fmp(&mut new_tx, SessionIndex::new(3), b"new-epoch", !initial_k);

    let plaintext = peer
        .trial_decrypt_pending_new_session(&ciphertext, counter, &header)
        .expect("pending frame must authenticate");
    assert_eq!(plaintext, b"new-epoch");

    // After the flip is handled: pending slot is empty, K bit is inverted, and a previous
    // session is present.
    assert!(peer.handle_peer_kbit_flip().is_some());
    assert!(peer.pending_new_session().is_none());
    assert_eq!(peer.current_k_bit(), !initial_k);
    assert!(peer.previous_session().is_some());
}
1591
/// A failed trial decrypt leaves the pending session in place, and a later genuine
/// pending frame still decrypts.
#[test]
fn fmp_pending_trial_failure_leaves_pending_replay_intact() {
    let (_old_tx, old_rx) = ik_session_pair();
    let (mut new_tx, new_rx) = ik_session_pair();
    let (mut unrelated_tx, _unrelated_rx) = ik_session_pair();

    let mut peer = peer_with_current(old_rx);
    let initial_k = peer.current_k_bit();
    peer.set_pending_session(new_rx, SessionIndex::new(3), SessionIndex::new(4), false);

    // A frame sealed by a sender from a different session pair must be rejected ...
    let (bad_ciphertext, bad_counter, bad_header) =
        seal_fmp(&mut unrelated_tx, SessionIndex::new(3), b"stale", !initial_k);
    let rejected =
        peer.trial_decrypt_pending_new_session(&bad_ciphertext, bad_counter, &bad_header);
    assert!(rejected.is_none());

    // ... without dropping the pending session or changing the current K bit.
    assert!(peer.pending_new_session().is_some());
    assert_eq!(peer.current_k_bit(), initial_k);

    // The genuine pending frame must still decrypt after the failed attempt.
    let (good_ciphertext, good_counter, good_header) =
        seal_fmp(&mut new_tx, SessionIndex::new(3), b"new-epoch", !initial_k);
    let plaintext = peer
        .trial_decrypt_pending_new_session(&good_ciphertext, good_counter, &good_header)
        .expect("failed stale trial must not consume pending replay window");
    assert_eq!(plaintext, b"new-epoch");
}
1630}