1use std::collections::HashMap;
10
11use super::compressor::Bzip2Compressor;
12use rns_core::channel::{Channel, Sequence};
13use rns_core::constants;
14use rns_core::link::types::{LinkId, LinkState, TeardownReason};
15use rns_core::link::{LinkAction, LinkEngine, LinkMode};
16use rns_core::packet::{PacketFlags, RawPacket};
17use rns_core::resource::{ResourceAction, ResourceReceiver, ResourceSender};
18use rns_crypto::ed25519::Ed25519PrivateKey;
19use rns_crypto::Rng;
20
21use super::time;
22
/// Policy a link applies to resources advertised by the remote peer.
///
/// The code that interprets each variant lives outside this section; the
/// per-variant notes below are inferred from the names and should be confirmed
/// against the resource-advertisement handler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ResourceStrategy {
    /// The default strategy. Presumably rejects every advertised resource.
    #[default]
    AcceptNone,
    /// Presumably accepts every advertised resource without asking.
    AcceptAll,
    /// Presumably defers the decision to the application — TODO confirm.
    AcceptApp,
}
39
40struct ManagedLink {
42 engine: LinkEngine,
43 channel: Option<Channel>,
44 pending_channel_packets: HashMap<[u8; 32], Sequence>,
45 channel_send_ok: u64,
46 channel_send_not_ready: u64,
47 channel_send_too_big: u64,
48 channel_send_other_error: u64,
49 channel_messages_received: u64,
50 channel_proofs_sent: u64,
51 channel_proofs_received: u64,
52 dest_hash: [u8; 16],
54 remote_identity: Option<([u8; 16], [u8; 64])>,
56 dest_sig_pub_bytes: Option<[u8; 32]>,
58 incoming_resources: Vec<ResourceReceiver>,
60 outgoing_resources: Vec<ResourceSender>,
62 resource_strategy: ResourceStrategy,
64 route_interface: Option<rns_core::transport::types::InterfaceId>,
66 route_transport_id: Option<[u8; 16]>,
71}
72
73struct LinkDestination {
75 sig_prv: Ed25519PrivateKey,
76 sig_pub_bytes: [u8; 32],
77 resource_strategy: ResourceStrategy,
78}
79
80struct RequestHandlerEntry {
82 path: String,
84 path_hash: [u8; 16],
86 allowed_list: Option<Vec<[u8; 16]>>,
88 handler:
90 Box<dyn Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>> + Send>,
91}
92
93#[derive(Debug)]
95pub enum LinkManagerAction {
96 SendPacket {
98 raw: Vec<u8>,
99 dest_type: u8,
100 attached_interface: Option<rns_core::transport::types::InterfaceId>,
101 },
102 LinkEstablished {
104 link_id: LinkId,
105 dest_hash: [u8; 16],
106 rtt: f64,
107 is_initiator: bool,
108 },
109 LinkClosed {
111 link_id: LinkId,
112 reason: Option<TeardownReason>,
113 },
114 RemoteIdentified {
116 link_id: LinkId,
117 identity_hash: [u8; 16],
118 public_key: [u8; 64],
119 },
120 RegisterLinkDest { link_id: LinkId },
122 DeregisterLinkDest { link_id: LinkId },
124 ManagementRequest {
127 link_id: LinkId,
128 path_hash: [u8; 16],
129 data: Vec<u8>,
131 request_id: [u8; 16],
133 remote_identity: Option<([u8; 16], [u8; 64])>,
134 },
135 ResourceReceived {
137 link_id: LinkId,
138 data: Vec<u8>,
139 metadata: Option<Vec<u8>>,
140 },
141 ResourceCompleted { link_id: LinkId },
143 ResourceFailed { link_id: LinkId, error: String },
145 ResourceProgress {
147 link_id: LinkId,
148 received: usize,
149 total: usize,
150 },
151 ResourceAcceptQuery {
153 link_id: LinkId,
154 resource_hash: Vec<u8>,
155 transfer_size: u64,
156 has_metadata: bool,
157 },
158 ChannelMessageReceived {
160 link_id: LinkId,
161 msgtype: u16,
162 payload: Vec<u8>,
163 },
164 LinkDataReceived {
166 link_id: LinkId,
167 context: u8,
168 data: Vec<u8>,
169 },
170 ResponseReceived {
172 link_id: LinkId,
173 request_id: [u8; 16],
174 data: Vec<u8>,
175 },
176 LinkRequestReceived {
178 link_id: LinkId,
179 receiving_interface: rns_core::transport::types::InterfaceId,
180 },
181}
182
183pub struct LinkManager {
185 links: HashMap<LinkId, ManagedLink>,
186 link_destinations: HashMap<[u8; 16], LinkDestination>,
187 request_handlers: Vec<RequestHandlerEntry>,
188 management_paths: Vec<[u8; 16]>,
191}
192
193impl LinkManager {
194 fn resource_sdu_for_link(link: &ManagedLink) -> usize {
195 let mtu = link.engine.mtu() as usize;
198 let derived = mtu.saturating_sub(constants::HEADER_MAXSIZE + constants::IFAC_MIN_SIZE);
199 if derived > 0 {
200 derived
201 } else {
202 constants::RESOURCE_SDU
203 }
204 }
205
206 pub fn new() -> Self {
208 LinkManager {
209 links: HashMap::new(),
210 link_destinations: HashMap::new(),
211 request_handlers: Vec::new(),
212 management_paths: Vec::new(),
213 }
214 }
215
216 pub fn register_management_path(&mut self, path_hash: [u8; 16]) {
220 if !self.management_paths.contains(&path_hash) {
221 self.management_paths.push(path_hash);
222 }
223 }
224
225 pub fn get_derived_key(&self, link_id: &LinkId) -> Option<Vec<u8>> {
227 self.links
228 .get(link_id)
229 .and_then(|link| link.engine.derived_key().map(|dk| dk.to_vec()))
230 }
231
232 pub fn get_link_route_hint(
234 &self,
235 link_id: &LinkId,
236 ) -> Option<(rns_core::transport::types::InterfaceId, Option<[u8; 16]>)> {
237 self.links.get(link_id).and_then(|link| {
238 link.route_interface
239 .map(|iface| (iface, link.route_transport_id))
240 })
241 }
242
243 pub fn register_link_destination(
245 &mut self,
246 dest_hash: [u8; 16],
247 sig_prv: Ed25519PrivateKey,
248 sig_pub_bytes: [u8; 32],
249 resource_strategy: ResourceStrategy,
250 ) {
251 self.link_destinations.insert(
252 dest_hash,
253 LinkDestination {
254 sig_prv,
255 sig_pub_bytes,
256 resource_strategy,
257 },
258 );
259 }
260
261 pub fn deregister_link_destination(&mut self, dest_hash: &[u8; 16]) {
263 self.link_destinations.remove(dest_hash);
264 }
265
266 pub fn register_request_handler<F>(
272 &mut self,
273 path: &str,
274 allowed_list: Option<Vec<[u8; 16]>>,
275 handler: F,
276 ) where
277 F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>>
278 + Send
279 + 'static,
280 {
281 let path_hash = compute_path_hash(path);
282 self.request_handlers.push(RequestHandlerEntry {
283 path: path.to_string(),
284 path_hash,
285 allowed_list,
286 handler: Box::new(handler),
287 });
288 }
289
290 pub fn create_link(
298 &mut self,
299 dest_hash: &[u8; 16],
300 dest_sig_pub_bytes: &[u8; 32],
301 hops: u8,
302 mtu: u32,
303 rng: &mut dyn Rng,
304 ) -> (LinkId, Vec<LinkManagerAction>) {
305 let mode = LinkMode::Aes256Cbc;
306 let (mut engine, request_data) =
307 LinkEngine::new_initiator(dest_hash, hops, mode, Some(mtu), time::now(), rng);
308
309 let flags = PacketFlags {
311 header_type: constants::HEADER_1,
312 context_flag: constants::FLAG_UNSET,
313 transport_type: constants::TRANSPORT_BROADCAST,
314 destination_type: constants::DESTINATION_SINGLE,
315 packet_type: constants::PACKET_TYPE_LINKREQUEST,
316 };
317
318 let packet = match RawPacket::pack(
319 flags,
320 0,
321 dest_hash,
322 None,
323 constants::CONTEXT_NONE,
324 &request_data,
325 ) {
326 Ok(p) => p,
327 Err(_) => {
328 return ([0u8; 16], Vec::new());
330 }
331 };
332
333 engine.set_link_id_from_hashable(&packet.get_hashable_part(), request_data.len());
334 let link_id = *engine.link_id();
335
336 let managed = ManagedLink {
337 engine,
338 channel: None,
339 pending_channel_packets: HashMap::new(),
340 channel_send_ok: 0,
341 channel_send_not_ready: 0,
342 channel_send_too_big: 0,
343 channel_send_other_error: 0,
344 channel_messages_received: 0,
345 channel_proofs_sent: 0,
346 channel_proofs_received: 0,
347 dest_hash: *dest_hash,
348 remote_identity: None,
349 dest_sig_pub_bytes: Some(*dest_sig_pub_bytes),
350 incoming_resources: Vec::new(),
351 outgoing_resources: Vec::new(),
352 resource_strategy: ResourceStrategy::default(),
353 route_interface: None,
354 route_transport_id: None,
355 };
356 self.links.insert(link_id, managed);
357
358 let mut actions = Vec::new();
359 actions.push(LinkManagerAction::RegisterLinkDest { link_id });
361 actions.push(LinkManagerAction::SendPacket {
363 raw: packet.raw,
364 dest_type: constants::DESTINATION_LINK,
365 attached_interface: None,
366 });
367
368 (link_id, actions)
369 }
370
371 pub fn handle_local_delivery(
377 &mut self,
378 dest_hash: [u8; 16],
379 raw: &[u8],
380 packet_hash: [u8; 32],
381 receiving_interface: rns_core::transport::types::InterfaceId,
382 rng: &mut dyn Rng,
383 ) -> Vec<LinkManagerAction> {
384 let packet = match RawPacket::unpack(raw) {
385 Ok(p) => p,
386 Err(_) => return Vec::new(),
387 };
388
389 match packet.flags.packet_type {
390 constants::PACKET_TYPE_LINKREQUEST => {
391 self.handle_linkrequest(&dest_hash, &packet, receiving_interface, rng)
392 }
393 constants::PACKET_TYPE_PROOF if packet.context == constants::CONTEXT_LRPROOF => {
394 self.handle_lrproof(&dest_hash, &packet, receiving_interface, rng)
396 }
397 constants::PACKET_TYPE_PROOF => self.handle_link_proof(&dest_hash, &packet, rng),
398 constants::PACKET_TYPE_DATA => {
399 self.handle_link_data(&dest_hash, &packet, packet_hash, receiving_interface, rng)
400 }
401 _ => Vec::new(),
402 }
403 }
404
405 fn handle_linkrequest(
407 &mut self,
408 dest_hash: &[u8; 16],
409 packet: &RawPacket,
410 receiving_interface: rns_core::transport::types::InterfaceId,
411 rng: &mut dyn Rng,
412 ) -> Vec<LinkManagerAction> {
413 let ld = match self.link_destinations.get(dest_hash) {
415 Some(ld) => ld,
416 None => return Vec::new(),
417 };
418
419 let hashable = packet.get_hashable_part();
420 let now = time::now();
421
422 let (engine, lrproof_data) = match LinkEngine::new_responder(
424 &ld.sig_prv,
425 &ld.sig_pub_bytes,
426 &packet.data,
427 &hashable,
428 dest_hash,
429 packet.hops,
430 now,
431 rng,
432 ) {
433 Ok(r) => r,
434 Err(e) => {
435 log::debug!("LINKREQUEST rejected: {}", e);
436 return Vec::new();
437 }
438 };
439
440 let link_id = *engine.link_id();
441 log::debug!(
442 "LINKREQUEST accepted: link={:02x?} iface={} header_type={} transport_id_present={} hops={}",
443 &link_id[..4],
444 receiving_interface.0,
445 packet.flags.header_type,
446 packet.transport_id.is_some(),
447 packet.hops
448 );
449
450 let managed = ManagedLink {
451 engine,
452 channel: None,
453 pending_channel_packets: HashMap::new(),
454 channel_send_ok: 0,
455 channel_send_not_ready: 0,
456 channel_send_too_big: 0,
457 channel_send_other_error: 0,
458 channel_messages_received: 0,
459 channel_proofs_sent: 0,
460 channel_proofs_received: 0,
461 dest_hash: *dest_hash,
462 remote_identity: None,
463 dest_sig_pub_bytes: None,
464 incoming_resources: Vec::new(),
465 outgoing_resources: Vec::new(),
466 resource_strategy: ld.resource_strategy,
467 route_interface: Some(receiving_interface),
468 route_transport_id: if packet.flags.header_type == constants::HEADER_2 {
469 packet.transport_id
470 } else {
471 None
472 },
473 };
474 self.links.insert(link_id, managed);
475
476 let flags = PacketFlags {
478 header_type: constants::HEADER_1,
479 context_flag: constants::FLAG_UNSET,
480 transport_type: constants::TRANSPORT_BROADCAST,
481 destination_type: constants::DESTINATION_LINK,
482 packet_type: constants::PACKET_TYPE_PROOF,
483 };
484
485 let mut actions = Vec::new();
486
487 actions.push(LinkManagerAction::RegisterLinkDest { link_id });
489
490 if let Ok(pkt) = RawPacket::pack(
491 flags,
492 packet.hops,
493 &link_id,
494 None,
495 constants::CONTEXT_LRPROOF,
496 &lrproof_data,
497 ) {
498 log::debug!(
499 "LRPROOF queued: link={:02x?} route_iface={} route_tid_present={} hops={}",
500 &link_id[..4],
501 receiving_interface.0,
502 packet.transport_id.is_some(),
503 packet.hops
504 );
505 actions.push(LinkManagerAction::SendPacket {
506 raw: pkt.raw,
507 dest_type: constants::DESTINATION_LINK,
508 attached_interface: None,
509 });
510 }
511
512 if packet.hops != 0 {
515 if let Ok(pkt0) = RawPacket::pack(
516 flags,
517 0,
518 &link_id,
519 None,
520 constants::CONTEXT_LRPROOF,
521 &lrproof_data,
522 ) {
523 log::debug!(
524 "LRPROOF fallback queued: link={:02x?} route_iface={} hops=0",
525 &link_id[..4],
526 receiving_interface.0
527 );
528 actions.push(LinkManagerAction::SendPacket {
529 raw: pkt0.raw,
530 dest_type: constants::DESTINATION_LINK,
531 attached_interface: None,
532 });
533 }
534 }
535
536 if packet.hops < u8::MAX {
540 let hops_plus_one = packet.hops + 1;
541 if let Ok(pkt2) = RawPacket::pack(
542 flags,
543 hops_plus_one,
544 &link_id,
545 None,
546 constants::CONTEXT_LRPROOF,
547 &lrproof_data,
548 ) {
549 log::debug!(
550 "LRPROOF +1 queued: link={:02x?} route_iface={} hops={}",
551 &link_id[..4],
552 receiving_interface.0,
553 hops_plus_one
554 );
555 actions.push(LinkManagerAction::SendPacket {
556 raw: pkt2.raw,
557 dest_type: constants::DESTINATION_LINK,
558 attached_interface: None,
559 });
560 }
561 }
562
563 actions.push(LinkManagerAction::LinkRequestReceived {
565 link_id,
566 receiving_interface,
567 });
568
569 actions
570 }
571
572 fn handle_link_proof(
573 &mut self,
574 link_id: &LinkId,
575 packet: &RawPacket,
576 rng: &mut dyn Rng,
577 ) -> Vec<LinkManagerAction> {
578 if packet.data.len() < 32 {
579 return Vec::new();
580 }
581
582 let mut tracked_hash = [0u8; 32];
583 tracked_hash.copy_from_slice(&packet.data[..32]);
584
585 let Some(link) = self.links.get_mut(link_id) else {
586 return Vec::new();
587 };
588 let Some(sequence) = link.pending_channel_packets.remove(&tracked_hash) else {
589 return Vec::new();
590 };
591 link.channel_proofs_received += 1;
592 let Some(channel) = link.channel.as_mut() else {
593 return Vec::new();
594 };
595
596 let chan_actions = channel.packet_delivered(sequence);
597 let _ = channel;
598 let _ = link;
599 self.process_channel_actions(link_id, chan_actions, rng)
600 }
601
602 fn build_link_packet_proof(
603 &mut self,
604 link_id: &LinkId,
605 packet_hash: &[u8; 32],
606 ) -> Vec<LinkManagerAction> {
607 let dest_hash = match self.links.get(link_id) {
608 Some(link) => link.dest_hash,
609 None => return Vec::new(),
610 };
611 let Some(ld) = self.link_destinations.get(&dest_hash) else {
612 return Vec::new();
613 };
614 if let Some(link) = self.links.get_mut(link_id) {
615 link.channel_proofs_sent += 1;
616 }
617
618 let signature = ld.sig_prv.sign(packet_hash);
619 let mut proof_data = Vec::with_capacity(96);
620 proof_data.extend_from_slice(packet_hash);
621 proof_data.extend_from_slice(&signature);
622
623 let flags = PacketFlags {
624 header_type: constants::HEADER_1,
625 context_flag: constants::FLAG_UNSET,
626 transport_type: constants::TRANSPORT_BROADCAST,
627 destination_type: constants::DESTINATION_LINK,
628 packet_type: constants::PACKET_TYPE_PROOF,
629 };
630 if let Ok(pkt) = RawPacket::pack(
631 flags,
632 0,
633 link_id,
634 None,
635 constants::CONTEXT_NONE,
636 &proof_data,
637 ) {
638 vec![LinkManagerAction::SendPacket {
639 raw: pkt.raw,
640 dest_type: constants::DESTINATION_LINK,
641 attached_interface: None,
642 }]
643 } else {
644 Vec::new()
645 }
646 }
647
648 fn handle_lrproof(
650 &mut self,
651 link_id_bytes: &[u8; 16],
652 packet: &RawPacket,
653 receiving_interface: rns_core::transport::types::InterfaceId,
654 rng: &mut dyn Rng,
655 ) -> Vec<LinkManagerAction> {
656 let link = match self.links.get_mut(link_id_bytes) {
657 Some(l) => l,
658 None => return Vec::new(),
659 };
660
661 link.route_interface = Some(receiving_interface);
662 if packet.flags.header_type == constants::HEADER_2 {
663 if let Some(transport_id) = packet.transport_id {
664 link.route_transport_id = Some(transport_id);
665 }
666 }
667 log::debug!(
668 "LRPROOF received: link={:02x?} iface={} header_type={} transport_id_present={}",
669 &link_id_bytes[..4],
670 receiving_interface.0,
671 packet.flags.header_type,
672 packet.transport_id.is_some()
673 );
674
675 if link.engine.state() != LinkState::Pending || !link.engine.is_initiator() {
676 return Vec::new();
677 }
678
679 let dest_sig_pub_bytes = match link.dest_sig_pub_bytes {
681 Some(b) => b,
682 None => {
683 log::debug!("LRPROOF: no destination signing key available");
684 return Vec::new();
685 }
686 };
687
688 let now = time::now();
689 let (lrrtt_encrypted, link_actions) =
690 match link
691 .engine
692 .handle_lrproof(&packet.data, &dest_sig_pub_bytes, now, rng)
693 {
694 Ok(r) => r,
695 Err(e) => {
696 log::debug!("LRPROOF validation failed: {}", e);
697 return Vec::new();
698 }
699 };
700
701 let link_id = *link.engine.link_id();
702 let mut actions = Vec::new();
703
704 actions.extend(self.process_link_actions(&link_id, &link_actions));
706
707 let flags = PacketFlags {
709 header_type: constants::HEADER_1,
710 context_flag: constants::FLAG_UNSET,
711 transport_type: constants::TRANSPORT_BROADCAST,
712 destination_type: constants::DESTINATION_LINK,
713 packet_type: constants::PACKET_TYPE_DATA,
714 };
715
716 if let Ok(pkt) = RawPacket::pack(
717 flags,
718 0,
719 &link_id,
720 None,
721 constants::CONTEXT_LRRTT,
722 &lrrtt_encrypted,
723 ) {
724 actions.push(LinkManagerAction::SendPacket {
725 raw: pkt.raw,
726 dest_type: constants::DESTINATION_LINK,
727 attached_interface: None,
728 });
729 }
730
731 if let Some(link) = self.links.get_mut(&link_id) {
733 if link.engine.state() == LinkState::Active {
734 let rtt = link.engine.rtt().unwrap_or(1.0);
735 link.channel = Some(Channel::new(rtt));
736 }
737 }
738
739 actions
740 }
741
742 fn handle_link_data(
748 &mut self,
749 link_id_bytes: &[u8; 16],
750 packet: &RawPacket,
751 packet_hash: [u8; 32],
752 receiving_interface: rns_core::transport::types::InterfaceId,
753 rng: &mut dyn Rng,
754 ) -> Vec<LinkManagerAction> {
755 enum LinkDataResult {
757 Lrrtt {
758 link_id: LinkId,
759 link_actions: Vec<LinkAction>,
760 },
761 Identify {
762 link_id: LinkId,
763 link_actions: Vec<LinkAction>,
764 },
765 Keepalive {
766 link_id: LinkId,
767 inbound_actions: Vec<LinkAction>,
768 },
769 LinkClose {
770 link_id: LinkId,
771 teardown_actions: Vec<LinkAction>,
772 },
773 Channel {
774 link_id: LinkId,
775 inbound_actions: Vec<LinkAction>,
776 plaintext: Vec<u8>,
777 packet_hash: [u8; 32],
778 },
779 Request {
780 link_id: LinkId,
781 inbound_actions: Vec<LinkAction>,
782 plaintext: Vec<u8>,
783 request_id: [u8; 16],
784 },
785 Response {
786 link_id: LinkId,
787 inbound_actions: Vec<LinkAction>,
788 plaintext: Vec<u8>,
789 },
790 Generic {
791 link_id: LinkId,
792 inbound_actions: Vec<LinkAction>,
793 plaintext: Vec<u8>,
794 context: u8,
795 packet_hash: [u8; 32],
796 },
797 ResourceAdv {
799 link_id: LinkId,
800 inbound_actions: Vec<LinkAction>,
801 plaintext: Vec<u8>,
802 },
803 ResourceReq {
805 link_id: LinkId,
806 inbound_actions: Vec<LinkAction>,
807 plaintext: Vec<u8>,
808 },
809 ResourceHmu {
811 link_id: LinkId,
812 inbound_actions: Vec<LinkAction>,
813 plaintext: Vec<u8>,
814 },
815 ResourcePart {
817 link_id: LinkId,
818 inbound_actions: Vec<LinkAction>,
819 raw_data: Vec<u8>,
820 },
821 ResourcePrf {
823 link_id: LinkId,
824 inbound_actions: Vec<LinkAction>,
825 plaintext: Vec<u8>,
826 },
827 ResourceIcl {
829 link_id: LinkId,
830 inbound_actions: Vec<LinkAction>,
831 },
832 ResourceRcl {
834 link_id: LinkId,
835 inbound_actions: Vec<LinkAction>,
836 },
837 Error,
838 }
839
840 let result = {
841 let link = match self.links.get_mut(link_id_bytes) {
842 Some(l) => l,
843 None => return Vec::new(),
844 };
845
846 link.route_interface = Some(receiving_interface);
847 if packet.flags.header_type == constants::HEADER_2 {
848 if let Some(transport_id) = packet.transport_id {
849 link.route_transport_id = Some(transport_id);
850 }
851 }
852
853 match packet.context {
854 constants::CONTEXT_LRRTT => {
855 match link.engine.handle_lrrtt(&packet.data, time::now()) {
856 Ok(link_actions) => {
857 let link_id = *link.engine.link_id();
858 LinkDataResult::Lrrtt {
859 link_id,
860 link_actions,
861 }
862 }
863 Err(e) => {
864 log::debug!("LRRTT handling failed: {}", e);
865 LinkDataResult::Error
866 }
867 }
868 }
869 constants::CONTEXT_LINKIDENTIFY => {
870 match link.engine.handle_identify(&packet.data) {
871 Ok(link_actions) => {
872 let link_id = *link.engine.link_id();
873 link.remote_identity = link.engine.remote_identity().cloned();
874 LinkDataResult::Identify {
875 link_id,
876 link_actions,
877 }
878 }
879 Err(e) => {
880 log::debug!("LINKIDENTIFY failed: {}", e);
881 LinkDataResult::Error
882 }
883 }
884 }
885 constants::CONTEXT_KEEPALIVE => {
886 let inbound_actions = link.engine.record_inbound(time::now());
887 let link_id = *link.engine.link_id();
888 LinkDataResult::Keepalive {
889 link_id,
890 inbound_actions,
891 }
892 }
893 constants::CONTEXT_LINKCLOSE => {
894 let teardown_actions = link.engine.handle_teardown();
895 let link_id = *link.engine.link_id();
896 LinkDataResult::LinkClose {
897 link_id,
898 teardown_actions,
899 }
900 }
901 constants::CONTEXT_CHANNEL => match link.engine.decrypt(&packet.data) {
902 Ok(plaintext) => {
903 let inbound_actions = link.engine.record_inbound(time::now());
904 let link_id = *link.engine.link_id();
905 LinkDataResult::Channel {
906 link_id,
907 inbound_actions,
908 plaintext,
909 packet_hash,
910 }
911 }
912 Err(_) => LinkDataResult::Error,
913 },
914 constants::CONTEXT_REQUEST => match link.engine.decrypt(&packet.data) {
915 Ok(plaintext) => {
916 let inbound_actions = link.engine.record_inbound(time::now());
917 let link_id = *link.engine.link_id();
918 let request_id = packet.get_truncated_hash();
919 LinkDataResult::Request {
920 link_id,
921 inbound_actions,
922 plaintext,
923 request_id,
924 }
925 }
926 Err(_) => LinkDataResult::Error,
927 },
928 constants::CONTEXT_RESPONSE => match link.engine.decrypt(&packet.data) {
929 Ok(plaintext) => {
930 let inbound_actions = link.engine.record_inbound(time::now());
931 let link_id = *link.engine.link_id();
932 LinkDataResult::Response {
933 link_id,
934 inbound_actions,
935 plaintext,
936 }
937 }
938 Err(_) => LinkDataResult::Error,
939 },
940 constants::CONTEXT_RESOURCE_ADV => match link.engine.decrypt(&packet.data) {
942 Ok(plaintext) => {
943 let inbound_actions = link.engine.record_inbound(time::now());
944 let link_id = *link.engine.link_id();
945 LinkDataResult::ResourceAdv {
946 link_id,
947 inbound_actions,
948 plaintext,
949 }
950 }
951 Err(_) => LinkDataResult::Error,
952 },
953 constants::CONTEXT_RESOURCE_REQ => match link.engine.decrypt(&packet.data) {
954 Ok(plaintext) => {
955 let inbound_actions = link.engine.record_inbound(time::now());
956 let link_id = *link.engine.link_id();
957 LinkDataResult::ResourceReq {
958 link_id,
959 inbound_actions,
960 plaintext,
961 }
962 }
963 Err(_) => LinkDataResult::Error,
964 },
965 constants::CONTEXT_RESOURCE_HMU => match link.engine.decrypt(&packet.data) {
966 Ok(plaintext) => {
967 let inbound_actions = link.engine.record_inbound(time::now());
968 let link_id = *link.engine.link_id();
969 LinkDataResult::ResourceHmu {
970 link_id,
971 inbound_actions,
972 plaintext,
973 }
974 }
975 Err(_) => LinkDataResult::Error,
976 },
977 constants::CONTEXT_RESOURCE => {
978 let inbound_actions = link.engine.record_inbound(time::now());
980 let link_id = *link.engine.link_id();
981 LinkDataResult::ResourcePart {
982 link_id,
983 inbound_actions,
984 raw_data: packet.data.clone(),
985 }
986 }
987 constants::CONTEXT_RESOURCE_PRF => match link.engine.decrypt(&packet.data) {
988 Ok(plaintext) => {
989 let inbound_actions = link.engine.record_inbound(time::now());
990 let link_id = *link.engine.link_id();
991 LinkDataResult::ResourcePrf {
992 link_id,
993 inbound_actions,
994 plaintext,
995 }
996 }
997 Err(_) => LinkDataResult::Error,
998 },
999 constants::CONTEXT_RESOURCE_ICL => {
1000 let _ = link.engine.decrypt(&packet.data); let inbound_actions = link.engine.record_inbound(time::now());
1002 let link_id = *link.engine.link_id();
1003 LinkDataResult::ResourceIcl {
1004 link_id,
1005 inbound_actions,
1006 }
1007 }
1008 constants::CONTEXT_RESOURCE_RCL => {
1009 let _ = link.engine.decrypt(&packet.data); let inbound_actions = link.engine.record_inbound(time::now());
1011 let link_id = *link.engine.link_id();
1012 LinkDataResult::ResourceRcl {
1013 link_id,
1014 inbound_actions,
1015 }
1016 }
1017 _ => match link.engine.decrypt(&packet.data) {
1018 Ok(plaintext) => {
1019 let inbound_actions = link.engine.record_inbound(time::now());
1020 let link_id = *link.engine.link_id();
1021 LinkDataResult::Generic {
1022 link_id,
1023 inbound_actions,
1024 plaintext,
1025 context: packet.context,
1026 packet_hash,
1027 }
1028 }
1029 Err(_) => LinkDataResult::Error,
1030 },
1031 }
1032 }; let mut actions = Vec::new();
1036 match result {
1037 LinkDataResult::Lrrtt {
1038 link_id,
1039 link_actions,
1040 } => {
1041 actions.extend(self.process_link_actions(&link_id, &link_actions));
1042 if let Some(link) = self.links.get_mut(&link_id) {
1044 if link.engine.state() == LinkState::Active {
1045 let rtt = link.engine.rtt().unwrap_or(1.0);
1046 link.channel = Some(Channel::new(rtt));
1047 }
1048 }
1049 }
1050 LinkDataResult::Identify {
1051 link_id,
1052 link_actions,
1053 } => {
1054 actions.extend(self.process_link_actions(&link_id, &link_actions));
1055 }
1056 LinkDataResult::Keepalive {
1057 link_id,
1058 inbound_actions,
1059 } => {
1060 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1061 }
1067 LinkDataResult::LinkClose {
1068 link_id,
1069 teardown_actions,
1070 } => {
1071 actions.extend(self.process_link_actions(&link_id, &teardown_actions));
1072 }
1073 LinkDataResult::Channel {
1074 link_id,
1075 inbound_actions,
1076 plaintext,
1077 packet_hash,
1078 } => {
1079 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1080 if let Some(link) = self.links.get_mut(&link_id) {
1082 if let Some(ref mut channel) = link.channel {
1083 let chan_actions = channel.receive(&plaintext, time::now());
1084 link.channel_messages_received += chan_actions
1085 .iter()
1086 .filter(|action| {
1087 matches!(
1088 action,
1089 rns_core::channel::ChannelAction::MessageReceived { .. }
1090 )
1091 })
1092 .count()
1093 as u64;
1094 let _ = link;
1096 actions.extend(self.process_channel_actions(&link_id, chan_actions, rng));
1097 }
1098 }
1099 actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1100 }
1101 LinkDataResult::Request {
1102 link_id,
1103 inbound_actions,
1104 plaintext,
1105 request_id,
1106 } => {
1107 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1108 actions.extend(self.handle_request(&link_id, &plaintext, request_id, rng));
1109 }
1110 LinkDataResult::Response {
1111 link_id,
1112 inbound_actions,
1113 plaintext,
1114 } => {
1115 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1116 actions.extend(self.handle_response(&link_id, &plaintext));
1118 }
1119 LinkDataResult::Generic {
1120 link_id,
1121 inbound_actions,
1122 plaintext,
1123 context,
1124 packet_hash,
1125 } => {
1126 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1127 actions.push(LinkManagerAction::LinkDataReceived {
1128 link_id,
1129 context,
1130 data: plaintext,
1131 });
1132
1133 actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1134 }
1135 LinkDataResult::ResourceAdv {
1136 link_id,
1137 inbound_actions,
1138 plaintext,
1139 } => {
1140 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1141 actions.extend(self.handle_resource_adv(&link_id, &plaintext, rng));
1142 }
1143 LinkDataResult::ResourceReq {
1144 link_id,
1145 inbound_actions,
1146 plaintext,
1147 } => {
1148 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1149 actions.extend(self.handle_resource_req(&link_id, &plaintext, rng));
1150 }
1151 LinkDataResult::ResourceHmu {
1152 link_id,
1153 inbound_actions,
1154 plaintext,
1155 } => {
1156 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1157 actions.extend(self.handle_resource_hmu(&link_id, &plaintext, rng));
1158 }
1159 LinkDataResult::ResourcePart {
1160 link_id,
1161 inbound_actions,
1162 raw_data,
1163 } => {
1164 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1165 actions.extend(self.handle_resource_part(&link_id, &raw_data, rng));
1166 }
1167 LinkDataResult::ResourcePrf {
1168 link_id,
1169 inbound_actions,
1170 plaintext,
1171 } => {
1172 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1173 actions.extend(self.handle_resource_prf(&link_id, &plaintext));
1174 }
1175 LinkDataResult::ResourceIcl {
1176 link_id,
1177 inbound_actions,
1178 } => {
1179 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1180 actions.extend(self.handle_resource_icl(&link_id));
1181 }
1182 LinkDataResult::ResourceRcl {
1183 link_id,
1184 inbound_actions,
1185 } => {
1186 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1187 actions.extend(self.handle_resource_rcl(&link_id));
1188 }
1189 LinkDataResult::Error => {}
1190 }
1191
1192 actions
1193 }
1194
1195 fn handle_request(
1197 &mut self,
1198 link_id: &LinkId,
1199 plaintext: &[u8],
1200 request_id: [u8; 16],
1201 rng: &mut dyn Rng,
1202 ) -> Vec<LinkManagerAction> {
1203 use rns_core::msgpack::{self, Value};
1204
1205 let arr = match msgpack::unpack_exact(plaintext) {
1207 Ok(Value::Array(arr)) if arr.len() >= 3 => arr,
1208 _ => return Vec::new(),
1209 };
1210
1211 let path_hash_bytes = match &arr[1] {
1212 Value::Bin(b) if b.len() == 16 => b,
1213 _ => return Vec::new(),
1214 };
1215 let mut path_hash = [0u8; 16];
1216 path_hash.copy_from_slice(path_hash_bytes);
1217
1218 let request_data = msgpack::pack(&arr[2]);
1220
1221 if self.management_paths.contains(&path_hash) {
1223 let remote_identity = self
1224 .links
1225 .get(link_id)
1226 .and_then(|l| l.remote_identity)
1227 .map(|(h, k)| (h, k));
1228 return vec![LinkManagerAction::ManagementRequest {
1229 link_id: *link_id,
1230 path_hash,
1231 data: request_data,
1232 request_id,
1233 remote_identity,
1234 }];
1235 }
1236
1237 let handler_idx = self
1239 .request_handlers
1240 .iter()
1241 .position(|h| h.path_hash == path_hash);
1242 let handler_idx = match handler_idx {
1243 Some(i) => i,
1244 None => return Vec::new(),
1245 };
1246
1247 let remote_identity = self
1249 .links
1250 .get(link_id)
1251 .and_then(|l| l.remote_identity.as_ref());
1252 let handler = &self.request_handlers[handler_idx];
1253 if let Some(ref allowed) = handler.allowed_list {
1254 match remote_identity {
1255 Some((identity_hash, _)) => {
1256 if !allowed.contains(identity_hash) {
1257 log::debug!("Request denied: identity not in allowed list");
1258 return Vec::new();
1259 }
1260 }
1261 None => {
1262 log::debug!("Request denied: peer not identified");
1263 return Vec::new();
1264 }
1265 }
1266 }
1267
1268 let path = handler.path.clone();
1270 let response = (handler.handler)(*link_id, &path, &request_data, remote_identity);
1271
1272 let mut actions = Vec::new();
1273 if let Some(response_data) = response {
1274 let mut response_actions =
1275 self.build_response_packet(link_id, &request_id, &response_data, rng);
1276 if response_actions.is_empty() {
1277 response_actions.extend(self.send_response_resource(
1278 link_id,
1279 &request_id,
1280 &response_data,
1281 rng,
1282 ));
1283 }
1284 actions.extend(response_actions);
1285 }
1286
1287 actions
1288 }
1289
1290 fn build_response_packet(
1293 &self,
1294 link_id: &LinkId,
1295 request_id: &[u8; 16],
1296 response_data: &[u8],
1297 rng: &mut dyn Rng,
1298 ) -> Vec<LinkManagerAction> {
1299 use rns_core::msgpack::{self, Value};
1300
1301 let response_value = msgpack::unpack_exact(response_data)
1302 .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1303
1304 let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1305 let response_plaintext = msgpack::pack(&response_array);
1306
1307 let mut actions = Vec::new();
1308 if let Some(link) = self.links.get(link_id) {
1309 if let Ok(encrypted) = link.engine.encrypt(&response_plaintext, rng) {
1310 let flags = PacketFlags {
1311 header_type: constants::HEADER_1,
1312 context_flag: constants::FLAG_UNSET,
1313 transport_type: constants::TRANSPORT_BROADCAST,
1314 destination_type: constants::DESTINATION_LINK,
1315 packet_type: constants::PACKET_TYPE_DATA,
1316 };
1317 let max_mtu = link.engine.mtu() as usize;
1318 if let Ok(pkt) = RawPacket::pack_with_max_mtu(
1319 flags,
1320 0,
1321 link_id,
1322 None,
1323 constants::CONTEXT_RESPONSE,
1324 &encrypted,
1325 max_mtu,
1326 ) {
1327 actions.push(LinkManagerAction::SendPacket {
1328 raw: pkt.raw,
1329 dest_type: constants::DESTINATION_LINK,
1330 attached_interface: None,
1331 });
1332 }
1333 }
1334 }
1335 actions
1336 }
1337
1338 fn send_response_resource(
1339 &mut self,
1340 link_id: &LinkId,
1341 request_id: &[u8; 16],
1342 response_data: &[u8],
1343 rng: &mut dyn Rng,
1344 ) -> Vec<LinkManagerAction> {
1345 use rns_core::msgpack::{self, Value};
1346
1347 let link = match self.links.get_mut(link_id) {
1348 Some(l) => l,
1349 None => return Vec::new(),
1350 };
1351
1352 if link.engine.state() != LinkState::Active {
1353 return Vec::new();
1354 }
1355
1356 let link_rtt = link.engine.rtt().unwrap_or(1.0);
1357 let resource_sdu = Self::resource_sdu_for_link(link);
1358 let now = time::now();
1359
1360 let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1361 let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1362 link.engine
1363 .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1364 .unwrap_or_else(|_| plaintext.to_vec())
1365 };
1366
1367 let response_value = msgpack::unpack_exact(response_data)
1371 .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1372 let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1373 let resource_payload = msgpack::pack(&response_array);
1374
1375 let sender = match ResourceSender::new(
1376 &resource_payload,
1377 None,
1378 resource_sdu,
1379 &encrypt_fn,
1380 &Bzip2Compressor,
1381 rng,
1382 now,
1383 true, true, Some(request_id.to_vec()),
1386 1, 1, None, link_rtt,
1390 6.0, ) {
1392 Ok(s) => s,
1393 Err(e) => {
1394 log::debug!("Failed to create response ResourceSender: {}", e);
1395 return Vec::new();
1396 }
1397 };
1398
1399 let mut sender = sender;
1400 let adv_actions = sender.advertise(now);
1401 link.outgoing_resources.push(sender);
1402
1403 let _ = link;
1404 self.process_resource_actions(link_id, adv_actions, rng)
1405 }
1406
1407 pub fn send_management_response(
1410 &mut self,
1411 link_id: &LinkId,
1412 request_id: &[u8; 16],
1413 response_data: &[u8],
1414 rng: &mut dyn Rng,
1415 ) -> Vec<LinkManagerAction> {
1416 let mut actions = self.build_response_packet(link_id, request_id, response_data, rng);
1417 if actions.is_empty() {
1418 actions.extend(self.send_response_resource(link_id, request_id, response_data, rng));
1419 }
1420 actions
1421 }
1422
1423 pub fn send_request(
1431 &self,
1432 link_id: &LinkId,
1433 path: &str,
1434 data: &[u8],
1435 rng: &mut dyn Rng,
1436 ) -> Vec<LinkManagerAction> {
1437 use rns_core::msgpack::{self, Value};
1438
1439 let link = match self.links.get(link_id) {
1440 Some(l) => l,
1441 None => return Vec::new(),
1442 };
1443
1444 if link.engine.state() != LinkState::Active {
1445 return Vec::new();
1446 }
1447
1448 let path_hash = compute_path_hash(path);
1449
1450 let data_value = msgpack::unpack_exact(data).unwrap_or_else(|_| Value::Bin(data.to_vec()));
1452
1453 let request_array = Value::Array(vec![
1455 Value::Float(time::now()),
1456 Value::Bin(path_hash.to_vec()),
1457 data_value,
1458 ]);
1459 let plaintext = msgpack::pack(&request_array);
1460
1461 let encrypted = match link.engine.encrypt(&plaintext, rng) {
1462 Ok(e) => e,
1463 Err(_) => return Vec::new(),
1464 };
1465
1466 let flags = PacketFlags {
1467 header_type: constants::HEADER_1,
1468 context_flag: constants::FLAG_UNSET,
1469 transport_type: constants::TRANSPORT_BROADCAST,
1470 destination_type: constants::DESTINATION_LINK,
1471 packet_type: constants::PACKET_TYPE_DATA,
1472 };
1473
1474 let mut actions = Vec::new();
1475 if let Ok(pkt) = RawPacket::pack(
1476 flags,
1477 0,
1478 link_id,
1479 None,
1480 constants::CONTEXT_REQUEST,
1481 &encrypted,
1482 ) {
1483 actions.push(LinkManagerAction::SendPacket {
1484 raw: pkt.raw,
1485 dest_type: constants::DESTINATION_LINK,
1486 attached_interface: None,
1487 });
1488 }
1489 actions
1490 }
1491
1492 pub fn send_on_link(
1494 &self,
1495 link_id: &LinkId,
1496 plaintext: &[u8],
1497 context: u8,
1498 rng: &mut dyn Rng,
1499 ) -> Vec<LinkManagerAction> {
1500 let link = match self.links.get(link_id) {
1501 Some(l) => l,
1502 None => return Vec::new(),
1503 };
1504
1505 if link.engine.state() != LinkState::Active {
1506 return Vec::new();
1507 }
1508
1509 let encrypted = match link.engine.encrypt(plaintext, rng) {
1510 Ok(e) => e,
1511 Err(_) => return Vec::new(),
1512 };
1513
1514 let flags = PacketFlags {
1515 header_type: constants::HEADER_1,
1516 context_flag: constants::FLAG_UNSET,
1517 transport_type: constants::TRANSPORT_BROADCAST,
1518 destination_type: constants::DESTINATION_LINK,
1519 packet_type: constants::PACKET_TYPE_DATA,
1520 };
1521
1522 let mut actions = Vec::new();
1523 if let Ok(pkt) = RawPacket::pack(flags, 0, link_id, None, context, &encrypted) {
1524 actions.push(LinkManagerAction::SendPacket {
1525 raw: pkt.raw,
1526 dest_type: constants::DESTINATION_LINK,
1527 attached_interface: None,
1528 });
1529 }
1530 actions
1531 }
1532
1533 pub fn identify(
1535 &self,
1536 link_id: &LinkId,
1537 identity: &rns_crypto::identity::Identity,
1538 rng: &mut dyn Rng,
1539 ) -> Vec<LinkManagerAction> {
1540 let link = match self.links.get(link_id) {
1541 Some(l) => l,
1542 None => return Vec::new(),
1543 };
1544
1545 let encrypted = match link.engine.build_identify(identity, rng) {
1546 Ok(e) => e,
1547 Err(_) => return Vec::new(),
1548 };
1549
1550 let flags = PacketFlags {
1551 header_type: constants::HEADER_1,
1552 context_flag: constants::FLAG_UNSET,
1553 transport_type: constants::TRANSPORT_BROADCAST,
1554 destination_type: constants::DESTINATION_LINK,
1555 packet_type: constants::PACKET_TYPE_DATA,
1556 };
1557
1558 let mut actions = Vec::new();
1559 if let Ok(pkt) = RawPacket::pack(
1560 flags,
1561 0,
1562 link_id,
1563 None,
1564 constants::CONTEXT_LINKIDENTIFY,
1565 &encrypted,
1566 ) {
1567 actions.push(LinkManagerAction::SendPacket {
1568 raw: pkt.raw,
1569 dest_type: constants::DESTINATION_LINK,
1570 attached_interface: None,
1571 });
1572 }
1573 actions
1574 }
1575
1576 pub fn teardown_link(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1578 let link = match self.links.get_mut(link_id) {
1579 Some(l) => l,
1580 None => return Vec::new(),
1581 };
1582
1583 let teardown_actions = link.engine.teardown();
1584 if let Some(ref mut channel) = link.channel {
1585 channel.shutdown();
1586 }
1587
1588 let mut actions = self.process_link_actions(link_id, &teardown_actions);
1589
1590 let flags = PacketFlags {
1592 header_type: constants::HEADER_1,
1593 context_flag: constants::FLAG_UNSET,
1594 transport_type: constants::TRANSPORT_BROADCAST,
1595 destination_type: constants::DESTINATION_LINK,
1596 packet_type: constants::PACKET_TYPE_DATA,
1597 };
1598 if let Ok(pkt) = RawPacket::pack(flags, 0, link_id, None, constants::CONTEXT_LINKCLOSE, &[])
1599 {
1600 actions.push(LinkManagerAction::SendPacket {
1601 raw: pkt.raw,
1602 dest_type: constants::DESTINATION_LINK,
1603 attached_interface: None,
1604 });
1605 }
1606
1607 actions
1608 }
1609
1610 pub fn teardown_all_links(&mut self) -> Vec<LinkManagerAction> {
1612 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
1613 let mut actions = Vec::new();
1614 for link_id in link_ids {
1615 actions.extend(self.teardown_link(&link_id));
1616 }
1617 actions
1618 }
1619
1620 fn handle_response(&self, link_id: &LinkId, plaintext: &[u8]) -> Vec<LinkManagerAction> {
1622 use rns_core::msgpack;
1623
1624 let arr = match msgpack::unpack_exact(plaintext) {
1626 Ok(msgpack::Value::Array(arr)) if arr.len() >= 2 => arr,
1627 _ => return Vec::new(),
1628 };
1629
1630 let request_id_bytes = match &arr[0] {
1631 msgpack::Value::Bin(b) if b.len() == 16 => b,
1632 _ => return Vec::new(),
1633 };
1634 let mut request_id = [0u8; 16];
1635 request_id.copy_from_slice(request_id_bytes);
1636
1637 let response_data = msgpack::pack(&arr[1]);
1638
1639 vec![LinkManagerAction::ResponseReceived {
1640 link_id: *link_id,
1641 request_id,
1642 data: response_data,
1643 }]
1644 }
1645
1646 fn handle_resource_adv(
1648 &mut self,
1649 link_id: &LinkId,
1650 adv_plaintext: &[u8],
1651 rng: &mut dyn Rng,
1652 ) -> Vec<LinkManagerAction> {
1653 let link = match self.links.get_mut(link_id) {
1654 Some(l) => l,
1655 None => return Vec::new(),
1656 };
1657
1658 let link_rtt = link.engine.rtt().unwrap_or(1.0);
1659 let resource_sdu = Self::resource_sdu_for_link(link);
1660 let now = time::now();
1661
1662 let receiver = match ResourceReceiver::from_advertisement(
1663 adv_plaintext,
1664 resource_sdu,
1665 link_rtt,
1666 now,
1667 None,
1668 None,
1669 ) {
1670 Ok(r) => r,
1671 Err(e) => {
1672 log::debug!("Resource ADV rejected: {}", e);
1673 return Vec::new();
1674 }
1675 };
1676
1677 let strategy = link.resource_strategy;
1678 let resource_hash = receiver.resource_hash.clone();
1679 let transfer_size = receiver.transfer_size;
1680 let has_metadata = receiver.has_metadata;
1681 let is_response = receiver.flags.is_response;
1682
1683 if is_response {
1684 link.incoming_resources.push(receiver);
1687 let idx = link.incoming_resources.len() - 1;
1688 let resource_actions = link.incoming_resources[idx].accept(now);
1689 let _ = link;
1690 return self.process_resource_actions(link_id, resource_actions, rng);
1691 }
1692
1693 match strategy {
1694 ResourceStrategy::AcceptNone => {
1695 let reject_actions = {
1697 let mut r = receiver;
1698 r.reject()
1699 };
1700 self.process_resource_actions(link_id, reject_actions, rng)
1701 }
1702 ResourceStrategy::AcceptAll => {
1703 link.incoming_resources.push(receiver);
1704 let idx = link.incoming_resources.len() - 1;
1705 let resource_actions = link.incoming_resources[idx].accept(now);
1706 let _ = link;
1707 self.process_resource_actions(link_id, resource_actions, rng)
1708 }
1709 ResourceStrategy::AcceptApp => {
1710 link.incoming_resources.push(receiver);
1711 vec![LinkManagerAction::ResourceAcceptQuery {
1713 link_id: *link_id,
1714 resource_hash,
1715 transfer_size,
1716 has_metadata,
1717 }]
1718 }
1719 }
1720 }
1721
1722 pub fn accept_resource(
1724 &mut self,
1725 link_id: &LinkId,
1726 resource_hash: &[u8],
1727 accept: bool,
1728 rng: &mut dyn Rng,
1729 ) -> Vec<LinkManagerAction> {
1730 let link = match self.links.get_mut(link_id) {
1731 Some(l) => l,
1732 None => return Vec::new(),
1733 };
1734
1735 let now = time::now();
1736 let idx = link
1737 .incoming_resources
1738 .iter()
1739 .position(|r| r.resource_hash == resource_hash);
1740 let idx = match idx {
1741 Some(i) => i,
1742 None => return Vec::new(),
1743 };
1744
1745 let resource_actions = if accept {
1746 link.incoming_resources[idx].accept(now)
1747 } else {
1748 link.incoming_resources[idx].reject()
1749 };
1750
1751 let _ = link;
1752 self.process_resource_actions(link_id, resource_actions, rng)
1753 }
1754
1755 fn handle_resource_req(
1757 &mut self,
1758 link_id: &LinkId,
1759 plaintext: &[u8],
1760 rng: &mut dyn Rng,
1761 ) -> Vec<LinkManagerAction> {
1762 let link = match self.links.get_mut(link_id) {
1763 Some(l) => l,
1764 None => return Vec::new(),
1765 };
1766
1767 let now = time::now();
1768 let mut all_actions = Vec::new();
1769 for sender in &mut link.outgoing_resources {
1770 let resource_actions = sender.handle_request(plaintext, now);
1771 if !resource_actions.is_empty() {
1772 all_actions.extend(resource_actions);
1773 break;
1774 }
1775 }
1776
1777 let _ = link;
1778 self.process_resource_actions(link_id, all_actions, rng)
1779 }
1780
1781 fn handle_resource_hmu(
1783 &mut self,
1784 link_id: &LinkId,
1785 plaintext: &[u8],
1786 rng: &mut dyn Rng,
1787 ) -> Vec<LinkManagerAction> {
1788 let link = match self.links.get_mut(link_id) {
1789 Some(l) => l,
1790 None => return Vec::new(),
1791 };
1792
1793 let now = time::now();
1794 let mut all_actions = Vec::new();
1795 for receiver in &mut link.incoming_resources {
1796 let resource_actions = receiver.handle_hashmap_update(plaintext, now);
1797 if !resource_actions.is_empty() {
1798 all_actions.extend(resource_actions);
1799 break;
1800 }
1801 }
1802
1803 let _ = link;
1804 self.process_resource_actions(link_id, all_actions, rng)
1805 }
1806
1807 fn handle_resource_part(
1809 &mut self,
1810 link_id: &LinkId,
1811 raw_data: &[u8],
1812 rng: &mut dyn Rng,
1813 ) -> Vec<LinkManagerAction> {
1814 let link = match self.links.get_mut(link_id) {
1815 Some(l) => l,
1816 None => return Vec::new(),
1817 };
1818
1819 let now = time::now();
1820 let mut all_actions = Vec::new();
1821 let mut assemble_idx = None;
1822 let mut assembled_is_response = false;
1823
1824 for (idx, receiver) in link.incoming_resources.iter_mut().enumerate() {
1825 let resource_actions = receiver.receive_part(raw_data, now);
1826 if !resource_actions.is_empty() {
1827 if receiver.received_count == receiver.total_parts {
1828 assemble_idx = Some(idx);
1829 }
1830 all_actions.extend(resource_actions);
1831 break;
1832 }
1833 }
1834
1835 if let Some(idx) = assemble_idx {
1836 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
1837 link.engine.decrypt(ciphertext).map_err(|_| ())
1838 };
1839 let assemble_actions =
1840 link.incoming_resources[idx].assemble(&decrypt_fn, &Bzip2Compressor);
1841 assembled_is_response = link.incoming_resources[idx].flags.is_response;
1842 all_actions.extend(assemble_actions);
1843 }
1844
1845 let _ = link;
1846 let mut out = self.process_resource_actions(link_id, all_actions, rng);
1847
1848 if assembled_is_response {
1849 let mut converted = Vec::new();
1850 for action in out {
1851 match action {
1852 LinkManagerAction::ResourceReceived { data, .. } => {
1853 converted.extend(self.handle_response(link_id, &data));
1854 }
1855 LinkManagerAction::ResourceAcceptQuery { .. } => {
1856 }
1858 other => converted.push(other),
1859 }
1860 }
1861 out = converted;
1862 }
1863
1864 out
1865 }
1866
1867 fn handle_resource_prf(
1869 &mut self,
1870 link_id: &LinkId,
1871 plaintext: &[u8],
1872 ) -> Vec<LinkManagerAction> {
1873 let link = match self.links.get_mut(link_id) {
1874 Some(l) => l,
1875 None => return Vec::new(),
1876 };
1877
1878 let now = time::now();
1879 let mut result_actions = Vec::new();
1880 for sender in &mut link.outgoing_resources {
1881 let resource_actions = sender.handle_proof(plaintext, now);
1882 if !resource_actions.is_empty() {
1883 result_actions.extend(resource_actions);
1884 break;
1885 }
1886 }
1887
1888 let mut actions = Vec::new();
1890 for ra in result_actions {
1891 match ra {
1892 ResourceAction::Completed => {
1893 actions.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
1894 }
1895 ResourceAction::Failed(e) => {
1896 actions.push(LinkManagerAction::ResourceFailed {
1897 link_id: *link_id,
1898 error: format!("{}", e),
1899 });
1900 }
1901 _ => {}
1902 }
1903 }
1904
1905 link.outgoing_resources
1907 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
1908
1909 actions
1910 }
1911
1912 fn handle_resource_icl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1914 let link = match self.links.get_mut(link_id) {
1915 Some(l) => l,
1916 None => return Vec::new(),
1917 };
1918
1919 let mut actions = Vec::new();
1920 for receiver in &mut link.incoming_resources {
1921 let ra = receiver.handle_cancel();
1922 for a in ra {
1923 if let ResourceAction::Failed(ref e) = a {
1924 actions.push(LinkManagerAction::ResourceFailed {
1925 link_id: *link_id,
1926 error: format!("{}", e),
1927 });
1928 }
1929 }
1930 }
1931 link.incoming_resources
1932 .retain(|r| r.status < rns_core::resource::ResourceStatus::Complete);
1933 actions
1934 }
1935
1936 fn handle_resource_rcl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1938 let link = match self.links.get_mut(link_id) {
1939 Some(l) => l,
1940 None => return Vec::new(),
1941 };
1942
1943 let mut actions = Vec::new();
1944 for sender in &mut link.outgoing_resources {
1945 let ra = sender.handle_reject();
1946 for a in ra {
1947 if let ResourceAction::Failed(ref e) = a {
1948 actions.push(LinkManagerAction::ResourceFailed {
1949 link_id: *link_id,
1950 error: format!("{}", e),
1951 });
1952 }
1953 }
1954 }
1955 link.outgoing_resources
1956 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
1957 actions
1958 }
1959
1960 fn process_resource_actions(
1962 &mut self,
1963 link_id: &LinkId,
1964 actions: Vec<ResourceAction>,
1965 rng: &mut dyn Rng,
1966 ) -> Vec<LinkManagerAction> {
1967 let mut result = Vec::new();
1968 for action in actions {
1969 match action {
1970 ResourceAction::SendAdvertisement(data) => {
1971 let encrypted = self
1973 .links
1974 .get(link_id)
1975 .and_then(|link| link.engine.encrypt(&data, rng).ok());
1976 if let Some(encrypted) = encrypted {
1977 result.extend(self.build_link_packet(
1978 link_id,
1979 constants::CONTEXT_RESOURCE_ADV,
1980 &encrypted,
1981 ));
1982 }
1983 }
1984 ResourceAction::SendPart(data) => {
1985 result.extend(self.build_link_packet(
1987 link_id,
1988 constants::CONTEXT_RESOURCE,
1989 &data,
1990 ));
1991 }
1992 ResourceAction::SendRequest(data) => {
1993 let encrypted = self
1994 .links
1995 .get(link_id)
1996 .and_then(|link| link.engine.encrypt(&data, rng).ok());
1997 if let Some(encrypted) = encrypted {
1998 result.extend(self.build_link_packet(
1999 link_id,
2000 constants::CONTEXT_RESOURCE_REQ,
2001 &encrypted,
2002 ));
2003 }
2004 }
2005 ResourceAction::SendHmu(data) => {
2006 let encrypted = self
2007 .links
2008 .get(link_id)
2009 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2010 if let Some(encrypted) = encrypted {
2011 result.extend(self.build_link_packet(
2012 link_id,
2013 constants::CONTEXT_RESOURCE_HMU,
2014 &encrypted,
2015 ));
2016 }
2017 }
2018 ResourceAction::SendProof(data) => {
2019 let encrypted = self
2020 .links
2021 .get(link_id)
2022 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2023 if let Some(encrypted) = encrypted {
2024 result.extend(self.build_link_packet(
2025 link_id,
2026 constants::CONTEXT_RESOURCE_PRF,
2027 &encrypted,
2028 ));
2029 }
2030 }
2031 ResourceAction::SendCancelInitiator(data) => {
2032 let encrypted = self
2033 .links
2034 .get(link_id)
2035 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2036 if let Some(encrypted) = encrypted {
2037 result.extend(self.build_link_packet(
2038 link_id,
2039 constants::CONTEXT_RESOURCE_ICL,
2040 &encrypted,
2041 ));
2042 }
2043 }
2044 ResourceAction::SendCancelReceiver(data) => {
2045 let encrypted = self
2046 .links
2047 .get(link_id)
2048 .and_then(|link| link.engine.encrypt(&data, rng).ok());
2049 if let Some(encrypted) = encrypted {
2050 result.extend(self.build_link_packet(
2051 link_id,
2052 constants::CONTEXT_RESOURCE_RCL,
2053 &encrypted,
2054 ));
2055 }
2056 }
2057 ResourceAction::DataReceived { data, metadata } => {
2058 result.push(LinkManagerAction::ResourceReceived {
2059 link_id: *link_id,
2060 data,
2061 metadata,
2062 });
2063 }
2064 ResourceAction::Completed => {
2065 result.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2066 }
2067 ResourceAction::Failed(e) => {
2068 result.push(LinkManagerAction::ResourceFailed {
2069 link_id: *link_id,
2070 error: format!("{}", e),
2071 });
2072 }
2073 ResourceAction::TeardownLink => {
2074 let teardown_actions = match self.links.get_mut(link_id) {
2075 Some(link) => link.engine.handle_teardown(),
2076 None => Vec::new(),
2077 };
2078 result.extend(self.process_link_actions(link_id, &teardown_actions));
2079 }
2080 ResourceAction::ProgressUpdate { received, total } => {
2081 result.push(LinkManagerAction::ResourceProgress {
2082 link_id: *link_id,
2083 received,
2084 total,
2085 });
2086 }
2087 }
2088 }
2089 result
2090 }
2091
2092 fn build_link_packet(
2094 &self,
2095 link_id: &LinkId,
2096 context: u8,
2097 data: &[u8],
2098 ) -> Vec<LinkManagerAction> {
2099 let flags = PacketFlags {
2100 header_type: constants::HEADER_1,
2101 context_flag: constants::FLAG_UNSET,
2102 transport_type: constants::TRANSPORT_BROADCAST,
2103 destination_type: constants::DESTINATION_LINK,
2104 packet_type: constants::PACKET_TYPE_DATA,
2105 };
2106 let mut actions = Vec::new();
2107 let max_mtu = self
2108 .links
2109 .get(link_id)
2110 .map(|l| l.engine.mtu() as usize)
2111 .unwrap_or(constants::MTU);
2112 if let Ok(pkt) =
2113 RawPacket::pack_with_max_mtu(flags, 0, link_id, None, context, data, max_mtu)
2114 {
2115 actions.push(LinkManagerAction::SendPacket {
2116 raw: pkt.raw,
2117 dest_type: constants::DESTINATION_LINK,
2118 attached_interface: None,
2119 });
2120 }
2121 actions
2122 }
2123
2124 pub fn send_resource(
2126 &mut self,
2127 link_id: &LinkId,
2128 data: &[u8],
2129 metadata: Option<&[u8]>,
2130 rng: &mut dyn Rng,
2131 ) -> Vec<LinkManagerAction> {
2132 let link = match self.links.get_mut(link_id) {
2133 Some(l) => l,
2134 None => return Vec::new(),
2135 };
2136
2137 if link.engine.state() != LinkState::Active {
2138 return Vec::new();
2139 }
2140
2141 let link_rtt = link.engine.rtt().unwrap_or(1.0);
2142 let resource_sdu = Self::resource_sdu_for_link(link);
2143 let now = time::now();
2144
2145 let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
2148 let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
2149 link.engine
2150 .encrypt(plaintext, &mut *enc_rng.borrow_mut())
2151 .unwrap_or_else(|_| plaintext.to_vec())
2152 };
2153
2154 let sender = match ResourceSender::new(
2155 data,
2156 metadata,
2157 resource_sdu,
2158 &encrypt_fn,
2159 &Bzip2Compressor,
2160 rng,
2161 now,
2162 true, false, None, 1, 1, None, link_rtt,
2169 6.0, ) {
2171 Ok(s) => s,
2172 Err(e) => {
2173 log::debug!("Failed to create ResourceSender: {}", e);
2174 return Vec::new();
2175 }
2176 };
2177
2178 let mut sender = sender;
2179 let adv_actions = sender.advertise(now);
2180 link.outgoing_resources.push(sender);
2181
2182 let _ = link;
2183 self.process_resource_actions(link_id, adv_actions, rng)
2184 }
2185
2186 pub fn set_resource_strategy(&mut self, link_id: &LinkId, strategy: ResourceStrategy) {
2188 if let Some(link) = self.links.get_mut(link_id) {
2189 link.resource_strategy = strategy;
2190 }
2191 }
2192
2193 pub fn flush_channel_tx(&mut self, link_id: &LinkId) {
2196 if let Some(link) = self.links.get_mut(link_id) {
2197 if let Some(ref mut channel) = link.channel {
2198 channel.flush_tx();
2199 }
2200 }
2201 }
2202
2203 pub fn send_channel_message(
2205 &mut self,
2206 link_id: &LinkId,
2207 msgtype: u16,
2208 payload: &[u8],
2209 rng: &mut dyn Rng,
2210 ) -> Result<Vec<LinkManagerAction>, String> {
2211 let link = match self.links.get_mut(link_id) {
2212 Some(l) => l,
2213 None => return Err("unknown link".to_string()),
2214 };
2215
2216 let channel = match link.channel {
2217 Some(ref mut ch) => ch,
2218 None => return Err("link has no active channel".to_string()),
2219 };
2220
2221 let link_mdu = link.engine.mdu();
2222 let now = time::now();
2223 let chan_actions = match channel.send(msgtype, payload, now, link_mdu) {
2224 Ok(a) => {
2225 link.channel_send_ok += 1;
2226 a
2227 }
2228 Err(e) => {
2229 log::debug!("Channel send failed: {:?}", e);
2230 match e {
2231 rns_core::channel::ChannelError::NotReady => link.channel_send_not_ready += 1,
2232 rns_core::channel::ChannelError::MessageTooBig => {
2233 link.channel_send_too_big += 1;
2234 }
2235 rns_core::channel::ChannelError::InvalidEnvelope => {
2236 link.channel_send_other_error += 1;
2237 }
2238 }
2239 return Err(e.to_string());
2240 }
2241 };
2242
2243 let _ = link;
2244 Ok(self.process_channel_actions(link_id, chan_actions, rng))
2245 }
2246
2247 pub fn tick(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2249 let now = time::now();
2250 let mut all_actions = Vec::new();
2251
2252 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2254
2255 for link_id in &link_ids {
2256 let link = match self.links.get_mut(link_id) {
2257 Some(l) => l,
2258 None => continue,
2259 };
2260
2261 let tick_actions = link.engine.tick(now);
2263 all_actions.extend(self.process_link_actions(link_id, &tick_actions));
2264
2265 let link = match self.links.get_mut(link_id) {
2267 Some(l) => l,
2268 None => continue,
2269 };
2270 if link.engine.needs_keepalive(now) {
2271 let flags = PacketFlags {
2273 header_type: constants::HEADER_1,
2274 context_flag: constants::FLAG_UNSET,
2275 transport_type: constants::TRANSPORT_BROADCAST,
2276 destination_type: constants::DESTINATION_LINK,
2277 packet_type: constants::PACKET_TYPE_DATA,
2278 };
2279 if let Ok(pkt) =
2280 RawPacket::pack(flags, 0, link_id, None, constants::CONTEXT_KEEPALIVE, &[])
2281 {
2282 all_actions.push(LinkManagerAction::SendPacket {
2283 raw: pkt.raw,
2284 dest_type: constants::DESTINATION_LINK,
2285 attached_interface: None,
2286 });
2287 link.engine.record_outbound(now, true);
2288 }
2289 }
2290
2291 if let Some(channel) = link.channel.as_mut() {
2292 let chan_actions = channel.tick(now);
2293 let _ = channel;
2294 let _ = link;
2295 all_actions.extend(self.process_channel_actions(link_id, chan_actions, rng));
2296 }
2297 }
2298
2299 for link_id in &link_ids {
2301 let link = match self.links.get_mut(link_id) {
2302 Some(l) => l,
2303 None => continue,
2304 };
2305
2306 let mut sender_actions = Vec::new();
2308 for sender in &mut link.outgoing_resources {
2309 sender_actions.extend(sender.tick(now));
2310 }
2311
2312 let mut receiver_actions = Vec::new();
2314 for receiver in &mut link.incoming_resources {
2315 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2316 link.engine.decrypt(ciphertext).map_err(|_| ())
2317 };
2318 receiver_actions.extend(receiver.tick(now, &decrypt_fn, &Bzip2Compressor));
2319 }
2320
2321 link.outgoing_resources
2323 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2324 link.incoming_resources
2325 .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2326
2327 let _ = link;
2328 all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2329 all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2330 }
2331
2332 let closed: Vec<LinkId> = self
2334 .links
2335 .iter()
2336 .filter(|(_, l)| l.engine.state() == LinkState::Closed)
2337 .map(|(id, _)| *id)
2338 .collect();
2339 for id in closed {
2340 self.links.remove(&id);
2341 all_actions.push(LinkManagerAction::DeregisterLinkDest { link_id: id });
2342 }
2343
2344 all_actions
2345 }
2346
2347 pub fn is_link_destination(&self, dest_hash: &[u8; 16]) -> bool {
2349 self.links.contains_key(dest_hash) || self.link_destinations.contains_key(dest_hash)
2350 }
2351
2352 pub fn link_state(&self, link_id: &LinkId) -> Option<LinkState> {
2354 self.links.get(link_id).map(|l| l.engine.state())
2355 }
2356
2357 pub fn link_rtt(&self, link_id: &LinkId) -> Option<f64> {
2359 self.links.get(link_id).and_then(|l| l.engine.rtt())
2360 }
2361
2362 pub fn set_link_rtt(&mut self, link_id: &LinkId, rtt: f64) {
2364 if let Some(link) = self.links.get_mut(link_id) {
2365 link.engine.set_rtt(rtt);
2366 }
2367 }
2368
2369 pub fn record_link_inbound(&mut self, link_id: &LinkId) {
2371 if let Some(link) = self.links.get_mut(link_id) {
2372 link.engine.record_inbound(time::now());
2373 }
2374 }
2375
2376 pub fn set_link_mtu(&mut self, link_id: &LinkId, mtu: u32) {
2378 if let Some(link) = self.links.get_mut(link_id) {
2379 link.engine.set_mtu(mtu);
2380 }
2381 }
2382
2383 pub fn link_count(&self) -> usize {
2385 self.links.len()
2386 }
2387
2388 pub fn resource_transfer_count(&self) -> usize {
2390 self.links
2391 .values()
2392 .map(|managed| managed.incoming_resources.len() + managed.outgoing_resources.len())
2393 .sum()
2394 }
2395
2396 pub fn cancel_all_resources(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2398 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2399 let mut all_actions = Vec::new();
2400
2401 for link_id in &link_ids {
2402 let link = match self.links.get_mut(link_id) {
2403 Some(l) => l,
2404 None => continue,
2405 };
2406
2407 let mut sender_actions = Vec::new();
2408 for sender in &mut link.outgoing_resources {
2409 sender_actions.extend(sender.cancel());
2410 }
2411
2412 let mut receiver_actions = Vec::new();
2413 for receiver in &mut link.incoming_resources {
2414 receiver_actions.extend(receiver.reject());
2415 }
2416
2417 link.outgoing_resources
2418 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2419 link.incoming_resources
2420 .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2421
2422 let _ = link;
2423 all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2424 all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2425 }
2426
2427 all_actions
2428 }
2429
2430 pub fn link_entries(&self) -> Vec<crate::event::LinkInfoEntry> {
2432 self.links
2433 .iter()
2434 .map(|(link_id, managed)| {
2435 let state = match managed.engine.state() {
2436 LinkState::Pending => "pending",
2437 LinkState::Handshake => "handshake",
2438 LinkState::Active => "active",
2439 LinkState::Stale => "stale",
2440 LinkState::Closed => "closed",
2441 };
2442 crate::event::LinkInfoEntry {
2443 link_id: *link_id,
2444 state: state.to_string(),
2445 is_initiator: managed.engine.is_initiator(),
2446 dest_hash: managed.dest_hash,
2447 remote_identity: managed.remote_identity.as_ref().map(|(h, _)| *h),
2448 rtt: managed.engine.rtt(),
2449 channel_window: managed.channel.as_ref().map(|c| c.window()),
2450 channel_outstanding: managed.channel.as_ref().map(|c| c.outstanding()),
2451 pending_channel_packets: managed.pending_channel_packets.len(),
2452 channel_send_ok: managed.channel_send_ok,
2453 channel_send_not_ready: managed.channel_send_not_ready,
2454 channel_send_too_big: managed.channel_send_too_big,
2455 channel_send_other_error: managed.channel_send_other_error,
2456 channel_messages_received: managed.channel_messages_received,
2457 channel_proofs_sent: managed.channel_proofs_sent,
2458 channel_proofs_received: managed.channel_proofs_received,
2459 }
2460 })
2461 .collect()
2462 }
2463
2464 pub fn resource_entries(&self) -> Vec<crate::event::ResourceInfoEntry> {
2466 let mut entries = Vec::new();
2467 for (link_id, managed) in &self.links {
2468 for recv in &managed.incoming_resources {
2469 let (received, total) = recv.progress();
2470 entries.push(crate::event::ResourceInfoEntry {
2471 link_id: *link_id,
2472 direction: "incoming".to_string(),
2473 total_parts: total,
2474 transferred_parts: received,
2475 complete: received >= total && total > 0,
2476 });
2477 }
2478 for send in &managed.outgoing_resources {
2479 let total = send.total_parts();
2480 let sent = send.sent_parts;
2481 entries.push(crate::event::ResourceInfoEntry {
2482 link_id: *link_id,
2483 direction: "outgoing".to_string(),
2484 total_parts: total,
2485 transferred_parts: sent,
2486 complete: sent >= total && total > 0,
2487 });
2488 }
2489 }
2490 entries
2491 }
2492
2493 fn process_link_actions(
2495 &self,
2496 link_id: &LinkId,
2497 actions: &[LinkAction],
2498 ) -> Vec<LinkManagerAction> {
2499 let mut result = Vec::new();
2500 for action in actions {
2501 match action {
2502 LinkAction::StateChanged {
2503 new_state, reason, ..
2504 } => match new_state {
2505 LinkState::Closed => {
2506 result.push(LinkManagerAction::LinkClosed {
2507 link_id: *link_id,
2508 reason: *reason,
2509 });
2510 }
2511 _ => {}
2512 },
2513 LinkAction::LinkEstablished {
2514 rtt, is_initiator, ..
2515 } => {
2516 let dest_hash = self
2517 .links
2518 .get(link_id)
2519 .map(|l| l.dest_hash)
2520 .unwrap_or([0u8; 16]);
2521 result.push(LinkManagerAction::LinkEstablished {
2522 link_id: *link_id,
2523 dest_hash,
2524 rtt: *rtt,
2525 is_initiator: *is_initiator,
2526 });
2527 }
2528 LinkAction::RemoteIdentified {
2529 identity_hash,
2530 public_key,
2531 ..
2532 } => {
2533 result.push(LinkManagerAction::RemoteIdentified {
2534 link_id: *link_id,
2535 identity_hash: *identity_hash,
2536 public_key: *public_key,
2537 });
2538 }
2539 LinkAction::DataReceived { .. } => {
2540 }
2542 }
2543 }
2544 result
2545 }
2546
2547 fn process_channel_actions(
2549 &mut self,
2550 link_id: &LinkId,
2551 actions: Vec<rns_core::channel::ChannelAction>,
2552 rng: &mut dyn Rng,
2553 ) -> Vec<LinkManagerAction> {
2554 let mut result = Vec::new();
2555 for action in actions {
2556 match action {
2557 rns_core::channel::ChannelAction::SendOnLink { raw, sequence } => {
2558 let encrypted = match self.links.get(link_id) {
2560 Some(link) => match link.engine.encrypt(&raw, rng) {
2561 Ok(encrypted) => encrypted,
2562 Err(_) => continue,
2563 },
2564 None => continue,
2565 };
2566 let flags = PacketFlags {
2567 header_type: constants::HEADER_1,
2568 context_flag: constants::FLAG_UNSET,
2569 transport_type: constants::TRANSPORT_BROADCAST,
2570 destination_type: constants::DESTINATION_LINK,
2571 packet_type: constants::PACKET_TYPE_DATA,
2572 };
2573 if let Ok(pkt) = RawPacket::pack(
2574 flags,
2575 0,
2576 link_id,
2577 None,
2578 constants::CONTEXT_CHANNEL,
2579 &encrypted,
2580 ) {
2581 if let Some(link_mut) = self.links.get_mut(link_id) {
2582 link_mut
2583 .pending_channel_packets
2584 .insert(pkt.packet_hash, sequence);
2585 }
2586 result.push(LinkManagerAction::SendPacket {
2587 raw: pkt.raw,
2588 dest_type: constants::DESTINATION_LINK,
2589 attached_interface: None,
2590 });
2591 }
2592 }
2593 rns_core::channel::ChannelAction::MessageReceived {
2594 msgtype, payload, ..
2595 } => {
2596 result.push(LinkManagerAction::ChannelMessageReceived {
2597 link_id: *link_id,
2598 msgtype,
2599 payload,
2600 });
2601 }
2602 rns_core::channel::ChannelAction::TeardownLink => {
2603 result.push(LinkManagerAction::LinkClosed {
2604 link_id: *link_id,
2605 reason: Some(TeardownReason::Timeout),
2606 });
2607 }
2608 }
2609 }
2610 result
2611 }
2612}
2613
2614fn compute_path_hash(path: &str) -> [u8; 16] {
2617 let full = rns_core::hash::full_hash(path.as_bytes());
2618 let mut result = [0u8; 16];
2619 result.copy_from_slice(&full[..16]);
2620 result
2621}
2622
2623#[cfg(test)]
2624mod tests {
2625 use super::*;
2626 use rns_crypto::identity::Identity;
2627 use rns_crypto::{FixedRng, OsRng};
2628
2629 fn make_rng(seed: u8) -> FixedRng {
2630 FixedRng::new(&[seed; 128])
2631 }
2632
2633 fn make_dest_keys(rng: &mut dyn Rng) -> (Ed25519PrivateKey, [u8; 32]) {
2634 let sig_prv = Ed25519PrivateKey::generate(rng);
2635 let sig_pub_bytes = sig_prv.public_key().public_bytes();
2636 (sig_prv, sig_pub_bytes)
2637 }
2638
2639 #[test]
2640 fn test_register_link_destination() {
2641 let mut mgr = LinkManager::new();
2642 let mut rng = make_rng(0x01);
2643 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2644 let dest_hash = [0xDD; 16];
2645
2646 mgr.register_link_destination(
2647 dest_hash,
2648 sig_prv,
2649 sig_pub_bytes,
2650 ResourceStrategy::AcceptNone,
2651 );
2652 assert!(mgr.is_link_destination(&dest_hash));
2653
2654 mgr.deregister_link_destination(&dest_hash);
2655 assert!(!mgr.is_link_destination(&dest_hash));
2656 }
2657
2658 #[test]
2659 fn test_create_link() {
2660 let mut mgr = LinkManager::new();
2661 let mut rng = OsRng;
2662 let dest_hash = [0xDD; 16];
2663
2664 let sig_pub_bytes = [0xAA; 32]; let (link_id, actions) = mgr.create_link(
2666 &dest_hash,
2667 &sig_pub_bytes,
2668 1,
2669 constants::MTU as u32,
2670 &mut rng,
2671 );
2672 assert_ne!(link_id, [0u8; 16]);
2673 assert_eq!(actions.len(), 2);
2675 assert!(matches!(
2676 actions[0],
2677 LinkManagerAction::RegisterLinkDest { .. }
2678 ));
2679 assert!(matches!(actions[1], LinkManagerAction::SendPacket { .. }));
2680
2681 assert_eq!(mgr.link_state(&link_id), Some(LinkState::Pending));
2683 }
2684
    /// Drives a complete link handshake between two `LinkManager`s by hand-delivering
    /// each emitted packet to the other side, then checks that both ends report the
    /// link as `Active` with an RTT available.
    #[test]
    fn test_full_handshake_via_manager() {
        let mut rng = OsRng;
        let dest_hash = [0xDD; 16];

        // Responder: registers the destination with a freshly generated signing key.
        let mut responder_mgr = LinkManager::new();
        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
        responder_mgr.register_link_destination(
            dest_hash,
            sig_prv,
            sig_pub_bytes,
            ResourceStrategy::AcceptNone,
        );

        // Initiator: only knows the destination hash and its signing public key.
        let mut initiator_mgr = LinkManager::new();

        // Step 1: the initiator creates the link, which emits two actions.
        let (link_id, init_actions) = initiator_mgr.create_link(
            &dest_hash,
            &sig_pub_bytes,
            1,
            constants::MTU as u32,
            &mut rng,
        );
        assert_eq!(init_actions.len(), 2);

        // The second action carries the raw link request packet.
        let linkrequest_raw = match &init_actions[1] {
            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
            _ => panic!("Expected SendPacket"),
        };

        // Unpack only to obtain the destination hash and packet hash for delivery.
        let lr_packet = RawPacket::unpack(&linkrequest_raw).unwrap();

        // Step 2: deliver the link request to the responder.
        let resp_actions = responder_mgr.handle_local_delivery(
            lr_packet.destination_hash,
            &linkrequest_raw,
            lr_packet.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );
        // The responder must answer with at least RegisterLinkDest followed by a packet.
        assert!(resp_actions.len() >= 2);
        assert!(matches!(
            resp_actions[0],
            LinkManagerAction::RegisterLinkDest { .. }
        ));

        // The responder's packet at index 1 is the LRPROOF.
        let lrproof_raw = match &resp_actions[1] {
            LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
            _ => panic!("Expected SendPacket for LRPROOF"),
        };

        // Step 3: deliver the LRPROOF back to the initiator.
        let lrproof_packet = RawPacket::unpack(&lrproof_raw).unwrap();
        let init_actions2 = initiator_mgr.handle_local_delivery(
            lrproof_packet.destination_hash,
            &lrproof_raw,
            lrproof_packet.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );

        // On receiving the proof the initiator reports the link as established.
        let has_established = init_actions2
            .iter()
            .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
        assert!(has_established, "Initiator should emit LinkEstablished");

        // The first SendPacket among the initiator's actions is the LRRTT packet.
        let lrrtt_raw = init_actions2
            .iter()
            .find_map(|a| match a {
                LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
                _ => None,
            })
            .expect("Should have LRRTT SendPacket");

        // Step 4: deliver the LRRTT to the responder, addressed by the packet's own
        // destination hash.
        let lrrtt_packet = RawPacket::unpack(&lrrtt_raw).unwrap();
        let resp_link_id = lrrtt_packet.destination_hash;
        let resp_actions2 = responder_mgr.handle_local_delivery(
            resp_link_id,
            &lrrtt_raw,
            lrrtt_packet.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );

        let has_established = resp_actions2
            .iter()
            .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
        assert!(has_established, "Responder should emit LinkEstablished");

        // Both managers track the link under the same id and consider it active.
        assert_eq!(initiator_mgr.link_state(&link_id), Some(LinkState::Active));
        assert_eq!(responder_mgr.link_state(&link_id), Some(LinkState::Active));

        // Both sides have an RTT value once the handshake is done.
        assert!(initiator_mgr.link_rtt(&link_id).is_some());
        assert!(responder_mgr.link_rtt(&link_id).is_some());
    }
2792
2793 #[test]
2794 fn test_encrypted_data_exchange() {
2795 let mut rng = OsRng;
2796 let dest_hash = [0xDD; 16];
2797 let mut resp_mgr = LinkManager::new();
2798 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2799 resp_mgr.register_link_destination(
2800 dest_hash,
2801 sig_prv,
2802 sig_pub_bytes,
2803 ResourceStrategy::AcceptNone,
2804 );
2805 let mut init_mgr = LinkManager::new();
2806
2807 let (link_id, init_actions) = init_mgr.create_link(
2809 &dest_hash,
2810 &sig_pub_bytes,
2811 1,
2812 constants::MTU as u32,
2813 &mut rng,
2814 );
2815 let lr_raw = extract_send_packet(&init_actions);
2816 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
2817 let resp_actions = resp_mgr.handle_local_delivery(
2818 lr_pkt.destination_hash,
2819 &lr_raw,
2820 lr_pkt.packet_hash,
2821 rns_core::transport::types::InterfaceId(0),
2822 &mut rng,
2823 );
2824 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
2825 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
2826 let init_actions2 = init_mgr.handle_local_delivery(
2827 lrproof_pkt.destination_hash,
2828 &lrproof_raw,
2829 lrproof_pkt.packet_hash,
2830 rns_core::transport::types::InterfaceId(0),
2831 &mut rng,
2832 );
2833 let lrrtt_raw = extract_any_send_packet(&init_actions2);
2834 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
2835 resp_mgr.handle_local_delivery(
2836 lrrtt_pkt.destination_hash,
2837 &lrrtt_raw,
2838 lrrtt_pkt.packet_hash,
2839 rns_core::transport::types::InterfaceId(0),
2840 &mut rng,
2841 );
2842
2843 let actions =
2845 init_mgr.send_on_link(&link_id, b"hello link!", constants::CONTEXT_NONE, &mut rng);
2846 assert_eq!(actions.len(), 1);
2847 assert!(matches!(actions[0], LinkManagerAction::SendPacket { .. }));
2848 }
2849
    /// A request sent to a path with a registered, unrestricted handler makes the
    /// responder emit a response packet.
    #[test]
    fn test_request_response() {
        let mut rng = OsRng;
        let dest_hash = [0xDD; 16];
        let mut resp_mgr = LinkManager::new();
        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
        resp_mgr.register_link_destination(
            dest_hash,
            sig_prv,
            sig_pub_bytes,
            ResourceStrategy::AcceptNone,
        );

        // Handler with no allowed-list (`None`); it always answers "OK".
        resp_mgr.register_request_handler("/status", None, |_link_id, _path, _data, _remote| {
            Some(b"OK".to_vec())
        });

        let mut init_mgr = LinkManager::new();

        // Handshake: link request -> LRPROOF -> LRRTT, delivering each packet by hand.
        let (link_id, init_actions) = init_mgr.create_link(
            &dest_hash,
            &sig_pub_bytes,
            1,
            constants::MTU as u32,
            &mut rng,
        );
        let lr_raw = extract_send_packet(&init_actions);
        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
        let resp_actions = resp_mgr.handle_local_delivery(
            lr_pkt.destination_hash,
            &lr_raw,
            lr_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );
        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
        let init_actions2 = init_mgr.handle_local_delivery(
            lrproof_pkt.destination_hash,
            &lrproof_raw,
            lrproof_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );
        let lrrtt_raw = extract_any_send_packet(&init_actions2);
        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
        resp_mgr.handle_local_delivery(
            lrrtt_pkt.destination_hash,
            &lrrtt_raw,
            lrrtt_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );

        // The initiator sends a request to the registered path; one packet is expected.
        let req_actions = init_mgr.send_request(&link_id, "/status", b"query", &mut rng);
        assert_eq!(req_actions.len(), 1);

        // Deliver the request to the responder.
        let req_raw = extract_send_packet_from(&req_actions);
        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
        let resp_actions = resp_mgr.handle_local_delivery(
            req_pkt.destination_hash,
            &req_raw,
            req_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );

        // Any outbound packet from the responder counts as the response here.
        let has_response = resp_actions
            .iter()
            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
        assert!(has_response, "Handler should produce a response packet");
    }
2927
    /// A request to a handler with an allowed-list gets no response packet when the
    /// requesting peer never identified itself on the link.
    #[test]
    fn test_request_acl_deny_unidentified() {
        let mut rng = OsRng;
        let dest_hash = [0xDD; 16];
        let mut resp_mgr = LinkManager::new();
        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
        resp_mgr.register_link_destination(
            dest_hash,
            sig_prv,
            sig_pub_bytes,
            ResourceStrategy::AcceptNone,
        );

        // Handler restricted to a single allowed identity hash ([0xAA; 16]).
        resp_mgr.register_request_handler(
            "/restricted",
            Some(vec![[0xAA; 16]]),
            |_link_id, _path, _data, _remote| Some(b"secret".to_vec()),
        );

        let mut init_mgr = LinkManager::new();

        // Handshake: link request -> LRPROOF -> LRRTT, delivering each packet by hand.
        let (link_id, init_actions) = init_mgr.create_link(
            &dest_hash,
            &sig_pub_bytes,
            1,
            constants::MTU as u32,
            &mut rng,
        );
        let lr_raw = extract_send_packet(&init_actions);
        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
        let resp_actions = resp_mgr.handle_local_delivery(
            lr_pkt.destination_hash,
            &lr_raw,
            lr_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );
        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
        let init_actions2 = init_mgr.handle_local_delivery(
            lrproof_pkt.destination_hash,
            &lrproof_raw,
            lrproof_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );
        let lrrtt_raw = extract_any_send_packet(&init_actions2);
        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
        resp_mgr.handle_local_delivery(
            lrrtt_pkt.destination_hash,
            &lrrtt_raw,
            lrrtt_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );

        // The initiator requests the restricted path without calling `identify` first.
        let req_actions = init_mgr.send_request(&link_id, "/restricted", b"query", &mut rng);
        let req_raw = extract_send_packet_from(&req_actions);
        let req_pkt = RawPacket::unpack(&req_raw).unwrap();
        let resp_actions = resp_mgr.handle_local_delivery(
            req_pkt.destination_hash,
            &req_raw,
            req_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );

        // Denial is observed as the absence of any outbound packet.
        let has_response = resp_actions
            .iter()
            .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
        assert!(!has_response, "Unidentified peer should be denied");
    }
3004
3005 #[test]
3006 fn test_teardown_link() {
3007 let mut rng = OsRng;
3008 let dest_hash = [0xDD; 16];
3009 let mut mgr = LinkManager::new();
3010
3011 let dummy_sig = [0xAA; 32];
3012 let (link_id, _) =
3013 mgr.create_link(&dest_hash, &dummy_sig, 1, constants::MTU as u32, &mut rng);
3014 assert_eq!(mgr.link_count(), 1);
3015
3016 let actions = mgr.teardown_link(&link_id);
3017 let has_close = actions
3018 .iter()
3019 .any(|a| matches!(a, LinkManagerAction::LinkClosed { .. }));
3020 assert!(has_close);
3021
3022 let tick_actions = mgr.tick(&mut rng);
3024 let has_deregister = tick_actions
3025 .iter()
3026 .any(|a| matches!(a, LinkManagerAction::DeregisterLinkDest { .. }));
3027 assert!(has_deregister);
3028 assert_eq!(mgr.link_count(), 0);
3029 }
3030
3031 #[test]
3032 fn test_identify_on_link() {
3033 let mut rng = OsRng;
3034 let dest_hash = [0xDD; 16];
3035 let mut resp_mgr = LinkManager::new();
3036 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3037 resp_mgr.register_link_destination(
3038 dest_hash,
3039 sig_prv,
3040 sig_pub_bytes,
3041 ResourceStrategy::AcceptNone,
3042 );
3043 let mut init_mgr = LinkManager::new();
3044
3045 let (link_id, init_actions) = init_mgr.create_link(
3047 &dest_hash,
3048 &sig_pub_bytes,
3049 1,
3050 constants::MTU as u32,
3051 &mut rng,
3052 );
3053 let lr_raw = extract_send_packet(&init_actions);
3054 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3055 let resp_actions = resp_mgr.handle_local_delivery(
3056 lr_pkt.destination_hash,
3057 &lr_raw,
3058 lr_pkt.packet_hash,
3059 rns_core::transport::types::InterfaceId(0),
3060 &mut rng,
3061 );
3062 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3063 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3064 let init_actions2 = init_mgr.handle_local_delivery(
3065 lrproof_pkt.destination_hash,
3066 &lrproof_raw,
3067 lrproof_pkt.packet_hash,
3068 rns_core::transport::types::InterfaceId(0),
3069 &mut rng,
3070 );
3071 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3072 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3073 resp_mgr.handle_local_delivery(
3074 lrrtt_pkt.destination_hash,
3075 &lrrtt_raw,
3076 lrrtt_pkt.packet_hash,
3077 rns_core::transport::types::InterfaceId(0),
3078 &mut rng,
3079 );
3080
3081 let identity = Identity::new(&mut rng);
3083 let id_actions = init_mgr.identify(&link_id, &identity, &mut rng);
3084 assert_eq!(id_actions.len(), 1);
3085
3086 let id_raw = extract_send_packet_from(&id_actions);
3088 let id_pkt = RawPacket::unpack(&id_raw).unwrap();
3089 let resp_actions = resp_mgr.handle_local_delivery(
3090 id_pkt.destination_hash,
3091 &id_raw,
3092 id_pkt.packet_hash,
3093 rns_core::transport::types::InterfaceId(0),
3094 &mut rng,
3095 );
3096
3097 let has_identified = resp_actions
3098 .iter()
3099 .any(|a| matches!(a, LinkManagerAction::RemoteIdentified { .. }));
3100 assert!(has_identified, "Responder should emit RemoteIdentified");
3101 }
3102
3103 #[test]
3104 fn test_path_hash_computation() {
3105 let h1 = compute_path_hash("/status");
3106 let h2 = compute_path_hash("/path");
3107 assert_ne!(h1, h2);
3108
3109 assert_eq!(h1, compute_path_hash("/status"));
3111 }
3112
3113 #[test]
3114 fn test_link_count() {
3115 let mut mgr = LinkManager::new();
3116 let mut rng = OsRng;
3117
3118 assert_eq!(mgr.link_count(), 0);
3119
3120 let dummy_sig = [0xAA; 32];
3121 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3122 assert_eq!(mgr.link_count(), 1);
3123
3124 mgr.create_link(&[0x22; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3125 assert_eq!(mgr.link_count(), 2);
3126 }
3127
3128 fn extract_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3131 extract_send_packet_at(actions, actions.len() - 1)
3132 }
3133
3134 fn extract_send_packet_at(actions: &[LinkManagerAction], idx: usize) -> Vec<u8> {
3135 match &actions[idx] {
3136 LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3137 other => panic!("Expected SendPacket at index {}, got {:?}", idx, other),
3138 }
3139 }
3140
3141 fn extract_any_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3142 actions
3143 .iter()
3144 .find_map(|a| match a {
3145 LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3146 _ => None,
3147 })
3148 .expect("Expected at least one SendPacket action")
3149 }
3150
    /// Alias for `extract_any_send_packet`: returns the raw bytes of the first
    /// `SendPacket` in `actions` and panics if there is none.
    fn extract_send_packet_from(actions: &[LinkManagerAction]) -> Vec<u8> {
        extract_any_send_packet(actions)
    }
3154
    /// Test fixture: run a complete handshake between a fresh initiator and responder
    /// and return `(initiator, responder, link_id)` with the link `Active` on both sides.
    ///
    /// The responder registers destination `[0xDD; 16]` with
    /// `ResourceStrategy::AcceptNone`. Panics (via the assertions at the end or the
    /// extraction helpers) if any handshake step does not behave as expected.
    fn setup_active_link() -> (LinkManager, LinkManager, LinkId) {
        let mut rng = OsRng;
        let dest_hash = [0xDD; 16];
        let mut resp_mgr = LinkManager::new();
        let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
        resp_mgr.register_link_destination(
            dest_hash,
            sig_prv,
            sig_pub_bytes,
            ResourceStrategy::AcceptNone,
        );
        let mut init_mgr = LinkManager::new();

        // Step 1: the initiator creates the link; its last action is the link request.
        let (link_id, init_actions) = init_mgr.create_link(
            &dest_hash,
            &sig_pub_bytes,
            1,
            constants::MTU as u32,
            &mut rng,
        );
        let lr_raw = extract_send_packet(&init_actions);
        let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
        // Step 2: the responder handles the link request; its action at index 1 is
        // the proof packet.
        let resp_actions = resp_mgr.handle_local_delivery(
            lr_pkt.destination_hash,
            &lr_raw,
            lr_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );
        let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
        let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
        // Step 3: the initiator handles the proof and emits the RTT packet.
        let init_actions2 = init_mgr.handle_local_delivery(
            lrproof_pkt.destination_hash,
            &lrproof_raw,
            lrproof_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );
        let lrrtt_raw = extract_any_send_packet(&init_actions2);
        let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
        // Step 4: the responder handles the RTT packet, completing its side.
        resp_mgr.handle_local_delivery(
            lrrtt_pkt.destination_hash,
            &lrrtt_raw,
            lrrtt_pkt.packet_hash,
            rns_core::transport::types::InterfaceId(0),
            &mut rng,
        );

        // Guard: callers rely on the link being active on both ends.
        assert_eq!(init_mgr.link_state(&link_id), Some(LinkState::Active));
        assert_eq!(resp_mgr.link_state(&link_id), Some(LinkState::Active));

        (init_mgr, resp_mgr, link_id)
    }
3210
3211 #[test]
3216 fn test_resource_strategy_default() {
3217 let mut mgr = LinkManager::new();
3218 let mut rng = OsRng;
3219 let dummy_sig = [0xAA; 32];
3220 let (link_id, _) =
3221 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3222
3223 let link = mgr.links.get(&link_id).unwrap();
3225 assert_eq!(link.resource_strategy, ResourceStrategy::AcceptNone);
3226 }
3227
3228 #[test]
3229 fn test_set_resource_strategy() {
3230 let mut mgr = LinkManager::new();
3231 let mut rng = OsRng;
3232 let dummy_sig = [0xAA; 32];
3233 let (link_id, _) =
3234 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3235
3236 mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3237 assert_eq!(
3238 mgr.links.get(&link_id).unwrap().resource_strategy,
3239 ResourceStrategy::AcceptAll
3240 );
3241
3242 mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3243 assert_eq!(
3244 mgr.links.get(&link_id).unwrap().resource_strategy,
3245 ResourceStrategy::AcceptApp
3246 );
3247 }
3248
3249 #[test]
3250 fn test_send_resource_on_active_link() {
3251 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3252 let mut rng = OsRng;
3253
3254 let data = vec![0xAB; 100]; let actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3257
3258 let has_send = actions
3260 .iter()
3261 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3262 assert!(
3263 has_send,
3264 "send_resource should emit advertisement SendPacket"
3265 );
3266 }
3267
3268 #[test]
3269 fn test_send_resource_on_inactive_link() {
3270 let mut mgr = LinkManager::new();
3271 let mut rng = OsRng;
3272 let dummy_sig = [0xAA; 32];
3273 let (link_id, _) =
3274 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3275
3276 let actions = mgr.send_resource(&link_id, b"data", None, &mut rng);
3278 assert!(actions.is_empty(), "Cannot send resource on inactive link");
3279 }
3280
3281 #[test]
3282 fn test_resource_adv_rejected_by_accept_none() {
3283 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3284 let mut rng = OsRng;
3285
3286 let data = vec![0xCD; 100];
3289 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3290
3291 for action in &adv_actions {
3293 if let LinkManagerAction::SendPacket { raw, .. } = action {
3294 let pkt = RawPacket::unpack(raw).unwrap();
3295 let resp_actions = resp_mgr.handle_local_delivery(
3296 pkt.destination_hash,
3297 raw,
3298 pkt.packet_hash,
3299 rns_core::transport::types::InterfaceId(0),
3300 &mut rng,
3301 );
3302 let has_resource_received = resp_actions
3304 .iter()
3305 .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
3306 assert!(
3307 !has_resource_received,
3308 "AcceptNone should not accept resource"
3309 );
3310 }
3311 }
3312 }
3313
3314 #[test]
3315 fn test_resource_adv_accepted_by_accept_all() {
3316 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3317 let mut rng = OsRng;
3318
3319 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3321
3322 let data = vec![0xCD; 100];
3324 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3325
3326 for action in &adv_actions {
3328 if let LinkManagerAction::SendPacket { raw, .. } = action {
3329 let pkt = RawPacket::unpack(raw).unwrap();
3330 let resp_actions = resp_mgr.handle_local_delivery(
3331 pkt.destination_hash,
3332 raw,
3333 pkt.packet_hash,
3334 rns_core::transport::types::InterfaceId(0),
3335 &mut rng,
3336 );
3337 let has_send = resp_actions
3339 .iter()
3340 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3341 assert!(has_send, "AcceptAll should accept and request parts");
3342 }
3343 }
3344 }
3345
3346 #[test]
3347 fn test_resource_accept_app_query() {
3348 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3349 let mut rng = OsRng;
3350
3351 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3353
3354 let data = vec![0xCD; 100];
3356 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3357
3358 let mut got_query = false;
3360 for action in &adv_actions {
3361 if let LinkManagerAction::SendPacket { raw, .. } = action {
3362 let pkt = RawPacket::unpack(raw).unwrap();
3363 let resp_actions = resp_mgr.handle_local_delivery(
3364 pkt.destination_hash,
3365 raw,
3366 pkt.packet_hash,
3367 rns_core::transport::types::InterfaceId(0),
3368 &mut rng,
3369 );
3370 for a in &resp_actions {
3371 if matches!(a, LinkManagerAction::ResourceAcceptQuery { .. }) {
3372 got_query = true;
3373 }
3374 }
3375 }
3376 }
3377 assert!(got_query, "AcceptApp should emit ResourceAcceptQuery");
3378 }
3379
3380 #[test]
3381 fn test_resource_accept_app_accept() {
3382 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3383 let mut rng = OsRng;
3384
3385 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3386
3387 let data = vec![0xCD; 100];
3388 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3389
3390 for action in &adv_actions {
3391 if let LinkManagerAction::SendPacket { raw, .. } = action {
3392 let pkt = RawPacket::unpack(raw).unwrap();
3393 let resp_actions = resp_mgr.handle_local_delivery(
3394 pkt.destination_hash,
3395 raw,
3396 pkt.packet_hash,
3397 rns_core::transport::types::InterfaceId(0),
3398 &mut rng,
3399 );
3400 for a in &resp_actions {
3401 if let LinkManagerAction::ResourceAcceptQuery {
3402 link_id: lid,
3403 resource_hash,
3404 ..
3405 } = a
3406 {
3407 let accept_actions =
3409 resp_mgr.accept_resource(lid, resource_hash, true, &mut rng);
3410 let has_send = accept_actions
3412 .iter()
3413 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3414 assert!(
3415 has_send,
3416 "Accepting resource should produce request for parts"
3417 );
3418 }
3419 }
3420 }
3421 }
3422 }
3423
3424 #[test]
3425 fn test_resource_accept_app_reject() {
3426 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3427 let mut rng = OsRng;
3428
3429 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3430
3431 let data = vec![0xCD; 100];
3432 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3433
3434 for action in &adv_actions {
3435 if let LinkManagerAction::SendPacket { raw, .. } = action {
3436 let pkt = RawPacket::unpack(raw).unwrap();
3437 let resp_actions = resp_mgr.handle_local_delivery(
3438 pkt.destination_hash,
3439 raw,
3440 pkt.packet_hash,
3441 rns_core::transport::types::InterfaceId(0),
3442 &mut rng,
3443 );
3444 for a in &resp_actions {
3445 if let LinkManagerAction::ResourceAcceptQuery {
3446 link_id: lid,
3447 resource_hash,
3448 ..
3449 } = a
3450 {
3451 let reject_actions =
3453 resp_mgr.accept_resource(lid, resource_hash, false, &mut rng);
3454 let has_resource_received = reject_actions
3457 .iter()
3458 .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
3459 assert!(!has_resource_received);
3460 }
3461 }
3462 }
3463 }
3464 }
3465
    /// End-to-end resource transfer: packets are shuttled back and forth between the
    /// two managers until nothing is left to deliver (or 50 rounds have passed). The
    /// responder must receive the original bytes and the sender must see
    /// `ResourceCompleted`.
    #[test]
    fn test_resource_full_transfer() {
        let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
        let mut rng = OsRng;

        resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);

        let original_data = b"Hello, Resource Transfer!".to_vec();
        let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);

        // Each pending action is tagged with its origin: 'i' = initiator, 'r' = responder.
        let mut pending: Vec<(char, LinkManagerAction)> =
            adv_actions.into_iter().map(|a| ('i', a)).collect();
        let mut rounds = 0;
        // Upper bound on rounds so a stalled exchange ends the loop instead of hanging.
        let max_rounds = 50;
        let mut resource_received = false;
        let mut sender_completed = false;

        while !pending.is_empty() && rounds < max_rounds {
            rounds += 1;
            let mut next: Vec<(char, LinkManagerAction)> = Vec::new();

            for (source, action) in pending.drain(..) {
                // Only outbound packets are forwarded; every other action is dropped here.
                if let LinkManagerAction::SendPacket { raw, .. } = action {
                    let pkt = RawPacket::unpack(&raw).unwrap();

                    // Deliver to the opposite side of whoever produced the packet.
                    let target_actions = if source == 'i' {
                        resp_mgr.handle_local_delivery(
                            pkt.destination_hash,
                            &raw,
                            pkt.packet_hash,
                            rns_core::transport::types::InterfaceId(0),
                            &mut rng,
                        )
                    } else {
                        init_mgr.handle_local_delivery(
                            pkt.destination_hash,
                            &raw,
                            pkt.packet_hash,
                            rns_core::transport::types::InterfaceId(0),
                            &mut rng,
                        )
                    };

                    // The resulting actions originate from the side that just received.
                    let target_source = if source == 'i' { 'r' } else { 'i' };
                    for a in &target_actions {
                        match a {
                            LinkManagerAction::ResourceReceived { data, .. } => {
                                assert_eq!(*data, original_data);
                                resource_received = true;
                            }
                            LinkManagerAction::ResourceCompleted { .. } => {
                                sender_completed = true;
                            }
                            _ => {}
                        }
                    }
                    next.extend(target_actions.into_iter().map(|a| (target_source, a)));
                }
            }
            pending = next;
        }

        assert!(
            resource_received,
            "Responder should receive resource data (rounds={})",
            rounds
        );
        assert!(
            sender_completed,
            "Sender should get completion proof (rounds={})",
            rounds
        );
    }
3544
3545 #[test]
3546 fn test_resource_cancel_icl() {
3547 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3548 let mut rng = OsRng;
3549
3550 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3551
3552 let data = vec![0xAB; 2000];
3554 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3555
3556 for action in &adv_actions {
3558 if let LinkManagerAction::SendPacket { raw, .. } = action {
3559 let pkt = RawPacket::unpack(raw).unwrap();
3560 resp_mgr.handle_local_delivery(
3561 pkt.destination_hash,
3562 raw,
3563 pkt.packet_hash,
3564 rns_core::transport::types::InterfaceId(0),
3565 &mut rng,
3566 );
3567 }
3568 }
3569
3570 assert!(!resp_mgr
3572 .links
3573 .get(&link_id)
3574 .unwrap()
3575 .incoming_resources
3576 .is_empty());
3577
3578 let icl_actions = resp_mgr.handle_resource_icl(&link_id);
3580
3581 let has_failed = icl_actions
3583 .iter()
3584 .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
3585 assert!(has_failed, "ICL should produce ResourceFailed");
3586 }
3587
3588 #[test]
3589 fn test_resource_cancel_rcl() {
3590 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3591 let mut rng = OsRng;
3592
3593 let data = vec![0xAB; 2000];
3595 init_mgr.send_resource(&link_id, &data, None, &mut rng);
3596
3597 assert!(!init_mgr
3599 .links
3600 .get(&link_id)
3601 .unwrap()
3602 .outgoing_resources
3603 .is_empty());
3604
3605 let rcl_actions = init_mgr.handle_resource_rcl(&link_id);
3607
3608 let has_failed = rcl_actions
3609 .iter()
3610 .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
3611 assert!(has_failed, "RCL should produce ResourceFailed");
3612 }
3613
3614 #[test]
3615 fn test_cancel_all_resources_clears_active_transfers() {
3616 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3617 let mut rng = OsRng;
3618
3619 let actions = init_mgr.send_resource(&link_id, b"resource body", None, &mut rng);
3620 assert!(!actions.is_empty());
3621 assert_eq!(init_mgr.resource_transfer_count(), 1);
3622
3623 let cancel_actions = init_mgr.cancel_all_resources(&mut rng);
3624
3625 assert_eq!(init_mgr.resource_transfer_count(), 0);
3626 assert!(cancel_actions
3627 .iter()
3628 .any(|action| matches!(action, LinkManagerAction::SendPacket { .. })));
3629 }
3630
3631 #[test]
3632 fn test_resource_tick_cleans_up() {
3633 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3634 let mut rng = OsRng;
3635
3636 let data = vec![0xAB; 100];
3637 init_mgr.send_resource(&link_id, &data, None, &mut rng);
3638
3639 assert!(!init_mgr
3640 .links
3641 .get(&link_id)
3642 .unwrap()
3643 .outgoing_resources
3644 .is_empty());
3645
3646 init_mgr.handle_resource_rcl(&link_id);
3648
3649 init_mgr.tick(&mut rng);
3651
3652 assert!(
3653 init_mgr
3654 .links
3655 .get(&link_id)
3656 .unwrap()
3657 .outgoing_resources
3658 .is_empty(),
3659 "Tick should clean up completed/failed outgoing resources"
3660 );
3661 }
3662
3663 #[test]
3664 fn test_build_link_packet() {
3665 let (init_mgr, _resp_mgr, link_id) = setup_active_link();
3666
3667 let actions =
3668 init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, b"test data");
3669 assert_eq!(actions.len(), 1);
3670 if let LinkManagerAction::SendPacket { raw, dest_type, .. } = &actions[0] {
3671 let pkt = RawPacket::unpack(raw).unwrap();
3672 assert_eq!(pkt.context, constants::CONTEXT_RESOURCE);
3673 assert_eq!(*dest_type, constants::DESTINATION_LINK);
3674 } else {
3675 panic!("Expected SendPacket");
3676 }
3677 }
3678
/// Sends one channel message from the initiator and checks that the responder
/// reports it as `ChannelMessageReceived` with the same type and payload.
#[test]
fn test_channel_message_delivery() {
    use rns_core::transport::types::InterfaceId;

    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    let outbound = init_mgr
        .send_channel_message(&link_id, 42, b"channel data", &mut rng)
        .expect("active link channel send should succeed");
    assert!(!outbound.is_empty());

    // Hand every emitted packet to the responder and look for the message.
    let mut responder_saw_message = false;
    for action in &outbound {
        let LinkManagerAction::SendPacket { raw, .. } = action else {
            continue;
        };
        let pkt = RawPacket::unpack(raw).unwrap();
        let events = resp_mgr.handle_local_delivery(
            pkt.destination_hash,
            raw,
            pkt.packet_hash,
            InterfaceId(0),
            &mut rng,
        );
        for event in &events {
            let LinkManagerAction::ChannelMessageReceived {
                msgtype, payload, ..
            } = event
            else {
                continue;
            };
            assert_eq!(*msgtype, 42);
            assert_eq!(*payload, b"channel data");
            responder_saw_message = true;
        }
    }
    assert!(
        responder_saw_message,
        "Responder should receive channel message"
    );
}
3720
/// Fills the channel send window with two messages, confirms a third is
/// rejected, then delivers a proof for one tracked packet and checks that a
/// further send is accepted again.
#[test]
fn test_channel_proof_reopens_send_window() {
    use rns_core::transport::types::InterfaceId;

    let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    init_mgr
        .send_channel_message(&link_id, 42, b"first", &mut rng)
        .expect("first send should succeed");
    init_mgr
        .send_channel_message(&link_id, 42, b"second", &mut rng)
        .expect("second send should succeed");

    // With two messages in flight the window is full.
    let window_error = init_mgr
        .send_channel_message(&link_id, 42, b"third", &mut rng)
        .expect_err("third send should hit the initial channel window");
    assert_eq!(window_error, "Channel is not ready to send");

    // Snapshot the tracked packets so the manager can be mutated below.
    let tracked = init_mgr
        .links
        .get(&link_id)
        .unwrap()
        .pending_channel_packets
        .clone();
    assert_eq!(tracked.len(), 2);

    // Prove exactly one of the tracked packets.
    if let Some(packet_hash) = tracked.keys().next() {
        // Proof body: the tracked packet hash followed by 64 filler bytes.
        let proof_data = [&packet_hash[..], &[0x11u8; 64][..]].concat();
        let proof_flags = PacketFlags {
            header_type: constants::HEADER_1,
            context_flag: constants::FLAG_UNSET,
            transport_type: constants::TRANSPORT_BROADCAST,
            destination_type: constants::DESTINATION_LINK,
            packet_type: constants::PACKET_TYPE_PROOF,
        };
        let proof = RawPacket::pack(
            proof_flags,
            0,
            &link_id,
            None,
            constants::CONTEXT_NONE,
            &proof_data,
        )
        .expect("proof packet should pack");

        let ack_actions = init_mgr.handle_local_delivery(
            link_id,
            &proof.raw,
            proof.packet_hash,
            InterfaceId(0),
            &mut rng,
        );
        assert!(
            ack_actions.is_empty(),
            "proof delivery should only update channel state"
        );
    }

    init_mgr
        .send_channel_message(&link_id, 42, b"third", &mut rng)
        .expect("proof should free one channel slot");
}
3782
/// Sends link data with context `0x42` and checks that the responder reports
/// it as `LinkDataReceived` with that same context.
#[test]
fn test_generic_link_data_delivery() {
    use rns_core::transport::types::InterfaceId;

    let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    let outbound = init_mgr.send_on_link(&link_id, b"raw stuff", 0x42, &mut rng);
    assert_eq!(outbound.len(), 1);

    let wire = extract_any_send_packet(&outbound);
    let pkt = RawPacket::unpack(&wire).unwrap();
    let events = resp_mgr.handle_local_delivery(
        pkt.destination_hash,
        &wire,
        pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );

    assert!(
        events
            .iter()
            .any(|event| matches!(event, LinkManagerAction::LinkDataReceived { context: 0x42, .. })),
        "Responder should receive LinkDataReceived for unknown context"
    );
}
3811
/// Full request/response round trip: the responder's `/echo` handler returns
/// the request data, and the initiator must end up with `ResponseReceived`.
#[test]
fn test_response_delivery() {
    use rns_core::transport::types::InterfaceId;

    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    // The handler simply echoes the request data back.
    resp_mgr.register_request_handler("/echo", None, |_link_id, _path, data, _remote| {
        Some(data.to_vec())
    });

    // Initiator -> responder: the request.
    let request_actions = init_mgr.send_request(&link_id, "/echo", b"\xc0", &mut rng);
    assert!(!request_actions.is_empty());

    let request_wire = extract_any_send_packet(&request_actions);
    let request_pkt = RawPacket::unpack(&request_wire).unwrap();
    let responder_actions = resp_mgr.handle_local_delivery(
        request_pkt.destination_hash,
        &request_wire,
        request_pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );
    assert!(
        responder_actions
            .iter()
            .any(|action| matches!(action, LinkManagerAction::SendPacket { .. })),
        "Handler should produce response"
    );

    // Responder -> initiator: the response.
    let response_wire = extract_any_send_packet(&responder_actions);
    let response_pkt = RawPacket::unpack(&response_wire).unwrap();
    let initiator_actions = init_mgr.handle_local_delivery(
        response_pkt.destination_hash,
        &response_wire,
        response_pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );

    assert!(
        initiator_actions
            .iter()
            .any(|action| matches!(action, LinkManagerAction::ResponseReceived { .. })),
        "Initiator should receive ResponseReceived"
    );
}
3860
/// A 5000-byte handler result must be announced through a resource
/// advertisement packet and never sent as a direct `CONTEXT_RESPONSE` packet.
#[test]
fn test_large_response_uses_resource_fallback() {
    use rns_core::transport::types::InterfaceId;

    let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
    // The handler owns the payload and returns a fresh copy on every call.
    resp_mgr.register_request_handler("/large", None, move |_link_id, _path, _data, _remote| {
        Some(large_payload.clone())
    });

    let request_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
    assert!(!request_actions.is_empty());

    let request_wire = extract_any_send_packet(&request_actions);
    let request_pkt = RawPacket::unpack(&request_wire).unwrap();
    let responder_actions = resp_mgr.handle_local_delivery(
        request_pkt.destination_hash,
        &request_wire,
        request_pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );

    // Classify every packet the responder wants to send by its context.
    let mut saw_resource_adv = false;
    let mut saw_direct_response = false;
    for action in &responder_actions {
        let LinkManagerAction::SendPacket { raw, .. } = action else {
            continue;
        };
        let pkt = RawPacket::unpack(raw).unwrap();
        saw_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
        saw_direct_response |= pkt.context == constants::CONTEXT_RESPONSE;
    }

    assert!(
        saw_resource_adv,
        "Large response should advertise a response resource"
    );
    assert!(
        !saw_direct_response,
        "Large response should not use direct CONTEXT_RESPONSE packet"
    );
}
3912
/// A link that was only just created must refuse channel sends with the
/// "link has no active channel" error.
#[test]
fn test_send_channel_message_on_no_channel() {
    let mut manager = LinkManager::new();
    let mut rng = OsRng;

    let placeholder_sig = [0xAA; 32];
    let (link_id, _) = manager.create_link(
        &[0x11; 16],
        &placeholder_sig,
        1,
        constants::MTU as u32,
        &mut rng,
    );

    let outcome = manager.send_channel_message(&link_id, 1, b"test", &mut rng);
    let err = outcome.expect_err("pending link should reject channel send");
    assert_eq!(err, "link has no active channel");
}
3927
/// `send_on_link` must emit nothing for a link that was only just created.
#[test]
fn test_send_on_link_requires_active() {
    let mut manager = LinkManager::new();
    let mut rng = OsRng;

    let placeholder_sig = [0xAA; 32];
    let (link_id, _) = manager.create_link(
        &[0x11; 16],
        &placeholder_sig,
        1,
        constants::MTU as u32,
        &mut rng,
    );

    let emitted = manager.send_on_link(&link_id, b"test", constants::CONTEXT_NONE, &mut rng);
    assert!(emitted.is_empty(), "Cannot send on pending link");
}
3939
/// `send_on_link` must emit nothing when the link id is not known to the
/// manager.
#[test]
fn test_send_on_link_unknown_link() {
    let manager = LinkManager::new();
    let mut rng = OsRng;

    let unknown_link = [0xFF; 16];
    let emitted = manager.send_on_link(&unknown_link, b"test", constants::CONTEXT_NONE, &mut rng);
    assert!(emitted.is_empty());
}
3948
/// Transfers a 2000-byte resource by shuttling packets between the two
/// managers until nothing is left to deliver (or the round cap is hit), then
/// checks that the receiver got the exact data and the sender completed.
#[test]
fn test_resource_full_transfer_large() {
    use rns_core::transport::types::InterfaceId;

    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);

    // Deterministic, position-derived test pattern.
    let original_data: Vec<u8> = (0..2000u32)
        .map(|i| {
            let idx = i as usize;
            (idx ^ (idx >> 8) ^ (idx >> 16)) as u8
        })
        .collect();

    let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);

    // Each queued action is tagged with whether the initiator produced it.
    let mut in_flight: Vec<(bool, LinkManagerAction)> =
        adv_actions.into_iter().map(|a| (true, a)).collect();
    let max_rounds = 200;
    let mut rounds = 0;
    let mut resource_received = false;
    let mut sender_completed = false;

    while !in_flight.is_empty() && rounds < max_rounds {
        rounds += 1;
        let mut produced: Vec<(bool, LinkManagerAction)> = Vec::new();

        for (from_initiator, action) in std::mem::take(&mut in_flight) {
            // Only packets travel between the two managers.
            let LinkManagerAction::SendPacket { raw, .. } = action else {
                continue;
            };
            let Ok(pkt) = RawPacket::unpack(&raw) else {
                continue;
            };

            let receiving_mgr = if from_initiator {
                &mut resp_mgr
            } else {
                &mut init_mgr
            };
            let results = receiving_mgr.handle_local_delivery(
                pkt.destination_hash,
                &raw,
                pkt.packet_hash,
                InterfaceId(0),
                &mut rng,
            );

            for result in &results {
                match result {
                    LinkManagerAction::ResourceReceived { data, .. } => {
                        assert_eq!(*data, original_data);
                        resource_received = true;
                    }
                    LinkManagerAction::ResourceCompleted { .. } => {
                        sender_completed = true;
                    }
                    _ => {}
                }
            }
            // Whatever the receiving side emitted originates from that side.
            produced.extend(results.into_iter().map(|a| (!from_initiator, a)));
        }
        in_flight = produced;
    }

    assert!(
        resource_received,
        "Should receive large resource (rounds={})",
        rounds
    );
    assert!(
        sender_completed,
        "Sender should complete (rounds={})",
        rounds
    );
}
4032
/// After an advertisement is delivered, the receiver created for it must hold
/// the decrypted advertisement bytes and the default decompression limit.
#[test]
fn test_resource_receiver_stores_original_advertisement_plaintext() {
    use rns_core::transport::types::InterfaceId;

    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);

    let payload = vec![0x41; 256];
    let sender_actions = init_mgr.send_resource(&link_id, &payload, None, &mut rng);

    // Pick out the raw advertisement packet among the sender's actions.
    let adv_raw = sender_actions
        .iter()
        .find_map(|action| {
            let LinkManagerAction::SendPacket { raw, .. } = action else {
                return None;
            };
            let pkt = RawPacket::unpack(raw).ok()?;
            (pkt.context == constants::CONTEXT_RESOURCE_ADV).then(|| raw.clone())
        })
        .expect("sender should emit a resource advertisement");

    // Decrypt it with the responder's link engine before delivering it, so
    // there is a reference plaintext to compare against.
    let adv_pkt = RawPacket::unpack(&adv_raw).unwrap();
    let expected_plaintext = resp_mgr
        .links
        .get(&link_id)
        .unwrap()
        .engine
        .decrypt(&adv_pkt.data)
        .unwrap();

    let _resp_actions = resp_mgr.handle_local_delivery(
        adv_pkt.destination_hash,
        &adv_raw,
        adv_pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );

    let receiver = resp_mgr
        .links
        .get(&link_id)
        .and_then(|managed| managed.incoming_resources.first())
        .expect("advertisement should create an incoming receiver");
    assert_eq!(receiver.advertisement_packet, expected_plaintext);
    assert_eq!(
        receiver.max_decompressed_size,
        constants::RESOURCE_AUTO_COMPRESS_MAX_SIZE
    );
}
4082
/// Lowers the receiver's decompression limit below the real payload size and
/// checks that finishing the transfer fails with "Resource too large", sends
/// a `CONTEXT_RESOURCE_RCL` packet and closes the link on the responder.
#[test]
fn test_corrupt_compressed_resource_rejects_and_tears_down_link() {
    use rns_core::transport::types::InterfaceId;

    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);

    // Highly repetitive data; the test asserts below that it is compressed.
    let payload = vec![b'A'; 4096];
    let adv_actions = init_mgr.send_resource(&link_id, &payload, None, &mut rng);

    // Step 1: deliver the sender's packets and keep the responder's replies.
    let mut responder_replies = Vec::new();
    for action in &adv_actions {
        let LinkManagerAction::SendPacket { raw, .. } = action else {
            continue;
        };
        let pkt = RawPacket::unpack(raw).unwrap();
        responder_replies.extend(resp_mgr.handle_local_delivery(
            pkt.destination_hash,
            raw,
            pkt.packet_hash,
            InterfaceId(0),
            &mut rng,
        ));
    }

    // Step 2: shrink the allowed decompressed size to 64 bytes.
    {
        let receiver = resp_mgr
            .links
            .get_mut(&link_id)
            .and_then(|managed| managed.incoming_resources.first_mut())
            .expect("receiver should exist after advertisement");
        assert!(receiver.flags.compressed, "test data should be compressed");
        receiver.max_decompressed_size = 64;
    }

    // Step 3: feed the replies to the sender and pass whatever it emits
    // straight back to the responder, collecting the responder's actions.
    let mut responder_actions = Vec::new();
    for reply in responder_replies {
        let LinkManagerAction::SendPacket { raw, .. } = reply else {
            continue;
        };
        let pkt = RawPacket::unpack(&raw).unwrap();
        let sender_output = init_mgr.handle_local_delivery(
            pkt.destination_hash,
            &raw,
            pkt.packet_hash,
            InterfaceId(0),
            &mut rng,
        );

        for emitted in &sender_output {
            let LinkManagerAction::SendPacket { raw, .. } = emitted else {
                continue;
            };
            let pkt = RawPacket::unpack(raw).unwrap();
            responder_actions.extend(resp_mgr.handle_local_delivery(
                pkt.destination_hash,
                raw,
                pkt.packet_hash,
                InterfaceId(0),
                &mut rng,
            ));
        }
    }

    let failed_too_large = responder_actions.iter().any(|action| {
        matches!(
            action,
            LinkManagerAction::ResourceFailed { error, .. }
                if error == "Resource too large"
        )
    });
    assert!(
        failed_too_large,
        "corrupt oversized resource should fail with TooLarge"
    );

    let link_closed = responder_actions.iter().any(|action| {
        matches!(
            action,
            LinkManagerAction::LinkClosed { link_id: closed_id, .. } if *closed_id == link_id
        )
    });
    assert!(
        link_closed,
        "corrupt oversized resource should tear down the link"
    );

    let sent_cancel = responder_actions.iter().any(|action| match action {
        LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
            .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_RCL)
            .unwrap_or(false),
        _ => false,
    });
    assert!(
        sent_cancel,
        "corrupt oversized resource should send a receiver cancel/reject packet"
    );

    let final_state = resp_mgr
        .links
        .get(&link_id)
        .map(|managed| managed.engine.state());
    assert_eq!(final_state, Some(LinkState::Closed));
}
4181
/// Drives a 50000-byte transfer until the receiver is waiting for a hashmap
/// update (HMU), then ticks the receiver directly to check that:
/// * a tick right after the last activity emits no retry request,
/// * a tick after the extended timeout emits a retry request, consumes one
///   retry, and that request starts with `RESOURCE_HASHMAP_IS_EXHAUSTED`,
/// * the sender answers the retry with a `CONTEXT_RESOURCE_HMU` packet.
#[test]
fn test_resource_hmu_timeout_extension_in_link_manager_flow() {
    use rns_core::transport::types::InterfaceId;

    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);

    // Pseudo-random bytes from a fixed-seed linear congruential generator.
    let mut lcg = 0x1234_5678u32;
    let data: Vec<u8> = (0..50000)
        .map(|_| {
            lcg = lcg.wrapping_mul(1664525).wrapping_add(1013904223);
            (lcg >> 16) as u8
        })
        .collect();
    let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);

    // Each queued action is tagged with whether the initiator produced it.
    let mut in_flight: Vec<(bool, LinkManagerAction)> =
        adv_actions.into_iter().map(|a| (true, a)).collect();

    // Shuttle packets for at most 300 rounds, stopping as soon as the
    // receiver reports that it is waiting for an HMU.
    let mut reached_hmu_wait = false;
    for _round in 0..300 {
        let mut produced: Vec<(bool, LinkManagerAction)> = Vec::new();

        for (from_initiator, action) in std::mem::take(&mut in_flight) {
            let LinkManagerAction::SendPacket { raw, .. } = action else {
                continue;
            };
            let Ok(pkt) = RawPacket::unpack(&raw) else {
                continue;
            };

            let receiving_mgr = if from_initiator {
                &mut resp_mgr
            } else {
                &mut init_mgr
            };
            let results = receiving_mgr.handle_local_delivery(
                pkt.destination_hash,
                &raw,
                pkt.packet_hash,
                InterfaceId(0),
                &mut rng,
            );
            produced.extend(results.into_iter().map(|a| (!from_initiator, a)));
        }

        reached_hmu_wait = resp_mgr
            .links
            .get(&link_id)
            .and_then(|managed| managed.incoming_resources.first())
            .is_some_and(|receiver| receiver.waiting_for_hmu);
        if reached_hmu_wait {
            break;
        }

        in_flight = produced;
    }

    assert!(
        reached_hmu_wait,
        "expected receiver to reach a live HMU wait state"
    );

    // Priming tick, a hair after the last recorded activity.
    let prime_actions = {
        let managed = resp_mgr.links.get_mut(&link_id).unwrap();
        let receiver = managed.incoming_resources.first_mut().unwrap();
        let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
            managed.engine.decrypt(ciphertext).map_err(|_| ())
        };
        let now = receiver.last_activity + 0.0001;
        receiver.tick(now, &decrypt_fn, &Bzip2Compressor)
    };
    assert!(
        !prime_actions
            .iter()
            .any(|a| matches!(a, ResourceAction::SendRequest(_))),
        "fresh HMU wait state should not immediately emit a retry request"
    );

    // Work out how far past the last activity the next tick must be to exceed
    // the timeout including the HMU wait, and remember the retry budget.
    let (late_delta, retries_before) = {
        let receiver = resp_mgr
            .links
            .get(&link_id)
            .expect("receiver link should still exist")
            .incoming_resources
            .first()
            .expect("receiver should have an active incoming resource");

        assert!(
            receiver.waiting_for_hmu,
            "receiver should be waiting for HMU"
        );

        let sdu_bits = constants::RESOURCE_SDU as f64 * 8.0;
        let eifr = receiver
            .eifr
            .unwrap_or_else(|| sdu_bits / receiver.rtt.unwrap_or(0.5));
        let expected_tof = if receiver.outstanding_parts > 0 {
            (receiver.outstanding_parts as f64 * constants::RESOURCE_SDU as f64 * 8.0) / eifr
        } else {
            (3.0 * constants::RESOURCE_SDU as f64) / eifr
        };
        let expected_hmu_wait = (sdu_bits * constants::RESOURCE_HMU_WAIT_FACTOR) / eifr;
        let base_timeout = constants::RESOURCE_PART_TIMEOUT_FACTOR_AFTER_RTT * expected_tof
            + constants::RESOURCE_RETRY_GRACE_TIME;
        (
            base_timeout + expected_hmu_wait + expected_hmu_wait.max(1.0),
            receiver.retries_left,
        )
    };
    {
        let receiver = resp_mgr
            .links
            .get(&link_id)
            .unwrap()
            .incoming_resources
            .first()
            .unwrap();
        // NOTE(review): `retries_before` was read after the priming tick and
        // nothing has touched the receiver since, so this equality holds
        // trivially; confirm whether it was meant to span the priming tick.
        assert_eq!(receiver.retries_left, retries_before);
        assert!(
            receiver.eifr.is_some(),
            "receiver tick should have populated EIFR"
        );
    }

    // Late tick, past the extended timeout.
    let late_resource_actions = {
        let managed = resp_mgr.links.get_mut(&link_id).unwrap();
        let receiver = managed.incoming_resources.first_mut().unwrap();
        let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
            managed.engine.decrypt(ciphertext).map_err(|_| ())
        };
        let now = receiver.last_activity + late_delta;
        receiver.tick(now, &decrypt_fn, &Bzip2Compressor)
    };
    let late_actions = resp_mgr.process_resource_actions(&link_id, late_resource_actions, &mut rng);
    let retry_raw = late_actions
        .iter()
        .find_map(|action| {
            let LinkManagerAction::SendPacket { raw, .. } = action else {
                return None;
            };
            let pkt = RawPacket::unpack(raw).ok()?;
            (pkt.context == constants::CONTEXT_RESOURCE_REQ).then(|| raw.clone())
        })
        .expect("receiver should emit a resource retry request after extended timeout");

    {
        let receiver = resp_mgr
            .links
            .get(&link_id)
            .unwrap()
            .incoming_resources
            .first()
            .unwrap();
        assert_eq!(receiver.retries_left, retries_before - 1);
    }

    // The retry request must start with the hashmap-exhausted marker.
    let retry_pkt = RawPacket::unpack(&retry_raw).unwrap();
    let retry_plaintext = resp_mgr
        .links
        .get(&link_id)
        .unwrap()
        .engine
        .decrypt(&retry_pkt.data)
        .expect("retry request should decrypt");
    assert_eq!(retry_plaintext[0], constants::RESOURCE_HASHMAP_IS_EXHAUSTED);

    // Delivering the retry to the sender must produce an HMU packet.
    let sender_reply = init_mgr.handle_local_delivery(
        retry_pkt.destination_hash,
        &retry_raw,
        retry_pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );
    let sent_hmu = sender_reply.iter().any(|action| match action {
        LinkManagerAction::SendPacket { raw, .. } => RawPacket::unpack(raw)
            .map(|pkt| pkt.context == constants::CONTEXT_RESOURCE_HMU)
            .unwrap_or(false),
        _ => false,
    });
    assert!(
        sent_hmu,
        "sender should answer the exhausted retry request with a live HMU packet"
    );
}
4386
/// Feeds one of each relevant `ResourceAction` through
/// `process_resource_actions` and checks the `LinkManagerAction` each one
/// maps to, position by position for the first four.
#[test]
fn test_process_resource_actions_mapping() {
    let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    let resource_actions = vec![
        ResourceAction::DataReceived {
            data: vec![1, 2, 3],
            metadata: Some(vec![4, 5]),
        },
        ResourceAction::Completed,
        ResourceAction::Failed(rns_core::resource::ResourceError::Timeout),
        ResourceAction::ProgressUpdate {
            received: 10,
            total: 20,
        },
        ResourceAction::TeardownLink,
    ];

    let mapped = init_mgr.process_resource_actions(&link_id, resource_actions, &mut rng);

    // The first four inputs map one-to-one, in order.
    assert!(matches!(
        mapped[0],
        LinkManagerAction::ResourceReceived { .. }
    ));
    assert!(matches!(
        mapped[1],
        LinkManagerAction::ResourceCompleted { .. }
    ));
    assert!(matches!(
        mapped[2],
        LinkManagerAction::ResourceFailed { .. }
    ));
    assert!(matches!(
        mapped[3],
        LinkManagerAction::ResourceProgress {
            received: 10,
            total: 20,
            ..
        }
    ));

    // The teardown is only required to show up somewhere in the output.
    let link_was_closed = mapped
        .iter()
        .any(|action| matches!(action, LinkManagerAction::LinkClosed { .. }));
    assert!(link_was_closed);
}
4433
/// A freshly constructed manager reports no state for an arbitrary link id.
#[test]
fn test_link_state_empty() {
    let manager = LinkManager::new();
    let state = manager.link_state(&[0xAA; 16]);
    assert!(state.is_none());
}
4440
/// A large response is shuttled between the managers until it completes; the
/// initiator must see it as `ResponseReceived` carrying the original request
/// id and data, never as `ResourceReceived` or `ResourceAcceptQuery`.
#[test]
fn test_large_response_resource_completes_as_response() {
    use rns_core::transport::types::InterfaceId;

    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
    let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(large_payload));
    // The handler gets its own copy; the original is compared at the end.
    resp_mgr.register_request_handler("/large", None, {
        let handler_copy = response_value.clone();
        move |_link_id, _path, _data, _remote| Some(handler_copy.clone())
    });

    let request_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
    let request_wire = extract_any_send_packet(&request_actions);
    let request_pkt = RawPacket::unpack(&request_wire).unwrap();
    let request_id = request_pkt.get_truncated_hash();
    let responder_actions = resp_mgr.handle_local_delivery(
        request_pkt.destination_hash,
        &request_wire,
        request_pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );

    // Each queued action is tagged with whether the responder produced it.
    let mut in_flight: Vec<(bool, LinkManagerAction)> =
        responder_actions.into_iter().map(|a| (true, a)).collect();
    let mut rounds = 0;
    let mut received_response = None;

    while !in_flight.is_empty() && rounds < 200 {
        rounds += 1;
        let mut produced = Vec::new();

        for (from_responder, action) in std::mem::take(&mut in_flight) {
            let LinkManagerAction::SendPacket { raw, .. } = action else {
                continue;
            };
            let pkt = RawPacket::unpack(&raw).unwrap();

            let receiving_mgr = if from_responder {
                &mut init_mgr
            } else {
                &mut resp_mgr
            };
            let results = receiving_mgr.handle_local_delivery(
                pkt.destination_hash,
                &raw,
                pkt.packet_hash,
                InterfaceId(0),
                &mut rng,
            );

            for result in &results {
                match result {
                    LinkManagerAction::ResponseReceived {
                        request_id: rid,
                        data,
                        ..
                    } => {
                        received_response = Some((*rid, data.clone()));
                    }
                    LinkManagerAction::ResourceReceived { .. } => {
                        panic!("response resources must complete as ResponseReceived")
                    }
                    LinkManagerAction::ResourceAcceptQuery { .. } => {
                        panic!("response resources must bypass application acceptance")
                    }
                    _ => {}
                }
            }
            // Whatever the receiving side emitted originates from that side.
            produced.extend(results.into_iter().map(|a| (!from_responder, a)));
        }

        in_flight = produced;
    }

    let Some((received_request_id, received_data)) = received_response else {
        panic!(
            "large response resource did not complete as ResponseReceived after {} rounds",
            rounds
        )
    };
    assert_eq!(received_request_id, request_id);
    assert_eq!(received_data, response_value);
}
4531
/// With both sides set to a 300-byte link MTU, a response built from a
/// 350-byte payload must be advertised as a resource and not sent as a direct
/// `CONTEXT_RESPONSE` packet.
#[test]
fn test_negotiated_mtu_response_uses_resource_before_global_mtu() {
    use rns_core::transport::types::InterfaceId;

    let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    init_mgr.set_link_mtu(&link_id, 300);
    resp_mgr.set_link_mtu(&link_id, 300);

    let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(vec![0xAB; 350]));
    resp_mgr.register_request_handler("/mtu", None, {
        let handler_copy = response_value.clone();
        move |_link_id, _path, _data, _remote| Some(handler_copy.clone())
    });

    let request_actions = init_mgr.send_request(&link_id, "/mtu", b"\xc0", &mut rng);
    let request_wire = extract_any_send_packet(&request_actions);
    let request_pkt = RawPacket::unpack(&request_wire).unwrap();
    let responder_actions = resp_mgr.handle_local_delivery(
        request_pkt.destination_hash,
        &request_wire,
        request_pkt.packet_hash,
        InterfaceId(0),
        &mut rng,
    );

    // Record whether an advertisement was sent, and the size of any direct
    // response packet (the last one seen, if there are several).
    let mut saw_resource_adv = false;
    let mut direct_response_len = None;
    for action in &responder_actions {
        let LinkManagerAction::SendPacket { raw, .. } = action else {
            continue;
        };
        let pkt = RawPacket::unpack(raw).unwrap();
        saw_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
        if pkt.context == constants::CONTEXT_RESPONSE {
            direct_response_len = Some(raw.len());
        }
    }

    assert!(
        saw_resource_adv,
        "responses larger than the negotiated link MTU should use resource fallback"
    );
    assert!(
        direct_response_len.is_none(),
        "sent direct response of {} bytes on a 300 byte negotiated MTU",
        direct_response_len.unwrap_or_default()
    );
}
4580
/// A management response built from a 5000-byte payload must be advertised
/// as a resource and not sent as a direct `CONTEXT_RESPONSE` packet.
#[test]
fn test_large_management_response_uses_resource_fallback() {
    let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
    let mut rng = OsRng;

    let response_value = rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(vec![0xBC; 5000]));
    let emitted =
        resp_mgr.send_management_response(&link_id, &[0x55; 16], &response_value, &mut rng);

    // Classify every outgoing packet by its context.
    let mut saw_resource_adv = false;
    let mut saw_direct_response = false;
    for action in &emitted {
        let LinkManagerAction::SendPacket { raw, .. } = action else {
            continue;
        };
        let pkt = RawPacket::unpack(raw).unwrap();
        saw_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
        saw_direct_response |= pkt.context == constants::CONTEXT_RESPONSE;
    }

    assert!(
        saw_resource_adv,
        "large management responses should advertise a response resource"
    );
    assert!(
        !saw_direct_response,
        "large management responses should not use a direct CONTEXT_RESPONSE packet"
    );
}
4610}