1use std::collections::HashMap;
4
5use rns_core::packet::RawPacket;
6use rns_core::transport::tables::PathEntry;
7use rns_core::transport::types::{InterfaceId, TransportAction, TransportConfig};
8use rns_core::transport::TransportEngine;
9use rns_crypto::{OsRng, Rng};
10
11#[cfg(feature = "rns-hooks")]
12use rns_hooks::{
13 create_hook_slots, EngineAccess, HookContext, HookManager, HookPoint, HookSlot,
14};
15
16use crate::event::{
17 BlackholeInfo, Event, EventReceiver, InterfaceStatsResponse,
18 LocalDestinationEntry, NextHopResponse, PathTableEntry, QueryRequest, QueryResponse,
19 RateTableEntry, SingleInterfaceStat,
20};
21use crate::ifac;
22use crate::interface::{InterfaceEntry, InterfaceStats};
23use crate::link_manager::{LinkManager, LinkManagerAction};
24use crate::holepunch::orchestrator::{HolePunchManager, HolePunchManagerAction};
25use crate::time;
26
/// Read-only view of driver state handed to hook programs.
///
/// Bundles shared borrows of the transport engine, the interface table and the
/// link manager, plus the timestamp captured for the current hook run.
#[cfg(feature = "rns-hooks")]
struct EngineRef<'a> {
    /// Transport engine answering path, blackhole and identity queries.
    engine: &'a TransportEngine,
    /// Driver-side interface table, keyed by interface id.
    interfaces: &'a HashMap<InterfaceId, InterfaceEntry>,
    /// Link manager answering link-state queries.
    link_manager: &'a LinkManager,
    /// Timestamp forwarded to the time-dependent `is_blackholed` query.
    now: f64,
}

/// [`EngineAccess`] implementation that forwards every query to the borrowed
/// engine, interface table or link manager without mutating any of them.
#[cfg(feature = "rns-hooks")]
impl<'a> EngineAccess for EngineRef<'a> {
    /// Forwards to the engine's `has_path`.
    fn has_path(&self, dest: &[u8; 16]) -> bool {
        self.engine.has_path(dest)
    }

    /// Forwards to the engine's `hops_to`.
    fn hops_to(&self, dest: &[u8; 16]) -> Option<u8> {
        self.engine.hops_to(dest)
    }

    /// Forwards to the engine's `next_hop`.
    fn next_hop(&self, dest: &[u8; 16]) -> Option<[u8; 16]> {
        self.engine.next_hop(dest)
    }

    /// Asks the engine whether `identity` is blackholed as of `self.now`.
    fn is_blackholed(&self, identity: &[u8; 16]) -> bool {
        self.engine.is_blackholed(identity, self.now)
    }

    /// Name of the interface with the given raw id, or `None` if the id is
    /// not present in the driver's interface table.
    fn interface_name(&self, id: u64) -> Option<String> {
        let entry = self.interfaces.get(&InterfaceId(id))?;
        Some(entry.info.name.clone())
    }

    /// Mode byte of the interface with the given raw id, or `None` if the id
    /// is not present in the driver's interface table.
    fn interface_mode(&self, id: u64) -> Option<u8> {
        let entry = self.interfaces.get(&InterfaceId(id))?;
        Some(entry.info.mode)
    }

    /// Copy of the engine's identity hash, if it has one.
    fn identity_hash(&self) -> Option<[u8; 16]> {
        self.engine.identity_hash().copied()
    }

    /// Outgoing announce frequency of the interface multiplied by 1000 and
    /// cast to `i32`; `None` for an unknown interface id.
    fn announce_rate(&self, id: u64) -> Option<i32> {
        let entry = self.interfaces.get(&InterfaceId(id))?;
        let scaled = entry.stats.outgoing_announce_freq() * 1000.0;
        Some(scaled as i32)
    }

    /// Numeric code for the state of a link, or `None` if the link manager
    /// does not know the link.
    ///
    /// Codes: Pending = 0, Handshake = 1, Active = 2, Stale = 3, Closed = 4.
    fn link_state(&self, link_hash: &[u8; 16]) -> Option<u8> {
        use rns_core::link::types::LinkState;

        let state = self.link_manager.link_state(link_hash)?;
        let code = match state {
            LinkState::Pending => 0,
            LinkState::Handshake => 1,
            LinkState::Active => 2,
            LinkState::Stale => 3,
            LinkState::Closed => 4,
        };
        Some(code)
    }
}
79
/// Pulls the 16-byte destination hash out of a raw packet.
///
/// When bit `0x40` of the first byte is set (the "header 2" layout) the hash
/// is read from offset 18, otherwise from offset 2. An empty packet, or one
/// too short to contain the full hash at that offset, yields an all-zero hash.
#[cfg(any(test, feature = "rns-hooks"))]
fn extract_dest_hash(raw: &[u8]) -> [u8; 16] {
    const HEADER_2_FLAG: u8 = 0x40;
    const HASH_LEN: usize = 16;

    let offset = match raw.first() {
        None => return [0u8; HASH_LEN],
        Some(&flags) if flags & HEADER_2_FLAG != 0 => 18,
        Some(_) => 2,
    };

    let mut dest = [0u8; HASH_LEN];
    // `get` returns `None` when the packet ends before the hash does, in which
    // case the zero-filled array is returned unchanged.
    if let Some(hash) = raw.get(offset..offset + HASH_LEN) {
        dest.copy_from_slice(hash);
    }
    dest
}
98
/// Runs the hook chain for one attach point.
///
/// Returns `None` without calling into the hook manager when there are no
/// programs attached or when no hook manager is available; otherwise returns
/// whatever `HookManager::run_chain` produces.
///
/// This is a free function taking the individual pieces, not `&mut Driver`,
/// so callers can lend disjoint driver fields at the same time.
#[cfg(feature = "rns-hooks")]
fn run_hook_inner(
    programs: &mut [rns_hooks::LoadedProgram],
    hook_manager: &Option<HookManager>,
    engine_access: &dyn EngineAccess,
    ctx: &HookContext,
    now: f64,
) -> Option<rns_hooks::ExecuteResult> {
    match hook_manager {
        Some(manager) if !programs.is_empty() => {
            manager.run_chain(programs, ctx, engine_access, now)
        }
        _ => None,
    }
}
114
/// Translates hook-injected wire actions into engine [`TransportAction`]s.
///
/// Each `ActionWire` variant maps onto the `TransportAction` variant of the
/// same name. Raw interface ids are wrapped in [`InterfaceId`]; an
/// `exclude` / `has_exclude` pair becomes `Option<InterfaceId>` (`None` when
/// `has_exclude` is zero); the integer `to_local` becomes a `bool`.
/// The output keeps the order of the input.
#[cfg(feature = "rns-hooks")]
fn convert_injected_actions(actions: Vec<rns_hooks::ActionWire>) -> Vec<TransportAction> {
    use rns_hooks::ActionWire;

    let mut converted = Vec::with_capacity(actions.len());
    for wire in actions {
        let action = match wire {
            ActionWire::SendOnInterface { interface, raw } => TransportAction::SendOnInterface {
                interface: InterfaceId(interface),
                raw,
            },
            ActionWire::BroadcastOnAllInterfaces { raw, exclude, has_exclude } => {
                TransportAction::BroadcastOnAllInterfaces {
                    raw,
                    exclude: (has_exclude != 0).then(|| InterfaceId(exclude)),
                }
            }
            ActionWire::DeliverLocal {
                destination_hash,
                raw,
                packet_hash,
                receiving_interface,
            } => TransportAction::DeliverLocal {
                destination_hash,
                raw,
                packet_hash,
                receiving_interface: InterfaceId(receiving_interface),
            },
            ActionWire::PathUpdated { destination_hash, hops, next_hop, interface } => {
                TransportAction::PathUpdated {
                    destination_hash,
                    hops,
                    next_hop,
                    interface: InterfaceId(interface),
                }
            }
            ActionWire::CacheAnnounce { packet_hash, raw } => {
                TransportAction::CacheAnnounce { packet_hash, raw }
            }
            ActionWire::TunnelEstablished { tunnel_id, interface } => {
                TransportAction::TunnelEstablished {
                    tunnel_id,
                    interface: InterfaceId(interface),
                }
            }
            ActionWire::TunnelSynthesize { interface, data, dest_hash } => {
                TransportAction::TunnelSynthesize {
                    interface: InterfaceId(interface),
                    data,
                    dest_hash,
                }
            }
            ActionWire::ForwardToLocalClients { raw, exclude, has_exclude } => {
                TransportAction::ForwardToLocalClients {
                    raw,
                    exclude: (has_exclude != 0).then(|| InterfaceId(exclude)),
                }
            }
            ActionWire::ForwardPlainBroadcast { raw, to_local, exclude, has_exclude } => {
                TransportAction::ForwardPlainBroadcast {
                    raw,
                    to_local: to_local != 0,
                    exclude: (has_exclude != 0).then(|| InterfaceId(exclude)),
                }
            }
            ActionWire::AnnounceReceived {
                destination_hash,
                identity_hash,
                public_key,
                name_hash,
                random_hash,
                app_data,
                hops,
                receiving_interface,
            } => TransportAction::AnnounceReceived {
                destination_hash,
                identity_hash,
                public_key,
                name_hash,
                random_hash,
                app_data,
                hops,
                receiving_interface: InterfaceId(receiving_interface),
            },
        };
        converted.push(action);
    }
    converted
}
199
/// Derives the interface-type label for a dynamically registered interface
/// from its name.
///
/// The name is matched against a fixed list of prefixes, first match wins;
/// names matching none of them are labelled `"AutoInterface"`.
fn infer_interface_type(name: &str) -> String {
    // (name prefix, reported interface type), checked in order.
    const PREFIX_TO_TYPE: [(&str, &str); 3] = [
        ("TCPServerInterface", "TCPServerClientInterface"),
        ("BackboneInterface", "BackboneInterface"),
        ("LocalInterface", "LocalServerClientInterface"),
    ];

    PREFIX_TO_TYPE
        .iter()
        .find(|&&(prefix, _)| name.starts_with(prefix))
        .map_or("AutoInterface", |&(_, kind)| kind)
        .to_string()
}
216
/// Application-facing callbacks owned by the [`Driver`] as a `Box<dyn Callbacks>`.
///
/// Implementors must be `Send`. `on_announce`, `on_path_updated` and
/// `on_local_delivery` have no default body and must be implemented. Every
/// other method has a default: either a no-op or a fixed boolean answer.
///
/// NOTE(review): except for `on_interface_up` / `on_interface_down`, the call
/// sites are outside this part of the file; the "when" descriptions below are
/// inferred from the method names and should be confirmed against the
/// dispatch code.
pub trait Callbacks: Send {
    /// Receives an announced identity. Required.
    fn on_announce(
        &mut self,
        announced: crate::destination::AnnouncedIdentity,
    );

    /// Receives a destination hash and hop count; presumably reported when the
    /// path to that destination changes. Required.
    fn on_path_updated(&mut self, dest_hash: rns_core::types::DestHash, hops: u8);

    /// Receives the raw bytes and packet hash of a packet for `dest_hash`;
    /// presumably a packet addressed to a locally registered destination.
    /// Required.
    fn on_local_delivery(&mut self, dest_hash: rns_core::types::DestHash, raw: Vec<u8>, packet_hash: rns_core::types::PacketHash);

    /// Called by `Driver::run` while handling `Event::InterfaceUp`, after a
    /// dynamic interface was registered or a known interface was marked
    /// online. Default: no-op.
    fn on_interface_up(&mut self, _id: InterfaceId) {}

    /// Called by `Driver::run` while handling `Event::InterfaceDown` for a
    /// known interface, after it was removed (dynamic) or marked offline.
    /// Default: no-op.
    fn on_interface_down(&mut self, _id: InterfaceId) {}

    /// Link-established notification carrying the link id, destination hash,
    /// an RTT value and whether this side initiated. Default: no-op.
    fn on_link_established(&mut self, _link_id: rns_core::types::LinkId, _dest_hash: rns_core::types::DestHash, _rtt: f64, _is_initiator: bool) {}

    /// Link-closed notification with an optional teardown reason.
    /// Default: no-op.
    fn on_link_closed(&mut self, _link_id: rns_core::types::LinkId, _reason: Option<rns_core::link::TeardownReason>) {}

    /// Notification carrying the identity hash and 64-byte public key that a
    /// remote peer presented on a link. Default: no-op.
    fn on_remote_identified(&mut self, _link_id: rns_core::types::LinkId, _identity_hash: rns_core::types::IdentityHash, _public_key: [u8; 64]) {}

    /// Receives the data and optional metadata of a resource on a link.
    /// Default: no-op.
    fn on_resource_received(&mut self, _link_id: rns_core::types::LinkId, _data: Vec<u8>, _metadata: Option<Vec<u8>>) {}

    /// Resource-completed notification for a link. Default: no-op.
    fn on_resource_completed(&mut self, _link_id: rns_core::types::LinkId) {}

    /// Resource-failed notification with an error description.
    /// Default: no-op.
    fn on_resource_failed(&mut self, _link_id: rns_core::types::LinkId, _error: String) {}

    /// Resource progress notification (`_received` out of `_total`).
    /// Default: no-op.
    fn on_resource_progress(&mut self, _link_id: rns_core::types::LinkId, _received: usize, _total: usize) {}

    /// Asks whether an offered resource should be accepted.
    /// Default: returns `false`.
    fn on_resource_accept_query(&mut self, _link_id: rns_core::types::LinkId, _resource_hash: Vec<u8>, _transfer_size: u64, _has_metadata: bool) -> bool {
        false
    }

    /// Receives a channel message (type and payload) on a link.
    /// Default: no-op.
    fn on_channel_message(&mut self, _link_id: rns_core::types::LinkId, _msgtype: u16, _payload: Vec<u8>) {}

    /// Receives link data together with its context byte. Default: no-op.
    fn on_link_data(&mut self, _link_id: rns_core::types::LinkId, _context: u8, _data: Vec<u8>) {}

    /// Receives response data for a 16-byte request id on a link.
    /// Default: no-op.
    fn on_response(&mut self, _link_id: rns_core::types::LinkId, _request_id: [u8; 16], _data: Vec<u8>) {}

    /// Proof notification for a packet hash, with an RTT value.
    /// Default: no-op.
    fn on_proof(&mut self, _dest_hash: rns_core::types::DestHash, _packet_hash: rns_core::types::PacketHash, _rtt: f64) {}

    /// Asks whether a proof should be produced for the given packet.
    /// Default: returns `true`.
    fn on_proof_requested(&mut self, _dest_hash: rns_core::types::DestHash, _packet_hash: rns_core::types::PacketHash) -> bool {
        true
    }

    /// Asks whether a proposed direct connection on a link should be accepted;
    /// the peer identity hash may be absent. Default: returns `false`.
    fn on_direct_connect_proposed(&mut self, _link_id: rns_core::types::LinkId, _peer_identity: Option<rns_core::types::IdentityHash>) -> bool {
        false
    }

    /// Direct-connection-established notification naming the interface used.
    /// Default: no-op.
    fn on_direct_connect_established(&mut self, _link_id: rns_core::types::LinkId, _interface_id: InterfaceId) {}

    /// Direct-connection-failed notification with a numeric reason code.
    /// Default: no-op.
    fn on_direct_connect_failed(&mut self, _link_id: rns_core::types::LinkId, _reason: u8) {}
}
295
/// Event-loop state: owns the transport engine, the interface table and all
/// per-node bookkeeping, and is driven by events received on `rx`.
pub struct Driver {
    /// Transport engine built from the `TransportConfig` passed to `new`.
    pub(crate) engine: TransportEngine,
    /// Driver-side interface table (info, writer, stats, IFAC state), keyed by id.
    pub(crate) interfaces: HashMap<InterfaceId, InterfaceEntry>,
    /// Random source handed to engine and link-manager calls.
    pub(crate) rng: OsRng,
    /// Receiving end of the event channel; `run` blocks on it.
    pub(crate) rx: EventReceiver,
    /// Application callbacks.
    pub(crate) callbacks: Box<dyn Callbacks>,
    /// `time::now()` at construction; the interface-stats query reports
    /// uptime relative to it.
    pub(crate) started: f64,
    /// Optional announce cache; `None` after `new`.
    pub(crate) announce_cache: Option<crate::announce_cache::AnnounceCache>,
    /// Destination hash for app `"rnstransport"`, aspects `["tunnel", "synthesize"]`.
    pub(crate) tunnel_synth_dest: [u8; 16],
    /// Optional transport identity; `None` after `new`.
    pub(crate) transport_identity: Option<rns_crypto::identity::Identity>,
    /// Link state machine; ticked on every `Event::Tick`.
    pub(crate) link_manager: LinkManager,
    /// Management configuration; starts at its `Default`.
    pub(crate) management_config: crate::management::ManagementConfig,
    /// Starts at `0.0`. Presumably the time of the last management announce — TODO confirm.
    pub(crate) last_management_announce: f64,
    /// Starts `false`. Presumably set once the first announce went out — TODO confirm.
    pub(crate) initial_announce_sent: bool,
    /// Announced identities keyed by a 16-byte hash; empty after `new`.
    pub(crate) known_destinations: HashMap<[u8; 16], crate::destination::AnnouncedIdentity>,
    /// Destination hash for app `"rnstransport"`, aspects `["path", "request"]`.
    pub(crate) path_request_dest: [u8; 16],
    /// Per-destination proof strategy and optional signing identity, filled by
    /// `Event::RegisterProofStrategy`.
    pub(crate) proof_strategies: HashMap<[u8; 16], (rns_core::types::ProofStrategy, Option<rns_crypto::identity::Identity>)>,
    /// Packet hash -> (destination hash, send time) for outbound DATA packets;
    /// entries older than 60 s are dropped on tick.
    pub(crate) sent_packets: HashMap<[u8; 32], ([u8; 16], f64)>,
    /// Packet hash -> pair of floats; entries whose second value is older than
    /// 120 s are dropped on tick. NOTE(review): first value is presumably the RTT.
    pub(crate) completed_proofs: HashMap<[u8; 32], (f64, f64)>,
    /// Locally registered destinations: hash -> destination type.
    pub(crate) local_destinations: HashMap<[u8; 16], u8>,
    /// Hole-punch state machine; rebuilt by `set_probe_config`.
    pub(crate) holepunch_manager: HolePunchManager,
    /// Sending end of the event channel, as passed to `new`.
    pub(crate) event_tx: crate::event::EventSender,
    /// Storage for discovered interfaces; `new` points it at
    /// `<temp dir>/rns-discovered-interfaces`.
    pub(crate) discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage,
    /// Starts at `crate::discovery::DEFAULT_STAMP_VALUE`.
    pub(crate) discovery_required_value: u8,
    /// Value of `crate::discovery::discovery_name_hash()` at construction.
    pub(crate) discovery_name_hash: [u8; 10],
    /// Optional hash reported as `probe_responder` in the interface-stats query.
    pub(crate) probe_responder_hash: Option<[u8; 16]>,
    /// When `true`, the tick handler periodically cleans up `discovered_interfaces`.
    pub(crate) discover_interfaces: bool,
    /// Optional interface announcer; `None` after `new`.
    pub(crate) interface_announcer: Option<crate::discovery::InterfaceAnnouncer>,
    /// Ticks counted since the last discovery cleanup; cleanup runs at 3600.
    pub(crate) discovery_cleanup_counter: u32,
    /// One hook slot per `HookPoint`, indexed by `HookPoint as usize`.
    #[cfg(feature = "rns-hooks")]
    pub(crate) hook_slots: [HookSlot; HookPoint::COUNT],
    /// Hook manager, or `None` if `HookManager::new()` failed.
    #[cfg(feature = "rns-hooks")]
    pub(crate) hook_manager: Option<HookManager>,
}
355
356impl Driver {
357 pub fn new(
359 config: TransportConfig,
360 rx: EventReceiver,
361 tx: crate::event::EventSender,
362 callbacks: Box<dyn Callbacks>,
363 ) -> Self {
364 let tunnel_synth_dest = rns_core::destination::destination_hash(
365 "rnstransport",
366 &["tunnel", "synthesize"],
367 None,
368 );
369 let path_request_dest = rns_core::destination::destination_hash(
370 "rnstransport",
371 &["path", "request"],
372 None,
373 );
374 let discovery_name_hash = crate::discovery::discovery_name_hash();
375 let mut engine = TransportEngine::new(config);
376 engine.register_destination(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
377 engine.register_destination(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
379 let mut local_destinations = HashMap::new();
382 local_destinations.insert(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
383 local_destinations.insert(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
384 Driver {
385 engine,
386 interfaces: HashMap::new(),
387 rng: OsRng,
388 rx,
389 callbacks,
390 started: time::now(),
391 announce_cache: None,
392 tunnel_synth_dest,
393 transport_identity: None,
394 link_manager: LinkManager::new(),
395 management_config: Default::default(),
396 last_management_announce: 0.0,
397 initial_announce_sent: false,
398 known_destinations: HashMap::new(),
399 path_request_dest,
400 proof_strategies: HashMap::new(),
401 sent_packets: HashMap::new(),
402 completed_proofs: HashMap::new(),
403 local_destinations,
404 holepunch_manager: HolePunchManager::new(None, None),
405 event_tx: tx,
406 discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage::new(
407 std::env::temp_dir().join("rns-discovered-interfaces")
408 ),
409 discovery_required_value: crate::discovery::DEFAULT_STAMP_VALUE,
410 discovery_name_hash,
411 probe_responder_hash: None,
412 discover_interfaces: false,
413 interface_announcer: None,
414 discovery_cleanup_counter: 0,
415 #[cfg(feature = "rns-hooks")]
416 hook_slots: create_hook_slots(),
417 #[cfg(feature = "rns-hooks")]
418 hook_manager: HookManager::new().ok(),
419 }
420 }
421
422 pub fn set_probe_config(&mut self, addr: Option<std::net::SocketAddr>, device: Option<String>) {
424 self.holepunch_manager = HolePunchManager::new(addr, device);
425 }
426
427 pub fn run(&mut self) {
429 loop {
430 let event = match self.rx.recv() {
431 Ok(e) => e,
432 Err(_) => break, };
434
435 match event {
436 Event::Frame { interface_id, data } => {
437 if data.len() > 2 && (data[0] & 0x03) == 0x01 {
439 log::debug!("Frame: announce from iface {} (len={}, flags=0x{:02x})",
440 interface_id.0, data.len(), data[0]);
441 }
442 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
444 entry.stats.rxb += data.len() as u64;
445 entry.stats.rx_packets += 1;
446 }
447
448 let packet = if let Some(entry) = self.interfaces.get(&interface_id) {
450 if let Some(ref ifac_state) = entry.ifac {
451 match ifac::unmask_inbound(&data, ifac_state) {
453 Some(unmasked) => unmasked,
454 None => {
455 log::debug!("[{}] IFAC rejected packet", interface_id.0);
456 continue;
457 }
458 }
459 } else {
460 if data.len() > 2 && data[0] & 0x80 == 0x80 {
462 log::debug!("[{}] dropping packet with IFAC flag on non-IFAC interface", interface_id.0);
463 continue;
464 }
465 data
466 }
467 } else {
468 data
469 };
470
471 #[cfg(feature = "rns-hooks")]
473 {
474 let pkt_ctx = rns_hooks::PacketContext {
475 flags: if packet.is_empty() { 0 } else { packet[0] },
476 hops: if packet.len() > 1 { packet[1] } else { 0 },
477 destination_hash: extract_dest_hash(&packet),
478 context: 0,
479 packet_hash: [0; 32],
480 interface_id: interface_id.0,
481 data_offset: 0,
482 data_len: packet.len() as u32,
483 };
484 let ctx = HookContext::Packet { ctx: &pkt_ctx, raw: &packet };
485 let now = time::now();
486 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
487 {
488 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::PreIngress as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
489 if let Some(ref e) = exec {
490 if !e.injected_actions.is_empty() {
491 let extra = convert_injected_actions(e.injected_actions.clone());
492 self.dispatch_all(extra);
493 }
494 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) {
495 continue;
496 }
497 }
498 }
499 }
500
501 if packet.len() > 2 && (packet[0] & 0x03) == 0x01 {
503 let now = time::now();
504 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
505 entry.stats.record_incoming_announce(now);
506 }
507 }
508
509 if let Some(entry) = self.interfaces.get(&interface_id) {
511 self.engine.update_interface_freq(interface_id, entry.stats.incoming_announce_freq());
512 }
513
514 let actions = self.engine.handle_inbound(
515 &packet,
516 interface_id,
517 time::now(),
518 &mut self.rng,
519 );
520
521 #[cfg(feature = "rns-hooks")]
523 {
524 let pkt_ctx2 = rns_hooks::PacketContext {
525 flags: if packet.is_empty() { 0 } else { packet[0] },
526 hops: if packet.len() > 1 { packet[1] } else { 0 },
527 destination_hash: extract_dest_hash(&packet),
528 context: 0,
529 packet_hash: [0; 32],
530 interface_id: interface_id.0,
531 data_offset: 0,
532 data_len: packet.len() as u32,
533 };
534 let ctx = HookContext::Packet { ctx: &pkt_ctx2, raw: &packet };
535 let now = time::now();
536 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
537 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::PreDispatch as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
538 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
539 }
540 }
541
542 self.dispatch_all(actions);
543 }
544 Event::Tick => {
545 #[cfg(feature = "rns-hooks")]
547 {
548 let ctx = HookContext::Tick;
549 let now = time::now();
550 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
551 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::Tick as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
552 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
553 }
554 }
555
556 let now = time::now();
557 for (id, entry) in &self.interfaces {
559 self.engine.update_interface_freq(*id, entry.stats.incoming_announce_freq());
560 }
561 let actions = self.engine.tick(now, &mut self.rng);
562 self.dispatch_all(actions);
563 let link_actions = self.link_manager.tick(&mut self.rng);
565 self.dispatch_link_actions(link_actions);
566 {
568 let tx = self.get_event_sender();
569 let hp_actions = self.holepunch_manager.tick(&tx);
570 self.dispatch_holepunch_actions(hp_actions);
571 }
572 self.tick_management_announces(now);
574 self.sent_packets.retain(|_, (_, sent_time)| now - *sent_time < 60.0);
576 self.completed_proofs.retain(|_, (_, received)| now - *received < 120.0);
578
579 self.tick_discovery_announcer(now);
580
581 if self.discover_interfaces {
583 self.discovery_cleanup_counter += 1;
584 if self.discovery_cleanup_counter >= 3600 {
585 self.discovery_cleanup_counter = 0;
586 if let Ok(removed) = self.discovered_interfaces.cleanup() {
587 if removed > 0 {
588 log::info!("Discovery cleanup: removed {} stale entries", removed);
589 }
590 }
591 }
592 }
593 }
594 Event::InterfaceUp(id, new_writer, info) => {
595 let wants_tunnel;
596 if let Some(mut info) = info {
597 log::info!("[{}] dynamic interface registered", id.0);
599 wants_tunnel = info.wants_tunnel;
600 let iface_type = infer_interface_type(&info.name);
601 info.started = time::now();
603 self.engine.register_interface(info.clone());
604 if let Some(writer) = new_writer {
605 self.interfaces.insert(
606 id,
607 InterfaceEntry {
608 id,
609 info,
610 writer,
611 online: true,
612 dynamic: true,
613 ifac: None,
614 stats: InterfaceStats {
615 started: time::now(),
616 ..Default::default()
617 },
618 interface_type: iface_type,
619 },
620 );
621 }
622 self.callbacks.on_interface_up(id);
623 #[cfg(feature = "rns-hooks")]
624 {
625 let ctx = HookContext::Interface { interface_id: id.0 };
626 let now = time::now();
627 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
628 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceUp as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
629 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
630 }
631 }
632 } else if let Some(entry) = self.interfaces.get_mut(&id) {
633 log::info!("[{}] interface online", id.0);
635 wants_tunnel = entry.info.wants_tunnel;
636 entry.online = true;
637 if let Some(writer) = new_writer {
638 log::info!("[{}] writer refreshed after reconnect", id.0);
639 entry.writer = writer;
640 }
641 self.callbacks.on_interface_up(id);
642 #[cfg(feature = "rns-hooks")]
643 {
644 let ctx = HookContext::Interface { interface_id: id.0 };
645 let now = time::now();
646 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
647 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceUp as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
648 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
649 }
650 }
651 } else {
652 wants_tunnel = false;
653 }
654
655 if wants_tunnel {
657 self.synthesize_tunnel_for_interface(id);
658 }
659 }
660 Event::InterfaceDown(id) => {
661 if let Some(entry) = self.interfaces.get(&id) {
663 if let Some(tunnel_id) = entry.info.tunnel_id {
664 self.engine.void_tunnel_interface(&tunnel_id);
665 }
666 }
667
668 if let Some(entry) = self.interfaces.get(&id) {
669 if entry.dynamic {
670 log::info!("[{}] dynamic interface removed", id.0);
672 self.engine.deregister_interface(id);
673 self.interfaces.remove(&id);
674 } else {
675 log::info!("[{}] interface offline", id.0);
677 self.interfaces.get_mut(&id).unwrap().online = false;
678 }
679 self.callbacks.on_interface_down(id);
680 #[cfg(feature = "rns-hooks")]
681 {
682 let ctx = HookContext::Interface { interface_id: id.0 };
683 let now = time::now();
684 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
685 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceDown as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
686 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
687 }
688 }
689 }
690 }
691 Event::SendOutbound { raw, dest_type, attached_interface } => {
692 match RawPacket::unpack(&raw) {
693 Ok(packet) => {
694 let is_announce = packet.flags.packet_type == rns_core::constants::PACKET_TYPE_ANNOUNCE;
695 if is_announce {
696 log::debug!("SendOutbound: ANNOUNCE for {:02x?} (len={}, dest_type={}, attached={:?})",
697 &packet.destination_hash[..4], raw.len(), dest_type, attached_interface);
698 }
699 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
701 self.sent_packets.insert(
702 packet.packet_hash,
703 (packet.destination_hash, time::now()),
704 );
705 }
706 let actions = self.engine.handle_outbound(
707 &packet,
708 dest_type,
709 attached_interface,
710 time::now(),
711 );
712 if is_announce {
713 log::debug!("SendOutbound: announce routed to {} actions: {:?}",
714 actions.len(),
715 actions.iter().map(|a| match a {
716 TransportAction::SendOnInterface { interface, .. } => format!("SendOn({})", interface.0),
717 TransportAction::BroadcastOnAllInterfaces { .. } => "BroadcastAll".to_string(),
718 _ => "other".to_string(),
719 }).collect::<Vec<_>>());
720 }
721 self.dispatch_all(actions);
722 }
723 Err(e) => {
724 log::warn!("SendOutbound: failed to unpack packet: {:?}", e);
725 }
726 }
727 }
728 Event::RegisterDestination { dest_hash, dest_type } => {
729 self.engine.register_destination(dest_hash, dest_type);
730 self.local_destinations.insert(dest_hash, dest_type);
731 }
732 Event::DeregisterDestination { dest_hash } => {
733 self.engine.deregister_destination(&dest_hash);
734 self.local_destinations.remove(&dest_hash);
735 }
736 Event::Query(request, response_tx) => {
737 let response = self.handle_query_mut(request);
738 let _ = response_tx.send(response);
739 }
740 Event::DeregisterLinkDestination { dest_hash } => {
741 self.link_manager.deregister_link_destination(&dest_hash);
742 }
743 Event::RegisterLinkDestination { dest_hash, sig_prv_bytes, sig_pub_bytes, resource_strategy } => {
744 let sig_prv = rns_crypto::ed25519::Ed25519PrivateKey::from_bytes(&sig_prv_bytes);
745 let strat = match resource_strategy {
746 1 => crate::link_manager::ResourceStrategy::AcceptAll,
747 2 => crate::link_manager::ResourceStrategy::AcceptApp,
748 _ => crate::link_manager::ResourceStrategy::AcceptNone,
749 };
750 self.link_manager.register_link_destination(dest_hash, sig_prv, sig_pub_bytes, strat);
751 self.engine.register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE);
753 self.local_destinations.insert(dest_hash, rns_core::constants::DESTINATION_SINGLE);
754 }
755 Event::RegisterRequestHandler { path, allowed_list, handler } => {
756 self.link_manager.register_request_handler(&path, allowed_list, move |link_id, p, data, remote| {
757 handler(link_id, p, data, remote)
758 });
759 }
760 Event::CreateLink { dest_hash, dest_sig_pub_bytes, response_tx } => {
761 let hops = self.engine.hops_to(&dest_hash).unwrap_or(0);
762 let mtu = self.engine.next_hop_interface(&dest_hash)
763 .and_then(|iface_id| self.interfaces.get(&iface_id))
764 .map(|entry| entry.info.mtu)
765 .unwrap_or(rns_core::constants::MTU as u32);
766 let (link_id, link_actions) = self.link_manager.create_link(
767 &dest_hash, &dest_sig_pub_bytes, hops, mtu, &mut self.rng,
768 );
769 let _ = response_tx.send(link_id);
770 self.dispatch_link_actions(link_actions);
771 }
772 Event::SendRequest { link_id, path, data } => {
773 let link_actions = self.link_manager.send_request(
774 &link_id, &path, &data, &mut self.rng,
775 );
776 self.dispatch_link_actions(link_actions);
777 }
778 Event::IdentifyOnLink { link_id, identity_prv_key } => {
779 let identity = rns_crypto::identity::Identity::from_private_key(&identity_prv_key);
780 let link_actions = self.link_manager.identify(&link_id, &identity, &mut self.rng);
781 self.dispatch_link_actions(link_actions);
782 }
783 Event::TeardownLink { link_id } => {
784 let link_actions = self.link_manager.teardown_link(&link_id);
785 self.dispatch_link_actions(link_actions);
786 }
787 Event::SendResource { link_id, data, metadata } => {
788 let link_actions = self.link_manager.send_resource(
789 &link_id, &data, metadata.as_deref(), &mut self.rng,
790 );
791 self.dispatch_link_actions(link_actions);
792 }
793 Event::SetResourceStrategy { link_id, strategy } => {
794 use crate::link_manager::ResourceStrategy;
795 let strat = match strategy {
796 0 => ResourceStrategy::AcceptNone,
797 1 => ResourceStrategy::AcceptAll,
798 2 => ResourceStrategy::AcceptApp,
799 _ => ResourceStrategy::AcceptNone,
800 };
801 self.link_manager.set_resource_strategy(&link_id, strat);
802 }
803 Event::AcceptResource { link_id, resource_hash, accept } => {
804 let link_actions = self.link_manager.accept_resource(
805 &link_id, &resource_hash, accept, &mut self.rng,
806 );
807 self.dispatch_link_actions(link_actions);
808 }
809 Event::SendChannelMessage { link_id, msgtype, payload } => {
810 let link_actions = self.link_manager.send_channel_message(
811 &link_id, msgtype, &payload, &mut self.rng,
812 );
813 self.dispatch_link_actions(link_actions);
814 }
815 Event::SendOnLink { link_id, data, context } => {
816 let link_actions = self.link_manager.send_on_link(
817 &link_id, &data, context, &mut self.rng,
818 );
819 self.dispatch_link_actions(link_actions);
820 }
821 Event::RequestPath { dest_hash } => {
822 self.handle_request_path(dest_hash);
823 }
824 Event::RegisterProofStrategy { dest_hash, strategy, signing_key } => {
825 let identity = signing_key.map(|key| {
826 rns_crypto::identity::Identity::from_private_key(&key)
827 });
828 self.proof_strategies.insert(dest_hash, (strategy, identity));
829 }
830 Event::ProposeDirectConnect { link_id } => {
831 let derived_key = self.link_manager.get_derived_key(&link_id);
832 if let Some(dk) = derived_key {
833 let tx = self.get_event_sender();
834 let hp_actions = self.holepunch_manager.propose(
835 link_id, &dk, &mut self.rng, &tx,
836 );
837 self.dispatch_holepunch_actions(hp_actions);
838 } else {
839 log::warn!("Cannot propose direct connect: no derived key for link {:02x?}", &link_id[..4]);
840 }
841 }
842 Event::SetDirectConnectPolicy { policy } => {
843 self.holepunch_manager.set_policy(policy);
844 }
845 Event::HolePunchProbeResult { link_id, session_id, observed_addr, socket } => {
846 let hp_actions = self.holepunch_manager.handle_probe_result(
847 link_id, session_id, observed_addr, socket,
848 );
849 self.dispatch_holepunch_actions(hp_actions);
850 }
851 Event::HolePunchProbeFailed { link_id, session_id } => {
852 let hp_actions = self.holepunch_manager.handle_probe_failed(
853 link_id, session_id,
854 );
855 self.dispatch_holepunch_actions(hp_actions);
856 }
857 Event::LoadHook { name, wasm_bytes, attach_point, priority, response_tx } => {
858 #[cfg(feature = "rns-hooks")]
859 {
860 let result = (|| -> Result<(), String> {
861 let point_idx = crate::config::parse_hook_point(&attach_point)
862 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
863 let mgr = self.hook_manager.as_ref()
864 .ok_or_else(|| "hook manager not available".to_string())?;
865 let program = mgr.compile(name.clone(), &wasm_bytes, priority)
866 .map_err(|e| format!("compile error: {}", e))?;
867 self.hook_slots[point_idx].attach(program);
868 log::info!("Loaded hook '{}' at point {} (priority {})", name, attach_point, priority);
869 Ok(())
870 })();
871 let _ = response_tx.send(result);
872 }
873 #[cfg(not(feature = "rns-hooks"))]
874 {
875 let _ = (name, wasm_bytes, attach_point, priority);
876 let _ = response_tx.send(Err("hooks not enabled".to_string()));
877 }
878 }
879 Event::UnloadHook { name, attach_point, response_tx } => {
880 #[cfg(feature = "rns-hooks")]
881 {
882 let result = (|| -> Result<(), String> {
883 let point_idx = crate::config::parse_hook_point(&attach_point)
884 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
885 match self.hook_slots[point_idx].detach(&name) {
886 Some(_) => {
887 log::info!("Unloaded hook '{}' from point {}", name, attach_point);
888 Ok(())
889 }
890 None => Err(format!("hook '{}' not found at point '{}'", name, attach_point)),
891 }
892 })();
893 let _ = response_tx.send(result);
894 }
895 #[cfg(not(feature = "rns-hooks"))]
896 {
897 let _ = (name, attach_point);
898 let _ = response_tx.send(Err("hooks not enabled".to_string()));
899 }
900 }
901 Event::ReloadHook { name, attach_point, wasm_bytes, response_tx } => {
902 #[cfg(feature = "rns-hooks")]
903 {
904 let result = (|| -> Result<(), String> {
905 let point_idx = crate::config::parse_hook_point(&attach_point)
906 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
907 let old = self.hook_slots[point_idx].detach(&name)
908 .ok_or_else(|| format!("hook '{}' not found at point '{}'", name, attach_point))?;
909 let priority = old.priority;
910 let mgr = match self.hook_manager.as_ref() {
911 Some(m) => m,
912 None => {
913 self.hook_slots[point_idx].attach(old);
914 return Err("hook manager not available".to_string());
915 }
916 };
917 match mgr.compile(name.clone(), &wasm_bytes, priority) {
918 Ok(program) => {
919 self.hook_slots[point_idx].attach(program);
920 log::info!("Reloaded hook '{}' at point {} (priority {})", name, attach_point, priority);
921 Ok(())
922 }
923 Err(e) => {
924 self.hook_slots[point_idx].attach(old);
925 Err(format!("compile error: {}", e))
926 }
927 }
928 })();
929 let _ = response_tx.send(result);
930 }
931 #[cfg(not(feature = "rns-hooks"))]
932 {
933 let _ = (name, attach_point, wasm_bytes);
934 let _ = response_tx.send(Err("hooks not enabled".to_string()));
935 }
936 }
937 Event::ListHooks { response_tx } => {
938 #[cfg(feature = "rns-hooks")]
939 {
940 let hook_point_names = [
941 "PreIngress", "PreDispatch", "AnnounceReceived", "PathUpdated",
942 "AnnounceRetransmit", "LinkRequestReceived", "LinkEstablished",
943 "LinkClosed", "InterfaceUp", "InterfaceDown", "InterfaceConfigChanged",
944 "SendOnInterface", "BroadcastOnAllInterfaces", "DeliverLocal",
945 "TunnelSynthesize", "Tick",
946 ];
947 let mut infos = Vec::new();
948 for (idx, slot) in self.hook_slots.iter().enumerate() {
949 let point_name = hook_point_names.get(idx).unwrap_or(&"Unknown");
950 for prog in &slot.programs {
951 infos.push(crate::event::HookInfo {
952 name: prog.name.clone(),
953 attach_point: point_name.to_string(),
954 priority: prog.priority,
955 enabled: prog.enabled,
956 consecutive_traps: prog.consecutive_traps,
957 });
958 }
959 }
960 let _ = response_tx.send(infos);
961 }
962 #[cfg(not(feature = "rns-hooks"))]
963 {
964 let _ = response_tx.send(Vec::new());
965 }
966 }
967 Event::InterfaceConfigChanged(id) => {
968 #[cfg(feature = "rns-hooks")]
969 {
970 let ctx = HookContext::Interface { interface_id: id.0 };
971 let now = time::now();
972 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
973 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceConfigChanged as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
974 if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
975 }
976 }
977 #[cfg(not(feature = "rns-hooks"))]
978 let _ = id;
979 }
980 Event::Shutdown => break,
981 }
982 }
983 }
984
985 fn handle_query(&self, request: QueryRequest) -> QueryResponse {
987 match request {
988 QueryRequest::InterfaceStats => {
989 let mut interfaces = Vec::new();
990 let mut total_rxb: u64 = 0;
991 let mut total_txb: u64 = 0;
992 for entry in self.interfaces.values() {
993 total_rxb += entry.stats.rxb;
994 total_txb += entry.stats.txb;
995 interfaces.push(SingleInterfaceStat {
996 name: entry.info.name.clone(),
997 status: entry.online,
998 mode: entry.info.mode,
999 rxb: entry.stats.rxb,
1000 txb: entry.stats.txb,
1001 rx_packets: entry.stats.rx_packets,
1002 tx_packets: entry.stats.tx_packets,
1003 bitrate: entry.info.bitrate,
1004 ifac_size: entry.ifac.as_ref().map(|s| s.size),
1005 started: entry.stats.started,
1006 ia_freq: entry.stats.incoming_announce_freq(),
1007 oa_freq: entry.stats.outgoing_announce_freq(),
1008 interface_type: entry.interface_type.clone(),
1009 });
1010 }
1011 interfaces.sort_by(|a, b| a.name.cmp(&b.name));
1013 QueryResponse::InterfaceStats(InterfaceStatsResponse {
1014 interfaces,
1015 transport_id: self.engine.identity_hash().copied(),
1016 transport_enabled: self.engine.transport_enabled(),
1017 transport_uptime: time::now() - self.started,
1018 total_rxb,
1019 total_txb,
1020 probe_responder: self.probe_responder_hash,
1021 })
1022 }
1023 QueryRequest::PathTable { max_hops } => {
1024 let entries: Vec<PathTableEntry> = self
1025 .engine
1026 .path_table_entries()
1027 .filter(|(_, entry)| {
1028 max_hops.map_or(true, |max| entry.hops <= max)
1029 })
1030 .map(|(hash, entry)| {
1031 let iface_name = self.interfaces.get(&entry.receiving_interface)
1032 .map(|e| e.info.name.clone())
1033 .or_else(|| self.engine.interface_info(&entry.receiving_interface)
1034 .map(|i| i.name.clone()))
1035 .unwrap_or_default();
1036 PathTableEntry {
1037 hash: *hash,
1038 timestamp: entry.timestamp,
1039 via: entry.next_hop,
1040 hops: entry.hops,
1041 expires: entry.expires,
1042 interface: entry.receiving_interface,
1043 interface_name: iface_name,
1044 }
1045 })
1046 .collect();
1047 QueryResponse::PathTable(entries)
1048 }
1049 QueryRequest::RateTable => {
1050 let entries: Vec<RateTableEntry> = self
1051 .engine
1052 .rate_limiter()
1053 .entries()
1054 .map(|(hash, entry)| RateTableEntry {
1055 hash: *hash,
1056 last: entry.last,
1057 rate_violations: entry.rate_violations,
1058 blocked_until: entry.blocked_until,
1059 timestamps: entry.timestamps.clone(),
1060 })
1061 .collect();
1062 QueryResponse::RateTable(entries)
1063 }
1064 QueryRequest::NextHop { dest_hash } => {
1065 let resp = self.engine.next_hop(&dest_hash).map(|next_hop| {
1066 NextHopResponse {
1067 next_hop,
1068 hops: self.engine.hops_to(&dest_hash).unwrap_or(0),
1069 interface: self.engine.next_hop_interface(&dest_hash).unwrap_or(InterfaceId(0)),
1070 }
1071 });
1072 QueryResponse::NextHop(resp)
1073 }
1074 QueryRequest::NextHopIfName { dest_hash } => {
1075 let name = self
1076 .engine
1077 .next_hop_interface(&dest_hash)
1078 .and_then(|id| self.interfaces.get(&id))
1079 .map(|entry| entry.info.name.clone());
1080 QueryResponse::NextHopIfName(name)
1081 }
1082 QueryRequest::LinkCount => {
1083 QueryResponse::LinkCount(self.engine.link_table_count() + self.link_manager.link_count())
1084 }
1085 QueryRequest::DropPath { .. } => {
1086 QueryResponse::DropPath(false)
1088 }
1089 QueryRequest::DropAllVia { .. } => {
1090 QueryResponse::DropAllVia(0)
1091 }
1092 QueryRequest::DropAnnounceQueues => {
1093 QueryResponse::DropAnnounceQueues
1094 }
1095 QueryRequest::TransportIdentity => {
1096 QueryResponse::TransportIdentity(self.engine.identity_hash().copied())
1097 }
1098 QueryRequest::GetBlackholed => {
1099 let now = time::now();
1100 let entries: Vec<BlackholeInfo> = self.engine.blackholed_entries()
1101 .filter(|(_, e)| e.expires == 0.0 || e.expires > now)
1102 .map(|(hash, entry)| BlackholeInfo {
1103 identity_hash: *hash,
1104 created: entry.created,
1105 expires: entry.expires,
1106 reason: entry.reason.clone(),
1107 })
1108 .collect();
1109 QueryResponse::Blackholed(entries)
1110 }
1111 QueryRequest::BlackholeIdentity { .. }
1112 | QueryRequest::UnblackholeIdentity { .. } => {
1113 QueryResponse::BlackholeResult(false)
1115 }
1116 QueryRequest::InjectPath { .. } => {
1117 QueryResponse::InjectPath(false)
1119 }
1120 QueryRequest::InjectIdentity { .. } => {
1121 QueryResponse::InjectIdentity(false)
1123 }
1124 QueryRequest::HasPath { dest_hash } => {
1125 QueryResponse::HasPath(self.engine.has_path(&dest_hash))
1126 }
1127 QueryRequest::HopsTo { dest_hash } => {
1128 QueryResponse::HopsTo(self.engine.hops_to(&dest_hash))
1129 }
1130 QueryRequest::RecallIdentity { dest_hash } => {
1131 QueryResponse::RecallIdentity(self.known_destinations.get(&dest_hash).cloned())
1132 }
1133 QueryRequest::LocalDestinations => {
1134 let entries: Vec<LocalDestinationEntry> = self
1135 .local_destinations
1136 .iter()
1137 .map(|(hash, dest_type)| LocalDestinationEntry {
1138 hash: *hash,
1139 dest_type: *dest_type,
1140 })
1141 .collect();
1142 QueryResponse::LocalDestinations(entries)
1143 }
1144 QueryRequest::Links => {
1145 QueryResponse::Links(self.link_manager.link_entries())
1146 }
1147 QueryRequest::Resources => {
1148 QueryResponse::Resources(self.link_manager.resource_entries())
1149 }
1150 QueryRequest::DiscoveredInterfaces { only_available, only_transport } => {
1151 let mut interfaces = self.discovered_interfaces.list().unwrap_or_default();
1152 crate::discovery::filter_and_sort_interfaces(
1153 &mut interfaces, only_available, only_transport,
1154 );
1155 QueryResponse::DiscoveredInterfaces(interfaces)
1156 }
1157 QueryRequest::SendProbe { .. } => QueryResponse::SendProbe(None),
1159 QueryRequest::CheckProof { .. } => QueryResponse::CheckProof(None),
1160 }
1161 }
1162
1163 fn handle_query_mut(&mut self, request: QueryRequest) -> QueryResponse {
1165 match request {
1166 QueryRequest::BlackholeIdentity { identity_hash, duration_hours, reason } => {
1167 let now = time::now();
1168 self.engine.blackhole_identity(identity_hash, now, duration_hours, reason);
1169 QueryResponse::BlackholeResult(true)
1170 }
1171 QueryRequest::UnblackholeIdentity { identity_hash } => {
1172 let result = self.engine.unblackhole_identity(&identity_hash);
1173 QueryResponse::UnblackholeResult(result)
1174 }
1175 QueryRequest::DropPath { dest_hash } => {
1176 QueryResponse::DropPath(self.engine.drop_path(&dest_hash))
1177 }
1178 QueryRequest::DropAllVia { transport_hash } => {
1179 QueryResponse::DropAllVia(self.engine.drop_all_via(&transport_hash))
1180 }
1181 QueryRequest::DropAnnounceQueues => {
1182 self.engine.drop_announce_queues();
1183 QueryResponse::DropAnnounceQueues
1184 }
1185 QueryRequest::InjectPath {
1186 dest_hash,
1187 next_hop,
1188 hops,
1189 expires,
1190 interface_name,
1191 packet_hash,
1192 } => {
1193 let iface_id = self
1195 .interfaces
1196 .iter()
1197 .find(|(_, entry)| entry.info.name == interface_name)
1198 .map(|(id, _)| *id);
1199 match iface_id {
1200 Some(id) => {
1201 let entry = PathEntry {
1202 timestamp: time::now(),
1203 next_hop,
1204 hops,
1205 expires,
1206 random_blobs: Vec::new(),
1207 receiving_interface: id,
1208 packet_hash,
1209 announce_raw: None,
1210 };
1211 self.engine.inject_path(dest_hash, entry);
1212 QueryResponse::InjectPath(true)
1213 }
1214 None => QueryResponse::InjectPath(false),
1215 }
1216 }
1217 QueryRequest::InjectIdentity {
1218 dest_hash,
1219 identity_hash,
1220 public_key,
1221 app_data,
1222 hops,
1223 received_at,
1224 } => {
1225 self.known_destinations.insert(
1226 dest_hash,
1227 crate::destination::AnnouncedIdentity {
1228 dest_hash: rns_core::types::DestHash(dest_hash),
1229 identity_hash: rns_core::types::IdentityHash(identity_hash),
1230 public_key,
1231 app_data,
1232 hops,
1233 received_at,
1234 },
1235 );
1236 QueryResponse::InjectIdentity(true)
1237 }
1238 QueryRequest::SendProbe { dest_hash, payload_size } => {
1239 let announced = self.known_destinations.get(&dest_hash).cloned();
1241 match announced {
1242 Some(recalled) => {
1243 let remote_id = rns_crypto::identity::Identity::from_public_key(&recalled.public_key);
1245 let mut payload = vec![0u8; payload_size];
1246 self.rng.fill_bytes(&mut payload);
1247 match remote_id.encrypt(&payload, &mut self.rng) {
1248 Ok(ciphertext) => {
1249 let flags = rns_core::packet::PacketFlags {
1251 header_type: rns_core::constants::HEADER_1,
1252 context_flag: rns_core::constants::FLAG_UNSET,
1253 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1254 destination_type: rns_core::constants::DESTINATION_SINGLE,
1255 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1256 };
1257 match RawPacket::pack(
1258 flags, 0, &dest_hash, None,
1259 rns_core::constants::CONTEXT_NONE, &ciphertext,
1260 ) {
1261 Ok(packet) => {
1262 let packet_hash = packet.packet_hash;
1263 let hops = self.engine.hops_to(&dest_hash).unwrap_or(0);
1264 self.sent_packets.insert(
1266 packet_hash,
1267 (dest_hash, time::now()),
1268 );
1269 let actions = self.engine.handle_outbound(
1271 &packet,
1272 rns_core::constants::DESTINATION_SINGLE,
1273 None,
1274 time::now(),
1275 );
1276 self.dispatch_all(actions);
1277 log::debug!(
1278 "Sent probe ({} bytes) to {:02x?}",
1279 payload_size, &dest_hash[..4],
1280 );
1281 QueryResponse::SendProbe(Some((packet_hash, hops)))
1282 }
1283 Err(_) => {
1284 log::warn!("Failed to pack probe packet");
1285 QueryResponse::SendProbe(None)
1286 }
1287 }
1288 }
1289 Err(_) => {
1290 log::warn!("Failed to encrypt probe payload");
1291 QueryResponse::SendProbe(None)
1292 }
1293 }
1294 }
1295 None => {
1296 log::debug!("No known identity for probe dest {:02x?}", &dest_hash[..4]);
1297 QueryResponse::SendProbe(None)
1298 }
1299 }
1300 }
1301 QueryRequest::CheckProof { packet_hash } => {
1302 match self.completed_proofs.remove(&packet_hash) {
1303 Some((rtt, _received)) => QueryResponse::CheckProof(Some(rtt)),
1304 None => QueryResponse::CheckProof(None),
1305 }
1306 }
1307 other => self.handle_query(other),
1308 }
1309 }
1310
1311 fn handle_tunnel_synth_delivery(&mut self, raw: &[u8]) {
1313 let packet = match RawPacket::unpack(raw) {
1315 Ok(p) => p,
1316 Err(_) => return,
1317 };
1318
1319 match rns_core::transport::tunnel::validate_tunnel_synthesize_data(&packet.data) {
1320 Ok(validated) => {
1321 let iface_id = self
1324 .interfaces
1325 .iter()
1326 .find(|(_, entry)| entry.info.wants_tunnel && entry.online)
1327 .map(|(id, _)| *id);
1328
1329 if let Some(iface) = iface_id {
1330 let now = time::now();
1331 let tunnel_actions =
1332 self.engine.handle_tunnel(validated.tunnel_id, iface, now);
1333 self.dispatch_all(tunnel_actions);
1334 }
1335 }
1336 Err(e) => {
1337 log::debug!("Tunnel synthesis validation failed: {}", e);
1338 }
1339 }
1340 }
1341
1342 fn synthesize_tunnel_for_interface(&mut self, interface: InterfaceId) {
1346 if let Some(ref identity) = self.transport_identity {
1347 let actions = self.engine.synthesize_tunnel(identity, interface, &mut self.rng);
1348 self.dispatch_all(actions);
1349 }
1350 }
1351
1352 fn handle_request_path(&mut self, dest_hash: [u8; 16]) {
1354 let mut data = Vec::with_capacity(48);
1356 data.extend_from_slice(&dest_hash);
1357
1358 if self.engine.transport_enabled() {
1359 if let Some(id_hash) = self.engine.identity_hash() {
1360 data.extend_from_slice(id_hash);
1361 }
1362 }
1363
1364 let mut tag = [0u8; 16];
1366 self.rng.fill_bytes(&mut tag);
1367 data.extend_from_slice(&tag);
1368
1369 let flags = rns_core::packet::PacketFlags {
1371 header_type: rns_core::constants::HEADER_1,
1372 context_flag: rns_core::constants::FLAG_UNSET,
1373 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1374 destination_type: rns_core::constants::DESTINATION_PLAIN,
1375 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1376 };
1377
1378 if let Ok(packet) = RawPacket::pack(
1379 flags, 0, &self.path_request_dest, None,
1380 rns_core::constants::CONTEXT_NONE, &data,
1381 ) {
1382 let actions = self.engine.handle_outbound(
1383 &packet,
1384 rns_core::constants::DESTINATION_PLAIN,
1385 None,
1386 time::now(),
1387 );
1388 self.dispatch_all(actions);
1389 }
1390 }
1391
1392 fn maybe_generate_proof(&mut self, dest_hash: [u8; 16], packet_hash: &[u8; 32]) {
1395 use rns_core::types::ProofStrategy;
1396
1397 let (strategy, identity) = match self.proof_strategies.get(&dest_hash) {
1398 Some((s, id)) => (*s, id.as_ref()),
1399 None => return,
1400 };
1401
1402 let should_prove = match strategy {
1403 ProofStrategy::ProveAll => true,
1404 ProofStrategy::ProveApp => {
1405 self.callbacks.on_proof_requested(
1406 rns_core::types::DestHash(dest_hash),
1407 rns_core::types::PacketHash(*packet_hash),
1408 )
1409 }
1410 ProofStrategy::ProveNone => false,
1411 };
1412
1413 if !should_prove {
1414 return;
1415 }
1416
1417 let identity = match identity {
1418 Some(id) => id,
1419 None => {
1420 log::warn!("Cannot generate proof for {:02x?}: no signing key", &dest_hash[..4]);
1421 return;
1422 }
1423 };
1424
1425 let signature = match identity.sign(packet_hash) {
1427 Ok(sig) => sig,
1428 Err(e) => {
1429 log::warn!("Failed to sign proof for {:02x?}: {:?}", &dest_hash[..4], e);
1430 return;
1431 }
1432 };
1433
1434 let mut proof_data = Vec::with_capacity(96);
1436 proof_data.extend_from_slice(packet_hash);
1437 proof_data.extend_from_slice(&signature);
1438
1439 let mut proof_dest = [0u8; 16];
1445 proof_dest.copy_from_slice(&packet_hash[..16]);
1446
1447 let flags = rns_core::packet::PacketFlags {
1448 header_type: rns_core::constants::HEADER_1,
1449 context_flag: rns_core::constants::FLAG_UNSET,
1450 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1451 destination_type: rns_core::constants::DESTINATION_SINGLE,
1452 packet_type: rns_core::constants::PACKET_TYPE_PROOF,
1453 };
1454
1455 if let Ok(packet) = RawPacket::pack(
1456 flags, 0, &proof_dest, None,
1457 rns_core::constants::CONTEXT_NONE, &proof_data,
1458 ) {
1459 let actions = self.engine.handle_outbound(
1460 &packet,
1461 rns_core::constants::DESTINATION_SINGLE,
1462 None,
1463 time::now(),
1464 );
1465 self.dispatch_all(actions);
1466 log::debug!("Generated proof for packet on dest {:02x?}", &dest_hash[..4]);
1467 }
1468 }
1469
1470 fn handle_inbound_proof(&mut self, dest_hash: [u8; 16], proof_data: &[u8], _raw_packet_hash: &[u8; 32]) {
1472 if proof_data.len() < 96 {
1474 log::debug!("Proof too short for explicit proof: {} bytes", proof_data.len());
1475 return;
1476 }
1477
1478 let mut tracked_hash = [0u8; 32];
1479 tracked_hash.copy_from_slice(&proof_data[..32]);
1480
1481 let signature = &proof_data[32..96];
1482
1483 if let Some((tracked_dest, sent_time)) = self.sent_packets.remove(&tracked_hash) {
1485 if let Some(announced) = self.known_destinations.get(&tracked_dest) {
1488 let identity = rns_crypto::identity::Identity::from_public_key(&announced.public_key);
1489 let mut sig = [0u8; 64];
1490 sig.copy_from_slice(signature);
1491 if !identity.verify(&sig, &tracked_hash) {
1492 log::debug!(
1493 "Proof signature invalid for {:02x?}",
1494 &tracked_hash[..4],
1495 );
1496 return;
1497 }
1498 } else {
1499 log::debug!(
1500 "No known identity for dest {:02x?}, accepting proof without signature check",
1501 &tracked_dest[..4],
1502 );
1503 }
1504
1505 let now = time::now();
1506 let rtt = now - sent_time;
1507 log::debug!(
1508 "Proof received for {:02x?} rtt={:.3}s",
1509 &tracked_hash[..4], rtt,
1510 );
1511 self.completed_proofs.insert(tracked_hash, (rtt, now));
1512 self.callbacks.on_proof(
1513 rns_core::types::DestHash(tracked_dest),
1514 rns_core::types::PacketHash(tracked_hash),
1515 rtt,
1516 );
1517 } else {
1518 log::debug!(
1519 "Proof for unknown packet {:02x?} on dest {:02x?}",
1520 &tracked_hash[..4], &dest_hash[..4],
1521 );
1522 }
1523 }
1524
1525 fn dispatch_all(&mut self, actions: Vec<TransportAction>) {
1527 #[cfg(feature = "rns-hooks")]
1528 let mut hook_injected: Vec<TransportAction> = Vec::new();
1529
1530 for action in actions {
1531 match action {
1532 TransportAction::SendOnInterface { interface, raw } => {
1533 #[cfg(feature = "rns-hooks")]
1534 {
1535 let pkt_ctx = rns_hooks::PacketContext {
1536 flags: if raw.is_empty() { 0 } else { raw[0] },
1537 hops: if raw.len() > 1 { raw[1] } else { 0 },
1538 destination_hash: extract_dest_hash(&raw),
1539 context: 0,
1540 packet_hash: [0; 32],
1541 interface_id: interface.0,
1542 data_offset: 0,
1543 data_len: raw.len() as u32,
1544 };
1545 let ctx = HookContext::Packet { ctx: &pkt_ctx, raw: &raw };
1546 let now = time::now();
1547 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1548 {
1549 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::SendOnInterface as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
1550 if let Some(ref e) = exec {
1551 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1552 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
1553 }
1554 }
1555 }
1556 let is_announce = raw.len() > 2 && (raw[0] & 0x03) == 0x01;
1557 if is_announce {
1558 log::debug!("Dispatching announce to interface {} (len={}, online={})",
1559 interface.0, raw.len(),
1560 self.interfaces.get(&interface).map(|e| e.online).unwrap_or(false));
1561 }
1562 if let Some(entry) = self.interfaces.get_mut(&interface) {
1563 if entry.online {
1564 let data = if let Some(ref ifac_state) = entry.ifac {
1565 ifac::mask_outbound(&raw, ifac_state)
1566 } else {
1567 raw
1568 };
1569 entry.stats.txb += data.len() as u64;
1571 entry.stats.tx_packets += 1;
1572 if is_announce {
1573 entry.stats.record_outgoing_announce(time::now());
1574 }
1575 if let Err(e) = entry.writer.send_frame(&data) {
1576 log::warn!("[{}] send failed: {}", entry.info.id.0, e);
1577 } else if is_announce {
1578 let header_type = (data[0] >> 6) & 0x03;
1581 let dest_start = if header_type == 1 { 18usize } else { 2usize };
1582 let dest_preview = if data.len() >= dest_start + 4 {
1583 format!("{:02x?}", &data[dest_start..dest_start+4])
1584 } else {
1585 "??".into()
1586 };
1587 log::debug!("Announce SENT OK on interface {} (len={}, h={}, dest=[{}])",
1588 interface.0, data.len(), header_type, dest_preview);
1589 }
1590 }
1591 }
1592 }
1593 TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1594 #[cfg(feature = "rns-hooks")]
1595 {
1596 let pkt_ctx = rns_hooks::PacketContext {
1597 flags: if raw.is_empty() { 0 } else { raw[0] },
1598 hops: if raw.len() > 1 { raw[1] } else { 0 },
1599 destination_hash: extract_dest_hash(&raw),
1600 context: 0,
1601 packet_hash: [0; 32],
1602 interface_id: 0,
1603 data_offset: 0,
1604 data_len: raw.len() as u32,
1605 };
1606 let ctx = HookContext::Packet { ctx: &pkt_ctx, raw: &raw };
1607 let now = time::now();
1608 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1609 {
1610 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::BroadcastOnAllInterfaces as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
1611 if let Some(ref e) = exec {
1612 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1613 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
1614 }
1615 }
1616 }
1617 let is_announce = raw.len() > 2 && (raw[0] & 0x03) == 0x01;
1618 for entry in self.interfaces.values_mut() {
1619 if entry.online && Some(entry.id) != exclude {
1620 let data = if let Some(ref ifac_state) = entry.ifac {
1621 ifac::mask_outbound(&raw, ifac_state)
1622 } else {
1623 raw.clone()
1624 };
1625 entry.stats.txb += data.len() as u64;
1627 entry.stats.tx_packets += 1;
1628 if is_announce {
1629 entry.stats.record_outgoing_announce(time::now());
1630 }
1631 if let Err(e) = entry.writer.send_frame(&data) {
1632 log::warn!("[{}] broadcast failed: {}", entry.info.id.0, e);
1633 }
1634 }
1635 }
1636 }
1637 TransportAction::DeliverLocal {
1638 destination_hash,
1639 raw,
1640 packet_hash,
1641 receiving_interface,
1642 } => {
1643 #[cfg(feature = "rns-hooks")]
1644 {
1645 let pkt_ctx = rns_hooks::PacketContext {
1646 flags: 0,
1647 hops: 0,
1648 destination_hash,
1649 context: 0,
1650 packet_hash,
1651 interface_id: receiving_interface.0,
1652 data_offset: 0,
1653 data_len: raw.len() as u32,
1654 };
1655 let ctx = HookContext::Packet { ctx: &pkt_ctx, raw: &raw };
1656 let now = time::now();
1657 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1658 {
1659 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::DeliverLocal as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
1660 if let Some(ref e) = exec {
1661 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1662 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
1663 }
1664 }
1665 }
1666 if destination_hash == self.tunnel_synth_dest {
1667 self.handle_tunnel_synth_delivery(&raw);
1669 } else if destination_hash == self.path_request_dest {
1670 if let Ok(packet) = RawPacket::unpack(&raw) {
1672 let actions = self.engine.handle_path_request(
1673 &packet.data,
1674 InterfaceId(0), time::now(),
1676 );
1677 self.dispatch_all(actions);
1678 }
1679 } else if self.link_manager.is_link_destination(&destination_hash) {
1680 let link_actions = self.link_manager.handle_local_delivery(
1682 destination_hash, &raw, packet_hash, receiving_interface, &mut self.rng,
1683 );
1684 if link_actions.is_empty() {
1685 if let Ok(packet) = RawPacket::unpack(&raw) {
1689 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_PROOF {
1690 self.handle_inbound_proof(destination_hash, &packet.data, &packet_hash);
1691 continue;
1692 }
1693 }
1694 self.maybe_generate_proof(destination_hash, &packet_hash);
1695 self.callbacks.on_local_delivery(
1696 rns_core::types::DestHash(destination_hash),
1697 raw,
1698 rns_core::types::PacketHash(packet_hash),
1699 );
1700 } else {
1701 self.dispatch_link_actions(link_actions);
1702 }
1703 } else {
1704 if let Ok(packet) = RawPacket::unpack(&raw) {
1706 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_PROOF {
1707 self.handle_inbound_proof(destination_hash, &packet.data, &packet_hash);
1708 continue;
1709 }
1710 }
1711
1712 self.maybe_generate_proof(destination_hash, &packet_hash);
1714
1715 self.callbacks
1716 .on_local_delivery(
1717 rns_core::types::DestHash(destination_hash),
1718 raw,
1719 rns_core::types::PacketHash(packet_hash),
1720 );
1721 }
1722 }
1723 TransportAction::AnnounceReceived {
1724 destination_hash,
1725 identity_hash,
1726 public_key,
1727 name_hash,
1728 app_data,
1729 hops,
1730 receiving_interface,
1731 ..
1732 } => {
1733 #[cfg(feature = "rns-hooks")]
1734 {
1735 let ctx = HookContext::Announce {
1736 destination_hash,
1737 hops,
1738 interface_id: receiving_interface.0,
1739 };
1740 let now = time::now();
1741 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1742 {
1743 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::AnnounceReceived as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
1744 if let Some(ref e) = exec {
1745 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1746 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
1747 }
1748 }
1749 }
1750 #[cfg(not(feature = "rns-hooks"))]
1752 let _ = receiving_interface;
1753
1754 if name_hash == self.discovery_name_hash {
1758 if self.discover_interfaces {
1759 if let Some(ref app_data) = app_data {
1760 if let Some(mut discovered) = crate::discovery::parse_interface_announce(
1761 app_data,
1762 &identity_hash,
1763 hops,
1764 self.discovery_required_value,
1765 ) {
1766 if let Ok(Some(existing)) = self.discovered_interfaces.load(&discovered.discovery_hash) {
1768 discovered.discovered = existing.discovered;
1769 discovered.heard_count = existing.heard_count + 1;
1770 }
1771 if let Err(e) = self.discovered_interfaces.store(&discovered) {
1772 log::warn!("Failed to store discovered interface: {}", e);
1773 } else {
1774 log::debug!(
1775 "Discovered interface '{}' ({}) at {}:{} [stamp={}]",
1776 discovered.name,
1777 discovered.interface_type,
1778 discovered.reachable_on.as_deref().unwrap_or("?"),
1779 discovered.port.map(|p| p.to_string()).unwrap_or_else(|| "?".into()),
1780 discovered.stamp_value,
1781 );
1782 }
1783 }
1784 }
1785 }
1786 }
1788
1789 let announced = crate::destination::AnnouncedIdentity {
1791 dest_hash: rns_core::types::DestHash(destination_hash),
1792 identity_hash: rns_core::types::IdentityHash(identity_hash),
1793 public_key,
1794 app_data: app_data.clone(),
1795 hops,
1796 received_at: time::now(),
1797 };
1798 self.known_destinations.insert(destination_hash, announced.clone());
1799 self.callbacks.on_announce(announced);
1800 }
1801 TransportAction::PathUpdated {
1802 destination_hash,
1803 hops,
1804 interface,
1805 ..
1806 } => {
1807 #[cfg(feature = "rns-hooks")]
1808 {
1809 let ctx = HookContext::Announce {
1810 destination_hash,
1811 hops,
1812 interface_id: interface.0,
1813 };
1814 let now = time::now();
1815 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1816 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::PathUpdated as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1817 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1818 }
1819 }
1820 #[cfg(not(feature = "rns-hooks"))]
1821 let _ = interface;
1822
1823 self.callbacks.on_path_updated(rns_core::types::DestHash(destination_hash), hops);
1824 }
1825 TransportAction::ForwardToLocalClients { raw, exclude } => {
1826 for entry in self.interfaces.values_mut() {
1827 if entry.online
1828 && entry.info.is_local_client
1829 && Some(entry.id) != exclude
1830 {
1831 let data = if let Some(ref ifac_state) = entry.ifac {
1832 ifac::mask_outbound(&raw, ifac_state)
1833 } else {
1834 raw.clone()
1835 };
1836 entry.stats.txb += data.len() as u64;
1837 entry.stats.tx_packets += 1;
1838 if let Err(e) = entry.writer.send_frame(&data) {
1839 log::warn!("[{}] forward to local client failed: {}", entry.info.id.0, e);
1840 }
1841 }
1842 }
1843 }
1844 TransportAction::ForwardPlainBroadcast { raw, to_local, exclude } => {
1845 for entry in self.interfaces.values_mut() {
1846 if entry.online
1847 && entry.info.is_local_client == to_local
1848 && Some(entry.id) != exclude
1849 {
1850 let data = if let Some(ref ifac_state) = entry.ifac {
1851 ifac::mask_outbound(&raw, ifac_state)
1852 } else {
1853 raw.clone()
1854 };
1855 entry.stats.txb += data.len() as u64;
1856 entry.stats.tx_packets += 1;
1857 if let Err(e) = entry.writer.send_frame(&data) {
1858 log::warn!("[{}] forward plain broadcast failed: {}", entry.info.id.0, e);
1859 }
1860 }
1861 }
1862 }
1863 TransportAction::CacheAnnounce { packet_hash, raw } => {
1864 if let Some(ref cache) = self.announce_cache {
1865 if let Err(e) = cache.store(&packet_hash, &raw, None) {
1866 log::warn!("Failed to cache announce: {}", e);
1867 }
1868 }
1869 }
1870 TransportAction::TunnelSynthesize { interface, data, dest_hash } => {
1871 #[cfg(feature = "rns-hooks")]
1872 {
1873 let pkt_ctx = rns_hooks::PacketContext {
1874 flags: 0,
1875 hops: 0,
1876 destination_hash: dest_hash,
1877 context: 0,
1878 packet_hash: [0; 32],
1879 interface_id: interface.0,
1880 data_offset: 0,
1881 data_len: data.len() as u32,
1882 };
1883 let ctx = HookContext::Packet { ctx: &pkt_ctx, raw: &data };
1884 let now = time::now();
1885 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1886 {
1887 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::TunnelSynthesize as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
1888 if let Some(ref e) = exec {
1889 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1890 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
1891 }
1892 }
1893 }
1894 let flags = rns_core::packet::PacketFlags {
1896 header_type: rns_core::constants::HEADER_1,
1897 context_flag: rns_core::constants::FLAG_UNSET,
1898 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1899 destination_type: rns_core::constants::DESTINATION_PLAIN,
1900 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1901 };
1902 if let Ok(packet) = rns_core::packet::RawPacket::pack(
1903 flags, 0, &dest_hash, None,
1904 rns_core::constants::CONTEXT_NONE, &data,
1905 ) {
1906 if let Some(entry) = self.interfaces.get_mut(&interface) {
1907 if entry.online {
1908 let raw = if let Some(ref ifac_state) = entry.ifac {
1909 ifac::mask_outbound(&packet.raw, ifac_state)
1910 } else {
1911 packet.raw
1912 };
1913 entry.stats.txb += raw.len() as u64;
1914 entry.stats.tx_packets += 1;
1915 if let Err(e) = entry.writer.send_frame(&raw) {
1916 log::warn!("[{}] tunnel synthesize send failed: {}", entry.info.id.0, e);
1917 }
1918 }
1919 }
1920 }
1921 }
1922 TransportAction::TunnelEstablished { tunnel_id, interface } => {
1923 log::info!("Tunnel established: {:02x?} on interface {}", &tunnel_id[..4], interface.0);
1924 }
1925 TransportAction::AnnounceRetransmit { destination_hash, hops, interface } => {
1926 #[cfg(feature = "rns-hooks")]
1927 {
1928 let ctx = HookContext::Announce {
1929 destination_hash,
1930 hops,
1931 interface_id: interface.map(|i| i.0).unwrap_or(0),
1932 };
1933 let now = time::now();
1934 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1935 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::AnnounceRetransmit as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1936 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1937 }
1938 }
1939 #[cfg(not(feature = "rns-hooks"))]
1940 {
1941 let _ = (destination_hash, hops, interface);
1942 }
1943 }
1944 TransportAction::LinkRequestReceived { link_id, destination_hash: _, receiving_interface } => {
1945 #[cfg(feature = "rns-hooks")]
1946 {
1947 let ctx = HookContext::Link { link_id, interface_id: receiving_interface.0 };
1948 let now = time::now();
1949 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1950 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkRequestReceived as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1951 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1952 }
1953 }
1954 #[cfg(not(feature = "rns-hooks"))]
1955 {
1956 let _ = (link_id, receiving_interface);
1957 }
1958 }
1959 TransportAction::LinkEstablished { link_id, interface } => {
1960 #[cfg(feature = "rns-hooks")]
1961 {
1962 let ctx = HookContext::Link { link_id, interface_id: interface.0 };
1963 let now = time::now();
1964 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1965 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkEstablished as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1966 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1967 }
1968 }
1969 #[cfg(not(feature = "rns-hooks"))]
1970 {
1971 let _ = (link_id, interface);
1972 }
1973 }
1974 TransportAction::LinkClosed { link_id } => {
1975 #[cfg(feature = "rns-hooks")]
1976 {
1977 let ctx = HookContext::Link { link_id, interface_id: 0 };
1978 let now = time::now();
1979 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1980 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkClosed as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1981 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1982 }
1983 }
1984 #[cfg(not(feature = "rns-hooks"))]
1985 {
1986 let _ = link_id;
1987 }
1988 }
1989 }
1990 }
1991
1992 #[cfg(feature = "rns-hooks")]
1994 if !hook_injected.is_empty() {
1995 self.dispatch_all(hook_injected);
1996 }
1997 }
1998
1999 fn dispatch_link_actions(&mut self, actions: Vec<LinkManagerAction>) {
2001 #[cfg(feature = "rns-hooks")]
2002 let mut hook_injected: Vec<TransportAction> = Vec::new();
2003
2004 for action in actions {
2005 match action {
2006 LinkManagerAction::SendPacket { raw, dest_type, attached_interface } => {
2007 match RawPacket::unpack(&raw) {
2009 Ok(packet) => {
2010 let transport_actions = self.engine.handle_outbound(
2011 &packet,
2012 dest_type,
2013 attached_interface,
2014 time::now(),
2015 );
2016 self.dispatch_all(transport_actions);
2017 }
2018 Err(e) => {
2019 log::warn!("LinkManager SendPacket: failed to unpack: {:?}", e);
2020 }
2021 }
2022 }
2023 LinkManagerAction::LinkEstablished { link_id, dest_hash, rtt, is_initiator } => {
2024 #[cfg(feature = "rns-hooks")]
2025 {
2026 let ctx = HookContext::Link { link_id, interface_id: 0 };
2027 let now = time::now();
2028 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
2029 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkEstablished as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
2030 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
2031 }
2032 }
2033 log::info!(
2034 "Link established: {:02x?} rtt={:.3}s initiator={}",
2035 &link_id[..4], rtt, is_initiator,
2036 );
2037 self.callbacks.on_link_established(
2038 rns_core::types::LinkId(link_id),
2039 rns_core::types::DestHash(dest_hash),
2040 rtt,
2041 is_initiator,
2042 );
2043 }
2044 LinkManagerAction::LinkClosed { link_id, reason } => {
2045 #[cfg(feature = "rns-hooks")]
2046 {
2047 let ctx = HookContext::Link { link_id, interface_id: 0 };
2048 let now = time::now();
2049 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
2050 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkClosed as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
2051 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
2052 }
2053 }
2054 log::info!("Link closed: {:02x?} reason={:?}", &link_id[..4], reason);
2055 self.holepunch_manager.link_closed(&link_id);
2056 self.callbacks.on_link_closed(rns_core::types::LinkId(link_id), reason);
2057 }
2058 LinkManagerAction::RemoteIdentified { link_id, identity_hash, public_key } => {
2059 log::debug!(
2060 "Remote identified on link {:02x?}: {:02x?}",
2061 &link_id[..4], &identity_hash[..4],
2062 );
2063 self.callbacks.on_remote_identified(
2064 rns_core::types::LinkId(link_id),
2065 rns_core::types::IdentityHash(identity_hash),
2066 public_key,
2067 );
2068 }
2069 LinkManagerAction::RegisterLinkDest { link_id } => {
2070 self.engine.register_destination(link_id, rns_core::constants::DESTINATION_LINK);
2072 }
2073 LinkManagerAction::DeregisterLinkDest { link_id } => {
2074 self.engine.deregister_destination(&link_id);
2075 }
2076 LinkManagerAction::ManagementRequest {
2077 link_id, path_hash, data, request_id, remote_identity,
2078 } => {
2079 self.handle_management_request(
2080 link_id, path_hash, data, request_id, remote_identity,
2081 );
2082 }
2083 LinkManagerAction::ResourceReceived { link_id, data, metadata } => {
2084 self.callbacks.on_resource_received(rns_core::types::LinkId(link_id), data, metadata);
2085 }
2086 LinkManagerAction::ResourceCompleted { link_id } => {
2087 self.callbacks.on_resource_completed(rns_core::types::LinkId(link_id));
2088 }
2089 LinkManagerAction::ResourceFailed { link_id, error } => {
2090 log::debug!("Resource failed on link {:02x?}: {}", &link_id[..4], error);
2091 self.callbacks.on_resource_failed(rns_core::types::LinkId(link_id), error);
2092 }
2093 LinkManagerAction::ResourceProgress { link_id, received, total } => {
2094 self.callbacks.on_resource_progress(rns_core::types::LinkId(link_id), received, total);
2095 }
2096 LinkManagerAction::ResourceAcceptQuery { link_id, resource_hash, transfer_size, has_metadata } => {
2097 let accept = self.callbacks.on_resource_accept_query(
2098 rns_core::types::LinkId(link_id), resource_hash.clone(), transfer_size, has_metadata,
2099 );
2100 let accept_actions = self.link_manager.accept_resource(
2101 &link_id, &resource_hash, accept, &mut self.rng,
2102 );
2103 self.dispatch_link_actions(accept_actions);
2105 }
2106 LinkManagerAction::ChannelMessageReceived { link_id, msgtype, payload } => {
2107 if HolePunchManager::is_holepunch_message(msgtype) {
2109 let derived_key = self.link_manager.get_derived_key(&link_id);
2110 let tx = self.get_event_sender();
2111 let (handled, hp_actions) = self.holepunch_manager.handle_signal(
2112 link_id, msgtype, payload, derived_key.as_deref(), &tx,
2113 );
2114 if handled {
2115 self.dispatch_holepunch_actions(hp_actions);
2116 }
2117 } else {
2118 self.callbacks.on_channel_message(rns_core::types::LinkId(link_id), msgtype, payload);
2119 }
2120 }
2121 LinkManagerAction::LinkDataReceived { link_id, context, data } => {
2122 self.callbacks.on_link_data(rns_core::types::LinkId(link_id), context, data);
2123 }
2124 LinkManagerAction::ResponseReceived { link_id, request_id, data } => {
2125 self.callbacks.on_response(rns_core::types::LinkId(link_id), request_id, data);
2126 }
2127 LinkManagerAction::LinkRequestReceived { link_id, receiving_interface } => {
2128 #[cfg(feature = "rns-hooks")]
2129 {
2130 let ctx = HookContext::Link { link_id, interface_id: receiving_interface.0 };
2131 let now = time::now();
2132 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
2133 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkRequestReceived as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
2134 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
2135 }
2136 }
2137 #[cfg(not(feature = "rns-hooks"))]
2138 { let _ = (link_id, receiving_interface); }
2139 }
2140 }
2141 }
2142
2143 #[cfg(feature = "rns-hooks")]
2145 if !hook_injected.is_empty() {
2146 self.dispatch_all(hook_injected);
2147 }
2148 }
2149
2150 fn dispatch_holepunch_actions(&mut self, actions: Vec<HolePunchManagerAction>) {
2152 for action in actions {
2153 match action {
2154 HolePunchManagerAction::SendChannelMessage { link_id, msgtype, payload } => {
2155 let link_actions = self.link_manager.send_channel_message(
2156 &link_id, msgtype, &payload, &mut self.rng,
2157 );
2158 self.dispatch_link_actions(link_actions);
2159 }
2160 HolePunchManagerAction::DirectConnectEstablished { link_id, session_id, interface_id, rtt, mtu } => {
2161 log::info!(
2162 "Direct connection established for link {:02x?} session {:02x?} iface {} rtt={:.1}ms mtu={}",
2163 &link_id[..4], &session_id[..4], interface_id.0, rtt * 1000.0, mtu
2164 );
2165 self.engine.redirect_path(&link_id, interface_id, time::now());
2167 self.link_manager.set_link_rtt(&link_id, rtt);
2169 self.link_manager.set_link_mtu(&link_id, mtu);
2170 self.link_manager.record_link_inbound(&link_id);
2173 self.link_manager.flush_channel_tx(&link_id);
2175 self.callbacks.on_direct_connect_established(
2176 rns_core::types::LinkId(link_id),
2177 interface_id,
2178 );
2179 }
2180 HolePunchManagerAction::DirectConnectFailed { link_id, session_id, reason } => {
2181 log::debug!(
2182 "Direct connection failed for link {:02x?} session {:02x?} reason={}",
2183 &link_id[..4], &session_id[..4], reason
2184 );
2185 self.callbacks.on_direct_connect_failed(
2186 rns_core::types::LinkId(link_id),
2187 reason,
2188 );
2189 }
2190 }
2191 }
2192 }
2193
2194 fn get_event_sender(&self) -> crate::event::EventSender {
2199 self.event_tx.clone()
2203 }
2204
    /// Minimum time between periodic management announces; compared against
    /// `now - last_management_announce`. Presumably seconds — confirm against `time::now()`.
    const MANAGEMENT_ANNOUNCE_INTERVAL: f64 = 300.0;

    /// Uptime (`now - started`) that must elapse before the first management
    /// announce is emitted. Presumably seconds — confirm against `time::now()`.
    const MANAGEMENT_ANNOUNCE_DELAY: f64 = 5.0;
2210
2211 fn tick_discovery_announcer(&mut self, now: f64) {
2213 let announcer = match self.interface_announcer.as_mut() {
2214 Some(a) => a,
2215 None => return,
2216 };
2217
2218 announcer.maybe_start(now);
2219
2220 let stamp_result = match announcer.poll_ready() {
2221 Some(r) => r,
2222 None => return,
2223 };
2224
2225 let identity = match self.transport_identity.as_ref() {
2226 Some(id) => id,
2227 None => {
2228 log::warn!("Discovery: stamp ready but no transport identity");
2229 return;
2230 }
2231 };
2232
2233 let identity_hash = identity.hash();
2235 let disc_dest = rns_core::destination::destination_hash(
2236 crate::discovery::APP_NAME,
2237 &["discovery", "interface"],
2238 Some(&identity_hash),
2239 );
2240 let name_hash = self.discovery_name_hash;
2241 let mut random_hash = [0u8; 10];
2242 self.rng.fill_bytes(&mut random_hash);
2243
2244 let (announce_data, _) = match rns_core::announce::AnnounceData::pack(
2245 identity, &disc_dest, &name_hash, &random_hash,
2246 None, Some(&stamp_result.app_data),
2247 ) {
2248 Ok(v) => v,
2249 Err(e) => {
2250 log::warn!("Discovery: failed to pack announce: {}", e);
2251 return;
2252 }
2253 };
2254
2255 let flags = rns_core::packet::PacketFlags {
2256 header_type: rns_core::constants::HEADER_1,
2257 context_flag: rns_core::constants::FLAG_UNSET,
2258 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
2259 destination_type: rns_core::constants::DESTINATION_SINGLE,
2260 packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
2261 };
2262
2263 let packet = match RawPacket::pack(
2264 flags, 0, &disc_dest, None,
2265 rns_core::constants::CONTEXT_NONE, &announce_data,
2266 ) {
2267 Ok(p) => p,
2268 Err(e) => {
2269 log::warn!("Discovery: failed to pack packet: {}", e);
2270 return;
2271 }
2272 };
2273
2274 let outbound_actions = self.engine.handle_outbound(
2275 &packet, rns_core::constants::DESTINATION_SINGLE, None, now,
2276 );
2277 log::debug!(
2278 "Discovery announce sent for interface #{} ({} actions, dest={:02x?})",
2279 stamp_result.index, outbound_actions.len(), &disc_dest[..4],
2280 );
2281 self.dispatch_all(outbound_actions);
2282 }
2283
2284 fn tick_management_announces(&mut self, now: f64) {
2286 if self.transport_identity.is_none() {
2287 return;
2288 }
2289
2290 let uptime = now - self.started;
2291
2292 if !self.initial_announce_sent {
2294 if uptime < Self::MANAGEMENT_ANNOUNCE_DELAY {
2295 return;
2296 }
2297 self.initial_announce_sent = true;
2298 self.emit_management_announces(now);
2299 return;
2300 }
2301
2302 if now - self.last_management_announce >= Self::MANAGEMENT_ANNOUNCE_INTERVAL {
2304 self.emit_management_announces(now);
2305 }
2306 }
2307
2308 fn emit_management_announces(&mut self, now: f64) {
2310 use crate::management;
2311
2312 self.last_management_announce = now;
2313
2314 let identity = match self.transport_identity {
2315 Some(ref id) => id,
2316 None => return,
2317 };
2318
2319 let mgmt_raw = if self.management_config.enable_remote_management {
2321 management::build_management_announce(identity, &mut self.rng)
2322 } else {
2323 None
2324 };
2325
2326 let bh_raw = if self.management_config.publish_blackhole {
2327 management::build_blackhole_announce(identity, &mut self.rng)
2328 } else {
2329 None
2330 };
2331
2332 let probe_raw = if self.probe_responder_hash.is_some() {
2333 management::build_probe_announce(identity, &mut self.rng)
2334 } else {
2335 None
2336 };
2337
2338 if let Some(raw) = mgmt_raw {
2339 if let Ok(packet) = RawPacket::unpack(&raw) {
2340 let actions = self.engine.handle_outbound(
2341 &packet,
2342 rns_core::constants::DESTINATION_SINGLE,
2343 None,
2344 now,
2345 );
2346 self.dispatch_all(actions);
2347 log::debug!("Emitted management destination announce");
2348 }
2349 }
2350
2351 if let Some(raw) = bh_raw {
2352 if let Ok(packet) = RawPacket::unpack(&raw) {
2353 let actions = self.engine.handle_outbound(
2354 &packet,
2355 rns_core::constants::DESTINATION_SINGLE,
2356 None,
2357 now,
2358 );
2359 self.dispatch_all(actions);
2360 log::debug!("Emitted blackhole info announce");
2361 }
2362 }
2363
2364 if let Some(raw) = probe_raw {
2365 if let Ok(packet) = RawPacket::unpack(&raw) {
2366 let actions = self.engine.handle_outbound(
2367 &packet,
2368 rns_core::constants::DESTINATION_SINGLE,
2369 None,
2370 now,
2371 );
2372 self.dispatch_all(actions);
2373 log::debug!("Emitted probe responder announce");
2374 }
2375 }
2376 }
2377
2378 fn handle_management_request(
2380 &mut self,
2381 link_id: [u8; 16],
2382 path_hash: [u8; 16],
2383 data: Vec<u8>,
2384 request_id: [u8; 16],
2385 remote_identity: Option<([u8; 16], [u8; 64])>,
2386 ) {
2387 use crate::management;
2388
2389 let is_restricted = path_hash == management::status_path_hash()
2391 || path_hash == management::path_path_hash();
2392
2393 if is_restricted && !self.management_config.remote_management_allowed.is_empty() {
2394 match remote_identity {
2395 Some((identity_hash, _)) => {
2396 if !self.management_config.remote_management_allowed.contains(&identity_hash) {
2397 log::debug!("Management request denied: identity not in allowed list");
2398 return;
2399 }
2400 }
2401 None => {
2402 log::debug!("Management request denied: peer not identified");
2403 return;
2404 }
2405 }
2406 }
2407
2408 let response_data = if path_hash == management::status_path_hash() {
2409 management::handle_status_request(&data, &self.engine, &self.interfaces, self.started, self.probe_responder_hash)
2410 } else if path_hash == management::path_path_hash() {
2411 management::handle_path_request(&data, &self.engine)
2412 } else if path_hash == management::list_path_hash() {
2413 management::handle_blackhole_list_request(&self.engine)
2414 } else {
2415 log::warn!("Unknown management path_hash: {:02x?}", &path_hash[..4]);
2416 None
2417 };
2418
2419 if let Some(response) = response_data {
2420 let actions = self.link_manager.send_management_response(
2421 &link_id, &request_id, &response, &mut self.rng,
2422 );
2423 self.dispatch_link_actions(actions);
2424 }
2425 }
2426}
2427
2428#[cfg(test)]
2429mod tests {
2430 use super::*;
2431 use crate::event;
2432 use crate::interface::Writer;
2433 use rns_core::announce::AnnounceData;
2434 use rns_core::constants;
2435 use rns_core::packet::PacketFlags;
2436 use rns_core::transport::types::InterfaceInfo;
2437 use rns_crypto::identity::Identity;
2438 use std::io;
2439 use std::sync::mpsc;
2440 use std::sync::{Arc, Mutex};
2441
    /// Test double for `Writer` that records frames in memory.
    struct MockWriter {
        // Frames handed to `send_frame`, in call order; shared with the test through the `Arc`.
        sent: Arc<Mutex<Vec<Vec<u8>>>>,
    }
2445
2446 impl MockWriter {
2447 fn new() -> (Self, Arc<Mutex<Vec<Vec<u8>>>>) {
2448 let sent = Arc::new(Mutex::new(Vec::new()));
2449 (MockWriter { sent: sent.clone() }, sent)
2450 }
2451 }
2452
2453 impl Writer for MockWriter {
2454 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
2455 self.sent.lock().unwrap().push(data.to_vec());
2456 Ok(())
2457 }
2458 }
2459
2460 use rns_core::types::{DestHash, IdentityHash, LinkId as TypedLinkId, PacketHash};
2461
    /// Test double for `Callbacks`: each field is a shared log that the
    /// corresponding callback appends to, so tests can inspect what was reported.
    struct MockCallbacks {
        // (destination hash, hops) per `on_announce`.
        announces: Arc<Mutex<Vec<(DestHash, u8)>>>,
        // (destination hash, hops) per `on_path_updated`.
        paths: Arc<Mutex<Vec<(DestHash, u8)>>>,
        // Destination hash per `on_local_delivery`.
        deliveries: Arc<Mutex<Vec<DestHash>>>,
        // Interface ids reported up / down.
        iface_ups: Arc<Mutex<Vec<InterfaceId>>>,
        iface_downs: Arc<Mutex<Vec<InterfaceId>>>,
        // (link id, rtt, is_initiator) per `on_link_established`.
        link_established: Arc<Mutex<Vec<(TypedLinkId, f64, bool)>>>,
        link_closed: Arc<Mutex<Vec<TypedLinkId>>>,
        remote_identified: Arc<Mutex<Vec<(TypedLinkId, IdentityHash)>>>,
        // Resource events; metadata is not recorded.
        resources_received: Arc<Mutex<Vec<(TypedLinkId, Vec<u8>)>>>,
        resource_completed: Arc<Mutex<Vec<TypedLinkId>>>,
        resource_failed: Arc<Mutex<Vec<(TypedLinkId, String)>>>,
        // (link id, message type, payload) per `on_channel_message`.
        channel_messages: Arc<Mutex<Vec<(TypedLinkId, u16, Vec<u8>)>>>,
        // (link id, context, data) per `on_link_data`.
        link_data: Arc<Mutex<Vec<(TypedLinkId, u8, Vec<u8>)>>>,
        // (link id, request id, data) per `on_response`.
        responses: Arc<Mutex<Vec<(TypedLinkId, [u8; 16], Vec<u8>)>>>,
        // (destination hash, packet hash, rtt) per `on_proof`.
        proofs: Arc<Mutex<Vec<(DestHash, PacketHash, f64)>>>,
        // (destination hash, packet hash) per `on_proof_requested`.
        proof_requested: Arc<Mutex<Vec<(DestHash, PacketHash)>>>,
    }
2480
2481 impl MockCallbacks {
2482 fn new() -> (
2483 Self,
2484 Arc<Mutex<Vec<(DestHash, u8)>>>,
2485 Arc<Mutex<Vec<(DestHash, u8)>>>,
2486 Arc<Mutex<Vec<DestHash>>>,
2487 Arc<Mutex<Vec<InterfaceId>>>,
2488 Arc<Mutex<Vec<InterfaceId>>>,
2489 ) {
2490 let announces = Arc::new(Mutex::new(Vec::new()));
2491 let paths = Arc::new(Mutex::new(Vec::new()));
2492 let deliveries = Arc::new(Mutex::new(Vec::new()));
2493 let iface_ups = Arc::new(Mutex::new(Vec::new()));
2494 let iface_downs = Arc::new(Mutex::new(Vec::new()));
2495 (
2496 MockCallbacks {
2497 announces: announces.clone(),
2498 paths: paths.clone(),
2499 deliveries: deliveries.clone(),
2500 iface_ups: iface_ups.clone(),
2501 iface_downs: iface_downs.clone(),
2502 link_established: Arc::new(Mutex::new(Vec::new())),
2503 link_closed: Arc::new(Mutex::new(Vec::new())),
2504 remote_identified: Arc::new(Mutex::new(Vec::new())),
2505 resources_received: Arc::new(Mutex::new(Vec::new())),
2506 resource_completed: Arc::new(Mutex::new(Vec::new())),
2507 resource_failed: Arc::new(Mutex::new(Vec::new())),
2508 channel_messages: Arc::new(Mutex::new(Vec::new())),
2509 link_data: Arc::new(Mutex::new(Vec::new())),
2510 responses: Arc::new(Mutex::new(Vec::new())),
2511 proofs: Arc::new(Mutex::new(Vec::new())),
2512 proof_requested: Arc::new(Mutex::new(Vec::new())),
2513 },
2514 announces,
2515 paths,
2516 deliveries,
2517 iface_ups,
2518 iface_downs,
2519 )
2520 }
2521
2522 fn with_link_tracking() -> (
2523 Self,
2524 Arc<Mutex<Vec<(TypedLinkId, f64, bool)>>>,
2525 Arc<Mutex<Vec<TypedLinkId>>>,
2526 Arc<Mutex<Vec<(TypedLinkId, IdentityHash)>>>,
2527 ) {
2528 let link_established = Arc::new(Mutex::new(Vec::new()));
2529 let link_closed = Arc::new(Mutex::new(Vec::new()));
2530 let remote_identified = Arc::new(Mutex::new(Vec::new()));
2531 (
2532 MockCallbacks {
2533 announces: Arc::new(Mutex::new(Vec::new())),
2534 paths: Arc::new(Mutex::new(Vec::new())),
2535 deliveries: Arc::new(Mutex::new(Vec::new())),
2536 iface_ups: Arc::new(Mutex::new(Vec::new())),
2537 iface_downs: Arc::new(Mutex::new(Vec::new())),
2538 link_established: link_established.clone(),
2539 link_closed: link_closed.clone(),
2540 remote_identified: remote_identified.clone(),
2541 resources_received: Arc::new(Mutex::new(Vec::new())),
2542 resource_completed: Arc::new(Mutex::new(Vec::new())),
2543 resource_failed: Arc::new(Mutex::new(Vec::new())),
2544 channel_messages: Arc::new(Mutex::new(Vec::new())),
2545 link_data: Arc::new(Mutex::new(Vec::new())),
2546 responses: Arc::new(Mutex::new(Vec::new())),
2547 proofs: Arc::new(Mutex::new(Vec::new())),
2548 proof_requested: Arc::new(Mutex::new(Vec::new())),
2549 },
2550 link_established,
2551 link_closed,
2552 remote_identified,
2553 )
2554 }
2555 }
2556
2557 impl Callbacks for MockCallbacks {
2558 fn on_announce(&mut self, announced: crate::destination::AnnouncedIdentity) {
2559 self.announces.lock().unwrap().push((announced.dest_hash, announced.hops));
2560 }
2561
2562 fn on_path_updated(&mut self, dest_hash: DestHash, hops: u8) {
2563 self.paths.lock().unwrap().push((dest_hash, hops));
2564 }
2565
2566 fn on_local_delivery(&mut self, dest_hash: DestHash, _raw: Vec<u8>, _packet_hash: PacketHash) {
2567 self.deliveries.lock().unwrap().push(dest_hash);
2568 }
2569
2570 fn on_interface_up(&mut self, id: InterfaceId) {
2571 self.iface_ups.lock().unwrap().push(id);
2572 }
2573
2574 fn on_interface_down(&mut self, id: InterfaceId) {
2575 self.iface_downs.lock().unwrap().push(id);
2576 }
2577
2578 fn on_link_established(&mut self, link_id: TypedLinkId, _dest_hash: DestHash, rtt: f64, is_initiator: bool) {
2579 self.link_established.lock().unwrap().push((link_id, rtt, is_initiator));
2580 }
2581
2582 fn on_link_closed(&mut self, link_id: TypedLinkId, _reason: Option<rns_core::link::TeardownReason>) {
2583 self.link_closed.lock().unwrap().push(link_id);
2584 }
2585
2586 fn on_remote_identified(&mut self, link_id: TypedLinkId, identity_hash: IdentityHash, _public_key: [u8; 64]) {
2587 self.remote_identified.lock().unwrap().push((link_id, identity_hash));
2588 }
2589
2590 fn on_resource_received(&mut self, link_id: TypedLinkId, data: Vec<u8>, _metadata: Option<Vec<u8>>) {
2591 self.resources_received.lock().unwrap().push((link_id, data));
2592 }
2593
2594 fn on_resource_completed(&mut self, link_id: TypedLinkId) {
2595 self.resource_completed.lock().unwrap().push(link_id);
2596 }
2597
2598 fn on_resource_failed(&mut self, link_id: TypedLinkId, error: String) {
2599 self.resource_failed.lock().unwrap().push((link_id, error));
2600 }
2601
2602 fn on_channel_message(&mut self, link_id: TypedLinkId, msgtype: u16, payload: Vec<u8>) {
2603 self.channel_messages.lock().unwrap().push((link_id, msgtype, payload));
2604 }
2605
2606 fn on_link_data(&mut self, link_id: TypedLinkId, context: u8, data: Vec<u8>) {
2607 self.link_data.lock().unwrap().push((link_id, context, data));
2608 }
2609
2610 fn on_response(&mut self, link_id: TypedLinkId, request_id: [u8; 16], data: Vec<u8>) {
2611 self.responses.lock().unwrap().push((link_id, request_id, data));
2612 }
2613
2614 fn on_proof(&mut self, dest_hash: DestHash, packet_hash: PacketHash, rtt: f64) {
2615 self.proofs.lock().unwrap().push((dest_hash, packet_hash, rtt));
2616 }
2617
2618 fn on_proof_requested(&mut self, dest_hash: DestHash, packet_hash: PacketHash) -> bool {
2619 self.proof_requested.lock().unwrap().push((dest_hash, packet_hash));
2620 true
2621 }
2622 }
2623
2624 fn make_interface_info(id: u64) -> InterfaceInfo {
2625 InterfaceInfo {
2626 id: InterfaceId(id),
2627 name: format!("test-{}", id),
2628 mode: constants::MODE_FULL,
2629 out_capable: true,
2630 in_capable: true,
2631 bitrate: None,
2632 announce_rate_target: None,
2633 announce_rate_grace: 0,
2634 announce_rate_penalty: 0.0,
2635 announce_cap: rns_core::constants::ANNOUNCE_CAP,
2636 is_local_client: false,
2637 wants_tunnel: false,
2638 tunnel_id: None,
2639 mtu: constants::MTU as u32,
2640 ia_freq: 0.0,
2641 started: 0.0,
2642 ingress_control: false,
2643 }
2644 }
2645
2646 fn make_entry(id: u64, writer: Box<dyn Writer>, online: bool) -> InterfaceEntry {
2647 InterfaceEntry {
2648 id: InterfaceId(id),
2649 info: make_interface_info(id),
2650 writer,
2651 online,
2652 dynamic: false,
2653 ifac: None,
2654 stats: InterfaceStats::default(),
2655 interface_type: String::new(),
2656 }
2657 }
2658
2659 fn build_announce_packet(identity: &Identity) -> Vec<u8> {
2661 let dest_hash = rns_core::destination::destination_hash(
2662 "test",
2663 &["app"],
2664 Some(identity.hash()),
2665 );
2666 let name_hash = rns_core::destination::name_hash("test", &["app"]);
2667 let random_hash = [0x42u8; 10];
2668
2669 let (announce_data, _has_ratchet) = AnnounceData::pack(
2670 identity,
2671 &dest_hash,
2672 &name_hash,
2673 &random_hash,
2674 None,
2675 None,
2676 )
2677 .unwrap();
2678
2679 let flags = PacketFlags {
2680 header_type: constants::HEADER_1,
2681 context_flag: constants::FLAG_UNSET,
2682 transport_type: constants::TRANSPORT_BROADCAST,
2683 destination_type: constants::DESTINATION_SINGLE,
2684 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2685 };
2686
2687 let packet = RawPacket::pack(flags, 0, &dest_hash, None, constants::CONTEXT_NONE, &announce_data).unwrap();
2688 packet.raw
2689 }
2690
2691 #[test]
2692 fn process_inbound_frame() {
2693 let (tx, rx) = event::channel();
2694 let (cbs, announces, _, _, _, _) = MockCallbacks::new();
2695 let mut driver = Driver::new(
2696 TransportConfig { transport_enabled: false, identity_hash: None },
2697 rx,
2698 tx.clone(),
2699 Box::new(cbs),
2700 );
2701 let info = make_interface_info(1);
2702 driver.engine.register_interface(info.clone());
2703 let (writer, _sent) = MockWriter::new();
2704 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2705
2706 let identity = Identity::new(&mut OsRng);
2707 let announce_raw = build_announce_packet(&identity);
2708
2709 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
2711 tx.send(Event::Shutdown).unwrap();
2712 driver.run();
2713
2714 assert_eq!(announces.lock().unwrap().len(), 1);
2715 }
2716
2717 #[test]
2718 fn dispatch_send() {
2719 let (tx, rx) = event::channel();
2720 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2721 let mut driver = Driver::new(
2722 TransportConfig { transport_enabled: false, identity_hash: None },
2723 rx,
2724 tx.clone(),
2725 Box::new(cbs),
2726 );
2727 let (writer, sent) = MockWriter::new();
2728 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2729
2730 driver.dispatch_all(vec![TransportAction::SendOnInterface {
2731 interface: InterfaceId(1),
2732 raw: vec![0x01, 0x02, 0x03],
2733 }]);
2734
2735 assert_eq!(sent.lock().unwrap().len(), 1);
2736 assert_eq!(sent.lock().unwrap()[0], vec![0x01, 0x02, 0x03]);
2737
2738 drop(tx);
2739 }
2740
2741 #[test]
2742 fn dispatch_broadcast() {
2743 let (tx, rx) = event::channel();
2744 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2745 let mut driver = Driver::new(
2746 TransportConfig { transport_enabled: false, identity_hash: None },
2747 rx,
2748 tx.clone(),
2749 Box::new(cbs),
2750 );
2751
2752 let (w1, sent1) = MockWriter::new();
2753 let (w2, sent2) = MockWriter::new();
2754 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(w1), true));
2755 driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
2756
2757 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
2758 raw: vec![0xAA],
2759 exclude: None,
2760 }]);
2761
2762 assert_eq!(sent1.lock().unwrap().len(), 1);
2763 assert_eq!(sent2.lock().unwrap().len(), 1);
2764
2765 drop(tx);
2766 }
2767
2768 #[test]
2769 fn dispatch_broadcast_exclude() {
2770 let (tx, rx) = event::channel();
2771 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2772 let mut driver = Driver::new(
2773 TransportConfig { transport_enabled: false, identity_hash: None },
2774 rx,
2775 tx.clone(),
2776 Box::new(cbs),
2777 );
2778
2779 let (w1, sent1) = MockWriter::new();
2780 let (w2, sent2) = MockWriter::new();
2781 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(w1), true));
2782 driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
2783
2784 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
2785 raw: vec![0xBB],
2786 exclude: Some(InterfaceId(1)),
2787 }]);
2788
2789 assert_eq!(sent1.lock().unwrap().len(), 0); assert_eq!(sent2.lock().unwrap().len(), 1);
2791
2792 drop(tx);
2793 }
2794
2795 #[test]
2796 fn tick_event() {
2797 let (tx, rx) = event::channel();
2798 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2799 let mut driver = Driver::new(
2800 TransportConfig { transport_enabled: true, identity_hash: Some([0x42; 16]) },
2801 rx,
2802 tx.clone(),
2803 Box::new(cbs),
2804 );
2805 let info = make_interface_info(1);
2806 driver.engine.register_interface(info.clone());
2807 let (writer, _sent) = MockWriter::new();
2808 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2809
2810 tx.send(Event::Tick).unwrap();
2812 tx.send(Event::Shutdown).unwrap();
2813 driver.run();
2814 }
2816
2817 #[test]
2818 fn shutdown_event() {
2819 let (tx, rx) = event::channel();
2820 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2821 let mut driver = Driver::new(
2822 TransportConfig { transport_enabled: false, identity_hash: None },
2823 rx,
2824 tx.clone(),
2825 Box::new(cbs),
2826 );
2827
2828 tx.send(Event::Shutdown).unwrap();
2829 driver.run(); }
2831
2832 #[test]
2833 fn announce_callback() {
2834 let (tx, rx) = event::channel();
2835 let (cbs, announces, paths, _, _, _) = MockCallbacks::new();
2836 let mut driver = Driver::new(
2837 TransportConfig { transport_enabled: false, identity_hash: None },
2838 rx,
2839 tx.clone(),
2840 Box::new(cbs),
2841 );
2842 let info = make_interface_info(1);
2843 driver.engine.register_interface(info.clone());
2844 let (writer, _sent) = MockWriter::new();
2845 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2846
2847 let identity = Identity::new(&mut OsRng);
2848 let announce_raw = build_announce_packet(&identity);
2849
2850 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
2851 tx.send(Event::Shutdown).unwrap();
2852 driver.run();
2853
2854 let ann = announces.lock().unwrap();
2855 assert_eq!(ann.len(), 1);
2856 assert_eq!(ann[0].1, 1);
2858
2859 let p = paths.lock().unwrap();
2860 assert_eq!(p.len(), 1);
2861 }
2862
2863 #[test]
2864 fn dispatch_skips_offline_interface() {
2865 let (tx, rx) = event::channel();
2866 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2867 let mut driver = Driver::new(
2868 TransportConfig { transport_enabled: false, identity_hash: None },
2869 rx,
2870 tx.clone(),
2871 Box::new(cbs),
2872 );
2873
2874 let (w1, sent1) = MockWriter::new();
2875 let (w2, sent2) = MockWriter::new();
2876 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(w1), false)); driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
2878
2879 driver.dispatch_all(vec![TransportAction::SendOnInterface {
2881 interface: InterfaceId(1),
2882 raw: vec![0x01],
2883 }]);
2884 assert_eq!(sent1.lock().unwrap().len(), 0);
2885
2886 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
2888 raw: vec![0x02],
2889 exclude: None,
2890 }]);
2891 assert_eq!(sent1.lock().unwrap().len(), 0); assert_eq!(sent2.lock().unwrap().len(), 1);
2893
2894 drop(tx);
2895 }
2896
2897 #[test]
2898 fn interface_up_refreshes_writer() {
2899 let (tx, rx) = event::channel();
2900 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2901 let mut driver = Driver::new(
2902 TransportConfig { transport_enabled: false, identity_hash: None },
2903 rx,
2904 tx.clone(),
2905 Box::new(cbs),
2906 );
2907
2908 let (w_old, sent_old) = MockWriter::new();
2909 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(w_old), false));
2910
2911 let (w_new, sent_new) = MockWriter::new();
2913 tx.send(Event::InterfaceUp(InterfaceId(1), Some(Box::new(w_new)), None)).unwrap();
2914 tx.send(Event::Shutdown).unwrap();
2915 driver.run();
2916
2917 assert!(driver.interfaces[&InterfaceId(1)].online);
2919
2920 driver.dispatch_all(vec![TransportAction::SendOnInterface {
2922 interface: InterfaceId(1),
2923 raw: vec![0xFF],
2924 }]);
2925
2926 assert_eq!(sent_old.lock().unwrap().len(), 0);
2928 assert_eq!(sent_new.lock().unwrap().len(), 1);
2930 assert_eq!(sent_new.lock().unwrap()[0], vec![0xFF]);
2931
2932 drop(tx);
2933 }
2934
/// Checks that an `InterfaceUp` event carrying both a writer and interface
/// info registers a previously unknown interface as online and dynamic, fires
/// the interface-up callback once, and makes the interface usable for sending.
#[test]
fn dynamic_interface_register() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, up_log, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let iface_info = make_interface_info(100);
    let (mock_writer, written) = MockWriter::new();

    // Interface 100 was never inserted by hand; the event alone must create it.
    let up_event = Event::InterfaceUp(
        InterfaceId(100),
        Some(Box::new(mock_writer)),
        Some(iface_info),
    );
    event_tx.send(up_event).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.interfaces.contains_key(&InterfaceId(100)));
    let registered = &driver.interfaces[&InterfaceId(100)];
    assert!(registered.online);
    assert!(registered.dynamic);

    // Scope the guard so the callback log is unlocked before dispatching.
    {
        let ups = up_log.lock().unwrap();
        assert_eq!(ups.len(), 1);
        assert_eq!(ups[0], InterfaceId(100));
    }

    let send_action = TransportAction::SendOnInterface {
        interface: InterfaceId(100),
        raw: vec![0x42],
    };
    driver.dispatch_all(vec![send_action]);
    assert_eq!(written.lock().unwrap().len(), 1);

    drop(event_tx);
}
2977
/// Checks that `InterfaceDown` for an interface whose entry has
/// `dynamic: true` removes the entry from the driver and fires the
/// interface-down callback exactly once.
#[test]
fn dynamic_interface_deregister() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, down_log) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    // Register interface 200 with the engine and insert a matching dynamic entry.
    let iface_info = make_interface_info(200);
    driver.engine.register_interface(iface_info.clone());
    let (mock_writer, _written) = MockWriter::new();
    let dynamic_entry = InterfaceEntry {
        id: InterfaceId(200),
        info: iface_info,
        writer: Box::new(mock_writer),
        online: true,
        dynamic: true,
        ifac: None,
        stats: InterfaceStats::default(),
        interface_type: String::new(),
    };
    driver.interfaces.insert(InterfaceId(200), dynamic_entry);

    event_tx.send(Event::InterfaceDown(InterfaceId(200))).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(!driver.interfaces.contains_key(&InterfaceId(200)));

    let downs = down_log.lock().unwrap();
    assert_eq!(downs.len(), 1);
    assert_eq!(downs[0], InterfaceId(200));
}
3013
/// Checks that an `InterfaceUp` followed by an `InterfaceDown` on an entry
/// built with `make_entry` fires each callback once, and that the entry stays
/// in the driver but ends up marked offline.
#[test]
fn interface_callbacks_fire() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, up_log, down_log) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (mock_writer, _) = MockWriter::new();
    let entry = make_entry(1, Box::new(mock_writer), false);
    driver.interfaces.insert(InterfaceId(1), entry);

    // Up without a replacement writer or info, then straight back down.
    event_tx.send(Event::InterfaceUp(InterfaceId(1), None, None)).unwrap();
    event_tx.send(Event::InterfaceDown(InterfaceId(1))).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let up_count = up_log.lock().unwrap().len();
    assert_eq!(up_count, 1);
    let down_count = down_log.lock().unwrap().len();
    assert_eq!(down_count, 1);

    assert!(driver.interfaces.contains_key(&InterfaceId(1)));
    let kept = &driver.interfaces[&InterfaceId(1)];
    assert!(!kept.online);
}
3040
/// Checks that processing an inbound `Frame` event updates the receiving
/// interface's RX counters: `rxb` grows by the frame length and `rx_packets`
/// by one.
#[test]
fn frame_updates_rx_stats() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        rx,
        tx.clone(),
        Box::new(cbs),
    );
    let info = make_interface_info(1);
    // `info` is not used again, so it is moved rather than cloned
    // (matching the other tests in this module).
    driver.engine.register_interface(info);
    let (writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let identity = Identity::new(&mut OsRng);
    let announce_raw = build_announce_packet(&identity);
    // Capture the length before the buffer is moved into the event.
    let announce_len = announce_raw.len() as u64;

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let stats = &driver.interfaces[&InterfaceId(1)].stats;
    assert_eq!(stats.rxb, announce_len);
    assert_eq!(stats.rx_packets, 1);
}
3072
/// Checks that dispatching a `SendOnInterface` action updates the target
/// interface's TX counters: `txb` by the payload length and `tx_packets` by one.
#[test]
fn send_updates_tx_stats() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (mock_writer, _written) = MockWriter::new();
    let entry = make_entry(1, Box::new(mock_writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    // Three payload bytes, sent once.
    let send_action = TransportAction::SendOnInterface {
        interface: InterfaceId(1),
        raw: vec![0x01, 0x02, 0x03],
    };
    driver.dispatch_all(vec![send_action]);

    let iface = &driver.interfaces[&InterfaceId(1)];
    assert_eq!(iface.stats.txb, 3);
    assert_eq!(iface.stats.tx_packets, 1);

    drop(event_tx);
}
3097
/// Checks that a `BroadcastOnAllInterfaces` action with no exclusion updates
/// the TX counters of every inserted interface.
#[test]
fn broadcast_updates_tx_stats() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (first_writer, _first_log) = MockWriter::new();
    let (second_writer, _second_log) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(first_writer), true));
    driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(second_writer), true));

    // Two payload bytes broadcast to both interfaces.
    let broadcast = TransportAction::BroadcastOnAllInterfaces {
        raw: vec![0xAA, 0xBB],
        exclude: None,
    };
    driver.dispatch_all(vec![broadcast]);

    let first = &driver.interfaces[&InterfaceId(1)].stats;
    assert_eq!(first.txb, 2);
    assert_eq!(first.tx_packets, 1);
    let second = &driver.interfaces[&InterfaceId(2)].stats;
    assert_eq!(second.txb, 2);
    assert_eq!(second.tx_packets, 1);

    drop(event_tx);
}
3126
/// Checks that an `InterfaceStats` query reports the single inserted interface
/// together with the transport settings the driver was constructed with.
#[test]
fn query_interface_stats() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: true, identity_hash: Some([0x42; 16]) };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (mock_writer, _written) = MockWriter::new();
    let entry = make_entry(1, Box::new(mock_writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::InterfaceStats, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    if let QueryResponse::InterfaceStats(report) = reply_rx.recv().unwrap() {
        assert_eq!(report.interfaces.len(), 1);
        let only = &report.interfaces[0];
        assert_eq!(only.name, "test-1");
        assert!(only.status);
        assert_eq!(report.transport_id, Some([0x42; 16]));
        assert!(report.transport_enabled);
    } else {
        panic!("unexpected response");
    }
}
3157
/// Checks that after an announce frame is received, a `PathTable` query with
/// no hop limit returns exactly one entry with a hop count of 1.
#[test]
fn query_path_table() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Queue the announce ahead of the query so it is processed first.
    let announcer = Identity::new(&mut OsRng);
    let announce_frame = Event::Frame {
        interface_id: InterfaceId(1),
        data: build_announce_packet(&announcer),
    };
    event_tx.send(announce_frame).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = QueryRequest::PathTable { max_hops: None };
    event_tx.send(Event::Query(query, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    if let QueryResponse::PathTable(paths) = reply_rx.recv().unwrap() {
        assert_eq!(paths.len(), 1);
        assert_eq!(paths[0].hops, 1);
    } else {
        panic!("unexpected response");
    }
}
3192
/// Checks that a `DropPath` query for a destination learned from an announce
/// frame reports `true`.
#[test]
fn query_drop_path() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let announcer = Identity::new(&mut OsRng);
    let announce_bytes = build_announce_packet(&announcer);
    // Destination hash derived with the same app name and aspect the other
    // announce-based tests in this module use.
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(announcer.hash()));

    let announce_frame = Event::Frame { interface_id: InterfaceId(1), data: announce_bytes };
    event_tx.send(announce_frame).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::DropPath { dest_hash }, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    if let QueryResponse::DropPath(was_dropped) = reply_rx.recv().unwrap() {
        assert!(was_dropped);
    } else {
        panic!("unexpected response");
    }
}
3230
/// Checks that a `SendOutbound` event carrying a PLAIN broadcast data packet
/// results in exactly one write on the only inserted interface.
#[test]
fn send_outbound_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (mock_writer, written) = MockWriter::new();
    driver.engine.register_interface(make_interface_info(1));
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let target = [0xAA; 16];
    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_PLAIN,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let outbound =
        RawPacket::pack(header, 0, &target, None, constants::CONTEXT_NONE, b"hello").unwrap();

    let send_event = Event::SendOutbound {
        raw: outbound.raw,
        dest_type: constants::DESTINATION_PLAIN,
        attached_interface: None,
    };
    event_tx.send(send_event).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let frames = written.lock().unwrap();
    assert_eq!(frames.len(), 1);
}
3268
/// Checks that a data packet addressed to a destination registered via
/// `RegisterDestination` is handed to the delivery callback exactly once.
#[test]
fn register_destination_and_deliver() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, delivered, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let target = [0xBB; 16];

    // The registration is queued ahead of the inbound frame.
    let registration = Event::RegisterDestination {
        dest_hash: target,
        dest_type: constants::DESTINATION_SINGLE,
    };
    event_tx.send(registration).unwrap();

    let header = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let inbound =
        RawPacket::pack(header, 0, &target, None, constants::CONTEXT_NONE, b"data").unwrap();
    let inbound_frame = Event::Frame { interface_id: InterfaceId(1), data: inbound.raw };
    event_tx.send(inbound_frame).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let seen = delivered.lock().unwrap();
    assert_eq!(seen.len(), 1);
    assert_eq!(seen[0], DestHash(target));
}
3307
/// Checks that a `TransportIdentity` query returns the identity hash the
/// driver was configured with.
#[test]
fn query_transport_identity() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: true, identity_hash: Some([0xAA; 16]) };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::TransportIdentity, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    if let QueryResponse::TransportIdentity(Some(reported)) = reply_rx.recv().unwrap() {
        assert_eq!(reported, [0xAA; 16]);
    } else {
        panic!("unexpected response");
    }
}
3331
/// Checks that a `LinkCount` query on a freshly constructed driver reports
/// zero links.
#[test]
fn query_link_count() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::LinkCount, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    if let QueryResponse::LinkCount(links) = reply_rx.recv().unwrap() {
        assert_eq!(links, 0);
    } else {
        panic!("unexpected response");
    }
}
3353
/// Checks that a `RateTable` query on a freshly constructed driver returns an
/// empty table.
#[test]
fn query_rate_table() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::RateTable, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    if let QueryResponse::RateTable(rates) = reply_rx.recv().unwrap() {
        assert!(rates.is_empty());
    } else {
        panic!("unexpected response");
    }
}
3375
/// Checks that a `NextHop` query for a destination the driver has never heard
/// of answers with `NextHop(None)`.
#[test]
fn query_next_hop() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let unknown_dest = [0xBB; 16];
    let (reply_tx, reply_rx) = mpsc::channel();
    let query = QueryRequest::NextHop { dest_hash: unknown_dest };
    event_tx.send(Event::Query(query, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    assert!(matches!(reply, QueryResponse::NextHop(None)), "unexpected response");
}
3398
/// Checks that a `NextHopIfName` query for a destination the driver has never
/// heard of answers with `NextHopIfName(None)`.
#[test]
fn query_next_hop_if_name() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let unknown_dest = [0xCC; 16];
    let (reply_tx, reply_rx) = mpsc::channel();
    let query = QueryRequest::NextHopIfName { dest_hash: unknown_dest };
    event_tx.send(Event::Query(query, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    assert!(matches!(reply, QueryResponse::NextHopIfName(None)), "unexpected response");
}
3421
/// Checks that a `DropAllVia` query on a freshly constructed driver reports
/// that zero paths were dropped.
#[test]
fn query_drop_all_via() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let via = [0xDD; 16];
    let (reply_tx, reply_rx) = mpsc::channel();
    let query = QueryRequest::DropAllVia { transport_hash: via };
    event_tx.send(Event::Query(query, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    if let QueryResponse::DropAllVia(dropped) = reply_rx.recv().unwrap() {
        assert_eq!(dropped, 0);
    } else {
        panic!("unexpected response");
    }
}
3447
/// Checks that a `DropAnnounceQueues` query is acknowledged with the matching
/// `DropAnnounceQueues` response.
#[test]
fn query_drop_announce_queues() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::DropAnnounceQueues, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    assert!(matches!(reply, QueryResponse::DropAnnounceQueues), "unexpected response");
}
3469
/// Checks that a `RegisterLinkDestination` event makes the link manager
/// recognise the given hash as a link destination.
#[test]
fn register_link_dest_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Generate a signing key pair and pass both halves as raw bytes.
    let signing_key = rns_crypto::ed25519::Ed25519PrivateKey::generate(&mut OsRng);
    let sig_pub_bytes = signing_key.public_key().public_bytes();
    let sig_prv_bytes = signing_key.private_bytes();
    let dest_hash = [0xDD; 16];

    let registration = Event::RegisterLinkDestination {
        dest_hash,
        sig_prv_bytes,
        sig_pub_bytes,
        resource_strategy: 0,
    };
    event_tx.send(registration).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let is_registered = driver.link_manager.is_link_destination(&dest_hash);
    assert!(is_registered);
}
3507
/// Checks that a `CreateLink` event replies with a non-zero link id and leaves
/// exactly one link in the link manager.
#[test]
fn create_link_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _link_established, _, _) = MockCallbacks::with_link_tracking();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let dest_hash = [0xDD; 16];
    // Placeholder bytes standing in for the destination's signing public key.
    let placeholder_sig_pub = [0xAA; 32];

    let (reply_tx, reply_rx) = mpsc::channel();
    let request = Event::CreateLink {
        dest_hash,
        dest_sig_pub_bytes: placeholder_sig_pub,
        response_tx: reply_tx,
    };
    event_tx.send(request).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let new_link_id = reply_rx.recv().unwrap();
    assert_ne!(new_link_id, [0u8; 16]);

    let links = driver.link_manager.link_count();
    assert_eq!(links, 1);
}
3547
/// Checks that a destination registered directly on the link manager is
/// reported by `is_link_destination`, while an unregistered hash is not.
/// No events are run through the driver in this test.
#[test]
fn deliver_local_routes_to_link_manager() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let signing_key = rns_crypto::ed25519::Ed25519PrivateKey::generate(&mut OsRng);
    let sig_pub_bytes = signing_key.public_key().public_bytes();
    let registered_hash = [0xEE; 16];
    driver.link_manager.register_link_destination(
        registered_hash,
        signing_key,
        sig_pub_bytes,
        crate::link_manager::ResourceStrategy::AcceptNone,
    );

    let knows_registered = driver.link_manager.is_link_destination(&registered_hash);
    assert!(knows_registered);

    let knows_other = driver.link_manager.is_link_destination(&[0xFF; 16]);
    assert!(!knows_other);

    drop(event_tx);
}
3582
/// Checks that tearing down a link created through a `CreateLink` event, and
/// dispatching the resulting link actions, fires the link-closed callback once
/// with that link's id.
#[test]
fn teardown_link_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, closed_log, _) = MockCallbacks::with_link_tracking();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // First create a link through the event loop.
    let (reply_tx, reply_rx) = mpsc::channel();
    let request = Event::CreateLink {
        dest_hash: [0xDD; 16],
        dest_sig_pub_bytes: [0xAA; 32],
        response_tx: reply_tx,
    };
    event_tx.send(request).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let link_id = reply_rx.recv().unwrap();
    assert_ne!(link_id, [0u8; 16]);
    assert_eq!(driver.link_manager.link_count(), 1);

    // The event loop has already shut down, so the teardown is driven by hand.
    let closing_actions = driver.link_manager.teardown_link(&link_id);
    driver.dispatch_link_actions(closing_actions);

    let closed = closed_log.lock().unwrap();
    assert_eq!(closed.len(), 1);
    assert_eq!(closed[0], TypedLinkId(link_id));
}
3624
/// Checks that a `LinkCount` query counts a link created directly on the link
/// manager.
#[test]
fn link_count_includes_link_manager() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Create the link on the manager itself rather than via a `CreateLink` event.
    let mut rng = OsRng;
    let placeholder_sig = [0xAA; 32];
    let mtu = constants::MTU as u32;
    driver.link_manager.create_link(&[0xDD; 16], &placeholder_sig, 1, mtu, &mut rng);

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::LinkCount, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    if let QueryResponse::LinkCount(links) = reply_rx.recv().unwrap() {
        assert_eq!(links, 1);
    } else {
        panic!("unexpected response");
    }
}
3656
/// Smoke test: sends a `RegisterRequestHandler` event for `/status` and runs
/// the driver to shutdown. There are no assertions; the test passes as long as
/// nothing panics.
#[test]
fn register_request_handler_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let registration = Event::RegisterRequestHandler {
        path: "/status".to_string(),
        allowed_list: None,
        // The handler ignores all of its arguments and always answers "OK".
        handler: Box::new(|_link_id, _path, _data, _remote| Some(b"OK".to_vec())),
    };
    event_tx.send(registration).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();
}
3679
/// Checks that, with remote management enabled and a transport identity set,
/// a `Tick` processed after the startup delay causes at least one packet to be
/// written to the interface.
#[test]
fn management_announces_emitted_after_delay() {
    let (tx, rx) = event::channel();
    // The announce log is not inspected here. The underscore-prefixed binding
    // keeps the handle alive without triggering an unused-variable warning.
    let (cbs, _announces, _, _, _, _) = MockCallbacks::new();
    let identity = Identity::new(&mut OsRng);
    let identity_hash = *identity.hash();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: true, identity_hash: Some(identity_hash) },
        rx,
        tx.clone(),
        Box::new(cbs),
    );

    let info = make_interface_info(1);
    // `info` is not used again, so it is moved rather than cloned.
    driver.engine.register_interface(info);
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    driver.management_config.enable_remote_management = true;
    driver.transport_identity = Some(identity);

    // Backdate the start time so the driver believes it has been running for
    // 10 seconds (presumably longer than the startup delay; the delay value
    // itself is defined elsewhere).
    driver.started = time::now() - 10.0;

    tx.send(Event::Tick).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    assert!(!sent_packets.is_empty(),
        "Management announce should be sent after startup delay");
}
3718
/// Checks that a `Tick` does not cause any packet to be written when
/// `enable_remote_management` is left at its default, even with a transport
/// identity set and the start time backdated.
#[test]
fn management_announces_not_emitted_when_disabled() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let identity = Identity::new(&mut OsRng);
    let identity_hash = *identity.hash();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: true, identity_hash: Some(identity_hash) },
        rx,
        tx.clone(),
        Box::new(cbs),
    );

    let info = make_interface_info(1);
    // `info` is not used again, so it is moved rather than cloned.
    driver.engine.register_interface(info);
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    // `management_config` is deliberately left untouched in this test.
    driver.transport_identity = Some(identity);
    // Backdate the start time by 10 seconds, as in the enabled-case test.
    driver.started = time::now() - 10.0;

    tx.send(Event::Tick).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    assert!(sent_packets.is_empty(),
        "No announces should be sent when management is disabled");
}
3750
/// Checks that, with remote management enabled, a `Tick` processed right after
/// the driver's start time does not cause any packet to be written.
#[test]
fn management_announces_not_emitted_before_delay() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let identity = Identity::new(&mut OsRng);
    let identity_hash = *identity.hash();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: true, identity_hash: Some(identity_hash) },
        rx,
        tx.clone(),
        Box::new(cbs),
    );

    let info = make_interface_info(1);
    // `info` is not used again, so it is moved rather than cloned.
    driver.engine.register_interface(info);
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    driver.management_config.enable_remote_management = true;
    driver.transport_identity = Some(identity);
    // Start time is "now", so the startup delay has not yet elapsed when the
    // tick is processed.
    driver.started = time::now();

    tx.send(Event::Tick).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    assert!(sent_packets.is_empty(),
        "No announces before startup delay");
}
3782
/// Checks that receiving an announce frame records the announcing destination
/// in `known_destinations`, with matching destination hash, identity hash,
/// public key and a hop count of 1.
#[test]
fn announce_received_populates_known_destinations() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let announcer = Identity::new(&mut OsRng);
    let announce_bytes = build_announce_packet(&announcer);

    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(announcer.hash()));

    let announce_frame = Event::Frame { interface_id: InterfaceId(1), data: announce_bytes };
    event_tx.send(announce_frame).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.known_destinations.contains_key(&dest_hash));

    let known = &driver.known_destinations[&dest_hash];
    assert_eq!(known.dest_hash.0, dest_hash);
    assert_eq!(known.identity_hash.0, *announcer.hash());
    let expected_key = announcer.get_public_key().unwrap();
    assert_eq!(&known.public_key, &expected_key);
    assert_eq!(known.hops, 1);
}
3821
/// Checks `HasPath` in both directions: a query for an unknown hash answers
/// `false`, and a query queued after an announce frame answers `true` for the
/// announced destination.
#[test]
fn query_has_path() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // 1) Query for a hash that nothing has announced.
    let (unknown_tx, unknown_rx) = mpsc::channel();
    let unknown_query = QueryRequest::HasPath { dest_hash: [0xAA; 16] };
    event_tx.send(Event::Query(unknown_query, unknown_tx)).unwrap();

    // 2) Announce frame.
    let announcer = Identity::new(&mut OsRng);
    let announce_bytes = build_announce_packet(&announcer);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(announcer.hash()));
    let announce_frame = Event::Frame { interface_id: InterfaceId(1), data: announce_bytes };
    event_tx.send(announce_frame).unwrap();

    // 3) Query for the announced destination, queued after the frame.
    let (known_tx, known_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::HasPath { dest_hash }, known_tx)).unwrap();

    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let unknown_reply = unknown_rx.recv().unwrap();
    if !matches!(unknown_reply, QueryResponse::HasPath(false)) {
        panic!("expected HasPath(false), got {:?}", unknown_reply);
    }

    let known_reply = known_rx.recv().unwrap();
    if !matches!(known_reply, QueryResponse::HasPath(true)) {
        panic!("expected HasPath(true), got {:?}", known_reply);
    }
}
3867
/// Checks that a `HopsTo` query for a destination learned from an announce
/// frame answers `Some(1)`.
#[test]
fn query_hops_to() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let announcer = Identity::new(&mut OsRng);
    let announce_bytes = build_announce_packet(&announcer);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(announcer.hash()));

    // The frame is queued ahead of the query.
    let announce_frame = Event::Frame { interface_id: InterfaceId(1), data: announce_bytes };
    event_tx.send(announce_frame).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::HopsTo { dest_hash }, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    if !matches!(reply, QueryResponse::HopsTo(Some(1))) {
        panic!("expected HopsTo(Some(1)), got {:?}", reply);
    }
}
3902
/// Checks `RecallIdentity`: a destination learned from an announce frame is
/// recalled with matching hashes, public key and hop count, while an unknown
/// hash yields `None`.
#[test]
fn query_recall_identity() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let announcer = Identity::new(&mut OsRng);
    let announce_bytes = build_announce_packet(&announcer);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(announcer.hash()));

    let announce_frame = Event::Frame { interface_id: InterfaceId(1), data: announce_bytes };
    event_tx.send(announce_frame).unwrap();

    // Query for the announced destination.
    let (known_tx, known_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::RecallIdentity { dest_hash }, known_tx)).unwrap();

    // Query for a hash that was never announced.
    let (unknown_tx, unknown_rx) = mpsc::channel();
    let unknown_query = QueryRequest::RecallIdentity { dest_hash: [0xFF; 16] };
    event_tx.send(Event::Query(unknown_query, unknown_tx)).unwrap();

    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let known_reply = known_rx.recv().unwrap();
    match known_reply {
        QueryResponse::RecallIdentity(Some(found)) => {
            assert_eq!(found.dest_hash.0, dest_hash);
            assert_eq!(found.identity_hash.0, *announcer.hash());
            assert_eq!(found.public_key, announcer.get_public_key().unwrap());
            assert_eq!(found.hops, 1);
        }
        other => panic!("expected RecallIdentity(Some(..)), got {:?}", other),
    }

    let unknown_reply = unknown_rx.recv().unwrap();
    if !matches!(unknown_reply, QueryResponse::RecallIdentity(None)) {
        panic!("expected RecallIdentity(None), got {:?}", unknown_reply);
    }
}
3952
/// Checks that a `RequestPath` event writes a packet to the interface and that
/// the packet's flags mark it as a PLAIN, broadcast DATA packet.
#[test]
fn request_path_sends_packet() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    event_tx.send(Event::RequestPath { dest_hash: [0xAA; 16] }).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let frames = written.lock().unwrap();
    assert!(!frames.is_empty(), "Path request should be sent on wire");

    // Decode the flags from the first byte with its top bit masked off.
    let first_frame = &frames[0];
    let flag_byte = first_frame[0] & 0x7F;
    let decoded = rns_core::packet::PacketFlags::unpack(flag_byte);
    assert_eq!(decoded.packet_type, constants::PACKET_TYPE_DATA);
    assert_eq!(decoded.destination_type, constants::DESTINATION_PLAIN);
    assert_eq!(decoded.transport_type, constants::TRANSPORT_BROADCAST);
}
3984
/// Checks that, with transport enabled and an identity hash configured, a path
/// request carries 48 bytes of data whose first 16 bytes are the requested
/// destination hash and whose next 16 bytes are the transport identity hash.
#[test]
fn request_path_includes_transport_id() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: true, identity_hash: Some([0xBB; 16]) };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    event_tx.send(Event::RequestPath { dest_hash: [0xAA; 16] }).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let frames = written.lock().unwrap();
    assert!(!frames.is_empty());

    let first_frame = &frames[0];
    match RawPacket::unpack(first_frame) {
        Ok(parsed) => {
            assert_eq!(parsed.data.len(), 48, "Path request data should be 48 bytes with transport_id");
            assert_eq!(&parsed.data[..16], &[0xAA; 16], "First 16 bytes should be dest_hash");
            assert_eq!(&parsed.data[16..32], &[0xBB; 16], "Next 16 bytes should be transport_id");
        }
        Err(_) => panic!("Could not unpack sent packet"),
    }
}
4018
/// A freshly built driver must expose, as `path_request_dest`, the hash of the
/// `rnstransport` / `path.request` destination computed without an identity.
#[test]
fn path_request_dest_registered() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let aspects = ["path", "request"];
    let expected_dest = rns_core::destination::destination_hash("rnstransport", &aspects, None);
    assert_eq!(driver.path_request_dest, expected_dest);

    // The sender is released explicitly only after the assertion has run.
    drop(event_tx);
}
4038
/// `Event::RegisterProofStrategy` with `ProveAll` and a signing key must leave
/// an entry in `proof_strategies` that holds the strategy and an identity.
#[test]
fn register_proof_strategy_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let dest_hash = [0xAA; 16];
    let signer = Identity::new(&mut OsRng);
    let private_key = signer.get_private_key().unwrap();

    let registration = Event::RegisterProofStrategy {
        dest_hash,
        strategy: rns_core::types::ProofStrategy::ProveAll,
        signing_key: Some(private_key),
    };
    event_tx.send(registration).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.proof_strategies.contains_key(&dest_hash));
    // Borrow the stored (strategy, identity) pair instead of destructuring it.
    let registered = &driver.proof_strategies[&dest_hash];
    assert_eq!(registered.0, rns_core::types::ProofStrategy::ProveAll);
    assert!(registered.1.is_some());
}
4071
/// Registering `ProveNone` without a signing key must still create an entry in
/// `proof_strategies`, with no identity stored alongside the strategy.
#[test]
fn register_proof_strategy_prove_none_no_identity() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let dest_hash = [0xBB; 16];
    let registration = Event::RegisterProofStrategy {
        dest_hash,
        strategy: rns_core::types::ProofStrategy::ProveNone,
        signing_key: None,
    };
    event_tx.send(registration).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.proof_strategies.contains_key(&dest_hash));
    // Borrow the stored (strategy, identity) pair instead of destructuring it.
    let registered = &driver.proof_strategies[&dest_hash];
    assert_eq!(registered.0, rns_core::types::ProofStrategy::ProveNone);
    assert!(registered.1.is_none());
}
4097
/// After an `Event::SendOutbound` for a PLAIN data packet, `sent_packets` must
/// hold an entry keyed by the packet hash whose destination is the packet's
/// destination hash.
#[test]
fn send_outbound_tracks_sent_packets() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, _wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xCC; 16];
    let header = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_PLAIN,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let outbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, b"test data").unwrap();
    // Copy the hash out before the raw bytes are moved into the event.
    let expected_hash = outbound.packet_hash;

    event_tx
        .send(Event::SendOutbound {
            raw: outbound.raw,
            dest_type: constants::DESTINATION_PLAIN,
            attached_interface: None,
        })
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.sent_packets.contains_key(&expected_hash));
    // The tracked tuple is (destination, send time); only the destination is checked.
    let tracked = &driver.sent_packets[&expected_hash];
    assert_eq!(&tracked.0, &dest);
}
4138
/// With a `ProveAll` strategy and a signing identity registered for a local
/// SINGLE destination, an inbound data frame must be delivered once and a
/// PROOF packet must be written to the interface.
#[test]
fn prove_all_generates_proof_on_delivery() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, deliveries, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xDD; 16];
    let signer = Identity::new(&mut OsRng);
    let private_key = signer.get_private_key().unwrap();
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    let strategy_entry = (
        rns_core::types::ProofStrategy::ProveAll,
        Some(Identity::from_private_key(&private_key)),
    );
    driver.proof_strategies.insert(dest, strategy_entry);

    let header = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();

    event_tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(deliveries.lock().unwrap().len(), 1);

    // Scan every captured frame for one whose decoded packet type is PROOF.
    let frames = wire.lock().unwrap();
    let has_proof = frames.iter().any(|frame| {
        PacketFlags::unpack(frame[0] & 0x7F).packet_type == constants::PACKET_TYPE_PROOF
    });
    assert!(has_proof, "ProveAll should generate a proof packet: sent {} packets", frames.len());
}
4190
/// With a `ProveNone` strategy registered for a local SINGLE destination, an
/// inbound data frame must still be delivered once, but no PROOF packet may be
/// written to the interface.
#[test]
fn prove_none_does_not_generate_proof() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, deliveries, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xDD; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    let strategy_entry = (rns_core::types::ProofStrategy::ProveNone, None);
    driver.proof_strategies.insert(dest, strategy_entry);

    let header = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();

    event_tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(deliveries.lock().unwrap().len(), 1);

    // No captured frame may decode as a PROOF packet.
    let frames = wire.lock().unwrap();
    let has_proof = frames.iter().any(|frame| {
        PacketFlags::unpack(frame[0] & 0x7F).packet_type == constants::PACKET_TYPE_PROOF
    });
    assert!(!has_proof, "ProveNone should not generate a proof packet");
}
4239
/// When no proof strategy is registered for a local SINGLE destination, an
/// inbound data frame is delivered once and no PROOF packet is sent.
#[test]
fn no_proof_strategy_does_not_generate_proof() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, deliveries, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    // The destination is registered, but `proof_strategies` is left untouched.
    let dest = [0xDD; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    let header = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();

    event_tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(deliveries.lock().unwrap().len(), 1);

    let frames = wire.lock().unwrap();
    let has_proof = frames.iter().any(|frame| {
        PacketFlags::unpack(frame[0] & 0x7F).packet_type == constants::PACKET_TYPE_PROOF
    });
    assert!(!has_proof, "No proof strategy means no proof generated");
}
4281
/// With a `ProveApp` strategy, an inbound data frame must invoke the
/// proof-requested callback exactly once for the destination, and a PROOF
/// packet must be sent (the assertion message states that the mock callback
/// answers `true`).
#[test]
fn prove_app_calls_callback() {
    let (tx, rx) = event::channel();
    // Only the proof-request log is inspected afterwards, so it is the only
    // sink that needs a handle kept outside the callbacks struct.
    let proof_requested = Arc::new(Mutex::new(Vec::new()));
    let cbs = MockCallbacks {
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
        proofs: Arc::new(Mutex::new(Vec::new())),
        proof_requested: Arc::clone(&proof_requested),
    };

    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        rx,
        tx.clone(),
        Box::new(cbs),
    );
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let dest = [0xDD; 16];
    let identity = Identity::new(&mut OsRng);
    let prv_key = identity.get_private_key().unwrap();
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    driver.proof_strategies.insert(
        dest,
        (
            rns_core::types::ProofStrategy::ProveApp,
            Some(Identity::from_private_key(&prv_key)),
        ),
    );

    let flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let packet =
        RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"app test").unwrap();

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: packet.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    // The application callback must have been consulted once for this destination.
    let prs = proof_requested.lock().unwrap();
    assert_eq!(prs.len(), 1);
    assert_eq!(prs[0].0, DestHash(dest));

    // A PROOF packet must then appear among the captured frames.
    let sent_packets = sent.lock().unwrap();
    let has_proof = sent_packets.iter().any(|raw| {
        PacketFlags::unpack(raw[0] & 0x7F).packet_type == constants::PACKET_TYPE_PROOF
    });
    assert!(has_proof, "ProveApp (callback returns true) should generate a proof");
}
4353
/// An inbound PROOF packet whose payload starts with a tracked packet hash
/// must fire the proof callback with (destination, packet hash, RTT) and
/// remove the hash from `sent_packets`.
///
/// No identity is stored in `known_destinations` here and the 64 trailing
/// bytes are filler, yet the proof is expected to be accepted.
/// NOTE(review): presumably signature validation is skipped when the
/// destination's identity is unknown — confirm against the driver.
#[test]
fn inbound_proof_fires_callback() {
    let (event_tx, event_rx) = event::channel();
    let proofs = Arc::new(Mutex::new(Vec::new()));
    // Only the `proofs` sink is shared with the test; the others are throwaway.
    let callbacks = MockCallbacks {
        proofs: Arc::clone(&proofs),
        proof_requested: Arc::new(Mutex::new(Vec::new())),
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
    };

    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Pretend a packet with this hash was sent half a second ago.
    let tracked_hash = [0x42u8; 32];
    let sent_time = time::now() - 0.5;
    driver.sent_packets.insert(tracked_hash, (dest, sent_time));

    // Payload layout: 32-byte packet hash followed by 64 filler bytes.
    let mut payload = Vec::new();
    payload.extend_from_slice(&tracked_hash);
    payload.extend_from_slice(&[0xAA; 64]);

    let header = PacketFlags {
        packet_type: constants::PACKET_TYPE_PROOF,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, &payload).unwrap();

    event_tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let recorded = proofs.lock().unwrap();
    assert_eq!(recorded.len(), 1);
    assert_eq!(recorded[0].0, DestHash(dest));
    assert_eq!(recorded[0].1, PacketHash(tracked_hash));
    // Only a lower bound is enforced on the reported round-trip time.
    assert!(recorded[0].2 >= 0.4, "RTT should be approximately 0.5s, got {}", recorded[0].2);

    assert!(!driver.sent_packets.contains_key(&tracked_hash));
}
4425
/// An inbound PROOF packet that refers to a packet hash absent from
/// `sent_packets` must not fire the proof callback.
#[test]
fn inbound_proof_for_unknown_packet_is_ignored() {
    let (event_tx, event_rx) = event::channel();
    let proofs = Arc::new(Mutex::new(Vec::new()));
    // Only the `proofs` sink is shared with the test; the others are throwaway.
    let callbacks = MockCallbacks {
        proofs: Arc::clone(&proofs),
        proof_requested: Arc::new(Mutex::new(Vec::new())),
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
    };

    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Nothing is inserted into `sent_packets`, so this hash is untracked.
    let unknown_hash = [0xFF; 32];
    let mut payload = Vec::new();
    payload.extend_from_slice(&unknown_hash);
    payload.extend_from_slice(&[0xAA; 64]);

    let header = PacketFlags {
        packet_type: constants::PACKET_TYPE_PROOF,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, &payload).unwrap();

    event_tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(proofs.lock().unwrap().is_empty());
}
4485
/// When the destination's identity is present in `known_destinations` and the
/// proof carries that identity's signature over the tracked packet hash, the
/// proof callback must fire for that destination and packet hash.
#[test]
fn inbound_proof_with_valid_signature_fires_callback() {
    let (event_tx, event_rx) = event::channel();
    let proofs = Arc::new(Mutex::new(Vec::new()));
    // Only the `proofs` sink is shared with the test; the others are throwaway.
    let callbacks = MockCallbacks {
        proofs: Arc::clone(&proofs),
        proof_requested: Arc::new(Mutex::new(Vec::new())),
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
    };

    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Make the signer's public key known for this destination.
    let signer = Identity::new(&mut OsRng);
    let announced = crate::destination::AnnouncedIdentity {
        dest_hash: DestHash(dest),
        identity_hash: IdentityHash(*signer.hash()),
        public_key: signer.get_public_key().unwrap(),
        app_data: None,
        hops: 0,
        received_at: time::now(),
    };
    driver.known_destinations.insert(dest, announced);

    // Pretend a packet with this hash was sent half a second ago.
    let tracked_hash = [0x42u8; 32];
    let sent_time = time::now() - 0.5;
    driver.sent_packets.insert(tracked_hash, (dest, sent_time));

    // Payload layout: packet hash followed by the signature over that hash.
    let signature = signer.sign(&tracked_hash).unwrap();
    let mut payload = Vec::new();
    payload.extend_from_slice(&tracked_hash);
    payload.extend_from_slice(&signature);

    let header = PacketFlags {
        packet_type: constants::PACKET_TYPE_PROOF,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, &payload).unwrap();

    event_tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let recorded = proofs.lock().unwrap();
    assert_eq!(recorded.len(), 1);
    assert_eq!(recorded[0].0, DestHash(dest));
    assert_eq!(recorded[0].1, PacketHash(tracked_hash));
}
4565
/// When the destination's identity is present in `known_destinations` but the
/// proof carries 64 filler bytes instead of a real signature, the proof
/// callback must not fire.
#[test]
fn inbound_proof_with_invalid_signature_rejected() {
    let (event_tx, event_rx) = event::channel();
    let proofs = Arc::new(Mutex::new(Vec::new()));
    // Only the `proofs` sink is shared with the test; the others are throwaway.
    let callbacks = MockCallbacks {
        proofs: Arc::clone(&proofs),
        proof_requested: Arc::new(Mutex::new(Vec::new())),
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
    };

    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));
    driver.engine.register_interface(make_interface_info(1));
    let (writer, _wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Make an identity known for this destination so a signature can be checked.
    let owner = Identity::new(&mut OsRng);
    let announced = crate::destination::AnnouncedIdentity {
        dest_hash: DestHash(dest),
        identity_hash: IdentityHash(*owner.hash()),
        public_key: owner.get_public_key().unwrap(),
        app_data: None,
        hops: 0,
        received_at: time::now(),
    };
    driver.known_destinations.insert(dest, announced);

    // Pretend a packet with this hash was sent half a second ago.
    let tracked_hash = [0x42u8; 32];
    let sent_time = time::now() - 0.5;
    driver.sent_packets.insert(tracked_hash, (dest, sent_time));

    // Payload layout: packet hash followed by 64 bytes that were not produced
    // by signing with `owner`.
    let mut payload = Vec::new();
    payload.extend_from_slice(&tracked_hash);
    payload.extend_from_slice(&[0xAA; 64]);

    let header = PacketFlags {
        packet_type: constants::PACKET_TYPE_PROOF,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(header, 0, &dest, None, constants::CONTEXT_NONE, &payload).unwrap();

    event_tx.send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw }).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(proofs.lock().unwrap().is_empty());
}
4642
/// The proof emitted under `ProveAll` must carry a 96-byte payload that
/// `rns_core::receipt::validate_proof` accepts as `Valid` for the delivered
/// packet's hash and the destination's signing identity.
#[test]
fn proof_data_is_valid_explicit_proof() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        rx,
        tx.clone(),
        Box::new(cbs),
    );
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let dest = [0xDD; 16];
    let identity = Identity::new(&mut OsRng);
    let prv_key = identity.get_private_key().unwrap();
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    driver.proof_strategies.insert(
        dest,
        (
            rns_core::types::ProofStrategy::ProveAll,
            Some(Identity::from_private_key(&prv_key)),
        ),
    );

    let flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let data_packet =
        RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"verify me").unwrap();
    // Copy the hash out before the raw bytes are moved into the event.
    let data_packet_hash = data_packet.packet_hash;

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: data_packet.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    // Locate the first captured frame whose decoded packet type is PROOF; a
    // single `expect` replaces the former `is_some()` assertion plus `unwrap()`.
    let sent_packets = sent.lock().unwrap();
    let proof_raw = sent_packets
        .iter()
        .find(|raw| PacketFlags::unpack(raw[0] & 0x7F).packet_type == constants::PACKET_TYPE_PROOF)
        .expect("Should have sent a proof");

    let proof_packet = RawPacket::unpack(proof_raw).unwrap();
    assert_eq!(proof_packet.data.len(), 96, "Explicit proof should be 96 bytes");

    // Validate against an identity rebuilt from the same private key.
    let verifier = Identity::from_private_key(&prv_key);
    let result =
        rns_core::receipt::validate_proof(&proof_packet.data, &data_packet_hash, &verifier);
    assert_eq!(result, rns_core::receipt::ProofResult::Valid);
}
4702
/// With nothing registered by the test, a `LocalDestinations` query must
/// return exactly two entries, both of type PLAIN.
/// NOTE(review): these are presumably destinations the driver registers for
/// itself on construction — confirm against `Driver::new`.
#[test]
fn query_local_destinations_empty() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::LocalDestinations, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::LocalDestinations(list) => list,
        other => panic!("expected LocalDestinations, got {:?}", other),
    };
    assert_eq!(entries.len(), 2);
    entries
        .iter()
        .for_each(|entry| assert_eq!(entry.dest_type, rns_core::constants::DESTINATION_PLAIN));
}
4726
/// After one `RegisterDestination` event for a SINGLE destination, the
/// `LocalDestinations` query must return three entries, one of which matches
/// the registered hash and type.
#[test]
fn query_local_destinations_with_registered() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );

    let dest_hash = [0xAA; 16];
    let registration = Event::RegisterDestination {
        dest_hash,
        dest_type: rns_core::constants::DESTINATION_SINGLE,
    };
    event_tx.send(registration).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::LocalDestinations, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::LocalDestinations(list) => list,
        other => panic!("expected LocalDestinations, got {:?}", other),
    };
    assert_eq!(entries.len(), 3);
    let found = entries.iter().any(|entry| {
        entry.hash == dest_hash && entry.dest_type == rns_core::constants::DESTINATION_SINGLE
    });
    assert!(found);
}
4755
/// After one `RegisterLinkDestination` event, the `LocalDestinations` query
/// must return three entries, one of which carries the registered hash with
/// type SINGLE.
#[test]
fn query_local_destinations_tracks_link_dest() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );

    let dest_hash = [0xBB; 16];
    let registration = Event::RegisterLinkDestination {
        dest_hash,
        sig_prv_bytes: [0x11; 32],
        sig_pub_bytes: [0x22; 32],
        resource_strategy: 0,
    };
    event_tx.send(registration).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::LocalDestinations, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::LocalDestinations(list) => list,
        other => panic!("expected LocalDestinations, got {:?}", other),
    };
    assert_eq!(entries.len(), 3);
    let found = entries.iter().any(|entry| {
        entry.hash == dest_hash && entry.dest_type == rns_core::constants::DESTINATION_SINGLE
    });
    assert!(found);
}
4786
/// A `Links` query against a driver with no link activity must answer with an
/// empty list.
#[test]
fn query_links_empty() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::Links, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::Links(list) => list,
        other => panic!("expected Links, got {:?}", other),
    };
    assert!(entries.is_empty());
}
4806
/// A `Resources` query against a driver with no transfers must answer with an
/// empty list.
#[test]
fn query_resources_empty() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::Resources, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::Resources(list) => list,
        other => panic!("expected Resources, got {:?}", other),
    };
    assert!(entries.is_empty());
}
4826
/// Table-driven check of `infer_interface_type`: each interface name on the
/// left must map to the interface type string on the right.
#[test]
fn infer_interface_type_from_name() {
    let cases = [
        ("TCPServerInterface/Client-1234", "TCPServerClientInterface"),
        ("BackboneInterface/5", "BackboneInterface"),
        ("LocalInterface", "LocalServerClientInterface"),
        ("MyAutoGroup:fe80::1", "AutoInterface"),
    ];
    // Cases run in order, so the first mismatch aborts the test.
    for &(interface_name, expected_type) in cases.iter() {
        assert_eq!(super::infer_interface_type(interface_name), expected_type);
    }
}
4846
/// Empty input must produce an all-zero 16-byte destination hash.
#[test]
fn test_extract_dest_hash_empty() {
    let extracted = super::extract_dest_hash(&[]);
    let all_zero = [0u8; 16];
    assert_eq!(extracted, all_zero);
}
4853
/// A `SendProbe` query for a destination the driver has no identity for must
/// be answered with `SendProbe(None)`.
#[test]
fn send_probe_unknown_dest_returns_none() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, _wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    // No identity has been injected for this hash before the probe is requested.
    let (reply_tx, reply_rx) = mpsc::channel();
    let probe = QueryRequest::SendProbe { dest_hash: [0xAA; 16], payload_size: 16 };
    event_tx.send(Event::Query(probe, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    if !matches!(reply, QueryResponse::SendProbe(None)) {
        panic!("expected SendProbe(None), got {:?}", reply);
    }
}
4887
/// After an identity is injected for a probe destination, a `SendProbe` query
/// must return a non-zero packet hash that is tracked in `sent_packets`, and
/// the first frame on the wire must decode as a DATA packet for a SINGLE
/// destination.
#[test]
fn send_probe_known_dest_returns_packet_hash() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    // Derive the remote `rnstransport` / `probe` destination for a fresh identity.
    let remote = Identity::new(&mut OsRng);
    let dest_hash =
        rns_core::destination::destination_hash("rnstransport", &["probe"], Some(remote.hash()));

    // First make the remote identity known to the driver ...
    let (inject_tx, inject_rx) = mpsc::channel();
    let inject = QueryRequest::InjectIdentity {
        dest_hash,
        identity_hash: *remote.hash(),
        public_key: remote.get_public_key().unwrap(),
        app_data: None,
        hops: 1,
        received_at: 0.0,
    };
    event_tx.send(Event::Query(inject, inject_tx)).unwrap();

    // ... then request the probe, queued behind the injection.
    let (probe_tx, probe_rx) = mpsc::channel();
    let probe = QueryRequest::SendProbe { dest_hash, payload_size: 16 };
    event_tx.send(Event::Query(probe, probe_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let inject_reply = inject_rx.recv().unwrap();
    if !matches!(inject_reply, QueryResponse::InjectIdentity(true)) {
        panic!("expected InjectIdentity(true), got {:?}", inject_reply);
    }

    let (packet_hash, _hops) = match probe_rx.recv().unwrap() {
        QueryResponse::SendProbe(Some(probe_info)) => probe_info,
        other => panic!("expected SendProbe(Some(..)), got {:?}", other),
    };
    assert_ne!(packet_hash, [0u8; 32]);
    assert!(driver.sent_packets.contains_key(&packet_hash));

    let frames = wire.lock().unwrap();
    assert!(!frames.is_empty(), "Probe packet should be sent on wire");
    // The top bit of the first byte is cleared before the flags are decoded.
    let decoded = PacketFlags::unpack(frames[0][0] & 0x7F);
    assert_eq!(decoded.packet_type, constants::PACKET_TYPE_DATA);
    assert_eq!(decoded.destination_type, constants::DESTINATION_SINGLE);
}
4954
/// Asking about a packet hash with no recorded proof yields `CheckProof(None)`.
#[test]
fn check_proof_not_found_returns_none() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    // Queue the query followed by a shutdown, then let the driver drain both.
    let (reply_tx, reply_rx) = mpsc::channel();
    let query = QueryRequest::CheckProof {
        packet_hash: [0xBB; 32],
    };
    event_tx.send(Event::Query(query, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    assert!(
        matches!(reply, QueryResponse::CheckProof(None)),
        "expected CheckProof(None), got {:?}",
        reply
    );
}
4978
/// A pre-seeded `completed_proofs` entry is reported through `CheckProof`
/// with its RTT, and the entry is gone from the map afterwards.
#[test]
fn check_proof_found_returns_rtt() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    // Seed a finished proof directly into the driver's table.
    let packet_hash = [0xCC; 32];
    driver
        .completed_proofs
        .insert(packet_hash, (0.123, time::now()));

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = QueryRequest::CheckProof { packet_hash };
    event_tx.send(Event::Query(query, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let rtt = match reply_rx.recv().unwrap() {
        QueryResponse::CheckProof(Some(value)) => value,
        other => panic!("expected CheckProof(Some(..)), got {:?}", other),
    };
    assert!((rtt - 0.123).abs() < 0.001, "RTT should be ~0.123, got {}", rtt);

    // The query must have removed the entry it reported.
    assert!(!driver.completed_proofs.contains_key(&packet_hash));
}
5010
/// Round-trips a proof through two drivers.
///
/// Phase 1: a driver that owns a destination registered with the `ProveAll`
/// strategy receives a DATA packet and is expected to write a PROOF packet to
/// its interface.
///
/// Phase 2: a second driver, which has the original packet hash recorded in
/// `sent_packets`, receives that PROOF frame. The proof callback must fire
/// exactly once for that hash and `completed_proofs` must gain an entry.
#[test]
fn inbound_proof_populates_completed_proofs() {
    // Builds a `MockCallbacks` whose `proofs` sink is the one supplied; every
    // other sink is a fresh, empty list.
    let callbacks_with_proofs = |proofs| MockCallbacks {
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
        proofs,
        proof_requested: Arc::new(Mutex::new(Vec::new())),
    };

    // ---- Phase 1: the proving side ----
    let (prover_tx, prover_rx) = event::channel();
    // The proving driver's proof sink is never inspected by this test.
    let prover_callbacks = callbacks_with_proofs(Arc::new(Mutex::new(Vec::new())));
    let mut prover = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        prover_rx,
        prover_tx.clone(),
        Box::new(prover_callbacks),
    );
    prover.engine.register_interface(make_interface_info(1));
    let (prover_writer, prover_wire) = MockWriter::new();
    prover
        .interfaces
        .insert(InterfaceId(1), make_entry(1, Box::new(prover_writer), true));

    // Register the destination and give it a signing identity plus ProveAll.
    let dest = [0xDD; 16];
    let identity = Identity::new(&mut OsRng);
    let prv_key = identity.get_private_key().unwrap();
    prover
        .engine
        .register_destination(dest, constants::DESTINATION_SINGLE);
    let strategy = (
        rns_core::types::ProofStrategy::ProveAll,
        Some(Identity::from_private_key(&prv_key)),
    );
    prover.proof_strategies.insert(dest, strategy);

    // Build the DATA packet that should trigger the proof.
    let data_flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let data_packet = RawPacket::pack(
        data_flags,
        0,
        &dest,
        None,
        constants::CONTEXT_NONE,
        b"probe data",
    )
    .unwrap();
    let data_packet_hash = data_packet.packet_hash;

    prover
        .sent_packets
        .insert(data_packet_hash, (dest, time::now()));

    let inbound_data = Event::Frame {
        interface_id: InterfaceId(1),
        data: data_packet.raw,
    };
    prover_tx.send(inbound_data).unwrap();
    prover_tx.send(Event::Shutdown).unwrap();
    prover.run();

    // Pull the first PROOF frame off the wire; the lock guard is released at
    // the end of this block, before the second driver is built.
    let proof_raw = {
        let frames = prover_wire.lock().unwrap();
        let proof_frames: Vec<_> = frames
            .iter()
            .filter(|frame| {
                let flags = PacketFlags::unpack(frame[0] & 0x7F);
                flags.packet_type == constants::PACKET_TYPE_PROOF
            })
            .collect();
        assert!(!proof_frames.is_empty(), "Should have sent a proof packet");
        proof_frames[0].clone()
    };

    // ---- Phase 2: the side that receives the proof ----
    let (receiver_tx, receiver_rx) = event::channel();
    let received_proofs = Arc::new(Mutex::new(Vec::new()));
    let receiver_callbacks = callbacks_with_proofs(received_proofs.clone());
    let mut receiver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        receiver_rx,
        receiver_tx.clone(),
        Box::new(receiver_callbacks),
    );
    receiver.engine.register_interface(make_interface_info(1));
    let (receiver_writer, _receiver_wire) = MockWriter::new();
    receiver
        .interfaces
        .insert(InterfaceId(1), make_entry(1, Box::new(receiver_writer), true));

    // Record the original packet as sent so the incoming proof can be matched.
    receiver
        .sent_packets
        .insert(data_packet_hash, (dest, time::now()));

    let inbound_proof = Event::Frame {
        interface_id: InterfaceId(1),
        data: proof_raw,
    };
    receiver_tx.send(inbound_proof).unwrap();
    receiver_tx.send(Event::Shutdown).unwrap();
    receiver.run();

    // The callback must have recorded exactly one proof for our packet.
    let proof_events = received_proofs.lock().unwrap();
    assert_eq!(proof_events.len(), 1, "on_proof callback should fire once");
    let recorded = &proof_events[0];
    assert_eq!(recorded.1.0, data_packet_hash, "proof should match original packet hash");
    assert!(recorded.2 >= 0.0, "RTT should be non-negative");

    // And the driver itself must have stored the completed proof.
    assert!(
        receiver.completed_proofs.contains_key(&data_packet_hash),
        "completed_proofs should contain the packet hash"
    );
    let stored_rtt = receiver.completed_proofs[&data_packet_hash].0;
    assert!(stored_rtt >= 0.0, "RTT should be non-negative");
}
5147
/// When `probe_responder_hash` is set on the driver, the `InterfaceStats`
/// response carries the same hash in `probe_responder`.
#[test]
fn interface_stats_includes_probe_responder() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig {
        transport_enabled: true,
        identity_hash: Some([0x42; 16]),
    };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (mock_writer, _wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(mock_writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    // Set the responder hash directly on the driver.
    driver.probe_responder_hash = Some([0xEE; 16]);

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::InterfaceStats, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let stats = match reply_rx.recv().unwrap() {
        QueryResponse::InterfaceStats(stats) => stats,
        other => panic!("expected InterfaceStats, got {:?}", other),
    };
    assert_eq!(stats.probe_responder, Some([0xEE; 16]));
}
5176
/// A driver whose `probe_responder_hash` was never set reports
/// `probe_responder == None` in its `InterfaceStats` response.
#[test]
fn interface_stats_probe_responder_none_when_disabled() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, event_rx, event_tx.clone(), Box::new(callbacks));

    let (mock_writer, _wire) = MockWriter::new();
    let entry = make_entry(1, Box::new(mock_writer), true);
    driver.interfaces.insert(InterfaceId(1), entry);

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::InterfaceStats, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let stats = match reply_rx.recv().unwrap() {
        QueryResponse::InterfaceStats(stats) => stats,
        other => panic!("expected InterfaceStats, got {:?}", other),
    };
    assert_eq!(stats.probe_responder, None);
}
5202
/// A three-byte frame is too short to hold a destination hash, so the
/// all-zero hash is returned.
#[test]
fn test_extract_dest_hash_too_short() {
    let truncated: [u8; 3] = [0x00, 0x00, 0xAA];
    let extracted = super::extract_dest_hash(&truncated);
    assert_eq!(extracted, [0u8; 16]);
}
5208
/// With a first byte of 0x00 the destination hash is read from the 16 bytes
/// that directly follow the two leading bytes.
#[test]
fn test_extract_dest_hash_header1() {
    let dest = [0x11u8; 16];
    // Layout: two leading bytes | destination | 10 trailing bytes.
    let frame: Vec<u8> = [&[0x00u8, 0x00][..], &dest[..], &[0xFFu8; 10][..]].concat();
    assert_eq!(super::extract_dest_hash(&frame), dest);
}
5218
/// With a first byte of 0x40 the destination hash is read after an extra
/// 16-byte field that follows the two leading bytes.
#[test]
fn test_extract_dest_hash_header2() {
    let dest = [0x22u8; 16];
    // Layout: two leading bytes | 16 bytes to skip | destination | 10 trailing bytes.
    let frame: Vec<u8> = [
        &[0x40u8, 0x00][..],
        &[0xAAu8; 16][..],
        &dest[..],
        &[0xFFu8; 10][..],
    ]
    .concat();
    assert_eq!(super::extract_dest_hash(&frame), dest);
}
5229
/// A frame starting with 0x40 that ends right after the first 16-byte field
/// has no room for a destination, so the all-zero hash is returned.
#[test]
fn test_extract_dest_hash_header2_too_short() {
    // Layout: two leading bytes | 16 bytes, and nothing after them.
    let frame: Vec<u8> = [&[0x40u8, 0x00][..], &[0xAAu8; 16][..]].concat();
    assert_eq!(super::extract_dest_hash(&frame), [0u8; 16]);
}
5237
/// Bits of the first byte other than 0x40 do not change where the
/// destination hash is read from: 0x3F behaves like 0x00 and 0xFF behaves
/// like 0x40.
#[test]
fn test_extract_dest_hash_other_flags_preserved() {
    // 0x3F: every low bit set, 0x40 clear -> destination follows the two leading bytes.
    let short_dest = [0x33u8; 16];
    let short_frame: Vec<u8> =
        [&[0x3Fu8, 0x00][..], &short_dest[..], &[0xFFu8; 10][..]].concat();
    assert_eq!(super::extract_dest_hash(&short_frame), short_dest);

    // 0xFF: 0x40 set along with everything else -> destination follows a 16-byte field.
    let long_dest = [0x44u8; 16];
    let long_frame: Vec<u8> = [
        &[0xFFu8, 0x00][..],
        &[0xBBu8; 16][..],
        &long_dest[..],
        &[0xFFu8; 10][..],
    ]
    .concat();
    assert_eq!(super::extract_dest_hash(&long_frame), long_dest);
}
5256}