Skip to main content

rns_net/common/
link_manager.rs

1//! Link manager: wires rns-core LinkEngine + Channel + Resource into the driver.
2//!
3//! Manages multiple concurrent links, link destination registration,
4//! request/response handling, resource transfers, and full lifecycle
5//! (handshake → active → teardown).
6//!
7//! Python reference: Link.py, RequestReceipt.py, Resource.py
8
9use std::collections::HashMap;
10
11use super::compressor::Bzip2Compressor;
12use rns_core::channel::{Channel, Sequence};
13use rns_core::constants;
14use rns_core::link::types::{LinkId, LinkState, TeardownReason};
15use rns_core::link::{LinkAction, LinkEngine, LinkMode};
16use rns_core::packet::{PacketFlags, RawPacket};
17use rns_core::resource::{ResourceAction, ResourceReceiver, ResourceSender};
18use rns_crypto::ed25519::Ed25519PrivateKey;
19use rns_crypto::Rng;
20
21use super::time;
22
/// Resource acceptance strategy.
///
/// The default strategy is [`ResourceStrategy::AcceptNone`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ResourceStrategy {
    /// Reject all incoming resources.
    #[default]
    AcceptNone,
    /// Accept all incoming resources automatically.
    AcceptAll,
    /// Query the application callback for each resource.
    AcceptApp,
}
39
/// A managed link wrapping LinkEngine + optional Channel + resources.
struct ManagedLink {
    /// Core link engine for this link (queried for state, RTT, MTU and link id).
    engine: LinkEngine,
    /// Channel attached to this link; `None` until one has been created.
    channel: Option<Channel>,
    /// Channel packets awaiting proof: tracked 32-byte packet hash -> channel sequence.
    pending_channel_packets: HashMap<[u8; 32], Sequence>,
    // Channel send counters. NOTE(review): presumably one counter per send
    // outcome; the code updating them is not visible in this part of the file.
    channel_send_ok: u64,
    channel_send_not_ready: u64,
    channel_send_too_big: u64,
    channel_send_other_error: u64,
    // NOTE(review): presumably the number of channel messages delivered on
    // this link; the update site is not visible here.
    channel_messages_received: u64,
    /// Number of packet proofs built for this link.
    channel_proofs_sent: u64,
    /// Number of received proofs that matched a pending channel packet.
    channel_proofs_received: u64,
    /// Destination hash this link belongs to.
    dest_hash: [u8; 16],
    /// Remote identity (hash, public_key) once identified.
    remote_identity: Option<([u8; 16], [u8; 64])>,
    /// Destination's Ed25519 signing public key (for initiator to verify LRPROOF).
    dest_sig_pub_bytes: Option<[u8; 32]>,
    /// Active incoming resource transfers.
    incoming_resources: Vec<ResourceReceiver>,
    /// Active outgoing resource transfers.
    outgoing_resources: Vec<ResourceSender>,
    /// Logical incoming split transfers, keyed by original resource hash.
    incoming_splits: HashMap<[u8; 32], IncomingSplitTransfer>,
    /// Logical outgoing split transfers, keyed by original resource hash.
    outgoing_splits: HashMap<[u8; 32], OutgoingSplitTransfer>,
    /// Resource acceptance strategy.
    resource_strategy: ResourceStrategy,
    /// Interface this link's packets should be sent on when known.
    route_interface: Option<rns_core::transport::types::InterfaceId>,
    /// Next-hop transport ID seen on inbound HEADER_2 link traffic.
    ///
    /// When present, outbound link packets can be rewritten to HEADER_2 using
    /// this transport ID to preserve multi-hop routing.
    route_transport_id: Option<[u8; 16]>,
}
76
/// Bookkeeping for one logical incoming resource that arrives as several segments.
struct IncomingSplitTransfer {
    /// Total number of segments in the logical transfer.
    total_segments: u64,
    /// NOTE(review): presumably the number of fully received segments; the
    /// update site is not visible in this part of the file.
    completed_segments: u64,
    /// Segment currently in flight. Progress reporting counts
    /// `current_segment_index - 1` segments as already complete.
    current_segment_index: u64,
    /// Parts received so far for the current segment (progress numerator).
    current_received_parts: usize,
    /// Total parts of the current segment (progress denominator).
    current_total_parts: usize,
    /// NOTE(review): presumably the payload accumulated across segments — confirm.
    data: Vec<u8>,
    /// Optional metadata carried with the transfer.
    metadata: Option<Vec<u8>>,
    /// NOTE(review): presumably marks a transfer that carries a request response — confirm.
    is_response: bool,
}
87
/// Bookkeeping for one logical outgoing resource that is sent as several segments.
struct OutgoingSplitTransfer {
    /// Total number of segments in the logical transfer.
    total_segments: u64,
    /// NOTE(review): presumably the number of fully sent segments; the update
    /// site is not visible in this part of the file.
    completed_segments: u64,
    /// Segment currently in flight. Progress reporting counts
    /// `current_segment_index - 1` segments as already complete.
    current_segment_index: u64,
    /// Parts sent so far for the current segment (progress numerator).
    current_sent_parts: usize,
    /// Total parts of the current segment (progress denominator).
    current_total_parts: usize,
}
95
/// A registered link destination that can accept incoming LINKREQUEST.
struct LinkDestination {
    /// Ed25519 signing key of the destination; handed to the responder engine
    /// and used to sign packet proofs.
    sig_prv: Ed25519PrivateKey,
    /// Raw bytes of the matching Ed25519 public key.
    sig_pub_bytes: [u8; 32],
    /// Resource strategy copied onto every link accepted for this destination.
    resource_strategy: ResourceStrategy,
}
102
/// What an application request handler hands back to the link manager.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RequestResponse {
    /// Reply with these bytes as the ordinary request response value.
    Bytes(Vec<u8>),
    /// Reply through a resource transfer, optionally carrying metadata.
    Resource {
        data: Vec<u8>,
        metadata: Option<Vec<u8>>,
        auto_compress: bool,
    },
}

/// A bare byte vector converts into [`RequestResponse::Bytes`].
impl From<Vec<u8>> for RequestResponse {
    fn from(bytes: Vec<u8>) -> Self {
        Self::Bytes(bytes)
    }
}
121
/// A registered request handler for a path.
struct RequestHandlerEntry {
    /// The path this handler serves (e.g. "/status").
    path: String,
    /// The truncated hash of the path (first 16 bytes of SHA-256).
    path_hash: [u8; 16],
    /// Access control: None means allow all, Some(list) means allow only listed identities.
    allowed_list: Option<Vec<[u8; 16]>>,
    /// Handler function: (link_id, path, request_data, remote_identity) -> Option<response>.
    ///
    /// The closure takes exactly these four arguments; no request id is passed to it.
    handler: Box<
        dyn Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<RequestResponse>
            + Send,
    >,
}
136
/// Actions produced by LinkManager for the driver to dispatch.
///
/// The link manager does not perform I/O itself; its methods return lists of
/// these values instead.
#[derive(Debug)]
pub enum LinkManagerAction {
    /// Send a packet via the transport engine outbound path.
    SendPacket {
        /// Fully packed packet bytes.
        raw: Vec<u8>,
        /// Destination type constant (one of `constants::DESTINATION_*`).
        dest_type: u8,
        /// NOTE(review): presumably the interface the packet should be bound
        /// to; `None` leaves the choice to the driver — confirm.
        attached_interface: Option<rns_core::transport::types::InterfaceId>,
    },
    /// Link established — notify callbacks.
    LinkEstablished {
        link_id: LinkId,
        dest_hash: [u8; 16],
        rtt: f64,
        is_initiator: bool,
    },
    /// Link closed — notify callbacks.
    LinkClosed {
        link_id: LinkId,
        reason: Option<TeardownReason>,
    },
    /// Remote peer identified — notify callbacks.
    RemoteIdentified {
        link_id: LinkId,
        identity_hash: [u8; 16],
        public_key: [u8; 64],
    },
    /// Register a link_id as local destination in transport (for receiving link data).
    RegisterLinkDest { link_id: LinkId },
    /// Deregister a link_id from transport local destinations.
    DeregisterLinkDest { link_id: LinkId },
    /// A management request that needs to be handled by the driver.
    /// The driver has access to engine state needed to build the response.
    ManagementRequest {
        link_id: LinkId,
        path_hash: [u8; 16],
        /// The request data (msgpack-encoded Value from the request array).
        data: Vec<u8>,
        /// The request_id (truncated hash of the packed request).
        request_id: [u8; 16],
        remote_identity: Option<([u8; 16], [u8; 64])>,
    },
    /// Resource data fully received and assembled.
    ResourceReceived {
        link_id: LinkId,
        data: Vec<u8>,
        metadata: Option<Vec<u8>>,
    },
    /// Resource transfer completed (proof validated on sender side).
    ResourceCompleted { link_id: LinkId },
    /// Resource transfer failed.
    ResourceFailed { link_id: LinkId, error: String },
    /// Resource transfer progress update.
    ResourceProgress {
        link_id: LinkId,
        received: usize,
        total: usize,
    },
    /// Query application whether to accept an incoming resource (for AcceptApp strategy).
    ResourceAcceptQuery {
        link_id: LinkId,
        resource_hash: Vec<u8>,
        transfer_size: u64,
        has_metadata: bool,
    },
    /// Channel message received on a link.
    ChannelMessageReceived {
        link_id: LinkId,
        msgtype: u16,
        payload: Vec<u8>,
    },
    /// Generic link data received (CONTEXT_NONE).
    LinkDataReceived {
        link_id: LinkId,
        context: u8,
        data: Vec<u8>,
    },
    /// Response received on a link.
    ResponseReceived {
        link_id: LinkId,
        request_id: [u8; 16],
        data: Vec<u8>,
        metadata: Option<Vec<u8>>,
    },
    /// A link request was received (for hook notification).
    LinkRequestReceived {
        link_id: LinkId,
        /// Interface the LINKREQUEST arrived on.
        receiving_interface: rns_core::transport::types::InterfaceId,
    },
}
227
/// Manages multiple links, link destinations, and request/response.
pub struct LinkManager {
    /// All tracked links (pending and established), keyed by link id.
    links: HashMap<LinkId, ManagedLink>,
    /// Destinations that accept incoming LINKREQUESTs, keyed by destination hash.
    link_destinations: HashMap<[u8; 16], LinkDestination>,
    /// Registered request handlers, in registration order.
    request_handlers: Vec<RequestHandlerEntry>,
    /// Path hashes that should be handled externally (by the driver) rather than
    /// by registered handler closures. Used for management destinations.
    management_paths: Vec<[u8; 16]>,
}
237
/// Best-known outbound route for a link, as returned by
/// `LinkManager::get_link_route_hint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LinkRouteHint {
    /// Interface the link's packets should be sent on.
    pub interface: rns_core::transport::types::InterfaceId,
    /// Next-hop transport ID recorded for the link, if any.
    pub transport_id: Option<[u8; 16]>,
}
243
244impl LinkManager {
245    fn resource_sdu_for_link(link: &ManagedLink) -> usize {
246        // Python parity: Resource.sdu = link.mtu - HEADER_MAXSIZE - IFAC_MIN_SIZE
247        // when MTU signalling is available on the link.
248        let mtu = link.engine.mtu() as usize;
249        let derived = mtu.saturating_sub(constants::HEADER_MAXSIZE + constants::IFAC_MIN_SIZE);
250        if derived > 0 {
251            derived
252        } else {
253            constants::RESOURCE_SDU
254        }
255    }
256
257    fn split_progress_parts(
258        segment_index: u64,
259        total_segments: u64,
260        current_done: usize,
261        current_total: usize,
262        sdu: usize,
263    ) -> (usize, usize) {
264        let max_parts_per_segment = constants::RESOURCE_MAX_EFFICIENT_SIZE.div_ceil(sdu.max(1));
265        let total = (total_segments as usize).saturating_mul(max_parts_per_segment);
266        let completed_segments = segment_index.saturating_sub(1) as usize;
267        let completed = completed_segments.saturating_mul(max_parts_per_segment);
268        let current = if current_total == 0 {
269            0
270        } else if current_total < max_parts_per_segment {
271            let scaled =
272                (current_done as f64) * (max_parts_per_segment as f64 / current_total as f64);
273            scaled.floor() as usize
274        } else {
275            current_done
276        };
277        (completed.saturating_add(current).min(total), total)
278    }
279
280    fn resource_hash_key(hash: &[u8]) -> Option<[u8; 32]> {
281        let mut key = [0u8; 32];
282        if hash.len() != key.len() {
283            return None;
284        }
285        key.copy_from_slice(hash);
286        Some(key)
287    }
288
289    fn incoming_split_progress(split: &IncomingSplitTransfer, sdu: usize) -> (usize, usize) {
290        Self::split_progress_parts(
291            split.current_segment_index,
292            split.total_segments,
293            split.current_received_parts,
294            split.current_total_parts,
295            sdu,
296        )
297    }
298
299    fn outgoing_split_progress(split: &OutgoingSplitTransfer, sdu: usize) -> (usize, usize) {
300        Self::split_progress_parts(
301            split.current_segment_index,
302            split.total_segments,
303            split.current_sent_parts,
304            split.current_total_parts,
305            sdu,
306        )
307    }
308
309    /// Create a new empty link manager.
310    pub fn new() -> Self {
311        LinkManager {
312            links: HashMap::new(),
313            link_destinations: HashMap::new(),
314            request_handlers: Vec::new(),
315            management_paths: Vec::new(),
316        }
317    }
318
319    /// Register a path hash as a management path.
320    /// Management requests are returned as ManagementRequest actions
321    /// for the driver to handle (since they need access to engine state).
322    pub fn register_management_path(&mut self, path_hash: [u8; 16]) {
323        if !self.management_paths.contains(&path_hash) {
324            self.management_paths.push(path_hash);
325        }
326    }
327
328    /// Get the derived session key for a link (needed for hole-punch token derivation).
329    pub fn get_derived_key(&self, link_id: &LinkId) -> Option<Vec<u8>> {
330        self.links
331            .get(link_id)
332            .and_then(|link| link.engine.derived_key().map(|dk| dk.to_vec()))
333    }
334
335    /// Return best-known routing hint for link packets.
336    pub fn get_link_route_hint(&self, link_id: &LinkId) -> Option<LinkRouteHint> {
337        self.links.get(link_id).and_then(|link| {
338            link.route_interface.map(|interface| LinkRouteHint {
339                interface,
340                transport_id: link.route_transport_id,
341            })
342        })
343    }
344
345    /// Set the best-known outbound route for a link.
346    pub fn set_link_route_hint(
347        &mut self,
348        link_id: &LinkId,
349        interface: rns_core::transport::types::InterfaceId,
350        transport_id: Option<[u8; 16]>,
351    ) -> bool {
352        let Some(link) = self.links.get_mut(link_id) else {
353            return false;
354        };
355        link.route_interface = Some(interface);
356        link.route_transport_id = transport_id;
357        true
358    }
359
360    /// Register a destination that can accept incoming links.
361    pub fn register_link_destination(
362        &mut self,
363        dest_hash: [u8; 16],
364        sig_prv: Ed25519PrivateKey,
365        sig_pub_bytes: [u8; 32],
366        resource_strategy: ResourceStrategy,
367    ) {
368        self.link_destinations.insert(
369            dest_hash,
370            LinkDestination {
371                sig_prv,
372                sig_pub_bytes,
373                resource_strategy,
374            },
375        );
376    }
377
378    /// Deregister a link destination.
379    pub fn deregister_link_destination(&mut self, dest_hash: &[u8; 16]) {
380        self.link_destinations.remove(dest_hash);
381    }
382
383    /// Register a request handler for a given path.
384    ///
385    /// `path`: the request path string (e.g. "/status")
386    /// `allowed_list`: None = allow all, Some(list) = restrict to these identity hashes
387    /// `handler`: called with (link_id, path, request_data, remote_identity) -> Option<response>
388    pub fn register_request_handler<F>(
389        &mut self,
390        path: &str,
391        allowed_list: Option<Vec<[u8; 16]>>,
392        handler: F,
393    ) where
394        F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>>
395            + Send
396            + 'static,
397    {
398        let path_hash = compute_path_hash(path);
399        self.request_handlers.push(RequestHandlerEntry {
400            path: path.to_string(),
401            path_hash,
402            allowed_list,
403            handler: Box::new(move |link_id, p, data, remote| {
404                handler(link_id, p, data, remote).map(RequestResponse::Bytes)
405            }),
406        });
407    }
408
409    /// Register a request handler that can return resource responses with metadata.
410    pub fn register_request_handler_response<F>(
411        &mut self,
412        path: &str,
413        allowed_list: Option<Vec<[u8; 16]>>,
414        handler: F,
415    ) where
416        F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<RequestResponse>
417            + Send
418            + 'static,
419    {
420        let path_hash = compute_path_hash(path);
421        self.request_handlers.push(RequestHandlerEntry {
422            path: path.to_string(),
423            path_hash,
424            allowed_list,
425            handler: Box::new(handler),
426        });
427    }
428
429    /// Create an outbound link to a destination.
430    ///
431    /// `dest_sig_pub_bytes` is the destination's Ed25519 signing public key
432    /// (needed to verify LRPROOF). In Python this comes from the Destination's Identity.
433    ///
434    /// Returns `(link_id, actions)`. The first action will be a SendPacket with
435    /// the LINKREQUEST.
436    pub fn create_link(
437        &mut self,
438        dest_hash: &[u8; 16],
439        dest_sig_pub_bytes: &[u8; 32],
440        hops: u8,
441        mtu: u32,
442        rng: &mut dyn Rng,
443    ) -> (LinkId, Vec<LinkManagerAction>) {
444        let mode = LinkMode::Aes256Cbc;
445        let (mut engine, request_data) =
446            LinkEngine::new_initiator(dest_hash, hops, mode, Some(mtu), time::now(), rng);
447
448        // Build the LINKREQUEST packet to compute link_id
449        let flags = PacketFlags {
450            header_type: constants::HEADER_1,
451            context_flag: constants::FLAG_UNSET,
452            transport_type: constants::TRANSPORT_BROADCAST,
453            destination_type: constants::DESTINATION_SINGLE,
454            packet_type: constants::PACKET_TYPE_LINKREQUEST,
455        };
456
457        let packet = match RawPacket::pack(
458            flags,
459            0,
460            dest_hash,
461            None,
462            constants::CONTEXT_NONE,
463            &request_data,
464        ) {
465            Ok(p) => p,
466            Err(_) => {
467                // Should not happen with valid data
468                return ([0u8; 16], Vec::new());
469            }
470        };
471
472        engine.set_link_id_from_hashable(&packet.get_hashable_part(), request_data.len());
473        let link_id = *engine.link_id();
474
475        let managed = ManagedLink {
476            engine,
477            channel: None,
478            pending_channel_packets: HashMap::new(),
479            channel_send_ok: 0,
480            channel_send_not_ready: 0,
481            channel_send_too_big: 0,
482            channel_send_other_error: 0,
483            channel_messages_received: 0,
484            channel_proofs_sent: 0,
485            channel_proofs_received: 0,
486            dest_hash: *dest_hash,
487            remote_identity: None,
488            dest_sig_pub_bytes: Some(*dest_sig_pub_bytes),
489            incoming_resources: Vec::new(),
490            outgoing_resources: Vec::new(),
491            incoming_splits: HashMap::new(),
492            outgoing_splits: HashMap::new(),
493            resource_strategy: ResourceStrategy::default(),
494            route_interface: None,
495            route_transport_id: None,
496        };
497        self.links.insert(link_id, managed);
498
499        let mut actions = Vec::new();
500        // Register the link_id as a local destination so we can receive LRPROOF
501        actions.push(LinkManagerAction::RegisterLinkDest { link_id });
502        // Send the LINKREQUEST packet
503        actions.push(LinkManagerAction::SendPacket {
504            raw: packet.raw,
505            dest_type: constants::DESTINATION_LINK,
506            attached_interface: None,
507        });
508
509        (link_id, actions)
510    }
511
512    /// Handle a packet delivered locally (via DeliverLocal).
513    ///
514    /// Returns actions for the driver to dispatch. The `dest_hash` is the
515    /// packet's destination_hash field. `raw` is the full packet bytes.
516    /// `packet_hash` is the SHA-256 hash.
517    pub fn handle_local_delivery(
518        &mut self,
519        dest_hash: [u8; 16],
520        raw: &[u8],
521        packet_hash: [u8; 32],
522        receiving_interface: rns_core::transport::types::InterfaceId,
523        rng: &mut dyn Rng,
524    ) -> Vec<LinkManagerAction> {
525        let packet = match RawPacket::unpack(raw) {
526            Ok(p) => p,
527            Err(_) => return Vec::new(),
528        };
529
530        match packet.flags.packet_type {
531            constants::PACKET_TYPE_LINKREQUEST => {
532                self.handle_linkrequest(&dest_hash, &packet, receiving_interface, rng)
533            }
534            constants::PACKET_TYPE_PROOF if packet.context == constants::CONTEXT_LRPROOF => {
535                // LRPROOF: dest_hash is the link_id
536                self.handle_lrproof(&dest_hash, &packet, receiving_interface, rng)
537            }
538            constants::PACKET_TYPE_PROOF => self.handle_link_proof(&dest_hash, &packet, rng),
539            constants::PACKET_TYPE_DATA => {
540                self.handle_link_data(&dest_hash, &packet, packet_hash, receiving_interface, rng)
541            }
542            _ => Vec::new(),
543        }
544    }
545
546    /// Handle an incoming LINKREQUEST packet.
547    fn handle_linkrequest(
548        &mut self,
549        dest_hash: &[u8; 16],
550        packet: &RawPacket,
551        receiving_interface: rns_core::transport::types::InterfaceId,
552        rng: &mut dyn Rng,
553    ) -> Vec<LinkManagerAction> {
554        // Look up the link destination
555        let ld = match self.link_destinations.get(dest_hash) {
556            Some(ld) => ld,
557            None => return Vec::new(),
558        };
559
560        let hashable = packet.get_hashable_part();
561        let now = time::now();
562
563        // Create responder engine
564        let (engine, lrproof_data) = match LinkEngine::new_responder(
565            &ld.sig_prv,
566            &ld.sig_pub_bytes,
567            &packet.data,
568            &hashable,
569            dest_hash,
570            packet.hops,
571            now,
572            rng,
573        ) {
574            Ok(r) => r,
575            Err(e) => {
576                log::debug!("LINKREQUEST rejected: {}", e);
577                return Vec::new();
578            }
579        };
580
581        let link_id = *engine.link_id();
582        log::debug!(
583            "LINKREQUEST accepted: link={:02x?} iface={} header_type={} transport_id_present={} hops={}",
584            &link_id[..4],
585            receiving_interface.0,
586            packet.flags.header_type,
587            packet.transport_id.is_some(),
588            packet.hops
589        );
590
591        let managed = ManagedLink {
592            engine,
593            channel: None,
594            pending_channel_packets: HashMap::new(),
595            channel_send_ok: 0,
596            channel_send_not_ready: 0,
597            channel_send_too_big: 0,
598            channel_send_other_error: 0,
599            channel_messages_received: 0,
600            channel_proofs_sent: 0,
601            channel_proofs_received: 0,
602            dest_hash: *dest_hash,
603            remote_identity: None,
604            dest_sig_pub_bytes: None,
605            incoming_resources: Vec::new(),
606            outgoing_resources: Vec::new(),
607            incoming_splits: HashMap::new(),
608            outgoing_splits: HashMap::new(),
609            resource_strategy: ld.resource_strategy,
610            route_interface: Some(receiving_interface),
611            route_transport_id: if packet.flags.header_type == constants::HEADER_2 {
612                packet.transport_id
613            } else {
614                None
615            },
616        };
617        self.links.insert(link_id, managed);
618
619        // Build LRPROOF packet: type=PROOF, context=LRPROOF, dest=link_id
620        let flags = PacketFlags {
621            header_type: constants::HEADER_1,
622            context_flag: constants::FLAG_UNSET,
623            transport_type: constants::TRANSPORT_BROADCAST,
624            destination_type: constants::DESTINATION_LINK,
625            packet_type: constants::PACKET_TYPE_PROOF,
626        };
627
628        let mut actions = Vec::new();
629
630        // Register link_id as local destination so we receive link data
631        actions.push(LinkManagerAction::RegisterLinkDest { link_id });
632
633        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
634            flags,
635            packet.hops,
636            &link_id,
637            None,
638            constants::CONTEXT_LRPROOF,
639            &lrproof_data,
640        ) {
641            log::debug!(
642                "LRPROOF queued: link={:02x?} route_iface={} route_tid_present={} hops={}",
643                &link_id[..4],
644                receiving_interface.0,
645                packet.transport_id.is_some(),
646                packet.hops
647            );
648            actions.push(LinkManagerAction::SendPacket {
649                raw,
650                dest_type: constants::DESTINATION_LINK,
651                attached_interface: None,
652            });
653        }
654
655        // Reticulum interop fallback #1: queue an LRPROOF variant with
656        // hop=0, matching peers that validate LRPROOF with hop=0 semantics.
657        if packet.hops != 0 {
658            if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
659                flags,
660                0,
661                &link_id,
662                None,
663                constants::CONTEXT_LRPROOF,
664                &lrproof_data,
665            ) {
666                log::debug!(
667                    "LRPROOF fallback queued: link={:02x?} route_iface={} hops=0",
668                    &link_id[..4],
669                    receiving_interface.0
670                );
671                actions.push(LinkManagerAction::SendPacket {
672                    raw,
673                    dest_type: constants::DESTINATION_LINK,
674                    attached_interface: None,
675                });
676            }
677        }
678
679        // Reticulum interop fallback #2: queue an LRPROOF +1 hop variant for
680        // peers that validate against a remaining-hop value derived
681        // differently from destination-side delivery hops.
682        if packet.hops < u8::MAX {
683            let hops_plus_one = packet.hops + 1;
684            if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
685                flags,
686                hops_plus_one,
687                &link_id,
688                None,
689                constants::CONTEXT_LRPROOF,
690                &lrproof_data,
691            ) {
692                log::debug!(
693                    "LRPROOF +1 queued: link={:02x?} route_iface={} hops={}",
694                    &link_id[..4],
695                    receiving_interface.0,
696                    hops_plus_one
697                );
698                actions.push(LinkManagerAction::SendPacket {
699                    raw,
700                    dest_type: constants::DESTINATION_LINK,
701                    attached_interface: None,
702                });
703            }
704        }
705
706        // Notify hook system about the incoming link request
707        actions.push(LinkManagerAction::LinkRequestReceived {
708            link_id,
709            receiving_interface,
710        });
711
712        actions
713    }
714
715    fn handle_link_proof(
716        &mut self,
717        link_id: &LinkId,
718        packet: &RawPacket,
719        rng: &mut dyn Rng,
720    ) -> Vec<LinkManagerAction> {
721        if packet.data.len() < 32 {
722            return Vec::new();
723        }
724
725        let mut tracked_hash = [0u8; 32];
726        tracked_hash.copy_from_slice(&packet.data[..32]);
727
728        let Some(link) = self.links.get_mut(link_id) else {
729            return Vec::new();
730        };
731        let Some(sequence) = link.pending_channel_packets.remove(&tracked_hash) else {
732            return Vec::new();
733        };
734        link.channel_proofs_received += 1;
735        let Some(channel) = link.channel.as_mut() else {
736            return Vec::new();
737        };
738
739        let chan_actions = channel.packet_delivered(sequence);
740        let _ = channel;
741        let _ = link;
742        self.process_channel_actions(link_id, chan_actions, rng)
743    }
744
745    fn build_link_packet_proof(
746        &mut self,
747        link_id: &LinkId,
748        packet_hash: &[u8; 32],
749    ) -> Vec<LinkManagerAction> {
750        let dest_hash = match self.links.get(link_id) {
751            Some(link) => link.dest_hash,
752            None => return Vec::new(),
753        };
754        let Some(ld) = self.link_destinations.get(&dest_hash) else {
755            return Vec::new();
756        };
757        if let Some(link) = self.links.get_mut(link_id) {
758            link.channel_proofs_sent += 1;
759        }
760
761        let signature = ld.sig_prv.sign(packet_hash);
762        let mut proof_data = Vec::with_capacity(96);
763        proof_data.extend_from_slice(packet_hash);
764        proof_data.extend_from_slice(&signature);
765
766        let flags = PacketFlags {
767            header_type: constants::HEADER_1,
768            context_flag: constants::FLAG_UNSET,
769            transport_type: constants::TRANSPORT_BROADCAST,
770            destination_type: constants::DESTINATION_LINK,
771            packet_type: constants::PACKET_TYPE_PROOF,
772        };
773        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
774            flags,
775            0,
776            link_id,
777            None,
778            constants::CONTEXT_NONE,
779            &proof_data,
780        ) {
781            vec![LinkManagerAction::SendPacket {
782                raw,
783                dest_type: constants::DESTINATION_LINK,
784                attached_interface: None,
785            }]
786        } else {
787            Vec::new()
788        }
789    }
790
791    /// Handle an incoming LRPROOF packet (initiator side).
792    fn handle_lrproof(
793        &mut self,
794        link_id_bytes: &[u8; 16],
795        packet: &RawPacket,
796        receiving_interface: rns_core::transport::types::InterfaceId,
797        rng: &mut dyn Rng,
798    ) -> Vec<LinkManagerAction> {
799        let link = match self.links.get_mut(link_id_bytes) {
800            Some(l) => l,
801            None => return Vec::new(),
802        };
803
804        link.route_interface = Some(receiving_interface);
805        if packet.flags.header_type == constants::HEADER_2 {
806            if let Some(transport_id) = packet.transport_id {
807                link.route_transport_id = Some(transport_id);
808            }
809        }
810        log::debug!(
811            "LRPROOF received: link={:02x?} iface={} header_type={} transport_id_present={}",
812            &link_id_bytes[..4],
813            receiving_interface.0,
814            packet.flags.header_type,
815            packet.transport_id.is_some()
816        );
817
818        if link.engine.state() != LinkState::Pending || !link.engine.is_initiator() {
819            return Vec::new();
820        }
821
822        // The destination's signing pub key was stored when create_link was called
823        let dest_sig_pub_bytes = match link.dest_sig_pub_bytes {
824            Some(b) => b,
825            None => {
826                log::debug!("LRPROOF: no destination signing key available");
827                return Vec::new();
828            }
829        };
830
831        let now = time::now();
832        let (lrrtt_encrypted, link_actions) =
833            match link
834                .engine
835                .handle_lrproof(&packet.data, &dest_sig_pub_bytes, now, rng)
836            {
837                Ok(r) => r,
838                Err(e) => {
839                    log::debug!("LRPROOF validation failed: {}", e);
840                    return Vec::new();
841                }
842            };
843
844        let link_id = *link.engine.link_id();
845        let mut actions = Vec::new();
846
847        // Process link actions (StateChanged, LinkEstablished)
848        actions.extend(self.process_link_actions(&link_id, &link_actions));
849
850        // Send LRRTT: type=DATA, context=LRRTT, dest=link_id
851        let flags = PacketFlags {
852            header_type: constants::HEADER_1,
853            context_flag: constants::FLAG_UNSET,
854            transport_type: constants::TRANSPORT_BROADCAST,
855            destination_type: constants::DESTINATION_LINK,
856            packet_type: constants::PACKET_TYPE_DATA,
857        };
858
859        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
860            flags,
861            0,
862            &link_id,
863            None,
864            constants::CONTEXT_LRRTT,
865            &lrrtt_encrypted,
866        ) {
867            actions.push(LinkManagerAction::SendPacket {
868                raw,
869                dest_type: constants::DESTINATION_LINK,
870                attached_interface: None,
871            });
872        }
873
874        // Initialize channel now that link is active
875        if let Some(link) = self.links.get_mut(&link_id) {
876            if link.engine.state() == LinkState::Active {
877                let rtt = link.engine.rtt().unwrap_or(1.0);
878                link.channel = Some(Channel::new(rtt));
879            }
880        }
881
882        actions
883    }
884
885    /// Handle DATA packets on an established link.
886    ///
887    /// Structured to avoid borrow checker issues: we perform engine operations
888    /// on the link, collect intermediate results, drop the mutable borrow, then
889    /// call self methods that need immutable access.
890    fn handle_link_data(
891        &mut self,
892        link_id_bytes: &[u8; 16],
893        packet: &RawPacket,
894        packet_hash: [u8; 32],
895        receiving_interface: rns_core::transport::types::InterfaceId,
896        rng: &mut dyn Rng,
897    ) -> Vec<LinkManagerAction> {
898        // First pass: perform engine operations, collect results
899        enum LinkDataResult<'a> {
900            Lrrtt {
901                link_id: LinkId,
902                link_actions: Vec<LinkAction>,
903            },
904            Identify {
905                link_id: LinkId,
906                link_actions: Vec<LinkAction>,
907            },
908            Keepalive {
909                link_id: LinkId,
910                inbound_actions: Vec<LinkAction>,
911            },
912            LinkClose {
913                link_id: LinkId,
914                teardown_actions: Vec<LinkAction>,
915            },
916            Channel {
917                link_id: LinkId,
918                inbound_actions: Vec<LinkAction>,
919                plaintext: Vec<u8>,
920                packet_hash: [u8; 32],
921            },
922            Request {
923                link_id: LinkId,
924                inbound_actions: Vec<LinkAction>,
925                plaintext: Vec<u8>,
926                request_id: [u8; 16],
927            },
928            Response {
929                link_id: LinkId,
930                inbound_actions: Vec<LinkAction>,
931                plaintext: Vec<u8>,
932            },
933            Generic {
934                link_id: LinkId,
935                inbound_actions: Vec<LinkAction>,
936                plaintext: Vec<u8>,
937                context: u8,
938                packet_hash: [u8; 32],
939            },
940            /// Resource advertisement (link-decrypted).
941            ResourceAdv {
942                link_id: LinkId,
943                inbound_actions: Vec<LinkAction>,
944                plaintext: Vec<u8>,
945            },
946            /// Resource part request (link-decrypted).
947            ResourceReq {
948                link_id: LinkId,
949                inbound_actions: Vec<LinkAction>,
950                plaintext: Vec<u8>,
951            },
952            /// Resource hashmap update (link-decrypted).
953            ResourceHmu {
954                link_id: LinkId,
955                inbound_actions: Vec<LinkAction>,
956                plaintext: Vec<u8>,
957            },
958            /// Resource part data (NOT link-decrypted; parts are pre-encrypted by ResourceSender).
959            ResourcePart {
960                link_id: LinkId,
961                inbound_actions: Vec<LinkAction>,
962                raw_data: &'a [u8],
963            },
964            /// Resource proof (feed to sender).
965            ResourcePrf {
966                link_id: LinkId,
967                inbound_actions: Vec<LinkAction>,
968                plaintext: Vec<u8>,
969            },
970            /// Resource cancel from initiator (link-decrypted).
971            ResourceIcl {
972                link_id: LinkId,
973                inbound_actions: Vec<LinkAction>,
974            },
975            /// Resource cancel from receiver (link-decrypted).
976            ResourceRcl {
977                link_id: LinkId,
978                inbound_actions: Vec<LinkAction>,
979            },
980            Error,
981        }
982
983        let result = {
984            let link = match self.links.get_mut(link_id_bytes) {
985                Some(l) => l,
986                None => return Vec::new(),
987            };
988
989            link.route_interface = Some(receiving_interface);
990            if packet.flags.header_type == constants::HEADER_2 {
991                if let Some(transport_id) = packet.transport_id {
992                    link.route_transport_id = Some(transport_id);
993                }
994            } else {
995                link.route_transport_id = None;
996            }
997
998            match packet.context {
999                constants::CONTEXT_LRRTT => {
1000                    match link.engine.handle_lrrtt(&packet.data, time::now()) {
1001                        Ok(link_actions) => {
1002                            let link_id = *link.engine.link_id();
1003                            LinkDataResult::Lrrtt {
1004                                link_id,
1005                                link_actions,
1006                            }
1007                        }
1008                        Err(e) => {
1009                            log::debug!("LRRTT handling failed: {}", e);
1010                            LinkDataResult::Error
1011                        }
1012                    }
1013                }
1014                constants::CONTEXT_LINKIDENTIFY => {
1015                    match link.engine.handle_identify(&packet.data) {
1016                        Ok(link_actions) => {
1017                            let link_id = *link.engine.link_id();
1018                            link.remote_identity = link.engine.remote_identity().cloned();
1019                            LinkDataResult::Identify {
1020                                link_id,
1021                                link_actions,
1022                            }
1023                        }
1024                        Err(e) => {
1025                            log::debug!("LINKIDENTIFY failed: {}", e);
1026                            LinkDataResult::Error
1027                        }
1028                    }
1029                }
1030                constants::CONTEXT_KEEPALIVE => {
1031                    let inbound_actions = link.engine.record_inbound(time::now());
1032                    let link_id = *link.engine.link_id();
1033                    LinkDataResult::Keepalive {
1034                        link_id,
1035                        inbound_actions,
1036                    }
1037                }
1038                constants::CONTEXT_LINKCLOSE => {
1039                    let teardown_actions = link.engine.handle_teardown();
1040                    let link_id = *link.engine.link_id();
1041                    LinkDataResult::LinkClose {
1042                        link_id,
1043                        teardown_actions,
1044                    }
1045                }
1046                constants::CONTEXT_CHANNEL => match link.engine.decrypt(&packet.data) {
1047                    Ok(plaintext) => {
1048                        let inbound_actions = link.engine.record_inbound(time::now());
1049                        let link_id = *link.engine.link_id();
1050                        LinkDataResult::Channel {
1051                            link_id,
1052                            inbound_actions,
1053                            plaintext,
1054                            packet_hash,
1055                        }
1056                    }
1057                    Err(_) => LinkDataResult::Error,
1058                },
1059                constants::CONTEXT_REQUEST => match link.engine.decrypt(&packet.data) {
1060                    Ok(plaintext) => {
1061                        let inbound_actions = link.engine.record_inbound(time::now());
1062                        let link_id = *link.engine.link_id();
1063                        let request_id = packet.get_truncated_hash();
1064                        LinkDataResult::Request {
1065                            link_id,
1066                            inbound_actions,
1067                            plaintext,
1068                            request_id,
1069                        }
1070                    }
1071                    Err(_) => LinkDataResult::Error,
1072                },
1073                constants::CONTEXT_RESPONSE => match link.engine.decrypt(&packet.data) {
1074                    Ok(plaintext) => {
1075                        let inbound_actions = link.engine.record_inbound(time::now());
1076                        let link_id = *link.engine.link_id();
1077                        LinkDataResult::Response {
1078                            link_id,
1079                            inbound_actions,
1080                            plaintext,
1081                        }
1082                    }
1083                    Err(_) => LinkDataResult::Error,
1084                },
1085                // --- Resource contexts ---
1086                constants::CONTEXT_RESOURCE_ADV => match link.engine.decrypt(&packet.data) {
1087                    Ok(plaintext) => {
1088                        let inbound_actions = link.engine.record_inbound(time::now());
1089                        let link_id = *link.engine.link_id();
1090                        LinkDataResult::ResourceAdv {
1091                            link_id,
1092                            inbound_actions,
1093                            plaintext,
1094                        }
1095                    }
1096                    Err(_) => LinkDataResult::Error,
1097                },
1098                constants::CONTEXT_RESOURCE_REQ => match link.engine.decrypt(&packet.data) {
1099                    Ok(plaintext) => {
1100                        let inbound_actions = link.engine.record_inbound(time::now());
1101                        let link_id = *link.engine.link_id();
1102                        LinkDataResult::ResourceReq {
1103                            link_id,
1104                            inbound_actions,
1105                            plaintext,
1106                        }
1107                    }
1108                    Err(_) => LinkDataResult::Error,
1109                },
1110                constants::CONTEXT_RESOURCE_HMU => match link.engine.decrypt(&packet.data) {
1111                    Ok(plaintext) => {
1112                        let inbound_actions = link.engine.record_inbound(time::now());
1113                        let link_id = *link.engine.link_id();
1114                        LinkDataResult::ResourceHmu {
1115                            link_id,
1116                            inbound_actions,
1117                            plaintext,
1118                        }
1119                    }
1120                    Err(_) => LinkDataResult::Error,
1121                },
1122                constants::CONTEXT_RESOURCE => {
1123                    // Resource parts are NOT link-decrypted — they're pre-encrypted by ResourceSender
1124                    let inbound_actions = link.engine.record_inbound(time::now());
1125                    let link_id = *link.engine.link_id();
1126                    LinkDataResult::ResourcePart {
1127                        link_id,
1128                        inbound_actions,
1129                        raw_data: &packet.data,
1130                    }
1131                }
1132                constants::CONTEXT_RESOURCE_PRF => match link.engine.decrypt(&packet.data) {
1133                    Ok(plaintext) => {
1134                        let inbound_actions = link.engine.record_inbound(time::now());
1135                        let link_id = *link.engine.link_id();
1136                        LinkDataResult::ResourcePrf {
1137                            link_id,
1138                            inbound_actions,
1139                            plaintext,
1140                        }
1141                    }
1142                    Err(_) => LinkDataResult::Error,
1143                },
1144                constants::CONTEXT_RESOURCE_ICL => {
1145                    let _ = link.engine.decrypt(&packet.data); // decrypt to validate
1146                    let inbound_actions = link.engine.record_inbound(time::now());
1147                    let link_id = *link.engine.link_id();
1148                    LinkDataResult::ResourceIcl {
1149                        link_id,
1150                        inbound_actions,
1151                    }
1152                }
1153                constants::CONTEXT_RESOURCE_RCL => {
1154                    let _ = link.engine.decrypt(&packet.data); // decrypt to validate
1155                    let inbound_actions = link.engine.record_inbound(time::now());
1156                    let link_id = *link.engine.link_id();
1157                    LinkDataResult::ResourceRcl {
1158                        link_id,
1159                        inbound_actions,
1160                    }
1161                }
1162                _ => match link.engine.decrypt(&packet.data) {
1163                    Ok(plaintext) => {
1164                        let inbound_actions = link.engine.record_inbound(time::now());
1165                        let link_id = *link.engine.link_id();
1166                        LinkDataResult::Generic {
1167                            link_id,
1168                            inbound_actions,
1169                            plaintext,
1170                            context: packet.context,
1171                            packet_hash,
1172                        }
1173                    }
1174                    Err(_) => LinkDataResult::Error,
1175                },
1176            }
1177        }; // mutable borrow of self.links dropped here
1178
1179        // Second pass: process results using self methods
1180        let mut actions = Vec::new();
1181        match result {
1182            LinkDataResult::Lrrtt {
1183                link_id,
1184                link_actions,
1185            } => {
1186                actions.extend(self.process_link_actions(&link_id, &link_actions));
1187                // Initialize channel
1188                if let Some(link) = self.links.get_mut(&link_id) {
1189                    if link.engine.state() == LinkState::Active {
1190                        let rtt = link.engine.rtt().unwrap_or(1.0);
1191                        link.channel = Some(Channel::new(rtt));
1192                    }
1193                }
1194            }
1195            LinkDataResult::Identify {
1196                link_id,
1197                link_actions,
1198            } => {
1199                actions.extend(self.process_link_actions(&link_id, &link_actions));
1200            }
1201            LinkDataResult::Keepalive {
1202                link_id,
1203                inbound_actions,
1204            } => {
1205                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1206                // record_inbound() already updated last_inbound, so the link
1207                // won't go stale.  The regular tick() keepalive mechanism will
1208                // send keepalives when needs_keepalive() returns true.
1209                // Do NOT reply here — unconditional replies create an infinite
1210                // ping-pong loop between the two link endpoints.
1211            }
1212            LinkDataResult::LinkClose {
1213                link_id,
1214                teardown_actions,
1215            } => {
1216                actions.extend(self.process_link_actions(&link_id, &teardown_actions));
1217            }
1218            LinkDataResult::Channel {
1219                link_id,
1220                inbound_actions,
1221                plaintext,
1222                packet_hash,
1223            } => {
1224                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1225                // Feed plaintext to channel
1226                if let Some(link) = self.links.get_mut(&link_id) {
1227                    if let Some(ref mut channel) = link.channel {
1228                        let chan_actions = channel.receive(&plaintext, time::now());
1229                        link.channel_messages_received += chan_actions
1230                            .iter()
1231                            .filter(|action| {
1232                                matches!(
1233                                    action,
1234                                    rns_core::channel::ChannelAction::MessageReceived { .. }
1235                                )
1236                            })
1237                            .count()
1238                            as u64;
1239                        // process_channel_actions needs immutable self, so collect first
1240                        let _ = link;
1241                        actions.extend(self.process_channel_actions(&link_id, chan_actions, rng));
1242                    }
1243                }
1244                actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1245            }
1246            LinkDataResult::Request {
1247                link_id,
1248                inbound_actions,
1249                plaintext,
1250                request_id,
1251            } => {
1252                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1253                actions.extend(self.handle_request(&link_id, &plaintext, request_id, rng));
1254            }
1255            LinkDataResult::Response {
1256                link_id,
1257                inbound_actions,
1258                plaintext,
1259            } => {
1260                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1261                // Unpack msgpack response: [Bin(request_id), response_value]
1262                actions.extend(self.handle_response(&link_id, &plaintext, None));
1263            }
1264            LinkDataResult::Generic {
1265                link_id,
1266                inbound_actions,
1267                plaintext,
1268                context,
1269                packet_hash,
1270            } => {
1271                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1272                actions.push(LinkManagerAction::LinkDataReceived {
1273                    link_id,
1274                    context,
1275                    data: plaintext,
1276                });
1277
1278                actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1279            }
1280            LinkDataResult::ResourceAdv {
1281                link_id,
1282                inbound_actions,
1283                plaintext,
1284            } => {
1285                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1286                actions.extend(self.handle_resource_adv(&link_id, &plaintext, rng));
1287            }
1288            LinkDataResult::ResourceReq {
1289                link_id,
1290                inbound_actions,
1291                plaintext,
1292            } => {
1293                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1294                actions.extend(self.handle_resource_req(&link_id, &plaintext, rng));
1295            }
1296            LinkDataResult::ResourceHmu {
1297                link_id,
1298                inbound_actions,
1299                plaintext,
1300            } => {
1301                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1302                actions.extend(self.handle_resource_hmu(&link_id, &plaintext, rng));
1303            }
1304            LinkDataResult::ResourcePart {
1305                link_id,
1306                inbound_actions,
1307                raw_data,
1308            } => {
1309                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1310                actions.extend(self.handle_resource_part(&link_id, &raw_data, rng));
1311            }
1312            LinkDataResult::ResourcePrf {
1313                link_id,
1314                inbound_actions,
1315                plaintext,
1316            } => {
1317                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1318                actions.extend(self.handle_resource_prf(&link_id, &plaintext, rng));
1319            }
1320            LinkDataResult::ResourceIcl {
1321                link_id,
1322                inbound_actions,
1323            } => {
1324                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1325                actions.extend(self.handle_resource_icl(&link_id));
1326            }
1327            LinkDataResult::ResourceRcl {
1328                link_id,
1329                inbound_actions,
1330            } => {
1331                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1332                actions.extend(self.handle_resource_rcl(&link_id));
1333            }
1334            LinkDataResult::Error => {}
1335        }
1336
1337        actions
1338    }
1339
1340    /// Handle a request on a link.
1341    fn handle_request(
1342        &mut self,
1343        link_id: &LinkId,
1344        plaintext: &[u8],
1345        request_id: [u8; 16],
1346        rng: &mut dyn Rng,
1347    ) -> Vec<LinkManagerAction> {
1348        use rns_core::msgpack::{self, Value};
1349
1350        // Python-compatible format: msgpack([timestamp, Bin(path_hash), data_value])
1351        let arr = match msgpack::unpack_exact(plaintext) {
1352            Ok(Value::Array(arr)) if arr.len() >= 3 => arr,
1353            _ => return Vec::new(),
1354        };
1355
1356        let path_hash_bytes = match &arr[1] {
1357            Value::Bin(b) if b.len() == 16 => b,
1358            _ => return Vec::new(),
1359        };
1360        let mut path_hash = [0u8; 16];
1361        path_hash.copy_from_slice(path_hash_bytes);
1362
1363        // Re-encode the data element for the handler
1364        let request_data = msgpack::pack(&arr[2]);
1365
1366        // Check if this is a management path (handled by the driver)
1367        if self.management_paths.contains(&path_hash) {
1368            let remote_identity = self
1369                .links
1370                .get(link_id)
1371                .and_then(|l| l.remote_identity)
1372                .map(|(h, k)| (h, k));
1373            return vec![LinkManagerAction::ManagementRequest {
1374                link_id: *link_id,
1375                path_hash,
1376                data: request_data,
1377                request_id,
1378                remote_identity,
1379            }];
1380        }
1381
1382        // Look up handler by path_hash
1383        let handler_idx = self
1384            .request_handlers
1385            .iter()
1386            .position(|h| h.path_hash == path_hash);
1387        let handler_idx = match handler_idx {
1388            Some(i) => i,
1389            None => return Vec::new(),
1390        };
1391
1392        // Check ACL
1393        let remote_identity = self
1394            .links
1395            .get(link_id)
1396            .and_then(|l| l.remote_identity.as_ref());
1397        let handler = &self.request_handlers[handler_idx];
1398        if let Some(ref allowed) = handler.allowed_list {
1399            match remote_identity {
1400                Some((identity_hash, _)) => {
1401                    if !allowed.contains(identity_hash) {
1402                        log::debug!("Request denied: identity not in allowed list");
1403                        return Vec::new();
1404                    }
1405                }
1406                None => {
1407                    log::debug!("Request denied: peer not identified");
1408                    return Vec::new();
1409                }
1410            }
1411        }
1412
1413        // Call handler
1414        let path = handler.path.clone();
1415        let response = (handler.handler)(*link_id, &path, &request_data, remote_identity);
1416
1417        let mut actions = Vec::new();
1418        if let Some(response) = response {
1419            match response {
1420                RequestResponse::Bytes(response_data) => {
1421                    let mut response_actions =
1422                        self.build_response_packet(link_id, &request_id, &response_data, rng);
1423                    if response_actions.is_empty() {
1424                        response_actions.extend(self.send_response_resource(
1425                            link_id,
1426                            &request_id,
1427                            &response_data,
1428                            None,
1429                            true,
1430                            rng,
1431                        ));
1432                    }
1433                    actions.extend(response_actions);
1434                }
1435                RequestResponse::Resource {
1436                    data,
1437                    metadata,
1438                    auto_compress,
1439                } => {
1440                    actions.extend(self.send_response_resource(
1441                        link_id,
1442                        &request_id,
1443                        &data,
1444                        metadata.as_deref(),
1445                        auto_compress,
1446                        rng,
1447                    ));
1448                }
1449            }
1450        }
1451
1452        actions
1453    }
1454
1455    /// Build a response packet for a request.
1456    /// `response_data` is the msgpack-encoded response value.
1457    fn build_response_packet(
1458        &self,
1459        link_id: &LinkId,
1460        request_id: &[u8; 16],
1461        response_data: &[u8],
1462        rng: &mut dyn Rng,
1463    ) -> Vec<LinkManagerAction> {
1464        use rns_core::msgpack::{self, Value};
1465
1466        let response_value = msgpack::unpack_exact(response_data)
1467            .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1468
1469        let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1470        let response_plaintext = msgpack::pack(&response_array);
1471
1472        let mut actions = Vec::new();
1473        if let Some(link) = self.links.get(link_id) {
1474            if let Ok(encrypted) = link.engine.encrypt(&response_plaintext, rng) {
1475                let flags = PacketFlags {
1476                    header_type: constants::HEADER_1,
1477                    context_flag: constants::FLAG_UNSET,
1478                    transport_type: constants::TRANSPORT_BROADCAST,
1479                    destination_type: constants::DESTINATION_LINK,
1480                    packet_type: constants::PACKET_TYPE_DATA,
1481                };
1482                let max_mtu = link.engine.mtu() as usize;
1483                if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash_with_max_mtu(
1484                    flags,
1485                    0,
1486                    link_id,
1487                    None,
1488                    constants::CONTEXT_RESPONSE,
1489                    &encrypted,
1490                    max_mtu,
1491                ) {
1492                    actions.push(LinkManagerAction::SendPacket {
1493                        raw,
1494                        dest_type: constants::DESTINATION_LINK,
1495                        attached_interface: None,
1496                    });
1497                }
1498            }
1499        }
1500        actions
1501    }
1502
1503    fn send_response_resource(
1504        &mut self,
1505        link_id: &LinkId,
1506        request_id: &[u8; 16],
1507        response_data: &[u8],
1508        metadata: Option<&[u8]>,
1509        auto_compress: bool,
1510        rng: &mut dyn Rng,
1511    ) -> Vec<LinkManagerAction> {
1512        use rns_core::msgpack::{self, Value};
1513
1514        let link = match self.links.get_mut(link_id) {
1515            Some(l) => l,
1516            None => return Vec::new(),
1517        };
1518
1519        if link.engine.state() != LinkState::Active {
1520            return Vec::new();
1521        }
1522
1523        let now = time::now();
1524
1525        // Match Python resource response format from Link.handle_request:
1526        // packed_response = msgpack([request_id, response_value])
1527        // where response_value is decoded msgpack value, or Bin(raw bytes).
1528        let response_value = msgpack::unpack_exact(response_data)
1529            .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1530        let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1531        let resource_payload = msgpack::pack(&response_array);
1532
1533        let senders = match Self::build_resource_senders(
1534            link,
1535            &resource_payload,
1536            metadata,
1537            auto_compress,
1538            true, // is_response
1539            Some(request_id.to_vec()),
1540            rng,
1541            now,
1542        ) {
1543            Ok(s) => s,
1544            Err(e) => {
1545                log::debug!("Failed to create response ResourceSender: {}", e);
1546                return Vec::new();
1547            }
1548        };
1549
1550        let adv_actions = Self::start_resource_senders(link, senders, now);
1551
1552        let _ = link;
1553        self.process_resource_actions(link_id, adv_actions, rng)
1554    }
1555
1556    /// Send a management response on a link.
1557    /// Called by the driver after building the response for a ManagementRequest.
1558    pub fn send_management_response(
1559        &mut self,
1560        link_id: &LinkId,
1561        request_id: &[u8; 16],
1562        response_data: &[u8],
1563        rng: &mut dyn Rng,
1564    ) -> Vec<LinkManagerAction> {
1565        let mut actions = self.build_response_packet(link_id, request_id, response_data, rng);
1566        if actions.is_empty() {
1567            actions.extend(self.send_response_resource(
1568                link_id,
1569                request_id,
1570                response_data,
1571                None,
1572                true,
1573                rng,
1574            ));
1575        }
1576        actions
1577    }
1578
1579    /// Send a request on a link.
1580    ///
1581    /// `data` is the msgpack-encoded request data value (e.g. msgpack([True]) for /status).
1582    ///
1583    /// Uses Python-compatible format: plaintext = msgpack([timestamp, path_hash_bytes, data_value]).
1584    /// Returns actions (the encrypted request packet). The response will arrive
1585    /// later via handle_local_delivery with CONTEXT_RESPONSE.
1586    pub fn send_request(
1587        &self,
1588        link_id: &LinkId,
1589        path: &str,
1590        data: &[u8],
1591        rng: &mut dyn Rng,
1592    ) -> Vec<LinkManagerAction> {
1593        use rns_core::msgpack::{self, Value};
1594
1595        let link = match self.links.get(link_id) {
1596            Some(l) => l,
1597            None => return Vec::new(),
1598        };
1599
1600        if link.engine.state() != LinkState::Active {
1601            return Vec::new();
1602        }
1603
1604        let path_hash = compute_path_hash(path);
1605
1606        // Decode data bytes to msgpack Value (or use Bin if can't decode)
1607        let data_value = msgpack::unpack_exact(data).unwrap_or_else(|_| Value::Bin(data.to_vec()));
1608
1609        // Python-compatible format: msgpack([timestamp, Bin(path_hash), data_value])
1610        let request_array = Value::Array(vec![
1611            Value::Float(time::now()),
1612            Value::Bin(path_hash.to_vec()),
1613            data_value,
1614        ]);
1615        let plaintext = msgpack::pack(&request_array);
1616
1617        let encrypted = match link.engine.encrypt(&plaintext, rng) {
1618            Ok(e) => e,
1619            Err(_) => return Vec::new(),
1620        };
1621
1622        let flags = PacketFlags {
1623            header_type: constants::HEADER_1,
1624            context_flag: constants::FLAG_UNSET,
1625            transport_type: constants::TRANSPORT_BROADCAST,
1626            destination_type: constants::DESTINATION_LINK,
1627            packet_type: constants::PACKET_TYPE_DATA,
1628        };
1629
1630        let mut actions = Vec::new();
1631        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1632            flags,
1633            0,
1634            link_id,
1635            None,
1636            constants::CONTEXT_REQUEST,
1637            &encrypted,
1638        ) {
1639            actions.push(LinkManagerAction::SendPacket {
1640                raw,
1641                dest_type: constants::DESTINATION_LINK,
1642                attached_interface: None,
1643            });
1644        }
1645        actions
1646    }
1647
1648    /// Send encrypted data on a link with a given context.
1649    pub fn send_on_link(
1650        &self,
1651        link_id: &LinkId,
1652        plaintext: &[u8],
1653        context: u8,
1654        rng: &mut dyn Rng,
1655    ) -> Vec<LinkManagerAction> {
1656        let link = match self.links.get(link_id) {
1657            Some(l) => l,
1658            None => return Vec::new(),
1659        };
1660
1661        if link.engine.state() != LinkState::Active {
1662            return Vec::new();
1663        }
1664
1665        let encrypted = match link.engine.encrypt(plaintext, rng) {
1666            Ok(e) => e,
1667            Err(_) => return Vec::new(),
1668        };
1669
1670        let flags = PacketFlags {
1671            header_type: constants::HEADER_1,
1672            context_flag: constants::FLAG_UNSET,
1673            transport_type: constants::TRANSPORT_BROADCAST,
1674            destination_type: constants::DESTINATION_LINK,
1675            packet_type: constants::PACKET_TYPE_DATA,
1676        };
1677
1678        let mut actions = Vec::new();
1679        if let Ok((raw, _packet_hash)) =
1680            RawPacket::pack_raw_with_hash(flags, 0, link_id, None, context, &encrypted)
1681        {
1682            actions.push(LinkManagerAction::SendPacket {
1683                raw,
1684                dest_type: constants::DESTINATION_LINK,
1685                attached_interface: None,
1686            });
1687        }
1688        actions
1689    }
1690
1691    /// Send an identify message on a link (initiator reveals identity to responder).
1692    pub fn identify(
1693        &self,
1694        link_id: &LinkId,
1695        identity: &rns_crypto::identity::Identity,
1696        rng: &mut dyn Rng,
1697    ) -> Vec<LinkManagerAction> {
1698        let link = match self.links.get(link_id) {
1699            Some(l) => l,
1700            None => return Vec::new(),
1701        };
1702
1703        let encrypted = match link.engine.build_identify(identity, rng) {
1704            Ok(e) => e,
1705            Err(_) => return Vec::new(),
1706        };
1707
1708        let flags = PacketFlags {
1709            header_type: constants::HEADER_1,
1710            context_flag: constants::FLAG_UNSET,
1711            transport_type: constants::TRANSPORT_BROADCAST,
1712            destination_type: constants::DESTINATION_LINK,
1713            packet_type: constants::PACKET_TYPE_DATA,
1714        };
1715
1716        let mut actions = Vec::new();
1717        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1718            flags,
1719            0,
1720            link_id,
1721            None,
1722            constants::CONTEXT_LINKIDENTIFY,
1723            &encrypted,
1724        ) {
1725            actions.push(LinkManagerAction::SendPacket {
1726                raw,
1727                dest_type: constants::DESTINATION_LINK,
1728                attached_interface: None,
1729            });
1730        }
1731        actions
1732    }
1733
1734    /// Tear down a link.
1735    pub fn teardown_link(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1736        let link = match self.links.get_mut(link_id) {
1737            Some(l) => l,
1738            None => return Vec::new(),
1739        };
1740
1741        let teardown_actions = link.engine.teardown();
1742        if let Some(ref mut channel) = link.channel {
1743            channel.shutdown();
1744        }
1745
1746        let mut actions = self.process_link_actions(link_id, &teardown_actions);
1747
1748        // Send LINKCLOSE packet
1749        let flags = PacketFlags {
1750            header_type: constants::HEADER_1,
1751            context_flag: constants::FLAG_UNSET,
1752            transport_type: constants::TRANSPORT_BROADCAST,
1753            destination_type: constants::DESTINATION_LINK,
1754            packet_type: constants::PACKET_TYPE_DATA,
1755        };
1756        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1757            flags,
1758            0,
1759            link_id,
1760            None,
1761            constants::CONTEXT_LINKCLOSE,
1762            &[],
1763        ) {
1764            actions.push(LinkManagerAction::SendPacket {
1765                raw,
1766                dest_type: constants::DESTINATION_LINK,
1767                attached_interface: None,
1768            });
1769        }
1770
1771        actions
1772    }
1773
1774    /// Tear down all managed links.
1775    pub fn teardown_all_links(&mut self) -> Vec<LinkManagerAction> {
1776        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
1777        let mut actions = Vec::new();
1778        for link_id in link_ids {
1779            actions.extend(self.teardown_link(&link_id));
1780        }
1781        actions
1782    }
1783
1784    /// Handle a response on a link.
1785    fn handle_response(
1786        &self,
1787        link_id: &LinkId,
1788        plaintext: &[u8],
1789        metadata: Option<Vec<u8>>,
1790    ) -> Vec<LinkManagerAction> {
1791        use rns_core::msgpack;
1792
1793        // Python-compatible response: msgpack([Bin(request_id), response_value])
1794        let arr = match msgpack::unpack_exact(plaintext) {
1795            Ok(msgpack::Value::Array(arr)) if arr.len() >= 2 => arr,
1796            _ => return Vec::new(),
1797        };
1798
1799        let request_id_bytes = match &arr[0] {
1800            msgpack::Value::Bin(b) if b.len() == 16 => b,
1801            _ => return Vec::new(),
1802        };
1803        let mut request_id = [0u8; 16];
1804        request_id.copy_from_slice(request_id_bytes);
1805
1806        let response_data = msgpack::pack(&arr[1]);
1807
1808        vec![LinkManagerAction::ResponseReceived {
1809            link_id: *link_id,
1810            request_id,
1811            data: response_data,
1812            metadata,
1813        }]
1814    }
1815
1816    fn build_resource_senders(
1817        link: &ManagedLink,
1818        data: &[u8],
1819        metadata: Option<&[u8]>,
1820        auto_compress: bool,
1821        is_response: bool,
1822        request_id: Option<Vec<u8>>,
1823        rng: &mut dyn Rng,
1824        now: f64,
1825    ) -> Result<Vec<ResourceSender>, rns_core::resource::ResourceError> {
1826        let link_rtt = link.engine.rtt().unwrap_or(1.0);
1827        let resource_sdu = Self::resource_sdu_for_link(link);
1828        let metadata_overhead = metadata.map(|m| 3 + m.len()).unwrap_or(0);
1829        let logical_size = metadata_overhead + data.len();
1830
1831        if logical_size <= constants::RESOURCE_MAX_EFFICIENT_SIZE {
1832            let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1833            let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1834                link.engine
1835                    .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1836                    .unwrap_or_else(|_| plaintext.to_vec())
1837            };
1838            return ResourceSender::new(
1839                data,
1840                metadata,
1841                resource_sdu,
1842                &encrypt_fn,
1843                &Bzip2Compressor,
1844                rng,
1845                now,
1846                auto_compress,
1847                is_response,
1848                request_id,
1849                1,
1850                1,
1851                None,
1852                link_rtt,
1853                6.0,
1854            )
1855            .map(|sender| vec![sender]);
1856        }
1857
1858        if metadata_overhead > constants::RESOURCE_MAX_EFFICIENT_SIZE {
1859            return Err(rns_core::resource::ResourceError::TooLarge);
1860        }
1861
1862        let first_payload_len = core::cmp::min(
1863            data.len(),
1864            constants::RESOURCE_MAX_EFFICIENT_SIZE - metadata_overhead,
1865        );
1866        let remaining = data.len().saturating_sub(first_payload_len);
1867        let total_segments = 1 + remaining.div_ceil(constants::RESOURCE_MAX_EFFICIENT_SIZE) as u64;
1868
1869        let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1870        let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1871            link.engine
1872                .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1873                .unwrap_or_else(|_| plaintext.to_vec())
1874        };
1875
1876        let mut senders = Vec::new();
1877        let mut first = ResourceSender::new(
1878            &data[..first_payload_len],
1879            metadata,
1880            resource_sdu,
1881            &encrypt_fn,
1882            &Bzip2Compressor,
1883            rng,
1884            now,
1885            auto_compress,
1886            is_response,
1887            request_id.clone(),
1888            1,
1889            total_segments,
1890            None,
1891            link_rtt,
1892            6.0,
1893        )?;
1894        first.data_size = logical_size;
1895        let original_hash = first.original_hash;
1896        let has_metadata = metadata.is_some();
1897        senders.push(first);
1898
1899        let mut offset = first_payload_len;
1900        let mut segment_index = 2;
1901        while offset < data.len() {
1902            let end = core::cmp::min(offset + constants::RESOURCE_MAX_EFFICIENT_SIZE, data.len());
1903            let mut sender = ResourceSender::new(
1904                &data[offset..end],
1905                None,
1906                resource_sdu,
1907                &encrypt_fn,
1908                &Bzip2Compressor,
1909                rng,
1910                now,
1911                auto_compress,
1912                is_response,
1913                request_id.clone(),
1914                segment_index,
1915                total_segments,
1916                Some(original_hash),
1917                link_rtt,
1918                6.0,
1919            )?;
1920            sender.data_size = logical_size;
1921            sender.flags.has_metadata = has_metadata;
1922            senders.push(sender);
1923            segment_index += 1;
1924            offset = end;
1925        }
1926
1927        Ok(senders)
1928    }
1929
1930    fn start_resource_senders(
1931        link: &mut ManagedLink,
1932        mut senders: Vec<ResourceSender>,
1933        now: f64,
1934    ) -> Vec<ResourceAction> {
1935        if senders.is_empty() {
1936            return Vec::new();
1937        }
1938
1939        let mut first = senders.remove(0);
1940        let adv_actions = first.advertise(now);
1941
1942        if first.total_segments > 1 {
1943            let original_hash = first.original_hash;
1944            let split = OutgoingSplitTransfer {
1945                total_segments: first.total_segments,
1946                completed_segments: 0,
1947                current_segment_index: first.segment_index,
1948                current_sent_parts: 0,
1949                current_total_parts: first.total_parts(),
1950            };
1951            link.outgoing_splits.insert(original_hash, split);
1952        }
1953
1954        link.outgoing_resources.push(first);
1955        link.outgoing_resources.extend(senders);
1956        adv_actions
1957    }
1958
1959    /// Handle resource advertisement (CONTEXT_RESOURCE_ADV).
1960    fn handle_resource_adv(
1961        &mut self,
1962        link_id: &LinkId,
1963        adv_plaintext: &[u8],
1964        rng: &mut dyn Rng,
1965    ) -> Vec<LinkManagerAction> {
1966        let link = match self.links.get_mut(link_id) {
1967            Some(l) => l,
1968            None => return Vec::new(),
1969        };
1970
1971        let link_rtt = link.engine.rtt().unwrap_or(1.0);
1972        let resource_sdu = Self::resource_sdu_for_link(link);
1973        let now = time::now();
1974
1975        let receiver = match ResourceReceiver::from_advertisement(
1976            adv_plaintext,
1977            resource_sdu,
1978            link_rtt,
1979            now,
1980            None,
1981            None,
1982        ) {
1983            Ok(r) => r,
1984            Err(e) => {
1985                log::debug!("Resource ADV rejected: {}", e);
1986                return Vec::new();
1987            }
1988        };
1989
1990        let strategy = link.resource_strategy;
1991        let resource_hash = receiver.resource_hash.clone();
1992        let transfer_size = receiver.transfer_size;
1993        let has_metadata = receiver.has_metadata;
1994        let is_response = receiver.flags.is_response;
1995        let is_split = receiver.flags.split;
1996        let segment_index = receiver.segment_index;
1997        let total_segments = receiver.total_segments;
1998        let original_hash = match Self::resource_hash_key(&receiver.original_hash) {
1999            Some(key) => key,
2000            None => return Vec::new(),
2001        };
2002
2003        if is_split && segment_index > 1 {
2004            let should_accept = link
2005                .incoming_splits
2006                .get(&original_hash)
2007                .is_some_and(|split| {
2008                    split.completed_segments + 1 == segment_index
2009                        && split.total_segments == total_segments
2010                });
2011
2012            if !should_accept {
2013                let reject_actions = {
2014                    let mut r = receiver;
2015                    r.reject()
2016                };
2017                let _ = link;
2018                return self.process_resource_actions(link_id, reject_actions, rng);
2019            }
2020
2021            let current_total_parts = receiver.total_parts;
2022            link.incoming_resources.push(receiver);
2023            let idx = link.incoming_resources.len() - 1;
2024            if let Some(split) = link.incoming_splits.get_mut(&original_hash) {
2025                split.current_segment_index = segment_index;
2026                split.current_received_parts = 0;
2027                split.current_total_parts = current_total_parts;
2028            }
2029            let resource_actions = link.incoming_resources[idx].accept(now);
2030            let _ = link;
2031            return self.process_resource_actions(link_id, resource_actions, rng);
2032        }
2033
2034        if is_response {
2035            // Response resources bypass the application acceptance strategy —
2036            // they are answers to pending requests, not independent resources.
2037            if is_split {
2038                link.incoming_splits.insert(
2039                    original_hash,
2040                    IncomingSplitTransfer {
2041                        total_segments,
2042                        completed_segments: 0,
2043                        current_segment_index: segment_index,
2044                        current_received_parts: 0,
2045                        current_total_parts: receiver.total_parts,
2046                        data: Vec::new(),
2047                        metadata: None,
2048                        is_response,
2049                    },
2050                );
2051            }
2052            link.incoming_resources.push(receiver);
2053            let idx = link.incoming_resources.len() - 1;
2054            let resource_actions = link.incoming_resources[idx].accept(now);
2055            let _ = link;
2056            return self.process_resource_actions(link_id, resource_actions, rng);
2057        }
2058
2059        match strategy {
2060            ResourceStrategy::AcceptNone => {
2061                // Reject: send RCL
2062                let reject_actions = {
2063                    let mut r = receiver;
2064                    r.reject()
2065                };
2066                self.process_resource_actions(link_id, reject_actions, rng)
2067            }
2068            ResourceStrategy::AcceptAll => {
2069                if is_split {
2070                    link.incoming_splits.insert(
2071                        original_hash,
2072                        IncomingSplitTransfer {
2073                            total_segments,
2074                            completed_segments: 0,
2075                            current_segment_index: segment_index,
2076                            current_received_parts: 0,
2077                            current_total_parts: receiver.total_parts,
2078                            data: Vec::new(),
2079                            metadata: None,
2080                            is_response,
2081                        },
2082                    );
2083                }
2084                link.incoming_resources.push(receiver);
2085                let idx = link.incoming_resources.len() - 1;
2086                let resource_actions = link.incoming_resources[idx].accept(now);
2087                let _ = link;
2088                self.process_resource_actions(link_id, resource_actions, rng)
2089            }
2090            ResourceStrategy::AcceptApp => {
2091                link.incoming_resources.push(receiver);
2092                // Query application callback
2093                vec![LinkManagerAction::ResourceAcceptQuery {
2094                    link_id: *link_id,
2095                    resource_hash,
2096                    transfer_size,
2097                    has_metadata,
2098                }]
2099            }
2100        }
2101    }
2102
2103    /// Accept or reject a pending resource (for AcceptApp strategy).
2104    pub fn accept_resource(
2105        &mut self,
2106        link_id: &LinkId,
2107        resource_hash: &[u8],
2108        accept: bool,
2109        rng: &mut dyn Rng,
2110    ) -> Vec<LinkManagerAction> {
2111        let link = match self.links.get_mut(link_id) {
2112            Some(l) => l,
2113            None => return Vec::new(),
2114        };
2115
2116        let now = time::now();
2117        let idx = link
2118            .incoming_resources
2119            .iter()
2120            .position(|r| r.resource_hash == resource_hash);
2121        let idx = match idx {
2122            Some(i) => i,
2123            None => return Vec::new(),
2124        };
2125
2126        if accept && link.incoming_resources[idx].flags.split {
2127            if let Some(original_hash) =
2128                Self::resource_hash_key(&link.incoming_resources[idx].original_hash)
2129            {
2130                link.incoming_splits
2131                    .entry(original_hash)
2132                    .or_insert_with(|| IncomingSplitTransfer {
2133                        total_segments: link.incoming_resources[idx].total_segments,
2134                        completed_segments: 0,
2135                        current_segment_index: link.incoming_resources[idx].segment_index,
2136                        current_received_parts: 0,
2137                        current_total_parts: link.incoming_resources[idx].total_parts,
2138                        data: Vec::new(),
2139                        metadata: None,
2140                        is_response: link.incoming_resources[idx].flags.is_response,
2141                    });
2142            }
2143        }
2144
2145        let resource_actions = if accept {
2146            link.incoming_resources[idx].accept(now)
2147        } else {
2148            link.incoming_resources[idx].reject()
2149        };
2150
2151        let _ = link;
2152        self.process_resource_actions(link_id, resource_actions, rng)
2153    }
2154
2155    /// Handle resource request (CONTEXT_RESOURCE_REQ) — feed to sender.
2156    fn handle_resource_req(
2157        &mut self,
2158        link_id: &LinkId,
2159        plaintext: &[u8],
2160        rng: &mut dyn Rng,
2161    ) -> Vec<LinkManagerAction> {
2162        let link = match self.links.get_mut(link_id) {
2163            Some(l) => l,
2164            None => return Vec::new(),
2165        };
2166
2167        let now = time::now();
2168        let mut all_actions = Vec::new();
2169        let mut progress_update = None;
2170        for sender in &mut link.outgoing_resources {
2171            if sender.flags.split && sender.status == rns_core::resource::ResourceStatus::Queued {
2172                continue;
2173            }
2174            let before_sent = sender.sent_parts;
2175            let resource_actions = sender.handle_request(plaintext, now);
2176            if !resource_actions.is_empty() {
2177                if sender.sent_parts != before_sent {
2178                    if sender.flags.split {
2179                        if let Some(split) = link.outgoing_splits.get_mut(&sender.original_hash) {
2180                            split.current_segment_index = sender.segment_index;
2181                            split.current_sent_parts = sender.sent_parts;
2182                            split.current_total_parts = sender.total_parts();
2183                            progress_update =
2184                                Some(Self::outgoing_split_progress(split, sender.sdu));
2185                        }
2186                    } else {
2187                        progress_update = Some((sender.sent_parts, sender.total_parts()));
2188                    }
2189                }
2190                all_actions.extend(resource_actions);
2191                break;
2192            }
2193        }
2194
2195        let _ = link;
2196        let mut out = self.process_resource_actions(link_id, all_actions, rng);
2197        if let Some((received, total)) = progress_update {
2198            out.push(LinkManagerAction::ResourceProgress {
2199                link_id: *link_id,
2200                received,
2201                total,
2202            });
2203        }
2204        out
2205    }
2206
2207    /// Handle resource HMU (CONTEXT_RESOURCE_HMU) — feed to receiver.
2208    fn handle_resource_hmu(
2209        &mut self,
2210        link_id: &LinkId,
2211        plaintext: &[u8],
2212        rng: &mut dyn Rng,
2213    ) -> Vec<LinkManagerAction> {
2214        let link = match self.links.get_mut(link_id) {
2215            Some(l) => l,
2216            None => return Vec::new(),
2217        };
2218
2219        let now = time::now();
2220        let mut all_actions = Vec::new();
2221        for receiver in &mut link.incoming_resources {
2222            let resource_actions = receiver.handle_hashmap_update(plaintext, now);
2223            if !resource_actions.is_empty() {
2224                all_actions.extend(resource_actions);
2225                break;
2226            }
2227        }
2228
2229        let _ = link;
2230        self.process_resource_actions(link_id, all_actions, rng)
2231    }
2232
2233    /// Handle resource part (CONTEXT_RESOURCE) — feed raw to receiver.
2234    fn handle_resource_part(
2235        &mut self,
2236        link_id: &LinkId,
2237        raw_data: &[u8],
2238        rng: &mut dyn Rng,
2239    ) -> Vec<LinkManagerAction> {
2240        let link = match self.links.get_mut(link_id) {
2241            Some(l) => l,
2242            None => return Vec::new(),
2243        };
2244
2245        let now = time::now();
2246        let resource_sdu = Self::resource_sdu_for_link(link);
2247        let mut all_actions = Vec::new();
2248        let mut assemble_idx = None;
2249        let mut assembled_is_response = false;
2250
2251        for (idx, receiver) in link.incoming_resources.iter_mut().enumerate() {
2252            if receiver.status >= rns_core::resource::ResourceStatus::Complete {
2253                continue;
2254            }
2255            let resource_actions = receiver.receive_part(raw_data, now);
2256            if !resource_actions.is_empty() {
2257                if receiver.received_count == receiver.total_parts {
2258                    assemble_idx = Some(idx);
2259                }
2260                if receiver.flags.split {
2261                    if let Some(key) = Self::resource_hash_key(&receiver.original_hash) {
2262                        if let Some(split) = link.incoming_splits.get_mut(&key) {
2263                            split.current_segment_index = receiver.segment_index;
2264                            split.current_received_parts = receiver.received_count;
2265                            split.current_total_parts = receiver.total_parts;
2266                            let (received, total) =
2267                                Self::incoming_split_progress(split, resource_sdu);
2268                            for action in resource_actions {
2269                                match action {
2270                                    ResourceAction::ProgressUpdate { .. } => {
2271                                        all_actions.push(ResourceAction::ProgressUpdate {
2272                                            received,
2273                                            total,
2274                                        });
2275                                    }
2276                                    other => all_actions.push(other),
2277                                }
2278                            }
2279                        } else {
2280                            all_actions.extend(resource_actions);
2281                        }
2282                    } else {
2283                        all_actions.extend(resource_actions);
2284                    }
2285                } else {
2286                    all_actions.extend(resource_actions);
2287                }
2288                break;
2289            }
2290        }
2291
2292        if let Some(idx) = assemble_idx {
2293            let split_key = if link.incoming_resources[idx].flags.split {
2294                Self::resource_hash_key(&link.incoming_resources[idx].original_hash)
2295            } else {
2296                None
2297            };
2298            let split_segment_index = link.incoming_resources[idx].segment_index;
2299            let split_segment_total = link.incoming_resources[idx].total_segments;
2300            let split_segment_parts = link.incoming_resources[idx].total_parts;
2301            let split_is_response = link.incoming_resources[idx].flags.is_response;
2302            let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2303                link.engine.decrypt(ciphertext).map_err(|_| ())
2304            };
2305            let mut assemble_actions =
2306                link.incoming_resources[idx].assemble(&decrypt_fn, &Bzip2Compressor);
2307            assembled_is_response = split_is_response;
2308
2309            if let Some(key) = split_key {
2310                let mut converted_actions = Vec::new();
2311                let mut segment_data = None;
2312                let mut segment_metadata = None;
2313                for action in assemble_actions {
2314                    match action {
2315                        ResourceAction::DataReceived { data, metadata } => {
2316                            segment_data = Some(data);
2317                            segment_metadata = metadata;
2318                        }
2319                        ResourceAction::Completed => {}
2320                        other => converted_actions.push(other),
2321                    }
2322                }
2323
2324                if let Some(data) = segment_data {
2325                    if let Some(split) = link.incoming_splits.get_mut(&key) {
2326                        split.data.extend_from_slice(&data);
2327                        if segment_metadata.is_some() {
2328                            split.metadata = segment_metadata;
2329                        }
2330                        split.completed_segments = split_segment_index;
2331                        split.current_segment_index = split_segment_index;
2332                        split.current_received_parts = split_segment_parts;
2333                        split.current_total_parts = split_segment_parts;
2334                    }
2335
2336                    if split_segment_index == split_segment_total {
2337                        if let Some(split) = link.incoming_splits.remove(&key) {
2338                            assembled_is_response = split.is_response;
2339                            converted_actions.push(ResourceAction::DataReceived {
2340                                data: split.data,
2341                                metadata: split.metadata,
2342                            });
2343                            converted_actions.push(ResourceAction::Completed);
2344                        }
2345                    }
2346                }
2347
2348                assemble_actions = converted_actions;
2349            }
2350            all_actions.extend(assemble_actions);
2351        }
2352
2353        let _ = link;
2354        let mut out = self.process_resource_actions(link_id, all_actions, rng);
2355
2356        if assembled_is_response {
2357            let mut converted = Vec::new();
2358            for action in out {
2359                match action {
2360                    LinkManagerAction::ResourceReceived { data, metadata, .. } => {
2361                        converted.extend(self.handle_response(link_id, &data, metadata));
2362                    }
2363                    LinkManagerAction::ResourceAcceptQuery { .. } => {
2364                        // Response resources bypass application acceptance
2365                    }
2366                    other => converted.push(other),
2367                }
2368            }
2369            out = converted;
2370        }
2371
2372        out
2373    }
2374
2375    /// Handle resource proof (CONTEXT_RESOURCE_PRF) — feed to sender.
2376    fn handle_resource_prf(
2377        &mut self,
2378        link_id: &LinkId,
2379        plaintext: &[u8],
2380        rng: &mut dyn Rng,
2381    ) -> Vec<LinkManagerAction> {
2382        let link = match self.links.get_mut(link_id) {
2383            Some(l) => l,
2384            None => return Vec::new(),
2385        };
2386
2387        let now = time::now();
2388        let mut result_actions = Vec::new();
2389        let mut completed_sender = None;
2390        let mut failed_split = None;
2391        let proof_hash = plaintext.get(..32);
2392        for sender in &mut link.outgoing_resources {
2393            if proof_hash.is_some_and(|hash| hash != sender.resource_hash.as_slice()) {
2394                continue;
2395            }
2396            let resource_actions = sender.handle_proof(plaintext, now);
2397            if !resource_actions.is_empty() {
2398                if resource_actions
2399                    .iter()
2400                    .any(|action| matches!(action, ResourceAction::Completed))
2401                {
2402                    completed_sender = Some((
2403                        sender.original_hash,
2404                        sender.segment_index,
2405                        sender.total_segments,
2406                        sender.total_parts(),
2407                    ));
2408                }
2409                if sender.flags.split
2410                    && resource_actions
2411                        .iter()
2412                        .any(|action| matches!(action, ResourceAction::Failed(_)))
2413                {
2414                    failed_split = Some(sender.original_hash);
2415                }
2416                result_actions.extend(resource_actions);
2417                break;
2418            }
2419        }
2420
2421        // Convert to LinkManagerActions
2422        let mut actions = Vec::new();
2423        let mut advertise_next = None;
2424        for ra in result_actions {
2425            match ra {
2426                ResourceAction::Completed => {
2427                    if let Some((original_hash, segment_index, total_segments, total_parts)) =
2428                        completed_sender
2429                    {
2430                        if total_segments > 1 && segment_index < total_segments {
2431                            if let Some(split) = link.outgoing_splits.get_mut(&original_hash) {
2432                                split.completed_segments = segment_index;
2433                                split.current_segment_index = segment_index;
2434                                split.current_sent_parts = total_parts;
2435                                split.current_total_parts = total_parts;
2436                                if let Some(next) = link.outgoing_resources.iter_mut().find(|s| {
2437                                    s.flags.split
2438                                        && s.original_hash == original_hash
2439                                        && s.segment_index == segment_index + 1
2440                                }) {
2441                                    split.current_segment_index = next.segment_index;
2442                                    split.current_sent_parts = 0;
2443                                    split.current_total_parts = next.total_parts();
2444                                    advertise_next = Some(next.advertise(now));
2445                                }
2446                            }
2447                        } else {
2448                            link.outgoing_splits.remove(&original_hash);
2449                            actions
2450                                .push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2451                        }
2452                    } else {
2453                        actions.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2454                    }
2455                }
2456                ResourceAction::Failed(e) => {
2457                    if let Some(original_hash) = failed_split {
2458                        link.outgoing_splits.remove(&original_hash);
2459                    }
2460                    actions.push(LinkManagerAction::ResourceFailed {
2461                        link_id: *link_id,
2462                        error: format!("{}", e),
2463                    });
2464                }
2465                _ => {}
2466            }
2467        }
2468
2469        // Clean up completed/failed senders
2470        link.outgoing_resources
2471            .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2472
2473        let _ = link;
2474        if let Some(next_actions) = advertise_next {
2475            actions.extend(self.process_resource_actions(link_id, next_actions, rng));
2476        }
2477
2478        actions
2479    }
2480
2481    /// Handle cancel from initiator (CONTEXT_RESOURCE_ICL).
2482    fn handle_resource_icl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
2483        let link = match self.links.get_mut(link_id) {
2484            Some(l) => l,
2485            None => return Vec::new(),
2486        };
2487
2488        let mut actions = Vec::new();
2489        for receiver in &mut link.incoming_resources {
2490            let ra = receiver.handle_cancel();
2491            for a in ra {
2492                if let ResourceAction::Failed(ref e) = a {
2493                    actions.push(LinkManagerAction::ResourceFailed {
2494                        link_id: *link_id,
2495                        error: format!("{}", e),
2496                    });
2497                }
2498            }
2499        }
2500        link.incoming_resources
2501            .retain(|r| r.status < rns_core::resource::ResourceStatus::Complete);
2502        link.incoming_splits.clear();
2503        actions
2504    }
2505
2506    /// Handle cancel from receiver (CONTEXT_RESOURCE_RCL).
2507    fn handle_resource_rcl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
2508        let link = match self.links.get_mut(link_id) {
2509            Some(l) => l,
2510            None => return Vec::new(),
2511        };
2512
2513        let mut actions = Vec::new();
2514        for sender in &mut link.outgoing_resources {
2515            let ra = sender.handle_reject();
2516            for a in ra {
2517                if let ResourceAction::Failed(ref e) = a {
2518                    actions.push(LinkManagerAction::ResourceFailed {
2519                        link_id: *link_id,
2520                        error: format!("{}", e),
2521                    });
2522                }
2523            }
2524        }
2525        link.outgoing_resources
2526            .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2527        link.outgoing_splits.clear();
2528        actions
2529    }
2530
2531    /// Convert ResourceActions to LinkManagerActions.
2532    fn process_resource_actions(
2533        &mut self,
2534        link_id: &LinkId,
2535        actions: Vec<ResourceAction>,
2536        rng: &mut dyn Rng,
2537    ) -> Vec<LinkManagerAction> {
2538        let mut result = Vec::new();
2539        for action in actions {
2540            match action {
2541                ResourceAction::SendAdvertisement(data) => {
2542                    // Link-encrypt and send as CONTEXT_RESOURCE_ADV
2543                    let encrypted = self
2544                        .links
2545                        .get(link_id)
2546                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2547                    if let Some(encrypted) = encrypted {
2548                        result.extend(self.build_link_packet(
2549                            link_id,
2550                            constants::CONTEXT_RESOURCE_ADV,
2551                            &encrypted,
2552                        ));
2553                    }
2554                }
2555                ResourceAction::SendPart(data) => {
2556                    // Parts are NOT link-encrypted — send raw as CONTEXT_RESOURCE
2557                    result.extend(self.build_link_packet(
2558                        link_id,
2559                        constants::CONTEXT_RESOURCE,
2560                        &data,
2561                    ));
2562                }
2563                ResourceAction::SendRequest(data) => {
2564                    let encrypted = self
2565                        .links
2566                        .get(link_id)
2567                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2568                    if let Some(encrypted) = encrypted {
2569                        result.extend(self.build_link_packet(
2570                            link_id,
2571                            constants::CONTEXT_RESOURCE_REQ,
2572                            &encrypted,
2573                        ));
2574                    }
2575                }
2576                ResourceAction::SendHmu(data) => {
2577                    let encrypted = self
2578                        .links
2579                        .get(link_id)
2580                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2581                    if let Some(encrypted) = encrypted {
2582                        result.extend(self.build_link_packet(
2583                            link_id,
2584                            constants::CONTEXT_RESOURCE_HMU,
2585                            &encrypted,
2586                        ));
2587                    }
2588                }
2589                ResourceAction::SendProof(data) => {
2590                    let encrypted = self
2591                        .links
2592                        .get(link_id)
2593                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2594                    if let Some(encrypted) = encrypted {
2595                        result.extend(self.build_link_packet(
2596                            link_id,
2597                            constants::CONTEXT_RESOURCE_PRF,
2598                            &encrypted,
2599                        ));
2600                    }
2601                }
2602                ResourceAction::SendCancelInitiator(data) => {
2603                    let encrypted = self
2604                        .links
2605                        .get(link_id)
2606                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2607                    if let Some(encrypted) = encrypted {
2608                        result.extend(self.build_link_packet(
2609                            link_id,
2610                            constants::CONTEXT_RESOURCE_ICL,
2611                            &encrypted,
2612                        ));
2613                    }
2614                }
2615                ResourceAction::SendCancelReceiver(data) => {
2616                    let encrypted = self
2617                        .links
2618                        .get(link_id)
2619                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2620                    if let Some(encrypted) = encrypted {
2621                        result.extend(self.build_link_packet(
2622                            link_id,
2623                            constants::CONTEXT_RESOURCE_RCL,
2624                            &encrypted,
2625                        ));
2626                    }
2627                }
2628                ResourceAction::DataReceived { data, metadata } => {
2629                    result.push(LinkManagerAction::ResourceReceived {
2630                        link_id: *link_id,
2631                        data,
2632                        metadata,
2633                    });
2634                }
2635                ResourceAction::Completed => {
2636                    result.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2637                }
2638                ResourceAction::Failed(e) => {
2639                    result.push(LinkManagerAction::ResourceFailed {
2640                        link_id: *link_id,
2641                        error: format!("{}", e),
2642                    });
2643                }
2644                ResourceAction::TeardownLink => {
2645                    let teardown_actions = match self.links.get_mut(link_id) {
2646                        Some(link) => link.engine.handle_teardown(),
2647                        None => Vec::new(),
2648                    };
2649                    result.extend(self.process_link_actions(link_id, &teardown_actions));
2650                }
2651                ResourceAction::ProgressUpdate { received, total } => {
2652                    result.push(LinkManagerAction::ResourceProgress {
2653                        link_id: *link_id,
2654                        received,
2655                        total,
2656                    });
2657                }
2658            }
2659        }
2660        result
2661    }
2662
2663    /// Build a link DATA packet with a given context and data.
2664    fn build_link_packet(
2665        &self,
2666        link_id: &LinkId,
2667        context: u8,
2668        data: &[u8],
2669    ) -> Vec<LinkManagerAction> {
2670        let flags = PacketFlags {
2671            header_type: constants::HEADER_1,
2672            context_flag: constants::FLAG_UNSET,
2673            transport_type: constants::TRANSPORT_BROADCAST,
2674            destination_type: constants::DESTINATION_LINK,
2675            packet_type: constants::PACKET_TYPE_DATA,
2676        };
2677        let mut actions = Vec::new();
2678        let max_mtu = self
2679            .links
2680            .get(link_id)
2681            .map(|l| l.engine.mtu() as usize)
2682            .unwrap_or(constants::MTU);
2683        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash_with_max_mtu(
2684            flags, 0, link_id, None, context, data, max_mtu,
2685        ) {
2686            actions.push(LinkManagerAction::SendPacket {
2687                raw,
2688                dest_type: constants::DESTINATION_LINK,
2689                attached_interface: None,
2690            });
2691        }
2692        actions
2693    }
2694
2695    /// Start sending a resource on a link.
2696    pub fn send_resource(
2697        &mut self,
2698        link_id: &LinkId,
2699        data: &[u8],
2700        metadata: Option<&[u8]>,
2701        rng: &mut dyn Rng,
2702    ) -> Vec<LinkManagerAction> {
2703        self.send_resource_with_auto_compress(link_id, data, metadata, true, rng)
2704    }
2705
2706    /// Start sending a resource on a link, controlling automatic compression.
2707    pub fn send_resource_with_auto_compress(
2708        &mut self,
2709        link_id: &LinkId,
2710        data: &[u8],
2711        metadata: Option<&[u8]>,
2712        auto_compress: bool,
2713        rng: &mut dyn Rng,
2714    ) -> Vec<LinkManagerAction> {
2715        let link = match self.links.get_mut(link_id) {
2716            Some(l) => l,
2717            None => return Vec::new(),
2718        };
2719
2720        if link.engine.state() != LinkState::Active {
2721            return Vec::new();
2722        }
2723
2724        let now = time::now();
2725
2726        let senders = match Self::build_resource_senders(
2727            link,
2728            data,
2729            metadata,
2730            auto_compress,
2731            false, // is_response
2732            None,  // request_id
2733            rng,
2734            now,
2735        ) {
2736            Ok(s) => s,
2737            Err(e) => {
2738                log::debug!("Failed to create ResourceSender: {}", e);
2739                return Vec::new();
2740            }
2741        };
2742
2743        let adv_actions = Self::start_resource_senders(link, senders, now);
2744
2745        let _ = link;
2746        self.process_resource_actions(link_id, adv_actions, rng)
2747    }
2748
2749    /// Set the resource acceptance strategy for a link.
2750    pub fn set_resource_strategy(&mut self, link_id: &LinkId, strategy: ResourceStrategy) {
2751        if let Some(link) = self.links.get_mut(link_id) {
2752            link.resource_strategy = strategy;
2753        }
2754    }
2755
2756    /// Flush the channel TX ring for a link, clearing outstanding messages.
2757    /// Called after holepunch completion where signaling messages are fire-and-forget.
2758    pub fn flush_channel_tx(&mut self, link_id: &LinkId) {
2759        if let Some(link) = self.links.get_mut(link_id) {
2760            if let Some(ref mut channel) = link.channel {
2761                channel.flush_tx();
2762            }
2763        }
2764    }
2765
2766    /// Send a channel message on a link.
2767    pub fn send_channel_message(
2768        &mut self,
2769        link_id: &LinkId,
2770        msgtype: u16,
2771        payload: &[u8],
2772        rng: &mut dyn Rng,
2773    ) -> Result<Vec<LinkManagerAction>, String> {
2774        let link = match self.links.get_mut(link_id) {
2775            Some(l) => l,
2776            None => return Err("unknown link".to_string()),
2777        };
2778
2779        let channel = match link.channel {
2780            Some(ref mut ch) => ch,
2781            None => return Err("link has no active channel".to_string()),
2782        };
2783
2784        let link_mdu = link.engine.mdu();
2785        let now = time::now();
2786        let chan_actions = match channel.send(msgtype, payload, now, link_mdu) {
2787            Ok(a) => {
2788                link.channel_send_ok += 1;
2789                a
2790            }
2791            Err(e) => {
2792                log::debug!("Channel send failed: {:?}", e);
2793                match e {
2794                    rns_core::channel::ChannelError::NotReady => link.channel_send_not_ready += 1,
2795                    rns_core::channel::ChannelError::MessageTooBig => {
2796                        link.channel_send_too_big += 1;
2797                    }
2798                    rns_core::channel::ChannelError::InvalidEnvelope => {
2799                        link.channel_send_other_error += 1;
2800                    }
2801                }
2802                return Err(e.to_string());
2803            }
2804        };
2805
2806        let _ = link;
2807        Ok(self.process_channel_actions(link_id, chan_actions, rng))
2808    }
2809
2810    /// Periodic tick: check keepalive, stale, timeouts for all links.
2811    pub fn tick(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2812        let now = time::now();
2813        let mut all_actions = Vec::new();
2814
2815        // Collect link_ids to avoid borrow issues
2816        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2817
2818        for link_id in &link_ids {
2819            let link = match self.links.get_mut(link_id) {
2820                Some(l) => l,
2821                None => continue,
2822            };
2823
2824            // Tick the engine
2825            let tick_actions = link.engine.tick(now);
2826            all_actions.extend(self.process_link_actions(link_id, &tick_actions));
2827
2828            // Check if keepalive is needed
2829            let link = match self.links.get_mut(link_id) {
2830                Some(l) => l,
2831                None => continue,
2832            };
2833            if link.engine.needs_keepalive(now) {
2834                // Send keepalive packet (empty data with CONTEXT_KEEPALIVE)
2835                let flags = PacketFlags {
2836                    header_type: constants::HEADER_1,
2837                    context_flag: constants::FLAG_UNSET,
2838                    transport_type: constants::TRANSPORT_BROADCAST,
2839                    destination_type: constants::DESTINATION_LINK,
2840                    packet_type: constants::PACKET_TYPE_DATA,
2841                };
2842                if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
2843                    flags,
2844                    0,
2845                    link_id,
2846                    None,
2847                    constants::CONTEXT_KEEPALIVE,
2848                    &[],
2849                ) {
2850                    all_actions.push(LinkManagerAction::SendPacket {
2851                        raw,
2852                        dest_type: constants::DESTINATION_LINK,
2853                        attached_interface: None,
2854                    });
2855                    link.engine.record_outbound(now, true);
2856                }
2857            }
2858
2859            if let Some(channel) = link.channel.as_mut() {
2860                let chan_actions = channel.tick(now);
2861                let _ = channel;
2862                let _ = link;
2863                all_actions.extend(self.process_channel_actions(link_id, chan_actions, rng));
2864            }
2865        }
2866
2867        // Tick resource senders and receivers
2868        for link_id in &link_ids {
2869            let link = match self.links.get_mut(link_id) {
2870                Some(l) => l,
2871                None => continue,
2872            };
2873
2874            // Tick outgoing resources (senders)
2875            let mut sender_actions = Vec::new();
2876            for sender in &mut link.outgoing_resources {
2877                sender_actions.extend(sender.tick(now));
2878            }
2879
2880            // Tick incoming resources (receivers)
2881            let mut receiver_actions = Vec::new();
2882            for receiver in &mut link.incoming_resources {
2883                let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2884                    link.engine.decrypt(ciphertext).map_err(|_| ())
2885                };
2886                receiver_actions.extend(receiver.tick(now, &decrypt_fn, &Bzip2Compressor));
2887            }
2888
2889            // Clean up completed/failed resources
2890            link.outgoing_resources
2891                .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2892            link.incoming_resources
2893                .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2894            let active_split_hashes: Vec<[u8; 32]> = link
2895                .outgoing_resources
2896                .iter()
2897                .filter(|s| s.flags.split)
2898                .map(|s| s.original_hash)
2899                .collect();
2900            link.outgoing_splits.retain(|original_hash, split| {
2901                split.completed_segments < split.total_segments
2902                    && active_split_hashes.contains(original_hash)
2903            });
2904
2905            let _ = link;
2906            all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2907            all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2908        }
2909
2910        // Clean up closed links
2911        let closed: Vec<LinkId> = self
2912            .links
2913            .iter()
2914            .filter(|(_, l)| l.engine.state() == LinkState::Closed)
2915            .map(|(id, _)| *id)
2916            .collect();
2917        for id in closed {
2918            self.links.remove(&id);
2919            all_actions.push(LinkManagerAction::DeregisterLinkDest { link_id: id });
2920        }
2921
2922        all_actions
2923    }
2924
2925    /// Check if a destination hash is a known link_id managed by this manager.
2926    pub fn is_link_destination(&self, dest_hash: &[u8; 16]) -> bool {
2927        self.links.contains_key(dest_hash) || self.link_destinations.contains_key(dest_hash)
2928    }
2929
2930    /// Get the state of a link.
2931    pub fn link_state(&self, link_id: &LinkId) -> Option<LinkState> {
2932        self.links.get(link_id).map(|l| l.engine.state())
2933    }
2934
2935    /// Get the RTT of a link.
2936    pub fn link_rtt(&self, link_id: &LinkId) -> Option<f64> {
2937        self.links.get(link_id).and_then(|l| l.engine.rtt())
2938    }
2939
2940    /// Update the RTT of a link (e.g., after path redirect to a direct connection).
2941    pub fn set_link_rtt(&mut self, link_id: &LinkId, rtt: f64) {
2942        if let Some(link) = self.links.get_mut(link_id) {
2943            link.engine.set_rtt(rtt);
2944        }
2945    }
2946
2947    /// Reset the inbound timer for a link (e.g., after path redirect).
2948    pub fn record_link_inbound(&mut self, link_id: &LinkId) {
2949        if let Some(link) = self.links.get_mut(link_id) {
2950            link.engine.record_inbound(time::now());
2951        }
2952    }
2953
2954    /// Update the MTU of a link (e.g., after path redirect to a different interface).
2955    pub fn set_link_mtu(&mut self, link_id: &LinkId, mtu: u32) {
2956        if let Some(link) = self.links.get_mut(link_id) {
2957            link.engine.set_mtu(mtu);
2958        }
2959    }
2960
    /// Get the number of links currently held by this manager.
    ///
    /// Counts every entry in `links`, whatever state its engine is in.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }
2965
2966    /// Get the number of active resource transfers across all links.
2967    pub fn resource_transfer_count(&self) -> usize {
2968        self.links
2969            .values()
2970            .map(|managed| {
2971                managed
2972                    .incoming_resources
2973                    .iter()
2974                    .filter(|resource| !resource.flags.split)
2975                    .count()
2976                    + managed.incoming_splits.len()
2977                    + managed
2978                        .outgoing_resources
2979                        .iter()
2980                        .filter(|resource| !resource.flags.split)
2981                        .count()
2982                    + managed.outgoing_splits.len()
2983            })
2984            .sum()
2985    }
2986
2987    /// Cancel all active resource transfers and return the generated actions.
2988    pub fn cancel_all_resources(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2989        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2990        let mut all_actions = Vec::new();
2991
2992        for link_id in &link_ids {
2993            let link = match self.links.get_mut(link_id) {
2994                Some(l) => l,
2995                None => continue,
2996            };
2997
2998            let mut sender_actions = Vec::new();
2999            for sender in &mut link.outgoing_resources {
3000                sender_actions.extend(sender.cancel());
3001            }
3002
3003            let mut receiver_actions = Vec::new();
3004            for receiver in &mut link.incoming_resources {
3005                receiver_actions.extend(receiver.reject());
3006            }
3007
3008            link.outgoing_resources
3009                .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
3010            link.incoming_resources
3011                .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
3012            link.outgoing_splits.clear();
3013            link.incoming_splits.clear();
3014
3015            let _ = link;
3016            all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
3017            all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
3018        }
3019
3020        all_actions
3021    }
3022
3023    /// Get information about all active links.
3024    pub fn link_entries(&self) -> Vec<crate::event::LinkInfoEntry> {
3025        self.links
3026            .iter()
3027            .map(|(link_id, managed)| {
3028                let state = match managed.engine.state() {
3029                    LinkState::Pending => "pending",
3030                    LinkState::Handshake => "handshake",
3031                    LinkState::Active => "active",
3032                    LinkState::Stale => "stale",
3033                    LinkState::Closed => "closed",
3034                };
3035                crate::event::LinkInfoEntry {
3036                    link_id: *link_id,
3037                    state: state.to_string(),
3038                    is_initiator: managed.engine.is_initiator(),
3039                    dest_hash: managed.dest_hash,
3040                    remote_identity: managed.remote_identity.as_ref().map(|(h, _)| *h),
3041                    rtt: managed.engine.rtt(),
3042                    channel_window: managed.channel.as_ref().map(|c| c.window()),
3043                    channel_outstanding: managed.channel.as_ref().map(|c| c.outstanding()),
3044                    pending_channel_packets: managed.pending_channel_packets.len(),
3045                    channel_send_ok: managed.channel_send_ok,
3046                    channel_send_not_ready: managed.channel_send_not_ready,
3047                    channel_send_too_big: managed.channel_send_too_big,
3048                    channel_send_other_error: managed.channel_send_other_error,
3049                    channel_messages_received: managed.channel_messages_received,
3050                    channel_proofs_sent: managed.channel_proofs_sent,
3051                    channel_proofs_received: managed.channel_proofs_received,
3052                }
3053            })
3054            .collect()
3055    }
3056
3057    /// Get information about all active resource transfers.
3058    pub fn resource_entries(&self) -> Vec<crate::event::ResourceInfoEntry> {
3059        let mut entries = Vec::new();
3060        for (link_id, managed) in &self.links {
3061            let resource_sdu = Self::resource_sdu_for_link(managed);
3062            for split in managed.incoming_splits.values() {
3063                let (received, total) = Self::incoming_split_progress(split, resource_sdu);
3064                entries.push(crate::event::ResourceInfoEntry {
3065                    link_id: *link_id,
3066                    direction: "incoming".to_string(),
3067                    total_parts: total,
3068                    transferred_parts: received,
3069                    complete: received >= total && total > 0,
3070                });
3071            }
3072            for recv in &managed.incoming_resources {
3073                if recv.flags.split {
3074                    continue;
3075                }
3076                let (received, total) = recv.progress();
3077                entries.push(crate::event::ResourceInfoEntry {
3078                    link_id: *link_id,
3079                    direction: "incoming".to_string(),
3080                    total_parts: total,
3081                    transferred_parts: received,
3082                    complete: received >= total && total > 0,
3083                });
3084            }
3085            for split in managed.outgoing_splits.values() {
3086                let (sent, total) = Self::outgoing_split_progress(split, resource_sdu);
3087                entries.push(crate::event::ResourceInfoEntry {
3088                    link_id: *link_id,
3089                    direction: "outgoing".to_string(),
3090                    total_parts: total,
3091                    transferred_parts: sent,
3092                    complete: sent >= total && total > 0,
3093                });
3094            }
3095            for send in &managed.outgoing_resources {
3096                if send.flags.split {
3097                    continue;
3098                }
3099                let total = send.total_parts();
3100                let sent = send.sent_parts;
3101                entries.push(crate::event::ResourceInfoEntry {
3102                    link_id: *link_id,
3103                    direction: "outgoing".to_string(),
3104                    total_parts: total,
3105                    transferred_parts: sent,
3106                    complete: sent >= total && total > 0,
3107                });
3108            }
3109        }
3110        entries
3111    }
3112
3113    /// Convert LinkActions to LinkManagerActions.
3114    fn process_link_actions(
3115        &self,
3116        link_id: &LinkId,
3117        actions: &[LinkAction],
3118    ) -> Vec<LinkManagerAction> {
3119        let mut result = Vec::new();
3120        for action in actions {
3121            match action {
3122                LinkAction::StateChanged {
3123                    new_state, reason, ..
3124                } => match new_state {
3125                    LinkState::Closed => {
3126                        result.push(LinkManagerAction::LinkClosed {
3127                            link_id: *link_id,
3128                            reason: *reason,
3129                        });
3130                    }
3131                    _ => {}
3132                },
3133                LinkAction::LinkEstablished {
3134                    rtt, is_initiator, ..
3135                } => {
3136                    let dest_hash = self
3137                        .links
3138                        .get(link_id)
3139                        .map(|l| l.dest_hash)
3140                        .unwrap_or([0u8; 16]);
3141                    result.push(LinkManagerAction::LinkEstablished {
3142                        link_id: *link_id,
3143                        dest_hash,
3144                        rtt: *rtt,
3145                        is_initiator: *is_initiator,
3146                    });
3147                }
3148                LinkAction::RemoteIdentified {
3149                    identity_hash,
3150                    public_key,
3151                    ..
3152                } => {
3153                    result.push(LinkManagerAction::RemoteIdentified {
3154                        link_id: *link_id,
3155                        identity_hash: *identity_hash,
3156                        public_key: *public_key,
3157                    });
3158                }
3159                LinkAction::DataReceived { .. } => {
3160                    // Data delivery is handled at a higher level
3161                }
3162            }
3163        }
3164        result
3165    }
3166
3167    /// Convert ChannelActions to LinkManagerActions.
3168    fn process_channel_actions(
3169        &mut self,
3170        link_id: &LinkId,
3171        actions: Vec<rns_core::channel::ChannelAction>,
3172        rng: &mut dyn Rng,
3173    ) -> Vec<LinkManagerAction> {
3174        let mut result = Vec::new();
3175        for action in actions {
3176            match action {
3177                rns_core::channel::ChannelAction::SendOnLink { raw, sequence } => {
3178                    // Encrypt and send as CHANNEL context. If the packet cannot be
3179                    // emitted, remove the reserved channel sequence so receivers do
3180                    // not stall on a gap that never reached the wire.
3181                    let encrypted = match self.links.get(link_id) {
3182                        Some(link) => match link.engine.encrypt(&raw, rng) {
3183                            Ok(encrypted) => encrypted,
3184                            Err(_) => {
3185                                if let Some(link_mut) = self.links.get_mut(link_id) {
3186                                    if let Some(channel) = link_mut.channel.as_mut() {
3187                                        channel.cancel_send(sequence);
3188                                    }
3189                                }
3190                                continue;
3191                            }
3192                        },
3193                        None => continue,
3194                    };
3195                    let flags = PacketFlags {
3196                        header_type: constants::HEADER_1,
3197                        context_flag: constants::FLAG_UNSET,
3198                        transport_type: constants::TRANSPORT_BROADCAST,
3199                        destination_type: constants::DESTINATION_LINK,
3200                        packet_type: constants::PACKET_TYPE_DATA,
3201                    };
3202                    match RawPacket::pack_raw_with_hash(
3203                        flags,
3204                        0,
3205                        link_id,
3206                        None,
3207                        constants::CONTEXT_CHANNEL,
3208                        &encrypted,
3209                    ) {
3210                        Ok((raw_bytes, packet_hash)) => {
3211                            if let Some(link_mut) = self.links.get_mut(link_id) {
3212                                link_mut
3213                                    .pending_channel_packets
3214                                    .insert(packet_hash, sequence);
3215                            }
3216                            result.push(LinkManagerAction::SendPacket {
3217                                raw: raw_bytes,
3218                                dest_type: constants::DESTINATION_LINK,
3219                                attached_interface: None,
3220                            });
3221                        }
3222                        Err(_) => {
3223                            if let Some(link_mut) = self.links.get_mut(link_id) {
3224                                if let Some(channel) = link_mut.channel.as_mut() {
3225                                    channel.cancel_send(sequence);
3226                                }
3227                            }
3228                        }
3229                    }
3230                }
3231                rns_core::channel::ChannelAction::MessageReceived {
3232                    msgtype, payload, ..
3233                } => {
3234                    result.push(LinkManagerAction::ChannelMessageReceived {
3235                        link_id: *link_id,
3236                        msgtype,
3237                        payload,
3238                    });
3239                }
3240                rns_core::channel::ChannelAction::TeardownLink => {
3241                    result.push(LinkManagerAction::LinkClosed {
3242                        link_id: *link_id,
3243                        reason: Some(TeardownReason::Timeout),
3244                    });
3245                }
3246            }
3247        }
3248        result
3249    }
3250}
3251
3252impl Default for LinkManager {
3253    fn default() -> Self {
3254        Self::new()
3255    }
3256}
3257
3258/// Compute a path hash from a path string.
3259/// Uses truncated SHA-256 (first 16 bytes).
3260fn compute_path_hash(path: &str) -> [u8; 16] {
3261    let full = rns_core::hash::full_hash(path.as_bytes());
3262    let mut result = [0u8; 16];
3263    result.copy_from_slice(&full[..16]);
3264    result
3265}
3266
3267#[cfg(test)]
3268mod tests {
3269    use super::*;
3270    use rns_crypto::identity::Identity;
3271    use rns_crypto::{FixedRng, OsRng};
3272
3273    fn make_rng(seed: u8) -> FixedRng {
3274        FixedRng::new(&[seed; 128])
3275    }
3276
3277    fn make_dest_keys(rng: &mut dyn Rng) -> (Ed25519PrivateKey, [u8; 32]) {
3278        let sig_prv = Ed25519PrivateKey::generate(rng);
3279        let sig_pub_bytes = sig_prv.public_key().public_bytes();
3280        (sig_prv, sig_pub_bytes)
3281    }
3282
3283    #[test]
3284    fn test_register_link_destination() {
3285        let mut mgr = LinkManager::new();
3286        let mut rng = make_rng(0x01);
3287        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3288        let dest_hash = [0xDD; 16];
3289
3290        mgr.register_link_destination(
3291            dest_hash,
3292            sig_prv,
3293            sig_pub_bytes,
3294            ResourceStrategy::AcceptNone,
3295        );
3296        assert!(mgr.is_link_destination(&dest_hash));
3297
3298        mgr.deregister_link_destination(&dest_hash);
3299        assert!(!mgr.is_link_destination(&dest_hash));
3300    }
3301
3302    #[test]
3303    fn test_create_link() {
3304        let mut mgr = LinkManager::new();
3305        let mut rng = OsRng;
3306        let dest_hash = [0xDD; 16];
3307
3308        let sig_pub_bytes = [0xAA; 32]; // dummy sig pub for test
3309        let (link_id, actions) = mgr.create_link(
3310            &dest_hash,
3311            &sig_pub_bytes,
3312            1,
3313            constants::MTU as u32,
3314            &mut rng,
3315        );
3316        assert_ne!(link_id, [0u8; 16]);
3317        // Should have RegisterLinkDest + SendPacket
3318        assert_eq!(actions.len(), 2);
3319        assert!(matches!(
3320            actions[0],
3321            LinkManagerAction::RegisterLinkDest { .. }
3322        ));
3323        assert!(matches!(actions[1], LinkManagerAction::SendPacket { .. }));
3324
3325        // Link should be in Pending state
3326        assert_eq!(mgr.link_state(&link_id), Some(LinkState::Pending));
3327    }
3328
3329    #[test]
3330    fn test_full_handshake_via_manager() {
3331        let mut rng = OsRng;
3332        let dest_hash = [0xDD; 16];
3333
3334        // Setup responder
3335        let mut responder_mgr = LinkManager::new();
3336        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3337        responder_mgr.register_link_destination(
3338            dest_hash,
3339            sig_prv,
3340            sig_pub_bytes,
3341            ResourceStrategy::AcceptNone,
3342        );
3343
3344        // Setup initiator
3345        let mut initiator_mgr = LinkManager::new();
3346
3347        // Step 1: Initiator creates link (needs dest signing pub key for LRPROOF verification)
3348        let (link_id, init_actions) = initiator_mgr.create_link(
3349            &dest_hash,
3350            &sig_pub_bytes,
3351            1,
3352            constants::MTU as u32,
3353            &mut rng,
3354        );
3355        assert_eq!(init_actions.len(), 2);
3356
3357        // Extract the LINKREQUEST packet raw bytes
3358        let linkrequest_raw = match &init_actions[1] {
3359            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3360            _ => panic!("Expected SendPacket"),
3361        };
3362
3363        // Parse to get packet_hash and dest_hash
3364        let lr_packet = RawPacket::unpack(&linkrequest_raw).unwrap();
3365
3366        // Step 2: Responder handles LINKREQUEST
3367        let resp_actions = responder_mgr.handle_local_delivery(
3368            lr_packet.destination_hash,
3369            &linkrequest_raw,
3370            lr_packet.packet_hash,
3371            rns_core::transport::types::InterfaceId(0),
3372            &mut rng,
3373        );
3374        // Should have RegisterLinkDest + SendPacket(LRPROOF)
3375        assert!(resp_actions.len() >= 2);
3376        assert!(matches!(
3377            resp_actions[0],
3378            LinkManagerAction::RegisterLinkDest { .. }
3379        ));
3380
3381        // Extract LRPROOF packet
3382        let lrproof_raw = match &resp_actions[1] {
3383            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3384            _ => panic!("Expected SendPacket for LRPROOF"),
3385        };
3386
3387        // Step 3: Initiator handles LRPROOF
3388        let lrproof_packet = RawPacket::unpack(&lrproof_raw).unwrap();
3389        let init_actions2 = initiator_mgr.handle_local_delivery(
3390            lrproof_packet.destination_hash,
3391            &lrproof_raw,
3392            lrproof_packet.packet_hash,
3393            rns_core::transport::types::InterfaceId(0),
3394            &mut rng,
3395        );
3396
3397        // Should have LinkEstablished + SendPacket(LRRTT)
3398        let has_established = init_actions2
3399            .iter()
3400            .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
3401        assert!(has_established, "Initiator should emit LinkEstablished");
3402
3403        // Extract LRRTT
3404        let lrrtt_raw = init_actions2
3405            .iter()
3406            .find_map(|a| match a {
3407                LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3408                _ => None,
3409            })
3410            .expect("Should have LRRTT SendPacket");
3411
3412        // Step 4: Responder handles LRRTT
3413        let lrrtt_packet = RawPacket::unpack(&lrrtt_raw).unwrap();
3414        let resp_link_id = lrrtt_packet.destination_hash;
3415        let resp_actions2 = responder_mgr.handle_local_delivery(
3416            resp_link_id,
3417            &lrrtt_raw,
3418            lrrtt_packet.packet_hash,
3419            rns_core::transport::types::InterfaceId(0),
3420            &mut rng,
3421        );
3422
3423        let has_established = resp_actions2
3424            .iter()
3425            .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
3426        assert!(has_established, "Responder should emit LinkEstablished");
3427
3428        // Both sides should be Active
3429        assert_eq!(initiator_mgr.link_state(&link_id), Some(LinkState::Active));
3430        assert_eq!(responder_mgr.link_state(&link_id), Some(LinkState::Active));
3431
3432        // Both should have RTT
3433        assert!(initiator_mgr.link_rtt(&link_id).is_some());
3434        assert!(responder_mgr.link_rtt(&link_id).is_some());
3435    }
3436
3437    #[test]
3438    fn test_encrypted_data_exchange() {
3439        let mut rng = OsRng;
3440        let dest_hash = [0xDD; 16];
3441        let mut resp_mgr = LinkManager::new();
3442        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3443        resp_mgr.register_link_destination(
3444            dest_hash,
3445            sig_prv,
3446            sig_pub_bytes,
3447            ResourceStrategy::AcceptNone,
3448        );
3449        let mut init_mgr = LinkManager::new();
3450
3451        // Handshake
3452        let (link_id, init_actions) = init_mgr.create_link(
3453            &dest_hash,
3454            &sig_pub_bytes,
3455            1,
3456            constants::MTU as u32,
3457            &mut rng,
3458        );
3459        let lr_raw = extract_send_packet(&init_actions);
3460        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3461        let resp_actions = resp_mgr.handle_local_delivery(
3462            lr_pkt.destination_hash,
3463            &lr_raw,
3464            lr_pkt.packet_hash,
3465            rns_core::transport::types::InterfaceId(0),
3466            &mut rng,
3467        );
3468        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3469        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3470        let init_actions2 = init_mgr.handle_local_delivery(
3471            lrproof_pkt.destination_hash,
3472            &lrproof_raw,
3473            lrproof_pkt.packet_hash,
3474            rns_core::transport::types::InterfaceId(0),
3475            &mut rng,
3476        );
3477        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3478        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3479        resp_mgr.handle_local_delivery(
3480            lrrtt_pkt.destination_hash,
3481            &lrrtt_raw,
3482            lrrtt_pkt.packet_hash,
3483            rns_core::transport::types::InterfaceId(0),
3484            &mut rng,
3485        );
3486
3487        // Send data from initiator to responder
3488        let actions =
3489            init_mgr.send_on_link(&link_id, b"hello link!", constants::CONTEXT_NONE, &mut rng);
3490        assert_eq!(actions.len(), 1);
3491        assert!(matches!(actions[0], LinkManagerAction::SendPacket { .. }));
3492    }
3493
3494    #[test]
3495    fn test_request_response() {
3496        let mut rng = OsRng;
3497        let dest_hash = [0xDD; 16];
3498        let mut resp_mgr = LinkManager::new();
3499        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3500        resp_mgr.register_link_destination(
3501            dest_hash,
3502            sig_prv,
3503            sig_pub_bytes,
3504            ResourceStrategy::AcceptNone,
3505        );
3506
3507        // Register a request handler
3508        resp_mgr.register_request_handler("/status", None, |_link_id, _path, _data, _remote| {
3509            Some(b"OK".to_vec())
3510        });
3511
3512        let mut init_mgr = LinkManager::new();
3513
3514        // Complete handshake
3515        let (link_id, init_actions) = init_mgr.create_link(
3516            &dest_hash,
3517            &sig_pub_bytes,
3518            1,
3519            constants::MTU as u32,
3520            &mut rng,
3521        );
3522        let lr_raw = extract_send_packet(&init_actions);
3523        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3524        let resp_actions = resp_mgr.handle_local_delivery(
3525            lr_pkt.destination_hash,
3526            &lr_raw,
3527            lr_pkt.packet_hash,
3528            rns_core::transport::types::InterfaceId(0),
3529            &mut rng,
3530        );
3531        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3532        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3533        let init_actions2 = init_mgr.handle_local_delivery(
3534            lrproof_pkt.destination_hash,
3535            &lrproof_raw,
3536            lrproof_pkt.packet_hash,
3537            rns_core::transport::types::InterfaceId(0),
3538            &mut rng,
3539        );
3540        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3541        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3542        resp_mgr.handle_local_delivery(
3543            lrrtt_pkt.destination_hash,
3544            &lrrtt_raw,
3545            lrrtt_pkt.packet_hash,
3546            rns_core::transport::types::InterfaceId(0),
3547            &mut rng,
3548        );
3549
3550        // Send request from initiator
3551        let req_actions = init_mgr.send_request(&link_id, "/status", b"query", &mut rng);
3552        assert_eq!(req_actions.len(), 1);
3553
3554        // Deliver request to responder
3555        let req_raw = extract_send_packet_from(&req_actions);
3556        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3557        let resp_actions = resp_mgr.handle_local_delivery(
3558            req_pkt.destination_hash,
3559            &req_raw,
3560            req_pkt.packet_hash,
3561            rns_core::transport::types::InterfaceId(0),
3562            &mut rng,
3563        );
3564
3565        // Should have a response SendPacket
3566        let has_response = resp_actions
3567            .iter()
3568            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3569        assert!(has_response, "Handler should produce a response packet");
3570    }
3571
3572    #[test]
3573    fn test_send_request_wraps_invalid_msgpack_data_as_bin() {
3574        use std::sync::{Arc, Mutex};
3575
3576        let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
3577        let mut rng = OsRng;
3578
3579        let invalid = vec![0xC1];
3580        let expected = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(invalid.clone()));
3581        let captured = Arc::new(Mutex::new(None::<Vec<u8>>));
3582        let captured_for_handler = Arc::clone(&captured);
3583
3584        resp_mgr.register_request_handler("/bin", None, move |_link_id, _path, data, _remote| {
3585            *captured_for_handler.lock().unwrap() = Some(data.to_vec());
3586            Some(rns_core::msgpack::pack(&rns_core::msgpack::Value::Bool(
3587                true,
3588            )))
3589        });
3590
3591        let req_actions = init_mgr.send_request(&link_id, "/bin", &invalid, &mut rng);
3592        let req_raw = extract_send_packet_from(&req_actions);
3593        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3594        let resp_actions = resp_mgr.handle_local_delivery(
3595            req_pkt.destination_hash,
3596            &req_raw,
3597            req_pkt.packet_hash,
3598            rns_core::transport::types::InterfaceId(0),
3599            &mut rng,
3600        );
3601
3602        assert!(
3603            resp_actions
3604                .iter()
3605                .any(|a| matches!(a, LinkManagerAction::SendPacket { .. })),
3606            "handler should still produce a response"
3607        );
3608        assert_eq!(*captured.lock().unwrap(), Some(expected));
3609    }
3610
3611    #[test]
3612    fn test_invalid_response_bytes_are_returned_as_msgpack_bin() {
3613        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3614        let mut rng = OsRng;
3615        let invalid_response = vec![0xC1];
3616        let expected =
3617            rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(invalid_response.clone()));
3618
3619        resp_mgr.register_request_handler("/invalid-response", None, {
3620            let invalid_response = invalid_response.clone();
3621            move |_link_id, _path, _data, _remote| Some(invalid_response.clone())
3622        });
3623
3624        let req_actions = init_mgr.send_request(&link_id, "/invalid-response", b"\xc0", &mut rng);
3625        let req_raw = extract_send_packet_from(&req_actions);
3626        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3627        let resp_actions = resp_mgr.handle_local_delivery(
3628            req_pkt.destination_hash,
3629            &req_raw,
3630            req_pkt.packet_hash,
3631            rns_core::transport::types::InterfaceId(0),
3632            &mut rng,
3633        );
3634
3635        let resp_raw = extract_any_send_packet(&resp_actions);
3636        let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
3637        let init_actions = init_mgr.handle_local_delivery(
3638            resp_pkt.destination_hash,
3639            &resp_raw,
3640            resp_pkt.packet_hash,
3641            rns_core::transport::types::InterfaceId(0),
3642            &mut rng,
3643        );
3644
3645        let response_data = init_actions
3646            .iter()
3647            .find_map(|action| match action {
3648                LinkManagerAction::ResponseReceived { data, .. } => Some(data.clone()),
3649                _ => None,
3650            })
3651            .expect("initiator should receive a response");
3652        assert_eq!(response_data, expected);
3653    }
3654
3655    #[test]
3656    fn test_request_acl_deny_unidentified() {
3657        let mut rng = OsRng;
3658        let dest_hash = [0xDD; 16];
3659        let mut resp_mgr = LinkManager::new();
3660        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3661        resp_mgr.register_link_destination(
3662            dest_hash,
3663            sig_prv,
3664            sig_pub_bytes,
3665            ResourceStrategy::AcceptNone,
3666        );
3667
3668        // Register handler with ACL (only allow specific identity)
3669        resp_mgr.register_request_handler(
3670            "/restricted",
3671            Some(vec![[0xAA; 16]]),
3672            |_link_id, _path, _data, _remote| Some(b"secret".to_vec()),
3673        );
3674
3675        let mut init_mgr = LinkManager::new();
3676
3677        // Complete handshake (without identification)
3678        let (link_id, init_actions) = init_mgr.create_link(
3679            &dest_hash,
3680            &sig_pub_bytes,
3681            1,
3682            constants::MTU as u32,
3683            &mut rng,
3684        );
3685        let lr_raw = extract_send_packet(&init_actions);
3686        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3687        let resp_actions = resp_mgr.handle_local_delivery(
3688            lr_pkt.destination_hash,
3689            &lr_raw,
3690            lr_pkt.packet_hash,
3691            rns_core::transport::types::InterfaceId(0),
3692            &mut rng,
3693        );
3694        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3695        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3696        let init_actions2 = init_mgr.handle_local_delivery(
3697            lrproof_pkt.destination_hash,
3698            &lrproof_raw,
3699            lrproof_pkt.packet_hash,
3700            rns_core::transport::types::InterfaceId(0),
3701            &mut rng,
3702        );
3703        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3704        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3705        resp_mgr.handle_local_delivery(
3706            lrrtt_pkt.destination_hash,
3707            &lrrtt_raw,
3708            lrrtt_pkt.packet_hash,
3709            rns_core::transport::types::InterfaceId(0),
3710            &mut rng,
3711        );
3712
3713        // Send request without identifying first
3714        let req_actions = init_mgr.send_request(&link_id, "/restricted", b"query", &mut rng);
3715        let req_raw = extract_send_packet_from(&req_actions);
3716        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3717        let resp_actions = resp_mgr.handle_local_delivery(
3718            req_pkt.destination_hash,
3719            &req_raw,
3720            req_pkt.packet_hash,
3721            rns_core::transport::types::InterfaceId(0),
3722            &mut rng,
3723        );
3724
3725        // Should be denied — no response packet
3726        let has_response = resp_actions
3727            .iter()
3728            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3729        assert!(!has_response, "Unidentified peer should be denied");
3730    }
3731
3732    #[test]
3733    fn test_teardown_link() {
3734        let mut rng = OsRng;
3735        let dest_hash = [0xDD; 16];
3736        let mut mgr = LinkManager::new();
3737
3738        let dummy_sig = [0xAA; 32];
3739        let (link_id, _) =
3740            mgr.create_link(&dest_hash, &dummy_sig, 1, constants::MTU as u32, &mut rng);
3741        assert_eq!(mgr.link_count(), 1);
3742
3743        let actions = mgr.teardown_link(&link_id);
3744        let has_close = actions
3745            .iter()
3746            .any(|a| matches!(a, LinkManagerAction::LinkClosed { .. }));
3747        assert!(has_close);
3748
3749        // After tick, closed links should be cleaned up
3750        let tick_actions = mgr.tick(&mut rng);
3751        let has_deregister = tick_actions
3752            .iter()
3753            .any(|a| matches!(a, LinkManagerAction::DeregisterLinkDest { .. }));
3754        assert!(has_deregister);
3755        assert_eq!(mgr.link_count(), 0);
3756    }
3757
3758    #[test]
3759    fn test_identify_on_link() {
3760        let mut rng = OsRng;
3761        let dest_hash = [0xDD; 16];
3762        let mut resp_mgr = LinkManager::new();
3763        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3764        resp_mgr.register_link_destination(
3765            dest_hash,
3766            sig_prv,
3767            sig_pub_bytes,
3768            ResourceStrategy::AcceptNone,
3769        );
3770        let mut init_mgr = LinkManager::new();
3771
3772        // Complete handshake
3773        let (link_id, init_actions) = init_mgr.create_link(
3774            &dest_hash,
3775            &sig_pub_bytes,
3776            1,
3777            constants::MTU as u32,
3778            &mut rng,
3779        );
3780        let lr_raw = extract_send_packet(&init_actions);
3781        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3782        let resp_actions = resp_mgr.handle_local_delivery(
3783            lr_pkt.destination_hash,
3784            &lr_raw,
3785            lr_pkt.packet_hash,
3786            rns_core::transport::types::InterfaceId(0),
3787            &mut rng,
3788        );
3789        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3790        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3791        let init_actions2 = init_mgr.handle_local_delivery(
3792            lrproof_pkt.destination_hash,
3793            &lrproof_raw,
3794            lrproof_pkt.packet_hash,
3795            rns_core::transport::types::InterfaceId(0),
3796            &mut rng,
3797        );
3798        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3799        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3800        resp_mgr.handle_local_delivery(
3801            lrrtt_pkt.destination_hash,
3802            &lrrtt_raw,
3803            lrrtt_pkt.packet_hash,
3804            rns_core::transport::types::InterfaceId(0),
3805            &mut rng,
3806        );
3807
3808        // Identify initiator to responder
3809        let identity = Identity::new(&mut rng);
3810        let id_actions = init_mgr.identify(&link_id, &identity, &mut rng);
3811        assert_eq!(id_actions.len(), 1);
3812
3813        // Deliver identify to responder
3814        let id_raw = extract_send_packet_from(&id_actions);
3815        let id_pkt = RawPacket::unpack(&id_raw).unwrap();
3816        let resp_actions = resp_mgr.handle_local_delivery(
3817            id_pkt.destination_hash,
3818            &id_raw,
3819            id_pkt.packet_hash,
3820            rns_core::transport::types::InterfaceId(0),
3821            &mut rng,
3822        );
3823
3824        let has_identified = resp_actions
3825            .iter()
3826            .any(|a| matches!(a, LinkManagerAction::RemoteIdentified { .. }));
3827        assert!(has_identified, "Responder should emit RemoteIdentified");
3828    }
3829
3830    #[test]
3831    fn test_path_hash_computation() {
3832        let h1 = compute_path_hash("/status");
3833        let h2 = compute_path_hash("/path");
3834        assert_ne!(h1, h2);
3835
3836        // Deterministic
3837        assert_eq!(h1, compute_path_hash("/status"));
3838    }
3839
3840    #[test]
3841    fn test_link_count() {
3842        let mut mgr = LinkManager::new();
3843        let mut rng = OsRng;
3844
3845        assert_eq!(mgr.link_count(), 0);
3846
3847        let dummy_sig = [0xAA; 32];
3848        mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3849        assert_eq!(mgr.link_count(), 1);
3850
3851        mgr.create_link(&[0x22; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3852        assert_eq!(mgr.link_count(), 2);
3853    }
3854
3855    // --- Test helpers ---
3856
3857    fn extract_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3858        extract_send_packet_at(actions, actions.len() - 1)
3859    }
3860
3861    fn extract_send_packet_at(actions: &[LinkManagerAction], idx: usize) -> Vec<u8> {
3862        match &actions[idx] {
3863            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3864            other => panic!("Expected SendPacket at index {}, got {:?}", idx, other),
3865        }
3866    }
3867
3868    fn extract_any_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3869        actions
3870            .iter()
3871            .find_map(|a| match a {
3872                LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3873                _ => None,
3874            })
3875            .expect("Expected at least one SendPacket action")
3876    }
3877
3878    fn extract_send_packet_from(actions: &[LinkManagerAction]) -> Vec<u8> {
3879        extract_any_send_packet(actions)
3880    }
3881
3882    /// Set up two linked managers with an active link.
3883    /// Returns (initiator_mgr, responder_mgr, link_id).
3884    fn setup_active_link() -> (LinkManager, LinkManager, LinkId) {
3885        let mut rng = OsRng;
3886        let dest_hash = [0xDD; 16];
3887        let mut resp_mgr = LinkManager::new();
3888        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3889        resp_mgr.register_link_destination(
3890            dest_hash,
3891            sig_prv,
3892            sig_pub_bytes,
3893            ResourceStrategy::AcceptNone,
3894        );
3895        let mut init_mgr = LinkManager::new();
3896
3897        let (link_id, init_actions) = init_mgr.create_link(
3898            &dest_hash,
3899            &sig_pub_bytes,
3900            1,
3901            constants::MTU as u32,
3902            &mut rng,
3903        );
3904        let lr_raw = extract_send_packet(&init_actions);
3905        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3906        let resp_actions = resp_mgr.handle_local_delivery(
3907            lr_pkt.destination_hash,
3908            &lr_raw,
3909            lr_pkt.packet_hash,
3910            rns_core::transport::types::InterfaceId(0),
3911            &mut rng,
3912        );
3913        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3914        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3915        let init_actions2 = init_mgr.handle_local_delivery(
3916            lrproof_pkt.destination_hash,
3917            &lrproof_raw,
3918            lrproof_pkt.packet_hash,
3919            rns_core::transport::types::InterfaceId(0),
3920            &mut rng,
3921        );
3922        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3923        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3924        resp_mgr.handle_local_delivery(
3925            lrrtt_pkt.destination_hash,
3926            &lrrtt_raw,
3927            lrrtt_pkt.packet_hash,
3928            rns_core::transport::types::InterfaceId(0),
3929            &mut rng,
3930        );
3931
3932        assert_eq!(init_mgr.link_state(&link_id), Some(LinkState::Active));
3933        assert_eq!(resp_mgr.link_state(&link_id), Some(LinkState::Active));
3934
3935        (init_mgr, resp_mgr, link_id)
3936    }
3937
3938    // ====================================================================
3939    // Phase 8a: Resource wiring tests
3940    // ====================================================================
3941
3942    #[test]
3943    fn test_resource_strategy_default() {
3944        let mut mgr = LinkManager::new();
3945        let mut rng = OsRng;
3946        let dummy_sig = [0xAA; 32];
3947        let (link_id, _) =
3948            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3949
3950        // Default strategy is AcceptNone
3951        let link = mgr.links.get(&link_id).unwrap();
3952        assert_eq!(link.resource_strategy, ResourceStrategy::AcceptNone);
3953    }
3954
3955    #[test]
3956    fn test_set_resource_strategy() {
3957        let mut mgr = LinkManager::new();
3958        let mut rng = OsRng;
3959        let dummy_sig = [0xAA; 32];
3960        let (link_id, _) =
3961            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3962
3963        mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3964        assert_eq!(
3965            mgr.links.get(&link_id).unwrap().resource_strategy,
3966            ResourceStrategy::AcceptAll
3967        );
3968
3969        mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3970        assert_eq!(
3971            mgr.links.get(&link_id).unwrap().resource_strategy,
3972            ResourceStrategy::AcceptApp
3973        );
3974    }
3975
3976    #[test]
3977    fn test_send_resource_on_active_link() {
3978        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3979        let mut rng = OsRng;
3980
3981        // Send resource data
3982        let data = vec![0xAB; 100]; // small enough for a single part
3983        let actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3984
3985        // Should produce at least a SendPacket (advertisement)
3986        let has_send = actions
3987            .iter()
3988            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3989        assert!(
3990            has_send,
3991            "send_resource should emit advertisement SendPacket"
3992        );
3993    }
3994
3995    fn first_resource_advertisement(
3996        mgr: &LinkManager,
3997        link_id: &LinkId,
3998        actions: &[LinkManagerAction],
3999    ) -> rns_core::resource::ResourceAdvertisement {
4000        let adv_raw = actions
4001            .iter()
4002            .find_map(|action| match action {
4003                LinkManagerAction::SendPacket { raw, .. } => {
4004                    let pkt = RawPacket::unpack(raw).ok()?;
4005                    (pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw)
4006                }
4007                _ => None,
4008            })
4009            .expect("sender should emit a resource advertisement");
4010        let adv_pkt = RawPacket::unpack(adv_raw).unwrap();
4011        let plaintext = mgr
4012            .links
4013            .get(link_id)
4014            .unwrap()
4015            .engine
4016            .decrypt(&adv_pkt.data)
4017            .unwrap();
4018        rns_core::resource::ResourceAdvertisement::unpack(&plaintext).unwrap()
4019    }
4020
4021    fn deterministic_bytes(len: usize) -> Vec<u8> {
4022        let mut state = 0x1234_5678u32;
4023        (0..len)
4024            .map(|_| {
4025                state = state.wrapping_mul(1664525).wrapping_add(1013904223);
4026                (state >> 16) as u8
4027            })
4028            .collect()
4029    }
4030
4031    fn drive_link_manager_packets(
4032        init_mgr: &mut LinkManager,
4033        resp_mgr: &mut LinkManager,
4034        initial_actions: Vec<LinkManagerAction>,
4035        initial_source: char,
4036        rng: &mut dyn Rng,
4037        max_rounds: usize,
4038    ) -> (
4039        Option<Vec<u8>>,
4040        bool,
4041        Vec<(char, usize, usize)>,
4042        Vec<(char, String)>,
4043        usize,
4044    ) {
4045        let mut pending: Vec<(char, LinkManagerAction)> = initial_actions
4046            .into_iter()
4047            .map(|a| (initial_source, a))
4048            .collect();
4049        let mut rounds = 0;
4050        let mut received_data = None;
4051        let mut sender_completed = false;
4052        let mut progress = Vec::new();
4053        let mut failures = Vec::new();
4054
4055        while !pending.is_empty() && rounds < max_rounds {
4056            rounds += 1;
4057            let mut next = Vec::new();
4058            for (source, action) in pending.drain(..) {
4059                let LinkManagerAction::SendPacket { raw, .. } = action else {
4060                    continue;
4061                };
4062                let pkt = RawPacket::unpack(&raw).unwrap();
4063                let target_actions = if source == 'i' {
4064                    resp_mgr.handle_local_delivery(
4065                        pkt.destination_hash,
4066                        &raw,
4067                        pkt.packet_hash,
4068                        rns_core::transport::types::InterfaceId(0),
4069                        rng,
4070                    )
4071                } else {
4072                    init_mgr.handle_local_delivery(
4073                        pkt.destination_hash,
4074                        &raw,
4075                        pkt.packet_hash,
4076                        rns_core::transport::types::InterfaceId(0),
4077                        rng,
4078                    )
4079                };
4080                let target_source = if source == 'i' { 'r' } else { 'i' };
4081                for action in &target_actions {
4082                    match action {
4083                        LinkManagerAction::ResourceReceived { data, .. } => {
4084                            received_data = Some(data.clone());
4085                        }
4086                        LinkManagerAction::ResourceCompleted { .. } => {
4087                            sender_completed = true;
4088                        }
4089                        LinkManagerAction::ResourceProgress {
4090                            received, total, ..
4091                        } => {
4092                            progress.push((target_source, *received, *total));
4093                        }
4094                        LinkManagerAction::ResourceFailed { error, .. } => {
4095                            failures.push((target_source, error.clone()));
4096                        }
4097                        _ => {}
4098                    }
4099                }
4100                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4101            }
4102            pending = next;
4103        }
4104
4105        (received_data, sender_completed, progress, failures, rounds)
4106    }
4107
4108    #[test]
4109    fn test_send_resource_auto_compress_option_controls_adv_flag() {
4110        let data = vec![0x41; 2048];
4111
4112        let (mut compressed_mgr, _resp_mgr, link_id) = setup_active_link();
4113        let mut rng = OsRng;
4114        let actions =
4115            compressed_mgr.send_resource_with_auto_compress(&link_id, &data, None, true, &mut rng);
4116        let adv = first_resource_advertisement(&compressed_mgr, &link_id, &actions);
4117        assert!(
4118            adv.flags.compressed,
4119            "compressible resource should compress"
4120        );
4121
4122        let (mut plain_mgr, _resp_mgr, link_id) = setup_active_link();
4123        let mut rng = OsRng;
4124        let actions =
4125            plain_mgr.send_resource_with_auto_compress(&link_id, &data, None, false, &mut rng);
4126        let adv = first_resource_advertisement(&plain_mgr, &link_id, &actions);
4127        assert!(
4128            !adv.flags.compressed,
4129            "auto_compress=false should keep resource uncompressed"
4130        );
4131    }
4132
4133    #[test]
4134    fn test_send_resource_on_inactive_link() {
4135        let mut mgr = LinkManager::new();
4136        let mut rng = OsRng;
4137        let dummy_sig = [0xAA; 32];
4138        let (link_id, _) =
4139            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
4140
4141        // Link is Pending, not Active
4142        let actions = mgr.send_resource(&link_id, b"data", None, &mut rng);
4143        assert!(actions.is_empty(), "Cannot send resource on inactive link");
4144    }
4145
4146    #[test]
4147    fn test_send_resource_without_session_key_uses_encrypt_fallback_path() {
4148        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4149        let mut rng = OsRng;
4150        init_mgr
4151            .links
4152            .get_mut(&link_id)
4153            .unwrap()
4154            .engine
4155            .clear_session_for_testing();
4156
4157        let actions = init_mgr.send_resource(&link_id, b"data", None, &mut rng);
4158
4159        assert!(
4160            actions.is_empty(),
4161            "without a session key, no advertisement should be emitted"
4162        );
4163        assert_eq!(
4164            init_mgr
4165                .links
4166                .get(&link_id)
4167                .map(|managed| managed.outgoing_resources.len()),
4168            Some(1)
4169        );
4170    }
4171
4172    #[test]
4173    fn test_resource_adv_rejected_by_accept_none() {
4174        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4175        let mut rng = OsRng;
4176
4177        // Responder uses default AcceptNone strategy
4178        // Send resource from initiator
4179        let data = vec![0xCD; 100];
4180        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4181
4182        // Deliver advertisement to responder
4183        for action in &adv_actions {
4184            if let LinkManagerAction::SendPacket { raw, .. } = action {
4185                let pkt = RawPacket::unpack(raw).unwrap();
4186                let resp_actions = resp_mgr.handle_local_delivery(
4187                    pkt.destination_hash,
4188                    raw,
4189                    pkt.packet_hash,
4190                    rns_core::transport::types::InterfaceId(0),
4191                    &mut rng,
4192                );
4193                // AcceptNone: should not produce ResourceReceived, may produce SendPacket (RCL)
4194                let has_resource_received = resp_actions
4195                    .iter()
4196                    .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
4197                assert!(
4198                    !has_resource_received,
4199                    "AcceptNone should not accept resource"
4200                );
4201            }
4202        }
4203    }
4204
4205    #[test]
4206    fn test_resource_adv_accepted_by_accept_all() {
4207        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4208        let mut rng = OsRng;
4209
4210        // Set responder to AcceptAll
4211        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4212
4213        // Send resource from initiator
4214        let data = vec![0xCD; 100];
4215        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4216
4217        // Deliver advertisement to responder
4218        for action in &adv_actions {
4219            if let LinkManagerAction::SendPacket { raw, .. } = action {
4220                let pkt = RawPacket::unpack(raw).unwrap();
4221                let resp_actions = resp_mgr.handle_local_delivery(
4222                    pkt.destination_hash,
4223                    raw,
4224                    pkt.packet_hash,
4225                    rns_core::transport::types::InterfaceId(0),
4226                    &mut rng,
4227                );
4228                // AcceptAll: should accept and produce a SendPacket (request for parts)
4229                let has_send = resp_actions
4230                    .iter()
4231                    .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
4232                assert!(has_send, "AcceptAll should accept and request parts");
4233            }
4234        }
4235    }
4236
4237    #[test]
4238    fn test_resource_accept_app_query() {
4239        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4240        let mut rng = OsRng;
4241
4242        // Set responder to AcceptApp
4243        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4244
4245        // Send resource from initiator
4246        let data = vec![0xCD; 100];
4247        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4248
4249        // Deliver advertisement to responder
4250        let mut got_query = false;
4251        for action in &adv_actions {
4252            if let LinkManagerAction::SendPacket { raw, .. } = action {
4253                let pkt = RawPacket::unpack(raw).unwrap();
4254                let resp_actions = resp_mgr.handle_local_delivery(
4255                    pkt.destination_hash,
4256                    raw,
4257                    pkt.packet_hash,
4258                    rns_core::transport::types::InterfaceId(0),
4259                    &mut rng,
4260                );
4261                for a in &resp_actions {
4262                    if matches!(a, LinkManagerAction::ResourceAcceptQuery { .. }) {
4263                        got_query = true;
4264                    }
4265                }
4266            }
4267        }
4268        assert!(got_query, "AcceptApp should emit ResourceAcceptQuery");
4269    }
4270
4271    #[test]
4272    fn test_resource_accept_app_accept() {
4273        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4274        let mut rng = OsRng;
4275
4276        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4277
4278        let data = vec![0xCD; 100];
4279        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4280
4281        for action in &adv_actions {
4282            if let LinkManagerAction::SendPacket { raw, .. } = action {
4283                let pkt = RawPacket::unpack(raw).unwrap();
4284                let resp_actions = resp_mgr.handle_local_delivery(
4285                    pkt.destination_hash,
4286                    raw,
4287                    pkt.packet_hash,
4288                    rns_core::transport::types::InterfaceId(0),
4289                    &mut rng,
4290                );
4291                for a in &resp_actions {
4292                    if let LinkManagerAction::ResourceAcceptQuery {
4293                        link_id: lid,
4294                        resource_hash,
4295                        ..
4296                    } = a
4297                    {
4298                        // Accept the resource
4299                        let accept_actions =
4300                            resp_mgr.accept_resource(lid, resource_hash, true, &mut rng);
4301                        // Should produce a SendPacket (request for parts)
4302                        let has_send = accept_actions
4303                            .iter()
4304                            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
4305                        assert!(
4306                            has_send,
4307                            "Accepting resource should produce request for parts"
4308                        );
4309                    }
4310                }
4311            }
4312        }
4313    }
4314
4315    #[test]
4316    fn test_resource_accept_app_reject() {
4317        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4318        let mut rng = OsRng;
4319
4320        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4321
4322        let data = vec![0xCD; 100];
4323        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4324
4325        for action in &adv_actions {
4326            if let LinkManagerAction::SendPacket { raw, .. } = action {
4327                let pkt = RawPacket::unpack(raw).unwrap();
4328                let resp_actions = resp_mgr.handle_local_delivery(
4329                    pkt.destination_hash,
4330                    raw,
4331                    pkt.packet_hash,
4332                    rns_core::transport::types::InterfaceId(0),
4333                    &mut rng,
4334                );
4335                for a in &resp_actions {
4336                    if let LinkManagerAction::ResourceAcceptQuery {
4337                        link_id: lid,
4338                        resource_hash,
4339                        ..
4340                    } = a
4341                    {
4342                        // Reject the resource
4343                        let reject_actions =
4344                            resp_mgr.accept_resource(lid, resource_hash, false, &mut rng);
4345                        // Rejecting should send a cancel and not request parts
4346                        // No ResourceReceived should appear
4347                        let has_resource_received = reject_actions
4348                            .iter()
4349                            .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
4350                        assert!(!has_resource_received);
4351                    }
4352                }
4353            }
4354        }
4355    }
4356
4357    #[test]
4358    fn test_resource_full_transfer() {
4359        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4360        let mut rng = OsRng;
4361
4362        // Set responder to AcceptAll
4363        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4364
4365        // Small data (fits in single SDU)
4366        let original_data = b"Hello, Resource Transfer!".to_vec();
4367        let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
4368
4369        // Drive the full transfer protocol between the two managers.
4370        // Tag each SendPacket with its source ('i' = initiator, 'r' = responder).
4371        let mut pending: Vec<(char, LinkManagerAction)> =
4372            adv_actions.into_iter().map(|a| ('i', a)).collect();
4373        let mut rounds = 0;
4374        let max_rounds = 50;
4375        let mut resource_received = false;
4376        let mut sender_completed = false;
4377
4378        while !pending.is_empty() && rounds < max_rounds {
4379            rounds += 1;
4380            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
4381
4382            for (source, action) in pending.drain(..) {
4383                if let LinkManagerAction::SendPacket { raw, .. } = action {
4384                    let pkt = RawPacket::unpack(&raw).unwrap();
4385
4386                    // Deliver only to the OTHER side
4387                    let target_actions = if source == 'i' {
4388                        resp_mgr.handle_local_delivery(
4389                            pkt.destination_hash,
4390                            &raw,
4391                            pkt.packet_hash,
4392                            rns_core::transport::types::InterfaceId(0),
4393                            &mut rng,
4394                        )
4395                    } else {
4396                        init_mgr.handle_local_delivery(
4397                            pkt.destination_hash,
4398                            &raw,
4399                            pkt.packet_hash,
4400                            rns_core::transport::types::InterfaceId(0),
4401                            &mut rng,
4402                        )
4403                    };
4404
4405                    let target_source = if source == 'i' { 'r' } else { 'i' };
4406                    for a in &target_actions {
4407                        match a {
4408                            LinkManagerAction::ResourceReceived { data, .. } => {
4409                                assert_eq!(*data, original_data);
4410                                resource_received = true;
4411                            }
4412                            LinkManagerAction::ResourceCompleted { .. } => {
4413                                sender_completed = true;
4414                            }
4415                            _ => {}
4416                        }
4417                    }
4418                    next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4419                }
4420            }
4421            pending = next;
4422        }
4423
4424        assert!(
4425            resource_received,
4426            "Responder should receive resource data (rounds={})",
4427            rounds
4428        );
4429        assert!(
4430            sender_completed,
4431            "Sender should get completion proof (rounds={})",
4432            rounds
4433        );
4434    }
4435
4436    #[test]
4437    fn test_split_resource_advertisement_and_progress_entries() {
4438        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4439        let mut rng = OsRng;
4440        let data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 1024);
4441
4442        let actions =
4443            init_mgr.send_resource_with_auto_compress(&link_id, &data, None, false, &mut rng);
4444        let adv = first_resource_advertisement(&init_mgr, &link_id, &actions);
4445
4446        assert!(adv.flags.split);
4447        assert_eq!(adv.segment_index, 1);
4448        assert_eq!(adv.total_segments, 2);
4449        assert_eq!(adv.data_size, data.len() as u64);
4450
4451        let managed = init_mgr.links.get(&link_id).unwrap();
4452        assert_eq!(managed.outgoing_splits.len(), 1);
4453        assert_eq!(
4454            managed
4455                .outgoing_resources
4456                .iter()
4457                .filter(|sender| sender.flags.split)
4458                .count(),
4459            2
4460        );
4461
4462        let entries = init_mgr.resource_entries();
4463        assert_eq!(entries.len(), 1);
4464        assert_eq!(entries[0].direction, "outgoing");
4465        assert!(entries[0].total_parts > managed.outgoing_resources[0].total_parts());
4466        assert_eq!(entries[0].transferred_parts, 0);
4467    }
4468
4469    #[test]
4470    fn test_split_resource_full_transfer_and_monotonic_progress() {
4471        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4472        let mut rng = OsRng;
4473        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4474
4475        let original_data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 2048);
4476        let initial_actions = init_mgr.send_resource_with_auto_compress(
4477            &link_id,
4478            &original_data,
4479            None,
4480            false,
4481            &mut rng,
4482        );
4483
4484        let (received_data, sender_completed, progress, failures, rounds) =
4485            drive_link_manager_packets(
4486                &mut init_mgr,
4487                &mut resp_mgr,
4488                initial_actions,
4489                'i',
4490                &mut rng,
4491                10_000,
4492            );
4493
4494        assert!(
4495            received_data.as_ref().is_some_and(|data| data == &original_data),
4496            "split transfer did not deliver payload in {rounds} rounds; sender_completed={sender_completed}; failures={failures:?}; last_progress={:?}; init_entries={:?}; resp_entries={:?}",
4497            progress.last(),
4498            init_mgr.resource_entries(),
4499            resp_mgr.resource_entries()
4500        );
4501        assert!(
4502            sender_completed,
4503            "sender did not complete in {rounds} rounds"
4504        );
4505        assert!(
4506            progress
4507                .iter()
4508                .any(|(_, received, total)| received == total),
4509            "expected final progress update"
4510        );
4511
4512        let mut init_last = 0;
4513        let mut resp_last = 0;
4514        for (side, received, total) in progress {
4515            assert!(received <= total);
4516            match side {
4517                'i' => {
4518                    assert!(
4519                        received >= init_last,
4520                        "initiator progress regressed from {init_last} to {received}"
4521                    );
4522                    init_last = received;
4523                }
4524                'r' => {
4525                    assert!(
4526                        received >= resp_last,
4527                        "responder progress regressed from {resp_last} to {received}"
4528                    );
4529                    resp_last = received;
4530                }
4531                _ => unreachable!(),
4532            }
4533        }
4534    }
4535
4536    #[test]
4537    fn test_split_resource_accept_app_queries_only_first_segment() {
4538        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4539        let mut rng = OsRng;
4540        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4541
4542        let original_data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 1024);
4543        let adv_actions = init_mgr.send_resource_with_auto_compress(
4544            &link_id,
4545            &original_data,
4546            None,
4547            false,
4548            &mut rng,
4549        );
4550        let adv_raw = extract_any_send_packet(&adv_actions);
4551        let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
4552        let query_actions = resp_mgr.handle_local_delivery(
4553            adv_pkt.destination_hash,
4554            &adv_raw,
4555            adv_pkt.packet_hash,
4556            rns_core::transport::types::InterfaceId(0),
4557            &mut rng,
4558        );
4559
4560        let queries: Vec<_> = query_actions
4561            .iter()
4562            .filter_map(|action| match action {
4563                LinkManagerAction::ResourceAcceptQuery { resource_hash, .. } => {
4564                    Some(resource_hash.clone())
4565                }
4566                _ => None,
4567            })
4568            .collect();
4569        assert_eq!(queries.len(), 1);
4570
4571        let accept_actions = resp_mgr.accept_resource(&link_id, &queries[0], true, &mut rng);
4572        let (received_data, sender_completed, _progress, failures, rounds) =
4573            drive_link_manager_packets(
4574                &mut init_mgr,
4575                &mut resp_mgr,
4576                accept_actions,
4577                'r',
4578                &mut rng,
4579                10_000,
4580            );
4581
4582        assert!(
4583            failures.is_empty(),
4584            "split AcceptApp transfer failed: {failures:?}"
4585        );
4586        assert!(
4587            received_data
4588                .as_ref()
4589                .is_some_and(|data| data == &original_data),
4590            "split AcceptApp transfer did not deliver in {rounds} rounds"
4591        );
4592        assert!(sender_completed);
4593    }
4594
4595    #[test]
4596    fn test_resource_cancel_icl() {
4597        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4598        let mut rng = OsRng;
4599
4600        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4601
4602        // Use large data so transfer is multi-part
4603        let data = vec![0xAB; 2000];
4604        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4605
4606        // Deliver advertisement — responder accepts and sends request
4607        for action in &adv_actions {
4608            if let LinkManagerAction::SendPacket { raw, .. } = action {
4609                let pkt = RawPacket::unpack(raw).unwrap();
4610                resp_mgr.handle_local_delivery(
4611                    pkt.destination_hash,
4612                    raw,
4613                    pkt.packet_hash,
4614                    rns_core::transport::types::InterfaceId(0),
4615                    &mut rng,
4616                );
4617            }
4618        }
4619
4620        // Verify there are incoming resources on the responder
4621        assert!(!resp_mgr
4622            .links
4623            .get(&link_id)
4624            .unwrap()
4625            .incoming_resources
4626            .is_empty());
4627
4628        // Simulate ICL (cancel from initiator side) by calling handle_resource_icl
4629        let icl_actions = resp_mgr.handle_resource_icl(&link_id);
4630
4631        // Should have resource failed
4632        let has_failed = icl_actions
4633            .iter()
4634            .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
4635        assert!(has_failed, "ICL should produce ResourceFailed");
4636    }
4637
4638    #[test]
4639    fn test_resource_cancel_rcl() {
4640        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4641        let mut rng = OsRng;
4642
4643        // Create a resource sender
4644        let data = vec![0xAB; 2000];
4645        init_mgr.send_resource(&link_id, &data, None, &mut rng);
4646
4647        // Verify there are outgoing resources
4648        assert!(!init_mgr
4649            .links
4650            .get(&link_id)
4651            .unwrap()
4652            .outgoing_resources
4653            .is_empty());
4654
4655        // Simulate RCL (cancel from receiver side)
4656        let rcl_actions = init_mgr.handle_resource_rcl(&link_id);
4657
4658        let has_failed = rcl_actions
4659            .iter()
4660            .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
4661        assert!(has_failed, "RCL should produce ResourceFailed");
4662    }
4663
4664    #[test]
4665    fn test_cancel_all_resources_clears_active_transfers() {
4666        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4667        let mut rng = OsRng;
4668
4669        let actions = init_mgr.send_resource(&link_id, b"resource body", None, &mut rng);
4670        assert!(!actions.is_empty());
4671        assert_eq!(init_mgr.resource_transfer_count(), 1);
4672
4673        let cancel_actions = init_mgr.cancel_all_resources(&mut rng);
4674
4675        assert_eq!(init_mgr.resource_transfer_count(), 0);
4676        assert!(cancel_actions
4677            .iter()
4678            .any(|action| matches!(action, LinkManagerAction::SendPacket { .. })));
4679    }
4680
4681    #[test]
4682    fn test_resource_tick_cleans_up() {
4683        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4684        let mut rng = OsRng;
4685
4686        let data = vec![0xAB; 100];
4687        init_mgr.send_resource(&link_id, &data, None, &mut rng);
4688
4689        assert!(!init_mgr
4690            .links
4691            .get(&link_id)
4692            .unwrap()
4693            .outgoing_resources
4694            .is_empty());
4695
4696        // Cancel the sender to make it Complete
4697        init_mgr.handle_resource_rcl(&link_id);
4698
4699        // Tick should clean up completed resources
4700        init_mgr.tick(&mut rng);
4701
4702        assert!(
4703            init_mgr
4704                .links
4705                .get(&link_id)
4706                .unwrap()
4707                .outgoing_resources
4708                .is_empty(),
4709            "Tick should clean up completed/failed outgoing resources"
4710        );
4711    }
4712
4713    #[test]
4714    fn test_build_link_packet() {
4715        let (init_mgr, _resp_mgr, link_id) = setup_active_link();
4716
4717        let actions =
4718            init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, b"test data");
4719        assert_eq!(actions.len(), 1);
4720        if let LinkManagerAction::SendPacket { raw, dest_type, .. } = &actions[0] {
4721            let pkt = RawPacket::unpack(raw).unwrap();
4722            assert_eq!(pkt.context, constants::CONTEXT_RESOURCE);
4723            assert_eq!(*dest_type, constants::DESTINATION_LINK);
4724        } else {
4725            panic!("Expected SendPacket");
4726        }
4727    }
4728
4729    #[test]
4730    fn test_build_link_packet_returns_empty_when_mtu_too_small() {
4731        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4732        init_mgr.set_link_mtu(&link_id, 84);
4733
4734        let actions =
4735            init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, &[0xAA; 200]);
4736        assert!(actions.is_empty(), "oversized packet should not be built");
4737    }
4738
4739    #[test]
4740    fn test_process_resource_actions_encrypted_variants_drop_on_encrypt_failure() {
4741        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4742        let mut rng = OsRng;
4743        init_mgr
4744            .links
4745            .get_mut(&link_id)
4746            .unwrap()
4747            .engine
4748            .clear_session_for_testing();
4749
4750        let cases = vec![
4751            ResourceAction::SendAdvertisement(vec![1, 2, 3]),
4752            ResourceAction::SendRequest(vec![4, 5, 6]),
4753            ResourceAction::SendHmu(vec![7, 8, 9]),
4754            ResourceAction::SendProof(vec![10, 11, 12]),
4755            ResourceAction::SendCancelInitiator(vec![13, 14, 15]),
4756            ResourceAction::SendCancelReceiver(vec![16, 17, 18]),
4757        ];
4758
4759        for action in cases {
4760            let out = init_mgr.process_resource_actions(&link_id, vec![action], &mut rng);
4761            assert!(
4762                out.is_empty(),
4763                "encrypt failure should suppress packet emission"
4764            );
4765        }
4766    }
4767
4768    // ====================================================================
4769    // Phase 8b: Channel message & data callback tests
4770    // ====================================================================
4771
4772    #[test]
4773    fn test_channel_message_delivery() {
4774        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4775        let mut rng = OsRng;
4776
4777        // Send channel message from initiator
4778        let chan_actions = init_mgr
4779            .send_channel_message(&link_id, 42, b"channel data", &mut rng)
4780            .expect("active link channel send should succeed");
4781        assert!(!chan_actions.is_empty());
4782
4783        // Deliver to responder
4784        let mut got_channel_msg = false;
4785        for action in &chan_actions {
4786            if let LinkManagerAction::SendPacket { raw, .. } = action {
4787                let pkt = RawPacket::unpack(raw).unwrap();
4788                let resp_actions = resp_mgr.handle_local_delivery(
4789                    pkt.destination_hash,
4790                    raw,
4791                    pkt.packet_hash,
4792                    rns_core::transport::types::InterfaceId(0),
4793                    &mut rng,
4794                );
4795                for a in &resp_actions {
4796                    if let LinkManagerAction::ChannelMessageReceived {
4797                        msgtype, payload, ..
4798                    } = a
4799                    {
4800                        assert_eq!(*msgtype, 42);
4801                        assert_eq!(*payload, b"channel data");
4802                        got_channel_msg = true;
4803                    }
4804                }
4805            }
4806        }
4807        assert!(got_channel_msg, "Responder should receive channel message");
4808    }
4809
4810    #[test]
4811    fn test_channel_send_drops_packet_when_encrypt_fails() {
4812        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4813        let mut rng = OsRng;
4814        init_mgr
4815            .links
4816            .get_mut(&link_id)
4817            .unwrap()
4818            .engine
4819            .clear_session_for_testing();
4820
4821        let actions = init_mgr
4822            .send_channel_message(&link_id, 42, b"channel data", &mut rng)
4823            .expect("channel should still accept the message locally");
4824
4825        assert!(
4826            actions.is_empty(),
4827            "encrypt failure should suppress channel packet"
4828        );
4829        assert!(
4830            init_mgr
4831                .links
4832                .get(&link_id)
4833                .unwrap()
4834                .pending_channel_packets
4835                .is_empty(),
4836            "failed packet encryption must not track a pending channel proof"
4837        );
4838    }
4839
4840    #[test]
4841    fn test_channel_proof_reopens_send_window() {
4842        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4843        let mut rng = OsRng;
4844
4845        init_mgr
4846            .send_channel_message(&link_id, 42, b"first", &mut rng)
4847            .expect("first send should succeed");
4848        init_mgr
4849            .send_channel_message(&link_id, 42, b"second", &mut rng)
4850            .expect("second send should succeed");
4851
4852        let err = init_mgr
4853            .send_channel_message(&link_id, 42, b"third", &mut rng)
4854            .expect_err("third send should hit the initial channel window");
4855        assert_eq!(err, "Channel is not ready to send");
4856
4857        let queued_packets = init_mgr
4858            .links
4859            .get(&link_id)
4860            .unwrap()
4861            .pending_channel_packets
4862            .clone();
4863        assert_eq!(queued_packets.len(), 2);
4864        for tracked_hash in queued_packets.keys().take(1) {
4865            let mut proof_data = Vec::with_capacity(96);
4866            proof_data.extend_from_slice(tracked_hash);
4867            proof_data.extend_from_slice(&[0x11; 64]);
4868            let flags = PacketFlags {
4869                header_type: constants::HEADER_1,
4870                context_flag: constants::FLAG_UNSET,
4871                transport_type: constants::TRANSPORT_BROADCAST,
4872                destination_type: constants::DESTINATION_LINK,
4873                packet_type: constants::PACKET_TYPE_PROOF,
4874            };
4875            let proof = RawPacket::pack(
4876                flags,
4877                0,
4878                &link_id,
4879                None,
4880                constants::CONTEXT_NONE,
4881                &proof_data,
4882            )
4883            .expect("proof packet should pack");
4884            let ack_actions = init_mgr.handle_local_delivery(
4885                link_id,
4886                &proof.raw,
4887                proof.packet_hash,
4888                rns_core::transport::types::InterfaceId(0),
4889                &mut rng,
4890            );
4891            assert!(
4892                ack_actions.is_empty(),
4893                "proof delivery should only update channel state"
4894            );
4895        }
4896
4897        init_mgr
4898            .send_channel_message(&link_id, 42, b"third", &mut rng)
4899            .expect("proof should free one channel slot");
4900    }
4901
4902    #[test]
4903    fn test_generic_link_data_delivery() {
4904        let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
4905        let mut rng = OsRng;
4906
4907        // Send generic data with a custom context
4908        let actions = init_mgr.send_on_link(&link_id, b"raw stuff", 0x42, &mut rng);
4909        assert_eq!(actions.len(), 1);
4910
4911        // Deliver to responder
4912        let raw = extract_any_send_packet(&actions);
4913        let pkt = RawPacket::unpack(&raw).unwrap();
4914        let resp_actions = resp_mgr.handle_local_delivery(
4915            pkt.destination_hash,
4916            &raw,
4917            pkt.packet_hash,
4918            rns_core::transport::types::InterfaceId(0),
4919            &mut rng,
4920        );
4921
4922        let has_data = resp_actions
4923            .iter()
4924            .any(|a| matches!(a, LinkManagerAction::LinkDataReceived { context: 0x42, .. }));
4925        assert!(
4926            has_data,
4927            "Responder should receive LinkDataReceived for unknown context"
4928        );
4929    }
4930
4931    #[test]
4932    fn test_invalid_encrypted_contexts_are_ignored() {
4933        let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4934        let mut rng = OsRng;
4935        let contexts = [
4936            constants::CONTEXT_CHANNEL,
4937            constants::CONTEXT_REQUEST,
4938            constants::CONTEXT_RESPONSE,
4939            constants::CONTEXT_RESOURCE_ADV,
4940            constants::CONTEXT_RESOURCE_REQ,
4941            constants::CONTEXT_RESOURCE_HMU,
4942            constants::CONTEXT_RESOURCE_PRF,
4943            0x42,
4944        ];
4945
4946        for context in contexts {
4947            let flags = PacketFlags {
4948                header_type: constants::HEADER_1,
4949                context_flag: constants::FLAG_UNSET,
4950                transport_type: constants::TRANSPORT_BROADCAST,
4951                destination_type: constants::DESTINATION_LINK,
4952                packet_type: constants::PACKET_TYPE_DATA,
4953            };
4954            let pkt = RawPacket::pack(flags, 0, &link_id, None, context, b"invalid-ciphertext")
4955                .expect("test packet should pack");
4956            let actions = resp_mgr.handle_local_delivery(
4957                pkt.destination_hash,
4958                &pkt.raw,
4959                pkt.packet_hash,
4960                rns_core::transport::types::InterfaceId(0),
4961                &mut rng,
4962            );
4963            assert!(
4964                actions.is_empty(),
4965                "invalid ciphertext for context {context:#x} should be ignored"
4966            );
4967        }
4968    }
4969
4970    #[test]
4971    fn test_resource_part_without_matching_receiver_is_ignored() {
4972        let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4973        let mut rng = OsRng;
4974        let flags = PacketFlags {
4975            header_type: constants::HEADER_1,
4976            context_flag: constants::FLAG_UNSET,
4977            transport_type: constants::TRANSPORT_BROADCAST,
4978            destination_type: constants::DESTINATION_LINK,
4979            packet_type: constants::PACKET_TYPE_DATA,
4980        };
4981        let pkt = RawPacket::pack(
4982            flags,
4983            0,
4984            &link_id,
4985            None,
4986            constants::CONTEXT_RESOURCE,
4987            b"orphan-part",
4988        )
4989        .expect("test packet should pack");
4990
4991        let actions = resp_mgr.handle_local_delivery(
4992            pkt.destination_hash,
4993            &pkt.raw,
4994            pkt.packet_hash,
4995            rns_core::transport::types::InterfaceId(0),
4996            &mut rng,
4997        );
4998
4999        assert!(actions.is_empty(), "orphan resource part should be ignored");
5000    }
5001
5002    #[test]
5003    fn test_response_delivery() {
5004        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5005        let mut rng = OsRng;
5006
5007        // Register handler on responder
5008        resp_mgr.register_request_handler("/echo", None, |_link_id, _path, data, _remote| {
5009            Some(data.to_vec())
5010        });
5011
5012        // Send request from initiator
5013        let req_actions = init_mgr.send_request(&link_id, "/echo", b"\xc0", &mut rng); // msgpack nil
5014        assert!(!req_actions.is_empty());
5015
5016        // Deliver request to responder — should produce response
5017        let req_raw = extract_any_send_packet(&req_actions);
5018        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5019        let resp_actions = resp_mgr.handle_local_delivery(
5020            req_pkt.destination_hash,
5021            &req_raw,
5022            req_pkt.packet_hash,
5023            rns_core::transport::types::InterfaceId(0),
5024            &mut rng,
5025        );
5026        let has_resp_send = resp_actions
5027            .iter()
5028            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
5029        assert!(has_resp_send, "Handler should produce response");
5030
5031        // Deliver response back to initiator
5032        let resp_raw = extract_any_send_packet(&resp_actions);
5033        let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
5034        let init_actions = init_mgr.handle_local_delivery(
5035            resp_pkt.destination_hash,
5036            &resp_raw,
5037            resp_pkt.packet_hash,
5038            rns_core::transport::types::InterfaceId(0),
5039            &mut rng,
5040        );
5041
5042        let has_response_received = init_actions
5043            .iter()
5044            .any(|a| matches!(a, LinkManagerAction::ResponseReceived { .. }));
5045        assert!(
5046            has_response_received,
5047            "Initiator should receive ResponseReceived"
5048        );
5049    }
5050
5051    #[test]
5052    fn test_large_response_uses_resource_fallback() {
5053        let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
5054        let mut rng = OsRng;
5055
5056        // Register handler on responder with payload that cannot fit a direct
5057        // CONTEXT_RESPONSE packet.
5058        let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5059        resp_mgr.register_request_handler("/large", None, {
5060            let large_payload = large_payload.clone();
5061            move |_link_id, _path, _data, _remote| Some(large_payload.clone())
5062        });
5063
5064        // Send request from initiator.
5065        let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
5066        assert!(!req_actions.is_empty());
5067
5068        // Deliver request to responder and inspect responder outbound packets.
5069        let req_raw = extract_any_send_packet(&req_actions);
5070        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5071        let resp_actions = resp_mgr.handle_local_delivery(
5072            req_pkt.destination_hash,
5073            &req_raw,
5074            req_pkt.packet_hash,
5075            rns_core::transport::types::InterfaceId(0),
5076            &mut rng,
5077        );
5078
5079        let mut has_resource_adv = false;
5080        let mut has_direct_response = false;
5081        for action in &resp_actions {
5082            if let LinkManagerAction::SendPacket { raw, .. } = action {
5083                let pkt = RawPacket::unpack(raw).unwrap();
5084                if pkt.context == constants::CONTEXT_RESOURCE_ADV {
5085                    has_resource_adv = true;
5086                }
5087                if pkt.context == constants::CONTEXT_RESPONSE {
5088                    has_direct_response = true;
5089                }
5090            }
5091        }
5092
5093        assert!(
5094            has_resource_adv,
5095            "Large response should advertise a response resource"
5096        );
5097        assert!(
5098            !has_direct_response,
5099            "Large response should not use direct CONTEXT_RESPONSE packet"
5100        );
5101    }
5102
5103    #[test]
5104    fn test_send_management_response_without_session_key_uses_resource_fallback_path() {
5105        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
5106        let mut rng = OsRng;
5107        init_mgr
5108            .links
5109            .get_mut(&link_id)
5110            .unwrap()
5111            .engine
5112            .clear_session_for_testing();
5113
5114        let large_response: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5115        let actions =
5116            init_mgr.send_management_response(&link_id, &[0x11; 16], &large_response, &mut rng);
5117
5118        assert!(
5119            actions.is_empty(),
5120            "without a session key, no response packets should be emitted"
5121        );
5122        assert_eq!(
5123            init_mgr
5124                .links
5125                .get(&link_id)
5126                .map(|managed| managed.outgoing_resources.len()),
5127            Some(1)
5128        );
5129    }
5130
5131    #[test]
5132    fn test_send_channel_message_on_no_channel() {
5133        let mut mgr = LinkManager::new();
5134        let mut rng = OsRng;
5135        let dummy_sig = [0xAA; 32];
5136        let (link_id, _) =
5137            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
5138
5139        // Link is Pending (no channel), should return empty
5140        let err = mgr
5141            .send_channel_message(&link_id, 1, b"test", &mut rng)
5142            .expect_err("pending link should reject channel send");
5143        assert_eq!(err, "link has no active channel");
5144    }
5145
5146    #[test]
5147    fn test_send_on_link_requires_active() {
5148        let mut mgr = LinkManager::new();
5149        let mut rng = OsRng;
5150        let dummy_sig = [0xAA; 32];
5151        let (link_id, _) =
5152            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
5153
5154        let actions = mgr.send_on_link(&link_id, b"test", constants::CONTEXT_NONE, &mut rng);
5155        assert!(actions.is_empty(), "Cannot send on pending link");
5156    }
5157
5158    #[test]
5159    fn test_send_on_link_unknown_link() {
5160        let mgr = LinkManager::new();
5161        let mut rng = OsRng;
5162
5163        let actions = mgr.send_on_link(&[0xFF; 16], b"test", constants::CONTEXT_NONE, &mut rng);
5164        assert!(actions.is_empty());
5165    }
5166
5167    #[test]
5168    fn test_resource_full_transfer_large() {
5169        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5170        let mut rng = OsRng;
5171
5172        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5173
5174        // Multi-part data (larger than a single SDU of 464 bytes)
5175        let original_data: Vec<u8> = (0..2000u32)
5176            .map(|i| {
5177                let pos = i as usize;
5178                (pos ^ (pos >> 8) ^ (pos >> 16)) as u8
5179            })
5180            .collect();
5181
5182        let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
5183
5184        let mut pending: Vec<(char, LinkManagerAction)> =
5185            adv_actions.into_iter().map(|a| ('i', a)).collect();
5186        let mut rounds = 0;
5187        let max_rounds = 200;
5188        let mut resource_received = false;
5189        let mut sender_completed = false;
5190
5191        while !pending.is_empty() && rounds < max_rounds {
5192            rounds += 1;
5193            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
5194
5195            for (source, action) in pending.drain(..) {
5196                if let LinkManagerAction::SendPacket { raw, .. } = action {
5197                    let pkt = match RawPacket::unpack(&raw) {
5198                        Ok(p) => p,
5199                        Err(_) => continue,
5200                    };
5201
5202                    let target_actions = if source == 'i' {
5203                        resp_mgr.handle_local_delivery(
5204                            pkt.destination_hash,
5205                            &raw,
5206                            pkt.packet_hash,
5207                            rns_core::transport::types::InterfaceId(0),
5208                            &mut rng,
5209                        )
5210                    } else {
5211                        init_mgr.handle_local_delivery(
5212                            pkt.destination_hash,
5213                            &raw,
5214                            pkt.packet_hash,
5215                            rns_core::transport::types::InterfaceId(0),
5216                            &mut rng,
5217                        )
5218                    };
5219
5220                    let target_source = if source == 'i' { 'r' } else { 'i' };
5221                    for a in &target_actions {
5222                        match a {
5223                            LinkManagerAction::ResourceReceived { data, .. } => {
5224                                assert_eq!(*data, original_data);
5225                                resource_received = true;
5226                            }
5227                            LinkManagerAction::ResourceCompleted { .. } => {
5228                                sender_completed = true;
5229                            }
5230                            _ => {}
5231                        }
5232                    }
5233                    next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5234                }
5235            }
5236            pending = next;
5237        }
5238
5239        assert!(
5240            resource_received,
5241            "Should receive large resource (rounds={})",
5242            rounds
5243        );
5244        assert!(
5245            sender_completed,
5246            "Sender should complete (rounds={})",
5247            rounds
5248        );
5249    }
5250
5251    #[test]
5252    fn test_resource_receiver_stores_original_advertisement_plaintext() {
5253        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5254        let mut rng = OsRng;
5255
5256        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5257
5258        let data = vec![0x41; 256];
5259        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5260
5261        let adv_raw = adv_actions
5262            .iter()
5263            .find_map(|action| match action {
5264                LinkManagerAction::SendPacket { raw, .. } => {
5265                    let pkt = RawPacket::unpack(raw).ok()?;
5266                    (pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw.clone())
5267                }
5268                _ => None,
5269            })
5270            .expect("sender should emit a resource advertisement");
5271
5272        let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
5273        let adv_plaintext = resp_mgr
5274            .links
5275            .get(&link_id)
5276            .unwrap()
5277            .engine
5278            .decrypt(&adv_pkt.data)
5279            .unwrap();
5280
5281        let _resp_actions = resp_mgr.handle_local_delivery(
5282            adv_pkt.destination_hash,
5283            &adv_raw,
5284            adv_pkt.packet_hash,
5285            rns_core::transport::types::InterfaceId(0),
5286            &mut rng,
5287        );
5288
5289        let receiver = resp_mgr
5290            .links
5291            .get(&link_id)
5292            .and_then(|managed| managed.incoming_resources.first())
5293            .expect("advertisement should create an incoming receiver");
5294        assert_eq!(receiver.advertisement_packet, adv_plaintext);
5295        assert_eq!(
5296            receiver.max_decompressed_size,
5297            constants::RESOURCE_AUTO_COMPRESS_MAX_SIZE
5298        );
5299    }
5300
5301    #[test]
5302    fn test_corrupt_compressed_resource_rejects_and_tears_down_link() {
5303        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5304        let mut rng = OsRng;
5305
5306        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5307
5308        let data = vec![b'A'; 4096];
5309        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5310
5311        let mut request_actions = Vec::new();
5312        for action in &adv_actions {
5313            let LinkManagerAction::SendPacket { raw, .. } = action else {
5314                continue;
5315            };
5316            let pkt = RawPacket::unpack(raw).unwrap();
5317            let actions = resp_mgr.handle_local_delivery(
5318                pkt.destination_hash,
5319                raw,
5320                pkt.packet_hash,
5321                rns_core::transport::types::InterfaceId(0),
5322                &mut rng,
5323            );
5324            request_actions.extend(actions);
5325        }
5326
5327        {
5328            let receiver = resp_mgr
5329                .links
5330                .get_mut(&link_id)
5331                .and_then(|managed| managed.incoming_resources.first_mut())
5332                .expect("receiver should exist after advertisement");
5333            assert!(receiver.flags.compressed, "test data should be compressed");
5334            receiver.max_decompressed_size = 64;
5335        }
5336
5337        let mut responder_actions = Vec::new();
5338        for action in request_actions {
5339            let LinkManagerAction::SendPacket { raw, .. } = action else {
5340                continue;
5341            };
5342            let pkt = RawPacket::unpack(&raw).unwrap();
5343            let actions = init_mgr.handle_local_delivery(
5344                pkt.destination_hash,
5345                &raw,
5346                pkt.packet_hash,
5347                rns_core::transport::types::InterfaceId(0),
5348                &mut rng,
5349            );
5350
5351            for action in actions {
5352                let LinkManagerAction::SendPacket { raw, .. } = &action else {
5353                    continue;
5354                };
5355                let pkt = RawPacket::unpack(raw).unwrap();
5356                let delivered = resp_mgr.handle_local_delivery(
5357                    pkt.destination_hash,
5358                    raw,
5359                    pkt.packet_hash,
5360                    rns_core::transport::types::InterfaceId(0),
5361                    &mut rng,
5362                );
5363                responder_actions.extend(delivered);
5364            }
5365        }
5366
5367        assert!(
5368            responder_actions.iter().any(|action| matches!(
5369                action,
5370                LinkManagerAction::ResourceFailed { error, .. }
5371                    if error == "Resource too large"
5372            )),
5373            "corrupt oversized resource should fail with TooLarge"
5374        );
5375        assert!(
5376            responder_actions.iter().any(|action| matches!(
5377                action,
5378                LinkManagerAction::LinkClosed { link_id: closed_id, .. } if *closed_id == link_id
5379            )),
5380            "corrupt oversized resource should tear down the link"
5381        );
5382        assert!(
5383            responder_actions.iter().any(|action| match action {
5384                LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
5385                    .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_RCL)
5386                    .unwrap_or(false),
5387                _ => false,
5388            }),
5389            "corrupt oversized resource should send a receiver cancel/reject packet"
5390        );
5391        assert_eq!(
5392            resp_mgr
5393                .links
5394                .get(&link_id)
5395                .map(|managed| managed.engine.state()),
5396            Some(LinkState::Closed)
5397        );
5398    }
5399
5400    #[test]
5401    fn test_resource_hmu_timeout_extension_in_link_manager_flow() {
5402        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5403        let mut rng = OsRng;
5404
5405        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5406
5407        // Large and incompressible enough to require multiple hashmap segments
5408        // even with the live Bzip2Compressor in the LinkManager path.
5409        let mut state = 0x1234_5678u32;
5410        let data: Vec<u8> = (0..50000)
5411            .map(|_| {
5412                state = state.wrapping_mul(1664525).wrapping_add(1013904223);
5413                (state >> 16) as u8
5414            })
5415            .collect();
5416        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5417        let mut pending: Vec<(char, LinkManagerAction)> =
5418            adv_actions.into_iter().map(|a| ('i', a)).collect();
5419
5420        let mut rounds = 0;
5421
5422        // Drive the real link-manager exchange until the receiver is genuinely
5423        // waiting for an HMU after exhausting the advertised hashmap segment.
5424        while rounds < 300 {
5425            rounds += 1;
5426            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
5427
5428            for (source, action) in pending.drain(..) {
5429                let LinkManagerAction::SendPacket { raw, .. } = action else {
5430                    continue;
5431                };
5432
5433                let pkt = match RawPacket::unpack(&raw) {
5434                    Ok(p) => p,
5435                    Err(_) => continue,
5436                };
5437
5438                let target_actions = if source == 'i' {
5439                    resp_mgr.handle_local_delivery(
5440                        pkt.destination_hash,
5441                        &raw,
5442                        pkt.packet_hash,
5443                        rns_core::transport::types::InterfaceId(0),
5444                        &mut rng,
5445                    )
5446                } else {
5447                    init_mgr.handle_local_delivery(
5448                        pkt.destination_hash,
5449                        &raw,
5450                        pkt.packet_hash,
5451                        rns_core::transport::types::InterfaceId(0),
5452                        &mut rng,
5453                    )
5454                };
5455
5456                let target_source = if source == 'i' { 'r' } else { 'i' };
5457                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5458            }
5459
5460            if resp_mgr
5461                .links
5462                .get(&link_id)
5463                .and_then(|managed| managed.incoming_resources.first())
5464                .is_some_and(|receiver| receiver.waiting_for_hmu)
5465            {
5466                break;
5467            }
5468
5469            pending = next;
5470        }
5471
5472        assert!(
5473            resp_mgr
5474                .links
5475                .get(&link_id)
5476                .and_then(|managed| managed.incoming_resources.first())
5477                .is_some_and(|receiver| receiver.waiting_for_hmu),
5478            "expected receiver to reach a live HMU wait state"
5479        );
5480
5481        // Prime the live receiver once so it computes the same EIFR it will use
5482        // for timeout decisions in this HMU-wait state.
5483        let prime_actions = {
5484            let managed = resp_mgr.links.get_mut(&link_id).unwrap();
5485            let receiver = managed.incoming_resources.first_mut().unwrap();
5486            let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
5487                managed.engine.decrypt(ciphertext).map_err(|_| ())
5488            };
5489            receiver.tick(
5490                receiver.last_activity + 0.0001,
5491                &decrypt_fn,
5492                &Bzip2Compressor,
5493            )
5494        };
5495        assert!(
5496            !prime_actions
5497                .iter()
5498                .any(|a| matches!(a, ResourceAction::SendRequest(_))),
5499            "fresh HMU wait state should not immediately emit a retry request"
5500        );
5501
5502        let (late_delta, retries_before) = {
5503            let managed = resp_mgr
5504                .links
5505                .get_mut(&link_id)
5506                .expect("receiver link should still exist");
5507            let receiver = managed
5508                .incoming_resources
5509                .first_mut()
5510                .expect("receiver should have an active incoming resource");
5511
5512            assert!(
5513                receiver.waiting_for_hmu,
5514                "receiver should be waiting for HMU"
5515            );
5516
5517            let eifr = receiver.eifr.unwrap_or_else(|| {
5518                (constants::RESOURCE_SDU as f64 * 8.0) / receiver.rtt.unwrap_or(0.5)
5519            });
5520            let expected_tof = if receiver.outstanding_parts > 0 {
5521                (receiver.outstanding_parts as f64 * constants::RESOURCE_SDU as f64 * 8.0) / eifr
5522            } else {
5523                (3.0 * constants::RESOURCE_SDU as f64) / eifr
5524            };
5525            let expected_hmu_wait =
5526                (constants::RESOURCE_SDU as f64 * 8.0 * constants::RESOURCE_HMU_WAIT_FACTOR) / eifr;
5527            let old_delta = constants::RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT * expected_tof
5528                + constants::RESOURCE_RETRY_GRACE_TIME;
5529            (
5530                old_delta + expected_hmu_wait + expected_hmu_wait.max(1.0),
5531                receiver.retries_left,
5532            )
5533        };
5534        {
5535            let managed = resp_mgr.links.get(&link_id).unwrap();
5536            let receiver = managed.incoming_resources.first().unwrap();
5537            assert_eq!(receiver.retries_left, retries_before);
5538            assert!(
5539                receiver.eifr.is_some(),
5540                "receiver tick should have populated EIFR"
5541            );
5542        }
5543
5544        let late_resource_actions = {
5545            let managed = resp_mgr.links.get_mut(&link_id).unwrap();
5546            let receiver = managed.incoming_resources.first_mut().unwrap();
5547            let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
5548                managed.engine.decrypt(ciphertext).map_err(|_| ())
5549            };
5550            receiver.tick(
5551                receiver.last_activity + late_delta,
5552                &decrypt_fn,
5553                &Bzip2Compressor,
5554            )
5555        };
5556        let late_actions =
5557            resp_mgr.process_resource_actions(&link_id, late_resource_actions, &mut rng);
5558        let retry_raw = late_actions
5559            .iter()
5560            .find_map(|a| match a {
5561                LinkManagerAction::SendPacket { raw, .. } => {
5562                    let pkt = RawPacket::unpack(raw).ok()?;
5563                    (pkt.context == constants::CONTEXT_RESOURCE_REQ).then_some(raw.clone())
5564                }
5565                _ => None,
5566            })
5567            .expect("receiver should emit a resource retry request after extended timeout");
5568
5569        {
5570            let managed = resp_mgr.links.get(&link_id).unwrap();
5571            let receiver = managed.incoming_resources.first().unwrap();
5572            assert_eq!(receiver.retries_left, retries_before - 1);
5573        }
5574
5575        let retry_pkt = RawPacket::unpack(&retry_raw).unwrap();
5576        let retry_plaintext = resp_mgr
5577            .links
5578            .get(&link_id)
5579            .unwrap()
5580            .engine
5581            .decrypt(&retry_pkt.data)
5582            .expect("retry request should decrypt");
5583        assert_eq!(retry_plaintext[0], constants::RESOURCE_HASHMAP_IS_EXHAUSTED);
5584
5585        // Deliver the retry request to the sender and verify it turns into a
5586        // real HMU packet in the live LinkManager flow.
5587        let retry_to_sender = init_mgr.handle_local_delivery(
5588            retry_pkt.destination_hash,
5589            &retry_raw,
5590            retry_pkt.packet_hash,
5591            rns_core::transport::types::InterfaceId(0),
5592            &mut rng,
5593        );
5594        assert!(
5595            retry_to_sender.iter().any(|a| match a {
5596                LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
5597                    .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_HMU)
5598                    .unwrap_or(false),
5599                _ => false,
5600            }),
5601            "sender should answer the exhausted retry request with a live HMU packet"
5602        );
5603    }
5604
5605    #[test]
5606    fn test_process_resource_actions_mapping() {
5607        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
5608        let mut rng = OsRng;
5609
5610        // Test that various ResourceActions map to correct LinkManagerActions
5611        let actions = vec![
5612            ResourceAction::DataReceived {
5613                data: vec![1, 2, 3],
5614                metadata: Some(vec![4, 5]),
5615            },
5616            ResourceAction::Completed,
5617            ResourceAction::Failed(rns_core::resource::ResourceError::Timeout),
5618            ResourceAction::ProgressUpdate {
5619                received: 10,
5620                total: 20,
5621            },
5622            ResourceAction::TeardownLink,
5623        ];
5624
5625        let result = init_mgr.process_resource_actions(&link_id, actions, &mut rng);
5626
5627        assert!(matches!(
5628            result[0],
5629            LinkManagerAction::ResourceReceived { .. }
5630        ));
5631        assert!(matches!(
5632            result[1],
5633            LinkManagerAction::ResourceCompleted { .. }
5634        ));
5635        assert!(matches!(
5636            result[2],
5637            LinkManagerAction::ResourceFailed { .. }
5638        ));
5639        assert!(matches!(
5640            result[3],
5641            LinkManagerAction::ResourceProgress {
5642                received: 10,
5643                total: 20,
5644                ..
5645            }
5646        ));
5647        assert!(result
5648            .iter()
5649            .any(|action| matches!(action, LinkManagerAction::LinkClosed { .. })));
5650    }
5651
5652    #[test]
5653    fn test_link_state_empty() {
5654        let mgr = LinkManager::new();
5655        let fake_id = [0xAA; 16];
5656        assert!(mgr.link_state(&fake_id).is_none());
5657    }
5658
5659    #[test]
5660    fn test_large_response_resource_completes_as_response() {
5661        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5662        let mut rng = OsRng;
5663
5664        let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5665        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(large_payload));
5666        resp_mgr.register_request_handler("/large", None, {
5667            let response_value = response_value.clone();
5668            move |_link_id, _path, _data, _remote| Some(response_value.clone())
5669        });
5670
5671        let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
5672        let req_raw = extract_any_send_packet(&req_actions);
5673        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5674        let request_id = req_pkt.get_truncated_hash();
5675        let resp_actions = resp_mgr.handle_local_delivery(
5676            req_pkt.destination_hash,
5677            &req_raw,
5678            req_pkt.packet_hash,
5679            rns_core::transport::types::InterfaceId(0),
5680            &mut rng,
5681        );
5682
5683        let mut pending: Vec<(char, LinkManagerAction)> =
5684            resp_actions.into_iter().map(|a| ('r', a)).collect();
5685        let mut rounds = 0;
5686        let mut received_response = None;
5687
5688        while !pending.is_empty() && rounds < 200 {
5689            rounds += 1;
5690            let mut next = Vec::new();
5691
5692            for (source, action) in pending.drain(..) {
5693                let LinkManagerAction::SendPacket { raw, .. } = action else {
5694                    continue;
5695                };
5696                let pkt = RawPacket::unpack(&raw).unwrap();
5697                let target_actions = if source == 'r' {
5698                    init_mgr.handle_local_delivery(
5699                        pkt.destination_hash,
5700                        &raw,
5701                        pkt.packet_hash,
5702                        rns_core::transport::types::InterfaceId(0),
5703                        &mut rng,
5704                    )
5705                } else {
5706                    resp_mgr.handle_local_delivery(
5707                        pkt.destination_hash,
5708                        &raw,
5709                        pkt.packet_hash,
5710                        rns_core::transport::types::InterfaceId(0),
5711                        &mut rng,
5712                    )
5713                };
5714
5715                let target_source = if source == 'r' { 'i' } else { 'r' };
5716                for target_action in &target_actions {
5717                    match target_action {
5718                        LinkManagerAction::ResponseReceived {
5719                            request_id: rid,
5720                            data,
5721                            ..
5722                        } => {
5723                            received_response = Some((*rid, data.clone()));
5724                        }
5725                        LinkManagerAction::ResourceReceived { .. } => {
5726                            panic!("response resources must complete as ResponseReceived")
5727                        }
5728                        LinkManagerAction::ResourceAcceptQuery { .. } => {
5729                            panic!("response resources must bypass application acceptance")
5730                        }
5731                        _ => {}
5732                    }
5733                }
5734                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5735            }
5736
5737            pending = next;
5738        }
5739
5740        let (received_request_id, received_data) = received_response.unwrap_or_else(|| {
5741            panic!(
5742                "large response resource did not complete as ResponseReceived after {} rounds",
5743                rounds
5744            )
5745        });
5746        assert_eq!(received_request_id, request_id);
5747        assert_eq!(received_data, response_value);
5748    }
5749
5750    #[test]
5751    fn test_response_resource_preserves_metadata() {
5752        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5753        let mut rng = OsRng;
5754
5755        let payload = b"bundle-data".to_vec();
5756        let metadata = b"git-status-ok".to_vec();
5757        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5758        resp_mgr.register_request_handler_response("/fetch", None, {
5759            let response_value = response_value.clone();
5760            let metadata = metadata.clone();
5761            move |_link_id, _path, _data, _remote| {
5762                Some(RequestResponse::Resource {
5763                    data: response_value.clone(),
5764                    metadata: Some(metadata.clone()),
5765                    auto_compress: false,
5766                })
5767            }
5768        });
5769
5770        let req_actions = init_mgr.send_request(&link_id, "/fetch", b"\xc0", &mut rng);
5771        let req_raw = extract_any_send_packet(&req_actions);
5772        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5773        let request_id = req_pkt.get_truncated_hash();
5774        let resp_actions = resp_mgr.handle_local_delivery(
5775            req_pkt.destination_hash,
5776            &req_raw,
5777            req_pkt.packet_hash,
5778            rns_core::transport::types::InterfaceId(0),
5779            &mut rng,
5780        );
5781
5782        let mut pending: Vec<(char, LinkManagerAction)> =
5783            resp_actions.into_iter().map(|a| ('r', a)).collect();
5784        let mut received_response = None;
5785
5786        for _ in 0..200 {
5787            if pending.is_empty() || received_response.is_some() {
5788                break;
5789            }
5790
5791            let mut next = Vec::new();
5792            for (source, action) in pending.drain(..) {
5793                let LinkManagerAction::SendPacket { raw, .. } = action else {
5794                    continue;
5795                };
5796                let pkt = RawPacket::unpack(&raw).unwrap();
5797                let target_actions = if source == 'r' {
5798                    init_mgr.handle_local_delivery(
5799                        pkt.destination_hash,
5800                        &raw,
5801                        pkt.packet_hash,
5802                        rns_core::transport::types::InterfaceId(0),
5803                        &mut rng,
5804                    )
5805                } else {
5806                    resp_mgr.handle_local_delivery(
5807                        pkt.destination_hash,
5808                        &raw,
5809                        pkt.packet_hash,
5810                        rns_core::transport::types::InterfaceId(0),
5811                        &mut rng,
5812                    )
5813                };
5814
5815                let target_source = if source == 'r' { 'i' } else { 'r' };
5816                for target_action in &target_actions {
5817                    match target_action {
5818                        LinkManagerAction::ResponseReceived {
5819                            request_id: rid,
5820                            data,
5821                            metadata: response_metadata,
5822                            ..
5823                        } => {
5824                            received_response =
5825                                Some((*rid, data.clone(), response_metadata.clone()));
5826                        }
5827                        LinkManagerAction::ResourceReceived { .. } => {
5828                            panic!("response resources must complete as ResponseReceived")
5829                        }
5830                        _ => {}
5831                    }
5832                }
5833                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5834            }
5835            pending = next;
5836        }
5837
5838        let (received_request_id, received_data, received_metadata) = received_response
5839            .expect("resource response with metadata should complete as ResponseReceived");
5840        assert_eq!(received_request_id, request_id);
5841        assert_eq!(received_data, response_value);
5842        assert_eq!(received_metadata, Some(metadata));
5843    }
5844
5845    #[test]
5846    fn test_negotiated_mtu_response_uses_resource_before_global_mtu() {
5847        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5848        let mut rng = OsRng;
5849
5850        init_mgr.set_link_mtu(&link_id, 300);
5851        resp_mgr.set_link_mtu(&link_id, 300);
5852
5853        let payload = vec![0xAB; 350];
5854        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5855        resp_mgr.register_request_handler("/mtu", None, {
5856            let response_value = response_value.clone();
5857            move |_link_id, _path, _data, _remote| Some(response_value.clone())
5858        });
5859
5860        let req_actions = init_mgr.send_request(&link_id, "/mtu", b"\xc0", &mut rng);
5861        let req_raw = extract_any_send_packet(&req_actions);
5862        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5863        let resp_actions = resp_mgr.handle_local_delivery(
5864            req_pkt.destination_hash,
5865            &req_raw,
5866            req_pkt.packet_hash,
5867            rns_core::transport::types::InterfaceId(0),
5868            &mut rng,
5869        );
5870
5871        let mut has_resource_adv = false;
5872        let mut direct_response_len = None;
5873        for action in &resp_actions {
5874            if let LinkManagerAction::SendPacket { raw, .. } = action {
5875                let pkt = RawPacket::unpack(raw).unwrap();
5876                has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
5877                if pkt.context == constants::CONTEXT_RESPONSE {
5878                    direct_response_len = Some(raw.len());
5879                }
5880            }
5881        }
5882
5883        assert!(
5884            has_resource_adv,
5885            "responses larger than the negotiated link MTU should use resource fallback"
5886        );
5887        assert!(
5888            direct_response_len.is_none(),
5889            "sent direct response of {} bytes on a 300 byte negotiated MTU",
5890            direct_response_len.unwrap_or_default()
5891        );
5892    }
5893
5894    #[test]
5895    fn test_large_management_response_uses_resource_fallback() {
5896        let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
5897        let mut rng = OsRng;
5898
5899        let payload = vec![0xBC; 5000];
5900        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5901        let actions =
5902            resp_mgr.send_management_response(&link_id, &[0x55; 16], &response_value, &mut rng);
5903
5904        let mut has_resource_adv = false;
5905        let mut has_direct_response = false;
5906        for action in &actions {
5907            if let LinkManagerAction::SendPacket { raw, .. } = action {
5908                let pkt = RawPacket::unpack(raw).unwrap();
5909                has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
5910                has_direct_response |= pkt.context == constants::CONTEXT_RESPONSE;
5911            }
5912        }
5913
5914        assert!(
5915            has_resource_adv,
5916            "large management responses should advertise a response resource"
5917        );
5918        assert!(
5919            !has_direct_response,
5920            "large management responses should not use a direct CONTEXT_RESPONSE packet"
5921        );
5922    }
5923}