1use std::collections::HashMap;
10
11use super::compressor::Bzip2Compressor;
12use rns_core::channel::{Channel, Sequence};
13use rns_core::constants;
14use rns_core::link::types::{LinkId, LinkState, TeardownReason};
15use rns_core::link::{LinkAction, LinkEngine, LinkMode};
16use rns_core::packet::{PacketFlags, RawPacket};
17use rns_core::resource::{ResourceAction, ResourceReceiver, ResourceSender};
18use rns_crypto::ed25519::Ed25519PrivateKey;
19use rns_crypto::Rng;
20
21use super::time;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ResourceStrategy {
26 AcceptNone,
28 AcceptAll,
30 AcceptApp,
32}
33
34impl Default for ResourceStrategy {
35 fn default() -> Self {
36 ResourceStrategy::AcceptNone
37 }
38}
39
40struct ManagedLink {
42 engine: LinkEngine,
43 channel: Option<Channel>,
44 pending_channel_packets: HashMap<[u8; 32], Sequence>,
45 channel_send_ok: u64,
46 channel_send_not_ready: u64,
47 channel_send_too_big: u64,
48 channel_send_other_error: u64,
49 channel_messages_received: u64,
50 channel_proofs_sent: u64,
51 channel_proofs_received: u64,
52 dest_hash: [u8; 16],
54 remote_identity: Option<([u8; 16], [u8; 64])>,
56 dest_sig_pub_bytes: Option<[u8; 32]>,
58 incoming_resources: Vec<ResourceReceiver>,
60 outgoing_resources: Vec<ResourceSender>,
62 incoming_splits: HashMap<[u8; 32], IncomingSplitTransfer>,
64 outgoing_splits: HashMap<[u8; 32], OutgoingSplitTransfer>,
66 resource_strategy: ResourceStrategy,
68 route_interface: Option<rns_core::transport::types::InterfaceId>,
70 route_transport_id: Option<[u8; 16]>,
75}
76
77struct IncomingSplitTransfer {
78 total_segments: u64,
79 completed_segments: u64,
80 current_segment_index: u64,
81 current_received_parts: usize,
82 current_total_parts: usize,
83 data: Vec<u8>,
84 metadata: Option<Vec<u8>>,
85 is_response: bool,
86}
87
88struct OutgoingSplitTransfer {
89 total_segments: u64,
90 completed_segments: u64,
91 current_segment_index: u64,
92 current_sent_parts: usize,
93 current_total_parts: usize,
94}
95
96struct LinkDestination {
98 sig_prv: Ed25519PrivateKey,
99 sig_pub_bytes: [u8; 32],
100 resource_strategy: ResourceStrategy,
101}
102
103#[derive(Debug, Clone, PartialEq, Eq)]
105pub enum RequestResponse {
106 Bytes(Vec<u8>),
108 Resource {
110 data: Vec<u8>,
111 metadata: Option<Vec<u8>>,
112 auto_compress: bool,
113 },
114}
115
116impl From<Vec<u8>> for RequestResponse {
117 fn from(data: Vec<u8>) -> Self {
118 RequestResponse::Bytes(data)
119 }
120}
121
122struct RequestHandlerEntry {
124 path: String,
126 path_hash: [u8; 16],
128 allowed_list: Option<Vec<[u8; 16]>>,
130 handler: Box<
132 dyn Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<RequestResponse>
133 + Send,
134 >,
135}
136
137#[derive(Debug)]
139pub enum LinkManagerAction {
140 SendPacket {
142 raw: Vec<u8>,
143 dest_type: u8,
144 attached_interface: Option<rns_core::transport::types::InterfaceId>,
145 },
146 LinkEstablished {
148 link_id: LinkId,
149 dest_hash: [u8; 16],
150 rtt: f64,
151 is_initiator: bool,
152 },
153 LinkClosed {
155 link_id: LinkId,
156 reason: Option<TeardownReason>,
157 },
158 RemoteIdentified {
160 link_id: LinkId,
161 identity_hash: [u8; 16],
162 public_key: [u8; 64],
163 },
164 RegisterLinkDest { link_id: LinkId },
166 DeregisterLinkDest { link_id: LinkId },
168 ManagementRequest {
171 link_id: LinkId,
172 path_hash: [u8; 16],
173 data: Vec<u8>,
175 request_id: [u8; 16],
177 remote_identity: Option<([u8; 16], [u8; 64])>,
178 },
179 ResourceReceived {
181 link_id: LinkId,
182 data: Vec<u8>,
183 metadata: Option<Vec<u8>>,
184 },
185 ResourceCompleted { link_id: LinkId },
187 ResourceFailed { link_id: LinkId, error: String },
189 ResourceProgress {
191 link_id: LinkId,
192 received: usize,
193 total: usize,
194 },
195 ResourceAcceptQuery {
197 link_id: LinkId,
198 resource_hash: Vec<u8>,
199 transfer_size: u64,
200 has_metadata: bool,
201 },
202 ChannelMessageReceived {
204 link_id: LinkId,
205 msgtype: u16,
206 payload: Vec<u8>,
207 },
208 LinkDataReceived {
210 link_id: LinkId,
211 context: u8,
212 data: Vec<u8>,
213 },
214 ResponseReceived {
216 link_id: LinkId,
217 request_id: [u8; 16],
218 data: Vec<u8>,
219 metadata: Option<Vec<u8>>,
220 },
221 LinkRequestReceived {
223 link_id: LinkId,
224 receiving_interface: rns_core::transport::types::InterfaceId,
225 },
226}
227
228pub struct LinkManager {
230 links: HashMap<LinkId, ManagedLink>,
231 link_destinations: HashMap<[u8; 16], LinkDestination>,
232 request_handlers: Vec<RequestHandlerEntry>,
233 management_paths: Vec<[u8; 16]>,
236}
237
238#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub struct LinkRouteHint {
240 pub interface: rns_core::transport::types::InterfaceId,
241 pub transport_id: Option<[u8; 16]>,
242}
243
244impl LinkManager {
245 fn resource_sdu_for_link(link: &ManagedLink) -> usize {
246 let mtu = link.engine.mtu() as usize;
249 let derived = mtu.saturating_sub(constants::HEADER_MAXSIZE + constants::IFAC_MIN_SIZE);
250 if derived > 0 {
251 derived
252 } else {
253 constants::RESOURCE_SDU
254 }
255 }
256
257 fn split_progress_parts(
258 segment_index: u64,
259 total_segments: u64,
260 current_done: usize,
261 current_total: usize,
262 sdu: usize,
263 ) -> (usize, usize) {
264 let max_parts_per_segment = constants::RESOURCE_MAX_EFFICIENT_SIZE.div_ceil(sdu.max(1));
265 let total = (total_segments as usize).saturating_mul(max_parts_per_segment);
266 let completed_segments = segment_index.saturating_sub(1) as usize;
267 let completed = completed_segments.saturating_mul(max_parts_per_segment);
268 let current = if current_total == 0 {
269 0
270 } else if current_total < max_parts_per_segment {
271 let scaled =
272 (current_done as f64) * (max_parts_per_segment as f64 / current_total as f64);
273 scaled.floor() as usize
274 } else {
275 current_done
276 };
277 (completed.saturating_add(current).min(total), total)
278 }
279
280 fn resource_hash_key(hash: &[u8]) -> Option<[u8; 32]> {
281 let mut key = [0u8; 32];
282 if hash.len() != key.len() {
283 return None;
284 }
285 key.copy_from_slice(hash);
286 Some(key)
287 }
288
289 fn incoming_split_progress(split: &IncomingSplitTransfer, sdu: usize) -> (usize, usize) {
290 Self::split_progress_parts(
291 split.current_segment_index,
292 split.total_segments,
293 split.current_received_parts,
294 split.current_total_parts,
295 sdu,
296 )
297 }
298
299 fn outgoing_split_progress(split: &OutgoingSplitTransfer, sdu: usize) -> (usize, usize) {
300 Self::split_progress_parts(
301 split.current_segment_index,
302 split.total_segments,
303 split.current_sent_parts,
304 split.current_total_parts,
305 sdu,
306 )
307 }
308
309 pub fn new() -> Self {
311 LinkManager {
312 links: HashMap::new(),
313 link_destinations: HashMap::new(),
314 request_handlers: Vec::new(),
315 management_paths: Vec::new(),
316 }
317 }
318
319 pub fn register_management_path(&mut self, path_hash: [u8; 16]) {
323 if !self.management_paths.contains(&path_hash) {
324 self.management_paths.push(path_hash);
325 }
326 }
327
328 pub fn get_derived_key(&self, link_id: &LinkId) -> Option<Vec<u8>> {
330 self.links
331 .get(link_id)
332 .and_then(|link| link.engine.derived_key().map(|dk| dk.to_vec()))
333 }
334
335 pub fn get_link_route_hint(&self, link_id: &LinkId) -> Option<LinkRouteHint> {
337 self.links.get(link_id).and_then(|link| {
338 link.route_interface.map(|interface| LinkRouteHint {
339 interface,
340 transport_id: link.route_transport_id,
341 })
342 })
343 }
344
345 pub fn set_link_route_hint(
347 &mut self,
348 link_id: &LinkId,
349 interface: rns_core::transport::types::InterfaceId,
350 transport_id: Option<[u8; 16]>,
351 ) -> bool {
352 let Some(link) = self.links.get_mut(link_id) else {
353 return false;
354 };
355 link.route_interface = Some(interface);
356 link.route_transport_id = transport_id;
357 true
358 }
359
360 pub fn register_link_destination(
362 &mut self,
363 dest_hash: [u8; 16],
364 sig_prv: Ed25519PrivateKey,
365 sig_pub_bytes: [u8; 32],
366 resource_strategy: ResourceStrategy,
367 ) {
368 self.link_destinations.insert(
369 dest_hash,
370 LinkDestination {
371 sig_prv,
372 sig_pub_bytes,
373 resource_strategy,
374 },
375 );
376 }
377
378 pub fn deregister_link_destination(&mut self, dest_hash: &[u8; 16]) {
380 self.link_destinations.remove(dest_hash);
381 }
382
383 pub fn register_request_handler<F>(
389 &mut self,
390 path: &str,
391 allowed_list: Option<Vec<[u8; 16]>>,
392 handler: F,
393 ) where
394 F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>>
395 + Send
396 + 'static,
397 {
398 let path_hash = compute_path_hash(path);
399 self.request_handlers.push(RequestHandlerEntry {
400 path: path.to_string(),
401 path_hash,
402 allowed_list,
403 handler: Box::new(move |link_id, p, data, remote| {
404 handler(link_id, p, data, remote).map(RequestResponse::Bytes)
405 }),
406 });
407 }
408
409 pub fn register_request_handler_response<F>(
411 &mut self,
412 path: &str,
413 allowed_list: Option<Vec<[u8; 16]>>,
414 handler: F,
415 ) where
416 F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<RequestResponse>
417 + Send
418 + 'static,
419 {
420 let path_hash = compute_path_hash(path);
421 self.request_handlers.push(RequestHandlerEntry {
422 path: path.to_string(),
423 path_hash,
424 allowed_list,
425 handler: Box::new(handler),
426 });
427 }
428
429 pub fn create_link(
437 &mut self,
438 dest_hash: &[u8; 16],
439 dest_sig_pub_bytes: &[u8; 32],
440 hops: u8,
441 mtu: u32,
442 rng: &mut dyn Rng,
443 ) -> (LinkId, Vec<LinkManagerAction>) {
444 let mode = LinkMode::Aes256Cbc;
445 let (mut engine, request_data) =
446 LinkEngine::new_initiator(dest_hash, hops, mode, Some(mtu), time::now(), rng);
447
448 let flags = PacketFlags {
450 header_type: constants::HEADER_1,
451 context_flag: constants::FLAG_UNSET,
452 transport_type: constants::TRANSPORT_BROADCAST,
453 destination_type: constants::DESTINATION_SINGLE,
454 packet_type: constants::PACKET_TYPE_LINKREQUEST,
455 };
456
457 let packet = match RawPacket::pack(
458 flags,
459 0,
460 dest_hash,
461 None,
462 constants::CONTEXT_NONE,
463 &request_data,
464 ) {
465 Ok(p) => p,
466 Err(_) => {
467 return ([0u8; 16], Vec::new());
469 }
470 };
471
472 engine.set_link_id_from_hashable(&packet.get_hashable_part(), request_data.len());
473 let link_id = *engine.link_id();
474
475 let managed = ManagedLink {
476 engine,
477 channel: None,
478 pending_channel_packets: HashMap::new(),
479 channel_send_ok: 0,
480 channel_send_not_ready: 0,
481 channel_send_too_big: 0,
482 channel_send_other_error: 0,
483 channel_messages_received: 0,
484 channel_proofs_sent: 0,
485 channel_proofs_received: 0,
486 dest_hash: *dest_hash,
487 remote_identity: None,
488 dest_sig_pub_bytes: Some(*dest_sig_pub_bytes),
489 incoming_resources: Vec::new(),
490 outgoing_resources: Vec::new(),
491 incoming_splits: HashMap::new(),
492 outgoing_splits: HashMap::new(),
493 resource_strategy: ResourceStrategy::default(),
494 route_interface: None,
495 route_transport_id: None,
496 };
497 self.links.insert(link_id, managed);
498
499 let mut actions = Vec::new();
500 actions.push(LinkManagerAction::RegisterLinkDest { link_id });
502 actions.push(LinkManagerAction::SendPacket {
504 raw: packet.raw,
505 dest_type: constants::DESTINATION_LINK,
506 attached_interface: None,
507 });
508
509 (link_id, actions)
510 }
511
512 pub fn handle_local_delivery(
518 &mut self,
519 dest_hash: [u8; 16],
520 raw: &[u8],
521 packet_hash: [u8; 32],
522 receiving_interface: rns_core::transport::types::InterfaceId,
523 rng: &mut dyn Rng,
524 ) -> Vec<LinkManagerAction> {
525 let packet = match RawPacket::unpack(raw) {
526 Ok(p) => p,
527 Err(_) => return Vec::new(),
528 };
529
530 match packet.flags.packet_type {
531 constants::PACKET_TYPE_LINKREQUEST => {
532 self.handle_linkrequest(&dest_hash, &packet, receiving_interface, rng)
533 }
534 constants::PACKET_TYPE_PROOF if packet.context == constants::CONTEXT_LRPROOF => {
535 self.handle_lrproof(&dest_hash, &packet, receiving_interface, rng)
537 }
538 constants::PACKET_TYPE_PROOF => self.handle_link_proof(&dest_hash, &packet, rng),
539 constants::PACKET_TYPE_DATA => {
540 self.handle_link_data(&dest_hash, &packet, packet_hash, receiving_interface, rng)
541 }
542 _ => Vec::new(),
543 }
544 }
545
546 fn handle_linkrequest(
548 &mut self,
549 dest_hash: &[u8; 16],
550 packet: &RawPacket,
551 receiving_interface: rns_core::transport::types::InterfaceId,
552 rng: &mut dyn Rng,
553 ) -> Vec<LinkManagerAction> {
554 let ld = match self.link_destinations.get(dest_hash) {
556 Some(ld) => ld,
557 None => return Vec::new(),
558 };
559
560 let hashable = packet.get_hashable_part();
561 let now = time::now();
562
563 let (engine, lrproof_data) = match LinkEngine::new_responder(
565 &ld.sig_prv,
566 &ld.sig_pub_bytes,
567 &packet.data,
568 &hashable,
569 dest_hash,
570 packet.hops,
571 now,
572 rng,
573 ) {
574 Ok(r) => r,
575 Err(e) => {
576 log::debug!("LINKREQUEST rejected: {}", e);
577 return Vec::new();
578 }
579 };
580
581 let link_id = *engine.link_id();
582 log::debug!(
583 "LINKREQUEST accepted: link={:02x?} iface={} header_type={} transport_id_present={} hops={}",
584 &link_id[..4],
585 receiving_interface.0,
586 packet.flags.header_type,
587 packet.transport_id.is_some(),
588 packet.hops
589 );
590
591 let managed = ManagedLink {
592 engine,
593 channel: None,
594 pending_channel_packets: HashMap::new(),
595 channel_send_ok: 0,
596 channel_send_not_ready: 0,
597 channel_send_too_big: 0,
598 channel_send_other_error: 0,
599 channel_messages_received: 0,
600 channel_proofs_sent: 0,
601 channel_proofs_received: 0,
602 dest_hash: *dest_hash,
603 remote_identity: None,
604 dest_sig_pub_bytes: None,
605 incoming_resources: Vec::new(),
606 outgoing_resources: Vec::new(),
607 incoming_splits: HashMap::new(),
608 outgoing_splits: HashMap::new(),
609 resource_strategy: ld.resource_strategy,
610 route_interface: Some(receiving_interface),
611 route_transport_id: if packet.flags.header_type == constants::HEADER_2 {
612 packet.transport_id
613 } else {
614 None
615 },
616 };
617 self.links.insert(link_id, managed);
618
619 let flags = PacketFlags {
621 header_type: constants::HEADER_1,
622 context_flag: constants::FLAG_UNSET,
623 transport_type: constants::TRANSPORT_BROADCAST,
624 destination_type: constants::DESTINATION_LINK,
625 packet_type: constants::PACKET_TYPE_PROOF,
626 };
627
628 let mut actions = Vec::new();
629
630 actions.push(LinkManagerAction::RegisterLinkDest { link_id });
632
633 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
634 flags,
635 packet.hops,
636 &link_id,
637 None,
638 constants::CONTEXT_LRPROOF,
639 &lrproof_data,
640 ) {
641 log::debug!(
642 "LRPROOF queued: link={:02x?} route_iface={} route_tid_present={} hops={}",
643 &link_id[..4],
644 receiving_interface.0,
645 packet.transport_id.is_some(),
646 packet.hops
647 );
648 actions.push(LinkManagerAction::SendPacket {
649 raw,
650 dest_type: constants::DESTINATION_LINK,
651 attached_interface: None,
652 });
653 }
654
655 if packet.hops != 0 {
658 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
659 flags,
660 0,
661 &link_id,
662 None,
663 constants::CONTEXT_LRPROOF,
664 &lrproof_data,
665 ) {
666 log::debug!(
667 "LRPROOF fallback queued: link={:02x?} route_iface={} hops=0",
668 &link_id[..4],
669 receiving_interface.0
670 );
671 actions.push(LinkManagerAction::SendPacket {
672 raw,
673 dest_type: constants::DESTINATION_LINK,
674 attached_interface: None,
675 });
676 }
677 }
678
679 if packet.hops < u8::MAX {
683 let hops_plus_one = packet.hops + 1;
684 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
685 flags,
686 hops_plus_one,
687 &link_id,
688 None,
689 constants::CONTEXT_LRPROOF,
690 &lrproof_data,
691 ) {
692 log::debug!(
693 "LRPROOF +1 queued: link={:02x?} route_iface={} hops={}",
694 &link_id[..4],
695 receiving_interface.0,
696 hops_plus_one
697 );
698 actions.push(LinkManagerAction::SendPacket {
699 raw,
700 dest_type: constants::DESTINATION_LINK,
701 attached_interface: None,
702 });
703 }
704 }
705
706 actions.push(LinkManagerAction::LinkRequestReceived {
708 link_id,
709 receiving_interface,
710 });
711
712 actions
713 }
714
715 fn handle_link_proof(
716 &mut self,
717 link_id: &LinkId,
718 packet: &RawPacket,
719 rng: &mut dyn Rng,
720 ) -> Vec<LinkManagerAction> {
721 if packet.data.len() < 32 {
722 return Vec::new();
723 }
724
725 let mut tracked_hash = [0u8; 32];
726 tracked_hash.copy_from_slice(&packet.data[..32]);
727
728 let Some(link) = self.links.get_mut(link_id) else {
729 return Vec::new();
730 };
731 let Some(sequence) = link.pending_channel_packets.remove(&tracked_hash) else {
732 return Vec::new();
733 };
734 link.channel_proofs_received += 1;
735 let Some(channel) = link.channel.as_mut() else {
736 return Vec::new();
737 };
738
739 let chan_actions = channel.packet_delivered(sequence);
740 let _ = channel;
741 let _ = link;
742 self.process_channel_actions(link_id, chan_actions, rng)
743 }
744
745 fn build_link_packet_proof(
746 &mut self,
747 link_id: &LinkId,
748 packet_hash: &[u8; 32],
749 ) -> Vec<LinkManagerAction> {
750 let dest_hash = match self.links.get(link_id) {
751 Some(link) => link.dest_hash,
752 None => return Vec::new(),
753 };
754 let Some(ld) = self.link_destinations.get(&dest_hash) else {
755 return Vec::new();
756 };
757 if let Some(link) = self.links.get_mut(link_id) {
758 link.channel_proofs_sent += 1;
759 }
760
761 let signature = ld.sig_prv.sign(packet_hash);
762 let mut proof_data = Vec::with_capacity(96);
763 proof_data.extend_from_slice(packet_hash);
764 proof_data.extend_from_slice(&signature);
765
766 let flags = PacketFlags {
767 header_type: constants::HEADER_1,
768 context_flag: constants::FLAG_UNSET,
769 transport_type: constants::TRANSPORT_BROADCAST,
770 destination_type: constants::DESTINATION_LINK,
771 packet_type: constants::PACKET_TYPE_PROOF,
772 };
773 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
774 flags,
775 0,
776 link_id,
777 None,
778 constants::CONTEXT_NONE,
779 &proof_data,
780 ) {
781 vec![LinkManagerAction::SendPacket {
782 raw,
783 dest_type: constants::DESTINATION_LINK,
784 attached_interface: None,
785 }]
786 } else {
787 Vec::new()
788 }
789 }
790
791 fn handle_lrproof(
793 &mut self,
794 link_id_bytes: &[u8; 16],
795 packet: &RawPacket,
796 receiving_interface: rns_core::transport::types::InterfaceId,
797 rng: &mut dyn Rng,
798 ) -> Vec<LinkManagerAction> {
799 let link = match self.links.get_mut(link_id_bytes) {
800 Some(l) => l,
801 None => return Vec::new(),
802 };
803
804 link.route_interface = Some(receiving_interface);
805 if packet.flags.header_type == constants::HEADER_2 {
806 if let Some(transport_id) = packet.transport_id {
807 link.route_transport_id = Some(transport_id);
808 }
809 }
810 log::debug!(
811 "LRPROOF received: link={:02x?} iface={} header_type={} transport_id_present={}",
812 &link_id_bytes[..4],
813 receiving_interface.0,
814 packet.flags.header_type,
815 packet.transport_id.is_some()
816 );
817
818 if link.engine.state() != LinkState::Pending || !link.engine.is_initiator() {
819 return Vec::new();
820 }
821
822 let dest_sig_pub_bytes = match link.dest_sig_pub_bytes {
824 Some(b) => b,
825 None => {
826 log::debug!("LRPROOF: no destination signing key available");
827 return Vec::new();
828 }
829 };
830
831 let now = time::now();
832 let (lrrtt_encrypted, link_actions) =
833 match link
834 .engine
835 .handle_lrproof(&packet.data, &dest_sig_pub_bytes, now, rng)
836 {
837 Ok(r) => r,
838 Err(e) => {
839 log::debug!("LRPROOF validation failed: {}", e);
840 return Vec::new();
841 }
842 };
843
844 let link_id = *link.engine.link_id();
845 let mut actions = Vec::new();
846
847 actions.extend(self.process_link_actions(&link_id, &link_actions));
849
850 let flags = PacketFlags {
852 header_type: constants::HEADER_1,
853 context_flag: constants::FLAG_UNSET,
854 transport_type: constants::TRANSPORT_BROADCAST,
855 destination_type: constants::DESTINATION_LINK,
856 packet_type: constants::PACKET_TYPE_DATA,
857 };
858
859 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
860 flags,
861 0,
862 &link_id,
863 None,
864 constants::CONTEXT_LRRTT,
865 &lrrtt_encrypted,
866 ) {
867 actions.push(LinkManagerAction::SendPacket {
868 raw,
869 dest_type: constants::DESTINATION_LINK,
870 attached_interface: None,
871 });
872 }
873
874 if let Some(link) = self.links.get_mut(&link_id) {
876 if link.engine.state() == LinkState::Active {
877 let rtt = link.engine.rtt().unwrap_or(1.0);
878 link.channel = Some(Channel::new(rtt));
879 }
880 }
881
882 actions
883 }
884
885 fn handle_link_data(
891 &mut self,
892 link_id_bytes: &[u8; 16],
893 packet: &RawPacket,
894 packet_hash: [u8; 32],
895 receiving_interface: rns_core::transport::types::InterfaceId,
896 rng: &mut dyn Rng,
897 ) -> Vec<LinkManagerAction> {
898 enum LinkDataResult<'a> {
900 Lrrtt {
901 link_id: LinkId,
902 link_actions: Vec<LinkAction>,
903 },
904 Identify {
905 link_id: LinkId,
906 link_actions: Vec<LinkAction>,
907 },
908 Keepalive {
909 link_id: LinkId,
910 inbound_actions: Vec<LinkAction>,
911 },
912 LinkClose {
913 link_id: LinkId,
914 teardown_actions: Vec<LinkAction>,
915 },
916 Channel {
917 link_id: LinkId,
918 inbound_actions: Vec<LinkAction>,
919 plaintext: Vec<u8>,
920 packet_hash: [u8; 32],
921 },
922 Request {
923 link_id: LinkId,
924 inbound_actions: Vec<LinkAction>,
925 plaintext: Vec<u8>,
926 request_id: [u8; 16],
927 },
928 Response {
929 link_id: LinkId,
930 inbound_actions: Vec<LinkAction>,
931 plaintext: Vec<u8>,
932 },
933 Generic {
934 link_id: LinkId,
935 inbound_actions: Vec<LinkAction>,
936 plaintext: Vec<u8>,
937 context: u8,
938 packet_hash: [u8; 32],
939 },
940 ResourceAdv {
942 link_id: LinkId,
943 inbound_actions: Vec<LinkAction>,
944 plaintext: Vec<u8>,
945 },
946 ResourceReq {
948 link_id: LinkId,
949 inbound_actions: Vec<LinkAction>,
950 plaintext: Vec<u8>,
951 },
952 ResourceHmu {
954 link_id: LinkId,
955 inbound_actions: Vec<LinkAction>,
956 plaintext: Vec<u8>,
957 },
958 ResourcePart {
960 link_id: LinkId,
961 inbound_actions: Vec<LinkAction>,
962 raw_data: &'a [u8],
963 },
964 ResourcePrf {
966 link_id: LinkId,
967 inbound_actions: Vec<LinkAction>,
968 plaintext: Vec<u8>,
969 },
970 ResourceIcl {
972 link_id: LinkId,
973 inbound_actions: Vec<LinkAction>,
974 },
975 ResourceRcl {
977 link_id: LinkId,
978 inbound_actions: Vec<LinkAction>,
979 },
980 Error,
981 }
982
983 let result = {
984 let link = match self.links.get_mut(link_id_bytes) {
985 Some(l) => l,
986 None => return Vec::new(),
987 };
988
989 link.route_interface = Some(receiving_interface);
990 if packet.flags.header_type == constants::HEADER_2 {
991 if let Some(transport_id) = packet.transport_id {
992 link.route_transport_id = Some(transport_id);
993 }
994 } else {
995 link.route_transport_id = None;
996 }
997
998 match packet.context {
999 constants::CONTEXT_LRRTT => {
1000 match link.engine.handle_lrrtt(&packet.data, time::now()) {
1001 Ok(link_actions) => {
1002 let link_id = *link.engine.link_id();
1003 LinkDataResult::Lrrtt {
1004 link_id,
1005 link_actions,
1006 }
1007 }
1008 Err(e) => {
1009 log::debug!("LRRTT handling failed: {}", e);
1010 LinkDataResult::Error
1011 }
1012 }
1013 }
1014 constants::CONTEXT_LINKIDENTIFY => {
1015 match link.engine.handle_identify(&packet.data) {
1016 Ok(link_actions) => {
1017 let link_id = *link.engine.link_id();
1018 link.remote_identity = link.engine.remote_identity().cloned();
1019 LinkDataResult::Identify {
1020 link_id,
1021 link_actions,
1022 }
1023 }
1024 Err(e) => {
1025 log::debug!("LINKIDENTIFY failed: {}", e);
1026 LinkDataResult::Error
1027 }
1028 }
1029 }
1030 constants::CONTEXT_KEEPALIVE => {
1031 let inbound_actions = link.engine.record_inbound(time::now());
1032 let link_id = *link.engine.link_id();
1033 LinkDataResult::Keepalive {
1034 link_id,
1035 inbound_actions,
1036 }
1037 }
1038 constants::CONTEXT_LINKCLOSE => {
1039 let teardown_actions = link.engine.handle_teardown();
1040 let link_id = *link.engine.link_id();
1041 LinkDataResult::LinkClose {
1042 link_id,
1043 teardown_actions,
1044 }
1045 }
1046 constants::CONTEXT_CHANNEL => match link.engine.decrypt(&packet.data) {
1047 Ok(plaintext) => {
1048 let inbound_actions = link.engine.record_inbound(time::now());
1049 let link_id = *link.engine.link_id();
1050 LinkDataResult::Channel {
1051 link_id,
1052 inbound_actions,
1053 plaintext,
1054 packet_hash,
1055 }
1056 }
1057 Err(_) => LinkDataResult::Error,
1058 },
1059 constants::CONTEXT_REQUEST => match link.engine.decrypt(&packet.data) {
1060 Ok(plaintext) => {
1061 let inbound_actions = link.engine.record_inbound(time::now());
1062 let link_id = *link.engine.link_id();
1063 let request_id = packet.get_truncated_hash();
1064 LinkDataResult::Request {
1065 link_id,
1066 inbound_actions,
1067 plaintext,
1068 request_id,
1069 }
1070 }
1071 Err(_) => LinkDataResult::Error,
1072 },
1073 constants::CONTEXT_RESPONSE => match link.engine.decrypt(&packet.data) {
1074 Ok(plaintext) => {
1075 let inbound_actions = link.engine.record_inbound(time::now());
1076 let link_id = *link.engine.link_id();
1077 LinkDataResult::Response {
1078 link_id,
1079 inbound_actions,
1080 plaintext,
1081 }
1082 }
1083 Err(_) => LinkDataResult::Error,
1084 },
1085 constants::CONTEXT_RESOURCE_ADV => match link.engine.decrypt(&packet.data) {
1087 Ok(plaintext) => {
1088 let inbound_actions = link.engine.record_inbound(time::now());
1089 let link_id = *link.engine.link_id();
1090 LinkDataResult::ResourceAdv {
1091 link_id,
1092 inbound_actions,
1093 plaintext,
1094 }
1095 }
1096 Err(_) => LinkDataResult::Error,
1097 },
1098 constants::CONTEXT_RESOURCE_REQ => match link.engine.decrypt(&packet.data) {
1099 Ok(plaintext) => {
1100 let inbound_actions = link.engine.record_inbound(time::now());
1101 let link_id = *link.engine.link_id();
1102 LinkDataResult::ResourceReq {
1103 link_id,
1104 inbound_actions,
1105 plaintext,
1106 }
1107 }
1108 Err(_) => LinkDataResult::Error,
1109 },
1110 constants::CONTEXT_RESOURCE_HMU => match link.engine.decrypt(&packet.data) {
1111 Ok(plaintext) => {
1112 let inbound_actions = link.engine.record_inbound(time::now());
1113 let link_id = *link.engine.link_id();
1114 LinkDataResult::ResourceHmu {
1115 link_id,
1116 inbound_actions,
1117 plaintext,
1118 }
1119 }
1120 Err(_) => LinkDataResult::Error,
1121 },
1122 constants::CONTEXT_RESOURCE => {
1123 let inbound_actions = link.engine.record_inbound(time::now());
1125 let link_id = *link.engine.link_id();
1126 LinkDataResult::ResourcePart {
1127 link_id,
1128 inbound_actions,
1129 raw_data: &packet.data,
1130 }
1131 }
1132 constants::CONTEXT_RESOURCE_PRF => match link.engine.decrypt(&packet.data) {
1133 Ok(plaintext) => {
1134 let inbound_actions = link.engine.record_inbound(time::now());
1135 let link_id = *link.engine.link_id();
1136 LinkDataResult::ResourcePrf {
1137 link_id,
1138 inbound_actions,
1139 plaintext,
1140 }
1141 }
1142 Err(_) => LinkDataResult::Error,
1143 },
1144 constants::CONTEXT_RESOURCE_ICL => {
1145 let _ = link.engine.decrypt(&packet.data); let inbound_actions = link.engine.record_inbound(time::now());
1147 let link_id = *link.engine.link_id();
1148 LinkDataResult::ResourceIcl {
1149 link_id,
1150 inbound_actions,
1151 }
1152 }
1153 constants::CONTEXT_RESOURCE_RCL => {
1154 let _ = link.engine.decrypt(&packet.data); let inbound_actions = link.engine.record_inbound(time::now());
1156 let link_id = *link.engine.link_id();
1157 LinkDataResult::ResourceRcl {
1158 link_id,
1159 inbound_actions,
1160 }
1161 }
1162 _ => match link.engine.decrypt(&packet.data) {
1163 Ok(plaintext) => {
1164 let inbound_actions = link.engine.record_inbound(time::now());
1165 let link_id = *link.engine.link_id();
1166 LinkDataResult::Generic {
1167 link_id,
1168 inbound_actions,
1169 plaintext,
1170 context: packet.context,
1171 packet_hash,
1172 }
1173 }
1174 Err(_) => LinkDataResult::Error,
1175 },
1176 }
1177 }; let mut actions = Vec::new();
1181 match result {
1182 LinkDataResult::Lrrtt {
1183 link_id,
1184 link_actions,
1185 } => {
1186 actions.extend(self.process_link_actions(&link_id, &link_actions));
1187 if let Some(link) = self.links.get_mut(&link_id) {
1189 if link.engine.state() == LinkState::Active {
1190 let rtt = link.engine.rtt().unwrap_or(1.0);
1191 link.channel = Some(Channel::new(rtt));
1192 }
1193 }
1194 }
1195 LinkDataResult::Identify {
1196 link_id,
1197 link_actions,
1198 } => {
1199 actions.extend(self.process_link_actions(&link_id, &link_actions));
1200 }
1201 LinkDataResult::Keepalive {
1202 link_id,
1203 inbound_actions,
1204 } => {
1205 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1206 }
1212 LinkDataResult::LinkClose {
1213 link_id,
1214 teardown_actions,
1215 } => {
1216 actions.extend(self.process_link_actions(&link_id, &teardown_actions));
1217 }
1218 LinkDataResult::Channel {
1219 link_id,
1220 inbound_actions,
1221 plaintext,
1222 packet_hash,
1223 } => {
1224 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1225 if let Some(link) = self.links.get_mut(&link_id) {
1227 if let Some(ref mut channel) = link.channel {
1228 let chan_actions = channel.receive(&plaintext, time::now());
1229 link.channel_messages_received += chan_actions
1230 .iter()
1231 .filter(|action| {
1232 matches!(
1233 action,
1234 rns_core::channel::ChannelAction::MessageReceived { .. }
1235 )
1236 })
1237 .count()
1238 as u64;
1239 let _ = link;
1241 actions.extend(self.process_channel_actions(&link_id, chan_actions, rng));
1242 }
1243 }
1244 actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1245 }
1246 LinkDataResult::Request {
1247 link_id,
1248 inbound_actions,
1249 plaintext,
1250 request_id,
1251 } => {
1252 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1253 actions.extend(self.handle_request(&link_id, &plaintext, request_id, rng));
1254 }
1255 LinkDataResult::Response {
1256 link_id,
1257 inbound_actions,
1258 plaintext,
1259 } => {
1260 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1261 actions.extend(self.handle_response(&link_id, &plaintext, None));
1263 }
1264 LinkDataResult::Generic {
1265 link_id,
1266 inbound_actions,
1267 plaintext,
1268 context,
1269 packet_hash,
1270 } => {
1271 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1272 actions.push(LinkManagerAction::LinkDataReceived {
1273 link_id,
1274 context,
1275 data: plaintext,
1276 });
1277
1278 actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1279 }
1280 LinkDataResult::ResourceAdv {
1281 link_id,
1282 inbound_actions,
1283 plaintext,
1284 } => {
1285 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1286 actions.extend(self.handle_resource_adv(&link_id, &plaintext, rng));
1287 }
1288 LinkDataResult::ResourceReq {
1289 link_id,
1290 inbound_actions,
1291 plaintext,
1292 } => {
1293 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1294 actions.extend(self.handle_resource_req(&link_id, &plaintext, rng));
1295 }
1296 LinkDataResult::ResourceHmu {
1297 link_id,
1298 inbound_actions,
1299 plaintext,
1300 } => {
1301 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1302 actions.extend(self.handle_resource_hmu(&link_id, &plaintext, rng));
1303 }
1304 LinkDataResult::ResourcePart {
1305 link_id,
1306 inbound_actions,
1307 raw_data,
1308 } => {
1309 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1310 actions.extend(self.handle_resource_part(&link_id, &raw_data, rng));
1311 }
1312 LinkDataResult::ResourcePrf {
1313 link_id,
1314 inbound_actions,
1315 plaintext,
1316 } => {
1317 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1318 actions.extend(self.handle_resource_prf(&link_id, &plaintext, rng));
1319 }
1320 LinkDataResult::ResourceIcl {
1321 link_id,
1322 inbound_actions,
1323 } => {
1324 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1325 actions.extend(self.handle_resource_icl(&link_id));
1326 }
1327 LinkDataResult::ResourceRcl {
1328 link_id,
1329 inbound_actions,
1330 } => {
1331 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1332 actions.extend(self.handle_resource_rcl(&link_id));
1333 }
1334 LinkDataResult::Error => {}
1335 }
1336
1337 actions
1338 }
1339
1340 fn handle_request(
1342 &mut self,
1343 link_id: &LinkId,
1344 plaintext: &[u8],
1345 request_id: [u8; 16],
1346 rng: &mut dyn Rng,
1347 ) -> Vec<LinkManagerAction> {
1348 use rns_core::msgpack::{self, Value};
1349
1350 let arr = match msgpack::unpack_exact(plaintext) {
1352 Ok(Value::Array(arr)) if arr.len() >= 3 => arr,
1353 _ => return Vec::new(),
1354 };
1355
1356 let path_hash_bytes = match &arr[1] {
1357 Value::Bin(b) if b.len() == 16 => b,
1358 _ => return Vec::new(),
1359 };
1360 let mut path_hash = [0u8; 16];
1361 path_hash.copy_from_slice(path_hash_bytes);
1362
1363 let request_data = msgpack::pack(&arr[2]);
1365
1366 if self.management_paths.contains(&path_hash) {
1368 let remote_identity = self
1369 .links
1370 .get(link_id)
1371 .and_then(|l| l.remote_identity)
1372 .map(|(h, k)| (h, k));
1373 return vec![LinkManagerAction::ManagementRequest {
1374 link_id: *link_id,
1375 path_hash,
1376 data: request_data,
1377 request_id,
1378 remote_identity,
1379 }];
1380 }
1381
1382 let handler_idx = self
1384 .request_handlers
1385 .iter()
1386 .position(|h| h.path_hash == path_hash);
1387 let handler_idx = match handler_idx {
1388 Some(i) => i,
1389 None => return Vec::new(),
1390 };
1391
1392 let remote_identity = self
1394 .links
1395 .get(link_id)
1396 .and_then(|l| l.remote_identity.as_ref());
1397 let handler = &self.request_handlers[handler_idx];
1398 if let Some(ref allowed) = handler.allowed_list {
1399 match remote_identity {
1400 Some((identity_hash, _)) => {
1401 if !allowed.contains(identity_hash) {
1402 log::debug!("Request denied: identity not in allowed list");
1403 return Vec::new();
1404 }
1405 }
1406 None => {
1407 log::debug!("Request denied: peer not identified");
1408 return Vec::new();
1409 }
1410 }
1411 }
1412
1413 let path = handler.path.clone();
1415 let response = (handler.handler)(*link_id, &path, &request_data, remote_identity);
1416
1417 let mut actions = Vec::new();
1418 if let Some(response) = response {
1419 match response {
1420 RequestResponse::Bytes(response_data) => {
1421 let mut response_actions =
1422 self.build_response_packet(link_id, &request_id, &response_data, rng);
1423 if response_actions.is_empty() {
1424 response_actions.extend(self.send_response_resource(
1425 link_id,
1426 &request_id,
1427 &response_data,
1428 None,
1429 true,
1430 rng,
1431 ));
1432 }
1433 actions.extend(response_actions);
1434 }
1435 RequestResponse::Resource {
1436 data,
1437 metadata,
1438 auto_compress,
1439 } => {
1440 actions.extend(self.send_response_resource(
1441 link_id,
1442 &request_id,
1443 &data,
1444 metadata.as_deref(),
1445 auto_compress,
1446 rng,
1447 ));
1448 }
1449 }
1450 }
1451
1452 actions
1453 }
1454
1455 fn build_response_packet(
1458 &self,
1459 link_id: &LinkId,
1460 request_id: &[u8; 16],
1461 response_data: &[u8],
1462 rng: &mut dyn Rng,
1463 ) -> Vec<LinkManagerAction> {
1464 use rns_core::msgpack::{self, Value};
1465
1466 let response_value = msgpack::unpack_exact(response_data)
1467 .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1468
1469 let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1470 let response_plaintext = msgpack::pack(&response_array);
1471
1472 let mut actions = Vec::new();
1473 if let Some(link) = self.links.get(link_id) {
1474 if let Ok(encrypted) = link.engine.encrypt(&response_plaintext, rng) {
1475 let flags = PacketFlags {
1476 header_type: constants::HEADER_1,
1477 context_flag: constants::FLAG_UNSET,
1478 transport_type: constants::TRANSPORT_BROADCAST,
1479 destination_type: constants::DESTINATION_LINK,
1480 packet_type: constants::PACKET_TYPE_DATA,
1481 };
1482 let max_mtu = link.engine.mtu() as usize;
1483 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash_with_max_mtu(
1484 flags,
1485 0,
1486 link_id,
1487 None,
1488 constants::CONTEXT_RESPONSE,
1489 &encrypted,
1490 max_mtu,
1491 ) {
1492 actions.push(LinkManagerAction::SendPacket {
1493 raw,
1494 dest_type: constants::DESTINATION_LINK,
1495 attached_interface: None,
1496 });
1497 }
1498 }
1499 }
1500 actions
1501 }
1502
1503 fn send_response_resource(
1504 &mut self,
1505 link_id: &LinkId,
1506 request_id: &[u8; 16],
1507 response_data: &[u8],
1508 metadata: Option<&[u8]>,
1509 auto_compress: bool,
1510 rng: &mut dyn Rng,
1511 ) -> Vec<LinkManagerAction> {
1512 use rns_core::msgpack::{self, Value};
1513
1514 let link = match self.links.get_mut(link_id) {
1515 Some(l) => l,
1516 None => return Vec::new(),
1517 };
1518
1519 if link.engine.state() != LinkState::Active {
1520 return Vec::new();
1521 }
1522
1523 let now = time::now();
1524
1525 let response_value = msgpack::unpack_exact(response_data)
1529 .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1530 let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1531 let resource_payload = msgpack::pack(&response_array);
1532
1533 let senders = match Self::build_resource_senders(
1534 link,
1535 &resource_payload,
1536 metadata,
1537 auto_compress,
1538 true, Some(request_id.to_vec()),
1540 rng,
1541 now,
1542 ) {
1543 Ok(s) => s,
1544 Err(e) => {
1545 log::debug!("Failed to create response ResourceSender: {}", e);
1546 return Vec::new();
1547 }
1548 };
1549
1550 let adv_actions = Self::start_resource_senders(link, senders, now);
1551
1552 let _ = link;
1553 self.process_resource_actions(link_id, adv_actions, rng)
1554 }
1555
1556 pub fn send_management_response(
1559 &mut self,
1560 link_id: &LinkId,
1561 request_id: &[u8; 16],
1562 response_data: &[u8],
1563 rng: &mut dyn Rng,
1564 ) -> Vec<LinkManagerAction> {
1565 let mut actions = self.build_response_packet(link_id, request_id, response_data, rng);
1566 if actions.is_empty() {
1567 actions.extend(self.send_response_resource(
1568 link_id,
1569 request_id,
1570 response_data,
1571 None,
1572 true,
1573 rng,
1574 ));
1575 }
1576 actions
1577 }
1578
1579 pub fn send_request(
1587 &self,
1588 link_id: &LinkId,
1589 path: &str,
1590 data: &[u8],
1591 rng: &mut dyn Rng,
1592 ) -> Vec<LinkManagerAction> {
1593 use rns_core::msgpack::{self, Value};
1594
1595 let link = match self.links.get(link_id) {
1596 Some(l) => l,
1597 None => return Vec::new(),
1598 };
1599
1600 if link.engine.state() != LinkState::Active {
1601 return Vec::new();
1602 }
1603
1604 let path_hash = compute_path_hash(path);
1605
1606 let data_value = msgpack::unpack_exact(data).unwrap_or_else(|_| Value::Bin(data.to_vec()));
1608
1609 let request_array = Value::Array(vec![
1611 Value::Float(time::now()),
1612 Value::Bin(path_hash.to_vec()),
1613 data_value,
1614 ]);
1615 let plaintext = msgpack::pack(&request_array);
1616
1617 let encrypted = match link.engine.encrypt(&plaintext, rng) {
1618 Ok(e) => e,
1619 Err(_) => return Vec::new(),
1620 };
1621
1622 let flags = PacketFlags {
1623 header_type: constants::HEADER_1,
1624 context_flag: constants::FLAG_UNSET,
1625 transport_type: constants::TRANSPORT_BROADCAST,
1626 destination_type: constants::DESTINATION_LINK,
1627 packet_type: constants::PACKET_TYPE_DATA,
1628 };
1629
1630 let mut actions = Vec::new();
1631 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1632 flags,
1633 0,
1634 link_id,
1635 None,
1636 constants::CONTEXT_REQUEST,
1637 &encrypted,
1638 ) {
1639 actions.push(LinkManagerAction::SendPacket {
1640 raw,
1641 dest_type: constants::DESTINATION_LINK,
1642 attached_interface: None,
1643 });
1644 }
1645 actions
1646 }
1647
1648 pub fn send_on_link(
1650 &self,
1651 link_id: &LinkId,
1652 plaintext: &[u8],
1653 context: u8,
1654 rng: &mut dyn Rng,
1655 ) -> Vec<LinkManagerAction> {
1656 let link = match self.links.get(link_id) {
1657 Some(l) => l,
1658 None => return Vec::new(),
1659 };
1660
1661 if link.engine.state() != LinkState::Active {
1662 return Vec::new();
1663 }
1664
1665 let encrypted = match link.engine.encrypt(plaintext, rng) {
1666 Ok(e) => e,
1667 Err(_) => return Vec::new(),
1668 };
1669
1670 let flags = PacketFlags {
1671 header_type: constants::HEADER_1,
1672 context_flag: constants::FLAG_UNSET,
1673 transport_type: constants::TRANSPORT_BROADCAST,
1674 destination_type: constants::DESTINATION_LINK,
1675 packet_type: constants::PACKET_TYPE_DATA,
1676 };
1677
1678 let mut actions = Vec::new();
1679 if let Ok((raw, _packet_hash)) =
1680 RawPacket::pack_raw_with_hash(flags, 0, link_id, None, context, &encrypted)
1681 {
1682 actions.push(LinkManagerAction::SendPacket {
1683 raw,
1684 dest_type: constants::DESTINATION_LINK,
1685 attached_interface: None,
1686 });
1687 }
1688 actions
1689 }
1690
1691 pub fn identify(
1693 &self,
1694 link_id: &LinkId,
1695 identity: &rns_crypto::identity::Identity,
1696 rng: &mut dyn Rng,
1697 ) -> Vec<LinkManagerAction> {
1698 let link = match self.links.get(link_id) {
1699 Some(l) => l,
1700 None => return Vec::new(),
1701 };
1702
1703 let encrypted = match link.engine.build_identify(identity, rng) {
1704 Ok(e) => e,
1705 Err(_) => return Vec::new(),
1706 };
1707
1708 let flags = PacketFlags {
1709 header_type: constants::HEADER_1,
1710 context_flag: constants::FLAG_UNSET,
1711 transport_type: constants::TRANSPORT_BROADCAST,
1712 destination_type: constants::DESTINATION_LINK,
1713 packet_type: constants::PACKET_TYPE_DATA,
1714 };
1715
1716 let mut actions = Vec::new();
1717 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1718 flags,
1719 0,
1720 link_id,
1721 None,
1722 constants::CONTEXT_LINKIDENTIFY,
1723 &encrypted,
1724 ) {
1725 actions.push(LinkManagerAction::SendPacket {
1726 raw,
1727 dest_type: constants::DESTINATION_LINK,
1728 attached_interface: None,
1729 });
1730 }
1731 actions
1732 }
1733
1734 pub fn teardown_link(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1736 let link = match self.links.get_mut(link_id) {
1737 Some(l) => l,
1738 None => return Vec::new(),
1739 };
1740
1741 let teardown_actions = link.engine.teardown();
1742 if let Some(ref mut channel) = link.channel {
1743 channel.shutdown();
1744 }
1745
1746 let mut actions = self.process_link_actions(link_id, &teardown_actions);
1747
1748 let flags = PacketFlags {
1750 header_type: constants::HEADER_1,
1751 context_flag: constants::FLAG_UNSET,
1752 transport_type: constants::TRANSPORT_BROADCAST,
1753 destination_type: constants::DESTINATION_LINK,
1754 packet_type: constants::PACKET_TYPE_DATA,
1755 };
1756 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
1757 flags,
1758 0,
1759 link_id,
1760 None,
1761 constants::CONTEXT_LINKCLOSE,
1762 &[],
1763 ) {
1764 actions.push(LinkManagerAction::SendPacket {
1765 raw,
1766 dest_type: constants::DESTINATION_LINK,
1767 attached_interface: None,
1768 });
1769 }
1770
1771 actions
1772 }
1773
1774 pub fn teardown_all_links(&mut self) -> Vec<LinkManagerAction> {
1776 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
1777 let mut actions = Vec::new();
1778 for link_id in link_ids {
1779 actions.extend(self.teardown_link(&link_id));
1780 }
1781 actions
1782 }
1783
1784 fn handle_response(
1786 &self,
1787 link_id: &LinkId,
1788 plaintext: &[u8],
1789 metadata: Option<Vec<u8>>,
1790 ) -> Vec<LinkManagerAction> {
1791 use rns_core::msgpack;
1792
1793 let arr = match msgpack::unpack_exact(plaintext) {
1795 Ok(msgpack::Value::Array(arr)) if arr.len() >= 2 => arr,
1796 _ => return Vec::new(),
1797 };
1798
1799 let request_id_bytes = match &arr[0] {
1800 msgpack::Value::Bin(b) if b.len() == 16 => b,
1801 _ => return Vec::new(),
1802 };
1803 let mut request_id = [0u8; 16];
1804 request_id.copy_from_slice(request_id_bytes);
1805
1806 let response_data = msgpack::pack(&arr[1]);
1807
1808 vec![LinkManagerAction::ResponseReceived {
1809 link_id: *link_id,
1810 request_id,
1811 data: response_data,
1812 metadata,
1813 }]
1814 }
1815
1816 fn build_resource_senders(
1817 link: &ManagedLink,
1818 data: &[u8],
1819 metadata: Option<&[u8]>,
1820 auto_compress: bool,
1821 is_response: bool,
1822 request_id: Option<Vec<u8>>,
1823 rng: &mut dyn Rng,
1824 now: f64,
1825 ) -> Result<Vec<ResourceSender>, rns_core::resource::ResourceError> {
1826 let link_rtt = link.engine.rtt().unwrap_or(1.0);
1827 let resource_sdu = Self::resource_sdu_for_link(link);
1828 let metadata_overhead = metadata.map(|m| 3 + m.len()).unwrap_or(0);
1829 let logical_size = metadata_overhead + data.len();
1830
1831 if logical_size <= constants::RESOURCE_MAX_EFFICIENT_SIZE {
1832 let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1833 let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1834 link.engine
1835 .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1836 .unwrap_or_else(|_| plaintext.to_vec())
1837 };
1838 return ResourceSender::new(
1839 data,
1840 metadata,
1841 resource_sdu,
1842 &encrypt_fn,
1843 &Bzip2Compressor,
1844 rng,
1845 now,
1846 auto_compress,
1847 is_response,
1848 request_id,
1849 1,
1850 1,
1851 None,
1852 link_rtt,
1853 6.0,
1854 )
1855 .map(|sender| vec![sender]);
1856 }
1857
1858 if metadata_overhead > constants::RESOURCE_MAX_EFFICIENT_SIZE {
1859 return Err(rns_core::resource::ResourceError::TooLarge);
1860 }
1861
1862 let first_payload_len = core::cmp::min(
1863 data.len(),
1864 constants::RESOURCE_MAX_EFFICIENT_SIZE - metadata_overhead,
1865 );
1866 let remaining = data.len().saturating_sub(first_payload_len);
1867 let total_segments = 1 + remaining.div_ceil(constants::RESOURCE_MAX_EFFICIENT_SIZE) as u64;
1868
1869 let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1870 let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1871 link.engine
1872 .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1873 .unwrap_or_else(|_| plaintext.to_vec())
1874 };
1875
1876 let mut senders = Vec::new();
1877 let mut first = ResourceSender::new(
1878 &data[..first_payload_len],
1879 metadata,
1880 resource_sdu,
1881 &encrypt_fn,
1882 &Bzip2Compressor,
1883 rng,
1884 now,
1885 auto_compress,
1886 is_response,
1887 request_id.clone(),
1888 1,
1889 total_segments,
1890 None,
1891 link_rtt,
1892 6.0,
1893 )?;
1894 first.data_size = logical_size;
1895 let original_hash = first.original_hash;
1896 let has_metadata = metadata.is_some();
1897 senders.push(first);
1898
1899 let mut offset = first_payload_len;
1900 let mut segment_index = 2;
1901 while offset < data.len() {
1902 let end = core::cmp::min(offset + constants::RESOURCE_MAX_EFFICIENT_SIZE, data.len());
1903 let mut sender = ResourceSender::new(
1904 &data[offset..end],
1905 None,
1906 resource_sdu,
1907 &encrypt_fn,
1908 &Bzip2Compressor,
1909 rng,
1910 now,
1911 auto_compress,
1912 is_response,
1913 request_id.clone(),
1914 segment_index,
1915 total_segments,
1916 Some(original_hash),
1917 link_rtt,
1918 6.0,
1919 )?;
1920 sender.data_size = logical_size;
1921 sender.flags.has_metadata = has_metadata;
1922 senders.push(sender);
1923 segment_index += 1;
1924 offset = end;
1925 }
1926
1927 Ok(senders)
1928 }
1929
1930 fn start_resource_senders(
1931 link: &mut ManagedLink,
1932 mut senders: Vec<ResourceSender>,
1933 now: f64,
1934 ) -> Vec<ResourceAction> {
1935 if senders.is_empty() {
1936 return Vec::new();
1937 }
1938
1939 let mut first = senders.remove(0);
1940 let adv_actions = first.advertise(now);
1941
1942 if first.total_segments > 1 {
1943 let original_hash = first.original_hash;
1944 let split = OutgoingSplitTransfer {
1945 total_segments: first.total_segments,
1946 completed_segments: 0,
1947 current_segment_index: first.segment_index,
1948 current_sent_parts: 0,
1949 current_total_parts: first.total_parts(),
1950 };
1951 link.outgoing_splits.insert(original_hash, split);
1952 }
1953
1954 link.outgoing_resources.push(first);
1955 link.outgoing_resources.extend(senders);
1956 adv_actions
1957 }
1958
1959 fn handle_resource_adv(
1961 &mut self,
1962 link_id: &LinkId,
1963 adv_plaintext: &[u8],
1964 rng: &mut dyn Rng,
1965 ) -> Vec<LinkManagerAction> {
1966 let link = match self.links.get_mut(link_id) {
1967 Some(l) => l,
1968 None => return Vec::new(),
1969 };
1970
1971 let link_rtt = link.engine.rtt().unwrap_or(1.0);
1972 let resource_sdu = Self::resource_sdu_for_link(link);
1973 let now = time::now();
1974
1975 let receiver = match ResourceReceiver::from_advertisement(
1976 adv_plaintext,
1977 resource_sdu,
1978 link_rtt,
1979 now,
1980 None,
1981 None,
1982 ) {
1983 Ok(r) => r,
1984 Err(e) => {
1985 log::debug!("Resource ADV rejected: {}", e);
1986 return Vec::new();
1987 }
1988 };
1989
1990 let strategy = link.resource_strategy;
1991 let resource_hash = receiver.resource_hash.clone();
1992 let transfer_size = receiver.transfer_size;
1993 let has_metadata = receiver.has_metadata;
1994 let is_response = receiver.flags.is_response;
1995 let is_split = receiver.flags.split;
1996 let segment_index = receiver.segment_index;
1997 let total_segments = receiver.total_segments;
1998 let original_hash = match Self::resource_hash_key(&receiver.original_hash) {
1999 Some(key) => key,
2000 None => return Vec::new(),
2001 };
2002
2003 if is_split && segment_index > 1 {
2004 let should_accept = link
2005 .incoming_splits
2006 .get(&original_hash)
2007 .is_some_and(|split| {
2008 split.completed_segments + 1 == segment_index
2009 && split.total_segments == total_segments
2010 });
2011
2012 if !should_accept {
2013 let reject_actions = {
2014 let mut r = receiver;
2015 r.reject()
2016 };
2017 let _ = link;
2018 return self.process_resource_actions(link_id, reject_actions, rng);
2019 }
2020
2021 let current_total_parts = receiver.total_parts;
2022 link.incoming_resources.push(receiver);
2023 let idx = link.incoming_resources.len() - 1;
2024 if let Some(split) = link.incoming_splits.get_mut(&original_hash) {
2025 split.current_segment_index = segment_index;
2026 split.current_received_parts = 0;
2027 split.current_total_parts = current_total_parts;
2028 }
2029 let resource_actions = link.incoming_resources[idx].accept(now);
2030 let _ = link;
2031 return self.process_resource_actions(link_id, resource_actions, rng);
2032 }
2033
2034 if is_response {
2035 if is_split {
2038 link.incoming_splits.insert(
2039 original_hash,
2040 IncomingSplitTransfer {
2041 total_segments,
2042 completed_segments: 0,
2043 current_segment_index: segment_index,
2044 current_received_parts: 0,
2045 current_total_parts: receiver.total_parts,
2046 data: Vec::new(),
2047 metadata: None,
2048 is_response,
2049 },
2050 );
2051 }
2052 link.incoming_resources.push(receiver);
2053 let idx = link.incoming_resources.len() - 1;
2054 let resource_actions = link.incoming_resources[idx].accept(now);
2055 let _ = link;
2056 return self.process_resource_actions(link_id, resource_actions, rng);
2057 }
2058
2059 match strategy {
2060 ResourceStrategy::AcceptNone => {
2061 let reject_actions = {
2063 let mut r = receiver;
2064 r.reject()
2065 };
2066 self.process_resource_actions(link_id, reject_actions, rng)
2067 }
2068 ResourceStrategy::AcceptAll => {
2069 if is_split {
2070 link.incoming_splits.insert(
2071 original_hash,
2072 IncomingSplitTransfer {
2073 total_segments,
2074 completed_segments: 0,
2075 current_segment_index: segment_index,
2076 current_received_parts: 0,
2077 current_total_parts: receiver.total_parts,
2078 data: Vec::new(),
2079 metadata: None,
2080 is_response,
2081 },
2082 );
2083 }
2084 link.incoming_resources.push(receiver);
2085 let idx = link.incoming_resources.len() - 1;
2086 let resource_actions = link.incoming_resources[idx].accept(now);
2087 let _ = link;
2088 self.process_resource_actions(link_id, resource_actions, rng)
2089 }
2090 ResourceStrategy::AcceptApp => {
2091 link.incoming_resources.push(receiver);
2092 vec![LinkManagerAction::ResourceAcceptQuery {
2094 link_id: *link_id,
2095 resource_hash,
2096 transfer_size,
2097 has_metadata,
2098 }]
2099 }
2100 }
2101 }
2102
2103 pub fn accept_resource(
2105 &mut self,
2106 link_id: &LinkId,
2107 resource_hash: &[u8],
2108 accept: bool,
2109 rng: &mut dyn Rng,
2110 ) -> Vec<LinkManagerAction> {
2111 let link = match self.links.get_mut(link_id) {
2112 Some(l) => l,
2113 None => return Vec::new(),
2114 };
2115
2116 let now = time::now();
2117 let idx = link
2118 .incoming_resources
2119 .iter()
2120 .position(|r| r.resource_hash == resource_hash);
2121 let idx = match idx {
2122 Some(i) => i,
2123 None => return Vec::new(),
2124 };
2125
2126 if accept && link.incoming_resources[idx].flags.split {
2127 if let Some(original_hash) =
2128 Self::resource_hash_key(&link.incoming_resources[idx].original_hash)
2129 {
2130 link.incoming_splits
2131 .entry(original_hash)
2132 .or_insert_with(|| IncomingSplitTransfer {
2133 total_segments: link.incoming_resources[idx].total_segments,
2134 completed_segments: 0,
2135 current_segment_index: link.incoming_resources[idx].segment_index,
2136 current_received_parts: 0,
2137 current_total_parts: link.incoming_resources[idx].total_parts,
2138 data: Vec::new(),
2139 metadata: None,
2140 is_response: link.incoming_resources[idx].flags.is_response,
2141 });
2142 }
2143 }
2144
2145 let resource_actions = if accept {
2146 link.incoming_resources[idx].accept(now)
2147 } else {
2148 link.incoming_resources[idx].reject()
2149 };
2150
2151 let _ = link;
2152 self.process_resource_actions(link_id, resource_actions, rng)
2153 }
2154
2155 fn handle_resource_req(
2157 &mut self,
2158 link_id: &LinkId,
2159 plaintext: &[u8],
2160 rng: &mut dyn Rng,
2161 ) -> Vec<LinkManagerAction> {
2162 let link = match self.links.get_mut(link_id) {
2163 Some(l) => l,
2164 None => return Vec::new(),
2165 };
2166
2167 let now = time::now();
2168 let mut all_actions = Vec::new();
2169 let mut progress_update = None;
2170 for sender in &mut link.outgoing_resources {
2171 if sender.flags.split && sender.status == rns_core::resource::ResourceStatus::Queued {
2172 continue;
2173 }
2174 let before_sent = sender.sent_parts;
2175 let resource_actions = sender.handle_request(plaintext, now);
2176 if !resource_actions.is_empty() {
2177 if sender.sent_parts != before_sent {
2178 if sender.flags.split {
2179 if let Some(split) = link.outgoing_splits.get_mut(&sender.original_hash) {
2180 split.current_segment_index = sender.segment_index;
2181 split.current_sent_parts = sender.sent_parts;
2182 split.current_total_parts = sender.total_parts();
2183 progress_update =
2184 Some(Self::outgoing_split_progress(split, sender.sdu));
2185 }
2186 } else {
2187 progress_update = Some((sender.sent_parts, sender.total_parts()));
2188 }
2189 }
2190 all_actions.extend(resource_actions);
2191 break;
2192 }
2193 }
2194
2195 let _ = link;
2196 let mut out = self.process_resource_actions(link_id, all_actions, rng);
2197 if let Some((received, total)) = progress_update {
2198 out.push(LinkManagerAction::ResourceProgress {
2199 link_id: *link_id,
2200 received,
2201 total,
2202 });
2203 }
2204 out
2205 }
2206
2207 fn handle_resource_hmu(
2209 &mut self,
2210 link_id: &LinkId,
2211 plaintext: &[u8],
2212 rng: &mut dyn Rng,
2213 ) -> Vec<LinkManagerAction> {
2214 let link = match self.links.get_mut(link_id) {
2215 Some(l) => l,
2216 None => return Vec::new(),
2217 };
2218
2219 let now = time::now();
2220 let mut all_actions = Vec::new();
2221 for receiver in &mut link.incoming_resources {
2222 let resource_actions = receiver.handle_hashmap_update(plaintext, now);
2223 if !resource_actions.is_empty() {
2224 all_actions.extend(resource_actions);
2225 break;
2226 }
2227 }
2228
2229 let _ = link;
2230 self.process_resource_actions(link_id, all_actions, rng)
2231 }
2232
2233 fn handle_resource_part(
2235 &mut self,
2236 link_id: &LinkId,
2237 raw_data: &[u8],
2238 rng: &mut dyn Rng,
2239 ) -> Vec<LinkManagerAction> {
2240 let link = match self.links.get_mut(link_id) {
2241 Some(l) => l,
2242 None => return Vec::new(),
2243 };
2244
2245 let now = time::now();
2246 let resource_sdu = Self::resource_sdu_for_link(link);
2247 let mut all_actions = Vec::new();
2248 let mut assemble_idx = None;
2249 let mut assembled_is_response = false;
2250
2251 for (idx, receiver) in link.incoming_resources.iter_mut().enumerate() {
2252 if receiver.status >= rns_core::resource::ResourceStatus::Complete {
2253 continue;
2254 }
2255 let resource_actions = receiver.receive_part(raw_data, now);
2256 if !resource_actions.is_empty() {
2257 if receiver.received_count == receiver.total_parts {
2258 assemble_idx = Some(idx);
2259 }
2260 if receiver.flags.split {
2261 if let Some(key) = Self::resource_hash_key(&receiver.original_hash) {
2262 if let Some(split) = link.incoming_splits.get_mut(&key) {
2263 split.current_segment_index = receiver.segment_index;
2264 split.current_received_parts = receiver.received_count;
2265 split.current_total_parts = receiver.total_parts;
2266 let (received, total) =
2267 Self::incoming_split_progress(split, resource_sdu);
2268 for action in resource_actions {
2269 match action {
2270 ResourceAction::ProgressUpdate { .. } => {
2271 all_actions.push(ResourceAction::ProgressUpdate {
2272 received,
2273 total,
2274 });
2275 }
2276 other => all_actions.push(other),
2277 }
2278 }
2279 } else {
2280 all_actions.extend(resource_actions);
2281 }
2282 } else {
2283 all_actions.extend(resource_actions);
2284 }
2285 } else {
2286 all_actions.extend(resource_actions);
2287 }
2288 break;
2289 }
2290 }
2291
2292 if let Some(idx) = assemble_idx {
2293 let split_key = if link.incoming_resources[idx].flags.split {
2294 Self::resource_hash_key(&link.incoming_resources[idx].original_hash)
2295 } else {
2296 None
2297 };
2298 let split_segment_index = link.incoming_resources[idx].segment_index;
2299 let split_segment_total = link.incoming_resources[idx].total_segments;
2300 let split_segment_parts = link.incoming_resources[idx].total_parts;
2301 let split_is_response = link.incoming_resources[idx].flags.is_response;
2302 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2303 link.engine.decrypt(ciphertext).map_err(|_| ())
2304 };
2305 let mut assemble_actions =
2306 link.incoming_resources[idx].assemble(&decrypt_fn, &Bzip2Compressor);
2307 assembled_is_response = split_is_response;
2308
2309 if let Some(key) = split_key {
2310 let mut converted_actions = Vec::new();
2311 let mut segment_data = None;
2312 let mut segment_metadata = None;
2313 for action in assemble_actions {
2314 match action {
2315 ResourceAction::DataReceived { data, metadata } => {
2316 segment_data = Some(data);
2317 segment_metadata = metadata;
2318 }
2319 ResourceAction::Completed => {}
2320 other => converted_actions.push(other),
2321 }
2322 }
2323
2324 if let Some(data) = segment_data {
2325 if let Some(split) = link.incoming_splits.get_mut(&key) {
2326 split.data.extend_from_slice(&data);
2327 if segment_metadata.is_some() {
2328 split.metadata = segment_metadata;
2329 }
2330 split.completed_segments = split_segment_index;
2331 split.current_segment_index = split_segment_index;
2332 split.current_received_parts = split_segment_parts;
2333 split.current_total_parts = split_segment_parts;
2334 }
2335
2336 if split_segment_index == split_segment_total {
2337 if let Some(split) = link.incoming_splits.remove(&key) {
2338 assembled_is_response = split.is_response;
2339 converted_actions.push(ResourceAction::DataReceived {
2340 data: split.data,
2341 metadata: split.metadata,
2342 });
2343 converted_actions.push(ResourceAction::Completed);
2344 }
2345 }
2346 }
2347
2348 assemble_actions = converted_actions;
2349 }
2350 all_actions.extend(assemble_actions);
2351 }
2352
2353 let _ = link;
2354 let mut out = self.process_resource_actions(link_id, all_actions, rng);
2355
2356 if assembled_is_response {
2357 let mut converted = Vec::new();
2358 for action in out {
2359 match action {
2360 LinkManagerAction::ResourceReceived { data, metadata, .. } => {
2361 converted.extend(self.handle_response(link_id, &data, metadata));
2362 }
2363 LinkManagerAction::ResourceAcceptQuery { .. } => {
2364 }
2366 other => converted.push(other),
2367 }
2368 }
2369 out = converted;
2370 }
2371
2372 out
2373 }
2374
2375 fn handle_resource_prf(
2377 &mut self,
2378 link_id: &LinkId,
2379 plaintext: &[u8],
2380 rng: &mut dyn Rng,
2381 ) -> Vec<LinkManagerAction> {
2382 let link = match self.links.get_mut(link_id) {
2383 Some(l) => l,
2384 None => return Vec::new(),
2385 };
2386
2387 let now = time::now();
2388 let mut result_actions = Vec::new();
2389 let mut completed_sender = None;
2390 let mut failed_split = None;
2391 let proof_hash = plaintext.get(..32);
2392 for sender in &mut link.outgoing_resources {
2393 if proof_hash.is_some_and(|hash| hash != sender.resource_hash.as_slice()) {
2394 continue;
2395 }
2396 let resource_actions = sender.handle_proof(plaintext, now);
2397 if !resource_actions.is_empty() {
2398 if resource_actions
2399 .iter()
2400 .any(|action| matches!(action, ResourceAction::Completed))
2401 {
2402 completed_sender = Some((
2403 sender.original_hash,
2404 sender.segment_index,
2405 sender.total_segments,
2406 sender.total_parts(),
2407 ));
2408 }
2409 if sender.flags.split
2410 && resource_actions
2411 .iter()
2412 .any(|action| matches!(action, ResourceAction::Failed(_)))
2413 {
2414 failed_split = Some(sender.original_hash);
2415 }
2416 result_actions.extend(resource_actions);
2417 break;
2418 }
2419 }
2420
2421 let mut actions = Vec::new();
2423 let mut advertise_next = None;
2424 for ra in result_actions {
2425 match ra {
2426 ResourceAction::Completed => {
2427 if let Some((original_hash, segment_index, total_segments, total_parts)) =
2428 completed_sender
2429 {
2430 if total_segments > 1 && segment_index < total_segments {
2431 if let Some(split) = link.outgoing_splits.get_mut(&original_hash) {
2432 split.completed_segments = segment_index;
2433 split.current_segment_index = segment_index;
2434 split.current_sent_parts = total_parts;
2435 split.current_total_parts = total_parts;
2436 if let Some(next) = link.outgoing_resources.iter_mut().find(|s| {
2437 s.flags.split
2438 && s.original_hash == original_hash
2439 && s.segment_index == segment_index + 1
2440 }) {
2441 split.current_segment_index = next.segment_index;
2442 split.current_sent_parts = 0;
2443 split.current_total_parts = next.total_parts();
2444 advertise_next = Some(next.advertise(now));
2445 }
2446 }
2447 } else {
2448 link.outgoing_splits.remove(&original_hash);
2449 actions
2450 .push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2451 }
2452 } else {
2453 actions.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2454 }
2455 }
2456 ResourceAction::Failed(e) => {
2457 if let Some(original_hash) = failed_split {
2458 link.outgoing_splits.remove(&original_hash);
2459 }
2460 actions.push(LinkManagerAction::ResourceFailed {
2461 link_id: *link_id,
2462 error: format!("{}", e),
2463 });
2464 }
2465 _ => {}
2466 }
2467 }
2468
2469 link.outgoing_resources
2471 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2472
2473 let _ = link;
2474 if let Some(next_actions) = advertise_next {
2475 actions.extend(self.process_resource_actions(link_id, next_actions, rng));
2476 }
2477
2478 actions
2479 }
2480
2481 fn handle_resource_icl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
2483 let link = match self.links.get_mut(link_id) {
2484 Some(l) => l,
2485 None => return Vec::new(),
2486 };
2487
2488 let mut actions = Vec::new();
2489 for receiver in &mut link.incoming_resources {
2490 let ra = receiver.handle_cancel();
2491 for a in ra {
2492 if let ResourceAction::Failed(ref e) = a {
2493 actions.push(LinkManagerAction::ResourceFailed {
2494 link_id: *link_id,
2495 error: format!("{}", e),
2496 });
2497 }
2498 }
2499 }
2500 link.incoming_resources
2501 .retain(|r| r.status < rns_core::resource::ResourceStatus::Complete);
2502 link.incoming_splits.clear();
2503 actions
2504 }
2505
2506 fn handle_resource_rcl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
2508 let link = match self.links.get_mut(link_id) {
2509 Some(l) => l,
2510 None => return Vec::new(),
2511 };
2512
2513 let mut actions = Vec::new();
2514 for sender in &mut link.outgoing_resources {
2515 let ra = sender.handle_reject();
2516 for a in ra {
2517 if let ResourceAction::Failed(ref e) = a {
2518 actions.push(LinkManagerAction::ResourceFailed {
2519 link_id: *link_id,
2520 error: format!("{}", e),
2521 });
2522 }
2523 }
2524 }
2525 link.outgoing_resources
2526 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2527 link.outgoing_splits.clear();
2528 actions
2529 }
2530
2531 fn process_resource_actions(
2533 &mut self,
2534 link_id: &LinkId,
2535 actions: Vec<ResourceAction>,
2536 rng: &mut dyn Rng,
2537 ) -> Vec<LinkManagerAction> {
2538 let mut result = Vec::new();
2539 for action in actions {
2540 match action {
2541 ResourceAction::SendAdvertisement(data) => {
2542 let encrypted = self
2544 .links
2545 .get(link_id)
2546 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2547 if let Some(encrypted) = encrypted {
2548 result.extend(self.build_link_packet(
2549 link_id,
2550 constants::CONTEXT_RESOURCE_ADV,
2551 &encrypted,
2552 ));
2553 }
2554 }
2555 ResourceAction::SendPart(data) => {
2556 result.extend(self.build_link_packet(
2558 link_id,
2559 constants::CONTEXT_RESOURCE,
2560 &data,
2561 ));
2562 }
2563 ResourceAction::SendRequest(data) => {
2564 let encrypted = self
2565 .links
2566 .get(link_id)
2567 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2568 if let Some(encrypted) = encrypted {
2569 result.extend(self.build_link_packet(
2570 link_id,
2571 constants::CONTEXT_RESOURCE_REQ,
2572 &encrypted,
2573 ));
2574 }
2575 }
2576 ResourceAction::SendHmu(data) => {
2577 let encrypted = self
2578 .links
2579 .get(link_id)
2580 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2581 if let Some(encrypted) = encrypted {
2582 result.extend(self.build_link_packet(
2583 link_id,
2584 constants::CONTEXT_RESOURCE_HMU,
2585 &encrypted,
2586 ));
2587 }
2588 }
2589 ResourceAction::SendProof(data) => {
2590 let encrypted = self
2591 .links
2592 .get(link_id)
2593 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2594 if let Some(encrypted) = encrypted {
2595 result.extend(self.build_link_packet(
2596 link_id,
2597 constants::CONTEXT_RESOURCE_PRF,
2598 &encrypted,
2599 ));
2600 }
2601 }
2602 ResourceAction::SendCancelInitiator(data) => {
2603 let encrypted = self
2604 .links
2605 .get(link_id)
2606 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2607 if let Some(encrypted) = encrypted {
2608 result.extend(self.build_link_packet(
2609 link_id,
2610 constants::CONTEXT_RESOURCE_ICL,
2611 &encrypted,
2612 ));
2613 }
2614 }
2615 ResourceAction::SendCancelReceiver(data) => {
2616 let encrypted = self
2617 .links
2618 .get(link_id)
2619 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2620 if let Some(encrypted) = encrypted {
2621 result.extend(self.build_link_packet(
2622 link_id,
2623 constants::CONTEXT_RESOURCE_RCL,
2624 &encrypted,
2625 ));
2626 }
2627 }
2628 ResourceAction::DataReceived { data, metadata } => {
2629 result.push(LinkManagerAction::ResourceReceived {
2630 link_id: *link_id,
2631 data,
2632 metadata,
2633 });
2634 }
2635 ResourceAction::Completed => {
2636 result.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2637 }
2638 ResourceAction::Failed(e) => {
2639 result.push(LinkManagerAction::ResourceFailed {
2640 link_id: *link_id,
2641 error: format!("{}", e),
2642 });
2643 }
2644 ResourceAction::TeardownLink => {
2645 let teardown_actions = match self.links.get_mut(link_id) {
2646 Some(link) => link.engine.handle_teardown(),
2647 None => Vec::new(),
2648 };
2649 result.extend(self.process_link_actions(link_id, &teardown_actions));
2650 }
2651 ResourceAction::ProgressUpdate { received, total } => {
2652 result.push(LinkManagerAction::ResourceProgress {
2653 link_id: *link_id,
2654 received,
2655 total,
2656 });
2657 }
2658 }
2659 }
2660 result
2661 }
2662
2663 fn build_link_packet(
2665 &self,
2666 link_id: &LinkId,
2667 context: u8,
2668 data: &[u8],
2669 ) -> Vec<LinkManagerAction> {
2670 let flags = PacketFlags {
2671 header_type: constants::HEADER_1,
2672 context_flag: constants::FLAG_UNSET,
2673 transport_type: constants::TRANSPORT_BROADCAST,
2674 destination_type: constants::DESTINATION_LINK,
2675 packet_type: constants::PACKET_TYPE_DATA,
2676 };
2677 let mut actions = Vec::new();
2678 let max_mtu = self
2679 .links
2680 .get(link_id)
2681 .map(|l| l.engine.mtu() as usize)
2682 .unwrap_or(constants::MTU);
2683 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash_with_max_mtu(
2684 flags, 0, link_id, None, context, data, max_mtu,
2685 ) {
2686 actions.push(LinkManagerAction::SendPacket {
2687 raw,
2688 dest_type: constants::DESTINATION_LINK,
2689 attached_interface: None,
2690 });
2691 }
2692 actions
2693 }
2694
2695 pub fn send_resource(
2697 &mut self,
2698 link_id: &LinkId,
2699 data: &[u8],
2700 metadata: Option<&[u8]>,
2701 rng: &mut dyn Rng,
2702 ) -> Vec<LinkManagerAction> {
2703 self.send_resource_with_auto_compress(link_id, data, metadata, true, rng)
2704 }
2705
2706 pub fn send_resource_with_auto_compress(
2708 &mut self,
2709 link_id: &LinkId,
2710 data: &[u8],
2711 metadata: Option<&[u8]>,
2712 auto_compress: bool,
2713 rng: &mut dyn Rng,
2714 ) -> Vec<LinkManagerAction> {
2715 let link = match self.links.get_mut(link_id) {
2716 Some(l) => l,
2717 None => return Vec::new(),
2718 };
2719
2720 if link.engine.state() != LinkState::Active {
2721 return Vec::new();
2722 }
2723
2724 let now = time::now();
2725
2726 let senders = match Self::build_resource_senders(
2727 link,
2728 data,
2729 metadata,
2730 auto_compress,
2731 false, None, rng,
2734 now,
2735 ) {
2736 Ok(s) => s,
2737 Err(e) => {
2738 log::debug!("Failed to create ResourceSender: {}", e);
2739 return Vec::new();
2740 }
2741 };
2742
2743 let adv_actions = Self::start_resource_senders(link, senders, now);
2744
2745 let _ = link;
2746 self.process_resource_actions(link_id, adv_actions, rng)
2747 }
2748
2749 pub fn set_resource_strategy(&mut self, link_id: &LinkId, strategy: ResourceStrategy) {
2751 if let Some(link) = self.links.get_mut(link_id) {
2752 link.resource_strategy = strategy;
2753 }
2754 }
2755
2756 pub fn flush_channel_tx(&mut self, link_id: &LinkId) {
2759 if let Some(link) = self.links.get_mut(link_id) {
2760 if let Some(ref mut channel) = link.channel {
2761 channel.flush_tx();
2762 }
2763 }
2764 }
2765
2766 pub fn send_channel_message(
2768 &mut self,
2769 link_id: &LinkId,
2770 msgtype: u16,
2771 payload: &[u8],
2772 rng: &mut dyn Rng,
2773 ) -> Result<Vec<LinkManagerAction>, String> {
2774 let link = match self.links.get_mut(link_id) {
2775 Some(l) => l,
2776 None => return Err("unknown link".to_string()),
2777 };
2778
2779 let channel = match link.channel {
2780 Some(ref mut ch) => ch,
2781 None => return Err("link has no active channel".to_string()),
2782 };
2783
2784 let link_mdu = link.engine.mdu();
2785 let now = time::now();
2786 let chan_actions = match channel.send(msgtype, payload, now, link_mdu) {
2787 Ok(a) => {
2788 link.channel_send_ok += 1;
2789 a
2790 }
2791 Err(e) => {
2792 log::debug!("Channel send failed: {:?}", e);
2793 match e {
2794 rns_core::channel::ChannelError::NotReady => link.channel_send_not_ready += 1,
2795 rns_core::channel::ChannelError::MessageTooBig => {
2796 link.channel_send_too_big += 1;
2797 }
2798 rns_core::channel::ChannelError::InvalidEnvelope => {
2799 link.channel_send_other_error += 1;
2800 }
2801 }
2802 return Err(e.to_string());
2803 }
2804 };
2805
2806 let _ = link;
2807 Ok(self.process_channel_actions(link_id, chan_actions, rng))
2808 }
2809
2810 pub fn tick(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2812 let now = time::now();
2813 let mut all_actions = Vec::new();
2814
2815 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2817
2818 for link_id in &link_ids {
2819 let link = match self.links.get_mut(link_id) {
2820 Some(l) => l,
2821 None => continue,
2822 };
2823
2824 let tick_actions = link.engine.tick(now);
2826 all_actions.extend(self.process_link_actions(link_id, &tick_actions));
2827
2828 let link = match self.links.get_mut(link_id) {
2830 Some(l) => l,
2831 None => continue,
2832 };
2833 if link.engine.needs_keepalive(now) {
2834 let flags = PacketFlags {
2836 header_type: constants::HEADER_1,
2837 context_flag: constants::FLAG_UNSET,
2838 transport_type: constants::TRANSPORT_BROADCAST,
2839 destination_type: constants::DESTINATION_LINK,
2840 packet_type: constants::PACKET_TYPE_DATA,
2841 };
2842 if let Ok((raw, _packet_hash)) = RawPacket::pack_raw_with_hash(
2843 flags,
2844 0,
2845 link_id,
2846 None,
2847 constants::CONTEXT_KEEPALIVE,
2848 &[],
2849 ) {
2850 all_actions.push(LinkManagerAction::SendPacket {
2851 raw,
2852 dest_type: constants::DESTINATION_LINK,
2853 attached_interface: None,
2854 });
2855 link.engine.record_outbound(now, true);
2856 }
2857 }
2858
2859 if let Some(channel) = link.channel.as_mut() {
2860 let chan_actions = channel.tick(now);
2861 let _ = channel;
2862 let _ = link;
2863 all_actions.extend(self.process_channel_actions(link_id, chan_actions, rng));
2864 }
2865 }
2866
2867 for link_id in &link_ids {
2869 let link = match self.links.get_mut(link_id) {
2870 Some(l) => l,
2871 None => continue,
2872 };
2873
2874 let mut sender_actions = Vec::new();
2876 for sender in &mut link.outgoing_resources {
2877 sender_actions.extend(sender.tick(now));
2878 }
2879
2880 let mut receiver_actions = Vec::new();
2882 for receiver in &mut link.incoming_resources {
2883 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2884 link.engine.decrypt(ciphertext).map_err(|_| ())
2885 };
2886 receiver_actions.extend(receiver.tick(now, &decrypt_fn, &Bzip2Compressor));
2887 }
2888
2889 link.outgoing_resources
2891 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2892 link.incoming_resources
2893 .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2894 let active_split_hashes: Vec<[u8; 32]> = link
2895 .outgoing_resources
2896 .iter()
2897 .filter(|s| s.flags.split)
2898 .map(|s| s.original_hash)
2899 .collect();
2900 link.outgoing_splits.retain(|original_hash, split| {
2901 split.completed_segments < split.total_segments
2902 && active_split_hashes.contains(original_hash)
2903 });
2904
2905 let _ = link;
2906 all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2907 all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2908 }
2909
2910 let closed: Vec<LinkId> = self
2912 .links
2913 .iter()
2914 .filter(|(_, l)| l.engine.state() == LinkState::Closed)
2915 .map(|(id, _)| *id)
2916 .collect();
2917 for id in closed {
2918 self.links.remove(&id);
2919 all_actions.push(LinkManagerAction::DeregisterLinkDest { link_id: id });
2920 }
2921
2922 all_actions
2923 }
2924
2925 pub fn is_link_destination(&self, dest_hash: &[u8; 16]) -> bool {
2927 self.links.contains_key(dest_hash) || self.link_destinations.contains_key(dest_hash)
2928 }
2929
2930 pub fn link_state(&self, link_id: &LinkId) -> Option<LinkState> {
2932 self.links.get(link_id).map(|l| l.engine.state())
2933 }
2934
2935 pub fn link_rtt(&self, link_id: &LinkId) -> Option<f64> {
2937 self.links.get(link_id).and_then(|l| l.engine.rtt())
2938 }
2939
2940 pub fn set_link_rtt(&mut self, link_id: &LinkId, rtt: f64) {
2942 if let Some(link) = self.links.get_mut(link_id) {
2943 link.engine.set_rtt(rtt);
2944 }
2945 }
2946
2947 pub fn record_link_inbound(&mut self, link_id: &LinkId) {
2949 if let Some(link) = self.links.get_mut(link_id) {
2950 link.engine.record_inbound(time::now());
2951 }
2952 }
2953
2954 pub fn set_link_mtu(&mut self, link_id: &LinkId, mtu: u32) {
2956 if let Some(link) = self.links.get_mut(link_id) {
2957 link.engine.set_mtu(mtu);
2958 }
2959 }
2960
2961 pub fn link_count(&self) -> usize {
2963 self.links.len()
2964 }
2965
2966 pub fn resource_transfer_count(&self) -> usize {
2968 self.links
2969 .values()
2970 .map(|managed| {
2971 managed
2972 .incoming_resources
2973 .iter()
2974 .filter(|resource| !resource.flags.split)
2975 .count()
2976 + managed.incoming_splits.len()
2977 + managed
2978 .outgoing_resources
2979 .iter()
2980 .filter(|resource| !resource.flags.split)
2981 .count()
2982 + managed.outgoing_splits.len()
2983 })
2984 .sum()
2985 }
2986
2987 pub fn cancel_all_resources(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2989 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2990 let mut all_actions = Vec::new();
2991
2992 for link_id in &link_ids {
2993 let link = match self.links.get_mut(link_id) {
2994 Some(l) => l,
2995 None => continue,
2996 };
2997
2998 let mut sender_actions = Vec::new();
2999 for sender in &mut link.outgoing_resources {
3000 sender_actions.extend(sender.cancel());
3001 }
3002
3003 let mut receiver_actions = Vec::new();
3004 for receiver in &mut link.incoming_resources {
3005 receiver_actions.extend(receiver.reject());
3006 }
3007
3008 link.outgoing_resources
3009 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
3010 link.incoming_resources
3011 .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
3012 link.outgoing_splits.clear();
3013 link.incoming_splits.clear();
3014
3015 let _ = link;
3016 all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
3017 all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
3018 }
3019
3020 all_actions
3021 }
3022
3023 pub fn link_entries(&self) -> Vec<crate::event::LinkInfoEntry> {
3025 self.links
3026 .iter()
3027 .map(|(link_id, managed)| {
3028 let state = match managed.engine.state() {
3029 LinkState::Pending => "pending",
3030 LinkState::Handshake => "handshake",
3031 LinkState::Active => "active",
3032 LinkState::Stale => "stale",
3033 LinkState::Closed => "closed",
3034 };
3035 crate::event::LinkInfoEntry {
3036 link_id: *link_id,
3037 state: state.to_string(),
3038 is_initiator: managed.engine.is_initiator(),
3039 dest_hash: managed.dest_hash,
3040 remote_identity: managed.remote_identity.as_ref().map(|(h, _)| *h),
3041 rtt: managed.engine.rtt(),
3042 channel_window: managed.channel.as_ref().map(|c| c.window()),
3043 channel_outstanding: managed.channel.as_ref().map(|c| c.outstanding()),
3044 pending_channel_packets: managed.pending_channel_packets.len(),
3045 channel_send_ok: managed.channel_send_ok,
3046 channel_send_not_ready: managed.channel_send_not_ready,
3047 channel_send_too_big: managed.channel_send_too_big,
3048 channel_send_other_error: managed.channel_send_other_error,
3049 channel_messages_received: managed.channel_messages_received,
3050 channel_proofs_sent: managed.channel_proofs_sent,
3051 channel_proofs_received: managed.channel_proofs_received,
3052 }
3053 })
3054 .collect()
3055 }
3056
3057 pub fn resource_entries(&self) -> Vec<crate::event::ResourceInfoEntry> {
3059 let mut entries = Vec::new();
3060 for (link_id, managed) in &self.links {
3061 let resource_sdu = Self::resource_sdu_for_link(managed);
3062 for split in managed.incoming_splits.values() {
3063 let (received, total) = Self::incoming_split_progress(split, resource_sdu);
3064 entries.push(crate::event::ResourceInfoEntry {
3065 link_id: *link_id,
3066 direction: "incoming".to_string(),
3067 total_parts: total,
3068 transferred_parts: received,
3069 complete: received >= total && total > 0,
3070 });
3071 }
3072 for recv in &managed.incoming_resources {
3073 if recv.flags.split {
3074 continue;
3075 }
3076 let (received, total) = recv.progress();
3077 entries.push(crate::event::ResourceInfoEntry {
3078 link_id: *link_id,
3079 direction: "incoming".to_string(),
3080 total_parts: total,
3081 transferred_parts: received,
3082 complete: received >= total && total > 0,
3083 });
3084 }
3085 for split in managed.outgoing_splits.values() {
3086 let (sent, total) = Self::outgoing_split_progress(split, resource_sdu);
3087 entries.push(crate::event::ResourceInfoEntry {
3088 link_id: *link_id,
3089 direction: "outgoing".to_string(),
3090 total_parts: total,
3091 transferred_parts: sent,
3092 complete: sent >= total && total > 0,
3093 });
3094 }
3095 for send in &managed.outgoing_resources {
3096 if send.flags.split {
3097 continue;
3098 }
3099 let total = send.total_parts();
3100 let sent = send.sent_parts;
3101 entries.push(crate::event::ResourceInfoEntry {
3102 link_id: *link_id,
3103 direction: "outgoing".to_string(),
3104 total_parts: total,
3105 transferred_parts: sent,
3106 complete: sent >= total && total > 0,
3107 });
3108 }
3109 }
3110 entries
3111 }
3112
3113 fn process_link_actions(
3115 &self,
3116 link_id: &LinkId,
3117 actions: &[LinkAction],
3118 ) -> Vec<LinkManagerAction> {
3119 let mut result = Vec::new();
3120 for action in actions {
3121 match action {
3122 LinkAction::StateChanged {
3123 new_state, reason, ..
3124 } => match new_state {
3125 LinkState::Closed => {
3126 result.push(LinkManagerAction::LinkClosed {
3127 link_id: *link_id,
3128 reason: *reason,
3129 });
3130 }
3131 _ => {}
3132 },
3133 LinkAction::LinkEstablished {
3134 rtt, is_initiator, ..
3135 } => {
3136 let dest_hash = self
3137 .links
3138 .get(link_id)
3139 .map(|l| l.dest_hash)
3140 .unwrap_or([0u8; 16]);
3141 result.push(LinkManagerAction::LinkEstablished {
3142 link_id: *link_id,
3143 dest_hash,
3144 rtt: *rtt,
3145 is_initiator: *is_initiator,
3146 });
3147 }
3148 LinkAction::RemoteIdentified {
3149 identity_hash,
3150 public_key,
3151 ..
3152 } => {
3153 result.push(LinkManagerAction::RemoteIdentified {
3154 link_id: *link_id,
3155 identity_hash: *identity_hash,
3156 public_key: *public_key,
3157 });
3158 }
3159 LinkAction::DataReceived { .. } => {
3160 }
3162 }
3163 }
3164 result
3165 }
3166
3167 fn process_channel_actions(
3169 &mut self,
3170 link_id: &LinkId,
3171 actions: Vec<rns_core::channel::ChannelAction>,
3172 rng: &mut dyn Rng,
3173 ) -> Vec<LinkManagerAction> {
3174 let mut result = Vec::new();
3175 for action in actions {
3176 match action {
3177 rns_core::channel::ChannelAction::SendOnLink { raw, sequence } => {
3178 let encrypted = match self.links.get(link_id) {
3180 Some(link) => match link.engine.encrypt(&raw, rng) {
3181 Ok(encrypted) => encrypted,
3182 Err(_) => continue,
3183 },
3184 None => continue,
3185 };
3186 let flags = PacketFlags {
3187 header_type: constants::HEADER_1,
3188 context_flag: constants::FLAG_UNSET,
3189 transport_type: constants::TRANSPORT_BROADCAST,
3190 destination_type: constants::DESTINATION_LINK,
3191 packet_type: constants::PACKET_TYPE_DATA,
3192 };
3193 if let Ok((raw_bytes, packet_hash)) = RawPacket::pack_raw_with_hash(
3194 flags,
3195 0,
3196 link_id,
3197 None,
3198 constants::CONTEXT_CHANNEL,
3199 &encrypted,
3200 ) {
3201 if let Some(link_mut) = self.links.get_mut(link_id) {
3202 link_mut
3203 .pending_channel_packets
3204 .insert(packet_hash, sequence);
3205 }
3206 result.push(LinkManagerAction::SendPacket {
3207 raw: raw_bytes,
3208 dest_type: constants::DESTINATION_LINK,
3209 attached_interface: None,
3210 });
3211 }
3212 }
3213 rns_core::channel::ChannelAction::MessageReceived {
3214 msgtype, payload, ..
3215 } => {
3216 result.push(LinkManagerAction::ChannelMessageReceived {
3217 link_id: *link_id,
3218 msgtype,
3219 payload,
3220 });
3221 }
3222 rns_core::channel::ChannelAction::TeardownLink => {
3223 result.push(LinkManagerAction::LinkClosed {
3224 link_id: *link_id,
3225 reason: Some(TeardownReason::Timeout),
3226 });
3227 }
3228 }
3229 }
3230 result
3231 }
3232}
3233
3234impl Default for LinkManager {
3235 fn default() -> Self {
3236 Self::new()
3237 }
3238}
3239
3240fn compute_path_hash(path: &str) -> [u8; 16] {
3243 let full = rns_core::hash::full_hash(path.as_bytes());
3244 let mut result = [0u8; 16];
3245 result.copy_from_slice(&full[..16]);
3246 result
3247}
3248
3249#[cfg(test)]
3250mod tests {
3251 use super::*;
3252 use rns_crypto::identity::Identity;
3253 use rns_crypto::{FixedRng, OsRng};
3254
3255 fn make_rng(seed: u8) -> FixedRng {
3256 FixedRng::new(&[seed; 128])
3257 }
3258
3259 fn make_dest_keys(rng: &mut dyn Rng) -> (Ed25519PrivateKey, [u8; 32]) {
3260 let sig_prv = Ed25519PrivateKey::generate(rng);
3261 let sig_pub_bytes = sig_prv.public_key().public_bytes();
3262 (sig_prv, sig_pub_bytes)
3263 }
3264
3265 #[test]
3266 fn test_register_link_destination() {
3267 let mut mgr = LinkManager::new();
3268 let mut rng = make_rng(0x01);
3269 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3270 let dest_hash = [0xDD; 16];
3271
3272 mgr.register_link_destination(
3273 dest_hash,
3274 sig_prv,
3275 sig_pub_bytes,
3276 ResourceStrategy::AcceptNone,
3277 );
3278 assert!(mgr.is_link_destination(&dest_hash));
3279
3280 mgr.deregister_link_destination(&dest_hash);
3281 assert!(!mgr.is_link_destination(&dest_hash));
3282 }
3283
3284 #[test]
3285 fn test_create_link() {
3286 let mut mgr = LinkManager::new();
3287 let mut rng = OsRng;
3288 let dest_hash = [0xDD; 16];
3289
3290 let sig_pub_bytes = [0xAA; 32]; let (link_id, actions) = mgr.create_link(
3292 &dest_hash,
3293 &sig_pub_bytes,
3294 1,
3295 constants::MTU as u32,
3296 &mut rng,
3297 );
3298 assert_ne!(link_id, [0u8; 16]);
3299 assert_eq!(actions.len(), 2);
3301 assert!(matches!(
3302 actions[0],
3303 LinkManagerAction::RegisterLinkDest { .. }
3304 ));
3305 assert!(matches!(actions[1], LinkManagerAction::SendPacket { .. }));
3306
3307 assert_eq!(mgr.link_state(&link_id), Some(LinkState::Pending));
3309 }
3310
3311 #[test]
3312 fn test_full_handshake_via_manager() {
3313 let mut rng = OsRng;
3314 let dest_hash = [0xDD; 16];
3315
3316 let mut responder_mgr = LinkManager::new();
3318 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3319 responder_mgr.register_link_destination(
3320 dest_hash,
3321 sig_prv,
3322 sig_pub_bytes,
3323 ResourceStrategy::AcceptNone,
3324 );
3325
3326 let mut initiator_mgr = LinkManager::new();
3328
3329 let (link_id, init_actions) = initiator_mgr.create_link(
3331 &dest_hash,
3332 &sig_pub_bytes,
3333 1,
3334 constants::MTU as u32,
3335 &mut rng,
3336 );
3337 assert_eq!(init_actions.len(), 2);
3338
3339 let linkrequest_raw = match &init_actions[1] {
3341 LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3342 _ => panic!("Expected SendPacket"),
3343 };
3344
3345 let lr_packet = RawPacket::unpack(&linkrequest_raw).unwrap();
3347
3348 let resp_actions = responder_mgr.handle_local_delivery(
3350 lr_packet.destination_hash,
3351 &linkrequest_raw,
3352 lr_packet.packet_hash,
3353 rns_core::transport::types::InterfaceId(0),
3354 &mut rng,
3355 );
3356 assert!(resp_actions.len() >= 2);
3358 assert!(matches!(
3359 resp_actions[0],
3360 LinkManagerAction::RegisterLinkDest { .. }
3361 ));
3362
3363 let lrproof_raw = match &resp_actions[1] {
3365 LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3366 _ => panic!("Expected SendPacket for LRPROOF"),
3367 };
3368
3369 let lrproof_packet = RawPacket::unpack(&lrproof_raw).unwrap();
3371 let init_actions2 = initiator_mgr.handle_local_delivery(
3372 lrproof_packet.destination_hash,
3373 &lrproof_raw,
3374 lrproof_packet.packet_hash,
3375 rns_core::transport::types::InterfaceId(0),
3376 &mut rng,
3377 );
3378
3379 let has_established = init_actions2
3381 .iter()
3382 .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
3383 assert!(has_established, "Initiator should emit LinkEstablished");
3384
3385 let lrrtt_raw = init_actions2
3387 .iter()
3388 .find_map(|a| match a {
3389 LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3390 _ => None,
3391 })
3392 .expect("Should have LRRTT SendPacket");
3393
3394 let lrrtt_packet = RawPacket::unpack(&lrrtt_raw).unwrap();
3396 let resp_link_id = lrrtt_packet.destination_hash;
3397 let resp_actions2 = responder_mgr.handle_local_delivery(
3398 resp_link_id,
3399 &lrrtt_raw,
3400 lrrtt_packet.packet_hash,
3401 rns_core::transport::types::InterfaceId(0),
3402 &mut rng,
3403 );
3404
3405 let has_established = resp_actions2
3406 .iter()
3407 .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
3408 assert!(has_established, "Responder should emit LinkEstablished");
3409
3410 assert_eq!(initiator_mgr.link_state(&link_id), Some(LinkState::Active));
3412 assert_eq!(responder_mgr.link_state(&link_id), Some(LinkState::Active));
3413
3414 assert!(initiator_mgr.link_rtt(&link_id).is_some());
3416 assert!(responder_mgr.link_rtt(&link_id).is_some());
3417 }
3418
3419 #[test]
3420 fn test_encrypted_data_exchange() {
3421 let mut rng = OsRng;
3422 let dest_hash = [0xDD; 16];
3423 let mut resp_mgr = LinkManager::new();
3424 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3425 resp_mgr.register_link_destination(
3426 dest_hash,
3427 sig_prv,
3428 sig_pub_bytes,
3429 ResourceStrategy::AcceptNone,
3430 );
3431 let mut init_mgr = LinkManager::new();
3432
3433 let (link_id, init_actions) = init_mgr.create_link(
3435 &dest_hash,
3436 &sig_pub_bytes,
3437 1,
3438 constants::MTU as u32,
3439 &mut rng,
3440 );
3441 let lr_raw = extract_send_packet(&init_actions);
3442 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3443 let resp_actions = resp_mgr.handle_local_delivery(
3444 lr_pkt.destination_hash,
3445 &lr_raw,
3446 lr_pkt.packet_hash,
3447 rns_core::transport::types::InterfaceId(0),
3448 &mut rng,
3449 );
3450 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3451 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3452 let init_actions2 = init_mgr.handle_local_delivery(
3453 lrproof_pkt.destination_hash,
3454 &lrproof_raw,
3455 lrproof_pkt.packet_hash,
3456 rns_core::transport::types::InterfaceId(0),
3457 &mut rng,
3458 );
3459 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3460 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3461 resp_mgr.handle_local_delivery(
3462 lrrtt_pkt.destination_hash,
3463 &lrrtt_raw,
3464 lrrtt_pkt.packet_hash,
3465 rns_core::transport::types::InterfaceId(0),
3466 &mut rng,
3467 );
3468
3469 let actions =
3471 init_mgr.send_on_link(&link_id, b"hello link!", constants::CONTEXT_NONE, &mut rng);
3472 assert_eq!(actions.len(), 1);
3473 assert!(matches!(actions[0], LinkManagerAction::SendPacket { .. }));
3474 }
3475
3476 #[test]
3477 fn test_request_response() {
3478 let mut rng = OsRng;
3479 let dest_hash = [0xDD; 16];
3480 let mut resp_mgr = LinkManager::new();
3481 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3482 resp_mgr.register_link_destination(
3483 dest_hash,
3484 sig_prv,
3485 sig_pub_bytes,
3486 ResourceStrategy::AcceptNone,
3487 );
3488
3489 resp_mgr.register_request_handler("/status", None, |_link_id, _path, _data, _remote| {
3491 Some(b"OK".to_vec())
3492 });
3493
3494 let mut init_mgr = LinkManager::new();
3495
3496 let (link_id, init_actions) = init_mgr.create_link(
3498 &dest_hash,
3499 &sig_pub_bytes,
3500 1,
3501 constants::MTU as u32,
3502 &mut rng,
3503 );
3504 let lr_raw = extract_send_packet(&init_actions);
3505 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3506 let resp_actions = resp_mgr.handle_local_delivery(
3507 lr_pkt.destination_hash,
3508 &lr_raw,
3509 lr_pkt.packet_hash,
3510 rns_core::transport::types::InterfaceId(0),
3511 &mut rng,
3512 );
3513 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3514 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3515 let init_actions2 = init_mgr.handle_local_delivery(
3516 lrproof_pkt.destination_hash,
3517 &lrproof_raw,
3518 lrproof_pkt.packet_hash,
3519 rns_core::transport::types::InterfaceId(0),
3520 &mut rng,
3521 );
3522 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3523 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3524 resp_mgr.handle_local_delivery(
3525 lrrtt_pkt.destination_hash,
3526 &lrrtt_raw,
3527 lrrtt_pkt.packet_hash,
3528 rns_core::transport::types::InterfaceId(0),
3529 &mut rng,
3530 );
3531
3532 let req_actions = init_mgr.send_request(&link_id, "/status", b"query", &mut rng);
3534 assert_eq!(req_actions.len(), 1);
3535
3536 let req_raw = extract_send_packet_from(&req_actions);
3538 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3539 let resp_actions = resp_mgr.handle_local_delivery(
3540 req_pkt.destination_hash,
3541 &req_raw,
3542 req_pkt.packet_hash,
3543 rns_core::transport::types::InterfaceId(0),
3544 &mut rng,
3545 );
3546
3547 let has_response = resp_actions
3549 .iter()
3550 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3551 assert!(has_response, "Handler should produce a response packet");
3552 }
3553
3554 #[test]
3555 fn test_send_request_wraps_invalid_msgpack_data_as_bin() {
3556 use std::sync::{Arc, Mutex};
3557
3558 let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
3559 let mut rng = OsRng;
3560
3561 let invalid = vec![0xC1];
3562 let expected = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(invalid.clone()));
3563 let captured = Arc::new(Mutex::new(None::<Vec<u8>>));
3564 let captured_for_handler = Arc::clone(&captured);
3565
3566 resp_mgr.register_request_handler("/bin", None, move |_link_id, _path, data, _remote| {
3567 *captured_for_handler.lock().unwrap() = Some(data.to_vec());
3568 Some(rns_core::msgpack::pack(&rns_core::msgpack::Value::Bool(
3569 true,
3570 )))
3571 });
3572
3573 let req_actions = init_mgr.send_request(&link_id, "/bin", &invalid, &mut rng);
3574 let req_raw = extract_send_packet_from(&req_actions);
3575 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3576 let resp_actions = resp_mgr.handle_local_delivery(
3577 req_pkt.destination_hash,
3578 &req_raw,
3579 req_pkt.packet_hash,
3580 rns_core::transport::types::InterfaceId(0),
3581 &mut rng,
3582 );
3583
3584 assert!(
3585 resp_actions
3586 .iter()
3587 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. })),
3588 "handler should still produce a response"
3589 );
3590 assert_eq!(*captured.lock().unwrap(), Some(expected));
3591 }
3592
3593 #[test]
3594 fn test_invalid_response_bytes_are_returned_as_msgpack_bin() {
3595 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3596 let mut rng = OsRng;
3597 let invalid_response = vec![0xC1];
3598 let expected =
3599 rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(invalid_response.clone()));
3600
3601 resp_mgr.register_request_handler("/invalid-response", None, {
3602 let invalid_response = invalid_response.clone();
3603 move |_link_id, _path, _data, _remote| Some(invalid_response.clone())
3604 });
3605
3606 let req_actions = init_mgr.send_request(&link_id, "/invalid-response", b"\xc0", &mut rng);
3607 let req_raw = extract_send_packet_from(&req_actions);
3608 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3609 let resp_actions = resp_mgr.handle_local_delivery(
3610 req_pkt.destination_hash,
3611 &req_raw,
3612 req_pkt.packet_hash,
3613 rns_core::transport::types::InterfaceId(0),
3614 &mut rng,
3615 );
3616
3617 let resp_raw = extract_any_send_packet(&resp_actions);
3618 let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
3619 let init_actions = init_mgr.handle_local_delivery(
3620 resp_pkt.destination_hash,
3621 &resp_raw,
3622 resp_pkt.packet_hash,
3623 rns_core::transport::types::InterfaceId(0),
3624 &mut rng,
3625 );
3626
3627 let response_data = init_actions
3628 .iter()
3629 .find_map(|action| match action {
3630 LinkManagerAction::ResponseReceived { data, .. } => Some(data.clone()),
3631 _ => None,
3632 })
3633 .expect("initiator should receive a response");
3634 assert_eq!(response_data, expected);
3635 }
3636
3637 #[test]
3638 fn test_request_acl_deny_unidentified() {
3639 let mut rng = OsRng;
3640 let dest_hash = [0xDD; 16];
3641 let mut resp_mgr = LinkManager::new();
3642 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3643 resp_mgr.register_link_destination(
3644 dest_hash,
3645 sig_prv,
3646 sig_pub_bytes,
3647 ResourceStrategy::AcceptNone,
3648 );
3649
3650 resp_mgr.register_request_handler(
3652 "/restricted",
3653 Some(vec![[0xAA; 16]]),
3654 |_link_id, _path, _data, _remote| Some(b"secret".to_vec()),
3655 );
3656
3657 let mut init_mgr = LinkManager::new();
3658
3659 let (link_id, init_actions) = init_mgr.create_link(
3661 &dest_hash,
3662 &sig_pub_bytes,
3663 1,
3664 constants::MTU as u32,
3665 &mut rng,
3666 );
3667 let lr_raw = extract_send_packet(&init_actions);
3668 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3669 let resp_actions = resp_mgr.handle_local_delivery(
3670 lr_pkt.destination_hash,
3671 &lr_raw,
3672 lr_pkt.packet_hash,
3673 rns_core::transport::types::InterfaceId(0),
3674 &mut rng,
3675 );
3676 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3677 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3678 let init_actions2 = init_mgr.handle_local_delivery(
3679 lrproof_pkt.destination_hash,
3680 &lrproof_raw,
3681 lrproof_pkt.packet_hash,
3682 rns_core::transport::types::InterfaceId(0),
3683 &mut rng,
3684 );
3685 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3686 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3687 resp_mgr.handle_local_delivery(
3688 lrrtt_pkt.destination_hash,
3689 &lrrtt_raw,
3690 lrrtt_pkt.packet_hash,
3691 rns_core::transport::types::InterfaceId(0),
3692 &mut rng,
3693 );
3694
3695 let req_actions = init_mgr.send_request(&link_id, "/restricted", b"query", &mut rng);
3697 let req_raw = extract_send_packet_from(&req_actions);
3698 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3699 let resp_actions = resp_mgr.handle_local_delivery(
3700 req_pkt.destination_hash,
3701 &req_raw,
3702 req_pkt.packet_hash,
3703 rns_core::transport::types::InterfaceId(0),
3704 &mut rng,
3705 );
3706
3707 let has_response = resp_actions
3709 .iter()
3710 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3711 assert!(!has_response, "Unidentified peer should be denied");
3712 }
3713
3714 #[test]
3715 fn test_teardown_link() {
3716 let mut rng = OsRng;
3717 let dest_hash = [0xDD; 16];
3718 let mut mgr = LinkManager::new();
3719
3720 let dummy_sig = [0xAA; 32];
3721 let (link_id, _) =
3722 mgr.create_link(&dest_hash, &dummy_sig, 1, constants::MTU as u32, &mut rng);
3723 assert_eq!(mgr.link_count(), 1);
3724
3725 let actions = mgr.teardown_link(&link_id);
3726 let has_close = actions
3727 .iter()
3728 .any(|a| matches!(a, LinkManagerAction::LinkClosed { .. }));
3729 assert!(has_close);
3730
3731 let tick_actions = mgr.tick(&mut rng);
3733 let has_deregister = tick_actions
3734 .iter()
3735 .any(|a| matches!(a, LinkManagerAction::DeregisterLinkDest { .. }));
3736 assert!(has_deregister);
3737 assert_eq!(mgr.link_count(), 0);
3738 }
3739
3740 #[test]
3741 fn test_identify_on_link() {
3742 let mut rng = OsRng;
3743 let dest_hash = [0xDD; 16];
3744 let mut resp_mgr = LinkManager::new();
3745 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3746 resp_mgr.register_link_destination(
3747 dest_hash,
3748 sig_prv,
3749 sig_pub_bytes,
3750 ResourceStrategy::AcceptNone,
3751 );
3752 let mut init_mgr = LinkManager::new();
3753
3754 let (link_id, init_actions) = init_mgr.create_link(
3756 &dest_hash,
3757 &sig_pub_bytes,
3758 1,
3759 constants::MTU as u32,
3760 &mut rng,
3761 );
3762 let lr_raw = extract_send_packet(&init_actions);
3763 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3764 let resp_actions = resp_mgr.handle_local_delivery(
3765 lr_pkt.destination_hash,
3766 &lr_raw,
3767 lr_pkt.packet_hash,
3768 rns_core::transport::types::InterfaceId(0),
3769 &mut rng,
3770 );
3771 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3772 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3773 let init_actions2 = init_mgr.handle_local_delivery(
3774 lrproof_pkt.destination_hash,
3775 &lrproof_raw,
3776 lrproof_pkt.packet_hash,
3777 rns_core::transport::types::InterfaceId(0),
3778 &mut rng,
3779 );
3780 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3781 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3782 resp_mgr.handle_local_delivery(
3783 lrrtt_pkt.destination_hash,
3784 &lrrtt_raw,
3785 lrrtt_pkt.packet_hash,
3786 rns_core::transport::types::InterfaceId(0),
3787 &mut rng,
3788 );
3789
3790 let identity = Identity::new(&mut rng);
3792 let id_actions = init_mgr.identify(&link_id, &identity, &mut rng);
3793 assert_eq!(id_actions.len(), 1);
3794
3795 let id_raw = extract_send_packet_from(&id_actions);
3797 let id_pkt = RawPacket::unpack(&id_raw).unwrap();
3798 let resp_actions = resp_mgr.handle_local_delivery(
3799 id_pkt.destination_hash,
3800 &id_raw,
3801 id_pkt.packet_hash,
3802 rns_core::transport::types::InterfaceId(0),
3803 &mut rng,
3804 );
3805
3806 let has_identified = resp_actions
3807 .iter()
3808 .any(|a| matches!(a, LinkManagerAction::RemoteIdentified { .. }));
3809 assert!(has_identified, "Responder should emit RemoteIdentified");
3810 }
3811
3812 #[test]
3813 fn test_path_hash_computation() {
3814 let h1 = compute_path_hash("/status");
3815 let h2 = compute_path_hash("/path");
3816 assert_ne!(h1, h2);
3817
3818 assert_eq!(h1, compute_path_hash("/status"));
3820 }
3821
3822 #[test]
3823 fn test_link_count() {
3824 let mut mgr = LinkManager::new();
3825 let mut rng = OsRng;
3826
3827 assert_eq!(mgr.link_count(), 0);
3828
3829 let dummy_sig = [0xAA; 32];
3830 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3831 assert_eq!(mgr.link_count(), 1);
3832
3833 mgr.create_link(&[0x22; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3834 assert_eq!(mgr.link_count(), 2);
3835 }
3836
3837 fn extract_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3840 extract_send_packet_at(actions, actions.len() - 1)
3841 }
3842
3843 fn extract_send_packet_at(actions: &[LinkManagerAction], idx: usize) -> Vec<u8> {
3844 match &actions[idx] {
3845 LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3846 other => panic!("Expected SendPacket at index {}, got {:?}", idx, other),
3847 }
3848 }
3849
3850 fn extract_any_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3851 actions
3852 .iter()
3853 .find_map(|a| match a {
3854 LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3855 _ => None,
3856 })
3857 .expect("Expected at least one SendPacket action")
3858 }
3859
3860 fn extract_send_packet_from(actions: &[LinkManagerAction]) -> Vec<u8> {
3861 extract_any_send_packet(actions)
3862 }
3863
3864 fn setup_active_link() -> (LinkManager, LinkManager, LinkId) {
3867 let mut rng = OsRng;
3868 let dest_hash = [0xDD; 16];
3869 let mut resp_mgr = LinkManager::new();
3870 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3871 resp_mgr.register_link_destination(
3872 dest_hash,
3873 sig_prv,
3874 sig_pub_bytes,
3875 ResourceStrategy::AcceptNone,
3876 );
3877 let mut init_mgr = LinkManager::new();
3878
3879 let (link_id, init_actions) = init_mgr.create_link(
3880 &dest_hash,
3881 &sig_pub_bytes,
3882 1,
3883 constants::MTU as u32,
3884 &mut rng,
3885 );
3886 let lr_raw = extract_send_packet(&init_actions);
3887 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3888 let resp_actions = resp_mgr.handle_local_delivery(
3889 lr_pkt.destination_hash,
3890 &lr_raw,
3891 lr_pkt.packet_hash,
3892 rns_core::transport::types::InterfaceId(0),
3893 &mut rng,
3894 );
3895 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3896 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3897 let init_actions2 = init_mgr.handle_local_delivery(
3898 lrproof_pkt.destination_hash,
3899 &lrproof_raw,
3900 lrproof_pkt.packet_hash,
3901 rns_core::transport::types::InterfaceId(0),
3902 &mut rng,
3903 );
3904 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3905 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3906 resp_mgr.handle_local_delivery(
3907 lrrtt_pkt.destination_hash,
3908 &lrrtt_raw,
3909 lrrtt_pkt.packet_hash,
3910 rns_core::transport::types::InterfaceId(0),
3911 &mut rng,
3912 );
3913
3914 assert_eq!(init_mgr.link_state(&link_id), Some(LinkState::Active));
3915 assert_eq!(resp_mgr.link_state(&link_id), Some(LinkState::Active));
3916
3917 (init_mgr, resp_mgr, link_id)
3918 }
3919
3920 #[test]
3925 fn test_resource_strategy_default() {
3926 let mut mgr = LinkManager::new();
3927 let mut rng = OsRng;
3928 let dummy_sig = [0xAA; 32];
3929 let (link_id, _) =
3930 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3931
3932 let link = mgr.links.get(&link_id).unwrap();
3934 assert_eq!(link.resource_strategy, ResourceStrategy::AcceptNone);
3935 }
3936
3937 #[test]
3938 fn test_set_resource_strategy() {
3939 let mut mgr = LinkManager::new();
3940 let mut rng = OsRng;
3941 let dummy_sig = [0xAA; 32];
3942 let (link_id, _) =
3943 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3944
3945 mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3946 assert_eq!(
3947 mgr.links.get(&link_id).unwrap().resource_strategy,
3948 ResourceStrategy::AcceptAll
3949 );
3950
3951 mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3952 assert_eq!(
3953 mgr.links.get(&link_id).unwrap().resource_strategy,
3954 ResourceStrategy::AcceptApp
3955 );
3956 }
3957
3958 #[test]
3959 fn test_send_resource_on_active_link() {
3960 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3961 let mut rng = OsRng;
3962
3963 let data = vec![0xAB; 100]; let actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3966
3967 let has_send = actions
3969 .iter()
3970 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3971 assert!(
3972 has_send,
3973 "send_resource should emit advertisement SendPacket"
3974 );
3975 }
3976
3977 fn first_resource_advertisement(
3978 mgr: &LinkManager,
3979 link_id: &LinkId,
3980 actions: &[LinkManagerAction],
3981 ) -> rns_core::resource::ResourceAdvertisement {
3982 let adv_raw = actions
3983 .iter()
3984 .find_map(|action| match action {
3985 LinkManagerAction::SendPacket { raw, .. } => {
3986 let pkt = RawPacket::unpack(raw).ok()?;
3987 (pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw)
3988 }
3989 _ => None,
3990 })
3991 .expect("sender should emit a resource advertisement");
3992 let adv_pkt = RawPacket::unpack(adv_raw).unwrap();
3993 let plaintext = mgr
3994 .links
3995 .get(link_id)
3996 .unwrap()
3997 .engine
3998 .decrypt(&adv_pkt.data)
3999 .unwrap();
4000 rns_core::resource::ResourceAdvertisement::unpack(&plaintext).unwrap()
4001 }
4002
4003 fn deterministic_bytes(len: usize) -> Vec<u8> {
4004 let mut state = 0x1234_5678u32;
4005 (0..len)
4006 .map(|_| {
4007 state = state.wrapping_mul(1664525).wrapping_add(1013904223);
4008 (state >> 16) as u8
4009 })
4010 .collect()
4011 }
4012
4013 fn drive_link_manager_packets(
4014 init_mgr: &mut LinkManager,
4015 resp_mgr: &mut LinkManager,
4016 initial_actions: Vec<LinkManagerAction>,
4017 initial_source: char,
4018 rng: &mut dyn Rng,
4019 max_rounds: usize,
4020 ) -> (
4021 Option<Vec<u8>>,
4022 bool,
4023 Vec<(char, usize, usize)>,
4024 Vec<(char, String)>,
4025 usize,
4026 ) {
4027 let mut pending: Vec<(char, LinkManagerAction)> = initial_actions
4028 .into_iter()
4029 .map(|a| (initial_source, a))
4030 .collect();
4031 let mut rounds = 0;
4032 let mut received_data = None;
4033 let mut sender_completed = false;
4034 let mut progress = Vec::new();
4035 let mut failures = Vec::new();
4036
4037 while !pending.is_empty() && rounds < max_rounds {
4038 rounds += 1;
4039 let mut next = Vec::new();
4040 for (source, action) in pending.drain(..) {
4041 let LinkManagerAction::SendPacket { raw, .. } = action else {
4042 continue;
4043 };
4044 let pkt = RawPacket::unpack(&raw).unwrap();
4045 let target_actions = if source == 'i' {
4046 resp_mgr.handle_local_delivery(
4047 pkt.destination_hash,
4048 &raw,
4049 pkt.packet_hash,
4050 rns_core::transport::types::InterfaceId(0),
4051 rng,
4052 )
4053 } else {
4054 init_mgr.handle_local_delivery(
4055 pkt.destination_hash,
4056 &raw,
4057 pkt.packet_hash,
4058 rns_core::transport::types::InterfaceId(0),
4059 rng,
4060 )
4061 };
4062 let target_source = if source == 'i' { 'r' } else { 'i' };
4063 for action in &target_actions {
4064 match action {
4065 LinkManagerAction::ResourceReceived { data, .. } => {
4066 received_data = Some(data.clone());
4067 }
4068 LinkManagerAction::ResourceCompleted { .. } => {
4069 sender_completed = true;
4070 }
4071 LinkManagerAction::ResourceProgress {
4072 received, total, ..
4073 } => {
4074 progress.push((target_source, *received, *total));
4075 }
4076 LinkManagerAction::ResourceFailed { error, .. } => {
4077 failures.push((target_source, error.clone()));
4078 }
4079 _ => {}
4080 }
4081 }
4082 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4083 }
4084 pending = next;
4085 }
4086
4087 (received_data, sender_completed, progress, failures, rounds)
4088 }
4089
4090 #[test]
4091 fn test_send_resource_auto_compress_option_controls_adv_flag() {
4092 let data = vec![0x41; 2048];
4093
4094 let (mut compressed_mgr, _resp_mgr, link_id) = setup_active_link();
4095 let mut rng = OsRng;
4096 let actions =
4097 compressed_mgr.send_resource_with_auto_compress(&link_id, &data, None, true, &mut rng);
4098 let adv = first_resource_advertisement(&compressed_mgr, &link_id, &actions);
4099 assert!(
4100 adv.flags.compressed,
4101 "compressible resource should compress"
4102 );
4103
4104 let (mut plain_mgr, _resp_mgr, link_id) = setup_active_link();
4105 let mut rng = OsRng;
4106 let actions =
4107 plain_mgr.send_resource_with_auto_compress(&link_id, &data, None, false, &mut rng);
4108 let adv = first_resource_advertisement(&plain_mgr, &link_id, &actions);
4109 assert!(
4110 !adv.flags.compressed,
4111 "auto_compress=false should keep resource uncompressed"
4112 );
4113 }
4114
4115 #[test]
4116 fn test_send_resource_on_inactive_link() {
4117 let mut mgr = LinkManager::new();
4118 let mut rng = OsRng;
4119 let dummy_sig = [0xAA; 32];
4120 let (link_id, _) =
4121 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
4122
4123 let actions = mgr.send_resource(&link_id, b"data", None, &mut rng);
4125 assert!(actions.is_empty(), "Cannot send resource on inactive link");
4126 }
4127
4128 #[test]
4129 fn test_send_resource_without_session_key_uses_encrypt_fallback_path() {
4130 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4131 let mut rng = OsRng;
4132 init_mgr
4133 .links
4134 .get_mut(&link_id)
4135 .unwrap()
4136 .engine
4137 .clear_session_for_testing();
4138
4139 let actions = init_mgr.send_resource(&link_id, b"data", None, &mut rng);
4140
4141 assert!(
4142 actions.is_empty(),
4143 "without a session key, no advertisement should be emitted"
4144 );
4145 assert_eq!(
4146 init_mgr
4147 .links
4148 .get(&link_id)
4149 .map(|managed| managed.outgoing_resources.len()),
4150 Some(1)
4151 );
4152 }
4153
4154 #[test]
4155 fn test_resource_adv_rejected_by_accept_none() {
4156 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4157 let mut rng = OsRng;
4158
4159 let data = vec![0xCD; 100];
4162 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4163
4164 for action in &adv_actions {
4166 if let LinkManagerAction::SendPacket { raw, .. } = action {
4167 let pkt = RawPacket::unpack(raw).unwrap();
4168 let resp_actions = resp_mgr.handle_local_delivery(
4169 pkt.destination_hash,
4170 raw,
4171 pkt.packet_hash,
4172 rns_core::transport::types::InterfaceId(0),
4173 &mut rng,
4174 );
4175 let has_resource_received = resp_actions
4177 .iter()
4178 .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
4179 assert!(
4180 !has_resource_received,
4181 "AcceptNone should not accept resource"
4182 );
4183 }
4184 }
4185 }
4186
4187 #[test]
4188 fn test_resource_adv_accepted_by_accept_all() {
4189 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4190 let mut rng = OsRng;
4191
4192 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4194
4195 let data = vec![0xCD; 100];
4197 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4198
4199 for action in &adv_actions {
4201 if let LinkManagerAction::SendPacket { raw, .. } = action {
4202 let pkt = RawPacket::unpack(raw).unwrap();
4203 let resp_actions = resp_mgr.handle_local_delivery(
4204 pkt.destination_hash,
4205 raw,
4206 pkt.packet_hash,
4207 rns_core::transport::types::InterfaceId(0),
4208 &mut rng,
4209 );
4210 let has_send = resp_actions
4212 .iter()
4213 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
4214 assert!(has_send, "AcceptAll should accept and request parts");
4215 }
4216 }
4217 }
4218
4219 #[test]
4220 fn test_resource_accept_app_query() {
4221 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4222 let mut rng = OsRng;
4223
4224 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4226
4227 let data = vec![0xCD; 100];
4229 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4230
4231 let mut got_query = false;
4233 for action in &adv_actions {
4234 if let LinkManagerAction::SendPacket { raw, .. } = action {
4235 let pkt = RawPacket::unpack(raw).unwrap();
4236 let resp_actions = resp_mgr.handle_local_delivery(
4237 pkt.destination_hash,
4238 raw,
4239 pkt.packet_hash,
4240 rns_core::transport::types::InterfaceId(0),
4241 &mut rng,
4242 );
4243 for a in &resp_actions {
4244 if matches!(a, LinkManagerAction::ResourceAcceptQuery { .. }) {
4245 got_query = true;
4246 }
4247 }
4248 }
4249 }
4250 assert!(got_query, "AcceptApp should emit ResourceAcceptQuery");
4251 }
4252
4253 #[test]
4254 fn test_resource_accept_app_accept() {
4255 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4256 let mut rng = OsRng;
4257
4258 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4259
4260 let data = vec![0xCD; 100];
4261 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4262
4263 for action in &adv_actions {
4264 if let LinkManagerAction::SendPacket { raw, .. } = action {
4265 let pkt = RawPacket::unpack(raw).unwrap();
4266 let resp_actions = resp_mgr.handle_local_delivery(
4267 pkt.destination_hash,
4268 raw,
4269 pkt.packet_hash,
4270 rns_core::transport::types::InterfaceId(0),
4271 &mut rng,
4272 );
4273 for a in &resp_actions {
4274 if let LinkManagerAction::ResourceAcceptQuery {
4275 link_id: lid,
4276 resource_hash,
4277 ..
4278 } = a
4279 {
4280 let accept_actions =
4282 resp_mgr.accept_resource(lid, resource_hash, true, &mut rng);
4283 let has_send = accept_actions
4285 .iter()
4286 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
4287 assert!(
4288 has_send,
4289 "Accepting resource should produce request for parts"
4290 );
4291 }
4292 }
4293 }
4294 }
4295 }
4296
4297 #[test]
4298 fn test_resource_accept_app_reject() {
4299 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4300 let mut rng = OsRng;
4301
4302 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4303
4304 let data = vec![0xCD; 100];
4305 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4306
4307 for action in &adv_actions {
4308 if let LinkManagerAction::SendPacket { raw, .. } = action {
4309 let pkt = RawPacket::unpack(raw).unwrap();
4310 let resp_actions = resp_mgr.handle_local_delivery(
4311 pkt.destination_hash,
4312 raw,
4313 pkt.packet_hash,
4314 rns_core::transport::types::InterfaceId(0),
4315 &mut rng,
4316 );
4317 for a in &resp_actions {
4318 if let LinkManagerAction::ResourceAcceptQuery {
4319 link_id: lid,
4320 resource_hash,
4321 ..
4322 } = a
4323 {
4324 let reject_actions =
4326 resp_mgr.accept_resource(lid, resource_hash, false, &mut rng);
4327 let has_resource_received = reject_actions
4330 .iter()
4331 .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
4332 assert!(!has_resource_received);
4333 }
4334 }
4335 }
4336 }
4337 }
4338
4339 #[test]
4340 fn test_resource_full_transfer() {
4341 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4342 let mut rng = OsRng;
4343
4344 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4346
4347 let original_data = b"Hello, Resource Transfer!".to_vec();
4349 let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
4350
4351 let mut pending: Vec<(char, LinkManagerAction)> =
4354 adv_actions.into_iter().map(|a| ('i', a)).collect();
4355 let mut rounds = 0;
4356 let max_rounds = 50;
4357 let mut resource_received = false;
4358 let mut sender_completed = false;
4359
4360 while !pending.is_empty() && rounds < max_rounds {
4361 rounds += 1;
4362 let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
4363
4364 for (source, action) in pending.drain(..) {
4365 if let LinkManagerAction::SendPacket { raw, .. } = action {
4366 let pkt = RawPacket::unpack(&raw).unwrap();
4367
4368 let target_actions = if source == 'i' {
4370 resp_mgr.handle_local_delivery(
4371 pkt.destination_hash,
4372 &raw,
4373 pkt.packet_hash,
4374 rns_core::transport::types::InterfaceId(0),
4375 &mut rng,
4376 )
4377 } else {
4378 init_mgr.handle_local_delivery(
4379 pkt.destination_hash,
4380 &raw,
4381 pkt.packet_hash,
4382 rns_core::transport::types::InterfaceId(0),
4383 &mut rng,
4384 )
4385 };
4386
4387 let target_source = if source == 'i' { 'r' } else { 'i' };
4388 for a in &target_actions {
4389 match a {
4390 LinkManagerAction::ResourceReceived { data, .. } => {
4391 assert_eq!(*data, original_data);
4392 resource_received = true;
4393 }
4394 LinkManagerAction::ResourceCompleted { .. } => {
4395 sender_completed = true;
4396 }
4397 _ => {}
4398 }
4399 }
4400 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4401 }
4402 }
4403 pending = next;
4404 }
4405
4406 assert!(
4407 resource_received,
4408 "Responder should receive resource data (rounds={})",
4409 rounds
4410 );
4411 assert!(
4412 sender_completed,
4413 "Sender should get completion proof (rounds={})",
4414 rounds
4415 );
4416 }
4417
4418 #[test]
4419 fn test_split_resource_advertisement_and_progress_entries() {
4420 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4421 let mut rng = OsRng;
4422 let data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 1024);
4423
4424 let actions =
4425 init_mgr.send_resource_with_auto_compress(&link_id, &data, None, false, &mut rng);
4426 let adv = first_resource_advertisement(&init_mgr, &link_id, &actions);
4427
4428 assert!(adv.flags.split);
4429 assert_eq!(adv.segment_index, 1);
4430 assert_eq!(adv.total_segments, 2);
4431 assert_eq!(adv.data_size, data.len() as u64);
4432
4433 let managed = init_mgr.links.get(&link_id).unwrap();
4434 assert_eq!(managed.outgoing_splits.len(), 1);
4435 assert_eq!(
4436 managed
4437 .outgoing_resources
4438 .iter()
4439 .filter(|sender| sender.flags.split)
4440 .count(),
4441 2
4442 );
4443
4444 let entries = init_mgr.resource_entries();
4445 assert_eq!(entries.len(), 1);
4446 assert_eq!(entries[0].direction, "outgoing");
4447 assert!(entries[0].total_parts > managed.outgoing_resources[0].total_parts());
4448 assert_eq!(entries[0].transferred_parts, 0);
4449 }
4450
4451 #[test]
4452 fn test_split_resource_full_transfer_and_monotonic_progress() {
4453 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4454 let mut rng = OsRng;
4455 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4456
4457 let original_data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 2048);
4458 let initial_actions = init_mgr.send_resource_with_auto_compress(
4459 &link_id,
4460 &original_data,
4461 None,
4462 false,
4463 &mut rng,
4464 );
4465
4466 let (received_data, sender_completed, progress, failures, rounds) =
4467 drive_link_manager_packets(
4468 &mut init_mgr,
4469 &mut resp_mgr,
4470 initial_actions,
4471 'i',
4472 &mut rng,
4473 10_000,
4474 );
4475
4476 assert!(
4477 received_data.as_ref().is_some_and(|data| data == &original_data),
4478 "split transfer did not deliver payload in {rounds} rounds; sender_completed={sender_completed}; failures={failures:?}; last_progress={:?}; init_entries={:?}; resp_entries={:?}",
4479 progress.last(),
4480 init_mgr.resource_entries(),
4481 resp_mgr.resource_entries()
4482 );
4483 assert!(
4484 sender_completed,
4485 "sender did not complete in {rounds} rounds"
4486 );
4487 assert!(
4488 progress
4489 .iter()
4490 .any(|(_, received, total)| received == total),
4491 "expected final progress update"
4492 );
4493
4494 let mut init_last = 0;
4495 let mut resp_last = 0;
4496 for (side, received, total) in progress {
4497 assert!(received <= total);
4498 match side {
4499 'i' => {
4500 assert!(
4501 received >= init_last,
4502 "initiator progress regressed from {init_last} to {received}"
4503 );
4504 init_last = received;
4505 }
4506 'r' => {
4507 assert!(
4508 received >= resp_last,
4509 "responder progress regressed from {resp_last} to {received}"
4510 );
4511 resp_last = received;
4512 }
4513 _ => unreachable!(),
4514 }
4515 }
4516 }
4517
4518 #[test]
4519 fn test_split_resource_accept_app_queries_only_first_segment() {
4520 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4521 let mut rng = OsRng;
4522 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
4523
4524 let original_data = deterministic_bytes(constants::RESOURCE_MAX_EFFICIENT_SIZE + 1024);
4525 let adv_actions = init_mgr.send_resource_with_auto_compress(
4526 &link_id,
4527 &original_data,
4528 None,
4529 false,
4530 &mut rng,
4531 );
4532 let adv_raw = extract_any_send_packet(&adv_actions);
4533 let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
4534 let query_actions = resp_mgr.handle_local_delivery(
4535 adv_pkt.destination_hash,
4536 &adv_raw,
4537 adv_pkt.packet_hash,
4538 rns_core::transport::types::InterfaceId(0),
4539 &mut rng,
4540 );
4541
4542 let queries: Vec<_> = query_actions
4543 .iter()
4544 .filter_map(|action| match action {
4545 LinkManagerAction::ResourceAcceptQuery { resource_hash, .. } => {
4546 Some(resource_hash.clone())
4547 }
4548 _ => None,
4549 })
4550 .collect();
4551 assert_eq!(queries.len(), 1);
4552
4553 let accept_actions = resp_mgr.accept_resource(&link_id, &queries[0], true, &mut rng);
4554 let (received_data, sender_completed, _progress, failures, rounds) =
4555 drive_link_manager_packets(
4556 &mut init_mgr,
4557 &mut resp_mgr,
4558 accept_actions,
4559 'r',
4560 &mut rng,
4561 10_000,
4562 );
4563
4564 assert!(
4565 failures.is_empty(),
4566 "split AcceptApp transfer failed: {failures:?}"
4567 );
4568 assert!(
4569 received_data
4570 .as_ref()
4571 .is_some_and(|data| data == &original_data),
4572 "split AcceptApp transfer did not deliver in {rounds} rounds"
4573 );
4574 assert!(sender_completed);
4575 }
4576
4577 #[test]
4578 fn test_resource_cancel_icl() {
4579 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4580 let mut rng = OsRng;
4581
4582 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
4583
4584 let data = vec![0xAB; 2000];
4586 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
4587
4588 for action in &adv_actions {
4590 if let LinkManagerAction::SendPacket { raw, .. } = action {
4591 let pkt = RawPacket::unpack(raw).unwrap();
4592 resp_mgr.handle_local_delivery(
4593 pkt.destination_hash,
4594 raw,
4595 pkt.packet_hash,
4596 rns_core::transport::types::InterfaceId(0),
4597 &mut rng,
4598 );
4599 }
4600 }
4601
4602 assert!(!resp_mgr
4604 .links
4605 .get(&link_id)
4606 .unwrap()
4607 .incoming_resources
4608 .is_empty());
4609
4610 let icl_actions = resp_mgr.handle_resource_icl(&link_id);
4612
4613 let has_failed = icl_actions
4615 .iter()
4616 .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
4617 assert!(has_failed, "ICL should produce ResourceFailed");
4618 }
4619
4620 #[test]
4621 fn test_resource_cancel_rcl() {
4622 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4623 let mut rng = OsRng;
4624
4625 let data = vec![0xAB; 2000];
4627 init_mgr.send_resource(&link_id, &data, None, &mut rng);
4628
4629 assert!(!init_mgr
4631 .links
4632 .get(&link_id)
4633 .unwrap()
4634 .outgoing_resources
4635 .is_empty());
4636
4637 let rcl_actions = init_mgr.handle_resource_rcl(&link_id);
4639
4640 let has_failed = rcl_actions
4641 .iter()
4642 .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
4643 assert!(has_failed, "RCL should produce ResourceFailed");
4644 }
4645
4646 #[test]
4647 fn test_cancel_all_resources_clears_active_transfers() {
4648 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4649 let mut rng = OsRng;
4650
4651 let actions = init_mgr.send_resource(&link_id, b"resource body", None, &mut rng);
4652 assert!(!actions.is_empty());
4653 assert_eq!(init_mgr.resource_transfer_count(), 1);
4654
4655 let cancel_actions = init_mgr.cancel_all_resources(&mut rng);
4656
4657 assert_eq!(init_mgr.resource_transfer_count(), 0);
4658 assert!(cancel_actions
4659 .iter()
4660 .any(|action| matches!(action, LinkManagerAction::SendPacket { .. })));
4661 }
4662
4663 #[test]
4664 fn test_resource_tick_cleans_up() {
4665 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4666 let mut rng = OsRng;
4667
4668 let data = vec![0xAB; 100];
4669 init_mgr.send_resource(&link_id, &data, None, &mut rng);
4670
4671 assert!(!init_mgr
4672 .links
4673 .get(&link_id)
4674 .unwrap()
4675 .outgoing_resources
4676 .is_empty());
4677
4678 init_mgr.handle_resource_rcl(&link_id);
4680
4681 init_mgr.tick(&mut rng);
4683
4684 assert!(
4685 init_mgr
4686 .links
4687 .get(&link_id)
4688 .unwrap()
4689 .outgoing_resources
4690 .is_empty(),
4691 "Tick should clean up completed/failed outgoing resources"
4692 );
4693 }
4694
4695 #[test]
4696 fn test_build_link_packet() {
4697 let (init_mgr, _resp_mgr, link_id) = setup_active_link();
4698
4699 let actions =
4700 init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, b"test data");
4701 assert_eq!(actions.len(), 1);
4702 if let LinkManagerAction::SendPacket { raw, dest_type, .. } = &actions[0] {
4703 let pkt = RawPacket::unpack(raw).unwrap();
4704 assert_eq!(pkt.context, constants::CONTEXT_RESOURCE);
4705 assert_eq!(*dest_type, constants::DESTINATION_LINK);
4706 } else {
4707 panic!("Expected SendPacket");
4708 }
4709 }
4710
4711 #[test]
4712 fn test_build_link_packet_returns_empty_when_mtu_too_small() {
4713 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4714 init_mgr.set_link_mtu(&link_id, 84);
4715
4716 let actions =
4717 init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, &[0xAA; 200]);
4718 assert!(actions.is_empty(), "oversized packet should not be built");
4719 }
4720
4721 #[test]
4722 fn test_process_resource_actions_encrypted_variants_drop_on_encrypt_failure() {
4723 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4724 let mut rng = OsRng;
4725 init_mgr
4726 .links
4727 .get_mut(&link_id)
4728 .unwrap()
4729 .engine
4730 .clear_session_for_testing();
4731
4732 let cases = vec![
4733 ResourceAction::SendAdvertisement(vec![1, 2, 3]),
4734 ResourceAction::SendRequest(vec![4, 5, 6]),
4735 ResourceAction::SendHmu(vec![7, 8, 9]),
4736 ResourceAction::SendProof(vec![10, 11, 12]),
4737 ResourceAction::SendCancelInitiator(vec![13, 14, 15]),
4738 ResourceAction::SendCancelReceiver(vec![16, 17, 18]),
4739 ];
4740
4741 for action in cases {
4742 let out = init_mgr.process_resource_actions(&link_id, vec![action], &mut rng);
4743 assert!(
4744 out.is_empty(),
4745 "encrypt failure should suppress packet emission"
4746 );
4747 }
4748 }
4749
4750 #[test]
4755 fn test_channel_message_delivery() {
4756 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4757 let mut rng = OsRng;
4758
4759 let chan_actions = init_mgr
4761 .send_channel_message(&link_id, 42, b"channel data", &mut rng)
4762 .expect("active link channel send should succeed");
4763 assert!(!chan_actions.is_empty());
4764
4765 let mut got_channel_msg = false;
4767 for action in &chan_actions {
4768 if let LinkManagerAction::SendPacket { raw, .. } = action {
4769 let pkt = RawPacket::unpack(raw).unwrap();
4770 let resp_actions = resp_mgr.handle_local_delivery(
4771 pkt.destination_hash,
4772 raw,
4773 pkt.packet_hash,
4774 rns_core::transport::types::InterfaceId(0),
4775 &mut rng,
4776 );
4777 for a in &resp_actions {
4778 if let LinkManagerAction::ChannelMessageReceived {
4779 msgtype, payload, ..
4780 } = a
4781 {
4782 assert_eq!(*msgtype, 42);
4783 assert_eq!(*payload, b"channel data");
4784 got_channel_msg = true;
4785 }
4786 }
4787 }
4788 }
4789 assert!(got_channel_msg, "Responder should receive channel message");
4790 }
4791
4792 #[test]
4793 fn test_channel_send_drops_packet_when_encrypt_fails() {
4794 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4795 let mut rng = OsRng;
4796 init_mgr
4797 .links
4798 .get_mut(&link_id)
4799 .unwrap()
4800 .engine
4801 .clear_session_for_testing();
4802
4803 let actions = init_mgr
4804 .send_channel_message(&link_id, 42, b"channel data", &mut rng)
4805 .expect("channel should still accept the message locally");
4806
4807 assert!(
4808 actions.is_empty(),
4809 "encrypt failure should suppress channel packet"
4810 );
4811 assert!(
4812 init_mgr
4813 .links
4814 .get(&link_id)
4815 .unwrap()
4816 .pending_channel_packets
4817 .is_empty(),
4818 "failed packet encryption must not track a pending channel proof"
4819 );
4820 }
4821
4822 #[test]
4823 fn test_channel_proof_reopens_send_window() {
4824 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
4825 let mut rng = OsRng;
4826
4827 init_mgr
4828 .send_channel_message(&link_id, 42, b"first", &mut rng)
4829 .expect("first send should succeed");
4830 init_mgr
4831 .send_channel_message(&link_id, 42, b"second", &mut rng)
4832 .expect("second send should succeed");
4833
4834 let err = init_mgr
4835 .send_channel_message(&link_id, 42, b"third", &mut rng)
4836 .expect_err("third send should hit the initial channel window");
4837 assert_eq!(err, "Channel is not ready to send");
4838
4839 let queued_packets = init_mgr
4840 .links
4841 .get(&link_id)
4842 .unwrap()
4843 .pending_channel_packets
4844 .clone();
4845 assert_eq!(queued_packets.len(), 2);
4846 for tracked_hash in queued_packets.keys().take(1) {
4847 let mut proof_data = Vec::with_capacity(96);
4848 proof_data.extend_from_slice(tracked_hash);
4849 proof_data.extend_from_slice(&[0x11; 64]);
4850 let flags = PacketFlags {
4851 header_type: constants::HEADER_1,
4852 context_flag: constants::FLAG_UNSET,
4853 transport_type: constants::TRANSPORT_BROADCAST,
4854 destination_type: constants::DESTINATION_LINK,
4855 packet_type: constants::PACKET_TYPE_PROOF,
4856 };
4857 let proof = RawPacket::pack(
4858 flags,
4859 0,
4860 &link_id,
4861 None,
4862 constants::CONTEXT_NONE,
4863 &proof_data,
4864 )
4865 .expect("proof packet should pack");
4866 let ack_actions = init_mgr.handle_local_delivery(
4867 link_id,
4868 &proof.raw,
4869 proof.packet_hash,
4870 rns_core::transport::types::InterfaceId(0),
4871 &mut rng,
4872 );
4873 assert!(
4874 ack_actions.is_empty(),
4875 "proof delivery should only update channel state"
4876 );
4877 }
4878
4879 init_mgr
4880 .send_channel_message(&link_id, 42, b"third", &mut rng)
4881 .expect("proof should free one channel slot");
4882 }
4883
4884 #[test]
4885 fn test_generic_link_data_delivery() {
4886 let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
4887 let mut rng = OsRng;
4888
4889 let actions = init_mgr.send_on_link(&link_id, b"raw stuff", 0x42, &mut rng);
4891 assert_eq!(actions.len(), 1);
4892
4893 let raw = extract_any_send_packet(&actions);
4895 let pkt = RawPacket::unpack(&raw).unwrap();
4896 let resp_actions = resp_mgr.handle_local_delivery(
4897 pkt.destination_hash,
4898 &raw,
4899 pkt.packet_hash,
4900 rns_core::transport::types::InterfaceId(0),
4901 &mut rng,
4902 );
4903
4904 let has_data = resp_actions
4905 .iter()
4906 .any(|a| matches!(a, LinkManagerAction::LinkDataReceived { context: 0x42, .. }));
4907 assert!(
4908 has_data,
4909 "Responder should receive LinkDataReceived for unknown context"
4910 );
4911 }
4912
4913 #[test]
4914 fn test_invalid_encrypted_contexts_are_ignored() {
4915 let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4916 let mut rng = OsRng;
4917 let contexts = [
4918 constants::CONTEXT_CHANNEL,
4919 constants::CONTEXT_REQUEST,
4920 constants::CONTEXT_RESPONSE,
4921 constants::CONTEXT_RESOURCE_ADV,
4922 constants::CONTEXT_RESOURCE_REQ,
4923 constants::CONTEXT_RESOURCE_HMU,
4924 constants::CONTEXT_RESOURCE_PRF,
4925 0x42,
4926 ];
4927
4928 for context in contexts {
4929 let flags = PacketFlags {
4930 header_type: constants::HEADER_1,
4931 context_flag: constants::FLAG_UNSET,
4932 transport_type: constants::TRANSPORT_BROADCAST,
4933 destination_type: constants::DESTINATION_LINK,
4934 packet_type: constants::PACKET_TYPE_DATA,
4935 };
4936 let pkt = RawPacket::pack(flags, 0, &link_id, None, context, b"invalid-ciphertext")
4937 .expect("test packet should pack");
4938 let actions = resp_mgr.handle_local_delivery(
4939 pkt.destination_hash,
4940 &pkt.raw,
4941 pkt.packet_hash,
4942 rns_core::transport::types::InterfaceId(0),
4943 &mut rng,
4944 );
4945 assert!(
4946 actions.is_empty(),
4947 "invalid ciphertext for context {context:#x} should be ignored"
4948 );
4949 }
4950 }
4951
4952 #[test]
4953 fn test_resource_part_without_matching_receiver_is_ignored() {
4954 let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4955 let mut rng = OsRng;
4956 let flags = PacketFlags {
4957 header_type: constants::HEADER_1,
4958 context_flag: constants::FLAG_UNSET,
4959 transport_type: constants::TRANSPORT_BROADCAST,
4960 destination_type: constants::DESTINATION_LINK,
4961 packet_type: constants::PACKET_TYPE_DATA,
4962 };
4963 let pkt = RawPacket::pack(
4964 flags,
4965 0,
4966 &link_id,
4967 None,
4968 constants::CONTEXT_RESOURCE,
4969 b"orphan-part",
4970 )
4971 .expect("test packet should pack");
4972
4973 let actions = resp_mgr.handle_local_delivery(
4974 pkt.destination_hash,
4975 &pkt.raw,
4976 pkt.packet_hash,
4977 rns_core::transport::types::InterfaceId(0),
4978 &mut rng,
4979 );
4980
4981 assert!(actions.is_empty(), "orphan resource part should be ignored");
4982 }
4983
4984 #[test]
4985 fn test_response_delivery() {
4986 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4987 let mut rng = OsRng;
4988
4989 resp_mgr.register_request_handler("/echo", None, |_link_id, _path, data, _remote| {
4991 Some(data.to_vec())
4992 });
4993
4994 let req_actions = init_mgr.send_request(&link_id, "/echo", b"\xc0", &mut rng); assert!(!req_actions.is_empty());
4997
4998 let req_raw = extract_any_send_packet(&req_actions);
5000 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5001 let resp_actions = resp_mgr.handle_local_delivery(
5002 req_pkt.destination_hash,
5003 &req_raw,
5004 req_pkt.packet_hash,
5005 rns_core::transport::types::InterfaceId(0),
5006 &mut rng,
5007 );
5008 let has_resp_send = resp_actions
5009 .iter()
5010 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
5011 assert!(has_resp_send, "Handler should produce response");
5012
5013 let resp_raw = extract_any_send_packet(&resp_actions);
5015 let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
5016 let init_actions = init_mgr.handle_local_delivery(
5017 resp_pkt.destination_hash,
5018 &resp_raw,
5019 resp_pkt.packet_hash,
5020 rns_core::transport::types::InterfaceId(0),
5021 &mut rng,
5022 );
5023
5024 let has_response_received = init_actions
5025 .iter()
5026 .any(|a| matches!(a, LinkManagerAction::ResponseReceived { .. }));
5027 assert!(
5028 has_response_received,
5029 "Initiator should receive ResponseReceived"
5030 );
5031 }
5032
5033 #[test]
5034 fn test_large_response_uses_resource_fallback() {
5035 let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
5036 let mut rng = OsRng;
5037
5038 let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5041 resp_mgr.register_request_handler("/large", None, {
5042 let large_payload = large_payload.clone();
5043 move |_link_id, _path, _data, _remote| Some(large_payload.clone())
5044 });
5045
5046 let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
5048 assert!(!req_actions.is_empty());
5049
5050 let req_raw = extract_any_send_packet(&req_actions);
5052 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5053 let resp_actions = resp_mgr.handle_local_delivery(
5054 req_pkt.destination_hash,
5055 &req_raw,
5056 req_pkt.packet_hash,
5057 rns_core::transport::types::InterfaceId(0),
5058 &mut rng,
5059 );
5060
5061 let mut has_resource_adv = false;
5062 let mut has_direct_response = false;
5063 for action in &resp_actions {
5064 if let LinkManagerAction::SendPacket { raw, .. } = action {
5065 let pkt = RawPacket::unpack(raw).unwrap();
5066 if pkt.context == constants::CONTEXT_RESOURCE_ADV {
5067 has_resource_adv = true;
5068 }
5069 if pkt.context == constants::CONTEXT_RESPONSE {
5070 has_direct_response = true;
5071 }
5072 }
5073 }
5074
5075 assert!(
5076 has_resource_adv,
5077 "Large response should advertise a response resource"
5078 );
5079 assert!(
5080 !has_direct_response,
5081 "Large response should not use direct CONTEXT_RESPONSE packet"
5082 );
5083 }
5084
5085 #[test]
5086 fn test_send_management_response_without_session_key_uses_resource_fallback_path() {
5087 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
5088 let mut rng = OsRng;
5089 init_mgr
5090 .links
5091 .get_mut(&link_id)
5092 .unwrap()
5093 .engine
5094 .clear_session_for_testing();
5095
5096 let large_response: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5097 let actions =
5098 init_mgr.send_management_response(&link_id, &[0x11; 16], &large_response, &mut rng);
5099
5100 assert!(
5101 actions.is_empty(),
5102 "without a session key, no response packets should be emitted"
5103 );
5104 assert_eq!(
5105 init_mgr
5106 .links
5107 .get(&link_id)
5108 .map(|managed| managed.outgoing_resources.len()),
5109 Some(1)
5110 );
5111 }
5112
5113 #[test]
5114 fn test_send_channel_message_on_no_channel() {
5115 let mut mgr = LinkManager::new();
5116 let mut rng = OsRng;
5117 let dummy_sig = [0xAA; 32];
5118 let (link_id, _) =
5119 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
5120
5121 let err = mgr
5123 .send_channel_message(&link_id, 1, b"test", &mut rng)
5124 .expect_err("pending link should reject channel send");
5125 assert_eq!(err, "link has no active channel");
5126 }
5127
5128 #[test]
5129 fn test_send_on_link_requires_active() {
5130 let mut mgr = LinkManager::new();
5131 let mut rng = OsRng;
5132 let dummy_sig = [0xAA; 32];
5133 let (link_id, _) =
5134 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
5135
5136 let actions = mgr.send_on_link(&link_id, b"test", constants::CONTEXT_NONE, &mut rng);
5137 assert!(actions.is_empty(), "Cannot send on pending link");
5138 }
5139
5140 #[test]
5141 fn test_send_on_link_unknown_link() {
5142 let mgr = LinkManager::new();
5143 let mut rng = OsRng;
5144
5145 let actions = mgr.send_on_link(&[0xFF; 16], b"test", constants::CONTEXT_NONE, &mut rng);
5146 assert!(actions.is_empty());
5147 }
5148
5149 #[test]
5150 fn test_resource_full_transfer_large() {
5151 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5152 let mut rng = OsRng;
5153
5154 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5155
5156 let original_data: Vec<u8> = (0..2000u32)
5158 .map(|i| {
5159 let pos = i as usize;
5160 (pos ^ (pos >> 8) ^ (pos >> 16)) as u8
5161 })
5162 .collect();
5163
5164 let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
5165
5166 let mut pending: Vec<(char, LinkManagerAction)> =
5167 adv_actions.into_iter().map(|a| ('i', a)).collect();
5168 let mut rounds = 0;
5169 let max_rounds = 200;
5170 let mut resource_received = false;
5171 let mut sender_completed = false;
5172
5173 while !pending.is_empty() && rounds < max_rounds {
5174 rounds += 1;
5175 let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
5176
5177 for (source, action) in pending.drain(..) {
5178 if let LinkManagerAction::SendPacket { raw, .. } = action {
5179 let pkt = match RawPacket::unpack(&raw) {
5180 Ok(p) => p,
5181 Err(_) => continue,
5182 };
5183
5184 let target_actions = if source == 'i' {
5185 resp_mgr.handle_local_delivery(
5186 pkt.destination_hash,
5187 &raw,
5188 pkt.packet_hash,
5189 rns_core::transport::types::InterfaceId(0),
5190 &mut rng,
5191 )
5192 } else {
5193 init_mgr.handle_local_delivery(
5194 pkt.destination_hash,
5195 &raw,
5196 pkt.packet_hash,
5197 rns_core::transport::types::InterfaceId(0),
5198 &mut rng,
5199 )
5200 };
5201
5202 let target_source = if source == 'i' { 'r' } else { 'i' };
5203 for a in &target_actions {
5204 match a {
5205 LinkManagerAction::ResourceReceived { data, .. } => {
5206 assert_eq!(*data, original_data);
5207 resource_received = true;
5208 }
5209 LinkManagerAction::ResourceCompleted { .. } => {
5210 sender_completed = true;
5211 }
5212 _ => {}
5213 }
5214 }
5215 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5216 }
5217 }
5218 pending = next;
5219 }
5220
5221 assert!(
5222 resource_received,
5223 "Should receive large resource (rounds={})",
5224 rounds
5225 );
5226 assert!(
5227 sender_completed,
5228 "Sender should complete (rounds={})",
5229 rounds
5230 );
5231 }
5232
5233 #[test]
5234 fn test_resource_receiver_stores_original_advertisement_plaintext() {
5235 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5236 let mut rng = OsRng;
5237
5238 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5239
5240 let data = vec![0x41; 256];
5241 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5242
5243 let adv_raw = adv_actions
5244 .iter()
5245 .find_map(|action| match action {
5246 LinkManagerAction::SendPacket { raw, .. } => {
5247 let pkt = RawPacket::unpack(raw).ok()?;
5248 (pkt.context == constants::CONTEXT_RESOURCE_ADV).then_some(raw.clone())
5249 }
5250 _ => None,
5251 })
5252 .expect("sender should emit a resource advertisement");
5253
5254 let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
5255 let adv_plaintext = resp_mgr
5256 .links
5257 .get(&link_id)
5258 .unwrap()
5259 .engine
5260 .decrypt(&adv_pkt.data)
5261 .unwrap();
5262
5263 let _resp_actions = resp_mgr.handle_local_delivery(
5264 adv_pkt.destination_hash,
5265 &adv_raw,
5266 adv_pkt.packet_hash,
5267 rns_core::transport::types::InterfaceId(0),
5268 &mut rng,
5269 );
5270
5271 let receiver = resp_mgr
5272 .links
5273 .get(&link_id)
5274 .and_then(|managed| managed.incoming_resources.first())
5275 .expect("advertisement should create an incoming receiver");
5276 assert_eq!(receiver.advertisement_packet, adv_plaintext);
5277 assert_eq!(
5278 receiver.max_decompressed_size,
5279 constants::RESOURCE_AUTO_COMPRESS_MAX_SIZE
5280 );
5281 }
5282
5283 #[test]
5284 fn test_corrupt_compressed_resource_rejects_and_tears_down_link() {
5285 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5286 let mut rng = OsRng;
5287
5288 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5289
5290 let data = vec![b'A'; 4096];
5291 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5292
5293 let mut request_actions = Vec::new();
5294 for action in &adv_actions {
5295 let LinkManagerAction::SendPacket { raw, .. } = action else {
5296 continue;
5297 };
5298 let pkt = RawPacket::unpack(raw).unwrap();
5299 let actions = resp_mgr.handle_local_delivery(
5300 pkt.destination_hash,
5301 raw,
5302 pkt.packet_hash,
5303 rns_core::transport::types::InterfaceId(0),
5304 &mut rng,
5305 );
5306 request_actions.extend(actions);
5307 }
5308
5309 {
5310 let receiver = resp_mgr
5311 .links
5312 .get_mut(&link_id)
5313 .and_then(|managed| managed.incoming_resources.first_mut())
5314 .expect("receiver should exist after advertisement");
5315 assert!(receiver.flags.compressed, "test data should be compressed");
5316 receiver.max_decompressed_size = 64;
5317 }
5318
5319 let mut responder_actions = Vec::new();
5320 for action in request_actions {
5321 let LinkManagerAction::SendPacket { raw, .. } = action else {
5322 continue;
5323 };
5324 let pkt = RawPacket::unpack(&raw).unwrap();
5325 let actions = init_mgr.handle_local_delivery(
5326 pkt.destination_hash,
5327 &raw,
5328 pkt.packet_hash,
5329 rns_core::transport::types::InterfaceId(0),
5330 &mut rng,
5331 );
5332
5333 for action in actions {
5334 let LinkManagerAction::SendPacket { raw, .. } = &action else {
5335 continue;
5336 };
5337 let pkt = RawPacket::unpack(raw).unwrap();
5338 let delivered = resp_mgr.handle_local_delivery(
5339 pkt.destination_hash,
5340 raw,
5341 pkt.packet_hash,
5342 rns_core::transport::types::InterfaceId(0),
5343 &mut rng,
5344 );
5345 responder_actions.extend(delivered);
5346 }
5347 }
5348
5349 assert!(
5350 responder_actions.iter().any(|action| matches!(
5351 action,
5352 LinkManagerAction::ResourceFailed { error, .. }
5353 if error == "Resource too large"
5354 )),
5355 "corrupt oversized resource should fail with TooLarge"
5356 );
5357 assert!(
5358 responder_actions.iter().any(|action| matches!(
5359 action,
5360 LinkManagerAction::LinkClosed { link_id: closed_id, .. } if *closed_id == link_id
5361 )),
5362 "corrupt oversized resource should tear down the link"
5363 );
5364 assert!(
5365 responder_actions.iter().any(|action| match action {
5366 LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
5367 .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_RCL)
5368 .unwrap_or(false),
5369 _ => false,
5370 }),
5371 "corrupt oversized resource should send a receiver cancel/reject packet"
5372 );
5373 assert_eq!(
5374 resp_mgr
5375 .links
5376 .get(&link_id)
5377 .map(|managed| managed.engine.state()),
5378 Some(LinkState::Closed)
5379 );
5380 }
5381
5382 #[test]
5383 fn test_resource_hmu_timeout_extension_in_link_manager_flow() {
5384 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5385 let mut rng = OsRng;
5386
5387 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
5388
5389 let mut state = 0x1234_5678u32;
5392 let data: Vec<u8> = (0..50000)
5393 .map(|_| {
5394 state = state.wrapping_mul(1664525).wrapping_add(1013904223);
5395 (state >> 16) as u8
5396 })
5397 .collect();
5398 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
5399 let mut pending: Vec<(char, LinkManagerAction)> =
5400 adv_actions.into_iter().map(|a| ('i', a)).collect();
5401
5402 let mut rounds = 0;
5403
5404 while rounds < 300 {
5407 rounds += 1;
5408 let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
5409
5410 for (source, action) in pending.drain(..) {
5411 let LinkManagerAction::SendPacket { raw, .. } = action else {
5412 continue;
5413 };
5414
5415 let pkt = match RawPacket::unpack(&raw) {
5416 Ok(p) => p,
5417 Err(_) => continue,
5418 };
5419
5420 let target_actions = if source == 'i' {
5421 resp_mgr.handle_local_delivery(
5422 pkt.destination_hash,
5423 &raw,
5424 pkt.packet_hash,
5425 rns_core::transport::types::InterfaceId(0),
5426 &mut rng,
5427 )
5428 } else {
5429 init_mgr.handle_local_delivery(
5430 pkt.destination_hash,
5431 &raw,
5432 pkt.packet_hash,
5433 rns_core::transport::types::InterfaceId(0),
5434 &mut rng,
5435 )
5436 };
5437
5438 let target_source = if source == 'i' { 'r' } else { 'i' };
5439 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5440 }
5441
5442 if resp_mgr
5443 .links
5444 .get(&link_id)
5445 .and_then(|managed| managed.incoming_resources.first())
5446 .is_some_and(|receiver| receiver.waiting_for_hmu)
5447 {
5448 break;
5449 }
5450
5451 pending = next;
5452 }
5453
5454 assert!(
5455 resp_mgr
5456 .links
5457 .get(&link_id)
5458 .and_then(|managed| managed.incoming_resources.first())
5459 .is_some_and(|receiver| receiver.waiting_for_hmu),
5460 "expected receiver to reach a live HMU wait state"
5461 );
5462
5463 let prime_actions = {
5466 let managed = resp_mgr.links.get_mut(&link_id).unwrap();
5467 let receiver = managed.incoming_resources.first_mut().unwrap();
5468 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
5469 managed.engine.decrypt(ciphertext).map_err(|_| ())
5470 };
5471 receiver.tick(
5472 receiver.last_activity + 0.0001,
5473 &decrypt_fn,
5474 &Bzip2Compressor,
5475 )
5476 };
5477 assert!(
5478 !prime_actions
5479 .iter()
5480 .any(|a| matches!(a, ResourceAction::SendRequest(_))),
5481 "fresh HMU wait state should not immediately emit a retry request"
5482 );
5483
5484 let (late_delta, retries_before) = {
5485 let managed = resp_mgr
5486 .links
5487 .get_mut(&link_id)
5488 .expect("receiver link should still exist");
5489 let receiver = managed
5490 .incoming_resources
5491 .first_mut()
5492 .expect("receiver should have an active incoming resource");
5493
5494 assert!(
5495 receiver.waiting_for_hmu,
5496 "receiver should be waiting for HMU"
5497 );
5498
5499 let eifr = receiver.eifr.unwrap_or_else(|| {
5500 (constants::RESOURCE_SDU as f64 * 8.0) / receiver.rtt.unwrap_or(0.5)
5501 });
5502 let expected_tof = if receiver.outstanding_parts > 0 {
5503 (receiver.outstanding_parts as f64 * constants::RESOURCE_SDU as f64 * 8.0) / eifr
5504 } else {
5505 (3.0 * constants::RESOURCE_SDU as f64) / eifr
5506 };
5507 let expected_hmu_wait =
5508 (constants::RESOURCE_SDU as f64 * 8.0 * constants::RESOURCE_HMU_WAIT_FACTOR) / eifr;
5509 let old_delta = constants::RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT * expected_tof
5510 + constants::RESOURCE_RETRY_GRACE_TIME;
5511 (
5512 old_delta + expected_hmu_wait + expected_hmu_wait.max(1.0),
5513 receiver.retries_left,
5514 )
5515 };
5516 {
5517 let managed = resp_mgr.links.get(&link_id).unwrap();
5518 let receiver = managed.incoming_resources.first().unwrap();
5519 assert_eq!(receiver.retries_left, retries_before);
5520 assert!(
5521 receiver.eifr.is_some(),
5522 "receiver tick should have populated EIFR"
5523 );
5524 }
5525
5526 let late_resource_actions = {
5527 let managed = resp_mgr.links.get_mut(&link_id).unwrap();
5528 let receiver = managed.incoming_resources.first_mut().unwrap();
5529 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
5530 managed.engine.decrypt(ciphertext).map_err(|_| ())
5531 };
5532 receiver.tick(
5533 receiver.last_activity + late_delta,
5534 &decrypt_fn,
5535 &Bzip2Compressor,
5536 )
5537 };
5538 let late_actions =
5539 resp_mgr.process_resource_actions(&link_id, late_resource_actions, &mut rng);
5540 let retry_raw = late_actions
5541 .iter()
5542 .find_map(|a| match a {
5543 LinkManagerAction::SendPacket { raw, .. } => {
5544 let pkt = RawPacket::unpack(raw).ok()?;
5545 (pkt.context == constants::CONTEXT_RESOURCE_REQ).then_some(raw.clone())
5546 }
5547 _ => None,
5548 })
5549 .expect("receiver should emit a resource retry request after extended timeout");
5550
5551 {
5552 let managed = resp_mgr.links.get(&link_id).unwrap();
5553 let receiver = managed.incoming_resources.first().unwrap();
5554 assert_eq!(receiver.retries_left, retries_before - 1);
5555 }
5556
5557 let retry_pkt = RawPacket::unpack(&retry_raw).unwrap();
5558 let retry_plaintext = resp_mgr
5559 .links
5560 .get(&link_id)
5561 .unwrap()
5562 .engine
5563 .decrypt(&retry_pkt.data)
5564 .expect("retry request should decrypt");
5565 assert_eq!(retry_plaintext[0], constants::RESOURCE_HASHMAP_IS_EXHAUSTED);
5566
5567 let retry_to_sender = init_mgr.handle_local_delivery(
5570 retry_pkt.destination_hash,
5571 &retry_raw,
5572 retry_pkt.packet_hash,
5573 rns_core::transport::types::InterfaceId(0),
5574 &mut rng,
5575 );
5576 assert!(
5577 retry_to_sender.iter().any(|a| match a {
5578 LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
5579 .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_HMU)
5580 .unwrap_or(false),
5581 _ => false,
5582 }),
5583 "sender should answer the exhausted retry request with a live HMU packet"
5584 );
5585 }
5586
5587 #[test]
5588 fn test_process_resource_actions_mapping() {
5589 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
5590 let mut rng = OsRng;
5591
5592 let actions = vec![
5594 ResourceAction::DataReceived {
5595 data: vec![1, 2, 3],
5596 metadata: Some(vec![4, 5]),
5597 },
5598 ResourceAction::Completed,
5599 ResourceAction::Failed(rns_core::resource::ResourceError::Timeout),
5600 ResourceAction::ProgressUpdate {
5601 received: 10,
5602 total: 20,
5603 },
5604 ResourceAction::TeardownLink,
5605 ];
5606
5607 let result = init_mgr.process_resource_actions(&link_id, actions, &mut rng);
5608
5609 assert!(matches!(
5610 result[0],
5611 LinkManagerAction::ResourceReceived { .. }
5612 ));
5613 assert!(matches!(
5614 result[1],
5615 LinkManagerAction::ResourceCompleted { .. }
5616 ));
5617 assert!(matches!(
5618 result[2],
5619 LinkManagerAction::ResourceFailed { .. }
5620 ));
5621 assert!(matches!(
5622 result[3],
5623 LinkManagerAction::ResourceProgress {
5624 received: 10,
5625 total: 20,
5626 ..
5627 }
5628 ));
5629 assert!(result
5630 .iter()
5631 .any(|action| matches!(action, LinkManagerAction::LinkClosed { .. })));
5632 }
5633
5634 #[test]
5635 fn test_link_state_empty() {
5636 let mgr = LinkManager::new();
5637 let fake_id = [0xAA; 16];
5638 assert!(mgr.link_state(&fake_id).is_none());
5639 }
5640
5641 #[test]
5642 fn test_large_response_resource_completes_as_response() {
5643 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5644 let mut rng = OsRng;
5645
5646 let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
5647 let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(large_payload));
5648 resp_mgr.register_request_handler("/large", None, {
5649 let response_value = response_value.clone();
5650 move |_link_id, _path, _data, _remote| Some(response_value.clone())
5651 });
5652
5653 let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
5654 let req_raw = extract_any_send_packet(&req_actions);
5655 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5656 let request_id = req_pkt.get_truncated_hash();
5657 let resp_actions = resp_mgr.handle_local_delivery(
5658 req_pkt.destination_hash,
5659 &req_raw,
5660 req_pkt.packet_hash,
5661 rns_core::transport::types::InterfaceId(0),
5662 &mut rng,
5663 );
5664
5665 let mut pending: Vec<(char, LinkManagerAction)> =
5666 resp_actions.into_iter().map(|a| ('r', a)).collect();
5667 let mut rounds = 0;
5668 let mut received_response = None;
5669
5670 while !pending.is_empty() && rounds < 200 {
5671 rounds += 1;
5672 let mut next = Vec::new();
5673
5674 for (source, action) in pending.drain(..) {
5675 let LinkManagerAction::SendPacket { raw, .. } = action else {
5676 continue;
5677 };
5678 let pkt = RawPacket::unpack(&raw).unwrap();
5679 let target_actions = if source == 'r' {
5680 init_mgr.handle_local_delivery(
5681 pkt.destination_hash,
5682 &raw,
5683 pkt.packet_hash,
5684 rns_core::transport::types::InterfaceId(0),
5685 &mut rng,
5686 )
5687 } else {
5688 resp_mgr.handle_local_delivery(
5689 pkt.destination_hash,
5690 &raw,
5691 pkt.packet_hash,
5692 rns_core::transport::types::InterfaceId(0),
5693 &mut rng,
5694 )
5695 };
5696
5697 let target_source = if source == 'r' { 'i' } else { 'r' };
5698 for target_action in &target_actions {
5699 match target_action {
5700 LinkManagerAction::ResponseReceived {
5701 request_id: rid,
5702 data,
5703 ..
5704 } => {
5705 received_response = Some((*rid, data.clone()));
5706 }
5707 LinkManagerAction::ResourceReceived { .. } => {
5708 panic!("response resources must complete as ResponseReceived")
5709 }
5710 LinkManagerAction::ResourceAcceptQuery { .. } => {
5711 panic!("response resources must bypass application acceptance")
5712 }
5713 _ => {}
5714 }
5715 }
5716 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5717 }
5718
5719 pending = next;
5720 }
5721
5722 let (received_request_id, received_data) = received_response.unwrap_or_else(|| {
5723 panic!(
5724 "large response resource did not complete as ResponseReceived after {} rounds",
5725 rounds
5726 )
5727 });
5728 assert_eq!(received_request_id, request_id);
5729 assert_eq!(received_data, response_value);
5730 }
5731
5732 #[test]
5733 fn test_response_resource_preserves_metadata() {
5734 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5735 let mut rng = OsRng;
5736
5737 let payload = b"bundle-data".to_vec();
5738 let metadata = b"git-status-ok".to_vec();
5739 let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5740 resp_mgr.register_request_handler_response("/fetch", None, {
5741 let response_value = response_value.clone();
5742 let metadata = metadata.clone();
5743 move |_link_id, _path, _data, _remote| {
5744 Some(RequestResponse::Resource {
5745 data: response_value.clone(),
5746 metadata: Some(metadata.clone()),
5747 auto_compress: false,
5748 })
5749 }
5750 });
5751
5752 let req_actions = init_mgr.send_request(&link_id, "/fetch", b"\xc0", &mut rng);
5753 let req_raw = extract_any_send_packet(&req_actions);
5754 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5755 let request_id = req_pkt.get_truncated_hash();
5756 let resp_actions = resp_mgr.handle_local_delivery(
5757 req_pkt.destination_hash,
5758 &req_raw,
5759 req_pkt.packet_hash,
5760 rns_core::transport::types::InterfaceId(0),
5761 &mut rng,
5762 );
5763
5764 let mut pending: Vec<(char, LinkManagerAction)> =
5765 resp_actions.into_iter().map(|a| ('r', a)).collect();
5766 let mut received_response = None;
5767
5768 for _ in 0..200 {
5769 if pending.is_empty() || received_response.is_some() {
5770 break;
5771 }
5772
5773 let mut next = Vec::new();
5774 for (source, action) in pending.drain(..) {
5775 let LinkManagerAction::SendPacket { raw, .. } = action else {
5776 continue;
5777 };
5778 let pkt = RawPacket::unpack(&raw).unwrap();
5779 let target_actions = if source == 'r' {
5780 init_mgr.handle_local_delivery(
5781 pkt.destination_hash,
5782 &raw,
5783 pkt.packet_hash,
5784 rns_core::transport::types::InterfaceId(0),
5785 &mut rng,
5786 )
5787 } else {
5788 resp_mgr.handle_local_delivery(
5789 pkt.destination_hash,
5790 &raw,
5791 pkt.packet_hash,
5792 rns_core::transport::types::InterfaceId(0),
5793 &mut rng,
5794 )
5795 };
5796
5797 let target_source = if source == 'r' { 'i' } else { 'r' };
5798 for target_action in &target_actions {
5799 match target_action {
5800 LinkManagerAction::ResponseReceived {
5801 request_id: rid,
5802 data,
5803 metadata: response_metadata,
5804 ..
5805 } => {
5806 received_response =
5807 Some((*rid, data.clone(), response_metadata.clone()));
5808 }
5809 LinkManagerAction::ResourceReceived { .. } => {
5810 panic!("response resources must complete as ResponseReceived")
5811 }
5812 _ => {}
5813 }
5814 }
5815 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
5816 }
5817 pending = next;
5818 }
5819
5820 let (received_request_id, received_data, received_metadata) = received_response
5821 .expect("resource response with metadata should complete as ResponseReceived");
5822 assert_eq!(received_request_id, request_id);
5823 assert_eq!(received_data, response_value);
5824 assert_eq!(received_metadata, Some(metadata));
5825 }
5826
5827 #[test]
5828 fn test_negotiated_mtu_response_uses_resource_before_global_mtu() {
5829 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
5830 let mut rng = OsRng;
5831
5832 init_mgr.set_link_mtu(&link_id, 300);
5833 resp_mgr.set_link_mtu(&link_id, 300);
5834
5835 let payload = vec![0xAB; 350];
5836 let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5837 resp_mgr.register_request_handler("/mtu", None, {
5838 let response_value = response_value.clone();
5839 move |_link_id, _path, _data, _remote| Some(response_value.clone())
5840 });
5841
5842 let req_actions = init_mgr.send_request(&link_id, "/mtu", b"\xc0", &mut rng);
5843 let req_raw = extract_any_send_packet(&req_actions);
5844 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
5845 let resp_actions = resp_mgr.handle_local_delivery(
5846 req_pkt.destination_hash,
5847 &req_raw,
5848 req_pkt.packet_hash,
5849 rns_core::transport::types::InterfaceId(0),
5850 &mut rng,
5851 );
5852
5853 let mut has_resource_adv = false;
5854 let mut direct_response_len = None;
5855 for action in &resp_actions {
5856 if let LinkManagerAction::SendPacket { raw, .. } = action {
5857 let pkt = RawPacket::unpack(raw).unwrap();
5858 has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
5859 if pkt.context == constants::CONTEXT_RESPONSE {
5860 direct_response_len = Some(raw.len());
5861 }
5862 }
5863 }
5864
5865 assert!(
5866 has_resource_adv,
5867 "responses larger than the negotiated link MTU should use resource fallback"
5868 );
5869 assert!(
5870 direct_response_len.is_none(),
5871 "sent direct response of {} bytes on a 300 byte negotiated MTU",
5872 direct_response_len.unwrap_or_default()
5873 );
5874 }
5875
5876 #[test]
5877 fn test_large_management_response_uses_resource_fallback() {
5878 let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
5879 let mut rng = OsRng;
5880
5881 let payload = vec![0xBC; 5000];
5882 let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
5883 let actions =
5884 resp_mgr.send_management_response(&link_id, &[0x55; 16], &response_value, &mut rng);
5885
5886 let mut has_resource_adv = false;
5887 let mut has_direct_response = false;
5888 for action in &actions {
5889 if let LinkManagerAction::SendPacket { raw, .. } = action {
5890 let pkt = RawPacket::unpack(raw).unwrap();
5891 has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
5892 has_direct_response |= pkt.context == constants::CONTEXT_RESPONSE;
5893 }
5894 }
5895
5896 assert!(
5897 has_resource_adv,
5898 "large management responses should advertise a response resource"
5899 );
5900 assert!(
5901 !has_direct_response,
5902 "large management responses should not use a direct CONTEXT_RESPONSE packet"
5903 );
5904 }
5905}