1use std::collections::HashMap;
10
11use super::compressor::Bzip2Compressor;
12use rns_core::channel::{Channel, Sequence};
13use rns_core::constants;
14use rns_core::link::types::{LinkId, LinkState, TeardownReason};
15use rns_core::link::{LinkAction, LinkEngine, LinkMode};
16use rns_core::packet::{PacketFlags, RawPacket};
17use rns_core::resource::{ResourceAction, ResourceReceiver, ResourceSender};
18use rns_crypto::ed25519::Ed25519PrivateKey;
19use rns_crypto::Rng;
20
21use super::time;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum ResourceStrategy {
26 AcceptNone,
28 AcceptAll,
30 AcceptApp,
32}
33
34impl Default for ResourceStrategy {
35 fn default() -> Self {
36 ResourceStrategy::AcceptNone
37 }
38}
39
40struct ManagedLink {
42 engine: LinkEngine,
43 channel: Option<Channel>,
44 pending_channel_packets: HashMap<[u8; 32], Sequence>,
45 channel_send_ok: u64,
46 channel_send_not_ready: u64,
47 channel_send_too_big: u64,
48 channel_send_other_error: u64,
49 channel_messages_received: u64,
50 channel_proofs_sent: u64,
51 channel_proofs_received: u64,
52 dest_hash: [u8; 16],
54 remote_identity: Option<([u8; 16], [u8; 64])>,
56 dest_sig_pub_bytes: Option<[u8; 32]>,
58 incoming_resources: Vec<ResourceReceiver>,
60 outgoing_resources: Vec<ResourceSender>,
62 resource_strategy: ResourceStrategy,
64 route_interface: Option<rns_core::transport::types::InterfaceId>,
66 route_transport_id: Option<[u8; 16]>,
71}
72
73struct LinkDestination {
75 sig_prv: Ed25519PrivateKey,
76 sig_pub_bytes: [u8; 32],
77 resource_strategy: ResourceStrategy,
78}
79
80struct RequestHandlerEntry {
82 path: String,
84 path_hash: [u8; 16],
86 allowed_list: Option<Vec<[u8; 16]>>,
88 handler:
90 Box<dyn Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>> + Send>,
91}
92
93#[derive(Debug)]
95pub enum LinkManagerAction {
96 SendPacket {
98 raw: Vec<u8>,
99 dest_type: u8,
100 attached_interface: Option<rns_core::transport::types::InterfaceId>,
101 },
102 LinkEstablished {
104 link_id: LinkId,
105 dest_hash: [u8; 16],
106 rtt: f64,
107 is_initiator: bool,
108 },
109 LinkClosed {
111 link_id: LinkId,
112 reason: Option<TeardownReason>,
113 },
114 RemoteIdentified {
116 link_id: LinkId,
117 identity_hash: [u8; 16],
118 public_key: [u8; 64],
119 },
120 RegisterLinkDest { link_id: LinkId },
122 DeregisterLinkDest { link_id: LinkId },
124 ManagementRequest {
127 link_id: LinkId,
128 path_hash: [u8; 16],
129 data: Vec<u8>,
131 request_id: [u8; 16],
133 remote_identity: Option<([u8; 16], [u8; 64])>,
134 },
135 ResourceReceived {
137 link_id: LinkId,
138 data: Vec<u8>,
139 metadata: Option<Vec<u8>>,
140 },
141 ResourceCompleted { link_id: LinkId },
143 ResourceFailed { link_id: LinkId, error: String },
145 ResourceProgress {
147 link_id: LinkId,
148 received: usize,
149 total: usize,
150 },
151 ResourceAcceptQuery {
153 link_id: LinkId,
154 resource_hash: Vec<u8>,
155 transfer_size: u64,
156 has_metadata: bool,
157 },
158 ChannelMessageReceived {
160 link_id: LinkId,
161 msgtype: u16,
162 payload: Vec<u8>,
163 },
164 LinkDataReceived {
166 link_id: LinkId,
167 context: u8,
168 data: Vec<u8>,
169 },
170 ResponseReceived {
172 link_id: LinkId,
173 request_id: [u8; 16],
174 data: Vec<u8>,
175 },
176 LinkRequestReceived {
178 link_id: LinkId,
179 receiving_interface: rns_core::transport::types::InterfaceId,
180 },
181}
182
183pub struct LinkManager {
185 links: HashMap<LinkId, ManagedLink>,
186 link_destinations: HashMap<[u8; 16], LinkDestination>,
187 request_handlers: Vec<RequestHandlerEntry>,
188 management_paths: Vec<[u8; 16]>,
191}
192
193impl LinkManager {
194 fn resource_sdu_for_link(link: &ManagedLink) -> usize {
195 let mtu = link.engine.mtu() as usize;
198 let derived = mtu.saturating_sub(constants::HEADER_MAXSIZE + constants::IFAC_MIN_SIZE);
199 if derived > 0 {
200 derived
201 } else {
202 constants::RESOURCE_SDU
203 }
204 }
205
206 pub fn new() -> Self {
208 LinkManager {
209 links: HashMap::new(),
210 link_destinations: HashMap::new(),
211 request_handlers: Vec::new(),
212 management_paths: Vec::new(),
213 }
214 }
215
216 pub fn register_management_path(&mut self, path_hash: [u8; 16]) {
220 if !self.management_paths.contains(&path_hash) {
221 self.management_paths.push(path_hash);
222 }
223 }
224
225 pub fn get_derived_key(&self, link_id: &LinkId) -> Option<Vec<u8>> {
227 self.links
228 .get(link_id)
229 .and_then(|link| link.engine.derived_key().map(|dk| dk.to_vec()))
230 }
231
232 pub fn get_link_route_hint(
234 &self,
235 link_id: &LinkId,
236 ) -> Option<(rns_core::transport::types::InterfaceId, Option<[u8; 16]>)> {
237 self.links.get(link_id).and_then(|link| {
238 link.route_interface
239 .map(|iface| (iface, link.route_transport_id))
240 })
241 }
242
243 pub fn register_link_destination(
245 &mut self,
246 dest_hash: [u8; 16],
247 sig_prv: Ed25519PrivateKey,
248 sig_pub_bytes: [u8; 32],
249 resource_strategy: ResourceStrategy,
250 ) {
251 self.link_destinations.insert(
252 dest_hash,
253 LinkDestination {
254 sig_prv,
255 sig_pub_bytes,
256 resource_strategy,
257 },
258 );
259 }
260
261 pub fn deregister_link_destination(&mut self, dest_hash: &[u8; 16]) {
263 self.link_destinations.remove(dest_hash);
264 }
265
266 pub fn register_request_handler<F>(
272 &mut self,
273 path: &str,
274 allowed_list: Option<Vec<[u8; 16]>>,
275 handler: F,
276 ) where
277 F: Fn(LinkId, &str, &[u8], Option<&([u8; 16], [u8; 64])>) -> Option<Vec<u8>>
278 + Send
279 + 'static,
280 {
281 let path_hash = compute_path_hash(path);
282 self.request_handlers.push(RequestHandlerEntry {
283 path: path.to_string(),
284 path_hash,
285 allowed_list,
286 handler: Box::new(handler),
287 });
288 }
289
290 pub fn create_link(
298 &mut self,
299 dest_hash: &[u8; 16],
300 dest_sig_pub_bytes: &[u8; 32],
301 hops: u8,
302 mtu: u32,
303 rng: &mut dyn Rng,
304 ) -> (LinkId, Vec<LinkManagerAction>) {
305 let mode = LinkMode::Aes256Cbc;
306 let (mut engine, request_data) =
307 LinkEngine::new_initiator(dest_hash, hops, mode, Some(mtu), time::now(), rng);
308
309 let flags = PacketFlags {
311 header_type: constants::HEADER_1,
312 context_flag: constants::FLAG_UNSET,
313 transport_type: constants::TRANSPORT_BROADCAST,
314 destination_type: constants::DESTINATION_SINGLE,
315 packet_type: constants::PACKET_TYPE_LINKREQUEST,
316 };
317
318 let packet = match RawPacket::pack(
319 flags,
320 0,
321 dest_hash,
322 None,
323 constants::CONTEXT_NONE,
324 &request_data,
325 ) {
326 Ok(p) => p,
327 Err(_) => {
328 return ([0u8; 16], Vec::new());
330 }
331 };
332
333 engine.set_link_id_from_hashable(&packet.get_hashable_part(), request_data.len());
334 let link_id = *engine.link_id();
335
336 let managed = ManagedLink {
337 engine,
338 channel: None,
339 pending_channel_packets: HashMap::new(),
340 channel_send_ok: 0,
341 channel_send_not_ready: 0,
342 channel_send_too_big: 0,
343 channel_send_other_error: 0,
344 channel_messages_received: 0,
345 channel_proofs_sent: 0,
346 channel_proofs_received: 0,
347 dest_hash: *dest_hash,
348 remote_identity: None,
349 dest_sig_pub_bytes: Some(*dest_sig_pub_bytes),
350 incoming_resources: Vec::new(),
351 outgoing_resources: Vec::new(),
352 resource_strategy: ResourceStrategy::default(),
353 route_interface: None,
354 route_transport_id: None,
355 };
356 self.links.insert(link_id, managed);
357
358 let mut actions = Vec::new();
359 actions.push(LinkManagerAction::RegisterLinkDest { link_id });
361 actions.push(LinkManagerAction::SendPacket {
363 raw: packet.raw,
364 dest_type: constants::DESTINATION_LINK,
365 attached_interface: None,
366 });
367
368 (link_id, actions)
369 }
370
371 pub fn handle_local_delivery(
377 &mut self,
378 dest_hash: [u8; 16],
379 raw: &[u8],
380 packet_hash: [u8; 32],
381 receiving_interface: rns_core::transport::types::InterfaceId,
382 rng: &mut dyn Rng,
383 ) -> Vec<LinkManagerAction> {
384 let packet = match RawPacket::unpack(raw) {
385 Ok(p) => p,
386 Err(_) => return Vec::new(),
387 };
388
389 match packet.flags.packet_type {
390 constants::PACKET_TYPE_LINKREQUEST => {
391 self.handle_linkrequest(&dest_hash, &packet, receiving_interface, rng)
392 }
393 constants::PACKET_TYPE_PROOF if packet.context == constants::CONTEXT_LRPROOF => {
394 self.handle_lrproof(&dest_hash, &packet, receiving_interface, rng)
396 }
397 constants::PACKET_TYPE_PROOF => self.handle_link_proof(&dest_hash, &packet, rng),
398 constants::PACKET_TYPE_DATA => {
399 self.handle_link_data(&dest_hash, &packet, packet_hash, receiving_interface, rng)
400 }
401 _ => Vec::new(),
402 }
403 }
404
405 fn handle_linkrequest(
407 &mut self,
408 dest_hash: &[u8; 16],
409 packet: &RawPacket,
410 receiving_interface: rns_core::transport::types::InterfaceId,
411 rng: &mut dyn Rng,
412 ) -> Vec<LinkManagerAction> {
413 let ld = match self.link_destinations.get(dest_hash) {
415 Some(ld) => ld,
416 None => return Vec::new(),
417 };
418
419 let hashable = packet.get_hashable_part();
420 let now = time::now();
421
422 let (engine, lrproof_data) = match LinkEngine::new_responder(
424 &ld.sig_prv,
425 &ld.sig_pub_bytes,
426 &packet.data,
427 &hashable,
428 dest_hash,
429 packet.hops,
430 now,
431 rng,
432 ) {
433 Ok(r) => r,
434 Err(e) => {
435 log::debug!("LINKREQUEST rejected: {}", e);
436 return Vec::new();
437 }
438 };
439
440 let link_id = *engine.link_id();
441 log::debug!(
442 "LINKREQUEST accepted: link={:02x?} iface={} header_type={} transport_id_present={} hops={}",
443 &link_id[..4],
444 receiving_interface.0,
445 packet.flags.header_type,
446 packet.transport_id.is_some(),
447 packet.hops
448 );
449
450 let managed = ManagedLink {
451 engine,
452 channel: None,
453 pending_channel_packets: HashMap::new(),
454 channel_send_ok: 0,
455 channel_send_not_ready: 0,
456 channel_send_too_big: 0,
457 channel_send_other_error: 0,
458 channel_messages_received: 0,
459 channel_proofs_sent: 0,
460 channel_proofs_received: 0,
461 dest_hash: *dest_hash,
462 remote_identity: None,
463 dest_sig_pub_bytes: None,
464 incoming_resources: Vec::new(),
465 outgoing_resources: Vec::new(),
466 resource_strategy: ld.resource_strategy,
467 route_interface: Some(receiving_interface),
468 route_transport_id: if packet.flags.header_type == constants::HEADER_2 {
469 packet.transport_id
470 } else {
471 None
472 },
473 };
474 self.links.insert(link_id, managed);
475
476 let flags = PacketFlags {
478 header_type: constants::HEADER_1,
479 context_flag: constants::FLAG_UNSET,
480 transport_type: constants::TRANSPORT_BROADCAST,
481 destination_type: constants::DESTINATION_LINK,
482 packet_type: constants::PACKET_TYPE_PROOF,
483 };
484
485 let mut actions = Vec::new();
486
487 actions.push(LinkManagerAction::RegisterLinkDest { link_id });
489
490 if let Ok(pkt) = RawPacket::pack(
491 flags,
492 packet.hops,
493 &link_id,
494 None,
495 constants::CONTEXT_LRPROOF,
496 &lrproof_data,
497 ) {
498 log::debug!(
499 "LRPROOF queued: link={:02x?} route_iface={} route_tid_present={} hops={}",
500 &link_id[..4],
501 receiving_interface.0,
502 packet.transport_id.is_some(),
503 packet.hops
504 );
505 actions.push(LinkManagerAction::SendPacket {
506 raw: pkt.raw,
507 dest_type: constants::DESTINATION_LINK,
508 attached_interface: None,
509 });
510 }
511
512 if packet.hops != 0 {
515 if let Ok(pkt0) = RawPacket::pack(
516 flags,
517 0,
518 &link_id,
519 None,
520 constants::CONTEXT_LRPROOF,
521 &lrproof_data,
522 ) {
523 log::debug!(
524 "LRPROOF fallback queued: link={:02x?} route_iface={} hops=0",
525 &link_id[..4],
526 receiving_interface.0
527 );
528 actions.push(LinkManagerAction::SendPacket {
529 raw: pkt0.raw,
530 dest_type: constants::DESTINATION_LINK,
531 attached_interface: None,
532 });
533 }
534 }
535
536 if packet.hops < u8::MAX {
540 let hops_plus_one = packet.hops + 1;
541 if let Ok(pkt2) = RawPacket::pack(
542 flags,
543 hops_plus_one,
544 &link_id,
545 None,
546 constants::CONTEXT_LRPROOF,
547 &lrproof_data,
548 ) {
549 log::debug!(
550 "LRPROOF +1 queued: link={:02x?} route_iface={} hops={}",
551 &link_id[..4],
552 receiving_interface.0,
553 hops_plus_one
554 );
555 actions.push(LinkManagerAction::SendPacket {
556 raw: pkt2.raw,
557 dest_type: constants::DESTINATION_LINK,
558 attached_interface: None,
559 });
560 }
561 }
562
563 actions.push(LinkManagerAction::LinkRequestReceived {
565 link_id,
566 receiving_interface,
567 });
568
569 actions
570 }
571
572 fn handle_link_proof(
573 &mut self,
574 link_id: &LinkId,
575 packet: &RawPacket,
576 rng: &mut dyn Rng,
577 ) -> Vec<LinkManagerAction> {
578 if packet.data.len() < 32 {
579 return Vec::new();
580 }
581
582 let mut tracked_hash = [0u8; 32];
583 tracked_hash.copy_from_slice(&packet.data[..32]);
584
585 let Some(link) = self.links.get_mut(link_id) else {
586 return Vec::new();
587 };
588 let Some(sequence) = link.pending_channel_packets.remove(&tracked_hash) else {
589 return Vec::new();
590 };
591 link.channel_proofs_received += 1;
592 let Some(channel) = link.channel.as_mut() else {
593 return Vec::new();
594 };
595
596 let chan_actions = channel.packet_delivered(sequence);
597 let _ = channel;
598 let _ = link;
599 self.process_channel_actions(link_id, chan_actions, rng)
600 }
601
602 fn build_link_packet_proof(
603 &mut self,
604 link_id: &LinkId,
605 packet_hash: &[u8; 32],
606 ) -> Vec<LinkManagerAction> {
607 let dest_hash = match self.links.get(link_id) {
608 Some(link) => link.dest_hash,
609 None => return Vec::new(),
610 };
611 let Some(ld) = self.link_destinations.get(&dest_hash) else {
612 return Vec::new();
613 };
614 if let Some(link) = self.links.get_mut(link_id) {
615 link.channel_proofs_sent += 1;
616 }
617
618 let signature = ld.sig_prv.sign(packet_hash);
619 let mut proof_data = Vec::with_capacity(96);
620 proof_data.extend_from_slice(packet_hash);
621 proof_data.extend_from_slice(&signature);
622
623 let flags = PacketFlags {
624 header_type: constants::HEADER_1,
625 context_flag: constants::FLAG_UNSET,
626 transport_type: constants::TRANSPORT_BROADCAST,
627 destination_type: constants::DESTINATION_LINK,
628 packet_type: constants::PACKET_TYPE_PROOF,
629 };
630 if let Ok(pkt) = RawPacket::pack(
631 flags,
632 0,
633 link_id,
634 None,
635 constants::CONTEXT_NONE,
636 &proof_data,
637 ) {
638 vec![LinkManagerAction::SendPacket {
639 raw: pkt.raw,
640 dest_type: constants::DESTINATION_LINK,
641 attached_interface: None,
642 }]
643 } else {
644 Vec::new()
645 }
646 }
647
648 fn handle_lrproof(
650 &mut self,
651 link_id_bytes: &[u8; 16],
652 packet: &RawPacket,
653 receiving_interface: rns_core::transport::types::InterfaceId,
654 rng: &mut dyn Rng,
655 ) -> Vec<LinkManagerAction> {
656 let link = match self.links.get_mut(link_id_bytes) {
657 Some(l) => l,
658 None => return Vec::new(),
659 };
660
661 link.route_interface = Some(receiving_interface);
662 if packet.flags.header_type == constants::HEADER_2 {
663 if let Some(transport_id) = packet.transport_id {
664 link.route_transport_id = Some(transport_id);
665 }
666 }
667 log::debug!(
668 "LRPROOF received: link={:02x?} iface={} header_type={} transport_id_present={}",
669 &link_id_bytes[..4],
670 receiving_interface.0,
671 packet.flags.header_type,
672 packet.transport_id.is_some()
673 );
674
675 if link.engine.state() != LinkState::Pending || !link.engine.is_initiator() {
676 return Vec::new();
677 }
678
679 let dest_sig_pub_bytes = match link.dest_sig_pub_bytes {
681 Some(b) => b,
682 None => {
683 log::debug!("LRPROOF: no destination signing key available");
684 return Vec::new();
685 }
686 };
687
688 let now = time::now();
689 let (lrrtt_encrypted, link_actions) =
690 match link
691 .engine
692 .handle_lrproof(&packet.data, &dest_sig_pub_bytes, now, rng)
693 {
694 Ok(r) => r,
695 Err(e) => {
696 log::debug!("LRPROOF validation failed: {}", e);
697 return Vec::new();
698 }
699 };
700
701 let link_id = *link.engine.link_id();
702 let mut actions = Vec::new();
703
704 actions.extend(self.process_link_actions(&link_id, &link_actions));
706
707 let flags = PacketFlags {
709 header_type: constants::HEADER_1,
710 context_flag: constants::FLAG_UNSET,
711 transport_type: constants::TRANSPORT_BROADCAST,
712 destination_type: constants::DESTINATION_LINK,
713 packet_type: constants::PACKET_TYPE_DATA,
714 };
715
716 if let Ok(pkt) = RawPacket::pack(
717 flags,
718 0,
719 &link_id,
720 None,
721 constants::CONTEXT_LRRTT,
722 &lrrtt_encrypted,
723 ) {
724 actions.push(LinkManagerAction::SendPacket {
725 raw: pkt.raw,
726 dest_type: constants::DESTINATION_LINK,
727 attached_interface: None,
728 });
729 }
730
731 if let Some(link) = self.links.get_mut(&link_id) {
733 if link.engine.state() == LinkState::Active {
734 let rtt = link.engine.rtt().unwrap_or(1.0);
735 link.channel = Some(Channel::new(rtt));
736 }
737 }
738
739 actions
740 }
741
742 fn handle_link_data(
748 &mut self,
749 link_id_bytes: &[u8; 16],
750 packet: &RawPacket,
751 packet_hash: [u8; 32],
752 receiving_interface: rns_core::transport::types::InterfaceId,
753 rng: &mut dyn Rng,
754 ) -> Vec<LinkManagerAction> {
755 enum LinkDataResult {
757 Lrrtt {
758 link_id: LinkId,
759 link_actions: Vec<LinkAction>,
760 },
761 Identify {
762 link_id: LinkId,
763 link_actions: Vec<LinkAction>,
764 },
765 Keepalive {
766 link_id: LinkId,
767 inbound_actions: Vec<LinkAction>,
768 },
769 LinkClose {
770 link_id: LinkId,
771 teardown_actions: Vec<LinkAction>,
772 },
773 Channel {
774 link_id: LinkId,
775 inbound_actions: Vec<LinkAction>,
776 plaintext: Vec<u8>,
777 packet_hash: [u8; 32],
778 },
779 Request {
780 link_id: LinkId,
781 inbound_actions: Vec<LinkAction>,
782 plaintext: Vec<u8>,
783 request_id: [u8; 16],
784 },
785 Response {
786 link_id: LinkId,
787 inbound_actions: Vec<LinkAction>,
788 plaintext: Vec<u8>,
789 },
790 Generic {
791 link_id: LinkId,
792 inbound_actions: Vec<LinkAction>,
793 plaintext: Vec<u8>,
794 context: u8,
795 packet_hash: [u8; 32],
796 },
797 ResourceAdv {
799 link_id: LinkId,
800 inbound_actions: Vec<LinkAction>,
801 plaintext: Vec<u8>,
802 },
803 ResourceReq {
805 link_id: LinkId,
806 inbound_actions: Vec<LinkAction>,
807 plaintext: Vec<u8>,
808 },
809 ResourceHmu {
811 link_id: LinkId,
812 inbound_actions: Vec<LinkAction>,
813 plaintext: Vec<u8>,
814 },
815 ResourcePart {
817 link_id: LinkId,
818 inbound_actions: Vec<LinkAction>,
819 raw_data: Vec<u8>,
820 },
821 ResourcePrf {
823 link_id: LinkId,
824 inbound_actions: Vec<LinkAction>,
825 plaintext: Vec<u8>,
826 },
827 ResourceIcl {
829 link_id: LinkId,
830 inbound_actions: Vec<LinkAction>,
831 },
832 ResourceRcl {
834 link_id: LinkId,
835 inbound_actions: Vec<LinkAction>,
836 },
837 Error,
838 }
839
840 let result = {
841 let link = match self.links.get_mut(link_id_bytes) {
842 Some(l) => l,
843 None => return Vec::new(),
844 };
845
846 link.route_interface = Some(receiving_interface);
847 if packet.flags.header_type == constants::HEADER_2 {
848 if let Some(transport_id) = packet.transport_id {
849 link.route_transport_id = Some(transport_id);
850 }
851 }
852
853 match packet.context {
854 constants::CONTEXT_LRRTT => {
855 match link.engine.handle_lrrtt(&packet.data, time::now()) {
856 Ok(link_actions) => {
857 let link_id = *link.engine.link_id();
858 LinkDataResult::Lrrtt {
859 link_id,
860 link_actions,
861 }
862 }
863 Err(e) => {
864 log::debug!("LRRTT handling failed: {}", e);
865 LinkDataResult::Error
866 }
867 }
868 }
869 constants::CONTEXT_LINKIDENTIFY => {
870 match link.engine.handle_identify(&packet.data) {
871 Ok(link_actions) => {
872 let link_id = *link.engine.link_id();
873 link.remote_identity = link.engine.remote_identity().cloned();
874 LinkDataResult::Identify {
875 link_id,
876 link_actions,
877 }
878 }
879 Err(e) => {
880 log::debug!("LINKIDENTIFY failed: {}", e);
881 LinkDataResult::Error
882 }
883 }
884 }
885 constants::CONTEXT_KEEPALIVE => {
886 let inbound_actions = link.engine.record_inbound(time::now());
887 let link_id = *link.engine.link_id();
888 LinkDataResult::Keepalive {
889 link_id,
890 inbound_actions,
891 }
892 }
893 constants::CONTEXT_LINKCLOSE => {
894 let teardown_actions = link.engine.handle_teardown();
895 let link_id = *link.engine.link_id();
896 LinkDataResult::LinkClose {
897 link_id,
898 teardown_actions,
899 }
900 }
901 constants::CONTEXT_CHANNEL => match link.engine.decrypt(&packet.data) {
902 Ok(plaintext) => {
903 let inbound_actions = link.engine.record_inbound(time::now());
904 let link_id = *link.engine.link_id();
905 LinkDataResult::Channel {
906 link_id,
907 inbound_actions,
908 plaintext,
909 packet_hash,
910 }
911 }
912 Err(_) => LinkDataResult::Error,
913 },
914 constants::CONTEXT_REQUEST => match link.engine.decrypt(&packet.data) {
915 Ok(plaintext) => {
916 let inbound_actions = link.engine.record_inbound(time::now());
917 let link_id = *link.engine.link_id();
918 let request_id = packet.get_truncated_hash();
919 LinkDataResult::Request {
920 link_id,
921 inbound_actions,
922 plaintext,
923 request_id,
924 }
925 }
926 Err(_) => LinkDataResult::Error,
927 },
928 constants::CONTEXT_RESPONSE => match link.engine.decrypt(&packet.data) {
929 Ok(plaintext) => {
930 let inbound_actions = link.engine.record_inbound(time::now());
931 let link_id = *link.engine.link_id();
932 LinkDataResult::Response {
933 link_id,
934 inbound_actions,
935 plaintext,
936 }
937 }
938 Err(_) => LinkDataResult::Error,
939 },
940 constants::CONTEXT_RESOURCE_ADV => match link.engine.decrypt(&packet.data) {
942 Ok(plaintext) => {
943 let inbound_actions = link.engine.record_inbound(time::now());
944 let link_id = *link.engine.link_id();
945 LinkDataResult::ResourceAdv {
946 link_id,
947 inbound_actions,
948 plaintext,
949 }
950 }
951 Err(_) => LinkDataResult::Error,
952 },
953 constants::CONTEXT_RESOURCE_REQ => match link.engine.decrypt(&packet.data) {
954 Ok(plaintext) => {
955 let inbound_actions = link.engine.record_inbound(time::now());
956 let link_id = *link.engine.link_id();
957 LinkDataResult::ResourceReq {
958 link_id,
959 inbound_actions,
960 plaintext,
961 }
962 }
963 Err(_) => LinkDataResult::Error,
964 },
965 constants::CONTEXT_RESOURCE_HMU => match link.engine.decrypt(&packet.data) {
966 Ok(plaintext) => {
967 let inbound_actions = link.engine.record_inbound(time::now());
968 let link_id = *link.engine.link_id();
969 LinkDataResult::ResourceHmu {
970 link_id,
971 inbound_actions,
972 plaintext,
973 }
974 }
975 Err(_) => LinkDataResult::Error,
976 },
977 constants::CONTEXT_RESOURCE => {
978 let inbound_actions = link.engine.record_inbound(time::now());
980 let link_id = *link.engine.link_id();
981 LinkDataResult::ResourcePart {
982 link_id,
983 inbound_actions,
984 raw_data: packet.data.clone(),
985 }
986 }
987 constants::CONTEXT_RESOURCE_PRF => match link.engine.decrypt(&packet.data) {
988 Ok(plaintext) => {
989 let inbound_actions = link.engine.record_inbound(time::now());
990 let link_id = *link.engine.link_id();
991 LinkDataResult::ResourcePrf {
992 link_id,
993 inbound_actions,
994 plaintext,
995 }
996 }
997 Err(_) => LinkDataResult::Error,
998 },
999 constants::CONTEXT_RESOURCE_ICL => {
1000 let _ = link.engine.decrypt(&packet.data); let inbound_actions = link.engine.record_inbound(time::now());
1002 let link_id = *link.engine.link_id();
1003 LinkDataResult::ResourceIcl {
1004 link_id,
1005 inbound_actions,
1006 }
1007 }
1008 constants::CONTEXT_RESOURCE_RCL => {
1009 let _ = link.engine.decrypt(&packet.data); let inbound_actions = link.engine.record_inbound(time::now());
1011 let link_id = *link.engine.link_id();
1012 LinkDataResult::ResourceRcl {
1013 link_id,
1014 inbound_actions,
1015 }
1016 }
1017 _ => match link.engine.decrypt(&packet.data) {
1018 Ok(plaintext) => {
1019 let inbound_actions = link.engine.record_inbound(time::now());
1020 let link_id = *link.engine.link_id();
1021 LinkDataResult::Generic {
1022 link_id,
1023 inbound_actions,
1024 plaintext,
1025 context: packet.context,
1026 packet_hash,
1027 }
1028 }
1029 Err(_) => LinkDataResult::Error,
1030 },
1031 }
1032 }; let mut actions = Vec::new();
1036 match result {
1037 LinkDataResult::Lrrtt {
1038 link_id,
1039 link_actions,
1040 } => {
1041 actions.extend(self.process_link_actions(&link_id, &link_actions));
1042 if let Some(link) = self.links.get_mut(&link_id) {
1044 if link.engine.state() == LinkState::Active {
1045 let rtt = link.engine.rtt().unwrap_or(1.0);
1046 link.channel = Some(Channel::new(rtt));
1047 }
1048 }
1049 }
1050 LinkDataResult::Identify {
1051 link_id,
1052 link_actions,
1053 } => {
1054 actions.extend(self.process_link_actions(&link_id, &link_actions));
1055 }
1056 LinkDataResult::Keepalive {
1057 link_id,
1058 inbound_actions,
1059 } => {
1060 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1061 }
1067 LinkDataResult::LinkClose {
1068 link_id,
1069 teardown_actions,
1070 } => {
1071 actions.extend(self.process_link_actions(&link_id, &teardown_actions));
1072 }
1073 LinkDataResult::Channel {
1074 link_id,
1075 inbound_actions,
1076 plaintext,
1077 packet_hash,
1078 } => {
1079 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1080 if let Some(link) = self.links.get_mut(&link_id) {
1082 if let Some(ref mut channel) = link.channel {
1083 let chan_actions = channel.receive(&plaintext, time::now());
1084 link.channel_messages_received += chan_actions
1085 .iter()
1086 .filter(|action| {
1087 matches!(
1088 action,
1089 rns_core::channel::ChannelAction::MessageReceived { .. }
1090 )
1091 })
1092 .count()
1093 as u64;
1094 let _ = link;
1096 actions.extend(self.process_channel_actions(&link_id, chan_actions, rng));
1097 }
1098 }
1099 actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1100 }
1101 LinkDataResult::Request {
1102 link_id,
1103 inbound_actions,
1104 plaintext,
1105 request_id,
1106 } => {
1107 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1108 actions.extend(self.handle_request(&link_id, &plaintext, request_id, rng));
1109 }
1110 LinkDataResult::Response {
1111 link_id,
1112 inbound_actions,
1113 plaintext,
1114 } => {
1115 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1116 actions.extend(self.handle_response(&link_id, &plaintext));
1118 }
1119 LinkDataResult::Generic {
1120 link_id,
1121 inbound_actions,
1122 plaintext,
1123 context,
1124 packet_hash,
1125 } => {
1126 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1127 actions.push(LinkManagerAction::LinkDataReceived {
1128 link_id,
1129 context,
1130 data: plaintext,
1131 });
1132
1133 actions.extend(self.build_link_packet_proof(&link_id, &packet_hash));
1134 }
1135 LinkDataResult::ResourceAdv {
1136 link_id,
1137 inbound_actions,
1138 plaintext,
1139 } => {
1140 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1141 actions.extend(self.handle_resource_adv(&link_id, &plaintext, rng));
1142 }
1143 LinkDataResult::ResourceReq {
1144 link_id,
1145 inbound_actions,
1146 plaintext,
1147 } => {
1148 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1149 actions.extend(self.handle_resource_req(&link_id, &plaintext, rng));
1150 }
1151 LinkDataResult::ResourceHmu {
1152 link_id,
1153 inbound_actions,
1154 plaintext,
1155 } => {
1156 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1157 actions.extend(self.handle_resource_hmu(&link_id, &plaintext, rng));
1158 }
1159 LinkDataResult::ResourcePart {
1160 link_id,
1161 inbound_actions,
1162 raw_data,
1163 } => {
1164 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1165 actions.extend(self.handle_resource_part(&link_id, &raw_data, rng));
1166 }
1167 LinkDataResult::ResourcePrf {
1168 link_id,
1169 inbound_actions,
1170 plaintext,
1171 } => {
1172 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1173 actions.extend(self.handle_resource_prf(&link_id, &plaintext));
1174 }
1175 LinkDataResult::ResourceIcl {
1176 link_id,
1177 inbound_actions,
1178 } => {
1179 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1180 actions.extend(self.handle_resource_icl(&link_id));
1181 }
1182 LinkDataResult::ResourceRcl {
1183 link_id,
1184 inbound_actions,
1185 } => {
1186 actions.extend(self.process_link_actions(&link_id, &inbound_actions));
1187 actions.extend(self.handle_resource_rcl(&link_id));
1188 }
1189 LinkDataResult::Error => {}
1190 }
1191
1192 actions
1193 }
1194
1195 fn handle_request(
1197 &mut self,
1198 link_id: &LinkId,
1199 plaintext: &[u8],
1200 request_id: [u8; 16],
1201 rng: &mut dyn Rng,
1202 ) -> Vec<LinkManagerAction> {
1203 use rns_core::msgpack::{self, Value};
1204
1205 let arr = match msgpack::unpack_exact(plaintext) {
1207 Ok(Value::Array(arr)) if arr.len() >= 3 => arr,
1208 _ => return Vec::new(),
1209 };
1210
1211 let path_hash_bytes = match &arr[1] {
1212 Value::Bin(b) if b.len() == 16 => b,
1213 _ => return Vec::new(),
1214 };
1215 let mut path_hash = [0u8; 16];
1216 path_hash.copy_from_slice(path_hash_bytes);
1217
1218 let request_data = msgpack::pack(&arr[2]);
1220
1221 if self.management_paths.contains(&path_hash) {
1223 let remote_identity = self
1224 .links
1225 .get(link_id)
1226 .and_then(|l| l.remote_identity)
1227 .map(|(h, k)| (h, k));
1228 return vec![LinkManagerAction::ManagementRequest {
1229 link_id: *link_id,
1230 path_hash,
1231 data: request_data,
1232 request_id,
1233 remote_identity,
1234 }];
1235 }
1236
1237 let handler_idx = self
1239 .request_handlers
1240 .iter()
1241 .position(|h| h.path_hash == path_hash);
1242 let handler_idx = match handler_idx {
1243 Some(i) => i,
1244 None => return Vec::new(),
1245 };
1246
1247 let remote_identity = self
1249 .links
1250 .get(link_id)
1251 .and_then(|l| l.remote_identity.as_ref());
1252 let handler = &self.request_handlers[handler_idx];
1253 if let Some(ref allowed) = handler.allowed_list {
1254 match remote_identity {
1255 Some((identity_hash, _)) => {
1256 if !allowed.contains(identity_hash) {
1257 log::debug!("Request denied: identity not in allowed list");
1258 return Vec::new();
1259 }
1260 }
1261 None => {
1262 log::debug!("Request denied: peer not identified");
1263 return Vec::new();
1264 }
1265 }
1266 }
1267
1268 let path = handler.path.clone();
1270 let response = (handler.handler)(*link_id, &path, &request_data, remote_identity);
1271
1272 let mut actions = Vec::new();
1273 if let Some(response_data) = response {
1274 let mut response_actions =
1275 self.build_response_packet(link_id, &request_id, &response_data, rng);
1276 if response_actions.is_empty() {
1277 response_actions.extend(self.send_response_resource(
1278 link_id,
1279 &request_id,
1280 &response_data,
1281 rng,
1282 ));
1283 }
1284 actions.extend(response_actions);
1285 }
1286
1287 actions
1288 }
1289
1290 fn build_response_packet(
1293 &self,
1294 link_id: &LinkId,
1295 request_id: &[u8; 16],
1296 response_data: &[u8],
1297 rng: &mut dyn Rng,
1298 ) -> Vec<LinkManagerAction> {
1299 use rns_core::msgpack::{self, Value};
1300
1301 let response_value = msgpack::unpack_exact(response_data)
1302 .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1303
1304 let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1305 let response_plaintext = msgpack::pack(&response_array);
1306
1307 let mut actions = Vec::new();
1308 if let Some(link) = self.links.get(link_id) {
1309 if let Ok(encrypted) = link.engine.encrypt(&response_plaintext, rng) {
1310 let flags = PacketFlags {
1311 header_type: constants::HEADER_1,
1312 context_flag: constants::FLAG_UNSET,
1313 transport_type: constants::TRANSPORT_BROADCAST,
1314 destination_type: constants::DESTINATION_LINK,
1315 packet_type: constants::PACKET_TYPE_DATA,
1316 };
1317 let max_mtu = link.engine.mtu() as usize;
1318 if let Ok(pkt) = RawPacket::pack_with_max_mtu(
1319 flags,
1320 0,
1321 link_id,
1322 None,
1323 constants::CONTEXT_RESPONSE,
1324 &encrypted,
1325 max_mtu,
1326 ) {
1327 actions.push(LinkManagerAction::SendPacket {
1328 raw: pkt.raw,
1329 dest_type: constants::DESTINATION_LINK,
1330 attached_interface: None,
1331 });
1332 }
1333 }
1334 }
1335 actions
1336 }
1337
1338 fn send_response_resource(
1339 &mut self,
1340 link_id: &LinkId,
1341 request_id: &[u8; 16],
1342 response_data: &[u8],
1343 rng: &mut dyn Rng,
1344 ) -> Vec<LinkManagerAction> {
1345 use rns_core::msgpack::{self, Value};
1346
1347 let link = match self.links.get_mut(link_id) {
1348 Some(l) => l,
1349 None => return Vec::new(),
1350 };
1351
1352 if link.engine.state() != LinkState::Active {
1353 return Vec::new();
1354 }
1355
1356 let link_rtt = link.engine.rtt().unwrap_or(1.0);
1357 let resource_sdu = Self::resource_sdu_for_link(link);
1358 let now = time::now();
1359
1360 let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
1361 let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
1362 link.engine
1363 .encrypt(plaintext, &mut *enc_rng.borrow_mut())
1364 .unwrap_or_else(|_| plaintext.to_vec())
1365 };
1366
1367 let response_value = msgpack::unpack_exact(response_data)
1371 .unwrap_or_else(|_| Value::Bin(response_data.to_vec()));
1372 let response_array = Value::Array(vec![Value::Bin(request_id.to_vec()), response_value]);
1373 let resource_payload = msgpack::pack(&response_array);
1374
1375 let sender = match ResourceSender::new(
1376 &resource_payload,
1377 None,
1378 resource_sdu,
1379 &encrypt_fn,
1380 &Bzip2Compressor,
1381 rng,
1382 now,
1383 true, true, Some(request_id.to_vec()),
1386 1, 1, None, link_rtt,
1390 6.0, ) {
1392 Ok(s) => s,
1393 Err(e) => {
1394 log::debug!("Failed to create response ResourceSender: {}", e);
1395 return Vec::new();
1396 }
1397 };
1398
1399 let mut sender = sender;
1400 let adv_actions = sender.advertise(now);
1401 link.outgoing_resources.push(sender);
1402
1403 let _ = link;
1404 self.process_resource_actions(link_id, adv_actions, rng)
1405 }
1406
1407 pub fn send_management_response(
1410 &mut self,
1411 link_id: &LinkId,
1412 request_id: &[u8; 16],
1413 response_data: &[u8],
1414 rng: &mut dyn Rng,
1415 ) -> Vec<LinkManagerAction> {
1416 let mut actions = self.build_response_packet(link_id, request_id, response_data, rng);
1417 if actions.is_empty() {
1418 actions.extend(self.send_response_resource(
1419 link_id,
1420 request_id,
1421 response_data,
1422 rng,
1423 ));
1424 }
1425 actions
1426 }
1427
1428 pub fn send_request(
1436 &self,
1437 link_id: &LinkId,
1438 path: &str,
1439 data: &[u8],
1440 rng: &mut dyn Rng,
1441 ) -> Vec<LinkManagerAction> {
1442 use rns_core::msgpack::{self, Value};
1443
1444 let link = match self.links.get(link_id) {
1445 Some(l) => l,
1446 None => return Vec::new(),
1447 };
1448
1449 if link.engine.state() != LinkState::Active {
1450 return Vec::new();
1451 }
1452
1453 let path_hash = compute_path_hash(path);
1454
1455 let data_value = msgpack::unpack_exact(data).unwrap_or_else(|_| Value::Bin(data.to_vec()));
1457
1458 let request_array = Value::Array(vec![
1460 Value::Float(time::now()),
1461 Value::Bin(path_hash.to_vec()),
1462 data_value,
1463 ]);
1464 let plaintext = msgpack::pack(&request_array);
1465
1466 let encrypted = match link.engine.encrypt(&plaintext, rng) {
1467 Ok(e) => e,
1468 Err(_) => return Vec::new(),
1469 };
1470
1471 let flags = PacketFlags {
1472 header_type: constants::HEADER_1,
1473 context_flag: constants::FLAG_UNSET,
1474 transport_type: constants::TRANSPORT_BROADCAST,
1475 destination_type: constants::DESTINATION_LINK,
1476 packet_type: constants::PACKET_TYPE_DATA,
1477 };
1478
1479 let mut actions = Vec::new();
1480 if let Ok(pkt) = RawPacket::pack(
1481 flags,
1482 0,
1483 link_id,
1484 None,
1485 constants::CONTEXT_REQUEST,
1486 &encrypted,
1487 ) {
1488 actions.push(LinkManagerAction::SendPacket {
1489 raw: pkt.raw,
1490 dest_type: constants::DESTINATION_LINK,
1491 attached_interface: None,
1492 });
1493 }
1494 actions
1495 }
1496
1497 pub fn send_on_link(
1499 &self,
1500 link_id: &LinkId,
1501 plaintext: &[u8],
1502 context: u8,
1503 rng: &mut dyn Rng,
1504 ) -> Vec<LinkManagerAction> {
1505 let link = match self.links.get(link_id) {
1506 Some(l) => l,
1507 None => return Vec::new(),
1508 };
1509
1510 if link.engine.state() != LinkState::Active {
1511 return Vec::new();
1512 }
1513
1514 let encrypted = match link.engine.encrypt(plaintext, rng) {
1515 Ok(e) => e,
1516 Err(_) => return Vec::new(),
1517 };
1518
1519 let flags = PacketFlags {
1520 header_type: constants::HEADER_1,
1521 context_flag: constants::FLAG_UNSET,
1522 transport_type: constants::TRANSPORT_BROADCAST,
1523 destination_type: constants::DESTINATION_LINK,
1524 packet_type: constants::PACKET_TYPE_DATA,
1525 };
1526
1527 let mut actions = Vec::new();
1528 if let Ok(pkt) = RawPacket::pack(flags, 0, link_id, None, context, &encrypted) {
1529 actions.push(LinkManagerAction::SendPacket {
1530 raw: pkt.raw,
1531 dest_type: constants::DESTINATION_LINK,
1532 attached_interface: None,
1533 });
1534 }
1535 actions
1536 }
1537
1538 pub fn identify(
1540 &self,
1541 link_id: &LinkId,
1542 identity: &rns_crypto::identity::Identity,
1543 rng: &mut dyn Rng,
1544 ) -> Vec<LinkManagerAction> {
1545 let link = match self.links.get(link_id) {
1546 Some(l) => l,
1547 None => return Vec::new(),
1548 };
1549
1550 let encrypted = match link.engine.build_identify(identity, rng) {
1551 Ok(e) => e,
1552 Err(_) => return Vec::new(),
1553 };
1554
1555 let flags = PacketFlags {
1556 header_type: constants::HEADER_1,
1557 context_flag: constants::FLAG_UNSET,
1558 transport_type: constants::TRANSPORT_BROADCAST,
1559 destination_type: constants::DESTINATION_LINK,
1560 packet_type: constants::PACKET_TYPE_DATA,
1561 };
1562
1563 let mut actions = Vec::new();
1564 if let Ok(pkt) = RawPacket::pack(
1565 flags,
1566 0,
1567 link_id,
1568 None,
1569 constants::CONTEXT_LINKIDENTIFY,
1570 &encrypted,
1571 ) {
1572 actions.push(LinkManagerAction::SendPacket {
1573 raw: pkt.raw,
1574 dest_type: constants::DESTINATION_LINK,
1575 attached_interface: None,
1576 });
1577 }
1578 actions
1579 }
1580
1581 pub fn teardown_link(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1583 let link = match self.links.get_mut(link_id) {
1584 Some(l) => l,
1585 None => return Vec::new(),
1586 };
1587
1588 let teardown_actions = link.engine.teardown();
1589 if let Some(ref mut channel) = link.channel {
1590 channel.shutdown();
1591 }
1592
1593 let mut actions = self.process_link_actions(link_id, &teardown_actions);
1594
1595 let flags = PacketFlags {
1597 header_type: constants::HEADER_1,
1598 context_flag: constants::FLAG_UNSET,
1599 transport_type: constants::TRANSPORT_BROADCAST,
1600 destination_type: constants::DESTINATION_LINK,
1601 packet_type: constants::PACKET_TYPE_DATA,
1602 };
1603 if let Ok(pkt) = RawPacket::pack(flags, 0, link_id, None, constants::CONTEXT_LINKCLOSE, &[])
1604 {
1605 actions.push(LinkManagerAction::SendPacket {
1606 raw: pkt.raw,
1607 dest_type: constants::DESTINATION_LINK,
1608 attached_interface: None,
1609 });
1610 }
1611
1612 actions
1613 }
1614
1615 pub fn teardown_all_links(&mut self) -> Vec<LinkManagerAction> {
1617 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
1618 let mut actions = Vec::new();
1619 for link_id in link_ids {
1620 actions.extend(self.teardown_link(&link_id));
1621 }
1622 actions
1623 }
1624
1625 fn handle_response(&self, link_id: &LinkId, plaintext: &[u8]) -> Vec<LinkManagerAction> {
1627 use rns_core::msgpack;
1628
1629 let arr = match msgpack::unpack_exact(plaintext) {
1631 Ok(msgpack::Value::Array(arr)) if arr.len() >= 2 => arr,
1632 _ => return Vec::new(),
1633 };
1634
1635 let request_id_bytes = match &arr[0] {
1636 msgpack::Value::Bin(b) if b.len() == 16 => b,
1637 _ => return Vec::new(),
1638 };
1639 let mut request_id = [0u8; 16];
1640 request_id.copy_from_slice(request_id_bytes);
1641
1642 let response_data = msgpack::pack(&arr[1]);
1643
1644 vec![LinkManagerAction::ResponseReceived {
1645 link_id: *link_id,
1646 request_id,
1647 data: response_data,
1648 }]
1649 }
1650
1651 fn handle_resource_adv(
1653 &mut self,
1654 link_id: &LinkId,
1655 adv_plaintext: &[u8],
1656 rng: &mut dyn Rng,
1657 ) -> Vec<LinkManagerAction> {
1658 let link = match self.links.get_mut(link_id) {
1659 Some(l) => l,
1660 None => return Vec::new(),
1661 };
1662
1663 let link_rtt = link.engine.rtt().unwrap_or(1.0);
1664 let resource_sdu = Self::resource_sdu_for_link(link);
1665 let now = time::now();
1666
1667 let receiver = match ResourceReceiver::from_advertisement(
1668 adv_plaintext,
1669 resource_sdu,
1670 link_rtt,
1671 now,
1672 None,
1673 None,
1674 ) {
1675 Ok(r) => r,
1676 Err(e) => {
1677 log::debug!("Resource ADV rejected: {}", e);
1678 return Vec::new();
1679 }
1680 };
1681
1682 let strategy = link.resource_strategy;
1683 let resource_hash = receiver.resource_hash.clone();
1684 let transfer_size = receiver.transfer_size;
1685 let has_metadata = receiver.has_metadata;
1686 let is_response = receiver.flags.is_response;
1687
1688 if is_response {
1689 link.incoming_resources.push(receiver);
1692 let idx = link.incoming_resources.len() - 1;
1693 let resource_actions = link.incoming_resources[idx].accept(now);
1694 let _ = link;
1695 return self.process_resource_actions(link_id, resource_actions, rng);
1696 }
1697
1698 match strategy {
1699 ResourceStrategy::AcceptNone => {
1700 let reject_actions = {
1702 let mut r = receiver;
1703 r.reject()
1704 };
1705 self.process_resource_actions(link_id, reject_actions, rng)
1706 }
1707 ResourceStrategy::AcceptAll => {
1708 link.incoming_resources.push(receiver);
1709 let idx = link.incoming_resources.len() - 1;
1710 let resource_actions = link.incoming_resources[idx].accept(now);
1711 let _ = link;
1712 self.process_resource_actions(link_id, resource_actions, rng)
1713 }
1714 ResourceStrategy::AcceptApp => {
1715 link.incoming_resources.push(receiver);
1716 vec![LinkManagerAction::ResourceAcceptQuery {
1718 link_id: *link_id,
1719 resource_hash,
1720 transfer_size,
1721 has_metadata,
1722 }]
1723 }
1724 }
1725 }
1726
1727 pub fn accept_resource(
1729 &mut self,
1730 link_id: &LinkId,
1731 resource_hash: &[u8],
1732 accept: bool,
1733 rng: &mut dyn Rng,
1734 ) -> Vec<LinkManagerAction> {
1735 let link = match self.links.get_mut(link_id) {
1736 Some(l) => l,
1737 None => return Vec::new(),
1738 };
1739
1740 let now = time::now();
1741 let idx = link
1742 .incoming_resources
1743 .iter()
1744 .position(|r| r.resource_hash == resource_hash);
1745 let idx = match idx {
1746 Some(i) => i,
1747 None => return Vec::new(),
1748 };
1749
1750 let resource_actions = if accept {
1751 link.incoming_resources[idx].accept(now)
1752 } else {
1753 link.incoming_resources[idx].reject()
1754 };
1755
1756 let _ = link;
1757 self.process_resource_actions(link_id, resource_actions, rng)
1758 }
1759
1760 fn handle_resource_req(
1762 &mut self,
1763 link_id: &LinkId,
1764 plaintext: &[u8],
1765 rng: &mut dyn Rng,
1766 ) -> Vec<LinkManagerAction> {
1767 let link = match self.links.get_mut(link_id) {
1768 Some(l) => l,
1769 None => return Vec::new(),
1770 };
1771
1772 let now = time::now();
1773 let mut all_actions = Vec::new();
1774 for sender in &mut link.outgoing_resources {
1775 let resource_actions = sender.handle_request(plaintext, now);
1776 if !resource_actions.is_empty() {
1777 all_actions.extend(resource_actions);
1778 break;
1779 }
1780 }
1781
1782 let _ = link;
1783 self.process_resource_actions(link_id, all_actions, rng)
1784 }
1785
1786 fn handle_resource_hmu(
1788 &mut self,
1789 link_id: &LinkId,
1790 plaintext: &[u8],
1791 rng: &mut dyn Rng,
1792 ) -> Vec<LinkManagerAction> {
1793 let link = match self.links.get_mut(link_id) {
1794 Some(l) => l,
1795 None => return Vec::new(),
1796 };
1797
1798 let now = time::now();
1799 let mut all_actions = Vec::new();
1800 for receiver in &mut link.incoming_resources {
1801 let resource_actions = receiver.handle_hashmap_update(plaintext, now);
1802 if !resource_actions.is_empty() {
1803 all_actions.extend(resource_actions);
1804 break;
1805 }
1806 }
1807
1808 let _ = link;
1809 self.process_resource_actions(link_id, all_actions, rng)
1810 }
1811
1812 fn handle_resource_part(
1814 &mut self,
1815 link_id: &LinkId,
1816 raw_data: &[u8],
1817 rng: &mut dyn Rng,
1818 ) -> Vec<LinkManagerAction> {
1819 let link = match self.links.get_mut(link_id) {
1820 Some(l) => l,
1821 None => return Vec::new(),
1822 };
1823
1824 let now = time::now();
1825 let mut all_actions = Vec::new();
1826 let mut assemble_idx = None;
1827 let mut assembled_is_response = false;
1828
1829 for (idx, receiver) in link.incoming_resources.iter_mut().enumerate() {
1830 let resource_actions = receiver.receive_part(raw_data, now);
1831 if !resource_actions.is_empty() {
1832 if receiver.received_count == receiver.total_parts {
1833 assemble_idx = Some(idx);
1834 }
1835 all_actions.extend(resource_actions);
1836 break;
1837 }
1838 }
1839
1840 if let Some(idx) = assemble_idx {
1841 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
1842 link.engine.decrypt(ciphertext).map_err(|_| ())
1843 };
1844 let assemble_actions =
1845 link.incoming_resources[idx].assemble(&decrypt_fn, &Bzip2Compressor);
1846 assembled_is_response = link.incoming_resources[idx].flags.is_response;
1847 all_actions.extend(assemble_actions);
1848 }
1849
1850 let _ = link;
1851 let mut out = self.process_resource_actions(link_id, all_actions, rng);
1852
1853 if assembled_is_response {
1854 let mut converted = Vec::new();
1855 for action in out {
1856 match action {
1857 LinkManagerAction::ResourceReceived { data, .. } => {
1858 converted.extend(self.handle_response(link_id, &data));
1859 }
1860 LinkManagerAction::ResourceAcceptQuery { .. } => {
1861 }
1863 other => converted.push(other),
1864 }
1865 }
1866 out = converted;
1867 }
1868
1869 out
1870 }
1871
1872 fn handle_resource_prf(
1874 &mut self,
1875 link_id: &LinkId,
1876 plaintext: &[u8],
1877 ) -> Vec<LinkManagerAction> {
1878 let link = match self.links.get_mut(link_id) {
1879 Some(l) => l,
1880 None => return Vec::new(),
1881 };
1882
1883 let now = time::now();
1884 let mut result_actions = Vec::new();
1885 for sender in &mut link.outgoing_resources {
1886 let resource_actions = sender.handle_proof(plaintext, now);
1887 if !resource_actions.is_empty() {
1888 result_actions.extend(resource_actions);
1889 break;
1890 }
1891 }
1892
1893 let mut actions = Vec::new();
1895 for ra in result_actions {
1896 match ra {
1897 ResourceAction::Completed => {
1898 actions.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
1899 }
1900 ResourceAction::Failed(e) => {
1901 actions.push(LinkManagerAction::ResourceFailed {
1902 link_id: *link_id,
1903 error: format!("{}", e),
1904 });
1905 }
1906 _ => {}
1907 }
1908 }
1909
1910 link.outgoing_resources
1912 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
1913
1914 actions
1915 }
1916
1917 fn handle_resource_icl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1919 let link = match self.links.get_mut(link_id) {
1920 Some(l) => l,
1921 None => return Vec::new(),
1922 };
1923
1924 let mut actions = Vec::new();
1925 for receiver in &mut link.incoming_resources {
1926 let ra = receiver.handle_cancel();
1927 for a in ra {
1928 if let ResourceAction::Failed(ref e) = a {
1929 actions.push(LinkManagerAction::ResourceFailed {
1930 link_id: *link_id,
1931 error: format!("{}", e),
1932 });
1933 }
1934 }
1935 }
1936 link.incoming_resources
1937 .retain(|r| r.status < rns_core::resource::ResourceStatus::Complete);
1938 actions
1939 }
1940
1941 fn handle_resource_rcl(&mut self, link_id: &LinkId) -> Vec<LinkManagerAction> {
1943 let link = match self.links.get_mut(link_id) {
1944 Some(l) => l,
1945 None => return Vec::new(),
1946 };
1947
1948 let mut actions = Vec::new();
1949 for sender in &mut link.outgoing_resources {
1950 let ra = sender.handle_reject();
1951 for a in ra {
1952 if let ResourceAction::Failed(ref e) = a {
1953 actions.push(LinkManagerAction::ResourceFailed {
1954 link_id: *link_id,
1955 error: format!("{}", e),
1956 });
1957 }
1958 }
1959 }
1960 link.outgoing_resources
1961 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
1962 actions
1963 }
1964
1965 fn process_resource_actions(
1967 &self,
1968 link_id: &LinkId,
1969 actions: Vec<ResourceAction>,
1970 rng: &mut dyn Rng,
1971 ) -> Vec<LinkManagerAction> {
1972 let link = match self.links.get(link_id) {
1973 Some(l) => l,
1974 None => return Vec::new(),
1975 };
1976
1977 let mut result = Vec::new();
1978 for action in actions {
1979 match action {
1980 ResourceAction::SendAdvertisement(data) => {
1981 if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
1983 result.extend(self.build_link_packet(
1984 link_id,
1985 constants::CONTEXT_RESOURCE_ADV,
1986 &encrypted,
1987 ));
1988 }
1989 }
1990 ResourceAction::SendPart(data) => {
1991 result.extend(self.build_link_packet(
1993 link_id,
1994 constants::CONTEXT_RESOURCE,
1995 &data,
1996 ));
1997 }
1998 ResourceAction::SendRequest(data) => {
1999 if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
2000 result.extend(self.build_link_packet(
2001 link_id,
2002 constants::CONTEXT_RESOURCE_REQ,
2003 &encrypted,
2004 ));
2005 }
2006 }
2007 ResourceAction::SendHmu(data) => {
2008 if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
2009 result.extend(self.build_link_packet(
2010 link_id,
2011 constants::CONTEXT_RESOURCE_HMU,
2012 &encrypted,
2013 ));
2014 }
2015 }
2016 ResourceAction::SendProof(data) => {
2017 if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
2018 result.extend(self.build_link_packet(
2019 link_id,
2020 constants::CONTEXT_RESOURCE_PRF,
2021 &encrypted,
2022 ));
2023 }
2024 }
2025 ResourceAction::SendCancelInitiator(data) => {
2026 if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
2027 result.extend(self.build_link_packet(
2028 link_id,
2029 constants::CONTEXT_RESOURCE_ICL,
2030 &encrypted,
2031 ));
2032 }
2033 }
2034 ResourceAction::SendCancelReceiver(data) => {
2035 if let Ok(encrypted) = link.engine.encrypt(&data, rng) {
2036 result.extend(self.build_link_packet(
2037 link_id,
2038 constants::CONTEXT_RESOURCE_RCL,
2039 &encrypted,
2040 ));
2041 }
2042 }
2043 ResourceAction::DataReceived { data, metadata } => {
2044 result.push(LinkManagerAction::ResourceReceived {
2045 link_id: *link_id,
2046 data,
2047 metadata,
2048 });
2049 }
2050 ResourceAction::Completed => {
2051 result.push(LinkManagerAction::ResourceCompleted { link_id: *link_id });
2052 }
2053 ResourceAction::Failed(e) => {
2054 result.push(LinkManagerAction::ResourceFailed {
2055 link_id: *link_id,
2056 error: format!("{}", e),
2057 });
2058 }
2059 ResourceAction::ProgressUpdate { received, total } => {
2060 result.push(LinkManagerAction::ResourceProgress {
2061 link_id: *link_id,
2062 received,
2063 total,
2064 });
2065 }
2066 }
2067 }
2068 result
2069 }
2070
2071 fn build_link_packet(
2073 &self,
2074 link_id: &LinkId,
2075 context: u8,
2076 data: &[u8],
2077 ) -> Vec<LinkManagerAction> {
2078 let flags = PacketFlags {
2079 header_type: constants::HEADER_1,
2080 context_flag: constants::FLAG_UNSET,
2081 transport_type: constants::TRANSPORT_BROADCAST,
2082 destination_type: constants::DESTINATION_LINK,
2083 packet_type: constants::PACKET_TYPE_DATA,
2084 };
2085 let mut actions = Vec::new();
2086 let max_mtu = self
2087 .links
2088 .get(link_id)
2089 .map(|l| l.engine.mtu() as usize)
2090 .unwrap_or(constants::MTU);
2091 if let Ok(pkt) = RawPacket::pack_with_max_mtu(flags, 0, link_id, None, context, data, max_mtu)
2092 {
2093 actions.push(LinkManagerAction::SendPacket {
2094 raw: pkt.raw,
2095 dest_type: constants::DESTINATION_LINK,
2096 attached_interface: None,
2097 });
2098 }
2099 actions
2100 }
2101
2102 pub fn send_resource(
2104 &mut self,
2105 link_id: &LinkId,
2106 data: &[u8],
2107 metadata: Option<&[u8]>,
2108 rng: &mut dyn Rng,
2109 ) -> Vec<LinkManagerAction> {
2110 let link = match self.links.get_mut(link_id) {
2111 Some(l) => l,
2112 None => return Vec::new(),
2113 };
2114
2115 if link.engine.state() != LinkState::Active {
2116 return Vec::new();
2117 }
2118
2119 let link_rtt = link.engine.rtt().unwrap_or(1.0);
2120 let resource_sdu = Self::resource_sdu_for_link(link);
2121 let now = time::now();
2122
2123 let enc_rng = std::cell::RefCell::new(rns_crypto::OsRng);
2126 let encrypt_fn = |plaintext: &[u8]| -> Vec<u8> {
2127 link.engine
2128 .encrypt(plaintext, &mut *enc_rng.borrow_mut())
2129 .unwrap_or_else(|_| plaintext.to_vec())
2130 };
2131
2132 let sender = match ResourceSender::new(
2133 data,
2134 metadata,
2135 resource_sdu,
2136 &encrypt_fn,
2137 &Bzip2Compressor,
2138 rng,
2139 now,
2140 true, false, None, 1, 1, None, link_rtt,
2147 6.0, ) {
2149 Ok(s) => s,
2150 Err(e) => {
2151 log::debug!("Failed to create ResourceSender: {}", e);
2152 return Vec::new();
2153 }
2154 };
2155
2156 let mut sender = sender;
2157 let adv_actions = sender.advertise(now);
2158 link.outgoing_resources.push(sender);
2159
2160 let _ = link;
2161 self.process_resource_actions(link_id, adv_actions, rng)
2162 }
2163
2164 pub fn set_resource_strategy(&mut self, link_id: &LinkId, strategy: ResourceStrategy) {
2166 if let Some(link) = self.links.get_mut(link_id) {
2167 link.resource_strategy = strategy;
2168 }
2169 }
2170
2171 pub fn flush_channel_tx(&mut self, link_id: &LinkId) {
2174 if let Some(link) = self.links.get_mut(link_id) {
2175 if let Some(ref mut channel) = link.channel {
2176 channel.flush_tx();
2177 }
2178 }
2179 }
2180
2181 pub fn send_channel_message(
2183 &mut self,
2184 link_id: &LinkId,
2185 msgtype: u16,
2186 payload: &[u8],
2187 rng: &mut dyn Rng,
2188 ) -> Result<Vec<LinkManagerAction>, String> {
2189 let link = match self.links.get_mut(link_id) {
2190 Some(l) => l,
2191 None => return Err("unknown link".to_string()),
2192 };
2193
2194 let channel = match link.channel {
2195 Some(ref mut ch) => ch,
2196 None => return Err("link has no active channel".to_string()),
2197 };
2198
2199 let link_mdu = link.engine.mdu();
2200 let now = time::now();
2201 let chan_actions = match channel.send(msgtype, payload, now, link_mdu) {
2202 Ok(a) => {
2203 link.channel_send_ok += 1;
2204 a
2205 }
2206 Err(e) => {
2207 log::debug!("Channel send failed: {:?}", e);
2208 match e {
2209 rns_core::channel::ChannelError::NotReady => link.channel_send_not_ready += 1,
2210 rns_core::channel::ChannelError::MessageTooBig => {
2211 link.channel_send_too_big += 1;
2212 }
2213 rns_core::channel::ChannelError::InvalidEnvelope => {
2214 link.channel_send_other_error += 1;
2215 }
2216 }
2217 return Err(e.to_string());
2218 }
2219 };
2220
2221 let _ = link;
2222 Ok(self.process_channel_actions(link_id, chan_actions, rng))
2223 }
2224
2225 pub fn tick(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2227 let now = time::now();
2228 let mut all_actions = Vec::new();
2229
2230 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2232
2233 for link_id in &link_ids {
2234 let link = match self.links.get_mut(link_id) {
2235 Some(l) => l,
2236 None => continue,
2237 };
2238
2239 let tick_actions = link.engine.tick(now);
2241 all_actions.extend(self.process_link_actions(link_id, &tick_actions));
2242
2243 let link = match self.links.get_mut(link_id) {
2245 Some(l) => l,
2246 None => continue,
2247 };
2248 if link.engine.needs_keepalive(now) {
2249 let flags = PacketFlags {
2251 header_type: constants::HEADER_1,
2252 context_flag: constants::FLAG_UNSET,
2253 transport_type: constants::TRANSPORT_BROADCAST,
2254 destination_type: constants::DESTINATION_LINK,
2255 packet_type: constants::PACKET_TYPE_DATA,
2256 };
2257 if let Ok(pkt) =
2258 RawPacket::pack(flags, 0, link_id, None, constants::CONTEXT_KEEPALIVE, &[])
2259 {
2260 all_actions.push(LinkManagerAction::SendPacket {
2261 raw: pkt.raw,
2262 dest_type: constants::DESTINATION_LINK,
2263 attached_interface: None,
2264 });
2265 link.engine.record_outbound(now, true);
2266 }
2267 }
2268
2269 if let Some(channel) = link.channel.as_mut() {
2270 let chan_actions = channel.tick(now);
2271 let _ = channel;
2272 let _ = link;
2273 all_actions.extend(self.process_channel_actions(link_id, chan_actions, rng));
2274 }
2275 }
2276
2277 for link_id in &link_ids {
2279 let link = match self.links.get_mut(link_id) {
2280 Some(l) => l,
2281 None => continue,
2282 };
2283
2284 let mut sender_actions = Vec::new();
2286 for sender in &mut link.outgoing_resources {
2287 sender_actions.extend(sender.tick(now));
2288 }
2289
2290 let mut receiver_actions = Vec::new();
2292 for receiver in &mut link.incoming_resources {
2293 let decrypt_fn = |ciphertext: &[u8]| -> Result<Vec<u8>, ()> {
2294 link.engine.decrypt(ciphertext).map_err(|_| ())
2295 };
2296 receiver_actions.extend(receiver.tick(now, &decrypt_fn, &Bzip2Compressor));
2297 }
2298
2299 link.outgoing_resources
2301 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2302 link.incoming_resources
2303 .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2304
2305 let _ = link;
2306 all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2307 all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2308 }
2309
2310 let closed: Vec<LinkId> = self
2312 .links
2313 .iter()
2314 .filter(|(_, l)| l.engine.state() == LinkState::Closed)
2315 .map(|(id, _)| *id)
2316 .collect();
2317 for id in closed {
2318 self.links.remove(&id);
2319 all_actions.push(LinkManagerAction::DeregisterLinkDest { link_id: id });
2320 }
2321
2322 all_actions
2323 }
2324
2325 pub fn is_link_destination(&self, dest_hash: &[u8; 16]) -> bool {
2327 self.links.contains_key(dest_hash) || self.link_destinations.contains_key(dest_hash)
2328 }
2329
2330 pub fn link_state(&self, link_id: &LinkId) -> Option<LinkState> {
2332 self.links.get(link_id).map(|l| l.engine.state())
2333 }
2334
2335 pub fn link_rtt(&self, link_id: &LinkId) -> Option<f64> {
2337 self.links.get(link_id).and_then(|l| l.engine.rtt())
2338 }
2339
2340 pub fn set_link_rtt(&mut self, link_id: &LinkId, rtt: f64) {
2342 if let Some(link) = self.links.get_mut(link_id) {
2343 link.engine.set_rtt(rtt);
2344 }
2345 }
2346
2347 pub fn record_link_inbound(&mut self, link_id: &LinkId) {
2349 if let Some(link) = self.links.get_mut(link_id) {
2350 link.engine.record_inbound(time::now());
2351 }
2352 }
2353
2354 pub fn set_link_mtu(&mut self, link_id: &LinkId, mtu: u32) {
2356 if let Some(link) = self.links.get_mut(link_id) {
2357 link.engine.set_mtu(mtu);
2358 }
2359 }
2360
2361 pub fn link_count(&self) -> usize {
2363 self.links.len()
2364 }
2365
2366 pub fn resource_transfer_count(&self) -> usize {
2368 self.links
2369 .values()
2370 .map(|managed| managed.incoming_resources.len() + managed.outgoing_resources.len())
2371 .sum()
2372 }
2373
2374 pub fn cancel_all_resources(&mut self, rng: &mut dyn Rng) -> Vec<LinkManagerAction> {
2376 let link_ids: Vec<LinkId> = self.links.keys().copied().collect();
2377 let mut all_actions = Vec::new();
2378
2379 for link_id in &link_ids {
2380 let link = match self.links.get_mut(link_id) {
2381 Some(l) => l,
2382 None => continue,
2383 };
2384
2385 let mut sender_actions = Vec::new();
2386 for sender in &mut link.outgoing_resources {
2387 sender_actions.extend(sender.cancel());
2388 }
2389
2390 let mut receiver_actions = Vec::new();
2391 for receiver in &mut link.incoming_resources {
2392 receiver_actions.extend(receiver.reject());
2393 }
2394
2395 link.outgoing_resources
2396 .retain(|s| s.status < rns_core::resource::ResourceStatus::Complete);
2397 link.incoming_resources
2398 .retain(|r| r.status < rns_core::resource::ResourceStatus::Assembling);
2399
2400 let _ = link;
2401 all_actions.extend(self.process_resource_actions(link_id, sender_actions, rng));
2402 all_actions.extend(self.process_resource_actions(link_id, receiver_actions, rng));
2403 }
2404
2405 all_actions
2406 }
2407
2408 pub fn link_entries(&self) -> Vec<crate::event::LinkInfoEntry> {
2410 self.links
2411 .iter()
2412 .map(|(link_id, managed)| {
2413 let state = match managed.engine.state() {
2414 LinkState::Pending => "pending",
2415 LinkState::Handshake => "handshake",
2416 LinkState::Active => "active",
2417 LinkState::Stale => "stale",
2418 LinkState::Closed => "closed",
2419 };
2420 crate::event::LinkInfoEntry {
2421 link_id: *link_id,
2422 state: state.to_string(),
2423 is_initiator: managed.engine.is_initiator(),
2424 dest_hash: managed.dest_hash,
2425 remote_identity: managed.remote_identity.as_ref().map(|(h, _)| *h),
2426 rtt: managed.engine.rtt(),
2427 channel_window: managed.channel.as_ref().map(|c| c.window()),
2428 channel_outstanding: managed.channel.as_ref().map(|c| c.outstanding()),
2429 pending_channel_packets: managed.pending_channel_packets.len(),
2430 channel_send_ok: managed.channel_send_ok,
2431 channel_send_not_ready: managed.channel_send_not_ready,
2432 channel_send_too_big: managed.channel_send_too_big,
2433 channel_send_other_error: managed.channel_send_other_error,
2434 channel_messages_received: managed.channel_messages_received,
2435 channel_proofs_sent: managed.channel_proofs_sent,
2436 channel_proofs_received: managed.channel_proofs_received,
2437 }
2438 })
2439 .collect()
2440 }
2441
2442 pub fn resource_entries(&self) -> Vec<crate::event::ResourceInfoEntry> {
2444 let mut entries = Vec::new();
2445 for (link_id, managed) in &self.links {
2446 for recv in &managed.incoming_resources {
2447 let (received, total) = recv.progress();
2448 entries.push(crate::event::ResourceInfoEntry {
2449 link_id: *link_id,
2450 direction: "incoming".to_string(),
2451 total_parts: total,
2452 transferred_parts: received,
2453 complete: received >= total && total > 0,
2454 });
2455 }
2456 for send in &managed.outgoing_resources {
2457 let total = send.total_parts();
2458 let sent = send.sent_parts;
2459 entries.push(crate::event::ResourceInfoEntry {
2460 link_id: *link_id,
2461 direction: "outgoing".to_string(),
2462 total_parts: total,
2463 transferred_parts: sent,
2464 complete: sent >= total && total > 0,
2465 });
2466 }
2467 }
2468 entries
2469 }
2470
2471 fn process_link_actions(
2473 &self,
2474 link_id: &LinkId,
2475 actions: &[LinkAction],
2476 ) -> Vec<LinkManagerAction> {
2477 let mut result = Vec::new();
2478 for action in actions {
2479 match action {
2480 LinkAction::StateChanged {
2481 new_state, reason, ..
2482 } => match new_state {
2483 LinkState::Closed => {
2484 result.push(LinkManagerAction::LinkClosed {
2485 link_id: *link_id,
2486 reason: *reason,
2487 });
2488 }
2489 _ => {}
2490 },
2491 LinkAction::LinkEstablished {
2492 rtt, is_initiator, ..
2493 } => {
2494 let dest_hash = self
2495 .links
2496 .get(link_id)
2497 .map(|l| l.dest_hash)
2498 .unwrap_or([0u8; 16]);
2499 result.push(LinkManagerAction::LinkEstablished {
2500 link_id: *link_id,
2501 dest_hash,
2502 rtt: *rtt,
2503 is_initiator: *is_initiator,
2504 });
2505 }
2506 LinkAction::RemoteIdentified {
2507 identity_hash,
2508 public_key,
2509 ..
2510 } => {
2511 result.push(LinkManagerAction::RemoteIdentified {
2512 link_id: *link_id,
2513 identity_hash: *identity_hash,
2514 public_key: *public_key,
2515 });
2516 }
2517 LinkAction::DataReceived { .. } => {
2518 }
2520 }
2521 }
2522 result
2523 }
2524
2525 fn process_channel_actions(
2527 &mut self,
2528 link_id: &LinkId,
2529 actions: Vec<rns_core::channel::ChannelAction>,
2530 rng: &mut dyn Rng,
2531 ) -> Vec<LinkManagerAction> {
2532 let mut result = Vec::new();
2533 for action in actions {
2534 match action {
2535 rns_core::channel::ChannelAction::SendOnLink { raw, sequence } => {
2536 let encrypted = match self.links.get(link_id) {
2538 Some(link) => match link.engine.encrypt(&raw, rng) {
2539 Ok(encrypted) => encrypted,
2540 Err(_) => continue,
2541 },
2542 None => continue,
2543 };
2544 let flags = PacketFlags {
2545 header_type: constants::HEADER_1,
2546 context_flag: constants::FLAG_UNSET,
2547 transport_type: constants::TRANSPORT_BROADCAST,
2548 destination_type: constants::DESTINATION_LINK,
2549 packet_type: constants::PACKET_TYPE_DATA,
2550 };
2551 if let Ok(pkt) = RawPacket::pack(
2552 flags,
2553 0,
2554 link_id,
2555 None,
2556 constants::CONTEXT_CHANNEL,
2557 &encrypted,
2558 ) {
2559 if let Some(link_mut) = self.links.get_mut(link_id) {
2560 link_mut
2561 .pending_channel_packets
2562 .insert(pkt.packet_hash, sequence);
2563 }
2564 result.push(LinkManagerAction::SendPacket {
2565 raw: pkt.raw,
2566 dest_type: constants::DESTINATION_LINK,
2567 attached_interface: None,
2568 });
2569 }
2570 }
2571 rns_core::channel::ChannelAction::MessageReceived {
2572 msgtype, payload, ..
2573 } => {
2574 result.push(LinkManagerAction::ChannelMessageReceived {
2575 link_id: *link_id,
2576 msgtype,
2577 payload,
2578 });
2579 }
2580 rns_core::channel::ChannelAction::TeardownLink => {
2581 result.push(LinkManagerAction::LinkClosed {
2582 link_id: *link_id,
2583 reason: Some(TeardownReason::Timeout),
2584 });
2585 }
2586 }
2587 }
2588 result
2589 }
2590}
2591
2592fn compute_path_hash(path: &str) -> [u8; 16] {
2595 let full = rns_core::hash::full_hash(path.as_bytes());
2596 let mut result = [0u8; 16];
2597 result.copy_from_slice(&full[..16]);
2598 result
2599}
2600
2601#[cfg(test)]
2602mod tests {
2603 use super::*;
2604 use rns_crypto::identity::Identity;
2605 use rns_crypto::{FixedRng, OsRng};
2606
2607 fn make_rng(seed: u8) -> FixedRng {
2608 FixedRng::new(&[seed; 128])
2609 }
2610
2611 fn make_dest_keys(rng: &mut dyn Rng) -> (Ed25519PrivateKey, [u8; 32]) {
2612 let sig_prv = Ed25519PrivateKey::generate(rng);
2613 let sig_pub_bytes = sig_prv.public_key().public_bytes();
2614 (sig_prv, sig_pub_bytes)
2615 }
2616
2617 #[test]
2618 fn test_register_link_destination() {
2619 let mut mgr = LinkManager::new();
2620 let mut rng = make_rng(0x01);
2621 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2622 let dest_hash = [0xDD; 16];
2623
2624 mgr.register_link_destination(
2625 dest_hash,
2626 sig_prv,
2627 sig_pub_bytes,
2628 ResourceStrategy::AcceptNone,
2629 );
2630 assert!(mgr.is_link_destination(&dest_hash));
2631
2632 mgr.deregister_link_destination(&dest_hash);
2633 assert!(!mgr.is_link_destination(&dest_hash));
2634 }
2635
2636 #[test]
2637 fn test_create_link() {
2638 let mut mgr = LinkManager::new();
2639 let mut rng = OsRng;
2640 let dest_hash = [0xDD; 16];
2641
2642 let sig_pub_bytes = [0xAA; 32]; let (link_id, actions) = mgr.create_link(
2644 &dest_hash,
2645 &sig_pub_bytes,
2646 1,
2647 constants::MTU as u32,
2648 &mut rng,
2649 );
2650 assert_ne!(link_id, [0u8; 16]);
2651 assert_eq!(actions.len(), 2);
2653 assert!(matches!(
2654 actions[0],
2655 LinkManagerAction::RegisterLinkDest { .. }
2656 ));
2657 assert!(matches!(actions[1], LinkManagerAction::SendPacket { .. }));
2658
2659 assert_eq!(mgr.link_state(&link_id), Some(LinkState::Pending));
2661 }
2662
2663 #[test]
2664 fn test_full_handshake_via_manager() {
2665 let mut rng = OsRng;
2666 let dest_hash = [0xDD; 16];
2667
2668 let mut responder_mgr = LinkManager::new();
2670 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2671 responder_mgr.register_link_destination(
2672 dest_hash,
2673 sig_prv,
2674 sig_pub_bytes,
2675 ResourceStrategy::AcceptNone,
2676 );
2677
2678 let mut initiator_mgr = LinkManager::new();
2680
2681 let (link_id, init_actions) = initiator_mgr.create_link(
2683 &dest_hash,
2684 &sig_pub_bytes,
2685 1,
2686 constants::MTU as u32,
2687 &mut rng,
2688 );
2689 assert_eq!(init_actions.len(), 2);
2690
2691 let linkrequest_raw = match &init_actions[1] {
2693 LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
2694 _ => panic!("Expected SendPacket"),
2695 };
2696
2697 let lr_packet = RawPacket::unpack(&linkrequest_raw).unwrap();
2699
2700 let resp_actions = responder_mgr.handle_local_delivery(
2702 lr_packet.destination_hash,
2703 &linkrequest_raw,
2704 lr_packet.packet_hash,
2705 rns_core::transport::types::InterfaceId(0),
2706 &mut rng,
2707 );
2708 assert!(resp_actions.len() >= 2);
2710 assert!(matches!(
2711 resp_actions[0],
2712 LinkManagerAction::RegisterLinkDest { .. }
2713 ));
2714
2715 let lrproof_raw = match &resp_actions[1] {
2717 LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
2718 _ => panic!("Expected SendPacket for LRPROOF"),
2719 };
2720
2721 let lrproof_packet = RawPacket::unpack(&lrproof_raw).unwrap();
2723 let init_actions2 = initiator_mgr.handle_local_delivery(
2724 lrproof_packet.destination_hash,
2725 &lrproof_raw,
2726 lrproof_packet.packet_hash,
2727 rns_core::transport::types::InterfaceId(0),
2728 &mut rng,
2729 );
2730
2731 let has_established = init_actions2
2733 .iter()
2734 .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
2735 assert!(has_established, "Initiator should emit LinkEstablished");
2736
2737 let lrrtt_raw = init_actions2
2739 .iter()
2740 .find_map(|a| match a {
2741 LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
2742 _ => None,
2743 })
2744 .expect("Should have LRRTT SendPacket");
2745
2746 let lrrtt_packet = RawPacket::unpack(&lrrtt_raw).unwrap();
2748 let resp_link_id = lrrtt_packet.destination_hash;
2749 let resp_actions2 = responder_mgr.handle_local_delivery(
2750 resp_link_id,
2751 &lrrtt_raw,
2752 lrrtt_packet.packet_hash,
2753 rns_core::transport::types::InterfaceId(0),
2754 &mut rng,
2755 );
2756
2757 let has_established = resp_actions2
2758 .iter()
2759 .any(|a| matches!(a, LinkManagerAction::LinkEstablished { .. }));
2760 assert!(has_established, "Responder should emit LinkEstablished");
2761
2762 assert_eq!(initiator_mgr.link_state(&link_id), Some(LinkState::Active));
2764 assert_eq!(responder_mgr.link_state(&link_id), Some(LinkState::Active));
2765
2766 assert!(initiator_mgr.link_rtt(&link_id).is_some());
2768 assert!(responder_mgr.link_rtt(&link_id).is_some());
2769 }
2770
2771 #[test]
2772 fn test_encrypted_data_exchange() {
2773 let mut rng = OsRng;
2774 let dest_hash = [0xDD; 16];
2775 let mut resp_mgr = LinkManager::new();
2776 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2777 resp_mgr.register_link_destination(
2778 dest_hash,
2779 sig_prv,
2780 sig_pub_bytes,
2781 ResourceStrategy::AcceptNone,
2782 );
2783 let mut init_mgr = LinkManager::new();
2784
2785 let (link_id, init_actions) = init_mgr.create_link(
2787 &dest_hash,
2788 &sig_pub_bytes,
2789 1,
2790 constants::MTU as u32,
2791 &mut rng,
2792 );
2793 let lr_raw = extract_send_packet(&init_actions);
2794 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
2795 let resp_actions = resp_mgr.handle_local_delivery(
2796 lr_pkt.destination_hash,
2797 &lr_raw,
2798 lr_pkt.packet_hash,
2799 rns_core::transport::types::InterfaceId(0),
2800 &mut rng,
2801 );
2802 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
2803 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
2804 let init_actions2 = init_mgr.handle_local_delivery(
2805 lrproof_pkt.destination_hash,
2806 &lrproof_raw,
2807 lrproof_pkt.packet_hash,
2808 rns_core::transport::types::InterfaceId(0),
2809 &mut rng,
2810 );
2811 let lrrtt_raw = extract_any_send_packet(&init_actions2);
2812 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
2813 resp_mgr.handle_local_delivery(
2814 lrrtt_pkt.destination_hash,
2815 &lrrtt_raw,
2816 lrrtt_pkt.packet_hash,
2817 rns_core::transport::types::InterfaceId(0),
2818 &mut rng,
2819 );
2820
2821 let actions =
2823 init_mgr.send_on_link(&link_id, b"hello link!", constants::CONTEXT_NONE, &mut rng);
2824 assert_eq!(actions.len(), 1);
2825 assert!(matches!(actions[0], LinkManagerAction::SendPacket { .. }));
2826 }
2827
2828 #[test]
2829 fn test_request_response() {
2830 let mut rng = OsRng;
2831 let dest_hash = [0xDD; 16];
2832 let mut resp_mgr = LinkManager::new();
2833 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2834 resp_mgr.register_link_destination(
2835 dest_hash,
2836 sig_prv,
2837 sig_pub_bytes,
2838 ResourceStrategy::AcceptNone,
2839 );
2840
2841 resp_mgr.register_request_handler("/status", None, |_link_id, _path, _data, _remote| {
2843 Some(b"OK".to_vec())
2844 });
2845
2846 let mut init_mgr = LinkManager::new();
2847
2848 let (link_id, init_actions) = init_mgr.create_link(
2850 &dest_hash,
2851 &sig_pub_bytes,
2852 1,
2853 constants::MTU as u32,
2854 &mut rng,
2855 );
2856 let lr_raw = extract_send_packet(&init_actions);
2857 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
2858 let resp_actions = resp_mgr.handle_local_delivery(
2859 lr_pkt.destination_hash,
2860 &lr_raw,
2861 lr_pkt.packet_hash,
2862 rns_core::transport::types::InterfaceId(0),
2863 &mut rng,
2864 );
2865 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
2866 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
2867 let init_actions2 = init_mgr.handle_local_delivery(
2868 lrproof_pkt.destination_hash,
2869 &lrproof_raw,
2870 lrproof_pkt.packet_hash,
2871 rns_core::transport::types::InterfaceId(0),
2872 &mut rng,
2873 );
2874 let lrrtt_raw = extract_any_send_packet(&init_actions2);
2875 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
2876 resp_mgr.handle_local_delivery(
2877 lrrtt_pkt.destination_hash,
2878 &lrrtt_raw,
2879 lrrtt_pkt.packet_hash,
2880 rns_core::transport::types::InterfaceId(0),
2881 &mut rng,
2882 );
2883
2884 let req_actions = init_mgr.send_request(&link_id, "/status", b"query", &mut rng);
2886 assert_eq!(req_actions.len(), 1);
2887
2888 let req_raw = extract_send_packet_from(&req_actions);
2890 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
2891 let resp_actions = resp_mgr.handle_local_delivery(
2892 req_pkt.destination_hash,
2893 &req_raw,
2894 req_pkt.packet_hash,
2895 rns_core::transport::types::InterfaceId(0),
2896 &mut rng,
2897 );
2898
2899 let has_response = resp_actions
2901 .iter()
2902 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
2903 assert!(has_response, "Handler should produce a response packet");
2904 }
2905
2906 #[test]
2907 fn test_request_acl_deny_unidentified() {
2908 let mut rng = OsRng;
2909 let dest_hash = [0xDD; 16];
2910 let mut resp_mgr = LinkManager::new();
2911 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
2912 resp_mgr.register_link_destination(
2913 dest_hash,
2914 sig_prv,
2915 sig_pub_bytes,
2916 ResourceStrategy::AcceptNone,
2917 );
2918
2919 resp_mgr.register_request_handler(
2921 "/restricted",
2922 Some(vec![[0xAA; 16]]),
2923 |_link_id, _path, _data, _remote| Some(b"secret".to_vec()),
2924 );
2925
2926 let mut init_mgr = LinkManager::new();
2927
2928 let (link_id, init_actions) = init_mgr.create_link(
2930 &dest_hash,
2931 &sig_pub_bytes,
2932 1,
2933 constants::MTU as u32,
2934 &mut rng,
2935 );
2936 let lr_raw = extract_send_packet(&init_actions);
2937 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
2938 let resp_actions = resp_mgr.handle_local_delivery(
2939 lr_pkt.destination_hash,
2940 &lr_raw,
2941 lr_pkt.packet_hash,
2942 rns_core::transport::types::InterfaceId(0),
2943 &mut rng,
2944 );
2945 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
2946 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
2947 let init_actions2 = init_mgr.handle_local_delivery(
2948 lrproof_pkt.destination_hash,
2949 &lrproof_raw,
2950 lrproof_pkt.packet_hash,
2951 rns_core::transport::types::InterfaceId(0),
2952 &mut rng,
2953 );
2954 let lrrtt_raw = extract_any_send_packet(&init_actions2);
2955 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
2956 resp_mgr.handle_local_delivery(
2957 lrrtt_pkt.destination_hash,
2958 &lrrtt_raw,
2959 lrrtt_pkt.packet_hash,
2960 rns_core::transport::types::InterfaceId(0),
2961 &mut rng,
2962 );
2963
2964 let req_actions = init_mgr.send_request(&link_id, "/restricted", b"query", &mut rng);
2966 let req_raw = extract_send_packet_from(&req_actions);
2967 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
2968 let resp_actions = resp_mgr.handle_local_delivery(
2969 req_pkt.destination_hash,
2970 &req_raw,
2971 req_pkt.packet_hash,
2972 rns_core::transport::types::InterfaceId(0),
2973 &mut rng,
2974 );
2975
2976 let has_response = resp_actions
2978 .iter()
2979 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
2980 assert!(!has_response, "Unidentified peer should be denied");
2981 }
2982
2983 #[test]
2984 fn test_teardown_link() {
2985 let mut rng = OsRng;
2986 let dest_hash = [0xDD; 16];
2987 let mut mgr = LinkManager::new();
2988
2989 let dummy_sig = [0xAA; 32];
2990 let (link_id, _) =
2991 mgr.create_link(&dest_hash, &dummy_sig, 1, constants::MTU as u32, &mut rng);
2992 assert_eq!(mgr.link_count(), 1);
2993
2994 let actions = mgr.teardown_link(&link_id);
2995 let has_close = actions
2996 .iter()
2997 .any(|a| matches!(a, LinkManagerAction::LinkClosed { .. }));
2998 assert!(has_close);
2999
3000 let tick_actions = mgr.tick(&mut rng);
3002 let has_deregister = tick_actions
3003 .iter()
3004 .any(|a| matches!(a, LinkManagerAction::DeregisterLinkDest { .. }));
3005 assert!(has_deregister);
3006 assert_eq!(mgr.link_count(), 0);
3007 }
3008
3009 #[test]
3010 fn test_identify_on_link() {
3011 let mut rng = OsRng;
3012 let dest_hash = [0xDD; 16];
3013 let mut resp_mgr = LinkManager::new();
3014 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3015 resp_mgr.register_link_destination(
3016 dest_hash,
3017 sig_prv,
3018 sig_pub_bytes,
3019 ResourceStrategy::AcceptNone,
3020 );
3021 let mut init_mgr = LinkManager::new();
3022
3023 let (link_id, init_actions) = init_mgr.create_link(
3025 &dest_hash,
3026 &sig_pub_bytes,
3027 1,
3028 constants::MTU as u32,
3029 &mut rng,
3030 );
3031 let lr_raw = extract_send_packet(&init_actions);
3032 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3033 let resp_actions = resp_mgr.handle_local_delivery(
3034 lr_pkt.destination_hash,
3035 &lr_raw,
3036 lr_pkt.packet_hash,
3037 rns_core::transport::types::InterfaceId(0),
3038 &mut rng,
3039 );
3040 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3041 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3042 let init_actions2 = init_mgr.handle_local_delivery(
3043 lrproof_pkt.destination_hash,
3044 &lrproof_raw,
3045 lrproof_pkt.packet_hash,
3046 rns_core::transport::types::InterfaceId(0),
3047 &mut rng,
3048 );
3049 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3050 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3051 resp_mgr.handle_local_delivery(
3052 lrrtt_pkt.destination_hash,
3053 &lrrtt_raw,
3054 lrrtt_pkt.packet_hash,
3055 rns_core::transport::types::InterfaceId(0),
3056 &mut rng,
3057 );
3058
3059 let identity = Identity::new(&mut rng);
3061 let id_actions = init_mgr.identify(&link_id, &identity, &mut rng);
3062 assert_eq!(id_actions.len(), 1);
3063
3064 let id_raw = extract_send_packet_from(&id_actions);
3066 let id_pkt = RawPacket::unpack(&id_raw).unwrap();
3067 let resp_actions = resp_mgr.handle_local_delivery(
3068 id_pkt.destination_hash,
3069 &id_raw,
3070 id_pkt.packet_hash,
3071 rns_core::transport::types::InterfaceId(0),
3072 &mut rng,
3073 );
3074
3075 let has_identified = resp_actions
3076 .iter()
3077 .any(|a| matches!(a, LinkManagerAction::RemoteIdentified { .. }));
3078 assert!(has_identified, "Responder should emit RemoteIdentified");
3079 }
3080
3081 #[test]
3082 fn test_path_hash_computation() {
3083 let h1 = compute_path_hash("/status");
3084 let h2 = compute_path_hash("/path");
3085 assert_ne!(h1, h2);
3086
3087 assert_eq!(h1, compute_path_hash("/status"));
3089 }
3090
3091 #[test]
3092 fn test_link_count() {
3093 let mut mgr = LinkManager::new();
3094 let mut rng = OsRng;
3095
3096 assert_eq!(mgr.link_count(), 0);
3097
3098 let dummy_sig = [0xAA; 32];
3099 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3100 assert_eq!(mgr.link_count(), 1);
3101
3102 mgr.create_link(&[0x22; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3103 assert_eq!(mgr.link_count(), 2);
3104 }
3105
3106 fn extract_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3109 extract_send_packet_at(actions, actions.len() - 1)
3110 }
3111
3112 fn extract_send_packet_at(actions: &[LinkManagerAction], idx: usize) -> Vec<u8> {
3113 match &actions[idx] {
3114 LinkManagerAction::SendPacket { raw, .. } => raw.clone(),
3115 other => panic!("Expected SendPacket at index {}, got {:?}", idx, other),
3116 }
3117 }
3118
3119 fn extract_any_send_packet(actions: &[LinkManagerAction]) -> Vec<u8> {
3120 actions
3121 .iter()
3122 .find_map(|a| match a {
3123 LinkManagerAction::SendPacket { raw, .. } => Some(raw.clone()),
3124 _ => None,
3125 })
3126 .expect("Expected at least one SendPacket action")
3127 }
3128
3129 fn extract_send_packet_from(actions: &[LinkManagerAction]) -> Vec<u8> {
3130 extract_any_send_packet(actions)
3131 }
3132
3133 fn setup_active_link() -> (LinkManager, LinkManager, LinkId) {
3136 let mut rng = OsRng;
3137 let dest_hash = [0xDD; 16];
3138 let mut resp_mgr = LinkManager::new();
3139 let (sig_prv, sig_pub_bytes) = make_dest_keys(&mut rng);
3140 resp_mgr.register_link_destination(
3141 dest_hash,
3142 sig_prv,
3143 sig_pub_bytes,
3144 ResourceStrategy::AcceptNone,
3145 );
3146 let mut init_mgr = LinkManager::new();
3147
3148 let (link_id, init_actions) = init_mgr.create_link(
3149 &dest_hash,
3150 &sig_pub_bytes,
3151 1,
3152 constants::MTU as u32,
3153 &mut rng,
3154 );
3155 let lr_raw = extract_send_packet(&init_actions);
3156 let lr_pkt = RawPacket::unpack(&lr_raw).unwrap();
3157 let resp_actions = resp_mgr.handle_local_delivery(
3158 lr_pkt.destination_hash,
3159 &lr_raw,
3160 lr_pkt.packet_hash,
3161 rns_core::transport::types::InterfaceId(0),
3162 &mut rng,
3163 );
3164 let lrproof_raw = extract_send_packet_at(&resp_actions, 1);
3165 let lrproof_pkt = RawPacket::unpack(&lrproof_raw).unwrap();
3166 let init_actions2 = init_mgr.handle_local_delivery(
3167 lrproof_pkt.destination_hash,
3168 &lrproof_raw,
3169 lrproof_pkt.packet_hash,
3170 rns_core::transport::types::InterfaceId(0),
3171 &mut rng,
3172 );
3173 let lrrtt_raw = extract_any_send_packet(&init_actions2);
3174 let lrrtt_pkt = RawPacket::unpack(&lrrtt_raw).unwrap();
3175 resp_mgr.handle_local_delivery(
3176 lrrtt_pkt.destination_hash,
3177 &lrrtt_raw,
3178 lrrtt_pkt.packet_hash,
3179 rns_core::transport::types::InterfaceId(0),
3180 &mut rng,
3181 );
3182
3183 assert_eq!(init_mgr.link_state(&link_id), Some(LinkState::Active));
3184 assert_eq!(resp_mgr.link_state(&link_id), Some(LinkState::Active));
3185
3186 (init_mgr, resp_mgr, link_id)
3187 }
3188
3189 #[test]
3194 fn test_resource_strategy_default() {
3195 let mut mgr = LinkManager::new();
3196 let mut rng = OsRng;
3197 let dummy_sig = [0xAA; 32];
3198 let (link_id, _) =
3199 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3200
3201 let link = mgr.links.get(&link_id).unwrap();
3203 assert_eq!(link.resource_strategy, ResourceStrategy::AcceptNone);
3204 }
3205
3206 #[test]
3207 fn test_set_resource_strategy() {
3208 let mut mgr = LinkManager::new();
3209 let mut rng = OsRng;
3210 let dummy_sig = [0xAA; 32];
3211 let (link_id, _) =
3212 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3213
3214 mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3215 assert_eq!(
3216 mgr.links.get(&link_id).unwrap().resource_strategy,
3217 ResourceStrategy::AcceptAll
3218 );
3219
3220 mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3221 assert_eq!(
3222 mgr.links.get(&link_id).unwrap().resource_strategy,
3223 ResourceStrategy::AcceptApp
3224 );
3225 }
3226
3227 #[test]
3228 fn test_send_resource_on_active_link() {
3229 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3230 let mut rng = OsRng;
3231
3232 let data = vec![0xAB; 100]; let actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3235
3236 let has_send = actions
3238 .iter()
3239 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3240 assert!(
3241 has_send,
3242 "send_resource should emit advertisement SendPacket"
3243 );
3244 }
3245
3246 #[test]
3247 fn test_send_resource_on_inactive_link() {
3248 let mut mgr = LinkManager::new();
3249 let mut rng = OsRng;
3250 let dummy_sig = [0xAA; 32];
3251 let (link_id, _) =
3252 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3253
3254 let actions = mgr.send_resource(&link_id, b"data", None, &mut rng);
3256 assert!(actions.is_empty(), "Cannot send resource on inactive link");
3257 }
3258
3259 #[test]
3260 fn test_resource_adv_rejected_by_accept_none() {
3261 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3262 let mut rng = OsRng;
3263
3264 let data = vec![0xCD; 100];
3267 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3268
3269 for action in &adv_actions {
3271 if let LinkManagerAction::SendPacket { raw, .. } = action {
3272 let pkt = RawPacket::unpack(raw).unwrap();
3273 let resp_actions = resp_mgr.handle_local_delivery(
3274 pkt.destination_hash,
3275 raw,
3276 pkt.packet_hash,
3277 rns_core::transport::types::InterfaceId(0),
3278 &mut rng,
3279 );
3280 let has_resource_received = resp_actions
3282 .iter()
3283 .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
3284 assert!(
3285 !has_resource_received,
3286 "AcceptNone should not accept resource"
3287 );
3288 }
3289 }
3290 }
3291
3292 #[test]
3293 fn test_resource_adv_accepted_by_accept_all() {
3294 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3295 let mut rng = OsRng;
3296
3297 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3299
3300 let data = vec![0xCD; 100];
3302 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3303
3304 for action in &adv_actions {
3306 if let LinkManagerAction::SendPacket { raw, .. } = action {
3307 let pkt = RawPacket::unpack(raw).unwrap();
3308 let resp_actions = resp_mgr.handle_local_delivery(
3309 pkt.destination_hash,
3310 raw,
3311 pkt.packet_hash,
3312 rns_core::transport::types::InterfaceId(0),
3313 &mut rng,
3314 );
3315 let has_send = resp_actions
3317 .iter()
3318 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3319 assert!(has_send, "AcceptAll should accept and request parts");
3320 }
3321 }
3322 }
3323
3324 #[test]
3325 fn test_resource_accept_app_query() {
3326 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3327 let mut rng = OsRng;
3328
3329 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3331
3332 let data = vec![0xCD; 100];
3334 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3335
3336 let mut got_query = false;
3338 for action in &adv_actions {
3339 if let LinkManagerAction::SendPacket { raw, .. } = action {
3340 let pkt = RawPacket::unpack(raw).unwrap();
3341 let resp_actions = resp_mgr.handle_local_delivery(
3342 pkt.destination_hash,
3343 raw,
3344 pkt.packet_hash,
3345 rns_core::transport::types::InterfaceId(0),
3346 &mut rng,
3347 );
3348 for a in &resp_actions {
3349 if matches!(a, LinkManagerAction::ResourceAcceptQuery { .. }) {
3350 got_query = true;
3351 }
3352 }
3353 }
3354 }
3355 assert!(got_query, "AcceptApp should emit ResourceAcceptQuery");
3356 }
3357
3358 #[test]
3359 fn test_resource_accept_app_accept() {
3360 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3361 let mut rng = OsRng;
3362
3363 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3364
3365 let data = vec![0xCD; 100];
3366 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3367
3368 for action in &adv_actions {
3369 if let LinkManagerAction::SendPacket { raw, .. } = action {
3370 let pkt = RawPacket::unpack(raw).unwrap();
3371 let resp_actions = resp_mgr.handle_local_delivery(
3372 pkt.destination_hash,
3373 raw,
3374 pkt.packet_hash,
3375 rns_core::transport::types::InterfaceId(0),
3376 &mut rng,
3377 );
3378 for a in &resp_actions {
3379 if let LinkManagerAction::ResourceAcceptQuery {
3380 link_id: lid,
3381 resource_hash,
3382 ..
3383 } = a
3384 {
3385 let accept_actions =
3387 resp_mgr.accept_resource(lid, resource_hash, true, &mut rng);
3388 let has_send = accept_actions
3390 .iter()
3391 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3392 assert!(
3393 has_send,
3394 "Accepting resource should produce request for parts"
3395 );
3396 }
3397 }
3398 }
3399 }
3400 }
3401
3402 #[test]
3403 fn test_resource_accept_app_reject() {
3404 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3405 let mut rng = OsRng;
3406
3407 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptApp);
3408
3409 let data = vec![0xCD; 100];
3410 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3411
3412 for action in &adv_actions {
3413 if let LinkManagerAction::SendPacket { raw, .. } = action {
3414 let pkt = RawPacket::unpack(raw).unwrap();
3415 let resp_actions = resp_mgr.handle_local_delivery(
3416 pkt.destination_hash,
3417 raw,
3418 pkt.packet_hash,
3419 rns_core::transport::types::InterfaceId(0),
3420 &mut rng,
3421 );
3422 for a in &resp_actions {
3423 if let LinkManagerAction::ResourceAcceptQuery {
3424 link_id: lid,
3425 resource_hash,
3426 ..
3427 } = a
3428 {
3429 let reject_actions =
3431 resp_mgr.accept_resource(lid, resource_hash, false, &mut rng);
3432 let has_resource_received = reject_actions
3435 .iter()
3436 .any(|a| matches!(a, LinkManagerAction::ResourceReceived { .. }));
3437 assert!(!has_resource_received);
3438 }
3439 }
3440 }
3441 }
3442 }
3443
3444 #[test]
3445 fn test_resource_full_transfer() {
3446 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3447 let mut rng = OsRng;
3448
3449 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3451
3452 let original_data = b"Hello, Resource Transfer!".to_vec();
3454 let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
3455
3456 let mut pending: Vec<(char, LinkManagerAction)> =
3459 adv_actions.into_iter().map(|a| ('i', a)).collect();
3460 let mut rounds = 0;
3461 let max_rounds = 50;
3462 let mut resource_received = false;
3463 let mut sender_completed = false;
3464
3465 while !pending.is_empty() && rounds < max_rounds {
3466 rounds += 1;
3467 let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
3468
3469 for (source, action) in pending.drain(..) {
3470 if let LinkManagerAction::SendPacket { raw, .. } = action {
3471 let pkt = RawPacket::unpack(&raw).unwrap();
3472
3473 let target_actions = if source == 'i' {
3475 resp_mgr.handle_local_delivery(
3476 pkt.destination_hash,
3477 &raw,
3478 pkt.packet_hash,
3479 rns_core::transport::types::InterfaceId(0),
3480 &mut rng,
3481 )
3482 } else {
3483 init_mgr.handle_local_delivery(
3484 pkt.destination_hash,
3485 &raw,
3486 pkt.packet_hash,
3487 rns_core::transport::types::InterfaceId(0),
3488 &mut rng,
3489 )
3490 };
3491
3492 let target_source = if source == 'i' { 'r' } else { 'i' };
3493 for a in &target_actions {
3494 match a {
3495 LinkManagerAction::ResourceReceived { data, .. } => {
3496 assert_eq!(*data, original_data);
3497 resource_received = true;
3498 }
3499 LinkManagerAction::ResourceCompleted { .. } => {
3500 sender_completed = true;
3501 }
3502 _ => {}
3503 }
3504 }
3505 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
3506 }
3507 }
3508 pending = next;
3509 }
3510
3511 assert!(
3512 resource_received,
3513 "Responder should receive resource data (rounds={})",
3514 rounds
3515 );
3516 assert!(
3517 sender_completed,
3518 "Sender should get completion proof (rounds={})",
3519 rounds
3520 );
3521 }
3522
3523 #[test]
3524 fn test_resource_cancel_icl() {
3525 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3526 let mut rng = OsRng;
3527
3528 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3529
3530 let data = vec![0xAB; 2000];
3532 let adv_actions = init_mgr.send_resource(&link_id, &data, None, &mut rng);
3533
3534 for action in &adv_actions {
3536 if let LinkManagerAction::SendPacket { raw, .. } = action {
3537 let pkt = RawPacket::unpack(raw).unwrap();
3538 resp_mgr.handle_local_delivery(
3539 pkt.destination_hash,
3540 raw,
3541 pkt.packet_hash,
3542 rns_core::transport::types::InterfaceId(0),
3543 &mut rng,
3544 );
3545 }
3546 }
3547
3548 assert!(!resp_mgr
3550 .links
3551 .get(&link_id)
3552 .unwrap()
3553 .incoming_resources
3554 .is_empty());
3555
3556 let icl_actions = resp_mgr.handle_resource_icl(&link_id);
3558
3559 let has_failed = icl_actions
3561 .iter()
3562 .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
3563 assert!(has_failed, "ICL should produce ResourceFailed");
3564 }
3565
3566 #[test]
3567 fn test_resource_cancel_rcl() {
3568 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3569 let mut rng = OsRng;
3570
3571 let data = vec![0xAB; 2000];
3573 init_mgr.send_resource(&link_id, &data, None, &mut rng);
3574
3575 assert!(!init_mgr
3577 .links
3578 .get(&link_id)
3579 .unwrap()
3580 .outgoing_resources
3581 .is_empty());
3582
3583 let rcl_actions = init_mgr.handle_resource_rcl(&link_id);
3585
3586 let has_failed = rcl_actions
3587 .iter()
3588 .any(|a| matches!(a, LinkManagerAction::ResourceFailed { .. }));
3589 assert!(has_failed, "RCL should produce ResourceFailed");
3590 }
3591
3592 #[test]
3593 fn test_cancel_all_resources_clears_active_transfers() {
3594 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3595 let mut rng = OsRng;
3596
3597 let actions = init_mgr.send_resource(&link_id, b"resource body", None, &mut rng);
3598 assert!(!actions.is_empty());
3599 assert_eq!(init_mgr.resource_transfer_count(), 1);
3600
3601 let cancel_actions = init_mgr.cancel_all_resources(&mut rng);
3602
3603 assert_eq!(init_mgr.resource_transfer_count(), 0);
3604 assert!(cancel_actions
3605 .iter()
3606 .any(|action| matches!(action, LinkManagerAction::SendPacket { .. })));
3607 }
3608
3609 #[test]
3610 fn test_resource_tick_cleans_up() {
3611 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3612 let mut rng = OsRng;
3613
3614 let data = vec![0xAB; 100];
3615 init_mgr.send_resource(&link_id, &data, None, &mut rng);
3616
3617 assert!(!init_mgr
3618 .links
3619 .get(&link_id)
3620 .unwrap()
3621 .outgoing_resources
3622 .is_empty());
3623
3624 init_mgr.handle_resource_rcl(&link_id);
3626
3627 init_mgr.tick(&mut rng);
3629
3630 assert!(
3631 init_mgr
3632 .links
3633 .get(&link_id)
3634 .unwrap()
3635 .outgoing_resources
3636 .is_empty(),
3637 "Tick should clean up completed/failed outgoing resources"
3638 );
3639 }
3640
3641 #[test]
3642 fn test_build_link_packet() {
3643 let (init_mgr, _resp_mgr, link_id) = setup_active_link();
3644
3645 let actions =
3646 init_mgr.build_link_packet(&link_id, constants::CONTEXT_RESOURCE, b"test data");
3647 assert_eq!(actions.len(), 1);
3648 if let LinkManagerAction::SendPacket { raw, dest_type, .. } = &actions[0] {
3649 let pkt = RawPacket::unpack(raw).unwrap();
3650 assert_eq!(pkt.context, constants::CONTEXT_RESOURCE);
3651 assert_eq!(*dest_type, constants::DESTINATION_LINK);
3652 } else {
3653 panic!("Expected SendPacket");
3654 }
3655 }
3656
3657 #[test]
3662 fn test_channel_message_delivery() {
3663 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3664 let mut rng = OsRng;
3665
3666 let chan_actions = init_mgr
3668 .send_channel_message(&link_id, 42, b"channel data", &mut rng)
3669 .expect("active link channel send should succeed");
3670 assert!(!chan_actions.is_empty());
3671
3672 let mut got_channel_msg = false;
3674 for action in &chan_actions {
3675 if let LinkManagerAction::SendPacket { raw, .. } = action {
3676 let pkt = RawPacket::unpack(raw).unwrap();
3677 let resp_actions = resp_mgr.handle_local_delivery(
3678 pkt.destination_hash,
3679 raw,
3680 pkt.packet_hash,
3681 rns_core::transport::types::InterfaceId(0),
3682 &mut rng,
3683 );
3684 for a in &resp_actions {
3685 if let LinkManagerAction::ChannelMessageReceived {
3686 msgtype, payload, ..
3687 } = a
3688 {
3689 assert_eq!(*msgtype, 42);
3690 assert_eq!(*payload, b"channel data");
3691 got_channel_msg = true;
3692 }
3693 }
3694 }
3695 }
3696 assert!(got_channel_msg, "Responder should receive channel message");
3697 }
3698
3699 #[test]
3700 fn test_channel_proof_reopens_send_window() {
3701 let (mut init_mgr, _resp_mgr, link_id) = setup_active_link();
3702 let mut rng = OsRng;
3703
3704 init_mgr
3705 .send_channel_message(&link_id, 42, b"first", &mut rng)
3706 .expect("first send should succeed");
3707 init_mgr
3708 .send_channel_message(&link_id, 42, b"second", &mut rng)
3709 .expect("second send should succeed");
3710
3711 let err = init_mgr
3712 .send_channel_message(&link_id, 42, b"third", &mut rng)
3713 .expect_err("third send should hit the initial channel window");
3714 assert_eq!(err, "Channel is not ready to send");
3715
3716 let queued_packets = init_mgr
3717 .links
3718 .get(&link_id)
3719 .unwrap()
3720 .pending_channel_packets
3721 .clone();
3722 assert_eq!(queued_packets.len(), 2);
3723 for tracked_hash in queued_packets.keys().take(1) {
3724 let mut proof_data = Vec::with_capacity(96);
3725 proof_data.extend_from_slice(tracked_hash);
3726 proof_data.extend_from_slice(&[0x11; 64]);
3727 let flags = PacketFlags {
3728 header_type: constants::HEADER_1,
3729 context_flag: constants::FLAG_UNSET,
3730 transport_type: constants::TRANSPORT_BROADCAST,
3731 destination_type: constants::DESTINATION_LINK,
3732 packet_type: constants::PACKET_TYPE_PROOF,
3733 };
3734 let proof = RawPacket::pack(
3735 flags,
3736 0,
3737 &link_id,
3738 None,
3739 constants::CONTEXT_NONE,
3740 &proof_data,
3741 )
3742 .expect("proof packet should pack");
3743 let ack_actions = init_mgr.handle_local_delivery(
3744 link_id,
3745 &proof.raw,
3746 proof.packet_hash,
3747 rns_core::transport::types::InterfaceId(0),
3748 &mut rng,
3749 );
3750 assert!(
3751 ack_actions.is_empty(),
3752 "proof delivery should only update channel state"
3753 );
3754 }
3755
3756 init_mgr
3757 .send_channel_message(&link_id, 42, b"third", &mut rng)
3758 .expect("proof should free one channel slot");
3759 }
3760
3761 #[test]
3762 fn test_generic_link_data_delivery() {
3763 let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
3764 let mut rng = OsRng;
3765
3766 let actions = init_mgr.send_on_link(&link_id, b"raw stuff", 0x42, &mut rng);
3768 assert_eq!(actions.len(), 1);
3769
3770 let raw = extract_any_send_packet(&actions);
3772 let pkt = RawPacket::unpack(&raw).unwrap();
3773 let resp_actions = resp_mgr.handle_local_delivery(
3774 pkt.destination_hash,
3775 &raw,
3776 pkt.packet_hash,
3777 rns_core::transport::types::InterfaceId(0),
3778 &mut rng,
3779 );
3780
3781 let has_data = resp_actions
3782 .iter()
3783 .any(|a| matches!(a, LinkManagerAction::LinkDataReceived { context: 0x42, .. }));
3784 assert!(
3785 has_data,
3786 "Responder should receive LinkDataReceived for unknown context"
3787 );
3788 }
3789
3790 #[test]
3791 fn test_response_delivery() {
3792 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3793 let mut rng = OsRng;
3794
3795 resp_mgr.register_request_handler("/echo", None, |_link_id, _path, data, _remote| {
3797 Some(data.to_vec())
3798 });
3799
3800 let req_actions = init_mgr.send_request(&link_id, "/echo", b"\xc0", &mut rng); assert!(!req_actions.is_empty());
3803
3804 let req_raw = extract_any_send_packet(&req_actions);
3806 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3807 let resp_actions = resp_mgr.handle_local_delivery(
3808 req_pkt.destination_hash,
3809 &req_raw,
3810 req_pkt.packet_hash,
3811 rns_core::transport::types::InterfaceId(0),
3812 &mut rng,
3813 );
3814 let has_resp_send = resp_actions
3815 .iter()
3816 .any(|a| matches!(a, LinkManagerAction::SendPacket { .. }));
3817 assert!(has_resp_send, "Handler should produce response");
3818
3819 let resp_raw = extract_any_send_packet(&resp_actions);
3821 let resp_pkt = RawPacket::unpack(&resp_raw).unwrap();
3822 let init_actions = init_mgr.handle_local_delivery(
3823 resp_pkt.destination_hash,
3824 &resp_raw,
3825 resp_pkt.packet_hash,
3826 rns_core::transport::types::InterfaceId(0),
3827 &mut rng,
3828 );
3829
3830 let has_response_received = init_actions
3831 .iter()
3832 .any(|a| matches!(a, LinkManagerAction::ResponseReceived { .. }));
3833 assert!(
3834 has_response_received,
3835 "Initiator should receive ResponseReceived"
3836 );
3837 }
3838
3839 #[test]
3840 fn test_large_response_uses_resource_fallback() {
3841 let (init_mgr, mut resp_mgr, link_id) = setup_active_link();
3842 let mut rng = OsRng;
3843
3844 let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
3847 resp_mgr.register_request_handler("/large", None, {
3848 let large_payload = large_payload.clone();
3849 move |_link_id, _path, _data, _remote| Some(large_payload.clone())
3850 });
3851
3852 let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
3854 assert!(!req_actions.is_empty());
3855
3856 let req_raw = extract_any_send_packet(&req_actions);
3858 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
3859 let resp_actions = resp_mgr.handle_local_delivery(
3860 req_pkt.destination_hash,
3861 &req_raw,
3862 req_pkt.packet_hash,
3863 rns_core::transport::types::InterfaceId(0),
3864 &mut rng,
3865 );
3866
3867 let mut has_resource_adv = false;
3868 let mut has_direct_response = false;
3869 for action in &resp_actions {
3870 if let LinkManagerAction::SendPacket { raw, .. } = action {
3871 let pkt = RawPacket::unpack(raw).unwrap();
3872 if pkt.context == constants::CONTEXT_RESOURCE_ADV {
3873 has_resource_adv = true;
3874 }
3875 if pkt.context == constants::CONTEXT_RESPONSE {
3876 has_direct_response = true;
3877 }
3878 }
3879 }
3880
3881 assert!(
3882 has_resource_adv,
3883 "Large response should advertise a response resource"
3884 );
3885 assert!(
3886 !has_direct_response,
3887 "Large response should not use direct CONTEXT_RESPONSE packet"
3888 );
3889 }
3890
3891 #[test]
3892 fn test_send_channel_message_on_no_channel() {
3893 let mut mgr = LinkManager::new();
3894 let mut rng = OsRng;
3895 let dummy_sig = [0xAA; 32];
3896 let (link_id, _) =
3897 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3898
3899 let err = mgr
3901 .send_channel_message(&link_id, 1, b"test", &mut rng)
3902 .expect_err("pending link should reject channel send");
3903 assert_eq!(err, "link has no active channel");
3904 }
3905
3906 #[test]
3907 fn test_send_on_link_requires_active() {
3908 let mut mgr = LinkManager::new();
3909 let mut rng = OsRng;
3910 let dummy_sig = [0xAA; 32];
3911 let (link_id, _) =
3912 mgr.create_link(&[0x11; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
3913
3914 let actions = mgr.send_on_link(&link_id, b"test", constants::CONTEXT_NONE, &mut rng);
3915 assert!(actions.is_empty(), "Cannot send on pending link");
3916 }
3917
3918 #[test]
3919 fn test_send_on_link_unknown_link() {
3920 let mgr = LinkManager::new();
3921 let mut rng = OsRng;
3922
3923 let actions = mgr.send_on_link(&[0xFF; 16], b"test", constants::CONTEXT_NONE, &mut rng);
3924 assert!(actions.is_empty());
3925 }
3926
3927 #[test]
3928 fn test_resource_full_transfer_large() {
3929 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
3930 let mut rng = OsRng;
3931
3932 resp_mgr.set_resource_strategy(&link_id, ResourceStrategy::AcceptAll);
3933
3934 let original_data: Vec<u8> = (0..2000u32)
3936 .map(|i| {
3937 let pos = i as usize;
3938 (pos ^ (pos >> 8) ^ (pos >> 16)) as u8
3939 })
3940 .collect();
3941
3942 let adv_actions = init_mgr.send_resource(&link_id, &original_data, None, &mut rng);
3943
3944 let mut pending: Vec<(char, LinkManagerAction)> =
3945 adv_actions.into_iter().map(|a| ('i', a)).collect();
3946 let mut rounds = 0;
3947 let max_rounds = 200;
3948 let mut resource_received = false;
3949 let mut sender_completed = false;
3950
3951 while !pending.is_empty() && rounds < max_rounds {
3952 rounds += 1;
3953 let mut next: Vec<(char, LinkManagerAction)> = Vec::new();
3954
3955 for (source, action) in pending.drain(..) {
3956 if let LinkManagerAction::SendPacket { raw, .. } = action {
3957 let pkt = match RawPacket::unpack(&raw) {
3958 Ok(p) => p,
3959 Err(_) => continue,
3960 };
3961
3962 let target_actions = if source == 'i' {
3963 resp_mgr.handle_local_delivery(
3964 pkt.destination_hash,
3965 &raw,
3966 pkt.packet_hash,
3967 rns_core::transport::types::InterfaceId(0),
3968 &mut rng,
3969 )
3970 } else {
3971 init_mgr.handle_local_delivery(
3972 pkt.destination_hash,
3973 &raw,
3974 pkt.packet_hash,
3975 rns_core::transport::types::InterfaceId(0),
3976 &mut rng,
3977 )
3978 };
3979
3980 let target_source = if source == 'i' { 'r' } else { 'i' };
3981 for a in &target_actions {
3982 match a {
3983 LinkManagerAction::ResourceReceived { data, .. } => {
3984 assert_eq!(*data, original_data);
3985 resource_received = true;
3986 }
3987 LinkManagerAction::ResourceCompleted { .. } => {
3988 sender_completed = true;
3989 }
3990 _ => {}
3991 }
3992 }
3993 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
3994 }
3995 }
3996 pending = next;
3997 }
3998
3999 assert!(
4000 resource_received,
4001 "Should receive large resource (rounds={})",
4002 rounds
4003 );
4004 assert!(
4005 sender_completed,
4006 "Sender should complete (rounds={})",
4007 rounds
4008 );
4009 }
4010
4011 #[test]
4012 fn test_process_resource_actions_mapping() {
4013 let (init_mgr, _resp_mgr, link_id) = setup_active_link();
4014 let mut rng = OsRng;
4015
4016 let actions = vec![
4018 ResourceAction::DataReceived {
4019 data: vec![1, 2, 3],
4020 metadata: Some(vec![4, 5]),
4021 },
4022 ResourceAction::Completed,
4023 ResourceAction::Failed(rns_core::resource::ResourceError::Timeout),
4024 ResourceAction::ProgressUpdate {
4025 received: 10,
4026 total: 20,
4027 },
4028 ];
4029
4030 let result = init_mgr.process_resource_actions(&link_id, actions, &mut rng);
4031
4032 assert!(matches!(
4033 result[0],
4034 LinkManagerAction::ResourceReceived { .. }
4035 ));
4036 assert!(matches!(
4037 result[1],
4038 LinkManagerAction::ResourceCompleted { .. }
4039 ));
4040 assert!(matches!(
4041 result[2],
4042 LinkManagerAction::ResourceFailed { .. }
4043 ));
4044 assert!(matches!(
4045 result[3],
4046 LinkManagerAction::ResourceProgress {
4047 received: 10,
4048 total: 20,
4049 ..
4050 }
4051 ));
4052 }
4053
4054 #[test]
4055 fn test_link_state_empty() {
4056 let mgr = LinkManager::new();
4057 let fake_id = [0xAA; 16];
4058 assert!(mgr.link_state(&fake_id).is_none());
4059 }
4060
4061 #[test]
4062 fn test_large_response_resource_completes_as_response() {
4063 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4064 let mut rng = OsRng;
4065
4066 let large_payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
4067 let response_value =
4068 rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(large_payload));
4069 resp_mgr.register_request_handler("/large", None, {
4070 let response_value = response_value.clone();
4071 move |_link_id, _path, _data, _remote| Some(response_value.clone())
4072 });
4073
4074 let req_actions = init_mgr.send_request(&link_id, "/large", b"\xc0", &mut rng);
4075 let req_raw = extract_any_send_packet(&req_actions);
4076 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
4077 let request_id = req_pkt.get_truncated_hash();
4078 let resp_actions = resp_mgr.handle_local_delivery(
4079 req_pkt.destination_hash,
4080 &req_raw,
4081 req_pkt.packet_hash,
4082 rns_core::transport::types::InterfaceId(0),
4083 &mut rng,
4084 );
4085
4086 let mut pending: Vec<(char, LinkManagerAction)> =
4087 resp_actions.into_iter().map(|a| ('r', a)).collect();
4088 let mut rounds = 0;
4089 let mut received_response = None;
4090
4091 while !pending.is_empty() && rounds < 200 {
4092 rounds += 1;
4093 let mut next = Vec::new();
4094
4095 for (source, action) in pending.drain(..) {
4096 let LinkManagerAction::SendPacket { raw, .. } = action else {
4097 continue;
4098 };
4099 let pkt = RawPacket::unpack(&raw).unwrap();
4100 let target_actions = if source == 'r' {
4101 init_mgr.handle_local_delivery(
4102 pkt.destination_hash,
4103 &raw,
4104 pkt.packet_hash,
4105 rns_core::transport::types::InterfaceId(0),
4106 &mut rng,
4107 )
4108 } else {
4109 resp_mgr.handle_local_delivery(
4110 pkt.destination_hash,
4111 &raw,
4112 pkt.packet_hash,
4113 rns_core::transport::types::InterfaceId(0),
4114 &mut rng,
4115 )
4116 };
4117
4118 let target_source = if source == 'r' { 'i' } else { 'r' };
4119 for target_action in &target_actions {
4120 match target_action {
4121 LinkManagerAction::ResponseReceived {
4122 request_id: rid,
4123 data,
4124 ..
4125 } => {
4126 received_response = Some((*rid, data.clone()));
4127 }
4128 LinkManagerAction::ResourceReceived { .. } => {
4129 panic!("response resources must complete as ResponseReceived")
4130 }
4131 LinkManagerAction::ResourceAcceptQuery { .. } => {
4132 panic!("response resources must bypass application acceptance")
4133 }
4134 _ => {}
4135 }
4136 }
4137 next.extend(target_actions.into_iter().map(|a| (target_source, a)));
4138 }
4139
4140 pending = next;
4141 }
4142
4143 let (received_request_id, received_data) = received_response.unwrap_or_else(|| {
4144 panic!(
4145 "large response resource did not complete as ResponseReceived after {} rounds",
4146 rounds
4147 )
4148 });
4149 assert_eq!(received_request_id, request_id);
4150 assert_eq!(received_data, response_value);
4151 }
4152
4153 #[test]
4154 fn test_negotiated_mtu_response_uses_resource_before_global_mtu() {
4155 let (mut init_mgr, mut resp_mgr, link_id) = setup_active_link();
4156 let mut rng = OsRng;
4157
4158 init_mgr.set_link_mtu(&link_id, 300);
4159 resp_mgr.set_link_mtu(&link_id, 300);
4160
4161 let payload = vec![0xAB; 350];
4162 let response_value =
4163 rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
4164 resp_mgr.register_request_handler("/mtu", None, {
4165 let response_value = response_value.clone();
4166 move |_link_id, _path, _data, _remote| Some(response_value.clone())
4167 });
4168
4169 let req_actions = init_mgr.send_request(&link_id, "/mtu", b"\xc0", &mut rng);
4170 let req_raw = extract_any_send_packet(&req_actions);
4171 let req_pkt = RawPacket::unpack(&req_raw).unwrap();
4172 let resp_actions = resp_mgr.handle_local_delivery(
4173 req_pkt.destination_hash,
4174 &req_raw,
4175 req_pkt.packet_hash,
4176 rns_core::transport::types::InterfaceId(0),
4177 &mut rng,
4178 );
4179
4180 let mut has_resource_adv = false;
4181 let mut direct_response_len = None;
4182 for action in &resp_actions {
4183 if let LinkManagerAction::SendPacket { raw, .. } = action {
4184 let pkt = RawPacket::unpack(raw).unwrap();
4185 has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
4186 if pkt.context == constants::CONTEXT_RESPONSE {
4187 direct_response_len = Some(raw.len());
4188 }
4189 }
4190 }
4191
4192 assert!(
4193 has_resource_adv,
4194 "responses larger than the negotiated link MTU should use resource fallback"
4195 );
4196 assert!(
4197 direct_response_len.is_none(),
4198 "sent direct response of {} bytes on a 300 byte negotiated MTU",
4199 direct_response_len.unwrap_or_default()
4200 );
4201 }
4202
4203 #[test]
4204 fn test_large_management_response_uses_resource_fallback() {
4205 let (_init_mgr, mut resp_mgr, link_id) = setup_active_link();
4206 let mut rng = OsRng;
4207
4208 let payload = vec![0xBC; 5000];
4209 let response_value =
4210 rns_core::msgpack::pack(&rns_core::msgpack::Value::Bin(payload));
4211 let actions = resp_mgr.send_management_response(
4212 &link_id,
4213 &[0x55; 16],
4214 &response_value,
4215 &mut rng,
4216 );
4217
4218 let mut has_resource_adv = false;
4219 let mut has_direct_response = false;
4220 for action in &actions {
4221 if let LinkManagerAction::SendPacket { raw, .. } = action {
4222 let pkt = RawPacket::unpack(raw).unwrap();
4223 has_resource_adv |= pkt.context == constants::CONTEXT_RESOURCE_ADV;
4224 has_direct_response |= pkt.context == constants::CONTEXT_RESPONSE;
4225 }
4226 }
4227
4228 assert!(
4229 has_resource_adv,
4230 "large management responses should advertise a response resource"
4231 );
4232 assert!(
4233 !has_direct_response,
4234 "large management responses should not use a direct CONTEXT_RESPONSE packet"
4235 );
4236 }
4237}