Skip to main content

rns_core/transport/
mod.rs

1pub mod announce_proc;
2pub mod announce_queue;
3pub mod announce_verify_queue;
4pub mod dedup;
5pub mod inbound;
6pub mod ingress_control;
7pub mod jobs;
8pub mod outbound;
9pub mod path_requests;
10pub mod pathfinder;
11pub mod queries;
12pub mod rate_limit;
13pub mod retention;
14pub mod tables;
15pub mod tunnel;
16pub mod types;
17
18use alloc::collections::{BTreeMap, BTreeSet, VecDeque};
19use alloc::string::String;
20use alloc::vec::Vec;
21use core::mem::size_of;
22
23use rns_crypto::Rng;
24
25use crate::announce::AnnounceData;
26use crate::constants;
27use crate::hash;
28use crate::packet::RawPacket;
29
30use self::announce_proc::compute_path_expires;
31use self::announce_queue::AnnounceQueues;
32use self::announce_verify_queue::{AnnounceVerifyKey, AnnounceVerifyQueue, PendingAnnounce};
33use self::dedup::{AnnounceSignatureCache, PacketHashlist};
34use self::inbound::{
35    create_link_entry, create_reverse_entry, forward_transport_packet, route_proof_via_reverse,
36    route_via_link_table,
37};
38use self::ingress_control::IngressControl;
39use self::outbound::{route_outbound, should_transmit_announce};
40use self::pathfinder::{
41    decide_announce_multipath, extract_random_blob, timebase_from_random_blob,
42    timebase_from_random_blobs, MultiPathDecision,
43};
44use self::rate_limit::AnnounceRateLimiter;
45use self::tables::{AnnounceEntry, DiscoveryPathRequest, LinkEntry, PathEntry, PathSet};
46use self::tunnel::TunnelTable;
47use self::types::{
48    BlackholeEntry, InterfaceId, InterfaceInfo, PacketBytes, TransportAction, TransportConfig,
49};
50
/// Tuple row type named after the path table.
///
/// NOTE(review): the meaning of each tuple position is not established in this
/// part of the file — presumably (destination hash, timestamp, next hop, hops,
/// expiry, interface name); confirm against the code that builds these rows.
pub type PathTableRow = ([u8; 16], f64, [u8; 16], u8, f64, String);
/// Tuple row type named after the announce rate table.
///
/// NOTE(review): the meaning of each tuple position is not established in this
/// part of the file; confirm against the code that builds these rows.
pub type RateTableRow = ([u8; 16], f64, u32, f64, Vec<f64>);
53
/// Per-packet state shared by the inbound processing stages.
///
/// Built by `TransportEngine::prepare_inbound_packet` once a packet has been
/// unpacked and has passed the packet filter.
struct InboundPacketCtx {
    /// The unpacked packet, with its hop count already adjusted for this hop.
    packet: RawPacket,
    /// Copy of the bytes as received; only populated for announce packets.
    original_raw: Option<Vec<u8>>,
    /// Interface the packet arrived on.
    iface: InterfaceId,
    /// Caller-supplied current time.
    now: f64,
    /// Whether the receiving interface is registered as a local client.
    from_local_client: bool,
}
61
/// Everything known about an announce once it has been accepted as valid.
///
/// Assembled by `TransportEngine::process_inbound_announce` and handed to
/// `process_verified_announce`.
struct VerifiedAnnounceCtx<'a> {
    /// The unpacked announce packet.
    packet: &'a RawPacket,
    /// The announce bytes as originally received.
    original_raw: &'a [u8],
    /// Interface the announce arrived on.
    iface: InterfaceId,
    /// Caller-supplied current time.
    now: f64,
    /// Announce contents after validation (or a signature-cache hit).
    validated: crate::announce::ValidatedAnnounce,
    /// The packet's transport id when present, otherwise its destination hash.
    received_from: [u8; 16],
    /// Random blob extracted from the announce payload.
    random_blob: [u8; 10],
    /// Timebase derived from `random_blob`.
    announce_emitted: u64,
}
72
/// Bundle of a timestamp, an RNG handle and an action accumulator.
///
/// NOTE(review): the users of this type are not visible in this part of the
/// file — presumably the periodic job code; confirm.
struct TickCtx<'a> {
    /// Current time for this pass.
    now: f64,
    /// Random number generator borrowed for the duration of the pass.
    rng: &'a mut dyn Rng,
    /// Actions collected for the caller to execute.
    actions: Vec<TransportAction>,
}
78
/// Bundle of inputs describing one path request.
///
/// NOTE(review): the users of this type are not visible in this part of the
/// file — presumably the path-request handling code; confirm.
struct PathRequestCtx<'a> {
    /// Borrowed request bytes.
    data: &'a [u8],
    /// Interface associated with the request.
    interface_id: InterfaceId,
    /// Caller-supplied current time.
    now: f64,
    /// Destination the request is about.
    destination_hash: [u8; 16],
}
85
/// The core transport/routing engine.
///
/// Maintains routing tables and processes packets without performing any I/O.
/// Returns `Vec<TransportAction>` that the caller must execute.
pub struct TransportEngine {
    /// Engine configuration (cache sizes, transport flag, own identity hash, ...).
    config: TransportConfig,
    /// Known paths per destination hash; queries use each set's primary entry.
    path_table: BTreeMap<[u8; 16], PathSet>,
    /// Announce bookkeeping per destination hash (hops, retries, rebroadcast counts).
    announce_table: BTreeMap<[u8; 16], AnnounceEntry>,
    /// Reverse entries recorded when forwarding non-link-request transport packets.
    reverse_table: BTreeMap<[u8; 16], tables::ReverseEntry>,
    /// Link entries keyed by link id.
    link_table: BTreeMap<[u8; 16], LinkEntry>,
    /// NOTE(review): not used in the visible part of the file — presumably
    /// announces parked for later processing; confirm.
    held_announces: BTreeMap<[u8; 16], AnnounceEntry>,
    /// Packet hashes already seen, used by the packet filter for deduplication.
    packet_hashlist: PacketHashlist,
    /// Cache of announce signatures that already passed validation.
    announce_sig_cache: AnnounceSignatureCache,
    /// NOTE(review): not used in the visible part of the file — presumably
    /// per-destination announce rate limiting; confirm.
    rate_limiter: AnnounceRateLimiter,
    /// Responsiveness state per destination hash (`STATE_RESPONSIVE` / `STATE_UNRESPONSIVE`).
    path_states: BTreeMap<[u8; 16], u8>,
    /// Registered interfaces by id.
    interfaces: BTreeMap<InterfaceId, InterfaceInfo>,
    /// Locally registered destinations: destination hash -> destination type.
    local_destinations: BTreeMap<[u8; 16], u8>,
    /// Blackholed identity hashes and their expiry metadata.
    blackholed_identities: BTreeMap<[u8; 16], BlackholeEntry>,
    /// Outgoing announce queues; an interface's queue is dropped on deregistration.
    announce_queues: AnnounceQueues,
    /// Ingress limiting state; consulted when deciding whether to hold an announce.
    ingress_control: IngressControl,
    /// Tunnel bookkeeping, including paths that can be restored on reattach.
    tunnel_table: TunnelTable,
    /// NOTE(review): the three `discovery_*` fields are not used in the visible
    /// part of the file — presumably path-request tag dedup and pending
    /// discovery requests; confirm.
    discovery_pr_tags: VecDeque<[u8; 32]>,
    discovery_pr_tag_set: BTreeSet<[u8; 32]>,
    discovery_path_requests: BTreeMap<[u8; 16], DiscoveryPathRequest>,
    /// Counter initialised to zero; NOTE(review): presumably counts evictions
    /// caused by a path-destination cap — confirm where it is incremented.
    path_destination_cap_evict_count: usize,
    // Job timing
    /// Both timestamps start at `0.0`; NOTE(review): presumably updated by the
    /// periodic jobs — confirm.
    announces_last_checked: f64,
    tables_last_culled: f64,
}
115
116impl TransportEngine {
117    pub fn new(config: TransportConfig) -> Self {
118        let packet_hashlist_max_entries = config.packet_hashlist_max_entries;
119        let sig_cache_max = if config.announce_sig_cache_enabled {
120            config.announce_sig_cache_max_entries
121        } else {
122            0
123        };
124        let sig_cache_ttl = config.announce_sig_cache_ttl_secs;
125        let announce_queue_max_interfaces = config.announce_queue_max_interfaces;
126        TransportEngine {
127            config,
128            path_table: BTreeMap::new(),
129            announce_table: BTreeMap::new(),
130            reverse_table: BTreeMap::new(),
131            link_table: BTreeMap::new(),
132            held_announces: BTreeMap::new(),
133            packet_hashlist: PacketHashlist::new(packet_hashlist_max_entries),
134            announce_sig_cache: AnnounceSignatureCache::new(sig_cache_max, sig_cache_ttl),
135            rate_limiter: AnnounceRateLimiter::new(),
136            path_states: BTreeMap::new(),
137            interfaces: BTreeMap::new(),
138            local_destinations: BTreeMap::new(),
139            blackholed_identities: BTreeMap::new(),
140            announce_queues: AnnounceQueues::new(announce_queue_max_interfaces),
141            ingress_control: IngressControl::new(),
142            tunnel_table: TunnelTable::new(),
143            discovery_pr_tags: VecDeque::new(),
144            discovery_pr_tag_set: BTreeSet::new(),
145            discovery_path_requests: BTreeMap::new(),
146            path_destination_cap_evict_count: 0,
147            announces_last_checked: 0.0,
148            tables_last_culled: 0.0,
149        }
150    }
151
152    // =========================================================================
153    // Interface management
154    // =========================================================================
155
156    pub fn register_interface(&mut self, info: InterfaceInfo) {
157        self.interfaces.insert(info.id, info);
158    }
159
160    pub fn deregister_interface(&mut self, id: InterfaceId) {
161        self.interfaces.remove(&id);
162        self.announce_queues.remove_interface(id);
163        self.ingress_control.remove_interface(&id);
164    }
165
166    // =========================================================================
167    // Destination management
168    // =========================================================================
169
170    pub fn register_destination(&mut self, dest_hash: [u8; 16], dest_type: u8) {
171        self.local_destinations.insert(dest_hash, dest_type);
172    }
173
174    pub fn deregister_destination(&mut self, dest_hash: &[u8; 16]) {
175        self.local_destinations.remove(dest_hash);
176    }
177
178    // =========================================================================
179    // Path queries
180    // =========================================================================
181
182    pub fn has_path(&self, dest_hash: &[u8; 16]) -> bool {
183        self.path_table
184            .get(dest_hash)
185            .is_some_and(|ps| !ps.is_empty())
186    }
187
188    pub fn hops_to(&self, dest_hash: &[u8; 16]) -> Option<u8> {
189        self.path_table
190            .get(dest_hash)
191            .and_then(|ps| ps.primary())
192            .map(|e| e.hops)
193    }
194
195    pub fn next_hop(&self, dest_hash: &[u8; 16]) -> Option<[u8; 16]> {
196        self.path_table
197            .get(dest_hash)
198            .and_then(|ps| ps.primary())
199            .map(|e| e.next_hop)
200    }
201
202    pub fn next_hop_interface(&self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
203        self.path_table
204            .get(dest_hash)
205            .and_then(|ps| ps.primary())
206            .map(|e| e.receiving_interface)
207    }
208
209    // =========================================================================
210    // Path state
211    // =========================================================================
212
213    /// Mark a path as unresponsive.
214    ///
215    /// If `receiving_interface` is provided and points to a MODE_BOUNDARY interface,
216    /// the marking is skipped — boundary interfaces must not poison path tables.
217    /// (Python Transport.py: mark_path_unknown/unresponsive boundary exemption)
218    pub fn mark_path_unresponsive(
219        &mut self,
220        dest_hash: &[u8; 16],
221        receiving_interface: Option<InterfaceId>,
222    ) {
223        if let Some(iface_id) = receiving_interface {
224            if let Some(info) = self.interfaces.get(&iface_id) {
225                if info.mode == constants::MODE_BOUNDARY {
226                    return;
227                }
228            }
229        }
230
231        // Failover: if we have alternative paths, promote the next one
232        if let Some(ps) = self.path_table.get_mut(dest_hash) {
233            if ps.len() > 1 {
234                ps.failover(false); // demote old primary to back
235                                    // Clear unresponsive state since we promoted a fresh primary
236                self.path_states.remove(dest_hash);
237                return;
238            }
239        }
240
241        self.path_states
242            .insert(*dest_hash, constants::STATE_UNRESPONSIVE);
243    }
244
245    pub fn mark_path_responsive(&mut self, dest_hash: &[u8; 16]) {
246        self.path_states
247            .insert(*dest_hash, constants::STATE_RESPONSIVE);
248    }
249
250    pub fn path_is_unresponsive(&self, dest_hash: &[u8; 16]) -> bool {
251        self.path_states.get(dest_hash) == Some(&constants::STATE_UNRESPONSIVE)
252    }
253
254    pub fn expire_path(&mut self, dest_hash: &[u8; 16]) {
255        if let Some(ps) = self.path_table.get_mut(dest_hash) {
256            ps.expire_all();
257        }
258    }
259
260    // =========================================================================
261    // Link table
262    // =========================================================================
263
264    pub fn register_link(&mut self, link_id: [u8; 16], entry: LinkEntry) {
265        self.link_table.insert(link_id, entry);
266    }
267
268    pub fn validate_link(&mut self, link_id: &[u8; 16]) {
269        if let Some(entry) = self.link_table.get_mut(link_id) {
270            entry.validated = true;
271        }
272    }
273
274    pub fn remove_link(&mut self, link_id: &[u8; 16]) {
275        self.link_table.remove(link_id);
276    }
277
278    // =========================================================================
279    // Blackhole management
280    // =========================================================================
281
282    /// Add an identity hash to the blackhole list.
283    pub fn blackhole_identity(
284        &mut self,
285        identity_hash: [u8; 16],
286        now: f64,
287        duration_hours: Option<f64>,
288        reason: Option<String>,
289    ) {
290        let expires = match duration_hours {
291            Some(h) if h > 0.0 => now + h * 3600.0,
292            _ => 0.0, // never expires
293        };
294        self.blackholed_identities.insert(
295            identity_hash,
296            BlackholeEntry {
297                created: now,
298                expires,
299                reason,
300            },
301        );
302    }
303
304    /// Remove an identity hash from the blackhole list.
305    pub fn unblackhole_identity(&mut self, identity_hash: &[u8; 16]) -> bool {
306        self.blackholed_identities.remove(identity_hash).is_some()
307    }
308
309    /// Check if an identity hash is blackholed (and not expired).
310    pub fn is_blackholed(&self, identity_hash: &[u8; 16], now: f64) -> bool {
311        if let Some(entry) = self.blackholed_identities.get(identity_hash) {
312            if entry.expires == 0.0 || entry.expires > now {
313                return true;
314            }
315        }
316        false
317    }
318
319    /// Get all blackhole entries (for queries).
320    pub fn blackholed_entries(&self) -> impl Iterator<Item = (&[u8; 16], &BlackholeEntry)> {
321        self.blackholed_identities.iter()
322    }
323
324    /// Cull expired blackhole entries.
325    fn cull_blackholed(&mut self, now: f64) {
326        self.blackholed_identities
327            .retain(|_, entry| entry.expires == 0.0 || entry.expires > now);
328    }
329
330    // =========================================================================
331    // Tunnel management
332    // =========================================================================
333
334    /// Handle a validated tunnel synthesis — create new or reattach.
335    ///
336    /// Returns actions for any restored paths.
337    pub fn handle_tunnel(
338        &mut self,
339        tunnel_id: [u8; 32],
340        interface: InterfaceId,
341        now: f64,
342    ) -> Vec<TransportAction> {
343        let mut actions = Vec::new();
344
345        // Set tunnel_id on the interface
346        if let Some(info) = self.interfaces.get_mut(&interface) {
347            info.tunnel_id = Some(tunnel_id);
348        }
349
350        let restored_paths = self.tunnel_table.handle_tunnel(
351            tunnel_id,
352            interface,
353            now,
354            self.config.destination_timeout_secs,
355        );
356
357        // Restore paths to path table if they're better than existing
358        for (dest_hash, tunnel_path) in &restored_paths {
359            let should_restore = match self.path_table.get(dest_hash).and_then(|ps| ps.primary()) {
360                Some(existing) => {
361                    // Restore if fewer/equal hops or existing expired, but never
362                    // overwrite a path learned from a more recent announce.
363                    if tunnel_path.hops <= existing.hops || existing.expires < now {
364                        let existing_timebase = timebase_from_random_blobs(&existing.random_blobs);
365                        let tunnel_timebase = timebase_from_random_blobs(&tunnel_path.random_blobs);
366                        tunnel_timebase >= existing_timebase
367                    } else {
368                        false
369                    }
370                }
371                None => now < tunnel_path.expires,
372            };
373
374            if should_restore {
375                let entry = PathEntry {
376                    timestamp: tunnel_path.timestamp,
377                    next_hop: tunnel_path.received_from,
378                    hops: tunnel_path.hops,
379                    expires: tunnel_path.expires,
380                    random_blobs: tunnel_path.random_blobs.clone(),
381                    receiving_interface: interface,
382                    packet_hash: tunnel_path.packet_hash,
383                    announce_raw: None,
384                };
385                self.upsert_path_destination(*dest_hash, entry, now);
386            }
387        }
388
389        actions.push(TransportAction::TunnelEstablished {
390            tunnel_id,
391            interface,
392        });
393
394        actions
395    }
396
397    /// Synthesize a tunnel on an interface.
398    ///
399    /// `identity`: the transport identity (must have private key for signing)
400    /// `interface_id`: which interface to send the synthesis on
401    /// `rng`: random number generator
402    ///
403    /// Returns TunnelSynthesize action to send the synthesis packet.
404    pub fn synthesize_tunnel(
405        &self,
406        identity: &rns_crypto::identity::Identity,
407        interface_id: InterfaceId,
408        rng: &mut dyn Rng,
409    ) -> Vec<TransportAction> {
410        let mut actions = Vec::new();
411
412        // Compute interface hash from the interface name
413        let interface_hash = if let Some(info) = self.interfaces.get(&interface_id) {
414            hash::full_hash(info.name.as_bytes())
415        } else {
416            return actions;
417        };
418
419        match tunnel::build_tunnel_synthesize_data(identity, &interface_hash, rng) {
420            Ok((data, _tunnel_id)) => {
421                let dest_hash = crate::destination::destination_hash(
422                    "rnstransport",
423                    &["tunnel", "synthesize"],
424                    None,
425                );
426                actions.push(TransportAction::TunnelSynthesize {
427                    interface: interface_id,
428                    data,
429                    dest_hash,
430                });
431            }
432            Err(e) => {
433                // Can't synthesize — no private key or other error
434                let _ = e;
435            }
436        }
437
438        actions
439    }
440
441    /// Void a tunnel's interface connection (tunnel disconnected).
442    pub fn void_tunnel_interface(&mut self, tunnel_id: &[u8; 32]) {
443        self.tunnel_table.void_tunnel_interface(tunnel_id);
444    }
445
446    /// Access the tunnel table for queries.
447    pub fn tunnel_table(&self) -> &TunnelTable {
448        &self.tunnel_table
449    }
450
451    // =========================================================================
452    // Packet filter
453    // =========================================================================
454
455    /// Check if any local client interfaces are registered.
456    fn has_local_clients(&self) -> bool {
457        self.interfaces.values().any(|i| i.is_local_client)
458    }
459
460    /// Packet filter: dedup + basic validity.
461    ///
462    /// Transport.py:1187-1238
463    fn packet_filter(&self, packet: &RawPacket) -> bool {
464        // Filter packets for other transport instances
465        if packet.transport_id.is_some()
466            && packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE
467        {
468            if let Some(ref identity_hash) = self.config.identity_hash {
469                if packet.transport_id.as_ref() != Some(identity_hash) {
470                    return false;
471                }
472            }
473        }
474
475        // Allow certain contexts unconditionally
476        match packet.context {
477            constants::CONTEXT_KEEPALIVE
478            | constants::CONTEXT_RESOURCE_REQ
479            | constants::CONTEXT_RESOURCE_PRF
480            | constants::CONTEXT_RESOURCE
481            | constants::CONTEXT_CACHE_REQUEST
482            | constants::CONTEXT_CHANNEL => return true,
483            _ => {}
484        }
485
486        // PLAIN/GROUP checks
487        if packet.flags.destination_type == constants::DESTINATION_PLAIN
488            || packet.flags.destination_type == constants::DESTINATION_GROUP
489        {
490            if packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
491                return packet.hops <= 1;
492            } else {
493                // PLAIN/GROUP ANNOUNCE is invalid
494                return false;
495            }
496        }
497
498        // Deduplication
499        if !self.packet_hashlist.is_duplicate(&packet.packet_hash) {
500            return true;
501        }
502
503        // Duplicate announce for SINGLE dest is allowed (path update)
504        if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
505            && packet.flags.destination_type == constants::DESTINATION_SINGLE
506        {
507            return true;
508        }
509
510        false
511    }
512
513    // =========================================================================
514    // Core API: handle_inbound
515    // =========================================================================
516
517    /// Process an inbound raw packet from a network interface.
518    ///
519    /// Returns a list of actions for the caller to execute.
520    pub fn handle_inbound(
521        &mut self,
522        raw: &[u8],
523        iface: InterfaceId,
524        now: f64,
525        rng: &mut dyn Rng,
526    ) -> Vec<TransportAction> {
527        self.handle_inbound_with_announce_queue(raw, iface, now, rng, None)
528    }
529
530    pub fn handle_inbound_with_announce_queue(
531        &mut self,
532        raw: &[u8],
533        iface: InterfaceId,
534        now: f64,
535        rng: &mut dyn Rng,
536        announce_queue: Option<&mut AnnounceVerifyQueue>,
537    ) -> Vec<TransportAction> {
538        let Some(ctx) = self.prepare_inbound_packet(raw, iface, now) else {
539            return Vec::new();
540        };
541        let mut actions = Vec::new();
542
543        self.remember_inbound_packet_hash(&ctx.packet);
544        self.bridge_plain_broadcast(&ctx, &mut actions);
545        self.handle_transport_forwarding(&ctx, &mut actions);
546        self.handle_link_table_routing(&ctx, &mut actions);
547        self.handle_inbound_announce(&ctx, rng, announce_queue, &mut actions);
548
549        if ctx.packet.flags.packet_type == constants::PACKET_TYPE_PROOF {
550            self.process_inbound_proof(&ctx.packet, ctx.iface, ctx.now, &mut actions);
551        }
552
553        self.handle_inbound_local_delivery(&ctx, &mut actions);
554        actions
555    }
556
557    fn prepare_inbound_packet(
558        &self,
559        raw: &[u8],
560        iface: InterfaceId,
561        now: f64,
562    ) -> Option<InboundPacketCtx> {
563        let mut packet = RawPacket::unpack(raw).ok()?;
564        let from_local_client = self
565            .interfaces
566            .get(&iface)
567            .map(|i| i.is_local_client)
568            .unwrap_or(false);
569        packet.hops += 1;
570        if from_local_client {
571            packet.hops = packet.hops.saturating_sub(1);
572        }
573        if !self.packet_filter(&packet) {
574            return None;
575        }
576        let retain_original_raw = packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE;
577        Some(InboundPacketCtx {
578            packet,
579            original_raw: if retain_original_raw {
580                Some(raw.to_vec())
581            } else {
582                None
583            },
584            iface,
585            now,
586            from_local_client,
587        })
588    }
589
590    fn remember_inbound_packet_hash(&mut self, packet: &RawPacket) {
591        let remember_hash = !(self.link_table.contains_key(&packet.destination_hash)
592            || (packet.flags.packet_type == constants::PACKET_TYPE_PROOF
593                && packet.context == constants::CONTEXT_LRPROOF));
594        if remember_hash {
595            self.packet_hashlist.add(packet.packet_hash);
596        }
597    }
598
599    fn bridge_plain_broadcast(&self, ctx: &InboundPacketCtx, actions: &mut Vec<TransportAction>) {
600        if ctx.packet.flags.destination_type != constants::DESTINATION_PLAIN
601            || ctx.packet.flags.transport_type != constants::TRANSPORT_BROADCAST
602            || !self.has_local_clients()
603        {
604            return;
605        }
606
607        if ctx.from_local_client {
608            actions.push(TransportAction::ForwardPlainBroadcast {
609                raw: PacketBytes::from(ctx.packet.raw.clone()),
610                to_local: false,
611                exclude: Some(ctx.iface),
612            });
613        } else {
614            actions.push(TransportAction::ForwardPlainBroadcast {
615                raw: PacketBytes::from(ctx.packet.raw.clone()),
616                to_local: true,
617                exclude: None,
618            });
619        }
620    }
621
622    fn handle_transport_forwarding(
623        &mut self,
624        ctx: &InboundPacketCtx,
625        actions: &mut Vec<TransportAction>,
626    ) {
627        if !(self.config.transport_enabled || self.config.identity_hash.is_some()) {
628            return;
629        }
630        if ctx.packet.transport_id.is_none()
631            || ctx.packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
632        {
633            return;
634        }
635
636        let Some(identity_hash) = self.config.identity_hash else {
637            return;
638        };
639        if ctx.packet.transport_id != Some(identity_hash) {
640            return;
641        }
642
643        let Some(path_entry) = self
644            .path_table
645            .get(&ctx.packet.destination_hash)
646            .and_then(|ps| ps.primary())
647        else {
648            return;
649        };
650
651        let next_hop = path_entry.next_hop;
652        let remaining_hops = path_entry.hops;
653        let outbound_interface = path_entry.receiving_interface;
654        let new_raw =
655            forward_transport_packet(&ctx.packet, next_hop, remaining_hops, outbound_interface);
656
657        if ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST {
658            let proof_timeout = ctx.now
659                + constants::LINK_ESTABLISHMENT_TIMEOUT_PER_HOP * (remaining_hops.max(1) as f64);
660            let (link_id, link_entry) = create_link_entry(
661                &ctx.packet,
662                next_hop,
663                outbound_interface,
664                remaining_hops,
665                ctx.iface,
666                ctx.now,
667                proof_timeout,
668            );
669            self.link_table.insert(link_id, link_entry);
670            actions.push(TransportAction::LinkRequestReceived {
671                link_id,
672                destination_hash: ctx.packet.destination_hash,
673                receiving_interface: ctx.iface,
674            });
675        } else {
676            let (trunc_hash, reverse_entry) =
677                create_reverse_entry(&ctx.packet, outbound_interface, ctx.iface, ctx.now);
678            self.reverse_table.insert(trunc_hash, reverse_entry);
679        }
680
681        actions.push(TransportAction::SendOnInterface {
682            interface: outbound_interface,
683            raw: new_raw.into(),
684        });
685
686        if let Some(entry) = self
687            .path_table
688            .get_mut(&ctx.packet.destination_hash)
689            .and_then(|ps| ps.primary_mut())
690        {
691            entry.timestamp = ctx.now;
692        }
693    }
694
695    fn handle_link_table_routing(
696        &mut self,
697        ctx: &InboundPacketCtx,
698        actions: &mut Vec<TransportAction>,
699    ) {
700        if !self.config.transport_enabled && self.config.identity_hash.is_none() {
701            return;
702        }
703        if ctx.packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE
704            || ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
705            || ctx.packet.context == constants::CONTEXT_LRPROOF
706        {
707            return;
708        }
709
710        let Some(link_entry) = self.link_table.get(&ctx.packet.destination_hash).cloned() else {
711            return;
712        };
713        let Some((outbound_iface, new_raw)) =
714            route_via_link_table(&ctx.packet, &link_entry, ctx.iface)
715        else {
716            return;
717        };
718
719        self.packet_hashlist.add(ctx.packet.packet_hash);
720        actions.push(TransportAction::SendOnInterface {
721            interface: outbound_iface,
722            raw: new_raw.into(),
723        });
724
725        if let Some(entry) = self.link_table.get_mut(&ctx.packet.destination_hash) {
726            entry.timestamp = ctx.now;
727        }
728    }
729
730    fn handle_inbound_announce(
731        &mut self,
732        ctx: &InboundPacketCtx,
733        rng: &mut dyn Rng,
734        announce_queue: Option<&mut AnnounceVerifyQueue>,
735        actions: &mut Vec<TransportAction>,
736    ) {
737        if ctx.packet.flags.packet_type != constants::PACKET_TYPE_ANNOUNCE {
738            return;
739        }
740
741        if let Some(queue) = announce_queue {
742            self.try_enqueue_announce(ctx, rng, queue, actions);
743        } else {
744            let original_raw = ctx
745                .original_raw
746                .as_deref()
747                .expect("announce packets retain original raw bytes");
748            self.process_inbound_announce(
749                &ctx.packet,
750                original_raw,
751                ctx.iface,
752                ctx.now,
753                rng,
754                actions,
755            );
756        }
757    }
758
759    fn handle_inbound_local_delivery(
760        &self,
761        ctx: &InboundPacketCtx,
762        actions: &mut Vec<TransportAction>,
763    ) {
764        if (ctx.packet.flags.packet_type == constants::PACKET_TYPE_LINKREQUEST
765            || ctx.packet.flags.packet_type == constants::PACKET_TYPE_DATA)
766            && self
767                .local_destinations
768                .contains_key(&ctx.packet.destination_hash)
769        {
770            actions.push(TransportAction::DeliverLocal {
771                destination_hash: ctx.packet.destination_hash,
772                raw: PacketBytes::from(ctx.packet.raw.clone()),
773                packet_hash: ctx.packet.packet_hash,
774                receiving_interface: ctx.iface,
775            });
776        }
777    }
778
779    // =========================================================================
780    // Inbound announce processing
781    // =========================================================================
782
783    fn process_inbound_announce(
784        &mut self,
785        packet: &RawPacket,
786        original_raw: &[u8],
787        iface: InterfaceId,
788        now: f64,
789        rng: &mut dyn Rng,
790        actions: &mut Vec<TransportAction>,
791    ) {
792        if packet.flags.destination_type != constants::DESTINATION_SINGLE {
793            return;
794        }
795
796        let has_ratchet = packet.flags.context_flag == constants::FLAG_SET;
797
798        // Unpack and validate announce
799        let announce = match AnnounceData::unpack(&packet.data, has_ratchet) {
800            Ok(a) => a,
801            Err(_) => return,
802        };
803
804        let sig_cache_key =
805            Self::announce_sig_cache_key(packet.destination_hash, &announce.signature);
806
807        let validated = if self.announce_sig_cache.contains(&sig_cache_key) {
808            announce.to_validated_unchecked()
809        } else {
810            match announce.validate(&packet.destination_hash) {
811                Ok(v) => {
812                    self.announce_sig_cache.insert(sig_cache_key, now);
813                    v
814                }
815                Err(_) => return,
816            }
817        };
818
819        let received_from = self.announce_received_from(packet, now);
820        let random_blob = match extract_random_blob(&packet.data) {
821            Some(b) => b,
822            None => return,
823        };
824        let announce_emitted = timebase_from_random_blob(&random_blob);
825
826        self.process_verified_announce(
827            VerifiedAnnounceCtx {
828                packet,
829                original_raw,
830                iface,
831                now,
832                validated,
833                received_from,
834                random_blob,
835                announce_emitted,
836            },
837            rng,
838            actions,
839        );
840    }
841
842    fn announce_sig_cache_key(destination_hash: [u8; 16], signature: &[u8; 64]) -> [u8; 32] {
843        let mut material = [0u8; 80];
844        material[..16].copy_from_slice(&destination_hash);
845        material[16..].copy_from_slice(signature);
846        hash::full_hash(&material)
847    }
848
849    fn announce_received_from(&mut self, packet: &RawPacket, now: f64) -> [u8; 16] {
850        if let Some(transport_id) = packet.transport_id {
851            if self.config.transport_enabled {
852                if let Some(announce_entry) = self.announce_table.get_mut(&packet.destination_hash)
853                {
854                    if packet.hops.checked_sub(1) == Some(announce_entry.hops) {
855                        announce_entry.local_rebroadcasts += 1;
856                        if announce_entry.retries > 0
857                            && announce_entry.local_rebroadcasts
858                                >= constants::LOCAL_REBROADCASTS_MAX
859                        {
860                            self.announce_table.remove(&packet.destination_hash);
861                        }
862                    }
863                    if let Some(announce_entry) = self.announce_table.get(&packet.destination_hash)
864                    {
865                        if packet.hops.checked_sub(1) == Some(announce_entry.hops + 1)
866                            && announce_entry.retries > 0
867                            && now < announce_entry.retransmit_timeout
868                        {
869                            self.announce_table.remove(&packet.destination_hash);
870                        }
871                    }
872                }
873            }
874            transport_id
875        } else {
876            packet.destination_hash
877        }
878    }
879
880    fn should_hold_announce(
881        &mut self,
882        packet: &RawPacket,
883        original_raw: &[u8],
884        iface: InterfaceId,
885        now: f64,
886    ) -> bool {
887        if self.has_path(&packet.destination_hash) {
888            return false;
889        }
890        if self
891            .discovery_path_requests
892            .contains_key(&packet.destination_hash)
893        {
894            return false;
895        }
896        let Some(info) = self.interfaces.get(&iface) else {
897            return false;
898        };
899        if packet.context == constants::CONTEXT_PATH_RESPONSE
900            || !self.ingress_control.should_ingress_limit(
901                iface,
902                &info.ingress_control,
903                info.ia_freq,
904                info.started,
905                now,
906            )
907        {
908            return false;
909        }
910        self.ingress_control.hold_announce(
911            iface,
912            &info.ingress_control,
913            packet.destination_hash,
914            ingress_control::HeldAnnounce {
915                raw: original_raw.to_vec(),
916                hops: packet.hops,
917                receiving_interface: iface,
918                timestamp: now,
919            },
920        );
921        true
922    }
923
924    fn try_enqueue_announce(
925        &mut self,
926        ctx: &InboundPacketCtx,
927        rng: &mut dyn Rng,
928        announce_queue: &mut AnnounceVerifyQueue,
929        actions: &mut Vec<TransportAction>,
930    ) {
931        if ctx.packet.flags.destination_type != constants::DESTINATION_SINGLE {
932            return;
933        }
934
935        let has_ratchet = ctx.packet.flags.context_flag == constants::FLAG_SET;
936        let announce = match AnnounceData::unpack(&ctx.packet.data, has_ratchet) {
937            Ok(a) => a,
938            Err(_) => return,
939        };
940
941        let received_from = self.announce_received_from(&ctx.packet, ctx.now);
942
943        if self
944            .local_destinations
945            .contains_key(&ctx.packet.destination_hash)
946        {
947            log::debug!(
948                "Announce:skipping local destination {:02x}{:02x}{:02x}{:02x}..",
949                ctx.packet.destination_hash[0],
950                ctx.packet.destination_hash[1],
951                ctx.packet.destination_hash[2],
952                ctx.packet.destination_hash[3],
953            );
954            return;
955        }
956
957        let original_raw = ctx
958            .original_raw
959            .as_deref()
960            .expect("announce packets retain original raw bytes");
961        if self.should_hold_announce(&ctx.packet, original_raw, ctx.iface, ctx.now) {
962            return;
963        }
964
965        let sig_cache_key =
966            Self::announce_sig_cache_key(ctx.packet.destination_hash, &announce.signature);
967        if self.announce_sig_cache.contains(&sig_cache_key) {
968            let validated = announce.to_validated_unchecked();
969            let random_blob = match extract_random_blob(&ctx.packet.data) {
970                Some(b) => b,
971                None => return,
972            };
973            let announce_emitted = timebase_from_random_blob(&random_blob);
974            self.process_verified_announce(
975                VerifiedAnnounceCtx {
976                    packet: &ctx.packet,
977                    original_raw,
978                    iface: ctx.iface,
979                    now: ctx.now,
980                    validated,
981                    received_from,
982                    random_blob,
983                    announce_emitted,
984                },
985                rng,
986                actions,
987            );
988            return;
989        }
990
991        if ctx.packet.context == constants::CONTEXT_PATH_RESPONSE {
992            let Ok(validated) = announce.validate(&ctx.packet.destination_hash) else {
993                return;
994            };
995            self.announce_sig_cache.insert(sig_cache_key, ctx.now);
996            let random_blob = match extract_random_blob(&ctx.packet.data) {
997                Some(b) => b,
998                None => return,
999            };
1000            let announce_emitted = timebase_from_random_blob(&random_blob);
1001            self.process_verified_announce(
1002                VerifiedAnnounceCtx {
1003                    packet: &ctx.packet,
1004                    original_raw,
1005                    iface: ctx.iface,
1006                    now: ctx.now,
1007                    validated,
1008                    received_from,
1009                    random_blob,
1010                    announce_emitted,
1011                },
1012                rng,
1013                actions,
1014            );
1015            return;
1016        }
1017
1018        let random_blob = match extract_random_blob(&ctx.packet.data) {
1019            Some(b) => b,
1020            None => return,
1021        };
1022        let announce_emitted = timebase_from_random_blob(&random_blob);
1023        let key = AnnounceVerifyKey {
1024            destination_hash: ctx.packet.destination_hash,
1025            random_blob,
1026            received_from,
1027        };
1028        let pending = PendingAnnounce {
1029            original_raw: original_raw.to_vec(),
1030            packet: ctx.packet.clone(),
1031            interface: ctx.iface,
1032            received_from,
1033            queued_at: ctx.now,
1034            best_hops: ctx.packet.hops,
1035            emission_ts: announce_emitted,
1036            random_blob,
1037        };
1038        let _ = announce_queue.enqueue(key, pending);
1039    }
1040
1041    pub fn complete_verified_announce(
1042        &mut self,
1043        pending: PendingAnnounce,
1044        validated: crate::announce::ValidatedAnnounce,
1045        sig_cache_key: [u8; 32],
1046        now: f64,
1047        rng: &mut dyn Rng,
1048    ) -> Vec<TransportAction> {
1049        self.announce_sig_cache.insert(sig_cache_key, now);
1050        let mut actions = Vec::new();
1051        self.process_verified_announce(
1052            VerifiedAnnounceCtx {
1053                packet: &pending.packet,
1054                original_raw: &pending.original_raw,
1055                iface: pending.interface,
1056                now,
1057                validated,
1058                received_from: pending.received_from,
1059                random_blob: pending.random_blob,
1060                announce_emitted: pending.emission_ts,
1061            },
1062            rng,
1063            &mut actions,
1064        );
1065        actions
1066    }
1067
    /// Hook invoked when deferred verification of a queued announce failed.
    ///
    /// Currently a no-op: both arguments are ignored and no state is touched.
    pub fn clear_failed_verified_announce(&mut self, _sig_cache_key: [u8; 32], _now: f64) {}
1069
1070    fn process_verified_announce(
1071        &mut self,
1072        ctx: VerifiedAnnounceCtx<'_>,
1073        rng: &mut dyn Rng,
1074        actions: &mut Vec<TransportAction>,
1075    ) {
1076        if self.is_blackholed(&ctx.validated.identity_hash, ctx.now) {
1077            return;
1078        }
1079        if ctx.packet.hops > constants::PATHFINDER_M {
1080            return;
1081        }
1082
1083        let existing_set = self.path_table.get(&ctx.packet.destination_hash);
1084        let was_unknown_destination = existing_set.is_none_or(|ps| ps.is_empty());
1085
1086        // Reset stale path state before first-path installation so path-state handling
1087        // cannot race ahead of the path table for previously unknown destinations.
1088        if was_unknown_destination {
1089            self.path_states.remove(&ctx.packet.destination_hash);
1090        }
1091
1092        // Multi-path aware decision
1093        let is_unresponsive = self.path_is_unresponsive(&ctx.packet.destination_hash);
1094
1095        let mp_decision = decide_announce_multipath(
1096            existing_set,
1097            ctx.packet.hops,
1098            ctx.announce_emitted,
1099            &ctx.random_blob,
1100            &ctx.received_from,
1101            is_unresponsive,
1102            ctx.now,
1103            self.config.prefer_shorter_path,
1104        );
1105
1106        if mp_decision == MultiPathDecision::Reject {
1107            log::debug!(
1108                "Announce:path decision REJECT for dest={:02x}{:02x}{:02x}{:02x}..",
1109                ctx.packet.destination_hash[0],
1110                ctx.packet.destination_hash[1],
1111                ctx.packet.destination_hash[2],
1112                ctx.packet.destination_hash[3],
1113            );
1114            return;
1115        }
1116
1117        // Rate limiting
1118        let rate_blocked = if ctx.packet.context != constants::CONTEXT_PATH_RESPONSE {
1119            if let Some(iface_info) = self.interfaces.get(&ctx.iface) {
1120                self.rate_limiter.check_and_update(
1121                    &ctx.packet.destination_hash,
1122                    ctx.now,
1123                    iface_info.announce_rate_target,
1124                    iface_info.announce_rate_grace,
1125                    iface_info.announce_rate_penalty,
1126                )
1127            } else {
1128                false
1129            }
1130        } else {
1131            false
1132        };
1133
1134        // Get interface mode for expiry calculation
1135        let interface_mode = self
1136            .interfaces
1137            .get(&ctx.iface)
1138            .map(|i| i.mode)
1139            .unwrap_or(constants::MODE_FULL);
1140
1141        let expires = compute_path_expires(ctx.now, interface_mode);
1142
1143        // Get existing random blobs from the matching path (same next_hop) or empty
1144        let existing_blobs = self
1145            .path_table
1146            .get(&ctx.packet.destination_hash)
1147            .and_then(|ps| ps.find_by_next_hop(&ctx.received_from))
1148            .map(|e| e.random_blobs.clone())
1149            .unwrap_or_default();
1150
1151        // Generate RNG value for retransmit timeout
1152        let mut rng_bytes = [0u8; 8];
1153        rng.fill_bytes(&mut rng_bytes);
1154        let rng_value = (u64::from_le_bytes(rng_bytes) as f64) / (u64::MAX as f64);
1155
1156        let is_path_response = ctx.packet.context == constants::CONTEXT_PATH_RESPONSE;
1157
1158        let (path_entry, announce_entry) = announce_proc::process_validated_announce(
1159            ctx.packet.destination_hash,
1160            ctx.packet.hops,
1161            &ctx.packet.data,
1162            &ctx.packet.raw,
1163            ctx.packet.packet_hash,
1164            ctx.packet.flags.context_flag,
1165            ctx.received_from,
1166            ctx.iface,
1167            ctx.now,
1168            existing_blobs,
1169            ctx.random_blob,
1170            expires,
1171            rng_value,
1172            self.config.transport_enabled,
1173            is_path_response,
1174            rate_blocked,
1175            Some(ctx.original_raw.to_vec()),
1176        );
1177
1178        // Emit CacheAnnounce for disk caching (pre-hop-increment raw)
1179        actions.push(TransportAction::CacheAnnounce {
1180            packet_hash: ctx.packet.packet_hash,
1181            raw: ctx.original_raw.to_vec().into(),
1182        });
1183
1184        // Store path via upsert into PathSet
1185        self.upsert_path_destination(ctx.packet.destination_hash, path_entry, ctx.now);
1186
1187        // If receiving interface has a tunnel_id, store path in tunnel table too
1188        if let Some(tunnel_id) = self.interfaces.get(&ctx.iface).and_then(|i| i.tunnel_id) {
1189            let blobs = self
1190                .path_table
1191                .get(&ctx.packet.destination_hash)
1192                .and_then(|ps| ps.find_by_next_hop(&ctx.received_from))
1193                .map(|e| e.random_blobs.clone())
1194                .unwrap_or_default();
1195            self.tunnel_table.store_tunnel_path(
1196                &tunnel_id,
1197                ctx.packet.destination_hash,
1198                tunnel::TunnelPath {
1199                    timestamp: ctx.now,
1200                    received_from: ctx.received_from,
1201                    hops: ctx.packet.hops,
1202                    expires,
1203                    random_blobs: blobs,
1204                    packet_hash: ctx.packet.packet_hash,
1205                },
1206                ctx.now,
1207                self.config.destination_timeout_secs,
1208                self.config.max_tunnel_destinations_total,
1209            );
1210        }
1211
1212        // Re-apply the path-state reset after storing the path entry so any transient
1213        // stale state is also cleared once the destination exists in the path table.
1214        self.path_states.remove(&ctx.packet.destination_hash);
1215
1216        // Store announce for retransmission
1217        if let Some(ann) = announce_entry {
1218            self.insert_announce_entry(ctx.packet.destination_hash, ann, ctx.now);
1219        }
1220
1221        // Emit actions
1222        actions.push(TransportAction::AnnounceReceived {
1223            destination_hash: ctx.packet.destination_hash,
1224            identity_hash: ctx.validated.identity_hash,
1225            public_key: ctx.validated.public_key,
1226            name_hash: ctx.validated.name_hash,
1227            random_hash: ctx.validated.random_hash,
1228            ratchet: ctx.validated.ratchet,
1229            app_data: ctx.validated.app_data,
1230            hops: ctx.packet.hops,
1231            receiving_interface: ctx.iface,
1232        });
1233
1234        actions.push(TransportAction::PathUpdated {
1235            destination_hash: ctx.packet.destination_hash,
1236            hops: ctx.packet.hops,
1237            next_hop: ctx.received_from,
1238            interface: ctx.iface,
1239        });
1240
1241        // Forward announce to local clients if any are connected
1242        if self.has_local_clients() {
1243            actions.push(TransportAction::ForwardToLocalClients {
1244                raw: PacketBytes::from(ctx.packet.raw.clone()),
1245                exclude: Some(ctx.iface),
1246            });
1247        }
1248
1249        // Check for discovery path requests waiting for this announce
1250        if let Some(pr_entry) = self.discovery_path_requests_waiting(&ctx.packet.destination_hash) {
1251            // Build a path response announce and queue it
1252            let entry = AnnounceEntry {
1253                timestamp: ctx.now,
1254                retransmit_timeout: ctx.now,
1255                retries: constants::PATHFINDER_R,
1256                received_from: ctx.received_from,
1257                hops: ctx.packet.hops,
1258                packet_raw: ctx.packet.raw.clone(),
1259                packet_data: ctx.packet.data.clone(),
1260                destination_hash: ctx.packet.destination_hash,
1261                context_flag: ctx.packet.flags.context_flag,
1262                local_rebroadcasts: 0,
1263                block_rebroadcasts: true,
1264                attached_interface: Some(pr_entry),
1265            };
1266            self.insert_announce_entry(ctx.packet.destination_hash, entry, ctx.now);
1267        }
1268    }
1269
    /// Report whether `sig_cache_key` is present in the announce signature cache.
    ///
    /// Read-only: delegates to the cache's `contains` and does not modify it.
    pub fn announce_sig_cache_contains(&self, sig_cache_key: &[u8; 32]) -> bool {
        self.announce_sig_cache.contains(sig_cache_key)
    }
1273
1274    /// Check if there's a waiting discovery path request for a destination.
1275    /// Consumes the request if found (one-shot: the caller queues the announce response).
1276    fn discovery_path_requests_waiting(&mut self, dest_hash: &[u8; 16]) -> Option<InterfaceId> {
1277        self.discovery_path_requests
1278            .remove(dest_hash)
1279            .map(|req| req.requesting_interface)
1280    }
1281
1282    // =========================================================================
1283    // Inbound proof processing
1284    // =========================================================================
1285
1286    fn process_inbound_proof(
1287        &mut self,
1288        packet: &RawPacket,
1289        iface: InterfaceId,
1290        _now: f64,
1291        actions: &mut Vec<TransportAction>,
1292    ) {
1293        if packet.context == constants::CONTEXT_LRPROOF {
1294            // Link request proof routing
1295            if (self.config.transport_enabled)
1296                && self.link_table.contains_key(&packet.destination_hash)
1297            {
1298                let link_entry = self.link_table.get(&packet.destination_hash).cloned();
1299                if let Some(entry) = link_entry {
1300                    if let Some((outbound_interface, new_raw)) =
1301                        route_via_link_table(packet, &entry, iface)
1302                    {
1303                        // Forward the proof (simplified: skip signature validation
1304                        // which requires Identity recall)
1305
1306                        // Mark link as validated
1307                        if let Some(le) = self.link_table.get_mut(&packet.destination_hash) {
1308                            le.validated = true;
1309                        }
1310
1311                        actions.push(TransportAction::LinkEstablished {
1312                            link_id: packet.destination_hash,
1313                            interface: outbound_interface,
1314                        });
1315
1316                        actions.push(TransportAction::SendOnInterface {
1317                            interface: outbound_interface,
1318                            raw: new_raw.into(),
1319                        });
1320                    }
1321                }
1322            } else {
1323                // Could be for a local pending link - deliver locally
1324                actions.push(TransportAction::DeliverLocal {
1325                    destination_hash: packet.destination_hash,
1326                    raw: PacketBytes::from(packet.raw.clone()),
1327                    packet_hash: packet.packet_hash,
1328                    receiving_interface: iface,
1329                });
1330            }
1331        } else {
1332            // Regular proof: check reverse table
1333            if self.config.transport_enabled {
1334                if let Some(reverse_entry) = self.reverse_table.remove(&packet.destination_hash) {
1335                    if let Some(action) = route_proof_via_reverse(packet, &reverse_entry, iface) {
1336                        actions.push(action);
1337                    }
1338                }
1339            }
1340
1341            // Deliver to local receipts
1342            actions.push(TransportAction::DeliverLocal {
1343                destination_hash: packet.destination_hash,
1344                raw: PacketBytes::from(packet.raw.clone()),
1345                packet_hash: packet.packet_hash,
1346                receiving_interface: iface,
1347            });
1348        }
1349    }
1350
1351    // =========================================================================
1352    // Core API: handle_outbound
1353    // =========================================================================
1354
1355    /// Route an outbound packet.
1356    pub fn handle_outbound(
1357        &mut self,
1358        packet: &RawPacket,
1359        dest_type: u8,
1360        attached_interface: Option<InterfaceId>,
1361        now: f64,
1362    ) -> Vec<TransportAction> {
1363        let actions = route_outbound(
1364            &self.path_table,
1365            &self.interfaces,
1366            &self.local_destinations,
1367            packet,
1368            dest_type,
1369            attached_interface,
1370            now,
1371        );
1372
1373        // Add to packet hashlist for outbound packets
1374        self.packet_hashlist.add(packet.packet_hash);
1375
1376        // Gate announces with hops > 0 through the bandwidth queue
1377        if packet.flags.packet_type == constants::PACKET_TYPE_ANNOUNCE && packet.hops > 0 {
1378            self.gate_announce_actions(actions, &packet.destination_hash, packet.hops, now)
1379        } else {
1380            actions
1381        }
1382    }
1383
1384    /// Gate announce SendOnInterface actions through per-interface bandwidth queues.
1385    fn gate_announce_actions(
1386        &mut self,
1387        actions: Vec<TransportAction>,
1388        dest_hash: &[u8; 16],
1389        hops: u8,
1390        now: f64,
1391    ) -> Vec<TransportAction> {
1392        let mut result = Vec::new();
1393        for action in actions {
1394            match action {
1395                TransportAction::SendOnInterface { interface, raw } => {
1396                    let (bitrate, airtime_profile, announce_cap) =
1397                        if let Some(info) = self.interfaces.get(&interface) {
1398                            (info.bitrate, info.airtime_profile, info.announce_cap)
1399                        } else {
1400                            (None, None, constants::ANNOUNCE_CAP)
1401                        };
1402                    if let Some(send_action) = self.announce_queues.gate_announce(
1403                        interface,
1404                        raw,
1405                        *dest_hash,
1406                        hops,
1407                        now,
1408                        now,
1409                        bitrate,
1410                        airtime_profile,
1411                        announce_cap,
1412                    ) {
1413                        result.push(send_action);
1414                    }
1415                    // If None, it was queued — no action emitted now
1416                }
1417                other => result.push(other),
1418            }
1419        }
1420        result
1421    }
1422
1423    // =========================================================================
1424    // Core API: tick
1425    // =========================================================================
1426
1427    /// Periodic maintenance. Call regularly (e.g., every 250ms).
1428    pub fn tick(&mut self, now: f64, rng: &mut dyn Rng) -> Vec<TransportAction> {
1429        let mut ctx = TickCtx {
1430            now,
1431            rng,
1432            actions: Vec::new(),
1433        };
1434        self.process_tick_pending_announces(&mut ctx);
1435
1436        let mut queue_actions = self.announce_queues.process_queues(now, &self.interfaces);
1437        ctx.actions.append(&mut queue_actions);
1438
1439        self.process_tick_ingress_release(&mut ctx);
1440        self.cull_tick_tables(&mut ctx);
1441        ctx.actions
1442    }
1443
1444    fn process_tick_pending_announces(&mut self, ctx: &mut TickCtx<'_>) {
1445        if ctx.now <= self.announces_last_checked + constants::ANNOUNCES_CHECK_INTERVAL {
1446            return;
1447        }
1448
1449        self.cull_expired_announce_entries(ctx.now);
1450        self.enforce_announce_retention_cap(ctx.now);
1451        if let Some(identity_hash) = self.config.identity_hash {
1452            let announce_actions = jobs::process_pending_announces(
1453                &mut self.announce_table,
1454                &mut self.held_announces,
1455                &identity_hash,
1456                ctx.now,
1457            );
1458            let gated = self.gate_retransmit_actions(announce_actions, ctx.now);
1459            ctx.actions.extend(gated);
1460        }
1461        self.cull_expired_announce_entries(ctx.now);
1462        self.enforce_announce_retention_cap(ctx.now);
1463        self.announces_last_checked = ctx.now;
1464    }
1465
1466    fn process_tick_ingress_release(&mut self, ctx: &mut TickCtx<'_>) {
1467        let ic_interfaces = self.ingress_control.interfaces_with_held();
1468        for iface_id in ic_interfaces {
1469            let (ia_freq, started, ingress_config) = match self.interfaces.get(&iface_id) {
1470                Some(info) => (info.ia_freq, info.started, info.ingress_control),
1471                None => continue,
1472            };
1473            if !ingress_config.enabled {
1474                continue;
1475            }
1476            if let Some(held) = self.ingress_control.process_held_announces(
1477                iface_id,
1478                &ingress_config,
1479                ia_freq,
1480                started,
1481                ctx.now,
1482            ) {
1483                let released_actions =
1484                    self.handle_inbound(&held.raw, held.receiving_interface, ctx.now, ctx.rng);
1485                ctx.actions.extend(released_actions);
1486            }
1487        }
1488    }
1489
1490    fn cull_tick_tables(&mut self, ctx: &mut TickCtx<'_>) {
1491        if ctx.now <= self.tables_last_culled + constants::TABLES_CULL_INTERVAL {
1492            return;
1493        }
1494
1495        jobs::cull_path_table(&mut self.path_table, &self.interfaces, ctx.now);
1496        jobs::cull_reverse_table(&mut self.reverse_table, &self.interfaces, ctx.now);
1497        let (_culled, link_closed_actions) =
1498            jobs::cull_link_table(&mut self.link_table, &self.interfaces, ctx.now);
1499        ctx.actions.extend(link_closed_actions);
1500        jobs::cull_path_states(&mut self.path_states, &self.path_table);
1501        self.cull_blackholed(ctx.now);
1502        self.discovery_path_requests
1503            .retain(|_, req| ctx.now - req.timestamp < constants::DISCOVERY_PATH_REQUEST_TIMEOUT);
1504        self.tunnel_table
1505            .void_missing_interfaces(|id| self.interfaces.contains_key(id));
1506        self.tunnel_table.cull(ctx.now);
1507        self.announce_sig_cache.cull(ctx.now);
1508        self.tables_last_culled = ctx.now;
1509    }
1510
1511    /// Gate retransmitted announce actions through per-interface bandwidth queues.
1512    ///
1513    /// Retransmitted announces always have hops > 0.
1514    /// `BroadcastOnAllInterfaces` is expanded to per-interface sends gated through queues.
1515    fn gate_retransmit_actions(
1516        &mut self,
1517        actions: Vec<TransportAction>,
1518        now: f64,
1519    ) -> Vec<TransportAction> {
1520        let mut result = Vec::new();
1521        for action in actions {
1522            match action {
1523                TransportAction::SendOnInterface { interface, raw } => {
1524                    // Extract dest_hash from raw (bytes 2..18 for H1, 18..34 for H2)
1525                    let (dest_hash, hops) = Self::extract_announce_info(&raw);
1526                    let (bitrate, airtime_profile, announce_cap) =
1527                        if let Some(info) = self.interfaces.get(&interface) {
1528                            (info.bitrate, info.airtime_profile, info.announce_cap)
1529                        } else {
1530                            (None, None, constants::ANNOUNCE_CAP)
1531                        };
1532                    if let Some(send_action) = self.announce_queues.gate_announce(
1533                        interface,
1534                        raw,
1535                        dest_hash,
1536                        hops,
1537                        now,
1538                        now,
1539                        bitrate,
1540                        airtime_profile,
1541                        announce_cap,
1542                    ) {
1543                        result.push(send_action);
1544                    }
1545                }
1546                TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1547                    let (dest_hash, hops) = Self::extract_announce_info(&raw);
1548                    // Expand to per-interface sends gated through queues,
1549                    // applying mode filtering (AP blocks non-local announces, etc.)
1550                    let iface_ids: Vec<(
1551                        InterfaceId,
1552                        Option<u64>,
1553                        Option<types::AirtimeProfile>,
1554                        f64,
1555                    )> = self
1556                        .interfaces
1557                        .iter()
1558                        .filter(|(_, info)| info.out_capable)
1559                        .filter(|(id, _)| {
1560                            if let Some(ref ex) = exclude {
1561                                **id != *ex
1562                            } else {
1563                                true
1564                            }
1565                        })
1566                        .filter(|(_, info)| {
1567                            should_transmit_announce(
1568                                info,
1569                                &dest_hash,
1570                                hops,
1571                                &self.local_destinations,
1572                                &self.path_table,
1573                                &self.interfaces,
1574                            )
1575                        })
1576                        .map(|(id, info)| {
1577                            (*id, info.bitrate, info.airtime_profile, info.announce_cap)
1578                        })
1579                        .collect();
1580
1581                    for (iface_id, bitrate, airtime_profile, announce_cap) in iface_ids {
1582                        if let Some(send_action) = self.announce_queues.gate_announce(
1583                            iface_id,
1584                            raw.clone(),
1585                            dest_hash,
1586                            hops,
1587                            now,
1588                            now,
1589                            bitrate,
1590                            airtime_profile,
1591                            announce_cap,
1592                        ) {
1593                            result.push(send_action);
1594                        }
1595                    }
1596                }
1597                other => result.push(other),
1598            }
1599        }
1600        result
1601    }
1602
1603    /// Extract destination hash and hops from raw announce bytes.
1604    fn extract_announce_info(raw: &[u8]) -> ([u8; 16], u8) {
1605        if raw.len() < 18 {
1606            return ([0; 16], 0);
1607        }
1608        let header_type = (raw[0] >> 6) & 0x03;
1609        let hops = raw[1];
1610        if header_type == constants::HEADER_2 && raw.len() >= 34 {
1611            // H2: transport_id at [2..18], dest_hash at [18..34]
1612            let mut dest = [0u8; 16];
1613            dest.copy_from_slice(&raw[18..34]);
1614            (dest, hops)
1615        } else {
1616            // H1: dest_hash at [2..18]
1617            let mut dest = [0u8; 16];
1618            dest.copy_from_slice(&raw[2..18]);
1619            (dest, hops)
1620        }
1621    }
1622
    /// Test-only accessor returning a shared reference to the link table.
    #[cfg(test)]
    // presumably not every test build calls this accessor, hence the allowance
    #[allow(dead_code)]
    pub(crate) fn link_table_ref(&self) -> &BTreeMap<[u8; 16], LinkEntry> {
        &self.link_table
    }
1628}
1629
1630#[cfg(test)]
1631mod tests {
1632    use super::*;
1633    use crate::packet::PacketFlags;
1634
1635    fn make_config(transport_enabled: bool) -> TransportConfig {
1636        TransportConfig {
1637            transport_enabled,
1638            identity_hash: if transport_enabled {
1639                Some([0x42; 16])
1640            } else {
1641                None
1642            },
1643            prefer_shorter_path: false,
1644            max_paths_per_destination: 1,
1645            packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
1646            max_discovery_pr_tags: constants::MAX_PR_TAGS,
1647            max_path_destinations: usize::MAX,
1648            max_tunnel_destinations_total: usize::MAX,
1649            destination_timeout_secs: constants::DESTINATION_TIMEOUT,
1650            announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
1651            announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
1652            announce_sig_cache_enabled: true,
1653            announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
1654            announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
1655            announce_queue_max_entries: 256,
1656            announce_queue_max_interfaces: 1024,
1657        }
1658    }
1659
1660    fn make_interface(id: u64, mode: u8) -> InterfaceInfo {
1661        InterfaceInfo {
1662            id: InterfaceId(id),
1663            name: String::from("test"),
1664            mode,
1665            out_capable: true,
1666            in_capable: true,
1667            bitrate: None,
1668            airtime_profile: None,
1669            announce_rate_target: None,
1670            announce_rate_grace: 0,
1671            announce_rate_penalty: 0.0,
1672            announce_cap: constants::ANNOUNCE_CAP,
1673            is_local_client: false,
1674            wants_tunnel: false,
1675            tunnel_id: None,
1676            mtu: constants::MTU as u32,
1677            ingress_control: crate::transport::types::IngressControlConfig::disabled(),
1678            ia_freq: 0.0,
1679            started: 0.0,
1680        }
1681    }
1682
1683    fn make_announce_entry(dest_hash: [u8; 16], timestamp: f64, fill_len: usize) -> AnnounceEntry {
1684        AnnounceEntry {
1685            timestamp,
1686            retransmit_timeout: timestamp,
1687            retries: 0,
1688            received_from: [0xAA; 16],
1689            hops: 2,
1690            packet_raw: vec![0x01; fill_len],
1691            packet_data: vec![0x02; fill_len],
1692            destination_hash: dest_hash,
1693            context_flag: 0,
1694            local_rebroadcasts: 0,
1695            block_rebroadcasts: false,
1696            attached_interface: None,
1697        }
1698    }
1699
1700    fn make_path_entry(
1701        timestamp: f64,
1702        hops: u8,
1703        receiving_interface: InterfaceId,
1704        next_hop: [u8; 16],
1705    ) -> PathEntry {
1706        PathEntry {
1707            timestamp,
1708            next_hop,
1709            hops,
1710            expires: timestamp + 10_000.0,
1711            random_blobs: Vec::new(),
1712            receiving_interface,
1713            packet_hash: [0; 32],
1714            announce_raw: None,
1715        }
1716    }
1717
1718    fn make_unique_tag(dest_hash: [u8; 16], tag: &[u8]) -> [u8; 32] {
1719        let mut unique_tag = [0u8; 32];
1720        let tag_len = tag.len().min(16);
1721        unique_tag[..16].copy_from_slice(&dest_hash);
1722        unique_tag[16..16 + tag_len].copy_from_slice(&tag[..tag_len]);
1723        unique_tag
1724    }
1725
1726    fn make_random_blob(timebase: u64) -> [u8; 10] {
1727        let mut blob = [0u8; 10];
1728        let bytes = timebase.to_be_bytes();
1729        blob[5..10].copy_from_slice(&bytes[3..8]);
1730        blob
1731    }
1732
1733    #[test]
1734    fn test_empty_engine() {
1735        let engine = TransportEngine::new(make_config(false));
1736        assert!(!engine.has_path(&[0; 16]));
1737        assert!(engine.hops_to(&[0; 16]).is_none());
1738        assert!(engine.next_hop(&[0; 16]).is_none());
1739    }
1740
1741    #[test]
1742    fn test_register_deregister_interface() {
1743        let mut engine = TransportEngine::new(make_config(false));
1744        engine.register_interface(make_interface(1, constants::MODE_FULL));
1745        assert!(engine.interfaces.contains_key(&InterfaceId(1)));
1746
1747        engine.deregister_interface(InterfaceId(1));
1748        assert!(!engine.interfaces.contains_key(&InterfaceId(1)));
1749    }
1750
1751    #[test]
1752    fn test_deregister_interface_removes_announce_queue_state() {
1753        let mut engine = TransportEngine::new(make_config(false));
1754        engine.register_interface(make_interface(1, constants::MODE_FULL));
1755
1756        let _ = engine.announce_queues.gate_announce(
1757            InterfaceId(1),
1758            vec![0x01; 100].into(),
1759            [0xAA; 16],
1760            2,
1761            0.0,
1762            0.0,
1763            Some(1000),
1764            None,
1765            constants::ANNOUNCE_CAP,
1766        );
1767        let _ = engine.announce_queues.gate_announce(
1768            InterfaceId(1),
1769            vec![0x02; 100].into(),
1770            [0xBB; 16],
1771            3,
1772            0.0,
1773            0.0,
1774            Some(1000),
1775            None,
1776            constants::ANNOUNCE_CAP,
1777        );
1778        assert_eq!(engine.announce_queue_count(), 1);
1779
1780        engine.deregister_interface(InterfaceId(1));
1781        assert_eq!(engine.announce_queue_count(), 0);
1782    }
1783
1784    #[test]
1785    fn test_deregister_interface_preserves_other_announce_queues() {
1786        let mut engine = TransportEngine::new(make_config(false));
1787        engine.register_interface(make_interface(1, constants::MODE_FULL));
1788        engine.register_interface(make_interface(2, constants::MODE_FULL));
1789
1790        let _ = engine.announce_queues.gate_announce(
1791            InterfaceId(1),
1792            vec![0x01; 100].into(),
1793            [0xAA; 16],
1794            2,
1795            0.0,
1796            0.0,
1797            Some(1000),
1798            None,
1799            constants::ANNOUNCE_CAP,
1800        );
1801        let _ = engine.announce_queues.gate_announce(
1802            InterfaceId(1),
1803            vec![0x02; 100].into(),
1804            [0xAB; 16],
1805            3,
1806            0.0,
1807            0.0,
1808            Some(1000),
1809            None,
1810            constants::ANNOUNCE_CAP,
1811        );
1812        let _ = engine.announce_queues.gate_announce(
1813            InterfaceId(2),
1814            vec![0x03; 100].into(),
1815            [0xBA; 16],
1816            2,
1817            0.0,
1818            0.0,
1819            Some(1000),
1820            None,
1821            constants::ANNOUNCE_CAP,
1822        );
1823        let _ = engine.announce_queues.gate_announce(
1824            InterfaceId(2),
1825            vec![0x04; 100].into(),
1826            [0xBB; 16],
1827            3,
1828            0.0,
1829            0.0,
1830            Some(1000),
1831            None,
1832            constants::ANNOUNCE_CAP,
1833        );
1834
1835        engine.deregister_interface(InterfaceId(1));
1836        assert_eq!(engine.announce_queue_count(), 1);
1837        assert_eq!(engine.nonempty_announce_queue_count(), 1);
1838    }
1839
1840    #[test]
1841    fn test_register_deregister_destination() {
1842        let mut engine = TransportEngine::new(make_config(false));
1843        let dest = [0x11; 16];
1844        engine.register_destination(dest, constants::DESTINATION_SINGLE);
1845        assert!(engine.local_destinations.contains_key(&dest));
1846
1847        engine.deregister_destination(&dest);
1848        assert!(!engine.local_destinations.contains_key(&dest));
1849    }
1850
1851    #[test]
1852    fn test_path_state() {
1853        let mut engine = TransportEngine::new(make_config(false));
1854        let dest = [0x22; 16];
1855
1856        assert!(!engine.path_is_unresponsive(&dest));
1857
1858        engine.mark_path_unresponsive(&dest, None);
1859        assert!(engine.path_is_unresponsive(&dest));
1860
1861        engine.mark_path_responsive(&dest);
1862        assert!(!engine.path_is_unresponsive(&dest));
1863    }
1864
1865    #[test]
1866    fn test_announce_clears_stale_path_state_for_unknown_destination() {
1867        use crate::announce::AnnounceData;
1868        use crate::destination::{destination_hash, name_hash};
1869
1870        let mut engine = TransportEngine::new(make_config(false));
1871        engine.register_interface(make_interface(1, constants::MODE_FULL));
1872
1873        let identity =
1874            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x61; 32]));
1875        let dest_hash = destination_hash("pathfix", &["announce"], Some(identity.hash()));
1876        let name_h = name_hash("pathfix", &["announce"]);
1877        let random_hash = [0x24u8; 10];
1878
1879        let (announce_data, _) =
1880            AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
1881
1882        let packet = RawPacket::pack(
1883            PacketFlags {
1884                header_type: constants::HEADER_1,
1885                context_flag: constants::FLAG_UNSET,
1886                transport_type: constants::TRANSPORT_BROADCAST,
1887                destination_type: constants::DESTINATION_SINGLE,
1888                packet_type: constants::PACKET_TYPE_ANNOUNCE,
1889            },
1890            0,
1891            &dest_hash,
1892            None,
1893            constants::CONTEXT_NONE,
1894            &announce_data,
1895        )
1896        .unwrap();
1897
1898        engine.mark_path_unresponsive(&dest_hash, None);
1899        assert!(engine.path_is_unresponsive(&dest_hash));
1900        assert!(!engine.has_path(&dest_hash));
1901
1902        let mut rng = rns_crypto::FixedRng::new(&[0x62; 32]);
1903        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);
1904
1905        assert!(engine.has_path(&dest_hash));
1906        assert!(
1907            !engine.path_is_unresponsive(&dest_hash),
1908            "stale path state should be cleared for newly installed paths"
1909        );
1910        assert!(actions.iter().any(|action| matches!(
1911            action,
1912            TransportAction::PathUpdated {
1913                destination_hash,
1914                interface,
1915                ..
1916            } if *destination_hash == dest_hash && *interface == InterfaceId(1)
1917        )));
1918    }
1919
1920    #[test]
1921    fn test_boundary_exempts_unresponsive() {
1922        let mut engine = TransportEngine::new(make_config(false));
1923        engine.register_interface(make_interface(1, constants::MODE_BOUNDARY));
1924        let dest = [0xB1; 16];
1925
1926        // Marking via a boundary interface should be skipped
1927        engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
1928        assert!(!engine.path_is_unresponsive(&dest));
1929    }
1930
1931    #[test]
1932    fn test_non_boundary_marks_unresponsive() {
1933        let mut engine = TransportEngine::new(make_config(false));
1934        engine.register_interface(make_interface(1, constants::MODE_FULL));
1935        let dest = [0xB2; 16];
1936
1937        // Marking via a non-boundary interface should work
1938        engine.mark_path_unresponsive(&dest, Some(InterfaceId(1)));
1939        assert!(engine.path_is_unresponsive(&dest));
1940    }
1941
1942    #[test]
1943    fn test_expire_path() {
1944        let mut engine = TransportEngine::new(make_config(false));
1945        let dest = [0x33; 16];
1946
1947        engine.path_table.insert(
1948            dest,
1949            PathSet::from_single(
1950                PathEntry {
1951                    timestamp: 1000.0,
1952                    next_hop: [0; 16],
1953                    hops: 2,
1954                    expires: 9999.0,
1955                    random_blobs: Vec::new(),
1956                    receiving_interface: InterfaceId(1),
1957                    packet_hash: [0; 32],
1958                    announce_raw: None,
1959                },
1960                1,
1961            ),
1962        );
1963
1964        assert!(engine.has_path(&dest));
1965        engine.expire_path(&dest);
1966        // Path still exists but expires = 0
1967        assert!(engine.has_path(&dest));
1968        assert_eq!(engine.path_table[&dest].primary().unwrap().expires, 0.0);
1969    }
1970
1971    #[test]
1972    fn test_link_table_operations() {
1973        let mut engine = TransportEngine::new(make_config(false));
1974        let link_id = [0x44; 16];
1975
1976        engine.register_link(
1977            link_id,
1978            LinkEntry {
1979                timestamp: 100.0,
1980                next_hop_transport_id: [0; 16],
1981                next_hop_interface: InterfaceId(1),
1982                remaining_hops: 3,
1983                received_interface: InterfaceId(2),
1984                taken_hops: 2,
1985                destination_hash: [0xAA; 16],
1986                validated: false,
1987                proof_timeout: 200.0,
1988            },
1989        );
1990
1991        assert!(engine.link_table.contains_key(&link_id));
1992        assert!(!engine.link_table[&link_id].validated);
1993
1994        engine.validate_link(&link_id);
1995        assert!(engine.link_table[&link_id].validated);
1996
1997        engine.remove_link(&link_id);
1998        assert!(!engine.link_table.contains_key(&link_id));
1999    }
2000
2001    #[test]
2002    fn test_lrproof_routes_from_originating_side_via_link_table() {
2003        let mut engine = TransportEngine::new(make_config(true));
2004        engine.register_interface(make_interface(1, constants::MODE_FULL));
2005        engine.register_interface(make_interface(2, constants::MODE_FULL));
2006
2007        let link_id = [0x44; 16];
2008        engine.register_link(
2009            link_id,
2010            LinkEntry {
2011                timestamp: 100.0,
2012                next_hop_transport_id: [0xAA; 16],
2013                next_hop_interface: InterfaceId(2),
2014                remaining_hops: 3,
2015                received_interface: InterfaceId(1),
2016                taken_hops: 1,
2017                destination_hash: [0xBB; 16],
2018                validated: false,
2019                proof_timeout: 200.0,
2020            },
2021        );
2022
2023        let flags = PacketFlags {
2024            header_type: constants::HEADER_1,
2025            context_flag: constants::FLAG_UNSET,
2026            transport_type: constants::TRANSPORT_BROADCAST,
2027            destination_type: constants::DESTINATION_LINK,
2028            packet_type: constants::PACKET_TYPE_PROOF,
2029        };
2030        let packet = RawPacket::pack(
2031            flags,
2032            0,
2033            &link_id,
2034            None,
2035            constants::CONTEXT_LRPROOF,
2036            &[0xCC; 64],
2037        )
2038        .unwrap();
2039        let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
2040
2041        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 101.0, &mut rng);
2042
2043        assert!(matches!(
2044            engine
2045                .link_table_ref()
2046                .get(&link_id)
2047                .map(|entry| entry.validated),
2048            Some(true)
2049        ));
2050        assert!(actions.iter().any(|action| matches!(
2051            action,
2052            TransportAction::LinkEstablished {
2053                link_id: established,
2054                interface: InterfaceId(2),
2055            } if *established == link_id
2056        )));
2057        assert!(actions.iter().any(|action| matches!(
2058            action,
2059            TransportAction::SendOnInterface {
2060                interface: InterfaceId(2),
2061                ..
2062            }
2063        )));
2064    }
2065
2066    #[test]
2067    fn test_packet_filter_drops_plain_announce() {
2068        let engine = TransportEngine::new(make_config(false));
2069        let flags = PacketFlags {
2070            header_type: constants::HEADER_1,
2071            context_flag: constants::FLAG_UNSET,
2072            transport_type: constants::TRANSPORT_BROADCAST,
2073            destination_type: constants::DESTINATION_PLAIN,
2074            packet_type: constants::PACKET_TYPE_ANNOUNCE,
2075        };
2076        let packet =
2077            RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2078        assert!(!engine.packet_filter(&packet));
2079    }
2080
2081    #[test]
2082    fn test_packet_filter_allows_keepalive() {
2083        let engine = TransportEngine::new(make_config(false));
2084        let flags = PacketFlags {
2085            header_type: constants::HEADER_1,
2086            context_flag: constants::FLAG_UNSET,
2087            transport_type: constants::TRANSPORT_BROADCAST,
2088            destination_type: constants::DESTINATION_SINGLE,
2089            packet_type: constants::PACKET_TYPE_DATA,
2090        };
2091        let packet = RawPacket::pack(
2092            flags,
2093            0,
2094            &[0; 16],
2095            None,
2096            constants::CONTEXT_KEEPALIVE,
2097            b"test",
2098        )
2099        .unwrap();
2100        assert!(engine.packet_filter(&packet));
2101    }
2102
2103    #[test]
2104    fn test_packet_filter_drops_high_hop_plain() {
2105        let engine = TransportEngine::new(make_config(false));
2106        let flags = PacketFlags {
2107            header_type: constants::HEADER_1,
2108            context_flag: constants::FLAG_UNSET,
2109            transport_type: constants::TRANSPORT_BROADCAST,
2110            destination_type: constants::DESTINATION_PLAIN,
2111            packet_type: constants::PACKET_TYPE_DATA,
2112        };
2113        let mut packet =
2114            RawPacket::pack(flags, 0, &[0; 16], None, constants::CONTEXT_NONE, b"test").unwrap();
2115        packet.hops = 2;
2116        assert!(!engine.packet_filter(&packet));
2117    }
2118
2119    #[test]
2120    fn test_packet_filter_allows_duplicate_single_announce() {
2121        let mut engine = TransportEngine::new(make_config(false));
2122        let flags = PacketFlags {
2123            header_type: constants::HEADER_1,
2124            context_flag: constants::FLAG_UNSET,
2125            transport_type: constants::TRANSPORT_BROADCAST,
2126            destination_type: constants::DESTINATION_SINGLE,
2127            packet_type: constants::PACKET_TYPE_ANNOUNCE,
2128        };
2129        let packet = RawPacket::pack(
2130            flags,
2131            0,
2132            &[0; 16],
2133            None,
2134            constants::CONTEXT_NONE,
2135            &[0xAA; 64],
2136        )
2137        .unwrap();
2138
2139        // Add to hashlist
2140        engine.packet_hashlist.add(packet.packet_hash);
2141
2142        // Should still pass filter (duplicate announce for SINGLE allowed)
2143        assert!(engine.packet_filter(&packet));
2144    }
2145
2146    #[test]
2147    fn test_packet_filter_fifo_eviction_allows_oldest_hash_again() {
2148        let mut engine = TransportEngine::new(make_config(false));
2149        engine.packet_hashlist = PacketHashlist::new(2);
2150
2151        let make_packet = |seed: u8| {
2152            let flags = PacketFlags {
2153                header_type: constants::HEADER_1,
2154                context_flag: constants::FLAG_UNSET,
2155                transport_type: constants::TRANSPORT_BROADCAST,
2156                destination_type: constants::DESTINATION_SINGLE,
2157                packet_type: constants::PACKET_TYPE_DATA,
2158            };
2159            RawPacket::pack(
2160                flags,
2161                0,
2162                &[seed; 16],
2163                None,
2164                constants::CONTEXT_NONE,
2165                &[seed; 4],
2166            )
2167            .unwrap()
2168        };
2169
2170        let packet1 = make_packet(1);
2171        let packet2 = make_packet(2);
2172        let packet3 = make_packet(3);
2173
2174        engine.packet_hashlist.add(packet1.packet_hash);
2175        engine.packet_hashlist.add(packet2.packet_hash);
2176        assert!(!engine.packet_filter(&packet1));
2177
2178        engine.packet_hashlist.add(packet3.packet_hash);
2179
2180        assert!(engine.packet_filter(&packet1));
2181        assert!(!engine.packet_filter(&packet2));
2182        assert!(!engine.packet_filter(&packet3));
2183    }
2184
2185    #[test]
2186    fn test_packet_filter_duplicate_does_not_refresh_recency() {
2187        let mut engine = TransportEngine::new(make_config(false));
2188        engine.packet_hashlist = PacketHashlist::new(2);
2189
2190        let make_packet = |seed: u8| {
2191            let flags = PacketFlags {
2192                header_type: constants::HEADER_1,
2193                context_flag: constants::FLAG_UNSET,
2194                transport_type: constants::TRANSPORT_BROADCAST,
2195                destination_type: constants::DESTINATION_SINGLE,
2196                packet_type: constants::PACKET_TYPE_DATA,
2197            };
2198            RawPacket::pack(
2199                flags,
2200                0,
2201                &[seed; 16],
2202                None,
2203                constants::CONTEXT_NONE,
2204                &[seed; 4],
2205            )
2206            .unwrap()
2207        };
2208
2209        let packet1 = make_packet(1);
2210        let packet2 = make_packet(2);
2211        let packet3 = make_packet(3);
2212
2213        engine.packet_hashlist.add(packet1.packet_hash);
2214        engine.packet_hashlist.add(packet2.packet_hash);
2215        engine.packet_hashlist.add(packet2.packet_hash);
2216        engine.packet_hashlist.add(packet3.packet_hash);
2217
2218        assert!(engine.packet_filter(&packet1));
2219        assert!(!engine.packet_filter(&packet2));
2220        assert!(!engine.packet_filter(&packet3));
2221    }
2222
2223    #[test]
2224    fn test_tick_retransmits_announce() {
2225        let mut engine = TransportEngine::new(make_config(true));
2226        engine.register_interface(make_interface(1, constants::MODE_FULL));
2227
2228        let dest = [0x55; 16];
2229        engine.insert_announce_entry(
2230            dest,
2231            AnnounceEntry {
2232                timestamp: 190.0,
2233                retransmit_timeout: 100.0, // ready to retransmit
2234                retries: 0,
2235                received_from: [0xAA; 16],
2236                hops: 2,
2237                packet_raw: vec![0x01, 0x02],
2238                packet_data: vec![0xCC; 10],
2239                destination_hash: dest,
2240                context_flag: 0,
2241                local_rebroadcasts: 0,
2242                block_rebroadcasts: false,
2243                attached_interface: None,
2244            },
2245            190.0,
2246        );
2247
2248        let mut rng = rns_crypto::FixedRng::new(&[0x42; 32]);
2249        let actions = engine.tick(200.0, &mut rng);
2250
2251        // Should have a send action for the retransmit (gated through announce queue,
2252        // expanded from BroadcastOnAllInterfaces to per-interface SendOnInterface)
2253        assert!(!actions.is_empty());
2254        assert!(matches!(
2255            &actions[0],
2256            TransportAction::SendOnInterface { .. }
2257        ));
2258
2259        // Retries should have increased
2260        assert_eq!(engine.announce_table[&dest].retries, 1);
2261    }
2262
2263    #[test]
2264    fn test_gate_retransmit_actions_expands_broadcast_to_matching_interfaces() {
2265        let mut engine = TransportEngine::new(make_config(false));
2266        engine.register_interface(make_interface(1, constants::MODE_FULL));
2267        engine.register_interface(make_interface(2, constants::MODE_FULL));
2268        engine.register_interface(make_interface(3, constants::MODE_ACCESS_POINT));
2269
2270        let dest = [0x56; 16];
2271        let raw = make_announce_raw(&dest, &[0xAB; 32]);
2272        let actions = engine.gate_retransmit_actions(
2273            vec![TransportAction::BroadcastOnAllInterfaces {
2274                raw: raw.clone().into(),
2275                exclude: None,
2276            }],
2277            1000.0,
2278        );
2279
2280        assert_eq!(actions.len(), 2);
2281        for action in &actions {
2282            match action {
2283                TransportAction::SendOnInterface {
2284                    interface,
2285                    raw: sent,
2286                } => {
2287                    assert!(*interface == InterfaceId(1) || *interface == InterfaceId(2));
2288                    assert_eq!(&**sent, raw.as_slice());
2289                }
2290                other => panic!("expected SendOnInterface, got {:?}", other),
2291            }
2292        }
2293    }
2294
2295    #[test]
2296    fn test_tick_culls_expired_announce_entries() {
2297        let mut config = make_config(true);
2298        config.announce_table_ttl_secs = 10.0;
2299        let mut engine = TransportEngine::new(config);
2300
2301        let dest1 = [0x61; 16];
2302        let dest2 = [0x62; 16];
2303        assert!(engine.insert_announce_entry(dest1, make_announce_entry(dest1, 100.0, 8), 100.0));
2304        assert!(engine.insert_held_announce(dest2, make_announce_entry(dest2, 100.0, 8), 100.0));
2305
2306        let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2307        let _ = engine.tick(111.0, &mut rng);
2308
2309        assert!(!engine.announce_table().contains_key(&dest1));
2310        assert!(!engine.held_announces().contains_key(&dest2));
2311    }
2312
2313    #[test]
2314    fn test_announce_retention_cap_evicts_oldest_and_prefers_held_on_tie() {
2315        let sample_entry = make_announce_entry([0x70; 16], 100.0, 32);
2316        let mut config = make_config(true);
2317        config.announce_table_max_bytes = TransportEngine::announce_entry_size_bytes(&sample_entry)
2318            * 2
2319            + TransportEngine::announce_entry_size_bytes(&sample_entry) / 2;
2320        let max_bytes = config.announce_table_max_bytes;
2321        let mut engine = TransportEngine::new(config);
2322
2323        let held_dest = [0x71; 16];
2324        let active_dest = [0x72; 16];
2325        let newest_dest = [0x73; 16];
2326
2327        assert!(engine.insert_held_announce(
2328            held_dest,
2329            make_announce_entry(held_dest, 100.0, 32),
2330            100.0,
2331        ));
2332        assert!(engine.insert_announce_entry(
2333            active_dest,
2334            make_announce_entry(active_dest, 100.0, 32),
2335            100.0,
2336        ));
2337        assert!(engine.insert_announce_entry(
2338            newest_dest,
2339            make_announce_entry(newest_dest, 101.0, 32),
2340            101.0,
2341        ));
2342
2343        assert!(!engine.held_announces().contains_key(&held_dest));
2344        assert!(engine.announce_table().contains_key(&active_dest));
2345        assert!(engine.announce_table().contains_key(&newest_dest));
2346        assert!(engine.announce_retained_bytes() <= max_bytes);
2347    }
2348
2349    #[test]
2350    fn test_oversized_announce_entry_is_not_retained() {
2351        let mut config = make_config(true);
2352        config.announce_table_max_bytes = 200;
2353        let mut engine = TransportEngine::new(config);
2354        let dest = [0x81; 16];
2355
2356        assert!(!engine.insert_announce_entry(dest, make_announce_entry(dest, 100.0, 256), 100.0));
2357        assert!(!engine.announce_table().contains_key(&dest));
2358        assert_eq!(engine.announce_retained_bytes(), 0);
2359    }
2360
2361    #[test]
2362    fn test_blackhole_identity() {
2363        let mut engine = TransportEngine::new(make_config(false));
2364        let hash = [0xAA; 16];
2365        let now = 1000.0;
2366
2367        assert!(!engine.is_blackholed(&hash, now));
2368
2369        engine.blackhole_identity(hash, now, None, Some(String::from("test")));
2370        assert!(engine.is_blackholed(&hash, now));
2371        assert!(engine.is_blackholed(&hash, now + 999999.0)); // never expires
2372
2373        assert!(engine.unblackhole_identity(&hash));
2374        assert!(!engine.is_blackholed(&hash, now));
2375        assert!(!engine.unblackhole_identity(&hash)); // already removed
2376    }
2377
2378    #[test]
2379    fn test_blackhole_with_duration() {
2380        let mut engine = TransportEngine::new(make_config(false));
2381        let hash = [0xBB; 16];
2382        let now = 1000.0;
2383
2384        engine.blackhole_identity(hash, now, Some(1.0), None); // 1 hour
2385        assert!(engine.is_blackholed(&hash, now));
2386        assert!(engine.is_blackholed(&hash, now + 3599.0)); // just before expiry
2387        assert!(!engine.is_blackholed(&hash, now + 3601.0)); // after expiry
2388    }
2389
2390    #[test]
2391    fn test_cull_blackholed() {
2392        let mut engine = TransportEngine::new(make_config(false));
2393        let hash1 = [0xCC; 16];
2394        let hash2 = [0xDD; 16];
2395        let now = 1000.0;
2396
2397        engine.blackhole_identity(hash1, now, Some(1.0), None); // 1 hour
2398        engine.blackhole_identity(hash2, now, None, None); // never expires
2399
2400        engine.cull_blackholed(now + 4000.0); // past hash1 expiry
2401
2402        assert!(!engine.blackholed_identities.contains_key(&hash1));
2403        assert!(engine.blackholed_identities.contains_key(&hash2));
2404    }
2405
2406    #[test]
2407    fn test_blackhole_blocks_announce() {
2408        use crate::announce::AnnounceData;
2409        use crate::destination::{destination_hash, name_hash};
2410
2411        let mut engine = TransportEngine::new(make_config(false));
2412        engine.register_interface(make_interface(1, constants::MODE_FULL));
2413
2414        let identity =
2415            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x55; 32]));
2416        let dest_hash = destination_hash("test", &["app"], Some(identity.hash()));
2417        let name_h = name_hash("test", &["app"]);
2418        let random_hash = [0x42u8; 10];
2419
2420        let (announce_data, _) =
2421            AnnounceData::pack(&identity, &dest_hash, &name_h, &random_hash, None, None).unwrap();
2422
2423        let flags = PacketFlags {
2424            header_type: constants::HEADER_1,
2425            context_flag: constants::FLAG_UNSET,
2426            transport_type: constants::TRANSPORT_BROADCAST,
2427            destination_type: constants::DESTINATION_SINGLE,
2428            packet_type: constants::PACKET_TYPE_ANNOUNCE,
2429        };
2430        let packet = RawPacket::pack(
2431            flags,
2432            0,
2433            &dest_hash,
2434            None,
2435            constants::CONTEXT_NONE,
2436            &announce_data,
2437        )
2438        .unwrap();
2439
2440        // Blackhole the identity
2441        let now = 1000.0;
2442        engine.blackhole_identity(*identity.hash(), now, None, None);
2443
2444        let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
2445        let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), now, &mut rng);
2446
2447        // Should produce no AnnounceReceived or PathUpdated actions
2448        assert!(actions
2449            .iter()
2450            .all(|a| !matches!(a, TransportAction::AnnounceReceived { .. })));
2451        assert!(actions
2452            .iter()
2453            .all(|a| !matches!(a, TransportAction::PathUpdated { .. })));
2454    }
2455
/// Feeds a HEADER_2 announce for a destination that already has an
/// `announce_table` entry and checks that the entry has been removed by the
/// time the announce sits in the verification queue.
#[test]
fn test_async_announce_retransmit_cleanup_happens_before_queueing() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};
    use crate::transport::announce_verify_queue::AnnounceVerifyQueue;

    // The same 16-byte id is used as the packet's transport id and as the
    // `received_from` of the pre-seeded announce entry.
    let relay_id = [0xBB_u8; 16];

    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    // Deterministic identity plus a packed announce payload for it.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x31; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("async", &["announce"], Some(identity.hash()));
    let name_h = name_hash("async", &["announce"]);
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &[0x44u8; 10], None, None).unwrap();

    let flags = PacketFlags {
        header_type: constants::HEADER_2,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_TRANSPORT,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_ANNOUNCE,
    };
    let packet = RawPacket::pack(
        flags,
        3,
        &dest_hash,
        Some(&relay_id),
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    // Seed an announce entry for the same destination, recorded one hop
    // closer (hops = 2) than the packet that is about to arrive (hops = 3).
    let seeded = AnnounceEntry {
        timestamp: 1000.0,
        retransmit_timeout: 2000.0,
        retries: constants::PATHFINDER_R,
        received_from: relay_id,
        hops: 2,
        packet_raw: packet.raw.clone(),
        packet_data: packet.data.clone(),
        destination_hash: dest_hash,
        context_flag: constants::FLAG_UNSET,
        local_rebroadcasts: 0,
        block_rebroadcasts: false,
        attached_interface: None,
    };
    engine.announce_table.insert(dest_hash, seeded);

    let mut verify_queue = AnnounceVerifyQueue::new(8);
    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
    let emitted = engine.handle_inbound_with_announce_queue(
        &packet.raw,
        InterfaceId(1),
        1000.0,
        &mut rng,
        Some(&mut verify_queue),
    );

    // Nothing is emitted synchronously; the announce is parked in the queue.
    assert!(emitted.is_empty());
    assert_eq!(verify_queue.len(), 1);

    let still_tracked = engine.announce_table.contains_key(&dest_hash);
    assert!(
        !still_tracked,
        "retransmit completion should clear announce_table before queueing"
    );
}
2524
/// Queues an announce, completes it through `complete_verified_announce`,
/// and checks that the signature-cache key is recorded and that delivering
/// the very same packet again neither emits actions nor re-queues it.
#[test]
fn test_async_announce_completion_inserts_sig_cache_and_prevents_requeue() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};
    use crate::transport::announce_verify_queue::AnnounceVerifyQueue;

    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    // Deterministic identity plus a packed announce payload for it.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x52; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("async", &["cache"], Some(identity.hash()));
    let name_h = name_hash("async", &["cache"]);
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &[0x55u8; 10], None, None).unwrap();

    let flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_ANNOUNCE,
    };
    let packet = RawPacket::pack(
        flags,
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let mut verify_queue = AnnounceVerifyQueue::new(8);
    let mut rng = rns_crypto::FixedRng::new(&[0x77; 32]);

    // First delivery: no actions yet, one announce waiting for verification.
    let first_pass = engine.handle_inbound_with_announce_queue(
        &packet.raw,
        InterfaceId(1),
        1000.0,
        &mut rng,
        Some(&mut verify_queue),
    );
    assert!(first_pass.is_empty());
    assert_eq!(verify_queue.len(), 1);

    let mut batch = verify_queue.take_pending(1000.0);
    assert_eq!(batch.len(), 1);
    let (key, queued) = batch.pop().unwrap();

    let announce = AnnounceData::unpack(&queued.packet.data, false).unwrap();
    let validated = announce.validate(&queued.packet.destination_hash).unwrap();

    // The test derives the cache key as full_hash(destination hash || signature),
    // laid out in an 80-byte buffer split at byte 16.
    let mut material = [0u8; 80];
    let (dest_part, sig_part) = material.split_at_mut(16);
    dest_part.copy_from_slice(&queued.packet.destination_hash);
    sig_part.copy_from_slice(&announce.signature);
    let sig_cache_key = hash::full_hash(&material);

    let completed = verify_queue.complete_success(&key).unwrap();
    let completion_actions =
        engine.complete_verified_announce(completed, validated, sig_cache_key, 1000.0, &mut rng);
    let announce_reported = completion_actions
        .iter()
        .any(|action| matches!(action, TransportAction::AnnounceReceived { .. }));
    assert!(announce_reported);
    assert!(engine.announce_sig_cache_contains(&sig_cache_key));

    // Second delivery of the identical packet must be dropped without queueing.
    let second_pass = engine.handle_inbound_with_announce_queue(
        &packet.raw,
        InterfaceId(1),
        1001.0,
        &mut rng,
        Some(&mut verify_queue),
    );
    assert!(second_pass.is_empty());
    assert_eq!(verify_queue.len(), 0);
}
2599
/// A path whose `expires` (200.0) lies before the tick time (300.0) must no
/// longer be reported by `has_path` after `tick` runs.
#[test]
fn test_tick_culls_expired_path() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    let dest = [0x66; 16];
    let stale_entry = PathEntry {
        timestamp: 100.0,
        next_hop: [0; 16],
        hops: 2,
        expires: 200.0,
        random_blobs: Vec::new(),
        receiving_interface: InterfaceId(1),
        packet_hash: [0; 32],
        announce_raw: None,
    };
    engine
        .path_table
        .insert(dest, PathSet::from_single(stale_entry, 1));

    // Sanity check: the path is visible before any tick.
    assert!(engine.has_path(&dest));

    // Tick at a time later than the entry's expiry.
    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    engine.tick(300.0, &mut rng);

    assert!(!engine.has_path(&dest));
}
2631
2632    // =========================================================================
2633    // Phase 7b: Local client transport tests
2634    // =========================================================================
2635
2636    fn make_local_client_interface(id: u64) -> InterfaceInfo {
2637        InterfaceInfo {
2638            id: InterfaceId(id),
2639            name: String::from("local_client"),
2640            mode: constants::MODE_FULL,
2641            out_capable: true,
2642            in_capable: true,
2643            bitrate: None,
2644            airtime_profile: None,
2645            announce_rate_target: None,
2646            announce_rate_grace: 0,
2647            announce_rate_penalty: 0.0,
2648            announce_cap: constants::ANNOUNCE_CAP,
2649            is_local_client: true,
2650            wants_tunnel: false,
2651            tunnel_id: None,
2652            mtu: constants::MTU as u32,
2653            ingress_control: crate::transport::types::IngressControlConfig::disabled(),
2654            ia_freq: 0.0,
2655            started: 0.0,
2656        }
2657    }
2658
/// `has_local_clients` must follow registration and deregistration of
/// interfaces flagged as local clients, and ignore ordinary interfaces.
#[test]
fn test_has_local_clients() {
    let mut engine = TransportEngine::new(make_config(false));

    // Fresh engine: nothing registered.
    assert!(!engine.has_local_clients());

    // An ordinary interface does not count.
    let external = make_interface(1, constants::MODE_FULL);
    engine.register_interface(external);
    assert!(!engine.has_local_clients());

    // A local-client interface flips the flag...
    let client = make_local_client_interface(2);
    engine.register_interface(client);
    assert!(engine.has_local_clients());

    // ...and removing it flips the flag back.
    engine.deregister_interface(InterfaceId(2));
    assert!(!engine.has_local_clients());
}
2673
/// Sends a PLAIN data packet (packed with hops = 0) in through a local-client
/// interface and checks that it is delivered to the registered destination.
///
/// NOTE(review): the intent is that the local-client decrement cancels the
/// usual +1 hop increment, but only the presence of `DeliverLocal` is
/// asserted here; the hop value itself is not inspected.
#[test]
fn test_local_client_hop_decrement() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_local_client_interface(1));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    // A registered destination is what makes a DeliverLocal action possible.
    let dest = [0xAA; 16];
    engine.register_destination(dest, constants::DESTINATION_PLAIN);

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0, // hops as packed by the sender
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"hello",
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    let delivered_locally = actions
        .iter()
        .any(|a| matches!(a, TransportAction::DeliverLocal { .. }));
    assert!(delivered_locally, "Should deliver locally");
}
2707
/// `prepare_inbound_packet` must leave `original_raw` empty for a data packet
/// and populate it with the untouched input bytes for an announce packet.
#[test]
fn test_prepare_inbound_packet_only_retains_original_raw_for_announces() {
    let engine = TransportEngine::new(make_config(false));
    let dest = [0xAB; 16];
    let iface = InterfaceId(9);

    // --- Data packet: no copy of the original bytes is kept. ---
    let data_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let data_packet =
        RawPacket::pack(data_flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();

    let data_ctx = engine
        .prepare_inbound_packet(&data_packet.raw, InterfaceId(9), 1000.0)
        .expect("packet should parse and pass filter");

    assert!(data_ctx.original_raw.is_none());
    assert_eq!(data_ctx.packet.raw, data_packet.raw);
    // Packed with hops = 0; the prepared packet reports 1.
    assert_eq!(data_ctx.packet.hops, 1);
    assert_eq!(data_ctx.iface, iface);

    // --- Announce packet: the original bytes are retained verbatim. ---
    let announce_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_ANNOUNCE,
        ..data_flags
    };
    let announce_payload = [0u8; 91];
    let announce_packet = RawPacket::pack(
        announce_flags,
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        &announce_payload,
    )
    .unwrap();

    let announce_ctx = engine
        .prepare_inbound_packet(&announce_packet.raw, InterfaceId(9), 1000.0)
        .expect("announce should parse and pass filter");

    let retained = announce_ctx.original_raw.as_deref();
    assert_eq!(retained, Some(announce_packet.raw.as_slice()));
}
2752
/// The `DeliverLocal` action produced for a registered destination must carry
/// the destination hash, the exact input bytes, the packet hash, and the id
/// of the interface the packet arrived on.
#[test]
fn test_deliver_local_preserves_original_raw_and_metadata() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    let dest = [0xAC; 16];
    engine.register_destination(dest, constants::DESTINATION_SINGLE);

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"deliver",
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // Pull the four fields out of the first DeliverLocal action.
    let (got_dest, got_raw, got_hash, got_iface) = actions
        .iter()
        .find_map(|action| {
            if let TransportAction::DeliverLocal {
                destination_hash,
                raw,
                packet_hash,
                receiving_interface,
            } = action
            {
                Some((destination_hash, raw, packet_hash, receiving_interface))
            } else {
                None
            }
        })
        .expect("should produce DeliverLocal");

    assert_eq!(*got_dest, dest);
    assert_eq!(&**got_raw, packet.raw.as_slice());
    assert_eq!(*got_hash, packet.packet_hash);
    assert_eq!(*got_iface, InterfaceId(1));
}
2792
/// A PLAIN broadcast arriving on a local-client interface must yield a
/// `ForwardPlainBroadcast` action with `to_local == false`.
#[test]
fn test_plain_broadcast_from_local_client() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_local_client_interface(1));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xBB; 16];
    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();

    // Interface 1 is the local client.
    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    let forwarded_outward = actions.iter().any(|a| {
        matches!(
            a,
            TransportAction::ForwardPlainBroadcast {
                to_local: false,
                ..
            }
        )
    });
    assert!(forwarded_outward, "Should forward to external interfaces");
}
2826
/// A PLAIN broadcast arriving on a non-local-client interface must yield a
/// `ForwardPlainBroadcast` action with `to_local == true`.
#[test]
fn test_plain_broadcast_from_external() {
    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_local_client_interface(1));
    engine.register_interface(make_interface(2, constants::MODE_FULL));

    let dest = [0xCC; 16];
    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();

    // Interface 2 is the ordinary (non-local-client) interface.
    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(2), 1000.0, &mut rng);

    let forwarded_inward = actions
        .iter()
        .any(|a| matches!(a, TransportAction::ForwardPlainBroadcast { to_local: true, .. }));
    assert!(forwarded_inward, "Should forward to local clients");
}
2857
/// With only ordinary interfaces registered, a PLAIN broadcast must not
/// produce any `ForwardPlainBroadcast` action.
#[test]
fn test_no_plain_broadcast_bridging_without_local_clients() {
    let mut engine = TransportEngine::new(make_config(false));
    for id in [1, 2] {
        engine.register_interface(make_interface(id, constants::MODE_FULL));
    }

    let dest = [0xDD; 16];
    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_PLAIN,
            packet_type: constants::PACKET_TYPE_DATA,
        },
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"test",
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // Every emitted action must be something other than ForwardPlainBroadcast.
    let nothing_bridged = actions
        .iter()
        .all(|a| !matches!(a, TransportAction::ForwardPlainBroadcast { .. }));
    assert!(nothing_bridged, "No bridging without local clients");
}
2885
/// An announce received on an ordinary interface while a local client is
/// registered must produce `ForwardToLocalClients`, with `exclude` set to the
/// receiving interface.
#[test]
fn test_announce_forwarded_to_local_clients() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));
    engine.register_interface(make_local_client_interface(2));

    // Deterministic identity plus a packed announce payload for it.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x77; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("test", &["fwd"], Some(identity.hash()));
    let name_h = name_hash("test", &["fwd"]);
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &[0x42u8; 10], None, None).unwrap();

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // Grab the `exclude` field of the first ForwardToLocalClients action.
    let excluded = actions.iter().find_map(|a| match a {
        TransportAction::ForwardToLocalClients { exclude, .. } => Some(exclude),
        _ => None,
    });
    assert!(
        excluded.is_some(),
        "Should forward announce to local clients"
    );

    // The interface the announce came in on must be the excluded one.
    assert_eq!(*excluded.unwrap(), Some(InterfaceId(1)));
}
2941
/// With no local-client interface registered, handling an announce must not
/// emit any `ForwardToLocalClients` action.
#[test]
fn test_no_announce_forward_without_local_clients() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut engine = TransportEngine::new(make_config(false));
    engine.register_interface(make_interface(1, constants::MODE_FULL));

    // Deterministic identity plus a packed announce payload for it.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x88; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("test", &["nofwd"], Some(identity.hash()));
    let name_h = name_hash("test", &["nofwd"]);
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &[0x42u8; 10], None, None).unwrap();

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0x22; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // Every emitted action must be something other than ForwardToLocalClients.
    let nothing_forwarded = actions
        .iter()
        .all(|a| !matches!(a, TransportAction::ForwardToLocalClients { .. }));
    assert!(nothing_forwarded, "No forward without local clients");
}
2985
/// With two local clients registered, an announce arriving from client 1
/// must be forwarded to local clients with client 1 listed as `exclude`.
#[test]
fn test_local_client_exclude_from_forward() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let mut engine = TransportEngine::new(make_config(false));
    for id in [1, 2] {
        engine.register_interface(make_local_client_interface(id));
    }

    // Deterministic identity plus a packed announce payload for it.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0x99; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("test", &["excl"], Some(identity.hash()));
    let name_h = name_hash("test", &["excl"]);
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &[0x42u8; 10], None, None).unwrap();

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    // The announce enters through local client 1.
    let mut rng = rns_crypto::FixedRng::new(&[0x33; 32]);
    let actions = engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // Grab the `exclude` field of the first ForwardToLocalClients action.
    let excluded = actions.iter().find_map(|a| match a {
        TransportAction::ForwardToLocalClients { exclude, .. } => Some(exclude),
        _ => None,
    });
    assert!(excluded.is_some());

    // The sender itself must be the excluded interface.
    assert_eq!(*excluded.unwrap(), Some(InterfaceId(1)));
}
3037
3038    // =========================================================================
3039    // Phase 7d: Tunnel tests
3040    // =========================================================================
3041
3042    fn make_tunnel_interface(id: u64) -> InterfaceInfo {
3043        InterfaceInfo {
3044            id: InterfaceId(id),
3045            name: String::from("tunnel_iface"),
3046            mode: constants::MODE_FULL,
3047            out_capable: true,
3048            in_capable: true,
3049            bitrate: None,
3050            airtime_profile: None,
3051            announce_rate_target: None,
3052            announce_rate_grace: 0,
3053            announce_rate_penalty: 0.0,
3054            announce_cap: constants::ANNOUNCE_CAP,
3055            is_local_client: false,
3056            wants_tunnel: true,
3057            tunnel_id: None,
3058            mtu: constants::MTU as u32,
3059            ingress_control: crate::transport::types::IngressControlConfig::disabled(),
3060            ia_freq: 0.0,
3061            started: 0.0,
3062        }
3063    }
3064
/// Handling a previously unseen tunnel id must emit `TunnelEstablished`,
/// stamp the id onto the interface, and add one entry to the tunnel table.
#[test]
fn test_handle_tunnel_new() {
    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_tunnel_interface(1));

    let tunnel_id = [0xAA; 32];
    let actions = engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    // 1. The establishment is reported.
    let established = actions
        .iter()
        .any(|a| matches!(a, TransportAction::TunnelEstablished { .. }));
    assert!(established);

    // 2. The interface now carries the tunnel id.
    let registered = engine.interface_info(&InterfaceId(1)).unwrap();
    assert_eq!(registered.tunnel_id, Some(tunnel_id));

    // 3. Exactly one tunnel is tracked.
    let tracked = engine.tunnel_table().len();
    assert_eq!(tracked, 1);
}
3085
/// An announce received on an interface bound to a tunnel must be recorded
/// both in the path table and among that tunnel's stored paths.
#[test]
fn test_announce_stores_tunnel_path() {
    use crate::announce::AnnounceData;
    use crate::destination::{destination_hash, name_hash};

    let tunnel_id = [0xBB; 32];

    // Register a tunnel interface that already carries the tunnel id.
    let mut engine = TransportEngine::new(make_config(false));
    let mut tunnel_iface = make_tunnel_interface(1);
    tunnel_iface.tunnel_id = Some(tunnel_id);
    engine.register_interface(tunnel_iface);

    // Make sure the tunnel table has an entry for it.
    engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    // Deterministic identity plus a packed announce payload for it.
    let mut identity_rng = rns_crypto::FixedRng::new(&[0xCC; 32]);
    let identity = rns_crypto::identity::Identity::new(&mut identity_rng);
    let dest_hash = destination_hash("test", &["tunnel"], Some(identity.hash()));
    let name_h = name_hash("test", &["tunnel"]);
    let (announce_data, _) =
        AnnounceData::pack(&identity, &dest_hash, &name_h, &[0x42u8; 10], None, None).unwrap();

    let packet = RawPacket::pack(
        PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_SINGLE,
            packet_type: constants::PACKET_TYPE_ANNOUNCE,
        },
        0,
        &dest_hash,
        None,
        constants::CONTEXT_NONE,
        &announce_data,
    )
    .unwrap();

    let mut rng = rns_crypto::FixedRng::new(&[0xDD; 32]);
    engine.handle_inbound(&packet.raw, InterfaceId(1), 1000.0, &mut rng);

    // The regular path table learned the destination...
    assert!(engine.has_path(&dest_hash));

    // ...and so did the tunnel's own path store, as its only entry.
    let tunnel_paths = &engine.tunnel_table().get(&tunnel_id).unwrap().paths;
    assert_eq!(tunnel_paths.len(), 1);
    assert!(tunnel_paths.contains_key(&dest_hash));
}
3138
/// After a tunnel's interface is voided and the path is dropped from the
/// path table, re-handling the tunnel on another interface must restore the
/// stored path, bound to the new interface, and emit `TunnelEstablished`.
#[test]
fn test_tunnel_reattach_restores_paths() {
    let tunnel_id = [0xCC; 32];
    let dest = [0xDD; 16];

    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_tunnel_interface(1));
    engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);

    // Store a path directly in the tunnel's own path store.
    let stored_path = tunnel::TunnelPath {
        timestamp: 1000.0,
        received_from: [0xEE; 16],
        hops: 3,
        expires: 1000.0 + constants::DESTINATION_TIMEOUT,
        random_blobs: Vec::new(),
        packet_hash: [0xFF; 32],
    };
    engine.tunnel_table.store_tunnel_path(
        &tunnel_id,
        dest,
        stored_path,
        1000.0,
        constants::DESTINATION_TIMEOUT,
        usize::MAX,
    );

    // Disconnect: the tunnel loses its interface.
    engine.void_tunnel_interface(&tunnel_id);

    // Drop the destination from the path table, as if it had expired.
    engine.path_table.remove(&dest);
    assert!(!engine.has_path(&dest));

    // Reconnect the same tunnel through a different interface.
    engine.register_interface(make_interface(2, constants::MODE_FULL));
    let reattach_actions = engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);

    // The path is back, with the stored hop count and the new interface.
    assert!(engine.has_path(&dest));
    let restored = engine.path_table.get(&dest).unwrap().primary().unwrap();
    assert_eq!(restored.hops, 3);
    assert_eq!(restored.receiving_interface, InterfaceId(2));

    // The reattachment is reported as an established tunnel.
    let established = reattach_actions
        .iter()
        .any(|a| matches!(a, TransportAction::TunnelEstablished { .. }));
    assert!(established);
}
3187
/// If the path table already holds an entry built from a newer random blob
/// than the one stored with the tunnel, reattaching the tunnel must leave
/// that path-table entry untouched.
#[test]
fn test_tunnel_reattach_does_not_overwrite_newer_path() {
    let mut engine = TransportEngine::new(make_config(true));
    engine.register_interface(make_tunnel_interface(1));

    let tunnel_id = [0xCD; 32];
    let dest = [0xDE; 16];
    let older_blob = make_random_blob(100);
    let newer_blob = make_random_blob(200);

    // Tunnel-side path, carrying the older blob.
    engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
    let tunnel_side = tunnel::TunnelPath {
        timestamp: 1000.0,
        received_from: [0xEE; 16],
        hops: 2,
        expires: 1000.0 + constants::DESTINATION_TIMEOUT,
        random_blobs: vec![older_blob],
        packet_hash: [0x11; 32],
    };
    engine.tunnel_table.store_tunnel_path(
        &tunnel_id,
        dest,
        tunnel_side,
        1000.0,
        constants::DESTINATION_TIMEOUT,
        usize::MAX,
    );
    engine.void_tunnel_interface(&tunnel_id);

    // Path-table entry for the same destination, carrying the newer blob.
    let table_side = PathEntry {
        timestamp: 1500.0,
        next_hop: [0xAB; 16],
        hops: 3,
        expires: 1500.0 + constants::DESTINATION_TIMEOUT,
        random_blobs: vec![newer_blob],
        receiving_interface: InterfaceId(3),
        packet_hash: [0x22; 32],
        announce_raw: None,
    };
    engine
        .path_table
        .insert(dest, PathSet::from_single(table_side, 1));

    // Reattach the tunnel on a different interface.
    engine.register_interface(make_interface(2, constants::MODE_FULL));
    engine.handle_tunnel(tunnel_id, InterfaceId(2), 2000.0);

    // The primary path must still be the path-table entry inserted above.
    let primary = engine.path_table.get(&dest).unwrap().primary().unwrap();
    assert_eq!(primary.next_hop, [0xAB; 16]);
    assert_eq!(primary.hops, 3);
    assert_eq!(primary.receiving_interface, InterfaceId(3));
    assert_eq!(primary.random_blobs, vec![newer_blob]);
}
3242
3243    #[test]
3244    fn test_void_tunnel_interface() {
3245        let mut engine = TransportEngine::new(make_config(true));
3246        engine.register_interface(make_tunnel_interface(1));
3247
3248        let tunnel_id = [0xDD; 32];
3249        engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3250
3251        // Verify tunnel has interface
3252        assert_eq!(
3253            engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3254            Some(InterfaceId(1))
3255        );
3256
3257        engine.void_tunnel_interface(&tunnel_id);
3258
3259        // Interface voided, but tunnel still exists
3260        assert_eq!(engine.tunnel_table().len(), 1);
3261        assert_eq!(
3262            engine.tunnel_table().get(&tunnel_id).unwrap().interface,
3263            None
3264        );
3265    }
3266
3267    #[test]
3268    fn test_tick_culls_tunnels() {
3269        let mut engine = TransportEngine::new(make_config(true));
3270        engine.register_interface(make_tunnel_interface(1));
3271
3272        let tunnel_id = [0xEE; 32];
3273        engine.handle_tunnel(tunnel_id, InterfaceId(1), 1000.0);
3274        assert_eq!(engine.tunnel_table().len(), 1);
3275
3276        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3277
3278        // Tick past DESTINATION_TIMEOUT + TABLES_CULL_INTERVAL
3279        engine.tick(
3280            1000.0 + constants::DESTINATION_TIMEOUT + constants::TABLES_CULL_INTERVAL + 1.0,
3281            &mut rng,
3282        );
3283
3284        assert_eq!(engine.tunnel_table().len(), 0);
3285    }
3286
3287    #[test]
3288    fn test_synthesize_tunnel() {
3289        let mut engine = TransportEngine::new(make_config(true));
3290        engine.register_interface(make_tunnel_interface(1));
3291
3292        let identity =
3293            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0xFF; 32]));
3294        let mut rng = rns_crypto::FixedRng::new(&[0x11; 32]);
3295
3296        let actions = engine.synthesize_tunnel(&identity, InterfaceId(1), &mut rng);
3297
3298        // Should produce a TunnelSynthesize action
3299        assert_eq!(actions.len(), 1);
3300        match &actions[0] {
3301            TransportAction::TunnelSynthesize {
3302                interface,
3303                data,
3304                dest_hash,
3305            } => {
3306                assert_eq!(*interface, InterfaceId(1));
3307                assert_eq!(data.len(), tunnel::TUNNEL_SYNTH_LENGTH);
3308                // dest_hash should be the tunnel.synthesize plain destination
3309                let expected_dest = crate::destination::destination_hash(
3310                    "rnstransport",
3311                    &["tunnel", "synthesize"],
3312                    None,
3313                );
3314                assert_eq!(*dest_hash, expected_dest);
3315            }
3316            _ => panic!("Expected TunnelSynthesize"),
3317        }
3318    }
3319
3320    // =========================================================================
3321    // DISCOVER_PATHS_FOR tests
3322    // =========================================================================
3323
3324    fn make_path_request_data(dest_hash: &[u8; 16], tag: &[u8]) -> Vec<u8> {
3325        let mut data = Vec::new();
3326        data.extend_from_slice(dest_hash);
3327        data.extend_from_slice(tag);
3328        data
3329    }
3330
3331    #[test]
3332    fn test_path_request_forwarded_on_ap() {
3333        let mut engine = TransportEngine::new(make_config(true));
3334        engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3335        engine.register_interface(make_interface(2, constants::MODE_FULL));
3336
3337        let dest = [0xD1; 16];
3338        let tag = [0x01; 16];
3339        let data = make_path_request_data(&dest, &tag);
3340
3341        let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3342
3343        // Should forward the path request on interface 2 (the other OUT interface)
3344        assert_eq!(actions.len(), 1);
3345        match &actions[0] {
3346            TransportAction::SendOnInterface { interface, .. } => {
3347                assert_eq!(*interface, InterfaceId(2));
3348            }
3349            _ => panic!("Expected SendOnInterface for forwarded path request"),
3350        }
3351        // Should have stored a discovery path request
3352        assert!(engine.discovery_path_requests.contains_key(&dest));
3353    }
3354
3355    #[test]
3356    fn test_path_request_not_forwarded_on_full() {
3357        let mut engine = TransportEngine::new(make_config(true));
3358        engine.register_interface(make_interface(1, constants::MODE_FULL));
3359        engine.register_interface(make_interface(2, constants::MODE_FULL));
3360
3361        let dest = [0xD2; 16];
3362        let tag = [0x02; 16];
3363        let data = make_path_request_data(&dest, &tag);
3364
3365        let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3366
3367        // MODE_FULL is not in DISCOVER_PATHS_FOR, so no forwarding
3368        assert!(actions.is_empty());
3369        assert!(!engine.discovery_path_requests.contains_key(&dest));
3370    }
3371
3372    #[test]
3373    fn test_duplicate_discovery_path_request_is_suppressed() {
3374        let mut engine = TransportEngine::new(make_config(true));
3375        engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3376        engine.register_interface(make_interface(2, constants::MODE_FULL));
3377
3378        let dest = [0xD7; 16];
3379        let tag = [0x07; 16];
3380        let data = make_path_request_data(&dest, &tag);
3381
3382        let first = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3383        let second = engine.handle_path_request(&data, InterfaceId(1), 1001.0);
3384
3385        assert_eq!(first.len(), 1);
3386        assert!(
3387            second.is_empty(),
3388            "duplicate discovery request should be dropped"
3389        );
3390        assert_eq!(engine.discovery_pr_tags_count(), 1);
3391    }
3392
3393    #[test]
3394    fn test_discovery_pr_tags_fifo_eviction() {
3395        let mut config = make_config(true);
3396        config.max_discovery_pr_tags = 2;
3397        let mut engine = TransportEngine::new(config);
3398
3399        let dest1 = [0xA1; 16];
3400        let dest2 = [0xA2; 16];
3401        let dest3 = [0xA3; 16];
3402        let tag1 = [0x01; 16];
3403        let tag2 = [0x02; 16];
3404        let tag3 = [0x03; 16];
3405
3406        engine.handle_path_request(
3407            &make_path_request_data(&dest1, &tag1),
3408            InterfaceId(1),
3409            1000.0,
3410        );
3411        engine.handle_path_request(
3412            &make_path_request_data(&dest2, &tag2),
3413            InterfaceId(1),
3414            1001.0,
3415        );
3416        assert_eq!(engine.discovery_pr_tags_count(), 2);
3417
3418        let unique1 = make_unique_tag(dest1, &tag1);
3419        let unique2 = make_unique_tag(dest2, &tag2);
3420        assert!(engine.has_discovery_pr_tag(&unique1));
3421        assert!(engine.has_discovery_pr_tag(&unique2));
3422
3423        engine.handle_path_request(
3424            &make_path_request_data(&dest3, &tag3),
3425            InterfaceId(1),
3426            1002.0,
3427        );
3428        assert_eq!(engine.discovery_pr_tags_count(), 2);
3429        assert!(!engine.has_discovery_pr_tag(&unique1));
3430        assert!(engine.has_discovery_pr_tag(&unique2));
3431
3432        engine.handle_path_request(
3433            &make_path_request_data(&dest1, &tag1),
3434            InterfaceId(1),
3435            1003.0,
3436        );
3437        assert_eq!(engine.discovery_pr_tags_count(), 2);
3438        assert!(engine.has_discovery_pr_tag(&unique1));
3439    }
3440
3441    #[test]
3442    fn test_path_destination_cap_evicts_oldest_and_clears_state() {
3443        let mut config = make_config(false);
3444        config.max_path_destinations = 2;
3445        let mut engine = TransportEngine::new(config);
3446        engine.register_interface(make_interface(1, constants::MODE_FULL));
3447
3448        let dest1 = [0xB1; 16];
3449        let dest2 = [0xB2; 16];
3450        let dest3 = [0xB3; 16];
3451
3452        engine.upsert_path_destination(
3453            dest1,
3454            make_path_entry(1000.0, 1, InterfaceId(1), [0x11; 16]),
3455            1000.0,
3456        );
3457        engine.upsert_path_destination(
3458            dest2,
3459            make_path_entry(1001.0, 1, InterfaceId(1), [0x22; 16]),
3460            1001.0,
3461        );
3462        engine
3463            .path_states
3464            .insert(dest1, constants::STATE_UNRESPONSIVE);
3465
3466        engine.upsert_path_destination(
3467            dest3,
3468            make_path_entry(1002.0, 1, InterfaceId(1), [0x33; 16]),
3469            1002.0,
3470        );
3471
3472        assert_eq!(engine.path_table_count(), 2);
3473        assert!(!engine.has_path(&dest1));
3474        assert!(engine.has_path(&dest2));
3475        assert!(engine.has_path(&dest3));
3476        assert!(!engine.path_states.contains_key(&dest1));
3477        assert_eq!(engine.path_destination_cap_evict_count(), 1);
3478    }
3479
3480    #[test]
3481    fn test_existing_path_destination_update_does_not_trigger_cap_eviction() {
3482        let mut config = make_config(false);
3483        config.max_path_destinations = 2;
3484        config.max_paths_per_destination = 2;
3485        let mut engine = TransportEngine::new(config);
3486        engine.register_interface(make_interface(1, constants::MODE_FULL));
3487
3488        let dest1 = [0xC1; 16];
3489        let dest2 = [0xC2; 16];
3490
3491        engine.upsert_path_destination(
3492            dest1,
3493            make_path_entry(1000.0, 2, InterfaceId(1), [0x11; 16]),
3494            1000.0,
3495        );
3496        engine.upsert_path_destination(
3497            dest2,
3498            make_path_entry(1001.0, 2, InterfaceId(1), [0x22; 16]),
3499            1001.0,
3500        );
3501
3502        engine.upsert_path_destination(
3503            dest2,
3504            make_path_entry(1002.0, 1, InterfaceId(1), [0x23; 16]),
3505            1002.0,
3506        );
3507
3508        assert_eq!(engine.path_table_count(), 2);
3509        assert!(engine.has_path(&dest1));
3510        assert!(engine.has_path(&dest2));
3511    }
3512
3513    #[test]
3514    fn test_roaming_loop_prevention() {
3515        let mut engine = TransportEngine::new(make_config(true));
3516        engine.register_interface(make_interface(1, constants::MODE_ROAMING));
3517
3518        let dest = [0xD3; 16];
3519        // Path is known and routes through the same interface (1)
3520        engine.path_table.insert(
3521            dest,
3522            PathSet::from_single(
3523                PathEntry {
3524                    timestamp: 900.0,
3525                    next_hop: [0xAA; 16],
3526                    hops: 2,
3527                    expires: 9999.0,
3528                    random_blobs: Vec::new(),
3529                    receiving_interface: InterfaceId(1),
3530                    packet_hash: [0; 32],
3531                    announce_raw: None,
3532                },
3533                1,
3534            ),
3535        );
3536
3537        let tag = [0x03; 16];
3538        let data = make_path_request_data(&dest, &tag);
3539
3540        let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3541
3542        // ROAMING interface, path next-hop on same interface → loop prevention, no action
3543        assert!(actions.is_empty());
3544        assert!(!engine.announce_table.contains_key(&dest));
3545    }
3546
3547    /// Build a minimal HEADER_1 announce raw packet for testing.
3548    fn make_announce_raw(dest_hash: &[u8; 16], payload: &[u8]) -> Vec<u8> {
3549        // HEADER_1: [flags:1][hops:1][dest:16][context:1][data:*]
3550        // flags: HEADER_1(0) << 6 | context_flag(0) << 5 | TRANSPORT_BROADCAST(0) << 4 | SINGLE(0) << 2 | ANNOUNCE(1)
3551        let flags: u8 = 0x01; // HEADER_1, no context, broadcast, single, announce
3552        let mut raw = Vec::new();
3553        raw.push(flags);
3554        raw.push(0x02); // hops
3555        raw.extend_from_slice(dest_hash);
3556        raw.push(constants::CONTEXT_NONE);
3557        raw.extend_from_slice(payload);
3558        raw
3559    }
3560
3561    #[test]
3562    fn test_path_request_populates_announce_entry_from_raw() {
3563        let mut engine = TransportEngine::new(make_config(true));
3564        engine.register_interface(make_interface(1, constants::MODE_FULL));
3565        engine.register_interface(make_interface(2, constants::MODE_FULL));
3566
3567        let dest = [0xD5; 16];
3568        let payload = vec![0xAB; 32]; // simulated announce data (pubkey, sig, etc.)
3569        let announce_raw = make_announce_raw(&dest, &payload);
3570
3571        engine.path_table.insert(
3572            dest,
3573            PathSet::from_single(
3574                PathEntry {
3575                    timestamp: 900.0,
3576                    next_hop: [0xBB; 16],
3577                    hops: 2,
3578                    expires: 9999.0,
3579                    random_blobs: Vec::new(),
3580                    receiving_interface: InterfaceId(2),
3581                    packet_hash: [0; 32],
3582                    announce_raw: Some(announce_raw.clone()),
3583                },
3584                1,
3585            ),
3586        );
3587
3588        let tag = [0x05; 16];
3589        let data = make_path_request_data(&dest, &tag);
3590        let _actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3591
3592        // The announce table should now have an entry with populated packet_raw/packet_data
3593        let entry = engine
3594            .announce_table
3595            .get(&dest)
3596            .expect("announce entry must exist");
3597        assert_eq!(entry.packet_raw, announce_raw);
3598        assert_eq!(entry.packet_data, payload);
3599        assert!(entry.block_rebroadcasts);
3600    }
3601
3602    #[test]
3603    fn test_path_request_skips_when_no_announce_raw() {
3604        let mut engine = TransportEngine::new(make_config(true));
3605        engine.register_interface(make_interface(1, constants::MODE_FULL));
3606        engine.register_interface(make_interface(2, constants::MODE_FULL));
3607
3608        let dest = [0xD6; 16];
3609
3610        engine.path_table.insert(
3611            dest,
3612            PathSet::from_single(
3613                PathEntry {
3614                    timestamp: 900.0,
3615                    next_hop: [0xCC; 16],
3616                    hops: 1,
3617                    expires: 9999.0,
3618                    random_blobs: Vec::new(),
3619                    receiving_interface: InterfaceId(2),
3620                    packet_hash: [0; 32],
3621                    announce_raw: None, // no raw data available
3622                },
3623                1,
3624            ),
3625        );
3626
3627        let tag = [0x06; 16];
3628        let data = make_path_request_data(&dest, &tag);
3629        let actions = engine.handle_path_request(&data, InterfaceId(1), 1000.0);
3630
3631        // Should NOT create an announce entry without raw data
3632        assert!(actions.is_empty());
3633        assert!(!engine.announce_table.contains_key(&dest));
3634    }
3635
3636    #[test]
3637    fn test_discovery_request_consumed_on_announce() {
3638        let mut engine = TransportEngine::new(make_config(true));
3639        engine.register_interface(make_interface(1, constants::MODE_ACCESS_POINT));
3640
3641        let dest = [0xD4; 16];
3642
3643        // Simulate a waiting discovery request
3644        engine.discovery_path_requests.insert(
3645            dest,
3646            DiscoveryPathRequest {
3647                timestamp: 900.0,
3648                requesting_interface: InterfaceId(1),
3649            },
3650        );
3651
3652        // Consume it
3653        let iface = engine.discovery_path_requests_waiting(&dest);
3654        assert_eq!(iface, Some(InterfaceId(1)));
3655
3656        // Should be gone now
3657        assert!(!engine.discovery_path_requests.contains_key(&dest));
3658        assert_eq!(engine.discovery_path_requests_waiting(&dest), None);
3659    }
3660
3661    #[test]
3662    fn test_pending_path_request_announce_bypasses_ingress_control() {
3663        let mut engine = TransportEngine::new(make_config(true));
3664        let mut inbound = make_interface(1, constants::MODE_FULL);
3665        inbound.ingress_control = crate::transport::types::IngressControlConfig::enabled();
3666        inbound.ia_freq = constants::IC_BURST_FREQ + 1.0;
3667        inbound.started = 0.0;
3668        engine.register_interface(inbound);
3669        engine.register_interface(make_interface(2, constants::MODE_ACCESS_POINT));
3670
3671        let identity =
3672            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3673        let dest_hash = crate::destination::destination_hash(
3674            "ingress",
3675            &["path-request"],
3676            Some(identity.hash()),
3677        );
3678        let name_hash = crate::destination::name_hash("ingress", &["path-request"]);
3679        let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
3680
3681        engine.discovery_path_requests.insert(
3682            dest_hash,
3683            DiscoveryPathRequest {
3684                timestamp: 999.0,
3685                requesting_interface: InterfaceId(2),
3686            },
3687        );
3688
3689        let mut rng = rns_crypto::FixedRng::new(&[0x88; 32]);
3690        let actions = engine.handle_inbound(&announce_raw, InterfaceId(1), 1000.0, &mut rng);
3691
3692        assert_eq!(engine.held_announce_count(&InterfaceId(1)), 0);
3693        assert!(engine.has_path(&dest_hash));
3694        assert!(!engine.discovery_path_requests.contains_key(&dest_hash));
3695        assert!(actions.iter().any(|a| {
3696            matches!(
3697                a,
3698                TransportAction::AnnounceReceived {
3699                    destination_hash,
3700                    receiving_interface: InterfaceId(1),
3701                    ..
3702                } if *destination_hash == dest_hash
3703            )
3704        }));
3705
3706        let entry = engine
3707            .announce_table
3708            .get(&dest_hash)
3709            .expect("path response announce should be queued");
3710        assert!(entry.block_rebroadcasts);
3711        assert_eq!(entry.attached_interface, Some(InterfaceId(2)));
3712    }
3713
3714    // =========================================================================
3715    // Issue #4: Shared instance client 1-hop transport injection
3716    // =========================================================================
3717
3718    /// Helper: build a valid announce packet for use in issue #4 tests.
3719    fn build_announce_for_issue4(dest_hash: &[u8; 16], name_hash: &[u8; 10]) -> Vec<u8> {
3720        let identity =
3721            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3722        let random_hash = [0x42u8; 10];
3723        let (announce_data, _) = crate::announce::AnnounceData::pack(
3724            &identity,
3725            dest_hash,
3726            name_hash,
3727            &random_hash,
3728            None,
3729            None,
3730        )
3731        .unwrap();
3732        let flags = PacketFlags {
3733            header_type: constants::HEADER_1,
3734            context_flag: constants::FLAG_UNSET,
3735            transport_type: constants::TRANSPORT_BROADCAST,
3736            destination_type: constants::DESTINATION_SINGLE,
3737            packet_type: constants::PACKET_TYPE_ANNOUNCE,
3738        };
3739        RawPacket::pack(
3740            flags,
3741            0,
3742            dest_hash,
3743            None,
3744            constants::CONTEXT_NONE,
3745            &announce_data,
3746        )
3747        .unwrap()
3748        .raw
3749    }
3750
3751    #[test]
3752    fn test_issue4_local_client_single_data_to_1hop_rewrites_on_outbound() {
3753        // Shared clients learn remote paths via their local shared-instance
3754        // interface and must inject transport headers on outbound when the
3755        // destination is exactly 1 hop away behind the daemon.
3756
3757        let mut engine = TransportEngine::new(make_config(false));
3758        engine.register_interface(make_local_client_interface(1));
3759
3760        let identity =
3761            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3762        let dest_hash =
3763            crate::destination::destination_hash("issue4", &["test"], Some(identity.hash()));
3764        let name_hash = crate::destination::name_hash("issue4", &["test"]);
3765        let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
3766
3767        // Model the announce as already forwarded by the shared daemon to
3768        // the local client. The raw hop count is 1 so that after the local
3769        // client hop compensation the learned path remains 1 hop away.
3770        let mut announce_packet = RawPacket::unpack(&announce_raw).unwrap();
3771        announce_packet.raw[1] = 1;
3772        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3773        engine.handle_inbound(&announce_packet.raw, InterfaceId(1), 1000.0, &mut rng);
3774        assert!(engine.has_path(&dest_hash));
3775        assert_eq!(engine.hops_to(&dest_hash), Some(1));
3776
3777        // Build DATA from the shared client to the 1-hop destination.
3778        let data_flags = PacketFlags {
3779            header_type: constants::HEADER_1,
3780            context_flag: constants::FLAG_UNSET,
3781            transport_type: constants::TRANSPORT_BROADCAST,
3782            destination_type: constants::DESTINATION_SINGLE,
3783            packet_type: constants::PACKET_TYPE_DATA,
3784        };
3785        let data_packet = RawPacket::pack(
3786            data_flags,
3787            0,
3788            &dest_hash,
3789            None,
3790            constants::CONTEXT_NONE,
3791            b"hello",
3792        )
3793        .unwrap();
3794
3795        let actions =
3796            engine.handle_outbound(&data_packet, constants::DESTINATION_SINGLE, None, 1001.0);
3797
3798        let send = actions.iter().find_map(|a| match a {
3799            TransportAction::SendOnInterface { interface, raw } => Some((interface, raw)),
3800            _ => None,
3801        });
3802        let (interface, raw) = send.expect("shared client should emit a transport-injected packet");
3803        assert_eq!(*interface, InterfaceId(1));
3804        let flags = PacketFlags::unpack(raw[0]);
3805        assert_eq!(flags.header_type, constants::HEADER_2);
3806        assert_eq!(flags.transport_type, constants::TRANSPORT_TRANSPORT);
3807    }
3808
3809    #[test]
3810    fn test_issue4_external_data_to_1hop_via_transport_works() {
3811        // Control test: when a DATA packet arrives from an external interface
3812        // with HEADER_2 and the daemon's transport_id, the daemon correctly
3813        // forwards it via step 5.  This proves the multi-hop path works;
3814        // it's only the 1-hop shared-client case that's broken.
3815
3816        let daemon_id = [0x42; 16];
3817        let mut engine = TransportEngine::new(TransportConfig {
3818            transport_enabled: true,
3819            identity_hash: Some(daemon_id),
3820            prefer_shorter_path: false,
3821            max_paths_per_destination: 1,
3822            packet_hashlist_max_entries: constants::HASHLIST_MAXSIZE,
3823            max_discovery_pr_tags: constants::MAX_PR_TAGS,
3824            max_path_destinations: usize::MAX,
3825            max_tunnel_destinations_total: usize::MAX,
3826            destination_timeout_secs: constants::DESTINATION_TIMEOUT,
3827            announce_table_ttl_secs: constants::ANNOUNCE_TABLE_TTL,
3828            announce_table_max_bytes: constants::ANNOUNCE_TABLE_MAX_BYTES,
3829            announce_sig_cache_enabled: true,
3830            announce_sig_cache_max_entries: constants::ANNOUNCE_SIG_CACHE_MAXSIZE,
3831            announce_sig_cache_ttl_secs: constants::ANNOUNCE_SIG_CACHE_TTL,
3832            announce_queue_max_entries: 256,
3833            announce_queue_max_interfaces: 1024,
3834        });
3835        engine.register_interface(make_interface(1, constants::MODE_FULL)); // inbound
3836        engine.register_interface(make_interface(2, constants::MODE_FULL)); // outbound to Bob
3837
3838        let identity =
3839            rns_crypto::identity::Identity::new(&mut rns_crypto::FixedRng::new(&[0x99; 32]));
3840        let dest_hash =
3841            crate::destination::destination_hash("issue4", &["ctrl"], Some(identity.hash()));
3842        let name_hash = crate::destination::name_hash("issue4", &["ctrl"]);
3843        let announce_raw = build_announce_for_issue4(&dest_hash, &name_hash);
3844
3845        // Feed announce from interface 2 (Bob's side), hops=0 → stored as hops=1
3846        let mut rng = rns_crypto::FixedRng::new(&[0; 32]);
3847        engine.handle_inbound(&announce_raw, InterfaceId(2), 1000.0, &mut rng);
3848        assert_eq!(engine.hops_to(&dest_hash), Some(1));
3849
3850        // Now send a HEADER_2 transport packet addressed to the daemon
3851        // (simulating what Alice would send in a multi-hop scenario)
3852        let h2_flags = PacketFlags {
3853            header_type: constants::HEADER_2,
3854            context_flag: constants::FLAG_UNSET,
3855            transport_type: constants::TRANSPORT_TRANSPORT,
3856            destination_type: constants::DESTINATION_SINGLE,
3857            packet_type: constants::PACKET_TYPE_DATA,
3858        };
3859        // Build HEADER_2 manually: [flags, hops, transport_id(16), dest_hash(16), context, data...]
3860        let mut h2_raw = Vec::new();
3861        h2_raw.push(h2_flags.pack());
3862        h2_raw.push(0); // hops
3863        h2_raw.extend_from_slice(&daemon_id); // transport_id = daemon
3864        h2_raw.extend_from_slice(&dest_hash);
3865        h2_raw.push(constants::CONTEXT_NONE);
3866        h2_raw.extend_from_slice(b"hello via transport");
3867
3868        let mut rng2 = rns_crypto::FixedRng::new(&[0x22; 32]);
3869        let actions = engine.handle_inbound(&h2_raw, InterfaceId(1), 1001.0, &mut rng2);
3870
3871        // This SHOULD forward via step 5 (transport forwarding)
3872        let has_send = actions.iter().any(|a| {
3873            matches!(
3874                a,
3875                TransportAction::SendOnInterface { interface, .. } if *interface == InterfaceId(2)
3876            )
3877        });
3878        assert!(
3879            has_send,
3880            "HEADER_2 transport packet should be forwarded (control test)"
3881        );
3882    }
3883}