Skip to main content

rns_core/transport/
mod.rs

1pub mod announce_proc;
2pub mod announce_queue;
3pub mod announce_verify_queue;
4pub mod dedup;
5pub mod inbound;
6pub mod ingress_control;
7pub mod jobs;
8pub mod outbound;
9pub mod pathfinder;
10pub mod rate_limit;
11pub mod tables;
12pub mod tunnel;
13pub mod types;
14
15use alloc::collections::BTreeMap;
16use alloc::string::String;
17use alloc::vec::Vec;
18use core::mem::size_of;
19
20use rns_crypto::Rng;
21
22use crate::announce::AnnounceData;
23use crate::constants;
24use crate::hash;
25use crate::packet::RawPacket;
26
27use self::announce_proc::compute_path_expires;
28use self::announce_queue::AnnounceQueues;
29use self::announce_verify_queue::{AnnounceVerifyKey, AnnounceVerifyQueue, PendingAnnounce};
30use self::dedup::{AnnounceSignatureCache, PacketHashlist};
31use self::inbound::{
32    create_link_entry, create_reverse_entry, forward_transport_packet, route_proof_via_reverse,
33    route_via_link_table,
34};
35use self::ingress_control::IngressControl;
36use self::outbound::{route_outbound, should_transmit_announce};
37use self::pathfinder::{
38    decide_announce_multipath, extract_random_blob, timebase_from_random_blob,
39    timebase_from_random_blobs, MultiPathDecision,
40};
41use self::rate_limit::AnnounceRateLimiter;
42use self::tables::{AnnounceEntry, DiscoveryPathRequest, LinkEntry, PathEntry, PathSet};
43use self::tunnel::TunnelTable;
44use self::types::{BlackholeEntry, InterfaceId, InterfaceInfo, TransportAction, TransportConfig};
45
46pub type PathTableRow = ([u8; 16], f64, [u8; 16], u8, f64, String);
47pub type RateTableRow = ([u8; 16], f64, u32, f64, Vec<f64>);
48
49struct InboundPacketCtx {
50    packet: RawPacket,
51    original_raw: Vec<u8>,
52    iface: InterfaceId,
53    now: f64,
54    from_local_client: bool,
55}
56
57struct VerifiedAnnounceCtx<'a> {
58    packet: &'a RawPacket,
59    original_raw: &'a [u8],
60    iface: InterfaceId,
61    now: f64,
62    validated: crate::announce::ValidatedAnnounce,
63    received_from: [u8; 16],
64    random_blob: [u8; 10],
65    announce_emitted: u64,
66}
67
68struct TickCtx<'a> {
69    now: f64,
70    rng: &'a mut dyn Rng,
71    actions: Vec<TransportAction>,
72}
73
74struct PathRequestCtx<'a> {
75    data: &'a [u8],
76    interface_id: InterfaceId,
77    now: f64,
78    destination_hash: [u8; 16],
79}
80
81/// The core transport/routing engine.
82///
83/// Maintains routing tables and processes packets without performing any I/O.
84/// Returns `Vec<TransportAction>` that the caller must execute.
85pub struct TransportEngine {
86    config: TransportConfig,
87    path_table: BTreeMap<[u8; 16], PathSet>,
88    announce_table: BTreeMap<[u8; 16], AnnounceEntry>,
89    reverse_table: BTreeMap<[u8; 16], tables::ReverseEntry>,
90    link_table: BTreeMap<[u8; 16], LinkEntry>,
91    held_announces: BTreeMap<[u8; 16], AnnounceEntry>,
92    packet_hashlist: PacketHashlist,
93    announce_sig_cache: AnnounceSignatureCache,
94    rate_limiter: AnnounceRateLimiter,
95    path_states: BTreeMap<[u8; 16], u8>,
96    interfaces: BTreeMap<InterfaceId, InterfaceInfo>,
97    local_destinations: BTreeMap<[u8; 16], u8>,
98    blackholed_identities: BTreeMap<[u8; 16], BlackholeEntry>,
99    announce_queues: AnnounceQueues,
100    ingress_control: IngressControl,
101    tunnel_table: TunnelTable,
102    discovery_pr_tags: Vec<[u8; 32]>,
103    discovery_path_requests: BTreeMap<[u8; 16], DiscoveryPathRequest>,
104    path_destination_cap_evict_count: usize,
105    // Job timing
106    announces_last_checked: f64,
107    tables_last_culled: f64,
108}
109
110impl TransportEngine {
111    pub fn new(config: TransportConfig) -> Self {
112        let packet_hashlist_max_entries = config.packet_hashlist_max_entries;
113        let sig_cache_max = if config.announce_sig_cache_enabled {
114            config.announce_sig_cache_max_entries
115        } else {
116            0
117        };
118        let sig_cache_ttl = config.announce_sig_cache_ttl_secs;
119        let announce_queue_max_interfaces = config.announce_queue_max_interfaces;
120        TransportEngine {
121            config,
122            path_table: BTreeMap::new(),
123            announce_table: BTreeMap::new(),
124            reverse_table: BTreeMap::new(),
125            link_table: BTreeMap::new(),
126            held_announces: BTreeMap::new(),
127            packet_hashlist: PacketHashlist::new(packet_hashlist_max_entries),
128            announce_sig_cache: AnnounceSignatureCache::new(sig_cache_max, sig_cache_ttl),
129            rate_limiter: AnnounceRateLimiter::new(),
130            path_states: BTreeMap::new(),
131            interfaces: BTreeMap::new(),
132            local_destinations: BTreeMap::new(),
133            blackholed_identities: BTreeMap::new(),
134            announce_queues: AnnounceQueues::new(announce_queue_max_interfaces),
135            ingress_control: IngressControl::new(),
136            tunnel_table: TunnelTable::new(),
137            discovery_pr_tags: Vec::new(),
138            discovery_path_requests: BTreeMap::new(),
139            path_destination_cap_evict_count: 0,
140            announces_last_checked: 0.0,
141            tables_last_culled: 0.0,
142        }
143    }
144
145    fn insert_discovery_pr_tag(&mut self, unique_tag: [u8; 32]) -> bool {
146        if self.discovery_pr_tags.contains(&unique_tag) {
147            return false;
148        }
149        if self.config.max_discovery_pr_tags != usize::MAX
150            && self.discovery_pr_tags.len() >= self.config.max_discovery_pr_tags
151            && !self.discovery_pr_tags.is_empty()
152        {
153            self.discovery_pr_tags.remove(0);
154        }
155        self.discovery_pr_tags.push(unique_tag);
156        true
157    }
158
159    fn upsert_path_destination(&mut self, dest_hash: [u8; 16], entry: PathEntry, now: f64) {
160        let max_paths = self.config.max_paths_per_destination;
161        if let Some(ps) = self.path_table.get_mut(&dest_hash) {
162            ps.upsert(entry);
163            return;
164        }
165        self.enforce_path_destination_cap(now);
166        self.path_table
167            .insert(dest_hash, PathSet::from_single(entry, max_paths));
168    }
169
170    fn enforce_path_destination_cap(&mut self, now: f64) {
171        if self.config.max_path_destinations == usize::MAX {
172            return;
173        }
174        jobs::cull_path_table(&mut self.path_table, &self.interfaces, now);
175        while self.path_table.len() >= self.config.max_path_destinations {
176            let Some(dest_hash) = self.oldest_path_destination() else {
177                break;
178            };
179            self.path_table.remove(&dest_hash);
180            self.path_states.remove(&dest_hash);
181            self.path_destination_cap_evict_count += 1;
182        }
183    }
184
185    fn oldest_path_destination(&self) -> Option<[u8; 16]> {
186        self.path_table
187            .iter()
188            .filter_map(|(dest_hash, path_set)| {
189                path_set
190                    .primary()
191                    .map(|primary| (*dest_hash, primary.timestamp, primary.hops))
192            })
193            .min_by(|a, b| {
194                a.1.partial_cmp(&b.1)
195                    .unwrap_or(core::cmp::Ordering::Equal)
196                    .then_with(|| b.2.cmp(&a.2))
197            })
198            .map(|(dest_hash, _, _)| dest_hash)
199    }
200
201    fn announce_entry_size_bytes(entry: &AnnounceEntry) -> usize {
202        size_of::<AnnounceEntry>() + entry.packet_raw.capacity() + entry.packet_data.capacity()
203    }
204
205    fn announce_retained_bytes_total(&self) -> usize {
206        self.announce_table
207            .values()
208            .chain(self.held_announces.values())
209            .map(Self::announce_entry_size_bytes)
210            .sum()
211    }
212
213    fn cull_expired_announce_entries(&mut self, now: f64) -> usize {
214        let ttl = self.config.announce_table_ttl_secs;
215        let mut removed = 0usize;
216
217        self.announce_table.retain(|_, entry| {
218            let keep = now <= entry.timestamp + ttl;
219            if !keep {
220                removed += 1;
221            }
222            keep
223        });
224
225        self.held_announces.retain(|_, entry| {
226            let keep = now <= entry.timestamp + ttl;
227            if !keep {
228                removed += 1;
229            }
230            keep
231        });
232
233        removed
234    }
235
236    fn oldest_retained_announce(&self) -> Option<([u8; 16], bool)> {
237        let oldest_active = self
238            .announce_table
239            .iter()
240            .map(|(dest_hash, entry)| (*dest_hash, false, entry.timestamp))
241            .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(core::cmp::Ordering::Equal));
242        let oldest_held = self
243            .held_announces
244            .iter()
245            .map(|(dest_hash, entry)| (*dest_hash, true, entry.timestamp))
246            .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(core::cmp::Ordering::Equal));
247
248        match (oldest_active, oldest_held) {
249            (Some(active), Some(held)) => {
250                let ordering = active
251                    .2
252                    .partial_cmp(&held.2)
253                    .unwrap_or(core::cmp::Ordering::Equal);
254                if ordering == core::cmp::Ordering::Less {
255                    Some((active.0, active.1))
256                } else {
257                    Some((held.0, held.1))
258                }
259            }
260            (Some(active), None) => Some((active.0, active.1)),
261            (None, Some(held)) => Some((held.0, held.1)),
262            (None, None) => None,
263        }
264    }
265
266    fn enforce_announce_retention_cap(&mut self, now: f64) {
267        self.cull_expired_announce_entries(now);
268        while self.announce_retained_bytes_total() > self.config.announce_table_max_bytes {
269            let Some((dest_hash, is_held)) = self.oldest_retained_announce() else {
270                break;
271            };
272            if is_held {
273                self.held_announces.remove(&dest_hash);
274            } else {
275                self.announce_table.remove(&dest_hash);
276            }
277        }
278    }
279
280    fn insert_announce_entry(
281        &mut self,
282        dest_hash: [u8; 16],
283        entry: AnnounceEntry,
284        now: f64,
285    ) -> bool {
286        self.cull_expired_announce_entries(now);
287        if Self::announce_entry_size_bytes(&entry) > self.config.announce_table_max_bytes {
288            return false;
289        }
290        self.announce_table.insert(dest_hash, entry);
291        self.enforce_announce_retention_cap(now);
292        self.announce_table.contains_key(&dest_hash)
293    }
294
295    fn insert_held_announce(
296        &mut self,
297        dest_hash: [u8; 16],
298        entry: AnnounceEntry,
299        now: f64,
300    ) -> bool {
301        self.cull_expired_announce_entries(now);
302        if Self::announce_entry_size_bytes(&entry) > self.config.announce_table_max_bytes {
303            return false;
304        }
305        self.held_announces.insert(dest_hash, entry);
306        self.enforce_announce_retention_cap(now);
307        self.held_announces.contains_key(&dest_hash)
308    }
309
310    // =========================================================================
311    // Interface management
312    // =========================================================================
313
314    pub fn register_interface(&mut self, info: InterfaceInfo) {
315        self.interfaces.insert(info.id, info);
316    }
317
318    pub fn deregister_interface(&mut self, id: InterfaceId) {
319        self.interfaces.remove(&id);
320        self.announce_queues.remove_interface(id);
321        self.ingress_control.remove_interface(&id);
322    }
323
324    // =========================================================================
325    // Destination management
326    // =========================================================================
327
328    pub fn register_destination(&mut self, dest_hash: [u8; 16], dest_type: u8) {
329        self.local_destinations.insert(dest_hash, dest_type);
330    }
331
332    pub fn deregister_destination(&mut self, dest_hash: &[u8; 16]) {
333        self.local_destinations.remove(dest_hash);
334    }
335
336    // =========================================================================
337    // Path queries
338    // =========================================================================
339
340    pub fn has_path(&self, dest_hash: &[u8; 16]) -> bool {
341        self.path_table
342            .get(dest_hash)
343            .is_some_and(|ps| !ps.is_empty())
344    }
345
346    pub fn hops_to(&self, dest_hash: &[u8; 16]) -> Option<u8> {
347        self.path_table
348            .get(dest_hash)
349            .and_then(|ps| ps.primary())
350            .map(|e| e.hops)
351    }
352
353    pub fn next_hop(&self, dest_hash: &[u8; 16]) -> Option<[u8; 16]> {
354        self.path_table
355            .get(dest_hash)
356            .and_then(|ps| ps.primary())
357            .map(|e| e.next_hop)
358    }
359
360    pub fn next_hop_interface(&self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
361        self.path_table
362            .get(dest_hash)
363            .and_then(|ps| ps.primary())
364            .map(|e| e.receiving_interface)
365    }
366
367    // =========================================================================
368    // Path state
369    // =========================================================================
370
371    /// Mark a path as unresponsive.
372    ///
373    /// If `receiving_interface` is provided and points to a MODE_BOUNDARY interface,
374    /// the marking is skipped — boundary interfaces must not poison path tables.
375    /// (Python Transport.py: mark_path_unknown/unresponsive boundary exemption)
376    pub fn mark_path_unresponsive(
377        &mut self,
378        dest_hash: &[u8; 16],
379        receiving_interface: Option<InterfaceId>,
380    ) {
381        if let Some(iface_id) = receiving_interface {
382            if let Some(info) = self.interfaces.get(&iface_id) {
383                if info.mode == constants::MODE_BOUNDARY {
384                    return;
385                }
386            }
387        }
388
389        // Failover: if we have alternative paths, promote the next one
390        if let Some(ps) = self.path_table.get_mut(dest_hash) {
391            if ps.len() > 1 {
392                ps.failover(false); // demote old primary to back
393                                    // Clear unresponsive state since we promoted a fresh primary
394                self.path_states.remove(dest_hash);
395                return;
396            }
397        }
398
399        self.path_states
400            .insert(*dest_hash, constants::STATE_UNRESPONSIVE);
401    }
402
403    pub fn mark_path_responsive(&mut self, dest_hash: &[u8; 16]) {
404        self.path_states
405            .insert(*dest_hash, constants::STATE_RESPONSIVE);
406    }
407
408    pub fn path_is_unresponsive(&self, dest_hash: &[u8; 16]) -> bool {
409        self.path_states.get(dest_hash) == Some(&constants::STATE_UNRESPONSIVE)
410    }
411
412    pub fn expire_path(&mut self, dest_hash: &[u8; 16]) {
413        if let Some(ps) = self.path_table.get_mut(dest_hash) {
414            ps.expire_all();
415        }
416    }
417
418    // =========================================================================
419    // Link table
420    // =========================================================================
421
422    pub fn register_link(&mut self, link_id: [u8; 16], entry: LinkEntry) {
423        self.link_table.insert(link_id, entry);
424    }
425
426    pub fn validate_link(&mut self, link_id: &[u8; 16]) {
427        if let Some(entry) = self.link_table.get_mut(link_id) {
428            entry.validated = true;
429        }
430    }
431
432    pub fn remove_link(&mut self, link_id: &[u8; 16]) {
433        self.link_table.remove(link_id);
434    }
435
436    // =========================================================================
437    // Blackhole management
438    // =========================================================================
439
440    /// Add an identity hash to the blackhole list.
441    pub fn blackhole_identity(
442        &mut self,
443        identity_hash: [u8; 16],
444        now: f64,
445        duration_hours: Option<f64>,
446        reason: Option<String>,
447    ) {
448        let expires = match duration_hours {
449            Some(h) if h > 0.0 => now + h * 3600.0,
450            _ => 0.0, // never expires
451        };
452        self.blackholed_identities.insert(
453            identity_hash,
454            BlackholeEntry {
455                created: now,
456                expires,
457                reason,
458            },
459        );
460    }
461
462    /// Remove an identity hash from the blackhole list.
463    pub fn unblackhole_identity(&mut self, identity_hash: &[u8; 16]) -> bool {
464        self.blackholed_identities.remove(identity_hash).is_some()
465    }
466
467    /// Check if an identity hash is blackholed (and not expired).
468    pub fn is_blackholed(&self, identity_hash: &[u8; 16], now: f64) -> bool {
469        if let Some(entry) = self.blackholed_identities.get(identity_hash) {
470            if entry.expires == 0.0 || entry.expires > now {
471                return true;
472            }
473        }
474        false
475    }
476
477    /// Get all blackhole entries (for queries).
478    pub fn blackholed_entries(&self) -> impl Iterator<Item = (&[u8; 16], &BlackholeEntry)> {
479        self.blackholed_identities.iter()
480    }
481
482    /// Cull expired blackhole entries.
483    fn cull_blackholed(&mut self, now: f64) {
484        self.blackholed_identities
485            .retain(|_, entry| entry.expires == 0.0 || entry.expires > now);
486    }
487
488    // =========================================================================
489    // Tunnel management
490    // =========================================================================
491
492    /// Handle a validated tunnel synthesis — create new or reattach.
493    ///
494    /// Returns actions for any restored paths.
495    pub fn handle_tunnel(
496        &mut self,
497        tunnel_id: [u8; 32],
498        interface: InterfaceId,
499        now: f64,
500    ) -> Vec<TransportAction> {
501        let mut actions = Vec::new();
502
503        // Set tunnel_id on the interface
504        if let Some(info) = self.interfaces.get_mut(&interface) {
505            info.tunnel_id = Some(tunnel_id);
506        }
507
508        let restored_paths = self.tunnel_table.handle_tunnel(
509            tunnel_id,
510            interface,
511            now,
512            self.config.destination_timeout_secs,
513        );
514
515        // Restore paths to path table if they're better than existing
516        for (dest_hash, tunnel_path) in &restored_paths {
517            let should_restore = match self.path_table.get(dest_hash).and_then(|ps| ps.primary()) {
518                Some(existing) => {
519                    // Restore if fewer/equal hops or existing expired, but never
520                    // overwrite a path learned from a more recent announce.
521                    if tunnel_path.hops <= existing.hops || existing.expires < now {
522                        let existing_timebase = timebase_from_random_blobs(&existing.random_blobs);
523                        let tunnel_timebase = timebase_from_random_blobs(&tunnel_path.random_blobs);
524                        tunnel_timebase >= existing_timebase
525                    } else {
526                        false
527                    }
528                }
529                None => now < tunnel_path.expires,
530            };
531
532            if should_restore {
533                let entry = PathEntry {
534                    timestamp: tunnel_path.timestamp,
535                    next_hop: tunnel_path.received_from,
536                    hops: tunnel_path.hops,
537                    expires: tunnel_path.expires,
538                    random_blobs: tunnel_path.random_blobs.clone(),
539                    receiving_interface: interface,
540                    packet_hash: tunnel_path.packet_hash,
541                    announce_raw: None,
542                };
543                self.upsert_path_destination(*dest_hash, entry, now);
544            }
545        }
546
547        actions.push(TransportAction::TunnelEstablished {
548            tunnel_id,
549            interface,
550        });
551
552        actions
553    }
554
555    /// Synthesize a tunnel on an interface.
556    ///
557    /// `identity`: the transport identity (must have private key for signing)
558    /// `interface_id`: which interface to send the synthesis on
559    /// `rng`: random number generator
560    ///
561    /// Returns TunnelSynthesize action to send the synthesis packet.
562    pub fn synthesize_tunnel(
563        &self,
564        identity: &rns_crypto::identity::Identity,
565        interface_id: InterfaceId,
566        rng: &mut dyn Rng,
567    ) -> Vec<TransportAction> {
568        let mut actions = Vec::new();
569
570        // Compute interface hash from the interface name
571        let interface_hash = if let Some(info) = self.interfaces.get(&interface_id) {
572            hash::full_hash(info.name.as_bytes())
573        } else {
574            return actions;
575        };
576
577        match tunnel::build_tunnel_synthesize_data(identity, &interface_hash, rng) {
578            Ok((data, _tunnel_id)) => {
579                let dest_hash = crate::destination::destination_hash(
580                    "rnstransport",
581                    &["tunnel", "synthesize"],
582                    None,
583                );
584                actions.push(TransportAction::TunnelSynthesize {
585                    interface: interface_id,
586                    data,
587                    dest_hash,
588                });
589            }
590            Err(e) => {
591                // Can't synthesize — no private key or other error
592                let _ = e;
593            }
594        }
595
596        actions
597    }
598
599    /// Void a tunnel's interface connection (tunnel disconnected).
600    pub fn void_tunnel_interface(&mut self, tunnel_id: &[u8; 32]) {
601        self.tunnel_table.void_tunnel_interface(tunnel_id);
602    }
603
604    /// Access the tunnel table for queries.
605    pub fn tunnel_table(&self) -> &TunnelTable {
606        &self.tunnel_table
607    }
608
609    // =========================================================================
610    // Packet filter
611    // =========================================================================
612
613    /// Check if any local client interfaces are registered.
614    fn has_local_clients(&self) -> bool {
615        self.interfaces.values().any(|i| i.is_local_client)
616    }
617
618    /// Packet filter: dedup + basic validity.
619    ///
620    /// Transport.py:1187-1238
621    fn packet_filter(&self, packet: &RawPacket) -> bool {
622        // Filter packets for other transport instances
623        if packet.transport_id.is_some()
624            && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
625        {
626            if let Some(ref identity_hash) = self.config.identity_hash {
627                if packet.transport_id.as_ref() != Some(identity_hash) {
628                    return false;
629                }
630            }
631        }
632
633        // Allow certain contexts unconditionally
634        match packet.context {
635            constants::CONTEXT_KEEPALIVE
636            | constants::CONTEXT_RESOURCE_REQ
637            | constants::CONTEXT_RESOURCE_PRF
638            | constants::CONTEXT_RESOURCE
639            | constants::CONTEXT_CACHE_REQUEST
640            | constants::CONTEXT_CHANNEL => return true,
641            _ => {}
642        }
643
644        // PLAIN/GROUP checks
645        if packet.flags.destination_type == constants::DESTINATION_PLAIN
646            || packet.flags.destination_type == constants::DESTINATION_GROUP
647        {
648            if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
649                return packet.hops <= 1;
650            } else {
651                // PLAIN/GROUP ANNOUNCE is invalid
652                return false;
653            }
654        }
655
656        // Deduplication
657        if !self.packet_hashlist.is_duplicate(&packet.packet_hash) {
658            return true;
659        }
660
661        // Duplicate announce for SINGLE dest is allowed (path update)
662        if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
663            && packet.flags.destination_type == constants::DESTINATION_SINGLE
664        {
665            return true;
666        }
667
668        false
669    }
670
671    // =========================================================================
672    // Core API: handle_inbound
673    // =========================================================================
674
675    /// Process an inbound raw packet from a network interface.
676    ///
677    /// Returns a list of actions for the caller to execute.
678    pub fn handle_inbound(
679        &mut self,
680        raw: &[u8],
681        iface: InterfaceId,
682        now: f64,
683        rng: &mut dyn Rng,
684    ) -> Vec<TransportAction> {
685        self.handle_inbound_with_announce_queue(raw, iface, now, rng, None)
686    }
687
688    pub fn handle_inbound_with_announce_queue(
689        &mut self,
690        raw: &[u8],
691        iface: InterfaceId,
692        now: f64,
693        rng: &mut dyn Rng,
694        announce_queue: Option<&mut AnnounceVerifyQueue>,
695    ) -> Vec<TransportAction> {
696        let Some(ctx) = self.prepare_inbound_packet(raw, iface, now) else {
697            return Vec::new();
698        };
699        let mut actions = Vec::new();
700
701        self.remember_inbound_packet_hash(&ctx.packet);
702        self.bridge_plain_broadcast(&ctx, &mut actions);
703        self.handle_transport_forwarding(&ctx, &mut actions);
704        self.handle_link_table_routing(&ctx, &mut actions);
705        self.handle_inbound_announce(&ctx, rng, announce_queue, &mut actions);
706
707        if ctx.packet.flags.packet_type == constants::PACKET_TYPE_PROOF {
708            self.process_inbound_proof(&ctx.packet, ctx.iface, ctx.now, &mut actions);
709        }
710
711        self.handle_inbound_local_delivery(&ctx, &mut actions);
712        actions
713    }
714
715    fn prepare_inbound_packet(
716        &self,
717        raw: &[u8],
718        iface: InterfaceId,
719        now: f64,
720    ) -> Option<InboundPacketCtx> {
721        let mut packet = RawPacket::unpack(raw).ok()?;
722        let from_local_client = self
723            .interfaces
724            .get(&iface)
725            .map(|i| i.is_local_client)
726            .unwrap_or(false);
727        packet.hops += 1;
728        if from_local_client {
729            packet.hops = packet.hops.saturating_sub(1);
730        }
731        if !self.packet_filter(&packet) {
732            return None;
733        }
734        Some(InboundPacketCtx {
735            packet,
736            original_raw: raw.to_vec(),
737            iface,
738            now,
739            from_local_client,
740        })
741    }
742
743    fn remember_inbound_packet_hash(&mut self, packet: &RawPacket) {
744        let remember_hash = !(self.link_table.contains_key(&packet.destination_hash)
745            || (packet.flags.packet_type == constants::PACKET_TYPE_PROOF
746                && packet.context == constants::CONTEXT_LRPROOF));
747        if remember_hash {
748            self.packet_hashlist.add(packet.packet_hash);
749        }
750    }
751
752    fn bridge_plain_broadcast(&self, ctx: &InboundPacketCtx, actions: &mut Vec<TransportAction>) {
753        if ctx.packet.flags.destination_type != constants::DESTINATION_PLAIN
754            || ctx.packet.flags.transport_type != constants::TRANSPORT_BROADCAST
755            || !self.has_local_clients()
756        {
757            return;
758        }
759
760        if ctx.from_local_client {
761            actions.push(TransportAction::ForwardPlainBroadcast {
762                raw: ctx.packet.raw.clone(),
763                to_local: false,
764                exclude: Some(ctx.iface),
765            });
766        } else {
767            actions.push(TransportAction::ForwardPlainBroadcast {
768                raw: ctx.packet.raw.clone(),
769                to_local: true,
770                exclude: None,
771            });
772        }
773    }
774
775    fn handle_transport_forwarding(
776        &mut self,
777        ctx: &InboundPacketCtx,
778        actions: &mut Vec<TransportAction>,
779    ) {
780        if !(self.config.transport_enabled || self.config.identity_hash.is_some()) {
781            return;
782        }
783        if ctx.packet.transport_id.is_none()
784            || ctx.packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
785        {
786            return;
787        }
788
789        let Some(identity_hash) = self.config.identity_hash else {
790            return;
791        };
792        if ctx.packet.transport_id != Some(identity_hash) {
793            return;
794        }
795
796        let Some(path_entry) = self
797            .path_table
798            .get(&ctx.packet.destination_hash)
799            .and_then(|ps| ps.primary())
800        else {
801            return;
802        };
803
804        let next_hop = path_entry.next_hop;
805        let remaining_hops = path_entry.hops;
806        let outbound_interface = path_entry.receiving_interface;
807        let new_raw =
808            forward_transport_packet(&ctx.packet, next_hop, remaining_hops, outbound_interface);
809
810        if ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST {
811            let proof_timeout = ctx.now
812                + constants::LINK_ESTABLISHMENT_TIMEOUT_PER_HOP * (remaining_hops.max(1) as f64);
813            let (link_id, link_entry) = create_link_entry(
814                &ctx.packet,
815                next_hop,
816                outbound_interface,
817                remaining_hops,
818                ctx.iface,
819                ctx.now,
820                proof_timeout,
821            );
822            self.link_table.insert(link_id, link_entry);
823            actions.push(TransportAction::LinkRequestReceived {
824                link_id,
825                destination_hash: ctx.packet.destination_hash,
826                receiving_interface: ctx.iface,
827            });
828        } else {
829            let (trunc_hash, reverse_entry) =
830                create_reverse_entry(&ctx.packet, outbound_interface, ctx.iface, ctx.now);
831            self.reverse_table.insert(trunc_hash, reverse_entry);
832        }
833
834        actions.push(TransportAction::SendOnInterface {
835            interface: outbound_interface,
836            raw: new_raw,
837        });
838
839        if let Some(entry) = self
840            .path_table
841            .get_mut(&ctx.packet.destination_hash)
842            .and_then(|ps| ps.primary_mut())
843        {
844            entry.timestamp = ctx.now;
845        }
846    }
847
848    fn handle_link_table_routing(
849        &mut self,
850        ctx: &InboundPacketCtx,
851        actions: &mut Vec<TransportAction>,
852    ) {
853        if !self.config.transport_enabled && self.config.identity_hash.is_none() {
854            return;
855        }
856        if ctx.packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
857            || ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
858            || ctx.packet.context == constants::CONTEXT_LRPROOF
859        {
860            return;
861        }
862
863        let Some(link_entry) = self.link_table.get(&ctx.packet.destination_hash).cloned() else {
864            return;
865        };
866        let Some((outbound_iface, new_raw)) =
867            route_via_link_table(&ctx.packet, &link_entry, ctx.iface)
868        else {
869            return;
870        };
871
872        self.packet_hashlist.add(ctx.packet.packet_hash);
873        actions.push(TransportAction::SendOnInterface {
874            interface: outbound_iface,
875            raw: new_raw,
876        });
877
878        if let Some(entry) = self.link_table.get_mut(&ctx.packet.destination_hash) {
879            entry.timestamp = ctx.now;
880        }
881    }
882
883    fn handle_inbound_announce(
884        &mut self,
885        ctx: &InboundPacketCtx,
886        rng: &mut dyn Rng,
887        announce_queue: Option<&mut AnnounceVerifyQueue>,
888        actions: &mut Vec<TransportAction>,
889    ) {
890        if ctx.packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
891            return;
892        }
893
894        if let Some(queue) = announce_queue {
895            self.try_enqueue_announce(ctx, rng, queue, actions);
896        } else {
897            self.process_inbound_announce(
898                &ctx.packet,
899                &ctx.original_raw,
900                ctx.iface,
901                ctx.now,
902                rng,
903                actions,
904            );
905        }
906    }
907
908    fn handle_inbound_local_delivery(
909        &self,
910        ctx: &InboundPacketCtx,
911        actions: &mut Vec<TransportAction>,
912    ) {
913        if (ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
914            || ctx.packet.flags.packet_type == constants::PACKET_TYPE_DATA)
915            && self
916                .local_destinations
917                .contains_key(&ctx.packet.destination_hash)
918        {
919            actions.push(TransportAction::DeliverLocal {
920                destination_hash: ctx.packet.destination_hash,
921                raw: ctx.packet.raw.clone(),
922                packet_hash: ctx.packet.packet_hash,
923                receiving_interface: ctx.iface,
924            });
925        }
926    }
927
928    // =========================================================================
929    // Inbound announce processing
930    // =========================================================================
931
932    fn process_inbound_announce(
933        &mut self,
934        packet: &RawPacket,
935        original_raw: &[u8],
936        iface: InterfaceId,
937        now: f64,
938        rng: &mut dyn Rng,
939        actions: &mut Vec<TransportAction>,
940    ) {
941        if packet.flags.destination_type != constants::DESTINATION_SINGLE {
942            return;
943        }
944
945        let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
946
947        // Unpack and validate announce
948        let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
949            Ok(a) => a,
950            Err(_) => return,
951        };
952
953        let sig_cache_key =
954            Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
955
956        let validated = if self.announce_sig_cache.contains(&sig_cache_key) {
957            announce.to_validated_unchecked()
958        } else {
959            match announce.validate(&packet.destination_hash) {
960                Ok(v) => {
961                    self.announce_sig_cache.insert(sig_cache_key, now);
962                    v
963                }
964                Err(_) => return,
965            }
966        };
967
968        let received_from = self.announce_received_from(packet, now);
969        let random_blob = match extract_random_blob(&packet.data) {
970            Some(b) => b,
971            None => return,
972        };
973        let announce_emitted = timebase_from_random_blob(&random_blob);
974
975        self.process_verified_announce(
976            VerifiedAnnounceCtx {
977                packet,
978                original_raw,
979                iface,
980                now,
981                validated,
982                received_from,
983                random_blob,
984                announce_emitted,
985            },
986            rng,
987            actions,
988        );
989    }
990
991    fn announce_sig_cache_key(destination_hash: [u8; 16], signature: &[u8; 64]) -> [u8; 32] {
992        let mut material = [0u8; 80];
993        material[..16].copy_from_slice(&destination_hash);
994        material[16..].copy_from_slice(signature);
995        hash::full_hash(&material)
996    }
997
998    fn announce_received_from(&mut self, packet: &RawPacket, now: f64) -> [u8; 16] {
999        if let Some(transport_id) = packet.transport_id {
1000            if self.config.transport_enabled {
1001                if let Some(announce_entry) = self.announce_table.get_mut(&packet.destination_hash)
1002                {
1003                    if packet.hops.checked_sub(1) == Some(announce_entry.hops) {
1004                        announce_entry.local_rebroadcasts += 1;
1005                        if announce_entry.retries > 0
1006                            && announce_entry.local_rebroadcasts
1007                                >= constants::LOCAL_REBROADCASTS_MAX
1008                        {
1009                            self.announce_table.remove(&packet.destination_hash);
1010                        }
1011                    }
1012                    if let Some(announce_entry) = self.announce_table.get(&packet.destination_hash)
1013                    {
1014                        if packet.hops.checked_sub(1) == Some(announce_entry.hops + 1)
1015                            && announce_entry.retries > 0
1016                            && now < announce_entry.retransmit_timeout
1017                        {
1018                            self.announce_table.remove(&packet.destination_hash);
1019                        }
1020                    }
1021                }
1022            }
1023            transport_id
1024        } else {
1025            packet.destination_hash
1026        }
1027    }
1028
1029    fn should_hold_announce(
1030        &mut self,
1031        packet: &RawPacket,
1032        original_raw: &[u8],
1033        iface: InterfaceId,
1034        now: f64,
1035    ) -> bool {
1036        if self.has_path(&packet.destination_hash) {
1037            return false;
1038        }
1039        if self
1040            .discovery_path_requests
1041            .contains_key(&packet.destination_hash)
1042        {
1043            return false;
1044        }
1045        let Some(info) = self.interfaces.get(&iface) else {
1046            return false;
1047        };
1048        if packet.context == constants::CONTEXT_PATH_RESPONSE
1049            || !self.ingress_control.should_ingress_limit(
1050                iface,
1051                &info.ingress_control,
1052                info.ia_freq,
1053                info.started,
1054                now,
1055            )
1056        {
1057            return false;
1058        }
1059        self.ingress_control.hold_announce(
1060            iface,
1061            &info.ingress_control,
1062            packet.destination_hash,
1063            ingress_control::HeldAnnounce {
1064                raw: original_raw.to_vec(),
1065                hops: packet.hops,
1066                receiving_interface: iface,
1067                timestamp: now,
1068            },
1069        );
1070        true
1071    }
1072
1073    fn try_enqueue_announce(
1074        &mut self,
1075        ctx: &InboundPacketCtx,
1076        rng: &mut dyn Rng,
1077        announce_queue: &mut AnnounceVerifyQueue,
1078        actions: &mut Vec<TransportAction>,
1079    ) {
1080        if ctx.packet.flags.destination_type != constants::DESTINATION_SINGLE {
1081            return;
1082        }
1083
1084        let has_ratchet = ctx.packet.flags.context_flag == constants::FLAG_SET;
1085        let announce = match AnnounceData::unpack(&ctx.packet.data, has_ratchet) {
1086            Ok(a) => a,
1087            Err(_) => return,
1088        };
1089
1090        let received_from = self.announce_received_from(&ctx.packet, ctx.now);
1091
1092        if self
1093            .local_destinations
1094            .contains_key(&ctx.packet.destination_hash)
1095        {
1096            log::debug!(
1097                "Announce:skipping local destination {:02x}{:02x}{:02x}{:02x}..",
1098                ctx.packet.destination_hash[0],
1099                ctx.packet.destination_hash[1],
1100                ctx.packet.destination_hash[2],
1101                ctx.packet.destination_hash[3],
1102            );
1103            return;
1104        }
1105
1106        if self.should_hold_announce(&ctx.packet, &ctx.original_raw, ctx.iface, ctx.now) {
1107            return;
1108        }
1109
1110        let sig_cache_key =
1111            Self::announce_sig_cache_key(ctx.packet.destination_hash, &announce.signature);
1112        if self.announce_sig_cache.contains(&sig_cache_key) {
1113            let validated = announce.to_validated_unchecked();
1114            let random_blob = match extract_random_blob(&ctx.packet.data) {
1115                Some(b) => b,
1116                None => return,
1117            };
1118            let announce_emitted = timebase_from_random_blob(&random_blob);
1119            self.process_verified_announce(
1120                VerifiedAnnounceCtx {
1121                    packet: &ctx.packet,
1122                    original_raw: &ctx.original_raw,
1123                    iface: ctx.iface,
1124                    now: ctx.now,
1125                    validated,
1126                    received_from,
1127                    random_blob,
1128                    announce_emitted,
1129                },
1130                rng,
1131                actions,
1132            );
1133            return;
1134        }
1135
1136        if ctx.packet.context == constants::CONTEXT_PATH_RESPONSE {
1137            let Ok(validated) = announce.validate(&ctx.packet.destination_hash) else {
1138                return;
1139            };
1140            self.announce_sig_cache.insert(sig_cache_key, ctx.now);
1141            let random_blob = match extract_random_blob(&ctx.packet.data) {
1142                Some(b) => b,
1143                None => return,
1144            };
1145            let announce_emitted = timebase_from_random_blob(&random_blob);
1146            self.process_verified_announce(
1147                VerifiedAnnounceCtx {
1148                    packet: &ctx.packet,
1149                    original_raw: &ctx.original_raw,
1150                    iface: ctx.iface,
1151                    now: ctx.now,
1152                    validated,
1153                    received_from,
1154                    random_blob,
1155                    announce_emitted,
1156                },
1157                rng,
1158                actions,
1159            );
1160            return;
1161        }
1162
1163        let random_blob = match extract_random_blob(&ctx.packet.data) {
1164            Some(b) => b,
1165            None => return,
1166        };
1167        let announce_emitted = timebase_from_random_blob(&random_blob);
1168        let key = AnnounceVerifyKey {
1169            destination_hash: ctx.packet.destination_hash,
1170            random_blob,
1171            received_from,
1172        };
1173        let pending = PendingAnnounce {
1174            original_raw: ctx.original_raw.clone(),
1175            packet: ctx.packet.clone(),
1176            interface: ctx.iface,
1177            received_from,
1178            queued_at: ctx.now,
1179            best_hops: ctx.packet.hops,
1180            emission_ts: announce_emitted,
1181            random_blob,
1182        };
1183        let _ = announce_queue.enqueue(key, pending);
1184    }
1185
1186    pub fn complete_verified_announce(
1187        &mut self,
1188        pending: PendingAnnounce,
1189        validated: crate::announce::ValidatedAnnounce,
1190        sig_cache_key: [u8; 32],
1191        now: f64,
1192        rng: &mut dyn Rng,
1193    ) -> Vec<TransportAction> {
1194        self.announce_sig_cache.insert(sig_cache_key, now);
1195        let mut actions = Vec::new();
1196        self.process_verified_announce(
1197            VerifiedAnnounceCtx {
1198                packet: &pending.packet,
1199                original_raw: &pending.original_raw,
1200                iface: pending.interface,
1201                now,
1202                validated,
1203                received_from: pending.received_from,
1204                random_blob: pending.random_blob,
1205                announce_emitted: pending.emission_ts,
1206            },
1207            rng,
1208            &mut actions,
1209        );
1210        actions
1211    }
1212
1213    pub fn clear_failed_verified_announce(&mut self, _sig_cache_key: [u8; 32], _now: f64) {}
1214
1215    fn process_verified_announce(
1216        &mut self,
1217        ctx: VerifiedAnnounceCtx<'_>,
1218        rng: &mut dyn Rng,
1219        actions: &mut Vec<TransportAction>,
1220    ) {
1221        if self.is_blackholed(&ctx.validated.identity_hash, ctx.now) {
1222            return;
1223        }
1224        if ctx.packet.hops > constants::PATHFINDER_M {
1225            return;
1226        }
1227
1228        let existing_set = self.path_table.get(&ctx.packet.destination_hash);
1229        let was_unknown_destination = existing_set.map_or(true, |ps| ps.is_empty());
1230
1231        // Reset stale path state before first-path installation so path-state handling
1232        // cannot race ahead of the path table for previously unknown destinations.
1233        if was_unknown_destination {
1234            self.path_states.remove(&ctx.packet.destination_hash);
1235        }
1236
1237        // Multi-path aware decision
1238        let is_unresponsive = self.path_is_unresponsive(&ctx.packet.destination_hash);
1239
1240        let mp_decision = decide_announce_multipath(
1241            existing_set,
1242            ctx.packet.hops,
1243            ctx.announce_emitted,
1244            &ctx.random_blob,
1245            &ctx.received_from,
1246            is_unresponsive,
1247            ctx.now,
1248            self.config.prefer_shorter_path,
1249        );
1250
1251        if mp_decision == MultiPathDecision::Reject {
1252            log::debug!(
1253                "Announce:path decision REJECT for dest={:02x}{:02x}{:02x}{:02x}..",
1254                ctx.packet.destination_hash[0],
1255                ctx.packet.destination_hash[1],
1256                ctx.packet.destination_hash[2],
1257                ctx.packet.destination_hash[3],
1258            );
1259            return;
1260        }
1261
1262        // Rate limiting
1263        let rate_blocked = if ctx.packet.context != constants::CONTEXT_PATH_RESPONSE {
1264            if let Some(iface_info) = self.interfaces.get(&ctx.iface) {
1265                self.rate_limiter.check_and_update(
1266                    &ctx.packet.destination_hash,
1267                    ctx.now,
1268                    iface_info.announce_rate_target,
1269                    iface_info.announce_rate_grace,
1270                    iface_info.announce_rate_penalty,
1271                )
1272            } else {
1273                false
1274            }
1275        } else {
1276            false
1277        };
1278
1279        // Get interface mode for expiry calculation
1280        let interface_mode = self
1281            .interfaces
1282            .get(&ctx.iface)
1283            .map(|i| i.mode)
1284            .unwrap_or(constants::MODE_FULL);
1285
1286        let expires = compute_path_expires(ctx.now, interface_mode);
1287
1288        // Get existing random blobs from the matching path (same next_hop) or empty
1289        let existing_blobs = self
1290            .path_table
1291            .get(&ctx.packet.destination_hash)
1292            .and_then(|ps| ps.find_by_next_hop(&ctx.received_from))
1293            .map(|e| e.random_blobs.clone())
1294            .unwrap_or_default();
1295
1296        // Generate RNG value for retransmit timeout
1297        let mut rng_bytes = [0u8; 8];
1298        rng.fill_bytes(&mut rng_bytes);
1299        let rng_value = (u64::from_le_bytes(rng_bytes) as f64) / (u64::MAX as f64);
1300
1301        let is_path_response = ctx.packet.context == constants::CONTEXT_PATH_RESPONSE;
1302
1303        let (path_entry, announce_entry) = announce_proc::process_validated_announce(
1304            ctx.packet.destination_hash,
1305            ctx.packet.hops,
1306            &ctx.packet.data,
1307            &ctx.packet.raw,
1308            ctx.packet.packet_hash,
1309            ctx.packet.flags.context_flag,
1310            ctx.received_from,
1311            ctx.iface,
1312            ctx.now,
1313            existing_blobs,
1314            ctx.random_blob,
1315            expires,
1316            rng_value,
1317            self.config.transport_enabled,
1318            is_path_response,
1319            rate_blocked,
1320            Some(ctx.original_raw.to_vec()),
1321        );
1322
1323        // Emit CacheAnnounce for disk caching (pre-hop-increment raw)
1324        actions.push(TransportAction::CacheAnnounce {
1325            packet_hash: ctx.packet.packet_hash,
1326            raw: ctx.original_raw.to_vec(),
1327        });
1328
1329        // Store path via upsert into PathSet
1330        self.upsert_path_destination(ctx.packet.destination_hash, path_entry, ctx.now);
1331
1332        // If receiving interface has a tunnel_id, store path in tunnel table too
1333        if let Some(tunnel_id) = self.interfaces.get(&ctx.iface).and_then(|i| i.tunnel_id) {
1334            let blobs = self
1335                .path_table
1336                .get(&ctx.packet.destination_hash)
1337                .and_then(|ps| ps.find_by_next_hop(&ctx.received_from))
1338                .map(|e| e.random_blobs.clone())
1339                .unwrap_or_default();
1340            self.tunnel_table.store_tunnel_path(
1341                &tunnel_id,
1342                ctx.packet.destination_hash,
1343                tunnel::TunnelPath {
1344                    timestamp: ctx.now,
1345                    received_from: ctx.received_from,
1346                    hops: ctx.packet.hops,
1347                    expires,
1348                    random_blobs: blobs,
1349                    packet_hash: ctx.packet.packet_hash,
1350                },
1351                ctx.now,
1352                self.config.destination_timeout_secs,
1353                self.config.max_tunnel_destinations_total,
1354            );
1355        }
1356
1357        // Re-apply the path-state reset after storing the path entry so any transient
1358        // stale state is also cleared once the destination exists in the path table.
1359        self.path_states.remove(&ctx.packet.destination_hash);
1360
1361        // Store announce for retransmission
1362        if let Some(ann) = announce_entry {
1363            self.insert_announce_entry(ctx.packet.destination_hash, ann, ctx.now);
1364        }
1365
1366        // Emit actions
1367        actions.push(TransportAction::AnnounceReceived {
1368            destination_hash: ctx.packet.destination_hash,
1369            identity_hash: ctx.validated.identity_hash,
1370            public_key: ctx.validated.public_key,
1371            name_hash: ctx.validated.name_hash,
1372            random_hash: ctx.validated.random_hash,
1373            app_data: ctx.validated.app_data,
1374            hops: ctx.packet.hops,
1375            receiving_interface: ctx.iface,
1376        });
1377
1378        actions.push(TransportAction::PathUpdated {
1379            destination_hash: ctx.packet.destination_hash,
1380            hops: ctx.packet.hops,
1381            next_hop: ctx.received_from,
1382            interface: ctx.iface,
1383        });
1384
1385        // Forward announce to local clients if any are connected
1386        if self.has_local_clients() {
1387            actions.push(TransportAction::ForwardToLocalClients {
1388                raw: ctx.packet.raw.clone(),
1389                exclude: Some(ctx.iface),
1390            });
1391        }
1392
1393        // Check for discovery path requests waiting for this announce
1394        if let Some(pr_entry) = self.discovery_path_requests_waiting(&ctx.packet.destination_hash) {
1395            // Build a path response announce and queue it
1396            let entry = AnnounceEntry {
1397                timestamp: ctx.now,
1398                retransmit_timeout: ctx.now,
1399                retries: constants::PATHFINDER_R,
1400                received_from: ctx.received_from,
1401                hops: ctx.packet.hops,
1402                packet_raw: ctx.packet.raw.clone(),
1403                packet_data: ctx.packet.data.clone(),
1404                destination_hash: ctx.packet.destination_hash,
1405                context_flag: ctx.packet.flags.context_flag,
1406                local_rebroadcasts: 0,
1407                block_rebroadcasts: true,
1408                attached_interface: Some(pr_entry),
1409            };
1410            self.insert_announce_entry(ctx.packet.destination_hash, entry, ctx.now);
1411        }
1412    }
1413
1414    pub fn announce_sig_cache_contains(&self, sig_cache_key: &[u8; 32]) -> bool {
1415        self.announce_sig_cache.contains(sig_cache_key)
1416    }
1417
1418    /// Check if there's a waiting discovery path request for a destination.
1419    /// Consumes the request if found (one-shot: the caller queues the announce response).
1420    fn discovery_path_requests_waiting(&mut self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
1421        self.discovery_path_requests
1422            .remove(dest_hash)
1423            .map(|req| req.requesting_interface)
1424    }
1425
1426    // =========================================================================
1427    // Inbound proof processing
1428    // =========================================================================
1429
1430    fn process_inbound_proof(
1431        &mut self,
1432        packet: &RawPacket,
1433        iface: InterfaceId,
1434        _now: f64,
1435        actions: &mut Vec<TransportAction>,
1436    ) {
1437        if packet.context == constants::CONTEXT_LRPROOF {
1438            // Link request proof routing
1439            if (self.config.transport_enabled)
1440                && self.link_table.contains_key(&packet.destination_hash)
1441            {
1442                let link_entry = self.link_table.get(&packet.destination_hash).cloned();
1443                if let Some(entry) = link_entry {
1444                    if let Some((outbound_interface, new_raw)) =
1445                        route_via_link_table(packet, &entry, iface)
1446                    {
1447                        // Forward the proof (simplified: skip signature validation
1448                        // which requires Identity recall)
1449
1450                        // Mark link as validated
1451                        if let Some(le) = self.link_table.get_mut(&packet.destination_hash) {
1452                            le.validated = true;
1453                        }
1454
1455                        actions.push(TransportAction::LinkEstablished {
1456                            link_id: packet.destination_hash,
1457                            interface: outbound_interface,
1458                        });
1459
1460                        actions.push(TransportAction::SendOnInterface {
1461                            interface: outbound_interface,
1462                            raw: new_raw,
1463                        });
1464                    }
1465                }
1466            } else {
1467                // Could be for a local pending link - deliver locally
1468                actions.push(TransportAction::DeliverLocal {
1469                    destination_hash: packet.destination_hash,
1470                    raw: packet.raw.clone(),
1471                    packet_hash: packet.packet_hash,
1472                    receiving_interface: iface,
1473                });
1474            }
1475        } else {
1476            // Regular proof: check reverse table
1477            if self.config.transport_enabled {
1478                if let Some(reverse_entry) = self.reverse_table.remove(&packet.destination_hash) {
1479                    if let Some(action) = route_proof_via_reverse(packet, &reverse_entry, iface) {
1480                        actions.push(action);
1481                    }
1482                }
1483            }
1484
1485            // Deliver to local receipts
1486            actions.push(TransportAction::DeliverLocal {
1487                destination_hash: packet.destination_hash,
1488                raw: packet.raw.clone(),
1489                packet_hash: packet.packet_hash,
1490                receiving_interface: iface,
1491            });
1492        }
1493    }
1494
1495    // =========================================================================
1496    // Core API: handle_outbound
1497    // =========================================================================
1498
1499    /// Route an outbound packet.
1500    pub fn handle_outbound(
1501        &mut self,
1502        packet: &RawPacket,
1503        dest_type: u8,
1504        attached_interface: Option<InterfaceId>,
1505        now: f64,
1506    ) -> Vec<TransportAction> {
1507        let actions = route_outbound(
1508            &self.path_table,
1509            &self.interfaces,
1510            &self.local_destinations,
1511            packet,
1512            dest_type,
1513            attached_interface,
1514            now,
1515        );
1516
1517        // Add to packet hashlist for outbound packets
1518        self.packet_hashlist.add(packet.packet_hash);
1519
1520        // Gate announces with hops > 0 through the bandwidth queue
1521        if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE && packet.hops > 0 {
1522            self.gate_announce_actions(actions, &packet.destination_hash, packet.hops, now)
1523        } else {
1524            actions
1525        }
1526    }
1527
1528    /// Gate announce SendOnInterface actions through per-interface bandwidth queues.
1529    fn gate_announce_actions(
1530        &mut self,
1531        actions: Vec<TransportAction>,
1532        dest_hash: &[u8; 16],
1533        hops: u8,
1534        now: f64,
1535    ) -> Vec<TransportAction> {
1536        let mut result = Vec::new();
1537        for action in actions {
1538            match action {
1539                TransportAction::SendOnInterface { interface, raw } => {
1540                    let (bitrate, announce_cap) =
1541                        if let Some(info) = self.interfaces.get(&interface) {
1542                            (info.bitrate, info.announce_cap)
1543                        } else {
1544                            (None, constants::ANNOUNCE_CAP)
1545                        };
1546                    if let Some(send_action) = self.announce_queues.gate_announce(
1547                        interface,
1548                        raw,
1549                        *dest_hash,
1550                        hops,
1551                        now,
1552                        now,
1553                        bitrate,
1554                        announce_cap,
1555                    ) {
1556                        result.push(send_action);
1557                    }
1558                    // If None, it was queued — no action emitted now
1559                }
1560                other => result.push(other),
1561            }
1562        }
1563        result
1564    }
1565
1566    // =========================================================================
1567    // Core API: tick
1568    // =========================================================================
1569
1570    /// Periodic maintenance. Call regularly (e.g., every 250ms).
1571    pub fn tick(&mut self, now: f64, rng: &mut dyn Rng) -> Vec<TransportAction> {
1572        let mut ctx = TickCtx {
1573            now,
1574            rng,
1575            actions: Vec::new(),
1576        };
1577        self.process_tick_pending_announces(&mut ctx);
1578
1579        let mut queue_actions = self.announce_queues.process_queues(now, &self.interfaces);
1580        ctx.actions.append(&mut queue_actions);
1581
1582        self.process_tick_ingress_release(&mut ctx);
1583        self.cull_tick_tables(&mut ctx);
1584        ctx.actions
1585    }
1586
1587    fn process_tick_pending_announces(&mut self, ctx: &mut TickCtx<'_>) {
1588        if ctx.now <= self.announces_last_checked + constants::ANNOUNCES_CHECK_INTERVAL {
1589            return;
1590        }
1591
1592        self.cull_expired_announce_entries(ctx.now);
1593        self.enforce_announce_retention_cap(ctx.now);
1594        if let Some(identity_hash) = self.config.identity_hash {
1595            let announce_actions = jobs::process_pending_announces(
1596                &mut self.announce_table,
1597                &mut self.held_announces,
1598                &identity_hash,
1599                ctx.now,
1600            );
1601            let gated = self.gate_retransmit_actions(announce_actions, ctx.now);
1602            ctx.actions.extend(gated);
1603        }
1604        self.cull_expired_announce_entries(ctx.now);
1605        self.enforce_announce_retention_cap(ctx.now);
1606        self.announces_last_checked = ctx.now;
1607    }
1608
1609    fn process_tick_ingress_release(&mut self, ctx: &mut TickCtx<'_>) {
1610        let ic_interfaces = self.ingress_control.interfaces_with_held();
1611        for iface_id in ic_interfaces {
1612            let (ia_freq, started, ingress_config) = match self.interfaces.get(&iface_id) {
1613                Some(info) => (info.ia_freq, info.started, info.ingress_control),
1614                None => continue,
1615            };
1616            if !ingress_config.enabled {
1617                continue;
1618            }
1619            if let Some(held) = self.ingress_control.process_held_announces(
1620                iface_id,
1621                &ingress_config,
1622                ia_freq,
1623                started,
1624                ctx.now,
1625            ) {
1626                let released_actions =
1627                    self.handle_inbound(&held.raw, held.receiving_interface, ctx.now, ctx.rng);
1628                ctx.actions.extend(released_actions);
1629            }
1630        }
1631    }
1632
1633    fn cull_tick_tables(&mut self, ctx: &mut TickCtx<'_>) {
1634        if ctx.now <= self.tables_last_culled + constants::TABLES_CULL_INTERVAL {
1635            return;
1636        }
1637
1638        jobs::cull_path_table(&mut self.path_table, &self.interfaces, ctx.now);
1639        jobs::cull_reverse_table(&mut self.reverse_table, &self.interfaces, ctx.now);
1640        let (_culled, link_closed_actions) =
1641            jobs::cull_link_table(&mut self.link_table, &self.interfaces, ctx.now);
1642        ctx.actions.extend(link_closed_actions);
1643        jobs::cull_path_states(&mut self.path_states, &self.path_table);
1644        self.cull_blackholed(ctx.now);
1645        self.discovery_path_requests
1646            .retain(|_, req| ctx.now - req.timestamp < constants::DISCOVERY_PATH_REQUEST_TIMEOUT);
1647        self.tunnel_table
1648            .void_missing_interfaces(|id| self.interfaces.contains_key(id));
1649        self.tunnel_table.cull(ctx.now);
1650        self.announce_sig_cache.cull(ctx.now);
1651        self.tables_last_culled = ctx.now;
1652    }
1653
1654    /// Gate retransmitted announce actions through per-interface bandwidth queues.
1655    ///
1656    /// Retransmitted announces always have hops > 0.
1657    /// `BroadcastOnAllInterfaces` is expanded to per-interface sends gated through queues.
1658    fn gate_retransmit_actions(
1659        &mut self,
1660        actions: Vec<TransportAction>,
1661        now: f64,
1662    ) -> Vec<TransportAction> {
1663        let mut result = Vec::new();
1664        for action in actions {
1665            match action {
1666                TransportAction::SendOnInterface { interface, raw } => {
1667                    // Extract dest_hash from raw (bytes 2..18 for H1, 18..34 for H2)
1668                    let (dest_hash, hops) = Self::extract_announce_info(&raw);
1669                    let (bitrate, announce_cap) =
1670                        if let Some(info) = self.interfaces.get(&interface) {
1671                            (info.bitrate, info.announce_cap)
1672                        } else {
1673                            (None, constants::ANNOUNCE_CAP)
1674                        };
1675                    if let Some(send_action) = self.announce_queues.gate_announce(
1676                        interface,
1677                        raw,
1678                        dest_hash,
1679                        hops,
1680                        now,
1681                        now,
1682                        bitrate,
1683                        announce_cap,
1684                    ) {
1685                        result.push(send_action);
1686                    }
1687                }
1688                TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1689                    let (dest_hash, hops) = Self::extract_announce_info(&raw);
1690                    // Expand to per-interface sends gated through queues,
1691                    // applying mode filtering (AP blocks non-local announces, etc.)
1692                    let iface_ids: Vec<(InterfaceId, Option<u64>, f64)> = self
1693                        .interfaces
1694                        .iter()
1695                        .filter(|(_, info)| info.out_capable)
1696                        .filter(|(id, _)| {
1697                            if let Some(ref ex) = exclude {
1698                                **id != *ex
1699                            } else {
1700                                true
1701                            }
1702                        })
1703                        .filter(|(_, info)| {
1704                            should_transmit_announce(
1705                                info,
1706                                &dest_hash,
1707                                hops,
1708                                &self.local_destinations,
1709                                &self.path_table,
1710                                &self.interfaces,
1711                            )
1712                        })
1713                        .map(|(id, info)| (*id, info.bitrate, info.announce_cap))
1714                        .collect();
1715
1716                    for (iface_id, bitrate, announce_cap) in iface_ids {
1717                        if let Some(send_action) = self.announce_queues.gate_announce(
1718                            iface_id,
1719                            raw.clone(),
1720                            dest_hash,
1721                            hops,
1722                            now,
1723                            now,
1724                            bitrate,
1725                            announce_cap,
1726                        ) {
1727                            result.push(send_action);
1728                        }
1729                    }
1730                }
1731                other => result.push(other),
1732            }
1733        }
1734        result
1735    }
1736
1737    /// Extract destination hash and hops from raw announce bytes.
1738    fn extract_announce_info(raw: &[u8]) -> ([u8; 16], u8) {
1739        if raw.len() < 18 {
1740            return ([0; 16], 0);
1741        }
1742        let header_type = (raw[0] >> 6) & 0x03;
1743        let hops = raw[1];
1744        if header_type == constants::HEADER_2 && raw.len() >= 34 {
1745            // H2: transport_id at [2..18], dest_hash at [18..34]
1746            let mut dest = [0u8; 16];
1747            dest.copy_from_slice(&raw[18..34]);
1748            (dest, hops)
1749        } else {
1750            // H1: dest_hash at [2..18]
1751            let mut dest = [0u8; 16];
1752            dest.copy_from_slice(&raw[2..18]);
1753            (dest, hops)
1754        }
1755    }
1756
1757    // =========================================================================
1758    // Path request handling
1759    // =========================================================================
1760
1761    /// Handle an incoming path request.
1762    ///
1763    /// Transport.py path_request_handler (~line 2700):
1764    /// - Dedup via unique tag
1765    /// - If local destination, caller handles announce
1766    /// - If path known and transport enabled, queue retransmit (with ROAMING loop prevention)
1767    /// - If path unknown and receiving interface is in DISCOVER_PATHS_FOR, store a
1768    ///   DiscoveryPathRequest and forward the raw path request on other OUT interfaces
1769    pub fn handle_path_request(
1770        &mut self,
1771        data: &[u8],
1772        interface_id: InterfaceId,
1773        now: f64,
1774    ) -> Vec<TransportAction> {
1775        let Some(ctx) = self.parse_path_request(data, interface_id, now) else {
1776            return Vec::new();
1777        };
1778        if self.local_destinations.contains_key(&ctx.destination_hash) {
1779            return Vec::new();
1780        }
1781        if self.config.transport_enabled && self.has_path(&ctx.destination_hash) {
1782            self.handle_known_path_request(&ctx);
1783            return Vec::new();
1784        }
1785        if self.config.transport_enabled {
1786            return self.handle_discovery_path_request(&ctx);
1787        }
1788        Vec::new()
1789    }
1790
1791    fn parse_path_request<'a>(
1792        &mut self,
1793        data: &'a [u8],
1794        interface_id: InterfaceId,
1795        now: f64,
1796    ) -> Option<PathRequestCtx<'a>> {
1797        if data.len() < 16 {
1798            return None;
1799        }
1800
1801        let mut destination_hash = [0u8; 16];
1802        destination_hash.copy_from_slice(&data[..16]);
1803
1804        let tag_bytes = if data.len() > 32 {
1805            Some(&data[32..])
1806        } else if data.len() > 16 {
1807            Some(&data[16..])
1808        } else {
1809            None
1810        }?;
1811
1812        let tag_len = tag_bytes.len().min(16);
1813        let mut unique_tag = [0u8; 32];
1814        unique_tag[..16].copy_from_slice(&destination_hash);
1815        unique_tag[16..16 + tag_len].copy_from_slice(&tag_bytes[..tag_len]);
1816        if !self.insert_discovery_pr_tag(unique_tag) {
1817            return None;
1818        }
1819
1820        Some(PathRequestCtx {
1821            data,
1822            interface_id,
1823            now,
1824            destination_hash,
1825        })
1826    }
1827
1828    fn handle_known_path_request(&mut self, ctx: &PathRequestCtx<'_>) {
1829        let Some(path) = self
1830            .path_table
1831            .get(&ctx.destination_hash)
1832            .and_then(|ps| ps.primary())
1833            .cloned()
1834        else {
1835            return;
1836        };
1837
1838        if let Some(recv_info) = self.interfaces.get(&ctx.interface_id) {
1839            if recv_info.mode == constants::MODE_ROAMING
1840                && path.receiving_interface == ctx.interface_id
1841            {
1842                return;
1843            }
1844        }
1845
1846        let Some(raw) = path.announce_raw.as_ref() else {
1847            return;
1848        };
1849        if let Some(existing) = self.announce_table.remove(&ctx.destination_hash) {
1850            self.insert_held_announce(ctx.destination_hash, existing, ctx.now);
1851        }
1852        let retransmit_timeout = if let Some(iface_info) = self.interfaces.get(&ctx.interface_id) {
1853            let base = ctx.now + constants::PATH_REQUEST_GRACE;
1854            if iface_info.mode == constants::MODE_ROAMING {
1855                base + constants::PATH_REQUEST_RG
1856            } else {
1857                base
1858            }
1859        } else {
1860            ctx.now + constants::PATH_REQUEST_GRACE
1861        };
1862
1863        let Ok(parsed) = RawPacket::unpack(raw) else {
1864            return;
1865        };
1866
1867        let entry = AnnounceEntry {
1868            timestamp: ctx.now,
1869            retransmit_timeout,
1870            retries: constants::PATHFINDER_R,
1871            received_from: path.next_hop,
1872            hops: path.hops,
1873            packet_raw: raw.clone(),
1874            packet_data: parsed.data,
1875            destination_hash: ctx.destination_hash,
1876            context_flag: parsed.flags.context_flag,
1877            local_rebroadcasts: 0,
1878            block_rebroadcasts: true,
1879            attached_interface: Some(ctx.interface_id),
1880        };
1881
1882        self.insert_announce_entry(ctx.destination_hash, entry, ctx.now);
1883    }
1884
1885    fn handle_discovery_path_request(&mut self, ctx: &PathRequestCtx<'_>) -> Vec<TransportAction> {
1886        let should_discover = self
1887            .interfaces
1888            .get(&ctx.interface_id)
1889            .map(|info| constants::DISCOVER_PATHS_FOR.contains(&info.mode))
1890            .unwrap_or(false);
1891        if !should_discover {
1892            return Vec::new();
1893        }
1894
1895        self.discovery_path_requests.insert(
1896            ctx.destination_hash,
1897            DiscoveryPathRequest {
1898                timestamp: ctx.now,
1899                requesting_interface: ctx.interface_id,
1900            },
1901        );
1902
1903        self.interfaces
1904            .values()
1905            .filter(|iface_info| iface_info.id != ctx.interface_id && iface_info.out_capable)
1906            .map(|iface_info| TransportAction::SendOnInterface {
1907                interface: iface_info.id,
1908                raw: ctx.data.to_vec(),
1909            })
1910            .collect()
1911    }
1912
1913    // =========================================================================
1914    // Public read accessors
1915    // =========================================================================
1916
1917    /// Iterate over primary path entries (one per destination).
1918    pub fn path_table_entries(&self) -> impl Iterator<Item = (&[u8; 16], &PathEntry)> {
1919        self.path_table
1920            .iter()
1921            .filter_map(|(k, ps)| ps.primary().map(|e| (k, e)))
1922    }
1923
1924    /// Iterate over all path sets (exposing alternatives).
1925    pub fn path_table_sets(&self) -> impl Iterator<Item = (&[u8; 16], &PathSet)> {
1926        self.path_table.iter()
1927    }
1928
1929    /// Number of registered interfaces.
1930    pub fn interface_count(&self) -> usize {
1931        self.interfaces.len()
1932    }
1933
1934    /// Number of link table entries.
1935    pub fn link_table_count(&self) -> usize {
1936        self.link_table.len()
1937    }
1938
1939    /// Number of path table entries.
1940    pub fn path_table_count(&self) -> usize {
1941        self.path_table.len()
1942    }
1943
1944    /// Number of announce table entries.
1945    pub fn announce_table_count(&self) -> usize {
1946        self.announce_table.len()
1947    }
1948
1949    /// Number of reverse table entries.
1950    pub fn reverse_table_count(&self) -> usize {
1951        self.reverse_table.len()
1952    }
1953
1954    /// Number of held announces.
1955    pub fn held_announces_count(&self) -> usize {
1956        self.held_announces.len()
1957    }
1958
1959    /// Number of entries in the packet hashlist.
1960    pub fn packet_hashlist_len(&self) -> usize {
1961        self.packet_hashlist.len()
1962    }
1963
1964    /// Number of entries in the announce signature verification cache.
1965    pub fn announce_sig_cache_len(&self) -> usize {
1966        self.announce_sig_cache.len()
1967    }
1968
1969    /// Number of entries in the rate limiter.
1970    pub fn rate_limiter_count(&self) -> usize {
1971        self.rate_limiter.len()
1972    }
1973
1974    /// Number of blackholed identities.
1975    pub fn blackholed_count(&self) -> usize {
1976        self.blackholed_identities.len()
1977    }
1978
1979    /// Number of tunnel table entries.
1980    pub fn tunnel_count(&self) -> usize {
1981        self.tunnel_table.len()
1982    }
1983
1984    /// Number of discovery PR tags.
1985    pub fn discovery_pr_tags_count(&self) -> usize {
1986        self.discovery_pr_tags.len()
1987    }
1988
1989    /// Number of discovery path requests.
1990    pub fn discovery_path_requests_count(&self) -> usize {
1991        self.discovery_path_requests.len()
1992    }
1993
1994    /// Number of announce bandwidth queues currently tracked.
1995    pub fn announce_queue_count(&self) -> usize {
1996        self.announce_queues.queue_count()
1997    }
1998
1999    /// Number of non-empty announce bandwidth queues.
2000    pub fn nonempty_announce_queue_count(&self) -> usize {
2001        self.announce_queues.nonempty_queue_count()
2002    }
2003
2004    /// Total number of buffered announces across all announce queues.
2005    pub fn queued_announce_count(&self) -> usize {
2006        self.announce_queues.total_queued_announces()
2007    }
2008
2009    /// Total retained raw-byte payload across all buffered announce queues.
2010    pub fn queued_announce_bytes(&self) -> usize {
2011        self.announce_queues.total_queued_bytes()
2012    }
2013
2014    /// Number of announces dropped because the announce-queue interface cap was reached.
2015    pub fn announce_queue_interface_cap_drop_count(&self) -> u64 {
2016        self.announce_queues.interface_cap_drop_count()
2017    }
2018
2019    /// Number of local destinations.
2020    pub fn local_destinations_count(&self) -> usize {
2021        self.local_destinations.len()
2022    }
2023
2024    /// Access the rate limiter for reading rate table entries.
2025    pub fn rate_limiter(&self) -> &AnnounceRateLimiter {
2026        &self.rate_limiter
2027    }
2028
2029    /// Get interface info by id.
2030    pub fn interface_info(&self, id: &InterfaceId) -> Option<&InterfaceInfo> {
2031        self.interfaces.get(id)
2032    }
2033
2034    /// Redirect a path entry to a different interface (e.g. after direct connect).
2035    /// If no entry exists, creates a minimal direct path (hops=1).
2036    pub fn redirect_path(&mut self, dest_hash: &[u8; 16], interface: InterfaceId, now: f64) {
2037        if let Some(entry) = self
2038            .path_table
2039            .get_mut(dest_hash)
2040            .and_then(|ps| ps.primary_mut())
2041        {
2042            entry.receiving_interface = interface;
2043            entry.hops = 1;
2044        } else {
2045            self.upsert_path_destination(
2046                *dest_hash,
2047                PathEntry {
2048                    timestamp: now,
2049                    next_hop: [0u8; 16],
2050                    hops: 1,
2051                    expires: now + 3600.0,
2052                    random_blobs: Vec::new(),
2053                    receiving_interface: interface,
2054                    packet_hash: [0u8; 32],
2055                    announce_raw: None,
2056                },
2057                now,
2058            );
2059        }
2060    }
2061
2062    /// Inject a path entry directly into the path table (full override).
2063    pub fn inject_path(&mut self, dest_hash: [u8; 16], entry: PathEntry) {
2064        self.upsert_path_destination(dest_hash, entry.clone(), entry.timestamp);
2065    }
2066
2067    /// Drop a path from the path table.
2068    pub fn drop_path(&mut self, dest_hash: &[u8; 16]) -> bool {
2069        self.path_table.remove(dest_hash).is_some()
2070    }
2071
2072    /// Drop all paths that route via a given transport hash.
2073    ///
2074    /// Removes matching individual paths from each PathSet, then removes
2075    /// any PathSets that become empty.
2076    pub fn drop_all_via(&mut self, transport_hash: &[u8; 16]) -> usize {
2077        let mut removed = 0usize;
2078        for ps in self.path_table.values_mut() {
2079            let before = ps.len();
2080            ps.retain(|entry| &entry.next_hop != transport_hash);
2081            removed += before - ps.len();
2082        }
2083        self.path_table.retain(|_, ps| !ps.is_empty());
2084        removed
2085    }
2086
2087    /// Drop all path entries learned via a given interface.
2088    pub fn drop_paths_for_interface(&mut self, interface: InterfaceId) -> usize {
2089        let mut removed = 0usize;
2090        let mut cleared_destinations = Vec::new();
2091        for (dest_hash, ps) in self.path_table.iter_mut() {
2092            let before = ps.len();
2093            ps.retain(|entry| entry.receiving_interface != interface);
2094            if ps.is_empty() {
2095                cleared_destinations.push(*dest_hash);
2096            }
2097            removed += before - ps.len();
2098        }
2099        self.path_table.retain(|_, ps| !ps.is_empty());
2100        for dest_hash in cleared_destinations {
2101            self.path_states.remove(&dest_hash);
2102        }
2103        removed
2104    }
2105
2106    /// Drop all reverse table entries that reference a given interface.
2107    pub fn drop_reverse_for_interface(&mut self, interface: InterfaceId) -> usize {
2108        let before = self.reverse_table.len();
2109        self.reverse_table.retain(|_, entry| {
2110            entry.receiving_interface != interface && entry.outbound_interface != interface
2111        });
2112        before - self.reverse_table.len()
2113    }
2114
2115    /// Drop all link table entries that reference a given interface.
2116    pub fn drop_links_for_interface(&mut self, interface: InterfaceId) -> usize {
2117        let before = self.link_table.len();
2118        self.link_table.retain(|_, entry| {
2119            entry.next_hop_interface != interface && entry.received_interface != interface
2120        });
2121        before - self.link_table.len()
2122    }
2123
2124    /// Drop all pending announce retransmissions and bandwidth queues.
2125    pub fn drop_announce_queues(&mut self) {
2126        self.announce_table.clear();
2127        self.held_announces.clear();
2128        self.announce_queues = AnnounceQueues::new(self.config.announce_queue_max_interfaces);
2129        self.ingress_control.clear();
2130    }
2131
2132    /// Get the transport identity hash.
2133    pub fn identity_hash(&self) -> Option<&[u8; 16]> {
2134        self.config.identity_hash.as_ref()
2135    }
2136
2137    /// Whether transport is enabled.
2138    pub fn transport_enabled(&self) -> bool {
2139        self.config.transport_enabled
2140    }
2141
2142    /// Access the transport configuration.
2143    pub fn config(&self) -> &TransportConfig {
2144        &self.config
2145    }
2146
2147    pub fn set_packet_hashlist_max_entries(&mut self, max_entries: usize) {
2148        self.config.packet_hashlist_max_entries = max_entries;
2149        self.packet_hashlist = PacketHashlist::new(max_entries);
2150    }
2151
2152    /// Get path table entries as tuples for management queries.
2153    /// Returns (dest_hash, timestamp, next_hop, hops, expires, interface_name).
2154    /// Reports primaries only for backward compatibility.
2155    pub fn get_path_table(&self, max_hops: Option<u8>) -> Vec<PathTableRow> {
2156        let mut result = Vec::new();
2157        for (dest_hash, ps) in self.path_table.iter() {
2158            if let Some(entry) = ps.primary() {
2159                if let Some(max) = max_hops {
2160                    if entry.hops > max {
2161                        continue;
2162                    }
2163                }
2164                let iface_name = self
2165                    .interfaces
2166                    .get(&entry.receiving_interface)
2167                    .map(|i| i.name.clone())
2168                    .unwrap_or_else(|| {
2169                        alloc::format!("Interface({})", entry.receiving_interface.0)
2170                    });
2171                result.push((
2172                    *dest_hash,
2173                    entry.timestamp,
2174                    entry.next_hop,
2175                    entry.hops,
2176                    entry.expires,
2177                    iface_name,
2178                ));
2179            }
2180        }
2181        result
2182    }
2183
2184    /// Get rate table entries as tuples for management queries.
2185    /// Returns (dest_hash, last, rate_violations, blocked_until, timestamps).
2186    pub fn get_rate_table(&self) -> Vec<RateTableRow> {
2187        self.rate_limiter
2188            .entries()
2189            .map(|(hash, entry)| {
2190                (
2191                    *hash,
2192                    entry.last,
2193                    entry.rate_violations,
2194                    entry.blocked_until,
2195                    entry.timestamps.clone(),
2196                )
2197            })
2198            .collect()
2199    }
2200
2201    /// Get blackholed identities as tuples for management queries.
2202    /// Returns (identity_hash, created, expires, reason).
2203    pub fn get_blackholed(&self) -> Vec<([u8; 16], f64, f64, Option<alloc::string::String>)> {
2204        self.blackholed_entries()
2205            .map(|(hash, entry)| (*hash, entry.created, entry.expires, entry.reason.clone()))
2206            .collect()
2207    }
2208
2209    // =========================================================================
2210    // Cleanup
2211    // =========================================================================
2212
2213    /// Return the set of destination hashes that currently have active paths.
2214    pub fn active_destination_hashes(&self) -> alloc::collections::BTreeSet<[u8; 16]> {
2215        self.path_table.keys().copied().collect()
2216    }
2217
2218    pub fn path_destination_cap_evict_count(&self) -> usize {
2219        self.path_destination_cap_evict_count
2220    }
2221
2222    /// Collect all packet hashes from active path entries (all paths, not just primaries).
2223    pub fn active_packet_hashes(&self) -> Vec<[u8; 32]> {
2224        self.path_table
2225            .values()
2226            .flat_map(|ps| ps.iter().map(|p| p.packet_hash))
2227            .collect()
2228    }
2229
2230    /// Cull rate limiter entries for destinations that are neither active nor recently used.
2231    /// Returns the number of removed entries.
2232    pub fn cull_rate_limiter(
2233        &mut self,
2234        active: &alloc::collections::BTreeSet<[u8; 16]>,
2235        now: f64,
2236        ttl_secs: f64,
2237    ) -> usize {
2238        self.rate_limiter.cull_stale(active, now, ttl_secs)
2239    }
2240
2241    // =========================================================================
2242    // Ingress control
2243    // =========================================================================
2244
2245    /// Update the incoming announce frequency for an interface.
2246    pub fn update_interface_freq(&mut self, id: InterfaceId, ia_freq: f64) {
2247        if let Some(info) = self.interfaces.get_mut(&id) {
2248            info.ia_freq = ia_freq;
2249        }
2250    }
2251
2252    /// Get the count of held announces for an interface (for management reporting).
2253    pub fn held_announce_count(&self, interface: &InterfaceId) -> usize {
2254        self.ingress_control.held_count(interface)
2255    }
2256
2257    // =========================================================================
2258    // Testing helpers
2259    // =========================================================================
2260
2261    #[cfg(test)]
2262    #[allow(dead_code)]
2263    pub(crate) fn path_table(&self) -> &BTreeMap<[u8; 16], PathSet> {
2264        &self.path_table
2265    }
2266
2267    #[cfg(test)]
2268    #[allow(dead_code)]
2269    pub(crate) fn announce_table(&self) -> &BTreeMap<[u8; 16], AnnounceEntry> {
2270        &self.announce_table
2271    }
2272
2273    #[cfg(test)]
2274    #[allow(dead_code)]
2275    pub(crate) fn held_announces(&self) -> &BTreeMap<[u8; 16], AnnounceEntry> {
2276        &self.held_announces
2277    }
2278
2279    #[cfg(test)]
2280    #[allow(dead_code)]
2281    pub(crate) fn announce_retained_bytes(&self) -> usize {
2282        self.announce_retained_bytes_total()
2283    }
2284
2285    #[cfg(test)]
2286    #[allow(dead_code)]
2287    pub(crate) fn reverse_table(&self) -> &BTreeMap<[u8; 16], tables::ReverseEntry> {
2288        &self.reverse_table
2289    }
2290
2291    #[cfg(test)]
2292    #[allow(dead_code)]
2293    pub(crate) fn link_table_ref(&self) -> &BTreeMap<[u8; 16], LinkEntry> {
2294        &self.link_table
2295    }
2296}
2297
2298#[cfg(test)]
2299mod tests {
2300    use super::*;
2301    use crate::packet::PacketFlags;
2302
2303    fn make_config(transport_enabled: bool) -> TransportConfig {
2304        TransportConfig {
2305            transport_enabled,
2306            identity_hash: if transport_enabled {
2307                Some([0x42; 16])
2308            } else {
2309                None
2310            },
2311            prefer_shorter_path: false,
2312            max_paths_per_destination: 1,
2313            packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
2314            max_discovery_pr_tags: constants::MAX_PR_TAGS,
2315            max_path_destinations: usize::MAX,
2316            max_tunnel_destinations_total: usize::MAX,
2317            destination_timeout_secs: constants::DESTINATION_TIMEOUT,
2318            announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
2319            announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
2320            announce_sig_cache_enabled: true,
2321            announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
2322            announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
2323            announce_queue_max_entries: 256,
2324            announce_queue_max_interfaces: 1024,
2325        }
2326    }
2327
2328    fn make_interface(id: u64, mode: u8) -> InterfaceInfo {
2329        InterfaceInfo {
2330            id: InterfaceId(id),
2331            name: String::from("test"),
2332            mode,
2333            out_capable: true,
2334            in_capable: true,
2335            bitrate: None,
2336            announce_rate_target: None,
2337            announce_rate_grace: 0,
2338            announce_rate_penalty: 0.0,
2339            announce_cap: constants::ANNOUNCE_CAP,
2340            is_local_client: false,
2341            wants_tunnel: false,
2342            tunnel_id: None,
2343            mtu: constants::MTU as u32,
2344            ingress_control: crate::transport::types::IngressControlConfig::disabled(),
2345            ia_freq: 0.0,
2346            started: 0.0,
2347        }
2348    }
2349
2350    fn make_announce_entry(dest_hash: [u8; 16], timestamp: f64, fill_len: usize) -> AnnounceEntry {
2351        AnnounceEntry {
2352            timestamp,
2353            retransmit_timeout: timestamp,
2354            retries: 0,
2355            received_from: [0xAA; 16],
2356            hops: 2,
2357            packet_raw: vec![0x01; fill_len],
2358            packet_data: vec![0x02; fill_len],
2359            destination_hash: dest_hash,
2360            context_flag: 0,
2361            local_rebroadcasts: 0,
2362            block_rebroadcasts: false,
2363            attached_interface: None,
2364        }
2365    }
2366
2367    fn make_path_entry(
2368        timestamp: f64,
2369        hops: u8,
2370        receiving_interface: InterfaceId,
2371        next_hop: [u8; 16],
2372    ) -> PathEntry {
2373        PathEntry {
2374            timestamp,
2375            next_hop,
2376            hops,
2377            expires: timestamp + 10_000.0,
2378            random_blobs: Vec::new(),
2379            receiving_interface,
2380            packet_hash: [0; 32],
2381            announce_raw: None,
2382        }
2383    }
2384
2385    fn make_unique_tag(dest_hash: [u8; 16], tag: &[u8]) -> [u8; 32] {
2386        let mut unique_tag = [0u8; 32];
2387        let tag_len = tag.len().min(16);
2388        unique_tag[..16].copy_from_slice(&dest_hash);
2389        unique_tag[16..16 + tag_len].copy_from_slice(&tag[..tag_len]);
2390        unique_tag
2391    }
2392
2393    fn make_random_blob(timebase: u64) -> [u8; 10] {
2394        let mut blob = [0u8; 10];
2395        let bytes = timebase.to_be_bytes();
2396        blob[5..10].copy_from_slice(&bytes[3..8]);
2397        blob
2398    }
2399
2400    #[test]
2401    fn test_empty_engine() {
2402        let engine = TransportEngine::new(make_config(false));
2403        assert!(!engine.has_path(&[0; 16]));
2404        assert!(engine.hops_to(&[0; 16]).is_none());
2405        assert!(engine.next_hop(&[0; 16]).is_none());
2406    }
2407
2408    #[test]
2409    fn test_register_deregister_interface() {
2410        let mut engine = TransportEngine::new(make_config(false));
2411        engine.register_interface(make_interface(1, constants::MODE_FULL));
2412        assert!(engine.interfaces.contains_key(&InterfaceId(1)));
2413
2414        engine.deregister_interface(InterfaceId(1));
2415        assert!(!engine.interfaces.contains_key(&InterfaceId(1)));
2416    }
2417
2418    #[test]
2419    fn test_deregister_interface_removes_announce_queue_state() {
2420        let mut engine = TransportEngine::new(make_config(false));
2421        engine.register_interface(make_interface(1, constants::MODE_FULL));
2422
2423        let _ = engine.announce_queues.gate_announce(
2424            InterfaceId(1),
2425            vec![0x01; 100],
2426            [0xAA; 16],
2427            2,
2428            0.0,
2429            0.0,
2430            Some(1000),
2431            constants::ANNOUNCE_CAP,
2432        );
2433        let _ = engine.announce_queues.gate_announce(
2434            InterfaceId(1),
2435            vec![0x02; 100],
2436            [0xBB; 16],
2437            3,
2438            0.0,
2439            0.0,
2440            Some(1000),
2441            constants::ANNOUNCE_CAP,
2442        );
2443        assert_eq!(engine.announce_queue_count(), 1);
2444
2445        engine.deregister_interface(InterfaceId(1));
2446        assert_eq!(engine.announce_queue_count(), 0);
2447    }
2448
2449    #[test]
2450    fn test_deregister_interface_preserves_other_announce_queues() {
2451        let mut engine = TransportEngine::new(make_config(false));
2452        engine.register_interface(make_interface(1, constants::MODE_FULL));
2453        engine.register_interface(make_interface(2, constants::MODE_FULL));
2454
2455        let _ = engine.announce_queues.gate_announce(
2456            InterfaceId(1),
2457            vec![0x01; 100],
2458            [0xAA; 16],
2459            2,
2460            0.0,
2461            0.0,
2462            Some(1000),
2463            constants::ANNOUNCE_CAP,
2464        );
2465        let _ = engine.announce_queues.gate_announce(
2466            InterfaceId(1),
2467            vec![0x02; 100],
2468            [0xAB; 16],
2469            3,
2470            0.0,
2471            0.0,
2472            Some(1000),
2473            constants::ANNOUNCE_CAP,
2474        );
2475        let _ = engine.announce_queues.gate_announce(
2476            InterfaceId(2),
2477            vec![0x03; 100],
2478            [0xBA; 16],
2479            2,
2480            0.0,
2481            0.0,
2482            Some(1000),
2483            constants::ANNOUNCE_CAP,
2484        );
2485        let _ = engine.announce_queues.gate_announce(
2486            InterfaceId(2),
2487            vec![0x04; 100],
2488            [0xBB; 16],
2489            3,
2490            0.0,
2491            0.0,
2492            Some(1000),
2493            constants::ANNOUNCE_CAP,
2494        );
2495
2496        engine.deregister_interface(InterfaceId(1));
2497        assert_eq!(engine.announce_queue_count(), 1);
2498        assert_eq!(engine.nonempty_announce_queue_count(), 1);
2499    }
2500
2501    #[test]
2502    fn test_register_deregister_destination() {
2503        let mut engine = TransportEngine::new(make_config(false));
2504        let dest = [0x11; 16];
2505        engine.register_destination(dest, constants::DESTINATION_SINGLE);
2506        assert!(engine.local_destinations.contains_key(&dest));
2507
2508        engine.deregister_destination(&dest);
2509        assert!(!engine.local_destinations.contains_key(&dest));
2510    }
2511
2512    #[test]
2513    fn test_path_state() {
2514        let mut engine = TransportEngine::new(make_config(false));
2515        let dest = [0x22; 16];
2516
2517        assert!(!engine.path_is_unresponsive(&dest));
2518
2519        engine.mark_path_unresponsive(&dest, None);
2520        assert!(engine.path_is_unresponsive(&dest));
2521
2522        engine.mark_path_responsive(&dest);
2523        assert!(!engine.path_is_unresponsive(&dest));
2524    }
2525
2526    #[test]
2527    fn test_announce_clears_stale_path_state_for_unknown_destination() {
2528        use crate::announce::AnnounceData;
2529        use crate::destination::{destination_hash, name_hash};
2530
2531        let mut engine = TransportEngine::new(make_config(false));
2532        engine.register_interface(make_interface(1, constants::MODE_FULL));
2533
2534        let identity =
2535            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x61; 32]));
2536        let dest_hash = destination_hash("pathfix", &["announce"], Some(identity.hash()));
2537        let name_h = name_hash("pathfix", &["announce"]);
2538        let random_hash = [0x24u8; 10];
2539
2540        let (announce_data, _) =
2541            AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2542
2543        let packet = RawPacket::pack(
2544            PacketFlags {
2545                header_type: constants::HEADER_1,
2546                context_flag: constants::FLAG_UNSET,
2547                transport_type: constants::TRANSPORT_BROADCAST,
2548                destination_type: constants::DESTINATION_SINGLE,
2549                packet_type: constants::PACKET_TYPE_ANNOUNCE,
2550            },
2551            0,
2552            &dest_hash,
2553            None,
2554            constants::CONTEXT_NONE,
2555            &announce_data,
2556        )
2557        .unwrap();
2558
2559        engine.mark_path_unresponsive(&dest_hash, None);
2560        assert!(engine.path_is_unresponsive(&dest_hash));
2561        assert!(!engine.has_path(&dest_hash));
2562
2563        let mut rng = rns_crypto::FixedRng::new(&[0x62; 32]);
2564        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
2565
2566        assert!(engine.has_path(&dest_hash));
2567        assert!(
2568            !engine.path_is_unresponsive(&dest_hash),
2569            "stale path state should be cleared for newly installed paths"
2570        );
2571        assert!(actions.iter().any(|action| matches!(
2572            action,
2573            TransportAction::PathUpdated {
2574                destination_hash,
2575                interface,
2576                ..
2577            } if *destination_hash == dest_hash && *interface == InterfaceId(1)
2578        )));
2579    }
2580
2581    #[test]
2582    fn test_boundary_exempts_unresponsive() {
2583        let mut engine = TransportEngine::new(make_config(false));
2584        engine.register_interface(make_interface(1, constants::MODE_BOUNDARY));
2585        let dest = [0xB1; 16];
2586
2587        // Marking via a boundary interface should be skipped
2588        engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
2589        assert!(!engine.path_is_unresponsive(&dest));
2590    }
2591
2592    #[test]
2593    fn test_non_boundary_marks_unresponsive() {
2594        let mut engine = TransportEngine::new(make_config(false));
2595        engine.register_interface(make_interface(1, constants::MODE_FULL));
2596        let dest = [0xB2; 16];
2597
2598        // Marking via a non-boundary interface should work
2599        engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
2600        assert!(engine.path_is_unresponsive(&dest));
2601    }
2602
2603    #[test]
2604    fn test_expire_path() {
2605        let mut engine = TransportEngine::new(make_config(false));
2606        let dest = [0x33; 16];
2607
2608        engine.path_table.insert(
2609            dest,
2610            PathSet::from_single(
2611                PathEntry {
2612                    timestamp: 1000.0,
2613                    next_hop: [0; 16],
2614                    hops: 2,
2615                    expires: 9999.0,
2616                    random_blobs: Vec::new(),
2617                    receiving_interface: InterfaceId(1),
2618                    packet_hash: [0; 32],
2619                    announce_raw: None,
2620                },
2621                1,
2622            ),
2623        );
2624
2625        assert!(engine.has_path(&dest));
2626        engine.expire_path(&dest);
2627        // Path still exists but expires = 0
2628        assert!(engine.has_path(&dest));
2629        assert_eq!(engine.path_table[&dest].primary().unwrap().expires, 0.0);
2630    }
2631
2632    #[test]
2633    fn test_link_table_operations() {
2634        let mut engine = TransportEngine::new(make_config(false));
2635        let link_id = [0x44; 16];
2636
2637        engine.register_link(
2638            link_id,
2639            LinkEntry {
2640                timestamp: 100.0,
2641                next_hop_transport_id: [0; 16],
2642                next_hop_interface: InterfaceId(1),
2643                remaining_hops: 3,
2644                received_interface: InterfaceId(2),
2645                taken_hops: 2,
2646                destination_hash: [0xAA; 16],
2647                validated: false,
2648                proof_timeout: 200.0,
2649            },
2650        );
2651
2652        assert!(engine.link_table.contains_key(&link_id));
2653        assert!(!engine.link_table[&link_id].validated);
2654
2655        engine.validate_link(&link_id);
2656        assert!(engine.link_table[&link_id].validated);
2657
2658        engine.remove_link(&link_id);
2659        assert!(!engine.link_table.contains_key(&link_id));
2660    }
2661
2662    #[test]
2663    fn test_lrproof_routes_from_originating_side_via_link_table() {
2664        let mut engine = TransportEngine::new(make_config(true));
2665        engine.register_interface(make_interface(1, constants::MODE_FULL));
2666        engine.register_interface(make_interface(2, constants::MODE_FULL));
2667
2668        let link_id = [0x44; 16];
2669        engine.register_link(
2670            link_id,
2671            LinkEntry {
2672                timestamp: 100.0,
2673                next_hop_transport_id: [0xAA; 16],
2674                next_hop_interface: InterfaceId(2),
2675                remaining_hops: 3,
2676                received_interface: InterfaceId(1),
2677                taken_hops: 1,
2678                destination_hash: [0xBB; 16],
2679                validated: false,
2680                proof_timeout: 200.0,
2681            },
2682        );
2683
2684        let flags = PacketFlags {
2685            header_type: constants::HEADER_1,
2686            context_flag: constants::FLAG_UNSET,
2687            transport_type: constants::TRANSPORT_BROADCAST,
2688            destination_type: constants::DESTINATION_LINK,
2689            packet_type: constants::PACKET_TYPE_PROOF,
2690        };
2691        let packet = RawPacket::pack(
2692            flags,
2693            0,
2694            &link_id,
2695            None,
2696            constants::CONTEXT_LRPROOF,
2697            &[0xCC; 64],
2698        )
2699        .unwrap();
2700        let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
2701
2702        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 101.0, &mut rng);
2703
2704        assert!(matches!(
2705            engine
2706                .link_table_ref()
2707                .get(&link_id)
2708                .map(|entry| entry.validated),
2709            Some(true)
2710        ));
2711        assert!(actions.iter().any(|action| matches!(
2712            action,
2713            TransportAction::LinkEstablished {
2714                link_id: established,
2715                interface: InterfaceId(2),
2716            } if *established == link_id
2717        )));
2718        assert!(actions.iter().any(|action| matches!(
2719            action,
2720            TransportAction::SendOnInterface {
2721                interface: InterfaceId(2),
2722                ..
2723            }
2724        )));
2725    }
2726
2727    #[test]
2728    fn test_packet_filter_drops_plain_announce() {
2729        let engine = TransportEngine::new(make_config(false));
2730        let flags = PacketFlags {
2731            header_type: constants::HEADER_1,
2732            context_flag: constants::FLAG_UNSET,
2733            transport_type: constants::TRANSPORT_BROADCAST,
2734            destination_type: constants::DESTINATION_PLAIN,
2735            packet_type: constants::PACKET_TYPE_ANNOUNCE,
2736        };
2737        let packet =
2738            RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2739        assert!(!engine.packet_filter(&packet));
2740    }
2741
2742    #[test]
2743    fn test_packet_filter_allows_keepalive() {
2744        let engine = TransportEngine::new(make_config(false));
2745        let flags = PacketFlags {
2746            header_type: constants::HEADER_1,
2747            context_flag: constants::FLAG_UNSET,
2748            transport_type: constants::TRANSPORT_BROADCAST,
2749            destination_type: constants::DESTINATION_SINGLE,
2750            packet_type: constants::PACKET_TYPE_DATA,
2751        };
2752        let packet = RawPacket::pack(
2753            flags,
2754            0,
2755            &[0; 16],
2756            None,
2757            constants::CONTEXT_KEEPALIVE,
2758            b"test",
2759        )
2760        .unwrap();
2761        assert!(engine.packet_filter(&packet));
2762    }
2763
2764    #[test]
2765    fn test_packet_filter_drops_high_hop_plain() {
2766        let engine = TransportEngine::new(make_config(false));
2767        let flags = PacketFlags {
2768            header_type: constants::HEADER_1,
2769            context_flag: constants::FLAG_UNSET,
2770            transport_type: constants::TRANSPORT_BROADCAST,
2771            destination_type: constants::DESTINATION_PLAIN,
2772            packet_type: constants::PACKET_TYPE_DATA,
2773        };
2774        let mut packet =
2775            RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2776        packet.hops = 2;
2777        assert!(!engine.packet_filter(&packet));
2778    }
2779
2780    #[test]
2781    fn test_packet_filter_allows_duplicate_single_announce() {
2782        let mut engine = TransportEngine::new(make_config(false));
2783        let flags = PacketFlags {
2784            header_type: constants::HEADER_1,
2785            context_flag: constants::FLAG_UNSET,
2786            transport_type: constants::TRANSPORT_BROADCAST,
2787            destination_type: constants::DESTINATION_SINGLE,
2788            packet_type: constants::PACKET_TYPE_ANNOUNCE,
2789        };
2790        let packet = RawPacket::pack(
2791            flags,
2792            0,
2793            &[0; 16],
2794            None,
2795            constants::CONTEXT_NONE,
2796            &[0xAA; 64],
2797        )
2798        .unwrap();
2799
2800        // Add to hashlist
2801        engine.packet_hashlist.add(packet.packet_hash);
2802
2803        // Should still pass filter (duplicate announce for SINGLE allowed)
2804        assert!(engine.packet_filter(&packet));
2805    }
2806
2807    #[test]
2808    fn test_packet_filter_fifo_eviction_allows_oldest_hash_again() {
2809        let mut engine = TransportEngine::new(make_config(false));
2810        engine.packet_hashlist = PacketHashlist::new(2);
2811
2812        let make_packet = |seed: u8| {
2813            let flags = PacketFlags {
2814                header_type: constants::HEADER_1,
2815                context_flag: constants::FLAG_UNSET,
2816                transport_type: constants::TRANSPORT_BROADCAST,
2817                destination_type: constants::DESTINATION_SINGLE,
2818                packet_type: constants::PACKET_TYPE_DATA,
2819            };
2820            RawPacket::pack(
2821                flags,
2822                0,
2823                &[seed; 16],
2824                None,
2825                constants::CONTEXT_NONE,
2826                &[seed; 4],
2827            )
2828            .unwrap()
2829        };
2830
2831        let packet1 = make_packet(1);
2832        let packet2 = make_packet(2);
2833        let packet3 = make_packet(3);
2834
2835        engine.packet_hashlist.add(packet1.packet_hash);
2836        engine.packet_hashlist.add(packet2.packet_hash);
2837        assert!(!engine.packet_filter(&packet1));
2838
2839        engine.packet_hashlist.add(packet3.packet_hash);
2840
2841        assert!(engine.packet_filter(&packet1));
2842        assert!(!engine.packet_filter(&packet2));
2843        assert!(!engine.packet_filter(&packet3));
2844    }
2845
2846    #[test]
2847    fn test_packet_filter_duplicate_does_not_refresh_recency() {
2848        let mut engine = TransportEngine::new(make_config(false));
2849        engine.packet_hashlist = PacketHashlist::new(2);
2850
2851        let make_packet = |seed: u8| {
2852            let flags = PacketFlags {
2853                header_type: constants::HEADER_1,
2854                context_flag: constants::FLAG_UNSET,
2855                transport_type: constants::TRANSPORT_BROADCAST,
2856                destination_type: constants::DESTINATION_SINGLE,
2857                packet_type: constants::PACKET_TYPE_DATA,
2858            };
2859            RawPacket::pack(
2860                flags,
2861                0,
2862                &[seed; 16],
2863                None,
2864                constants::CONTEXT_NONE,
2865                &[seed; 4],
2866            )
2867            .unwrap()
2868        };
2869
2870        let packet1 = make_packet(1);
2871        let packet2 = make_packet(2);
2872        let packet3 = make_packet(3);
2873
2874        engine.packet_hashlist.add(packet1.packet_hash);
2875        engine.packet_hashlist.add(packet2.packet_hash);
2876        engine.packet_hashlist.add(packet2.packet_hash);
2877        engine.packet_hashlist.add(packet3.packet_hash);
2878
2879        assert!(engine.packet_filter(&packet1));
2880        assert!(!engine.packet_filter(&packet2));
2881        assert!(!engine.packet_filter(&packet3));
2882    }
2883
2884    #[test]
2885    fn test_tick_retransmits_announce() {
2886        let mut engine = TransportEngine::new(make_config(true));
2887        engine.register_interface(make_interface(1, constants::MODE_FULL));
2888
2889        let dest = [0x55; 16];
2890        engine.insert_announce_entry(
2891            dest,
2892            AnnounceEntry {
2893                timestamp: 190.0,
2894                retransmit_timeout: 100.0, // ready to retransmit
2895                retries: 0,
2896                received_from: [0xAA; 16],
2897                hops: 2,
2898                packet_raw: vec![0x01, 0x02],
2899                packet_data: vec![0xCC; 10],
2900                destination_hash: dest,
2901                context_flag: 0,
2902                local_rebroadcasts: 0,
2903                block_rebroadcasts: false,
2904                attached_interface: None,
2905            },
2906            190.0,
2907        );
2908
2909        let mut rng = rns_crypto::FixedRng::new(&[0x42; 32]);
2910        let actions = engine.tick(200.0, &mut rng);
2911
2912        // Should have a send action for the retransmit (gated through announce queue,
2913        // expanded from BroadcastOnAllInterfaces to per-interface SendOnInterface)
2914        assert!(!actions.is_empty());
2915        assert!(matches!(
2916            &actions[0],
2917            TransportAction::SendOnInterface { .. }
2918        ));
2919
2920        // Retries should have increased
2921        assert_eq!(engine.announce_table[&dest].retries, 1);
2922    }
2923
2924    #[test]
2925    fn test_tick_culls_expired_announce_entries() {
2926        let mut config = make_config(true);
2927        config.announce_table_ttl_secs = 10.0;
2928        let mut engine = TransportEngine::new(config);
2929
2930        let dest1 = [0x61; 16];
2931        let dest2 = [0x62; 16];
2932        assert!(engine.insert_announce_entry(dest1, make_announce_entry(dest1, 100.0, 8), 100.0));
2933        assert!(engine.insert_held_announce(dest2, make_announce_entry(dest2, 100.0, 8), 100.0));
2934
2935        let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2936        let _ = engine.tick(111.0, &mut rng);
2937
2938        assert!(!engine.announce_table().contains_key(&dest1));
2939        assert!(!engine.held_announces().contains_key(&dest2));
2940    }
2941
2942    #[test]
2943    fn test_announce_retention_cap_evicts_oldest_and_prefers_held_on_tie() {
2944        let sample_entry = make_announce_entry([0x70; 16], 100.0, 32);
2945        let mut config = make_config(true);
2946        config.announce_table_max_bytes = TransportEngine::announce_entry_size_bytes(&sample_entry)
2947            * 2
2948            + TransportEngine::announce_entry_size_bytes(&sample_entry) / 2;
2949        let max_bytes = config.announce_table_max_bytes;
2950        let mut engine = TransportEngine::new(config);
2951
2952        let held_dest = [0x71; 16];
2953        let active_dest = [0x72; 16];
2954        let newest_dest = [0x73; 16];
2955
2956        assert!(engine.insert_held_announce(
2957            held_dest,
2958            make_announce_entry(held_dest, 100.0, 32),
2959            100.0,
2960        ));
2961        assert!(engine.insert_announce_entry(
2962            active_dest,
2963            make_announce_entry(active_dest, 100.0, 32),
2964            100.0,
2965        ));
2966        assert!(engine.insert_announce_entry(
2967            newest_dest,
2968            make_announce_entry(newest_dest, 101.0, 32),
2969            101.0,
2970        ));
2971
2972        assert!(!engine.held_announces().contains_key(&held_dest));
2973        assert!(engine.announce_table().contains_key(&active_dest));
2974        assert!(engine.announce_table().contains_key(&newest_dest));
2975        assert!(engine.announce_retained_bytes() <= max_bytes);
2976    }
2977
2978    #[test]
2979    fn test_oversized_announce_entry_is_not_retained() {
2980        let mut config = make_config(true);
2981        config.announce_table_max_bytes = 200;
2982        let mut engine = TransportEngine::new(config);
2983        let dest = [0x81; 16];
2984
2985        assert!(!engine.insert_announce_entry(dest, make_announce_entry(dest, 100.0, 256), 100.0));
2986        assert!(!engine.announce_table().contains_key(&dest));
2987        assert_eq!(engine.announce_retained_bytes(), 0);
2988    }
2989
2990    #[test]
2991    fn test_blackhole_identity() {
2992        let mut engine = TransportEngine::new(make_config(false));
2993        let hash = [0xAA; 16];
2994        let now = 1000.0;
2995
2996        assert!(!engine.is_blackholed(&hash, now));
2997
2998        engine.blackhole_identity(hash, now, None, Some(String::from("test")));
2999        assert!(engine.is_blackholed(&hash, now));
3000        assert!(engine.is_blackholed(&hash, now + 999999.0)); // never expires
3001
3002        assert!(engine.unblackhole_identity(&hash));
3003        assert!(!engine.is_blackholed(&hash, now));
3004        assert!(!engine.unblackhole_identity(&hash)); // already removed
3005    }
3006
3007    #[test]
3008    fn test_blackhole_with_duration() {
3009        let mut engine = TransportEngine::new(make_config(false));
3010        let hash = [0xBB; 16];
3011        let now = 1000.0;
3012
3013        engine.blackhole_identity(hash, now, Some(1.0), None); // 1 hour
3014        assert!(engine.is_blackholed(&hash, now));
3015        assert!(engine.is_blackholed(&hash, now + 3599.0)); // just before expiry
3016        assert!(!engine.is_blackholed(&hash, now + 3601.0)); // after expiry
3017    }
3018
3019    #[test]
3020    fn test_cull_blackholed() {
3021        let mut engine = TransportEngine::new(make_config(false));
3022        let hash1 = [0xCC; 16];
3023        let hash2 = [0xDD; 16];
3024        let now = 1000.0;
3025
3026        engine.blackhole_identity(hash1, now, Some(1.0), None); // 1 hour
3027        engine.blackhole_identity(hash2, now, None, None); // never expires
3028
3029        engine.cull_blackholed(now + 4000.0); // past hash1 expiry
3030
3031        assert!(!engine.blackholed_identities.contains_key(&hash1));
3032        assert!(engine.blackholed_identities.contains_key(&hash2));
3033    }
3034
3035    #[test]
3036    fn test_blackhole_blocks_announce() {
3037        use crate::announce::AnnounceData;
3038        use crate::destination::{destination_hash, name_hash};
3039
3040        let mut engine = TransportEngine::new(make_config(false));
3041        engine.register_interface(make_interface(1, constants::MODE_FULL));
3042
3043        let identity =
3044            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x55; 32]));
3045        let dest_hash = destination_hash("test", &["app"], Some(identity.hash()));
3046        let name_h = name_hash("test", &["app"]);
3047        let random_hash = [0x42u8; 10];
3048
3049        let (announce_data, _) =
3050            AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3051
3052        let flags = PacketFlags {
3053            header_type: constants::HEADER_1,
3054            context_flag: constants::FLAG_UNSET,
3055            transport_type: constants::TRANSPORT_BROADCAST,
3056            destination_type: constants::DESTINATION_SINGLE,
3057            packet_type: constants::PACKET_TYPE_ANNOUNCE,
3058        };
3059        let packet = RawPacket::pack(
3060            flags,
3061            0,
3062            &dest_hash,
3063            None,
3064            constants::CONTEXT_NONE,
3065            &announce_data,
3066        )
3067        .unwrap();
3068
3069        // Blackhole the identity
3070        let now = 1000.0;
3071        engine.blackhole_identity(*identity.hash(), now, None, None);
3072
3073        let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3074        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), now, &mut rng);
3075
3076        // Should produce no AnnounceReceived or PathUpdated actions
3077        assert!(actions
3078            .iter()
3079            .all(|a| !matches!(a, TransportAction::AnnounceReceived { .. })));
3080        assert!(actions
3081            .iter()
3082            .all(|a| !matches!(a, TransportAction::PathUpdated { .. })));
3083    }
3084
3085    #[test]
3086    fn test_async_announce_retransmit_cleanup_happens_before_queueing() {
3087        use crate::announce::AnnounceData;
3088        use crate::destination::{destination_hash, name_hash};
3089        use crate::transport::announce_verify_queue::AnnounceVerifyQueue;
3090
3091        let mut engine = TransportEngine::new(make_config(true));
3092        engine.register_interface(make_interface(1, constants::MODE_FULL));
3093
3094        let identity =
3095            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x31; 32]));
3096        let dest_hash = destination_hash("async", &["announce"], Some(identity.hash()));
3097        let name_h = name_hash("async", &["announce"]);
3098        let random_hash = [0x44u8; 10];
3099        let (announce_data, _) =
3100            AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3101
3102        let packet = RawPacket::pack(
3103            PacketFlags {
3104                header_type: constants::HEADER_2,
3105                context_flag: constants::FLAG_UNSET,
3106                transport_type: constants::TRANSPORT_TRANSPORT,
3107                destination_type: constants::DESTINATION_SINGLE,
3108                packet_type: constants::PACKET_TYPE_ANNOUNCE,
3109            },
3110            3,
3111            &dest_hash,
3112            Some(&[0xBB; 16]),
3113            constants::CONTEXT_NONE,
3114            &announce_data,
3115        )
3116        .unwrap();
3117
3118        engine.announce_table.insert(
3119            dest_hash,
3120            AnnounceEntry {
3121                timestamp: 1000.0,
3122                retransmit_timeout: 2000.0,
3123                retries: constants::PATHFINDER_R,
3124                received_from: [0xBB; 16],
3125                hops: 2,
3126                packet_raw: packet.raw.clone(),
3127                packet_data: packet.data.clone(),
3128                destination_hash: dest_hash,
3129                context_flag: constants::FLAG_UNSET,
3130                local_rebroadcasts: 0,
3131                block_rebroadcasts: false,
3132                attached_interface: None,
3133            },
3134        );
3135
3136        let mut queue = AnnounceVerifyQueue::new(8);
3137        let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3138        let actions = engine.handle_inbound_with_announce_queue(
3139            &packet.raw,
3140            InterfaceId(1),
3141            1000.0,
3142            &mut rng,
3143            Some(&mut queue),
3144        );
3145
3146        assert!(actions.is_empty());
3147        assert_eq!(queue.len(), 1);
3148        assert!(
3149            !engine.announce_table.contains_key(&dest_hash),
3150            "retransmit completion should clear announce_table before queueing"
3151        );
3152    }
3153
3154    #[test]
3155    fn test_async_announce_completion_inserts_sig_cache_and_prevents_requeue() {
3156        use crate::announce::AnnounceData;
3157        use crate::destination::{destination_hash, name_hash};
3158        use crate::transport::announce_verify_queue::AnnounceVerifyQueue;
3159
3160        let mut engine = TransportEngine::new(make_config(false));
3161        engine.register_interface(make_interface(1, constants::MODE_FULL));
3162
3163        let identity =
3164            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x52; 32]));
3165        let dest_hash = destination_hash("async", &["cache"], Some(identity.hash()));
3166        let name_h = name_hash("async", &["cache"]);
3167        let random_hash = [0x55u8; 10];
3168        let (announce_data, _) =
3169            AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3170
3171        let packet = RawPacket::pack(
3172            PacketFlags {
3173                header_type: constants::HEADER_1,
3174                context_flag: constants::FLAG_UNSET,
3175                transport_type: constants::TRANSPORT_BROADCAST,
3176                destination_type: constants::DESTINATION_SINGLE,
3177                packet_type: constants::PACKET_TYPE_ANNOUNCE,
3178            },
3179            0,
3180            &dest_hash,
3181            None,
3182            constants::CONTEXT_NONE,
3183            &announce_data,
3184        )
3185        .unwrap();
3186
3187        let mut queue = AnnounceVerifyQueue::new(8);
3188        let mut rng = rns_crypto::FixedRng::new(&[0x77; 32]);
3189        let actions = engine.handle_inbound_with_announce_queue(
3190            &packet.raw,
3191            InterfaceId(1),
3192            1000.0,
3193            &mut rng,
3194            Some(&mut queue),
3195        );
3196        assert!(actions.is_empty());
3197        assert_eq!(queue.len(), 1);
3198
3199        let mut batch = queue.take_pending(1000.0);
3200        assert_eq!(batch.len(), 1);
3201        let (key, pending) = batch.pop().unwrap();
3202
3203        let announce = AnnounceData::unpack(&pending.packet.data, false).unwrap();
3204        let validated = announce.validate(&pending.packet.destination_hash).unwrap();
3205        let mut material = [0u8; 80];
3206        material[..16].copy_from_slice(&pending.packet.destination_hash);
3207        material[16..].copy_from_slice(&announce.signature);
3208        let sig_cache_key = hash::full_hash(&material);
3209
3210        let pending = queue.complete_success(&key).unwrap();
3211        let actions =
3212            engine.complete_verified_announce(pending, validated, sig_cache_key, 1000.0, &mut rng);
3213        assert!(actions
3214            .iter()
3215            .any(|action| matches!(action, TransportAction::AnnounceReceived { .. })));
3216        assert!(engine.announce_sig_cache_contains(&sig_cache_key));
3217
3218        let actions = engine.handle_inbound_with_announce_queue(
3219            &packet.raw,
3220            InterfaceId(1),
3221            1001.0,
3222            &mut rng,
3223            Some(&mut queue),
3224        );
3225        assert!(actions.is_empty());
3226        assert_eq!(queue.len(), 0);
3227    }
3228
3229    #[test]
3230    fn test_tick_culls_expired_path() {
3231        let mut engine = TransportEngine::new(make_config(false));
3232        engine.register_interface(make_interface(1, constants::MODE_FULL));
3233
3234        let dest = [0x66; 16];
3235        engine.path_table.insert(
3236            dest,
3237            PathSet::from_single(
3238                PathEntry {
3239                    timestamp: 100.0,
3240                    next_hop: [0; 16],
3241                    hops: 2,
3242                    expires: 200.0,
3243                    random_blobs: Vec::new(),
3244                    receiving_interface: InterfaceId(1),
3245                    packet_hash: [0; 32],
3246                    announce_raw: None,
3247                },
3248                1,
3249            ),
3250        );
3251
3252        assert!(engine.has_path(&dest));
3253
3254        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3255        // Advance past cull interval and path expiry
3256        engine.tick(300.0, &mut rng);
3257
3258        assert!(!engine.has_path(&dest));
3259    }
3260
3261    // =========================================================================
3262    // Phase 7b: Local client transport tests
3263    // =========================================================================
3264
3265    fn make_local_client_interface(id: u64) -> InterfaceInfo {
3266        InterfaceInfo {
3267            id: InterfaceId(id),
3268            name: String::from("local_client"),
3269            mode: constants::MODE_FULL,
3270            out_capable: true,
3271            in_capable: true,
3272            bitrate: None,
3273            announce_rate_target: None,
3274            announce_rate_grace: 0,
3275            announce_rate_penalty: 0.0,
3276            announce_cap: constants::ANNOUNCE_CAP,
3277            is_local_client: true,
3278            wants_tunnel: false,
3279            tunnel_id: None,
3280            mtu: constants::MTU as u32,
3281            ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3282            ia_freq: 0.0,
3283            started: 0.0,
3284        }
3285    }
3286
3287    #[test]
3288    fn test_has_local_clients() {
3289        let mut engine = TransportEngine::new(make_config(false));
3290        assert!(!engine.has_local_clients());
3291
3292        engine.register_interface(make_interface(1, constants::MODE_FULL));
3293        assert!(!engine.has_local_clients());
3294
3295        engine.register_interface(make_local_client_interface(2));
3296        assert!(engine.has_local_clients());
3297
3298        engine.deregister_interface(InterfaceId(2));
3299        assert!(!engine.has_local_clients());
3300    }
3301
3302    #[test]
3303    fn test_local_client_hop_decrement() {
3304        // Packets from local clients should have their hops decremented
3305        // to cancel the standard +1 (net zero change)
3306        let mut engine = TransportEngine::new(make_config(false));
3307        engine.register_interface(make_local_client_interface(1));
3308        engine.register_interface(make_interface(2, constants::MODE_FULL));
3309
3310        // Register destination so we get a DeliverLocal action
3311        let dest = [0xAA; 16];
3312        engine.register_destination(dest, constants::DESTINATION_PLAIN);
3313
3314        let flags = PacketFlags {
3315            header_type: constants::HEADER_1,
3316            context_flag: constants::FLAG_UNSET,
3317            transport_type: constants::TRANSPORT_BROADCAST,
3318            destination_type: constants::DESTINATION_PLAIN,
3319            packet_type: constants::PACKET_TYPE_DATA,
3320        };
3321        // Pack with hops=0
3322        let packet =
3323            RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
3324
3325        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3326        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3327
3328        // Should have local delivery; hops should still be 0 (not 1)
3329        // because the local client decrement cancels the increment
3330        let deliver = actions
3331            .iter()
3332            .find(|a| matches!(a, TransportAction::DeliverLocal { .. }));
3333        assert!(deliver.is_some(), "Should deliver locally");
3334    }
3335
3336    #[test]
3337    fn test_plain_broadcast_from_local_client() {
3338        // PLAIN broadcast from local client should forward to external interfaces
3339        let mut engine = TransportEngine::new(make_config(false));
3340        engine.register_interface(make_local_client_interface(1));
3341        engine.register_interface(make_interface(2, constants::MODE_FULL));
3342
3343        let dest = [0xBB; 16];
3344        let flags = PacketFlags {
3345            header_type: constants::HEADER_1,
3346            context_flag: constants::FLAG_UNSET,
3347            transport_type: constants::TRANSPORT_BROADCAST,
3348            destination_type: constants::DESTINATION_PLAIN,
3349            packet_type: constants::PACKET_TYPE_DATA,
3350        };
3351        let packet =
3352            RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3353
3354        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3355        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3356
3357        // Should have ForwardPlainBroadcast to external (to_local=false)
3358        let forward = actions.iter().find(|a| {
3359            matches!(
3360                a,
3361                TransportAction::ForwardPlainBroadcast {
3362                    to_local: false,
3363                    ..
3364                }
3365            )
3366        });
3367        assert!(forward.is_some(), "Should forward to external interfaces");
3368    }
3369
3370    #[test]
3371    fn test_plain_broadcast_from_external() {
3372        // PLAIN broadcast from external should forward to local clients
3373        let mut engine = TransportEngine::new(make_config(false));
3374        engine.register_interface(make_local_client_interface(1));
3375        engine.register_interface(make_interface(2, constants::MODE_FULL));
3376
3377        let dest = [0xCC; 16];
3378        let flags = PacketFlags {
3379            header_type: constants::HEADER_1,
3380            context_flag: constants::FLAG_UNSET,
3381            transport_type: constants::TRANSPORT_BROADCAST,
3382            destination_type: constants::DESTINATION_PLAIN,
3383            packet_type: constants::PACKET_TYPE_DATA,
3384        };
3385        let packet =
3386            RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3387
3388        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3389        let actions = engine.handle_inbound(&packet.raw, InterfaceId(2), 1000.0, &mut rng);
3390
3391        // Should have ForwardPlainBroadcast to local clients (to_local=true)
3392        let forward = actions.iter().find(|a| {
3393            matches!(
3394                a,
3395                TransportAction::ForwardPlainBroadcast { to_local: true, .. }
3396            )
3397        });
3398        assert!(forward.is_some(), "Should forward to local clients");
3399    }
3400
3401    #[test]
3402    fn test_no_plain_broadcast_bridging_without_local_clients() {
3403        // Without local clients, no bridging should happen
3404        let mut engine = TransportEngine::new(make_config(false));
3405        engine.register_interface(make_interface(1, constants::MODE_FULL));
3406        engine.register_interface(make_interface(2, constants::MODE_FULL));
3407
3408        let dest = [0xDD; 16];
3409        let flags = PacketFlags {
3410            header_type: constants::HEADER_1,
3411            context_flag: constants::FLAG_UNSET,
3412            transport_type: constants::TRANSPORT_BROADCAST,
3413            destination_type: constants::DESTINATION_PLAIN,
3414            packet_type: constants::PACKET_TYPE_DATA,
3415        };
3416        let packet =
3417            RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test").unwrap();
3418
3419        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3420        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3421
3422        // No ForwardPlainBroadcast should be emitted
3423        let has_forward = actions
3424            .iter()
3425            .any(|a| matches!(a, TransportAction::ForwardPlainBroadcast { .. }));
3426        assert!(!has_forward, "No bridging without local clients");
3427    }
3428
3429    #[test]
3430    fn test_announce_forwarded_to_local_clients() {
3431        use crate::announce::AnnounceData;
3432        use crate::destination::{destination_hash, name_hash};
3433
3434        let mut engine = TransportEngine::new(make_config(false));
3435        engine.register_interface(make_interface(1, constants::MODE_FULL));
3436        engine.register_interface(make_local_client_interface(2));
3437
3438        let identity =
3439            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x77; 32]));
3440        let dest_hash = destination_hash("test", &["fwd"], Some(identity.hash()));
3441        let name_h = name_hash("test", &["fwd"]);
3442        let random_hash = [0x42u8; 10];
3443
3444        let (announce_data, _) =
3445            AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3446
3447        let flags = PacketFlags {
3448            header_type: constants::HEADER_1,
3449            context_flag: constants::FLAG_UNSET,
3450            transport_type: constants::TRANSPORT_BROADCAST,
3451            destination_type: constants::DESTINATION_SINGLE,
3452            packet_type: constants::PACKET_TYPE_ANNOUNCE,
3453        };
3454        let packet = RawPacket::pack(
3455            flags,
3456            0,
3457            &dest_hash,
3458            None,
3459            constants::CONTEXT_NONE,
3460            &announce_data,
3461        )
3462        .unwrap();
3463
3464        let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3465        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3466
3467        // Should have ForwardToLocalClients since we have local clients
3468        let forward = actions
3469            .iter()
3470            .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3471        assert!(
3472            forward.is_some(),
3473            "Should forward announce to local clients"
3474        );
3475
3476        // The exclude should be the receiving interface
3477        match forward.unwrap() {
3478            TransportAction::ForwardToLocalClients { exclude, .. } => {
3479                assert_eq!(*exclude, Some(InterfaceId(1)));
3480            }
3481            _ => unreachable!(),
3482        }
3483    }
3484
3485    #[test]
3486    fn test_no_announce_forward_without_local_clients() {
3487        use crate::announce::AnnounceData;
3488        use crate::destination::{destination_hash, name_hash};
3489
3490        let mut engine = TransportEngine::new(make_config(false));
3491        engine.register_interface(make_interface(1, constants::MODE_FULL));
3492
3493        let identity =
3494            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x88; 32]));
3495        let dest_hash = destination_hash("test", &["nofwd"], Some(identity.hash()));
3496        let name_h = name_hash("test", &["nofwd"]);
3497        let random_hash = [0x42u8; 10];
3498
3499        let (announce_data, _) =
3500            AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3501
3502        let flags = PacketFlags {
3503            header_type: constants::HEADER_1,
3504            context_flag: constants::FLAG_UNSET,
3505            transport_type: constants::TRANSPORT_BROADCAST,
3506            destination_type: constants::DESTINATION_SINGLE,
3507            packet_type: constants::PACKET_TYPE_ANNOUNCE,
3508        };
3509        let packet = RawPacket::pack(
3510            flags,
3511            0,
3512            &dest_hash,
3513            None,
3514            constants::CONTEXT_NONE,
3515            &announce_data,
3516        )
3517        .unwrap();
3518
3519        let mut rng = rns_crypto::FixedRng::new(&[0x22; 32]);
3520        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3521
3522        // No ForwardToLocalClients should be emitted
3523        let has_forward = actions
3524            .iter()
3525            .any(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3526        assert!(!has_forward, "No forward without local clients");
3527    }
3528
3529    #[test]
3530    fn test_local_client_exclude_from_forward() {
3531        use crate::announce::AnnounceData;
3532        use crate::destination::{destination_hash, name_hash};
3533
3534        let mut engine = TransportEngine::new(make_config(false));
3535        engine.register_interface(make_local_client_interface(1));
3536        engine.register_interface(make_local_client_interface(2));
3537
3538        let identity =
3539            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3540        let dest_hash = destination_hash("test", &["excl"], Some(identity.hash()));
3541        let name_h = name_hash("test", &["excl"]);
3542        let random_hash = [0x42u8; 10];
3543
3544        let (announce_data, _) =
3545            AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3546
3547        let flags = PacketFlags {
3548            header_type: constants::HEADER_1,
3549            context_flag: constants::FLAG_UNSET,
3550            transport_type: constants::TRANSPORT_BROADCAST,
3551            destination_type: constants::DESTINATION_SINGLE,
3552            packet_type: constants::PACKET_TYPE_ANNOUNCE,
3553        };
3554        let packet = RawPacket::pack(
3555            flags,
3556            0,
3557            &dest_hash,
3558            None,
3559            constants::CONTEXT_NONE,
3560            &announce_data,
3561        )
3562        .unwrap();
3563
3564        let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
3565        // Feed announce from local client 1
3566        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3567
3568        // Should forward to local clients, excluding interface 1 (the sender)
3569        let forward = actions
3570            .iter()
3571            .find(|a| matches!(a, TransportAction::ForwardToLocalClients { .. }));
3572        assert!(forward.is_some());
3573        match forward.unwrap() {
3574            TransportAction::ForwardToLocalClients { exclude, .. } => {
3575                assert_eq!(*exclude, Some(InterfaceId(1)));
3576            }
3577            _ => unreachable!(),
3578        }
3579    }
3580
3581    // =========================================================================
3582    // Phase 7d: Tunnel tests
3583    // =========================================================================
3584
3585    fn make_tunnel_interface(id: u64) -> InterfaceInfo {
3586        InterfaceInfo {
3587            id: InterfaceId(id),
3588            name: String::from("tunnel_iface"),
3589            mode: constants::MODE_FULL,
3590            out_capable: true,
3591            in_capable: true,
3592            bitrate: None,
3593            announce_rate_target: None,
3594            announce_rate_grace: 0,
3595            announce_rate_penalty: 0.0,
3596            announce_cap: constants::ANNOUNCE_CAP,
3597            is_local_client: false,
3598            wants_tunnel: true,
3599            tunnel_id: None,
3600            mtu: constants::MTU as u32,
3601            ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3602            ia_freq: 0.0,
3603            started: 0.0,
3604        }
3605    }
3606
3607    #[test]
3608    fn test_handle_tunnel_new() {
3609        let mut engine = TransportEngine::new(make_config(true));
3610        engine.register_interface(make_tunnel_interface(1));
3611
3612        let tunnel_id = [0xAA; 32];
3613        let actions = engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3614
3615        // Should emit TunnelEstablished
3616        assert!(actions
3617            .iter()
3618            .any(|a| matches!(a, TransportAction::TunnelEstablished { .. })));
3619
3620        // Interface should now have tunnel_id set
3621        let info = engine.interface_info(&InterfaceId(1)).unwrap();
3622        assert_eq!(info.tunnel_id, Some(tunnel_id));
3623
3624        // Tunnel table should have the entry
3625        assert_eq!(engine.tunnel_table().len(), 1);
3626    }
3627
3628    #[test]
3629    fn test_announce_stores_tunnel_path() {
3630        use crate::announce::AnnounceData;
3631        use crate::destination::{destination_hash, name_hash};
3632
3633        let mut engine = TransportEngine::new(make_config(false));
3634        let mut iface = make_tunnel_interface(1);
3635        let tunnel_id = [0xBB; 32];
3636        iface.tunnel_id = Some(tunnel_id);
3637        engine.register_interface(iface);
3638
3639        // Create tunnel entry
3640        engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3641
3642        // Create and send an announce
3643        let identity =
3644            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xCC; 32]));
3645        let dest_hash = destination_hash("test", &["tunnel"], Some(identity.hash()));
3646        let name_h = name_hash("test", &["tunnel"]);
3647        let random_hash = [0x42u8; 10];
3648
3649        let (announce_data, _) =
3650            AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
3651
3652        let flags = PacketFlags {
3653            header_type: constants::HEADER_1,
3654            context_flag: constants::FLAG_UNSET,
3655            transport_type: constants::TRANSPORT_BROADCAST,
3656            destination_type: constants::DESTINATION_SINGLE,
3657            packet_type: constants::PACKET_TYPE_ANNOUNCE,
3658        };
3659        let packet = RawPacket::pack(
3660            flags,
3661            0,
3662            &dest_hash,
3663            None,
3664            constants::CONTEXT_NONE,
3665            &announce_data,
3666        )
3667        .unwrap();
3668
3669        let mut rng = rns_crypto::FixedRng::new(&[0xDD; 32]);
3670        engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
3671
3672        // Path should be in path table
3673        assert!(engine.has_path(&dest_hash));
3674
3675        // Path should also be in tunnel table
3676        let tunnel = engine.tunnel_table().get(&tunnel_id).unwrap();
3677        assert_eq!(tunnel.paths.len(), 1);
3678        assert!(tunnel.paths.contains_key(&dest_hash));
3679    }
3680
3681    #[test]
3682    fn test_tunnel_reattach_restores_paths() {
3683        let mut engine = TransportEngine::new(make_config(true));
3684        engine.register_interface(make_tunnel_interface(1));
3685
3686        let tunnel_id = [0xCC; 32];
3687        engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3688
3689        // Manually add a path to the tunnel
3690        let dest = [0xDD; 16];
3691        engine.tunnel_table.store_tunnel_path(
3692            &tunnel_id,
3693            dest,
3694            tunnel::TunnelPath {
3695                timestamp: 1000.0,
3696                received_from: [0xEE; 16],
3697                hops: 3,
3698                expires: 1000.0 + constants::DESTINATION_TIMEOUT,
3699                random_blobs: Vec::new(),
3700                packet_hash: [0xFF; 32],
3701            },
3702            1000.0,
3703            constants::DESTINATION_TIMEOUT,
3704            usize::MAX,
3705        );
3706
3707        // Void the tunnel interface (disconnect)
3708        engine.void_tunnel_interface(&tunnel_id);
3709
3710        // Remove path from path table to simulate it expiring
3711        engine.path_table.remove(&dest);
3712        assert!(!engine.has_path(&dest));
3713
3714        // Reattach tunnel on new interface
3715        engine.register_interface(make_interface(2, constants::MODE_FULL));
3716        let actions = engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);
3717
3718        // Should restore the path
3719        assert!(engine.has_path(&dest));
3720        let path = engine.path_table.get(&dest).unwrap().primary().unwrap();
3721        assert_eq!(path.hops, 3);
3722        assert_eq!(path.receiving_interface, InterfaceId(2));
3723
3724        // Should emit TunnelEstablished
3725        assert!(actions
3726            .iter()
3727            .any(|a| matches!(a, TransportAction::TunnelEstablished { .. })));
3728    }
3729
3730    #[test]
3731    fn test_tunnel_reattach_does_not_overwrite_newer_path() {
3732        let mut engine = TransportEngine::new(make_config(true));
3733        engine.register_interface(make_tunnel_interface(1));
3734
3735        let tunnel_id = [0xCD; 32];
3736        let dest = [0xDE; 16];
3737        let older_blob = make_random_blob(100);
3738        let newer_blob = make_random_blob(200);
3739
3740        engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3741        engine.tunnel_table.store_tunnel_path(
3742            &tunnel_id,
3743            dest,
3744            tunnel::TunnelPath {
3745                timestamp: 1000.0,
3746                received_from: [0xEE; 16],
3747                hops: 2,
3748                expires: 1000.0 + constants::DESTINATION_TIMEOUT,
3749                random_blobs: vec![older_blob],
3750                packet_hash: [0x11; 32],
3751            },
3752            1000.0,
3753            constants::DESTINATION_TIMEOUT,
3754            usize::MAX,
3755        );
3756        engine.void_tunnel_interface(&tunnel_id);
3757
3758        engine.path_table.insert(
3759            dest,
3760            PathSet::from_single(
3761                PathEntry {
3762                    timestamp: 1500.0,
3763                    next_hop: [0xAB; 16],
3764                    hops: 3,
3765                    expires: 1500.0 + constants::DESTINATION_TIMEOUT,
3766                    random_blobs: vec![newer_blob],
3767                    receiving_interface: InterfaceId(3),
3768                    packet_hash: [0x22; 32],
3769                    announce_raw: None,
3770                },
3771                1,
3772            ),
3773        );
3774
3775        engine.register_interface(make_interface(2, constants::MODE_FULL));
3776        engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);
3777
3778        let path = engine.path_table.get(&dest).unwrap().primary().unwrap();
3779        assert_eq!(path.next_hop, [0xAB; 16]);
3780        assert_eq!(path.hops, 3);
3781        assert_eq!(path.receiving_interface, InterfaceId(3));
3782        assert_eq!(path.random_blobs, vec![newer_blob]);
3783    }
3784
3785    #[test]
3786    fn test_void_tunnel_interface() {
3787        let mut engine = TransportEngine::new(make_config(true));
3788        engine.register_interface(make_tunnel_interface(1));
3789
3790        let tunnel_id = [0xDD; 32];
3791        engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3792
3793        // Verify tunnel has interface
3794        assert_eq!(
3795            engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3796            Some(InterfaceId(1))
3797        );
3798
3799        engine.void_tunnel_interface(&tunnel_id);
3800
3801        // Interface voided, but tunnel still exists
3802        assert_eq!(engine.tunnel_table().len(), 1);
3803        assert_eq!(
3804            engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3805            None
3806        );
3807    }
3808
3809    #[test]
3810    fn test_tick_culls_tunnels() {
3811        let mut engine = TransportEngine::new(make_config(true));
3812        engine.register_interface(make_tunnel_interface(1));
3813
3814        let tunnel_id = [0xEE; 32];
3815        engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3816        assert_eq!(engine.tunnel_table().len(), 1);
3817
3818        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3819
3820        // Tick past DESTINATION_TIMEOUT + TABLES_CULL_INTERVAL
3821        engine.tick(
3822            1000.0 + constants::DESTINATION_TIMEOUT + constants::TABLES_CULL_INTERVAL + 1.0,
3823            &mut rng,
3824        );
3825
3826        assert_eq!(engine.tunnel_table().len(), 0);
3827    }
3828
3829    #[test]
3830    fn test_synthesize_tunnel() {
3831        let mut engine = TransportEngine::new(make_config(true));
3832        engine.register_interface(make_tunnel_interface(1));
3833
3834        let identity =
3835            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xFF; 32]));
3836        let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3837
3838        let actions = engine.synthesize_tunnel(&identity, InterfaceId(1), &mut rng);
3839
3840        // Should produce a TunnelSynthesize action
3841        assert_eq!(actions.len(), 1);
3842        match &actions[0] {
3843            TransportAction::TunnelSynthesize {
3844                interface,
3845                data,
3846                dest_hash,
3847            } => {
3848                assert_eq!(*interface, InterfaceId(1));
3849                assert_eq!(data.len(), tunnel::TUNNEL_SYNTH_LENGTH);
3850                // dest_hash should be the tunnel.synthesize plain destination
3851                let expected_dest = crate::destination::destination_hash(
3852                    "rnstransport",
3853                    &["tunnel", "synthesize"],
3854                    None,
3855                );
3856                assert_eq!(*dest_hash, expected_dest);
3857            }
3858            _ => panic!("Expected TunnelSynthesize"),
3859        }
3860    }
3861
3862    // =========================================================================
3863    // DISCOVER_PATHS_FOR tests
3864    // =========================================================================
3865
3866    fn make_path_request_data(dest_hash: &[u8; 16], tag: &[u8]) -> Vec<u8> {
3867        let mut data = Vec::new();
3868        data.extend_from_slice(dest_hash);
3869        data.extend_from_slice(tag);
3870        data
3871    }
3872
3873    #[test]
3874    fn test_path_request_forwarded_on_ap() {
3875        let mut engine = TransportEngine::new(make_config(true));
3876        engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3877        engine.register_interface(make_interface(2, constants::MODE_FULL));
3878
3879        let dest = [0xD1; 16];
3880        let tag = [0x01; 16];
3881        let data = make_path_request_data(&dest, &tag);
3882
3883        let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3884
3885        // Should forward the path request on interface 2 (the other OUT interface)
3886        assert_eq!(actions.len(), 1);
3887        match &actions[0] {
3888            TransportAction::SendOnInterface { interface, .. } => {
3889                assert_eq!(*interface, InterfaceId(2));
3890            }
3891            _ => panic!("Expected SendOnInterface for forwarded path request"),
3892        }
3893        // Should have stored a discovery path request
3894        assert!(engine.discovery_path_requests.contains_key(&dest));
3895    }
3896
3897    #[test]
3898    fn test_path_request_not_forwarded_on_full() {
3899        let mut engine = TransportEngine::new(make_config(true));
3900        engine.register_interface(make_interface(1, constants::MODE_FULL));
3901        engine.register_interface(make_interface(2, constants::MODE_FULL));
3902
3903        let dest = [0xD2; 16];
3904        let tag = [0x02; 16];
3905        let data = make_path_request_data(&dest, &tag);
3906
3907        let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3908
3909        // MODE_FULL is not in DISCOVER_PATHS_FOR, so no forwarding
3910        assert!(actions.is_empty());
3911        assert!(!engine.discovery_path_requests.contains_key(&dest));
3912    }
3913
3914    #[test]
3915    fn test_discovery_pr_tags_fifo_eviction() {
3916        let mut config = make_config(true);
3917        config.max_discovery_pr_tags = 2;
3918        let mut engine = TransportEngine::new(config);
3919
3920        let dest1 = [0xA1; 16];
3921        let dest2 = [0xA2; 16];
3922        let dest3 = [0xA3; 16];
3923        let tag1 = [0x01; 16];
3924        let tag2 = [0x02; 16];
3925        let tag3 = [0x03; 16];
3926
3927        engine.handle_path_request(
3928            &make_path_request_data(&dest1, &tag1),
3929            InterfaceId(1),
3930            1000.0,
3931        );
3932        engine.handle_path_request(
3933            &make_path_request_data(&dest2, &tag2),
3934            InterfaceId(1),
3935            1001.0,
3936        );
3937        assert_eq!(engine.discovery_pr_tags_count(), 2);
3938
3939        let unique1 = make_unique_tag(dest1, &tag1);
3940        let unique2 = make_unique_tag(dest2, &tag2);
3941        assert!(engine.discovery_pr_tags.contains(&unique1));
3942        assert!(engine.discovery_pr_tags.contains(&unique2));
3943
3944        engine.handle_path_request(
3945            &make_path_request_data(&dest3, &tag3),
3946            InterfaceId(1),
3947            1002.0,
3948        );
3949        assert_eq!(engine.discovery_pr_tags_count(), 2);
3950        assert!(!engine.discovery_pr_tags.contains(&unique1));
3951        assert!(engine.discovery_pr_tags.contains(&unique2));
3952
3953        engine.handle_path_request(
3954            &make_path_request_data(&dest1, &tag1),
3955            InterfaceId(1),
3956            1003.0,
3957        );
3958        assert_eq!(engine.discovery_pr_tags_count(), 2);
3959        assert!(engine.discovery_pr_tags.contains(&unique1));
3960    }
3961
3962    #[test]
3963    fn test_path_destination_cap_evicts_oldest_and_clears_state() {
3964        let mut config = make_config(false);
3965        config.max_path_destinations = 2;
3966        let mut engine = TransportEngine::new(config);
3967        engine.register_interface(make_interface(1, constants::MODE_FULL));
3968
3969        let dest1 = [0xB1; 16];
3970        let dest2 = [0xB2; 16];
3971        let dest3 = [0xB3; 16];
3972
3973        engine.upsert_path_destination(
3974            dest1,
3975            make_path_entry(1000.0, 1, InterfaceId(1), [0x11; 16]),
3976            1000.0,
3977        );
3978        engine.upsert_path_destination(
3979            dest2,
3980            make_path_entry(1001.0, 1, InterfaceId(1), [0x22; 16]),
3981            1001.0,
3982        );
3983        engine
3984            .path_states
3985            .insert(dest1, constants::STATE_UNRESPONSIVE);
3986
3987        engine.upsert_path_destination(
3988            dest3,
3989            make_path_entry(1002.0, 1, InterfaceId(1), [0x33; 16]),
3990            1002.0,
3991        );
3992
3993        assert_eq!(engine.path_table_count(), 2);
3994        assert!(!engine.has_path(&dest1));
3995        assert!(engine.has_path(&dest2));
3996        assert!(engine.has_path(&dest3));
3997        assert!(!engine.path_states.contains_key(&dest1));
3998        assert_eq!(engine.path_destination_cap_evict_count(), 1);
3999    }
4000
4001    #[test]
4002    fn test_existing_path_destination_update_does_not_trigger_cap_eviction() {
4003        let mut config = make_config(false);
4004        config.max_path_destinations = 2;
4005        config.max_paths_per_destination = 2;
4006        let mut engine = TransportEngine::new(config);
4007        engine.register_interface(make_interface(1, constants::MODE_FULL));
4008
4009        let dest1 = [0xC1; 16];
4010        let dest2 = [0xC2; 16];
4011
4012        engine.upsert_path_destination(
4013            dest1,
4014            make_path_entry(1000.0, 2, InterfaceId(1), [0x11; 16]),
4015            1000.0,
4016        );
4017        engine.upsert_path_destination(
4018            dest2,
4019            make_path_entry(1001.0, 2, InterfaceId(1), [0x22; 16]),
4020            1001.0,
4021        );
4022
4023        engine.upsert_path_destination(
4024            dest2,
4025            make_path_entry(1002.0, 1, InterfaceId(1), [0x23; 16]),
4026            1002.0,
4027        );
4028
4029        assert_eq!(engine.path_table_count(), 2);
4030        assert!(engine.has_path(&dest1));
4031        assert!(engine.has_path(&dest2));
4032    }
4033
4034    #[test]
4035    fn test_roaming_loop_prevention() {
4036        let mut engine = TransportEngine::new(make_config(true));
4037        engine.register_interface(make_interface(1, constants::MODE_ROAMING));
4038
4039        let dest = [0xD3; 16];
4040        // Path is known and routes through the same interface (1)
4041        engine.path_table.insert(
4042            dest,
4043            PathSet::from_single(
4044                PathEntry {
4045                    timestamp: 900.0,
4046                    next_hop: [0xAA; 16],
4047                    hops: 2,
4048                    expires: 9999.0,
4049                    random_blobs: Vec::new(),
4050                    receiving_interface: InterfaceId(1),
4051                    packet_hash: [0; 32],
4052                    announce_raw: None,
4053                },
4054                1,
4055            ),
4056        );
4057
4058        let tag = [0x03; 16];
4059        let data = make_path_request_data(&dest, &tag);
4060
4061        let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4062
4063        // ROAMING interface, path next-hop on same interface → loop prevention, no action
4064        assert!(actions.is_empty());
4065        assert!(!engine.announce_table.contains_key(&dest));
4066    }
4067
4068    /// Build a minimal HEADER_1 announce raw packet for testing.
4069    fn make_announce_raw(dest_hash: &[u8; 16], payload: &[u8]) -> Vec<u8> {
4070        // HEADER_1: [flags:1][hops:1][dest:16][context:1][data:*]
4071        // flags: HEADER_1(0) << 6 | context_flag(0) << 5 | TRANSPORT_BROADCAST(0) << 4 | SINGLE(0) << 2 | ANNOUNCE(1)
4072        let flags: u8 = 0x01; // HEADER_1, no context, broadcast, single, announce
4073        let mut raw = Vec::new();
4074        raw.push(flags);
4075        raw.push(0x02); // hops
4076        raw.extend_from_slice(dest_hash);
4077        raw.push(constants::CONTEXT_NONE);
4078        raw.extend_from_slice(payload);
4079        raw
4080    }
4081
4082    #[test]
4083    fn test_path_request_populates_announce_entry_from_raw() {
4084        let mut engine = TransportEngine::new(make_config(true));
4085        engine.register_interface(make_interface(1, constants::MODE_FULL));
4086        engine.register_interface(make_interface(2, constants::MODE_FULL));
4087
4088        let dest = [0xD5; 16];
4089        let payload = vec![0xAB; 32]; // simulated announce data (pubkey, sig, etc.)
4090        let announce_raw = make_announce_raw(&dest, &payload);
4091
4092        engine.path_table.insert(
4093            dest,
4094            PathSet::from_single(
4095                PathEntry {
4096                    timestamp: 900.0,
4097                    next_hop: [0xBB; 16],
4098                    hops: 2,
4099                    expires: 9999.0,
4100                    random_blobs: Vec::new(),
4101                    receiving_interface: InterfaceId(2),
4102                    packet_hash: [0; 32],
4103                    announce_raw: Some(announce_raw.clone()),
4104                },
4105                1,
4106            ),
4107        );
4108
4109        let tag = [0x05; 16];
4110        let data = make_path_request_data(&dest, &tag);
4111        let _actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4112
4113        // The announce table should now have an entry with populated packet_raw/packet_data
4114        let entry = engine
4115            .announce_table
4116            .get(&dest)
4117            .expect("announce entry must exist");
4118        assert_eq!(entry.packet_raw, announce_raw);
4119        assert_eq!(entry.packet_data, payload);
4120        assert!(entry.block_rebroadcasts);
4121    }
4122
4123    #[test]
4124    fn test_path_request_skips_when_no_announce_raw() {
4125        let mut engine = TransportEngine::new(make_config(true));
4126        engine.register_interface(make_interface(1, constants::MODE_FULL));
4127        engine.register_interface(make_interface(2, constants::MODE_FULL));
4128
4129        let dest = [0xD6; 16];
4130
4131        engine.path_table.insert(
4132            dest,
4133            PathSet::from_single(
4134                PathEntry {
4135                    timestamp: 900.0,
4136                    next_hop: [0xCC; 16],
4137                    hops: 1,
4138                    expires: 9999.0,
4139                    random_blobs: Vec::new(),
4140                    receiving_interface: InterfaceId(2),
4141                    packet_hash: [0; 32],
4142                    announce_raw: None, // no raw data available
4143                },
4144                1,
4145            ),
4146        );
4147
4148        let tag = [0x06; 16];
4149        let data = make_path_request_data(&dest, &tag);
4150        let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
4151
4152        // Should NOT create an announce entry without raw data
4153        assert!(actions.is_empty());
4154        assert!(!engine.announce_table.contains_key(&dest));
4155    }
4156
4157    #[test]
4158    fn test_discovery_request_consumed_on_announce() {
4159        let mut engine = TransportEngine::new(make_config(true));
4160        engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
4161
4162        let dest = [0xD4; 16];
4163
4164        // Simulate a waiting discovery request
4165        engine.discovery_path_requests.insert(
4166            dest,
4167            DiscoveryPathRequest {
4168                timestamp: 900.0,
4169                requesting_interface: InterfaceId(1),
4170            },
4171        );
4172
4173        // Consume it
4174        let iface = engine.discovery_path_requests_waiting(&dest);
4175        assert_eq!(iface, Some(InterfaceId(1)));
4176
4177        // Should be gone now
4178        assert!(!engine.discovery_path_requests.contains_key(&dest));
4179        assert_eq!(engine.discovery_path_requests_waiting(&dest), None);
4180    }
4181
4182    #[test]
4183    fn test_pending_path_request_announce_bypasses_ingress_control() {
4184        let mut engine = TransportEngine::new(make_config(true));
4185        let mut inbound = make_interface(1, constants::MODE_FULL);
4186        inbound.ingress_control = crate::transport::types::IngressControlConfig::enabled();
4187        inbound.ia_freq = constants::IC_BURST_FREQ + 1.0;
4188        inbound.started = 0.0;
4189        engine.register_interface(inbound);
4190        engine.register_interface(make_interface(2, constants::MODE_ACCESS_POINT));
4191
4192        let identity =
4193            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4194        let dest_hash = crate::destination::destination_hash(
4195            "ingress",
4196            &["path-request"],
4197            Some(identity.hash()),
4198        );
4199        let name_hash = crate::destination::name_hash("ingress", &["path-request"]);
4200        let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4201
4202        engine.discovery_path_requests.insert(
4203            dest_hash,
4204            DiscoveryPathRequest {
4205                timestamp: 999.0,
4206                requesting_interface: InterfaceId(2),
4207            },
4208        );
4209
4210        let mut rng = rns_crypto::FixedRng::new(&[0x88; 32]);
4211        let actions = engine.handle_inbound(&announce_raw, InterfaceId(1), 1000.0, &mut rng);
4212
4213        assert_eq!(engine.held_announce_count(&InterfaceId(1)), 0);
4214        assert!(engine.has_path(&dest_hash));
4215        assert!(!engine.discovery_path_requests.contains_key(&dest_hash));
4216        assert!(actions.iter().any(|a| {
4217            matches!(
4218                a,
4219                TransportAction::AnnounceReceived {
4220                    destination_hash,
4221                    receiving_interface: InterfaceId(1),
4222                    ..
4223                } if *destination_hash == dest_hash
4224            )
4225        }));
4226
4227        let entry = engine
4228            .announce_table
4229            .get(&dest_hash)
4230            .expect("path response announce should be queued");
4231        assert!(entry.block_rebroadcasts);
4232        assert_eq!(entry.attached_interface, Some(InterfaceId(2)));
4233    }
4234
4235    // =========================================================================
4236    // Issue #4: Shared instance client 1-hop transport injection
4237    // =========================================================================
4238
4239    /// Helper: build a valid announce packet for use in issue #4 tests.
4240    fn build_announce_for_issue4(dest_hash: &[u8; 16], name_hash: &[u8; 10]) -> Vec<u8> {
4241        let identity =
4242            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4243        let random_hash = [0x42u8; 10];
4244        let (announce_data, _) = crate::announce::AnnounceData::pack(
4245            &identity,
4246            dest_hash,
4247            name_hash,
4248            &random_hash,
4249            None,
4250            None,
4251        )
4252        .unwrap();
4253        let flags = PacketFlags {
4254            header_type: constants::HEADER_1,
4255            context_flag: constants::FLAG_UNSET,
4256            transport_type: constants::TRANSPORT_BROADCAST,
4257            destination_type: constants::DESTINATION_SINGLE,
4258            packet_type: constants::PACKET_TYPE_ANNOUNCE,
4259        };
4260        RawPacket::pack(
4261            flags,
4262            0,
4263            dest_hash,
4264            None,
4265            constants::CONTEXT_NONE,
4266            &announce_data,
4267        )
4268        .unwrap()
4269        .raw
4270    }
4271
4272    #[test]
4273    fn test_issue4_local_client_single_data_to_1hop_rewrites_on_outbound() {
4274        // Shared clients learn remote paths via their local shared-instance
4275        // interface and must inject transport headers on outbound when the
4276        // destination is exactly 1 hop away behind the daemon.
4277
4278        let mut engine = TransportEngine::new(make_config(false));
4279        engine.register_interface(make_local_client_interface(1));
4280
4281        let identity =
4282            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4283        let dest_hash =
4284            crate::destination::destination_hash("issue4", &["test"], Some(identity.hash()));
4285        let name_hash = crate::destination::name_hash("issue4", &["test"]);
4286        let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4287
4288        // Model the announce as already forwarded by the shared daemon to
4289        // the local client. The raw hop count is 1 so that after the local
4290        // client hop compensation the learned path remains 1 hop away.
4291        let mut announce_packet = RawPacket::unpack(&announce_raw).unwrap();
4292        announce_packet.raw[1] = 1;
4293        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
4294        engine.handle_inbound(&announce_packet.raw, InterfaceId(1), 1000.0, &mut rng);
4295        assert!(engine.has_path(&dest_hash));
4296        assert_eq!(engine.hops_to(&dest_hash), Some(1));
4297
4298        // Build DATA from the shared client to the 1-hop destination.
4299        let data_flags = PacketFlags {
4300            header_type: constants::HEADER_1,
4301            context_flag: constants::FLAG_UNSET,
4302            transport_type: constants::TRANSPORT_BROADCAST,
4303            destination_type: constants::DESTINATION_SINGLE,
4304            packet_type: constants::PACKET_TYPE_DATA,
4305        };
4306        let data_packet = RawPacket::pack(
4307            data_flags,
4308            0,
4309            &dest_hash,
4310            None,
4311            constants::CONTEXT_NONE,
4312            b"hello",
4313        )
4314        .unwrap();
4315
4316        let actions =
4317            engine.handle_outbound(&data_packet, constants::DESTINATION_SINGLE, None, 1001.0);
4318
4319        let send = actions.iter().find_map(|a| match a {
4320            TransportAction::SendOnInterface { interface, raw } => Some((interface, raw)),
4321            _ => None,
4322        });
4323        let (interface, raw) = send.expect("shared client should emit a transport-injected packet");
4324        assert_eq!(*interface, InterfaceId(1));
4325        let flags = PacketFlags::unpack(raw[0]);
4326        assert_eq!(flags.header_type, constants::HEADER_2);
4327        assert_eq!(flags.transport_type, constants::TRANSPORT_TRANSPORT);
4328    }
4329
4330    #[test]
4331    fn test_issue4_external_data_to_1hop_via_transport_works() {
4332        // Control test: when a DATA packet arrives from an external interface
4333        // with HEADER_2 and the daemon's transport_id, the daemon correctly
4334        // forwards it via step 5.  This proves the multi-hop path works;
4335        // it's only the 1-hop shared-client case that's broken.
4336
4337        let daemon_id = [0x42; 16];
4338        let mut engine = TransportEngine::new(TransportConfig {
4339            transport_enabled: true,
4340            identity_hash: Some(daemon_id),
4341            prefer_shorter_path: false,
4342            max_paths_per_destination: 1,
4343            packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
4344            max_discovery_pr_tags: constants::MAX_PR_TAGS,
4345            max_path_destinations: usize::MAX,
4346            max_tunnel_destinations_total: usize::MAX,
4347            destination_timeout_secs: constants::DESTINATION_TIMEOUT,
4348            announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
4349            announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
4350            announce_sig_cache_enabled: true,
4351            announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
4352            announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
4353            announce_queue_max_entries: 256,
4354            announce_queue_max_interfaces: 1024,
4355        });
4356        engine.register_interface(make_interface(1, constants::MODE_FULL)); // inbound
4357        engine.register_interface(make_interface(2, constants::MODE_FULL)); // outbound to Bob
4358
4359        let identity =
4360            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
4361        let dest_hash =
4362            crate::destination::destination_hash("issue4", &["ctrl"], Some(identity.hash()));
4363        let name_hash = crate::destination::name_hash("issue4", &["ctrl"]);
4364        let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
4365
4366        // Feed announce from interface 2 (Bob's side), hops=0 → stored as hops=1
4367        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
4368        engine.handle_inbound(&announce_raw, InterfaceId(2), 1000.0, &mut rng);
4369        assert_eq!(engine.hops_to(&dest_hash), Some(1));
4370
4371        // Now send a HEADER_2 transport packet addressed to the daemon
4372        // (simulating what Alice would send in a multi-hop scenario)
4373        let h2_flags = PacketFlags {
4374            header_type: constants::HEADER_2,
4375            context_flag: constants::FLAG_UNSET,
4376            transport_type: constants::TRANSPORT_TRANSPORT,
4377            destination_type: constants::DESTINATION_SINGLE,
4378            packet_type: constants::PACKET_TYPE_DATA,
4379        };
4380        // Build HEADER_2 manually: [flags, hops, transport_id(16), dest_hash(16), context, data...]
4381        let mut h2_raw = Vec::new();
4382        h2_raw.push(h2_flags.pack());
4383        h2_raw.push(0); // hops
4384        h2_raw.extend_from_slice(&daemon_id); // transport_id = daemon
4385        h2_raw.extend_from_slice(&dest_hash);
4386        h2_raw.push(constants::CONTEXT_NONE);
4387        h2_raw.extend_from_slice(b"hello via transport");
4388
4389        let mut rng2 = rns_crypto::FixedRng::new(&[0x22; 32]);
4390        let actions = engine.handle_inbound(&h2_raw, InterfaceId(1), 1001.0, &mut rng2);
4391
4392        // This SHOULD forward via step 5 (transport forwarding)
4393        let has_send = actions.iter().any(|a| {
4394            matches!(
4395                a,
4396                TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(2)
4397            )
4398        });
4399        assert!(
4400            has_send,
4401            "HEADER_2 transport packet should be forwarded (control test)"
4402        );
4403    }
4404}