1pub mod announce_proc;
2pub mod announce_queue;
3pub mod announce_verify_queue;
4pub mod dedup;
5pub mod inbound;
6pub mod ingress_control;
7pub mod jobs;
8pub mod outbound;
9pub mod pathfinder;
10pub mod rate_limit;
11pub mod tables;
12pub mod tunnel;
13pub mod types;
14
15use alloc::collections::BTreeMap;
16use alloc::string::String;
17use alloc::vec::Vec;
18use core::mem::size_of;
19
20use rns_crypto::Rng;
21
22use crate::announce::AnnounceData;
23use crate::constants;
24use crate::hash;
25use crate::packet::RawPacket;
26
27use self::announce_proc::compute_path_expires;
28use self::announce_queue::AnnounceQueues;
29use self::announce_verify_queue::{AnnounceVerifyKey, AnnounceVerifyQueue, PendingAnnounce};
30use self::dedup::{AnnounceSignatureCache, PacketHashlist};
31use self::inbound::{
32 create_link_entry, create_reverse_entry, forward_transport_packet, route_proof_via_reverse,
33 route_via_link_table,
34};
35use self::ingress_control::IngressControl;
36use self::outbound::{route_outbound, should_transmit_announce};
37use self::pathfinder::{
38 decide_announce_multipath, extract_random_blob, timebase_from_random_blob, MultiPathDecision,
39};
40use self::rate_limit::AnnounceRateLimiter;
41use self::tables::{AnnounceEntry, DiscoveryPathRequest, LinkEntry, PathEntry, PathSet};
42use self::tunnel::TunnelTable;
43use self::types::{BlackholeEntry, InterfaceId, InterfaceInfo, TransportAction, TransportConfig};
44
45pub type PathTableRow = ([u8; 16], f64, [u8; 16], u8, f64, String);
46pub type RateTableRow = ([u8; 16], f64, u32, f64, Vec<f64>);
47
48pub struct TransportEngine {
53 config: TransportConfig,
54 path_table: BTreeMap<[u8; 16], PathSet>,
55 announce_table: BTreeMap<[u8; 16], AnnounceEntry>,
56 reverse_table: BTreeMap<[u8; 16], tables::ReverseEntry>,
57 link_table: BTreeMap<[u8; 16], LinkEntry>,
58 held_announces: BTreeMap<[u8; 16], AnnounceEntry>,
59 packet_hashlist: PacketHashlist,
60 announce_sig_cache: AnnounceSignatureCache,
61 rate_limiter: AnnounceRateLimiter,
62 path_states: BTreeMap<[u8; 16], u8>,
63 interfaces: BTreeMap<InterfaceId, InterfaceInfo>,
64 local_destinations: BTreeMap<[u8; 16], u8>,
65 blackholed_identities: BTreeMap<[u8; 16], BlackholeEntry>,
66 announce_queues: AnnounceQueues,
67 ingress_control: IngressControl,
68 tunnel_table: TunnelTable,
69 discovery_pr_tags: Vec<[u8; 32]>,
70 discovery_path_requests: BTreeMap<[u8; 16], DiscoveryPathRequest>,
71 path_destination_cap_evict_count: usize,
72 announces_last_checked: f64,
74 tables_last_culled: f64,
75}
76
77impl TransportEngine {
78 pub fn new(config: TransportConfig) -> Self {
79 let packet_hashlist_max_entries = config.packet_hashlist_max_entries;
80 let sig_cache_max = if config.announce_sig_cache_enabled {
81 config.announce_sig_cache_max_entries
82 } else {
83 0
84 };
85 let sig_cache_ttl = config.announce_sig_cache_ttl_secs;
86 let announce_queue_max_interfaces = config.announce_queue_max_interfaces;
87 TransportEngine {
88 config,
89 path_table: BTreeMap::new(),
90 announce_table: BTreeMap::new(),
91 reverse_table: BTreeMap::new(),
92 link_table: BTreeMap::new(),
93 held_announces: BTreeMap::new(),
94 packet_hashlist: PacketHashlist::new(packet_hashlist_max_entries),
95 announce_sig_cache: AnnounceSignatureCache::new(sig_cache_max, sig_cache_ttl),
96 rate_limiter: AnnounceRateLimiter::new(),
97 path_states: BTreeMap::new(),
98 interfaces: BTreeMap::new(),
99 local_destinations: BTreeMap::new(),
100 blackholed_identities: BTreeMap::new(),
101 announce_queues: AnnounceQueues::new(announce_queue_max_interfaces),
102 ingress_control: IngressControl::new(),
103 tunnel_table: TunnelTable::new(),
104 discovery_pr_tags: Vec::new(),
105 discovery_path_requests: BTreeMap::new(),
106 path_destination_cap_evict_count: 0,
107 announces_last_checked: 0.0,
108 tables_last_culled: 0.0,
109 }
110 }
111
112 fn insert_discovery_pr_tag(&mut self, unique_tag: [u8; 32]) -> bool {
113 if self.discovery_pr_tags.contains(&unique_tag) {
114 return false;
115 }
116 if self.config.max_discovery_pr_tags != usize::MAX
117 && self.discovery_pr_tags.len() >= self.config.max_discovery_pr_tags
118 && !self.discovery_pr_tags.is_empty()
119 {
120 self.discovery_pr_tags.remove(0);
121 }
122 self.discovery_pr_tags.push(unique_tag);
123 true
124 }
125
126 fn upsert_path_destination(&mut self, dest_hash: [u8; 16], entry: PathEntry, now: f64) {
127 let max_paths = self.config.max_paths_per_destination;
128 if let Some(ps) = self.path_table.get_mut(&dest_hash) {
129 ps.upsert(entry);
130 return;
131 }
132 self.enforce_path_destination_cap(now);
133 self.path_table
134 .insert(dest_hash, PathSet::from_single(entry, max_paths));
135 }
136
137 fn enforce_path_destination_cap(&mut self, now: f64) {
138 if self.config.max_path_destinations == usize::MAX {
139 return;
140 }
141 jobs::cull_path_table(&mut self.path_table, &self.interfaces, now);
142 while self.path_table.len() >= self.config.max_path_destinations {
143 let Some(dest_hash) = self.oldest_path_destination() else {
144 break;
145 };
146 self.path_table.remove(&dest_hash);
147 self.path_states.remove(&dest_hash);
148 self.path_destination_cap_evict_count += 1;
149 }
150 }
151
152 fn oldest_path_destination(&self) -> Option<[u8; 16]> {
153 self.path_table
154 .iter()
155 .filter_map(|(dest_hash, path_set)| {
156 path_set
157 .primary()
158 .map(|primary| (*dest_hash, primary.timestamp, primary.hops))
159 })
160 .min_by(|a, b| {
161 a.1.partial_cmp(&b.1)
162 .unwrap_or(core::cmp::Ordering::Equal)
163 .then_with(|| b.2.cmp(&a.2))
164 })
165 .map(|(dest_hash, _, _)| dest_hash)
166 }
167
168 fn announce_entry_size_bytes(entry: &AnnounceEntry) -> usize {
169 size_of::<AnnounceEntry>() + entry.packet_raw.capacity() + entry.packet_data.capacity()
170 }
171
172 fn announce_retained_bytes_total(&self) -> usize {
173 self.announce_table
174 .values()
175 .chain(self.held_announces.values())
176 .map(Self::announce_entry_size_bytes)
177 .sum()
178 }
179
180 fn cull_expired_announce_entries(&mut self, now: f64) -> usize {
181 let ttl = self.config.announce_table_ttl_secs;
182 let mut removed = 0usize;
183
184 self.announce_table.retain(|_, entry| {
185 let keep = now <= entry.timestamp + ttl;
186 if !keep {
187 removed += 1;
188 }
189 keep
190 });
191
192 self.held_announces.retain(|_, entry| {
193 let keep = now <= entry.timestamp + ttl;
194 if !keep {
195 removed += 1;
196 }
197 keep
198 });
199
200 removed
201 }
202
203 fn oldest_retained_announce(&self) -> Option<([u8; 16], bool)> {
204 let oldest_active = self
205 .announce_table
206 .iter()
207 .map(|(dest_hash, entry)| (*dest_hash, false, entry.timestamp))
208 .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(core::cmp::Ordering::Equal));
209 let oldest_held = self
210 .held_announces
211 .iter()
212 .map(|(dest_hash, entry)| (*dest_hash, true, entry.timestamp))
213 .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(core::cmp::Ordering::Equal));
214
215 match (oldest_active, oldest_held) {
216 (Some(active), Some(held)) => {
217 let ordering = active
218 .2
219 .partial_cmp(&held.2)
220 .unwrap_or(core::cmp::Ordering::Equal);
221 if ordering == core::cmp::Ordering::Less {
222 Some((active.0, active.1))
223 } else {
224 Some((held.0, held.1))
225 }
226 }
227 (Some(active), None) => Some((active.0, active.1)),
228 (None, Some(held)) => Some((held.0, held.1)),
229 (None, None) => None,
230 }
231 }
232
233 fn enforce_announce_retention_cap(&mut self, now: f64) {
234 self.cull_expired_announce_entries(now);
235 while self.announce_retained_bytes_total() > self.config.announce_table_max_bytes {
236 let Some((dest_hash, is_held)) = self.oldest_retained_announce() else {
237 break;
238 };
239 if is_held {
240 self.held_announces.remove(&dest_hash);
241 } else {
242 self.announce_table.remove(&dest_hash);
243 }
244 }
245 }
246
247 fn insert_announce_entry(
248 &mut self,
249 dest_hash: [u8; 16],
250 entry: AnnounceEntry,
251 now: f64,
252 ) -> bool {
253 self.cull_expired_announce_entries(now);
254 if Self::announce_entry_size_bytes(&entry) > self.config.announce_table_max_bytes {
255 return false;
256 }
257 self.announce_table.insert(dest_hash, entry);
258 self.enforce_announce_retention_cap(now);
259 self.announce_table.contains_key(&dest_hash)
260 }
261
262 fn insert_held_announce(
263 &mut self,
264 dest_hash: [u8; 16],
265 entry: AnnounceEntry,
266 now: f64,
267 ) -> bool {
268 self.cull_expired_announce_entries(now);
269 if Self::announce_entry_size_bytes(&entry) > self.config.announce_table_max_bytes {
270 return false;
271 }
272 self.held_announces.insert(dest_hash, entry);
273 self.enforce_announce_retention_cap(now);
274 self.held_announces.contains_key(&dest_hash)
275 }
276
277 pub fn register_interface(&mut self, info: InterfaceInfo) {
282 self.interfaces.insert(info.id, info);
283 }
284
285 pub fn deregister_interface(&mut self, id: InterfaceId) {
286 self.interfaces.remove(&id);
287 self.announce_queues.remove_interface(id);
288 self.ingress_control.remove_interface(&id);
289 }
290
291 pub fn register_destination(&mut self, dest_hash: [u8; 16], dest_type: u8) {
296 self.local_destinations.insert(dest_hash, dest_type);
297 }
298
299 pub fn deregister_destination(&mut self, dest_hash: &[u8; 16]) {
300 self.local_destinations.remove(dest_hash);
301 }
302
303 pub fn has_path(&self, dest_hash: &[u8; 16]) -> bool {
308 self.path_table
309 .get(dest_hash)
310 .is_some_and(|ps| !ps.is_empty())
311 }
312
313 pub fn hops_to(&self, dest_hash: &[u8; 16]) -> Option<u8> {
314 self.path_table
315 .get(dest_hash)
316 .and_then(|ps| ps.primary())
317 .map(|e| e.hops)
318 }
319
320 pub fn next_hop(&self, dest_hash: &[u8; 16]) -> Option<[u8; 16]> {
321 self.path_table
322 .get(dest_hash)
323 .and_then(|ps| ps.primary())
324 .map(|e| e.next_hop)
325 }
326
327 pub fn next_hop_interface(&self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
328 self.path_table
329 .get(dest_hash)
330 .and_then(|ps| ps.primary())
331 .map(|e| e.receiving_interface)
332 }
333
334 pub fn mark_path_unresponsive(
344 &mut self,
345 dest_hash: &[u8; 16],
346 receiving_interface: Option<InterfaceId>,
347 ) {
348 if let Some(iface_id) = receiving_interface {
349 if let Some(info) = self.interfaces.get(&iface_id) {
350 if info.mode == constants::MODE_BOUNDARY {
351 return;
352 }
353 }
354 }
355
356 if let Some(ps) = self.path_table.get_mut(dest_hash) {
358 if ps.len() > 1 {
359 ps.failover(false); self.path_states.remove(dest_hash);
362 return;
363 }
364 }
365
366 self.path_states
367 .insert(*dest_hash, constants::STATE_UNRESPONSIVE);
368 }
369
370 pub fn mark_path_responsive(&mut self, dest_hash: &[u8; 16]) {
371 self.path_states
372 .insert(*dest_hash, constants::STATE_RESPONSIVE);
373 }
374
375 pub fn path_is_unresponsive(&self, dest_hash: &[u8; 16]) -> bool {
376 self.path_states.get(dest_hash) == Some(&constants::STATE_UNRESPONSIVE)
377 }
378
379 pub fn expire_path(&mut self, dest_hash: &[u8; 16]) {
380 if let Some(ps) = self.path_table.get_mut(dest_hash) {
381 ps.expire_all();
382 }
383 }
384
385 pub fn register_link(&mut self, link_id: [u8; 16], entry: LinkEntry) {
390 self.link_table.insert(link_id, entry);
391 }
392
393 pub fn validate_link(&mut self, link_id: &[u8; 16]) {
394 if let Some(entry) = self.link_table.get_mut(link_id) {
395 entry.validated = true;
396 }
397 }
398
399 pub fn remove_link(&mut self, link_id: &[u8; 16]) {
400 self.link_table.remove(link_id);
401 }
402
403 pub fn blackhole_identity(
409 &mut self,
410 identity_hash: [u8; 16],
411 now: f64,
412 duration_hours: Option<f64>,
413 reason: Option<String>,
414 ) {
415 let expires = match duration_hours {
416 Some(h) if h > 0.0 => now + h * 3600.0,
417 _ => 0.0, };
419 self.blackholed_identities.insert(
420 identity_hash,
421 BlackholeEntry {
422 created: now,
423 expires,
424 reason,
425 },
426 );
427 }
428
429 pub fn unblackhole_identity(&mut self, identity_hash: &[u8; 16]) -> bool {
431 self.blackholed_identities.remove(identity_hash).is_some()
432 }
433
434 pub fn is_blackholed(&self, identity_hash: &[u8; 16], now: f64) -> bool {
436 if let Some(entry) = self.blackholed_identities.get(identity_hash) {
437 if entry.expires == 0.0 || entry.expires > now {
438 return true;
439 }
440 }
441 false
442 }
443
444 pub fn blackholed_entries(&self) -> impl Iterator<Item = (&[u8; 16], &BlackholeEntry)> {
446 self.blackholed_identities.iter()
447 }
448
449 fn cull_blackholed(&mut self, now: f64) {
451 self.blackholed_identities
452 .retain(|_, entry| entry.expires == 0.0 || entry.expires > now);
453 }
454
455 pub fn handle_tunnel(
463 &mut self,
464 tunnel_id: [u8; 32],
465 interface: InterfaceId,
466 now: f64,
467 ) -> Vec<TransportAction> {
468 let mut actions = Vec::new();
469
470 if let Some(info) = self.interfaces.get_mut(&interface) {
472 info.tunnel_id = Some(tunnel_id);
473 }
474
475 let restored_paths = self.tunnel_table.handle_tunnel(
476 tunnel_id,
477 interface,
478 now,
479 self.config.destination_timeout_secs,
480 );
481
482 for (dest_hash, tunnel_path) in &restored_paths {
484 let should_restore = match self.path_table.get(dest_hash).and_then(|ps| ps.primary()) {
485 Some(existing) => {
486 tunnel_path.hops <= existing.hops || existing.expires < now
488 }
489 None => true,
490 };
491
492 if should_restore {
493 let entry = PathEntry {
494 timestamp: tunnel_path.timestamp,
495 next_hop: tunnel_path.received_from,
496 hops: tunnel_path.hops,
497 expires: tunnel_path.expires,
498 random_blobs: tunnel_path.random_blobs.clone(),
499 receiving_interface: interface,
500 packet_hash: tunnel_path.packet_hash,
501 announce_raw: None,
502 };
503 self.upsert_path_destination(*dest_hash, entry, now);
504 }
505 }
506
507 actions.push(TransportAction::TunnelEstablished {
508 tunnel_id,
509 interface,
510 });
511
512 actions
513 }
514
515 pub fn synthesize_tunnel(
523 &self,
524 identity: &rns_crypto::identity::Identity,
525 interface_id: InterfaceId,
526 rng: &mut dyn Rng,
527 ) -> Vec<TransportAction> {
528 let mut actions = Vec::new();
529
530 let interface_hash = if let Some(info) = self.interfaces.get(&interface_id) {
532 hash::full_hash(info.name.as_bytes())
533 } else {
534 return actions;
535 };
536
537 match tunnel::build_tunnel_synthesize_data(identity, &interface_hash, rng) {
538 Ok((data, _tunnel_id)) => {
539 let dest_hash = crate::destination::destination_hash(
540 "rnstransport",
541 &["tunnel", "synthesize"],
542 None,
543 );
544 actions.push(TransportAction::TunnelSynthesize {
545 interface: interface_id,
546 data,
547 dest_hash,
548 });
549 }
550 Err(e) => {
551 let _ = e;
553 }
554 }
555
556 actions
557 }
558
559 pub fn void_tunnel_interface(&mut self, tunnel_id: &[u8; 32]) {
561 self.tunnel_table.void_tunnel_interface(tunnel_id);
562 }
563
564 pub fn tunnel_table(&self) -> &TunnelTable {
566 &self.tunnel_table
567 }
568
569 fn has_local_clients(&self) -> bool {
575 self.interfaces.values().any(|i| i.is_local_client)
576 }
577
578 fn packet_filter(&self, packet: &RawPacket) -> bool {
582 if packet.transport_id.is_some()
584 && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
585 {
586 if let Some(ref identity_hash) = self.config.identity_hash {
587 if packet.transport_id.as_ref() != Some(identity_hash) {
588 return false;
589 }
590 }
591 }
592
593 match packet.context {
595 constants::CONTEXT_KEEPALIVE
596 | constants::CONTEXT_RESOURCE_REQ
597 | constants::CONTEXT_RESOURCE_PRF
598 | constants::CONTEXT_RESOURCE
599 | constants::CONTEXT_CACHE_REQUEST
600 | constants::CONTEXT_CHANNEL => return true,
601 _ => {}
602 }
603
604 if packet.flags.destination_type == constants::DESTINATION_PLAIN
606 || packet.flags.destination_type == constants::DESTINATION_GROUP
607 {
608 if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
609 return packet.hops <= 1;
610 } else {
611 return false;
613 }
614 }
615
616 if !self.packet_hashlist.is_duplicate(&packet.packet_hash) {
618 return true;
619 }
620
621 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
623 && packet.flags.destination_type == constants::DESTINATION_SINGLE
624 {
625 return true;
626 }
627
628 false
629 }
630
631 pub fn handle_inbound(
639 &mut self,
640 raw: &[u8],
641 iface: InterfaceId,
642 now: f64,
643 rng: &mut dyn Rng,
644 ) -> Vec<TransportAction> {
645 self.handle_inbound_with_announce_queue(raw, iface, now, rng, None)
646 }
647
648 pub fn handle_inbound_with_announce_queue(
649 &mut self,
650 raw: &[u8],
651 iface: InterfaceId,
652 now: f64,
653 rng: &mut dyn Rng,
654 announce_queue: Option<&mut AnnounceVerifyQueue>,
655 ) -> Vec<TransportAction> {
656 let mut actions = Vec::new();
657
658 let mut packet = match RawPacket::unpack(raw) {
660 Ok(p) => p,
661 Err(_) => return actions, };
663
664 let original_raw = raw.to_vec();
666
667 packet.hops += 1;
669
670 let from_local_client = self
673 .interfaces
674 .get(&iface)
675 .map(|i| i.is_local_client)
676 .unwrap_or(false);
677 if from_local_client {
678 packet.hops = packet.hops.saturating_sub(1);
679 }
680
681 if !self.packet_filter(&packet) {
683 return actions;
684 }
685
686 let mut remember_hash = true;
688
689 if self.link_table.contains_key(&packet.destination_hash) {
690 remember_hash = false;
691 }
692 if packet.flags.packet_type == constants::PACKET_TYPE_PROOF
693 && packet.context == constants::CONTEXT_LRPROOF
694 {
695 remember_hash = false;
696 }
697
698 if remember_hash {
699 self.packet_hashlist.add(packet.packet_hash);
700 }
701
702 if packet.flags.destination_type == constants::DESTINATION_PLAIN
704 && packet.flags.transport_type == constants::TRANSPORT_BROADCAST
705 && self.has_local_clients()
706 {
707 if from_local_client {
708 actions.push(TransportAction::ForwardPlainBroadcast {
710 raw: packet.raw.clone(),
711 to_local: false,
712 exclude: Some(iface),
713 });
714 } else {
715 actions.push(TransportAction::ForwardPlainBroadcast {
717 raw: packet.raw.clone(),
718 to_local: true,
719 exclude: None,
720 });
721 }
722 }
723
724 if self.config.transport_enabled || self.config.identity_hash.is_some() {
726 if packet.transport_id.is_some()
727 && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
728 {
729 if let Some(ref identity_hash) = self.config.identity_hash {
730 if packet.transport_id.as_ref() == Some(identity_hash) {
731 if let Some(path_entry) = self
732 .path_table
733 .get(&packet.destination_hash)
734 .and_then(|ps| ps.primary())
735 {
736 let next_hop = path_entry.next_hop;
737 let remaining_hops = path_entry.hops;
738 let outbound_interface = path_entry.receiving_interface;
739
740 let new_raw = forward_transport_packet(
741 &packet,
742 next_hop,
743 remaining_hops,
744 outbound_interface,
745 );
746
747 if packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST {
749 let proof_timeout = now
750 + constants::LINK_ESTABLISHMENT_TIMEOUT_PER_HOP
751 * (remaining_hops.max(1) as f64);
752
753 let (link_id, link_entry) = create_link_entry(
754 &packet,
755 next_hop,
756 outbound_interface,
757 remaining_hops,
758 iface,
759 now,
760 proof_timeout,
761 );
762 self.link_table.insert(link_id, link_entry);
763 actions.push(TransportAction::LinkRequestReceived {
764 link_id,
765 destination_hash: packet.destination_hash,
766 receiving_interface: iface,
767 });
768 } else {
769 let (trunc_hash, reverse_entry) =
770 create_reverse_entry(&packet, outbound_interface, iface, now);
771 self.reverse_table.insert(trunc_hash, reverse_entry);
772 }
773
774 actions.push(TransportAction::SendOnInterface {
775 interface: outbound_interface,
776 raw: new_raw,
777 });
778
779 if let Some(entry) = self
781 .path_table
782 .get_mut(&packet.destination_hash)
783 .and_then(|ps| ps.primary_mut())
784 {
785 entry.timestamp = now;
786 }
787 }
788 }
789 }
790 }
791
792 if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
794 && packet.flags.packet_type != constants::PACKET_TYPE_LINKREQUEST
795 && packet.context != constants::CONTEXT_LRPROOF
796 {
797 if let Some(link_entry) = self.link_table.get(&packet.destination_hash).cloned() {
798 if let Some((outbound_iface, new_raw)) =
799 route_via_link_table(&packet, &link_entry, iface)
800 {
801 self.packet_hashlist.add(packet.packet_hash);
803
804 actions.push(TransportAction::SendOnInterface {
805 interface: outbound_iface,
806 raw: new_raw,
807 });
808
809 if let Some(entry) = self.link_table.get_mut(&packet.destination_hash) {
811 entry.timestamp = now;
812 }
813 }
814 }
815 }
816 }
817
818 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE {
820 if let Some(queue) = announce_queue {
821 self.try_enqueue_announce(
822 &packet,
823 &original_raw,
824 iface,
825 now,
826 rng,
827 queue,
828 &mut actions,
829 );
830 } else {
831 self.process_inbound_announce(
832 &packet,
833 &original_raw,
834 iface,
835 now,
836 rng,
837 &mut actions,
838 );
839 }
840 }
841
842 if packet.flags.packet_type == constants::PACKET_TYPE_PROOF {
844 self.process_inbound_proof(&packet, iface, now, &mut actions);
845 }
846
847 if (packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
849 || packet.flags.packet_type == constants::PACKET_TYPE_DATA)
850 && self
851 .local_destinations
852 .contains_key(&packet.destination_hash)
853 {
854 actions.push(TransportAction::DeliverLocal {
855 destination_hash: packet.destination_hash,
856 raw: packet.raw.clone(),
857 packet_hash: packet.packet_hash,
858 receiving_interface: iface,
859 });
860 }
861
862 actions
863 }
864
865 fn process_inbound_announce(
870 &mut self,
871 packet: &RawPacket,
872 original_raw: &[u8],
873 iface: InterfaceId,
874 now: f64,
875 rng: &mut dyn Rng,
876 actions: &mut Vec<TransportAction>,
877 ) {
878 if packet.flags.destination_type != constants::DESTINATION_SINGLE {
879 return;
880 }
881
882 let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
883
884 let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
886 Ok(a) => a,
887 Err(_) => return,
888 };
889
890 let sig_cache_key =
891 Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
892
893 let validated = if self.announce_sig_cache.contains(&sig_cache_key) {
894 announce.to_validated_unchecked()
895 } else {
896 match announce.validate(&packet.destination_hash) {
897 Ok(v) => {
898 self.announce_sig_cache.insert(sig_cache_key, now);
899 v
900 }
901 Err(_) => return,
902 }
903 };
904
905 let received_from = self.announce_received_from(packet, now);
906 let random_blob = match extract_random_blob(&packet.data) {
907 Some(b) => b,
908 None => return,
909 };
910 let announce_emitted = timebase_from_random_blob(&random_blob);
911
912 self.process_verified_announce(
913 packet,
914 original_raw,
915 iface,
916 now,
917 rng,
918 validated,
919 received_from,
920 random_blob,
921 announce_emitted,
922 actions,
923 );
924 }
925
926 fn announce_sig_cache_key(destination_hash: [u8; 16], signature: &[u8; 64]) -> [u8; 32] {
927 let mut material = [0u8; 80];
928 material[..16].copy_from_slice(&destination_hash);
929 material[16..].copy_from_slice(signature);
930 hash::full_hash(&material)
931 }
932
933 fn announce_received_from(&mut self, packet: &RawPacket, now: f64) -> [u8; 16] {
934 if let Some(transport_id) = packet.transport_id {
935 if self.config.transport_enabled {
936 if let Some(announce_entry) = self.announce_table.get_mut(&packet.destination_hash)
937 {
938 if packet.hops.checked_sub(1) == Some(announce_entry.hops) {
939 announce_entry.local_rebroadcasts += 1;
940 if announce_entry.retries > 0
941 && announce_entry.local_rebroadcasts
942 >= constants::LOCAL_REBROADCASTS_MAX
943 {
944 self.announce_table.remove(&packet.destination_hash);
945 }
946 }
947 if let Some(announce_entry) = self.announce_table.get(&packet.destination_hash)
948 {
949 if packet.hops.checked_sub(1) == Some(announce_entry.hops + 1)
950 && announce_entry.retries > 0
951 && now < announce_entry.retransmit_timeout
952 {
953 self.announce_table.remove(&packet.destination_hash);
954 }
955 }
956 }
957 }
958 transport_id
959 } else {
960 packet.destination_hash
961 }
962 }
963
964 fn should_hold_announce(
965 &mut self,
966 packet: &RawPacket,
967 original_raw: &[u8],
968 iface: InterfaceId,
969 now: f64,
970 ) -> bool {
971 if self.has_path(&packet.destination_hash) {
972 return false;
973 }
974 let Some(info) = self.interfaces.get(&iface) else {
975 return false;
976 };
977 if packet.context == constants::CONTEXT_PATH_RESPONSE
978 || !info.ingress_control
979 || !self
980 .ingress_control
981 .should_ingress_limit(iface, info.ia_freq, info.started, now)
982 {
983 return false;
984 }
985 self.ingress_control.hold_announce(
986 iface,
987 packet.destination_hash,
988 ingress_control::HeldAnnounce {
989 raw: original_raw.to_vec(),
990 hops: packet.hops,
991 receiving_interface: iface,
992 timestamp: now,
993 },
994 );
995 true
996 }
997
998 fn try_enqueue_announce(
999 &mut self,
1000 packet: &RawPacket,
1001 original_raw: &[u8],
1002 iface: InterfaceId,
1003 now: f64,
1004 rng: &mut dyn Rng,
1005 announce_queue: &mut AnnounceVerifyQueue,
1006 actions: &mut Vec<TransportAction>,
1007 ) {
1008 if packet.flags.destination_type != constants::DESTINATION_SINGLE {
1009 return;
1010 }
1011
1012 let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
1013 let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
1014 Ok(a) => a,
1015 Err(_) => return,
1016 };
1017
1018 let received_from = self.announce_received_from(packet, now);
1019
1020 if self
1021 .local_destinations
1022 .contains_key(&packet.destination_hash)
1023 {
1024 log::debug!(
1025 "Announce:skipping local destination {:02x}{:02x}{:02x}{:02x}..",
1026 packet.destination_hash[0],
1027 packet.destination_hash[1],
1028 packet.destination_hash[2],
1029 packet.destination_hash[3],
1030 );
1031 return;
1032 }
1033
1034 if self.should_hold_announce(packet, original_raw, iface, now) {
1035 return;
1036 }
1037
1038 let sig_cache_key =
1039 Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
1040 if self.announce_sig_cache.contains(&sig_cache_key) {
1041 let validated = announce.to_validated_unchecked();
1042 let random_blob = match extract_random_blob(&packet.data) {
1043 Some(b) => b,
1044 None => return,
1045 };
1046 let announce_emitted = timebase_from_random_blob(&random_blob);
1047 self.process_verified_announce(
1048 packet,
1049 original_raw,
1050 iface,
1051 now,
1052 rng,
1053 validated,
1054 received_from,
1055 random_blob,
1056 announce_emitted,
1057 actions,
1058 );
1059 return;
1060 }
1061
1062 if packet.context == constants::CONTEXT_PATH_RESPONSE {
1063 let Ok(validated) = announce.validate(&packet.destination_hash) else {
1064 return;
1065 };
1066 self.announce_sig_cache.insert(sig_cache_key, now);
1067 let random_blob = match extract_random_blob(&packet.data) {
1068 Some(b) => b,
1069 None => return,
1070 };
1071 let announce_emitted = timebase_from_random_blob(&random_blob);
1072 self.process_verified_announce(
1073 packet,
1074 original_raw,
1075 iface,
1076 now,
1077 rng,
1078 validated,
1079 received_from,
1080 random_blob,
1081 announce_emitted,
1082 actions,
1083 );
1084 return;
1085 }
1086
1087 let random_blob = match extract_random_blob(&packet.data) {
1088 Some(b) => b,
1089 None => return,
1090 };
1091 let announce_emitted = timebase_from_random_blob(&random_blob);
1092 let key = AnnounceVerifyKey {
1093 destination_hash: packet.destination_hash,
1094 random_blob,
1095 received_from,
1096 };
1097 let pending = PendingAnnounce {
1098 original_raw: original_raw.to_vec(),
1099 packet: packet.clone(),
1100 interface: iface,
1101 received_from,
1102 queued_at: now,
1103 best_hops: packet.hops,
1104 emission_ts: announce_emitted,
1105 random_blob,
1106 };
1107 let _ = announce_queue.enqueue(key, pending);
1108 }
1109
1110 pub fn complete_verified_announce(
1111 &mut self,
1112 pending: PendingAnnounce,
1113 validated: crate::announce::ValidatedAnnounce,
1114 sig_cache_key: [u8; 32],
1115 now: f64,
1116 rng: &mut dyn Rng,
1117 ) -> Vec<TransportAction> {
1118 self.announce_sig_cache.insert(sig_cache_key, now);
1119 let mut actions = Vec::new();
1120 self.process_verified_announce(
1121 &pending.packet,
1122 &pending.original_raw,
1123 pending.interface,
1124 now,
1125 rng,
1126 validated,
1127 pending.received_from,
1128 pending.random_blob,
1129 pending.emission_ts,
1130 &mut actions,
1131 );
1132 actions
1133 }
1134
1135 pub fn clear_failed_verified_announce(&mut self, _sig_cache_key: [u8; 32], _now: f64) {}
1136
1137 fn process_verified_announce(
1138 &mut self,
1139 packet: &RawPacket,
1140 original_raw: &[u8],
1141 iface: InterfaceId,
1142 now: f64,
1143 rng: &mut dyn Rng,
1144 validated: crate::announce::ValidatedAnnounce,
1145 received_from: [u8; 16],
1146 random_blob: [u8; 10],
1147 announce_emitted: u64,
1148 actions: &mut Vec<TransportAction>,
1149 ) {
1150 if self.is_blackholed(&validated.identity_hash, now) {
1151 return;
1152 }
1153 if packet.hops > constants::PATHFINDER_M {
1154 return;
1155 }
1156
1157 let existing_set = self.path_table.get(&packet.destination_hash);
1159 let is_unresponsive = self.path_is_unresponsive(&packet.destination_hash);
1160
1161 let mp_decision = decide_announce_multipath(
1162 existing_set,
1163 packet.hops,
1164 announce_emitted,
1165 &random_blob,
1166 &received_from,
1167 is_unresponsive,
1168 now,
1169 self.config.prefer_shorter_path,
1170 );
1171
1172 if mp_decision == MultiPathDecision::Reject {
1173 log::debug!(
1174 "Announce:path decision REJECT for dest={:02x}{:02x}{:02x}{:02x}..",
1175 packet.destination_hash[0],
1176 packet.destination_hash[1],
1177 packet.destination_hash[2],
1178 packet.destination_hash[3],
1179 );
1180 return;
1181 }
1182
1183 let rate_blocked = if packet.context != constants::CONTEXT_PATH_RESPONSE {
1185 if let Some(iface_info) = self.interfaces.get(&iface) {
1186 self.rate_limiter.check_and_update(
1187 &packet.destination_hash,
1188 now,
1189 iface_info.announce_rate_target,
1190 iface_info.announce_rate_grace,
1191 iface_info.announce_rate_penalty,
1192 )
1193 } else {
1194 false
1195 }
1196 } else {
1197 false
1198 };
1199
1200 let interface_mode = self
1202 .interfaces
1203 .get(&iface)
1204 .map(|i| i.mode)
1205 .unwrap_or(constants::MODE_FULL);
1206
1207 let expires = compute_path_expires(now, interface_mode);
1208
1209 let existing_blobs = self
1211 .path_table
1212 .get(&packet.destination_hash)
1213 .and_then(|ps| ps.find_by_next_hop(&received_from))
1214 .map(|e| e.random_blobs.clone())
1215 .unwrap_or_default();
1216
1217 let mut rng_bytes = [0u8; 8];
1219 rng.fill_bytes(&mut rng_bytes);
1220 let rng_value = (u64::from_le_bytes(rng_bytes) as f64) / (u64::MAX as f64);
1221
1222 let is_path_response = packet.context == constants::CONTEXT_PATH_RESPONSE;
1223
1224 let (path_entry, announce_entry) = announce_proc::process_validated_announce(
1225 packet.destination_hash,
1226 packet.hops,
1227 &packet.data,
1228 &packet.raw,
1229 packet.packet_hash,
1230 packet.flags.context_flag,
1231 received_from,
1232 iface,
1233 now,
1234 existing_blobs,
1235 random_blob,
1236 expires,
1237 rng_value,
1238 self.config.transport_enabled,
1239 is_path_response,
1240 rate_blocked,
1241 Some(original_raw.to_vec()),
1242 );
1243
1244 actions.push(TransportAction::CacheAnnounce {
1246 packet_hash: packet.packet_hash,
1247 raw: original_raw.to_vec(),
1248 });
1249
1250 self.upsert_path_destination(packet.destination_hash, path_entry, now);
1252
1253 if let Some(tunnel_id) = self.interfaces.get(&iface).and_then(|i| i.tunnel_id) {
1255 let blobs = self
1256 .path_table
1257 .get(&packet.destination_hash)
1258 .and_then(|ps| ps.find_by_next_hop(&received_from))
1259 .map(|e| e.random_blobs.clone())
1260 .unwrap_or_default();
1261 self.tunnel_table.store_tunnel_path(
1262 &tunnel_id,
1263 packet.destination_hash,
1264 tunnel::TunnelPath {
1265 timestamp: now,
1266 received_from,
1267 hops: packet.hops,
1268 expires,
1269 random_blobs: blobs,
1270 packet_hash: packet.packet_hash,
1271 },
1272 now,
1273 self.config.destination_timeout_secs,
1274 self.config.max_tunnel_destinations_total,
1275 );
1276 }
1277
1278 self.path_states.remove(&packet.destination_hash);
1280
1281 if let Some(ann) = announce_entry {
1283 self.insert_announce_entry(packet.destination_hash, ann, now);
1284 }
1285
1286 actions.push(TransportAction::AnnounceReceived {
1288 destination_hash: packet.destination_hash,
1289 identity_hash: validated.identity_hash,
1290 public_key: validated.public_key,
1291 name_hash: validated.name_hash,
1292 random_hash: validated.random_hash,
1293 app_data: validated.app_data,
1294 hops: packet.hops,
1295 receiving_interface: iface,
1296 });
1297
1298 actions.push(TransportAction::PathUpdated {
1299 destination_hash: packet.destination_hash,
1300 hops: packet.hops,
1301 next_hop: received_from,
1302 interface: iface,
1303 });
1304
1305 if self.has_local_clients() {
1307 actions.push(TransportAction::ForwardToLocalClients {
1308 raw: packet.raw.clone(),
1309 exclude: Some(iface),
1310 });
1311 }
1312
1313 if let Some(pr_entry) = self.discovery_path_requests_waiting(&packet.destination_hash) {
1315 let entry = AnnounceEntry {
1317 timestamp: now,
1318 retransmit_timeout: now,
1319 retries: constants::PATHFINDER_R,
1320 received_from,
1321 hops: packet.hops,
1322 packet_raw: packet.raw.clone(),
1323 packet_data: packet.data.clone(),
1324 destination_hash: packet.destination_hash,
1325 context_flag: packet.flags.context_flag,
1326 local_rebroadcasts: 0,
1327 block_rebroadcasts: true,
1328 attached_interface: Some(pr_entry),
1329 };
1330 self.insert_announce_entry(packet.destination_hash, entry, now);
1331 }
1332 }
1333
1334 pub fn announce_sig_cache_contains(&self, sig_cache_key: &[u8; 32]) -> bool {
1335 self.announce_sig_cache.contains(sig_cache_key)
1336 }
1337
1338 fn discovery_path_requests_waiting(&mut self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
1341 self.discovery_path_requests
1342 .remove(dest_hash)
1343 .map(|req| req.requesting_interface)
1344 }
1345
1346 fn process_inbound_proof(
1351 &mut self,
1352 packet: &RawPacket,
1353 iface: InterfaceId,
1354 _now: f64,
1355 actions: &mut Vec<TransportAction>,
1356 ) {
1357 if packet.context == constants::CONTEXT_LRPROOF {
1358 if (self.config.transport_enabled)
1360 && self.link_table.contains_key(&packet.destination_hash)
1361 {
1362 let link_entry = self.link_table.get(&packet.destination_hash).cloned();
1363 if let Some(entry) = link_entry {
1364 if packet.hops == entry.remaining_hops && iface == entry.next_hop_interface {
1365 let mut new_raw = Vec::new();
1368 new_raw.push(packet.raw[0]);
1369 new_raw.push(packet.hops);
1370 new_raw.extend_from_slice(&packet.raw[2..]);
1371
1372 if let Some(le) = self.link_table.get_mut(&packet.destination_hash) {
1374 le.validated = true;
1375 }
1376
1377 actions.push(TransportAction::LinkEstablished {
1378 link_id: packet.destination_hash,
1379 interface: entry.received_interface,
1380 });
1381
1382 actions.push(TransportAction::SendOnInterface {
1383 interface: entry.received_interface,
1384 raw: new_raw,
1385 });
1386 }
1387 }
1388 } else {
1389 actions.push(TransportAction::DeliverLocal {
1391 destination_hash: packet.destination_hash,
1392 raw: packet.raw.clone(),
1393 packet_hash: packet.packet_hash,
1394 receiving_interface: iface,
1395 });
1396 }
1397 } else {
1398 if self.config.transport_enabled {
1400 if let Some(reverse_entry) = self.reverse_table.remove(&packet.destination_hash) {
1401 if let Some(action) = route_proof_via_reverse(packet, &reverse_entry, iface) {
1402 actions.push(action);
1403 }
1404 }
1405 }
1406
1407 actions.push(TransportAction::DeliverLocal {
1409 destination_hash: packet.destination_hash,
1410 raw: packet.raw.clone(),
1411 packet_hash: packet.packet_hash,
1412 receiving_interface: iface,
1413 });
1414 }
1415 }
1416
1417 pub fn handle_outbound(
1423 &mut self,
1424 packet: &RawPacket,
1425 dest_type: u8,
1426 attached_interface: Option<InterfaceId>,
1427 now: f64,
1428 ) -> Vec<TransportAction> {
1429 let actions = route_outbound(
1430 &self.path_table,
1431 &self.interfaces,
1432 &self.local_destinations,
1433 packet,
1434 dest_type,
1435 attached_interface,
1436 now,
1437 );
1438
1439 self.packet_hashlist.add(packet.packet_hash);
1441
1442 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE && packet.hops > 0 {
1444 self.gate_announce_actions(actions, &packet.destination_hash, packet.hops, now)
1445 } else {
1446 actions
1447 }
1448 }
1449
1450 fn gate_announce_actions(
1452 &mut self,
1453 actions: Vec<TransportAction>,
1454 dest_hash: &[u8; 16],
1455 hops: u8,
1456 now: f64,
1457 ) -> Vec<TransportAction> {
1458 let mut result = Vec::new();
1459 for action in actions {
1460 match action {
1461 TransportAction::SendOnInterface { interface, raw } => {
1462 let (bitrate, announce_cap) =
1463 if let Some(info) = self.interfaces.get(&interface) {
1464 (info.bitrate, info.announce_cap)
1465 } else {
1466 (None, constants::ANNOUNCE_CAP)
1467 };
1468 if let Some(send_action) = self.announce_queues.gate_announce(
1469 interface,
1470 raw,
1471 *dest_hash,
1472 hops,
1473 now,
1474 now,
1475 bitrate,
1476 announce_cap,
1477 ) {
1478 result.push(send_action);
1479 }
1480 }
1482 other => result.push(other),
1483 }
1484 }
1485 result
1486 }
1487
1488 pub fn tick(&mut self, now: f64, rng: &mut dyn Rng) -> Vec<TransportAction> {
1494 let mut actions = Vec::new();
1495
1496 if now > self.announces_last_checked + constants::ANNOUNCES_CHECK_INTERVAL {
1498 self.cull_expired_announce_entries(now);
1499 self.enforce_announce_retention_cap(now);
1500 if let Some(ref identity_hash) = self.config.identity_hash {
1501 let ih = *identity_hash;
1502 let announce_actions = jobs::process_pending_announces(
1503 &mut self.announce_table,
1504 &mut self.held_announces,
1505 &ih,
1506 now,
1507 );
1508 let gated = self.gate_retransmit_actions(announce_actions, now);
1510 actions.extend(gated);
1511 }
1512 self.cull_expired_announce_entries(now);
1513 self.enforce_announce_retention_cap(now);
1514 self.announces_last_checked = now;
1515 }
1516
1517 let mut queue_actions = self.announce_queues.process_queues(now, &self.interfaces);
1519 actions.append(&mut queue_actions);
1520
1521 let ic_interfaces = self.ingress_control.interfaces_with_held();
1523 for iface_id in ic_interfaces {
1524 let (ia_freq, started, ic_enabled) = match self.interfaces.get(&iface_id) {
1525 Some(info) => (info.ia_freq, info.started, info.ingress_control),
1526 None => continue,
1527 };
1528 if !ic_enabled {
1529 continue;
1530 }
1531 if let Some(held) = self
1532 .ingress_control
1533 .process_held_announces(iface_id, ia_freq, started, now)
1534 {
1535 let released_actions =
1536 self.handle_inbound(&held.raw, held.receiving_interface, now, rng);
1537 actions.extend(released_actions);
1538 }
1539 }
1540
1541 if now > self.tables_last_culled + constants::TABLES_CULL_INTERVAL {
1543 jobs::cull_path_table(&mut self.path_table, &self.interfaces, now);
1544 jobs::cull_reverse_table(&mut self.reverse_table, &self.interfaces, now);
1545 let (_culled, link_closed_actions) =
1546 jobs::cull_link_table(&mut self.link_table, &self.interfaces, now);
1547 actions.extend(link_closed_actions);
1548 jobs::cull_path_states(&mut self.path_states, &self.path_table);
1549 self.cull_blackholed(now);
1550 self.discovery_path_requests
1552 .retain(|_, req| now - req.timestamp < constants::DISCOVERY_PATH_REQUEST_TIMEOUT);
1553 self.tunnel_table
1555 .void_missing_interfaces(|id| self.interfaces.contains_key(id));
1556 self.tunnel_table.cull(now);
1557 self.announce_sig_cache.cull(now);
1558 self.tables_last_culled = now;
1559 }
1560
1561 actions
1562 }
1563
1564 fn gate_retransmit_actions(
1569 &mut self,
1570 actions: Vec<TransportAction>,
1571 now: f64,
1572 ) -> Vec<TransportAction> {
1573 let mut result = Vec::new();
1574 for action in actions {
1575 match action {
1576 TransportAction::SendOnInterface { interface, raw } => {
1577 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1579 let (bitrate, announce_cap) =
1580 if let Some(info) = self.interfaces.get(&interface) {
1581 (info.bitrate, info.announce_cap)
1582 } else {
1583 (None, constants::ANNOUNCE_CAP)
1584 };
1585 if let Some(send_action) = self.announce_queues.gate_announce(
1586 interface,
1587 raw,
1588 dest_hash,
1589 hops,
1590 now,
1591 now,
1592 bitrate,
1593 announce_cap,
1594 ) {
1595 result.push(send_action);
1596 }
1597 }
1598 TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1599 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1600 let iface_ids: Vec<(InterfaceId, Option<u64>, f64)> = self
1603 .interfaces
1604 .iter()
1605 .filter(|(_, info)| info.out_capable)
1606 .filter(|(id, _)| {
1607 if let Some(ref ex) = exclude {
1608 **id != *ex
1609 } else {
1610 true
1611 }
1612 })
1613 .filter(|(_, info)| {
1614 should_transmit_announce(
1615 info,
1616 &dest_hash,
1617 hops,
1618 &self.local_destinations,
1619 &self.path_table,
1620 &self.interfaces,
1621 )
1622 })
1623 .map(|(id, info)| (*id, info.bitrate, info.announce_cap))
1624 .collect();
1625
1626 for (iface_id, bitrate, announce_cap) in iface_ids {
1627 if let Some(send_action) = self.announce_queues.gate_announce(
1628 iface_id,
1629 raw.clone(),
1630 dest_hash,
1631 hops,
1632 now,
1633 now,
1634 bitrate,
1635 announce_cap,
1636 ) {
1637 result.push(send_action);
1638 }
1639 }
1640 }
1641 other => result.push(other),
1642 }
1643 }
1644 result
1645 }
1646
1647 fn extract_announce_info(raw: &[u8]) -> ([u8; 16], u8) {
1649 if raw.len() < 18 {
1650 return ([0; 16], 0);
1651 }
1652 let header_type = (raw[0] >> 6) & 0x03;
1653 let hops = raw[1];
1654 if header_type == constants::HEADER_2 && raw.len() >= 34 {
1655 let mut dest = [0u8; 16];
1657 dest.copy_from_slice(&raw[18..34]);
1658 (dest, hops)
1659 } else {
1660 let mut dest = [0u8; 16];
1662 dest.copy_from_slice(&raw[2..18]);
1663 (dest, hops)
1664 }
1665 }
1666
1667 pub fn handle_path_request(
1680 &mut self,
1681 data: &[u8],
1682 interface_id: InterfaceId,
1683 now: f64,
1684 ) -> Vec<TransportAction> {
1685 let mut actions = Vec::new();
1686
1687 if data.len() < 16 {
1688 return actions;
1689 }
1690
1691 let mut destination_hash = [0u8; 16];
1692 destination_hash.copy_from_slice(&data[..16]);
1693
1694 let _requesting_transport_id = if data.len() > 32 {
1696 let mut id = [0u8; 16];
1697 id.copy_from_slice(&data[16..32]);
1698 Some(id)
1699 } else {
1700 None
1701 };
1702
1703 let tag_bytes = if data.len() > 32 {
1705 Some(&data[32..])
1706 } else if data.len() > 16 {
1707 Some(&data[16..])
1708 } else {
1709 None
1710 };
1711
1712 if let Some(tag) = tag_bytes {
1713 let tag_len = tag.len().min(16);
1714 let mut unique_tag = [0u8; 32];
1715 unique_tag[..16].copy_from_slice(&destination_hash);
1716 unique_tag[16..16 + tag_len].copy_from_slice(&tag[..tag_len]);
1717
1718 if !self.insert_discovery_pr_tag(unique_tag) {
1719 return actions; }
1721 } else {
1722 return actions; }
1724
1725 if self.local_destinations.contains_key(&destination_hash) {
1727 return actions;
1728 }
1729
1730 if self.config.transport_enabled && self.has_path(&destination_hash) {
1732 let path = self
1733 .path_table
1734 .get(&destination_hash)
1735 .unwrap()
1736 .primary()
1737 .unwrap()
1738 .clone();
1739
1740 if let Some(recv_info) = self.interfaces.get(&interface_id) {
1744 if recv_info.mode == constants::MODE_ROAMING
1745 && path.receiving_interface == interface_id
1746 {
1747 return actions;
1748 }
1749 }
1750
1751 if let Some(ref raw) = path.announce_raw {
1755 if let Some(existing) = self.announce_table.remove(&destination_hash) {
1757 self.insert_held_announce(destination_hash, existing, now);
1758 }
1759 let retransmit_timeout =
1760 if let Some(iface_info) = self.interfaces.get(&interface_id) {
1761 let base = now + constants::PATH_REQUEST_GRACE;
1762 if iface_info.mode == constants::MODE_ROAMING {
1763 base + constants::PATH_REQUEST_RG
1764 } else {
1765 base
1766 }
1767 } else {
1768 now + constants::PATH_REQUEST_GRACE
1769 };
1770
1771 let (packet_data, context_flag) = match RawPacket::unpack(raw) {
1772 Ok(parsed) => (parsed.data, parsed.flags.context_flag),
1773 Err(_) => {
1774 return actions;
1775 }
1776 };
1777
1778 let entry = AnnounceEntry {
1779 timestamp: now,
1780 retransmit_timeout,
1781 retries: constants::PATHFINDER_R,
1782 received_from: path.next_hop,
1783 hops: path.hops,
1784 packet_raw: raw.clone(),
1785 packet_data,
1786 destination_hash,
1787 context_flag,
1788 local_rebroadcasts: 0,
1789 block_rebroadcasts: true,
1790 attached_interface: Some(interface_id),
1791 };
1792
1793 self.insert_announce_entry(destination_hash, entry, now);
1794 }
1795 } else if self.config.transport_enabled {
1796 let should_discover = self
1798 .interfaces
1799 .get(&interface_id)
1800 .map(|info| constants::DISCOVER_PATHS_FOR.contains(&info.mode))
1801 .unwrap_or(false);
1802
1803 if should_discover {
1804 self.discovery_path_requests.insert(
1806 destination_hash,
1807 DiscoveryPathRequest {
1808 timestamp: now,
1809 requesting_interface: interface_id,
1810 },
1811 );
1812
1813 for (_, iface_info) in self.interfaces.iter() {
1815 if iface_info.id != interface_id && iface_info.out_capable {
1816 actions.push(TransportAction::SendOnInterface {
1817 interface: iface_info.id,
1818 raw: data.to_vec(),
1819 });
1820 }
1821 }
1822 }
1823 }
1824
1825 actions
1826 }
1827
1828 pub fn path_table_entries(&self) -> impl Iterator<Item = (&[u8; 16], &PathEntry)> {
1834 self.path_table
1835 .iter()
1836 .filter_map(|(k, ps)| ps.primary().map(|e| (k, e)))
1837 }
1838
1839 pub fn path_table_sets(&self) -> impl Iterator<Item = (&[u8; 16], &PathSet)> {
1841 self.path_table.iter()
1842 }
1843
1844 pub fn interface_count(&self) -> usize {
1846 self.interfaces.len()
1847 }
1848
1849 pub fn link_table_count(&self) -> usize {
1851 self.link_table.len()
1852 }
1853
1854 pub fn path_table_count(&self) -> usize {
1856 self.path_table.len()
1857 }
1858
1859 pub fn announce_table_count(&self) -> usize {
1861 self.announce_table.len()
1862 }
1863
1864 pub fn reverse_table_count(&self) -> usize {
1866 self.reverse_table.len()
1867 }
1868
1869 pub fn held_announces_count(&self) -> usize {
1871 self.held_announces.len()
1872 }
1873
1874 pub fn packet_hashlist_len(&self) -> usize {
1876 self.packet_hashlist.len()
1877 }
1878
1879 pub fn announce_sig_cache_len(&self) -> usize {
1881 self.announce_sig_cache.len()
1882 }
1883
1884 pub fn rate_limiter_count(&self) -> usize {
1886 self.rate_limiter.len()
1887 }
1888
1889 pub fn blackholed_count(&self) -> usize {
1891 self.blackholed_identities.len()
1892 }
1893
1894 pub fn tunnel_count(&self) -> usize {
1896 self.tunnel_table.len()
1897 }
1898
1899 pub fn discovery_pr_tags_count(&self) -> usize {
1901 self.discovery_pr_tags.len()
1902 }
1903
1904 pub fn discovery_path_requests_count(&self) -> usize {
1906 self.discovery_path_requests.len()
1907 }
1908
1909 pub fn announce_queue_count(&self) -> usize {
1911 self.announce_queues.queue_count()
1912 }
1913
1914 pub fn nonempty_announce_queue_count(&self) -> usize {
1916 self.announce_queues.nonempty_queue_count()
1917 }
1918
1919 pub fn queued_announce_count(&self) -> usize {
1921 self.announce_queues.total_queued_announces()
1922 }
1923
1924 pub fn queued_announce_bytes(&self) -> usize {
1926 self.announce_queues.total_queued_bytes()
1927 }
1928
1929 pub fn announce_queue_interface_cap_drop_count(&self) -> u64 {
1931 self.announce_queues.interface_cap_drop_count()
1932 }
1933
1934 pub fn local_destinations_count(&self) -> usize {
1936 self.local_destinations.len()
1937 }
1938
1939 pub fn rate_limiter(&self) -> &AnnounceRateLimiter {
1941 &self.rate_limiter
1942 }
1943
1944 pub fn interface_info(&self, id: &InterfaceId) -> Option<&InterfaceInfo> {
1946 self.interfaces.get(id)
1947 }
1948
1949 pub fn redirect_path(&mut self, dest_hash: &[u8; 16], interface: InterfaceId, now: f64) {
1952 if let Some(entry) = self
1953 .path_table
1954 .get_mut(dest_hash)
1955 .and_then(|ps| ps.primary_mut())
1956 {
1957 entry.receiving_interface = interface;
1958 entry.hops = 1;
1959 } else {
1960 self.upsert_path_destination(
1961 *dest_hash,
1962 PathEntry {
1963 timestamp: now,
1964 next_hop: [0u8; 16],
1965 hops: 1,
1966 expires: now + 3600.0,
1967 random_blobs: Vec::new(),
1968 receiving_interface: interface,
1969 packet_hash: [0u8; 32],
1970 announce_raw: None,
1971 },
1972 now,
1973 );
1974 }
1975 }
1976
1977 pub fn inject_path(&mut self, dest_hash: [u8; 16], entry: PathEntry) {
1979 self.upsert_path_destination(dest_hash, entry.clone(), entry.timestamp);
1980 }
1981
1982 pub fn drop_path(&mut self, dest_hash: &[u8; 16]) -> bool {
1984 self.path_table.remove(dest_hash).is_some()
1985 }
1986
1987 pub fn drop_all_via(&mut self, transport_hash: &[u8; 16]) -> usize {
1992 let mut removed = 0usize;
1993 for ps in self.path_table.values_mut() {
1994 let before = ps.len();
1995 ps.retain(|entry| &entry.next_hop != transport_hash);
1996 removed += before - ps.len();
1997 }
1998 self.path_table.retain(|_, ps| !ps.is_empty());
1999 removed
2000 }
2001
2002 pub fn drop_paths_for_interface(&mut self, interface: InterfaceId) -> usize {
2004 let mut removed = 0usize;
2005 let mut cleared_destinations = Vec::new();
2006 for (dest_hash, ps) in self.path_table.iter_mut() {
2007 let before = ps.len();
2008 ps.retain(|entry| entry.receiving_interface != interface);
2009 if ps.is_empty() {
2010 cleared_destinations.push(*dest_hash);
2011 }
2012 removed += before - ps.len();
2013 }
2014 self.path_table.retain(|_, ps| !ps.is_empty());
2015 for dest_hash in cleared_destinations {
2016 self.path_states.remove(&dest_hash);
2017 }
2018 removed
2019 }
2020
2021 pub fn drop_reverse_for_interface(&mut self, interface: InterfaceId) -> usize {
2023 let before = self.reverse_table.len();
2024 self.reverse_table.retain(|_, entry| {
2025 entry.receiving_interface != interface && entry.outbound_interface != interface
2026 });
2027 before - self.reverse_table.len()
2028 }
2029
2030 pub fn drop_links_for_interface(&mut self, interface: InterfaceId) -> usize {
2032 let before = self.link_table.len();
2033 self.link_table.retain(|_, entry| {
2034 entry.next_hop_interface != interface && entry.received_interface != interface
2035 });
2036 before - self.link_table.len()
2037 }
2038
2039 pub fn drop_announce_queues(&mut self) {
2041 self.announce_table.clear();
2042 self.held_announces.clear();
2043 self.announce_queues = AnnounceQueues::new(self.config.announce_queue_max_interfaces);
2044 self.ingress_control.clear();
2045 }
2046
2047 pub fn identity_hash(&self) -> Option<&[u8; 16]> {
2049 self.config.identity_hash.as_ref()
2050 }
2051
2052 pub fn transport_enabled(&self) -> bool {
2054 self.config.transport_enabled
2055 }
2056
2057 pub fn config(&self) -> &TransportConfig {
2059 &self.config
2060 }
2061
2062 pub fn set_packet_hashlist_max_entries(&mut self, max_entries: usize) {
2063 self.config.packet_hashlist_max_entries = max_entries;
2064 self.packet_hashlist = PacketHashlist::new(max_entries);
2065 }
2066
2067 pub fn get_path_table(&self, max_hops: Option<u8>) -> Vec<PathTableRow> {
2071 let mut result = Vec::new();
2072 for (dest_hash, ps) in self.path_table.iter() {
2073 if let Some(entry) = ps.primary() {
2074 if let Some(max) = max_hops {
2075 if entry.hops > max {
2076 continue;
2077 }
2078 }
2079 let iface_name = self
2080 .interfaces
2081 .get(&entry.receiving_interface)
2082 .map(|i| i.name.clone())
2083 .unwrap_or_else(|| {
2084 alloc::format!("Interface({})", entry.receiving_interface.0)
2085 });
2086 result.push((
2087 *dest_hash,
2088 entry.timestamp,
2089 entry.next_hop,
2090 entry.hops,
2091 entry.expires,
2092 iface_name,
2093 ));
2094 }
2095 }
2096 result
2097 }
2098
2099 pub fn get_rate_table(&self) -> Vec<RateTableRow> {
2102 self.rate_limiter
2103 .entries()
2104 .map(|(hash, entry)| {
2105 (
2106 *hash,
2107 entry.last,
2108 entry.rate_violations,
2109 entry.blocked_until,
2110 entry.timestamps.clone(),
2111 )
2112 })
2113 .collect()
2114 }
2115
2116 pub fn get_blackholed(&self) -> Vec<([u8; 16], f64, f64, Option<alloc::string::String>)> {
2119 self.blackholed_entries()
2120 .map(|(hash, entry)| (*hash, entry.created, entry.expires, entry.reason.clone()))
2121 .collect()
2122 }
2123
2124 pub fn active_destination_hashes(&self) -> alloc::collections::BTreeSet<[u8; 16]> {
2130 self.path_table.keys().copied().collect()
2131 }
2132
2133 pub fn path_destination_cap_evict_count(&self) -> usize {
2134 self.path_destination_cap_evict_count
2135 }
2136
2137 pub fn active_packet_hashes(&self) -> Vec<[u8; 32]> {
2139 self.path_table
2140 .values()
2141 .flat_map(|ps| ps.iter().map(|p| p.packet_hash))
2142 .collect()
2143 }
2144
2145 pub fn cull_rate_limiter(
2148 &mut self,
2149 active: &alloc::collections::BTreeSet<[u8; 16]>,
2150 now: f64,
2151 ttl_secs: f64,
2152 ) -> usize {
2153 self.rate_limiter.cull_stale(active, now, ttl_secs)
2154 }
2155
2156 pub fn update_interface_freq(&mut self, id: InterfaceId, ia_freq: f64) {
2162 if let Some(info) = self.interfaces.get_mut(&id) {
2163 info.ia_freq = ia_freq;
2164 }
2165 }
2166
2167 pub fn held_announce_count(&self, interface: &InterfaceId) -> usize {
2169 self.ingress_control.held_count(interface)
2170 }
2171
2172 #[cfg(test)]
2177 #[allow(dead_code)]
2178 pub(crate) fn path_table(&self) -> &BTreeMap<[u8; 16], PathSet> {
2179 &self.path_table
2180 }
2181
2182 #[cfg(test)]
2183 #[allow(dead_code)]
2184 pub(crate) fn announce_table(&self) -> &BTreeMap<[u8; 16], AnnounceEntry> {
2185 &self.announce_table
2186 }
2187
2188 #[cfg(test)]
2189 #[allow(dead_code)]
2190 pub(crate) fn held_announces(&self) -> &BTreeMap<[u8; 16], AnnounceEntry> {
2191 &self.held_announces
2192 }
2193
2194 #[cfg(test)]
2195 #[allow(dead_code)]
2196 pub(crate) fn announce_retained_bytes(&self) -> usize {
2197 self.announce_retained_bytes_total()
2198 }
2199
2200 #[cfg(test)]
2201 #[allow(dead_code)]
2202 pub(crate) fn reverse_table(&self) -> &BTreeMap<[u8; 16], tables::ReverseEntry> {
2203 &self.reverse_table
2204 }
2205
2206 #[cfg(test)]
2207 #[allow(dead_code)]
2208 pub(crate) fn link_table_ref(&self) -> &BTreeMap<[u8; 16], LinkEntry> {
2209 &self.link_table
2210 }
2211}
2212
2213#[cfg(test)]
2214mod tests {
2215 use super::*;
2216 use crate::packet::PacketFlags;
2217
2218 fn make_config(transport_enabled: bool) -> TransportConfig {
2219 TransportConfig {
2220 transport_enabled,
2221 identity_hash: if transport_enabled {
2222 Some([0x42; 16])
2223 } else {
2224 None
2225 },
2226 prefer_shorter_path: false,
2227 max_paths_per_destination: 1,
2228 packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
2229 max_discovery_pr_tags: constants::MAX_PR_TAGS,
2230 max_path_destinations: usize::MAX,
2231 max_tunnel_destinations_total: usize::MAX,
2232 destination_timeout_secs: constants::DESTINATION_TIMEOUT,
2233 announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
2234 announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
2235 announce_sig_cache_enabled: true,
2236 announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
2237 announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
2238 announce_queue_max_entries: 256,
2239 announce_queue_max_interfaces: 1024,
2240 }
2241 }
2242
2243 fn make_interface(id: u64, mode: u8) -> InterfaceInfo {
2244 InterfaceInfo {
2245 id: InterfaceId(id),
2246 name: String::from("test"),
2247 mode,
2248 out_capable: true,
2249 in_capable: true,
2250 bitrate: None,
2251 announce_rate_target: None,
2252 announce_rate_grace: 0,
2253 announce_rate_penalty: 0.0,
2254 announce_cap: constants::ANNOUNCE_CAP,
2255 is_local_client: false,
2256 wants_tunnel: false,
2257 tunnel_id: None,
2258 mtu: constants::MTU as u32,
2259 ingress_control: false,
2260 ia_freq: 0.0,
2261 started: 0.0,
2262 }
2263 }
2264
2265 fn make_announce_entry(dest_hash: [u8; 16], timestamp: f64, fill_len: usize) -> AnnounceEntry {
2266 AnnounceEntry {
2267 timestamp,
2268 retransmit_timeout: timestamp,
2269 retries: 0,
2270 received_from: [0xAA; 16],
2271 hops: 2,
2272 packet_raw: vec![0x01; fill_len],
2273 packet_data: vec![0x02; fill_len],
2274 destination_hash: dest_hash,
2275 context_flag: 0,
2276 local_rebroadcasts: 0,
2277 block_rebroadcasts: false,
2278 attached_interface: None,
2279 }
2280 }
2281
2282 fn make_path_entry(
2283 timestamp: f64,
2284 hops: u8,
2285 receiving_interface: InterfaceId,
2286 next_hop: [u8; 16],
2287 ) -> PathEntry {
2288 PathEntry {
2289 timestamp,
2290 next_hop,
2291 hops,
2292 expires: timestamp + 10_000.0,
2293 random_blobs: Vec::new(),
2294 receiving_interface,
2295 packet_hash: [0; 32],
2296 announce_raw: None,
2297 }
2298 }
2299
2300 fn make_unique_tag(dest_hash: [u8; 16], tag: &[u8]) -> [u8; 32] {
2301 let mut unique_tag = [0u8; 32];
2302 let tag_len = tag.len().min(16);
2303 unique_tag[..16].copy_from_slice(&dest_hash);
2304 unique_tag[16..16 + tag_len].copy_from_slice(&tag[..tag_len]);
2305 unique_tag
2306 }
2307
2308 #[test]
2309 fn test_empty_engine() {
2310 let engine = TransportEngine::new(make_config(false));
2311 assert!(!engine.has_path(&[0; 16]));
2312 assert!(engine.hops_to(&[0; 16]).is_none());
2313 assert!(engine.next_hop(&[0; 16]).is_none());
2314 }
2315
2316 #[test]
2317 fn test_register_deregister_interface() {
2318 let mut engine = TransportEngine::new(make_config(false));
2319 engine.register_interface(make_interface(1, constants::MODE_FULL));
2320 assert!(engine.interfaces.contains_key(&InterfaceId(1)));
2321
2322 engine.deregister_interface(InterfaceId(1));
2323 assert!(!engine.interfaces.contains_key(&InterfaceId(1)));
2324 }
2325
2326 #[test]
2327 fn test_deregister_interface_removes_announce_queue_state() {
2328 let mut engine = TransportEngine::new(make_config(false));
2329 engine.register_interface(make_interface(1, constants::MODE_FULL));
2330
2331 let _ = engine.announce_queues.gate_announce(
2332 InterfaceId(1),
2333 vec![0x01; 100],
2334 [0xAA; 16],
2335 2,
2336 0.0,
2337 0.0,
2338 Some(1000),
2339 constants::ANNOUNCE_CAP,
2340 );
2341 let _ = engine.announce_queues.gate_announce(
2342 InterfaceId(1),
2343 vec![0x02; 100],
2344 [0xBB; 16],
2345 3,
2346 0.0,
2347 0.0,
2348 Some(1000),
2349 constants::ANNOUNCE_CAP,
2350 );
2351 assert_eq!(engine.announce_queue_count(), 1);
2352
2353 engine.deregister_interface(InterfaceId(1));
2354 assert_eq!(engine.announce_queue_count(), 0);
2355 }
2356
2357 #[test]
2358 fn test_deregister_interface_preserves_other_announce_queues() {
2359 let mut engine = TransportEngine::new(make_config(false));
2360 engine.register_interface(make_interface(1, constants::MODE_FULL));
2361 engine.register_interface(make_interface(2, constants::MODE_FULL));
2362
2363 let _ = engine.announce_queues.gate_announce(
2364 InterfaceId(1),
2365 vec![0x01; 100],
2366 [0xAA; 16],
2367 2,
2368 0.0,
2369 0.0,
2370 Some(1000),
2371 constants::ANNOUNCE_CAP,
2372 );
2373 let _ = engine.announce_queues.gate_announce(
2374 InterfaceId(1),
2375 vec![0x02; 100],
2376 [0xAB; 16],
2377 3,
2378 0.0,
2379 0.0,
2380 Some(1000),
2381 constants::ANNOUNCE_CAP,
2382 );
2383 let _ = engine.announce_queues.gate_announce(
2384 InterfaceId(2),
2385 vec![0x03; 100],
2386 [0xBA; 16],
2387 2,
2388 0.0,
2389 0.0,
2390 Some(1000),
2391 constants::ANNOUNCE_CAP,
2392 );
2393 let _ = engine.announce_queues.gate_announce(
2394 InterfaceId(2),
2395 vec![0x04; 100],
2396 [0xBB; 16],
2397 3,
2398 0.0,
2399 0.0,
2400 Some(1000),
2401 constants::ANNOUNCE_CAP,
2402 );
2403
2404 engine.deregister_interface(InterfaceId(1));
2405 assert_eq!(engine.announce_queue_count(), 1);
2406 assert_eq!(engine.nonempty_announce_queue_count(), 1);
2407 }
2408
2409 #[test]
2410 fn test_register_deregister_destination() {
2411 let mut engine = TransportEngine::new(make_config(false));
2412 let dest = [0x11; 16];
2413 engine.register_destination(dest, constants::DESTINATION_SINGLE);
2414 assert!(engine.local_destinations.contains_key(&dest));
2415
2416 engine.deregister_destination(&dest);
2417 assert!(!engine.local_destinations.contains_key(&dest));
2418 }
2419
2420 #[test]
2421 fn test_path_state() {
2422 let mut engine = TransportEngine::new(make_config(false));
2423 let dest = [0x22; 16];
2424
2425 assert!(!engine.path_is_unresponsive(&dest));
2426
2427 engine.mark_path_unresponsive(&dest, None);
2428 assert!(engine.path_is_unresponsive(&dest));
2429
2430 engine.mark_path_responsive(&dest);
2431 assert!(!engine.path_is_unresponsive(&dest));
2432 }
2433
2434 #[test]
2435 fn test_boundary_exempts_unresponsive() {
2436 let mut engine = TransportEngine::new(make_config(false));
2437 engine.register_interface(make_interface(1, constants::MODE_BOUNDARY));
2438 let dest = [0xB1; 16];
2439
2440 engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
2442 assert!(!engine.path_is_unresponsive(&dest));
2443 }
2444
2445 #[test]
2446 fn test_non_boundary_marks_unresponsive() {
2447 let mut engine = TransportEngine::new(make_config(false));
2448 engine.register_interface(make_interface(1, constants::MODE_FULL));
2449 let dest = [0xB2; 16];
2450
2451 engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
2453 assert!(engine.path_is_unresponsive(&dest));
2454 }
2455
2456 #[test]
2457 fn test_expire_path() {
2458 let mut engine = TransportEngine::new(make_config(false));
2459 let dest = [0x33; 16];
2460
2461 engine.path_table.insert(
2462 dest,
2463 PathSet::from_single(
2464 PathEntry {
2465 timestamp: 1000.0,
2466 next_hop: [0; 16],
2467 hops: 2,
2468 expires: 9999.0,
2469 random_blobs: Vec::new(),
2470 receiving_interface: InterfaceId(1),
2471 packet_hash: [0; 32],
2472 announce_raw: None,
2473 },
2474 1,
2475 ),
2476 );
2477
2478 assert!(engine.has_path(&dest));
2479 engine.expire_path(&dest);
2480 assert!(engine.has_path(&dest));
2482 assert_eq!(engine.path_table[&dest].primary().unwrap().expires, 0.0);
2483 }
2484
2485 #[test]
2486 fn test_link_table_operations() {
2487 let mut engine = TransportEngine::new(make_config(false));
2488 let link_id = [0x44; 16];
2489
2490 engine.register_link(
2491 link_id,
2492 LinkEntry {
2493 timestamp: 100.0,
2494 next_hop_transport_id: [0; 16],
2495 next_hop_interface: InterfaceId(1),
2496 remaining_hops: 3,
2497 received_interface: InterfaceId(2),
2498 taken_hops: 2,
2499 destination_hash: [0xAA; 16],
2500 validated: false,
2501 proof_timeout: 200.0,
2502 },
2503 );
2504
2505 assert!(engine.link_table.contains_key(&link_id));
2506 assert!(!engine.link_table[&link_id].validated);
2507
2508 engine.validate_link(&link_id);
2509 assert!(engine.link_table[&link_id].validated);
2510
2511 engine.remove_link(&link_id);
2512 assert!(!engine.link_table.contains_key(&link_id));
2513 }
2514
2515 #[test]
2516 fn test_packet_filter_drops_plain_announce() {
2517 let engine = TransportEngine::new(make_config(false));
2518 let flags = PacketFlags {
2519 header_type: constants::HEADER_1,
2520 context_flag: constants::FLAG_UNSET,
2521 transport_type: constants::TRANSPORT_BROADCAST,
2522 destination_type: constants::DESTINATION_PLAIN,
2523 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2524 };
2525 let packet =
2526 RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2527 assert!(!engine.packet_filter(&packet));
2528 }
2529
2530 #[test]
2531 fn test_packet_filter_allows_keepalive() {
2532 let engine = TransportEngine::new(make_config(false));
2533 let flags = PacketFlags {
2534 header_type: constants::HEADER_1,
2535 context_flag: constants::FLAG_UNSET,
2536 transport_type: constants::TRANSPORT_BROADCAST,
2537 destination_type: constants::DESTINATION_SINGLE,
2538 packet_type: constants::PACKET_TYPE_DATA,
2539 };
2540 let packet = RawPacket::pack(
2541 flags,
2542 0,
2543 &[0; 16],
2544 None,
2545 constants::CONTEXT_KEEPALIVE,
2546 b"test",
2547 )
2548 .unwrap();
2549 assert!(engine.packet_filter(&packet));
2550 }
2551
2552 #[test]
2553 fn test_packet_filter_drops_high_hop_plain() {
2554 let engine = TransportEngine::new(make_config(false));
2555 let flags = PacketFlags {
2556 header_type: constants::HEADER_1,
2557 context_flag: constants::FLAG_UNSET,
2558 transport_type: constants::TRANSPORT_BROADCAST,
2559 destination_type: constants::DESTINATION_PLAIN,
2560 packet_type: constants::PACKET_TYPE_DATA,
2561 };
2562 let mut packet =
2563 RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2564 packet.hops = 2;
2565 assert!(!engine.packet_filter(&packet));
2566 }
2567
2568 #[test]
2569 fn test_packet_filter_allows_duplicate_single_announce() {
2570 let mut engine = TransportEngine::new(make_config(false));
2571 let flags = PacketFlags {
2572 header_type: constants::HEADER_1,
2573 context_flag: constants::FLAG_UNSET,
2574 transport_type: constants::TRANSPORT_BROADCAST,
2575 destination_type: constants::DESTINATION_SINGLE,
2576 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2577 };
2578 let packet = RawPacket::pack(
2579 flags,
2580 0,
2581 &[0; 16],
2582 None,
2583 constants::CONTEXT_NONE,
2584 &[0xAA; 64],
2585 )
2586 .unwrap();
2587
2588 engine.packet_hashlist.add(packet.packet_hash);
2590
2591 assert!(engine.packet_filter(&packet));
2593 }
2594
2595 #[test]
2596 fn test_packet_filter_fifo_eviction_allows_oldest_hash_again() {
2597 let mut engine = TransportEngine::new(make_config(false));
2598 engine.packet_hashlist = PacketHashlist::new(2);
2599
2600 let make_packet = |seed: u8| {
2601 let flags = PacketFlags {
2602 header_type: constants::HEADER_1,
2603 context_flag: constants::FLAG_UNSET,
2604 transport_type: constants::TRANSPORT_BROADCAST,
2605 destination_type: constants::DESTINATION_SINGLE,
2606 packet_type: constants::PACKET_TYPE_DATA,
2607 };
2608 RawPacket::pack(
2609 flags,
2610 0,
2611 &[seed; 16],
2612 None,
2613 constants::CONTEXT_NONE,
2614 &[seed; 4],
2615 )
2616 .unwrap()
2617 };
2618
2619 let packet1 = make_packet(1);
2620 let packet2 = make_packet(2);
2621 let packet3 = make_packet(3);
2622
2623 engine.packet_hashlist.add(packet1.packet_hash);
2624 engine.packet_hashlist.add(packet2.packet_hash);
2625 assert!(!engine.packet_filter(&packet1));
2626
2627 engine.packet_hashlist.add(packet3.packet_hash);
2628
2629 assert!(engine.packet_filter(&packet1));
2630 assert!(!engine.packet_filter(&packet2));
2631 assert!(!engine.packet_filter(&packet3));
2632 }
2633
2634 #[test]
2635 fn test_packet_filter_duplicate_does_not_refresh_recency() {
2636 let mut engine = TransportEngine::new(make_config(false));
2637 engine.packet_hashlist = PacketHashlist::new(2);
2638
2639 let make_packet = |seed: u8| {
2640 let flags = PacketFlags {
2641 header_type: constants::HEADER_1,
2642 context_flag: constants::FLAG_UNSET,
2643 transport_type: constants::TRANSPORT_BROADCAST,
2644 destination_type: constants::DESTINATION_SINGLE,
2645 packet_type: constants::PACKET_TYPE_DATA,
2646 };
2647 RawPacket::pack(
2648 flags,
2649 0,
2650 &[seed; 16],
2651 None,
2652 constants::CONTEXT_NONE,
2653 &[seed; 4],
2654 )
2655 .unwrap()
2656 };
2657
2658 let packet1 = make_packet(1);
2659 let packet2 = make_packet(2);
2660 let packet3 = make_packet(3);
2661
2662 engine.packet_hashlist.add(packet1.packet_hash);
2663 engine.packet_hashlist.add(packet2.packet_hash);
2664 engine.packet_hashlist.add(packet2.packet_hash);
2665 engine.packet_hashlist.add(packet3.packet_hash);
2666
2667 assert!(engine.packet_filter(&packet1));
2668 assert!(!engine.packet_filter(&packet2));
2669 assert!(!engine.packet_filter(&packet3));
2670 }
2671
2672 #[test]
2673 fn test_tick_retransmits_announce() {
2674 let mut engine = TransportEngine::new(make_config(true));
2675 engine.register_interface(make_interface(1, constants::MODE_FULL));
2676
2677 let dest = [0x55; 16];
2678 engine.insert_announce_entry(
2679 dest,
2680 AnnounceEntry {
2681 timestamp: 190.0,
2682 retransmit_timeout: 100.0, retries: 0,
2684 received_from: [0xAA; 16],
2685 hops: 2,
2686 packet_raw: vec![0x01, 0x02],
2687 packet_data: vec![0xCC; 10],
2688 destination_hash: dest,
2689 context_flag: 0,
2690 local_rebroadcasts: 0,
2691 block_rebroadcasts: false,
2692 attached_interface: None,
2693 },
2694 190.0,
2695 );
2696
2697 let mut rng = rns_crypto::FixedRng::new(&[0x42; 32]);
2698 let actions = engine.tick(200.0, &mut rng);
2699
2700 assert!(!actions.is_empty());
2703 assert!(matches!(
2704 &actions[0],
2705 TransportAction::SendOnInterface { .. }
2706 ));
2707
2708 assert_eq!(engine.announce_table[&dest].retries, 1);
2710 }
2711
2712 #[test]
2713 fn test_tick_culls_expired_announce_entries() {
2714 let mut config = make_config(true);
2715 config.announce_table_ttl_secs = 10.0;
2716 let mut engine = TransportEngine::new(config);
2717
2718 let dest1 = [0x61; 16];
2719 let dest2 = [0x62; 16];
2720 assert!(engine.insert_announce_entry(dest1, make_announce_entry(dest1, 100.0, 8), 100.0));
2721 assert!(engine.insert_held_announce(dest2, make_announce_entry(dest2, 100.0, 8), 100.0));
2722
2723 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2724 let _ = engine.tick(111.0, &mut rng);
2725
2726 assert!(!engine.announce_table().contains_key(&dest1));
2727 assert!(!engine.held_announces().contains_key(&dest2));
2728 }
2729
2730 #[test]
2731 fn test_announce_retention_cap_evicts_oldest_and_prefers_held_on_tie() {
2732 let sample_entry = make_announce_entry([0x70; 16], 100.0, 32);
2733 let mut config = make_config(true);
2734 config.announce_table_max_bytes = TransportEngine::announce_entry_size_bytes(&sample_entry)
2735 * 2
2736 + TransportEngine::announce_entry_size_bytes(&sample_entry) / 2;
2737 let max_bytes = config.announce_table_max_bytes;
2738 let mut engine = TransportEngine::new(config);
2739
2740 let held_dest = [0x71; 16];
2741 let active_dest = [0x72; 16];
2742 let newest_dest = [0x73; 16];
2743
2744 assert!(engine.insert_held_announce(
2745 held_dest,
2746 make_announce_entry(held_dest, 100.0, 32),
2747 100.0,
2748 ));
2749 assert!(engine.insert_announce_entry(
2750 active_dest,
2751 make_announce_entry(active_dest, 100.0, 32),
2752 100.0,
2753 ));
2754 assert!(engine.insert_announce_entry(
2755 newest_dest,
2756 make_announce_entry(newest_dest, 101.0, 32),
2757 101.0,
2758 ));
2759
2760 assert!(!engine.held_announces().contains_key(&held_dest));
2761 assert!(engine.announce_table().contains_key(&active_dest));
2762 assert!(engine.announce_table().contains_key(&newest_dest));
2763 assert!(engine.announce_retained_bytes() <= max_bytes);
2764 }
2765
2766 #[test]
2767 fn test_oversized_announce_entry_is_not_retained() {
2768 let mut config = make_config(true);
2769 config.announce_table_max_bytes = 200;
2770 let mut engine = TransportEngine::new(config);
2771 let dest = [0x81; 16];
2772
2773 assert!(!engine.insert_announce_entry(dest, make_announce_entry(dest, 100.0, 256), 100.0));
2774 assert!(!engine.announce_table().contains_key(&dest));
2775 assert_eq!(engine.announce_retained_bytes(), 0);
2776 }
2777
2778 #[test]
2779 fn test_blackhole_identity() {
2780 let mut engine = TransportEngine::new(make_config(false));
2781 let hash = [0xAA; 16];
2782 let now = 1000.0;
2783
2784 assert!(!engine.is_blackholed(&hash, now));
2785
2786 engine.blackhole_identity(hash, now, None, Some(String::from("test")));
2787 assert!(engine.is_blackholed(&hash, now));
2788 assert!(engine.is_blackholed(&hash, now + 999999.0)); assert!(engine.unblackhole_identity(&hash));
2791 assert!(!engine.is_blackholed(&hash, now));
2792 assert!(!engine.unblackhole_identity(&hash)); }
2794
2795 #[test]
2796 fn test_blackhole_with_duration() {
2797 let mut engine = TransportEngine::new(make_config(false));
2798 let hash = [0xBB; 16];
2799 let now = 1000.0;
2800
2801 engine.blackhole_identity(hash, now, Some(1.0), None); assert!(engine.is_blackholed(&hash, now));
2803 assert!(engine.is_blackholed(&hash, now + 3599.0)); assert!(!engine.is_blackholed(&hash, now + 3601.0)); }
2806
2807 #[test]
2808 fn test_cull_blackholed() {
2809 let mut engine = TransportEngine::new(make_config(false));
2810 let hash1 = [0xCC; 16];
2811 let hash2 = [0xDD; 16];
2812 let now = 1000.0;
2813
2814 engine.blackhole_identity(hash1, now, Some(1.0), None); engine.blackhole_identity(hash2, now, None, None); engine.cull_blackholed(now + 4000.0); assert!(!engine.blackholed_identities.contains_key(&hash1));
2820 assert!(engine.blackholed_identities.contains_key(&hash2));
2821 }
2822
2823 #[test]
2824 fn test_blackhole_blocks_announce() {
2825 use crate::announce::AnnounceData;
2826 use crate::destination::{destination_hash, name_hash};
2827
2828 let mut engine = TransportEngine::new(make_config(false));
2829 engine.register_interface(make_interface(1, constants::MODE_FULL));
2830
2831 let identity =
2832 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x55; 32]));
2833 let dest_hash = destination_hash("test", &["app"], Some(identity.hash()));
2834 let name_h = name_hash("test", &["app"]);
2835 let random_hash = [0x42u8; 10];
2836
2837 let (announce_data, _) =
2838 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2839
2840 let flags = PacketFlags {
2841 header_type: constants::HEADER_1,
2842 context_flag: constants::FLAG_UNSET,
2843 transport_type: constants::TRANSPORT_BROADCAST,
2844 destination_type: constants::DESTINATION_SINGLE,
2845 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2846 };
2847 let packet = RawPacket::pack(
2848 flags,
2849 0,
2850 &dest_hash,
2851 None,
2852 constants::CONTEXT_NONE,
2853 &announce_data,
2854 )
2855 .unwrap();
2856
2857 let now = 1000.0;
2859 engine.blackhole_identity(*identity.hash(), now, None, None);
2860
2861 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2862 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), now, &mut rng);
2863
2864 assert!(actions
2866 .iter()
2867 .all(|a| !matches!(a, TransportAction::AnnounceReceived { .. })));
2868 assert!(actions
2869 .iter()
2870 .all(|a| !matches!(a, TransportAction::PathUpdated { .. })));
2871 }
2872
2873 #[test]
2874 fn test_async_announce_retransmit_cleanup_happens_before_queueing() {
2875 use crate::announce::AnnounceData;
2876 use crate::destination::{destination_hash, name_hash};
2877 use crate::transport::announce_verify_queue::AnnounceVerifyQueue;
2878
2879 let mut engine = TransportEngine::new(make_config(true));
2880 engine.register_interface(make_interface(1, constants::MODE_FULL));
2881
2882 let identity =
2883 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x31; 32]));
2884 let dest_hash = destination_hash("async", &["announce"], Some(identity.hash()));
2885 let name_h = name_hash("async", &["announce"]);
2886 let random_hash = [0x44u8; 10];
2887 let (announce_data, _) =
2888 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2889
2890 let packet = RawPacket::pack(
2891 PacketFlags {
2892 header_type: constants::HEADER_2,
2893 context_flag: constants::FLAG_UNSET,
2894 transport_type: constants::TRANSPORT_TRANSPORT,
2895 destination_type: constants::DESTINATION_SINGLE,
2896 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2897 },
2898 3,
2899 &dest_hash,
2900 Some(&[0xBB; 16]),
2901 constants::CONTEXT_NONE,
2902 &announce_data,
2903 )
2904 .unwrap();
2905
2906 engine.announce_table.insert(
2907 dest_hash,
2908 AnnounceEntry {
2909 timestamp: 1000.0,
2910 retransmit_timeout: 2000.0,
2911 retries: constants::PATHFINDER_R,
2912 received_from: [0xBB; 16],
2913 hops: 2,
2914 packet_raw: packet.raw.clone(),
2915 packet_data: packet.data.clone(),
2916 destination_hash: dest_hash,
2917 context_flag: constants::FLAG_UNSET,
2918 local_rebroadcasts: 0,
2919 block_rebroadcasts: false,
2920 attached_interface: None,
2921 },
2922 );
2923
2924 let mut queue = AnnounceVerifyQueue::new(8);
2925 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2926 let actions = engine.handle_inbound_with_announce_queue(
2927 &packet.raw,
2928 InterfaceId(1),
2929 1000.0,
2930 &mut rng,
2931 Some(&mut queue),
2932 );
2933
2934 assert!(actions.is_empty());
2935 assert_eq!(queue.len(), 1);
2936 assert!(
2937 !engine.announce_table.contains_key(&dest_hash),
2938 "retransmit completion should clear announce_table before queueing"
2939 );
2940 }
2941
2942 #[test]
2943 fn test_async_announce_completion_inserts_sig_cache_and_prevents_requeue() {
2944 use crate::announce::AnnounceData;
2945 use crate::destination::{destination_hash, name_hash};
2946 use crate::transport::announce_verify_queue::AnnounceVerifyQueue;
2947
2948 let mut engine = TransportEngine::new(make_config(false));
2949 engine.register_interface(make_interface(1, constants::MODE_FULL));
2950
2951 let identity =
2952 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x52; 32]));
2953 let dest_hash = destination_hash("async", &["cache"], Some(identity.hash()));
2954 let name_h = name_hash("async", &["cache"]);
2955 let random_hash = [0x55u8; 10];
2956 let (announce_data, _) =
2957 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2958
2959 let packet = RawPacket::pack(
2960 PacketFlags {
2961 header_type: constants::HEADER_1,
2962 context_flag: constants::FLAG_UNSET,
2963 transport_type: constants::TRANSPORT_BROADCAST,
2964 destination_type: constants::DESTINATION_SINGLE,
2965 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2966 },
2967 0,
2968 &dest_hash,
2969 None,
2970 constants::CONTEXT_NONE,
2971 &announce_data,
2972 )
2973 .unwrap();
2974
2975 let mut queue = AnnounceVerifyQueue::new(8);
2976 let mut rng = rns_crypto::FixedRng::new(&[0x77; 32]);
2977 let actions = engine.handle_inbound_with_announce_queue(
2978 &packet.raw,
2979 InterfaceId(1),
2980 1000.0,
2981 &mut rng,
2982 Some(&mut queue),
2983 );
2984 assert!(actions.is_empty());
2985 assert_eq!(queue.len(), 1);
2986
2987 let mut batch = queue.take_pending(1000.0);
2988 assert_eq!(batch.len(), 1);
2989 let (key, pending) = batch.pop().unwrap();
2990
2991 let announce = AnnounceData::unpack(&pending.packet.data, false).unwrap();
2992 let validated = announce.validate(&pending.packet.destination_hash).unwrap();
2993 let mut material = [0u8; 80];
2994 material[..16].copy_from_slice(&pending.packet.destination_hash);
2995 material[16..].copy_from_slice(&announce.signature);
2996 let sig_cache_key = hash::full_hash(&material);
2997
2998 let pending = queue.complete_success(&key).unwrap();
2999 let actions =
3000 engine.complete_verified_announce(pending, validated, sig_cache_key, 1000.0, &mut rng);
3001 assert!(actions
3002 .iter()
3003 .any(|action| matches!(action, TransportAction::AnnounceReceived { .. })));
3004 assert!(engine.announce_sig_cache_contains(&sig_cache_key));
3005
3006 let actions = engine.handle_inbound_with_announce_queue(
3007 &packet.raw,
3008 InterfaceId(1),
3009 1001.0,
3010 &mut rng,
3011 Some(&mut queue),
3012 );
3013 assert!(actions.is_empty());
3014 assert_eq!(queue.len(), 0);
3015 }
3016
3017 #[test]
3018 fn test_tick_culls_expired_path() {
3019 let mut engine = TransportEngine::new(make_config(false));
3020 engine.register_interface(make_interface(1, constants::MODE_FULL));
3021
3022 let dest = [0x66; 16];
3023 engine.path_table.insert(
3024 dest,
3025 PathSet::from_single(
3026 PathEntry {
3027 timestamp: 100.0,
3028 next_hop: [0; 16],
3029 hops: 2,
3030 expires: 200.0,
3031 random_blobs: Vec::new(),
3032 receiving_interface: InterfaceId(1),
3033 packet_hash: [0; 32],
3034 announce_raw: None,
3035 },
3036 1,
3037 ),
3038 );
3039
3040 assert!(engine.has_path(&dest));
3041
3042 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3043 engine.tick(300.0, &mut rng);
3045
3046 assert!(!engine.has_path(&dest));
3047 }
3048
3049 fn make_local_client_interface(id: u64) -> InterfaceInfo {
3054 InterfaceInfo {
3055 id: InterfaceId(id),
3056 name: String::from("local_client"),
3057 mode: constants::MODE_FULL,
3058 out_capable: true,
3059 in_capable: true,
3060 bitrate: None,
3061 announce_rate_target: None,
3062 announce_rate_grace: 0,
3063 announce_rate_penalty: 0.0,
3064 announce_cap: constants::ANNOUNCE_CAP,
3065 is_local_client: true,
3066 wants_tunnel: false,
3067 tunnel_id: None,
3068 mtu: constants::MTU as u32,
3069 ingress_control: false,
3070 ia_freq: 0.0,
3071 started: 0.0,
3072 }
3073 }
3074
3075 #[test]
3076 fn test_has_local_clients() {
3077 let mut engine = TransportEngine::new(make_config(false));
3078 assert!(!engine.has_local_clients());
3079
3080 engine.register_interface(make_interface(1, constants::MODE_FULL));
3081 assert!(!engine.has_local_clients());
3082
3083 engine.register_interface(make_local_client_interface(2));
3084 assert!(engine.has_local_clients());
3085
3086 engine.deregister_interface(InterfaceId(2));
3087 assert!(!engine.has_local_clients());
3088 }
3089
3090 #[test]
3091 fn test_local_client_hop_decrement() {
3092 let mut engine = TransportEngine::new(make_config(false));
3095 engine.register_interface(make_local_client_interface(1));
3096 engine.register_interface(make_interface(2, constants::MODE_FULL));
3097
3098 let dest = [0xAA; 16];
3100 engine.register_destination(dest, constants::DESTINATION_PLAIN);
3101
3102 let flags = PacketFlags {
3103 header_type: constants::HEADER_1,
3104 context_flag: constants::FLAG_UNSET,
3105 transport_type: constants::TRANSPORT_BROADCAST,
3106 destination_type: constants::DESTINATION_PLAIN,
3107 packet_type: constants::PACKET_TYPE_DATA,
3108 };
3109 let packet =
3111 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
3112
3113 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3114 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3115
3116 let deliver = actions
3119 .iter()
3120 .find(|a| matches!(a, TransportAction::DeliverLocal { .. }));
3121 assert!(deliver.is_some(), "Should deliver locally");
3122 }
3123
3124 #[test]
3125 fn test_plain_broadcast_from_local_client() {
3126 let mut engine = TransportEngine::new(make_config(false));
3128 engine.register_interface(make_local_client_interface(1));
3129 engine.register_interface(make_interface(2, constants::MODE_FULL));
3130
3131 let dest = [0xBB; 16];
3132 let flags = PacketFlags {
3133 header_type: constants::HEADER_1,
3134 context_flag: constants::FLAG_UNSET,
3135 transport_type: constants::TRANSPORT_BROADCAST,
3136 destination_type: constants::DESTINATION_PLAIN,
3137 packet_type: constants::PACKET_TYPE_DATA,
3138 };
3139 let packet =
3140 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3141
3142 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3143 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3144
3145 let forward = actions.iter().find(|a| {
3147 matches!(
3148 a,
3149 TransportAction::ForwardPlainBroadcast {
3150 to_local: false,
3151 ..
3152 }
3153 )
3154 });
3155 assert!(forward.is_some(), "Should forward to external interfaces");
3156 }
3157
3158 #[test]
3159 fn test_plain_broadcast_from_external() {
3160 let mut engine = TransportEngine::new(make_config(false));
3162 engine.register_interface(make_local_client_interface(1));
3163 engine.register_interface(make_interface(2, constants::MODE_FULL));
3164
3165 let dest = [0xCC; 16];
3166 let flags = PacketFlags {
3167 header_type: constants::HEADER_1,
3168 context_flag: constants::FLAG_UNSET,
3169 transport_type: constants::TRANSPORT_BROADCAST,
3170 destination_type: constants::DESTINATION_PLAIN,
3171 packet_type: constants::PACKET_TYPE_DATA,
3172 };
3173 let packet =
3174 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3175
3176 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3177 let actions = engine.handle_inbound(&packet.raw, InterfaceId(2), 1000.0, &mut rng);
3178
3179 let forward = actions.iter().find(|a| {
3181 matches!(
3182 a,
3183 TransportAction::ForwardPlainBroadcast { to_local: true, .. }
3184 )
3185 });
3186 assert!(forward.is_some(), "Should forward to local clients");
3187 }
3188
3189 #[test]
3190 fn test_no_plain_broadcast_bridging_without_local_clients() {
3191 let mut engine = TransportEngine::new(make_config(false));
3193 engine.register_interface(make_interface(1, constants::MODE_FULL));
3194 engine.register_interface(make_interface(2, constants::MODE_FULL));
3195
3196 let dest = [0xDD; 16];
3197 let flags = PacketFlags {
3198 header_type: constants::HEADER_1,
3199 context_flag: constants::FLAG_UNSET,
3200 transport_type: constants::TRANSPORT_BROADCAST,
3201 destination_type: constants::DESTINATION_PLAIN,
3202 packet_type: constants::PACKET_TYPE_DATA,
3203 };
3204 let packet =
3205 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3206
3207 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3208 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3209
3210 let has_forward = actions
3212 .iter()
3213 .any(|a| matches!(a, TransportAction::ForwardPlainBroadcast { .. }));
3214 assert!(!has_forward, "No bridging without local clients");
3215 }
3216
3217 #[test]
3218 fn test_announce_forwarded_to_local_clients() {
3219 use crate::announce::AnnounceData;
3220 use crate::destination::{destination_hash, name_hash};
3221
3222 let mut engine = TransportEngine::new(make_config(false));
3223 engine.register_interface(make_interface(1, constants::MODE_FULL));
3224 engine.register_interface(make_local_client_interface(2));
3225
3226 let identity =
3227 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x77; 32]));
3228 let dest_hash = destination_hash("test", &["fwd"], Some(identity.hash()));
3229 let name_h = name_hash("test", &["fwd"]);
3230 let random_hash = [0x42u8; 10];
3231
3232 let (announce_data, _) =
3233 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3234
3235 let flags = PacketFlags {
3236 header_type: constants::HEADER_1,
3237 context_flag: constants::FLAG_UNSET,
3238 transport_type: constants::TRANSPORT_BROADCAST,
3239 destination_type: constants::DESTINATION_SINGLE,
3240 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3241 };
3242 let packet = RawPacket::pack(
3243 flags,
3244 0,
3245 &dest_hash,
3246 None,
3247 constants::CONTEXT_NONE,
3248 &announce_data,
3249 )
3250 .unwrap();
3251
3252 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3253 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3254
3255 let forward = actions
3257 .iter()
3258 .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3259 assert!(
3260 forward.is_some(),
3261 "Should forward announce to local clients"
3262 );
3263
3264 match forward.unwrap() {
3266 TransportAction::ForwardToLocalClients { exclude, .. } => {
3267 assert_eq!(*exclude, Some(InterfaceId(1)));
3268 }
3269 _ => unreachable!(),
3270 }
3271 }
3272
3273 #[test]
3274 fn test_no_announce_forward_without_local_clients() {
3275 use crate::announce::AnnounceData;
3276 use crate::destination::{destination_hash, name_hash};
3277
3278 let mut engine = TransportEngine::new(make_config(false));
3279 engine.register_interface(make_interface(1, constants::MODE_FULL));
3280
3281 let identity =
3282 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x88; 32]));
3283 let dest_hash = destination_hash("test", &["nofwd"], Some(identity.hash()));
3284 let name_h = name_hash("test", &["nofwd"]);
3285 let random_hash = [0x42u8; 10];
3286
3287 let (announce_data, _) =
3288 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3289
3290 let flags = PacketFlags {
3291 header_type: constants::HEADER_1,
3292 context_flag: constants::FLAG_UNSET,
3293 transport_type: constants::TRANSPORT_BROADCAST,
3294 destination_type: constants::DESTINATION_SINGLE,
3295 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3296 };
3297 let packet = RawPacket::pack(
3298 flags,
3299 0,
3300 &dest_hash,
3301 None,
3302 constants::CONTEXT_NONE,
3303 &announce_data,
3304 )
3305 .unwrap();
3306
3307 let mut rng = rns_crypto::FixedRng::new(&[0x22; 32]);
3308 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3309
3310 let has_forward = actions
3312 .iter()
3313 .any(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3314 assert!(!has_forward, "No forward without local clients");
3315 }
3316
3317 #[test]
3318 fn test_local_client_exclude_from_forward() {
3319 use crate::announce::AnnounceData;
3320 use crate::destination::{destination_hash, name_hash};
3321
3322 let mut engine = TransportEngine::new(make_config(false));
3323 engine.register_interface(make_local_client_interface(1));
3324 engine.register_interface(make_local_client_interface(2));
3325
3326 let identity =
3327 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3328 let dest_hash = destination_hash("test", &["excl"], Some(identity.hash()));
3329 let name_h = name_hash("test", &["excl"]);
3330 let random_hash = [0x42u8; 10];
3331
3332 let (announce_data, _) =
3333 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3334
3335 let flags = PacketFlags {
3336 header_type: constants::HEADER_1,
3337 context_flag: constants::FLAG_UNSET,
3338 transport_type: constants::TRANSPORT_BROADCAST,
3339 destination_type: constants::DESTINATION_SINGLE,
3340 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3341 };
3342 let packet = RawPacket::pack(
3343 flags,
3344 0,
3345 &dest_hash,
3346 None,
3347 constants::CONTEXT_NONE,
3348 &announce_data,
3349 )
3350 .unwrap();
3351
3352 let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
3353 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3355
3356 let forward = actions
3358 .iter()
3359 .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3360 assert!(forward.is_some());
3361 match forward.unwrap() {
3362 TransportAction::ForwardToLocalClients { exclude, .. } => {
3363 assert_eq!(*exclude, Some(InterfaceId(1)));
3364 }
3365 _ => unreachable!(),
3366 }
3367 }
3368
3369 fn make_tunnel_interface(id: u64) -> InterfaceInfo {
3374 InterfaceInfo {
3375 id: InterfaceId(id),
3376 name: String::from("tunnel_iface"),
3377 mode: constants::MODE_FULL,
3378 out_capable: true,
3379 in_capable: true,
3380 bitrate: None,
3381 announce_rate_target: None,
3382 announce_rate_grace: 0,
3383 announce_rate_penalty: 0.0,
3384 announce_cap: constants::ANNOUNCE_CAP,
3385 is_local_client: false,
3386 wants_tunnel: true,
3387 tunnel_id: None,
3388 mtu: constants::MTU as u32,
3389 ingress_control: false,
3390 ia_freq: 0.0,
3391 started: 0.0,
3392 }
3393 }
3394
3395 #[test]
3396 fn test_handle_tunnel_new() {
3397 let mut engine = TransportEngine::new(make_config(true));
3398 engine.register_interface(make_tunnel_interface(1));
3399
3400 let tunnel_id = [0xAA; 32];
3401 let actions = engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3402
3403 assert!(actions
3405 .iter()
3406 .any(|a| matches!(a, TransportAction::TunnelEstablished { .. })));
3407
3408 let info = engine.interface_info(&InterfaceId(1)).unwrap();
3410 assert_eq!(info.tunnel_id, Some(tunnel_id));
3411
3412 assert_eq!(engine.tunnel_table().len(), 1);
3414 }
3415
3416 #[test]
3417 fn test_announce_stores_tunnel_path() {
3418 use crate::announce::AnnounceData;
3419 use crate::destination::{destination_hash, name_hash};
3420
3421 let mut engine = TransportEngine::new(make_config(false));
3422 let mut iface = make_tunnel_interface(1);
3423 let tunnel_id = [0xBB; 32];
3424 iface.tunnel_id = Some(tunnel_id);
3425 engine.register_interface(iface);
3426
3427 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3429
3430 let identity =
3432 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xCC; 32]));
3433 let dest_hash = destination_hash("test", &["tunnel"], Some(identity.hash()));
3434 let name_h = name_hash("test", &["tunnel"]);
3435 let random_hash = [0x42u8; 10];
3436
3437 let (announce_data, _) =
3438 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3439
3440 let flags = PacketFlags {
3441 header_type: constants::HEADER_1,
3442 context_flag: constants::FLAG_UNSET,
3443 transport_type: constants::TRANSPORT_BROADCAST,
3444 destination_type: constants::DESTINATION_SINGLE,
3445 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3446 };
3447 let packet = RawPacket::pack(
3448 flags,
3449 0,
3450 &dest_hash,
3451 None,
3452 constants::CONTEXT_NONE,
3453 &announce_data,
3454 )
3455 .unwrap();
3456
3457 let mut rng = rns_crypto::FixedRng::new(&[0xDD; 32]);
3458 engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3459
3460 assert!(engine.has_path(&dest_hash));
3462
3463 let tunnel = engine.tunnel_table().get(&tunnel_id).unwrap();
3465 assert_eq!(tunnel.paths.len(), 1);
3466 assert!(tunnel.paths.contains_key(&dest_hash));
3467 }
3468
3469 #[test]
3470 fn test_tunnel_reattach_restores_paths() {
3471 let mut engine = TransportEngine::new(make_config(true));
3472 engine.register_interface(make_tunnel_interface(1));
3473
3474 let tunnel_id = [0xCC; 32];
3475 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3476
3477 let dest = [0xDD; 16];
3479 engine.tunnel_table.store_tunnel_path(
3480 &tunnel_id,
3481 dest,
3482 tunnel::TunnelPath {
3483 timestamp: 1000.0,
3484 received_from: [0xEE; 16],
3485 hops: 3,
3486 expires: 1000.0 + constants::DESTINATION_TIMEOUT,
3487 random_blobs: Vec::new(),
3488 packet_hash: [0xFF; 32],
3489 },
3490 1000.0,
3491 constants::DESTINATION_TIMEOUT,
3492 usize::MAX,
3493 );
3494
3495 engine.void_tunnel_interface(&tunnel_id);
3497
3498 engine.path_table.remove(&dest);
3500 assert!(!engine.has_path(&dest));
3501
3502 engine.register_interface(make_interface(2, constants::MODE_FULL));
3504 let actions = engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);
3505
3506 assert!(engine.has_path(&dest));
3508 let path = engine.path_table.get(&dest).unwrap().primary().unwrap();
3509 assert_eq!(path.hops, 3);
3510 assert_eq!(path.receiving_interface, InterfaceId(2));
3511
3512 assert!(actions
3514 .iter()
3515 .any(|a| matches!(a, TransportAction::TunnelEstablished { .. })));
3516 }
3517
3518 #[test]
3519 fn test_void_tunnel_interface() {
3520 let mut engine = TransportEngine::new(make_config(true));
3521 engine.register_interface(make_tunnel_interface(1));
3522
3523 let tunnel_id = [0xDD; 32];
3524 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3525
3526 assert_eq!(
3528 engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3529 Some(InterfaceId(1))
3530 );
3531
3532 engine.void_tunnel_interface(&tunnel_id);
3533
3534 assert_eq!(engine.tunnel_table().len(), 1);
3536 assert_eq!(
3537 engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3538 None
3539 );
3540 }
3541
3542 #[test]
3543 fn test_tick_culls_tunnels() {
3544 let mut engine = TransportEngine::new(make_config(true));
3545 engine.register_interface(make_tunnel_interface(1));
3546
3547 let tunnel_id = [0xEE; 32];
3548 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3549 assert_eq!(engine.tunnel_table().len(), 1);
3550
3551 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3552
3553 engine.tick(
3555 1000.0 + constants::DESTINATION_TIMEOUT + constants::TABLES_CULL_INTERVAL + 1.0,
3556 &mut rng,
3557 );
3558
3559 assert_eq!(engine.tunnel_table().len(), 0);
3560 }
3561
3562 #[test]
3563 fn test_synthesize_tunnel() {
3564 let mut engine = TransportEngine::new(make_config(true));
3565 engine.register_interface(make_tunnel_interface(1));
3566
3567 let identity =
3568 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xFF; 32]));
3569 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3570
3571 let actions = engine.synthesize_tunnel(&identity, InterfaceId(1), &mut rng);
3572
3573 assert_eq!(actions.len(), 1);
3575 match &actions[0] {
3576 TransportAction::TunnelSynthesize {
3577 interface,
3578 data,
3579 dest_hash,
3580 } => {
3581 assert_eq!(*interface, InterfaceId(1));
3582 assert_eq!(data.len(), tunnel::TUNNEL_SYNTH_LENGTH);
3583 let expected_dest = crate::destination::destination_hash(
3585 "rnstransport",
3586 &["tunnel", "synthesize"],
3587 None,
3588 );
3589 assert_eq!(*dest_hash, expected_dest);
3590 }
3591 _ => panic!("Expected TunnelSynthesize"),
3592 }
3593 }
3594
3595 fn make_path_request_data(dest_hash: &[u8; 16], tag: &[u8]) -> Vec<u8> {
3600 let mut data = Vec::new();
3601 data.extend_from_slice(dest_hash);
3602 data.extend_from_slice(tag);
3603 data
3604 }
3605
3606 #[test]
3607 fn test_path_request_forwarded_on_ap() {
3608 let mut engine = TransportEngine::new(make_config(true));
3609 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3610 engine.register_interface(make_interface(2, constants::MODE_FULL));
3611
3612 let dest = [0xD1; 16];
3613 let tag = [0x01; 16];
3614 let data = make_path_request_data(&dest, &tag);
3615
3616 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3617
3618 assert_eq!(actions.len(), 1);
3620 match &actions[0] {
3621 TransportAction::SendOnInterface { interface, .. } => {
3622 assert_eq!(*interface, InterfaceId(2));
3623 }
3624 _ => panic!("Expected SendOnInterface for forwarded path request"),
3625 }
3626 assert!(engine.discovery_path_requests.contains_key(&dest));
3628 }
3629
3630 #[test]
3631 fn test_path_request_not_forwarded_on_full() {
3632 let mut engine = TransportEngine::new(make_config(true));
3633 engine.register_interface(make_interface(1, constants::MODE_FULL));
3634 engine.register_interface(make_interface(2, constants::MODE_FULL));
3635
3636 let dest = [0xD2; 16];
3637 let tag = [0x02; 16];
3638 let data = make_path_request_data(&dest, &tag);
3639
3640 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3641
3642 assert!(actions.is_empty());
3644 assert!(!engine.discovery_path_requests.contains_key(&dest));
3645 }
3646
3647 #[test]
3648 fn test_discovery_pr_tags_fifo_eviction() {
3649 let mut config = make_config(true);
3650 config.max_discovery_pr_tags = 2;
3651 let mut engine = TransportEngine::new(config);
3652
3653 let dest1 = [0xA1; 16];
3654 let dest2 = [0xA2; 16];
3655 let dest3 = [0xA3; 16];
3656 let tag1 = [0x01; 16];
3657 let tag2 = [0x02; 16];
3658 let tag3 = [0x03; 16];
3659
3660 engine.handle_path_request(
3661 &make_path_request_data(&dest1, &tag1),
3662 InterfaceId(1),
3663 1000.0,
3664 );
3665 engine.handle_path_request(
3666 &make_path_request_data(&dest2, &tag2),
3667 InterfaceId(1),
3668 1001.0,
3669 );
3670 assert_eq!(engine.discovery_pr_tags_count(), 2);
3671
3672 let unique1 = make_unique_tag(dest1, &tag1);
3673 let unique2 = make_unique_tag(dest2, &tag2);
3674 assert!(engine.discovery_pr_tags.contains(&unique1));
3675 assert!(engine.discovery_pr_tags.contains(&unique2));
3676
3677 engine.handle_path_request(
3678 &make_path_request_data(&dest3, &tag3),
3679 InterfaceId(1),
3680 1002.0,
3681 );
3682 assert_eq!(engine.discovery_pr_tags_count(), 2);
3683 assert!(!engine.discovery_pr_tags.contains(&unique1));
3684 assert!(engine.discovery_pr_tags.contains(&unique2));
3685
3686 engine.handle_path_request(
3687 &make_path_request_data(&dest1, &tag1),
3688 InterfaceId(1),
3689 1003.0,
3690 );
3691 assert_eq!(engine.discovery_pr_tags_count(), 2);
3692 assert!(engine.discovery_pr_tags.contains(&unique1));
3693 }
3694
3695 #[test]
3696 fn test_path_destination_cap_evicts_oldest_and_clears_state() {
3697 let mut config = make_config(false);
3698 config.max_path_destinations = 2;
3699 let mut engine = TransportEngine::new(config);
3700 engine.register_interface(make_interface(1, constants::MODE_FULL));
3701
3702 let dest1 = [0xB1; 16];
3703 let dest2 = [0xB2; 16];
3704 let dest3 = [0xB3; 16];
3705
3706 engine.upsert_path_destination(
3707 dest1,
3708 make_path_entry(1000.0, 1, InterfaceId(1), [0x11; 16]),
3709 1000.0,
3710 );
3711 engine.upsert_path_destination(
3712 dest2,
3713 make_path_entry(1001.0, 1, InterfaceId(1), [0x22; 16]),
3714 1001.0,
3715 );
3716 engine
3717 .path_states
3718 .insert(dest1, constants::STATE_UNRESPONSIVE);
3719
3720 engine.upsert_path_destination(
3721 dest3,
3722 make_path_entry(1002.0, 1, InterfaceId(1), [0x33; 16]),
3723 1002.0,
3724 );
3725
3726 assert_eq!(engine.path_table_count(), 2);
3727 assert!(!engine.has_path(&dest1));
3728 assert!(engine.has_path(&dest2));
3729 assert!(engine.has_path(&dest3));
3730 assert!(!engine.path_states.contains_key(&dest1));
3731 assert_eq!(engine.path_destination_cap_evict_count(), 1);
3732 }
3733
3734 #[test]
3735 fn test_existing_path_destination_update_does_not_trigger_cap_eviction() {
3736 let mut config = make_config(false);
3737 config.max_path_destinations = 2;
3738 config.max_paths_per_destination = 2;
3739 let mut engine = TransportEngine::new(config);
3740 engine.register_interface(make_interface(1, constants::MODE_FULL));
3741
3742 let dest1 = [0xC1; 16];
3743 let dest2 = [0xC2; 16];
3744
3745 engine.upsert_path_destination(
3746 dest1,
3747 make_path_entry(1000.0, 2, InterfaceId(1), [0x11; 16]),
3748 1000.0,
3749 );
3750 engine.upsert_path_destination(
3751 dest2,
3752 make_path_entry(1001.0, 2, InterfaceId(1), [0x22; 16]),
3753 1001.0,
3754 );
3755
3756 engine.upsert_path_destination(
3757 dest2,
3758 make_path_entry(1002.0, 1, InterfaceId(1), [0x23; 16]),
3759 1002.0,
3760 );
3761
3762 assert_eq!(engine.path_table_count(), 2);
3763 assert!(engine.has_path(&dest1));
3764 assert!(engine.has_path(&dest2));
3765 }
3766
3767 #[test]
3768 fn test_roaming_loop_prevention() {
3769 let mut engine = TransportEngine::new(make_config(true));
3770 engine.register_interface(make_interface(1, constants::MODE_ROAMING));
3771
3772 let dest = [0xD3; 16];
3773 engine.path_table.insert(
3775 dest,
3776 PathSet::from_single(
3777 PathEntry {
3778 timestamp: 900.0,
3779 next_hop: [0xAA; 16],
3780 hops: 2,
3781 expires: 9999.0,
3782 random_blobs: Vec::new(),
3783 receiving_interface: InterfaceId(1),
3784 packet_hash: [0; 32],
3785 announce_raw: None,
3786 },
3787 1,
3788 ),
3789 );
3790
3791 let tag = [0x03; 16];
3792 let data = make_path_request_data(&dest, &tag);
3793
3794 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3795
3796 assert!(actions.is_empty());
3798 assert!(!engine.announce_table.contains_key(&dest));
3799 }
3800
3801 fn make_announce_raw(dest_hash: &[u8; 16], payload: &[u8]) -> Vec<u8> {
3803 let flags: u8 = 0x01; let mut raw = Vec::new();
3807 raw.push(flags);
3808 raw.push(0x02); raw.extend_from_slice(dest_hash);
3810 raw.push(constants::CONTEXT_NONE);
3811 raw.extend_from_slice(payload);
3812 raw
3813 }
3814
3815 #[test]
3816 fn test_path_request_populates_announce_entry_from_raw() {
3817 let mut engine = TransportEngine::new(make_config(true));
3818 engine.register_interface(make_interface(1, constants::MODE_FULL));
3819 engine.register_interface(make_interface(2, constants::MODE_FULL));
3820
3821 let dest = [0xD5; 16];
3822 let payload = vec![0xAB; 32]; let announce_raw = make_announce_raw(&dest, &payload);
3824
3825 engine.path_table.insert(
3826 dest,
3827 PathSet::from_single(
3828 PathEntry {
3829 timestamp: 900.0,
3830 next_hop: [0xBB; 16],
3831 hops: 2,
3832 expires: 9999.0,
3833 random_blobs: Vec::new(),
3834 receiving_interface: InterfaceId(2),
3835 packet_hash: [0; 32],
3836 announce_raw: Some(announce_raw.clone()),
3837 },
3838 1,
3839 ),
3840 );
3841
3842 let tag = [0x05; 16];
3843 let data = make_path_request_data(&dest, &tag);
3844 let _actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3845
3846 let entry = engine
3848 .announce_table
3849 .get(&dest)
3850 .expect("announce entry must exist");
3851 assert_eq!(entry.packet_raw, announce_raw);
3852 assert_eq!(entry.packet_data, payload);
3853 assert!(entry.block_rebroadcasts);
3854 }
3855
3856 #[test]
3857 fn test_path_request_skips_when_no_announce_raw() {
3858 let mut engine = TransportEngine::new(make_config(true));
3859 engine.register_interface(make_interface(1, constants::MODE_FULL));
3860 engine.register_interface(make_interface(2, constants::MODE_FULL));
3861
3862 let dest = [0xD6; 16];
3863
3864 engine.path_table.insert(
3865 dest,
3866 PathSet::from_single(
3867 PathEntry {
3868 timestamp: 900.0,
3869 next_hop: [0xCC; 16],
3870 hops: 1,
3871 expires: 9999.0,
3872 random_blobs: Vec::new(),
3873 receiving_interface: InterfaceId(2),
3874 packet_hash: [0; 32],
3875 announce_raw: None, },
3877 1,
3878 ),
3879 );
3880
3881 let tag = [0x06; 16];
3882 let data = make_path_request_data(&dest, &tag);
3883 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3884
3885 assert!(actions.is_empty());
3887 assert!(!engine.announce_table.contains_key(&dest));
3888 }
3889
3890 #[test]
3891 fn test_discovery_request_consumed_on_announce() {
3892 let mut engine = TransportEngine::new(make_config(true));
3893 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3894
3895 let dest = [0xD4; 16];
3896
3897 engine.discovery_path_requests.insert(
3899 dest,
3900 DiscoveryPathRequest {
3901 timestamp: 900.0,
3902 requesting_interface: InterfaceId(1),
3903 },
3904 );
3905
3906 let iface = engine.discovery_path_requests_waiting(&dest);
3908 assert_eq!(iface, Some(InterfaceId(1)));
3909
3910 assert!(!engine.discovery_path_requests.contains_key(&dest));
3912 assert_eq!(engine.discovery_path_requests_waiting(&dest), None);
3913 }
3914
3915 fn build_announce_for_issue4(dest_hash: &[u8; 16], name_hash: &[u8; 10]) -> Vec<u8> {
3921 let identity =
3922 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3923 let random_hash = [0x42u8; 10];
3924 let (announce_data, _) = crate::announce::AnnounceData::pack(
3925 &identity,
3926 dest_hash,
3927 name_hash,
3928 &random_hash,
3929 None,
3930 None,
3931 )
3932 .unwrap();
3933 let flags = PacketFlags {
3934 header_type: constants::HEADER_1,
3935 context_flag: constants::FLAG_UNSET,
3936 transport_type: constants::TRANSPORT_BROADCAST,
3937 destination_type: constants::DESTINATION_SINGLE,
3938 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3939 };
3940 RawPacket::pack(
3941 flags,
3942 0,
3943 dest_hash,
3944 None,
3945 constants::CONTEXT_NONE,
3946 &announce_data,
3947 )
3948 .unwrap()
3949 .raw
3950 }
3951
3952 #[test]
3953 fn test_issue4_local_client_single_data_to_1hop_rewrites_on_outbound() {
3954 let mut engine = TransportEngine::new(make_config(false));
3959 engine.register_interface(make_local_client_interface(1));
3960
3961 let identity =
3962 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3963 let dest_hash =
3964 crate::destination::destination_hash("issue4", &["test"], Some(identity.hash()));
3965 let name_hash = crate::destination::name_hash("issue4", &["test"]);
3966 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
3967
3968 let mut announce_packet = RawPacket::unpack(&announce_raw).unwrap();
3972 announce_packet.raw[1] = 1;
3973 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3974 engine.handle_inbound(&announce_packet.raw, InterfaceId(1), 1000.0, &mut rng);
3975 assert!(engine.has_path(&dest_hash));
3976 assert_eq!(engine.hops_to(&dest_hash), Some(1));
3977
3978 let data_flags = PacketFlags {
3980 header_type: constants::HEADER_1,
3981 context_flag: constants::FLAG_UNSET,
3982 transport_type: constants::TRANSPORT_BROADCAST,
3983 destination_type: constants::DESTINATION_SINGLE,
3984 packet_type: constants::PACKET_TYPE_DATA,
3985 };
3986 let data_packet = RawPacket::pack(
3987 data_flags,
3988 0,
3989 &dest_hash,
3990 None,
3991 constants::CONTEXT_NONE,
3992 b"hello",
3993 )
3994 .unwrap();
3995
3996 let actions =
3997 engine.handle_outbound(&data_packet, constants::DESTINATION_SINGLE, None, 1001.0);
3998
3999 let send = actions.iter().find_map(|a| match a {
4000 TransportAction::SendOnInterface { interface, raw } => Some((interface, raw)),
4001 _ => None,
4002 });
4003 let (interface, raw) = send.expect("shared client should emit a transport-injected packet");
4004 assert_eq!(*interface, InterfaceId(1));
4005 let flags = PacketFlags::unpack(raw[0]);
4006 assert_eq!(flags.header_type, constants::HEADER_2);
4007 assert_eq!(flags.transport_type, constants::TRANSPORT_TRANSPORT);
4008 }
4009
4010 #[test]
4011 fn test_issue4_external_data_to_1hop_via_transport_works() {
4012 let daemon_id = [0x42; 16];
4018 let mut engine = TransportEngine::new(TransportConfig {
4019 transport_enabled: true,
4020 identity_hash: Some(daemon_id),
4021 prefer_shorter_path: false,
4022 max_paths_per_destination: 1,
4023 packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
4024 max_discovery_pr_tags: constants::MAX_PR_TAGS,
4025 max_path_destinations: usize::MAX,
4026 max_tunnel_destinations_total: usize::MAX,
4027 destination_timeout_secs: constants::DESTINATION_TIMEOUT,
4028 announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
4029 announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
4030 announce_sig_cache_enabled: true,
4031 announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
4032 announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
4033 announce_queue_max_entries: 256,
4034 announce_queue_max_interfaces: 1024,
4035 });
4036 engine.register_interface(make_interface(1, constants::MODE_FULL)); engine.register_interface(make_interface(2, constants::MODE_FULL)); let identity =
4040 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4041 let dest_hash =
4042 crate::destination::destination_hash("issue4", &["ctrl"], Some(identity.hash()));
4043 let name_hash = crate::destination::name_hash("issue4", &["ctrl"]);
4044 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4045
4046 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
4048 engine.handle_inbound(&announce_raw, InterfaceId(2), 1000.0, &mut rng);
4049 assert_eq!(engine.hops_to(&dest_hash), Some(1));
4050
4051 let h2_flags = PacketFlags {
4054 header_type: constants::HEADER_2,
4055 context_flag: constants::FLAG_UNSET,
4056 transport_type: constants::TRANSPORT_TRANSPORT,
4057 destination_type: constants::DESTINATION_SINGLE,
4058 packet_type: constants::PACKET_TYPE_DATA,
4059 };
4060 let mut h2_raw = Vec::new();
4062 h2_raw.push(h2_flags.pack());
4063 h2_raw.push(0); h2_raw.extend_from_slice(&daemon_id); h2_raw.extend_from_slice(&dest_hash);
4066 h2_raw.push(constants::CONTEXT_NONE);
4067 h2_raw.extend_from_slice(b"hello via transport");
4068
4069 let mut rng2 = rns_crypto::FixedRng::new(&[0x22; 32]);
4070 let actions = engine.handle_inbound(&h2_raw, InterfaceId(1), 1001.0, &mut rng2);
4071
4072 let has_send = actions.iter().any(|a| {
4074 matches!(
4075 a,
4076 TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(2)
4077 )
4078 });
4079 assert!(
4080 has_send,
4081 "HEADER_2 transport packet should be forwarded (control test)"
4082 );
4083 }
4084}