1use std::collections::HashMap;
4
5use rns_core::packet::RawPacket;
6use rns_core::transport::tables::PathEntry;
7use rns_core::transport::types::{InterfaceId, TransportAction, TransportConfig};
8use rns_core::transport::TransportEngine;
9use rns_crypto::{OsRng, Rng};
10
11#[cfg(feature = "rns-hooks")]
12use rns_hooks::{create_hook_slots, EngineAccess, HookContext, HookManager, HookPoint, HookSlot};
13
14use crate::event::{
15 BlackholeInfo, Event, EventReceiver, InterfaceStatsResponse, LocalDestinationEntry,
16 NextHopResponse, PathTableEntry, QueryRequest, QueryResponse, RateTableEntry,
17 SingleInterfaceStat,
18};
19use crate::holepunch::orchestrator::{HolePunchManager, HolePunchManagerAction};
20use crate::ifac;
21use crate::interface::{InterfaceEntry, InterfaceStats};
22use crate::link_manager::{LinkManager, LinkManagerAction};
23use crate::time;
24
25#[cfg(feature = "rns-hooks")]
27struct EngineRef<'a> {
28 engine: &'a TransportEngine,
29 interfaces: &'a HashMap<InterfaceId, InterfaceEntry>,
30 link_manager: &'a LinkManager,
31 now: f64,
32}
33
34#[cfg(feature = "rns-hooks")]
35impl<'a> EngineAccess for EngineRef<'a> {
36 fn has_path(&self, dest: &[u8; 16]) -> bool {
37 self.engine.has_path(dest)
38 }
39 fn hops_to(&self, dest: &[u8; 16]) -> Option<u8> {
40 self.engine.hops_to(dest)
41 }
42 fn next_hop(&self, dest: &[u8; 16]) -> Option<[u8; 16]> {
43 self.engine.next_hop(dest)
44 }
45 fn is_blackholed(&self, identity: &[u8; 16]) -> bool {
46 self.engine.is_blackholed(identity, self.now)
47 }
48 fn interface_name(&self, id: u64) -> Option<String> {
49 self.interfaces
50 .get(&InterfaceId(id))
51 .map(|e| e.info.name.clone())
52 }
53 fn interface_mode(&self, id: u64) -> Option<u8> {
54 self.interfaces.get(&InterfaceId(id)).map(|e| e.info.mode)
55 }
56 fn identity_hash(&self) -> Option<[u8; 16]> {
57 self.engine.identity_hash().copied()
58 }
59 fn announce_rate(&self, id: u64) -> Option<i32> {
60 self.interfaces
61 .get(&InterfaceId(id))
62 .map(|e| (e.stats.outgoing_announce_freq() * 1000.0) as i32)
63 }
64 fn link_state(&self, link_hash: &[u8; 16]) -> Option<u8> {
65 use rns_core::link::types::LinkState;
66 self.link_manager.link_state(link_hash).map(|s| match s {
67 LinkState::Pending => 0,
68 LinkState::Handshake => 1,
69 LinkState::Active => 2,
70 LinkState::Stale => 3,
71 LinkState::Closed => 4,
72 })
73 }
74}
75
76#[cfg(any(test, feature = "rns-hooks"))]
81fn extract_dest_hash(raw: &[u8]) -> [u8; 16] {
82 let mut dest = [0u8; 16];
83 if raw.is_empty() {
84 return dest;
85 }
86 let is_header2 = raw[0] & 0x40 != 0;
87 let start = if is_header2 { 18 } else { 2 };
88 let end = start + 16;
89 if raw.len() >= end {
90 dest.copy_from_slice(&raw[start..end]);
91 }
92 dest
93}
94
95#[cfg(feature = "rns-hooks")]
97fn run_hook_inner(
98 programs: &mut [rns_hooks::LoadedProgram],
99 hook_manager: &Option<HookManager>,
100 engine_access: &dyn EngineAccess,
101 ctx: &HookContext,
102 now: f64,
103) -> Option<rns_hooks::ExecuteResult> {
104 if programs.is_empty() {
105 return None;
106 }
107 let mgr = hook_manager.as_ref()?;
108 mgr.run_chain(programs, ctx, engine_access, now)
109}
110
111#[cfg(feature = "rns-hooks")]
113fn convert_injected_actions(actions: Vec<rns_hooks::ActionWire>) -> Vec<TransportAction> {
114 actions
115 .into_iter()
116 .map(|a| {
117 use rns_hooks::ActionWire;
118 match a {
119 ActionWire::SendOnInterface { interface, raw } => {
120 TransportAction::SendOnInterface {
121 interface: InterfaceId(interface),
122 raw,
123 }
124 }
125 ActionWire::BroadcastOnAllInterfaces {
126 raw,
127 exclude,
128 has_exclude,
129 } => TransportAction::BroadcastOnAllInterfaces {
130 raw,
131 exclude: if has_exclude != 0 {
132 Some(InterfaceId(exclude))
133 } else {
134 None
135 },
136 },
137 ActionWire::DeliverLocal {
138 destination_hash,
139 raw,
140 packet_hash,
141 receiving_interface,
142 } => TransportAction::DeliverLocal {
143 destination_hash,
144 raw,
145 packet_hash,
146 receiving_interface: InterfaceId(receiving_interface),
147 },
148 ActionWire::PathUpdated {
149 destination_hash,
150 hops,
151 next_hop,
152 interface,
153 } => TransportAction::PathUpdated {
154 destination_hash,
155 hops,
156 next_hop,
157 interface: InterfaceId(interface),
158 },
159 ActionWire::CacheAnnounce { packet_hash, raw } => {
160 TransportAction::CacheAnnounce { packet_hash, raw }
161 }
162 ActionWire::TunnelEstablished {
163 tunnel_id,
164 interface,
165 } => TransportAction::TunnelEstablished {
166 tunnel_id,
167 interface: InterfaceId(interface),
168 },
169 ActionWire::TunnelSynthesize {
170 interface,
171 data,
172 dest_hash,
173 } => TransportAction::TunnelSynthesize {
174 interface: InterfaceId(interface),
175 data,
176 dest_hash,
177 },
178 ActionWire::ForwardToLocalClients {
179 raw,
180 exclude,
181 has_exclude,
182 } => TransportAction::ForwardToLocalClients {
183 raw,
184 exclude: if has_exclude != 0 {
185 Some(InterfaceId(exclude))
186 } else {
187 None
188 },
189 },
190 ActionWire::ForwardPlainBroadcast {
191 raw,
192 to_local,
193 exclude,
194 has_exclude,
195 } => TransportAction::ForwardPlainBroadcast {
196 raw,
197 to_local: to_local != 0,
198 exclude: if has_exclude != 0 {
199 Some(InterfaceId(exclude))
200 } else {
201 None
202 },
203 },
204 ActionWire::AnnounceReceived {
205 destination_hash,
206 identity_hash,
207 public_key,
208 name_hash,
209 random_hash,
210 app_data,
211 hops,
212 receiving_interface,
213 } => TransportAction::AnnounceReceived {
214 destination_hash,
215 identity_hash,
216 public_key,
217 name_hash,
218 random_hash,
219 app_data,
220 hops,
221 receiving_interface: InterfaceId(receiving_interface),
222 },
223 }
224 })
225 .collect()
226}
227
228fn infer_interface_type(name: &str) -> String {
232 if name.starts_with("TCPServerInterface") {
233 "TCPServerClientInterface".to_string()
234 } else if name.starts_with("BackboneInterface") {
235 "BackboneInterface".to_string()
236 } else if name.starts_with("LocalInterface") {
237 "LocalServerClientInterface".to_string()
238 } else {
239 "AutoInterface".to_string()
242 }
243}
244
245pub use crate::common::callbacks::Callbacks;
246
247pub struct Driver {
249 pub(crate) engine: TransportEngine,
250 pub(crate) interfaces: HashMap<InterfaceId, InterfaceEntry>,
251 pub(crate) rng: OsRng,
252 pub(crate) rx: EventReceiver,
253 pub(crate) callbacks: Box<dyn Callbacks>,
254 pub(crate) started: f64,
255 pub(crate) announce_cache: Option<crate::announce_cache::AnnounceCache>,
256 pub(crate) tunnel_synth_dest: [u8; 16],
258 pub(crate) transport_identity: Option<rns_crypto::identity::Identity>,
260 pub(crate) link_manager: LinkManager,
262 pub(crate) management_config: crate::management::ManagementConfig,
264 pub(crate) last_management_announce: f64,
266 pub(crate) initial_announce_sent: bool,
268 pub(crate) known_destinations: HashMap<[u8; 16], crate::destination::AnnouncedIdentity>,
270 pub(crate) path_request_dest: [u8; 16],
272 pub(crate) proof_strategies: HashMap<
275 [u8; 16],
276 (
277 rns_core::types::ProofStrategy,
278 Option<rns_crypto::identity::Identity>,
279 ),
280 >,
281 pub(crate) sent_packets: HashMap<[u8; 32], ([u8; 16], f64)>,
283 pub(crate) completed_proofs: HashMap<[u8; 32], (f64, f64)>,
285 pub(crate) local_destinations: HashMap<[u8; 16], u8>,
287 pub(crate) holepunch_manager: HolePunchManager,
289 pub(crate) event_tx: crate::event::EventSender,
291 pub(crate) discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage,
293 pub(crate) discovery_required_value: u8,
295 pub(crate) discovery_name_hash: [u8; 10],
297 pub(crate) probe_responder_hash: Option<[u8; 16]>,
299 pub(crate) discover_interfaces: bool,
301 pub(crate) interface_announcer: Option<crate::discovery::InterfaceAnnouncer>,
303 pub(crate) discovery_cleanup_counter: u32,
305 #[cfg(feature = "rns-hooks")]
307 pub(crate) hook_slots: [HookSlot; HookPoint::COUNT],
308 #[cfg(feature = "rns-hooks")]
310 pub(crate) hook_manager: Option<HookManager>,
311}
312
313impl Driver {
314 pub fn new(
316 config: TransportConfig,
317 rx: EventReceiver,
318 tx: crate::event::EventSender,
319 callbacks: Box<dyn Callbacks>,
320 ) -> Self {
321 let tunnel_synth_dest = rns_core::destination::destination_hash(
322 "rnstransport",
323 &["tunnel", "synthesize"],
324 None,
325 );
326 let path_request_dest =
327 rns_core::destination::destination_hash("rnstransport", &["path", "request"], None);
328 let discovery_name_hash = crate::discovery::discovery_name_hash();
329 let mut engine = TransportEngine::new(config);
330 engine.register_destination(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
331 engine.register_destination(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
333 let mut local_destinations = HashMap::new();
336 local_destinations.insert(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
337 local_destinations.insert(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
338 Driver {
339 engine,
340 interfaces: HashMap::new(),
341 rng: OsRng,
342 rx,
343 callbacks,
344 started: time::now(),
345 announce_cache: None,
346 tunnel_synth_dest,
347 transport_identity: None,
348 link_manager: LinkManager::new(),
349 management_config: Default::default(),
350 last_management_announce: 0.0,
351 initial_announce_sent: false,
352 known_destinations: HashMap::new(),
353 path_request_dest,
354 proof_strategies: HashMap::new(),
355 sent_packets: HashMap::new(),
356 completed_proofs: HashMap::new(),
357 local_destinations,
358 holepunch_manager: HolePunchManager::new(
359 vec![],
360 rns_core::holepunch::ProbeProtocol::Rnsp,
361 None,
362 ),
363 event_tx: tx,
364 discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage::new(
365 std::env::temp_dir().join("rns-discovered-interfaces"),
366 ),
367 discovery_required_value: crate::discovery::DEFAULT_STAMP_VALUE,
368 discovery_name_hash,
369 probe_responder_hash: None,
370 discover_interfaces: false,
371 interface_announcer: None,
372 discovery_cleanup_counter: 0,
373 #[cfg(feature = "rns-hooks")]
374 hook_slots: create_hook_slots(),
375 #[cfg(feature = "rns-hooks")]
376 hook_manager: HookManager::new().ok(),
377 }
378 }
379
380 pub fn set_probe_config(
382 &mut self,
383 addrs: Vec<std::net::SocketAddr>,
384 protocol: rns_core::holepunch::ProbeProtocol,
385 device: Option<String>,
386 ) {
387 self.holepunch_manager = HolePunchManager::new(addrs, protocol, device);
388 }
389
390 pub fn run(&mut self) {
392 loop {
393 let event = match self.rx.recv() {
394 Ok(e) => e,
395 Err(_) => break, };
397
398 match event {
399 Event::Frame { interface_id, data } => {
400 if data.len() > 2 && (data[0] & 0x03) == 0x01 {
402 log::debug!(
403 "Announce:frame from iface {} (len={}, flags=0x{:02x})",
404 interface_id.0,
405 data.len(),
406 data[0]
407 );
408 }
409 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
411 entry.stats.rxb += data.len() as u64;
412 entry.stats.rx_packets += 1;
413 }
414
415 let packet = if let Some(entry) = self.interfaces.get(&interface_id) {
417 if let Some(ref ifac_state) = entry.ifac {
418 match ifac::unmask_inbound(&data, ifac_state) {
420 Some(unmasked) => unmasked,
421 None => {
422 log::debug!("[{}] IFAC rejected packet", interface_id.0);
423 continue;
424 }
425 }
426 } else {
427 if data.len() > 2 && data[0] & 0x80 == 0x80 {
429 log::debug!(
430 "[{}] dropping packet with IFAC flag on non-IFAC interface",
431 interface_id.0
432 );
433 continue;
434 }
435 data
436 }
437 } else {
438 data
439 };
440
441 #[cfg(feature = "rns-hooks")]
443 {
444 let pkt_ctx = rns_hooks::PacketContext {
445 flags: if packet.is_empty() { 0 } else { packet[0] },
446 hops: if packet.len() > 1 { packet[1] } else { 0 },
447 destination_hash: extract_dest_hash(&packet),
448 context: 0,
449 packet_hash: [0; 32],
450 interface_id: interface_id.0,
451 data_offset: 0,
452 data_len: packet.len() as u32,
453 };
454 let ctx = HookContext::Packet {
455 ctx: &pkt_ctx,
456 raw: &packet,
457 };
458 let now = time::now();
459 let engine_ref = EngineRef {
460 engine: &self.engine,
461 interfaces: &self.interfaces,
462 link_manager: &self.link_manager,
463 now,
464 };
465 {
466 let exec = run_hook_inner(
467 &mut self.hook_slots[HookPoint::PreIngress as usize].programs,
468 &self.hook_manager,
469 &engine_ref,
470 &ctx,
471 now,
472 );
473 if let Some(ref e) = exec {
474 if !e.injected_actions.is_empty() {
475 let extra =
476 convert_injected_actions(e.injected_actions.clone());
477 self.dispatch_all(extra);
478 }
479 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) {
480 continue;
481 }
482 }
483 }
484 }
485
486 if packet.len() > 2 && (packet[0] & 0x03) == 0x01 {
488 let now = time::now();
489 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
490 entry.stats.record_incoming_announce(now);
491 }
492 }
493
494 if let Some(entry) = self.interfaces.get(&interface_id) {
496 self.engine.update_interface_freq(
497 interface_id,
498 entry.stats.incoming_announce_freq(),
499 );
500 }
501
502 let actions = self.engine.handle_inbound(
503 &packet,
504 interface_id,
505 time::now(),
506 &mut self.rng,
507 );
508
509 #[cfg(feature = "rns-hooks")]
511 {
512 let pkt_ctx2 = rns_hooks::PacketContext {
513 flags: if packet.is_empty() { 0 } else { packet[0] },
514 hops: if packet.len() > 1 { packet[1] } else { 0 },
515 destination_hash: extract_dest_hash(&packet),
516 context: 0,
517 packet_hash: [0; 32],
518 interface_id: interface_id.0,
519 data_offset: 0,
520 data_len: packet.len() as u32,
521 };
522 let ctx = HookContext::Packet {
523 ctx: &pkt_ctx2,
524 raw: &packet,
525 };
526 let now = time::now();
527 let engine_ref = EngineRef {
528 engine: &self.engine,
529 interfaces: &self.interfaces,
530 link_manager: &self.link_manager,
531 now,
532 };
533 if let Some(ref e) = run_hook_inner(
534 &mut self.hook_slots[HookPoint::PreDispatch as usize].programs,
535 &self.hook_manager,
536 &engine_ref,
537 &ctx,
538 now,
539 ) {
540 if !e.injected_actions.is_empty() {
541 self.dispatch_all(convert_injected_actions(
542 e.injected_actions.clone(),
543 ));
544 }
545 }
546 }
547
548 self.dispatch_all(actions);
549 }
550 Event::Tick => {
551 #[cfg(feature = "rns-hooks")]
553 {
554 let ctx = HookContext::Tick;
555 let now = time::now();
556 let engine_ref = EngineRef {
557 engine: &self.engine,
558 interfaces: &self.interfaces,
559 link_manager: &self.link_manager,
560 now,
561 };
562 if let Some(ref e) = run_hook_inner(
563 &mut self.hook_slots[HookPoint::Tick as usize].programs,
564 &self.hook_manager,
565 &engine_ref,
566 &ctx,
567 now,
568 ) {
569 if !e.injected_actions.is_empty() {
570 self.dispatch_all(convert_injected_actions(
571 e.injected_actions.clone(),
572 ));
573 }
574 }
575 }
576
577 let now = time::now();
578 for (id, entry) in &self.interfaces {
580 self.engine
581 .update_interface_freq(*id, entry.stats.incoming_announce_freq());
582 }
583 let actions = self.engine.tick(now, &mut self.rng);
584 self.dispatch_all(actions);
585 let link_actions = self.link_manager.tick(&mut self.rng);
587 self.dispatch_link_actions(link_actions);
588 {
590 let tx = self.get_event_sender();
591 let hp_actions = self.holepunch_manager.tick(&tx);
592 self.dispatch_holepunch_actions(hp_actions);
593 }
594 self.tick_management_announces(now);
596 self.sent_packets
598 .retain(|_, (_, sent_time)| now - *sent_time < 60.0);
599 self.completed_proofs
601 .retain(|_, (_, received)| now - *received < 120.0);
602
603 self.tick_discovery_announcer(now);
604
605 if self.discover_interfaces {
607 self.discovery_cleanup_counter += 1;
608 if self.discovery_cleanup_counter >= 3600 {
609 self.discovery_cleanup_counter = 0;
610 if let Ok(removed) = self.discovered_interfaces.cleanup() {
611 if removed > 0 {
612 log::info!(
613 "Discovery cleanup: removed {} stale entries",
614 removed
615 );
616 }
617 }
618 }
619 }
620 }
621 Event::InterfaceUp(id, new_writer, info) => {
622 let wants_tunnel;
623 if let Some(mut info) = info {
624 log::info!("[{}] dynamic interface registered", id.0);
626 wants_tunnel = info.wants_tunnel;
627 let iface_type = infer_interface_type(&info.name);
628 info.started = time::now();
630 self.engine.register_interface(info.clone());
631 if let Some(writer) = new_writer {
632 self.interfaces.insert(
633 id,
634 InterfaceEntry {
635 id,
636 info,
637 writer,
638 online: true,
639 dynamic: true,
640 ifac: None,
641 stats: InterfaceStats {
642 started: time::now(),
643 ..Default::default()
644 },
645 interface_type: iface_type,
646 },
647 );
648 }
649 self.callbacks.on_interface_up(id);
650 #[cfg(feature = "rns-hooks")]
651 {
652 let ctx = HookContext::Interface { interface_id: id.0 };
653 let now = time::now();
654 let engine_ref = EngineRef {
655 engine: &self.engine,
656 interfaces: &self.interfaces,
657 link_manager: &self.link_manager,
658 now,
659 };
660 if let Some(ref e) = run_hook_inner(
661 &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
662 &self.hook_manager,
663 &engine_ref,
664 &ctx,
665 now,
666 ) {
667 if !e.injected_actions.is_empty() {
668 self.dispatch_all(convert_injected_actions(
669 e.injected_actions.clone(),
670 ));
671 }
672 }
673 }
674 } else if let Some(entry) = self.interfaces.get_mut(&id) {
675 log::info!("[{}] interface online", id.0);
677 wants_tunnel = entry.info.wants_tunnel;
678 entry.online = true;
679 if let Some(writer) = new_writer {
680 log::info!("[{}] writer refreshed after reconnect", id.0);
681 entry.writer = writer;
682 }
683 self.callbacks.on_interface_up(id);
684 #[cfg(feature = "rns-hooks")]
685 {
686 let ctx = HookContext::Interface { interface_id: id.0 };
687 let now = time::now();
688 let engine_ref = EngineRef {
689 engine: &self.engine,
690 interfaces: &self.interfaces,
691 link_manager: &self.link_manager,
692 now,
693 };
694 if let Some(ref e) = run_hook_inner(
695 &mut self.hook_slots[HookPoint::InterfaceUp as usize].programs,
696 &self.hook_manager,
697 &engine_ref,
698 &ctx,
699 now,
700 ) {
701 if !e.injected_actions.is_empty() {
702 self.dispatch_all(convert_injected_actions(
703 e.injected_actions.clone(),
704 ));
705 }
706 }
707 }
708 } else {
709 wants_tunnel = false;
710 }
711
712 if wants_tunnel {
714 self.synthesize_tunnel_for_interface(id);
715 }
716 }
717 Event::InterfaceDown(id) => {
718 if let Some(entry) = self.interfaces.get(&id) {
720 if let Some(tunnel_id) = entry.info.tunnel_id {
721 self.engine.void_tunnel_interface(&tunnel_id);
722 }
723 }
724
725 if let Some(entry) = self.interfaces.get(&id) {
726 if entry.dynamic {
727 log::info!("[{}] dynamic interface removed", id.0);
729 self.engine.deregister_interface(id);
730 self.interfaces.remove(&id);
731 } else {
732 log::info!("[{}] interface offline", id.0);
734 self.interfaces.get_mut(&id).unwrap().online = false;
735 }
736 self.callbacks.on_interface_down(id);
737 #[cfg(feature = "rns-hooks")]
738 {
739 let ctx = HookContext::Interface { interface_id: id.0 };
740 let now = time::now();
741 let engine_ref = EngineRef {
742 engine: &self.engine,
743 interfaces: &self.interfaces,
744 link_manager: &self.link_manager,
745 now,
746 };
747 if let Some(ref e) = run_hook_inner(
748 &mut self.hook_slots[HookPoint::InterfaceDown as usize].programs,
749 &self.hook_manager,
750 &engine_ref,
751 &ctx,
752 now,
753 ) {
754 if !e.injected_actions.is_empty() {
755 self.dispatch_all(convert_injected_actions(
756 e.injected_actions.clone(),
757 ));
758 }
759 }
760 }
761 }
762 }
763 Event::SendOutbound {
764 raw,
765 dest_type,
766 attached_interface,
767 } => {
768 match RawPacket::unpack(&raw) {
769 Ok(packet) => {
770 let is_announce = packet.flags.packet_type
771 == rns_core::constants::PACKET_TYPE_ANNOUNCE;
772 if is_announce {
773 log::debug!("SendOutbound: ANNOUNCE for {:02x?} (len={}, dest_type={}, attached={:?})",
774 &packet.destination_hash[..4], raw.len(), dest_type, attached_interface);
775 }
776 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
778 self.sent_packets.insert(
779 packet.packet_hash,
780 (packet.destination_hash, time::now()),
781 );
782 }
783 let actions = self.engine.handle_outbound(
784 &packet,
785 dest_type,
786 attached_interface,
787 time::now(),
788 );
789 if is_announce {
790 log::debug!(
791 "SendOutbound: announce routed to {} actions: {:?}",
792 actions.len(),
793 actions
794 .iter()
795 .map(|a| match a {
796 TransportAction::SendOnInterface {
797 interface, ..
798 } => format!("SendOn({})", interface.0),
799 TransportAction::BroadcastOnAllInterfaces {
800 ..
801 } => "BroadcastAll".to_string(),
802 _ => "other".to_string(),
803 })
804 .collect::<Vec<_>>()
805 );
806 }
807 self.dispatch_all(actions);
808 }
809 Err(e) => {
810 log::warn!("SendOutbound: failed to unpack packet: {:?}", e);
811 }
812 }
813 }
814 Event::RegisterDestination {
815 dest_hash,
816 dest_type,
817 } => {
818 self.engine.register_destination(dest_hash, dest_type);
819 self.local_destinations.insert(dest_hash, dest_type);
820 }
821 Event::DeregisterDestination { dest_hash } => {
822 self.engine.deregister_destination(&dest_hash);
823 self.local_destinations.remove(&dest_hash);
824 }
825 Event::Query(request, response_tx) => {
826 let response = self.handle_query_mut(request);
827 let _ = response_tx.send(response);
828 }
829 Event::DeregisterLinkDestination { dest_hash } => {
830 self.link_manager.deregister_link_destination(&dest_hash);
831 }
832 Event::RegisterLinkDestination {
833 dest_hash,
834 sig_prv_bytes,
835 sig_pub_bytes,
836 resource_strategy,
837 } => {
838 let sig_prv =
839 rns_crypto::ed25519::Ed25519PrivateKey::from_bytes(&sig_prv_bytes);
840 let strat = match resource_strategy {
841 1 => crate::link_manager::ResourceStrategy::AcceptAll,
842 2 => crate::link_manager::ResourceStrategy::AcceptApp,
843 _ => crate::link_manager::ResourceStrategy::AcceptNone,
844 };
845 self.link_manager.register_link_destination(
846 dest_hash,
847 sig_prv,
848 sig_pub_bytes,
849 strat,
850 );
851 self.engine
853 .register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE);
854 self.local_destinations
855 .insert(dest_hash, rns_core::constants::DESTINATION_SINGLE);
856 }
857 Event::RegisterRequestHandler {
858 path,
859 allowed_list,
860 handler,
861 } => {
862 self.link_manager.register_request_handler(
863 &path,
864 allowed_list,
865 move |link_id, p, data, remote| handler(link_id, p, data, remote),
866 );
867 }
868 Event::CreateLink {
869 dest_hash,
870 dest_sig_pub_bytes,
871 response_tx,
872 } => {
873 let hops = self.engine.hops_to(&dest_hash).unwrap_or(0);
874 let mtu = self
875 .engine
876 .next_hop_interface(&dest_hash)
877 .and_then(|iface_id| self.interfaces.get(&iface_id))
878 .map(|entry| entry.info.mtu)
879 .unwrap_or(rns_core::constants::MTU as u32);
880 let (link_id, link_actions) = self.link_manager.create_link(
881 &dest_hash,
882 &dest_sig_pub_bytes,
883 hops,
884 mtu,
885 &mut self.rng,
886 );
887 let _ = response_tx.send(link_id);
888 self.dispatch_link_actions(link_actions);
889 }
890 Event::SendRequest {
891 link_id,
892 path,
893 data,
894 } => {
895 let link_actions =
896 self.link_manager
897 .send_request(&link_id, &path, &data, &mut self.rng);
898 self.dispatch_link_actions(link_actions);
899 }
900 Event::IdentifyOnLink {
901 link_id,
902 identity_prv_key,
903 } => {
904 let identity =
905 rns_crypto::identity::Identity::from_private_key(&identity_prv_key);
906 let link_actions =
907 self.link_manager
908 .identify(&link_id, &identity, &mut self.rng);
909 self.dispatch_link_actions(link_actions);
910 }
911 Event::TeardownLink { link_id } => {
912 let link_actions = self.link_manager.teardown_link(&link_id);
913 self.dispatch_link_actions(link_actions);
914 }
915 Event::SendResource {
916 link_id,
917 data,
918 metadata,
919 } => {
920 let link_actions = self.link_manager.send_resource(
921 &link_id,
922 &data,
923 metadata.as_deref(),
924 &mut self.rng,
925 );
926 self.dispatch_link_actions(link_actions);
927 }
928 Event::SetResourceStrategy { link_id, strategy } => {
929 use crate::link_manager::ResourceStrategy;
930 let strat = match strategy {
931 0 => ResourceStrategy::AcceptNone,
932 1 => ResourceStrategy::AcceptAll,
933 2 => ResourceStrategy::AcceptApp,
934 _ => ResourceStrategy::AcceptNone,
935 };
936 self.link_manager.set_resource_strategy(&link_id, strat);
937 }
938 Event::AcceptResource {
939 link_id,
940 resource_hash,
941 accept,
942 } => {
943 let link_actions = self.link_manager.accept_resource(
944 &link_id,
945 &resource_hash,
946 accept,
947 &mut self.rng,
948 );
949 self.dispatch_link_actions(link_actions);
950 }
951 Event::SendChannelMessage {
952 link_id,
953 msgtype,
954 payload,
955 } => {
956 let link_actions = self.link_manager.send_channel_message(
957 &link_id,
958 msgtype,
959 &payload,
960 &mut self.rng,
961 );
962 self.dispatch_link_actions(link_actions);
963 }
964 Event::SendOnLink {
965 link_id,
966 data,
967 context,
968 } => {
969 let link_actions =
970 self.link_manager
971 .send_on_link(&link_id, &data, context, &mut self.rng);
972 self.dispatch_link_actions(link_actions);
973 }
974 Event::RequestPath { dest_hash } => {
975 self.handle_request_path(dest_hash);
976 }
977 Event::RegisterProofStrategy {
978 dest_hash,
979 strategy,
980 signing_key,
981 } => {
982 let identity = signing_key
983 .map(|key| rns_crypto::identity::Identity::from_private_key(&key));
984 self.proof_strategies
985 .insert(dest_hash, (strategy, identity));
986 }
987 Event::ProposeDirectConnect { link_id } => {
988 let derived_key = self.link_manager.get_derived_key(&link_id);
989 if let Some(dk) = derived_key {
990 let tx = self.get_event_sender();
991 let hp_actions =
992 self.holepunch_manager
993 .propose(link_id, &dk, &mut self.rng, &tx);
994 self.dispatch_holepunch_actions(hp_actions);
995 } else {
996 log::warn!(
997 "Cannot propose direct connect: no derived key for link {:02x?}",
998 &link_id[..4]
999 );
1000 }
1001 }
1002 Event::SetDirectConnectPolicy { policy } => {
1003 self.holepunch_manager.set_policy(policy);
1004 }
1005 Event::HolePunchProbeResult {
1006 link_id,
1007 session_id,
1008 observed_addr,
1009 socket,
1010 probe_server,
1011 } => {
1012 let hp_actions = self.holepunch_manager.handle_probe_result(
1013 link_id,
1014 session_id,
1015 observed_addr,
1016 socket,
1017 probe_server,
1018 );
1019 self.dispatch_holepunch_actions(hp_actions);
1020 }
1021 Event::HolePunchProbeFailed {
1022 link_id,
1023 session_id,
1024 } => {
1025 let hp_actions = self
1026 .holepunch_manager
1027 .handle_probe_failed(link_id, session_id);
1028 self.dispatch_holepunch_actions(hp_actions);
1029 }
1030 Event::LoadHook {
1031 name,
1032 wasm_bytes,
1033 attach_point,
1034 priority,
1035 response_tx,
1036 } => {
1037 #[cfg(feature = "rns-hooks")]
1038 {
1039 let result = (|| -> Result<(), String> {
1040 let point_idx = crate::config::parse_hook_point(&attach_point)
1041 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1042 let mgr = self
1043 .hook_manager
1044 .as_ref()
1045 .ok_or_else(|| "hook manager not available".to_string())?;
1046 let program = mgr
1047 .compile(name.clone(), &wasm_bytes, priority)
1048 .map_err(|e| format!("compile error: {}", e))?;
1049 self.hook_slots[point_idx].attach(program);
1050 log::info!(
1051 "Loaded hook '{}' at point {} (priority {})",
1052 name,
1053 attach_point,
1054 priority
1055 );
1056 Ok(())
1057 })();
1058 let _ = response_tx.send(result);
1059 }
1060 #[cfg(not(feature = "rns-hooks"))]
1061 {
1062 let _ = (name, wasm_bytes, attach_point, priority);
1063 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1064 }
1065 }
1066 Event::UnloadHook {
1067 name,
1068 attach_point,
1069 response_tx,
1070 } => {
1071 #[cfg(feature = "rns-hooks")]
1072 {
1073 let result = (|| -> Result<(), String> {
1074 let point_idx = crate::config::parse_hook_point(&attach_point)
1075 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1076 match self.hook_slots[point_idx].detach(&name) {
1077 Some(_) => {
1078 log::info!(
1079 "Unloaded hook '{}' from point {}",
1080 name,
1081 attach_point
1082 );
1083 Ok(())
1084 }
1085 None => Err(format!(
1086 "hook '{}' not found at point '{}'",
1087 name, attach_point
1088 )),
1089 }
1090 })();
1091 let _ = response_tx.send(result);
1092 }
1093 #[cfg(not(feature = "rns-hooks"))]
1094 {
1095 let _ = (name, attach_point);
1096 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1097 }
1098 }
1099 Event::ReloadHook {
1100 name,
1101 attach_point,
1102 wasm_bytes,
1103 response_tx,
1104 } => {
1105 #[cfg(feature = "rns-hooks")]
1106 {
1107 let result = (|| -> Result<(), String> {
1108 let point_idx = crate::config::parse_hook_point(&attach_point)
1109 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1110 let old =
1111 self.hook_slots[point_idx].detach(&name).ok_or_else(|| {
1112 format!("hook '{}' not found at point '{}'", name, attach_point)
1113 })?;
1114 let priority = old.priority;
1115 let mgr = match self.hook_manager.as_ref() {
1116 Some(m) => m,
1117 None => {
1118 self.hook_slots[point_idx].attach(old);
1119 return Err("hook manager not available".to_string());
1120 }
1121 };
1122 match mgr.compile(name.clone(), &wasm_bytes, priority) {
1123 Ok(program) => {
1124 self.hook_slots[point_idx].attach(program);
1125 log::info!(
1126 "Reloaded hook '{}' at point {} (priority {})",
1127 name,
1128 attach_point,
1129 priority
1130 );
1131 Ok(())
1132 }
1133 Err(e) => {
1134 self.hook_slots[point_idx].attach(old);
1135 Err(format!("compile error: {}", e))
1136 }
1137 }
1138 })();
1139 let _ = response_tx.send(result);
1140 }
1141 #[cfg(not(feature = "rns-hooks"))]
1142 {
1143 let _ = (name, attach_point, wasm_bytes);
1144 let _ = response_tx.send(Err("hooks not enabled".to_string()));
1145 }
1146 }
1147 Event::ListHooks { response_tx } => {
1148 #[cfg(feature = "rns-hooks")]
1149 {
1150 let hook_point_names = [
1151 "PreIngress",
1152 "PreDispatch",
1153 "AnnounceReceived",
1154 "PathUpdated",
1155 "AnnounceRetransmit",
1156 "LinkRequestReceived",
1157 "LinkEstablished",
1158 "LinkClosed",
1159 "InterfaceUp",
1160 "InterfaceDown",
1161 "InterfaceConfigChanged",
1162 "SendOnInterface",
1163 "BroadcastOnAllInterfaces",
1164 "DeliverLocal",
1165 "TunnelSynthesize",
1166 "Tick",
1167 ];
1168 let mut infos = Vec::new();
1169 for (idx, slot) in self.hook_slots.iter().enumerate() {
1170 let point_name = hook_point_names.get(idx).unwrap_or(&"Unknown");
1171 for prog in &slot.programs {
1172 infos.push(crate::event::HookInfo {
1173 name: prog.name.clone(),
1174 attach_point: point_name.to_string(),
1175 priority: prog.priority,
1176 enabled: prog.enabled,
1177 consecutive_traps: prog.consecutive_traps,
1178 });
1179 }
1180 }
1181 let _ = response_tx.send(infos);
1182 }
1183 #[cfg(not(feature = "rns-hooks"))]
1184 {
1185 let _ = response_tx.send(Vec::new());
1186 }
1187 }
1188 Event::InterfaceConfigChanged(id) => {
1189 #[cfg(feature = "rns-hooks")]
1190 {
1191 let ctx = HookContext::Interface { interface_id: id.0 };
1192 let now = time::now();
1193 let engine_ref = EngineRef {
1194 engine: &self.engine,
1195 interfaces: &self.interfaces,
1196 link_manager: &self.link_manager,
1197 now,
1198 };
1199 if let Some(ref e) = run_hook_inner(
1200 &mut self.hook_slots[HookPoint::InterfaceConfigChanged as usize]
1201 .programs,
1202 &self.hook_manager,
1203 &engine_ref,
1204 &ctx,
1205 now,
1206 ) {
1207 if !e.injected_actions.is_empty() {
1208 self.dispatch_all(convert_injected_actions(
1209 e.injected_actions.clone(),
1210 ));
1211 }
1212 }
1213 }
1214 #[cfg(not(feature = "rns-hooks"))]
1215 let _ = id;
1216 }
1217 Event::Shutdown => break,
1218 }
1219 }
1220 }
1221
1222 fn handle_query(&self, request: QueryRequest) -> QueryResponse {
1224 match request {
1225 QueryRequest::InterfaceStats => {
1226 let mut interfaces = Vec::new();
1227 let mut total_rxb: u64 = 0;
1228 let mut total_txb: u64 = 0;
1229 for entry in self.interfaces.values() {
1230 total_rxb += entry.stats.rxb;
1231 total_txb += entry.stats.txb;
1232 interfaces.push(SingleInterfaceStat {
1233 name: entry.info.name.clone(),
1234 status: entry.online,
1235 mode: entry.info.mode,
1236 rxb: entry.stats.rxb,
1237 txb: entry.stats.txb,
1238 rx_packets: entry.stats.rx_packets,
1239 tx_packets: entry.stats.tx_packets,
1240 bitrate: entry.info.bitrate,
1241 ifac_size: entry.ifac.as_ref().map(|s| s.size),
1242 started: entry.stats.started,
1243 ia_freq: entry.stats.incoming_announce_freq(),
1244 oa_freq: entry.stats.outgoing_announce_freq(),
1245 interface_type: entry.interface_type.clone(),
1246 });
1247 }
1248 interfaces.sort_by(|a, b| a.name.cmp(&b.name));
1250 QueryResponse::InterfaceStats(InterfaceStatsResponse {
1251 interfaces,
1252 transport_id: self.engine.identity_hash().copied(),
1253 transport_enabled: self.engine.transport_enabled(),
1254 transport_uptime: time::now() - self.started,
1255 total_rxb,
1256 total_txb,
1257 probe_responder: self.probe_responder_hash,
1258 })
1259 }
1260 QueryRequest::PathTable { max_hops } => {
1261 let entries: Vec<PathTableEntry> = self
1262 .engine
1263 .path_table_entries()
1264 .filter(|(_, entry)| max_hops.map_or(true, |max| entry.hops <= max))
1265 .map(|(hash, entry)| {
1266 let iface_name = self
1267 .interfaces
1268 .get(&entry.receiving_interface)
1269 .map(|e| e.info.name.clone())
1270 .or_else(|| {
1271 self.engine
1272 .interface_info(&entry.receiving_interface)
1273 .map(|i| i.name.clone())
1274 })
1275 .unwrap_or_default();
1276 PathTableEntry {
1277 hash: *hash,
1278 timestamp: entry.timestamp,
1279 via: entry.next_hop,
1280 hops: entry.hops,
1281 expires: entry.expires,
1282 interface: entry.receiving_interface,
1283 interface_name: iface_name,
1284 }
1285 })
1286 .collect();
1287 QueryResponse::PathTable(entries)
1288 }
1289 QueryRequest::RateTable => {
1290 let entries: Vec<RateTableEntry> = self
1291 .engine
1292 .rate_limiter()
1293 .entries()
1294 .map(|(hash, entry)| RateTableEntry {
1295 hash: *hash,
1296 last: entry.last,
1297 rate_violations: entry.rate_violations,
1298 blocked_until: entry.blocked_until,
1299 timestamps: entry.timestamps.clone(),
1300 })
1301 .collect();
1302 QueryResponse::RateTable(entries)
1303 }
1304 QueryRequest::NextHop { dest_hash } => {
1305 let resp = self
1306 .engine
1307 .next_hop(&dest_hash)
1308 .map(|next_hop| NextHopResponse {
1309 next_hop,
1310 hops: self.engine.hops_to(&dest_hash).unwrap_or(0),
1311 interface: self
1312 .engine
1313 .next_hop_interface(&dest_hash)
1314 .unwrap_or(InterfaceId(0)),
1315 });
1316 QueryResponse::NextHop(resp)
1317 }
1318 QueryRequest::NextHopIfName { dest_hash } => {
1319 let name = self
1320 .engine
1321 .next_hop_interface(&dest_hash)
1322 .and_then(|id| self.interfaces.get(&id))
1323 .map(|entry| entry.info.name.clone());
1324 QueryResponse::NextHopIfName(name)
1325 }
1326 QueryRequest::LinkCount => QueryResponse::LinkCount(
1327 self.engine.link_table_count() + self.link_manager.link_count(),
1328 ),
1329 QueryRequest::DropPath { .. } => {
1330 QueryResponse::DropPath(false)
1332 }
1333 QueryRequest::DropAllVia { .. } => QueryResponse::DropAllVia(0),
1334 QueryRequest::DropAnnounceQueues => QueryResponse::DropAnnounceQueues,
1335 QueryRequest::TransportIdentity => {
1336 QueryResponse::TransportIdentity(self.engine.identity_hash().copied())
1337 }
1338 QueryRequest::GetBlackholed => {
1339 let now = time::now();
1340 let entries: Vec<BlackholeInfo> = self
1341 .engine
1342 .blackholed_entries()
1343 .filter(|(_, e)| e.expires == 0.0 || e.expires > now)
1344 .map(|(hash, entry)| BlackholeInfo {
1345 identity_hash: *hash,
1346 created: entry.created,
1347 expires: entry.expires,
1348 reason: entry.reason.clone(),
1349 })
1350 .collect();
1351 QueryResponse::Blackholed(entries)
1352 }
1353 QueryRequest::BlackholeIdentity { .. } | QueryRequest::UnblackholeIdentity { .. } => {
1354 QueryResponse::BlackholeResult(false)
1356 }
1357 QueryRequest::InjectPath { .. } => {
1358 QueryResponse::InjectPath(false)
1360 }
1361 QueryRequest::InjectIdentity { .. } => {
1362 QueryResponse::InjectIdentity(false)
1364 }
1365 QueryRequest::HasPath { dest_hash } => {
1366 QueryResponse::HasPath(self.engine.has_path(&dest_hash))
1367 }
1368 QueryRequest::HopsTo { dest_hash } => {
1369 QueryResponse::HopsTo(self.engine.hops_to(&dest_hash))
1370 }
1371 QueryRequest::RecallIdentity { dest_hash } => {
1372 QueryResponse::RecallIdentity(self.known_destinations.get(&dest_hash).cloned())
1373 }
1374 QueryRequest::LocalDestinations => {
1375 let entries: Vec<LocalDestinationEntry> = self
1376 .local_destinations
1377 .iter()
1378 .map(|(hash, dest_type)| LocalDestinationEntry {
1379 hash: *hash,
1380 dest_type: *dest_type,
1381 })
1382 .collect();
1383 QueryResponse::LocalDestinations(entries)
1384 }
1385 QueryRequest::Links => QueryResponse::Links(self.link_manager.link_entries()),
1386 QueryRequest::Resources => {
1387 QueryResponse::Resources(self.link_manager.resource_entries())
1388 }
1389 QueryRequest::DiscoveredInterfaces {
1390 only_available,
1391 only_transport,
1392 } => {
1393 let mut interfaces = self.discovered_interfaces.list().unwrap_or_default();
1394 crate::discovery::filter_and_sort_interfaces(
1395 &mut interfaces,
1396 only_available,
1397 only_transport,
1398 );
1399 QueryResponse::DiscoveredInterfaces(interfaces)
1400 }
1401 QueryRequest::SendProbe { .. } => QueryResponse::SendProbe(None),
1403 QueryRequest::CheckProof { .. } => QueryResponse::CheckProof(None),
1404 }
1405 }
1406
1407 fn handle_query_mut(&mut self, request: QueryRequest) -> QueryResponse {
1409 match request {
1410 QueryRequest::BlackholeIdentity {
1411 identity_hash,
1412 duration_hours,
1413 reason,
1414 } => {
1415 let now = time::now();
1416 self.engine
1417 .blackhole_identity(identity_hash, now, duration_hours, reason);
1418 QueryResponse::BlackholeResult(true)
1419 }
1420 QueryRequest::UnblackholeIdentity { identity_hash } => {
1421 let result = self.engine.unblackhole_identity(&identity_hash);
1422 QueryResponse::UnblackholeResult(result)
1423 }
1424 QueryRequest::DropPath { dest_hash } => {
1425 QueryResponse::DropPath(self.engine.drop_path(&dest_hash))
1426 }
1427 QueryRequest::DropAllVia { transport_hash } => {
1428 QueryResponse::DropAllVia(self.engine.drop_all_via(&transport_hash))
1429 }
1430 QueryRequest::DropAnnounceQueues => {
1431 self.engine.drop_announce_queues();
1432 QueryResponse::DropAnnounceQueues
1433 }
1434 QueryRequest::InjectPath {
1435 dest_hash,
1436 next_hop,
1437 hops,
1438 expires,
1439 interface_name,
1440 packet_hash,
1441 } => {
1442 let iface_id = self
1444 .interfaces
1445 .iter()
1446 .find(|(_, entry)| entry.info.name == interface_name)
1447 .map(|(id, _)| *id);
1448 match iface_id {
1449 Some(id) => {
1450 let entry = PathEntry {
1451 timestamp: time::now(),
1452 next_hop,
1453 hops,
1454 expires,
1455 random_blobs: Vec::new(),
1456 receiving_interface: id,
1457 packet_hash,
1458 announce_raw: None,
1459 };
1460 self.engine.inject_path(dest_hash, entry);
1461 QueryResponse::InjectPath(true)
1462 }
1463 None => QueryResponse::InjectPath(false),
1464 }
1465 }
1466 QueryRequest::InjectIdentity {
1467 dest_hash,
1468 identity_hash,
1469 public_key,
1470 app_data,
1471 hops,
1472 received_at,
1473 } => {
1474 self.known_destinations.insert(
1475 dest_hash,
1476 crate::destination::AnnouncedIdentity {
1477 dest_hash: rns_core::types::DestHash(dest_hash),
1478 identity_hash: rns_core::types::IdentityHash(identity_hash),
1479 public_key,
1480 app_data,
1481 hops,
1482 received_at,
1483 receiving_interface: rns_core::transport::types::InterfaceId(0),
1484 },
1485 );
1486 QueryResponse::InjectIdentity(true)
1487 }
1488 QueryRequest::SendProbe {
1489 dest_hash,
1490 payload_size,
1491 } => {
1492 let announced = self.known_destinations.get(&dest_hash).cloned();
1494 match announced {
1495 Some(recalled) => {
1496 let remote_id =
1498 rns_crypto::identity::Identity::from_public_key(&recalled.public_key);
1499 let mut payload = vec![0u8; payload_size];
1500 self.rng.fill_bytes(&mut payload);
1501 match remote_id.encrypt(&payload, &mut self.rng) {
1502 Ok(ciphertext) => {
1503 let flags = rns_core::packet::PacketFlags {
1505 header_type: rns_core::constants::HEADER_1,
1506 context_flag: rns_core::constants::FLAG_UNSET,
1507 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1508 destination_type: rns_core::constants::DESTINATION_SINGLE,
1509 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1510 };
1511 match RawPacket::pack(
1512 flags,
1513 0,
1514 &dest_hash,
1515 None,
1516 rns_core::constants::CONTEXT_NONE,
1517 &ciphertext,
1518 ) {
1519 Ok(packet) => {
1520 let packet_hash = packet.packet_hash;
1521 let hops = self.engine.hops_to(&dest_hash).unwrap_or(0);
1522 self.sent_packets
1524 .insert(packet_hash, (dest_hash, time::now()));
1525 let actions = self.engine.handle_outbound(
1527 &packet,
1528 rns_core::constants::DESTINATION_SINGLE,
1529 None,
1530 time::now(),
1531 );
1532 self.dispatch_all(actions);
1533 log::debug!(
1534 "Sent probe ({} bytes) to {:02x?}",
1535 payload_size,
1536 &dest_hash[..4],
1537 );
1538 QueryResponse::SendProbe(Some((packet_hash, hops)))
1539 }
1540 Err(_) => {
1541 log::warn!("Failed to pack probe packet");
1542 QueryResponse::SendProbe(None)
1543 }
1544 }
1545 }
1546 Err(_) => {
1547 log::warn!("Failed to encrypt probe payload");
1548 QueryResponse::SendProbe(None)
1549 }
1550 }
1551 }
1552 None => {
1553 log::debug!("No known identity for probe dest {:02x?}", &dest_hash[..4]);
1554 QueryResponse::SendProbe(None)
1555 }
1556 }
1557 }
1558 QueryRequest::CheckProof { packet_hash } => {
1559 match self.completed_proofs.remove(&packet_hash) {
1560 Some((rtt, _received)) => QueryResponse::CheckProof(Some(rtt)),
1561 None => QueryResponse::CheckProof(None),
1562 }
1563 }
1564 other => self.handle_query(other),
1565 }
1566 }
1567
1568 fn handle_tunnel_synth_delivery(&mut self, raw: &[u8]) {
1570 let packet = match RawPacket::unpack(raw) {
1572 Ok(p) => p,
1573 Err(_) => return,
1574 };
1575
1576 match rns_core::transport::tunnel::validate_tunnel_synthesize_data(&packet.data) {
1577 Ok(validated) => {
1578 let iface_id = self
1581 .interfaces
1582 .iter()
1583 .find(|(_, entry)| entry.info.wants_tunnel && entry.online)
1584 .map(|(id, _)| *id);
1585
1586 if let Some(iface) = iface_id {
1587 let now = time::now();
1588 let tunnel_actions = self.engine.handle_tunnel(validated.tunnel_id, iface, now);
1589 self.dispatch_all(tunnel_actions);
1590 }
1591 }
1592 Err(e) => {
1593 log::debug!("Tunnel synthesis validation failed: {}", e);
1594 }
1595 }
1596 }
1597
1598 fn synthesize_tunnel_for_interface(&mut self, interface: InterfaceId) {
1602 if let Some(ref identity) = self.transport_identity {
1603 let actions = self
1604 .engine
1605 .synthesize_tunnel(identity, interface, &mut self.rng);
1606 self.dispatch_all(actions);
1607 }
1608 }
1609
1610 fn handle_request_path(&mut self, dest_hash: [u8; 16]) {
1612 let mut data = Vec::with_capacity(48);
1614 data.extend_from_slice(&dest_hash);
1615
1616 if self.engine.transport_enabled() {
1617 if let Some(id_hash) = self.engine.identity_hash() {
1618 data.extend_from_slice(id_hash);
1619 }
1620 }
1621
1622 let mut tag = [0u8; 16];
1624 self.rng.fill_bytes(&mut tag);
1625 data.extend_from_slice(&tag);
1626
1627 let flags = rns_core::packet::PacketFlags {
1629 header_type: rns_core::constants::HEADER_1,
1630 context_flag: rns_core::constants::FLAG_UNSET,
1631 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1632 destination_type: rns_core::constants::DESTINATION_PLAIN,
1633 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1634 };
1635
1636 if let Ok(packet) = RawPacket::pack(
1637 flags,
1638 0,
1639 &self.path_request_dest,
1640 None,
1641 rns_core::constants::CONTEXT_NONE,
1642 &data,
1643 ) {
1644 let actions = self.engine.handle_outbound(
1645 &packet,
1646 rns_core::constants::DESTINATION_PLAIN,
1647 None,
1648 time::now(),
1649 );
1650 self.dispatch_all(actions);
1651 }
1652 }
1653
1654 fn maybe_generate_proof(&mut self, dest_hash: [u8; 16], packet_hash: &[u8; 32]) {
1657 use rns_core::types::ProofStrategy;
1658
1659 let (strategy, identity) = match self.proof_strategies.get(&dest_hash) {
1660 Some((s, id)) => (*s, id.as_ref()),
1661 None => return,
1662 };
1663
1664 let should_prove = match strategy {
1665 ProofStrategy::ProveAll => true,
1666 ProofStrategy::ProveApp => self.callbacks.on_proof_requested(
1667 rns_core::types::DestHash(dest_hash),
1668 rns_core::types::PacketHash(*packet_hash),
1669 ),
1670 ProofStrategy::ProveNone => false,
1671 };
1672
1673 if !should_prove {
1674 return;
1675 }
1676
1677 let identity = match identity {
1678 Some(id) => id,
1679 None => {
1680 log::warn!(
1681 "Cannot generate proof for {:02x?}: no signing key",
1682 &dest_hash[..4]
1683 );
1684 return;
1685 }
1686 };
1687
1688 let signature = match identity.sign(packet_hash) {
1690 Ok(sig) => sig,
1691 Err(e) => {
1692 log::warn!("Failed to sign proof for {:02x?}: {:?}", &dest_hash[..4], e);
1693 return;
1694 }
1695 };
1696
1697 let mut proof_data = Vec::with_capacity(96);
1699 proof_data.extend_from_slice(packet_hash);
1700 proof_data.extend_from_slice(&signature);
1701
1702 let mut proof_dest = [0u8; 16];
1708 proof_dest.copy_from_slice(&packet_hash[..16]);
1709
1710 let flags = rns_core::packet::PacketFlags {
1711 header_type: rns_core::constants::HEADER_1,
1712 context_flag: rns_core::constants::FLAG_UNSET,
1713 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1714 destination_type: rns_core::constants::DESTINATION_SINGLE,
1715 packet_type: rns_core::constants::PACKET_TYPE_PROOF,
1716 };
1717
1718 if let Ok(packet) = RawPacket::pack(
1719 flags,
1720 0,
1721 &proof_dest,
1722 None,
1723 rns_core::constants::CONTEXT_NONE,
1724 &proof_data,
1725 ) {
1726 let actions = self.engine.handle_outbound(
1727 &packet,
1728 rns_core::constants::DESTINATION_SINGLE,
1729 None,
1730 time::now(),
1731 );
1732 self.dispatch_all(actions);
1733 log::debug!(
1734 "Generated proof for packet on dest {:02x?}",
1735 &dest_hash[..4]
1736 );
1737 }
1738 }
1739
1740 fn handle_inbound_proof(
1742 &mut self,
1743 dest_hash: [u8; 16],
1744 proof_data: &[u8],
1745 _raw_packet_hash: &[u8; 32],
1746 ) {
1747 if proof_data.len() < 96 {
1749 log::debug!(
1750 "Proof too short for explicit proof: {} bytes",
1751 proof_data.len()
1752 );
1753 return;
1754 }
1755
1756 let mut tracked_hash = [0u8; 32];
1757 tracked_hash.copy_from_slice(&proof_data[..32]);
1758
1759 let signature = &proof_data[32..96];
1760
1761 if let Some((tracked_dest, sent_time)) = self.sent_packets.remove(&tracked_hash) {
1763 if let Some(announced) = self.known_destinations.get(&tracked_dest) {
1766 let identity =
1767 rns_crypto::identity::Identity::from_public_key(&announced.public_key);
1768 let mut sig = [0u8; 64];
1769 sig.copy_from_slice(signature);
1770 if !identity.verify(&sig, &tracked_hash) {
1771 log::debug!("Proof signature invalid for {:02x?}", &tracked_hash[..4],);
1772 return;
1773 }
1774 } else {
1775 log::debug!(
1776 "No known identity for dest {:02x?}, accepting proof without signature check",
1777 &tracked_dest[..4],
1778 );
1779 }
1780
1781 let now = time::now();
1782 let rtt = now - sent_time;
1783 log::debug!(
1784 "Proof received for {:02x?} rtt={:.3}s",
1785 &tracked_hash[..4],
1786 rtt,
1787 );
1788 self.completed_proofs.insert(tracked_hash, (rtt, now));
1789 self.callbacks.on_proof(
1790 rns_core::types::DestHash(tracked_dest),
1791 rns_core::types::PacketHash(tracked_hash),
1792 rtt,
1793 );
1794 } else {
1795 log::debug!(
1796 "Proof for unknown packet {:02x?} on dest {:02x?}",
1797 &tracked_hash[..4],
1798 &dest_hash[..4],
1799 );
1800 }
1801 }
1802
1803 fn dispatch_all(&mut self, actions: Vec<TransportAction>) {
1805 #[cfg(feature = "rns-hooks")]
1806 let mut hook_injected: Vec<TransportAction> = Vec::new();
1807
1808 for action in actions {
1809 match action {
1810 TransportAction::SendOnInterface { interface, raw } => {
1811 #[cfg(feature = "rns-hooks")]
1812 {
1813 let pkt_ctx = rns_hooks::PacketContext {
1814 flags: if raw.is_empty() { 0 } else { raw[0] },
1815 hops: if raw.len() > 1 { raw[1] } else { 0 },
1816 destination_hash: extract_dest_hash(&raw),
1817 context: 0,
1818 packet_hash: [0; 32],
1819 interface_id: interface.0,
1820 data_offset: 0,
1821 data_len: raw.len() as u32,
1822 };
1823 let ctx = HookContext::Packet {
1824 ctx: &pkt_ctx,
1825 raw: &raw,
1826 };
1827 let now = time::now();
1828 let engine_ref = EngineRef {
1829 engine: &self.engine,
1830 interfaces: &self.interfaces,
1831 link_manager: &self.link_manager,
1832 now,
1833 };
1834 {
1835 let exec = run_hook_inner(
1836 &mut self.hook_slots[HookPoint::SendOnInterface as usize].programs,
1837 &self.hook_manager,
1838 &engine_ref,
1839 &ctx,
1840 now,
1841 );
1842 if let Some(ref e) = exec {
1843 if !e.injected_actions.is_empty() {
1844 hook_injected.extend(convert_injected_actions(
1845 e.injected_actions.clone(),
1846 ));
1847 }
1848 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) {
1849 continue;
1850 }
1851 }
1852 }
1853 }
1854 let is_announce = raw.len() > 2 && (raw[0] & 0x03) == 0x01;
1855 if is_announce {
1856 log::debug!(
1857 "Announce:dispatching to iface {} (len={}, online={})",
1858 interface.0,
1859 raw.len(),
1860 self.interfaces
1861 .get(&interface)
1862 .map(|e| e.online)
1863 .unwrap_or(false)
1864 );
1865 }
1866 if let Some(entry) = self.interfaces.get_mut(&interface) {
1867 if entry.online {
1868 let data = if let Some(ref ifac_state) = entry.ifac {
1869 ifac::mask_outbound(&raw, ifac_state)
1870 } else {
1871 raw
1872 };
1873 entry.stats.txb += data.len() as u64;
1875 entry.stats.tx_packets += 1;
1876 if is_announce {
1877 entry.stats.record_outgoing_announce(time::now());
1878 }
1879 if let Err(e) = entry.writer.send_frame(&data) {
1880 log::warn!("[{}] send failed: {}", entry.info.id.0, e);
1881 } else if is_announce {
1882 let header_type = (data[0] >> 6) & 0x03;
1885 let dest_start = if header_type == 1 { 18usize } else { 2usize };
1886 let dest_preview = if data.len() >= dest_start + 4 {
1887 format!("{:02x?}", &data[dest_start..dest_start + 4])
1888 } else {
1889 "??".into()
1890 };
1891 log::debug!(
1892 "Announce:SENT on iface {} (len={}, h={}, dest=[{}])",
1893 interface.0,
1894 data.len(),
1895 header_type,
1896 dest_preview
1897 );
1898 }
1899 }
1900 }
1901 }
1902 TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1903 #[cfg(feature = "rns-hooks")]
1904 {
1905 let pkt_ctx = rns_hooks::PacketContext {
1906 flags: if raw.is_empty() { 0 } else { raw[0] },
1907 hops: if raw.len() > 1 { raw[1] } else { 0 },
1908 destination_hash: extract_dest_hash(&raw),
1909 context: 0,
1910 packet_hash: [0; 32],
1911 interface_id: 0,
1912 data_offset: 0,
1913 data_len: raw.len() as u32,
1914 };
1915 let ctx = HookContext::Packet {
1916 ctx: &pkt_ctx,
1917 raw: &raw,
1918 };
1919 let now = time::now();
1920 let engine_ref = EngineRef {
1921 engine: &self.engine,
1922 interfaces: &self.interfaces,
1923 link_manager: &self.link_manager,
1924 now,
1925 };
1926 {
1927 let exec = run_hook_inner(
1928 &mut self.hook_slots[HookPoint::BroadcastOnAllInterfaces as usize]
1929 .programs,
1930 &self.hook_manager,
1931 &engine_ref,
1932 &ctx,
1933 now,
1934 );
1935 if let Some(ref e) = exec {
1936 if !e.injected_actions.is_empty() {
1937 hook_injected.extend(convert_injected_actions(
1938 e.injected_actions.clone(),
1939 ));
1940 }
1941 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) {
1942 continue;
1943 }
1944 }
1945 }
1946 }
1947 let is_announce = raw.len() > 2 && (raw[0] & 0x03) == 0x01;
1948 for entry in self.interfaces.values_mut() {
1949 if entry.online && Some(entry.id) != exclude {
1950 let data = if let Some(ref ifac_state) = entry.ifac {
1951 ifac::mask_outbound(&raw, ifac_state)
1952 } else {
1953 raw.clone()
1954 };
1955 entry.stats.txb += data.len() as u64;
1957 entry.stats.tx_packets += 1;
1958 if is_announce {
1959 entry.stats.record_outgoing_announce(time::now());
1960 }
1961 if let Err(e) = entry.writer.send_frame(&data) {
1962 log::warn!("[{}] broadcast failed: {}", entry.info.id.0, e);
1963 }
1964 }
1965 }
1966 }
1967 TransportAction::DeliverLocal {
1968 destination_hash,
1969 raw,
1970 packet_hash,
1971 receiving_interface,
1972 } => {
1973 #[cfg(feature = "rns-hooks")]
1974 {
1975 let pkt_ctx = rns_hooks::PacketContext {
1976 flags: 0,
1977 hops: 0,
1978 destination_hash,
1979 context: 0,
1980 packet_hash,
1981 interface_id: receiving_interface.0,
1982 data_offset: 0,
1983 data_len: raw.len() as u32,
1984 };
1985 let ctx = HookContext::Packet {
1986 ctx: &pkt_ctx,
1987 raw: &raw,
1988 };
1989 let now = time::now();
1990 let engine_ref = EngineRef {
1991 engine: &self.engine,
1992 interfaces: &self.interfaces,
1993 link_manager: &self.link_manager,
1994 now,
1995 };
1996 {
1997 let exec = run_hook_inner(
1998 &mut self.hook_slots[HookPoint::DeliverLocal as usize].programs,
1999 &self.hook_manager,
2000 &engine_ref,
2001 &ctx,
2002 now,
2003 );
2004 if let Some(ref e) = exec {
2005 if !e.injected_actions.is_empty() {
2006 hook_injected.extend(convert_injected_actions(
2007 e.injected_actions.clone(),
2008 ));
2009 }
2010 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) {
2011 continue;
2012 }
2013 }
2014 }
2015 }
2016 if destination_hash == self.tunnel_synth_dest {
2017 self.handle_tunnel_synth_delivery(&raw);
2019 } else if destination_hash == self.path_request_dest {
2020 if let Ok(packet) = RawPacket::unpack(&raw) {
2022 let actions = self.engine.handle_path_request(
2023 &packet.data,
2024 InterfaceId(0), time::now(),
2026 );
2027 self.dispatch_all(actions);
2028 }
2029 } else if self.link_manager.is_link_destination(&destination_hash) {
2030 let link_actions = self.link_manager.handle_local_delivery(
2032 destination_hash,
2033 &raw,
2034 packet_hash,
2035 receiving_interface,
2036 &mut self.rng,
2037 );
2038 if link_actions.is_empty() {
2039 if let Ok(packet) = RawPacket::unpack(&raw) {
2043 if packet.flags.packet_type
2044 == rns_core::constants::PACKET_TYPE_PROOF
2045 {
2046 self.handle_inbound_proof(
2047 destination_hash,
2048 &packet.data,
2049 &packet_hash,
2050 );
2051 continue;
2052 }
2053 }
2054 self.maybe_generate_proof(destination_hash, &packet_hash);
2055 self.callbacks.on_local_delivery(
2056 rns_core::types::DestHash(destination_hash),
2057 raw,
2058 rns_core::types::PacketHash(packet_hash),
2059 );
2060 } else {
2061 self.dispatch_link_actions(link_actions);
2062 }
2063 } else {
2064 if let Ok(packet) = RawPacket::unpack(&raw) {
2066 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_PROOF {
2067 self.handle_inbound_proof(
2068 destination_hash,
2069 &packet.data,
2070 &packet_hash,
2071 );
2072 continue;
2073 }
2074 }
2075
2076 self.maybe_generate_proof(destination_hash, &packet_hash);
2078
2079 self.callbacks.on_local_delivery(
2080 rns_core::types::DestHash(destination_hash),
2081 raw,
2082 rns_core::types::PacketHash(packet_hash),
2083 );
2084 }
2085 }
2086 TransportAction::AnnounceReceived {
2087 destination_hash,
2088 identity_hash,
2089 public_key,
2090 name_hash,
2091 app_data,
2092 hops,
2093 receiving_interface,
2094 ..
2095 } => {
2096 #[cfg(feature = "rns-hooks")]
2097 {
2098 let ctx = HookContext::Announce {
2099 destination_hash,
2100 hops,
2101 interface_id: receiving_interface.0,
2102 };
2103 let now = time::now();
2104 let engine_ref = EngineRef {
2105 engine: &self.engine,
2106 interfaces: &self.interfaces,
2107 link_manager: &self.link_manager,
2108 now,
2109 };
2110 {
2111 let exec = run_hook_inner(
2112 &mut self.hook_slots[HookPoint::AnnounceReceived as usize].programs,
2113 &self.hook_manager,
2114 &engine_ref,
2115 &ctx,
2116 now,
2117 );
2118 if let Some(ref e) = exec {
2119 if !e.injected_actions.is_empty() {
2120 hook_injected.extend(convert_injected_actions(
2121 e.injected_actions.clone(),
2122 ));
2123 }
2124 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) {
2125 continue;
2126 }
2127 }
2128 }
2129 }
2130
2131 if name_hash == self.discovery_name_hash {
2135 if self.discover_interfaces {
2136 if let Some(ref app_data) = app_data {
2137 if let Some(mut discovered) =
2138 crate::discovery::parse_interface_announce(
2139 app_data,
2140 &identity_hash,
2141 hops,
2142 self.discovery_required_value,
2143 )
2144 {
2145 if let Ok(Some(existing)) =
2147 self.discovered_interfaces.load(&discovered.discovery_hash)
2148 {
2149 discovered.discovered = existing.discovered;
2150 discovered.heard_count = existing.heard_count + 1;
2151 }
2152 if let Err(e) = self.discovered_interfaces.store(&discovered) {
2153 log::warn!("Failed to store discovered interface: {}", e);
2154 } else {
2155 log::debug!(
2156 "Discovered interface '{}' ({}) at {}:{} [stamp={}]",
2157 discovered.name,
2158 discovered.interface_type,
2159 discovered.reachable_on.as_deref().unwrap_or("?"),
2160 discovered
2161 .port
2162 .map(|p| p.to_string())
2163 .unwrap_or_else(|| "?".into()),
2164 discovered.stamp_value,
2165 );
2166 }
2167 }
2168 }
2169 }
2170 }
2172
2173 let announced = crate::destination::AnnouncedIdentity {
2175 dest_hash: rns_core::types::DestHash(destination_hash),
2176 identity_hash: rns_core::types::IdentityHash(identity_hash),
2177 public_key,
2178 app_data: app_data.clone(),
2179 hops,
2180 received_at: time::now(),
2181 receiving_interface,
2182 };
2183 self.known_destinations
2184 .insert(destination_hash, announced.clone());
2185 log::info!(
2186 "Announce:validated dest={:02x}{:02x}{:02x}{:02x}.. hops={}",
2187 destination_hash[0],
2188 destination_hash[1],
2189 destination_hash[2],
2190 destination_hash[3],
2191 hops,
2192 );
2193 self.callbacks.on_announce(announced);
2194 }
2195 TransportAction::PathUpdated {
2196 destination_hash,
2197 hops,
2198 interface,
2199 ..
2200 } => {
2201 #[cfg(feature = "rns-hooks")]
2202 {
2203 let ctx = HookContext::Announce {
2204 destination_hash,
2205 hops,
2206 interface_id: interface.0,
2207 };
2208 let now = time::now();
2209 let engine_ref = EngineRef {
2210 engine: &self.engine,
2211 interfaces: &self.interfaces,
2212 link_manager: &self.link_manager,
2213 now,
2214 };
2215 if let Some(ref e) = run_hook_inner(
2216 &mut self.hook_slots[HookPoint::PathUpdated as usize].programs,
2217 &self.hook_manager,
2218 &engine_ref,
2219 &ctx,
2220 now,
2221 ) {
2222 if !e.injected_actions.is_empty() {
2223 hook_injected
2224 .extend(convert_injected_actions(e.injected_actions.clone()));
2225 }
2226 }
2227 }
2228 #[cfg(not(feature = "rns-hooks"))]
2229 let _ = interface;
2230
2231 self.callbacks
2232 .on_path_updated(rns_core::types::DestHash(destination_hash), hops);
2233 }
2234 TransportAction::ForwardToLocalClients { raw, exclude } => {
2235 for entry in self.interfaces.values_mut() {
2236 if entry.online && entry.info.is_local_client && Some(entry.id) != exclude {
2237 let data = if let Some(ref ifac_state) = entry.ifac {
2238 ifac::mask_outbound(&raw, ifac_state)
2239 } else {
2240 raw.clone()
2241 };
2242 entry.stats.txb += data.len() as u64;
2243 entry.stats.tx_packets += 1;
2244 if let Err(e) = entry.writer.send_frame(&data) {
2245 log::warn!(
2246 "[{}] forward to local client failed: {}",
2247 entry.info.id.0,
2248 e
2249 );
2250 }
2251 }
2252 }
2253 }
2254 TransportAction::ForwardPlainBroadcast {
2255 raw,
2256 to_local,
2257 exclude,
2258 } => {
2259 for entry in self.interfaces.values_mut() {
2260 if entry.online
2261 && entry.info.is_local_client == to_local
2262 && Some(entry.id) != exclude
2263 {
2264 let data = if let Some(ref ifac_state) = entry.ifac {
2265 ifac::mask_outbound(&raw, ifac_state)
2266 } else {
2267 raw.clone()
2268 };
2269 entry.stats.txb += data.len() as u64;
2270 entry.stats.tx_packets += 1;
2271 if let Err(e) = entry.writer.send_frame(&data) {
2272 log::warn!(
2273 "[{}] forward plain broadcast failed: {}",
2274 entry.info.id.0,
2275 e
2276 );
2277 }
2278 }
2279 }
2280 }
2281 TransportAction::CacheAnnounce { packet_hash, raw } => {
2282 if let Some(ref cache) = self.announce_cache {
2283 if let Err(e) = cache.store(&packet_hash, &raw, None) {
2284 log::warn!("Failed to cache announce: {}", e);
2285 }
2286 }
2287 }
2288 TransportAction::TunnelSynthesize {
2289 interface,
2290 data,
2291 dest_hash,
2292 } => {
2293 #[cfg(feature = "rns-hooks")]
2294 {
2295 let pkt_ctx = rns_hooks::PacketContext {
2296 flags: 0,
2297 hops: 0,
2298 destination_hash: dest_hash,
2299 context: 0,
2300 packet_hash: [0; 32],
2301 interface_id: interface.0,
2302 data_offset: 0,
2303 data_len: data.len() as u32,
2304 };
2305 let ctx = HookContext::Packet {
2306 ctx: &pkt_ctx,
2307 raw: &data,
2308 };
2309 let now = time::now();
2310 let engine_ref = EngineRef {
2311 engine: &self.engine,
2312 interfaces: &self.interfaces,
2313 link_manager: &self.link_manager,
2314 now,
2315 };
2316 {
2317 let exec = run_hook_inner(
2318 &mut self.hook_slots[HookPoint::TunnelSynthesize as usize].programs,
2319 &self.hook_manager,
2320 &engine_ref,
2321 &ctx,
2322 now,
2323 );
2324 if let Some(ref e) = exec {
2325 if !e.injected_actions.is_empty() {
2326 hook_injected.extend(convert_injected_actions(
2327 e.injected_actions.clone(),
2328 ));
2329 }
2330 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) {
2331 continue;
2332 }
2333 }
2334 }
2335 }
2336 let flags = rns_core::packet::PacketFlags {
2338 header_type: rns_core::constants::HEADER_1,
2339 context_flag: rns_core::constants::FLAG_UNSET,
2340 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
2341 destination_type: rns_core::constants::DESTINATION_PLAIN,
2342 packet_type: rns_core::constants::PACKET_TYPE_DATA,
2343 };
2344 if let Ok(packet) = rns_core::packet::RawPacket::pack(
2345 flags,
2346 0,
2347 &dest_hash,
2348 None,
2349 rns_core::constants::CONTEXT_NONE,
2350 &data,
2351 ) {
2352 if let Some(entry) = self.interfaces.get_mut(&interface) {
2353 if entry.online {
2354 let raw = if let Some(ref ifac_state) = entry.ifac {
2355 ifac::mask_outbound(&packet.raw, ifac_state)
2356 } else {
2357 packet.raw
2358 };
2359 entry.stats.txb += raw.len() as u64;
2360 entry.stats.tx_packets += 1;
2361 if let Err(e) = entry.writer.send_frame(&raw) {
2362 log::warn!(
2363 "[{}] tunnel synthesize send failed: {}",
2364 entry.info.id.0,
2365 e
2366 );
2367 }
2368 }
2369 }
2370 }
2371 }
2372 TransportAction::TunnelEstablished {
2373 tunnel_id,
2374 interface,
2375 } => {
2376 log::info!(
2377 "Tunnel established: {:02x?} on interface {}",
2378 &tunnel_id[..4],
2379 interface.0
2380 );
2381 }
2382 TransportAction::AnnounceRetransmit {
2383 destination_hash,
2384 hops,
2385 interface,
2386 } => {
2387 #[cfg(feature = "rns-hooks")]
2388 {
2389 let ctx = HookContext::Announce {
2390 destination_hash,
2391 hops,
2392 interface_id: interface.map(|i| i.0).unwrap_or(0),
2393 };
2394 let now = time::now();
2395 let engine_ref = EngineRef {
2396 engine: &self.engine,
2397 interfaces: &self.interfaces,
2398 link_manager: &self.link_manager,
2399 now,
2400 };
2401 if let Some(ref e) = run_hook_inner(
2402 &mut self.hook_slots[HookPoint::AnnounceRetransmit as usize].programs,
2403 &self.hook_manager,
2404 &engine_ref,
2405 &ctx,
2406 now,
2407 ) {
2408 if !e.injected_actions.is_empty() {
2409 hook_injected
2410 .extend(convert_injected_actions(e.injected_actions.clone()));
2411 }
2412 }
2413 }
2414 #[cfg(not(feature = "rns-hooks"))]
2415 {
2416 let _ = (destination_hash, hops, interface);
2417 }
2418 }
2419 TransportAction::LinkRequestReceived {
2420 link_id,
2421 destination_hash: _,
2422 receiving_interface,
2423 } => {
2424 #[cfg(feature = "rns-hooks")]
2425 {
2426 let ctx = HookContext::Link {
2427 link_id,
2428 interface_id: receiving_interface.0,
2429 };
2430 let now = time::now();
2431 let engine_ref = EngineRef {
2432 engine: &self.engine,
2433 interfaces: &self.interfaces,
2434 link_manager: &self.link_manager,
2435 now,
2436 };
2437 if let Some(ref e) = run_hook_inner(
2438 &mut self.hook_slots[HookPoint::LinkRequestReceived as usize].programs,
2439 &self.hook_manager,
2440 &engine_ref,
2441 &ctx,
2442 now,
2443 ) {
2444 if !e.injected_actions.is_empty() {
2445 hook_injected
2446 .extend(convert_injected_actions(e.injected_actions.clone()));
2447 }
2448 }
2449 }
2450 #[cfg(not(feature = "rns-hooks"))]
2451 {
2452 let _ = (link_id, receiving_interface);
2453 }
2454 }
2455 TransportAction::LinkEstablished { link_id, interface } => {
2456 #[cfg(feature = "rns-hooks")]
2457 {
2458 let ctx = HookContext::Link {
2459 link_id,
2460 interface_id: interface.0,
2461 };
2462 let now = time::now();
2463 let engine_ref = EngineRef {
2464 engine: &self.engine,
2465 interfaces: &self.interfaces,
2466 link_manager: &self.link_manager,
2467 now,
2468 };
2469 if let Some(ref e) = run_hook_inner(
2470 &mut self.hook_slots[HookPoint::LinkEstablished as usize].programs,
2471 &self.hook_manager,
2472 &engine_ref,
2473 &ctx,
2474 now,
2475 ) {
2476 if !e.injected_actions.is_empty() {
2477 hook_injected
2478 .extend(convert_injected_actions(e.injected_actions.clone()));
2479 }
2480 }
2481 }
2482 #[cfg(not(feature = "rns-hooks"))]
2483 {
2484 let _ = (link_id, interface);
2485 }
2486 }
2487 TransportAction::LinkClosed { link_id } => {
2488 #[cfg(feature = "rns-hooks")]
2489 {
2490 let ctx = HookContext::Link {
2491 link_id,
2492 interface_id: 0,
2493 };
2494 let now = time::now();
2495 let engine_ref = EngineRef {
2496 engine: &self.engine,
2497 interfaces: &self.interfaces,
2498 link_manager: &self.link_manager,
2499 now,
2500 };
2501 if let Some(ref e) = run_hook_inner(
2502 &mut self.hook_slots[HookPoint::LinkClosed as usize].programs,
2503 &self.hook_manager,
2504 &engine_ref,
2505 &ctx,
2506 now,
2507 ) {
2508 if !e.injected_actions.is_empty() {
2509 hook_injected
2510 .extend(convert_injected_actions(e.injected_actions.clone()));
2511 }
2512 }
2513 }
2514 #[cfg(not(feature = "rns-hooks"))]
2515 {
2516 let _ = link_id;
2517 }
2518 }
2519 }
2520 }
2521
2522 #[cfg(feature = "rns-hooks")]
2524 if !hook_injected.is_empty() {
2525 self.dispatch_all(hook_injected);
2526 }
2527 }
2528
2529 fn dispatch_link_actions(&mut self, actions: Vec<LinkManagerAction>) {
2531 #[cfg(feature = "rns-hooks")]
2532 let mut hook_injected: Vec<TransportAction> = Vec::new();
2533
2534 for action in actions {
2535 match action {
2536 LinkManagerAction::SendPacket {
2537 raw,
2538 dest_type,
2539 attached_interface,
2540 } => {
2541 match RawPacket::unpack(&raw) {
2543 Ok(packet) => {
2544 let transport_actions = self.engine.handle_outbound(
2545 &packet,
2546 dest_type,
2547 attached_interface,
2548 time::now(),
2549 );
2550 self.dispatch_all(transport_actions);
2551 }
2552 Err(e) => {
2553 log::warn!("LinkManager SendPacket: failed to unpack: {:?}", e);
2554 }
2555 }
2556 }
2557 LinkManagerAction::LinkEstablished {
2558 link_id,
2559 dest_hash,
2560 rtt,
2561 is_initiator,
2562 } => {
2563 #[cfg(feature = "rns-hooks")]
2564 {
2565 let ctx = HookContext::Link {
2566 link_id,
2567 interface_id: 0,
2568 };
2569 let now = time::now();
2570 let engine_ref = EngineRef {
2571 engine: &self.engine,
2572 interfaces: &self.interfaces,
2573 link_manager: &self.link_manager,
2574 now,
2575 };
2576 if let Some(ref e) = run_hook_inner(
2577 &mut self.hook_slots[HookPoint::LinkEstablished as usize].programs,
2578 &self.hook_manager,
2579 &engine_ref,
2580 &ctx,
2581 now,
2582 ) {
2583 if !e.injected_actions.is_empty() {
2584 hook_injected
2585 .extend(convert_injected_actions(e.injected_actions.clone()));
2586 }
2587 }
2588 }
2589 log::info!(
2590 "Link established: {:02x?} rtt={:.3}s initiator={}",
2591 &link_id[..4],
2592 rtt,
2593 is_initiator,
2594 );
2595 self.callbacks.on_link_established(
2596 rns_core::types::LinkId(link_id),
2597 rns_core::types::DestHash(dest_hash),
2598 rtt,
2599 is_initiator,
2600 );
2601 }
2602 LinkManagerAction::LinkClosed { link_id, reason } => {
2603 #[cfg(feature = "rns-hooks")]
2604 {
2605 let ctx = HookContext::Link {
2606 link_id,
2607 interface_id: 0,
2608 };
2609 let now = time::now();
2610 let engine_ref = EngineRef {
2611 engine: &self.engine,
2612 interfaces: &self.interfaces,
2613 link_manager: &self.link_manager,
2614 now,
2615 };
2616 if let Some(ref e) = run_hook_inner(
2617 &mut self.hook_slots[HookPoint::LinkClosed as usize].programs,
2618 &self.hook_manager,
2619 &engine_ref,
2620 &ctx,
2621 now,
2622 ) {
2623 if !e.injected_actions.is_empty() {
2624 hook_injected
2625 .extend(convert_injected_actions(e.injected_actions.clone()));
2626 }
2627 }
2628 }
2629 log::info!("Link closed: {:02x?} reason={:?}", &link_id[..4], reason);
2630 self.holepunch_manager.link_closed(&link_id);
2631 self.callbacks
2632 .on_link_closed(rns_core::types::LinkId(link_id), reason);
2633 }
2634 LinkManagerAction::RemoteIdentified {
2635 link_id,
2636 identity_hash,
2637 public_key,
2638 } => {
2639 log::debug!(
2640 "Remote identified on link {:02x?}: {:02x?}",
2641 &link_id[..4],
2642 &identity_hash[..4],
2643 );
2644 self.callbacks.on_remote_identified(
2645 rns_core::types::LinkId(link_id),
2646 rns_core::types::IdentityHash(identity_hash),
2647 public_key,
2648 );
2649 }
2650 LinkManagerAction::RegisterLinkDest { link_id } => {
2651 self.engine
2653 .register_destination(link_id, rns_core::constants::DESTINATION_LINK);
2654 }
2655 LinkManagerAction::DeregisterLinkDest { link_id } => {
2656 self.engine.deregister_destination(&link_id);
2657 }
2658 LinkManagerAction::ManagementRequest {
2659 link_id,
2660 path_hash,
2661 data,
2662 request_id,
2663 remote_identity,
2664 } => {
2665 self.handle_management_request(
2666 link_id,
2667 path_hash,
2668 data,
2669 request_id,
2670 remote_identity,
2671 );
2672 }
2673 LinkManagerAction::ResourceReceived {
2674 link_id,
2675 data,
2676 metadata,
2677 } => {
2678 self.callbacks.on_resource_received(
2679 rns_core::types::LinkId(link_id),
2680 data,
2681 metadata,
2682 );
2683 }
2684 LinkManagerAction::ResourceCompleted { link_id } => {
2685 self.callbacks
2686 .on_resource_completed(rns_core::types::LinkId(link_id));
2687 }
2688 LinkManagerAction::ResourceFailed { link_id, error } => {
2689 log::debug!("Resource failed on link {:02x?}: {}", &link_id[..4], error);
2690 self.callbacks
2691 .on_resource_failed(rns_core::types::LinkId(link_id), error);
2692 }
2693 LinkManagerAction::ResourceProgress {
2694 link_id,
2695 received,
2696 total,
2697 } => {
2698 self.callbacks.on_resource_progress(
2699 rns_core::types::LinkId(link_id),
2700 received,
2701 total,
2702 );
2703 }
2704 LinkManagerAction::ResourceAcceptQuery {
2705 link_id,
2706 resource_hash,
2707 transfer_size,
2708 has_metadata,
2709 } => {
2710 let accept = self.callbacks.on_resource_accept_query(
2711 rns_core::types::LinkId(link_id),
2712 resource_hash.clone(),
2713 transfer_size,
2714 has_metadata,
2715 );
2716 let accept_actions = self.link_manager.accept_resource(
2717 &link_id,
2718 &resource_hash,
2719 accept,
2720 &mut self.rng,
2721 );
2722 self.dispatch_link_actions(accept_actions);
2724 }
2725 LinkManagerAction::ChannelMessageReceived {
2726 link_id,
2727 msgtype,
2728 payload,
2729 } => {
2730 if HolePunchManager::is_holepunch_message(msgtype) {
2732 let derived_key = self.link_manager.get_derived_key(&link_id);
2733 let tx = self.get_event_sender();
2734 let (handled, hp_actions) = self.holepunch_manager.handle_signal(
2735 link_id,
2736 msgtype,
2737 payload,
2738 derived_key.as_deref(),
2739 &tx,
2740 );
2741 if handled {
2742 self.dispatch_holepunch_actions(hp_actions);
2743 }
2744 } else {
2745 self.callbacks.on_channel_message(
2746 rns_core::types::LinkId(link_id),
2747 msgtype,
2748 payload,
2749 );
2750 }
2751 }
2752 LinkManagerAction::LinkDataReceived {
2753 link_id,
2754 context,
2755 data,
2756 } => {
2757 self.callbacks
2758 .on_link_data(rns_core::types::LinkId(link_id), context, data);
2759 }
2760 LinkManagerAction::ResponseReceived {
2761 link_id,
2762 request_id,
2763 data,
2764 } => {
2765 self.callbacks
2766 .on_response(rns_core::types::LinkId(link_id), request_id, data);
2767 }
2768 LinkManagerAction::LinkRequestReceived {
2769 link_id,
2770 receiving_interface,
2771 } => {
2772 #[cfg(feature = "rns-hooks")]
2773 {
2774 let ctx = HookContext::Link {
2775 link_id,
2776 interface_id: receiving_interface.0,
2777 };
2778 let now = time::now();
2779 let engine_ref = EngineRef {
2780 engine: &self.engine,
2781 interfaces: &self.interfaces,
2782 link_manager: &self.link_manager,
2783 now,
2784 };
2785 if let Some(ref e) = run_hook_inner(
2786 &mut self.hook_slots[HookPoint::LinkRequestReceived as usize].programs,
2787 &self.hook_manager,
2788 &engine_ref,
2789 &ctx,
2790 now,
2791 ) {
2792 if !e.injected_actions.is_empty() {
2793 hook_injected
2794 .extend(convert_injected_actions(e.injected_actions.clone()));
2795 }
2796 }
2797 }
2798 #[cfg(not(feature = "rns-hooks"))]
2799 {
2800 let _ = (link_id, receiving_interface);
2801 }
2802 }
2803 }
2804 }
2805
2806 #[cfg(feature = "rns-hooks")]
2808 if !hook_injected.is_empty() {
2809 self.dispatch_all(hook_injected);
2810 }
2811 }
2812
2813 fn dispatch_holepunch_actions(&mut self, actions: Vec<HolePunchManagerAction>) {
2815 for action in actions {
2816 match action {
2817 HolePunchManagerAction::SendChannelMessage {
2818 link_id,
2819 msgtype,
2820 payload,
2821 } => {
2822 let link_actions = self.link_manager.send_channel_message(
2823 &link_id,
2824 msgtype,
2825 &payload,
2826 &mut self.rng,
2827 );
2828 self.dispatch_link_actions(link_actions);
2829 }
2830 HolePunchManagerAction::DirectConnectEstablished {
2831 link_id,
2832 session_id,
2833 interface_id,
2834 rtt,
2835 mtu,
2836 } => {
2837 log::info!(
2838 "Direct connection established for link {:02x?} session {:02x?} iface {} rtt={:.1}ms mtu={}",
2839 &link_id[..4], &session_id[..4], interface_id.0, rtt * 1000.0, mtu
2840 );
2841 self.engine
2843 .redirect_path(&link_id, interface_id, time::now());
2844 self.link_manager.set_link_rtt(&link_id, rtt);
2846 self.link_manager.set_link_mtu(&link_id, mtu);
2847 self.link_manager.record_link_inbound(&link_id);
2850 self.link_manager.flush_channel_tx(&link_id);
2852 self.callbacks.on_direct_connect_established(
2853 rns_core::types::LinkId(link_id),
2854 interface_id,
2855 );
2856 }
2857 HolePunchManagerAction::DirectConnectFailed {
2858 link_id,
2859 session_id,
2860 reason,
2861 } => {
2862 log::debug!(
2863 "Direct connection failed for link {:02x?} session {:02x?} reason={}",
2864 &link_id[..4],
2865 &session_id[..4],
2866 reason
2867 );
2868 self.callbacks
2869 .on_direct_connect_failed(rns_core::types::LinkId(link_id), reason);
2870 }
2871 }
2872 }
2873 }
2874
2875 fn get_event_sender(&self) -> crate::event::EventSender {
2880 self.event_tx.clone()
2884 }
2885
2886 const MANAGEMENT_ANNOUNCE_INTERVAL: f64 = 300.0;
2888
2889 const MANAGEMENT_ANNOUNCE_DELAY: f64 = 5.0;
2891
2892 fn tick_discovery_announcer(&mut self, now: f64) {
2894 let announcer = match self.interface_announcer.as_mut() {
2895 Some(a) => a,
2896 None => return,
2897 };
2898
2899 announcer.maybe_start(now);
2900
2901 let stamp_result = match announcer.poll_ready() {
2902 Some(r) => r,
2903 None => return,
2904 };
2905
2906 let identity = match self.transport_identity.as_ref() {
2907 Some(id) => id,
2908 None => {
2909 log::warn!("Discovery: stamp ready but no transport identity");
2910 return;
2911 }
2912 };
2913
2914 let identity_hash = identity.hash();
2916 let disc_dest = rns_core::destination::destination_hash(
2917 crate::discovery::APP_NAME,
2918 &["discovery", "interface"],
2919 Some(&identity_hash),
2920 );
2921 let name_hash = self.discovery_name_hash;
2922 let mut random_hash = [0u8; 10];
2923 self.rng.fill_bytes(&mut random_hash);
2924
2925 let (announce_data, _) = match rns_core::announce::AnnounceData::pack(
2926 identity,
2927 &disc_dest,
2928 &name_hash,
2929 &random_hash,
2930 None,
2931 Some(&stamp_result.app_data),
2932 ) {
2933 Ok(v) => v,
2934 Err(e) => {
2935 log::warn!("Discovery: failed to pack announce: {}", e);
2936 return;
2937 }
2938 };
2939
2940 let flags = rns_core::packet::PacketFlags {
2941 header_type: rns_core::constants::HEADER_1,
2942 context_flag: rns_core::constants::FLAG_UNSET,
2943 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
2944 destination_type: rns_core::constants::DESTINATION_SINGLE,
2945 packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
2946 };
2947
2948 let packet = match RawPacket::pack(
2949 flags,
2950 0,
2951 &disc_dest,
2952 None,
2953 rns_core::constants::CONTEXT_NONE,
2954 &announce_data,
2955 ) {
2956 Ok(p) => p,
2957 Err(e) => {
2958 log::warn!("Discovery: failed to pack packet: {}", e);
2959 return;
2960 }
2961 };
2962
2963 let outbound_actions = self.engine.handle_outbound(
2964 &packet,
2965 rns_core::constants::DESTINATION_SINGLE,
2966 None,
2967 now,
2968 );
2969 log::debug!(
2970 "Discovery announce sent for interface #{} ({} actions, dest={:02x?})",
2971 stamp_result.index,
2972 outbound_actions.len(),
2973 &disc_dest[..4],
2974 );
2975 self.dispatch_all(outbound_actions);
2976 }
2977
2978 fn tick_management_announces(&mut self, now: f64) {
2980 if self.transport_identity.is_none() {
2981 return;
2982 }
2983
2984 let uptime = now - self.started;
2985
2986 if !self.initial_announce_sent {
2988 if uptime < Self::MANAGEMENT_ANNOUNCE_DELAY {
2989 return;
2990 }
2991 self.initial_announce_sent = true;
2992 self.emit_management_announces(now);
2993 return;
2994 }
2995
2996 if now - self.last_management_announce >= Self::MANAGEMENT_ANNOUNCE_INTERVAL {
2998 self.emit_management_announces(now);
2999 }
3000 }
3001
3002 fn emit_management_announces(&mut self, now: f64) {
3004 use crate::management;
3005
3006 self.last_management_announce = now;
3007
3008 let identity = match self.transport_identity {
3009 Some(ref id) => id,
3010 None => return,
3011 };
3012
3013 let mgmt_raw = if self.management_config.enable_remote_management {
3015 management::build_management_announce(identity, &mut self.rng)
3016 } else {
3017 None
3018 };
3019
3020 let bh_raw = if self.management_config.publish_blackhole {
3021 management::build_blackhole_announce(identity, &mut self.rng)
3022 } else {
3023 None
3024 };
3025
3026 let probe_raw = if self.probe_responder_hash.is_some() {
3027 management::build_probe_announce(identity, &mut self.rng)
3028 } else {
3029 None
3030 };
3031
3032 if let Some(raw) = mgmt_raw {
3033 if let Ok(packet) = RawPacket::unpack(&raw) {
3034 let actions = self.engine.handle_outbound(
3035 &packet,
3036 rns_core::constants::DESTINATION_SINGLE,
3037 None,
3038 now,
3039 );
3040 self.dispatch_all(actions);
3041 log::debug!("Emitted management destination announce");
3042 }
3043 }
3044
3045 if let Some(raw) = bh_raw {
3046 if let Ok(packet) = RawPacket::unpack(&raw) {
3047 let actions = self.engine.handle_outbound(
3048 &packet,
3049 rns_core::constants::DESTINATION_SINGLE,
3050 None,
3051 now,
3052 );
3053 self.dispatch_all(actions);
3054 log::debug!("Emitted blackhole info announce");
3055 }
3056 }
3057
3058 if let Some(raw) = probe_raw {
3059 if let Ok(packet) = RawPacket::unpack(&raw) {
3060 let actions = self.engine.handle_outbound(
3061 &packet,
3062 rns_core::constants::DESTINATION_SINGLE,
3063 None,
3064 now,
3065 );
3066 self.dispatch_all(actions);
3067 log::debug!("Emitted probe responder announce");
3068 }
3069 }
3070 }
3071
3072 fn handle_management_request(
3074 &mut self,
3075 link_id: [u8; 16],
3076 path_hash: [u8; 16],
3077 data: Vec<u8>,
3078 request_id: [u8; 16],
3079 remote_identity: Option<([u8; 16], [u8; 64])>,
3080 ) {
3081 use crate::management;
3082
3083 let is_restricted = path_hash == management::status_path_hash()
3085 || path_hash == management::path_path_hash();
3086
3087 if is_restricted && !self.management_config.remote_management_allowed.is_empty() {
3088 match remote_identity {
3089 Some((identity_hash, _)) => {
3090 if !self
3091 .management_config
3092 .remote_management_allowed
3093 .contains(&identity_hash)
3094 {
3095 log::debug!("Management request denied: identity not in allowed list");
3096 return;
3097 }
3098 }
3099 None => {
3100 log::debug!("Management request denied: peer not identified");
3101 return;
3102 }
3103 }
3104 }
3105
3106 let response_data = if path_hash == management::status_path_hash() {
3107 {
3108 let views: Vec<&dyn management::InterfaceStatusView> = self
3109 .interfaces
3110 .values()
3111 .map(|e| e as &dyn management::InterfaceStatusView)
3112 .collect();
3113 management::handle_status_request(
3114 &data,
3115 &self.engine,
3116 &views,
3117 self.started,
3118 self.probe_responder_hash,
3119 )
3120 }
3121 } else if path_hash == management::path_path_hash() {
3122 management::handle_path_request(&data, &self.engine)
3123 } else if path_hash == management::list_path_hash() {
3124 management::handle_blackhole_list_request(&self.engine)
3125 } else {
3126 log::warn!("Unknown management path_hash: {:02x?}", &path_hash[..4]);
3127 None
3128 };
3129
3130 if let Some(response) = response_data {
3131 let actions = self.link_manager.send_management_response(
3132 &link_id,
3133 &request_id,
3134 &response,
3135 &mut self.rng,
3136 );
3137 self.dispatch_link_actions(actions);
3138 }
3139 }
3140}
3141
3142#[cfg(test)]
3143mod tests {
3144 use super::*;
3145 use crate::event;
3146 use crate::interface::Writer;
3147 use rns_core::announce::AnnounceData;
3148 use rns_core::constants;
3149 use rns_core::packet::PacketFlags;
3150 use rns_core::transport::types::InterfaceInfo;
3151 use rns_crypto::identity::Identity;
3152 use std::io;
3153 use std::sync::mpsc;
3154 use std::sync::{Arc, Mutex};
3155
3156 struct MockWriter {
3157 sent: Arc<Mutex<Vec<Vec<u8>>>>,
3158 }
3159
3160 impl MockWriter {
3161 fn new() -> (Self, Arc<Mutex<Vec<Vec<u8>>>>) {
3162 let sent = Arc::new(Mutex::new(Vec::new()));
3163 (MockWriter { sent: sent.clone() }, sent)
3164 }
3165 }
3166
3167 impl Writer for MockWriter {
3168 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
3169 self.sent.lock().unwrap().push(data.to_vec());
3170 Ok(())
3171 }
3172 }
3173
3174 use rns_core::types::{DestHash, IdentityHash, LinkId as TypedLinkId, PacketHash};
3175
3176 struct MockCallbacks {
3177 announces: Arc<Mutex<Vec<(DestHash, u8)>>>,
3178 paths: Arc<Mutex<Vec<(DestHash, u8)>>>,
3179 deliveries: Arc<Mutex<Vec<DestHash>>>,
3180 iface_ups: Arc<Mutex<Vec<InterfaceId>>>,
3181 iface_downs: Arc<Mutex<Vec<InterfaceId>>>,
3182 link_established: Arc<Mutex<Vec<(TypedLinkId, f64, bool)>>>,
3183 link_closed: Arc<Mutex<Vec<TypedLinkId>>>,
3184 remote_identified: Arc<Mutex<Vec<(TypedLinkId, IdentityHash)>>>,
3185 resources_received: Arc<Mutex<Vec<(TypedLinkId, Vec<u8>)>>>,
3186 resource_completed: Arc<Mutex<Vec<TypedLinkId>>>,
3187 resource_failed: Arc<Mutex<Vec<(TypedLinkId, String)>>>,
3188 channel_messages: Arc<Mutex<Vec<(TypedLinkId, u16, Vec<u8>)>>>,
3189 link_data: Arc<Mutex<Vec<(TypedLinkId, u8, Vec<u8>)>>>,
3190 responses: Arc<Mutex<Vec<(TypedLinkId, [u8; 16], Vec<u8>)>>>,
3191 proofs: Arc<Mutex<Vec<(DestHash, PacketHash, f64)>>>,
3192 proof_requested: Arc<Mutex<Vec<(DestHash, PacketHash)>>>,
3193 }
3194
3195 impl MockCallbacks {
3196 fn new() -> (
3197 Self,
3198 Arc<Mutex<Vec<(DestHash, u8)>>>,
3199 Arc<Mutex<Vec<(DestHash, u8)>>>,
3200 Arc<Mutex<Vec<DestHash>>>,
3201 Arc<Mutex<Vec<InterfaceId>>>,
3202 Arc<Mutex<Vec<InterfaceId>>>,
3203 ) {
3204 let announces = Arc::new(Mutex::new(Vec::new()));
3205 let paths = Arc::new(Mutex::new(Vec::new()));
3206 let deliveries = Arc::new(Mutex::new(Vec::new()));
3207 let iface_ups = Arc::new(Mutex::new(Vec::new()));
3208 let iface_downs = Arc::new(Mutex::new(Vec::new()));
3209 (
3210 MockCallbacks {
3211 announces: announces.clone(),
3212 paths: paths.clone(),
3213 deliveries: deliveries.clone(),
3214 iface_ups: iface_ups.clone(),
3215 iface_downs: iface_downs.clone(),
3216 link_established: Arc::new(Mutex::new(Vec::new())),
3217 link_closed: Arc::new(Mutex::new(Vec::new())),
3218 remote_identified: Arc::new(Mutex::new(Vec::new())),
3219 resources_received: Arc::new(Mutex::new(Vec::new())),
3220 resource_completed: Arc::new(Mutex::new(Vec::new())),
3221 resource_failed: Arc::new(Mutex::new(Vec::new())),
3222 channel_messages: Arc::new(Mutex::new(Vec::new())),
3223 link_data: Arc::new(Mutex::new(Vec::new())),
3224 responses: Arc::new(Mutex::new(Vec::new())),
3225 proofs: Arc::new(Mutex::new(Vec::new())),
3226 proof_requested: Arc::new(Mutex::new(Vec::new())),
3227 },
3228 announces,
3229 paths,
3230 deliveries,
3231 iface_ups,
3232 iface_downs,
3233 )
3234 }
3235
3236 fn with_link_tracking() -> (
3237 Self,
3238 Arc<Mutex<Vec<(TypedLinkId, f64, bool)>>>,
3239 Arc<Mutex<Vec<TypedLinkId>>>,
3240 Arc<Mutex<Vec<(TypedLinkId, IdentityHash)>>>,
3241 ) {
3242 let link_established = Arc::new(Mutex::new(Vec::new()));
3243 let link_closed = Arc::new(Mutex::new(Vec::new()));
3244 let remote_identified = Arc::new(Mutex::new(Vec::new()));
3245 (
3246 MockCallbacks {
3247 announces: Arc::new(Mutex::new(Vec::new())),
3248 paths: Arc::new(Mutex::new(Vec::new())),
3249 deliveries: Arc::new(Mutex::new(Vec::new())),
3250 iface_ups: Arc::new(Mutex::new(Vec::new())),
3251 iface_downs: Arc::new(Mutex::new(Vec::new())),
3252 link_established: link_established.clone(),
3253 link_closed: link_closed.clone(),
3254 remote_identified: remote_identified.clone(),
3255 resources_received: Arc::new(Mutex::new(Vec::new())),
3256 resource_completed: Arc::new(Mutex::new(Vec::new())),
3257 resource_failed: Arc::new(Mutex::new(Vec::new())),
3258 channel_messages: Arc::new(Mutex::new(Vec::new())),
3259 link_data: Arc::new(Mutex::new(Vec::new())),
3260 responses: Arc::new(Mutex::new(Vec::new())),
3261 proofs: Arc::new(Mutex::new(Vec::new())),
3262 proof_requested: Arc::new(Mutex::new(Vec::new())),
3263 },
3264 link_established,
3265 link_closed,
3266 remote_identified,
3267 )
3268 }
3269 }
3270
3271 impl Callbacks for MockCallbacks {
3272 fn on_announce(&mut self, announced: crate::destination::AnnouncedIdentity) {
3273 self.announces
3274 .lock()
3275 .unwrap()
3276 .push((announced.dest_hash, announced.hops));
3277 }
3278
3279 fn on_path_updated(&mut self, dest_hash: DestHash, hops: u8) {
3280 self.paths.lock().unwrap().push((dest_hash, hops));
3281 }
3282
3283 fn on_local_delivery(
3284 &mut self,
3285 dest_hash: DestHash,
3286 _raw: Vec<u8>,
3287 _packet_hash: PacketHash,
3288 ) {
3289 self.deliveries.lock().unwrap().push(dest_hash);
3290 }
3291
3292 fn on_interface_up(&mut self, id: InterfaceId) {
3293 self.iface_ups.lock().unwrap().push(id);
3294 }
3295
3296 fn on_interface_down(&mut self, id: InterfaceId) {
3297 self.iface_downs.lock().unwrap().push(id);
3298 }
3299
3300 fn on_link_established(
3301 &mut self,
3302 link_id: TypedLinkId,
3303 _dest_hash: DestHash,
3304 rtt: f64,
3305 is_initiator: bool,
3306 ) {
3307 self.link_established
3308 .lock()
3309 .unwrap()
3310 .push((link_id, rtt, is_initiator));
3311 }
3312
3313 fn on_link_closed(
3314 &mut self,
3315 link_id: TypedLinkId,
3316 _reason: Option<rns_core::link::TeardownReason>,
3317 ) {
3318 self.link_closed.lock().unwrap().push(link_id);
3319 }
3320
3321 fn on_remote_identified(
3322 &mut self,
3323 link_id: TypedLinkId,
3324 identity_hash: IdentityHash,
3325 _public_key: [u8; 64],
3326 ) {
3327 self.remote_identified
3328 .lock()
3329 .unwrap()
3330 .push((link_id, identity_hash));
3331 }
3332
3333 fn on_resource_received(
3334 &mut self,
3335 link_id: TypedLinkId,
3336 data: Vec<u8>,
3337 _metadata: Option<Vec<u8>>,
3338 ) {
3339 self.resources_received
3340 .lock()
3341 .unwrap()
3342 .push((link_id, data));
3343 }
3344
3345 fn on_resource_completed(&mut self, link_id: TypedLinkId) {
3346 self.resource_completed.lock().unwrap().push(link_id);
3347 }
3348
3349 fn on_resource_failed(&mut self, link_id: TypedLinkId, error: String) {
3350 self.resource_failed.lock().unwrap().push((link_id, error));
3351 }
3352
3353 fn on_channel_message(&mut self, link_id: TypedLinkId, msgtype: u16, payload: Vec<u8>) {
3354 self.channel_messages
3355 .lock()
3356 .unwrap()
3357 .push((link_id, msgtype, payload));
3358 }
3359
3360 fn on_link_data(&mut self, link_id: TypedLinkId, context: u8, data: Vec<u8>) {
3361 self.link_data
3362 .lock()
3363 .unwrap()
3364 .push((link_id, context, data));
3365 }
3366
3367 fn on_response(&mut self, link_id: TypedLinkId, request_id: [u8; 16], data: Vec<u8>) {
3368 self.responses
3369 .lock()
3370 .unwrap()
3371 .push((link_id, request_id, data));
3372 }
3373
3374 fn on_proof(&mut self, dest_hash: DestHash, packet_hash: PacketHash, rtt: f64) {
3375 self.proofs
3376 .lock()
3377 .unwrap()
3378 .push((dest_hash, packet_hash, rtt));
3379 }
3380
3381 fn on_proof_requested(&mut self, dest_hash: DestHash, packet_hash: PacketHash) -> bool {
3382 self.proof_requested
3383 .lock()
3384 .unwrap()
3385 .push((dest_hash, packet_hash));
3386 true
3387 }
3388 }
3389
3390 fn make_interface_info(id: u64) -> InterfaceInfo {
3391 InterfaceInfo {
3392 id: InterfaceId(id),
3393 name: format!("test-{}", id),
3394 mode: constants::MODE_FULL,
3395 out_capable: true,
3396 in_capable: true,
3397 bitrate: None,
3398 announce_rate_target: None,
3399 announce_rate_grace: 0,
3400 announce_rate_penalty: 0.0,
3401 announce_cap: rns_core::constants::ANNOUNCE_CAP,
3402 is_local_client: false,
3403 wants_tunnel: false,
3404 tunnel_id: None,
3405 mtu: constants::MTU as u32,
3406 ia_freq: 0.0,
3407 started: 0.0,
3408 ingress_control: false,
3409 }
3410 }
3411
3412 fn make_entry(id: u64, writer: Box<dyn Writer>, online: bool) -> InterfaceEntry {
3413 InterfaceEntry {
3414 id: InterfaceId(id),
3415 info: make_interface_info(id),
3416 writer,
3417 online,
3418 dynamic: false,
3419 ifac: None,
3420 stats: InterfaceStats::default(),
3421 interface_type: String::new(),
3422 }
3423 }
3424
3425 fn build_announce_packet(identity: &Identity) -> Vec<u8> {
3427 let dest_hash =
3428 rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));
3429 let name_hash = rns_core::destination::name_hash("test", &["app"]);
3430 let random_hash = [0x42u8; 10];
3431
3432 let (announce_data, _has_ratchet) =
3433 AnnounceData::pack(identity, &dest_hash, &name_hash, &random_hash, None, None).unwrap();
3434
3435 let flags = PacketFlags {
3436 header_type: constants::HEADER_1,
3437 context_flag: constants::FLAG_UNSET,
3438 transport_type: constants::TRANSPORT_BROADCAST,
3439 destination_type: constants::DESTINATION_SINGLE,
3440 packet_type: constants::PACKET_TYPE_ANNOUNCE,
3441 };
3442
3443 let packet = RawPacket::pack(
3444 flags,
3445 0,
3446 &dest_hash,
3447 None,
3448 constants::CONTEXT_NONE,
3449 &announce_data,
3450 )
3451 .unwrap();
3452 packet.raw
3453 }
3454
3455 #[test]
3456 fn process_inbound_frame() {
3457 let (tx, rx) = event::channel();
3458 let (cbs, announces, _, _, _, _) = MockCallbacks::new();
3459 let mut driver = Driver::new(
3460 TransportConfig {
3461 transport_enabled: false,
3462 identity_hash: None,
3463 prefer_shorter_path: false,
3464 max_paths_per_destination: 1,
3465 },
3466 rx,
3467 tx.clone(),
3468 Box::new(cbs),
3469 );
3470 let info = make_interface_info(1);
3471 driver.engine.register_interface(info.clone());
3472 let (writer, _sent) = MockWriter::new();
3473 driver
3474 .interfaces
3475 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3476
3477 let identity = Identity::new(&mut OsRng);
3478 let announce_raw = build_announce_packet(&identity);
3479
3480 tx.send(Event::Frame {
3482 interface_id: InterfaceId(1),
3483 data: announce_raw,
3484 })
3485 .unwrap();
3486 tx.send(Event::Shutdown).unwrap();
3487 driver.run();
3488
3489 assert_eq!(announces.lock().unwrap().len(), 1);
3490 }
3491
3492 #[test]
3493 fn dispatch_send() {
3494 let (tx, rx) = event::channel();
3495 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3496 let mut driver = Driver::new(
3497 TransportConfig {
3498 transport_enabled: false,
3499 identity_hash: None,
3500 prefer_shorter_path: false,
3501 max_paths_per_destination: 1,
3502 },
3503 rx,
3504 tx.clone(),
3505 Box::new(cbs),
3506 );
3507 let (writer, sent) = MockWriter::new();
3508 driver
3509 .interfaces
3510 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3511
3512 driver.dispatch_all(vec![TransportAction::SendOnInterface {
3513 interface: InterfaceId(1),
3514 raw: vec![0x01, 0x02, 0x03],
3515 }]);
3516
3517 assert_eq!(sent.lock().unwrap().len(), 1);
3518 assert_eq!(sent.lock().unwrap()[0], vec![0x01, 0x02, 0x03]);
3519
3520 drop(tx);
3521 }
3522
3523 #[test]
3524 fn dispatch_broadcast() {
3525 let (tx, rx) = event::channel();
3526 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3527 let mut driver = Driver::new(
3528 TransportConfig {
3529 transport_enabled: false,
3530 identity_hash: None,
3531 prefer_shorter_path: false,
3532 max_paths_per_destination: 1,
3533 },
3534 rx,
3535 tx.clone(),
3536 Box::new(cbs),
3537 );
3538
3539 let (w1, sent1) = MockWriter::new();
3540 let (w2, sent2) = MockWriter::new();
3541 driver
3542 .interfaces
3543 .insert(InterfaceId(1), make_entry(1, Box::new(w1), true));
3544 driver
3545 .interfaces
3546 .insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
3547
3548 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
3549 raw: vec![0xAA],
3550 exclude: None,
3551 }]);
3552
3553 assert_eq!(sent1.lock().unwrap().len(), 1);
3554 assert_eq!(sent2.lock().unwrap().len(), 1);
3555
3556 drop(tx);
3557 }
3558
3559 #[test]
3560 fn dispatch_broadcast_exclude() {
3561 let (tx, rx) = event::channel();
3562 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3563 let mut driver = Driver::new(
3564 TransportConfig {
3565 transport_enabled: false,
3566 identity_hash: None,
3567 prefer_shorter_path: false,
3568 max_paths_per_destination: 1,
3569 },
3570 rx,
3571 tx.clone(),
3572 Box::new(cbs),
3573 );
3574
3575 let (w1, sent1) = MockWriter::new();
3576 let (w2, sent2) = MockWriter::new();
3577 driver
3578 .interfaces
3579 .insert(InterfaceId(1), make_entry(1, Box::new(w1), true));
3580 driver
3581 .interfaces
3582 .insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
3583
3584 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
3585 raw: vec![0xBB],
3586 exclude: Some(InterfaceId(1)),
3587 }]);
3588
3589 assert_eq!(sent1.lock().unwrap().len(), 0); assert_eq!(sent2.lock().unwrap().len(), 1);
3591
3592 drop(tx);
3593 }
3594
3595 #[test]
3596 fn tick_event() {
3597 let (tx, rx) = event::channel();
3598 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3599 let mut driver = Driver::new(
3600 TransportConfig {
3601 transport_enabled: true,
3602 identity_hash: Some([0x42; 16]),
3603 prefer_shorter_path: false,
3604 max_paths_per_destination: 1,
3605 },
3606 rx,
3607 tx.clone(),
3608 Box::new(cbs),
3609 );
3610 let info = make_interface_info(1);
3611 driver.engine.register_interface(info.clone());
3612 let (writer, _sent) = MockWriter::new();
3613 driver
3614 .interfaces
3615 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3616
3617 tx.send(Event::Tick).unwrap();
3619 tx.send(Event::Shutdown).unwrap();
3620 driver.run();
3621 }
3623
3624 #[test]
3625 fn shutdown_event() {
3626 let (tx, rx) = event::channel();
3627 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3628 let mut driver = Driver::new(
3629 TransportConfig {
3630 transport_enabled: false,
3631 identity_hash: None,
3632 prefer_shorter_path: false,
3633 max_paths_per_destination: 1,
3634 },
3635 rx,
3636 tx.clone(),
3637 Box::new(cbs),
3638 );
3639
3640 tx.send(Event::Shutdown).unwrap();
3641 driver.run(); }
3643
3644 #[test]
3645 fn announce_callback() {
3646 let (tx, rx) = event::channel();
3647 let (cbs, announces, paths, _, _, _) = MockCallbacks::new();
3648 let mut driver = Driver::new(
3649 TransportConfig {
3650 transport_enabled: false,
3651 identity_hash: None,
3652 prefer_shorter_path: false,
3653 max_paths_per_destination: 1,
3654 },
3655 rx,
3656 tx.clone(),
3657 Box::new(cbs),
3658 );
3659 let info = make_interface_info(1);
3660 driver.engine.register_interface(info.clone());
3661 let (writer, _sent) = MockWriter::new();
3662 driver
3663 .interfaces
3664 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3665
3666 let identity = Identity::new(&mut OsRng);
3667 let announce_raw = build_announce_packet(&identity);
3668
3669 tx.send(Event::Frame {
3670 interface_id: InterfaceId(1),
3671 data: announce_raw,
3672 })
3673 .unwrap();
3674 tx.send(Event::Shutdown).unwrap();
3675 driver.run();
3676
3677 let ann = announces.lock().unwrap();
3678 assert_eq!(ann.len(), 1);
3679 assert_eq!(ann[0].1, 1);
3681
3682 let p = paths.lock().unwrap();
3683 assert_eq!(p.len(), 1);
3684 }
3685
3686 #[test]
3687 fn dispatch_skips_offline_interface() {
3688 let (tx, rx) = event::channel();
3689 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3690 let mut driver = Driver::new(
3691 TransportConfig {
3692 transport_enabled: false,
3693 identity_hash: None,
3694 prefer_shorter_path: false,
3695 max_paths_per_destination: 1,
3696 },
3697 rx,
3698 tx.clone(),
3699 Box::new(cbs),
3700 );
3701
3702 let (w1, sent1) = MockWriter::new();
3703 let (w2, sent2) = MockWriter::new();
3704 driver
3705 .interfaces
3706 .insert(InterfaceId(1), make_entry(1, Box::new(w1), false)); driver
3708 .interfaces
3709 .insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
3710
3711 driver.dispatch_all(vec![TransportAction::SendOnInterface {
3713 interface: InterfaceId(1),
3714 raw: vec![0x01],
3715 }]);
3716 assert_eq!(sent1.lock().unwrap().len(), 0);
3717
3718 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
3720 raw: vec![0x02],
3721 exclude: None,
3722 }]);
3723 assert_eq!(sent1.lock().unwrap().len(), 0); assert_eq!(sent2.lock().unwrap().len(), 1);
3725
3726 drop(tx);
3727 }
3728
3729 #[test]
3730 fn interface_up_refreshes_writer() {
3731 let (tx, rx) = event::channel();
3732 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3733 let mut driver = Driver::new(
3734 TransportConfig {
3735 transport_enabled: false,
3736 identity_hash: None,
3737 prefer_shorter_path: false,
3738 max_paths_per_destination: 1,
3739 },
3740 rx,
3741 tx.clone(),
3742 Box::new(cbs),
3743 );
3744
3745 let (w_old, sent_old) = MockWriter::new();
3746 driver
3747 .interfaces
3748 .insert(InterfaceId(1), make_entry(1, Box::new(w_old), false));
3749
3750 let (w_new, sent_new) = MockWriter::new();
3752 tx.send(Event::InterfaceUp(
3753 InterfaceId(1),
3754 Some(Box::new(w_new)),
3755 None,
3756 ))
3757 .unwrap();
3758 tx.send(Event::Shutdown).unwrap();
3759 driver.run();
3760
3761 assert!(driver.interfaces[&InterfaceId(1)].online);
3763
3764 driver.dispatch_all(vec![TransportAction::SendOnInterface {
3766 interface: InterfaceId(1),
3767 raw: vec![0xFF],
3768 }]);
3769
3770 assert_eq!(sent_old.lock().unwrap().len(), 0);
3772 assert_eq!(sent_new.lock().unwrap().len(), 1);
3774 assert_eq!(sent_new.lock().unwrap()[0], vec![0xFF]);
3775
3776 drop(tx);
3777 }
3778
3779 #[test]
3780 fn dynamic_interface_register() {
3781 let (tx, rx) = event::channel();
3782 let (cbs, _, _, _, iface_ups, _) = MockCallbacks::new();
3783 let mut driver = Driver::new(
3784 TransportConfig {
3785 transport_enabled: false,
3786 identity_hash: None,
3787 prefer_shorter_path: false,
3788 max_paths_per_destination: 1,
3789 },
3790 rx,
3791 tx.clone(),
3792 Box::new(cbs),
3793 );
3794
3795 let info = make_interface_info(100);
3796 let (writer, sent) = MockWriter::new();
3797
3798 tx.send(Event::InterfaceUp(
3800 InterfaceId(100),
3801 Some(Box::new(writer)),
3802 Some(info),
3803 ))
3804 .unwrap();
3805 tx.send(Event::Shutdown).unwrap();
3806 driver.run();
3807
3808 assert!(driver.interfaces.contains_key(&InterfaceId(100)));
3810 assert!(driver.interfaces[&InterfaceId(100)].online);
3811 assert!(driver.interfaces[&InterfaceId(100)].dynamic);
3812
3813 assert_eq!(iface_ups.lock().unwrap().len(), 1);
3815 assert_eq!(iface_ups.lock().unwrap()[0], InterfaceId(100));
3816
3817 driver.dispatch_all(vec![TransportAction::SendOnInterface {
3819 interface: InterfaceId(100),
3820 raw: vec![0x42],
3821 }]);
3822 assert_eq!(sent.lock().unwrap().len(), 1);
3823
3824 drop(tx);
3825 }
3826
3827 #[test]
3828 fn dynamic_interface_deregister() {
3829 let (tx, rx) = event::channel();
3830 let (cbs, _, _, _, _, iface_downs) = MockCallbacks::new();
3831 let mut driver = Driver::new(
3832 TransportConfig {
3833 transport_enabled: false,
3834 identity_hash: None,
3835 prefer_shorter_path: false,
3836 max_paths_per_destination: 1,
3837 },
3838 rx,
3839 tx.clone(),
3840 Box::new(cbs),
3841 );
3842
3843 let info = make_interface_info(200);
3845 driver.engine.register_interface(info.clone());
3846 let (writer, _sent) = MockWriter::new();
3847 driver.interfaces.insert(
3848 InterfaceId(200),
3849 InterfaceEntry {
3850 id: InterfaceId(200),
3851 info,
3852 writer: Box::new(writer),
3853 online: true,
3854 dynamic: true,
3855 ifac: None,
3856 stats: InterfaceStats::default(),
3857 interface_type: String::new(),
3858 },
3859 );
3860
3861 tx.send(Event::InterfaceDown(InterfaceId(200))).unwrap();
3863 tx.send(Event::Shutdown).unwrap();
3864 driver.run();
3865
3866 assert!(!driver.interfaces.contains_key(&InterfaceId(200)));
3867 assert_eq!(iface_downs.lock().unwrap().len(), 1);
3868 assert_eq!(iface_downs.lock().unwrap()[0], InterfaceId(200));
3869 }
3870
3871 #[test]
3872 fn interface_callbacks_fire() {
3873 let (tx, rx) = event::channel();
3874 let (cbs, _, _, _, iface_ups, iface_downs) = MockCallbacks::new();
3875 let mut driver = Driver::new(
3876 TransportConfig {
3877 transport_enabled: false,
3878 identity_hash: None,
3879 prefer_shorter_path: false,
3880 max_paths_per_destination: 1,
3881 },
3882 rx,
3883 tx.clone(),
3884 Box::new(cbs),
3885 );
3886
3887 let (writer, _) = MockWriter::new();
3889 driver
3890 .interfaces
3891 .insert(InterfaceId(1), make_entry(1, Box::new(writer), false));
3892
3893 tx.send(Event::InterfaceUp(InterfaceId(1), None, None))
3894 .unwrap();
3895 tx.send(Event::InterfaceDown(InterfaceId(1))).unwrap();
3896 tx.send(Event::Shutdown).unwrap();
3897 driver.run();
3898
3899 assert_eq!(iface_ups.lock().unwrap().len(), 1);
3900 assert_eq!(iface_downs.lock().unwrap().len(), 1);
3901 assert!(driver.interfaces.contains_key(&InterfaceId(1)));
3903 assert!(!driver.interfaces[&InterfaceId(1)].online);
3904 }
3905
3906 #[test]
3911 fn frame_updates_rx_stats() {
3912 let (tx, rx) = event::channel();
3913 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3914 let mut driver = Driver::new(
3915 TransportConfig {
3916 transport_enabled: false,
3917 identity_hash: None,
3918 prefer_shorter_path: false,
3919 max_paths_per_destination: 1,
3920 },
3921 rx,
3922 tx.clone(),
3923 Box::new(cbs),
3924 );
3925 let info = make_interface_info(1);
3926 driver.engine.register_interface(info.clone());
3927 let (writer, _sent) = MockWriter::new();
3928 driver
3929 .interfaces
3930 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3931
3932 let identity = Identity::new(&mut OsRng);
3933 let announce_raw = build_announce_packet(&identity);
3934 let announce_len = announce_raw.len() as u64;
3935
3936 tx.send(Event::Frame {
3937 interface_id: InterfaceId(1),
3938 data: announce_raw,
3939 })
3940 .unwrap();
3941 tx.send(Event::Shutdown).unwrap();
3942 driver.run();
3943
3944 let stats = &driver.interfaces[&InterfaceId(1)].stats;
3945 assert_eq!(stats.rxb, announce_len);
3946 assert_eq!(stats.rx_packets, 1);
3947 }
3948
3949 #[test]
3950 fn send_updates_tx_stats() {
3951 let (tx, rx) = event::channel();
3952 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3953 let mut driver = Driver::new(
3954 TransportConfig {
3955 transport_enabled: false,
3956 identity_hash: None,
3957 prefer_shorter_path: false,
3958 max_paths_per_destination: 1,
3959 },
3960 rx,
3961 tx.clone(),
3962 Box::new(cbs),
3963 );
3964 let (writer, _sent) = MockWriter::new();
3965 driver
3966 .interfaces
3967 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3968
3969 driver.dispatch_all(vec![TransportAction::SendOnInterface {
3970 interface: InterfaceId(1),
3971 raw: vec![0x01, 0x02, 0x03],
3972 }]);
3973
3974 let stats = &driver.interfaces[&InterfaceId(1)].stats;
3975 assert_eq!(stats.txb, 3);
3976 assert_eq!(stats.tx_packets, 1);
3977
3978 drop(tx);
3979 }
3980
3981 #[test]
3982 fn broadcast_updates_tx_stats() {
3983 let (tx, rx) = event::channel();
3984 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3985 let mut driver = Driver::new(
3986 TransportConfig {
3987 transport_enabled: false,
3988 identity_hash: None,
3989 prefer_shorter_path: false,
3990 max_paths_per_destination: 1,
3991 },
3992 rx,
3993 tx.clone(),
3994 Box::new(cbs),
3995 );
3996 let (w1, _s1) = MockWriter::new();
3997 let (w2, _s2) = MockWriter::new();
3998 driver
3999 .interfaces
4000 .insert(InterfaceId(1), make_entry(1, Box::new(w1), true));
4001 driver
4002 .interfaces
4003 .insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
4004
4005 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
4006 raw: vec![0xAA, 0xBB],
4007 exclude: None,
4008 }]);
4009
4010 assert_eq!(driver.interfaces[&InterfaceId(1)].stats.txb, 2);
4012 assert_eq!(driver.interfaces[&InterfaceId(1)].stats.tx_packets, 1);
4013 assert_eq!(driver.interfaces[&InterfaceId(2)].stats.txb, 2);
4014 assert_eq!(driver.interfaces[&InterfaceId(2)].stats.tx_packets, 1);
4015
4016 drop(tx);
4017 }
4018
4019 #[test]
4020 fn query_interface_stats() {
4021 let (tx, rx) = event::channel();
4022 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4023 let mut driver = Driver::new(
4024 TransportConfig {
4025 transport_enabled: true,
4026 identity_hash: Some([0x42; 16]),
4027 prefer_shorter_path: false,
4028 max_paths_per_destination: 1,
4029 },
4030 rx,
4031 tx.clone(),
4032 Box::new(cbs),
4033 );
4034 let (writer, _sent) = MockWriter::new();
4035 driver
4036 .interfaces
4037 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4038
4039 let (resp_tx, resp_rx) = mpsc::channel();
4040 tx.send(Event::Query(QueryRequest::InterfaceStats, resp_tx))
4041 .unwrap();
4042 tx.send(Event::Shutdown).unwrap();
4043 driver.run();
4044
4045 let resp = resp_rx.recv().unwrap();
4046 match resp {
4047 QueryResponse::InterfaceStats(stats) => {
4048 assert_eq!(stats.interfaces.len(), 1);
4049 assert_eq!(stats.interfaces[0].name, "test-1");
4050 assert!(stats.interfaces[0].status);
4051 assert_eq!(stats.transport_id, Some([0x42; 16]));
4052 assert!(stats.transport_enabled);
4053 }
4054 _ => panic!("unexpected response"),
4055 }
4056 }
4057
4058 #[test]
4059 fn query_path_table() {
4060 let (tx, rx) = event::channel();
4061 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4062 let mut driver = Driver::new(
4063 TransportConfig {
4064 transport_enabled: false,
4065 identity_hash: None,
4066 prefer_shorter_path: false,
4067 max_paths_per_destination: 1,
4068 },
4069 rx,
4070 tx.clone(),
4071 Box::new(cbs),
4072 );
4073 let info = make_interface_info(1);
4074 driver.engine.register_interface(info);
4075 let (writer, _sent) = MockWriter::new();
4076 driver
4077 .interfaces
4078 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4079
4080 let identity = Identity::new(&mut OsRng);
4082 let announce_raw = build_announce_packet(&identity);
4083 tx.send(Event::Frame {
4084 interface_id: InterfaceId(1),
4085 data: announce_raw,
4086 })
4087 .unwrap();
4088
4089 let (resp_tx, resp_rx) = mpsc::channel();
4090 tx.send(Event::Query(
4091 QueryRequest::PathTable { max_hops: None },
4092 resp_tx,
4093 ))
4094 .unwrap();
4095 tx.send(Event::Shutdown).unwrap();
4096 driver.run();
4097
4098 let resp = resp_rx.recv().unwrap();
4099 match resp {
4100 QueryResponse::PathTable(entries) => {
4101 assert_eq!(entries.len(), 1);
4102 assert_eq!(entries[0].hops, 1);
4103 }
4104 _ => panic!("unexpected response"),
4105 }
4106 }
4107
4108 #[test]
4109 fn query_drop_path() {
4110 let (tx, rx) = event::channel();
4111 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4112 let mut driver = Driver::new(
4113 TransportConfig {
4114 transport_enabled: false,
4115 identity_hash: None,
4116 prefer_shorter_path: false,
4117 max_paths_per_destination: 1,
4118 },
4119 rx,
4120 tx.clone(),
4121 Box::new(cbs),
4122 );
4123 let info = make_interface_info(1);
4124 driver.engine.register_interface(info);
4125 let (writer, _sent) = MockWriter::new();
4126 driver
4127 .interfaces
4128 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4129
4130 let identity = Identity::new(&mut OsRng);
4132 let announce_raw = build_announce_packet(&identity);
4133 let dest_hash =
4134 rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));
4135
4136 tx.send(Event::Frame {
4137 interface_id: InterfaceId(1),
4138 data: announce_raw,
4139 })
4140 .unwrap();
4141
4142 let (resp_tx, resp_rx) = mpsc::channel();
4143 tx.send(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx))
4144 .unwrap();
4145 tx.send(Event::Shutdown).unwrap();
4146 driver.run();
4147
4148 let resp = resp_rx.recv().unwrap();
4149 match resp {
4150 QueryResponse::DropPath(dropped) => {
4151 assert!(dropped);
4152 }
4153 _ => panic!("unexpected response"),
4154 }
4155 }
4156
4157 #[test]
4158 fn send_outbound_event() {
4159 let (tx, rx) = event::channel();
4160 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4161 let mut driver = Driver::new(
4162 TransportConfig {
4163 transport_enabled: false,
4164 identity_hash: None,
4165 prefer_shorter_path: false,
4166 max_paths_per_destination: 1,
4167 },
4168 rx,
4169 tx.clone(),
4170 Box::new(cbs),
4171 );
4172 let (writer, sent) = MockWriter::new();
4173 let info = make_interface_info(1);
4174 driver.engine.register_interface(info);
4175 driver
4176 .interfaces
4177 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4178
4179 let dest = [0xAA; 16];
4181 let flags = PacketFlags {
4182 header_type: constants::HEADER_1,
4183 context_flag: constants::FLAG_UNSET,
4184 transport_type: constants::TRANSPORT_BROADCAST,
4185 destination_type: constants::DESTINATION_PLAIN,
4186 packet_type: constants::PACKET_TYPE_DATA,
4187 };
4188 let packet =
4189 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
4190
4191 tx.send(Event::SendOutbound {
4192 raw: packet.raw,
4193 dest_type: constants::DESTINATION_PLAIN,
4194 attached_interface: None,
4195 })
4196 .unwrap();
4197 tx.send(Event::Shutdown).unwrap();
4198 driver.run();
4199
4200 assert_eq!(sent.lock().unwrap().len(), 1);
4202 }
4203
4204 #[test]
4205 fn register_destination_and_deliver() {
4206 let (tx, rx) = event::channel();
4207 let (cbs, _, _, deliveries, _, _) = MockCallbacks::new();
4208 let mut driver = Driver::new(
4209 TransportConfig {
4210 transport_enabled: false,
4211 identity_hash: None,
4212 prefer_shorter_path: false,
4213 max_paths_per_destination: 1,
4214 },
4215 rx,
4216 tx.clone(),
4217 Box::new(cbs),
4218 );
4219 let info = make_interface_info(1);
4220 driver.engine.register_interface(info);
4221 let (writer, _sent) = MockWriter::new();
4222 driver
4223 .interfaces
4224 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4225
4226 let dest = [0xBB; 16];
4227
4228 tx.send(Event::RegisterDestination {
4230 dest_hash: dest,
4231 dest_type: constants::DESTINATION_SINGLE,
4232 })
4233 .unwrap();
4234
4235 let flags = PacketFlags {
4236 header_type: constants::HEADER_1,
4237 context_flag: constants::FLAG_UNSET,
4238 transport_type: constants::TRANSPORT_BROADCAST,
4239 destination_type: constants::DESTINATION_SINGLE,
4240 packet_type: constants::PACKET_TYPE_DATA,
4241 };
4242 let packet =
4243 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"data").unwrap();
4244 tx.send(Event::Frame {
4245 interface_id: InterfaceId(1),
4246 data: packet.raw,
4247 })
4248 .unwrap();
4249 tx.send(Event::Shutdown).unwrap();
4250 driver.run();
4251
4252 assert_eq!(deliveries.lock().unwrap().len(), 1);
4253 assert_eq!(deliveries.lock().unwrap()[0], DestHash(dest));
4254 }
4255
4256 #[test]
4257 fn query_transport_identity() {
4258 let (tx, rx) = event::channel();
4259 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4260 let mut driver = Driver::new(
4261 TransportConfig {
4262 transport_enabled: true,
4263 identity_hash: Some([0xAA; 16]),
4264 prefer_shorter_path: false,
4265 max_paths_per_destination: 1,
4266 },
4267 rx,
4268 tx.clone(),
4269 Box::new(cbs),
4270 );
4271
4272 let (resp_tx, resp_rx) = mpsc::channel();
4273 tx.send(Event::Query(QueryRequest::TransportIdentity, resp_tx))
4274 .unwrap();
4275 tx.send(Event::Shutdown).unwrap();
4276 driver.run();
4277
4278 match resp_rx.recv().unwrap() {
4279 QueryResponse::TransportIdentity(Some(hash)) => {
4280 assert_eq!(hash, [0xAA; 16]);
4281 }
4282 _ => panic!("unexpected response"),
4283 }
4284 }
4285
4286 #[test]
4287 fn query_link_count() {
4288 let (tx, rx) = event::channel();
4289 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4290 let mut driver = Driver::new(
4291 TransportConfig {
4292 transport_enabled: false,
4293 identity_hash: None,
4294 prefer_shorter_path: false,
4295 max_paths_per_destination: 1,
4296 },
4297 rx,
4298 tx.clone(),
4299 Box::new(cbs),
4300 );
4301
4302 let (resp_tx, resp_rx) = mpsc::channel();
4303 tx.send(Event::Query(QueryRequest::LinkCount, resp_tx))
4304 .unwrap();
4305 tx.send(Event::Shutdown).unwrap();
4306 driver.run();
4307
4308 match resp_rx.recv().unwrap() {
4309 QueryResponse::LinkCount(count) => assert_eq!(count, 0),
4310 _ => panic!("unexpected response"),
4311 }
4312 }
4313
4314 #[test]
4315 fn query_rate_table() {
4316 let (tx, rx) = event::channel();
4317 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4318 let mut driver = Driver::new(
4319 TransportConfig {
4320 transport_enabled: false,
4321 identity_hash: None,
4322 prefer_shorter_path: false,
4323 max_paths_per_destination: 1,
4324 },
4325 rx,
4326 tx.clone(),
4327 Box::new(cbs),
4328 );
4329
4330 let (resp_tx, resp_rx) = mpsc::channel();
4331 tx.send(Event::Query(QueryRequest::RateTable, resp_tx))
4332 .unwrap();
4333 tx.send(Event::Shutdown).unwrap();
4334 driver.run();
4335
4336 match resp_rx.recv().unwrap() {
4337 QueryResponse::RateTable(entries) => assert!(entries.is_empty()),
4338 _ => panic!("unexpected response"),
4339 }
4340 }
4341
4342 #[test]
4343 fn query_next_hop() {
4344 let (tx, rx) = event::channel();
4345 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4346 let mut driver = Driver::new(
4347 TransportConfig {
4348 transport_enabled: false,
4349 identity_hash: None,
4350 prefer_shorter_path: false,
4351 max_paths_per_destination: 1,
4352 },
4353 rx,
4354 tx.clone(),
4355 Box::new(cbs),
4356 );
4357
4358 let dest = [0xBB; 16];
4359 let (resp_tx, resp_rx) = mpsc::channel();
4360 tx.send(Event::Query(
4361 QueryRequest::NextHop { dest_hash: dest },
4362 resp_tx,
4363 ))
4364 .unwrap();
4365 tx.send(Event::Shutdown).unwrap();
4366 driver.run();
4367
4368 match resp_rx.recv().unwrap() {
4369 QueryResponse::NextHop(None) => {}
4370 _ => panic!("unexpected response"),
4371 }
4372 }
4373
4374 #[test]
4375 fn query_next_hop_if_name() {
4376 let (tx, rx) = event::channel();
4377 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4378 let mut driver = Driver::new(
4379 TransportConfig {
4380 transport_enabled: false,
4381 identity_hash: None,
4382 prefer_shorter_path: false,
4383 max_paths_per_destination: 1,
4384 },
4385 rx,
4386 tx.clone(),
4387 Box::new(cbs),
4388 );
4389
4390 let dest = [0xCC; 16];
4391 let (resp_tx, resp_rx) = mpsc::channel();
4392 tx.send(Event::Query(
4393 QueryRequest::NextHopIfName { dest_hash: dest },
4394 resp_tx,
4395 ))
4396 .unwrap();
4397 tx.send(Event::Shutdown).unwrap();
4398 driver.run();
4399
4400 match resp_rx.recv().unwrap() {
4401 QueryResponse::NextHopIfName(None) => {}
4402 _ => panic!("unexpected response"),
4403 }
4404 }
4405
4406 #[test]
4407 fn query_drop_all_via() {
4408 let (tx, rx) = event::channel();
4409 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4410 let mut driver = Driver::new(
4411 TransportConfig {
4412 transport_enabled: false,
4413 identity_hash: None,
4414 prefer_shorter_path: false,
4415 max_paths_per_destination: 1,
4416 },
4417 rx,
4418 tx.clone(),
4419 Box::new(cbs),
4420 );
4421
4422 let transport = [0xDD; 16];
4423 let (resp_tx, resp_rx) = mpsc::channel();
4424 tx.send(Event::Query(
4425 QueryRequest::DropAllVia {
4426 transport_hash: transport,
4427 },
4428 resp_tx,
4429 ))
4430 .unwrap();
4431 tx.send(Event::Shutdown).unwrap();
4432 driver.run();
4433
4434 match resp_rx.recv().unwrap() {
4435 QueryResponse::DropAllVia(count) => assert_eq!(count, 0),
4436 _ => panic!("unexpected response"),
4437 }
4438 }
4439
4440 #[test]
4441 fn query_drop_announce_queues() {
4442 let (tx, rx) = event::channel();
4443 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4444 let mut driver = Driver::new(
4445 TransportConfig {
4446 transport_enabled: false,
4447 identity_hash: None,
4448 prefer_shorter_path: false,
4449 max_paths_per_destination: 1,
4450 },
4451 rx,
4452 tx.clone(),
4453 Box::new(cbs),
4454 );
4455
4456 let (resp_tx, resp_rx) = mpsc::channel();
4457 tx.send(Event::Query(QueryRequest::DropAnnounceQueues, resp_tx))
4458 .unwrap();
4459 tx.send(Event::Shutdown).unwrap();
4460 driver.run();
4461
4462 match resp_rx.recv().unwrap() {
4463 QueryResponse::DropAnnounceQueues => {}
4464 _ => panic!("unexpected response"),
4465 }
4466 }
4467
4468 #[test]
4473 fn register_link_dest_event() {
4474 let (tx, rx) = event::channel();
4475 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4476 let mut driver = Driver::new(
4477 TransportConfig {
4478 transport_enabled: false,
4479 identity_hash: None,
4480 prefer_shorter_path: false,
4481 max_paths_per_destination: 1,
4482 },
4483 rx,
4484 tx.clone(),
4485 Box::new(cbs),
4486 );
4487 let info = make_interface_info(1);
4488 driver.engine.register_interface(info);
4489 let (writer, _sent) = MockWriter::new();
4490 driver
4491 .interfaces
4492 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4493
4494 let mut rng = OsRng;
4495 let sig_prv = rns_crypto::ed25519::Ed25519PrivateKey::generate(&mut rng);
4496 let sig_pub_bytes = sig_prv.public_key().public_bytes();
4497 let sig_prv_bytes = sig_prv.private_bytes();
4498 let dest_hash = [0xDD; 16];
4499
4500 tx.send(Event::RegisterLinkDestination {
4501 dest_hash,
4502 sig_prv_bytes,
4503 sig_pub_bytes,
4504 resource_strategy: 0,
4505 })
4506 .unwrap();
4507 tx.send(Event::Shutdown).unwrap();
4508 driver.run();
4509
4510 assert!(driver.link_manager.is_link_destination(&dest_hash));
4512 }
4513
4514 #[test]
4515 fn create_link_event() {
4516 let (tx, rx) = event::channel();
4517 let (cbs, _link_established, _, _) = MockCallbacks::with_link_tracking();
4518 let mut driver = Driver::new(
4519 TransportConfig {
4520 transport_enabled: false,
4521 identity_hash: None,
4522 prefer_shorter_path: false,
4523 max_paths_per_destination: 1,
4524 },
4525 rx,
4526 tx.clone(),
4527 Box::new(cbs),
4528 );
4529 let info = make_interface_info(1);
4530 driver.engine.register_interface(info);
4531 let (writer, _sent) = MockWriter::new();
4532 driver
4533 .interfaces
4534 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4535
4536 let dest_hash = [0xDD; 16];
4537 let dummy_sig_pub = [0xAA; 32];
4538
4539 let (resp_tx, resp_rx) = mpsc::channel();
4540 tx.send(Event::CreateLink {
4541 dest_hash,
4542 dest_sig_pub_bytes: dummy_sig_pub,
4543 response_tx: resp_tx,
4544 })
4545 .unwrap();
4546 tx.send(Event::Shutdown).unwrap();
4547 driver.run();
4548
4549 let link_id = resp_rx.recv().unwrap();
4551 assert_ne!(link_id, [0u8; 16]);
4552
4553 assert_eq!(driver.link_manager.link_count(), 1);
4555
4556 }
4561
4562 #[test]
4563 fn deliver_local_routes_to_link_manager() {
4564 let (tx, rx) = event::channel();
4567 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4568 let mut driver = Driver::new(
4569 TransportConfig {
4570 transport_enabled: false,
4571 identity_hash: None,
4572 prefer_shorter_path: false,
4573 max_paths_per_destination: 1,
4574 },
4575 rx,
4576 tx.clone(),
4577 Box::new(cbs),
4578 );
4579 let info = make_interface_info(1);
4580 driver.engine.register_interface(info);
4581 let (writer, _sent) = MockWriter::new();
4582 driver
4583 .interfaces
4584 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4585
4586 let mut rng = OsRng;
4588 let sig_prv = rns_crypto::ed25519::Ed25519PrivateKey::generate(&mut rng);
4589 let sig_pub_bytes = sig_prv.public_key().public_bytes();
4590 let dest_hash = [0xEE; 16];
4591 driver.link_manager.register_link_destination(
4592 dest_hash,
4593 sig_prv,
4594 sig_pub_bytes,
4595 crate::link_manager::ResourceStrategy::AcceptNone,
4596 );
4597
4598 assert!(driver.link_manager.is_link_destination(&dest_hash));
4602
4603 assert!(!driver.link_manager.is_link_destination(&[0xFF; 16]));
4605
4606 drop(tx);
4607 }
4608
4609 #[test]
4610 fn teardown_link_event() {
4611 let (tx, rx) = event::channel();
4612 let (cbs, _, link_closed, _) = MockCallbacks::with_link_tracking();
4613 let mut driver = Driver::new(
4614 TransportConfig {
4615 transport_enabled: false,
4616 identity_hash: None,
4617 prefer_shorter_path: false,
4618 max_paths_per_destination: 1,
4619 },
4620 rx,
4621 tx.clone(),
4622 Box::new(cbs),
4623 );
4624 let info = make_interface_info(1);
4625 driver.engine.register_interface(info);
4626 let (writer, _sent) = MockWriter::new();
4627 driver
4628 .interfaces
4629 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4630
4631 let (resp_tx, resp_rx) = mpsc::channel();
4633 tx.send(Event::CreateLink {
4634 dest_hash: [0xDD; 16],
4635 dest_sig_pub_bytes: [0xAA; 32],
4636 response_tx: resp_tx,
4637 })
4638 .unwrap();
4639 tx.send(Event::Shutdown).unwrap();
4644 driver.run();
4645
4646 let link_id = resp_rx.recv().unwrap();
4647 assert_ne!(link_id, [0u8; 16]);
4648 assert_eq!(driver.link_manager.link_count(), 1);
4649
4650 let teardown_actions = driver.link_manager.teardown_link(&link_id);
4652 driver.dispatch_link_actions(teardown_actions);
4653
4654 assert_eq!(link_closed.lock().unwrap().len(), 1);
4656 assert_eq!(link_closed.lock().unwrap()[0], TypedLinkId(link_id));
4657 }
4658
4659 #[test]
4660 fn link_count_includes_link_manager() {
4661 let (tx, rx) = event::channel();
4662 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4663 let mut driver = Driver::new(
4664 TransportConfig {
4665 transport_enabled: false,
4666 identity_hash: None,
4667 prefer_shorter_path: false,
4668 max_paths_per_destination: 1,
4669 },
4670 rx,
4671 tx.clone(),
4672 Box::new(cbs),
4673 );
4674 let info = make_interface_info(1);
4675 driver.engine.register_interface(info);
4676 let (writer, _sent) = MockWriter::new();
4677 driver
4678 .interfaces
4679 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4680
4681 let mut rng = OsRng;
4683 let dummy_sig = [0xAA; 32];
4684 driver.link_manager.create_link(
4685 &[0xDD; 16],
4686 &dummy_sig,
4687 1,
4688 constants::MTU as u32,
4689 &mut rng,
4690 );
4691
4692 let (resp_tx, resp_rx) = mpsc::channel();
4694 tx.send(Event::Query(QueryRequest::LinkCount, resp_tx))
4695 .unwrap();
4696 tx.send(Event::Shutdown).unwrap();
4697 driver.run();
4698
4699 match resp_rx.recv().unwrap() {
4700 QueryResponse::LinkCount(count) => assert_eq!(count, 1),
4701 _ => panic!("unexpected response"),
4702 }
4703 }
4704
4705 #[test]
4706 fn register_request_handler_event() {
4707 let (tx, rx) = event::channel();
4708 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4709 let mut driver = Driver::new(
4710 TransportConfig {
4711 transport_enabled: false,
4712 identity_hash: None,
4713 prefer_shorter_path: false,
4714 max_paths_per_destination: 1,
4715 },
4716 rx,
4717 tx.clone(),
4718 Box::new(cbs),
4719 );
4720
4721 tx.send(Event::RegisterRequestHandler {
4722 path: "/status".to_string(),
4723 allowed_list: None,
4724 handler: Box::new(|_link_id, _path, _data, _remote| Some(b"OK".to_vec())),
4725 })
4726 .unwrap();
4727 tx.send(Event::Shutdown).unwrap();
4728 driver.run();
4729
4730 }
4733
4734 #[test]
4737 fn management_announces_emitted_after_delay() {
4738 let (tx, rx) = event::channel();
4739 let (cbs, announces, _, _, _, _) = MockCallbacks::new();
4740 let identity = Identity::new(&mut OsRng);
4741 let identity_hash = *identity.hash();
4742 let mut driver = Driver::new(
4743 TransportConfig {
4744 transport_enabled: true,
4745 identity_hash: Some(identity_hash),
4746 prefer_shorter_path: false,
4747 max_paths_per_destination: 1,
4748 },
4749 rx,
4750 tx.clone(),
4751 Box::new(cbs),
4752 );
4753
4754 let info = make_interface_info(1);
4756 driver.engine.register_interface(info.clone());
4757 let (writer, sent) = MockWriter::new();
4758 driver
4759 .interfaces
4760 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4761
4762 driver.management_config.enable_remote_management = true;
4764 driver.transport_identity = Some(identity);
4765
4766 driver.started = time::now() - 10.0;
4768
4769 tx.send(Event::Tick).unwrap();
4771 tx.send(Event::Shutdown).unwrap();
4772 driver.run();
4773
4774 let sent_packets = sent.lock().unwrap();
4776 assert!(
4777 !sent_packets.is_empty(),
4778 "Management announce should be sent after startup delay"
4779 );
4780 }
4781
4782 #[test]
4783 fn management_announces_not_emitted_when_disabled() {
4784 let (tx, rx) = event::channel();
4785 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4786 let identity = Identity::new(&mut OsRng);
4787 let identity_hash = *identity.hash();
4788 let mut driver = Driver::new(
4789 TransportConfig {
4790 transport_enabled: true,
4791 identity_hash: Some(identity_hash),
4792 prefer_shorter_path: false,
4793 max_paths_per_destination: 1,
4794 },
4795 rx,
4796 tx.clone(),
4797 Box::new(cbs),
4798 );
4799
4800 let info = make_interface_info(1);
4801 driver.engine.register_interface(info.clone());
4802 let (writer, sent) = MockWriter::new();
4803 driver
4804 .interfaces
4805 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4806
4807 driver.transport_identity = Some(identity);
4809 driver.started = time::now() - 10.0;
4810
4811 tx.send(Event::Tick).unwrap();
4812 tx.send(Event::Shutdown).unwrap();
4813 driver.run();
4814
4815 let sent_packets = sent.lock().unwrap();
4817 assert!(
4818 sent_packets.is_empty(),
4819 "No announces should be sent when management is disabled"
4820 );
4821 }
4822
4823 #[test]
4824 fn management_announces_not_emitted_before_delay() {
4825 let (tx, rx) = event::channel();
4826 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4827 let identity = Identity::new(&mut OsRng);
4828 let identity_hash = *identity.hash();
4829 let mut driver = Driver::new(
4830 TransportConfig {
4831 transport_enabled: true,
4832 identity_hash: Some(identity_hash),
4833 prefer_shorter_path: false,
4834 max_paths_per_destination: 1,
4835 },
4836 rx,
4837 tx.clone(),
4838 Box::new(cbs),
4839 );
4840
4841 let info = make_interface_info(1);
4842 driver.engine.register_interface(info.clone());
4843 let (writer, sent) = MockWriter::new();
4844 driver
4845 .interfaces
4846 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4847
4848 driver.management_config.enable_remote_management = true;
4849 driver.transport_identity = Some(identity);
4850 driver.started = time::now();
4852
4853 tx.send(Event::Tick).unwrap();
4854 tx.send(Event::Shutdown).unwrap();
4855 driver.run();
4856
4857 let sent_packets = sent.lock().unwrap();
4858 assert!(sent_packets.is_empty(), "No announces before startup delay");
4859 }
4860
4861 #[test]
4866 fn announce_received_populates_known_destinations() {
4867 let (tx, rx) = event::channel();
4868 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4869 let mut driver = Driver::new(
4870 TransportConfig {
4871 transport_enabled: false,
4872 identity_hash: None,
4873 prefer_shorter_path: false,
4874 max_paths_per_destination: 1,
4875 },
4876 rx,
4877 tx.clone(),
4878 Box::new(cbs),
4879 );
4880 let info = make_interface_info(1);
4881 driver.engine.register_interface(info);
4882 let (writer, _sent) = MockWriter::new();
4883 driver
4884 .interfaces
4885 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4886
4887 let identity = Identity::new(&mut OsRng);
4888 let announce_raw = build_announce_packet(&identity);
4889
4890 let dest_hash =
4891 rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));
4892
4893 tx.send(Event::Frame {
4894 interface_id: InterfaceId(1),
4895 data: announce_raw,
4896 })
4897 .unwrap();
4898 tx.send(Event::Shutdown).unwrap();
4899 driver.run();
4900
4901 assert!(driver.known_destinations.contains_key(&dest_hash));
4903 let recalled = &driver.known_destinations[&dest_hash];
4904 assert_eq!(recalled.dest_hash.0, dest_hash);
4905 assert_eq!(recalled.identity_hash.0, *identity.hash());
4906 assert_eq!(&recalled.public_key, &identity.get_public_key().unwrap());
4907 assert_eq!(recalled.hops, 1);
4908 }
4909
4910 #[test]
4911 fn query_has_path() {
4912 let (tx, rx) = event::channel();
4913 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4914 let mut driver = Driver::new(
4915 TransportConfig {
4916 transport_enabled: false,
4917 identity_hash: None,
4918 prefer_shorter_path: false,
4919 max_paths_per_destination: 1,
4920 },
4921 rx,
4922 tx.clone(),
4923 Box::new(cbs),
4924 );
4925 let info = make_interface_info(1);
4926 driver.engine.register_interface(info);
4927 let (writer, _sent) = MockWriter::new();
4928 driver
4929 .interfaces
4930 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4931
4932 let (resp_tx, resp_rx) = mpsc::channel();
4934 tx.send(Event::Query(
4935 QueryRequest::HasPath {
4936 dest_hash: [0xAA; 16],
4937 },
4938 resp_tx,
4939 ))
4940 .unwrap();
4941
4942 let identity = Identity::new(&mut OsRng);
4944 let announce_raw = build_announce_packet(&identity);
4945 let dest_hash =
4946 rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));
4947 tx.send(Event::Frame {
4948 interface_id: InterfaceId(1),
4949 data: announce_raw,
4950 })
4951 .unwrap();
4952
4953 let (resp_tx2, resp_rx2) = mpsc::channel();
4954 tx.send(Event::Query(QueryRequest::HasPath { dest_hash }, resp_tx2))
4955 .unwrap();
4956
4957 tx.send(Event::Shutdown).unwrap();
4958 driver.run();
4959
4960 match resp_rx.recv().unwrap() {
4962 QueryResponse::HasPath(false) => {}
4963 other => panic!("expected HasPath(false), got {:?}", other),
4964 }
4965
4966 match resp_rx2.recv().unwrap() {
4968 QueryResponse::HasPath(true) => {}
4969 other => panic!("expected HasPath(true), got {:?}", other),
4970 }
4971 }
4972
4973 #[test]
4974 fn query_hops_to() {
4975 let (tx, rx) = event::channel();
4976 let (cbs, _, _, _, _, _) = MockCallbacks::new();
4977 let mut driver = Driver::new(
4978 TransportConfig {
4979 transport_enabled: false,
4980 identity_hash: None,
4981 prefer_shorter_path: false,
4982 max_paths_per_destination: 1,
4983 },
4984 rx,
4985 tx.clone(),
4986 Box::new(cbs),
4987 );
4988 let info = make_interface_info(1);
4989 driver.engine.register_interface(info);
4990 let (writer, _sent) = MockWriter::new();
4991 driver
4992 .interfaces
4993 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
4994
4995 let identity = Identity::new(&mut OsRng);
4997 let announce_raw = build_announce_packet(&identity);
4998 let dest_hash =
4999 rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));
5000
5001 tx.send(Event::Frame {
5002 interface_id: InterfaceId(1),
5003 data: announce_raw,
5004 })
5005 .unwrap();
5006
5007 let (resp_tx, resp_rx) = mpsc::channel();
5008 tx.send(Event::Query(QueryRequest::HopsTo { dest_hash }, resp_tx))
5009 .unwrap();
5010 tx.send(Event::Shutdown).unwrap();
5011 driver.run();
5012
5013 match resp_rx.recv().unwrap() {
5014 QueryResponse::HopsTo(Some(1)) => {}
5015 other => panic!("expected HopsTo(Some(1)), got {:?}", other),
5016 }
5017 }
5018
5019 #[test]
5020 fn query_recall_identity() {
5021 let (tx, rx) = event::channel();
5022 let (cbs, _, _, _, _, _) = MockCallbacks::new();
5023 let mut driver = Driver::new(
5024 TransportConfig {
5025 transport_enabled: false,
5026 identity_hash: None,
5027 prefer_shorter_path: false,
5028 max_paths_per_destination: 1,
5029 },
5030 rx,
5031 tx.clone(),
5032 Box::new(cbs),
5033 );
5034 let info = make_interface_info(1);
5035 driver.engine.register_interface(info);
5036 let (writer, _sent) = MockWriter::new();
5037 driver
5038 .interfaces
5039 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5040
5041 let identity = Identity::new(&mut OsRng);
5042 let announce_raw = build_announce_packet(&identity);
5043 let dest_hash =
5044 rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));
5045
5046 tx.send(Event::Frame {
5047 interface_id: InterfaceId(1),
5048 data: announce_raw,
5049 })
5050 .unwrap();
5051
5052 let (resp_tx, resp_rx) = mpsc::channel();
5054 tx.send(Event::Query(
5055 QueryRequest::RecallIdentity { dest_hash },
5056 resp_tx,
5057 ))
5058 .unwrap();
5059
5060 let (resp_tx2, resp_rx2) = mpsc::channel();
5062 tx.send(Event::Query(
5063 QueryRequest::RecallIdentity {
5064 dest_hash: [0xFF; 16],
5065 },
5066 resp_tx2,
5067 ))
5068 .unwrap();
5069
5070 tx.send(Event::Shutdown).unwrap();
5071 driver.run();
5072
5073 match resp_rx.recv().unwrap() {
5074 QueryResponse::RecallIdentity(Some(recalled)) => {
5075 assert_eq!(recalled.dest_hash.0, dest_hash);
5076 assert_eq!(recalled.identity_hash.0, *identity.hash());
5077 assert_eq!(recalled.public_key, identity.get_public_key().unwrap());
5078 assert_eq!(recalled.hops, 1);
5079 }
5080 other => panic!("expected RecallIdentity(Some(..)), got {:?}", other),
5081 }
5082
5083 match resp_rx2.recv().unwrap() {
5084 QueryResponse::RecallIdentity(None) => {}
5085 other => panic!("expected RecallIdentity(None), got {:?}", other),
5086 }
5087 }
5088
5089 #[test]
5090 fn request_path_sends_packet() {
5091 let (tx, rx) = event::channel();
5092 let (cbs, _, _, _, _, _) = MockCallbacks::new();
5093 let mut driver = Driver::new(
5094 TransportConfig {
5095 transport_enabled: false,
5096 identity_hash: None,
5097 prefer_shorter_path: false,
5098 max_paths_per_destination: 1,
5099 },
5100 rx,
5101 tx.clone(),
5102 Box::new(cbs),
5103 );
5104 let info = make_interface_info(1);
5105 driver.engine.register_interface(info);
5106 let (writer, sent) = MockWriter::new();
5107 driver
5108 .interfaces
5109 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5110
5111 tx.send(Event::RequestPath {
5113 dest_hash: [0xAA; 16],
5114 })
5115 .unwrap();
5116 tx.send(Event::Shutdown).unwrap();
5117 driver.run();
5118
5119 let sent_packets = sent.lock().unwrap();
5121 assert!(
5122 !sent_packets.is_empty(),
5123 "Path request should be sent on wire"
5124 );
5125
5126 let raw = &sent_packets[0];
5128 let flags = rns_core::packet::PacketFlags::unpack(raw[0] & 0x7F);
5129 assert_eq!(flags.packet_type, constants::PACKET_TYPE_DATA);
5130 assert_eq!(flags.destination_type, constants::DESTINATION_PLAIN);
5131 assert_eq!(flags.transport_type, constants::TRANSPORT_BROADCAST);
5132 }
5133
5134 #[test]
5135 fn request_path_includes_transport_id() {
5136 let (tx, rx) = event::channel();
5137 let (cbs, _, _, _, _, _) = MockCallbacks::new();
5138 let mut driver = Driver::new(
5139 TransportConfig {
5140 transport_enabled: true,
5141 identity_hash: Some([0xBB; 16]),
5142 prefer_shorter_path: false,
5143 max_paths_per_destination: 1,
5144 },
5145 rx,
5146 tx.clone(),
5147 Box::new(cbs),
5148 );
5149 let info = make_interface_info(1);
5150 driver.engine.register_interface(info);
5151 let (writer, sent) = MockWriter::new();
5152 driver
5153 .interfaces
5154 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5155
5156 tx.send(Event::RequestPath {
5157 dest_hash: [0xAA; 16],
5158 })
5159 .unwrap();
5160 tx.send(Event::Shutdown).unwrap();
5161 driver.run();
5162
5163 let sent_packets = sent.lock().unwrap();
5164 assert!(!sent_packets.is_empty());
5165
5166 let raw = &sent_packets[0];
5168 if let Ok(packet) = RawPacket::unpack(raw) {
5169 assert_eq!(
5171 packet.data.len(),
5172 48,
5173 "Path request data should be 48 bytes with transport_id"
5174 );
5175 assert_eq!(
5176 &packet.data[..16],
5177 &[0xAA; 16],
5178 "First 16 bytes should be dest_hash"
5179 );
5180 assert_eq!(
5181 &packet.data[16..32],
5182 &[0xBB; 16],
5183 "Next 16 bytes should be transport_id"
5184 );
5185 } else {
5186 panic!("Could not unpack sent packet");
5187 }
5188 }
5189
5190 #[test]
5191 fn path_request_dest_registered() {
5192 let (tx, rx) = event::channel();
5193 let (cbs, _, _, _, _, _) = MockCallbacks::new();
5194 let driver = Driver::new(
5195 TransportConfig {
5196 transport_enabled: false,
5197 identity_hash: None,
5198 prefer_shorter_path: false,
5199 max_paths_per_destination: 1,
5200 },
5201 rx,
5202 tx.clone(),
5203 Box::new(cbs),
5204 );
5205
5206 let expected_dest =
5208 rns_core::destination::destination_hash("rnstransport", &["path", "request"], None);
5209 assert_eq!(driver.path_request_dest, expected_dest);
5210
5211 drop(tx);
5212 }
5213
5214 #[test]
5219 fn register_proof_strategy_event() {
5220 let (tx, rx) = event::channel();
5221 let (cbs, _, _, _, _, _) = MockCallbacks::new();
5222 let mut driver = Driver::new(
5223 TransportConfig {
5224 transport_enabled: false,
5225 identity_hash: None,
5226 prefer_shorter_path: false,
5227 max_paths_per_destination: 1,
5228 },
5229 rx,
5230 tx.clone(),
5231 Box::new(cbs),
5232 );
5233
5234 let dest = [0xAA; 16];
5235 let identity = Identity::new(&mut OsRng);
5236 let prv_key = identity.get_private_key().unwrap();
5237
5238 tx.send(Event::RegisterProofStrategy {
5239 dest_hash: dest,
5240 strategy: rns_core::types::ProofStrategy::ProveAll,
5241 signing_key: Some(prv_key),
5242 })
5243 .unwrap();
5244 tx.send(Event::Shutdown).unwrap();
5245 driver.run();
5246
5247 assert!(driver.proof_strategies.contains_key(&dest));
5248 let (strategy, ref id_opt) = driver.proof_strategies[&dest];
5249 assert_eq!(strategy, rns_core::types::ProofStrategy::ProveAll);
5250 assert!(id_opt.is_some());
5251 }
5252
5253 #[test]
5254 fn register_proof_strategy_prove_none_no_identity() {
5255 let (tx, rx) = event::channel();
5256 let (cbs, _, _, _, _, _) = MockCallbacks::new();
5257 let mut driver = Driver::new(
5258 TransportConfig {
5259 transport_enabled: false,
5260 identity_hash: None,
5261 prefer_shorter_path: false,
5262 max_paths_per_destination: 1,
5263 },
5264 rx,
5265 tx.clone(),
5266 Box::new(cbs),
5267 );
5268
5269 let dest = [0xBB; 16];
5270 tx.send(Event::RegisterProofStrategy {
5271 dest_hash: dest,
5272 strategy: rns_core::types::ProofStrategy::ProveNone,
5273 signing_key: None,
5274 })
5275 .unwrap();
5276 tx.send(Event::Shutdown).unwrap();
5277 driver.run();
5278
5279 assert!(driver.proof_strategies.contains_key(&dest));
5280 let (strategy, ref id_opt) = driver.proof_strategies[&dest];
5281 assert_eq!(strategy, rns_core::types::ProofStrategy::ProveNone);
5282 assert!(id_opt.is_none());
5283 }
5284
5285 #[test]
5286 fn send_outbound_tracks_sent_packets() {
5287 let (tx, rx) = event::channel();
5288 let (cbs, _, _, _, _, _) = MockCallbacks::new();
5289 let mut driver = Driver::new(
5290 TransportConfig {
5291 transport_enabled: false,
5292 identity_hash: None,
5293 prefer_shorter_path: false,
5294 max_paths_per_destination: 1,
5295 },
5296 rx,
5297 tx.clone(),
5298 Box::new(cbs),
5299 );
5300 let info = make_interface_info(1);
5301 driver.engine.register_interface(info);
5302 let (writer, _sent) = MockWriter::new();
5303 driver
5304 .interfaces
5305 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5306
5307 let dest = [0xCC; 16];
5309 let flags = PacketFlags {
5310 header_type: constants::HEADER_1,
5311 context_flag: constants::FLAG_UNSET,
5312 transport_type: constants::TRANSPORT_BROADCAST,
5313 destination_type: constants::DESTINATION_PLAIN,
5314 packet_type: constants::PACKET_TYPE_DATA,
5315 };
5316 let packet =
5317 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test data").unwrap();
5318 let expected_hash = packet.packet_hash;
5319
5320 tx.send(Event::SendOutbound {
5321 raw: packet.raw,
5322 dest_type: constants::DESTINATION_PLAIN,
5323 attached_interface: None,
5324 })
5325 .unwrap();
5326 tx.send(Event::Shutdown).unwrap();
5327 driver.run();
5328
5329 assert!(driver.sent_packets.contains_key(&expected_hash));
5331 let (tracked_dest, _sent_time) = &driver.sent_packets[&expected_hash];
5332 assert_eq!(tracked_dest, &dest);
5333 }
5334
5335 #[test]
5336 fn prove_all_generates_proof_on_delivery() {
5337 let (tx, rx) = event::channel();
5338 let (cbs, _, _, deliveries, _, _) = MockCallbacks::new();
5339 let mut driver = Driver::new(
5340 TransportConfig {
5341 transport_enabled: false,
5342 identity_hash: None,
5343 prefer_shorter_path: false,
5344 max_paths_per_destination: 1,
5345 },
5346 rx,
5347 tx.clone(),
5348 Box::new(cbs),
5349 );
5350 let info = make_interface_info(1);
5351 driver.engine.register_interface(info);
5352 let (writer, sent) = MockWriter::new();
5353 driver
5354 .interfaces
5355 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5356
5357 let dest = [0xDD; 16];
5359 let identity = Identity::new(&mut OsRng);
5360 let prv_key = identity.get_private_key().unwrap();
5361 driver
5362 .engine
5363 .register_destination(dest, constants::DESTINATION_SINGLE);
5364 driver.proof_strategies.insert(
5365 dest,
5366 (
5367 rns_core::types::ProofStrategy::ProveAll,
5368 Some(Identity::from_private_key(&prv_key)),
5369 ),
5370 );
5371
5372 let flags = PacketFlags {
5374 header_type: constants::HEADER_1,
5375 context_flag: constants::FLAG_UNSET,
5376 transport_type: constants::TRANSPORT_BROADCAST,
5377 destination_type: constants::DESTINATION_SINGLE,
5378 packet_type: constants::PACKET_TYPE_DATA,
5379 };
5380 let packet =
5381 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
5382
5383 tx.send(Event::Frame {
5384 interface_id: InterfaceId(1),
5385 data: packet.raw,
5386 })
5387 .unwrap();
5388 tx.send(Event::Shutdown).unwrap();
5389 driver.run();
5390
5391 assert_eq!(deliveries.lock().unwrap().len(), 1);
5393
5394 let sent_packets = sent.lock().unwrap();
5396 let has_proof = sent_packets.iter().any(|raw| {
5398 let flags = PacketFlags::unpack(raw[0] & 0x7F);
5399 flags.packet_type == constants::PACKET_TYPE_PROOF
5400 });
5401 assert!(
5402 has_proof,
5403 "ProveAll should generate a proof packet: sent {} packets",
5404 sent_packets.len()
5405 );
5406 }
5407
5408 #[test]
5409 fn prove_none_does_not_generate_proof() {
5410 let (tx, rx) = event::channel();
5411 let (cbs, _, _, deliveries, _, _) = MockCallbacks::new();
5412 let mut driver = Driver::new(
5413 TransportConfig {
5414 transport_enabled: false,
5415 identity_hash: None,
5416 prefer_shorter_path: false,
5417 max_paths_per_destination: 1,
5418 },
5419 rx,
5420 tx.clone(),
5421 Box::new(cbs),
5422 );
5423 let info = make_interface_info(1);
5424 driver.engine.register_interface(info);
5425 let (writer, sent) = MockWriter::new();
5426 driver
5427 .interfaces
5428 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5429
5430 let dest = [0xDD; 16];
5432 driver
5433 .engine
5434 .register_destination(dest, constants::DESTINATION_SINGLE);
5435 driver
5436 .proof_strategies
5437 .insert(dest, (rns_core::types::ProofStrategy::ProveNone, None));
5438
5439 let flags = PacketFlags {
5441 header_type: constants::HEADER_1,
5442 context_flag: constants::FLAG_UNSET,
5443 transport_type: constants::TRANSPORT_BROADCAST,
5444 destination_type: constants::DESTINATION_SINGLE,
5445 packet_type: constants::PACKET_TYPE_DATA,
5446 };
5447 let packet =
5448 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
5449
5450 tx.send(Event::Frame {
5451 interface_id: InterfaceId(1),
5452 data: packet.raw,
5453 })
5454 .unwrap();
5455 tx.send(Event::Shutdown).unwrap();
5456 driver.run();
5457
5458 assert_eq!(deliveries.lock().unwrap().len(), 1);
5460
5461 let sent_packets = sent.lock().unwrap();
5463 let has_proof = sent_packets.iter().any(|raw| {
5464 let flags = PacketFlags::unpack(raw[0] & 0x7F);
5465 flags.packet_type == constants::PACKET_TYPE_PROOF
5466 });
5467 assert!(!has_proof, "ProveNone should not generate a proof packet");
5468 }
5469
5470 #[test]
5471 fn no_proof_strategy_does_not_generate_proof() {
5472 let (tx, rx) = event::channel();
5473 let (cbs, _, _, deliveries, _, _) = MockCallbacks::new();
5474 let mut driver = Driver::new(
5475 TransportConfig {
5476 transport_enabled: false,
5477 identity_hash: None,
5478 prefer_shorter_path: false,
5479 max_paths_per_destination: 1,
5480 },
5481 rx,
5482 tx.clone(),
5483 Box::new(cbs),
5484 );
5485 let info = make_interface_info(1);
5486 driver.engine.register_interface(info);
5487 let (writer, sent) = MockWriter::new();
5488 driver
5489 .interfaces
5490 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5491
5492 let dest = [0xDD; 16];
5494 driver
5495 .engine
5496 .register_destination(dest, constants::DESTINATION_SINGLE);
5497
5498 let flags = PacketFlags {
5499 header_type: constants::HEADER_1,
5500 context_flag: constants::FLAG_UNSET,
5501 transport_type: constants::TRANSPORT_BROADCAST,
5502 destination_type: constants::DESTINATION_SINGLE,
5503 packet_type: constants::PACKET_TYPE_DATA,
5504 };
5505 let packet =
5506 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
5507
5508 tx.send(Event::Frame {
5509 interface_id: InterfaceId(1),
5510 data: packet.raw,
5511 })
5512 .unwrap();
5513 tx.send(Event::Shutdown).unwrap();
5514 driver.run();
5515
5516 assert_eq!(deliveries.lock().unwrap().len(), 1);
5517
5518 let sent_packets = sent.lock().unwrap();
5519 let has_proof = sent_packets.iter().any(|raw| {
5520 let flags = PacketFlags::unpack(raw[0] & 0x7F);
5521 flags.packet_type == constants::PACKET_TYPE_PROOF
5522 });
5523 assert!(!has_proof, "No proof strategy means no proof generated");
5524 }
5525
5526 #[test]
5527 fn prove_app_calls_callback() {
5528 let (tx, rx) = event::channel();
5529 let proof_requested = Arc::new(Mutex::new(Vec::new()));
5530 let deliveries = Arc::new(Mutex::new(Vec::new()));
5531 let cbs = MockCallbacks {
5532 announces: Arc::new(Mutex::new(Vec::new())),
5533 paths: Arc::new(Mutex::new(Vec::new())),
5534 deliveries: deliveries.clone(),
5535 iface_ups: Arc::new(Mutex::new(Vec::new())),
5536 iface_downs: Arc::new(Mutex::new(Vec::new())),
5537 link_established: Arc::new(Mutex::new(Vec::new())),
5538 link_closed: Arc::new(Mutex::new(Vec::new())),
5539 remote_identified: Arc::new(Mutex::new(Vec::new())),
5540 resources_received: Arc::new(Mutex::new(Vec::new())),
5541 resource_completed: Arc::new(Mutex::new(Vec::new())),
5542 resource_failed: Arc::new(Mutex::new(Vec::new())),
5543 channel_messages: Arc::new(Mutex::new(Vec::new())),
5544 link_data: Arc::new(Mutex::new(Vec::new())),
5545 responses: Arc::new(Mutex::new(Vec::new())),
5546 proofs: Arc::new(Mutex::new(Vec::new())),
5547 proof_requested: proof_requested.clone(),
5548 };
5549
5550 let mut driver = Driver::new(
5551 TransportConfig {
5552 transport_enabled: false,
5553 identity_hash: None,
5554 prefer_shorter_path: false,
5555 max_paths_per_destination: 1,
5556 },
5557 rx,
5558 tx.clone(),
5559 Box::new(cbs),
5560 );
5561 let info = make_interface_info(1);
5562 driver.engine.register_interface(info);
5563 let (writer, sent) = MockWriter::new();
5564 driver
5565 .interfaces
5566 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5567
5568 let dest = [0xDD; 16];
5570 let identity = Identity::new(&mut OsRng);
5571 let prv_key = identity.get_private_key().unwrap();
5572 driver
5573 .engine
5574 .register_destination(dest, constants::DESTINATION_SINGLE);
5575 driver.proof_strategies.insert(
5576 dest,
5577 (
5578 rns_core::types::ProofStrategy::ProveApp,
5579 Some(Identity::from_private_key(&prv_key)),
5580 ),
5581 );
5582
5583 let flags = PacketFlags {
5584 header_type: constants::HEADER_1,
5585 context_flag: constants::FLAG_UNSET,
5586 transport_type: constants::TRANSPORT_BROADCAST,
5587 destination_type: constants::DESTINATION_SINGLE,
5588 packet_type: constants::PACKET_TYPE_DATA,
5589 };
5590 let packet =
5591 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"app test").unwrap();
5592
5593 tx.send(Event::Frame {
5594 interface_id: InterfaceId(1),
5595 data: packet.raw,
5596 })
5597 .unwrap();
5598 tx.send(Event::Shutdown).unwrap();
5599 driver.run();
5600
5601 let prs = proof_requested.lock().unwrap();
5603 assert_eq!(prs.len(), 1);
5604 assert_eq!(prs[0].0, DestHash(dest));
5605
5606 let sent_packets = sent.lock().unwrap();
5608 let has_proof = sent_packets.iter().any(|raw| {
5609 let flags = PacketFlags::unpack(raw[0] & 0x7F);
5610 flags.packet_type == constants::PACKET_TYPE_PROOF
5611 });
5612 assert!(
5613 has_proof,
5614 "ProveApp (callback returns true) should generate a proof"
5615 );
5616 }
5617
5618 #[test]
5619 fn inbound_proof_fires_callback() {
5620 let (tx, rx) = event::channel();
5621 let proofs = Arc::new(Mutex::new(Vec::new()));
5622 let cbs = MockCallbacks {
5623 announces: Arc::new(Mutex::new(Vec::new())),
5624 paths: Arc::new(Mutex::new(Vec::new())),
5625 deliveries: Arc::new(Mutex::new(Vec::new())),
5626 iface_ups: Arc::new(Mutex::new(Vec::new())),
5627 iface_downs: Arc::new(Mutex::new(Vec::new())),
5628 link_established: Arc::new(Mutex::new(Vec::new())),
5629 link_closed: Arc::new(Mutex::new(Vec::new())),
5630 remote_identified: Arc::new(Mutex::new(Vec::new())),
5631 resources_received: Arc::new(Mutex::new(Vec::new())),
5632 resource_completed: Arc::new(Mutex::new(Vec::new())),
5633 resource_failed: Arc::new(Mutex::new(Vec::new())),
5634 channel_messages: Arc::new(Mutex::new(Vec::new())),
5635 link_data: Arc::new(Mutex::new(Vec::new())),
5636 responses: Arc::new(Mutex::new(Vec::new())),
5637 proofs: proofs.clone(),
5638 proof_requested: Arc::new(Mutex::new(Vec::new())),
5639 };
5640
5641 let mut driver = Driver::new(
5642 TransportConfig {
5643 transport_enabled: false,
5644 identity_hash: None,
5645 prefer_shorter_path: false,
5646 max_paths_per_destination: 1,
5647 },
5648 rx,
5649 tx.clone(),
5650 Box::new(cbs),
5651 );
5652 let info = make_interface_info(1);
5653 driver.engine.register_interface(info);
5654 let (writer, _sent) = MockWriter::new();
5655 driver
5656 .interfaces
5657 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5658
5659 let dest = [0xEE; 16];
5661 driver
5662 .engine
5663 .register_destination(dest, constants::DESTINATION_SINGLE);
5664
5665 let tracked_hash = [0x42u8; 32];
5667 let sent_time = time::now() - 0.5; driver.sent_packets.insert(tracked_hash, (dest, sent_time));
5669
5670 let mut proof_data = Vec::new();
5672 proof_data.extend_from_slice(&tracked_hash);
5673 proof_data.extend_from_slice(&[0xAA; 64]); let flags = PacketFlags {
5676 header_type: constants::HEADER_1,
5677 context_flag: constants::FLAG_UNSET,
5678 transport_type: constants::TRANSPORT_BROADCAST,
5679 destination_type: constants::DESTINATION_SINGLE,
5680 packet_type: constants::PACKET_TYPE_PROOF,
5681 };
5682 let packet =
5683 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();
5684
5685 tx.send(Event::Frame {
5686 interface_id: InterfaceId(1),
5687 data: packet.raw,
5688 })
5689 .unwrap();
5690 tx.send(Event::Shutdown).unwrap();
5691 driver.run();
5692
5693 let proof_list = proofs.lock().unwrap();
5695 assert_eq!(proof_list.len(), 1);
5696 assert_eq!(proof_list[0].0, DestHash(dest));
5697 assert_eq!(proof_list[0].1, PacketHash(tracked_hash));
5698 assert!(
5699 proof_list[0].2 >= 0.4,
5700 "RTT should be approximately 0.5s, got {}",
5701 proof_list[0].2
5702 );
5703
5704 assert!(!driver.sent_packets.contains_key(&tracked_hash));
5706 }
5707
5708 #[test]
5709 fn inbound_proof_for_unknown_packet_is_ignored() {
5710 let (tx, rx) = event::channel();
5711 let proofs = Arc::new(Mutex::new(Vec::new()));
5712 let cbs = MockCallbacks {
5713 announces: Arc::new(Mutex::new(Vec::new())),
5714 paths: Arc::new(Mutex::new(Vec::new())),
5715 deliveries: Arc::new(Mutex::new(Vec::new())),
5716 iface_ups: Arc::new(Mutex::new(Vec::new())),
5717 iface_downs: Arc::new(Mutex::new(Vec::new())),
5718 link_established: Arc::new(Mutex::new(Vec::new())),
5719 link_closed: Arc::new(Mutex::new(Vec::new())),
5720 remote_identified: Arc::new(Mutex::new(Vec::new())),
5721 resources_received: Arc::new(Mutex::new(Vec::new())),
5722 resource_completed: Arc::new(Mutex::new(Vec::new())),
5723 resource_failed: Arc::new(Mutex::new(Vec::new())),
5724 channel_messages: Arc::new(Mutex::new(Vec::new())),
5725 link_data: Arc::new(Mutex::new(Vec::new())),
5726 responses: Arc::new(Mutex::new(Vec::new())),
5727 proofs: proofs.clone(),
5728 proof_requested: Arc::new(Mutex::new(Vec::new())),
5729 };
5730
5731 let mut driver = Driver::new(
5732 TransportConfig {
5733 transport_enabled: false,
5734 identity_hash: None,
5735 prefer_shorter_path: false,
5736 max_paths_per_destination: 1,
5737 },
5738 rx,
5739 tx.clone(),
5740 Box::new(cbs),
5741 );
5742 let info = make_interface_info(1);
5743 driver.engine.register_interface(info);
5744 let (writer, _sent) = MockWriter::new();
5745 driver
5746 .interfaces
5747 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5748
5749 let dest = [0xEE; 16];
5750 driver
5751 .engine
5752 .register_destination(dest, constants::DESTINATION_SINGLE);
5753
5754 let unknown_hash = [0xFF; 32];
5756 let mut proof_data = Vec::new();
5757 proof_data.extend_from_slice(&unknown_hash);
5758 proof_data.extend_from_slice(&[0xAA; 64]);
5759
5760 let flags = PacketFlags {
5761 header_type: constants::HEADER_1,
5762 context_flag: constants::FLAG_UNSET,
5763 transport_type: constants::TRANSPORT_BROADCAST,
5764 destination_type: constants::DESTINATION_SINGLE,
5765 packet_type: constants::PACKET_TYPE_PROOF,
5766 };
5767 let packet =
5768 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();
5769
5770 tx.send(Event::Frame {
5771 interface_id: InterfaceId(1),
5772 data: packet.raw,
5773 })
5774 .unwrap();
5775 tx.send(Event::Shutdown).unwrap();
5776 driver.run();
5777
5778 assert!(proofs.lock().unwrap().is_empty());
5780 }
5781
5782 #[test]
5783 fn inbound_proof_with_valid_signature_fires_callback() {
5784 let (tx, rx) = event::channel();
5786 let proofs = Arc::new(Mutex::new(Vec::new()));
5787 let cbs = MockCallbacks {
5788 announces: Arc::new(Mutex::new(Vec::new())),
5789 paths: Arc::new(Mutex::new(Vec::new())),
5790 deliveries: Arc::new(Mutex::new(Vec::new())),
5791 iface_ups: Arc::new(Mutex::new(Vec::new())),
5792 iface_downs: Arc::new(Mutex::new(Vec::new())),
5793 link_established: Arc::new(Mutex::new(Vec::new())),
5794 link_closed: Arc::new(Mutex::new(Vec::new())),
5795 remote_identified: Arc::new(Mutex::new(Vec::new())),
5796 resources_received: Arc::new(Mutex::new(Vec::new())),
5797 resource_completed: Arc::new(Mutex::new(Vec::new())),
5798 resource_failed: Arc::new(Mutex::new(Vec::new())),
5799 channel_messages: Arc::new(Mutex::new(Vec::new())),
5800 link_data: Arc::new(Mutex::new(Vec::new())),
5801 responses: Arc::new(Mutex::new(Vec::new())),
5802 proofs: proofs.clone(),
5803 proof_requested: Arc::new(Mutex::new(Vec::new())),
5804 };
5805
5806 let mut driver = Driver::new(
5807 TransportConfig {
5808 transport_enabled: false,
5809 identity_hash: None,
5810 prefer_shorter_path: false,
5811 max_paths_per_destination: 1,
5812 },
5813 rx,
5814 tx.clone(),
5815 Box::new(cbs),
5816 );
5817 let info = make_interface_info(1);
5818 driver.engine.register_interface(info);
5819 let (writer, _sent) = MockWriter::new();
5820 driver
5821 .interfaces
5822 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5823
5824 let dest = [0xEE; 16];
5825 driver
5826 .engine
5827 .register_destination(dest, constants::DESTINATION_SINGLE);
5828
5829 let identity = Identity::new(&mut OsRng);
5831 let pub_key = identity.get_public_key();
5832 driver.known_destinations.insert(
5833 dest,
5834 crate::destination::AnnouncedIdentity {
5835 dest_hash: DestHash(dest),
5836 identity_hash: IdentityHash(*identity.hash()),
5837 public_key: pub_key.unwrap(),
5838 app_data: None,
5839 hops: 0,
5840 received_at: time::now(),
5841 receiving_interface: InterfaceId(0),
5842 },
5843 );
5844
5845 let tracked_hash = [0x42u8; 32];
5847 let sent_time = time::now() - 0.5;
5848 driver.sent_packets.insert(tracked_hash, (dest, sent_time));
5849
5850 let signature = identity.sign(&tracked_hash).unwrap();
5851 let mut proof_data = Vec::new();
5852 proof_data.extend_from_slice(&tracked_hash);
5853 proof_data.extend_from_slice(&signature);
5854
5855 let flags = PacketFlags {
5856 header_type: constants::HEADER_1,
5857 context_flag: constants::FLAG_UNSET,
5858 transport_type: constants::TRANSPORT_BROADCAST,
5859 destination_type: constants::DESTINATION_SINGLE,
5860 packet_type: constants::PACKET_TYPE_PROOF,
5861 };
5862 let packet =
5863 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();
5864
5865 tx.send(Event::Frame {
5866 interface_id: InterfaceId(1),
5867 data: packet.raw,
5868 })
5869 .unwrap();
5870 tx.send(Event::Shutdown).unwrap();
5871 driver.run();
5872
5873 let proof_list = proofs.lock().unwrap();
5875 assert_eq!(proof_list.len(), 1);
5876 assert_eq!(proof_list[0].0, DestHash(dest));
5877 assert_eq!(proof_list[0].1, PacketHash(tracked_hash));
5878 }
5879
5880 #[test]
5881 fn inbound_proof_with_invalid_signature_rejected() {
5882 let (tx, rx) = event::channel();
5884 let proofs = Arc::new(Mutex::new(Vec::new()));
5885 let cbs = MockCallbacks {
5886 announces: Arc::new(Mutex::new(Vec::new())),
5887 paths: Arc::new(Mutex::new(Vec::new())),
5888 deliveries: Arc::new(Mutex::new(Vec::new())),
5889 iface_ups: Arc::new(Mutex::new(Vec::new())),
5890 iface_downs: Arc::new(Mutex::new(Vec::new())),
5891 link_established: Arc::new(Mutex::new(Vec::new())),
5892 link_closed: Arc::new(Mutex::new(Vec::new())),
5893 remote_identified: Arc::new(Mutex::new(Vec::new())),
5894 resources_received: Arc::new(Mutex::new(Vec::new())),
5895 resource_completed: Arc::new(Mutex::new(Vec::new())),
5896 resource_failed: Arc::new(Mutex::new(Vec::new())),
5897 channel_messages: Arc::new(Mutex::new(Vec::new())),
5898 link_data: Arc::new(Mutex::new(Vec::new())),
5899 responses: Arc::new(Mutex::new(Vec::new())),
5900 proofs: proofs.clone(),
5901 proof_requested: Arc::new(Mutex::new(Vec::new())),
5902 };
5903
5904 let mut driver = Driver::new(
5905 TransportConfig {
5906 transport_enabled: false,
5907 identity_hash: None,
5908 prefer_shorter_path: false,
5909 max_paths_per_destination: 1,
5910 },
5911 rx,
5912 tx.clone(),
5913 Box::new(cbs),
5914 );
5915 let info = make_interface_info(1);
5916 driver.engine.register_interface(info);
5917 let (writer, _sent) = MockWriter::new();
5918 driver
5919 .interfaces
5920 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5921
5922 let dest = [0xEE; 16];
5923 driver
5924 .engine
5925 .register_destination(dest, constants::DESTINATION_SINGLE);
5926
5927 let identity = Identity::new(&mut OsRng);
5929 let pub_key = identity.get_public_key();
5930 driver.known_destinations.insert(
5931 dest,
5932 crate::destination::AnnouncedIdentity {
5933 dest_hash: DestHash(dest),
5934 identity_hash: IdentityHash(*identity.hash()),
5935 public_key: pub_key.unwrap(),
5936 app_data: None,
5937 hops: 0,
5938 received_at: time::now(),
5939 receiving_interface: InterfaceId(0),
5940 },
5941 );
5942
5943 let tracked_hash = [0x42u8; 32];
5945 let sent_time = time::now() - 0.5;
5946 driver.sent_packets.insert(tracked_hash, (dest, sent_time));
5947
5948 let mut proof_data = Vec::new();
5950 proof_data.extend_from_slice(&tracked_hash);
5951 proof_data.extend_from_slice(&[0xAA; 64]);
5952
5953 let flags = PacketFlags {
5954 header_type: constants::HEADER_1,
5955 context_flag: constants::FLAG_UNSET,
5956 transport_type: constants::TRANSPORT_BROADCAST,
5957 destination_type: constants::DESTINATION_SINGLE,
5958 packet_type: constants::PACKET_TYPE_PROOF,
5959 };
5960 let packet =
5961 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();
5962
5963 tx.send(Event::Frame {
5964 interface_id: InterfaceId(1),
5965 data: packet.raw,
5966 })
5967 .unwrap();
5968 tx.send(Event::Shutdown).unwrap();
5969 driver.run();
5970
5971 assert!(proofs.lock().unwrap().is_empty());
5973 }
5974
5975 #[test]
5976 fn proof_data_is_valid_explicit_proof() {
5977 let (tx, rx) = event::channel();
5979 let (cbs, _, _, _, _, _) = MockCallbacks::new();
5980 let mut driver = Driver::new(
5981 TransportConfig {
5982 transport_enabled: false,
5983 identity_hash: None,
5984 prefer_shorter_path: false,
5985 max_paths_per_destination: 1,
5986 },
5987 rx,
5988 tx.clone(),
5989 Box::new(cbs),
5990 );
5991 let info = make_interface_info(1);
5992 driver.engine.register_interface(info);
5993 let (writer, sent) = MockWriter::new();
5994 driver
5995 .interfaces
5996 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
5997
5998 let dest = [0xDD; 16];
5999 let identity = Identity::new(&mut OsRng);
6000 let prv_key = identity.get_private_key().unwrap();
6001 driver
6002 .engine
6003 .register_destination(dest, constants::DESTINATION_SINGLE);
6004 driver.proof_strategies.insert(
6005 dest,
6006 (
6007 rns_core::types::ProofStrategy::ProveAll,
6008 Some(Identity::from_private_key(&prv_key)),
6009 ),
6010 );
6011
6012 let flags = PacketFlags {
6013 header_type: constants::HEADER_1,
6014 context_flag: constants::FLAG_UNSET,
6015 transport_type: constants::TRANSPORT_BROADCAST,
6016 destination_type: constants::DESTINATION_SINGLE,
6017 packet_type: constants::PACKET_TYPE_DATA,
6018 };
6019 let data_packet =
6020 RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"verify me").unwrap();
6021 let data_packet_hash = data_packet.packet_hash;
6022
6023 tx.send(Event::Frame {
6024 interface_id: InterfaceId(1),
6025 data: data_packet.raw,
6026 })
6027 .unwrap();
6028 tx.send(Event::Shutdown).unwrap();
6029 driver.run();
6030
6031 let sent_packets = sent.lock().unwrap();
6033 let proof_raw = sent_packets.iter().find(|raw| {
6034 let f = PacketFlags::unpack(raw[0] & 0x7F);
6035 f.packet_type == constants::PACKET_TYPE_PROOF
6036 });
6037 assert!(proof_raw.is_some(), "Should have sent a proof");
6038
6039 let proof_packet = RawPacket::unpack(proof_raw.unwrap()).unwrap();
6040 assert_eq!(
6042 proof_packet.data.len(),
6043 96,
6044 "Explicit proof should be 96 bytes"
6045 );
6046
6047 let result = rns_core::receipt::validate_proof(
6049 &proof_packet.data,
6050 &data_packet_hash,
6051 &Identity::from_private_key(&prv_key), );
6053 assert_eq!(result, rns_core::receipt::ProofResult::Valid);
6054 }
6055
6056 #[test]
6057 fn query_local_destinations_empty() {
6058 let (tx, rx) = event::channel();
6059 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6060 let driver_config = TransportConfig {
6061 transport_enabled: false,
6062 identity_hash: None,
6063 prefer_shorter_path: false,
6064 max_paths_per_destination: 1,
6065 };
6066 let mut driver = Driver::new(driver_config, rx, tx.clone(), Box::new(cbs));
6067
6068 let (resp_tx, resp_rx) = mpsc::channel();
6069 tx.send(Event::Query(QueryRequest::LocalDestinations, resp_tx))
6070 .unwrap();
6071 tx.send(Event::Shutdown).unwrap();
6072 driver.run();
6073
6074 match resp_rx.recv().unwrap() {
6075 QueryResponse::LocalDestinations(entries) => {
6076 assert_eq!(entries.len(), 2);
6078 for entry in &entries {
6079 assert_eq!(entry.dest_type, rns_core::constants::DESTINATION_PLAIN);
6080 }
6081 }
6082 other => panic!("expected LocalDestinations, got {:?}", other),
6083 }
6084 }
6085
6086 #[test]
6087 fn query_local_destinations_with_registered() {
6088 let (tx, rx) = event::channel();
6089 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6090 let driver_config = TransportConfig {
6091 transport_enabled: false,
6092 identity_hash: None,
6093 prefer_shorter_path: false,
6094 max_paths_per_destination: 1,
6095 };
6096 let mut driver = Driver::new(driver_config, rx, tx.clone(), Box::new(cbs));
6097
6098 let dest_hash = [0xAA; 16];
6099 tx.send(Event::RegisterDestination {
6100 dest_hash,
6101 dest_type: rns_core::constants::DESTINATION_SINGLE,
6102 })
6103 .unwrap();
6104
6105 let (resp_tx, resp_rx) = mpsc::channel();
6106 tx.send(Event::Query(QueryRequest::LocalDestinations, resp_tx))
6107 .unwrap();
6108 tx.send(Event::Shutdown).unwrap();
6109 driver.run();
6110
6111 match resp_rx.recv().unwrap() {
6112 QueryResponse::LocalDestinations(entries) => {
6113 assert_eq!(entries.len(), 3);
6115 assert!(entries.iter().any(|e| e.hash == dest_hash
6116 && e.dest_type == rns_core::constants::DESTINATION_SINGLE));
6117 }
6118 other => panic!("expected LocalDestinations, got {:?}", other),
6119 }
6120 }
6121
6122 #[test]
6123 fn query_local_destinations_tracks_link_dest() {
6124 let (tx, rx) = event::channel();
6125 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6126 let driver_config = TransportConfig {
6127 transport_enabled: false,
6128 identity_hash: None,
6129 prefer_shorter_path: false,
6130 max_paths_per_destination: 1,
6131 };
6132 let mut driver = Driver::new(driver_config, rx, tx.clone(), Box::new(cbs));
6133
6134 let dest_hash = [0xBB; 16];
6135 tx.send(Event::RegisterLinkDestination {
6136 dest_hash,
6137 sig_prv_bytes: [0x11; 32],
6138 sig_pub_bytes: [0x22; 32],
6139 resource_strategy: 0,
6140 })
6141 .unwrap();
6142
6143 let (resp_tx, resp_rx) = mpsc::channel();
6144 tx.send(Event::Query(QueryRequest::LocalDestinations, resp_tx))
6145 .unwrap();
6146 tx.send(Event::Shutdown).unwrap();
6147 driver.run();
6148
6149 match resp_rx.recv().unwrap() {
6150 QueryResponse::LocalDestinations(entries) => {
6151 assert_eq!(entries.len(), 3);
6153 assert!(entries.iter().any(|e| e.hash == dest_hash
6154 && e.dest_type == rns_core::constants::DESTINATION_SINGLE));
6155 }
6156 other => panic!("expected LocalDestinations, got {:?}", other),
6157 }
6158 }
6159
6160 #[test]
6161 fn query_links_empty() {
6162 let (tx, rx) = event::channel();
6163 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6164 let driver_config = TransportConfig {
6165 transport_enabled: false,
6166 identity_hash: None,
6167 prefer_shorter_path: false,
6168 max_paths_per_destination: 1,
6169 };
6170 let mut driver = Driver::new(driver_config, rx, tx.clone(), Box::new(cbs));
6171
6172 let (resp_tx, resp_rx) = mpsc::channel();
6173 tx.send(Event::Query(QueryRequest::Links, resp_tx)).unwrap();
6174 tx.send(Event::Shutdown).unwrap();
6175 driver.run();
6176
6177 match resp_rx.recv().unwrap() {
6178 QueryResponse::Links(entries) => {
6179 assert!(entries.is_empty());
6180 }
6181 other => panic!("expected Links, got {:?}", other),
6182 }
6183 }
6184
6185 #[test]
6186 fn query_resources_empty() {
6187 let (tx, rx) = event::channel();
6188 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6189 let driver_config = TransportConfig {
6190 transport_enabled: false,
6191 identity_hash: None,
6192 prefer_shorter_path: false,
6193 max_paths_per_destination: 1,
6194 };
6195 let mut driver = Driver::new(driver_config, rx, tx.clone(), Box::new(cbs));
6196
6197 let (resp_tx, resp_rx) = mpsc::channel();
6198 tx.send(Event::Query(QueryRequest::Resources, resp_tx))
6199 .unwrap();
6200 tx.send(Event::Shutdown).unwrap();
6201 driver.run();
6202
6203 match resp_rx.recv().unwrap() {
6204 QueryResponse::Resources(entries) => {
6205 assert!(entries.is_empty());
6206 }
6207 other => panic!("expected Resources, got {:?}", other),
6208 }
6209 }
6210
6211 #[test]
6212 fn infer_interface_type_from_name() {
6213 assert_eq!(
6214 super::infer_interface_type("TCPServerInterface/Client-1234"),
6215 "TCPServerClientInterface"
6216 );
6217 assert_eq!(
6218 super::infer_interface_type("BackboneInterface/5"),
6219 "BackboneInterface"
6220 );
6221 assert_eq!(
6222 super::infer_interface_type("LocalInterface"),
6223 "LocalServerClientInterface"
6224 );
6225 assert_eq!(
6226 super::infer_interface_type("MyAutoGroup:fe80::1"),
6227 "AutoInterface"
6228 );
6229 }
6230
6231 #[test]
6234 fn test_extract_dest_hash_empty() {
6235 assert_eq!(super::extract_dest_hash(&[]), [0u8; 16]);
6236 }
6237
6238 #[test]
6243 fn send_probe_unknown_dest_returns_none() {
6244 let (tx, rx) = event::channel();
6245 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6246 let mut driver = Driver::new(
6247 TransportConfig {
6248 transport_enabled: false,
6249 identity_hash: None,
6250 prefer_shorter_path: false,
6251 max_paths_per_destination: 1,
6252 },
6253 rx,
6254 tx.clone(),
6255 Box::new(cbs),
6256 );
6257 let info = make_interface_info(1);
6258 driver.engine.register_interface(info);
6259 let (writer, _sent) = MockWriter::new();
6260 driver
6261 .interfaces
6262 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
6263
6264 let (resp_tx, resp_rx) = mpsc::channel();
6266 tx.send(Event::Query(
6267 QueryRequest::SendProbe {
6268 dest_hash: [0xAA; 16],
6269 payload_size: 16,
6270 },
6271 resp_tx,
6272 ))
6273 .unwrap();
6274 tx.send(Event::Shutdown).unwrap();
6275 driver.run();
6276
6277 match resp_rx.recv().unwrap() {
6278 QueryResponse::SendProbe(None) => {}
6279 other => panic!("expected SendProbe(None), got {:?}", other),
6280 }
6281 }
6282
6283 #[test]
6284 fn send_probe_known_dest_returns_packet_hash() {
6285 let (tx, rx) = event::channel();
6286 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6287 let mut driver = Driver::new(
6288 TransportConfig {
6289 transport_enabled: false,
6290 identity_hash: None,
6291 prefer_shorter_path: false,
6292 max_paths_per_destination: 1,
6293 },
6294 rx,
6295 tx.clone(),
6296 Box::new(cbs),
6297 );
6298 let info = make_interface_info(1);
6299 driver.engine.register_interface(info);
6300 let (writer, sent) = MockWriter::new();
6301 driver
6302 .interfaces
6303 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
6304
6305 let remote_identity = Identity::new(&mut OsRng);
6307 let dest_hash = rns_core::destination::destination_hash(
6308 "rnstransport",
6309 &["probe"],
6310 Some(remote_identity.hash()),
6311 );
6312
6313 let (inject_tx, inject_rx) = mpsc::channel();
6315 tx.send(Event::Query(
6316 QueryRequest::InjectIdentity {
6317 dest_hash,
6318 identity_hash: *remote_identity.hash(),
6319 public_key: remote_identity.get_public_key().unwrap(),
6320 app_data: None,
6321 hops: 1,
6322 received_at: 0.0,
6323 },
6324 inject_tx,
6325 ))
6326 .unwrap();
6327
6328 let (resp_tx, resp_rx) = mpsc::channel();
6330 tx.send(Event::Query(
6331 QueryRequest::SendProbe {
6332 dest_hash,
6333 payload_size: 16,
6334 },
6335 resp_tx,
6336 ))
6337 .unwrap();
6338 tx.send(Event::Shutdown).unwrap();
6339 driver.run();
6340
6341 match inject_rx.recv().unwrap() {
6343 QueryResponse::InjectIdentity(true) => {}
6344 other => panic!("expected InjectIdentity(true), got {:?}", other),
6345 }
6346
6347 match resp_rx.recv().unwrap() {
6349 QueryResponse::SendProbe(Some((packet_hash, _hops))) => {
6350 assert_ne!(packet_hash, [0u8; 32]);
6352 assert!(driver.sent_packets.contains_key(&packet_hash));
6354 let sent_data = sent.lock().unwrap();
6356 assert!(!sent_data.is_empty(), "Probe packet should be sent on wire");
6357 let raw = &sent_data[0];
6359 let flags = PacketFlags::unpack(raw[0] & 0x7F);
6360 assert_eq!(flags.packet_type, constants::PACKET_TYPE_DATA);
6361 assert_eq!(flags.destination_type, constants::DESTINATION_SINGLE);
6362 }
6363 other => panic!("expected SendProbe(Some(..)), got {:?}", other),
6364 }
6365 }
6366
6367 #[test]
6368 fn check_proof_not_found_returns_none() {
6369 let (tx, rx) = event::channel();
6370 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6371 let mut driver = Driver::new(
6372 TransportConfig {
6373 transport_enabled: false,
6374 identity_hash: None,
6375 prefer_shorter_path: false,
6376 max_paths_per_destination: 1,
6377 },
6378 rx,
6379 tx.clone(),
6380 Box::new(cbs),
6381 );
6382
6383 let (resp_tx, resp_rx) = mpsc::channel();
6384 tx.send(Event::Query(
6385 QueryRequest::CheckProof {
6386 packet_hash: [0xBB; 32],
6387 },
6388 resp_tx,
6389 ))
6390 .unwrap();
6391 tx.send(Event::Shutdown).unwrap();
6392 driver.run();
6393
6394 match resp_rx.recv().unwrap() {
6395 QueryResponse::CheckProof(None) => {}
6396 other => panic!("expected CheckProof(None), got {:?}", other),
6397 }
6398 }
6399
6400 #[test]
6401 fn check_proof_found_returns_rtt() {
6402 let (tx, rx) = event::channel();
6403 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6404 let mut driver = Driver::new(
6405 TransportConfig {
6406 transport_enabled: false,
6407 identity_hash: None,
6408 prefer_shorter_path: false,
6409 max_paths_per_destination: 1,
6410 },
6411 rx,
6412 tx.clone(),
6413 Box::new(cbs),
6414 );
6415
6416 let packet_hash = [0xCC; 32];
6418 driver
6419 .completed_proofs
6420 .insert(packet_hash, (0.123, time::now()));
6421
6422 let (resp_tx, resp_rx) = mpsc::channel();
6423 tx.send(Event::Query(
6424 QueryRequest::CheckProof { packet_hash },
6425 resp_tx,
6426 ))
6427 .unwrap();
6428 tx.send(Event::Shutdown).unwrap();
6429 driver.run();
6430
6431 match resp_rx.recv().unwrap() {
6432 QueryResponse::CheckProof(Some(rtt)) => {
6433 assert!(
6434 (rtt - 0.123).abs() < 0.001,
6435 "RTT should be ~0.123, got {}",
6436 rtt
6437 );
6438 }
6439 other => panic!("expected CheckProof(Some(..)), got {:?}", other),
6440 }
6441 assert!(!driver.completed_proofs.contains_key(&packet_hash));
6443 }
6444
6445 #[test]
6446 fn inbound_proof_populates_completed_proofs() {
6447 let (tx, rx) = event::channel();
6448 let proofs = Arc::new(Mutex::new(Vec::new()));
6449 let cbs = MockCallbacks {
6450 announces: Arc::new(Mutex::new(Vec::new())),
6451 paths: Arc::new(Mutex::new(Vec::new())),
6452 deliveries: Arc::new(Mutex::new(Vec::new())),
6453 iface_ups: Arc::new(Mutex::new(Vec::new())),
6454 iface_downs: Arc::new(Mutex::new(Vec::new())),
6455 link_established: Arc::new(Mutex::new(Vec::new())),
6456 link_closed: Arc::new(Mutex::new(Vec::new())),
6457 remote_identified: Arc::new(Mutex::new(Vec::new())),
6458 resources_received: Arc::new(Mutex::new(Vec::new())),
6459 resource_completed: Arc::new(Mutex::new(Vec::new())),
6460 resource_failed: Arc::new(Mutex::new(Vec::new())),
6461 channel_messages: Arc::new(Mutex::new(Vec::new())),
6462 link_data: Arc::new(Mutex::new(Vec::new())),
6463 responses: Arc::new(Mutex::new(Vec::new())),
6464 proofs: proofs.clone(),
6465 proof_requested: Arc::new(Mutex::new(Vec::new())),
6466 };
6467
6468 let mut driver = Driver::new(
6469 TransportConfig {
6470 transport_enabled: false,
6471 identity_hash: None,
6472 prefer_shorter_path: false,
6473 max_paths_per_destination: 1,
6474 },
6475 rx,
6476 tx.clone(),
6477 Box::new(cbs),
6478 );
6479 let info = make_interface_info(1);
6480 driver.engine.register_interface(info);
6481 let (writer, sent) = MockWriter::new();
6482 driver
6483 .interfaces
6484 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
6485
6486 let dest = [0xDD; 16];
6488 let identity = Identity::new(&mut OsRng);
6489 let prv_key = identity.get_private_key().unwrap();
6490 driver
6491 .engine
6492 .register_destination(dest, constants::DESTINATION_SINGLE);
6493 driver.proof_strategies.insert(
6494 dest,
6495 (
6496 rns_core::types::ProofStrategy::ProveAll,
6497 Some(Identity::from_private_key(&prv_key)),
6498 ),
6499 );
6500
6501 let flags = PacketFlags {
6503 header_type: constants::HEADER_1,
6504 context_flag: constants::FLAG_UNSET,
6505 transport_type: constants::TRANSPORT_BROADCAST,
6506 destination_type: constants::DESTINATION_SINGLE,
6507 packet_type: constants::PACKET_TYPE_DATA,
6508 };
6509 let data_packet = RawPacket::pack(
6510 flags,
6511 0,
6512 &dest,
6513 None,
6514 constants::CONTEXT_NONE,
6515 b"probe data",
6516 )
6517 .unwrap();
6518 let data_packet_hash = data_packet.packet_hash;
6519
6520 driver
6522 .sent_packets
6523 .insert(data_packet_hash, (dest, time::now()));
6524
6525 tx.send(Event::Frame {
6527 interface_id: InterfaceId(1),
6528 data: data_packet.raw,
6529 })
6530 .unwrap();
6531 tx.send(Event::Shutdown).unwrap();
6532 driver.run();
6533
6534 let sent_packets = sent.lock().unwrap();
6536 let proof_packets: Vec<_> = sent_packets
6537 .iter()
6538 .filter(|raw| {
6539 let flags = PacketFlags::unpack(raw[0] & 0x7F);
6540 flags.packet_type == constants::PACKET_TYPE_PROOF
6541 })
6542 .collect();
6543 assert!(!proof_packets.is_empty(), "Should have sent a proof packet");
6544
6545 let proof_raw = proof_packets[0].clone();
6556 drop(sent_packets); let (tx2, rx2) = event::channel();
6560 let proofs2 = Arc::new(Mutex::new(Vec::new()));
6561 let cbs2 = MockCallbacks {
6562 announces: Arc::new(Mutex::new(Vec::new())),
6563 paths: Arc::new(Mutex::new(Vec::new())),
6564 deliveries: Arc::new(Mutex::new(Vec::new())),
6565 iface_ups: Arc::new(Mutex::new(Vec::new())),
6566 iface_downs: Arc::new(Mutex::new(Vec::new())),
6567 link_established: Arc::new(Mutex::new(Vec::new())),
6568 link_closed: Arc::new(Mutex::new(Vec::new())),
6569 remote_identified: Arc::new(Mutex::new(Vec::new())),
6570 resources_received: Arc::new(Mutex::new(Vec::new())),
6571 resource_completed: Arc::new(Mutex::new(Vec::new())),
6572 resource_failed: Arc::new(Mutex::new(Vec::new())),
6573 channel_messages: Arc::new(Mutex::new(Vec::new())),
6574 link_data: Arc::new(Mutex::new(Vec::new())),
6575 responses: Arc::new(Mutex::new(Vec::new())),
6576 proofs: proofs2.clone(),
6577 proof_requested: Arc::new(Mutex::new(Vec::new())),
6578 };
6579 let mut driver2 = Driver::new(
6580 TransportConfig {
6581 transport_enabled: false,
6582 identity_hash: None,
6583 prefer_shorter_path: false,
6584 max_paths_per_destination: 1,
6585 },
6586 rx2,
6587 tx2.clone(),
6588 Box::new(cbs2),
6589 );
6590 let info2 = make_interface_info(1);
6591 driver2.engine.register_interface(info2);
6592 let (writer2, _sent2) = MockWriter::new();
6593 driver2
6594 .interfaces
6595 .insert(InterfaceId(1), make_entry(1, Box::new(writer2), true));
6596
6597 driver2
6599 .sent_packets
6600 .insert(data_packet_hash, (dest, time::now()));
6601
6602 tx2.send(Event::Frame {
6604 interface_id: InterfaceId(1),
6605 data: proof_raw,
6606 })
6607 .unwrap();
6608 tx2.send(Event::Shutdown).unwrap();
6609 driver2.run();
6610
6611 let proof_events = proofs2.lock().unwrap();
6613 assert_eq!(proof_events.len(), 1, "on_proof callback should fire once");
6614 assert_eq!(
6615 proof_events[0].1 .0, data_packet_hash,
6616 "proof should match original packet hash"
6617 );
6618 assert!(proof_events[0].2 >= 0.0, "RTT should be non-negative");
6619
6620 assert!(
6622 driver2.completed_proofs.contains_key(&data_packet_hash),
6623 "completed_proofs should contain the packet hash"
6624 );
6625 let (rtt, _received) = driver2.completed_proofs[&data_packet_hash];
6626 assert!(rtt >= 0.0, "RTT should be non-negative");
6627 }
6628
6629 #[test]
6630 fn interface_stats_includes_probe_responder() {
6631 let (tx, rx) = event::channel();
6632 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6633 let mut driver = Driver::new(
6634 TransportConfig {
6635 transport_enabled: true,
6636 identity_hash: Some([0x42; 16]),
6637 prefer_shorter_path: false,
6638 max_paths_per_destination: 1,
6639 },
6640 rx,
6641 tx.clone(),
6642 Box::new(cbs),
6643 );
6644 let (writer, _sent) = MockWriter::new();
6645 driver
6646 .interfaces
6647 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
6648
6649 driver.probe_responder_hash = Some([0xEE; 16]);
6651
6652 let (resp_tx, resp_rx) = mpsc::channel();
6653 tx.send(Event::Query(QueryRequest::InterfaceStats, resp_tx))
6654 .unwrap();
6655 tx.send(Event::Shutdown).unwrap();
6656 driver.run();
6657
6658 match resp_rx.recv().unwrap() {
6659 QueryResponse::InterfaceStats(stats) => {
6660 assert_eq!(stats.probe_responder, Some([0xEE; 16]));
6661 }
6662 other => panic!("expected InterfaceStats, got {:?}", other),
6663 }
6664 }
6665
6666 #[test]
6667 fn interface_stats_probe_responder_none_when_disabled() {
6668 let (tx, rx) = event::channel();
6669 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6670 let mut driver = Driver::new(
6671 TransportConfig {
6672 transport_enabled: false,
6673 identity_hash: None,
6674 prefer_shorter_path: false,
6675 max_paths_per_destination: 1,
6676 },
6677 rx,
6678 tx.clone(),
6679 Box::new(cbs),
6680 );
6681 let (writer, _sent) = MockWriter::new();
6682 driver
6683 .interfaces
6684 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
6685
6686 let (resp_tx, resp_rx) = mpsc::channel();
6687 tx.send(Event::Query(QueryRequest::InterfaceStats, resp_tx))
6688 .unwrap();
6689 tx.send(Event::Shutdown).unwrap();
6690 driver.run();
6691
6692 match resp_rx.recv().unwrap() {
6693 QueryResponse::InterfaceStats(stats) => {
6694 assert_eq!(stats.probe_responder, None);
6695 }
6696 other => panic!("expected InterfaceStats, got {:?}", other),
6697 }
6698 }
6699
6700 #[test]
6701 fn test_extract_dest_hash_too_short() {
6702 assert_eq!(super::extract_dest_hash(&[0x00, 0x00, 0xAA]), [0u8; 16]);
6704 }
6705
6706 #[test]
6707 fn test_extract_dest_hash_header1() {
6708 let mut raw = vec![0x00, 0x00]; let dest = [0x11; 16];
6711 raw.extend_from_slice(&dest);
6712 raw.extend_from_slice(&[0xFF; 10]); assert_eq!(super::extract_dest_hash(&raw), dest);
6714 }
6715
6716 #[test]
6717 fn test_extract_dest_hash_header2() {
6718 let mut raw = vec![0x40, 0x00]; raw.extend_from_slice(&[0xAA; 16]); let dest = [0x22; 16];
6722 raw.extend_from_slice(&dest); raw.extend_from_slice(&[0xFF; 10]); assert_eq!(super::extract_dest_hash(&raw), dest);
6725 }
6726
6727 #[test]
6728 fn test_extract_dest_hash_header2_too_short() {
6729 let mut raw = vec![0x40, 0x00];
6731 raw.extend_from_slice(&[0xAA; 16]); assert_eq!(super::extract_dest_hash(&raw), [0u8; 16]);
6733 }
6734
6735 #[test]
6736 fn announce_stores_receiving_interface_in_known_destinations() {
6737 let (tx, rx) = event::channel();
6740 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6741 let mut driver = Driver::new(
6742 TransportConfig {
6743 transport_enabled: false,
6744 identity_hash: None,
6745 prefer_shorter_path: false,
6746 max_paths_per_destination: 1,
6747 },
6748 rx,
6749 tx.clone(),
6750 Box::new(cbs),
6751 );
6752 let info = make_interface_info(1);
6753 driver.engine.register_interface(info);
6754 let (writer, _sent) = MockWriter::new();
6755 driver
6756 .interfaces
6757 .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
6758
6759 let identity = Identity::new(&mut OsRng);
6760 let announce_raw = build_announce_packet(&identity);
6761
6762 tx.send(Event::Frame {
6763 interface_id: InterfaceId(1),
6764 data: announce_raw,
6765 })
6766 .unwrap();
6767 tx.send(Event::Shutdown).unwrap();
6768 driver.run();
6769
6770 assert_eq!(driver.known_destinations.len(), 1);
6772 let (_, announced) = driver.known_destinations.iter().next().unwrap();
6773 assert_eq!(
6774 announced.receiving_interface,
6775 InterfaceId(1),
6776 "receiving_interface should match the interface the announce arrived on"
6777 );
6778 }
6779
6780 #[test]
6781 fn announce_on_different_interfaces_stores_correct_id() {
6782 let (tx, rx) = event::channel();
6784 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6785 let mut driver = Driver::new(
6786 TransportConfig {
6787 transport_enabled: false,
6788 identity_hash: None,
6789 prefer_shorter_path: false,
6790 max_paths_per_destination: 1,
6791 },
6792 rx,
6793 tx.clone(),
6794 Box::new(cbs),
6795 );
6796 for id in [1, 2] {
6798 driver.engine.register_interface(make_interface_info(id));
6799 let (writer, _) = MockWriter::new();
6800 driver
6801 .interfaces
6802 .insert(InterfaceId(id), make_entry(id, Box::new(writer), true));
6803 }
6804
6805 let identity = Identity::new(&mut OsRng);
6806 let announce_raw = build_announce_packet(&identity);
6807
6808 tx.send(Event::Frame {
6810 interface_id: InterfaceId(2),
6811 data: announce_raw,
6812 })
6813 .unwrap();
6814 tx.send(Event::Shutdown).unwrap();
6815 driver.run();
6816
6817 assert_eq!(driver.known_destinations.len(), 1);
6818 let (_, announced) = driver.known_destinations.iter().next().unwrap();
6819 assert_eq!(announced.receiving_interface, InterfaceId(2));
6820 }
6821
6822 #[test]
6823 fn inject_identity_stores_sentinel_interface() {
6824 let (tx, rx) = event::channel();
6827 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6828 let mut driver = Driver::new(
6829 TransportConfig {
6830 transport_enabled: false,
6831 identity_hash: None,
6832 prefer_shorter_path: false,
6833 max_paths_per_destination: 1,
6834 },
6835 rx,
6836 tx.clone(),
6837 Box::new(cbs),
6838 );
6839
6840 let identity = Identity::new(&mut OsRng);
6841 let dest_hash =
6842 rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));
6843
6844 let (resp_tx, resp_rx) = mpsc::channel();
6845 tx.send(Event::Query(
6846 QueryRequest::InjectIdentity {
6847 dest_hash,
6848 identity_hash: *identity.hash(),
6849 public_key: identity.get_public_key().unwrap(),
6850 app_data: Some(b"restored".to_vec()),
6851 hops: 2,
6852 received_at: 99.0,
6853 },
6854 resp_tx,
6855 ))
6856 .unwrap();
6857 tx.send(Event::Shutdown).unwrap();
6858 driver.run();
6859
6860 match resp_rx.recv().unwrap() {
6861 QueryResponse::InjectIdentity(true) => {}
6862 other => panic!("expected InjectIdentity(true), got {:?}", other),
6863 }
6864
6865 let announced = driver
6866 .known_destinations
6867 .get(&dest_hash)
6868 .expect("identity should be cached");
6869 assert_eq!(
6870 announced.receiving_interface,
6871 InterfaceId(0),
6872 "injected identity should have sentinel InterfaceId(0)"
6873 );
6874 assert_eq!(announced.dest_hash.0, dest_hash);
6875 assert_eq!(announced.identity_hash.0, *identity.hash());
6876 assert_eq!(announced.public_key, identity.get_public_key().unwrap());
6877 assert_eq!(announced.app_data, Some(b"restored".to_vec()));
6878 assert_eq!(announced.hops, 2);
6879 assert_eq!(announced.received_at, 99.0);
6880 }
6881
6882 #[test]
6883 fn inject_identity_overwrites_previous_entry() {
6884 let (tx, rx) = event::channel();
6886 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6887 let mut driver = Driver::new(
6888 TransportConfig {
6889 transport_enabled: false,
6890 identity_hash: None,
6891 prefer_shorter_path: false,
6892 max_paths_per_destination: 1,
6893 },
6894 rx,
6895 tx.clone(),
6896 Box::new(cbs),
6897 );
6898
6899 let identity = Identity::new(&mut OsRng);
6900 let dest_hash =
6901 rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));
6902
6903 let (resp_tx1, resp_rx1) = mpsc::channel();
6905 tx.send(Event::Query(
6906 QueryRequest::InjectIdentity {
6907 dest_hash,
6908 identity_hash: *identity.hash(),
6909 public_key: identity.get_public_key().unwrap(),
6910 app_data: Some(b"first".to_vec()),
6911 hops: 1,
6912 received_at: 10.0,
6913 },
6914 resp_tx1,
6915 ))
6916 .unwrap();
6917
6918 let (resp_tx2, resp_rx2) = mpsc::channel();
6920 tx.send(Event::Query(
6921 QueryRequest::InjectIdentity {
6922 dest_hash,
6923 identity_hash: *identity.hash(),
6924 public_key: identity.get_public_key().unwrap(),
6925 app_data: Some(b"second".to_vec()),
6926 hops: 3,
6927 received_at: 20.0,
6928 },
6929 resp_tx2,
6930 ))
6931 .unwrap();
6932
6933 tx.send(Event::Shutdown).unwrap();
6934 driver.run();
6935
6936 assert!(matches!(
6937 resp_rx1.recv().unwrap(),
6938 QueryResponse::InjectIdentity(true)
6939 ));
6940 assert!(matches!(
6941 resp_rx2.recv().unwrap(),
6942 QueryResponse::InjectIdentity(true)
6943 ));
6944
6945 let announced = driver.known_destinations.get(&dest_hash).unwrap();
6947 assert_eq!(announced.app_data, Some(b"second".to_vec()));
6948 assert_eq!(announced.hops, 3);
6949 assert_eq!(announced.received_at, 20.0);
6950 }
6951
6952 #[test]
6953 fn re_announce_updates_receiving_interface() {
6954 let (tx, rx) = event::channel();
6957 let (cbs, _, _, _, _, _) = MockCallbacks::new();
6958 let mut driver = Driver::new(
6959 TransportConfig {
6960 transport_enabled: false,
6961 identity_hash: None,
6962 prefer_shorter_path: false,
6963 max_paths_per_destination: 1,
6964 },
6965 rx,
6966 tx.clone(),
6967 Box::new(cbs),
6968 );
6969 for id in [1, 2] {
6970 driver.engine.register_interface(make_interface_info(id));
6971 let (writer, _) = MockWriter::new();
6972 driver
6973 .interfaces
6974 .insert(InterfaceId(id), make_entry(id, Box::new(writer), true));
6975 }
6976
6977 let identity = Identity::new(&mut OsRng);
6978 let announce_raw = build_announce_packet(&identity);
6979
6980 tx.send(Event::Frame {
6982 interface_id: InterfaceId(1),
6983 data: announce_raw.clone(),
6984 })
6985 .unwrap();
6986 let identity2 = Identity::new(&mut OsRng);
6990 let announce_raw2 = build_announce_packet(&identity2);
6991 tx.send(Event::Frame {
6992 interface_id: InterfaceId(2),
6993 data: announce_raw2,
6994 })
6995 .unwrap();
6996 tx.send(Event::Shutdown).unwrap();
6997 driver.run();
6998
6999 assert_eq!(driver.known_destinations.len(), 2);
7001 for (_, announced) in &driver.known_destinations {
7002 assert!(
7004 announced.receiving_interface == InterfaceId(1)
7005 || announced.receiving_interface == InterfaceId(2)
7006 );
7007 }
7008 let ifaces: Vec<_> = driver
7010 .known_destinations
7011 .values()
7012 .map(|a| a.receiving_interface)
7013 .collect();
7014 assert!(ifaces.contains(&InterfaceId(1)));
7015 assert!(ifaces.contains(&InterfaceId(2)));
7016 }
7017
7018 #[test]
7019 fn test_extract_dest_hash_other_flags_preserved() {
7020 let mut raw = vec![0x3F, 0x00];
7023 let dest = [0x33; 16];
7024 raw.extend_from_slice(&dest);
7025 raw.extend_from_slice(&[0xFF; 10]);
7026 assert_eq!(super::extract_dest_hash(&raw), dest);
7027
7028 let mut raw2 = vec![0xFF, 0x00];
7030 raw2.extend_from_slice(&[0xBB; 16]); let dest2 = [0x44; 16];
7032 raw2.extend_from_slice(&dest2);
7033 raw2.extend_from_slice(&[0xFF; 10]);
7034 assert_eq!(super::extract_dest_hash(&raw2), dest2);
7035 }
7036}