1use std::collections::HashMap;
4
5use rns_core::packet::RawPacket;
6use rns_core::transport::tables::PathEntry;
7use rns_core::transport::types::{InterfaceId, TransportAction, TransportConfig};
8use rns_core::transport::TransportEngine;
9use rns_crypto::{OsRng, Rng};
10
11#[cfg(feature = "rns-hooks")]
12use rns_hooks::{
13 create_hook_slots, EngineAccess, HookContext, HookManager, HookPoint, HookSlot,
14};
15
16use crate::event::{
17 BlackholeInfo, Event, EventReceiver, InterfaceStatsResponse,
18 LocalDestinationEntry, NextHopResponse, PathTableEntry, QueryRequest, QueryResponse,
19 RateTableEntry, SingleInterfaceStat,
20};
21use crate::ifac;
22use crate::interface::{InterfaceEntry, InterfaceStats};
23use crate::link_manager::{LinkManager, LinkManagerAction};
24use crate::holepunch::orchestrator::{HolePunchManager, HolePunchManagerAction};
25use crate::time;
26
/// Read-only view of driver state handed to hook programs.
///
/// Bundles shared borrows of the transport engine, the interface table and
/// the link manager together with the timestamp captured for the current
/// hook invocation. Only compiled with the `rns-hooks` feature.
#[cfg(feature = "rns-hooks")]
struct EngineRef<'a> {
    /// Transport engine queried for path, hop, identity and blackhole lookups.
    engine: &'a TransportEngine,
    /// Driver interface table, keyed by interface id.
    interfaces: &'a HashMap<InterfaceId, InterfaceEntry>,
    /// Link manager queried for link state.
    link_manager: &'a LinkManager,
    /// Timestamp forwarded to time-dependent engine queries (`is_blackholed`).
    now: f64,
}
35
/// Answers hook-program queries by delegating to the borrowed engine,
/// interface table and link manager.
#[cfg(feature = "rns-hooks")]
impl<'a> EngineAccess for EngineRef<'a> {
    /// Returns whether the transport engine reports a path to `dest`.
    fn has_path(&self, dest: &[u8; 16]) -> bool {
        self.engine.has_path(dest)
    }

    /// Returns the engine's hop count to `dest`, if it has one.
    fn hops_to(&self, dest: &[u8; 16]) -> Option<u8> {
        self.engine.hops_to(dest)
    }

    /// Returns the engine's next hop towards `dest`, if it has one.
    fn next_hop(&self, dest: &[u8; 16]) -> Option<[u8; 16]> {
        self.engine.next_hop(dest)
    }

    /// Returns whether `identity` is blackholed as of the captured timestamp.
    fn is_blackholed(&self, identity: &[u8; 16]) -> bool {
        self.engine.is_blackholed(identity, self.now)
    }

    /// Returns a copy of the interface's name, or `None` for an unknown id.
    fn interface_name(&self, id: u64) -> Option<String> {
        self.interfaces
            .get(&InterfaceId(id))
            .map(|e| e.info.name.clone())
    }

    /// Returns the interface's mode value, or `None` for an unknown id.
    fn interface_mode(&self, id: u64) -> Option<u8> {
        self.interfaces
            .get(&InterfaceId(id))
            .map(|e| e.info.mode)
    }

    /// Returns a copy of the engine's identity hash, if one is set.
    fn identity_hash(&self) -> Option<[u8; 16]> {
        self.engine.identity_hash().copied()
    }

    /// Returns the interface's outgoing announce frequency multiplied by 1000
    /// and cast to `i32`, or `None` for an unknown id.
    fn announce_rate(&self, id: u64) -> Option<i32> {
        self.interfaces
            .get(&InterfaceId(id))
            .map(|e| (e.stats.outgoing_announce_freq() * 1000.0) as i32)
    }

    /// Encodes the state of the link identified by `link_hash` as an integer:
    /// 0 = Pending, 1 = Handshake, 2 = Active, 3 = Stale, 4 = Closed.
    /// Returns `None` when the link manager does not know the link.
    fn link_state(&self, link_hash: &[u8; 16]) -> Option<u8> {
        use rns_core::link::types::LinkState;
        self.link_manager.link_state(link_hash).map(|s| match s {
            LinkState::Pending => 0,
            LinkState::Handshake => 1,
            LinkState::Active => 2,
            LinkState::Stale => 3,
            LinkState::Closed => 4,
        })
    }
}
79
/// Extracts the 16-byte destination hash from a raw packet.
///
/// Bit `0x40` of the first byte selects the header layout: when it is clear
/// the hash is read from bytes `2..18`, when it is set from bytes `18..34`.
/// Returns all zeroes when `raw` is empty or too short to contain the hash.
#[cfg(any(test, feature = "rns-hooks"))]
fn extract_dest_hash(raw: &[u8]) -> [u8; 16] {
    let mut dest = [0u8; 16];
    let start = match raw.first() {
        None => return dest,
        Some(&flags) if flags & 0x40 != 0 => 18,
        Some(_) => 2,
    };
    // Checked slicing: a truncated packet leaves the zeroed hash untouched.
    if let Some(bytes) = raw.get(start..start + 16) {
        dest.copy_from_slice(bytes);
    }
    dest
}
98
/// Runs the hook chain attached to one hook point.
///
/// Takes the program list and the hook manager as separate arguments rather
/// than a whole `Driver`. Returns `None` without invoking anything when no
/// programs are attached or no hook manager is available; otherwise returns
/// whatever `HookManager::run_chain` yields.
#[cfg(feature = "rns-hooks")]
fn run_hook_inner(
    programs: &mut [rns_hooks::LoadedProgram],
    hook_manager: &Option<HookManager>,
    engine_access: &dyn EngineAccess,
    ctx: &HookContext,
    now: f64,
) -> Option<rns_hooks::ExecuteResult> {
    if programs.is_empty() {
        return None;
    }
    let mgr = hook_manager.as_ref()?;
    mgr.run_chain(programs, ctx, engine_access, now)
}
114
/// Converts actions injected by hook programs into engine `TransportAction`s.
///
/// The mapping is one-to-one per variant. Raw numeric interface ids are
/// wrapped in `InterfaceId`, `has_exclude` flags (non-zero means present)
/// become `Option<InterfaceId>`, and the numeric `to_local` flag becomes a
/// `bool`.
#[cfg(feature = "rns-hooks")]
fn convert_injected_actions(actions: Vec<rns_hooks::ActionWire>) -> Vec<TransportAction> {
    use rns_hooks::ActionWire;

    actions
        .into_iter()
        .map(|a| match a {
            ActionWire::SendOnInterface { interface, raw } => TransportAction::SendOnInterface {
                interface: InterfaceId(interface),
                raw,
            },
            ActionWire::BroadcastOnAllInterfaces { raw, exclude, has_exclude } => {
                TransportAction::BroadcastOnAllInterfaces {
                    raw,
                    exclude: if has_exclude != 0 { Some(InterfaceId(exclude)) } else { None },
                }
            }
            ActionWire::DeliverLocal { destination_hash, raw, packet_hash, receiving_interface } => {
                TransportAction::DeliverLocal {
                    destination_hash,
                    raw,
                    packet_hash,
                    receiving_interface: InterfaceId(receiving_interface),
                }
            }
            ActionWire::PathUpdated { destination_hash, hops, next_hop, interface } => {
                TransportAction::PathUpdated {
                    destination_hash,
                    hops,
                    next_hop,
                    interface: InterfaceId(interface),
                }
            }
            ActionWire::CacheAnnounce { packet_hash, raw } => {
                TransportAction::CacheAnnounce { packet_hash, raw }
            }
            ActionWire::TunnelEstablished { tunnel_id, interface } => {
                TransportAction::TunnelEstablished {
                    tunnel_id,
                    interface: InterfaceId(interface),
                }
            }
            ActionWire::TunnelSynthesize { interface, data, dest_hash } => {
                TransportAction::TunnelSynthesize {
                    interface: InterfaceId(interface),
                    data,
                    dest_hash,
                }
            }
            ActionWire::ForwardToLocalClients { raw, exclude, has_exclude } => {
                TransportAction::ForwardToLocalClients {
                    raw,
                    exclude: if has_exclude != 0 { Some(InterfaceId(exclude)) } else { None },
                }
            }
            ActionWire::ForwardPlainBroadcast { raw, to_local, exclude, has_exclude } => {
                TransportAction::ForwardPlainBroadcast {
                    raw,
                    to_local: to_local != 0,
                    exclude: if has_exclude != 0 { Some(InterfaceId(exclude)) } else { None },
                }
            }
            ActionWire::AnnounceReceived {
                destination_hash,
                identity_hash,
                public_key,
                name_hash,
                random_hash,
                app_data,
                hops,
                receiving_interface,
            } => TransportAction::AnnounceReceived {
                destination_hash,
                identity_hash,
                public_key,
                name_hash,
                random_hash,
                app_data,
                hops,
                receiving_interface: InterfaceId(receiving_interface),
            },
        })
        .collect()
}
199
/// Derives an interface type label from the prefix of a dynamic interface's
/// name.
///
/// The first matching prefix in the table wins; names that match none of the
/// known prefixes are labelled `"AutoInterface"`.
fn infer_interface_type(name: &str) -> String {
    // (name prefix, reported interface type)
    const PREFIX_TYPES: [(&str, &str); 3] = [
        ("TCPServerInterface", "TCPServerClientInterface"),
        ("BackboneInterface", "BackboneInterface"),
        ("LocalInterface", "LocalServerClientInterface"),
    ];

    PREFIX_TYPES
        .iter()
        .find(|&&(prefix, _)| name.starts_with(prefix))
        .map_or("AutoInterface", |&(_, ty)| ty)
        .to_string()
}
216
217pub use crate::common::callbacks::Callbacks;
218
219pub struct Driver {
221 pub(crate) engine: TransportEngine,
222 pub(crate) interfaces: HashMap<InterfaceId, InterfaceEntry>,
223 pub(crate) rng: OsRng,
224 pub(crate) rx: EventReceiver,
225 pub(crate) callbacks: Box<dyn Callbacks>,
226 pub(crate) started: f64,
227 pub(crate) announce_cache: Option<crate::announce_cache::AnnounceCache>,
228 pub(crate) tunnel_synth_dest: [u8; 16],
230 pub(crate) transport_identity: Option<rns_crypto::identity::Identity>,
232 pub(crate) link_manager: LinkManager,
234 pub(crate) management_config: crate::management::ManagementConfig,
236 pub(crate) last_management_announce: f64,
238 pub(crate) initial_announce_sent: bool,
240 pub(crate) known_destinations: HashMap<[u8; 16], crate::destination::AnnouncedIdentity>,
242 pub(crate) path_request_dest: [u8; 16],
244 pub(crate) proof_strategies: HashMap<[u8; 16], (rns_core::types::ProofStrategy, Option<rns_crypto::identity::Identity>)>,
247 pub(crate) sent_packets: HashMap<[u8; 32], ([u8; 16], f64)>,
249 pub(crate) completed_proofs: HashMap<[u8; 32], (f64, f64)>,
251 pub(crate) local_destinations: HashMap<[u8; 16], u8>,
253 pub(crate) holepunch_manager: HolePunchManager,
255 pub(crate) event_tx: crate::event::EventSender,
257 pub(crate) discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage,
259 pub(crate) discovery_required_value: u8,
261 pub(crate) discovery_name_hash: [u8; 10],
263 pub(crate) probe_responder_hash: Option<[u8; 16]>,
265 pub(crate) discover_interfaces: bool,
267 pub(crate) interface_announcer: Option<crate::discovery::InterfaceAnnouncer>,
269 pub(crate) discovery_cleanup_counter: u32,
271 #[cfg(feature = "rns-hooks")]
273 pub(crate) hook_slots: [HookSlot; HookPoint::COUNT],
274 #[cfg(feature = "rns-hooks")]
276 pub(crate) hook_manager: Option<HookManager>,
277}
278
279impl Driver {
280 pub fn new(
282 config: TransportConfig,
283 rx: EventReceiver,
284 tx: crate::event::EventSender,
285 callbacks: Box<dyn Callbacks>,
286 ) -> Self {
287 let tunnel_synth_dest = rns_core::destination::destination_hash(
288 "rnstransport",
289 &["tunnel", "synthesize"],
290 None,
291 );
292 let path_request_dest = rns_core::destination::destination_hash(
293 "rnstransport",
294 &["path", "request"],
295 None,
296 );
297 let discovery_name_hash = crate::discovery::discovery_name_hash();
298 let mut engine = TransportEngine::new(config);
299 engine.register_destination(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
300 engine.register_destination(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
302 let mut local_destinations = HashMap::new();
305 local_destinations.insert(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
306 local_destinations.insert(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
307 Driver {
308 engine,
309 interfaces: HashMap::new(),
310 rng: OsRng,
311 rx,
312 callbacks,
313 started: time::now(),
314 announce_cache: None,
315 tunnel_synth_dest,
316 transport_identity: None,
317 link_manager: LinkManager::new(),
318 management_config: Default::default(),
319 last_management_announce: 0.0,
320 initial_announce_sent: false,
321 known_destinations: HashMap::new(),
322 path_request_dest,
323 proof_strategies: HashMap::new(),
324 sent_packets: HashMap::new(),
325 completed_proofs: HashMap::new(),
326 local_destinations,
327 holepunch_manager: HolePunchManager::new(vec![], rns_core::holepunch::ProbeProtocol::Rnsp, None),
328 event_tx: tx,
329 discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage::new(
330 std::env::temp_dir().join("rns-discovered-interfaces")
331 ),
332 discovery_required_value: crate::discovery::DEFAULT_STAMP_VALUE,
333 discovery_name_hash,
334 probe_responder_hash: None,
335 discover_interfaces: false,
336 interface_announcer: None,
337 discovery_cleanup_counter: 0,
338 #[cfg(feature = "rns-hooks")]
339 hook_slots: create_hook_slots(),
340 #[cfg(feature = "rns-hooks")]
341 hook_manager: HookManager::new().ok(),
342 }
343 }
344
345 pub fn set_probe_config(&mut self, addrs: Vec<std::net::SocketAddr>, protocol: rns_core::holepunch::ProbeProtocol, device: Option<String>) {
347 self.holepunch_manager = HolePunchManager::new(addrs, protocol, device);
348 }
349
350 pub fn run(&mut self) {
352 loop {
353 let event = match self.rx.recv() {
354 Ok(e) => e,
355 Err(_) => break, };
357
358 match event {
359 Event::Frame { interface_id, data } => {
360 if data.len() > 2 && (data[0] & 0x03) == 0x01 {
362 log::debug!("Announce:frame from iface {} (len={}, flags=0x{:02x})",
363 interface_id.0, data.len(), data[0]);
364 }
365 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
367 entry.stats.rxb += data.len() as u64;
368 entry.stats.rx_packets += 1;
369 }
370
371 let packet = if let Some(entry) = self.interfaces.get(&interface_id) {
373 if let Some(ref ifac_state) = entry.ifac {
374 match ifac::unmask_inbound(&data, ifac_state) {
376 Some(unmasked) => unmasked,
377 None => {
378 log::debug!("[{}] IFAC rejected packet", interface_id.0);
379 continue;
380 }
381 }
382 } else {
383 if data.len() > 2 && data[0] & 0x80 == 0x80 {
385 log::debug!("[{}] dropping packet with IFAC flag on non-IFAC interface", interface_id.0);
386 continue;
387 }
388 data
389 }
390 } else {
391 data
392 };
393
394 #[cfg(feature = "rns-hooks")]
396 {
397 let pkt_ctx = rns_hooks::PacketContext {
398 flags: if packet.is_empty() { 0 } else { packet[0] },
399 hops: if packet.len() > 1 { packet[1] } else { 0 },
400 destination_hash: extract_dest_hash(&packet),
401 context: 0,
402 packet_hash: [0; 32],
403 interface_id: interface_id.0,
404 data_offset: 0,
405 data_len: packet.len() as u32,
406 };
407 let ctx = HookContext::Packet { ctx: &pkt_ctx, raw: &packet };
408 let now = time::now();
409 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
410 {
411 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::PreIngress as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
412 if let Some(ref e) = exec {
413 if !e.injected_actions.is_empty() {
414 let extra = convert_injected_actions(e.injected_actions.clone());
415 self.dispatch_all(extra);
416 }
417 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) {
418 continue;
419 }
420 }
421 }
422 }
423
424 if packet.len() > 2 && (packet[0] & 0x03) == 0x01 {
426 let now = time::now();
427 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
428 entry.stats.record_incoming_announce(now);
429 }
430 }
431
432 if let Some(entry) = self.interfaces.get(&interface_id) {
434 self.engine.update_interface_freq(interface_id, entry.stats.incoming_announce_freq());
435 }
436
437 let actions = self.engine.handle_inbound(
438 &packet,
439 interface_id,
440 time::now(),
441 &mut self.rng,
442 );
443
444 #[cfg(feature = "rns-hooks")]
446 {
447 let pkt_ctx2 = rns_hooks::PacketContext {
448 flags: if packet.is_empty() { 0 } else { packet[0] },
449 hops: if packet.len() > 1 { packet[1] } else { 0 },
450 destination_hash: extract_dest_hash(&packet),
451 context: 0,
452 packet_hash: [0; 32],
453 interface_id: interface_id.0,
454 data_offset: 0,
455 data_len: packet.len() as u32,
456 };
457 let ctx = HookContext::Packet { ctx: &pkt_ctx2, raw: &packet };
458 let now = time::now();
459 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
460 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::PreDispatch as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
461 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
462 }
463 }
464
465 self.dispatch_all(actions);
466 }
467 Event::Tick => {
468 #[cfg(feature = "rns-hooks")]
470 {
471 let ctx = HookContext::Tick;
472 let now = time::now();
473 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
474 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::Tick as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
475 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
476 }
477 }
478
479 let now = time::now();
480 for (id, entry) in &self.interfaces {
482 self.engine.update_interface_freq(*id, entry.stats.incoming_announce_freq());
483 }
484 let actions = self.engine.tick(now, &mut self.rng);
485 self.dispatch_all(actions);
486 let link_actions = self.link_manager.tick(&mut self.rng);
488 self.dispatch_link_actions(link_actions);
489 {
491 let tx = self.get_event_sender();
492 let hp_actions = self.holepunch_manager.tick(&tx);
493 self.dispatch_holepunch_actions(hp_actions);
494 }
495 self.tick_management_announces(now);
497 self.sent_packets.retain(|_, (_, sent_time)| now - *sent_time < 60.0);
499 self.completed_proofs.retain(|_, (_, received)| now - *received < 120.0);
501
502 self.tick_discovery_announcer(now);
503
504 if self.discover_interfaces {
506 self.discovery_cleanup_counter += 1;
507 if self.discovery_cleanup_counter >= 3600 {
508 self.discovery_cleanup_counter = 0;
509 if let Ok(removed) = self.discovered_interfaces.cleanup() {
510 if removed > 0 {
511 log::info!("Discovery cleanup: removed {} stale entries", removed);
512 }
513 }
514 }
515 }
516 }
517 Event::InterfaceUp(id, new_writer, info) => {
518 let wants_tunnel;
519 if let Some(mut info) = info {
520 log::info!("[{}] dynamic interface registered", id.0);
522 wants_tunnel = info.wants_tunnel;
523 let iface_type = infer_interface_type(&info.name);
524 info.started = time::now();
526 self.engine.register_interface(info.clone());
527 if let Some(writer) = new_writer {
528 self.interfaces.insert(
529 id,
530 InterfaceEntry {
531 id,
532 info,
533 writer,
534 online: true,
535 dynamic: true,
536 ifac: None,
537 stats: InterfaceStats {
538 started: time::now(),
539 ..Default::default()
540 },
541 interface_type: iface_type,
542 },
543 );
544 }
545 self.callbacks.on_interface_up(id);
546 #[cfg(feature = "rns-hooks")]
547 {
548 let ctx = HookContext::Interface { interface_id: id.0 };
549 let now = time::now();
550 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
551 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceUp as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
552 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
553 }
554 }
555 } else if let Some(entry) = self.interfaces.get_mut(&id) {
556 log::info!("[{}] interface online", id.0);
558 wants_tunnel = entry.info.wants_tunnel;
559 entry.online = true;
560 if let Some(writer) = new_writer {
561 log::info!("[{}] writer refreshed after reconnect", id.0);
562 entry.writer = writer;
563 }
564 self.callbacks.on_interface_up(id);
565 #[cfg(feature = "rns-hooks")]
566 {
567 let ctx = HookContext::Interface { interface_id: id.0 };
568 let now = time::now();
569 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
570 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceUp as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
571 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
572 }
573 }
574 } else {
575 wants_tunnel = false;
576 }
577
578 if wants_tunnel {
580 self.synthesize_tunnel_for_interface(id);
581 }
582 }
583 Event::InterfaceDown(id) => {
584 if let Some(entry) = self.interfaces.get(&id) {
586 if let Some(tunnel_id) = entry.info.tunnel_id {
587 self.engine.void_tunnel_interface(&tunnel_id);
588 }
589 }
590
591 if let Some(entry) = self.interfaces.get(&id) {
592 if entry.dynamic {
593 log::info!("[{}] dynamic interface removed", id.0);
595 self.engine.deregister_interface(id);
596 self.interfaces.remove(&id);
597 } else {
598 log::info!("[{}] interface offline", id.0);
600 self.interfaces.get_mut(&id).unwrap().online = false;
601 }
602 self.callbacks.on_interface_down(id);
603 #[cfg(feature = "rns-hooks")]
604 {
605 let ctx = HookContext::Interface { interface_id: id.0 };
606 let now = time::now();
607 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
608 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceDown as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
609 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
610 }
611 }
612 }
613 }
614 Event::SendOutbound { raw, dest_type, attached_interface } => {
615 match RawPacket::unpack(&raw) {
616 Ok(packet) => {
617 let is_announce = packet.flags.packet_type == rns_core::constants::PACKET_TYPE_ANNOUNCE;
618 if is_announce {
619 log::debug!("SendOutbound: ANNOUNCE for {:02x?} (len={}, dest_type={}, attached={:?})",
620 &packet.destination_hash[..4], raw.len(), dest_type, attached_interface);
621 }
622 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
624 self.sent_packets.insert(
625 packet.packet_hash,
626 (packet.destination_hash, time::now()),
627 );
628 }
629 let actions = self.engine.handle_outbound(
630 &packet,
631 dest_type,
632 attached_interface,
633 time::now(),
634 );
635 if is_announce {
636 log::debug!("SendOutbound: announce routed to {} actions: {:?}",
637 actions.len(),
638 actions.iter().map(|a| match a {
639 TransportAction::SendOnInterface { interface, .. } => format!("SendOn({})", interface.0),
640 TransportAction::BroadcastOnAllInterfaces { .. } => "BroadcastAll".to_string(),
641 _ => "other".to_string(),
642 }).collect::<Vec<_>>());
643 }
644 self.dispatch_all(actions);
645 }
646 Err(e) => {
647 log::warn!("SendOutbound: failed to unpack packet: {:?}", e);
648 }
649 }
650 }
651 Event::RegisterDestination { dest_hash, dest_type } => {
652 self.engine.register_destination(dest_hash, dest_type);
653 self.local_destinations.insert(dest_hash, dest_type);
654 }
655 Event::DeregisterDestination { dest_hash } => {
656 self.engine.deregister_destination(&dest_hash);
657 self.local_destinations.remove(&dest_hash);
658 }
659 Event::Query(request, response_tx) => {
660 let response = self.handle_query_mut(request);
661 let _ = response_tx.send(response);
662 }
663 Event::DeregisterLinkDestination { dest_hash } => {
664 self.link_manager.deregister_link_destination(&dest_hash);
665 }
666 Event::RegisterLinkDestination { dest_hash, sig_prv_bytes, sig_pub_bytes, resource_strategy } => {
667 let sig_prv = rns_crypto::ed25519::Ed25519PrivateKey::from_bytes(&sig_prv_bytes);
668 let strat = match resource_strategy {
669 1 => crate::link_manager::ResourceStrategy::AcceptAll,
670 2 => crate::link_manager::ResourceStrategy::AcceptApp,
671 _ => crate::link_manager::ResourceStrategy::AcceptNone,
672 };
673 self.link_manager.register_link_destination(dest_hash, sig_prv, sig_pub_bytes, strat);
674 self.engine.register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE);
676 self.local_destinations.insert(dest_hash, rns_core::constants::DESTINATION_SINGLE);
677 }
678 Event::RegisterRequestHandler { path, allowed_list, handler } => {
679 self.link_manager.register_request_handler(&path, allowed_list, move |link_id, p, data, remote| {
680 handler(link_id, p, data, remote)
681 });
682 }
683 Event::CreateLink { dest_hash, dest_sig_pub_bytes, response_tx } => {
684 let hops = self.engine.hops_to(&dest_hash).unwrap_or(0);
685 let mtu = self.engine.next_hop_interface(&dest_hash)
686 .and_then(|iface_id| self.interfaces.get(&iface_id))
687 .map(|entry| entry.info.mtu)
688 .unwrap_or(rns_core::constants::MTU as u32);
689 let (link_id, link_actions) = self.link_manager.create_link(
690 &dest_hash, &dest_sig_pub_bytes, hops, mtu, &mut self.rng,
691 );
692 let _ = response_tx.send(link_id);
693 self.dispatch_link_actions(link_actions);
694 }
695 Event::SendRequest { link_id, path, data } => {
696 let link_actions = self.link_manager.send_request(
697 &link_id, &path, &data, &mut self.rng,
698 );
699 self.dispatch_link_actions(link_actions);
700 }
701 Event::IdentifyOnLink { link_id, identity_prv_key } => {
702 let identity = rns_crypto::identity::Identity::from_private_key(&identity_prv_key);
703 let link_actions = self.link_manager.identify(&link_id, &identity, &mut self.rng);
704 self.dispatch_link_actions(link_actions);
705 }
706 Event::TeardownLink { link_id } => {
707 let link_actions = self.link_manager.teardown_link(&link_id);
708 self.dispatch_link_actions(link_actions);
709 }
710 Event::SendResource { link_id, data, metadata } => {
711 let link_actions = self.link_manager.send_resource(
712 &link_id, &data, metadata.as_deref(), &mut self.rng,
713 );
714 self.dispatch_link_actions(link_actions);
715 }
716 Event::SetResourceStrategy { link_id, strategy } => {
717 use crate::link_manager::ResourceStrategy;
718 let strat = match strategy {
719 0 => ResourceStrategy::AcceptNone,
720 1 => ResourceStrategy::AcceptAll,
721 2 => ResourceStrategy::AcceptApp,
722 _ => ResourceStrategy::AcceptNone,
723 };
724 self.link_manager.set_resource_strategy(&link_id, strat);
725 }
726 Event::AcceptResource { link_id, resource_hash, accept } => {
727 let link_actions = self.link_manager.accept_resource(
728 &link_id, &resource_hash, accept, &mut self.rng,
729 );
730 self.dispatch_link_actions(link_actions);
731 }
732 Event::SendChannelMessage { link_id, msgtype, payload } => {
733 let link_actions = self.link_manager.send_channel_message(
734 &link_id, msgtype, &payload, &mut self.rng,
735 );
736 self.dispatch_link_actions(link_actions);
737 }
738 Event::SendOnLink { link_id, data, context } => {
739 let link_actions = self.link_manager.send_on_link(
740 &link_id, &data, context, &mut self.rng,
741 );
742 self.dispatch_link_actions(link_actions);
743 }
744 Event::RequestPath { dest_hash } => {
745 self.handle_request_path(dest_hash);
746 }
747 Event::RegisterProofStrategy { dest_hash, strategy, signing_key } => {
748 let identity = signing_key.map(|key| {
749 rns_crypto::identity::Identity::from_private_key(&key)
750 });
751 self.proof_strategies.insert(dest_hash, (strategy, identity));
752 }
753 Event::ProposeDirectConnect { link_id } => {
754 let derived_key = self.link_manager.get_derived_key(&link_id);
755 if let Some(dk) = derived_key {
756 let tx = self.get_event_sender();
757 let hp_actions = self.holepunch_manager.propose(
758 link_id, &dk, &mut self.rng, &tx,
759 );
760 self.dispatch_holepunch_actions(hp_actions);
761 } else {
762 log::warn!("Cannot propose direct connect: no derived key for link {:02x?}", &link_id[..4]);
763 }
764 }
765 Event::SetDirectConnectPolicy { policy } => {
766 self.holepunch_manager.set_policy(policy);
767 }
768 Event::HolePunchProbeResult { link_id, session_id, observed_addr, socket, probe_server } => {
769 let hp_actions = self.holepunch_manager.handle_probe_result(
770 link_id, session_id, observed_addr, socket, probe_server,
771 );
772 self.dispatch_holepunch_actions(hp_actions);
773 }
774 Event::HolePunchProbeFailed { link_id, session_id } => {
775 let hp_actions = self.holepunch_manager.handle_probe_failed(
776 link_id, session_id,
777 );
778 self.dispatch_holepunch_actions(hp_actions);
779 }
780 Event::LoadHook { name, wasm_bytes, attach_point, priority, response_tx } => {
781 #[cfg(feature = "rns-hooks")]
782 {
783 let result = (|| -> Result<(), String> {
784 let point_idx = crate::config::parse_hook_point(&attach_point)
785 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
786 let mgr = self.hook_manager.as_ref()
787 .ok_or_else(|| "hook manager not available".to_string())?;
788 let program = mgr.compile(name.clone(), &wasm_bytes, priority)
789 .map_err(|e| format!("compile error: {}", e))?;
790 self.hook_slots[point_idx].attach(program);
791 log::info!("Loaded hook '{}' at point {} (priority {})", name, attach_point, priority);
792 Ok(())
793 })();
794 let _ = response_tx.send(result);
795 }
796 #[cfg(not(feature = "rns-hooks"))]
797 {
798 let _ = (name, wasm_bytes, attach_point, priority);
799 let _ = response_tx.send(Err("hooks not enabled".to_string()));
800 }
801 }
802 Event::UnloadHook { name, attach_point, response_tx } => {
803 #[cfg(feature = "rns-hooks")]
804 {
805 let result = (|| -> Result<(), String> {
806 let point_idx = crate::config::parse_hook_point(&attach_point)
807 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
808 match self.hook_slots[point_idx].detach(&name) {
809 Some(_) => {
810 log::info!("Unloaded hook '{}' from point {}", name, attach_point);
811 Ok(())
812 }
813 None => Err(format!("hook '{}' not found at point '{}'", name, attach_point)),
814 }
815 })();
816 let _ = response_tx.send(result);
817 }
818 #[cfg(not(feature = "rns-hooks"))]
819 {
820 let _ = (name, attach_point);
821 let _ = response_tx.send(Err("hooks not enabled".to_string()));
822 }
823 }
824 Event::ReloadHook { name, attach_point, wasm_bytes, response_tx } => {
825 #[cfg(feature = "rns-hooks")]
826 {
827 let result = (|| -> Result<(), String> {
828 let point_idx = crate::config::parse_hook_point(&attach_point)
829 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
830 let old = self.hook_slots[point_idx].detach(&name)
831 .ok_or_else(|| format!("hook '{}' not found at point '{}'", name, attach_point))?;
832 let priority = old.priority;
833 let mgr = match self.hook_manager.as_ref() {
834 Some(m) => m,
835 None => {
836 self.hook_slots[point_idx].attach(old);
837 return Err("hook manager not available".to_string());
838 }
839 };
840 match mgr.compile(name.clone(), &wasm_bytes, priority) {
841 Ok(program) => {
842 self.hook_slots[point_idx].attach(program);
843 log::info!("Reloaded hook '{}' at point {} (priority {})", name, attach_point, priority);
844 Ok(())
845 }
846 Err(e) => {
847 self.hook_slots[point_idx].attach(old);
848 Err(format!("compile error: {}", e))
849 }
850 }
851 })();
852 let _ = response_tx.send(result);
853 }
854 #[cfg(not(feature = "rns-hooks"))]
855 {
856 let _ = (name, attach_point, wasm_bytes);
857 let _ = response_tx.send(Err("hooks not enabled".to_string()));
858 }
859 }
860 Event::ListHooks { response_tx } => {
861 #[cfg(feature = "rns-hooks")]
862 {
863 let hook_point_names = [
864 "PreIngress", "PreDispatch", "AnnounceReceived", "PathUpdated",
865 "AnnounceRetransmit", "LinkRequestReceived", "LinkEstablished",
866 "LinkClosed", "InterfaceUp", "InterfaceDown", "InterfaceConfigChanged",
867 "SendOnInterface", "BroadcastOnAllInterfaces", "DeliverLocal",
868 "TunnelSynthesize", "Tick",
869 ];
870 let mut infos = Vec::new();
871 for (idx, slot) in self.hook_slots.iter().enumerate() {
872 let point_name = hook_point_names.get(idx).unwrap_or(&"Unknown");
873 for prog in &slot.programs {
874 infos.push(crate::event::HookInfo {
875 name: prog.name.clone(),
876 attach_point: point_name.to_string(),
877 priority: prog.priority,
878 enabled: prog.enabled,
879 consecutive_traps: prog.consecutive_traps,
880 });
881 }
882 }
883 let _ = response_tx.send(infos);
884 }
885 #[cfg(not(feature = "rns-hooks"))]
886 {
887 let _ = response_tx.send(Vec::new());
888 }
889 }
890 Event::InterfaceConfigChanged(id) => {
891 #[cfg(feature = "rns-hooks")]
892 {
893 let ctx = HookContext::Interface { interface_id: id.0 };
894 let now = time::now();
895 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
896 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceConfigChanged as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
897 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
898 }
899 }
900 #[cfg(not(feature = "rns-hooks"))]
901 let _ = id;
902 }
903 Event::Shutdown => break,
904 }
905 }
906 }
907
908 fn handle_query(&self, request: QueryRequest) -> QueryResponse {
910 match request {
911 QueryRequest::InterfaceStats => {
912 let mut interfaces = Vec::new();
913 let mut total_rxb: u64 = 0;
914 let mut total_txb: u64 = 0;
915 for entry in self.interfaces.values() {
916 total_rxb += entry.stats.rxb;
917 total_txb += entry.stats.txb;
918 interfaces.push(SingleInterfaceStat {
919 name: entry.info.name.clone(),
920 status: entry.online,
921 mode: entry.info.mode,
922 rxb: entry.stats.rxb,
923 txb: entry.stats.txb,
924 rx_packets: entry.stats.rx_packets,
925 tx_packets: entry.stats.tx_packets,
926 bitrate: entry.info.bitrate,
927 ifac_size: entry.ifac.as_ref().map(|s| s.size),
928 started: entry.stats.started,
929 ia_freq: entry.stats.incoming_announce_freq(),
930 oa_freq: entry.stats.outgoing_announce_freq(),
931 interface_type: entry.interface_type.clone(),
932 });
933 }
934 interfaces.sort_by(|a, b| a.name.cmp(&b.name));
936 QueryResponse::InterfaceStats(InterfaceStatsResponse {
937 interfaces,
938 transport_id: self.engine.identity_hash().copied(),
939 transport_enabled: self.engine.transport_enabled(),
940 transport_uptime: time::now() - self.started,
941 total_rxb,
942 total_txb,
943 probe_responder: self.probe_responder_hash,
944 })
945 }
946 QueryRequest::PathTable { max_hops } => {
947 let entries: Vec<PathTableEntry> = self
948 .engine
949 .path_table_entries()
950 .filter(|(_, entry)| {
951 max_hops.map_or(true, |max| entry.hops <= max)
952 })
953 .map(|(hash, entry)| {
954 let iface_name = self.interfaces.get(&entry.receiving_interface)
955 .map(|e| e.info.name.clone())
956 .or_else(|| self.engine.interface_info(&entry.receiving_interface)
957 .map(|i| i.name.clone()))
958 .unwrap_or_default();
959 PathTableEntry {
960 hash: *hash,
961 timestamp: entry.timestamp,
962 via: entry.next_hop,
963 hops: entry.hops,
964 expires: entry.expires,
965 interface: entry.receiving_interface,
966 interface_name: iface_name,
967 }
968 })
969 .collect();
970 QueryResponse::PathTable(entries)
971 }
972 QueryRequest::RateTable => {
973 let entries: Vec<RateTableEntry> = self
974 .engine
975 .rate_limiter()
976 .entries()
977 .map(|(hash, entry)| RateTableEntry {
978 hash: *hash,
979 last: entry.last,
980 rate_violations: entry.rate_violations,
981 blocked_until: entry.blocked_until,
982 timestamps: entry.timestamps.clone(),
983 })
984 .collect();
985 QueryResponse::RateTable(entries)
986 }
987 QueryRequest::NextHop { dest_hash } => {
988 let resp = self.engine.next_hop(&dest_hash).map(|next_hop| {
989 NextHopResponse {
990 next_hop,
991 hops: self.engine.hops_to(&dest_hash).unwrap_or(0),
992 interface: self.engine.next_hop_interface(&dest_hash).unwrap_or(InterfaceId(0)),
993 }
994 });
995 QueryResponse::NextHop(resp)
996 }
997 QueryRequest::NextHopIfName { dest_hash } => {
998 let name = self
999 .engine
1000 .next_hop_interface(&dest_hash)
1001 .and_then(|id| self.interfaces.get(&id))
1002 .map(|entry| entry.info.name.clone());
1003 QueryResponse::NextHopIfName(name)
1004 }
1005 QueryRequest::LinkCount => {
1006 QueryResponse::LinkCount(self.engine.link_table_count() + self.link_manager.link_count())
1007 }
1008 QueryRequest::DropPath { .. } => {
1009 QueryResponse::DropPath(false)
1011 }
1012 QueryRequest::DropAllVia { .. } => {
1013 QueryResponse::DropAllVia(0)
1014 }
1015 QueryRequest::DropAnnounceQueues => {
1016 QueryResponse::DropAnnounceQueues
1017 }
1018 QueryRequest::TransportIdentity => {
1019 QueryResponse::TransportIdentity(self.engine.identity_hash().copied())
1020 }
1021 QueryRequest::GetBlackholed => {
1022 let now = time::now();
1023 let entries: Vec<BlackholeInfo> = self.engine.blackholed_entries()
1024 .filter(|(_, e)| e.expires == 0.0 || e.expires > now)
1025 .map(|(hash, entry)| BlackholeInfo {
1026 identity_hash: *hash,
1027 created: entry.created,
1028 expires: entry.expires,
1029 reason: entry.reason.clone(),
1030 })
1031 .collect();
1032 QueryResponse::Blackholed(entries)
1033 }
1034 QueryRequest::BlackholeIdentity { .. }
1035 | QueryRequest::UnblackholeIdentity { .. } => {
1036 QueryResponse::BlackholeResult(false)
1038 }
1039 QueryRequest::InjectPath { .. } => {
1040 QueryResponse::InjectPath(false)
1042 }
1043 QueryRequest::InjectIdentity { .. } => {
1044 QueryResponse::InjectIdentity(false)
1046 }
1047 QueryRequest::HasPath { dest_hash } => {
1048 QueryResponse::HasPath(self.engine.has_path(&dest_hash))
1049 }
1050 QueryRequest::HopsTo { dest_hash } => {
1051 QueryResponse::HopsTo(self.engine.hops_to(&dest_hash))
1052 }
1053 QueryRequest::RecallIdentity { dest_hash } => {
1054 QueryResponse::RecallIdentity(self.known_destinations.get(&dest_hash).cloned())
1055 }
1056 QueryRequest::LocalDestinations => {
1057 let entries: Vec<LocalDestinationEntry> = self
1058 .local_destinations
1059 .iter()
1060 .map(|(hash, dest_type)| LocalDestinationEntry {
1061 hash: *hash,
1062 dest_type: *dest_type,
1063 })
1064 .collect();
1065 QueryResponse::LocalDestinations(entries)
1066 }
1067 QueryRequest::Links => {
1068 QueryResponse::Links(self.link_manager.link_entries())
1069 }
1070 QueryRequest::Resources => {
1071 QueryResponse::Resources(self.link_manager.resource_entries())
1072 }
1073 QueryRequest::DiscoveredInterfaces { only_available, only_transport } => {
1074 let mut interfaces = self.discovered_interfaces.list().unwrap_or_default();
1075 crate::discovery::filter_and_sort_interfaces(
1076 &mut interfaces, only_available, only_transport,
1077 );
1078 QueryResponse::DiscoveredInterfaces(interfaces)
1079 }
1080 QueryRequest::SendProbe { .. } => QueryResponse::SendProbe(None),
1082 QueryRequest::CheckProof { .. } => QueryResponse::CheckProof(None),
1083 }
1084 }
1085
1086 fn handle_query_mut(&mut self, request: QueryRequest) -> QueryResponse {
1088 match request {
1089 QueryRequest::BlackholeIdentity { identity_hash, duration_hours, reason } => {
1090 let now = time::now();
1091 self.engine.blackhole_identity(identity_hash, now, duration_hours, reason);
1092 QueryResponse::BlackholeResult(true)
1093 }
1094 QueryRequest::UnblackholeIdentity { identity_hash } => {
1095 let result = self.engine.unblackhole_identity(&identity_hash);
1096 QueryResponse::UnblackholeResult(result)
1097 }
1098 QueryRequest::DropPath { dest_hash } => {
1099 QueryResponse::DropPath(self.engine.drop_path(&dest_hash))
1100 }
1101 QueryRequest::DropAllVia { transport_hash } => {
1102 QueryResponse::DropAllVia(self.engine.drop_all_via(&transport_hash))
1103 }
1104 QueryRequest::DropAnnounceQueues => {
1105 self.engine.drop_announce_queues();
1106 QueryResponse::DropAnnounceQueues
1107 }
1108 QueryRequest::InjectPath {
1109 dest_hash,
1110 next_hop,
1111 hops,
1112 expires,
1113 interface_name,
1114 packet_hash,
1115 } => {
1116 let iface_id = self
1118 .interfaces
1119 .iter()
1120 .find(|(_, entry)| entry.info.name == interface_name)
1121 .map(|(id, _)| *id);
1122 match iface_id {
1123 Some(id) => {
1124 let entry = PathEntry {
1125 timestamp: time::now(),
1126 next_hop,
1127 hops,
1128 expires,
1129 random_blobs: Vec::new(),
1130 receiving_interface: id,
1131 packet_hash,
1132 announce_raw: None,
1133 };
1134 self.engine.inject_path(dest_hash, entry);
1135 QueryResponse::InjectPath(true)
1136 }
1137 None => QueryResponse::InjectPath(false),
1138 }
1139 }
1140 QueryRequest::InjectIdentity {
1141 dest_hash,
1142 identity_hash,
1143 public_key,
1144 app_data,
1145 hops,
1146 received_at,
1147 } => {
1148 self.known_destinations.insert(
1149 dest_hash,
1150 crate::destination::AnnouncedIdentity {
1151 dest_hash: rns_core::types::DestHash(dest_hash),
1152 identity_hash: rns_core::types::IdentityHash(identity_hash),
1153 public_key,
1154 app_data,
1155 hops,
1156 received_at,
1157 receiving_interface: rns_core::transport::types::InterfaceId(0),
1158 },
1159 );
1160 QueryResponse::InjectIdentity(true)
1161 }
1162 QueryRequest::SendProbe { dest_hash, payload_size } => {
1163 let announced = self.known_destinations.get(&dest_hash).cloned();
1165 match announced {
1166 Some(recalled) => {
1167 let remote_id = rns_crypto::identity::Identity::from_public_key(&recalled.public_key);
1169 let mut payload = vec![0u8; payload_size];
1170 self.rng.fill_bytes(&mut payload);
1171 match remote_id.encrypt(&payload, &mut self.rng) {
1172 Ok(ciphertext) => {
1173 let flags = rns_core::packet::PacketFlags {
1175 header_type: rns_core::constants::HEADER_1,
1176 context_flag: rns_core::constants::FLAG_UNSET,
1177 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1178 destination_type: rns_core::constants::DESTINATION_SINGLE,
1179 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1180 };
1181 match RawPacket::pack(
1182 flags, 0, &dest_hash, None,
1183 rns_core::constants::CONTEXT_NONE, &ciphertext,
1184 ) {
1185 Ok(packet) => {
1186 let packet_hash = packet.packet_hash;
1187 let hops = self.engine.hops_to(&dest_hash).unwrap_or(0);
1188 self.sent_packets.insert(
1190 packet_hash,
1191 (dest_hash, time::now()),
1192 );
1193 let actions = self.engine.handle_outbound(
1195 &packet,
1196 rns_core::constants::DESTINATION_SINGLE,
1197 None,
1198 time::now(),
1199 );
1200 self.dispatch_all(actions);
1201 log::debug!(
1202 "Sent probe ({} bytes) to {:02x?}",
1203 payload_size, &dest_hash[..4],
1204 );
1205 QueryResponse::SendProbe(Some((packet_hash, hops)))
1206 }
1207 Err(_) => {
1208 log::warn!("Failed to pack probe packet");
1209 QueryResponse::SendProbe(None)
1210 }
1211 }
1212 }
1213 Err(_) => {
1214 log::warn!("Failed to encrypt probe payload");
1215 QueryResponse::SendProbe(None)
1216 }
1217 }
1218 }
1219 None => {
1220 log::debug!("No known identity for probe dest {:02x?}", &dest_hash[..4]);
1221 QueryResponse::SendProbe(None)
1222 }
1223 }
1224 }
1225 QueryRequest::CheckProof { packet_hash } => {
1226 match self.completed_proofs.remove(&packet_hash) {
1227 Some((rtt, _received)) => QueryResponse::CheckProof(Some(rtt)),
1228 None => QueryResponse::CheckProof(None),
1229 }
1230 }
1231 other => self.handle_query(other),
1232 }
1233 }
1234
1235 fn handle_tunnel_synth_delivery(&mut self, raw: &[u8]) {
1237 let packet = match RawPacket::unpack(raw) {
1239 Ok(p) => p,
1240 Err(_) => return,
1241 };
1242
1243 match rns_core::transport::tunnel::validate_tunnel_synthesize_data(&packet.data) {
1244 Ok(validated) => {
1245 let iface_id = self
1248 .interfaces
1249 .iter()
1250 .find(|(_, entry)| entry.info.wants_tunnel && entry.online)
1251 .map(|(id, _)| *id);
1252
1253 if let Some(iface) = iface_id {
1254 let now = time::now();
1255 let tunnel_actions =
1256 self.engine.handle_tunnel(validated.tunnel_id, iface, now);
1257 self.dispatch_all(tunnel_actions);
1258 }
1259 }
1260 Err(e) => {
1261 log::debug!("Tunnel synthesis validation failed: {}", e);
1262 }
1263 }
1264 }
1265
1266 fn synthesize_tunnel_for_interface(&mut self, interface: InterfaceId) {
1270 if let Some(ref identity) = self.transport_identity {
1271 let actions = self.engine.synthesize_tunnel(identity, interface, &mut self.rng);
1272 self.dispatch_all(actions);
1273 }
1274 }
1275
1276 fn handle_request_path(&mut self, dest_hash: [u8; 16]) {
1278 let mut data = Vec::with_capacity(48);
1280 data.extend_from_slice(&dest_hash);
1281
1282 if self.engine.transport_enabled() {
1283 if let Some(id_hash) = self.engine.identity_hash() {
1284 data.extend_from_slice(id_hash);
1285 }
1286 }
1287
1288 let mut tag = [0u8; 16];
1290 self.rng.fill_bytes(&mut tag);
1291 data.extend_from_slice(&tag);
1292
1293 let flags = rns_core::packet::PacketFlags {
1295 header_type: rns_core::constants::HEADER_1,
1296 context_flag: rns_core::constants::FLAG_UNSET,
1297 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1298 destination_type: rns_core::constants::DESTINATION_PLAIN,
1299 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1300 };
1301
1302 if let Ok(packet) = RawPacket::pack(
1303 flags, 0, &self.path_request_dest, None,
1304 rns_core::constants::CONTEXT_NONE, &data,
1305 ) {
1306 let actions = self.engine.handle_outbound(
1307 &packet,
1308 rns_core::constants::DESTINATION_PLAIN,
1309 None,
1310 time::now(),
1311 );
1312 self.dispatch_all(actions);
1313 }
1314 }
1315
1316 fn maybe_generate_proof(&mut self, dest_hash: [u8; 16], packet_hash: &[u8; 32]) {
1319 use rns_core::types::ProofStrategy;
1320
1321 let (strategy, identity) = match self.proof_strategies.get(&dest_hash) {
1322 Some((s, id)) => (*s, id.as_ref()),
1323 None => return,
1324 };
1325
1326 let should_prove = match strategy {
1327 ProofStrategy::ProveAll => true,
1328 ProofStrategy::ProveApp => {
1329 self.callbacks.on_proof_requested(
1330 rns_core::types::DestHash(dest_hash),
1331 rns_core::types::PacketHash(*packet_hash),
1332 )
1333 }
1334 ProofStrategy::ProveNone => false,
1335 };
1336
1337 if !should_prove {
1338 return;
1339 }
1340
1341 let identity = match identity {
1342 Some(id) => id,
1343 None => {
1344 log::warn!("Cannot generate proof for {:02x?}: no signing key", &dest_hash[..4]);
1345 return;
1346 }
1347 };
1348
1349 let signature = match identity.sign(packet_hash) {
1351 Ok(sig) => sig,
1352 Err(e) => {
1353 log::warn!("Failed to sign proof for {:02x?}: {:?}", &dest_hash[..4], e);
1354 return;
1355 }
1356 };
1357
1358 let mut proof_data = Vec::with_capacity(96);
1360 proof_data.extend_from_slice(packet_hash);
1361 proof_data.extend_from_slice(&signature);
1362
1363 let mut proof_dest = [0u8; 16];
1369 proof_dest.copy_from_slice(&packet_hash[..16]);
1370
1371 let flags = rns_core::packet::PacketFlags {
1372 header_type: rns_core::constants::HEADER_1,
1373 context_flag: rns_core::constants::FLAG_UNSET,
1374 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1375 destination_type: rns_core::constants::DESTINATION_SINGLE,
1376 packet_type: rns_core::constants::PACKET_TYPE_PROOF,
1377 };
1378
1379 if let Ok(packet) = RawPacket::pack(
1380 flags, 0, &proof_dest, None,
1381 rns_core::constants::CONTEXT_NONE, &proof_data,
1382 ) {
1383 let actions = self.engine.handle_outbound(
1384 &packet,
1385 rns_core::constants::DESTINATION_SINGLE,
1386 None,
1387 time::now(),
1388 );
1389 self.dispatch_all(actions);
1390 log::debug!("Generated proof for packet on dest {:02x?}", &dest_hash[..4]);
1391 }
1392 }
1393
1394 fn handle_inbound_proof(&mut self, dest_hash: [u8; 16], proof_data: &[u8], _raw_packet_hash: &[u8; 32]) {
1396 if proof_data.len() < 96 {
1398 log::debug!("Proof too short for explicit proof: {} bytes", proof_data.len());
1399 return;
1400 }
1401
1402 let mut tracked_hash = [0u8; 32];
1403 tracked_hash.copy_from_slice(&proof_data[..32]);
1404
1405 let signature = &proof_data[32..96];
1406
1407 if let Some((tracked_dest, sent_time)) = self.sent_packets.remove(&tracked_hash) {
1409 if let Some(announced) = self.known_destinations.get(&tracked_dest) {
1412 let identity = rns_crypto::identity::Identity::from_public_key(&announced.public_key);
1413 let mut sig = [0u8; 64];
1414 sig.copy_from_slice(signature);
1415 if !identity.verify(&sig, &tracked_hash) {
1416 log::debug!(
1417 "Proof signature invalid for {:02x?}",
1418 &tracked_hash[..4],
1419 );
1420 return;
1421 }
1422 } else {
1423 log::debug!(
1424 "No known identity for dest {:02x?}, accepting proof without signature check",
1425 &tracked_dest[..4],
1426 );
1427 }
1428
1429 let now = time::now();
1430 let rtt = now - sent_time;
1431 log::debug!(
1432 "Proof received for {:02x?} rtt={:.3}s",
1433 &tracked_hash[..4], rtt,
1434 );
1435 self.completed_proofs.insert(tracked_hash, (rtt, now));
1436 self.callbacks.on_proof(
1437 rns_core::types::DestHash(tracked_dest),
1438 rns_core::types::PacketHash(tracked_hash),
1439 rtt,
1440 );
1441 } else {
1442 log::debug!(
1443 "Proof for unknown packet {:02x?} on dest {:02x?}",
1444 &tracked_hash[..4], &dest_hash[..4],
1445 );
1446 }
1447 }
1448
    /// Executes a batch of `TransportAction`s produced by the transport engine.
    ///
    /// Each action is turned into its side effect: writing frames to interfaces,
    /// delivering packets to local handlers, updating the known-destination and
    /// discovery stores, caching announces, or invoking application callbacks.
    ///
    /// With the `rns-hooks` feature enabled, a hook runs for most action kinds before
    /// the action is carried out. Actions injected by hooks are collected and
    /// dispatched by a recursive call once the batch is finished. For
    /// `SendOnInterface`, `BroadcastOnAllInterfaces`, `DeliverLocal`,
    /// `AnnounceReceived` and `TunnelSynthesize` a hook result of "drop" skips the
    /// action entirely; for the remaining hooked actions only injection is honoured.
    ///
    /// Some arms call `dispatch_all` recursively (path requests, link actions, hook
    /// injections).
    fn dispatch_all(&mut self, actions: Vec<TransportAction>) {
        // Actions injected by hooks while processing this batch.
        #[cfg(feature = "rns-hooks")]
        let mut hook_injected: Vec<TransportAction> = Vec::new();

        for action in actions {
            match action {
                // Send one frame on one specific interface.
                TransportAction::SendOnInterface { interface, raw } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        // Only the first two raw bytes and the destination hash are
                        // filled in; the remaining context fields are zeroed here.
                        let pkt_ctx = rns_hooks::PacketContext {
                            flags: if raw.is_empty() { 0 } else { raw[0] },
                            hops: if raw.len() > 1 { raw[1] } else { 0 },
                            destination_hash: extract_dest_hash(&raw),
                            context: 0,
                            packet_hash: [0; 32],
                            interface_id: interface.0,
                            data_offset: 0,
                            data_len: raw.len() as u32,
                        };
                        let ctx = HookContext::Packet { ctx: &pkt_ctx, raw: &raw };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        {
                            let exec = run_hook_inner(&mut self.hook_slots[HookPoint::SendOnInterface as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
                            if let Some(ref e) = exec {
                                if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                                // A "drop" verdict skips the send.
                                if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
                            }
                        }
                    }
                    // Treated as an announce when the low two bits of the first byte are 0x01.
                    let is_announce = raw.len() > 2 && (raw[0] & 0x03) == 0x01;
                    if is_announce {
                        log::debug!("Announce:dispatching to iface {} (len={}, online={})",
                            interface.0, raw.len(),
                            self.interfaces.get(&interface).map(|e| e.online).unwrap_or(false));
                    }
                    // Unknown or offline interfaces are skipped silently.
                    if let Some(entry) = self.interfaces.get_mut(&interface) {
                        if entry.online {
                            // Apply IFAC masking when the interface has IFAC state.
                            let data = if let Some(ref ifac_state) = entry.ifac {
                                ifac::mask_outbound(&raw, ifac_state)
                            } else {
                                raw
                            };
                            // Counters are updated before the send result is known, so
                            // failed sends are counted too.
                            entry.stats.txb += data.len() as u64;
                            entry.stats.tx_packets += 1;
                            if is_announce {
                                entry.stats.record_outgoing_announce(time::now());
                            }
                            if let Err(e) = entry.writer.send_frame(&data) {
                                log::warn!("[{}] send failed: {}", entry.info.id.0, e);
                            } else if is_announce {
                                // Debug preview of the destination: top two bits of the
                                // first byte select the offset (18 when equal to 1, else 2).
                                let header_type = (data[0] >> 6) & 0x03;
                                let dest_start = if header_type == 1 { 18usize } else { 2usize };
                                let dest_preview = if data.len() >= dest_start + 4 {
                                    format!("{:02x?}", &data[dest_start..dest_start+4])
                                } else {
                                    "??".into()
                                };
                                log::debug!("Announce:SENT on iface {} (len={}, h={}, dest=[{}])",
                                    interface.0, data.len(), header_type, dest_preview);
                            }
                        }
                    }
                }
                // Send one frame on every online interface except `exclude`.
                TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        let pkt_ctx = rns_hooks::PacketContext {
                            flags: if raw.is_empty() { 0 } else { raw[0] },
                            hops: if raw.len() > 1 { raw[1] } else { 0 },
                            destination_hash: extract_dest_hash(&raw),
                            context: 0,
                            packet_hash: [0; 32],
                            interface_id: 0,
                            data_offset: 0,
                            data_len: raw.len() as u32,
                        };
                        let ctx = HookContext::Packet { ctx: &pkt_ctx, raw: &raw };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        {
                            let exec = run_hook_inner(&mut self.hook_slots[HookPoint::BroadcastOnAllInterfaces as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
                            if let Some(ref e) = exec {
                                if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                                // A "drop" verdict skips the whole broadcast.
                                if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
                            }
                        }
                    }
                    let is_announce = raw.len() > 2 && (raw[0] & 0x03) == 0x01;
                    for entry in self.interfaces.values_mut() {
                        if entry.online && Some(entry.id) != exclude {
                            // Masking is per interface, so each one gets its own copy.
                            let data = if let Some(ref ifac_state) = entry.ifac {
                                ifac::mask_outbound(&raw, ifac_state)
                            } else {
                                raw.clone()
                            };
                            entry.stats.txb += data.len() as u64;
                            entry.stats.tx_packets += 1;
                            if is_announce {
                                entry.stats.record_outgoing_announce(time::now());
                            }
                            if let Err(e) = entry.writer.send_frame(&data) {
                                log::warn!("[{}] broadcast failed: {}", entry.info.id.0, e);
                            }
                        }
                    }
                }
                // A packet addressed to a destination registered on this node.
                TransportAction::DeliverLocal {
                    destination_hash,
                    raw,
                    packet_hash,
                    receiving_interface,
                } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        let pkt_ctx = rns_hooks::PacketContext {
                            flags: 0,
                            hops: 0,
                            destination_hash,
                            context: 0,
                            packet_hash,
                            interface_id: receiving_interface.0,
                            data_offset: 0,
                            data_len: raw.len() as u32,
                        };
                        let ctx = HookContext::Packet { ctx: &pkt_ctx, raw: &raw };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        {
                            let exec = run_hook_inner(&mut self.hook_slots[HookPoint::DeliverLocal as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
                            if let Some(ref e) = exec {
                                if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                                // A "drop" verdict skips the delivery.
                                if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
                            }
                        }
                    }
                    // Routing by destination: tunnel synthesis, path request, link
                    // destination, then ordinary local destinations.
                    if destination_hash == self.tunnel_synth_dest {
                        self.handle_tunnel_synth_delivery(&raw);
                    } else if destination_hash == self.path_request_dest {
                        // Packets that fail to unpack are ignored.
                        if let Ok(packet) = RawPacket::unpack(&raw) {
                            // NOTE(review): interface id 0 is passed here rather than
                            // `receiving_interface` — confirm this is intended.
                            let actions = self.engine.handle_path_request(
                                &packet.data,
                                InterfaceId(0), time::now(),
                            );
                            self.dispatch_all(actions);
                        }
                    } else if self.link_manager.is_link_destination(&destination_hash) {
                        let link_actions = self.link_manager.handle_local_delivery(
                            destination_hash, &raw, packet_hash, receiving_interface, &mut self.rng,
                        );
                        if link_actions.is_empty() {
                            // The link manager produced nothing: fall back to the same
                            // proof / local-delivery handling as ordinary destinations.
                            if let Ok(packet) = RawPacket::unpack(&raw) {
                                if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_PROOF {
                                    self.handle_inbound_proof(destination_hash, &packet.data, &packet_hash);
                                    continue;
                                }
                            }
                            self.maybe_generate_proof(destination_hash, &packet_hash);
                            self.callbacks.on_local_delivery(
                                rns_core::types::DestHash(destination_hash),
                                raw,
                                rns_core::types::PacketHash(packet_hash),
                            );
                        } else {
                            self.dispatch_link_actions(link_actions);
                        }
                    } else {
                        // Proof packets go to proof handling and are not delivered to
                        // the application.
                        if let Ok(packet) = RawPacket::unpack(&raw) {
                            if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_PROOF {
                                self.handle_inbound_proof(destination_hash, &packet.data, &packet_hash);
                                continue;
                            }
                        }

                        // Possibly prove receipt, then hand the packet to the application.
                        self.maybe_generate_proof(destination_hash, &packet_hash);

                        self.callbacks
                            .on_local_delivery(
                                rns_core::types::DestHash(destination_hash),
                                raw,
                                rns_core::types::PacketHash(packet_hash),
                            );
                    }
                }
                // An announce reported by the engine.
                TransportAction::AnnounceReceived {
                    destination_hash,
                    identity_hash,
                    public_key,
                    name_hash,
                    app_data,
                    hops,
                    receiving_interface,
                    ..
                } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        let ctx = HookContext::Announce {
                            destination_hash,
                            hops,
                            interface_id: receiving_interface.0,
                        };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        {
                            let exec = run_hook_inner(&mut self.hook_slots[HookPoint::AnnounceReceived as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
                            if let Some(ref e) = exec {
                                if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                                // A "drop" verdict skips all announce processing below.
                                if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
                            }
                        }
                    }

                    // Interface-discovery announces are additionally parsed and stored,
                    // but only when interface discovery is enabled.
                    if name_hash == self.discovery_name_hash {
                        if self.discover_interfaces {
                            if let Some(ref app_data) = app_data {
                                if let Some(mut discovered) = crate::discovery::parse_interface_announce(
                                    app_data,
                                    &identity_hash,
                                    hops,
                                    self.discovery_required_value,
                                ) {
                                    // Carry over `discovered` from an existing record and
                                    // bump its heard count.
                                    if let Ok(Some(existing)) = self.discovered_interfaces.load(&discovered.discovery_hash) {
                                        discovered.discovered = existing.discovered;
                                        discovered.heard_count = existing.heard_count + 1;
                                    }
                                    if let Err(e) = self.discovered_interfaces.store(&discovered) {
                                        log::warn!("Failed to store discovered interface: {}", e);
                                    } else {
                                        log::debug!(
                                            "Discovered interface '{}' ({}) at {}:{} [stamp={}]",
                                            discovered.name,
                                            discovered.interface_type,
                                            discovered.reachable_on.as_deref().unwrap_or("?"),
                                            discovered.port.map(|p| p.to_string()).unwrap_or_else(|| "?".into()),
                                            discovered.stamp_value,
                                        );
                                    }
                                }
                            }
                        }
                    }

                    // Remember the announced identity (replacing any previous record for
                    // this destination) and notify the application.
                    let announced = crate::destination::AnnouncedIdentity {
                        dest_hash: rns_core::types::DestHash(destination_hash),
                        identity_hash: rns_core::types::IdentityHash(identity_hash),
                        public_key,
                        app_data: app_data.clone(),
                        hops,
                        received_at: time::now(),
                        receiving_interface,
                    };
                    self.known_destinations.insert(destination_hash, announced.clone());
                    log::info!(
                        "Announce:validated dest={:02x}{:02x}{:02x}{:02x}.. hops={}",
                        destination_hash[0], destination_hash[1], destination_hash[2], destination_hash[3],
                        hops,
                    );
                    self.callbacks.on_announce(announced);
                }
                // The engine updated a path; hooks may inject actions but cannot drop.
                TransportAction::PathUpdated {
                    destination_hash,
                    hops,
                    interface,
                    ..
                } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        let ctx = HookContext::Announce {
                            destination_hash,
                            hops,
                            interface_id: interface.0,
                        };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::PathUpdated as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                            if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                        }
                    }
                    // Without hooks `interface` is otherwise unused.
                    #[cfg(not(feature = "rns-hooks"))]
                    let _ = interface;

                    self.callbacks.on_path_updated(rns_core::types::DestHash(destination_hash), hops);
                }
                // Forward to every online local-client interface except `exclude`.
                TransportAction::ForwardToLocalClients { raw, exclude } => {
                    for entry in self.interfaces.values_mut() {
                        if entry.online
                            && entry.info.is_local_client
                            && Some(entry.id) != exclude
                        {
                            let data = if let Some(ref ifac_state) = entry.ifac {
                                ifac::mask_outbound(&raw, ifac_state)
                            } else {
                                raw.clone()
                            };
                            entry.stats.txb += data.len() as u64;
                            entry.stats.tx_packets += 1;
                            if let Err(e) = entry.writer.send_frame(&data) {
                                log::warn!("[{}] forward to local client failed: {}", entry.info.id.0, e);
                            }
                        }
                    }
                }
                // Forward to online interfaces whose `is_local_client` flag equals
                // `to_local`, except `exclude`.
                TransportAction::ForwardPlainBroadcast { raw, to_local, exclude } => {
                    for entry in self.interfaces.values_mut() {
                        if entry.online
                            && entry.info.is_local_client == to_local
                            && Some(entry.id) != exclude
                        {
                            let data = if let Some(ref ifac_state) = entry.ifac {
                                ifac::mask_outbound(&raw, ifac_state)
                            } else {
                                raw.clone()
                            };
                            entry.stats.txb += data.len() as u64;
                            entry.stats.tx_packets += 1;
                            if let Err(e) = entry.writer.send_frame(&data) {
                                log::warn!("[{}] forward plain broadcast failed: {}", entry.info.id.0, e);
                            }
                        }
                    }
                }
                // Store the announce in the announce cache, if one is configured.
                TransportAction::CacheAnnounce { packet_hash, raw } => {
                    if let Some(ref cache) = self.announce_cache {
                        if let Err(e) = cache.store(&packet_hash, &raw, None) {
                            log::warn!("Failed to cache announce: {}", e);
                        }
                    }
                }
                // Pack the tunnel-synthesis payload and send it on one interface.
                TransportAction::TunnelSynthesize { interface, data, dest_hash } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        let pkt_ctx = rns_hooks::PacketContext {
                            flags: 0,
                            hops: 0,
                            destination_hash: dest_hash,
                            context: 0,
                            packet_hash: [0; 32],
                            interface_id: interface.0,
                            data_offset: 0,
                            data_len: data.len() as u32,
                        };
                        let ctx = HookContext::Packet { ctx: &pkt_ctx, raw: &data };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        {
                            let exec = run_hook_inner(&mut self.hook_slots[HookPoint::TunnelSynthesize as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
                            if let Some(ref e) = exec {
                                if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                                // A "drop" verdict skips the synthesis packet.
                                if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
                            }
                        }
                    }
                    let flags = rns_core::packet::PacketFlags {
                        header_type: rns_core::constants::HEADER_1,
                        context_flag: rns_core::constants::FLAG_UNSET,
                        transport_type: rns_core::constants::TRANSPORT_BROADCAST,
                        destination_type: rns_core::constants::DESTINATION_PLAIN,
                        packet_type: rns_core::constants::PACKET_TYPE_DATA,
                    };
                    // Packing failures and unknown/offline interfaces are skipped silently.
                    if let Ok(packet) = rns_core::packet::RawPacket::pack(
                        flags, 0, &dest_hash, None,
                        rns_core::constants::CONTEXT_NONE, &data,
                    ) {
                        if let Some(entry) = self.interfaces.get_mut(&interface) {
                            if entry.online {
                                let raw = if let Some(ref ifac_state) = entry.ifac {
                                    ifac::mask_outbound(&packet.raw, ifac_state)
                                } else {
                                    packet.raw
                                };
                                entry.stats.txb += raw.len() as u64;
                                entry.stats.tx_packets += 1;
                                if let Err(e) = entry.writer.send_frame(&raw) {
                                    log::warn!("[{}] tunnel synthesize send failed: {}", entry.info.id.0, e);
                                }
                            }
                        }
                    }
                }
                // Informational only.
                TransportAction::TunnelEstablished { tunnel_id, interface } => {
                    log::info!("Tunnel established: {:02x?} on interface {}", &tunnel_id[..4], interface.0);
                }
                // The remaining actions only run their hook (injection only); without
                // the `rns-hooks` feature they are no-ops.
                TransportAction::AnnounceRetransmit { destination_hash, hops, interface } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        let ctx = HookContext::Announce {
                            destination_hash,
                            hops,
                            // Interface id 0 is used when no interface is given.
                            interface_id: interface.map(|i| i.0).unwrap_or(0),
                        };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::AnnounceRetransmit as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                            if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                        }
                    }
                    #[cfg(not(feature = "rns-hooks"))]
                    {
                        let _ = (destination_hash, hops, interface);
                    }
                }
                TransportAction::LinkRequestReceived { link_id, destination_hash: _, receiving_interface } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        let ctx = HookContext::Link { link_id, interface_id: receiving_interface.0 };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkRequestReceived as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                            if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                        }
                    }
                    #[cfg(not(feature = "rns-hooks"))]
                    {
                        let _ = (link_id, receiving_interface);
                    }
                }
                TransportAction::LinkEstablished { link_id, interface } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        let ctx = HookContext::Link { link_id, interface_id: interface.0 };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkEstablished as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                            if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                        }
                    }
                    #[cfg(not(feature = "rns-hooks"))]
                    {
                        let _ = (link_id, interface);
                    }
                }
                TransportAction::LinkClosed { link_id } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        let ctx = HookContext::Link { link_id, interface_id: 0 };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkClosed as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                            if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                        }
                    }
                    #[cfg(not(feature = "rns-hooks"))]
                    {
                        let _ = link_id;
                    }
                }
            }
        }

        // Dispatch whatever the hooks injected while this batch was processed.
        #[cfg(feature = "rns-hooks")]
        if !hook_injected.is_empty() {
            self.dispatch_all(hook_injected);
        }
    }
1925
/// Processes a batch of actions emitted by the link manager.
///
/// Depending on the variant, an action is routed to the transport engine,
/// surfaced through `self.callbacks`, or handed to the hole-punch manager.
/// With the `rns-hooks` feature enabled, hook programs run for the
/// link-established, link-closed and link-request-received actions; any
/// transport actions they inject are collected and dispatched once, after
/// the whole batch has been processed.
///
/// This function can re-enter itself: resource-accept answers, management
/// responses and hole-punch channel messages all produce further link
/// actions that are dispatched immediately.
fn dispatch_link_actions(&mut self, actions: Vec<LinkManagerAction>) {
    // Transport actions injected by hook programs during this batch.
    #[cfg(feature = "rns-hooks")]
    let mut hook_injected: Vec<TransportAction> = Vec::new();

    for action in actions {
        match action {
            // Outbound packet from the link layer: parse it and let the
            // transport engine decide where it goes.
            LinkManagerAction::SendPacket { raw, dest_type, attached_interface } => {
                match RawPacket::unpack(&raw) {
                    Ok(packet) => {
                        let transport_actions = self.engine.handle_outbound(
                            &packet,
                            dest_type,
                            attached_interface,
                            time::now(),
                        );
                        self.dispatch_all(transport_actions);
                    }
                    Err(e) => {
                        log::warn!("LinkManager SendPacket: failed to unpack: {:?}", e);
                    }
                }
            }
            LinkManagerAction::LinkEstablished { link_id, dest_hash, rtt, is_initiator } => {
                // The action carries no interface, so the hook context is
                // built with interface_id 0.
                #[cfg(feature = "rns-hooks")]
                {
                    let ctx = HookContext::Link { link_id, interface_id: 0 };
                    let now = time::now();
                    let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                    if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkEstablished as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                        if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                    }
                }
                log::info!(
                    "Link established: {:02x?} rtt={:.3}s initiator={}",
                    &link_id[..4], rtt, is_initiator,
                );
                self.callbacks.on_link_established(
                    rns_core::types::LinkId(link_id),
                    rns_core::types::DestHash(dest_hash),
                    rtt,
                    is_initiator,
                );
            }
            LinkManagerAction::LinkClosed { link_id, reason } => {
                // As above: no interface is available, so interface_id is 0.
                #[cfg(feature = "rns-hooks")]
                {
                    let ctx = HookContext::Link { link_id, interface_id: 0 };
                    let now = time::now();
                    let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                    if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkClosed as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                        if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                    }
                }
                log::info!("Link closed: {:02x?} reason={:?}", &link_id[..4], reason);
                // The hole-punch manager is told about the closure before the
                // application callback fires.
                self.holepunch_manager.link_closed(&link_id);
                self.callbacks.on_link_closed(rns_core::types::LinkId(link_id), reason);
            }
            LinkManagerAction::RemoteIdentified { link_id, identity_hash, public_key } => {
                log::debug!(
                    "Remote identified on link {:02x?}: {:02x?}",
                    &link_id[..4], &identity_hash[..4],
                );
                self.callbacks.on_remote_identified(
                    rns_core::types::LinkId(link_id),
                    rns_core::types::IdentityHash(identity_hash),
                    public_key,
                );
            }
            // The link id is registered with the engine as a destination of
            // type DESTINATION_LINK, and removed again on deregistration.
            LinkManagerAction::RegisterLinkDest { link_id } => {
                self.engine.register_destination(link_id, rns_core::constants::DESTINATION_LINK);
            }
            LinkManagerAction::DeregisterLinkDest { link_id } => {
                self.engine.deregister_destination(&link_id);
            }
            LinkManagerAction::ManagementRequest {
                link_id, path_hash, data, request_id, remote_identity,
            } => {
                self.handle_management_request(
                    link_id, path_hash, data, request_id, remote_identity,
                );
            }
            LinkManagerAction::ResourceReceived { link_id, data, metadata } => {
                self.callbacks.on_resource_received(rns_core::types::LinkId(link_id), data, metadata);
            }
            LinkManagerAction::ResourceCompleted { link_id } => {
                self.callbacks.on_resource_completed(rns_core::types::LinkId(link_id));
            }
            LinkManagerAction::ResourceFailed { link_id, error } => {
                log::debug!("Resource failed on link {:02x?}: {}", &link_id[..4], error);
                self.callbacks.on_resource_failed(rns_core::types::LinkId(link_id), error);
            }
            LinkManagerAction::ResourceProgress { link_id, received, total } => {
                self.callbacks.on_resource_progress(rns_core::types::LinkId(link_id), received, total);
            }
            // Ask the application whether to accept the resource, pass the
            // answer to the link manager, and dispatch whatever it produces.
            LinkManagerAction::ResourceAcceptQuery { link_id, resource_hash, transfer_size, has_metadata } => {
                let accept = self.callbacks.on_resource_accept_query(
                    rns_core::types::LinkId(link_id), resource_hash.clone(), transfer_size, has_metadata,
                );
                let accept_actions = self.link_manager.accept_resource(
                    &link_id, &resource_hash, accept, &mut self.rng,
                );
                self.dispatch_link_actions(accept_actions);
            }
            LinkManagerAction::ChannelMessageReceived { link_id, msgtype, payload } => {
                if HolePunchManager::is_holepunch_message(msgtype) {
                    // Hole-punch signalling is consumed here and never reaches
                    // `on_channel_message`. The resulting actions are only
                    // dispatched when the manager reports the signal as handled.
                    let derived_key = self.link_manager.get_derived_key(&link_id);
                    let tx = self.get_event_sender();
                    let (handled, hp_actions) = self.holepunch_manager.handle_signal(
                        link_id, msgtype, payload, derived_key.as_deref(), &tx,
                    );
                    if handled {
                        self.dispatch_holepunch_actions(hp_actions);
                    }
                } else {
                    self.callbacks.on_channel_message(rns_core::types::LinkId(link_id), msgtype, payload);
                }
            }
            LinkManagerAction::LinkDataReceived { link_id, context, data } => {
                self.callbacks.on_link_data(rns_core::types::LinkId(link_id), context, data);
            }
            LinkManagerAction::ResponseReceived { link_id, request_id, data } => {
                self.callbacks.on_response(rns_core::types::LinkId(link_id), request_id, data);
            }
            // This action only feeds the hook point; no callback is invoked.
            LinkManagerAction::LinkRequestReceived { link_id, receiving_interface } => {
                #[cfg(feature = "rns-hooks")]
                {
                    let ctx = HookContext::Link { link_id, interface_id: receiving_interface.0 };
                    let now = time::now();
                    let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                    if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkRequestReceived as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                        if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
                    }
                }
                // Without hooks the fields are otherwise unused; presumably
                // this discards them to avoid unused-variable warnings.
                #[cfg(not(feature = "rns-hooks"))]
                { let _ = (link_id, receiving_interface); }
            }
        }
    }

    // Dispatch everything the hooks injected, after the batch itself.
    #[cfg(feature = "rns-hooks")]
    if !hook_injected.is_empty() {
        self.dispatch_all(hook_injected);
    }
}
2076
2077 fn dispatch_holepunch_actions(&mut self, actions: Vec<HolePunchManagerAction>) {
2079 for action in actions {
2080 match action {
2081 HolePunchManagerAction::SendChannelMessage { link_id, msgtype, payload } => {
2082 let link_actions = self.link_manager.send_channel_message(
2083 &link_id, msgtype, &payload, &mut self.rng,
2084 );
2085 self.dispatch_link_actions(link_actions);
2086 }
2087 HolePunchManagerAction::DirectConnectEstablished { link_id, session_id, interface_id, rtt, mtu } => {
2088 log::info!(
2089 "Direct connection established for link {:02x?} session {:02x?} iface {} rtt={:.1}ms mtu={}",
2090 &link_id[..4], &session_id[..4], interface_id.0, rtt * 1000.0, mtu
2091 );
2092 self.engine.redirect_path(&link_id, interface_id, time::now());
2094 self.link_manager.set_link_rtt(&link_id, rtt);
2096 self.link_manager.set_link_mtu(&link_id, mtu);
2097 self.link_manager.record_link_inbound(&link_id);
2100 self.link_manager.flush_channel_tx(&link_id);
2102 self.callbacks.on_direct_connect_established(
2103 rns_core::types::LinkId(link_id),
2104 interface_id,
2105 );
2106 }
2107 HolePunchManagerAction::DirectConnectFailed { link_id, session_id, reason } => {
2108 log::debug!(
2109 "Direct connection failed for link {:02x?} session {:02x?} reason={}",
2110 &link_id[..4], &session_id[..4], reason
2111 );
2112 self.callbacks.on_direct_connect_failed(
2113 rns_core::types::LinkId(link_id),
2114 reason,
2115 );
2116 }
2117 }
2118 }
2119 }
2120
/// Returns a clone of the driver's event sender (`self.event_tx`).
fn get_event_sender(&self) -> crate::event::EventSender {
    self.event_tx.clone()
}
2131
/// Minimum value of `now - last_management_announce` at which
/// `tick_management_announces` emits another round of management announces.
/// Presumably expressed in seconds — confirm against the unit of `time::now()`.
const MANAGEMENT_ANNOUNCE_INTERVAL: f64 = 300.0;

/// Uptime (`now - started`) below which `tick_management_announces` holds back
/// the first round of management announces.
/// Presumably expressed in seconds — confirm against the unit of `time::now()`.
const MANAGEMENT_ANNOUNCE_DELAY: f64 = 5.0;
2137
2138 fn tick_discovery_announcer(&mut self, now: f64) {
2140 let announcer = match self.interface_announcer.as_mut() {
2141 Some(a) => a,
2142 None => return,
2143 };
2144
2145 announcer.maybe_start(now);
2146
2147 let stamp_result = match announcer.poll_ready() {
2148 Some(r) => r,
2149 None => return,
2150 };
2151
2152 let identity = match self.transport_identity.as_ref() {
2153 Some(id) => id,
2154 None => {
2155 log::warn!("Discovery: stamp ready but no transport identity");
2156 return;
2157 }
2158 };
2159
2160 let identity_hash = identity.hash();
2162 let disc_dest = rns_core::destination::destination_hash(
2163 crate::discovery::APP_NAME,
2164 &["discovery", "interface"],
2165 Some(&identity_hash),
2166 );
2167 let name_hash = self.discovery_name_hash;
2168 let mut random_hash = [0u8; 10];
2169 self.rng.fill_bytes(&mut random_hash);
2170
2171 let (announce_data, _) = match rns_core::announce::AnnounceData::pack(
2172 identity, &disc_dest, &name_hash, &random_hash,
2173 None, Some(&stamp_result.app_data),
2174 ) {
2175 Ok(v) => v,
2176 Err(e) => {
2177 log::warn!("Discovery: failed to pack announce: {}", e);
2178 return;
2179 }
2180 };
2181
2182 let flags = rns_core::packet::PacketFlags {
2183 header_type: rns_core::constants::HEADER_1,
2184 context_flag: rns_core::constants::FLAG_UNSET,
2185 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
2186 destination_type: rns_core::constants::DESTINATION_SINGLE,
2187 packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
2188 };
2189
2190 let packet = match RawPacket::pack(
2191 flags, 0, &disc_dest, None,
2192 rns_core::constants::CONTEXT_NONE, &announce_data,
2193 ) {
2194 Ok(p) => p,
2195 Err(e) => {
2196 log::warn!("Discovery: failed to pack packet: {}", e);
2197 return;
2198 }
2199 };
2200
2201 let outbound_actions = self.engine.handle_outbound(
2202 &packet, rns_core::constants::DESTINATION_SINGLE, None, now,
2203 );
2204 log::debug!(
2205 "Discovery announce sent for interface #{} ({} actions, dest={:02x?})",
2206 stamp_result.index, outbound_actions.len(), &disc_dest[..4],
2207 );
2208 self.dispatch_all(outbound_actions);
2209 }
2210
2211 fn tick_management_announces(&mut self, now: f64) {
2213 if self.transport_identity.is_none() {
2214 return;
2215 }
2216
2217 let uptime = now - self.started;
2218
2219 if !self.initial_announce_sent {
2221 if uptime < Self::MANAGEMENT_ANNOUNCE_DELAY {
2222 return;
2223 }
2224 self.initial_announce_sent = true;
2225 self.emit_management_announces(now);
2226 return;
2227 }
2228
2229 if now - self.last_management_announce >= Self::MANAGEMENT_ANNOUNCE_INTERVAL {
2231 self.emit_management_announces(now);
2232 }
2233 }
2234
2235 fn emit_management_announces(&mut self, now: f64) {
2237 use crate::management;
2238
2239 self.last_management_announce = now;
2240
2241 let identity = match self.transport_identity {
2242 Some(ref id) => id,
2243 None => return,
2244 };
2245
2246 let mgmt_raw = if self.management_config.enable_remote_management {
2248 management::build_management_announce(identity, &mut self.rng)
2249 } else {
2250 None
2251 };
2252
2253 let bh_raw = if self.management_config.publish_blackhole {
2254 management::build_blackhole_announce(identity, &mut self.rng)
2255 } else {
2256 None
2257 };
2258
2259 let probe_raw = if self.probe_responder_hash.is_some() {
2260 management::build_probe_announce(identity, &mut self.rng)
2261 } else {
2262 None
2263 };
2264
2265 if let Some(raw) = mgmt_raw {
2266 if let Ok(packet) = RawPacket::unpack(&raw) {
2267 let actions = self.engine.handle_outbound(
2268 &packet,
2269 rns_core::constants::DESTINATION_SINGLE,
2270 None,
2271 now,
2272 );
2273 self.dispatch_all(actions);
2274 log::debug!("Emitted management destination announce");
2275 }
2276 }
2277
2278 if let Some(raw) = bh_raw {
2279 if let Ok(packet) = RawPacket::unpack(&raw) {
2280 let actions = self.engine.handle_outbound(
2281 &packet,
2282 rns_core::constants::DESTINATION_SINGLE,
2283 None,
2284 now,
2285 );
2286 self.dispatch_all(actions);
2287 log::debug!("Emitted blackhole info announce");
2288 }
2289 }
2290
2291 if let Some(raw) = probe_raw {
2292 if let Ok(packet) = RawPacket::unpack(&raw) {
2293 let actions = self.engine.handle_outbound(
2294 &packet,
2295 rns_core::constants::DESTINATION_SINGLE,
2296 None,
2297 now,
2298 );
2299 self.dispatch_all(actions);
2300 log::debug!("Emitted probe responder announce");
2301 }
2302 }
2303 }
2304
2305 fn handle_management_request(
2307 &mut self,
2308 link_id: [u8; 16],
2309 path_hash: [u8; 16],
2310 data: Vec<u8>,
2311 request_id: [u8; 16],
2312 remote_identity: Option<([u8; 16], [u8; 64])>,
2313 ) {
2314 use crate::management;
2315
2316 let is_restricted = path_hash == management::status_path_hash()
2318 || path_hash == management::path_path_hash();
2319
2320 if is_restricted && !self.management_config.remote_management_allowed.is_empty() {
2321 match remote_identity {
2322 Some((identity_hash, _)) => {
2323 if !self.management_config.remote_management_allowed.contains(&identity_hash) {
2324 log::debug!("Management request denied: identity not in allowed list");
2325 return;
2326 }
2327 }
2328 None => {
2329 log::debug!("Management request denied: peer not identified");
2330 return;
2331 }
2332 }
2333 }
2334
2335 let response_data = if path_hash == management::status_path_hash() {
2336 {
2337 let views: Vec<&dyn management::InterfaceStatusView> = self.interfaces.values().map(|e| e as &dyn management::InterfaceStatusView).collect();
2338 management::handle_status_request(&data, &self.engine, &views, self.started, self.probe_responder_hash)
2339 }
2340 } else if path_hash == management::path_path_hash() {
2341 management::handle_path_request(&data, &self.engine)
2342 } else if path_hash == management::list_path_hash() {
2343 management::handle_blackhole_list_request(&self.engine)
2344 } else {
2345 log::warn!("Unknown management path_hash: {:02x?}", &path_hash[..4]);
2346 None
2347 };
2348
2349 if let Some(response) = response_data {
2350 let actions = self.link_manager.send_management_response(
2351 &link_id, &request_id, &response, &mut self.rng,
2352 );
2353 self.dispatch_link_actions(actions);
2354 }
2355 }
2356}
2357
2358#[cfg(test)]
2359mod tests {
2360 use super::*;
2361 use crate::event;
2362 use crate::interface::Writer;
2363 use rns_core::announce::AnnounceData;
2364 use rns_core::constants;
2365 use rns_core::packet::PacketFlags;
2366 use rns_core::transport::types::InterfaceInfo;
2367 use rns_crypto::identity::Identity;
2368 use std::io;
2369 use std::sync::mpsc;
2370 use std::sync::{Arc, Mutex};
2371
/// Test double for an interface writer that records every frame it is given.
struct MockWriter {
    /// Frames recorded so far, shared with the test that created the writer.
    sent: Arc<Mutex<Vec<Vec<u8>>>>,
}

impl MockWriter {
    /// Creates a writer together with a second handle to its frame log, so
    /// the test can inspect what was sent after the writer has been moved.
    fn new() -> (Self, Arc<Mutex<Vec<Vec<u8>>>>) {
        let frames: Arc<Mutex<Vec<Vec<u8>>>> = Arc::default();
        let writer = Self { sent: Arc::clone(&frames) };
        (writer, frames)
    }
}
2382
2383 impl Writer for MockWriter {
2384 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
2385 self.sent.lock().unwrap().push(data.to_vec());
2386 Ok(())
2387 }
2388 }
2389
2390 use rns_core::types::{DestHash, IdentityHash, LinkId as TypedLinkId, PacketHash};
2391
/// Test double for the driver callbacks: every callback appends a record to
/// one of these shared logs so tests can inspect what was reported.
struct MockCallbacks {
    // (destination hash, hops) per `on_announce`.
    announces: Arc<Mutex<Vec<(DestHash, u8)>>>,
    // (destination hash, hops) per `on_path_updated`.
    paths: Arc<Mutex<Vec<(DestHash, u8)>>>,
    // Destination hash per `on_local_delivery`.
    deliveries: Arc<Mutex<Vec<DestHash>>>,
    // Interface id per `on_interface_up`.
    iface_ups: Arc<Mutex<Vec<InterfaceId>>>,
    // Interface id per `on_interface_down`.
    iface_downs: Arc<Mutex<Vec<InterfaceId>>>,
    // (link id, rtt, is_initiator) per `on_link_established`.
    link_established: Arc<Mutex<Vec<(TypedLinkId, f64, bool)>>>,
    // Link id per `on_link_closed`.
    link_closed: Arc<Mutex<Vec<TypedLinkId>>>,
    // (link id, identity hash) per `on_remote_identified`.
    remote_identified: Arc<Mutex<Vec<(TypedLinkId, IdentityHash)>>>,
    // (link id, data) per `on_resource_received`.
    resources_received: Arc<Mutex<Vec<(TypedLinkId, Vec<u8>)>>>,
    // Link id per `on_resource_completed`.
    resource_completed: Arc<Mutex<Vec<TypedLinkId>>>,
    // (link id, error) per `on_resource_failed`.
    resource_failed: Arc<Mutex<Vec<(TypedLinkId, String)>>>,
    // (link id, message type, payload) per `on_channel_message`.
    channel_messages: Arc<Mutex<Vec<(TypedLinkId, u16, Vec<u8>)>>>,
    // (link id, context, data) per `on_link_data`.
    link_data: Arc<Mutex<Vec<(TypedLinkId, u8, Vec<u8>)>>>,
    // (link id, request id, data) per `on_response`.
    responses: Arc<Mutex<Vec<(TypedLinkId, [u8; 16], Vec<u8>)>>>,
    // (destination hash, packet hash, rtt) per `on_proof`.
    proofs: Arc<Mutex<Vec<(DestHash, PacketHash, f64)>>>,
    // (destination hash, packet hash) per `on_proof_requested`.
    proof_requested: Arc<Mutex<Vec<(DestHash, PacketHash)>>>,
}
2410
2411 impl MockCallbacks {
2412 fn new() -> (
2413 Self,
2414 Arc<Mutex<Vec<(DestHash, u8)>>>,
2415 Arc<Mutex<Vec<(DestHash, u8)>>>,
2416 Arc<Mutex<Vec<DestHash>>>,
2417 Arc<Mutex<Vec<InterfaceId>>>,
2418 Arc<Mutex<Vec<InterfaceId>>>,
2419 ) {
2420 let announces = Arc::new(Mutex::new(Vec::new()));
2421 let paths = Arc::new(Mutex::new(Vec::new()));
2422 let deliveries = Arc::new(Mutex::new(Vec::new()));
2423 let iface_ups = Arc::new(Mutex::new(Vec::new()));
2424 let iface_downs = Arc::new(Mutex::new(Vec::new()));
2425 (
2426 MockCallbacks {
2427 announces: announces.clone(),
2428 paths: paths.clone(),
2429 deliveries: deliveries.clone(),
2430 iface_ups: iface_ups.clone(),
2431 iface_downs: iface_downs.clone(),
2432 link_established: Arc::new(Mutex::new(Vec::new())),
2433 link_closed: Arc::new(Mutex::new(Vec::new())),
2434 remote_identified: Arc::new(Mutex::new(Vec::new())),
2435 resources_received: Arc::new(Mutex::new(Vec::new())),
2436 resource_completed: Arc::new(Mutex::new(Vec::new())),
2437 resource_failed: Arc::new(Mutex::new(Vec::new())),
2438 channel_messages: Arc::new(Mutex::new(Vec::new())),
2439 link_data: Arc::new(Mutex::new(Vec::new())),
2440 responses: Arc::new(Mutex::new(Vec::new())),
2441 proofs: Arc::new(Mutex::new(Vec::new())),
2442 proof_requested: Arc::new(Mutex::new(Vec::new())),
2443 },
2444 announces,
2445 paths,
2446 deliveries,
2447 iface_ups,
2448 iface_downs,
2449 )
2450 }
2451
2452 fn with_link_tracking() -> (
2453 Self,
2454 Arc<Mutex<Vec<(TypedLinkId, f64, bool)>>>,
2455 Arc<Mutex<Vec<TypedLinkId>>>,
2456 Arc<Mutex<Vec<(TypedLinkId, IdentityHash)>>>,
2457 ) {
2458 let link_established = Arc::new(Mutex::new(Vec::new()));
2459 let link_closed = Arc::new(Mutex::new(Vec::new()));
2460 let remote_identified = Arc::new(Mutex::new(Vec::new()));
2461 (
2462 MockCallbacks {
2463 announces: Arc::new(Mutex::new(Vec::new())),
2464 paths: Arc::new(Mutex::new(Vec::new())),
2465 deliveries: Arc::new(Mutex::new(Vec::new())),
2466 iface_ups: Arc::new(Mutex::new(Vec::new())),
2467 iface_downs: Arc::new(Mutex::new(Vec::new())),
2468 link_established: link_established.clone(),
2469 link_closed: link_closed.clone(),
2470 remote_identified: remote_identified.clone(),
2471 resources_received: Arc::new(Mutex::new(Vec::new())),
2472 resource_completed: Arc::new(Mutex::new(Vec::new())),
2473 resource_failed: Arc::new(Mutex::new(Vec::new())),
2474 channel_messages: Arc::new(Mutex::new(Vec::new())),
2475 link_data: Arc::new(Mutex::new(Vec::new())),
2476 responses: Arc::new(Mutex::new(Vec::new())),
2477 proofs: Arc::new(Mutex::new(Vec::new())),
2478 proof_requested: Arc::new(Mutex::new(Vec::new())),
2479 },
2480 link_established,
2481 link_closed,
2482 remote_identified,
2483 )
2484 }
2485 }
2486
2487 impl Callbacks for MockCallbacks {
2488 fn on_announce(&mut self, announced: crate::destination::AnnouncedIdentity) {
2489 self.announces.lock().unwrap().push((announced.dest_hash, announced.hops));
2490 }
2491
2492 fn on_path_updated(&mut self, dest_hash: DestHash, hops: u8) {
2493 self.paths.lock().unwrap().push((dest_hash, hops));
2494 }
2495
2496 fn on_local_delivery(&mut self, dest_hash: DestHash, _raw: Vec<u8>, _packet_hash: PacketHash) {
2497 self.deliveries.lock().unwrap().push(dest_hash);
2498 }
2499
2500 fn on_interface_up(&mut self, id: InterfaceId) {
2501 self.iface_ups.lock().unwrap().push(id);
2502 }
2503
2504 fn on_interface_down(&mut self, id: InterfaceId) {
2505 self.iface_downs.lock().unwrap().push(id);
2506 }
2507
2508 fn on_link_established(&mut self, link_id: TypedLinkId, _dest_hash: DestHash, rtt: f64, is_initiator: bool) {
2509 self.link_established.lock().unwrap().push((link_id, rtt, is_initiator));
2510 }
2511
2512 fn on_link_closed(&mut self, link_id: TypedLinkId, _reason: Option<rns_core::link::TeardownReason>) {
2513 self.link_closed.lock().unwrap().push(link_id);
2514 }
2515
2516 fn on_remote_identified(&mut self, link_id: TypedLinkId, identity_hash: IdentityHash, _public_key: [u8; 64]) {
2517 self.remote_identified.lock().unwrap().push((link_id, identity_hash));
2518 }
2519
2520 fn on_resource_received(&mut self, link_id: TypedLinkId, data: Vec<u8>, _metadata: Option<Vec<u8>>) {
2521 self.resources_received.lock().unwrap().push((link_id, data));
2522 }
2523
2524 fn on_resource_completed(&mut self, link_id: TypedLinkId) {
2525 self.resource_completed.lock().unwrap().push(link_id);
2526 }
2527
2528 fn on_resource_failed(&mut self, link_id: TypedLinkId, error: String) {
2529 self.resource_failed.lock().unwrap().push((link_id, error));
2530 }
2531
2532 fn on_channel_message(&mut self, link_id: TypedLinkId, msgtype: u16, payload: Vec<u8>) {
2533 self.channel_messages.lock().unwrap().push((link_id, msgtype, payload));
2534 }
2535
2536 fn on_link_data(&mut self, link_id: TypedLinkId, context: u8, data: Vec<u8>) {
2537 self.link_data.lock().unwrap().push((link_id, context, data));
2538 }
2539
2540 fn on_response(&mut self, link_id: TypedLinkId, request_id: [u8; 16], data: Vec<u8>) {
2541 self.responses.lock().unwrap().push((link_id, request_id, data));
2542 }
2543
2544 fn on_proof(&mut self, dest_hash: DestHash, packet_hash: PacketHash, rtt: f64) {
2545 self.proofs.lock().unwrap().push((dest_hash, packet_hash, rtt));
2546 }
2547
2548 fn on_proof_requested(&mut self, dest_hash: DestHash, packet_hash: PacketHash) -> bool {
2549 self.proof_requested.lock().unwrap().push((dest_hash, packet_hash));
2550 true
2551 }
2552 }
2553
2554 fn make_interface_info(id: u64) -> InterfaceInfo {
2555 InterfaceInfo {
2556 id: InterfaceId(id),
2557 name: format!("test-{}", id),
2558 mode: constants::MODE_FULL,
2559 out_capable: true,
2560 in_capable: true,
2561 bitrate: None,
2562 announce_rate_target: None,
2563 announce_rate_grace: 0,
2564 announce_rate_penalty: 0.0,
2565 announce_cap: rns_core::constants::ANNOUNCE_CAP,
2566 is_local_client: false,
2567 wants_tunnel: false,
2568 tunnel_id: None,
2569 mtu: constants::MTU as u32,
2570 ia_freq: 0.0,
2571 started: 0.0,
2572 ingress_control: false,
2573 }
2574 }
2575
2576 fn make_entry(id: u64, writer: Box<dyn Writer>, online: bool) -> InterfaceEntry {
2577 InterfaceEntry {
2578 id: InterfaceId(id),
2579 info: make_interface_info(id),
2580 writer,
2581 online,
2582 dynamic: false,
2583 ifac: None,
2584 stats: InterfaceStats::default(),
2585 interface_type: String::new(),
2586 }
2587 }
2588
2589 fn build_announce_packet(identity: &Identity) -> Vec<u8> {
2591 let dest_hash = rns_core::destination::destination_hash(
2592 "test",
2593 &["app"],
2594 Some(identity.hash()),
2595 );
2596 let name_hash = rns_core::destination::name_hash("test", &["app"]);
2597 let random_hash = [0x42u8; 10];
2598
2599 let (announce_data, _has_ratchet) = AnnounceData::pack(
2600 identity,
2601 &dest_hash,
2602 &name_hash,
2603 &random_hash,
2604 None,
2605 None,
2606 )
2607 .unwrap();
2608
2609 let flags = PacketFlags {
2610 header_type: constants::HEADER_1,
2611 context_flag: constants::FLAG_UNSET,
2612 transport_type: constants::TRANSPORT_BROADCAST,
2613 destination_type: constants::DESTINATION_SINGLE,
2614 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2615 };
2616
2617 let packet = RawPacket::pack(flags, 0, &dest_hash, None, constants::CONTEXT_NONE, &announce_data).unwrap();
2618 packet.raw
2619 }
2620
/// An announce frame arriving on a registered interface reaches `on_announce`.
#[test]
fn process_inbound_frame() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, announce_log, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let identity = Identity::new(&mut OsRng);
    let frame = Event::Frame {
        interface_id: InterfaceId(1),
        data: build_announce_packet(&identity),
    };

    // Queue the frame followed by a shutdown so `run` returns.
    event_tx.send(frame).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let seen = announce_log.lock().unwrap().len();
    assert_eq!(seen, 1);
}
2646
/// `SendOnInterface` writes the raw bytes to the targeted interface's writer.
#[test]
fn dispatch_send() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (writer, frames) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let action = TransportAction::SendOnInterface {
        interface: InterfaceId(1),
        raw: vec![0x01, 0x02, 0x03],
    };
    driver.dispatch_all(vec![action]);

    // Exactly one frame, carrying exactly the bytes that were dispatched.
    assert_eq!(*frames.lock().unwrap(), vec![vec![0x01u8, 0x02, 0x03]]);

    drop(event_tx);
}
2670
/// A broadcast without exclusion reaches every online interface once.
#[test]
fn dispatch_broadcast() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (first_writer, first_frames) = MockWriter::new();
    let (second_writer, second_frames) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(first_writer), true));
    driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(second_writer), true));

    let broadcast = TransportAction::BroadcastOnAllInterfaces {
        raw: vec![0xAA],
        exclude: None,
    };
    driver.dispatch_all(vec![broadcast]);

    for frames in [&first_frames, &second_frames] {
        assert_eq!(frames.lock().unwrap().len(), 1);
    }

    drop(event_tx);
}
2697
/// A broadcast skips the interface named in `exclude`.
#[test]
fn dispatch_broadcast_exclude() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (excluded_writer, excluded_frames) = MockWriter::new();
    let (other_writer, other_frames) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(excluded_writer), true));
    driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(other_writer), true));

    let broadcast = TransportAction::BroadcastOnAllInterfaces {
        raw: vec![0xBB],
        exclude: Some(InterfaceId(1)),
    };
    driver.dispatch_all(vec![broadcast]);

    let excluded_count = excluded_frames.lock().unwrap().len();
    let other_count = other_frames.lock().unwrap().len();
    assert_eq!(excluded_count, 0);
    assert_eq!(other_count, 1);

    drop(event_tx);
}
2724
/// A tick on a transport-enabled driver is processed without panicking.
#[test]
fn tick_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: true,
        identity_hash: Some([0x42; 16]),
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    // One tick, then shutdown so `run` returns.
    for queued in [Event::Tick, Event::Shutdown] {
        event_tx.send(queued).unwrap();
    }
    driver.run();
}
2746
/// `run` returns once a shutdown event has been received.
#[test]
fn shutdown_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    event_tx.send(Event::Shutdown).unwrap();
    driver.run();
}
2761
/// A received announce is reported once with a hop count of 1 and also
/// produces exactly one path update.
#[test]
fn announce_callback() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, announce_log, path_log, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let identity = Identity::new(&mut OsRng);
    let frame = Event::Frame {
        interface_id: InterfaceId(1),
        data: build_announce_packet(&identity),
    };
    event_tx.send(frame).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let announced_hops: Vec<u8> = announce_log
        .lock()
        .unwrap()
        .iter()
        .map(|(_, hops)| *hops)
        .collect();
    assert_eq!(announced_hops, vec![1]);

    let path_updates = path_log.lock().unwrap().len();
    assert_eq!(path_updates, 1);
}
2792
/// Neither a direct send nor a broadcast writes to an offline interface.
#[test]
fn dispatch_skips_offline_interface() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (offline_writer, offline_frames) = MockWriter::new();
    let (online_writer, online_frames) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(offline_writer), false));
    driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(online_writer), true));

    // Direct send to the offline interface: nothing is written.
    let direct = TransportAction::SendOnInterface {
        interface: InterfaceId(1),
        raw: vec![0x01],
    };
    driver.dispatch_all(vec![direct]);
    assert_eq!(offline_frames.lock().unwrap().len(), 0);

    // Broadcast: only the online interface receives the frame.
    let broadcast = TransportAction::BroadcastOnAllInterfaces {
        raw: vec![0x02],
        exclude: None,
    };
    driver.dispatch_all(vec![broadcast]);
    assert_eq!(offline_frames.lock().unwrap().len(), 0);
    assert_eq!(online_frames.lock().unwrap().len(), 1);

    drop(event_tx);
}
2826
/// An `InterfaceUp` event carrying a writer brings the interface online and
/// replaces the old writer, so later sends go to the new one only.
#[test]
fn interface_up_refreshes_writer() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (stale_writer, stale_frames) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(stale_writer), false));

    let (fresh_writer, fresh_frames) = MockWriter::new();
    event_tx
        .send(Event::InterfaceUp(InterfaceId(1), Some(Box::new(fresh_writer)), None))
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.interfaces[&InterfaceId(1)].online);

    let send = TransportAction::SendOnInterface {
        interface: InterfaceId(1),
        raw: vec![0xFF],
    };
    driver.dispatch_all(vec![send]);

    assert_eq!(stale_frames.lock().unwrap().len(), 0);
    // Exactly one frame, with the dispatched payload, on the new writer.
    assert_eq!(*fresh_frames.lock().unwrap(), vec![vec![0xFFu8]]);

    drop(event_tx);
}
2864
/// An `InterfaceUp` event carrying both a writer and interface info registers
/// a new dynamic interface that is online, reported via `on_interface_up`,
/// and usable for sending.
#[test]
fn dynamic_interface_register() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, up_log, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let new_id = InterfaceId(100);
    let info = make_interface_info(100);
    let (writer, frames) = MockWriter::new();

    let up_event = Event::InterfaceUp(InterfaceId(100), Some(Box::new(writer)), Some(info));
    event_tx.send(up_event).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.interfaces.contains_key(&new_id));
    let registered = &driver.interfaces[&new_id];
    assert!(registered.online);
    assert!(registered.dynamic);

    // Exactly one up-notification, for the new interface.
    assert_eq!(*up_log.lock().unwrap(), vec![InterfaceId(100)]);

    let send = TransportAction::SendOnInterface {
        interface: InterfaceId(100),
        raw: vec![0x42],
    };
    driver.dispatch_all(vec![send]);
    assert_eq!(frames.lock().unwrap().len(), 1);

    drop(event_tx);
}
2907
/// An `InterfaceDown` event for a dynamic interface removes it from the
/// driver and is reported via `on_interface_down`.
#[test]
fn dynamic_interface_deregister() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, down_log) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(200));
    let (writer, _sent) = MockWriter::new();

    // Same as a regular test entry, but flagged as dynamic.
    let mut entry = make_entry(200, Box::new(writer), true);
    entry.dynamic = true;
    driver.interfaces.insert(InterfaceId(200), entry);

    event_tx.send(Event::InterfaceDown(InterfaceId(200))).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(!driver.interfaces.contains_key(&InterfaceId(200)));
    assert_eq!(*down_log.lock().unwrap(), vec![InterfaceId(200)]);
}
2943
/// Up and down events for a static interface each fire their callback once;
/// the interface stays registered and ends up offline.
#[test]
fn interface_callbacks_fire() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, up_log, down_log) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (writer, _) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), false));

    event_tx.send(Event::InterfaceUp(InterfaceId(1), None, None)).unwrap();
    event_tx.send(Event::InterfaceDown(InterfaceId(1))).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let ups = up_log.lock().unwrap().len();
    let downs = down_log.lock().unwrap().len();
    assert_eq!(ups, 1);
    assert_eq!(downs, 1);

    assert!(driver.interfaces.contains_key(&InterfaceId(1)));
    assert!(!driver.interfaces[&InterfaceId(1)].online);
}
2970
/// Processing a single announce frame on interface 1 sets that interface's
/// `rxb` to the frame length and `rx_packets` to one.
#[test]
fn frame_updates_rx_stats() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: false,
            identity_hash: None,
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(cbs),
    );
    // The interface info is not needed afterwards, so it is moved into the
    // engine rather than cloned.
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let identity = Identity::new(&mut OsRng);
    let announce_raw = build_announce_packet(&identity);
    // Capture the length before the buffer is moved into the event.
    let announce_len = announce_raw.len() as u64;

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let stats = &driver.interfaces[&InterfaceId(1)].stats;
    assert_eq!(stats.rxb, announce_len);
    assert_eq!(stats.rx_packets, 1);
}
3002
/// Dispatching one `SendOnInterface` action with a three-byte payload leaves
/// the target interface with `txb == 3` and `tx_packets == 1`.
#[test]
fn send_updates_tx_stats() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let (mock_writer, _sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(mock_writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    // Actions are dispatched directly; the event loop is never run here.
    let action = TransportAction::SendOnInterface {
        interface: InterfaceId(1),
        raw: vec![0x01, 0x02, 0x03],
    };
    driver.dispatch_all(vec![action]);

    let tx_stats = &driver.interfaces[&InterfaceId(1)].stats;
    assert_eq!(tx_stats.txb, 3);
    assert_eq!(tx_stats.tx_packets, 1);

    drop(tx);
}
3027
/// A `BroadcastOnAllInterfaces` action with a two-byte payload and no
/// exclusion is counted on both inserted interfaces: each ends with
/// `txb == 2` and `tx_packets == 1`.
#[test]
fn broadcast_updates_tx_stats() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let (first_writer, _s1) = MockWriter::new();
    let (second_writer, _s2) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(first_writer), true));
    driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(second_writer), true));

    let broadcast = TransportAction::BroadcastOnAllInterfaces {
        raw: vec![0xAA, 0xBB],
        exclude: None,
    };
    driver.dispatch_all(vec![broadcast]);

    // Same pair of checks for interface 1, then interface 2.
    for id in 1..=2u64 {
        let stats = &driver.interfaces[&InterfaceId(id)].stats;
        assert_eq!(stats.txb, 2);
        assert_eq!(stats.tx_packets, 1);
    }

    drop(tx);
}
3056
/// An `InterfaceStats` query on a transport-enabled driver reports the single
/// inserted interface (`"test-1"`, status true) together with the configured
/// transport id and the transport-enabled flag.
#[test]
fn query_interface_stats() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: true,
        identity_hash: Some([0x42; 16]),
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::InterfaceStats, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    // Unwrap the expected variant first, then assert on its contents.
    let stats = match reply_rx.recv().unwrap() {
        QueryResponse::InterfaceStats(stats) => stats,
        _ => panic!("unexpected response"),
    };
    assert_eq!(stats.interfaces.len(), 1);
    assert_eq!(stats.interfaces[0].name, "test-1");
    assert!(stats.interfaces[0].status);
    assert_eq!(stats.transport_id, Some([0x42; 16]));
    assert!(stats.transport_enabled);
}
3087
/// After one announce frame has been processed, a `PathTable` query with no
/// hop limit returns a single entry whose `hops` is 1.
#[test]
fn query_path_table() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // The announce is queued ahead of the query so the path exists by the
    // time the query is handled.
    let identity = Identity::new(&mut OsRng);
    let announce = build_announce_packet(&identity);
    tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce }).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    let request = QueryRequest::PathTable { max_hops: None };
    tx.send(Event::Query(request, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::PathTable(entries) => entries,
        _ => panic!("unexpected response"),
    };
    assert_eq!(entries.len(), 1);
    assert_eq!(entries[0].hops, 1);
}
3122
/// A `DropPath` query for the destination of a previously processed announce
/// answers `DropPath(true)`.
#[test]
fn query_drop_path() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let identity = Identity::new(&mut OsRng);
    let announce = build_announce_packet(&identity);
    // Destination hash computed for app "test" / aspect "app" and this
    // identity; it is the hash the drop request targets.
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));

    // Learn the path first, then ask for it to be dropped.
    tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce }).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::DropPath { dest_hash }, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let dropped = match reply_rx.recv().unwrap() {
        QueryResponse::DropPath(dropped) => dropped,
        _ => panic!("unexpected response"),
    };
    assert!(dropped);
}
3160
/// A `SendOutbound` event carrying a packed PLAIN/broadcast data packet
/// produces exactly one write on the only inserted interface.
#[test]
fn send_outbound_event() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let (mock_writer, written) = MockWriter::new();
    driver.engine.register_interface(make_interface_info(1));
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Build the outbound packet: header type 1, broadcast, PLAIN, DATA.
    let destination = [0xAA; 16];
    let packet_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_PLAIN,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let packed = RawPacket::pack(
        packet_flags,
        0,
        &destination,
        None,
        constants::CONTEXT_NONE,
        b"hello",
    )
    .unwrap();

    let outbound = Event::SendOutbound {
        raw: packed.raw,
        dest_type: constants::DESTINATION_PLAIN,
        attached_interface: None,
    };
    tx.send(outbound).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(written.lock().unwrap().len(), 1);
}
3198
/// After a `RegisterDestination` event for a SINGLE destination, an inbound
/// data frame addressed to that hash is recorded once by the `deliveries`
/// recorder, keyed by the same destination hash.
#[test]
fn register_destination_and_deliver() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, delivered, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let dest = [0xBB; 16];

    // Registration is queued before the frame so the destination is known
    // when the packet arrives.
    let registration = Event::RegisterDestination {
        dest_hash: dest,
        dest_type: constants::DESTINATION_SINGLE,
    };
    tx.send(registration).unwrap();

    let packet_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let packed =
        RawPacket::pack(packet_flags, 0, &dest, None, constants::CONTEXT_NONE, b"data").unwrap();
    tx.send(Event::Frame { interface_id: InterfaceId(1), data: packed.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let recorded = delivered.lock().unwrap();
    assert_eq!(recorded.len(), 1);
    assert_eq!(recorded[0], DestHash(dest));
}
3237
/// A `TransportIdentity` query returns the identity hash the driver was
/// configured with.
#[test]
fn query_transport_identity() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: true,
        identity_hash: Some([0xAA; 16]),
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::TransportIdentity, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    // Anything other than `TransportIdentity(Some(_))` is a failure.
    let hash = match reply_rx.recv().unwrap() {
        QueryResponse::TransportIdentity(Some(hash)) => hash,
        _ => panic!("unexpected response"),
    };
    assert_eq!(hash, [0xAA; 16]);
}
3261
/// A `LinkCount` query on a freshly constructed driver reports zero links.
#[test]
fn query_link_count() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::LinkCount, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let count = match reply_rx.recv().unwrap() {
        QueryResponse::LinkCount(count) => count,
        _ => panic!("unexpected response"),
    };
    assert_eq!(count, 0);
}
3283
/// A `RateTable` query on a freshly constructed driver returns no entries.
#[test]
fn query_rate_table() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::RateTable, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::RateTable(entries) => entries,
        _ => panic!("unexpected response"),
    };
    assert!(entries.is_empty());
}
3305
/// A `NextHop` query for a destination the driver has never seen answers
/// `NextHop(None)`.
#[test]
fn query_next_hop() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let unknown_dest = [0xBB; 16];
    let (reply_tx, reply_rx) = mpsc::channel();
    let request = QueryRequest::NextHop { dest_hash: unknown_dest };
    tx.send(Event::Query(request, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    if !matches!(reply, QueryResponse::NextHop(None)) {
        panic!("unexpected response");
    }
}
3328
/// A `NextHopIfName` query for a destination the driver has never seen
/// answers `NextHopIfName(None)`.
#[test]
fn query_next_hop_if_name() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let unknown_dest = [0xCC; 16];
    let (reply_tx, reply_rx) = mpsc::channel();
    let request = QueryRequest::NextHopIfName { dest_hash: unknown_dest };
    tx.send(Event::Query(request, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    if !matches!(reply, QueryResponse::NextHopIfName(None)) {
        panic!("unexpected response");
    }
}
3351
/// A `DropAllVia` query on a driver that has processed no announces reports
/// a count of zero.
#[test]
fn query_drop_all_via() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let via = [0xDD; 16];
    let (reply_tx, reply_rx) = mpsc::channel();
    let request = QueryRequest::DropAllVia { transport_hash: via };
    tx.send(Event::Query(request, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let count = match reply_rx.recv().unwrap() {
        QueryResponse::DropAllVia(count) => count,
        _ => panic!("unexpected response"),
    };
    assert_eq!(count, 0);
}
3377
/// A `DropAnnounceQueues` query is acknowledged with the matching
/// `DropAnnounceQueues` response variant.
#[test]
fn query_drop_announce_queues() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::DropAnnounceQueues, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    if !matches!(reply, QueryResponse::DropAnnounceQueues) {
        panic!("unexpected response");
    }
}
3399
/// A `RegisterLinkDestination` event carrying a freshly generated Ed25519
/// key pair makes the link manager report the hash as a link destination.
#[test]
fn register_link_dest_event() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // The event carries the raw key bytes, not the key object.
    let mut rng = OsRng;
    let signing_key = rns_crypto::ed25519::Ed25519PrivateKey::generate(&mut rng);
    let public_bytes = signing_key.public_key().public_bytes();
    let private_bytes = signing_key.private_bytes();
    let dest_hash = [0xDD; 16];

    let registration = Event::RegisterLinkDestination {
        dest_hash,
        sig_prv_bytes: private_bytes,
        sig_pub_bytes: public_bytes,
        resource_strategy: 0,
    };
    tx.send(registration).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.link_manager.is_link_destination(&dest_hash));
}
3437
/// A `CreateLink` event answers on its response channel with a non-zero link
/// id and leaves the link manager holding exactly one link.
#[test]
fn create_link_event() {
    let (tx, rx) = event::channel();
    let (callbacks, _established, _, _) = MockCallbacks::with_link_tracking();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Placeholder destination hash and signing key bytes.
    let dest_hash = [0xDD; 16];
    let placeholder_sig_pub = [0xAA; 32];

    let (reply_tx, reply_rx) = mpsc::channel();
    let request = Event::CreateLink {
        dest_hash,
        dest_sig_pub_bytes: placeholder_sig_pub,
        response_tx: reply_tx,
    };
    tx.send(request).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let link_id = reply_rx.recv().unwrap();
    assert_ne!(link_id, [0u8; 16]);
    assert_eq!(driver.link_manager.link_count(), 1);
}
3477
/// Registering a link destination directly on the link manager makes
/// `is_link_destination` true for that hash and false for an unrelated one.
///
/// NOTE(review): despite the name, no packet is delivered and the driver
/// loop is never run here; only the registration lookup is checked.
#[test]
fn deliver_local_routes_to_link_manager() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let mut rng = OsRng;
    let signing_key = rns_crypto::ed25519::Ed25519PrivateKey::generate(&mut rng);
    let public_bytes = signing_key.public_key().public_bytes();
    let registered = [0xEE; 16];
    driver.link_manager.register_link_destination(
        registered,
        signing_key,
        public_bytes,
        crate::link_manager::ResourceStrategy::AcceptNone,
    );

    assert!(driver.link_manager.is_link_destination(&registered));
    assert!(!driver.link_manager.is_link_destination(&[0xFF; 16]));

    drop(tx);
}
3512
/// Creates a link through a `CreateLink` event, then tears it down and
/// checks that the `link_closed` recorder saw that link id exactly once.
///
/// NOTE(review): the teardown itself is performed by calling
/// `link_manager.teardown_link` and `dispatch_link_actions` directly after
/// the loop has exited, not by sending an event.
#[test]
fn teardown_link_event() {
    let (tx, rx) = event::channel();
    let (callbacks, _, closed, _) = MockCallbacks::with_link_tracking();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let (reply_tx, reply_rx) = mpsc::channel();
    let request = Event::CreateLink {
        dest_hash: [0xDD; 16],
        dest_sig_pub_bytes: [0xAA; 32],
        response_tx: reply_tx,
    };
    tx.send(request).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let link_id = reply_rx.recv().unwrap();
    assert_ne!(link_id, [0u8; 16]);
    assert_eq!(driver.link_manager.link_count(), 1);

    let actions = driver.link_manager.teardown_link(&link_id);
    driver.dispatch_link_actions(actions);

    let recorded = closed.lock().unwrap();
    assert_eq!(recorded.len(), 1);
    assert_eq!(recorded[0], TypedLinkId(link_id));
}
3554
/// A link created directly on the link manager is included in the answer to
/// a `LinkCount` query (expected count: 1).
#[test]
fn link_count_includes_link_manager() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Bypass the event path: create the link on the manager itself.
    let mut rng = OsRng;
    let placeholder_sig = [0xAA; 32];
    driver.link_manager.create_link(
        &[0xDD; 16],
        &placeholder_sig,
        1,
        constants::MTU as u32,
        &mut rng,
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::LinkCount, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let count = match reply_rx.recv().unwrap() {
        QueryResponse::LinkCount(count) => count,
        _ => panic!("unexpected response"),
    };
    assert_eq!(count, 1);
}
3586
/// Sends a `RegisterRequestHandler` event for `"/status"` and runs the
/// driver to shutdown.
///
/// The test contains no assertions: it passes as long as handling the event
/// does not panic.
#[test]
fn register_request_handler_event() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    // Handler ignores all of its arguments and always replies with "OK".
    let registration = Event::RegisterRequestHandler {
        path: "/status".to_string(),
        allowed_list: None,
        handler: Box::new(|_link_id, _path, _data, _remote| Some(b"OK".to_vec())),
    };
    tx.send(registration).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();
}
3609
/// With remote management enabled, a transport identity set, and `started`
/// moved 10 seconds into the past, a single `Tick` causes at least one
/// packet to be written to the interface.
#[test]
fn management_announces_emitted_after_delay() {
    let (tx, rx) = event::channel();
    // No callback recorder is inspected here; only the writer output matters.
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let identity = Identity::new(&mut OsRng);
    let identity_hash = *identity.hash();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: true,
            identity_hash: Some(identity_hash),
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(cbs),
    );

    // The interface info is not needed afterwards, so it is moved into the
    // engine rather than cloned.
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    driver.management_config.enable_remote_management = true;
    driver.transport_identity = Some(identity);

    // Backdate the start time so the driver sees itself as already running
    // for 10 seconds when the tick arrives.
    driver.started = time::now() - 10.0;

    tx.send(Event::Tick).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    assert!(
        !sent_packets.is_empty(),
        "Management announce should be sent after startup delay"
    );
}
3648
/// With `management_config` left untouched (the test relies on remote
/// management being off in that state), a `Tick` after a backdated start
/// writes nothing to the interface.
#[test]
fn management_announces_not_emitted_when_disabled() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let identity = Identity::new(&mut OsRng);
    let identity_hash = *identity.hash();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: true,
            identity_hash: Some(identity_hash),
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(cbs),
    );

    // The interface info is not needed afterwards, so it is moved into the
    // engine rather than cloned.
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    driver.transport_identity = Some(identity);
    // Same backdating as the enabled case, so only the management flag
    // differs between the two tests.
    driver.started = time::now() - 10.0;

    tx.send(Event::Tick).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    assert!(
        sent_packets.is_empty(),
        "No announces should be sent when management is disabled"
    );
}
3680
/// With remote management enabled but `started` set to the current time, a
/// `Tick` writes nothing to the interface.
#[test]
fn management_announces_not_emitted_before_delay() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let identity = Identity::new(&mut OsRng);
    let identity_hash = *identity.hash();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: true,
            identity_hash: Some(identity_hash),
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(cbs),
    );

    // The interface info is not needed afterwards, so it is moved into the
    // engine rather than cloned.
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    driver.management_config.enable_remote_management = true;
    driver.transport_identity = Some(identity);
    // Start time is "now": no time has elapsed when the tick is handled.
    driver.started = time::now();

    tx.send(Event::Tick).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    assert!(sent_packets.is_empty(), "No announces before startup delay");
}
3712
/// Processing an announce frame stores an entry in
/// `driver.known_destinations` keyed by the announced destination hash,
/// carrying the announcer's identity hash, public key and a hop count of 1.
#[test]
fn announce_received_populates_known_destinations() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let identity = Identity::new(&mut OsRng);
    let announce = build_announce_packet(&identity);

    // Key under which the entry is expected to be stored.
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.known_destinations.contains_key(&dest_hash));

    let stored = &driver.known_destinations[&dest_hash];
    assert_eq!(stored.dest_hash.0, dest_hash);
    assert_eq!(stored.identity_hash.0, *identity.hash());
    assert_eq!(&stored.public_key, &identity.get_public_key().unwrap());
    assert_eq!(stored.hops, 1);
}
3751
/// `HasPath` answers `false` for a hash queried before any announce was
/// processed, and `true` for the announced destination queried after its
/// announce frame.
#[test]
fn query_has_path() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Prepare the announce up front; nothing is queued yet.
    let identity = Identity::new(&mut OsRng);
    let announce = build_announce_packet(&identity);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));

    // Queue order matters: unknown-hash query, announce, known-hash query.
    let (miss_tx, miss_rx) = mpsc::channel();
    let miss_request = QueryRequest::HasPath { dest_hash: [0xAA; 16] };
    tx.send(Event::Query(miss_request, miss_tx)).unwrap();

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce }).unwrap();

    let (hit_tx, hit_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::HasPath { dest_hash }, hit_tx)).unwrap();

    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let miss = miss_rx.recv().unwrap();
    if !matches!(miss, QueryResponse::HasPath(false)) {
        panic!("expected HasPath(false), got {:?}", miss);
    }

    let hit = hit_rx.recv().unwrap();
    if !matches!(hit, QueryResponse::HasPath(true)) {
        panic!("expected HasPath(true), got {:?}", hit);
    }
}
3797
/// After an announce frame has been processed, a `HopsTo` query for the
/// announced destination answers `HopsTo(Some(1))`.
#[test]
fn query_hops_to() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let identity = Identity::new(&mut OsRng);
    let announce = build_announce_packet(&identity);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));

    // Announce first, query second.
    tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce }).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::HopsTo { dest_hash }, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    if !matches!(reply, QueryResponse::HopsTo(Some(1))) {
        panic!("expected HopsTo(Some(1)), got {:?}", reply);
    }
}
3832
/// `RecallIdentity` returns the stored announce data (destination hash,
/// identity hash, public key, hops == 1) for an announced destination and
/// `None` for a hash that was never announced.
#[test]
fn query_recall_identity() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let identity = Identity::new(&mut OsRng);
    let announce = build_announce_packet(&identity);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce }).unwrap();

    // Both lookups are queued after the announce: one known, one unknown.
    let (known_tx, known_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::RecallIdentity { dest_hash }, known_tx)).unwrap();

    let (unknown_tx, unknown_rx) = mpsc::channel();
    let unknown_request = QueryRequest::RecallIdentity { dest_hash: [0xFF; 16] };
    tx.send(Event::Query(unknown_request, unknown_tx)).unwrap();

    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let recalled = match known_rx.recv().unwrap() {
        QueryResponse::RecallIdentity(Some(recalled)) => recalled,
        other => panic!("expected RecallIdentity(Some(..)), got {:?}", other),
    };
    assert_eq!(recalled.dest_hash.0, dest_hash);
    assert_eq!(recalled.identity_hash.0, *identity.hash());
    assert_eq!(recalled.public_key, identity.get_public_key().unwrap());
    assert_eq!(recalled.hops, 1);

    let unknown = unknown_rx.recv().unwrap();
    if !matches!(unknown, QueryResponse::RecallIdentity(None)) {
        panic!("expected RecallIdentity(None), got {:?}", unknown);
    }
}
3882
/// A `RequestPath` event writes at least one packet to the interface, and
/// the first packet's flags decode as DATA / PLAIN / BROADCAST.
#[test]
fn request_path_sends_packet() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    tx.send(Event::RequestPath { dest_hash: [0xAA; 16] }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let packets = written.lock().unwrap();
    assert!(!packets.is_empty(), "Path request should be sent on wire");

    // Decode the flags from the first byte with its top bit masked off.
    let first_packet = &packets[0];
    let flag_byte = first_packet[0] & 0x7F;
    let decoded = rns_core::packet::PacketFlags::unpack(flag_byte);
    assert_eq!(decoded.packet_type, constants::PACKET_TYPE_DATA);
    assert_eq!(decoded.destination_type, constants::DESTINATION_PLAIN);
    assert_eq!(decoded.transport_type, constants::TRANSPORT_BROADCAST);
}
3914
/// With transport enabled and an identity hash configured, the path request
/// payload must carry that hash as the transport id.
///
/// Expected payload layout, as asserted below: 48 bytes in total, bytes
/// `0..16` are the requested destination hash and bytes `16..32` are the
/// configured transport identity hash. The remaining 16 bytes are not
/// inspected by this test.
#[test]
fn request_path_includes_transport_id() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: true,
            identity_hash: Some([0xBB; 16]),
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(cbs),
    );
    let info = make_interface_info(1);
    driver.engine.register_interface(info);
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    tx.send(Event::RequestPath { dest_hash: [0xAA; 16] }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    assert!(!sent_packets.is_empty());

    let raw = &sent_packets[0];
    // Surface the decoder's error in the panic message instead of discarding it,
    // so a failure here explains why the frame could not be parsed.
    let packet = RawPacket::unpack(raw)
        .unwrap_or_else(|err| panic!("Could not unpack sent packet: {:?}", err));

    assert_eq!(packet.data.len(), 48, "Path request data should be 48 bytes with transport_id");
    assert_eq!(&packet.data[..16], &[0xAA; 16], "First 16 bytes should be dest_hash");
    assert_eq!(&packet.data[16..32], &[0xBB; 16], "Next 16 bytes should be transport_id");
}
3948
/// A freshly constructed driver exposes `path_request_dest` equal to the
/// destination hash of `rnstransport` with aspects `path`/`request` and no identity.
#[test]
fn path_request_dest_registered() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    // The event loop is never run; only the constructed state is inspected.
    let driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let aspects = ["path", "request"];
    let expected_dest = rns_core::destination::destination_hash("rnstransport", &aspects, None);
    assert_eq!(driver.path_request_dest, expected_dest);

    drop(tx);
}
3968
/// `RegisterProofStrategy` with a signing key must record the strategy for
/// the destination together with a `Some(..)` identity slot.
#[test]
fn register_proof_strategy_event() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let dest = [0xAA; 16];
    let identity = Identity::new(&mut OsRng);
    let prv_key = identity.get_private_key().unwrap();

    let register = Event::RegisterProofStrategy {
        dest_hash: dest,
        strategy: rns_core::types::ProofStrategy::ProveAll,
        signing_key: Some(prv_key),
    };
    tx.send(register).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    // The entry is a (strategy, optional identity) pair keyed by destination hash.
    assert!(driver.proof_strategies.contains_key(&dest));
    let (strategy, id_opt) = &driver.proof_strategies[&dest];
    assert_eq!(*strategy, rns_core::types::ProofStrategy::ProveAll);
    assert!(id_opt.is_some());
}
4001
/// `RegisterProofStrategy` without a signing key must still record the
/// strategy, leaving the identity slot as `None`.
#[test]
fn register_proof_strategy_prove_none_no_identity() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));

    let dest = [0xBB; 16];
    let register = Event::RegisterProofStrategy {
        dest_hash: dest,
        strategy: rns_core::types::ProofStrategy::ProveNone,
        signing_key: None,
    };
    tx.send(register).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.proof_strategies.contains_key(&dest));
    let (strategy, id_opt) = &driver.proof_strategies[&dest];
    assert_eq!(*strategy, rns_core::types::ProofStrategy::ProveNone);
    assert!(id_opt.is_none());
}
4027
/// A `SendOutbound` event must leave an entry in `driver.sent_packets`, keyed
/// by the packet hash and carrying the packet's destination hash.
#[test]
fn send_outbound_tracks_sent_packets() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xCC; 16];
    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_PLAIN,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let packet =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, b"test data").unwrap();
    // Remember the hash before the raw bytes are moved into the event.
    let expected_hash = packet.packet_hash;

    let outbound = Event::SendOutbound {
        raw: packet.raw,
        dest_type: constants::DESTINATION_PLAIN,
        attached_interface: None,
    };
    tx.send(outbound).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.sent_packets.contains_key(&expected_hash));
    // Entries are (destination hash, send time); only the destination is checked.
    let tracked = &driver.sent_packets[&expected_hash];
    assert_eq!(&tracked.0, &dest);
}
4068
/// With `ProveAll` registered for a destination, an inbound DATA packet is
/// delivered exactly once and a PROOF packet is written back to the interface.
#[test]
fn prove_all_generates_proof_on_delivery() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, deliveries, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    // Register a local SINGLE destination and attach a ProveAll strategy with a signer.
    let dest = [0xDD; 16];
    let identity = Identity::new(&mut OsRng);
    let prv_key = identity.get_private_key().unwrap();
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    let signer = Identity::from_private_key(&prv_key);
    driver
        .proof_strategies
        .insert(dest, (rns_core::types::ProofStrategy::ProveAll, Some(signer)));

    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(deliveries.lock().unwrap().len(), 1);

    // Scan everything written to the interface for a frame whose type is PROOF.
    let sent_packets = sent.lock().unwrap();
    let has_proof = sent_packets
        .iter()
        .map(|frame| PacketFlags::unpack(frame[0] & 0x7F).packet_type)
        .any(|kind| kind == constants::PACKET_TYPE_PROOF);
    assert!(has_proof, "ProveAll should generate a proof packet: sent {} packets", sent_packets.len());
}
4120
/// With `ProveNone` registered for a destination, an inbound DATA packet is
/// still delivered once, but no PROOF packet is written to the interface.
#[test]
fn prove_none_does_not_generate_proof() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, deliveries, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xDD; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    let no_proof = (rns_core::types::ProofStrategy::ProveNone, None);
    driver.proof_strategies.insert(dest, no_proof);

    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(deliveries.lock().unwrap().len(), 1);

    // None of the frames written to the interface may be of type PROOF.
    let sent_packets = sent.lock().unwrap();
    let has_proof = sent_packets
        .iter()
        .map(|frame| PacketFlags::unpack(frame[0] & 0x7F).packet_type)
        .any(|kind| kind == constants::PACKET_TYPE_PROOF);
    assert!(!has_proof, "ProveNone should not generate a proof packet");
}
4169
/// When a destination has no proof strategy entry at all, an inbound DATA
/// packet is delivered once and no PROOF packet is written to the interface.
#[test]
fn no_proof_strategy_does_not_generate_proof() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, deliveries, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    // The destination is registered with the engine, but `proof_strategies` is left untouched.
    let dest = [0xDD; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(deliveries.lock().unwrap().len(), 1);

    let sent_packets = sent.lock().unwrap();
    let has_proof = sent_packets
        .iter()
        .map(|frame| PacketFlags::unpack(frame[0] & 0x7F).packet_type)
        .any(|kind| kind == constants::PACKET_TYPE_PROOF);
    assert!(!has_proof, "No proof strategy means no proof generated");
}
4211
/// With `ProveApp` registered, an inbound DATA packet must trigger the
/// proof-requested callback for that destination, and a PROOF packet must be
/// written to the interface.
///
/// NOTE(review): the final assertion message implies the mock's
/// proof-requested callback returns `true` — confirm against `MockCallbacks`.
#[test]
fn prove_app_calls_callback() {
    // Fresh, empty shared log for one callback channel.
    fn empty_log<T>() -> Arc<Mutex<Vec<T>>> {
        Arc::new(Mutex::new(Vec::new()))
    }

    let (tx, rx) = event::channel();
    // Only these two logs are kept by the test; the rest are write-only sinks.
    let proof_requested = empty_log();
    let deliveries = empty_log();
    let callbacks = MockCallbacks {
        announces: empty_log(),
        paths: empty_log(),
        deliveries: Arc::clone(&deliveries),
        iface_ups: empty_log(),
        iface_downs: empty_log(),
        link_established: empty_log(),
        link_closed: empty_log(),
        remote_identified: empty_log(),
        resources_received: empty_log(),
        resource_completed: empty_log(),
        resource_failed: empty_log(),
        channel_messages: empty_log(),
        link_data: empty_log(),
        responses: empty_log(),
        proofs: empty_log(),
        proof_requested: Arc::clone(&proof_requested),
    };

    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xDD; 16];
    let identity = Identity::new(&mut OsRng);
    let prv_key = identity.get_private_key().unwrap();
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    let signer = Identity::from_private_key(&prv_key);
    driver
        .proof_strategies
        .insert(dest, (rns_core::types::ProofStrategy::ProveApp, Some(signer)));

    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, b"app test").unwrap();

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    // Exactly one proof request, and it names the registered destination.
    let requests = proof_requested.lock().unwrap();
    assert_eq!(requests.len(), 1);
    assert_eq!(requests[0].0, DestHash(dest));

    let sent_packets = sent.lock().unwrap();
    let has_proof = sent_packets
        .iter()
        .map(|frame| PacketFlags::unpack(frame[0] & 0x7F).packet_type)
        .any(|kind| kind == constants::PACKET_TYPE_PROOF);
    assert!(has_proof, "ProveApp (callback returns true) should generate a proof");
}
4283
/// An inbound PROOF that names a tracked packet hash must fire the proof
/// callback with the destination, the packet hash and an RTT, and must remove
/// the entry from `driver.sent_packets`.
///
/// No identity is inserted into `known_destinations` here, and the 64
/// signature bytes are a filler pattern.
/// NOTE(review): this implies proofs are accepted unverified when the
/// destination's identity is unknown — confirm against the driver.
#[test]
fn inbound_proof_fires_callback() {
    // Fresh, empty shared log for one callback channel.
    fn empty_log<T>() -> Arc<Mutex<Vec<T>>> {
        Arc::new(Mutex::new(Vec::new()))
    }

    let (tx, rx) = event::channel();
    let proofs = empty_log();
    let callbacks = MockCallbacks {
        announces: empty_log(),
        paths: empty_log(),
        deliveries: empty_log(),
        iface_ups: empty_log(),
        iface_downs: empty_log(),
        link_established: empty_log(),
        link_closed: empty_log(),
        remote_identified: empty_log(),
        resources_received: empty_log(),
        resource_completed: empty_log(),
        resource_failed: empty_log(),
        channel_messages: empty_log(),
        link_data: empty_log(),
        responses: empty_log(),
        proofs: Arc::clone(&proofs),
        proof_requested: empty_log(),
    };

    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Pretend a packet with this hash was sent half a second ago.
    let tracked_hash = [0x42u8; 32];
    let sent_time = time::now() - 0.5;
    driver.sent_packets.insert(tracked_hash, (dest, sent_time));

    // Proof payload: 32-byte packet hash followed by 64 filler signature bytes.
    let mut proof_data = tracked_hash.to_vec();
    proof_data.extend_from_slice(&[0xAA; 64]);

    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_PROOF,
    };
    let proof_packet =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: proof_packet.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let proof_list = proofs.lock().unwrap();
    assert_eq!(proof_list.len(), 1);
    let received = &proof_list[0];
    assert_eq!(received.0, DestHash(dest));
    assert_eq!(received.1, PacketHash(tracked_hash));
    // Only a lower bound is checked, so slow test machines do not cause failures.
    assert!(received.2 >= 0.4, "RTT should be approximately 0.5s, got {}", received.2);

    assert!(!driver.sent_packets.contains_key(&tracked_hash));
}
4355
/// An inbound PROOF whose packet hash was never inserted into
/// `driver.sent_packets` must not fire the proof callback.
#[test]
fn inbound_proof_for_unknown_packet_is_ignored() {
    // Fresh, empty shared log for one callback channel.
    fn empty_log<T>() -> Arc<Mutex<Vec<T>>> {
        Arc::new(Mutex::new(Vec::new()))
    }

    let (tx, rx) = event::channel();
    let proofs = empty_log();
    let callbacks = MockCallbacks {
        announces: empty_log(),
        paths: empty_log(),
        deliveries: empty_log(),
        iface_ups: empty_log(),
        iface_downs: empty_log(),
        link_established: empty_log(),
        link_closed: empty_log(),
        remote_identified: empty_log(),
        resources_received: empty_log(),
        resource_completed: empty_log(),
        resource_failed: empty_log(),
        channel_messages: empty_log(),
        link_data: empty_log(),
        responses: empty_log(),
        proofs: Arc::clone(&proofs),
        proof_requested: empty_log(),
    };

    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Nothing is added to `sent_packets`, so this hash is unknown to the driver.
    let unknown_hash = [0xFFu8; 32];
    let mut proof_data = unknown_hash.to_vec();
    proof_data.extend_from_slice(&[0xAA; 64]);

    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_PROOF,
    };
    let proof_packet =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: proof_packet.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(proofs.lock().unwrap().is_empty());
}
4415
/// When the destination's identity is present in `known_destinations`, a
/// PROOF carrying that identity's signature over the tracked packet hash must
/// fire the proof callback.
#[test]
fn inbound_proof_with_valid_signature_fires_callback() {
    // Fresh, empty shared log for one callback channel.
    fn empty_log<T>() -> Arc<Mutex<Vec<T>>> {
        Arc::new(Mutex::new(Vec::new()))
    }

    let (tx, rx) = event::channel();
    let proofs = empty_log();
    let callbacks = MockCallbacks {
        announces: empty_log(),
        paths: empty_log(),
        deliveries: empty_log(),
        iface_ups: empty_log(),
        iface_downs: empty_log(),
        link_established: empty_log(),
        link_closed: empty_log(),
        remote_identified: empty_log(),
        resources_received: empty_log(),
        resource_completed: empty_log(),
        resource_failed: empty_log(),
        channel_messages: empty_log(),
        link_data: empty_log(),
        responses: empty_log(),
        proofs: Arc::clone(&proofs),
        proof_requested: empty_log(),
    };

    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Make the signer's public key known for this destination.
    let identity = Identity::new(&mut OsRng);
    let pub_key = identity.get_public_key();
    let announced = crate::destination::AnnouncedIdentity {
        dest_hash: DestHash(dest),
        identity_hash: IdentityHash(*identity.hash()),
        public_key: pub_key.unwrap(),
        app_data: None,
        hops: 0,
        received_at: time::now(),
        receiving_interface: InterfaceId(0),
    };
    driver.known_destinations.insert(dest, announced);

    let tracked_hash = [0x42u8; 32];
    let sent_time = time::now() - 0.5;
    driver.sent_packets.insert(tracked_hash, (dest, sent_time));

    // Proof payload: packet hash followed by a real signature over that hash.
    let signature = identity.sign(&tracked_hash).unwrap();
    let mut proof_data = tracked_hash.to_vec();
    proof_data.extend_from_slice(&signature);

    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_PROOF,
    };
    let proof_packet =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: proof_packet.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let proof_list = proofs.lock().unwrap();
    assert_eq!(proof_list.len(), 1);
    let received = &proof_list[0];
    assert_eq!(received.0, DestHash(dest));
    assert_eq!(received.1, PacketHash(tracked_hash));
}
4496
/// When the destination's identity is present in `known_destinations`, a
/// PROOF whose signature bytes are a filler pattern (not a signature made by
/// that identity) must not fire the proof callback.
#[test]
fn inbound_proof_with_invalid_signature_rejected() {
    // Fresh, empty shared log for one callback channel.
    fn empty_log<T>() -> Arc<Mutex<Vec<T>>> {
        Arc::new(Mutex::new(Vec::new()))
    }

    let (tx, rx) = event::channel();
    let proofs = empty_log();
    let callbacks = MockCallbacks {
        announces: empty_log(),
        paths: empty_log(),
        deliveries: empty_log(),
        iface_ups: empty_log(),
        iface_downs: empty_log(),
        link_established: empty_log(),
        link_closed: empty_log(),
        remote_identified: empty_log(),
        resources_received: empty_log(),
        resource_completed: empty_log(),
        resource_failed: empty_log(),
        channel_messages: empty_log(),
        link_data: empty_log(),
        responses: empty_log(),
        proofs: Arc::clone(&proofs),
        proof_requested: empty_log(),
    };

    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Make a public key known for this destination so the proof can be checked against it.
    let identity = Identity::new(&mut OsRng);
    let pub_key = identity.get_public_key();
    let announced = crate::destination::AnnouncedIdentity {
        dest_hash: DestHash(dest),
        identity_hash: IdentityHash(*identity.hash()),
        public_key: pub_key.unwrap(),
        app_data: None,
        hops: 0,
        received_at: time::now(),
        receiving_interface: InterfaceId(0),
    };
    driver.known_destinations.insert(dest, announced);

    let tracked_hash = [0x42u8; 32];
    let sent_time = time::now() - 0.5;
    driver.sent_packets.insert(tracked_hash, (dest, sent_time));

    // Proof payload: the tracked hash followed by 64 bytes that are not a real signature.
    let mut proof_data = tracked_hash.to_vec();
    proof_data.extend_from_slice(&[0xAA; 64]);

    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_PROOF,
    };
    let proof_packet =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: proof_packet.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(proofs.lock().unwrap().is_empty());
}
4574
/// The proof emitted under `ProveAll` must be a 96-byte payload that
/// `rns_core::receipt::validate_proof` accepts for the original packet hash
/// and the destination's identity.
#[test]
fn proof_data_is_valid_explicit_proof() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xDD; 16];
    let identity = Identity::new(&mut OsRng);
    let prv_key = identity.get_private_key().unwrap();
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    let signer = Identity::from_private_key(&prv_key);
    driver
        .proof_strategies
        .insert(dest, (rns_core::types::ProofStrategy::ProveAll, Some(signer)));

    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let data_packet =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, b"verify me").unwrap();
    // Keep the hash: the raw bytes are moved into the event below.
    let data_packet_hash = data_packet.packet_hash;

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: data_packet.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    // Pick the first outgoing frame whose decoded type is PROOF.
    let sent_packets = sent.lock().unwrap();
    let proof_raw = sent_packets.iter().find(|frame| {
        PacketFlags::unpack(frame[0] & 0x7F).packet_type == constants::PACKET_TYPE_PROOF
    });
    assert!(proof_raw.is_some(), "Should have sent a proof");

    let proof_packet = RawPacket::unpack(proof_raw.unwrap()).unwrap();
    assert_eq!(proof_packet.data.len(), 96, "Explicit proof should be 96 bytes");

    // Validate with an identity rebuilt from the same private key.
    let verifier = Identity::from_private_key(&prv_key);
    let result =
        rns_core::receipt::validate_proof(&proof_packet.data, &data_packet_hash, &verifier);
    assert_eq!(result, rns_core::receipt::ProofResult::Valid);
}
4634
/// With nothing registered by the test, a `LocalDestinations` query returns
/// exactly two entries, each of type PLAIN.
///
/// NOTE(review): presumably these are destinations the driver registers for
/// itself at construction — confirm against `Driver::new`.
#[test]
fn query_local_destinations_empty() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: false,
            identity_hash: None,
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(callbacks),
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::LocalDestinations, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::LocalDestinations(list) => list,
        other => panic!("expected LocalDestinations, got {:?}", other),
    };
    assert_eq!(entries.len(), 2);
    entries.iter().for_each(|entry| {
        assert_eq!(entry.dest_type, rns_core::constants::DESTINATION_PLAIN);
    });
}
4658
/// After a `RegisterDestination` event for a SINGLE destination, the
/// `LocalDestinations` query returns three entries, one of which matches the
/// registered hash and type.
#[test]
fn query_local_destinations_with_registered() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: false,
            identity_hash: None,
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(callbacks),
    );

    let dest_hash = [0xAA; 16];
    let register = Event::RegisterDestination {
        dest_hash,
        dest_type: rns_core::constants::DESTINATION_SINGLE,
    };
    tx.send(register).unwrap();

    // The query is queued after the registration, so it is sent later on the same channel.
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::LocalDestinations, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::LocalDestinations(list) => list,
        other => panic!("expected LocalDestinations, got {:?}", other),
    };
    assert_eq!(entries.len(), 3);
    let found = entries.iter().any(|entry| {
        entry.hash == dest_hash && entry.dest_type == rns_core::constants::DESTINATION_SINGLE
    });
    assert!(found);
}
4687
/// After a `RegisterLinkDestination` event, the `LocalDestinations` query
/// returns three entries, one of which carries the registered hash with type
/// SINGLE.
#[test]
fn query_local_destinations_tracks_link_dest() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: false,
            identity_hash: None,
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(callbacks),
    );

    let dest_hash = [0xBB; 16];
    // The key bytes are fixed filler patterns; the test only inspects the listing.
    let register = Event::RegisterLinkDestination {
        dest_hash,
        sig_prv_bytes: [0x11; 32],
        sig_pub_bytes: [0x22; 32],
        resource_strategy: 0,
    };
    tx.send(register).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::LocalDestinations, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::LocalDestinations(list) => list,
        other => panic!("expected LocalDestinations, got {:?}", other),
    };
    assert_eq!(entries.len(), 3);
    let found = entries.iter().any(|entry| {
        entry.hash == dest_hash && entry.dest_type == rns_core::constants::DESTINATION_SINGLE
    });
    assert!(found);
}
4718
/// A `Links` query against a driver with no link activity returns an empty list.
#[test]
fn query_links_empty() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: false,
            identity_hash: None,
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(callbacks),
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::Links, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::Links(list) => list,
        other => panic!("expected Links, got {:?}", other),
    };
    assert!(entries.is_empty());
}
4738
/// A `Resources` query against a driver with no transfers returns an empty list.
#[test]
fn query_resources_empty() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: false,
            identity_hash: None,
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(callbacks),
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Event::Query(QueryRequest::Resources, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::Resources(list) => list,
        other => panic!("expected Resources, got {:?}", other),
    };
    assert!(entries.is_empty());
}
4758
/// `infer_interface_type` maps each sample interface name to the expected
/// interface type string.
#[test]
fn infer_interface_type_from_name() {
    // (interface name, expected inferred type)
    let cases = [
        ("TCPServerInterface/Client-1234", "TCPServerClientInterface"),
        ("BackboneInterface/5", "BackboneInterface"),
        ("LocalInterface", "LocalServerClientInterface"),
        ("MyAutoGroup:fe80::1", "AutoInterface"),
    ];

    for &(name, expected) in cases.iter() {
        assert_eq!(super::infer_interface_type(name), expected);
    }
}
4778
/// `extract_dest_hash` on an empty slice yields sixteen zero bytes.
#[test]
fn test_extract_dest_hash_empty() {
    let extracted = super::extract_dest_hash(&[]);
    assert_eq!(extracted, [0u8; 16]);
}
4785
/// A `SendProbe` query for a destination that was never made known to the
/// driver must be answered with `SendProbe(None)`.
#[test]
fn send_probe_unknown_dest_returns_none() {
    let (tx, rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    // No identity is injected for this hash before the probe is requested.
    let (reply_tx, reply_rx) = mpsc::channel();
    let probe = QueryRequest::SendProbe {
        dest_hash: [0xAA; 16],
        payload_size: 16,
    };
    tx.send(Event::Query(probe, reply_tx)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let response = reply_rx.recv().unwrap();
    if !matches!(response, QueryResponse::SendProbe(None)) {
        panic!("expected SendProbe(None), got {:?}", response);
    }
}
4819
/// After an identity has been injected for a destination, `SendProbe` returns a
/// packet hash, records it in `sent_packets`, and writes a DATA/SINGLE packet to
/// the interface.
#[test]
fn send_probe_known_dest_returns_packet_hash() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, wire) = MockWriter::new();
    driver
        .interfaces
        .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    // Destination hash of the remote peer's "rnstransport.probe" destination.
    let remote_identity = Identity::new(&mut OsRng);
    let dest_hash = rns_core::destination::destination_hash(
        "rnstransport",
        &["probe"],
        Some(remote_identity.hash()),
    );

    // Queue order matters: the identity is injected first, then the probe is requested.
    let (inject_reply_tx, inject_reply_rx) = mpsc::channel();
    let inject = QueryRequest::InjectIdentity {
        dest_hash,
        identity_hash: *remote_identity.hash(),
        public_key: remote_identity.get_public_key().unwrap(),
        app_data: None,
        hops: 1,
        received_at: 0.0,
    };
    event_tx.send(Event::Query(inject, inject_reply_tx)).unwrap();

    let (probe_reply_tx, probe_reply_rx) = mpsc::channel();
    let probe = QueryRequest::SendProbe {
        dest_hash,
        payload_size: 16,
    };
    event_tx.send(Event::Query(probe, probe_reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    match inject_reply_rx.recv().unwrap() {
        QueryResponse::InjectIdentity(true) => {}
        unexpected => panic!("expected InjectIdentity(true), got {:?}", unexpected),
    }

    match probe_reply_rx.recv().unwrap() {
        QueryResponse::SendProbe(Some((packet_hash, _hops))) => {
            assert_ne!(packet_hash, [0u8; 32]);
            assert!(driver.sent_packets.contains_key(&packet_hash));

            let written = wire.lock().unwrap();
            assert!(!written.is_empty(), "Probe packet should be sent on wire");

            // Inspect the flag byte of the first packet written to the interface.
            let first_packet = &written[0];
            let flags = PacketFlags::unpack(first_packet[0] & 0x7F);
            assert_eq!(flags.packet_type, constants::PACKET_TYPE_DATA);
            assert_eq!(flags.destination_type, constants::DESTINATION_SINGLE);
        }
        unexpected => panic!("expected SendProbe(Some(..)), got {:?}", unexpected),
    }
}
4886
/// `CheckProof` for a packet hash with no recorded proof is answered with `CheckProof(None)`.
#[test]
fn check_proof_not_found_returns_none() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = QueryRequest::CheckProof {
        packet_hash: [0xBB; 32],
    };
    event_tx.send(Event::Query(query, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    match reply_rx.recv().unwrap() {
        QueryResponse::CheckProof(None) => {}
        unexpected => panic!("expected CheckProof(None), got {:?}", unexpected),
    }
}
4910
/// `CheckProof` returns the stored RTT for a completed proof and the entry is
/// gone from `completed_proofs` afterwards.
#[test]
fn check_proof_found_returns_rtt() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    // Seed a completed proof directly: (rtt, timestamp).
    let packet_hash = [0xCC; 32];
    driver
        .completed_proofs
        .insert(packet_hash, (0.123, time::now()));

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = QueryRequest::CheckProof { packet_hash };
    event_tx.send(Event::Query(query, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    match reply_rx.recv().unwrap() {
        QueryResponse::CheckProof(Some(rtt)) => {
            assert!((rtt - 0.123).abs() < 0.001, "RTT should be ~0.123, got {}", rtt);
        }
        unexpected => panic!("expected CheckProof(Some(..)), got {:?}", unexpected),
    }

    // The query must have removed the entry it reported.
    assert!(!driver.completed_proofs.contains_key(&packet_hash));
}
4942
/// End-to-end proof round trip across two drivers.
///
/// Phase 1: a driver with a registered destination and a `ProveAll` strategy
/// receives a DATA packet and is expected to write a PROOF packet to its interface.
/// Phase 2: that proof is fed into a second driver that has the original packet
/// hash in `sent_packets`; the `proofs` callback sink must record exactly one
/// event and `completed_proofs` must contain the hash.
#[test]
fn inbound_proof_populates_completed_proofs() {
    // Builds a `MockCallbacks` whose sinks are all fresh and empty, except for
    // the `proofs` sink, which is supplied by the caller.
    let callbacks_with = |proofs| MockCallbacks {
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
        proofs,
        proof_requested: Arc::new(Mutex::new(Vec::new())),
    };

    // ---- Phase 1: the proving side ----
    let (prover_tx, prover_rx) = event::channel();
    let prover_proofs = Arc::new(Mutex::new(Vec::new()));
    let prover_callbacks = callbacks_with(prover_proofs.clone());

    let mut prover = Driver::new(
        TransportConfig {
            transport_enabled: false,
            identity_hash: None,
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        prover_rx,
        prover_tx.clone(),
        Box::new(prover_callbacks),
    );
    prover.engine.register_interface(make_interface_info(1));
    let (prover_writer, prover_wire) = MockWriter::new();
    prover
        .interfaces
        .insert(InterfaceId(1), make_entry(1, Box::new(prover_writer), true));

    // Register a SINGLE destination and attach a ProveAll strategy with a signing identity.
    let dest = [0xDD; 16];
    let identity = Identity::new(&mut OsRng);
    let prv_key = identity.get_private_key().unwrap();
    prover
        .engine
        .register_destination(dest, constants::DESTINATION_SINGLE);
    prover.proof_strategies.insert(
        dest,
        (
            rns_core::types::ProofStrategy::ProveAll,
            Some(Identity::from_private_key(&prv_key)),
        ),
    );

    // The DATA packet that should trigger the proof.
    let data_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let data_packet = RawPacket::pack(
        data_flags,
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"probe data",
    )
    .unwrap();
    let data_packet_hash = data_packet.packet_hash;

    prover
        .sent_packets
        .insert(data_packet_hash, (dest, time::now()));

    prover_tx
        .send(Event::Frame {
            interface_id: InterfaceId(1),
            data: data_packet.raw,
        })
        .unwrap();
    prover_tx.send(Event::Shutdown).unwrap();
    prover.run();

    // Pick the PROOF packets out of everything the prover wrote to its interface.
    let written = prover_wire.lock().unwrap();
    let proof_packets: Vec<_> = written
        .iter()
        .filter(|frame| {
            PacketFlags::unpack(frame[0] & 0x7F).packet_type == constants::PACKET_TYPE_PROOF
        })
        .collect();
    assert!(!proof_packets.is_empty(), "Should have sent a proof packet");

    let proof_raw = proof_packets[0].clone();
    drop(written);

    // ---- Phase 2: the side that originally sent the packet ----
    let (sender_tx, sender_rx) = event::channel();
    let sender_proofs = Arc::new(Mutex::new(Vec::new()));
    let sender_callbacks = callbacks_with(sender_proofs.clone());
    let mut sender = Driver::new(
        TransportConfig {
            transport_enabled: false,
            identity_hash: None,
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        sender_rx,
        sender_tx.clone(),
        Box::new(sender_callbacks),
    );
    sender.engine.register_interface(make_interface_info(1));
    let (sender_writer, _sender_wire) = MockWriter::new();
    sender
        .interfaces
        .insert(InterfaceId(1), make_entry(1, Box::new(sender_writer), true));

    // The sender must know about the outstanding packet for the proof to be matched.
    sender
        .sent_packets
        .insert(data_packet_hash, (dest, time::now()));

    sender_tx
        .send(Event::Frame {
            interface_id: InterfaceId(1),
            data: proof_raw,
        })
        .unwrap();
    sender_tx.send(Event::Shutdown).unwrap();
    sender.run();

    let proof_events = sender_proofs.lock().unwrap();
    assert_eq!(proof_events.len(), 1, "on_proof callback should fire once");
    assert_eq!(proof_events[0].1.0, data_packet_hash, "proof should match original packet hash");
    assert!(proof_events[0].2 >= 0.0, "RTT should be non-negative");

    assert!(sender.completed_proofs.contains_key(&data_packet_hash),
        "completed_proofs should contain the packet hash");
    let (rtt, _received) = sender.completed_proofs[&data_packet_hash];
    assert!(rtt >= 0.0, "RTT should be non-negative");
}
5079
/// When `probe_responder_hash` is set on the driver, the `InterfaceStats`
/// response reports it in `probe_responder`.
#[test]
fn interface_stats_includes_probe_responder() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: true,
        identity_hash: Some([0x42; 16]),
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (writer, _wire) = MockWriter::new();
    driver
        .interfaces
        .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let responder_hash = [0xEE; 16];
    driver.probe_responder_hash = Some(responder_hash);

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx
        .send(Event::Query(QueryRequest::InterfaceStats, reply_tx))
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    match reply_rx.recv().unwrap() {
        QueryResponse::InterfaceStats(stats) => {
            assert_eq!(stats.probe_responder, Some([0xEE; 16]));
        }
        unexpected => panic!("expected InterfaceStats, got {:?}", unexpected),
    }
}
5108
/// Without a `probe_responder_hash` assigned, `InterfaceStats` reports
/// `probe_responder` as `None`.
#[test]
fn interface_stats_probe_responder_none_when_disabled() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (writer, _wire) = MockWriter::new();
    driver
        .interfaces
        .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx
        .send(Event::Query(QueryRequest::InterfaceStats, reply_tx))
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    match reply_rx.recv().unwrap() {
        QueryResponse::InterfaceStats(stats) => {
            assert_eq!(stats.probe_responder, None);
        }
        unexpected => panic!("expected InterfaceStats, got {:?}", unexpected),
    }
}
5134
/// A three-byte buffer is too short to hold a hash, so the all-zero hash is returned.
#[test]
fn test_extract_dest_hash_too_short() {
    let truncated = [0x00, 0x00, 0xAA];
    let extracted = super::extract_dest_hash(&truncated);
    assert_eq!(extracted, [0u8; 16]);
}
5140
/// With a first byte of 0x00, the hash is the 16 bytes that directly follow
/// the two leading bytes.
#[test]
fn test_extract_dest_hash_header1() {
    let leading: [u8; 2] = [0x00, 0x00];
    let dest = [0x11u8; 16];
    let trailing = [0xFFu8; 10];

    // Layout: 2 leading bytes | 16-byte hash | 10 bytes of filler.
    let packet = [&leading[..], &dest[..], &trailing[..]].concat();

    assert_eq!(super::extract_dest_hash(&packet), dest);
}
5150
/// With a first byte of 0x40, the hash is read after an additional 16-byte
/// field that follows the two leading bytes.
#[test]
fn test_extract_dest_hash_header2() {
    let leading: [u8; 2] = [0x40, 0x00];
    let skipped_field = [0xAAu8; 16]; // presumably the transport ID — confirm against extract_dest_hash
    let dest = [0x22u8; 16];
    let trailing = [0xFFu8; 10];

    // Layout: 2 leading bytes | 16 skipped bytes | 16-byte hash | 10 bytes of filler.
    let packet = [&leading[..], &skipped_field[..], &dest[..], &trailing[..]].concat();

    assert_eq!(super::extract_dest_hash(&packet), dest);
}
5161
/// With a first byte of 0x40 but only 16 bytes after the two leading bytes,
/// there is no room for the hash and the all-zero hash is returned.
#[test]
fn test_extract_dest_hash_header2_too_short() {
    let leading: [u8; 2] = [0x40, 0x00];
    let only_field = [0xAAu8; 16];

    // Layout: 2 leading bytes | 16 bytes — and nothing after them.
    let packet = [&leading[..], &only_field[..]].concat();

    assert_eq!(super::extract_dest_hash(&packet), [0u8; 16]);
}
5169
/// An announce frame arriving on interface 1 produces exactly one
/// `known_destinations` entry whose `receiving_interface` is `InterfaceId(1)`.
#[test]
fn announce_stores_receiving_interface_in_known_destinations() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, _wire) = MockWriter::new();
    driver
        .interfaces
        .insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let announcer = Identity::new(&mut OsRng);
    let announce_frame = build_announce_packet(&announcer);

    event_tx
        .send(Event::Frame {
            interface_id: InterfaceId(1),
            data: announce_frame,
        })
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(driver.known_destinations.len(), 1);
    let announced = driver.known_destinations.values().next().unwrap();
    assert_eq!(announced.receiving_interface, InterfaceId(1),
        "receiving_interface should match the interface the announce arrived on");
}
5200
/// With interfaces 1 and 2 both registered, an announce arriving on interface 2
/// is recorded with `receiving_interface == InterfaceId(2)`.
#[test]
fn announce_on_different_interfaces_stores_correct_id() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    for iface in [1, 2] {
        driver.engine.register_interface(make_interface_info(iface));
        let (writer, _) = MockWriter::new();
        driver
            .interfaces
            .insert(InterfaceId(iface), make_entry(iface, Box::new(writer), true));
    }

    let announcer = Identity::new(&mut OsRng);
    let announce_frame = build_announce_packet(&announcer);

    // Deliver on the second interface, not the first.
    event_tx
        .send(Event::Frame {
            interface_id: InterfaceId(2),
            data: announce_frame,
        })
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(driver.known_destinations.len(), 1);
    let announced = driver.known_destinations.values().next().unwrap();
    assert_eq!(announced.receiving_interface, InterfaceId(2));
}
5231
/// An `InjectIdentity` query caches the identity under its destination hash with
/// the sentinel `InterfaceId(0)` and keeps every supplied field unchanged.
#[test]
fn inject_identity_stores_sentinel_interface() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let identity = Identity::new(&mut OsRng);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));

    let (reply_tx, reply_rx) = mpsc::channel();
    let inject = QueryRequest::InjectIdentity {
        dest_hash,
        identity_hash: *identity.hash(),
        public_key: identity.get_public_key().unwrap(),
        app_data: Some(b"restored".to_vec()),
        hops: 2,
        received_at: 99.0,
    };
    event_tx.send(Event::Query(inject, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    match reply_rx.recv().unwrap() {
        QueryResponse::InjectIdentity(true) => {}
        unexpected => panic!("expected InjectIdentity(true), got {:?}", unexpected),
    }

    let cached = driver
        .known_destinations
        .get(&dest_hash)
        .expect("identity should be cached");
    assert_eq!(cached.receiving_interface, InterfaceId(0),
        "injected identity should have sentinel InterfaceId(0)");
    assert_eq!(cached.dest_hash.0, dest_hash);
    assert_eq!(cached.identity_hash.0, *identity.hash());
    assert_eq!(cached.public_key, identity.get_public_key().unwrap());
    assert_eq!(cached.app_data, Some(b"restored".to_vec()));
    assert_eq!(cached.hops, 2);
    assert_eq!(cached.received_at, 99.0);
}
5277
/// Injecting the same destination twice leaves only the data from the second
/// injection in `known_destinations`.
#[test]
fn inject_identity_overwrites_previous_entry() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: false,
        identity_hash: None,
        prefer_shorter_path: false,
        max_paths_per_destination: 1,
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let identity = Identity::new(&mut OsRng);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(identity.hash()));

    // Queues one InjectIdentity query for `dest_hash` and hands back the reply receiver.
    let queue_inject = |app_data: &[u8], hops, received_at| {
        let (reply_tx, reply_rx) = mpsc::channel();
        let request = QueryRequest::InjectIdentity {
            dest_hash,
            identity_hash: *identity.hash(),
            public_key: identity.get_public_key().unwrap(),
            app_data: Some(app_data.to_vec()),
            hops,
            received_at,
        };
        event_tx.send(Event::Query(request, reply_tx)).unwrap();
        reply_rx
    };

    let first_reply = queue_inject(b"first", 1, 10.0);
    let second_reply = queue_inject(b"second", 3, 20.0);

    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(matches!(first_reply.recv().unwrap(), QueryResponse::InjectIdentity(true)));
    assert!(matches!(second_reply.recv().unwrap(), QueryResponse::InjectIdentity(true)));

    // Only the values from the second injection may remain.
    let cached = driver.known_destinations.get(&dest_hash).unwrap();
    assert_eq!(cached.app_data, Some(b"second".to_vec()));
    assert_eq!(cached.hops, 3);
    assert_eq!(cached.received_at, 20.0);
}
5329
/// Two announces on two different interfaces are each stored with the interface
/// they arrived on.
///
/// Despite the name, this does not re-announce a single identity: it sends one
/// announce from `identity` on interface 1 and one from a distinct `identity2`
/// on interface 2, then checks that `known_destinations` holds two entries, one
/// per interface.
#[test]
fn re_announce_updates_receiving_interface() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig {
            transport_enabled: false,
            identity_hash: None,
            prefer_shorter_path: false,
            max_paths_per_destination: 1,
        },
        rx,
        tx.clone(),
        Box::new(cbs),
    );
    for id in [1, 2] {
        driver.engine.register_interface(make_interface_info(id));
        let (writer, _) = MockWriter::new();
        driver
            .interfaces
            .insert(InterfaceId(id), make_entry(id, Box::new(writer), true));
    }

    let identity = Identity::new(&mut OsRng);
    let announce_raw = build_announce_packet(&identity);

    // `announce_raw` is not used again, so it is moved into the event rather than cloned.
    tx.send(Event::Frame {
        interface_id: InterfaceId(1),
        data: announce_raw,
    })
    .unwrap();

    let identity2 = Identity::new(&mut OsRng);
    let announce_raw2 = build_announce_packet(&identity2);
    tx.send(Event::Frame {
        interface_id: InterfaceId(2),
        data: announce_raw2,
    })
    .unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(driver.known_destinations.len(), 2);

    // Only the values are inspected, so iterate them directly instead of (key, value) pairs.
    for announced in driver.known_destinations.values() {
        assert!(
            announced.receiving_interface == InterfaceId(1)
                || announced.receiving_interface == InterfaceId(2)
        );
    }

    let ifaces: Vec<_> = driver
        .known_destinations
        .values()
        .map(|a| a.receiving_interface)
        .collect();
    assert!(ifaces.contains(&InterfaceId(1)));
    assert!(ifaces.contains(&InterfaceId(2)));
}
5373
/// Setting other bits in the first byte does not change where the hash is read from:
/// 0x3F (bit 0x40 clear) reads right after the two leading bytes, while 0xFF
/// (bit 0x40 set) reads after one additional 16-byte field.
#[test]
fn test_extract_dest_hash_other_flags_preserved() {
    let filler = [0xFFu8; 10];

    // Case 1: every low bit set, bit 0x40 clear.
    let leading_a: [u8; 2] = [0x3F, 0x00];
    let dest_a = [0x33u8; 16];
    let packet_a = [&leading_a[..], &dest_a[..], &filler[..]].concat();
    assert_eq!(super::extract_dest_hash(&packet_a), dest_a);

    // Case 2: every bit set, including 0x40, so a 16-byte field precedes the hash.
    let leading_b: [u8; 2] = [0xFF, 0x00];
    let skipped_field = [0xBBu8; 16];
    let dest_b = [0x44u8; 16];
    let packet_b = [&leading_b[..], &skipped_field[..], &dest_b[..], &filler[..]].concat();
    assert_eq!(super::extract_dest_hash(&packet_b), dest_b);
}
5392}