1pub mod announce_proc;
2pub mod announce_queue;
3pub mod announce_verify_queue;
4pub mod dedup;
5pub mod inbound;
6pub mod ingress_control;
7pub mod jobs;
8pub mod outbound;
9pub mod pathfinder;
10pub mod rate_limit;
11pub mod tables;
12pub mod tunnel;
13pub mod types;
14
15use alloc::collections::BTreeMap;
16use alloc::string::String;
17use alloc::vec::Vec;
18use core::mem::size_of;
19
20use rns_crypto::Rng;
21
22use crate::announce::AnnounceData;
23use crate::constants;
24use crate::hash;
25use crate::packet::RawPacket;
26
27use self::announce_proc::compute_path_expires;
28use self::announce_queue::AnnounceQueues;
29use self::announce_verify_queue::{AnnounceVerifyKey, AnnounceVerifyQueue, PendingAnnounce};
30use self::dedup::{AnnounceSignatureCache, PacketHashlist};
31use self::inbound::{
32 create_link_entry, create_reverse_entry, forward_transport_packet, route_proof_via_reverse,
33 route_via_link_table,
34};
35use self::ingress_control::IngressControl;
36use self::outbound::{route_outbound, should_transmit_announce};
37use self::pathfinder::{
38 decide_announce_multipath, extract_random_blob, timebase_from_random_blob, MultiPathDecision,
39};
40use self::rate_limit::AnnounceRateLimiter;
41use self::tables::{AnnounceEntry, DiscoveryPathRequest, LinkEntry, PathEntry, PathSet};
42use self::tunnel::TunnelTable;
43use self::types::{BlackholeEntry, InterfaceId, InterfaceInfo, TransportAction, TransportConfig};
44
/// Flat tuple form of one path-table row.
///
/// NOTE(review): the code that builds these rows is not visible here. The elements are
/// presumably (destination hash, timestamp, next hop, hops, expiry, interface name) —
/// confirm against the producer before relying on the order.
pub type PathTableRow = ([u8; 16], f64, [u8; 16], u8, f64, String);
/// Flat tuple form of one announce-rate-table row.
///
/// NOTE(review): the code that builds these rows is not visible here. The leading element
/// is presumably a destination hash and the trailing `Vec<f64>` a list of timestamps —
/// confirm against the producer.
pub type RateTableRow = ([u8; 16], f64, u32, f64, Vec<f64>);
47
/// Central state of the transport layer: the tables and helpers that the methods on this
/// type read and mutate while registering interfaces/destinations and handling packets.
pub struct TransportEngine {
    /// Configuration handed to [`TransportEngine::new`]; read for caps, TTLs and flags.
    config: TransportConfig,
    /// Known paths, keyed by 16-byte destination hash.
    path_table: BTreeMap<[u8; 16], PathSet>,
    /// Announce entries keyed by destination hash; counted against
    /// `config.announce_table_max_bytes` and expired by `config.announce_table_ttl_secs`.
    announce_table: BTreeMap<[u8; 16], AnnounceEntry>,
    /// Reverse entries, keyed by the truncated hash returned by `create_reverse_entry`.
    reverse_table: BTreeMap<[u8; 16], tables::ReverseEntry>,
    /// Link entries keyed by link id.
    link_table: BTreeMap<[u8; 16], LinkEntry>,
    /// Second announce store; shares the byte budget and TTL of `announce_table`.
    held_announces: BTreeMap<[u8; 16], AnnounceEntry>,
    /// Remembered packet hashes used by `packet_filter` to detect duplicates.
    packet_hashlist: PacketHashlist,
    /// Keys of announce signatures that already validated (capacity 0 when disabled).
    announce_sig_cache: AnnounceSignatureCache,
    /// Per-destination announce rate limiter.
    rate_limiter: AnnounceRateLimiter,
    /// Per-destination state value (`STATE_UNRESPONSIVE` / `STATE_RESPONSIVE`).
    path_states: BTreeMap<[u8; 16], u8>,
    /// Registered interfaces keyed by their id.
    interfaces: BTreeMap<InterfaceId, InterfaceInfo>,
    /// Locally registered destinations: destination hash -> destination type.
    local_destinations: BTreeMap<[u8; 16], u8>,
    /// Blackholed identities keyed by identity hash.
    blackholed_identities: BTreeMap<[u8; 16], BlackholeEntry>,
    /// Announce queues; an interface's state is dropped on deregistration.
    announce_queues: AnnounceQueues,
    /// Ingress-control state used to decide whether to hold inbound announces.
    ingress_control: IngressControl,
    /// Tunnel bookkeeping, including paths learned through tunnels.
    tunnel_table: TunnelTable,
    /// Discovery path-request tags in insertion order (oldest first).
    discovery_pr_tags: Vec<[u8; 32]>,
    /// Outstanding discovery path requests keyed by destination hash.
    discovery_path_requests: BTreeMap<[u8; 16], DiscoveryPathRequest>,
    /// Number of destinations evicted from `path_table` by the destination cap.
    path_destination_cap_evict_count: usize,
    /// Initialised to 0.0; NOTE(review): presumably the time of the last announce check —
    /// its users are not visible in this part of the file.
    announces_last_checked: f64,
    /// Initialised to 0.0; NOTE(review): presumably the time of the last table cull —
    /// its users are not visible in this part of the file.
    tables_last_culled: f64,
}
76
77impl TransportEngine {
78 pub fn new(config: TransportConfig) -> Self {
79 let packet_hashlist_max_entries = config.packet_hashlist_max_entries;
80 let sig_cache_max = if config.announce_sig_cache_enabled {
81 config.announce_sig_cache_max_entries
82 } else {
83 0
84 };
85 let sig_cache_ttl = config.announce_sig_cache_ttl_secs;
86 let announce_queue_max_interfaces = config.announce_queue_max_interfaces;
87 TransportEngine {
88 config,
89 path_table: BTreeMap::new(),
90 announce_table: BTreeMap::new(),
91 reverse_table: BTreeMap::new(),
92 link_table: BTreeMap::new(),
93 held_announces: BTreeMap::new(),
94 packet_hashlist: PacketHashlist::new(packet_hashlist_max_entries),
95 announce_sig_cache: AnnounceSignatureCache::new(sig_cache_max, sig_cache_ttl),
96 rate_limiter: AnnounceRateLimiter::new(),
97 path_states: BTreeMap::new(),
98 interfaces: BTreeMap::new(),
99 local_destinations: BTreeMap::new(),
100 blackholed_identities: BTreeMap::new(),
101 announce_queues: AnnounceQueues::new(announce_queue_max_interfaces),
102 ingress_control: IngressControl::new(),
103 tunnel_table: TunnelTable::new(),
104 discovery_pr_tags: Vec::new(),
105 discovery_path_requests: BTreeMap::new(),
106 path_destination_cap_evict_count: 0,
107 announces_last_checked: 0.0,
108 tables_last_culled: 0.0,
109 }
110 }
111
112 fn insert_discovery_pr_tag(&mut self, unique_tag: [u8; 32]) -> bool {
113 if self.discovery_pr_tags.contains(&unique_tag) {
114 return false;
115 }
116 if self.config.max_discovery_pr_tags != usize::MAX
117 && self.discovery_pr_tags.len() >= self.config.max_discovery_pr_tags
118 && !self.discovery_pr_tags.is_empty()
119 {
120 self.discovery_pr_tags.remove(0);
121 }
122 self.discovery_pr_tags.push(unique_tag);
123 true
124 }
125
126 fn upsert_path_destination(&mut self, dest_hash: [u8; 16], entry: PathEntry, now: f64) {
127 let max_paths = self.config.max_paths_per_destination;
128 if let Some(ps) = self.path_table.get_mut(&dest_hash) {
129 ps.upsert(entry);
130 return;
131 }
132 self.enforce_path_destination_cap(now);
133 self.path_table
134 .insert(dest_hash, PathSet::from_single(entry, max_paths));
135 }
136
137 fn enforce_path_destination_cap(&mut self, now: f64) {
138 if self.config.max_path_destinations == usize::MAX {
139 return;
140 }
141 jobs::cull_path_table(&mut self.path_table, &self.interfaces, now);
142 while self.path_table.len() >= self.config.max_path_destinations {
143 let Some(dest_hash) = self.oldest_path_destination() else {
144 break;
145 };
146 self.path_table.remove(&dest_hash);
147 self.path_states.remove(&dest_hash);
148 self.path_destination_cap_evict_count += 1;
149 }
150 }
151
152 fn oldest_path_destination(&self) -> Option<[u8; 16]> {
153 self.path_table
154 .iter()
155 .filter_map(|(dest_hash, path_set)| {
156 path_set
157 .primary()
158 .map(|primary| (*dest_hash, primary.timestamp, primary.hops))
159 })
160 .min_by(|a, b| {
161 a.1.partial_cmp(&b.1)
162 .unwrap_or(core::cmp::Ordering::Equal)
163 .then_with(|| b.2.cmp(&a.2))
164 })
165 .map(|(dest_hash, _, _)| dest_hash)
166 }
167
168 fn announce_entry_size_bytes(entry: &AnnounceEntry) -> usize {
169 size_of::<AnnounceEntry>() + entry.packet_raw.capacity() + entry.packet_data.capacity()
170 }
171
172 fn announce_retained_bytes_total(&self) -> usize {
173 self.announce_table
174 .values()
175 .chain(self.held_announces.values())
176 .map(Self::announce_entry_size_bytes)
177 .sum()
178 }
179
180 fn cull_expired_announce_entries(&mut self, now: f64) -> usize {
181 let ttl = self.config.announce_table_ttl_secs;
182 let mut removed = 0usize;
183
184 self.announce_table.retain(|_, entry| {
185 let keep = now <= entry.timestamp + ttl;
186 if !keep {
187 removed += 1;
188 }
189 keep
190 });
191
192 self.held_announces.retain(|_, entry| {
193 let keep = now <= entry.timestamp + ttl;
194 if !keep {
195 removed += 1;
196 }
197 keep
198 });
199
200 removed
201 }
202
203 fn oldest_retained_announce(&self) -> Option<([u8; 16], bool)> {
204 let oldest_active = self
205 .announce_table
206 .iter()
207 .map(|(dest_hash, entry)| (*dest_hash, false, entry.timestamp))
208 .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(core::cmp::Ordering::Equal));
209 let oldest_held = self
210 .held_announces
211 .iter()
212 .map(|(dest_hash, entry)| (*dest_hash, true, entry.timestamp))
213 .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(core::cmp::Ordering::Equal));
214
215 match (oldest_active, oldest_held) {
216 (Some(active), Some(held)) => {
217 let ordering = active
218 .2
219 .partial_cmp(&held.2)
220 .unwrap_or(core::cmp::Ordering::Equal);
221 if ordering == core::cmp::Ordering::Less {
222 Some((active.0, active.1))
223 } else {
224 Some((held.0, held.1))
225 }
226 }
227 (Some(active), None) => Some((active.0, active.1)),
228 (None, Some(held)) => Some((held.0, held.1)),
229 (None, None) => None,
230 }
231 }
232
233 fn enforce_announce_retention_cap(&mut self, now: f64) {
234 self.cull_expired_announce_entries(now);
235 while self.announce_retained_bytes_total() > self.config.announce_table_max_bytes {
236 let Some((dest_hash, is_held)) = self.oldest_retained_announce() else {
237 break;
238 };
239 if is_held {
240 self.held_announces.remove(&dest_hash);
241 } else {
242 self.announce_table.remove(&dest_hash);
243 }
244 }
245 }
246
247 fn insert_announce_entry(
248 &mut self,
249 dest_hash: [u8; 16],
250 entry: AnnounceEntry,
251 now: f64,
252 ) -> bool {
253 self.cull_expired_announce_entries(now);
254 if Self::announce_entry_size_bytes(&entry) > self.config.announce_table_max_bytes {
255 return false;
256 }
257 self.announce_table.insert(dest_hash, entry);
258 self.enforce_announce_retention_cap(now);
259 self.announce_table.contains_key(&dest_hash)
260 }
261
262 fn insert_held_announce(
263 &mut self,
264 dest_hash: [u8; 16],
265 entry: AnnounceEntry,
266 now: f64,
267 ) -> bool {
268 self.cull_expired_announce_entries(now);
269 if Self::announce_entry_size_bytes(&entry) > self.config.announce_table_max_bytes {
270 return false;
271 }
272 self.held_announces.insert(dest_hash, entry);
273 self.enforce_announce_retention_cap(now);
274 self.held_announces.contains_key(&dest_hash)
275 }
276
277 pub fn register_interface(&mut self, info: InterfaceInfo) {
282 self.interfaces.insert(info.id, info);
283 }
284
285 pub fn deregister_interface(&mut self, id: InterfaceId) {
286 self.interfaces.remove(&id);
287 self.announce_queues.remove_interface(id);
288 self.ingress_control.remove_interface(&id);
289 }
290
291 pub fn register_destination(&mut self, dest_hash: [u8; 16], dest_type: u8) {
296 self.local_destinations.insert(dest_hash, dest_type);
297 }
298
299 pub fn deregister_destination(&mut self, dest_hash: &[u8; 16]) {
300 self.local_destinations.remove(dest_hash);
301 }
302
303 pub fn has_path(&self, dest_hash: &[u8; 16]) -> bool {
308 self.path_table
309 .get(dest_hash)
310 .is_some_and(|ps| !ps.is_empty())
311 }
312
313 pub fn hops_to(&self, dest_hash: &[u8; 16]) -> Option<u8> {
314 self.path_table
315 .get(dest_hash)
316 .and_then(|ps| ps.primary())
317 .map(|e| e.hops)
318 }
319
320 pub fn next_hop(&self, dest_hash: &[u8; 16]) -> Option<[u8; 16]> {
321 self.path_table
322 .get(dest_hash)
323 .and_then(|ps| ps.primary())
324 .map(|e| e.next_hop)
325 }
326
327 pub fn next_hop_interface(&self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
328 self.path_table
329 .get(dest_hash)
330 .and_then(|ps| ps.primary())
331 .map(|e| e.receiving_interface)
332 }
333
334 pub fn mark_path_unresponsive(
344 &mut self,
345 dest_hash: &[u8; 16],
346 receiving_interface: Option<InterfaceId>,
347 ) {
348 if let Some(iface_id) = receiving_interface {
349 if let Some(info) = self.interfaces.get(&iface_id) {
350 if info.mode == constants::MODE_BOUNDARY {
351 return;
352 }
353 }
354 }
355
356 if let Some(ps) = self.path_table.get_mut(dest_hash) {
358 if ps.len() > 1 {
359 ps.failover(false); self.path_states.remove(dest_hash);
362 return;
363 }
364 }
365
366 self.path_states
367 .insert(*dest_hash, constants::STATE_UNRESPONSIVE);
368 }
369
370 pub fn mark_path_responsive(&mut self, dest_hash: &[u8; 16]) {
371 self.path_states
372 .insert(*dest_hash, constants::STATE_RESPONSIVE);
373 }
374
375 pub fn path_is_unresponsive(&self, dest_hash: &[u8; 16]) -> bool {
376 self.path_states.get(dest_hash) == Some(&constants::STATE_UNRESPONSIVE)
377 }
378
379 pub fn expire_path(&mut self, dest_hash: &[u8; 16]) {
380 if let Some(ps) = self.path_table.get_mut(dest_hash) {
381 ps.expire_all();
382 }
383 }
384
385 pub fn register_link(&mut self, link_id: [u8; 16], entry: LinkEntry) {
390 self.link_table.insert(link_id, entry);
391 }
392
393 pub fn validate_link(&mut self, link_id: &[u8; 16]) {
394 if let Some(entry) = self.link_table.get_mut(link_id) {
395 entry.validated = true;
396 }
397 }
398
399 pub fn remove_link(&mut self, link_id: &[u8; 16]) {
400 self.link_table.remove(link_id);
401 }
402
403 pub fn blackhole_identity(
409 &mut self,
410 identity_hash: [u8; 16],
411 now: f64,
412 duration_hours: Option<f64>,
413 reason: Option<String>,
414 ) {
415 let expires = match duration_hours {
416 Some(h) if h > 0.0 => now + h * 3600.0,
417 _ => 0.0, };
419 self.blackholed_identities.insert(
420 identity_hash,
421 BlackholeEntry {
422 created: now,
423 expires,
424 reason,
425 },
426 );
427 }
428
429 pub fn unblackhole_identity(&mut self, identity_hash: &[u8; 16]) -> bool {
431 self.blackholed_identities.remove(identity_hash).is_some()
432 }
433
434 pub fn is_blackholed(&self, identity_hash: &[u8; 16], now: f64) -> bool {
436 if let Some(entry) = self.blackholed_identities.get(identity_hash) {
437 if entry.expires == 0.0 || entry.expires > now {
438 return true;
439 }
440 }
441 false
442 }
443
444 pub fn blackholed_entries(&self) -> impl Iterator<Item = (&[u8; 16], &BlackholeEntry)> {
446 self.blackholed_identities.iter()
447 }
448
449 fn cull_blackholed(&mut self, now: f64) {
451 self.blackholed_identities
452 .retain(|_, entry| entry.expires == 0.0 || entry.expires > now);
453 }
454
455 pub fn handle_tunnel(
463 &mut self,
464 tunnel_id: [u8; 32],
465 interface: InterfaceId,
466 now: f64,
467 ) -> Vec<TransportAction> {
468 let mut actions = Vec::new();
469
470 if let Some(info) = self.interfaces.get_mut(&interface) {
472 info.tunnel_id = Some(tunnel_id);
473 }
474
475 let restored_paths = self.tunnel_table.handle_tunnel(
476 tunnel_id,
477 interface,
478 now,
479 self.config.destination_timeout_secs,
480 );
481
482 for (dest_hash, tunnel_path) in &restored_paths {
484 let should_restore = match self.path_table.get(dest_hash).and_then(|ps| ps.primary()) {
485 Some(existing) => {
486 tunnel_path.hops <= existing.hops || existing.expires < now
488 }
489 None => true,
490 };
491
492 if should_restore {
493 let entry = PathEntry {
494 timestamp: tunnel_path.timestamp,
495 next_hop: tunnel_path.received_from,
496 hops: tunnel_path.hops,
497 expires: tunnel_path.expires,
498 random_blobs: tunnel_path.random_blobs.clone(),
499 receiving_interface: interface,
500 packet_hash: tunnel_path.packet_hash,
501 announce_raw: None,
502 };
503 self.upsert_path_destination(*dest_hash, entry, now);
504 }
505 }
506
507 actions.push(TransportAction::TunnelEstablished {
508 tunnel_id,
509 interface,
510 });
511
512 actions
513 }
514
515 pub fn synthesize_tunnel(
523 &self,
524 identity: &rns_crypto::identity::Identity,
525 interface_id: InterfaceId,
526 rng: &mut dyn Rng,
527 ) -> Vec<TransportAction> {
528 let mut actions = Vec::new();
529
530 let interface_hash = if let Some(info) = self.interfaces.get(&interface_id) {
532 hash::full_hash(info.name.as_bytes())
533 } else {
534 return actions;
535 };
536
537 match tunnel::build_tunnel_synthesize_data(identity, &interface_hash, rng) {
538 Ok((data, _tunnel_id)) => {
539 let dest_hash = crate::destination::destination_hash(
540 "rnstransport",
541 &["tunnel", "synthesize"],
542 None,
543 );
544 actions.push(TransportAction::TunnelSynthesize {
545 interface: interface_id,
546 data,
547 dest_hash,
548 });
549 }
550 Err(e) => {
551 let _ = e;
553 }
554 }
555
556 actions
557 }
558
559 pub fn void_tunnel_interface(&mut self, tunnel_id: &[u8; 32]) {
561 self.tunnel_table.void_tunnel_interface(tunnel_id);
562 }
563
564 pub fn tunnel_table(&self) -> &TunnelTable {
566 &self.tunnel_table
567 }
568
569 fn has_local_clients(&self) -> bool {
575 self.interfaces.values().any(|i| i.is_local_client)
576 }
577
578 fn packet_filter(&self, packet: &RawPacket) -> bool {
582 if packet.transport_id.is_some()
584 && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
585 {
586 if let Some(ref identity_hash) = self.config.identity_hash {
587 if packet.transport_id.as_ref() != Some(identity_hash) {
588 return false;
589 }
590 }
591 }
592
593 match packet.context {
595 constants::CONTEXT_KEEPALIVE
596 | constants::CONTEXT_RESOURCE_REQ
597 | constants::CONTEXT_RESOURCE_PRF
598 | constants::CONTEXT_RESOURCE
599 | constants::CONTEXT_CACHE_REQUEST
600 | constants::CONTEXT_CHANNEL => return true,
601 _ => {}
602 }
603
604 if packet.flags.destination_type == constants::DESTINATION_PLAIN
606 || packet.flags.destination_type == constants::DESTINATION_GROUP
607 {
608 if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
609 return packet.hops <= 1;
610 } else {
611 return false;
613 }
614 }
615
616 if !self.packet_hashlist.is_duplicate(&packet.packet_hash) {
618 return true;
619 }
620
621 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
623 && packet.flags.destination_type == constants::DESTINATION_SINGLE
624 {
625 return true;
626 }
627
628 false
629 }
630
631 pub fn handle_inbound(
639 &mut self,
640 raw: &[u8],
641 iface: InterfaceId,
642 now: f64,
643 rng: &mut dyn Rng,
644 ) -> Vec<TransportAction> {
645 self.handle_inbound_with_announce_queue(raw, iface, now, rng, None)
646 }
647
648 pub fn handle_inbound_with_announce_queue(
649 &mut self,
650 raw: &[u8],
651 iface: InterfaceId,
652 now: f64,
653 rng: &mut dyn Rng,
654 announce_queue: Option<&mut AnnounceVerifyQueue>,
655 ) -> Vec<TransportAction> {
656 let mut actions = Vec::new();
657
658 let mut packet = match RawPacket::unpack(raw) {
660 Ok(p) => p,
661 Err(_) => return actions, };
663
664 let original_raw = raw.to_vec();
666
667 packet.hops += 1;
669
670 let from_local_client = self
673 .interfaces
674 .get(&iface)
675 .map(|i| i.is_local_client)
676 .unwrap_or(false);
677 if from_local_client {
678 packet.hops = packet.hops.saturating_sub(1);
679 }
680
681 if !self.packet_filter(&packet) {
683 return actions;
684 }
685
686 let mut remember_hash = true;
688
689 if self.link_table.contains_key(&packet.destination_hash) {
690 remember_hash = false;
691 }
692 if packet.flags.packet_type == constants::PACKET_TYPE_PROOF
693 && packet.context == constants::CONTEXT_LRPROOF
694 {
695 remember_hash = false;
696 }
697
698 if remember_hash {
699 self.packet_hashlist.add(packet.packet_hash);
700 }
701
702 if packet.flags.destination_type == constants::DESTINATION_PLAIN
704 && packet.flags.transport_type == constants::TRANSPORT_BROADCAST
705 && self.has_local_clients()
706 {
707 if from_local_client {
708 actions.push(TransportAction::ForwardPlainBroadcast {
710 raw: packet.raw.clone(),
711 to_local: false,
712 exclude: Some(iface),
713 });
714 } else {
715 actions.push(TransportAction::ForwardPlainBroadcast {
717 raw: packet.raw.clone(),
718 to_local: true,
719 exclude: None,
720 });
721 }
722 }
723
724 if self.config.transport_enabled || self.config.identity_hash.is_some() {
726 if packet.transport_id.is_some()
727 && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
728 {
729 if let Some(ref identity_hash) = self.config.identity_hash {
730 if packet.transport_id.as_ref() == Some(identity_hash) {
731 if let Some(path_entry) = self
732 .path_table
733 .get(&packet.destination_hash)
734 .and_then(|ps| ps.primary())
735 {
736 let next_hop = path_entry.next_hop;
737 let remaining_hops = path_entry.hops;
738 let outbound_interface = path_entry.receiving_interface;
739
740 let new_raw = forward_transport_packet(
741 &packet,
742 next_hop,
743 remaining_hops,
744 outbound_interface,
745 );
746
747 if packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST {
749 let proof_timeout = now
750 + constants::LINK_ESTABLISHMENT_TIMEOUT_PER_HOP
751 * (remaining_hops.max(1) as f64);
752
753 let (link_id, link_entry) = create_link_entry(
754 &packet,
755 next_hop,
756 outbound_interface,
757 remaining_hops,
758 iface,
759 now,
760 proof_timeout,
761 );
762 self.link_table.insert(link_id, link_entry);
763 actions.push(TransportAction::LinkRequestReceived {
764 link_id,
765 destination_hash: packet.destination_hash,
766 receiving_interface: iface,
767 });
768 } else {
769 let (trunc_hash, reverse_entry) =
770 create_reverse_entry(&packet, outbound_interface, iface, now);
771 self.reverse_table.insert(trunc_hash, reverse_entry);
772 }
773
774 actions.push(TransportAction::SendOnInterface {
775 interface: outbound_interface,
776 raw: new_raw,
777 });
778
779 if let Some(entry) = self
781 .path_table
782 .get_mut(&packet.destination_hash)
783 .and_then(|ps| ps.primary_mut())
784 {
785 entry.timestamp = now;
786 }
787 }
788 }
789 }
790 }
791
792 if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
794 && packet.flags.packet_type != constants::PACKET_TYPE_LINKREQUEST
795 && packet.context != constants::CONTEXT_LRPROOF
796 {
797 if let Some(link_entry) = self.link_table.get(&packet.destination_hash).cloned() {
798 if let Some((outbound_iface, new_raw)) =
799 route_via_link_table(&packet, &link_entry, iface)
800 {
801 self.packet_hashlist.add(packet.packet_hash);
803
804 actions.push(TransportAction::SendOnInterface {
805 interface: outbound_iface,
806 raw: new_raw,
807 });
808
809 if let Some(entry) = self.link_table.get_mut(&packet.destination_hash) {
811 entry.timestamp = now;
812 }
813 }
814 }
815 }
816 }
817
818 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE {
820 if let Some(queue) = announce_queue {
821 self.try_enqueue_announce(
822 &packet,
823 &original_raw,
824 iface,
825 now,
826 rng,
827 queue,
828 &mut actions,
829 );
830 } else {
831 self.process_inbound_announce(
832 &packet,
833 &original_raw,
834 iface,
835 now,
836 rng,
837 &mut actions,
838 );
839 }
840 }
841
842 if packet.flags.packet_type == constants::PACKET_TYPE_PROOF {
844 self.process_inbound_proof(&packet, iface, now, &mut actions);
845 }
846
847 if (packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
849 || packet.flags.packet_type == constants::PACKET_TYPE_DATA)
850 && self
851 .local_destinations
852 .contains_key(&packet.destination_hash)
853 {
854 actions.push(TransportAction::DeliverLocal {
855 destination_hash: packet.destination_hash,
856 raw: packet.raw.clone(),
857 packet_hash: packet.packet_hash,
858 receiving_interface: iface,
859 });
860 }
861
862 actions
863 }
864
865 fn process_inbound_announce(
870 &mut self,
871 packet: &RawPacket,
872 original_raw: &[u8],
873 iface: InterfaceId,
874 now: f64,
875 rng: &mut dyn Rng,
876 actions: &mut Vec<TransportAction>,
877 ) {
878 if packet.flags.destination_type != constants::DESTINATION_SINGLE {
879 return;
880 }
881
882 let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
883
884 let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
886 Ok(a) => a,
887 Err(_) => return,
888 };
889
890 let sig_cache_key =
891 Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
892
893 let validated = if self.announce_sig_cache.contains(&sig_cache_key) {
894 announce.to_validated_unchecked()
895 } else {
896 match announce.validate(&packet.destination_hash) {
897 Ok(v) => {
898 self.announce_sig_cache.insert(sig_cache_key, now);
899 v
900 }
901 Err(_) => return,
902 }
903 };
904
905 let received_from = self.announce_received_from(packet, now);
906 let random_blob = match extract_random_blob(&packet.data) {
907 Some(b) => b,
908 None => return,
909 };
910 let announce_emitted = timebase_from_random_blob(&random_blob);
911
912 self.process_verified_announce(
913 packet,
914 original_raw,
915 iface,
916 now,
917 rng,
918 validated,
919 received_from,
920 random_blob,
921 announce_emitted,
922 actions,
923 );
924 }
925
926 fn announce_sig_cache_key(destination_hash: [u8; 16], signature: &[u8; 64]) -> [u8; 32] {
927 let mut material = [0u8; 80];
928 material[..16].copy_from_slice(&destination_hash);
929 material[16..].copy_from_slice(signature);
930 hash::full_hash(&material)
931 }
932
933 fn announce_received_from(&mut self, packet: &RawPacket, now: f64) -> [u8; 16] {
934 if let Some(transport_id) = packet.transport_id {
935 if self.config.transport_enabled {
936 if let Some(announce_entry) = self.announce_table.get_mut(&packet.destination_hash)
937 {
938 if packet.hops.checked_sub(1) == Some(announce_entry.hops) {
939 announce_entry.local_rebroadcasts += 1;
940 if announce_entry.retries > 0
941 && announce_entry.local_rebroadcasts
942 >= constants::LOCAL_REBROADCASTS_MAX
943 {
944 self.announce_table.remove(&packet.destination_hash);
945 }
946 }
947 if let Some(announce_entry) = self.announce_table.get(&packet.destination_hash)
948 {
949 if packet.hops.checked_sub(1) == Some(announce_entry.hops + 1)
950 && announce_entry.retries > 0
951 && now < announce_entry.retransmit_timeout
952 {
953 self.announce_table.remove(&packet.destination_hash);
954 }
955 }
956 }
957 }
958 transport_id
959 } else {
960 packet.destination_hash
961 }
962 }
963
964 fn should_hold_announce(
965 &mut self,
966 packet: &RawPacket,
967 original_raw: &[u8],
968 iface: InterfaceId,
969 now: f64,
970 ) -> bool {
971 if self.has_path(&packet.destination_hash) {
972 return false;
973 }
974 let Some(info) = self.interfaces.get(&iface) else {
975 return false;
976 };
977 if packet.context == constants::CONTEXT_PATH_RESPONSE
978 || !self.ingress_control.should_ingress_limit(
979 iface,
980 &info.ingress_control,
981 info.ia_freq,
982 info.started,
983 now,
984 )
985 {
986 return false;
987 }
988 self.ingress_control.hold_announce(
989 iface,
990 &info.ingress_control,
991 packet.destination_hash,
992 ingress_control::HeldAnnounce {
993 raw: original_raw.to_vec(),
994 hops: packet.hops,
995 receiving_interface: iface,
996 timestamp: now,
997 },
998 );
999 true
1000 }
1001
1002 fn try_enqueue_announce(
1003 &mut self,
1004 packet: &RawPacket,
1005 original_raw: &[u8],
1006 iface: InterfaceId,
1007 now: f64,
1008 rng: &mut dyn Rng,
1009 announce_queue: &mut AnnounceVerifyQueue,
1010 actions: &mut Vec<TransportAction>,
1011 ) {
1012 if packet.flags.destination_type != constants::DESTINATION_SINGLE {
1013 return;
1014 }
1015
1016 let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
1017 let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
1018 Ok(a) => a,
1019 Err(_) => return,
1020 };
1021
1022 let received_from = self.announce_received_from(packet, now);
1023
1024 if self
1025 .local_destinations
1026 .contains_key(&packet.destination_hash)
1027 {
1028 log::debug!(
1029 "Announce:skipping local destination {:02x}{:02x}{:02x}{:02x}..",
1030 packet.destination_hash[0],
1031 packet.destination_hash[1],
1032 packet.destination_hash[2],
1033 packet.destination_hash[3],
1034 );
1035 return;
1036 }
1037
1038 if self.should_hold_announce(packet, original_raw, iface, now) {
1039 return;
1040 }
1041
1042 let sig_cache_key =
1043 Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
1044 if self.announce_sig_cache.contains(&sig_cache_key) {
1045 let validated = announce.to_validated_unchecked();
1046 let random_blob = match extract_random_blob(&packet.data) {
1047 Some(b) => b,
1048 None => return,
1049 };
1050 let announce_emitted = timebase_from_random_blob(&random_blob);
1051 self.process_verified_announce(
1052 packet,
1053 original_raw,
1054 iface,
1055 now,
1056 rng,
1057 validated,
1058 received_from,
1059 random_blob,
1060 announce_emitted,
1061 actions,
1062 );
1063 return;
1064 }
1065
1066 if packet.context == constants::CONTEXT_PATH_RESPONSE {
1067 let Ok(validated) = announce.validate(&packet.destination_hash) else {
1068 return;
1069 };
1070 self.announce_sig_cache.insert(sig_cache_key, now);
1071 let random_blob = match extract_random_blob(&packet.data) {
1072 Some(b) => b,
1073 None => return,
1074 };
1075 let announce_emitted = timebase_from_random_blob(&random_blob);
1076 self.process_verified_announce(
1077 packet,
1078 original_raw,
1079 iface,
1080 now,
1081 rng,
1082 validated,
1083 received_from,
1084 random_blob,
1085 announce_emitted,
1086 actions,
1087 );
1088 return;
1089 }
1090
1091 let random_blob = match extract_random_blob(&packet.data) {
1092 Some(b) => b,
1093 None => return,
1094 };
1095 let announce_emitted = timebase_from_random_blob(&random_blob);
1096 let key = AnnounceVerifyKey {
1097 destination_hash: packet.destination_hash,
1098 random_blob,
1099 received_from,
1100 };
1101 let pending = PendingAnnounce {
1102 original_raw: original_raw.to_vec(),
1103 packet: packet.clone(),
1104 interface: iface,
1105 received_from,
1106 queued_at: now,
1107 best_hops: packet.hops,
1108 emission_ts: announce_emitted,
1109 random_blob,
1110 };
1111 let _ = announce_queue.enqueue(key, pending);
1112 }
1113
1114 pub fn complete_verified_announce(
1115 &mut self,
1116 pending: PendingAnnounce,
1117 validated: crate::announce::ValidatedAnnounce,
1118 sig_cache_key: [u8; 32],
1119 now: f64,
1120 rng: &mut dyn Rng,
1121 ) -> Vec<TransportAction> {
1122 self.announce_sig_cache.insert(sig_cache_key, now);
1123 let mut actions = Vec::new();
1124 self.process_verified_announce(
1125 &pending.packet,
1126 &pending.original_raw,
1127 pending.interface,
1128 now,
1129 rng,
1130 validated,
1131 pending.received_from,
1132 pending.random_blob,
1133 pending.emission_ts,
1134 &mut actions,
1135 );
1136 actions
1137 }
1138
1139 pub fn clear_failed_verified_announce(&mut self, _sig_cache_key: [u8; 32], _now: f64) {}
1140
1141 fn process_verified_announce(
1142 &mut self,
1143 packet: &RawPacket,
1144 original_raw: &[u8],
1145 iface: InterfaceId,
1146 now: f64,
1147 rng: &mut dyn Rng,
1148 validated: crate::announce::ValidatedAnnounce,
1149 received_from: [u8; 16],
1150 random_blob: [u8; 10],
1151 announce_emitted: u64,
1152 actions: &mut Vec<TransportAction>,
1153 ) {
1154 if self.is_blackholed(&validated.identity_hash, now) {
1155 return;
1156 }
1157 if packet.hops > constants::PATHFINDER_M {
1158 return;
1159 }
1160
1161 let existing_set = self.path_table.get(&packet.destination_hash);
1163 let is_unresponsive = self.path_is_unresponsive(&packet.destination_hash);
1164
1165 let mp_decision = decide_announce_multipath(
1166 existing_set,
1167 packet.hops,
1168 announce_emitted,
1169 &random_blob,
1170 &received_from,
1171 is_unresponsive,
1172 now,
1173 self.config.prefer_shorter_path,
1174 );
1175
1176 if mp_decision == MultiPathDecision::Reject {
1177 log::debug!(
1178 "Announce:path decision REJECT for dest={:02x}{:02x}{:02x}{:02x}..",
1179 packet.destination_hash[0],
1180 packet.destination_hash[1],
1181 packet.destination_hash[2],
1182 packet.destination_hash[3],
1183 );
1184 return;
1185 }
1186
1187 let rate_blocked = if packet.context != constants::CONTEXT_PATH_RESPONSE {
1189 if let Some(iface_info) = self.interfaces.get(&iface) {
1190 self.rate_limiter.check_and_update(
1191 &packet.destination_hash,
1192 now,
1193 iface_info.announce_rate_target,
1194 iface_info.announce_rate_grace,
1195 iface_info.announce_rate_penalty,
1196 )
1197 } else {
1198 false
1199 }
1200 } else {
1201 false
1202 };
1203
1204 let interface_mode = self
1206 .interfaces
1207 .get(&iface)
1208 .map(|i| i.mode)
1209 .unwrap_or(constants::MODE_FULL);
1210
1211 let expires = compute_path_expires(now, interface_mode);
1212
1213 let existing_blobs = self
1215 .path_table
1216 .get(&packet.destination_hash)
1217 .and_then(|ps| ps.find_by_next_hop(&received_from))
1218 .map(|e| e.random_blobs.clone())
1219 .unwrap_or_default();
1220
1221 let mut rng_bytes = [0u8; 8];
1223 rng.fill_bytes(&mut rng_bytes);
1224 let rng_value = (u64::from_le_bytes(rng_bytes) as f64) / (u64::MAX as f64);
1225
1226 let is_path_response = packet.context == constants::CONTEXT_PATH_RESPONSE;
1227
1228 let (path_entry, announce_entry) = announce_proc::process_validated_announce(
1229 packet.destination_hash,
1230 packet.hops,
1231 &packet.data,
1232 &packet.raw,
1233 packet.packet_hash,
1234 packet.flags.context_flag,
1235 received_from,
1236 iface,
1237 now,
1238 existing_blobs,
1239 random_blob,
1240 expires,
1241 rng_value,
1242 self.config.transport_enabled,
1243 is_path_response,
1244 rate_blocked,
1245 Some(original_raw.to_vec()),
1246 );
1247
1248 actions.push(TransportAction::CacheAnnounce {
1250 packet_hash: packet.packet_hash,
1251 raw: original_raw.to_vec(),
1252 });
1253
1254 self.upsert_path_destination(packet.destination_hash, path_entry, now);
1256
1257 if let Some(tunnel_id) = self.interfaces.get(&iface).and_then(|i| i.tunnel_id) {
1259 let blobs = self
1260 .path_table
1261 .get(&packet.destination_hash)
1262 .and_then(|ps| ps.find_by_next_hop(&received_from))
1263 .map(|e| e.random_blobs.clone())
1264 .unwrap_or_default();
1265 self.tunnel_table.store_tunnel_path(
1266 &tunnel_id,
1267 packet.destination_hash,
1268 tunnel::TunnelPath {
1269 timestamp: now,
1270 received_from,
1271 hops: packet.hops,
1272 expires,
1273 random_blobs: blobs,
1274 packet_hash: packet.packet_hash,
1275 },
1276 now,
1277 self.config.destination_timeout_secs,
1278 self.config.max_tunnel_destinations_total,
1279 );
1280 }
1281
1282 self.path_states.remove(&packet.destination_hash);
1284
1285 if let Some(ann) = announce_entry {
1287 self.insert_announce_entry(packet.destination_hash, ann, now);
1288 }
1289
1290 actions.push(TransportAction::AnnounceReceived {
1292 destination_hash: packet.destination_hash,
1293 identity_hash: validated.identity_hash,
1294 public_key: validated.public_key,
1295 name_hash: validated.name_hash,
1296 random_hash: validated.random_hash,
1297 app_data: validated.app_data,
1298 hops: packet.hops,
1299 receiving_interface: iface,
1300 });
1301
1302 actions.push(TransportAction::PathUpdated {
1303 destination_hash: packet.destination_hash,
1304 hops: packet.hops,
1305 next_hop: received_from,
1306 interface: iface,
1307 });
1308
1309 if self.has_local_clients() {
1311 actions.push(TransportAction::ForwardToLocalClients {
1312 raw: packet.raw.clone(),
1313 exclude: Some(iface),
1314 });
1315 }
1316
1317 if let Some(pr_entry) = self.discovery_path_requests_waiting(&packet.destination_hash) {
1319 let entry = AnnounceEntry {
1321 timestamp: now,
1322 retransmit_timeout: now,
1323 retries: constants::PATHFINDER_R,
1324 received_from,
1325 hops: packet.hops,
1326 packet_raw: packet.raw.clone(),
1327 packet_data: packet.data.clone(),
1328 destination_hash: packet.destination_hash,
1329 context_flag: packet.flags.context_flag,
1330 local_rebroadcasts: 0,
1331 block_rebroadcasts: true,
1332 attached_interface: Some(pr_entry),
1333 };
1334 self.insert_announce_entry(packet.destination_hash, entry, now);
1335 }
1336 }
1337
1338 pub fn announce_sig_cache_contains(&self, sig_cache_key: &[u8; 32]) -> bool {
1339 self.announce_sig_cache.contains(sig_cache_key)
1340 }
1341
1342 fn discovery_path_requests_waiting(&mut self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
1345 self.discovery_path_requests
1346 .remove(dest_hash)
1347 .map(|req| req.requesting_interface)
1348 }
1349
1350 fn process_inbound_proof(
1355 &mut self,
1356 packet: &RawPacket,
1357 iface: InterfaceId,
1358 _now: f64,
1359 actions: &mut Vec<TransportAction>,
1360 ) {
1361 if packet.context == constants::CONTEXT_LRPROOF {
1362 if (self.config.transport_enabled)
1364 && self.link_table.contains_key(&packet.destination_hash)
1365 {
1366 let link_entry = self.link_table.get(&packet.destination_hash).cloned();
1367 if let Some(entry) = link_entry {
1368 if packet.hops == entry.remaining_hops && iface == entry.next_hop_interface {
1369 let mut new_raw = Vec::new();
1372 new_raw.push(packet.raw[0]);
1373 new_raw.push(packet.hops);
1374 new_raw.extend_from_slice(&packet.raw[2..]);
1375
1376 if let Some(le) = self.link_table.get_mut(&packet.destination_hash) {
1378 le.validated = true;
1379 }
1380
1381 actions.push(TransportAction::LinkEstablished {
1382 link_id: packet.destination_hash,
1383 interface: entry.received_interface,
1384 });
1385
1386 actions.push(TransportAction::SendOnInterface {
1387 interface: entry.received_interface,
1388 raw: new_raw,
1389 });
1390 }
1391 }
1392 } else {
1393 actions.push(TransportAction::DeliverLocal {
1395 destination_hash: packet.destination_hash,
1396 raw: packet.raw.clone(),
1397 packet_hash: packet.packet_hash,
1398 receiving_interface: iface,
1399 });
1400 }
1401 } else {
1402 if self.config.transport_enabled {
1404 if let Some(reverse_entry) = self.reverse_table.remove(&packet.destination_hash) {
1405 if let Some(action) = route_proof_via_reverse(packet, &reverse_entry, iface) {
1406 actions.push(action);
1407 }
1408 }
1409 }
1410
1411 actions.push(TransportAction::DeliverLocal {
1413 destination_hash: packet.destination_hash,
1414 raw: packet.raw.clone(),
1415 packet_hash: packet.packet_hash,
1416 receiving_interface: iface,
1417 });
1418 }
1419 }
1420
1421 pub fn handle_outbound(
1427 &mut self,
1428 packet: &RawPacket,
1429 dest_type: u8,
1430 attached_interface: Option<InterfaceId>,
1431 now: f64,
1432 ) -> Vec<TransportAction> {
1433 let actions = route_outbound(
1434 &self.path_table,
1435 &self.interfaces,
1436 &self.local_destinations,
1437 packet,
1438 dest_type,
1439 attached_interface,
1440 now,
1441 );
1442
1443 self.packet_hashlist.add(packet.packet_hash);
1445
1446 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE && packet.hops > 0 {
1448 self.gate_announce_actions(actions, &packet.destination_hash, packet.hops, now)
1449 } else {
1450 actions
1451 }
1452 }
1453
1454 fn gate_announce_actions(
1456 &mut self,
1457 actions: Vec<TransportAction>,
1458 dest_hash: &[u8; 16],
1459 hops: u8,
1460 now: f64,
1461 ) -> Vec<TransportAction> {
1462 let mut result = Vec::new();
1463 for action in actions {
1464 match action {
1465 TransportAction::SendOnInterface { interface, raw } => {
1466 let (bitrate, announce_cap) =
1467 if let Some(info) = self.interfaces.get(&interface) {
1468 (info.bitrate, info.announce_cap)
1469 } else {
1470 (None, constants::ANNOUNCE_CAP)
1471 };
1472 if let Some(send_action) = self.announce_queues.gate_announce(
1473 interface,
1474 raw,
1475 *dest_hash,
1476 hops,
1477 now,
1478 now,
1479 bitrate,
1480 announce_cap,
1481 ) {
1482 result.push(send_action);
1483 }
1484 }
1486 other => result.push(other),
1487 }
1488 }
1489 result
1490 }
1491
1492 pub fn tick(&mut self, now: f64, rng: &mut dyn Rng) -> Vec<TransportAction> {
1498 let mut actions = Vec::new();
1499
1500 if now > self.announces_last_checked + constants::ANNOUNCES_CHECK_INTERVAL {
1502 self.cull_expired_announce_entries(now);
1503 self.enforce_announce_retention_cap(now);
1504 if let Some(ref identity_hash) = self.config.identity_hash {
1505 let ih = *identity_hash;
1506 let announce_actions = jobs::process_pending_announces(
1507 &mut self.announce_table,
1508 &mut self.held_announces,
1509 &ih,
1510 now,
1511 );
1512 let gated = self.gate_retransmit_actions(announce_actions, now);
1514 actions.extend(gated);
1515 }
1516 self.cull_expired_announce_entries(now);
1517 self.enforce_announce_retention_cap(now);
1518 self.announces_last_checked = now;
1519 }
1520
1521 let mut queue_actions = self.announce_queues.process_queues(now, &self.interfaces);
1523 actions.append(&mut queue_actions);
1524
1525 let ic_interfaces = self.ingress_control.interfaces_with_held();
1527 for iface_id in ic_interfaces {
1528 let (ia_freq, started, ingress_config) = match self.interfaces.get(&iface_id) {
1529 Some(info) => (info.ia_freq, info.started, info.ingress_control),
1530 None => continue,
1531 };
1532 if !ingress_config.enabled {
1533 continue;
1534 }
1535 if let Some(held) = self.ingress_control.process_held_announces(
1536 iface_id,
1537 &ingress_config,
1538 ia_freq,
1539 started,
1540 now,
1541 ) {
1542 let released_actions =
1543 self.handle_inbound(&held.raw, held.receiving_interface, now, rng);
1544 actions.extend(released_actions);
1545 }
1546 }
1547
1548 if now > self.tables_last_culled + constants::TABLES_CULL_INTERVAL {
1550 jobs::cull_path_table(&mut self.path_table, &self.interfaces, now);
1551 jobs::cull_reverse_table(&mut self.reverse_table, &self.interfaces, now);
1552 let (_culled, link_closed_actions) =
1553 jobs::cull_link_table(&mut self.link_table, &self.interfaces, now);
1554 actions.extend(link_closed_actions);
1555 jobs::cull_path_states(&mut self.path_states, &self.path_table);
1556 self.cull_blackholed(now);
1557 self.discovery_path_requests
1559 .retain(|_, req| now - req.timestamp < constants::DISCOVERY_PATH_REQUEST_TIMEOUT);
1560 self.tunnel_table
1562 .void_missing_interfaces(|id| self.interfaces.contains_key(id));
1563 self.tunnel_table.cull(now);
1564 self.announce_sig_cache.cull(now);
1565 self.tables_last_culled = now;
1566 }
1567
1568 actions
1569 }
1570
1571 fn gate_retransmit_actions(
1576 &mut self,
1577 actions: Vec<TransportAction>,
1578 now: f64,
1579 ) -> Vec<TransportAction> {
1580 let mut result = Vec::new();
1581 for action in actions {
1582 match action {
1583 TransportAction::SendOnInterface { interface, raw } => {
1584 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1586 let (bitrate, announce_cap) =
1587 if let Some(info) = self.interfaces.get(&interface) {
1588 (info.bitrate, info.announce_cap)
1589 } else {
1590 (None, constants::ANNOUNCE_CAP)
1591 };
1592 if let Some(send_action) = self.announce_queues.gate_announce(
1593 interface,
1594 raw,
1595 dest_hash,
1596 hops,
1597 now,
1598 now,
1599 bitrate,
1600 announce_cap,
1601 ) {
1602 result.push(send_action);
1603 }
1604 }
1605 TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1606 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1607 let iface_ids: Vec<(InterfaceId, Option<u64>, f64)> = self
1610 .interfaces
1611 .iter()
1612 .filter(|(_, info)| info.out_capable)
1613 .filter(|(id, _)| {
1614 if let Some(ref ex) = exclude {
1615 **id != *ex
1616 } else {
1617 true
1618 }
1619 })
1620 .filter(|(_, info)| {
1621 should_transmit_announce(
1622 info,
1623 &dest_hash,
1624 hops,
1625 &self.local_destinations,
1626 &self.path_table,
1627 &self.interfaces,
1628 )
1629 })
1630 .map(|(id, info)| (*id, info.bitrate, info.announce_cap))
1631 .collect();
1632
1633 for (iface_id, bitrate, announce_cap) in iface_ids {
1634 if let Some(send_action) = self.announce_queues.gate_announce(
1635 iface_id,
1636 raw.clone(),
1637 dest_hash,
1638 hops,
1639 now,
1640 now,
1641 bitrate,
1642 announce_cap,
1643 ) {
1644 result.push(send_action);
1645 }
1646 }
1647 }
1648 other => result.push(other),
1649 }
1650 }
1651 result
1652 }
1653
1654 fn extract_announce_info(raw: &[u8]) -> ([u8; 16], u8) {
1656 if raw.len() < 18 {
1657 return ([0; 16], 0);
1658 }
1659 let header_type = (raw[0] >> 6) & 0x03;
1660 let hops = raw[1];
1661 if header_type == constants::HEADER_2 && raw.len() >= 34 {
1662 let mut dest = [0u8; 16];
1664 dest.copy_from_slice(&raw[18..34]);
1665 (dest, hops)
1666 } else {
1667 let mut dest = [0u8; 16];
1669 dest.copy_from_slice(&raw[2..18]);
1670 (dest, hops)
1671 }
1672 }
1673
1674 pub fn handle_path_request(
1687 &mut self,
1688 data: &[u8],
1689 interface_id: InterfaceId,
1690 now: f64,
1691 ) -> Vec<TransportAction> {
1692 let mut actions = Vec::new();
1693
1694 if data.len() < 16 {
1695 return actions;
1696 }
1697
1698 let mut destination_hash = [0u8; 16];
1699 destination_hash.copy_from_slice(&data[..16]);
1700
1701 let _requesting_transport_id = if data.len() > 32 {
1703 let mut id = [0u8; 16];
1704 id.copy_from_slice(&data[16..32]);
1705 Some(id)
1706 } else {
1707 None
1708 };
1709
1710 let tag_bytes = if data.len() > 32 {
1712 Some(&data[32..])
1713 } else if data.len() > 16 {
1714 Some(&data[16..])
1715 } else {
1716 None
1717 };
1718
1719 if let Some(tag) = tag_bytes {
1720 let tag_len = tag.len().min(16);
1721 let mut unique_tag = [0u8; 32];
1722 unique_tag[..16].copy_from_slice(&destination_hash);
1723 unique_tag[16..16 + tag_len].copy_from_slice(&tag[..tag_len]);
1724
1725 if !self.insert_discovery_pr_tag(unique_tag) {
1726 return actions; }
1728 } else {
1729 return actions; }
1731
1732 if self.local_destinations.contains_key(&destination_hash) {
1734 return actions;
1735 }
1736
1737 if self.config.transport_enabled && self.has_path(&destination_hash) {
1739 let path = self
1740 .path_table
1741 .get(&destination_hash)
1742 .unwrap()
1743 .primary()
1744 .unwrap()
1745 .clone();
1746
1747 if let Some(recv_info) = self.interfaces.get(&interface_id) {
1751 if recv_info.mode == constants::MODE_ROAMING
1752 && path.receiving_interface == interface_id
1753 {
1754 return actions;
1755 }
1756 }
1757
1758 if let Some(ref raw) = path.announce_raw {
1762 if let Some(existing) = self.announce_table.remove(&destination_hash) {
1764 self.insert_held_announce(destination_hash, existing, now);
1765 }
1766 let retransmit_timeout =
1767 if let Some(iface_info) = self.interfaces.get(&interface_id) {
1768 let base = now + constants::PATH_REQUEST_GRACE;
1769 if iface_info.mode == constants::MODE_ROAMING {
1770 base + constants::PATH_REQUEST_RG
1771 } else {
1772 base
1773 }
1774 } else {
1775 now + constants::PATH_REQUEST_GRACE
1776 };
1777
1778 let (packet_data, context_flag) = match RawPacket::unpack(raw) {
1779 Ok(parsed) => (parsed.data, parsed.flags.context_flag),
1780 Err(_) => {
1781 return actions;
1782 }
1783 };
1784
1785 let entry = AnnounceEntry {
1786 timestamp: now,
1787 retransmit_timeout,
1788 retries: constants::PATHFINDER_R,
1789 received_from: path.next_hop,
1790 hops: path.hops,
1791 packet_raw: raw.clone(),
1792 packet_data,
1793 destination_hash,
1794 context_flag,
1795 local_rebroadcasts: 0,
1796 block_rebroadcasts: true,
1797 attached_interface: Some(interface_id),
1798 };
1799
1800 self.insert_announce_entry(destination_hash, entry, now);
1801 }
1802 } else if self.config.transport_enabled {
1803 let should_discover = self
1805 .interfaces
1806 .get(&interface_id)
1807 .map(|info| constants::DISCOVER_PATHS_FOR.contains(&info.mode))
1808 .unwrap_or(false);
1809
1810 if should_discover {
1811 self.discovery_path_requests.insert(
1813 destination_hash,
1814 DiscoveryPathRequest {
1815 timestamp: now,
1816 requesting_interface: interface_id,
1817 },
1818 );
1819
1820 for (_, iface_info) in self.interfaces.iter() {
1822 if iface_info.id != interface_id && iface_info.out_capable {
1823 actions.push(TransportAction::SendOnInterface {
1824 interface: iface_info.id,
1825 raw: data.to_vec(),
1826 });
1827 }
1828 }
1829 }
1830 }
1831
1832 actions
1833 }
1834
1835 pub fn path_table_entries(&self) -> impl Iterator<Item = (&[u8; 16], &PathEntry)> {
1841 self.path_table
1842 .iter()
1843 .filter_map(|(k, ps)| ps.primary().map(|e| (k, e)))
1844 }
1845
1846 pub fn path_table_sets(&self) -> impl Iterator<Item = (&[u8; 16], &PathSet)> {
1848 self.path_table.iter()
1849 }
1850
1851 pub fn interface_count(&self) -> usize {
1853 self.interfaces.len()
1854 }
1855
1856 pub fn link_table_count(&self) -> usize {
1858 self.link_table.len()
1859 }
1860
1861 pub fn path_table_count(&self) -> usize {
1863 self.path_table.len()
1864 }
1865
1866 pub fn announce_table_count(&self) -> usize {
1868 self.announce_table.len()
1869 }
1870
1871 pub fn reverse_table_count(&self) -> usize {
1873 self.reverse_table.len()
1874 }
1875
1876 pub fn held_announces_count(&self) -> usize {
1878 self.held_announces.len()
1879 }
1880
1881 pub fn packet_hashlist_len(&self) -> usize {
1883 self.packet_hashlist.len()
1884 }
1885
1886 pub fn announce_sig_cache_len(&self) -> usize {
1888 self.announce_sig_cache.len()
1889 }
1890
1891 pub fn rate_limiter_count(&self) -> usize {
1893 self.rate_limiter.len()
1894 }
1895
1896 pub fn blackholed_count(&self) -> usize {
1898 self.blackholed_identities.len()
1899 }
1900
1901 pub fn tunnel_count(&self) -> usize {
1903 self.tunnel_table.len()
1904 }
1905
1906 pub fn discovery_pr_tags_count(&self) -> usize {
1908 self.discovery_pr_tags.len()
1909 }
1910
1911 pub fn discovery_path_requests_count(&self) -> usize {
1913 self.discovery_path_requests.len()
1914 }
1915
1916 pub fn announce_queue_count(&self) -> usize {
1918 self.announce_queues.queue_count()
1919 }
1920
1921 pub fn nonempty_announce_queue_count(&self) -> usize {
1923 self.announce_queues.nonempty_queue_count()
1924 }
1925
1926 pub fn queued_announce_count(&self) -> usize {
1928 self.announce_queues.total_queued_announces()
1929 }
1930
1931 pub fn queued_announce_bytes(&self) -> usize {
1933 self.announce_queues.total_queued_bytes()
1934 }
1935
1936 pub fn announce_queue_interface_cap_drop_count(&self) -> u64 {
1938 self.announce_queues.interface_cap_drop_count()
1939 }
1940
1941 pub fn local_destinations_count(&self) -> usize {
1943 self.local_destinations.len()
1944 }
1945
1946 pub fn rate_limiter(&self) -> &AnnounceRateLimiter {
1948 &self.rate_limiter
1949 }
1950
1951 pub fn interface_info(&self, id: &InterfaceId) -> Option<&InterfaceInfo> {
1953 self.interfaces.get(id)
1954 }
1955
1956 pub fn redirect_path(&mut self, dest_hash: &[u8; 16], interface: InterfaceId, now: f64) {
1959 if let Some(entry) = self
1960 .path_table
1961 .get_mut(dest_hash)
1962 .and_then(|ps| ps.primary_mut())
1963 {
1964 entry.receiving_interface = interface;
1965 entry.hops = 1;
1966 } else {
1967 self.upsert_path_destination(
1968 *dest_hash,
1969 PathEntry {
1970 timestamp: now,
1971 next_hop: [0u8; 16],
1972 hops: 1,
1973 expires: now + 3600.0,
1974 random_blobs: Vec::new(),
1975 receiving_interface: interface,
1976 packet_hash: [0u8; 32],
1977 announce_raw: None,
1978 },
1979 now,
1980 );
1981 }
1982 }
1983
1984 pub fn inject_path(&mut self, dest_hash: [u8; 16], entry: PathEntry) {
1986 self.upsert_path_destination(dest_hash, entry.clone(), entry.timestamp);
1987 }
1988
1989 pub fn drop_path(&mut self, dest_hash: &[u8; 16]) -> bool {
1991 self.path_table.remove(dest_hash).is_some()
1992 }
1993
1994 pub fn drop_all_via(&mut self, transport_hash: &[u8; 16]) -> usize {
1999 let mut removed = 0usize;
2000 for ps in self.path_table.values_mut() {
2001 let before = ps.len();
2002 ps.retain(|entry| &entry.next_hop != transport_hash);
2003 removed += before - ps.len();
2004 }
2005 self.path_table.retain(|_, ps| !ps.is_empty());
2006 removed
2007 }
2008
2009 pub fn drop_paths_for_interface(&mut self, interface: InterfaceId) -> usize {
2011 let mut removed = 0usize;
2012 let mut cleared_destinations = Vec::new();
2013 for (dest_hash, ps) in self.path_table.iter_mut() {
2014 let before = ps.len();
2015 ps.retain(|entry| entry.receiving_interface != interface);
2016 if ps.is_empty() {
2017 cleared_destinations.push(*dest_hash);
2018 }
2019 removed += before - ps.len();
2020 }
2021 self.path_table.retain(|_, ps| !ps.is_empty());
2022 for dest_hash in cleared_destinations {
2023 self.path_states.remove(&dest_hash);
2024 }
2025 removed
2026 }
2027
2028 pub fn drop_reverse_for_interface(&mut self, interface: InterfaceId) -> usize {
2030 let before = self.reverse_table.len();
2031 self.reverse_table.retain(|_, entry| {
2032 entry.receiving_interface != interface && entry.outbound_interface != interface
2033 });
2034 before - self.reverse_table.len()
2035 }
2036
2037 pub fn drop_links_for_interface(&mut self, interface: InterfaceId) -> usize {
2039 let before = self.link_table.len();
2040 self.link_table.retain(|_, entry| {
2041 entry.next_hop_interface != interface && entry.received_interface != interface
2042 });
2043 before - self.link_table.len()
2044 }
2045
2046 pub fn drop_announce_queues(&mut self) {
2048 self.announce_table.clear();
2049 self.held_announces.clear();
2050 self.announce_queues = AnnounceQueues::new(self.config.announce_queue_max_interfaces);
2051 self.ingress_control.clear();
2052 }
2053
2054 pub fn identity_hash(&self) -> Option<&[u8; 16]> {
2056 self.config.identity_hash.as_ref()
2057 }
2058
2059 pub fn transport_enabled(&self) -> bool {
2061 self.config.transport_enabled
2062 }
2063
2064 pub fn config(&self) -> &TransportConfig {
2066 &self.config
2067 }
2068
2069 pub fn set_packet_hashlist_max_entries(&mut self, max_entries: usize) {
2070 self.config.packet_hashlist_max_entries = max_entries;
2071 self.packet_hashlist = PacketHashlist::new(max_entries);
2072 }
2073
2074 pub fn get_path_table(&self, max_hops: Option<u8>) -> Vec<PathTableRow> {
2078 let mut result = Vec::new();
2079 for (dest_hash, ps) in self.path_table.iter() {
2080 if let Some(entry) = ps.primary() {
2081 if let Some(max) = max_hops {
2082 if entry.hops > max {
2083 continue;
2084 }
2085 }
2086 let iface_name = self
2087 .interfaces
2088 .get(&entry.receiving_interface)
2089 .map(|i| i.name.clone())
2090 .unwrap_or_else(|| {
2091 alloc::format!("Interface({})", entry.receiving_interface.0)
2092 });
2093 result.push((
2094 *dest_hash,
2095 entry.timestamp,
2096 entry.next_hop,
2097 entry.hops,
2098 entry.expires,
2099 iface_name,
2100 ));
2101 }
2102 }
2103 result
2104 }
2105
2106 pub fn get_rate_table(&self) -> Vec<RateTableRow> {
2109 self.rate_limiter
2110 .entries()
2111 .map(|(hash, entry)| {
2112 (
2113 *hash,
2114 entry.last,
2115 entry.rate_violations,
2116 entry.blocked_until,
2117 entry.timestamps.clone(),
2118 )
2119 })
2120 .collect()
2121 }
2122
2123 pub fn get_blackholed(&self) -> Vec<([u8; 16], f64, f64, Option<alloc::string::String>)> {
2126 self.blackholed_entries()
2127 .map(|(hash, entry)| (*hash, entry.created, entry.expires, entry.reason.clone()))
2128 .collect()
2129 }
2130
2131 pub fn active_destination_hashes(&self) -> alloc::collections::BTreeSet<[u8; 16]> {
2137 self.path_table.keys().copied().collect()
2138 }
2139
2140 pub fn path_destination_cap_evict_count(&self) -> usize {
2141 self.path_destination_cap_evict_count
2142 }
2143
2144 pub fn active_packet_hashes(&self) -> Vec<[u8; 32]> {
2146 self.path_table
2147 .values()
2148 .flat_map(|ps| ps.iter().map(|p| p.packet_hash))
2149 .collect()
2150 }
2151
2152 pub fn cull_rate_limiter(
2155 &mut self,
2156 active: &alloc::collections::BTreeSet<[u8; 16]>,
2157 now: f64,
2158 ttl_secs: f64,
2159 ) -> usize {
2160 self.rate_limiter.cull_stale(active, now, ttl_secs)
2161 }
2162
2163 pub fn update_interface_freq(&mut self, id: InterfaceId, ia_freq: f64) {
2169 if let Some(info) = self.interfaces.get_mut(&id) {
2170 info.ia_freq = ia_freq;
2171 }
2172 }
2173
2174 pub fn held_announce_count(&self, interface: &InterfaceId) -> usize {
2176 self.ingress_control.held_count(interface)
2177 }
2178
/// Test-only view of the raw path table.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn path_table(&self) -> &BTreeMap<[u8; 16], PathSet> {
    let table = &self.path_table;
    table
}
2188
/// Test-only view of the raw announce table.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn announce_table(&self) -> &BTreeMap<[u8; 16], AnnounceEntry> {
    let table = &self.announce_table;
    table
}
2194
/// Test-only view of the raw held-announces table.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn held_announces(&self) -> &BTreeMap<[u8; 16], AnnounceEntry> {
    let table = &self.held_announces;
    table
}
2200
/// Test-only wrapper that exposes the value of `announce_retained_bytes_total`.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn announce_retained_bytes(&self) -> usize {
    let total = self.announce_retained_bytes_total();
    total
}
2206
/// Test-only view of the raw reverse table.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn reverse_table(&self) -> &BTreeMap<[u8; 16], tables::ReverseEntry> {
    let table = &self.reverse_table;
    table
}
2212
/// Test-only view of the raw link table.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn link_table_ref(&self) -> &BTreeMap<[u8; 16], LinkEntry> {
    let table = &self.link_table;
    table
}
2218}
2219
2220#[cfg(test)]
2221mod tests {
2222 use super::*;
2223 use crate::packet::PacketFlags;
2224
2225 fn make_config(transport_enabled: bool) -> TransportConfig {
2226 TransportConfig {
2227 transport_enabled,
2228 identity_hash: if transport_enabled {
2229 Some([0x42; 16])
2230 } else {
2231 None
2232 },
2233 prefer_shorter_path: false,
2234 max_paths_per_destination: 1,
2235 packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
2236 max_discovery_pr_tags: constants::MAX_PR_TAGS,
2237 max_path_destinations: usize::MAX,
2238 max_tunnel_destinations_total: usize::MAX,
2239 destination_timeout_secs: constants::DESTINATION_TIMEOUT,
2240 announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
2241 announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
2242 announce_sig_cache_enabled: true,
2243 announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
2244 announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
2245 announce_queue_max_entries: 256,
2246 announce_queue_max_interfaces: 1024,
2247 }
2248 }
2249
2250 fn make_interface(id: u64, mode: u8) -> InterfaceInfo {
2251 InterfaceInfo {
2252 id: InterfaceId(id),
2253 name: String::from("test"),
2254 mode,
2255 out_capable: true,
2256 in_capable: true,
2257 bitrate: None,
2258 announce_rate_target: None,
2259 announce_rate_grace: 0,
2260 announce_rate_penalty: 0.0,
2261 announce_cap: constants::ANNOUNCE_CAP,
2262 is_local_client: false,
2263 wants_tunnel: false,
2264 tunnel_id: None,
2265 mtu: constants::MTU as u32,
2266 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
2267 ia_freq: 0.0,
2268 started: 0.0,
2269 }
2270 }
2271
2272 fn make_announce_entry(dest_hash: [u8; 16], timestamp: f64, fill_len: usize) -> AnnounceEntry {
2273 AnnounceEntry {
2274 timestamp,
2275 retransmit_timeout: timestamp,
2276 retries: 0,
2277 received_from: [0xAA; 16],
2278 hops: 2,
2279 packet_raw: vec![0x01; fill_len],
2280 packet_data: vec![0x02; fill_len],
2281 destination_hash: dest_hash,
2282 context_flag: 0,
2283 local_rebroadcasts: 0,
2284 block_rebroadcasts: false,
2285 attached_interface: None,
2286 }
2287 }
2288
2289 fn make_path_entry(
2290 timestamp: f64,
2291 hops: u8,
2292 receiving_interface: InterfaceId,
2293 next_hop: [u8; 16],
2294 ) -> PathEntry {
2295 PathEntry {
2296 timestamp,
2297 next_hop,
2298 hops,
2299 expires: timestamp + 10_000.0,
2300 random_blobs: Vec::new(),
2301 receiving_interface,
2302 packet_hash: [0; 32],
2303 announce_raw: None,
2304 }
2305 }
2306
/// Builds the 32-byte unique tag used to deduplicate path requests.
///
/// The first 16 bytes are `dest_hash`; the next bytes are the first (at most 16) bytes of
/// `tag`. Any remaining bytes stay zero.
fn make_unique_tag(dest_hash: [u8; 16], tag: &[u8]) -> [u8; 32] {
    let copied = core::cmp::min(tag.len(), 16);

    let mut combined = [0u8; 32];
    let (dest_part, tag_part) = combined.split_at_mut(16);
    dest_part.copy_from_slice(&dest_hash);
    tag_part[..copied].copy_from_slice(&tag[..copied]);

    combined
}
2314
/// A freshly constructed engine knows no path, hop count or next hop for any destination.
#[test]
fn test_empty_engine() {
    let unknown_dest = [0; 16];
    let engine = TransportEngine::new(make_config(false));

    assert!(!engine.has_path(&unknown_dest));
    assert!(engine.hops_to(&unknown_dest).is_none());
    assert!(engine.next_hop(&unknown_dest).is_none());
}
2322
/// Registering an interface adds it to the interface map; deregistering removes it again.
#[test]
fn test_register_deregister_interface() {
    let id = InterfaceId(1);
    let mut engine = TransportEngine::new(make_config(false));

    engine.register_interface(make_interface(1, constants::MODE_FULL));
    assert!(engine.interfaces.contains_key(&id));

    engine.deregister_interface(id);
    assert!(!engine.interfaces.contains_key(&id));
}
2332
/// Deregistering an interface removes the announce queue that was created for it.
#[test]
fn test_deregister_interface_removes_announce_queue_state() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    // Two announces gated on the same interface; the test expects a single queue afterwards.
    let announces = [(0x01u8, [0xAA; 16], 2u8), (0x02u8, [0xBB; 16], 3u8)];
    for (fill, dest_hash, hops) in announces {
        let _ = engine.announce_queues.gate_announce(
            InterfaceId(1),
            vec![fill; 100],
            dest_hash,
            hops,
            0.0,
            0.0,
            Some(1000),
            constants::ANNOUNCE_CAP,
        );
    }
    assert_eq!(engine.announce_queue_count(), 1);

    engine.deregister_interface(InterfaceId(1));
    assert_eq!(engine.announce_queue_count(), 0);
}
2363
/// Deregistering one interface leaves the announce queue of another interface in place.
#[test]
fn test_deregister_interface_preserves_other_announce_queues() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    // Two announces per interface, in the order: interface 1, 1, 2, 2.
    let announces = [
        (1u64, 0x01u8, [0xAA; 16], 2u8),
        (1u64, 0x02u8, [0xAB; 16], 3u8),
        (2u64, 0x03u8, [0xBA; 16], 2u8),
        (2u64, 0x04u8, [0xBB; 16], 3u8),
    ];
    for (iface, fill, dest_hash, hops) in announces {
        let _ = engine.announce_queues.gate_announce(
            InterfaceId(iface),
            vec![fill; 100],
            dest_hash,
            hops,
            0.0,
            0.0,
            Some(1000),
            constants::ANNOUNCE_CAP,
        );
    }

    engine.deregister_interface(InterfaceId(1));

    // Only interface 2's queue remains, and it still holds something.
    assert_eq!(engine.announce_queue_count(), 1);
    assert_eq!(engine.nonempty_announce_queue_count(), 1);
}
2415
/// Registering a destination adds it to the local destinations; deregistering removes it.
#[test]
fn test_register_deregister_destination() {
    let dest = [0x11; 16];
    let mut engine = TransportEngine::new(make_config(false));

    engine.register_destination(dest, constants::DESTINATION_SINGLE);
    let registered = engine.local_destinations.contains_key(&dest);
    assert!(registered);

    engine.deregister_destination(&dest);
    let still_registered = engine.local_destinations.contains_key(&dest);
    assert!(!still_registered);
}
2426
/// A path starts out not unresponsive, and can be marked unresponsive and responsive again.
#[test]
fn test_path_state() {
    let dest = [0x22; 16];
    let mut engine = TransportEngine::new(make_config(false));

    // Initial state.
    assert!(!engine.path_is_unresponsive(&dest));

    // Marked without naming a receiving interface.
    engine.mark_path_unresponsive(&dest, None);
    assert!(engine.path_is_unresponsive(&dest));

    // And back.
    engine.mark_path_responsive(&dest);
    assert!(!engine.path_is_unresponsive(&dest));
}
2440
/// Marking a path unresponsive via a `MODE_BOUNDARY` interface does not take effect.
#[test]
fn test_boundary_exempts_unresponsive() {
    let dest = [0xB1; 16];
    let boundary_iface = InterfaceId(1);
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_BOUNDARY));

    engine.mark_path_unresponsive(&dest, Some(boundary_iface));

    assert!(!engine.path_is_unresponsive(&dest));
}
2451
/// Marking a path unresponsive via a `MODE_FULL` interface takes effect.
#[test]
fn test_non_boundary_marks_unresponsive() {
    let dest = [0xB2; 16];
    let full_iface = InterfaceId(1);
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    engine.mark_path_unresponsive(&dest, Some(full_iface));

    assert!(engine.path_is_unresponsive(&dest));
}
2462
/// `expire_path` keeps the path in the table but sets the primary entry's expiry to zero.
#[test]
fn test_expire_path() {
    let dest = [0x33; 16];
    let mut engine = TransportEngine::new(make_config(false));

    let entry = PathEntry {
        timestamp: 1000.0,
        next_hop: [0; 16],
        hops: 2,
        expires: 9999.0,
        random_blobs: Vec::new(),
        receiving_interface: InterfaceId(1),
        packet_hash: [0; 32],
        announce_raw: None,
    };
    engine.path_table.insert(dest, PathSet::from_single(entry, 1));
    assert!(engine.has_path(&dest));

    engine.expire_path(&dest);

    // Still reported as present; only the expiry timestamp changed.
    assert!(engine.has_path(&dest));
    let primary = engine.path_table[&dest].primary().unwrap();
    assert_eq!(primary.expires, 0.0);
}
2491
/// Exercises the link table lifecycle: register, validate, remove.
#[test]
fn test_link_table_operations() {
    let link_id = [0x44; 16];
    let mut engine = TransportEngine::new(make_config(false));

    let entry = LinkEntry {
        timestamp: 100.0,
        next_hop_transport_id: [0; 16],
        next_hop_interface: InterfaceId(1),
        remaining_hops: 3,
        received_interface: InterfaceId(2),
        taken_hops: 2,
        destination_hash: [0xAA; 16],
        validated: false,
        proof_timeout: 200.0,
    };
    engine.register_link(link_id, entry);

    // Registered, but not yet validated.
    assert!(engine.link_table.contains_key(&link_id));
    assert!(!engine.link_table[&link_id].validated);

    engine.validate_link(&link_id);
    assert!(engine.link_table[&link_id].validated);

    engine.remove_link(&link_id);
    assert!(!engine.link_table.contains_key(&link_id));
}
2521
/// An announce packet addressed to a PLAIN destination is rejected by the packet filter.
#[test]
fn test_packet_filter_drops_plain_announce() {
    let engine = TransportEngine::new(make_config(false));

    let plain_announce = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_PLAIN,
        packet_type: constants::PACKET_TYPE_ANNOUNCE,
    };
    let packet = RawPacket::pack(
        plain_announce,
        0,
        &[0; 16],
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();

    let accepted = engine.packet_filter(&packet);
    assert!(!accepted);
}
2536
/// A data packet carrying the KEEPALIVE context is accepted by the packet filter.
#[test]
fn test_packet_filter_allows_keepalive() {
    let engine = TransportEngine::new(make_config(false));

    let single_data = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let keepalive = RawPacket::pack(
        single_data,
        0,
        &[0; 16],
        None,
        constants::CONTEXT_KEEPALIVE,
        b"test",
    )
    .unwrap();

    let accepted = engine.packet_filter(&keepalive);
    assert!(accepted);
}
2558
/// A PLAIN data packet whose hop count has been raised to 2 is rejected by the packet filter.
#[test]
fn test_packet_filter_drops_high_hop_plain() {
    let engine = TransportEngine::new(make_config(false));

    let plain_data = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_PLAIN,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let mut packet = RawPacket::pack(
        plain_data,
        0,
        &[0; 16],
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();
    // Packed with zero hops, then overridden on the parsed packet.
    packet.hops = 2;

    let accepted = engine.packet_filter(&packet);
    assert!(!accepted);
}
2574
/// Checks that a `DESTINATION_SINGLE` announce still passes `packet_filter`
/// after its packet hash has been added to the engine's packet hashlist.
#[test]
fn test_packet_filter_allows_duplicate_single_announce() {
    let announce = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &[0; 16],
        None,
        constants::CONTEXT_NONE,
        &[0xAA; 64],
    )
    .unwrap();

    let mut transport = TransportEngine::new(make_config(false));
    // Record the hash first so the packet is a known duplicate.
    transport.packet_hashlist.add(announce.packet_hash);

    let accepted = transport.packet_filter(&announce);
    assert!(accepted);
}
2601
/// With a packet hashlist built via `PacketHashlist::new(2)`, adding a third
/// hash is expected to make the first packet pass `packet_filter` again while
/// the second and third remain rejected.
#[test]
fn test_packet_filter_fifo_eviction_allows_oldest_hash_again() {
    // Builds a SINGLE data packet whose destination and payload derive from `seed`.
    let build = |seed: u8| {
        RawPacket::pack(
            PacketFlags {
                header_type: constants::HEADER_1,
                context_flag: constants::FLAG_UNSET,
                transport_type: constants::TRANSPORT_BROADCAST,
                destination_type: constants::DESTINATION_SINGLE,
                packet_type: constants::PACKET_TYPE_DATA,
            },
            0,
            &[seed; 16],
            None,
            constants::CONTEXT_NONE,
            &[seed; 4],
        )
        .unwrap()
    };
    let (first, second, third) = (build(1), build(2), build(3));

    let mut transport = TransportEngine::new(make_config(false));
    transport.packet_hashlist = PacketHashlist::new(2);

    // Both recorded hashes are present, so the first packet is rejected.
    transport.packet_hashlist.add(first.packet_hash);
    transport.packet_hashlist.add(second.packet_hash);
    let first_before = transport.packet_filter(&first);
    assert!(!first_before);

    // A third hash goes in; the first packet is expected to pass afterwards.
    transport.packet_hashlist.add(third.packet_hash);

    let first_after = transport.packet_filter(&first);
    assert!(first_after);
    let second_after = transport.packet_filter(&second);
    assert!(!second_after);
    let third_after = transport.packet_filter(&third);
    assert!(!third_after);
}
2640
/// With a packet hashlist built via `PacketHashlist::new(2)`, adding the
/// second hash twice and then a third hash is expected to leave the first
/// packet passing `packet_filter` and the other two rejected.
#[test]
fn test_packet_filter_duplicate_does_not_refresh_recency() {
    // Builds a SINGLE data packet whose destination and payload derive from `seed`.
    let build = |seed: u8| {
        RawPacket::pack(
            PacketFlags {
                header_type: constants::HEADER_1,
                context_flag: constants::FLAG_UNSET,
                transport_type: constants::TRANSPORT_BROADCAST,
                destination_type: constants::DESTINATION_SINGLE,
                packet_type: constants::PACKET_TYPE_DATA,
            },
            0,
            &[seed; 16],
            None,
            constants::CONTEXT_NONE,
            &[seed; 4],
        )
        .unwrap()
    };
    let (first, second, third) = (build(1), build(2), build(3));

    let mut transport = TransportEngine::new(make_config(false));
    transport.packet_hashlist = PacketHashlist::new(2);

    transport.packet_hashlist.add(first.packet_hash);
    transport.packet_hashlist.add(second.packet_hash);
    // Re-adding an already present hash.
    transport.packet_hashlist.add(second.packet_hash);
    transport.packet_hashlist.add(third.packet_hash);

    let first_passes = transport.packet_filter(&first);
    assert!(first_passes);
    let second_passes = transport.packet_filter(&second);
    assert!(!second_passes);
    let third_passes = transport.packet_filter(&third);
    assert!(!third_passes);
}
2678
/// Inserts an announce entry (timestamp 190.0, retransmit timeout 100.0,
/// zero retries), ticks the engine at 200.0 and expects the first returned
/// action to be `SendOnInterface` and the entry's `retries` to become 1.
#[test]
fn test_tick_retransmits_announce() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_interface(1, constants::MODE_FULL));

    let dest = [0x55; 16];
    let pending_announce = AnnounceEntry {
        timestamp: 190.0,
        retransmit_timeout: 100.0,
        retries: 0,
        received_from: [0xAA; 16],
        hops: 2,
        packet_raw: vec![0x01, 0x02],
        packet_data: vec![0xCC; 10],
        destination_hash: dest,
        context_flag: 0,
        local_rebroadcasts: 0,
        block_rebroadcasts: false,
        attached_interface: None,
    };
    transport.insert_announce_entry(dest, pending_announce, 190.0);

    let mut rng = rns_crypto::FixedRng::new(&[0x42; 32]);
    let emitted = transport.tick(200.0, &mut rng);

    assert!(!emitted.is_empty());
    let first_is_send = matches!(&emitted[0], TransportAction::SendOnInterface { .. });
    assert!(first_is_send);

    let entry_after = &transport.announce_table[&dest];
    assert_eq!(entry_after.retries, 1);
}
2718
/// With `announce_table_ttl_secs` set to 10.0, entries inserted at time 100.0
/// into both the announce table and the held announces are expected to be
/// gone after a tick at 111.0.
#[test]
fn test_tick_culls_expired_announce_entries() {
    let mut cfg = make_config(true);
    cfg.announce_table_ttl_secs = 10.0;
    let mut transport = TransportEngine::new(cfg);

    let active_dest = [0x61; 16];
    let held_dest = [0x62; 16];

    let active_inserted = transport.insert_announce_entry(
        active_dest,
        make_announce_entry(active_dest, 100.0, 8),
        100.0,
    );
    assert!(active_inserted);
    let held_inserted = transport.insert_held_announce(
        held_dest,
        make_announce_entry(held_dest, 100.0, 8),
        100.0,
    );
    assert!(held_inserted);

    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
    let _ = transport.tick(111.0, &mut rng);

    let active_present = transport.announce_table().contains_key(&active_dest);
    assert!(!active_present);
    let held_present = transport.held_announces().contains_key(&held_dest);
    assert!(!held_present);
}
2736
/// Sets `announce_table_max_bytes` to two and a half times the size of a
/// sample entry, inserts one held and two active announces, and expects the
/// held entry to be dropped while both active entries stay and the retained
/// byte count remains within the cap.
#[test]
fn test_announce_retention_cap_evicts_oldest_and_prefers_held_on_tie() {
    let sample_entry = make_announce_entry([0x70; 16], 100.0, 32);
    let entry_bytes = TransportEngine::announce_entry_size_bytes(&sample_entry);

    let mut cfg = make_config(true);
    cfg.announce_table_max_bytes = entry_bytes * 2 + entry_bytes / 2;
    let max_bytes = cfg.announce_table_max_bytes;
    let mut transport = TransportEngine::new(cfg);

    let held_dest = [0x71; 16];
    let active_dest = [0x72; 16];
    let newest_dest = [0x73; 16];

    // Held and active entries share timestamp 100.0; the newest uses 101.0.
    let held_ok = transport.insert_held_announce(
        held_dest,
        make_announce_entry(held_dest, 100.0, 32),
        100.0,
    );
    assert!(held_ok);
    let active_ok = transport.insert_announce_entry(
        active_dest,
        make_announce_entry(active_dest, 100.0, 32),
        100.0,
    );
    assert!(active_ok);
    let newest_ok = transport.insert_announce_entry(
        newest_dest,
        make_announce_entry(newest_dest, 101.0, 32),
        101.0,
    );
    assert!(newest_ok);

    let held_present = transport.held_announces().contains_key(&held_dest);
    assert!(!held_present);
    let active_present = transport.announce_table().contains_key(&active_dest);
    assert!(active_present);
    let newest_present = transport.announce_table().contains_key(&newest_dest);
    assert!(newest_present);
    assert!(transport.announce_retained_bytes() <= max_bytes);
}
2772
/// With `announce_table_max_bytes` set to 200, inserting an entry built by
/// `make_announce_entry(dest, 100.0, 256)` is expected to be refused, leaving
/// the announce table without the key and zero retained bytes.
#[test]
fn test_oversized_announce_entry_is_not_retained() {
    let mut cfg = make_config(true);
    cfg.announce_table_max_bytes = 200;
    let mut transport = TransportEngine::new(cfg);

    let dest = [0x81; 16];
    let oversized = make_announce_entry(dest, 100.0, 256);

    let inserted = transport.insert_announce_entry(dest, oversized, 100.0);
    assert!(!inserted);

    let present = transport.announce_table().contains_key(&dest);
    assert!(!present);
    assert_eq!(transport.announce_retained_bytes(), 0);
}
2784
/// Exercises the blackhole lifecycle for an identity added with no duration:
/// not blackholed initially, blackholed after `blackhole_identity` (also far
/// in the future), and cleared again by `unblackhole_identity`, which returns
/// `false` on a second call.
#[test]
fn test_blackhole_identity() {
    let mut transport = TransportEngine::new(make_config(false));
    let identity_hash = [0xAA; 16];
    let now = 1000.0;

    assert!(!transport.is_blackholed(&identity_hash, now));

    transport.blackhole_identity(identity_hash, now, None, Some(String::from("test")));
    assert!(transport.is_blackholed(&identity_hash, now));
    // No duration was given, so the entry is expected to still apply much later.
    assert!(transport.is_blackholed(&identity_hash, now + 999999.0));

    let removed = transport.unblackhole_identity(&identity_hash);
    assert!(removed);
    assert!(!transport.is_blackholed(&identity_hash, now));

    // Nothing is left to remove the second time.
    let removed_again = transport.unblackhole_identity(&identity_hash);
    assert!(!removed_again);
}
2801
/// Blackholes an identity with a duration of `Some(1.0)` and expects it to be
/// blackholed at `now` and `now + 3599.0` but not at `now + 3601.0`.
#[test]
fn test_blackhole_with_duration() {
    let mut transport = TransportEngine::new(make_config(false));
    let identity_hash = [0xBB; 16];
    let now = 1000.0;

    // NOTE(review): the 3599/3601 probes suggest the duration is in hours — confirm.
    transport.blackhole_identity(identity_hash, now, Some(1.0), None);

    assert!(transport.is_blackholed(&identity_hash, now));
    assert!(transport.is_blackholed(&identity_hash, now + 3599.0));
    assert!(!transport.is_blackholed(&identity_hash, now + 3601.0));
}
2813
/// Blackholes one identity with duration `Some(1.0)` and one with no
/// duration, culls at `now + 4000.0`, and expects only the entry without a
/// duration to remain in `blackholed_identities`.
#[test]
fn test_cull_blackholed() {
    let mut transport = TransportEngine::new(make_config(false));
    let timed_hash = [0xCC; 16];
    let untimed_hash = [0xDD; 16];
    let now = 1000.0;

    transport.blackhole_identity(timed_hash, now, Some(1.0), None);
    transport.blackhole_identity(untimed_hash, now, None, None);

    transport.cull_blackholed(now + 4000.0);

    let timed_present = transport.blackholed_identities.contains_key(&timed_hash);
    assert!(!timed_present);
    let untimed_present = transport.blackholed_identities.contains_key(&untimed_hash);
    assert!(untimed_present);
}
2829
/// Blackholes an identity, feeds an announce signed by that identity through
/// `handle_inbound`, and expects neither `AnnounceReceived` nor `PathUpdated`
/// among the returned actions.
#[test]
fn test_blackhole_blocks_announce() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_interface(1, constants::MODE_FULL));

    // Build an announce for a freshly generated identity.
    let identity =
        rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x55; 32]));
    let dest_hash = destination_hash("test", &["app"], Some(identity.hash()));
    let name_h = name_hash("test", &["app"]);
    let random_hash = [0x42u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let announce_packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let now = 1000.0;
    transport.blackhole_identity(*identity.hash(), now, None, None);

    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
    let emitted = transport.handle_inbound(&announce_packet.raw, InterfaceId(1), now, &mut rng);

    let announce_seen = emitted
        .iter()
        .any(|a| matches!(a, TransportAction::AnnounceReceived { .. }));
    assert!(!announce_seen);
    let path_updated = emitted
        .iter()
        .any(|a| matches!(a, TransportAction::PathUpdated { .. }));
    assert!(!path_updated);
}
2879
/// Seeds `announce_table` with an entry for the announce's destination (with
/// `retries` equal to `PATHFINDER_R`), then passes the announce through
/// `handle_inbound_with_announce_queue` with a verify queue. Expects no
/// immediate actions, one queued item, and the seeded entry to be removed.
#[test]
fn test_async_announce_retransmit_cleanup_happens_before_queueing() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};
    use crate::transport::announce_verify_queue::AnnounceVerifyQueue;

    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_interface(1, constants::MODE_FULL));

    let identity =
        rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x31; 32]));
    let dest_hash = destination_hash("async", &["announce"], Some(identity.hash()));
    let name_h = name_hash("async", &["announce"]);
    let random_hash = [0x44u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    // HEADER_2 announce carrying transport id [0xBB; 16] and 3 hops.
    let inbound_flags = PacketFlags {
        header_type: constants::HEADER_2,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_TRANSPORT,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_ANNOUNCE,
    };
    let inbound = RawPacket::pack(
        inbound_flags,
        3,
        &dest_hash,
        Some(&[0xBB; 16]),
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let seeded_entry = AnnounceEntry {
        timestamp: 1000.0,
        retransmit_timeout: 2000.0,
        retries: constants::PATHFINDER_R,
        received_from: [0xBB; 16],
        hops: 2,
        packet_raw: inbound.raw.clone(),
        packet_data: inbound.data.clone(),
        destination_hash: dest_hash,
        context_flag: constants::FLAG_UNSET,
        local_rebroadcasts: 0,
        block_rebroadcasts: false,
        attached_interface: None,
    };
    transport.announce_table.insert(dest_hash, seeded_entry);

    let mut verify_queue = AnnounceVerifyQueue::new(8);
    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
    let emitted = transport.handle_inbound_with_announce_queue(
        &inbound.raw,
        InterfaceId(1),
        1000.0,
        &mut rng,
        Some(&mut verify_queue),
    );

    assert!(emitted.is_empty());
    assert_eq!(verify_queue.len(), 1);
    assert!(
        !transport.announce_table.contains_key(&dest_hash),
        "retransmit completion should clear announce_table before queueing"
    );
}
2948
/// Queues an announce for asynchronous verification, completes it through
/// `complete_verified_announce`, and expects an `AnnounceReceived` action plus
/// a signature-cache entry. Feeding the same raw packet again is then expected
/// to produce no actions and leave the queue empty.
#[test]
fn test_async_announce_completion_inserts_sig_cache_and_prevents_requeue() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};
    use crate::transport::announce_verify_queue::AnnounceVerifyQueue;

    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_interface(1, constants::MODE_FULL));

    let identity =
        rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x52; 32]));
    let dest_hash = destination_hash("async", &["cache"], Some(identity.hash()));
    let name_h = name_hash("async", &["cache"]);
    let random_hash = [0x55u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let inbound_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_ANNOUNCE,
    };
    let inbound = RawPacket::pack(
        inbound_flags,
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    // First pass: the announce is parked in the verify queue.
    let mut verify_queue = AnnounceVerifyQueue::new(8);
    let mut rng = rns_crypto::FixedRng::new(&[0x77; 32]);
    let first_pass = transport.handle_inbound_with_announce_queue(
        &inbound.raw,
        InterfaceId(1),
        1000.0,
        &mut rng,
        Some(&mut verify_queue),
    );
    assert!(first_pass.is_empty());
    assert_eq!(verify_queue.len(), 1);

    let mut batch = verify_queue.take_pending(1000.0);
    assert_eq!(batch.len(), 1);
    let (key, queued) = batch.pop().unwrap();

    // Validate by hand and derive the cache key from destination hash + signature.
    let announce = AnnounceData::unpack(&queued.packet.data, false).unwrap();
    let validated = announce.validate(&queued.packet.destination_hash).unwrap();
    let mut material = [0u8; 80];
    {
        let (dest_part, sig_part) = material.split_at_mut(16);
        dest_part.copy_from_slice(&queued.packet.destination_hash);
        sig_part.copy_from_slice(&announce.signature);
    }
    let sig_cache_key = hash::full_hash(&material);

    let completed = verify_queue.complete_success(&key).unwrap();
    let completion_actions = transport.complete_verified_announce(
        completed,
        validated,
        sig_cache_key,
        1000.0,
        &mut rng,
    );
    let announce_received = completion_actions
        .iter()
        .any(|action| matches!(action, TransportAction::AnnounceReceived { .. }));
    assert!(announce_received);
    assert!(transport.announce_sig_cache_contains(&sig_cache_key));

    // Second pass with the identical packet must not be queued again.
    let second_pass = transport.handle_inbound_with_announce_queue(
        &inbound.raw,
        InterfaceId(1),
        1001.0,
        &mut rng,
        Some(&mut verify_queue),
    );
    assert!(second_pass.is_empty());
    assert_eq!(verify_queue.len(), 0);
}
3023
/// Inserts a path whose `expires` is 200.0 and expects `has_path` to be true
/// before, and false after, a tick at 300.0.
#[test]
fn test_tick_culls_expired_path() {
    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_interface(1, constants::MODE_FULL));

    let dest = [0x66; 16];
    let expiring_path = PathEntry {
        timestamp: 100.0,
        next_hop: [0; 16],
        hops: 2,
        expires: 200.0,
        random_blobs: Vec::new(),
        receiving_interface: InterfaceId(1),
        packet_hash: [0; 32],
        announce_raw: None,
    };
    transport
        .path_table
        .insert(dest, PathSet::from_single(expiring_path, 1));

    let known_before = transport.has_path(&dest);
    assert!(known_before);

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    transport.tick(300.0, &mut rng);

    let known_after = transport.has_path(&dest);
    assert!(!known_after);
}
3055
3056 fn make_local_client_interface(id: u64) -> InterfaceInfo {
3061 InterfaceInfo {
3062 id: InterfaceId(id),
3063 name: String::from("local_client"),
3064 mode: constants::MODE_FULL,
3065 out_capable: true,
3066 in_capable: true,
3067 bitrate: None,
3068 announce_rate_target: None,
3069 announce_rate_grace: 0,
3070 announce_rate_penalty: 0.0,
3071 announce_cap: constants::ANNOUNCE_CAP,
3072 is_local_client: true,
3073 wants_tunnel: false,
3074 tunnel_id: None,
3075 mtu: constants::MTU as u32,
3076 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3077 ia_freq: 0.0,
3078 started: 0.0,
3079 }
3080 }
3081
/// Checks that `has_local_clients` is false for an empty engine and for one
/// holding only a regular interface, becomes true once a local-client
/// interface is registered, and returns to false after it is deregistered.
#[test]
fn test_has_local_clients() {
    let mut transport = TransportEngine::new(make_config(false));
    let initially = transport.has_local_clients();
    assert!(!initially);

    transport.register_interface(make_interface(1, constants::MODE_FULL));
    let with_regular_only = transport.has_local_clients();
    assert!(!with_regular_only);

    transport.register_interface(make_local_client_interface(2));
    let with_local_client = transport.has_local_clients();
    assert!(with_local_client);

    transport.deregister_interface(InterfaceId(2));
    let after_removal = transport.has_local_clients();
    assert!(!after_removal);
}
3096
/// Sends a PLAIN data packet in through a local-client interface for a
/// registered destination and expects a `DeliverLocal` action.
///
/// NOTE(review): despite the test name, no hop count is asserted here.
#[test]
fn test_local_client_hop_decrement() {
    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_local_client_interface(1));
    transport.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xAA; 16];
    transport.register_destination(dest, constants::DESTINATION_PLAIN);

    let inbound = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"hello",
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let emitted = transport.handle_inbound(&inbound.raw, InterfaceId(1), 1000.0, &mut rng);

    let delivered = emitted
        .iter()
        .any(|a| matches!(a, TransportAction::DeliverLocal { .. }));
    assert!(delivered, "Should deliver locally");
}
3130
/// A PLAIN broadcast data packet arriving on the local-client interface is
/// expected to yield a `ForwardPlainBroadcast` action with `to_local: false`.
#[test]
fn test_plain_broadcast_from_local_client() {
    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_local_client_interface(1));
    transport.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xBB; 16];
    let broadcast = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    // Interface 1 is the local client.
    let emitted = transport.handle_inbound(&broadcast.raw, InterfaceId(1), 1000.0, &mut rng);

    let forwarded_outward = emitted.iter().any(|a| {
        matches!(
            a,
            TransportAction::ForwardPlainBroadcast {
                to_local: false,
                ..
            }
        )
    });
    assert!(forwarded_outward, "Should forward to external interfaces");
}
3164
/// A PLAIN broadcast data packet arriving on the non-local interface is
/// expected to yield a `ForwardPlainBroadcast` action with `to_local: true`.
#[test]
fn test_plain_broadcast_from_external() {
    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_local_client_interface(1));
    transport.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xCC; 16];
    let broadcast = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    // Interface 2 is the regular (non-local-client) interface.
    let emitted = transport.handle_inbound(&broadcast.raw, InterfaceId(2), 1000.0, &mut rng);

    let forwarded_inward = emitted.iter().any(|a| {
        matches!(
            a,
            TransportAction::ForwardPlainBroadcast { to_local: true, .. }
        )
    });
    assert!(forwarded_inward, "Should forward to local clients");
}
3195
/// With only regular interfaces registered, a PLAIN broadcast data packet is
/// expected to produce no `ForwardPlainBroadcast` action.
#[test]
fn test_no_plain_broadcast_bridging_without_local_clients() {
    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_interface(1, constants::MODE_FULL));
    transport.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xDD; 16];
    let broadcast = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let emitted = transport.handle_inbound(&broadcast.raw, InterfaceId(1), 1000.0, &mut rng);

    let none_forwarded = emitted
        .iter()
        .all(|a| !matches!(a, TransportAction::ForwardPlainBroadcast { .. }));
    assert!(none_forwarded, "No bridging without local clients");
}
3223
/// An announce received on a regular interface while a local-client interface
/// is registered is expected to yield `ForwardToLocalClients` whose `exclude`
/// is the receiving interface.
#[test]
fn test_announce_forwarded_to_local_clients() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_interface(1, constants::MODE_FULL));
    transport.register_interface(make_local_client_interface(2));

    let identity =
        rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x77; 32]));
    let dest_hash = destination_hash("test", &["fwd"], Some(identity.hash()));
    let name_h = name_hash("test", &["fwd"]);
    let random_hash = [0x42u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let announce_packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
    let emitted =
        transport.handle_inbound(&announce_packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // Pull the `exclude` field out of the first ForwardToLocalClients action.
    let excluded = emitted.iter().find_map(|a| match a {
        TransportAction::ForwardToLocalClients { exclude, .. } => Some(exclude),
        _ => None,
    });
    assert!(
        excluded.is_some(),
        "Should forward announce to local clients"
    );
    assert_eq!(*excluded.unwrap(), Some(InterfaceId(1)));
}
3279
/// With only a regular interface registered, handling an announce is
/// expected to produce no `ForwardToLocalClients` action.
#[test]
fn test_no_announce_forward_without_local_clients() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_interface(1, constants::MODE_FULL));

    let identity =
        rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x88; 32]));
    let dest_hash = destination_hash("test", &["nofwd"], Some(identity.hash()));
    let name_h = name_hash("test", &["nofwd"]);
    let random_hash = [0x42u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let announce_packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0x22; 32]);
    let emitted =
        transport.handle_inbound(&announce_packet.raw, InterfaceId(1), 1000.0, &mut rng);

    let none_forwarded = emitted
        .iter()
        .all(|a| !matches!(a, TransportAction::ForwardToLocalClients { .. }));
    assert!(none_forwarded, "No forward without local clients");
}
3323
/// With two local-client interfaces registered, an announce arriving on
/// interface 1 is expected to yield `ForwardToLocalClients` whose `exclude`
/// is `Some(InterfaceId(1))`.
#[test]
fn test_local_client_exclude_from_forward() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_local_client_interface(1));
    transport.register_interface(make_local_client_interface(2));

    let identity =
        rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
    let dest_hash = destination_hash("test", &["excl"], Some(identity.hash()));
    let name_h = name_hash("test", &["excl"]);
    let random_hash = [0x42u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let announce_packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
    let emitted =
        transport.handle_inbound(&announce_packet.raw, InterfaceId(1), 1000.0, &mut rng);

    let forward = emitted
        .iter()
        .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
    assert!(forward.is_some());
    // `find` only yields this variant, so the pattern always matches here.
    if let Some(TransportAction::ForwardToLocalClients { exclude, .. }) = forward {
        assert_eq!(*exclude, Some(InterfaceId(1)));
    }
}
3375
3376 fn make_tunnel_interface(id: u64) -> InterfaceInfo {
3381 InterfaceInfo {
3382 id: InterfaceId(id),
3383 name: String::from("tunnel_iface"),
3384 mode: constants::MODE_FULL,
3385 out_capable: true,
3386 in_capable: true,
3387 bitrate: None,
3388 announce_rate_target: None,
3389 announce_rate_grace: 0,
3390 announce_rate_penalty: 0.0,
3391 announce_cap: constants::ANNOUNCE_CAP,
3392 is_local_client: false,
3393 wants_tunnel: true,
3394 tunnel_id: None,
3395 mtu: constants::MTU as u32,
3396 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3397 ia_freq: 0.0,
3398 started: 0.0,
3399 }
3400 }
3401
/// Calls `handle_tunnel` for a fresh tunnel id and expects a
/// `TunnelEstablished` action, the interface's `tunnel_id` to be set, and one
/// entry in the tunnel table.
#[test]
fn test_handle_tunnel_new() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_tunnel_interface(1));

    let tunnel_id = [0xAA; 32];
    let emitted = transport.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    let established = emitted
        .iter()
        .any(|a| matches!(a, TransportAction::TunnelEstablished { .. }));
    assert!(established);

    let iface = transport.interface_info(&InterfaceId(1)).unwrap();
    assert_eq!(iface.tunnel_id, Some(tunnel_id));

    let tunnel_count = transport.tunnel_table().len();
    assert_eq!(tunnel_count, 1);
}
3422
/// Registers an interface that already carries a `tunnel_id`, establishes the
/// tunnel, then handles an announce on that interface. Expects the engine to
/// know a path to the destination and the tunnel entry to hold exactly that
/// one path.
#[test]
fn test_announce_stores_tunnel_path() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut transport = TransportEngine::new(make_config(false));

    let tunnel_id = [0xBB; 32];
    let mut tunnel_iface = make_tunnel_interface(1);
    tunnel_iface.tunnel_id = Some(tunnel_id);
    transport.register_interface(tunnel_iface);

    transport.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    let identity =
        rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xCC; 32]));
    let dest_hash = destination_hash("test", &["tunnel"], Some(identity.hash()));
    let name_h = name_hash("test", &["tunnel"]);
    let random_hash = [0x42u8; 10];
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();

    let announce_packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0xDD; 32]);
    transport.handle_inbound(&announce_packet.raw, InterfaceId(1), 1000.0, &mut rng);

    let path_known = transport.has_path(&dest_hash);
    assert!(path_known);

    let stored_paths = &transport.tunnel_table().get(&tunnel_id).unwrap().paths;
    assert_eq!(stored_paths.len(), 1);
    assert!(stored_paths.contains_key(&dest_hash));
}
3475
/// Stores a path under a tunnel, voids the tunnel's interface and removes the
/// path from the path table, then calls `handle_tunnel` again on a different
/// interface. Expects the path to be back with 3 hops, bound to the new
/// interface, and a `TunnelEstablished` action to be returned.
#[test]
fn test_tunnel_reattach_restores_paths() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_tunnel_interface(1));

    let tunnel_id = [0xCC; 32];
    transport.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    let dest = [0xDD; 16];
    let remembered_path = tunnel::TunnelPath {
        timestamp: 1000.0,
        received_from: [0xEE; 16],
        hops: 3,
        expires: 1000.0 + constants::DESTINATION_TIMEOUT,
        random_blobs: Vec::new(),
        packet_hash: [0xFF; 32],
    };
    transport.tunnel_table.store_tunnel_path(
        &tunnel_id,
        dest,
        remembered_path,
        1000.0,
        constants::DESTINATION_TIMEOUT,
        usize::MAX,
    );

    // Detach the tunnel and make sure the path table has no entry for `dest`.
    transport.void_tunnel_interface(&tunnel_id);
    transport.path_table.remove(&dest);
    let known_while_detached = transport.has_path(&dest);
    assert!(!known_while_detached);

    // Reattach the same tunnel id on a second interface.
    transport.register_interface(make_interface(2, constants::MODE_FULL));
    let emitted = transport.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);

    let known_after_reattach = transport.has_path(&dest);
    assert!(known_after_reattach);
    let restored = transport.path_table.get(&dest).unwrap().primary().unwrap();
    assert_eq!(restored.hops, 3);
    assert_eq!(restored.receiving_interface, InterfaceId(2));

    let established = emitted
        .iter()
        .any(|a| matches!(a, TransportAction::TunnelEstablished { .. }));
    assert!(established);
}
3524
/// After `handle_tunnel` the tunnel entry's `interface` is expected to be
/// `Some(InterfaceId(1))`; after `void_tunnel_interface` the entry is expected
/// to remain in the table with `interface` set to `None`.
#[test]
fn test_void_tunnel_interface() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_tunnel_interface(1));

    let tunnel_id = [0xDD; 32];
    transport.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    let attached = transport.tunnel_table().get(&tunnel_id).unwrap();
    assert_eq!(attached.interface, Some(InterfaceId(1)));

    transport.void_tunnel_interface(&tunnel_id);

    // The tunnel entry itself stays; only its interface binding is cleared.
    assert_eq!(transport.tunnel_table().len(), 1);
    let detached = transport.tunnel_table().get(&tunnel_id).unwrap();
    assert_eq!(detached.interface, None);
}
3548
/// Establishes a tunnel at time 1000.0 and expects the tunnel table to be
/// empty after a tick at
/// `1000.0 + DESTINATION_TIMEOUT + TABLES_CULL_INTERVAL + 1.0`.
#[test]
fn test_tick_culls_tunnels() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_tunnel_interface(1));

    let tunnel_id = [0xEE; 32];
    transport.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
    let count_before = transport.tunnel_table().len();
    assert_eq!(count_before, 1);

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let cull_time =
        1000.0 + constants::DESTINATION_TIMEOUT + constants::TABLES_CULL_INTERVAL + 1.0;
    transport.tick(cull_time, &mut rng);

    let count_after = transport.tunnel_table().len();
    assert_eq!(count_after, 0);
}
3568
/// Calls `synthesize_tunnel` and expects exactly one `TunnelSynthesize`
/// action targeting interface 1, with `TUNNEL_SYNTH_LENGTH` bytes of data and
/// the destination hash of `rnstransport` / `tunnel` / `synthesize`.
#[test]
fn test_synthesize_tunnel() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_tunnel_interface(1));

    let identity =
        rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xFF; 32]));
    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);

    let emitted = transport.synthesize_tunnel(&identity, InterfaceId(1), &mut rng);

    assert_eq!(emitted.len(), 1);
    if let TransportAction::TunnelSynthesize {
        interface,
        data,
        dest_hash,
    } = &emitted[0]
    {
        assert_eq!(*interface, InterfaceId(1));
        assert_eq!(data.len(), tunnel::TUNNEL_SYNTH_LENGTH);
        let expected_dest = crate::destination::destination_hash(
            "rnstransport",
            &["tunnel", "synthesize"],
            None,
        );
        assert_eq!(*dest_hash, expected_dest);
    } else {
        panic!("Expected TunnelSynthesize");
    }
}
3601
/// Builds a path-request payload: the 16-byte destination hash immediately
/// followed by the tag bytes.
fn make_path_request_data(dest_hash: &[u8; 16], tag: &[u8]) -> Vec<u8> {
    [&dest_hash[..], tag].concat()
}
3612
/// A path request for an unknown destination that arrives on an access-point
/// interface is forwarded on the other interface, and a discovery path
/// request is recorded for that destination.
#[test]
fn test_path_request_forwarded_on_ap() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
    transport.register_interface(make_interface(2, constants::MODE_FULL));

    let unknown_dest = [0xD1; 16];
    let request = make_path_request_data(&unknown_dest, &[0x01; 16]);

    let actions = transport.handle_path_request(&request, InterfaceId(1), 1000.0);

    // Exactly one action: the request re-emitted on interface 2.
    assert_eq!(actions.len(), 1);
    if let TransportAction::SendOnInterface { interface, .. } = &actions[0] {
        assert_eq!(*interface, InterfaceId(2));
    } else {
        panic!("Expected SendOnInterface for forwarded path request");
    }
    assert!(transport
        .discovery_path_requests
        .contains_key(&unknown_dest));
}
3636
/// A path request for an unknown destination that arrives on a `MODE_FULL`
/// interface produces no actions and records no discovery path request.
#[test]
fn test_path_request_not_forwarded_on_full() {
    let mut transport = TransportEngine::new(make_config(true));
    for id in 1..=2 {
        transport.register_interface(make_interface(id, constants::MODE_FULL));
    }

    let unknown_dest = [0xD2; 16];
    let request = make_path_request_data(&unknown_dest, &[0x02; 16]);

    let actions = transport.handle_path_request(&request, InterfaceId(1), 1000.0);

    assert!(actions.is_empty());
    assert!(!transport
        .discovery_path_requests
        .contains_key(&unknown_dest));
}
3653
/// With `max_discovery_pr_tags = 2`, a third distinct path-request tag evicts
/// the first one seen, and the evicted tag is accepted again afterwards.
#[test]
fn test_discovery_pr_tags_fifo_eviction() {
    let mut config = make_config(true);
    config.max_discovery_pr_tags = 2;
    let mut transport = TransportEngine::new(config);

    let (dest1, tag1) = ([0xA1; 16], [0x01; 16]);
    let (dest2, tag2) = ([0xA2; 16], [0x02; 16]);
    let (dest3, tag3) = ([0xA3; 16], [0x03; 16]);

    let request1 = make_path_request_data(&dest1, &tag1);
    let request2 = make_path_request_data(&dest2, &tag2);
    let request3 = make_path_request_data(&dest3, &tag3);

    // Fill the tag store to capacity.
    transport.handle_path_request(&request1, InterfaceId(1), 1000.0);
    transport.handle_path_request(&request2, InterfaceId(1), 1001.0);
    assert_eq!(transport.discovery_pr_tags_count(), 2);

    let unique1 = make_unique_tag(dest1, &tag1);
    let unique2 = make_unique_tag(dest2, &tag2);
    assert!(transport.discovery_pr_tags.contains(&unique1));
    assert!(transport.discovery_pr_tags.contains(&unique2));

    // A third tag pushes out the oldest (first) one.
    transport.handle_path_request(&request3, InterfaceId(1), 1002.0);
    assert_eq!(transport.discovery_pr_tags_count(), 2);
    assert!(!transport.discovery_pr_tags.contains(&unique1));
    assert!(transport.discovery_pr_tags.contains(&unique2));

    // Replaying the evicted request stores its tag again.
    transport.handle_path_request(&request1, InterfaceId(1), 1003.0);
    assert_eq!(transport.discovery_pr_tags_count(), 2);
    assert!(transport.discovery_pr_tags.contains(&unique1));
}
3701
/// With `max_path_destinations = 2`, inserting a third destination evicts the
/// first one inserted, drops its `path_states` entry, and bumps the
/// cap-eviction counter to 1.
#[test]
fn test_path_destination_cap_evicts_oldest_and_clears_state() {
    let mut config = make_config(false);
    config.max_path_destinations = 2;
    let mut transport = TransportEngine::new(config);
    transport.register_interface(make_interface(1, constants::MODE_FULL));

    let oldest = [0xB1; 16];
    let middle = [0xB2; 16];
    let newest = [0xB3; 16];

    let oldest_entry = make_path_entry(1000.0, 1, InterfaceId(1), [0x11; 16]);
    let middle_entry = make_path_entry(1001.0, 1, InterfaceId(1), [0x22; 16]);
    let newest_entry = make_path_entry(1002.0, 1, InterfaceId(1), [0x33; 16]);

    transport.upsert_path_destination(oldest, oldest_entry, 1000.0);
    transport.upsert_path_destination(middle, middle_entry, 1001.0);

    // Give the soon-to-be-evicted destination some per-path state.
    transport
        .path_states
        .insert(oldest, constants::STATE_UNRESPONSIVE);

    // This insert exceeds the cap.
    transport.upsert_path_destination(newest, newest_entry, 1002.0);

    assert_eq!(transport.path_table_count(), 2);
    assert!(!transport.has_path(&oldest));
    assert!(transport.has_path(&middle));
    assert!(transport.has_path(&newest));
    assert!(!transport.path_states.contains_key(&oldest));
    assert_eq!(transport.path_destination_cap_evict_count(), 1);
}
3740
/// Upserting a path for a destination that is already in the table does not
/// evict anything, even when the table is at `max_path_destinations`.
#[test]
fn test_existing_path_destination_update_does_not_trigger_cap_eviction() {
    let mut config = make_config(false);
    config.max_path_destinations = 2;
    config.max_paths_per_destination = 2;
    let mut transport = TransportEngine::new(config);
    transport.register_interface(make_interface(1, constants::MODE_FULL));

    let first = [0xC1; 16];
    let second = [0xC2; 16];

    let first_entry = make_path_entry(1000.0, 2, InterfaceId(1), [0x11; 16]);
    let second_entry = make_path_entry(1001.0, 2, InterfaceId(1), [0x22; 16]);
    let second_update = make_path_entry(1002.0, 1, InterfaceId(1), [0x23; 16]);

    // Fill the table to its destination cap.
    transport.upsert_path_destination(first, first_entry, 1000.0);
    transport.upsert_path_destination(second, second_entry, 1001.0);

    // Another path for an already-known destination.
    transport.upsert_path_destination(second, second_update, 1002.0);

    assert_eq!(transport.path_table_count(), 2);
    assert!(transport.has_path(&first));
    assert!(transport.has_path(&second));
}
3773
/// A path request that arrives on a roaming interface for a destination whose
/// known path was received on that same interface yields no actions and
/// creates no announce-table entry.
#[test]
fn test_roaming_loop_prevention() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_interface(1, constants::MODE_ROAMING));

    let dest = [0xD3; 16];

    // Known path whose receiving interface is the roaming interface itself.
    let known_path = PathEntry {
        timestamp: 900.0,
        next_hop: [0xAA; 16],
        hops: 2,
        expires: 9999.0,
        random_blobs: Vec::new(),
        receiving_interface: InterfaceId(1),
        packet_hash: [0; 32],
        announce_raw: None,
    };
    transport
        .path_table
        .insert(dest, PathSet::from_single(known_path, 1));

    let request = make_path_request_data(&dest, &[0x03; 16]);
    let actions = transport.handle_path_request(&request, InterfaceId(1), 1000.0);

    assert!(actions.is_empty());
    assert!(!transport.announce_table.contains_key(&dest));
}
3807
3808 fn make_announce_raw(dest_hash: &[u8; 16], payload: &[u8]) -> Vec<u8> {
3810 let flags: u8 = 0x01; let mut raw = Vec::new();
3814 raw.push(flags);
3815 raw.push(0x02); raw.extend_from_slice(dest_hash);
3817 raw.push(constants::CONTEXT_NONE);
3818 raw.extend_from_slice(payload);
3819 raw
3820 }
3821
/// When a path request hits a known path that cached its raw announce, the
/// resulting announce-table entry carries that raw packet, the payload
/// extracted from it, and has `block_rebroadcasts` set.
#[test]
fn test_path_request_populates_announce_entry_from_raw() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_interface(1, constants::MODE_FULL));
    transport.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xD5; 16];
    let payload = vec![0xAB; 32];
    let announce_raw = make_announce_raw(&dest, &payload);

    // Path learned on interface 2, with the original announce retained.
    let known_path = PathEntry {
        timestamp: 900.0,
        next_hop: [0xBB; 16],
        hops: 2,
        expires: 9999.0,
        random_blobs: Vec::new(),
        receiving_interface: InterfaceId(2),
        packet_hash: [0; 32],
        announce_raw: Some(announce_raw.clone()),
    };
    transport
        .path_table
        .insert(dest, PathSet::from_single(known_path, 1));

    // The request comes in on the other interface.
    let request = make_path_request_data(&dest, &[0x05; 16]);
    let _ignored = transport.handle_path_request(&request, InterfaceId(1), 1000.0);

    let queued = transport
        .announce_table
        .get(&dest)
        .expect("announce entry must exist");
    assert_eq!(queued.packet_raw, announce_raw);
    assert_eq!(queued.packet_data, payload);
    assert!(queued.block_rebroadcasts);
}
3862
/// When the known path has no cached raw announce (`announce_raw: None`), a
/// path request yields no actions and no announce-table entry.
#[test]
fn test_path_request_skips_when_no_announce_raw() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_interface(1, constants::MODE_FULL));
    transport.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xD6; 16];

    // Path is known, but there is nothing to replay for it.
    let path_without_raw = PathEntry {
        timestamp: 900.0,
        next_hop: [0xCC; 16],
        hops: 1,
        expires: 9999.0,
        random_blobs: Vec::new(),
        receiving_interface: InterfaceId(2),
        packet_hash: [0; 32],
        announce_raw: None,
    };
    transport
        .path_table
        .insert(dest, PathSet::from_single(path_without_raw, 1));

    let request = make_path_request_data(&dest, &[0x06; 16]);
    let actions = transport.handle_path_request(&request, InterfaceId(1), 1000.0);

    assert!(actions.is_empty());
    assert!(!transport.announce_table.contains_key(&dest));
}
3896
/// `discovery_path_requests_waiting` returns the requesting interface once
/// and removes the pending request, so a second lookup returns `None`.
#[test]
fn test_discovery_request_consumed_on_announce() {
    let mut transport = TransportEngine::new(make_config(true));
    transport.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));

    let dest = [0xD4; 16];
    let pending = DiscoveryPathRequest {
        timestamp: 900.0,
        requesting_interface: InterfaceId(1),
    };
    transport.discovery_path_requests.insert(dest, pending);

    // First lookup hands back the interface that asked ...
    assert_eq!(
        transport.discovery_path_requests_waiting(&dest),
        Some(InterfaceId(1))
    );

    // ... and consumes the entry in the process.
    assert!(!transport.discovery_path_requests.contains_key(&dest));
    assert!(transport.discovery_path_requests_waiting(&dest).is_none());
}
3921
3922 fn build_announce_for_issue4(dest_hash: &[u8; 16], name_hash: &[u8; 10]) -> Vec<u8> {
3928 let identity =
3929 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3930 let random_hash = [0x42u8; 10];
3931 let (announce_data, _) = crate::announce::AnnounceData::pack(
3932 &identity,
3933 dest_hash,
3934 name_hash,
3935 &random_hash,
3936 None,
3937 None,
3938 )
3939 .unwrap();
3940 let flags = PacketFlags {
3941 header_type: constants::HEADER_1,
3942 context_flag: constants::FLAG_UNSET,
3943 transport_type: constants::TRANSPORT_BROADCAST,
3944 destination_type: constants::DESTINATION_SINGLE,
3945 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3946 };
3947 RawPacket::pack(
3948 flags,
3949 0,
3950 dest_hash,
3951 None,
3952 constants::CONTEXT_NONE,
3953 &announce_data,
3954 )
3955 .unwrap()
3956 .raw
3957 }
3958
/// With transport disabled and a local-client interface, an outbound
/// `HEADER_1` data packet to a destination recorded at one hop is emitted on
/// that interface as a `HEADER_2` / `TRANSPORT_TRANSPORT` packet.
#[test]
fn test_issue4_local_client_single_data_to_1hop_rewrites_on_outbound() {
    let mut transport = TransportEngine::new(make_config(false));
    transport.register_interface(make_local_client_interface(1));

    let mut identity_rng = rns_crypto::FixedRng::new(&[0x99; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash =
        crate::destination::destination_hash("issue4", &["test"], Some(identity.hash()));
    let name_hash = crate::destination::name_hash("issue4", &["test"]);
    let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);

    // Learn the path from an announce whose second byte is overwritten to 1.
    let mut announce = RawPacket::unpack(&announce_raw).unwrap();
    announce.raw[1] = 1;
    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    transport.handle_inbound(&announce.raw, InterfaceId(1), 1000.0, &mut rng);
    assert!(transport.has_path(&dest_hash));
    assert_eq!(transport.hops_to(&dest_hash), Some(1));

    // Plain broadcast data packet addressed to that destination.
    let outbound_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let outbound = RawPacket::pack(
        outbound_flags,
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        b"hello",
    )
    .unwrap();

    let actions =
        transport.handle_outbound(&outbound, constants::DESTINATION_SINGLE, None, 1001.0);

    // First send action, if any.
    let (interface, raw) = actions
        .iter()
        .find_map(|action| {
            if let TransportAction::SendOnInterface { interface, raw } = action {
                Some((interface, raw))
            } else {
                None
            }
        })
        .expect("shared client should emit a transport-injected packet");
    assert_eq!(*interface, InterfaceId(1));

    let sent_flags = PacketFlags::unpack(raw[0]);
    assert_eq!(sent_flags.header_type, constants::HEADER_2);
    assert_eq!(sent_flags.transport_type, constants::TRANSPORT_TRANSPORT);
}
4016
/// Control case: with transport enabled, a `HEADER_2` data packet addressed
/// to this node's identity hash is forwarded on the interface where the
/// destination's announce was received.
#[test]
fn test_issue4_external_data_to_1hop_via_transport_works() {
    let daemon_id = [0x42; 16];
    let config = TransportConfig {
        transport_enabled: true,
        identity_hash: Some(daemon_id),
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
        packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
        max_discovery_pr_tags: constants::MAX_PR_TAGS,
        max_path_destinations: usize::MAX,
        max_tunnel_destinations_total: usize::MAX,
        destination_timeout_secs: constants::DESTINATION_TIMEOUT,
        announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
        announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
        announce_sig_cache_enabled: true,
        announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
        announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
        announce_queue_max_entries: 256,
        announce_queue_max_interfaces: 1024,
    };
    let mut transport = TransportEngine::new(config);
    transport.register_interface(make_interface(1, constants::MODE_FULL));
    transport.register_interface(make_interface(2, constants::MODE_FULL));

    let mut identity_rng = rns_crypto::FixedRng::new(&[0x99; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash =
        crate::destination::destination_hash("issue4", &["ctrl"], Some(identity.hash()));
    let name_hash = crate::destination::name_hash("issue4", &["ctrl"]);
    let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);

    // The announce arrives on interface 2 and is recorded at one hop.
    let mut announce_rng = rns_crypto::FixedRng::new(&[0; 32]);
    transport.handle_inbound(&announce_raw, InterfaceId(2), 1000.0, &mut announce_rng);
    assert_eq!(transport.hops_to(&dest_hash), Some(1));

    // Hand-assembled packet: flags, a zero byte, this node's identity hash,
    // the destination hash, the context byte, then the payload.
    let h2_flags = PacketFlags {
        header_type: constants::HEADER_2,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_TRANSPORT,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let mut h2_raw = vec![h2_flags.pack(), 0];
    h2_raw.extend_from_slice(&daemon_id);
    h2_raw.extend_from_slice(&dest_hash);
    h2_raw.push(constants::CONTEXT_NONE);
    h2_raw.extend_from_slice(b"hello via transport");

    let mut data_rng = rns_crypto::FixedRng::new(&[0x22; 32]);
    let actions = transport.handle_inbound(&h2_raw, InterfaceId(1), 1001.0, &mut data_rng);

    let forwarded_on_2 = actions.iter().any(|action| match action {
        TransportAction::SendOnInterface { interface, .. } => *interface == InterfaceId(2),
        _ => false,
    });
    assert!(
        forwarded_on_2,
        "HEADER_2 transport packet should be forwarded (control test)"
    );
}
4091}