Skip to main content

rns_net/common/
link_manager.rs

1//! Link manager: wires rns-core LinkEngine + Channel + Resource into the driver.
2//!
3//! Manages multiple concurrent links, link destination registration,
4//! request/response handling, resource transfers, and full lifecycle
5//! (handshake → active → teardown).
6//!
7//! Python reference: Link.py, RequestReceipt.py, Resource.py
8
9use std::collections::HashMap;
10
11use super::compressor::Bzip2Compressor;
12use rns_core::channel::{Channel, Sequence};
13use rns_core::constants;
14use rns_core::link::types::{LinkId, LinkState, TeardownReason};
15use rns_core::link::{LinkAction, LinkEngine, LinkMode};
16use rns_core::packet::{PacketFlags, RawPacket};
17use rns_core::resource::{ResourceAction, ResourceReceiver, ResourceSender};
18use rns_crypto::ed25519::Ed25519PrivateKey;
19use rns_crypto::Rng;
20
21use super::time;
22
/// Resource acceptance strategy.
///
/// The default strategy is [`ResourceStrategy::AcceptNone`], i.e. incoming
/// resources are rejected unless a destination opts in explicitly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ResourceStrategy {
    /// Reject all incoming resources.
    #[default]
    AcceptNone,
    /// Accept all incoming resources automatically.
    AcceptAll,
    /// Query the application callback for each resource.
    AcceptApp,
}
39
40/// A managed link wrapping LinkEngine + optional Channel + resources.
41struct ManagedLink {
42    engine: LinkEngine,
43    channel: Option<Channel>,
44    pending_channel_packets: HashMap<[u8; 32], Sequence>,
45    channel_send_ok: u64,
46    channel_send_not_ready: u64,
47    channel_send_too_big: u64,
48    channel_send_other_error: u64,
49    channel_messages_received: u64,
50    channel_proofs_sent: u64,
51    channel_proofs_received: u64,
52    /// Destination hash this link belongs to.
53    dest_hash: [u8; 16],
54    /// Remote identity (hash, public_key) once identified.
55    remote_identity: Option<([u8; 16], [u8; 64])>,
56    /// Destination's Ed25519 signing public key (for initiator to verify LRPROOF).
57    dest_sig_pub_bytes: Option<[u8; 32]>,
58    /// Active incoming resource transfers.
59    incoming_resources: Vec<ResourceReceiver>,
60    /// Active outgoing resource transfers.
61    outgoing_resources: Vec<ResourceSender>,
62    /// Resource acceptance strategy.
63    resource_strategy: ResourceStrategy,
64    /// Interface this link's packets should be sent on when known.
65    route_interface: Option<rns_core::transport::types::InterfaceId>,
66    /// Next-hop transport ID seen on inbound HEADER_2 link traffic.
67    ///
68    /// When present, outbound link packets can be rewritten to HEADER_2 using
69    /// this transport ID to preserve multi-hop routing.
70    route_transport_id: Option<[u8; 16]>,
71}
72
73/// A registered link destination that can accept incoming LINKREQUEST.
74struct LinkDestination {
75    sig_prv: Ed25519PrivateKey,
76    sig_pub_bytes: [u8; 32],
77    resource_strategy: ResourceStrategy,
78}
79
80/// A registered request handler for a path.
81struct RequestHandlerEntry {
82    /// The path this handler serves (e.g. "/status").
83    path: String,
84    /// The truncated hash of the path (first 16 bytes of SHA-256).
85    path_hash: [u8; 16],
86    /// Access control: None means allow all, Some(list) means allow only listed identities.
87    allowed_list: Option<Vec<[u8; 16]>>,
88    /// Handler function: (link_id, path, request_id, data, remote_identity) -> Option<response_data>.
89    handler:
90        Box<dyn Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>> + Send>,
91}
92
93/// Actions produced by LinkManager for the driver to dispatch.
94#[derive(Debug)]
95pub enum LinkManagerAction {
96    /// Send a packet via the transport engine outbound path.
97    SendPacket {
98        raw: Vec<u8>,
99        dest_type: u8,
100        attached_interface: Option<rns_core::transport::types::InterfaceId>,
101    },
102    /// Link established — notify callbacks.
103    LinkEstablished {
104        link_id: LinkId,
105        dest_hash: [u8; 16],
106        rtt: f64,
107        is_initiator: bool,
108    },
109    /// Link closed — notify callbacks.
110    LinkClosed {
111        link_id: LinkId,
112        reason: Option<TeardownReason>,
113    },
114    /// Remote peer identified — notify callbacks.
115    RemoteIdentified {
116        link_id: LinkId,
117        identity_hash: [u8; 16],
118        public_key: [u8; 64],
119    },
120    /// Register a link_id as local destination in transport (for receiving link data).
121    RegisterLinkDest { link_id: LinkId },
122    /// Deregister a link_id from transport local destinations.
123    DeregisterLinkDest { link_id: LinkId },
124    /// A management request that needs to be handled by the driver.
125    /// The driver has access to engine state needed to build the response.
126    ManagementRequest {
127        link_id: LinkId,
128        path_hash: [u8; 16],
129        /// The request data (msgpack-encoded Value from the request array).
130        data: Vec<u8>,
131        /// The request_id (truncated hash of the packed request).
132        request_id: [u8; 16],
133        remote_identity: Option<([u8; 16], [u8; 64])>,
134    },
135    /// Resource data fully received and assembled.
136    ResourceReceived {
137        link_id: LinkId,
138        data: Vec<u8>,
139        metadata: Option<Vec<u8>>,
140    },
141    /// Resource transfer completed (proof validated on sender side).
142    ResourceCompleted { link_id: LinkId },
143    /// Resource transfer failed.
144    ResourceFailed { link_id: LinkId, error: String },
145    /// Resource transfer progress update.
146    ResourceProgress {
147        link_id: LinkId,
148        received: usize,
149        total: usize,
150    },
151    /// Query application whether to accept an incoming resource (for AcceptApp strategy).
152    ResourceAcceptQuery {
153        link_id: LinkId,
154        resource_hash: Vec<u8>,
155        transfer_size: u64,
156        has_metadata: bool,
157    },
158    /// Channel message received on a link.
159    ChannelMessageReceived {
160        link_id: LinkId,
161        msgtype: u16,
162        payload: Vec<u8>,
163    },
164    /// Generic link data received (CONTEXT_NONE).
165    LinkDataReceived {
166        link_id: LinkId,
167        context: u8,
168        data: Vec<u8>,
169    },
170    /// Response received on a link.
171    ResponseReceived {
172        link_id: LinkId,
173        request_id: [u8; 16],
174        data: Vec<u8>,
175    },
176    /// A link request was received (for hook notification).
177    LinkRequestReceived {
178        link_id: LinkId,
179        receiving_interface: rns_core::transport::types::InterfaceId,
180    },
181}
182
183/// Manages multiple links, link destinations, and request/response.
184pub struct LinkManager {
185    links: HashMap<LinkId, ManagedLink>,
186    link_destinations: HashMap<[u8; 16], LinkDestination>,
187    request_handlers: Vec<RequestHandlerEntry>,
188    /// Path hashes that should be handled externally (by the driver) rather than
189    /// by registered handler closures. Used for management destinations.
190    management_paths: Vec<[u8; 16]>,
191}
192
193impl LinkManager {
194    fn resource_sdu_for_link(link: &ManagedLink) -> usize {
195        // Python parity: Resource.sdu = link.mtu - HEADER_MAXSIZE - IFAC_MIN_SIZE
196        // when MTU signalling is available on the link.
197        let mtu = link.engine.mtu() as usize;
198        let derived = mtu.saturating_sub(constants::HEADER_MAXSIZE + constants::IFAC_MIN_SIZE);
199        if derived > 0 {
200            derived
201        } else {
202            constants::RESOURCE_SDU
203        }
204    }
205
206    /// Create a new empty link manager.
207    pub fn new() -> Self {
208        LinkManager {
209            links: HashMap::new(),
210            link_destinations: HashMap::new(),
211            request_handlers: Vec::new(),
212            management_paths: Vec::new(),
213        }
214    }
215
216    /// Register a path hash as a management path.
217    /// Management requests are returned as ManagementRequest actions
218    /// for the driver to handle (since they need access to engine state).
219    pub fn register_management_path(&mut self, path_hash: [u8; 16]) {
220        if !self.management_paths.contains(&path_hash) {
221            self.management_paths.push(path_hash);
222        }
223    }
224
225    /// Get the derived session key for a link (needed for hole-punch token derivation).
226    pub fn get_derived_key(&self, link_id: &LinkId) -> Option<Vec<u8>> {
227        self.links
228            .get(link_id)
229            .and_then(|link| link.engine.derived_key().map(|dk| dk.to_vec()))
230    }
231
232    /// Return best-known routing hint for link packets.
233    pub fn get_link_route_hint(
234        &self,
235        link_id: &LinkId,
236    ) -> Option<(rns_core::transport::types::InterfaceId, Option<[u8; 16]>)> {
237        self.links.get(link_id).and_then(|link| {
238            link.route_interface
239                .map(|iface| (iface, link.route_transport_id))
240        })
241    }
242
243    /// Register a destination that can accept incoming links.
244    pub fn register_link_destination(
245        &mut self,
246        dest_hash: [u8; 16],
247        sig_prv: Ed25519PrivateKey,
248        sig_pub_bytes: [u8; 32],
249        resource_strategy: ResourceStrategy,
250    ) {
251        self.link_destinations.insert(
252            dest_hash,
253            LinkDestination {
254                sig_prv,
255                sig_pub_bytes,
256                resource_strategy,
257            },
258        );
259    }
260
261    /// Deregister a link destination.
262    pub fn deregister_link_destination(&mut self, dest_hash: &[u8; 16]) {
263        self.link_destinations.remove(dest_hash);
264    }
265
266    /// Register a request handler for a given path.
267    ///
268    /// `path`: the request path string (e.g. "/status")
269    /// `allowed_list`: None = allow all, Some(list) = restrict to these identity hashes
270    /// `handler`: called with (link_id, path, request_data, remote_identity) -> Option<response>
271    pub fn register_request_handler<F>(
272        &mut self,
273        path: &str,
274        allowed_list: Option<Vec<[u8; 16]>>,
275        handler: F,
276    ) where
277        F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>>
278            + Send
279            + 'static,
280    {
281        let path_hash = compute_path_hash(path);
282        self.request_handlers.push(RequestHandlerEntry {
283            path: path.to_string(),
284            path_hash,
285            allowed_list,
286            handler: Box::new(handler),
287        });
288    }
289
290    /// Create an outbound link to a destination.
291    ///
292    /// `dest_sig_pub_bytes` is the destination's Ed25519 signing public key
293    /// (needed to verify LRPROOF). In Python this comes from the Destination's Identity.
294    ///
295    /// Returns `(link_id, actions)`. The first action will be a SendPacket with
296    /// the LINKREQUEST.
297    pub fn create_link(
298        &mut self,
299        dest_hash: &[u8; 16],
300        dest_sig_pub_bytes: &[u8; 32],
301        hops: u8,
302        mtu: u32,
303        rng: &mut dyn Rng,
304    ) -> (LinkId, Vec<LinkManagerAction>) {
305        let mode = LinkMode::Aes256Cbc;
306        let (mut engine, request_data) =
307            LinkEngine::new_initiator(dest_hash, hops, mode, Some(mtu), time::now(), rng);
308
309        // Build the LINKREQUEST packet to compute link_id
310        let flags = PacketFlags {
311            header_type: constants::HEADER_1,
312            context_flag: constants::FLAG_UNSET,
313            transport_type: constants::TRANSPORT_BROADCAST,
314            destination_type: constants::DESTINATION_SINGLE,
315            packet_type: constants::PACKET_TYPE_LINKREQUEST,
316        };
317
318        let packet = match RawPacket::pack(
319            flags,
320            0,
321            dest_hash,
322            None,
323            constants::CONTEXT_NONE,
324            &request_data,
325        ) {
326            Ok(p) => p,
327            Err(_) => {
328                // Should not happen with valid data
329                return ([0u8; 16], Vec::new());
330            }
331        };
332
333        engine.set_link_id_from_hashable(&packet.get_hashable_part(), request_data.len());
334        let link_id = *engine.link_id();
335
336        let managed = ManagedLink {
337            engine,
338            channel: None,
339            pending_channel_packets: HashMap::new(),
340            channel_send_ok: 0,
341            channel_send_not_ready: 0,
342            channel_send_too_big: 0,
343            channel_send_other_error: 0,
344            channel_messages_received: 0,
345            channel_proofs_sent: 0,
346            channel_proofs_received: 0,
347            dest_hash: *dest_hash,
348            remote_identity: None,
349            dest_sig_pub_bytes: Some(*dest_sig_pub_bytes),
350            incoming_resources: Vec::new(),
351            outgoing_resources: Vec::new(),
352            resource_strategy: ResourceStrategy::default(),
353            route_interface: None,
354            route_transport_id: None,
355        };
356        self.links.insert(link_id, managed);
357
358        let mut actions = Vec::new();
359        // Register the link_id as a local destination so we can receive LRPROOF
360        actions.push(LinkManagerAction::RegisterLinkDest { link_id });
361        // Send the LINKREQUEST packet
362        actions.push(LinkManagerAction::SendPacket {
363            raw: packet.raw,
364            dest_type: constants::DESTINATION_LINK,
365            attached_interface: None,
366        });
367
368        (link_id, actions)
369    }
370
371    /// Handle a packet delivered locally (via DeliverLocal).
372    ///
373    /// Returns actions for the driver to dispatch. The `dest_hash` is the
374    /// packet's destination_hash field. `raw` is the full packet bytes.
375    /// `packet_hash` is the SHA-256 hash.
376    pub fn handle_local_delivery(
377        &mut self,
378        dest_hash: [u8; 16],
379        raw: &[u8],
380        packet_hash: [u8; 32],
381        receiving_interface: rns_core::transport::types::InterfaceId,
382        rng: &mut dyn Rng,
383    ) -> Vec<LinkManagerAction> {
384        let packet = match RawPacket::unpack(raw) {
385            Ok(p) => p,
386            Err(_) => return Vec::new(),
387        };
388
389        match packet.flags.packet_type {
390            constants::PACKET_TYPE_LINKREQUEST => {
391                self.handle_linkrequest(&dest_hash, &packet, receiving_interface, rng)
392            }
393            constants::PACKET_TYPE_PROOF if packet.context == constants::CONTEXT_LRPROOF => {
394                // LRPROOF: dest_hash is the link_id
395                self.handle_lrproof(&dest_hash, &packet, receiving_interface, rng)
396            }
397            constants::PACKET_TYPE_PROOF => self.handle_link_proof(&dest_hash, &packet, rng),
398            constants::PACKET_TYPE_DATA => {
399                self.handle_link_data(&dest_hash, &packet, packet_hash, receiving_interface, rng)
400            }
401            _ => Vec::new(),
402        }
403    }
404
405    /// Handle an incoming LINKREQUEST packet.
406    fn handle_linkrequest(
407        &mut self,
408        dest_hash: &[u8; 16],
409        packet: &RawPacket,
410        receiving_interface: rns_core::transport::types::InterfaceId,
411        rng: &mut dyn Rng,
412    ) -> Vec<LinkManagerAction> {
413        // Look up the link destination
414        let ld = match self.link_destinations.get(dest_hash) {
415            Some(ld) => ld,
416            None => return Vec::new(),
417        };
418
419        let hashable = packet.get_hashable_part();
420        let now = time::now();
421
422        // Create responder engine
423        let (engine, lrproof_data) = match LinkEngine::new_responder(
424            &ld.sig_prv,
425            &ld.sig_pub_bytes,
426            &packet.data,
427            &hashable,
428            dest_hash,
429            packet.hops,
430            now,
431            rng,
432        ) {
433            Ok(r) => r,
434            Err(e) => {
435                log::debug!("LINKREQUEST rejected: {}", e);
436                return Vec::new();
437            }
438        };
439
440        let link_id = *engine.link_id();
441        log::debug!(
442            "LINKREQUEST accepted: link={:02x?} iface={} header_type={} transport_id_present={} hops={}",
443            &link_id[..4],
444            receiving_interface.0,
445            packet.flags.header_type,
446            packet.transport_id.is_some(),
447            packet.hops
448        );
449
450        let managed = ManagedLink {
451            engine,
452            channel: None,
453            pending_channel_packets: HashMap::new(),
454            channel_send_ok: 0,
455            channel_send_not_ready: 0,
456            channel_send_too_big: 0,
457            channel_send_other_error: 0,
458            channel_messages_received: 0,
459            channel_proofs_sent: 0,
460            channel_proofs_received: 0,
461            dest_hash: *dest_hash,
462            remote_identity: None,
463            dest_sig_pub_bytes: None,
464            incoming_resources: Vec::new(),
465            outgoing_resources: Vec::new(),
466            resource_strategy: ld.resource_strategy,
467            route_interface: Some(receiving_interface),
468            route_transport_id: if packet.flags.header_type == constants::HEADER_2 {
469                packet.transport_id
470            } else {
471                None
472            },
473        };
474        self.links.insert(link_id, managed);
475
476        // Build LRPROOF packet: type=PROOF, context=LRPROOF, dest=link_id
477        let flags = PacketFlags {
478            header_type: constants::HEADER_1,
479            context_flag: constants::FLAG_UNSET,
480            transport_type: constants::TRANSPORT_BROADCAST,
481            destination_type: constants::DESTINATION_LINK,
482            packet_type: constants::PACKET_TYPE_PROOF,
483        };
484
485        let mut actions = Vec::new();
486
487        // Register link_id as local destination so we receive link data
488        actions.push(LinkManagerAction::RegisterLinkDest { link_id });
489
490        if let Ok(pkt) = RawPacket::pack(
491            flags,
492            packet.hops,
493            &link_id,
494            None,
495            constants::CONTEXT_LRPROOF,
496            &lrproof_data,
497        ) {
498            log::debug!(
499                "LRPROOF queued: link={:02x?} route_iface={} route_tid_present={} hops={}",
500                &link_id[..4],
501                receiving_interface.0,
502                packet.transport_id.is_some(),
503                packet.hops
504            );
505            actions.push(LinkManagerAction::SendPacket {
506                raw: pkt.raw,
507                dest_type: constants::DESTINATION_LINK,
508                attached_interface: None,
509            });
510        }
511
512        // Reticulum interop fallback #1: queue an LRPROOF variant with
513        // hop=0, matching peers that validate LRPROOF with hop=0 semantics.
514        if packet.hops != 0 {
515            if let Ok(pkt0) = RawPacket::pack(
516                flags,
517                0,
518                &link_id,
519                None,
520                constants::CONTEXT_LRPROOF,
521                &lrproof_data,
522            ) {
523                log::debug!(
524                    "LRPROOF fallback queued: link={:02x?} route_iface={} hops=0",
525                    &link_id[..4],
526                    receiving_interface.0
527                );
528                actions.push(LinkManagerAction::SendPacket {
529                    raw: pkt0.raw,
530                    dest_type: constants::DESTINATION_LINK,
531                    attached_interface: None,
532                });
533            }
534        }
535
536        // Reticulum interop fallback #2: queue an LRPROOF +1 hop variant for
537        // peers that validate against a remaining-hop value derived
538        // differently from destination-side delivery hops.
539        if packet.hops < u8::MAX {
540            let hops_plus_one = packet.hops + 1;
541            if let Ok(pkt2) = RawPacket::pack(
542                flags,
543                hops_plus_one,
544                &link_id,
545                None,
546                constants::CONTEXT_LRPROOF,
547                &lrproof_data,
548            ) {
549                log::debug!(
550                    "LRPROOF +1 queued: link={:02x?} route_iface={} hops={}",
551                    &link_id[..4],
552                    receiving_interface.0,
553                    hops_plus_one
554                );
555                actions.push(LinkManagerAction::SendPacket {
556                    raw: pkt2.raw,
557                    dest_type: constants::DESTINATION_LINK,
558                    attached_interface: None,
559                });
560            }
561        }
562
563        // Notify hook system about the incoming link request
564        actions.push(LinkManagerAction::LinkRequestReceived {
565            link_id,
566            receiving_interface,
567        });
568
569        actions
570    }
571
572    fn handle_link_proof(
573        &mut self,
574        link_id: &LinkId,
575        packet: &RawPacket,
576        rng: &mut dyn Rng,
577    ) -> Vec<LinkManagerAction> {
578        if packet.data.len() < 32 {
579            return Vec::new();
580        }
581
582        let mut tracked_hash = [0u8; 32];
583        tracked_hash.copy_from_slice(&packet.data[..32]);
584
585        let Some(link) = self.links.get_mut(link_id) else {
586            return Vec::new();
587        };
588        let Some(sequence) = link.pending_channel_packets.remove(&tracked_hash) else {
589            return Vec::new();
590        };
591        link.channel_proofs_received += 1;
592        let Some(channel) = link.channel.as_mut() else {
593            return Vec::new();
594        };
595
596        let chan_actions = channel.packet_delivered(sequence);
597        let _ = channel;
598        let _ = link;
599        self.process_channel_actions(link_id, chan_actions, rng)
600    }
601
602    fn build_link_packet_proof(
603        &mut self,
604        link_id: &LinkId,
605        packet_hash: &[u8; 32],
606    ) -> Vec<LinkManagerAction> {
607        let dest_hash = match self.links.get(link_id) {
608            Some(link) => link.dest_hash,
609            None => return Vec::new(),
610        };
611        let Some(ld) = self.link_destinations.get(&dest_hash) else {
612            return Vec::new();
613        };
614        if let Some(link) = self.links.get_mut(link_id) {
615            link.channel_proofs_sent += 1;
616        }
617
618        let signature = ld.sig_prv.sign(packet_hash);
619        let mut proof_data = Vec::with_capacity(96);
620        proof_data.extend_from_slice(packet_hash);
621        proof_data.extend_from_slice(&signature);
622
623        let flags = PacketFlags {
624            header_type: constants::HEADER_1,
625            context_flag: constants::FLAG_UNSET,
626            transport_type: constants::TRANSPORT_BROADCAST,
627            destination_type: constants::DESTINATION_LINK,
628            packet_type: constants::PACKET_TYPE_PROOF,
629        };
630        if let Ok(pkt) = RawPacket::pack(
631            flags,
632            0,
633            link_id,
634            None,
635            constants::CONTEXT_NONE,
636            &proof_data,
637        ) {
638            vec![LinkManagerAction::SendPacket {
639                raw: pkt.raw,
640                dest_type: constants::DESTINATION_LINK,
641                attached_interface: None,
642            }]
643        } else {
644            Vec::new()
645        }
646    }
647
648    /// Handle an incoming LRPROOF packet (initiator side).
649    fn handle_lrproof(
650        &mut self,
651        link_id_bytes: &[u8; 16],
652        packet: &RawPacket,
653        receiving_interface: rns_core::transport::types::InterfaceId,
654        rng: &mut dyn Rng,
655    ) -> Vec<LinkManagerAction> {
656        let link = match self.links.get_mut(link_id_bytes) {
657            Some(l) => l,
658            None => return Vec::new(),
659        };
660
661        link.route_interface = Some(receiving_interface);
662        if packet.flags.header_type == constants::HEADER_2 {
663            if let Some(transport_id) = packet.transport_id {
664                link.route_transport_id = Some(transport_id);
665            }
666        }
667        log::debug!(
668            "LRPROOF received: link={:02x?} iface={} header_type={} transport_id_present={}",
669            &link_id_bytes[..4],
670            receiving_interface.0,
671            packet.flags.header_type,
672            packet.transport_id.is_some()
673        );
674
675        if link.engine.state() != LinkState::Pending || !link.engine.is_initiator() {
676            return Vec::new();
677        }
678
679        // The destination's signing pub key was stored when create_link was called
680        let dest_sig_pub_bytes = match link.dest_sig_pub_bytes {
681            Some(b) => b,
682            None => {
683                log::debug!("LRPROOF: no destination signing key available");
684                return Vec::new();
685            }
686        };
687
688        let now = time::now();
689        let (lrrtt_encrypted, link_actions) =
690            match link
691                .engine
692                .handle_lrproof(&packet.data, &dest_sig_pub_bytes, now, rng)
693            {
694                Ok(r) => r,
695                Err(e) => {
696                    log::debug!("LRPROOF validation failed: {}", e);
697                    return Vec::new();
698                }
699            };
700
701        let link_id = *link.engine.link_id();
702        let mut actions = Vec::new();
703
704        // Process link actions (StateChanged, LinkEstablished)
705        actions.extend(self.process_link_actions(&link_id, &link_actions));
706
707        // Send LRRTT: type=DATA, context=LRRTT, dest=link_id
708        let flags = PacketFlags {
709            header_type: constants::HEADER_1,
710            context_flag: constants::FLAG_UNSET,
711            transport_type: constants::TRANSPORT_BROADCAST,
712            destination_type: constants::DESTINATION_LINK,
713            packet_type: constants::PACKET_TYPE_DATA,
714        };
715
716        if let Ok(pkt) = RawPacket::pack(
717            flags,
718            0,
719            &link_id,
720            None,
721            constants::CONTEXT_LRRTT,
722            &lrrtt_encrypted,
723        ) {
724            actions.push(LinkManagerAction::SendPacket {
725                raw: pkt.raw,
726                dest_type: constants::DESTINATION_LINK,
727                attached_interface: None,
728            });
729        }
730
731        // Initialize channel now that link is active
732        if let Some(link) = self.links.get_mut(&link_id) {
733            if link.engine.state() == LinkState::Active {
734                let rtt = link.engine.rtt().unwrap_or(1.0);
735                link.channel = Some(Channel::new(rtt));
736            }
737        }
738
739        actions
740    }
741
742    /// Handle DATA packets on an established link.
743    ///
744    /// Structured to avoid borrow checker issues: we perform engine operations
745    /// on the link, collect intermediate results, drop the mutable borrow, then
746    /// call self methods that need immutable access.
747    fn handle_link_data(
748        &mut self,
749        link_id_bytes: &[u8; 16],
750        packet: &RawPacket,
751        packet_hash: [u8; 32],
752        receiving_interface: rns_core::transport::types::InterfaceId,
753        rng: &mut dyn Rng,
754    ) -> Vec<LinkManagerAction> {
755        // First pass: perform engine operations, collect results
756        enum LinkDataResult {
757            Lrrtt {
758                link_id: LinkId,
759                link_actions: Vec<LinkAction>,
760            },
761            Identify {
762                link_id: LinkId,
763                link_actions: Vec<LinkAction>,
764            },
765            Keepalive {
766                link_id: LinkId,
767                inbound_actions: Vec<LinkAction>,
768            },
769            LinkClose {
770                link_id: LinkId,
771                teardown_actions: Vec<LinkAction>,
772            },
773            Channel {
774                link_id: LinkId,
775                inbound_actions: Vec<LinkAction>,
776                plaintext: Vec<u8>,
777                packet_hash: [u8; 32],
778            },
779            Request {
780                link_id: LinkId,
781                inbound_actions: Vec<LinkAction>,
782                plaintext: Vec<u8>,
783                request_id: [u8; 16],
784            },
785            Response {
786                link_id: LinkId,
787                inbound_actions: Vec<LinkAction>,
788                plaintext: Vec<u8>,
789            },
790            Generic {
791                link_id: LinkId,
792                inbound_actions: Vec<LinkAction>,
793                plaintext: Vec<u8>,
794                context: u8,
795                packet_hash: [u8; 32],
796            },
797            /// Resource advertisement (link-decrypted).
798            ResourceAdv {
799                link_id: LinkId,
800                inbound_actions: Vec<LinkAction>,
801                plaintext: Vec<u8>,
802            },
803            /// Resource part request (link-decrypted).
804            ResourceReq {
805                link_id: LinkId,
806                inbound_actions: Vec<LinkAction>,
807                plaintext: Vec<u8>,
808            },
809            /// Resource hashmap update (link-decrypted).
810            ResourceHmu {
811                link_id: LinkId,
812                inbound_actions: Vec<LinkAction>,
813                plaintext: Vec<u8>,
814            },
815            /// Resource part data (NOT link-decrypted; parts are pre-encrypted by ResourceSender).
816            ResourcePart {
817                link_id: LinkId,
818                inbound_actions: Vec<LinkAction>,
819                raw_data: Vec<u8>,
820            },
821            /// Resource proof (feed to sender).
822            ResourcePrf {
823                link_id: LinkId,
824                inbound_actions: Vec<LinkAction>,
825                plaintext: Vec<u8>,
826            },
827            /// Resource cancel from initiator (link-decrypted).
828            ResourceIcl {
829                link_id: LinkId,
830                inbound_actions: Vec<LinkAction>,
831            },
832            /// Resource cancel from receiver (link-decrypted).
833            ResourceRcl {
834                link_id: LinkId,
835                inbound_actions: Vec<LinkAction>,
836            },
837            Error,
838        }
839
840        let result = {
841            let link = match self.links.get_mut(link_id_bytes) {
842                Some(l) => l,
843                None => return Vec::new(),
844            };
845
846            link.route_interface = Some(receiving_interface);
847            if packet.flags.header_type == constants::HEADER_2 {
848                if let Some(transport_id) = packet.transport_id {
849                    link.route_transport_id = Some(transport_id);
850                }
851            }
852
853            match packet.context {
854                constants::CONTEXT_LRRTT => {
855                    match link.engine.handle_lrrtt(&packet.data, time::now()) {
856                        Ok(link_actions) => {
857                            let link_id = *link.engine.link_id();
858                            LinkDataResult::Lrrtt {
859                                link_id,
860                                link_actions,
861                            }
862                        }
863                        Err(e) => {
864                            log::debug!("LRRTT handling failed: {}", e);
865                            LinkDataResult::Error
866                        }
867                    }
868                }
869                constants::CONTEXT_LINKIDENTIFY => {
870                    match link.engine.handle_identify(&packet.data) {
871                        Ok(link_actions) => {
872                            let link_id = *link.engine.link_id();
873                            link.remote_identity = link.engine.remote_identity().cloned();
874                            LinkDataResult::Identify {
875                                link_id,
876                                link_actions,
877                            }
878                        }
879                        Err(e) => {
880                            log::debug!("LINKIDENTIFY failed: {}", e);
881                            LinkDataResult::Error
882                        }
883                    }
884                }
885                constants::CONTEXT_KEEPALIVE => {
886                    let inbound_actions = link.engine.record_inbound(time::now());
887                    let link_id = *link.engine.link_id();
888                    LinkDataResult::Keepalive {
889                        link_id,
890                        inbound_actions,
891                    }
892                }
893                constants::CONTEXT_LINKCLOSE => {
894                    let teardown_actions = link.engine.handle_teardown();
895                    let link_id = *link.engine.link_id();
896                    LinkDataResult::LinkClose {
897                        link_id,
898                        teardown_actions,
899                    }
900                }
901                constants::CONTEXT_CHANNEL => match link.engine.decrypt(&packet.data) {
902                    Ok(plaintext) => {
903                        let inbound_actions = link.engine.record_inbound(time::now());
904                        let link_id = *link.engine.link_id();
905                        LinkDataResult::Channel {
906                            link_id,
907                            inbound_actions,
908                            plaintext,
909                            packet_hash,
910                        }
911                    }
912                    Err(_) => LinkDataResult::Error,
913                },
914                constants::CONTEXT_REQUEST => match link.engine.decrypt(&packet.data) {
915                    Ok(plaintext) => {
916                        let inbound_actions = link.engine.record_inbound(time::now());
917                        let link_id = *link.engine.link_id();
918                        let request_id = packet.get_truncated_hash();
919                        LinkDataResult::Request {
920                            link_id,
921                            inbound_actions,
922                            plaintext,
923                            request_id,
924                        }
925                    }
926                    Err(_) => LinkDataResult::Error,
927                },
928                constants::CONTEXT_RESPONSE => match link.engine.decrypt(&packet.data) {
929                    Ok(plaintext) => {
930                        let inbound_actions = link.engine.record_inbound(time::now());
931                        let link_id = *link.engine.link_id();
932                        LinkDataResult::Response {
933                            link_id,
934                            inbound_actions,
935                            plaintext,
936                        }
937                    }
938                    Err(_) => LinkDataResult::Error,
939                },
940                // --- Resource contexts ---
941                constants::CONTEXT_RESOURCE_ADV => match link.engine.decrypt(&packet.data) {
942                    Ok(plaintext) => {
943                        let inbound_actions = link.engine.record_inbound(time::now());
944                        let link_id = *link.engine.link_id();
945                        LinkDataResult::ResourceAdv {
946                            link_id,
947                            inbound_actions,
948                            plaintext,
949                        }
950                    }
951                    Err(_) => LinkDataResult::Error,
952                },
953                constants::CONTEXT_RESOURCE_REQ => match link.engine.decrypt(&packet.data) {
954                    Ok(plaintext) => {
955                        let inbound_actions = link.engine.record_inbound(time::now());
956                        let link_id = *link.engine.link_id();
957                        LinkDataResult::ResourceReq {
958                            link_id,
959                            inbound_actions,
960                            plaintext,
961                        }
962                    }
963                    Err(_) => LinkDataResult::Error,
964                },
965                constants::CONTEXT_RESOURCE_HMU => match link.engine.decrypt(&packet.data) {
966                    Ok(plaintext) => {
967                        let inbound_actions = link.engine.record_inbound(time::now());
968                        let link_id = *link.engine.link_id();
969                        LinkDataResult::ResourceHmu {
970                            link_id,
971                            inbound_actions,
972                            plaintext,
973                        }
974                    }
975                    Err(_) => LinkDataResult::Error,
976                },
977                constants::CONTEXT_RESOURCE => {
978                    // Resource parts are NOT link-decrypted — they're pre-encrypted by ResourceSender
979                    let inbound_actions = link.engine.record_inbound(time::now());
980                    let link_id = *link.engine.link_id();
981                    LinkDataResult::ResourcePart {
982                        link_id,
983                        inbound_actions,
984                        raw_data: packet.data.clone(),
985                    }
986                }
987                constants::CONTEXT_RESOURCE_PRF => match link.engine.decrypt(&packet.data) {
988                    Ok(plaintext) => {
989                        let inbound_actions = link.engine.record_inbound(time::now());
990                        let link_id = *link.engine.link_id();
991                        LinkDataResult::ResourcePrf {
992                            link_id,
993                            inbound_actions,
994                            plaintext,
995                        }
996                    }
997                    Err(_) => LinkDataResult::Error,
998                },
999                constants::CONTEXT_RESOURCE_ICL => {
1000                    let _ = link.engine.decrypt(&packet.data); // decrypt to validate
1001                    let inbound_actions = link.engine.record_inbound(time::now());
1002                    let link_id = *link.engine.link_id();
1003                    LinkDataResult::ResourceIcl {
1004                        link_id,
1005                        inbound_actions,
1006                    }
1007                }
1008                constants::CONTEXT_RESOURCE_RCL => {
1009                    let _ = link.engine.decrypt(&packet.data); // decrypt to validate
1010                    let inbound_actions = link.engine.record_inbound(time::now());
1011                    let link_id = *link.engine.link_id();
1012                    LinkDataResult::ResourceRcl {
1013                        link_id,
1014                        inbound_actions,
1015                    }
1016                }
1017                _ => match link.engine.decrypt(&packet.data) {
1018                    Ok(plaintext) => {
1019                        let inbound_actions = link.engine.record_inbound(time::now());
1020                        let link_id = *link.engine.link_id();
1021                        LinkDataResult::Generic {
1022                            link_id,
1023                            inbound_actions,
1024                            plaintext,
1025                            context: packet.context,
1026                            packet_hash,
1027                        }
1028                    }
1029                    Err(_) => LinkDataResult::Error,
1030                },
1031            }
1032        }; // mutable borrow of self.links dropped here
1033
1034        // Second pass: process results using self methods
1035        let mut actions = Vec::new();
1036        match result {
1037            LinkDataResult::Lrrtt {
1038                link_id,
1039                link_actions,
1040            } => {
1041                actions.extend(self.process_link_actions(&link_id, &link_actions));
1042                // Initialize channel
1043                if let Some(link) = self.links.get_mut(&link_id) {
1044                    if link.engine.state() == LinkState::Active {
1045                        let rtt = link.engine.rtt().unwrap_or(1.0);
1046                        link.channel = Some(Channel::new(rtt));
1047                    }
1048                }
1049            }
1050            LinkDataResult::Identify {
1051                link_id,
1052                link_actions,
1053            } => {
1054                actions.extend(self.process_link_actions(&link_id, &link_actions));
1055            }
1056            LinkDataResult::Keepalive {
1057                link_id,
1058                inbound_actions,
1059            } => {
1060                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1061                // record_inbound() already updated last_inbound, so the link
1062                // won't go stale.  The regular tick() keepalive mechanism will
1063                // send keepalives when needs_keepalive() returns true.
1064                // Do NOT reply here — unconditional replies create an infinite
1065                // ping-pong loop between the two link endpoints.
1066            }
1067            LinkDataResult::LinkClose {
1068                link_id,
1069                teardown_actions,
1070            } => {
1071                actions.extend(self.process_link_actions(&link_id, &teardown_actions));
1072            }
1073            LinkDataResult::Channel {
1074                link_id,
1075                inbound_actions,
1076                plaintext,
1077                packet_hash,
1078            } => {
1079                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1080                // Feed plaintext to channel
1081                if let Some(link) = self.links.get_mut(&link_id) {
1082                    if let Some(ref mut channel) = link.channel {
1083                        let chan_actions = channel.receive(&plaintext, time::now());
1084                        link.channel_messages_received += chan_actions
1085                            .iter()
1086                            .filter(|action| {
1087                                matches!(
1088                                    action,
1089                                    rns_core::channel::ChannelAction::MessageReceived { .. }
1090                                )
1091                            })
1092                            .count()
1093                            as u64;
1094                        // process_channel_actions needs immutable self, so collect first
1095                        let _ = link;
1096                        actions.extend(self.process_channel_actions(&link_id, chan_actions, rng));
1097                    }
1098                }
1099                actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1100            }
1101            LinkDataResult::Request {
1102                link_id,
1103                inbound_actions,
1104                plaintext,
1105                request_id,
1106            } => {
1107                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1108                actions.extend(self.handle_request(&link_id, &plaintext, request_id, rng));
1109            }
1110            LinkDataResult::Response {
1111                link_id,
1112                inbound_actions,
1113                plaintext,
1114            } => {
1115                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1116                // Unpack msgpack response: [Bin(request_id), response_value]
1117                actions.extend(self.handle_response(&link_id, &plaintext));
1118            }
1119            LinkDataResult::Generic {
1120                link_id,
1121                inbound_actions,
1122                plaintext,
1123                context,
1124                packet_hash,
1125            } => {
1126                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1127                actions.push(LinkManagerAction::LinkDataReceived {
1128                    link_id,
1129                    context,
1130                    data: plaintext,
1131                });
1132
1133                actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1134            }
1135            LinkDataResult::ResourceAdv {
1136                link_id,
1137                inbound_actions,
1138                plaintext,
1139            } => {
1140                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1141                actions.extend(self.handle_resource_adv(&link_id, &plaintext, rng));
1142            }
1143            LinkDataResult::ResourceReq {
1144                link_id,
1145                inbound_actions,
1146                plaintext,
1147            } => {
1148                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1149                actions.extend(self.handle_resource_req(&link_id, &plaintext, rng));
1150            }
1151            LinkDataResult::ResourceHmu {
1152                link_id,
1153                inbound_actions,
1154                plaintext,
1155            } => {
1156                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1157                actions.extend(self.handle_resource_hmu(&link_id, &plaintext, rng));
1158            }
1159            LinkDataResult::ResourcePart {
1160                link_id,
1161                inbound_actions,
1162                raw_data,
1163            } => {
1164                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1165                actions.extend(self.handle_resource_part(&link_id, &raw_data, rng));
1166            }
1167            LinkDataResult::ResourcePrf {
1168                link_id,
1169                inbound_actions,
1170                plaintext,
1171            } => {
1172                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1173                actions.extend(self.handle_resource_prf(&link_id, &plaintext));
1174            }
1175            LinkDataResult::ResourceIcl {
1176                link_id,
1177                inbound_actions,
1178            } => {
1179                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1180                actions.extend(self.handle_resource_icl(&link_id));
1181            }
1182            LinkDataResult::ResourceRcl {
1183                link_id,
1184                inbound_actions,
1185            } => {
1186                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1187                actions.extend(self.handle_resource_rcl(&link_id));
1188            }
1189            LinkDataResult::Error => {}
1190        }
1191
1192        actions
1193    }
1194
1195    /// Handle a request on a link.
1196    fn handle_request(
1197        &mut self,
1198        link_id: &LinkId,
1199        plaintext: &[u8],
1200        request_id: [u8; 16],
1201        rng: &mut dyn Rng,
1202    ) -> Vec<LinkManagerAction> {
1203        use rns_core::msgpack::{self, Value};
1204
1205        // Python-compatible format: msgpack([timestamp, Bin(path_hash), data_value])
1206        let arr = match msgpack::unpack_exact(plaintext) {
1207            Ok(Value::Array(arr)) if arr.len() >= 3 => arr,
1208            _ => return Vec::new(),
1209        };
1210
1211        let path_hash_bytes = match &arr[1] {
1212            Value::Bin(b) if b.len() == 16 => b,
1213            _ => return Vec::new(),
1214        };
1215        let mut path_hash = [0u8; 16];
1216        path_hash.copy_from_slice(path_hash_bytes);
1217
1218        // Re-encode the data element for the handler
1219        let request_data = msgpack::pack(&arr[2]);
1220
1221        // Check if this is a management path (handled by the driver)
1222        if self.management_paths.contains(&path_hash) {
1223            let remote_identity = self
1224                .links
1225                .get(link_id)
1226                .and_then(|l| l.remote_identity)
1227                .map(|(h, k)| (h, k));
1228            return vec![LinkManagerAction::ManagementRequest {
1229                link_id: *link_id,
1230                path_hash,
1231                data: request_data,
1232                request_id,
1233                remote_identity,
1234            }];
1235        }
1236
1237        // Look up handler by path_hash
1238        let handler_idx = self
1239            .request_handlers
1240            .iter()
1241            .position(|h| h.path_hash == path_hash);
1242        let handler_idx = match handler_idx {
1243            Some(i) => i,
1244            None => return Vec::new(),
1245        };
1246
1247        // Check ACL
1248        let remote_identity = self
1249            .links
1250            .get(link_id)
1251            .and_then(|l| l.remote_identity.as_ref());
1252        let handler = &self.request_handlers[handler_idx];
1253        if let Some(ref allowed) = handler.allowed_list {
1254            match remote_identity {
1255                Some((identity_hash, _)) => {
1256                    if !allowed.contains(identity_hash) {
1257                        log::debug!("Request denied: identity not in allowed list");
1258                        return Vec::new();
1259                    }
1260                }
1261                None => {
1262                    log::debug!("Request denied: peer not identified");
1263                    return Vec::new();
1264                }
1265            }
1266        }
1267
1268        // Call handler
1269        let path = handler.path.clone();
1270        let response = (handler.handler)(*link_id, &path, &request_data, remote_identity);
1271
1272        let mut actions = Vec::new();
1273        if let Some(response_data) = response {
1274            let mut response_actions =
1275                self.build_response_packet(link_id, &request_id, &response_data, rng);
1276            if response_actions.is_empty() {
1277                response_actions.extend(self.send_response_resource(
1278                    link_id,
1279                    &request_id,
1280                    &response_data,
1281                    rng,
1282                ));
1283            }
1284            actions.extend(response_actions);
1285        }
1286
1287        actions
1288    }
1289
1290    /// Build a response packet for a request.
1291    /// `response_data` is the msgpack-encoded response value.
1292    fn build_response_packet(
1293        &self,
1294        link_id: &LinkId,
1295        request_id: &[u8; 16],
1296        response_data: &[u8],
1297        rng: &mut dyn Rng,
1298    ) -> Vec<LinkManagerAction> {
1299        use rns_core::msgpack::{self, Value};
1300
1301        let response_value = msgpack::unpack_exact(response_data)
1302            .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1303
1304        let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1305        let response_plaintext = msgpack::pack(&response_array);
1306
1307        let mut actions = Vec::new();
1308        if let Some(link) = self.links.get(link_id) {
1309            if let Ok(encrypted) = link.engine.encrypt(&response_plaintext, rng) {
1310                let flags = PacketFlags {
1311                    header_type: constants::HEADER_1,
1312                    context_flag: constants::FLAG_UNSET,
1313                    transport_type: constants::TRANSPORT_BROADCAST,
1314                    destination_type: constants::DESTINATION_LINK,
1315                    packet_type: constants::PACKET_TYPE_DATA,
1316                };
1317                let max_mtu = link.engine.mtu() as usize;
1318                if let Ok(pkt) = RawPacket::pack_with_max_mtu(
1319                    flags,
1320                    0,
1321                    link_id,
1322                    None,
1323                    constants::CONTEXT_RESPONSE,
1324                    &encrypted,
1325                    max_mtu,
1326                ) {
1327                    actions.push(LinkManagerAction::SendPacket {
1328                        raw: pkt.raw,
1329                        dest_type: constants::DESTINATION_LINK,
1330                        attached_interface: None,
1331                    });
1332                }
1333            }
1334        }
1335        actions
1336    }
1337
1338    fn send_response_resource(
1339        &mut self,
1340        link_id: &LinkId,
1341        request_id: &[u8; 16],
1342        response_data: &[u8],
1343        rng: &mut dyn Rng,
1344    ) -> Vec<LinkManagerAction> {
1345        use rns_core::msgpack::{self, Value};
1346
1347        let link = match self.links.get_mut(link_id) {
1348            Some(l) => l,
1349            None => return Vec::new(),
1350        };
1351
1352        if link.engine.state() != LinkState::Active {
1353            return Vec::new();
1354        }
1355
1356        let link_rtt = link.engine.rtt().unwrap_or(1.0);
1357        let resource_sdu = Self::resource_sdu_for_link(link);
1358        let now = time::now();
1359
1360        let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1361        let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1362            link.engine
1363                .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1364                .unwrap_or_else(|_| plaintext.to_vec())
1365        };
1366
1367        // Match Python resource response format from Link.handle_request:
1368        // packed_response = msgpack([request_id, response_value])
1369        // where response_value is decoded msgpack value, or Bin(raw bytes).
1370        let response_value = msgpack::unpack_exact(response_data)
1371            .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1372        let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1373        let resource_payload = msgpack::pack(&response_array);
1374
1375        let sender = match ResourceSender::new(
1376            &resource_payload,
1377            None,
1378            resource_sdu,
1379            &encrypt_fn,
1380            &Bzip2Compressor,
1381            rng,
1382            now,
1383            true, // auto_compress
1384            true, // is_response
1385            Some(request_id.to_vec()),
1386            1,    // segment_index
1387            1,    // total_segments
1388            None, // original_hash
1389            link_rtt,
1390            6.0, // traffic_timeout_factor
1391        ) {
1392            Ok(s) => s,
1393            Err(e) => {
1394                log::debug!("Failed to create response ResourceSender: {}", e);
1395                return Vec::new();
1396            }
1397        };
1398
1399        let mut sender = sender;
1400        let adv_actions = sender.advertise(now);
1401        link.outgoing_resources.push(sender);
1402
1403        let _ = link;
1404        self.process_resource_actions(link_id, adv_actions, rng)
1405    }
1406
1407    /// Send a management response on a link.
1408    /// Called by the driver after building the response for a ManagementRequest.
1409    pub fn send_management_response(
1410        &mut self,
1411        link_id: &LinkId,
1412        request_id: &[u8; 16],
1413        response_data: &[u8],
1414        rng: &mut dyn Rng,
1415    ) -> Vec<LinkManagerAction> {
1416        let mut actions = self.build_response_packet(link_id, request_id, response_data, rng);
1417        if actions.is_empty() {
1418            actions.extend(self.send_response_resource(link_id, request_id, response_data, rng));
1419        }
1420        actions
1421    }
1422
1423    /// Send a request on a link.
1424    ///
1425    /// `data` is the msgpack-encoded request data value (e.g. msgpack([True]) for /status).
1426    ///
1427    /// Uses Python-compatible format: plaintext = msgpack([timestamp, path_hash_bytes, data_value]).
1428    /// Returns actions (the encrypted request packet). The response will arrive
1429    /// later via handle_local_delivery with CONTEXT_RESPONSE.
1430    pub fn send_request(
1431        &self,
1432        link_id: &LinkId,
1433        path: &str,
1434        data: &[u8],
1435        rng: &mut dyn Rng,
1436    ) -> Vec<LinkManagerAction> {
1437        use rns_core::msgpack::{self, Value};
1438
1439        let link = match self.links.get(link_id) {
1440            Some(l) => l,
1441            None => return Vec::new(),
1442        };
1443
1444        if link.engine.state() != LinkState::Active {
1445            return Vec::new();
1446        }
1447
1448        let path_hash = compute_path_hash(path);
1449
1450        // Decode data bytes to msgpack Value (or use Bin if can't decode)
1451        let data_value = msgpack::unpack_exact(data).unwrap_or_else(|_| Value::Bin(data.to_vec()));
1452
1453        // Python-compatible format: msgpack([timestamp, Bin(path_hash), data_value])
1454        let request_array = Value::Array(vec![
1455            Value::Float(time::now()),
1456            Value::Bin(path_hash.to_vec()),
1457            data_value,
1458        ]);
1459        let plaintext = msgpack::pack(&request_array);
1460
1461        let encrypted = match link.engine.encrypt(&plaintext, rng) {
1462            Ok(e) => e,
1463            Err(_) => return Vec::new(),
1464        };
1465
1466        let flags = PacketFlags {
1467            header_type: constants::HEADER_1,
1468            context_flag: constants::FLAG_UNSET,
1469            transport_type: constants::TRANSPORT_BROADCAST,
1470            destination_type: constants::DESTINATION_LINK,
1471            packet_type: constants::PACKET_TYPE_DATA,
1472        };
1473
1474        let mut actions = Vec::new();
1475        if let Ok(pkt) = RawPacket::pack(
1476            flags,
1477            0,
1478            link_id,
1479            None,
1480            constants::CONTEXT_REQUEST,
1481            &encrypted,
1482        ) {
1483            actions.push(LinkManagerAction::SendPacket {
1484                raw: pkt.raw,
1485                dest_type: constants::DESTINATION_LINK,
1486                attached_interface: None,
1487            });
1488        }
1489        actions
1490    }
1491
1492    /// Send encrypted data on a link with a given context.
1493    pub fn send_on_link(
1494        &self,
1495        link_id: &LinkId,
1496        plaintext: &[u8],
1497        context: u8,
1498        rng: &mut dyn Rng,
1499    ) -> Vec<LinkManagerAction> {
1500        let link = match self.links.get(link_id) {
1501            Some(l) => l,
1502            None => return Vec::new(),
1503        };
1504
1505        if link.engine.state() != LinkState::Active {
1506            return Vec::new();
1507        }
1508
1509        let encrypted = match link.engine.encrypt(plaintext, rng) {
1510            Ok(e) => e,
1511            Err(_) => return Vec::new(),
1512        };
1513
1514        let flags = PacketFlags {
1515            header_type: constants::HEADER_1,
1516            context_flag: constants::FLAG_UNSET,
1517            transport_type: constants::TRANSPORT_BROADCAST,
1518            destination_type: constants::DESTINATION_LINK,
1519            packet_type: constants::PACKET_TYPE_DATA,
1520        };
1521
1522        let mut actions = Vec::new();
1523        if let Ok(pkt) = RawPacket::pack(flags, 0, link_id, None, context, &encrypted) {
1524            actions.push(LinkManagerAction::SendPacket {
1525                raw: pkt.raw,
1526                dest_type: constants::DESTINATION_LINK,
1527                attached_interface: None,
1528            });
1529        }
1530        actions
1531    }
1532
1533    /// Send an identify message on a link (initiator reveals identity to responder).
1534    pub fn identify(
1535        &self,
1536        link_id: &LinkId,
1537        identity: &rns_crypto::identity::Identity,
1538        rng: &mut dyn Rng,
1539    ) -> Vec<LinkManagerAction> {
1540        let link = match self.links.get(link_id) {
1541            Some(l) => l,
1542            None => return Vec::new(),
1543        };
1544
1545        let encrypted = match link.engine.build_identify(identity, rng) {
1546            Ok(e) => e,
1547            Err(_) => return Vec::new(),
1548        };
1549
1550        let flags = PacketFlags {
1551            header_type: constants::HEADER_1,
1552            context_flag: constants::FLAG_UNSET,
1553            transport_type: constants::TRANSPORT_BROADCAST,
1554            destination_type: constants::DESTINATION_LINK,
1555            packet_type: constants::PACKET_TYPE_DATA,
1556        };
1557
1558        let mut actions = Vec::new();
1559        if let Ok(pkt) = RawPacket::pack(
1560            flags,
1561            0,
1562            link_id,
1563            None,
1564            constants::CONTEXT_LINKIDENTIFY,
1565            &encrypted,
1566        ) {
1567            actions.push(LinkManagerAction::SendPacket {
1568                raw: pkt.raw,
1569                dest_type: constants::DESTINATION_LINK,
1570                attached_interface: None,
1571            });
1572        }
1573        actions
1574    }
1575
1576    /// Tear down a link.
1577    pub fn teardown_link(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1578        let link = match self.links.get_mut(link_id) {
1579            Some(l) => l,
1580            None => return Vec::new(),
1581        };
1582
1583        let teardown_actions = link.engine.teardown();
1584        if let Some(ref mut channel) = link.channel {
1585            channel.shutdown();
1586        }
1587
1588        let mut actions = self.process_link_actions(link_id, &teardown_actions);
1589
1590        // Send LINKCLOSE packet
1591        let flags = PacketFlags {
1592            header_type: constants::HEADER_1,
1593            context_flag: constants::FLAG_UNSET,
1594            transport_type: constants::TRANSPORT_BROADCAST,
1595            destination_type: constants::DESTINATION_LINK,
1596            packet_type: constants::PACKET_TYPE_DATA,
1597        };
1598        if let Ok(pkt) = RawPacket::pack(flags, 0, link_id, None, constants::CONTEXT_LINKCLOSE, &[])
1599        {
1600            actions.push(LinkManagerAction::SendPacket {
1601                raw: pkt.raw,
1602                dest_type: constants::DESTINATION_LINK,
1603                attached_interface: None,
1604            });
1605        }
1606
1607        actions
1608    }
1609
1610    /// Tear down all managed links.
1611    pub fn teardown_all_links(&mut self) -> Vec<LinkManagerAction> {
1612        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
1613        let mut actions = Vec::new();
1614        for link_id in link_ids {
1615            actions.extend(self.teardown_link(&link_id));
1616        }
1617        actions
1618    }
1619
1620    /// Handle a response on a link.
1621    fn handle_response(&self, link_id: &LinkId, plaintext: &[u8]) -> Vec<LinkManagerAction> {
1622        use rns_core::msgpack;
1623
1624        // Python-compatible response: msgpack([Bin(request_id), response_value])
1625        let arr = match msgpack::unpack_exact(plaintext) {
1626            Ok(msgpack::Value::Array(arr)) if arr.len() >= 2 => arr,
1627            _ => return Vec::new(),
1628        };
1629
1630        let request_id_bytes = match &arr[0] {
1631            msgpack::Value::Bin(b) if b.len() == 16 => b,
1632            _ => return Vec::new(),
1633        };
1634        let mut request_id = [0u8; 16];
1635        request_id.copy_from_slice(request_id_bytes);
1636
1637        let response_data = msgpack::pack(&arr[1]);
1638
1639        vec![LinkManagerAction::ResponseReceived {
1640            link_id: *link_id,
1641            request_id,
1642            data: response_data,
1643        }]
1644    }
1645
1646    /// Handle resource advertisement (CONTEXT_RESOURCE_ADV).
1647    fn handle_resource_adv(
1648        &mut self,
1649        link_id: &LinkId,
1650        adv_plaintext: &[u8],
1651        rng: &mut dyn Rng,
1652    ) -> Vec<LinkManagerAction> {
1653        let link = match self.links.get_mut(link_id) {
1654            Some(l) => l,
1655            None => return Vec::new(),
1656        };
1657
1658        let link_rtt = link.engine.rtt().unwrap_or(1.0);
1659        let resource_sdu = Self::resource_sdu_for_link(link);
1660        let now = time::now();
1661
1662        let receiver = match ResourceReceiver::from_advertisement(
1663            adv_plaintext,
1664            resource_sdu,
1665            link_rtt,
1666            now,
1667            None,
1668            None,
1669        ) {
1670            Ok(r) => r,
1671            Err(e) => {
1672                log::debug!("Resource ADV rejected: {}", e);
1673                return Vec::new();
1674            }
1675        };
1676
1677        let strategy = link.resource_strategy;
1678        let resource_hash = receiver.resource_hash.clone();
1679        let transfer_size = receiver.transfer_size;
1680        let has_metadata = receiver.has_metadata;
1681        let is_response = receiver.flags.is_response;
1682
1683        if is_response {
1684            // Response resources bypass the application acceptance strategy —
1685            // they are answers to pending requests, not independent resources.
1686            link.incoming_resources.push(receiver);
1687            let idx = link.incoming_resources.len() - 1;
1688            let resource_actions = link.incoming_resources[idx].accept(now);
1689            let _ = link;
1690            return self.process_resource_actions(link_id, resource_actions, rng);
1691        }
1692
1693        match strategy {
1694            ResourceStrategy::AcceptNone => {
1695                // Reject: send RCL
1696                let reject_actions = {
1697                    let mut r = receiver;
1698                    r.reject()
1699                };
1700                self.process_resource_actions(link_id, reject_actions, rng)
1701            }
1702            ResourceStrategy::AcceptAll => {
1703                link.incoming_resources.push(receiver);
1704                let idx = link.incoming_resources.len() - 1;
1705                let resource_actions = link.incoming_resources[idx].accept(now);
1706                let _ = link;
1707                self.process_resource_actions(link_id, resource_actions, rng)
1708            }
1709            ResourceStrategy::AcceptApp => {
1710                link.incoming_resources.push(receiver);
1711                // Query application callback
1712                vec![LinkManagerAction::ResourceAcceptQuery {
1713                    link_id: *link_id,
1714                    resource_hash,
1715                    transfer_size,
1716                    has_metadata,
1717                }]
1718            }
1719        }
1720    }
1721
1722    /// Accept or reject a pending resource (for AcceptApp strategy).
1723    pub fn accept_resource(
1724        &mut self,
1725        link_id: &LinkId,
1726        resource_hash: &[u8],
1727        accept: bool,
1728        rng: &mut dyn Rng,
1729    ) -> Vec<LinkManagerAction> {
1730        let link = match self.links.get_mut(link_id) {
1731            Some(l) => l,
1732            None => return Vec::new(),
1733        };
1734
1735        let now = time::now();
1736        let idx = link
1737            .incoming_resources
1738            .iter()
1739            .position(|r| r.resource_hash == resource_hash);
1740        let idx = match idx {
1741            Some(i) => i,
1742            None => return Vec::new(),
1743        };
1744
1745        let resource_actions = if accept {
1746            link.incoming_resources[idx].accept(now)
1747        } else {
1748            link.incoming_resources[idx].reject()
1749        };
1750
1751        let _ = link;
1752        self.process_resource_actions(link_id, resource_actions, rng)
1753    }
1754
1755    /// Handle resource request (CONTEXT_RESOURCE_REQ) — feed to sender.
1756    fn handle_resource_req(
1757        &mut self,
1758        link_id: &LinkId,
1759        plaintext: &[u8],
1760        rng: &mut dyn Rng,
1761    ) -> Vec<LinkManagerAction> {
1762        let link = match self.links.get_mut(link_id) {
1763            Some(l) => l,
1764            None => return Vec::new(),
1765        };
1766
1767        let now = time::now();
1768        let mut all_actions = Vec::new();
1769        for sender in &mut link.outgoing_resources {
1770            let resource_actions = sender.handle_request(plaintext, now);
1771            if !resource_actions.is_empty() {
1772                all_actions.extend(resource_actions);
1773                break;
1774            }
1775        }
1776
1777        let _ = link;
1778        self.process_resource_actions(link_id, all_actions, rng)
1779    }
1780
1781    /// Handle resource HMU (CONTEXT_RESOURCE_HMU) — feed to receiver.
1782    fn handle_resource_hmu(
1783        &mut self,
1784        link_id: &LinkId,
1785        plaintext: &[u8],
1786        rng: &mut dyn Rng,
1787    ) -> Vec<LinkManagerAction> {
1788        let link = match self.links.get_mut(link_id) {
1789            Some(l) => l,
1790            None => return Vec::new(),
1791        };
1792
1793        let now = time::now();
1794        let mut all_actions = Vec::new();
1795        for receiver in &mut link.incoming_resources {
1796            let resource_actions = receiver.handle_hashmap_update(plaintext, now);
1797            if !resource_actions.is_empty() {
1798                all_actions.extend(resource_actions);
1799                break;
1800            }
1801        }
1802
1803        let _ = link;
1804        self.process_resource_actions(link_id, all_actions, rng)
1805    }
1806
1807    /// Handle resource part (CONTEXT_RESOURCE) — feed raw to receiver.
1808    fn handle_resource_part(
1809        &mut self,
1810        link_id: &LinkId,
1811        raw_data: &[u8],
1812        rng: &mut dyn Rng,
1813    ) -> Vec<LinkManagerAction> {
1814        let link = match self.links.get_mut(link_id) {
1815            Some(l) => l,
1816            None => return Vec::new(),
1817        };
1818
1819        let now = time::now();
1820        let mut all_actions = Vec::new();
1821        let mut assemble_idx = None;
1822        let mut assembled_is_response = false;
1823
1824        for (idx, receiver) in link.incoming_resources.iter_mut().enumerate() {
1825            let resource_actions = receiver.receive_part(raw_data, now);
1826            if !resource_actions.is_empty() {
1827                if receiver.received_count == receiver.total_parts {
1828                    assemble_idx = Some(idx);
1829                }
1830                all_actions.extend(resource_actions);
1831                break;
1832            }
1833        }
1834
1835        if let Some(idx) = assemble_idx {
1836            let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
1837                link.engine.decrypt(ciphertext).map_err(|_| ())
1838            };
1839            let assemble_actions =
1840                link.incoming_resources[idx].assemble(&decrypt_fn, &Bzip2Compressor);
1841            assembled_is_response = link.incoming_resources[idx].flags.is_response;
1842            all_actions.extend(assemble_actions);
1843        }
1844
1845        let _ = link;
1846        let mut out = self.process_resource_actions(link_id, all_actions, rng);
1847
1848        if assembled_is_response {
1849            let mut converted = Vec::new();
1850            for action in out {
1851                match action {
1852                    LinkManagerAction::ResourceReceived { data, .. } => {
1853                        converted.extend(self.handle_response(link_id, &data));
1854                    }
1855                    LinkManagerAction::ResourceAcceptQuery { .. } => {
1856                        // Response resources bypass application acceptance
1857                    }
1858                    other => converted.push(other),
1859                }
1860            }
1861            out = converted;
1862        }
1863
1864        out
1865    }
1866
1867    /// Handle resource proof (CONTEXT_RESOURCE_PRF) — feed to sender.
1868    fn handle_resource_prf(
1869        &mut self,
1870        link_id: &LinkId,
1871        plaintext: &[u8],
1872    ) -> Vec<LinkManagerAction> {
1873        let link = match self.links.get_mut(link_id) {
1874            Some(l) => l,
1875            None => return Vec::new(),
1876        };
1877
1878        let now = time::now();
1879        let mut result_actions = Vec::new();
1880        for sender in &mut link.outgoing_resources {
1881            let resource_actions = sender.handle_proof(plaintext, now);
1882            if !resource_actions.is_empty() {
1883                result_actions.extend(resource_actions);
1884                break;
1885            }
1886        }
1887
1888        // Convert to LinkManagerActions
1889        let mut actions = Vec::new();
1890        for ra in result_actions {
1891            match ra {
1892                ResourceAction::Completed => {
1893                    actions.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
1894                }
1895                ResourceAction::Failed(e) => {
1896                    actions.push(LinkManagerAction::ResourceFailed {
1897                        link_id: *link_id,
1898                        error: format!("{}", e),
1899                    });
1900                }
1901                _ => {}
1902            }
1903        }
1904
1905        // Clean up completed/failed senders
1906        link.outgoing_resources
1907            .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
1908
1909        actions
1910    }
1911
1912    /// Handle cancel from initiator (CONTEXT_RESOURCE_ICL).
1913    fn handle_resource_icl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1914        let link = match self.links.get_mut(link_id) {
1915            Some(l) => l,
1916            None => return Vec::new(),
1917        };
1918
1919        let mut actions = Vec::new();
1920        for receiver in &mut link.incoming_resources {
1921            let ra = receiver.handle_cancel();
1922            for a in ra {
1923                if let ResourceAction::Failed(ref e) = a {
1924                    actions.push(LinkManagerAction::ResourceFailed {
1925                        link_id: *link_id,
1926                        error: format!("{}", e),
1927                    });
1928                }
1929            }
1930        }
1931        link.incoming_resources
1932            .retain(|r| r.status < rns_core::resource::ResourceStatus::Complete);
1933        actions
1934    }
1935
1936    /// Handle cancel from receiver (CONTEXT_RESOURCE_RCL).
1937    fn handle_resource_rcl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1938        let link = match self.links.get_mut(link_id) {
1939            Some(l) => l,
1940            None => return Vec::new(),
1941        };
1942
1943        let mut actions = Vec::new();
1944        for sender in &mut link.outgoing_resources {
1945            let ra = sender.handle_reject();
1946            for a in ra {
1947                if let ResourceAction::Failed(ref e) = a {
1948                    actions.push(LinkManagerAction::ResourceFailed {
1949                        link_id: *link_id,
1950                        error: format!("{}", e),
1951                    });
1952                }
1953            }
1954        }
1955        link.outgoing_resources
1956            .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
1957        actions
1958    }
1959
1960    /// Convert ResourceActions to LinkManagerActions.
1961    fn process_resource_actions(
1962        &mut self,
1963        link_id: &LinkId,
1964        actions: Vec<ResourceAction>,
1965        rng: &mut dyn Rng,
1966    ) -> Vec<LinkManagerAction> {
1967        let mut result = Vec::new();
1968        for action in actions {
1969            match action {
1970                ResourceAction::SendAdvertisement(data) => {
1971                    // Link-encrypt and send as CONTEXT_RESOURCE_ADV
1972                    let encrypted = self
1973                        .links
1974                        .get(link_id)
1975                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
1976                    if let Some(encrypted) = encrypted {
1977                        result.extend(self.build_link_packet(
1978                            link_id,
1979                            constants::CONTEXT_RESOURCE_ADV,
1980                            &encrypted,
1981                        ));
1982                    }
1983                }
1984                ResourceAction::SendPart(data) => {
1985                    // Parts are NOT link-encrypted — send raw as CONTEXT_RESOURCE
1986                    result.extend(self.build_link_packet(
1987                        link_id,
1988                        constants::CONTEXT_RESOURCE,
1989                        &data,
1990                    ));
1991                }
1992                ResourceAction::SendRequest(data) => {
1993                    let encrypted = self
1994                        .links
1995                        .get(link_id)
1996                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
1997                    if let Some(encrypted) = encrypted {
1998                        result.extend(self.build_link_packet(
1999                            link_id,
2000                            constants::CONTEXT_RESOURCE_REQ,
2001                            &encrypted,
2002                        ));
2003                    }
2004                }
2005                ResourceAction::SendHmu(data) => {
2006                    let encrypted = self
2007                        .links
2008                        .get(link_id)
2009                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2010                    if let Some(encrypted) = encrypted {
2011                        result.extend(self.build_link_packet(
2012                            link_id,
2013                            constants::CONTEXT_RESOURCE_HMU,
2014                            &encrypted,
2015                        ));
2016                    }
2017                }
2018                ResourceAction::SendProof(data) => {
2019                    let encrypted = self
2020                        .links
2021                        .get(link_id)
2022                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2023                    if let Some(encrypted) = encrypted {
2024                        result.extend(self.build_link_packet(
2025                            link_id,
2026                            constants::CONTEXT_RESOURCE_PRF,
2027                            &encrypted,
2028                        ));
2029                    }
2030                }
2031                ResourceAction::SendCancelInitiator(data) => {
2032                    let encrypted = self
2033                        .links
2034                        .get(link_id)
2035                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2036                    if let Some(encrypted) = encrypted {
2037                        result.extend(self.build_link_packet(
2038                            link_id,
2039                            constants::CONTEXT_RESOURCE_ICL,
2040                            &encrypted,
2041                        ));
2042                    }
2043                }
2044                ResourceAction::SendCancelReceiver(data) => {
2045                    let encrypted = self
2046                        .links
2047                        .get(link_id)
2048                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2049                    if let Some(encrypted) = encrypted {
2050                        result.extend(self.build_link_packet(
2051                            link_id,
2052                            constants::CONTEXT_RESOURCE_RCL,
2053                            &encrypted,
2054                        ));
2055                    }
2056                }
2057                ResourceAction::DataReceived { data, metadata } => {
2058                    result.push(LinkManagerAction::ResourceReceived {
2059                        link_id: *link_id,
2060                        data,
2061                        metadata,
2062                    });
2063                }
2064                ResourceAction::Completed => {
2065                    result.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2066                }
2067                ResourceAction::Failed(e) => {
2068                    result.push(LinkManagerAction::ResourceFailed {
2069                        link_id: *link_id,
2070                        error: format!("{}", e),
2071                    });
2072                }
2073                ResourceAction::TeardownLink => {
2074                    let teardown_actions = match self.links.get_mut(link_id) {
2075                        Some(link) => link.engine.handle_teardown(),
2076                        None => Vec::new(),
2077                    };
2078                    result.extend(self.process_link_actions(link_id, &teardown_actions));
2079                }
2080                ResourceAction::ProgressUpdate { received, total } => {
2081                    result.push(LinkManagerAction::ResourceProgress {
2082                        link_id: *link_id,
2083                        received,
2084                        total,
2085                    });
2086                }
2087            }
2088        }
2089        result
2090    }
2091
2092    /// Build a link DATA packet with a given context and data.
2093    fn build_link_packet(
2094        &self,
2095        link_id: &LinkId,
2096        context: u8,
2097        data: &[u8],
2098    ) -> Vec<LinkManagerAction> {
2099        let flags = PacketFlags {
2100            header_type: constants::HEADER_1,
2101            context_flag: constants::FLAG_UNSET,
2102            transport_type: constants::TRANSPORT_BROADCAST,
2103            destination_type: constants::DESTINATION_LINK,
2104            packet_type: constants::PACKET_TYPE_DATA,
2105        };
2106        let mut actions = Vec::new();
2107        let max_mtu = self
2108            .links
2109            .get(link_id)
2110            .map(|l| l.engine.mtu() as usize)
2111            .unwrap_or(constants::MTU);
2112        if let Ok(pkt) =
2113            RawPacket::pack_with_max_mtu(flags, 0, link_id, None, context, data, max_mtu)
2114        {
2115            actions.push(LinkManagerAction::SendPacket {
2116                raw: pkt.raw,
2117                dest_type: constants::DESTINATION_LINK,
2118                attached_interface: None,
2119            });
2120        }
2121        actions
2122    }
2123
2124    /// Start sending a resource on a link.
2125    pub fn send_resource(
2126        &mut self,
2127        link_id: &LinkId,
2128        data: &[u8],
2129        metadata: Option<&[u8]>,
2130        rng: &mut dyn Rng,
2131    ) -> Vec<LinkManagerAction> {
2132        let link = match self.links.get_mut(link_id) {
2133            Some(l) => l,
2134            None => return Vec::new(),
2135        };
2136
2137        if link.engine.state() != LinkState::Active {
2138            return Vec::new();
2139        }
2140
2141        let link_rtt = link.engine.rtt().unwrap_or(1.0);
2142        let resource_sdu = Self::resource_sdu_for_link(link);
2143        let now = time::now();
2144
2145        // Use RefCell for interior mutability since ResourceSender::new expects &dyn Fn (not FnMut)
2146        // but link.engine.encrypt needs &mut dyn Rng
2147        let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
2148        let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
2149            link.engine
2150                .encrypt(plaintext, &mut *enc_rng.borrow_mut())
2151                .unwrap_or_else(|_| plaintext.to_vec())
2152        };
2153
2154        let sender = match ResourceSender::new(
2155            data,
2156            metadata,
2157            resource_sdu,
2158            &encrypt_fn,
2159            &Bzip2Compressor,
2160            rng,
2161            now,
2162            true,  // auto_compress
2163            false, // is_response
2164            None,  // request_id
2165            1,     // segment_index
2166            1,     // total_segments
2167            None,  // original_hash
2168            link_rtt,
2169            6.0, // traffic_timeout_factor
2170        ) {
2171            Ok(s) => s,
2172            Err(e) => {
2173                log::debug!("Failed to create ResourceSender: {}", e);
2174                return Vec::new();
2175            }
2176        };
2177
2178        let mut sender = sender;
2179        let adv_actions = sender.advertise(now);
2180        link.outgoing_resources.push(sender);
2181
2182        let _ = link;
2183        self.process_resource_actions(link_id, adv_actions, rng)
2184    }
2185
2186    /// Set the resource acceptance strategy for a link.
2187    pub fn set_resource_strategy(&mut self, link_id: &LinkId, strategy: ResourceStrategy) {
2188        if let Some(link) = self.links.get_mut(link_id) {
2189            link.resource_strategy = strategy;
2190        }
2191    }
2192
2193    /// Flush the channel TX ring for a link, clearing outstanding messages.
2194    /// Called after holepunch completion where signaling messages are fire-and-forget.
2195    pub fn flush_channel_tx(&mut self, link_id: &LinkId) {
2196        if let Some(link) = self.links.get_mut(link_id) {
2197            if let Some(ref mut channel) = link.channel {
2198                channel.flush_tx();
2199            }
2200        }
2201    }
2202
2203    /// Send a channel message on a link.
2204    pub fn send_channel_message(
2205        &mut self,
2206        link_id: &LinkId,
2207        msgtype: u16,
2208        payload: &[u8],
2209        rng: &mut dyn Rng,
2210    ) -> Result<Vec<LinkManagerAction>, String> {
2211        let link = match self.links.get_mut(link_id) {
2212            Some(l) => l,
2213            None => return Err("unknown link".to_string()),
2214        };
2215
2216        let channel = match link.channel {
2217            Some(ref mut ch) => ch,
2218            None => return Err("link has no active channel".to_string()),
2219        };
2220
2221        let link_mdu = link.engine.mdu();
2222        let now = time::now();
2223        let chan_actions = match channel.send(msgtype, payload, now, link_mdu) {
2224            Ok(a) => {
2225                link.channel_send_ok += 1;
2226                a
2227            }
2228            Err(e) => {
2229                log::debug!("Channel send failed: {:?}", e);
2230                match e {
2231                    rns_core::channel::ChannelError::NotReady => link.channel_send_not_ready += 1,
2232                    rns_core::channel::ChannelError::MessageTooBig => {
2233                        link.channel_send_too_big += 1;
2234                    }
2235                    rns_core::channel::ChannelError::InvalidEnvelope => {
2236                        link.channel_send_other_error += 1;
2237                    }
2238                }
2239                return Err(e.to_string());
2240            }
2241        };
2242
2243        let _ = link;
2244        Ok(self.process_channel_actions(link_id, chan_actions, rng))
2245    }
2246
2247    /// Periodic tick: check keepalive, stale, timeouts for all links.
2248    pub fn tick(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2249        let now = time::now();
2250        let mut all_actions = Vec::new();
2251
2252        // Collect link_ids to avoid borrow issues
2253        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2254
2255        for link_id in &link_ids {
2256            let link = match self.links.get_mut(link_id) {
2257                Some(l) => l,
2258                None => continue,
2259            };
2260
2261            // Tick the engine
2262            let tick_actions = link.engine.tick(now);
2263            all_actions.extend(self.process_link_actions(link_id, &tick_actions));
2264
2265            // Check if keepalive is needed
2266            let link = match self.links.get_mut(link_id) {
2267                Some(l) => l,
2268                None => continue,
2269            };
2270            if link.engine.needs_keepalive(now) {
2271                // Send keepalive packet (empty data with CONTEXT_KEEPALIVE)
2272                let flags = PacketFlags {
2273                    header_type: constants::HEADER_1,
2274                    context_flag: constants::FLAG_UNSET,
2275                    transport_type: constants::TRANSPORT_BROADCAST,
2276                    destination_type: constants::DESTINATION_LINK,
2277                    packet_type: constants::PACKET_TYPE_DATA,
2278                };
2279                if let Ok(pkt) =
2280                    RawPacket::pack(flags, 0, link_id, None, constants::CONTEXT_KEEPALIVE, &[])
2281                {
2282                    all_actions.push(LinkManagerAction::SendPacket {
2283                        raw: pkt.raw,
2284                        dest_type: constants::DESTINATION_LINK,
2285                        attached_interface: None,
2286                    });
2287                    link.engine.record_outbound(now, true);
2288                }
2289            }
2290
2291            if let Some(channel) = link.channel.as_mut() {
2292                let chan_actions = channel.tick(now);
2293                let _ = channel;
2294                let _ = link;
2295                all_actions.extend(self.process_channel_actions(link_id, chan_actions, rng));
2296            }
2297        }
2298
2299        // Tick resource senders and receivers
2300        for link_id in &link_ids {
2301            let link = match self.links.get_mut(link_id) {
2302                Some(l) => l,
2303                None => continue,
2304            };
2305
2306            // Tick outgoing resources (senders)
2307            let mut sender_actions = Vec::new();
2308            for sender in &mut link.outgoing_resources {
2309                sender_actions.extend(sender.tick(now));
2310            }
2311
2312            // Tick incoming resources (receivers)
2313            let mut receiver_actions = Vec::new();
2314            for receiver in &mut link.incoming_resources {
2315                let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2316                    link.engine.decrypt(ciphertext).map_err(|_| ())
2317                };
2318                receiver_actions.extend(receiver.tick(now, &decrypt_fn, &Bzip2Compressor));
2319            }
2320
2321            // Clean up completed/failed resources
2322            link.outgoing_resources
2323                .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2324            link.incoming_resources
2325                .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2326
2327            let _ = link;
2328            all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2329            all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2330        }
2331
2332        // Clean up closed links
2333        let closed: Vec<LinkId> = self
2334            .links
2335            .iter()
2336            .filter(|(_, l)| l.engine.state() == LinkState::Closed)
2337            .map(|(id, _)| *id)
2338            .collect();
2339        for id in closed {
2340            self.links.remove(&id);
2341            all_actions.push(LinkManagerAction::DeregisterLinkDest { link_id: id });
2342        }
2343
2344        all_actions
2345    }
2346
2347    /// Check if a destination hash is a known link_id managed by this manager.
2348    pub fn is_link_destination(&self, dest_hash: &[u8; 16]) -> bool {
2349        self.links.contains_key(dest_hash) || self.link_destinations.contains_key(dest_hash)
2350    }
2351
2352    /// Get the state of a link.
2353    pub fn link_state(&self, link_id: &LinkId) -> Option<LinkState> {
2354        self.links.get(link_id).map(|l| l.engine.state())
2355    }
2356
2357    /// Get the RTT of a link.
2358    pub fn link_rtt(&self, link_id: &LinkId) -> Option<f64> {
2359        self.links.get(link_id).and_then(|l| l.engine.rtt())
2360    }
2361
2362    /// Update the RTT of a link (e.g., after path redirect to a direct connection).
2363    pub fn set_link_rtt(&mut self, link_id: &LinkId, rtt: f64) {
2364        if let Some(link) = self.links.get_mut(link_id) {
2365            link.engine.set_rtt(rtt);
2366        }
2367    }
2368
2369    /// Reset the inbound timer for a link (e.g., after path redirect).
2370    pub fn record_link_inbound(&mut self, link_id: &LinkId) {
2371        if let Some(link) = self.links.get_mut(link_id) {
2372            link.engine.record_inbound(time::now());
2373        }
2374    }
2375
2376    /// Update the MTU of a link (e.g., after path redirect to a different interface).
2377    pub fn set_link_mtu(&mut self, link_id: &LinkId, mtu: u32) {
2378        if let Some(link) = self.links.get_mut(link_id) {
2379            link.engine.set_mtu(mtu);
2380        }
2381    }
2382
2383    /// Get the number of active links.
2384    pub fn link_count(&self) -> usize {
2385        self.links.len()
2386    }
2387
2388    /// Get the number of active resource transfers across all links.
2389    pub fn resource_transfer_count(&self) -> usize {
2390        self.links
2391            .values()
2392            .map(|managed| managed.incoming_resources.len() + managed.outgoing_resources.len())
2393            .sum()
2394    }
2395
2396    /// Cancel all active resource transfers and return the generated actions.
2397    pub fn cancel_all_resources(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2398        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2399        let mut all_actions = Vec::new();
2400
2401        for link_id in &link_ids {
2402            let link = match self.links.get_mut(link_id) {
2403                Some(l) => l,
2404                None => continue,
2405            };
2406
2407            let mut sender_actions = Vec::new();
2408            for sender in &mut link.outgoing_resources {
2409                sender_actions.extend(sender.cancel());
2410            }
2411
2412            let mut receiver_actions = Vec::new();
2413            for receiver in &mut link.incoming_resources {
2414                receiver_actions.extend(receiver.reject());
2415            }
2416
2417            link.outgoing_resources
2418                .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2419            link.incoming_resources
2420                .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2421
2422            let _ = link;
2423            all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2424            all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2425        }
2426
2427        all_actions
2428    }
2429
2430    /// Get information about all active links.
2431    pub fn link_entries(&self) -> Vec<crate::event::LinkInfoEntry> {
2432        self.links
2433            .iter()
2434            .map(|(link_id, managed)| {
2435                let state = match managed.engine.state() {
2436                    LinkState::Pending => "pending",
2437                    LinkState::Handshake => "handshake",
2438                    LinkState::Active => "active",
2439                    LinkState::Stale => "stale",
2440                    LinkState::Closed => "closed",
2441                };
2442                crate::event::LinkInfoEntry {
2443                    link_id: *link_id,
2444                    state: state.to_string(),
2445                    is_initiator: managed.engine.is_initiator(),
2446                    dest_hash: managed.dest_hash,
2447                    remote_identity: managed.remote_identity.as_ref().map(|(h, _)| *h),
2448                    rtt: managed.engine.rtt(),
2449                    channel_window: managed.channel.as_ref().map(|c| c.window()),
2450                    channel_outstanding: managed.channel.as_ref().map(|c| c.outstanding()),
2451                    pending_channel_packets: managed.pending_channel_packets.len(),
2452                    channel_send_ok: managed.channel_send_ok,
2453                    channel_send_not_ready: managed.channel_send_not_ready,
2454                    channel_send_too_big: managed.channel_send_too_big,
2455                    channel_send_other_error: managed.channel_send_other_error,
2456                    channel_messages_received: managed.channel_messages_received,
2457                    channel_proofs_sent: managed.channel_proofs_sent,
2458                    channel_proofs_received: managed.channel_proofs_received,
2459                }
2460            })
2461            .collect()
2462    }
2463
2464    /// Get information about all active resource transfers.
2465    pub fn resource_entries(&self) -> Vec<crate::event::ResourceInfoEntry> {
2466        let mut entries = Vec::new();
2467        for (link_id, managed) in &self.links {
2468            for recv in &managed.incoming_resources {
2469                let (received, total) = recv.progress();
2470                entries.push(crate::event::ResourceInfoEntry {
2471                    link_id: *link_id,
2472                    direction: "incoming".to_string(),
2473                    total_parts: total,
2474                    transferred_parts: received,
2475                    complete: received >= total && total > 0,
2476                });
2477            }
2478            for send in &managed.outgoing_resources {
2479                let total = send.total_parts();
2480                let sent = send.sent_parts;
2481                entries.push(crate::event::ResourceInfoEntry {
2482                    link_id: *link_id,
2483                    direction: "outgoing".to_string(),
2484                    total_parts: total,
2485                    transferred_parts: sent,
2486                    complete: sent >= total && total > 0,
2487                });
2488            }
2489        }
2490        entries
2491    }
2492
2493    /// Convert LinkActions to LinkManagerActions.
2494    fn process_link_actions(
2495        &self,
2496        link_id: &LinkId,
2497        actions: &[LinkAction],
2498    ) -> Vec<LinkManagerAction> {
2499        let mut result = Vec::new();
2500        for action in actions {
2501            match action {
2502                LinkAction::StateChanged {
2503                    new_state, reason, ..
2504                } => match new_state {
2505                    LinkState::Closed => {
2506                        result.push(LinkManagerAction::LinkClosed {
2507                            link_id: *link_id,
2508                            reason: *reason,
2509                        });
2510                    }
2511                    _ => {}
2512                },
2513                LinkAction::LinkEstablished {
2514                    rtt, is_initiator, ..
2515                } => {
2516                    let dest_hash = self
2517                        .links
2518                        .get(link_id)
2519                        .map(|l| l.dest_hash)
2520                        .unwrap_or([0u8; 16]);
2521                    result.push(LinkManagerAction::LinkEstablished {
2522                        link_id: *link_id,
2523                        dest_hash,
2524                        rtt: *rtt,
2525                        is_initiator: *is_initiator,
2526                    });
2527                }
2528                LinkAction::RemoteIdentified {
2529                    identity_hash,
2530                    public_key,
2531                    ..
2532                } => {
2533                    result.push(LinkManagerAction::RemoteIdentified {
2534                        link_id: *link_id,
2535                        identity_hash: *identity_hash,
2536                        public_key: *public_key,
2537                    });
2538                }
2539                LinkAction::DataReceived { .. } => {
2540                    // Data delivery is handled at a higher level
2541                }
2542            }
2543        }
2544        result
2545    }
2546
2547    /// Convert ChannelActions to LinkManagerActions.
2548    fn process_channel_actions(
2549        &mut self,
2550        link_id: &LinkId,
2551        actions: Vec<rns_core::channel::ChannelAction>,
2552        rng: &mut dyn Rng,
2553    ) -> Vec<LinkManagerAction> {
2554        let mut result = Vec::new();
2555        for action in actions {
2556            match action {
2557                rns_core::channel::ChannelAction::SendOnLink { raw, sequence } => {
2558                    // Encrypt and send as CHANNEL context
2559                    let encrypted = match self.links.get(link_id) {
2560                        Some(link) => match link.engine.encrypt(&raw, rng) {
2561                            Ok(encrypted) => encrypted,
2562                            Err(_) => continue,
2563                        },
2564                        None => continue,
2565                    };
2566                    let flags = PacketFlags {
2567                        header_type: constants::HEADER_1,
2568                        context_flag: constants::FLAG_UNSET,
2569                        transport_type: constants::TRANSPORT_BROADCAST,
2570                        destination_type: constants::DESTINATION_LINK,
2571                        packet_type: constants::PACKET_TYPE_DATA,
2572                    };
2573                    if let Ok(pkt) = RawPacket::pack(
2574                        flags,
2575                        0,
2576                        link_id,
2577                        None,
2578                        constants::CONTEXT_CHANNEL,
2579                        &encrypted,
2580                    ) {
2581                        if let Some(link_mut) = self.links.get_mut(link_id) {
2582                            link_mut
2583                                .pending_channel_packets
2584                                .insert(pkt.packet_hash, sequence);
2585                        }
2586                        result.push(LinkManagerAction::SendPacket {
2587                            raw: pkt.raw,
2588                            dest_type: constants::DESTINATION_LINK,
2589                            attached_interface: None,
2590                        });
2591                    }
2592                }
2593                rns_core::channel::ChannelAction::MessageReceived {
2594                    msgtype, payload, ..
2595                } => {
2596                    result.push(LinkManagerAction::ChannelMessageReceived {
2597                        link_id: *link_id,
2598                        msgtype,
2599                        payload,
2600                    });
2601                }
2602                rns_core::channel::ChannelAction::TeardownLink => {
2603                    result.push(LinkManagerAction::LinkClosed {
2604                        link_id: *link_id,
2605                        reason: Some(TeardownReason::Timeout),
2606                    });
2607                }
2608            }
2609        }
2610        result
2611    }
2612}
2613
2614/// Compute a path hash from a path string.
2615/// Uses truncated SHA-256 (first 16 bytes).
2616fn compute_path_hash(path: &str) -> [u8; 16] {
2617    let full = rns_core::hash::full_hash(path.as_bytes());
2618    let mut result = [0u8; 16];
2619    result.copy_from_slice(&full[..16]);
2620    result
2621}
2622
2623#[cfg(test)]
2624mod tests {
2625    use super::*;
2626    use rns_crypto::identity::Identity;
2627    use rns_crypto::{FixedRng, OsRng};
2628
2629    fn make_rng(seed: u8) -> FixedRng {
2630        FixedRng::new(&[seed; 128])
2631    }
2632
2633    fn make_dest_keys(rng: &mut dyn Rng) -> (Ed25519PrivateKey, [u8; 32]) {
2634        let sig_prv = Ed25519PrivateKey::generate(rng);
2635        let sig_pub_bytes = sig_prv.public_key().public_bytes();
2636        (sig_prv, sig_pub_bytes)
2637    }
2638
2639    #[test]
2640    fn test_register_link_destination() {
2641        let mut mgr = LinkManager::new();
2642        let mut rng = make_rng(0x01);
2643        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2644        let dest_hash = [0xDD; 16];
2645
2646        mgr.register_link_destination(
2647            dest_hash,
2648            sig_prv,
2649            sig_pub_bytes,
2650            ResourceStrategy::AcceptNone,
2651        );
2652        assert!(mgr.is_link_destination(&dest_hash));
2653
2654        mgr.deregister_link_destination(&dest_hash);
2655        assert!(!mgr.is_link_destination(&dest_hash));
2656    }
2657
2658    #[test]
2659    fn test_create_link() {
2660        let mut mgr = LinkManager::new();
2661        let mut rng = OsRng;
2662        let dest_hash = [0xDD; 16];
2663
2664        let sig_pub_bytes = [0xAA; 32]; // dummy sig pub for test
2665        let (link_id, actions) = mgr.create_link(
2666            &dest_hash,
2667            &sig_pub_bytes,
2668            1,
2669            constants::MTU as u32,
2670            &mut rng,
2671        );
2672        assert_ne!(link_id, [0u8; 16]);
2673        // Should have RegisterLinkDest + SendPacket
2674        assert_eq!(actions.len(), 2);
2675        assert!(matches!(
2676            actions[0],
2677            LinkManagerAction::RegisterLinkDest { .. }
2678        ));
2679        assert!(matches!(actions[1], LinkManagerAction::SendPacket { .. }));
2680
2681        // Link should be in Pending state
2682        assert_eq!(mgr.link_state(&link_id), Some(LinkState::Pending));
2683    }
2684
2685    #[test]
2686    fn test_full_handshake_via_manager() {
2687        let mut rng = OsRng;
2688        let dest_hash = [0xDD; 16];
2689
2690        // Setup responder
2691        let mut responder_mgr = LinkManager::new();
2692        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2693        responder_mgr.register_link_destination(
2694            dest_hash,
2695            sig_prv,
2696            sig_pub_bytes,
2697            ResourceStrategy::AcceptNone,
2698        );
2699
2700        // Setup initiator
2701        let mut initiator_mgr = LinkManager::new();
2702
2703        // Step 1: Initiator creates link (needs dest signing pub key for LRPROOF verification)
2704        let (link_id, init_actions) = initiator_mgr.create_link(
2705            &dest_hash,
2706            &sig_pub_bytes,
2707            1,
2708            constants::MTU as u32,
2709            &mut rng,
2710        );
2711        assert_eq!(init_actions.len(), 2);
2712
2713        // Extract the LINKREQUEST packet raw bytes
2714        let linkrequest_raw = match &init_actions[1] {
2715            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
2716            _ => panic!("Expected SendPacket"),
2717        };
2718
2719        // Parse to get packet_hash and dest_hash
2720        let lr_packet = RawPacket::unpack(&linkrequest_raw).unwrap();
2721
2722        // Step 2: Responder handles LINKREQUEST
2723        let resp_actions = responder_mgr.handle_local_delivery(
2724            lr_packet.destination_hash,
2725            &linkrequest_raw,
2726            lr_packet.packet_hash,
2727            rns_core::transport::types::InterfaceId(0),
2728            &mut rng,
2729        );
2730        // Should have RegisterLinkDest + SendPacket(LRPROOF)
2731        assert!(resp_actions.len() >= 2);
2732        assert!(matches!(
2733            resp_actions[0],
2734            LinkManagerAction::RegisterLinkDest { .. }
2735        ));
2736
2737        // Extract LRPROOF packet
2738        let lrproof_raw = match &resp_actions[1] {
2739            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
2740            _ => panic!("Expected SendPacket for LRPROOF"),
2741        };
2742
2743        // Step 3: Initiator handles LRPROOF
2744        let lrproof_packet = RawPacket::unpack(&lrproof_raw).unwrap();
2745        let init_actions2 = initiator_mgr.handle_local_delivery(
2746            lrproof_packet.destination_hash,
2747            &lrproof_raw,
2748            lrproof_packet.packet_hash,
2749            rns_core::transport::types::InterfaceId(0),
2750            &mut rng,
2751        );
2752
2753        // Should have LinkEstablished + SendPacket(LRRTT)
2754        let has_established = init_actions2
2755            .iter()
2756            .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
2757        assert!(has_established, "Initiator should emit LinkEstablished");
2758
2759        // Extract LRRTT
2760        let lrrtt_raw = init_actions2
2761            .iter()
2762            .find_map(|a| match a {
2763                LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
2764                _ => None,
2765            })
2766            .expect("Should have LRRTT SendPacket");
2767
2768        // Step 4: Responder handles LRRTT
2769        let lrrtt_packet = RawPacket::unpack(&lrrtt_raw).unwrap();
2770        let resp_link_id = lrrtt_packet.destination_hash;
2771        let resp_actions2 = responder_mgr.handle_local_delivery(
2772            resp_link_id,
2773            &lrrtt_raw,
2774            lrrtt_packet.packet_hash,
2775            rns_core::transport::types::InterfaceId(0),
2776            &mut rng,
2777        );
2778
2779        let has_established = resp_actions2
2780            .iter()
2781            .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
2782        assert!(has_established, "Responder should emit LinkEstablished");
2783
2784        // Both sides should be Active
2785        assert_eq!(initiator_mgr.link_state(&link_id), Some(LinkState::Active));
2786        assert_eq!(responder_mgr.link_state(&link_id), Some(LinkState::Active));
2787
2788        // Both should have RTT
2789        assert!(initiator_mgr.link_rtt(&link_id).is_some());
2790        assert!(responder_mgr.link_rtt(&link_id).is_some());
2791    }
2792
2793    #[test]
2794    fn test_encrypted_data_exchange() {
2795        let mut rng = OsRng;
2796        let dest_hash = [0xDD; 16];
2797        let mut resp_mgr = LinkManager::new();
2798        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2799        resp_mgr.register_link_destination(
2800            dest_hash,
2801            sig_prv,
2802            sig_pub_bytes,
2803            ResourceStrategy::AcceptNone,
2804        );
2805        let mut init_mgr = LinkManager::new();
2806
2807        // Handshake
2808        let (link_id, init_actions) = init_mgr.create_link(
2809            &dest_hash,
2810            &sig_pub_bytes,
2811            1,
2812            constants::MTU as u32,
2813            &mut rng,
2814        );
2815        let lr_raw = extract_send_packet(&init_actions);
2816        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
2817        let resp_actions = resp_mgr.handle_local_delivery(
2818            lr_pkt.destination_hash,
2819            &lr_raw,
2820            lr_pkt.packet_hash,
2821            rns_core::transport::types::InterfaceId(0),
2822            &mut rng,
2823        );
2824        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
2825        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
2826        let init_actions2 = init_mgr.handle_local_delivery(
2827            lrproof_pkt.destination_hash,
2828            &lrproof_raw,
2829            lrproof_pkt.packet_hash,
2830            rns_core::transport::types::InterfaceId(0),
2831            &mut rng,
2832        );
2833        let lrrtt_raw = extract_any_send_packet(&init_actions2);
2834        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
2835        resp_mgr.handle_local_delivery(
2836            lrrtt_pkt.destination_hash,
2837            &lrrtt_raw,
2838            lrrtt_pkt.packet_hash,
2839            rns_core::transport::types::InterfaceId(0),
2840            &mut rng,
2841        );
2842
2843        // Send data from initiator to responder
2844        let actions =
2845            init_mgr.send_on_link(&link_id, b"hello link!", constants::CONTEXT_NONE, &mut rng);
2846        assert_eq!(actions.len(), 1);
2847        assert!(matches!(actions[0], LinkManagerAction::SendPacket { .. }));
2848    }
2849
2850    #[test]
2851    fn test_request_response() {
2852        let mut rng = OsRng;
2853        let dest_hash = [0xDD; 16];
2854        let mut resp_mgr = LinkManager::new();
2855        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2856        resp_mgr.register_link_destination(
2857            dest_hash,
2858            sig_prv,
2859            sig_pub_bytes,
2860            ResourceStrategy::AcceptNone,
2861        );
2862
2863        // Register a request handler
2864        resp_mgr.register_request_handler("/status", None, |_link_id, _path, _data, _remote| {
2865            Some(b"OK".to_vec())
2866        });
2867
2868        let mut init_mgr = LinkManager::new();
2869
2870        // Complete handshake
2871        let (link_id, init_actions) = init_mgr.create_link(
2872            &dest_hash,
2873            &sig_pub_bytes,
2874            1,
2875            constants::MTU as u32,
2876            &mut rng,
2877        );
2878        let lr_raw = extract_send_packet(&init_actions);
2879        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
2880        let resp_actions = resp_mgr.handle_local_delivery(
2881            lr_pkt.destination_hash,
2882            &lr_raw,
2883            lr_pkt.packet_hash,
2884            rns_core::transport::types::InterfaceId(0),
2885            &mut rng,
2886        );
2887        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
2888        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
2889        let init_actions2 = init_mgr.handle_local_delivery(
2890            lrproof_pkt.destination_hash,
2891            &lrproof_raw,
2892            lrproof_pkt.packet_hash,
2893            rns_core::transport::types::InterfaceId(0),
2894            &mut rng,
2895        );
2896        let lrrtt_raw = extract_any_send_packet(&init_actions2);
2897        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
2898        resp_mgr.handle_local_delivery(
2899            lrrtt_pkt.destination_hash,
2900            &lrrtt_raw,
2901            lrrtt_pkt.packet_hash,
2902            rns_core::transport::types::InterfaceId(0),
2903            &mut rng,
2904        );
2905
2906        // Send request from initiator
2907        let req_actions = init_mgr.send_request(&link_id, "/status", b"query", &mut rng);
2908        assert_eq!(req_actions.len(), 1);
2909
2910        // Deliver request to responder
2911        let req_raw = extract_send_packet_from(&req_actions);
2912        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
2913        let resp_actions = resp_mgr.handle_local_delivery(
2914            req_pkt.destination_hash,
2915            &req_raw,
2916            req_pkt.packet_hash,
2917            rns_core::transport::types::InterfaceId(0),
2918            &mut rng,
2919        );
2920
2921        // Should have a response SendPacket
2922        let has_response = resp_actions
2923            .iter()
2924            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
2925        assert!(has_response, "Handler should produce a response packet");
2926    }
2927
2928    #[test]
2929    fn test_request_acl_deny_unidentified() {
2930        let mut rng = OsRng;
2931        let dest_hash = [0xDD; 16];
2932        let mut resp_mgr = LinkManager::new();
2933        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2934        resp_mgr.register_link_destination(
2935            dest_hash,
2936            sig_prv,
2937            sig_pub_bytes,
2938            ResourceStrategy::AcceptNone,
2939        );
2940
2941        // Register handler with ACL (only allow specific identity)
2942        resp_mgr.register_request_handler(
2943            "/restricted",
2944            Some(vec![[0xAA; 16]]),
2945            |_link_id, _path, _data, _remote| Some(b"secret".to_vec()),
2946        );
2947
2948        let mut init_mgr = LinkManager::new();
2949
2950        // Complete handshake (without identification)
2951        let (link_id, init_actions) = init_mgr.create_link(
2952            &dest_hash,
2953            &sig_pub_bytes,
2954            1,
2955            constants::MTU as u32,
2956            &mut rng,
2957        );
2958        let lr_raw = extract_send_packet(&init_actions);
2959        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
2960        let resp_actions = resp_mgr.handle_local_delivery(
2961            lr_pkt.destination_hash,
2962            &lr_raw,
2963            lr_pkt.packet_hash,
2964            rns_core::transport::types::InterfaceId(0),
2965            &mut rng,
2966        );
2967        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
2968        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
2969        let init_actions2 = init_mgr.handle_local_delivery(
2970            lrproof_pkt.destination_hash,
2971            &lrproof_raw,
2972            lrproof_pkt.packet_hash,
2973            rns_core::transport::types::InterfaceId(0),
2974            &mut rng,
2975        );
2976        let lrrtt_raw = extract_any_send_packet(&init_actions2);
2977        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
2978        resp_mgr.handle_local_delivery(
2979            lrrtt_pkt.destination_hash,
2980            &lrrtt_raw,
2981            lrrtt_pkt.packet_hash,
2982            rns_core::transport::types::InterfaceId(0),
2983            &mut rng,
2984        );
2985
2986        // Send request without identifying first
2987        let req_actions = init_mgr.send_request(&link_id, "/restricted", b"query", &mut rng);
2988        let req_raw = extract_send_packet_from(&req_actions);
2989        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
2990        let resp_actions = resp_mgr.handle_local_delivery(
2991            req_pkt.destination_hash,
2992            &req_raw,
2993            req_pkt.packet_hash,
2994            rns_core::transport::types::InterfaceId(0),
2995            &mut rng,
2996        );
2997
2998        // Should be denied — no response packet
2999        let has_response = resp_actions
3000            .iter()
3001            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3002        assert!(!has_response, "Unidentified peer should be denied");
3003    }
3004
3005    #[test]
3006    fn test_teardown_link() {
3007        let mut rng = OsRng;
3008        let dest_hash = [0xDD; 16];
3009        let mut mgr = LinkManager::new();
3010
3011        let dummy_sig = [0xAA; 32];
3012        let (link_id, _) =
3013            mgr.create_link(&dest_hash, &dummy_sig, 1, constants::MTU as u32, &mut rng);
3014        assert_eq!(mgr.link_count(), 1);
3015
3016        let actions = mgr.teardown_link(&link_id);
3017        let has_close = actions
3018            .iter()
3019            .any(|a| matches!(a, LinkManagerAction::LinkClosed { .. }));
3020        assert!(has_close);
3021
3022        // After tick, closed links should be cleaned up
3023        let tick_actions = mgr.tick(&mut rng);
3024        let has_deregister = tick_actions
3025            .iter()
3026            .any(|a| matches!(a, LinkManagerAction::DeregisterLinkDest { .. }));
3027        assert!(has_deregister);
3028        assert_eq!(mgr.link_count(), 0);
3029    }
3030
3031    #[test]
3032    fn test_identify_on_link() {
3033        let mut rng = OsRng;
3034        let dest_hash = [0xDD; 16];
3035        let mut resp_mgr = LinkManager::new();
3036        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3037        resp_mgr.register_link_destination(
3038            dest_hash,
3039            sig_prv,
3040            sig_pub_bytes,
3041            ResourceStrategy::AcceptNone,
3042        );
3043        let mut init_mgr = LinkManager::new();
3044
3045        // Complete handshake
3046        let (link_id, init_actions) = init_mgr.create_link(
3047            &dest_hash,
3048            &sig_pub_bytes,
3049            1,
3050            constants::MTU as u32,
3051            &mut rng,
3052        );
3053        let lr_raw = extract_send_packet(&init_actions);
3054        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3055        let resp_actions = resp_mgr.handle_local_delivery(
3056            lr_pkt.destination_hash,
3057            &lr_raw,
3058            lr_pkt.packet_hash,
3059            rns_core::transport::types::InterfaceId(0),
3060            &mut rng,
3061        );
3062        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3063        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3064        let init_actions2 = init_mgr.handle_local_delivery(
3065            lrproof_pkt.destination_hash,
3066            &lrproof_raw,
3067            lrproof_pkt.packet_hash,
3068            rns_core::transport::types::InterfaceId(0),
3069            &mut rng,
3070        );
3071        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3072        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3073        resp_mgr.handle_local_delivery(
3074            lrrtt_pkt.destination_hash,
3075            &lrrtt_raw,
3076            lrrtt_pkt.packet_hash,
3077            rns_core::transport::types::InterfaceId(0),
3078            &mut rng,
3079        );
3080
3081        // Identify initiator to responder
3082        let identity = Identity::new(&mut rng);
3083        let id_actions = init_mgr.identify(&link_id, &identity, &mut rng);
3084        assert_eq!(id_actions.len(), 1);
3085
3086        // Deliver identify to responder
3087        let id_raw = extract_send_packet_from(&id_actions);
3088        let id_pkt = RawPacket::unpack(&id_raw).unwrap();
3089        let resp_actions = resp_mgr.handle_local_delivery(
3090            id_pkt.destination_hash,
3091            &id_raw,
3092            id_pkt.packet_hash,
3093            rns_core::transport::types::InterfaceId(0),
3094            &mut rng,
3095        );
3096
3097        let has_identified = resp_actions
3098            .iter()
3099            .any(|a| matches!(a, LinkManagerAction::RemoteIdentified { .. }));
3100        assert!(has_identified, "Responder should emit RemoteIdentified");
3101    }
3102
3103    #[test]
3104    fn test_path_hash_computation() {
3105        let h1 = compute_path_hash("/status");
3106        let h2 = compute_path_hash("/path");
3107        assert_ne!(h1, h2);
3108
3109        // Deterministic
3110        assert_eq!(h1, compute_path_hash("/status"));
3111    }
3112
3113    #[test]
3114    fn test_link_count() {
3115        let mut mgr = LinkManager::new();
3116        let mut rng = OsRng;
3117
3118        assert_eq!(mgr.link_count(), 0);
3119
3120        let dummy_sig = [0xAA; 32];
3121        mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3122        assert_eq!(mgr.link_count(), 1);
3123
3124        mgr.create_link(&[0x22; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3125        assert_eq!(mgr.link_count(), 2);
3126    }
3127
3128    // --- Test helpers ---
3129
3130    fn extract_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3131        extract_send_packet_at(actions, actions.len() - 1)
3132    }
3133
3134    fn extract_send_packet_at(actions: &[LinkManagerAction], idx: usize) -> Vec<u8> {
3135        match &actions[idx] {
3136            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3137            other => panic!("Expected SendPacket at index {}, got {:?}", idx, other),
3138        }
3139    }
3140
3141    fn extract_any_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3142        actions
3143            .iter()
3144            .find_map(|a| match a {
3145                LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3146                _ => None,
3147            })
3148            .expect("Expected at least one SendPacket action")
3149    }
3150
3151    fn extract_send_packet_from(actions: &[LinkManagerAction]) -> Vec<u8> {
3152        extract_any_send_packet(actions)
3153    }
3154
3155    /// Set up two linked managers with an active link.
3156    /// Returns (initiator_mgr, responder_mgr, link_id).
3157    fn setup_active_link() -> (LinkManager, LinkManager, LinkId) {
3158        let mut rng = OsRng;
3159        let dest_hash = [0xDD; 16];
3160        let mut resp_mgr = LinkManager::new();
3161        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3162        resp_mgr.register_link_destination(
3163            dest_hash,
3164            sig_prv,
3165            sig_pub_bytes,
3166            ResourceStrategy::AcceptNone,
3167        );
3168        let mut init_mgr = LinkManager::new();
3169
3170        let (link_id, init_actions) = init_mgr.create_link(
3171            &dest_hash,
3172            &sig_pub_bytes,
3173            1,
3174            constants::MTU as u32,
3175            &mut rng,
3176        );
3177        let lr_raw = extract_send_packet(&init_actions);
3178        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3179        let resp_actions = resp_mgr.handle_local_delivery(
3180            lr_pkt.destination_hash,
3181            &lr_raw,
3182            lr_pkt.packet_hash,
3183            rns_core::transport::types::InterfaceId(0),
3184            &mut rng,
3185        );
3186        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3187        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3188        let init_actions2 = init_mgr.handle_local_delivery(
3189            lrproof_pkt.destination_hash,
3190            &lrproof_raw,
3191            lrproof_pkt.packet_hash,
3192            rns_core::transport::types::InterfaceId(0),
3193            &mut rng,
3194        );
3195        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3196        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3197        resp_mgr.handle_local_delivery(
3198            lrrtt_pkt.destination_hash,
3199            &lrrtt_raw,
3200            lrrtt_pkt.packet_hash,
3201            rns_core::transport::types::InterfaceId(0),
3202            &mut rng,
3203        );
3204
3205        assert_eq!(init_mgr.link_state(&link_id), Some(LinkState::Active));
3206        assert_eq!(resp_mgr.link_state(&link_id), Some(LinkState::Active));
3207
3208        (init_mgr, resp_mgr, link_id)
3209    }
3210
3211    // ====================================================================
3212    // Phase 8a: Resource wiring tests
3213    // ====================================================================
3214
/// A freshly created link starts out with the `AcceptNone` resource strategy.
#[test]
fn test_resource_strategy_default() {
    let mut rng = OsRng;
    let mut mgr = LinkManager::new();
    let placeholder_sig = [0xAA; 32];
    let (link_id, _) = mgr.create_link(
        &[0x11; 16],
        &placeholder_sig,
        1,
        constants::MTU as u32,
        &mut rng,
    );

    // Read the strategy straight off the managed link record.
    let strategy = mgr
        .links
        .get(&link_id)
        .expect("link should exist right after create_link")
        .resource_strategy;
    assert_eq!(strategy, ResourceStrategy::AcceptNone);
}
3227
/// `set_resource_strategy` overwrites the strategy stored on the link, and a
/// later call replaces the earlier value.
#[test]
fn test_set_resource_strategy() {
    let mut rng = OsRng;
    let mut mgr = LinkManager::new();
    let placeholder_sig = [0xAA; 32];
    let (link_id, _) = mgr.create_link(
        &[0x11; 16],
        &placeholder_sig,
        1,
        constants::MTU as u32,
        &mut rng,
    );

    // Apply the strategies in sequence and check the stored value each time.
    for &wanted in &[ResourceStrategy::AcceptAll, ResourceStrategy::AcceptApp] {
        mgr.set_resource_strategy(&link_id, wanted);
        let stored = mgr.links.get(&link_id).unwrap().resource_strategy;
        assert_eq!(stored, wanted);
    }
}
3248
/// Sending a resource over an active link yields at least one outbound packet.
#[test]
fn test_send_resource_on_active_link() {
    let mut rng = OsRng;
    let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();

    // 100 bytes of payload, intended to be small enough for a single part.
    let payload = vec![0xAB; 100];
    let actions = init_mgr.send_resource(&link_id, &payload, None, &mut rng);

    // The advertisement is expected to show up as a SendPacket action.
    let send_packets = actions
        .iter()
        .filter(|action| matches!(action, LinkManagerAction::SendPacket { .. }))
        .count();
    assert!(
        send_packets >= 1,
        "send_resource should emit advertisement SendPacket"
    );
}
3267
/// `send_resource` produces no actions while the link has not become active.
#[test]
fn test_send_resource_on_inactive_link() {
    let mut rng = OsRng;
    let mut mgr = LinkManager::new();
    let placeholder_sig = [0xAA; 32];

    // create_link alone does not complete a handshake, so the link is not Active.
    let (link_id, _) = mgr.create_link(
        &[0x11; 16],
        &placeholder_sig,
        1,
        constants::MTU as u32,
        &mut rng,
    );

    let produced = mgr.send_resource(&link_id, b"data", None, &mut rng);
    assert!(produced.is_empty(), "Cannot send resource on inactive link");
}
3280
/// A responder left on `AcceptNone` must never surface an incoming resource
/// to the application when it is handed a resource advertisement.
#[test]
fn test_resource_adv_rejected_by_accept_none() {
    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    // The responder keeps the AcceptNone strategy it was registered with in
    // setup_active_link; the initiator sends a small resource.
    let data = vec![0xCD; 100];
    let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);

    // Deliver every outbound packet to the responder and inspect its reaction.
    let mut delivered = 0usize;
    for action in &adv_actions {
        if let LinkManagerAction::SendPacket { raw, .. } = action {
            let pkt = RawPacket::unpack(raw).unwrap();
            let resp_actions = resp_mgr.handle_local_delivery(
                pkt.destination_hash,
                raw,
                pkt.packet_hash,
                rns_core::transport::types::InterfaceId(0),
                &mut rng,
            );
            delivered += 1;

            // AcceptNone: no ResourceReceived may appear. The responder may
            // still emit a SendPacket of its own, which is not checked here.
            let has_resource_received = resp_actions
                .iter()
                .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
            assert!(
                !has_resource_received,
                "AcceptNone should not accept resource"
            );
        }
    }

    // Without this guard the test would pass vacuously if send_resource
    // produced no packets, because the assertion above would never run.
    assert!(
        delivered > 0,
        "send_resource should emit at least one packet to deliver"
    );
}
3313
/// A responder switched to `AcceptAll` reacts to a resource advertisement by
/// sending a packet of its own (expected to be the request for parts).
#[test]
fn test_resource_adv_accepted_by_accept_all() {
    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    // Set responder to AcceptAll
    resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);

    // Send resource from initiator
    let data = vec![0xCD; 100];
    let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);

    // Deliver every outbound packet to the responder and inspect its reaction.
    let mut delivered = 0usize;
    for action in &adv_actions {
        if let LinkManagerAction::SendPacket { raw, .. } = action {
            let pkt = RawPacket::unpack(raw).unwrap();
            let resp_actions = resp_mgr.handle_local_delivery(
                pkt.destination_hash,
                raw,
                pkt.packet_hash,
                rns_core::transport::types::InterfaceId(0),
                &mut rng,
            );
            delivered += 1;

            // AcceptAll: the responder should answer with a SendPacket.
            let has_send = resp_actions
                .iter()
                .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
            assert!(has_send, "AcceptAll should accept and request parts");
        }
    }

    // Without this guard the test would pass vacuously if send_resource
    // produced no packets, because the assertion above would never run.
    assert!(
        delivered > 0,
        "send_resource should emit at least one packet to deliver"
    );
}
3345
/// With the `AcceptApp` strategy, an incoming advertisement is turned into a
/// `ResourceAcceptQuery` action rather than being decided automatically.
#[test]
fn test_resource_accept_app_query() {
    use rns_core::transport::types::InterfaceId;

    let mut rng = OsRng;
    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);

    // Initiator announces a small resource.
    let payload = vec![0xCD; 100];
    let advertisement = init_mgr.send_resource(&link_id, &payload, None, &mut rng);

    // Hand each outbound packet to the responder and look for the query.
    let outbound = advertisement.iter().filter_map(|action| match action {
        LinkManagerAction::SendPacket { raw, .. } => Some(raw),
        _ => None,
    });
    let mut got_query = false;
    for raw in outbound {
        let pkt = RawPacket::unpack(raw).unwrap();
        let reactions = resp_mgr.handle_local_delivery(
            pkt.destination_hash,
            raw,
            pkt.packet_hash,
            InterfaceId(0),
            &mut rng,
        );
        got_query |= reactions
            .iter()
            .any(|r| matches!(r, LinkManagerAction::ResourceAcceptQuery { .. }));
    }

    assert!(got_query, "AcceptApp should emit ResourceAcceptQuery");
}
3379
/// Under `AcceptApp`, answering the `ResourceAcceptQuery` with `true` makes
/// the responder emit a packet (expected to be the request for parts).
#[test]
fn test_resource_accept_app_accept() {
    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);

    let data = vec![0xCD; 100];
    let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);

    // Counts how many accept queries were actually answered below.
    let mut queries_answered = 0usize;
    for action in &adv_actions {
        if let LinkManagerAction::SendPacket { raw, .. } = action {
            let pkt = RawPacket::unpack(raw).unwrap();
            let resp_actions = resp_mgr.handle_local_delivery(
                pkt.destination_hash,
                raw,
                pkt.packet_hash,
                rns_core::transport::types::InterfaceId(0),
                &mut rng,
            );
            for a in &resp_actions {
                if let LinkManagerAction::ResourceAcceptQuery {
                    link_id: lid,
                    resource_hash,
                    ..
                } = a
                {
                    // Accept the resource on behalf of the application.
                    let accept_actions =
                        resp_mgr.accept_resource(lid, resource_hash, true, &mut rng);
                    queries_answered += 1;

                    // Accepting should make the responder send something back.
                    let has_send = accept_actions
                        .iter()
                        .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
                    assert!(
                        has_send,
                        "Accepting resource should produce request for parts"
                    );
                }
            }
        }
    }

    // Without this guard the test would pass vacuously if no query was ever
    // emitted, because accept_resource would never be exercised.
    assert!(
        queries_answered > 0,
        "AcceptApp should emit a ResourceAcceptQuery to answer"
    );
}
3423
/// Under `AcceptApp`, answering the `ResourceAcceptQuery` with `false` must
/// not result in a `ResourceReceived` action.
#[test]
fn test_resource_accept_app_reject() {
    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);

    let data = vec![0xCD; 100];
    let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);

    // Counts how many accept queries were actually answered below.
    let mut queries_answered = 0usize;
    for action in &adv_actions {
        if let LinkManagerAction::SendPacket { raw, .. } = action {
            let pkt = RawPacket::unpack(raw).unwrap();
            let resp_actions = resp_mgr.handle_local_delivery(
                pkt.destination_hash,
                raw,
                pkt.packet_hash,
                rns_core::transport::types::InterfaceId(0),
                &mut rng,
            );
            for a in &resp_actions {
                if let LinkManagerAction::ResourceAcceptQuery {
                    link_id: lid,
                    resource_hash,
                    ..
                } = a
                {
                    // Reject the resource on behalf of the application.
                    let reject_actions =
                        resp_mgr.accept_resource(lid, resource_hash, false, &mut rng);
                    queries_answered += 1;

                    // A rejected resource must never be reported as received.
                    let has_resource_received = reject_actions
                        .iter()
                        .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
                    assert!(!has_resource_received);
                }
            }
        }
    }

    // Without this guard the test would pass vacuously if no query was ever
    // emitted, because the rejection path would never be exercised.
    assert!(
        queries_answered > 0,
        "AcceptApp should emit a ResourceAcceptQuery to answer"
    );
}
3465
/// End-to-end transfer of a small resource: packets are shuttled between the
/// two managers until nothing is left in flight, then both the receiver-side
/// `ResourceReceived` and the sender-side `ResourceCompleted` must have fired.
#[test]
fn test_resource_full_transfer() {
    use rns_core::transport::types::InterfaceId;

    // Upper bound on shuttle rounds so a protocol loop cannot hang the test.
    const MAX_ROUNDS: usize = 50;

    let mut rng = OsRng;
    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);

    // Short payload, intended to fit in a single SDU.
    let original_data = b"Hello, Resource Transfer!".to_vec();
    let first_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);

    // Every queued action remembers who produced it:
    // 'i' = initiator, 'r' = responder.
    let mut in_flight: Vec<(char, LinkManagerAction)> =
        first_actions.into_iter().map(|a| ('i', a)).collect();
    let mut rounds = 0;
    let mut resource_received = false;
    let mut sender_completed = false;

    while rounds < MAX_ROUNDS && !in_flight.is_empty() {
        rounds += 1;
        let mut produced: Vec<(char, LinkManagerAction)> = Vec::new();

        for (origin, action) in in_flight.drain(..) {
            // Only outbound packets are forwarded; other actions are dropped.
            let raw = match action {
                LinkManagerAction::SendPacket { raw, .. } => raw,
                _ => continue,
            };
            let pkt = RawPacket::unpack(&raw).unwrap();

            // A packet is always delivered to the peer of whoever sent it.
            let (receiver, receiver_tag) = match origin {
                'i' => (&mut resp_mgr, 'r'),
                _ => (&mut init_mgr, 'i'),
            };
            let reactions = receiver.handle_local_delivery(
                pkt.destination_hash,
                &raw,
                pkt.packet_hash,
                InterfaceId(0),
                &mut rng,
            );

            for reaction in &reactions {
                if let LinkManagerAction::ResourceReceived { data, .. } = reaction {
                    assert_eq!(*data, original_data);
                    resource_received = true;
                } else if let LinkManagerAction::ResourceCompleted { .. } = reaction {
                    sender_completed = true;
                }
            }
            produced.extend(reactions.into_iter().map(|a| (receiver_tag, a)));
        }
        in_flight = produced;
    }

    assert!(
        resource_received,
        "Responder should receive resource data (rounds={})",
        rounds
    );
    assert!(
        sender_completed,
        "Sender should get completion proof (rounds={})",
        rounds
    );
}
3544
/// Once the responder is tracking an incoming resource,
/// `handle_resource_icl` reports it as failed.
#[test]
fn test_resource_cancel_icl() {
    use rns_core::transport::types::InterfaceId;

    let mut rng = OsRng;
    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);

    // 2000 bytes, chosen so the transfer is meant to span several parts.
    let payload = vec![0xAB; 2000];
    let advertisement = init_mgr.send_resource(&link_id, &payload, None, &mut rng);

    // Feed the advertisement to the responder; its replies are not needed here.
    let outbound = advertisement.iter().filter_map(|action| match action {
        LinkManagerAction::SendPacket { raw, .. } => Some(raw),
        _ => None,
    });
    for raw in outbound {
        let pkt = RawPacket::unpack(raw).unwrap();
        resp_mgr.handle_local_delivery(
            pkt.destination_hash,
            raw,
            pkt.packet_hash,
            InterfaceId(0),
            &mut rng,
        );
    }

    // Precondition: the responder now tracks at least one incoming resource.
    let responder_link = resp_mgr.links.get(&link_id).unwrap();
    assert!(!responder_link.incoming_resources.is_empty());

    // Stand in for a cancel arriving from the initiator by calling the
    // handler directly.
    let icl_actions = resp_mgr.handle_resource_icl(&link_id);
    let has_failed = icl_actions
        .iter()
        .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
    assert!(has_failed, "ICL should produce ResourceFailed");
}
3587
/// Once the initiator is tracking an outgoing resource,
/// `handle_resource_rcl` reports it as failed.
#[test]
fn test_resource_cancel_rcl() {
    let mut rng = OsRng;
    let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();

    // Start an outgoing transfer; the emitted actions are not needed here.
    let payload = vec![0xAB; 2000];
    init_mgr.send_resource(&link_id, &payload, None, &mut rng);

    // Precondition: the sender side is now tracking the resource.
    let initiator_link = init_mgr.links.get(&link_id).unwrap();
    assert!(!initiator_link.outgoing_resources.is_empty());

    // Stand in for a cancel arriving from the receiver by calling the
    // handler directly.
    let rcl_actions = init_mgr.handle_resource_rcl(&link_id);
    let has_failed = rcl_actions
        .iter()
        .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
    assert!(has_failed, "RCL should produce ResourceFailed");
}
3613
/// `cancel_all_resources` drops the transfer count back to zero and emits at
/// least one outbound packet while doing so.
#[test]
fn test_cancel_all_resources_clears_active_transfers() {
    let mut rng = OsRng;
    let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();

    // One transfer in progress.
    let started = init_mgr.send_resource(&link_id, b"resource body", None, &mut rng);
    assert!(!started.is_empty());
    assert_eq!(init_mgr.resource_transfer_count(), 1);

    let cancel_actions = init_mgr.cancel_all_resources(&mut rng);
    assert_eq!(init_mgr.resource_transfer_count(), 0);

    // Cancelling is expected to notify the peer via a SendPacket.
    let notified_peer = cancel_actions
        .iter()
        .any(|action| matches!(action, LinkManagerAction::SendPacket { .. }));
    assert!(notified_peer);
}
3630
/// After an outgoing resource has been cancelled via `handle_resource_rcl`,
/// the next `tick` removes it from the link's outgoing list.
#[test]
fn test_resource_tick_cleans_up() {
    let mut rng = OsRng;
    let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();

    let payload = vec![0xAB; 100];
    init_mgr.send_resource(&link_id, &payload, None, &mut rng);

    // Precondition: the transfer is being tracked.
    let tracked_before = init_mgr
        .links
        .get(&link_id)
        .unwrap()
        .outgoing_resources
        .is_empty();
    assert!(!tracked_before);

    // Put the sender into a finished state, then let tick sweep it away.
    init_mgr.handle_resource_rcl(&link_id);
    init_mgr.tick(&mut rng);

    let link_after = init_mgr.links.get(&link_id).unwrap();
    assert!(
        link_after.outgoing_resources.is_empty(),
        "Tick should clean up completed/failed outgoing resources"
    );
}
3662
/// `build_link_packet` yields exactly one `SendPacket` carrying the requested
/// context and addressed to a link destination.
#[test]
fn test_build_link_packet() {
    let (init_mgr, _resp_mgr, link_id) = setup_active_link();

    let built = init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, b"test data");
    assert_eq!(built.len(), 1);

    match &built[0] {
        LinkManagerAction::SendPacket { raw, dest_type, .. } => {
            // The packed bytes must round-trip with the context intact.
            let pkt = RawPacket::unpack(raw).unwrap();
            assert_eq!(pkt.context, constants::CONTEXT_RESOURCE);
            assert_eq!(*dest_type, constants::DESTINATION_LINK);
        }
        _ => panic!("Expected SendPacket"),
    }
}
3678
3679    // ====================================================================
3680    // Phase 8b: Channel message & data callback tests
3681    // ====================================================================
3682
/// A channel message sent by the initiator arrives at the responder as a
/// `ChannelMessageReceived` with the same message type and payload.
#[test]
fn test_channel_message_delivery() {
    use rns_core::transport::types::InterfaceId;

    let mut rng = OsRng;
    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();

    // Initiator side: queue the message on the channel.
    let outbound_actions = init_mgr
        .send_channel_message(&link_id, 42, b"channel data", &mut rng)
        .expect("active link channel send should succeed");
    assert!(!outbound_actions.is_empty());

    // Responder side: deliver every packet and look for the decoded message.
    let outbound = outbound_actions.iter().filter_map(|action| match action {
        LinkManagerAction::SendPacket { raw, .. } => Some(raw),
        _ => None,
    });
    let mut got_channel_msg = false;
    for raw in outbound {
        let pkt = RawPacket::unpack(raw).unwrap();
        let reactions = resp_mgr.handle_local_delivery(
            pkt.destination_hash,
            raw,
            pkt.packet_hash,
            InterfaceId(0),
            &mut rng,
        );
        for reaction in &reactions {
            if let LinkManagerAction::ChannelMessageReceived {
                msgtype, payload, ..
            } = reaction
            {
                assert_eq!(*msgtype, 42);
                assert_eq!(*payload, b"channel data");
                got_channel_msg = true;
            }
        }
    }

    assert!(got_channel_msg, "Responder should receive channel message");
}
3720
/// Two unproven channel messages exhaust the initial send window; delivering
/// a proof packet for one of them lets a third message through.
#[test]
fn test_channel_proof_reopens_send_window() {
    use rns_core::transport::types::InterfaceId;

    let mut rng = OsRng;
    let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();

    // Fill the window.
    init_mgr
        .send_channel_message(&link_id, 42, b"first", &mut rng)
        .expect("first send should succeed");
    init_mgr
        .send_channel_message(&link_id, 42, b"second", &mut rng)
        .expect("second send should succeed");

    // A third message is refused while both earlier ones are outstanding.
    let err = init_mgr
        .send_channel_message(&link_id, 42, b"third", &mut rng)
        .expect_err("third send should hit the initial channel window");
    assert_eq!(err, "Channel is not ready to send");

    // Both sent packets are tracked by hash; pick any one of them to prove.
    let tracked: Vec<[u8; 32]> = init_mgr
        .links
        .get(&link_id)
        .unwrap()
        .pending_channel_packets
        .keys()
        .copied()
        .collect();
    assert_eq!(tracked.len(), 2);
    let tracked_hash = tracked[0];

    // Proof payload: the 32-byte tracked hash followed by 64 filler bytes.
    let mut proof_data = tracked_hash.to_vec();
    proof_data.extend_from_slice(&[0x11; 64]);

    let proof_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_LINK,
        packet_type: constants::PACKET_TYPE_PROOF,
    };
    let proof = RawPacket::pack(
        proof_flags,
        0,
        &link_id,
        None,
        constants::CONTEXT_NONE,
        &proof_data,
    )
    .expect("proof packet should pack");

    let ack_actions = init_mgr.handle_local_delivery(
        link_id,
        &proof.raw,
        proof.packet_hash,
        InterfaceId(0),
        &mut rng,
    );
    assert!(
        ack_actions.is_empty(),
        "proof delivery should only update channel state"
    );

    // With one slot freed, the previously refused message now goes out.
    init_mgr
        .send_channel_message(&link_id, 42, b"third", &mut rng)
        .expect("proof should free one channel slot");
}
3782
/// Data sent with a context the manager has no special handling for (0x42)
/// is surfaced to the responder as `LinkDataReceived` with that context.
#[test]
fn test_generic_link_data_delivery() {
    use rns_core::transport::types::InterfaceId;

    let mut rng = OsRng;
    let (init_mgr, mut resp_mgr, link_id) = setup_active_link();

    // One packet out from the initiator, using custom context 0x42.
    let outbound = init_mgr.send_on_link(&link_id, b"raw stuff", 0x42, &mut rng);
    assert_eq!(outbound.len(), 1);

    // Deliver it to the responder.
    let raw = extract_any_send_packet(&outbound);
    let pkt = RawPacket::unpack(&raw).unwrap();
    let reactions = resp_mgr.handle_local_delivery(
        pkt.destination_hash,
        &raw,
        pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );

    let mut has_data = false;
    for reaction in &reactions {
        if matches!(
            reaction,
            LinkManagerAction::LinkDataReceived { context: 0x42, .. }
        ) {
            has_data = true;
        }
    }
    assert!(
        has_data,
        "Responder should receive LinkDataReceived for unknown context"
    );
}
3811
/// Request/response round trip: the responder's `/echo` handler answers a
/// request and the initiator sees a `ResponseReceived` action.
#[test]
fn test_response_delivery() {
    use rns_core::transport::types::InterfaceId;

    let mut rng = OsRng;
    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();

    // Responder: echo handler that returns the request data unchanged.
    resp_mgr.register_request_handler("/echo", None, |_link_id, _path, data, _remote| {
        Some(data.to_vec())
    });

    // Initiator: issue the request (body is a single msgpack nil byte).
    let request_actions = init_mgr.send_request(&link_id, "/echo", b"\xc0", &mut rng);
    assert!(!request_actions.is_empty());

    // Request -> responder.
    let request_raw = extract_any_send_packet(&request_actions);
    let request_pkt = RawPacket::unpack(&request_raw).unwrap();
    let responder_actions = resp_mgr.handle_local_delivery(
        request_pkt.destination_hash,
        &request_raw,
        request_pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );
    let responder_sent = responder_actions
        .iter()
        .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
    assert!(responder_sent, "Handler should produce response");

    // Response -> initiator.
    let response_raw = extract_any_send_packet(&responder_actions);
    let response_pkt = RawPacket::unpack(&response_raw).unwrap();
    let initiator_actions = init_mgr.handle_local_delivery(
        response_pkt.destination_hash,
        &response_raw,
        response_pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );
    let response_seen = initiator_actions
        .iter()
        .any(|a| matches!(a, LinkManagerAction::ResponseReceived { .. }));
    assert!(response_seen, "Initiator should receive ResponseReceived");
}
3860
/// A 5000-byte handler response is expected to go out as a resource
/// advertisement instead of a direct `CONTEXT_RESPONSE` packet.
#[test]
fn test_large_response_uses_resource_fallback() {
    use rns_core::transport::types::InterfaceId;

    let mut rng = OsRng;
    let (init_mgr, mut resp_mgr, link_id) = setup_active_link();

    // Handler returns a body meant to be too large for one direct response
    // packet.
    let response_body: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
    resp_mgr.register_request_handler(
        "/large",
        None,
        move |_link_id, _path, _data, _remote| Some(response_body.clone()),
    );

    // Initiator issues the request.
    let request_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
    assert!(!request_actions.is_empty());

    // Deliver it and collect the context of every packet the responder emits.
    let request_raw = extract_any_send_packet(&request_actions);
    let request_pkt = RawPacket::unpack(&request_raw).unwrap();
    let responder_actions = resp_mgr.handle_local_delivery(
        request_pkt.destination_hash,
        &request_raw,
        request_pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );
    let contexts: Vec<_> = responder_actions
        .iter()
        .filter_map(|action| match action {
            LinkManagerAction::SendPacket { raw, .. } => {
                Some(RawPacket::unpack(raw).unwrap().context)
            }
            _ => None,
        })
        .collect();

    let has_resource_adv = contexts
        .iter()
        .any(|ctx| *ctx == constants::CONTEXT_RESOURCE_ADV);
    let has_direct_response = contexts
        .iter()
        .any(|ctx| *ctx == constants::CONTEXT_RESPONSE);

    assert!(
        has_resource_adv,
        "Large response should advertise a response resource"
    );
    assert!(
        !has_direct_response,
        "Large response should not use direct CONTEXT_RESPONSE packet"
    );
}
3912
/// Sending a channel message on a link that never finished its handshake is
/// rejected with the "link has no active channel" error.
#[test]
fn test_send_channel_message_on_no_channel() {
    let mut rng = OsRng;
    let mut mgr = LinkManager::new();
    let placeholder_sig = [0xAA; 32];
    let (link_id, _) = mgr.create_link(
        &[0x11; 16],
        &placeholder_sig,
        1,
        constants::MTU as u32,
        &mut rng,
    );

    // No handshake has run, so the send must come back as an Err.
    let outcome = mgr.send_channel_message(&link_id, 1, b"test", &mut rng);
    let err = outcome.expect_err("pending link should reject channel send");
    assert_eq!(err, "link has no active channel");
}
3927
/// `send_on_link` produces nothing for a link that is not yet active.
#[test]
fn test_send_on_link_requires_active() {
    let mut rng = OsRng;
    let mut mgr = LinkManager::new();
    let placeholder_sig = [0xAA; 32];

    // Only create_link has run; no handshake packets were exchanged.
    let (link_id, _) = mgr.create_link(
        &[0x11; 16],
        &placeholder_sig,
        1,
        constants::MTU as u32,
        &mut rng,
    );

    let produced = mgr.send_on_link(&link_id, b"test", constants::CONTEXT_NONE, &mut rng);
    assert!(produced.is_empty(), "Cannot send on pending link");
}
3939
/// `send_on_link` produces nothing when the link id is not known to the
/// manager.
#[test]
fn test_send_on_link_unknown_link() {
    let mut rng = OsRng;
    let mgr = LinkManager::new();

    // The manager is empty, so this id cannot match any link.
    let unknown_link = [0xFF; 16];
    let produced = mgr.send_on_link(&unknown_link, b"test", constants::CONTEXT_NONE, &mut rng);
    assert!(produced.is_empty());
}
3948
3949    #[test]
3950    fn test_resource_full_transfer_large() {
3951        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3952        let mut rng = OsRng;
3953
3954        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3955
3956        // Multi-part data (larger than a single SDU of 464 bytes)
3957        let original_data: Vec<u8> = (0..2000u32)
3958            .map(|i| {
3959                let pos = i as usize;
3960                (pos ^ (pos >> 8) ^ (pos >> 16)) as u8
3961            })
3962            .collect();
3963
3964        let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
3965
3966        let mut pending: Vec<(char, LinkManagerAction)> =
3967            adv_actions.into_iter().map(|a| ('i', a)).collect();
3968        let mut rounds = 0;
3969        let max_rounds = 200;
3970        let mut resource_received = false;
3971        let mut sender_completed = false;
3972
3973        while !pending.is_empty() && rounds < max_rounds {
3974            rounds += 1;
3975            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
3976
3977            for (source, action) in pending.drain(..) {
3978                if let LinkManagerAction::SendPacket { raw, .. } = action {
3979                    let pkt = match RawPacket::unpack(&raw) {
3980                        Ok(p) => p,
3981                        Err(_) => continue,
3982                    };
3983
3984                    let target_actions = if source == 'i' {
3985                        resp_mgr.handle_local_delivery(
3986                            pkt.destination_hash,
3987                            &raw,
3988                            pkt.packet_hash,
3989                            rns_core::transport::types::InterfaceId(0),
3990                            &mut rng,
3991                        )
3992                    } else {
3993                        init_mgr.handle_local_delivery(
3994                            pkt.destination_hash,
3995                            &raw,
3996                            pkt.packet_hash,
3997                            rns_core::transport::types::InterfaceId(0),
3998                            &mut rng,
3999                        )
4000                    };
4001
4002                    let target_source = if source == 'i' { 'r' } else { 'i' };
4003                    for a in &target_actions {
4004                        match a {
4005                            LinkManagerAction::ResourceReceived { data, .. } => {
4006                                assert_eq!(*data, original_data);
4007                                resource_received = true;
4008                            }
4009                            LinkManagerAction::ResourceCompleted { .. } => {
4010                                sender_completed = true;
4011                            }
4012                            _ => {}
4013                        }
4014                    }
4015                    next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4016                }
4017            }
4018            pending = next;
4019        }
4020
4021        assert!(
4022            resource_received,
4023            "Should receive large resource (rounds={})",
4024            rounds
4025        );
4026        assert!(
4027            sender_completed,
4028            "Sender should complete (rounds={})",
4029            rounds
4030        );
4031    }
4032
4033    #[test]
4034    fn test_resource_receiver_stores_original_advertisement_plaintext() {
4035        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4036        let mut rng = OsRng;
4037
4038        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4039
4040        let data = vec![0x41; 256];
4041        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4042
4043        let adv_raw = adv_actions
4044            .iter()
4045            .find_map(|action| match action {
4046                LinkManagerAction::SendPacket { raw, .. } => {
4047                    let pkt = RawPacket::unpack(raw).ok()?;
4048                    (pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw.clone())
4049                }
4050                _ => None,
4051            })
4052            .expect("sender should emit a resource advertisement");
4053
4054        let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
4055        let adv_plaintext = resp_mgr
4056            .links
4057            .get(&link_id)
4058            .unwrap()
4059            .engine
4060            .decrypt(&adv_pkt.data)
4061            .unwrap();
4062
4063        let _resp_actions = resp_mgr.handle_local_delivery(
4064            adv_pkt.destination_hash,
4065            &adv_raw,
4066            adv_pkt.packet_hash,
4067            rns_core::transport::types::InterfaceId(0),
4068            &mut rng,
4069        );
4070
4071        let receiver = resp_mgr
4072            .links
4073            .get(&link_id)
4074            .and_then(|managed| managed.incoming_resources.first())
4075            .expect("advertisement should create an incoming receiver");
4076        assert_eq!(receiver.advertisement_packet, adv_plaintext);
4077        assert_eq!(
4078            receiver.max_decompressed_size,
4079            constants::RESOURCE_AUTO_COMPRESS_MAX_SIZE
4080        );
4081    }
4082
4083    #[test]
4084    fn test_corrupt_compressed_resource_rejects_and_tears_down_link() {
4085        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4086        let mut rng = OsRng;
4087
4088        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4089
4090        let data = vec![b'A'; 4096];
4091        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4092
4093        let mut request_actions = Vec::new();
4094        for action in &adv_actions {
4095            let LinkManagerAction::SendPacket { raw, .. } = action else {
4096                continue;
4097            };
4098            let pkt = RawPacket::unpack(raw).unwrap();
4099            let actions = resp_mgr.handle_local_delivery(
4100                pkt.destination_hash,
4101                raw,
4102                pkt.packet_hash,
4103                rns_core::transport::types::InterfaceId(0),
4104                &mut rng,
4105            );
4106            request_actions.extend(actions);
4107        }
4108
4109        {
4110            let receiver = resp_mgr
4111                .links
4112                .get_mut(&link_id)
4113                .and_then(|managed| managed.incoming_resources.first_mut())
4114                .expect("receiver should exist after advertisement");
4115            assert!(receiver.flags.compressed, "test data should be compressed");
4116            receiver.max_decompressed_size = 64;
4117        }
4118
4119        let mut responder_actions = Vec::new();
4120        for action in request_actions {
4121            let LinkManagerAction::SendPacket { raw, .. } = action else {
4122                continue;
4123            };
4124            let pkt = RawPacket::unpack(&raw).unwrap();
4125            let actions = init_mgr.handle_local_delivery(
4126                pkt.destination_hash,
4127                &raw,
4128                pkt.packet_hash,
4129                rns_core::transport::types::InterfaceId(0),
4130                &mut rng,
4131            );
4132
4133            for action in actions {
4134                let LinkManagerAction::SendPacket { raw, .. } = &action else {
4135                    continue;
4136                };
4137                let pkt = RawPacket::unpack(raw).unwrap();
4138                let delivered = resp_mgr.handle_local_delivery(
4139                    pkt.destination_hash,
4140                    raw,
4141                    pkt.packet_hash,
4142                    rns_core::transport::types::InterfaceId(0),
4143                    &mut rng,
4144                );
4145                responder_actions.extend(delivered);
4146            }
4147        }
4148
4149        assert!(
4150            responder_actions.iter().any(|action| matches!(
4151                action,
4152                LinkManagerAction::ResourceFailed { error, .. }
4153                    if error == "Resource too large"
4154            )),
4155            "corrupt oversized resource should fail with TooLarge"
4156        );
4157        assert!(
4158            responder_actions.iter().any(|action| matches!(
4159                action,
4160                LinkManagerAction::LinkClosed { link_id: closed_id, .. } if *closed_id == link_id
4161            )),
4162            "corrupt oversized resource should tear down the link"
4163        );
4164        assert!(
4165            responder_actions.iter().any(|action| match action {
4166                LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
4167                    .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_RCL)
4168                    .unwrap_or(false),
4169                _ => false,
4170            }),
4171            "corrupt oversized resource should send a receiver cancel/reject packet"
4172        );
4173        assert_eq!(
4174            resp_mgr
4175                .links
4176                .get(&link_id)
4177                .map(|managed| managed.engine.state()),
4178            Some(LinkState::Closed)
4179        );
4180    }
4181
4182    #[test]
4183    fn test_resource_hmu_timeout_extension_in_link_manager_flow() {
4184        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4185        let mut rng = OsRng;
4186
4187        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4188
4189        // Large and incompressible enough to require multiple hashmap segments
4190        // even with the live Bzip2Compressor in the LinkManager path.
4191        let mut state = 0x1234_5678u32;
4192        let data: Vec<u8> = (0..50000)
4193            .map(|_| {
4194                state = state.wrapping_mul(1664525).wrapping_add(1013904223);
4195                (state >> 16) as u8
4196            })
4197            .collect();
4198        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4199        let mut pending: Vec<(char, LinkManagerAction)> =
4200            adv_actions.into_iter().map(|a| ('i', a)).collect();
4201
4202        let mut rounds = 0;
4203
4204        // Drive the real link-manager exchange until the receiver is genuinely
4205        // waiting for an HMU after exhausting the advertised hashmap segment.
4206        while rounds < 300 {
4207            rounds += 1;
4208            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
4209
4210            for (source, action) in pending.drain(..) {
4211                let LinkManagerAction::SendPacket { raw, .. } = action else {
4212                    continue;
4213                };
4214
4215                let pkt = match RawPacket::unpack(&raw) {
4216                    Ok(p) => p,
4217                    Err(_) => continue,
4218                };
4219
4220                let target_actions = if source == 'i' {
4221                    resp_mgr.handle_local_delivery(
4222                        pkt.destination_hash,
4223                        &raw,
4224                        pkt.packet_hash,
4225                        rns_core::transport::types::InterfaceId(0),
4226                        &mut rng,
4227                    )
4228                } else {
4229                    init_mgr.handle_local_delivery(
4230                        pkt.destination_hash,
4231                        &raw,
4232                        pkt.packet_hash,
4233                        rns_core::transport::types::InterfaceId(0),
4234                        &mut rng,
4235                    )
4236                };
4237
4238                let target_source = if source == 'i' { 'r' } else { 'i' };
4239                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4240            }
4241
4242            if resp_mgr
4243                .links
4244                .get(&link_id)
4245                .and_then(|managed| managed.incoming_resources.first())
4246                .is_some_and(|receiver| receiver.waiting_for_hmu)
4247            {
4248                break;
4249            }
4250
4251            pending = next;
4252        }
4253
4254        assert!(
4255            resp_mgr
4256                .links
4257                .get(&link_id)
4258                .and_then(|managed| managed.incoming_resources.first())
4259                .is_some_and(|receiver| receiver.waiting_for_hmu),
4260            "expected receiver to reach a live HMU wait state"
4261        );
4262
4263        // Prime the live receiver once so it computes the same EIFR it will use
4264        // for timeout decisions in this HMU-wait state.
4265        let prime_actions = {
4266            let managed = resp_mgr.links.get_mut(&link_id).unwrap();
4267            let receiver = managed.incoming_resources.first_mut().unwrap();
4268            let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
4269                managed.engine.decrypt(ciphertext).map_err(|_| ())
4270            };
4271            receiver.tick(
4272                receiver.last_activity + 0.0001,
4273                &decrypt_fn,
4274                &Bzip2Compressor,
4275            )
4276        };
4277        assert!(
4278            !prime_actions
4279                .iter()
4280                .any(|a| matches!(a, ResourceAction::SendRequest(_))),
4281            "fresh HMU wait state should not immediately emit a retry request"
4282        );
4283
4284        let (late_delta, retries_before) = {
4285            let managed = resp_mgr
4286                .links
4287                .get_mut(&link_id)
4288                .expect("receiver link should still exist");
4289            let receiver = managed
4290                .incoming_resources
4291                .first_mut()
4292                .expect("receiver should have an active incoming resource");
4293
4294            assert!(
4295                receiver.waiting_for_hmu,
4296                "receiver should be waiting for HMU"
4297            );
4298
4299            let eifr = receiver.eifr.unwrap_or_else(|| {
4300                (constants::RESOURCE_SDU as f64 * 8.0) / receiver.rtt.unwrap_or(0.5)
4301            });
4302            let expected_tof = if receiver.outstanding_parts > 0 {
4303                (receiver.outstanding_parts as f64 * constants::RESOURCE_SDU as f64 * 8.0) / eifr
4304            } else {
4305                (3.0 * constants::RESOURCE_SDU as f64) / eifr
4306            };
4307            let expected_hmu_wait =
4308                (constants::RESOURCE_SDU as f64 * 8.0 * constants::RESOURCE_HMU_WAIT_FACTOR) / eifr;
4309            let old_delta = constants::RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT * expected_tof
4310                + constants::RESOURCE_RETRY_GRACE_TIME;
4311            (
4312                old_delta + expected_hmu_wait + expected_hmu_wait.max(1.0),
4313                receiver.retries_left,
4314            )
4315        };
4316        {
4317            let managed = resp_mgr.links.get(&link_id).unwrap();
4318            let receiver = managed.incoming_resources.first().unwrap();
4319            assert_eq!(receiver.retries_left, retries_before);
4320            assert!(
4321                receiver.eifr.is_some(),
4322                "receiver tick should have populated EIFR"
4323            );
4324        }
4325
4326        let late_resource_actions = {
4327            let managed = resp_mgr.links.get_mut(&link_id).unwrap();
4328            let receiver = managed.incoming_resources.first_mut().unwrap();
4329            let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
4330                managed.engine.decrypt(ciphertext).map_err(|_| ())
4331            };
4332            receiver.tick(
4333                receiver.last_activity + late_delta,
4334                &decrypt_fn,
4335                &Bzip2Compressor,
4336            )
4337        };
4338        let late_actions =
4339            resp_mgr.process_resource_actions(&link_id, late_resource_actions, &mut rng);
4340        let retry_raw = late_actions
4341            .iter()
4342            .find_map(|a| match a {
4343                LinkManagerAction::SendPacket { raw, .. } => {
4344                    let pkt = RawPacket::unpack(raw).ok()?;
4345                    (pkt.context == constants::CONTEXT_RESOURCE_REQ).then_some(raw.clone())
4346                }
4347                _ => None,
4348            })
4349            .expect("receiver should emit a resource retry request after extended timeout");
4350
4351        {
4352            let managed = resp_mgr.links.get(&link_id).unwrap();
4353            let receiver = managed.incoming_resources.first().unwrap();
4354            assert_eq!(receiver.retries_left, retries_before - 1);
4355        }
4356
4357        let retry_pkt = RawPacket::unpack(&retry_raw).unwrap();
4358        let retry_plaintext = resp_mgr
4359            .links
4360            .get(&link_id)
4361            .unwrap()
4362            .engine
4363            .decrypt(&retry_pkt.data)
4364            .expect("retry request should decrypt");
4365        assert_eq!(retry_plaintext[0], constants::RESOURCE_HASHMAP_IS_EXHAUSTED);
4366
4367        // Deliver the retry request to the sender and verify it turns into a
4368        // real HMU packet in the live LinkManager flow.
4369        let retry_to_sender = init_mgr.handle_local_delivery(
4370            retry_pkt.destination_hash,
4371            &retry_raw,
4372            retry_pkt.packet_hash,
4373            rns_core::transport::types::InterfaceId(0),
4374            &mut rng,
4375        );
4376        assert!(
4377            retry_to_sender.iter().any(|a| match a {
4378                LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
4379                    .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_HMU)
4380                    .unwrap_or(false),
4381                _ => false,
4382            }),
4383            "sender should answer the exhausted retry request with a live HMU packet"
4384        );
4385    }
4386
4387    #[test]
4388    fn test_process_resource_actions_mapping() {
4389        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4390        let mut rng = OsRng;
4391
4392        // Test that various ResourceActions map to correct LinkManagerActions
4393        let actions = vec![
4394            ResourceAction::DataReceived {
4395                data: vec![1, 2, 3],
4396                metadata: Some(vec![4, 5]),
4397            },
4398            ResourceAction::Completed,
4399            ResourceAction::Failed(rns_core::resource::ResourceError::Timeout),
4400            ResourceAction::ProgressUpdate {
4401                received: 10,
4402                total: 20,
4403            },
4404            ResourceAction::TeardownLink,
4405        ];
4406
4407        let result = init_mgr.process_resource_actions(&link_id, actions, &mut rng);
4408
4409        assert!(matches!(
4410            result[0],
4411            LinkManagerAction::ResourceReceived { .. }
4412        ));
4413        assert!(matches!(
4414            result[1],
4415            LinkManagerAction::ResourceCompleted { .. }
4416        ));
4417        assert!(matches!(
4418            result[2],
4419            LinkManagerAction::ResourceFailed { .. }
4420        ));
4421        assert!(matches!(
4422            result[3],
4423            LinkManagerAction::ResourceProgress {
4424                received: 10,
4425                total: 20,
4426                ..
4427            }
4428        ));
4429        assert!(result
4430            .iter()
4431            .any(|action| matches!(action, LinkManagerAction::LinkClosed { .. })));
4432    }
4433
4434    #[test]
4435    fn test_link_state_empty() {
4436        let mgr = LinkManager::new();
4437        let fake_id = [0xAA; 16];
4438        assert!(mgr.link_state(&fake_id).is_none());
4439    }
4440
4441    #[test]
4442    fn test_large_response_resource_completes_as_response() {
4443        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4444        let mut rng = OsRng;
4445
4446        let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
4447        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(large_payload));
4448        resp_mgr.register_request_handler("/large", None, {
4449            let response_value = response_value.clone();
4450            move |_link_id, _path, _data, _remote| Some(response_value.clone())
4451        });
4452
4453        let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
4454        let req_raw = extract_any_send_packet(&req_actions);
4455        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
4456        let request_id = req_pkt.get_truncated_hash();
4457        let resp_actions = resp_mgr.handle_local_delivery(
4458            req_pkt.destination_hash,
4459            &req_raw,
4460            req_pkt.packet_hash,
4461            rns_core::transport::types::InterfaceId(0),
4462            &mut rng,
4463        );
4464
4465        let mut pending: Vec<(char, LinkManagerAction)> =
4466            resp_actions.into_iter().map(|a| ('r', a)).collect();
4467        let mut rounds = 0;
4468        let mut received_response = None;
4469
4470        while !pending.is_empty() && rounds < 200 {
4471            rounds += 1;
4472            let mut next = Vec::new();
4473
4474            for (source, action) in pending.drain(..) {
4475                let LinkManagerAction::SendPacket { raw, .. } = action else {
4476                    continue;
4477                };
4478                let pkt = RawPacket::unpack(&raw).unwrap();
4479                let target_actions = if source == 'r' {
4480                    init_mgr.handle_local_delivery(
4481                        pkt.destination_hash,
4482                        &raw,
4483                        pkt.packet_hash,
4484                        rns_core::transport::types::InterfaceId(0),
4485                        &mut rng,
4486                    )
4487                } else {
4488                    resp_mgr.handle_local_delivery(
4489                        pkt.destination_hash,
4490                        &raw,
4491                        pkt.packet_hash,
4492                        rns_core::transport::types::InterfaceId(0),
4493                        &mut rng,
4494                    )
4495                };
4496
4497                let target_source = if source == 'r' { 'i' } else { 'r' };
4498                for target_action in &target_actions {
4499                    match target_action {
4500                        LinkManagerAction::ResponseReceived {
4501                            request_id: rid,
4502                            data,
4503                            ..
4504                        } => {
4505                            received_response = Some((*rid, data.clone()));
4506                        }
4507                        LinkManagerAction::ResourceReceived { .. } => {
4508                            panic!("response resources must complete as ResponseReceived")
4509                        }
4510                        LinkManagerAction::ResourceAcceptQuery { .. } => {
4511                            panic!("response resources must bypass application acceptance")
4512                        }
4513                        _ => {}
4514                    }
4515                }
4516                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4517            }
4518
4519            pending = next;
4520        }
4521
4522        let (received_request_id, received_data) = received_response.unwrap_or_else(|| {
4523            panic!(
4524                "large response resource did not complete as ResponseReceived after {} rounds",
4525                rounds
4526            )
4527        });
4528        assert_eq!(received_request_id, request_id);
4529        assert_eq!(received_data, response_value);
4530    }
4531
4532    #[test]
4533    fn test_negotiated_mtu_response_uses_resource_before_global_mtu() {
4534        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4535        let mut rng = OsRng;
4536
4537        init_mgr.set_link_mtu(&link_id, 300);
4538        resp_mgr.set_link_mtu(&link_id, 300);
4539
4540        let payload = vec![0xAB; 350];
4541        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
4542        resp_mgr.register_request_handler("/mtu", None, {
4543            let response_value = response_value.clone();
4544            move |_link_id, _path, _data, _remote| Some(response_value.clone())
4545        });
4546
4547        let req_actions = init_mgr.send_request(&link_id, "/mtu", b"\xc0", &mut rng);
4548        let req_raw = extract_any_send_packet(&req_actions);
4549        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
4550        let resp_actions = resp_mgr.handle_local_delivery(
4551            req_pkt.destination_hash,
4552            &req_raw,
4553            req_pkt.packet_hash,
4554            rns_core::transport::types::InterfaceId(0),
4555            &mut rng,
4556        );
4557
4558        let mut has_resource_adv = false;
4559        let mut direct_response_len = None;
4560        for action in &resp_actions {
4561            if let LinkManagerAction::SendPacket { raw, .. } = action {
4562                let pkt = RawPacket::unpack(raw).unwrap();
4563                has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
4564                if pkt.context == constants::CONTEXT_RESPONSE {
4565                    direct_response_len = Some(raw.len());
4566                }
4567            }
4568        }
4569
4570        assert!(
4571            has_resource_adv,
4572            "responses larger than the negotiated link MTU should use resource fallback"
4573        );
4574        assert!(
4575            direct_response_len.is_none(),
4576            "sent direct response of {} bytes on a 300 byte negotiated MTU",
4577            direct_response_len.unwrap_or_default()
4578        );
4579    }
4580
4581    #[test]
4582    fn test_large_management_response_uses_resource_fallback() {
4583        let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4584        let mut rng = OsRng;
4585
4586        let payload = vec![0xBC; 5000];
4587        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
4588        let actions =
4589            resp_mgr.send_management_response(&link_id, &[0x55; 16], &response_value, &mut rng);
4590
4591        let mut has_resource_adv = false;
4592        let mut has_direct_response = false;
4593        for action in &actions {
4594            if let LinkManagerAction::SendPacket { raw, .. } = action {
4595                let pkt = RawPacket::unpack(raw).unwrap();
4596                has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
4597                has_direct_response |= pkt.context == constants::CONTEXT_RESPONSE;
4598            }
4599        }
4600
4601        assert!(
4602            has_resource_adv,
4603            "large management responses should advertise a response resource"
4604        );
4605        assert!(
4606            !has_direct_response,
4607            "large management responses should not use a direct CONTEXT_RESPONSE packet"
4608        );
4609    }
4610}