Skip to main content

rns_net/common/
link_manager.rs

1//! Link manager: wires rns-core LinkEngine + Channel + Resource into the driver.
2//!
3//! Manages multiple concurrent links, link destination registration,
4//! request/response handling, resource transfers, and full lifecycle
5//! (handshake → active → teardown).
6//!
7//! Python reference: Link.py, RequestReceipt.py, Resource.py
8
9use std::collections::HashMap;
10
11use super::compressor::Bzip2Compressor;
12use rns_core::channel::{Channel, Sequence};
13use rns_core::constants;
14use rns_core::link::types::{LinkId, LinkState, TeardownReason};
15use rns_core::link::{LinkAction, LinkEngine, LinkMode};
16use rns_core::packet::{PacketFlags, RawPacket};
17use rns_core::resource::{ResourceAction, ResourceReceiver, ResourceSender};
18use rns_crypto::ed25519::Ed25519PrivateKey;
19use rns_crypto::Rng;
20
21use super::time;
22
/// Resource acceptance strategy.
///
/// The default strategy is [`ResourceStrategy::AcceptNone`], so a link only
/// accepts incoming resources when a strategy is chosen explicitly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ResourceStrategy {
    /// Reject all incoming resources.
    #[default]
    AcceptNone,
    /// Accept all incoming resources automatically.
    AcceptAll,
    /// Query the application callback for each resource.
    AcceptApp,
}
39
/// A managed link wrapping LinkEngine + optional Channel + resources.
///
/// One instance exists per entry in `LinkManager::links`, created either by
/// `create_link` (initiator side) or when a LINKREQUEST is accepted
/// (responder side).
struct ManagedLink {
    /// Core link engine (handshake state, link id, RTT, MTU).
    engine: LinkEngine,
    /// Channel layered on top of the link. Starts as `None`; the initiator
    /// path creates it once the engine reports `LinkState::Active`.
    channel: Option<Channel>,
    /// Channel packets awaiting a proof, keyed by the 32-byte hash carried at
    /// the start of the proof payload. A matching proof removes the entry and
    /// reports the stored sequence to the channel as delivered.
    pending_channel_packets: HashMap<[u8; 32], Sequence>,
    /// Channel send counters, one per outcome.
    /// NOTE(review): the code updating these is outside this chunk; the
    /// meanings are inferred from the field names — confirm.
    channel_send_ok: u64,
    channel_send_not_ready: u64,
    channel_send_too_big: u64,
    channel_send_other_error: u64,
    /// NOTE(review): presumably counts channel messages delivered on this
    /// link; the update site is outside this chunk — confirm.
    channel_messages_received: u64,
    /// Incremented each time a link packet proof is built for this link.
    channel_proofs_sent: u64,
    /// Incremented each time a proof matches an entry in
    /// `pending_channel_packets`.
    channel_proofs_received: u64,
    /// Destination hash this link belongs to.
    dest_hash: [u8; 16],
    /// Remote identity (hash, public_key) once identified.
    remote_identity: Option<([u8; 16], [u8; 64])>,
    /// Destination's Ed25519 signing public key (for initiator to verify LRPROOF).
    /// `None` on responder-side links.
    dest_sig_pub_bytes: Option<[u8; 32]>,
    /// Active incoming resource transfers.
    incoming_resources: Vec<ResourceReceiver>,
    /// Active outgoing resource transfers.
    outgoing_resources: Vec<ResourceSender>,
    /// Logical incoming split transfers, keyed by original resource hash.
    incoming_splits: HashMap<[u8; 32], IncomingSplitTransfer>,
    /// Logical outgoing split transfers, keyed by original resource hash.
    outgoing_splits: HashMap<[u8; 32], OutgoingSplitTransfer>,
    /// Resource acceptance strategy.
    resource_strategy: ResourceStrategy,
    /// Interface this link's packets should be sent on when known.
    route_interface: Option<rns_core::transport::types::InterfaceId>,
    /// Next-hop transport ID seen on inbound HEADER_2 link traffic.
    ///
    /// When present, outbound link packets can be rewritten to HEADER_2 using
    /// this transport ID to preserve multi-hop routing.
    route_transport_id: Option<[u8; 16]>,
}
76
/// Bookkeeping for one logical incoming transfer that arrives as several
/// resource segments (stored in `ManagedLink::incoming_splits`).
///
/// NOTE(review): the code that fills these fields is outside this chunk; the
/// per-field notes below are based on how the progress helper reads them and
/// on the field names — confirm against the resource handling code.
struct IncomingSplitTransfer {
    /// Number of segments the whole transfer consists of.
    total_segments: u64,
    /// Presumably the number of segments fully received so far.
    completed_segments: u64,
    /// Index of the segment currently in flight. The progress helper subtracts
    /// one from it to get the count of finished segments, so it appears to be
    /// 1-based.
    current_segment_index: u64,
    /// Parts received so far for the current segment.
    current_received_parts: usize,
    /// Total parts in the current segment.
    current_total_parts: usize,
    /// Presumably the payload accumulated from completed segments.
    data: Vec<u8>,
    /// Optional metadata attached to the transfer.
    metadata: Option<Vec<u8>>,
    /// Presumably marks the transfer as the response to a request.
    is_response: bool,
}
87
/// Bookkeeping for one logical outgoing transfer that is sent as several
/// resource segments (stored in `ManagedLink::outgoing_splits`).
///
/// NOTE(review): the code that fills these fields is outside this chunk; the
/// per-field notes are based on the progress helper and the field names.
struct OutgoingSplitTransfer {
    /// Number of segments the whole transfer consists of.
    total_segments: u64,
    /// Presumably the number of segments fully sent so far.
    completed_segments: u64,
    /// Index of the segment currently in flight. The progress helper subtracts
    /// one from it to get the count of finished segments, so it appears to be
    /// 1-based.
    current_segment_index: u64,
    /// Parts sent so far for the current segment.
    current_sent_parts: usize,
    /// Total parts in the current segment.
    current_total_parts: usize,
}
95
/// A registered link destination that can accept incoming LINKREQUEST.
struct LinkDestination {
    /// Ed25519 signing private key; handed to the responder engine when a
    /// LINKREQUEST is accepted and used to sign link packet proofs.
    sig_prv: Ed25519PrivateKey,
    /// Ed25519 signing public key bytes, passed to the responder engine
    /// together with `sig_prv`.
    sig_pub_bytes: [u8; 32],
    /// Resource acceptance strategy copied onto every link accepted for this
    /// destination.
    resource_strategy: ResourceStrategy,
}
102
/// Response produced by an application request handler.
///
/// A plain `Vec<u8>` converts into [`RequestResponse::Bytes`] via `From`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RequestResponse {
    /// Send the response as the normal request response value.
    Bytes(Vec<u8>),
    /// Send the response as a resource response with optional metadata.
    Resource {
        data: Vec<u8>,
        metadata: Option<Vec<u8>>,
        auto_compress: bool,
    },
}

impl From<Vec<u8>> for RequestResponse {
    /// Wrap raw bytes as an ordinary (non-resource) response.
    fn from(bytes: Vec<u8>) -> Self {
        Self::Bytes(bytes)
    }
}
121
/// A registered request handler for a path.
struct RequestHandlerEntry {
    /// The path this handler serves (e.g. "/status").
    path: String,
    /// The truncated hash of the path (first 16 bytes of SHA-256).
    path_hash: [u8; 16],
    /// Access control: None means allow all, Some(list) means allow only listed identities.
    allowed_list: Option<Vec<[u8; 16]>>,
    /// Handler function: (link_id, path, request_data, remote_identity) -> Option<response>.
    ///
    /// The closure takes exactly these four arguments; the request id is not
    /// passed to it.
    handler: Box<
        dyn Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<RequestResponse>
            + Send,
    >,
}
136
/// Actions produced by LinkManager for the driver to dispatch.
///
/// LinkManager methods return these in a `Vec`; the order within the vector
/// is the order in which they were produced.
#[derive(Debug)]
pub enum LinkManagerAction {
    /// Send a packet via the transport engine outbound path.
    SendPacket {
        /// Fully packed packet bytes.
        raw: Vec<u8>,
        /// Destination type constant accompanying the packet.
        dest_type: u8,
        /// Interface to pin the packet to, when one is specified.
        attached_interface: Option<rns_core::transport::types::InterfaceId>,
    },
    /// Link established — notify callbacks.
    LinkEstablished {
        link_id: LinkId,
        dest_hash: [u8; 16],
        rtt: f64,
        is_initiator: bool,
    },
    /// Link closed — notify callbacks.
    LinkClosed {
        link_id: LinkId,
        reason: Option<TeardownReason>,
    },
    /// Remote peer identified — notify callbacks.
    RemoteIdentified {
        link_id: LinkId,
        identity_hash: [u8; 16],
        public_key: [u8; 64],
    },
    /// Register a link_id as local destination in transport (for receiving link data).
    RegisterLinkDest { link_id: LinkId },
    /// Deregister a link_id from transport local destinations.
    DeregisterLinkDest { link_id: LinkId },
    /// A management request that needs to be handled by the driver.
    /// The driver has access to engine state needed to build the response.
    ManagementRequest {
        link_id: LinkId,
        path_hash: [u8; 16],
        /// The request data (msgpack-encoded Value from the request array).
        data: Vec<u8>,
        /// The request_id (truncated hash of the packed request).
        request_id: [u8; 16],
        remote_identity: Option<([u8; 16], [u8; 64])>,
    },
    /// Resource data fully received and assembled.
    ResourceReceived {
        link_id: LinkId,
        data: Vec<u8>,
        metadata: Option<Vec<u8>>,
    },
    /// Resource transfer completed (proof validated on sender side).
    ResourceCompleted { link_id: LinkId },
    /// Resource transfer failed.
    ResourceFailed { link_id: LinkId, error: String },
    /// Resource transfer progress update.
    ResourceProgress {
        link_id: LinkId,
        received: usize,
        total: usize,
    },
    /// Query application whether to accept an incoming resource (for AcceptApp strategy).
    ResourceAcceptQuery {
        link_id: LinkId,
        resource_hash: Vec<u8>,
        transfer_size: u64,
        has_metadata: bool,
    },
    /// Channel message received on a link.
    ChannelMessageReceived {
        link_id: LinkId,
        msgtype: u16,
        payload: Vec<u8>,
    },
    /// Generic link data received (CONTEXT_NONE).
    ///
    /// NOTE(review): the variant carries a `context` byte, so it may also be
    /// produced for contexts other than CONTEXT_NONE — confirm at the emit site.
    LinkDataReceived {
        link_id: LinkId,
        context: u8,
        data: Vec<u8>,
    },
    /// Response received on a link.
    ResponseReceived {
        link_id: LinkId,
        request_id: [u8; 16],
        data: Vec<u8>,
        metadata: Option<Vec<u8>>,
    },
    /// A link request was received (for hook notification).
    LinkRequestReceived {
        link_id: LinkId,
        /// Interface the LINKREQUEST arrived on.
        receiving_interface: rns_core::transport::types::InterfaceId,
    },
}
227
/// Manages multiple links, link destinations, and request/response.
pub struct LinkManager {
    /// All known links (pending and established), keyed by link id.
    links: HashMap<LinkId, ManagedLink>,
    /// Destinations that accept incoming LINKREQUESTs, keyed by destination hash.
    link_destinations: HashMap<[u8; 16], LinkDestination>,
    /// Registered request handlers, in registration order.
    request_handlers: Vec<RequestHandlerEntry>,
    /// Path hashes that should be handled externally (by the driver) rather than
    /// by registered handler closures. Used for management destinations.
    management_paths: Vec<[u8; 16]>,
}
237
/// Best-known outbound route for a link, as returned by
/// `LinkManager::get_link_route_hint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LinkRouteHint {
    /// Interface the link's packets should be sent on.
    pub interface: rns_core::transport::types::InterfaceId,
    /// Next-hop transport ID, when one was recorded for the link.
    pub transport_id: Option<[u8; 16]>,
}
243
244impl LinkManager {
245    fn resource_sdu_for_link(link: &ManagedLink) -> usize {
246        // Python parity: Resource.sdu = link.mtu - HEADER_MAXSIZE - IFAC_MIN_SIZE
247        // when MTU signalling is available on the link.
248        let mtu = link.engine.mtu() as usize;
249        let derived = mtu.saturating_sub(constants::HEADER_MAXSIZE + constants::IFAC_MIN_SIZE);
250        if derived > 0 {
251            derived
252        } else {
253            constants::RESOURCE_SDU
254        }
255    }
256
257    fn split_progress_parts(
258        segment_index: u64,
259        total_segments: u64,
260        current_done: usize,
261        current_total: usize,
262        sdu: usize,
263    ) -> (usize, usize) {
264        let max_parts_per_segment = constants::RESOURCE_MAX_EFFICIENT_SIZE.div_ceil(sdu.max(1));
265        let total = (total_segments as usize).saturating_mul(max_parts_per_segment);
266        let completed_segments = segment_index.saturating_sub(1) as usize;
267        let completed = completed_segments.saturating_mul(max_parts_per_segment);
268        let current = if current_total == 0 {
269            0
270        } else if current_total < max_parts_per_segment {
271            let scaled =
272                (current_done as f64) * (max_parts_per_segment as f64 / current_total as f64);
273            scaled.floor() as usize
274        } else {
275            current_done
276        };
277        (completed.saturating_add(current).min(total), total)
278    }
279
280    fn resource_hash_key(hash: &[u8]) -> Option<[u8; 32]> {
281        let mut key = [0u8; 32];
282        if hash.len() != key.len() {
283            return None;
284        }
285        key.copy_from_slice(hash);
286        Some(key)
287    }
288
289    fn incoming_split_progress(split: &IncomingSplitTransfer, sdu: usize) -> (usize, usize) {
290        Self::split_progress_parts(
291            split.current_segment_index,
292            split.total_segments,
293            split.current_received_parts,
294            split.current_total_parts,
295            sdu,
296        )
297    }
298
299    fn outgoing_split_progress(split: &OutgoingSplitTransfer, sdu: usize) -> (usize, usize) {
300        Self::split_progress_parts(
301            split.current_segment_index,
302            split.total_segments,
303            split.current_sent_parts,
304            split.current_total_parts,
305            sdu,
306        )
307    }
308
309    /// Create a new empty link manager.
310    pub fn new() -> Self {
311        LinkManager {
312            links: HashMap::new(),
313            link_destinations: HashMap::new(),
314            request_handlers: Vec::new(),
315            management_paths: Vec::new(),
316        }
317    }
318
319    /// Register a path hash as a management path.
320    /// Management requests are returned as ManagementRequest actions
321    /// for the driver to handle (since they need access to engine state).
322    pub fn register_management_path(&mut self, path_hash: [u8; 16]) {
323        if !self.management_paths.contains(&path_hash) {
324            self.management_paths.push(path_hash);
325        }
326    }
327
328    /// Get the derived session key for a link (needed for hole-punch token derivation).
329    pub fn get_derived_key(&self, link_id: &LinkId) -> Option<Vec<u8>> {
330        self.links
331            .get(link_id)
332            .and_then(|link| link.engine.derived_key().map(|dk| dk.to_vec()))
333    }
334
335    /// Return best-known routing hint for link packets.
336    pub fn get_link_route_hint(&self, link_id: &LinkId) -> Option<LinkRouteHint> {
337        self.links.get(link_id).and_then(|link| {
338            link.route_interface.map(|interface| LinkRouteHint {
339                interface,
340                transport_id: link.route_transport_id,
341            })
342        })
343    }
344
345    /// Set the best-known outbound route for a link.
346    pub fn set_link_route_hint(
347        &mut self,
348        link_id: &LinkId,
349        interface: rns_core::transport::types::InterfaceId,
350        transport_id: Option<[u8; 16]>,
351    ) -> bool {
352        let Some(link) = self.links.get_mut(link_id) else {
353            return false;
354        };
355        link.route_interface = Some(interface);
356        link.route_transport_id = transport_id;
357        true
358    }
359
360    /// Register a destination that can accept incoming links.
361    pub fn register_link_destination(
362        &mut self,
363        dest_hash: [u8; 16],
364        sig_prv: Ed25519PrivateKey,
365        sig_pub_bytes: [u8; 32],
366        resource_strategy: ResourceStrategy,
367    ) {
368        self.link_destinations.insert(
369            dest_hash,
370            LinkDestination {
371                sig_prv,
372                sig_pub_bytes,
373                resource_strategy,
374            },
375        );
376    }
377
378    /// Deregister a link destination.
379    pub fn deregister_link_destination(&mut self, dest_hash: &[u8; 16]) {
380        self.link_destinations.remove(dest_hash);
381    }
382
383    /// Register a request handler for a given path.
384    ///
385    /// `path`: the request path string (e.g. "/status")
386    /// `allowed_list`: None = allow all, Some(list) = restrict to these identity hashes
387    /// `handler`: called with (link_id, path, request_data, remote_identity) -> Option<response>
388    pub fn register_request_handler<F>(
389        &mut self,
390        path: &str,
391        allowed_list: Option<Vec<[u8; 16]>>,
392        handler: F,
393    ) where
394        F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>>
395            + Send
396            + 'static,
397    {
398        let path_hash = compute_path_hash(path);
399        self.request_handlers.push(RequestHandlerEntry {
400            path: path.to_string(),
401            path_hash,
402            allowed_list,
403            handler: Box::new(move |link_id, p, data, remote| {
404                handler(link_id, p, data, remote).map(RequestResponse::Bytes)
405            }),
406        });
407    }
408
409    /// Register a request handler that can return resource responses with metadata.
410    pub fn register_request_handler_response<F>(
411        &mut self,
412        path: &str,
413        allowed_list: Option<Vec<[u8; 16]>>,
414        handler: F,
415    ) where
416        F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<RequestResponse>
417            + Send
418            + 'static,
419    {
420        let path_hash = compute_path_hash(path);
421        self.request_handlers.push(RequestHandlerEntry {
422            path: path.to_string(),
423            path_hash,
424            allowed_list,
425            handler: Box::new(handler),
426        });
427    }
428
429    /// Create an outbound link to a destination.
430    ///
431    /// `dest_sig_pub_bytes` is the destination's Ed25519 signing public key
432    /// (needed to verify LRPROOF). In Python this comes from the Destination's Identity.
433    ///
434    /// Returns `(link_id, actions)`. The first action will be a SendPacket with
435    /// the LINKREQUEST.
436    pub fn create_link(
437        &mut self,
438        dest_hash: &[u8; 16],
439        dest_sig_pub_bytes: &[u8; 32],
440        hops: u8,
441        mtu: u32,
442        rng: &mut dyn Rng,
443    ) -> (LinkId, Vec<LinkManagerAction>) {
444        let mode = LinkMode::Aes256Cbc;
445        let (mut engine, request_data) =
446            LinkEngine::new_initiator(dest_hash, hops, mode, Some(mtu), time::now(), rng);
447
448        // Build the LINKREQUEST packet to compute link_id
449        let flags = PacketFlags {
450            header_type: constants::HEADER_1,
451            context_flag: constants::FLAG_UNSET,
452            transport_type: constants::TRANSPORT_BROADCAST,
453            destination_type: constants::DESTINATION_SINGLE,
454            packet_type: constants::PACKET_TYPE_LINKREQUEST,
455        };
456
457        let packet = match RawPacket::pack(
458            flags,
459            0,
460            dest_hash,
461            None,
462            constants::CONTEXT_NONE,
463            &request_data,
464        ) {
465            Ok(p) => p,
466            Err(_) => {
467                // Should not happen with valid data
468                return ([0u8; 16], Vec::new());
469            }
470        };
471
472        engine.set_link_id_from_hashable(&packet.get_hashable_part(), request_data.len());
473        let link_id = *engine.link_id();
474
475        let managed = ManagedLink {
476            engine,
477            channel: None,
478            pending_channel_packets: HashMap::new(),
479            channel_send_ok: 0,
480            channel_send_not_ready: 0,
481            channel_send_too_big: 0,
482            channel_send_other_error: 0,
483            channel_messages_received: 0,
484            channel_proofs_sent: 0,
485            channel_proofs_received: 0,
486            dest_hash: *dest_hash,
487            remote_identity: None,
488            dest_sig_pub_bytes: Some(*dest_sig_pub_bytes),
489            incoming_resources: Vec::new(),
490            outgoing_resources: Vec::new(),
491            incoming_splits: HashMap::new(),
492            outgoing_splits: HashMap::new(),
493            resource_strategy: ResourceStrategy::default(),
494            route_interface: None,
495            route_transport_id: None,
496        };
497        self.links.insert(link_id, managed);
498
499        let mut actions = Vec::new();
500        // Register the link_id as a local destination so we can receive LRPROOF
501        actions.push(LinkManagerAction::RegisterLinkDest { link_id });
502        // Send the LINKREQUEST packet
503        actions.push(LinkManagerAction::SendPacket {
504            raw: packet.raw,
505            dest_type: constants::DESTINATION_LINK,
506            attached_interface: None,
507        });
508
509        (link_id, actions)
510    }
511
512    /// Handle a packet delivered locally (via DeliverLocal).
513    ///
514    /// Returns actions for the driver to dispatch. The `dest_hash` is the
515    /// packet's destination_hash field. `raw` is the full packet bytes.
516    /// `packet_hash` is the SHA-256 hash.
517    pub fn handle_local_delivery(
518        &mut self,
519        dest_hash: [u8; 16],
520        raw: &[u8],
521        packet_hash: [u8; 32],
522        receiving_interface: rns_core::transport::types::InterfaceId,
523        rng: &mut dyn Rng,
524    ) -> Vec<LinkManagerAction> {
525        let packet = match RawPacket::unpack(raw) {
526            Ok(p) => p,
527            Err(_) => return Vec::new(),
528        };
529
530        match packet.flags.packet_type {
531            constants::PACKET_TYPE_LINKREQUEST => {
532                self.handle_linkrequest(&dest_hash, &packet, receiving_interface, rng)
533            }
534            constants::PACKET_TYPE_PROOF if packet.context == constants::CONTEXT_LRPROOF => {
535                // LRPROOF: dest_hash is the link_id
536                self.handle_lrproof(&dest_hash, &packet, receiving_interface, rng)
537            }
538            constants::PACKET_TYPE_PROOF => self.handle_link_proof(&dest_hash, &packet, rng),
539            constants::PACKET_TYPE_DATA => {
540                self.handle_link_data(&dest_hash, &packet, packet_hash, receiving_interface, rng)
541            }
542            _ => Vec::new(),
543        }
544    }
545
546    /// Handle an incoming LINKREQUEST packet.
547    fn handle_linkrequest(
548        &mut self,
549        dest_hash: &[u8; 16],
550        packet: &RawPacket,
551        receiving_interface: rns_core::transport::types::InterfaceId,
552        rng: &mut dyn Rng,
553    ) -> Vec<LinkManagerAction> {
554        // Look up the link destination
555        let ld = match self.link_destinations.get(dest_hash) {
556            Some(ld) => ld,
557            None => return Vec::new(),
558        };
559
560        let hashable = packet.get_hashable_part();
561        let now = time::now();
562
563        // Create responder engine
564        let (engine, lrproof_data) = match LinkEngine::new_responder(
565            &ld.sig_prv,
566            &ld.sig_pub_bytes,
567            &packet.data,
568            &hashable,
569            dest_hash,
570            packet.hops,
571            now,
572            rng,
573        ) {
574            Ok(r) => r,
575            Err(e) => {
576                log::debug!("LINKREQUEST rejected: {}", e);
577                return Vec::new();
578            }
579        };
580
581        let link_id = *engine.link_id();
582        log::debug!(
583            "LINKREQUEST accepted: link={:02x?} iface={} header_type={} transport_id_present={} hops={}",
584            &link_id[..4],
585            receiving_interface.0,
586            packet.flags.header_type,
587            packet.transport_id.is_some(),
588            packet.hops
589        );
590
591        let managed = ManagedLink {
592            engine,
593            channel: None,
594            pending_channel_packets: HashMap::new(),
595            channel_send_ok: 0,
596            channel_send_not_ready: 0,
597            channel_send_too_big: 0,
598            channel_send_other_error: 0,
599            channel_messages_received: 0,
600            channel_proofs_sent: 0,
601            channel_proofs_received: 0,
602            dest_hash: *dest_hash,
603            remote_identity: None,
604            dest_sig_pub_bytes: None,
605            incoming_resources: Vec::new(),
606            outgoing_resources: Vec::new(),
607            incoming_splits: HashMap::new(),
608            outgoing_splits: HashMap::new(),
609            resource_strategy: ld.resource_strategy,
610            route_interface: Some(receiving_interface),
611            route_transport_id: if packet.flags.header_type == constants::HEADER_2 {
612                packet.transport_id
613            } else {
614                None
615            },
616        };
617        self.links.insert(link_id, managed);
618
619        // Build LRPROOF packet: type=PROOF, context=LRPROOF, dest=link_id
620        let flags = PacketFlags {
621            header_type: constants::HEADER_1,
622            context_flag: constants::FLAG_UNSET,
623            transport_type: constants::TRANSPORT_BROADCAST,
624            destination_type: constants::DESTINATION_LINK,
625            packet_type: constants::PACKET_TYPE_PROOF,
626        };
627
628        let mut actions = Vec::new();
629
630        // Register link_id as local destination so we receive link data
631        actions.push(LinkManagerAction::RegisterLinkDest { link_id });
632
633        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
634            flags,
635            packet.hops,
636            &link_id,
637            None,
638            constants::CONTEXT_LRPROOF,
639            &lrproof_data,
640        ) {
641            log::debug!(
642                "LRPROOF queued: link={:02x?} route_iface={} route_tid_present={} hops={}",
643                &link_id[..4],
644                receiving_interface.0,
645                packet.transport_id.is_some(),
646                packet.hops
647            );
648            actions.push(LinkManagerAction::SendPacket {
649                raw,
650                dest_type: constants::DESTINATION_LINK,
651                attached_interface: None,
652            });
653        }
654
655        // Reticulum interop fallback #1: queue an LRPROOF variant with
656        // hop=0, matching peers that validate LRPROOF with hop=0 semantics.
657        if packet.hops != 0 {
658            if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
659                flags,
660                0,
661                &link_id,
662                None,
663                constants::CONTEXT_LRPROOF,
664                &lrproof_data,
665            ) {
666                log::debug!(
667                    "LRPROOF fallback queued: link={:02x?} route_iface={} hops=0",
668                    &link_id[..4],
669                    receiving_interface.0
670                );
671                actions.push(LinkManagerAction::SendPacket {
672                    raw,
673                    dest_type: constants::DESTINATION_LINK,
674                    attached_interface: None,
675                });
676            }
677        }
678
679        // Reticulum interop fallback #2: queue an LRPROOF +1 hop variant for
680        // peers that validate against a remaining-hop value derived
681        // differently from destination-side delivery hops.
682        if packet.hops < u8::MAX {
683            let hops_plus_one = packet.hops + 1;
684            if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
685                flags,
686                hops_plus_one,
687                &link_id,
688                None,
689                constants::CONTEXT_LRPROOF,
690                &lrproof_data,
691            ) {
692                log::debug!(
693                    "LRPROOF +1 queued: link={:02x?} route_iface={} hops={}",
694                    &link_id[..4],
695                    receiving_interface.0,
696                    hops_plus_one
697                );
698                actions.push(LinkManagerAction::SendPacket {
699                    raw,
700                    dest_type: constants::DESTINATION_LINK,
701                    attached_interface: None,
702                });
703            }
704        }
705
706        // Notify hook system about the incoming link request
707        actions.push(LinkManagerAction::LinkRequestReceived {
708            link_id,
709            receiving_interface,
710        });
711
712        actions
713    }
714
715    fn handle_link_proof(
716        &mut self,
717        link_id: &LinkId,
718        packet: &RawPacket,
719        rng: &mut dyn Rng,
720    ) -> Vec<LinkManagerAction> {
721        if packet.data.len() < 32 {
722            return Vec::new();
723        }
724
725        let mut tracked_hash = [0u8; 32];
726        tracked_hash.copy_from_slice(&packet.data[..32]);
727
728        let Some(link) = self.links.get_mut(link_id) else {
729            return Vec::new();
730        };
731        let Some(sequence) = link.pending_channel_packets.remove(&tracked_hash) else {
732            return Vec::new();
733        };
734        link.channel_proofs_received += 1;
735        let Some(channel) = link.channel.as_mut() else {
736            return Vec::new();
737        };
738
739        let chan_actions = channel.packet_delivered(sequence);
740        let _ = channel;
741        let _ = link;
742        self.process_channel_actions(link_id, chan_actions, rng)
743    }
744
745    fn build_link_packet_proof(
746        &mut self,
747        link_id: &LinkId,
748        packet_hash: &[u8; 32],
749    ) -> Vec<LinkManagerAction> {
750        let dest_hash = match self.links.get(link_id) {
751            Some(link) => link.dest_hash,
752            None => return Vec::new(),
753        };
754        let Some(ld) = self.link_destinations.get(&dest_hash) else {
755            return Vec::new();
756        };
757        if let Some(link) = self.links.get_mut(link_id) {
758            link.channel_proofs_sent += 1;
759        }
760
761        let signature = ld.sig_prv.sign(packet_hash);
762        let mut proof_data = Vec::with_capacity(96);
763        proof_data.extend_from_slice(packet_hash);
764        proof_data.extend_from_slice(&signature);
765
766        let flags = PacketFlags {
767            header_type: constants::HEADER_1,
768            context_flag: constants::FLAG_UNSET,
769            transport_type: constants::TRANSPORT_BROADCAST,
770            destination_type: constants::DESTINATION_LINK,
771            packet_type: constants::PACKET_TYPE_PROOF,
772        };
773        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
774            flags,
775            0,
776            link_id,
777            None,
778            constants::CONTEXT_NONE,
779            &proof_data,
780        ) {
781            vec![LinkManagerAction::SendPacket {
782                raw,
783                dest_type: constants::DESTINATION_LINK,
784                attached_interface: None,
785            }]
786        } else {
787            Vec::new()
788        }
789    }
790
791    /// Handle an incoming LRPROOF packet (initiator side).
792    fn handle_lrproof(
793        &mut self,
794        link_id_bytes: &[u8; 16],
795        packet: &RawPacket,
796        receiving_interface: rns_core::transport::types::InterfaceId,
797        rng: &mut dyn Rng,
798    ) -> Vec<LinkManagerAction> {
799        let link = match self.links.get_mut(link_id_bytes) {
800            Some(l) => l,
801            None => return Vec::new(),
802        };
803
804        link.route_interface = Some(receiving_interface);
805        if packet.flags.header_type == constants::HEADER_2 {
806            if let Some(transport_id) = packet.transport_id {
807                link.route_transport_id = Some(transport_id);
808            }
809        }
810        log::debug!(
811            "LRPROOF received: link={:02x?} iface={} header_type={} transport_id_present={}",
812            &link_id_bytes[..4],
813            receiving_interface.0,
814            packet.flags.header_type,
815            packet.transport_id.is_some()
816        );
817
818        if link.engine.state() != LinkState::Pending || !link.engine.is_initiator() {
819            return Vec::new();
820        }
821
822        // The destination's signing pub key was stored when create_link was called
823        let dest_sig_pub_bytes = match link.dest_sig_pub_bytes {
824            Some(b) => b,
825            None => {
826                log::debug!("LRPROOF: no destination signing key available");
827                return Vec::new();
828            }
829        };
830
831        let now = time::now();
832        let (lrrtt_encrypted, link_actions) =
833            match link
834                .engine
835                .handle_lrproof(&packet.data, &dest_sig_pub_bytes, now, rng)
836            {
837                Ok(r) => r,
838                Err(e) => {
839                    log::debug!("LRPROOF validation failed: {}", e);
840                    return Vec::new();
841                }
842            };
843
844        let link_id = *link.engine.link_id();
845        let mut actions = Vec::new();
846
847        // Process link actions (StateChanged, LinkEstablished)
848        actions.extend(self.process_link_actions(&link_id, &link_actions));
849
850        // Send LRRTT: type=DATA, context=LRRTT, dest=link_id
851        let flags = PacketFlags {
852            header_type: constants::HEADER_1,
853            context_flag: constants::FLAG_UNSET,
854            transport_type: constants::TRANSPORT_BROADCAST,
855            destination_type: constants::DESTINATION_LINK,
856            packet_type: constants::PACKET_TYPE_DATA,
857        };
858
859        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
860            flags,
861            0,
862            &link_id,
863            None,
864            constants::CONTEXT_LRRTT,
865            &lrrtt_encrypted,
866        ) {
867            actions.push(LinkManagerAction::SendPacket {
868                raw,
869                dest_type: constants::DESTINATION_LINK,
870                attached_interface: None,
871            });
872        }
873
874        // Initialize channel now that link is active
875        if let Some(link) = self.links.get_mut(&link_id) {
876            if link.engine.state() == LinkState::Active {
877                let rtt = link.engine.rtt().unwrap_or(1.0);
878                link.channel = Some(Channel::new(rtt));
879            }
880        }
881
882        actions
883    }
884
885    /// Handle DATA packets on an established link.
886    ///
887    /// Structured to avoid borrow checker issues: we perform engine operations
888    /// on the link, collect intermediate results, drop the mutable borrow, then
889    /// call self methods that need immutable access.
890    fn handle_link_data(
891        &mut self,
892        link_id_bytes: &[u8; 16],
893        packet: &RawPacket,
894        packet_hash: [u8; 32],
895        receiving_interface: rns_core::transport::types::InterfaceId,
896        rng: &mut dyn Rng,
897    ) -> Vec<LinkManagerAction> {
898        // First pass: perform engine operations, collect results
899        enum LinkDataResult<'a> {
900            Lrrtt {
901                link_id: LinkId,
902                link_actions: Vec<LinkAction>,
903            },
904            Identify {
905                link_id: LinkId,
906                link_actions: Vec<LinkAction>,
907            },
908            Keepalive {
909                link_id: LinkId,
910                inbound_actions: Vec<LinkAction>,
911            },
912            LinkClose {
913                link_id: LinkId,
914                teardown_actions: Vec<LinkAction>,
915            },
916            Channel {
917                link_id: LinkId,
918                inbound_actions: Vec<LinkAction>,
919                plaintext: Vec<u8>,
920                packet_hash: [u8; 32],
921            },
922            Request {
923                link_id: LinkId,
924                inbound_actions: Vec<LinkAction>,
925                plaintext: Vec<u8>,
926                request_id: [u8; 16],
927            },
928            Response {
929                link_id: LinkId,
930                inbound_actions: Vec<LinkAction>,
931                plaintext: Vec<u8>,
932            },
933            Generic {
934                link_id: LinkId,
935                inbound_actions: Vec<LinkAction>,
936                plaintext: Vec<u8>,
937                context: u8,
938                packet_hash: [u8; 32],
939            },
940            /// Resource advertisement (link-decrypted).
941            ResourceAdv {
942                link_id: LinkId,
943                inbound_actions: Vec<LinkAction>,
944                plaintext: Vec<u8>,
945            },
946            /// Resource part request (link-decrypted).
947            ResourceReq {
948                link_id: LinkId,
949                inbound_actions: Vec<LinkAction>,
950                plaintext: Vec<u8>,
951            },
952            /// Resource hashmap update (link-decrypted).
953            ResourceHmu {
954                link_id: LinkId,
955                inbound_actions: Vec<LinkAction>,
956                plaintext: Vec<u8>,
957            },
958            /// Resource part data (NOT link-decrypted; parts are pre-encrypted by ResourceSender).
959            ResourcePart {
960                link_id: LinkId,
961                inbound_actions: Vec<LinkAction>,
962                raw_data: &'a [u8],
963            },
964            /// Resource proof (feed to sender).
965            ResourcePrf {
966                link_id: LinkId,
967                inbound_actions: Vec<LinkAction>,
968                plaintext: Vec<u8>,
969            },
970            /// Resource cancel from initiator (link-decrypted).
971            ResourceIcl {
972                link_id: LinkId,
973                inbound_actions: Vec<LinkAction>,
974            },
975            /// Resource cancel from receiver (link-decrypted).
976            ResourceRcl {
977                link_id: LinkId,
978                inbound_actions: Vec<LinkAction>,
979            },
980            Error,
981        }
982
983        let result = {
984            let link = match self.links.get_mut(link_id_bytes) {
985                Some(l) => l,
986                None => return Vec::new(),
987            };
988
989            link.route_interface = Some(receiving_interface);
990            if packet.flags.header_type == constants::HEADER_2 {
991                if let Some(transport_id) = packet.transport_id {
992                    link.route_transport_id = Some(transport_id);
993                }
994            } else {
995                link.route_transport_id = None;
996            }
997
998            match packet.context {
999                constants::CONTEXT_LRRTT => {
1000                    match link.engine.handle_lrrtt(&packet.data, time::now()) {
1001                        Ok(link_actions) => {
1002                            let link_id = *link.engine.link_id();
1003                            LinkDataResult::Lrrtt {
1004                                link_id,
1005                                link_actions,
1006                            }
1007                        }
1008                        Err(e) => {
1009                            log::debug!("LRRTT handling failed: {}", e);
1010                            LinkDataResult::Error
1011                        }
1012                    }
1013                }
1014                constants::CONTEXT_LINKIDENTIFY => {
1015                    match link.engine.handle_identify(&packet.data) {
1016                        Ok(link_actions) => {
1017                            let link_id = *link.engine.link_id();
1018                            link.remote_identity = link.engine.remote_identity().cloned();
1019                            LinkDataResult::Identify {
1020                                link_id,
1021                                link_actions,
1022                            }
1023                        }
1024                        Err(e) => {
1025                            log::debug!("LINKIDENTIFY failed: {}", e);
1026                            LinkDataResult::Error
1027                        }
1028                    }
1029                }
1030                constants::CONTEXT_KEEPALIVE => {
1031                    let inbound_actions = link.engine.record_inbound(time::now());
1032                    let link_id = *link.engine.link_id();
1033                    LinkDataResult::Keepalive {
1034                        link_id,
1035                        inbound_actions,
1036                    }
1037                }
1038                constants::CONTEXT_LINKCLOSE => {
1039                    let teardown_actions = link.engine.handle_teardown();
1040                    let link_id = *link.engine.link_id();
1041                    LinkDataResult::LinkClose {
1042                        link_id,
1043                        teardown_actions,
1044                    }
1045                }
1046                constants::CONTEXT_CHANNEL => match link.engine.decrypt(&packet.data) {
1047                    Ok(plaintext) => {
1048                        let inbound_actions = link.engine.record_inbound(time::now());
1049                        let link_id = *link.engine.link_id();
1050                        LinkDataResult::Channel {
1051                            link_id,
1052                            inbound_actions,
1053                            plaintext,
1054                            packet_hash,
1055                        }
1056                    }
1057                    Err(_) => LinkDataResult::Error,
1058                },
1059                constants::CONTEXT_REQUEST => match link.engine.decrypt(&packet.data) {
1060                    Ok(plaintext) => {
1061                        let inbound_actions = link.engine.record_inbound(time::now());
1062                        let link_id = *link.engine.link_id();
1063                        let request_id = packet.get_truncated_hash();
1064                        LinkDataResult::Request {
1065                            link_id,
1066                            inbound_actions,
1067                            plaintext,
1068                            request_id,
1069                        }
1070                    }
1071                    Err(_) => LinkDataResult::Error,
1072                },
1073                constants::CONTEXT_RESPONSE => match link.engine.decrypt(&packet.data) {
1074                    Ok(plaintext) => {
1075                        let inbound_actions = link.engine.record_inbound(time::now());
1076                        let link_id = *link.engine.link_id();
1077                        LinkDataResult::Response {
1078                            link_id,
1079                            inbound_actions,
1080                            plaintext,
1081                        }
1082                    }
1083                    Err(_) => LinkDataResult::Error,
1084                },
1085                // --- Resource contexts ---
1086                constants::CONTEXT_RESOURCE_ADV => match link.engine.decrypt(&packet.data) {
1087                    Ok(plaintext) => {
1088                        let inbound_actions = link.engine.record_inbound(time::now());
1089                        let link_id = *link.engine.link_id();
1090                        LinkDataResult::ResourceAdv {
1091                            link_id,
1092                            inbound_actions,
1093                            plaintext,
1094                        }
1095                    }
1096                    Err(_) => LinkDataResult::Error,
1097                },
1098                constants::CONTEXT_RESOURCE_REQ => match link.engine.decrypt(&packet.data) {
1099                    Ok(plaintext) => {
1100                        let inbound_actions = link.engine.record_inbound(time::now());
1101                        let link_id = *link.engine.link_id();
1102                        LinkDataResult::ResourceReq {
1103                            link_id,
1104                            inbound_actions,
1105                            plaintext,
1106                        }
1107                    }
1108                    Err(_) => LinkDataResult::Error,
1109                },
1110                constants::CONTEXT_RESOURCE_HMU => match link.engine.decrypt(&packet.data) {
1111                    Ok(plaintext) => {
1112                        let inbound_actions = link.engine.record_inbound(time::now());
1113                        let link_id = *link.engine.link_id();
1114                        LinkDataResult::ResourceHmu {
1115                            link_id,
1116                            inbound_actions,
1117                            plaintext,
1118                        }
1119                    }
1120                    Err(_) => LinkDataResult::Error,
1121                },
1122                constants::CONTEXT_RESOURCE => {
1123                    // Resource parts are NOT link-decrypted — they're pre-encrypted by ResourceSender
1124                    let inbound_actions = link.engine.record_inbound(time::now());
1125                    let link_id = *link.engine.link_id();
1126                    LinkDataResult::ResourcePart {
1127                        link_id,
1128                        inbound_actions,
1129                        raw_data: &packet.data,
1130                    }
1131                }
1132                constants::CONTEXT_RESOURCE_PRF => match link.engine.decrypt(&packet.data) {
1133                    Ok(plaintext) => {
1134                        let inbound_actions = link.engine.record_inbound(time::now());
1135                        let link_id = *link.engine.link_id();
1136                        LinkDataResult::ResourcePrf {
1137                            link_id,
1138                            inbound_actions,
1139                            plaintext,
1140                        }
1141                    }
1142                    Err(_) => LinkDataResult::Error,
1143                },
1144                constants::CONTEXT_RESOURCE_ICL => {
1145                    let _ = link.engine.decrypt(&packet.data); // decrypt to validate
1146                    let inbound_actions = link.engine.record_inbound(time::now());
1147                    let link_id = *link.engine.link_id();
1148                    LinkDataResult::ResourceIcl {
1149                        link_id,
1150                        inbound_actions,
1151                    }
1152                }
1153                constants::CONTEXT_RESOURCE_RCL => {
1154                    let _ = link.engine.decrypt(&packet.data); // decrypt to validate
1155                    let inbound_actions = link.engine.record_inbound(time::now());
1156                    let link_id = *link.engine.link_id();
1157                    LinkDataResult::ResourceRcl {
1158                        link_id,
1159                        inbound_actions,
1160                    }
1161                }
1162                _ => match link.engine.decrypt(&packet.data) {
1163                    Ok(plaintext) => {
1164                        let inbound_actions = link.engine.record_inbound(time::now());
1165                        let link_id = *link.engine.link_id();
1166                        LinkDataResult::Generic {
1167                            link_id,
1168                            inbound_actions,
1169                            plaintext,
1170                            context: packet.context,
1171                            packet_hash,
1172                        }
1173                    }
1174                    Err(_) => LinkDataResult::Error,
1175                },
1176            }
1177        }; // mutable borrow of self.links dropped here
1178
1179        // Second pass: process results using self methods
1180        let mut actions = Vec::new();
1181        match result {
1182            LinkDataResult::Lrrtt {
1183                link_id,
1184                link_actions,
1185            } => {
1186                actions.extend(self.process_link_actions(&link_id, &link_actions));
1187                // Initialize channel
1188                if let Some(link) = self.links.get_mut(&link_id) {
1189                    if link.engine.state() == LinkState::Active {
1190                        let rtt = link.engine.rtt().unwrap_or(1.0);
1191                        link.channel = Some(Channel::new(rtt));
1192                    }
1193                }
1194            }
1195            LinkDataResult::Identify {
1196                link_id,
1197                link_actions,
1198            } => {
1199                actions.extend(self.process_link_actions(&link_id, &link_actions));
1200            }
1201            LinkDataResult::Keepalive {
1202                link_id,
1203                inbound_actions,
1204            } => {
1205                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1206                // record_inbound() already updated last_inbound, so the link
1207                // won't go stale.  The regular tick() keepalive mechanism will
1208                // send keepalives when needs_keepalive() returns true.
1209                // Do NOT reply here — unconditional replies create an infinite
1210                // ping-pong loop between the two link endpoints.
1211            }
1212            LinkDataResult::LinkClose {
1213                link_id,
1214                teardown_actions,
1215            } => {
1216                actions.extend(self.process_link_actions(&link_id, &teardown_actions));
1217            }
1218            LinkDataResult::Channel {
1219                link_id,
1220                inbound_actions,
1221                plaintext,
1222                packet_hash,
1223            } => {
1224                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1225                // Feed plaintext to channel
1226                if let Some(link) = self.links.get_mut(&link_id) {
1227                    if let Some(ref mut channel) = link.channel {
1228                        let chan_actions = channel.receive(&plaintext, time::now());
1229                        link.channel_messages_received += chan_actions
1230                            .iter()
1231                            .filter(|action| {
1232                                matches!(
1233                                    action,
1234                                    rns_core::channel::ChannelAction::MessageReceived { .. }
1235                                )
1236                            })
1237                            .count()
1238                            as u64;
1239                        // process_channel_actions needs immutable self, so collect first
1240                        let _ = link;
1241                        actions.extend(self.process_channel_actions(&link_id, chan_actions, rng));
1242                    }
1243                }
1244                actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1245            }
1246            LinkDataResult::Request {
1247                link_id,
1248                inbound_actions,
1249                plaintext,
1250                request_id,
1251            } => {
1252                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1253                actions.extend(self.handle_request(&link_id, &plaintext, request_id, rng));
1254            }
1255            LinkDataResult::Response {
1256                link_id,
1257                inbound_actions,
1258                plaintext,
1259            } => {
1260                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1261                // Unpack msgpack response: [Bin(request_id), response_value]
1262                actions.extend(self.handle_response(&link_id, &plaintext, None));
1263            }
1264            LinkDataResult::Generic {
1265                link_id,
1266                inbound_actions,
1267                plaintext,
1268                context,
1269                packet_hash,
1270            } => {
1271                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1272                actions.push(LinkManagerAction::LinkDataReceived {
1273                    link_id,
1274                    context,
1275                    data: plaintext,
1276                });
1277
1278                actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1279            }
1280            LinkDataResult::ResourceAdv {
1281                link_id,
1282                inbound_actions,
1283                plaintext,
1284            } => {
1285                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1286                actions.extend(self.handle_resource_adv(&link_id, &plaintext, rng));
1287            }
1288            LinkDataResult::ResourceReq {
1289                link_id,
1290                inbound_actions,
1291                plaintext,
1292            } => {
1293                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1294                actions.extend(self.handle_resource_req(&link_id, &plaintext, rng));
1295            }
1296            LinkDataResult::ResourceHmu {
1297                link_id,
1298                inbound_actions,
1299                plaintext,
1300            } => {
1301                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1302                actions.extend(self.handle_resource_hmu(&link_id, &plaintext, rng));
1303            }
1304            LinkDataResult::ResourcePart {
1305                link_id,
1306                inbound_actions,
1307                raw_data,
1308            } => {
1309                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1310                actions.extend(self.handle_resource_part(&link_id, &raw_data, rng));
1311            }
1312            LinkDataResult::ResourcePrf {
1313                link_id,
1314                inbound_actions,
1315                plaintext,
1316            } => {
1317                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1318                actions.extend(self.handle_resource_prf(&link_id, &plaintext, rng));
1319            }
1320            LinkDataResult::ResourceIcl {
1321                link_id,
1322                inbound_actions,
1323            } => {
1324                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1325                actions.extend(self.handle_resource_icl(&link_id));
1326            }
1327            LinkDataResult::ResourceRcl {
1328                link_id,
1329                inbound_actions,
1330            } => {
1331                actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1332                actions.extend(self.handle_resource_rcl(&link_id));
1333            }
1334            LinkDataResult::Error => {}
1335        }
1336
1337        actions
1338    }
1339
1340    /// Handle a request on a link.
1341    fn handle_request(
1342        &mut self,
1343        link_id: &LinkId,
1344        plaintext: &[u8],
1345        request_id: [u8; 16],
1346        rng: &mut dyn Rng,
1347    ) -> Vec<LinkManagerAction> {
1348        use rns_core::msgpack::{self, Value};
1349
1350        // Python-compatible format: msgpack([timestamp, Bin(path_hash), data_value])
1351        let arr = match msgpack::unpack_exact(plaintext) {
1352            Ok(Value::Array(arr)) if arr.len() >= 3 => arr,
1353            _ => return Vec::new(),
1354        };
1355
1356        let path_hash_bytes = match &arr[1] {
1357            Value::Bin(b) if b.len() == 16 => b,
1358            _ => return Vec::new(),
1359        };
1360        let mut path_hash = [0u8; 16];
1361        path_hash.copy_from_slice(path_hash_bytes);
1362
1363        // Re-encode the data element for the handler
1364        let request_data = msgpack::pack(&arr[2]);
1365
1366        // Check if this is a management path (handled by the driver)
1367        if self.management_paths.contains(&path_hash) {
1368            let remote_identity = self
1369                .links
1370                .get(link_id)
1371                .and_then(|l| l.remote_identity)
1372                .map(|(h, k)| (h, k));
1373            return vec![LinkManagerAction::ManagementRequest {
1374                link_id: *link_id,
1375                path_hash,
1376                data: request_data,
1377                request_id,
1378                remote_identity,
1379            }];
1380        }
1381
1382        // Look up handler by path_hash
1383        let handler_idx = self
1384            .request_handlers
1385            .iter()
1386            .position(|h| h.path_hash == path_hash);
1387        let handler_idx = match handler_idx {
1388            Some(i) => i,
1389            None => return Vec::new(),
1390        };
1391
1392        // Check ACL
1393        let remote_identity = self
1394            .links
1395            .get(link_id)
1396            .and_then(|l| l.remote_identity.as_ref());
1397        let handler = &self.request_handlers[handler_idx];
1398        if let Some(ref allowed) = handler.allowed_list {
1399            match remote_identity {
1400                Some((identity_hash, _)) => {
1401                    if !allowed.contains(identity_hash) {
1402                        log::debug!("Request denied: identity not in allowed list");
1403                        return Vec::new();
1404                    }
1405                }
1406                None => {
1407                    log::debug!("Request denied: peer not identified");
1408                    return Vec::new();
1409                }
1410            }
1411        }
1412
1413        // Call handler
1414        let path = handler.path.clone();
1415        let response = (handler.handler)(*link_id, &path, &request_data, remote_identity);
1416
1417        let mut actions = Vec::new();
1418        if let Some(response) = response {
1419            match response {
1420                RequestResponse::Bytes(response_data) => {
1421                    let mut response_actions =
1422                        self.build_response_packet(link_id, &request_id, &response_data, rng);
1423                    if response_actions.is_empty() {
1424                        response_actions.extend(self.send_response_resource(
1425                            link_id,
1426                            &request_id,
1427                            &response_data,
1428                            None,
1429                            true,
1430                            rng,
1431                        ));
1432                    }
1433                    actions.extend(response_actions);
1434                }
1435                RequestResponse::Resource {
1436                    data,
1437                    metadata,
1438                    auto_compress,
1439                } => {
1440                    actions.extend(self.send_response_resource(
1441                        link_id,
1442                        &request_id,
1443                        &data,
1444                        metadata.as_deref(),
1445                        auto_compress,
1446                        rng,
1447                    ));
1448                }
1449            }
1450        }
1451
1452        actions
1453    }
1454
1455    /// Build a response packet for a request.
1456    /// `response_data` is the msgpack-encoded response value.
1457    fn build_response_packet(
1458        &self,
1459        link_id: &LinkId,
1460        request_id: &[u8; 16],
1461        response_data: &[u8],
1462        rng: &mut dyn Rng,
1463    ) -> Vec<LinkManagerAction> {
1464        use rns_core::msgpack::{self, Value};
1465
1466        let response_value = msgpack::unpack_exact(response_data)
1467            .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1468
1469        let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1470        let response_plaintext = msgpack::pack(&response_array);
1471
1472        let mut actions = Vec::new();
1473        if let Some(link) = self.links.get(link_id) {
1474            if let Ok(encrypted) = link.engine.encrypt(&response_plaintext, rng) {
1475                let flags = PacketFlags {
1476                    header_type: constants::HEADER_1,
1477                    context_flag: constants::FLAG_UNSET,
1478                    transport_type: constants::TRANSPORT_BROADCAST,
1479                    destination_type: constants::DESTINATION_LINK,
1480                    packet_type: constants::PACKET_TYPE_DATA,
1481                };
1482                let max_mtu = link.engine.mtu() as usize;
1483                if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash_with_max_mtu(
1484                    flags,
1485                    0,
1486                    link_id,
1487                    None,
1488                    constants::CONTEXT_RESPONSE,
1489                    &encrypted,
1490                    max_mtu,
1491                ) {
1492                    actions.push(LinkManagerAction::SendPacket {
1493                        raw,
1494                        dest_type: constants::DESTINATION_LINK,
1495                        attached_interface: None,
1496                    });
1497                }
1498            }
1499        }
1500        actions
1501    }
1502
1503    fn send_response_resource(
1504        &mut self,
1505        link_id: &LinkId,
1506        request_id: &[u8; 16],
1507        response_data: &[u8],
1508        metadata: Option<&[u8]>,
1509        auto_compress: bool,
1510        rng: &mut dyn Rng,
1511    ) -> Vec<LinkManagerAction> {
1512        use rns_core::msgpack::{self, Value};
1513
1514        let link = match self.links.get_mut(link_id) {
1515            Some(l) => l,
1516            None => return Vec::new(),
1517        };
1518
1519        if link.engine.state() != LinkState::Active {
1520            return Vec::new();
1521        }
1522
1523        let now = time::now();
1524
1525        // Match Python resource response format from Link.handle_request:
1526        // packed_response = msgpack([request_id, response_value])
1527        // where response_value is decoded msgpack value, or Bin(raw bytes).
1528        let response_value = msgpack::unpack_exact(response_data)
1529            .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1530        let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1531        let resource_payload = msgpack::pack(&response_array);
1532
1533        let senders = match Self::build_resource_senders(
1534            link,
1535            &resource_payload,
1536            metadata,
1537            auto_compress,
1538            true, // is_response
1539            Some(request_id.to_vec()),
1540            rng,
1541            now,
1542        ) {
1543            Ok(s) => s,
1544            Err(e) => {
1545                log::debug!("Failed to create response ResourceSender: {}", e);
1546                return Vec::new();
1547            }
1548        };
1549
1550        let adv_actions = Self::start_resource_senders(link, senders, now);
1551
1552        let _ = link;
1553        self.process_resource_actions(link_id, adv_actions, rng)
1554    }
1555
1556    /// Send a management response on a link.
1557    /// Called by the driver after building the response for a ManagementRequest.
1558    pub fn send_management_response(
1559        &mut self,
1560        link_id: &LinkId,
1561        request_id: &[u8; 16],
1562        response_data: &[u8],
1563        rng: &mut dyn Rng,
1564    ) -> Vec<LinkManagerAction> {
1565        let mut actions = self.build_response_packet(link_id, request_id, response_data, rng);
1566        if actions.is_empty() {
1567            actions.extend(self.send_response_resource(
1568                link_id,
1569                request_id,
1570                response_data,
1571                None,
1572                true,
1573                rng,
1574            ));
1575        }
1576        actions
1577    }
1578
1579    /// Send a request on a link.
1580    ///
1581    /// `data` is the msgpack-encoded request data value (e.g. msgpack([True]) for /status).
1582    ///
1583    /// Uses Python-compatible format: plaintext = msgpack([timestamp, path_hash_bytes, data_value]).
1584    /// Returns actions (the encrypted request packet). The response will arrive
1585    /// later via handle_local_delivery with CONTEXT_RESPONSE.
1586    pub fn send_request(
1587        &self,
1588        link_id: &LinkId,
1589        path: &str,
1590        data: &[u8],
1591        rng: &mut dyn Rng,
1592    ) -> Vec<LinkManagerAction> {
1593        use rns_core::msgpack::{self, Value};
1594
1595        let link = match self.links.get(link_id) {
1596            Some(l) => l,
1597            None => return Vec::new(),
1598        };
1599
1600        if link.engine.state() != LinkState::Active {
1601            return Vec::new();
1602        }
1603
1604        let path_hash = compute_path_hash(path);
1605
1606        // Decode data bytes to msgpack Value (or use Bin if can't decode)
1607        let data_value = msgpack::unpack_exact(data).unwrap_or_else(|_| Value::Bin(data.to_vec()));
1608
1609        // Python-compatible format: msgpack([timestamp, Bin(path_hash), data_value])
1610        let request_array = Value::Array(vec![
1611            Value::Float(time::now()),
1612            Value::Bin(path_hash.to_vec()),
1613            data_value,
1614        ]);
1615        let plaintext = msgpack::pack(&request_array);
1616
1617        let encrypted = match link.engine.encrypt(&plaintext, rng) {
1618            Ok(e) => e,
1619            Err(_) => return Vec::new(),
1620        };
1621
1622        let flags = PacketFlags {
1623            header_type: constants::HEADER_1,
1624            context_flag: constants::FLAG_UNSET,
1625            transport_type: constants::TRANSPORT_BROADCAST,
1626            destination_type: constants::DESTINATION_LINK,
1627            packet_type: constants::PACKET_TYPE_DATA,
1628        };
1629
1630        let mut actions = Vec::new();
1631        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1632            flags,
1633            0,
1634            link_id,
1635            None,
1636            constants::CONTEXT_REQUEST,
1637            &encrypted,
1638        ) {
1639            actions.push(LinkManagerAction::SendPacket {
1640                raw,
1641                dest_type: constants::DESTINATION_LINK,
1642                attached_interface: None,
1643            });
1644        }
1645        actions
1646    }
1647
1648    /// Send encrypted data on a link with a given context.
1649    pub fn send_on_link(
1650        &self,
1651        link_id: &LinkId,
1652        plaintext: &[u8],
1653        context: u8,
1654        rng: &mut dyn Rng,
1655    ) -> Vec<LinkManagerAction> {
1656        let link = match self.links.get(link_id) {
1657            Some(l) => l,
1658            None => return Vec::new(),
1659        };
1660
1661        if link.engine.state() != LinkState::Active {
1662            return Vec::new();
1663        }
1664
1665        let encrypted = match link.engine.encrypt(plaintext, rng) {
1666            Ok(e) => e,
1667            Err(_) => return Vec::new(),
1668        };
1669
1670        let flags = PacketFlags {
1671            header_type: constants::HEADER_1,
1672            context_flag: constants::FLAG_UNSET,
1673            transport_type: constants::TRANSPORT_BROADCAST,
1674            destination_type: constants::DESTINATION_LINK,
1675            packet_type: constants::PACKET_TYPE_DATA,
1676        };
1677
1678        let mut actions = Vec::new();
1679        if let Ok((raw, _packet_hash)) =
1680            RawPacket::pack_raw_with_hash(flags, 0, link_id, None, context, &encrypted)
1681        {
1682            actions.push(LinkManagerAction::SendPacket {
1683                raw,
1684                dest_type: constants::DESTINATION_LINK,
1685                attached_interface: None,
1686            });
1687        }
1688        actions
1689    }
1690
1691    /// Send an identify message on a link (initiator reveals identity to responder).
1692    pub fn identify(
1693        &self,
1694        link_id: &LinkId,
1695        identity: &rns_crypto::identity::Identity,
1696        rng: &mut dyn Rng,
1697    ) -> Vec<LinkManagerAction> {
1698        let link = match self.links.get(link_id) {
1699            Some(l) => l,
1700            None => return Vec::new(),
1701        };
1702
1703        let encrypted = match link.engine.build_identify(identity, rng) {
1704            Ok(e) => e,
1705            Err(_) => return Vec::new(),
1706        };
1707
1708        let flags = PacketFlags {
1709            header_type: constants::HEADER_1,
1710            context_flag: constants::FLAG_UNSET,
1711            transport_type: constants::TRANSPORT_BROADCAST,
1712            destination_type: constants::DESTINATION_LINK,
1713            packet_type: constants::PACKET_TYPE_DATA,
1714        };
1715
1716        let mut actions = Vec::new();
1717        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1718            flags,
1719            0,
1720            link_id,
1721            None,
1722            constants::CONTEXT_LINKIDENTIFY,
1723            &encrypted,
1724        ) {
1725            actions.push(LinkManagerAction::SendPacket {
1726                raw,
1727                dest_type: constants::DESTINATION_LINK,
1728                attached_interface: None,
1729            });
1730        }
1731        actions
1732    }
1733
1734    /// Tear down a link.
1735    pub fn teardown_link(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1736        let link = match self.links.get_mut(link_id) {
1737            Some(l) => l,
1738            None => return Vec::new(),
1739        };
1740
1741        let teardown_actions = link.engine.teardown();
1742        if let Some(ref mut channel) = link.channel {
1743            channel.shutdown();
1744        }
1745
1746        let mut actions = self.process_link_actions(link_id, &teardown_actions);
1747
1748        // Send LINKCLOSE packet
1749        let flags = PacketFlags {
1750            header_type: constants::HEADER_1,
1751            context_flag: constants::FLAG_UNSET,
1752            transport_type: constants::TRANSPORT_BROADCAST,
1753            destination_type: constants::DESTINATION_LINK,
1754            packet_type: constants::PACKET_TYPE_DATA,
1755        };
1756        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1757            flags,
1758            0,
1759            link_id,
1760            None,
1761            constants::CONTEXT_LINKCLOSE,
1762            &[],
1763        ) {
1764            actions.push(LinkManagerAction::SendPacket {
1765                raw,
1766                dest_type: constants::DESTINATION_LINK,
1767                attached_interface: None,
1768            });
1769        }
1770
1771        actions
1772    }
1773
1774    /// Tear down all managed links.
1775    pub fn teardown_all_links(&mut self) -> Vec<LinkManagerAction> {
1776        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
1777        let mut actions = Vec::new();
1778        for link_id in link_ids {
1779            actions.extend(self.teardown_link(&link_id));
1780        }
1781        actions
1782    }
1783
1784    /// Handle a response on a link.
1785    fn handle_response(
1786        &self,
1787        link_id: &LinkId,
1788        plaintext: &[u8],
1789        metadata: Option<Vec<u8>>,
1790    ) -> Vec<LinkManagerAction> {
1791        use rns_core::msgpack;
1792
1793        // Python-compatible response: msgpack([Bin(request_id), response_value])
1794        let arr = match msgpack::unpack_exact(plaintext) {
1795            Ok(msgpack::Value::Array(arr)) if arr.len() >= 2 => arr,
1796            _ => return Vec::new(),
1797        };
1798
1799        let request_id_bytes = match &arr[0] {
1800            msgpack::Value::Bin(b) if b.len() == 16 => b,
1801            _ => return Vec::new(),
1802        };
1803        let mut request_id = [0u8; 16];
1804        request_id.copy_from_slice(request_id_bytes);
1805
1806        let response_data = msgpack::pack(&arr[1]);
1807
1808        vec![LinkManagerAction::ResponseReceived {
1809            link_id: *link_id,
1810            request_id,
1811            data: response_data,
1812            metadata,
1813        }]
1814    }
1815
1816    fn build_resource_senders(
1817        link: &ManagedLink,
1818        data: &[u8],
1819        metadata: Option<&[u8]>,
1820        auto_compress: bool,
1821        is_response: bool,
1822        request_id: Option<Vec<u8>>,
1823        rng: &mut dyn Rng,
1824        now: f64,
1825    ) -> Result<Vec<ResourceSender>, rns_core::resource::ResourceError> {
1826        let link_rtt = link.engine.rtt().unwrap_or(1.0);
1827        let resource_sdu = Self::resource_sdu_for_link(link);
1828        let metadata_overhead = metadata.map(|m| 3 + m.len()).unwrap_or(0);
1829        let logical_size = metadata_overhead + data.len();
1830
1831        if logical_size <= constants::RESOURCE_MAX_EFFICIENT_SIZE {
1832            let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1833            let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1834                link.engine
1835                    .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1836                    .unwrap_or_else(|_| plaintext.to_vec())
1837            };
1838            return ResourceSender::new(
1839                data,
1840                metadata,
1841                resource_sdu,
1842                &encrypt_fn,
1843                &Bzip2Compressor,
1844                rng,
1845                now,
1846                auto_compress,
1847                is_response,
1848                request_id,
1849                1,
1850                1,
1851                None,
1852                link_rtt,
1853                6.0,
1854            )
1855            .map(|sender| vec![sender]);
1856        }
1857
1858        if metadata_overhead > constants::RESOURCE_MAX_EFFICIENT_SIZE {
1859            return Err(rns_core::resource::ResourceError::TooLarge);
1860        }
1861
1862        let first_payload_len = core::cmp::min(
1863            data.len(),
1864            constants::RESOURCE_MAX_EFFICIENT_SIZE - metadata_overhead,
1865        );
1866        let remaining = data.len().saturating_sub(first_payload_len);
1867        let total_segments = 1 + remaining.div_ceil(constants::RESOURCE_MAX_EFFICIENT_SIZE) as u64;
1868
1869        let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1870        let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1871            link.engine
1872                .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1873                .unwrap_or_else(|_| plaintext.to_vec())
1874        };
1875
1876        let mut senders = Vec::new();
1877        let mut first = ResourceSender::new(
1878            &data[..first_payload_len],
1879            metadata,
1880            resource_sdu,
1881            &encrypt_fn,
1882            &Bzip2Compressor,
1883            rng,
1884            now,
1885            auto_compress,
1886            is_response,
1887            request_id.clone(),
1888            1,
1889            total_segments,
1890            None,
1891            link_rtt,
1892            6.0,
1893        )?;
1894        first.data_size = logical_size;
1895        let original_hash = first.original_hash;
1896        let has_metadata = metadata.is_some();
1897        senders.push(first);
1898
1899        let mut offset = first_payload_len;
1900        let mut segment_index = 2;
1901        while offset < data.len() {
1902            let end = core::cmp::min(offset + constants::RESOURCE_MAX_EFFICIENT_SIZE, data.len());
1903            let mut sender = ResourceSender::new(
1904                &data[offset..end],
1905                None,
1906                resource_sdu,
1907                &encrypt_fn,
1908                &Bzip2Compressor,
1909                rng,
1910                now,
1911                auto_compress,
1912                is_response,
1913                request_id.clone(),
1914                segment_index,
1915                total_segments,
1916                Some(original_hash),
1917                link_rtt,
1918                6.0,
1919            )?;
1920            sender.data_size = logical_size;
1921            sender.flags.has_metadata = has_metadata;
1922            senders.push(sender);
1923            segment_index += 1;
1924            offset = end;
1925        }
1926
1927        Ok(senders)
1928    }
1929
1930    fn start_resource_senders(
1931        link: &mut ManagedLink,
1932        mut senders: Vec<ResourceSender>,
1933        now: f64,
1934    ) -> Vec<ResourceAction> {
1935        if senders.is_empty() {
1936            return Vec::new();
1937        }
1938
1939        let mut first = senders.remove(0);
1940        let adv_actions = first.advertise(now);
1941
1942        if first.total_segments > 1 {
1943            let original_hash = first.original_hash;
1944            let split = OutgoingSplitTransfer {
1945                total_segments: first.total_segments,
1946                completed_segments: 0,
1947                current_segment_index: first.segment_index,
1948                current_sent_parts: 0,
1949                current_total_parts: first.total_parts(),
1950            };
1951            link.outgoing_splits.insert(original_hash, split);
1952        }
1953
1954        link.outgoing_resources.push(first);
1955        link.outgoing_resources.extend(senders);
1956        adv_actions
1957    }
1958
1959    /// Handle resource advertisement (CONTEXT_RESOURCE_ADV).
1960    fn handle_resource_adv(
1961        &mut self,
1962        link_id: &LinkId,
1963        adv_plaintext: &[u8],
1964        rng: &mut dyn Rng,
1965    ) -> Vec<LinkManagerAction> {
1966        let link = match self.links.get_mut(link_id) {
1967            Some(l) => l,
1968            None => return Vec::new(),
1969        };
1970
1971        let link_rtt = link.engine.rtt().unwrap_or(1.0);
1972        let resource_sdu = Self::resource_sdu_for_link(link);
1973        let now = time::now();
1974
1975        let receiver = match ResourceReceiver::from_advertisement(
1976            adv_plaintext,
1977            resource_sdu,
1978            link_rtt,
1979            now,
1980            None,
1981            None,
1982        ) {
1983            Ok(r) => r,
1984            Err(e) => {
1985                log::debug!("Resource ADV rejected: {}", e);
1986                return Vec::new();
1987            }
1988        };
1989
1990        let strategy = link.resource_strategy;
1991        let resource_hash = receiver.resource_hash.clone();
1992        let transfer_size = receiver.transfer_size;
1993        let has_metadata = receiver.has_metadata;
1994        let is_response = receiver.flags.is_response;
1995        let is_split = receiver.flags.split;
1996        let segment_index = receiver.segment_index;
1997        let total_segments = receiver.total_segments;
1998        let original_hash = match Self::resource_hash_key(&receiver.original_hash) {
1999            Some(key) => key,
2000            None => return Vec::new(),
2001        };
2002
2003        if is_split && segment_index > 1 {
2004            let should_accept = link
2005                .incoming_splits
2006                .get(&original_hash)
2007                .is_some_and(|split| {
2008                    split.completed_segments + 1 == segment_index
2009                        && split.total_segments == total_segments
2010                });
2011
2012            if !should_accept {
2013                let reject_actions = {
2014                    let mut r = receiver;
2015                    r.reject()
2016                };
2017                let _ = link;
2018                return self.process_resource_actions(link_id, reject_actions, rng);
2019            }
2020
2021            let current_total_parts = receiver.total_parts;
2022            link.incoming_resources.push(receiver);
2023            let idx = link.incoming_resources.len() - 1;
2024            if let Some(split) = link.incoming_splits.get_mut(&original_hash) {
2025                split.current_segment_index = segment_index;
2026                split.current_received_parts = 0;
2027                split.current_total_parts = current_total_parts;
2028            }
2029            let resource_actions = link.incoming_resources[idx].accept(now);
2030            let _ = link;
2031            return self.process_resource_actions(link_id, resource_actions, rng);
2032        }
2033
2034        if is_response {
2035            // Response resources bypass the application acceptance strategy —
2036            // they are answers to pending requests, not independent resources.
2037            if is_split {
2038                link.incoming_splits.insert(
2039                    original_hash,
2040                    IncomingSplitTransfer {
2041                        total_segments,
2042                        completed_segments: 0,
2043                        current_segment_index: segment_index,
2044                        current_received_parts: 0,
2045                        current_total_parts: receiver.total_parts,
2046                        data: Vec::new(),
2047                        metadata: None,
2048                        is_response,
2049                    },
2050                );
2051            }
2052            link.incoming_resources.push(receiver);
2053            let idx = link.incoming_resources.len() - 1;
2054            let resource_actions = link.incoming_resources[idx].accept(now);
2055            let _ = link;
2056            return self.process_resource_actions(link_id, resource_actions, rng);
2057        }
2058
2059        match strategy {
2060            ResourceStrategy::AcceptNone => {
2061                // Reject: send RCL
2062                let reject_actions = {
2063                    let mut r = receiver;
2064                    r.reject()
2065                };
2066                self.process_resource_actions(link_id, reject_actions, rng)
2067            }
2068            ResourceStrategy::AcceptAll => {
2069                if is_split {
2070                    link.incoming_splits.insert(
2071                        original_hash,
2072                        IncomingSplitTransfer {
2073                            total_segments,
2074                            completed_segments: 0,
2075                            current_segment_index: segment_index,
2076                            current_received_parts: 0,
2077                            current_total_parts: receiver.total_parts,
2078                            data: Vec::new(),
2079                            metadata: None,
2080                            is_response,
2081                        },
2082                    );
2083                }
2084                link.incoming_resources.push(receiver);
2085                let idx = link.incoming_resources.len() - 1;
2086                let resource_actions = link.incoming_resources[idx].accept(now);
2087                let _ = link;
2088                self.process_resource_actions(link_id, resource_actions, rng)
2089            }
2090            ResourceStrategy::AcceptApp => {
2091                link.incoming_resources.push(receiver);
2092                // Query application callback
2093                vec![LinkManagerAction::ResourceAcceptQuery {
2094                    link_id: *link_id,
2095                    resource_hash,
2096                    transfer_size,
2097                    has_metadata,
2098                }]
2099            }
2100        }
2101    }
2102
2103    /// Accept or reject a pending resource (for AcceptApp strategy).
2104    pub fn accept_resource(
2105        &mut self,
2106        link_id: &LinkId,
2107        resource_hash: &[u8],
2108        accept: bool,
2109        rng: &mut dyn Rng,
2110    ) -> Vec<LinkManagerAction> {
2111        let link = match self.links.get_mut(link_id) {
2112            Some(l) => l,
2113            None => return Vec::new(),
2114        };
2115
2116        let now = time::now();
2117        let idx = link
2118            .incoming_resources
2119            .iter()
2120            .position(|r| r.resource_hash == resource_hash);
2121        let idx = match idx {
2122            Some(i) => i,
2123            None => return Vec::new(),
2124        };
2125
2126        if accept && link.incoming_resources[idx].flags.split {
2127            if let Some(original_hash) =
2128                Self::resource_hash_key(&link.incoming_resources[idx].original_hash)
2129            {
2130                link.incoming_splits
2131                    .entry(original_hash)
2132                    .or_insert_with(|| IncomingSplitTransfer {
2133                        total_segments: link.incoming_resources[idx].total_segments,
2134                        completed_segments: 0,
2135                        current_segment_index: link.incoming_resources[idx].segment_index,
2136                        current_received_parts: 0,
2137                        current_total_parts: link.incoming_resources[idx].total_parts,
2138                        data: Vec::new(),
2139                        metadata: None,
2140                        is_response: link.incoming_resources[idx].flags.is_response,
2141                    });
2142            }
2143        }
2144
2145        let resource_actions = if accept {
2146            link.incoming_resources[idx].accept(now)
2147        } else {
2148            link.incoming_resources[idx].reject()
2149        };
2150
2151        let _ = link;
2152        self.process_resource_actions(link_id, resource_actions, rng)
2153    }
2154
2155    /// Handle resource request (CONTEXT_RESOURCE_REQ) — feed to sender.
2156    fn handle_resource_req(
2157        &mut self,
2158        link_id: &LinkId,
2159        plaintext: &[u8],
2160        rng: &mut dyn Rng,
2161    ) -> Vec<LinkManagerAction> {
2162        let link = match self.links.get_mut(link_id) {
2163            Some(l) => l,
2164            None => return Vec::new(),
2165        };
2166
2167        let now = time::now();
2168        let mut all_actions = Vec::new();
2169        let mut progress_update = None;
2170        for sender in &mut link.outgoing_resources {
2171            if sender.flags.split && sender.status == rns_core::resource::ResourceStatus::Queued {
2172                continue;
2173            }
2174            let before_sent = sender.sent_parts;
2175            let resource_actions = sender.handle_request(plaintext, now);
2176            if !resource_actions.is_empty() {
2177                if sender.sent_parts != before_sent {
2178                    if sender.flags.split {
2179                        if let Some(split) = link.outgoing_splits.get_mut(&sender.original_hash) {
2180                            split.current_segment_index = sender.segment_index;
2181                            split.current_sent_parts = sender.sent_parts;
2182                            split.current_total_parts = sender.total_parts();
2183                            progress_update =
2184                                Some(Self::outgoing_split_progress(split, sender.sdu));
2185                        }
2186                    } else {
2187                        progress_update = Some((sender.sent_parts, sender.total_parts()));
2188                    }
2189                }
2190                all_actions.extend(resource_actions);
2191                break;
2192            }
2193        }
2194
2195        let _ = link;
2196        let mut out = self.process_resource_actions(link_id, all_actions, rng);
2197        if let Some((received, total)) = progress_update {
2198            out.push(LinkManagerAction::ResourceProgress {
2199                link_id: *link_id,
2200                received,
2201                total,
2202            });
2203        }
2204        out
2205    }
2206
2207    /// Handle resource HMU (CONTEXT_RESOURCE_HMU) — feed to receiver.
2208    fn handle_resource_hmu(
2209        &mut self,
2210        link_id: &LinkId,
2211        plaintext: &[u8],
2212        rng: &mut dyn Rng,
2213    ) -> Vec<LinkManagerAction> {
2214        let link = match self.links.get_mut(link_id) {
2215            Some(l) => l,
2216            None => return Vec::new(),
2217        };
2218
2219        let now = time::now();
2220        let mut all_actions = Vec::new();
2221        for receiver in &mut link.incoming_resources {
2222            let resource_actions = receiver.handle_hashmap_update(plaintext, now);
2223            if !resource_actions.is_empty() {
2224                all_actions.extend(resource_actions);
2225                break;
2226            }
2227        }
2228
2229        let _ = link;
2230        self.process_resource_actions(link_id, all_actions, rng)
2231    }
2232
    /// Handle resource part (CONTEXT_RESOURCE) — feed raw to receiver.
    ///
    /// The part is offered to each incoming receiver that has not yet reached
    /// `Complete`; the first receiver that produces any actions handles it.
    /// For split transfers the receiver's progress updates are replaced with
    /// transfer-wide progress from `incoming_split_progress`.
    ///
    /// When the handling receiver has received all of its parts it is
    /// assembled (decrypting with the link engine, decompressing with
    /// `Bzip2Compressor`). For a split transfer the segment's data is appended
    /// to the split record instead of being delivered; only once the final
    /// segment is assembled is the combined data emitted as a single
    /// `DataReceived` + `Completed`.
    ///
    /// If the assembled resource is a response, any `ResourceReceived` action
    /// is converted through `handle_response` and any `ResourceAcceptQuery`
    /// is dropped. Returns an empty list for an unknown link.
    fn handle_resource_part(
        &mut self,
        link_id: &LinkId,
        raw_data: &[u8],
        rng: &mut dyn Rng,
    ) -> Vec<LinkManagerAction> {
        let link = match self.links.get_mut(link_id) {
            Some(l) => l,
            None => return Vec::new(),
        };

        let now = time::now();
        let resource_sdu = Self::resource_sdu_for_link(link);
        let mut all_actions = Vec::new();
        // Index of the receiver that just received its last part, if any.
        let mut assemble_idx = None;
        let mut assembled_is_response = false;

        for (idx, receiver) in link.incoming_resources.iter_mut().enumerate() {
            // Receivers at or beyond `Complete` take no more parts.
            if receiver.status >= rns_core::resource::ResourceStatus::Complete {
                continue;
            }
            let resource_actions = receiver.receive_part(raw_data, now);
            // Empty actions mean this receiver did not react to the part; try the next.
            if !resource_actions.is_empty() {
                if receiver.received_count == receiver.total_parts {
                    assemble_idx = Some(idx);
                }
                if receiver.flags.split {
                    if let Some(key) = Self::resource_hash_key(&receiver.original_hash) {
                        if let Some(split) = link.incoming_splits.get_mut(&key) {
                            split.current_segment_index = receiver.segment_index;
                            split.current_received_parts = receiver.received_count;
                            split.current_total_parts = receiver.total_parts;
                            let (received, total) =
                                Self::incoming_split_progress(split, resource_sdu);
                            // Swap per-segment progress for transfer-wide progress;
                            // all other actions pass through unchanged.
                            for action in resource_actions {
                                match action {
                                    ResourceAction::ProgressUpdate { .. } => {
                                        all_actions.push(ResourceAction::ProgressUpdate {
                                            received,
                                            total,
                                        });
                                    }
                                    other => all_actions.push(other),
                                }
                            }
                        } else {
                            // No split record: forward the receiver's actions as-is.
                            all_actions.extend(resource_actions);
                        }
                    } else {
                        all_actions.extend(resource_actions);
                    }
                } else {
                    all_actions.extend(resource_actions);
                }
                // Only the first reacting receiver handles the part.
                break;
            }
        }

        if let Some(idx) = assemble_idx {
            let split_key = if link.incoming_resources[idx].flags.split {
                Self::resource_hash_key(&link.incoming_resources[idx].original_hash)
            } else {
                None
            };
            // Copy out what is needed below before assembling the receiver.
            let split_segment_index = link.incoming_resources[idx].segment_index;
            let split_segment_total = link.incoming_resources[idx].total_segments;
            let split_segment_parts = link.incoming_resources[idx].total_parts;
            let split_is_response = link.incoming_resources[idx].flags.is_response;
            let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
                link.engine.decrypt(ciphertext).map_err(|_| ())
            };
            let mut assemble_actions =
                link.incoming_resources[idx].assemble(&decrypt_fn, &Bzip2Compressor);
            assembled_is_response = split_is_response;

            if let Some(key) = split_key {
                // Split segment: hold back this segment's DataReceived/Completed and
                // accumulate its data in the split record instead.
                let mut converted_actions = Vec::new();
                let mut segment_data = None;
                let mut segment_metadata = None;
                for action in assemble_actions {
                    match action {
                        ResourceAction::DataReceived { data, metadata } => {
                            segment_data = Some(data);
                            segment_metadata = metadata;
                        }
                        ResourceAction::Completed => {}
                        other => converted_actions.push(other),
                    }
                }

                if let Some(data) = segment_data {
                    if let Some(split) = link.incoming_splits.get_mut(&key) {
                        split.data.extend_from_slice(&data);
                        // Only overwrite metadata when this segment actually carried some.
                        if segment_metadata.is_some() {
                            split.metadata = segment_metadata;
                        }
                        split.completed_segments = split_segment_index;
                        split.current_segment_index = split_segment_index;
                        split.current_received_parts = split_segment_parts;
                        split.current_total_parts = split_segment_parts;
                    }

                    // Final segment: deliver the combined data and drop the split record.
                    if split_segment_index == split_segment_total {
                        if let Some(split) = link.incoming_splits.remove(&key) {
                            assembled_is_response = split.is_response;
                            converted_actions.push(ResourceAction::DataReceived {
                                data: split.data,
                                metadata: split.metadata,
                            });
                            converted_actions.push(ResourceAction::Completed);
                        }
                    }
                }

                assemble_actions = converted_actions;
            }
            all_actions.extend(assemble_actions);
        }

        let _ = link;
        let mut out = self.process_resource_actions(link_id, all_actions, rng);

        if assembled_is_response {
            // Responses are surfaced through `handle_response` rather than as
            // generic received resources.
            let mut converted = Vec::new();
            for action in out {
                match action {
                    LinkManagerAction::ResourceReceived { data, metadata, .. } => {
                        converted.extend(self.handle_response(link_id, &data, metadata));
                    }
                    LinkManagerAction::ResourceAcceptQuery { .. } => {
                        // Response resources bypass application acceptance
                    }
                    other => converted.push(other),
                }
            }
            out = converted;
        }

        out
    }
2374
2375    /// Handle resource proof (CONTEXT_RESOURCE_PRF) — feed to sender.
2376    fn handle_resource_prf(
2377        &mut self,
2378        link_id: &LinkId,
2379        plaintext: &[u8],
2380        rng: &mut dyn Rng,
2381    ) -> Vec<LinkManagerAction> {
2382        let link = match self.links.get_mut(link_id) {
2383            Some(l) => l,
2384            None => return Vec::new(),
2385        };
2386
2387        let now = time::now();
2388        let mut result_actions = Vec::new();
2389        let mut completed_sender = None;
2390        let mut failed_split = None;
2391        let proof_hash = plaintext.get(..32);
2392        for sender in &mut link.outgoing_resources {
2393            if proof_hash.is_some_and(|hash| hash != sender.resource_hash.as_slice()) {
2394                continue;
2395            }
2396            let resource_actions = sender.handle_proof(plaintext, now);
2397            if !resource_actions.is_empty() {
2398                if resource_actions
2399                    .iter()
2400                    .any(|action| matches!(action, ResourceAction::Completed))
2401                {
2402                    completed_sender = Some((
2403                        sender.original_hash,
2404                        sender.segment_index,
2405                        sender.total_segments,
2406                        sender.total_parts(),
2407                    ));
2408                }
2409                if sender.flags.split
2410                    && resource_actions
2411                        .iter()
2412                        .any(|action| matches!(action, ResourceAction::Failed(_)))
2413                {
2414                    failed_split = Some(sender.original_hash);
2415                }
2416                result_actions.extend(resource_actions);
2417                break;
2418            }
2419        }
2420
2421        // Convert to LinkManagerActions
2422        let mut actions = Vec::new();
2423        let mut advertise_next = None;
2424        for ra in result_actions {
2425            match ra {
2426                ResourceAction::Completed => {
2427                    if let Some((original_hash, segment_index, total_segments, total_parts)) =
2428                        completed_sender
2429                    {
2430                        if total_segments > 1 && segment_index < total_segments {
2431                            if let Some(split) = link.outgoing_splits.get_mut(&original_hash) {
2432                                split.completed_segments = segment_index;
2433                                split.current_segment_index = segment_index;
2434                                split.current_sent_parts = total_parts;
2435                                split.current_total_parts = total_parts;
2436                                if let Some(next) = link.outgoing_resources.iter_mut().find(|s| {
2437                                    s.flags.split
2438                                        && s.original_hash == original_hash
2439                                        && s.segment_index == segment_index + 1
2440                                }) {
2441                                    split.current_segment_index = next.segment_index;
2442                                    split.current_sent_parts = 0;
2443                                    split.current_total_parts = next.total_parts();
2444                                    advertise_next = Some(next.advertise(now));
2445                                }
2446                            }
2447                        } else {
2448                            link.outgoing_splits.remove(&original_hash);
2449                            actions
2450                                .push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2451                        }
2452                    } else {
2453                        actions.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2454                    }
2455                }
2456                ResourceAction::Failed(e) => {
2457                    if let Some(original_hash) = failed_split {
2458                        link.outgoing_splits.remove(&original_hash);
2459                    }
2460                    actions.push(LinkManagerAction::ResourceFailed {
2461                        link_id: *link_id,
2462                        error: format!("{}", e),
2463                    });
2464                }
2465                _ => {}
2466            }
2467        }
2468
2469        // Clean up completed/failed senders
2470        link.outgoing_resources
2471            .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2472
2473        let _ = link;
2474        if let Some(next_actions) = advertise_next {
2475            actions.extend(self.process_resource_actions(link_id, next_actions, rng));
2476        }
2477
2478        actions
2479    }
2480
2481    /// Handle cancel from initiator (CONTEXT_RESOURCE_ICL).
2482    fn handle_resource_icl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
2483        let link = match self.links.get_mut(link_id) {
2484            Some(l) => l,
2485            None => return Vec::new(),
2486        };
2487
2488        let mut actions = Vec::new();
2489        for receiver in &mut link.incoming_resources {
2490            let ra = receiver.handle_cancel();
2491            for a in ra {
2492                if let ResourceAction::Failed(ref e) = a {
2493                    actions.push(LinkManagerAction::ResourceFailed {
2494                        link_id: *link_id,
2495                        error: format!("{}", e),
2496                    });
2497                }
2498            }
2499        }
2500        link.incoming_resources
2501            .retain(|r| r.status < rns_core::resource::ResourceStatus::Complete);
2502        link.incoming_splits.clear();
2503        actions
2504    }
2505
2506    /// Handle cancel from receiver (CONTEXT_RESOURCE_RCL).
2507    fn handle_resource_rcl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
2508        let link = match self.links.get_mut(link_id) {
2509            Some(l) => l,
2510            None => return Vec::new(),
2511        };
2512
2513        let mut actions = Vec::new();
2514        for sender in &mut link.outgoing_resources {
2515            let ra = sender.handle_reject();
2516            for a in ra {
2517                if let ResourceAction::Failed(ref e) = a {
2518                    actions.push(LinkManagerAction::ResourceFailed {
2519                        link_id: *link_id,
2520                        error: format!("{}", e),
2521                    });
2522                }
2523            }
2524        }
2525        link.outgoing_resources
2526            .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2527        link.outgoing_splits.clear();
2528        actions
2529    }
2530
2531    /// Convert ResourceActions to LinkManagerActions.
2532    fn process_resource_actions(
2533        &mut self,
2534        link_id: &LinkId,
2535        actions: Vec<ResourceAction>,
2536        rng: &mut dyn Rng,
2537    ) -> Vec<LinkManagerAction> {
2538        let mut result = Vec::new();
2539        for action in actions {
2540            match action {
2541                ResourceAction::SendAdvertisement(data) => {
2542                    // Link-encrypt and send as CONTEXT_RESOURCE_ADV
2543                    let encrypted = self
2544                        .links
2545                        .get(link_id)
2546                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2547                    if let Some(encrypted) = encrypted {
2548                        result.extend(self.build_link_packet(
2549                            link_id,
2550                            constants::CONTEXT_RESOURCE_ADV,
2551                            &encrypted,
2552                        ));
2553                    }
2554                }
2555                ResourceAction::SendPart(data) => {
2556                    // Parts are NOT link-encrypted — send raw as CONTEXT_RESOURCE
2557                    result.extend(self.build_link_packet(
2558                        link_id,
2559                        constants::CONTEXT_RESOURCE,
2560                        &data,
2561                    ));
2562                }
2563                ResourceAction::SendRequest(data) => {
2564                    let encrypted = self
2565                        .links
2566                        .get(link_id)
2567                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2568                    if let Some(encrypted) = encrypted {
2569                        result.extend(self.build_link_packet(
2570                            link_id,
2571                            constants::CONTEXT_RESOURCE_REQ,
2572                            &encrypted,
2573                        ));
2574                    }
2575                }
2576                ResourceAction::SendHmu(data) => {
2577                    let encrypted = self
2578                        .links
2579                        .get(link_id)
2580                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2581                    if let Some(encrypted) = encrypted {
2582                        result.extend(self.build_link_packet(
2583                            link_id,
2584                            constants::CONTEXT_RESOURCE_HMU,
2585                            &encrypted,
2586                        ));
2587                    }
2588                }
2589                ResourceAction::SendProof(data) => {
2590                    let encrypted = self
2591                        .links
2592                        .get(link_id)
2593                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2594                    if let Some(encrypted) = encrypted {
2595                        result.extend(self.build_link_packet(
2596                            link_id,
2597                            constants::CONTEXT_RESOURCE_PRF,
2598                            &encrypted,
2599                        ));
2600                    }
2601                }
2602                ResourceAction::SendCancelInitiator(data) => {
2603                    let encrypted = self
2604                        .links
2605                        .get(link_id)
2606                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2607                    if let Some(encrypted) = encrypted {
2608                        result.extend(self.build_link_packet(
2609                            link_id,
2610                            constants::CONTEXT_RESOURCE_ICL,
2611                            &encrypted,
2612                        ));
2613                    }
2614                }
2615                ResourceAction::SendCancelReceiver(data) => {
2616                    let encrypted = self
2617                        .links
2618                        .get(link_id)
2619                        .and_then(|link| link.engine.encrypt(&data, rng).ok());
2620                    if let Some(encrypted) = encrypted {
2621                        result.extend(self.build_link_packet(
2622                            link_id,
2623                            constants::CONTEXT_RESOURCE_RCL,
2624                            &encrypted,
2625                        ));
2626                    }
2627                }
2628                ResourceAction::DataReceived { data, metadata } => {
2629                    result.push(LinkManagerAction::ResourceReceived {
2630                        link_id: *link_id,
2631                        data,
2632                        metadata,
2633                    });
2634                }
2635                ResourceAction::Completed => {
2636                    result.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2637                }
2638                ResourceAction::Failed(e) => {
2639                    result.push(LinkManagerAction::ResourceFailed {
2640                        link_id: *link_id,
2641                        error: format!("{}", e),
2642                    });
2643                }
2644                ResourceAction::TeardownLink => {
2645                    let teardown_actions = match self.links.get_mut(link_id) {
2646                        Some(link) => link.engine.handle_teardown(),
2647                        None => Vec::new(),
2648                    };
2649                    result.extend(self.process_link_actions(link_id, &teardown_actions));
2650                }
2651                ResourceAction::ProgressUpdate { received, total } => {
2652                    result.push(LinkManagerAction::ResourceProgress {
2653                        link_id: *link_id,
2654                        received,
2655                        total,
2656                    });
2657                }
2658            }
2659        }
2660        result
2661    }
2662
2663    /// Build a link DATA packet with a given context and data.
2664    fn build_link_packet(
2665        &self,
2666        link_id: &LinkId,
2667        context: u8,
2668        data: &[u8],
2669    ) -> Vec<LinkManagerAction> {
2670        let flags = PacketFlags {
2671            header_type: constants::HEADER_1,
2672            context_flag: constants::FLAG_UNSET,
2673            transport_type: constants::TRANSPORT_BROADCAST,
2674            destination_type: constants::DESTINATION_LINK,
2675            packet_type: constants::PACKET_TYPE_DATA,
2676        };
2677        let mut actions = Vec::new();
2678        let max_mtu = self
2679            .links
2680            .get(link_id)
2681            .map(|l| l.engine.mtu() as usize)
2682            .unwrap_or(constants::MTU);
2683        if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash_with_max_mtu(
2684            flags, 0, link_id, None, context, data, max_mtu,
2685        ) {
2686            actions.push(LinkManagerAction::SendPacket {
2687                raw,
2688                dest_type: constants::DESTINATION_LINK,
2689                attached_interface: None,
2690            });
2691        }
2692        actions
2693    }
2694
2695    /// Start sending a resource on a link.
2696    pub fn send_resource(
2697        &mut self,
2698        link_id: &LinkId,
2699        data: &[u8],
2700        metadata: Option<&[u8]>,
2701        rng: &mut dyn Rng,
2702    ) -> Vec<LinkManagerAction> {
2703        self.send_resource_with_auto_compress(link_id, data, metadata, true, rng)
2704    }
2705
2706    /// Start sending a resource on a link, controlling automatic compression.
2707    pub fn send_resource_with_auto_compress(
2708        &mut self,
2709        link_id: &LinkId,
2710        data: &[u8],
2711        metadata: Option<&[u8]>,
2712        auto_compress: bool,
2713        rng: &mut dyn Rng,
2714    ) -> Vec<LinkManagerAction> {
2715        let link = match self.links.get_mut(link_id) {
2716            Some(l) => l,
2717            None => return Vec::new(),
2718        };
2719
2720        if link.engine.state() != LinkState::Active {
2721            return Vec::new();
2722        }
2723
2724        let now = time::now();
2725
2726        let senders = match Self::build_resource_senders(
2727            link,
2728            data,
2729            metadata,
2730            auto_compress,
2731            false, // is_response
2732            None,  // request_id
2733            rng,
2734            now,
2735        ) {
2736            Ok(s) => s,
2737            Err(e) => {
2738                log::debug!("Failed to create ResourceSender: {}", e);
2739                return Vec::new();
2740            }
2741        };
2742
2743        let adv_actions = Self::start_resource_senders(link, senders, now);
2744
2745        let _ = link;
2746        self.process_resource_actions(link_id, adv_actions, rng)
2747    }
2748
2749    /// Set the resource acceptance strategy for a link.
2750    pub fn set_resource_strategy(&mut self, link_id: &LinkId, strategy: ResourceStrategy) {
2751        if let Some(link) = self.links.get_mut(link_id) {
2752            link.resource_strategy = strategy;
2753        }
2754    }
2755
2756    /// Flush the channel TX ring for a link, clearing outstanding messages.
2757    /// Called after holepunch completion where signaling messages are fire-and-forget.
2758    pub fn flush_channel_tx(&mut self, link_id: &LinkId) {
2759        if let Some(link) = self.links.get_mut(link_id) {
2760            if let Some(ref mut channel) = link.channel {
2761                channel.flush_tx();
2762            }
2763        }
2764    }
2765
2766    /// Send a channel message on a link.
2767    pub fn send_channel_message(
2768        &mut self,
2769        link_id: &LinkId,
2770        msgtype: u16,
2771        payload: &[u8],
2772        rng: &mut dyn Rng,
2773    ) -> Result<Vec<LinkManagerAction>, String> {
2774        let link = match self.links.get_mut(link_id) {
2775            Some(l) => l,
2776            None => return Err("unknown link".to_string()),
2777        };
2778
2779        let channel = match link.channel {
2780            Some(ref mut ch) => ch,
2781            None => return Err("link has no active channel".to_string()),
2782        };
2783
2784        let link_mdu = link.engine.mdu();
2785        let now = time::now();
2786        let chan_actions = match channel.send(msgtype, payload, now, link_mdu) {
2787            Ok(a) => {
2788                link.channel_send_ok += 1;
2789                a
2790            }
2791            Err(e) => {
2792                log::debug!("Channel send failed: {:?}", e);
2793                match e {
2794                    rns_core::channel::ChannelError::NotReady => link.channel_send_not_ready += 1,
2795                    rns_core::channel::ChannelError::MessageTooBig => {
2796                        link.channel_send_too_big += 1;
2797                    }
2798                    rns_core::channel::ChannelError::InvalidEnvelope => {
2799                        link.channel_send_other_error += 1;
2800                    }
2801                }
2802                return Err(e.to_string());
2803            }
2804        };
2805
2806        let _ = link;
2807        Ok(self.process_channel_actions(link_id, chan_actions, rng))
2808    }
2809
2810    /// Periodic tick: check keepalive, stale, timeouts for all links.
2811    pub fn tick(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2812        let now = time::now();
2813        let mut all_actions = Vec::new();
2814
2815        // Collect link_ids to avoid borrow issues
2816        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2817
2818        for link_id in &link_ids {
2819            let link = match self.links.get_mut(link_id) {
2820                Some(l) => l,
2821                None => continue,
2822            };
2823
2824            // Tick the engine
2825            let tick_actions = link.engine.tick(now);
2826            all_actions.extend(self.process_link_actions(link_id, &tick_actions));
2827
2828            // Check if keepalive is needed
2829            let link = match self.links.get_mut(link_id) {
2830                Some(l) => l,
2831                None => continue,
2832            };
2833            if link.engine.needs_keepalive(now) {
2834                // Send keepalive packet (empty data with CONTEXT_KEEPALIVE)
2835                let flags = PacketFlags {
2836                    header_type: constants::HEADER_1,
2837                    context_flag: constants::FLAG_UNSET,
2838                    transport_type: constants::TRANSPORT_BROADCAST,
2839                    destination_type: constants::DESTINATION_LINK,
2840                    packet_type: constants::PACKET_TYPE_DATA,
2841                };
2842                if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
2843                    flags,
2844                    0,
2845                    link_id,
2846                    None,
2847                    constants::CONTEXT_KEEPALIVE,
2848                    &[],
2849                ) {
2850                    all_actions.push(LinkManagerAction::SendPacket {
2851                        raw,
2852                        dest_type: constants::DESTINATION_LINK,
2853                        attached_interface: None,
2854                    });
2855                    link.engine.record_outbound(now, true);
2856                }
2857            }
2858
2859            if let Some(channel) = link.channel.as_mut() {
2860                let chan_actions = channel.tick(now);
2861                let _ = channel;
2862                let _ = link;
2863                all_actions.extend(self.process_channel_actions(link_id, chan_actions, rng));
2864            }
2865        }
2866
2867        // Tick resource senders and receivers
2868        for link_id in &link_ids {
2869            let link = match self.links.get_mut(link_id) {
2870                Some(l) => l,
2871                None => continue,
2872            };
2873
2874            // Tick outgoing resources (senders)
2875            let mut sender_actions = Vec::new();
2876            for sender in &mut link.outgoing_resources {
2877                sender_actions.extend(sender.tick(now));
2878            }
2879
2880            // Tick incoming resources (receivers)
2881            let mut receiver_actions = Vec::new();
2882            for receiver in &mut link.incoming_resources {
2883                let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2884                    link.engine.decrypt(ciphertext).map_err(|_| ())
2885                };
2886                receiver_actions.extend(receiver.tick(now, &decrypt_fn, &Bzip2Compressor));
2887            }
2888
2889            // Clean up completed/failed resources
2890            link.outgoing_resources
2891                .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2892            link.incoming_resources
2893                .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2894            let active_split_hashes: Vec<[u8; 32]> = link
2895                .outgoing_resources
2896                .iter()
2897                .filter(|s| s.flags.split)
2898                .map(|s| s.original_hash)
2899                .collect();
2900            link.outgoing_splits.retain(|original_hash, split| {
2901                split.completed_segments < split.total_segments
2902                    && active_split_hashes.contains(original_hash)
2903            });
2904
2905            let _ = link;
2906            all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2907            all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2908        }
2909
2910        // Clean up closed links
2911        let closed: Vec<LinkId> = self
2912            .links
2913            .iter()
2914            .filter(|(_, l)| l.engine.state() == LinkState::Closed)
2915            .map(|(id, _)| *id)
2916            .collect();
2917        for id in closed {
2918            self.links.remove(&id);
2919            all_actions.push(LinkManagerAction::DeregisterLinkDest { link_id: id });
2920        }
2921
2922        all_actions
2923    }
2924
2925    /// Check if a destination hash is a known link_id managed by this manager.
2926    pub fn is_link_destination(&self, dest_hash: &[u8; 16]) -> bool {
2927        self.links.contains_key(dest_hash) || self.link_destinations.contains_key(dest_hash)
2928    }
2929
2930    /// Get the state of a link.
2931    pub fn link_state(&self, link_id: &LinkId) -> Option<LinkState> {
2932        self.links.get(link_id).map(|l| l.engine.state())
2933    }
2934
2935    /// Get the RTT of a link.
2936    pub fn link_rtt(&self, link_id: &LinkId) -> Option<f64> {
2937        self.links.get(link_id).and_then(|l| l.engine.rtt())
2938    }
2939
2940    /// Update the RTT of a link (e.g., after path redirect to a direct connection).
2941    pub fn set_link_rtt(&mut self, link_id: &LinkId, rtt: f64) {
2942        if let Some(link) = self.links.get_mut(link_id) {
2943            link.engine.set_rtt(rtt);
2944        }
2945    }
2946
2947    /// Reset the inbound timer for a link (e.g., after path redirect).
2948    pub fn record_link_inbound(&mut self, link_id: &LinkId) {
2949        if let Some(link) = self.links.get_mut(link_id) {
2950            link.engine.record_inbound(time::now());
2951        }
2952    }
2953
2954    /// Update the MTU of a link (e.g., after path redirect to a different interface).
2955    pub fn set_link_mtu(&mut self, link_id: &LinkId, mtu: u32) {
2956        if let Some(link) = self.links.get_mut(link_id) {
2957            link.engine.set_mtu(mtu);
2958        }
2959    }
2960
2961    /// Get the number of active links.
2962    pub fn link_count(&self) -> usize {
2963        self.links.len()
2964    }
2965
2966    /// Get the number of active resource transfers across all links.
2967    pub fn resource_transfer_count(&self) -> usize {
2968        self.links
2969            .values()
2970            .map(|managed| {
2971                managed
2972                    .incoming_resources
2973                    .iter()
2974                    .filter(|resource| !resource.flags.split)
2975                    .count()
2976                    + managed.incoming_splits.len()
2977                    + managed
2978                        .outgoing_resources
2979                        .iter()
2980                        .filter(|resource| !resource.flags.split)
2981                        .count()
2982                    + managed.outgoing_splits.len()
2983            })
2984            .sum()
2985    }
2986
2987    /// Cancel all active resource transfers and return the generated actions.
2988    pub fn cancel_all_resources(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2989        let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2990        let mut all_actions = Vec::new();
2991
2992        for link_id in &link_ids {
2993            let link = match self.links.get_mut(link_id) {
2994                Some(l) => l,
2995                None => continue,
2996            };
2997
2998            let mut sender_actions = Vec::new();
2999            for sender in &mut link.outgoing_resources {
3000                sender_actions.extend(sender.cancel());
3001            }
3002
3003            let mut receiver_actions = Vec::new();
3004            for receiver in &mut link.incoming_resources {
3005                receiver_actions.extend(receiver.reject());
3006            }
3007
3008            link.outgoing_resources
3009                .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
3010            link.incoming_resources
3011                .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
3012            link.outgoing_splits.clear();
3013            link.incoming_splits.clear();
3014
3015            let _ = link;
3016            all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
3017            all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
3018        }
3019
3020        all_actions
3021    }
3022
3023    /// Get information about all active links.
3024    pub fn link_entries(&self) -> Vec<crate::event::LinkInfoEntry> {
3025        self.links
3026            .iter()
3027            .map(|(link_id, managed)| {
3028                let state = match managed.engine.state() {
3029                    LinkState::Pending => "pending",
3030                    LinkState::Handshake => "handshake",
3031                    LinkState::Active => "active",
3032                    LinkState::Stale => "stale",
3033                    LinkState::Closed => "closed",
3034                };
3035                crate::event::LinkInfoEntry {
3036                    link_id: *link_id,
3037                    state: state.to_string(),
3038                    is_initiator: managed.engine.is_initiator(),
3039                    dest_hash: managed.dest_hash,
3040                    remote_identity: managed.remote_identity.as_ref().map(|(h, _)| *h),
3041                    rtt: managed.engine.rtt(),
3042                    channel_window: managed.channel.as_ref().map(|c| c.window()),
3043                    channel_outstanding: managed.channel.as_ref().map(|c| c.outstanding()),
3044                    pending_channel_packets: managed.pending_channel_packets.len(),
3045                    channel_send_ok: managed.channel_send_ok,
3046                    channel_send_not_ready: managed.channel_send_not_ready,
3047                    channel_send_too_big: managed.channel_send_too_big,
3048                    channel_send_other_error: managed.channel_send_other_error,
3049                    channel_messages_received: managed.channel_messages_received,
3050                    channel_proofs_sent: managed.channel_proofs_sent,
3051                    channel_proofs_received: managed.channel_proofs_received,
3052                }
3053            })
3054            .collect()
3055    }
3056
3057    /// Get information about all active resource transfers.
3058    pub fn resource_entries(&self) -> Vec<crate::event::ResourceInfoEntry> {
3059        let mut entries = Vec::new();
3060        for (link_id, managed) in &self.links {
3061            let resource_sdu = Self::resource_sdu_for_link(managed);
3062            for split in managed.incoming_splits.values() {
3063                let (received, total) = Self::incoming_split_progress(split, resource_sdu);
3064                entries.push(crate::event::ResourceInfoEntry {
3065                    link_id: *link_id,
3066                    direction: "incoming".to_string(),
3067                    total_parts: total,
3068                    transferred_parts: received,
3069                    complete: received >= total && total > 0,
3070                });
3071            }
3072            for recv in &managed.incoming_resources {
3073                if recv.flags.split {
3074                    continue;
3075                }
3076                let (received, total) = recv.progress();
3077                entries.push(crate::event::ResourceInfoEntry {
3078                    link_id: *link_id,
3079                    direction: "incoming".to_string(),
3080                    total_parts: total,
3081                    transferred_parts: received,
3082                    complete: received >= total && total > 0,
3083                });
3084            }
3085            for split in managed.outgoing_splits.values() {
3086                let (sent, total) = Self::outgoing_split_progress(split, resource_sdu);
3087                entries.push(crate::event::ResourceInfoEntry {
3088                    link_id: *link_id,
3089                    direction: "outgoing".to_string(),
3090                    total_parts: total,
3091                    transferred_parts: sent,
3092                    complete: sent >= total && total > 0,
3093                });
3094            }
3095            for send in &managed.outgoing_resources {
3096                if send.flags.split {
3097                    continue;
3098                }
3099                let total = send.total_parts();
3100                let sent = send.sent_parts;
3101                entries.push(crate::event::ResourceInfoEntry {
3102                    link_id: *link_id,
3103                    direction: "outgoing".to_string(),
3104                    total_parts: total,
3105                    transferred_parts: sent,
3106                    complete: sent >= total && total > 0,
3107                });
3108            }
3109        }
3110        entries
3111    }
3112
3113    /// Convert LinkActions to LinkManagerActions.
3114    fn process_link_actions(
3115        &self,
3116        link_id: &LinkId,
3117        actions: &[LinkAction],
3118    ) -> Vec<LinkManagerAction> {
3119        let mut result = Vec::new();
3120        for action in actions {
3121            match action {
3122                LinkAction::StateChanged {
3123                    new_state, reason, ..
3124                } => match new_state {
3125                    LinkState::Closed => {
3126                        result.push(LinkManagerAction::LinkClosed {
3127                            link_id: *link_id,
3128                            reason: *reason,
3129                        });
3130                    }
3131                    _ => {}
3132                },
3133                LinkAction::LinkEstablished {
3134                    rtt, is_initiator, ..
3135                } => {
3136                    let dest_hash = self
3137                        .links
3138                        .get(link_id)
3139                        .map(|l| l.dest_hash)
3140                        .unwrap_or([0u8; 16]);
3141                    result.push(LinkManagerAction::LinkEstablished {
3142                        link_id: *link_id,
3143                        dest_hash,
3144                        rtt: *rtt,
3145                        is_initiator: *is_initiator,
3146                    });
3147                }
3148                LinkAction::RemoteIdentified {
3149                    identity_hash,
3150                    public_key,
3151                    ..
3152                } => {
3153                    result.push(LinkManagerAction::RemoteIdentified {
3154                        link_id: *link_id,
3155                        identity_hash: *identity_hash,
3156                        public_key: *public_key,
3157                    });
3158                }
3159                LinkAction::DataReceived { .. } => {
3160                    // Data delivery is handled at a higher level
3161                }
3162            }
3163        }
3164        result
3165    }
3166
3167    /// Convert ChannelActions to LinkManagerActions.
3168    fn process_channel_actions(
3169        &mut self,
3170        link_id: &LinkId,
3171        actions: Vec<rns_core::channel::ChannelAction>,
3172        rng: &mut dyn Rng,
3173    ) -> Vec<LinkManagerAction> {
3174        let mut result = Vec::new();
3175        for action in actions {
3176            match action {
3177                rns_core::channel::ChannelAction::SendOnLink { raw, sequence } => {
3178                    // Encrypt and send as CHANNEL context
3179                    let encrypted = match self.links.get(link_id) {
3180                        Some(link) => match link.engine.encrypt(&raw, rng) {
3181                            Ok(encrypted) => encrypted,
3182                            Err(_) => continue,
3183                        },
3184                        None => continue,
3185                    };
3186                    let flags = PacketFlags {
3187                        header_type: constants::HEADER_1,
3188                        context_flag: constants::FLAG_UNSET,
3189                        transport_type: constants::TRANSPORT_BROADCAST,
3190                        destination_type: constants::DESTINATION_LINK,
3191                        packet_type: constants::PACKET_TYPE_DATA,
3192                    };
3193                    if let Ok((raw_bytes, packet_hash)) = RawPacket::pack_raw_with_hash(
3194                        flags,
3195                        0,
3196                        link_id,
3197                        None,
3198                        constants::CONTEXT_CHANNEL,
3199                        &encrypted,
3200                    ) {
3201                        if let Some(link_mut) = self.links.get_mut(link_id) {
3202                            link_mut
3203                                .pending_channel_packets
3204                                .insert(packet_hash, sequence);
3205                        }
3206                        result.push(LinkManagerAction::SendPacket {
3207                            raw: raw_bytes,
3208                            dest_type: constants::DESTINATION_LINK,
3209                            attached_interface: None,
3210                        });
3211                    }
3212                }
3213                rns_core::channel::ChannelAction::MessageReceived {
3214                    msgtype, payload, ..
3215                } => {
3216                    result.push(LinkManagerAction::ChannelMessageReceived {
3217                        link_id: *link_id,
3218                        msgtype,
3219                        payload,
3220                    });
3221                }
3222                rns_core::channel::ChannelAction::TeardownLink => {
3223                    result.push(LinkManagerAction::LinkClosed {
3224                        link_id: *link_id,
3225                        reason: Some(TeardownReason::Timeout),
3226                    });
3227                }
3228            }
3229        }
3230        result
3231    }
3232}
3233
3234impl Default for LinkManager {
3235    fn default() -> Self {
3236        Self::new()
3237    }
3238}
3239
3240/// Compute a path hash from a path string.
3241/// Uses truncated SHA-256 (first 16 bytes).
3242fn compute_path_hash(path: &str) -> [u8; 16] {
3243    let full = rns_core::hash::full_hash(path.as_bytes());
3244    let mut result = [0u8; 16];
3245    result.copy_from_slice(&full[..16]);
3246    result
3247}
3248
3249#[cfg(test)]
3250mod tests {
3251    use super::*;
3252    use rns_crypto::identity::Identity;
3253    use rns_crypto::{FixedRng, OsRng};
3254
3255    fn make_rng(seed: u8) -> FixedRng {
3256        FixedRng::new(&[seed; 128])
3257    }
3258
3259    fn make_dest_keys(rng: &mut dyn Rng) -> (Ed25519PrivateKey, [u8; 32]) {
3260        let sig_prv = Ed25519PrivateKey::generate(rng);
3261        let sig_pub_bytes = sig_prv.public_key().public_bytes();
3262        (sig_prv, sig_pub_bytes)
3263    }
3264
3265    #[test]
3266    fn test_register_link_destination() {
3267        let mut mgr = LinkManager::new();
3268        let mut rng = make_rng(0x01);
3269        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3270        let dest_hash = [0xDD; 16];
3271
3272        mgr.register_link_destination(
3273            dest_hash,
3274            sig_prv,
3275            sig_pub_bytes,
3276            ResourceStrategy::AcceptNone,
3277        );
3278        assert!(mgr.is_link_destination(&dest_hash));
3279
3280        mgr.deregister_link_destination(&dest_hash);
3281        assert!(!mgr.is_link_destination(&dest_hash));
3282    }
3283
3284    #[test]
3285    fn test_create_link() {
3286        let mut mgr = LinkManager::new();
3287        let mut rng = OsRng;
3288        let dest_hash = [0xDD; 16];
3289
3290        let sig_pub_bytes = [0xAA; 32]; // dummy sig pub for test
3291        let (link_id, actions) = mgr.create_link(
3292            &dest_hash,
3293            &sig_pub_bytes,
3294            1,
3295            constants::MTU as u32,
3296            &mut rng,
3297        );
3298        assert_ne!(link_id, [0u8; 16]);
3299        // Should have RegisterLinkDest + SendPacket
3300        assert_eq!(actions.len(), 2);
3301        assert!(matches!(
3302            actions[0],
3303            LinkManagerAction::RegisterLinkDest { .. }
3304        ));
3305        assert!(matches!(actions[1], LinkManagerAction::SendPacket { .. }));
3306
3307        // Link should be in Pending state
3308        assert_eq!(mgr.link_state(&link_id), Some(LinkState::Pending));
3309    }
3310
3311    #[test]
3312    fn test_full_handshake_via_manager() {
3313        let mut rng = OsRng;
3314        let dest_hash = [0xDD; 16];
3315
3316        // Setup responder
3317        let mut responder_mgr = LinkManager::new();
3318        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3319        responder_mgr.register_link_destination(
3320            dest_hash,
3321            sig_prv,
3322            sig_pub_bytes,
3323            ResourceStrategy::AcceptNone,
3324        );
3325
3326        // Setup initiator
3327        let mut initiator_mgr = LinkManager::new();
3328
3329        // Step 1: Initiator creates link (needs dest signing pub key for LRPROOF verification)
3330        let (link_id, init_actions) = initiator_mgr.create_link(
3331            &dest_hash,
3332            &sig_pub_bytes,
3333            1,
3334            constants::MTU as u32,
3335            &mut rng,
3336        );
3337        assert_eq!(init_actions.len(), 2);
3338
3339        // Extract the LINKREQUEST packet raw bytes
3340        let linkrequest_raw = match &init_actions[1] {
3341            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3342            _ => panic!("Expected SendPacket"),
3343        };
3344
3345        // Parse to get packet_hash and dest_hash
3346        let lr_packet = RawPacket::unpack(&linkrequest_raw).unwrap();
3347
3348        // Step 2: Responder handles LINKREQUEST
3349        let resp_actions = responder_mgr.handle_local_delivery(
3350            lr_packet.destination_hash,
3351            &linkrequest_raw,
3352            lr_packet.packet_hash,
3353            rns_core::transport::types::InterfaceId(0),
3354            &mut rng,
3355        );
3356        // Should have RegisterLinkDest + SendPacket(LRPROOF)
3357        assert!(resp_actions.len() >= 2);
3358        assert!(matches!(
3359            resp_actions[0],
3360            LinkManagerAction::RegisterLinkDest { .. }
3361        ));
3362
3363        // Extract LRPROOF packet
3364        let lrproof_raw = match &resp_actions[1] {
3365            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3366            _ => panic!("Expected SendPacket for LRPROOF"),
3367        };
3368
3369        // Step 3: Initiator handles LRPROOF
3370        let lrproof_packet = RawPacket::unpack(&lrproof_raw).unwrap();
3371        let init_actions2 = initiator_mgr.handle_local_delivery(
3372            lrproof_packet.destination_hash,
3373            &lrproof_raw,
3374            lrproof_packet.packet_hash,
3375            rns_core::transport::types::InterfaceId(0),
3376            &mut rng,
3377        );
3378
3379        // Should have LinkEstablished + SendPacket(LRRTT)
3380        let has_established = init_actions2
3381            .iter()
3382            .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
3383        assert!(has_established, "Initiator should emit LinkEstablished");
3384
3385        // Extract LRRTT
3386        let lrrtt_raw = init_actions2
3387            .iter()
3388            .find_map(|a| match a {
3389                LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3390                _ => None,
3391            })
3392            .expect("Should have LRRTT SendPacket");
3393
3394        // Step 4: Responder handles LRRTT
3395        let lrrtt_packet = RawPacket::unpack(&lrrtt_raw).unwrap();
3396        let resp_link_id = lrrtt_packet.destination_hash;
3397        let resp_actions2 = responder_mgr.handle_local_delivery(
3398            resp_link_id,
3399            &lrrtt_raw,
3400            lrrtt_packet.packet_hash,
3401            rns_core::transport::types::InterfaceId(0),
3402            &mut rng,
3403        );
3404
3405        let has_established = resp_actions2
3406            .iter()
3407            .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
3408        assert!(has_established, "Responder should emit LinkEstablished");
3409
3410        // Both sides should be Active
3411        assert_eq!(initiator_mgr.link_state(&link_id), Some(LinkState::Active));
3412        assert_eq!(responder_mgr.link_state(&link_id), Some(LinkState::Active));
3413
3414        // Both should have RTT
3415        assert!(initiator_mgr.link_rtt(&link_id).is_some());
3416        assert!(responder_mgr.link_rtt(&link_id).is_some());
3417    }
3418
3419    #[test]
3420    fn test_encrypted_data_exchange() {
3421        let mut rng = OsRng;
3422        let dest_hash = [0xDD; 16];
3423        let mut resp_mgr = LinkManager::new();
3424        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3425        resp_mgr.register_link_destination(
3426            dest_hash,
3427            sig_prv,
3428            sig_pub_bytes,
3429            ResourceStrategy::AcceptNone,
3430        );
3431        let mut init_mgr = LinkManager::new();
3432
3433        // Handshake
3434        let (link_id, init_actions) = init_mgr.create_link(
3435            &dest_hash,
3436            &sig_pub_bytes,
3437            1,
3438            constants::MTU as u32,
3439            &mut rng,
3440        );
3441        let lr_raw = extract_send_packet(&init_actions);
3442        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3443        let resp_actions = resp_mgr.handle_local_delivery(
3444            lr_pkt.destination_hash,
3445            &lr_raw,
3446            lr_pkt.packet_hash,
3447            rns_core::transport::types::InterfaceId(0),
3448            &mut rng,
3449        );
3450        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3451        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3452        let init_actions2 = init_mgr.handle_local_delivery(
3453            lrproof_pkt.destination_hash,
3454            &lrproof_raw,
3455            lrproof_pkt.packet_hash,
3456            rns_core::transport::types::InterfaceId(0),
3457            &mut rng,
3458        );
3459        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3460        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3461        resp_mgr.handle_local_delivery(
3462            lrrtt_pkt.destination_hash,
3463            &lrrtt_raw,
3464            lrrtt_pkt.packet_hash,
3465            rns_core::transport::types::InterfaceId(0),
3466            &mut rng,
3467        );
3468
3469        // Send data from initiator to responder
3470        let actions =
3471            init_mgr.send_on_link(&link_id, b"hello link!", constants::CONTEXT_NONE, &mut rng);
3472        assert_eq!(actions.len(), 1);
3473        assert!(matches!(actions[0], LinkManagerAction::SendPacket { .. }));
3474    }
3475
3476    #[test]
3477    fn test_request_response() {
3478        let mut rng = OsRng;
3479        let dest_hash = [0xDD; 16];
3480        let mut resp_mgr = LinkManager::new();
3481        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3482        resp_mgr.register_link_destination(
3483            dest_hash,
3484            sig_prv,
3485            sig_pub_bytes,
3486            ResourceStrategy::AcceptNone,
3487        );
3488
3489        // Register a request handler
3490        resp_mgr.register_request_handler("/status", None, |_link_id, _path, _data, _remote| {
3491            Some(b"OK".to_vec())
3492        });
3493
3494        let mut init_mgr = LinkManager::new();
3495
3496        // Complete handshake
3497        let (link_id, init_actions) = init_mgr.create_link(
3498            &dest_hash,
3499            &sig_pub_bytes,
3500            1,
3501            constants::MTU as u32,
3502            &mut rng,
3503        );
3504        let lr_raw = extract_send_packet(&init_actions);
3505        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3506        let resp_actions = resp_mgr.handle_local_delivery(
3507            lr_pkt.destination_hash,
3508            &lr_raw,
3509            lr_pkt.packet_hash,
3510            rns_core::transport::types::InterfaceId(0),
3511            &mut rng,
3512        );
3513        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3514        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3515        let init_actions2 = init_mgr.handle_local_delivery(
3516            lrproof_pkt.destination_hash,
3517            &lrproof_raw,
3518            lrproof_pkt.packet_hash,
3519            rns_core::transport::types::InterfaceId(0),
3520            &mut rng,
3521        );
3522        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3523        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3524        resp_mgr.handle_local_delivery(
3525            lrrtt_pkt.destination_hash,
3526            &lrrtt_raw,
3527            lrrtt_pkt.packet_hash,
3528            rns_core::transport::types::InterfaceId(0),
3529            &mut rng,
3530        );
3531
3532        // Send request from initiator
3533        let req_actions = init_mgr.send_request(&link_id, "/status", b"query", &mut rng);
3534        assert_eq!(req_actions.len(), 1);
3535
3536        // Deliver request to responder
3537        let req_raw = extract_send_packet_from(&req_actions);
3538        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3539        let resp_actions = resp_mgr.handle_local_delivery(
3540            req_pkt.destination_hash,
3541            &req_raw,
3542            req_pkt.packet_hash,
3543            rns_core::transport::types::InterfaceId(0),
3544            &mut rng,
3545        );
3546
3547        // Should have a response SendPacket
3548        let has_response = resp_actions
3549            .iter()
3550            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3551        assert!(has_response, "Handler should produce a response packet");
3552    }
3553
3554    #[test]
3555    fn test_send_request_wraps_invalid_msgpack_data_as_bin() {
3556        use std::sync::{Arc, Mutex};
3557
3558        let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
3559        let mut rng = OsRng;
3560
3561        let invalid = vec![0xC1];
3562        let expected = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(invalid.clone()));
3563        let captured = Arc::new(Mutex::new(None::<Vec<u8>>));
3564        let captured_for_handler = Arc::clone(&captured);
3565
3566        resp_mgr.register_request_handler("/bin", None, move |_link_id, _path, data, _remote| {
3567            *captured_for_handler.lock().unwrap() = Some(data.to_vec());
3568            Some(rns_core::msgpack::pack(&rns_core::msgpack::Value::Bool(
3569                true,
3570            )))
3571        });
3572
3573        let req_actions = init_mgr.send_request(&link_id, "/bin", &invalid, &mut rng);
3574        let req_raw = extract_send_packet_from(&req_actions);
3575        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3576        let resp_actions = resp_mgr.handle_local_delivery(
3577            req_pkt.destination_hash,
3578            &req_raw,
3579            req_pkt.packet_hash,
3580            rns_core::transport::types::InterfaceId(0),
3581            &mut rng,
3582        );
3583
3584        assert!(
3585            resp_actions
3586                .iter()
3587                .any(|a| matches!(a, LinkManagerAction::SendPacket { .. })),
3588            "handler should still produce a response"
3589        );
3590        assert_eq!(*captured.lock().unwrap(), Some(expected));
3591    }
3592
3593    #[test]
3594    fn test_invalid_response_bytes_are_returned_as_msgpack_bin() {
3595        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3596        let mut rng = OsRng;
3597        let invalid_response = vec![0xC1];
3598        let expected =
3599            rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(invalid_response.clone()));
3600
3601        resp_mgr.register_request_handler("/invalid-response", None, {
3602            let invalid_response = invalid_response.clone();
3603            move |_link_id, _path, _data, _remote| Some(invalid_response.clone())
3604        });
3605
3606        let req_actions = init_mgr.send_request(&link_id, "/invalid-response", b"\xc0", &mut rng);
3607        let req_raw = extract_send_packet_from(&req_actions);
3608        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3609        let resp_actions = resp_mgr.handle_local_delivery(
3610            req_pkt.destination_hash,
3611            &req_raw,
3612            req_pkt.packet_hash,
3613            rns_core::transport::types::InterfaceId(0),
3614            &mut rng,
3615        );
3616
3617        let resp_raw = extract_any_send_packet(&resp_actions);
3618        let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
3619        let init_actions = init_mgr.handle_local_delivery(
3620            resp_pkt.destination_hash,
3621            &resp_raw,
3622            resp_pkt.packet_hash,
3623            rns_core::transport::types::InterfaceId(0),
3624            &mut rng,
3625        );
3626
3627        let response_data = init_actions
3628            .iter()
3629            .find_map(|action| match action {
3630                LinkManagerAction::ResponseReceived { data, .. } => Some(data.clone()),
3631                _ => None,
3632            })
3633            .expect("initiator should receive a response");
3634        assert_eq!(response_data, expected);
3635    }
3636
3637    #[test]
3638    fn test_request_acl_deny_unidentified() {
3639        let mut rng = OsRng;
3640        let dest_hash = [0xDD; 16];
3641        let mut resp_mgr = LinkManager::new();
3642        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3643        resp_mgr.register_link_destination(
3644            dest_hash,
3645            sig_prv,
3646            sig_pub_bytes,
3647            ResourceStrategy::AcceptNone,
3648        );
3649
3650        // Register handler with ACL (only allow specific identity)
3651        resp_mgr.register_request_handler(
3652            "/restricted",
3653            Some(vec![[0xAA; 16]]),
3654            |_link_id, _path, _data, _remote| Some(b"secret".to_vec()),
3655        );
3656
3657        let mut init_mgr = LinkManager::new();
3658
3659        // Complete handshake (without identification)
3660        let (link_id, init_actions) = init_mgr.create_link(
3661            &dest_hash,
3662            &sig_pub_bytes,
3663            1,
3664            constants::MTU as u32,
3665            &mut rng,
3666        );
3667        let lr_raw = extract_send_packet(&init_actions);
3668        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3669        let resp_actions = resp_mgr.handle_local_delivery(
3670            lr_pkt.destination_hash,
3671            &lr_raw,
3672            lr_pkt.packet_hash,
3673            rns_core::transport::types::InterfaceId(0),
3674            &mut rng,
3675        );
3676        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3677        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3678        let init_actions2 = init_mgr.handle_local_delivery(
3679            lrproof_pkt.destination_hash,
3680            &lrproof_raw,
3681            lrproof_pkt.packet_hash,
3682            rns_core::transport::types::InterfaceId(0),
3683            &mut rng,
3684        );
3685        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3686        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3687        resp_mgr.handle_local_delivery(
3688            lrrtt_pkt.destination_hash,
3689            &lrrtt_raw,
3690            lrrtt_pkt.packet_hash,
3691            rns_core::transport::types::InterfaceId(0),
3692            &mut rng,
3693        );
3694
3695        // Send request without identifying first
3696        let req_actions = init_mgr.send_request(&link_id, "/restricted", b"query", &mut rng);
3697        let req_raw = extract_send_packet_from(&req_actions);
3698        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3699        let resp_actions = resp_mgr.handle_local_delivery(
3700            req_pkt.destination_hash,
3701            &req_raw,
3702            req_pkt.packet_hash,
3703            rns_core::transport::types::InterfaceId(0),
3704            &mut rng,
3705        );
3706
3707        // Should be denied — no response packet
3708        let has_response = resp_actions
3709            .iter()
3710            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3711        assert!(!has_response, "Unidentified peer should be denied");
3712    }
3713
3714    #[test]
3715    fn test_teardown_link() {
3716        let mut rng = OsRng;
3717        let dest_hash = [0xDD; 16];
3718        let mut mgr = LinkManager::new();
3719
3720        let dummy_sig = [0xAA; 32];
3721        let (link_id, _) =
3722            mgr.create_link(&dest_hash, &dummy_sig, 1, constants::MTU as u32, &mut rng);
3723        assert_eq!(mgr.link_count(), 1);
3724
3725        let actions = mgr.teardown_link(&link_id);
3726        let has_close = actions
3727            .iter()
3728            .any(|a| matches!(a, LinkManagerAction::LinkClosed { .. }));
3729        assert!(has_close);
3730
3731        // After tick, closed links should be cleaned up
3732        let tick_actions = mgr.tick(&mut rng);
3733        let has_deregister = tick_actions
3734            .iter()
3735            .any(|a| matches!(a, LinkManagerAction::DeregisterLinkDest { .. }));
3736        assert!(has_deregister);
3737        assert_eq!(mgr.link_count(), 0);
3738    }
3739
3740    #[test]
3741    fn test_identify_on_link() {
3742        let mut rng = OsRng;
3743        let dest_hash = [0xDD; 16];
3744        let mut resp_mgr = LinkManager::new();
3745        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3746        resp_mgr.register_link_destination(
3747            dest_hash,
3748            sig_prv,
3749            sig_pub_bytes,
3750            ResourceStrategy::AcceptNone,
3751        );
3752        let mut init_mgr = LinkManager::new();
3753
3754        // Complete handshake
3755        let (link_id, init_actions) = init_mgr.create_link(
3756            &dest_hash,
3757            &sig_pub_bytes,
3758            1,
3759            constants::MTU as u32,
3760            &mut rng,
3761        );
3762        let lr_raw = extract_send_packet(&init_actions);
3763        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3764        let resp_actions = resp_mgr.handle_local_delivery(
3765            lr_pkt.destination_hash,
3766            &lr_raw,
3767            lr_pkt.packet_hash,
3768            rns_core::transport::types::InterfaceId(0),
3769            &mut rng,
3770        );
3771        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3772        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3773        let init_actions2 = init_mgr.handle_local_delivery(
3774            lrproof_pkt.destination_hash,
3775            &lrproof_raw,
3776            lrproof_pkt.packet_hash,
3777            rns_core::transport::types::InterfaceId(0),
3778            &mut rng,
3779        );
3780        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3781        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3782        resp_mgr.handle_local_delivery(
3783            lrrtt_pkt.destination_hash,
3784            &lrrtt_raw,
3785            lrrtt_pkt.packet_hash,
3786            rns_core::transport::types::InterfaceId(0),
3787            &mut rng,
3788        );
3789
3790        // Identify initiator to responder
3791        let identity = Identity::new(&mut rng);
3792        let id_actions = init_mgr.identify(&link_id, &identity, &mut rng);
3793        assert_eq!(id_actions.len(), 1);
3794
3795        // Deliver identify to responder
3796        let id_raw = extract_send_packet_from(&id_actions);
3797        let id_pkt = RawPacket::unpack(&id_raw).unwrap();
3798        let resp_actions = resp_mgr.handle_local_delivery(
3799            id_pkt.destination_hash,
3800            &id_raw,
3801            id_pkt.packet_hash,
3802            rns_core::transport::types::InterfaceId(0),
3803            &mut rng,
3804        );
3805
3806        let has_identified = resp_actions
3807            .iter()
3808            .any(|a| matches!(a, LinkManagerAction::RemoteIdentified { .. }));
3809        assert!(has_identified, "Responder should emit RemoteIdentified");
3810    }
3811
3812    #[test]
3813    fn test_path_hash_computation() {
3814        let h1 = compute_path_hash("/status");
3815        let h2 = compute_path_hash("/path");
3816        assert_ne!(h1, h2);
3817
3818        // Deterministic
3819        assert_eq!(h1, compute_path_hash("/status"));
3820    }
3821
3822    #[test]
3823    fn test_link_count() {
3824        let mut mgr = LinkManager::new();
3825        let mut rng = OsRng;
3826
3827        assert_eq!(mgr.link_count(), 0);
3828
3829        let dummy_sig = [0xAA; 32];
3830        mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3831        assert_eq!(mgr.link_count(), 1);
3832
3833        mgr.create_link(&[0x22; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3834        assert_eq!(mgr.link_count(), 2);
3835    }
3836
3837    // --- Test helpers ---
3838
3839    fn extract_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3840        extract_send_packet_at(actions, actions.len() - 1)
3841    }
3842
3843    fn extract_send_packet_at(actions: &[LinkManagerAction], idx: usize) -> Vec<u8> {
3844        match &actions[idx] {
3845            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3846            other => panic!("Expected SendPacket at index {}, got {:?}", idx, other),
3847        }
3848    }
3849
3850    fn extract_any_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3851        actions
3852            .iter()
3853            .find_map(|a| match a {
3854                LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3855                _ => None,
3856            })
3857            .expect("Expected at least one SendPacket action")
3858    }
3859
    /// Alias for `extract_any_send_packet`: returns the raw bytes of the first
    /// `SendPacket` in `actions`.
    fn extract_send_packet_from(actions: &[LinkManagerAction]) -> Vec<u8> {
        extract_any_send_packet(actions)
    }
3863
3864    /// Set up two linked managers with an active link.
3865    /// Returns (initiator_mgr, responder_mgr, link_id).
3866    fn setup_active_link() -> (LinkManager, LinkManager, LinkId) {
3867        let mut rng = OsRng;
3868        let dest_hash = [0xDD; 16];
3869        let mut resp_mgr = LinkManager::new();
3870        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3871        resp_mgr.register_link_destination(
3872            dest_hash,
3873            sig_prv,
3874            sig_pub_bytes,
3875            ResourceStrategy::AcceptNone,
3876        );
3877        let mut init_mgr = LinkManager::new();
3878
3879        let (link_id, init_actions) = init_mgr.create_link(
3880            &dest_hash,
3881            &sig_pub_bytes,
3882            1,
3883            constants::MTU as u32,
3884            &mut rng,
3885        );
3886        let lr_raw = extract_send_packet(&init_actions);
3887        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3888        let resp_actions = resp_mgr.handle_local_delivery(
3889            lr_pkt.destination_hash,
3890            &lr_raw,
3891            lr_pkt.packet_hash,
3892            rns_core::transport::types::InterfaceId(0),
3893            &mut rng,
3894        );
3895        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3896        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3897        let init_actions2 = init_mgr.handle_local_delivery(
3898            lrproof_pkt.destination_hash,
3899            &lrproof_raw,
3900            lrproof_pkt.packet_hash,
3901            rns_core::transport::types::InterfaceId(0),
3902            &mut rng,
3903        );
3904        let lrrtt_raw = extract_any_send_packet(&init_actions2);
3905        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3906        resp_mgr.handle_local_delivery(
3907            lrrtt_pkt.destination_hash,
3908            &lrrtt_raw,
3909            lrrtt_pkt.packet_hash,
3910            rns_core::transport::types::InterfaceId(0),
3911            &mut rng,
3912        );
3913
3914        assert_eq!(init_mgr.link_state(&link_id), Some(LinkState::Active));
3915        assert_eq!(resp_mgr.link_state(&link_id), Some(LinkState::Active));
3916
3917        (init_mgr, resp_mgr, link_id)
3918    }
3919
3920    // ====================================================================
3921    // Phase 8a: Resource wiring tests
3922    // ====================================================================
3923
3924    #[test]
3925    fn test_resource_strategy_default() {
3926        let mut mgr = LinkManager::new();
3927        let mut rng = OsRng;
3928        let dummy_sig = [0xAA; 32];
3929        let (link_id, _) =
3930            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3931
3932        // Default strategy is AcceptNone
3933        let link = mgr.links.get(&link_id).unwrap();
3934        assert_eq!(link.resource_strategy, ResourceStrategy::AcceptNone);
3935    }
3936
3937    #[test]
3938    fn test_set_resource_strategy() {
3939        let mut mgr = LinkManager::new();
3940        let mut rng = OsRng;
3941        let dummy_sig = [0xAA; 32];
3942        let (link_id, _) =
3943            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3944
3945        mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3946        assert_eq!(
3947            mgr.links.get(&link_id).unwrap().resource_strategy,
3948            ResourceStrategy::AcceptAll
3949        );
3950
3951        mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3952        assert_eq!(
3953            mgr.links.get(&link_id).unwrap().resource_strategy,
3954            ResourceStrategy::AcceptApp
3955        );
3956    }
3957
3958    #[test]
3959    fn test_send_resource_on_active_link() {
3960        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3961        let mut rng = OsRng;
3962
3963        // Send resource data
3964        let data = vec![0xAB; 100]; // small enough for a single part
3965        let actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3966
3967        // Should produce at least a SendPacket (advertisement)
3968        let has_send = actions
3969            .iter()
3970            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3971        assert!(
3972            has_send,
3973            "send_resource should emit advertisement SendPacket"
3974        );
3975    }
3976
3977    fn first_resource_advertisement(
3978        mgr: &LinkManager,
3979        link_id: &LinkId,
3980        actions: &[LinkManagerAction],
3981    ) -> rns_core::resource::ResourceAdvertisement {
3982        let adv_raw = actions
3983            .iter()
3984            .find_map(|action| match action {
3985                LinkManagerAction::SendPacket { raw, .. } => {
3986                    let pkt = RawPacket::unpack(raw).ok()?;
3987                    (pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw)
3988                }
3989                _ => None,
3990            })
3991            .expect("sender should emit a resource advertisement");
3992        let adv_pkt = RawPacket::unpack(adv_raw).unwrap();
3993        let plaintext = mgr
3994            .links
3995            .get(link_id)
3996            .unwrap()
3997            .engine
3998            .decrypt(&adv_pkt.data)
3999            .unwrap();
4000        rns_core::resource::ResourceAdvertisement::unpack(&plaintext).unwrap()
4001    }
4002
4003    fn deterministic_bytes(len: usize) -> Vec<u8> {
4004        let mut state = 0x1234_5678u32;
4005        (0..len)
4006            .map(|_| {
4007                state = state.wrapping_mul(1664525).wrapping_add(1013904223);
4008                (state >> 16) as u8
4009            })
4010            .collect()
4011    }
4012
4013    fn drive_link_manager_packets(
4014        init_mgr: &mut LinkManager,
4015        resp_mgr: &mut LinkManager,
4016        initial_actions: Vec<LinkManagerAction>,
4017        initial_source: char,
4018        rng: &mut dyn Rng,
4019        max_rounds: usize,
4020    ) -> (
4021        Option<Vec<u8>>,
4022        bool,
4023        Vec<(char, usize, usize)>,
4024        Vec<(char, String)>,
4025        usize,
4026    ) {
4027        let mut pending: Vec<(char, LinkManagerAction)> = initial_actions
4028            .into_iter()
4029            .map(|a| (initial_source, a))
4030            .collect();
4031        let mut rounds = 0;
4032        let mut received_data = None;
4033        let mut sender_completed = false;
4034        let mut progress = Vec::new();
4035        let mut failures = Vec::new();
4036
4037        while !pending.is_empty() && rounds < max_rounds {
4038            rounds += 1;
4039            let mut next = Vec::new();
4040            for (source, action) in pending.drain(..) {
4041                let LinkManagerAction::SendPacket { raw, .. } = action else {
4042                    continue;
4043                };
4044                let pkt = RawPacket::unpack(&raw).unwrap();
4045                let target_actions = if source == 'i' {
4046                    resp_mgr.handle_local_delivery(
4047                        pkt.destination_hash,
4048                        &raw,
4049                        pkt.packet_hash,
4050                        rns_core::transport::types::InterfaceId(0),
4051                        rng,
4052                    )
4053                } else {
4054                    init_mgr.handle_local_delivery(
4055                        pkt.destination_hash,
4056                        &raw,
4057                        pkt.packet_hash,
4058                        rns_core::transport::types::InterfaceId(0),
4059                        rng,
4060                    )
4061                };
4062                let target_source = if source == 'i' { 'r' } else { 'i' };
4063                for action in &target_actions {
4064                    match action {
4065                        LinkManagerAction::ResourceReceived { data, .. } => {
4066                            received_data = Some(data.clone());
4067                        }
4068                        LinkManagerAction::ResourceCompleted { .. } => {
4069                            sender_completed = true;
4070                        }
4071                        LinkManagerAction::ResourceProgress {
4072                            received, total, ..
4073                        } => {
4074                            progress.push((target_source, *received, *total));
4075                        }
4076                        LinkManagerAction::ResourceFailed { error, .. } => {
4077                            failures.push((target_source, error.clone()));
4078                        }
4079                        _ => {}
4080                    }
4081                }
4082                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4083            }
4084            pending = next;
4085        }
4086
4087        (received_data, sender_completed, progress, failures, rounds)
4088    }
4089
4090    #[test]
4091    fn test_send_resource_auto_compress_option_controls_adv_flag() {
4092        let data = vec![0x41; 2048];
4093
4094        let (mut compressed_mgr, _resp_mgr, link_id) = setup_active_link();
4095        let mut rng = OsRng;
4096        let actions =
4097            compressed_mgr.send_resource_with_auto_compress(&link_id, &data, None, true, &mut rng);
4098        let adv = first_resource_advertisement(&compressed_mgr, &link_id, &actions);
4099        assert!(
4100            adv.flags.compressed,
4101            "compressible resource should compress"
4102        );
4103
4104        let (mut plain_mgr, _resp_mgr, link_id) = setup_active_link();
4105        let mut rng = OsRng;
4106        let actions =
4107            plain_mgr.send_resource_with_auto_compress(&link_id, &data, None, false, &mut rng);
4108        let adv = first_resource_advertisement(&plain_mgr, &link_id, &actions);
4109        assert!(
4110            !adv.flags.compressed,
4111            "auto_compress=false should keep resource uncompressed"
4112        );
4113    }
4114
4115    #[test]
4116    fn test_send_resource_on_inactive_link() {
4117        let mut mgr = LinkManager::new();
4118        let mut rng = OsRng;
4119        let dummy_sig = [0xAA; 32];
4120        let (link_id, _) =
4121            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
4122
4123        // Link is Pending, not Active
4124        let actions = mgr.send_resource(&link_id, b"data", None, &mut rng);
4125        assert!(actions.is_empty(), "Cannot send resource on inactive link");
4126    }
4127
4128    #[test]
4129    fn test_send_resource_without_session_key_uses_encrypt_fallback_path() {
4130        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4131        let mut rng = OsRng;
4132        init_mgr
4133            .links
4134            .get_mut(&link_id)
4135            .unwrap()
4136            .engine
4137            .clear_session_for_testing();
4138
4139        let actions = init_mgr.send_resource(&link_id, b"data", None, &mut rng);
4140
4141        assert!(
4142            actions.is_empty(),
4143            "without a session key, no advertisement should be emitted"
4144        );
4145        assert_eq!(
4146            init_mgr
4147                .links
4148                .get(&link_id)
4149                .map(|managed| managed.outgoing_resources.len()),
4150            Some(1)
4151        );
4152    }
4153
4154    #[test]
4155    fn test_resource_adv_rejected_by_accept_none() {
4156        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4157        let mut rng = OsRng;
4158
4159        // Responder uses default AcceptNone strategy
4160        // Send resource from initiator
4161        let data = vec![0xCD; 100];
4162        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4163
4164        // Deliver advertisement to responder
4165        for action in &adv_actions {
4166            if let LinkManagerAction::SendPacket { raw, .. } = action {
4167                let pkt = RawPacket::unpack(raw).unwrap();
4168                let resp_actions = resp_mgr.handle_local_delivery(
4169                    pkt.destination_hash,
4170                    raw,
4171                    pkt.packet_hash,
4172                    rns_core::transport::types::InterfaceId(0),
4173                    &mut rng,
4174                );
4175                // AcceptNone: should not produce ResourceReceived, may produce SendPacket (RCL)
4176                let has_resource_received = resp_actions
4177                    .iter()
4178                    .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
4179                assert!(
4180                    !has_resource_received,
4181                    "AcceptNone should not accept resource"
4182                );
4183            }
4184        }
4185    }
4186
4187    #[test]
4188    fn test_resource_adv_accepted_by_accept_all() {
4189        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4190        let mut rng = OsRng;
4191
4192        // Set responder to AcceptAll
4193        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4194
4195        // Send resource from initiator
4196        let data = vec![0xCD; 100];
4197        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4198
4199        // Deliver advertisement to responder
4200        for action in &adv_actions {
4201            if let LinkManagerAction::SendPacket { raw, .. } = action {
4202                let pkt = RawPacket::unpack(raw).unwrap();
4203                let resp_actions = resp_mgr.handle_local_delivery(
4204                    pkt.destination_hash,
4205                    raw,
4206                    pkt.packet_hash,
4207                    rns_core::transport::types::InterfaceId(0),
4208                    &mut rng,
4209                );
4210                // AcceptAll: should accept and produce a SendPacket (request for parts)
4211                let has_send = resp_actions
4212                    .iter()
4213                    .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
4214                assert!(has_send, "AcceptAll should accept and request parts");
4215            }
4216        }
4217    }
4218
4219    #[test]
4220    fn test_resource_accept_app_query() {
4221        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4222        let mut rng = OsRng;
4223
4224        // Set responder to AcceptApp
4225        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4226
4227        // Send resource from initiator
4228        let data = vec![0xCD; 100];
4229        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4230
4231        // Deliver advertisement to responder
4232        let mut got_query = false;
4233        for action in &adv_actions {
4234            if let LinkManagerAction::SendPacket { raw, .. } = action {
4235                let pkt = RawPacket::unpack(raw).unwrap();
4236                let resp_actions = resp_mgr.handle_local_delivery(
4237                    pkt.destination_hash,
4238                    raw,
4239                    pkt.packet_hash,
4240                    rns_core::transport::types::InterfaceId(0),
4241                    &mut rng,
4242                );
4243                for a in &resp_actions {
4244                    if matches!(a, LinkManagerAction::ResourceAcceptQuery { .. }) {
4245                        got_query = true;
4246                    }
4247                }
4248            }
4249        }
4250        assert!(got_query, "AcceptApp should emit ResourceAcceptQuery");
4251    }
4252
4253    #[test]
4254    fn test_resource_accept_app_accept() {
4255        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4256        let mut rng = OsRng;
4257
4258        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4259
4260        let data = vec![0xCD; 100];
4261        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4262
4263        for action in &adv_actions {
4264            if let LinkManagerAction::SendPacket { raw, .. } = action {
4265                let pkt = RawPacket::unpack(raw).unwrap();
4266                let resp_actions = resp_mgr.handle_local_delivery(
4267                    pkt.destination_hash,
4268                    raw,
4269                    pkt.packet_hash,
4270                    rns_core::transport::types::InterfaceId(0),
4271                    &mut rng,
4272                );
4273                for a in &resp_actions {
4274                    if let LinkManagerAction::ResourceAcceptQuery {
4275                        link_id: lid,
4276                        resource_hash,
4277                        ..
4278                    } = a
4279                    {
4280                        // Accept the resource
4281                        let accept_actions =
4282                            resp_mgr.accept_resource(lid, resource_hash, true, &mut rng);
4283                        // Should produce a SendPacket (request for parts)
4284                        let has_send = accept_actions
4285                            .iter()
4286                            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
4287                        assert!(
4288                            has_send,
4289                            "Accepting resource should produce request for parts"
4290                        );
4291                    }
4292                }
4293            }
4294        }
4295    }
4296
4297    #[test]
4298    fn test_resource_accept_app_reject() {
4299        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4300        let mut rng = OsRng;
4301
4302        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4303
4304        let data = vec![0xCD; 100];
4305        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4306
4307        for action in &adv_actions {
4308            if let LinkManagerAction::SendPacket { raw, .. } = action {
4309                let pkt = RawPacket::unpack(raw).unwrap();
4310                let resp_actions = resp_mgr.handle_local_delivery(
4311                    pkt.destination_hash,
4312                    raw,
4313                    pkt.packet_hash,
4314                    rns_core::transport::types::InterfaceId(0),
4315                    &mut rng,
4316                );
4317                for a in &resp_actions {
4318                    if let LinkManagerAction::ResourceAcceptQuery {
4319                        link_id: lid,
4320                        resource_hash,
4321                        ..
4322                    } = a
4323                    {
4324                        // Reject the resource
4325                        let reject_actions =
4326                            resp_mgr.accept_resource(lid, resource_hash, false, &mut rng);
4327                        // Rejecting should send a cancel and not request parts
4328                        // No ResourceReceived should appear
4329                        let has_resource_received = reject_actions
4330                            .iter()
4331                            .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
4332                        assert!(!has_resource_received);
4333                    }
4334                }
4335            }
4336        }
4337    }
4338
4339    #[test]
4340    fn test_resource_full_transfer() {
4341        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4342        let mut rng = OsRng;
4343
4344        // Set responder to AcceptAll
4345        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4346
4347        // Small data (fits in single SDU)
4348        let original_data = b"Hello, Resource Transfer!".to_vec();
4349        let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
4350
4351        // Drive the full transfer protocol between the two managers.
4352        // Tag each SendPacket with its source ('i' = initiator, 'r' = responder).
4353        let mut pending: Vec<(char, LinkManagerAction)> =
4354            adv_actions.into_iter().map(|a| ('i', a)).collect();
4355        let mut rounds = 0;
4356        let max_rounds = 50;
4357        let mut resource_received = false;
4358        let mut sender_completed = false;
4359
4360        while !pending.is_empty() && rounds < max_rounds {
4361            rounds += 1;
4362            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
4363
4364            for (source, action) in pending.drain(..) {
4365                if let LinkManagerAction::SendPacket { raw, .. } = action {
4366                    let pkt = RawPacket::unpack(&raw).unwrap();
4367
4368                    // Deliver only to the OTHER side
4369                    let target_actions = if source == 'i' {
4370                        resp_mgr.handle_local_delivery(
4371                            pkt.destination_hash,
4372                            &raw,
4373                            pkt.packet_hash,
4374                            rns_core::transport::types::InterfaceId(0),
4375                            &mut rng,
4376                        )
4377                    } else {
4378                        init_mgr.handle_local_delivery(
4379                            pkt.destination_hash,
4380                            &raw,
4381                            pkt.packet_hash,
4382                            rns_core::transport::types::InterfaceId(0),
4383                            &mut rng,
4384                        )
4385                    };
4386
4387                    let target_source = if source == 'i' { 'r' } else { 'i' };
4388                    for a in &target_actions {
4389                        match a {
4390                            LinkManagerAction::ResourceReceived { data, .. } => {
4391                                assert_eq!(*data, original_data);
4392                                resource_received = true;
4393                            }
4394                            LinkManagerAction::ResourceCompleted { .. } => {
4395                                sender_completed = true;
4396                            }
4397                            _ => {}
4398                        }
4399                    }
4400                    next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4401                }
4402            }
4403            pending = next;
4404        }
4405
4406        assert!(
4407            resource_received,
4408            "Responder should receive resource data (rounds={})",
4409            rounds
4410        );
4411        assert!(
4412            sender_completed,
4413            "Sender should get completion proof (rounds={})",
4414            rounds
4415        );
4416    }
4417
4418    #[test]
4419    fn test_split_resource_advertisement_and_progress_entries() {
4420        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4421        let mut rng = OsRng;
4422        let data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 1024);
4423
4424        let actions =
4425            init_mgr.send_resource_with_auto_compress(&link_id, &data, None, false, &mut rng);
4426        let adv = first_resource_advertisement(&init_mgr, &link_id, &actions);
4427
4428        assert!(adv.flags.split);
4429        assert_eq!(adv.segment_index, 1);
4430        assert_eq!(adv.total_segments, 2);
4431        assert_eq!(adv.data_size, data.len() as u64);
4432
4433        let managed = init_mgr.links.get(&link_id).unwrap();
4434        assert_eq!(managed.outgoing_splits.len(), 1);
4435        assert_eq!(
4436            managed
4437                .outgoing_resources
4438                .iter()
4439                .filter(|sender| sender.flags.split)
4440                .count(),
4441            2
4442        );
4443
4444        let entries = init_mgr.resource_entries();
4445        assert_eq!(entries.len(), 1);
4446        assert_eq!(entries[0].direction, "outgoing");
4447        assert!(entries[0].total_parts > managed.outgoing_resources[0].total_parts());
4448        assert_eq!(entries[0].transferred_parts, 0);
4449    }
4450
4451    #[test]
4452    fn test_split_resource_full_transfer_and_monotonic_progress() {
4453        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4454        let mut rng = OsRng;
4455        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4456
4457        let original_data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 2048);
4458        let initial_actions = init_mgr.send_resource_with_auto_compress(
4459            &link_id,
4460            &original_data,
4461            None,
4462            false,
4463            &mut rng,
4464        );
4465
4466        let (received_data, sender_completed, progress, failures, rounds) =
4467            drive_link_manager_packets(
4468                &mut init_mgr,
4469                &mut resp_mgr,
4470                initial_actions,
4471                'i',
4472                &mut rng,
4473                10_000,
4474            );
4475
4476        assert!(
4477            received_data.as_ref().is_some_and(|data| data == &original_data),
4478            "split transfer did not deliver payload in {rounds} rounds; sender_completed={sender_completed}; failures={failures:?}; last_progress={:?}; init_entries={:?}; resp_entries={:?}",
4479            progress.last(),
4480            init_mgr.resource_entries(),
4481            resp_mgr.resource_entries()
4482        );
4483        assert!(
4484            sender_completed,
4485            "sender did not complete in {rounds} rounds"
4486        );
4487        assert!(
4488            progress
4489                .iter()
4490                .any(|(_, received, total)| received == total),
4491            "expected final progress update"
4492        );
4493
4494        let mut init_last = 0;
4495        let mut resp_last = 0;
4496        for (side, received, total) in progress {
4497            assert!(received <= total);
4498            match side {
4499                'i' => {
4500                    assert!(
4501                        received >= init_last,
4502                        "initiator progress regressed from {init_last} to {received}"
4503                    );
4504                    init_last = received;
4505                }
4506                'r' => {
4507                    assert!(
4508                        received >= resp_last,
4509                        "responder progress regressed from {resp_last} to {received}"
4510                    );
4511                    resp_last = received;
4512                }
4513                _ => unreachable!(),
4514            }
4515        }
4516    }
4517
4518    #[test]
4519    fn test_split_resource_accept_app_queries_only_first_segment() {
4520        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4521        let mut rng = OsRng;
4522        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4523
4524        let original_data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 1024);
4525        let adv_actions = init_mgr.send_resource_with_auto_compress(
4526            &link_id,
4527            &original_data,
4528            None,
4529            false,
4530            &mut rng,
4531        );
4532        let adv_raw = extract_any_send_packet(&adv_actions);
4533        let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
4534        let query_actions = resp_mgr.handle_local_delivery(
4535            adv_pkt.destination_hash,
4536            &adv_raw,
4537            adv_pkt.packet_hash,
4538            rns_core::transport::types::InterfaceId(0),
4539            &mut rng,
4540        );
4541
4542        let queries: Vec<_> = query_actions
4543            .iter()
4544            .filter_map(|action| match action {
4545                LinkManagerAction::ResourceAcceptQuery { resource_hash, .. } => {
4546                    Some(resource_hash.clone())
4547                }
4548                _ => None,
4549            })
4550            .collect();
4551        assert_eq!(queries.len(), 1);
4552
4553        let accept_actions = resp_mgr.accept_resource(&link_id, &queries[0], true, &mut rng);
4554        let (received_data, sender_completed, _progress, failures, rounds) =
4555            drive_link_manager_packets(
4556                &mut init_mgr,
4557                &mut resp_mgr,
4558                accept_actions,
4559                'r',
4560                &mut rng,
4561                10_000,
4562            );
4563
4564        assert!(
4565            failures.is_empty(),
4566            "split AcceptApp transfer failed: {failures:?}"
4567        );
4568        assert!(
4569            received_data
4570                .as_ref()
4571                .is_some_and(|data| data == &original_data),
4572            "split AcceptApp transfer did not deliver in {rounds} rounds"
4573        );
4574        assert!(sender_completed);
4575    }
4576
4577    #[test]
4578    fn test_resource_cancel_icl() {
4579        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4580        let mut rng = OsRng;
4581
4582        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4583
4584        // Use large data so transfer is multi-part
4585        let data = vec![0xAB; 2000];
4586        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4587
4588        // Deliver advertisement — responder accepts and sends request
4589        for action in &adv_actions {
4590            if let LinkManagerAction::SendPacket { raw, .. } = action {
4591                let pkt = RawPacket::unpack(raw).unwrap();
4592                resp_mgr.handle_local_delivery(
4593                    pkt.destination_hash,
4594                    raw,
4595                    pkt.packet_hash,
4596                    rns_core::transport::types::InterfaceId(0),
4597                    &mut rng,
4598                );
4599            }
4600        }
4601
4602        // Verify there are incoming resources on the responder
4603        assert!(!resp_mgr
4604            .links
4605            .get(&link_id)
4606            .unwrap()
4607            .incoming_resources
4608            .is_empty());
4609
4610        // Simulate ICL (cancel from initiator side) by calling handle_resource_icl
4611        let icl_actions = resp_mgr.handle_resource_icl(&link_id);
4612
4613        // Should have resource failed
4614        let has_failed = icl_actions
4615            .iter()
4616            .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
4617        assert!(has_failed, "ICL should produce ResourceFailed");
4618    }
4619
4620    #[test]
4621    fn test_resource_cancel_rcl() {
4622        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4623        let mut rng = OsRng;
4624
4625        // Create a resource sender
4626        let data = vec![0xAB; 2000];
4627        init_mgr.send_resource(&link_id, &data, None, &mut rng);
4628
4629        // Verify there are outgoing resources
4630        assert!(!init_mgr
4631            .links
4632            .get(&link_id)
4633            .unwrap()
4634            .outgoing_resources
4635            .is_empty());
4636
4637        // Simulate RCL (cancel from receiver side)
4638        let rcl_actions = init_mgr.handle_resource_rcl(&link_id);
4639
4640        let has_failed = rcl_actions
4641            .iter()
4642            .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
4643        assert!(has_failed, "RCL should produce ResourceFailed");
4644    }
4645
4646    #[test]
4647    fn test_cancel_all_resources_clears_active_transfers() {
4648        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4649        let mut rng = OsRng;
4650
4651        let actions = init_mgr.send_resource(&link_id, b"resource body", None, &mut rng);
4652        assert!(!actions.is_empty());
4653        assert_eq!(init_mgr.resource_transfer_count(), 1);
4654
4655        let cancel_actions = init_mgr.cancel_all_resources(&mut rng);
4656
4657        assert_eq!(init_mgr.resource_transfer_count(), 0);
4658        assert!(cancel_actions
4659            .iter()
4660            .any(|action| matches!(action, LinkManagerAction::SendPacket { .. })));
4661    }
4662
4663    #[test]
4664    fn test_resource_tick_cleans_up() {
4665        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4666        let mut rng = OsRng;
4667
4668        let data = vec![0xAB; 100];
4669        init_mgr.send_resource(&link_id, &data, None, &mut rng);
4670
4671        assert!(!init_mgr
4672            .links
4673            .get(&link_id)
4674            .unwrap()
4675            .outgoing_resources
4676            .is_empty());
4677
4678        // Cancel the sender to make it Complete
4679        init_mgr.handle_resource_rcl(&link_id);
4680
4681        // Tick should clean up completed resources
4682        init_mgr.tick(&mut rng);
4683
4684        assert!(
4685            init_mgr
4686                .links
4687                .get(&link_id)
4688                .unwrap()
4689                .outgoing_resources
4690                .is_empty(),
4691            "Tick should clean up completed/failed outgoing resources"
4692        );
4693    }
4694
4695    #[test]
4696    fn test_build_link_packet() {
4697        let (init_mgr, _resp_mgr, link_id) = setup_active_link();
4698
4699        let actions =
4700            init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, b"test data");
4701        assert_eq!(actions.len(), 1);
4702        if let LinkManagerAction::SendPacket { raw, dest_type, .. } = &actions[0] {
4703            let pkt = RawPacket::unpack(raw).unwrap();
4704            assert_eq!(pkt.context, constants::CONTEXT_RESOURCE);
4705            assert_eq!(*dest_type, constants::DESTINATION_LINK);
4706        } else {
4707            panic!("Expected SendPacket");
4708        }
4709    }
4710
4711    #[test]
4712    fn test_build_link_packet_returns_empty_when_mtu_too_small() {
4713        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4714        init_mgr.set_link_mtu(&link_id, 84);
4715
4716        let actions =
4717            init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, &[0xAA; 200]);
4718        assert!(actions.is_empty(), "oversized packet should not be built");
4719    }
4720
4721    #[test]
4722    fn test_process_resource_actions_encrypted_variants_drop_on_encrypt_failure() {
4723        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4724        let mut rng = OsRng;
4725        init_mgr
4726            .links
4727            .get_mut(&link_id)
4728            .unwrap()
4729            .engine
4730            .clear_session_for_testing();
4731
4732        let cases = vec![
4733            ResourceAction::SendAdvertisement(vec![1, 2, 3]),
4734            ResourceAction::SendRequest(vec![4, 5, 6]),
4735            ResourceAction::SendHmu(vec![7, 8, 9]),
4736            ResourceAction::SendProof(vec![10, 11, 12]),
4737            ResourceAction::SendCancelInitiator(vec![13, 14, 15]),
4738            ResourceAction::SendCancelReceiver(vec![16, 17, 18]),
4739        ];
4740
4741        for action in cases {
4742            let out = init_mgr.process_resource_actions(&link_id, vec![action], &mut rng);
4743            assert!(
4744                out.is_empty(),
4745                "encrypt failure should suppress packet emission"
4746            );
4747        }
4748    }
4749
4750    // ====================================================================
4751    // Phase 8b: Channel message & data callback tests
4752    // ====================================================================
4753
4754    #[test]
4755    fn test_channel_message_delivery() {
4756        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4757        let mut rng = OsRng;
4758
4759        // Send channel message from initiator
4760        let chan_actions = init_mgr
4761            .send_channel_message(&link_id, 42, b"channel data", &mut rng)
4762            .expect("active link channel send should succeed");
4763        assert!(!chan_actions.is_empty());
4764
4765        // Deliver to responder
4766        let mut got_channel_msg = false;
4767        for action in &chan_actions {
4768            if let LinkManagerAction::SendPacket { raw, .. } = action {
4769                let pkt = RawPacket::unpack(raw).unwrap();
4770                let resp_actions = resp_mgr.handle_local_delivery(
4771                    pkt.destination_hash,
4772                    raw,
4773                    pkt.packet_hash,
4774                    rns_core::transport::types::InterfaceId(0),
4775                    &mut rng,
4776                );
4777                for a in &resp_actions {
4778                    if let LinkManagerAction::ChannelMessageReceived {
4779                        msgtype, payload, ..
4780                    } = a
4781                    {
4782                        assert_eq!(*msgtype, 42);
4783                        assert_eq!(*payload, b"channel data");
4784                        got_channel_msg = true;
4785                    }
4786                }
4787            }
4788        }
4789        assert!(got_channel_msg, "Responder should receive channel message");
4790    }
4791
4792    #[test]
4793    fn test_channel_send_drops_packet_when_encrypt_fails() {
4794        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4795        let mut rng = OsRng;
4796        init_mgr
4797            .links
4798            .get_mut(&link_id)
4799            .unwrap()
4800            .engine
4801            .clear_session_for_testing();
4802
4803        let actions = init_mgr
4804            .send_channel_message(&link_id, 42, b"channel data", &mut rng)
4805            .expect("channel should still accept the message locally");
4806
4807        assert!(
4808            actions.is_empty(),
4809            "encrypt failure should suppress channel packet"
4810        );
4811        assert!(
4812            init_mgr
4813                .links
4814                .get(&link_id)
4815                .unwrap()
4816                .pending_channel_packets
4817                .is_empty(),
4818            "failed packet encryption must not track a pending channel proof"
4819        );
4820    }
4821
4822    #[test]
4823    fn test_channel_proof_reopens_send_window() {
4824        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4825        let mut rng = OsRng;
4826
4827        init_mgr
4828            .send_channel_message(&link_id, 42, b"first", &mut rng)
4829            .expect("first send should succeed");
4830        init_mgr
4831            .send_channel_message(&link_id, 42, b"second", &mut rng)
4832            .expect("second send should succeed");
4833
4834        let err = init_mgr
4835            .send_channel_message(&link_id, 42, b"third", &mut rng)
4836            .expect_err("third send should hit the initial channel window");
4837        assert_eq!(err, "Channel is not ready to send");
4838
4839        let queued_packets = init_mgr
4840            .links
4841            .get(&link_id)
4842            .unwrap()
4843            .pending_channel_packets
4844            .clone();
4845        assert_eq!(queued_packets.len(), 2);
4846        for tracked_hash in queued_packets.keys().take(1) {
4847            let mut proof_data = Vec::with_capacity(96);
4848            proof_data.extend_from_slice(tracked_hash);
4849            proof_data.extend_from_slice(&[0x11; 64]);
4850            let flags = PacketFlags {
4851                header_type: constants::HEADER_1,
4852                context_flag: constants::FLAG_UNSET,
4853                transport_type: constants::TRANSPORT_BROADCAST,
4854                destination_type: constants::DESTINATION_LINK,
4855                packet_type: constants::PACKET_TYPE_PROOF,
4856            };
4857            let proof = RawPacket::pack(
4858                flags,
4859                0,
4860                &link_id,
4861                None,
4862                constants::CONTEXT_NONE,
4863                &proof_data,
4864            )
4865            .expect("proof packet should pack");
4866            let ack_actions = init_mgr.handle_local_delivery(
4867                link_id,
4868                &proof.raw,
4869                proof.packet_hash,
4870                rns_core::transport::types::InterfaceId(0),
4871                &mut rng,
4872            );
4873            assert!(
4874                ack_actions.is_empty(),
4875                "proof delivery should only update channel state"
4876            );
4877        }
4878
4879        init_mgr
4880            .send_channel_message(&link_id, 42, b"third", &mut rng)
4881            .expect("proof should free one channel slot");
4882    }
4883
4884    #[test]
4885    fn test_generic_link_data_delivery() {
4886        let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
4887        let mut rng = OsRng;
4888
4889        // Send generic data with a custom context
4890        let actions = init_mgr.send_on_link(&link_id, b"raw stuff", 0x42, &mut rng);
4891        assert_eq!(actions.len(), 1);
4892
4893        // Deliver to responder
4894        let raw = extract_any_send_packet(&actions);
4895        let pkt = RawPacket::unpack(&raw).unwrap();
4896        let resp_actions = resp_mgr.handle_local_delivery(
4897            pkt.destination_hash,
4898            &raw,
4899            pkt.packet_hash,
4900            rns_core::transport::types::InterfaceId(0),
4901            &mut rng,
4902        );
4903
4904        let has_data = resp_actions
4905            .iter()
4906            .any(|a| matches!(a, LinkManagerAction::LinkDataReceived { context: 0x42, .. }));
4907        assert!(
4908            has_data,
4909            "Responder should receive LinkDataReceived for unknown context"
4910        );
4911    }
4912
4913    #[test]
4914    fn test_invalid_encrypted_contexts_are_ignored() {
4915        let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4916        let mut rng = OsRng;
4917        let contexts = [
4918            constants::CONTEXT_CHANNEL,
4919            constants::CONTEXT_REQUEST,
4920            constants::CONTEXT_RESPONSE,
4921            constants::CONTEXT_RESOURCE_ADV,
4922            constants::CONTEXT_RESOURCE_REQ,
4923            constants::CONTEXT_RESOURCE_HMU,
4924            constants::CONTEXT_RESOURCE_PRF,
4925            0x42,
4926        ];
4927
4928        for context in contexts {
4929            let flags = PacketFlags {
4930                header_type: constants::HEADER_1,
4931                context_flag: constants::FLAG_UNSET,
4932                transport_type: constants::TRANSPORT_BROADCAST,
4933                destination_type: constants::DESTINATION_LINK,
4934                packet_type: constants::PACKET_TYPE_DATA,
4935            };
4936            let pkt = RawPacket::pack(flags, 0, &link_id, None, context, b"invalid-ciphertext")
4937                .expect("test packet should pack");
4938            let actions = resp_mgr.handle_local_delivery(
4939                pkt.destination_hash,
4940                &pkt.raw,
4941                pkt.packet_hash,
4942                rns_core::transport::types::InterfaceId(0),
4943                &mut rng,
4944            );
4945            assert!(
4946                actions.is_empty(),
4947                "invalid ciphertext for context {context:#x} should be ignored"
4948            );
4949        }
4950    }
4951
4952    #[test]
4953    fn test_resource_part_without_matching_receiver_is_ignored() {
4954        let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4955        let mut rng = OsRng;
4956        let flags = PacketFlags {
4957            header_type: constants::HEADER_1,
4958            context_flag: constants::FLAG_UNSET,
4959            transport_type: constants::TRANSPORT_BROADCAST,
4960            destination_type: constants::DESTINATION_LINK,
4961            packet_type: constants::PACKET_TYPE_DATA,
4962        };
4963        let pkt = RawPacket::pack(
4964            flags,
4965            0,
4966            &link_id,
4967            None,
4968            constants::CONTEXT_RESOURCE,
4969            b"orphan-part",
4970        )
4971        .expect("test packet should pack");
4972
4973        let actions = resp_mgr.handle_local_delivery(
4974            pkt.destination_hash,
4975            &pkt.raw,
4976            pkt.packet_hash,
4977            rns_core::transport::types::InterfaceId(0),
4978            &mut rng,
4979        );
4980
4981        assert!(actions.is_empty(), "orphan resource part should be ignored");
4982    }
4983
4984    #[test]
4985    fn test_response_delivery() {
4986        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4987        let mut rng = OsRng;
4988
4989        // Register handler on responder
4990        resp_mgr.register_request_handler("/echo", None, |_link_id, _path, data, _remote| {
4991            Some(data.to_vec())
4992        });
4993
4994        // Send request from initiator
4995        let req_actions = init_mgr.send_request(&link_id, "/echo", b"\xc0", &mut rng); // msgpack nil
4996        assert!(!req_actions.is_empty());
4997
4998        // Deliver request to responder — should produce response
4999        let req_raw = extract_any_send_packet(&req_actions);
5000        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5001        let resp_actions = resp_mgr.handle_local_delivery(
5002            req_pkt.destination_hash,
5003            &req_raw,
5004            req_pkt.packet_hash,
5005            rns_core::transport::types::InterfaceId(0),
5006            &mut rng,
5007        );
5008        let has_resp_send = resp_actions
5009            .iter()
5010            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
5011        assert!(has_resp_send, "Handler should produce response");
5012
5013        // Deliver response back to initiator
5014        let resp_raw = extract_any_send_packet(&resp_actions);
5015        let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
5016        let init_actions = init_mgr.handle_local_delivery(
5017            resp_pkt.destination_hash,
5018            &resp_raw,
5019            resp_pkt.packet_hash,
5020            rns_core::transport::types::InterfaceId(0),
5021            &mut rng,
5022        );
5023
5024        let has_response_received = init_actions
5025            .iter()
5026            .any(|a| matches!(a, LinkManagerAction::ResponseReceived { .. }));
5027        assert!(
5028            has_response_received,
5029            "Initiator should receive ResponseReceived"
5030        );
5031    }
5032
5033    #[test]
5034    fn test_large_response_uses_resource_fallback() {
5035        let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
5036        let mut rng = OsRng;
5037
5038        // Register handler on responder with payload that cannot fit a direct
5039        // CONTEXT_RESPONSE packet.
5040        let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5041        resp_mgr.register_request_handler("/large", None, {
5042            let large_payload = large_payload.clone();
5043            move |_link_id, _path, _data, _remote| Some(large_payload.clone())
5044        });
5045
5046        // Send request from initiator.
5047        let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
5048        assert!(!req_actions.is_empty());
5049
5050        // Deliver request to responder and inspect responder outbound packets.
5051        let req_raw = extract_any_send_packet(&req_actions);
5052        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5053        let resp_actions = resp_mgr.handle_local_delivery(
5054            req_pkt.destination_hash,
5055            &req_raw,
5056            req_pkt.packet_hash,
5057            rns_core::transport::types::InterfaceId(0),
5058            &mut rng,
5059        );
5060
5061        let mut has_resource_adv = false;
5062        let mut has_direct_response = false;
5063        for action in &resp_actions {
5064            if let LinkManagerAction::SendPacket { raw, .. } = action {
5065                let pkt = RawPacket::unpack(raw).unwrap();
5066                if pkt.context == constants::CONTEXT_RESOURCE_ADV {
5067                    has_resource_adv = true;
5068                }
5069                if pkt.context == constants::CONTEXT_RESPONSE {
5070                    has_direct_response = true;
5071                }
5072            }
5073        }
5074
5075        assert!(
5076            has_resource_adv,
5077            "Large response should advertise a response resource"
5078        );
5079        assert!(
5080            !has_direct_response,
5081            "Large response should not use direct CONTEXT_RESPONSE packet"
5082        );
5083    }
5084
5085    #[test]
5086    fn test_send_management_response_without_session_key_uses_resource_fallback_path() {
5087        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
5088        let mut rng = OsRng;
5089        init_mgr
5090            .links
5091            .get_mut(&link_id)
5092            .unwrap()
5093            .engine
5094            .clear_session_for_testing();
5095
5096        let large_response: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5097        let actions =
5098            init_mgr.send_management_response(&link_id, &[0x11; 16], &large_response, &mut rng);
5099
5100        assert!(
5101            actions.is_empty(),
5102            "without a session key, no response packets should be emitted"
5103        );
5104        assert_eq!(
5105            init_mgr
5106                .links
5107                .get(&link_id)
5108                .map(|managed| managed.outgoing_resources.len()),
5109            Some(1)
5110        );
5111    }
5112
5113    #[test]
5114    fn test_send_channel_message_on_no_channel() {
5115        let mut mgr = LinkManager::new();
5116        let mut rng = OsRng;
5117        let dummy_sig = [0xAA; 32];
5118        let (link_id, _) =
5119            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
5120
5121        // Link is Pending (no channel), should return empty
5122        let err = mgr
5123            .send_channel_message(&link_id, 1, b"test", &mut rng)
5124            .expect_err("pending link should reject channel send");
5125        assert_eq!(err, "link has no active channel");
5126    }
5127
5128    #[test]
5129    fn test_send_on_link_requires_active() {
5130        let mut mgr = LinkManager::new();
5131        let mut rng = OsRng;
5132        let dummy_sig = [0xAA; 32];
5133        let (link_id, _) =
5134            mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
5135
5136        let actions = mgr.send_on_link(&link_id, b"test", constants::CONTEXT_NONE, &mut rng);
5137        assert!(actions.is_empty(), "Cannot send on pending link");
5138    }
5139
5140    #[test]
5141    fn test_send_on_link_unknown_link() {
5142        let mgr = LinkManager::new();
5143        let mut rng = OsRng;
5144
5145        let actions = mgr.send_on_link(&[0xFF; 16], b"test", constants::CONTEXT_NONE, &mut rng);
5146        assert!(actions.is_empty());
5147    }
5148
5149    #[test]
5150    fn test_resource_full_transfer_large() {
5151        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5152        let mut rng = OsRng;
5153
5154        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5155
5156        // Multi-part data (larger than a single SDU of 464 bytes)
5157        let original_data: Vec<u8> = (0..2000u32)
5158            .map(|i| {
5159                let pos = i as usize;
5160                (pos ^ (pos >> 8) ^ (pos >> 16)) as u8
5161            })
5162            .collect();
5163
5164        let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
5165
5166        let mut pending: Vec<(char, LinkManagerAction)> =
5167            adv_actions.into_iter().map(|a| ('i', a)).collect();
5168        let mut rounds = 0;
5169        let max_rounds = 200;
5170        let mut resource_received = false;
5171        let mut sender_completed = false;
5172
5173        while !pending.is_empty() && rounds < max_rounds {
5174            rounds += 1;
5175            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
5176
5177            for (source, action) in pending.drain(..) {
5178                if let LinkManagerAction::SendPacket { raw, .. } = action {
5179                    let pkt = match RawPacket::unpack(&raw) {
5180                        Ok(p) => p,
5181                        Err(_) => continue,
5182                    };
5183
5184                    let target_actions = if source == 'i' {
5185                        resp_mgr.handle_local_delivery(
5186                            pkt.destination_hash,
5187                            &raw,
5188                            pkt.packet_hash,
5189                            rns_core::transport::types::InterfaceId(0),
5190                            &mut rng,
5191                        )
5192                    } else {
5193                        init_mgr.handle_local_delivery(
5194                            pkt.destination_hash,
5195                            &raw,
5196                            pkt.packet_hash,
5197                            rns_core::transport::types::InterfaceId(0),
5198                            &mut rng,
5199                        )
5200                    };
5201
5202                    let target_source = if source == 'i' { 'r' } else { 'i' };
5203                    for a in &target_actions {
5204                        match a {
5205                            LinkManagerAction::ResourceReceived { data, .. } => {
5206                                assert_eq!(*data, original_data);
5207                                resource_received = true;
5208                            }
5209                            LinkManagerAction::ResourceCompleted { .. } => {
5210                                sender_completed = true;
5211                            }
5212                            _ => {}
5213                        }
5214                    }
5215                    next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5216                }
5217            }
5218            pending = next;
5219        }
5220
5221        assert!(
5222            resource_received,
5223            "Should receive large resource (rounds={})",
5224            rounds
5225        );
5226        assert!(
5227            sender_completed,
5228            "Sender should complete (rounds={})",
5229            rounds
5230        );
5231    }
5232
5233    #[test]
5234    fn test_resource_receiver_stores_original_advertisement_plaintext() {
5235        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5236        let mut rng = OsRng;
5237
5238        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5239
5240        let data = vec![0x41; 256];
5241        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5242
5243        let adv_raw = adv_actions
5244            .iter()
5245            .find_map(|action| match action {
5246                LinkManagerAction::SendPacket { raw, .. } => {
5247                    let pkt = RawPacket::unpack(raw).ok()?;
5248                    (pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw.clone())
5249                }
5250                _ => None,
5251            })
5252            .expect("sender should emit a resource advertisement");
5253
5254        let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
5255        let adv_plaintext = resp_mgr
5256            .links
5257            .get(&link_id)
5258            .unwrap()
5259            .engine
5260            .decrypt(&adv_pkt.data)
5261            .unwrap();
5262
5263        let _resp_actions = resp_mgr.handle_local_delivery(
5264            adv_pkt.destination_hash,
5265            &adv_raw,
5266            adv_pkt.packet_hash,
5267            rns_core::transport::types::InterfaceId(0),
5268            &mut rng,
5269        );
5270
5271        let receiver = resp_mgr
5272            .links
5273            .get(&link_id)
5274            .and_then(|managed| managed.incoming_resources.first())
5275            .expect("advertisement should create an incoming receiver");
5276        assert_eq!(receiver.advertisement_packet, adv_plaintext);
5277        assert_eq!(
5278            receiver.max_decompressed_size,
5279            constants::RESOURCE_AUTO_COMPRESS_MAX_SIZE
5280        );
5281    }
5282
5283    #[test]
5284    fn test_corrupt_compressed_resource_rejects_and_tears_down_link() {
5285        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5286        let mut rng = OsRng;
5287
5288        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5289
5290        let data = vec![b'A'; 4096];
5291        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5292
5293        let mut request_actions = Vec::new();
5294        for action in &adv_actions {
5295            let LinkManagerAction::SendPacket { raw, .. } = action else {
5296                continue;
5297            };
5298            let pkt = RawPacket::unpack(raw).unwrap();
5299            let actions = resp_mgr.handle_local_delivery(
5300                pkt.destination_hash,
5301                raw,
5302                pkt.packet_hash,
5303                rns_core::transport::types::InterfaceId(0),
5304                &mut rng,
5305            );
5306            request_actions.extend(actions);
5307        }
5308
5309        {
5310            let receiver = resp_mgr
5311                .links
5312                .get_mut(&link_id)
5313                .and_then(|managed| managed.incoming_resources.first_mut())
5314                .expect("receiver should exist after advertisement");
5315            assert!(receiver.flags.compressed, "test data should be compressed");
5316            receiver.max_decompressed_size = 64;
5317        }
5318
5319        let mut responder_actions = Vec::new();
5320        for action in request_actions {
5321            let LinkManagerAction::SendPacket { raw, .. } = action else {
5322                continue;
5323            };
5324            let pkt = RawPacket::unpack(&raw).unwrap();
5325            let actions = init_mgr.handle_local_delivery(
5326                pkt.destination_hash,
5327                &raw,
5328                pkt.packet_hash,
5329                rns_core::transport::types::InterfaceId(0),
5330                &mut rng,
5331            );
5332
5333            for action in actions {
5334                let LinkManagerAction::SendPacket { raw, .. } = &action else {
5335                    continue;
5336                };
5337                let pkt = RawPacket::unpack(raw).unwrap();
5338                let delivered = resp_mgr.handle_local_delivery(
5339                    pkt.destination_hash,
5340                    raw,
5341                    pkt.packet_hash,
5342                    rns_core::transport::types::InterfaceId(0),
5343                    &mut rng,
5344                );
5345                responder_actions.extend(delivered);
5346            }
5347        }
5348
5349        assert!(
5350            responder_actions.iter().any(|action| matches!(
5351                action,
5352                LinkManagerAction::ResourceFailed { error, .. }
5353                    if error == "Resource too large"
5354            )),
5355            "corrupt oversized resource should fail with TooLarge"
5356        );
5357        assert!(
5358            responder_actions.iter().any(|action| matches!(
5359                action,
5360                LinkManagerAction::LinkClosed { link_id: closed_id, .. } if *closed_id == link_id
5361            )),
5362            "corrupt oversized resource should tear down the link"
5363        );
5364        assert!(
5365            responder_actions.iter().any(|action| match action {
5366                LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
5367                    .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_RCL)
5368                    .unwrap_or(false),
5369                _ => false,
5370            }),
5371            "corrupt oversized resource should send a receiver cancel/reject packet"
5372        );
5373        assert_eq!(
5374            resp_mgr
5375                .links
5376                .get(&link_id)
5377                .map(|managed| managed.engine.state()),
5378            Some(LinkState::Closed)
5379        );
5380    }
5381
5382    #[test]
5383    fn test_resource_hmu_timeout_extension_in_link_manager_flow() {
5384        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5385        let mut rng = OsRng;
5386
5387        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5388
5389        // Large and incompressible enough to require multiple hashmap segments
5390        // even with the live Bzip2Compressor in the LinkManager path.
5391        let mut state = 0x1234_5678u32;
5392        let data: Vec<u8> = (0..50000)
5393            .map(|_| {
5394                state = state.wrapping_mul(1664525).wrapping_add(1013904223);
5395                (state >> 16) as u8
5396            })
5397            .collect();
5398        let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5399        let mut pending: Vec<(char, LinkManagerAction)> =
5400            adv_actions.into_iter().map(|a| ('i', a)).collect();
5401
5402        let mut rounds = 0;
5403
5404        // Drive the real link-manager exchange until the receiver is genuinely
5405        // waiting for an HMU after exhausting the advertised hashmap segment.
5406        while rounds < 300 {
5407            rounds += 1;
5408            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
5409
5410            for (source, action) in pending.drain(..) {
5411                let LinkManagerAction::SendPacket { raw, .. } = action else {
5412                    continue;
5413                };
5414
5415                let pkt = match RawPacket::unpack(&raw) {
5416                    Ok(p) => p,
5417                    Err(_) => continue,
5418                };
5419
5420                let target_actions = if source == 'i' {
5421                    resp_mgr.handle_local_delivery(
5422                        pkt.destination_hash,
5423                        &raw,
5424                        pkt.packet_hash,
5425                        rns_core::transport::types::InterfaceId(0),
5426                        &mut rng,
5427                    )
5428                } else {
5429                    init_mgr.handle_local_delivery(
5430                        pkt.destination_hash,
5431                        &raw,
5432                        pkt.packet_hash,
5433                        rns_core::transport::types::InterfaceId(0),
5434                        &mut rng,
5435                    )
5436                };
5437
5438                let target_source = if source == 'i' { 'r' } else { 'i' };
5439                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5440            }
5441
5442            if resp_mgr
5443                .links
5444                .get(&link_id)
5445                .and_then(|managed| managed.incoming_resources.first())
5446                .is_some_and(|receiver| receiver.waiting_for_hmu)
5447            {
5448                break;
5449            }
5450
5451            pending = next;
5452        }
5453
5454        assert!(
5455            resp_mgr
5456                .links
5457                .get(&link_id)
5458                .and_then(|managed| managed.incoming_resources.first())
5459                .is_some_and(|receiver| receiver.waiting_for_hmu),
5460            "expected receiver to reach a live HMU wait state"
5461        );
5462
5463        // Prime the live receiver once so it computes the same EIFR it will use
5464        // for timeout decisions in this HMU-wait state.
5465        let prime_actions = {
5466            let managed = resp_mgr.links.get_mut(&link_id).unwrap();
5467            let receiver = managed.incoming_resources.first_mut().unwrap();
5468            let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
5469                managed.engine.decrypt(ciphertext).map_err(|_| ())
5470            };
5471            receiver.tick(
5472                receiver.last_activity + 0.0001,
5473                &decrypt_fn,
5474                &Bzip2Compressor,
5475            )
5476        };
5477        assert!(
5478            !prime_actions
5479                .iter()
5480                .any(|a| matches!(a, ResourceAction::SendRequest(_))),
5481            "fresh HMU wait state should not immediately emit a retry request"
5482        );
5483
5484        let (late_delta, retries_before) = {
5485            let managed = resp_mgr
5486                .links
5487                .get_mut(&link_id)
5488                .expect("receiver link should still exist");
5489            let receiver = managed
5490                .incoming_resources
5491                .first_mut()
5492                .expect("receiver should have an active incoming resource");
5493
5494            assert!(
5495                receiver.waiting_for_hmu,
5496                "receiver should be waiting for HMU"
5497            );
5498
5499            let eifr = receiver.eifr.unwrap_or_else(|| {
5500                (constants::RESOURCE_SDU as f64 * 8.0) / receiver.rtt.unwrap_or(0.5)
5501            });
5502            let expected_tof = if receiver.outstanding_parts > 0 {
5503                (receiver.outstanding_parts as f64 * constants::RESOURCE_SDU as f64 * 8.0) / eifr
5504            } else {
5505                (3.0 * constants::RESOURCE_SDU as f64) / eifr
5506            };
5507            let expected_hmu_wait =
5508                (constants::RESOURCE_SDU as f64 * 8.0 * constants::RESOURCE_HMU_WAIT_FACTOR) / eifr;
5509            let old_delta = constants::RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT * expected_tof
5510                + constants::RESOURCE_RETRY_GRACE_TIME;
5511            (
5512                old_delta + expected_hmu_wait + expected_hmu_wait.max(1.0),
5513                receiver.retries_left,
5514            )
5515        };
5516        {
5517            let managed = resp_mgr.links.get(&link_id).unwrap();
5518            let receiver = managed.incoming_resources.first().unwrap();
5519            assert_eq!(receiver.retries_left, retries_before);
5520            assert!(
5521                receiver.eifr.is_some(),
5522                "receiver tick should have populated EIFR"
5523            );
5524        }
5525
5526        let late_resource_actions = {
5527            let managed = resp_mgr.links.get_mut(&link_id).unwrap();
5528            let receiver = managed.incoming_resources.first_mut().unwrap();
5529            let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
5530                managed.engine.decrypt(ciphertext).map_err(|_| ())
5531            };
5532            receiver.tick(
5533                receiver.last_activity + late_delta,
5534                &decrypt_fn,
5535                &Bzip2Compressor,
5536            )
5537        };
5538        let late_actions =
5539            resp_mgr.process_resource_actions(&link_id, late_resource_actions, &mut rng);
5540        let retry_raw = late_actions
5541            .iter()
5542            .find_map(|a| match a {
5543                LinkManagerAction::SendPacket { raw, .. } => {
5544                    let pkt = RawPacket::unpack(raw).ok()?;
5545                    (pkt.context == constants::CONTEXT_RESOURCE_REQ).then_some(raw.clone())
5546                }
5547                _ => None,
5548            })
5549            .expect("receiver should emit a resource retry request after extended timeout");
5550
5551        {
5552            let managed = resp_mgr.links.get(&link_id).unwrap();
5553            let receiver = managed.incoming_resources.first().unwrap();
5554            assert_eq!(receiver.retries_left, retries_before - 1);
5555        }
5556
5557        let retry_pkt = RawPacket::unpack(&retry_raw).unwrap();
5558        let retry_plaintext = resp_mgr
5559            .links
5560            .get(&link_id)
5561            .unwrap()
5562            .engine
5563            .decrypt(&retry_pkt.data)
5564            .expect("retry request should decrypt");
5565        assert_eq!(retry_plaintext[0], constants::RESOURCE_HASHMAP_IS_EXHAUSTED);
5566
5567        // Deliver the retry request to the sender and verify it turns into a
5568        // real HMU packet in the live LinkManager flow.
5569        let retry_to_sender = init_mgr.handle_local_delivery(
5570            retry_pkt.destination_hash,
5571            &retry_raw,
5572            retry_pkt.packet_hash,
5573            rns_core::transport::types::InterfaceId(0),
5574            &mut rng,
5575        );
5576        assert!(
5577            retry_to_sender.iter().any(|a| match a {
5578                LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
5579                    .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_HMU)
5580                    .unwrap_or(false),
5581                _ => false,
5582            }),
5583            "sender should answer the exhausted retry request with a live HMU packet"
5584        );
5585    }
5586
5587    #[test]
5588    fn test_process_resource_actions_mapping() {
5589        let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
5590        let mut rng = OsRng;
5591
5592        // Test that various ResourceActions map to correct LinkManagerActions
5593        let actions = vec![
5594            ResourceAction::DataReceived {
5595                data: vec![1, 2, 3],
5596                metadata: Some(vec![4, 5]),
5597            },
5598            ResourceAction::Completed,
5599            ResourceAction::Failed(rns_core::resource::ResourceError::Timeout),
5600            ResourceAction::ProgressUpdate {
5601                received: 10,
5602                total: 20,
5603            },
5604            ResourceAction::TeardownLink,
5605        ];
5606
5607        let result = init_mgr.process_resource_actions(&link_id, actions, &mut rng);
5608
5609        assert!(matches!(
5610            result[0],
5611            LinkManagerAction::ResourceReceived { .. }
5612        ));
5613        assert!(matches!(
5614            result[1],
5615            LinkManagerAction::ResourceCompleted { .. }
5616        ));
5617        assert!(matches!(
5618            result[2],
5619            LinkManagerAction::ResourceFailed { .. }
5620        ));
5621        assert!(matches!(
5622            result[3],
5623            LinkManagerAction::ResourceProgress {
5624                received: 10,
5625                total: 20,
5626                ..
5627            }
5628        ));
5629        assert!(result
5630            .iter()
5631            .any(|action| matches!(action, LinkManagerAction::LinkClosed { .. })));
5632    }
5633
5634    #[test]
5635    fn test_link_state_empty() {
5636        let mgr = LinkManager::new();
5637        let fake_id = [0xAA; 16];
5638        assert!(mgr.link_state(&fake_id).is_none());
5639    }
5640
5641    #[test]
5642    fn test_large_response_resource_completes_as_response() {
5643        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5644        let mut rng = OsRng;
5645
5646        let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5647        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(large_payload));
5648        resp_mgr.register_request_handler("/large", None, {
5649            let response_value = response_value.clone();
5650            move |_link_id, _path, _data, _remote| Some(response_value.clone())
5651        });
5652
5653        let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
5654        let req_raw = extract_any_send_packet(&req_actions);
5655        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5656        let request_id = req_pkt.get_truncated_hash();
5657        let resp_actions = resp_mgr.handle_local_delivery(
5658            req_pkt.destination_hash,
5659            &req_raw,
5660            req_pkt.packet_hash,
5661            rns_core::transport::types::InterfaceId(0),
5662            &mut rng,
5663        );
5664
5665        let mut pending: Vec<(char, LinkManagerAction)> =
5666            resp_actions.into_iter().map(|a| ('r', a)).collect();
5667        let mut rounds = 0;
5668        let mut received_response = None;
5669
5670        while !pending.is_empty() && rounds < 200 {
5671            rounds += 1;
5672            let mut next = Vec::new();
5673
5674            for (source, action) in pending.drain(..) {
5675                let LinkManagerAction::SendPacket { raw, .. } = action else {
5676                    continue;
5677                };
5678                let pkt = RawPacket::unpack(&raw).unwrap();
5679                let target_actions = if source == 'r' {
5680                    init_mgr.handle_local_delivery(
5681                        pkt.destination_hash,
5682                        &raw,
5683                        pkt.packet_hash,
5684                        rns_core::transport::types::InterfaceId(0),
5685                        &mut rng,
5686                    )
5687                } else {
5688                    resp_mgr.handle_local_delivery(
5689                        pkt.destination_hash,
5690                        &raw,
5691                        pkt.packet_hash,
5692                        rns_core::transport::types::InterfaceId(0),
5693                        &mut rng,
5694                    )
5695                };
5696
5697                let target_source = if source == 'r' { 'i' } else { 'r' };
5698                for target_action in &target_actions {
5699                    match target_action {
5700                        LinkManagerAction::ResponseReceived {
5701                            request_id: rid,
5702                            data,
5703                            ..
5704                        } => {
5705                            received_response = Some((*rid, data.clone()));
5706                        }
5707                        LinkManagerAction::ResourceReceived { .. } => {
5708                            panic!("response resources must complete as ResponseReceived")
5709                        }
5710                        LinkManagerAction::ResourceAcceptQuery { .. } => {
5711                            panic!("response resources must bypass application acceptance")
5712                        }
5713                        _ => {}
5714                    }
5715                }
5716                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5717            }
5718
5719            pending = next;
5720        }
5721
5722        let (received_request_id, received_data) = received_response.unwrap_or_else(|| {
5723            panic!(
5724                "large response resource did not complete as ResponseReceived after {} rounds",
5725                rounds
5726            )
5727        });
5728        assert_eq!(received_request_id, request_id);
5729        assert_eq!(received_data, response_value);
5730    }
5731
5732    #[test]
5733    fn test_response_resource_preserves_metadata() {
5734        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5735        let mut rng = OsRng;
5736
5737        let payload = b"bundle-data".to_vec();
5738        let metadata = b"git-status-ok".to_vec();
5739        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5740        resp_mgr.register_request_handler_response("/fetch", None, {
5741            let response_value = response_value.clone();
5742            let metadata = metadata.clone();
5743            move |_link_id, _path, _data, _remote| {
5744                Some(RequestResponse::Resource {
5745                    data: response_value.clone(),
5746                    metadata: Some(metadata.clone()),
5747                    auto_compress: false,
5748                })
5749            }
5750        });
5751
5752        let req_actions = init_mgr.send_request(&link_id, "/fetch", b"\xc0", &mut rng);
5753        let req_raw = extract_any_send_packet(&req_actions);
5754        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5755        let request_id = req_pkt.get_truncated_hash();
5756        let resp_actions = resp_mgr.handle_local_delivery(
5757            req_pkt.destination_hash,
5758            &req_raw,
5759            req_pkt.packet_hash,
5760            rns_core::transport::types::InterfaceId(0),
5761            &mut rng,
5762        );
5763
5764        let mut pending: Vec<(char, LinkManagerAction)> =
5765            resp_actions.into_iter().map(|a| ('r', a)).collect();
5766        let mut received_response = None;
5767
5768        for _ in 0..200 {
5769            if pending.is_empty() || received_response.is_some() {
5770                break;
5771            }
5772
5773            let mut next = Vec::new();
5774            for (source, action) in pending.drain(..) {
5775                let LinkManagerAction::SendPacket { raw, .. } = action else {
5776                    continue;
5777                };
5778                let pkt = RawPacket::unpack(&raw).unwrap();
5779                let target_actions = if source == 'r' {
5780                    init_mgr.handle_local_delivery(
5781                        pkt.destination_hash,
5782                        &raw,
5783                        pkt.packet_hash,
5784                        rns_core::transport::types::InterfaceId(0),
5785                        &mut rng,
5786                    )
5787                } else {
5788                    resp_mgr.handle_local_delivery(
5789                        pkt.destination_hash,
5790                        &raw,
5791                        pkt.packet_hash,
5792                        rns_core::transport::types::InterfaceId(0),
5793                        &mut rng,
5794                    )
5795                };
5796
5797                let target_source = if source == 'r' { 'i' } else { 'r' };
5798                for target_action in &target_actions {
5799                    match target_action {
5800                        LinkManagerAction::ResponseReceived {
5801                            request_id: rid,
5802                            data,
5803                            metadata: response_metadata,
5804                            ..
5805                        } => {
5806                            received_response =
5807                                Some((*rid, data.clone(), response_metadata.clone()));
5808                        }
5809                        LinkManagerAction::ResourceReceived { .. } => {
5810                            panic!("response resources must complete as ResponseReceived")
5811                        }
5812                        _ => {}
5813                    }
5814                }
5815                next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5816            }
5817            pending = next;
5818        }
5819
5820        let (received_request_id, received_data, received_metadata) = received_response
5821            .expect("resource response with metadata should complete as ResponseReceived");
5822        assert_eq!(received_request_id, request_id);
5823        assert_eq!(received_data, response_value);
5824        assert_eq!(received_metadata, Some(metadata));
5825    }
5826
5827    #[test]
5828    fn test_negotiated_mtu_response_uses_resource_before_global_mtu() {
5829        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5830        let mut rng = OsRng;
5831
5832        init_mgr.set_link_mtu(&link_id, 300);
5833        resp_mgr.set_link_mtu(&link_id, 300);
5834
5835        let payload = vec![0xAB; 350];
5836        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5837        resp_mgr.register_request_handler("/mtu", None, {
5838            let response_value = response_value.clone();
5839            move |_link_id, _path, _data, _remote| Some(response_value.clone())
5840        });
5841
5842        let req_actions = init_mgr.send_request(&link_id, "/mtu", b"\xc0", &mut rng);
5843        let req_raw = extract_any_send_packet(&req_actions);
5844        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5845        let resp_actions = resp_mgr.handle_local_delivery(
5846            req_pkt.destination_hash,
5847            &req_raw,
5848            req_pkt.packet_hash,
5849            rns_core::transport::types::InterfaceId(0),
5850            &mut rng,
5851        );
5852
5853        let mut has_resource_adv = false;
5854        let mut direct_response_len = None;
5855        for action in &resp_actions {
5856            if let LinkManagerAction::SendPacket { raw, .. } = action {
5857                let pkt = RawPacket::unpack(raw).unwrap();
5858                has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
5859                if pkt.context == constants::CONTEXT_RESPONSE {
5860                    direct_response_len = Some(raw.len());
5861                }
5862            }
5863        }
5864
5865        assert!(
5866            has_resource_adv,
5867            "responses larger than the negotiated link MTU should use resource fallback"
5868        );
5869        assert!(
5870            direct_response_len.is_none(),
5871            "sent direct response of {} bytes on a 300 byte negotiated MTU",
5872            direct_response_len.unwrap_or_default()
5873        );
5874    }
5875
5876    #[test]
5877    fn test_large_management_response_uses_resource_fallback() {
5878        let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
5879        let mut rng = OsRng;
5880
5881        let payload = vec![0xBC; 5000];
5882        let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5883        let actions =
5884            resp_mgr.send_management_response(&link_id, &[0x55; 16], &response_value, &mut rng);
5885
5886        let mut has_resource_adv = false;
5887        let mut has_direct_response = false;
5888        for action in &actions {
5889            if let LinkManagerAction::SendPacket { raw, .. } = action {
5890                let pkt = RawPacket::unpack(raw).unwrap();
5891                has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
5892                has_direct_response |= pkt.context == constants::CONTEXT_RESPONSE;
5893            }
5894        }
5895
5896        assert!(
5897            has_resource_adv,
5898            "large management responses should advertise a response resource"
5899        );
5900        assert!(
5901            !has_direct_response,
5902            "large management responses should not use a direct CONTEXT_RESPONSE packet"
5903        );
5904    }
5905}