1use std::collections::HashMap;
10
11use super::compressor::Bzip2Compressor;
12use rns_core::channel::{Channel, Sequence};
13use rns_core::constants;
14use rns_core::link::types::{LinkId, LinkState, TeardownReason};
15use rns_core::link::{LinkAction, LinkEngine, LinkMode};
16use rns_core::packet::{PacketFlags, RawPacket};
17use rns_core::resource::{ResourceAction, ResourceReceiver, ResourceSender};
18use rns_crypto::ed25519::Ed25519PrivateKey;
19use rns_crypto::Rng;
20
21use super::time;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ResourceStrategy {
26 AcceptNone,
28 AcceptAll,
30 AcceptApp,
32}
33
34impl Default for ResourceStrategy {
35 fn default() -> Self {
36 ResourceStrategy::AcceptNone
37 }
38}
39
40struct ManagedLink {
42 engine: LinkEngine,
43 channel: Option<Channel>,
44 pending_channel_packets: HashMap<[u8; 32], Sequence>,
45 channel_send_ok: u64,
46 channel_send_not_ready: u64,
47 channel_send_too_big: u64,
48 channel_send_other_error: u64,
49 channel_messages_received: u64,
50 channel_proofs_sent: u64,
51 channel_proofs_received: u64,
52 dest_hash: [u8; 16],
54 remote_identity: Option<([u8; 16], [u8; 64])>,
56 dest_sig_pub_bytes: Option<[u8; 32]>,
58 incoming_resources: Vec<ResourceReceiver>,
60 outgoing_resources: Vec<ResourceSender>,
62 incoming_splits: HashMap<[u8; 32], IncomingSplitTransfer>,
64 outgoing_splits: HashMap<[u8; 32], OutgoingSplitTransfer>,
66 resource_strategy: ResourceStrategy,
68 route_interface: Option<rns_core::transport::types::InterfaceId>,
70 route_transport_id: Option<[u8; 16]>,
75}
76
77struct IncomingSplitTransfer {
78 total_segments: u64,
79 completed_segments: u64,
80 current_segment_index: u64,
81 current_received_parts: usize,
82 current_total_parts: usize,
83 data: Vec<u8>,
84 metadata: Option<Vec<u8>>,
85 is_response: bool,
86}
87
88struct OutgoingSplitTransfer {
89 total_segments: u64,
90 completed_segments: u64,
91 current_segment_index: u64,
92 current_sent_parts: usize,
93 current_total_parts: usize,
94}
95
96struct LinkDestination {
98 sig_prv: Ed25519PrivateKey,
99 sig_pub_bytes: [u8; 32],
100 resource_strategy: ResourceStrategy,
101}
102
103#[derive(Debug, Clone, PartialEq, Eq)]
105pub enum RequestResponse {
106 Bytes(Vec<u8>),
108 Resource {
110 data: Vec<u8>,
111 metadata: Option<Vec<u8>>,
112 auto_compress: bool,
113 },
114}
115
116impl From<Vec<u8>> for RequestResponse {
117 fn from(data: Vec<u8>) -> Self {
118 RequestResponse::Bytes(data)
119 }
120}
121
122struct RequestHandlerEntry {
124 path: String,
126 path_hash: [u8; 16],
128 allowed_list: Option<Vec<[u8; 16]>>,
130 handler: Box<
132 dyn Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<RequestResponse>
133 + Send,
134 >,
135}
136
137#[derive(Debug)]
139pub enum LinkManagerAction {
140 SendPacket {
142 raw: Vec<u8>,
143 dest_type: u8,
144 attached_interface: Option<rns_core::transport::types::InterfaceId>,
145 },
146 LinkEstablished {
148 link_id: LinkId,
149 dest_hash: [u8; 16],
150 rtt: f64,
151 is_initiator: bool,
152 },
153 LinkClosed {
155 link_id: LinkId,
156 reason: Option<TeardownReason>,
157 },
158 RemoteIdentified {
160 link_id: LinkId,
161 identity_hash: [u8; 16],
162 public_key: [u8; 64],
163 },
164 RegisterLinkDest { link_id: LinkId },
166 DeregisterLinkDest { link_id: LinkId },
168 ManagementRequest {
171 link_id: LinkId,
172 path_hash: [u8; 16],
173 data: Vec<u8>,
175 request_id: [u8; 16],
177 remote_identity: Option<([u8; 16], [u8; 64])>,
178 },
179 ResourceReceived {
181 link_id: LinkId,
182 data: Vec<u8>,
183 metadata: Option<Vec<u8>>,
184 },
185 ResourceCompleted { link_id: LinkId },
187 ResourceFailed { link_id: LinkId, error: String },
189 ResourceProgress {
191 link_id: LinkId,
192 received: usize,
193 total: usize,
194 },
195 ResourceAcceptQuery {
197 link_id: LinkId,
198 resource_hash: Vec<u8>,
199 transfer_size: u64,
200 has_metadata: bool,
201 },
202 ChannelMessageReceived {
204 link_id: LinkId,
205 msgtype: u16,
206 payload: Vec<u8>,
207 },
208 LinkDataReceived {
210 link_id: LinkId,
211 context: u8,
212 data: Vec<u8>,
213 },
214 ResponseReceived {
216 link_id: LinkId,
217 request_id: [u8; 16],
218 data: Vec<u8>,
219 metadata: Option<Vec<u8>>,
220 },
221 LinkRequestReceived {
223 link_id: LinkId,
224 receiving_interface: rns_core::transport::types::InterfaceId,
225 },
226}
227
228pub struct LinkManager {
230 links: HashMap<LinkId, ManagedLink>,
231 link_destinations: HashMap<[u8; 16], LinkDestination>,
232 request_handlers: Vec<RequestHandlerEntry>,
233 management_paths: Vec<[u8; 16]>,
236}
237
238#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub struct LinkRouteHint {
240 pub interface: rns_core::transport::types::InterfaceId,
241 pub transport_id: Option<[u8; 16]>,
242}
243
244impl LinkManager {
245 fn resource_sdu_for_link(link: &ManagedLink) -> usize {
246 let mtu = link.engine.mtu() as usize;
249 let derived = mtu.saturating_sub(constants::HEADER_MAXSIZE + constants::IFAC_MIN_SIZE);
250 if derived > 0 {
251 derived
252 } else {
253 constants::RESOURCE_SDU
254 }
255 }
256
257 fn split_progress_parts(
258 segment_index: u64,
259 total_segments: u64,
260 current_done: usize,
261 current_total: usize,
262 sdu: usize,
263 ) -> (usize, usize) {
264 let max_parts_per_segment = constants::RESOURCE_MAX_EFFICIENT_SIZE.div_ceil(sdu.max(1));
265 let total = (total_segments as usize).saturating_mul(max_parts_per_segment);
266 let completed_segments = segment_index.saturating_sub(1) as usize;
267 let completed = completed_segments.saturating_mul(max_parts_per_segment);
268 let current = if current_total == 0 {
269 0
270 } else if current_total < max_parts_per_segment {
271 let scaled =
272 (current_done as f64) * (max_parts_per_segment as f64 / current_total as f64);
273 scaled.floor() as usize
274 } else {
275 current_done
276 };
277 (completed.saturating_add(current).min(total), total)
278 }
279
280 fn resource_hash_key(hash: &[u8]) -> Option<[u8; 32]> {
281 let mut key = [0u8; 32];
282 if hash.len() != key.len() {
283 return None;
284 }
285 key.copy_from_slice(hash);
286 Some(key)
287 }
288
289 fn incoming_split_progress(split: &IncomingSplitTransfer, sdu: usize) -> (usize, usize) {
290 Self::split_progress_parts(
291 split.current_segment_index,
292 split.total_segments,
293 split.current_received_parts,
294 split.current_total_parts,
295 sdu,
296 )
297 }
298
299 fn outgoing_split_progress(split: &OutgoingSplitTransfer, sdu: usize) -> (usize, usize) {
300 Self::split_progress_parts(
301 split.current_segment_index,
302 split.total_segments,
303 split.current_sent_parts,
304 split.current_total_parts,
305 sdu,
306 )
307 }
308
309 pub fn new() -> Self {
311 LinkManager {
312 links: HashMap::new(),
313 link_destinations: HashMap::new(),
314 request_handlers: Vec::new(),
315 management_paths: Vec::new(),
316 }
317 }
318
319 pub fn register_management_path(&mut self, path_hash: [u8; 16]) {
323 if !self.management_paths.contains(&path_hash) {
324 self.management_paths.push(path_hash);
325 }
326 }
327
328 pub fn get_derived_key(&self, link_id: &LinkId) -> Option<Vec<u8>> {
330 self.links
331 .get(link_id)
332 .and_then(|link| link.engine.derived_key().map(|dk| dk.to_vec()))
333 }
334
335 pub fn get_link_route_hint(&self, link_id: &LinkId) -> Option<LinkRouteHint> {
337 self.links.get(link_id).and_then(|link| {
338 link.route_interface.map(|interface| LinkRouteHint {
339 interface,
340 transport_id: link.route_transport_id,
341 })
342 })
343 }
344
345 pub fn set_link_route_hint(
347 &mut self,
348 link_id: &LinkId,
349 interface: rns_core::transport::types::InterfaceId,
350 transport_id: Option<[u8; 16]>,
351 ) -> bool {
352 let Some(link) = self.links.get_mut(link_id) else {
353 return false;
354 };
355 link.route_interface = Some(interface);
356 link.route_transport_id = transport_id;
357 true
358 }
359
360 pub fn register_link_destination(
362 &mut self,
363 dest_hash: [u8; 16],
364 sig_prv: Ed25519PrivateKey,
365 sig_pub_bytes: [u8; 32],
366 resource_strategy: ResourceStrategy,
367 ) {
368 self.link_destinations.insert(
369 dest_hash,
370 LinkDestination {
371 sig_prv,
372 sig_pub_bytes,
373 resource_strategy,
374 },
375 );
376 }
377
378 pub fn deregister_link_destination(&mut self, dest_hash: &[u8; 16]) {
380 self.link_destinations.remove(dest_hash);
381 }
382
383 pub fn register_request_handler<F>(
389 &mut self,
390 path: &str,
391 allowed_list: Option<Vec<[u8; 16]>>,
392 handler: F,
393 ) where
394 F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>>
395 + Send
396 + 'static,
397 {
398 let path_hash = compute_path_hash(path);
399 self.request_handlers.push(RequestHandlerEntry {
400 path: path.to_string(),
401 path_hash,
402 allowed_list,
403 handler: Box::new(move |link_id, p, data, remote| {
404 handler(link_id, p, data, remote).map(RequestResponse::Bytes)
405 }),
406 });
407 }
408
409 pub fn register_request_handler_response<F>(
411 &mut self,
412 path: &str,
413 allowed_list: Option<Vec<[u8; 16]>>,
414 handler: F,
415 ) where
416 F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<RequestResponse>
417 + Send
418 + 'static,
419 {
420 let path_hash = compute_path_hash(path);
421 self.request_handlers.push(RequestHandlerEntry {
422 path: path.to_string(),
423 path_hash,
424 allowed_list,
425 handler: Box::new(handler),
426 });
427 }
428
429 pub fn create_link(
437 &mut self,
438 dest_hash: &[u8; 16],
439 dest_sig_pub_bytes: &[u8; 32],
440 hops: u8,
441 mtu: u32,
442 rng: &mut dyn Rng,
443 ) -> (LinkId, Vec<LinkManagerAction>) {
444 let mode = LinkMode::Aes256Cbc;
445 let (mut engine, request_data) =
446 LinkEngine::new_initiator(dest_hash, hops, mode, Some(mtu), time::now(), rng);
447
448 let flags = PacketFlags {
450 header_type: constants::HEADER_1,
451 context_flag: constants::FLAG_UNSET,
452 transport_type: constants::TRANSPORT_BROADCAST,
453 destination_type: constants::DESTINATION_SINGLE,
454 packet_type: constants::PACKET_TYPE_LINKREQUEST,
455 };
456
457 let packet = match RawPacket::pack(
458 flags,
459 0,
460 dest_hash,
461 None,
462 constants::CONTEXT_NONE,
463 &request_data,
464 ) {
465 Ok(p) => p,
466 Err(_) => {
467 return ([0u8; 16], Vec::new());
469 }
470 };
471
472 engine.set_link_id_from_hashable(&packet.get_hashable_part(), request_data.len());
473 let link_id = *engine.link_id();
474
475 let managed = ManagedLink {
476 engine,
477 channel: None,
478 pending_channel_packets: HashMap::new(),
479 channel_send_ok: 0,
480 channel_send_not_ready: 0,
481 channel_send_too_big: 0,
482 channel_send_other_error: 0,
483 channel_messages_received: 0,
484 channel_proofs_sent: 0,
485 channel_proofs_received: 0,
486 dest_hash: *dest_hash,
487 remote_identity: None,
488 dest_sig_pub_bytes: Some(*dest_sig_pub_bytes),
489 incoming_resources: Vec::new(),
490 outgoing_resources: Vec::new(),
491 incoming_splits: HashMap::new(),
492 outgoing_splits: HashMap::new(),
493 resource_strategy: ResourceStrategy::default(),
494 route_interface: None,
495 route_transport_id: None,
496 };
497 self.links.insert(link_id, managed);
498
499 let mut actions = Vec::new();
500 actions.push(LinkManagerAction::RegisterLinkDest { link_id });
502 actions.push(LinkManagerAction::SendPacket {
504 raw: packet.raw,
505 dest_type: constants::DESTINATION_LINK,
506 attached_interface: None,
507 });
508
509 (link_id, actions)
510 }
511
512 pub fn handle_local_delivery(
518 &mut self,
519 dest_hash: [u8; 16],
520 raw: &[u8],
521 packet_hash: [u8; 32],
522 receiving_interface: rns_core::transport::types::InterfaceId,
523 rng: &mut dyn Rng,
524 ) -> Vec<LinkManagerAction> {
525 let packet = match RawPacket::unpack(raw) {
526 Ok(p) => p,
527 Err(_) => return Vec::new(),
528 };
529
530 match packet.flags.packet_type {
531 constants::PACKET_TYPE_LINKREQUEST => {
532 self.handle_linkrequest(&dest_hash, &packet, receiving_interface, rng)
533 }
534 constants::PACKET_TYPE_PROOF if packet.context == constants::CONTEXT_LRPROOF => {
535 self.handle_lrproof(&dest_hash, &packet, receiving_interface, rng)
537 }
538 constants::PACKET_TYPE_PROOF => self.handle_link_proof(&dest_hash, &packet, rng),
539 constants::PACKET_TYPE_DATA => {
540 self.handle_link_data(&dest_hash, &packet, packet_hash, receiving_interface, rng)
541 }
542 _ => Vec::new(),
543 }
544 }
545
546 fn handle_linkrequest(
548 &mut self,
549 dest_hash: &[u8; 16],
550 packet: &RawPacket,
551 receiving_interface: rns_core::transport::types::InterfaceId,
552 rng: &mut dyn Rng,
553 ) -> Vec<LinkManagerAction> {
554 let ld = match self.link_destinations.get(dest_hash) {
556 Some(ld) => ld,
557 None => return Vec::new(),
558 };
559
560 let hashable = packet.get_hashable_part();
561 let now = time::now();
562
563 let (engine, lrproof_data) = match LinkEngine::new_responder(
565 &ld.sig_prv,
566 &ld.sig_pub_bytes,
567 &packet.data,
568 &hashable,
569 dest_hash,
570 packet.hops,
571 now,
572 rng,
573 ) {
574 Ok(r) => r,
575 Err(e) => {
576 log::debug!("LINKREQUEST rejected: {}", e);
577 return Vec::new();
578 }
579 };
580
581 let link_id = *engine.link_id();
582 log::debug!(
583 "LINKREQUEST accepted: link={:02x?} iface={} header_type={} transport_id_present={} hops={}",
584 &link_id[..4],
585 receiving_interface.0,
586 packet.flags.header_type,
587 packet.transport_id.is_some(),
588 packet.hops
589 );
590
591 let managed = ManagedLink {
592 engine,
593 channel: None,
594 pending_channel_packets: HashMap::new(),
595 channel_send_ok: 0,
596 channel_send_not_ready: 0,
597 channel_send_too_big: 0,
598 channel_send_other_error: 0,
599 channel_messages_received: 0,
600 channel_proofs_sent: 0,
601 channel_proofs_received: 0,
602 dest_hash: *dest_hash,
603 remote_identity: None,
604 dest_sig_pub_bytes: None,
605 incoming_resources: Vec::new(),
606 outgoing_resources: Vec::new(),
607 incoming_splits: HashMap::new(),
608 outgoing_splits: HashMap::new(),
609 resource_strategy: ld.resource_strategy,
610 route_interface: Some(receiving_interface),
611 route_transport_id: if packet.flags.header_type == constants::HEADER_2 {
612 packet.transport_id
613 } else {
614 None
615 },
616 };
617 self.links.insert(link_id, managed);
618
619 let flags = PacketFlags {
621 header_type: constants::HEADER_1,
622 context_flag: constants::FLAG_UNSET,
623 transport_type: constants::TRANSPORT_BROADCAST,
624 destination_type: constants::DESTINATION_LINK,
625 packet_type: constants::PACKET_TYPE_PROOF,
626 };
627
628 let mut actions = Vec::new();
629
630 actions.push(LinkManagerAction::RegisterLinkDest { link_id });
632
633 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
634 flags,
635 packet.hops,
636 &link_id,
637 None,
638 constants::CONTEXT_LRPROOF,
639 &lrproof_data,
640 ) {
641 log::debug!(
642 "LRPROOF queued: link={:02x?} route_iface={} route_tid_present={} hops={}",
643 &link_id[..4],
644 receiving_interface.0,
645 packet.transport_id.is_some(),
646 packet.hops
647 );
648 actions.push(LinkManagerAction::SendPacket {
649 raw,
650 dest_type: constants::DESTINATION_LINK,
651 attached_interface: None,
652 });
653 }
654
655 if packet.hops != 0 {
658 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
659 flags,
660 0,
661 &link_id,
662 None,
663 constants::CONTEXT_LRPROOF,
664 &lrproof_data,
665 ) {
666 log::debug!(
667 "LRPROOF fallback queued: link={:02x?} route_iface={} hops=0",
668 &link_id[..4],
669 receiving_interface.0
670 );
671 actions.push(LinkManagerAction::SendPacket {
672 raw,
673 dest_type: constants::DESTINATION_LINK,
674 attached_interface: None,
675 });
676 }
677 }
678
679 if packet.hops < u8::MAX {
683 let hops_plus_one = packet.hops + 1;
684 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
685 flags,
686 hops_plus_one,
687 &link_id,
688 None,
689 constants::CONTEXT_LRPROOF,
690 &lrproof_data,
691 ) {
692 log::debug!(
693 "LRPROOF +1 queued: link={:02x?} route_iface={} hops={}",
694 &link_id[..4],
695 receiving_interface.0,
696 hops_plus_one
697 );
698 actions.push(LinkManagerAction::SendPacket {
699 raw,
700 dest_type: constants::DESTINATION_LINK,
701 attached_interface: None,
702 });
703 }
704 }
705
706 actions.push(LinkManagerAction::LinkRequestReceived {
708 link_id,
709 receiving_interface,
710 });
711
712 actions
713 }
714
715 fn handle_link_proof(
716 &mut self,
717 link_id: &LinkId,
718 packet: &RawPacket,
719 rng: &mut dyn Rng,
720 ) -> Vec<LinkManagerAction> {
721 if packet.data.len() < 32 {
722 return Vec::new();
723 }
724
725 let mut tracked_hash = [0u8; 32];
726 tracked_hash.copy_from_slice(&packet.data[..32]);
727
728 let Some(link) = self.links.get_mut(link_id) else {
729 return Vec::new();
730 };
731 let Some(sequence) = link.pending_channel_packets.remove(&tracked_hash) else {
732 return Vec::new();
733 };
734 link.channel_proofs_received += 1;
735 let Some(channel) = link.channel.as_mut() else {
736 return Vec::new();
737 };
738
739 let chan_actions = channel.packet_delivered(sequence);
740 let _ = channel;
741 let _ = link;
742 self.process_channel_actions(link_id, chan_actions, rng)
743 }
744
745 fn build_link_packet_proof(
746 &mut self,
747 link_id: &LinkId,
748 packet_hash: &[u8; 32],
749 ) -> Vec<LinkManagerAction> {
750 let dest_hash = match self.links.get(link_id) {
751 Some(link) => link.dest_hash,
752 None => return Vec::new(),
753 };
754 let Some(ld) = self.link_destinations.get(&dest_hash) else {
755 return Vec::new();
756 };
757 if let Some(link) = self.links.get_mut(link_id) {
758 link.channel_proofs_sent += 1;
759 }
760
761 let signature = ld.sig_prv.sign(packet_hash);
762 let mut proof_data = Vec::with_capacity(96);
763 proof_data.extend_from_slice(packet_hash);
764 proof_data.extend_from_slice(&signature);
765
766 let flags = PacketFlags {
767 header_type: constants::HEADER_1,
768 context_flag: constants::FLAG_UNSET,
769 transport_type: constants::TRANSPORT_BROADCAST,
770 destination_type: constants::DESTINATION_LINK,
771 packet_type: constants::PACKET_TYPE_PROOF,
772 };
773 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
774 flags,
775 0,
776 link_id,
777 None,
778 constants::CONTEXT_NONE,
779 &proof_data,
780 ) {
781 vec![LinkManagerAction::SendPacket {
782 raw,
783 dest_type: constants::DESTINATION_LINK,
784 attached_interface: None,
785 }]
786 } else {
787 Vec::new()
788 }
789 }
790
791 fn handle_lrproof(
793 &mut self,
794 link_id_bytes: &[u8; 16],
795 packet: &RawPacket,
796 receiving_interface: rns_core::transport::types::InterfaceId,
797 rng: &mut dyn Rng,
798 ) -> Vec<LinkManagerAction> {
799 let link = match self.links.get_mut(link_id_bytes) {
800 Some(l) => l,
801 None => return Vec::new(),
802 };
803
804 link.route_interface = Some(receiving_interface);
805 if packet.flags.header_type == constants::HEADER_2 {
806 if let Some(transport_id) = packet.transport_id {
807 link.route_transport_id = Some(transport_id);
808 }
809 }
810 log::debug!(
811 "LRPROOF received: link={:02x?} iface={} header_type={} transport_id_present={}",
812 &link_id_bytes[..4],
813 receiving_interface.0,
814 packet.flags.header_type,
815 packet.transport_id.is_some()
816 );
817
818 if link.engine.state() != LinkState::Pending || !link.engine.is_initiator() {
819 return Vec::new();
820 }
821
822 let dest_sig_pub_bytes = match link.dest_sig_pub_bytes {
824 Some(b) => b,
825 None => {
826 log::debug!("LRPROOF: no destination signing key available");
827 return Vec::new();
828 }
829 };
830
831 let now = time::now();
832 let (lrrtt_encrypted, link_actions) =
833 match link
834 .engine
835 .handle_lrproof(&packet.data, &dest_sig_pub_bytes, now, rng)
836 {
837 Ok(r) => r,
838 Err(e) => {
839 log::debug!("LRPROOF validation failed: {}", e);
840 return Vec::new();
841 }
842 };
843
844 let link_id = *link.engine.link_id();
845 let mut actions = Vec::new();
846
847 actions.extend(self.process_link_actions(&link_id, &link_actions));
849
850 let flags = PacketFlags {
852 header_type: constants::HEADER_1,
853 context_flag: constants::FLAG_UNSET,
854 transport_type: constants::TRANSPORT_BROADCAST,
855 destination_type: constants::DESTINATION_LINK,
856 packet_type: constants::PACKET_TYPE_DATA,
857 };
858
859 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
860 flags,
861 0,
862 &link_id,
863 None,
864 constants::CONTEXT_LRRTT,
865 &lrrtt_encrypted,
866 ) {
867 actions.push(LinkManagerAction::SendPacket {
868 raw,
869 dest_type: constants::DESTINATION_LINK,
870 attached_interface: None,
871 });
872 }
873
874 if let Some(link) = self.links.get_mut(&link_id) {
876 if link.engine.state() == LinkState::Active {
877 let rtt = link.engine.rtt().unwrap_or(1.0);
878 link.channel = Some(Channel::new(rtt));
879 }
880 }
881
882 actions
883 }
884
885 fn handle_link_data(
891 &mut self,
892 link_id_bytes: &[u8; 16],
893 packet: &RawPacket,
894 packet_hash: [u8; 32],
895 receiving_interface: rns_core::transport::types::InterfaceId,
896 rng: &mut dyn Rng,
897 ) -> Vec<LinkManagerAction> {
898 enum LinkDataResult<'a> {
900 Lrrtt {
901 link_id: LinkId,
902 link_actions: Vec<LinkAction>,
903 },
904 Identify {
905 link_id: LinkId,
906 link_actions: Vec<LinkAction>,
907 },
908 Keepalive {
909 link_id: LinkId,
910 inbound_actions: Vec<LinkAction>,
911 },
912 LinkClose {
913 link_id: LinkId,
914 teardown_actions: Vec<LinkAction>,
915 },
916 Channel {
917 link_id: LinkId,
918 inbound_actions: Vec<LinkAction>,
919 plaintext: Vec<u8>,
920 packet_hash: [u8; 32],
921 },
922 Request {
923 link_id: LinkId,
924 inbound_actions: Vec<LinkAction>,
925 plaintext: Vec<u8>,
926 request_id: [u8; 16],
927 },
928 Response {
929 link_id: LinkId,
930 inbound_actions: Vec<LinkAction>,
931 plaintext: Vec<u8>,
932 },
933 Generic {
934 link_id: LinkId,
935 inbound_actions: Vec<LinkAction>,
936 plaintext: Vec<u8>,
937 context: u8,
938 packet_hash: [u8; 32],
939 },
940 ResourceAdv {
942 link_id: LinkId,
943 inbound_actions: Vec<LinkAction>,
944 plaintext: Vec<u8>,
945 },
946 ResourceReq {
948 link_id: LinkId,
949 inbound_actions: Vec<LinkAction>,
950 plaintext: Vec<u8>,
951 },
952 ResourceHmu {
954 link_id: LinkId,
955 inbound_actions: Vec<LinkAction>,
956 plaintext: Vec<u8>,
957 },
958 ResourcePart {
960 link_id: LinkId,
961 inbound_actions: Vec<LinkAction>,
962 raw_data: &'a [u8],
963 },
964 ResourcePrf {
966 link_id: LinkId,
967 inbound_actions: Vec<LinkAction>,
968 plaintext: Vec<u8>,
969 },
970 ResourceIcl {
972 link_id: LinkId,
973 inbound_actions: Vec<LinkAction>,
974 },
975 ResourceRcl {
977 link_id: LinkId,
978 inbound_actions: Vec<LinkAction>,
979 },
980 Error,
981 }
982
983 let result = {
984 let link = match self.links.get_mut(link_id_bytes) {
985 Some(l) => l,
986 None => return Vec::new(),
987 };
988
989 link.route_interface = Some(receiving_interface);
990 if packet.flags.header_type == constants::HEADER_2 {
991 if let Some(transport_id) = packet.transport_id {
992 link.route_transport_id = Some(transport_id);
993 }
994 } else {
995 link.route_transport_id = None;
996 }
997
998 match packet.context {
999 constants::CONTEXT_LRRTT => {
1000 match link.engine.handle_lrrtt(&packet.data, time::now()) {
1001 Ok(link_actions) => {
1002 let link_id = *link.engine.link_id();
1003 LinkDataResult::Lrrtt {
1004 link_id,
1005 link_actions,
1006 }
1007 }
1008 Err(e) => {
1009 log::debug!("LRRTT handling failed: {}", e);
1010 LinkDataResult::Error
1011 }
1012 }
1013 }
1014 constants::CONTEXT_LINKIDENTIFY => {
1015 match link.engine.handle_identify(&packet.data) {
1016 Ok(link_actions) => {
1017 let link_id = *link.engine.link_id();
1018 link.remote_identity = link.engine.remote_identity().cloned();
1019 LinkDataResult::Identify {
1020 link_id,
1021 link_actions,
1022 }
1023 }
1024 Err(e) => {
1025 log::debug!("LINKIDENTIFY failed: {}", e);
1026 LinkDataResult::Error
1027 }
1028 }
1029 }
1030 constants::CONTEXT_KEEPALIVE => {
1031 let inbound_actions = link.engine.record_inbound(time::now());
1032 let link_id = *link.engine.link_id();
1033 LinkDataResult::Keepalive {
1034 link_id,
1035 inbound_actions,
1036 }
1037 }
1038 constants::CONTEXT_LINKCLOSE => {
1039 let teardown_actions = link.engine.handle_teardown();
1040 let link_id = *link.engine.link_id();
1041 LinkDataResult::LinkClose {
1042 link_id,
1043 teardown_actions,
1044 }
1045 }
1046 constants::CONTEXT_CHANNEL => match link.engine.decrypt(&packet.data) {
1047 Ok(plaintext) => {
1048 let inbound_actions = link.engine.record_inbound(time::now());
1049 let link_id = *link.engine.link_id();
1050 LinkDataResult::Channel {
1051 link_id,
1052 inbound_actions,
1053 plaintext,
1054 packet_hash,
1055 }
1056 }
1057 Err(_) => LinkDataResult::Error,
1058 },
1059 constants::CONTEXT_REQUEST => match link.engine.decrypt(&packet.data) {
1060 Ok(plaintext) => {
1061 let inbound_actions = link.engine.record_inbound(time::now());
1062 let link_id = *link.engine.link_id();
1063 let request_id = packet.get_truncated_hash();
1064 LinkDataResult::Request {
1065 link_id,
1066 inbound_actions,
1067 plaintext,
1068 request_id,
1069 }
1070 }
1071 Err(_) => LinkDataResult::Error,
1072 },
1073 constants::CONTEXT_RESPONSE => match link.engine.decrypt(&packet.data) {
1074 Ok(plaintext) => {
1075 let inbound_actions = link.engine.record_inbound(time::now());
1076 let link_id = *link.engine.link_id();
1077 LinkDataResult::Response {
1078 link_id,
1079 inbound_actions,
1080 plaintext,
1081 }
1082 }
1083 Err(_) => LinkDataResult::Error,
1084 },
1085 constants::CONTEXT_RESOURCE_ADV => match link.engine.decrypt(&packet.data) {
1087 Ok(plaintext) => {
1088 let inbound_actions = link.engine.record_inbound(time::now());
1089 let link_id = *link.engine.link_id();
1090 LinkDataResult::ResourceAdv {
1091 link_id,
1092 inbound_actions,
1093 plaintext,
1094 }
1095 }
1096 Err(_) => LinkDataResult::Error,
1097 },
1098 constants::CONTEXT_RESOURCE_REQ => match link.engine.decrypt(&packet.data) {
1099 Ok(plaintext) => {
1100 let inbound_actions = link.engine.record_inbound(time::now());
1101 let link_id = *link.engine.link_id();
1102 LinkDataResult::ResourceReq {
1103 link_id,
1104 inbound_actions,
1105 plaintext,
1106 }
1107 }
1108 Err(_) => LinkDataResult::Error,
1109 },
1110 constants::CONTEXT_RESOURCE_HMU => match link.engine.decrypt(&packet.data) {
1111 Ok(plaintext) => {
1112 let inbound_actions = link.engine.record_inbound(time::now());
1113 let link_id = *link.engine.link_id();
1114 LinkDataResult::ResourceHmu {
1115 link_id,
1116 inbound_actions,
1117 plaintext,
1118 }
1119 }
1120 Err(_) => LinkDataResult::Error,
1121 },
1122 constants::CONTEXT_RESOURCE => {
1123 let inbound_actions = link.engine.record_inbound(time::now());
1125 let link_id = *link.engine.link_id();
1126 LinkDataResult::ResourcePart {
1127 link_id,
1128 inbound_actions,
1129 raw_data: &packet.data,
1130 }
1131 }
1132 constants::CONTEXT_RESOURCE_PRF => match link.engine.decrypt(&packet.data) {
1133 Ok(plaintext) => {
1134 let inbound_actions = link.engine.record_inbound(time::now());
1135 let link_id = *link.engine.link_id();
1136 LinkDataResult::ResourcePrf {
1137 link_id,
1138 inbound_actions,
1139 plaintext,
1140 }
1141 }
1142 Err(_) => LinkDataResult::Error,
1143 },
1144 constants::CONTEXT_RESOURCE_ICL => {
1145 let _ = link.engine.decrypt(&packet.data); let inbound_actions = link.engine.record_inbound(time::now());
1147 let link_id = *link.engine.link_id();
1148 LinkDataResult::ResourceIcl {
1149 link_id,
1150 inbound_actions,
1151 }
1152 }
1153 constants::CONTEXT_RESOURCE_RCL => {
1154 let _ = link.engine.decrypt(&packet.data); let inbound_actions = link.engine.record_inbound(time::now());
1156 let link_id = *link.engine.link_id();
1157 LinkDataResult::ResourceRcl {
1158 link_id,
1159 inbound_actions,
1160 }
1161 }
1162 _ => match link.engine.decrypt(&packet.data) {
1163 Ok(plaintext) => {
1164 let inbound_actions = link.engine.record_inbound(time::now());
1165 let link_id = *link.engine.link_id();
1166 LinkDataResult::Generic {
1167 link_id,
1168 inbound_actions,
1169 plaintext,
1170 context: packet.context,
1171 packet_hash,
1172 }
1173 }
1174 Err(_) => LinkDataResult::Error,
1175 },
1176 }
1177 }; let mut actions = Vec::new();
1181 match result {
1182 LinkDataResult::Lrrtt {
1183 link_id,
1184 link_actions,
1185 } => {
1186 actions.extend(self.process_link_actions(&link_id, &link_actions));
1187 if let Some(link) = self.links.get_mut(&link_id) {
1189 if link.engine.state() == LinkState::Active {
1190 let rtt = link.engine.rtt().unwrap_or(1.0);
1191 link.channel = Some(Channel::new(rtt));
1192 }
1193 }
1194 }
1195 LinkDataResult::Identify {
1196 link_id,
1197 link_actions,
1198 } => {
1199 actions.extend(self.process_link_actions(&link_id, &link_actions));
1200 }
1201 LinkDataResult::Keepalive {
1202 link_id,
1203 inbound_actions,
1204 } => {
1205 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1206 }
1212 LinkDataResult::LinkClose {
1213 link_id,
1214 teardown_actions,
1215 } => {
1216 actions.extend(self.process_link_actions(&link_id, &teardown_actions));
1217 }
1218 LinkDataResult::Channel {
1219 link_id,
1220 inbound_actions,
1221 plaintext,
1222 packet_hash,
1223 } => {
1224 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1225 if let Some(link) = self.links.get_mut(&link_id) {
1227 if let Some(ref mut channel) = link.channel {
1228 let chan_actions = channel.receive(&plaintext, time::now());
1229 link.channel_messages_received += chan_actions
1230 .iter()
1231 .filter(|action| {
1232 matches!(
1233 action,
1234 rns_core::channel::ChannelAction::MessageReceived { .. }
1235 )
1236 })
1237 .count()
1238 as u64;
1239 let _ = link;
1241 actions.extend(self.process_channel_actions(&link_id, chan_actions, rng));
1242 }
1243 }
1244 actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1245 }
1246 LinkDataResult::Request {
1247 link_id,
1248 inbound_actions,
1249 plaintext,
1250 request_id,
1251 } => {
1252 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1253 actions.extend(self.handle_request(&link_id, &plaintext, request_id, rng));
1254 }
1255 LinkDataResult::Response {
1256 link_id,
1257 inbound_actions,
1258 plaintext,
1259 } => {
1260 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1261 actions.extend(self.handle_response(&link_id, &plaintext, None));
1263 }
1264 LinkDataResult::Generic {
1265 link_id,
1266 inbound_actions,
1267 plaintext,
1268 context,
1269 packet_hash,
1270 } => {
1271 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1272 actions.push(LinkManagerAction::LinkDataReceived {
1273 link_id,
1274 context,
1275 data: plaintext,
1276 });
1277
1278 actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1279 }
1280 LinkDataResult::ResourceAdv {
1281 link_id,
1282 inbound_actions,
1283 plaintext,
1284 } => {
1285 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1286 actions.extend(self.handle_resource_adv(&link_id, &plaintext, rng));
1287 }
1288 LinkDataResult::ResourceReq {
1289 link_id,
1290 inbound_actions,
1291 plaintext,
1292 } => {
1293 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1294 actions.extend(self.handle_resource_req(&link_id, &plaintext, rng));
1295 }
1296 LinkDataResult::ResourceHmu {
1297 link_id,
1298 inbound_actions,
1299 plaintext,
1300 } => {
1301 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1302 actions.extend(self.handle_resource_hmu(&link_id, &plaintext, rng));
1303 }
1304 LinkDataResult::ResourcePart {
1305 link_id,
1306 inbound_actions,
1307 raw_data,
1308 } => {
1309 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1310 actions.extend(self.handle_resource_part(&link_id, &raw_data, rng));
1311 }
1312 LinkDataResult::ResourcePrf {
1313 link_id,
1314 inbound_actions,
1315 plaintext,
1316 } => {
1317 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1318 actions.extend(self.handle_resource_prf(&link_id, &plaintext, rng));
1319 }
1320 LinkDataResult::ResourceIcl {
1321 link_id,
1322 inbound_actions,
1323 } => {
1324 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1325 actions.extend(self.handle_resource_icl(&link_id));
1326 }
1327 LinkDataResult::ResourceRcl {
1328 link_id,
1329 inbound_actions,
1330 } => {
1331 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1332 actions.extend(self.handle_resource_rcl(&link_id));
1333 }
1334 LinkDataResult::Error => {}
1335 }
1336
1337 actions
1338 }
1339
1340 fn handle_request(
1342 &mut self,
1343 link_id: &LinkId,
1344 plaintext: &[u8],
1345 request_id: [u8; 16],
1346 rng: &mut dyn Rng,
1347 ) -> Vec<LinkManagerAction> {
1348 use rns_core::msgpack::{self, Value};
1349
1350 let arr = match msgpack::unpack_exact(plaintext) {
1352 Ok(Value::Array(arr)) if arr.len() >= 3 => arr,
1353 _ => return Vec::new(),
1354 };
1355
1356 let path_hash_bytes = match &arr[1] {
1357 Value::Bin(b) if b.len() == 16 => b,
1358 _ => return Vec::new(),
1359 };
1360 let mut path_hash = [0u8; 16];
1361 path_hash.copy_from_slice(path_hash_bytes);
1362
1363 let request_data = msgpack::pack(&arr[2]);
1365
1366 if self.management_paths.contains(&path_hash) {
1368 let remote_identity = self
1369 .links
1370 .get(link_id)
1371 .and_then(|l| l.remote_identity)
1372 .map(|(h, k)| (h, k));
1373 return vec![LinkManagerAction::ManagementRequest {
1374 link_id: *link_id,
1375 path_hash,
1376 data: request_data,
1377 request_id,
1378 remote_identity,
1379 }];
1380 }
1381
1382 let handler_idx = self
1384 .request_handlers
1385 .iter()
1386 .position(|h| h.path_hash == path_hash);
1387 let handler_idx = match handler_idx {
1388 Some(i) => i,
1389 None => return Vec::new(),
1390 };
1391
1392 let remote_identity = self
1394 .links
1395 .get(link_id)
1396 .and_then(|l| l.remote_identity.as_ref());
1397 let handler = &self.request_handlers[handler_idx];
1398 if let Some(ref allowed) = handler.allowed_list {
1399 match remote_identity {
1400 Some((identity_hash, _)) => {
1401 if !allowed.contains(identity_hash) {
1402 log::debug!("Request denied: identity not in allowed list");
1403 return Vec::new();
1404 }
1405 }
1406 None => {
1407 log::debug!("Request denied: peer not identified");
1408 return Vec::new();
1409 }
1410 }
1411 }
1412
1413 let path = handler.path.clone();
1415 let response = (handler.handler)(*link_id, &path, &request_data, remote_identity);
1416
1417 let mut actions = Vec::new();
1418 if let Some(response) = response {
1419 match response {
1420 RequestResponse::Bytes(response_data) => {
1421 let mut response_actions =
1422 self.build_response_packet(link_id, &request_id, &response_data, rng);
1423 if response_actions.is_empty() {
1424 response_actions.extend(self.send_response_resource(
1425 link_id,
1426 &request_id,
1427 &response_data,
1428 None,
1429 true,
1430 rng,
1431 ));
1432 }
1433 actions.extend(response_actions);
1434 }
1435 RequestResponse::Resource {
1436 data,
1437 metadata,
1438 auto_compress,
1439 } => {
1440 actions.extend(self.send_response_resource(
1441 link_id,
1442 &request_id,
1443 &data,
1444 metadata.as_deref(),
1445 auto_compress,
1446 rng,
1447 ));
1448 }
1449 }
1450 }
1451
1452 actions
1453 }
1454
1455 fn build_response_packet(
1458 &self,
1459 link_id: &LinkId,
1460 request_id: &[u8; 16],
1461 response_data: &[u8],
1462 rng: &mut dyn Rng,
1463 ) -> Vec<LinkManagerAction> {
1464 use rns_core::msgpack::{self, Value};
1465
1466 let response_value = msgpack::unpack_exact(response_data)
1467 .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1468
1469 let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1470 let response_plaintext = msgpack::pack(&response_array);
1471
1472 let mut actions = Vec::new();
1473 if let Some(link) = self.links.get(link_id) {
1474 if let Ok(encrypted) = link.engine.encrypt(&response_plaintext, rng) {
1475 let flags = PacketFlags {
1476 header_type: constants::HEADER_1,
1477 context_flag: constants::FLAG_UNSET,
1478 transport_type: constants::TRANSPORT_BROADCAST,
1479 destination_type: constants::DESTINATION_LINK,
1480 packet_type: constants::PACKET_TYPE_DATA,
1481 };
1482 let max_mtu = link.engine.mtu() as usize;
1483 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash_with_max_mtu(
1484 flags,
1485 0,
1486 link_id,
1487 None,
1488 constants::CONTEXT_RESPONSE,
1489 &encrypted,
1490 max_mtu,
1491 ) {
1492 actions.push(LinkManagerAction::SendPacket {
1493 raw,
1494 dest_type: constants::DESTINATION_LINK,
1495 attached_interface: None,
1496 });
1497 }
1498 }
1499 }
1500 actions
1501 }
1502
1503 fn send_response_resource(
1504 &mut self,
1505 link_id: &LinkId,
1506 request_id: &[u8; 16],
1507 response_data: &[u8],
1508 metadata: Option<&[u8]>,
1509 auto_compress: bool,
1510 rng: &mut dyn Rng,
1511 ) -> Vec<LinkManagerAction> {
1512 use rns_core::msgpack::{self, Value};
1513
1514 let link = match self.links.get_mut(link_id) {
1515 Some(l) => l,
1516 None => return Vec::new(),
1517 };
1518
1519 if link.engine.state() != LinkState::Active {
1520 return Vec::new();
1521 }
1522
1523 let now = time::now();
1524
1525 let response_value = msgpack::unpack_exact(response_data)
1529 .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1530 let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1531 let resource_payload = msgpack::pack(&response_array);
1532
1533 let senders = match Self::build_resource_senders(
1534 link,
1535 &resource_payload,
1536 metadata,
1537 auto_compress,
1538 true, Some(request_id.to_vec()),
1540 rng,
1541 now,
1542 ) {
1543 Ok(s) => s,
1544 Err(e) => {
1545 log::debug!("Failed to create response ResourceSender: {}", e);
1546 return Vec::new();
1547 }
1548 };
1549
1550 let adv_actions = Self::start_resource_senders(link, senders, now);
1551
1552 let _ = link;
1553 self.process_resource_actions(link_id, adv_actions, rng)
1554 }
1555
1556 pub fn send_management_response(
1559 &mut self,
1560 link_id: &LinkId,
1561 request_id: &[u8; 16],
1562 response_data: &[u8],
1563 rng: &mut dyn Rng,
1564 ) -> Vec<LinkManagerAction> {
1565 let mut actions = self.build_response_packet(link_id, request_id, response_data, rng);
1566 if actions.is_empty() {
1567 actions.extend(self.send_response_resource(
1568 link_id,
1569 request_id,
1570 response_data,
1571 None,
1572 true,
1573 rng,
1574 ));
1575 }
1576 actions
1577 }
1578
1579 pub fn send_request(
1587 &self,
1588 link_id: &LinkId,
1589 path: &str,
1590 data: &[u8],
1591 rng: &mut dyn Rng,
1592 ) -> Vec<LinkManagerAction> {
1593 use rns_core::msgpack::{self, Value};
1594
1595 let link = match self.links.get(link_id) {
1596 Some(l) => l,
1597 None => return Vec::new(),
1598 };
1599
1600 if link.engine.state() != LinkState::Active {
1601 return Vec::new();
1602 }
1603
1604 let path_hash = compute_path_hash(path);
1605
1606 let data_value = msgpack::unpack_exact(data).unwrap_or_else(|_| Value::Bin(data.to_vec()));
1608
1609 let request_array = Value::Array(vec![
1611 Value::Float(time::now()),
1612 Value::Bin(path_hash.to_vec()),
1613 data_value,
1614 ]);
1615 let plaintext = msgpack::pack(&request_array);
1616
1617 let encrypted = match link.engine.encrypt(&plaintext, rng) {
1618 Ok(e) => e,
1619 Err(_) => return Vec::new(),
1620 };
1621
1622 let flags = PacketFlags {
1623 header_type: constants::HEADER_1,
1624 context_flag: constants::FLAG_UNSET,
1625 transport_type: constants::TRANSPORT_BROADCAST,
1626 destination_type: constants::DESTINATION_LINK,
1627 packet_type: constants::PACKET_TYPE_DATA,
1628 };
1629
1630 let mut actions = Vec::new();
1631 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1632 flags,
1633 0,
1634 link_id,
1635 None,
1636 constants::CONTEXT_REQUEST,
1637 &encrypted,
1638 ) {
1639 actions.push(LinkManagerAction::SendPacket {
1640 raw,
1641 dest_type: constants::DESTINATION_LINK,
1642 attached_interface: None,
1643 });
1644 }
1645 actions
1646 }
1647
1648 pub fn send_on_link(
1650 &self,
1651 link_id: &LinkId,
1652 plaintext: &[u8],
1653 context: u8,
1654 rng: &mut dyn Rng,
1655 ) -> Vec<LinkManagerAction> {
1656 let link = match self.links.get(link_id) {
1657 Some(l) => l,
1658 None => return Vec::new(),
1659 };
1660
1661 if link.engine.state() != LinkState::Active {
1662 return Vec::new();
1663 }
1664
1665 let encrypted = match link.engine.encrypt(plaintext, rng) {
1666 Ok(e) => e,
1667 Err(_) => return Vec::new(),
1668 };
1669
1670 let flags = PacketFlags {
1671 header_type: constants::HEADER_1,
1672 context_flag: constants::FLAG_UNSET,
1673 transport_type: constants::TRANSPORT_BROADCAST,
1674 destination_type: constants::DESTINATION_LINK,
1675 packet_type: constants::PACKET_TYPE_DATA,
1676 };
1677
1678 let mut actions = Vec::new();
1679 if let Ok((raw, _packet_hash)) =
1680 RawPacket::pack_raw_with_hash(flags, 0, link_id, None, context, &encrypted)
1681 {
1682 actions.push(LinkManagerAction::SendPacket {
1683 raw,
1684 dest_type: constants::DESTINATION_LINK,
1685 attached_interface: None,
1686 });
1687 }
1688 actions
1689 }
1690
1691 pub fn identify(
1693 &self,
1694 link_id: &LinkId,
1695 identity: &rns_crypto::identity::Identity,
1696 rng: &mut dyn Rng,
1697 ) -> Vec<LinkManagerAction> {
1698 let link = match self.links.get(link_id) {
1699 Some(l) => l,
1700 None => return Vec::new(),
1701 };
1702
1703 let encrypted = match link.engine.build_identify(identity, rng) {
1704 Ok(e) => e,
1705 Err(_) => return Vec::new(),
1706 };
1707
1708 let flags = PacketFlags {
1709 header_type: constants::HEADER_1,
1710 context_flag: constants::FLAG_UNSET,
1711 transport_type: constants::TRANSPORT_BROADCAST,
1712 destination_type: constants::DESTINATION_LINK,
1713 packet_type: constants::PACKET_TYPE_DATA,
1714 };
1715
1716 let mut actions = Vec::new();
1717 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1718 flags,
1719 0,
1720 link_id,
1721 None,
1722 constants::CONTEXT_LINKIDENTIFY,
1723 &encrypted,
1724 ) {
1725 actions.push(LinkManagerAction::SendPacket {
1726 raw,
1727 dest_type: constants::DESTINATION_LINK,
1728 attached_interface: None,
1729 });
1730 }
1731 actions
1732 }
1733
1734 pub fn teardown_link(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1736 let link = match self.links.get_mut(link_id) {
1737 Some(l) => l,
1738 None => return Vec::new(),
1739 };
1740
1741 let teardown_actions = link.engine.teardown();
1742 if let Some(ref mut channel) = link.channel {
1743 channel.shutdown();
1744 }
1745
1746 let mut actions = self.process_link_actions(link_id, &teardown_actions);
1747
1748 let flags = PacketFlags {
1750 header_type: constants::HEADER_1,
1751 context_flag: constants::FLAG_UNSET,
1752 transport_type: constants::TRANSPORT_BROADCAST,
1753 destination_type: constants::DESTINATION_LINK,
1754 packet_type: constants::PACKET_TYPE_DATA,
1755 };
1756 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1757 flags,
1758 0,
1759 link_id,
1760 None,
1761 constants::CONTEXT_LINKCLOSE,
1762 &[],
1763 ) {
1764 actions.push(LinkManagerAction::SendPacket {
1765 raw,
1766 dest_type: constants::DESTINATION_LINK,
1767 attached_interface: None,
1768 });
1769 }
1770
1771 actions
1772 }
1773
1774 pub fn teardown_all_links(&mut self) -> Vec<LinkManagerAction> {
1776 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
1777 let mut actions = Vec::new();
1778 for link_id in link_ids {
1779 actions.extend(self.teardown_link(&link_id));
1780 }
1781 actions
1782 }
1783
1784 fn handle_response(
1786 &self,
1787 link_id: &LinkId,
1788 plaintext: &[u8],
1789 metadata: Option<Vec<u8>>,
1790 ) -> Vec<LinkManagerAction> {
1791 use rns_core::msgpack;
1792
1793 let arr = match msgpack::unpack_exact(plaintext) {
1795 Ok(msgpack::Value::Array(arr)) if arr.len() >= 2 => arr,
1796 _ => return Vec::new(),
1797 };
1798
1799 let request_id_bytes = match &arr[0] {
1800 msgpack::Value::Bin(b) if b.len() == 16 => b,
1801 _ => return Vec::new(),
1802 };
1803 let mut request_id = [0u8; 16];
1804 request_id.copy_from_slice(request_id_bytes);
1805
1806 let response_data = msgpack::pack(&arr[1]);
1807
1808 vec![LinkManagerAction::ResponseReceived {
1809 link_id: *link_id,
1810 request_id,
1811 data: response_data,
1812 metadata,
1813 }]
1814 }
1815
1816 fn build_resource_senders(
1817 link: &ManagedLink,
1818 data: &[u8],
1819 metadata: Option<&[u8]>,
1820 auto_compress: bool,
1821 is_response: bool,
1822 request_id: Option<Vec<u8>>,
1823 rng: &mut dyn Rng,
1824 now: f64,
1825 ) -> Result<Vec<ResourceSender>, rns_core::resource::ResourceError> {
1826 let link_rtt = link.engine.rtt().unwrap_or(1.0);
1827 let resource_sdu = Self::resource_sdu_for_link(link);
1828 let metadata_overhead = metadata.map(|m| 3 + m.len()).unwrap_or(0);
1829 let logical_size = metadata_overhead + data.len();
1830
1831 if logical_size <= constants::RESOURCE_MAX_EFFICIENT_SIZE {
1832 let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1833 let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1834 link.engine
1835 .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1836 .unwrap_or_else(|_| plaintext.to_vec())
1837 };
1838 return ResourceSender::new(
1839 data,
1840 metadata,
1841 resource_sdu,
1842 &encrypt_fn,
1843 &Bzip2Compressor,
1844 rng,
1845 now,
1846 auto_compress,
1847 is_response,
1848 request_id,
1849 1,
1850 1,
1851 None,
1852 link_rtt,
1853 6.0,
1854 )
1855 .map(|sender| vec![sender]);
1856 }
1857
1858 if metadata_overhead > constants::RESOURCE_MAX_EFFICIENT_SIZE {
1859 return Err(rns_core::resource::ResourceError::TooLarge);
1860 }
1861
1862 let first_payload_len = core::cmp::min(
1863 data.len(),
1864 constants::RESOURCE_MAX_EFFICIENT_SIZE - metadata_overhead,
1865 );
1866 let remaining = data.len().saturating_sub(first_payload_len);
1867 let total_segments = 1 + remaining.div_ceil(constants::RESOURCE_MAX_EFFICIENT_SIZE) as u64;
1868
1869 let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1870 let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1871 link.engine
1872 .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1873 .unwrap_or_else(|_| plaintext.to_vec())
1874 };
1875
1876 let mut senders = Vec::new();
1877 let mut first = ResourceSender::new(
1878 &data[..first_payload_len],
1879 metadata,
1880 resource_sdu,
1881 &encrypt_fn,
1882 &Bzip2Compressor,
1883 rng,
1884 now,
1885 auto_compress,
1886 is_response,
1887 request_id.clone(),
1888 1,
1889 total_segments,
1890 None,
1891 link_rtt,
1892 6.0,
1893 )?;
1894 first.data_size = logical_size;
1895 let original_hash = first.original_hash;
1896 let has_metadata = metadata.is_some();
1897 senders.push(first);
1898
1899 let mut offset = first_payload_len;
1900 let mut segment_index = 2;
1901 while offset < data.len() {
1902 let end = core::cmp::min(offset + constants::RESOURCE_MAX_EFFICIENT_SIZE, data.len());
1903 let mut sender = ResourceSender::new(
1904 &data[offset..end],
1905 None,
1906 resource_sdu,
1907 &encrypt_fn,
1908 &Bzip2Compressor,
1909 rng,
1910 now,
1911 auto_compress,
1912 is_response,
1913 request_id.clone(),
1914 segment_index,
1915 total_segments,
1916 Some(original_hash),
1917 link_rtt,
1918 6.0,
1919 )?;
1920 sender.data_size = logical_size;
1921 sender.flags.has_metadata = has_metadata;
1922 senders.push(sender);
1923 segment_index += 1;
1924 offset = end;
1925 }
1926
1927 Ok(senders)
1928 }
1929
1930 fn start_resource_senders(
1931 link: &mut ManagedLink,
1932 mut senders: Vec<ResourceSender>,
1933 now: f64,
1934 ) -> Vec<ResourceAction> {
1935 if senders.is_empty() {
1936 return Vec::new();
1937 }
1938
1939 let mut first = senders.remove(0);
1940 let adv_actions = first.advertise(now);
1941
1942 if first.total_segments > 1 {
1943 let original_hash = first.original_hash;
1944 let split = OutgoingSplitTransfer {
1945 total_segments: first.total_segments,
1946 completed_segments: 0,
1947 current_segment_index: first.segment_index,
1948 current_sent_parts: 0,
1949 current_total_parts: first.total_parts(),
1950 };
1951 link.outgoing_splits.insert(original_hash, split);
1952 }
1953
1954 link.outgoing_resources.push(first);
1955 link.outgoing_resources.extend(senders);
1956 adv_actions
1957 }
1958
1959 fn handle_resource_adv(
1961 &mut self,
1962 link_id: &LinkId,
1963 adv_plaintext: &[u8],
1964 rng: &mut dyn Rng,
1965 ) -> Vec<LinkManagerAction> {
1966 let link = match self.links.get_mut(link_id) {
1967 Some(l) => l,
1968 None => return Vec::new(),
1969 };
1970
1971 let link_rtt = link.engine.rtt().unwrap_or(1.0);
1972 let resource_sdu = Self::resource_sdu_for_link(link);
1973 let now = time::now();
1974
1975 let receiver = match ResourceReceiver::from_advertisement(
1976 adv_plaintext,
1977 resource_sdu,
1978 link_rtt,
1979 now,
1980 None,
1981 None,
1982 ) {
1983 Ok(r) => r,
1984 Err(e) => {
1985 log::debug!("Resource ADV rejected: {}", e);
1986 return Vec::new();
1987 }
1988 };
1989
1990 let strategy = link.resource_strategy;
1991 let resource_hash = receiver.resource_hash.clone();
1992 let transfer_size = receiver.transfer_size;
1993 let has_metadata = receiver.has_metadata;
1994 let is_response = receiver.flags.is_response;
1995 let is_split = receiver.flags.split;
1996 let segment_index = receiver.segment_index;
1997 let total_segments = receiver.total_segments;
1998 let original_hash = match Self::resource_hash_key(&receiver.original_hash) {
1999 Some(key) => key,
2000 None => return Vec::new(),
2001 };
2002
2003 if is_split && segment_index > 1 {
2004 let should_accept = link
2005 .incoming_splits
2006 .get(&original_hash)
2007 .is_some_and(|split| {
2008 split.completed_segments + 1 == segment_index
2009 && split.total_segments == total_segments
2010 });
2011
2012 if !should_accept {
2013 let reject_actions = {
2014 let mut r = receiver;
2015 r.reject()
2016 };
2017 let _ = link;
2018 return self.process_resource_actions(link_id, reject_actions, rng);
2019 }
2020
2021 let current_total_parts = receiver.total_parts;
2022 link.incoming_resources.push(receiver);
2023 let idx = link.incoming_resources.len() - 1;
2024 if let Some(split) = link.incoming_splits.get_mut(&original_hash) {
2025 split.current_segment_index = segment_index;
2026 split.current_received_parts = 0;
2027 split.current_total_parts = current_total_parts;
2028 }
2029 let resource_actions = link.incoming_resources[idx].accept(now);
2030 let _ = link;
2031 return self.process_resource_actions(link_id, resource_actions, rng);
2032 }
2033
2034 if is_response {
2035 if is_split {
2038 link.incoming_splits.insert(
2039 original_hash,
2040 IncomingSplitTransfer {
2041 total_segments,
2042 completed_segments: 0,
2043 current_segment_index: segment_index,
2044 current_received_parts: 0,
2045 current_total_parts: receiver.total_parts,
2046 data: Vec::new(),
2047 metadata: None,
2048 is_response,
2049 },
2050 );
2051 }
2052 link.incoming_resources.push(receiver);
2053 let idx = link.incoming_resources.len() - 1;
2054 let resource_actions = link.incoming_resources[idx].accept(now);
2055 let _ = link;
2056 return self.process_resource_actions(link_id, resource_actions, rng);
2057 }
2058
2059 match strategy {
2060 ResourceStrategy::AcceptNone => {
2061 let reject_actions = {
2063 let mut r = receiver;
2064 r.reject()
2065 };
2066 self.process_resource_actions(link_id, reject_actions, rng)
2067 }
2068 ResourceStrategy::AcceptAll => {
2069 if is_split {
2070 link.incoming_splits.insert(
2071 original_hash,
2072 IncomingSplitTransfer {
2073 total_segments,
2074 completed_segments: 0,
2075 current_segment_index: segment_index,
2076 current_received_parts: 0,
2077 current_total_parts: receiver.total_parts,
2078 data: Vec::new(),
2079 metadata: None,
2080 is_response,
2081 },
2082 );
2083 }
2084 link.incoming_resources.push(receiver);
2085 let idx = link.incoming_resources.len() - 1;
2086 let resource_actions = link.incoming_resources[idx].accept(now);
2087 let _ = link;
2088 self.process_resource_actions(link_id, resource_actions, rng)
2089 }
2090 ResourceStrategy::AcceptApp => {
2091 link.incoming_resources.push(receiver);
2092 vec![LinkManagerAction::ResourceAcceptQuery {
2094 link_id: *link_id,
2095 resource_hash,
2096 transfer_size,
2097 has_metadata,
2098 }]
2099 }
2100 }
2101 }
2102
2103 pub fn accept_resource(
2105 &mut self,
2106 link_id: &LinkId,
2107 resource_hash: &[u8],
2108 accept: bool,
2109 rng: &mut dyn Rng,
2110 ) -> Vec<LinkManagerAction> {
2111 let link = match self.links.get_mut(link_id) {
2112 Some(l) => l,
2113 None => return Vec::new(),
2114 };
2115
2116 let now = time::now();
2117 let idx = link
2118 .incoming_resources
2119 .iter()
2120 .position(|r| r.resource_hash == resource_hash);
2121 let idx = match idx {
2122 Some(i) => i,
2123 None => return Vec::new(),
2124 };
2125
2126 if accept && link.incoming_resources[idx].flags.split {
2127 if let Some(original_hash) =
2128 Self::resource_hash_key(&link.incoming_resources[idx].original_hash)
2129 {
2130 link.incoming_splits
2131 .entry(original_hash)
2132 .or_insert_with(|| IncomingSplitTransfer {
2133 total_segments: link.incoming_resources[idx].total_segments,
2134 completed_segments: 0,
2135 current_segment_index: link.incoming_resources[idx].segment_index,
2136 current_received_parts: 0,
2137 current_total_parts: link.incoming_resources[idx].total_parts,
2138 data: Vec::new(),
2139 metadata: None,
2140 is_response: link.incoming_resources[idx].flags.is_response,
2141 });
2142 }
2143 }
2144
2145 let resource_actions = if accept {
2146 link.incoming_resources[idx].accept(now)
2147 } else {
2148 link.incoming_resources[idx].reject()
2149 };
2150
2151 let _ = link;
2152 self.process_resource_actions(link_id, resource_actions, rng)
2153 }
2154
2155 fn handle_resource_req(
2157 &mut self,
2158 link_id: &LinkId,
2159 plaintext: &[u8],
2160 rng: &mut dyn Rng,
2161 ) -> Vec<LinkManagerAction> {
2162 let link = match self.links.get_mut(link_id) {
2163 Some(l) => l,
2164 None => return Vec::new(),
2165 };
2166
2167 let now = time::now();
2168 let mut all_actions = Vec::new();
2169 let mut progress_update = None;
2170 for sender in &mut link.outgoing_resources {
2171 if sender.flags.split && sender.status == rns_core::resource::ResourceStatus::Queued {
2172 continue;
2173 }
2174 let before_sent = sender.sent_parts;
2175 let resource_actions = sender.handle_request(plaintext, now);
2176 if !resource_actions.is_empty() {
2177 if sender.sent_parts != before_sent {
2178 if sender.flags.split {
2179 if let Some(split) = link.outgoing_splits.get_mut(&sender.original_hash) {
2180 split.current_segment_index = sender.segment_index;
2181 split.current_sent_parts = sender.sent_parts;
2182 split.current_total_parts = sender.total_parts();
2183 progress_update =
2184 Some(Self::outgoing_split_progress(split, sender.sdu));
2185 }
2186 } else {
2187 progress_update = Some((sender.sent_parts, sender.total_parts()));
2188 }
2189 }
2190 all_actions.extend(resource_actions);
2191 break;
2192 }
2193 }
2194
2195 let _ = link;
2196 let mut out = self.process_resource_actions(link_id, all_actions, rng);
2197 if let Some((received, total)) = progress_update {
2198 out.push(LinkManagerAction::ResourceProgress {
2199 link_id: *link_id,
2200 received,
2201 total,
2202 });
2203 }
2204 out
2205 }
2206
2207 fn handle_resource_hmu(
2209 &mut self,
2210 link_id: &LinkId,
2211 plaintext: &[u8],
2212 rng: &mut dyn Rng,
2213 ) -> Vec<LinkManagerAction> {
2214 let link = match self.links.get_mut(link_id) {
2215 Some(l) => l,
2216 None => return Vec::new(),
2217 };
2218
2219 let now = time::now();
2220 let mut all_actions = Vec::new();
2221 for receiver in &mut link.incoming_resources {
2222 let resource_actions = receiver.handle_hashmap_update(plaintext, now);
2223 if !resource_actions.is_empty() {
2224 all_actions.extend(resource_actions);
2225 break;
2226 }
2227 }
2228
2229 let _ = link;
2230 self.process_resource_actions(link_id, all_actions, rng)
2231 }
2232
2233 fn handle_resource_part(
2235 &mut self,
2236 link_id: &LinkId,
2237 raw_data: &[u8],
2238 rng: &mut dyn Rng,
2239 ) -> Vec<LinkManagerAction> {
2240 let link = match self.links.get_mut(link_id) {
2241 Some(l) => l,
2242 None => return Vec::new(),
2243 };
2244
2245 let now = time::now();
2246 let resource_sdu = Self::resource_sdu_for_link(link);
2247 let mut all_actions = Vec::new();
2248 let mut assemble_idx = None;
2249 let mut assembled_is_response = false;
2250
2251 for (idx, receiver) in link.incoming_resources.iter_mut().enumerate() {
2252 if receiver.status >= rns_core::resource::ResourceStatus::Complete {
2253 continue;
2254 }
2255 let resource_actions = receiver.receive_part(raw_data, now);
2256 if !resource_actions.is_empty() {
2257 if receiver.received_count == receiver.total_parts {
2258 assemble_idx = Some(idx);
2259 }
2260 if receiver.flags.split {
2261 if let Some(key) = Self::resource_hash_key(&receiver.original_hash) {
2262 if let Some(split) = link.incoming_splits.get_mut(&key) {
2263 split.current_segment_index = receiver.segment_index;
2264 split.current_received_parts = receiver.received_count;
2265 split.current_total_parts = receiver.total_parts;
2266 let (received, total) =
2267 Self::incoming_split_progress(split, resource_sdu);
2268 for action in resource_actions {
2269 match action {
2270 ResourceAction::ProgressUpdate { .. } => {
2271 all_actions.push(ResourceAction::ProgressUpdate {
2272 received,
2273 total,
2274 });
2275 }
2276 other => all_actions.push(other),
2277 }
2278 }
2279 } else {
2280 all_actions.extend(resource_actions);
2281 }
2282 } else {
2283 all_actions.extend(resource_actions);
2284 }
2285 } else {
2286 all_actions.extend(resource_actions);
2287 }
2288 break;
2289 }
2290 }
2291
2292 if let Some(idx) = assemble_idx {
2293 let split_key = if link.incoming_resources[idx].flags.split {
2294 Self::resource_hash_key(&link.incoming_resources[idx].original_hash)
2295 } else {
2296 None
2297 };
2298 let split_segment_index = link.incoming_resources[idx].segment_index;
2299 let split_segment_total = link.incoming_resources[idx].total_segments;
2300 let split_segment_parts = link.incoming_resources[idx].total_parts;
2301 let split_is_response = link.incoming_resources[idx].flags.is_response;
2302 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2303 link.engine.decrypt(ciphertext).map_err(|_| ())
2304 };
2305 let mut assemble_actions =
2306 link.incoming_resources[idx].assemble(&decrypt_fn, &Bzip2Compressor);
2307 assembled_is_response = split_is_response;
2308
2309 if let Some(key) = split_key {
2310 let mut converted_actions = Vec::new();
2311 let mut segment_data = None;
2312 let mut segment_metadata = None;
2313 for action in assemble_actions {
2314 match action {
2315 ResourceAction::DataReceived { data, metadata } => {
2316 segment_data = Some(data);
2317 segment_metadata = metadata;
2318 }
2319 ResourceAction::Completed => {}
2320 other => converted_actions.push(other),
2321 }
2322 }
2323
2324 if let Some(data) = segment_data {
2325 if let Some(split) = link.incoming_splits.get_mut(&key) {
2326 split.data.extend_from_slice(&data);
2327 if segment_metadata.is_some() {
2328 split.metadata = segment_metadata;
2329 }
2330 split.completed_segments = split_segment_index;
2331 split.current_segment_index = split_segment_index;
2332 split.current_received_parts = split_segment_parts;
2333 split.current_total_parts = split_segment_parts;
2334 }
2335
2336 if split_segment_index == split_segment_total {
2337 if let Some(split) = link.incoming_splits.remove(&key) {
2338 assembled_is_response = split.is_response;
2339 converted_actions.push(ResourceAction::DataReceived {
2340 data: split.data,
2341 metadata: split.metadata,
2342 });
2343 converted_actions.push(ResourceAction::Completed);
2344 }
2345 }
2346 }
2347
2348 assemble_actions = converted_actions;
2349 }
2350 all_actions.extend(assemble_actions);
2351 }
2352
2353 let _ = link;
2354 let mut out = self.process_resource_actions(link_id, all_actions, rng);
2355
2356 if assembled_is_response {
2357 let mut converted = Vec::new();
2358 for action in out {
2359 match action {
2360 LinkManagerAction::ResourceReceived { data, metadata, .. } => {
2361 converted.extend(self.handle_response(link_id, &data, metadata));
2362 }
2363 LinkManagerAction::ResourceAcceptQuery { .. } => {
2364 }
2366 other => converted.push(other),
2367 }
2368 }
2369 out = converted;
2370 }
2371
2372 out
2373 }
2374
2375 fn handle_resource_prf(
2377 &mut self,
2378 link_id: &LinkId,
2379 plaintext: &[u8],
2380 rng: &mut dyn Rng,
2381 ) -> Vec<LinkManagerAction> {
2382 let link = match self.links.get_mut(link_id) {
2383 Some(l) => l,
2384 None => return Vec::new(),
2385 };
2386
2387 let now = time::now();
2388 let mut result_actions = Vec::new();
2389 let mut completed_sender = None;
2390 let mut failed_split = None;
2391 let proof_hash = plaintext.get(..32);
2392 for sender in &mut link.outgoing_resources {
2393 if proof_hash.is_some_and(|hash| hash != sender.resource_hash.as_slice()) {
2394 continue;
2395 }
2396 let resource_actions = sender.handle_proof(plaintext, now);
2397 if !resource_actions.is_empty() {
2398 if resource_actions
2399 .iter()
2400 .any(|action| matches!(action, ResourceAction::Completed))
2401 {
2402 completed_sender = Some((
2403 sender.original_hash,
2404 sender.segment_index,
2405 sender.total_segments,
2406 sender.total_parts(),
2407 ));
2408 }
2409 if sender.flags.split
2410 && resource_actions
2411 .iter()
2412 .any(|action| matches!(action, ResourceAction::Failed(_)))
2413 {
2414 failed_split = Some(sender.original_hash);
2415 }
2416 result_actions.extend(resource_actions);
2417 break;
2418 }
2419 }
2420
2421 let mut actions = Vec::new();
2423 let mut advertise_next = None;
2424 for ra in result_actions {
2425 match ra {
2426 ResourceAction::Completed => {
2427 if let Some((original_hash, segment_index, total_segments, total_parts)) =
2428 completed_sender
2429 {
2430 if total_segments > 1 && segment_index < total_segments {
2431 if let Some(split) = link.outgoing_splits.get_mut(&original_hash) {
2432 split.completed_segments = segment_index;
2433 split.current_segment_index = segment_index;
2434 split.current_sent_parts = total_parts;
2435 split.current_total_parts = total_parts;
2436 if let Some(next) = link.outgoing_resources.iter_mut().find(|s| {
2437 s.flags.split
2438 && s.original_hash == original_hash
2439 && s.segment_index == segment_index + 1
2440 }) {
2441 split.current_segment_index = next.segment_index;
2442 split.current_sent_parts = 0;
2443 split.current_total_parts = next.total_parts();
2444 advertise_next = Some(next.advertise(now));
2445 }
2446 }
2447 } else {
2448 link.outgoing_splits.remove(&original_hash);
2449 actions
2450 .push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2451 }
2452 } else {
2453 actions.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2454 }
2455 }
2456 ResourceAction::Failed(e) => {
2457 if let Some(original_hash) = failed_split {
2458 link.outgoing_splits.remove(&original_hash);
2459 }
2460 actions.push(LinkManagerAction::ResourceFailed {
2461 link_id: *link_id,
2462 error: format!("{}", e),
2463 });
2464 }
2465 _ => {}
2466 }
2467 }
2468
2469 link.outgoing_resources
2471 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2472
2473 let _ = link;
2474 if let Some(next_actions) = advertise_next {
2475 actions.extend(self.process_resource_actions(link_id, next_actions, rng));
2476 }
2477
2478 actions
2479 }
2480
2481 fn handle_resource_icl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
2483 let link = match self.links.get_mut(link_id) {
2484 Some(l) => l,
2485 None => return Vec::new(),
2486 };
2487
2488 let mut actions = Vec::new();
2489 for receiver in &mut link.incoming_resources {
2490 let ra = receiver.handle_cancel();
2491 for a in ra {
2492 if let ResourceAction::Failed(ref e) = a {
2493 actions.push(LinkManagerAction::ResourceFailed {
2494 link_id: *link_id,
2495 error: format!("{}", e),
2496 });
2497 }
2498 }
2499 }
2500 link.incoming_resources
2501 .retain(|r| r.status < rns_core::resource::ResourceStatus::Complete);
2502 link.incoming_splits.clear();
2503 actions
2504 }
2505
2506 fn handle_resource_rcl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
2508 let link = match self.links.get_mut(link_id) {
2509 Some(l) => l,
2510 None => return Vec::new(),
2511 };
2512
2513 let mut actions = Vec::new();
2514 for sender in &mut link.outgoing_resources {
2515 let ra = sender.handle_reject();
2516 for a in ra {
2517 if let ResourceAction::Failed(ref e) = a {
2518 actions.push(LinkManagerAction::ResourceFailed {
2519 link_id: *link_id,
2520 error: format!("{}", e),
2521 });
2522 }
2523 }
2524 }
2525 link.outgoing_resources
2526 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2527 link.outgoing_splits.clear();
2528 actions
2529 }
2530
2531 fn process_resource_actions(
2533 &mut self,
2534 link_id: &LinkId,
2535 actions: Vec<ResourceAction>,
2536 rng: &mut dyn Rng,
2537 ) -> Vec<LinkManagerAction> {
2538 let mut result = Vec::new();
2539 for action in actions {
2540 match action {
2541 ResourceAction::SendAdvertisement(data) => {
2542 let encrypted = self
2544 .links
2545 .get(link_id)
2546 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2547 if let Some(encrypted) = encrypted {
2548 result.extend(self.build_link_packet(
2549 link_id,
2550 constants::CONTEXT_RESOURCE_ADV,
2551 &encrypted,
2552 ));
2553 }
2554 }
2555 ResourceAction::SendPart(data) => {
2556 result.extend(self.build_link_packet(
2558 link_id,
2559 constants::CONTEXT_RESOURCE,
2560 &data,
2561 ));
2562 }
2563 ResourceAction::SendRequest(data) => {
2564 let encrypted = self
2565 .links
2566 .get(link_id)
2567 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2568 if let Some(encrypted) = encrypted {
2569 result.extend(self.build_link_packet(
2570 link_id,
2571 constants::CONTEXT_RESOURCE_REQ,
2572 &encrypted,
2573 ));
2574 }
2575 }
2576 ResourceAction::SendHmu(data) => {
2577 let encrypted = self
2578 .links
2579 .get(link_id)
2580 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2581 if let Some(encrypted) = encrypted {
2582 result.extend(self.build_link_packet(
2583 link_id,
2584 constants::CONTEXT_RESOURCE_HMU,
2585 &encrypted,
2586 ));
2587 }
2588 }
2589 ResourceAction::SendProof(data) => {
2590 let encrypted = self
2591 .links
2592 .get(link_id)
2593 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2594 if let Some(encrypted) = encrypted {
2595 result.extend(self.build_link_packet(
2596 link_id,
2597 constants::CONTEXT_RESOURCE_PRF,
2598 &encrypted,
2599 ));
2600 }
2601 }
2602 ResourceAction::SendCancelInitiator(data) => {
2603 let encrypted = self
2604 .links
2605 .get(link_id)
2606 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2607 if let Some(encrypted) = encrypted {
2608 result.extend(self.build_link_packet(
2609 link_id,
2610 constants::CONTEXT_RESOURCE_ICL,
2611 &encrypted,
2612 ));
2613 }
2614 }
2615 ResourceAction::SendCancelReceiver(data) => {
2616 let encrypted = self
2617 .links
2618 .get(link_id)
2619 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2620 if let Some(encrypted) = encrypted {
2621 result.extend(self.build_link_packet(
2622 link_id,
2623 constants::CONTEXT_RESOURCE_RCL,
2624 &encrypted,
2625 ));
2626 }
2627 }
2628 ResourceAction::DataReceived { data, metadata } => {
2629 result.push(LinkManagerAction::ResourceReceived {
2630 link_id: *link_id,
2631 data,
2632 metadata,
2633 });
2634 }
2635 ResourceAction::Completed => {
2636 result.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2637 }
2638 ResourceAction::Failed(e) => {
2639 result.push(LinkManagerAction::ResourceFailed {
2640 link_id: *link_id,
2641 error: format!("{}", e),
2642 });
2643 }
2644 ResourceAction::TeardownLink => {
2645 let teardown_actions = match self.links.get_mut(link_id) {
2646 Some(link) => link.engine.handle_teardown(),
2647 None => Vec::new(),
2648 };
2649 result.extend(self.process_link_actions(link_id, &teardown_actions));
2650 }
2651 ResourceAction::ProgressUpdate { received, total } => {
2652 result.push(LinkManagerAction::ResourceProgress {
2653 link_id: *link_id,
2654 received,
2655 total,
2656 });
2657 }
2658 }
2659 }
2660 result
2661 }
2662
2663 fn build_link_packet(
2665 &self,
2666 link_id: &LinkId,
2667 context: u8,
2668 data: &[u8],
2669 ) -> Vec<LinkManagerAction> {
2670 let flags = PacketFlags {
2671 header_type: constants::HEADER_1,
2672 context_flag: constants::FLAG_UNSET,
2673 transport_type: constants::TRANSPORT_BROADCAST,
2674 destination_type: constants::DESTINATION_LINK,
2675 packet_type: constants::PACKET_TYPE_DATA,
2676 };
2677 let mut actions = Vec::new();
2678 let max_mtu = self
2679 .links
2680 .get(link_id)
2681 .map(|l| l.engine.mtu() as usize)
2682 .unwrap_or(constants::MTU);
2683 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash_with_max_mtu(
2684 flags, 0, link_id, None, context, data, max_mtu,
2685 ) {
2686 actions.push(LinkManagerAction::SendPacket {
2687 raw,
2688 dest_type: constants::DESTINATION_LINK,
2689 attached_interface: None,
2690 });
2691 }
2692 actions
2693 }
2694
2695 pub fn send_resource(
2697 &mut self,
2698 link_id: &LinkId,
2699 data: &[u8],
2700 metadata: Option<&[u8]>,
2701 rng: &mut dyn Rng,
2702 ) -> Vec<LinkManagerAction> {
2703 self.send_resource_with_auto_compress(link_id, data, metadata, true, rng)
2704 }
2705
2706 pub fn send_resource_with_auto_compress(
2708 &mut self,
2709 link_id: &LinkId,
2710 data: &[u8],
2711 metadata: Option<&[u8]>,
2712 auto_compress: bool,
2713 rng: &mut dyn Rng,
2714 ) -> Vec<LinkManagerAction> {
2715 let link = match self.links.get_mut(link_id) {
2716 Some(l) => l,
2717 None => return Vec::new(),
2718 };
2719
2720 if link.engine.state() != LinkState::Active {
2721 return Vec::new();
2722 }
2723
2724 let now = time::now();
2725
2726 let senders = match Self::build_resource_senders(
2727 link,
2728 data,
2729 metadata,
2730 auto_compress,
2731 false, None, rng,
2734 now,
2735 ) {
2736 Ok(s) => s,
2737 Err(e) => {
2738 log::debug!("Failed to create ResourceSender: {}", e);
2739 return Vec::new();
2740 }
2741 };
2742
2743 let adv_actions = Self::start_resource_senders(link, senders, now);
2744
2745 let _ = link;
2746 self.process_resource_actions(link_id, adv_actions, rng)
2747 }
2748
2749 pub fn set_resource_strategy(&mut self, link_id: &LinkId, strategy: ResourceStrategy) {
2751 if let Some(link) = self.links.get_mut(link_id) {
2752 link.resource_strategy = strategy;
2753 }
2754 }
2755
2756 pub fn flush_channel_tx(&mut self, link_id: &LinkId) {
2759 if let Some(link) = self.links.get_mut(link_id) {
2760 if let Some(ref mut channel) = link.channel {
2761 channel.flush_tx();
2762 }
2763 }
2764 }
2765
2766 pub fn send_channel_message(
2768 &mut self,
2769 link_id: &LinkId,
2770 msgtype: u16,
2771 payload: &[u8],
2772 rng: &mut dyn Rng,
2773 ) -> Result<Vec<LinkManagerAction>, String> {
2774 let link = match self.links.get_mut(link_id) {
2775 Some(l) => l,
2776 None => return Err("unknown link".to_string()),
2777 };
2778
2779 let channel = match link.channel {
2780 Some(ref mut ch) => ch,
2781 None => return Err("link has no active channel".to_string()),
2782 };
2783
2784 let link_mdu = link.engine.mdu();
2785 let now = time::now();
2786 let chan_actions = match channel.send(msgtype, payload, now, link_mdu) {
2787 Ok(a) => {
2788 link.channel_send_ok += 1;
2789 a
2790 }
2791 Err(e) => {
2792 log::debug!("Channel send failed: {:?}", e);
2793 match e {
2794 rns_core::channel::ChannelError::NotReady => link.channel_send_not_ready += 1,
2795 rns_core::channel::ChannelError::MessageTooBig => {
2796 link.channel_send_too_big += 1;
2797 }
2798 rns_core::channel::ChannelError::InvalidEnvelope => {
2799 link.channel_send_other_error += 1;
2800 }
2801 }
2802 return Err(e.to_string());
2803 }
2804 };
2805
2806 let _ = link;
2807 Ok(self.process_channel_actions(link_id, chan_actions, rng))
2808 }
2809
2810 pub fn tick(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2812 let now = time::now();
2813 let mut all_actions = Vec::new();
2814
2815 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2817
2818 for link_id in &link_ids {
2819 let link = match self.links.get_mut(link_id) {
2820 Some(l) => l,
2821 None => continue,
2822 };
2823
2824 let tick_actions = link.engine.tick(now);
2826 all_actions.extend(self.process_link_actions(link_id, &tick_actions));
2827
2828 let link = match self.links.get_mut(link_id) {
2830 Some(l) => l,
2831 None => continue,
2832 };
2833 if link.engine.needs_keepalive(now) {
2834 let flags = PacketFlags {
2836 header_type: constants::HEADER_1,
2837 context_flag: constants::FLAG_UNSET,
2838 transport_type: constants::TRANSPORT_BROADCAST,
2839 destination_type: constants::DESTINATION_LINK,
2840 packet_type: constants::PACKET_TYPE_DATA,
2841 };
2842 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
2843 flags,
2844 0,
2845 link_id,
2846 None,
2847 constants::CONTEXT_KEEPALIVE,
2848 &[],
2849 ) {
2850 all_actions.push(LinkManagerAction::SendPacket {
2851 raw,
2852 dest_type: constants::DESTINATION_LINK,
2853 attached_interface: None,
2854 });
2855 link.engine.record_outbound(now, true);
2856 }
2857 }
2858
2859 if let Some(channel) = link.channel.as_mut() {
2860 let chan_actions = channel.tick(now);
2861 let _ = channel;
2862 let _ = link;
2863 all_actions.extend(self.process_channel_actions(link_id, chan_actions, rng));
2864 }
2865 }
2866
2867 for link_id in &link_ids {
2869 let link = match self.links.get_mut(link_id) {
2870 Some(l) => l,
2871 None => continue,
2872 };
2873
2874 let mut sender_actions = Vec::new();
2876 for sender in &mut link.outgoing_resources {
2877 sender_actions.extend(sender.tick(now));
2878 }
2879
2880 let mut receiver_actions = Vec::new();
2882 for receiver in &mut link.incoming_resources {
2883 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2884 link.engine.decrypt(ciphertext).map_err(|_| ())
2885 };
2886 receiver_actions.extend(receiver.tick(now, &decrypt_fn, &Bzip2Compressor));
2887 }
2888
2889 link.outgoing_resources
2891 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2892 link.incoming_resources
2893 .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2894 let active_split_hashes: Vec<[u8; 32]> = link
2895 .outgoing_resources
2896 .iter()
2897 .filter(|s| s.flags.split)
2898 .map(|s| s.original_hash)
2899 .collect();
2900 link.outgoing_splits.retain(|original_hash, split| {
2901 split.completed_segments < split.total_segments
2902 && active_split_hashes.contains(original_hash)
2903 });
2904
2905 let _ = link;
2906 all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2907 all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2908 }
2909
2910 let closed: Vec<LinkId> = self
2912 .links
2913 .iter()
2914 .filter(|(_, l)| l.engine.state() == LinkState::Closed)
2915 .map(|(id, _)| *id)
2916 .collect();
2917 for id in closed {
2918 self.links.remove(&id);
2919 all_actions.push(LinkManagerAction::DeregisterLinkDest { link_id: id });
2920 }
2921
2922 all_actions
2923 }
2924
2925 pub fn is_link_destination(&self, dest_hash: &[u8; 16]) -> bool {
2927 self.links.contains_key(dest_hash) || self.link_destinations.contains_key(dest_hash)
2928 }
2929
2930 pub fn link_state(&self, link_id: &LinkId) -> Option<LinkState> {
2932 self.links.get(link_id).map(|l| l.engine.state())
2933 }
2934
2935 pub fn link_rtt(&self, link_id: &LinkId) -> Option<f64> {
2937 self.links.get(link_id).and_then(|l| l.engine.rtt())
2938 }
2939
2940 pub fn set_link_rtt(&mut self, link_id: &LinkId, rtt: f64) {
2942 if let Some(link) = self.links.get_mut(link_id) {
2943 link.engine.set_rtt(rtt);
2944 }
2945 }
2946
2947 pub fn record_link_inbound(&mut self, link_id: &LinkId) {
2949 if let Some(link) = self.links.get_mut(link_id) {
2950 link.engine.record_inbound(time::now());
2951 }
2952 }
2953
2954 pub fn set_link_mtu(&mut self, link_id: &LinkId, mtu: u32) {
2956 if let Some(link) = self.links.get_mut(link_id) {
2957 link.engine.set_mtu(mtu);
2958 }
2959 }
2960
2961 pub fn link_count(&self) -> usize {
2963 self.links.len()
2964 }
2965
2966 pub fn resource_transfer_count(&self) -> usize {
2968 self.links
2969 .values()
2970 .map(|managed| {
2971 managed
2972 .incoming_resources
2973 .iter()
2974 .filter(|resource| !resource.flags.split)
2975 .count()
2976 + managed.incoming_splits.len()
2977 + managed
2978 .outgoing_resources
2979 .iter()
2980 .filter(|resource| !resource.flags.split)
2981 .count()
2982 + managed.outgoing_splits.len()
2983 })
2984 .sum()
2985 }
2986
2987 pub fn cancel_all_resources(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2989 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2990 let mut all_actions = Vec::new();
2991
2992 for link_id in &link_ids {
2993 let link = match self.links.get_mut(link_id) {
2994 Some(l) => l,
2995 None => continue,
2996 };
2997
2998 let mut sender_actions = Vec::new();
2999 for sender in &mut link.outgoing_resources {
3000 sender_actions.extend(sender.cancel());
3001 }
3002
3003 let mut receiver_actions = Vec::new();
3004 for receiver in &mut link.incoming_resources {
3005 receiver_actions.extend(receiver.reject());
3006 }
3007
3008 link.outgoing_resources
3009 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
3010 link.incoming_resources
3011 .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
3012 link.outgoing_splits.clear();
3013 link.incoming_splits.clear();
3014
3015 let _ = link;
3016 all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
3017 all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
3018 }
3019
3020 all_actions
3021 }
3022
3023 pub fn link_entries(&self) -> Vec<crate::event::LinkInfoEntry> {
3025 self.links
3026 .iter()
3027 .map(|(link_id, managed)| {
3028 let state = match managed.engine.state() {
3029 LinkState::Pending => "pending",
3030 LinkState::Handshake => "handshake",
3031 LinkState::Active => "active",
3032 LinkState::Stale => "stale",
3033 LinkState::Closed => "closed",
3034 };
3035 crate::event::LinkInfoEntry {
3036 link_id: *link_id,
3037 state: state.to_string(),
3038 is_initiator: managed.engine.is_initiator(),
3039 dest_hash: managed.dest_hash,
3040 remote_identity: managed.remote_identity.as_ref().map(|(h, _)| *h),
3041 rtt: managed.engine.rtt(),
3042 channel_window: managed.channel.as_ref().map(|c| c.window()),
3043 channel_outstanding: managed.channel.as_ref().map(|c| c.outstanding()),
3044 pending_channel_packets: managed.pending_channel_packets.len(),
3045 channel_send_ok: managed.channel_send_ok,
3046 channel_send_not_ready: managed.channel_send_not_ready,
3047 channel_send_too_big: managed.channel_send_too_big,
3048 channel_send_other_error: managed.channel_send_other_error,
3049 channel_messages_received: managed.channel_messages_received,
3050 channel_proofs_sent: managed.channel_proofs_sent,
3051 channel_proofs_received: managed.channel_proofs_received,
3052 }
3053 })
3054 .collect()
3055 }
3056
3057 pub fn resource_entries(&self) -> Vec<crate::event::ResourceInfoEntry> {
3059 let mut entries = Vec::new();
3060 for (link_id, managed) in &self.links {
3061 let resource_sdu = Self::resource_sdu_for_link(managed);
3062 for split in managed.incoming_splits.values() {
3063 let (received, total) = Self::incoming_split_progress(split, resource_sdu);
3064 entries.push(crate::event::ResourceInfoEntry {
3065 link_id: *link_id,
3066 direction: "incoming".to_string(),
3067 total_parts: total,
3068 transferred_parts: received,
3069 complete: received >= total && total > 0,
3070 });
3071 }
3072 for recv in &managed.incoming_resources {
3073 if recv.flags.split {
3074 continue;
3075 }
3076 let (received, total) = recv.progress();
3077 entries.push(crate::event::ResourceInfoEntry {
3078 link_id: *link_id,
3079 direction: "incoming".to_string(),
3080 total_parts: total,
3081 transferred_parts: received,
3082 complete: received >= total && total > 0,
3083 });
3084 }
3085 for split in managed.outgoing_splits.values() {
3086 let (sent, total) = Self::outgoing_split_progress(split, resource_sdu);
3087 entries.push(crate::event::ResourceInfoEntry {
3088 link_id: *link_id,
3089 direction: "outgoing".to_string(),
3090 total_parts: total,
3091 transferred_parts: sent,
3092 complete: sent >= total && total > 0,
3093 });
3094 }
3095 for send in &managed.outgoing_resources {
3096 if send.flags.split {
3097 continue;
3098 }
3099 let total = send.total_parts();
3100 let sent = send.sent_parts;
3101 entries.push(crate::event::ResourceInfoEntry {
3102 link_id: *link_id,
3103 direction: "outgoing".to_string(),
3104 total_parts: total,
3105 transferred_parts: sent,
3106 complete: sent >= total && total > 0,
3107 });
3108 }
3109 }
3110 entries
3111 }
3112
3113 fn process_link_actions(
3115 &self,
3116 link_id: &LinkId,
3117 actions: &[LinkAction],
3118 ) -> Vec<LinkManagerAction> {
3119 let mut result = Vec::new();
3120 for action in actions {
3121 match action {
3122 LinkAction::StateChanged {
3123 new_state, reason, ..
3124 } => match new_state {
3125 LinkState::Closed => {
3126 result.push(LinkManagerAction::LinkClosed {
3127 link_id: *link_id,
3128 reason: *reason,
3129 });
3130 }
3131 _ => {}
3132 },
3133 LinkAction::LinkEstablished {
3134 rtt, is_initiator, ..
3135 } => {
3136 let dest_hash = self
3137 .links
3138 .get(link_id)
3139 .map(|l| l.dest_hash)
3140 .unwrap_or([0u8; 16]);
3141 result.push(LinkManagerAction::LinkEstablished {
3142 link_id: *link_id,
3143 dest_hash,
3144 rtt: *rtt,
3145 is_initiator: *is_initiator,
3146 });
3147 }
3148 LinkAction::RemoteIdentified {
3149 identity_hash,
3150 public_key,
3151 ..
3152 } => {
3153 result.push(LinkManagerAction::RemoteIdentified {
3154 link_id: *link_id,
3155 identity_hash: *identity_hash,
3156 public_key: *public_key,
3157 });
3158 }
3159 LinkAction::DataReceived { .. } => {
3160 }
3162 }
3163 }
3164 result
3165 }
3166
3167 fn process_channel_actions(
3169 &mut self,
3170 link_id: &LinkId,
3171 actions: Vec<rns_core::channel::ChannelAction>,
3172 rng: &mut dyn Rng,
3173 ) -> Vec<LinkManagerAction> {
3174 let mut result = Vec::new();
3175 for action in actions {
3176 match action {
3177 rns_core::channel::ChannelAction::SendOnLink { raw, sequence } => {
3178 let encrypted = match self.links.get(link_id) {
3182 Some(link) => match link.engine.encrypt(&raw, rng) {
3183 Ok(encrypted) => encrypted,
3184 Err(_) => {
3185 if let Some(link_mut) = self.links.get_mut(link_id) {
3186 if let Some(channel) = link_mut.channel.as_mut() {
3187 channel.cancel_send(sequence);
3188 }
3189 }
3190 continue;
3191 }
3192 },
3193 None => continue,
3194 };
3195 let flags = PacketFlags {
3196 header_type: constants::HEADER_1,
3197 context_flag: constants::FLAG_UNSET,
3198 transport_type: constants::TRANSPORT_BROADCAST,
3199 destination_type: constants::DESTINATION_LINK,
3200 packet_type: constants::PACKET_TYPE_DATA,
3201 };
3202 match RawPacket::pack_raw_with_hash(
3203 flags,
3204 0,
3205 link_id,
3206 None,
3207 constants::CONTEXT_CHANNEL,
3208 &encrypted,
3209 ) {
3210 Ok((raw_bytes, packet_hash)) => {
3211 if let Some(link_mut) = self.links.get_mut(link_id) {
3212 link_mut
3213 .pending_channel_packets
3214 .insert(packet_hash, sequence);
3215 }
3216 result.push(LinkManagerAction::SendPacket {
3217 raw: raw_bytes,
3218 dest_type: constants::DESTINATION_LINK,
3219 attached_interface: None,
3220 });
3221 }
3222 Err(_) => {
3223 if let Some(link_mut) = self.links.get_mut(link_id) {
3224 if let Some(channel) = link_mut.channel.as_mut() {
3225 channel.cancel_send(sequence);
3226 }
3227 }
3228 }
3229 }
3230 }
3231 rns_core::channel::ChannelAction::MessageReceived {
3232 msgtype, payload, ..
3233 } => {
3234 result.push(LinkManagerAction::ChannelMessageReceived {
3235 link_id: *link_id,
3236 msgtype,
3237 payload,
3238 });
3239 }
3240 rns_core::channel::ChannelAction::TeardownLink => {
3241 result.push(LinkManagerAction::LinkClosed {
3242 link_id: *link_id,
3243 reason: Some(TeardownReason::Timeout),
3244 });
3245 }
3246 }
3247 }
3248 result
3249 }
3250}
3251
3252impl Default for LinkManager {
3253 fn default() -> Self {
3254 Self::new()
3255 }
3256}
3257
3258fn compute_path_hash(path: &str) -> [u8; 16] {
3261 let full = rns_core::hash::full_hash(path.as_bytes());
3262 let mut result = [0u8; 16];
3263 result.copy_from_slice(&full[..16]);
3264 result
3265}
3266
3267#[cfg(test)]
3268mod tests {
3269 use super::*;
3270 use rns_crypto::identity::Identity;
3271 use rns_crypto::{FixedRng, OsRng};
3272
3273 fn make_rng(seed: u8) -> FixedRng {
3274 FixedRng::new(&[seed; 128])
3275 }
3276
3277 fn make_dest_keys(rng: &mut dyn Rng) -> (Ed25519PrivateKey, [u8; 32]) {
3278 let sig_prv = Ed25519PrivateKey::generate(rng);
3279 let sig_pub_bytes = sig_prv.public_key().public_bytes();
3280 (sig_prv, sig_pub_bytes)
3281 }
3282
3283 #[test]
3284 fn test_register_link_destination() {
3285 let mut mgr = LinkManager::new();
3286 let mut rng = make_rng(0x01);
3287 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3288 let dest_hash = [0xDD; 16];
3289
3290 mgr.register_link_destination(
3291 dest_hash,
3292 sig_prv,
3293 sig_pub_bytes,
3294 ResourceStrategy::AcceptNone,
3295 );
3296 assert!(mgr.is_link_destination(&dest_hash));
3297
3298 mgr.deregister_link_destination(&dest_hash);
3299 assert!(!mgr.is_link_destination(&dest_hash));
3300 }
3301
3302 #[test]
3303 fn test_create_link() {
3304 let mut mgr = LinkManager::new();
3305 let mut rng = OsRng;
3306 let dest_hash = [0xDD; 16];
3307
3308 let sig_pub_bytes = [0xAA; 32]; let (link_id, actions) = mgr.create_link(
3310 &dest_hash,
3311 &sig_pub_bytes,
3312 1,
3313 constants::MTU as u32,
3314 &mut rng,
3315 );
3316 assert_ne!(link_id, [0u8; 16]);
3317 assert_eq!(actions.len(), 2);
3319 assert!(matches!(
3320 actions[0],
3321 LinkManagerAction::RegisterLinkDest { .. }
3322 ));
3323 assert!(matches!(actions[1], LinkManagerAction::SendPacket { .. }));
3324
3325 assert_eq!(mgr.link_state(&link_id), Some(LinkState::Pending));
3327 }
3328
3329 #[test]
3330 fn test_full_handshake_via_manager() {
3331 let mut rng = OsRng;
3332 let dest_hash = [0xDD; 16];
3333
3334 let mut responder_mgr = LinkManager::new();
3336 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3337 responder_mgr.register_link_destination(
3338 dest_hash,
3339 sig_prv,
3340 sig_pub_bytes,
3341 ResourceStrategy::AcceptNone,
3342 );
3343
3344 let mut initiator_mgr = LinkManager::new();
3346
3347 let (link_id, init_actions) = initiator_mgr.create_link(
3349 &dest_hash,
3350 &sig_pub_bytes,
3351 1,
3352 constants::MTU as u32,
3353 &mut rng,
3354 );
3355 assert_eq!(init_actions.len(), 2);
3356
3357 let linkrequest_raw = match &init_actions[1] {
3359 LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3360 _ => panic!("Expected SendPacket"),
3361 };
3362
3363 let lr_packet = RawPacket::unpack(&linkrequest_raw).unwrap();
3365
3366 let resp_actions = responder_mgr.handle_local_delivery(
3368 lr_packet.destination_hash,
3369 &linkrequest_raw,
3370 lr_packet.packet_hash,
3371 rns_core::transport::types::InterfaceId(0),
3372 &mut rng,
3373 );
3374 assert!(resp_actions.len() >= 2);
3376 assert!(matches!(
3377 resp_actions[0],
3378 LinkManagerAction::RegisterLinkDest { .. }
3379 ));
3380
3381 let lrproof_raw = match &resp_actions[1] {
3383 LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3384 _ => panic!("Expected SendPacket for LRPROOF"),
3385 };
3386
3387 let lrproof_packet = RawPacket::unpack(&lrproof_raw).unwrap();
3389 let init_actions2 = initiator_mgr.handle_local_delivery(
3390 lrproof_packet.destination_hash,
3391 &lrproof_raw,
3392 lrproof_packet.packet_hash,
3393 rns_core::transport::types::InterfaceId(0),
3394 &mut rng,
3395 );
3396
3397 let has_established = init_actions2
3399 .iter()
3400 .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
3401 assert!(has_established, "Initiator should emit LinkEstablished");
3402
3403 let lrrtt_raw = init_actions2
3405 .iter()
3406 .find_map(|a| match a {
3407 LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3408 _ => None,
3409 })
3410 .expect("Should have LRRTT SendPacket");
3411
3412 let lrrtt_packet = RawPacket::unpack(&lrrtt_raw).unwrap();
3414 let resp_link_id = lrrtt_packet.destination_hash;
3415 let resp_actions2 = responder_mgr.handle_local_delivery(
3416 resp_link_id,
3417 &lrrtt_raw,
3418 lrrtt_packet.packet_hash,
3419 rns_core::transport::types::InterfaceId(0),
3420 &mut rng,
3421 );
3422
3423 let has_established = resp_actions2
3424 .iter()
3425 .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
3426 assert!(has_established, "Responder should emit LinkEstablished");
3427
3428 assert_eq!(initiator_mgr.link_state(&link_id), Some(LinkState::Active));
3430 assert_eq!(responder_mgr.link_state(&link_id), Some(LinkState::Active));
3431
3432 assert!(initiator_mgr.link_rtt(&link_id).is_some());
3434 assert!(responder_mgr.link_rtt(&link_id).is_some());
3435 }
3436
3437 #[test]
3438 fn test_encrypted_data_exchange() {
3439 let mut rng = OsRng;
3440 let dest_hash = [0xDD; 16];
3441 let mut resp_mgr = LinkManager::new();
3442 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3443 resp_mgr.register_link_destination(
3444 dest_hash,
3445 sig_prv,
3446 sig_pub_bytes,
3447 ResourceStrategy::AcceptNone,
3448 );
3449 let mut init_mgr = LinkManager::new();
3450
3451 let (link_id, init_actions) = init_mgr.create_link(
3453 &dest_hash,
3454 &sig_pub_bytes,
3455 1,
3456 constants::MTU as u32,
3457 &mut rng,
3458 );
3459 let lr_raw = extract_send_packet(&init_actions);
3460 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3461 let resp_actions = resp_mgr.handle_local_delivery(
3462 lr_pkt.destination_hash,
3463 &lr_raw,
3464 lr_pkt.packet_hash,
3465 rns_core::transport::types::InterfaceId(0),
3466 &mut rng,
3467 );
3468 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3469 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3470 let init_actions2 = init_mgr.handle_local_delivery(
3471 lrproof_pkt.destination_hash,
3472 &lrproof_raw,
3473 lrproof_pkt.packet_hash,
3474 rns_core::transport::types::InterfaceId(0),
3475 &mut rng,
3476 );
3477 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3478 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3479 resp_mgr.handle_local_delivery(
3480 lrrtt_pkt.destination_hash,
3481 &lrrtt_raw,
3482 lrrtt_pkt.packet_hash,
3483 rns_core::transport::types::InterfaceId(0),
3484 &mut rng,
3485 );
3486
3487 let actions =
3489 init_mgr.send_on_link(&link_id, b"hello link!", constants::CONTEXT_NONE, &mut rng);
3490 assert_eq!(actions.len(), 1);
3491 assert!(matches!(actions[0], LinkManagerAction::SendPacket { .. }));
3492 }
3493
3494 #[test]
3495 fn test_request_response() {
3496 let mut rng = OsRng;
3497 let dest_hash = [0xDD; 16];
3498 let mut resp_mgr = LinkManager::new();
3499 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3500 resp_mgr.register_link_destination(
3501 dest_hash,
3502 sig_prv,
3503 sig_pub_bytes,
3504 ResourceStrategy::AcceptNone,
3505 );
3506
3507 resp_mgr.register_request_handler("/status", None, |_link_id, _path, _data, _remote| {
3509 Some(b"OK".to_vec())
3510 });
3511
3512 let mut init_mgr = LinkManager::new();
3513
3514 let (link_id, init_actions) = init_mgr.create_link(
3516 &dest_hash,
3517 &sig_pub_bytes,
3518 1,
3519 constants::MTU as u32,
3520 &mut rng,
3521 );
3522 let lr_raw = extract_send_packet(&init_actions);
3523 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3524 let resp_actions = resp_mgr.handle_local_delivery(
3525 lr_pkt.destination_hash,
3526 &lr_raw,
3527 lr_pkt.packet_hash,
3528 rns_core::transport::types::InterfaceId(0),
3529 &mut rng,
3530 );
3531 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3532 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3533 let init_actions2 = init_mgr.handle_local_delivery(
3534 lrproof_pkt.destination_hash,
3535 &lrproof_raw,
3536 lrproof_pkt.packet_hash,
3537 rns_core::transport::types::InterfaceId(0),
3538 &mut rng,
3539 );
3540 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3541 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3542 resp_mgr.handle_local_delivery(
3543 lrrtt_pkt.destination_hash,
3544 &lrrtt_raw,
3545 lrrtt_pkt.packet_hash,
3546 rns_core::transport::types::InterfaceId(0),
3547 &mut rng,
3548 );
3549
3550 let req_actions = init_mgr.send_request(&link_id, "/status", b"query", &mut rng);
3552 assert_eq!(req_actions.len(), 1);
3553
3554 let req_raw = extract_send_packet_from(&req_actions);
3556 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3557 let resp_actions = resp_mgr.handle_local_delivery(
3558 req_pkt.destination_hash,
3559 &req_raw,
3560 req_pkt.packet_hash,
3561 rns_core::transport::types::InterfaceId(0),
3562 &mut rng,
3563 );
3564
3565 let has_response = resp_actions
3567 .iter()
3568 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3569 assert!(has_response, "Handler should produce a response packet");
3570 }
3571
3572 #[test]
3573 fn test_send_request_wraps_invalid_msgpack_data_as_bin() {
3574 use std::sync::{Arc, Mutex};
3575
3576 let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
3577 let mut rng = OsRng;
3578
3579 let invalid = vec![0xC1];
3580 let expected = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(invalid.clone()));
3581 let captured = Arc::new(Mutex::new(None::<Vec<u8>>));
3582 let captured_for_handler = Arc::clone(&captured);
3583
3584 resp_mgr.register_request_handler("/bin", None, move |_link_id, _path, data, _remote| {
3585 *captured_for_handler.lock().unwrap() = Some(data.to_vec());
3586 Some(rns_core::msgpack::pack(&rns_core::msgpack::Value::Bool(
3587 true,
3588 )))
3589 });
3590
3591 let req_actions = init_mgr.send_request(&link_id, "/bin", &invalid, &mut rng);
3592 let req_raw = extract_send_packet_from(&req_actions);
3593 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3594 let resp_actions = resp_mgr.handle_local_delivery(
3595 req_pkt.destination_hash,
3596 &req_raw,
3597 req_pkt.packet_hash,
3598 rns_core::transport::types::InterfaceId(0),
3599 &mut rng,
3600 );
3601
3602 assert!(
3603 resp_actions
3604 .iter()
3605 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. })),
3606 "handler should still produce a response"
3607 );
3608 assert_eq!(*captured.lock().unwrap(), Some(expected));
3609 }
3610
3611 #[test]
3612 fn test_invalid_response_bytes_are_returned_as_msgpack_bin() {
3613 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3614 let mut rng = OsRng;
3615 let invalid_response = vec![0xC1];
3616 let expected =
3617 rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(invalid_response.clone()));
3618
3619 resp_mgr.register_request_handler("/invalid-response", None, {
3620 let invalid_response = invalid_response.clone();
3621 move |_link_id, _path, _data, _remote| Some(invalid_response.clone())
3622 });
3623
3624 let req_actions = init_mgr.send_request(&link_id, "/invalid-response", b"\xc0", &mut rng);
3625 let req_raw = extract_send_packet_from(&req_actions);
3626 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3627 let resp_actions = resp_mgr.handle_local_delivery(
3628 req_pkt.destination_hash,
3629 &req_raw,
3630 req_pkt.packet_hash,
3631 rns_core::transport::types::InterfaceId(0),
3632 &mut rng,
3633 );
3634
3635 let resp_raw = extract_any_send_packet(&resp_actions);
3636 let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
3637 let init_actions = init_mgr.handle_local_delivery(
3638 resp_pkt.destination_hash,
3639 &resp_raw,
3640 resp_pkt.packet_hash,
3641 rns_core::transport::types::InterfaceId(0),
3642 &mut rng,
3643 );
3644
3645 let response_data = init_actions
3646 .iter()
3647 .find_map(|action| match action {
3648 LinkManagerAction::ResponseReceived { data, .. } => Some(data.clone()),
3649 _ => None,
3650 })
3651 .expect("initiator should receive a response");
3652 assert_eq!(response_data, expected);
3653 }
3654
3655 #[test]
3656 fn test_request_acl_deny_unidentified() {
3657 let mut rng = OsRng;
3658 let dest_hash = [0xDD; 16];
3659 let mut resp_mgr = LinkManager::new();
3660 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3661 resp_mgr.register_link_destination(
3662 dest_hash,
3663 sig_prv,
3664 sig_pub_bytes,
3665 ResourceStrategy::AcceptNone,
3666 );
3667
3668 resp_mgr.register_request_handler(
3670 "/restricted",
3671 Some(vec![[0xAA; 16]]),
3672 |_link_id, _path, _data, _remote| Some(b"secret".to_vec()),
3673 );
3674
3675 let mut init_mgr = LinkManager::new();
3676
3677 let (link_id, init_actions) = init_mgr.create_link(
3679 &dest_hash,
3680 &sig_pub_bytes,
3681 1,
3682 constants::MTU as u32,
3683 &mut rng,
3684 );
3685 let lr_raw = extract_send_packet(&init_actions);
3686 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3687 let resp_actions = resp_mgr.handle_local_delivery(
3688 lr_pkt.destination_hash,
3689 &lr_raw,
3690 lr_pkt.packet_hash,
3691 rns_core::transport::types::InterfaceId(0),
3692 &mut rng,
3693 );
3694 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3695 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3696 let init_actions2 = init_mgr.handle_local_delivery(
3697 lrproof_pkt.destination_hash,
3698 &lrproof_raw,
3699 lrproof_pkt.packet_hash,
3700 rns_core::transport::types::InterfaceId(0),
3701 &mut rng,
3702 );
3703 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3704 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3705 resp_mgr.handle_local_delivery(
3706 lrrtt_pkt.destination_hash,
3707 &lrrtt_raw,
3708 lrrtt_pkt.packet_hash,
3709 rns_core::transport::types::InterfaceId(0),
3710 &mut rng,
3711 );
3712
3713 let req_actions = init_mgr.send_request(&link_id, "/restricted", b"query", &mut rng);
3715 let req_raw = extract_send_packet_from(&req_actions);
3716 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3717 let resp_actions = resp_mgr.handle_local_delivery(
3718 req_pkt.destination_hash,
3719 &req_raw,
3720 req_pkt.packet_hash,
3721 rns_core::transport::types::InterfaceId(0),
3722 &mut rng,
3723 );
3724
3725 let has_response = resp_actions
3727 .iter()
3728 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3729 assert!(!has_response, "Unidentified peer should be denied");
3730 }
3731
3732 #[test]
3733 fn test_teardown_link() {
3734 let mut rng = OsRng;
3735 let dest_hash = [0xDD; 16];
3736 let mut mgr = LinkManager::new();
3737
3738 let dummy_sig = [0xAA; 32];
3739 let (link_id, _) =
3740 mgr.create_link(&dest_hash, &dummy_sig, 1, constants::MTU as u32, &mut rng);
3741 assert_eq!(mgr.link_count(), 1);
3742
3743 let actions = mgr.teardown_link(&link_id);
3744 let has_close = actions
3745 .iter()
3746 .any(|a| matches!(a, LinkManagerAction::LinkClosed { .. }));
3747 assert!(has_close);
3748
3749 let tick_actions = mgr.tick(&mut rng);
3751 let has_deregister = tick_actions
3752 .iter()
3753 .any(|a| matches!(a, LinkManagerAction::DeregisterLinkDest { .. }));
3754 assert!(has_deregister);
3755 assert_eq!(mgr.link_count(), 0);
3756 }
3757
3758 #[test]
3759 fn test_identify_on_link() {
3760 let mut rng = OsRng;
3761 let dest_hash = [0xDD; 16];
3762 let mut resp_mgr = LinkManager::new();
3763 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3764 resp_mgr.register_link_destination(
3765 dest_hash,
3766 sig_prv,
3767 sig_pub_bytes,
3768 ResourceStrategy::AcceptNone,
3769 );
3770 let mut init_mgr = LinkManager::new();
3771
3772 let (link_id, init_actions) = init_mgr.create_link(
3774 &dest_hash,
3775 &sig_pub_bytes,
3776 1,
3777 constants::MTU as u32,
3778 &mut rng,
3779 );
3780 let lr_raw = extract_send_packet(&init_actions);
3781 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3782 let resp_actions = resp_mgr.handle_local_delivery(
3783 lr_pkt.destination_hash,
3784 &lr_raw,
3785 lr_pkt.packet_hash,
3786 rns_core::transport::types::InterfaceId(0),
3787 &mut rng,
3788 );
3789 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3790 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3791 let init_actions2 = init_mgr.handle_local_delivery(
3792 lrproof_pkt.destination_hash,
3793 &lrproof_raw,
3794 lrproof_pkt.packet_hash,
3795 rns_core::transport::types::InterfaceId(0),
3796 &mut rng,
3797 );
3798 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3799 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3800 resp_mgr.handle_local_delivery(
3801 lrrtt_pkt.destination_hash,
3802 &lrrtt_raw,
3803 lrrtt_pkt.packet_hash,
3804 rns_core::transport::types::InterfaceId(0),
3805 &mut rng,
3806 );
3807
3808 let identity = Identity::new(&mut rng);
3810 let id_actions = init_mgr.identify(&link_id, &identity, &mut rng);
3811 assert_eq!(id_actions.len(), 1);
3812
3813 let id_raw = extract_send_packet_from(&id_actions);
3815 let id_pkt = RawPacket::unpack(&id_raw).unwrap();
3816 let resp_actions = resp_mgr.handle_local_delivery(
3817 id_pkt.destination_hash,
3818 &id_raw,
3819 id_pkt.packet_hash,
3820 rns_core::transport::types::InterfaceId(0),
3821 &mut rng,
3822 );
3823
3824 let has_identified = resp_actions
3825 .iter()
3826 .any(|a| matches!(a, LinkManagerAction::RemoteIdentified { .. }));
3827 assert!(has_identified, "Responder should emit RemoteIdentified");
3828 }
3829
3830 #[test]
3831 fn test_path_hash_computation() {
3832 let h1 = compute_path_hash("/status");
3833 let h2 = compute_path_hash("/path");
3834 assert_ne!(h1, h2);
3835
3836 assert_eq!(h1, compute_path_hash("/status"));
3838 }
3839
3840 #[test]
3841 fn test_link_count() {
3842 let mut mgr = LinkManager::new();
3843 let mut rng = OsRng;
3844
3845 assert_eq!(mgr.link_count(), 0);
3846
3847 let dummy_sig = [0xAA; 32];
3848 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3849 assert_eq!(mgr.link_count(), 1);
3850
3851 mgr.create_link(&[0x22; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3852 assert_eq!(mgr.link_count(), 2);
3853 }
3854
3855 fn extract_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3858 extract_send_packet_at(actions, actions.len() - 1)
3859 }
3860
3861 fn extract_send_packet_at(actions: &[LinkManagerAction], idx: usize) -> Vec<u8> {
3862 match &actions[idx] {
3863 LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3864 other => panic!("Expected SendPacket at index {}, got {:?}", idx, other),
3865 }
3866 }
3867
3868 fn extract_any_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3869 actions
3870 .iter()
3871 .find_map(|a| match a {
3872 LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3873 _ => None,
3874 })
3875 .expect("Expected at least one SendPacket action")
3876 }
3877
3878 fn extract_send_packet_from(actions: &[LinkManagerAction]) -> Vec<u8> {
3879 extract_any_send_packet(actions)
3880 }
3881
3882 fn setup_active_link() -> (LinkManager, LinkManager, LinkId) {
3885 let mut rng = OsRng;
3886 let dest_hash = [0xDD; 16];
3887 let mut resp_mgr = LinkManager::new();
3888 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3889 resp_mgr.register_link_destination(
3890 dest_hash,
3891 sig_prv,
3892 sig_pub_bytes,
3893 ResourceStrategy::AcceptNone,
3894 );
3895 let mut init_mgr = LinkManager::new();
3896
3897 let (link_id, init_actions) = init_mgr.create_link(
3898 &dest_hash,
3899 &sig_pub_bytes,
3900 1,
3901 constants::MTU as u32,
3902 &mut rng,
3903 );
3904 let lr_raw = extract_send_packet(&init_actions);
3905 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3906 let resp_actions = resp_mgr.handle_local_delivery(
3907 lr_pkt.destination_hash,
3908 &lr_raw,
3909 lr_pkt.packet_hash,
3910 rns_core::transport::types::InterfaceId(0),
3911 &mut rng,
3912 );
3913 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3914 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3915 let init_actions2 = init_mgr.handle_local_delivery(
3916 lrproof_pkt.destination_hash,
3917 &lrproof_raw,
3918 lrproof_pkt.packet_hash,
3919 rns_core::transport::types::InterfaceId(0),
3920 &mut rng,
3921 );
3922 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3923 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3924 resp_mgr.handle_local_delivery(
3925 lrrtt_pkt.destination_hash,
3926 &lrrtt_raw,
3927 lrrtt_pkt.packet_hash,
3928 rns_core::transport::types::InterfaceId(0),
3929 &mut rng,
3930 );
3931
3932 assert_eq!(init_mgr.link_state(&link_id), Some(LinkState::Active));
3933 assert_eq!(resp_mgr.link_state(&link_id), Some(LinkState::Active));
3934
3935 (init_mgr, resp_mgr, link_id)
3936 }
3937
3938 #[test]
3943 fn test_resource_strategy_default() {
3944 let mut mgr = LinkManager::new();
3945 let mut rng = OsRng;
3946 let dummy_sig = [0xAA; 32];
3947 let (link_id, _) =
3948 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3949
3950 let link = mgr.links.get(&link_id).unwrap();
3952 assert_eq!(link.resource_strategy, ResourceStrategy::AcceptNone);
3953 }
3954
3955 #[test]
3956 fn test_set_resource_strategy() {
3957 let mut mgr = LinkManager::new();
3958 let mut rng = OsRng;
3959 let dummy_sig = [0xAA; 32];
3960 let (link_id, _) =
3961 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3962
3963 mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3964 assert_eq!(
3965 mgr.links.get(&link_id).unwrap().resource_strategy,
3966 ResourceStrategy::AcceptAll
3967 );
3968
3969 mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3970 assert_eq!(
3971 mgr.links.get(&link_id).unwrap().resource_strategy,
3972 ResourceStrategy::AcceptApp
3973 );
3974 }
3975
3976 #[test]
3977 fn test_send_resource_on_active_link() {
3978 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3979 let mut rng = OsRng;
3980
3981 let data = vec![0xAB; 100]; let actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3984
3985 let has_send = actions
3987 .iter()
3988 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3989 assert!(
3990 has_send,
3991 "send_resource should emit advertisement SendPacket"
3992 );
3993 }
3994
3995 fn first_resource_advertisement(
3996 mgr: &LinkManager,
3997 link_id: &LinkId,
3998 actions: &[LinkManagerAction],
3999 ) -> rns_core::resource::ResourceAdvertisement {
4000 let adv_raw = actions
4001 .iter()
4002 .find_map(|action| match action {
4003 LinkManagerAction::SendPacket { raw, .. } => {
4004 let pkt = RawPacket::unpack(raw).ok()?;
4005 (pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw)
4006 }
4007 _ => None,
4008 })
4009 .expect("sender should emit a resource advertisement");
4010 let adv_pkt = RawPacket::unpack(adv_raw).unwrap();
4011 let plaintext = mgr
4012 .links
4013 .get(link_id)
4014 .unwrap()
4015 .engine
4016 .decrypt(&adv_pkt.data)
4017 .unwrap();
4018 rns_core::resource::ResourceAdvertisement::unpack(&plaintext).unwrap()
4019 }
4020
4021 fn deterministic_bytes(len: usize) -> Vec<u8> {
4022 let mut state = 0x1234_5678u32;
4023 (0..len)
4024 .map(|_| {
4025 state = state.wrapping_mul(1664525).wrapping_add(1013904223);
4026 (state >> 16) as u8
4027 })
4028 .collect()
4029 }
4030
4031 fn drive_link_manager_packets(
4032 init_mgr: &mut LinkManager,
4033 resp_mgr: &mut LinkManager,
4034 initial_actions: Vec<LinkManagerAction>,
4035 initial_source: char,
4036 rng: &mut dyn Rng,
4037 max_rounds: usize,
4038 ) -> (
4039 Option<Vec<u8>>,
4040 bool,
4041 Vec<(char, usize, usize)>,
4042 Vec<(char, String)>,
4043 usize,
4044 ) {
4045 let mut pending: Vec<(char, LinkManagerAction)> = initial_actions
4046 .into_iter()
4047 .map(|a| (initial_source, a))
4048 .collect();
4049 let mut rounds = 0;
4050 let mut received_data = None;
4051 let mut sender_completed = false;
4052 let mut progress = Vec::new();
4053 let mut failures = Vec::new();
4054
4055 while !pending.is_empty() && rounds < max_rounds {
4056 rounds += 1;
4057 let mut next = Vec::new();
4058 for (source, action) in pending.drain(..) {
4059 let LinkManagerAction::SendPacket { raw, .. } = action else {
4060 continue;
4061 };
4062 let pkt = RawPacket::unpack(&raw).unwrap();
4063 let target_actions = if source == 'i' {
4064 resp_mgr.handle_local_delivery(
4065 pkt.destination_hash,
4066 &raw,
4067 pkt.packet_hash,
4068 rns_core::transport::types::InterfaceId(0),
4069 rng,
4070 )
4071 } else {
4072 init_mgr.handle_local_delivery(
4073 pkt.destination_hash,
4074 &raw,
4075 pkt.packet_hash,
4076 rns_core::transport::types::InterfaceId(0),
4077 rng,
4078 )
4079 };
4080 let target_source = if source == 'i' { 'r' } else { 'i' };
4081 for action in &target_actions {
4082 match action {
4083 LinkManagerAction::ResourceReceived { data, .. } => {
4084 received_data = Some(data.clone());
4085 }
4086 LinkManagerAction::ResourceCompleted { .. } => {
4087 sender_completed = true;
4088 }
4089 LinkManagerAction::ResourceProgress {
4090 received, total, ..
4091 } => {
4092 progress.push((target_source, *received, *total));
4093 }
4094 LinkManagerAction::ResourceFailed { error, .. } => {
4095 failures.push((target_source, error.clone()));
4096 }
4097 _ => {}
4098 }
4099 }
4100 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4101 }
4102 pending = next;
4103 }
4104
4105 (received_data, sender_completed, progress, failures, rounds)
4106 }
4107
4108 #[test]
4109 fn test_send_resource_auto_compress_option_controls_adv_flag() {
4110 let data = vec![0x41; 2048];
4111
4112 let (mut compressed_mgr, _resp_mgr, link_id) = setup_active_link();
4113 let mut rng = OsRng;
4114 let actions =
4115 compressed_mgr.send_resource_with_auto_compress(&link_id, &data, None, true, &mut rng);
4116 let adv = first_resource_advertisement(&compressed_mgr, &link_id, &actions);
4117 assert!(
4118 adv.flags.compressed,
4119 "compressible resource should compress"
4120 );
4121
4122 let (mut plain_mgr, _resp_mgr, link_id) = setup_active_link();
4123 let mut rng = OsRng;
4124 let actions =
4125 plain_mgr.send_resource_with_auto_compress(&link_id, &data, None, false, &mut rng);
4126 let adv = first_resource_advertisement(&plain_mgr, &link_id, &actions);
4127 assert!(
4128 !adv.flags.compressed,
4129 "auto_compress=false should keep resource uncompressed"
4130 );
4131 }
4132
4133 #[test]
4134 fn test_send_resource_on_inactive_link() {
4135 let mut mgr = LinkManager::new();
4136 let mut rng = OsRng;
4137 let dummy_sig = [0xAA; 32];
4138 let (link_id, _) =
4139 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
4140
4141 let actions = mgr.send_resource(&link_id, b"data", None, &mut rng);
4143 assert!(actions.is_empty(), "Cannot send resource on inactive link");
4144 }
4145
4146 #[test]
4147 fn test_send_resource_without_session_key_uses_encrypt_fallback_path() {
4148 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4149 let mut rng = OsRng;
4150 init_mgr
4151 .links
4152 .get_mut(&link_id)
4153 .unwrap()
4154 .engine
4155 .clear_session_for_testing();
4156
4157 let actions = init_mgr.send_resource(&link_id, b"data", None, &mut rng);
4158
4159 assert!(
4160 actions.is_empty(),
4161 "without a session key, no advertisement should be emitted"
4162 );
4163 assert_eq!(
4164 init_mgr
4165 .links
4166 .get(&link_id)
4167 .map(|managed| managed.outgoing_resources.len()),
4168 Some(1)
4169 );
4170 }
4171
4172 #[test]
4173 fn test_resource_adv_rejected_by_accept_none() {
4174 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4175 let mut rng = OsRng;
4176
4177 let data = vec![0xCD; 100];
4180 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4181
4182 for action in &adv_actions {
4184 if let LinkManagerAction::SendPacket { raw, .. } = action {
4185 let pkt = RawPacket::unpack(raw).unwrap();
4186 let resp_actions = resp_mgr.handle_local_delivery(
4187 pkt.destination_hash,
4188 raw,
4189 pkt.packet_hash,
4190 rns_core::transport::types::InterfaceId(0),
4191 &mut rng,
4192 );
4193 let has_resource_received = resp_actions
4195 .iter()
4196 .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
4197 assert!(
4198 !has_resource_received,
4199 "AcceptNone should not accept resource"
4200 );
4201 }
4202 }
4203 }
4204
4205 #[test]
4206 fn test_resource_adv_accepted_by_accept_all() {
4207 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4208 let mut rng = OsRng;
4209
4210 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4212
4213 let data = vec![0xCD; 100];
4215 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4216
4217 for action in &adv_actions {
4219 if let LinkManagerAction::SendPacket { raw, .. } = action {
4220 let pkt = RawPacket::unpack(raw).unwrap();
4221 let resp_actions = resp_mgr.handle_local_delivery(
4222 pkt.destination_hash,
4223 raw,
4224 pkt.packet_hash,
4225 rns_core::transport::types::InterfaceId(0),
4226 &mut rng,
4227 );
4228 let has_send = resp_actions
4230 .iter()
4231 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
4232 assert!(has_send, "AcceptAll should accept and request parts");
4233 }
4234 }
4235 }
4236
4237 #[test]
4238 fn test_resource_accept_app_query() {
4239 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4240 let mut rng = OsRng;
4241
4242 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4244
4245 let data = vec![0xCD; 100];
4247 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4248
4249 let mut got_query = false;
4251 for action in &adv_actions {
4252 if let LinkManagerAction::SendPacket { raw, .. } = action {
4253 let pkt = RawPacket::unpack(raw).unwrap();
4254 let resp_actions = resp_mgr.handle_local_delivery(
4255 pkt.destination_hash,
4256 raw,
4257 pkt.packet_hash,
4258 rns_core::transport::types::InterfaceId(0),
4259 &mut rng,
4260 );
4261 for a in &resp_actions {
4262 if matches!(a, LinkManagerAction::ResourceAcceptQuery { .. }) {
4263 got_query = true;
4264 }
4265 }
4266 }
4267 }
4268 assert!(got_query, "AcceptApp should emit ResourceAcceptQuery");
4269 }
4270
4271 #[test]
4272 fn test_resource_accept_app_accept() {
4273 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4274 let mut rng = OsRng;
4275
4276 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4277
4278 let data = vec![0xCD; 100];
4279 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4280
4281 for action in &adv_actions {
4282 if let LinkManagerAction::SendPacket { raw, .. } = action {
4283 let pkt = RawPacket::unpack(raw).unwrap();
4284 let resp_actions = resp_mgr.handle_local_delivery(
4285 pkt.destination_hash,
4286 raw,
4287 pkt.packet_hash,
4288 rns_core::transport::types::InterfaceId(0),
4289 &mut rng,
4290 );
4291 for a in &resp_actions {
4292 if let LinkManagerAction::ResourceAcceptQuery {
4293 link_id: lid,
4294 resource_hash,
4295 ..
4296 } = a
4297 {
4298 let accept_actions =
4300 resp_mgr.accept_resource(lid, resource_hash, true, &mut rng);
4301 let has_send = accept_actions
4303 .iter()
4304 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
4305 assert!(
4306 has_send,
4307 "Accepting resource should produce request for parts"
4308 );
4309 }
4310 }
4311 }
4312 }
4313 }
4314
4315 #[test]
4316 fn test_resource_accept_app_reject() {
4317 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4318 let mut rng = OsRng;
4319
4320 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4321
4322 let data = vec![0xCD; 100];
4323 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4324
4325 for action in &adv_actions {
4326 if let LinkManagerAction::SendPacket { raw, .. } = action {
4327 let pkt = RawPacket::unpack(raw).unwrap();
4328 let resp_actions = resp_mgr.handle_local_delivery(
4329 pkt.destination_hash,
4330 raw,
4331 pkt.packet_hash,
4332 rns_core::transport::types::InterfaceId(0),
4333 &mut rng,
4334 );
4335 for a in &resp_actions {
4336 if let LinkManagerAction::ResourceAcceptQuery {
4337 link_id: lid,
4338 resource_hash,
4339 ..
4340 } = a
4341 {
4342 let reject_actions =
4344 resp_mgr.accept_resource(lid, resource_hash, false, &mut rng);
4345 let has_resource_received = reject_actions
4348 .iter()
4349 .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
4350 assert!(!has_resource_received);
4351 }
4352 }
4353 }
4354 }
4355 }
4356
4357 #[test]
4358 fn test_resource_full_transfer() {
4359 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4360 let mut rng = OsRng;
4361
4362 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4364
4365 let original_data = b"Hello, Resource Transfer!".to_vec();
4367 let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
4368
4369 let mut pending: Vec<(char, LinkManagerAction)> =
4372 adv_actions.into_iter().map(|a| ('i', a)).collect();
4373 let mut rounds = 0;
4374 let max_rounds = 50;
4375 let mut resource_received = false;
4376 let mut sender_completed = false;
4377
4378 while !pending.is_empty() && rounds < max_rounds {
4379 rounds += 1;
4380 let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
4381
4382 for (source, action) in pending.drain(..) {
4383 if let LinkManagerAction::SendPacket { raw, .. } = action {
4384 let pkt = RawPacket::unpack(&raw).unwrap();
4385
4386 let target_actions = if source == 'i' {
4388 resp_mgr.handle_local_delivery(
4389 pkt.destination_hash,
4390 &raw,
4391 pkt.packet_hash,
4392 rns_core::transport::types::InterfaceId(0),
4393 &mut rng,
4394 )
4395 } else {
4396 init_mgr.handle_local_delivery(
4397 pkt.destination_hash,
4398 &raw,
4399 pkt.packet_hash,
4400 rns_core::transport::types::InterfaceId(0),
4401 &mut rng,
4402 )
4403 };
4404
4405 let target_source = if source == 'i' { 'r' } else { 'i' };
4406 for a in &target_actions {
4407 match a {
4408 LinkManagerAction::ResourceReceived { data, .. } => {
4409 assert_eq!(*data, original_data);
4410 resource_received = true;
4411 }
4412 LinkManagerAction::ResourceCompleted { .. } => {
4413 sender_completed = true;
4414 }
4415 _ => {}
4416 }
4417 }
4418 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4419 }
4420 }
4421 pending = next;
4422 }
4423
4424 assert!(
4425 resource_received,
4426 "Responder should receive resource data (rounds={})",
4427 rounds
4428 );
4429 assert!(
4430 sender_completed,
4431 "Sender should get completion proof (rounds={})",
4432 rounds
4433 );
4434 }
4435
4436 #[test]
4437 fn test_split_resource_advertisement_and_progress_entries() {
4438 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4439 let mut rng = OsRng;
4440 let data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 1024);
4441
4442 let actions =
4443 init_mgr.send_resource_with_auto_compress(&link_id, &data, None, false, &mut rng);
4444 let adv = first_resource_advertisement(&init_mgr, &link_id, &actions);
4445
4446 assert!(adv.flags.split);
4447 assert_eq!(adv.segment_index, 1);
4448 assert_eq!(adv.total_segments, 2);
4449 assert_eq!(adv.data_size, data.len() as u64);
4450
4451 let managed = init_mgr.links.get(&link_id).unwrap();
4452 assert_eq!(managed.outgoing_splits.len(), 1);
4453 assert_eq!(
4454 managed
4455 .outgoing_resources
4456 .iter()
4457 .filter(|sender| sender.flags.split)
4458 .count(),
4459 2
4460 );
4461
4462 let entries = init_mgr.resource_entries();
4463 assert_eq!(entries.len(), 1);
4464 assert_eq!(entries[0].direction, "outgoing");
4465 assert!(entries[0].total_parts > managed.outgoing_resources[0].total_parts());
4466 assert_eq!(entries[0].transferred_parts, 0);
4467 }
4468
4469 #[test]
4470 fn test_split_resource_full_transfer_and_monotonic_progress() {
4471 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4472 let mut rng = OsRng;
4473 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4474
4475 let original_data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 2048);
4476 let initial_actions = init_mgr.send_resource_with_auto_compress(
4477 &link_id,
4478 &original_data,
4479 None,
4480 false,
4481 &mut rng,
4482 );
4483
4484 let (received_data, sender_completed, progress, failures, rounds) =
4485 drive_link_manager_packets(
4486 &mut init_mgr,
4487 &mut resp_mgr,
4488 initial_actions,
4489 'i',
4490 &mut rng,
4491 10_000,
4492 );
4493
4494 assert!(
4495 received_data.as_ref().is_some_and(|data| data == &original_data),
4496 "split transfer did not deliver payload in {rounds} rounds; sender_completed={sender_completed}; failures={failures:?}; last_progress={:?}; init_entries={:?}; resp_entries={:?}",
4497 progress.last(),
4498 init_mgr.resource_entries(),
4499 resp_mgr.resource_entries()
4500 );
4501 assert!(
4502 sender_completed,
4503 "sender did not complete in {rounds} rounds"
4504 );
4505 assert!(
4506 progress
4507 .iter()
4508 .any(|(_, received, total)| received == total),
4509 "expected final progress update"
4510 );
4511
4512 let mut init_last = 0;
4513 let mut resp_last = 0;
4514 for (side, received, total) in progress {
4515 assert!(received <= total);
4516 match side {
4517 'i' => {
4518 assert!(
4519 received >= init_last,
4520 "initiator progress regressed from {init_last} to {received}"
4521 );
4522 init_last = received;
4523 }
4524 'r' => {
4525 assert!(
4526 received >= resp_last,
4527 "responder progress regressed from {resp_last} to {received}"
4528 );
4529 resp_last = received;
4530 }
4531 _ => unreachable!(),
4532 }
4533 }
4534 }
4535
4536 #[test]
4537 fn test_split_resource_accept_app_queries_only_first_segment() {
4538 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4539 let mut rng = OsRng;
4540 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4541
4542 let original_data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 1024);
4543 let adv_actions = init_mgr.send_resource_with_auto_compress(
4544 &link_id,
4545 &original_data,
4546 None,
4547 false,
4548 &mut rng,
4549 );
4550 let adv_raw = extract_any_send_packet(&adv_actions);
4551 let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
4552 let query_actions = resp_mgr.handle_local_delivery(
4553 adv_pkt.destination_hash,
4554 &adv_raw,
4555 adv_pkt.packet_hash,
4556 rns_core::transport::types::InterfaceId(0),
4557 &mut rng,
4558 );
4559
4560 let queries: Vec<_> = query_actions
4561 .iter()
4562 .filter_map(|action| match action {
4563 LinkManagerAction::ResourceAcceptQuery { resource_hash, .. } => {
4564 Some(resource_hash.clone())
4565 }
4566 _ => None,
4567 })
4568 .collect();
4569 assert_eq!(queries.len(), 1);
4570
4571 let accept_actions = resp_mgr.accept_resource(&link_id, &queries[0], true, &mut rng);
4572 let (received_data, sender_completed, _progress, failures, rounds) =
4573 drive_link_manager_packets(
4574 &mut init_mgr,
4575 &mut resp_mgr,
4576 accept_actions,
4577 'r',
4578 &mut rng,
4579 10_000,
4580 );
4581
4582 assert!(
4583 failures.is_empty(),
4584 "split AcceptApp transfer failed: {failures:?}"
4585 );
4586 assert!(
4587 received_data
4588 .as_ref()
4589 .is_some_and(|data| data == &original_data),
4590 "split AcceptApp transfer did not deliver in {rounds} rounds"
4591 );
4592 assert!(sender_completed);
4593 }
4594
4595 #[test]
4596 fn test_resource_cancel_icl() {
4597 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4598 let mut rng = OsRng;
4599
4600 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4601
4602 let data = vec![0xAB; 2000];
4604 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4605
4606 for action in &adv_actions {
4608 if let LinkManagerAction::SendPacket { raw, .. } = action {
4609 let pkt = RawPacket::unpack(raw).unwrap();
4610 resp_mgr.handle_local_delivery(
4611 pkt.destination_hash,
4612 raw,
4613 pkt.packet_hash,
4614 rns_core::transport::types::InterfaceId(0),
4615 &mut rng,
4616 );
4617 }
4618 }
4619
4620 assert!(!resp_mgr
4622 .links
4623 .get(&link_id)
4624 .unwrap()
4625 .incoming_resources
4626 .is_empty());
4627
4628 let icl_actions = resp_mgr.handle_resource_icl(&link_id);
4630
4631 let has_failed = icl_actions
4633 .iter()
4634 .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
4635 assert!(has_failed, "ICL should produce ResourceFailed");
4636 }
4637
4638 #[test]
4639 fn test_resource_cancel_rcl() {
4640 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4641 let mut rng = OsRng;
4642
4643 let data = vec![0xAB; 2000];
4645 init_mgr.send_resource(&link_id, &data, None, &mut rng);
4646
4647 assert!(!init_mgr
4649 .links
4650 .get(&link_id)
4651 .unwrap()
4652 .outgoing_resources
4653 .is_empty());
4654
4655 let rcl_actions = init_mgr.handle_resource_rcl(&link_id);
4657
4658 let has_failed = rcl_actions
4659 .iter()
4660 .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
4661 assert!(has_failed, "RCL should produce ResourceFailed");
4662 }
4663
4664 #[test]
4665 fn test_cancel_all_resources_clears_active_transfers() {
4666 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4667 let mut rng = OsRng;
4668
4669 let actions = init_mgr.send_resource(&link_id, b"resource body", None, &mut rng);
4670 assert!(!actions.is_empty());
4671 assert_eq!(init_mgr.resource_transfer_count(), 1);
4672
4673 let cancel_actions = init_mgr.cancel_all_resources(&mut rng);
4674
4675 assert_eq!(init_mgr.resource_transfer_count(), 0);
4676 assert!(cancel_actions
4677 .iter()
4678 .any(|action| matches!(action, LinkManagerAction::SendPacket { .. })));
4679 }
4680
4681 #[test]
4682 fn test_resource_tick_cleans_up() {
4683 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4684 let mut rng = OsRng;
4685
4686 let data = vec![0xAB; 100];
4687 init_mgr.send_resource(&link_id, &data, None, &mut rng);
4688
4689 assert!(!init_mgr
4690 .links
4691 .get(&link_id)
4692 .unwrap()
4693 .outgoing_resources
4694 .is_empty());
4695
4696 init_mgr.handle_resource_rcl(&link_id);
4698
4699 init_mgr.tick(&mut rng);
4701
4702 assert!(
4703 init_mgr
4704 .links
4705 .get(&link_id)
4706 .unwrap()
4707 .outgoing_resources
4708 .is_empty(),
4709 "Tick should clean up completed/failed outgoing resources"
4710 );
4711 }
4712
4713 #[test]
4714 fn test_build_link_packet() {
4715 let (init_mgr, _resp_mgr, link_id) = setup_active_link();
4716
4717 let actions =
4718 init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, b"test data");
4719 assert_eq!(actions.len(), 1);
4720 if let LinkManagerAction::SendPacket { raw, dest_type, .. } = &actions[0] {
4721 let pkt = RawPacket::unpack(raw).unwrap();
4722 assert_eq!(pkt.context, constants::CONTEXT_RESOURCE);
4723 assert_eq!(*dest_type, constants::DESTINATION_LINK);
4724 } else {
4725 panic!("Expected SendPacket");
4726 }
4727 }
4728
4729 #[test]
4730 fn test_build_link_packet_returns_empty_when_mtu_too_small() {
4731 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4732 init_mgr.set_link_mtu(&link_id, 84);
4733
4734 let actions =
4735 init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, &[0xAA; 200]);
4736 assert!(actions.is_empty(), "oversized packet should not be built");
4737 }
4738
4739 #[test]
4740 fn test_process_resource_actions_encrypted_variants_drop_on_encrypt_failure() {
4741 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4742 let mut rng = OsRng;
4743 init_mgr
4744 .links
4745 .get_mut(&link_id)
4746 .unwrap()
4747 .engine
4748 .clear_session_for_testing();
4749
4750 let cases = vec![
4751 ResourceAction::SendAdvertisement(vec![1, 2, 3]),
4752 ResourceAction::SendRequest(vec![4, 5, 6]),
4753 ResourceAction::SendHmu(vec![7, 8, 9]),
4754 ResourceAction::SendProof(vec![10, 11, 12]),
4755 ResourceAction::SendCancelInitiator(vec![13, 14, 15]),
4756 ResourceAction::SendCancelReceiver(vec![16, 17, 18]),
4757 ];
4758
4759 for action in cases {
4760 let out = init_mgr.process_resource_actions(&link_id, vec![action], &mut rng);
4761 assert!(
4762 out.is_empty(),
4763 "encrypt failure should suppress packet emission"
4764 );
4765 }
4766 }
4767
4768 #[test]
4773 fn test_channel_message_delivery() {
4774 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4775 let mut rng = OsRng;
4776
4777 let chan_actions = init_mgr
4779 .send_channel_message(&link_id, 42, b"channel data", &mut rng)
4780 .expect("active link channel send should succeed");
4781 assert!(!chan_actions.is_empty());
4782
4783 let mut got_channel_msg = false;
4785 for action in &chan_actions {
4786 if let LinkManagerAction::SendPacket { raw, .. } = action {
4787 let pkt = RawPacket::unpack(raw).unwrap();
4788 let resp_actions = resp_mgr.handle_local_delivery(
4789 pkt.destination_hash,
4790 raw,
4791 pkt.packet_hash,
4792 rns_core::transport::types::InterfaceId(0),
4793 &mut rng,
4794 );
4795 for a in &resp_actions {
4796 if let LinkManagerAction::ChannelMessageReceived {
4797 msgtype, payload, ..
4798 } = a
4799 {
4800 assert_eq!(*msgtype, 42);
4801 assert_eq!(*payload, b"channel data");
4802 got_channel_msg = true;
4803 }
4804 }
4805 }
4806 }
4807 assert!(got_channel_msg, "Responder should receive channel message");
4808 }
4809
4810 #[test]
4811 fn test_channel_send_drops_packet_when_encrypt_fails() {
4812 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4813 let mut rng = OsRng;
4814 init_mgr
4815 .links
4816 .get_mut(&link_id)
4817 .unwrap()
4818 .engine
4819 .clear_session_for_testing();
4820
4821 let actions = init_mgr
4822 .send_channel_message(&link_id, 42, b"channel data", &mut rng)
4823 .expect("channel should still accept the message locally");
4824
4825 assert!(
4826 actions.is_empty(),
4827 "encrypt failure should suppress channel packet"
4828 );
4829 assert!(
4830 init_mgr
4831 .links
4832 .get(&link_id)
4833 .unwrap()
4834 .pending_channel_packets
4835 .is_empty(),
4836 "failed packet encryption must not track a pending channel proof"
4837 );
4838 }
4839
4840 #[test]
4841 fn test_channel_proof_reopens_send_window() {
4842 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4843 let mut rng = OsRng;
4844
4845 init_mgr
4846 .send_channel_message(&link_id, 42, b"first", &mut rng)
4847 .expect("first send should succeed");
4848 init_mgr
4849 .send_channel_message(&link_id, 42, b"second", &mut rng)
4850 .expect("second send should succeed");
4851
4852 let err = init_mgr
4853 .send_channel_message(&link_id, 42, b"third", &mut rng)
4854 .expect_err("third send should hit the initial channel window");
4855 assert_eq!(err, "Channel is not ready to send");
4856
4857 let queued_packets = init_mgr
4858 .links
4859 .get(&link_id)
4860 .unwrap()
4861 .pending_channel_packets
4862 .clone();
4863 assert_eq!(queued_packets.len(), 2);
4864 for tracked_hash in queued_packets.keys().take(1) {
4865 let mut proof_data = Vec::with_capacity(96);
4866 proof_data.extend_from_slice(tracked_hash);
4867 proof_data.extend_from_slice(&[0x11; 64]);
4868 let flags = PacketFlags {
4869 header_type: constants::HEADER_1,
4870 context_flag: constants::FLAG_UNSET,
4871 transport_type: constants::TRANSPORT_BROADCAST,
4872 destination_type: constants::DESTINATION_LINK,
4873 packet_type: constants::PACKET_TYPE_PROOF,
4874 };
4875 let proof = RawPacket::pack(
4876 flags,
4877 0,
4878 &link_id,
4879 None,
4880 constants::CONTEXT_NONE,
4881 &proof_data,
4882 )
4883 .expect("proof packet should pack");
4884 let ack_actions = init_mgr.handle_local_delivery(
4885 link_id,
4886 &proof.raw,
4887 proof.packet_hash,
4888 rns_core::transport::types::InterfaceId(0),
4889 &mut rng,
4890 );
4891 assert!(
4892 ack_actions.is_empty(),
4893 "proof delivery should only update channel state"
4894 );
4895 }
4896
4897 init_mgr
4898 .send_channel_message(&link_id, 42, b"third", &mut rng)
4899 .expect("proof should free one channel slot");
4900 }
4901
4902 #[test]
4903 fn test_generic_link_data_delivery() {
4904 let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
4905 let mut rng = OsRng;
4906
4907 let actions = init_mgr.send_on_link(&link_id, b"raw stuff", 0x42, &mut rng);
4909 assert_eq!(actions.len(), 1);
4910
4911 let raw = extract_any_send_packet(&actions);
4913 let pkt = RawPacket::unpack(&raw).unwrap();
4914 let resp_actions = resp_mgr.handle_local_delivery(
4915 pkt.destination_hash,
4916 &raw,
4917 pkt.packet_hash,
4918 rns_core::transport::types::InterfaceId(0),
4919 &mut rng,
4920 );
4921
4922 let has_data = resp_actions
4923 .iter()
4924 .any(|a| matches!(a, LinkManagerAction::LinkDataReceived { context: 0x42, .. }));
4925 assert!(
4926 has_data,
4927 "Responder should receive LinkDataReceived for unknown context"
4928 );
4929 }
4930
4931 #[test]
4932 fn test_invalid_encrypted_contexts_are_ignored() {
4933 let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4934 let mut rng = OsRng;
4935 let contexts = [
4936 constants::CONTEXT_CHANNEL,
4937 constants::CONTEXT_REQUEST,
4938 constants::CONTEXT_RESPONSE,
4939 constants::CONTEXT_RESOURCE_ADV,
4940 constants::CONTEXT_RESOURCE_REQ,
4941 constants::CONTEXT_RESOURCE_HMU,
4942 constants::CONTEXT_RESOURCE_PRF,
4943 0x42,
4944 ];
4945
4946 for context in contexts {
4947 let flags = PacketFlags {
4948 header_type: constants::HEADER_1,
4949 context_flag: constants::FLAG_UNSET,
4950 transport_type: constants::TRANSPORT_BROADCAST,
4951 destination_type: constants::DESTINATION_LINK,
4952 packet_type: constants::PACKET_TYPE_DATA,
4953 };
4954 let pkt = RawPacket::pack(flags, 0, &link_id, None, context, b"invalid-ciphertext")
4955 .expect("test packet should pack");
4956 let actions = resp_mgr.handle_local_delivery(
4957 pkt.destination_hash,
4958 &pkt.raw,
4959 pkt.packet_hash,
4960 rns_core::transport::types::InterfaceId(0),
4961 &mut rng,
4962 );
4963 assert!(
4964 actions.is_empty(),
4965 "invalid ciphertext for context {context:#x} should be ignored"
4966 );
4967 }
4968 }
4969
4970 #[test]
4971 fn test_resource_part_without_matching_receiver_is_ignored() {
4972 let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4973 let mut rng = OsRng;
4974 let flags = PacketFlags {
4975 header_type: constants::HEADER_1,
4976 context_flag: constants::FLAG_UNSET,
4977 transport_type: constants::TRANSPORT_BROADCAST,
4978 destination_type: constants::DESTINATION_LINK,
4979 packet_type: constants::PACKET_TYPE_DATA,
4980 };
4981 let pkt = RawPacket::pack(
4982 flags,
4983 0,
4984 &link_id,
4985 None,
4986 constants::CONTEXT_RESOURCE,
4987 b"orphan-part",
4988 )
4989 .expect("test packet should pack");
4990
4991 let actions = resp_mgr.handle_local_delivery(
4992 pkt.destination_hash,
4993 &pkt.raw,
4994 pkt.packet_hash,
4995 rns_core::transport::types::InterfaceId(0),
4996 &mut rng,
4997 );
4998
4999 assert!(actions.is_empty(), "orphan resource part should be ignored");
5000 }
5001
5002 #[test]
5003 fn test_response_delivery() {
5004 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5005 let mut rng = OsRng;
5006
5007 resp_mgr.register_request_handler("/echo", None, |_link_id, _path, data, _remote| {
5009 Some(data.to_vec())
5010 });
5011
5012 let req_actions = init_mgr.send_request(&link_id, "/echo", b"\xc0", &mut rng); assert!(!req_actions.is_empty());
5015
5016 let req_raw = extract_any_send_packet(&req_actions);
5018 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5019 let resp_actions = resp_mgr.handle_local_delivery(
5020 req_pkt.destination_hash,
5021 &req_raw,
5022 req_pkt.packet_hash,
5023 rns_core::transport::types::InterfaceId(0),
5024 &mut rng,
5025 );
5026 let has_resp_send = resp_actions
5027 .iter()
5028 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
5029 assert!(has_resp_send, "Handler should produce response");
5030
5031 let resp_raw = extract_any_send_packet(&resp_actions);
5033 let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
5034 let init_actions = init_mgr.handle_local_delivery(
5035 resp_pkt.destination_hash,
5036 &resp_raw,
5037 resp_pkt.packet_hash,
5038 rns_core::transport::types::InterfaceId(0),
5039 &mut rng,
5040 );
5041
5042 let has_response_received = init_actions
5043 .iter()
5044 .any(|a| matches!(a, LinkManagerAction::ResponseReceived { .. }));
5045 assert!(
5046 has_response_received,
5047 "Initiator should receive ResponseReceived"
5048 );
5049 }
5050
5051 #[test]
5052 fn test_large_response_uses_resource_fallback() {
5053 let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
5054 let mut rng = OsRng;
5055
5056 let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5059 resp_mgr.register_request_handler("/large", None, {
5060 let large_payload = large_payload.clone();
5061 move |_link_id, _path, _data, _remote| Some(large_payload.clone())
5062 });
5063
5064 let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
5066 assert!(!req_actions.is_empty());
5067
5068 let req_raw = extract_any_send_packet(&req_actions);
5070 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5071 let resp_actions = resp_mgr.handle_local_delivery(
5072 req_pkt.destination_hash,
5073 &req_raw,
5074 req_pkt.packet_hash,
5075 rns_core::transport::types::InterfaceId(0),
5076 &mut rng,
5077 );
5078
5079 let mut has_resource_adv = false;
5080 let mut has_direct_response = false;
5081 for action in &resp_actions {
5082 if let LinkManagerAction::SendPacket { raw, .. } = action {
5083 let pkt = RawPacket::unpack(raw).unwrap();
5084 if pkt.context == constants::CONTEXT_RESOURCE_ADV {
5085 has_resource_adv = true;
5086 }
5087 if pkt.context == constants::CONTEXT_RESPONSE {
5088 has_direct_response = true;
5089 }
5090 }
5091 }
5092
5093 assert!(
5094 has_resource_adv,
5095 "Large response should advertise a response resource"
5096 );
5097 assert!(
5098 !has_direct_response,
5099 "Large response should not use direct CONTEXT_RESPONSE packet"
5100 );
5101 }
5102
5103 #[test]
5104 fn test_send_management_response_without_session_key_uses_resource_fallback_path() {
5105 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
5106 let mut rng = OsRng;
5107 init_mgr
5108 .links
5109 .get_mut(&link_id)
5110 .unwrap()
5111 .engine
5112 .clear_session_for_testing();
5113
5114 let large_response: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5115 let actions =
5116 init_mgr.send_management_response(&link_id, &[0x11; 16], &large_response, &mut rng);
5117
5118 assert!(
5119 actions.is_empty(),
5120 "without a session key, no response packets should be emitted"
5121 );
5122 assert_eq!(
5123 init_mgr
5124 .links
5125 .get(&link_id)
5126 .map(|managed| managed.outgoing_resources.len()),
5127 Some(1)
5128 );
5129 }
5130
5131 #[test]
5132 fn test_send_channel_message_on_no_channel() {
5133 let mut mgr = LinkManager::new();
5134 let mut rng = OsRng;
5135 let dummy_sig = [0xAA; 32];
5136 let (link_id, _) =
5137 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
5138
5139 let err = mgr
5141 .send_channel_message(&link_id, 1, b"test", &mut rng)
5142 .expect_err("pending link should reject channel send");
5143 assert_eq!(err, "link has no active channel");
5144 }
5145
5146 #[test]
5147 fn test_send_on_link_requires_active() {
5148 let mut mgr = LinkManager::new();
5149 let mut rng = OsRng;
5150 let dummy_sig = [0xAA; 32];
5151 let (link_id, _) =
5152 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
5153
5154 let actions = mgr.send_on_link(&link_id, b"test", constants::CONTEXT_NONE, &mut rng);
5155 assert!(actions.is_empty(), "Cannot send on pending link");
5156 }
5157
5158 #[test]
5159 fn test_send_on_link_unknown_link() {
5160 let mgr = LinkManager::new();
5161 let mut rng = OsRng;
5162
5163 let actions = mgr.send_on_link(&[0xFF; 16], b"test", constants::CONTEXT_NONE, &mut rng);
5164 assert!(actions.is_empty());
5165 }
5166
5167 #[test]
5168 fn test_resource_full_transfer_large() {
5169 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5170 let mut rng = OsRng;
5171
5172 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5173
5174 let original_data: Vec<u8> = (0..2000u32)
5176 .map(|i| {
5177 let pos = i as usize;
5178 (pos ^ (pos >> 8) ^ (pos >> 16)) as u8
5179 })
5180 .collect();
5181
5182 let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
5183
5184 let mut pending: Vec<(char, LinkManagerAction)> =
5185 adv_actions.into_iter().map(|a| ('i', a)).collect();
5186 let mut rounds = 0;
5187 let max_rounds = 200;
5188 let mut resource_received = false;
5189 let mut sender_completed = false;
5190
5191 while !pending.is_empty() && rounds < max_rounds {
5192 rounds += 1;
5193 let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
5194
5195 for (source, action) in pending.drain(..) {
5196 if let LinkManagerAction::SendPacket { raw, .. } = action {
5197 let pkt = match RawPacket::unpack(&raw) {
5198 Ok(p) => p,
5199 Err(_) => continue,
5200 };
5201
5202 let target_actions = if source == 'i' {
5203 resp_mgr.handle_local_delivery(
5204 pkt.destination_hash,
5205 &raw,
5206 pkt.packet_hash,
5207 rns_core::transport::types::InterfaceId(0),
5208 &mut rng,
5209 )
5210 } else {
5211 init_mgr.handle_local_delivery(
5212 pkt.destination_hash,
5213 &raw,
5214 pkt.packet_hash,
5215 rns_core::transport::types::InterfaceId(0),
5216 &mut rng,
5217 )
5218 };
5219
5220 let target_source = if source == 'i' { 'r' } else { 'i' };
5221 for a in &target_actions {
5222 match a {
5223 LinkManagerAction::ResourceReceived { data, .. } => {
5224 assert_eq!(*data, original_data);
5225 resource_received = true;
5226 }
5227 LinkManagerAction::ResourceCompleted { .. } => {
5228 sender_completed = true;
5229 }
5230 _ => {}
5231 }
5232 }
5233 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5234 }
5235 }
5236 pending = next;
5237 }
5238
5239 assert!(
5240 resource_received,
5241 "Should receive large resource (rounds={})",
5242 rounds
5243 );
5244 assert!(
5245 sender_completed,
5246 "Sender should complete (rounds={})",
5247 rounds
5248 );
5249 }
5250
5251 #[test]
5252 fn test_resource_receiver_stores_original_advertisement_plaintext() {
5253 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5254 let mut rng = OsRng;
5255
5256 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5257
5258 let data = vec![0x41; 256];
5259 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5260
5261 let adv_raw = adv_actions
5262 .iter()
5263 .find_map(|action| match action {
5264 LinkManagerAction::SendPacket { raw, .. } => {
5265 let pkt = RawPacket::unpack(raw).ok()?;
5266 (pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw.clone())
5267 }
5268 _ => None,
5269 })
5270 .expect("sender should emit a resource advertisement");
5271
5272 let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
5273 let adv_plaintext = resp_mgr
5274 .links
5275 .get(&link_id)
5276 .unwrap()
5277 .engine
5278 .decrypt(&adv_pkt.data)
5279 .unwrap();
5280
5281 let _resp_actions = resp_mgr.handle_local_delivery(
5282 adv_pkt.destination_hash,
5283 &adv_raw,
5284 adv_pkt.packet_hash,
5285 rns_core::transport::types::InterfaceId(0),
5286 &mut rng,
5287 );
5288
5289 let receiver = resp_mgr
5290 .links
5291 .get(&link_id)
5292 .and_then(|managed| managed.incoming_resources.first())
5293 .expect("advertisement should create an incoming receiver");
5294 assert_eq!(receiver.advertisement_packet, adv_plaintext);
5295 assert_eq!(
5296 receiver.max_decompressed_size,
5297 constants::RESOURCE_AUTO_COMPRESS_MAX_SIZE
5298 );
5299 }
5300
5301 #[test]
5302 fn test_corrupt_compressed_resource_rejects_and_tears_down_link() {
5303 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5304 let mut rng = OsRng;
5305
5306 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5307
5308 let data = vec![b'A'; 4096];
5309 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5310
5311 let mut request_actions = Vec::new();
5312 for action in &adv_actions {
5313 let LinkManagerAction::SendPacket { raw, .. } = action else {
5314 continue;
5315 };
5316 let pkt = RawPacket::unpack(raw).unwrap();
5317 let actions = resp_mgr.handle_local_delivery(
5318 pkt.destination_hash,
5319 raw,
5320 pkt.packet_hash,
5321 rns_core::transport::types::InterfaceId(0),
5322 &mut rng,
5323 );
5324 request_actions.extend(actions);
5325 }
5326
5327 {
5328 let receiver = resp_mgr
5329 .links
5330 .get_mut(&link_id)
5331 .and_then(|managed| managed.incoming_resources.first_mut())
5332 .expect("receiver should exist after advertisement");
5333 assert!(receiver.flags.compressed, "test data should be compressed");
5334 receiver.max_decompressed_size = 64;
5335 }
5336
5337 let mut responder_actions = Vec::new();
5338 for action in request_actions {
5339 let LinkManagerAction::SendPacket { raw, .. } = action else {
5340 continue;
5341 };
5342 let pkt = RawPacket::unpack(&raw).unwrap();
5343 let actions = init_mgr.handle_local_delivery(
5344 pkt.destination_hash,
5345 &raw,
5346 pkt.packet_hash,
5347 rns_core::transport::types::InterfaceId(0),
5348 &mut rng,
5349 );
5350
5351 for action in actions {
5352 let LinkManagerAction::SendPacket { raw, .. } = &action else {
5353 continue;
5354 };
5355 let pkt = RawPacket::unpack(raw).unwrap();
5356 let delivered = resp_mgr.handle_local_delivery(
5357 pkt.destination_hash,
5358 raw,
5359 pkt.packet_hash,
5360 rns_core::transport::types::InterfaceId(0),
5361 &mut rng,
5362 );
5363 responder_actions.extend(delivered);
5364 }
5365 }
5366
5367 assert!(
5368 responder_actions.iter().any(|action| matches!(
5369 action,
5370 LinkManagerAction::ResourceFailed { error, .. }
5371 if error == "Resource too large"
5372 )),
5373 "corrupt oversized resource should fail with TooLarge"
5374 );
5375 assert!(
5376 responder_actions.iter().any(|action| matches!(
5377 action,
5378 LinkManagerAction::LinkClosed { link_id: closed_id, .. } if *closed_id == link_id
5379 )),
5380 "corrupt oversized resource should tear down the link"
5381 );
5382 assert!(
5383 responder_actions.iter().any(|action| match action {
5384 LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
5385 .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_RCL)
5386 .unwrap_or(false),
5387 _ => false,
5388 }),
5389 "corrupt oversized resource should send a receiver cancel/reject packet"
5390 );
5391 assert_eq!(
5392 resp_mgr
5393 .links
5394 .get(&link_id)
5395 .map(|managed| managed.engine.state()),
5396 Some(LinkState::Closed)
5397 );
5398 }
5399
5400 #[test]
5401 fn test_resource_hmu_timeout_extension_in_link_manager_flow() {
5402 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5403 let mut rng = OsRng;
5404
5405 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5406
5407 let mut state = 0x1234_5678u32;
5410 let data: Vec<u8> = (0..50000)
5411 .map(|_| {
5412 state = state.wrapping_mul(1664525).wrapping_add(1013904223);
5413 (state >> 16) as u8
5414 })
5415 .collect();
5416 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5417 let mut pending: Vec<(char, LinkManagerAction)> =
5418 adv_actions.into_iter().map(|a| ('i', a)).collect();
5419
5420 let mut rounds = 0;
5421
5422 while rounds < 300 {
5425 rounds += 1;
5426 let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
5427
5428 for (source, action) in pending.drain(..) {
5429 let LinkManagerAction::SendPacket { raw, .. } = action else {
5430 continue;
5431 };
5432
5433 let pkt = match RawPacket::unpack(&raw) {
5434 Ok(p) => p,
5435 Err(_) => continue,
5436 };
5437
5438 let target_actions = if source == 'i' {
5439 resp_mgr.handle_local_delivery(
5440 pkt.destination_hash,
5441 &raw,
5442 pkt.packet_hash,
5443 rns_core::transport::types::InterfaceId(0),
5444 &mut rng,
5445 )
5446 } else {
5447 init_mgr.handle_local_delivery(
5448 pkt.destination_hash,
5449 &raw,
5450 pkt.packet_hash,
5451 rns_core::transport::types::InterfaceId(0),
5452 &mut rng,
5453 )
5454 };
5455
5456 let target_source = if source == 'i' { 'r' } else { 'i' };
5457 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5458 }
5459
5460 if resp_mgr
5461 .links
5462 .get(&link_id)
5463 .and_then(|managed| managed.incoming_resources.first())
5464 .is_some_and(|receiver| receiver.waiting_for_hmu)
5465 {
5466 break;
5467 }
5468
5469 pending = next;
5470 }
5471
5472 assert!(
5473 resp_mgr
5474 .links
5475 .get(&link_id)
5476 .and_then(|managed| managed.incoming_resources.first())
5477 .is_some_and(|receiver| receiver.waiting_for_hmu),
5478 "expected receiver to reach a live HMU wait state"
5479 );
5480
5481 let prime_actions = {
5484 let managed = resp_mgr.links.get_mut(&link_id).unwrap();
5485 let receiver = managed.incoming_resources.first_mut().unwrap();
5486 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
5487 managed.engine.decrypt(ciphertext).map_err(|_| ())
5488 };
5489 receiver.tick(
5490 receiver.last_activity + 0.0001,
5491 &decrypt_fn,
5492 &Bzip2Compressor,
5493 )
5494 };
5495 assert!(
5496 !prime_actions
5497 .iter()
5498 .any(|a| matches!(a, ResourceAction::SendRequest(_))),
5499 "fresh HMU wait state should not immediately emit a retry request"
5500 );
5501
5502 let (late_delta, retries_before) = {
5503 let managed = resp_mgr
5504 .links
5505 .get_mut(&link_id)
5506 .expect("receiver link should still exist");
5507 let receiver = managed
5508 .incoming_resources
5509 .first_mut()
5510 .expect("receiver should have an active incoming resource");
5511
5512 assert!(
5513 receiver.waiting_for_hmu,
5514 "receiver should be waiting for HMU"
5515 );
5516
5517 let eifr = receiver.eifr.unwrap_or_else(|| {
5518 (constants::RESOURCE_SDU as f64 * 8.0) / receiver.rtt.unwrap_or(0.5)
5519 });
5520 let expected_tof = if receiver.outstanding_parts > 0 {
5521 (receiver.outstanding_parts as f64 * constants::RESOURCE_SDU as f64 * 8.0) / eifr
5522 } else {
5523 (3.0 * constants::RESOURCE_SDU as f64) / eifr
5524 };
5525 let expected_hmu_wait =
5526 (constants::RESOURCE_SDU as f64 * 8.0 * constants::RESOURCE_HMU_WAIT_FACTOR) / eifr;
5527 let old_delta = constants::RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT * expected_tof
5528 + constants::RESOURCE_RETRY_GRACE_TIME;
5529 (
5530 old_delta + expected_hmu_wait + expected_hmu_wait.max(1.0),
5531 receiver.retries_left,
5532 )
5533 };
5534 {
5535 let managed = resp_mgr.links.get(&link_id).unwrap();
5536 let receiver = managed.incoming_resources.first().unwrap();
5537 assert_eq!(receiver.retries_left, retries_before);
5538 assert!(
5539 receiver.eifr.is_some(),
5540 "receiver tick should have populated EIFR"
5541 );
5542 }
5543
5544 let late_resource_actions = {
5545 let managed = resp_mgr.links.get_mut(&link_id).unwrap();
5546 let receiver = managed.incoming_resources.first_mut().unwrap();
5547 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
5548 managed.engine.decrypt(ciphertext).map_err(|_| ())
5549 };
5550 receiver.tick(
5551 receiver.last_activity + late_delta,
5552 &decrypt_fn,
5553 &Bzip2Compressor,
5554 )
5555 };
5556 let late_actions =
5557 resp_mgr.process_resource_actions(&link_id, late_resource_actions, &mut rng);
5558 let retry_raw = late_actions
5559 .iter()
5560 .find_map(|a| match a {
5561 LinkManagerAction::SendPacket { raw, .. } => {
5562 let pkt = RawPacket::unpack(raw).ok()?;
5563 (pkt.context == constants::CONTEXT_RESOURCE_REQ).then_some(raw.clone())
5564 }
5565 _ => None,
5566 })
5567 .expect("receiver should emit a resource retry request after extended timeout");
5568
5569 {
5570 let managed = resp_mgr.links.get(&link_id).unwrap();
5571 let receiver = managed.incoming_resources.first().unwrap();
5572 assert_eq!(receiver.retries_left, retries_before - 1);
5573 }
5574
5575 let retry_pkt = RawPacket::unpack(&retry_raw).unwrap();
5576 let retry_plaintext = resp_mgr
5577 .links
5578 .get(&link_id)
5579 .unwrap()
5580 .engine
5581 .decrypt(&retry_pkt.data)
5582 .expect("retry request should decrypt");
5583 assert_eq!(retry_plaintext[0], constants::RESOURCE_HASHMAP_IS_EXHAUSTED);
5584
5585 let retry_to_sender = init_mgr.handle_local_delivery(
5588 retry_pkt.destination_hash,
5589 &retry_raw,
5590 retry_pkt.packet_hash,
5591 rns_core::transport::types::InterfaceId(0),
5592 &mut rng,
5593 );
5594 assert!(
5595 retry_to_sender.iter().any(|a| match a {
5596 LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
5597 .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_HMU)
5598 .unwrap_or(false),
5599 _ => false,
5600 }),
5601 "sender should answer the exhausted retry request with a live HMU packet"
5602 );
5603 }
5604
5605 #[test]
5606 fn test_process_resource_actions_mapping() {
5607 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
5608 let mut rng = OsRng;
5609
5610 let actions = vec![
5612 ResourceAction::DataReceived {
5613 data: vec![1, 2, 3],
5614 metadata: Some(vec![4, 5]),
5615 },
5616 ResourceAction::Completed,
5617 ResourceAction::Failed(rns_core::resource::ResourceError::Timeout),
5618 ResourceAction::ProgressUpdate {
5619 received: 10,
5620 total: 20,
5621 },
5622 ResourceAction::TeardownLink,
5623 ];
5624
5625 let result = init_mgr.process_resource_actions(&link_id, actions, &mut rng);
5626
5627 assert!(matches!(
5628 result[0],
5629 LinkManagerAction::ResourceReceived { .. }
5630 ));
5631 assert!(matches!(
5632 result[1],
5633 LinkManagerAction::ResourceCompleted { .. }
5634 ));
5635 assert!(matches!(
5636 result[2],
5637 LinkManagerAction::ResourceFailed { .. }
5638 ));
5639 assert!(matches!(
5640 result[3],
5641 LinkManagerAction::ResourceProgress {
5642 received: 10,
5643 total: 20,
5644 ..
5645 }
5646 ));
5647 assert!(result
5648 .iter()
5649 .any(|action| matches!(action, LinkManagerAction::LinkClosed { .. })));
5650 }
5651
5652 #[test]
5653 fn test_link_state_empty() {
5654 let mgr = LinkManager::new();
5655 let fake_id = [0xAA; 16];
5656 assert!(mgr.link_state(&fake_id).is_none());
5657 }
5658
5659 #[test]
5660 fn test_large_response_resource_completes_as_response() {
5661 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5662 let mut rng = OsRng;
5663
5664 let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5665 let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(large_payload));
5666 resp_mgr.register_request_handler("/large", None, {
5667 let response_value = response_value.clone();
5668 move |_link_id, _path, _data, _remote| Some(response_value.clone())
5669 });
5670
5671 let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
5672 let req_raw = extract_any_send_packet(&req_actions);
5673 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5674 let request_id = req_pkt.get_truncated_hash();
5675 let resp_actions = resp_mgr.handle_local_delivery(
5676 req_pkt.destination_hash,
5677 &req_raw,
5678 req_pkt.packet_hash,
5679 rns_core::transport::types::InterfaceId(0),
5680 &mut rng,
5681 );
5682
5683 let mut pending: Vec<(char, LinkManagerAction)> =
5684 resp_actions.into_iter().map(|a| ('r', a)).collect();
5685 let mut rounds = 0;
5686 let mut received_response = None;
5687
5688 while !pending.is_empty() && rounds < 200 {
5689 rounds += 1;
5690 let mut next = Vec::new();
5691
5692 for (source, action) in pending.drain(..) {
5693 let LinkManagerAction::SendPacket { raw, .. } = action else {
5694 continue;
5695 };
5696 let pkt = RawPacket::unpack(&raw).unwrap();
5697 let target_actions = if source == 'r' {
5698 init_mgr.handle_local_delivery(
5699 pkt.destination_hash,
5700 &raw,
5701 pkt.packet_hash,
5702 rns_core::transport::types::InterfaceId(0),
5703 &mut rng,
5704 )
5705 } else {
5706 resp_mgr.handle_local_delivery(
5707 pkt.destination_hash,
5708 &raw,
5709 pkt.packet_hash,
5710 rns_core::transport::types::InterfaceId(0),
5711 &mut rng,
5712 )
5713 };
5714
5715 let target_source = if source == 'r' { 'i' } else { 'r' };
5716 for target_action in &target_actions {
5717 match target_action {
5718 LinkManagerAction::ResponseReceived {
5719 request_id: rid,
5720 data,
5721 ..
5722 } => {
5723 received_response = Some((*rid, data.clone()));
5724 }
5725 LinkManagerAction::ResourceReceived { .. } => {
5726 panic!("response resources must complete as ResponseReceived")
5727 }
5728 LinkManagerAction::ResourceAcceptQuery { .. } => {
5729 panic!("response resources must bypass application acceptance")
5730 }
5731 _ => {}
5732 }
5733 }
5734 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5735 }
5736
5737 pending = next;
5738 }
5739
5740 let (received_request_id, received_data) = received_response.unwrap_or_else(|| {
5741 panic!(
5742 "large response resource did not complete as ResponseReceived after {} rounds",
5743 rounds
5744 )
5745 });
5746 assert_eq!(received_request_id, request_id);
5747 assert_eq!(received_data, response_value);
5748 }
5749
5750 #[test]
5751 fn test_response_resource_preserves_metadata() {
5752 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5753 let mut rng = OsRng;
5754
5755 let payload = b"bundle-data".to_vec();
5756 let metadata = b"git-status-ok".to_vec();
5757 let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5758 resp_mgr.register_request_handler_response("/fetch", None, {
5759 let response_value = response_value.clone();
5760 let metadata = metadata.clone();
5761 move |_link_id, _path, _data, _remote| {
5762 Some(RequestResponse::Resource {
5763 data: response_value.clone(),
5764 metadata: Some(metadata.clone()),
5765 auto_compress: false,
5766 })
5767 }
5768 });
5769
5770 let req_actions = init_mgr.send_request(&link_id, "/fetch", b"\xc0", &mut rng);
5771 let req_raw = extract_any_send_packet(&req_actions);
5772 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5773 let request_id = req_pkt.get_truncated_hash();
5774 let resp_actions = resp_mgr.handle_local_delivery(
5775 req_pkt.destination_hash,
5776 &req_raw,
5777 req_pkt.packet_hash,
5778 rns_core::transport::types::InterfaceId(0),
5779 &mut rng,
5780 );
5781
5782 let mut pending: Vec<(char, LinkManagerAction)> =
5783 resp_actions.into_iter().map(|a| ('r', a)).collect();
5784 let mut received_response = None;
5785
5786 for _ in 0..200 {
5787 if pending.is_empty() || received_response.is_some() {
5788 break;
5789 }
5790
5791 let mut next = Vec::new();
5792 for (source, action) in pending.drain(..) {
5793 let LinkManagerAction::SendPacket { raw, .. } = action else {
5794 continue;
5795 };
5796 let pkt = RawPacket::unpack(&raw).unwrap();
5797 let target_actions = if source == 'r' {
5798 init_mgr.handle_local_delivery(
5799 pkt.destination_hash,
5800 &raw,
5801 pkt.packet_hash,
5802 rns_core::transport::types::InterfaceId(0),
5803 &mut rng,
5804 )
5805 } else {
5806 resp_mgr.handle_local_delivery(
5807 pkt.destination_hash,
5808 &raw,
5809 pkt.packet_hash,
5810 rns_core::transport::types::InterfaceId(0),
5811 &mut rng,
5812 )
5813 };
5814
5815 let target_source = if source == 'r' { 'i' } else { 'r' };
5816 for target_action in &target_actions {
5817 match target_action {
5818 LinkManagerAction::ResponseReceived {
5819 request_id: rid,
5820 data,
5821 metadata: response_metadata,
5822 ..
5823 } => {
5824 received_response =
5825 Some((*rid, data.clone(), response_metadata.clone()));
5826 }
5827 LinkManagerAction::ResourceReceived { .. } => {
5828 panic!("response resources must complete as ResponseReceived")
5829 }
5830 _ => {}
5831 }
5832 }
5833 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5834 }
5835 pending = next;
5836 }
5837
5838 let (received_request_id, received_data, received_metadata) = received_response
5839 .expect("resource response with metadata should complete as ResponseReceived");
5840 assert_eq!(received_request_id, request_id);
5841 assert_eq!(received_data, response_value);
5842 assert_eq!(received_metadata, Some(metadata));
5843 }
5844
5845 #[test]
5846 fn test_negotiated_mtu_response_uses_resource_before_global_mtu() {
5847 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5848 let mut rng = OsRng;
5849
5850 init_mgr.set_link_mtu(&link_id, 300);
5851 resp_mgr.set_link_mtu(&link_id, 300);
5852
5853 let payload = vec![0xAB; 350];
5854 let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5855 resp_mgr.register_request_handler("/mtu", None, {
5856 let response_value = response_value.clone();
5857 move |_link_id, _path, _data, _remote| Some(response_value.clone())
5858 });
5859
5860 let req_actions = init_mgr.send_request(&link_id, "/mtu", b"\xc0", &mut rng);
5861 let req_raw = extract_any_send_packet(&req_actions);
5862 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5863 let resp_actions = resp_mgr.handle_local_delivery(
5864 req_pkt.destination_hash,
5865 &req_raw,
5866 req_pkt.packet_hash,
5867 rns_core::transport::types::InterfaceId(0),
5868 &mut rng,
5869 );
5870
5871 let mut has_resource_adv = false;
5872 let mut direct_response_len = None;
5873 for action in &resp_actions {
5874 if let LinkManagerAction::SendPacket { raw, .. } = action {
5875 let pkt = RawPacket::unpack(raw).unwrap();
5876 has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
5877 if pkt.context == constants::CONTEXT_RESPONSE {
5878 direct_response_len = Some(raw.len());
5879 }
5880 }
5881 }
5882
5883 assert!(
5884 has_resource_adv,
5885 "responses larger than the negotiated link MTU should use resource fallback"
5886 );
5887 assert!(
5888 direct_response_len.is_none(),
5889 "sent direct response of {} bytes on a 300 byte negotiated MTU",
5890 direct_response_len.unwrap_or_default()
5891 );
5892 }
5893
5894 #[test]
5895 fn test_large_management_response_uses_resource_fallback() {
5896 let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
5897 let mut rng = OsRng;
5898
5899 let payload = vec![0xBC; 5000];
5900 let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5901 let actions =
5902 resp_mgr.send_management_response(&link_id, &[0x55; 16], &response_value, &mut rng);
5903
5904 let mut has_resource_adv = false;
5905 let mut has_direct_response = false;
5906 for action in &actions {
5907 if let LinkManagerAction::SendPacket { raw, .. } = action {
5908 let pkt = RawPacket::unpack(raw).unwrap();
5909 has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
5910 has_direct_response |= pkt.context == constants::CONTEXT_RESPONSE;
5911 }
5912 }
5913
5914 assert!(
5915 has_resource_adv,
5916 "large management responses should advertise a response resource"
5917 );
5918 assert!(
5919 !has_direct_response,
5920 "large management responses should not use a direct CONTEXT_RESPONSE packet"
5921 );
5922 }
5923}