1pub mod announce_proc;
2pub mod announce_queue;
3pub mod announce_verify_queue;
4pub mod dedup;
5pub mod inbound;
6pub mod ingress_control;
7pub mod jobs;
8pub mod outbound;
9pub mod pathfinder;
10pub mod rate_limit;
11pub mod tables;
12pub mod tunnel;
13pub mod types;
14
15use alloc::collections::BTreeMap;
16use alloc::string::String;
17use alloc::vec::Vec;
18use core::mem::size_of;
19
20use rns_crypto::Rng;
21
22use crate::announce::AnnounceData;
23use crate::constants;
24use crate::hash;
25use crate::packet::RawPacket;
26
27use self::announce_proc::compute_path_expires;
28use self::announce_queue::AnnounceQueues;
29use self::announce_verify_queue::{AnnounceVerifyKey, AnnounceVerifyQueue, PendingAnnounce};
30use self::dedup::{AnnounceSignatureCache, PacketHashlist};
31use self::inbound::{
32 create_link_entry, create_reverse_entry, forward_transport_packet, route_proof_via_reverse,
33 route_via_link_table,
34};
35use self::ingress_control::IngressControl;
36use self::outbound::{route_outbound, should_transmit_announce};
37use self::pathfinder::{
38 decide_announce_multipath, extract_random_blob, timebase_from_random_blob,
39 timebase_from_random_blobs, MultiPathDecision,
40};
41use self::rate_limit::AnnounceRateLimiter;
42use self::tables::{AnnounceEntry, DiscoveryPathRequest, LinkEntry, PathEntry, PathSet};
43use self::tunnel::TunnelTable;
44use self::types::{BlackholeEntry, InterfaceId, InterfaceInfo, TransportAction, TransportConfig};
45
46pub type PathTableRow = ([u8; 16], f64, [u8; 16], u8, f64, String);
47pub type RateTableRow = ([u8; 16], f64, u32, f64, Vec<f64>);
48
49struct InboundPacketCtx {
50 packet: RawPacket,
51 original_raw: Vec<u8>,
52 iface: InterfaceId,
53 now: f64,
54 from_local_client: bool,
55}
56
57struct VerifiedAnnounceCtx<'a> {
58 packet: &'a RawPacket,
59 original_raw: &'a [u8],
60 iface: InterfaceId,
61 now: f64,
62 validated: crate::announce::ValidatedAnnounce,
63 received_from: [u8; 16],
64 random_blob: [u8; 10],
65 announce_emitted: u64,
66}
67
68struct TickCtx<'a> {
69 now: f64,
70 rng: &'a mut dyn Rng,
71 actions: Vec<TransportAction>,
72}
73
74struct PathRequestCtx<'a> {
75 data: &'a [u8],
76 interface_id: InterfaceId,
77 now: f64,
78 destination_hash: [u8; 16],
79}
80
81pub struct TransportEngine {
86 config: TransportConfig,
87 path_table: BTreeMap<[u8; 16], PathSet>,
88 announce_table: BTreeMap<[u8; 16], AnnounceEntry>,
89 reverse_table: BTreeMap<[u8; 16], tables::ReverseEntry>,
90 link_table: BTreeMap<[u8; 16], LinkEntry>,
91 held_announces: BTreeMap<[u8; 16], AnnounceEntry>,
92 packet_hashlist: PacketHashlist,
93 announce_sig_cache: AnnounceSignatureCache,
94 rate_limiter: AnnounceRateLimiter,
95 path_states: BTreeMap<[u8; 16], u8>,
96 interfaces: BTreeMap<InterfaceId, InterfaceInfo>,
97 local_destinations: BTreeMap<[u8; 16], u8>,
98 blackholed_identities: BTreeMap<[u8; 16], BlackholeEntry>,
99 announce_queues: AnnounceQueues,
100 ingress_control: IngressControl,
101 tunnel_table: TunnelTable,
102 discovery_pr_tags: Vec<[u8; 32]>,
103 discovery_path_requests: BTreeMap<[u8; 16], DiscoveryPathRequest>,
104 path_destination_cap_evict_count: usize,
105 announces_last_checked: f64,
107 tables_last_culled: f64,
108}
109
110impl TransportEngine {
111 pub fn new(config: TransportConfig) -> Self {
112 let packet_hashlist_max_entries = config.packet_hashlist_max_entries;
113 let sig_cache_max = if config.announce_sig_cache_enabled {
114 config.announce_sig_cache_max_entries
115 } else {
116 0
117 };
118 let sig_cache_ttl = config.announce_sig_cache_ttl_secs;
119 let announce_queue_max_interfaces = config.announce_queue_max_interfaces;
120 TransportEngine {
121 config,
122 path_table: BTreeMap::new(),
123 announce_table: BTreeMap::new(),
124 reverse_table: BTreeMap::new(),
125 link_table: BTreeMap::new(),
126 held_announces: BTreeMap::new(),
127 packet_hashlist: PacketHashlist::new(packet_hashlist_max_entries),
128 announce_sig_cache: AnnounceSignatureCache::new(sig_cache_max, sig_cache_ttl),
129 rate_limiter: AnnounceRateLimiter::new(),
130 path_states: BTreeMap::new(),
131 interfaces: BTreeMap::new(),
132 local_destinations: BTreeMap::new(),
133 blackholed_identities: BTreeMap::new(),
134 announce_queues: AnnounceQueues::new(announce_queue_max_interfaces),
135 ingress_control: IngressControl::new(),
136 tunnel_table: TunnelTable::new(),
137 discovery_pr_tags: Vec::new(),
138 discovery_path_requests: BTreeMap::new(),
139 path_destination_cap_evict_count: 0,
140 announces_last_checked: 0.0,
141 tables_last_culled: 0.0,
142 }
143 }
144
145 fn insert_discovery_pr_tag(&mut self, unique_tag: [u8; 32]) -> bool {
146 if self.discovery_pr_tags.contains(&unique_tag) {
147 return false;
148 }
149 if self.config.max_discovery_pr_tags != usize::MAX
150 && self.discovery_pr_tags.len() >= self.config.max_discovery_pr_tags
151 && !self.discovery_pr_tags.is_empty()
152 {
153 self.discovery_pr_tags.remove(0);
154 }
155 self.discovery_pr_tags.push(unique_tag);
156 true
157 }
158
159 fn upsert_path_destination(&mut self, dest_hash: [u8; 16], entry: PathEntry, now: f64) {
160 let max_paths = self.config.max_paths_per_destination;
161 if let Some(ps) = self.path_table.get_mut(&dest_hash) {
162 ps.upsert(entry);
163 return;
164 }
165 self.enforce_path_destination_cap(now);
166 self.path_table
167 .insert(dest_hash, PathSet::from_single(entry, max_paths));
168 }
169
170 fn enforce_path_destination_cap(&mut self, now: f64) {
171 if self.config.max_path_destinations == usize::MAX {
172 return;
173 }
174 jobs::cull_path_table(&mut self.path_table, &self.interfaces, now);
175 while self.path_table.len() >= self.config.max_path_destinations {
176 let Some(dest_hash) = self.oldest_path_destination() else {
177 break;
178 };
179 self.path_table.remove(&dest_hash);
180 self.path_states.remove(&dest_hash);
181 self.path_destination_cap_evict_count += 1;
182 }
183 }
184
185 fn oldest_path_destination(&self) -> Option<[u8; 16]> {
186 self.path_table
187 .iter()
188 .filter_map(|(dest_hash, path_set)| {
189 path_set
190 .primary()
191 .map(|primary| (*dest_hash, primary.timestamp, primary.hops))
192 })
193 .min_by(|a, b| {
194 a.1.partial_cmp(&b.1)
195 .unwrap_or(core::cmp::Ordering::Equal)
196 .then_with(|| b.2.cmp(&a.2))
197 })
198 .map(|(dest_hash, _, _)| dest_hash)
199 }
200
201 fn announce_entry_size_bytes(entry: &AnnounceEntry) -> usize {
202 size_of::<AnnounceEntry>() + entry.packet_raw.capacity() + entry.packet_data.capacity()
203 }
204
205 fn announce_retained_bytes_total(&self) -> usize {
206 self.announce_table
207 .values()
208 .chain(self.held_announces.values())
209 .map(Self::announce_entry_size_bytes)
210 .sum()
211 }
212
213 fn cull_expired_announce_entries(&mut self, now: f64) -> usize {
214 let ttl = self.config.announce_table_ttl_secs;
215 let mut removed = 0usize;
216
217 self.announce_table.retain(|_, entry| {
218 let keep = now <= entry.timestamp + ttl;
219 if !keep {
220 removed += 1;
221 }
222 keep
223 });
224
225 self.held_announces.retain(|_, entry| {
226 let keep = now <= entry.timestamp + ttl;
227 if !keep {
228 removed += 1;
229 }
230 keep
231 });
232
233 removed
234 }
235
236 fn oldest_retained_announce(&self) -> Option<([u8; 16], bool)> {
237 let oldest_active = self
238 .announce_table
239 .iter()
240 .map(|(dest_hash, entry)| (*dest_hash, false, entry.timestamp))
241 .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(core::cmp::Ordering::Equal));
242 let oldest_held = self
243 .held_announces
244 .iter()
245 .map(|(dest_hash, entry)| (*dest_hash, true, entry.timestamp))
246 .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(core::cmp::Ordering::Equal));
247
248 match (oldest_active, oldest_held) {
249 (Some(active), Some(held)) => {
250 let ordering = active
251 .2
252 .partial_cmp(&held.2)
253 .unwrap_or(core::cmp::Ordering::Equal);
254 if ordering == core::cmp::Ordering::Less {
255 Some((active.0, active.1))
256 } else {
257 Some((held.0, held.1))
258 }
259 }
260 (Some(active), None) => Some((active.0, active.1)),
261 (None, Some(held)) => Some((held.0, held.1)),
262 (None, None) => None,
263 }
264 }
265
266 fn enforce_announce_retention_cap(&mut self, now: f64) {
267 self.cull_expired_announce_entries(now);
268 while self.announce_retained_bytes_total() > self.config.announce_table_max_bytes {
269 let Some((dest_hash, is_held)) = self.oldest_retained_announce() else {
270 break;
271 };
272 if is_held {
273 self.held_announces.remove(&dest_hash);
274 } else {
275 self.announce_table.remove(&dest_hash);
276 }
277 }
278 }
279
280 fn insert_announce_entry(
281 &mut self,
282 dest_hash: [u8; 16],
283 entry: AnnounceEntry,
284 now: f64,
285 ) -> bool {
286 self.cull_expired_announce_entries(now);
287 if Self::announce_entry_size_bytes(&entry) > self.config.announce_table_max_bytes {
288 return false;
289 }
290 self.announce_table.insert(dest_hash, entry);
291 self.enforce_announce_retention_cap(now);
292 self.announce_table.contains_key(&dest_hash)
293 }
294
295 fn insert_held_announce(
296 &mut self,
297 dest_hash: [u8; 16],
298 entry: AnnounceEntry,
299 now: f64,
300 ) -> bool {
301 self.cull_expired_announce_entries(now);
302 if Self::announce_entry_size_bytes(&entry) > self.config.announce_table_max_bytes {
303 return false;
304 }
305 self.held_announces.insert(dest_hash, entry);
306 self.enforce_announce_retention_cap(now);
307 self.held_announces.contains_key(&dest_hash)
308 }
309
310 pub fn register_interface(&mut self, info: InterfaceInfo) {
315 self.interfaces.insert(info.id, info);
316 }
317
318 pub fn deregister_interface(&mut self, id: InterfaceId) {
319 self.interfaces.remove(&id);
320 self.announce_queues.remove_interface(id);
321 self.ingress_control.remove_interface(&id);
322 }
323
324 pub fn register_destination(&mut self, dest_hash: [u8; 16], dest_type: u8) {
329 self.local_destinations.insert(dest_hash, dest_type);
330 }
331
332 pub fn deregister_destination(&mut self, dest_hash: &[u8; 16]) {
333 self.local_destinations.remove(dest_hash);
334 }
335
336 pub fn has_path(&self, dest_hash: &[u8; 16]) -> bool {
341 self.path_table
342 .get(dest_hash)
343 .is_some_and(|ps| !ps.is_empty())
344 }
345
346 pub fn hops_to(&self, dest_hash: &[u8; 16]) -> Option<u8> {
347 self.path_table
348 .get(dest_hash)
349 .and_then(|ps| ps.primary())
350 .map(|e| e.hops)
351 }
352
353 pub fn next_hop(&self, dest_hash: &[u8; 16]) -> Option<[u8; 16]> {
354 self.path_table
355 .get(dest_hash)
356 .and_then(|ps| ps.primary())
357 .map(|e| e.next_hop)
358 }
359
360 pub fn next_hop_interface(&self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
361 self.path_table
362 .get(dest_hash)
363 .and_then(|ps| ps.primary())
364 .map(|e| e.receiving_interface)
365 }
366
367 pub fn mark_path_unresponsive(
377 &mut self,
378 dest_hash: &[u8; 16],
379 receiving_interface: Option<InterfaceId>,
380 ) {
381 if let Some(iface_id) = receiving_interface {
382 if let Some(info) = self.interfaces.get(&iface_id) {
383 if info.mode == constants::MODE_BOUNDARY {
384 return;
385 }
386 }
387 }
388
389 if let Some(ps) = self.path_table.get_mut(dest_hash) {
391 if ps.len() > 1 {
392 ps.failover(false); self.path_states.remove(dest_hash);
395 return;
396 }
397 }
398
399 self.path_states
400 .insert(*dest_hash, constants::STATE_UNRESPONSIVE);
401 }
402
403 pub fn mark_path_responsive(&mut self, dest_hash: &[u8; 16]) {
404 self.path_states
405 .insert(*dest_hash, constants::STATE_RESPONSIVE);
406 }
407
408 pub fn path_is_unresponsive(&self, dest_hash: &[u8; 16]) -> bool {
409 self.path_states.get(dest_hash) == Some(&constants::STATE_UNRESPONSIVE)
410 }
411
412 pub fn expire_path(&mut self, dest_hash: &[u8; 16]) {
413 if let Some(ps) = self.path_table.get_mut(dest_hash) {
414 ps.expire_all();
415 }
416 }
417
418 pub fn register_link(&mut self, link_id: [u8; 16], entry: LinkEntry) {
423 self.link_table.insert(link_id, entry);
424 }
425
426 pub fn validate_link(&mut self, link_id: &[u8; 16]) {
427 if let Some(entry) = self.link_table.get_mut(link_id) {
428 entry.validated = true;
429 }
430 }
431
432 pub fn remove_link(&mut self, link_id: &[u8; 16]) {
433 self.link_table.remove(link_id);
434 }
435
436 pub fn blackhole_identity(
442 &mut self,
443 identity_hash: [u8; 16],
444 now: f64,
445 duration_hours: Option<f64>,
446 reason: Option<String>,
447 ) {
448 let expires = match duration_hours {
449 Some(h) if h > 0.0 => now + h * 3600.0,
450 _ => 0.0, };
452 self.blackholed_identities.insert(
453 identity_hash,
454 BlackholeEntry {
455 created: now,
456 expires,
457 reason,
458 },
459 );
460 }
461
462 pub fn unblackhole_identity(&mut self, identity_hash: &[u8; 16]) -> bool {
464 self.blackholed_identities.remove(identity_hash).is_some()
465 }
466
467 pub fn is_blackholed(&self, identity_hash: &[u8; 16], now: f64) -> bool {
469 if let Some(entry) = self.blackholed_identities.get(identity_hash) {
470 if entry.expires == 0.0 || entry.expires > now {
471 return true;
472 }
473 }
474 false
475 }
476
477 pub fn blackholed_entries(&self) -> impl Iterator<Item = (&[u8; 16], &BlackholeEntry)> {
479 self.blackholed_identities.iter()
480 }
481
482 fn cull_blackholed(&mut self, now: f64) {
484 self.blackholed_identities
485 .retain(|_, entry| entry.expires == 0.0 || entry.expires > now);
486 }
487
488 pub fn handle_tunnel(
496 &mut self,
497 tunnel_id: [u8; 32],
498 interface: InterfaceId,
499 now: f64,
500 ) -> Vec<TransportAction> {
501 let mut actions = Vec::new();
502
503 if let Some(info) = self.interfaces.get_mut(&interface) {
505 info.tunnel_id = Some(tunnel_id);
506 }
507
508 let restored_paths = self.tunnel_table.handle_tunnel(
509 tunnel_id,
510 interface,
511 now,
512 self.config.destination_timeout_secs,
513 );
514
515 for (dest_hash, tunnel_path) in &restored_paths {
517 let should_restore = match self.path_table.get(dest_hash).and_then(|ps| ps.primary()) {
518 Some(existing) => {
519 if tunnel_path.hops <= existing.hops || existing.expires < now {
522 let existing_timebase = timebase_from_random_blobs(&existing.random_blobs);
523 let tunnel_timebase = timebase_from_random_blobs(&tunnel_path.random_blobs);
524 tunnel_timebase >= existing_timebase
525 } else {
526 false
527 }
528 }
529 None => now < tunnel_path.expires,
530 };
531
532 if should_restore {
533 let entry = PathEntry {
534 timestamp: tunnel_path.timestamp,
535 next_hop: tunnel_path.received_from,
536 hops: tunnel_path.hops,
537 expires: tunnel_path.expires,
538 random_blobs: tunnel_path.random_blobs.clone(),
539 receiving_interface: interface,
540 packet_hash: tunnel_path.packet_hash,
541 announce_raw: None,
542 };
543 self.upsert_path_destination(*dest_hash, entry, now);
544 }
545 }
546
547 actions.push(TransportAction::TunnelEstablished {
548 tunnel_id,
549 interface,
550 });
551
552 actions
553 }
554
555 pub fn synthesize_tunnel(
563 &self,
564 identity: &rns_crypto::identity::Identity,
565 interface_id: InterfaceId,
566 rng: &mut dyn Rng,
567 ) -> Vec<TransportAction> {
568 let mut actions = Vec::new();
569
570 let interface_hash = if let Some(info) = self.interfaces.get(&interface_id) {
572 hash::full_hash(info.name.as_bytes())
573 } else {
574 return actions;
575 };
576
577 match tunnel::build_tunnel_synthesize_data(identity, &interface_hash, rng) {
578 Ok((data, _tunnel_id)) => {
579 let dest_hash = crate::destination::destination_hash(
580 "rnstransport",
581 &["tunnel", "synthesize"],
582 None,
583 );
584 actions.push(TransportAction::TunnelSynthesize {
585 interface: interface_id,
586 data,
587 dest_hash,
588 });
589 }
590 Err(e) => {
591 let _ = e;
593 }
594 }
595
596 actions
597 }
598
599 pub fn void_tunnel_interface(&mut self, tunnel_id: &[u8; 32]) {
601 self.tunnel_table.void_tunnel_interface(tunnel_id);
602 }
603
604 pub fn tunnel_table(&self) -> &TunnelTable {
606 &self.tunnel_table
607 }
608
609 fn has_local_clients(&self) -> bool {
615 self.interfaces.values().any(|i| i.is_local_client)
616 }
617
618 fn packet_filter(&self, packet: &RawPacket) -> bool {
622 if packet.transport_id.is_some()
624 && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
625 {
626 if let Some(ref identity_hash) = self.config.identity_hash {
627 if packet.transport_id.as_ref() != Some(identity_hash) {
628 return false;
629 }
630 }
631 }
632
633 match packet.context {
635 constants::CONTEXT_KEEPALIVE
636 | constants::CONTEXT_RESOURCE_REQ
637 | constants::CONTEXT_RESOURCE_PRF
638 | constants::CONTEXT_RESOURCE
639 | constants::CONTEXT_CACHE_REQUEST
640 | constants::CONTEXT_CHANNEL => return true,
641 _ => {}
642 }
643
644 if packet.flags.destination_type == constants::DESTINATION_PLAIN
646 || packet.flags.destination_type == constants::DESTINATION_GROUP
647 {
648 if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
649 return packet.hops <= 1;
650 } else {
651 return false;
653 }
654 }
655
656 if !self.packet_hashlist.is_duplicate(&packet.packet_hash) {
658 return true;
659 }
660
661 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
663 && packet.flags.destination_type == constants::DESTINATION_SINGLE
664 {
665 return true;
666 }
667
668 false
669 }
670
671 pub fn handle_inbound(
679 &mut self,
680 raw: &[u8],
681 iface: InterfaceId,
682 now: f64,
683 rng: &mut dyn Rng,
684 ) -> Vec<TransportAction> {
685 self.handle_inbound_with_announce_queue(raw, iface, now, rng, None)
686 }
687
688 pub fn handle_inbound_with_announce_queue(
689 &mut self,
690 raw: &[u8],
691 iface: InterfaceId,
692 now: f64,
693 rng: &mut dyn Rng,
694 announce_queue: Option<&mut AnnounceVerifyQueue>,
695 ) -> Vec<TransportAction> {
696 let Some(ctx) = self.prepare_inbound_packet(raw, iface, now) else {
697 return Vec::new();
698 };
699 let mut actions = Vec::new();
700
701 self.remember_inbound_packet_hash(&ctx.packet);
702 self.bridge_plain_broadcast(&ctx, &mut actions);
703 self.handle_transport_forwarding(&ctx, &mut actions);
704 self.handle_link_table_routing(&ctx, &mut actions);
705 self.handle_inbound_announce(&ctx, rng, announce_queue, &mut actions);
706
707 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_PROOF {
708 self.process_inbound_proof(&ctx.packet, ctx.iface, ctx.now, &mut actions);
709 }
710
711 self.handle_inbound_local_delivery(&ctx, &mut actions);
712 actions
713 }
714
715 fn prepare_inbound_packet(
716 &self,
717 raw: &[u8],
718 iface: InterfaceId,
719 now: f64,
720 ) -> Option<InboundPacketCtx> {
721 let mut packet = RawPacket::unpack(raw).ok()?;
722 let from_local_client = self
723 .interfaces
724 .get(&iface)
725 .map(|i| i.is_local_client)
726 .unwrap_or(false);
727 packet.hops += 1;
728 if from_local_client {
729 packet.hops = packet.hops.saturating_sub(1);
730 }
731 if !self.packet_filter(&packet) {
732 return None;
733 }
734 Some(InboundPacketCtx {
735 packet,
736 original_raw: raw.to_vec(),
737 iface,
738 now,
739 from_local_client,
740 })
741 }
742
743 fn remember_inbound_packet_hash(&mut self, packet: &RawPacket) {
744 let remember_hash = !(self.link_table.contains_key(&packet.destination_hash)
745 || (packet.flags.packet_type == constants::PACKET_TYPE_PROOF
746 && packet.context == constants::CONTEXT_LRPROOF));
747 if remember_hash {
748 self.packet_hashlist.add(packet.packet_hash);
749 }
750 }
751
752 fn bridge_plain_broadcast(&self, ctx: &InboundPacketCtx, actions: &mut Vec<TransportAction>) {
753 if ctx.packet.flags.destination_type != constants::DESTINATION_PLAIN
754 || ctx.packet.flags.transport_type != constants::TRANSPORT_BROADCAST
755 || !self.has_local_clients()
756 {
757 return;
758 }
759
760 if ctx.from_local_client {
761 actions.push(TransportAction::ForwardPlainBroadcast {
762 raw: ctx.packet.raw.clone(),
763 to_local: false,
764 exclude: Some(ctx.iface),
765 });
766 } else {
767 actions.push(TransportAction::ForwardPlainBroadcast {
768 raw: ctx.packet.raw.clone(),
769 to_local: true,
770 exclude: None,
771 });
772 }
773 }
774
775 fn handle_transport_forwarding(
776 &mut self,
777 ctx: &InboundPacketCtx,
778 actions: &mut Vec<TransportAction>,
779 ) {
780 if !(self.config.transport_enabled || self.config.identity_hash.is_some()) {
781 return;
782 }
783 if ctx.packet.transport_id.is_none()
784 || ctx.packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
785 {
786 return;
787 }
788
789 let Some(identity_hash) = self.config.identity_hash else {
790 return;
791 };
792 if ctx.packet.transport_id != Some(identity_hash) {
793 return;
794 }
795
796 let Some(path_entry) = self
797 .path_table
798 .get(&ctx.packet.destination_hash)
799 .and_then(|ps| ps.primary())
800 else {
801 return;
802 };
803
804 let next_hop = path_entry.next_hop;
805 let remaining_hops = path_entry.hops;
806 let outbound_interface = path_entry.receiving_interface;
807 let new_raw =
808 forward_transport_packet(&ctx.packet, next_hop, remaining_hops, outbound_interface);
809
810 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST {
811 let proof_timeout = ctx.now
812 + constants::LINK_ESTABLISHMENT_TIMEOUT_PER_HOP * (remaining_hops.max(1) as f64);
813 let (link_id, link_entry) = create_link_entry(
814 &ctx.packet,
815 next_hop,
816 outbound_interface,
817 remaining_hops,
818 ctx.iface,
819 ctx.now,
820 proof_timeout,
821 );
822 self.link_table.insert(link_id, link_entry);
823 actions.push(TransportAction::LinkRequestReceived {
824 link_id,
825 destination_hash: ctx.packet.destination_hash,
826 receiving_interface: ctx.iface,
827 });
828 } else {
829 let (trunc_hash, reverse_entry) =
830 create_reverse_entry(&ctx.packet, outbound_interface, ctx.iface, ctx.now);
831 self.reverse_table.insert(trunc_hash, reverse_entry);
832 }
833
834 actions.push(TransportAction::SendOnInterface {
835 interface: outbound_interface,
836 raw: new_raw,
837 });
838
839 if let Some(entry) = self
840 .path_table
841 .get_mut(&ctx.packet.destination_hash)
842 .and_then(|ps| ps.primary_mut())
843 {
844 entry.timestamp = ctx.now;
845 }
846 }
847
848 fn handle_link_table_routing(
849 &mut self,
850 ctx: &InboundPacketCtx,
851 actions: &mut Vec<TransportAction>,
852 ) {
853 if !self.config.transport_enabled && self.config.identity_hash.is_none() {
854 return;
855 }
856 if ctx.packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
857 || ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
858 || ctx.packet.context == constants::CONTEXT_LRPROOF
859 {
860 return;
861 }
862
863 let Some(link_entry) = self.link_table.get(&ctx.packet.destination_hash).cloned() else {
864 return;
865 };
866 let Some((outbound_iface, new_raw)) =
867 route_via_link_table(&ctx.packet, &link_entry, ctx.iface)
868 else {
869 return;
870 };
871
872 self.packet_hashlist.add(ctx.packet.packet_hash);
873 actions.push(TransportAction::SendOnInterface {
874 interface: outbound_iface,
875 raw: new_raw,
876 });
877
878 if let Some(entry) = self.link_table.get_mut(&ctx.packet.destination_hash) {
879 entry.timestamp = ctx.now;
880 }
881 }
882
883 fn handle_inbound_announce(
884 &mut self,
885 ctx: &InboundPacketCtx,
886 rng: &mut dyn Rng,
887 announce_queue: Option<&mut AnnounceVerifyQueue>,
888 actions: &mut Vec<TransportAction>,
889 ) {
890 if ctx.packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
891 return;
892 }
893
894 if let Some(queue) = announce_queue {
895 self.try_enqueue_announce(ctx, rng, queue, actions);
896 } else {
897 self.process_inbound_announce(
898 &ctx.packet,
899 &ctx.original_raw,
900 ctx.iface,
901 ctx.now,
902 rng,
903 actions,
904 );
905 }
906 }
907
908 fn handle_inbound_local_delivery(
909 &self,
910 ctx: &InboundPacketCtx,
911 actions: &mut Vec<TransportAction>,
912 ) {
913 if (ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
914 || ctx.packet.flags.packet_type == constants::PACKET_TYPE_DATA)
915 && self
916 .local_destinations
917 .contains_key(&ctx.packet.destination_hash)
918 {
919 actions.push(TransportAction::DeliverLocal {
920 destination_hash: ctx.packet.destination_hash,
921 raw: ctx.packet.raw.clone(),
922 packet_hash: ctx.packet.packet_hash,
923 receiving_interface: ctx.iface,
924 });
925 }
926 }
927
928 fn process_inbound_announce(
933 &mut self,
934 packet: &RawPacket,
935 original_raw: &[u8],
936 iface: InterfaceId,
937 now: f64,
938 rng: &mut dyn Rng,
939 actions: &mut Vec<TransportAction>,
940 ) {
941 if packet.flags.destination_type != constants::DESTINATION_SINGLE {
942 return;
943 }
944
945 let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
946
947 let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
949 Ok(a) => a,
950 Err(_) => return,
951 };
952
953 let sig_cache_key =
954 Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
955
956 let validated = if self.announce_sig_cache.contains(&sig_cache_key) {
957 announce.to_validated_unchecked()
958 } else {
959 match announce.validate(&packet.destination_hash) {
960 Ok(v) => {
961 self.announce_sig_cache.insert(sig_cache_key, now);
962 v
963 }
964 Err(_) => return,
965 }
966 };
967
968 let received_from = self.announce_received_from(packet, now);
969 let random_blob = match extract_random_blob(&packet.data) {
970 Some(b) => b,
971 None => return,
972 };
973 let announce_emitted = timebase_from_random_blob(&random_blob);
974
975 self.process_verified_announce(
976 VerifiedAnnounceCtx {
977 packet,
978 original_raw,
979 iface,
980 now,
981 validated,
982 received_from,
983 random_blob,
984 announce_emitted,
985 },
986 rng,
987 actions,
988 );
989 }
990
991 fn announce_sig_cache_key(destination_hash: [u8; 16], signature: &[u8; 64]) -> [u8; 32] {
992 let mut material = [0u8; 80];
993 material[..16].copy_from_slice(&destination_hash);
994 material[16..].copy_from_slice(signature);
995 hash::full_hash(&material)
996 }
997
998 fn announce_received_from(&mut self, packet: &RawPacket, now: f64) -> [u8; 16] {
999 if let Some(transport_id) = packet.transport_id {
1000 if self.config.transport_enabled {
1001 if let Some(announce_entry) = self.announce_table.get_mut(&packet.destination_hash)
1002 {
1003 if packet.hops.checked_sub(1) == Some(announce_entry.hops) {
1004 announce_entry.local_rebroadcasts += 1;
1005 if announce_entry.retries > 0
1006 && announce_entry.local_rebroadcasts
1007 >= constants::LOCAL_REBROADCASTS_MAX
1008 {
1009 self.announce_table.remove(&packet.destination_hash);
1010 }
1011 }
1012 if let Some(announce_entry) = self.announce_table.get(&packet.destination_hash)
1013 {
1014 if packet.hops.checked_sub(1) == Some(announce_entry.hops + 1)
1015 && announce_entry.retries > 0
1016 && now < announce_entry.retransmit_timeout
1017 {
1018 self.announce_table.remove(&packet.destination_hash);
1019 }
1020 }
1021 }
1022 }
1023 transport_id
1024 } else {
1025 packet.destination_hash
1026 }
1027 }
1028
1029 fn should_hold_announce(
1030 &mut self,
1031 packet: &RawPacket,
1032 original_raw: &[u8],
1033 iface: InterfaceId,
1034 now: f64,
1035 ) -> bool {
1036 if self.has_path(&packet.destination_hash) {
1037 return false;
1038 }
1039 if self
1040 .discovery_path_requests
1041 .contains_key(&packet.destination_hash)
1042 {
1043 return false;
1044 }
1045 let Some(info) = self.interfaces.get(&iface) else {
1046 return false;
1047 };
1048 if packet.context == constants::CONTEXT_PATH_RESPONSE
1049 || !self.ingress_control.should_ingress_limit(
1050 iface,
1051 &info.ingress_control,
1052 info.ia_freq,
1053 info.started,
1054 now,
1055 )
1056 {
1057 return false;
1058 }
1059 self.ingress_control.hold_announce(
1060 iface,
1061 &info.ingress_control,
1062 packet.destination_hash,
1063 ingress_control::HeldAnnounce {
1064 raw: original_raw.to_vec(),
1065 hops: packet.hops,
1066 receiving_interface: iface,
1067 timestamp: now,
1068 },
1069 );
1070 true
1071 }
1072
1073 fn try_enqueue_announce(
1074 &mut self,
1075 ctx: &InboundPacketCtx,
1076 rng: &mut dyn Rng,
1077 announce_queue: &mut AnnounceVerifyQueue,
1078 actions: &mut Vec<TransportAction>,
1079 ) {
1080 if ctx.packet.flags.destination_type != constants::DESTINATION_SINGLE {
1081 return;
1082 }
1083
1084 let has_ratchet = ctx.packet.flags.context_flag == constants::FLAG_SET;
1085 let announce = match AnnounceData::unpack(&ctx.packet.data, has_ratchet) {
1086 Ok(a) => a,
1087 Err(_) => return,
1088 };
1089
1090 let received_from = self.announce_received_from(&ctx.packet, ctx.now);
1091
1092 if self
1093 .local_destinations
1094 .contains_key(&ctx.packet.destination_hash)
1095 {
1096 log::debug!(
1097 "Announce:skipping local destination {:02x}{:02x}{:02x}{:02x}..",
1098 ctx.packet.destination_hash[0],
1099 ctx.packet.destination_hash[1],
1100 ctx.packet.destination_hash[2],
1101 ctx.packet.destination_hash[3],
1102 );
1103 return;
1104 }
1105
1106 if self.should_hold_announce(&ctx.packet, &ctx.original_raw, ctx.iface, ctx.now) {
1107 return;
1108 }
1109
1110 let sig_cache_key =
1111 Self::announce_sig_cache_key(ctx.packet.destination_hash, &announce.signature);
1112 if self.announce_sig_cache.contains(&sig_cache_key) {
1113 let validated = announce.to_validated_unchecked();
1114 let random_blob = match extract_random_blob(&ctx.packet.data) {
1115 Some(b) => b,
1116 None => return,
1117 };
1118 let announce_emitted = timebase_from_random_blob(&random_blob);
1119 self.process_verified_announce(
1120 VerifiedAnnounceCtx {
1121 packet: &ctx.packet,
1122 original_raw: &ctx.original_raw,
1123 iface: ctx.iface,
1124 now: ctx.now,
1125 validated,
1126 received_from,
1127 random_blob,
1128 announce_emitted,
1129 },
1130 rng,
1131 actions,
1132 );
1133 return;
1134 }
1135
1136 if ctx.packet.context == constants::CONTEXT_PATH_RESPONSE {
1137 let Ok(validated) = announce.validate(&ctx.packet.destination_hash) else {
1138 return;
1139 };
1140 self.announce_sig_cache.insert(sig_cache_key, ctx.now);
1141 let random_blob = match extract_random_blob(&ctx.packet.data) {
1142 Some(b) => b,
1143 None => return,
1144 };
1145 let announce_emitted = timebase_from_random_blob(&random_blob);
1146 self.process_verified_announce(
1147 VerifiedAnnounceCtx {
1148 packet: &ctx.packet,
1149 original_raw: &ctx.original_raw,
1150 iface: ctx.iface,
1151 now: ctx.now,
1152 validated,
1153 received_from,
1154 random_blob,
1155 announce_emitted,
1156 },
1157 rng,
1158 actions,
1159 );
1160 return;
1161 }
1162
1163 let random_blob = match extract_random_blob(&ctx.packet.data) {
1164 Some(b) => b,
1165 None => return,
1166 };
1167 let announce_emitted = timebase_from_random_blob(&random_blob);
1168 let key = AnnounceVerifyKey {
1169 destination_hash: ctx.packet.destination_hash,
1170 random_blob,
1171 received_from,
1172 };
1173 let pending = PendingAnnounce {
1174 original_raw: ctx.original_raw.clone(),
1175 packet: ctx.packet.clone(),
1176 interface: ctx.iface,
1177 received_from,
1178 queued_at: ctx.now,
1179 best_hops: ctx.packet.hops,
1180 emission_ts: announce_emitted,
1181 random_blob,
1182 };
1183 let _ = announce_queue.enqueue(key, pending);
1184 }
1185
1186 pub fn complete_verified_announce(
1187 &mut self,
1188 pending: PendingAnnounce,
1189 validated: crate::announce::ValidatedAnnounce,
1190 sig_cache_key: [u8; 32],
1191 now: f64,
1192 rng: &mut dyn Rng,
1193 ) -> Vec<TransportAction> {
1194 self.announce_sig_cache.insert(sig_cache_key, now);
1195 let mut actions = Vec::new();
1196 self.process_verified_announce(
1197 VerifiedAnnounceCtx {
1198 packet: &pending.packet,
1199 original_raw: &pending.original_raw,
1200 iface: pending.interface,
1201 now,
1202 validated,
1203 received_from: pending.received_from,
1204 random_blob: pending.random_blob,
1205 announce_emitted: pending.emission_ts,
1206 },
1207 rng,
1208 &mut actions,
1209 );
1210 actions
1211 }
1212
1213 pub fn clear_failed_verified_announce(&mut self, _sig_cache_key: [u8; 32], _now: f64) {}
1214
1215 fn process_verified_announce(
1216 &mut self,
1217 ctx: VerifiedAnnounceCtx<'_>,
1218 rng: &mut dyn Rng,
1219 actions: &mut Vec<TransportAction>,
1220 ) {
1221 if self.is_blackholed(&ctx.validated.identity_hash, ctx.now) {
1222 return;
1223 }
1224 if ctx.packet.hops > constants::PATHFINDER_M {
1225 return;
1226 }
1227
1228 let existing_set = self.path_table.get(&ctx.packet.destination_hash);
1229 let was_unknown_destination = existing_set.map_or(true, |ps| ps.is_empty());
1230
1231 if was_unknown_destination {
1234 self.path_states.remove(&ctx.packet.destination_hash);
1235 }
1236
1237 let is_unresponsive = self.path_is_unresponsive(&ctx.packet.destination_hash);
1239
1240 let mp_decision = decide_announce_multipath(
1241 existing_set,
1242 ctx.packet.hops,
1243 ctx.announce_emitted,
1244 &ctx.random_blob,
1245 &ctx.received_from,
1246 is_unresponsive,
1247 ctx.now,
1248 self.config.prefer_shorter_path,
1249 );
1250
1251 if mp_decision == MultiPathDecision::Reject {
1252 log::debug!(
1253 "Announce:path decision REJECT for dest={:02x}{:02x}{:02x}{:02x}..",
1254 ctx.packet.destination_hash[0],
1255 ctx.packet.destination_hash[1],
1256 ctx.packet.destination_hash[2],
1257 ctx.packet.destination_hash[3],
1258 );
1259 return;
1260 }
1261
1262 let rate_blocked = if ctx.packet.context != constants::CONTEXT_PATH_RESPONSE {
1264 if let Some(iface_info) = self.interfaces.get(&ctx.iface) {
1265 self.rate_limiter.check_and_update(
1266 &ctx.packet.destination_hash,
1267 ctx.now,
1268 iface_info.announce_rate_target,
1269 iface_info.announce_rate_grace,
1270 iface_info.announce_rate_penalty,
1271 )
1272 } else {
1273 false
1274 }
1275 } else {
1276 false
1277 };
1278
1279 let interface_mode = self
1281 .interfaces
1282 .get(&ctx.iface)
1283 .map(|i| i.mode)
1284 .unwrap_or(constants::MODE_FULL);
1285
1286 let expires = compute_path_expires(ctx.now, interface_mode);
1287
1288 let existing_blobs = self
1290 .path_table
1291 .get(&ctx.packet.destination_hash)
1292 .and_then(|ps| ps.find_by_next_hop(&ctx.received_from))
1293 .map(|e| e.random_blobs.clone())
1294 .unwrap_or_default();
1295
1296 let mut rng_bytes = [0u8; 8];
1298 rng.fill_bytes(&mut rng_bytes);
1299 let rng_value = (u64::from_le_bytes(rng_bytes) as f64) / (u64::MAX as f64);
1300
1301 let is_path_response = ctx.packet.context == constants::CONTEXT_PATH_RESPONSE;
1302
1303 let (path_entry, announce_entry) = announce_proc::process_validated_announce(
1304 ctx.packet.destination_hash,
1305 ctx.packet.hops,
1306 &ctx.packet.data,
1307 &ctx.packet.raw,
1308 ctx.packet.packet_hash,
1309 ctx.packet.flags.context_flag,
1310 ctx.received_from,
1311 ctx.iface,
1312 ctx.now,
1313 existing_blobs,
1314 ctx.random_blob,
1315 expires,
1316 rng_value,
1317 self.config.transport_enabled,
1318 is_path_response,
1319 rate_blocked,
1320 Some(ctx.original_raw.to_vec()),
1321 );
1322
1323 actions.push(TransportAction::CacheAnnounce {
1325 packet_hash: ctx.packet.packet_hash,
1326 raw: ctx.original_raw.to_vec(),
1327 });
1328
1329 self.upsert_path_destination(ctx.packet.destination_hash, path_entry, ctx.now);
1331
1332 if let Some(tunnel_id) = self.interfaces.get(&ctx.iface).and_then(|i| i.tunnel_id) {
1334 let blobs = self
1335 .path_table
1336 .get(&ctx.packet.destination_hash)
1337 .and_then(|ps| ps.find_by_next_hop(&ctx.received_from))
1338 .map(|e| e.random_blobs.clone())
1339 .unwrap_or_default();
1340 self.tunnel_table.store_tunnel_path(
1341 &tunnel_id,
1342 ctx.packet.destination_hash,
1343 tunnel::TunnelPath {
1344 timestamp: ctx.now,
1345 received_from: ctx.received_from,
1346 hops: ctx.packet.hops,
1347 expires,
1348 random_blobs: blobs,
1349 packet_hash: ctx.packet.packet_hash,
1350 },
1351 ctx.now,
1352 self.config.destination_timeout_secs,
1353 self.config.max_tunnel_destinations_total,
1354 );
1355 }
1356
1357 self.path_states.remove(&ctx.packet.destination_hash);
1360
1361 if let Some(ann) = announce_entry {
1363 self.insert_announce_entry(ctx.packet.destination_hash, ann, ctx.now);
1364 }
1365
1366 actions.push(TransportAction::AnnounceReceived {
1368 destination_hash: ctx.packet.destination_hash,
1369 identity_hash: ctx.validated.identity_hash,
1370 public_key: ctx.validated.public_key,
1371 name_hash: ctx.validated.name_hash,
1372 random_hash: ctx.validated.random_hash,
1373 app_data: ctx.validated.app_data,
1374 hops: ctx.packet.hops,
1375 receiving_interface: ctx.iface,
1376 });
1377
1378 actions.push(TransportAction::PathUpdated {
1379 destination_hash: ctx.packet.destination_hash,
1380 hops: ctx.packet.hops,
1381 next_hop: ctx.received_from,
1382 interface: ctx.iface,
1383 });
1384
1385 if self.has_local_clients() {
1387 actions.push(TransportAction::ForwardToLocalClients {
1388 raw: ctx.packet.raw.clone(),
1389 exclude: Some(ctx.iface),
1390 });
1391 }
1392
1393 if let Some(pr_entry) = self.discovery_path_requests_waiting(&ctx.packet.destination_hash) {
1395 let entry = AnnounceEntry {
1397 timestamp: ctx.now,
1398 retransmit_timeout: ctx.now,
1399 retries: constants::PATHFINDER_R,
1400 received_from: ctx.received_from,
1401 hops: ctx.packet.hops,
1402 packet_raw: ctx.packet.raw.clone(),
1403 packet_data: ctx.packet.data.clone(),
1404 destination_hash: ctx.packet.destination_hash,
1405 context_flag: ctx.packet.flags.context_flag,
1406 local_rebroadcasts: 0,
1407 block_rebroadcasts: true,
1408 attached_interface: Some(pr_entry),
1409 };
1410 self.insert_announce_entry(ctx.packet.destination_hash, entry, ctx.now);
1411 }
1412 }
1413
1414 pub fn announce_sig_cache_contains(&self, sig_cache_key: &[u8; 32]) -> bool {
1415 self.announce_sig_cache.contains(sig_cache_key)
1416 }
1417
1418 fn discovery_path_requests_waiting(&mut self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
1421 self.discovery_path_requests
1422 .remove(dest_hash)
1423 .map(|req| req.requesting_interface)
1424 }
1425
1426 fn process_inbound_proof(
1431 &mut self,
1432 packet: &RawPacket,
1433 iface: InterfaceId,
1434 _now: f64,
1435 actions: &mut Vec<TransportAction>,
1436 ) {
1437 if packet.context == constants::CONTEXT_LRPROOF {
1438 if (self.config.transport_enabled)
1440 && self.link_table.contains_key(&packet.destination_hash)
1441 {
1442 let link_entry = self.link_table.get(&packet.destination_hash).cloned();
1443 if let Some(entry) = link_entry {
1444 if let Some((outbound_interface, new_raw)) =
1445 route_via_link_table(packet, &entry, iface)
1446 {
1447 if let Some(le) = self.link_table.get_mut(&packet.destination_hash) {
1452 le.validated = true;
1453 }
1454
1455 actions.push(TransportAction::LinkEstablished {
1456 link_id: packet.destination_hash,
1457 interface: outbound_interface,
1458 });
1459
1460 actions.push(TransportAction::SendOnInterface {
1461 interface: outbound_interface,
1462 raw: new_raw,
1463 });
1464 }
1465 }
1466 } else {
1467 actions.push(TransportAction::DeliverLocal {
1469 destination_hash: packet.destination_hash,
1470 raw: packet.raw.clone(),
1471 packet_hash: packet.packet_hash,
1472 receiving_interface: iface,
1473 });
1474 }
1475 } else {
1476 if self.config.transport_enabled {
1478 if let Some(reverse_entry) = self.reverse_table.remove(&packet.destination_hash) {
1479 if let Some(action) = route_proof_via_reverse(packet, &reverse_entry, iface) {
1480 actions.push(action);
1481 }
1482 }
1483 }
1484
1485 actions.push(TransportAction::DeliverLocal {
1487 destination_hash: packet.destination_hash,
1488 raw: packet.raw.clone(),
1489 packet_hash: packet.packet_hash,
1490 receiving_interface: iface,
1491 });
1492 }
1493 }
1494
1495 pub fn handle_outbound(
1501 &mut self,
1502 packet: &RawPacket,
1503 dest_type: u8,
1504 attached_interface: Option<InterfaceId>,
1505 now: f64,
1506 ) -> Vec<TransportAction> {
1507 let actions = route_outbound(
1508 &self.path_table,
1509 &self.interfaces,
1510 &self.local_destinations,
1511 packet,
1512 dest_type,
1513 attached_interface,
1514 now,
1515 );
1516
1517 self.packet_hashlist.add(packet.packet_hash);
1519
1520 if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE && packet.hops > 0 {
1522 self.gate_announce_actions(actions, &packet.destination_hash, packet.hops, now)
1523 } else {
1524 actions
1525 }
1526 }
1527
1528 fn gate_announce_actions(
1530 &mut self,
1531 actions: Vec<TransportAction>,
1532 dest_hash: &[u8; 16],
1533 hops: u8,
1534 now: f64,
1535 ) -> Vec<TransportAction> {
1536 let mut result = Vec::new();
1537 for action in actions {
1538 match action {
1539 TransportAction::SendOnInterface { interface, raw } => {
1540 let (bitrate, announce_cap) =
1541 if let Some(info) = self.interfaces.get(&interface) {
1542 (info.bitrate, info.announce_cap)
1543 } else {
1544 (None, constants::ANNOUNCE_CAP)
1545 };
1546 if let Some(send_action) = self.announce_queues.gate_announce(
1547 interface,
1548 raw,
1549 *dest_hash,
1550 hops,
1551 now,
1552 now,
1553 bitrate,
1554 announce_cap,
1555 ) {
1556 result.push(send_action);
1557 }
1558 }
1560 other => result.push(other),
1561 }
1562 }
1563 result
1564 }
1565
1566 pub fn tick(&mut self, now: f64, rng: &mut dyn Rng) -> Vec<TransportAction> {
1572 let mut ctx = TickCtx {
1573 now,
1574 rng,
1575 actions: Vec::new(),
1576 };
1577 self.process_tick_pending_announces(&mut ctx);
1578
1579 let mut queue_actions = self.announce_queues.process_queues(now, &self.interfaces);
1580 ctx.actions.append(&mut queue_actions);
1581
1582 self.process_tick_ingress_release(&mut ctx);
1583 self.cull_tick_tables(&mut ctx);
1584 ctx.actions
1585 }
1586
1587 fn process_tick_pending_announces(&mut self, ctx: &mut TickCtx<'_>) {
1588 if ctx.now <= self.announces_last_checked + constants::ANNOUNCES_CHECK_INTERVAL {
1589 return;
1590 }
1591
1592 self.cull_expired_announce_entries(ctx.now);
1593 self.enforce_announce_retention_cap(ctx.now);
1594 if let Some(identity_hash) = self.config.identity_hash {
1595 let announce_actions = jobs::process_pending_announces(
1596 &mut self.announce_table,
1597 &mut self.held_announces,
1598 &identity_hash,
1599 ctx.now,
1600 );
1601 let gated = self.gate_retransmit_actions(announce_actions, ctx.now);
1602 ctx.actions.extend(gated);
1603 }
1604 self.cull_expired_announce_entries(ctx.now);
1605 self.enforce_announce_retention_cap(ctx.now);
1606 self.announces_last_checked = ctx.now;
1607 }
1608
1609 fn process_tick_ingress_release(&mut self, ctx: &mut TickCtx<'_>) {
1610 let ic_interfaces = self.ingress_control.interfaces_with_held();
1611 for iface_id in ic_interfaces {
1612 let (ia_freq, started, ingress_config) = match self.interfaces.get(&iface_id) {
1613 Some(info) => (info.ia_freq, info.started, info.ingress_control),
1614 None => continue,
1615 };
1616 if !ingress_config.enabled {
1617 continue;
1618 }
1619 if let Some(held) = self.ingress_control.process_held_announces(
1620 iface_id,
1621 &ingress_config,
1622 ia_freq,
1623 started,
1624 ctx.now,
1625 ) {
1626 let released_actions =
1627 self.handle_inbound(&held.raw, held.receiving_interface, ctx.now, ctx.rng);
1628 ctx.actions.extend(released_actions);
1629 }
1630 }
1631 }
1632
1633 fn cull_tick_tables(&mut self, ctx: &mut TickCtx<'_>) {
1634 if ctx.now <= self.tables_last_culled + constants::TABLES_CULL_INTERVAL {
1635 return;
1636 }
1637
1638 jobs::cull_path_table(&mut self.path_table, &self.interfaces, ctx.now);
1639 jobs::cull_reverse_table(&mut self.reverse_table, &self.interfaces, ctx.now);
1640 let (_culled, link_closed_actions) =
1641 jobs::cull_link_table(&mut self.link_table, &self.interfaces, ctx.now);
1642 ctx.actions.extend(link_closed_actions);
1643 jobs::cull_path_states(&mut self.path_states, &self.path_table);
1644 self.cull_blackholed(ctx.now);
1645 self.discovery_path_requests
1646 .retain(|_, req| ctx.now - req.timestamp < constants::DISCOVERY_PATH_REQUEST_TIMEOUT);
1647 self.tunnel_table
1648 .void_missing_interfaces(|id| self.interfaces.contains_key(id));
1649 self.tunnel_table.cull(ctx.now);
1650 self.announce_sig_cache.cull(ctx.now);
1651 self.tables_last_culled = ctx.now;
1652 }
1653
1654 fn gate_retransmit_actions(
1659 &mut self,
1660 actions: Vec<TransportAction>,
1661 now: f64,
1662 ) -> Vec<TransportAction> {
1663 let mut result = Vec::new();
1664 for action in actions {
1665 match action {
1666 TransportAction::SendOnInterface { interface, raw } => {
1667 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1669 let (bitrate, announce_cap) =
1670 if let Some(info) = self.interfaces.get(&interface) {
1671 (info.bitrate, info.announce_cap)
1672 } else {
1673 (None, constants::ANNOUNCE_CAP)
1674 };
1675 if let Some(send_action) = self.announce_queues.gate_announce(
1676 interface,
1677 raw,
1678 dest_hash,
1679 hops,
1680 now,
1681 now,
1682 bitrate,
1683 announce_cap,
1684 ) {
1685 result.push(send_action);
1686 }
1687 }
1688 TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1689 let (dest_hash, hops) = Self::extract_announce_info(&raw);
1690 let iface_ids: Vec<(InterfaceId, Option<u64>, f64)> = self
1693 .interfaces
1694 .iter()
1695 .filter(|(_, info)| info.out_capable)
1696 .filter(|(id, _)| {
1697 if let Some(ref ex) = exclude {
1698 **id != *ex
1699 } else {
1700 true
1701 }
1702 })
1703 .filter(|(_, info)| {
1704 should_transmit_announce(
1705 info,
1706 &dest_hash,
1707 hops,
1708 &self.local_destinations,
1709 &self.path_table,
1710 &self.interfaces,
1711 )
1712 })
1713 .map(|(id, info)| (*id, info.bitrate, info.announce_cap))
1714 .collect();
1715
1716 for (iface_id, bitrate, announce_cap) in iface_ids {
1717 if let Some(send_action) = self.announce_queues.gate_announce(
1718 iface_id,
1719 raw.clone(),
1720 dest_hash,
1721 hops,
1722 now,
1723 now,
1724 bitrate,
1725 announce_cap,
1726 ) {
1727 result.push(send_action);
1728 }
1729 }
1730 }
1731 other => result.push(other),
1732 }
1733 }
1734 result
1735 }
1736
1737 fn extract_announce_info(raw: &[u8]) -> ([u8; 16], u8) {
1739 if raw.len() < 18 {
1740 return ([0; 16], 0);
1741 }
1742 let header_type = (raw[0] >> 6) & 0x03;
1743 let hops = raw[1];
1744 if header_type == constants::HEADER_2 && raw.len() >= 34 {
1745 let mut dest = [0u8; 16];
1747 dest.copy_from_slice(&raw[18..34]);
1748 (dest, hops)
1749 } else {
1750 let mut dest = [0u8; 16];
1752 dest.copy_from_slice(&raw[2..18]);
1753 (dest, hops)
1754 }
1755 }
1756
1757 pub fn handle_path_request(
1770 &mut self,
1771 data: &[u8],
1772 interface_id: InterfaceId,
1773 now: f64,
1774 ) -> Vec<TransportAction> {
1775 let Some(ctx) = self.parse_path_request(data, interface_id, now) else {
1776 return Vec::new();
1777 };
1778 if self.local_destinations.contains_key(&ctx.destination_hash) {
1779 return Vec::new();
1780 }
1781 if self.config.transport_enabled && self.has_path(&ctx.destination_hash) {
1782 self.handle_known_path_request(&ctx);
1783 return Vec::new();
1784 }
1785 if self.config.transport_enabled {
1786 return self.handle_discovery_path_request(&ctx);
1787 }
1788 Vec::new()
1789 }
1790
1791 fn parse_path_request<'a>(
1792 &mut self,
1793 data: &'a [u8],
1794 interface_id: InterfaceId,
1795 now: f64,
1796 ) -> Option<PathRequestCtx<'a>> {
1797 if data.len() < 16 {
1798 return None;
1799 }
1800
1801 let mut destination_hash = [0u8; 16];
1802 destination_hash.copy_from_slice(&data[..16]);
1803
1804 let tag_bytes = if data.len() > 32 {
1805 Some(&data[32..])
1806 } else if data.len() > 16 {
1807 Some(&data[16..])
1808 } else {
1809 None
1810 }?;
1811
1812 let tag_len = tag_bytes.len().min(16);
1813 let mut unique_tag = [0u8; 32];
1814 unique_tag[..16].copy_from_slice(&destination_hash);
1815 unique_tag[16..16 + tag_len].copy_from_slice(&tag_bytes[..tag_len]);
1816 if !self.insert_discovery_pr_tag(unique_tag) {
1817 return None;
1818 }
1819
1820 Some(PathRequestCtx {
1821 data,
1822 interface_id,
1823 now,
1824 destination_hash,
1825 })
1826 }
1827
1828 fn handle_known_path_request(&mut self, ctx: &PathRequestCtx<'_>) {
1829 let Some(path) = self
1830 .path_table
1831 .get(&ctx.destination_hash)
1832 .and_then(|ps| ps.primary())
1833 .cloned()
1834 else {
1835 return;
1836 };
1837
1838 if let Some(recv_info) = self.interfaces.get(&ctx.interface_id) {
1839 if recv_info.mode == constants::MODE_ROAMING
1840 && path.receiving_interface == ctx.interface_id
1841 {
1842 return;
1843 }
1844 }
1845
1846 let Some(raw) = path.announce_raw.as_ref() else {
1847 return;
1848 };
1849 if let Some(existing) = self.announce_table.remove(&ctx.destination_hash) {
1850 self.insert_held_announce(ctx.destination_hash, existing, ctx.now);
1851 }
1852 let retransmit_timeout = if let Some(iface_info) = self.interfaces.get(&ctx.interface_id) {
1853 let base = ctx.now + constants::PATH_REQUEST_GRACE;
1854 if iface_info.mode == constants::MODE_ROAMING {
1855 base + constants::PATH_REQUEST_RG
1856 } else {
1857 base
1858 }
1859 } else {
1860 ctx.now + constants::PATH_REQUEST_GRACE
1861 };
1862
1863 let Ok(parsed) = RawPacket::unpack(raw) else {
1864 return;
1865 };
1866
1867 let entry = AnnounceEntry {
1868 timestamp: ctx.now,
1869 retransmit_timeout,
1870 retries: constants::PATHFINDER_R,
1871 received_from: path.next_hop,
1872 hops: path.hops,
1873 packet_raw: raw.clone(),
1874 packet_data: parsed.data,
1875 destination_hash: ctx.destination_hash,
1876 context_flag: parsed.flags.context_flag,
1877 local_rebroadcasts: 0,
1878 block_rebroadcasts: true,
1879 attached_interface: Some(ctx.interface_id),
1880 };
1881
1882 self.insert_announce_entry(ctx.destination_hash, entry, ctx.now);
1883 }
1884
1885 fn handle_discovery_path_request(&mut self, ctx: &PathRequestCtx<'_>) -> Vec<TransportAction> {
1886 let should_discover = self
1887 .interfaces
1888 .get(&ctx.interface_id)
1889 .map(|info| constants::DISCOVER_PATHS_FOR.contains(&info.mode))
1890 .unwrap_or(false);
1891 if !should_discover {
1892 return Vec::new();
1893 }
1894
1895 self.discovery_path_requests.insert(
1896 ctx.destination_hash,
1897 DiscoveryPathRequest {
1898 timestamp: ctx.now,
1899 requesting_interface: ctx.interface_id,
1900 },
1901 );
1902
1903 self.interfaces
1904 .values()
1905 .filter(|iface_info| iface_info.id != ctx.interface_id && iface_info.out_capable)
1906 .map(|iface_info| TransportAction::SendOnInterface {
1907 interface: iface_info.id,
1908 raw: ctx.data.to_vec(),
1909 })
1910 .collect()
1911 }
1912
1913 pub fn path_table_entries(&self) -> impl Iterator<Item = (&[u8; 16], &PathEntry)> {
1919 self.path_table
1920 .iter()
1921 .filter_map(|(k, ps)| ps.primary().map(|e| (k, e)))
1922 }
1923
1924 pub fn path_table_sets(&self) -> impl Iterator<Item = (&[u8; 16], &PathSet)> {
1926 self.path_table.iter()
1927 }
1928
1929 pub fn interface_count(&self) -> usize {
1931 self.interfaces.len()
1932 }
1933
1934 pub fn link_table_count(&self) -> usize {
1936 self.link_table.len()
1937 }
1938
1939 pub fn path_table_count(&self) -> usize {
1941 self.path_table.len()
1942 }
1943
1944 pub fn announce_table_count(&self) -> usize {
1946 self.announce_table.len()
1947 }
1948
1949 pub fn reverse_table_count(&self) -> usize {
1951 self.reverse_table.len()
1952 }
1953
1954 pub fn held_announces_count(&self) -> usize {
1956 self.held_announces.len()
1957 }
1958
1959 pub fn packet_hashlist_len(&self) -> usize {
1961 self.packet_hashlist.len()
1962 }
1963
1964 pub fn announce_sig_cache_len(&self) -> usize {
1966 self.announce_sig_cache.len()
1967 }
1968
1969 pub fn rate_limiter_count(&self) -> usize {
1971 self.rate_limiter.len()
1972 }
1973
1974 pub fn blackholed_count(&self) -> usize {
1976 self.blackholed_identities.len()
1977 }
1978
1979 pub fn tunnel_count(&self) -> usize {
1981 self.tunnel_table.len()
1982 }
1983
1984 pub fn discovery_pr_tags_count(&self) -> usize {
1986 self.discovery_pr_tags.len()
1987 }
1988
1989 pub fn discovery_path_requests_count(&self) -> usize {
1991 self.discovery_path_requests.len()
1992 }
1993
1994 pub fn announce_queue_count(&self) -> usize {
1996 self.announce_queues.queue_count()
1997 }
1998
1999 pub fn nonempty_announce_queue_count(&self) -> usize {
2001 self.announce_queues.nonempty_queue_count()
2002 }
2003
2004 pub fn queued_announce_count(&self) -> usize {
2006 self.announce_queues.total_queued_announces()
2007 }
2008
2009 pub fn queued_announce_bytes(&self) -> usize {
2011 self.announce_queues.total_queued_bytes()
2012 }
2013
2014 pub fn announce_queue_interface_cap_drop_count(&self) -> u64 {
2016 self.announce_queues.interface_cap_drop_count()
2017 }
2018
2019 pub fn local_destinations_count(&self) -> usize {
2021 self.local_destinations.len()
2022 }
2023
2024 pub fn rate_limiter(&self) -> &AnnounceRateLimiter {
2026 &self.rate_limiter
2027 }
2028
2029 pub fn interface_info(&self, id: &InterfaceId) -> Option<&InterfaceInfo> {
2031 self.interfaces.get(id)
2032 }
2033
2034 pub fn redirect_path(&mut self, dest_hash: &[u8; 16], interface: InterfaceId, now: f64) {
2037 if let Some(entry) = self
2038 .path_table
2039 .get_mut(dest_hash)
2040 .and_then(|ps| ps.primary_mut())
2041 {
2042 entry.receiving_interface = interface;
2043 entry.hops = 1;
2044 } else {
2045 self.upsert_path_destination(
2046 *dest_hash,
2047 PathEntry {
2048 timestamp: now,
2049 next_hop: [0u8; 16],
2050 hops: 1,
2051 expires: now + 3600.0,
2052 random_blobs: Vec::new(),
2053 receiving_interface: interface,
2054 packet_hash: [0u8; 32],
2055 announce_raw: None,
2056 },
2057 now,
2058 );
2059 }
2060 }
2061
2062 pub fn inject_path(&mut self, dest_hash: [u8; 16], entry: PathEntry) {
2064 self.upsert_path_destination(dest_hash, entry.clone(), entry.timestamp);
2065 }
2066
2067 pub fn drop_path(&mut self, dest_hash: &[u8; 16]) -> bool {
2069 self.path_table.remove(dest_hash).is_some()
2070 }
2071
2072 pub fn drop_all_via(&mut self, transport_hash: &[u8; 16]) -> usize {
2077 let mut removed = 0usize;
2078 for ps in self.path_table.values_mut() {
2079 let before = ps.len();
2080 ps.retain(|entry| &entry.next_hop != transport_hash);
2081 removed += before - ps.len();
2082 }
2083 self.path_table.retain(|_, ps| !ps.is_empty());
2084 removed
2085 }
2086
2087 pub fn drop_paths_for_interface(&mut self, interface: InterfaceId) -> usize {
2089 let mut removed = 0usize;
2090 let mut cleared_destinations = Vec::new();
2091 for (dest_hash, ps) in self.path_table.iter_mut() {
2092 let before = ps.len();
2093 ps.retain(|entry| entry.receiving_interface != interface);
2094 if ps.is_empty() {
2095 cleared_destinations.push(*dest_hash);
2096 }
2097 removed += before - ps.len();
2098 }
2099 self.path_table.retain(|_, ps| !ps.is_empty());
2100 for dest_hash in cleared_destinations {
2101 self.path_states.remove(&dest_hash);
2102 }
2103 removed
2104 }
2105
2106 pub fn drop_reverse_for_interface(&mut self, interface: InterfaceId) -> usize {
2108 let before = self.reverse_table.len();
2109 self.reverse_table.retain(|_, entry| {
2110 entry.receiving_interface != interface && entry.outbound_interface != interface
2111 });
2112 before - self.reverse_table.len()
2113 }
2114
2115 pub fn drop_links_for_interface(&mut self, interface: InterfaceId) -> usize {
2117 let before = self.link_table.len();
2118 self.link_table.retain(|_, entry| {
2119 entry.next_hop_interface != interface && entry.received_interface != interface
2120 });
2121 before - self.link_table.len()
2122 }
2123
2124 pub fn drop_announce_queues(&mut self) {
2126 self.announce_table.clear();
2127 self.held_announces.clear();
2128 self.announce_queues = AnnounceQueues::new(self.config.announce_queue_max_interfaces);
2129 self.ingress_control.clear();
2130 }
2131
2132 pub fn identity_hash(&self) -> Option<&[u8; 16]> {
2134 self.config.identity_hash.as_ref()
2135 }
2136
2137 pub fn transport_enabled(&self) -> bool {
2139 self.config.transport_enabled
2140 }
2141
2142 pub fn config(&self) -> &TransportConfig {
2144 &self.config
2145 }
2146
2147 pub fn set_packet_hashlist_max_entries(&mut self, max_entries: usize) {
2148 self.config.packet_hashlist_max_entries = max_entries;
2149 self.packet_hashlist = PacketHashlist::new(max_entries);
2150 }
2151
2152 pub fn get_path_table(&self, max_hops: Option<u8>) -> Vec<PathTableRow> {
2156 let mut result = Vec::new();
2157 for (dest_hash, ps) in self.path_table.iter() {
2158 if let Some(entry) = ps.primary() {
2159 if let Some(max) = max_hops {
2160 if entry.hops > max {
2161 continue;
2162 }
2163 }
2164 let iface_name = self
2165 .interfaces
2166 .get(&entry.receiving_interface)
2167 .map(|i| i.name.clone())
2168 .unwrap_or_else(|| {
2169 alloc::format!("Interface({})", entry.receiving_interface.0)
2170 });
2171 result.push((
2172 *dest_hash,
2173 entry.timestamp,
2174 entry.next_hop,
2175 entry.hops,
2176 entry.expires,
2177 iface_name,
2178 ));
2179 }
2180 }
2181 result
2182 }
2183
2184 pub fn get_rate_table(&self) -> Vec<RateTableRow> {
2187 self.rate_limiter
2188 .entries()
2189 .map(|(hash, entry)| {
2190 (
2191 *hash,
2192 entry.last,
2193 entry.rate_violations,
2194 entry.blocked_until,
2195 entry.timestamps.clone(),
2196 )
2197 })
2198 .collect()
2199 }
2200
2201 pub fn get_blackholed(&self) -> Vec<([u8; 16], f64, f64, Option<alloc::string::String>)> {
2204 self.blackholed_entries()
2205 .map(|(hash, entry)| (*hash, entry.created, entry.expires, entry.reason.clone()))
2206 .collect()
2207 }
2208
2209 pub fn active_destination_hashes(&self) -> alloc::collections::BTreeSet<[u8; 16]> {
2215 self.path_table.keys().copied().collect()
2216 }
2217
2218 pub fn path_destination_cap_evict_count(&self) -> usize {
2219 self.path_destination_cap_evict_count
2220 }
2221
2222 pub fn active_packet_hashes(&self) -> Vec<[u8; 32]> {
2224 self.path_table
2225 .values()
2226 .flat_map(|ps| ps.iter().map(|p| p.packet_hash))
2227 .collect()
2228 }
2229
2230 pub fn cull_rate_limiter(
2233 &mut self,
2234 active: &alloc::collections::BTreeSet<[u8; 16]>,
2235 now: f64,
2236 ttl_secs: f64,
2237 ) -> usize {
2238 self.rate_limiter.cull_stale(active, now, ttl_secs)
2239 }
2240
2241 pub fn update_interface_freq(&mut self, id: InterfaceId, ia_freq: f64) {
2247 if let Some(info) = self.interfaces.get_mut(&id) {
2248 info.ia_freq = ia_freq;
2249 }
2250 }
2251
2252 pub fn held_announce_count(&self, interface: &InterfaceId) -> usize {
2254 self.ingress_control.held_count(interface)
2255 }
2256
2257 #[cfg(test)]
2262 #[allow(dead_code)]
2263 pub(crate) fn path_table(&self) -> &BTreeMap<[u8; 16], PathSet> {
2264 &self.path_table
2265 }
2266
2267 #[cfg(test)]
2268 #[allow(dead_code)]
2269 pub(crate) fn announce_table(&self) -> &BTreeMap<[u8; 16], AnnounceEntry> {
2270 &self.announce_table
2271 }
2272
2273 #[cfg(test)]
2274 #[allow(dead_code)]
2275 pub(crate) fn held_announces(&self) -> &BTreeMap<[u8; 16], AnnounceEntry> {
2276 &self.held_announces
2277 }
2278
2279 #[cfg(test)]
2280 #[allow(dead_code)]
2281 pub(crate) fn announce_retained_bytes(&self) -> usize {
2282 self.announce_retained_bytes_total()
2283 }
2284
2285 #[cfg(test)]
2286 #[allow(dead_code)]
2287 pub(crate) fn reverse_table(&self) -> &BTreeMap<[u8; 16], tables::ReverseEntry> {
2288 &self.reverse_table
2289 }
2290
2291 #[cfg(test)]
2292 #[allow(dead_code)]
2293 pub(crate) fn link_table_ref(&self) -> &BTreeMap<[u8; 16], LinkEntry> {
2294 &self.link_table
2295 }
2296}
2297
2298#[cfg(test)]
2299mod tests {
2300 use super::*;
2301 use crate::packet::PacketFlags;
2302
2303 fn make_config(transport_enabled: bool) -> TransportConfig {
2304 TransportConfig {
2305 transport_enabled,
2306 identity_hash: if transport_enabled {
2307 Some([0x42; 16])
2308 } else {
2309 None
2310 },
2311 prefer_shorter_path: false,
2312 max_paths_per_destination: 1,
2313 packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
2314 max_discovery_pr_tags: constants::MAX_PR_TAGS,
2315 max_path_destinations: usize::MAX,
2316 max_tunnel_destinations_total: usize::MAX,
2317 destination_timeout_secs: constants::DESTINATION_TIMEOUT,
2318 announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
2319 announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
2320 announce_sig_cache_enabled: true,
2321 announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
2322 announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
2323 announce_queue_max_entries: 256,
2324 announce_queue_max_interfaces: 1024,
2325 }
2326 }
2327
2328 fn make_interface(id: u64, mode: u8) -> InterfaceInfo {
2329 InterfaceInfo {
2330 id: InterfaceId(id),
2331 name: String::from("test"),
2332 mode,
2333 out_capable: true,
2334 in_capable: true,
2335 bitrate: None,
2336 announce_rate_target: None,
2337 announce_rate_grace: 0,
2338 announce_rate_penalty: 0.0,
2339 announce_cap: constants::ANNOUNCE_CAP,
2340 is_local_client: false,
2341 wants_tunnel: false,
2342 tunnel_id: None,
2343 mtu: constants::MTU as u32,
2344 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
2345 ia_freq: 0.0,
2346 started: 0.0,
2347 }
2348 }
2349
2350 fn make_announce_entry(dest_hash: [u8; 16], timestamp: f64, fill_len: usize) -> AnnounceEntry {
2351 AnnounceEntry {
2352 timestamp,
2353 retransmit_timeout: timestamp,
2354 retries: 0,
2355 received_from: [0xAA; 16],
2356 hops: 2,
2357 packet_raw: vec![0x01; fill_len],
2358 packet_data: vec![0x02; fill_len],
2359 destination_hash: dest_hash,
2360 context_flag: 0,
2361 local_rebroadcasts: 0,
2362 block_rebroadcasts: false,
2363 attached_interface: None,
2364 }
2365 }
2366
2367 fn make_path_entry(
2368 timestamp: f64,
2369 hops: u8,
2370 receiving_interface: InterfaceId,
2371 next_hop: [u8; 16],
2372 ) -> PathEntry {
2373 PathEntry {
2374 timestamp,
2375 next_hop,
2376 hops,
2377 expires: timestamp + 10_000.0,
2378 random_blobs: Vec::new(),
2379 receiving_interface,
2380 packet_hash: [0; 32],
2381 announce_raw: None,
2382 }
2383 }
2384
2385 fn make_unique_tag(dest_hash: [u8; 16], tag: &[u8]) -> [u8; 32] {
2386 let mut unique_tag = [0u8; 32];
2387 let tag_len = tag.len().min(16);
2388 unique_tag[..16].copy_from_slice(&dest_hash);
2389 unique_tag[16..16 + tag_len].copy_from_slice(&tag[..tag_len]);
2390 unique_tag
2391 }
2392
2393 fn make_random_blob(timebase: u64) -> [u8; 10] {
2394 let mut blob = [0u8; 10];
2395 let bytes = timebase.to_be_bytes();
2396 blob[5..10].copy_from_slice(&bytes[3..8]);
2397 blob
2398 }
2399
2400 #[test]
2401 fn test_empty_engine() {
2402 let engine = TransportEngine::new(make_config(false));
2403 assert!(!engine.has_path(&[0; 16]));
2404 assert!(engine.hops_to(&[0; 16]).is_none());
2405 assert!(engine.next_hop(&[0; 16]).is_none());
2406 }
2407
2408 #[test]
2409 fn test_register_deregister_interface() {
2410 let mut engine = TransportEngine::new(make_config(false));
2411 engine.register_interface(make_interface(1, constants::MODE_FULL));
2412 assert!(engine.interfaces.contains_key(&InterfaceId(1)));
2413
2414 engine.deregister_interface(InterfaceId(1));
2415 assert!(!engine.interfaces.contains_key(&InterfaceId(1)));
2416 }
2417
2418 #[test]
2419 fn test_deregister_interface_removes_announce_queue_state() {
2420 let mut engine = TransportEngine::new(make_config(false));
2421 engine.register_interface(make_interface(1, constants::MODE_FULL));
2422
2423 let _ = engine.announce_queues.gate_announce(
2424 InterfaceId(1),
2425 vec![0x01; 100],
2426 [0xAA; 16],
2427 2,
2428 0.0,
2429 0.0,
2430 Some(1000),
2431 constants::ANNOUNCE_CAP,
2432 );
2433 let _ = engine.announce_queues.gate_announce(
2434 InterfaceId(1),
2435 vec![0x02; 100],
2436 [0xBB; 16],
2437 3,
2438 0.0,
2439 0.0,
2440 Some(1000),
2441 constants::ANNOUNCE_CAP,
2442 );
2443 assert_eq!(engine.announce_queue_count(), 1);
2444
2445 engine.deregister_interface(InterfaceId(1));
2446 assert_eq!(engine.announce_queue_count(), 0);
2447 }
2448
2449 #[test]
2450 fn test_deregister_interface_preserves_other_announce_queues() {
2451 let mut engine = TransportEngine::new(make_config(false));
2452 engine.register_interface(make_interface(1, constants::MODE_FULL));
2453 engine.register_interface(make_interface(2, constants::MODE_FULL));
2454
2455 let _ = engine.announce_queues.gate_announce(
2456 InterfaceId(1),
2457 vec![0x01; 100],
2458 [0xAA; 16],
2459 2,
2460 0.0,
2461 0.0,
2462 Some(1000),
2463 constants::ANNOUNCE_CAP,
2464 );
2465 let _ = engine.announce_queues.gate_announce(
2466 InterfaceId(1),
2467 vec![0x02; 100],
2468 [0xAB; 16],
2469 3,
2470 0.0,
2471 0.0,
2472 Some(1000),
2473 constants::ANNOUNCE_CAP,
2474 );
2475 let _ = engine.announce_queues.gate_announce(
2476 InterfaceId(2),
2477 vec![0x03; 100],
2478 [0xBA; 16],
2479 2,
2480 0.0,
2481 0.0,
2482 Some(1000),
2483 constants::ANNOUNCE_CAP,
2484 );
2485 let _ = engine.announce_queues.gate_announce(
2486 InterfaceId(2),
2487 vec![0x04; 100],
2488 [0xBB; 16],
2489 3,
2490 0.0,
2491 0.0,
2492 Some(1000),
2493 constants::ANNOUNCE_CAP,
2494 );
2495
2496 engine.deregister_interface(InterfaceId(1));
2497 assert_eq!(engine.announce_queue_count(), 1);
2498 assert_eq!(engine.nonempty_announce_queue_count(), 1);
2499 }
2500
2501 #[test]
2502 fn test_register_deregister_destination() {
2503 let mut engine = TransportEngine::new(make_config(false));
2504 let dest = [0x11; 16];
2505 engine.register_destination(dest, constants::DESTINATION_SINGLE);
2506 assert!(engine.local_destinations.contains_key(&dest));
2507
2508 engine.deregister_destination(&dest);
2509 assert!(!engine.local_destinations.contains_key(&dest));
2510 }
2511
2512 #[test]
2513 fn test_path_state() {
2514 let mut engine = TransportEngine::new(make_config(false));
2515 let dest = [0x22; 16];
2516
2517 assert!(!engine.path_is_unresponsive(&dest));
2518
2519 engine.mark_path_unresponsive(&dest, None);
2520 assert!(engine.path_is_unresponsive(&dest));
2521
2522 engine.mark_path_responsive(&dest);
2523 assert!(!engine.path_is_unresponsive(&dest));
2524 }
2525
2526 #[test]
2527 fn test_announce_clears_stale_path_state_for_unknown_destination() {
2528 use crate::announce::AnnounceData;
2529 use crate::destination::{destination_hash, name_hash};
2530
2531 let mut engine = TransportEngine::new(make_config(false));
2532 engine.register_interface(make_interface(1, constants::MODE_FULL));
2533
2534 let identity =
2535 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x61; 32]));
2536 let dest_hash = destination_hash("pathfix", &["announce"], Some(identity.hash()));
2537 let name_h = name_hash("pathfix", &["announce"]);
2538 let random_hash = [0x24u8; 10];
2539
2540 let (announce_data, _) =
2541 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2542
2543 let packet = RawPacket::pack(
2544 PacketFlags {
2545 header_type: constants::HEADER_1,
2546 context_flag: constants::FLAG_UNSET,
2547 transport_type: constants::TRANSPORT_BROADCAST,
2548 destination_type: constants::DESTINATION_SINGLE,
2549 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2550 },
2551 0,
2552 &dest_hash,
2553 None,
2554 constants::CONTEXT_NONE,
2555 &announce_data,
2556 )
2557 .unwrap();
2558
2559 engine.mark_path_unresponsive(&dest_hash, None);
2560 assert!(engine.path_is_unresponsive(&dest_hash));
2561 assert!(!engine.has_path(&dest_hash));
2562
2563 let mut rng = rns_crypto::FixedRng::new(&[0x62; 32]);
2564 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
2565
2566 assert!(engine.has_path(&dest_hash));
2567 assert!(
2568 !engine.path_is_unresponsive(&dest_hash),
2569 "stale path state should be cleared for newly installed paths"
2570 );
2571 assert!(actions.iter().any(|action| matches!(
2572 action,
2573 TransportAction::PathUpdated {
2574 destination_hash,
2575 interface,
2576 ..
2577 } if *destination_hash == dest_hash && *interface == InterfaceId(1)
2578 )));
2579 }
2580
2581 #[test]
2582 fn test_boundary_exempts_unresponsive() {
2583 let mut engine = TransportEngine::new(make_config(false));
2584 engine.register_interface(make_interface(1, constants::MODE_BOUNDARY));
2585 let dest = [0xB1; 16];
2586
2587 engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
2589 assert!(!engine.path_is_unresponsive(&dest));
2590 }
2591
2592 #[test]
2593 fn test_non_boundary_marks_unresponsive() {
2594 let mut engine = TransportEngine::new(make_config(false));
2595 engine.register_interface(make_interface(1, constants::MODE_FULL));
2596 let dest = [0xB2; 16];
2597
2598 engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
2600 assert!(engine.path_is_unresponsive(&dest));
2601 }
2602
2603 #[test]
2604 fn test_expire_path() {
2605 let mut engine = TransportEngine::new(make_config(false));
2606 let dest = [0x33; 16];
2607
2608 engine.path_table.insert(
2609 dest,
2610 PathSet::from_single(
2611 PathEntry {
2612 timestamp: 1000.0,
2613 next_hop: [0; 16],
2614 hops: 2,
2615 expires: 9999.0,
2616 random_blobs: Vec::new(),
2617 receiving_interface: InterfaceId(1),
2618 packet_hash: [0; 32],
2619 announce_raw: None,
2620 },
2621 1,
2622 ),
2623 );
2624
2625 assert!(engine.has_path(&dest));
2626 engine.expire_path(&dest);
2627 assert!(engine.has_path(&dest));
2629 assert_eq!(engine.path_table[&dest].primary().unwrap().expires, 0.0);
2630 }
2631
2632 #[test]
2633 fn test_link_table_operations() {
2634 let mut engine = TransportEngine::new(make_config(false));
2635 let link_id = [0x44; 16];
2636
2637 engine.register_link(
2638 link_id,
2639 LinkEntry {
2640 timestamp: 100.0,
2641 next_hop_transport_id: [0; 16],
2642 next_hop_interface: InterfaceId(1),
2643 remaining_hops: 3,
2644 received_interface: InterfaceId(2),
2645 taken_hops: 2,
2646 destination_hash: [0xAA; 16],
2647 validated: false,
2648 proof_timeout: 200.0,
2649 },
2650 );
2651
2652 assert!(engine.link_table.contains_key(&link_id));
2653 assert!(!engine.link_table[&link_id].validated);
2654
2655 engine.validate_link(&link_id);
2656 assert!(engine.link_table[&link_id].validated);
2657
2658 engine.remove_link(&link_id);
2659 assert!(!engine.link_table.contains_key(&link_id));
2660 }
2661
2662 #[test]
2663 fn test_lrproof_routes_from_originating_side_via_link_table() {
2664 let mut engine = TransportEngine::new(make_config(true));
2665 engine.register_interface(make_interface(1, constants::MODE_FULL));
2666 engine.register_interface(make_interface(2, constants::MODE_FULL));
2667
2668 let link_id = [0x44; 16];
2669 engine.register_link(
2670 link_id,
2671 LinkEntry {
2672 timestamp: 100.0,
2673 next_hop_transport_id: [0xAA; 16],
2674 next_hop_interface: InterfaceId(2),
2675 remaining_hops: 3,
2676 received_interface: InterfaceId(1),
2677 taken_hops: 1,
2678 destination_hash: [0xBB; 16],
2679 validated: false,
2680 proof_timeout: 200.0,
2681 },
2682 );
2683
2684 let flags = PacketFlags {
2685 header_type: constants::HEADER_1,
2686 context_flag: constants::FLAG_UNSET,
2687 transport_type: constants::TRANSPORT_BROADCAST,
2688 destination_type: constants::DESTINATION_LINK,
2689 packet_type: constants::PACKET_TYPE_PROOF,
2690 };
2691 let packet = RawPacket::pack(
2692 flags,
2693 0,
2694 &link_id,
2695 None,
2696 constants::CONTEXT_LRPROOF,
2697 &[0xCC; 64],
2698 )
2699 .unwrap();
2700 let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
2701
2702 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 101.0, &mut rng);
2703
2704 assert!(matches!(
2705 engine
2706 .link_table_ref()
2707 .get(&link_id)
2708 .map(|entry| entry.validated),
2709 Some(true)
2710 ));
2711 assert!(actions.iter().any(|action| matches!(
2712 action,
2713 TransportAction::LinkEstablished {
2714 link_id: established,
2715 interface: InterfaceId(2),
2716 } if *established == link_id
2717 )));
2718 assert!(actions.iter().any(|action| matches!(
2719 action,
2720 TransportAction::SendOnInterface {
2721 interface: InterfaceId(2),
2722 ..
2723 }
2724 )));
2725 }
2726
2727 #[test]
2728 fn test_packet_filter_drops_plain_announce() {
2729 let engine = TransportEngine::new(make_config(false));
2730 let flags = PacketFlags {
2731 header_type: constants::HEADER_1,
2732 context_flag: constants::FLAG_UNSET,
2733 transport_type: constants::TRANSPORT_BROADCAST,
2734 destination_type: constants::DESTINATION_PLAIN,
2735 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2736 };
2737 let packet =
2738 RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2739 assert!(!engine.packet_filter(&packet));
2740 }
2741
2742 #[test]
2743 fn test_packet_filter_allows_keepalive() {
2744 let engine = TransportEngine::new(make_config(false));
2745 let flags = PacketFlags {
2746 header_type: constants::HEADER_1,
2747 context_flag: constants::FLAG_UNSET,
2748 transport_type: constants::TRANSPORT_BROADCAST,
2749 destination_type: constants::DESTINATION_SINGLE,
2750 packet_type: constants::PACKET_TYPE_DATA,
2751 };
2752 let packet = RawPacket::pack(
2753 flags,
2754 0,
2755 &[0; 16],
2756 None,
2757 constants::CONTEXT_KEEPALIVE,
2758 b"test",
2759 )
2760 .unwrap();
2761 assert!(engine.packet_filter(&packet));
2762 }
2763
2764 #[test]
2765 fn test_packet_filter_drops_high_hop_plain() {
2766 let engine = TransportEngine::new(make_config(false));
2767 let flags = PacketFlags {
2768 header_type: constants::HEADER_1,
2769 context_flag: constants::FLAG_UNSET,
2770 transport_type: constants::TRANSPORT_BROADCAST,
2771 destination_type: constants::DESTINATION_PLAIN,
2772 packet_type: constants::PACKET_TYPE_DATA,
2773 };
2774 let mut packet =
2775 RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2776 packet.hops = 2;
2777 assert!(!engine.packet_filter(&packet));
2778 }
2779
2780 #[test]
2781 fn test_packet_filter_allows_duplicate_single_announce() {
2782 let mut engine = TransportEngine::new(make_config(false));
2783 let flags = PacketFlags {
2784 header_type: constants::HEADER_1,
2785 context_flag: constants::FLAG_UNSET,
2786 transport_type: constants::TRANSPORT_BROADCAST,
2787 destination_type: constants::DESTINATION_SINGLE,
2788 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2789 };
2790 let packet = RawPacket::pack(
2791 flags,
2792 0,
2793 &[0; 16],
2794 None,
2795 constants::CONTEXT_NONE,
2796 &[0xAA; 64],
2797 )
2798 .unwrap();
2799
2800 engine.packet_hashlist.add(packet.packet_hash);
2802
2803 assert!(engine.packet_filter(&packet));
2805 }
2806
2807 #[test]
2808 fn test_packet_filter_fifo_eviction_allows_oldest_hash_again() {
2809 let mut engine = TransportEngine::new(make_config(false));
2810 engine.packet_hashlist = PacketHashlist::new(2);
2811
2812 let make_packet = |seed: u8| {
2813 let flags = PacketFlags {
2814 header_type: constants::HEADER_1,
2815 context_flag: constants::FLAG_UNSET,
2816 transport_type: constants::TRANSPORT_BROADCAST,
2817 destination_type: constants::DESTINATION_SINGLE,
2818 packet_type: constants::PACKET_TYPE_DATA,
2819 };
2820 RawPacket::pack(
2821 flags,
2822 0,
2823 &[seed; 16],
2824 None,
2825 constants::CONTEXT_NONE,
2826 &[seed; 4],
2827 )
2828 .unwrap()
2829 };
2830
2831 let packet1 = make_packet(1);
2832 let packet2 = make_packet(2);
2833 let packet3 = make_packet(3);
2834
2835 engine.packet_hashlist.add(packet1.packet_hash);
2836 engine.packet_hashlist.add(packet2.packet_hash);
2837 assert!(!engine.packet_filter(&packet1));
2838
2839 engine.packet_hashlist.add(packet3.packet_hash);
2840
2841 assert!(engine.packet_filter(&packet1));
2842 assert!(!engine.packet_filter(&packet2));
2843 assert!(!engine.packet_filter(&packet3));
2844 }
2845
2846 #[test]
2847 fn test_packet_filter_duplicate_does_not_refresh_recency() {
2848 let mut engine = TransportEngine::new(make_config(false));
2849 engine.packet_hashlist = PacketHashlist::new(2);
2850
2851 let make_packet = |seed: u8| {
2852 let flags = PacketFlags {
2853 header_type: constants::HEADER_1,
2854 context_flag: constants::FLAG_UNSET,
2855 transport_type: constants::TRANSPORT_BROADCAST,
2856 destination_type: constants::DESTINATION_SINGLE,
2857 packet_type: constants::PACKET_TYPE_DATA,
2858 };
2859 RawPacket::pack(
2860 flags,
2861 0,
2862 &[seed; 16],
2863 None,
2864 constants::CONTEXT_NONE,
2865 &[seed; 4],
2866 )
2867 .unwrap()
2868 };
2869
2870 let packet1 = make_packet(1);
2871 let packet2 = make_packet(2);
2872 let packet3 = make_packet(3);
2873
2874 engine.packet_hashlist.add(packet1.packet_hash);
2875 engine.packet_hashlist.add(packet2.packet_hash);
2876 engine.packet_hashlist.add(packet2.packet_hash);
2877 engine.packet_hashlist.add(packet3.packet_hash);
2878
2879 assert!(engine.packet_filter(&packet1));
2880 assert!(!engine.packet_filter(&packet2));
2881 assert!(!engine.packet_filter(&packet3));
2882 }
2883
2884 #[test]
2885 fn test_tick_retransmits_announce() {
2886 let mut engine = TransportEngine::new(make_config(true));
2887 engine.register_interface(make_interface(1, constants::MODE_FULL));
2888
2889 let dest = [0x55; 16];
2890 engine.insert_announce_entry(
2891 dest,
2892 AnnounceEntry {
2893 timestamp: 190.0,
2894 retransmit_timeout: 100.0, retries: 0,
2896 received_from: [0xAA; 16],
2897 hops: 2,
2898 packet_raw: vec![0x01, 0x02],
2899 packet_data: vec![0xCC; 10],
2900 destination_hash: dest,
2901 context_flag: 0,
2902 local_rebroadcasts: 0,
2903 block_rebroadcasts: false,
2904 attached_interface: None,
2905 },
2906 190.0,
2907 );
2908
2909 let mut rng = rns_crypto::FixedRng::new(&[0x42; 32]);
2910 let actions = engine.tick(200.0, &mut rng);
2911
2912 assert!(!actions.is_empty());
2915 assert!(matches!(
2916 &actions[0],
2917 TransportAction::SendOnInterface { .. }
2918 ));
2919
2920 assert_eq!(engine.announce_table[&dest].retries, 1);
2922 }
2923
2924 #[test]
2925 fn test_tick_culls_expired_announce_entries() {
2926 let mut config = make_config(true);
2927 config.announce_table_ttl_secs = 10.0;
2928 let mut engine = TransportEngine::new(config);
2929
2930 let dest1 = [0x61; 16];
2931 let dest2 = [0x62; 16];
2932 assert!(engine.insert_announce_entry(dest1, make_announce_entry(dest1, 100.0, 8), 100.0));
2933 assert!(engine.insert_held_announce(dest2, make_announce_entry(dest2, 100.0, 8), 100.0));
2934
2935 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2936 let _ = engine.tick(111.0, &mut rng);
2937
2938 assert!(!engine.announce_table().contains_key(&dest1));
2939 assert!(!engine.held_announces().contains_key(&dest2));
2940 }
2941
2942 #[test]
2943 fn test_announce_retention_cap_evicts_oldest_and_prefers_held_on_tie() {
2944 let sample_entry = make_announce_entry([0x70; 16], 100.0, 32);
2945 let mut config = make_config(true);
2946 config.announce_table_max_bytes = TransportEngine::announce_entry_size_bytes(&sample_entry)
2947 * 2
2948 + TransportEngine::announce_entry_size_bytes(&sample_entry) / 2;
2949 let max_bytes = config.announce_table_max_bytes;
2950 let mut engine = TransportEngine::new(config);
2951
2952 let held_dest = [0x71; 16];
2953 let active_dest = [0x72; 16];
2954 let newest_dest = [0x73; 16];
2955
2956 assert!(engine.insert_held_announce(
2957 held_dest,
2958 make_announce_entry(held_dest, 100.0, 32),
2959 100.0,
2960 ));
2961 assert!(engine.insert_announce_entry(
2962 active_dest,
2963 make_announce_entry(active_dest, 100.0, 32),
2964 100.0,
2965 ));
2966 assert!(engine.insert_announce_entry(
2967 newest_dest,
2968 make_announce_entry(newest_dest, 101.0, 32),
2969 101.0,
2970 ));
2971
2972 assert!(!engine.held_announces().contains_key(&held_dest));
2973 assert!(engine.announce_table().contains_key(&active_dest));
2974 assert!(engine.announce_table().contains_key(&newest_dest));
2975 assert!(engine.announce_retained_bytes() <= max_bytes);
2976 }
2977
2978 #[test]
2979 fn test_oversized_announce_entry_is_not_retained() {
2980 let mut config = make_config(true);
2981 config.announce_table_max_bytes = 200;
2982 let mut engine = TransportEngine::new(config);
2983 let dest = [0x81; 16];
2984
2985 assert!(!engine.insert_announce_entry(dest, make_announce_entry(dest, 100.0, 256), 100.0));
2986 assert!(!engine.announce_table().contains_key(&dest));
2987 assert_eq!(engine.announce_retained_bytes(), 0);
2988 }
2989
2990 #[test]
2991 fn test_blackhole_identity() {
2992 let mut engine = TransportEngine::new(make_config(false));
2993 let hash = [0xAA; 16];
2994 let now = 1000.0;
2995
2996 assert!(!engine.is_blackholed(&hash, now));
2997
2998 engine.blackhole_identity(hash, now, None, Some(String::from("test")));
2999 assert!(engine.is_blackholed(&hash, now));
3000 assert!(engine.is_blackholed(&hash, now + 999999.0)); assert!(engine.unblackhole_identity(&hash));
3003 assert!(!engine.is_blackholed(&hash, now));
3004 assert!(!engine.unblackhole_identity(&hash)); }
3006
3007 #[test]
3008 fn test_blackhole_with_duration() {
3009 let mut engine = TransportEngine::new(make_config(false));
3010 let hash = [0xBB; 16];
3011 let now = 1000.0;
3012
3013 engine.blackhole_identity(hash, now, Some(1.0), None); assert!(engine.is_blackholed(&hash, now));
3015 assert!(engine.is_blackholed(&hash, now + 3599.0)); assert!(!engine.is_blackholed(&hash, now + 3601.0)); }
3018
3019 #[test]
3020 fn test_cull_blackholed() {
3021 let mut engine = TransportEngine::new(make_config(false));
3022 let hash1 = [0xCC; 16];
3023 let hash2 = [0xDD; 16];
3024 let now = 1000.0;
3025
3026 engine.blackhole_identity(hash1, now, Some(1.0), None); engine.blackhole_identity(hash2, now, None, None); engine.cull_blackholed(now + 4000.0); assert!(!engine.blackholed_identities.contains_key(&hash1));
3032 assert!(engine.blackholed_identities.contains_key(&hash2));
3033 }
3034
3035 #[test]
3036 fn test_blackhole_blocks_announce() {
3037 use crate::announce::AnnounceData;
3038 use crate::destination::{destination_hash, name_hash};
3039
3040 let mut engine = TransportEngine::new(make_config(false));
3041 engine.register_interface(make_interface(1, constants::MODE_FULL));
3042
3043 let identity =
3044 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x55; 32]));
3045 let dest_hash = destination_hash("test", &["app"], Some(identity.hash()));
3046 let name_h = name_hash("test", &["app"]);
3047 let random_hash = [0x42u8; 10];
3048
3049 let (announce_data, _) =
3050 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3051
3052 let flags = PacketFlags {
3053 header_type: constants::HEADER_1,
3054 context_flag: constants::FLAG_UNSET,
3055 transport_type: constants::TRANSPORT_BROADCAST,
3056 destination_type: constants::DESTINATION_SINGLE,
3057 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3058 };
3059 let packet = RawPacket::pack(
3060 flags,
3061 0,
3062 &dest_hash,
3063 None,
3064 constants::CONTEXT_NONE,
3065 &announce_data,
3066 )
3067 .unwrap();
3068
3069 let now = 1000.0;
3071 engine.blackhole_identity(*identity.hash(), now, None, None);
3072
3073 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3074 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), now, &mut rng);
3075
3076 assert!(actions
3078 .iter()
3079 .all(|a| !matches!(a, TransportAction::AnnounceReceived { .. })));
3080 assert!(actions
3081 .iter()
3082 .all(|a| !matches!(a, TransportAction::PathUpdated { .. })));
3083 }
3084
3085 #[test]
3086 fn test_async_announce_retransmit_cleanup_happens_before_queueing() {
3087 use crate::announce::AnnounceData;
3088 use crate::destination::{destination_hash, name_hash};
3089 use crate::transport::announce_verify_queue::AnnounceVerifyQueue;
3090
3091 let mut engine = TransportEngine::new(make_config(true));
3092 engine.register_interface(make_interface(1, constants::MODE_FULL));
3093
3094 let identity =
3095 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x31; 32]));
3096 let dest_hash = destination_hash("async", &["announce"], Some(identity.hash()));
3097 let name_h = name_hash("async", &["announce"]);
3098 let random_hash = [0x44u8; 10];
3099 let (announce_data, _) =
3100 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3101
3102 let packet = RawPacket::pack(
3103 PacketFlags {
3104 header_type: constants::HEADER_2,
3105 context_flag: constants::FLAG_UNSET,
3106 transport_type: constants::TRANSPORT_TRANSPORT,
3107 destination_type: constants::DESTINATION_SINGLE,
3108 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3109 },
3110 3,
3111 &dest_hash,
3112 Some(&[0xBB; 16]),
3113 constants::CONTEXT_NONE,
3114 &announce_data,
3115 )
3116 .unwrap();
3117
3118 engine.announce_table.insert(
3119 dest_hash,
3120 AnnounceEntry {
3121 timestamp: 1000.0,
3122 retransmit_timeout: 2000.0,
3123 retries: constants::PATHFINDER_R,
3124 received_from: [0xBB; 16],
3125 hops: 2,
3126 packet_raw: packet.raw.clone(),
3127 packet_data: packet.data.clone(),
3128 destination_hash: dest_hash,
3129 context_flag: constants::FLAG_UNSET,
3130 local_rebroadcasts: 0,
3131 block_rebroadcasts: false,
3132 attached_interface: None,
3133 },
3134 );
3135
3136 let mut queue = AnnounceVerifyQueue::new(8);
3137 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3138 let actions = engine.handle_inbound_with_announce_queue(
3139 &packet.raw,
3140 InterfaceId(1),
3141 1000.0,
3142 &mut rng,
3143 Some(&mut queue),
3144 );
3145
3146 assert!(actions.is_empty());
3147 assert_eq!(queue.len(), 1);
3148 assert!(
3149 !engine.announce_table.contains_key(&dest_hash),
3150 "retransmit completion should clear announce_table before queueing"
3151 );
3152 }
3153
3154 #[test]
3155 fn test_async_announce_completion_inserts_sig_cache_and_prevents_requeue() {
3156 use crate::announce::AnnounceData;
3157 use crate::destination::{destination_hash, name_hash};
3158 use crate::transport::announce_verify_queue::AnnounceVerifyQueue;
3159
3160 let mut engine = TransportEngine::new(make_config(false));
3161 engine.register_interface(make_interface(1, constants::MODE_FULL));
3162
3163 let identity =
3164 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x52; 32]));
3165 let dest_hash = destination_hash("async", &["cache"], Some(identity.hash()));
3166 let name_h = name_hash("async", &["cache"]);
3167 let random_hash = [0x55u8; 10];
3168 let (announce_data, _) =
3169 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3170
3171 let packet = RawPacket::pack(
3172 PacketFlags {
3173 header_type: constants::HEADER_1,
3174 context_flag: constants::FLAG_UNSET,
3175 transport_type: constants::TRANSPORT_BROADCAST,
3176 destination_type: constants::DESTINATION_SINGLE,
3177 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3178 },
3179 0,
3180 &dest_hash,
3181 None,
3182 constants::CONTEXT_NONE,
3183 &announce_data,
3184 )
3185 .unwrap();
3186
3187 let mut queue = AnnounceVerifyQueue::new(8);
3188 let mut rng = rns_crypto::FixedRng::new(&[0x77; 32]);
3189 let actions = engine.handle_inbound_with_announce_queue(
3190 &packet.raw,
3191 InterfaceId(1),
3192 1000.0,
3193 &mut rng,
3194 Some(&mut queue),
3195 );
3196 assert!(actions.is_empty());
3197 assert_eq!(queue.len(), 1);
3198
3199 let mut batch = queue.take_pending(1000.0);
3200 assert_eq!(batch.len(), 1);
3201 let (key, pending) = batch.pop().unwrap();
3202
3203 let announce = AnnounceData::unpack(&pending.packet.data, false).unwrap();
3204 let validated = announce.validate(&pending.packet.destination_hash).unwrap();
3205 let mut material = [0u8; 80];
3206 material[..16].copy_from_slice(&pending.packet.destination_hash);
3207 material[16..].copy_from_slice(&announce.signature);
3208 let sig_cache_key = hash::full_hash(&material);
3209
3210 let pending = queue.complete_success(&key).unwrap();
3211 let actions =
3212 engine.complete_verified_announce(pending, validated, sig_cache_key, 1000.0, &mut rng);
3213 assert!(actions
3214 .iter()
3215 .any(|action| matches!(action, TransportAction::AnnounceReceived { .. })));
3216 assert!(engine.announce_sig_cache_contains(&sig_cache_key));
3217
3218 let actions = engine.handle_inbound_with_announce_queue(
3219 &packet.raw,
3220 InterfaceId(1),
3221 1001.0,
3222 &mut rng,
3223 Some(&mut queue),
3224 );
3225 assert!(actions.is_empty());
3226 assert_eq!(queue.len(), 0);
3227 }
3228
3229 #[test]
3230 fn test_tick_culls_expired_path() {
3231 let mut engine = TransportEngine::new(make_config(false));
3232 engine.register_interface(make_interface(1, constants::MODE_FULL));
3233
3234 let dest = [0x66; 16];
3235 engine.path_table.insert(
3236 dest,
3237 PathSet::from_single(
3238 PathEntry {
3239 timestamp: 100.0,
3240 next_hop: [0; 16],
3241 hops: 2,
3242 expires: 200.0,
3243 random_blobs: Vec::new(),
3244 receiving_interface: InterfaceId(1),
3245 packet_hash: [0; 32],
3246 announce_raw: None,
3247 },
3248 1,
3249 ),
3250 );
3251
3252 assert!(engine.has_path(&dest));
3253
3254 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3255 engine.tick(300.0, &mut rng);
3257
3258 assert!(!engine.has_path(&dest));
3259 }
3260
3261 fn make_local_client_interface(id: u64) -> InterfaceInfo {
3266 InterfaceInfo {
3267 id: InterfaceId(id),
3268 name: String::from("local_client"),
3269 mode: constants::MODE_FULL,
3270 out_capable: true,
3271 in_capable: true,
3272 bitrate: None,
3273 announce_rate_target: None,
3274 announce_rate_grace: 0,
3275 announce_rate_penalty: 0.0,
3276 announce_cap: constants::ANNOUNCE_CAP,
3277 is_local_client: true,
3278 wants_tunnel: false,
3279 tunnel_id: None,
3280 mtu: constants::MTU as u32,
3281 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3282 ia_freq: 0.0,
3283 started: 0.0,
3284 }
3285 }
3286
3287 #[test]
3288 fn test_has_local_clients() {
3289 let mut engine = TransportEngine::new(make_config(false));
3290 assert!(!engine.has_local_clients());
3291
3292 engine.register_interface(make_interface(1, constants::MODE_FULL));
3293 assert!(!engine.has_local_clients());
3294
3295 engine.register_interface(make_local_client_interface(2));
3296 assert!(engine.has_local_clients());
3297
3298 engine.deregister_interface(InterfaceId(2));
3299 assert!(!engine.has_local_clients());
3300 }
3301
3302 #[test]
3303 fn test_local_client_hop_decrement() {
3304 let mut engine = TransportEngine::new(make_config(false));
3307 engine.register_interface(make_local_client_interface(1));
3308 engine.register_interface(make_interface(2, constants::MODE_FULL));
3309
3310 let dest = [0xAA; 16];
3312 engine.register_destination(dest, constants::DESTINATION_PLAIN);
3313
3314 let flags = PacketFlags {
3315 header_type: constants::HEADER_1,
3316 context_flag: constants::FLAG_UNSET,
3317 transport_type: constants::TRANSPORT_BROADCAST,
3318 destination_type: constants::DESTINATION_PLAIN,
3319 packet_type: constants::PACKET_TYPE_DATA,
3320 };
3321 let packet =
3323 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
3324
3325 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3326 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3327
3328 let deliver = actions
3331 .iter()
3332 .find(|a| matches!(a, TransportAction::DeliverLocal { .. }));
3333 assert!(deliver.is_some(), "Should deliver locally");
3334 }
3335
3336 #[test]
3337 fn test_plain_broadcast_from_local_client() {
3338 let mut engine = TransportEngine::new(make_config(false));
3340 engine.register_interface(make_local_client_interface(1));
3341 engine.register_interface(make_interface(2, constants::MODE_FULL));
3342
3343 let dest = [0xBB; 16];
3344 let flags = PacketFlags {
3345 header_type: constants::HEADER_1,
3346 context_flag: constants::FLAG_UNSET,
3347 transport_type: constants::TRANSPORT_BROADCAST,
3348 destination_type: constants::DESTINATION_PLAIN,
3349 packet_type: constants::PACKET_TYPE_DATA,
3350 };
3351 let packet =
3352 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3353
3354 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3355 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3356
3357 let forward = actions.iter().find(|a| {
3359 matches!(
3360 a,
3361 TransportAction::ForwardPlainBroadcast {
3362 to_local: false,
3363 ..
3364 }
3365 )
3366 });
3367 assert!(forward.is_some(), "Should forward to external interfaces");
3368 }
3369
3370 #[test]
3371 fn test_plain_broadcast_from_external() {
3372 let mut engine = TransportEngine::new(make_config(false));
3374 engine.register_interface(make_local_client_interface(1));
3375 engine.register_interface(make_interface(2, constants::MODE_FULL));
3376
3377 let dest = [0xCC; 16];
3378 let flags = PacketFlags {
3379 header_type: constants::HEADER_1,
3380 context_flag: constants::FLAG_UNSET,
3381 transport_type: constants::TRANSPORT_BROADCAST,
3382 destination_type: constants::DESTINATION_PLAIN,
3383 packet_type: constants::PACKET_TYPE_DATA,
3384 };
3385 let packet =
3386 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3387
3388 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3389 let actions = engine.handle_inbound(&packet.raw, InterfaceId(2), 1000.0, &mut rng);
3390
3391 let forward = actions.iter().find(|a| {
3393 matches!(
3394 a,
3395 TransportAction::ForwardPlainBroadcast { to_local: true, .. }
3396 )
3397 });
3398 assert!(forward.is_some(), "Should forward to local clients");
3399 }
3400
3401 #[test]
3402 fn test_no_plain_broadcast_bridging_without_local_clients() {
3403 let mut engine = TransportEngine::new(make_config(false));
3405 engine.register_interface(make_interface(1, constants::MODE_FULL));
3406 engine.register_interface(make_interface(2, constants::MODE_FULL));
3407
3408 let dest = [0xDD; 16];
3409 let flags = PacketFlags {
3410 header_type: constants::HEADER_1,
3411 context_flag: constants::FLAG_UNSET,
3412 transport_type: constants::TRANSPORT_BROADCAST,
3413 destination_type: constants::DESTINATION_PLAIN,
3414 packet_type: constants::PACKET_TYPE_DATA,
3415 };
3416 let packet =
3417 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3418
3419 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3420 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3421
3422 let has_forward = actions
3424 .iter()
3425 .any(|a| matches!(a, TransportAction::ForwardPlainBroadcast { .. }));
3426 assert!(!has_forward, "No bridging without local clients");
3427 }
3428
3429 #[test]
3430 fn test_announce_forwarded_to_local_clients() {
3431 use crate::announce::AnnounceData;
3432 use crate::destination::{destination_hash, name_hash};
3433
3434 let mut engine = TransportEngine::new(make_config(false));
3435 engine.register_interface(make_interface(1, constants::MODE_FULL));
3436 engine.register_interface(make_local_client_interface(2));
3437
3438 let identity =
3439 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x77; 32]));
3440 let dest_hash = destination_hash("test", &["fwd"], Some(identity.hash()));
3441 let name_h = name_hash("test", &["fwd"]);
3442 let random_hash = [0x42u8; 10];
3443
3444 let (announce_data, _) =
3445 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3446
3447 let flags = PacketFlags {
3448 header_type: constants::HEADER_1,
3449 context_flag: constants::FLAG_UNSET,
3450 transport_type: constants::TRANSPORT_BROADCAST,
3451 destination_type: constants::DESTINATION_SINGLE,
3452 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3453 };
3454 let packet = RawPacket::pack(
3455 flags,
3456 0,
3457 &dest_hash,
3458 None,
3459 constants::CONTEXT_NONE,
3460 &announce_data,
3461 )
3462 .unwrap();
3463
3464 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3465 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3466
3467 let forward = actions
3469 .iter()
3470 .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3471 assert!(
3472 forward.is_some(),
3473 "Should forward announce to local clients"
3474 );
3475
3476 match forward.unwrap() {
3478 TransportAction::ForwardToLocalClients { exclude, .. } => {
3479 assert_eq!(*exclude, Some(InterfaceId(1)));
3480 }
3481 _ => unreachable!(),
3482 }
3483 }
3484
3485 #[test]
3486 fn test_no_announce_forward_without_local_clients() {
3487 use crate::announce::AnnounceData;
3488 use crate::destination::{destination_hash, name_hash};
3489
3490 let mut engine = TransportEngine::new(make_config(false));
3491 engine.register_interface(make_interface(1, constants::MODE_FULL));
3492
3493 let identity =
3494 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x88; 32]));
3495 let dest_hash = destination_hash("test", &["nofwd"], Some(identity.hash()));
3496 let name_h = name_hash("test", &["nofwd"]);
3497 let random_hash = [0x42u8; 10];
3498
3499 let (announce_data, _) =
3500 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3501
3502 let flags = PacketFlags {
3503 header_type: constants::HEADER_1,
3504 context_flag: constants::FLAG_UNSET,
3505 transport_type: constants::TRANSPORT_BROADCAST,
3506 destination_type: constants::DESTINATION_SINGLE,
3507 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3508 };
3509 let packet = RawPacket::pack(
3510 flags,
3511 0,
3512 &dest_hash,
3513 None,
3514 constants::CONTEXT_NONE,
3515 &announce_data,
3516 )
3517 .unwrap();
3518
3519 let mut rng = rns_crypto::FixedRng::new(&[0x22; 32]);
3520 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3521
3522 let has_forward = actions
3524 .iter()
3525 .any(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3526 assert!(!has_forward, "No forward without local clients");
3527 }
3528
3529 #[test]
3530 fn test_local_client_exclude_from_forward() {
3531 use crate::announce::AnnounceData;
3532 use crate::destination::{destination_hash, name_hash};
3533
3534 let mut engine = TransportEngine::new(make_config(false));
3535 engine.register_interface(make_local_client_interface(1));
3536 engine.register_interface(make_local_client_interface(2));
3537
3538 let identity =
3539 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3540 let dest_hash = destination_hash("test", &["excl"], Some(identity.hash()));
3541 let name_h = name_hash("test", &["excl"]);
3542 let random_hash = [0x42u8; 10];
3543
3544 let (announce_data, _) =
3545 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3546
3547 let flags = PacketFlags {
3548 header_type: constants::HEADER_1,
3549 context_flag: constants::FLAG_UNSET,
3550 transport_type: constants::TRANSPORT_BROADCAST,
3551 destination_type: constants::DESTINATION_SINGLE,
3552 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3553 };
3554 let packet = RawPacket::pack(
3555 flags,
3556 0,
3557 &dest_hash,
3558 None,
3559 constants::CONTEXT_NONE,
3560 &announce_data,
3561 )
3562 .unwrap();
3563
3564 let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
3565 let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3567
3568 let forward = actions
3570 .iter()
3571 .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3572 assert!(forward.is_some());
3573 match forward.unwrap() {
3574 TransportAction::ForwardToLocalClients { exclude, .. } => {
3575 assert_eq!(*exclude, Some(InterfaceId(1)));
3576 }
3577 _ => unreachable!(),
3578 }
3579 }
3580
3581 fn make_tunnel_interface(id: u64) -> InterfaceInfo {
3586 InterfaceInfo {
3587 id: InterfaceId(id),
3588 name: String::from("tunnel_iface"),
3589 mode: constants::MODE_FULL,
3590 out_capable: true,
3591 in_capable: true,
3592 bitrate: None,
3593 announce_rate_target: None,
3594 announce_rate_grace: 0,
3595 announce_rate_penalty: 0.0,
3596 announce_cap: constants::ANNOUNCE_CAP,
3597 is_local_client: false,
3598 wants_tunnel: true,
3599 tunnel_id: None,
3600 mtu: constants::MTU as u32,
3601 ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3602 ia_freq: 0.0,
3603 started: 0.0,
3604 }
3605 }
3606
3607 #[test]
3608 fn test_handle_tunnel_new() {
3609 let mut engine = TransportEngine::new(make_config(true));
3610 engine.register_interface(make_tunnel_interface(1));
3611
3612 let tunnel_id = [0xAA; 32];
3613 let actions = engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3614
3615 assert!(actions
3617 .iter()
3618 .any(|a| matches!(a, TransportAction::TunnelEstablished { .. })));
3619
3620 let info = engine.interface_info(&InterfaceId(1)).unwrap();
3622 assert_eq!(info.tunnel_id, Some(tunnel_id));
3623
3624 assert_eq!(engine.tunnel_table().len(), 1);
3626 }
3627
3628 #[test]
3629 fn test_announce_stores_tunnel_path() {
3630 use crate::announce::AnnounceData;
3631 use crate::destination::{destination_hash, name_hash};
3632
3633 let mut engine = TransportEngine::new(make_config(false));
3634 let mut iface = make_tunnel_interface(1);
3635 let tunnel_id = [0xBB; 32];
3636 iface.tunnel_id = Some(tunnel_id);
3637 engine.register_interface(iface);
3638
3639 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3641
3642 let identity =
3644 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xCC; 32]));
3645 let dest_hash = destination_hash("test", &["tunnel"], Some(identity.hash()));
3646 let name_h = name_hash("test", &["tunnel"]);
3647 let random_hash = [0x42u8; 10];
3648
3649 let (announce_data, _) =
3650 AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3651
3652 let flags = PacketFlags {
3653 header_type: constants::HEADER_1,
3654 context_flag: constants::FLAG_UNSET,
3655 transport_type: constants::TRANSPORT_BROADCAST,
3656 destination_type: constants::DESTINATION_SINGLE,
3657 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3658 };
3659 let packet = RawPacket::pack(
3660 flags,
3661 0,
3662 &dest_hash,
3663 None,
3664 constants::CONTEXT_NONE,
3665 &announce_data,
3666 )
3667 .unwrap();
3668
3669 let mut rng = rns_crypto::FixedRng::new(&[0xDD; 32]);
3670 engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3671
3672 assert!(engine.has_path(&dest_hash));
3674
3675 let tunnel = engine.tunnel_table().get(&tunnel_id).unwrap();
3677 assert_eq!(tunnel.paths.len(), 1);
3678 assert!(tunnel.paths.contains_key(&dest_hash));
3679 }
3680
3681 #[test]
3682 fn test_tunnel_reattach_restores_paths() {
3683 let mut engine = TransportEngine::new(make_config(true));
3684 engine.register_interface(make_tunnel_interface(1));
3685
3686 let tunnel_id = [0xCC; 32];
3687 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3688
3689 let dest = [0xDD; 16];
3691 engine.tunnel_table.store_tunnel_path(
3692 &tunnel_id,
3693 dest,
3694 tunnel::TunnelPath {
3695 timestamp: 1000.0,
3696 received_from: [0xEE; 16],
3697 hops: 3,
3698 expires: 1000.0 + constants::DESTINATION_TIMEOUT,
3699 random_blobs: Vec::new(),
3700 packet_hash: [0xFF; 32],
3701 },
3702 1000.0,
3703 constants::DESTINATION_TIMEOUT,
3704 usize::MAX,
3705 );
3706
3707 engine.void_tunnel_interface(&tunnel_id);
3709
3710 engine.path_table.remove(&dest);
3712 assert!(!engine.has_path(&dest));
3713
3714 engine.register_interface(make_interface(2, constants::MODE_FULL));
3716 let actions = engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);
3717
3718 assert!(engine.has_path(&dest));
3720 let path = engine.path_table.get(&dest).unwrap().primary().unwrap();
3721 assert_eq!(path.hops, 3);
3722 assert_eq!(path.receiving_interface, InterfaceId(2));
3723
3724 assert!(actions
3726 .iter()
3727 .any(|a| matches!(a, TransportAction::TunnelEstablished { .. })));
3728 }
3729
3730 #[test]
3731 fn test_tunnel_reattach_does_not_overwrite_newer_path() {
3732 let mut engine = TransportEngine::new(make_config(true));
3733 engine.register_interface(make_tunnel_interface(1));
3734
3735 let tunnel_id = [0xCD; 32];
3736 let dest = [0xDE; 16];
3737 let older_blob = make_random_blob(100);
3738 let newer_blob = make_random_blob(200);
3739
3740 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3741 engine.tunnel_table.store_tunnel_path(
3742 &tunnel_id,
3743 dest,
3744 tunnel::TunnelPath {
3745 timestamp: 1000.0,
3746 received_from: [0xEE; 16],
3747 hops: 2,
3748 expires: 1000.0 + constants::DESTINATION_TIMEOUT,
3749 random_blobs: vec![older_blob],
3750 packet_hash: [0x11; 32],
3751 },
3752 1000.0,
3753 constants::DESTINATION_TIMEOUT,
3754 usize::MAX,
3755 );
3756 engine.void_tunnel_interface(&tunnel_id);
3757
3758 engine.path_table.insert(
3759 dest,
3760 PathSet::from_single(
3761 PathEntry {
3762 timestamp: 1500.0,
3763 next_hop: [0xAB; 16],
3764 hops: 3,
3765 expires: 1500.0 + constants::DESTINATION_TIMEOUT,
3766 random_blobs: vec![newer_blob],
3767 receiving_interface: InterfaceId(3),
3768 packet_hash: [0x22; 32],
3769 announce_raw: None,
3770 },
3771 1,
3772 ),
3773 );
3774
3775 engine.register_interface(make_interface(2, constants::MODE_FULL));
3776 engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);
3777
3778 let path = engine.path_table.get(&dest).unwrap().primary().unwrap();
3779 assert_eq!(path.next_hop, [0xAB; 16]);
3780 assert_eq!(path.hops, 3);
3781 assert_eq!(path.receiving_interface, InterfaceId(3));
3782 assert_eq!(path.random_blobs, vec![newer_blob]);
3783 }
3784
3785 #[test]
3786 fn test_void_tunnel_interface() {
3787 let mut engine = TransportEngine::new(make_config(true));
3788 engine.register_interface(make_tunnel_interface(1));
3789
3790 let tunnel_id = [0xDD; 32];
3791 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3792
3793 assert_eq!(
3795 engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3796 Some(InterfaceId(1))
3797 );
3798
3799 engine.void_tunnel_interface(&tunnel_id);
3800
3801 assert_eq!(engine.tunnel_table().len(), 1);
3803 assert_eq!(
3804 engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3805 None
3806 );
3807 }
3808
3809 #[test]
3810 fn test_tick_culls_tunnels() {
3811 let mut engine = TransportEngine::new(make_config(true));
3812 engine.register_interface(make_tunnel_interface(1));
3813
3814 let tunnel_id = [0xEE; 32];
3815 engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3816 assert_eq!(engine.tunnel_table().len(), 1);
3817
3818 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3819
3820 engine.tick(
3822 1000.0 + constants::DESTINATION_TIMEOUT + constants::TABLES_CULL_INTERVAL + 1.0,
3823 &mut rng,
3824 );
3825
3826 assert_eq!(engine.tunnel_table().len(), 0);
3827 }
3828
3829 #[test]
3830 fn test_synthesize_tunnel() {
3831 let mut engine = TransportEngine::new(make_config(true));
3832 engine.register_interface(make_tunnel_interface(1));
3833
3834 let identity =
3835 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xFF; 32]));
3836 let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3837
3838 let actions = engine.synthesize_tunnel(&identity, InterfaceId(1), &mut rng);
3839
3840 assert_eq!(actions.len(), 1);
3842 match &actions[0] {
3843 TransportAction::TunnelSynthesize {
3844 interface,
3845 data,
3846 dest_hash,
3847 } => {
3848 assert_eq!(*interface, InterfaceId(1));
3849 assert_eq!(data.len(), tunnel::TUNNEL_SYNTH_LENGTH);
3850 let expected_dest = crate::destination::destination_hash(
3852 "rnstransport",
3853 &["tunnel", "synthesize"],
3854 None,
3855 );
3856 assert_eq!(*dest_hash, expected_dest);
3857 }
3858 _ => panic!("Expected TunnelSynthesize"),
3859 }
3860 }
3861
3862 fn make_path_request_data(dest_hash: &[u8; 16], tag: &[u8]) -> Vec<u8> {
3867 let mut data = Vec::new();
3868 data.extend_from_slice(dest_hash);
3869 data.extend_from_slice(tag);
3870 data
3871 }
3872
3873 #[test]
3874 fn test_path_request_forwarded_on_ap() {
3875 let mut engine = TransportEngine::new(make_config(true));
3876 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3877 engine.register_interface(make_interface(2, constants::MODE_FULL));
3878
3879 let dest = [0xD1; 16];
3880 let tag = [0x01; 16];
3881 let data = make_path_request_data(&dest, &tag);
3882
3883 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3884
3885 assert_eq!(actions.len(), 1);
3887 match &actions[0] {
3888 TransportAction::SendOnInterface { interface, .. } => {
3889 assert_eq!(*interface, InterfaceId(2));
3890 }
3891 _ => panic!("Expected SendOnInterface for forwarded path request"),
3892 }
3893 assert!(engine.discovery_path_requests.contains_key(&dest));
3895 }
3896
3897 #[test]
3898 fn test_path_request_not_forwarded_on_full() {
3899 let mut engine = TransportEngine::new(make_config(true));
3900 engine.register_interface(make_interface(1, constants::MODE_FULL));
3901 engine.register_interface(make_interface(2, constants::MODE_FULL));
3902
3903 let dest = [0xD2; 16];
3904 let tag = [0x02; 16];
3905 let data = make_path_request_data(&dest, &tag);
3906
3907 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3908
3909 assert!(actions.is_empty());
3911 assert!(!engine.discovery_path_requests.contains_key(&dest));
3912 }
3913
3914 #[test]
3915 fn test_discovery_pr_tags_fifo_eviction() {
3916 let mut config = make_config(true);
3917 config.max_discovery_pr_tags = 2;
3918 let mut engine = TransportEngine::new(config);
3919
3920 let dest1 = [0xA1; 16];
3921 let dest2 = [0xA2; 16];
3922 let dest3 = [0xA3; 16];
3923 let tag1 = [0x01; 16];
3924 let tag2 = [0x02; 16];
3925 let tag3 = [0x03; 16];
3926
3927 engine.handle_path_request(
3928 &make_path_request_data(&dest1, &tag1),
3929 InterfaceId(1),
3930 1000.0,
3931 );
3932 engine.handle_path_request(
3933 &make_path_request_data(&dest2, &tag2),
3934 InterfaceId(1),
3935 1001.0,
3936 );
3937 assert_eq!(engine.discovery_pr_tags_count(), 2);
3938
3939 let unique1 = make_unique_tag(dest1, &tag1);
3940 let unique2 = make_unique_tag(dest2, &tag2);
3941 assert!(engine.discovery_pr_tags.contains(&unique1));
3942 assert!(engine.discovery_pr_tags.contains(&unique2));
3943
3944 engine.handle_path_request(
3945 &make_path_request_data(&dest3, &tag3),
3946 InterfaceId(1),
3947 1002.0,
3948 );
3949 assert_eq!(engine.discovery_pr_tags_count(), 2);
3950 assert!(!engine.discovery_pr_tags.contains(&unique1));
3951 assert!(engine.discovery_pr_tags.contains(&unique2));
3952
3953 engine.handle_path_request(
3954 &make_path_request_data(&dest1, &tag1),
3955 InterfaceId(1),
3956 1003.0,
3957 );
3958 assert_eq!(engine.discovery_pr_tags_count(), 2);
3959 assert!(engine.discovery_pr_tags.contains(&unique1));
3960 }
3961
3962 #[test]
3963 fn test_path_destination_cap_evicts_oldest_and_clears_state() {
3964 let mut config = make_config(false);
3965 config.max_path_destinations = 2;
3966 let mut engine = TransportEngine::new(config);
3967 engine.register_interface(make_interface(1, constants::MODE_FULL));
3968
3969 let dest1 = [0xB1; 16];
3970 let dest2 = [0xB2; 16];
3971 let dest3 = [0xB3; 16];
3972
3973 engine.upsert_path_destination(
3974 dest1,
3975 make_path_entry(1000.0, 1, InterfaceId(1), [0x11; 16]),
3976 1000.0,
3977 );
3978 engine.upsert_path_destination(
3979 dest2,
3980 make_path_entry(1001.0, 1, InterfaceId(1), [0x22; 16]),
3981 1001.0,
3982 );
3983 engine
3984 .path_states
3985 .insert(dest1, constants::STATE_UNRESPONSIVE);
3986
3987 engine.upsert_path_destination(
3988 dest3,
3989 make_path_entry(1002.0, 1, InterfaceId(1), [0x33; 16]),
3990 1002.0,
3991 );
3992
3993 assert_eq!(engine.path_table_count(), 2);
3994 assert!(!engine.has_path(&dest1));
3995 assert!(engine.has_path(&dest2));
3996 assert!(engine.has_path(&dest3));
3997 assert!(!engine.path_states.contains_key(&dest1));
3998 assert_eq!(engine.path_destination_cap_evict_count(), 1);
3999 }
4000
4001 #[test]
4002 fn test_existing_path_destination_update_does_not_trigger_cap_eviction() {
4003 let mut config = make_config(false);
4004 config.max_path_destinations = 2;
4005 config.max_paths_per_destination = 2;
4006 let mut engine = TransportEngine::new(config);
4007 engine.register_interface(make_interface(1, constants::MODE_FULL));
4008
4009 let dest1 = [0xC1; 16];
4010 let dest2 = [0xC2; 16];
4011
4012 engine.upsert_path_destination(
4013 dest1,
4014 make_path_entry(1000.0, 2, InterfaceId(1), [0x11; 16]),
4015 1000.0,
4016 );
4017 engine.upsert_path_destination(
4018 dest2,
4019 make_path_entry(1001.0, 2, InterfaceId(1), [0x22; 16]),
4020 1001.0,
4021 );
4022
4023 engine.upsert_path_destination(
4024 dest2,
4025 make_path_entry(1002.0, 1, InterfaceId(1), [0x23; 16]),
4026 1002.0,
4027 );
4028
4029 assert_eq!(engine.path_table_count(), 2);
4030 assert!(engine.has_path(&dest1));
4031 assert!(engine.has_path(&dest2));
4032 }
4033
4034 #[test]
4035 fn test_roaming_loop_prevention() {
4036 let mut engine = TransportEngine::new(make_config(true));
4037 engine.register_interface(make_interface(1, constants::MODE_ROAMING));
4038
4039 let dest = [0xD3; 16];
4040 engine.path_table.insert(
4042 dest,
4043 PathSet::from_single(
4044 PathEntry {
4045 timestamp: 900.0,
4046 next_hop: [0xAA; 16],
4047 hops: 2,
4048 expires: 9999.0,
4049 random_blobs: Vec::new(),
4050 receiving_interface: InterfaceId(1),
4051 packet_hash: [0; 32],
4052 announce_raw: None,
4053 },
4054 1,
4055 ),
4056 );
4057
4058 let tag = [0x03; 16];
4059 let data = make_path_request_data(&dest, &tag);
4060
4061 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4062
4063 assert!(actions.is_empty());
4065 assert!(!engine.announce_table.contains_key(&dest));
4066 }
4067
4068 fn make_announce_raw(dest_hash: &[u8; 16], payload: &[u8]) -> Vec<u8> {
4070 let flags: u8 = 0x01; let mut raw = Vec::new();
4074 raw.push(flags);
4075 raw.push(0x02); raw.extend_from_slice(dest_hash);
4077 raw.push(constants::CONTEXT_NONE);
4078 raw.extend_from_slice(payload);
4079 raw
4080 }
4081
4082 #[test]
4083 fn test_path_request_populates_announce_entry_from_raw() {
4084 let mut engine = TransportEngine::new(make_config(true));
4085 engine.register_interface(make_interface(1, constants::MODE_FULL));
4086 engine.register_interface(make_interface(2, constants::MODE_FULL));
4087
4088 let dest = [0xD5; 16];
4089 let payload = vec![0xAB; 32]; let announce_raw = make_announce_raw(&dest, &payload);
4091
4092 engine.path_table.insert(
4093 dest,
4094 PathSet::from_single(
4095 PathEntry {
4096 timestamp: 900.0,
4097 next_hop: [0xBB; 16],
4098 hops: 2,
4099 expires: 9999.0,
4100 random_blobs: Vec::new(),
4101 receiving_interface: InterfaceId(2),
4102 packet_hash: [0; 32],
4103 announce_raw: Some(announce_raw.clone()),
4104 },
4105 1,
4106 ),
4107 );
4108
4109 let tag = [0x05; 16];
4110 let data = make_path_request_data(&dest, &tag);
4111 let _actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4112
4113 let entry = engine
4115 .announce_table
4116 .get(&dest)
4117 .expect("announce entry must exist");
4118 assert_eq!(entry.packet_raw, announce_raw);
4119 assert_eq!(entry.packet_data, payload);
4120 assert!(entry.block_rebroadcasts);
4121 }
4122
4123 #[test]
4124 fn test_path_request_skips_when_no_announce_raw() {
4125 let mut engine = TransportEngine::new(make_config(true));
4126 engine.register_interface(make_interface(1, constants::MODE_FULL));
4127 engine.register_interface(make_interface(2, constants::MODE_FULL));
4128
4129 let dest = [0xD6; 16];
4130
4131 engine.path_table.insert(
4132 dest,
4133 PathSet::from_single(
4134 PathEntry {
4135 timestamp: 900.0,
4136 next_hop: [0xCC; 16],
4137 hops: 1,
4138 expires: 9999.0,
4139 random_blobs: Vec::new(),
4140 receiving_interface: InterfaceId(2),
4141 packet_hash: [0; 32],
4142 announce_raw: None, },
4144 1,
4145 ),
4146 );
4147
4148 let tag = [0x06; 16];
4149 let data = make_path_request_data(&dest, &tag);
4150 let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4151
4152 assert!(actions.is_empty());
4154 assert!(!engine.announce_table.contains_key(&dest));
4155 }
4156
4157 #[test]
4158 fn test_discovery_request_consumed_on_announce() {
4159 let mut engine = TransportEngine::new(make_config(true));
4160 engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
4161
4162 let dest = [0xD4; 16];
4163
4164 engine.discovery_path_requests.insert(
4166 dest,
4167 DiscoveryPathRequest {
4168 timestamp: 900.0,
4169 requesting_interface: InterfaceId(1),
4170 },
4171 );
4172
4173 let iface = engine.discovery_path_requests_waiting(&dest);
4175 assert_eq!(iface, Some(InterfaceId(1)));
4176
4177 assert!(!engine.discovery_path_requests.contains_key(&dest));
4179 assert_eq!(engine.discovery_path_requests_waiting(&dest), None);
4180 }
4181
4182 #[test]
4183 fn test_pending_path_request_announce_bypasses_ingress_control() {
4184 let mut engine = TransportEngine::new(make_config(true));
4185 let mut inbound = make_interface(1, constants::MODE_FULL);
4186 inbound.ingress_control = crate::transport::types::IngressControlConfig::enabled();
4187 inbound.ia_freq = constants::IC_BURST_FREQ + 1.0;
4188 inbound.started = 0.0;
4189 engine.register_interface(inbound);
4190 engine.register_interface(make_interface(2, constants::MODE_ACCESS_POINT));
4191
4192 let identity =
4193 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4194 let dest_hash = crate::destination::destination_hash(
4195 "ingress",
4196 &["path-request"],
4197 Some(identity.hash()),
4198 );
4199 let name_hash = crate::destination::name_hash("ingress", &["path-request"]);
4200 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4201
4202 engine.discovery_path_requests.insert(
4203 dest_hash,
4204 DiscoveryPathRequest {
4205 timestamp: 999.0,
4206 requesting_interface: InterfaceId(2),
4207 },
4208 );
4209
4210 let mut rng = rns_crypto::FixedRng::new(&[0x88; 32]);
4211 let actions = engine.handle_inbound(&announce_raw, InterfaceId(1), 1000.0, &mut rng);
4212
4213 assert_eq!(engine.held_announce_count(&InterfaceId(1)), 0);
4214 assert!(engine.has_path(&dest_hash));
4215 assert!(!engine.discovery_path_requests.contains_key(&dest_hash));
4216 assert!(actions.iter().any(|a| {
4217 matches!(
4218 a,
4219 TransportAction::AnnounceReceived {
4220 destination_hash,
4221 receiving_interface: InterfaceId(1),
4222 ..
4223 } if *destination_hash == dest_hash
4224 )
4225 }));
4226
4227 let entry = engine
4228 .announce_table
4229 .get(&dest_hash)
4230 .expect("path response announce should be queued");
4231 assert!(entry.block_rebroadcasts);
4232 assert_eq!(entry.attached_interface, Some(InterfaceId(2)));
4233 }
4234
4235 fn build_announce_for_issue4(dest_hash: &[u8; 16], name_hash: &[u8; 10]) -> Vec<u8> {
4241 let identity =
4242 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4243 let random_hash = [0x42u8; 10];
4244 let (announce_data, _) = crate::announce::AnnounceData::pack(
4245 &identity,
4246 dest_hash,
4247 name_hash,
4248 &random_hash,
4249 None,
4250 None,
4251 )
4252 .unwrap();
4253 let flags = PacketFlags {
4254 header_type: constants::HEADER_1,
4255 context_flag: constants::FLAG_UNSET,
4256 transport_type: constants::TRANSPORT_BROADCAST,
4257 destination_type: constants::DESTINATION_SINGLE,
4258 packet_type: constants::PACKET_TYPE_ANNOUNCE,
4259 };
4260 RawPacket::pack(
4261 flags,
4262 0,
4263 dest_hash,
4264 None,
4265 constants::CONTEXT_NONE,
4266 &announce_data,
4267 )
4268 .unwrap()
4269 .raw
4270 }
4271
4272 #[test]
4273 fn test_issue4_local_client_single_data_to_1hop_rewrites_on_outbound() {
4274 let mut engine = TransportEngine::new(make_config(false));
4279 engine.register_interface(make_local_client_interface(1));
4280
4281 let identity =
4282 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4283 let dest_hash =
4284 crate::destination::destination_hash("issue4", &["test"], Some(identity.hash()));
4285 let name_hash = crate::destination::name_hash("issue4", &["test"]);
4286 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4287
4288 let mut announce_packet = RawPacket::unpack(&announce_raw).unwrap();
4292 announce_packet.raw[1] = 1;
4293 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
4294 engine.handle_inbound(&announce_packet.raw, InterfaceId(1), 1000.0, &mut rng);
4295 assert!(engine.has_path(&dest_hash));
4296 assert_eq!(engine.hops_to(&dest_hash), Some(1));
4297
4298 let data_flags = PacketFlags {
4300 header_type: constants::HEADER_1,
4301 context_flag: constants::FLAG_UNSET,
4302 transport_type: constants::TRANSPORT_BROADCAST,
4303 destination_type: constants::DESTINATION_SINGLE,
4304 packet_type: constants::PACKET_TYPE_DATA,
4305 };
4306 let data_packet = RawPacket::pack(
4307 data_flags,
4308 0,
4309 &dest_hash,
4310 None,
4311 constants::CONTEXT_NONE,
4312 b"hello",
4313 )
4314 .unwrap();
4315
4316 let actions =
4317 engine.handle_outbound(&data_packet, constants::DESTINATION_SINGLE, None, 1001.0);
4318
4319 let send = actions.iter().find_map(|a| match a {
4320 TransportAction::SendOnInterface { interface, raw } => Some((interface, raw)),
4321 _ => None,
4322 });
4323 let (interface, raw) = send.expect("shared client should emit a transport-injected packet");
4324 assert_eq!(*interface, InterfaceId(1));
4325 let flags = PacketFlags::unpack(raw[0]);
4326 assert_eq!(flags.header_type, constants::HEADER_2);
4327 assert_eq!(flags.transport_type, constants::TRANSPORT_TRANSPORT);
4328 }
4329
4330 #[test]
4331 fn test_issue4_external_data_to_1hop_via_transport_works() {
4332 let daemon_id = [0x42; 16];
4338 let mut engine = TransportEngine::new(TransportConfig {
4339 transport_enabled: true,
4340 identity_hash: Some(daemon_id),
4341 prefer_shorter_path: false,
4342 max_paths_per_destination: 1,
4343 packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
4344 max_discovery_pr_tags: constants::MAX_PR_TAGS,
4345 max_path_destinations: usize::MAX,
4346 max_tunnel_destinations_total: usize::MAX,
4347 destination_timeout_secs: constants::DESTINATION_TIMEOUT,
4348 announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
4349 announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
4350 announce_sig_cache_enabled: true,
4351 announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
4352 announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
4353 announce_queue_max_entries: 256,
4354 announce_queue_max_interfaces: 1024,
4355 });
4356 engine.register_interface(make_interface(1, constants::MODE_FULL)); engine.register_interface(make_interface(2, constants::MODE_FULL)); let identity =
4360 rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4361 let dest_hash =
4362 crate::destination::destination_hash("issue4", &["ctrl"], Some(identity.hash()));
4363 let name_hash = crate::destination::name_hash("issue4", &["ctrl"]);
4364 let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4365
4366 let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
4368 engine.handle_inbound(&announce_raw, InterfaceId(2), 1000.0, &mut rng);
4369 assert_eq!(engine.hops_to(&dest_hash), Some(1));
4370
4371 let h2_flags = PacketFlags {
4374 header_type: constants::HEADER_2,
4375 context_flag: constants::FLAG_UNSET,
4376 transport_type: constants::TRANSPORT_TRANSPORT,
4377 destination_type: constants::DESTINATION_SINGLE,
4378 packet_type: constants::PACKET_TYPE_DATA,
4379 };
4380 let mut h2_raw = Vec::new();
4382 h2_raw.push(h2_flags.pack());
4383 h2_raw.push(0); h2_raw.extend_from_slice(&daemon_id); h2_raw.extend_from_slice(&dest_hash);
4386 h2_raw.push(constants::CONTEXT_NONE);
4387 h2_raw.extend_from_slice(b"hello via transport");
4388
4389 let mut rng2 = rns_crypto::FixedRng::new(&[0x22; 32]);
4390 let actions = engine.handle_inbound(&h2_raw, InterfaceId(1), 1001.0, &mut rng2);
4391
4392 let has_send = actions.iter().any(|a| {
4394 matches!(
4395 a,
4396 TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(2)
4397 )
4398 });
4399 assert!(
4400 has_send,
4401 "HEADER_2 transport packet should be forwarded (control test)"
4402 );
4403 }
4404}