1pub mod announce_proc;
2pub mod announce_queue;
3pub mod announce_verify_queue;
4pub mod dedup;
5pub mod inbound;
6pub mod ingress_control;
7pub mod jobs;
8pub mod outbound;
9pub mod path_requests;
10pub mod pathfinder;
11pub mod queries;
12pub mod rate_limit;
13pub mod retention;
14pub mod tables;
15pub mod tunnel;
16pub mod types;
17
18use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
19use alloc::string::String;
20use alloc::vec::Vec;
21use core::mem::size_of;
22
23use rns_crypto::Rng;
24
25use crate::announce::AnnounceData;
26use crate::constants;
27use crate::hash;
28use crate::packet::RawPacket;
29
30use self::announce_proc::compute_path_expires;
31use self::announce_queue::AnnounceQueues;
32use self::announce_verify_queue::{AnnounceVerifyKey, AnnounceVerifyQueue, PendingAnnounce};
33use self::dedup::{AnnounceSignatureCache, PacketHashlist};
34use self::inbound::{
35 create_link_entry, create_reverse_entry, forward_transport_packet, route_proof_via_reverse,
36 route_via_link_table,
37};
38use self::ingress_control::IngressControl;
39use self::outbound::{route_outbound, should_transmit_announce};
40use self::pathfinder::{
41 decide_announce_multipath, extract_random_blob, timebase_from_random_blob,
42 timebase_from_random_blobs, MultiPathDecision,
43};
44use self::rate_limit::AnnounceRateLimiter;
45use self::tables::{AnnounceEntry, DiscoveryPathRequest, LinkEntry, PathEntry, PathSet};
46use self::tunnel::TunnelTable;
47use self::types::{
48 BlackholeEntry, InterfaceId, InterfaceInfo, PacketBytes, TransportAction, TransportConfig,
49};
50
/// Flat tuple form of one path-table row.
///
/// NOTE(review): the meaning of each position is not visible in this part of
/// the file — presumably (destination hash, timestamp, next hop, hops,
/// expiry, interface name); confirm against the code that builds these rows.
pub type PathTableRow = ([u8; 16], f64, [u8; 16], u8, f64, String);

/// Flat tuple form of one announce rate-table row.
///
/// NOTE(review): the meaning of each position is not visible in this part of
/// the file — confirm against the code that builds these rows.
pub type RateTableRow = ([u8; 16], f64, u32, f64, Vec<f64>);
53
54struct InboundPacketCtx {
55 packet: RawPacket,
56 original_raw: Option<Vec<u8>>,
57 iface: InterfaceId,
58 now: f64,
59 from_local_client: bool,
60}
61
62struct VerifiedAnnounceCtx<'a> {
63 packet: &'a RawPacket,
64 original_raw: &'a [u8],
65 iface: InterfaceId,
66 now: f64,
67 validated: crate::announce::ValidatedAnnounce,
68 received_from: [u8; 16],
69 random_blob: [u8; 10],
70 announce_emitted: u64,
71}
72
73struct TickCtx<'a> {
74 now: f64,
75 rng: &'a mut dyn Rng,
76 actions: Vec<TransportAction>,
77}
78
79struct PathRequestCtx<'a> {
80 data: &'a [u8],
81 interface_id: InterfaceId,
82 now: f64,
83 destination_hash: [u8; 16],
84}
85
86pub struct TransportEngine {
91 config: TransportConfig,
92 path_table: BTreeMap<[u8; 16], PathSet>,
93 announce_table: BTreeMap<[u8; 16], AnnounceEntry>,
94 reverse_table: BTreeMap<[u8; 16], tables::ReverseEntry>,
95 link_table: BTreeMap<[u8; 16], LinkEntry>,
96 held_announces: BTreeMap<[u8; 16], AnnounceEntry>,
97 packet_hashlist: PacketHashlist,
98 announce_sig_cache: AnnounceSignatureCache,
99 rate_limiter: AnnounceRateLimiter,
100 path_states: BTreeMap<[u8; 16], u8>,
101 interfaces: BTreeMap<InterfaceId, InterfaceInfo>,
102 local_destinations: BTreeMap<[u8; 16], u8>,
103 blackholed_identities: BTreeMap<[u8; 16], BlackholeEntry>,
104 announce_queues: AnnounceQueues,
105 ingress_control: IngressControl,
106 tunnel_table: TunnelTable,
107 discovery_pr_tags: VecDeque<[u8; 32]>,
108 discovery_pr_tag_set: BTreeSet<[u8; 32]>,
109 discovery_path_requests: BTreeMap<[u8; 16], DiscoveryPathRequest>,
110 path_destination_cap_evict_count: usize,
111 announces_last_checked: f64,
113 tables_last_culled: f64,
114}
115
116impl TransportEngine {
117 pub fn new(config: TransportConfig) -> Self {
118 let packet_hashlist_max_entries = config.packet_hashlist_max_entries;
119 let sig_cache_max = if config.announce_sig_cache_enabled {
120 config.announce_sig_cache_max_entries
121 } else {
122 0
123 };
124 let sig_cache_ttl = config.announce_sig_cache_ttl_secs;
125 let announce_queue_max_interfaces = config.announce_queue_max_interfaces;
126 TransportEngine {
127 config,
128 path_table: BTreeMap::new(),
129 announce_table: BTreeMap::new(),
130 reverse_table: BTreeMap::new(),
131 link_table: BTreeMap::new(),
132 held_announces: BTreeMap::new(),
133 packet_hashlist: PacketHashlist::new(packet_hashlist_max_entries),
134 announce_sig_cache: AnnounceSignatureCache::new(sig_cache_max, sig_cache_ttl),
135 rate_limiter: AnnounceRateLimiter::new(),
136 path_states: BTreeMap::new(),
137 interfaces: BTreeMap::new(),
138 local_destinations: BTreeMap::new(),
139 blackholed_identities: BTreeMap::new(),
140 announce_queues: AnnounceQueues::new(announce_queue_max_interfaces),
141 ingress_control: IngressControl::new(),
142 tunnel_table: TunnelTable::new(),
143 discovery_pr_tags: VecDeque::new(),
144 discovery_pr_tag_set: BTreeSet::new(),
145 discovery_path_requests: BTreeMap::new(),
146 path_destination_cap_evict_count: 0,
147 announces_last_checked: 0.0,
148 tables_last_culled: 0.0,
149 }
150 }
151
152 pub fn register_interface(&mut self, info: InterfaceInfo) {
157 self.interfaces.insert(info.id, info);
158 }
159
160 pub fn deregister_interface(&mut self, id: InterfaceId) {
161 self.interfaces.remove(&id);
162 self.announce_queues.remove_interface(id);
163 self.ingress_control.remove_interface(&id);
164 }
165
166 pub fn register_destination(&mut self, dest_hash: [u8; 16], dest_type: u8) {
171 self.local_destinations.insert(dest_hash, dest_type);
172 }
173
174 pub fn deregister_destination(&mut self, dest_hash: &[u8; 16]) {
175 self.local_destinations.remove(dest_hash);
176 }
177
178 pub fn has_path(&self, dest_hash: &[u8; 16]) -> bool {
183 self.path_table
184 .get(dest_hash)
185 .is_some_and(|ps| !ps.is_empty())
186 }
187
188 pub fn hops_to(&self, dest_hash: &[u8; 16]) -> Option<u8> {
189 self.path_table
190 .get(dest_hash)
191 .and_then(|ps| ps.primary())
192 .map(|e| e.hops)
193 }
194
195 pub fn next_hop(&self, dest_hash: &[u8; 16]) -> Option<[u8; 16]> {
196 self.path_table
197 .get(dest_hash)
198 .and_then(|ps| ps.primary())
199 .map(|e| e.next_hop)
200 }
201
202 pub fn next_hop_interface(&self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
203 self.path_table
204 .get(dest_hash)
205 .and_then(|ps| ps.primary())
206 .map(|e| e.receiving_interface)
207 }
208
209 pub fn mark_path_unresponsive(
219 &mut self,
220 dest_hash: &[u8; 16],
221 receiving_interface: Option<InterfaceId>,
222 ) {
223 if let Some(iface_id) = receiving_interface {
224 if let Some(info) = self.interfaces.get(&iface_id) {
225 if info.mode == constants::MODE_BOUNDARY {
226 return;
227 }
228 }
229 }
230
231 if let Some(ps) = self.path_table.get_mut(dest_hash) {
233 if ps.len() > 1 {
234 ps.failover(false); self.path_states.remove(dest_hash);
237 return;
238 }
239 }
240
241 self.path_states
242 .insert(*dest_hash, constants::STATE_UNRESPONSIVE);
243 }
244
245 pub fn mark_path_responsive(&mut self, dest_hash: &[u8; 16]) {
246 self.path_states
247 .insert(*dest_hash, constants::STATE_RESPONSIVE);
248 }
249
250 pub fn path_is_unresponsive(&self, dest_hash: &[u8; 16]) -> bool {
251 self.path_states.get(dest_hash) == Some(&constants::STATE_UNRESPONSIVE)
252 }
253
254 pub fn expire_path(&mut self, dest_hash: &[u8; 16]) {
255 if let Some(ps) = self.path_table.get_mut(dest_hash) {
256 ps.expire_all();
257 }
258 }
259
260 pub fn register_link(&mut self, link_id: [u8; 16], entry: LinkEntry) {
265 self.link_table.insert(link_id, entry);
266 }
267
268 pub fn validate_link(&mut self, link_id: &[u8; 16]) {
269 if let Some(entry) = self.link_table.get_mut(link_id) {
270 entry.validated = true;
271 }
272 }
273
274 pub fn remove_link(&mut self, link_id: &[u8; 16]) {
275 self.link_table.remove(link_id);
276 }
277
278 pub fn blackhole_identity(
284 &mut self,
285 identity_hash: [u8; 16],
286 now: f64,
287 duration_hours: Option<f64>,
288 reason: Option<String>,
289 ) {
290 let expires = match duration_hours {
291 Some(h) if h > 0.0 => now + h * 3600.0,
292 _ => 0.0, };
294 self.blackholed_identities.insert(
295 identity_hash,
296 BlackholeEntry {
297 created: now,
298 expires,
299 reason,
300 },
301 );
302 }
303
304 pub fn unblackhole_identity(&mut self, identity_hash: &[u8; 16]) -> bool {
306 self.blackholed_identities.remove(identity_hash).is_some()
307 }
308
309 pub fn is_blackholed(&self, identity_hash: &[u8; 16], now: f64) -> bool {
311 if let Some(entry) = self.blackholed_identities.get(identity_hash) {
312 if entry.expires == 0.0 || entry.expires > now {
313 return true;
314 }
315 }
316 false
317 }
318
319 pub fn blackholed_entries(&self) -> impl Iterator<Item = (&[u8; 16], &BlackholeEntry)> {
321 self.blackholed_identities.iter()
322 }
323
324 fn cull_blackholed(&mut self, now: f64) {
326 self.blackholed_identities
327 .retain(|_, entry| entry.expires == 0.0 || entry.expires > now);
328 }
329
330 pub fn handle_tunnel(
338 &mut self,
339 tunnel_id: [u8; 32],
340 interface: InterfaceId,
341 now: f64,
342 ) -> Vec<TransportAction> {
343 let mut actions = Vec::new();
344
345 if let Some(info) = self.interfaces.get_mut(&interface) {
347 info.tunnel_id = Some(tunnel_id);
348 }
349
350 let restored_paths = self.tunnel_table.handle_tunnel(
351 tunnel_id,
352 interface,
353 now,
354 self.config.destination_timeout_secs,
355 );
356
357 for (dest_hash, tunnel_path) in &restored_paths {
359 let should_restore = match self.path_table.get(dest_hash).and_then(|ps| ps.primary()) {
360 Some(existing) => {
361 if tunnel_path.hops <= existing.hops || existing.expires < now {
364 let existing_timebase = timebase_from_random_blobs(&existing.random_blobs);
365 let tunnel_timebase = timebase_from_random_blobs(&tunnel_path.random_blobs);
366 tunnel_timebase >= existing_timebase
367 } else {
368 false
369 }
370 }
371 None => now < tunnel_path.expires,
372 };
373
374 if should_restore {
375 let entry = PathEntry {
376 timestamp: tunnel_path.timestamp,
377 next_hop: tunnel_path.received_from,
378 hops: tunnel_path.hops,
379 expires: tunnel_path.expires,
380 random_blobs: tunnel_path.random_blobs.clone(),
381 receiving_interface: interface,
382 packet_hash: tunnel_path.packet_hash,
383 announce_raw: None,
384 };
385 self.upsert_path_destination(*dest_hash, entry, now);
386 }
387 }
388
389 actions.push(TransportAction::TunnelEstablished {
390 tunnel_id,
391 interface,
392 });
393
394 actions
395 }
396
397 pub fn synthesize_tunnel(
405 &self,
406 identity: &rns_crypto::identity::Identity,
407 interface_id: InterfaceId,
408 rng: &mut dyn Rng,
409 ) -> Vec<TransportAction> {
410 let mut actions = Vec::new();
411
412 let interface_hash = if let Some(info) = self.interfaces.get(&interface_id) {
414 hash::full_hash(info.name.as_bytes())
415 } else {
416 return actions;
417 };
418
419 match tunnel::build_tunnel_synthesize_data(identity, &interface_hash, rng) {
420 Ok((data, _tunnel_id)) => {
421 let dest_hash = crate::destination::destination_hash(
422 "rnstransport",
423 &["tunnel", "synthesize"],
424 None,
425 );
426 actions.push(TransportAction::TunnelSynthesize {
427 interface: interface_id,
428 data,
429 dest_hash,
430 });
431 }
432 Err(e) => {
433 let _ = e;
435 }
436 }
437
438 actions
439 }
440
441 pub fn void_tunnel_interface(&mut self, tunnel_id: &[u8; 32]) {
443 self.tunnel_table.void_tunnel_interface(tunnel_id);
444 }
445
446 pub fn tunnel_table(&self) -> &TunnelTable {
448 &self.tunnel_table
449 }
450
451 fn has_local_clients(&self) -> bool {
457 self.interfaces.values().any(|i| i.is_local_client)
458 }
459
460 fn packet_filter(&self, packet: &RawPacket) -> bool {
464 if packet.transport_id.is_some()
466 && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
467 {
468 if let Some(ref identity_hash) = self.config.identity_hash {
469 if packet.transport_id.as_ref() != Some(identity_hash) {
470 return false;
471 }
472 }
473 }
474
475 match packet.context {
477 constants::CONTEXT_KEEPALIVE
478 | constants::CONTEXT_RESOURCE_REQ
479 | constants::CONTEXT_RESOURCE_PRF
480 | constants::CONTEXT_RESOURCE
481 | constants::CONTEXT_CACHE_REQUEST
482 | constants::CONTEXT_CHANNEL => return true,
483 _ => {}
484 }
485
486 if packet.flags.destination_type == constants::DESTINATION_PLAIN
488 || packet.flags.destination_type == constants::DESTINATION_GROUP
489 {
490 if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
491 return packet.hops <= 1;
492 } else {
493 return false;
495 }
496 }
497
498 if !self.packet_hashlist.is_duplicate(&packet.packet_hash) {
500 return true;
501 }
502
503 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
505 && packet.flags.destination_type == constants::DESTINATION_SINGLE
506 {
507 return true;
508 }
509
510 false
511 }
512
513 pub fn handle_inbound(
521 &mut self,
522 raw: &[u8],
523 iface: InterfaceId,
524 now: f64,
525 rng: &mut dyn Rng,
526 ) -> Vec<TransportAction> {
527 self.handle_inbound_with_announce_queue(raw, iface, now, rng, None)
528 }
529
530 pub fn handle_inbound_with_announce_queue(
531 &mut self,
532 raw: &[u8],
533 iface: InterfaceId,
534 now: f64,
535 rng: &mut dyn Rng,
536 announce_queue: Option<&mut AnnounceVerifyQueue>,
537 ) -> Vec<TransportAction> {
538 let Some(ctx) = self.prepare_inbound_packet(raw, iface, now) else {
539 return Vec::new();
540 };
541 let mut actions = Vec::new();
542
543 self.remember_inbound_packet_hash(&ctx.packet);
544 self.bridge_plain_broadcast(&ctx, &mut actions);
545 self.handle_transport_forwarding(&ctx, &mut actions);
546 self.handle_link_table_routing(&ctx, &mut actions);
547 self.handle_inbound_announce(&ctx, rng, announce_queue, &mut actions);
548
549 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_PROOF {
550 self.process_inbound_proof(&ctx.packet, ctx.iface, ctx.now, &mut actions);
551 }
552
553 self.handle_inbound_local_delivery(&ctx, &mut actions);
554 actions
555 }
556
557 fn prepare_inbound_packet(
558 &self,
559 raw: &[u8],
560 iface: InterfaceId,
561 now: f64,
562 ) -> Option<InboundPacketCtx> {
563 let mut packet = RawPacket::unpack(raw).ok()?;
564 let from_local_client = self
565 .interfaces
566 .get(&iface)
567 .map(|i| i.is_local_client)
568 .unwrap_or(false);
569 packet.hops += 1;
570 if from_local_client {
571 packet.hops = packet.hops.saturating_sub(1);
572 }
573 if !self.packet_filter(&packet) {
574 return None;
575 }
576 let retain_original_raw = packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE;
577 Some(InboundPacketCtx {
578 packet,
579 original_raw: if retain_original_raw {
580 Some(raw.to_vec())
581 } else {
582 None
583 },
584 iface,
585 now,
586 from_local_client,
587 })
588 }
589
590 fn remember_inbound_packet_hash(&mut self, packet: &RawPacket) {
591 let remember_hash = !(self.link_table.contains_key(&packet.destination_hash)
592 || (packet.flags.packet_type == constants::PACKET_TYPE_PROOF
593 && packet.context == constants::CONTEXT_LRPROOF));
594 if remember_hash {
595 self.packet_hashlist.add(packet.packet_hash);
596 }
597 }
598
599 fn bridge_plain_broadcast(&self, ctx: &InboundPacketCtx, actions: &mut Vec<TransportAction>) {
600 if ctx.packet.flags.destination_type != constants::DESTINATION_PLAIN
601 || ctx.packet.flags.transport_type != constants::TRANSPORT_BROADCAST
602 || !self.has_local_clients()
603 {
604 return;
605 }
606
607 if ctx.from_local_client {
608 actions.push(TransportAction::ForwardPlainBroadcast {
609 raw: PacketBytes::from(ctx.packet.raw.clone()),
610 to_local: false,
611 exclude: Some(ctx.iface),
612 });
613 } else {
614 actions.push(TransportAction::ForwardPlainBroadcast {
615 raw: PacketBytes::from(ctx.packet.raw.clone()),
616 to_local: true,
617 exclude: None,
618 });
619 }
620 }
621
622 fn handle_transport_forwarding(
623 &mut self,
624 ctx: &InboundPacketCtx,
625 actions: &mut Vec<TransportAction>,
626 ) {
627 if !(self.config.transport_enabled || self.config.identity_hash.is_some()) {
628 return;
629 }
630 if ctx.packet.transport_id.is_none()
631 || ctx.packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
632 {
633 return;
634 }
635
636 let Some(identity_hash) = self.config.identity_hash else {
637 return;
638 };
639 if ctx.packet.transport_id != Some(identity_hash) {
640 return;
641 }
642
643 let Some(path_entry) = self
644 .path_table
645 .get(&ctx.packet.destination_hash)
646 .and_then(|ps| ps.primary())
647 else {
648 return;
649 };
650
651 let next_hop = path_entry.next_hop;
652 let remaining_hops = path_entry.hops;
653 let outbound_interface = path_entry.receiving_interface;
654 let new_raw =
655 forward_transport_packet(&ctx.packet, next_hop, remaining_hops, outbound_interface);
656
657 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST {
658 let proof_timeout = ctx.now
659 + constants::LINK_ESTABLISHMENT_TIMEOUT_PER_HOP * (remaining_hops.max(1) as f64);
660 let (link_id, link_entry) = create_link_entry(
661 &ctx.packet,
662 next_hop,
663 outbound_interface,
664 remaining_hops,
665 ctx.iface,
666 ctx.now,
667 proof_timeout,
668 );
669 self.link_table.insert(link_id, link_entry);
670 actions.push(TransportAction::LinkRequestReceived {
671 link_id,
672 destination_hash: ctx.packet.destination_hash,
673 receiving_interface: ctx.iface,
674 });
675 } else {
676 let (trunc_hash, reverse_entry) =
677 create_reverse_entry(&ctx.packet, outbound_interface, ctx.iface, ctx.now);
678 self.reverse_table.insert(trunc_hash, reverse_entry);
679 }
680
681 actions.push(TransportAction::SendOnInterface {
682 interface: outbound_interface,
683 raw: new_raw.into(),
684 });
685
686 if let Some(entry) = self
687 .path_table
688 .get_mut(&ctx.packet.destination_hash)
689 .and_then(|ps| ps.primary_mut())
690 {
691 entry.timestamp = ctx.now;
692 }
693 }
694
695 fn handle_link_table_routing(
696 &mut self,
697 ctx: &InboundPacketCtx,
698 actions: &mut Vec<TransportAction>,
699 ) {
700 if !self.config.transport_enabled && self.config.identity_hash.is_none() {
701 return;
702 }
703 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
704 || ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
705 || ctx.packet.context == constants::CONTEXT_LRPROOF
706 {
707 return;
708 }
709
710 let Some(link_entry) = self.link_table.get(&ctx.packet.destination_hash).cloned() else {
711 return;
712 };
713 let Some((outbound_iface, new_raw)) =
714 route_via_link_table(&ctx.packet, &link_entry, ctx.iface)
715 else {
716 return;
717 };
718
719 self.packet_hashlist.add(ctx.packet.packet_hash);
720 actions.push(TransportAction::SendOnInterface {
721 interface: outbound_iface,
722 raw: new_raw.into(),
723 });
724
725 if let Some(entry) = self.link_table.get_mut(&ctx.packet.destination_hash) {
726 entry.timestamp = ctx.now;
727 }
728 }
729
730 fn handle_inbound_announce(
731 &mut self,
732 ctx: &InboundPacketCtx,
733 rng: &mut dyn Rng,
734 announce_queue: Option<&mut AnnounceVerifyQueue>,
735 actions: &mut Vec<TransportAction>,
736 ) {
737 if ctx.packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
738 return;
739 }
740
741 if let Some(queue) = announce_queue {
742 self.try_enqueue_announce(ctx, rng, queue, actions);
743 } else {
744 let original_raw = ctx
745 .original_raw
746 .as_deref()
747 .expect("announce packets retain original raw bytes");
748 self.process_inbound_announce(
749 &ctx.packet,
750 original_raw,
751 ctx.iface,
752 ctx.now,
753 rng,
754 actions,
755 );
756 }
757 }
758
759 fn handle_inbound_local_delivery(
760 &self,
761 ctx: &InboundPacketCtx,
762 actions: &mut Vec<TransportAction>,
763 ) {
764 if (ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
765 || ctx.packet.flags.packet_type == constants::PACKET_TYPE_DATA)
766 && self
767 .local_destinations
768 .contains_key(&ctx.packet.destination_hash)
769 {
770 actions.push(TransportAction::DeliverLocal {
771 destination_hash: ctx.packet.destination_hash,
772 raw: PacketBytes::from(ctx.packet.raw.clone()),
773 packet_hash: ctx.packet.packet_hash,
774 receiving_interface: ctx.iface,
775 });
776 }
777 }
778
779 fn process_inbound_announce(
784 &mut self,
785 packet: &RawPacket,
786 original_raw: &[u8],
787 iface: InterfaceId,
788 now: f64,
789 rng: &mut dyn Rng,
790 actions: &mut Vec<TransportAction>,
791 ) {
792 if packet.flags.destination_type != constants::DESTINATION_SINGLE {
793 return;
794 }
795
796 let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
797
798 let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
800 Ok(a) => a,
801 Err(_) => return,
802 };
803
804 let sig_cache_key =
805 Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
806
807 let validated = if self.announce_sig_cache.contains(&sig_cache_key) {
808 announce.to_validated_unchecked()
809 } else {
810 match announce.validate(&packet.destination_hash) {
811 Ok(v) => {
812 self.announce_sig_cache.insert(sig_cache_key, now);
813 v
814 }
815 Err(_) => return,
816 }
817 };
818
819 let received_from = self.announce_received_from(packet, now);
820 let random_blob = match extract_random_blob(&packet.data) {
821 Some(b) => b,
822 None => return,
823 };
824 let announce_emitted = timebase_from_random_blob(&random_blob);
825
826 self.process_verified_announce(
827 VerifiedAnnounceCtx {
828 packet,
829 original_raw,
830 iface,
831 now,
832 validated,
833 received_from,
834 random_blob,
835 announce_emitted,
836 },
837 rng,
838 actions,
839 );
840 }
841
842 fn announce_sig_cache_key(destination_hash: [u8; 16], signature: &[u8; 64]) -> [u8; 32] {
843 let mut material = [0u8; 80];
844 material[..16].copy_from_slice(&destination_hash);
845 material[16..].copy_from_slice(signature);
846 hash::full_hash(&material)
847 }
848
849 fn announce_received_from(&mut self, packet: &RawPacket, now: f64) -> [u8; 16] {
850 if let Some(transport_id) = packet.transport_id {
851 if self.config.transport_enabled {
852 if let Some(announce_entry) = self.announce_table.get_mut(&packet.destination_hash)
853 {
854 if packet.hops.checked_sub(1) == Some(announce_entry.hops) {
855 announce_entry.local_rebroadcasts += 1;
856 if announce_entry.retries > 0
857 && announce_entry.local_rebroadcasts
858 >= constants::LOCAL_REBROADCASTS_MAX
859 {
860 self.announce_table.remove(&packet.destination_hash);
861 }
862 }
863 if let Some(announce_entry) = self.announce_table.get(&packet.destination_hash)
864 {
865 if packet.hops.checked_sub(1) == Some(announce_entry.hops + 1)
866 && announce_entry.retries > 0
867 && now < announce_entry.retransmit_timeout
868 {
869 self.announce_table.remove(&packet.destination_hash);
870 }
871 }
872 }
873 }
874 transport_id
875 } else {
876 packet.destination_hash
877 }
878 }
879
880 fn should_hold_announce(
881 &mut self,
882 packet: &RawPacket,
883 original_raw: &[u8],
884 iface: InterfaceId,
885 now: f64,
886 ) -> bool {
887 if self.has_path(&packet.destination_hash) {
888 return false;
889 }
890 if self
891 .discovery_path_requests
892 .contains_key(&packet.destination_hash)
893 {
894 return false;
895 }
896 let Some(info) = self.interfaces.get(&iface) else {
897 return false;
898 };
899 if packet.context == constants::CONTEXT_PATH_RESPONSE
900 || !self.ingress_control.should_ingress_limit(
901 iface,
902 &info.ingress_control,
903 info.ia_freq,
904 info.started,
905 now,
906 )
907 {
908 return false;
909 }
910 self.ingress_control.hold_announce(
911 iface,
912 &info.ingress_control,
913 packet.destination_hash,
914 ingress_control::HeldAnnounce {
915 raw: original_raw.to_vec(),
916 hops: packet.hops,
917 receiving_interface: iface,
918 timestamp: now,
919 },
920 );
921 true
922 }
923
924 fn try_enqueue_announce(
925 &mut self,
926 ctx: &InboundPacketCtx,
927 rng: &mut dyn Rng,
928 announce_queue: &mut AnnounceVerifyQueue,
929 actions: &mut Vec<TransportAction>,
930 ) {
931 if ctx.packet.flags.destination_type != constants::DESTINATION_SINGLE {
932 return;
933 }
934
935 let has_ratchet = ctx.packet.flags.context_flag == constants::FLAG_SET;
936 let announce = match AnnounceData::unpack(&ctx.packet.data, has_ratchet) {
937 Ok(a) => a,
938 Err(_) => return,
939 };
940
941 let received_from = self.announce_received_from(&ctx.packet, ctx.now);
942
943 if self
944 .local_destinations
945 .contains_key(&ctx.packet.destination_hash)
946 {
947 log::debug!(
948 "Announce:skipping local destination {:02x}{:02x}{:02x}{:02x}..",
949 ctx.packet.destination_hash[0],
950 ctx.packet.destination_hash[1],
951 ctx.packet.destination_hash[2],
952 ctx.packet.destination_hash[3],
953 );
954 return;
955 }
956
957 let original_raw = ctx
958 .original_raw
959 .as_deref()
960 .expect("announce packets retain original raw bytes");
961 if self.should_hold_announce(&ctx.packet, original_raw, ctx.iface, ctx.now) {
962 return;
963 }
964
965 let sig_cache_key =
966 Self::announce_sig_cache_key(ctx.packet.destination_hash, &announce.signature);
967 if self.announce_sig_cache.contains(&sig_cache_key) {
968 let validated = announce.to_validated_unchecked();
969 let random_blob = match extract_random_blob(&ctx.packet.data) {
970 Some(b) => b,
971 None => return,
972 };
973 let announce_emitted = timebase_from_random_blob(&random_blob);
974 self.process_verified_announce(
975 VerifiedAnnounceCtx {
976 packet: &ctx.packet,
977 original_raw,
978 iface: ctx.iface,
979 now: ctx.now,
980 validated,
981 received_from,
982 random_blob,
983 announce_emitted,
984 },
985 rng,
986 actions,
987 );
988 return;
989 }
990
991 if ctx.packet.context == constants::CONTEXT_PATH_RESPONSE {
992 let Ok(validated) = announce.validate(&ctx.packet.destination_hash) else {
993 return;
994 };
995 self.announce_sig_cache.insert(sig_cache_key, ctx.now);
996 let random_blob = match extract_random_blob(&ctx.packet.data) {
997 Some(b) => b,
998 None => return,
999 };
1000 let announce_emitted = timebase_from_random_blob(&random_blob);
1001 self.process_verified_announce(
1002 VerifiedAnnounceCtx {
1003 packet: &ctx.packet,
1004 original_raw,
1005 iface: ctx.iface,
1006 now: ctx.now,
1007 validated,
1008 received_from,
1009 random_blob,
1010 announce_emitted,
1011 },
1012 rng,
1013 actions,
1014 );
1015 return;
1016 }
1017
1018 let random_blob = match extract_random_blob(&ctx.packet.data) {
1019 Some(b) => b,
1020 None => return,
1021 };
1022 let announce_emitted = timebase_from_random_blob(&random_blob);
1023 let key = AnnounceVerifyKey {
1024 destination_hash: ctx.packet.destination_hash,
1025 random_blob,
1026 received_from,
1027 };
1028 let pending = PendingAnnounce {
1029 original_raw: original_raw.to_vec(),
1030 packet: ctx.packet.clone(),
1031 interface: ctx.iface,
1032 received_from,
1033 queued_at: ctx.now,
1034 best_hops: ctx.packet.hops,
1035 emission_ts: announce_emitted,
1036 random_blob,
1037 };
1038 let _ = announce_queue.enqueue(key, pending);
1039 }
1040
1041 pub fn complete_verified_announce(
1042 &mut self,
1043 pending: PendingAnnounce,
1044 validated: crate::announce::ValidatedAnnounce,
1045 sig_cache_key: [u8; 32],
1046 now: f64,
1047 rng: &mut dyn Rng,
1048 ) -> Vec<TransportAction> {
1049 self.announce_sig_cache.insert(sig_cache_key, now);
1050 let mut actions = Vec::new();
1051 self.process_verified_announce(
1052 VerifiedAnnounceCtx {
1053 packet: &pending.packet,
1054 original_raw: &pending.original_raw,
1055 iface: pending.interface,
1056 now,
1057 validated,
1058 received_from: pending.received_from,
1059 random_blob: pending.random_blob,
1060 announce_emitted: pending.emission_ts,
1061 },
1062 rng,
1063 &mut actions,
1064 );
1065 actions
1066 }
1067
1068 pub fn clear_failed_verified_announce(&mut self, _sig_cache_key: [u8; 32], _now: f64) {}
1069
1070 fn process_verified_announce(
1071 &mut self,
1072 ctx: VerifiedAnnounceCtx<'_>,
1073 rng: &mut dyn Rng,
1074 actions: &mut Vec<TransportAction>,
1075 ) {
1076 if self.is_blackholed(&ctx.validated.identity_hash, ctx.now) {
1077 return;
1078 }
1079 if ctx.packet.hops > constants::PATHFINDER_M {
1080 return;
1081 }
1082
1083 let existing_set = self.path_table.get(&ctx.packet.destination_hash);
1084 let was_unknown_destination = existing_set.is_none_or(|ps| ps.is_empty());
1085
1086 if was_unknown_destination {
1089 self.path_states.remove(&ctx.packet.destination_hash);
1090 }
1091
1092 let is_unresponsive = self.path_is_unresponsive(&ctx.packet.destination_hash);
1094
1095 let mp_decision = decide_announce_multipath(
1096 existing_set,
1097 ctx.packet.hops,
1098 ctx.announce_emitted,
1099 &ctx.random_blob,
1100 &ctx.received_from,
1101 is_unresponsive,
1102 ctx.now,
1103 self.config.prefer_shorter_path,
1104 );
1105
1106 if mp_decision == MultiPathDecision::Reject {
1107 log::debug!(
1108 "Announce:path decision REJECT for dest={:02x}{:02x}{:02x}{:02x}..",
1109 ctx.packet.destination_hash[0],
1110 ctx.packet.destination_hash[1],
1111 ctx.packet.destination_hash[2],
1112 ctx.packet.destination_hash[3],
1113 );
1114 return;
1115 }
1116
1117 let rate_blocked = if ctx.packet.context != constants::CONTEXT_PATH_RESPONSE {
1119 if let Some(iface_info) = self.interfaces.get(&ctx.iface) {
1120 self.rate_limiter.check_and_update(
1121 &ctx.packet.destination_hash,
1122 ctx.now,
1123 iface_info.announce_rate_target,
1124 iface_info.announce_rate_grace,
1125 iface_info.announce_rate_penalty,
1126 )
1127 } else {
1128 false
1129 }
1130 } else {
1131 false
1132 };
1133
1134 let interface_mode = self
1136 .interfaces
1137 .get(&ctx.iface)
1138 .map(|i| i.mode)
1139 .unwrap_or(constants::MODE_FULL);
1140
1141 let expires = compute_path_expires(ctx.now, interface_mode);
1142
1143 let existing_blobs = self
1145 .path_table
1146 .get(&ctx.packet.destination_hash)
1147 .and_then(|ps| ps.find_by_next_hop(&ctx.received_from))
1148 .map(|e| e.random_blobs.clone())
1149 .unwrap_or_default();
1150
1151 let mut rng_bytes = [0u8; 8];
1153 rng.fill_bytes(&mut rng_bytes);
1154 let rng_value = (u64::from_le_bytes(rng_bytes) as f64) / (u64::MAX as f64);
1155
1156 let is_path_response = ctx.packet.context == constants::CONTEXT_PATH_RESPONSE;
1157
1158 let (path_entry, announce_entry) = announce_proc::process_validated_announce(
1159 ctx.packet.destination_hash,
1160 ctx.packet.hops,
1161 &ctx.packet.data,
1162 &ctx.packet.raw,
1163 ctx.packet.packet_hash,
1164 ctx.packet.flags.context_flag,
1165 ctx.received_from,
1166 ctx.iface,
1167 ctx.now,
1168 existing_blobs,
1169 ctx.random_blob,
1170 expires,
1171 rng_value,
1172 self.config.transport_enabled,
1173 is_path_response,
1174 rate_blocked,
1175 Some(ctx.original_raw.to_vec()),
1176 );
1177
1178 actions.push(TransportAction::CacheAnnounce {
1180 packet_hash: ctx.packet.packet_hash,
1181 raw: ctx.original_raw.to_vec().into(),
1182 });
1183
1184 self.upsert_path_destination(ctx.packet.destination_hash, path_entry, ctx.now);
1186
1187 if let Some(tunnel_id) = self.interfaces.get(&ctx.iface).and_then(|i| i.tunnel_id) {
1189 let blobs = self
1190 .path_table
1191 .get(&ctx.packet.destination_hash)
1192 .and_then(|ps| ps.find_by_next_hop(&ctx.received_from))
1193 .map(|e| e.random_blobs.clone())
1194 .unwrap_or_default();
1195 self.tunnel_table.store_tunnel_path(
1196 &tunnel_id,
1197 ctx.packet.destination_hash,
1198 tunnel::TunnelPath {
1199 timestamp: ctx.now,
1200 received_from: ctx.received_from,
1201 hops: ctx.packet.hops,
1202 expires,
1203 random_blobs: blobs,
1204 packet_hash: ctx.packet.packet_hash,
1205 },
1206 ctx.now,
1207 self.config.destination_timeout_secs,
1208 self.config.max_tunnel_destinations_total,
1209 );
1210 }
1211
1212 self.path_states.remove(&ctx.packet.destination_hash);
1215
1216 if let Some(ann) = announce_entry {
1218 self.insert_announce_entry(ctx.packet.destination_hash, ann, ctx.now);
1219 }
1220
1221 actions.push(TransportAction::AnnounceReceived {
1223 destination_hash: ctx.packet.destination_hash,
1224 identity_hash: ctx.validated.identity_hash,
1225 public_key: ctx.validated.public_key,
1226 name_hash: ctx.validated.name_hash,
1227 random_hash: ctx.validated.random_hash,
1228 ratchet: ctx.validated.ratchet,
1229 app_data: ctx.validated.app_data,
1230 hops: ctx.packet.hops,
1231 receiving_interface: ctx.iface,
1232 });
1233
1234 actions.push(TransportAction::PathUpdated {
1235 destination_hash: ctx.packet.destination_hash,
1236 hops: ctx.packet.hops,
1237 next_hop: ctx.received_from,
1238 interface: ctx.iface,
1239 });
1240
1241 if self.has_local_clients() {
1243 actions.push(TransportAction::ForwardToLocalClients {
1244 raw: PacketBytes::from(ctx.packet.raw.clone()),
1245 exclude: Some(ctx.iface),
1246 });
1247 }
1248
1249 if let Some(pr_entry) = self.discovery_path_requests_waiting(&ctx.packet.destination_hash) {
1251 let entry = AnnounceEntry {
1253 timestamp: ctx.now,
1254 retransmit_timeout: ctx.now,
1255 retries: constants::PATHFINDER_R,
1256 received_from: ctx.received_from,
1257 hops: ctx.packet.hops,
1258 packet_raw: ctx.packet.raw.clone(),
1259 packet_data: ctx.packet.data.clone(),
1260 destination_hash: ctx.packet.destination_hash,
1261 context_flag: ctx.packet.flags.context_flag,
1262 local_rebroadcasts: 0,
1263 block_rebroadcasts: true,
1264 attached_interface: Some(pr_entry),
1265 };
1266 self.insert_announce_entry(ctx.packet.destination_hash, entry, ctx.now);
1267 }
1268 }
1269
1270 pub fn announce_sig_cache_contains(&self, sig_cache_key: &[u8; 32]) -> bool {
1271 self.announce_sig_cache.contains(sig_cache_key)
1272 }
1273
1274 fn discovery_path_requests_waiting(&mut self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
1277 self.discovery_path_requests
1278 .remove(dest_hash)
1279 .map(|req| req.requesting_interface)
1280 }
1281
1282 fn process_inbound_proof(
1287 &mut self,
1288 packet: &RawPacket,
1289 iface: InterfaceId,
1290 _now: f64,
1291 actions: &mut Vec<TransportAction>,
1292 ) {
1293 if packet.context == constants::CONTEXT_LRPROOF {
1294 if (self.config.transport_enabled)
1296 && self.link_table.contains_key(&packet.destination_hash)
1297 {
1298 let link_entry = self.link_table.get(&packet.destination_hash).cloned();
1299 if let Some(entry) = link_entry {
1300 if let Some((outbound_interface, new_raw)) =
1301 route_via_link_table(packet, &entry, iface)
1302 {
1303 if let Some(le) = self.link_table.get_mut(&packet.destination_hash) {
1308 le.validated = true;
1309 }
1310
1311 actions.push(TransportAction::LinkEstablished {
1312 link_id: packet.destination_hash,
1313 interface: outbound_interface,
1314 });
1315
1316 actions.push(TransportAction::SendOnInterface {
1317 interface: outbound_interface,
1318 raw: new_raw.into(),
1319 });
1320 }
1321 }
1322 } else {
1323 actions.push(TransportAction::DeliverLocal {
1325 destination_hash: packet.destination_hash,
1326 raw: PacketBytes::from(packet.raw.clone()),
1327 packet_hash: packet.packet_hash,
1328 receiving_interface: iface,
1329 });
1330 }
1331 } else {
1332 if self.config.transport_enabled {
1334 if let Some(reverse_entry) = self.reverse_table.remove(&packet.destination_hash) {
1335 if let Some(action) = route_proof_via_reverse(packet, &reverse_entry, iface) {
1336 actions.push(action);
1337 }
1338 }
1339 }
1340
1341 actions.push(TransportAction::DeliverLocal {
1343 destination_hash: packet.destination_hash,
1344 raw: PacketBytes::from(packet.raw.clone()),
1345 packet_hash: packet.packet_hash,
1346 receiving_interface: iface,
1347 });
1348 }
1349 }
1350
1351 pub fn handle_outbound(
1357 &mut self,
1358 packet: &RawPacket,
1359 dest_type: u8,
1360 attached_interface: Option<InterfaceId>,
1361 now: f64,
1362 ) -> Vec<TransportAction> {
1363 let actions = route_outbound(
1364 &self.path_table,
1365 &self.interfaces,
1366 &self.local_destinations,
1367 packet,
1368 dest_type,
1369 attached_interface,
1370 now,
1371 );
1372
1373 self.packet_hashlist.add(packet.packet_hash);
1375
1376 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE && packet.hops > 0 {
1378 self.gate_announce_actions(actions, &packet.destination_hash, packet.hops, now)
1379 } else {
1380 actions
1381 }
1382 }
1383
1384 fn gate_announce_actions(
1386 &mut self,
1387 actions: Vec<TransportAction>,
1388 dest_hash: &[u8; 16],
1389 hops: u8,
1390 now: f64,
1391 ) -> Vec<TransportAction> {
1392 let mut result = Vec::new();
1393 for action in actions {
1394 match action {
1395 TransportAction::SendOnInterface { interface, raw } => {
1396 let (bitrate, airtime_profile, announce_cap) =
1397 if let Some(info) = self.interfaces.get(&interface) {
1398 (info.bitrate, info.airtime_profile, info.announce_cap)
1399 } else {
1400 (None, None, constants::ANNOUNCE_CAP)
1401 };
1402 if let Some(send_action) = self.announce_queues.gate_announce(
1403 interface,
1404 raw,
1405 *dest_hash,
1406 hops,
1407 now,
1408 now,
1409 bitrate,
1410 airtime_profile,
1411 announce_cap,
1412 ) {
1413 result.push(send_action);
1414 }
1415 }
1417 other => result.push(other),
1418 }
1419 }
1420 result
1421 }
1422
1423 pub fn tick(&mut self, now: f64, rng: &mut dyn Rng) -> Vec<TransportAction> {
1429 let mut ctx = TickCtx {
1430 now,
1431 rng,
1432 actions: Vec::new(),
1433 };
1434 self.process_tick_pending_announces(&mut ctx);
1435
1436 let mut queue_actions = self.announce_queues.process_queues(now, &self.interfaces);
1437 ctx.actions.append(&mut queue_actions);
1438
1439 self.process_tick_ingress_release(&mut ctx);
1440 self.cull_tick_tables(&mut ctx);
1441 ctx.actions
1442 }
1443
1444 fn process_tick_pending_announces(&mut self, ctx: &mut TickCtx<'_>) {
1445 if ctx.now <= self.announces_last_checked + constants::ANNOUNCES_CHECK_INTERVAL {
1446 return;
1447 }
1448
1449 self.cull_expired_announce_entries(ctx.now);
1450 self.enforce_announce_retention_cap(ctx.now);
1451 if let Some(identity_hash) = self.config.identity_hash {
1452 let announce_actions = jobs::process_pending_announces(
1453 &mut self.announce_table,
1454 &mut self.held_announces,
1455 &identity_hash,
1456 ctx.now,
1457 );
1458 let gated = self.gate_retransmit_actions(announce_actions, ctx.now);
1459 ctx.actions.extend(gated);
1460 }
1461 self.cull_expired_announce_entries(ctx.now);
1462 self.enforce_announce_retention_cap(ctx.now);
1463 self.announces_last_checked = ctx.now;
1464 }
1465
1466 fn process_tick_ingress_release(&mut self, ctx: &mut TickCtx<'_>) {
1467 let ic_interfaces = self.ingress_control.interfaces_with_held();
1468 for iface_id in ic_interfaces {
1469 let (ia_freq, started, ingress_config) = match self.interfaces.get(&iface_id) {
1470 Some(info) => (info.ia_freq, info.started, info.ingress_control),
1471 None => continue,
1472 };
1473 if !ingress_config.enabled {
1474 continue;
1475 }
1476 if let Some(held) = self.ingress_control.process_held_announces(
1477 iface_id,
1478 &ingress_config,
1479 ia_freq,
1480 started,
1481 ctx.now,
1482 ) {
1483 let released_actions =
1484 self.handle_inbound(&held.raw, held.receiving_interface, ctx.now, ctx.rng);
1485 ctx.actions.extend(released_actions);
1486 }
1487 }
1488 }
1489
1490 fn cull_tick_tables(&mut self, ctx: &mut TickCtx<'_>) {
1491 if ctx.now <= self.tables_last_culled + constants::TABLES_CULL_INTERVAL {
1492 return;
1493 }
1494
1495 jobs::cull_path_table(&mut self.path_table, &self.interfaces, ctx.now);
1496 jobs::cull_reverse_table(&mut self.reverse_table, &self.interfaces, ctx.now);
1497 let (_culled, link_closed_actions) =
1498 jobs::cull_link_table(&mut self.link_table, &self.interfaces, ctx.now);
1499 ctx.actions.extend(link_closed_actions);
1500 jobs::cull_path_states(&mut self.path_states, &self.path_table);
1501 self.cull_blackholed(ctx.now);
1502 self.discovery_path_requests
1503 .retain(|_, req| ctx.now - req.timestamp < constants::DISCOVERY_PATH_REQUEST_TIMEOUT);
1504 self.tunnel_table
1505 .void_missing_interfaces(|id| self.interfaces.contains_key(id));
1506 self.tunnel_table.cull(ctx.now);
1507 self.announce_sig_cache.cull(ctx.now);
1508 self.tables_last_culled = ctx.now;
1509 }
1510
1511 fn gate_retransmit_actions(
1516 &mut self,
1517 actions: Vec<TransportAction>,
1518 now: f64,
1519 ) -> Vec<TransportAction> {
1520 let mut result = Vec::new();
1521 for action in actions {
1522 match action {
1523 TransportAction::SendOnInterface { interface, raw } => {
1524 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1526 let (bitrate, airtime_profile, announce_cap) =
1527 if let Some(info) = self.interfaces.get(&interface) {
1528 (info.bitrate, info.airtime_profile, info.announce_cap)
1529 } else {
1530 (None, None, constants::ANNOUNCE_CAP)
1531 };
1532 if let Some(send_action) = self.announce_queues.gate_announce(
1533 interface,
1534 raw,
1535 dest_hash,
1536 hops,
1537 now,
1538 now,
1539 bitrate,
1540 airtime_profile,
1541 announce_cap,
1542 ) {
1543 result.push(send_action);
1544 }
1545 }
1546 TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1547 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1548 let iface_ids: Vec<(
1551 InterfaceId,
1552 Option<u64>,
1553 Option<types::AirtimeProfile>,
1554 f64,
1555 )> = self
1556 .interfaces
1557 .iter()
1558 .filter(|(_, info)| info.out_capable)
1559 .filter(|(id, _)| {
1560 if let Some(ref ex) = exclude {
1561 **id != *ex
1562 } else {
1563 true
1564 }
1565 })
1566 .filter(|(_, info)| {
1567 should_transmit_announce(
1568 info,
1569 &dest_hash,
1570 hops,
1571 &self.local_destinations,
1572 &self.path_table,
1573 &self.interfaces,
1574 )
1575 })
1576 .map(|(id, info)| {
1577 (*id, info.bitrate, info.airtime_profile, info.announce_cap)
1578 })
1579 .collect();
1580
1581 for (iface_id, bitrate, airtime_profile, announce_cap) in iface_ids {
1582 if let Some(send_action) = self.announce_queues.gate_announce(
1583 iface_id,
1584 raw.clone(),
1585 dest_hash,
1586 hops,
1587 now,
1588 now,
1589 bitrate,
1590 airtime_profile,
1591 announce_cap,
1592 ) {
1593 result.push(send_action);
1594 }
1595 }
1596 }
1597 other => result.push(other),
1598 }
1599 }
1600 result
1601 }
1602
1603 fn extract_announce_info(raw: &[u8]) -> ([u8; 16], u8) {
1605 if raw.len() < 18 {
1606 return ([0; 16], 0);
1607 }
1608 let header_type = (raw[0] >> 6) & 0x03;
1609 let hops = raw[1];
1610 if header_type == constants::HEADER_2 && raw.len() >= 34 {
1611 let mut dest = [0u8; 16];
1613 dest.copy_from_slice(&raw[18..34]);
1614 (dest, hops)
1615 } else {
1616 let mut dest = [0u8; 16];
1618 dest.copy_from_slice(&raw[2..18]);
1619 (dest, hops)
1620 }
1621 }
1622
1623 #[cfg(test)]
1624 #[allow(dead_code)]
1625 pub(crate) fn link_table_ref(&self) -> &BTreeMap<[u8; 16], LinkEntry> {
1626 &self.link_table
1627 }
1628}
1629
1630#[cfg(test)]
1631mod tests {
1632 use super::*;
1633 use crate::packet::PacketFlags;
1634
1635 fn make_config(transport_enabled: bool) -> TransportConfig {
1636 TransportConfig {
1637 transport_enabled,
1638 identity_hash: if transport_enabled {
1639 Some([0x42; 16])
1640 } else {
1641 None
1642 },
1643 prefer_shorter_path: false,
1644 max_paths_per_destination: 1,
1645 packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
1646 max_discovery_pr_tags: constants::MAX_PR_TAGS,
1647 max_path_destinations: usize::MAX,
1648 max_tunnel_destinations_total: usize::MAX,
1649 destination_timeout_secs: constants::DESTINATION_TIMEOUT,
1650 announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
1651 announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
1652 announce_sig_cache_enabled: true,
1653 announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
1654 announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
1655 announce_queue_max_entries: 256,
1656 announce_queue_max_interfaces: 1024,
1657 }
1658 }
1659
1660 fn make_interface(id: u64, mode: u8) -> InterfaceInfo {
1661 InterfaceInfo {
1662 id: InterfaceId(id),
1663 name: String::from("test"),
1664 mode,
1665 out_capable: true,
1666 in_capable: true,
1667 bitrate: None,
1668 airtime_profile: None,
1669 announce_rate_target: None,
1670 announce_rate_grace: 0,
1671 announce_rate_penalty: 0.0,
1672 announce_cap: constants::ANNOUNCE_CAP,
1673 is_local_client: false,
1674 wants_tunnel: false,
1675 tunnel_id: None,
1676 mtu: constants::MTU as u32,
1677 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
1678 ia_freq: 0.0,
1679 started: 0.0,
1680 }
1681 }
1682
1683 fn make_announce_entry(dest_hash: [u8; 16], timestamp: f64, fill_len: usize) -> AnnounceEntry {
1684 AnnounceEntry {
1685 timestamp,
1686 retransmit_timeout: timestamp,
1687 retries: 0,
1688 received_from: [0xAA; 16],
1689 hops: 2,
1690 packet_raw: vec![0x01; fill_len],
1691 packet_data: vec![0x02; fill_len],
1692 destination_hash: dest_hash,
1693 context_flag: 0,
1694 local_rebroadcasts: 0,
1695 block_rebroadcasts: false,
1696 attached_interface: None,
1697 }
1698 }
1699
1700 fn make_path_entry(
1701 timestamp: f64,
1702 hops: u8,
1703 receiving_interface: InterfaceId,
1704 next_hop: [u8; 16],
1705 ) -> PathEntry {
1706 PathEntry {
1707 timestamp,
1708 next_hop,
1709 hops,
1710 expires: timestamp + 10_000.0,
1711 random_blobs: Vec::new(),
1712 receiving_interface,
1713 packet_hash: [0; 32],
1714 announce_raw: None,
1715 }
1716 }
1717
1718 fn make_unique_tag(dest_hash: [u8; 16], tag: &[u8]) -> [u8; 32] {
1719 let mut unique_tag = [0u8; 32];
1720 let tag_len = tag.len().min(16);
1721 unique_tag[..16].copy_from_slice(&dest_hash);
1722 unique_tag[16..16 + tag_len].copy_from_slice(&tag[..tag_len]);
1723 unique_tag
1724 }
1725
1726 fn make_random_blob(timebase: u64) -> [u8; 10] {
1727 let mut blob = [0u8; 10];
1728 let bytes = timebase.to_be_bytes();
1729 blob[5..10].copy_from_slice(&bytes[3..8]);
1730 blob
1731 }
1732
1733 #[test]
1734 fn test_empty_engine() {
1735 let engine = TransportEngine::new(make_config(false));
1736 assert!(!engine.has_path(&[0; 16]));
1737 assert!(engine.hops_to(&[0; 16]).is_none());
1738 assert!(engine.next_hop(&[0; 16]).is_none());
1739 }
1740
1741 #[test]
1742 fn test_register_deregister_interface() {
1743 let mut engine = TransportEngine::new(make_config(false));
1744 engine.register_interface(make_interface(1, constants::MODE_FULL));
1745 assert!(engine.interfaces.contains_key(&InterfaceId(1)));
1746
1747 engine.deregister_interface(InterfaceId(1));
1748 assert!(!engine.interfaces.contains_key(&InterfaceId(1)));
1749 }
1750
1751 #[test]
1752 fn test_deregister_interface_removes_announce_queue_state() {
1753 let mut engine = TransportEngine::new(make_config(false));
1754 engine.register_interface(make_interface(1, constants::MODE_FULL));
1755
1756 let _ = engine.announce_queues.gate_announce(
1757 InterfaceId(1),
1758 vec![0x01; 100].into(),
1759 [0xAA; 16],
1760 2,
1761 0.0,
1762 0.0,
1763 Some(1000),
1764 None,
1765 constants::ANNOUNCE_CAP,
1766 );
1767 let _ = engine.announce_queues.gate_announce(
1768 InterfaceId(1),
1769 vec![0x02; 100].into(),
1770 [0xBB; 16],
1771 3,
1772 0.0,
1773 0.0,
1774 Some(1000),
1775 None,
1776 constants::ANNOUNCE_CAP,
1777 );
1778 assert_eq!(engine.announce_queue_count(), 1);
1779
1780 engine.deregister_interface(InterfaceId(1));
1781 assert_eq!(engine.announce_queue_count(), 0);
1782 }
1783
1784 #[test]
1785 fn test_deregister_interface_preserves_other_announce_queues() {
1786 let mut engine = TransportEngine::new(make_config(false));
1787 engine.register_interface(make_interface(1, constants::MODE_FULL));
1788 engine.register_interface(make_interface(2, constants::MODE_FULL));
1789
1790 let _ = engine.announce_queues.gate_announce(
1791 InterfaceId(1),
1792 vec![0x01; 100].into(),
1793 [0xAA; 16],
1794 2,
1795 0.0,
1796 0.0,
1797 Some(1000),
1798 None,
1799 constants::ANNOUNCE_CAP,
1800 );
1801 let _ = engine.announce_queues.gate_announce(
1802 InterfaceId(1),
1803 vec![0x02; 100].into(),
1804 [0xAB; 16],
1805 3,
1806 0.0,
1807 0.0,
1808 Some(1000),
1809 None,
1810 constants::ANNOUNCE_CAP,
1811 );
1812 let _ = engine.announce_queues.gate_announce(
1813 InterfaceId(2),
1814 vec![0x03; 100].into(),
1815 [0xBA; 16],
1816 2,
1817 0.0,
1818 0.0,
1819 Some(1000),
1820 None,
1821 constants::ANNOUNCE_CAP,
1822 );
1823 let _ = engine.announce_queues.gate_announce(
1824 InterfaceId(2),
1825 vec![0x04; 100].into(),
1826 [0xBB; 16],
1827 3,
1828 0.0,
1829 0.0,
1830 Some(1000),
1831 None,
1832 constants::ANNOUNCE_CAP,
1833 );
1834
1835 engine.deregister_interface(InterfaceId(1));
1836 assert_eq!(engine.announce_queue_count(), 1);
1837 assert_eq!(engine.nonempty_announce_queue_count(), 1);
1838 }
1839
1840 #[test]
1841 fn test_register_deregister_destination() {
1842 let mut engine = TransportEngine::new(make_config(false));
1843 let dest = [0x11; 16];
1844 engine.register_destination(dest, constants::DESTINATION_SINGLE);
1845 assert!(engine.local_destinations.contains_key(&dest));
1846
1847 engine.deregister_destination(&dest);
1848 assert!(!engine.local_destinations.contains_key(&dest));
1849 }
1850
1851 #[test]
1852 fn test_path_state() {
1853 let mut engine = TransportEngine::new(make_config(false));
1854 let dest = [0x22; 16];
1855
1856 assert!(!engine.path_is_unresponsive(&dest));
1857
1858 engine.mark_path_unresponsive(&dest, None);
1859 assert!(engine.path_is_unresponsive(&dest));
1860
1861 engine.mark_path_responsive(&dest);
1862 assert!(!engine.path_is_unresponsive(&dest));
1863 }
1864
1865 #[test]
1866 fn test_announce_clears_stale_path_state_for_unknown_destination() {
1867 use crate::announce::AnnounceData;
1868 use crate::destination::{destination_hash, name_hash};
1869
1870 let mut engine = TransportEngine::new(make_config(false));
1871 engine.register_interface(make_interface(1, constants::MODE_FULL));
1872
1873 let identity =
1874 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x61; 32]));
1875 let dest_hash = destination_hash("pathfix", &["announce"], Some(identity.hash()));
1876 let name_h = name_hash("pathfix", &["announce"]);
1877 let random_hash = [0x24u8; 10];
1878
1879 let (announce_data, _) =
1880 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
1881
1882 let packet = RawPacket::pack(
1883 PacketFlags {
1884 header_type: constants::HEADER_1,
1885 context_flag: constants::FLAG_UNSET,
1886 transport_type: constants::TRANSPORT_BROADCAST,
1887 destination_type: constants::DESTINATION_SINGLE,
1888 packet_type: constants::PACKET_TYPE_ANNOUNCE,
1889 },
1890 0,
1891 &dest_hash,
1892 None,
1893 constants::CONTEXT_NONE,
1894 &announce_data,
1895 )
1896 .unwrap();
1897
1898 engine.mark_path_unresponsive(&dest_hash, None);
1899 assert!(engine.path_is_unresponsive(&dest_hash));
1900 assert!(!engine.has_path(&dest_hash));
1901
1902 let mut rng = rns_crypto::FixedRng::new(&[0x62; 32]);
1903 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
1904
1905 assert!(engine.has_path(&dest_hash));
1906 assert!(
1907 !engine.path_is_unresponsive(&dest_hash),
1908 "stale path state should be cleared for newly installed paths"
1909 );
1910 assert!(actions.iter().any(|action| matches!(
1911 action,
1912 TransportAction::PathUpdated {
1913 destination_hash,
1914 interface,
1915 ..
1916 } if *destination_hash == dest_hash && *interface == InterfaceId(1)
1917 )));
1918 }
1919
1920 #[test]
1921 fn test_boundary_exempts_unresponsive() {
1922 let mut engine = TransportEngine::new(make_config(false));
1923 engine.register_interface(make_interface(1, constants::MODE_BOUNDARY));
1924 let dest = [0xB1; 16];
1925
1926 engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
1928 assert!(!engine.path_is_unresponsive(&dest));
1929 }
1930
1931 #[test]
1932 fn test_non_boundary_marks_unresponsive() {
1933 let mut engine = TransportEngine::new(make_config(false));
1934 engine.register_interface(make_interface(1, constants::MODE_FULL));
1935 let dest = [0xB2; 16];
1936
1937 engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
1939 assert!(engine.path_is_unresponsive(&dest));
1940 }
1941
1942 #[test]
1943 fn test_expire_path() {
1944 let mut engine = TransportEngine::new(make_config(false));
1945 let dest = [0x33; 16];
1946
1947 engine.path_table.insert(
1948 dest,
1949 PathSet::from_single(
1950 PathEntry {
1951 timestamp: 1000.0,
1952 next_hop: [0; 16],
1953 hops: 2,
1954 expires: 9999.0,
1955 random_blobs: Vec::new(),
1956 receiving_interface: InterfaceId(1),
1957 packet_hash: [0; 32],
1958 announce_raw: None,
1959 },
1960 1,
1961 ),
1962 );
1963
1964 assert!(engine.has_path(&dest));
1965 engine.expire_path(&dest);
1966 assert!(engine.has_path(&dest));
1968 assert_eq!(engine.path_table[&dest].primary().unwrap().expires, 0.0);
1969 }
1970
1971 #[test]
1972 fn test_link_table_operations() {
1973 let mut engine = TransportEngine::new(make_config(false));
1974 let link_id = [0x44; 16];
1975
1976 engine.register_link(
1977 link_id,
1978 LinkEntry {
1979 timestamp: 100.0,
1980 next_hop_transport_id: [0; 16],
1981 next_hop_interface: InterfaceId(1),
1982 remaining_hops: 3,
1983 received_interface: InterfaceId(2),
1984 taken_hops: 2,
1985 destination_hash: [0xAA; 16],
1986 validated: false,
1987 proof_timeout: 200.0,
1988 },
1989 );
1990
1991 assert!(engine.link_table.contains_key(&link_id));
1992 assert!(!engine.link_table[&link_id].validated);
1993
1994 engine.validate_link(&link_id);
1995 assert!(engine.link_table[&link_id].validated);
1996
1997 engine.remove_link(&link_id);
1998 assert!(!engine.link_table.contains_key(&link_id));
1999 }
2000
2001 #[test]
2002 fn test_lrproof_routes_from_originating_side_via_link_table() {
2003 let mut engine = TransportEngine::new(make_config(true));
2004 engine.register_interface(make_interface(1, constants::MODE_FULL));
2005 engine.register_interface(make_interface(2, constants::MODE_FULL));
2006
2007 let link_id = [0x44; 16];
2008 engine.register_link(
2009 link_id,
2010 LinkEntry {
2011 timestamp: 100.0,
2012 next_hop_transport_id: [0xAA; 16],
2013 next_hop_interface: InterfaceId(2),
2014 remaining_hops: 3,
2015 received_interface: InterfaceId(1),
2016 taken_hops: 1,
2017 destination_hash: [0xBB; 16],
2018 validated: false,
2019 proof_timeout: 200.0,
2020 },
2021 );
2022
2023 let flags = PacketFlags {
2024 header_type: constants::HEADER_1,
2025 context_flag: constants::FLAG_UNSET,
2026 transport_type: constants::TRANSPORT_BROADCAST,
2027 destination_type: constants::DESTINATION_LINK,
2028 packet_type: constants::PACKET_TYPE_PROOF,
2029 };
2030 let packet = RawPacket::pack(
2031 flags,
2032 0,
2033 &link_id,
2034 None,
2035 constants::CONTEXT_LRPROOF,
2036 &[0xCC; 64],
2037 )
2038 .unwrap();
2039 let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
2040
2041 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 101.0, &mut rng);
2042
2043 assert!(matches!(
2044 engine
2045 .link_table_ref()
2046 .get(&link_id)
2047 .map(|entry| entry.validated),
2048 Some(true)
2049 ));
2050 assert!(actions.iter().any(|action| matches!(
2051 action,
2052 TransportAction::LinkEstablished {
2053 link_id: established,
2054 interface: InterfaceId(2),
2055 } if *established == link_id
2056 )));
2057 assert!(actions.iter().any(|action| matches!(
2058 action,
2059 TransportAction::SendOnInterface {
2060 interface: InterfaceId(2),
2061 ..
2062 }
2063 )));
2064 }
2065
2066 #[test]
2067 fn test_packet_filter_drops_plain_announce() {
2068 let engine = TransportEngine::new(make_config(false));
2069 let flags = PacketFlags {
2070 header_type: constants::HEADER_1,
2071 context_flag: constants::FLAG_UNSET,
2072 transport_type: constants::TRANSPORT_BROADCAST,
2073 destination_type: constants::DESTINATION_PLAIN,
2074 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2075 };
2076 let packet =
2077 RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2078 assert!(!engine.packet_filter(&packet));
2079 }
2080
2081 #[test]
2082 fn test_packet_filter_allows_keepalive() {
2083 let engine = TransportEngine::new(make_config(false));
2084 let flags = PacketFlags {
2085 header_type: constants::HEADER_1,
2086 context_flag: constants::FLAG_UNSET,
2087 transport_type: constants::TRANSPORT_BROADCAST,
2088 destination_type: constants::DESTINATION_SINGLE,
2089 packet_type: constants::PACKET_TYPE_DATA,
2090 };
2091 let packet = RawPacket::pack(
2092 flags,
2093 0,
2094 &[0; 16],
2095 None,
2096 constants::CONTEXT_KEEPALIVE,
2097 b"test",
2098 )
2099 .unwrap();
2100 assert!(engine.packet_filter(&packet));
2101 }
2102
2103 #[test]
2104 fn test_packet_filter_drops_high_hop_plain() {
2105 let engine = TransportEngine::new(make_config(false));
2106 let flags = PacketFlags {
2107 header_type: constants::HEADER_1,
2108 context_flag: constants::FLAG_UNSET,
2109 transport_type: constants::TRANSPORT_BROADCAST,
2110 destination_type: constants::DESTINATION_PLAIN,
2111 packet_type: constants::PACKET_TYPE_DATA,
2112 };
2113 let mut packet =
2114 RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2115 packet.hops = 2;
2116 assert!(!engine.packet_filter(&packet));
2117 }
2118
2119 #[test]
2120 fn test_packet_filter_allows_duplicate_single_announce() {
2121 let mut engine = TransportEngine::new(make_config(false));
2122 let flags = PacketFlags {
2123 header_type: constants::HEADER_1,
2124 context_flag: constants::FLAG_UNSET,
2125 transport_type: constants::TRANSPORT_BROADCAST,
2126 destination_type: constants::DESTINATION_SINGLE,
2127 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2128 };
2129 let packet = RawPacket::pack(
2130 flags,
2131 0,
2132 &[0; 16],
2133 None,
2134 constants::CONTEXT_NONE,
2135 &[0xAA; 64],
2136 )
2137 .unwrap();
2138
2139 engine.packet_hashlist.add(packet.packet_hash);
2141
2142 assert!(engine.packet_filter(&packet));
2144 }
2145
2146 #[test]
2147 fn test_packet_filter_fifo_eviction_allows_oldest_hash_again() {
2148 let mut engine = TransportEngine::new(make_config(false));
2149 engine.packet_hashlist = PacketHashlist::new(2);
2150
2151 let make_packet = |seed: u8| {
2152 let flags = PacketFlags {
2153 header_type: constants::HEADER_1,
2154 context_flag: constants::FLAG_UNSET,
2155 transport_type: constants::TRANSPORT_BROADCAST,
2156 destination_type: constants::DESTINATION_SINGLE,
2157 packet_type: constants::PACKET_TYPE_DATA,
2158 };
2159 RawPacket::pack(
2160 flags,
2161 0,
2162 &[seed; 16],
2163 None,
2164 constants::CONTEXT_NONE,
2165 &[seed; 4],
2166 )
2167 .unwrap()
2168 };
2169
2170 let packet1 = make_packet(1);
2171 let packet2 = make_packet(2);
2172 let packet3 = make_packet(3);
2173
2174 engine.packet_hashlist.add(packet1.packet_hash);
2175 engine.packet_hashlist.add(packet2.packet_hash);
2176 assert!(!engine.packet_filter(&packet1));
2177
2178 engine.packet_hashlist.add(packet3.packet_hash);
2179
2180 assert!(engine.packet_filter(&packet1));
2181 assert!(!engine.packet_filter(&packet2));
2182 assert!(!engine.packet_filter(&packet3));
2183 }
2184
2185 #[test]
2186 fn test_packet_filter_duplicate_does_not_refresh_recency() {
2187 let mut engine = TransportEngine::new(make_config(false));
2188 engine.packet_hashlist = PacketHashlist::new(2);
2189
2190 let make_packet = |seed: u8| {
2191 let flags = PacketFlags {
2192 header_type: constants::HEADER_1,
2193 context_flag: constants::FLAG_UNSET,
2194 transport_type: constants::TRANSPORT_BROADCAST,
2195 destination_type: constants::DESTINATION_SINGLE,
2196 packet_type: constants::PACKET_TYPE_DATA,
2197 };
2198 RawPacket::pack(
2199 flags,
2200 0,
2201 &[seed; 16],
2202 None,
2203 constants::CONTEXT_NONE,
2204 &[seed; 4],
2205 )
2206 .unwrap()
2207 };
2208
2209 let packet1 = make_packet(1);
2210 let packet2 = make_packet(2);
2211 let packet3 = make_packet(3);
2212
2213 engine.packet_hashlist.add(packet1.packet_hash);
2214 engine.packet_hashlist.add(packet2.packet_hash);
2215 engine.packet_hashlist.add(packet2.packet_hash);
2216 engine.packet_hashlist.add(packet3.packet_hash);
2217
2218 assert!(engine.packet_filter(&packet1));
2219 assert!(!engine.packet_filter(&packet2));
2220 assert!(!engine.packet_filter(&packet3));
2221 }
2222
2223 #[test]
2224 fn test_tick_retransmits_announce() {
2225 let mut engine = TransportEngine::new(make_config(true));
2226 engine.register_interface(make_interface(1, constants::MODE_FULL));
2227
2228 let dest = [0x55; 16];
2229 engine.insert_announce_entry(
2230 dest,
2231 AnnounceEntry {
2232 timestamp: 190.0,
2233 retransmit_timeout: 100.0, retries: 0,
2235 received_from: [0xAA; 16],
2236 hops: 2,
2237 packet_raw: vec![0x01, 0x02],
2238 packet_data: vec![0xCC; 10],
2239 destination_hash: dest,
2240 context_flag: 0,
2241 local_rebroadcasts: 0,
2242 block_rebroadcasts: false,
2243 attached_interface: None,
2244 },
2245 190.0,
2246 );
2247
2248 let mut rng = rns_crypto::FixedRng::new(&[0x42; 32]);
2249 let actions = engine.tick(200.0, &mut rng);
2250
2251 assert!(!actions.is_empty());
2254 assert!(matches!(
2255 &actions[0],
2256 TransportAction::SendOnInterface { .. }
2257 ));
2258
2259 assert_eq!(engine.announce_table[&dest].retries, 1);
2261 }
2262
2263 #[test]
2264 fn test_gate_retransmit_actions_expands_broadcast_to_matching_interfaces() {
2265 let mut engine = TransportEngine::new(make_config(false));
2266 engine.register_interface(make_interface(1, constants::MODE_FULL));
2267 engine.register_interface(make_interface(2, constants::MODE_FULL));
2268 engine.register_interface(make_interface(3, constants::MODE_ACCESS_POINT));
2269
2270 let dest = [0x56; 16];
2271 let raw = make_announce_raw(&dest, &[0xAB; 32]);
2272 let actions = engine.gate_retransmit_actions(
2273 vec![TransportAction::BroadcastOnAllInterfaces {
2274 raw: raw.clone().into(),
2275 exclude: None,
2276 }],
2277 1000.0,
2278 );
2279
2280 assert_eq!(actions.len(), 2);
2281 for action in &actions {
2282 match action {
2283 TransportAction::SendOnInterface {
2284 interface,
2285 raw: sent,
2286 } => {
2287 assert!(*interface == InterfaceId(1) || *interface == InterfaceId(2));
2288 assert_eq!(&**sent, raw.as_slice());
2289 }
2290 other => panic!("expected SendOnInterface, got {:?}", other),
2291 }
2292 }
2293 }
2294
2295 #[test]
2296 fn test_tick_culls_expired_announce_entries() {
2297 let mut config = make_config(true);
2298 config.announce_table_ttl_secs = 10.0;
2299 let mut engine = TransportEngine::new(config);
2300
2301 let dest1 = [0x61; 16];
2302 let dest2 = [0x62; 16];
2303 assert!(engine.insert_announce_entry(dest1, make_announce_entry(dest1, 100.0, 8), 100.0));
2304 assert!(engine.insert_held_announce(dest2, make_announce_entry(dest2, 100.0, 8), 100.0));
2305
2306 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2307 let _ = engine.tick(111.0, &mut rng);
2308
2309 assert!(!engine.announce_table().contains_key(&dest1));
2310 assert!(!engine.held_announces().contains_key(&dest2));
2311 }
2312
2313 #[test]
2314 fn test_announce_retention_cap_evicts_oldest_and_prefers_held_on_tie() {
2315 let sample_entry = make_announce_entry([0x70; 16], 100.0, 32);
2316 let mut config = make_config(true);
2317 config.announce_table_max_bytes = TransportEngine::announce_entry_size_bytes(&sample_entry)
2318 * 2
2319 + TransportEngine::announce_entry_size_bytes(&sample_entry) / 2;
2320 let max_bytes = config.announce_table_max_bytes;
2321 let mut engine = TransportEngine::new(config);
2322
2323 let held_dest = [0x71; 16];
2324 let active_dest = [0x72; 16];
2325 let newest_dest = [0x73; 16];
2326
2327 assert!(engine.insert_held_announce(
2328 held_dest,
2329 make_announce_entry(held_dest, 100.0, 32),
2330 100.0,
2331 ));
2332 assert!(engine.insert_announce_entry(
2333 active_dest,
2334 make_announce_entry(active_dest, 100.0, 32),
2335 100.0,
2336 ));
2337 assert!(engine.insert_announce_entry(
2338 newest_dest,
2339 make_announce_entry(newest_dest, 101.0, 32),
2340 101.0,
2341 ));
2342
2343 assert!(!engine.held_announces().contains_key(&held_dest));
2344 assert!(engine.announce_table().contains_key(&active_dest));
2345 assert!(engine.announce_table().contains_key(&newest_dest));
2346 assert!(engine.announce_retained_bytes() <= max_bytes);
2347 }
2348
2349 #[test]
2350 fn test_oversized_announce_entry_is_not_retained() {
2351 let mut config = make_config(true);
2352 config.announce_table_max_bytes = 200;
2353 let mut engine = TransportEngine::new(config);
2354 let dest = [0x81; 16];
2355
2356 assert!(!engine.insert_announce_entry(dest, make_announce_entry(dest, 100.0, 256), 100.0));
2357 assert!(!engine.announce_table().contains_key(&dest));
2358 assert_eq!(engine.announce_retained_bytes(), 0);
2359 }
2360
/// Round-trips an identity through blackhole / unblackhole with no duration.
#[test]
fn test_blackhole_identity() {
    let identity_hash = [0xAA; 16];
    let t0 = 1000.0;
    let mut engine = TransportEngine::new(make_config(false));

    // Nothing is blackholed on a fresh engine.
    assert!(!engine.is_blackholed(&identity_hash, t0));

    // Blackhole without a duration: still in effect far in the future.
    let reason = Some(String::from("test"));
    engine.blackhole_identity(identity_hash, t0, None, reason);
    assert!(engine.is_blackholed(&identity_hash, t0));
    assert!(engine.is_blackholed(&identity_hash, t0 + 999999.0));

    // The first removal reports success and lifts the blackhole.
    assert!(engine.unblackhole_identity(&identity_hash));
    assert!(!engine.is_blackholed(&identity_hash, t0));

    // A second removal has nothing left to remove.
    assert!(!engine.unblackhole_identity(&identity_hash));
}
2377
/// A blackhole created with a duration stops applying once it has elapsed.
///
/// The assertions bracket expiry between +3599 s and +3601 s, which is
/// consistent with the `Some(1.0)` duration being expressed in hours.
#[test]
fn test_blackhole_with_duration() {
    let identity_hash = [0xBB; 16];
    let t0 = 1000.0;
    let mut engine = TransportEngine::new(make_config(false));

    engine.blackhole_identity(identity_hash, t0, Some(1.0), None);

    // Active immediately and just before the boundary.
    assert!(engine.is_blackholed(&identity_hash, t0));
    assert!(engine.is_blackholed(&identity_hash, t0 + 3599.0));

    // No longer active just after the boundary.
    assert!(!engine.is_blackholed(&identity_hash, t0 + 3601.0));
}
2389
/// `cull_blackholed` drops entries whose duration has elapsed and keeps
/// entries that were created without a duration.
#[test]
fn test_cull_blackholed() {
    let timed_hash = [0xCC; 16];
    let permanent_hash = [0xDD; 16];
    let t0 = 1000.0;
    let mut engine = TransportEngine::new(make_config(false));

    // One entry with a duration, one without.
    engine.blackhole_identity(timed_hash, t0, Some(1.0), None);
    engine.blackhole_identity(permanent_hash, t0, None, None);

    // Cull 4000 s later.
    engine.cull_blackholed(t0 + 4000.0);

    let remaining = &engine.blackholed_identities;
    assert!(!remaining.contains_key(&timed_hash));
    assert!(remaining.contains_key(&permanent_hash));
}
2405
/// An inbound announce from a blackholed identity must produce neither an
/// `AnnounceReceived` nor a `PathUpdated` action.
#[test]
fn test_blackhole_blocks_announce() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    // Pack an announce for a deterministic identity.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x55; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("test", &["app"], Some(identity.hash()));
    let name_h = name_hash("test", &["app"]);
    let random_hash = [0x42u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    // Wrap it in a HEADER_1 broadcast announce packet with zero hops.
    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    // Blackhole the announcing identity before the packet arrives.
    let now = 1000.0;
    engine.blackhole_identity(*identity.hash(), now, None, None);

    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), now, &mut rng);

    let announce_seen = actions
        .iter()
        .any(|action| matches!(action, TransportAction::AnnounceReceived { .. }));
    assert!(!announce_seen);

    let path_updated = actions
        .iter()
        .any(|action| matches!(action, TransportAction::PathUpdated { .. }));
    assert!(!path_updated);
}
2455
/// When an announce that matches a pending retransmit entry arrives on the
/// queued-verification path, the `announce_table` entry must already be gone
/// by the time the packet has been queued.
#[test]
fn test_async_announce_retransmit_cleanup_happens_before_queueing() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};
    use crate::transport::announce_verify_queue::AnnounceVerifyQueue;

    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    // Deterministic identity and announce payload.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x31; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("async", &["announce"], Some(identity.hash()));
    let name_h = name_hash("async", &["announce"]);
    let random_hash = [0x44u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    // HEADER_2 transport announce, 3 hops, carrying transport id 0xBB...
    let transport_flags = PacketFlags {
        header_type: constants::HEADER_2,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_TRANSPORT,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_ANNOUNCE,
    };
    let packet = RawPacket::pack(
        transport_flags,
        3,
        &dest_hash,
        Some(&[0xBB; 16]),
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    // Seed the announce table with an entry for the same destination whose
    // retry counter already equals PATHFINDER_R.
    let seeded_entry = AnnounceEntry {
        timestamp: 1000.0,
        retransmit_timeout: 2000.0,
        retries: constants::PATHFINDER_R,
        received_from: [0xBB; 16],
        hops: 2,
        packet_raw: packet.raw.clone(),
        packet_data: packet.data.clone(),
        destination_hash: dest_hash,
        context_flag: constants::FLAG_UNSET,
        local_rebroadcasts: 0,
        block_rebroadcasts: false,
        attached_interface: None,
    };
    engine.announce_table.insert(dest_hash, seeded_entry);

    let mut verify_queue = AnnounceVerifyQueue::new(8);
    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
    let actions = engine.handle_inbound_with_announce_queue(
        &packet.raw,
        InterfaceId(1),
        1000.0,
        &mut rng,
        Some(&mut verify_queue),
    );

    // The announce is deferred to the queue, so no actions come back yet.
    assert!(actions.is_empty());
    assert_eq!(verify_queue.len(), 1);
    assert!(
        !engine.announce_table.contains_key(&dest_hash),
        "retransmit completion should clear announce_table before queueing"
    );
}
2524
/// Completing a queued announce verification must record the signature cache
/// key, and a replay of the same announce must not be queued again.
#[test]
fn test_async_announce_completion_inserts_sig_cache_and_prevents_requeue() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};
    use crate::transport::announce_verify_queue::AnnounceVerifyQueue;

    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    // Deterministic identity and announce payload.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x52; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("async", &["cache"], Some(identity.hash()));
    let name_h = name_hash("async", &["cache"]);
    let random_hash = [0x55u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let broadcast_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_ANNOUNCE,
    };
    let packet = RawPacket::pack(
        broadcast_flags,
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    // First arrival: the announce is parked in the verification queue.
    let mut verify_queue = AnnounceVerifyQueue::new(8);
    let mut rng = rns_crypto::FixedRng::new(&[0x77; 32]);
    let first_actions = engine.handle_inbound_with_announce_queue(
        &packet.raw,
        InterfaceId(1),
        1000.0,
        &mut rng,
        Some(&mut verify_queue),
    );
    assert!(first_actions.is_empty());
    assert_eq!(verify_queue.len(), 1);

    // Pull the single pending item out for verification.
    let mut batch = verify_queue.take_pending(1000.0);
    assert_eq!(batch.len(), 1);
    let (key, queued) = batch.pop().unwrap();

    // Validate the announce, then derive the cache key as the full hash of
    // destination hash (16 bytes) followed by the announce signature.
    let announce = AnnounceData::unpack(&queued.packet.data, false).unwrap();
    let validated = announce.validate(&queued.packet.destination_hash).unwrap();
    let mut material = [0u8; 80];
    {
        let (dest_part, signature_part) = material.split_at_mut(16);
        dest_part.copy_from_slice(&queued.packet.destination_hash);
        signature_part.copy_from_slice(&announce.signature);
    }
    let sig_cache_key = hash::full_hash(&material);

    // Report success to the queue and hand the result back to the engine.
    let completed = verify_queue.complete_success(&key).unwrap();
    let completion_actions =
        engine.complete_verified_announce(completed, validated, sig_cache_key, 1000.0, &mut rng);
    let announce_received = completion_actions
        .iter()
        .any(|action| matches!(action, TransportAction::AnnounceReceived { .. }));
    assert!(announce_received);
    assert!(engine.announce_sig_cache_contains(&sig_cache_key));

    // Replay of the identical packet: nothing emitted, nothing queued.
    let replay_actions = engine.handle_inbound_with_announce_queue(
        &packet.raw,
        InterfaceId(1),
        1001.0,
        &mut rng,
        Some(&mut verify_queue),
    );
    assert!(replay_actions.is_empty());
    assert_eq!(verify_queue.len(), 0);
}
2599
/// A path whose `expires` lies in the past is gone after `tick`.
#[test]
fn test_tick_culls_expired_path() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    // Path learned at t=100 that expires at t=200.
    let dest = [0x66; 16];
    let stale_path = PathEntry {
        timestamp: 100.0,
        next_hop: [0; 16],
        hops: 2,
        expires: 200.0,
        random_blobs: Vec::new(),
        receiving_interface: InterfaceId(1),
        packet_hash: [0; 32],
        announce_raw: None,
    };
    engine
        .path_table
        .insert(dest, PathSet::from_single(stale_path, 1));
    assert!(engine.has_path(&dest));

    // Tick at t=300, after the expiry time.
    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    engine.tick(300.0, &mut rng);

    assert!(!engine.has_path(&dest));
}
2631
2632 fn make_local_client_interface(id: u64) -> InterfaceInfo {
2637 InterfaceInfo {
2638 id: InterfaceId(id),
2639 name: String::from("local_client"),
2640 mode: constants::MODE_FULL,
2641 out_capable: true,
2642 in_capable: true,
2643 bitrate: None,
2644 airtime_profile: None,
2645 announce_rate_target: None,
2646 announce_rate_grace: 0,
2647 announce_rate_penalty: 0.0,
2648 announce_cap: constants::ANNOUNCE_CAP,
2649 is_local_client: true,
2650 wants_tunnel: false,
2651 tunnel_id: None,
2652 mtu: constants::MTU as u32,
2653 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
2654 ia_freq: 0.0,
2655 started: 0.0,
2656 }
2657 }
2658
/// `has_local_clients` tracks registration and deregistration of
/// local-client interfaces only.
#[test]
fn test_has_local_clients() {
    let mut engine = TransportEngine::new(make_config(false));

    // Empty engine: no local clients.
    assert!(!engine.has_local_clients());

    // A regular interface does not count.
    let regular = make_interface(1, constants::MODE_FULL);
    engine.register_interface(regular);
    assert!(!engine.has_local_clients());

    // A local-client interface flips the flag.
    let local_client = make_local_client_interface(2);
    engine.register_interface(local_client);
    assert!(engine.has_local_clients());

    // Removing it flips the flag back.
    engine.deregister_interface(InterfaceId(2));
    assert!(!engine.has_local_clients());
}
2673
/// A PLAIN data packet received on a local-client interface for a registered
/// destination must yield a `DeliverLocal` action.
///
/// NOTE(review): despite the test name, only local delivery is asserted
/// here; the hop count itself is not inspected.
#[test]
fn test_local_client_hop_decrement() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_local_client_interface(1));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xAA; 16];
    engine.register_destination(dest, constants::DESTINATION_PLAIN);

    // Broadcast data packet packed with zero hops.
    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"hello",
    )
    .unwrap();

    // Deliver it through the local-client interface (id 1).
    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    let delivered_locally = actions
        .iter()
        .any(|action| matches!(action, TransportAction::DeliverLocal { .. }));
    assert!(delivered_locally, "Should deliver locally");
}
2707
/// `prepare_inbound_packet` keeps a copy of the original raw bytes for
/// announce packets and leaves `original_raw` empty for data packets.
#[test]
fn test_prepare_inbound_packet_only_retains_original_raw_for_announces() {
    let engine = TransportEngine::new(make_config(false));
    let dest = [0xAB; 16];
    let receiving_iface = InterfaceId(9);

    let data_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    // Same flags as above, but marked as an announce.
    let announce_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_ANNOUNCE,
        ..data_flags
    };

    // --- Data packet: the original raw bytes are not retained. ---
    let data_packet =
        RawPacket::pack(data_flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
    let data_ctx = engine
        .prepare_inbound_packet(&data_packet.raw, InterfaceId(9), 1000.0)
        .expect("packet should parse and pass filter");

    assert!(data_ctx.original_raw.is_none());
    assert_eq!(data_ctx.packet.raw, data_packet.raw);
    // Packed with 0 hops; the prepared context reports 1.
    assert_eq!(data_ctx.packet.hops, 1);
    assert_eq!(data_ctx.iface, receiving_iface);

    // --- Announce packet: the original raw bytes are kept verbatim. ---
    let announce_packet = RawPacket::pack(
        announce_flags,
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        &[0u8; 91],
    )
    .unwrap();
    let announce_ctx = engine
        .prepare_inbound_packet(&announce_packet.raw, InterfaceId(9), 1000.0)
        .expect("announce should parse and pass filter");

    assert_eq!(
        announce_ctx.original_raw.as_deref(),
        Some(announce_packet.raw.as_slice())
    );
}
2752
/// The `DeliverLocal` action must carry the destination hash, the packet's
/// raw bytes, its packet hash and the receiving interface unchanged.
#[test]
fn test_deliver_local_preserves_original_raw_and_metadata() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    let dest = [0xAC; 16];
    engine.register_destination(dest, constants::DESTINATION_SINGLE);

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"deliver",
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // Pull the fields of the first DeliverLocal action, if any.
    let (delivered_dest, delivered_raw, delivered_hash, delivered_iface) = actions
        .iter()
        .find_map(|action| match action {
            TransportAction::DeliverLocal {
                destination_hash,
                raw,
                packet_hash,
                receiving_interface,
            } => Some((destination_hash, raw, packet_hash, receiving_interface)),
            _ => None,
        })
        .expect("should produce DeliverLocal");

    assert_eq!(*delivered_dest, dest);
    assert_eq!(&**delivered_raw, packet.raw.as_slice());
    assert_eq!(*delivered_hash, packet.packet_hash);
    assert_eq!(*delivered_iface, InterfaceId(1));
}
2792
/// A PLAIN broadcast received from a local client must be forwarded with
/// `to_local: false`.
#[test]
fn test_plain_broadcast_from_local_client() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_local_client_interface(1));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xBB; 16];
    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();

    // Inject on the local-client interface (id 1).
    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    let forwarded_outward = actions.iter().any(|action| {
        matches!(
            action,
            TransportAction::ForwardPlainBroadcast {
                to_local: false,
                ..
            }
        )
    });
    assert!(forwarded_outward, "Should forward to external interfaces");
}
2826
/// A PLAIN broadcast received on a regular interface must be forwarded with
/// `to_local: true` when a local client is registered.
#[test]
fn test_plain_broadcast_from_external() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_local_client_interface(1));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xCC; 16];
    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();

    // Inject on the regular interface (id 2).
    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(2), 1000.0, &mut rng);

    let forwarded_to_local = actions.iter().any(|action| {
        matches!(
            action,
            TransportAction::ForwardPlainBroadcast { to_local: true, .. }
        )
    });
    assert!(forwarded_to_local, "Should forward to local clients");
}
2857
/// With only regular interfaces registered, a PLAIN broadcast must not
/// produce any `ForwardPlainBroadcast` action.
#[test]
fn test_no_plain_broadcast_bridging_without_local_clients() {
    let mut engine = TransportEngine::new(make_config(false));
    for iface_id in [1, 2] {
        engine.register_interface(make_interface(iface_id, constants::MODE_FULL));
    }

    let dest = [0xDD; 16];
    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    let bridged = actions
        .iter()
        .any(|action| matches!(action, TransportAction::ForwardPlainBroadcast { .. }));
    assert!(!bridged, "No bridging without local clients");
}
2885
/// An announce received on a regular interface must be forwarded to local
/// clients, with the receiving interface listed as `exclude`.
#[test]
fn test_announce_forwarded_to_local_clients() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));
    engine.register_interface(make_local_client_interface(2));

    // Deterministic identity and announce payload.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x77; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("test", &["fwd"], Some(identity.hash()));
    let name_h = name_hash("test", &["fwd"]);
    let random_hash = [0x42u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    // Receive on the regular interface (id 1).
    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // Grab the `exclude` field of the first ForwardToLocalClients action.
    let excluded = actions.iter().find_map(|action| match action {
        TransportAction::ForwardToLocalClients { exclude, .. } => Some(exclude),
        _ => None,
    });
    assert!(
        excluded.is_some(),
        "Should forward announce to local clients"
    );
    assert_eq!(*excluded.unwrap(), Some(InterfaceId(1)));
}
2941
/// Without any local-client interface, an inbound announce must not produce
/// a `ForwardToLocalClients` action.
#[test]
fn test_no_announce_forward_without_local_clients() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    // Deterministic identity and announce payload.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x88; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("test", &["nofwd"], Some(identity.hash()));
    let name_h = name_hash("test", &["nofwd"]);
    let random_hash = [0x42u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0x22; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    let forwarded = actions
        .iter()
        .any(|action| matches!(action, TransportAction::ForwardToLocalClients { .. }));
    assert!(!forwarded, "No forward without local clients");
}
2985
/// When an announce arrives from one of two local clients, the forward to
/// local clients must exclude the interface it arrived on.
#[test]
fn test_local_client_exclude_from_forward() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_local_client_interface(1));
    engine.register_interface(make_local_client_interface(2));

    // Deterministic identity and announce payload.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x99; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("test", &["excl"], Some(identity.hash()));
    let name_h = name_hash("test", &["excl"]);
    let random_hash = [0x42u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    // Receive on local client 1.
    let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // Grab the `exclude` field of the first ForwardToLocalClients action.
    let excluded = actions.iter().find_map(|action| match action {
        TransportAction::ForwardToLocalClients { exclude, .. } => Some(exclude),
        _ => None,
    });
    assert!(excluded.is_some());
    assert_eq!(*excluded.unwrap(), Some(InterfaceId(1)));
}
3037
3038 fn make_tunnel_interface(id: u64) -> InterfaceInfo {
3043 InterfaceInfo {
3044 id: InterfaceId(id),
3045 name: String::from("tunnel_iface"),
3046 mode: constants::MODE_FULL,
3047 out_capable: true,
3048 in_capable: true,
3049 bitrate: None,
3050 airtime_profile: None,
3051 announce_rate_target: None,
3052 announce_rate_grace: 0,
3053 announce_rate_penalty: 0.0,
3054 announce_cap: constants::ANNOUNCE_CAP,
3055 is_local_client: false,
3056 wants_tunnel: true,
3057 tunnel_id: None,
3058 mtu: constants::MTU as u32,
3059 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3060 ia_freq: 0.0,
3061 started: 0.0,
3062 }
3063 }
3064
/// Handling a previously unseen tunnel id emits `TunnelEstablished`, stamps
/// the interface with the tunnel id and adds one tunnel table entry.
#[test]
fn test_handle_tunnel_new() {
    let tunnel_id = [0xAA; 32];
    let iface = InterfaceId(1);

    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_tunnel_interface(1));

    let actions = engine.handle_tunnel(tunnel_id, iface, 1000.0);

    let established = actions
        .iter()
        .any(|action| matches!(action, TransportAction::TunnelEstablished { .. }));
    assert!(established);

    // The interface record now carries the tunnel id.
    let info = engine.interface_info(&InterfaceId(1)).unwrap();
    assert_eq!(info.tunnel_id, Some(tunnel_id));

    // Exactly one tunnel is known.
    assert_eq!(engine.tunnel_table().len(), 1);
}
3085
/// An announce received on an interface attached to a tunnel must be
/// recorded both in the path table and in that tunnel's path map.
#[test]
fn test_announce_stores_tunnel_path() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let tunnel_id = [0xBB; 32];
    let mut engine = TransportEngine::new(make_config(false));

    // Register an interface that already carries the tunnel id, then let the
    // engine process the tunnel for it.
    let mut tunnel_iface = make_tunnel_interface(1);
    tunnel_iface.tunnel_id = Some(tunnel_id);
    engine.register_interface(tunnel_iface);
    engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    // Deterministic identity and announce payload.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0xCC; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("test", &["tunnel"], Some(identity.hash()));
    let name_h = name_hash("test", &["tunnel"]);
    let random_hash = [0x42u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0xDD; 32]);
    engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // Regular path table knows the destination.
    assert!(engine.has_path(&dest_hash));

    // The tunnel entry holds exactly that one destination.
    let tunnel_entry = engine.tunnel_table().get(&tunnel_id).unwrap();
    assert_eq!(tunnel_entry.paths.len(), 1);
    assert!(tunnel_entry.paths.contains_key(&dest_hash));
}
3138
/// Re-handling a known tunnel on a different interface restores its stored
/// paths into the path table, bound to the new interface.
#[test]
fn test_tunnel_reattach_restores_paths() {
    let tunnel_id = [0xCC; 32];
    let dest = [0xDD; 16];

    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_tunnel_interface(1));
    engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    // Store a 3-hop path for `dest` directly in the tunnel.
    let stored_path = tunnel::TunnelPath {
        timestamp: 1000.0,
        received_from: [0xEE; 16],
        hops: 3,
        expires: 1000.0 + constants::DESTINATION_TIMEOUT,
        random_blobs: Vec::new(),
        packet_hash: [0xFF; 32],
    };
    engine.tunnel_table.store_tunnel_path(
        &tunnel_id,
        dest,
        stored_path,
        1000.0,
        constants::DESTINATION_TIMEOUT,
        usize::MAX,
    );

    // Void the tunnel's interface and make sure the path table has no entry
    // for the destination.
    engine.void_tunnel_interface(&tunnel_id);
    engine.path_table.remove(&dest);
    assert!(!engine.has_path(&dest));

    // Handle the same tunnel id again, this time on interface 2.
    engine.register_interface(make_interface(2, constants::MODE_FULL));
    let actions = engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);

    // The path is back, with the stored hop count and the new interface.
    assert!(engine.has_path(&dest));
    let restored = engine.path_table.get(&dest).unwrap().primary().unwrap();
    assert_eq!(restored.hops, 3);
    assert_eq!(restored.receiving_interface, InterfaceId(2));

    let established = actions
        .iter()
        .any(|action| matches!(action, TransportAction::TunnelEstablished { .. }));
    assert!(established);
}
3187
/// Re-handling a tunnel must leave an existing path table entry untouched
/// when that entry carries a newer random blob than the tunnel's stored path.
#[test]
fn test_tunnel_reattach_does_not_overwrite_newer_path() {
    let tunnel_id = [0xCD; 32];
    let dest = [0xDE; 16];
    // NOTE(review): the argument to `make_random_blob` is presumably the
    // embedded timebase, so 200 is newer than 100 — confirm against the helper.
    let older_blob = make_random_blob(100);
    let newer_blob = make_random_blob(200);

    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_tunnel_interface(1));
    engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    // Tunnel-side path: 2 hops, older blob.
    let tunnel_side_path = tunnel::TunnelPath {
        timestamp: 1000.0,
        received_from: [0xEE; 16],
        hops: 2,
        expires: 1000.0 + constants::DESTINATION_TIMEOUT,
        random_blobs: vec![older_blob],
        packet_hash: [0x11; 32],
    };
    engine.tunnel_table.store_tunnel_path(
        &tunnel_id,
        dest,
        tunnel_side_path,
        1000.0,
        constants::DESTINATION_TIMEOUT,
        usize::MAX,
    );
    engine.void_tunnel_interface(&tunnel_id);

    // Path-table entry: 3 hops via interface 3, newer blob.
    let current_path = PathEntry {
        timestamp: 1500.0,
        next_hop: [0xAB; 16],
        hops: 3,
        expires: 1500.0 + constants::DESTINATION_TIMEOUT,
        random_blobs: vec![newer_blob],
        receiving_interface: InterfaceId(3),
        packet_hash: [0x22; 32],
        announce_raw: None,
    };
    engine
        .path_table
        .insert(dest, PathSet::from_single(current_path, 1));

    // Handle the tunnel again on interface 2.
    engine.register_interface(make_interface(2, constants::MODE_FULL));
    engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);

    // The path-table entry is still the one inserted above.
    let primary = engine.path_table.get(&dest).unwrap().primary().unwrap();
    assert_eq!(primary.next_hop, [0xAB; 16]);
    assert_eq!(primary.hops, 3);
    assert_eq!(primary.receiving_interface, InterfaceId(3));
    assert_eq!(primary.random_blobs, vec![newer_blob]);
}
3242
/// Voiding a tunnel's interface clears the interface binding but keeps the
/// tunnel entry itself in the table.
#[test]
fn test_void_tunnel_interface() {
    let tunnel_id = [0xDD; 32];

    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_tunnel_interface(1));
    engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    // Before: the tunnel is bound to interface 1.
    let bound_before = engine.tunnel_table().get(&tunnel_id).unwrap().interface;
    assert_eq!(bound_before, Some(InterfaceId(1)));

    engine.void_tunnel_interface(&tunnel_id);

    // After: the entry survives, without an interface.
    assert_eq!(engine.tunnel_table().len(), 1);
    let bound_after = engine.tunnel_table().get(&tunnel_id).unwrap().interface;
    assert_eq!(bound_after, None);
}
3266
/// A tunnel is removed by `tick` once the clock has advanced past
/// `DESTINATION_TIMEOUT + TABLES_CULL_INTERVAL` from its creation time.
#[test]
fn test_tick_culls_tunnels() {
    let tunnel_id = [0xEE; 32];
    let created_at = 1000.0;

    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_tunnel_interface(1));
    engine.handle_tunnel(tunnel_id, InterfaceId(1), created_at);
    assert_eq!(engine.tunnel_table().len(), 1);

    // One second beyond timeout plus cull interval.
    let tick_time =
        1000.0 + constants::DESTINATION_TIMEOUT + constants::TABLES_CULL_INTERVAL + 1.0;
    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    engine.tick(tick_time, &mut rng);

    assert_eq!(engine.tunnel_table().len(), 0);
}
3286
/// `synthesize_tunnel` yields exactly one `TunnelSynthesize` action aimed at
/// the `rnstransport.tunnel.synthesize` destination on the given interface.
#[test]
fn test_synthesize_tunnel() {
    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_tunnel_interface(1));

    let mut identity_rng = rns_crypto::FixedRng::new(&[0xFF; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);

    let actions = engine.synthesize_tunnel(&identity, InterfaceId(1), &mut rng);
    assert_eq!(actions.len(), 1);

    let first_action = &actions[0];
    match first_action {
        TransportAction::TunnelSynthesize {
            interface,
            data,
            dest_hash,
        } => {
            assert_eq!(*interface, InterfaceId(1));
            assert_eq!(data.len(), tunnel::TUNNEL_SYNTH_LENGTH);

            // The target is the identity-less tunnel synthesis destination.
            let expected_dest = crate::destination::destination_hash(
                "rnstransport",
                &["tunnel", "synthesize"],
                None,
            );
            assert_eq!(*dest_hash, expected_dest);
        }
        _ => panic!("Expected TunnelSynthesize"),
    }
}
3319
/// Builds a path-request payload: the 16-byte destination hash immediately
/// followed by the tag bytes.
fn make_path_request_data(dest_hash: &[u8; 16], tag: &[u8]) -> Vec<u8> {
    dest_hash.iter().chain(tag.iter()).copied().collect()
}
3330
/// A path request arriving on an ACCESS_POINT interface for an unknown
/// destination is sent out on the other interface and tracked as a
/// discovery path request.
#[test]
fn test_path_request_forwarded_on_ap() {
    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xD1; 16];
    let request = make_path_request_data(&dest, &[0x01; 16]);

    let actions = engine.handle_path_request(&request, InterfaceId(1), 1000.0);

    // Exactly one action: a send on interface 2.
    assert_eq!(actions.len(), 1);
    let only_action = &actions[0];
    match only_action {
        TransportAction::SendOnInterface { interface, .. } => {
            assert_eq!(*interface, InterfaceId(2));
        }
        _ => panic!("Expected SendOnInterface for forwarded path request"),
    }

    // The request is remembered for the destination.
    assert!(engine.discovery_path_requests.contains_key(&dest));
}
3354
/// A path request arriving on a MODE_FULL interface for an unknown
/// destination produces no actions and no discovery path request.
#[test]
fn test_path_request_not_forwarded_on_full() {
    let mut engine = TransportEngine::new(make_config(true));
    for iface_id in [1, 2] {
        engine.register_interface(make_interface(iface_id, constants::MODE_FULL));
    }

    let dest = [0xD2; 16];
    let request = make_path_request_data(&dest, &[0x02; 16]);

    let actions = engine.handle_path_request(&request, InterfaceId(1), 1000.0);

    assert!(actions.is_empty());
    assert!(!engine.discovery_path_requests.contains_key(&dest));
}
3371
/// Replaying an identical path request (same destination and tag) yields no
/// actions the second time and does not add another tag.
#[test]
fn test_duplicate_discovery_path_request_is_suppressed() {
    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xD7; 16];
    let request = make_path_request_data(&dest, &[0x07; 16]);

    // Same payload twice, one second apart.
    let first_actions = engine.handle_path_request(&request, InterfaceId(1), 1000.0);
    let replay_actions = engine.handle_path_request(&request, InterfaceId(1), 1001.0);

    assert_eq!(first_actions.len(), 1);
    assert!(
        replay_actions.is_empty(),
        "duplicate discovery request should be dropped"
    );
    assert_eq!(engine.discovery_pr_tags_count(), 1);
}
3392
/// With `max_discovery_pr_tags = 2`, a third distinct tag evicts the oldest
/// one, and the evicted tag can be recorded again afterwards.
#[test]
fn test_discovery_pr_tags_fifo_eviction() {
    let mut cfg = make_config(true);
    cfg.max_discovery_pr_tags = 2;
    let mut engine = TransportEngine::new(cfg);

    let (first_dest, first_tag) = ([0xA1; 16], [0x01; 16]);
    let (second_dest, second_tag) = ([0xA2; 16], [0x02; 16]);
    let (third_dest, third_tag) = ([0xA3; 16], [0x03; 16]);

    // Two requests fill the tag store to its cap.
    let first_request = make_path_request_data(&first_dest, &first_tag);
    engine.handle_path_request(&first_request, InterfaceId(1), 1000.0);
    let second_request = make_path_request_data(&second_dest, &second_tag);
    engine.handle_path_request(&second_request, InterfaceId(1), 1001.0);
    assert_eq!(engine.discovery_pr_tags_count(), 2);

    let first_unique = make_unique_tag(first_dest, &first_tag);
    let second_unique = make_unique_tag(second_dest, &second_tag);
    assert!(engine.has_discovery_pr_tag(&first_unique));
    assert!(engine.has_discovery_pr_tag(&second_unique));

    // A third tag keeps the count at two and pushes out the first.
    let third_request = make_path_request_data(&third_dest, &third_tag);
    engine.handle_path_request(&third_request, InterfaceId(1), 1002.0);
    assert_eq!(engine.discovery_pr_tags_count(), 2);
    assert!(!engine.has_discovery_pr_tag(&first_unique));
    assert!(engine.has_discovery_pr_tag(&second_unique));

    // The evicted tag is accepted again when re-sent.
    let resent_request = make_path_request_data(&first_dest, &first_tag);
    engine.handle_path_request(&resent_request, InterfaceId(1), 1003.0);
    assert_eq!(engine.discovery_pr_tags_count(), 2);
    assert!(engine.has_discovery_pr_tag(&first_unique));
}
3440
/// With `max_path_destinations` set to 2, inserting a third destination must
/// evict the first-inserted one, remove its entry from `path_states`, and bump
/// the cap-eviction counter to 1.
#[test]
fn test_path_destination_cap_evicts_oldest_and_clears_state() {
    let mut config = make_config(false);
    config.max_path_destinations = 2;
    let mut engine = TransportEngine::new(config);
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    let oldest = [0xB1; 16];
    let middle = [0xB2; 16];
    let newest = [0xB3; 16];

    let oldest_entry = make_path_entry(1000.0, 1, InterfaceId(1), [0x11; 16]);
    engine.upsert_path_destination(oldest, oldest_entry, 1000.0);

    let middle_entry = make_path_entry(1001.0, 1, InterfaceId(1), [0x22; 16]);
    engine.upsert_path_destination(middle, middle_entry, 1001.0);

    // Attach per-destination state to the entry that is about to be evicted.
    engine
        .path_states
        .insert(oldest, constants::STATE_UNRESPONSIVE);

    // This insert exceeds the cap of two destinations.
    let newest_entry = make_path_entry(1002.0, 1, InterfaceId(1), [0x33; 16]);
    engine.upsert_path_destination(newest, newest_entry, 1002.0);

    assert_eq!(engine.path_table_count(), 2);
    assert!(!engine.has_path(&oldest));
    assert!(engine.has_path(&middle));
    assert!(engine.has_path(&newest));
    assert!(!engine.path_states.contains_key(&oldest));
    assert_eq!(engine.path_destination_cap_evict_count(), 1);
}
3479
/// When the destination table is already at its cap, upserting a path for a
/// destination that is already present must not evict any destination.
#[test]
fn test_existing_path_destination_update_does_not_trigger_cap_eviction() {
    let mut config = make_config(false);
    config.max_path_destinations = 2;
    config.max_paths_per_destination = 2;
    let mut engine = TransportEngine::new(config);
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    let first_dest = [0xC1; 16];
    let second_dest = [0xC2; 16];

    // Seed both destinations so the table sits exactly at the cap.
    for (dest, seen_at, next_hop) in [
        (first_dest, 1000.0, [0x11; 16]),
        (second_dest, 1001.0, [0x22; 16]),
    ] {
        engine.upsert_path_destination(
            dest,
            make_path_entry(seen_at, 2, InterfaceId(1), next_hop),
            seen_at,
        );
    }

    // Second upsert for an already-known destination (different hops and next hop).
    let updated_entry = make_path_entry(1002.0, 1, InterfaceId(1), [0x23; 16]);
    engine.upsert_path_destination(second_dest, updated_entry, 1002.0);

    assert_eq!(engine.path_table_count(), 2);
    assert!(engine.has_path(&first_dest));
    assert!(engine.has_path(&second_dest));
}
3512
/// A path request arriving on a `MODE_ROAMING` interface for a destination
/// whose stored path was received on that same interface must produce no
/// actions and must not queue an announce.
#[test]
fn test_roaming_loop_prevention() {
    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_interface(1, constants::MODE_ROAMING));

    let dest = [0xD3; 16];

    // Known path whose receiving interface is the one the request arrives on.
    let known_path = PathEntry {
        timestamp: 900.0,
        next_hop: [0xAA; 16],
        hops: 2,
        expires: 9999.0,
        random_blobs: Vec::new(),
        receiving_interface: InterfaceId(1),
        packet_hash: [0; 32],
        announce_raw: None,
    };
    engine
        .path_table
        .insert(dest, PathSet::from_single(known_path, 1));

    let request_tag = [0x03; 16];
    let request = make_path_request_data(&dest, &request_tag);

    let actions = engine.handle_path_request(&request, InterfaceId(1), 1000.0);

    assert!(actions.is_empty());
    assert!(!engine.announce_table.contains_key(&dest));
}
3546
3547 fn make_announce_raw(dest_hash: &[u8; 16], payload: &[u8]) -> Vec<u8> {
3549 let flags: u8 = 0x01; let mut raw = Vec::new();
3553 raw.push(flags);
3554 raw.push(0x02); raw.extend_from_slice(dest_hash);
3556 raw.push(constants::CONTEXT_NONE);
3557 raw.extend_from_slice(payload);
3558 raw
3559 }
3560
/// When the stored path carries its raw announce bytes, handling a path
/// request must create an announce-table entry whose `packet_raw` equals the
/// stored bytes, whose `packet_data` equals the announce payload, and which
/// has `block_rebroadcasts` set.
#[test]
fn test_path_request_populates_announce_entry_from_raw() {
    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_interface(1, constants::MODE_FULL));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xD5; 16];
    let payload = vec![0xAB; 32];
    let announce_raw = make_announce_raw(&dest, &payload);

    // Path received on interface 2, with the raw announce retained.
    let cached_path = PathEntry {
        timestamp: 900.0,
        next_hop: [0xBB; 16],
        hops: 2,
        expires: 9999.0,
        random_blobs: Vec::new(),
        receiving_interface: InterfaceId(2),
        packet_hash: [0; 32],
        announce_raw: Some(announce_raw.clone()),
    };
    engine
        .path_table
        .insert(dest, PathSet::from_single(cached_path, 1));

    // The request comes in on the other interface; the actions are not inspected.
    let request_tag = [0x05; 16];
    let request = make_path_request_data(&dest, &request_tag);
    let _actions = engine.handle_path_request(&request, InterfaceId(1), 1000.0);

    let queued = engine
        .announce_table
        .get(&dest)
        .expect("announce entry must exist");
    assert_eq!(queued.packet_raw, announce_raw);
    assert_eq!(queued.packet_data, payload);
    assert!(queued.block_rebroadcasts);
}
3601
/// When the stored path has `announce_raw: None`, handling a path request
/// must produce no actions and must not create an announce-table entry.
#[test]
fn test_path_request_skips_when_no_announce_raw() {
    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_interface(1, constants::MODE_FULL));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xD6; 16];

    // Known path, but without the raw announce bytes.
    let path_without_raw = PathEntry {
        timestamp: 900.0,
        next_hop: [0xCC; 16],
        hops: 1,
        expires: 9999.0,
        random_blobs: Vec::new(),
        receiving_interface: InterfaceId(2),
        packet_hash: [0; 32],
        announce_raw: None,
    };
    engine
        .path_table
        .insert(dest, PathSet::from_single(path_without_raw, 1));

    let request_tag = [0x06; 16];
    let request = make_path_request_data(&dest, &request_tag);
    let actions = engine.handle_path_request(&request, InterfaceId(1), 1000.0);

    assert!(actions.is_empty());
    assert!(!engine.announce_table.contains_key(&dest));
}
3635
/// `discovery_path_requests_waiting` must return the requesting interface of
/// a pending discovery request and remove that request, so a second lookup
/// for the same destination returns `None`.
#[test]
fn test_discovery_request_consumed_on_announce() {
    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));

    let dest = [0xD4; 16];

    // Seed a pending discovery request directly in the engine's table.
    let pending = DiscoveryPathRequest {
        timestamp: 900.0,
        requesting_interface: InterfaceId(1),
    };
    engine.discovery_path_requests.insert(dest, pending);

    // First lookup hands back the waiting interface...
    let waiting_iface = engine.discovery_path_requests_waiting(&dest);
    assert_eq!(waiting_iface, Some(InterfaceId(1)));

    // ...and consumes the entry.
    assert!(!engine.discovery_path_requests.contains_key(&dest));
    assert_eq!(engine.discovery_path_requests_waiting(&dest), None);
}
3660
/// An announce that answers a pending discovery path request must not be held
/// on the receiving interface even though that interface has ingress control
/// enabled and `ia_freq` above `IC_BURST_FREQ`. The path must be learned, the
/// pending request consumed, `AnnounceReceived` emitted, and a
/// rebroadcast-blocked announce entry attached to the requesting interface.
#[test]
fn test_pending_path_request_announce_bypasses_ingress_control() {
    let mut engine = TransportEngine::new(make_config(true));

    // Interface 1: ingress control on, incoming-announce frequency over the burst limit.
    let mut limited_iface = make_interface(1, constants::MODE_FULL);
    limited_iface.ingress_control = crate::transport::types::IngressControlConfig::enabled();
    limited_iface.ia_freq = constants::IC_BURST_FREQ + 1.0;
    limited_iface.started = 0.0;
    engine.register_interface(limited_iface);
    engine.register_interface(make_interface(2, constants::MODE_ACCESS_POINT));

    // Same fixed seed as `build_announce_for_issue4`, so the hashes line up.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x99; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = crate::destination::destination_hash(
        "ingress",
        &["path-request"],
        Some(identity.hash()),
    );
    let name_hash = crate::destination::name_hash("ingress", &["path-request"]);
    let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);

    // Interface 2 is waiting for a path to this destination.
    let pending = DiscoveryPathRequest {
        timestamp: 999.0,
        requesting_interface: InterfaceId(2),
    };
    engine.discovery_path_requests.insert(dest_hash, pending);

    let mut rng = rns_crypto::FixedRng::new(&[0x88; 32]);
    let actions = engine.handle_inbound(&announce_raw, InterfaceId(1), 1000.0, &mut rng);

    assert_eq!(engine.held_announce_count(&InterfaceId(1)), 0);
    assert!(engine.has_path(&dest_hash));
    assert!(!engine.discovery_path_requests.contains_key(&dest_hash));

    let announce_seen = actions.iter().any(|action| match action {
        TransportAction::AnnounceReceived {
            destination_hash,
            receiving_interface: InterfaceId(1),
            ..
        } => *destination_hash == dest_hash,
        _ => false,
    });
    assert!(announce_seen);

    let queued = engine
        .announce_table
        .get(&dest_hash)
        .expect("path response announce should be queued");
    assert!(queued.block_rebroadcasts);
    assert_eq!(queued.attached_interface, Some(InterfaceId(2)));
}
3713
3714 fn build_announce_for_issue4(dest_hash: &[u8; 16], name_hash: &[u8; 10]) -> Vec<u8> {
3720 let identity =
3721 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3722 let random_hash = [0x42u8; 10];
3723 let (announce_data, _) = crate::announce::AnnounceData::pack(
3724 &identity,
3725 dest_hash,
3726 name_hash,
3727 &random_hash,
3728 None,
3729 None,
3730 )
3731 .unwrap();
3732 let flags = PacketFlags {
3733 header_type: constants::HEADER_1,
3734 context_flag: constants::FLAG_UNSET,
3735 transport_type: constants::TRANSPORT_BROADCAST,
3736 destination_type: constants::DESTINATION_SINGLE,
3737 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3738 };
3739 RawPacket::pack(
3740 flags,
3741 0,
3742 dest_hash,
3743 None,
3744 constants::CONTEXT_NONE,
3745 &announce_data,
3746 )
3747 .unwrap()
3748 .raw
3749 }
3750
/// After learning a 1-hop path via a local-client interface, an outbound
/// HEADER_1 broadcast DATA packet to that destination must be emitted on the
/// same interface rewritten as HEADER_2 with the TRANSPORT transport type.
#[test]
fn test_issue4_local_client_single_data_to_1hop_rewrites_on_outbound() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_local_client_interface(1));

    let mut identity_rng = rns_crypto::FixedRng::new(&[0x99; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash =
        crate::destination::destination_hash("issue4", &["test"], Some(identity.hash()));
    let name_hash = crate::destination::name_hash("issue4", &["test"]);
    let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);

    // Overwrite byte 1 of the announce with 1 before feeding it to the engine;
    // the path is then expected to be recorded at one hop.
    let mut announce_packet = RawPacket::unpack(&announce_raw).unwrap();
    announce_packet.raw[1] = 1;
    let mut inbound_rng = rns_crypto::FixedRng::new(&[0; 32]);
    engine.handle_inbound(&announce_packet.raw, InterfaceId(1), 1000.0, &mut inbound_rng);
    assert!(engine.has_path(&dest_hash));
    assert_eq!(engine.hops_to(&dest_hash), Some(1));

    // Plain HEADER_1 broadcast DATA packet addressed to the learned destination.
    let outbound_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let outbound_packet = RawPacket::pack(
        outbound_flags,
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        b"hello",
    )
    .unwrap();

    let actions =
        engine.handle_outbound(&outbound_packet, constants::DESTINATION_SINGLE, None, 1001.0);

    // Take the first SendOnInterface action, if there is one.
    let first_send = actions.iter().find_map(|action| {
        if let TransportAction::SendOnInterface { interface, raw } = action {
            Some((interface, raw))
        } else {
            None
        }
    });
    let (interface, raw) =
        first_send.expect("shared client should emit a transport-injected packet");
    assert_eq!(*interface, InterfaceId(1));

    let sent_flags = PacketFlags::unpack(raw[0]);
    assert_eq!(sent_flags.header_type, constants::HEADER_2);
    assert_eq!(sent_flags.transport_type, constants::TRANSPORT_TRANSPORT);
}
3808
/// Control case: a HEADER_2 DATA packet whose transport id is this engine's
/// identity hash, received on interface 1, must be forwarded on interface 2,
/// where the destination's 1-hop path was learned.
#[test]
fn test_issue4_external_data_to_1hop_via_transport_works() {
    let daemon_id = [0x42; 16];
    let config = TransportConfig {
        transport_enabled: true,
        identity_hash: Some(daemon_id),
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
        packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
        max_discovery_pr_tags: constants::MAX_PR_TAGS,
        max_path_destinations: usize::MAX,
        max_tunnel_destinations_total: usize::MAX,
        destination_timeout_secs: constants::DESTINATION_TIMEOUT,
        announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
        announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
        announce_sig_cache_enabled: true,
        announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
        announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
        announce_queue_max_entries: 256,
        announce_queue_max_interfaces: 1024,
    };
    let mut engine = TransportEngine::new(config);
    engine.register_interface(make_interface(1, constants::MODE_FULL));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    let mut identity_rng = rns_crypto::FixedRng::new(&[0x99; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash =
        crate::destination::destination_hash("issue4", &["ctrl"], Some(identity.hash()));
    let name_hash = crate::destination::name_hash("issue4", &["ctrl"]);
    let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);

    // Learn the destination through interface 2; it must end up one hop away.
    let mut announce_rng = rns_crypto::FixedRng::new(&[0; 32]);
    engine.handle_inbound(&announce_raw, InterfaceId(2), 1000.0, &mut announce_rng);
    assert_eq!(engine.hops_to(&dest_hash), Some(1));

    let transport_flags = PacketFlags {
        header_type: constants::HEADER_2,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_TRANSPORT,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };

    // Hand-assembled packet: flags, a zero byte, transport id, destination
    // hash, context byte, payload.
    let mut transport_raw = vec![transport_flags.pack(), 0];
    transport_raw.extend_from_slice(&daemon_id);
    transport_raw.extend_from_slice(&dest_hash);
    transport_raw.push(constants::CONTEXT_NONE);
    transport_raw.extend_from_slice(b"hello via transport");

    let mut data_rng = rns_crypto::FixedRng::new(&[0x22; 32]);
    let actions = engine.handle_inbound(&transport_raw, InterfaceId(1), 1001.0, &mut data_rng);

    let has_send = actions.iter().any(|action| match action {
        TransportAction::SendOnInterface { interface, .. } => *interface == InterfaceId(2),
        _ => false,
    });
    assert!(
        has_send,
        "HEADER_2 transport packet should be forwarded (control test)"
    );
}
3883}