1pub mod announce_proc;
2pub mod announce_queue;
3pub mod announce_verify_queue;
4pub mod dedup;
5pub mod inbound;
6pub mod ingress_control;
7pub mod jobs;
8pub mod outbound;
9pub mod path_requests;
10pub mod pathfinder;
11pub mod queries;
12pub mod rate_limit;
13pub mod retention;
14pub mod tables;
15pub mod tunnel;
16pub mod types;
17
18use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
19use alloc::string::String;
20use alloc::vec::Vec;
21use core::mem::size_of;
22
23use rns_crypto::Rng;
24
25use crate::announce::AnnounceData;
26use crate::constants;
27use crate::hash;
28use crate::packet::RawPacket;
29
30use self::announce_proc::compute_path_expires;
31use self::announce_queue::AnnounceQueues;
32use self::announce_verify_queue::{AnnounceVerifyKey, AnnounceVerifyQueue, PendingAnnounce};
33use self::dedup::{AnnounceSignatureCache, PacketHashlist};
34use self::inbound::{
35 create_link_entry, create_reverse_entry, forward_transport_packet, route_proof_via_reverse,
36 route_via_link_table,
37};
38use self::ingress_control::IngressControl;
39use self::outbound::{route_outbound, should_transmit_announce};
40use self::pathfinder::{
41 decide_announce_multipath, extract_random_blob, timebase_from_random_blob,
42 timebase_from_random_blobs, MultiPathDecision,
43};
44use self::rate_limit::AnnounceRateLimiter;
45use self::tables::{AnnounceEntry, DiscoveryPathRequest, LinkEntry, PathEntry, PathSet};
46use self::tunnel::TunnelTable;
47use self::types::{
48 BlackholeEntry, InterfaceId, InterfaceInfo, PacketBytes, TransportAction, TransportConfig,
49};
50
51pub type PathTableRow = ([u8; 16], f64, [u8; 16], u8, f64, String);
52pub type RateTableRow = ([u8; 16], f64, u32, f64, Vec<f64>);
53
54#[derive(Debug, Clone, Copy, PartialEq, Default)]
55pub struct RxMetadata {
56 pub rssi: Option<i16>,
57 pub snr: Option<f32>,
58}
59
60#[derive(Debug, Clone, Copy, PartialEq)]
61pub struct InboundFrame<'a> {
62 pub raw: &'a [u8],
63 pub iface: InterfaceId,
64 pub now: f64,
65 pub rx: RxMetadata,
66}
67
68impl<'a> InboundFrame<'a> {
69 pub fn new(raw: &'a [u8], iface: InterfaceId, now: f64) -> Self {
70 Self {
71 raw,
72 iface,
73 now,
74 rx: RxMetadata::default(),
75 }
76 }
77
78 pub fn with_rx(mut self, rx: RxMetadata) -> Self {
79 self.rx = rx;
80 self
81 }
82}
83
84struct InboundPacketCtx {
85 packet: RawPacket,
86 original_raw: Option<Vec<u8>>,
87 iface: InterfaceId,
88 now: f64,
89 from_local_client: bool,
90}
91
92struct VerifiedAnnounceCtx<'a> {
93 packet: &'a RawPacket,
94 original_raw: &'a [u8],
95 iface: InterfaceId,
96 now: f64,
97 validated: crate::announce::ValidatedAnnounce,
98 received_from: [u8; 16],
99 random_blob: [u8; 10],
100 announce_emitted: u64,
101}
102
103struct TickCtx<'a> {
104 now: f64,
105 rng: &'a mut dyn Rng,
106 actions: Vec<TransportAction>,
107}
108
109struct PathRequestCtx<'a> {
110 tag: &'a [u8],
111 interface_id: InterfaceId,
112 now: f64,
113 destination_hash: [u8; 16],
114}
115
116pub struct TransportEngine {
121 config: TransportConfig,
122 path_table: BTreeMap<[u8; 16], PathSet>,
123 announce_table: BTreeMap<[u8; 16], AnnounceEntry>,
124 reverse_table: BTreeMap<[u8; 16], tables::ReverseEntry>,
125 link_table: BTreeMap<[u8; 16], LinkEntry>,
126 held_announces: BTreeMap<[u8; 16], AnnounceEntry>,
127 packet_hashlist: PacketHashlist,
128 announce_sig_cache: AnnounceSignatureCache,
129 rate_limiter: AnnounceRateLimiter,
130 path_states: BTreeMap<[u8; 16], u8>,
131 interfaces: BTreeMap<InterfaceId, InterfaceInfo>,
132 local_destinations: BTreeMap<[u8; 16], u8>,
133 blackholed_identities: BTreeMap<[u8; 16], BlackholeEntry>,
134 announce_queues: AnnounceQueues,
135 ingress_control: IngressControl,
136 tunnel_table: TunnelTable,
137 discovery_pr_tags: VecDeque<[u8; 32]>,
138 discovery_pr_tag_set: BTreeSet<[u8; 32]>,
139 discovery_path_requests: BTreeMap<[u8; 16], DiscoveryPathRequest>,
140 path_destination_cap_evict_count: usize,
141 announces_last_checked: f64,
143 tables_last_culled: f64,
144}
145
146impl TransportEngine {
147 pub fn new(config: TransportConfig) -> Self {
148 let packet_hashlist_max_entries = config.packet_hashlist_max_entries;
149 let sig_cache_max = if config.announce_sig_cache_enabled {
150 config.announce_sig_cache_max_entries
151 } else {
152 0
153 };
154 let sig_cache_ttl = config.announce_sig_cache_ttl_secs;
155 let announce_queue_max_interfaces = config.announce_queue_max_interfaces;
156 TransportEngine {
157 config,
158 path_table: BTreeMap::new(),
159 announce_table: BTreeMap::new(),
160 reverse_table: BTreeMap::new(),
161 link_table: BTreeMap::new(),
162 held_announces: BTreeMap::new(),
163 packet_hashlist: PacketHashlist::new(packet_hashlist_max_entries),
164 announce_sig_cache: AnnounceSignatureCache::new(sig_cache_max, sig_cache_ttl),
165 rate_limiter: AnnounceRateLimiter::new(),
166 path_states: BTreeMap::new(),
167 interfaces: BTreeMap::new(),
168 local_destinations: BTreeMap::new(),
169 blackholed_identities: BTreeMap::new(),
170 announce_queues: AnnounceQueues::new(announce_queue_max_interfaces),
171 ingress_control: IngressControl::new(),
172 tunnel_table: TunnelTable::new(),
173 discovery_pr_tags: VecDeque::new(),
174 discovery_pr_tag_set: BTreeSet::new(),
175 discovery_path_requests: BTreeMap::new(),
176 path_destination_cap_evict_count: 0,
177 announces_last_checked: 0.0,
178 tables_last_culled: 0.0,
179 }
180 }
181
182 pub fn register_interface(&mut self, info: InterfaceInfo) {
187 self.interfaces.insert(info.id, info);
188 }
189
190 pub fn deregister_interface(&mut self, id: InterfaceId) {
191 self.interfaces.remove(&id);
192 self.announce_queues.remove_interface(id);
193 self.ingress_control.remove_interface(&id);
194 }
195
196 pub fn register_destination(&mut self, dest_hash: [u8; 16], dest_type: u8) {
201 self.local_destinations.insert(dest_hash, dest_type);
202 }
203
204 pub fn deregister_destination(&mut self, dest_hash: &[u8; 16]) {
205 self.local_destinations.remove(dest_hash);
206 }
207
208 pub fn has_path(&self, dest_hash: &[u8; 16]) -> bool {
213 self.path_table
214 .get(dest_hash)
215 .is_some_and(|ps| !ps.is_empty())
216 }
217
218 pub fn hops_to(&self, dest_hash: &[u8; 16]) -> Option<u8> {
219 self.path_table
220 .get(dest_hash)
221 .and_then(|ps| ps.primary())
222 .map(|e| e.hops)
223 }
224
225 pub fn next_hop(&self, dest_hash: &[u8; 16]) -> Option<[u8; 16]> {
226 self.path_table
227 .get(dest_hash)
228 .and_then(|ps| ps.primary())
229 .map(|e| e.next_hop)
230 }
231
232 pub fn next_hop_interface(&self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
233 self.path_table
234 .get(dest_hash)
235 .and_then(|ps| ps.primary())
236 .map(|e| e.receiving_interface)
237 }
238
239 pub fn mark_path_unresponsive(
249 &mut self,
250 dest_hash: &[u8; 16],
251 receiving_interface: Option<InterfaceId>,
252 ) {
253 if let Some(iface_id) = receiving_interface {
254 if let Some(info) = self.interfaces.get(&iface_id) {
255 if info.mode == constants::MODE_BOUNDARY {
256 return;
257 }
258 }
259 }
260
261 if let Some(ps) = self.path_table.get_mut(dest_hash) {
263 if ps.len() > 1 {
264 ps.failover(false); self.path_states.remove(dest_hash);
267 return;
268 }
269 }
270
271 self.path_states
272 .insert(*dest_hash, constants::STATE_UNRESPONSIVE);
273 }
274
275 pub fn mark_path_responsive(&mut self, dest_hash: &[u8; 16]) {
276 self.path_states
277 .insert(*dest_hash, constants::STATE_RESPONSIVE);
278 }
279
280 pub fn path_is_unresponsive(&self, dest_hash: &[u8; 16]) -> bool {
281 self.path_states.get(dest_hash) == Some(&constants::STATE_UNRESPONSIVE)
282 }
283
284 pub fn expire_path(&mut self, dest_hash: &[u8; 16]) {
285 if let Some(ps) = self.path_table.get_mut(dest_hash) {
286 ps.expire_all();
287 }
288 }
289
290 pub fn register_link(&mut self, link_id: [u8; 16], entry: LinkEntry) {
295 self.link_table.insert(link_id, entry);
296 }
297
298 pub fn validate_link(&mut self, link_id: &[u8; 16]) {
299 if let Some(entry) = self.link_table.get_mut(link_id) {
300 entry.validated = true;
301 }
302 }
303
304 pub fn remove_link(&mut self, link_id: &[u8; 16]) {
305 self.link_table.remove(link_id);
306 }
307
308 pub fn blackhole_identity(
319 &mut self,
320 identity_hash: [u8; 16],
321 now: f64,
322 duration_hours: Option<f64>,
323 reason: Option<String>,
324 ) {
325 let expires = match duration_hours {
326 Some(h) if h > 0.0 => now + h * 3600.0,
327 _ => 0.0, };
329 self.blackholed_identities.insert(
330 identity_hash,
331 BlackholeEntry {
332 created: now,
333 expires,
334 reason,
335 },
336 );
337 }
338
339 pub fn unblackhole_identity(&mut self, identity_hash: &[u8; 16]) -> bool {
344 self.blackholed_identities.remove(identity_hash).is_some()
345 }
346
347 pub fn is_blackholed(&self, identity_hash: &[u8; 16], now: f64) -> bool {
349 if let Some(entry) = self.blackholed_identities.get(identity_hash) {
350 if entry.expires == 0.0 || entry.expires > now {
351 return true;
352 }
353 }
354 false
355 }
356
357 pub fn blackholed_entries(&self) -> impl Iterator<Item = (&[u8; 16], &BlackholeEntry)> {
359 self.blackholed_identities.iter()
360 }
361
362 fn cull_blackholed(&mut self, now: f64) {
364 self.blackholed_identities
365 .retain(|_, entry| entry.expires == 0.0 || entry.expires > now);
366 }
367
368 pub fn handle_tunnel(
376 &mut self,
377 tunnel_id: [u8; 32],
378 interface: InterfaceId,
379 now: f64,
380 ) -> Vec<TransportAction> {
381 let mut actions = Vec::new();
382
383 if let Some(info) = self.interfaces.get_mut(&interface) {
385 info.tunnel_id = Some(tunnel_id);
386 }
387
388 let restored_paths = self.tunnel_table.handle_tunnel(
389 tunnel_id,
390 interface,
391 now,
392 self.config.destination_timeout_secs,
393 );
394
395 for (dest_hash, tunnel_path) in &restored_paths {
397 let should_restore = match self.path_table.get(dest_hash).and_then(|ps| ps.primary()) {
398 Some(existing) => {
399 if tunnel_path.hops <= existing.hops || existing.expires < now {
402 let existing_timebase = timebase_from_random_blobs(&existing.random_blobs);
403 let tunnel_timebase = timebase_from_random_blobs(&tunnel_path.random_blobs);
404 tunnel_timebase >= existing_timebase
405 } else {
406 false
407 }
408 }
409 None => now < tunnel_path.expires,
410 };
411
412 if should_restore {
413 let entry = PathEntry {
414 timestamp: tunnel_path.timestamp,
415 next_hop: tunnel_path.received_from,
416 hops: tunnel_path.hops,
417 expires: tunnel_path.expires,
418 random_blobs: tunnel_path.random_blobs.clone(),
419 receiving_interface: interface,
420 packet_hash: tunnel_path.packet_hash,
421 announce_raw: None,
422 };
423 self.upsert_path_destination(*dest_hash, entry, now);
424 }
425 }
426
427 actions.push(TransportAction::TunnelEstablished {
428 tunnel_id,
429 interface,
430 });
431
432 actions
433 }
434
435 pub fn synthesize_tunnel(
443 &self,
444 identity: &rns_crypto::identity::Identity,
445 interface_id: InterfaceId,
446 rng: &mut dyn Rng,
447 ) -> Vec<TransportAction> {
448 let mut actions = Vec::new();
449
450 let interface_hash = if let Some(info) = self.interfaces.get(&interface_id) {
452 hash::full_hash(info.name.as_bytes())
453 } else {
454 log::warn!(
455 "Cannot synthesize tunnel on {:?}: unknown interface",
456 interface_id
457 );
458 return actions;
459 };
460
461 match tunnel::build_tunnel_synthesize_data(identity, &interface_hash, rng) {
462 Ok((data, _tunnel_id)) => {
463 let dest_hash = crate::destination::destination_hash(
464 "rnstransport",
465 &["tunnel", "synthesize"],
466 None,
467 );
468 actions.push(TransportAction::TunnelSynthesize {
469 interface: interface_id,
470 data,
471 dest_hash,
472 });
473 }
474 Err(e) => {
475 log::warn!("Cannot synthesize tunnel on {:?}: {}", interface_id, e);
476 }
477 }
478
479 actions
480 }
481
482 pub fn void_tunnel_interface(&mut self, tunnel_id: &[u8; 32]) {
484 self.tunnel_table.void_tunnel_interface(tunnel_id);
485 }
486
487 pub fn tunnel_table(&self) -> &TunnelTable {
489 &self.tunnel_table
490 }
491
492 fn has_local_clients(&self) -> bool {
498 self.interfaces.values().any(|i| i.is_local_client)
499 }
500
501 fn packet_filter(&self, packet: &RawPacket) -> bool {
505 if packet.transport_id.is_some()
507 && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
508 {
509 if let Some(ref identity_hash) = self.config.identity_hash {
510 if packet.transport_id.as_ref() != Some(identity_hash) {
511 return false;
512 }
513 }
514 }
515
516 match packet.context {
518 constants::CONTEXT_KEEPALIVE
519 | constants::CONTEXT_RESOURCE_REQ
520 | constants::CONTEXT_RESOURCE_PRF
521 | constants::CONTEXT_RESOURCE
522 | constants::CONTEXT_CACHE_REQUEST
523 | constants::CONTEXT_CHANNEL => return true,
524 _ => {}
525 }
526
527 if packet.flags.destination_type == constants::DESTINATION_PLAIN
529 || packet.flags.destination_type == constants::DESTINATION_GROUP
530 {
531 if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
532 return packet.hops <= 1;
533 } else {
534 return false;
536 }
537 }
538
539 if !self.packet_hashlist.is_duplicate(&packet.packet_hash) {
541 return true;
542 }
543
544 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
546 && packet.flags.destination_type == constants::DESTINATION_SINGLE
547 {
548 return true;
549 }
550
551 false
552 }
553
554 pub fn handle_inbound(
562 &mut self,
563 frame: InboundFrame<'_>,
564 rng: &mut dyn Rng,
565 ) -> Vec<TransportAction> {
566 self.handle_inbound_with_announce_queue(frame, rng, None)
567 }
568
569 pub fn handle_inbound_with_announce_queue(
570 &mut self,
571 frame: InboundFrame<'_>,
572 rng: &mut dyn Rng,
573 announce_queue: Option<&mut AnnounceVerifyQueue>,
574 ) -> Vec<TransportAction> {
575 let Some(ctx) = self.prepare_inbound_packet(frame) else {
576 return Vec::new();
577 };
578 let mut actions = Vec::new();
579
580 self.remember_inbound_packet_hash(&ctx.packet);
581 self.bridge_plain_broadcast(&ctx, &mut actions);
582 self.handle_transport_forwarding(&ctx, &mut actions);
583 self.handle_link_table_routing(&ctx, &mut actions);
584 self.handle_inbound_announce(&ctx, rng, announce_queue, &mut actions);
585
586 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_PROOF {
587 self.process_inbound_proof(&ctx.packet, ctx.iface, ctx.now, &mut actions);
588 }
589
590 self.handle_inbound_local_delivery(&ctx, &mut actions);
591 actions
592 }
593
594 fn prepare_inbound_packet(&self, frame: InboundFrame<'_>) -> Option<InboundPacketCtx> {
595 let mut packet = RawPacket::unpack(frame.raw).ok()?;
596 let from_local_client = self
597 .interfaces
598 .get(&frame.iface)
599 .map(|i| i.is_local_client)
600 .unwrap_or(false);
601 packet.hops += 1;
602 packet.rssi = frame.rx.rssi;
603 packet.snr = frame.rx.snr;
604 if from_local_client {
605 packet.hops = packet.hops.saturating_sub(1);
606 }
607 if !self.packet_filter(&packet) {
608 return None;
609 }
610 let retain_original_raw = packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE;
611 Some(InboundPacketCtx {
612 packet,
613 original_raw: if retain_original_raw {
614 Some(frame.raw.to_vec())
615 } else {
616 None
617 },
618 iface: frame.iface,
619 now: frame.now,
620 from_local_client,
621 })
622 }
623
624 fn remember_inbound_packet_hash(&mut self, packet: &RawPacket) {
625 let remember_hash = !(self.link_table.contains_key(&packet.destination_hash)
626 || (packet.flags.packet_type == constants::PACKET_TYPE_PROOF
627 && packet.context == constants::CONTEXT_LRPROOF));
628 if remember_hash {
629 self.packet_hashlist.add(packet.packet_hash);
630 }
631 }
632
633 fn bridge_plain_broadcast(&self, ctx: &InboundPacketCtx, actions: &mut Vec<TransportAction>) {
634 if ctx.packet.flags.destination_type != constants::DESTINATION_PLAIN
635 || ctx.packet.flags.transport_type != constants::TRANSPORT_BROADCAST
636 || !self.has_local_clients()
637 {
638 return;
639 }
640
641 if ctx.from_local_client {
642 actions.push(TransportAction::ForwardPlainBroadcast {
643 raw: PacketBytes::from(ctx.packet.raw.clone()),
644 to_local: false,
645 exclude: Some(ctx.iface),
646 });
647 } else {
648 actions.push(TransportAction::ForwardPlainBroadcast {
649 raw: PacketBytes::from(ctx.packet.raw.clone()),
650 to_local: true,
651 exclude: None,
652 });
653 }
654 }
655
656 fn handle_transport_forwarding(
657 &mut self,
658 ctx: &InboundPacketCtx,
659 actions: &mut Vec<TransportAction>,
660 ) {
661 if !(self.config.transport_enabled || self.config.identity_hash.is_some()) {
662 return;
663 }
664 if ctx.packet.transport_id.is_none()
665 || ctx.packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
666 {
667 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_DATA {
668 log::debug!(
669 "TransportForward: DATA dest={:02x}{:02x}{:02x}{:02x}.. not transport-addressed header={} iface={}",
670 ctx.packet.destination_hash[0],
671 ctx.packet.destination_hash[1],
672 ctx.packet.destination_hash[2],
673 ctx.packet.destination_hash[3],
674 ctx.packet.flags.header_type,
675 ctx.iface.0
676 );
677 }
678 return;
679 }
680
681 let Some(identity_hash) = self.config.identity_hash else {
682 return;
683 };
684 if ctx.packet.transport_id != Some(identity_hash) {
685 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_DATA {
686 log::debug!(
687 "TransportForward: DATA dest={:02x}{:02x}{:02x}{:02x}.. transport mismatch got={:02x?} own={:02x?} iface={}",
688 ctx.packet.destination_hash[0],
689 ctx.packet.destination_hash[1],
690 ctx.packet.destination_hash[2],
691 ctx.packet.destination_hash[3],
692 ctx.packet.transport_id.as_ref().map(|id| &id[..4]),
693 &identity_hash[..4],
694 ctx.iface.0
695 );
696 }
697 return;
698 }
699
700 let Some(path_entry) = self
701 .path_table
702 .get(&ctx.packet.destination_hash)
703 .and_then(|ps| ps.primary())
704 else {
705 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_DATA {
706 log::debug!(
707 "TransportForward: DATA dest={:02x}{:02x}{:02x}{:02x}.. addressed to us but no path iface={}",
708 ctx.packet.destination_hash[0],
709 ctx.packet.destination_hash[1],
710 ctx.packet.destination_hash[2],
711 ctx.packet.destination_hash[3],
712 ctx.iface.0
713 );
714 }
715 return;
716 };
717
718 let next_hop = path_entry.next_hop;
719 let remaining_hops = path_entry.hops;
720 let outbound_interface = path_entry.receiving_interface;
721 let outbound_is_local_client = self
722 .interfaces
723 .get(&outbound_interface)
724 .map(|info| info.is_local_client)
725 .unwrap_or(false);
726 let forwarded_remaining_hops = if outbound_is_local_client {
727 0
728 } else {
729 remaining_hops
730 };
731 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_DATA {
732 log::debug!(
733 "TransportForward: DATA dest={:02x}{:02x}{:02x}{:02x}.. remaining_hops={} out_iface={} next_hop={:02x?}",
734 ctx.packet.destination_hash[0],
735 ctx.packet.destination_hash[1],
736 ctx.packet.destination_hash[2],
737 ctx.packet.destination_hash[3],
738 remaining_hops,
739 outbound_interface.0,
740 &next_hop[..4]
741 );
742 }
743 let new_raw = forward_transport_packet(
744 &ctx.packet,
745 next_hop,
746 forwarded_remaining_hops,
747 outbound_interface,
748 );
749
750 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST {
751 let proof_timeout = ctx.now
752 + constants::LINK_ESTABLISHMENT_TIMEOUT_PER_HOP * (remaining_hops.max(1) as f64);
753 let (link_id, link_entry) = create_link_entry(
754 &ctx.packet,
755 next_hop,
756 outbound_interface,
757 remaining_hops,
758 ctx.iface,
759 ctx.now,
760 proof_timeout,
761 );
762 self.link_table.insert(link_id, link_entry);
763 actions.push(TransportAction::LinkRequestReceived {
764 link_id,
765 destination_hash: ctx.packet.destination_hash,
766 receiving_interface: ctx.iface,
767 });
768 } else {
769 let (trunc_hash, reverse_entry) =
770 create_reverse_entry(&ctx.packet, outbound_interface, ctx.iface, ctx.now);
771 self.reverse_table.insert(trunc_hash, reverse_entry);
772 }
773
774 actions.push(TransportAction::SendOnInterface {
775 interface: outbound_interface,
776 raw: new_raw.into(),
777 });
778
779 if let Some(entry) = self
780 .path_table
781 .get_mut(&ctx.packet.destination_hash)
782 .and_then(|ps| ps.primary_mut())
783 {
784 entry.timestamp = ctx.now;
785 }
786 }
787
788 fn handle_link_table_routing(
789 &mut self,
790 ctx: &InboundPacketCtx,
791 actions: &mut Vec<TransportAction>,
792 ) {
793 if !self.config.transport_enabled && self.config.identity_hash.is_none() {
794 return;
795 }
796 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
797 || ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
798 || ctx.packet.context == constants::CONTEXT_LRPROOF
799 {
800 return;
801 }
802
803 let Some(link_entry) = self.link_table.get(&ctx.packet.destination_hash).cloned() else {
804 return;
805 };
806 let Some((outbound_iface, new_raw)) =
807 route_via_link_table(&ctx.packet, &link_entry, ctx.iface)
808 else {
809 return;
810 };
811
812 self.packet_hashlist.add(ctx.packet.packet_hash);
813 actions.push(TransportAction::SendOnInterface {
814 interface: outbound_iface,
815 raw: new_raw.into(),
816 });
817
818 if let Some(entry) = self.link_table.get_mut(&ctx.packet.destination_hash) {
819 entry.timestamp = ctx.now;
820 }
821 }
822
823 fn handle_inbound_announce(
824 &mut self,
825 ctx: &InboundPacketCtx,
826 rng: &mut dyn Rng,
827 announce_queue: Option<&mut AnnounceVerifyQueue>,
828 actions: &mut Vec<TransportAction>,
829 ) {
830 if ctx.packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
831 return;
832 }
833
834 if let Some(queue) = announce_queue {
835 self.try_enqueue_announce(ctx, rng, queue, actions);
836 } else {
837 let original_raw = ctx
838 .original_raw
839 .as_deref()
840 .expect("announce packets retain original raw bytes");
841 self.process_inbound_announce(
842 &ctx.packet,
843 original_raw,
844 ctx.iface,
845 ctx.now,
846 rng,
847 actions,
848 );
849 }
850 }
851
852 fn handle_inbound_local_delivery(
853 &self,
854 ctx: &InboundPacketCtx,
855 actions: &mut Vec<TransportAction>,
856 ) {
857 if (ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
858 || ctx.packet.flags.packet_type == constants::PACKET_TYPE_DATA)
859 && self
860 .local_destinations
861 .contains_key(&ctx.packet.destination_hash)
862 {
863 actions.push(TransportAction::DeliverLocal {
864 destination_hash: ctx.packet.destination_hash,
865 raw: PacketBytes::from(ctx.packet.raw.clone()),
866 packet_hash: ctx.packet.packet_hash,
867 receiving_interface: ctx.iface,
868 });
869 }
870 }
871
872 fn process_inbound_announce(
877 &mut self,
878 packet: &RawPacket,
879 original_raw: &[u8],
880 iface: InterfaceId,
881 now: f64,
882 rng: &mut dyn Rng,
883 actions: &mut Vec<TransportAction>,
884 ) {
885 if packet.flags.destination_type != constants::DESTINATION_SINGLE {
886 return;
887 }
888
889 let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
890
891 let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
893 Ok(a) => a,
894 Err(_) => return,
895 };
896
897 if self.should_hold_announce(packet, original_raw, iface, now) {
898 return;
899 }
900
901 let sig_cache_key =
902 Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
903
904 let validated = if self.announce_sig_cache.contains(&sig_cache_key) {
905 announce.to_validated_unchecked()
906 } else {
907 match announce.validate(&packet.destination_hash) {
908 Ok(v) => {
909 self.announce_sig_cache.insert(sig_cache_key, now);
910 v
911 }
912 Err(_) => return,
913 }
914 };
915
916 let received_from = self.announce_received_from(packet, now);
917 let random_blob = match extract_random_blob(&packet.data) {
918 Some(b) => b,
919 None => return,
920 };
921 let announce_emitted = timebase_from_random_blob(&random_blob);
922
923 self.process_verified_announce(
924 VerifiedAnnounceCtx {
925 packet,
926 original_raw,
927 iface,
928 now,
929 validated,
930 received_from,
931 random_blob,
932 announce_emitted,
933 },
934 rng,
935 actions,
936 );
937 }
938
939 fn announce_raw_for_local_clients(&self, packet: &RawPacket) -> PacketBytes {
940 let Some(identity_hash) = self.config.identity_hash else {
941 return PacketBytes::from(packet.raw.clone());
942 };
943
944 if packet.raw.len() < 2 {
945 return PacketBytes::from(packet.raw.clone());
946 }
947
948 let payload_start = if packet.flags.header_type == constants::HEADER_2 {
949 18usize
950 } else {
951 2usize
952 };
953 if packet.raw.len() < payload_start {
954 return PacketBytes::from(packet.raw.clone());
955 }
956
957 let flags = (constants::HEADER_2 << 6)
958 | (constants::TRANSPORT_TRANSPORT << 4)
959 | (packet.raw[0] & 0x0F);
960 let mut raw = Vec::with_capacity(18 + packet.raw.len() - payload_start);
961 raw.push(flags);
962 raw.push(packet.hops);
963 raw.extend_from_slice(&identity_hash);
964 raw.extend_from_slice(&packet.raw[payload_start..]);
965 PacketBytes::from(raw)
966 }
967
968 fn announce_sig_cache_key(destination_hash: [u8; 16], signature: &[u8; 64]) -> [u8; 32] {
969 let mut material = [0u8; 80];
970 material[..16].copy_from_slice(&destination_hash);
971 material[16..].copy_from_slice(signature);
972 hash::full_hash(&material)
973 }
974
975 fn announce_received_from(&mut self, packet: &RawPacket, now: f64) -> [u8; 16] {
976 if let Some(transport_id) = packet.transport_id {
977 if self.config.transport_enabled {
978 if let Some(announce_entry) = self.announce_table.get_mut(&packet.destination_hash)
979 {
980 if packet.hops.checked_sub(1) == Some(announce_entry.hops) {
981 announce_entry.local_rebroadcasts += 1;
982 if announce_entry.retries > 0
983 && announce_entry.local_rebroadcasts
984 >= constants::LOCAL_REBROADCASTS_MAX
985 {
986 self.announce_table.remove(&packet.destination_hash);
987 }
988 }
989 if let Some(announce_entry) = self.announce_table.get(&packet.destination_hash)
990 {
991 if packet.hops.checked_sub(1) == Some(announce_entry.hops + 1)
992 && announce_entry.retries > 0
993 && now < announce_entry.retransmit_timeout
994 {
995 self.announce_table.remove(&packet.destination_hash);
996 }
997 }
998 }
999 }
1000 transport_id
1001 } else {
1002 packet.destination_hash
1003 }
1004 }
1005
1006 fn should_hold_announce(
1007 &mut self,
1008 packet: &RawPacket,
1009 original_raw: &[u8],
1010 iface: InterfaceId,
1011 now: f64,
1012 ) -> bool {
1013 if self.has_path(&packet.destination_hash) {
1014 return false;
1015 }
1016 if self
1017 .discovery_path_requests
1018 .contains_key(&packet.destination_hash)
1019 {
1020 return false;
1021 }
1022 let Some(info) = self.interfaces.get(&iface) else {
1023 return false;
1024 };
1025 if packet.context == constants::CONTEXT_PATH_RESPONSE
1026 || !self.ingress_control.should_ingress_limit(
1027 iface,
1028 &info.ingress_control,
1029 info.ia_freq,
1030 info.started,
1031 now,
1032 )
1033 {
1034 return false;
1035 }
1036 self.ingress_control.hold_announce(
1037 iface,
1038 &info.ingress_control,
1039 packet.destination_hash,
1040 ingress_control::HeldAnnounce {
1041 raw: original_raw.to_vec(),
1042 hops: packet.hops,
1043 receiving_interface: iface,
1044 rx: RxMetadata {
1045 rssi: packet.rssi,
1046 snr: packet.snr,
1047 },
1048 timestamp: now,
1049 },
1050 );
1051 true
1052 }
1053
1054 fn try_enqueue_announce(
1055 &mut self,
1056 ctx: &InboundPacketCtx,
1057 rng: &mut dyn Rng,
1058 announce_queue: &mut AnnounceVerifyQueue,
1059 actions: &mut Vec<TransportAction>,
1060 ) {
1061 if ctx.packet.flags.destination_type != constants::DESTINATION_SINGLE {
1062 return;
1063 }
1064
1065 let has_ratchet = ctx.packet.flags.context_flag == constants::FLAG_SET;
1066 let announce = match AnnounceData::unpack(&ctx.packet.data, has_ratchet) {
1067 Ok(a) => a,
1068 Err(_) => return,
1069 };
1070
1071 let received_from = self.announce_received_from(&ctx.packet, ctx.now);
1072
1073 if self
1074 .local_destinations
1075 .contains_key(&ctx.packet.destination_hash)
1076 {
1077 log::debug!(
1078 "Announce:skipping local destination {:02x}{:02x}{:02x}{:02x}..",
1079 ctx.packet.destination_hash[0],
1080 ctx.packet.destination_hash[1],
1081 ctx.packet.destination_hash[2],
1082 ctx.packet.destination_hash[3],
1083 );
1084 return;
1085 }
1086
1087 let original_raw = ctx
1088 .original_raw
1089 .as_deref()
1090 .expect("announce packets retain original raw bytes");
1091 if self.should_hold_announce(&ctx.packet, original_raw, ctx.iface, ctx.now) {
1092 return;
1093 }
1094
1095 let sig_cache_key =
1096 Self::announce_sig_cache_key(ctx.packet.destination_hash, &announce.signature);
1097 if self.announce_sig_cache.contains(&sig_cache_key) {
1098 let validated = announce.to_validated_unchecked();
1099 let random_blob = match extract_random_blob(&ctx.packet.data) {
1100 Some(b) => b,
1101 None => return,
1102 };
1103 let announce_emitted = timebase_from_random_blob(&random_blob);
1104 self.process_verified_announce(
1105 VerifiedAnnounceCtx {
1106 packet: &ctx.packet,
1107 original_raw,
1108 iface: ctx.iface,
1109 now: ctx.now,
1110 validated,
1111 received_from,
1112 random_blob,
1113 announce_emitted,
1114 },
1115 rng,
1116 actions,
1117 );
1118 return;
1119 }
1120
1121 if ctx.packet.context == constants::CONTEXT_PATH_RESPONSE {
1122 let Ok(validated) = announce.validate(&ctx.packet.destination_hash) else {
1123 return;
1124 };
1125 self.announce_sig_cache.insert(sig_cache_key, ctx.now);
1126 let random_blob = match extract_random_blob(&ctx.packet.data) {
1127 Some(b) => b,
1128 None => return,
1129 };
1130 let announce_emitted = timebase_from_random_blob(&random_blob);
1131 self.process_verified_announce(
1132 VerifiedAnnounceCtx {
1133 packet: &ctx.packet,
1134 original_raw,
1135 iface: ctx.iface,
1136 now: ctx.now,
1137 validated,
1138 received_from,
1139 random_blob,
1140 announce_emitted,
1141 },
1142 rng,
1143 actions,
1144 );
1145 return;
1146 }
1147
1148 let random_blob = match extract_random_blob(&ctx.packet.data) {
1149 Some(b) => b,
1150 None => return,
1151 };
1152 let announce_emitted = timebase_from_random_blob(&random_blob);
1153 let key = AnnounceVerifyKey {
1154 destination_hash: ctx.packet.destination_hash,
1155 random_blob,
1156 received_from,
1157 };
1158 let pending = PendingAnnounce {
1159 original_raw: original_raw.to_vec(),
1160 packet: ctx.packet.clone(),
1161 interface: ctx.iface,
1162 received_from,
1163 queued_at: ctx.now,
1164 best_hops: ctx.packet.hops,
1165 emission_ts: announce_emitted,
1166 random_blob,
1167 };
1168 let _ = announce_queue.enqueue(key, pending);
1169 }
1170
1171 pub fn complete_verified_announce(
1172 &mut self,
1173 pending: PendingAnnounce,
1174 validated: crate::announce::ValidatedAnnounce,
1175 sig_cache_key: [u8; 32],
1176 now: f64,
1177 rng: &mut dyn Rng,
1178 ) -> Vec<TransportAction> {
1179 self.announce_sig_cache.insert(sig_cache_key, now);
1180 let mut actions = Vec::new();
1181 self.process_verified_announce(
1182 VerifiedAnnounceCtx {
1183 packet: &pending.packet,
1184 original_raw: &pending.original_raw,
1185 iface: pending.interface,
1186 now,
1187 validated,
1188 received_from: pending.received_from,
1189 random_blob: pending.random_blob,
1190 announce_emitted: pending.emission_ts,
1191 },
1192 rng,
1193 &mut actions,
1194 );
1195 actions
1196 }
1197
1198 pub fn clear_failed_verified_announce(&mut self, _sig_cache_key: [u8; 32], _now: f64) {}
1199
1200 fn process_verified_announce(
1201 &mut self,
1202 ctx: VerifiedAnnounceCtx<'_>,
1203 rng: &mut dyn Rng,
1204 actions: &mut Vec<TransportAction>,
1205 ) {
1206 if self.is_blackholed(&ctx.validated.identity_hash, ctx.now) {
1207 return;
1208 }
1209 if ctx.packet.hops > constants::PATHFINDER_M {
1210 return;
1211 }
1212
1213 let existing_set = self.path_table.get(&ctx.packet.destination_hash);
1214 let was_unknown_destination = existing_set.is_none_or(|ps| ps.is_empty());
1215
1216 if was_unknown_destination {
1219 self.path_states.remove(&ctx.packet.destination_hash);
1220 }
1221
1222 let is_unresponsive = self.path_is_unresponsive(&ctx.packet.destination_hash);
1224
1225 let mp_decision = decide_announce_multipath(
1226 existing_set,
1227 ctx.packet.hops,
1228 ctx.announce_emitted,
1229 &ctx.random_blob,
1230 &ctx.received_from,
1231 is_unresponsive,
1232 ctx.now,
1233 self.config.prefer_shorter_path,
1234 );
1235
1236 if mp_decision == MultiPathDecision::Reject {
1237 log::debug!(
1238 "Announce:path decision REJECT for dest={:02x}{:02x}{:02x}{:02x}..",
1239 ctx.packet.destination_hash[0],
1240 ctx.packet.destination_hash[1],
1241 ctx.packet.destination_hash[2],
1242 ctx.packet.destination_hash[3],
1243 );
1244 return;
1245 }
1246
1247 let rate_blocked = if ctx.packet.context != constants::CONTEXT_PATH_RESPONSE {
1249 if let Some(iface_info) = self.interfaces.get(&ctx.iface) {
1250 self.rate_limiter.check_and_update(
1251 &ctx.packet.destination_hash,
1252 ctx.now,
1253 iface_info.announce_rate_target,
1254 iface_info.announce_rate_grace,
1255 iface_info.announce_rate_penalty,
1256 )
1257 } else {
1258 false
1259 }
1260 } else {
1261 false
1262 };
1263
1264 let interface_mode = self
1266 .interfaces
1267 .get(&ctx.iface)
1268 .map(|i| i.mode)
1269 .unwrap_or(constants::MODE_FULL);
1270
1271 let expires = compute_path_expires(ctx.now, interface_mode);
1272
1273 let existing_blobs = self
1275 .path_table
1276 .get(&ctx.packet.destination_hash)
1277 .and_then(|ps| ps.find_by_next_hop(&ctx.received_from))
1278 .map(|e| e.random_blobs.clone())
1279 .unwrap_or_default();
1280
1281 let mut rng_bytes = [0u8; 8];
1283 rng.fill_bytes(&mut rng_bytes);
1284 let rng_value = (u64::from_le_bytes(rng_bytes) as f64) / (u64::MAX as f64);
1285
1286 let is_path_response = ctx.packet.context == constants::CONTEXT_PATH_RESPONSE;
1287
1288 let (path_entry, announce_entry) = announce_proc::process_validated_announce(
1289 ctx.packet.destination_hash,
1290 ctx.packet.hops,
1291 &ctx.packet.data,
1292 &ctx.packet.raw,
1293 ctx.packet.packet_hash,
1294 ctx.packet.flags.context_flag,
1295 ctx.received_from,
1296 ctx.iface,
1297 ctx.now,
1298 existing_blobs,
1299 ctx.random_blob,
1300 expires,
1301 rng_value,
1302 self.config.transport_enabled,
1303 is_path_response,
1304 rate_blocked,
1305 Some(ctx.original_raw.to_vec()),
1306 );
1307
1308 actions.push(TransportAction::CacheAnnounce {
1310 packet_hash: ctx.packet.packet_hash,
1311 raw: ctx.original_raw.to_vec().into(),
1312 });
1313
1314 self.upsert_path_destination(ctx.packet.destination_hash, path_entry, ctx.now);
1316
1317 if let Some(tunnel_id) = self.interfaces.get(&ctx.iface).and_then(|i| i.tunnel_id) {
1319 let blobs = self
1320 .path_table
1321 .get(&ctx.packet.destination_hash)
1322 .and_then(|ps| ps.find_by_next_hop(&ctx.received_from))
1323 .map(|e| e.random_blobs.clone())
1324 .unwrap_or_default();
1325 self.tunnel_table.store_tunnel_path(
1326 &tunnel_id,
1327 ctx.packet.destination_hash,
1328 tunnel::TunnelPath {
1329 timestamp: ctx.now,
1330 received_from: ctx.received_from,
1331 hops: ctx.packet.hops,
1332 expires,
1333 random_blobs: blobs,
1334 packet_hash: ctx.packet.packet_hash,
1335 },
1336 ctx.now,
1337 self.config.destination_timeout_secs,
1338 self.config.max_tunnel_destinations_total,
1339 );
1340 }
1341
1342 self.path_states.remove(&ctx.packet.destination_hash);
1345
1346 if let Some(ann) = announce_entry {
1348 self.insert_announce_entry(ctx.packet.destination_hash, ann, ctx.now);
1349 }
1350
1351 actions.push(TransportAction::AnnounceReceived {
1353 destination_hash: ctx.packet.destination_hash,
1354 identity_hash: ctx.validated.identity_hash,
1355 public_key: ctx.validated.public_key,
1356 name_hash: ctx.validated.name_hash,
1357 random_hash: ctx.validated.random_hash,
1358 ratchet: ctx.validated.ratchet,
1359 app_data: ctx.validated.app_data,
1360 hops: ctx.packet.hops,
1361 receiving_interface: ctx.iface,
1362 rx: RxMetadata {
1363 rssi: ctx.packet.rssi,
1364 snr: ctx.packet.snr,
1365 },
1366 });
1367
1368 actions.push(TransportAction::PathUpdated {
1369 destination_hash: ctx.packet.destination_hash,
1370 hops: ctx.packet.hops,
1371 next_hop: ctx.received_from,
1372 interface: ctx.iface,
1373 });
1374
1375 if self.has_local_clients() {
1377 actions.push(TransportAction::ForwardToLocalClients {
1378 raw: self.announce_raw_for_local_clients(ctx.packet),
1379 exclude: Some(ctx.iface),
1380 });
1381 }
1382
1383 if let Some(pr_entry) = self.discovery_path_requests_waiting(&ctx.packet.destination_hash) {
1385 let entry = AnnounceEntry {
1387 timestamp: ctx.now,
1388 retransmit_timeout: ctx.now,
1389 retries: constants::PATHFINDER_R,
1390 received_from: ctx.received_from,
1391 hops: ctx.packet.hops,
1392 packet_raw: ctx.packet.raw.clone(),
1393 packet_data: ctx.packet.data.clone(),
1394 destination_hash: ctx.packet.destination_hash,
1395 context_flag: ctx.packet.flags.context_flag,
1396 local_rebroadcasts: 0,
1397 block_rebroadcasts: true,
1398 attached_interface: Some(pr_entry),
1399 };
1400 self.insert_announce_entry(ctx.packet.destination_hash, entry, ctx.now);
1401 }
1402 }
1403
1404 pub fn announce_sig_cache_contains(&self, sig_cache_key: &[u8; 32]) -> bool {
1405 self.announce_sig_cache.contains(sig_cache_key)
1406 }
1407
1408 fn discovery_path_requests_waiting(&mut self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
1411 self.discovery_path_requests
1412 .remove(dest_hash)
1413 .map(|req| req.requesting_interface)
1414 }
1415
1416 fn process_inbound_proof(
1421 &mut self,
1422 packet: &RawPacket,
1423 iface: InterfaceId,
1424 _now: f64,
1425 actions: &mut Vec<TransportAction>,
1426 ) {
1427 if packet.context == constants::CONTEXT_LRPROOF {
1428 if (self.config.transport_enabled)
1430 && self.link_table.contains_key(&packet.destination_hash)
1431 {
1432 let link_entry = self.link_table.get(&packet.destination_hash).cloned();
1433 if let Some(entry) = link_entry {
1434 if let Some((outbound_interface, new_raw)) =
1435 route_via_link_table(packet, &entry, iface)
1436 {
1437 if let Some(le) = self.link_table.get_mut(&packet.destination_hash) {
1442 le.validated = true;
1443 }
1444
1445 actions.push(TransportAction::LinkEstablished {
1446 link_id: packet.destination_hash,
1447 interface: outbound_interface,
1448 });
1449
1450 actions.push(TransportAction::SendOnInterface {
1451 interface: outbound_interface,
1452 raw: new_raw.into(),
1453 });
1454 }
1455 }
1456 } else {
1457 actions.push(TransportAction::DeliverLocal {
1459 destination_hash: packet.destination_hash,
1460 raw: PacketBytes::from(packet.raw.clone()),
1461 packet_hash: packet.packet_hash,
1462 receiving_interface: iface,
1463 });
1464 }
1465 } else {
1466 if self.config.transport_enabled {
1468 if let Some(reverse_entry) = self.reverse_table.remove(&packet.destination_hash) {
1469 if let Some(action) = route_proof_via_reverse(packet, &reverse_entry, iface) {
1470 actions.push(action);
1471 }
1472 }
1473 }
1474
1475 actions.push(TransportAction::DeliverLocal {
1477 destination_hash: packet.destination_hash,
1478 raw: PacketBytes::from(packet.raw.clone()),
1479 packet_hash: packet.packet_hash,
1480 receiving_interface: iface,
1481 });
1482 }
1483 }
1484
1485 pub fn handle_outbound(
1491 &mut self,
1492 packet: &RawPacket,
1493 dest_type: u8,
1494 attached_interface: Option<InterfaceId>,
1495 now: f64,
1496 ) -> Vec<TransportAction> {
1497 let actions = route_outbound(
1498 &self.path_table,
1499 &self.interfaces,
1500 &self.local_destinations,
1501 packet,
1502 dest_type,
1503 attached_interface,
1504 now,
1505 );
1506
1507 self.packet_hashlist.add(packet.packet_hash);
1509
1510 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE && packet.hops > 0 {
1512 self.gate_announce_actions(actions, &packet.destination_hash, packet.hops, now)
1513 } else {
1514 actions
1515 }
1516 }
1517
1518 fn gate_announce_actions(
1520 &mut self,
1521 actions: Vec<TransportAction>,
1522 dest_hash: &[u8; 16],
1523 hops: u8,
1524 now: f64,
1525 ) -> Vec<TransportAction> {
1526 let mut result = Vec::new();
1527 for action in actions {
1528 match action {
1529 TransportAction::SendOnInterface { interface, raw } => {
1530 let (bitrate, airtime_profile, announce_cap) =
1531 if let Some(info) = self.interfaces.get(&interface) {
1532 (info.bitrate, info.airtime_profile, info.announce_cap)
1533 } else {
1534 (None, None, constants::ANNOUNCE_CAP)
1535 };
1536 if let Some(send_action) = self.announce_queues.gate_announce(
1537 interface,
1538 raw,
1539 *dest_hash,
1540 hops,
1541 now,
1542 now,
1543 bitrate,
1544 airtime_profile,
1545 announce_cap,
1546 ) {
1547 result.push(send_action);
1548 }
1549 }
1551 other => result.push(other),
1552 }
1553 }
1554 result
1555 }
1556
1557 pub fn tick(&mut self, now: f64, rng: &mut dyn Rng) -> Vec<TransportAction> {
1563 let mut ctx = TickCtx {
1564 now,
1565 rng,
1566 actions: Vec::new(),
1567 };
1568 self.process_tick_pending_announces(&mut ctx);
1569
1570 let mut queue_actions = self.announce_queues.process_queues(now, &self.interfaces);
1571 ctx.actions.append(&mut queue_actions);
1572
1573 self.process_tick_ingress_release(&mut ctx);
1574 self.cull_tick_tables(&mut ctx);
1575 ctx.actions
1576 }
1577
1578 fn process_tick_pending_announces(&mut self, ctx: &mut TickCtx<'_>) {
1579 if ctx.now <= self.announces_last_checked + constants::ANNOUNCES_CHECK_INTERVAL {
1580 return;
1581 }
1582
1583 self.cull_expired_announce_entries(ctx.now);
1584 self.enforce_announce_retention_cap(ctx.now);
1585 if let Some(identity_hash) = self.config.identity_hash {
1586 let announce_actions = jobs::process_pending_announces(
1587 &mut self.announce_table,
1588 &mut self.held_announces,
1589 &identity_hash,
1590 ctx.now,
1591 );
1592 let gated = self.gate_retransmit_actions(announce_actions, ctx.now);
1593 ctx.actions.extend(gated);
1594 }
1595 self.cull_expired_announce_entries(ctx.now);
1596 self.enforce_announce_retention_cap(ctx.now);
1597 self.announces_last_checked = ctx.now;
1598 }
1599
1600 fn process_tick_ingress_release(&mut self, ctx: &mut TickCtx<'_>) {
1601 let ic_interfaces = self.ingress_control.interfaces_with_held();
1602 for iface_id in ic_interfaces {
1603 let (ia_freq, started, ingress_config) = match self.interfaces.get(&iface_id) {
1604 Some(info) => (info.ia_freq, info.started, info.ingress_control),
1605 None => continue,
1606 };
1607 if !ingress_config.enabled {
1608 continue;
1609 }
1610 if let Some(held) = self.ingress_control.process_held_announces(
1611 iface_id,
1612 &ingress_config,
1613 ia_freq,
1614 started,
1615 ctx.now,
1616 ) {
1617 let released_actions = self.handle_inbound(
1618 InboundFrame {
1619 raw: &held.raw,
1620 iface: held.receiving_interface,
1621 now: ctx.now,
1622 rx: held.rx,
1623 },
1624 ctx.rng,
1625 );
1626 ctx.actions.extend(released_actions);
1627 }
1628 }
1629 }
1630
1631 fn cull_tick_tables(&mut self, ctx: &mut TickCtx<'_>) {
1632 if ctx.now <= self.tables_last_culled + constants::TABLES_CULL_INTERVAL {
1633 return;
1634 }
1635
1636 jobs::cull_path_table(&mut self.path_table, &self.interfaces, ctx.now);
1637 jobs::cull_reverse_table(&mut self.reverse_table, &self.interfaces, ctx.now);
1638 let (_culled, link_closed_actions) =
1639 jobs::cull_link_table(&mut self.link_table, &self.interfaces, ctx.now);
1640 ctx.actions.extend(link_closed_actions);
1641 jobs::cull_path_states(&mut self.path_states, &self.path_table);
1642 self.cull_blackholed(ctx.now);
1643 self.discovery_path_requests
1644 .retain(|_, req| ctx.now - req.timestamp < constants::DISCOVERY_PATH_REQUEST_TIMEOUT);
1645 self.tunnel_table
1646 .void_missing_interfaces(|id| self.interfaces.contains_key(id));
1647 self.tunnel_table.cull(ctx.now);
1648 self.announce_sig_cache.cull(ctx.now);
1649 self.tables_last_culled = ctx.now;
1650 }
1651
1652 fn gate_retransmit_actions(
1657 &mut self,
1658 actions: Vec<TransportAction>,
1659 now: f64,
1660 ) -> Vec<TransportAction> {
1661 let mut result = Vec::new();
1662 for action in actions {
1663 match action {
1664 TransportAction::SendOnInterface { interface, raw } => {
1665 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1667 let (bitrate, airtime_profile, announce_cap) =
1668 if let Some(info) = self.interfaces.get(&interface) {
1669 (info.bitrate, info.airtime_profile, info.announce_cap)
1670 } else {
1671 (None, None, constants::ANNOUNCE_CAP)
1672 };
1673 if let Some(send_action) = self.announce_queues.gate_announce(
1674 interface,
1675 raw,
1676 dest_hash,
1677 hops,
1678 now,
1679 now,
1680 bitrate,
1681 airtime_profile,
1682 announce_cap,
1683 ) {
1684 result.push(send_action);
1685 }
1686 }
1687 TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1688 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1689 let iface_ids: Vec<(
1692 InterfaceId,
1693 Option<u64>,
1694 Option<types::AirtimeProfile>,
1695 f64,
1696 )> = self
1697 .interfaces
1698 .iter()
1699 .filter(|(_, info)| info.out_capable)
1700 .filter(|(id, _)| {
1701 if let Some(ref ex) = exclude {
1702 **id != *ex
1703 } else {
1704 true
1705 }
1706 })
1707 .filter(|(_, info)| {
1708 should_transmit_announce(
1709 info,
1710 &dest_hash,
1711 hops,
1712 &self.local_destinations,
1713 &self.path_table,
1714 &self.interfaces,
1715 )
1716 })
1717 .map(|(id, info)| {
1718 (*id, info.bitrate, info.airtime_profile, info.announce_cap)
1719 })
1720 .collect();
1721
1722 for (iface_id, bitrate, airtime_profile, announce_cap) in iface_ids {
1723 if let Some(send_action) = self.announce_queues.gate_announce(
1724 iface_id,
1725 raw.clone(),
1726 dest_hash,
1727 hops,
1728 now,
1729 now,
1730 bitrate,
1731 airtime_profile,
1732 announce_cap,
1733 ) {
1734 result.push(send_action);
1735 }
1736 }
1737 }
1738 other => result.push(other),
1739 }
1740 }
1741 result
1742 }
1743
1744 fn extract_announce_info(raw: &[u8]) -> ([u8; 16], u8) {
1746 if raw.len() < 18 {
1747 return ([0; 16], 0);
1748 }
1749 let header_type = (raw[0] >> 6) & 0x03;
1750 let hops = raw[1];
1751 if header_type == constants::HEADER_2 && raw.len() >= 34 {
1752 let mut dest = [0u8; 16];
1754 dest.copy_from_slice(&raw[18..34]);
1755 (dest, hops)
1756 } else {
1757 let mut dest = [0u8; 16];
1759 dest.copy_from_slice(&raw[2..18]);
1760 (dest, hops)
1761 }
1762 }
1763
1764 #[cfg(test)]
1765 #[allow(dead_code)]
1766 pub(crate) fn link_table_ref(&self) -> &BTreeMap<[u8; 16], LinkEntry> {
1767 &self.link_table
1768 }
1769}
1770
1771#[cfg(test)]
1772mod tests {
1773 use super::*;
1774 use crate::packet::PacketFlags;
1775
1776 fn make_config(transport_enabled: bool) -> TransportConfig {
1777 TransportConfig {
1778 transport_enabled,
1779 identity_hash: if transport_enabled {
1780 Some([0x42; 16])
1781 } else {
1782 None
1783 },
1784 prefer_shorter_path: false,
1785 max_paths_per_destination: 1,
1786 packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
1787 max_discovery_pr_tags: constants::MAX_PR_TAGS,
1788 max_path_destinations: usize::MAX,
1789 max_tunnel_destinations_total: usize::MAX,
1790 destination_timeout_secs: constants::DESTINATION_TIMEOUT,
1791 announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
1792 announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
1793 announce_sig_cache_enabled: true,
1794 announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
1795 announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
1796 announce_queue_max_entries: 256,
1797 announce_queue_max_interfaces: 1024,
1798 }
1799 }
1800
1801 fn make_interface(id: u64, mode: u8) -> InterfaceInfo {
1802 InterfaceInfo {
1803 id: InterfaceId(id),
1804 name: String::from("test"),
1805 mode,
1806 out_capable: true,
1807 in_capable: true,
1808 bitrate: None,
1809 airtime_profile: None,
1810 announce_rate_target: None,
1811 announce_rate_grace: 0,
1812 announce_rate_penalty: 0.0,
1813 announce_cap: constants::ANNOUNCE_CAP,
1814 is_local_client: false,
1815 wants_tunnel: false,
1816 tunnel_id: None,
1817 mtu: constants::MTU as u32,
1818 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
1819 ia_freq: 0.0,
1820 ip_freq: 0.0,
1821 op_freq: 0.0,
1822 op_samples: 0,
1823 started: 0.0,
1824 }
1825 }
1826
1827 fn make_announce_entry(dest_hash: [u8; 16], timestamp: f64, fill_len: usize) -> AnnounceEntry {
1828 AnnounceEntry {
1829 timestamp,
1830 retransmit_timeout: timestamp,
1831 retries: 0,
1832 received_from: [0xAA; 16],
1833 hops: 2,
1834 packet_raw: vec![0x01; fill_len],
1835 packet_data: vec![0x02; fill_len],
1836 destination_hash: dest_hash,
1837 context_flag: 0,
1838 local_rebroadcasts: 0,
1839 block_rebroadcasts: false,
1840 attached_interface: None,
1841 }
1842 }
1843
1844 fn make_path_entry(
1845 timestamp: f64,
1846 hops: u8,
1847 receiving_interface: InterfaceId,
1848 next_hop: [u8; 16],
1849 ) -> PathEntry {
1850 PathEntry {
1851 timestamp,
1852 next_hop,
1853 hops,
1854 expires: timestamp + 10_000.0,
1855 random_blobs: Vec::new(),
1856 receiving_interface,
1857 packet_hash: [0; 32],
1858 announce_raw: None,
1859 }
1860 }
1861
1862 fn make_unique_tag(dest_hash: [u8; 16], tag: &[u8]) -> [u8; 32] {
1863 let mut unique_tag = [0u8; 32];
1864 let tag_len = tag.len().min(16);
1865 unique_tag[..16].copy_from_slice(&dest_hash);
1866 unique_tag[16..16 + tag_len].copy_from_slice(&tag[..tag_len]);
1867 unique_tag
1868 }
1869
1870 fn make_random_blob(timebase: u64) -> [u8; 10] {
1871 let mut blob = [0u8; 10];
1872 let bytes = timebase.to_be_bytes();
1873 blob[5..10].copy_from_slice(&bytes[3..8]);
1874 blob
1875 }
1876
1877 #[test]
1878 fn test_empty_engine() {
1879 let engine = TransportEngine::new(make_config(false));
1880 assert!(!engine.has_path(&[0; 16]));
1881 assert!(engine.hops_to(&[0; 16]).is_none());
1882 assert!(engine.next_hop(&[0; 16]).is_none());
1883 }
1884
1885 #[test]
1886 fn test_register_deregister_interface() {
1887 let mut engine = TransportEngine::new(make_config(false));
1888 engine.register_interface(make_interface(1, constants::MODE_FULL));
1889 assert!(engine.interfaces.contains_key(&InterfaceId(1)));
1890
1891 engine.deregister_interface(InterfaceId(1));
1892 assert!(!engine.interfaces.contains_key(&InterfaceId(1)));
1893 }
1894
1895 #[test]
1896 fn test_deregister_interface_removes_announce_queue_state() {
1897 let mut engine = TransportEngine::new(make_config(false));
1898 engine.register_interface(make_interface(1, constants::MODE_FULL));
1899
1900 let _ = engine.announce_queues.gate_announce(
1901 InterfaceId(1),
1902 vec![0x01; 100].into(),
1903 [0xAA; 16],
1904 2,
1905 0.0,
1906 0.0,
1907 Some(1000),
1908 None,
1909 constants::ANNOUNCE_CAP,
1910 );
1911 let _ = engine.announce_queues.gate_announce(
1912 InterfaceId(1),
1913 vec![0x02; 100].into(),
1914 [0xBB; 16],
1915 3,
1916 0.0,
1917 0.0,
1918 Some(1000),
1919 None,
1920 constants::ANNOUNCE_CAP,
1921 );
1922 assert_eq!(engine.announce_queue_count(), 1);
1923
1924 engine.deregister_interface(InterfaceId(1));
1925 assert_eq!(engine.announce_queue_count(), 0);
1926 }
1927
1928 #[test]
1929 fn test_deregister_interface_preserves_other_announce_queues() {
1930 let mut engine = TransportEngine::new(make_config(false));
1931 engine.register_interface(make_interface(1, constants::MODE_FULL));
1932 engine.register_interface(make_interface(2, constants::MODE_FULL));
1933
1934 let _ = engine.announce_queues.gate_announce(
1935 InterfaceId(1),
1936 vec![0x01; 100].into(),
1937 [0xAA; 16],
1938 2,
1939 0.0,
1940 0.0,
1941 Some(1000),
1942 None,
1943 constants::ANNOUNCE_CAP,
1944 );
1945 let _ = engine.announce_queues.gate_announce(
1946 InterfaceId(1),
1947 vec![0x02; 100].into(),
1948 [0xAB; 16],
1949 3,
1950 0.0,
1951 0.0,
1952 Some(1000),
1953 None,
1954 constants::ANNOUNCE_CAP,
1955 );
1956 let _ = engine.announce_queues.gate_announce(
1957 InterfaceId(2),
1958 vec![0x03; 100].into(),
1959 [0xBA; 16],
1960 2,
1961 0.0,
1962 0.0,
1963 Some(1000),
1964 None,
1965 constants::ANNOUNCE_CAP,
1966 );
1967 let _ = engine.announce_queues.gate_announce(
1968 InterfaceId(2),
1969 vec![0x04; 100].into(),
1970 [0xBB; 16],
1971 3,
1972 0.0,
1973 0.0,
1974 Some(1000),
1975 None,
1976 constants::ANNOUNCE_CAP,
1977 );
1978
1979 engine.deregister_interface(InterfaceId(1));
1980 assert_eq!(engine.announce_queue_count(), 1);
1981 assert_eq!(engine.nonempty_announce_queue_count(), 1);
1982 }
1983
1984 #[test]
1985 fn test_register_deregister_destination() {
1986 let mut engine = TransportEngine::new(make_config(false));
1987 let dest = [0x11; 16];
1988 engine.register_destination(dest, constants::DESTINATION_SINGLE);
1989 assert!(engine.local_destinations.contains_key(&dest));
1990
1991 engine.deregister_destination(&dest);
1992 assert!(!engine.local_destinations.contains_key(&dest));
1993 }
1994
1995 #[test]
1996 fn test_path_state() {
1997 let mut engine = TransportEngine::new(make_config(false));
1998 let dest = [0x22; 16];
1999
2000 assert!(!engine.path_is_unresponsive(&dest));
2001
2002 engine.mark_path_unresponsive(&dest, None);
2003 assert!(engine.path_is_unresponsive(&dest));
2004
2005 engine.mark_path_responsive(&dest);
2006 assert!(!engine.path_is_unresponsive(&dest));
2007 }
2008
2009 #[test]
2010 fn test_announce_clears_stale_path_state_for_unknown_destination() {
2011 use crate::announce::AnnounceData;
2012 use crate::destination::{destination_hash, name_hash};
2013
2014 let mut engine = TransportEngine::new(make_config(false));
2015 engine.register_interface(make_interface(1, constants::MODE_FULL));
2016
2017 let identity =
2018 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x61; 32]));
2019 let dest_hash = destination_hash("pathfix", &["announce"], Some(identity.hash()));
2020 let name_h = name_hash("pathfix", &["announce"]);
2021 let random_hash = [0x24u8; 10];
2022
2023 let (announce_data, _) =
2024 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2025
2026 let packet = RawPacket::pack(
2027 PacketFlags {
2028 header_type: constants::HEADER_1,
2029 context_flag: constants::FLAG_UNSET,
2030 transport_type: constants::TRANSPORT_BROADCAST,
2031 destination_type: constants::DESTINATION_SINGLE,
2032 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2033 },
2034 0,
2035 &dest_hash,
2036 None,
2037 constants::CONTEXT_NONE,
2038 &announce_data,
2039 )
2040 .unwrap();
2041
2042 engine.mark_path_unresponsive(&dest_hash, None);
2043 assert!(engine.path_is_unresponsive(&dest_hash));
2044 assert!(!engine.has_path(&dest_hash));
2045
2046 let mut rng = rns_crypto::FixedRng::new(&[0x62; 32]);
2047 let actions = engine.handle_inbound(
2048 InboundFrame {
2049 raw: &packet.raw,
2050 iface: InterfaceId(1),
2051 now: 1000.0,
2052 rx: RxMetadata {
2053 rssi: None,
2054 snr: None,
2055 },
2056 },
2057 &mut rng,
2058 );
2059
2060 assert!(engine.has_path(&dest_hash));
2061 assert!(
2062 !engine.path_is_unresponsive(&dest_hash),
2063 "stale path state should be cleared for newly installed paths"
2064 );
2065 assert!(actions.iter().any(|action| matches!(
2066 action,
2067 TransportAction::PathUpdated {
2068 destination_hash,
2069 interface,
2070 ..
2071 } if *destination_hash == dest_hash && *interface == InterfaceId(1)
2072 )));
2073 }
2074
2075 #[test]
2076 fn test_boundary_exempts_unresponsive() {
2077 let mut engine = TransportEngine::new(make_config(false));
2078 engine.register_interface(make_interface(1, constants::MODE_BOUNDARY));
2079 let dest = [0xB1; 16];
2080
2081 engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
2083 assert!(!engine.path_is_unresponsive(&dest));
2084 }
2085
2086 #[test]
2087 fn test_non_boundary_marks_unresponsive() {
2088 let mut engine = TransportEngine::new(make_config(false));
2089 engine.register_interface(make_interface(1, constants::MODE_FULL));
2090 let dest = [0xB2; 16];
2091
2092 engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
2094 assert!(engine.path_is_unresponsive(&dest));
2095 }
2096
2097 #[test]
2098 fn test_expire_path() {
2099 let mut engine = TransportEngine::new(make_config(false));
2100 let dest = [0x33; 16];
2101
2102 engine.path_table.insert(
2103 dest,
2104 PathSet::from_single(
2105 PathEntry {
2106 timestamp: 1000.0,
2107 next_hop: [0; 16],
2108 hops: 2,
2109 expires: 9999.0,
2110 random_blobs: Vec::new(),
2111 receiving_interface: InterfaceId(1),
2112 packet_hash: [0; 32],
2113 announce_raw: None,
2114 },
2115 1,
2116 ),
2117 );
2118
2119 assert!(engine.has_path(&dest));
2120 engine.expire_path(&dest);
2121 assert!(engine.has_path(&dest));
2123 assert_eq!(engine.path_table[&dest].primary().unwrap().expires, 0.0);
2124 }
2125
2126 #[test]
2127 fn test_link_table_operations() {
2128 let mut engine = TransportEngine::new(make_config(false));
2129 let link_id = [0x44; 16];
2130
2131 engine.register_link(
2132 link_id,
2133 LinkEntry {
2134 timestamp: 100.0,
2135 next_hop_transport_id: [0; 16],
2136 next_hop_interface: InterfaceId(1),
2137 remaining_hops: 3,
2138 received_interface: InterfaceId(2),
2139 taken_hops: 2,
2140 destination_hash: [0xAA; 16],
2141 validated: false,
2142 proof_timeout: 200.0,
2143 },
2144 );
2145
2146 assert!(engine.link_table.contains_key(&link_id));
2147 assert!(!engine.link_table[&link_id].validated);
2148
2149 engine.validate_link(&link_id);
2150 assert!(engine.link_table[&link_id].validated);
2151
2152 engine.remove_link(&link_id);
2153 assert!(!engine.link_table.contains_key(&link_id));
2154 }
2155
2156 #[test]
2157 fn test_lrproof_routes_from_originating_side_via_link_table() {
2158 let mut engine = TransportEngine::new(make_config(true));
2159 engine.register_interface(make_interface(1, constants::MODE_FULL));
2160 engine.register_interface(make_interface(2, constants::MODE_FULL));
2161
2162 let link_id = [0x44; 16];
2163 engine.register_link(
2164 link_id,
2165 LinkEntry {
2166 timestamp: 100.0,
2167 next_hop_transport_id: [0xAA; 16],
2168 next_hop_interface: InterfaceId(2),
2169 remaining_hops: 3,
2170 received_interface: InterfaceId(1),
2171 taken_hops: 1,
2172 destination_hash: [0xBB; 16],
2173 validated: false,
2174 proof_timeout: 200.0,
2175 },
2176 );
2177
2178 let flags = PacketFlags {
2179 header_type: constants::HEADER_1,
2180 context_flag: constants::FLAG_UNSET,
2181 transport_type: constants::TRANSPORT_BROADCAST,
2182 destination_type: constants::DESTINATION_LINK,
2183 packet_type: constants::PACKET_TYPE_PROOF,
2184 };
2185 let packet = RawPacket::pack(
2186 flags,
2187 0,
2188 &link_id,
2189 None,
2190 constants::CONTEXT_LRPROOF,
2191 &[0xCC; 64],
2192 )
2193 .unwrap();
2194 let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
2195
2196 let actions = engine.handle_inbound(
2197 InboundFrame {
2198 raw: &packet.raw,
2199 iface: InterfaceId(1),
2200 now: 101.0,
2201 rx: RxMetadata {
2202 rssi: None,
2203 snr: None,
2204 },
2205 },
2206 &mut rng,
2207 );
2208
2209 assert!(matches!(
2210 engine
2211 .link_table_ref()
2212 .get(&link_id)
2213 .map(|entry| entry.validated),
2214 Some(true)
2215 ));
2216 assert!(actions.iter().any(|action| matches!(
2217 action,
2218 TransportAction::LinkEstablished {
2219 link_id: established,
2220 interface: InterfaceId(2),
2221 } if *established == link_id
2222 )));
2223 assert!(actions.iter().any(|action| matches!(
2224 action,
2225 TransportAction::SendOnInterface {
2226 interface: InterfaceId(2),
2227 ..
2228 }
2229 )));
2230 }
2231
2232 #[test]
2233 fn test_packet_filter_drops_plain_announce() {
2234 let engine = TransportEngine::new(make_config(false));
2235 let flags = PacketFlags {
2236 header_type: constants::HEADER_1,
2237 context_flag: constants::FLAG_UNSET,
2238 transport_type: constants::TRANSPORT_BROADCAST,
2239 destination_type: constants::DESTINATION_PLAIN,
2240 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2241 };
2242 let packet =
2243 RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2244 assert!(!engine.packet_filter(&packet));
2245 }
2246
2247 #[test]
2248 fn test_packet_filter_allows_keepalive() {
2249 let engine = TransportEngine::new(make_config(false));
2250 let flags = PacketFlags {
2251 header_type: constants::HEADER_1,
2252 context_flag: constants::FLAG_UNSET,
2253 transport_type: constants::TRANSPORT_BROADCAST,
2254 destination_type: constants::DESTINATION_SINGLE,
2255 packet_type: constants::PACKET_TYPE_DATA,
2256 };
2257 let packet = RawPacket::pack(
2258 flags,
2259 0,
2260 &[0; 16],
2261 None,
2262 constants::CONTEXT_KEEPALIVE,
2263 b"test",
2264 )
2265 .unwrap();
2266 assert!(engine.packet_filter(&packet));
2267 }
2268
2269 #[test]
2270 fn test_packet_filter_drops_high_hop_plain() {
2271 let engine = TransportEngine::new(make_config(false));
2272 let flags = PacketFlags {
2273 header_type: constants::HEADER_1,
2274 context_flag: constants::FLAG_UNSET,
2275 transport_type: constants::TRANSPORT_BROADCAST,
2276 destination_type: constants::DESTINATION_PLAIN,
2277 packet_type: constants::PACKET_TYPE_DATA,
2278 };
2279 let mut packet =
2280 RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2281 packet.hops = 2;
2282 assert!(!engine.packet_filter(&packet));
2283 }
2284
2285 #[test]
2286 fn test_packet_filter_allows_duplicate_single_announce() {
2287 let mut engine = TransportEngine::new(make_config(false));
2288 let flags = PacketFlags {
2289 header_type: constants::HEADER_1,
2290 context_flag: constants::FLAG_UNSET,
2291 transport_type: constants::TRANSPORT_BROADCAST,
2292 destination_type: constants::DESTINATION_SINGLE,
2293 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2294 };
2295 let packet = RawPacket::pack(
2296 flags,
2297 0,
2298 &[0; 16],
2299 None,
2300 constants::CONTEXT_NONE,
2301 &[0xAA; 64],
2302 )
2303 .unwrap();
2304
2305 engine.packet_hashlist.add(packet.packet_hash);
2307
2308 assert!(engine.packet_filter(&packet));
2310 }
2311
2312 #[test]
2313 fn test_packet_filter_fifo_eviction_allows_oldest_hash_again() {
2314 let mut engine = TransportEngine::new(make_config(false));
2315 engine.packet_hashlist = PacketHashlist::new(2);
2316
2317 let make_packet = |seed: u8| {
2318 let flags = PacketFlags {
2319 header_type: constants::HEADER_1,
2320 context_flag: constants::FLAG_UNSET,
2321 transport_type: constants::TRANSPORT_BROADCAST,
2322 destination_type: constants::DESTINATION_SINGLE,
2323 packet_type: constants::PACKET_TYPE_DATA,
2324 };
2325 RawPacket::pack(
2326 flags,
2327 0,
2328 &[seed; 16],
2329 None,
2330 constants::CONTEXT_NONE,
2331 &[seed; 4],
2332 )
2333 .unwrap()
2334 };
2335
2336 let packet1 = make_packet(1);
2337 let packet2 = make_packet(2);
2338 let packet3 = make_packet(3);
2339
2340 engine.packet_hashlist.add(packet1.packet_hash);
2341 engine.packet_hashlist.add(packet2.packet_hash);
2342 assert!(!engine.packet_filter(&packet1));
2343
2344 engine.packet_hashlist.add(packet3.packet_hash);
2345
2346 assert!(engine.packet_filter(&packet1));
2347 assert!(!engine.packet_filter(&packet2));
2348 assert!(!engine.packet_filter(&packet3));
2349 }
2350
2351 #[test]
2352 fn test_packet_filter_duplicate_does_not_refresh_recency() {
2353 let mut engine = TransportEngine::new(make_config(false));
2354 engine.packet_hashlist = PacketHashlist::new(2);
2355
2356 let make_packet = |seed: u8| {
2357 let flags = PacketFlags {
2358 header_type: constants::HEADER_1,
2359 context_flag: constants::FLAG_UNSET,
2360 transport_type: constants::TRANSPORT_BROADCAST,
2361 destination_type: constants::DESTINATION_SINGLE,
2362 packet_type: constants::PACKET_TYPE_DATA,
2363 };
2364 RawPacket::pack(
2365 flags,
2366 0,
2367 &[seed; 16],
2368 None,
2369 constants::CONTEXT_NONE,
2370 &[seed; 4],
2371 )
2372 .unwrap()
2373 };
2374
2375 let packet1 = make_packet(1);
2376 let packet2 = make_packet(2);
2377 let packet3 = make_packet(3);
2378
2379 engine.packet_hashlist.add(packet1.packet_hash);
2380 engine.packet_hashlist.add(packet2.packet_hash);
2381 engine.packet_hashlist.add(packet2.packet_hash);
2382 engine.packet_hashlist.add(packet3.packet_hash);
2383
2384 assert!(engine.packet_filter(&packet1));
2385 assert!(!engine.packet_filter(&packet2));
2386 assert!(!engine.packet_filter(&packet3));
2387 }
2388
2389 #[test]
2390 fn test_tick_retransmits_announce() {
2391 let mut engine = TransportEngine::new(make_config(true));
2392 engine.register_interface(make_interface(1, constants::MODE_FULL));
2393
2394 let dest = [0x55; 16];
2395 engine.insert_announce_entry(
2396 dest,
2397 AnnounceEntry {
2398 timestamp: 190.0,
2399 retransmit_timeout: 100.0, retries: 0,
2401 received_from: [0xAA; 16],
2402 hops: 2,
2403 packet_raw: vec![0x01, 0x02],
2404 packet_data: vec![0xCC; 10],
2405 destination_hash: dest,
2406 context_flag: 0,
2407 local_rebroadcasts: 0,
2408 block_rebroadcasts: false,
2409 attached_interface: None,
2410 },
2411 190.0,
2412 );
2413
2414 let mut rng = rns_crypto::FixedRng::new(&[0x42; 32]);
2415 let actions = engine.tick(200.0, &mut rng);
2416
2417 assert!(!actions.is_empty());
2420 assert!(matches!(
2421 &actions[0],
2422 TransportAction::SendOnInterface { .. }
2423 ));
2424
2425 assert_eq!(engine.announce_table[&dest].retries, 1);
2427 }
2428
2429 #[test]
2430 fn test_gate_retransmit_actions_expands_broadcast_to_matching_interfaces() {
2431 let mut engine = TransportEngine::new(make_config(false));
2432 engine.register_interface(make_interface(1, constants::MODE_FULL));
2433 engine.register_interface(make_interface(2, constants::MODE_FULL));
2434 engine.register_interface(make_interface(3, constants::MODE_ACCESS_POINT));
2435
2436 let dest = [0x56; 16];
2437 let raw = make_announce_raw(&dest, &[0xAB; 32]);
2438 let actions = engine.gate_retransmit_actions(
2439 vec![TransportAction::BroadcastOnAllInterfaces {
2440 raw: raw.clone().into(),
2441 exclude: None,
2442 }],
2443 1000.0,
2444 );
2445
2446 assert_eq!(actions.len(), 2);
2447 for action in &actions {
2448 match action {
2449 TransportAction::SendOnInterface {
2450 interface,
2451 raw: sent,
2452 } => {
2453 assert!(*interface == InterfaceId(1) || *interface == InterfaceId(2));
2454 assert_eq!(&**sent, raw.as_slice());
2455 }
2456 other => panic!("expected SendOnInterface, got {:?}", other),
2457 }
2458 }
2459 }
2460
2461 #[test]
2462 fn test_tick_culls_expired_announce_entries() {
2463 let mut config = make_config(true);
2464 config.announce_table_ttl_secs = 10.0;
2465 let mut engine = TransportEngine::new(config);
2466
2467 let dest1 = [0x61; 16];
2468 let dest2 = [0x62; 16];
2469 assert!(engine.insert_announce_entry(dest1, make_announce_entry(dest1, 100.0, 8), 100.0));
2470 assert!(engine.insert_held_announce(dest2, make_announce_entry(dest2, 100.0, 8), 100.0));
2471
2472 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2473 let _ = engine.tick(111.0, &mut rng);
2474
2475 assert!(!engine.announce_table().contains_key(&dest1));
2476 assert!(!engine.held_announces().contains_key(&dest2));
2477 }
2478
2479 #[test]
2480 fn test_announce_retention_cap_evicts_oldest_and_prefers_held_on_tie() {
2481 let sample_entry = make_announce_entry([0x70; 16], 100.0, 32);
2482 let mut config = make_config(true);
2483 config.announce_table_max_bytes = TransportEngine::announce_entry_size_bytes(&sample_entry)
2484 * 2
2485 + TransportEngine::announce_entry_size_bytes(&sample_entry) / 2;
2486 let max_bytes = config.announce_table_max_bytes;
2487 let mut engine = TransportEngine::new(config);
2488
2489 let held_dest = [0x71; 16];
2490 let active_dest = [0x72; 16];
2491 let newest_dest = [0x73; 16];
2492
2493 assert!(engine.insert_held_announce(
2494 held_dest,
2495 make_announce_entry(held_dest, 100.0, 32),
2496 100.0,
2497 ));
2498 assert!(engine.insert_announce_entry(
2499 active_dest,
2500 make_announce_entry(active_dest, 100.0, 32),
2501 100.0,
2502 ));
2503 assert!(engine.insert_announce_entry(
2504 newest_dest,
2505 make_announce_entry(newest_dest, 101.0, 32),
2506 101.0,
2507 ));
2508
2509 assert!(!engine.held_announces().contains_key(&held_dest));
2510 assert!(engine.announce_table().contains_key(&active_dest));
2511 assert!(engine.announce_table().contains_key(&newest_dest));
2512 assert!(engine.announce_retained_bytes() <= max_bytes);
2513 }
2514
2515 #[test]
2516 fn test_oversized_announce_entry_is_not_retained() {
2517 let mut config = make_config(true);
2518 config.announce_table_max_bytes = 200;
2519 let mut engine = TransportEngine::new(config);
2520 let dest = [0x81; 16];
2521
2522 assert!(!engine.insert_announce_entry(dest, make_announce_entry(dest, 100.0, 256), 100.0));
2523 assert!(!engine.announce_table().contains_key(&dest));
2524 assert_eq!(engine.announce_retained_bytes(), 0);
2525 }
2526
2527 #[test]
2528 fn test_void_queues_clears_shutdown_transients() {
2529 let mut engine = TransportEngine::new(make_config(true));
2530 engine.register_interface(make_interface(1, constants::MODE_FULL));
2531
2532 let active_dest = [0x91; 16];
2533 let held_dest = [0x92; 16];
2534 assert!(engine.insert_announce_entry(
2535 active_dest,
2536 make_announce_entry(active_dest, 100.0, 16),
2537 100.0,
2538 ));
2539 assert!(engine.insert_held_announce(
2540 held_dest,
2541 make_announce_entry(held_dest, 100.0, 16),
2542 100.0,
2543 ));
2544 engine.reverse_table.insert(
2545 [0x93; 16],
2546 tables::ReverseEntry {
2547 receiving_interface: InterfaceId(1),
2548 outbound_interface: InterfaceId(2),
2549 timestamp: 100.0,
2550 },
2551 );
2552 let _ = engine.announce_queues.gate_announce(
2553 InterfaceId(1),
2554 vec![0xAA; 32].into(),
2555 [0x94; 16],
2556 2,
2557 100.0,
2558 100.0,
2559 Some(1000),
2560 None,
2561 constants::ANNOUNCE_CAP,
2562 );
2563 let _ = engine.announce_queues.gate_announce(
2564 InterfaceId(1),
2565 vec![0xBB; 32].into(),
2566 [0x95; 16],
2567 3,
2568 100.0,
2569 100.0,
2570 Some(1000),
2571 None,
2572 constants::ANNOUNCE_CAP,
2573 );
2574
2575 assert_eq!(engine.announce_table_count(), 1);
2576 assert_eq!(engine.held_announces_count(), 1);
2577 assert_eq!(engine.reverse_table_count(), 1);
2578 assert_eq!(engine.queued_announce_count(), 1);
2579
2580 engine.void_queues();
2581
2582 assert_eq!(engine.announce_table_count(), 0);
2583 assert_eq!(engine.held_announces_count(), 0);
2584 assert_eq!(engine.reverse_table_count(), 0);
2585 assert_eq!(engine.queued_announce_count(), 0);
2586 assert_eq!(engine.nonempty_announce_queue_count(), 0);
2587 assert_eq!(engine.announce_retained_bytes(), 0);
2588 }
2589
2590 #[test]
2591 fn test_blackhole_identity() {
2592 let mut engine = TransportEngine::new(make_config(false));
2593 let hash = [0xAA; 16];
2594 let now = 1000.0;
2595
2596 assert!(!engine.is_blackholed(&hash, now));
2597
2598 engine.blackhole_identity(hash, now, None, Some(String::from("test")));
2599 assert!(engine.is_blackholed(&hash, now));
2600 assert!(engine.is_blackholed(&hash, now + 999999.0)); assert!(engine.unblackhole_identity(&hash));
2603 assert!(!engine.is_blackholed(&hash, now));
2604 assert!(!engine.unblackhole_identity(&hash)); }
2606
2607 #[test]
2608 fn test_blackhole_with_duration() {
2609 let mut engine = TransportEngine::new(make_config(false));
2610 let hash = [0xBB; 16];
2611 let now = 1000.0;
2612
2613 engine.blackhole_identity(hash, now, Some(1.0), None); assert!(engine.is_blackholed(&hash, now));
2615 assert!(engine.is_blackholed(&hash, now + 3599.0)); assert!(!engine.is_blackholed(&hash, now + 3601.0)); }
2618
2619 #[test]
2620 fn test_cull_blackholed() {
2621 let mut engine = TransportEngine::new(make_config(false));
2622 let hash1 = [0xCC; 16];
2623 let hash2 = [0xDD; 16];
2624 let now = 1000.0;
2625
2626 engine.blackhole_identity(hash1, now, Some(1.0), None); engine.blackhole_identity(hash2, now, None, None); engine.cull_blackholed(now + 4000.0); assert!(!engine.blackholed_identities.contains_key(&hash1));
2632 assert!(engine.blackholed_identities.contains_key(&hash2));
2633 }
2634
2635 #[test]
2636 fn test_blackhole_blocks_announce() {
2637 use crate::announce::AnnounceData;
2638 use crate::destination::{destination_hash, name_hash};
2639
2640 let mut engine = TransportEngine::new(make_config(false));
2641 engine.register_interface(make_interface(1, constants::MODE_FULL));
2642
2643 let identity =
2644 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x55; 32]));
2645 let dest_hash = destination_hash("test", &["app"], Some(identity.hash()));
2646 let name_h = name_hash("test", &["app"]);
2647 let random_hash = [0x42u8; 10];
2648
2649 let (announce_data, _) =
2650 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2651
2652 let flags = PacketFlags {
2653 header_type: constants::HEADER_1,
2654 context_flag: constants::FLAG_UNSET,
2655 transport_type: constants::TRANSPORT_BROADCAST,
2656 destination_type: constants::DESTINATION_SINGLE,
2657 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2658 };
2659 let packet = RawPacket::pack(
2660 flags,
2661 0,
2662 &dest_hash,
2663 None,
2664 constants::CONTEXT_NONE,
2665 &announce_data,
2666 )
2667 .unwrap();
2668
2669 let now = 1000.0;
2671 engine.blackhole_identity(*identity.hash(), now, None, None);
2672
2673 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2674 let actions = engine.handle_inbound(
2675 InboundFrame {
2676 raw: &packet.raw,
2677 iface: InterfaceId(1),
2678 now,
2679 rx: RxMetadata {
2680 rssi: None,
2681 snr: None,
2682 },
2683 },
2684 &mut rng,
2685 );
2686
2687 assert!(actions
2689 .iter()
2690 .all(|a| !matches!(a, TransportAction::AnnounceReceived { .. })));
2691 assert!(actions
2692 .iter()
2693 .all(|a| !matches!(a, TransportAction::PathUpdated { .. })));
2694 }
2695
2696 #[test]
2697 fn test_async_announce_retransmit_cleanup_happens_before_queueing() {
2698 use crate::announce::AnnounceData;
2699 use crate::destination::{destination_hash, name_hash};
2700 use crate::transport::announce_verify_queue::AnnounceVerifyQueue;
2701
2702 let mut engine = TransportEngine::new(make_config(true));
2703 engine.register_interface(make_interface(1, constants::MODE_FULL));
2704
2705 let identity =
2706 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x31; 32]));
2707 let dest_hash = destination_hash("async", &["announce"], Some(identity.hash()));
2708 let name_h = name_hash("async", &["announce"]);
2709 let random_hash = [0x44u8; 10];
2710 let (announce_data, _) =
2711 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2712
2713 let packet = RawPacket::pack(
2714 PacketFlags {
2715 header_type: constants::HEADER_2,
2716 context_flag: constants::FLAG_UNSET,
2717 transport_type: constants::TRANSPORT_TRANSPORT,
2718 destination_type: constants::DESTINATION_SINGLE,
2719 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2720 },
2721 3,
2722 &dest_hash,
2723 Some(&[0xBB; 16]),
2724 constants::CONTEXT_NONE,
2725 &announce_data,
2726 )
2727 .unwrap();
2728
2729 engine.announce_table.insert(
2730 dest_hash,
2731 AnnounceEntry {
2732 timestamp: 1000.0,
2733 retransmit_timeout: 2000.0,
2734 retries: constants::PATHFINDER_R,
2735 received_from: [0xBB; 16],
2736 hops: 2,
2737 packet_raw: packet.raw.clone(),
2738 packet_data: packet.data.clone(),
2739 destination_hash: dest_hash,
2740 context_flag: constants::FLAG_UNSET,
2741 local_rebroadcasts: 0,
2742 block_rebroadcasts: false,
2743 attached_interface: None,
2744 },
2745 );
2746
2747 let mut queue = AnnounceVerifyQueue::new(8);
2748 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2749 let actions = engine.handle_inbound_with_announce_queue(
2750 InboundFrame {
2751 raw: &packet.raw,
2752 iface: InterfaceId(1),
2753 now: 1000.0,
2754 rx: RxMetadata {
2755 rssi: None,
2756 snr: None,
2757 },
2758 },
2759 &mut rng,
2760 Some(&mut queue),
2761 );
2762
2763 assert!(actions.is_empty());
2764 assert_eq!(queue.len(), 1);
2765 assert!(
2766 !engine.announce_table.contains_key(&dest_hash),
2767 "retransmit completion should clear announce_table before queueing"
2768 );
2769 }
2770
2771 #[test]
2772 fn test_async_announce_completion_inserts_sig_cache_and_prevents_requeue() {
2773 use crate::announce::AnnounceData;
2774 use crate::destination::{destination_hash, name_hash};
2775 use crate::transport::announce_verify_queue::AnnounceVerifyQueue;
2776
2777 let mut engine = TransportEngine::new(make_config(false));
2778 engine.register_interface(make_interface(1, constants::MODE_FULL));
2779
2780 let identity =
2781 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x52; 32]));
2782 let dest_hash = destination_hash("async", &["cache"], Some(identity.hash()));
2783 let name_h = name_hash("async", &["cache"]);
2784 let random_hash = [0x55u8; 10];
2785 let (announce_data, _) =
2786 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2787
2788 let packet = RawPacket::pack(
2789 PacketFlags {
2790 header_type: constants::HEADER_1,
2791 context_flag: constants::FLAG_UNSET,
2792 transport_type: constants::TRANSPORT_BROADCAST,
2793 destination_type: constants::DESTINATION_SINGLE,
2794 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2795 },
2796 0,
2797 &dest_hash,
2798 None,
2799 constants::CONTEXT_NONE,
2800 &announce_data,
2801 )
2802 .unwrap();
2803
2804 let mut queue = AnnounceVerifyQueue::new(8);
2805 let mut rng = rns_crypto::FixedRng::new(&[0x77; 32]);
2806 let actions = engine.handle_inbound_with_announce_queue(
2807 InboundFrame {
2808 raw: &packet.raw,
2809 iface: InterfaceId(1),
2810 now: 1000.0,
2811 rx: RxMetadata {
2812 rssi: None,
2813 snr: None,
2814 },
2815 },
2816 &mut rng,
2817 Some(&mut queue),
2818 );
2819 assert!(actions.is_empty());
2820 assert_eq!(queue.len(), 1);
2821
2822 let mut batch = queue.take_pending(1000.0);
2823 assert_eq!(batch.len(), 1);
2824 let (key, pending) = batch.pop().unwrap();
2825
2826 let announce = AnnounceData::unpack(&pending.packet.data, false).unwrap();
2827 let validated = announce.validate(&pending.packet.destination_hash).unwrap();
2828 let mut material = [0u8; 80];
2829 material[..16].copy_from_slice(&pending.packet.destination_hash);
2830 material[16..].copy_from_slice(&announce.signature);
2831 let sig_cache_key = hash::full_hash(&material);
2832
2833 let pending = queue.complete_success(&key).unwrap();
2834 let actions =
2835 engine.complete_verified_announce(pending, validated, sig_cache_key, 1000.0, &mut rng);
2836 assert!(actions
2837 .iter()
2838 .any(|action| matches!(action, TransportAction::AnnounceReceived { .. })));
2839 assert!(engine.announce_sig_cache_contains(&sig_cache_key));
2840
2841 let actions = engine.handle_inbound_with_announce_queue(
2842 InboundFrame {
2843 raw: &packet.raw,
2844 iface: InterfaceId(1),
2845 now: 1001.0,
2846 rx: RxMetadata {
2847 rssi: None,
2848 snr: None,
2849 },
2850 },
2851 &mut rng,
2852 Some(&mut queue),
2853 );
2854 assert!(actions.is_empty());
2855 assert_eq!(queue.len(), 0);
2856 }
2857
2858 #[test]
2859 fn test_tick_culls_expired_path() {
2860 let mut engine = TransportEngine::new(make_config(false));
2861 engine.register_interface(make_interface(1, constants::MODE_FULL));
2862
2863 let dest = [0x66; 16];
2864 engine.path_table.insert(
2865 dest,
2866 PathSet::from_single(
2867 PathEntry {
2868 timestamp: 100.0,
2869 next_hop: [0; 16],
2870 hops: 2,
2871 expires: 200.0,
2872 random_blobs: Vec::new(),
2873 receiving_interface: InterfaceId(1),
2874 packet_hash: [0; 32],
2875 announce_raw: None,
2876 },
2877 1,
2878 ),
2879 );
2880
2881 assert!(engine.has_path(&dest));
2882
2883 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
2884 engine.tick(300.0, &mut rng);
2886
2887 assert!(!engine.has_path(&dest));
2888 }
2889
2890 fn make_local_client_interface(id: u64) -> InterfaceInfo {
2895 InterfaceInfo {
2896 id: InterfaceId(id),
2897 name: String::from("local_client"),
2898 mode: constants::MODE_FULL,
2899 out_capable: true,
2900 in_capable: true,
2901 bitrate: None,
2902 airtime_profile: None,
2903 announce_rate_target: None,
2904 announce_rate_grace: 0,
2905 announce_rate_penalty: 0.0,
2906 announce_cap: constants::ANNOUNCE_CAP,
2907 is_local_client: true,
2908 wants_tunnel: false,
2909 tunnel_id: None,
2910 mtu: constants::MTU as u32,
2911 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
2912 ia_freq: 0.0,
2913 ip_freq: 0.0,
2914 op_freq: 0.0,
2915 op_samples: 0,
2916 started: 0.0,
2917 }
2918 }
2919
2920 #[test]
2921 fn test_has_local_clients() {
2922 let mut engine = TransportEngine::new(make_config(false));
2923 assert!(!engine.has_local_clients());
2924
2925 engine.register_interface(make_interface(1, constants::MODE_FULL));
2926 assert!(!engine.has_local_clients());
2927
2928 engine.register_interface(make_local_client_interface(2));
2929 assert!(engine.has_local_clients());
2930
2931 engine.deregister_interface(InterfaceId(2));
2932 assert!(!engine.has_local_clients());
2933 }
2934
2935 #[test]
2936 fn test_local_client_hop_decrement() {
2937 let mut engine = TransportEngine::new(make_config(false));
2940 engine.register_interface(make_local_client_interface(1));
2941 engine.register_interface(make_interface(2, constants::MODE_FULL));
2942
2943 let dest = [0xAA; 16];
2945 engine.register_destination(dest, constants::DESTINATION_PLAIN);
2946
2947 let flags = PacketFlags {
2948 header_type: constants::HEADER_1,
2949 context_flag: constants::FLAG_UNSET,
2950 transport_type: constants::TRANSPORT_BROADCAST,
2951 destination_type: constants::DESTINATION_PLAIN,
2952 packet_type: constants::PACKET_TYPE_DATA,
2953 };
2954 let packet =
2956 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
2957
2958 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
2959 let actions = engine.handle_inbound(
2960 InboundFrame {
2961 raw: &packet.raw,
2962 iface: InterfaceId(1),
2963 now: 1000.0,
2964 rx: RxMetadata {
2965 rssi: None,
2966 snr: None,
2967 },
2968 },
2969 &mut rng,
2970 );
2971
2972 let deliver = actions
2975 .iter()
2976 .find(|a| matches!(a, TransportAction::DeliverLocal { .. }));
2977 assert!(deliver.is_some(), "Should deliver locally");
2978 }
2979
2980 #[test]
2981 fn test_prepare_inbound_packet_only_retains_original_raw_for_announces() {
2982 let engine = TransportEngine::new(make_config(false));
2983 let dest = [0xAB; 16];
2984 let flags = PacketFlags {
2985 header_type: constants::HEADER_1,
2986 context_flag: constants::FLAG_UNSET,
2987 transport_type: constants::TRANSPORT_BROADCAST,
2988 destination_type: constants::DESTINATION_SINGLE,
2989 packet_type: constants::PACKET_TYPE_DATA,
2990 };
2991 let packet =
2992 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
2993
2994 let ctx = engine
2995 .prepare_inbound_packet(InboundFrame {
2996 raw: &packet.raw,
2997 iface: InterfaceId(9),
2998 now: 1000.0,
2999 rx: RxMetadata {
3000 rssi: None,
3001 snr: None,
3002 },
3003 })
3004 .expect("packet should parse and pass filter");
3005
3006 assert!(ctx.original_raw.is_none());
3007 assert_eq!(ctx.packet.raw, packet.raw);
3008 assert_eq!(ctx.packet.hops, 1);
3009 assert_eq!(ctx.iface, InterfaceId(9));
3010
3011 let announce_flags = PacketFlags {
3012 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3013 ..flags
3014 };
3015 let announce = RawPacket::pack(
3016 announce_flags,
3017 0,
3018 &dest,
3019 None,
3020 constants::CONTEXT_NONE,
3021 &[0u8; 91],
3022 )
3023 .unwrap();
3024 let announce_ctx = engine
3025 .prepare_inbound_packet(InboundFrame {
3026 raw: &announce.raw,
3027 iface: InterfaceId(9),
3028 now: 1000.0,
3029 rx: RxMetadata {
3030 rssi: None,
3031 snr: None,
3032 },
3033 })
3034 .expect("announce should parse and pass filter");
3035 assert_eq!(
3036 announce_ctx.original_raw.as_deref(),
3037 Some(announce.raw.as_slice())
3038 );
3039 }
3040
3041 #[test]
3042 fn test_deliver_local_preserves_original_raw_and_metadata() {
3043 let mut engine = TransportEngine::new(make_config(false));
3044 engine.register_interface(make_interface(1, constants::MODE_FULL));
3045
3046 let dest = [0xAC; 16];
3047 engine.register_destination(dest, constants::DESTINATION_SINGLE);
3048
3049 let flags = PacketFlags {
3050 header_type: constants::HEADER_1,
3051 context_flag: constants::FLAG_UNSET,
3052 transport_type: constants::TRANSPORT_BROADCAST,
3053 destination_type: constants::DESTINATION_SINGLE,
3054 packet_type: constants::PACKET_TYPE_DATA,
3055 };
3056 let packet =
3057 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"deliver").unwrap();
3058
3059 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3060 let actions = engine.handle_inbound(
3061 InboundFrame {
3062 raw: &packet.raw,
3063 iface: InterfaceId(1),
3064 now: 1000.0,
3065 rx: RxMetadata {
3066 rssi: None,
3067 snr: None,
3068 },
3069 },
3070 &mut rng,
3071 );
3072
3073 let deliver = actions
3074 .iter()
3075 .find_map(|action| match action {
3076 TransportAction::DeliverLocal {
3077 destination_hash,
3078 raw,
3079 packet_hash,
3080 receiving_interface,
3081 } => Some((destination_hash, raw, packet_hash, receiving_interface)),
3082 _ => None,
3083 })
3084 .expect("should produce DeliverLocal");
3085
3086 assert_eq!(*deliver.0, dest);
3087 assert_eq!(&**deliver.1, packet.raw.as_slice());
3088 assert_eq!(*deliver.2, packet.packet_hash);
3089 assert_eq!(*deliver.3, InterfaceId(1));
3090 }
3091
3092 #[test]
3093 fn test_plain_broadcast_from_local_client() {
3094 let mut engine = TransportEngine::new(make_config(false));
3096 engine.register_interface(make_local_client_interface(1));
3097 engine.register_interface(make_interface(2, constants::MODE_FULL));
3098
3099 let dest = [0xBB; 16];
3100 let flags = PacketFlags {
3101 header_type: constants::HEADER_1,
3102 context_flag: constants::FLAG_UNSET,
3103 transport_type: constants::TRANSPORT_BROADCAST,
3104 destination_type: constants::DESTINATION_PLAIN,
3105 packet_type: constants::PACKET_TYPE_DATA,
3106 };
3107 let packet =
3108 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3109
3110 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3111 let actions = engine.handle_inbound(
3112 InboundFrame {
3113 raw: &packet.raw,
3114 iface: InterfaceId(1),
3115 now: 1000.0,
3116 rx: RxMetadata {
3117 rssi: None,
3118 snr: None,
3119 },
3120 },
3121 &mut rng,
3122 );
3123
3124 let forward = actions.iter().find(|a| {
3126 matches!(
3127 a,
3128 TransportAction::ForwardPlainBroadcast {
3129 to_local: false,
3130 ..
3131 }
3132 )
3133 });
3134 assert!(forward.is_some(), "Should forward to external interfaces");
3135 }
3136
3137 #[test]
3138 fn test_plain_broadcast_from_external() {
3139 let mut engine = TransportEngine::new(make_config(false));
3141 engine.register_interface(make_local_client_interface(1));
3142 engine.register_interface(make_interface(2, constants::MODE_FULL));
3143
3144 let dest = [0xCC; 16];
3145 let flags = PacketFlags {
3146 header_type: constants::HEADER_1,
3147 context_flag: constants::FLAG_UNSET,
3148 transport_type: constants::TRANSPORT_BROADCAST,
3149 destination_type: constants::DESTINATION_PLAIN,
3150 packet_type: constants::PACKET_TYPE_DATA,
3151 };
3152 let packet =
3153 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3154
3155 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3156 let actions = engine.handle_inbound(
3157 InboundFrame {
3158 raw: &packet.raw,
3159 iface: InterfaceId(2),
3160 now: 1000.0,
3161 rx: RxMetadata {
3162 rssi: None,
3163 snr: None,
3164 },
3165 },
3166 &mut rng,
3167 );
3168
3169 let forward = actions.iter().find(|a| {
3171 matches!(
3172 a,
3173 TransportAction::ForwardPlainBroadcast { to_local: true, .. }
3174 )
3175 });
3176 assert!(forward.is_some(), "Should forward to local clients");
3177 }
3178
3179 #[test]
3180 fn test_no_plain_broadcast_bridging_without_local_clients() {
3181 let mut engine = TransportEngine::new(make_config(false));
3183 engine.register_interface(make_interface(1, constants::MODE_FULL));
3184 engine.register_interface(make_interface(2, constants::MODE_FULL));
3185
3186 let dest = [0xDD; 16];
3187 let flags = PacketFlags {
3188 header_type: constants::HEADER_1,
3189 context_flag: constants::FLAG_UNSET,
3190 transport_type: constants::TRANSPORT_BROADCAST,
3191 destination_type: constants::DESTINATION_PLAIN,
3192 packet_type: constants::PACKET_TYPE_DATA,
3193 };
3194 let packet =
3195 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3196
3197 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3198 let actions = engine.handle_inbound(
3199 InboundFrame {
3200 raw: &packet.raw,
3201 iface: InterfaceId(1),
3202 now: 1000.0,
3203 rx: RxMetadata {
3204 rssi: None,
3205 snr: None,
3206 },
3207 },
3208 &mut rng,
3209 );
3210
3211 let has_forward = actions
3213 .iter()
3214 .any(|a| matches!(a, TransportAction::ForwardPlainBroadcast { .. }));
3215 assert!(!has_forward, "No bridging without local clients");
3216 }
3217
3218 #[test]
3219 fn test_announce_forwarded_to_local_clients() {
3220 use crate::announce::AnnounceData;
3221 use crate::destination::{destination_hash, name_hash};
3222
3223 let mut engine = TransportEngine::new(make_config(true));
3224 engine.register_interface(make_interface(1, constants::MODE_FULL));
3225 engine.register_interface(make_local_client_interface(2));
3226
3227 let identity =
3228 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x77; 32]));
3229 let dest_hash = destination_hash("test", &["fwd"], Some(identity.hash()));
3230 let name_h = name_hash("test", &["fwd"]);
3231 let random_hash = [0x42u8; 10];
3232
3233 let (announce_data, _) =
3234 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3235
3236 let flags = PacketFlags {
3237 header_type: constants::HEADER_1,
3238 context_flag: constants::FLAG_UNSET,
3239 transport_type: constants::TRANSPORT_BROADCAST,
3240 destination_type: constants::DESTINATION_SINGLE,
3241 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3242 };
3243 let packet = RawPacket::pack(
3244 flags,
3245 0,
3246 &dest_hash,
3247 None,
3248 constants::CONTEXT_NONE,
3249 &announce_data,
3250 )
3251 .unwrap();
3252
3253 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3254 let actions = engine.handle_inbound(
3255 InboundFrame {
3256 raw: &packet.raw,
3257 iface: InterfaceId(1),
3258 now: 1000.0,
3259 rx: RxMetadata {
3260 rssi: None,
3261 snr: None,
3262 },
3263 },
3264 &mut rng,
3265 );
3266
3267 let forward = actions
3269 .iter()
3270 .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3271 assert!(
3272 forward.is_some(),
3273 "Should forward announce to local clients"
3274 );
3275
3276 match forward.unwrap() {
3278 TransportAction::ForwardToLocalClients { exclude, raw } => {
3279 assert_eq!(*exclude, Some(InterfaceId(1)));
3280 let flags = PacketFlags::unpack(raw[0]);
3281 assert_eq!(flags.header_type, constants::HEADER_2);
3282 assert_eq!(flags.transport_type, constants::TRANSPORT_TRANSPORT);
3283 assert_eq!(&raw[2..18], &[0x42; 16]);
3284 }
3285 _ => unreachable!(),
3286 }
3287 }
3288
3289 #[test]
3290 fn test_no_announce_forward_without_local_clients() {
3291 use crate::announce::AnnounceData;
3292 use crate::destination::{destination_hash, name_hash};
3293
3294 let mut engine = TransportEngine::new(make_config(false));
3295 engine.register_interface(make_interface(1, constants::MODE_FULL));
3296
3297 let identity =
3298 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x88; 32]));
3299 let dest_hash = destination_hash("test", &["nofwd"], Some(identity.hash()));
3300 let name_h = name_hash("test", &["nofwd"]);
3301 let random_hash = [0x42u8; 10];
3302
3303 let (announce_data, _) =
3304 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3305
3306 let flags = PacketFlags {
3307 header_type: constants::HEADER_1,
3308 context_flag: constants::FLAG_UNSET,
3309 transport_type: constants::TRANSPORT_BROADCAST,
3310 destination_type: constants::DESTINATION_SINGLE,
3311 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3312 };
3313 let packet = RawPacket::pack(
3314 flags,
3315 0,
3316 &dest_hash,
3317 None,
3318 constants::CONTEXT_NONE,
3319 &announce_data,
3320 )
3321 .unwrap();
3322
3323 let mut rng = rns_crypto::FixedRng::new(&[0x22; 32]);
3324 let actions = engine.handle_inbound(
3325 InboundFrame {
3326 raw: &packet.raw,
3327 iface: InterfaceId(1),
3328 now: 1000.0,
3329 rx: RxMetadata {
3330 rssi: None,
3331 snr: None,
3332 },
3333 },
3334 &mut rng,
3335 );
3336
3337 let has_forward = actions
3339 .iter()
3340 .any(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3341 assert!(!has_forward, "No forward without local clients");
3342 }
3343
3344 #[test]
3345 fn test_local_client_exclude_from_forward() {
3346 use crate::announce::AnnounceData;
3347 use crate::destination::{destination_hash, name_hash};
3348
3349 let mut engine = TransportEngine::new(make_config(false));
3350 engine.register_interface(make_local_client_interface(1));
3351 engine.register_interface(make_local_client_interface(2));
3352
3353 let identity =
3354 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3355 let dest_hash = destination_hash("test", &["excl"], Some(identity.hash()));
3356 let name_h = name_hash("test", &["excl"]);
3357 let random_hash = [0x42u8; 10];
3358
3359 let (announce_data, _) =
3360 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3361
3362 let flags = PacketFlags {
3363 header_type: constants::HEADER_1,
3364 context_flag: constants::FLAG_UNSET,
3365 transport_type: constants::TRANSPORT_BROADCAST,
3366 destination_type: constants::DESTINATION_SINGLE,
3367 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3368 };
3369 let packet = RawPacket::pack(
3370 flags,
3371 0,
3372 &dest_hash,
3373 None,
3374 constants::CONTEXT_NONE,
3375 &announce_data,
3376 )
3377 .unwrap();
3378
3379 let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
3380 let actions = engine.handle_inbound(
3382 InboundFrame {
3383 raw: &packet.raw,
3384 iface: InterfaceId(1),
3385 now: 1000.0,
3386 rx: RxMetadata {
3387 rssi: None,
3388 snr: None,
3389 },
3390 },
3391 &mut rng,
3392 );
3393
3394 let forward = actions
3396 .iter()
3397 .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3398 assert!(forward.is_some());
3399 match forward.unwrap() {
3400 TransportAction::ForwardToLocalClients { exclude, .. } => {
3401 assert_eq!(*exclude, Some(InterfaceId(1)));
3402 }
3403 _ => unreachable!(),
3404 }
3405 }
3406
3407 fn make_tunnel_interface(id: u64) -> InterfaceInfo {
3412 InterfaceInfo {
3413 id: InterfaceId(id),
3414 name: String::from("tunnel_iface"),
3415 mode: constants::MODE_FULL,
3416 out_capable: true,
3417 in_capable: true,
3418 bitrate: None,
3419 airtime_profile: None,
3420 announce_rate_target: None,
3421 announce_rate_grace: 0,
3422 announce_rate_penalty: 0.0,
3423 announce_cap: constants::ANNOUNCE_CAP,
3424 is_local_client: false,
3425 wants_tunnel: true,
3426 tunnel_id: None,
3427 mtu: constants::MTU as u32,
3428 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3429 ia_freq: 0.0,
3430 ip_freq: 0.0,
3431 op_freq: 0.0,
3432 op_samples: 0,
3433 started: 0.0,
3434 }
3435 }
3436
3437 #[test]
3438 fn test_handle_tunnel_new() {
3439 let mut engine = TransportEngine::new(make_config(true));
3440 engine.register_interface(make_tunnel_interface(1));
3441
3442 let tunnel_id = [0xAA; 32];
3443 let actions = engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3444
3445 assert!(actions
3447 .iter()
3448 .any(|a| matches!(a, TransportAction::TunnelEstablished { .. })));
3449
3450 let info = engine.interface_info(&InterfaceId(1)).unwrap();
3452 assert_eq!(info.tunnel_id, Some(tunnel_id));
3453
3454 assert_eq!(engine.tunnel_table().len(), 1);
3456 }
3457
3458 #[test]
3459 fn test_announce_stores_tunnel_path() {
3460 use crate::announce::AnnounceData;
3461 use crate::destination::{destination_hash, name_hash};
3462
3463 let mut engine = TransportEngine::new(make_config(false));
3464 let mut iface = make_tunnel_interface(1);
3465 let tunnel_id = [0xBB; 32];
3466 iface.tunnel_id = Some(tunnel_id);
3467 engine.register_interface(iface);
3468
3469 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3471
3472 let identity =
3474 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xCC; 32]));
3475 let dest_hash = destination_hash("test", &["tunnel"], Some(identity.hash()));
3476 let name_h = name_hash("test", &["tunnel"]);
3477 let random_hash = [0x42u8; 10];
3478
3479 let (announce_data, _) =
3480 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3481
3482 let flags = PacketFlags {
3483 header_type: constants::HEADER_1,
3484 context_flag: constants::FLAG_UNSET,
3485 transport_type: constants::TRANSPORT_BROADCAST,
3486 destination_type: constants::DESTINATION_SINGLE,
3487 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3488 };
3489 let packet = RawPacket::pack(
3490 flags,
3491 0,
3492 &dest_hash,
3493 None,
3494 constants::CONTEXT_NONE,
3495 &announce_data,
3496 )
3497 .unwrap();
3498
3499 let mut rng = rns_crypto::FixedRng::new(&[0xDD; 32]);
3500 engine.handle_inbound(
3501 InboundFrame {
3502 raw: &packet.raw,
3503 iface: InterfaceId(1),
3504 now: 1000.0,
3505 rx: RxMetadata {
3506 rssi: None,
3507 snr: None,
3508 },
3509 },
3510 &mut rng,
3511 );
3512
3513 assert!(engine.has_path(&dest_hash));
3515
3516 let tunnel = engine.tunnel_table().get(&tunnel_id).unwrap();
3518 assert_eq!(tunnel.paths.len(), 1);
3519 assert!(tunnel.paths.contains_key(&dest_hash));
3520 }
3521
3522 #[test]
3523 fn test_tunnel_reattach_restores_paths() {
3524 let mut engine = TransportEngine::new(make_config(true));
3525 engine.register_interface(make_tunnel_interface(1));
3526
3527 let tunnel_id = [0xCC; 32];
3528 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3529
3530 let dest = [0xDD; 16];
3532 engine.tunnel_table.store_tunnel_path(
3533 &tunnel_id,
3534 dest,
3535 tunnel::TunnelPath {
3536 timestamp: 1000.0,
3537 received_from: [0xEE; 16],
3538 hops: 3,
3539 expires: 1000.0 + constants::DESTINATION_TIMEOUT,
3540 random_blobs: Vec::new(),
3541 packet_hash: [0xFF; 32],
3542 },
3543 1000.0,
3544 constants::DESTINATION_TIMEOUT,
3545 usize::MAX,
3546 );
3547
3548 engine.void_tunnel_interface(&tunnel_id);
3550
3551 engine.path_table.remove(&dest);
3553 assert!(!engine.has_path(&dest));
3554
3555 engine.register_interface(make_interface(2, constants::MODE_FULL));
3557 let actions = engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);
3558
3559 assert!(engine.has_path(&dest));
3561 let path = engine.path_table.get(&dest).unwrap().primary().unwrap();
3562 assert_eq!(path.hops, 3);
3563 assert_eq!(path.receiving_interface, InterfaceId(2));
3564
3565 assert!(actions
3567 .iter()
3568 .any(|a| matches!(a, TransportAction::TunnelEstablished { .. })));
3569 }
3570
3571 #[test]
3572 fn test_active_packet_hashes_include_detached_tunnel_paths() {
3573 let mut engine = TransportEngine::new(make_config(true));
3574 engine.register_interface(make_tunnel_interface(1));
3575
3576 let tunnel_id = [0xCA; 32];
3577 let destination_hash = [0xDB; 16];
3578 let packet_hash = [0xEC; 32];
3579 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3580 engine.tunnel_table.store_tunnel_path(
3581 &tunnel_id,
3582 destination_hash,
3583 tunnel::TunnelPath {
3584 timestamp: 1000.0,
3585 received_from: [0xFE; 16],
3586 hops: 2,
3587 expires: 1000.0 + constants::DESTINATION_TIMEOUT,
3588 random_blobs: Vec::new(),
3589 packet_hash,
3590 },
3591 1000.0,
3592 constants::DESTINATION_TIMEOUT,
3593 usize::MAX,
3594 );
3595 engine.void_tunnel_interface(&tunnel_id);
3596 engine.path_table.remove(&destination_hash);
3597
3598 let active_hashes = engine.active_packet_hashes();
3599 assert_eq!(active_hashes, vec![packet_hash]);
3600 }
3601
3602 #[test]
3603 fn test_tunnel_reattach_does_not_overwrite_newer_path() {
3604 let mut engine = TransportEngine::new(make_config(true));
3605 engine.register_interface(make_tunnel_interface(1));
3606
3607 let tunnel_id = [0xCD; 32];
3608 let dest = [0xDE; 16];
3609 let older_blob = make_random_blob(100);
3610 let newer_blob = make_random_blob(200);
3611
3612 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3613 engine.tunnel_table.store_tunnel_path(
3614 &tunnel_id,
3615 dest,
3616 tunnel::TunnelPath {
3617 timestamp: 1000.0,
3618 received_from: [0xEE; 16],
3619 hops: 2,
3620 expires: 1000.0 + constants::DESTINATION_TIMEOUT,
3621 random_blobs: vec![older_blob],
3622 packet_hash: [0x11; 32],
3623 },
3624 1000.0,
3625 constants::DESTINATION_TIMEOUT,
3626 usize::MAX,
3627 );
3628 engine.void_tunnel_interface(&tunnel_id);
3629
3630 engine.path_table.insert(
3631 dest,
3632 PathSet::from_single(
3633 PathEntry {
3634 timestamp: 1500.0,
3635 next_hop: [0xAB; 16],
3636 hops: 3,
3637 expires: 1500.0 + constants::DESTINATION_TIMEOUT,
3638 random_blobs: vec![newer_blob],
3639 receiving_interface: InterfaceId(3),
3640 packet_hash: [0x22; 32],
3641 announce_raw: None,
3642 },
3643 1,
3644 ),
3645 );
3646
3647 engine.register_interface(make_interface(2, constants::MODE_FULL));
3648 engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);
3649
3650 let path = engine.path_table.get(&dest).unwrap().primary().unwrap();
3651 assert_eq!(path.next_hop, [0xAB; 16]);
3652 assert_eq!(path.hops, 3);
3653 assert_eq!(path.receiving_interface, InterfaceId(3));
3654 assert_eq!(path.random_blobs, vec![newer_blob]);
3655 }
3656
3657 #[test]
3658 fn test_void_tunnel_interface() {
3659 let mut engine = TransportEngine::new(make_config(true));
3660 engine.register_interface(make_tunnel_interface(1));
3661
3662 let tunnel_id = [0xDD; 32];
3663 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3664
3665 assert_eq!(
3667 engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3668 Some(InterfaceId(1))
3669 );
3670
3671 engine.void_tunnel_interface(&tunnel_id);
3672
3673 assert_eq!(engine.tunnel_table().len(), 1);
3675 assert_eq!(
3676 engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3677 None
3678 );
3679 }
3680
3681 #[test]
3682 fn test_tick_culls_tunnels() {
3683 let mut engine = TransportEngine::new(make_config(true));
3684 engine.register_interface(make_tunnel_interface(1));
3685
3686 let tunnel_id = [0xEE; 32];
3687 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3688 assert_eq!(engine.tunnel_table().len(), 1);
3689
3690 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3691
3692 engine.tick(
3694 1000.0 + constants::DESTINATION_TIMEOUT + constants::TABLES_CULL_INTERVAL + 1.0,
3695 &mut rng,
3696 );
3697
3698 assert_eq!(engine.tunnel_table().len(), 0);
3699 }
3700
3701 #[test]
3702 fn test_synthesize_tunnel() {
3703 let mut engine = TransportEngine::new(make_config(true));
3704 engine.register_interface(make_tunnel_interface(1));
3705
3706 let identity =
3707 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xFF; 32]));
3708 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3709
3710 let actions = engine.synthesize_tunnel(&identity, InterfaceId(1), &mut rng);
3711
3712 assert_eq!(actions.len(), 1);
3714 match &actions[0] {
3715 TransportAction::TunnelSynthesize {
3716 interface,
3717 data,
3718 dest_hash,
3719 } => {
3720 assert_eq!(*interface, InterfaceId(1));
3721 assert_eq!(data.len(), tunnel::TUNNEL_SYNTH_LENGTH);
3722 let expected_dest = crate::destination::destination_hash(
3724 "rnstransport",
3725 &["tunnel", "synthesize"],
3726 None,
3727 );
3728 assert_eq!(*dest_hash, expected_dest);
3729 }
3730 _ => panic!("Expected TunnelSynthesize"),
3731 }
3732 }
3733
3734 #[test]
3735 fn test_synthesize_tunnel_missing_interface_is_dropped() {
3736 let engine = TransportEngine::new(make_config(true));
3737
3738 let identity =
3739 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xFF; 32]));
3740 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3741
3742 let actions = engine.synthesize_tunnel(&identity, InterfaceId(99), &mut rng);
3743
3744 assert!(actions.is_empty());
3745 }
3746
3747 #[test]
3748 fn test_synthesize_tunnel_public_only_identity_is_dropped() {
3749 let mut engine = TransportEngine::new(make_config(true));
3750 engine.register_interface(make_tunnel_interface(1));
3751
3752 let identity =
3753 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xFF; 32]));
3754 let public_key = identity.get_public_key().unwrap();
3755 let public_only_identity = rns_crypto::identity::Identity::from_public_key(&public_key);
3756 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3757
3758 let actions = engine.synthesize_tunnel(&public_only_identity, InterfaceId(1), &mut rng);
3759
3760 assert!(actions.is_empty());
3761 }
3762
3763 fn make_path_request_data(dest_hash: &[u8; 16], tag: &[u8]) -> Vec<u8> {
3768 let mut data = Vec::new();
3769 data.extend_from_slice(dest_hash);
3770 data.extend_from_slice(tag);
3771 data
3772 }
3773
3774 fn make_transport_path_request_data(
3775 dest_hash: &[u8; 16],
3776 requestor_transport_id: &[u8; 16],
3777 tag: &[u8],
3778 ) -> Vec<u8> {
3779 let mut data = Vec::new();
3780 data.extend_from_slice(dest_hash);
3781 data.extend_from_slice(requestor_transport_id);
3782 data.extend_from_slice(tag);
3783 data
3784 }
3785
3786 fn assert_recursive_path_request_packet(raw: &[u8], dest: &[u8; 16], tag: &[u8]) {
3787 let packet = RawPacket::unpack(raw).expect("recursive path request packet");
3788 let path_request_dest =
3789 crate::destination::destination_hash("rnstransport", &["path", "request"], None);
3790
3791 assert_eq!(packet.flags.header_type, constants::HEADER_1);
3792 assert_eq!(packet.flags.transport_type, constants::TRANSPORT_BROADCAST);
3793 assert_eq!(packet.flags.destination_type, constants::DESTINATION_PLAIN);
3794 assert_eq!(packet.flags.packet_type, constants::PACKET_TYPE_DATA);
3795 assert_eq!(packet.hops, 0);
3796 assert_eq!(packet.context, constants::CONTEXT_NONE);
3797 assert_eq!(packet.destination_hash, path_request_dest);
3798
3799 let mut expected_data = Vec::new();
3800 expected_data.extend_from_slice(dest);
3801 expected_data.extend_from_slice(&[0x42; 16]);
3802 expected_data.extend_from_slice(tag);
3803 assert_eq!(packet.data, expected_data);
3804 }
3805
3806 #[test]
3807 fn test_path_request_forwarded_on_ap() {
3808 let mut engine = TransportEngine::new(make_config(true));
3809 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3810 engine.register_interface(make_interface(2, constants::MODE_FULL));
3811
3812 let dest = [0xD1; 16];
3813 let tag = [0x01; 16];
3814 let data = make_path_request_data(&dest, &tag);
3815
3816 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3817
3818 assert_eq!(actions.len(), 1);
3820 match &actions[0] {
3821 TransportAction::SendOnInterface { interface, .. } => {
3822 assert_eq!(*interface, InterfaceId(2));
3823 }
3824 _ => panic!("Expected SendOnInterface for forwarded path request"),
3825 }
3826 assert!(engine.discovery_path_requests.contains_key(&dest));
3828 }
3829
3830 #[test]
3831 fn test_recursive_path_request_rebuilds_transport_payload() {
3832 let mut engine = TransportEngine::new(make_config(true));
3833 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3834 engine.register_interface(make_interface(2, constants::MODE_FULL));
3835
3836 let dest = [0xD8; 16];
3837 let original_requestor_transport_id = [0x99; 16];
3838 let tag = [0x08; 16];
3839 let data = make_transport_path_request_data(&dest, &original_requestor_transport_id, &tag);
3840
3841 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3842
3843 assert_eq!(actions.len(), 1);
3844 match &actions[0] {
3845 TransportAction::SendOnInterface { interface, raw } => {
3846 assert_eq!(*interface, InterfaceId(2));
3847 assert_recursive_path_request_packet(raw.as_ref(), &dest, &tag);
3848 }
3849 _ => panic!("expected SendOnInterface for recursive path request"),
3850 }
3851 assert!(engine.discovery_path_requests.contains_key(&dest));
3852 }
3853
3854 #[test]
3855 fn test_path_request_not_forwarded_on_full() {
3856 let mut engine = TransportEngine::new(make_config(true));
3857 engine.register_interface(make_interface(1, constants::MODE_FULL));
3858 engine.register_interface(make_interface(2, constants::MODE_FULL));
3859
3860 let dest = [0xD2; 16];
3861 let tag = [0x02; 16];
3862 let data = make_path_request_data(&dest, &tag);
3863
3864 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3865
3866 assert!(actions.is_empty());
3868 assert!(!engine.discovery_path_requests.contains_key(&dest));
3869 }
3870
3871 #[test]
3872 fn test_duplicate_discovery_path_request_is_suppressed() {
3873 let mut engine = TransportEngine::new(make_config(true));
3874 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3875 engine.register_interface(make_interface(2, constants::MODE_FULL));
3876
3877 let dest = [0xD7; 16];
3878 let tag = [0x07; 16];
3879 let data = make_path_request_data(&dest, &tag);
3880
3881 let first = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3882 let second = engine.handle_path_request(&data, InterfaceId(1), 1001.0);
3883
3884 assert_eq!(first.len(), 1);
3885 assert!(
3886 second.is_empty(),
3887 "duplicate discovery request should be dropped"
3888 );
3889 assert_eq!(engine.discovery_pr_tags_count(), 1);
3890 }
3891
3892 #[test]
3893 fn test_path_request_ingress_burst_suppresses_recursive_discovery() {
3894 let mut engine = TransportEngine::new(make_config(true));
3895 let mut ingress = make_interface(1, constants::MODE_ACCESS_POINT);
3896 ingress.ingress_control.enabled = true;
3897 ingress.ip_freq = constants::IC_PR_BURST_FREQ + 1.0;
3898 engine.register_interface(ingress);
3899 engine.register_interface(make_interface(2, constants::MODE_FULL));
3900
3901 let dest = [0xE1; 16];
3902 let tag = [0x11; 16];
3903 let data = make_path_request_data(&dest, &tag);
3904
3905 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3906
3907 assert!(actions.is_empty());
3908 assert!(!engine.discovery_path_requests.contains_key(&dest));
3909 }
3910
3911 #[test]
3912 fn test_path_request_egress_limit_skips_only_limited_interface() {
3913 let mut engine = TransportEngine::new(make_config(true));
3914 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3915
3916 let mut limited = make_interface(2, constants::MODE_FULL);
3917 limited.ingress_control.egress_enabled = true;
3918 limited.op_freq = constants::EC_PR_FREQ + 1.0;
3919 limited.op_samples = constants::IC_BURST_MIN_SAMPLES;
3920 engine.register_interface(limited);
3921
3922 let mut allowed = make_interface(3, constants::MODE_FULL);
3923 allowed.ingress_control.egress_enabled = true;
3924 allowed.op_freq = constants::EC_PR_FREQ - 1.0;
3925 allowed.op_samples = constants::IC_BURST_MIN_SAMPLES;
3926 engine.register_interface(allowed);
3927
3928 let dest = [0xE2; 16];
3929 let tag = [0x12; 16];
3930 let data = make_path_request_data(&dest, &tag);
3931
3932 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3933
3934 assert_eq!(actions.len(), 1);
3935 match &actions[0] {
3936 TransportAction::SendOnInterface { interface, .. } => {
3937 assert_eq!(*interface, InterfaceId(3))
3938 }
3939 _ => panic!("expected SendOnInterface for the unlimited egress interface"),
3940 }
3941 assert!(engine.discovery_path_requests.contains_key(&dest));
3942 }
3943
3944 #[test]
3945 fn test_recursive_path_request_skips_interface_with_queued_announces() {
3946 let mut engine = TransportEngine::new(make_config(true));
3947 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3948 let mut blocked = make_interface(2, constants::MODE_FULL);
3949 blocked.bitrate = Some(1_000);
3950 engine.register_interface(blocked);
3951 engine.register_interface(make_interface(3, constants::MODE_FULL));
3952
3953 let _ = engine.announce_queues.gate_announce(
3954 InterfaceId(2),
3955 vec![0xAA; 100].into(),
3956 [0xA0; 16],
3957 1,
3958 900.0,
3959 900.0,
3960 Some(1_000),
3961 None,
3962 constants::ANNOUNCE_CAP,
3963 );
3964 let _ = engine.announce_queues.gate_announce(
3965 InterfaceId(2),
3966 vec![0xBB; 100].into(),
3967 [0xB0; 16],
3968 1,
3969 901.0,
3970 901.0,
3971 Some(1_000),
3972 None,
3973 constants::ANNOUNCE_CAP,
3974 );
3975
3976 let dest = [0xE3; 16];
3977 let tag = [0x13; 16];
3978 let data = make_path_request_data(&dest, &tag);
3979 let actions = engine.handle_path_request(&data, InterfaceId(1), 902.0);
3980
3981 assert_eq!(actions.len(), 1);
3982 match &actions[0] {
3983 TransportAction::SendOnInterface { interface, .. } => {
3984 assert_eq!(*interface, InterfaceId(3));
3985 }
3986 _ => panic!("expected SendOnInterface for the unqueued egress interface"),
3987 }
3988 assert!(engine.discovery_path_requests.contains_key(&dest));
3989 }
3990
3991 #[test]
3992 fn test_recursive_path_request_skips_interface_with_active_announce_cap() {
3993 let mut engine = TransportEngine::new(make_config(true));
3994 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3995 let mut blocked = make_interface(2, constants::MODE_FULL);
3996 blocked.bitrate = Some(1_000);
3997 engine.register_interface(blocked);
3998
3999 let _ = engine.announce_queues.gate_announce(
4000 InterfaceId(2),
4001 vec![0xAA; 100].into(),
4002 [0xA0; 16],
4003 1,
4004 900.0,
4005 900.0,
4006 Some(1_000),
4007 None,
4008 constants::ANNOUNCE_CAP,
4009 );
4010
4011 let dest = [0xE4; 16];
4012 let tag = [0x14; 16];
4013 let data = make_path_request_data(&dest, &tag);
4014 let actions = engine.handle_path_request(&data, InterfaceId(1), 901.0);
4015
4016 assert!(actions.is_empty());
4017 assert!(!engine.discovery_path_requests.contains_key(&dest));
4018 }
4019
4020 #[test]
4021 fn test_recursive_path_request_reserves_announce_cap_on_sent_interface() {
4022 let mut engine = TransportEngine::new(make_config(true));
4023 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
4024 let mut egress = make_interface(2, constants::MODE_FULL);
4025 egress.bitrate = Some(1_000);
4026 engine.register_interface(egress);
4027
4028 let dest = [0xE5; 16];
4029 let tag = [0x15; 16];
4030 let data = make_path_request_data(&dest, &tag);
4031 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4032
4033 assert_eq!(actions.len(), 1);
4034 let queue = engine
4035 .announce_queues
4036 .queue_for(&InterfaceId(2))
4037 .expect("sent recursive PR should create announce-cap state");
4038 assert!(
4039 queue.announce_allowed_at > 1000.0,
4040 "recursive PR should reserve announce-cap airtime"
4041 );
4042 assert!(queue.entries.is_empty());
4043 }
4044
4045 #[test]
4046 fn test_discovery_pr_tags_fifo_eviction() {
4047 let mut config = make_config(true);
4048 config.max_discovery_pr_tags = 2;
4049 let mut engine = TransportEngine::new(config);
4050
4051 let dest1 = [0xA1; 16];
4052 let dest2 = [0xA2; 16];
4053 let dest3 = [0xA3; 16];
4054 let tag1 = [0x01; 16];
4055 let tag2 = [0x02; 16];
4056 let tag3 = [0x03; 16];
4057
4058 engine.handle_path_request(
4059 &make_path_request_data(&dest1, &tag1),
4060 InterfaceId(1),
4061 1000.0,
4062 );
4063 engine.handle_path_request(
4064 &make_path_request_data(&dest2, &tag2),
4065 InterfaceId(1),
4066 1001.0,
4067 );
4068 assert_eq!(engine.discovery_pr_tags_count(), 2);
4069
4070 let unique1 = make_unique_tag(dest1, &tag1);
4071 let unique2 = make_unique_tag(dest2, &tag2);
4072 assert!(engine.has_discovery_pr_tag(&unique1));
4073 assert!(engine.has_discovery_pr_tag(&unique2));
4074
4075 engine.handle_path_request(
4076 &make_path_request_data(&dest3, &tag3),
4077 InterfaceId(1),
4078 1002.0,
4079 );
4080 assert_eq!(engine.discovery_pr_tags_count(), 2);
4081 assert!(!engine.has_discovery_pr_tag(&unique1));
4082 assert!(engine.has_discovery_pr_tag(&unique2));
4083
4084 engine.handle_path_request(
4085 &make_path_request_data(&dest1, &tag1),
4086 InterfaceId(1),
4087 1003.0,
4088 );
4089 assert_eq!(engine.discovery_pr_tags_count(), 2);
4090 assert!(engine.has_discovery_pr_tag(&unique1));
4091 }
4092
4093 #[test]
4094 fn test_path_destination_cap_evicts_oldest_and_clears_state() {
4095 let mut config = make_config(false);
4096 config.max_path_destinations = 2;
4097 let mut engine = TransportEngine::new(config);
4098 engine.register_interface(make_interface(1, constants::MODE_FULL));
4099
4100 let dest1 = [0xB1; 16];
4101 let dest2 = [0xB2; 16];
4102 let dest3 = [0xB3; 16];
4103
4104 engine.upsert_path_destination(
4105 dest1,
4106 make_path_entry(1000.0, 1, InterfaceId(1), [0x11; 16]),
4107 1000.0,
4108 );
4109 engine.upsert_path_destination(
4110 dest2,
4111 make_path_entry(1001.0, 1, InterfaceId(1), [0x22; 16]),
4112 1001.0,
4113 );
4114 engine
4115 .path_states
4116 .insert(dest1, constants::STATE_UNRESPONSIVE);
4117
4118 engine.upsert_path_destination(
4119 dest3,
4120 make_path_entry(1002.0, 1, InterfaceId(1), [0x33; 16]),
4121 1002.0,
4122 );
4123
4124 assert_eq!(engine.path_table_count(), 2);
4125 assert!(!engine.has_path(&dest1));
4126 assert!(engine.has_path(&dest2));
4127 assert!(engine.has_path(&dest3));
4128 assert!(!engine.path_states.contains_key(&dest1));
4129 assert_eq!(engine.path_destination_cap_evict_count(), 1);
4130 }
4131
4132 #[test]
4133 fn test_existing_path_destination_update_does_not_trigger_cap_eviction() {
4134 let mut config = make_config(false);
4135 config.max_path_destinations = 2;
4136 config.max_paths_per_destination = 2;
4137 let mut engine = TransportEngine::new(config);
4138 engine.register_interface(make_interface(1, constants::MODE_FULL));
4139
4140 let dest1 = [0xC1; 16];
4141 let dest2 = [0xC2; 16];
4142
4143 engine.upsert_path_destination(
4144 dest1,
4145 make_path_entry(1000.0, 2, InterfaceId(1), [0x11; 16]),
4146 1000.0,
4147 );
4148 engine.upsert_path_destination(
4149 dest2,
4150 make_path_entry(1001.0, 2, InterfaceId(1), [0x22; 16]),
4151 1001.0,
4152 );
4153
4154 engine.upsert_path_destination(
4155 dest2,
4156 make_path_entry(1002.0, 1, InterfaceId(1), [0x23; 16]),
4157 1002.0,
4158 );
4159
4160 assert_eq!(engine.path_table_count(), 2);
4161 assert!(engine.has_path(&dest1));
4162 assert!(engine.has_path(&dest2));
4163 }
4164
4165 #[test]
4166 fn test_roaming_loop_prevention() {
4167 let mut engine = TransportEngine::new(make_config(true));
4168 engine.register_interface(make_interface(1, constants::MODE_ROAMING));
4169
4170 let dest = [0xD3; 16];
4171 engine.path_table.insert(
4173 dest,
4174 PathSet::from_single(
4175 PathEntry {
4176 timestamp: 900.0,
4177 next_hop: [0xAA; 16],
4178 hops: 2,
4179 expires: 9999.0,
4180 random_blobs: Vec::new(),
4181 receiving_interface: InterfaceId(1),
4182 packet_hash: [0; 32],
4183 announce_raw: None,
4184 },
4185 1,
4186 ),
4187 );
4188
4189 let tag = [0x03; 16];
4190 let data = make_path_request_data(&dest, &tag);
4191
4192 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4193
4194 assert!(actions.is_empty());
4196 assert!(!engine.announce_table.contains_key(&dest));
4197 }
4198
4199 fn make_announce_raw(dest_hash: &[u8; 16], payload: &[u8]) -> Vec<u8> {
4201 let flags: u8 = 0x01; let mut raw = Vec::new();
4205 raw.push(flags);
4206 raw.push(0x02); raw.extend_from_slice(dest_hash);
4208 raw.push(constants::CONTEXT_NONE);
4209 raw.extend_from_slice(payload);
4210 raw
4211 }
4212
4213 #[test]
4214 fn test_path_request_populates_announce_entry_from_raw() {
4215 let mut engine = TransportEngine::new(make_config(true));
4216 engine.register_interface(make_interface(1, constants::MODE_FULL));
4217 engine.register_interface(make_interface(2, constants::MODE_FULL));
4218
4219 let dest = [0xD5; 16];
4220 let payload = vec![0xAB; 32]; let announce_raw = make_announce_raw(&dest, &payload);
4222
4223 engine.path_table.insert(
4224 dest,
4225 PathSet::from_single(
4226 PathEntry {
4227 timestamp: 900.0,
4228 next_hop: [0xBB; 16],
4229 hops: 2,
4230 expires: 9999.0,
4231 random_blobs: Vec::new(),
4232 receiving_interface: InterfaceId(2),
4233 packet_hash: [0; 32],
4234 announce_raw: Some(announce_raw.clone()),
4235 },
4236 1,
4237 ),
4238 );
4239
4240 let tag = [0x05; 16];
4241 let data = make_path_request_data(&dest, &tag);
4242 let _actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4243
4244 let entry = engine
4246 .announce_table
4247 .get(&dest)
4248 .expect("announce entry must exist");
4249 assert_eq!(entry.packet_raw, announce_raw);
4250 assert_eq!(entry.packet_data, payload);
4251 assert!(entry.block_rebroadcasts);
4252 }
4253
4254 #[test]
4255 fn test_path_request_discovers_when_known_path_has_no_announce_raw() {
4256 let mut engine = TransportEngine::new(make_config(true));
4257 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
4258 engine.register_interface(make_interface(2, constants::MODE_FULL));
4259
4260 let dest = [0xD6; 16];
4261
4262 engine.path_table.insert(
4263 dest,
4264 PathSet::from_single(
4265 PathEntry {
4266 timestamp: 900.0,
4267 next_hop: [0xCC; 16],
4268 hops: 1,
4269 expires: 9999.0,
4270 random_blobs: Vec::new(),
4271 receiving_interface: InterfaceId(2),
4272 packet_hash: [0; 32],
4273 announce_raw: None, },
4275 1,
4276 ),
4277 );
4278
4279 let tag = [0x06; 16];
4280 let data = make_path_request_data(&dest, &tag);
4281 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4282
4283 assert!(!engine.announce_table.contains_key(&dest));
4284 assert_eq!(actions.len(), 1);
4285 match &actions[0] {
4286 TransportAction::SendOnInterface { interface, raw } => {
4287 assert_eq!(*interface, InterfaceId(2));
4288 assert_recursive_path_request_packet(raw.as_ref(), &dest, &tag);
4289 }
4290 _ => panic!("expected SendOnInterface for recursive path request"),
4291 }
4292 assert!(engine.discovery_path_requests.contains_key(&dest));
4293 }
4294
4295 #[test]
4296 fn test_discovery_request_consumed_on_announce() {
4297 let mut engine = TransportEngine::new(make_config(true));
4298 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
4299
4300 let dest = [0xD4; 16];
4301
4302 engine.discovery_path_requests.insert(
4304 dest,
4305 DiscoveryPathRequest {
4306 timestamp: 900.0,
4307 requesting_interface: InterfaceId(1),
4308 },
4309 );
4310
4311 let iface = engine.discovery_path_requests_waiting(&dest);
4313 assert_eq!(iface, Some(InterfaceId(1)));
4314
4315 assert!(!engine.discovery_path_requests.contains_key(&dest));
4317 assert_eq!(engine.discovery_path_requests_waiting(&dest), None);
4318 }
4319
4320 #[test]
4321 fn test_pending_path_request_announce_bypasses_ingress_control() {
4322 let mut engine = TransportEngine::new(make_config(true));
4323 let mut inbound = make_interface(1, constants::MODE_FULL);
4324 inbound.ingress_control = crate::transport::types::IngressControlConfig::enabled();
4325 inbound.ia_freq = 10_000.0;
4326 inbound.started = 0.0;
4327 engine.register_interface(inbound);
4328 engine.register_interface(make_interface(2, constants::MODE_ACCESS_POINT));
4329
4330 let identity =
4331 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4332 let dest_hash = crate::destination::destination_hash(
4333 "ingress",
4334 &["path-request"],
4335 Some(identity.hash()),
4336 );
4337 let name_hash = crate::destination::name_hash("ingress", &["path-request"]);
4338 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4339
4340 engine.discovery_path_requests.insert(
4341 dest_hash,
4342 DiscoveryPathRequest {
4343 timestamp: 999.0,
4344 requesting_interface: InterfaceId(2),
4345 },
4346 );
4347
4348 let mut rng = rns_crypto::FixedRng::new(&[0x88; 32]);
4349 let actions = engine.handle_inbound(
4350 InboundFrame {
4351 raw: &announce_raw,
4352 iface: InterfaceId(1),
4353 now: 1000.0,
4354 rx: RxMetadata {
4355 rssi: None,
4356 snr: None,
4357 },
4358 },
4359 &mut rng,
4360 );
4361
4362 assert_eq!(engine.held_announce_count(&InterfaceId(1)), 0);
4363 assert!(engine.has_path(&dest_hash));
4364 assert!(!engine.discovery_path_requests.contains_key(&dest_hash));
4365 assert!(actions.iter().any(|a| {
4366 matches!(
4367 a,
4368 TransportAction::AnnounceReceived {
4369 destination_hash,
4370 receiving_interface: InterfaceId(1),
4371 ..
4372 } if *destination_hash == dest_hash
4373 )
4374 }));
4375
4376 let entry = engine
4377 .announce_table
4378 .get(&dest_hash)
4379 .expect("path response announce should be queued");
4380 assert!(entry.block_rebroadcasts);
4381 assert_eq!(entry.attached_interface, Some(InterfaceId(2)));
4382 }
4383
4384 fn build_announce_for_issue4(dest_hash: &[u8; 16], name_hash: &[u8; 10]) -> Vec<u8> {
4390 let identity =
4391 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4392 let random_hash = [0x42u8; 10];
4393 let (announce_data, _) = crate::announce::AnnounceData::pack(
4394 &identity,
4395 dest_hash,
4396 name_hash,
4397 &random_hash,
4398 None,
4399 None,
4400 )
4401 .unwrap();
4402 let flags = PacketFlags {
4403 header_type: constants::HEADER_1,
4404 context_flag: constants::FLAG_UNSET,
4405 transport_type: constants::TRANSPORT_BROADCAST,
4406 destination_type: constants::DESTINATION_SINGLE,
4407 packet_type: constants::PACKET_TYPE_ANNOUNCE,
4408 };
4409 RawPacket::pack(
4410 flags,
4411 0,
4412 dest_hash,
4413 None,
4414 constants::CONTEXT_NONE,
4415 &announce_data,
4416 )
4417 .unwrap()
4418 .raw
4419 }
4420
4421 #[test]
4422 fn test_ingress_held_announce_preserves_rx_metadata_on_release() {
4423 let mut engine = TransportEngine::new(make_config(true));
4424 let mut inbound = make_interface(1, constants::MODE_FULL);
4425 inbound.ingress_control = crate::transport::types::IngressControlConfig::enabled();
4426 inbound.ia_freq = constants::IC_BURST_FREQ + 1.0;
4427 inbound.started = 0.0;
4428 engine.register_interface(inbound);
4429
4430 let identity =
4431 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4432 let dest_hash =
4433 crate::destination::destination_hash("ingress", &["rx"], Some(identity.hash()));
4434 let name_hash = crate::destination::name_hash("ingress", &["rx"]);
4435 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4436 let rx = RxMetadata {
4437 rssi: Some(-91),
4438 snr: Some(5.5),
4439 };
4440
4441 let mut rng = rns_crypto::FixedRng::new(&[0x88; 32]);
4442 let held_actions = engine.handle_inbound(
4443 InboundFrame::new(&announce_raw, InterfaceId(1), 10000.0).with_rx(rx),
4444 &mut rng,
4445 );
4446
4447 assert!(held_actions.is_empty());
4448 assert_eq!(engine.held_announce_count(&InterfaceId(1)), 1);
4449 assert!(!engine.has_path(&dest_hash));
4450
4451 engine
4452 .interfaces
4453 .get_mut(&InterfaceId(1))
4454 .expect("interface must exist")
4455 .ia_freq = 0.0;
4456
4457 let released_actions = engine.tick(10000.0 + constants::IC_BURST_PENALTY + 1.0, &mut rng);
4458
4459 let released_rx = released_actions.iter().find_map(|action| match action {
4460 TransportAction::AnnounceReceived {
4461 destination_hash,
4462 rx: action_rx,
4463 ..
4464 } if *destination_hash == dest_hash => Some(*action_rx),
4465 _ => None,
4466 });
4467
4468 assert_eq!(released_rx, Some(rx));
4469 assert_eq!(engine.held_announce_count(&InterfaceId(1)), 0);
4470 assert!(engine.has_path(&dest_hash));
4471 }
4472
4473 #[test]
4474 fn test_issue4_local_client_single_data_to_1hop_rewrites_on_outbound() {
4475 let mut engine = TransportEngine::new(make_config(false));
4480 engine.register_interface(make_local_client_interface(1));
4481
4482 let identity =
4483 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4484 let dest_hash =
4485 crate::destination::destination_hash("issue4", &["test"], Some(identity.hash()));
4486 let name_hash = crate::destination::name_hash("issue4", &["test"]);
4487 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4488
4489 let mut announce_packet = RawPacket::unpack(&announce_raw).unwrap();
4493 announce_packet.raw[1] = 1;
4494 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
4495 engine.handle_inbound(
4496 InboundFrame {
4497 raw: &announce_packet.raw,
4498 iface: InterfaceId(1),
4499 now: 1000.0,
4500 rx: RxMetadata {
4501 rssi: None,
4502 snr: None,
4503 },
4504 },
4505 &mut rng,
4506 );
4507 assert!(engine.has_path(&dest_hash));
4508 assert_eq!(engine.hops_to(&dest_hash), Some(1));
4509
4510 let data_flags = PacketFlags {
4512 header_type: constants::HEADER_1,
4513 context_flag: constants::FLAG_UNSET,
4514 transport_type: constants::TRANSPORT_BROADCAST,
4515 destination_type: constants::DESTINATION_SINGLE,
4516 packet_type: constants::PACKET_TYPE_DATA,
4517 };
4518 let data_packet = RawPacket::pack(
4519 data_flags,
4520 0,
4521 &dest_hash,
4522 None,
4523 constants::CONTEXT_NONE,
4524 b"hello",
4525 )
4526 .unwrap();
4527
4528 let actions =
4529 engine.handle_outbound(&data_packet, constants::DESTINATION_SINGLE, None, 1001.0);
4530
4531 let send = actions.iter().find_map(|a| match a {
4532 TransportAction::SendOnInterface { interface, raw } => Some((interface, raw)),
4533 _ => None,
4534 });
4535 let (interface, raw) = send.expect("shared client should emit a transport-injected packet");
4536 assert_eq!(*interface, InterfaceId(1));
4537 let flags = PacketFlags::unpack(raw[0]);
4538 assert_eq!(flags.header_type, constants::HEADER_2);
4539 assert_eq!(flags.transport_type, constants::TRANSPORT_TRANSPORT);
4540 }
4541
4542 #[test]
4543 fn test_issue4_external_data_to_shared_client_strips_transport_header() {
4544 let daemon_id = [0x42; 16];
4545 let mut engine = TransportEngine::new(make_config(true));
4546 engine.register_interface(make_interface(1, constants::MODE_FULL));
4547 engine.register_interface(make_local_client_interface(2));
4548
4549 let dest_hash = [0x99; 16];
4550 engine.upsert_path_destination(
4551 dest_hash,
4552 make_path_entry(1000.0, 1, InterfaceId(2), daemon_id),
4553 1000.0,
4554 );
4555
4556 let h2_flags = PacketFlags {
4557 header_type: constants::HEADER_2,
4558 context_flag: constants::FLAG_UNSET,
4559 transport_type: constants::TRANSPORT_TRANSPORT,
4560 destination_type: constants::DESTINATION_SINGLE,
4561 packet_type: constants::PACKET_TYPE_DATA,
4562 };
4563 let mut h2_raw = Vec::new();
4564 h2_raw.push(h2_flags.pack());
4565 h2_raw.push(0);
4566 h2_raw.extend_from_slice(&daemon_id);
4567 h2_raw.extend_from_slice(&dest_hash);
4568 h2_raw.push(constants::CONTEXT_NONE);
4569 h2_raw.extend_from_slice(b"hello shared client");
4570
4571 let mut rng = rns_crypto::FixedRng::new(&[0x22; 32]);
4572 let actions = engine.handle_inbound(
4573 InboundFrame {
4574 raw: &h2_raw,
4575 iface: InterfaceId(1),
4576 now: 1001.0,
4577 rx: RxMetadata {
4578 rssi: None,
4579 snr: None,
4580 },
4581 },
4582 &mut rng,
4583 );
4584
4585 let raw = actions.iter().find_map(|a| match a {
4586 TransportAction::SendOnInterface { interface, raw } if *interface == InterfaceId(2) => {
4587 Some(raw)
4588 }
4589 _ => None,
4590 });
4591 let raw = raw.expect("daemon should forward external DATA to shared client");
4592 let flags = PacketFlags::unpack(raw[0]);
4593 assert_eq!(flags.header_type, constants::HEADER_1);
4594 assert_eq!(flags.transport_type, constants::TRANSPORT_BROADCAST);
4595 assert_eq!(&raw[2..18], &dest_hash);
4596 assert_eq!(&raw[19..], b"hello shared client");
4597 }
4598
4599 #[test]
4600 fn test_issue4_external_data_to_1hop_via_transport_works() {
4601 let daemon_id = [0x42; 16];
4607 let mut engine = TransportEngine::new(TransportConfig {
4608 transport_enabled: true,
4609 identity_hash: Some(daemon_id),
4610 prefer_shorter_path: false,
4611 max_paths_per_destination: 1,
4612 packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
4613 max_discovery_pr_tags: constants::MAX_PR_TAGS,
4614 max_path_destinations: usize::MAX,
4615 max_tunnel_destinations_total: usize::MAX,
4616 destination_timeout_secs: constants::DESTINATION_TIMEOUT,
4617 announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
4618 announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
4619 announce_sig_cache_enabled: true,
4620 announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
4621 announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
4622 announce_queue_max_entries: 256,
4623 announce_queue_max_interfaces: 1024,
4624 });
4625 engine.register_interface(make_interface(1, constants::MODE_FULL)); engine.register_interface(make_interface(2, constants::MODE_FULL)); let identity =
4629 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4630 let dest_hash =
4631 crate::destination::destination_hash("issue4", &["ctrl"], Some(identity.hash()));
4632 let name_hash = crate::destination::name_hash("issue4", &["ctrl"]);
4633 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4634
4635 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
4637 engine.handle_inbound(
4638 InboundFrame {
4639 raw: &announce_raw,
4640 iface: InterfaceId(2),
4641 now: 1000.0,
4642 rx: RxMetadata {
4643 rssi: None,
4644 snr: None,
4645 },
4646 },
4647 &mut rng,
4648 );
4649 assert_eq!(engine.hops_to(&dest_hash), Some(1));
4650
4651 let h2_flags = PacketFlags {
4654 header_type: constants::HEADER_2,
4655 context_flag: constants::FLAG_UNSET,
4656 transport_type: constants::TRANSPORT_TRANSPORT,
4657 destination_type: constants::DESTINATION_SINGLE,
4658 packet_type: constants::PACKET_TYPE_DATA,
4659 };
4660 let mut h2_raw = Vec::new();
4662 h2_raw.push(h2_flags.pack());
4663 h2_raw.push(0); h2_raw.extend_from_slice(&daemon_id); h2_raw.extend_from_slice(&dest_hash);
4666 h2_raw.push(constants::CONTEXT_NONE);
4667 h2_raw.extend_from_slice(b"hello via transport");
4668
4669 let mut rng2 = rns_crypto::FixedRng::new(&[0x22; 32]);
4670 let actions = engine.handle_inbound(
4671 InboundFrame {
4672 raw: &h2_raw,
4673 iface: InterfaceId(1),
4674 now: 1001.0,
4675 rx: RxMetadata {
4676 rssi: None,
4677 snr: None,
4678 },
4679 },
4680 &mut rng2,
4681 );
4682
4683 let has_send = actions.iter().any(|a| {
4685 matches!(
4686 a,
4687 TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(2)
4688 )
4689 });
4690 assert!(
4691 has_send,
4692 "HEADER_2 transport packet should be forwarded (control test)"
4693 );
4694 }
4695}