Skip to main content

rns_net/common/
link_manager.rs

1//! Link manager: wires rns-core LinkEngine + Channel + Resource into the driver.
2//!
3//! Manages multiple concurrent links, link destination registration,
4//! request/response handling, resource transfers, and full lifecycle
5//! (handshake → active → teardown).
6//!
7//! Python reference: Link.py, RequestReceipt.py, Resource.py
8
9use std::collections::HashMap;
10
11use super::compressor::Bzip2Compressor;
12use rns_core::channel::{Channel, Sequence};
13use rns_core::constants;
14use rns_core::link::types::{LinkId, LinkState, TeardownReason};
15use rns_core::link::{LinkAction, LinkEngine, LinkMode};
16use rns_core::packet::{PacketFlags, RawPacket};
17use rns_core::resource::{ResourceAction, ResourceReceiver, ResourceSender};
18use rns_crypto::ed25519::Ed25519PrivateKey;
19use rns_crypto::Rng;
20
21use super::time;
22
23/// Resource acceptance strategy.
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ResourceStrategy {
26    /// Reject all incoming resources.
27    AcceptNone,
28    /// Accept all incoming resources automatically.
29    AcceptAll,
30    /// Query the application callback for each resource.
31    AcceptApp,
32}
33
34impl Default for ResourceStrategy {
35    fn default() -> Self {
36        ResourceStrategy::AcceptNone
37    }
38}
39
40/// A managed link wrapping LinkEngine + optional Channel + resources.
41struct ManagedLink {
42    engine: LinkEngine,
43    channel: Option<Channel>,
44    pending_channel_packets: HashMap<[u8; 32], Sequence>,
45    channel_send_ok: u64,
46    channel_send_not_ready: u64,
47    channel_send_too_big: u64,
48    channel_send_other_error: u64,
49    channel_messages_received: u64,
50    channel_proofs_sent: u64,
51    channel_proofs_received: u64,
52    /// Destination hash this link belongs to.
53    dest_hash: [u8; 16],
54    /// Remote identity (hash, public_key) once identified.
55    remote_identity: Option<([u8; 16], [u8; 64])>,
56    /// Destination's Ed25519 signing public key (for initiator to verify LRPROOF).
57    dest_sig_pub_bytes: Option<[u8; 32]>,
58    /// Active incoming resource transfers.
59    incoming_resources: Vec<ResourceReceiver>,
60    /// Active outgoing resource transfers.
61    outgoing_resources: Vec<ResourceSender>,
62    /// Resource acceptance strategy.
63    resource_strategy: ResourceStrategy,
64    /// Interface this link's packets should be sent on when known.
65    route_interface: Option<rns_core::transport::types::InterfaceId>,
66    /// Next-hop transport ID seen on inbound HEADER_2 link traffic.
67    ///
68    /// When present, outbound link packets can be rewritten to HEADER_2 using
69    /// this transport ID to preserve multi-hop routing.
70    route_transport_id: Option<[u8; 16]>,
71}
72
73/// A registered link destination that can accept incoming LINKREQUEST.
74struct LinkDestination {
75    sig_prv: Ed25519PrivateKey,
76    sig_pub_bytes: [u8; 32],
77    resource_strategy: ResourceStrategy,
78}
79
80/// A registered request handler for a path.
81struct RequestHandlerEntry {
82    /// The path this handler serves (e.g. "/status").
83    path: String,
84    /// The truncated hash of the path (first 16 bytes of SHA-256).
85    path_hash: [u8; 16],
86    /// Access control: None means allow all, Some(list) means allow only listed identities.
87    allowed_list: Option<Vec<[u8; 16]>>,
88    /// Handler function: (link_id, path, request_id, data, remote_identity) -> Option<response_data>.
89    handler:
90        Box<dyn Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>> + Send>,
91}
92
93/// Actions produced by LinkManager for the driver to dispatch.
94#[derive(Debug)]
95pub enum LinkManagerAction {
96    /// Send a packet via the transport engine outbound path.
97    SendPacket {
98        raw: Vec<u8>,
99        dest_type: u8,
100        attached_interface: Option<rns_core::transport::types::InterfaceId>,
101    },
102    /// Link established — notify callbacks.
103    LinkEstablished {
104        link_id: LinkId,
105        dest_hash: [u8; 16],
106        rtt: f64,
107        is_initiator: bool,
108    },
109    /// Link closed — notify callbacks.
110    LinkClosed {
111        link_id: LinkId,
112        reason: Option<TeardownReason>,
113    },
114    /// Remote peer identified — notify callbacks.
115    RemoteIdentified {
116        link_id: LinkId,
117        identity_hash: [u8; 16],
118        public_key: [u8; 64],
119    },
120    /// Register a link_id as local destination in transport (for receiving link data).
121    RegisterLinkDest { link_id: LinkId },
122    /// Deregister a link_id from transport local destinations.
123    DeregisterLinkDest { link_id: LinkId },
124    /// A management request that needs to be handled by the driver.
125    /// The driver has access to engine state needed to build the response.
126    ManagementRequest {
127        link_id: LinkId,
128        path_hash: [u8; 16],
129        /// The request data (msgpack-encoded Value from the request array).
130        data: Vec<u8>,
131        /// The request_id (truncated hash of the packed request).
132        request_id: [u8; 16],
133        remote_identity: Option<([u8; 16], [u8; 64])>,
134    },
135    /// Resource data fully received and assembled.
136    ResourceReceived {
137        link_id: LinkId,
138        data: Vec<u8>,
139        metadata: Option<Vec<u8>>,
140    },
141    /// Resource transfer completed (proof validated on sender side).
142    ResourceCompleted { link_id: LinkId },
143    /// Resource transfer failed.
144    ResourceFailed { link_id: LinkId, error: String },
145    /// Resource transfer progress update.
146    ResourceProgress {
147        link_id: LinkId,
148        received: usize,
149        total: usize,
150    },
151    /// Query application whether to accept an incoming resource (for AcceptApp strategy).
152    ResourceAcceptQuery {
153        link_id: LinkId,
154        resource_hash: Vec<u8>,
155        transfer_size: u64,
156        has_metadata: bool,
157    },
158    /// Channel message received on a link.
159    ChannelMessageReceived {
160        link_id: LinkId,
161        msgtype: u16,
162        payload: Vec<u8>,
163    },
164    /// Generic link data received (CONTEXT_NONE).
165    LinkDataReceived {
166        link_id: LinkId,
167        context: u8,
168        data: Vec<u8>,
169    },
170    /// Response received on a link.
171    ResponseReceived {
172        link_id: LinkId,
173        request_id: [u8; 16],
174        data: Vec<u8>,
175    },
176    /// A link request was received (for hook notification).
177    LinkRequestReceived {
178        link_id: LinkId,
179        receiving_interface: rns_core::transport::types::InterfaceId,
180    },
181}
182
183/// Manages multiple links, link destinations, and request/response.
184pub struct LinkManager {
185    links: HashMap<LinkId, ManagedLink>,
186    link_destinations: HashMap<[u8; 16], LinkDestination>,
187    request_handlers: Vec<RequestHandlerEntry>,
188    /// Path hashes that should be handled externally (by the driver) rather than
189    /// by registered handler closures. Used for management destinations.
190    management_paths: Vec<[u8; 16]>,
191}
192
193impl LinkManager {
194    fn resource_sdu_for_link(link: &ManagedLink) -> usize {
195        // Python parity: Resource.sdu = link.mtu - HEADER_MAXSIZE - IFAC_MIN_SIZE
196        // when MTU signalling is available on the link.
197        let mtu = link.engine.mtu() as usize;
198        let derived = mtu.saturating_sub(constants::HEADER_MAXSIZE + constants::IFAC_MIN_SIZE);
199        if derived > 0 {
200            derived
201        } else {
202            constants::RESOURCE_SDU
203        }
204    }
205
206    /// Create a new empty link manager.
207    pub fn new() -> Self {
208        LinkManager {
209            links: HashMap::new(),
210            link_destinations: HashMap::new(),
211            request_handlers: Vec::new(),
212            management_paths: Vec::new(),
213        }
214    }
215
216    /// Register a path hash as a management path.
217    /// Management requests are returned as ManagementRequest actions
218    /// for the driver to handle (since they need access to engine state).
219    pub fn register_management_path(&mut self, path_hash: [u8; 16]) {
220        if !self.management_paths.contains(&path_hash) {
221            self.management_paths.push(path_hash);
222        }
223    }
224
225    /// Get the derived session key for a link (needed for hole-punch token derivation).
226    pub fn get_derived_key(&self, link_id: &LinkId) -> Option<Vec<u8>> {
227        self.links
228            .get(link_id)
229            .and_then(|link| link.engine.derived_key().map(|dk| dk.to_vec()))
230    }
231
232    /// Return best-known routing hint for link packets.
233    pub fn get_link_route_hint(
234        &self,
235        link_id: &LinkId,
236    ) -> Option<(rns_core::transport::types::InterfaceId, Option<[u8; 16]>)> {
237        self.links.get(link_id).and_then(|link| {
238            link.route_interface
239                .map(|iface| (iface, link.route_transport_id))
240        })
241    }
242
243    /// Register a destination that can accept incoming links.
244    pub fn register_link_destination(
245        &mut self,
246        dest_hash: [u8; 16],
247        sig_prv: Ed25519PrivateKey,
248        sig_pub_bytes: [u8; 32],
249        resource_strategy: ResourceStrategy,
250    ) {
251        self.link_destinations.insert(
252            dest_hash,
253            LinkDestination {
254                sig_prv,
255                sig_pub_bytes,
256                resource_strategy,
257            },
258        );
259    }
260
261    /// Deregister a link destination.
262    pub fn deregister_link_destination(&mut self, dest_hash: &[u8; 16]) {
263        self.link_destinations.remove(dest_hash);
264    }
265
266    /// Register a request handler for a given path.
267    ///
268    /// `path`: the request path string (e.g. "/status")
269    /// `allowed_list`: None = allow all, Some(list) = restrict to these identity hashes
270    /// `handler`: called with (link_id, path, request_data, remote_identity) -> Option<response>
271    pub fn register_request_handler<F>(
272        &mut self,
273        path: &str,
274        allowed_list: Option<Vec<[u8; 16]>>,
275        handler: F,
276    ) where
277        F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>>
278            + Send
279            + 'static,
280    {
281        let path_hash = compute_path_hash(path);
282        self.request_handlers.push(RequestHandlerEntry {
283            path: path.to_string(),
284            path_hash,
285            allowed_list,
286            handler: Box::new(handler),
287        });
288    }
289
290    /// Create an outbound link to a destination.
291    ///
292    /// `dest_sig_pub_bytes` is the destination's Ed25519 signing public key
293    /// (needed to verify LRPROOF). In Python this comes from the Destination's Identity.
294    ///
295    /// Returns `(link_id, actions)`. The first action will be a SendPacket with
296    /// the LINKREQUEST.
297    pub fn create_link(
298        &mut self,
299        dest_hash: &[u8; 16],
300        dest_sig_pub_bytes: &[u8; 32],
301        hops: u8,
302        mtu: u32,
303        rng: &mut dyn Rng,
304    ) -> (LinkId, Vec<LinkManagerAction>) {
305        let mode = LinkMode::Aes256Cbc;
306        let (mut engine, request_data) =
307            LinkEngine::new_initiator(dest_hash, hops, mode, Some(mtu), time::now(), rng);
308
309        // Build the LINKREQUEST packet to compute link_id
310        let flags = PacketFlags {
311            header_type: constants::HEADER_1,
312            context_flag: constants::FLAG_UNSET,
313            transport_type: constants::TRANSPORT_BROADCAST,
314            destination_type: constants::DESTINATION_SINGLE,
315            packet_type: constants::PACKET_TYPE_LINKREQUEST,
316        };
317
318        let packet = match RawPacket::pack(
319            flags,
320            0,
321            dest_hash,
322            None,
323            constants::CONTEXT_NONE,
324            &request_data,
325        ) {
326            Ok(p) => p,
327            Err(_) => {
328                // Should not happen with valid data
329                return ([0u8; 16], Vec::new());
330            }
331        };
332
333        engine.set_link_id_from_hashable(&packet.get_hashable_part(), request_data.len());
334        let link_id = *engine.link_id();
335
336        let managed = ManagedLink {
337            engine,
338            channel: None,
339            pending_channel_packets: HashMap::new(),
340            channel_send_ok: 0,
341            channel_send_not_ready: 0,
342            channel_send_too_big: 0,
343            channel_send_other_error: 0,
344            channel_messages_received: 0,
345            channel_proofs_sent: 0,
346            channel_proofs_received: 0,
347            dest_hash: *dest_hash,
348            remote_identity: None,
349            dest_sig_pub_bytes: Some(*dest_sig_pub_bytes),
350            incoming_resources: Vec::new(),
351            outgoing_resources: Vec::new(),
352            resource_strategy: ResourceStrategy::default(),
353            route_interface: None,
354            route_transport_id: None,
355        };
356        self.links.insert(link_id, managed);
357
358        let mut actions = Vec::new();
359        // Register the link_id as a local destination so we can receive LRPROOF
360        actions.push(LinkManagerAction::RegisterLinkDest { link_id });
361        // Send the LINKREQUEST packet
362        actions.push(LinkManagerAction::SendPacket {
363            raw: packet.raw,
364            dest_type: constants::DESTINATION_LINK,
365            attached_interface: None,
366        });
367
368        (link_id, actions)
369    }
370
371    /// Handle a packet delivered locally (via DeliverLocal).
372    ///
373    /// Returns actions for the driver to dispatch. The `dest_hash` is the
374    /// packet's destination_hash field. `raw` is the full packet bytes.
375    /// `packet_hash` is the SHA-256 hash.
376    pub fn handle_local_delivery(
377        &mut self,
378        dest_hash: [u8; 16],
379        raw: &[u8],
380        packet_hash: [u8; 32],
381        receiving_interface: rns_core::transport::types::InterfaceId,
382        rng: &mut dyn Rng,
383    ) -> Vec<LinkManagerAction> {
384        let packet = match RawPacket::unpack(raw) {
385            Ok(p) => p,
386            Err(_) => return Vec::new(),
387        };
388
389        match packet.flags.packet_type {
390            constants::PACKET_TYPE_LINKREQUEST => {
391                self.handle_linkrequest(&dest_hash, &packet, receiving_interface, rng)
392            }
393            constants::PACKET_TYPE_PROOF if packet.context == constants::CONTEXT_LRPROOF => {
394                // LRPROOF: dest_hash is the link_id
395                self.handle_lrproof(&dest_hash, &packet, receiving_interface, rng)
396            }
397            constants::PACKET_TYPE_PROOF => self.handle_link_proof(&dest_hash, &packet, rng),
398            constants::PACKET_TYPE_DATA => {
399                self.handle_link_data(&dest_hash, &packet, packet_hash, receiving_interface, rng)
400            }
401            _ => Vec::new(),
402        }
403    }
404
405    /// Handle an incoming LINKREQUEST packet.
406    fn handle_linkrequest(
407        &mut self,
408        dest_hash: &[u8; 16],
409        packet: &RawPacket,
410        receiving_interface: rns_core::transport::types::InterfaceId,
411        rng: &mut dyn Rng,
412    ) -> Vec<LinkManagerAction> {
413        // Look up the link destination
414        let ld = match self.link_destinations.get(dest_hash) {
415            Some(ld) => ld,
416            None => return Vec::new(),
417        };
418
419        let hashable = packet.get_hashable_part();
420        let now = time::now();
421
422        // Create responder engine
423        let (engine, lrproof_data) = match LinkEngine::new_responder(
424            &ld.sig_prv,
425            &ld.sig_pub_bytes,
426            &packet.data,
427            &hashable,
428            dest_hash,
429            packet.hops,
430            now,
431            rng,
432        ) {
433            Ok(r) => r,
434            Err(e) => {
435                log::debug!("LINKREQUEST rejected: {}", e);
436                return Vec::new();
437            }
438        };
439
440        let link_id = *engine.link_id();
441        log::debug!(
442            "LINKREQUEST accepted: link={:02x?} iface={} header_type={} transport_id_present={} hops={}",
443            &link_id[..4],
444            receiving_interface.0,
445            packet.flags.header_type,
446            packet.transport_id.is_some(),
447            packet.hops
448        );
449
450        let managed = ManagedLink {
451            engine,
452            channel: None,
453            pending_channel_packets: HashMap::new(),
454            channel_send_ok: 0,
455            channel_send_not_ready: 0,
456            channel_send_too_big: 0,
457            channel_send_other_error: 0,
458            channel_messages_received: 0,
459            channel_proofs_sent: 0,
460            channel_proofs_received: 0,
461            dest_hash: *dest_hash,
462            remote_identity: None,
463            dest_sig_pub_bytes: None,
464            incoming_resources: Vec::new(),
465            outgoing_resources: Vec::new(),
466            resource_strategy: ld.resource_strategy,
467            route_interface: Some(receiving_interface),
468            route_transport_id: if packet.flags.header_type == constants::HEADER_2 {
469                packet.transport_id
470            } else {
471                None
472            },
473        };
474        self.links.insert(link_id, managed);
475
476        // Build LRPROOF packet: type=PROOF, context=LRPROOF, dest=link_id
477        let flags = PacketFlags {
478            header_type: constants::HEADER_1,
479            context_flag: constants::FLAG_UNSET,
480            transport_type: constants::TRANSPORT_BROADCAST,
481            destination_type: constants::DESTINATION_LINK,
482            packet_type: constants::PACKET_TYPE_PROOF,
483        };
484
485        let mut actions = Vec::new();
486
487        // Register link_id as local destination so we receive link data
488        actions.push(LinkManagerAction::RegisterLinkDest { link_id });
489
490        if let Ok(pkt) = RawPacket::pack(
491            flags,
492            packet.hops,
493            &link_id,
494            None,
495            constants::CONTEXT_LRPROOF,
496            &lrproof_data,
497        ) {
498            log::debug!(
499                "LRPROOF queued: link={:02x?} route_iface={} route_tid_present={} hops={}",
500                &link_id[..4],
501                receiving_interface.0,
502                packet.transport_id.is_some(),
503                packet.hops
504            );
505            actions.push(LinkManagerAction::SendPacket {
506                raw: pkt.raw,
507                dest_type: constants::DESTINATION_LINK,
508                attached_interface: None,
509            });
510        }
511
512        // Reticulum interop fallback #1: queue an LRPROOF variant with
513        // hop=0, matching peers that validate LRPROOF with hop=0 semantics.
514        if packet.hops != 0 {
515            if let Ok(pkt0) = RawPacket::pack(
516                flags,
517                0,
518                &link_id,
519                None,
520                constants::CONTEXT_LRPROOF,
521                &lrproof_data,
522            ) {
523                log::debug!(
524                    "LRPROOF fallback queued: link={:02x?} route_iface={} hops=0",
525                    &link_id[..4],
526                    receiving_interface.0
527                );
528                actions.push(LinkManagerAction::SendPacket {
529                    raw: pkt0.raw,
530                    dest_type: constants::DESTINATION_LINK,
531                    attached_interface: None,
532                });
533            }
534        }
535
536        // Reticulum interop fallback #2: queue an LRPROOF +1 hop variant for
537        // peers that validate against a remaining-hop value derived
538        // differently from destination-side delivery hops.
539        if packet.hops < u8::MAX {
540            let hops_plus_one = packet.hops + 1;
541            if let Ok(pkt2) = RawPacket::pack(
542                flags,
543                hops_plus_one,
544                &link_id,
545                None,
546                constants::CONTEXT_LRPROOF,
547                &lrproof_data,
548            ) {
549                log::debug!(
550                    "LRPROOF +1 queued: link={:02x?} route_iface={} hops={}",
551                    &link_id[..4],
552                    receiving_interface.0,
553                    hops_plus_one
554                );
555                actions.push(LinkManagerAction::SendPacket {
556                    raw: pkt2.raw,
557                    dest_type: constants::DESTINATION_LINK,
558                    attached_interface: None,
559                });
560            }
561        }
562
563        // Notify hook system about the incoming link request
564        actions.push(LinkManagerAction::LinkRequestReceived {
565            link_id,
566            receiving_interface,
567        });
568
569        actions
570    }
571
572    fn handle_link_proof(
573        &mut self,
574        link_id: &LinkId,
575        packet: &RawPacket,
576        rng: &mut dyn Rng,
577    ) -> Vec<LinkManagerAction> {
578        if packet.data.len() < 32 {
579            return Vec::new();
580        }
581
582        let mut tracked_hash = [0u8; 32];
583        tracked_hash.copy_from_slice(&packet.data[..32]);
584
585        let Some(link) = self.links.get_mut(link_id) else {
586            return Vec::new();
587        };
588        let Some(sequence) = link.pending_channel_packets.remove(&tracked_hash) else {
589            return Vec::new();
590        };
591        link.channel_proofs_received += 1;
592        let Some(channel) = link.channel.as_mut() else {
593            return Vec::new();
594        };
595
596        let chan_actions = channel.packet_delivered(sequence);
597        let _ = channel;
598        let _ = link;
599        self.process_channel_actions(link_id, chan_actions, rng)
600    }
601
602    fn build_link_packet_proof(
603        &mut self,
604        link_id: &LinkId,
605        packet_hash: &[u8; 32],
606    ) -> Vec<LinkManagerAction> {
607        let dest_hash = match self.links.get(link_id) {
608            Some(link) => link.dest_hash,
609            None => return Vec::new(),
610        };
611        let Some(ld) = self.link_destinations.get(&dest_hash) else {
612            return Vec::new();
613        };
614        if let Some(link) = self.links.get_mut(link_id) {
615            link.channel_proofs_sent += 1;
616        }
617
618        let signature = ld.sig_prv.sign(packet_hash);
619        let mut proof_data = Vec::with_capacity(96);
620        proof_data.extend_from_slice(packet_hash);
621        proof_data.extend_from_slice(&signature);
622
623        let flags = PacketFlags {
624            header_type: constants::HEADER_1,
625            context_flag: constants::FLAG_UNSET,
626            transport_type: constants::TRANSPORT_BROADCAST,
627            destination_type: constants::DESTINATION_LINK,
628            packet_type: constants::PACKET_TYPE_PROOF,
629        };
630        if let Ok(pkt) = RawPacket::pack(
631            flags,
632            0,
633            link_id,
634            None,
635            constants::CONTEXT_NONE,
636            &proof_data,
637        ) {
638            vec![LinkManagerAction::SendPacket {
639                raw: pkt.raw,
640                dest_type: constants::DESTINATION_LINK,
641                attached_interface: None,
642            }]
643        } else {
644            Vec::new()
645        }
646    }
647
648    /// Handle an incoming LRPROOF packet (initiator side).
649    fn handle_lrproof(
650        &mut self,
651        link_id_bytes: &[u8; 16],
652        packet: &RawPacket,
653        receiving_interface: rns_core::transport::types::InterfaceId,
654        rng: &mut dyn Rng,
655    ) -> Vec<LinkManagerAction> {
656        let link = match self.links.get_mut(link_id_bytes) {
657            Some(l) => l,
658            None => return Vec::new(),
659        };
660
661        link.route_interface = Some(receiving_interface);
662        if packet.flags.header_type == constants::HEADER_2 {
663            if let Some(transport_id) = packet.transport_id {
664                link.route_transport_id = Some(transport_id);
665            }
666        }
667        log::debug!(
668            "LRPROOF received: link={:02x?} iface={} header_type={} transport_id_present={}",
669            &link_id_bytes[..4],
670            receiving_interface.0,
671            packet.flags.header_type,
672            packet.transport_id.is_some()
673        );
674
675        if link.engine.state() != LinkState::Pending || !link.engine.is_initiator() {
676            return Vec::new();
677        }
678
679        // The destination's signing pub key was stored when create_link was called
680        let dest_sig_pub_bytes = match link.dest_sig_pub_bytes {
681            Some(b) => b,
682            None => {
683                log::debug!("LRPROOF: no destination signing key available");
684                return Vec::new();
685            }
686        };
687
688        let now = time::now();
689        let (lrrtt_encrypted, link_actions) =
690            match link
691                .engine
692                .handle_lrproof(&packet.data, &dest_sig_pub_bytes, now, rng)
693            {
694                Ok(r) => r,
695                Err(e) => {
696                    log::debug!("LRPROOF validation failed: {}", e);
697                    return Vec::new();
698                }
699            };
700
701        let link_id = *link.engine.link_id();
702        let mut actions = Vec::new();
703
704        // Process link actions (StateChanged, LinkEstablished)
705        actions.extend(self.process_link_actions(&link_id, &link_actions));
706
707        // Send LRRTT: type=DATA, context=LRRTT, dest=link_id
708        let flags = PacketFlags {
709            header_type: constants::HEADER_1,
710            context_flag: constants::FLAG_UNSET,
711            transport_type: constants::TRANSPORT_BROADCAST,
712            destination_type: constants::DESTINATION_LINK,
713            packet_type: constants::PACKET_TYPE_DATA,
714        };
715
716        if let Ok(pkt) = RawPacket::pack(
717            flags,
718            0,
719            &link_id,
720            None,
721            constants::CONTEXT_LRRTT,
722            &lrrtt_encrypted,
723        ) {
724            actions.push(LinkManagerAction::SendPacket {
725                raw: pkt.raw,
726                dest_type: constants::DESTINATION_LINK,
727                attached_interface: None,
728            });
729        }
730
731        // Initialize channel now that link is active
732        if let Some(link) = self.links.get_mut(&link_id) {
733            if link.engine.state() == LinkState::Active {
734                let rtt = link.engine.rtt().unwrap_or(1.0);
735                link.channel = Some(Channel::new(rtt));
736            }
737        }
738
739        actions
740    }
741
742    /// Handle DATA packets on an established link.
743    ///
744    /// Structured to avoid borrow checker issues: we perform engine operations
745    /// on the link, collect intermediate results, drop the mutable borrow, then
746    /// call self methods that need immutable access.
747    fn handle_link_data(
748        &mut self,
749        link_id_bytes: &[u8; 16],
750        packet: &RawPacket,
751        packet_hash: [u8; 32],
752        receiving_interface: rns_core::transport::types::InterfaceId,
753        rng: &mut dyn Rng,
754    ) -> Vec<LinkManagerAction> {
755        // First pass: perform engine operations, collect results
756        enum LinkDataResult {
757            Lrrtt {
758                link_id: LinkId,
759                link_actions: Vec<LinkAction>,
760            },
761            Identify {
762                link_id: LinkId,
763                link_actions: Vec<LinkAction>,
764            },
765            Keepalive {
766                link_id: LinkId,
767                inbound_actions: Vec<LinkAction>,
768            },
769            LinkClose {
770                link_id: LinkId,
771                teardown_actions: Vec<LinkAction>,
772            },
773            Channel {
774                link_id: LinkId,
775                inbound_actions: Vec<LinkAction>,
776                plaintext: Vec<u8>,
777                packet_hash: [u8; 32],
778            },
779            Request {
780                link_id: LinkId,
781                inbound_actions: Vec<LinkAction>,
782                plaintext: Vec<u8>,
783                request_id: [u8; 16],
784            },
785            Response {
786                link_id: LinkId,
787                inbound_actions: Vec<LinkAction>,
788                plaintext: Vec<u8>,
789            },
790            Generic {
791                link_id: LinkId,
792                inbound_actions: Vec<LinkAction>,
793                plaintext: Vec<u8>,
794                context: u8,
795                packet_hash: [u8; 32],
796            },
797            /// Resource advertisement (link-decrypted).
798            ResourceAdv {
799                link_id: LinkId,
800                inbound_actions: Vec<LinkAction>,
801                plaintext: Vec<u8>,
802            },
803            /// Resource part request (link-decrypted).
804            ResourceReq {
805                link_id: LinkId,
806                inbound_actions: Vec<LinkAction>,
807                plaintext: Vec<u8>,
808            },
809            /// Resource hashmap update (link-decrypted).
810            ResourceHmu {
811                link_id: LinkId,
812                inbound_actions: Vec<LinkAction>,
813                plaintext: Vec<u8>,
814            },
815            /// Resource part data (NOT link-decrypted; parts are pre-encrypted by ResourceSender).
816            ResourcePart {
817                link_id: LinkId,
818                inbound_actions: Vec<LinkAction>,
819                raw_data: Vec<u8>,
820            },
821            /// Resource proof (feed to sender).
822            ResourcePrf {
823                link_id: LinkId,
824                inbound_actions: Vec<LinkAction>,
825                plaintext: Vec<u8>,
826            },
827            /// Resource cancel from initiator (link-decrypted).
828            ResourceIcl {
829                link_id: LinkId,
830                inbound_actions: Vec<LinkAction>,
831            },
832            /// Resource cancel from receiver (link-decrypted).
833            ResourceRcl {
834                link_id: LinkId,
835                inbound_actions: Vec<LinkAction>,
836            },
837            Error,
838        }
839
840        let result = {
841            let link = match self.links.get_mut(link_id_bytes) {
842                Some(l) => l,
843                None => return Vec::new(),
844            };
845
846            link.route_interface = Some(receiving_interface);
847            if packet.flags.header_type == constants::HEADER_2 {
848                if let Some(transport_id) = packet.transport_id {
849                    link.route_transport_id = Some(transport_id);
850                }
851            }
852
853            match packet.context {
854                constants::CONTEXT_LRRTT => {
855                    match link.engine.handle_lrrtt(&packet.data, time::now()) {
856                        Ok(link_actions) => {
857                            let link_id = *link.engine.link_id();
858                            LinkDataResult::Lrrtt {
859                                link_id,
860                                link_actions,
861                            }
862                        }
863                        Err(e) => {
864                            log::debug!("LRRTT handling failed: {}", e);
865                            LinkDataResult::Error
866                        }
867                    }
868                }
869                constants::CONTEXT_LINKIDENTIFY => {
870                    match link.engine.handle_identify(&packet.data) {
871                        Ok(link_actions) => {
872                            let link_id = *link.engine.link_id();
873                            link.remote_identity = link.engine.remote_identity().cloned();
874                            LinkDataResult::Identify {
875                                link_id,
876                                link_actions,
877                            }
878                        }
879                        Err(e) => {
880                            log::debug!("LINKIDENTIFY failed: {}", e);
881                            LinkDataResult::Error
882                        }
883                    }
884                }
885                constants::CONTEXT_KEEPALIVE => {
886                    let inbound_actions = link.engine.record_inbound(time::now());
887                    let link_id = *link.engine.link_id();
888                    LinkDataResult::Keepalive {
889                        link_id,
890                        inbound_actions,
891                    }
892                }
893                constants::CONTEXT_LINKCLOSE => {
894                    let teardown_actions = link.engine.handle_teardown();
895                    let link_id = *link.engine.link_id();
896                    LinkDataResult::LinkClose {
897                        link_id,
898                        teardown_actions,
899                    }
900                }
901                constants::CONTEXT_CHANNEL => match link.engine.decrypt(&packet.data) {
902                    Ok(plaintext) => {
903                        let inbound_actions = link.engine.record_inbound(time::now());
904                        let link_id = *link.engine.link_id();
905                        LinkDataResult::Channel {
906                            link_id,
907                            inbound_actions,
908                            plaintext,
909                            packet_hash,
910                        }
911                    }
912                    Err(_) => LinkDataResult::Error,
913                },
914                constants::CONTEXT_REQUEST => match link.engine.decrypt(&packet.data) {
915                    Ok(plaintext) => {
916                        let inbound_actions = link.engine.record_inbound(time::now());
917                        let link_id = *link.engine.link_id();
918                        let request_id = packet.get_truncated_hash();
919                        LinkDataResult::Request {
920                            link_id,
921                            inbound_actions,
922                            plaintext,
923                            request_id,
924                        }
925                    }
926                    Err(_) => LinkDataResult::Error,
927                },
928                constants::CONTEXT_RESPONSE => match link.engine.decrypt(&packet.data) {
929                    Ok(plaintext) => {
930                        let inbound_actions = link.engine.record_inbound(time::now());
931                        let link_id = *link.engine.link_id();
932                        LinkDataResult::Response {
933                            link_id,
934                            inbound_actions,
935                            plaintext,
936                        }
937                    }
938                    Err(_) => LinkDataResult::Error,
939                },
940                // --- Resource contexts ---
941                constants::CONTEXT_RESOURCE_ADV => match link.engine.decrypt(&packet.data) {
942                    Ok(plaintext) => {
943                        let inbound_actions = link.engine.record_inbound(time::now());
944                        let link_id = *link.engine.link_id();
945                        LinkDataResult::ResourceAdv {
946                            link_id,
947                            inbound_actions,
948                            plaintext,
949                        }
950                    }
951                    Err(_) => LinkDataResult::Error,
952                },
953                constants::CONTEXT_RESOURCE_REQ => match link.engine.decrypt(&packet.data) {
954                    Ok(plaintext) => {
955                        let inbound_actions = link.engine.record_inbound(time::now());
956                        let link_id = *link.engine.link_id();
957                        LinkDataResult::ResourceReq {
958                            link_id,
959                            inbound_actions,
960                            plaintext,
961                        }
962                    }
963                    Err(_) => LinkDataResult::Error,
964                },
965                constants::CONTEXT_RESOURCE_HMU => match link.engine.decrypt(&packet.data) {
966                    Ok(plaintext) => {
967                        let inbound_actions = link.engine.record_inbound(time::now());
968                        let link_id = *link.engine.link_id();
969                        LinkDataResult::ResourceHmu {
970                            link_id,
971                            inbound_actions,
972                            plaintext,
973                        }
974                    }
975                    Err(_) => LinkDataResult::Error,
976                },
977                constants::CONTEXT_RESOURCE => {
978                    // Resource parts are NOT link-decrypted — they're pre-encrypted by ResourceSender
979                    let inbound_actions = link.engine.record_inbound(time::now());
980                    let link_id = *link.engine.link_id();
981                    LinkDataResult::ResourcePart {
982                        link_id,
983                        inbound_actions,
984                        raw_data: packet.data.clone(),
985                    }
986                }
987                constants::CONTEXT_RESOURCE_PRF => match link.engine.decrypt(&packet.data) {
988                    Ok(plaintext) => {
989                        let inbound_actions = link.engine.record_inbound(time::now());
990                        let link_id = *link.engine.link_id();
991                        LinkDataResult::ResourcePrf {
992                            link_id,
993                            inbound_actions,
994                            plaintext,
995                        }
996                    }
997                    Err(_) => LinkDataResult::Error,
998                },
999                constants::CONTEXT_RESOURCE_ICL => {
1000                    let _ = link.engine.decrypt(&packet.data); // decrypt to validate
1001                    let inbound_actions = link.engine.record_inbound(time::now());
1002                    let link_id = *link.engine.link_id();
1003                    LinkDataResult::ResourceIcl {
1004                        link_id,
1005                        inbound_actions,
1006                    }
1007                }
1008                constants::CONTEXT_RESOURCE_RCL => {
1009                    let _ = link.engine.decrypt(&packet.data); // decrypt to validate
1010                    let inbound_actions = link.engine.record_inbound(time::now());
1011                    let link_id = *link.engine.link_id();
1012                    LinkDataResult::ResourceRcl {
1013                        link_id,
1014                        inbound_actions,
1015                    }
1016                }
1017                _ => match link.engine.decrypt(&packet.data) {
1018                    Ok(plaintext) => {
1019                        let inbound_actions = link.engine.record_inbound(time::now());
1020                        let link_id = *link.engine.link_id();
1021                        LinkDataResult::Generic {
1022                            link_id,
1023                            inbound_actions,
1024                            plaintext,
1025                            context: packet.context,
1026                            packet_hash,
1027                        }
1028                    }
1029                    Err(_) => LinkDataResult::Error,
1030                },
1031            }
1032        }; // mutable borrow of self.links dropped here
1033
1034        // Second pass: process results using self methods
1035        let mut actions = Vec::new();
1036        match result {
1037            LinkDataResult::Lrrtt {
1038                link_id,
1039                link_actions,
1040            } => {
1041                actions.extend(self.process_link_actions(&link_id, &link_actions));
1042                // Initialize channel
1043                if let Some(link) = self.links.get_mut(&link_id) {
1044                    if link.engine.state() == LinkState::Active {
1045                        let rtt = link.engine.rtt().unwrap_or(1.0);
1046                        link.channel = Some(Channel::new(rtt));
1047                    }
1048                }
1049            }
1050            LinkDataResult::Identify {
1051                link_id,
1052                link_actions,
1053            } => {
1054                actions.extend(self.process_link_actions(&link_id, &link_actions));
1055            }
1056            LinkDataResult::Keepalive {
1057                link_id,
1058                inbound_actions,
1059            } => {
1060                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1061                // record_inbound() already updated last_inbound, so the link
1062                // won't go stale.  The regular tick() keepalive mechanism will
1063                // send keepalives when needs_keepalive() returns true.
1064                // Do NOT reply here — unconditional replies create an infinite
1065                // ping-pong loop between the two link endpoints.
1066            }
1067            LinkDataResult::LinkClose {
1068                link_id,
1069                teardown_actions,
1070            } => {
1071                actions.extend(self.process_link_actions(&link_id, &teardown_actions));
1072            }
1073            LinkDataResult::Channel {
1074                link_id,
1075                inbound_actions,
1076                plaintext,
1077                packet_hash,
1078            } => {
1079                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1080                // Feed plaintext to channel
1081                if let Some(link) = self.links.get_mut(&link_id) {
1082                    if let Some(ref mut channel) = link.channel {
1083                        let chan_actions = channel.receive(&plaintext, time::now());
1084                        link.channel_messages_received += chan_actions
1085                            .iter()
1086                            .filter(|action| {
1087                                matches!(
1088                                    action,
1089                                    rns_core::channel::ChannelAction::MessageReceived { .. }
1090                                )
1091                            })
1092                            .count()
1093                            as u64;
1094                        // process_channel_actions needs immutable self, so collect first
1095                        let _ = link;
1096                        actions.extend(self.process_channel_actions(&link_id, chan_actions, rng));
1097                    }
1098                }
1099                actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1100            }
1101            LinkDataResult::Request {
1102                link_id,
1103                inbound_actions,
1104                plaintext,
1105                request_id,
1106            } => {
1107                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1108                actions.extend(self.handle_request(&link_id, &plaintext, request_id, rng));
1109            }
1110            LinkDataResult::Response {
1111                link_id,
1112                inbound_actions,
1113                plaintext,
1114            } => {
1115                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1116                // Unpack msgpack response: [Bin(request_id), response_value]
1117                actions.extend(self.handle_response(&link_id, &plaintext));
1118            }
1119            LinkDataResult::Generic {
1120                link_id,
1121                inbound_actions,
1122                plaintext,
1123                context,
1124                packet_hash,
1125            } => {
1126                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1127                actions.push(LinkManagerAction::LinkDataReceived {
1128                    link_id,
1129                    context,
1130                    data: plaintext,
1131                });
1132
1133                actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1134            }
1135            LinkDataResult::ResourceAdv {
1136                link_id,
1137                inbound_actions,
1138                plaintext,
1139            } => {
1140                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1141                actions.extend(self.handle_resource_adv(&link_id, &plaintext, rng));
1142            }
1143            LinkDataResult::ResourceReq {
1144                link_id,
1145                inbound_actions,
1146                plaintext,
1147            } => {
1148                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1149                actions.extend(self.handle_resource_req(&link_id, &plaintext, rng));
1150            }
1151            LinkDataResult::ResourceHmu {
1152                link_id,
1153                inbound_actions,
1154                plaintext,
1155            } => {
1156                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1157                actions.extend(self.handle_resource_hmu(&link_id, &plaintext, rng));
1158            }
1159            LinkDataResult::ResourcePart {
1160                link_id,
1161                inbound_actions,
1162                raw_data,
1163            } => {
1164                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1165                actions.extend(self.handle_resource_part(&link_id, &raw_data, rng));
1166            }
1167            LinkDataResult::ResourcePrf {
1168                link_id,
1169                inbound_actions,
1170                plaintext,
1171            } => {
1172                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1173                actions.extend(self.handle_resource_prf(&link_id, &plaintext));
1174            }
1175            LinkDataResult::ResourceIcl {
1176                link_id,
1177                inbound_actions,
1178            } => {
1179                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1180                actions.extend(self.handle_resource_icl(&link_id));
1181            }
1182            LinkDataResult::ResourceRcl {
1183                link_id,
1184                inbound_actions,
1185            } => {
1186                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1187                actions.extend(self.handle_resource_rcl(&link_id));
1188            }
1189            LinkDataResult::Error => {}
1190        }
1191
1192        actions
1193    }
1194
1195    /// Handle a request on a link.
1196    fn handle_request(
1197        &mut self,
1198        link_id: &LinkId,
1199        plaintext: &[u8],
1200        request_id: [u8; 16],
1201        rng: &mut dyn Rng,
1202    ) -> Vec<LinkManagerAction> {
1203        use rns_core::msgpack::{self, Value};
1204
1205        // Python-compatible format: msgpack([timestamp, Bin(path_hash), data_value])
1206        let arr = match msgpack::unpack_exact(plaintext) {
1207            Ok(Value::Array(arr)) if arr.len() >= 3 => arr,
1208            _ => return Vec::new(),
1209        };
1210
1211        let path_hash_bytes = match &arr[1] {
1212            Value::Bin(b) if b.len() == 16 => b,
1213            _ => return Vec::new(),
1214        };
1215        let mut path_hash = [0u8; 16];
1216        path_hash.copy_from_slice(path_hash_bytes);
1217
1218        // Re-encode the data element for the handler
1219        let request_data = msgpack::pack(&arr[2]);
1220
1221        // Check if this is a management path (handled by the driver)
1222        if self.management_paths.contains(&path_hash) {
1223            let remote_identity = self
1224                .links
1225                .get(link_id)
1226                .and_then(|l| l.remote_identity)
1227                .map(|(h, k)| (h, k));
1228            return vec![LinkManagerAction::ManagementRequest {
1229                link_id: *link_id,
1230                path_hash,
1231                data: request_data,
1232                request_id,
1233                remote_identity,
1234            }];
1235        }
1236
1237        // Look up handler by path_hash
1238        let handler_idx = self
1239            .request_handlers
1240            .iter()
1241            .position(|h| h.path_hash == path_hash);
1242        let handler_idx = match handler_idx {
1243            Some(i) => i,
1244            None => return Vec::new(),
1245        };
1246
1247        // Check ACL
1248        let remote_identity = self
1249            .links
1250            .get(link_id)
1251            .and_then(|l| l.remote_identity.as_ref());
1252        let handler = &self.request_handlers[handler_idx];
1253        if let Some(ref allowed) = handler.allowed_list {
1254            match remote_identity {
1255                Some((identity_hash, _)) => {
1256                    if !allowed.contains(identity_hash) {
1257                        log::debug!("Request denied: identity not in allowed list");
1258                        return Vec::new();
1259                    }
1260                }
1261                None => {
1262                    log::debug!("Request denied: peer not identified");
1263                    return Vec::new();
1264                }
1265            }
1266        }
1267
1268        // Call handler
1269        let path = handler.path.clone();
1270        let response = (handler.handler)(*link_id, &path, &request_data, remote_identity);
1271
1272        let mut actions = Vec::new();
1273        if let Some(response_data) = response {
1274            let mut response_actions =
1275                self.build_response_packet(link_id, &request_id, &response_data, rng);
1276            if response_actions.is_empty() {
1277                response_actions.extend(self.send_response_resource(
1278                    link_id,
1279                    &request_id,
1280                    &response_data,
1281                    rng,
1282                ));
1283            }
1284            actions.extend(response_actions);
1285        }
1286
1287        actions
1288    }
1289
1290    /// Build a response packet for a request.
1291    /// `response_data` is the msgpack-encoded response value.
1292    fn build_response_packet(
1293        &self,
1294        link_id: &LinkId,
1295        request_id: &[u8; 16],
1296        response_data: &[u8],
1297        rng: &mut dyn Rng,
1298    ) -> Vec<LinkManagerAction> {
1299        use rns_core::msgpack::{self, Value};
1300
1301        let response_value = msgpack::unpack_exact(response_data)
1302            .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1303
1304        let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1305        let response_plaintext = msgpack::pack(&response_array);
1306
1307        let mut actions = Vec::new();
1308        if let Some(link) = self.links.get(link_id) {
1309            if let Ok(encrypted) = link.engine.encrypt(&response_plaintext, rng) {
1310                let flags = PacketFlags {
1311                    header_type: constants::HEADER_1,
1312                    context_flag: constants::FLAG_UNSET,
1313                    transport_type: constants::TRANSPORT_BROADCAST,
1314                    destination_type: constants::DESTINATION_LINK,
1315                    packet_type: constants::PACKET_TYPE_DATA,
1316                };
1317                let max_mtu = link.engine.mtu() as usize;
1318                if let Ok(pkt) = RawPacket::pack_with_max_mtu(
1319                    flags,
1320                    0,
1321                    link_id,
1322                    None,
1323                    constants::CONTEXT_RESPONSE,
1324                    &encrypted,
1325                    max_mtu,
1326                ) {
1327                    actions.push(LinkManagerAction::SendPacket {
1328                        raw: pkt.raw,
1329                        dest_type: constants::DESTINATION_LINK,
1330                        attached_interface: None,
1331                    });
1332                }
1333            }
1334        }
1335        actions
1336    }
1337
1338    fn send_response_resource(
1339        &mut self,
1340        link_id: &LinkId,
1341        request_id: &[u8; 16],
1342        response_data: &[u8],
1343        rng: &mut dyn Rng,
1344    ) -> Vec<LinkManagerAction> {
1345        use rns_core::msgpack::{self, Value};
1346
1347        let link = match self.links.get_mut(link_id) {
1348            Some(l) => l,
1349            None => return Vec::new(),
1350        };
1351
1352        if link.engine.state() != LinkState::Active {
1353            return Vec::new();
1354        }
1355
1356        let link_rtt = link.engine.rtt().unwrap_or(1.0);
1357        let resource_sdu = Self::resource_sdu_for_link(link);
1358        let now = time::now();
1359
1360        let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1361        let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1362            link.engine
1363                .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1364                .unwrap_or_else(|_| plaintext.to_vec())
1365        };
1366
1367        // Match Python resource response format from Link.handle_request:
1368        // packed_response = msgpack([request_id, response_value])
1369        // where response_value is decoded msgpack value, or Bin(raw bytes).
1370        let response_value = msgpack::unpack_exact(response_data)
1371            .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1372        let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1373        let resource_payload = msgpack::pack(&response_array);
1374
1375        let sender = match ResourceSender::new(
1376            &resource_payload,
1377            None,
1378            resource_sdu,
1379            &encrypt_fn,
1380            &Bzip2Compressor,
1381            rng,
1382            now,
1383            true, // auto_compress
1384            true, // is_response
1385            Some(request_id.to_vec()),
1386            1,    // segment_index
1387            1,    // total_segments
1388            None, // original_hash
1389            link_rtt,
1390            6.0, // traffic_timeout_factor
1391        ) {
1392            Ok(s) => s,
1393            Err(e) => {
1394                log::debug!("Failed to create response ResourceSender: {}", e);
1395                return Vec::new();
1396            }
1397        };
1398
1399        let mut sender = sender;
1400        let adv_actions = sender.advertise(now);
1401        link.outgoing_resources.push(sender);
1402
1403        let _ = link;
1404        self.process_resource_actions(link_id, adv_actions, rng)
1405    }
1406
1407    /// Send a management response on a link.
1408    /// Called by the driver after building the response for a ManagementRequest.
1409    pub fn send_management_response(
1410        &mut self,
1411        link_id: &LinkId,
1412        request_id: &[u8; 16],
1413        response_data: &[u8],
1414        rng: &mut dyn Rng,
1415    ) -> Vec<LinkManagerAction> {
1416        let mut actions = self.build_response_packet(link_id, request_id, response_data, rng);
1417        if actions.is_empty() {
1418            actions.extend(self.send_response_resource(
1419                link_id,
1420                request_id,
1421                response_data,
1422                rng,
1423            ));
1424        }
1425        actions
1426    }
1427
1428    /// Send a request on a link.
1429    ///
1430    /// `data` is the msgpack-encoded request data value (e.g. msgpack([True]) for /status).
1431    ///
1432    /// Uses Python-compatible format: plaintext = msgpack([timestamp, path_hash_bytes, data_value]).
1433    /// Returns actions (the encrypted request packet). The response will arrive
1434    /// later via handle_local_delivery with CONTEXT_RESPONSE.
1435    pub fn send_request(
1436        &self,
1437        link_id: &LinkId,
1438        path: &str,
1439        data: &[u8],
1440        rng: &mut dyn Rng,
1441    ) -> Vec<LinkManagerAction> {
1442        use rns_core::msgpack::{self, Value};
1443
1444        let link = match self.links.get(link_id) {
1445            Some(l) => l,
1446            None => return Vec::new(),
1447        };
1448
1449        if link.engine.state() != LinkState::Active {
1450            return Vec::new();
1451        }
1452
1453        let path_hash = compute_path_hash(path);
1454
1455        // Decode data bytes to msgpack Value (or use Bin if can't decode)
1456        let data_value = msgpack::unpack_exact(data).unwrap_or_else(|_| Value::Bin(data.to_vec()));
1457
1458        // Python-compatible format: msgpack([timestamp, Bin(path_hash), data_value])
1459        let request_array = Value::Array(vec![
1460            Value::Float(time::now()),
1461            Value::Bin(path_hash.to_vec()),
1462            data_value,
1463        ]);
1464        let plaintext = msgpack::pack(&request_array);
1465
1466        let encrypted = match link.engine.encrypt(&plaintext, rng) {
1467            Ok(e) => e,
1468            Err(_) => return Vec::new(),
1469        };
1470
1471        let flags = PacketFlags {
1472            header_type: constants::HEADER_1,
1473            context_flag: constants::FLAG_UNSET,
1474            transport_type: constants::TRANSPORT_BROADCAST,
1475            destination_type: constants::DESTINATION_LINK,
1476            packet_type: constants::PACKET_TYPE_DATA,
1477        };
1478
1479        let mut actions = Vec::new();
1480        if let Ok(pkt) = RawPacket::pack(
1481            flags,
1482            0,
1483            link_id,
1484            None,
1485            constants::CONTEXT_REQUEST,
1486            &encrypted,
1487        ) {
1488            actions.push(LinkManagerAction::SendPacket {
1489                raw: pkt.raw,
1490                dest_type: constants::DESTINATION_LINK,
1491                attached_interface: None,
1492            });
1493        }
1494        actions
1495    }
1496
1497    /// Send encrypted data on a link with a given context.
1498    pub fn send_on_link(
1499        &self,
1500        link_id: &LinkId,
1501        plaintext: &[u8],
1502        context: u8,
1503        rng: &mut dyn Rng,
1504    ) -> Vec<LinkManagerAction> {
1505        let link = match self.links.get(link_id) {
1506            Some(l) => l,
1507            None => return Vec::new(),
1508        };
1509
1510        if link.engine.state() != LinkState::Active {
1511            return Vec::new();
1512        }
1513
1514        let encrypted = match link.engine.encrypt(plaintext, rng) {
1515            Ok(e) => e,
1516            Err(_) => return Vec::new(),
1517        };
1518
1519        let flags = PacketFlags {
1520            header_type: constants::HEADER_1,
1521            context_flag: constants::FLAG_UNSET,
1522            transport_type: constants::TRANSPORT_BROADCAST,
1523            destination_type: constants::DESTINATION_LINK,
1524            packet_type: constants::PACKET_TYPE_DATA,
1525        };
1526
1527        let mut actions = Vec::new();
1528        if let Ok(pkt) = RawPacket::pack(flags, 0, link_id, None, context, &encrypted) {
1529            actions.push(LinkManagerAction::SendPacket {
1530                raw: pkt.raw,
1531                dest_type: constants::DESTINATION_LINK,
1532                attached_interface: None,
1533            });
1534        }
1535        actions
1536    }
1537
1538    /// Send an identify message on a link (initiator reveals identity to responder).
1539    pub fn identify(
1540        &self,
1541        link_id: &LinkId,
1542        identity: &rns_crypto::identity::Identity,
1543        rng: &mut dyn Rng,
1544    ) -> Vec<LinkManagerAction> {
1545        let link = match self.links.get(link_id) {
1546            Some(l) => l,
1547            None => return Vec::new(),
1548        };
1549
1550        let encrypted = match link.engine.build_identify(identity, rng) {
1551            Ok(e) => e,
1552            Err(_) => return Vec::new(),
1553        };
1554
1555        let flags = PacketFlags {
1556            header_type: constants::HEADER_1,
1557            context_flag: constants::FLAG_UNSET,
1558            transport_type: constants::TRANSPORT_BROADCAST,
1559            destination_type: constants::DESTINATION_LINK,
1560            packet_type: constants::PACKET_TYPE_DATA,
1561        };
1562
1563        let mut actions = Vec::new();
1564        if let Ok(pkt) = RawPacket::pack(
1565            flags,
1566            0,
1567            link_id,
1568            None,
1569            constants::CONTEXT_LINKIDENTIFY,
1570            &encrypted,
1571        ) {
1572            actions.push(LinkManagerAction::SendPacket {
1573                raw: pkt.raw,
1574                dest_type: constants::DESTINATION_LINK,
1575                attached_interface: None,
1576            });
1577        }
1578        actions
1579    }
1580
1581    /// Tear down a link.
1582    pub fn teardown_link(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1583        let link = match self.links.get_mut(link_id) {
1584            Some(l) => l,
1585            None => return Vec::new(),
1586        };
1587
1588        let teardown_actions = link.engine.teardown();
1589        if let Some(ref mut channel) = link.channel {
1590            channel.shutdown();
1591        }
1592
1593        let mut actions = self.process_link_actions(link_id, &teardown_actions);
1594
1595        // Send LINKCLOSE packet
1596        let flags = PacketFlags {
1597            header_type: constants::HEADER_1,
1598            context_flag: constants::FLAG_UNSET,
1599            transport_type: constants::TRANSPORT_BROADCAST,
1600            destination_type: constants::DESTINATION_LINK,
1601            packet_type: constants::PACKET_TYPE_DATA,
1602        };
1603        if let Ok(pkt) = RawPacket::pack(flags, 0, link_id, None, constants::CONTEXT_LINKCLOSE, &[])
1604        {
1605            actions.push(LinkManagerAction::SendPacket {
1606                raw: pkt.raw,
1607                dest_type: constants::DESTINATION_LINK,
1608                attached_interface: None,
1609            });
1610        }
1611
1612        actions
1613    }
1614
1615    /// Tear down all managed links.
1616    pub fn teardown_all_links(&mut self) -> Vec<LinkManagerAction> {
1617        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
1618        let mut actions = Vec::new();
1619        for link_id in link_ids {
1620            actions.extend(self.teardown_link(&link_id));
1621        }
1622        actions
1623    }
1624
1625    /// Handle a response on a link.
1626    fn handle_response(&self, link_id: &LinkId, plaintext: &[u8]) -> Vec<LinkManagerAction> {
1627        use rns_core::msgpack;
1628
1629        // Python-compatible response: msgpack([Bin(request_id), response_value])
1630        let arr = match msgpack::unpack_exact(plaintext) {
1631            Ok(msgpack::Value::Array(arr)) if arr.len() >= 2 => arr,
1632            _ => return Vec::new(),
1633        };
1634
1635        let request_id_bytes = match &arr[0] {
1636            msgpack::Value::Bin(b) if b.len() == 16 => b,
1637            _ => return Vec::new(),
1638        };
1639        let mut request_id = [0u8; 16];
1640        request_id.copy_from_slice(request_id_bytes);
1641
1642        let response_data = msgpack::pack(&arr[1]);
1643
1644        vec![LinkManagerAction::ResponseReceived {
1645            link_id: *link_id,
1646            request_id,
1647            data: response_data,
1648        }]
1649    }
1650
1651    /// Handle resource advertisement (CONTEXT_RESOURCE_ADV).
1652    fn handle_resource_adv(
1653        &mut self,
1654        link_id: &LinkId,
1655        adv_plaintext: &[u8],
1656        rng: &mut dyn Rng,
1657    ) -> Vec<LinkManagerAction> {
1658        let link = match self.links.get_mut(link_id) {
1659            Some(l) => l,
1660            None => return Vec::new(),
1661        };
1662
1663        let link_rtt = link.engine.rtt().unwrap_or(1.0);
1664        let resource_sdu = Self::resource_sdu_for_link(link);
1665        let now = time::now();
1666
1667        let receiver = match ResourceReceiver::from_advertisement(
1668            adv_plaintext,
1669            resource_sdu,
1670            link_rtt,
1671            now,
1672            None,
1673            None,
1674        ) {
1675            Ok(r) => r,
1676            Err(e) => {
1677                log::debug!("Resource ADV rejected: {}", e);
1678                return Vec::new();
1679            }
1680        };
1681
1682        let strategy = link.resource_strategy;
1683        let resource_hash = receiver.resource_hash.clone();
1684        let transfer_size = receiver.transfer_size;
1685        let has_metadata = receiver.has_metadata;
1686        let is_response = receiver.flags.is_response;
1687
1688        if is_response {
1689            // Response resources bypass the application acceptance strategy —
1690            // they are answers to pending requests, not independent resources.
1691            link.incoming_resources.push(receiver);
1692            let idx = link.incoming_resources.len() - 1;
1693            let resource_actions = link.incoming_resources[idx].accept(now);
1694            let _ = link;
1695            return self.process_resource_actions(link_id, resource_actions, rng);
1696        }
1697
1698        match strategy {
1699            ResourceStrategy::AcceptNone => {
1700                // Reject: send RCL
1701                let reject_actions = {
1702                    let mut r = receiver;
1703                    r.reject()
1704                };
1705                self.process_resource_actions(link_id, reject_actions, rng)
1706            }
1707            ResourceStrategy::AcceptAll => {
1708                link.incoming_resources.push(receiver);
1709                let idx = link.incoming_resources.len() - 1;
1710                let resource_actions = link.incoming_resources[idx].accept(now);
1711                let _ = link;
1712                self.process_resource_actions(link_id, resource_actions, rng)
1713            }
1714            ResourceStrategy::AcceptApp => {
1715                link.incoming_resources.push(receiver);
1716                // Query application callback
1717                vec![LinkManagerAction::ResourceAcceptQuery {
1718                    link_id: *link_id,
1719                    resource_hash,
1720                    transfer_size,
1721                    has_metadata,
1722                }]
1723            }
1724        }
1725    }
1726
1727    /// Accept or reject a pending resource (for AcceptApp strategy).
1728    pub fn accept_resource(
1729        &mut self,
1730        link_id: &LinkId,
1731        resource_hash: &[u8],
1732        accept: bool,
1733        rng: &mut dyn Rng,
1734    ) -> Vec<LinkManagerAction> {
1735        let link = match self.links.get_mut(link_id) {
1736            Some(l) => l,
1737            None => return Vec::new(),
1738        };
1739
1740        let now = time::now();
1741        let idx = link
1742            .incoming_resources
1743            .iter()
1744            .position(|r| r.resource_hash == resource_hash);
1745        let idx = match idx {
1746            Some(i) => i,
1747            None => return Vec::new(),
1748        };
1749
1750        let resource_actions = if accept {
1751            link.incoming_resources[idx].accept(now)
1752        } else {
1753            link.incoming_resources[idx].reject()
1754        };
1755
1756        let _ = link;
1757        self.process_resource_actions(link_id, resource_actions, rng)
1758    }
1759
1760    /// Handle resource request (CONTEXT_RESOURCE_REQ) — feed to sender.
1761    fn handle_resource_req(
1762        &mut self,
1763        link_id: &LinkId,
1764        plaintext: &[u8],
1765        rng: &mut dyn Rng,
1766    ) -> Vec<LinkManagerAction> {
1767        let link = match self.links.get_mut(link_id) {
1768            Some(l) => l,
1769            None => return Vec::new(),
1770        };
1771
1772        let now = time::now();
1773        let mut all_actions = Vec::new();
1774        for sender in &mut link.outgoing_resources {
1775            let resource_actions = sender.handle_request(plaintext, now);
1776            if !resource_actions.is_empty() {
1777                all_actions.extend(resource_actions);
1778                break;
1779            }
1780        }
1781
1782        let _ = link;
1783        self.process_resource_actions(link_id, all_actions, rng)
1784    }
1785
1786    /// Handle resource HMU (CONTEXT_RESOURCE_HMU) — feed to receiver.
1787    fn handle_resource_hmu(
1788        &mut self,
1789        link_id: &LinkId,
1790        plaintext: &[u8],
1791        rng: &mut dyn Rng,
1792    ) -> Vec<LinkManagerAction> {
1793        let link = match self.links.get_mut(link_id) {
1794            Some(l) => l,
1795            None => return Vec::new(),
1796        };
1797
1798        let now = time::now();
1799        let mut all_actions = Vec::new();
1800        for receiver in &mut link.incoming_resources {
1801            let resource_actions = receiver.handle_hashmap_update(plaintext, now);
1802            if !resource_actions.is_empty() {
1803                all_actions.extend(resource_actions);
1804                break;
1805            }
1806        }
1807
1808        let _ = link;
1809        self.process_resource_actions(link_id, all_actions, rng)
1810    }
1811
1812    /// Handle resource part (CONTEXT_RESOURCE) — feed raw to receiver.
1813    fn handle_resource_part(
1814        &mut self,
1815        link_id: &LinkId,
1816        raw_data: &[u8],
1817        rng: &mut dyn Rng,
1818    ) -> Vec<LinkManagerAction> {
1819        let link = match self.links.get_mut(link_id) {
1820            Some(l) => l,
1821            None => return Vec::new(),
1822        };
1823
1824        let now = time::now();
1825        let mut all_actions = Vec::new();
1826        let mut assemble_idx = None;
1827        let mut assembled_is_response = false;
1828
1829        for (idx, receiver) in link.incoming_resources.iter_mut().enumerate() {
1830            let resource_actions = receiver.receive_part(raw_data, now);
1831            if !resource_actions.is_empty() {
1832                if receiver.received_count == receiver.total_parts {
1833                    assemble_idx = Some(idx);
1834                }
1835                all_actions.extend(resource_actions);
1836                break;
1837            }
1838        }
1839
1840        if let Some(idx) = assemble_idx {
1841            let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
1842                link.engine.decrypt(ciphertext).map_err(|_| ())
1843            };
1844            let assemble_actions =
1845                link.incoming_resources[idx].assemble(&decrypt_fn, &Bzip2Compressor);
1846            assembled_is_response = link.incoming_resources[idx].flags.is_response;
1847            all_actions.extend(assemble_actions);
1848        }
1849
1850        let _ = link;
1851        let mut out = self.process_resource_actions(link_id, all_actions, rng);
1852
1853        if assembled_is_response {
1854            let mut converted = Vec::new();
1855            for action in out {
1856                match action {
1857                    LinkManagerAction::ResourceReceived { data, .. } => {
1858                        converted.extend(self.handle_response(link_id, &data));
1859                    }
1860                    LinkManagerAction::ResourceAcceptQuery { .. } => {
1861                        // Response resources bypass application acceptance
1862                    }
1863                    other => converted.push(other),
1864                }
1865            }
1866            out = converted;
1867        }
1868
1869        out
1870    }
1871
1872    /// Handle resource proof (CONTEXT_RESOURCE_PRF) — feed to sender.
1873    fn handle_resource_prf(
1874        &mut self,
1875        link_id: &LinkId,
1876        plaintext: &[u8],
1877    ) -> Vec<LinkManagerAction> {
1878        let link = match self.links.get_mut(link_id) {
1879            Some(l) => l,
1880            None => return Vec::new(),
1881        };
1882
1883        let now = time::now();
1884        let mut result_actions = Vec::new();
1885        for sender in &mut link.outgoing_resources {
1886            let resource_actions = sender.handle_proof(plaintext, now);
1887            if !resource_actions.is_empty() {
1888                result_actions.extend(resource_actions);
1889                break;
1890            }
1891        }
1892
1893        // Convert to LinkManagerActions
1894        let mut actions = Vec::new();
1895        for ra in result_actions {
1896            match ra {
1897                ResourceAction::Completed => {
1898                    actions.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
1899                }
1900                ResourceAction::Failed(e) => {
1901                    actions.push(LinkManagerAction::ResourceFailed {
1902                        link_id: *link_id,
1903                        error: format!("{}", e),
1904                    });
1905                }
1906                _ => {}
1907            }
1908        }
1909
1910        // Clean up completed/failed senders
1911        link.outgoing_resources
1912            .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
1913
1914        actions
1915    }
1916
1917    /// Handle cancel from initiator (CONTEXT_RESOURCE_ICL).
1918    fn handle_resource_icl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1919        let link = match self.links.get_mut(link_id) {
1920            Some(l) => l,
1921            None => return Vec::new(),
1922        };
1923
1924        let mut actions = Vec::new();
1925        for receiver in &mut link.incoming_resources {
1926            let ra = receiver.handle_cancel();
1927            for a in ra {
1928                if let ResourceAction::Failed(ref e) = a {
1929                    actions.push(LinkManagerAction::ResourceFailed {
1930                        link_id: *link_id,
1931                        error: format!("{}", e),
1932                    });
1933                }
1934            }
1935        }
1936        link.incoming_resources
1937            .retain(|r| r.status < rns_core::resource::ResourceStatus::Complete);
1938        actions
1939    }
1940
1941    /// Handle cancel from receiver (CONTEXT_RESOURCE_RCL).
1942    fn handle_resource_rcl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1943        let link = match self.links.get_mut(link_id) {
1944            Some(l) => l,
1945            None => return Vec::new(),
1946        };
1947
1948        let mut actions = Vec::new();
1949        for sender in &mut link.outgoing_resources {
1950            let ra = sender.handle_reject();
1951            for a in ra {
1952                if let ResourceAction::Failed(ref e) = a {
1953                    actions.push(LinkManagerAction::ResourceFailed {
1954                        link_id: *link_id,
1955                        error: format!("{}", e),
1956                    });
1957                }
1958            }
1959        }
1960        link.outgoing_resources
1961            .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
1962        actions
1963    }
1964
1965    /// Convert ResourceActions to LinkManagerActions.
1966    fn process_resource_actions(
1967        &self,
1968        link_id: &LinkId,
1969        actions: Vec<ResourceAction>,
1970        rng: &mut dyn Rng,
1971    ) -> Vec<LinkManagerAction> {
1972        let link = match self.links.get(link_id) {
1973            Some(l) => l,
1974            None => return Vec::new(),
1975        };
1976
1977        let mut result = Vec::new();
1978        for action in actions {
1979            match action {
1980                ResourceAction::SendAdvertisement(data) => {
1981                    // Link-encrypt and send as CONTEXT_RESOURCE_ADV
1982                    if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
1983                        result.extend(self.build_link_packet(
1984                            link_id,
1985                            constants::CONTEXT_RESOURCE_ADV,
1986                            &encrypted,
1987                        ));
1988                    }
1989                }
1990                ResourceAction::SendPart(data) => {
1991                    // Parts are NOT link-encrypted — send raw as CONTEXT_RESOURCE
1992                    result.extend(self.build_link_packet(
1993                        link_id,
1994                        constants::CONTEXT_RESOURCE,
1995                        &data,
1996                    ));
1997                }
1998                ResourceAction::SendRequest(data) => {
1999                    if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
2000                        result.extend(self.build_link_packet(
2001                            link_id,
2002                            constants::CONTEXT_RESOURCE_REQ,
2003                            &encrypted,
2004                        ));
2005                    }
2006                }
2007                ResourceAction::SendHmu(data) => {
2008                    if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
2009                        result.extend(self.build_link_packet(
2010                            link_id,
2011                            constants::CONTEXT_RESOURCE_HMU,
2012                            &encrypted,
2013                        ));
2014                    }
2015                }
2016                ResourceAction::SendProof(data) => {
2017                    if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
2018                        result.extend(self.build_link_packet(
2019                            link_id,
2020                            constants::CONTEXT_RESOURCE_PRF,
2021                            &encrypted,
2022                        ));
2023                    }
2024                }
2025                ResourceAction::SendCancelInitiator(data) => {
2026                    if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
2027                        result.extend(self.build_link_packet(
2028                            link_id,
2029                            constants::CONTEXT_RESOURCE_ICL,
2030                            &encrypted,
2031                        ));
2032                    }
2033                }
2034                ResourceAction::SendCancelReceiver(data) => {
2035                    if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
2036                        result.extend(self.build_link_packet(
2037                            link_id,
2038                            constants::CONTEXT_RESOURCE_RCL,
2039                            &encrypted,
2040                        ));
2041                    }
2042                }
2043                ResourceAction::DataReceived { data, metadata } => {
2044                    result.push(LinkManagerAction::ResourceReceived {
2045                        link_id: *link_id,
2046                        data,
2047                        metadata,
2048                    });
2049                }
2050                ResourceAction::Completed => {
2051                    result.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2052                }
2053                ResourceAction::Failed(e) => {
2054                    result.push(LinkManagerAction::ResourceFailed {
2055                        link_id: *link_id,
2056                        error: format!("{}", e),
2057                    });
2058                }
2059                ResourceAction::ProgressUpdate { received, total } => {
2060                    result.push(LinkManagerAction::ResourceProgress {
2061                        link_id: *link_id,
2062                        received,
2063                        total,
2064                    });
2065                }
2066            }
2067        }
2068        result
2069    }
2070
2071    /// Build a link DATA packet with a given context and data.
2072    fn build_link_packet(
2073        &self,
2074        link_id: &LinkId,
2075        context: u8,
2076        data: &[u8],
2077    ) -> Vec<LinkManagerAction> {
2078        let flags = PacketFlags {
2079            header_type: constants::HEADER_1,
2080            context_flag: constants::FLAG_UNSET,
2081            transport_type: constants::TRANSPORT_BROADCAST,
2082            destination_type: constants::DESTINATION_LINK,
2083            packet_type: constants::PACKET_TYPE_DATA,
2084        };
2085        let mut actions = Vec::new();
2086        let max_mtu = self
2087            .links
2088            .get(link_id)
2089            .map(|l| l.engine.mtu() as usize)
2090            .unwrap_or(constants::MTU);
2091        if let Ok(pkt) = RawPacket::pack_with_max_mtu(flags, 0, link_id, None, context, data, max_mtu)
2092        {
2093            actions.push(LinkManagerAction::SendPacket {
2094                raw: pkt.raw,
2095                dest_type: constants::DESTINATION_LINK,
2096                attached_interface: None,
2097            });
2098        }
2099        actions
2100    }
2101
2102    /// Start sending a resource on a link.
2103    pub fn send_resource(
2104        &mut self,
2105        link_id: &LinkId,
2106        data: &[u8],
2107        metadata: Option<&[u8]>,
2108        rng: &mut dyn Rng,
2109    ) -> Vec<LinkManagerAction> {
2110        let link = match self.links.get_mut(link_id) {
2111            Some(l) => l,
2112            None => return Vec::new(),
2113        };
2114
2115        if link.engine.state() != LinkState::Active {
2116            return Vec::new();
2117        }
2118
2119        let link_rtt = link.engine.rtt().unwrap_or(1.0);
2120        let resource_sdu = Self::resource_sdu_for_link(link);
2121        let now = time::now();
2122
2123        // Use RefCell for interior mutability since ResourceSender::new expects &dyn Fn (not FnMut)
2124        // but link.engine.encrypt needs &mut dyn Rng
2125        let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
2126        let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
2127            link.engine
2128                .encrypt(plaintext, &mut *enc_rng.borrow_mut())
2129                .unwrap_or_else(|_| plaintext.to_vec())
2130        };
2131
2132        let sender = match ResourceSender::new(
2133            data,
2134            metadata,
2135            resource_sdu,
2136            &encrypt_fn,
2137            &Bzip2Compressor,
2138            rng,
2139            now,
2140            true,  // auto_compress
2141            false, // is_response
2142            None,  // request_id
2143            1,     // segment_index
2144            1,     // total_segments
2145            None,  // original_hash
2146            link_rtt,
2147            6.0, // traffic_timeout_factor
2148        ) {
2149            Ok(s) => s,
2150            Err(e) => {
2151                log::debug!("Failed to create ResourceSender: {}", e);
2152                return Vec::new();
2153            }
2154        };
2155
2156        let mut sender = sender;
2157        let adv_actions = sender.advertise(now);
2158        link.outgoing_resources.push(sender);
2159
2160        let _ = link;
2161        self.process_resource_actions(link_id, adv_actions, rng)
2162    }
2163
2164    /// Set the resource acceptance strategy for a link.
2165    pub fn set_resource_strategy(&mut self, link_id: &LinkId, strategy: ResourceStrategy) {
2166        if let Some(link) = self.links.get_mut(link_id) {
2167            link.resource_strategy = strategy;
2168        }
2169    }
2170
2171    /// Flush the channel TX ring for a link, clearing outstanding messages.
2172    /// Called after holepunch completion where signaling messages are fire-and-forget.
2173    pub fn flush_channel_tx(&mut self, link_id: &LinkId) {
2174        if let Some(link) = self.links.get_mut(link_id) {
2175            if let Some(ref mut channel) = link.channel {
2176                channel.flush_tx();
2177            }
2178        }
2179    }
2180
2181    /// Send a channel message on a link.
2182    pub fn send_channel_message(
2183        &mut self,
2184        link_id: &LinkId,
2185        msgtype: u16,
2186        payload: &[u8],
2187        rng: &mut dyn Rng,
2188    ) -> Result<Vec<LinkManagerAction>, String> {
2189        let link = match self.links.get_mut(link_id) {
2190            Some(l) => l,
2191            None => return Err("unknown link".to_string()),
2192        };
2193
2194        let channel = match link.channel {
2195            Some(ref mut ch) => ch,
2196            None => return Err("link has no active channel".to_string()),
2197        };
2198
2199        let link_mdu = link.engine.mdu();
2200        let now = time::now();
2201        let chan_actions = match channel.send(msgtype, payload, now, link_mdu) {
2202            Ok(a) => {
2203                link.channel_send_ok += 1;
2204                a
2205            }
2206            Err(e) => {
2207                log::debug!("Channel send failed: {:?}", e);
2208                match e {
2209                    rns_core::channel::ChannelError::NotReady => link.channel_send_not_ready += 1,
2210                    rns_core::channel::ChannelError::MessageTooBig => {
2211                        link.channel_send_too_big += 1;
2212                    }
2213                    rns_core::channel::ChannelError::InvalidEnvelope => {
2214                        link.channel_send_other_error += 1;
2215                    }
2216                }
2217                return Err(e.to_string());
2218            }
2219        };
2220
2221        let _ = link;
2222        Ok(self.process_channel_actions(link_id, chan_actions, rng))
2223    }
2224
2225    /// Periodic tick: check keepalive, stale, timeouts for all links.
2226    pub fn tick(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2227        let now = time::now();
2228        let mut all_actions = Vec::new();
2229
2230        // Collect link_ids to avoid borrow issues
2231        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2232
2233        for link_id in &link_ids {
2234            let link = match self.links.get_mut(link_id) {
2235                Some(l) => l,
2236                None => continue,
2237            };
2238
2239            // Tick the engine
2240            let tick_actions = link.engine.tick(now);
2241            all_actions.extend(self.process_link_actions(link_id, &tick_actions));
2242
2243            // Check if keepalive is needed
2244            let link = match self.links.get_mut(link_id) {
2245                Some(l) => l,
2246                None => continue,
2247            };
2248            if link.engine.needs_keepalive(now) {
2249                // Send keepalive packet (empty data with CONTEXT_KEEPALIVE)
2250                let flags = PacketFlags {
2251                    header_type: constants::HEADER_1,
2252                    context_flag: constants::FLAG_UNSET,
2253                    transport_type: constants::TRANSPORT_BROADCAST,
2254                    destination_type: constants::DESTINATION_LINK,
2255                    packet_type: constants::PACKET_TYPE_DATA,
2256                };
2257                if let Ok(pkt) =
2258                    RawPacket::pack(flags, 0, link_id, None, constants::CONTEXT_KEEPALIVE, &[])
2259                {
2260                    all_actions.push(LinkManagerAction::SendPacket {
2261                        raw: pkt.raw,
2262                        dest_type: constants::DESTINATION_LINK,
2263                        attached_interface: None,
2264                    });
2265                    link.engine.record_outbound(now, true);
2266                }
2267            }
2268
2269            if let Some(channel) = link.channel.as_mut() {
2270                let chan_actions = channel.tick(now);
2271                let _ = channel;
2272                let _ = link;
2273                all_actions.extend(self.process_channel_actions(link_id, chan_actions, rng));
2274            }
2275        }
2276
2277        // Tick resource senders and receivers
2278        for link_id in &link_ids {
2279            let link = match self.links.get_mut(link_id) {
2280                Some(l) => l,
2281                None => continue,
2282            };
2283
2284            // Tick outgoing resources (senders)
2285            let mut sender_actions = Vec::new();
2286            for sender in &mut link.outgoing_resources {
2287                sender_actions.extend(sender.tick(now));
2288            }
2289
2290            // Tick incoming resources (receivers)
2291            let mut receiver_actions = Vec::new();
2292            for receiver in &mut link.incoming_resources {
2293                let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2294                    link.engine.decrypt(ciphertext).map_err(|_| ())
2295                };
2296                receiver_actions.extend(receiver.tick(now, &decrypt_fn, &Bzip2Compressor));
2297            }
2298
2299            // Clean up completed/failed resources
2300            link.outgoing_resources
2301                .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2302            link.incoming_resources
2303                .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2304
2305            let _ = link;
2306            all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2307            all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2308        }
2309
2310        // Clean up closed links
2311        let closed: Vec<LinkId> = self
2312            .links
2313            .iter()
2314            .filter(|(_, l)| l.engine.state() == LinkState::Closed)
2315            .map(|(id, _)| *id)
2316            .collect();
2317        for id in closed {
2318            self.links.remove(&id);
2319            all_actions.push(LinkManagerAction::DeregisterLinkDest { link_id: id });
2320        }
2321
2322        all_actions
2323    }
2324
2325    /// Check if a destination hash is a known link_id managed by this manager.
2326    pub fn is_link_destination(&self, dest_hash: &[u8; 16]) -> bool {
2327        self.links.contains_key(dest_hash) || self.link_destinations.contains_key(dest_hash)
2328    }
2329
2330    /// Get the state of a link.
2331    pub fn link_state(&self, link_id: &LinkId) -> Option<LinkState> {
2332        self.links.get(link_id).map(|l| l.engine.state())
2333    }
2334
2335    /// Get the RTT of a link.
2336    pub fn link_rtt(&self, link_id: &LinkId) -> Option<f64> {
2337        self.links.get(link_id).and_then(|l| l.engine.rtt())
2338    }
2339
2340    /// Update the RTT of a link (e.g., after path redirect to a direct connection).
2341    pub fn set_link_rtt(&mut self, link_id: &LinkId, rtt: f64) {
2342        if let Some(link) = self.links.get_mut(link_id) {
2343            link.engine.set_rtt(rtt);
2344        }
2345    }
2346
2347    /// Reset the inbound timer for a link (e.g., after path redirect).
2348    pub fn record_link_inbound(&mut self, link_id: &LinkId) {
2349        if let Some(link) = self.links.get_mut(link_id) {
2350            link.engine.record_inbound(time::now());
2351        }
2352    }
2353
2354    /// Update the MTU of a link (e.g., after path redirect to a different interface).
2355    pub fn set_link_mtu(&mut self, link_id: &LinkId, mtu: u32) {
2356        if let Some(link) = self.links.get_mut(link_id) {
2357            link.engine.set_mtu(mtu);
2358        }
2359    }
2360
2361    /// Get the number of active links.
2362    pub fn link_count(&self) -> usize {
2363        self.links.len()
2364    }
2365
2366    /// Get the number of active resource transfers across all links.
2367    pub fn resource_transfer_count(&self) -> usize {
2368        self.links
2369            .values()
2370            .map(|managed| managed.incoming_resources.len() + managed.outgoing_resources.len())
2371            .sum()
2372    }
2373
2374    /// Cancel all active resource transfers and return the generated actions.
2375    pub fn cancel_all_resources(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2376        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2377        let mut all_actions = Vec::new();
2378
2379        for link_id in &link_ids {
2380            let link = match self.links.get_mut(link_id) {
2381                Some(l) => l,
2382                None => continue,
2383            };
2384
2385            let mut sender_actions = Vec::new();
2386            for sender in &mut link.outgoing_resources {
2387                sender_actions.extend(sender.cancel());
2388            }
2389
2390            let mut receiver_actions = Vec::new();
2391            for receiver in &mut link.incoming_resources {
2392                receiver_actions.extend(receiver.reject());
2393            }
2394
2395            link.outgoing_resources
2396                .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2397            link.incoming_resources
2398                .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2399
2400            let _ = link;
2401            all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2402            all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2403        }
2404
2405        all_actions
2406    }
2407
2408    /// Get information about all active links.
2409    pub fn link_entries(&self) -> Vec<crate::event::LinkInfoEntry> {
2410        self.links
2411            .iter()
2412            .map(|(link_id, managed)| {
2413                let state = match managed.engine.state() {
2414                    LinkState::Pending => "pending",
2415                    LinkState::Handshake => "handshake",
2416                    LinkState::Active => "active",
2417                    LinkState::Stale => "stale",
2418                    LinkState::Closed => "closed",
2419                };
2420                crate::event::LinkInfoEntry {
2421                    link_id: *link_id,
2422                    state: state.to_string(),
2423                    is_initiator: managed.engine.is_initiator(),
2424                    dest_hash: managed.dest_hash,
2425                    remote_identity: managed.remote_identity.as_ref().map(|(h, _)| *h),
2426                    rtt: managed.engine.rtt(),
2427                    channel_window: managed.channel.as_ref().map(|c| c.window()),
2428                    channel_outstanding: managed.channel.as_ref().map(|c| c.outstanding()),
2429                    pending_channel_packets: managed.pending_channel_packets.len(),
2430                    channel_send_ok: managed.channel_send_ok,
2431                    channel_send_not_ready: managed.channel_send_not_ready,
2432                    channel_send_too_big: managed.channel_send_too_big,
2433                    channel_send_other_error: managed.channel_send_other_error,
2434                    channel_messages_received: managed.channel_messages_received,
2435                    channel_proofs_sent: managed.channel_proofs_sent,
2436                    channel_proofs_received: managed.channel_proofs_received,
2437                }
2438            })
2439            .collect()
2440    }
2441
2442    /// Get information about all active resource transfers.
2443    pub fn resource_entries(&self) -> Vec<crate::event::ResourceInfoEntry> {
2444        let mut entries = Vec::new();
2445        for (link_id, managed) in &self.links {
2446            for recv in &managed.incoming_resources {
2447                let (received, total) = recv.progress();
2448                entries.push(crate::event::ResourceInfoEntry {
2449                    link_id: *link_id,
2450                    direction: "incoming".to_string(),
2451                    total_parts: total,
2452                    transferred_parts: received,
2453                    complete: received >= total && total > 0,
2454                });
2455            }
2456            for send in &managed.outgoing_resources {
2457                let total = send.total_parts();
2458                let sent = send.sent_parts;
2459                entries.push(crate::event::ResourceInfoEntry {
2460                    link_id: *link_id,
2461                    direction: "outgoing".to_string(),
2462                    total_parts: total,
2463                    transferred_parts: sent,
2464                    complete: sent >= total && total > 0,
2465                });
2466            }
2467        }
2468        entries
2469    }
2470
2471    /// Convert LinkActions to LinkManagerActions.
2472    fn process_link_actions(
2473        &self,
2474        link_id: &LinkId,
2475        actions: &[LinkAction],
2476    ) -> Vec<LinkManagerAction> {
2477        let mut result = Vec::new();
2478        for action in actions {
2479            match action {
2480                LinkAction::StateChanged {
2481                    new_state, reason, ..
2482                } => match new_state {
2483                    LinkState::Closed => {
2484                        result.push(LinkManagerAction::LinkClosed {
2485                            link_id: *link_id,
2486                            reason: *reason,
2487                        });
2488                    }
2489                    _ => {}
2490                },
2491                LinkAction::LinkEstablished {
2492                    rtt, is_initiator, ..
2493                } => {
2494                    let dest_hash = self
2495                        .links
2496                        .get(link_id)
2497                        .map(|l| l.dest_hash)
2498                        .unwrap_or([0u8; 16]);
2499                    result.push(LinkManagerAction::LinkEstablished {
2500                        link_id: *link_id,
2501                        dest_hash,
2502                        rtt: *rtt,
2503                        is_initiator: *is_initiator,
2504                    });
2505                }
2506                LinkAction::RemoteIdentified {
2507                    identity_hash,
2508                    public_key,
2509                    ..
2510                } => {
2511                    result.push(LinkManagerAction::RemoteIdentified {
2512                        link_id: *link_id,
2513                        identity_hash: *identity_hash,
2514                        public_key: *public_key,
2515                    });
2516                }
2517                LinkAction::DataReceived { .. } => {
2518                    // Data delivery is handled at a higher level
2519                }
2520            }
2521        }
2522        result
2523    }
2524
2525    /// Convert ChannelActions to LinkManagerActions.
2526    fn process_channel_actions(
2527        &mut self,
2528        link_id: &LinkId,
2529        actions: Vec<rns_core::channel::ChannelAction>,
2530        rng: &mut dyn Rng,
2531    ) -> Vec<LinkManagerAction> {
2532        let mut result = Vec::new();
2533        for action in actions {
2534            match action {
2535                rns_core::channel::ChannelAction::SendOnLink { raw, sequence } => {
2536                    // Encrypt and send as CHANNEL context
2537                    let encrypted = match self.links.get(link_id) {
2538                        Some(link) => match link.engine.encrypt(&raw, rng) {
2539                            Ok(encrypted) => encrypted,
2540                            Err(_) => continue,
2541                        },
2542                        None => continue,
2543                    };
2544                    let flags = PacketFlags {
2545                        header_type: constants::HEADER_1,
2546                        context_flag: constants::FLAG_UNSET,
2547                        transport_type: constants::TRANSPORT_BROADCAST,
2548                        destination_type: constants::DESTINATION_LINK,
2549                        packet_type: constants::PACKET_TYPE_DATA,
2550                    };
2551                    if let Ok(pkt) = RawPacket::pack(
2552                        flags,
2553                        0,
2554                        link_id,
2555                        None,
2556                        constants::CONTEXT_CHANNEL,
2557                        &encrypted,
2558                    ) {
2559                        if let Some(link_mut) = self.links.get_mut(link_id) {
2560                            link_mut
2561                                .pending_channel_packets
2562                                .insert(pkt.packet_hash, sequence);
2563                        }
2564                        result.push(LinkManagerAction::SendPacket {
2565                            raw: pkt.raw,
2566                            dest_type: constants::DESTINATION_LINK,
2567                            attached_interface: None,
2568                        });
2569                    }
2570                }
2571                rns_core::channel::ChannelAction::MessageReceived {
2572                    msgtype, payload, ..
2573                } => {
2574                    result.push(LinkManagerAction::ChannelMessageReceived {
2575                        link_id: *link_id,
2576                        msgtype,
2577                        payload,
2578                    });
2579                }
2580                rns_core::channel::ChannelAction::TeardownLink => {
2581                    result.push(LinkManagerAction::LinkClosed {
2582                        link_id: *link_id,
2583                        reason: Some(TeardownReason::Timeout),
2584                    });
2585                }
2586            }
2587        }
2588        result
2589    }
2590}
2591
2592/// Compute a path hash from a path string.
2593/// Uses truncated SHA-256 (first 16 bytes).
2594fn compute_path_hash(path: &str) -> [u8; 16] {
2595    let full = rns_core::hash::full_hash(path.as_bytes());
2596    let mut result = [0u8; 16];
2597    result.copy_from_slice(&full[..16]);
2598    result
2599}
2600
2601#[cfg(test)]
2602mod tests {
2603    use super::*;
2604    use rns_crypto::identity::Identity;
2605    use rns_crypto::{FixedRng, OsRng};
2606
2607    fn make_rng(seed: u8) -> FixedRng {
2608        FixedRng::new(&[seed; 128])
2609    }
2610
2611    fn make_dest_keys(rng: &mut dyn Rng) -> (Ed25519PrivateKey, [u8; 32]) {
2612        let sig_prv = Ed25519PrivateKey::generate(rng);
2613        let sig_pub_bytes = sig_prv.public_key().public_bytes();
2614        (sig_prv, sig_pub_bytes)
2615    }
2616
2617    #[test]
2618    fn test_register_link_destination() {
2619        let mut mgr = LinkManager::new();
2620        let mut rng = make_rng(0x01);
2621        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2622        let dest_hash = [0xDD; 16];
2623
2624        mgr.register_link_destination(
2625            dest_hash,
2626            sig_prv,
2627            sig_pub_bytes,
2628            ResourceStrategy::AcceptNone,
2629        );
2630        assert!(mgr.is_link_destination(&dest_hash));
2631
2632        mgr.deregister_link_destination(&dest_hash);
2633        assert!(!mgr.is_link_destination(&dest_hash));
2634    }
2635
2636    #[test]
2637    fn test_create_link() {
2638        let mut mgr = LinkManager::new();
2639        let mut rng = OsRng;
2640        let dest_hash = [0xDD; 16];
2641
2642        let sig_pub_bytes = [0xAA; 32]; // dummy sig pub for test
2643        let (link_id, actions) = mgr.create_link(
2644            &dest_hash,
2645            &sig_pub_bytes,
2646            1,
2647            constants::MTU as u32,
2648            &mut rng,
2649        );
2650        assert_ne!(link_id, [0u8; 16]);
2651        // Should have RegisterLinkDest + SendPacket
2652        assert_eq!(actions.len(), 2);
2653        assert!(matches!(
2654            actions[0],
2655            LinkManagerAction::RegisterLinkDest { .. }
2656        ));
2657        assert!(matches!(actions[1], LinkManagerAction::SendPacket { .. }));
2658
2659        // Link should be in Pending state
2660        assert_eq!(mgr.link_state(&link_id), Some(LinkState::Pending));
2661    }
2662
2663    #[test]
2664    fn test_full_handshake_via_manager() {
2665        let mut rng = OsRng;
2666        let dest_hash = [0xDD; 16];
2667
2668        // Setup responder
2669        let mut responder_mgr = LinkManager::new();
2670        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2671        responder_mgr.register_link_destination(
2672            dest_hash,
2673            sig_prv,
2674            sig_pub_bytes,
2675            ResourceStrategy::AcceptNone,
2676        );
2677
2678        // Setup initiator
2679        let mut initiator_mgr = LinkManager::new();
2680
2681        // Step 1: Initiator creates link (needs dest signing pub key for LRPROOF verification)
2682        let (link_id, init_actions) = initiator_mgr.create_link(
2683            &dest_hash,
2684            &sig_pub_bytes,
2685            1,
2686            constants::MTU as u32,
2687            &mut rng,
2688        );
2689        assert_eq!(init_actions.len(), 2);
2690
2691        // Extract the LINKREQUEST packet raw bytes
2692        let linkrequest_raw = match &init_actions[1] {
2693            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
2694            _ => panic!("Expected SendPacket"),
2695        };
2696
2697        // Parse to get packet_hash and dest_hash
2698        let lr_packet = RawPacket::unpack(&linkrequest_raw).unwrap();
2699
2700        // Step 2: Responder handles LINKREQUEST
2701        let resp_actions = responder_mgr.handle_local_delivery(
2702            lr_packet.destination_hash,
2703            &linkrequest_raw,
2704            lr_packet.packet_hash,
2705            rns_core::transport::types::InterfaceId(0),
2706            &mut rng,
2707        );
2708        // Should have RegisterLinkDest + SendPacket(LRPROOF)
2709        assert!(resp_actions.len() >= 2);
2710        assert!(matches!(
2711            resp_actions[0],
2712            LinkManagerAction::RegisterLinkDest { .. }
2713        ));
2714
2715        // Extract LRPROOF packet
2716        let lrproof_raw = match &resp_actions[1] {
2717            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
2718            _ => panic!("Expected SendPacket for LRPROOF"),
2719        };
2720
2721        // Step 3: Initiator handles LRPROOF
2722        let lrproof_packet = RawPacket::unpack(&lrproof_raw).unwrap();
2723        let init_actions2 = initiator_mgr.handle_local_delivery(
2724            lrproof_packet.destination_hash,
2725            &lrproof_raw,
2726            lrproof_packet.packet_hash,
2727            rns_core::transport::types::InterfaceId(0),
2728            &mut rng,
2729        );
2730
2731        // Should have LinkEstablished + SendPacket(LRRTT)
2732        let has_established = init_actions2
2733            .iter()
2734            .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
2735        assert!(has_established, "Initiator should emit LinkEstablished");
2736
2737        // Extract LRRTT
2738        let lrrtt_raw = init_actions2
2739            .iter()
2740            .find_map(|a| match a {
2741                LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
2742                _ => None,
2743            })
2744            .expect("Should have LRRTT SendPacket");
2745
2746        // Step 4: Responder handles LRRTT
2747        let lrrtt_packet = RawPacket::unpack(&lrrtt_raw).unwrap();
2748        let resp_link_id = lrrtt_packet.destination_hash;
2749        let resp_actions2 = responder_mgr.handle_local_delivery(
2750            resp_link_id,
2751            &lrrtt_raw,
2752            lrrtt_packet.packet_hash,
2753            rns_core::transport::types::InterfaceId(0),
2754            &mut rng,
2755        );
2756
2757        let has_established = resp_actions2
2758            .iter()
2759            .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
2760        assert!(has_established, "Responder should emit LinkEstablished");
2761
2762        // Both sides should be Active
2763        assert_eq!(initiator_mgr.link_state(&link_id), Some(LinkState::Active));
2764        assert_eq!(responder_mgr.link_state(&link_id), Some(LinkState::Active));
2765
2766        // Both should have RTT
2767        assert!(initiator_mgr.link_rtt(&link_id).is_some());
2768        assert!(responder_mgr.link_rtt(&link_id).is_some());
2769    }
2770
2771    #[test]
2772    fn test_encrypted_data_exchange() {
2773        let mut rng = OsRng;
2774        let dest_hash = [0xDD; 16];
2775        let mut resp_mgr = LinkManager::new();
2776        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2777        resp_mgr.register_link_destination(
2778            dest_hash,
2779            sig_prv,
2780            sig_pub_bytes,
2781            ResourceStrategy::AcceptNone,
2782        );
2783        let mut init_mgr = LinkManager::new();
2784
2785        // Handshake
2786        let (link_id, init_actions) = init_mgr.create_link(
2787            &dest_hash,
2788            &sig_pub_bytes,
2789            1,
2790            constants::MTU as u32,
2791            &mut rng,
2792        );
2793        let lr_raw = extract_send_packet(&init_actions);
2794        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
2795        let resp_actions = resp_mgr.handle_local_delivery(
2796            lr_pkt.destination_hash,
2797            &lr_raw,
2798            lr_pkt.packet_hash,
2799            rns_core::transport::types::InterfaceId(0),
2800            &mut rng,
2801        );
2802        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
2803        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
2804        let init_actions2 = init_mgr.handle_local_delivery(
2805            lrproof_pkt.destination_hash,
2806            &lrproof_raw,
2807            lrproof_pkt.packet_hash,
2808            rns_core::transport::types::InterfaceId(0),
2809            &mut rng,
2810        );
2811        let lrrtt_raw = extract_any_send_packet(&init_actions2);
2812        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
2813        resp_mgr.handle_local_delivery(
2814            lrrtt_pkt.destination_hash,
2815            &lrrtt_raw,
2816            lrrtt_pkt.packet_hash,
2817            rns_core::transport::types::InterfaceId(0),
2818            &mut rng,
2819        );
2820
2821        // Send data from initiator to responder
2822        let actions =
2823            init_mgr.send_on_link(&link_id, b"hello link!", constants::CONTEXT_NONE, &mut rng);
2824        assert_eq!(actions.len(), 1);
2825        assert!(matches!(actions[0], LinkManagerAction::SendPacket { .. }));
2826    }
2827
2828    #[test]
2829    fn test_request_response() {
2830        let mut rng = OsRng;
2831        let dest_hash = [0xDD; 16];
2832        let mut resp_mgr = LinkManager::new();
2833        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2834        resp_mgr.register_link_destination(
2835            dest_hash,
2836            sig_prv,
2837            sig_pub_bytes,
2838            ResourceStrategy::AcceptNone,
2839        );
2840
2841        // Register a request handler
2842        resp_mgr.register_request_handler("/status", None, |_link_id, _path, _data, _remote| {
2843            Some(b"OK".to_vec())
2844        });
2845
2846        let mut init_mgr = LinkManager::new();
2847
2848        // Complete handshake
2849        let (link_id, init_actions) = init_mgr.create_link(
2850            &dest_hash,
2851            &sig_pub_bytes,
2852            1,
2853            constants::MTU as u32,
2854            &mut rng,
2855        );
2856        let lr_raw = extract_send_packet(&init_actions);
2857        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
2858        let resp_actions = resp_mgr.handle_local_delivery(
2859            lr_pkt.destination_hash,
2860            &lr_raw,
2861            lr_pkt.packet_hash,
2862            rns_core::transport::types::InterfaceId(0),
2863            &mut rng,
2864        );
2865        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
2866        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
2867        let init_actions2 = init_mgr.handle_local_delivery(
2868            lrproof_pkt.destination_hash,
2869            &lrproof_raw,
2870            lrproof_pkt.packet_hash,
2871            rns_core::transport::types::InterfaceId(0),
2872            &mut rng,
2873        );
2874        let lrrtt_raw = extract_any_send_packet(&init_actions2);
2875        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
2876        resp_mgr.handle_local_delivery(
2877            lrrtt_pkt.destination_hash,
2878            &lrrtt_raw,
2879            lrrtt_pkt.packet_hash,
2880            rns_core::transport::types::InterfaceId(0),
2881            &mut rng,
2882        );
2883
2884        // Send request from initiator
2885        let req_actions = init_mgr.send_request(&link_id, "/status", b"query", &mut rng);
2886        assert_eq!(req_actions.len(), 1);
2887
2888        // Deliver request to responder
2889        let req_raw = extract_send_packet_from(&req_actions);
2890        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
2891        let resp_actions = resp_mgr.handle_local_delivery(
2892            req_pkt.destination_hash,
2893            &req_raw,
2894            req_pkt.packet_hash,
2895            rns_core::transport::types::InterfaceId(0),
2896            &mut rng,
2897        );
2898
2899        // Should have a response SendPacket
2900        let has_response = resp_actions
2901            .iter()
2902            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
2903        assert!(has_response, "Handler should produce a response packet");
2904    }
2905
2906    #[test]
2907    fn test_request_acl_deny_unidentified() {
2908        let mut rng = OsRng;
2909        let dest_hash = [0xDD; 16];
2910        let mut resp_mgr = LinkManager::new();
2911        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2912        resp_mgr.register_link_destination(
2913            dest_hash,
2914            sig_prv,
2915            sig_pub_bytes,
2916            ResourceStrategy::AcceptNone,
2917        );
2918
2919        // Register handler with ACL (only allow specific identity)
2920        resp_mgr.register_request_handler(
2921            "/restricted",
2922            Some(vec![[0xAA; 16]]),
2923            |_link_id, _path, _data, _remote| Some(b"secret".to_vec()),
2924        );
2925
2926        let mut init_mgr = LinkManager::new();
2927
2928        // Complete handshake (without identification)
2929        let (link_id, init_actions) = init_mgr.create_link(
2930            &dest_hash,
2931            &sig_pub_bytes,
2932            1,
2933            constants::MTU as u32,
2934            &mut rng,
2935        );
2936        let lr_raw = extract_send_packet(&init_actions);
2937        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
2938        let resp_actions = resp_mgr.handle_local_delivery(
2939            lr_pkt.destination_hash,
2940            &lr_raw,
2941            lr_pkt.packet_hash,
2942            rns_core::transport::types::InterfaceId(0),
2943            &mut rng,
2944        );
2945        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
2946        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
2947        let init_actions2 = init_mgr.handle_local_delivery(
2948            lrproof_pkt.destination_hash,
2949            &lrproof_raw,
2950            lrproof_pkt.packet_hash,
2951            rns_core::transport::types::InterfaceId(0),
2952            &mut rng,
2953        );
2954        let lrrtt_raw = extract_any_send_packet(&init_actions2);
2955        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
2956        resp_mgr.handle_local_delivery(
2957            lrrtt_pkt.destination_hash,
2958            &lrrtt_raw,
2959            lrrtt_pkt.packet_hash,
2960            rns_core::transport::types::InterfaceId(0),
2961            &mut rng,
2962        );
2963
2964        // Send request without identifying first
2965        let req_actions = init_mgr.send_request(&link_id, "/restricted", b"query", &mut rng);
2966        let req_raw = extract_send_packet_from(&req_actions);
2967        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
2968        let resp_actions = resp_mgr.handle_local_delivery(
2969            req_pkt.destination_hash,
2970            &req_raw,
2971            req_pkt.packet_hash,
2972            rns_core::transport::types::InterfaceId(0),
2973            &mut rng,
2974        );
2975
2976        // Should be denied — no response packet
2977        let has_response = resp_actions
2978            .iter()
2979            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
2980        assert!(!has_response, "Unidentified peer should be denied");
2981    }
2982
2983    #[test]
2984    fn test_teardown_link() {
2985        let mut rng = OsRng;
2986        let dest_hash = [0xDD; 16];
2987        let mut mgr = LinkManager::new();
2988
2989        let dummy_sig = [0xAA; 32];
2990        let (link_id, _) =
2991            mgr.create_link(&dest_hash, &dummy_sig, 1, constants::MTU as u32, &mut rng);
2992        assert_eq!(mgr.link_count(), 1);
2993
2994        let actions = mgr.teardown_link(&link_id);
2995        let has_close = actions
2996            .iter()
2997            .any(|a| matches!(a, LinkManagerAction::LinkClosed { .. }));
2998        assert!(has_close);
2999
3000        // After tick, closed links should be cleaned up
3001        let tick_actions = mgr.tick(&mut rng);
3002        let has_deregister = tick_actions
3003            .iter()
3004            .any(|a| matches!(a, LinkManagerAction::DeregisterLinkDest { .. }));
3005        assert!(has_deregister);
3006        assert_eq!(mgr.link_count(), 0);
3007    }
3008
3009    #[test]
3010    fn test_identify_on_link() {
3011        let mut rng = OsRng;
3012        let dest_hash = [0xDD; 16];
3013        let mut resp_mgr = LinkManager::new();
3014        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3015        resp_mgr.register_link_destination(
3016            dest_hash,
3017            sig_prv,
3018            sig_pub_bytes,
3019            ResourceStrategy::AcceptNone,
3020        );
3021        let mut init_mgr = LinkManager::new();
3022
3023        // Complete handshake
3024        let (link_id, init_actions) = init_mgr.create_link(
3025            &dest_hash,
3026            &sig_pub_bytes,
3027            1,
3028            constants::MTU as u32,
3029            &mut rng,
3030        );
3031        let lr_raw = extract_send_packet(&init_actions);
3032        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3033        let resp_actions = resp_mgr.handle_local_delivery(
3034            lr_pkt.destination_hash,
3035            &lr_raw,
3036            lr_pkt.packet_hash,
3037            rns_core::transport::types::InterfaceId(0),
3038            &mut rng,
3039        );
3040        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3041        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3042        let init_actions2 = init_mgr.handle_local_delivery(
3043            lrproof_pkt.destination_hash,
3044            &lrproof_raw,
3045            lrproof_pkt.packet_hash,
3046            rns_core::transport::types::InterfaceId(0),
3047            &mut rng,
3048        );
3049        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3050        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3051        resp_mgr.handle_local_delivery(
3052            lrrtt_pkt.destination_hash,
3053            &lrrtt_raw,
3054            lrrtt_pkt.packet_hash,
3055            rns_core::transport::types::InterfaceId(0),
3056            &mut rng,
3057        );
3058
3059        // Identify initiator to responder
3060        let identity = Identity::new(&mut rng);
3061        let id_actions = init_mgr.identify(&link_id, &identity, &mut rng);
3062        assert_eq!(id_actions.len(), 1);
3063
3064        // Deliver identify to responder
3065        let id_raw = extract_send_packet_from(&id_actions);
3066        let id_pkt = RawPacket::unpack(&id_raw).unwrap();
3067        let resp_actions = resp_mgr.handle_local_delivery(
3068            id_pkt.destination_hash,
3069            &id_raw,
3070            id_pkt.packet_hash,
3071            rns_core::transport::types::InterfaceId(0),
3072            &mut rng,
3073        );
3074
3075        let has_identified = resp_actions
3076            .iter()
3077            .any(|a| matches!(a, LinkManagerAction::RemoteIdentified { .. }));
3078        assert!(has_identified, "Responder should emit RemoteIdentified");
3079    }
3080
3081    #[test]
3082    fn test_path_hash_computation() {
3083        let h1 = compute_path_hash("/status");
3084        let h2 = compute_path_hash("/path");
3085        assert_ne!(h1, h2);
3086
3087        // Deterministic
3088        assert_eq!(h1, compute_path_hash("/status"));
3089    }
3090
3091    #[test]
3092    fn test_link_count() {
3093        let mut mgr = LinkManager::new();
3094        let mut rng = OsRng;
3095
3096        assert_eq!(mgr.link_count(), 0);
3097
3098        let dummy_sig = [0xAA; 32];
3099        mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3100        assert_eq!(mgr.link_count(), 1);
3101
3102        mgr.create_link(&[0x22; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3103        assert_eq!(mgr.link_count(), 2);
3104    }
3105
3106    // --- Test helpers ---
3107
3108    fn extract_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3109        extract_send_packet_at(actions, actions.len() - 1)
3110    }
3111
3112    fn extract_send_packet_at(actions: &[LinkManagerAction], idx: usize) -> Vec<u8> {
3113        match &actions[idx] {
3114            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3115            other => panic!("Expected SendPacket at index {}, got {:?}", idx, other),
3116        }
3117    }
3118
3119    fn extract_any_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3120        actions
3121            .iter()
3122            .find_map(|a| match a {
3123                LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3124                _ => None,
3125            })
3126            .expect("Expected at least one SendPacket action")
3127    }
3128
3129    fn extract_send_packet_from(actions: &[LinkManagerAction]) -> Vec<u8> {
3130        extract_any_send_packet(actions)
3131    }
3132
3133    /// Set up two linked managers with an active link.
3134    /// Returns (initiator_mgr, responder_mgr, link_id).
3135    fn setup_active_link() -> (LinkManager, LinkManager, LinkId) {
3136        let mut rng = OsRng;
3137        let dest_hash = [0xDD; 16];
3138        let mut resp_mgr = LinkManager::new();
3139        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3140        resp_mgr.register_link_destination(
3141            dest_hash,
3142            sig_prv,
3143            sig_pub_bytes,
3144            ResourceStrategy::AcceptNone,
3145        );
3146        let mut init_mgr = LinkManager::new();
3147
3148        let (link_id, init_actions) = init_mgr.create_link(
3149            &dest_hash,
3150            &sig_pub_bytes,
3151            1,
3152            constants::MTU as u32,
3153            &mut rng,
3154        );
3155        let lr_raw = extract_send_packet(&init_actions);
3156        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3157        let resp_actions = resp_mgr.handle_local_delivery(
3158            lr_pkt.destination_hash,
3159            &lr_raw,
3160            lr_pkt.packet_hash,
3161            rns_core::transport::types::InterfaceId(0),
3162            &mut rng,
3163        );
3164        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3165        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3166        let init_actions2 = init_mgr.handle_local_delivery(
3167            lrproof_pkt.destination_hash,
3168            &lrproof_raw,
3169            lrproof_pkt.packet_hash,
3170            rns_core::transport::types::InterfaceId(0),
3171            &mut rng,
3172        );
3173        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3174        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3175        resp_mgr.handle_local_delivery(
3176            lrrtt_pkt.destination_hash,
3177            &lrrtt_raw,
3178            lrrtt_pkt.packet_hash,
3179            rns_core::transport::types::InterfaceId(0),
3180            &mut rng,
3181        );
3182
3183        assert_eq!(init_mgr.link_state(&link_id), Some(LinkState::Active));
3184        assert_eq!(resp_mgr.link_state(&link_id), Some(LinkState::Active));
3185
3186        (init_mgr, resp_mgr, link_id)
3187    }
3188
3189    // ====================================================================
3190    // Phase 8a: Resource wiring tests
3191    // ====================================================================
3192
3193    #[test]
3194    fn test_resource_strategy_default() {
3195        let mut mgr = LinkManager::new();
3196        let mut rng = OsRng;
3197        let dummy_sig = [0xAA; 32];
3198        let (link_id, _) =
3199            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3200
3201        // Default strategy is AcceptNone
3202        let link = mgr.links.get(&link_id).unwrap();
3203        assert_eq!(link.resource_strategy, ResourceStrategy::AcceptNone);
3204    }
3205
3206    #[test]
3207    fn test_set_resource_strategy() {
3208        let mut mgr = LinkManager::new();
3209        let mut rng = OsRng;
3210        let dummy_sig = [0xAA; 32];
3211        let (link_id, _) =
3212            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3213
3214        mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3215        assert_eq!(
3216            mgr.links.get(&link_id).unwrap().resource_strategy,
3217            ResourceStrategy::AcceptAll
3218        );
3219
3220        mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3221        assert_eq!(
3222            mgr.links.get(&link_id).unwrap().resource_strategy,
3223            ResourceStrategy::AcceptApp
3224        );
3225    }
3226
3227    #[test]
3228    fn test_send_resource_on_active_link() {
3229        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3230        let mut rng = OsRng;
3231
3232        // Send resource data
3233        let data = vec![0xAB; 100]; // small enough for a single part
3234        let actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3235
3236        // Should produce at least a SendPacket (advertisement)
3237        let has_send = actions
3238            .iter()
3239            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3240        assert!(
3241            has_send,
3242            "send_resource should emit advertisement SendPacket"
3243        );
3244    }
3245
3246    #[test]
3247    fn test_send_resource_on_inactive_link() {
3248        let mut mgr = LinkManager::new();
3249        let mut rng = OsRng;
3250        let dummy_sig = [0xAA; 32];
3251        let (link_id, _) =
3252            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3253
3254        // Link is Pending, not Active
3255        let actions = mgr.send_resource(&link_id, b"data", None, &mut rng);
3256        assert!(actions.is_empty(), "Cannot send resource on inactive link");
3257    }
3258
3259    #[test]
3260    fn test_resource_adv_rejected_by_accept_none() {
3261        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3262        let mut rng = OsRng;
3263
3264        // Responder uses default AcceptNone strategy
3265        // Send resource from initiator
3266        let data = vec![0xCD; 100];
3267        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3268
3269        // Deliver advertisement to responder
3270        for action in &adv_actions {
3271            if let LinkManagerAction::SendPacket { raw, .. } = action {
3272                let pkt = RawPacket::unpack(raw).unwrap();
3273                let resp_actions = resp_mgr.handle_local_delivery(
3274                    pkt.destination_hash,
3275                    raw,
3276                    pkt.packet_hash,
3277                    rns_core::transport::types::InterfaceId(0),
3278                    &mut rng,
3279                );
3280                // AcceptNone: should not produce ResourceReceived, may produce SendPacket (RCL)
3281                let has_resource_received = resp_actions
3282                    .iter()
3283                    .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
3284                assert!(
3285                    !has_resource_received,
3286                    "AcceptNone should not accept resource"
3287                );
3288            }
3289        }
3290    }
3291
3292    #[test]
3293    fn test_resource_adv_accepted_by_accept_all() {
3294        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3295        let mut rng = OsRng;
3296
3297        // Set responder to AcceptAll
3298        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3299
3300        // Send resource from initiator
3301        let data = vec![0xCD; 100];
3302        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3303
3304        // Deliver advertisement to responder
3305        for action in &adv_actions {
3306            if let LinkManagerAction::SendPacket { raw, .. } = action {
3307                let pkt = RawPacket::unpack(raw).unwrap();
3308                let resp_actions = resp_mgr.handle_local_delivery(
3309                    pkt.destination_hash,
3310                    raw,
3311                    pkt.packet_hash,
3312                    rns_core::transport::types::InterfaceId(0),
3313                    &mut rng,
3314                );
3315                // AcceptAll: should accept and produce a SendPacket (request for parts)
3316                let has_send = resp_actions
3317                    .iter()
3318                    .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3319                assert!(has_send, "AcceptAll should accept and request parts");
3320            }
3321        }
3322    }
3323
3324    #[test]
3325    fn test_resource_accept_app_query() {
3326        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3327        let mut rng = OsRng;
3328
3329        // Set responder to AcceptApp
3330        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3331
3332        // Send resource from initiator
3333        let data = vec![0xCD; 100];
3334        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3335
3336        // Deliver advertisement to responder
3337        let mut got_query = false;
3338        for action in &adv_actions {
3339            if let LinkManagerAction::SendPacket { raw, .. } = action {
3340                let pkt = RawPacket::unpack(raw).unwrap();
3341                let resp_actions = resp_mgr.handle_local_delivery(
3342                    pkt.destination_hash,
3343                    raw,
3344                    pkt.packet_hash,
3345                    rns_core::transport::types::InterfaceId(0),
3346                    &mut rng,
3347                );
3348                for a in &resp_actions {
3349                    if matches!(a, LinkManagerAction::ResourceAcceptQuery { .. }) {
3350                        got_query = true;
3351                    }
3352                }
3353            }
3354        }
3355        assert!(got_query, "AcceptApp should emit ResourceAcceptQuery");
3356    }
3357
3358    #[test]
3359    fn test_resource_accept_app_accept() {
3360        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3361        let mut rng = OsRng;
3362
3363        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3364
3365        let data = vec![0xCD; 100];
3366        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3367
3368        for action in &adv_actions {
3369            if let LinkManagerAction::SendPacket { raw, .. } = action {
3370                let pkt = RawPacket::unpack(raw).unwrap();
3371                let resp_actions = resp_mgr.handle_local_delivery(
3372                    pkt.destination_hash,
3373                    raw,
3374                    pkt.packet_hash,
3375                    rns_core::transport::types::InterfaceId(0),
3376                    &mut rng,
3377                );
3378                for a in &resp_actions {
3379                    if let LinkManagerAction::ResourceAcceptQuery {
3380                        link_id: lid,
3381                        resource_hash,
3382                        ..
3383                    } = a
3384                    {
3385                        // Accept the resource
3386                        let accept_actions =
3387                            resp_mgr.accept_resource(lid, resource_hash, true, &mut rng);
3388                        // Should produce a SendPacket (request for parts)
3389                        let has_send = accept_actions
3390                            .iter()
3391                            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3392                        assert!(
3393                            has_send,
3394                            "Accepting resource should produce request for parts"
3395                        );
3396                    }
3397                }
3398            }
3399        }
3400    }
3401
3402    #[test]
3403    fn test_resource_accept_app_reject() {
3404        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3405        let mut rng = OsRng;
3406
3407        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3408
3409        let data = vec![0xCD; 100];
3410        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3411
3412        for action in &adv_actions {
3413            if let LinkManagerAction::SendPacket { raw, .. } = action {
3414                let pkt = RawPacket::unpack(raw).unwrap();
3415                let resp_actions = resp_mgr.handle_local_delivery(
3416                    pkt.destination_hash,
3417                    raw,
3418                    pkt.packet_hash,
3419                    rns_core::transport::types::InterfaceId(0),
3420                    &mut rng,
3421                );
3422                for a in &resp_actions {
3423                    if let LinkManagerAction::ResourceAcceptQuery {
3424                        link_id: lid,
3425                        resource_hash,
3426                        ..
3427                    } = a
3428                    {
3429                        // Reject the resource
3430                        let reject_actions =
3431                            resp_mgr.accept_resource(lid, resource_hash, false, &mut rng);
3432                        // Rejecting should send a cancel and not request parts
3433                        // No ResourceReceived should appear
3434                        let has_resource_received = reject_actions
3435                            .iter()
3436                            .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
3437                        assert!(!has_resource_received);
3438                    }
3439                }
3440            }
3441        }
3442    }
3443
3444    #[test]
3445    fn test_resource_full_transfer() {
3446        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3447        let mut rng = OsRng;
3448
3449        // Set responder to AcceptAll
3450        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3451
3452        // Small data (fits in single SDU)
3453        let original_data = b"Hello, Resource Transfer!".to_vec();
3454        let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
3455
3456        // Drive the full transfer protocol between the two managers.
3457        // Tag each SendPacket with its source ('i' = initiator, 'r' = responder).
3458        let mut pending: Vec<(char, LinkManagerAction)> =
3459            adv_actions.into_iter().map(|a| ('i', a)).collect();
3460        let mut rounds = 0;
3461        let max_rounds = 50;
3462        let mut resource_received = false;
3463        let mut sender_completed = false;
3464
3465        while !pending.is_empty() && rounds < max_rounds {
3466            rounds += 1;
3467            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
3468
3469            for (source, action) in pending.drain(..) {
3470                if let LinkManagerAction::SendPacket { raw, .. } = action {
3471                    let pkt = RawPacket::unpack(&raw).unwrap();
3472
3473                    // Deliver only to the OTHER side
3474                    let target_actions = if source == 'i' {
3475                        resp_mgr.handle_local_delivery(
3476                            pkt.destination_hash,
3477                            &raw,
3478                            pkt.packet_hash,
3479                            rns_core::transport::types::InterfaceId(0),
3480                            &mut rng,
3481                        )
3482                    } else {
3483                        init_mgr.handle_local_delivery(
3484                            pkt.destination_hash,
3485                            &raw,
3486                            pkt.packet_hash,
3487                            rns_core::transport::types::InterfaceId(0),
3488                            &mut rng,
3489                        )
3490                    };
3491
3492                    let target_source = if source == 'i' { 'r' } else { 'i' };
3493                    for a in &target_actions {
3494                        match a {
3495                            LinkManagerAction::ResourceReceived { data, .. } => {
3496                                assert_eq!(*data, original_data);
3497                                resource_received = true;
3498                            }
3499                            LinkManagerAction::ResourceCompleted { .. } => {
3500                                sender_completed = true;
3501                            }
3502                            _ => {}
3503                        }
3504                    }
3505                    next.extend(target_actions.into_iter().map(|a| (target_source, a)));
3506                }
3507            }
3508            pending = next;
3509        }
3510
3511        assert!(
3512            resource_received,
3513            "Responder should receive resource data (rounds={})",
3514            rounds
3515        );
3516        assert!(
3517            sender_completed,
3518            "Sender should get completion proof (rounds={})",
3519            rounds
3520        );
3521    }
3522
3523    #[test]
3524    fn test_resource_cancel_icl() {
3525        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3526        let mut rng = OsRng;
3527
3528        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3529
3530        // Use large data so transfer is multi-part
3531        let data = vec![0xAB; 2000];
3532        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3533
3534        // Deliver advertisement — responder accepts and sends request
3535        for action in &adv_actions {
3536            if let LinkManagerAction::SendPacket { raw, .. } = action {
3537                let pkt = RawPacket::unpack(raw).unwrap();
3538                resp_mgr.handle_local_delivery(
3539                    pkt.destination_hash,
3540                    raw,
3541                    pkt.packet_hash,
3542                    rns_core::transport::types::InterfaceId(0),
3543                    &mut rng,
3544                );
3545            }
3546        }
3547
3548        // Verify there are incoming resources on the responder
3549        assert!(!resp_mgr
3550            .links
3551            .get(&link_id)
3552            .unwrap()
3553            .incoming_resources
3554            .is_empty());
3555
3556        // Simulate ICL (cancel from initiator side) by calling handle_resource_icl
3557        let icl_actions = resp_mgr.handle_resource_icl(&link_id);
3558
3559        // Should have resource failed
3560        let has_failed = icl_actions
3561            .iter()
3562            .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
3563        assert!(has_failed, "ICL should produce ResourceFailed");
3564    }
3565
3566    #[test]
3567    fn test_resource_cancel_rcl() {
3568        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3569        let mut rng = OsRng;
3570
3571        // Create a resource sender
3572        let data = vec![0xAB; 2000];
3573        init_mgr.send_resource(&link_id, &data, None, &mut rng);
3574
3575        // Verify there are outgoing resources
3576        assert!(!init_mgr
3577            .links
3578            .get(&link_id)
3579            .unwrap()
3580            .outgoing_resources
3581            .is_empty());
3582
3583        // Simulate RCL (cancel from receiver side)
3584        let rcl_actions = init_mgr.handle_resource_rcl(&link_id);
3585
3586        let has_failed = rcl_actions
3587            .iter()
3588            .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
3589        assert!(has_failed, "RCL should produce ResourceFailed");
3590    }
3591
3592    #[test]
3593    fn test_cancel_all_resources_clears_active_transfers() {
3594        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3595        let mut rng = OsRng;
3596
3597        let actions = init_mgr.send_resource(&link_id, b"resource body", None, &mut rng);
3598        assert!(!actions.is_empty());
3599        assert_eq!(init_mgr.resource_transfer_count(), 1);
3600
3601        let cancel_actions = init_mgr.cancel_all_resources(&mut rng);
3602
3603        assert_eq!(init_mgr.resource_transfer_count(), 0);
3604        assert!(cancel_actions
3605            .iter()
3606            .any(|action| matches!(action, LinkManagerAction::SendPacket { .. })));
3607    }
3608
3609    #[test]
3610    fn test_resource_tick_cleans_up() {
3611        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3612        let mut rng = OsRng;
3613
3614        let data = vec![0xAB; 100];
3615        init_mgr.send_resource(&link_id, &data, None, &mut rng);
3616
3617        assert!(!init_mgr
3618            .links
3619            .get(&link_id)
3620            .unwrap()
3621            .outgoing_resources
3622            .is_empty());
3623
3624        // Cancel the sender to make it Complete
3625        init_mgr.handle_resource_rcl(&link_id);
3626
3627        // Tick should clean up completed resources
3628        init_mgr.tick(&mut rng);
3629
3630        assert!(
3631            init_mgr
3632                .links
3633                .get(&link_id)
3634                .unwrap()
3635                .outgoing_resources
3636                .is_empty(),
3637            "Tick should clean up completed/failed outgoing resources"
3638        );
3639    }
3640
3641    #[test]
3642    fn test_build_link_packet() {
3643        let (init_mgr, _resp_mgr, link_id) = setup_active_link();
3644
3645        let actions =
3646            init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, b"test data");
3647        assert_eq!(actions.len(), 1);
3648        if let LinkManagerAction::SendPacket { raw, dest_type, .. } = &actions[0] {
3649            let pkt = RawPacket::unpack(raw).unwrap();
3650            assert_eq!(pkt.context, constants::CONTEXT_RESOURCE);
3651            assert_eq!(*dest_type, constants::DESTINATION_LINK);
3652        } else {
3653            panic!("Expected SendPacket");
3654        }
3655    }
3656
3657    // ====================================================================
3658    // Phase 8b: Channel message & data callback tests
3659    // ====================================================================
3660
3661    #[test]
3662    fn test_channel_message_delivery() {
3663        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3664        let mut rng = OsRng;
3665
3666        // Send channel message from initiator
3667        let chan_actions = init_mgr
3668            .send_channel_message(&link_id, 42, b"channel data", &mut rng)
3669            .expect("active link channel send should succeed");
3670        assert!(!chan_actions.is_empty());
3671
3672        // Deliver to responder
3673        let mut got_channel_msg = false;
3674        for action in &chan_actions {
3675            if let LinkManagerAction::SendPacket { raw, .. } = action {
3676                let pkt = RawPacket::unpack(raw).unwrap();
3677                let resp_actions = resp_mgr.handle_local_delivery(
3678                    pkt.destination_hash,
3679                    raw,
3680                    pkt.packet_hash,
3681                    rns_core::transport::types::InterfaceId(0),
3682                    &mut rng,
3683                );
3684                for a in &resp_actions {
3685                    if let LinkManagerAction::ChannelMessageReceived {
3686                        msgtype, payload, ..
3687                    } = a
3688                    {
3689                        assert_eq!(*msgtype, 42);
3690                        assert_eq!(*payload, b"channel data");
3691                        got_channel_msg = true;
3692                    }
3693                }
3694            }
3695        }
3696        assert!(got_channel_msg, "Responder should receive channel message");
3697    }
3698
3699    #[test]
3700    fn test_channel_proof_reopens_send_window() {
3701        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3702        let mut rng = OsRng;
3703
3704        init_mgr
3705            .send_channel_message(&link_id, 42, b"first", &mut rng)
3706            .expect("first send should succeed");
3707        init_mgr
3708            .send_channel_message(&link_id, 42, b"second", &mut rng)
3709            .expect("second send should succeed");
3710
3711        let err = init_mgr
3712            .send_channel_message(&link_id, 42, b"third", &mut rng)
3713            .expect_err("third send should hit the initial channel window");
3714        assert_eq!(err, "Channel is not ready to send");
3715
3716        let queued_packets = init_mgr
3717            .links
3718            .get(&link_id)
3719            .unwrap()
3720            .pending_channel_packets
3721            .clone();
3722        assert_eq!(queued_packets.len(), 2);
3723        for tracked_hash in queued_packets.keys().take(1) {
3724            let mut proof_data = Vec::with_capacity(96);
3725            proof_data.extend_from_slice(tracked_hash);
3726            proof_data.extend_from_slice(&[0x11; 64]);
3727            let flags = PacketFlags {
3728                header_type: constants::HEADER_1,
3729                context_flag: constants::FLAG_UNSET,
3730                transport_type: constants::TRANSPORT_BROADCAST,
3731                destination_type: constants::DESTINATION_LINK,
3732                packet_type: constants::PACKET_TYPE_PROOF,
3733            };
3734            let proof = RawPacket::pack(
3735                flags,
3736                0,
3737                &link_id,
3738                None,
3739                constants::CONTEXT_NONE,
3740                &proof_data,
3741            )
3742            .expect("proof packet should pack");
3743            let ack_actions = init_mgr.handle_local_delivery(
3744                link_id,
3745                &proof.raw,
3746                proof.packet_hash,
3747                rns_core::transport::types::InterfaceId(0),
3748                &mut rng,
3749            );
3750            assert!(
3751                ack_actions.is_empty(),
3752                "proof delivery should only update channel state"
3753            );
3754        }
3755
3756        init_mgr
3757            .send_channel_message(&link_id, 42, b"third", &mut rng)
3758            .expect("proof should free one channel slot");
3759    }
3760
3761    #[test]
3762    fn test_generic_link_data_delivery() {
3763        let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
3764        let mut rng = OsRng;
3765
3766        // Send generic data with a custom context
3767        let actions = init_mgr.send_on_link(&link_id, b"raw stuff", 0x42, &mut rng);
3768        assert_eq!(actions.len(), 1);
3769
3770        // Deliver to responder
3771        let raw = extract_any_send_packet(&actions);
3772        let pkt = RawPacket::unpack(&raw).unwrap();
3773        let resp_actions = resp_mgr.handle_local_delivery(
3774            pkt.destination_hash,
3775            &raw,
3776            pkt.packet_hash,
3777            rns_core::transport::types::InterfaceId(0),
3778            &mut rng,
3779        );
3780
3781        let has_data = resp_actions
3782            .iter()
3783            .any(|a| matches!(a, LinkManagerAction::LinkDataReceived { context: 0x42, .. }));
3784        assert!(
3785            has_data,
3786            "Responder should receive LinkDataReceived for unknown context"
3787        );
3788    }
3789
3790    #[test]
3791    fn test_response_delivery() {
3792        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3793        let mut rng = OsRng;
3794
3795        // Register handler on responder
3796        resp_mgr.register_request_handler("/echo", None, |_link_id, _path, data, _remote| {
3797            Some(data.to_vec())
3798        });
3799
3800        // Send request from initiator
3801        let req_actions = init_mgr.send_request(&link_id, "/echo", b"\xc0", &mut rng); // msgpack nil
3802        assert!(!req_actions.is_empty());
3803
3804        // Deliver request to responder — should produce response
3805        let req_raw = extract_any_send_packet(&req_actions);
3806        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3807        let resp_actions = resp_mgr.handle_local_delivery(
3808            req_pkt.destination_hash,
3809            &req_raw,
3810            req_pkt.packet_hash,
3811            rns_core::transport::types::InterfaceId(0),
3812            &mut rng,
3813        );
3814        let has_resp_send = resp_actions
3815            .iter()
3816            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3817        assert!(has_resp_send, "Handler should produce response");
3818
3819        // Deliver response back to initiator
3820        let resp_raw = extract_any_send_packet(&resp_actions);
3821        let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
3822        let init_actions = init_mgr.handle_local_delivery(
3823            resp_pkt.destination_hash,
3824            &resp_raw,
3825            resp_pkt.packet_hash,
3826            rns_core::transport::types::InterfaceId(0),
3827            &mut rng,
3828        );
3829
3830        let has_response_received = init_actions
3831            .iter()
3832            .any(|a| matches!(a, LinkManagerAction::ResponseReceived { .. }));
3833        assert!(
3834            has_response_received,
3835            "Initiator should receive ResponseReceived"
3836        );
3837    }
3838
3839    #[test]
3840    fn test_large_response_uses_resource_fallback() {
3841        let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
3842        let mut rng = OsRng;
3843
3844        // Register handler on responder with payload that cannot fit a direct
3845        // CONTEXT_RESPONSE packet.
3846        let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
3847        resp_mgr.register_request_handler("/large", None, {
3848            let large_payload = large_payload.clone();
3849            move |_link_id, _path, _data, _remote| Some(large_payload.clone())
3850        });
3851
3852        // Send request from initiator.
3853        let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
3854        assert!(!req_actions.is_empty());
3855
3856        // Deliver request to responder and inspect responder outbound packets.
3857        let req_raw = extract_any_send_packet(&req_actions);
3858        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3859        let resp_actions = resp_mgr.handle_local_delivery(
3860            req_pkt.destination_hash,
3861            &req_raw,
3862            req_pkt.packet_hash,
3863            rns_core::transport::types::InterfaceId(0),
3864            &mut rng,
3865        );
3866
3867        let mut has_resource_adv = false;
3868        let mut has_direct_response = false;
3869        for action in &resp_actions {
3870            if let LinkManagerAction::SendPacket { raw, .. } = action {
3871                let pkt = RawPacket::unpack(raw).unwrap();
3872                if pkt.context == constants::CONTEXT_RESOURCE_ADV {
3873                    has_resource_adv = true;
3874                }
3875                if pkt.context == constants::CONTEXT_RESPONSE {
3876                    has_direct_response = true;
3877                }
3878            }
3879        }
3880
3881        assert!(
3882            has_resource_adv,
3883            "Large response should advertise a response resource"
3884        );
3885        assert!(
3886            !has_direct_response,
3887            "Large response should not use direct CONTEXT_RESPONSE packet"
3888        );
3889    }
3890
3891    #[test]
3892    fn test_send_channel_message_on_no_channel() {
3893        let mut mgr = LinkManager::new();
3894        let mut rng = OsRng;
3895        let dummy_sig = [0xAA; 32];
3896        let (link_id, _) =
3897            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3898
3899        // Link is Pending (no channel), should return empty
3900        let err = mgr
3901            .send_channel_message(&link_id, 1, b"test", &mut rng)
3902            .expect_err("pending link should reject channel send");
3903        assert_eq!(err, "link has no active channel");
3904    }
3905
3906    #[test]
3907    fn test_send_on_link_requires_active() {
3908        let mut mgr = LinkManager::new();
3909        let mut rng = OsRng;
3910        let dummy_sig = [0xAA; 32];
3911        let (link_id, _) =
3912            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3913
3914        let actions = mgr.send_on_link(&link_id, b"test", constants::CONTEXT_NONE, &mut rng);
3915        assert!(actions.is_empty(), "Cannot send on pending link");
3916    }
3917
3918    #[test]
3919    fn test_send_on_link_unknown_link() {
3920        let mgr = LinkManager::new();
3921        let mut rng = OsRng;
3922
3923        let actions = mgr.send_on_link(&[0xFF; 16], b"test", constants::CONTEXT_NONE, &mut rng);
3924        assert!(actions.is_empty());
3925    }
3926
3927    #[test]
3928    fn test_resource_full_transfer_large() {
3929        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3930        let mut rng = OsRng;
3931
3932        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3933
3934        // Multi-part data (larger than a single SDU of 464 bytes)
3935        let original_data: Vec<u8> = (0..2000u32)
3936            .map(|i| {
3937                let pos = i as usize;
3938                (pos ^ (pos >> 8) ^ (pos >> 16)) as u8
3939            })
3940            .collect();
3941
3942        let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
3943
3944        let mut pending: Vec<(char, LinkManagerAction)> =
3945            adv_actions.into_iter().map(|a| ('i', a)).collect();
3946        let mut rounds = 0;
3947        let max_rounds = 200;
3948        let mut resource_received = false;
3949        let mut sender_completed = false;
3950
3951        while !pending.is_empty() && rounds < max_rounds {
3952            rounds += 1;
3953            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
3954
3955            for (source, action) in pending.drain(..) {
3956                if let LinkManagerAction::SendPacket { raw, .. } = action {
3957                    let pkt = match RawPacket::unpack(&raw) {
3958                        Ok(p) => p,
3959                        Err(_) => continue,
3960                    };
3961
3962                    let target_actions = if source == 'i' {
3963                        resp_mgr.handle_local_delivery(
3964                            pkt.destination_hash,
3965                            &raw,
3966                            pkt.packet_hash,
3967                            rns_core::transport::types::InterfaceId(0),
3968                            &mut rng,
3969                        )
3970                    } else {
3971                        init_mgr.handle_local_delivery(
3972                            pkt.destination_hash,
3973                            &raw,
3974                            pkt.packet_hash,
3975                            rns_core::transport::types::InterfaceId(0),
3976                            &mut rng,
3977                        )
3978                    };
3979
3980                    let target_source = if source == 'i' { 'r' } else { 'i' };
3981                    for a in &target_actions {
3982                        match a {
3983                            LinkManagerAction::ResourceReceived { data, .. } => {
3984                                assert_eq!(*data, original_data);
3985                                resource_received = true;
3986                            }
3987                            LinkManagerAction::ResourceCompleted { .. } => {
3988                                sender_completed = true;
3989                            }
3990                            _ => {}
3991                        }
3992                    }
3993                    next.extend(target_actions.into_iter().map(|a| (target_source, a)));
3994                }
3995            }
3996            pending = next;
3997        }
3998
3999        assert!(
4000            resource_received,
4001            "Should receive large resource (rounds={})",
4002            rounds
4003        );
4004        assert!(
4005            sender_completed,
4006            "Sender should complete (rounds={})",
4007            rounds
4008        );
4009    }
4010
4011    #[test]
4012    fn test_process_resource_actions_mapping() {
4013        let (init_mgr, _resp_mgr, link_id) = setup_active_link();
4014        let mut rng = OsRng;
4015
4016        // Test that various ResourceActions map to correct LinkManagerActions
4017        let actions = vec![
4018            ResourceAction::DataReceived {
4019                data: vec![1, 2, 3],
4020                metadata: Some(vec![4, 5]),
4021            },
4022            ResourceAction::Completed,
4023            ResourceAction::Failed(rns_core::resource::ResourceError::Timeout),
4024            ResourceAction::ProgressUpdate {
4025                received: 10,
4026                total: 20,
4027            },
4028        ];
4029
4030        let result = init_mgr.process_resource_actions(&link_id, actions, &mut rng);
4031
4032        assert!(matches!(
4033            result[0],
4034            LinkManagerAction::ResourceReceived { .. }
4035        ));
4036        assert!(matches!(
4037            result[1],
4038            LinkManagerAction::ResourceCompleted { .. }
4039        ));
4040        assert!(matches!(
4041            result[2],
4042            LinkManagerAction::ResourceFailed { .. }
4043        ));
4044        assert!(matches!(
4045            result[3],
4046            LinkManagerAction::ResourceProgress {
4047                received: 10,
4048                total: 20,
4049                ..
4050            }
4051        ));
4052    }
4053
4054    #[test]
4055    fn test_link_state_empty() {
4056        let mgr = LinkManager::new();
4057        let fake_id = [0xAA; 16];
4058        assert!(mgr.link_state(&fake_id).is_none());
4059    }
4060
4061    #[test]
4062    fn test_large_response_resource_completes_as_response() {
4063        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4064        let mut rng = OsRng;
4065
4066        let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
4067        let response_value =
4068            rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(large_payload));
4069        resp_mgr.register_request_handler("/large", None, {
4070            let response_value = response_value.clone();
4071            move |_link_id, _path, _data, _remote| Some(response_value.clone())
4072        });
4073
4074        let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
4075        let req_raw = extract_any_send_packet(&req_actions);
4076        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
4077        let request_id = req_pkt.get_truncated_hash();
4078        let resp_actions = resp_mgr.handle_local_delivery(
4079            req_pkt.destination_hash,
4080            &req_raw,
4081            req_pkt.packet_hash,
4082            rns_core::transport::types::InterfaceId(0),
4083            &mut rng,
4084        );
4085
4086        let mut pending: Vec<(char, LinkManagerAction)> =
4087            resp_actions.into_iter().map(|a| ('r', a)).collect();
4088        let mut rounds = 0;
4089        let mut received_response = None;
4090
4091        while !pending.is_empty() && rounds < 200 {
4092            rounds += 1;
4093            let mut next = Vec::new();
4094
4095            for (source, action) in pending.drain(..) {
4096                let LinkManagerAction::SendPacket { raw, .. } = action else {
4097                    continue;
4098                };
4099                let pkt = RawPacket::unpack(&raw).unwrap();
4100                let target_actions = if source == 'r' {
4101                    init_mgr.handle_local_delivery(
4102                        pkt.destination_hash,
4103                        &raw,
4104                        pkt.packet_hash,
4105                        rns_core::transport::types::InterfaceId(0),
4106                        &mut rng,
4107                    )
4108                } else {
4109                    resp_mgr.handle_local_delivery(
4110                        pkt.destination_hash,
4111                        &raw,
4112                        pkt.packet_hash,
4113                        rns_core::transport::types::InterfaceId(0),
4114                        &mut rng,
4115                    )
4116                };
4117
4118                let target_source = if source == 'r' { 'i' } else { 'r' };
4119                for target_action in &target_actions {
4120                    match target_action {
4121                        LinkManagerAction::ResponseReceived {
4122                            request_id: rid,
4123                            data,
4124                            ..
4125                        } => {
4126                            received_response = Some((*rid, data.clone()));
4127                        }
4128                        LinkManagerAction::ResourceReceived { .. } => {
4129                            panic!("response resources must complete as ResponseReceived")
4130                        }
4131                        LinkManagerAction::ResourceAcceptQuery { .. } => {
4132                            panic!("response resources must bypass application acceptance")
4133                        }
4134                        _ => {}
4135                    }
4136                }
4137                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4138            }
4139
4140            pending = next;
4141        }
4142
4143        let (received_request_id, received_data) = received_response.unwrap_or_else(|| {
4144            panic!(
4145                "large response resource did not complete as ResponseReceived after {} rounds",
4146                rounds
4147            )
4148        });
4149        assert_eq!(received_request_id, request_id);
4150        assert_eq!(received_data, response_value);
4151    }
4152
4153    #[test]
4154    fn test_negotiated_mtu_response_uses_resource_before_global_mtu() {
4155        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4156        let mut rng = OsRng;
4157
4158        init_mgr.set_link_mtu(&link_id, 300);
4159        resp_mgr.set_link_mtu(&link_id, 300);
4160
4161        let payload = vec![0xAB; 350];
4162        let response_value =
4163            rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
4164        resp_mgr.register_request_handler("/mtu", None, {
4165            let response_value = response_value.clone();
4166            move |_link_id, _path, _data, _remote| Some(response_value.clone())
4167        });
4168
4169        let req_actions = init_mgr.send_request(&link_id, "/mtu", b"\xc0", &mut rng);
4170        let req_raw = extract_any_send_packet(&req_actions);
4171        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
4172        let resp_actions = resp_mgr.handle_local_delivery(
4173            req_pkt.destination_hash,
4174            &req_raw,
4175            req_pkt.packet_hash,
4176            rns_core::transport::types::InterfaceId(0),
4177            &mut rng,
4178        );
4179
4180        let mut has_resource_adv = false;
4181        let mut direct_response_len = None;
4182        for action in &resp_actions {
4183            if let LinkManagerAction::SendPacket { raw, .. } = action {
4184                let pkt = RawPacket::unpack(raw).unwrap();
4185                has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
4186                if pkt.context == constants::CONTEXT_RESPONSE {
4187                    direct_response_len = Some(raw.len());
4188                }
4189            }
4190        }
4191
4192        assert!(
4193            has_resource_adv,
4194            "responses larger than the negotiated link MTU should use resource fallback"
4195        );
4196        assert!(
4197            direct_response_len.is_none(),
4198            "sent direct response of {} bytes on a 300 byte negotiated MTU",
4199            direct_response_len.unwrap_or_default()
4200        );
4201    }
4202
4203    #[test]
4204    fn test_large_management_response_uses_resource_fallback() {
4205        let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4206        let mut rng = OsRng;
4207
4208        let payload = vec![0xBC; 5000];
4209        let response_value =
4210            rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
4211        let actions = resp_mgr.send_management_response(
4212            &link_id,
4213            &[0x55; 16],
4214            &response_value,
4215            &mut rng,
4216        );
4217
4218        let mut has_resource_adv = false;
4219        let mut has_direct_response = false;
4220        for action in &actions {
4221            if let LinkManagerAction::SendPacket { raw, .. } = action {
4222                let pkt = RawPacket::unpack(raw).unwrap();
4223                has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
4224                has_direct_response |= pkt.context == constants::CONTEXT_RESPONSE;
4225            }
4226        }
4227
4228        assert!(
4229            has_resource_adv,
4230            "large management responses should advertise a response resource"
4231        );
4232        assert!(
4233            !has_direct_response,
4234            "large management responses should not use a direct CONTEXT_RESPONSE packet"
4235        );
4236    }
4237}