1pub mod announce_proc;
2pub mod announce_queue;
3pub mod announce_verify_queue;
4pub mod dedup;
5pub mod inbound;
6pub mod ingress_control;
7pub mod jobs;
8pub mod outbound;
9pub mod pathfinder;
10pub mod rate_limit;
11pub mod tables;
12pub mod tunnel;
13pub mod types;
14
15use alloc::collections::BTreeMap;
16use alloc::string::String;
17use alloc::vec::Vec;
18use core::mem::size_of;
19
20use rns_crypto::Rng;
21
22use crate::announce::AnnounceData;
23use crate::constants;
24use crate::hash;
25use crate::packet::RawPacket;
26
27use self::announce_proc::compute_path_expires;
28use self::announce_queue::AnnounceQueues;
29use self::announce_verify_queue::{AnnounceVerifyKey, AnnounceVerifyQueue, PendingAnnounce};
30use self::dedup::{AnnounceSignatureCache, PacketHashlist};
31use self::inbound::{
32 create_link_entry, create_reverse_entry, forward_transport_packet, route_proof_via_reverse,
33 route_via_link_table,
34};
35use self::ingress_control::IngressControl;
36use self::outbound::{route_outbound, should_transmit_announce};
37use self::pathfinder::{
38 decide_announce_multipath, extract_random_blob, timebase_from_random_blob,
39 timebase_from_random_blobs, MultiPathDecision,
40};
41use self::rate_limit::AnnounceRateLimiter;
42use self::tables::{AnnounceEntry, DiscoveryPathRequest, LinkEntry, PathEntry, PathSet};
43use self::tunnel::TunnelTable;
44use self::types::{BlackholeEntry, InterfaceId, InterfaceInfo, TransportAction, TransportConfig};
45
46pub type PathTableRow = ([u8; 16], f64, [u8; 16], u8, f64, String);
47pub type RateTableRow = ([u8; 16], f64, u32, f64, Vec<f64>);
48
49pub struct TransportEngine {
54 config: TransportConfig,
55 path_table: BTreeMap<[u8; 16], PathSet>,
56 announce_table: BTreeMap<[u8; 16], AnnounceEntry>,
57 reverse_table: BTreeMap<[u8; 16], tables::ReverseEntry>,
58 link_table: BTreeMap<[u8; 16], LinkEntry>,
59 held_announces: BTreeMap<[u8; 16], AnnounceEntry>,
60 packet_hashlist: PacketHashlist,
61 announce_sig_cache: AnnounceSignatureCache,
62 rate_limiter: AnnounceRateLimiter,
63 path_states: BTreeMap<[u8; 16], u8>,
64 interfaces: BTreeMap<InterfaceId, InterfaceInfo>,
65 local_destinations: BTreeMap<[u8; 16], u8>,
66 blackholed_identities: BTreeMap<[u8; 16], BlackholeEntry>,
67 announce_queues: AnnounceQueues,
68 ingress_control: IngressControl,
69 tunnel_table: TunnelTable,
70 discovery_pr_tags: Vec<[u8; 32]>,
71 discovery_path_requests: BTreeMap<[u8; 16], DiscoveryPathRequest>,
72 path_destination_cap_evict_count: usize,
73 announces_last_checked: f64,
75 tables_last_culled: f64,
76}
77
78impl TransportEngine {
79 pub fn new(config: TransportConfig) -> Self {
80 let packet_hashlist_max_entries = config.packet_hashlist_max_entries;
81 let sig_cache_max = if config.announce_sig_cache_enabled {
82 config.announce_sig_cache_max_entries
83 } else {
84 0
85 };
86 let sig_cache_ttl = config.announce_sig_cache_ttl_secs;
87 let announce_queue_max_interfaces = config.announce_queue_max_interfaces;
88 TransportEngine {
89 config,
90 path_table: BTreeMap::new(),
91 announce_table: BTreeMap::new(),
92 reverse_table: BTreeMap::new(),
93 link_table: BTreeMap::new(),
94 held_announces: BTreeMap::new(),
95 packet_hashlist: PacketHashlist::new(packet_hashlist_max_entries),
96 announce_sig_cache: AnnounceSignatureCache::new(sig_cache_max, sig_cache_ttl),
97 rate_limiter: AnnounceRateLimiter::new(),
98 path_states: BTreeMap::new(),
99 interfaces: BTreeMap::new(),
100 local_destinations: BTreeMap::new(),
101 blackholed_identities: BTreeMap::new(),
102 announce_queues: AnnounceQueues::new(announce_queue_max_interfaces),
103 ingress_control: IngressControl::new(),
104 tunnel_table: TunnelTable::new(),
105 discovery_pr_tags: Vec::new(),
106 discovery_path_requests: BTreeMap::new(),
107 path_destination_cap_evict_count: 0,
108 announces_last_checked: 0.0,
109 tables_last_culled: 0.0,
110 }
111 }
112
113 fn insert_discovery_pr_tag(&mut self, unique_tag: [u8; 32]) -> bool {
114 if self.discovery_pr_tags.contains(&unique_tag) {
115 return false;
116 }
117 if self.config.max_discovery_pr_tags != usize::MAX
118 && self.discovery_pr_tags.len() >= self.config.max_discovery_pr_tags
119 && !self.discovery_pr_tags.is_empty()
120 {
121 self.discovery_pr_tags.remove(0);
122 }
123 self.discovery_pr_tags.push(unique_tag);
124 true
125 }
126
127 fn upsert_path_destination(&mut self, dest_hash: [u8; 16], entry: PathEntry, now: f64) {
128 let max_paths = self.config.max_paths_per_destination;
129 if let Some(ps) = self.path_table.get_mut(&dest_hash) {
130 ps.upsert(entry);
131 return;
132 }
133 self.enforce_path_destination_cap(now);
134 self.path_table
135 .insert(dest_hash, PathSet::from_single(entry, max_paths));
136 }
137
138 fn enforce_path_destination_cap(&mut self, now: f64) {
139 if self.config.max_path_destinations == usize::MAX {
140 return;
141 }
142 jobs::cull_path_table(&mut self.path_table, &self.interfaces, now);
143 while self.path_table.len() >= self.config.max_path_destinations {
144 let Some(dest_hash) = self.oldest_path_destination() else {
145 break;
146 };
147 self.path_table.remove(&dest_hash);
148 self.path_states.remove(&dest_hash);
149 self.path_destination_cap_evict_count += 1;
150 }
151 }
152
153 fn oldest_path_destination(&self) -> Option<[u8; 16]> {
154 self.path_table
155 .iter()
156 .filter_map(|(dest_hash, path_set)| {
157 path_set
158 .primary()
159 .map(|primary| (*dest_hash, primary.timestamp, primary.hops))
160 })
161 .min_by(|a, b| {
162 a.1.partial_cmp(&b.1)
163 .unwrap_or(core::cmp::Ordering::Equal)
164 .then_with(|| b.2.cmp(&a.2))
165 })
166 .map(|(dest_hash, _, _)| dest_hash)
167 }
168
169 fn announce_entry_size_bytes(entry: &AnnounceEntry) -> usize {
170 size_of::<AnnounceEntry>() + entry.packet_raw.capacity() + entry.packet_data.capacity()
171 }
172
173 fn announce_retained_bytes_total(&self) -> usize {
174 self.announce_table
175 .values()
176 .chain(self.held_announces.values())
177 .map(Self::announce_entry_size_bytes)
178 .sum()
179 }
180
181 fn cull_expired_announce_entries(&mut self, now: f64) -> usize {
182 let ttl = self.config.announce_table_ttl_secs;
183 let mut removed = 0usize;
184
185 self.announce_table.retain(|_, entry| {
186 let keep = now <= entry.timestamp + ttl;
187 if !keep {
188 removed += 1;
189 }
190 keep
191 });
192
193 self.held_announces.retain(|_, entry| {
194 let keep = now <= entry.timestamp + ttl;
195 if !keep {
196 removed += 1;
197 }
198 keep
199 });
200
201 removed
202 }
203
204 fn oldest_retained_announce(&self) -> Option<([u8; 16], bool)> {
205 let oldest_active = self
206 .announce_table
207 .iter()
208 .map(|(dest_hash, entry)| (*dest_hash, false, entry.timestamp))
209 .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(core::cmp::Ordering::Equal));
210 let oldest_held = self
211 .held_announces
212 .iter()
213 .map(|(dest_hash, entry)| (*dest_hash, true, entry.timestamp))
214 .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(core::cmp::Ordering::Equal));
215
216 match (oldest_active, oldest_held) {
217 (Some(active), Some(held)) => {
218 let ordering = active
219 .2
220 .partial_cmp(&held.2)
221 .unwrap_or(core::cmp::Ordering::Equal);
222 if ordering == core::cmp::Ordering::Less {
223 Some((active.0, active.1))
224 } else {
225 Some((held.0, held.1))
226 }
227 }
228 (Some(active), None) => Some((active.0, active.1)),
229 (None, Some(held)) => Some((held.0, held.1)),
230 (None, None) => None,
231 }
232 }
233
234 fn enforce_announce_retention_cap(&mut self, now: f64) {
235 self.cull_expired_announce_entries(now);
236 while self.announce_retained_bytes_total() > self.config.announce_table_max_bytes {
237 let Some((dest_hash, is_held)) = self.oldest_retained_announce() else {
238 break;
239 };
240 if is_held {
241 self.held_announces.remove(&dest_hash);
242 } else {
243 self.announce_table.remove(&dest_hash);
244 }
245 }
246 }
247
248 fn insert_announce_entry(
249 &mut self,
250 dest_hash: [u8; 16],
251 entry: AnnounceEntry,
252 now: f64,
253 ) -> bool {
254 self.cull_expired_announce_entries(now);
255 if Self::announce_entry_size_bytes(&entry) > self.config.announce_table_max_bytes {
256 return false;
257 }
258 self.announce_table.insert(dest_hash, entry);
259 self.enforce_announce_retention_cap(now);
260 self.announce_table.contains_key(&dest_hash)
261 }
262
263 fn insert_held_announce(
264 &mut self,
265 dest_hash: [u8; 16],
266 entry: AnnounceEntry,
267 now: f64,
268 ) -> bool {
269 self.cull_expired_announce_entries(now);
270 if Self::announce_entry_size_bytes(&entry) > self.config.announce_table_max_bytes {
271 return false;
272 }
273 self.held_announces.insert(dest_hash, entry);
274 self.enforce_announce_retention_cap(now);
275 self.held_announces.contains_key(&dest_hash)
276 }
277
278 pub fn register_interface(&mut self, info: InterfaceInfo) {
283 self.interfaces.insert(info.id, info);
284 }
285
286 pub fn deregister_interface(&mut self, id: InterfaceId) {
287 self.interfaces.remove(&id);
288 self.announce_queues.remove_interface(id);
289 self.ingress_control.remove_interface(&id);
290 }
291
292 pub fn register_destination(&mut self, dest_hash: [u8; 16], dest_type: u8) {
297 self.local_destinations.insert(dest_hash, dest_type);
298 }
299
300 pub fn deregister_destination(&mut self, dest_hash: &[u8; 16]) {
301 self.local_destinations.remove(dest_hash);
302 }
303
304 pub fn has_path(&self, dest_hash: &[u8; 16]) -> bool {
309 self.path_table
310 .get(dest_hash)
311 .is_some_and(|ps| !ps.is_empty())
312 }
313
314 pub fn hops_to(&self, dest_hash: &[u8; 16]) -> Option<u8> {
315 self.path_table
316 .get(dest_hash)
317 .and_then(|ps| ps.primary())
318 .map(|e| e.hops)
319 }
320
321 pub fn next_hop(&self, dest_hash: &[u8; 16]) -> Option<[u8; 16]> {
322 self.path_table
323 .get(dest_hash)
324 .and_then(|ps| ps.primary())
325 .map(|e| e.next_hop)
326 }
327
328 pub fn next_hop_interface(&self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
329 self.path_table
330 .get(dest_hash)
331 .and_then(|ps| ps.primary())
332 .map(|e| e.receiving_interface)
333 }
334
335 pub fn mark_path_unresponsive(
345 &mut self,
346 dest_hash: &[u8; 16],
347 receiving_interface: Option<InterfaceId>,
348 ) {
349 if let Some(iface_id) = receiving_interface {
350 if let Some(info) = self.interfaces.get(&iface_id) {
351 if info.mode == constants::MODE_BOUNDARY {
352 return;
353 }
354 }
355 }
356
357 if let Some(ps) = self.path_table.get_mut(dest_hash) {
359 if ps.len() > 1 {
360 ps.failover(false); self.path_states.remove(dest_hash);
363 return;
364 }
365 }
366
367 self.path_states
368 .insert(*dest_hash, constants::STATE_UNRESPONSIVE);
369 }
370
371 pub fn mark_path_responsive(&mut self, dest_hash: &[u8; 16]) {
372 self.path_states
373 .insert(*dest_hash, constants::STATE_RESPONSIVE);
374 }
375
376 pub fn path_is_unresponsive(&self, dest_hash: &[u8; 16]) -> bool {
377 self.path_states.get(dest_hash) == Some(&constants::STATE_UNRESPONSIVE)
378 }
379
380 pub fn expire_path(&mut self, dest_hash: &[u8; 16]) {
381 if let Some(ps) = self.path_table.get_mut(dest_hash) {
382 ps.expire_all();
383 }
384 }
385
386 pub fn register_link(&mut self, link_id: [u8; 16], entry: LinkEntry) {
391 self.link_table.insert(link_id, entry);
392 }
393
394 pub fn validate_link(&mut self, link_id: &[u8; 16]) {
395 if let Some(entry) = self.link_table.get_mut(link_id) {
396 entry.validated = true;
397 }
398 }
399
400 pub fn remove_link(&mut self, link_id: &[u8; 16]) {
401 self.link_table.remove(link_id);
402 }
403
404 pub fn blackhole_identity(
410 &mut self,
411 identity_hash: [u8; 16],
412 now: f64,
413 duration_hours: Option<f64>,
414 reason: Option<String>,
415 ) {
416 let expires = match duration_hours {
417 Some(h) if h > 0.0 => now + h * 3600.0,
418 _ => 0.0, };
420 self.blackholed_identities.insert(
421 identity_hash,
422 BlackholeEntry {
423 created: now,
424 expires,
425 reason,
426 },
427 );
428 }
429
430 pub fn unblackhole_identity(&mut self, identity_hash: &[u8; 16]) -> bool {
432 self.blackholed_identities.remove(identity_hash).is_some()
433 }
434
435 pub fn is_blackholed(&self, identity_hash: &[u8; 16], now: f64) -> bool {
437 if let Some(entry) = self.blackholed_identities.get(identity_hash) {
438 if entry.expires == 0.0 || entry.expires > now {
439 return true;
440 }
441 }
442 false
443 }
444
445 pub fn blackholed_entries(&self) -> impl Iterator<Item = (&[u8; 16], &BlackholeEntry)> {
447 self.blackholed_identities.iter()
448 }
449
450 fn cull_blackholed(&mut self, now: f64) {
452 self.blackholed_identities
453 .retain(|_, entry| entry.expires == 0.0 || entry.expires > now);
454 }
455
456 pub fn handle_tunnel(
464 &mut self,
465 tunnel_id: [u8; 32],
466 interface: InterfaceId,
467 now: f64,
468 ) -> Vec<TransportAction> {
469 let mut actions = Vec::new();
470
471 if let Some(info) = self.interfaces.get_mut(&interface) {
473 info.tunnel_id = Some(tunnel_id);
474 }
475
476 let restored_paths = self.tunnel_table.handle_tunnel(
477 tunnel_id,
478 interface,
479 now,
480 self.config.destination_timeout_secs,
481 );
482
483 for (dest_hash, tunnel_path) in &restored_paths {
485 let should_restore = match self.path_table.get(dest_hash).and_then(|ps| ps.primary()) {
486 Some(existing) => {
487 if tunnel_path.hops <= existing.hops || existing.expires < now {
490 let existing_timebase = timebase_from_random_blobs(&existing.random_blobs);
491 let tunnel_timebase = timebase_from_random_blobs(&tunnel_path.random_blobs);
492 tunnel_timebase >= existing_timebase
493 } else {
494 false
495 }
496 }
497 None => now < tunnel_path.expires,
498 };
499
500 if should_restore {
501 let entry = PathEntry {
502 timestamp: tunnel_path.timestamp,
503 next_hop: tunnel_path.received_from,
504 hops: tunnel_path.hops,
505 expires: tunnel_path.expires,
506 random_blobs: tunnel_path.random_blobs.clone(),
507 receiving_interface: interface,
508 packet_hash: tunnel_path.packet_hash,
509 announce_raw: None,
510 };
511 self.upsert_path_destination(*dest_hash, entry, now);
512 }
513 }
514
515 actions.push(TransportAction::TunnelEstablished {
516 tunnel_id,
517 interface,
518 });
519
520 actions
521 }
522
523 pub fn synthesize_tunnel(
531 &self,
532 identity: &rns_crypto::identity::Identity,
533 interface_id: InterfaceId,
534 rng: &mut dyn Rng,
535 ) -> Vec<TransportAction> {
536 let mut actions = Vec::new();
537
538 let interface_hash = if let Some(info) = self.interfaces.get(&interface_id) {
540 hash::full_hash(info.name.as_bytes())
541 } else {
542 return actions;
543 };
544
545 match tunnel::build_tunnel_synthesize_data(identity, &interface_hash, rng) {
546 Ok((data, _tunnel_id)) => {
547 let dest_hash = crate::destination::destination_hash(
548 "rnstransport",
549 &["tunnel", "synthesize"],
550 None,
551 );
552 actions.push(TransportAction::TunnelSynthesize {
553 interface: interface_id,
554 data,
555 dest_hash,
556 });
557 }
558 Err(e) => {
559 let _ = e;
561 }
562 }
563
564 actions
565 }
566
567 pub fn void_tunnel_interface(&mut self, tunnel_id: &[u8; 32]) {
569 self.tunnel_table.void_tunnel_interface(tunnel_id);
570 }
571
572 pub fn tunnel_table(&self) -> &TunnelTable {
574 &self.tunnel_table
575 }
576
577 fn has_local_clients(&self) -> bool {
583 self.interfaces.values().any(|i| i.is_local_client)
584 }
585
586 fn packet_filter(&self, packet: &RawPacket) -> bool {
590 if packet.transport_id.is_some()
592 && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
593 {
594 if let Some(ref identity_hash) = self.config.identity_hash {
595 if packet.transport_id.as_ref() != Some(identity_hash) {
596 return false;
597 }
598 }
599 }
600
601 match packet.context {
603 constants::CONTEXT_KEEPALIVE
604 | constants::CONTEXT_RESOURCE_REQ
605 | constants::CONTEXT_RESOURCE_PRF
606 | constants::CONTEXT_RESOURCE
607 | constants::CONTEXT_CACHE_REQUEST
608 | constants::CONTEXT_CHANNEL => return true,
609 _ => {}
610 }
611
612 if packet.flags.destination_type == constants::DESTINATION_PLAIN
614 || packet.flags.destination_type == constants::DESTINATION_GROUP
615 {
616 if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
617 return packet.hops <= 1;
618 } else {
619 return false;
621 }
622 }
623
624 if !self.packet_hashlist.is_duplicate(&packet.packet_hash) {
626 return true;
627 }
628
629 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
631 && packet.flags.destination_type == constants::DESTINATION_SINGLE
632 {
633 return true;
634 }
635
636 false
637 }
638
639 pub fn handle_inbound(
647 &mut self,
648 raw: &[u8],
649 iface: InterfaceId,
650 now: f64,
651 rng: &mut dyn Rng,
652 ) -> Vec<TransportAction> {
653 self.handle_inbound_with_announce_queue(raw, iface, now, rng, None)
654 }
655
656 pub fn handle_inbound_with_announce_queue(
657 &mut self,
658 raw: &[u8],
659 iface: InterfaceId,
660 now: f64,
661 rng: &mut dyn Rng,
662 announce_queue: Option<&mut AnnounceVerifyQueue>,
663 ) -> Vec<TransportAction> {
664 let mut actions = Vec::new();
665
666 let mut packet = match RawPacket::unpack(raw) {
668 Ok(p) => p,
669 Err(_) => return actions, };
671
672 let original_raw = raw.to_vec();
674
675 packet.hops += 1;
677
678 let from_local_client = self
681 .interfaces
682 .get(&iface)
683 .map(|i| i.is_local_client)
684 .unwrap_or(false);
685 if from_local_client {
686 packet.hops = packet.hops.saturating_sub(1);
687 }
688
689 if !self.packet_filter(&packet) {
691 return actions;
692 }
693
694 let mut remember_hash = true;
696
697 if self.link_table.contains_key(&packet.destination_hash) {
698 remember_hash = false;
699 }
700 if packet.flags.packet_type == constants::PACKET_TYPE_PROOF
701 && packet.context == constants::CONTEXT_LRPROOF
702 {
703 remember_hash = false;
704 }
705
706 if remember_hash {
707 self.packet_hashlist.add(packet.packet_hash);
708 }
709
710 if packet.flags.destination_type == constants::DESTINATION_PLAIN
712 && packet.flags.transport_type == constants::TRANSPORT_BROADCAST
713 && self.has_local_clients()
714 {
715 if from_local_client {
716 actions.push(TransportAction::ForwardPlainBroadcast {
718 raw: packet.raw.clone(),
719 to_local: false,
720 exclude: Some(iface),
721 });
722 } else {
723 actions.push(TransportAction::ForwardPlainBroadcast {
725 raw: packet.raw.clone(),
726 to_local: true,
727 exclude: None,
728 });
729 }
730 }
731
732 if self.config.transport_enabled || self.config.identity_hash.is_some() {
734 if packet.transport_id.is_some()
735 && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
736 {
737 if let Some(ref identity_hash) = self.config.identity_hash {
738 if packet.transport_id.as_ref() == Some(identity_hash) {
739 if let Some(path_entry) = self
740 .path_table
741 .get(&packet.destination_hash)
742 .and_then(|ps| ps.primary())
743 {
744 let next_hop = path_entry.next_hop;
745 let remaining_hops = path_entry.hops;
746 let outbound_interface = path_entry.receiving_interface;
747
748 let new_raw = forward_transport_packet(
749 &packet,
750 next_hop,
751 remaining_hops,
752 outbound_interface,
753 );
754
755 if packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST {
757 let proof_timeout = now
758 + constants::LINK_ESTABLISHMENT_TIMEOUT_PER_HOP
759 * (remaining_hops.max(1) as f64);
760
761 let (link_id, link_entry) = create_link_entry(
762 &packet,
763 next_hop,
764 outbound_interface,
765 remaining_hops,
766 iface,
767 now,
768 proof_timeout,
769 );
770 self.link_table.insert(link_id, link_entry);
771 actions.push(TransportAction::LinkRequestReceived {
772 link_id,
773 destination_hash: packet.destination_hash,
774 receiving_interface: iface,
775 });
776 } else {
777 let (trunc_hash, reverse_entry) =
778 create_reverse_entry(&packet, outbound_interface, iface, now);
779 self.reverse_table.insert(trunc_hash, reverse_entry);
780 }
781
782 actions.push(TransportAction::SendOnInterface {
783 interface: outbound_interface,
784 raw: new_raw,
785 });
786
787 if let Some(entry) = self
789 .path_table
790 .get_mut(&packet.destination_hash)
791 .and_then(|ps| ps.primary_mut())
792 {
793 entry.timestamp = now;
794 }
795 }
796 }
797 }
798 }
799
800 if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
802 && packet.flags.packet_type != constants::PACKET_TYPE_LINKREQUEST
803 && packet.context != constants::CONTEXT_LRPROOF
804 {
805 if let Some(link_entry) = self.link_table.get(&packet.destination_hash).cloned() {
806 if let Some((outbound_iface, new_raw)) =
807 route_via_link_table(&packet, &link_entry, iface)
808 {
809 self.packet_hashlist.add(packet.packet_hash);
811
812 actions.push(TransportAction::SendOnInterface {
813 interface: outbound_iface,
814 raw: new_raw,
815 });
816
817 if let Some(entry) = self.link_table.get_mut(&packet.destination_hash) {
819 entry.timestamp = now;
820 }
821 }
822 }
823 }
824 }
825
826 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE {
828 if let Some(queue) = announce_queue {
829 self.try_enqueue_announce(
830 &packet,
831 &original_raw,
832 iface,
833 now,
834 rng,
835 queue,
836 &mut actions,
837 );
838 } else {
839 self.process_inbound_announce(
840 &packet,
841 &original_raw,
842 iface,
843 now,
844 rng,
845 &mut actions,
846 );
847 }
848 }
849
850 if packet.flags.packet_type == constants::PACKET_TYPE_PROOF {
852 self.process_inbound_proof(&packet, iface, now, &mut actions);
853 }
854
855 if (packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
857 || packet.flags.packet_type == constants::PACKET_TYPE_DATA)
858 && self
859 .local_destinations
860 .contains_key(&packet.destination_hash)
861 {
862 actions.push(TransportAction::DeliverLocal {
863 destination_hash: packet.destination_hash,
864 raw: packet.raw.clone(),
865 packet_hash: packet.packet_hash,
866 receiving_interface: iface,
867 });
868 }
869
870 actions
871 }
872
873 fn process_inbound_announce(
878 &mut self,
879 packet: &RawPacket,
880 original_raw: &[u8],
881 iface: InterfaceId,
882 now: f64,
883 rng: &mut dyn Rng,
884 actions: &mut Vec<TransportAction>,
885 ) {
886 if packet.flags.destination_type != constants::DESTINATION_SINGLE {
887 return;
888 }
889
890 let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
891
892 let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
894 Ok(a) => a,
895 Err(_) => return,
896 };
897
898 let sig_cache_key =
899 Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
900
901 let validated = if self.announce_sig_cache.contains(&sig_cache_key) {
902 announce.to_validated_unchecked()
903 } else {
904 match announce.validate(&packet.destination_hash) {
905 Ok(v) => {
906 self.announce_sig_cache.insert(sig_cache_key, now);
907 v
908 }
909 Err(_) => return,
910 }
911 };
912
913 let received_from = self.announce_received_from(packet, now);
914 let random_blob = match extract_random_blob(&packet.data) {
915 Some(b) => b,
916 None => return,
917 };
918 let announce_emitted = timebase_from_random_blob(&random_blob);
919
920 self.process_verified_announce(
921 packet,
922 original_raw,
923 iface,
924 now,
925 rng,
926 validated,
927 received_from,
928 random_blob,
929 announce_emitted,
930 actions,
931 );
932 }
933
934 fn announce_sig_cache_key(destination_hash: [u8; 16], signature: &[u8; 64]) -> [u8; 32] {
935 let mut material = [0u8; 80];
936 material[..16].copy_from_slice(&destination_hash);
937 material[16..].copy_from_slice(signature);
938 hash::full_hash(&material)
939 }
940
941 fn announce_received_from(&mut self, packet: &RawPacket, now: f64) -> [u8; 16] {
942 if let Some(transport_id) = packet.transport_id {
943 if self.config.transport_enabled {
944 if let Some(announce_entry) = self.announce_table.get_mut(&packet.destination_hash)
945 {
946 if packet.hops.checked_sub(1) == Some(announce_entry.hops) {
947 announce_entry.local_rebroadcasts += 1;
948 if announce_entry.retries > 0
949 && announce_entry.local_rebroadcasts
950 >= constants::LOCAL_REBROADCASTS_MAX
951 {
952 self.announce_table.remove(&packet.destination_hash);
953 }
954 }
955 if let Some(announce_entry) = self.announce_table.get(&packet.destination_hash)
956 {
957 if packet.hops.checked_sub(1) == Some(announce_entry.hops + 1)
958 && announce_entry.retries > 0
959 && now < announce_entry.retransmit_timeout
960 {
961 self.announce_table.remove(&packet.destination_hash);
962 }
963 }
964 }
965 }
966 transport_id
967 } else {
968 packet.destination_hash
969 }
970 }
971
972 fn should_hold_announce(
973 &mut self,
974 packet: &RawPacket,
975 original_raw: &[u8],
976 iface: InterfaceId,
977 now: f64,
978 ) -> bool {
979 if self.has_path(&packet.destination_hash) {
980 return false;
981 }
982 if self
983 .discovery_path_requests
984 .contains_key(&packet.destination_hash)
985 {
986 return false;
987 }
988 let Some(info) = self.interfaces.get(&iface) else {
989 return false;
990 };
991 if packet.context == constants::CONTEXT_PATH_RESPONSE
992 || !self.ingress_control.should_ingress_limit(
993 iface,
994 &info.ingress_control,
995 info.ia_freq,
996 info.started,
997 now,
998 )
999 {
1000 return false;
1001 }
1002 self.ingress_control.hold_announce(
1003 iface,
1004 &info.ingress_control,
1005 packet.destination_hash,
1006 ingress_control::HeldAnnounce {
1007 raw: original_raw.to_vec(),
1008 hops: packet.hops,
1009 receiving_interface: iface,
1010 timestamp: now,
1011 },
1012 );
1013 true
1014 }
1015
1016 fn try_enqueue_announce(
1017 &mut self,
1018 packet: &RawPacket,
1019 original_raw: &[u8],
1020 iface: InterfaceId,
1021 now: f64,
1022 rng: &mut dyn Rng,
1023 announce_queue: &mut AnnounceVerifyQueue,
1024 actions: &mut Vec<TransportAction>,
1025 ) {
1026 if packet.flags.destination_type != constants::DESTINATION_SINGLE {
1027 return;
1028 }
1029
1030 let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
1031 let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
1032 Ok(a) => a,
1033 Err(_) => return,
1034 };
1035
1036 let received_from = self.announce_received_from(packet, now);
1037
1038 if self
1039 .local_destinations
1040 .contains_key(&packet.destination_hash)
1041 {
1042 log::debug!(
1043 "Announce:skipping local destination {:02x}{:02x}{:02x}{:02x}..",
1044 packet.destination_hash[0],
1045 packet.destination_hash[1],
1046 packet.destination_hash[2],
1047 packet.destination_hash[3],
1048 );
1049 return;
1050 }
1051
1052 if self.should_hold_announce(packet, original_raw, iface, now) {
1053 return;
1054 }
1055
1056 let sig_cache_key =
1057 Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
1058 if self.announce_sig_cache.contains(&sig_cache_key) {
1059 let validated = announce.to_validated_unchecked();
1060 let random_blob = match extract_random_blob(&packet.data) {
1061 Some(b) => b,
1062 None => return,
1063 };
1064 let announce_emitted = timebase_from_random_blob(&random_blob);
1065 self.process_verified_announce(
1066 packet,
1067 original_raw,
1068 iface,
1069 now,
1070 rng,
1071 validated,
1072 received_from,
1073 random_blob,
1074 announce_emitted,
1075 actions,
1076 );
1077 return;
1078 }
1079
1080 if packet.context == constants::CONTEXT_PATH_RESPONSE {
1081 let Ok(validated) = announce.validate(&packet.destination_hash) else {
1082 return;
1083 };
1084 self.announce_sig_cache.insert(sig_cache_key, now);
1085 let random_blob = match extract_random_blob(&packet.data) {
1086 Some(b) => b,
1087 None => return,
1088 };
1089 let announce_emitted = timebase_from_random_blob(&random_blob);
1090 self.process_verified_announce(
1091 packet,
1092 original_raw,
1093 iface,
1094 now,
1095 rng,
1096 validated,
1097 received_from,
1098 random_blob,
1099 announce_emitted,
1100 actions,
1101 );
1102 return;
1103 }
1104
1105 let random_blob = match extract_random_blob(&packet.data) {
1106 Some(b) => b,
1107 None => return,
1108 };
1109 let announce_emitted = timebase_from_random_blob(&random_blob);
1110 let key = AnnounceVerifyKey {
1111 destination_hash: packet.destination_hash,
1112 random_blob,
1113 received_from,
1114 };
1115 let pending = PendingAnnounce {
1116 original_raw: original_raw.to_vec(),
1117 packet: packet.clone(),
1118 interface: iface,
1119 received_from,
1120 queued_at: now,
1121 best_hops: packet.hops,
1122 emission_ts: announce_emitted,
1123 random_blob,
1124 };
1125 let _ = announce_queue.enqueue(key, pending);
1126 }
1127
1128 pub fn complete_verified_announce(
1129 &mut self,
1130 pending: PendingAnnounce,
1131 validated: crate::announce::ValidatedAnnounce,
1132 sig_cache_key: [u8; 32],
1133 now: f64,
1134 rng: &mut dyn Rng,
1135 ) -> Vec<TransportAction> {
1136 self.announce_sig_cache.insert(sig_cache_key, now);
1137 let mut actions = Vec::new();
1138 self.process_verified_announce(
1139 &pending.packet,
1140 &pending.original_raw,
1141 pending.interface,
1142 now,
1143 rng,
1144 validated,
1145 pending.received_from,
1146 pending.random_blob,
1147 pending.emission_ts,
1148 &mut actions,
1149 );
1150 actions
1151 }
1152
1153 pub fn clear_failed_verified_announce(&mut self, _sig_cache_key: [u8; 32], _now: f64) {}
1154
1155 fn process_verified_announce(
1156 &mut self,
1157 packet: &RawPacket,
1158 original_raw: &[u8],
1159 iface: InterfaceId,
1160 now: f64,
1161 rng: &mut dyn Rng,
1162 validated: crate::announce::ValidatedAnnounce,
1163 received_from: [u8; 16],
1164 random_blob: [u8; 10],
1165 announce_emitted: u64,
1166 actions: &mut Vec<TransportAction>,
1167 ) {
1168 if self.is_blackholed(&validated.identity_hash, now) {
1169 return;
1170 }
1171 if packet.hops > constants::PATHFINDER_M {
1172 return;
1173 }
1174
1175 let existing_set = self.path_table.get(&packet.destination_hash);
1177 let is_unresponsive = self.path_is_unresponsive(&packet.destination_hash);
1178
1179 let mp_decision = decide_announce_multipath(
1180 existing_set,
1181 packet.hops,
1182 announce_emitted,
1183 &random_blob,
1184 &received_from,
1185 is_unresponsive,
1186 now,
1187 self.config.prefer_shorter_path,
1188 );
1189
1190 if mp_decision == MultiPathDecision::Reject {
1191 log::debug!(
1192 "Announce:path decision REJECT for dest={:02x}{:02x}{:02x}{:02x}..",
1193 packet.destination_hash[0],
1194 packet.destination_hash[1],
1195 packet.destination_hash[2],
1196 packet.destination_hash[3],
1197 );
1198 return;
1199 }
1200
1201 let rate_blocked = if packet.context != constants::CONTEXT_PATH_RESPONSE {
1203 if let Some(iface_info) = self.interfaces.get(&iface) {
1204 self.rate_limiter.check_and_update(
1205 &packet.destination_hash,
1206 now,
1207 iface_info.announce_rate_target,
1208 iface_info.announce_rate_grace,
1209 iface_info.announce_rate_penalty,
1210 )
1211 } else {
1212 false
1213 }
1214 } else {
1215 false
1216 };
1217
1218 let interface_mode = self
1220 .interfaces
1221 .get(&iface)
1222 .map(|i| i.mode)
1223 .unwrap_or(constants::MODE_FULL);
1224
1225 let expires = compute_path_expires(now, interface_mode);
1226
1227 let existing_blobs = self
1229 .path_table
1230 .get(&packet.destination_hash)
1231 .and_then(|ps| ps.find_by_next_hop(&received_from))
1232 .map(|e| e.random_blobs.clone())
1233 .unwrap_or_default();
1234
1235 let mut rng_bytes = [0u8; 8];
1237 rng.fill_bytes(&mut rng_bytes);
1238 let rng_value = (u64::from_le_bytes(rng_bytes) as f64) / (u64::MAX as f64);
1239
1240 let is_path_response = packet.context == constants::CONTEXT_PATH_RESPONSE;
1241
1242 let (path_entry, announce_entry) = announce_proc::process_validated_announce(
1243 packet.destination_hash,
1244 packet.hops,
1245 &packet.data,
1246 &packet.raw,
1247 packet.packet_hash,
1248 packet.flags.context_flag,
1249 received_from,
1250 iface,
1251 now,
1252 existing_blobs,
1253 random_blob,
1254 expires,
1255 rng_value,
1256 self.config.transport_enabled,
1257 is_path_response,
1258 rate_blocked,
1259 Some(original_raw.to_vec()),
1260 );
1261
1262 actions.push(TransportAction::CacheAnnounce {
1264 packet_hash: packet.packet_hash,
1265 raw: original_raw.to_vec(),
1266 });
1267
1268 self.upsert_path_destination(packet.destination_hash, path_entry, now);
1270
1271 if let Some(tunnel_id) = self.interfaces.get(&iface).and_then(|i| i.tunnel_id) {
1273 let blobs = self
1274 .path_table
1275 .get(&packet.destination_hash)
1276 .and_then(|ps| ps.find_by_next_hop(&received_from))
1277 .map(|e| e.random_blobs.clone())
1278 .unwrap_or_default();
1279 self.tunnel_table.store_tunnel_path(
1280 &tunnel_id,
1281 packet.destination_hash,
1282 tunnel::TunnelPath {
1283 timestamp: now,
1284 received_from,
1285 hops: packet.hops,
1286 expires,
1287 random_blobs: blobs,
1288 packet_hash: packet.packet_hash,
1289 },
1290 now,
1291 self.config.destination_timeout_secs,
1292 self.config.max_tunnel_destinations_total,
1293 );
1294 }
1295
1296 self.path_states.remove(&packet.destination_hash);
1298
1299 if let Some(ann) = announce_entry {
1301 self.insert_announce_entry(packet.destination_hash, ann, now);
1302 }
1303
1304 actions.push(TransportAction::AnnounceReceived {
1306 destination_hash: packet.destination_hash,
1307 identity_hash: validated.identity_hash,
1308 public_key: validated.public_key,
1309 name_hash: validated.name_hash,
1310 random_hash: validated.random_hash,
1311 app_data: validated.app_data,
1312 hops: packet.hops,
1313 receiving_interface: iface,
1314 });
1315
1316 actions.push(TransportAction::PathUpdated {
1317 destination_hash: packet.destination_hash,
1318 hops: packet.hops,
1319 next_hop: received_from,
1320 interface: iface,
1321 });
1322
1323 if self.has_local_clients() {
1325 actions.push(TransportAction::ForwardToLocalClients {
1326 raw: packet.raw.clone(),
1327 exclude: Some(iface),
1328 });
1329 }
1330
1331 if let Some(pr_entry) = self.discovery_path_requests_waiting(&packet.destination_hash) {
1333 let entry = AnnounceEntry {
1335 timestamp: now,
1336 retransmit_timeout: now,
1337 retries: constants::PATHFINDER_R,
1338 received_from,
1339 hops: packet.hops,
1340 packet_raw: packet.raw.clone(),
1341 packet_data: packet.data.clone(),
1342 destination_hash: packet.destination_hash,
1343 context_flag: packet.flags.context_flag,
1344 local_rebroadcasts: 0,
1345 block_rebroadcasts: true,
1346 attached_interface: Some(pr_entry),
1347 };
1348 self.insert_announce_entry(packet.destination_hash, entry, now);
1349 }
1350 }
1351
1352 pub fn announce_sig_cache_contains(&self, sig_cache_key: &[u8; 32]) -> bool {
1353 self.announce_sig_cache.contains(sig_cache_key)
1354 }
1355
1356 fn discovery_path_requests_waiting(&mut self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
1359 self.discovery_path_requests
1360 .remove(dest_hash)
1361 .map(|req| req.requesting_interface)
1362 }
1363
1364 fn process_inbound_proof(
1369 &mut self,
1370 packet: &RawPacket,
1371 iface: InterfaceId,
1372 _now: f64,
1373 actions: &mut Vec<TransportAction>,
1374 ) {
1375 if packet.context == constants::CONTEXT_LRPROOF {
1376 if (self.config.transport_enabled)
1378 && self.link_table.contains_key(&packet.destination_hash)
1379 {
1380 let link_entry = self.link_table.get(&packet.destination_hash).cloned();
1381 if let Some(entry) = link_entry {
1382 if let Some((outbound_interface, new_raw)) =
1383 route_via_link_table(packet, &entry, iface)
1384 {
1385 if let Some(le) = self.link_table.get_mut(&packet.destination_hash) {
1390 le.validated = true;
1391 }
1392
1393 actions.push(TransportAction::LinkEstablished {
1394 link_id: packet.destination_hash,
1395 interface: outbound_interface,
1396 });
1397
1398 actions.push(TransportAction::SendOnInterface {
1399 interface: outbound_interface,
1400 raw: new_raw,
1401 });
1402 }
1403 }
1404 } else {
1405 actions.push(TransportAction::DeliverLocal {
1407 destination_hash: packet.destination_hash,
1408 raw: packet.raw.clone(),
1409 packet_hash: packet.packet_hash,
1410 receiving_interface: iface,
1411 });
1412 }
1413 } else {
1414 if self.config.transport_enabled {
1416 if let Some(reverse_entry) = self.reverse_table.remove(&packet.destination_hash) {
1417 if let Some(action) = route_proof_via_reverse(packet, &reverse_entry, iface) {
1418 actions.push(action);
1419 }
1420 }
1421 }
1422
1423 actions.push(TransportAction::DeliverLocal {
1425 destination_hash: packet.destination_hash,
1426 raw: packet.raw.clone(),
1427 packet_hash: packet.packet_hash,
1428 receiving_interface: iface,
1429 });
1430 }
1431 }
1432
1433 pub fn handle_outbound(
1439 &mut self,
1440 packet: &RawPacket,
1441 dest_type: u8,
1442 attached_interface: Option<InterfaceId>,
1443 now: f64,
1444 ) -> Vec<TransportAction> {
1445 let actions = route_outbound(
1446 &self.path_table,
1447 &self.interfaces,
1448 &self.local_destinations,
1449 packet,
1450 dest_type,
1451 attached_interface,
1452 now,
1453 );
1454
1455 self.packet_hashlist.add(packet.packet_hash);
1457
1458 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE && packet.hops > 0 {
1460 self.gate_announce_actions(actions, &packet.destination_hash, packet.hops, now)
1461 } else {
1462 actions
1463 }
1464 }
1465
1466 fn gate_announce_actions(
1468 &mut self,
1469 actions: Vec<TransportAction>,
1470 dest_hash: &[u8; 16],
1471 hops: u8,
1472 now: f64,
1473 ) -> Vec<TransportAction> {
1474 let mut result = Vec::new();
1475 for action in actions {
1476 match action {
1477 TransportAction::SendOnInterface { interface, raw } => {
1478 let (bitrate, announce_cap) =
1479 if let Some(info) = self.interfaces.get(&interface) {
1480 (info.bitrate, info.announce_cap)
1481 } else {
1482 (None, constants::ANNOUNCE_CAP)
1483 };
1484 if let Some(send_action) = self.announce_queues.gate_announce(
1485 interface,
1486 raw,
1487 *dest_hash,
1488 hops,
1489 now,
1490 now,
1491 bitrate,
1492 announce_cap,
1493 ) {
1494 result.push(send_action);
1495 }
1496 }
1498 other => result.push(other),
1499 }
1500 }
1501 result
1502 }
1503
1504 pub fn tick(&mut self, now: f64, rng: &mut dyn Rng) -> Vec<TransportAction> {
1510 let mut actions = Vec::new();
1511
1512 if now > self.announces_last_checked + constants::ANNOUNCES_CHECK_INTERVAL {
1514 self.cull_expired_announce_entries(now);
1515 self.enforce_announce_retention_cap(now);
1516 if let Some(ref identity_hash) = self.config.identity_hash {
1517 let ih = *identity_hash;
1518 let announce_actions = jobs::process_pending_announces(
1519 &mut self.announce_table,
1520 &mut self.held_announces,
1521 &ih,
1522 now,
1523 );
1524 let gated = self.gate_retransmit_actions(announce_actions, now);
1526 actions.extend(gated);
1527 }
1528 self.cull_expired_announce_entries(now);
1529 self.enforce_announce_retention_cap(now);
1530 self.announces_last_checked = now;
1531 }
1532
1533 let mut queue_actions = self.announce_queues.process_queues(now, &self.interfaces);
1535 actions.append(&mut queue_actions);
1536
1537 let ic_interfaces = self.ingress_control.interfaces_with_held();
1539 for iface_id in ic_interfaces {
1540 let (ia_freq, started, ingress_config) = match self.interfaces.get(&iface_id) {
1541 Some(info) => (info.ia_freq, info.started, info.ingress_control),
1542 None => continue,
1543 };
1544 if !ingress_config.enabled {
1545 continue;
1546 }
1547 if let Some(held) = self.ingress_control.process_held_announces(
1548 iface_id,
1549 &ingress_config,
1550 ia_freq,
1551 started,
1552 now,
1553 ) {
1554 let released_actions =
1555 self.handle_inbound(&held.raw, held.receiving_interface, now, rng);
1556 actions.extend(released_actions);
1557 }
1558 }
1559
1560 if now > self.tables_last_culled + constants::TABLES_CULL_INTERVAL {
1562 jobs::cull_path_table(&mut self.path_table, &self.interfaces, now);
1563 jobs::cull_reverse_table(&mut self.reverse_table, &self.interfaces, now);
1564 let (_culled, link_closed_actions) =
1565 jobs::cull_link_table(&mut self.link_table, &self.interfaces, now);
1566 actions.extend(link_closed_actions);
1567 jobs::cull_path_states(&mut self.path_states, &self.path_table);
1568 self.cull_blackholed(now);
1569 self.discovery_path_requests
1571 .retain(|_, req| now - req.timestamp < constants::DISCOVERY_PATH_REQUEST_TIMEOUT);
1572 self.tunnel_table
1574 .void_missing_interfaces(|id| self.interfaces.contains_key(id));
1575 self.tunnel_table.cull(now);
1576 self.announce_sig_cache.cull(now);
1577 self.tables_last_culled = now;
1578 }
1579
1580 actions
1581 }
1582
1583 fn gate_retransmit_actions(
1588 &mut self,
1589 actions: Vec<TransportAction>,
1590 now: f64,
1591 ) -> Vec<TransportAction> {
1592 let mut result = Vec::new();
1593 for action in actions {
1594 match action {
1595 TransportAction::SendOnInterface { interface, raw } => {
1596 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1598 let (bitrate, announce_cap) =
1599 if let Some(info) = self.interfaces.get(&interface) {
1600 (info.bitrate, info.announce_cap)
1601 } else {
1602 (None, constants::ANNOUNCE_CAP)
1603 };
1604 if let Some(send_action) = self.announce_queues.gate_announce(
1605 interface,
1606 raw,
1607 dest_hash,
1608 hops,
1609 now,
1610 now,
1611 bitrate,
1612 announce_cap,
1613 ) {
1614 result.push(send_action);
1615 }
1616 }
1617 TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1618 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1619 let iface_ids: Vec<(InterfaceId, Option<u64>, f64)> = self
1622 .interfaces
1623 .iter()
1624 .filter(|(_, info)| info.out_capable)
1625 .filter(|(id, _)| {
1626 if let Some(ref ex) = exclude {
1627 **id != *ex
1628 } else {
1629 true
1630 }
1631 })
1632 .filter(|(_, info)| {
1633 should_transmit_announce(
1634 info,
1635 &dest_hash,
1636 hops,
1637 &self.local_destinations,
1638 &self.path_table,
1639 &self.interfaces,
1640 )
1641 })
1642 .map(|(id, info)| (*id, info.bitrate, info.announce_cap))
1643 .collect();
1644
1645 for (iface_id, bitrate, announce_cap) in iface_ids {
1646 if let Some(send_action) = self.announce_queues.gate_announce(
1647 iface_id,
1648 raw.clone(),
1649 dest_hash,
1650 hops,
1651 now,
1652 now,
1653 bitrate,
1654 announce_cap,
1655 ) {
1656 result.push(send_action);
1657 }
1658 }
1659 }
1660 other => result.push(other),
1661 }
1662 }
1663 result
1664 }
1665
1666 fn extract_announce_info(raw: &[u8]) -> ([u8; 16], u8) {
1668 if raw.len() < 18 {
1669 return ([0; 16], 0);
1670 }
1671 let header_type = (raw[0] >> 6) & 0x03;
1672 let hops = raw[1];
1673 if header_type == constants::HEADER_2 && raw.len() >= 34 {
1674 let mut dest = [0u8; 16];
1676 dest.copy_from_slice(&raw[18..34]);
1677 (dest, hops)
1678 } else {
1679 let mut dest = [0u8; 16];
1681 dest.copy_from_slice(&raw[2..18]);
1682 (dest, hops)
1683 }
1684 }
1685
1686 pub fn handle_path_request(
1699 &mut self,
1700 data: &[u8],
1701 interface_id: InterfaceId,
1702 now: f64,
1703 ) -> Vec<TransportAction> {
1704 let mut actions = Vec::new();
1705
1706 if data.len() < 16 {
1707 return actions;
1708 }
1709
1710 let mut destination_hash = [0u8; 16];
1711 destination_hash.copy_from_slice(&data[..16]);
1712
1713 let _requesting_transport_id = if data.len() > 32 {
1715 let mut id = [0u8; 16];
1716 id.copy_from_slice(&data[16..32]);
1717 Some(id)
1718 } else {
1719 None
1720 };
1721
1722 let tag_bytes = if data.len() > 32 {
1724 Some(&data[32..])
1725 } else if data.len() > 16 {
1726 Some(&data[16..])
1727 } else {
1728 None
1729 };
1730
1731 if let Some(tag) = tag_bytes {
1732 let tag_len = tag.len().min(16);
1733 let mut unique_tag = [0u8; 32];
1734 unique_tag[..16].copy_from_slice(&destination_hash);
1735 unique_tag[16..16 + tag_len].copy_from_slice(&tag[..tag_len]);
1736
1737 if !self.insert_discovery_pr_tag(unique_tag) {
1738 return actions; }
1740 } else {
1741 return actions; }
1743
1744 if self.local_destinations.contains_key(&destination_hash) {
1746 return actions;
1747 }
1748
1749 if self.config.transport_enabled && self.has_path(&destination_hash) {
1751 let path = self
1752 .path_table
1753 .get(&destination_hash)
1754 .unwrap()
1755 .primary()
1756 .unwrap()
1757 .clone();
1758
1759 if let Some(recv_info) = self.interfaces.get(&interface_id) {
1763 if recv_info.mode == constants::MODE_ROAMING
1764 && path.receiving_interface == interface_id
1765 {
1766 return actions;
1767 }
1768 }
1769
1770 if let Some(ref raw) = path.announce_raw {
1774 if let Some(existing) = self.announce_table.remove(&destination_hash) {
1776 self.insert_held_announce(destination_hash, existing, now);
1777 }
1778 let retransmit_timeout =
1779 if let Some(iface_info) = self.interfaces.get(&interface_id) {
1780 let base = now + constants::PATH_REQUEST_GRACE;
1781 if iface_info.mode == constants::MODE_ROAMING {
1782 base + constants::PATH_REQUEST_RG
1783 } else {
1784 base
1785 }
1786 } else {
1787 now + constants::PATH_REQUEST_GRACE
1788 };
1789
1790 let (packet_data, context_flag) = match RawPacket::unpack(raw) {
1791 Ok(parsed) => (parsed.data, parsed.flags.context_flag),
1792 Err(_) => {
1793 return actions;
1794 }
1795 };
1796
1797 let entry = AnnounceEntry {
1798 timestamp: now,
1799 retransmit_timeout,
1800 retries: constants::PATHFINDER_R,
1801 received_from: path.next_hop,
1802 hops: path.hops,
1803 packet_raw: raw.clone(),
1804 packet_data,
1805 destination_hash,
1806 context_flag,
1807 local_rebroadcasts: 0,
1808 block_rebroadcasts: true,
1809 attached_interface: Some(interface_id),
1810 };
1811
1812 self.insert_announce_entry(destination_hash, entry, now);
1813 }
1814 } else if self.config.transport_enabled {
1815 let should_discover = self
1817 .interfaces
1818 .get(&interface_id)
1819 .map(|info| constants::DISCOVER_PATHS_FOR.contains(&info.mode))
1820 .unwrap_or(false);
1821
1822 if should_discover {
1823 self.discovery_path_requests.insert(
1825 destination_hash,
1826 DiscoveryPathRequest {
1827 timestamp: now,
1828 requesting_interface: interface_id,
1829 },
1830 );
1831
1832 for (_, iface_info) in self.interfaces.iter() {
1834 if iface_info.id != interface_id && iface_info.out_capable {
1835 actions.push(TransportAction::SendOnInterface {
1836 interface: iface_info.id,
1837 raw: data.to_vec(),
1838 });
1839 }
1840 }
1841 }
1842 }
1843
1844 actions
1845 }
1846
1847 pub fn path_table_entries(&self) -> impl Iterator<Item = (&[u8; 16], &PathEntry)> {
1853 self.path_table
1854 .iter()
1855 .filter_map(|(k, ps)| ps.primary().map(|e| (k, e)))
1856 }
1857
1858 pub fn path_table_sets(&self) -> impl Iterator<Item = (&[u8; 16], &PathSet)> {
1860 self.path_table.iter()
1861 }
1862
1863 pub fn interface_count(&self) -> usize {
1865 self.interfaces.len()
1866 }
1867
1868 pub fn link_table_count(&self) -> usize {
1870 self.link_table.len()
1871 }
1872
1873 pub fn path_table_count(&self) -> usize {
1875 self.path_table.len()
1876 }
1877
1878 pub fn announce_table_count(&self) -> usize {
1880 self.announce_table.len()
1881 }
1882
1883 pub fn reverse_table_count(&self) -> usize {
1885 self.reverse_table.len()
1886 }
1887
1888 pub fn held_announces_count(&self) -> usize {
1890 self.held_announces.len()
1891 }
1892
1893 pub fn packet_hashlist_len(&self) -> usize {
1895 self.packet_hashlist.len()
1896 }
1897
1898 pub fn announce_sig_cache_len(&self) -> usize {
1900 self.announce_sig_cache.len()
1901 }
1902
1903 pub fn rate_limiter_count(&self) -> usize {
1905 self.rate_limiter.len()
1906 }
1907
1908 pub fn blackholed_count(&self) -> usize {
1910 self.blackholed_identities.len()
1911 }
1912
1913 pub fn tunnel_count(&self) -> usize {
1915 self.tunnel_table.len()
1916 }
1917
1918 pub fn discovery_pr_tags_count(&self) -> usize {
1920 self.discovery_pr_tags.len()
1921 }
1922
1923 pub fn discovery_path_requests_count(&self) -> usize {
1925 self.discovery_path_requests.len()
1926 }
1927
1928 pub fn announce_queue_count(&self) -> usize {
1930 self.announce_queues.queue_count()
1931 }
1932
1933 pub fn nonempty_announce_queue_count(&self) -> usize {
1935 self.announce_queues.nonempty_queue_count()
1936 }
1937
1938 pub fn queued_announce_count(&self) -> usize {
1940 self.announce_queues.total_queued_announces()
1941 }
1942
1943 pub fn queued_announce_bytes(&self) -> usize {
1945 self.announce_queues.total_queued_bytes()
1946 }
1947
1948 pub fn announce_queue_interface_cap_drop_count(&self) -> u64 {
1950 self.announce_queues.interface_cap_drop_count()
1951 }
1952
1953 pub fn local_destinations_count(&self) -> usize {
1955 self.local_destinations.len()
1956 }
1957
1958 pub fn rate_limiter(&self) -> &AnnounceRateLimiter {
1960 &self.rate_limiter
1961 }
1962
1963 pub fn interface_info(&self, id: &InterfaceId) -> Option<&InterfaceInfo> {
1965 self.interfaces.get(id)
1966 }
1967
1968 pub fn redirect_path(&mut self, dest_hash: &[u8; 16], interface: InterfaceId, now: f64) {
1971 if let Some(entry) = self
1972 .path_table
1973 .get_mut(dest_hash)
1974 .and_then(|ps| ps.primary_mut())
1975 {
1976 entry.receiving_interface = interface;
1977 entry.hops = 1;
1978 } else {
1979 self.upsert_path_destination(
1980 *dest_hash,
1981 PathEntry {
1982 timestamp: now,
1983 next_hop: [0u8; 16],
1984 hops: 1,
1985 expires: now + 3600.0,
1986 random_blobs: Vec::new(),
1987 receiving_interface: interface,
1988 packet_hash: [0u8; 32],
1989 announce_raw: None,
1990 },
1991 now,
1992 );
1993 }
1994 }
1995
1996 pub fn inject_path(&mut self, dest_hash: [u8; 16], entry: PathEntry) {
1998 self.upsert_path_destination(dest_hash, entry.clone(), entry.timestamp);
1999 }
2000
2001 pub fn drop_path(&mut self, dest_hash: &[u8; 16]) -> bool {
2003 self.path_table.remove(dest_hash).is_some()
2004 }
2005
2006 pub fn drop_all_via(&mut self, transport_hash: &[u8; 16]) -> usize {
2011 let mut removed = 0usize;
2012 for ps in self.path_table.values_mut() {
2013 let before = ps.len();
2014 ps.retain(|entry| &entry.next_hop != transport_hash);
2015 removed += before - ps.len();
2016 }
2017 self.path_table.retain(|_, ps| !ps.is_empty());
2018 removed
2019 }
2020
2021 pub fn drop_paths_for_interface(&mut self, interface: InterfaceId) -> usize {
2023 let mut removed = 0usize;
2024 let mut cleared_destinations = Vec::new();
2025 for (dest_hash, ps) in self.path_table.iter_mut() {
2026 let before = ps.len();
2027 ps.retain(|entry| entry.receiving_interface != interface);
2028 if ps.is_empty() {
2029 cleared_destinations.push(*dest_hash);
2030 }
2031 removed += before - ps.len();
2032 }
2033 self.path_table.retain(|_, ps| !ps.is_empty());
2034 for dest_hash in cleared_destinations {
2035 self.path_states.remove(&dest_hash);
2036 }
2037 removed
2038 }
2039
2040 pub fn drop_reverse_for_interface(&mut self, interface: InterfaceId) -> usize {
2042 let before = self.reverse_table.len();
2043 self.reverse_table.retain(|_, entry| {
2044 entry.receiving_interface != interface && entry.outbound_interface != interface
2045 });
2046 before - self.reverse_table.len()
2047 }
2048
2049 pub fn drop_links_for_interface(&mut self, interface: InterfaceId) -> usize {
2051 let before = self.link_table.len();
2052 self.link_table.retain(|_, entry| {
2053 entry.next_hop_interface != interface && entry.received_interface != interface
2054 });
2055 before - self.link_table.len()
2056 }
2057
2058 pub fn drop_announce_queues(&mut self) {
2060 self.announce_table.clear();
2061 self.held_announces.clear();
2062 self.announce_queues = AnnounceQueues::new(self.config.announce_queue_max_interfaces);
2063 self.ingress_control.clear();
2064 }
2065
2066 pub fn identity_hash(&self) -> Option<&[u8; 16]> {
2068 self.config.identity_hash.as_ref()
2069 }
2070
2071 pub fn transport_enabled(&self) -> bool {
2073 self.config.transport_enabled
2074 }
2075
2076 pub fn config(&self) -> &TransportConfig {
2078 &self.config
2079 }
2080
2081 pub fn set_packet_hashlist_max_entries(&mut self, max_entries: usize) {
2082 self.config.packet_hashlist_max_entries = max_entries;
2083 self.packet_hashlist = PacketHashlist::new(max_entries);
2084 }
2085
2086 pub fn get_path_table(&self, max_hops: Option<u8>) -> Vec<PathTableRow> {
2090 let mut result = Vec::new();
2091 for (dest_hash, ps) in self.path_table.iter() {
2092 if let Some(entry) = ps.primary() {
2093 if let Some(max) = max_hops {
2094 if entry.hops > max {
2095 continue;
2096 }
2097 }
2098 let iface_name = self
2099 .interfaces
2100 .get(&entry.receiving_interface)
2101 .map(|i| i.name.clone())
2102 .unwrap_or_else(|| {
2103 alloc::format!("Interface({})", entry.receiving_interface.0)
2104 });
2105 result.push((
2106 *dest_hash,
2107 entry.timestamp,
2108 entry.next_hop,
2109 entry.hops,
2110 entry.expires,
2111 iface_name,
2112 ));
2113 }
2114 }
2115 result
2116 }
2117
2118 pub fn get_rate_table(&self) -> Vec<RateTableRow> {
2121 self.rate_limiter
2122 .entries()
2123 .map(|(hash, entry)| {
2124 (
2125 *hash,
2126 entry.last,
2127 entry.rate_violations,
2128 entry.blocked_until,
2129 entry.timestamps.clone(),
2130 )
2131 })
2132 .collect()
2133 }
2134
2135 pub fn get_blackholed(&self) -> Vec<([u8; 16], f64, f64, Option<alloc::string::String>)> {
2138 self.blackholed_entries()
2139 .map(|(hash, entry)| (*hash, entry.created, entry.expires, entry.reason.clone()))
2140 .collect()
2141 }
2142
2143 pub fn active_destination_hashes(&self) -> alloc::collections::BTreeSet<[u8; 16]> {
2149 self.path_table.keys().copied().collect()
2150 }
2151
2152 pub fn path_destination_cap_evict_count(&self) -> usize {
2153 self.path_destination_cap_evict_count
2154 }
2155
2156 pub fn active_packet_hashes(&self) -> Vec<[u8; 32]> {
2158 self.path_table
2159 .values()
2160 .flat_map(|ps| ps.iter().map(|p| p.packet_hash))
2161 .collect()
2162 }
2163
2164 pub fn cull_rate_limiter(
2167 &mut self,
2168 active: &alloc::collections::BTreeSet<[u8; 16]>,
2169 now: f64,
2170 ttl_secs: f64,
2171 ) -> usize {
2172 self.rate_limiter.cull_stale(active, now, ttl_secs)
2173 }
2174
2175 pub fn update_interface_freq(&mut self, id: InterfaceId, ia_freq: f64) {
2181 if let Some(info) = self.interfaces.get_mut(&id) {
2182 info.ia_freq = ia_freq;
2183 }
2184 }
2185
2186 pub fn held_announce_count(&self, interface: &InterfaceId) -> usize {
2188 self.ingress_control.held_count(interface)
2189 }
2190
2191 #[cfg(test)]
2196 #[allow(dead_code)]
2197 pub(crate) fn path_table(&self) -> &BTreeMap<[u8; 16], PathSet> {
2198 &self.path_table
2199 }
2200
2201 #[cfg(test)]
2202 #[allow(dead_code)]
2203 pub(crate) fn announce_table(&self) -> &BTreeMap<[u8; 16], AnnounceEntry> {
2204 &self.announce_table
2205 }
2206
2207 #[cfg(test)]
2208 #[allow(dead_code)]
2209 pub(crate) fn held_announces(&self) -> &BTreeMap<[u8; 16], AnnounceEntry> {
2210 &self.held_announces
2211 }
2212
2213 #[cfg(test)]
2214 #[allow(dead_code)]
2215 pub(crate) fn announce_retained_bytes(&self) -> usize {
2216 self.announce_retained_bytes_total()
2217 }
2218
2219 #[cfg(test)]
2220 #[allow(dead_code)]
2221 pub(crate) fn reverse_table(&self) -> &BTreeMap<[u8; 16], tables::ReverseEntry> {
2222 &self.reverse_table
2223 }
2224
2225 #[cfg(test)]
2226 #[allow(dead_code)]
2227 pub(crate) fn link_table_ref(&self) -> &BTreeMap<[u8; 16], LinkEntry> {
2228 &self.link_table
2229 }
2230}
2231
2232#[cfg(test)]
2233mod tests {
2234 use super::*;
2235 use crate::packet::PacketFlags;
2236
2237 fn make_config(transport_enabled: bool) -> TransportConfig {
2238 TransportConfig {
2239 transport_enabled,
2240 identity_hash: if transport_enabled {
2241 Some([0x42; 16])
2242 } else {
2243 None
2244 },
2245 prefer_shorter_path: false,
2246 max_paths_per_destination: 1,
2247 packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
2248 max_discovery_pr_tags: constants::MAX_PR_TAGS,
2249 max_path_destinations: usize::MAX,
2250 max_tunnel_destinations_total: usize::MAX,
2251 destination_timeout_secs: constants::DESTINATION_TIMEOUT,
2252 announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
2253 announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
2254 announce_sig_cache_enabled: true,
2255 announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
2256 announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
2257 announce_queue_max_entries: 256,
2258 announce_queue_max_interfaces: 1024,
2259 }
2260 }
2261
2262 fn make_interface(id: u64, mode: u8) -> InterfaceInfo {
2263 InterfaceInfo {
2264 id: InterfaceId(id),
2265 name: String::from("test"),
2266 mode,
2267 out_capable: true,
2268 in_capable: true,
2269 bitrate: None,
2270 announce_rate_target: None,
2271 announce_rate_grace: 0,
2272 announce_rate_penalty: 0.0,
2273 announce_cap: constants::ANNOUNCE_CAP,
2274 is_local_client: false,
2275 wants_tunnel: false,
2276 tunnel_id: None,
2277 mtu: constants::MTU as u32,
2278 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
2279 ia_freq: 0.0,
2280 started: 0.0,
2281 }
2282 }
2283
2284 fn make_announce_entry(dest_hash: [u8; 16], timestamp: f64, fill_len: usize) -> AnnounceEntry {
2285 AnnounceEntry {
2286 timestamp,
2287 retransmit_timeout: timestamp,
2288 retries: 0,
2289 received_from: [0xAA; 16],
2290 hops: 2,
2291 packet_raw: vec![0x01; fill_len],
2292 packet_data: vec![0x02; fill_len],
2293 destination_hash: dest_hash,
2294 context_flag: 0,
2295 local_rebroadcasts: 0,
2296 block_rebroadcasts: false,
2297 attached_interface: None,
2298 }
2299 }
2300
2301 fn make_path_entry(
2302 timestamp: f64,
2303 hops: u8,
2304 receiving_interface: InterfaceId,
2305 next_hop: [u8; 16],
2306 ) -> PathEntry {
2307 PathEntry {
2308 timestamp,
2309 next_hop,
2310 hops,
2311 expires: timestamp + 10_000.0,
2312 random_blobs: Vec::new(),
2313 receiving_interface,
2314 packet_hash: [0; 32],
2315 announce_raw: None,
2316 }
2317 }
2318
2319 fn make_unique_tag(dest_hash: [u8; 16], tag: &[u8]) -> [u8; 32] {
2320 let mut unique_tag = [0u8; 32];
2321 let tag_len = tag.len().min(16);
2322 unique_tag[..16].copy_from_slice(&dest_hash);
2323 unique_tag[16..16 + tag_len].copy_from_slice(&tag[..tag_len]);
2324 unique_tag
2325 }
2326
2327 fn make_random_blob(timebase: u64) -> [u8; 10] {
2328 let mut blob = [0u8; 10];
2329 let bytes = timebase.to_be_bytes();
2330 blob[5..10].copy_from_slice(&bytes[3..8]);
2331 blob
2332 }
2333
2334 #[test]
2335 fn test_empty_engine() {
2336 let engine = TransportEngine::new(make_config(false));
2337 assert!(!engine.has_path(&[0; 16]));
2338 assert!(engine.hops_to(&[0; 16]).is_none());
2339 assert!(engine.next_hop(&[0; 16]).is_none());
2340 }
2341
2342 #[test]
2343 fn test_register_deregister_interface() {
2344 let mut engine = TransportEngine::new(make_config(false));
2345 engine.register_interface(make_interface(1, constants::MODE_FULL));
2346 assert!(engine.interfaces.contains_key(&InterfaceId(1)));
2347
2348 engine.deregister_interface(InterfaceId(1));
2349 assert!(!engine.interfaces.contains_key(&InterfaceId(1)));
2350 }
2351
2352 #[test]
2353 fn test_deregister_interface_removes_announce_queue_state() {
2354 let mut engine = TransportEngine::new(make_config(false));
2355 engine.register_interface(make_interface(1, constants::MODE_FULL));
2356
2357 let _ = engine.announce_queues.gate_announce(
2358 InterfaceId(1),
2359 vec![0x01; 100],
2360 [0xAA; 16],
2361 2,
2362 0.0,
2363 0.0,
2364 Some(1000),
2365 constants::ANNOUNCE_CAP,
2366 );
2367 let _ = engine.announce_queues.gate_announce(
2368 InterfaceId(1),
2369 vec![0x02; 100],
2370 [0xBB; 16],
2371 3,
2372 0.0,
2373 0.0,
2374 Some(1000),
2375 constants::ANNOUNCE_CAP,
2376 );
2377 assert_eq!(engine.announce_queue_count(), 1);
2378
2379 engine.deregister_interface(InterfaceId(1));
2380 assert_eq!(engine.announce_queue_count(), 0);
2381 }
2382
2383 #[test]
2384 fn test_deregister_interface_preserves_other_announce_queues() {
2385 let mut engine = TransportEngine::new(make_config(false));
2386 engine.register_interface(make_interface(1, constants::MODE_FULL));
2387 engine.register_interface(make_interface(2, constants::MODE_FULL));
2388
2389 let _ = engine.announce_queues.gate_announce(
2390 InterfaceId(1),
2391 vec![0x01; 100],
2392 [0xAA; 16],
2393 2,
2394 0.0,
2395 0.0,
2396 Some(1000),
2397 constants::ANNOUNCE_CAP,
2398 );
2399 let _ = engine.announce_queues.gate_announce(
2400 InterfaceId(1),
2401 vec![0x02; 100],
2402 [0xAB; 16],
2403 3,
2404 0.0,
2405 0.0,
2406 Some(1000),
2407 constants::ANNOUNCE_CAP,
2408 );
2409 let _ = engine.announce_queues.gate_announce(
2410 InterfaceId(2),
2411 vec![0x03; 100],
2412 [0xBA; 16],
2413 2,
2414 0.0,
2415 0.0,
2416 Some(1000),
2417 constants::ANNOUNCE_CAP,
2418 );
2419 let _ = engine.announce_queues.gate_announce(
2420 InterfaceId(2),
2421 vec![0x04; 100],
2422 [0xBB; 16],
2423 3,
2424 0.0,
2425 0.0,
2426 Some(1000),
2427 constants::ANNOUNCE_CAP,
2428 );
2429
2430 engine.deregister_interface(InterfaceId(1));
2431 assert_eq!(engine.announce_queue_count(), 1);
2432 assert_eq!(engine.nonempty_announce_queue_count(), 1);
2433 }
2434
2435 #[test]
2436 fn test_register_deregister_destination() {
2437 let mut engine = TransportEngine::new(make_config(false));
2438 let dest = [0x11; 16];
2439 engine.register_destination(dest, constants::DESTINATION_SINGLE);
2440 assert!(engine.local_destinations.contains_key(&dest));
2441
2442 engine.deregister_destination(&dest);
2443 assert!(!engine.local_destinations.contains_key(&dest));
2444 }
2445
2446 #[test]
2447 fn test_path_state() {
2448 let mut engine = TransportEngine::new(make_config(false));
2449 let dest = [0x22; 16];
2450
2451 assert!(!engine.path_is_unresponsive(&dest));
2452
2453 engine.mark_path_unresponsive(&dest, None);
2454 assert!(engine.path_is_unresponsive(&dest));
2455
2456 engine.mark_path_responsive(&dest);
2457 assert!(!engine.path_is_unresponsive(&dest));
2458 }
2459
2460 #[test]
2461 fn test_boundary_exempts_unresponsive() {
2462 let mut engine = TransportEngine::new(make_config(false));
2463 engine.register_interface(make_interface(1, constants::MODE_BOUNDARY));
2464 let dest = [0xB1; 16];
2465
2466 engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
2468 assert!(!engine.path_is_unresponsive(&dest));
2469 }
2470
2471 #[test]
2472 fn test_non_boundary_marks_unresponsive() {
2473 let mut engine = TransportEngine::new(make_config(false));
2474 engine.register_interface(make_interface(1, constants::MODE_FULL));
2475 let dest = [0xB2; 16];
2476
2477 engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
2479 assert!(engine.path_is_unresponsive(&dest));
2480 }
2481
2482 #[test]
2483 fn test_expire_path() {
2484 let mut engine = TransportEngine::new(make_config(false));
2485 let dest = [0x33; 16];
2486
2487 engine.path_table.insert(
2488 dest,
2489 PathSet::from_single(
2490 PathEntry {
2491 timestamp: 1000.0,
2492 next_hop: [0; 16],
2493 hops: 2,
2494 expires: 9999.0,
2495 random_blobs: Vec::new(),
2496 receiving_interface: InterfaceId(1),
2497 packet_hash: [0; 32],
2498 announce_raw: None,
2499 },
2500 1,
2501 ),
2502 );
2503
2504 assert!(engine.has_path(&dest));
2505 engine.expire_path(&dest);
2506 assert!(engine.has_path(&dest));
2508 assert_eq!(engine.path_table[&dest].primary().unwrap().expires, 0.0);
2509 }
2510
2511 #[test]
2512 fn test_link_table_operations() {
2513 let mut engine = TransportEngine::new(make_config(false));
2514 let link_id = [0x44; 16];
2515
2516 engine.register_link(
2517 link_id,
2518 LinkEntry {
2519 timestamp: 100.0,
2520 next_hop_transport_id: [0; 16],
2521 next_hop_interface: InterfaceId(1),
2522 remaining_hops: 3,
2523 received_interface: InterfaceId(2),
2524 taken_hops: 2,
2525 destination_hash: [0xAA; 16],
2526 validated: false,
2527 proof_timeout: 200.0,
2528 },
2529 );
2530
2531 assert!(engine.link_table.contains_key(&link_id));
2532 assert!(!engine.link_table[&link_id].validated);
2533
2534 engine.validate_link(&link_id);
2535 assert!(engine.link_table[&link_id].validated);
2536
2537 engine.remove_link(&link_id);
2538 assert!(!engine.link_table.contains_key(&link_id));
2539 }
2540
2541 #[test]
2542 fn test_lrproof_routes_from_originating_side_via_link_table() {
2543 let mut engine = TransportEngine::new(make_config(true));
2544 engine.register_interface(make_interface(1, constants::MODE_FULL));
2545 engine.register_interface(make_interface(2, constants::MODE_FULL));
2546
2547 let link_id = [0x44; 16];
2548 engine.register_link(
2549 link_id,
2550 LinkEntry {
2551 timestamp: 100.0,
2552 next_hop_transport_id: [0xAA; 16],
2553 next_hop_interface: InterfaceId(2),
2554 remaining_hops: 3,
2555 received_interface: InterfaceId(1),
2556 taken_hops: 1,
2557 destination_hash: [0xBB; 16],
2558 validated: false,
2559 proof_timeout: 200.0,
2560 },
2561 );
2562
2563 let flags = PacketFlags {
2564 header_type: constants::HEADER_1,
2565 context_flag: constants::FLAG_UNSET,
2566 transport_type: constants::TRANSPORT_BROADCAST,
2567 destination_type: constants::DESTINATION_LINK,
2568 packet_type: constants::PACKET_TYPE_PROOF,
2569 };
2570 let packet = RawPacket::pack(
2571 flags,
2572 0,
2573 &link_id,
2574 None,
2575 constants::CONTEXT_LRPROOF,
2576 &[0xCC; 64],
2577 )
2578 .unwrap();
2579 let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
2580
2581 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 101.0, &mut rng);
2582
2583 assert!(matches!(
2584 engine
2585 .link_table_ref()
2586 .get(&link_id)
2587 .map(|entry| entry.validated),
2588 Some(true)
2589 ));
2590 assert!(actions.iter().any(|action| matches!(
2591 action,
2592 TransportAction::LinkEstablished {
2593 link_id: established,
2594 interface: InterfaceId(2),
2595 } if *established == link_id
2596 )));
2597 assert!(actions.iter().any(|action| matches!(
2598 action,
2599 TransportAction::SendOnInterface {
2600 interface: InterfaceId(2),
2601 ..
2602 }
2603 )));
2604 }
2605
2606 #[test]
2607 fn test_packet_filter_drops_plain_announce() {
2608 let engine = TransportEngine::new(make_config(false));
2609 let flags = PacketFlags {
2610 header_type: constants::HEADER_1,
2611 context_flag: constants::FLAG_UNSET,
2612 transport_type: constants::TRANSPORT_BROADCAST,
2613 destination_type: constants::DESTINATION_PLAIN,
2614 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2615 };
2616 let packet =
2617 RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2618 assert!(!engine.packet_filter(&packet));
2619 }
2620
2621 #[test]
2622 fn test_packet_filter_allows_keepalive() {
2623 let engine = TransportEngine::new(make_config(false));
2624 let flags = PacketFlags {
2625 header_type: constants::HEADER_1,
2626 context_flag: constants::FLAG_UNSET,
2627 transport_type: constants::TRANSPORT_BROADCAST,
2628 destination_type: constants::DESTINATION_SINGLE,
2629 packet_type: constants::PACKET_TYPE_DATA,
2630 };
2631 let packet = RawPacket::pack(
2632 flags,
2633 0,
2634 &[0; 16],
2635 None,
2636 constants::CONTEXT_KEEPALIVE,
2637 b"test",
2638 )
2639 .unwrap();
2640 assert!(engine.packet_filter(&packet));
2641 }
2642
2643 #[test]
2644 fn test_packet_filter_drops_high_hop_plain() {
2645 let engine = TransportEngine::new(make_config(false));
2646 let flags = PacketFlags {
2647 header_type: constants::HEADER_1,
2648 context_flag: constants::FLAG_UNSET,
2649 transport_type: constants::TRANSPORT_BROADCAST,
2650 destination_type: constants::DESTINATION_PLAIN,
2651 packet_type: constants::PACKET_TYPE_DATA,
2652 };
2653 let mut packet =
2654 RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2655 packet.hops = 2;
2656 assert!(!engine.packet_filter(&packet));
2657 }
2658
2659 #[test]
2660 fn test_packet_filter_allows_duplicate_single_announce() {
2661 let mut engine = TransportEngine::new(make_config(false));
2662 let flags = PacketFlags {
2663 header_type: constants::HEADER_1,
2664 context_flag: constants::FLAG_UNSET,
2665 transport_type: constants::TRANSPORT_BROADCAST,
2666 destination_type: constants::DESTINATION_SINGLE,
2667 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2668 };
2669 let packet = RawPacket::pack(
2670 flags,
2671 0,
2672 &[0; 16],
2673 None,
2674 constants::CONTEXT_NONE,
2675 &[0xAA; 64],
2676 )
2677 .unwrap();
2678
2679 engine.packet_hashlist.add(packet.packet_hash);
2681
2682 assert!(engine.packet_filter(&packet));
2684 }
2685
2686 #[test]
2687 fn test_packet_filter_fifo_eviction_allows_oldest_hash_again() {
2688 let mut engine = TransportEngine::new(make_config(false));
2689 engine.packet_hashlist = PacketHashlist::new(2);
2690
2691 let make_packet = |seed: u8| {
2692 let flags = PacketFlags {
2693 header_type: constants::HEADER_1,
2694 context_flag: constants::FLAG_UNSET,
2695 transport_type: constants::TRANSPORT_BROADCAST,
2696 destination_type: constants::DESTINATION_SINGLE,
2697 packet_type: constants::PACKET_TYPE_DATA,
2698 };
2699 RawPacket::pack(
2700 flags,
2701 0,
2702 &[seed; 16],
2703 None,
2704 constants::CONTEXT_NONE,
2705 &[seed; 4],
2706 )
2707 .unwrap()
2708 };
2709
2710 let packet1 = make_packet(1);
2711 let packet2 = make_packet(2);
2712 let packet3 = make_packet(3);
2713
2714 engine.packet_hashlist.add(packet1.packet_hash);
2715 engine.packet_hashlist.add(packet2.packet_hash);
2716 assert!(!engine.packet_filter(&packet1));
2717
2718 engine.packet_hashlist.add(packet3.packet_hash);
2719
2720 assert!(engine.packet_filter(&packet1));
2721 assert!(!engine.packet_filter(&packet2));
2722 assert!(!engine.packet_filter(&packet3));
2723 }
2724
2725 #[test]
2726 fn test_packet_filter_duplicate_does_not_refresh_recency() {
2727 let mut engine = TransportEngine::new(make_config(false));
2728 engine.packet_hashlist = PacketHashlist::new(2);
2729
2730 let make_packet = |seed: u8| {
2731 let flags = PacketFlags {
2732 header_type: constants::HEADER_1,
2733 context_flag: constants::FLAG_UNSET,
2734 transport_type: constants::TRANSPORT_BROADCAST,
2735 destination_type: constants::DESTINATION_SINGLE,
2736 packet_type: constants::PACKET_TYPE_DATA,
2737 };
2738 RawPacket::pack(
2739 flags,
2740 0,
2741 &[seed; 16],
2742 None,
2743 constants::CONTEXT_NONE,
2744 &[seed; 4],
2745 )
2746 .unwrap()
2747 };
2748
2749 let packet1 = make_packet(1);
2750 let packet2 = make_packet(2);
2751 let packet3 = make_packet(3);
2752
2753 engine.packet_hashlist.add(packet1.packet_hash);
2754 engine.packet_hashlist.add(packet2.packet_hash);
2755 engine.packet_hashlist.add(packet2.packet_hash);
2756 engine.packet_hashlist.add(packet3.packet_hash);
2757
2758 assert!(engine.packet_filter(&packet1));
2759 assert!(!engine.packet_filter(&packet2));
2760 assert!(!engine.packet_filter(&packet3));
2761 }
2762
2763 #[test]
2764 fn test_tick_retransmits_announce() {
2765 let mut engine = TransportEngine::new(make_config(true));
2766 engine.register_interface(make_interface(1, constants::MODE_FULL));
2767
2768 let dest = [0x55; 16];
2769 engine.insert_announce_entry(
2770 dest,
2771 AnnounceEntry {
2772 timestamp: 190.0,
2773 retransmit_timeout: 100.0, retries: 0,
2775 received_from: [0xAA; 16],
2776 hops: 2,
2777 packet_raw: vec![0x01, 0x02],
2778 packet_data: vec![0xCC; 10],
2779 destination_hash: dest,
2780 context_flag: 0,
2781 local_rebroadcasts: 0,
2782 block_rebroadcasts: false,
2783 attached_interface: None,
2784 },
2785 190.0,
2786 );
2787
2788 let mut rng = rns_crypto::FixedRng::new(&[0x42; 32]);
2789 let actions = engine.tick(200.0, &mut rng);
2790
2791 assert!(!actions.is_empty());
2794 assert!(matches!(
2795 &actions[0],
2796 TransportAction::SendOnInterface { .. }
2797 ));
2798
2799 assert_eq!(engine.announce_table[&dest].retries, 1);
2801 }
2802
2803 #[test]
2804 fn test_tick_culls_expired_announce_entries() {
2805 let mut config = make_config(true);
2806 config.announce_table_ttl_secs = 10.0;
2807 let mut engine = TransportEngine::new(config);
2808
2809 let dest1 = [0x61; 16];
2810 let dest2 = [0x62; 16];
2811 assert!(engine.insert_announce_entry(dest1, make_announce_entry(dest1, 100.0, 8), 100.0));
2812 assert!(engine.insert_held_announce(dest2, make_announce_entry(dest2, 100.0, 8), 100.0));
2813
2814 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2815 let _ = engine.tick(111.0, &mut rng);
2816
2817 assert!(!engine.announce_table().contains_key(&dest1));
2818 assert!(!engine.held_announces().contains_key(&dest2));
2819 }
2820
2821 #[test]
2822 fn test_announce_retention_cap_evicts_oldest_and_prefers_held_on_tie() {
2823 let sample_entry = make_announce_entry([0x70; 16], 100.0, 32);
2824 let mut config = make_config(true);
2825 config.announce_table_max_bytes = TransportEngine::announce_entry_size_bytes(&sample_entry)
2826 * 2
2827 + TransportEngine::announce_entry_size_bytes(&sample_entry) / 2;
2828 let max_bytes = config.announce_table_max_bytes;
2829 let mut engine = TransportEngine::new(config);
2830
2831 let held_dest = [0x71; 16];
2832 let active_dest = [0x72; 16];
2833 let newest_dest = [0x73; 16];
2834
2835 assert!(engine.insert_held_announce(
2836 held_dest,
2837 make_announce_entry(held_dest, 100.0, 32),
2838 100.0,
2839 ));
2840 assert!(engine.insert_announce_entry(
2841 active_dest,
2842 make_announce_entry(active_dest, 100.0, 32),
2843 100.0,
2844 ));
2845 assert!(engine.insert_announce_entry(
2846 newest_dest,
2847 make_announce_entry(newest_dest, 101.0, 32),
2848 101.0,
2849 ));
2850
2851 assert!(!engine.held_announces().contains_key(&held_dest));
2852 assert!(engine.announce_table().contains_key(&active_dest));
2853 assert!(engine.announce_table().contains_key(&newest_dest));
2854 assert!(engine.announce_retained_bytes() <= max_bytes);
2855 }
2856
2857 #[test]
2858 fn test_oversized_announce_entry_is_not_retained() {
2859 let mut config = make_config(true);
2860 config.announce_table_max_bytes = 200;
2861 let mut engine = TransportEngine::new(config);
2862 let dest = [0x81; 16];
2863
2864 assert!(!engine.insert_announce_entry(dest, make_announce_entry(dest, 100.0, 256), 100.0));
2865 assert!(!engine.announce_table().contains_key(&dest));
2866 assert_eq!(engine.announce_retained_bytes(), 0);
2867 }
2868
2869 #[test]
2870 fn test_blackhole_identity() {
2871 let mut engine = TransportEngine::new(make_config(false));
2872 let hash = [0xAA; 16];
2873 let now = 1000.0;
2874
2875 assert!(!engine.is_blackholed(&hash, now));
2876
2877 engine.blackhole_identity(hash, now, None, Some(String::from("test")));
2878 assert!(engine.is_blackholed(&hash, now));
2879 assert!(engine.is_blackholed(&hash, now + 999999.0)); assert!(engine.unblackhole_identity(&hash));
2882 assert!(!engine.is_blackholed(&hash, now));
2883 assert!(!engine.unblackhole_identity(&hash)); }
2885
2886 #[test]
2887 fn test_blackhole_with_duration() {
2888 let mut engine = TransportEngine::new(make_config(false));
2889 let hash = [0xBB; 16];
2890 let now = 1000.0;
2891
2892 engine.blackhole_identity(hash, now, Some(1.0), None); assert!(engine.is_blackholed(&hash, now));
2894 assert!(engine.is_blackholed(&hash, now + 3599.0)); assert!(!engine.is_blackholed(&hash, now + 3601.0)); }
2897
2898 #[test]
2899 fn test_cull_blackholed() {
2900 let mut engine = TransportEngine::new(make_config(false));
2901 let hash1 = [0xCC; 16];
2902 let hash2 = [0xDD; 16];
2903 let now = 1000.0;
2904
2905 engine.blackhole_identity(hash1, now, Some(1.0), None); engine.blackhole_identity(hash2, now, None, None); engine.cull_blackholed(now + 4000.0); assert!(!engine.blackholed_identities.contains_key(&hash1));
2911 assert!(engine.blackholed_identities.contains_key(&hash2));
2912 }
2913
2914 #[test]
2915 fn test_blackhole_blocks_announce() {
2916 use crate::announce::AnnounceData;
2917 use crate::destination::{destination_hash, name_hash};
2918
2919 let mut engine = TransportEngine::new(make_config(false));
2920 engine.register_interface(make_interface(1, constants::MODE_FULL));
2921
2922 let identity =
2923 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x55; 32]));
2924 let dest_hash = destination_hash("test", &["app"], Some(identity.hash()));
2925 let name_h = name_hash("test", &["app"]);
2926 let random_hash = [0x42u8; 10];
2927
2928 let (announce_data, _) =
2929 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2930
2931 let flags = PacketFlags {
2932 header_type: constants::HEADER_1,
2933 context_flag: constants::FLAG_UNSET,
2934 transport_type: constants::TRANSPORT_BROADCAST,
2935 destination_type: constants::DESTINATION_SINGLE,
2936 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2937 };
2938 let packet = RawPacket::pack(
2939 flags,
2940 0,
2941 &dest_hash,
2942 None,
2943 constants::CONTEXT_NONE,
2944 &announce_data,
2945 )
2946 .unwrap();
2947
2948 let now = 1000.0;
2950 engine.blackhole_identity(*identity.hash(), now, None, None);
2951
2952 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2953 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), now, &mut rng);
2954
2955 assert!(actions
2957 .iter()
2958 .all(|a| !matches!(a, TransportAction::AnnounceReceived { .. })));
2959 assert!(actions
2960 .iter()
2961 .all(|a| !matches!(a, TransportAction::PathUpdated { .. })));
2962 }
2963
2964 #[test]
2965 fn test_async_announce_retransmit_cleanup_happens_before_queueing() {
2966 use crate::announce::AnnounceData;
2967 use crate::destination::{destination_hash, name_hash};
2968 use crate::transport::announce_verify_queue::AnnounceVerifyQueue;
2969
2970 let mut engine = TransportEngine::new(make_config(true));
2971 engine.register_interface(make_interface(1, constants::MODE_FULL));
2972
2973 let identity =
2974 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x31; 32]));
2975 let dest_hash = destination_hash("async", &["announce"], Some(identity.hash()));
2976 let name_h = name_hash("async", &["announce"]);
2977 let random_hash = [0x44u8; 10];
2978 let (announce_data, _) =
2979 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2980
2981 let packet = RawPacket::pack(
2982 PacketFlags {
2983 header_type: constants::HEADER_2,
2984 context_flag: constants::FLAG_UNSET,
2985 transport_type: constants::TRANSPORT_TRANSPORT,
2986 destination_type: constants::DESTINATION_SINGLE,
2987 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2988 },
2989 3,
2990 &dest_hash,
2991 Some(&[0xBB; 16]),
2992 constants::CONTEXT_NONE,
2993 &announce_data,
2994 )
2995 .unwrap();
2996
2997 engine.announce_table.insert(
2998 dest_hash,
2999 AnnounceEntry {
3000 timestamp: 1000.0,
3001 retransmit_timeout: 2000.0,
3002 retries: constants::PATHFINDER_R,
3003 received_from: [0xBB; 16],
3004 hops: 2,
3005 packet_raw: packet.raw.clone(),
3006 packet_data: packet.data.clone(),
3007 destination_hash: dest_hash,
3008 context_flag: constants::FLAG_UNSET,
3009 local_rebroadcasts: 0,
3010 block_rebroadcasts: false,
3011 attached_interface: None,
3012 },
3013 );
3014
3015 let mut queue = AnnounceVerifyQueue::new(8);
3016 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3017 let actions = engine.handle_inbound_with_announce_queue(
3018 &packet.raw,
3019 InterfaceId(1),
3020 1000.0,
3021 &mut rng,
3022 Some(&mut queue),
3023 );
3024
3025 assert!(actions.is_empty());
3026 assert_eq!(queue.len(), 1);
3027 assert!(
3028 !engine.announce_table.contains_key(&dest_hash),
3029 "retransmit completion should clear announce_table before queueing"
3030 );
3031 }
3032
3033 #[test]
3034 fn test_async_announce_completion_inserts_sig_cache_and_prevents_requeue() {
3035 use crate::announce::AnnounceData;
3036 use crate::destination::{destination_hash, name_hash};
3037 use crate::transport::announce_verify_queue::AnnounceVerifyQueue;
3038
3039 let mut engine = TransportEngine::new(make_config(false));
3040 engine.register_interface(make_interface(1, constants::MODE_FULL));
3041
3042 let identity =
3043 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x52; 32]));
3044 let dest_hash = destination_hash("async", &["cache"], Some(identity.hash()));
3045 let name_h = name_hash("async", &["cache"]);
3046 let random_hash = [0x55u8; 10];
3047 let (announce_data, _) =
3048 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3049
3050 let packet = RawPacket::pack(
3051 PacketFlags {
3052 header_type: constants::HEADER_1,
3053 context_flag: constants::FLAG_UNSET,
3054 transport_type: constants::TRANSPORT_BROADCAST,
3055 destination_type: constants::DESTINATION_SINGLE,
3056 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3057 },
3058 0,
3059 &dest_hash,
3060 None,
3061 constants::CONTEXT_NONE,
3062 &announce_data,
3063 )
3064 .unwrap();
3065
3066 let mut queue = AnnounceVerifyQueue::new(8);
3067 let mut rng = rns_crypto::FixedRng::new(&[0x77; 32]);
3068 let actions = engine.handle_inbound_with_announce_queue(
3069 &packet.raw,
3070 InterfaceId(1),
3071 1000.0,
3072 &mut rng,
3073 Some(&mut queue),
3074 );
3075 assert!(actions.is_empty());
3076 assert_eq!(queue.len(), 1);
3077
3078 let mut batch = queue.take_pending(1000.0);
3079 assert_eq!(batch.len(), 1);
3080 let (key, pending) = batch.pop().unwrap();
3081
3082 let announce = AnnounceData::unpack(&pending.packet.data, false).unwrap();
3083 let validated = announce.validate(&pending.packet.destination_hash).unwrap();
3084 let mut material = [0u8; 80];
3085 material[..16].copy_from_slice(&pending.packet.destination_hash);
3086 material[16..].copy_from_slice(&announce.signature);
3087 let sig_cache_key = hash::full_hash(&material);
3088
3089 let pending = queue.complete_success(&key).unwrap();
3090 let actions =
3091 engine.complete_verified_announce(pending, validated, sig_cache_key, 1000.0, &mut rng);
3092 assert!(actions
3093 .iter()
3094 .any(|action| matches!(action, TransportAction::AnnounceReceived { .. })));
3095 assert!(engine.announce_sig_cache_contains(&sig_cache_key));
3096
3097 let actions = engine.handle_inbound_with_announce_queue(
3098 &packet.raw,
3099 InterfaceId(1),
3100 1001.0,
3101 &mut rng,
3102 Some(&mut queue),
3103 );
3104 assert!(actions.is_empty());
3105 assert_eq!(queue.len(), 0);
3106 }
3107
3108 #[test]
3109 fn test_tick_culls_expired_path() {
3110 let mut engine = TransportEngine::new(make_config(false));
3111 engine.register_interface(make_interface(1, constants::MODE_FULL));
3112
3113 let dest = [0x66; 16];
3114 engine.path_table.insert(
3115 dest,
3116 PathSet::from_single(
3117 PathEntry {
3118 timestamp: 100.0,
3119 next_hop: [0; 16],
3120 hops: 2,
3121 expires: 200.0,
3122 random_blobs: Vec::new(),
3123 receiving_interface: InterfaceId(1),
3124 packet_hash: [0; 32],
3125 announce_raw: None,
3126 },
3127 1,
3128 ),
3129 );
3130
3131 assert!(engine.has_path(&dest));
3132
3133 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3134 engine.tick(300.0, &mut rng);
3136
3137 assert!(!engine.has_path(&dest));
3138 }
3139
3140 fn make_local_client_interface(id: u64) -> InterfaceInfo {
3145 InterfaceInfo {
3146 id: InterfaceId(id),
3147 name: String::from("local_client"),
3148 mode: constants::MODE_FULL,
3149 out_capable: true,
3150 in_capable: true,
3151 bitrate: None,
3152 announce_rate_target: None,
3153 announce_rate_grace: 0,
3154 announce_rate_penalty: 0.0,
3155 announce_cap: constants::ANNOUNCE_CAP,
3156 is_local_client: true,
3157 wants_tunnel: false,
3158 tunnel_id: None,
3159 mtu: constants::MTU as u32,
3160 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3161 ia_freq: 0.0,
3162 started: 0.0,
3163 }
3164 }
3165
3166 #[test]
3167 fn test_has_local_clients() {
3168 let mut engine = TransportEngine::new(make_config(false));
3169 assert!(!engine.has_local_clients());
3170
3171 engine.register_interface(make_interface(1, constants::MODE_FULL));
3172 assert!(!engine.has_local_clients());
3173
3174 engine.register_interface(make_local_client_interface(2));
3175 assert!(engine.has_local_clients());
3176
3177 engine.deregister_interface(InterfaceId(2));
3178 assert!(!engine.has_local_clients());
3179 }
3180
3181 #[test]
3182 fn test_local_client_hop_decrement() {
3183 let mut engine = TransportEngine::new(make_config(false));
3186 engine.register_interface(make_local_client_interface(1));
3187 engine.register_interface(make_interface(2, constants::MODE_FULL));
3188
3189 let dest = [0xAA; 16];
3191 engine.register_destination(dest, constants::DESTINATION_PLAIN);
3192
3193 let flags = PacketFlags {
3194 header_type: constants::HEADER_1,
3195 context_flag: constants::FLAG_UNSET,
3196 transport_type: constants::TRANSPORT_BROADCAST,
3197 destination_type: constants::DESTINATION_PLAIN,
3198 packet_type: constants::PACKET_TYPE_DATA,
3199 };
3200 let packet =
3202 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
3203
3204 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3205 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3206
3207 let deliver = actions
3210 .iter()
3211 .find(|a| matches!(a, TransportAction::DeliverLocal { .. }));
3212 assert!(deliver.is_some(), "Should deliver locally");
3213 }
3214
3215 #[test]
3216 fn test_plain_broadcast_from_local_client() {
3217 let mut engine = TransportEngine::new(make_config(false));
3219 engine.register_interface(make_local_client_interface(1));
3220 engine.register_interface(make_interface(2, constants::MODE_FULL));
3221
3222 let dest = [0xBB; 16];
3223 let flags = PacketFlags {
3224 header_type: constants::HEADER_1,
3225 context_flag: constants::FLAG_UNSET,
3226 transport_type: constants::TRANSPORT_BROADCAST,
3227 destination_type: constants::DESTINATION_PLAIN,
3228 packet_type: constants::PACKET_TYPE_DATA,
3229 };
3230 let packet =
3231 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3232
3233 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3234 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3235
3236 let forward = actions.iter().find(|a| {
3238 matches!(
3239 a,
3240 TransportAction::ForwardPlainBroadcast {
3241 to_local: false,
3242 ..
3243 }
3244 )
3245 });
3246 assert!(forward.is_some(), "Should forward to external interfaces");
3247 }
3248
3249 #[test]
3250 fn test_plain_broadcast_from_external() {
3251 let mut engine = TransportEngine::new(make_config(false));
3253 engine.register_interface(make_local_client_interface(1));
3254 engine.register_interface(make_interface(2, constants::MODE_FULL));
3255
3256 let dest = [0xCC; 16];
3257 let flags = PacketFlags {
3258 header_type: constants::HEADER_1,
3259 context_flag: constants::FLAG_UNSET,
3260 transport_type: constants::TRANSPORT_BROADCAST,
3261 destination_type: constants::DESTINATION_PLAIN,
3262 packet_type: constants::PACKET_TYPE_DATA,
3263 };
3264 let packet =
3265 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3266
3267 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3268 let actions = engine.handle_inbound(&packet.raw, InterfaceId(2), 1000.0, &mut rng);
3269
3270 let forward = actions.iter().find(|a| {
3272 matches!(
3273 a,
3274 TransportAction::ForwardPlainBroadcast { to_local: true, .. }
3275 )
3276 });
3277 assert!(forward.is_some(), "Should forward to local clients");
3278 }
3279
3280 #[test]
3281 fn test_no_plain_broadcast_bridging_without_local_clients() {
3282 let mut engine = TransportEngine::new(make_config(false));
3284 engine.register_interface(make_interface(1, constants::MODE_FULL));
3285 engine.register_interface(make_interface(2, constants::MODE_FULL));
3286
3287 let dest = [0xDD; 16];
3288 let flags = PacketFlags {
3289 header_type: constants::HEADER_1,
3290 context_flag: constants::FLAG_UNSET,
3291 transport_type: constants::TRANSPORT_BROADCAST,
3292 destination_type: constants::DESTINATION_PLAIN,
3293 packet_type: constants::PACKET_TYPE_DATA,
3294 };
3295 let packet =
3296 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3297
3298 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3299 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3300
3301 let has_forward = actions
3303 .iter()
3304 .any(|a| matches!(a, TransportAction::ForwardPlainBroadcast { .. }));
3305 assert!(!has_forward, "No bridging without local clients");
3306 }
3307
3308 #[test]
3309 fn test_announce_forwarded_to_local_clients() {
3310 use crate::announce::AnnounceData;
3311 use crate::destination::{destination_hash, name_hash};
3312
3313 let mut engine = TransportEngine::new(make_config(false));
3314 engine.register_interface(make_interface(1, constants::MODE_FULL));
3315 engine.register_interface(make_local_client_interface(2));
3316
3317 let identity =
3318 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x77; 32]));
3319 let dest_hash = destination_hash("test", &["fwd"], Some(identity.hash()));
3320 let name_h = name_hash("test", &["fwd"]);
3321 let random_hash = [0x42u8; 10];
3322
3323 let (announce_data, _) =
3324 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3325
3326 let flags = PacketFlags {
3327 header_type: constants::HEADER_1,
3328 context_flag: constants::FLAG_UNSET,
3329 transport_type: constants::TRANSPORT_BROADCAST,
3330 destination_type: constants::DESTINATION_SINGLE,
3331 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3332 };
3333 let packet = RawPacket::pack(
3334 flags,
3335 0,
3336 &dest_hash,
3337 None,
3338 constants::CONTEXT_NONE,
3339 &announce_data,
3340 )
3341 .unwrap();
3342
3343 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3344 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3345
3346 let forward = actions
3348 .iter()
3349 .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3350 assert!(
3351 forward.is_some(),
3352 "Should forward announce to local clients"
3353 );
3354
3355 match forward.unwrap() {
3357 TransportAction::ForwardToLocalClients { exclude, .. } => {
3358 assert_eq!(*exclude, Some(InterfaceId(1)));
3359 }
3360 _ => unreachable!(),
3361 }
3362 }
3363
3364 #[test]
3365 fn test_no_announce_forward_without_local_clients() {
3366 use crate::announce::AnnounceData;
3367 use crate::destination::{destination_hash, name_hash};
3368
3369 let mut engine = TransportEngine::new(make_config(false));
3370 engine.register_interface(make_interface(1, constants::MODE_FULL));
3371
3372 let identity =
3373 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x88; 32]));
3374 let dest_hash = destination_hash("test", &["nofwd"], Some(identity.hash()));
3375 let name_h = name_hash("test", &["nofwd"]);
3376 let random_hash = [0x42u8; 10];
3377
3378 let (announce_data, _) =
3379 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3380
3381 let flags = PacketFlags {
3382 header_type: constants::HEADER_1,
3383 context_flag: constants::FLAG_UNSET,
3384 transport_type: constants::TRANSPORT_BROADCAST,
3385 destination_type: constants::DESTINATION_SINGLE,
3386 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3387 };
3388 let packet = RawPacket::pack(
3389 flags,
3390 0,
3391 &dest_hash,
3392 None,
3393 constants::CONTEXT_NONE,
3394 &announce_data,
3395 )
3396 .unwrap();
3397
3398 let mut rng = rns_crypto::FixedRng::new(&[0x22; 32]);
3399 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3400
3401 let has_forward = actions
3403 .iter()
3404 .any(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3405 assert!(!has_forward, "No forward without local clients");
3406 }
3407
3408 #[test]
3409 fn test_local_client_exclude_from_forward() {
3410 use crate::announce::AnnounceData;
3411 use crate::destination::{destination_hash, name_hash};
3412
3413 let mut engine = TransportEngine::new(make_config(false));
3414 engine.register_interface(make_local_client_interface(1));
3415 engine.register_interface(make_local_client_interface(2));
3416
3417 let identity =
3418 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3419 let dest_hash = destination_hash("test", &["excl"], Some(identity.hash()));
3420 let name_h = name_hash("test", &["excl"]);
3421 let random_hash = [0x42u8; 10];
3422
3423 let (announce_data, _) =
3424 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3425
3426 let flags = PacketFlags {
3427 header_type: constants::HEADER_1,
3428 context_flag: constants::FLAG_UNSET,
3429 transport_type: constants::TRANSPORT_BROADCAST,
3430 destination_type: constants::DESTINATION_SINGLE,
3431 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3432 };
3433 let packet = RawPacket::pack(
3434 flags,
3435 0,
3436 &dest_hash,
3437 None,
3438 constants::CONTEXT_NONE,
3439 &announce_data,
3440 )
3441 .unwrap();
3442
3443 let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
3444 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3446
3447 let forward = actions
3449 .iter()
3450 .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3451 assert!(forward.is_some());
3452 match forward.unwrap() {
3453 TransportAction::ForwardToLocalClients { exclude, .. } => {
3454 assert_eq!(*exclude, Some(InterfaceId(1)));
3455 }
3456 _ => unreachable!(),
3457 }
3458 }
3459
3460 fn make_tunnel_interface(id: u64) -> InterfaceInfo {
3465 InterfaceInfo {
3466 id: InterfaceId(id),
3467 name: String::from("tunnel_iface"),
3468 mode: constants::MODE_FULL,
3469 out_capable: true,
3470 in_capable: true,
3471 bitrate: None,
3472 announce_rate_target: None,
3473 announce_rate_grace: 0,
3474 announce_rate_penalty: 0.0,
3475 announce_cap: constants::ANNOUNCE_CAP,
3476 is_local_client: false,
3477 wants_tunnel: true,
3478 tunnel_id: None,
3479 mtu: constants::MTU as u32,
3480 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3481 ia_freq: 0.0,
3482 started: 0.0,
3483 }
3484 }
3485
3486 #[test]
3487 fn test_handle_tunnel_new() {
3488 let mut engine = TransportEngine::new(make_config(true));
3489 engine.register_interface(make_tunnel_interface(1));
3490
3491 let tunnel_id = [0xAA; 32];
3492 let actions = engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3493
3494 assert!(actions
3496 .iter()
3497 .any(|a| matches!(a, TransportAction::TunnelEstablished { .. })));
3498
3499 let info = engine.interface_info(&InterfaceId(1)).unwrap();
3501 assert_eq!(info.tunnel_id, Some(tunnel_id));
3502
3503 assert_eq!(engine.tunnel_table().len(), 1);
3505 }
3506
3507 #[test]
3508 fn test_announce_stores_tunnel_path() {
3509 use crate::announce::AnnounceData;
3510 use crate::destination::{destination_hash, name_hash};
3511
3512 let mut engine = TransportEngine::new(make_config(false));
3513 let mut iface = make_tunnel_interface(1);
3514 let tunnel_id = [0xBB; 32];
3515 iface.tunnel_id = Some(tunnel_id);
3516 engine.register_interface(iface);
3517
3518 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3520
3521 let identity =
3523 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xCC; 32]));
3524 let dest_hash = destination_hash("test", &["tunnel"], Some(identity.hash()));
3525 let name_h = name_hash("test", &["tunnel"]);
3526 let random_hash = [0x42u8; 10];
3527
3528 let (announce_data, _) =
3529 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3530
3531 let flags = PacketFlags {
3532 header_type: constants::HEADER_1,
3533 context_flag: constants::FLAG_UNSET,
3534 transport_type: constants::TRANSPORT_BROADCAST,
3535 destination_type: constants::DESTINATION_SINGLE,
3536 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3537 };
3538 let packet = RawPacket::pack(
3539 flags,
3540 0,
3541 &dest_hash,
3542 None,
3543 constants::CONTEXT_NONE,
3544 &announce_data,
3545 )
3546 .unwrap();
3547
3548 let mut rng = rns_crypto::FixedRng::new(&[0xDD; 32]);
3549 engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3550
3551 assert!(engine.has_path(&dest_hash));
3553
3554 let tunnel = engine.tunnel_table().get(&tunnel_id).unwrap();
3556 assert_eq!(tunnel.paths.len(), 1);
3557 assert!(tunnel.paths.contains_key(&dest_hash));
3558 }
3559
3560 #[test]
3561 fn test_tunnel_reattach_restores_paths() {
3562 let mut engine = TransportEngine::new(make_config(true));
3563 engine.register_interface(make_tunnel_interface(1));
3564
3565 let tunnel_id = [0xCC; 32];
3566 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3567
3568 let dest = [0xDD; 16];
3570 engine.tunnel_table.store_tunnel_path(
3571 &tunnel_id,
3572 dest,
3573 tunnel::TunnelPath {
3574 timestamp: 1000.0,
3575 received_from: [0xEE; 16],
3576 hops: 3,
3577 expires: 1000.0 + constants::DESTINATION_TIMEOUT,
3578 random_blobs: Vec::new(),
3579 packet_hash: [0xFF; 32],
3580 },
3581 1000.0,
3582 constants::DESTINATION_TIMEOUT,
3583 usize::MAX,
3584 );
3585
3586 engine.void_tunnel_interface(&tunnel_id);
3588
3589 engine.path_table.remove(&dest);
3591 assert!(!engine.has_path(&dest));
3592
3593 engine.register_interface(make_interface(2, constants::MODE_FULL));
3595 let actions = engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);
3596
3597 assert!(engine.has_path(&dest));
3599 let path = engine.path_table.get(&dest).unwrap().primary().unwrap();
3600 assert_eq!(path.hops, 3);
3601 assert_eq!(path.receiving_interface, InterfaceId(2));
3602
3603 assert!(actions
3605 .iter()
3606 .any(|a| matches!(a, TransportAction::TunnelEstablished { .. })));
3607 }
3608
3609 #[test]
3610 fn test_tunnel_reattach_does_not_overwrite_newer_path() {
3611 let mut engine = TransportEngine::new(make_config(true));
3612 engine.register_interface(make_tunnel_interface(1));
3613
3614 let tunnel_id = [0xCD; 32];
3615 let dest = [0xDE; 16];
3616 let older_blob = make_random_blob(100);
3617 let newer_blob = make_random_blob(200);
3618
3619 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3620 engine.tunnel_table.store_tunnel_path(
3621 &tunnel_id,
3622 dest,
3623 tunnel::TunnelPath {
3624 timestamp: 1000.0,
3625 received_from: [0xEE; 16],
3626 hops: 2,
3627 expires: 1000.0 + constants::DESTINATION_TIMEOUT,
3628 random_blobs: vec![older_blob],
3629 packet_hash: [0x11; 32],
3630 },
3631 1000.0,
3632 constants::DESTINATION_TIMEOUT,
3633 usize::MAX,
3634 );
3635 engine.void_tunnel_interface(&tunnel_id);
3636
3637 engine.path_table.insert(
3638 dest,
3639 PathSet::from_single(
3640 PathEntry {
3641 timestamp: 1500.0,
3642 next_hop: [0xAB; 16],
3643 hops: 3,
3644 expires: 1500.0 + constants::DESTINATION_TIMEOUT,
3645 random_blobs: vec![newer_blob],
3646 receiving_interface: InterfaceId(3),
3647 packet_hash: [0x22; 32],
3648 announce_raw: None,
3649 },
3650 1,
3651 ),
3652 );
3653
3654 engine.register_interface(make_interface(2, constants::MODE_FULL));
3655 engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);
3656
3657 let path = engine.path_table.get(&dest).unwrap().primary().unwrap();
3658 assert_eq!(path.next_hop, [0xAB; 16]);
3659 assert_eq!(path.hops, 3);
3660 assert_eq!(path.receiving_interface, InterfaceId(3));
3661 assert_eq!(path.random_blobs, vec![newer_blob]);
3662 }
3663
3664 #[test]
3665 fn test_void_tunnel_interface() {
3666 let mut engine = TransportEngine::new(make_config(true));
3667 engine.register_interface(make_tunnel_interface(1));
3668
3669 let tunnel_id = [0xDD; 32];
3670 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3671
3672 assert_eq!(
3674 engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3675 Some(InterfaceId(1))
3676 );
3677
3678 engine.void_tunnel_interface(&tunnel_id);
3679
3680 assert_eq!(engine.tunnel_table().len(), 1);
3682 assert_eq!(
3683 engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3684 None
3685 );
3686 }
3687
3688 #[test]
3689 fn test_tick_culls_tunnels() {
3690 let mut engine = TransportEngine::new(make_config(true));
3691 engine.register_interface(make_tunnel_interface(1));
3692
3693 let tunnel_id = [0xEE; 32];
3694 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3695 assert_eq!(engine.tunnel_table().len(), 1);
3696
3697 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3698
3699 engine.tick(
3701 1000.0 + constants::DESTINATION_TIMEOUT + constants::TABLES_CULL_INTERVAL + 1.0,
3702 &mut rng,
3703 );
3704
3705 assert_eq!(engine.tunnel_table().len(), 0);
3706 }
3707
3708 #[test]
3709 fn test_synthesize_tunnel() {
3710 let mut engine = TransportEngine::new(make_config(true));
3711 engine.register_interface(make_tunnel_interface(1));
3712
3713 let identity =
3714 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xFF; 32]));
3715 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3716
3717 let actions = engine.synthesize_tunnel(&identity, InterfaceId(1), &mut rng);
3718
3719 assert_eq!(actions.len(), 1);
3721 match &actions[0] {
3722 TransportAction::TunnelSynthesize {
3723 interface,
3724 data,
3725 dest_hash,
3726 } => {
3727 assert_eq!(*interface, InterfaceId(1));
3728 assert_eq!(data.len(), tunnel::TUNNEL_SYNTH_LENGTH);
3729 let expected_dest = crate::destination::destination_hash(
3731 "rnstransport",
3732 &["tunnel", "synthesize"],
3733 None,
3734 );
3735 assert_eq!(*dest_hash, expected_dest);
3736 }
3737 _ => panic!("Expected TunnelSynthesize"),
3738 }
3739 }
3740
3741 fn make_path_request_data(dest_hash: &[u8; 16], tag: &[u8]) -> Vec<u8> {
3746 let mut data = Vec::new();
3747 data.extend_from_slice(dest_hash);
3748 data.extend_from_slice(tag);
3749 data
3750 }
3751
3752 #[test]
3753 fn test_path_request_forwarded_on_ap() {
3754 let mut engine = TransportEngine::new(make_config(true));
3755 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3756 engine.register_interface(make_interface(2, constants::MODE_FULL));
3757
3758 let dest = [0xD1; 16];
3759 let tag = [0x01; 16];
3760 let data = make_path_request_data(&dest, &tag);
3761
3762 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3763
3764 assert_eq!(actions.len(), 1);
3766 match &actions[0] {
3767 TransportAction::SendOnInterface { interface, .. } => {
3768 assert_eq!(*interface, InterfaceId(2));
3769 }
3770 _ => panic!("Expected SendOnInterface for forwarded path request"),
3771 }
3772 assert!(engine.discovery_path_requests.contains_key(&dest));
3774 }
3775
3776 #[test]
3777 fn test_path_request_not_forwarded_on_full() {
3778 let mut engine = TransportEngine::new(make_config(true));
3779 engine.register_interface(make_interface(1, constants::MODE_FULL));
3780 engine.register_interface(make_interface(2, constants::MODE_FULL));
3781
3782 let dest = [0xD2; 16];
3783 let tag = [0x02; 16];
3784 let data = make_path_request_data(&dest, &tag);
3785
3786 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3787
3788 assert!(actions.is_empty());
3790 assert!(!engine.discovery_path_requests.contains_key(&dest));
3791 }
3792
3793 #[test]
3794 fn test_discovery_pr_tags_fifo_eviction() {
3795 let mut config = make_config(true);
3796 config.max_discovery_pr_tags = 2;
3797 let mut engine = TransportEngine::new(config);
3798
3799 let dest1 = [0xA1; 16];
3800 let dest2 = [0xA2; 16];
3801 let dest3 = [0xA3; 16];
3802 let tag1 = [0x01; 16];
3803 let tag2 = [0x02; 16];
3804 let tag3 = [0x03; 16];
3805
3806 engine.handle_path_request(
3807 &make_path_request_data(&dest1, &tag1),
3808 InterfaceId(1),
3809 1000.0,
3810 );
3811 engine.handle_path_request(
3812 &make_path_request_data(&dest2, &tag2),
3813 InterfaceId(1),
3814 1001.0,
3815 );
3816 assert_eq!(engine.discovery_pr_tags_count(), 2);
3817
3818 let unique1 = make_unique_tag(dest1, &tag1);
3819 let unique2 = make_unique_tag(dest2, &tag2);
3820 assert!(engine.discovery_pr_tags.contains(&unique1));
3821 assert!(engine.discovery_pr_tags.contains(&unique2));
3822
3823 engine.handle_path_request(
3824 &make_path_request_data(&dest3, &tag3),
3825 InterfaceId(1),
3826 1002.0,
3827 );
3828 assert_eq!(engine.discovery_pr_tags_count(), 2);
3829 assert!(!engine.discovery_pr_tags.contains(&unique1));
3830 assert!(engine.discovery_pr_tags.contains(&unique2));
3831
3832 engine.handle_path_request(
3833 &make_path_request_data(&dest1, &tag1),
3834 InterfaceId(1),
3835 1003.0,
3836 );
3837 assert_eq!(engine.discovery_pr_tags_count(), 2);
3838 assert!(engine.discovery_pr_tags.contains(&unique1));
3839 }
3840
3841 #[test]
3842 fn test_path_destination_cap_evicts_oldest_and_clears_state() {
3843 let mut config = make_config(false);
3844 config.max_path_destinations = 2;
3845 let mut engine = TransportEngine::new(config);
3846 engine.register_interface(make_interface(1, constants::MODE_FULL));
3847
3848 let dest1 = [0xB1; 16];
3849 let dest2 = [0xB2; 16];
3850 let dest3 = [0xB3; 16];
3851
3852 engine.upsert_path_destination(
3853 dest1,
3854 make_path_entry(1000.0, 1, InterfaceId(1), [0x11; 16]),
3855 1000.0,
3856 );
3857 engine.upsert_path_destination(
3858 dest2,
3859 make_path_entry(1001.0, 1, InterfaceId(1), [0x22; 16]),
3860 1001.0,
3861 );
3862 engine
3863 .path_states
3864 .insert(dest1, constants::STATE_UNRESPONSIVE);
3865
3866 engine.upsert_path_destination(
3867 dest3,
3868 make_path_entry(1002.0, 1, InterfaceId(1), [0x33; 16]),
3869 1002.0,
3870 );
3871
3872 assert_eq!(engine.path_table_count(), 2);
3873 assert!(!engine.has_path(&dest1));
3874 assert!(engine.has_path(&dest2));
3875 assert!(engine.has_path(&dest3));
3876 assert!(!engine.path_states.contains_key(&dest1));
3877 assert_eq!(engine.path_destination_cap_evict_count(), 1);
3878 }
3879
3880 #[test]
3881 fn test_existing_path_destination_update_does_not_trigger_cap_eviction() {
3882 let mut config = make_config(false);
3883 config.max_path_destinations = 2;
3884 config.max_paths_per_destination = 2;
3885 let mut engine = TransportEngine::new(config);
3886 engine.register_interface(make_interface(1, constants::MODE_FULL));
3887
3888 let dest1 = [0xC1; 16];
3889 let dest2 = [0xC2; 16];
3890
3891 engine.upsert_path_destination(
3892 dest1,
3893 make_path_entry(1000.0, 2, InterfaceId(1), [0x11; 16]),
3894 1000.0,
3895 );
3896 engine.upsert_path_destination(
3897 dest2,
3898 make_path_entry(1001.0, 2, InterfaceId(1), [0x22; 16]),
3899 1001.0,
3900 );
3901
3902 engine.upsert_path_destination(
3903 dest2,
3904 make_path_entry(1002.0, 1, InterfaceId(1), [0x23; 16]),
3905 1002.0,
3906 );
3907
3908 assert_eq!(engine.path_table_count(), 2);
3909 assert!(engine.has_path(&dest1));
3910 assert!(engine.has_path(&dest2));
3911 }
3912
3913 #[test]
3914 fn test_roaming_loop_prevention() {
3915 let mut engine = TransportEngine::new(make_config(true));
3916 engine.register_interface(make_interface(1, constants::MODE_ROAMING));
3917
3918 let dest = [0xD3; 16];
3919 engine.path_table.insert(
3921 dest,
3922 PathSet::from_single(
3923 PathEntry {
3924 timestamp: 900.0,
3925 next_hop: [0xAA; 16],
3926 hops: 2,
3927 expires: 9999.0,
3928 random_blobs: Vec::new(),
3929 receiving_interface: InterfaceId(1),
3930 packet_hash: [0; 32],
3931 announce_raw: None,
3932 },
3933 1,
3934 ),
3935 );
3936
3937 let tag = [0x03; 16];
3938 let data = make_path_request_data(&dest, &tag);
3939
3940 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3941
3942 assert!(actions.is_empty());
3944 assert!(!engine.announce_table.contains_key(&dest));
3945 }
3946
3947 fn make_announce_raw(dest_hash: &[u8; 16], payload: &[u8]) -> Vec<u8> {
3949 let flags: u8 = 0x01; let mut raw = Vec::new();
3953 raw.push(flags);
3954 raw.push(0x02); raw.extend_from_slice(dest_hash);
3956 raw.push(constants::CONTEXT_NONE);
3957 raw.extend_from_slice(payload);
3958 raw
3959 }
3960
3961 #[test]
3962 fn test_path_request_populates_announce_entry_from_raw() {
3963 let mut engine = TransportEngine::new(make_config(true));
3964 engine.register_interface(make_interface(1, constants::MODE_FULL));
3965 engine.register_interface(make_interface(2, constants::MODE_FULL));
3966
3967 let dest = [0xD5; 16];
3968 let payload = vec![0xAB; 32]; let announce_raw = make_announce_raw(&dest, &payload);
3970
3971 engine.path_table.insert(
3972 dest,
3973 PathSet::from_single(
3974 PathEntry {
3975 timestamp: 900.0,
3976 next_hop: [0xBB; 16],
3977 hops: 2,
3978 expires: 9999.0,
3979 random_blobs: Vec::new(),
3980 receiving_interface: InterfaceId(2),
3981 packet_hash: [0; 32],
3982 announce_raw: Some(announce_raw.clone()),
3983 },
3984 1,
3985 ),
3986 );
3987
3988 let tag = [0x05; 16];
3989 let data = make_path_request_data(&dest, &tag);
3990 let _actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3991
3992 let entry = engine
3994 .announce_table
3995 .get(&dest)
3996 .expect("announce entry must exist");
3997 assert_eq!(entry.packet_raw, announce_raw);
3998 assert_eq!(entry.packet_data, payload);
3999 assert!(entry.block_rebroadcasts);
4000 }
4001
4002 #[test]
4003 fn test_path_request_skips_when_no_announce_raw() {
4004 let mut engine = TransportEngine::new(make_config(true));
4005 engine.register_interface(make_interface(1, constants::MODE_FULL));
4006 engine.register_interface(make_interface(2, constants::MODE_FULL));
4007
4008 let dest = [0xD6; 16];
4009
4010 engine.path_table.insert(
4011 dest,
4012 PathSet::from_single(
4013 PathEntry {
4014 timestamp: 900.0,
4015 next_hop: [0xCC; 16],
4016 hops: 1,
4017 expires: 9999.0,
4018 random_blobs: Vec::new(),
4019 receiving_interface: InterfaceId(2),
4020 packet_hash: [0; 32],
4021 announce_raw: None, },
4023 1,
4024 ),
4025 );
4026
4027 let tag = [0x06; 16];
4028 let data = make_path_request_data(&dest, &tag);
4029 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4030
4031 assert!(actions.is_empty());
4033 assert!(!engine.announce_table.contains_key(&dest));
4034 }
4035
4036 #[test]
4037 fn test_discovery_request_consumed_on_announce() {
4038 let mut engine = TransportEngine::new(make_config(true));
4039 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
4040
4041 let dest = [0xD4; 16];
4042
4043 engine.discovery_path_requests.insert(
4045 dest,
4046 DiscoveryPathRequest {
4047 timestamp: 900.0,
4048 requesting_interface: InterfaceId(1),
4049 },
4050 );
4051
4052 let iface = engine.discovery_path_requests_waiting(&dest);
4054 assert_eq!(iface, Some(InterfaceId(1)));
4055
4056 assert!(!engine.discovery_path_requests.contains_key(&dest));
4058 assert_eq!(engine.discovery_path_requests_waiting(&dest), None);
4059 }
4060
4061 #[test]
4062 fn test_pending_path_request_announce_bypasses_ingress_control() {
4063 let mut engine = TransportEngine::new(make_config(true));
4064 let mut inbound = make_interface(1, constants::MODE_FULL);
4065 inbound.ingress_control = crate::transport::types::IngressControlConfig::enabled();
4066 inbound.ia_freq = constants::IC_BURST_FREQ + 1.0;
4067 inbound.started = 0.0;
4068 engine.register_interface(inbound);
4069 engine.register_interface(make_interface(2, constants::MODE_ACCESS_POINT));
4070
4071 let identity =
4072 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4073 let dest_hash = crate::destination::destination_hash(
4074 "ingress",
4075 &["path-request"],
4076 Some(identity.hash()),
4077 );
4078 let name_hash = crate::destination::name_hash("ingress", &["path-request"]);
4079 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4080
4081 engine.discovery_path_requests.insert(
4082 dest_hash,
4083 DiscoveryPathRequest {
4084 timestamp: 999.0,
4085 requesting_interface: InterfaceId(2),
4086 },
4087 );
4088
4089 let mut rng = rns_crypto::FixedRng::new(&[0x88; 32]);
4090 let actions = engine.handle_inbound(&announce_raw, InterfaceId(1), 1000.0, &mut rng);
4091
4092 assert_eq!(engine.held_announce_count(&InterfaceId(1)), 0);
4093 assert!(engine.has_path(&dest_hash));
4094 assert!(!engine.discovery_path_requests.contains_key(&dest_hash));
4095 assert!(actions.iter().any(|a| {
4096 matches!(
4097 a,
4098 TransportAction::AnnounceReceived {
4099 destination_hash,
4100 receiving_interface: InterfaceId(1),
4101 ..
4102 } if *destination_hash == dest_hash
4103 )
4104 }));
4105
4106 let entry = engine
4107 .announce_table
4108 .get(&dest_hash)
4109 .expect("path response announce should be queued");
4110 assert!(entry.block_rebroadcasts);
4111 assert_eq!(entry.attached_interface, Some(InterfaceId(2)));
4112 }
4113
4114 fn build_announce_for_issue4(dest_hash: &[u8; 16], name_hash: &[u8; 10]) -> Vec<u8> {
4120 let identity =
4121 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4122 let random_hash = [0x42u8; 10];
4123 let (announce_data, _) = crate::announce::AnnounceData::pack(
4124 &identity,
4125 dest_hash,
4126 name_hash,
4127 &random_hash,
4128 None,
4129 None,
4130 )
4131 .unwrap();
4132 let flags = PacketFlags {
4133 header_type: constants::HEADER_1,
4134 context_flag: constants::FLAG_UNSET,
4135 transport_type: constants::TRANSPORT_BROADCAST,
4136 destination_type: constants::DESTINATION_SINGLE,
4137 packet_type: constants::PACKET_TYPE_ANNOUNCE,
4138 };
4139 RawPacket::pack(
4140 flags,
4141 0,
4142 dest_hash,
4143 None,
4144 constants::CONTEXT_NONE,
4145 &announce_data,
4146 )
4147 .unwrap()
4148 .raw
4149 }
4150
4151 #[test]
4152 fn test_issue4_local_client_single_data_to_1hop_rewrites_on_outbound() {
4153 let mut engine = TransportEngine::new(make_config(false));
4158 engine.register_interface(make_local_client_interface(1));
4159
4160 let identity =
4161 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4162 let dest_hash =
4163 crate::destination::destination_hash("issue4", &["test"], Some(identity.hash()));
4164 let name_hash = crate::destination::name_hash("issue4", &["test"]);
4165 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4166
4167 let mut announce_packet = RawPacket::unpack(&announce_raw).unwrap();
4171 announce_packet.raw[1] = 1;
4172 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
4173 engine.handle_inbound(&announce_packet.raw, InterfaceId(1), 1000.0, &mut rng);
4174 assert!(engine.has_path(&dest_hash));
4175 assert_eq!(engine.hops_to(&dest_hash), Some(1));
4176
4177 let data_flags = PacketFlags {
4179 header_type: constants::HEADER_1,
4180 context_flag: constants::FLAG_UNSET,
4181 transport_type: constants::TRANSPORT_BROADCAST,
4182 destination_type: constants::DESTINATION_SINGLE,
4183 packet_type: constants::PACKET_TYPE_DATA,
4184 };
4185 let data_packet = RawPacket::pack(
4186 data_flags,
4187 0,
4188 &dest_hash,
4189 None,
4190 constants::CONTEXT_NONE,
4191 b"hello",
4192 )
4193 .unwrap();
4194
4195 let actions =
4196 engine.handle_outbound(&data_packet, constants::DESTINATION_SINGLE, None, 1001.0);
4197
4198 let send = actions.iter().find_map(|a| match a {
4199 TransportAction::SendOnInterface { interface, raw } => Some((interface, raw)),
4200 _ => None,
4201 });
4202 let (interface, raw) = send.expect("shared client should emit a transport-injected packet");
4203 assert_eq!(*interface, InterfaceId(1));
4204 let flags = PacketFlags::unpack(raw[0]);
4205 assert_eq!(flags.header_type, constants::HEADER_2);
4206 assert_eq!(flags.transport_type, constants::TRANSPORT_TRANSPORT);
4207 }
4208
4209 #[test]
4210 fn test_issue4_external_data_to_1hop_via_transport_works() {
4211 let daemon_id = [0x42; 16];
4217 let mut engine = TransportEngine::new(TransportConfig {
4218 transport_enabled: true,
4219 identity_hash: Some(daemon_id),
4220 prefer_shorter_path: false,
4221 max_paths_per_destination: 1,
4222 packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
4223 max_discovery_pr_tags: constants::MAX_PR_TAGS,
4224 max_path_destinations: usize::MAX,
4225 max_tunnel_destinations_total: usize::MAX,
4226 destination_timeout_secs: constants::DESTINATION_TIMEOUT,
4227 announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
4228 announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
4229 announce_sig_cache_enabled: true,
4230 announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
4231 announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
4232 announce_queue_max_entries: 256,
4233 announce_queue_max_interfaces: 1024,
4234 });
4235 engine.register_interface(make_interface(1, constants::MODE_FULL)); engine.register_interface(make_interface(2, constants::MODE_FULL)); let identity =
4239 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4240 let dest_hash =
4241 crate::destination::destination_hash("issue4", &["ctrl"], Some(identity.hash()));
4242 let name_hash = crate::destination::name_hash("issue4", &["ctrl"]);
4243 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4244
4245 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
4247 engine.handle_inbound(&announce_raw, InterfaceId(2), 1000.0, &mut rng);
4248 assert_eq!(engine.hops_to(&dest_hash), Some(1));
4249
4250 let h2_flags = PacketFlags {
4253 header_type: constants::HEADER_2,
4254 context_flag: constants::FLAG_UNSET,
4255 transport_type: constants::TRANSPORT_TRANSPORT,
4256 destination_type: constants::DESTINATION_SINGLE,
4257 packet_type: constants::PACKET_TYPE_DATA,
4258 };
4259 let mut h2_raw = Vec::new();
4261 h2_raw.push(h2_flags.pack());
4262 h2_raw.push(0); h2_raw.extend_from_slice(&daemon_id); h2_raw.extend_from_slice(&dest_hash);
4265 h2_raw.push(constants::CONTEXT_NONE);
4266 h2_raw.extend_from_slice(b"hello via transport");
4267
4268 let mut rng2 = rns_crypto::FixedRng::new(&[0x22; 32]);
4269 let actions = engine.handle_inbound(&h2_raw, InterfaceId(1), 1001.0, &mut rng2);
4270
4271 let has_send = actions.iter().any(|a| {
4273 matches!(
4274 a,
4275 TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(2)
4276 )
4277 });
4278 assert!(
4279 has_send,
4280 "HEADER_2 transport packet should be forwarded (control test)"
4281 );
4282 }
4283}