1use std::collections::HashMap;
4
5use rns_core::packet::RawPacket;
6use rns_core::transport::tables::PathEntry;
7use rns_core::transport::types::{InterfaceId, TransportAction, TransportConfig};
8use rns_core::transport::TransportEngine;
9use rns_crypto::{OsRng, Rng};
10
11#[cfg(feature = "rns-hooks")]
12use rns_hooks::{
13 create_hook_slots, EngineAccess, HookContext, HookManager, HookPoint, HookSlot,
14};
15
16use crate::event::{
17 BlackholeInfo, Event, EventReceiver, InterfaceStatsResponse,
18 LocalDestinationEntry, NextHopResponse, PathTableEntry, QueryRequest, QueryResponse,
19 RateTableEntry, SingleInterfaceStat,
20};
21use crate::ifac;
22use crate::interface::{InterfaceEntry, InterfaceStats};
23use crate::link_manager::{LinkManager, LinkManagerAction};
24use crate::holepunch::orchestrator::{HolePunchManager, HolePunchManagerAction};
25use crate::time;
26
/// Borrowed, read-only view of driver state that is handed to hook programs.
///
/// It bundles shared references to the transport engine, the interface table
/// and the link manager together with a timestamp, and is what the
/// `EngineAccess` implementation in this file answers its queries from.
/// Only compiled when the `rns-hooks` feature is enabled.
#[cfg(feature = "rns-hooks")]
struct EngineRef<'a> {
    /// Transport engine used for path, hop, next-hop, blackhole and identity queries.
    engine: &'a TransportEngine,
    /// Interface table keyed by interface id; source of names, modes and announce stats.
    interfaces: &'a HashMap<InterfaceId, InterfaceEntry>,
    /// Link manager used to look up the state of a link by its hash.
    link_manager: &'a LinkManager,
    /// Timestamp forwarded to time-dependent engine queries (`is_blackholed`).
    now: f64,
}
35
/// Hook-facing queries, each answered from the state borrowed by `EngineRef`.
#[cfg(feature = "rns-hooks")]
impl<'a> EngineAccess for EngineRef<'a> {
    /// Delegates to the engine's `has_path` for `dest`.
    fn has_path(&self, dest: &[u8; 16]) -> bool {
        self.engine.has_path(dest)
    }

    /// Delegates to the engine's `hops_to` for `dest`.
    fn hops_to(&self, dest: &[u8; 16]) -> Option<u8> {
        self.engine.hops_to(dest)
    }

    /// Delegates to the engine's `next_hop` for `dest`.
    fn next_hop(&self, dest: &[u8; 16]) -> Option<[u8; 16]> {
        self.engine.next_hop(dest)
    }

    /// Asks the engine whether `identity` is blackholed at the captured timestamp.
    fn is_blackholed(&self, identity: &[u8; 16]) -> bool {
        self.engine.is_blackholed(identity, self.now)
    }

    /// Returns a copy of the interface's name, or `None` for an unknown id.
    fn interface_name(&self, id: u64) -> Option<String> {
        let entry = self.interfaces.get(&InterfaceId(id))?;
        Some(entry.info.name.clone())
    }

    /// Returns the interface's mode value, or `None` for an unknown id.
    fn interface_mode(&self, id: u64) -> Option<u8> {
        let entry = self.interfaces.get(&InterfaceId(id))?;
        Some(entry.info.mode)
    }

    /// Returns a copy of the engine's identity hash, if it has one.
    fn identity_hash(&self) -> Option<[u8; 16]> {
        self.engine.identity_hash().copied()
    }

    /// Returns the interface's outgoing announce frequency multiplied by 1000
    /// and cast to `i32`, or `None` for an unknown id.
    fn announce_rate(&self, id: u64) -> Option<i32> {
        let entry = self.interfaces.get(&InterfaceId(id))?;
        let scaled = entry.stats.outgoing_announce_freq() * 1000.0;
        Some(scaled as i32)
    }

    /// Maps the link's state to a numeric code:
    /// Pending = 0, Handshake = 1, Active = 2, Stale = 3, Closed = 4.
    /// Returns `None` when the link manager does not know `link_hash`.
    fn link_state(&self, link_hash: &[u8; 16]) -> Option<u8> {
        use rns_core::link::types::LinkState;

        let state = self.link_manager.link_state(link_hash)?;
        let code = match state {
            LinkState::Pending => 0,
            LinkState::Handshake => 1,
            LinkState::Active => 2,
            LinkState::Stale => 3,
            LinkState::Closed => 4,
        };
        Some(code)
    }
}
79
/// Pulls the 16-byte destination hash out of a raw packet.
///
/// The hash starts at byte 18 when bit `0x40` of the first byte is set and
/// at byte 2 otherwise. If the packet is empty or too short to contain the
/// full hash, an all-zero hash is returned instead.
#[cfg(any(test, feature = "rns-hooks"))]
fn extract_dest_hash(raw: &[u8]) -> [u8; 16] {
    const HEADER_2_BIT: u8 = 0x40;
    const HASH_LEN: usize = 16;

    let mut hash = [0u8; HASH_LEN];

    // The first byte decides where the hash lives; no first byte, no hash.
    let offset = match raw.first() {
        None => return hash,
        Some(flags) if flags & HEADER_2_BIT != 0 => 18,
        Some(_) => 2,
    };

    // `get` yields `None` for a truncated packet, leaving the zeroed hash.
    if let Some(bytes) = raw.get(offset..offset + HASH_LEN) {
        hash.copy_from_slice(bytes);
    }
    hash
}
98
/// Runs the hook chain for one attach point.
///
/// Returns `None` without touching the manager when `programs` is empty or
/// when no hook manager is available; otherwise returns whatever
/// `HookManager::run_chain` produces for the given context.
#[cfg(feature = "rns-hooks")]
fn run_hook_inner(
    programs: &mut [rns_hooks::LoadedProgram],
    hook_manager: &Option<HookManager>,
    engine_access: &dyn EngineAccess,
    ctx: &HookContext,
    now: f64,
) -> Option<rns_hooks::ExecuteResult> {
    match hook_manager {
        Some(manager) if !programs.is_empty() => {
            manager.run_chain(programs, ctx, engine_access, now)
        }
        _ => None,
    }
}
114
/// Translates the wire-level actions injected by hooks into engine
/// `TransportAction`s, preserving their order.
///
/// Raw interface numbers are wrapped in `InterfaceId`. Where the wire form
/// carries an `exclude` value plus a `has_exclude` flag, a zero flag becomes
/// `None` and any other value becomes `Some(InterfaceId(exclude))`. The
/// numeric `to_local` flag becomes a `bool` (non-zero means `true`).
#[cfg(feature = "rns-hooks")]
fn convert_injected_actions(actions: Vec<rns_hooks::ActionWire>) -> Vec<TransportAction> {
    use rns_hooks::ActionWire;

    let mut converted = Vec::with_capacity(actions.len());
    for wire in actions {
        let action = match wire {
            ActionWire::SendOnInterface { interface, raw } => TransportAction::SendOnInterface {
                interface: InterfaceId(interface),
                raw,
            },
            ActionWire::BroadcastOnAllInterfaces { raw, exclude, has_exclude } => {
                let exclude = match has_exclude {
                    0 => None,
                    _ => Some(InterfaceId(exclude)),
                };
                TransportAction::BroadcastOnAllInterfaces { raw, exclude }
            }
            ActionWire::DeliverLocal {
                destination_hash,
                raw,
                packet_hash,
                receiving_interface,
            } => TransportAction::DeliverLocal {
                destination_hash,
                raw,
                packet_hash,
                receiving_interface: InterfaceId(receiving_interface),
            },
            ActionWire::PathUpdated { destination_hash, hops, next_hop, interface } => {
                TransportAction::PathUpdated {
                    destination_hash,
                    hops,
                    next_hop,
                    interface: InterfaceId(interface),
                }
            }
            ActionWire::CacheAnnounce { packet_hash, raw } => {
                TransportAction::CacheAnnounce { packet_hash, raw }
            }
            ActionWire::TunnelEstablished { tunnel_id, interface } => {
                TransportAction::TunnelEstablished {
                    tunnel_id,
                    interface: InterfaceId(interface),
                }
            }
            ActionWire::TunnelSynthesize { interface, data, dest_hash } => {
                TransportAction::TunnelSynthesize {
                    interface: InterfaceId(interface),
                    data,
                    dest_hash,
                }
            }
            ActionWire::ForwardToLocalClients { raw, exclude, has_exclude } => {
                let exclude = match has_exclude {
                    0 => None,
                    _ => Some(InterfaceId(exclude)),
                };
                TransportAction::ForwardToLocalClients { raw, exclude }
            }
            ActionWire::ForwardPlainBroadcast { raw, to_local, exclude, has_exclude } => {
                let exclude = match has_exclude {
                    0 => None,
                    _ => Some(InterfaceId(exclude)),
                };
                TransportAction::ForwardPlainBroadcast {
                    raw,
                    to_local: to_local != 0,
                    exclude,
                }
            }
            ActionWire::AnnounceReceived {
                destination_hash,
                identity_hash,
                public_key,
                name_hash,
                random_hash,
                app_data,
                hops,
                receiving_interface,
            } => TransportAction::AnnounceReceived {
                destination_hash,
                identity_hash,
                public_key,
                name_hash,
                random_hash,
                app_data,
                hops,
                receiving_interface: InterfaceId(receiving_interface),
            },
        };
        converted.push(action);
    }
    converted
}
199
/// Derives the interface type string from an interface name by prefix.
///
/// The first matching prefix in the table wins; names that match none of
/// them are reported as `"AutoInterface"`.
fn infer_interface_type(name: &str) -> String {
    // (name prefix, reported interface type), checked in order.
    const PREFIX_TO_TYPE: [(&str, &str); 3] = [
        ("TCPServerInterface", "TCPServerClientInterface"),
        ("BackboneInterface", "BackboneInterface"),
        ("LocalInterface", "LocalServerClientInterface"),
    ];

    PREFIX_TO_TYPE
        .iter()
        .find(|entry| name.starts_with(entry.0))
        .map_or("AutoInterface", |entry| entry.1)
        .to_string()
}
216
/// Application-side callbacks held by the [`Driver`] as a `Box<dyn Callbacks>`.
///
/// `on_announce`, `on_path_updated` and `on_local_delivery` must be
/// implemented; every other method has a default that does nothing or
/// returns a fixed answer, so implementors only override what they need.
/// The `Send` bound lets the boxed callbacks move to another thread.
pub trait Callbacks: Send {
    /// Required. Receives an announced identity.
    fn on_announce(
        &mut self,
        announced: crate::destination::AnnouncedIdentity,
    );

    /// Required. Receives a destination hash and the hop count for its path.
    fn on_path_updated(&mut self, dest_hash: rns_core::types::DestHash, hops: u8);

    /// Required. Receives a raw packet for a local destination together with its packet hash.
    fn on_local_delivery(&mut self, dest_hash: rns_core::types::DestHash, raw: Vec<u8>, packet_hash: rns_core::types::PacketHash);

    /// Called by the driver after an interface was registered or marked online. Default: no-op.
    fn on_interface_up(&mut self, _id: InterfaceId) {}

    /// Called by the driver after a known interface was removed or marked offline. Default: no-op.
    fn on_interface_down(&mut self, _id: InterfaceId) {}

    /// Link-established notification with destination, RTT and initiator flag. Default: no-op.
    fn on_link_established(&mut self, _link_id: rns_core::types::LinkId, _dest_hash: rns_core::types::DestHash, _rtt: f64, _is_initiator: bool) {}

    /// Link-closed notification with an optional teardown reason. Default: no-op.
    fn on_link_closed(&mut self, _link_id: rns_core::types::LinkId, _reason: Option<rns_core::link::TeardownReason>) {}

    /// Remote-identified notification carrying the identity hash and 64-byte public key. Default: no-op.
    fn on_remote_identified(&mut self, _link_id: rns_core::types::LinkId, _identity_hash: rns_core::types::IdentityHash, _public_key: [u8; 64]) {}

    /// Resource-received notification with the data and optional metadata. Default: no-op.
    fn on_resource_received(&mut self, _link_id: rns_core::types::LinkId, _data: Vec<u8>, _metadata: Option<Vec<u8>>) {}

    /// Resource-completed notification. Default: no-op.
    fn on_resource_completed(&mut self, _link_id: rns_core::types::LinkId) {}

    /// Resource-failed notification with an error description. Default: no-op.
    fn on_resource_failed(&mut self, _link_id: rns_core::types::LinkId, _error: String) {}

    /// Resource progress notification (`_received` out of `_total`). Default: no-op.
    fn on_resource_progress(&mut self, _link_id: rns_core::types::LinkId, _received: usize, _total: usize) {}

    /// Query asking whether a resource should be accepted. Default: `false`.
    fn on_resource_accept_query(&mut self, _link_id: rns_core::types::LinkId, _resource_hash: Vec<u8>, _transfer_size: u64, _has_metadata: bool) -> bool {
        false
    }

    /// Channel message notification with message type and payload. Default: no-op.
    fn on_channel_message(&mut self, _link_id: rns_core::types::LinkId, _msgtype: u16, _payload: Vec<u8>) {}

    /// Link data notification with a context byte and the data. Default: no-op.
    fn on_link_data(&mut self, _link_id: rns_core::types::LinkId, _context: u8, _data: Vec<u8>) {}

    /// Response notification for a 16-byte request id. Default: no-op.
    fn on_response(&mut self, _link_id: rns_core::types::LinkId, _request_id: [u8; 16], _data: Vec<u8>) {}

    /// Proof notification with the packet hash and an RTT value. Default: no-op.
    fn on_proof(&mut self, _dest_hash: rns_core::types::DestHash, _packet_hash: rns_core::types::PacketHash, _rtt: f64) {}

    /// Query asking whether a proof should be produced for a packet. Default: `true`.
    fn on_proof_requested(&mut self, _dest_hash: rns_core::types::DestHash, _packet_hash: rns_core::types::PacketHash) -> bool {
        true
    }

    /// Query asking whether a proposed direct connection should be accepted. Default: `false`.
    fn on_direct_connect_proposed(&mut self, _link_id: rns_core::types::LinkId, _peer_identity: Option<rns_core::types::IdentityHash>) -> bool {
        false
    }

    /// Direct-connect-established notification naming the interface involved. Default: no-op.
    fn on_direct_connect_established(&mut self, _link_id: rns_core::types::LinkId, _interface_id: InterfaceId) {}

    /// Direct-connect-failed notification with a numeric reason code. Default: no-op.
    fn on_direct_connect_failed(&mut self, _link_id: rns_core::types::LinkId, _reason: u8) {}
}
295
/// Owns the transport engine and all per-node state, and processes events
/// from a single receiver in [`Driver::run`].
pub struct Driver {
    /// Transport engine that handles inbound/outbound packets and periodic ticks.
    pub(crate) engine: TransportEngine,
    /// Interface table keyed by interface id (writer, stats, online flag, IFAC state).
    pub(crate) interfaces: HashMap<InterfaceId, InterfaceEntry>,
    /// Random source passed to the engine, link manager and hole-punch manager.
    pub(crate) rng: OsRng,
    /// Receiving end of the event channel drained by `run`.
    pub(crate) rx: EventReceiver,
    /// Application callbacks.
    pub(crate) callbacks: Box<dyn Callbacks>,
    /// `time::now()` at construction; used to report transport uptime.
    pub(crate) started: f64,
    /// Optional announce cache; `None` after `new`.
    pub(crate) announce_cache: Option<crate::announce_cache::AnnounceCache>,
    /// Hash of the PLAIN destination `rnstransport` / `tunnel`, `synthesize`.
    pub(crate) tunnel_synth_dest: [u8; 16],
    /// Optional transport identity; `None` after `new`.
    pub(crate) transport_identity: Option<rns_crypto::identity::Identity>,
    /// Link manager that creates, drives and tears down links.
    pub(crate) link_manager: LinkManager,
    /// Management configuration; starts as `Default::default()`.
    pub(crate) management_config: crate::management::ManagementConfig,
    /// Starts at `0.0`; presumably the time of the last management announce (TODO confirm).
    pub(crate) last_management_announce: f64,
    /// Starts `false`; presumably set once the first announce went out (TODO confirm).
    pub(crate) initial_announce_sent: bool,
    /// Announced identities keyed by a 16-byte hash (presumably the destination hash; TODO confirm).
    pub(crate) known_destinations: HashMap<[u8; 16], crate::destination::AnnouncedIdentity>,
    /// Hash of the PLAIN destination `rnstransport` / `path`, `request`.
    pub(crate) path_request_dest: [u8; 16],
    /// Per-destination proof strategy with an optional signing identity,
    /// filled by `Event::RegisterProofStrategy`.
    pub(crate) proof_strategies: HashMap<[u8; 16], (rns_core::types::ProofStrategy, Option<rns_crypto::identity::Identity>)>,
    /// Outbound DATA packets: packet hash -> (destination hash, send time).
    /// Entries older than 60.0 time units are pruned on every tick.
    pub(crate) sent_packets: HashMap<[u8; 32], ([u8; 16], f64)>,
    /// Locally registered destinations: destination hash -> destination type.
    pub(crate) local_destinations: HashMap<[u8; 16], u8>,
    /// Hole-punch (direct connect) manager; replaced by `set_probe_config`.
    pub(crate) holepunch_manager: HolePunchManager,
    /// Sending end of the event channel, stored from the `tx` argument of `new`.
    pub(crate) event_tx: crate::event::EventSender,
    /// Storage for discovered interfaces; defaults to a directory under the system temp dir.
    pub(crate) discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage,
    /// Initialised to `discovery::DEFAULT_STAMP_VALUE`.
    pub(crate) discovery_required_value: u8,
    /// Value of `discovery::discovery_name_hash()` captured at construction.
    pub(crate) discovery_name_hash: [u8; 10],
    /// When `true`, ticks count towards the periodic discovered-interface cleanup.
    pub(crate) discover_interfaces: bool,
    /// Optional interface announcer; `None` after `new`.
    pub(crate) interface_announcer: Option<crate::discovery::InterfaceAnnouncer>,
    /// Ticks since the last discovery cleanup; cleanup runs and the counter resets at 3600.
    pub(crate) discovery_cleanup_counter: u32,
    /// One slot of loaded hook programs per `HookPoint`.
    #[cfg(feature = "rns-hooks")]
    pub(crate) hook_slots: [HookSlot; HookPoint::COUNT],
    /// Hook manager used to compile and run hooks; `None` if `HookManager::new()` failed.
    #[cfg(feature = "rns-hooks")]
    pub(crate) hook_manager: Option<HookManager>,
}
351
352impl Driver {
353 pub fn new(
355 config: TransportConfig,
356 rx: EventReceiver,
357 tx: crate::event::EventSender,
358 callbacks: Box<dyn Callbacks>,
359 ) -> Self {
360 let tunnel_synth_dest = rns_core::destination::destination_hash(
361 "rnstransport",
362 &["tunnel", "synthesize"],
363 None,
364 );
365 let path_request_dest = rns_core::destination::destination_hash(
366 "rnstransport",
367 &["path", "request"],
368 None,
369 );
370 let discovery_name_hash = crate::discovery::discovery_name_hash();
371 let mut engine = TransportEngine::new(config);
372 engine.register_destination(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
373 engine.register_destination(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
375 let mut local_destinations = HashMap::new();
378 local_destinations.insert(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
379 local_destinations.insert(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
380 Driver {
381 engine,
382 interfaces: HashMap::new(),
383 rng: OsRng,
384 rx,
385 callbacks,
386 started: time::now(),
387 announce_cache: None,
388 tunnel_synth_dest,
389 transport_identity: None,
390 link_manager: LinkManager::new(),
391 management_config: Default::default(),
392 last_management_announce: 0.0,
393 initial_announce_sent: false,
394 known_destinations: HashMap::new(),
395 path_request_dest,
396 proof_strategies: HashMap::new(),
397 sent_packets: HashMap::new(),
398 local_destinations,
399 holepunch_manager: HolePunchManager::new(None, None),
400 event_tx: tx,
401 discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage::new(
402 std::env::temp_dir().join("rns-discovered-interfaces")
403 ),
404 discovery_required_value: crate::discovery::DEFAULT_STAMP_VALUE,
405 discovery_name_hash,
406 discover_interfaces: false,
407 interface_announcer: None,
408 discovery_cleanup_counter: 0,
409 #[cfg(feature = "rns-hooks")]
410 hook_slots: create_hook_slots(),
411 #[cfg(feature = "rns-hooks")]
412 hook_manager: HookManager::new().ok(),
413 }
414 }
415
416 pub fn set_probe_config(&mut self, addr: Option<std::net::SocketAddr>, device: Option<String>) {
418 self.holepunch_manager = HolePunchManager::new(addr, device);
419 }
420
    /// Runs the driver's event loop.
    ///
    /// Blocks on the event receiver and handles one event at a time. The loop
    /// ends when the receiver returns an error or when `Event::Shutdown` is
    /// received. Engine, link-manager and hole-punch results come back as
    /// action lists that are handed to the matching `dispatch_*` method.
    /// With the `rns-hooks` feature, hook chains run at several points and
    /// any actions they inject are dispatched as well.
    pub fn run(&mut self) {
        loop {
            // A receive error ends the loop.
            let event = match self.rx.recv() {
                Ok(e) => e,
                Err(_) => break,
            };

            match event {
                Event::Frame { interface_id, data } => {
                    // Receive counters are updated before any filtering, so
                    // rejected frames are counted too.
                    if let Some(entry) = self.interfaces.get_mut(&interface_id) {
                        entry.stats.rxb += data.len() as u64;
                        entry.stats.rx_packets += 1;
                    }

                    // IFAC handling: interfaces with IFAC state must unmask
                    // successfully; interfaces without it drop frames whose
                    // first byte has bit 0x80 set. Frames from unknown
                    // interfaces pass through unchanged.
                    let packet = if let Some(entry) = self.interfaces.get(&interface_id) {
                        if let Some(ref ifac_state) = entry.ifac {
                            match ifac::unmask_inbound(&data, ifac_state) {
                                Some(unmasked) => unmasked,
                                None => {
                                    log::debug!("[{}] IFAC rejected packet", interface_id.0);
                                    continue;
                                }
                            }
                        } else {
                            if data.len() > 2 && data[0] & 0x80 == 0x80 {
                                log::debug!("[{}] dropping packet with IFAC flag on non-IFAC interface", interface_id.0);
                                continue;
                            }
                            data
                        }
                    } else {
                        data
                    };

                    // PreIngress hooks see the packet before the engine does.
                    // They can inject actions and can drop the packet.
                    #[cfg(feature = "rns-hooks")]
                    {
                        // `context`, `packet_hash` and `data_offset` are
                        // filled with fixed zero values at this point.
                        let ctx = HookContext::Packet(&rns_hooks::PacketContext {
                            flags: if packet.is_empty() { 0 } else { packet[0] },
                            hops: if packet.len() > 1 { packet[1] } else { 0 },
                            destination_hash: extract_dest_hash(&packet),
                            context: 0,
                            packet_hash: [0; 32],
                            interface_id: interface_id.0,
                            data_offset: 0,
                            data_len: packet.len() as u32,
                        });
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        {
                            let exec = run_hook_inner(&mut self.hook_slots[HookPoint::PreIngress as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
                            if let Some(ref e) = exec {
                                // Injected actions are dispatched even when
                                // the hook then drops the packet.
                                if !e.injected_actions.is_empty() {
                                    let extra = convert_injected_actions(e.injected_actions.clone());
                                    self.dispatch_all(extra);
                                }
                                if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) {
                                    continue;
                                }
                            }
                        }
                    }

                    // Packets whose low two flag bits equal 0x01 are recorded
                    // as incoming announces in the interface statistics.
                    if packet.len() > 2 && (packet[0] & 0x03) == 0x01 {
                        let now = time::now();
                        if let Some(entry) = self.interfaces.get_mut(&interface_id) {
                            entry.stats.record_incoming_announce(now);
                        }
                    }

                    // Give the engine the current incoming announce frequency
                    // before it handles the packet.
                    if let Some(entry) = self.interfaces.get(&interface_id) {
                        self.engine.update_interface_freq(interface_id, entry.stats.incoming_announce_freq());
                    }

                    let actions = self.engine.handle_inbound(
                        &packet,
                        interface_id,
                        time::now(),
                        &mut self.rng,
                    );

                    // PreDispatch hooks run after the engine produced its
                    // actions; here they can only inject additional actions.
                    #[cfg(feature = "rns-hooks")]
                    {
                        let ctx = HookContext::Packet(&rns_hooks::PacketContext {
                            flags: if packet.is_empty() { 0 } else { packet[0] },
                            hops: if packet.len() > 1 { packet[1] } else { 0 },
                            destination_hash: extract_dest_hash(&packet),
                            context: 0,
                            packet_hash: [0; 32],
                            interface_id: interface_id.0,
                            data_offset: 0,
                            data_len: packet.len() as u32,
                        });
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::PreDispatch as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                            if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
                        }
                    }

                    self.dispatch_all(actions);
                }
                Event::Tick => {
                    // Tick hooks run first; their injected actions are dispatched.
                    #[cfg(feature = "rns-hooks")]
                    {
                        let ctx = HookContext::Tick;
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::Tick as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                            if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
                        }
                    }

                    let now = time::now();
                    // Refresh the engine's per-interface announce frequencies.
                    for (id, entry) in &self.interfaces {
                        self.engine.update_interface_freq(*id, entry.stats.incoming_announce_freq());
                    }
                    // Periodic work, in order: engine, links, hole punching,
                    // management announces.
                    let actions = self.engine.tick(now, &mut self.rng);
                    self.dispatch_all(actions);
                    let link_actions = self.link_manager.tick(&mut self.rng);
                    self.dispatch_link_actions(link_actions);
                    {
                        let tx = self.get_event_sender();
                        let hp_actions = self.holepunch_manager.tick(&tx);
                        self.dispatch_holepunch_actions(hp_actions);
                    }
                    self.tick_management_announces(now);
                    // Forget sent packets once they are 60.0 or more time
                    // units old (presumably seconds; depends on `time::now`).
                    self.sent_packets.retain(|_, (_, sent_time)| now - *sent_time < 60.0);

                    self.tick_discovery_announcer(now);

                    // Discovered-interface cleanup runs once every 3600 ticks
                    // while discovery is enabled; cleanup errors are ignored.
                    if self.discover_interfaces {
                        self.discovery_cleanup_counter += 1;
                        if self.discovery_cleanup_counter >= 3600 {
                            self.discovery_cleanup_counter = 0;
                            if let Ok(removed) = self.discovered_interfaces.cleanup() {
                                if removed > 0 {
                                    log::info!("Discovery cleanup: removed {} stale entries", removed);
                                }
                            }
                        }
                    }
                }
                Event::InterfaceUp(id, new_writer, info) => {
                    let wants_tunnel;
                    if let Some(mut info) = info {
                        // Interface info supplied: register a dynamic interface.
                        log::info!("[{}] dynamic interface registered", id.0);
                        wants_tunnel = info.wants_tunnel;
                        let iface_type = infer_interface_type(&info.name);
                        info.started = time::now();
                        self.engine.register_interface(info.clone());
                        // The driver-side entry is only created when a writer
                        // came with the event; the engine registration above
                        // happens either way.
                        if let Some(writer) = new_writer {
                            self.interfaces.insert(
                                id,
                                InterfaceEntry {
                                    id,
                                    info,
                                    writer,
                                    online: true,
                                    dynamic: true,
                                    ifac: None,
                                    stats: InterfaceStats {
                                        started: time::now(),
                                        ..Default::default()
                                    },
                                    interface_type: iface_type,
                                },
                            );
                        }
                        self.callbacks.on_interface_up(id);
                        #[cfg(feature = "rns-hooks")]
                        {
                            let ctx = HookContext::Interface { interface_id: id.0 };
                            let now = time::now();
                            let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                            if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceUp as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                                if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
                            }
                        }
                    } else if let Some(entry) = self.interfaces.get_mut(&id) {
                        // No info supplied: an already-known interface came
                        // back online, optionally with a fresh writer.
                        log::info!("[{}] interface online", id.0);
                        wants_tunnel = entry.info.wants_tunnel;
                        entry.online = true;
                        if let Some(writer) = new_writer {
                            log::info!("[{}] writer refreshed after reconnect", id.0);
                            entry.writer = writer;
                        }
                        self.callbacks.on_interface_up(id);
                        #[cfg(feature = "rns-hooks")]
                        {
                            let ctx = HookContext::Interface { interface_id: id.0 };
                            let now = time::now();
                            let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                            if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceUp as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                                if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
                            }
                        }
                    } else {
                        // Unknown interface without info: nothing to bring up.
                        wants_tunnel = false;
                    }

                    if wants_tunnel {
                        self.synthesize_tunnel_for_interface(id);
                    }
                }
                Event::InterfaceDown(id) => {
                    // Void the tunnel bound to this interface, if any.
                    if let Some(entry) = self.interfaces.get(&id) {
                        if let Some(tunnel_id) = entry.info.tunnel_id {
                            self.engine.void_tunnel_interface(&tunnel_id);
                        }
                    }

                    // Dynamic interfaces are removed entirely; others are
                    // only marked offline. Unknown ids are ignored.
                    if let Some(entry) = self.interfaces.get(&id) {
                        if entry.dynamic {
                            log::info!("[{}] dynamic interface removed", id.0);
                            self.engine.deregister_interface(id);
                            self.interfaces.remove(&id);
                        } else {
                            log::info!("[{}] interface offline", id.0);
                            // The entry was found just above, so this lookup cannot fail.
                            self.interfaces.get_mut(&id).unwrap().online = false;
                        }
                        self.callbacks.on_interface_down(id);
                        #[cfg(feature = "rns-hooks")]
                        {
                            let ctx = HookContext::Interface { interface_id: id.0 };
                            let now = time::now();
                            let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                            if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceDown as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                                if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
                            }
                        }
                    }
                }
                Event::SendOutbound { raw, dest_type, attached_interface } => {
                    match RawPacket::unpack(&raw) {
                        Ok(packet) => {
                            // Only DATA packets are remembered in `sent_packets`.
                            if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
                                self.sent_packets.insert(
                                    packet.packet_hash,
                                    (packet.destination_hash, time::now()),
                                );
                            }
                            let actions = self.engine.handle_outbound(
                                &packet,
                                dest_type,
                                attached_interface,
                                time::now(),
                            );
                            self.dispatch_all(actions);
                        }
                        Err(e) => {
                            // Packets that fail to unpack are logged and dropped.
                            log::warn!("SendOutbound: failed to unpack packet: {:?}", e);
                        }
                    }
                }
                Event::RegisterDestination { dest_hash, dest_type } => {
                    // Keep the engine and the driver's own table in step.
                    self.engine.register_destination(dest_hash, dest_type);
                    self.local_destinations.insert(dest_hash, dest_type);
                }
                Event::DeregisterDestination { dest_hash } => {
                    self.engine.deregister_destination(&dest_hash);
                    self.local_destinations.remove(&dest_hash);
                }
                Event::Query(request, response_tx) => {
                    // A failed send (requester gone) is deliberately ignored.
                    let response = self.handle_query_mut(request);
                    let _ = response_tx.send(response);
                }
                Event::DeregisterLinkDestination { dest_hash } => {
                    self.link_manager.deregister_link_destination(&dest_hash);
                }
                Event::RegisterLinkDestination { dest_hash, sig_prv_bytes, sig_pub_bytes } => {
                    // Link destinations are also registered with the engine
                    // and recorded locally as SINGLE destinations.
                    let sig_prv = rns_crypto::ed25519::Ed25519PrivateKey::from_bytes(&sig_prv_bytes);
                    self.link_manager.register_link_destination(dest_hash, sig_prv, sig_pub_bytes);
                    self.engine.register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE);
                    self.local_destinations.insert(dest_hash, rns_core::constants::DESTINATION_SINGLE);
                }
                Event::RegisterRequestHandler { path, allowed_list, handler } => {
                    self.link_manager.register_request_handler(&path, allowed_list, move |link_id, p, data, remote| {
                        handler(link_id, p, data, remote)
                    });
                }
                Event::CreateLink { dest_hash, dest_sig_pub_bytes, response_tx } => {
                    // Hop count falls back to 0 when the engine knows no path.
                    let hops = self.engine.hops_to(&dest_hash).unwrap_or(0);
                    // MTU comes from the next-hop interface when it is known,
                    // otherwise from the protocol default.
                    let mtu = self.engine.next_hop_interface(&dest_hash)
                        .and_then(|iface_id| self.interfaces.get(&iface_id))
                        .map(|entry| entry.info.mtu)
                        .unwrap_or(rns_core::constants::MTU as u32);
                    let (link_id, link_actions) = self.link_manager.create_link(
                        &dest_hash, &dest_sig_pub_bytes, hops, mtu, &mut self.rng,
                    );
                    // The link id is reported before the link actions are dispatched.
                    let _ = response_tx.send(link_id);
                    self.dispatch_link_actions(link_actions);
                }
                Event::SendRequest { link_id, path, data } => {
                    let link_actions = self.link_manager.send_request(
                        &link_id, &path, &data, &mut self.rng,
                    );
                    self.dispatch_link_actions(link_actions);
                }
                Event::IdentifyOnLink { link_id, identity_prv_key } => {
                    let identity = rns_crypto::identity::Identity::from_private_key(&identity_prv_key);
                    let link_actions = self.link_manager.identify(&link_id, &identity, &mut self.rng);
                    self.dispatch_link_actions(link_actions);
                }
                Event::TeardownLink { link_id } => {
                    let link_actions = self.link_manager.teardown_link(&link_id);
                    self.dispatch_link_actions(link_actions);
                }
                Event::SendResource { link_id, data, metadata } => {
                    let link_actions = self.link_manager.send_resource(
                        &link_id, &data, metadata.as_deref(), &mut self.rng,
                    );
                    self.dispatch_link_actions(link_actions);
                }
                Event::SetResourceStrategy { link_id, strategy } => {
                    use crate::link_manager::ResourceStrategy;
                    // Numeric strategy codes; unknown values fall back to AcceptNone.
                    let strat = match strategy {
                        0 => ResourceStrategy::AcceptNone,
                        1 => ResourceStrategy::AcceptAll,
                        2 => ResourceStrategy::AcceptApp,
                        _ => ResourceStrategy::AcceptNone,
                    };
                    self.link_manager.set_resource_strategy(&link_id, strat);
                }
                Event::AcceptResource { link_id, resource_hash, accept } => {
                    let link_actions = self.link_manager.accept_resource(
                        &link_id, &resource_hash, accept, &mut self.rng,
                    );
                    self.dispatch_link_actions(link_actions);
                }
                Event::SendChannelMessage { link_id, msgtype, payload } => {
                    let link_actions = self.link_manager.send_channel_message(
                        &link_id, msgtype, &payload, &mut self.rng,
                    );
                    self.dispatch_link_actions(link_actions);
                }
                Event::SendOnLink { link_id, data, context } => {
                    let link_actions = self.link_manager.send_on_link(
                        &link_id, &data, context, &mut self.rng,
                    );
                    self.dispatch_link_actions(link_actions);
                }
                Event::RequestPath { dest_hash } => {
                    self.handle_request_path(dest_hash);
                }
                Event::RegisterProofStrategy { dest_hash, strategy, signing_key } => {
                    // The signing identity is optional and built from the private key.
                    let identity = signing_key.map(|key| {
                        rns_crypto::identity::Identity::from_private_key(&key)
                    });
                    self.proof_strategies.insert(dest_hash, (strategy, identity));
                }
                Event::ProposeDirectConnect { link_id } => {
                    // A proposal needs the link's derived key; without one it
                    // is only logged.
                    let derived_key = self.link_manager.get_derived_key(&link_id);
                    if let Some(dk) = derived_key {
                        let tx = self.get_event_sender();
                        let hp_actions = self.holepunch_manager.propose(
                            link_id, &dk, &mut self.rng, &tx,
                        );
                        self.dispatch_holepunch_actions(hp_actions);
                    } else {
                        log::warn!("Cannot propose direct connect: no derived key for link {:02x?}", &link_id[..4]);
                    }
                }
                Event::SetDirectConnectPolicy { policy } => {
                    self.holepunch_manager.set_policy(policy);
                }
                Event::HolePunchProbeResult { link_id, session_id, observed_addr, socket } => {
                    let hp_actions = self.holepunch_manager.handle_probe_result(
                        link_id, session_id, observed_addr, socket,
                    );
                    self.dispatch_holepunch_actions(hp_actions);
                }
                Event::HolePunchProbeFailed { link_id, session_id } => {
                    let hp_actions = self.holepunch_manager.handle_probe_failed(
                        link_id, session_id,
                    );
                    self.dispatch_holepunch_actions(hp_actions);
                }
                Event::LoadHook { name, wasm_bytes, attach_point, priority, response_tx } => {
                    // Compile the program and attach it to the named hook
                    // point; the outcome is sent back to the requester.
                    #[cfg(feature = "rns-hooks")]
                    {
                        let result = (|| -> Result<(), String> {
                            let point_idx = crate::config::parse_hook_point(&attach_point)
                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
                            let mgr = self.hook_manager.as_ref()
                                .ok_or_else(|| "hook manager not available".to_string())?;
                            let program = mgr.compile(name.clone(), &wasm_bytes, priority)
                                .map_err(|e| format!("compile error: {}", e))?;
                            self.hook_slots[point_idx].attach(program);
                            log::info!("Loaded hook '{}' at point {} (priority {})", name, attach_point, priority);
                            Ok(())
                        })();
                        let _ = response_tx.send(result);
                    }
                    // Without the feature every hook request gets an error reply.
                    #[cfg(not(feature = "rns-hooks"))]
                    {
                        let _ = (name, wasm_bytes, attach_point, priority);
                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
                    }
                }
                Event::UnloadHook { name, attach_point, response_tx } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        let result = (|| -> Result<(), String> {
                            let point_idx = crate::config::parse_hook_point(&attach_point)
                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
                            match self.hook_slots[point_idx].detach(&name) {
                                Some(_) => {
                                    log::info!("Unloaded hook '{}' from point {}", name, attach_point);
                                    Ok(())
                                }
                                None => Err(format!("hook '{}' not found at point '{}'", name, attach_point)),
                            }
                        })();
                        let _ = response_tx.send(result);
                    }
                    #[cfg(not(feature = "rns-hooks"))]
                    {
                        let _ = (name, attach_point);
                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
                    }
                }
                Event::ReloadHook { name, attach_point, wasm_bytes, response_tx } => {
                    // Detach the old program, compile the replacement at the
                    // same priority, and re-attach the old one on any failure.
                    #[cfg(feature = "rns-hooks")]
                    {
                        let result = (|| -> Result<(), String> {
                            let point_idx = crate::config::parse_hook_point(&attach_point)
                                .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
                            let old = self.hook_slots[point_idx].detach(&name)
                                .ok_or_else(|| format!("hook '{}' not found at point '{}'", name, attach_point))?;
                            let priority = old.priority;
                            let mgr = match self.hook_manager.as_ref() {
                                Some(m) => m,
                                None => {
                                    self.hook_slots[point_idx].attach(old);
                                    return Err("hook manager not available".to_string());
                                }
                            };
                            match mgr.compile(name.clone(), &wasm_bytes, priority) {
                                Ok(program) => {
                                    self.hook_slots[point_idx].attach(program);
                                    log::info!("Reloaded hook '{}' at point {} (priority {})", name, attach_point, priority);
                                    Ok(())
                                }
                                Err(e) => {
                                    self.hook_slots[point_idx].attach(old);
                                    Err(format!("compile error: {}", e))
                                }
                            }
                        })();
                        let _ = response_tx.send(result);
                    }
                    #[cfg(not(feature = "rns-hooks"))]
                    {
                        let _ = (name, attach_point, wasm_bytes);
                        let _ = response_tx.send(Err("hooks not enabled".to_string()));
                    }
                }
                Event::ListHooks { response_tx } => {
                    #[cfg(feature = "rns-hooks")]
                    {
                        // Names are looked up by slot index; an index beyond
                        // this list is reported as "Unknown".
                        // NOTE(review): the order presumably has to match the
                        // `HookPoint` enum — confirm when adding hook points.
                        let hook_point_names = [
                            "PreIngress", "PreDispatch", "AnnounceReceived", "PathUpdated",
                            "AnnounceRetransmit", "LinkRequestReceived", "LinkEstablished",
                            "LinkClosed", "InterfaceUp", "InterfaceDown", "InterfaceConfigChanged",
                            "SendOnInterface", "BroadcastOnAllInterfaces", "DeliverLocal",
                            "TunnelSynthesize", "Tick",
                        ];
                        let mut infos = Vec::new();
                        for (idx, slot) in self.hook_slots.iter().enumerate() {
                            let point_name = hook_point_names.get(idx).unwrap_or(&"Unknown");
                            for prog in &slot.programs {
                                infos.push(crate::event::HookInfo {
                                    name: prog.name.clone(),
                                    attach_point: point_name.to_string(),
                                    priority: prog.priority,
                                    enabled: prog.enabled,
                                    consecutive_traps: prog.consecutive_traps,
                                });
                            }
                        }
                        let _ = response_tx.send(infos);
                    }
                    // Without the feature the list is always empty.
                    #[cfg(not(feature = "rns-hooks"))]
                    {
                        let _ = response_tx.send(Vec::new());
                    }
                }
                Event::InterfaceConfigChanged(id) => {
                    // Only hooks react to this event; without the feature it is a no-op.
                    #[cfg(feature = "rns-hooks")]
                    {
                        let ctx = HookContext::Interface { interface_id: id.0 };
                        let now = time::now();
                        let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
                        if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::InterfaceConfigChanged as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
                            if !e.injected_actions.is_empty() { self.dispatch_all(convert_injected_actions(e.injected_actions.clone())); }
                        }
                    }
                    #[cfg(not(feature = "rns-hooks"))]
                    let _ = id;
                }
                Event::Shutdown => break,
            }
        }
    }
950
951 fn handle_query(&self, request: QueryRequest) -> QueryResponse {
953 match request {
954 QueryRequest::InterfaceStats => {
955 let mut interfaces = Vec::new();
956 let mut total_rxb: u64 = 0;
957 let mut total_txb: u64 = 0;
958 for entry in self.interfaces.values() {
959 total_rxb += entry.stats.rxb;
960 total_txb += entry.stats.txb;
961 interfaces.push(SingleInterfaceStat {
962 name: entry.info.name.clone(),
963 status: entry.online,
964 mode: entry.info.mode,
965 rxb: entry.stats.rxb,
966 txb: entry.stats.txb,
967 rx_packets: entry.stats.rx_packets,
968 tx_packets: entry.stats.tx_packets,
969 bitrate: entry.info.bitrate,
970 ifac_size: entry.ifac.as_ref().map(|s| s.size),
971 started: entry.stats.started,
972 ia_freq: entry.stats.incoming_announce_freq(),
973 oa_freq: entry.stats.outgoing_announce_freq(),
974 interface_type: entry.interface_type.clone(),
975 });
976 }
977 interfaces.sort_by(|a, b| a.name.cmp(&b.name));
979 QueryResponse::InterfaceStats(InterfaceStatsResponse {
980 interfaces,
981 transport_id: self.engine.identity_hash().copied(),
982 transport_enabled: self.engine.transport_enabled(),
983 transport_uptime: time::now() - self.started,
984 total_rxb,
985 total_txb,
986 })
987 }
988 QueryRequest::PathTable { max_hops } => {
989 let entries: Vec<PathTableEntry> = self
990 .engine
991 .path_table_entries()
992 .filter(|(_, entry)| {
993 max_hops.map_or(true, |max| entry.hops <= max)
994 })
995 .map(|(hash, entry)| {
996 let iface_name = self.interfaces.get(&entry.receiving_interface)
997 .map(|e| e.info.name.clone())
998 .or_else(|| self.engine.interface_info(&entry.receiving_interface)
999 .map(|i| i.name.clone()))
1000 .unwrap_or_default();
1001 PathTableEntry {
1002 hash: *hash,
1003 timestamp: entry.timestamp,
1004 via: entry.next_hop,
1005 hops: entry.hops,
1006 expires: entry.expires,
1007 interface: entry.receiving_interface,
1008 interface_name: iface_name,
1009 }
1010 })
1011 .collect();
1012 QueryResponse::PathTable(entries)
1013 }
1014 QueryRequest::RateTable => {
1015 let entries: Vec<RateTableEntry> = self
1016 .engine
1017 .rate_limiter()
1018 .entries()
1019 .map(|(hash, entry)| RateTableEntry {
1020 hash: *hash,
1021 last: entry.last,
1022 rate_violations: entry.rate_violations,
1023 blocked_until: entry.blocked_until,
1024 timestamps: entry.timestamps.clone(),
1025 })
1026 .collect();
1027 QueryResponse::RateTable(entries)
1028 }
1029 QueryRequest::NextHop { dest_hash } => {
1030 let resp = self.engine.next_hop(&dest_hash).map(|next_hop| {
1031 NextHopResponse {
1032 next_hop,
1033 hops: self.engine.hops_to(&dest_hash).unwrap_or(0),
1034 interface: self.engine.next_hop_interface(&dest_hash).unwrap_or(InterfaceId(0)),
1035 }
1036 });
1037 QueryResponse::NextHop(resp)
1038 }
1039 QueryRequest::NextHopIfName { dest_hash } => {
1040 let name = self
1041 .engine
1042 .next_hop_interface(&dest_hash)
1043 .and_then(|id| self.interfaces.get(&id))
1044 .map(|entry| entry.info.name.clone());
1045 QueryResponse::NextHopIfName(name)
1046 }
1047 QueryRequest::LinkCount => {
1048 QueryResponse::LinkCount(self.engine.link_table_count() + self.link_manager.link_count())
1049 }
1050 QueryRequest::DropPath { .. } => {
1051 QueryResponse::DropPath(false)
1053 }
1054 QueryRequest::DropAllVia { .. } => {
1055 QueryResponse::DropAllVia(0)
1056 }
1057 QueryRequest::DropAnnounceQueues => {
1058 QueryResponse::DropAnnounceQueues
1059 }
1060 QueryRequest::TransportIdentity => {
1061 QueryResponse::TransportIdentity(self.engine.identity_hash().copied())
1062 }
1063 QueryRequest::GetBlackholed => {
1064 let now = time::now();
1065 let entries: Vec<BlackholeInfo> = self.engine.blackholed_entries()
1066 .filter(|(_, e)| e.expires == 0.0 || e.expires > now)
1067 .map(|(hash, entry)| BlackholeInfo {
1068 identity_hash: *hash,
1069 created: entry.created,
1070 expires: entry.expires,
1071 reason: entry.reason.clone(),
1072 })
1073 .collect();
1074 QueryResponse::Blackholed(entries)
1075 }
1076 QueryRequest::BlackholeIdentity { .. }
1077 | QueryRequest::UnblackholeIdentity { .. } => {
1078 QueryResponse::BlackholeResult(false)
1080 }
1081 QueryRequest::InjectPath { .. } => {
1082 QueryResponse::InjectPath(false)
1084 }
1085 QueryRequest::InjectIdentity { .. } => {
1086 QueryResponse::InjectIdentity(false)
1088 }
1089 QueryRequest::HasPath { dest_hash } => {
1090 QueryResponse::HasPath(self.engine.has_path(&dest_hash))
1091 }
1092 QueryRequest::HopsTo { dest_hash } => {
1093 QueryResponse::HopsTo(self.engine.hops_to(&dest_hash))
1094 }
1095 QueryRequest::RecallIdentity { dest_hash } => {
1096 QueryResponse::RecallIdentity(self.known_destinations.get(&dest_hash).cloned())
1097 }
1098 QueryRequest::LocalDestinations => {
1099 let entries: Vec<LocalDestinationEntry> = self
1100 .local_destinations
1101 .iter()
1102 .map(|(hash, dest_type)| LocalDestinationEntry {
1103 hash: *hash,
1104 dest_type: *dest_type,
1105 })
1106 .collect();
1107 QueryResponse::LocalDestinations(entries)
1108 }
1109 QueryRequest::Links => {
1110 QueryResponse::Links(self.link_manager.link_entries())
1111 }
1112 QueryRequest::Resources => {
1113 QueryResponse::Resources(self.link_manager.resource_entries())
1114 }
1115 QueryRequest::DiscoveredInterfaces { only_available, only_transport } => {
1116 let mut interfaces = self.discovered_interfaces.list().unwrap_or_default();
1117 crate::discovery::filter_and_sort_interfaces(
1118 &mut interfaces, only_available, only_transport,
1119 );
1120 QueryResponse::DiscoveredInterfaces(interfaces)
1121 }
1122 }
1123 }
1124
1125 fn handle_query_mut(&mut self, request: QueryRequest) -> QueryResponse {
1127 match request {
1128 QueryRequest::BlackholeIdentity { identity_hash, duration_hours, reason } => {
1129 let now = time::now();
1130 self.engine.blackhole_identity(identity_hash, now, duration_hours, reason);
1131 QueryResponse::BlackholeResult(true)
1132 }
1133 QueryRequest::UnblackholeIdentity { identity_hash } => {
1134 let result = self.engine.unblackhole_identity(&identity_hash);
1135 QueryResponse::UnblackholeResult(result)
1136 }
1137 QueryRequest::DropPath { dest_hash } => {
1138 QueryResponse::DropPath(self.engine.drop_path(&dest_hash))
1139 }
1140 QueryRequest::DropAllVia { transport_hash } => {
1141 QueryResponse::DropAllVia(self.engine.drop_all_via(&transport_hash))
1142 }
1143 QueryRequest::DropAnnounceQueues => {
1144 self.engine.drop_announce_queues();
1145 QueryResponse::DropAnnounceQueues
1146 }
1147 QueryRequest::InjectPath {
1148 dest_hash,
1149 next_hop,
1150 hops,
1151 expires,
1152 interface_name,
1153 packet_hash,
1154 } => {
1155 let iface_id = self
1157 .interfaces
1158 .iter()
1159 .find(|(_, entry)| entry.info.name == interface_name)
1160 .map(|(id, _)| *id);
1161 match iface_id {
1162 Some(id) => {
1163 let entry = PathEntry {
1164 timestamp: time::now(),
1165 next_hop,
1166 hops,
1167 expires,
1168 random_blobs: Vec::new(),
1169 receiving_interface: id,
1170 packet_hash,
1171 announce_raw: None,
1172 };
1173 self.engine.inject_path(dest_hash, entry);
1174 QueryResponse::InjectPath(true)
1175 }
1176 None => QueryResponse::InjectPath(false),
1177 }
1178 }
1179 QueryRequest::InjectIdentity {
1180 dest_hash,
1181 identity_hash,
1182 public_key,
1183 app_data,
1184 hops,
1185 received_at,
1186 } => {
1187 self.known_destinations.insert(
1188 dest_hash,
1189 crate::destination::AnnouncedIdentity {
1190 dest_hash: rns_core::types::DestHash(dest_hash),
1191 identity_hash: rns_core::types::IdentityHash(identity_hash),
1192 public_key,
1193 app_data,
1194 hops,
1195 received_at,
1196 },
1197 );
1198 QueryResponse::InjectIdentity(true)
1199 }
1200 other => self.handle_query(other),
1201 }
1202 }
1203
1204 fn handle_tunnel_synth_delivery(&mut self, raw: &[u8]) {
1206 let packet = match RawPacket::unpack(raw) {
1208 Ok(p) => p,
1209 Err(_) => return,
1210 };
1211
1212 match rns_core::transport::tunnel::validate_tunnel_synthesize_data(&packet.data) {
1213 Ok(validated) => {
1214 let iface_id = self
1217 .interfaces
1218 .iter()
1219 .find(|(_, entry)| entry.info.wants_tunnel && entry.online)
1220 .map(|(id, _)| *id);
1221
1222 if let Some(iface) = iface_id {
1223 let now = time::now();
1224 let tunnel_actions =
1225 self.engine.handle_tunnel(validated.tunnel_id, iface, now);
1226 self.dispatch_all(tunnel_actions);
1227 }
1228 }
1229 Err(e) => {
1230 log::debug!("Tunnel synthesis validation failed: {}", e);
1231 }
1232 }
1233 }
1234
1235 fn synthesize_tunnel_for_interface(&mut self, interface: InterfaceId) {
1239 if let Some(ref identity) = self.transport_identity {
1240 let actions = self.engine.synthesize_tunnel(identity, interface, &mut self.rng);
1241 self.dispatch_all(actions);
1242 }
1243 }
1244
1245 fn handle_request_path(&mut self, dest_hash: [u8; 16]) {
1247 let mut data = Vec::with_capacity(48);
1249 data.extend_from_slice(&dest_hash);
1250
1251 if self.engine.transport_enabled() {
1252 if let Some(id_hash) = self.engine.identity_hash() {
1253 data.extend_from_slice(id_hash);
1254 }
1255 }
1256
1257 let mut tag = [0u8; 16];
1259 self.rng.fill_bytes(&mut tag);
1260 data.extend_from_slice(&tag);
1261
1262 let flags = rns_core::packet::PacketFlags {
1264 header_type: rns_core::constants::HEADER_1,
1265 context_flag: rns_core::constants::FLAG_UNSET,
1266 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1267 destination_type: rns_core::constants::DESTINATION_PLAIN,
1268 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1269 };
1270
1271 if let Ok(packet) = RawPacket::pack(
1272 flags, 0, &self.path_request_dest, None,
1273 rns_core::constants::CONTEXT_NONE, &data,
1274 ) {
1275 let actions = self.engine.handle_outbound(
1276 &packet,
1277 rns_core::constants::DESTINATION_PLAIN,
1278 None,
1279 time::now(),
1280 );
1281 self.dispatch_all(actions);
1282 }
1283 }
1284
1285 fn maybe_generate_proof(&mut self, dest_hash: [u8; 16], packet_hash: &[u8; 32]) {
1288 use rns_core::types::ProofStrategy;
1289
1290 let (strategy, identity) = match self.proof_strategies.get(&dest_hash) {
1291 Some((s, id)) => (*s, id.as_ref()),
1292 None => return,
1293 };
1294
1295 let should_prove = match strategy {
1296 ProofStrategy::ProveAll => true,
1297 ProofStrategy::ProveApp => {
1298 self.callbacks.on_proof_requested(
1299 rns_core::types::DestHash(dest_hash),
1300 rns_core::types::PacketHash(*packet_hash),
1301 )
1302 }
1303 ProofStrategy::ProveNone => false,
1304 };
1305
1306 if !should_prove {
1307 return;
1308 }
1309
1310 let identity = match identity {
1311 Some(id) => id,
1312 None => {
1313 log::warn!("Cannot generate proof for {:02x?}: no signing key", &dest_hash[..4]);
1314 return;
1315 }
1316 };
1317
1318 let signature = match identity.sign(packet_hash) {
1320 Ok(sig) => sig,
1321 Err(e) => {
1322 log::warn!("Failed to sign proof for {:02x?}: {:?}", &dest_hash[..4], e);
1323 return;
1324 }
1325 };
1326
1327 let mut proof_data = Vec::with_capacity(96);
1329 proof_data.extend_from_slice(packet_hash);
1330 proof_data.extend_from_slice(&signature);
1331
1332 let mut proof_dest = [0u8; 16];
1338 proof_dest.copy_from_slice(&packet_hash[..16]);
1339
1340 let flags = rns_core::packet::PacketFlags {
1341 header_type: rns_core::constants::HEADER_1,
1342 context_flag: rns_core::constants::FLAG_UNSET,
1343 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1344 destination_type: rns_core::constants::DESTINATION_SINGLE,
1345 packet_type: rns_core::constants::PACKET_TYPE_PROOF,
1346 };
1347
1348 if let Ok(packet) = RawPacket::pack(
1349 flags, 0, &proof_dest, None,
1350 rns_core::constants::CONTEXT_NONE, &proof_data,
1351 ) {
1352 let actions = self.engine.handle_outbound(
1353 &packet,
1354 rns_core::constants::DESTINATION_SINGLE,
1355 None,
1356 time::now(),
1357 );
1358 self.dispatch_all(actions);
1359 log::debug!("Generated proof for packet on dest {:02x?}", &dest_hash[..4]);
1360 }
1361 }
1362
1363 fn handle_inbound_proof(&mut self, dest_hash: [u8; 16], proof_data: &[u8], _raw_packet_hash: &[u8; 32]) {
1365 if proof_data.len() < 96 {
1367 log::debug!("Proof too short for explicit proof: {} bytes", proof_data.len());
1368 return;
1369 }
1370
1371 let mut tracked_hash = [0u8; 32];
1372 tracked_hash.copy_from_slice(&proof_data[..32]);
1373
1374 let signature = &proof_data[32..96];
1375
1376 if let Some((tracked_dest, sent_time)) = self.sent_packets.remove(&tracked_hash) {
1378 if let Some(announced) = self.known_destinations.get(&tracked_dest) {
1381 let identity = rns_crypto::identity::Identity::from_public_key(&announced.public_key);
1382 let mut sig = [0u8; 64];
1383 sig.copy_from_slice(signature);
1384 if !identity.verify(&sig, &tracked_hash) {
1385 log::debug!(
1386 "Proof signature invalid for {:02x?}",
1387 &tracked_hash[..4],
1388 );
1389 return;
1390 }
1391 } else {
1392 log::debug!(
1393 "No known identity for dest {:02x?}, accepting proof without signature check",
1394 &tracked_dest[..4],
1395 );
1396 }
1397
1398 let rtt = time::now() - sent_time;
1399 log::debug!(
1400 "Proof received for {:02x?} rtt={:.3}s",
1401 &tracked_hash[..4], rtt,
1402 );
1403 self.callbacks.on_proof(
1404 rns_core::types::DestHash(tracked_dest),
1405 rns_core::types::PacketHash(tracked_hash),
1406 rtt,
1407 );
1408 } else {
1409 log::debug!(
1410 "Proof for unknown packet {:02x?} on dest {:02x?}",
1411 &tracked_hash[..4], &dest_hash[..4],
1412 );
1413 }
1414 }
1415
1416 fn dispatch_all(&mut self, actions: Vec<TransportAction>) {
1418 #[cfg(feature = "rns-hooks")]
1419 let mut hook_injected: Vec<TransportAction> = Vec::new();
1420
1421 for action in actions {
1422 match action {
1423 TransportAction::SendOnInterface { interface, raw } => {
1424 #[cfg(feature = "rns-hooks")]
1425 {
1426 let pkt_ctx = rns_hooks::PacketContext {
1427 flags: if raw.is_empty() { 0 } else { raw[0] },
1428 hops: if raw.len() > 1 { raw[1] } else { 0 },
1429 destination_hash: extract_dest_hash(&raw),
1430 context: 0,
1431 packet_hash: [0; 32],
1432 interface_id: interface.0,
1433 data_offset: 0,
1434 data_len: raw.len() as u32,
1435 };
1436 let ctx = HookContext::Packet(&pkt_ctx);
1437 let now = time::now();
1438 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1439 {
1440 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::SendOnInterface as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
1441 if let Some(ref e) = exec {
1442 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1443 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
1444 }
1445 }
1446 }
1447 let is_announce = raw.len() > 2 && (raw[0] & 0x03) == 0x01;
1448 if is_announce {
1449 log::debug!("Dispatching announce to interface {} (len={})", interface.0, raw.len());
1450 }
1451 if let Some(entry) = self.interfaces.get_mut(&interface) {
1452 if entry.online {
1453 let data = if let Some(ref ifac_state) = entry.ifac {
1454 ifac::mask_outbound(&raw, ifac_state)
1455 } else {
1456 raw
1457 };
1458 entry.stats.txb += data.len() as u64;
1460 entry.stats.tx_packets += 1;
1461 if is_announce {
1462 entry.stats.record_outgoing_announce(time::now());
1463 }
1464 if let Err(e) = entry.writer.send_frame(&data) {
1465 log::warn!("[{}] send failed: {}", entry.info.id.0, e);
1466 }
1467 }
1468 }
1469 }
1470 TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
1471 #[cfg(feature = "rns-hooks")]
1472 {
1473 let pkt_ctx = rns_hooks::PacketContext {
1474 flags: if raw.is_empty() { 0 } else { raw[0] },
1475 hops: if raw.len() > 1 { raw[1] } else { 0 },
1476 destination_hash: extract_dest_hash(&raw),
1477 context: 0,
1478 packet_hash: [0; 32],
1479 interface_id: 0,
1480 data_offset: 0,
1481 data_len: raw.len() as u32,
1482 };
1483 let ctx = HookContext::Packet(&pkt_ctx);
1484 let now = time::now();
1485 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1486 {
1487 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::BroadcastOnAllInterfaces as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
1488 if let Some(ref e) = exec {
1489 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1490 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
1491 }
1492 }
1493 }
1494 let is_announce = raw.len() > 2 && (raw[0] & 0x03) == 0x01;
1495 for entry in self.interfaces.values_mut() {
1496 if entry.online && Some(entry.id) != exclude {
1497 let data = if let Some(ref ifac_state) = entry.ifac {
1498 ifac::mask_outbound(&raw, ifac_state)
1499 } else {
1500 raw.clone()
1501 };
1502 entry.stats.txb += data.len() as u64;
1504 entry.stats.tx_packets += 1;
1505 if is_announce {
1506 entry.stats.record_outgoing_announce(time::now());
1507 }
1508 if let Err(e) = entry.writer.send_frame(&data) {
1509 log::warn!("[{}] broadcast failed: {}", entry.info.id.0, e);
1510 }
1511 }
1512 }
1513 }
1514 TransportAction::DeliverLocal {
1515 destination_hash,
1516 raw,
1517 packet_hash,
1518 receiving_interface,
1519 } => {
1520 #[cfg(feature = "rns-hooks")]
1521 {
1522 let pkt_ctx = rns_hooks::PacketContext {
1523 flags: 0,
1524 hops: 0,
1525 destination_hash,
1526 context: 0,
1527 packet_hash,
1528 interface_id: receiving_interface.0,
1529 data_offset: 0,
1530 data_len: raw.len() as u32,
1531 };
1532 let ctx = HookContext::Packet(&pkt_ctx);
1533 let now = time::now();
1534 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1535 {
1536 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::DeliverLocal as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
1537 if let Some(ref e) = exec {
1538 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1539 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
1540 }
1541 }
1542 }
1543 if destination_hash == self.tunnel_synth_dest {
1544 self.handle_tunnel_synth_delivery(&raw);
1546 } else if destination_hash == self.path_request_dest {
1547 if let Ok(packet) = RawPacket::unpack(&raw) {
1549 let actions = self.engine.handle_path_request(
1550 &packet.data,
1551 InterfaceId(0), time::now(),
1553 );
1554 self.dispatch_all(actions);
1555 }
1556 } else if self.link_manager.is_link_destination(&destination_hash) {
1557 let link_actions = self.link_manager.handle_local_delivery(
1559 destination_hash, &raw, packet_hash, receiving_interface, &mut self.rng,
1560 );
1561 if link_actions.is_empty() {
1562 if let Ok(packet) = RawPacket::unpack(&raw) {
1566 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_PROOF {
1567 self.handle_inbound_proof(destination_hash, &packet.data, &packet_hash);
1568 continue;
1569 }
1570 }
1571 self.maybe_generate_proof(destination_hash, &packet_hash);
1572 self.callbacks.on_local_delivery(
1573 rns_core::types::DestHash(destination_hash),
1574 raw,
1575 rns_core::types::PacketHash(packet_hash),
1576 );
1577 } else {
1578 self.dispatch_link_actions(link_actions);
1579 }
1580 } else {
1581 if let Ok(packet) = RawPacket::unpack(&raw) {
1583 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_PROOF {
1584 self.handle_inbound_proof(destination_hash, &packet.data, &packet_hash);
1585 continue;
1586 }
1587 }
1588
1589 self.maybe_generate_proof(destination_hash, &packet_hash);
1591
1592 self.callbacks
1593 .on_local_delivery(
1594 rns_core::types::DestHash(destination_hash),
1595 raw,
1596 rns_core::types::PacketHash(packet_hash),
1597 );
1598 }
1599 }
1600 TransportAction::AnnounceReceived {
1601 destination_hash,
1602 identity_hash,
1603 public_key,
1604 name_hash,
1605 app_data,
1606 hops,
1607 receiving_interface,
1608 ..
1609 } => {
1610 #[cfg(feature = "rns-hooks")]
1611 {
1612 let ctx = HookContext::Announce {
1613 destination_hash,
1614 hops,
1615 interface_id: receiving_interface.0,
1616 };
1617 let now = time::now();
1618 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1619 {
1620 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::AnnounceReceived as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
1621 if let Some(ref e) = exec {
1622 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1623 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
1624 }
1625 }
1626 }
1627 #[cfg(not(feature = "rns-hooks"))]
1629 let _ = receiving_interface;
1630
1631 if name_hash == self.discovery_name_hash {
1635 if self.discover_interfaces {
1636 if let Some(ref app_data) = app_data {
1637 if let Some(mut discovered) = crate::discovery::parse_interface_announce(
1638 app_data,
1639 &identity_hash,
1640 hops,
1641 self.discovery_required_value,
1642 ) {
1643 if let Ok(Some(existing)) = self.discovered_interfaces.load(&discovered.discovery_hash) {
1645 discovered.discovered = existing.discovered;
1646 discovered.heard_count = existing.heard_count + 1;
1647 }
1648 if let Err(e) = self.discovered_interfaces.store(&discovered) {
1649 log::warn!("Failed to store discovered interface: {}", e);
1650 } else {
1651 log::info!(
1652 "Discovered interface '{}' ({}) at {}:{} [stamp={}]",
1653 discovered.name,
1654 discovered.interface_type,
1655 discovered.reachable_on.as_deref().unwrap_or("?"),
1656 discovered.port.map(|p| p.to_string()).unwrap_or_else(|| "?".into()),
1657 discovered.stamp_value,
1658 );
1659 }
1660 }
1661 }
1662 }
1663 }
1665
1666 let announced = crate::destination::AnnouncedIdentity {
1668 dest_hash: rns_core::types::DestHash(destination_hash),
1669 identity_hash: rns_core::types::IdentityHash(identity_hash),
1670 public_key,
1671 app_data: app_data.clone(),
1672 hops,
1673 received_at: time::now(),
1674 };
1675 self.known_destinations.insert(destination_hash, announced.clone());
1676 self.callbacks.on_announce(announced);
1677 }
1678 TransportAction::PathUpdated {
1679 destination_hash,
1680 hops,
1681 interface,
1682 ..
1683 } => {
1684 #[cfg(feature = "rns-hooks")]
1685 {
1686 let ctx = HookContext::Announce {
1687 destination_hash,
1688 hops,
1689 interface_id: interface.0,
1690 };
1691 let now = time::now();
1692 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1693 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::PathUpdated as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1694 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1695 }
1696 }
1697 #[cfg(not(feature = "rns-hooks"))]
1698 let _ = interface;
1699
1700 self.callbacks.on_path_updated(rns_core::types::DestHash(destination_hash), hops);
1701 }
1702 TransportAction::ForwardToLocalClients { raw, exclude } => {
1703 for entry in self.interfaces.values_mut() {
1704 if entry.online
1705 && entry.info.is_local_client
1706 && Some(entry.id) != exclude
1707 {
1708 let data = if let Some(ref ifac_state) = entry.ifac {
1709 ifac::mask_outbound(&raw, ifac_state)
1710 } else {
1711 raw.clone()
1712 };
1713 entry.stats.txb += data.len() as u64;
1714 entry.stats.tx_packets += 1;
1715 if let Err(e) = entry.writer.send_frame(&data) {
1716 log::warn!("[{}] forward to local client failed: {}", entry.info.id.0, e);
1717 }
1718 }
1719 }
1720 }
1721 TransportAction::ForwardPlainBroadcast { raw, to_local, exclude } => {
1722 for entry in self.interfaces.values_mut() {
1723 if entry.online
1724 && entry.info.is_local_client == to_local
1725 && Some(entry.id) != exclude
1726 {
1727 let data = if let Some(ref ifac_state) = entry.ifac {
1728 ifac::mask_outbound(&raw, ifac_state)
1729 } else {
1730 raw.clone()
1731 };
1732 entry.stats.txb += data.len() as u64;
1733 entry.stats.tx_packets += 1;
1734 if let Err(e) = entry.writer.send_frame(&data) {
1735 log::warn!("[{}] forward plain broadcast failed: {}", entry.info.id.0, e);
1736 }
1737 }
1738 }
1739 }
1740 TransportAction::CacheAnnounce { packet_hash, raw } => {
1741 if let Some(ref cache) = self.announce_cache {
1742 if let Err(e) = cache.store(&packet_hash, &raw, None) {
1743 log::warn!("Failed to cache announce: {}", e);
1744 }
1745 }
1746 }
1747 TransportAction::TunnelSynthesize { interface, data, dest_hash } => {
1748 #[cfg(feature = "rns-hooks")]
1749 {
1750 let pkt_ctx = rns_hooks::PacketContext {
1751 flags: 0,
1752 hops: 0,
1753 destination_hash: dest_hash,
1754 context: 0,
1755 packet_hash: [0; 32],
1756 interface_id: interface.0,
1757 data_offset: 0,
1758 data_len: data.len() as u32,
1759 };
1760 let ctx = HookContext::Packet(&pkt_ctx);
1761 let now = time::now();
1762 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1763 {
1764 let exec = run_hook_inner(&mut self.hook_slots[HookPoint::TunnelSynthesize as usize].programs, &self.hook_manager, &engine_ref, &ctx, now);
1765 if let Some(ref e) = exec {
1766 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1767 if e.hook_result.as_ref().map_or(false, |r| r.is_drop()) { continue; }
1768 }
1769 }
1770 }
1771 let flags = rns_core::packet::PacketFlags {
1773 header_type: rns_core::constants::HEADER_1,
1774 context_flag: rns_core::constants::FLAG_UNSET,
1775 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1776 destination_type: rns_core::constants::DESTINATION_PLAIN,
1777 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1778 };
1779 if let Ok(packet) = rns_core::packet::RawPacket::pack(
1780 flags, 0, &dest_hash, None,
1781 rns_core::constants::CONTEXT_NONE, &data,
1782 ) {
1783 if let Some(entry) = self.interfaces.get_mut(&interface) {
1784 if entry.online {
1785 let raw = if let Some(ref ifac_state) = entry.ifac {
1786 ifac::mask_outbound(&packet.raw, ifac_state)
1787 } else {
1788 packet.raw
1789 };
1790 entry.stats.txb += raw.len() as u64;
1791 entry.stats.tx_packets += 1;
1792 if let Err(e) = entry.writer.send_frame(&raw) {
1793 log::warn!("[{}] tunnel synthesize send failed: {}", entry.info.id.0, e);
1794 }
1795 }
1796 }
1797 }
1798 }
1799 TransportAction::TunnelEstablished { tunnel_id, interface } => {
1800 log::info!("Tunnel established: {:02x?} on interface {}", &tunnel_id[..4], interface.0);
1801 }
1802 TransportAction::AnnounceRetransmit { destination_hash, hops, interface } => {
1803 #[cfg(feature = "rns-hooks")]
1804 {
1805 let ctx = HookContext::Announce {
1806 destination_hash,
1807 hops,
1808 interface_id: interface.map(|i| i.0).unwrap_or(0),
1809 };
1810 let now = time::now();
1811 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1812 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::AnnounceRetransmit as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1813 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1814 }
1815 }
1816 #[cfg(not(feature = "rns-hooks"))]
1817 {
1818 let _ = (destination_hash, hops, interface);
1819 }
1820 }
1821 TransportAction::LinkRequestReceived { link_id, destination_hash: _, receiving_interface } => {
1822 #[cfg(feature = "rns-hooks")]
1823 {
1824 let ctx = HookContext::Link { link_id, interface_id: receiving_interface.0 };
1825 let now = time::now();
1826 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1827 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkRequestReceived as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1828 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1829 }
1830 }
1831 #[cfg(not(feature = "rns-hooks"))]
1832 {
1833 let _ = (link_id, receiving_interface);
1834 }
1835 }
1836 TransportAction::LinkEstablished { link_id, interface } => {
1837 #[cfg(feature = "rns-hooks")]
1838 {
1839 let ctx = HookContext::Link { link_id, interface_id: interface.0 };
1840 let now = time::now();
1841 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1842 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkEstablished as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1843 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1844 }
1845 }
1846 #[cfg(not(feature = "rns-hooks"))]
1847 {
1848 let _ = (link_id, interface);
1849 }
1850 }
1851 TransportAction::LinkClosed { link_id } => {
1852 #[cfg(feature = "rns-hooks")]
1853 {
1854 let ctx = HookContext::Link { link_id, interface_id: 0 };
1855 let now = time::now();
1856 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1857 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkClosed as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1858 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1859 }
1860 }
1861 #[cfg(not(feature = "rns-hooks"))]
1862 {
1863 let _ = link_id;
1864 }
1865 }
1866 }
1867 }
1868
1869 #[cfg(feature = "rns-hooks")]
1871 if !hook_injected.is_empty() {
1872 self.dispatch_all(hook_injected);
1873 }
1874 }
1875
1876 fn dispatch_link_actions(&mut self, actions: Vec<LinkManagerAction>) {
1878 #[cfg(feature = "rns-hooks")]
1879 let mut hook_injected: Vec<TransportAction> = Vec::new();
1880
1881 for action in actions {
1882 match action {
1883 LinkManagerAction::SendPacket { raw, dest_type, attached_interface } => {
1884 match RawPacket::unpack(&raw) {
1886 Ok(packet) => {
1887 let transport_actions = self.engine.handle_outbound(
1888 &packet,
1889 dest_type,
1890 attached_interface,
1891 time::now(),
1892 );
1893 self.dispatch_all(transport_actions);
1894 }
1895 Err(e) => {
1896 log::warn!("LinkManager SendPacket: failed to unpack: {:?}", e);
1897 }
1898 }
1899 }
1900 LinkManagerAction::LinkEstablished { link_id, dest_hash, rtt, is_initiator } => {
1901 #[cfg(feature = "rns-hooks")]
1902 {
1903 let ctx = HookContext::Link { link_id, interface_id: 0 };
1904 let now = time::now();
1905 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1906 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkEstablished as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1907 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1908 }
1909 }
1910 log::info!(
1911 "Link established: {:02x?} rtt={:.3}s initiator={}",
1912 &link_id[..4], rtt, is_initiator,
1913 );
1914 self.callbacks.on_link_established(
1915 rns_core::types::LinkId(link_id),
1916 rns_core::types::DestHash(dest_hash),
1917 rtt,
1918 is_initiator,
1919 );
1920 }
1921 LinkManagerAction::LinkClosed { link_id, reason } => {
1922 #[cfg(feature = "rns-hooks")]
1923 {
1924 let ctx = HookContext::Link { link_id, interface_id: 0 };
1925 let now = time::now();
1926 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
1927 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkClosed as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
1928 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
1929 }
1930 }
1931 log::info!("Link closed: {:02x?} reason={:?}", &link_id[..4], reason);
1932 self.holepunch_manager.link_closed(&link_id);
1933 self.callbacks.on_link_closed(rns_core::types::LinkId(link_id), reason);
1934 }
1935 LinkManagerAction::RemoteIdentified { link_id, identity_hash, public_key } => {
1936 log::debug!(
1937 "Remote identified on link {:02x?}: {:02x?}",
1938 &link_id[..4], &identity_hash[..4],
1939 );
1940 self.callbacks.on_remote_identified(
1941 rns_core::types::LinkId(link_id),
1942 rns_core::types::IdentityHash(identity_hash),
1943 public_key,
1944 );
1945 }
1946 LinkManagerAction::RegisterLinkDest { link_id } => {
1947 self.engine.register_destination(link_id, rns_core::constants::DESTINATION_LINK);
1949 }
1950 LinkManagerAction::DeregisterLinkDest { link_id } => {
1951 self.engine.deregister_destination(&link_id);
1952 }
1953 LinkManagerAction::ManagementRequest {
1954 link_id, path_hash, data, request_id, remote_identity,
1955 } => {
1956 self.handle_management_request(
1957 link_id, path_hash, data, request_id, remote_identity,
1958 );
1959 }
1960 LinkManagerAction::ResourceReceived { link_id, data, metadata } => {
1961 self.callbacks.on_resource_received(rns_core::types::LinkId(link_id), data, metadata);
1962 }
1963 LinkManagerAction::ResourceCompleted { link_id } => {
1964 self.callbacks.on_resource_completed(rns_core::types::LinkId(link_id));
1965 }
1966 LinkManagerAction::ResourceFailed { link_id, error } => {
1967 log::debug!("Resource failed on link {:02x?}: {}", &link_id[..4], error);
1968 self.callbacks.on_resource_failed(rns_core::types::LinkId(link_id), error);
1969 }
1970 LinkManagerAction::ResourceProgress { link_id, received, total } => {
1971 self.callbacks.on_resource_progress(rns_core::types::LinkId(link_id), received, total);
1972 }
1973 LinkManagerAction::ResourceAcceptQuery { link_id, resource_hash, transfer_size, has_metadata } => {
1974 let accept = self.callbacks.on_resource_accept_query(
1975 rns_core::types::LinkId(link_id), resource_hash.clone(), transfer_size, has_metadata,
1976 );
1977 let accept_actions = self.link_manager.accept_resource(
1978 &link_id, &resource_hash, accept, &mut self.rng,
1979 );
1980 self.dispatch_link_actions(accept_actions);
1982 }
1983 LinkManagerAction::ChannelMessageReceived { link_id, msgtype, payload } => {
1984 if HolePunchManager::is_holepunch_message(msgtype) {
1986 let derived_key = self.link_manager.get_derived_key(&link_id);
1987 let tx = self.get_event_sender();
1988 let (handled, hp_actions) = self.holepunch_manager.handle_signal(
1989 link_id, msgtype, payload, derived_key.as_deref(), &tx,
1990 );
1991 if handled {
1992 self.dispatch_holepunch_actions(hp_actions);
1993 }
1994 } else {
1995 self.callbacks.on_channel_message(rns_core::types::LinkId(link_id), msgtype, payload);
1996 }
1997 }
1998 LinkManagerAction::LinkDataReceived { link_id, context, data } => {
1999 self.callbacks.on_link_data(rns_core::types::LinkId(link_id), context, data);
2000 }
2001 LinkManagerAction::ResponseReceived { link_id, request_id, data } => {
2002 self.callbacks.on_response(rns_core::types::LinkId(link_id), request_id, data);
2003 }
2004 LinkManagerAction::LinkRequestReceived { link_id, receiving_interface } => {
2005 #[cfg(feature = "rns-hooks")]
2006 {
2007 let ctx = HookContext::Link { link_id, interface_id: receiving_interface.0 };
2008 let now = time::now();
2009 let engine_ref = EngineRef { engine: &self.engine, interfaces: &self.interfaces, link_manager: &self.link_manager, now };
2010 if let Some(ref e) = run_hook_inner(&mut self.hook_slots[HookPoint::LinkRequestReceived as usize].programs, &self.hook_manager, &engine_ref, &ctx, now) {
2011 if !e.injected_actions.is_empty() { hook_injected.extend(convert_injected_actions(e.injected_actions.clone())); }
2012 }
2013 }
2014 #[cfg(not(feature = "rns-hooks"))]
2015 { let _ = (link_id, receiving_interface); }
2016 }
2017 }
2018 }
2019
2020 #[cfg(feature = "rns-hooks")]
2022 if !hook_injected.is_empty() {
2023 self.dispatch_all(hook_injected);
2024 }
2025 }
2026
2027 fn dispatch_holepunch_actions(&mut self, actions: Vec<HolePunchManagerAction>) {
2029 for action in actions {
2030 match action {
2031 HolePunchManagerAction::SendChannelMessage { link_id, msgtype, payload } => {
2032 let link_actions = self.link_manager.send_channel_message(
2033 &link_id, msgtype, &payload, &mut self.rng,
2034 );
2035 self.dispatch_link_actions(link_actions);
2036 }
2037 HolePunchManagerAction::DirectConnectEstablished { link_id, session_id, interface_id, rtt, mtu } => {
2038 log::info!(
2039 "Direct connection established for link {:02x?} session {:02x?} iface {} rtt={:.1}ms mtu={}",
2040 &link_id[..4], &session_id[..4], interface_id.0, rtt * 1000.0, mtu
2041 );
2042 self.engine.redirect_path(&link_id, interface_id, time::now());
2044 self.link_manager.set_link_rtt(&link_id, rtt);
2046 self.link_manager.set_link_mtu(&link_id, mtu);
2047 self.link_manager.record_link_inbound(&link_id);
2050 self.link_manager.flush_channel_tx(&link_id);
2052 self.callbacks.on_direct_connect_established(
2053 rns_core::types::LinkId(link_id),
2054 interface_id,
2055 );
2056 }
2057 HolePunchManagerAction::DirectConnectFailed { link_id, session_id, reason } => {
2058 log::debug!(
2059 "Direct connection failed for link {:02x?} session {:02x?} reason={}",
2060 &link_id[..4], &session_id[..4], reason
2061 );
2062 self.callbacks.on_direct_connect_failed(
2063 rns_core::types::LinkId(link_id),
2064 reason,
2065 );
2066 }
2067 }
2068 }
2069 }
2070
2071 fn get_event_sender(&self) -> crate::event::EventSender {
2076 self.event_tx.clone()
2080 }
2081
/// Minimum elapsed time between periodic management announce rounds; compared
/// against `now - last_management_announce` in `tick_management_announces`.
/// Presumably seconds (5 minutes) — confirm against the unit of `time::now()`.
const MANAGEMENT_ANNOUNCE_INTERVAL: f64 = 5.0 * 60.0;

/// Uptime (`now - started`) that must be reached before the very first
/// management announce round is emitted. Presumably seconds — confirm as above.
const MANAGEMENT_ANNOUNCE_DELAY: f64 = 5.0_f64;
2087
2088 fn tick_discovery_announcer(&mut self, now: f64) {
2090 let announcer = match self.interface_announcer.as_mut() {
2091 Some(a) => a,
2092 None => return,
2093 };
2094
2095 announcer.maybe_start(now);
2096
2097 let stamp_result = match announcer.poll_ready() {
2098 Some(r) => r,
2099 None => return,
2100 };
2101
2102 let identity = match self.transport_identity.as_ref() {
2103 Some(id) => id,
2104 None => {
2105 log::warn!("Discovery: stamp ready but no transport identity");
2106 return;
2107 }
2108 };
2109
2110 let identity_hash = identity.hash();
2112 let disc_dest = rns_core::destination::destination_hash(
2113 crate::discovery::APP_NAME,
2114 &["discovery", "interface"],
2115 Some(&identity_hash),
2116 );
2117 let name_hash = self.discovery_name_hash;
2118 let mut random_hash = [0u8; 10];
2119 self.rng.fill_bytes(&mut random_hash);
2120
2121 let (announce_data, _) = match rns_core::announce::AnnounceData::pack(
2122 identity, &disc_dest, &name_hash, &random_hash,
2123 None, Some(&stamp_result.app_data),
2124 ) {
2125 Ok(v) => v,
2126 Err(e) => {
2127 log::warn!("Discovery: failed to pack announce: {}", e);
2128 return;
2129 }
2130 };
2131
2132 let flags = rns_core::packet::PacketFlags {
2133 header_type: rns_core::constants::HEADER_1,
2134 context_flag: rns_core::constants::FLAG_UNSET,
2135 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
2136 destination_type: rns_core::constants::DESTINATION_SINGLE,
2137 packet_type: rns_core::constants::PACKET_TYPE_ANNOUNCE,
2138 };
2139
2140 let packet = match RawPacket::pack(
2141 flags, 0, &disc_dest, None,
2142 rns_core::constants::CONTEXT_NONE, &announce_data,
2143 ) {
2144 Ok(p) => p,
2145 Err(e) => {
2146 log::warn!("Discovery: failed to pack packet: {}", e);
2147 return;
2148 }
2149 };
2150
2151 let outbound_actions = self.engine.handle_outbound(
2152 &packet, rns_core::constants::DESTINATION_SINGLE, None, now,
2153 );
2154 log::info!(
2155 "Discovery announce sent for interface #{} ({} actions, dest={:02x?})",
2156 stamp_result.index, outbound_actions.len(), &disc_dest[..4],
2157 );
2158 self.dispatch_all(outbound_actions);
2159 }
2160
2161 fn tick_management_announces(&mut self, now: f64) {
2163 if self.transport_identity.is_none() {
2164 return;
2165 }
2166
2167 let uptime = now - self.started;
2168
2169 if !self.initial_announce_sent {
2171 if uptime < Self::MANAGEMENT_ANNOUNCE_DELAY {
2172 return;
2173 }
2174 self.initial_announce_sent = true;
2175 self.emit_management_announces(now);
2176 return;
2177 }
2178
2179 if now - self.last_management_announce >= Self::MANAGEMENT_ANNOUNCE_INTERVAL {
2181 self.emit_management_announces(now);
2182 }
2183 }
2184
2185 fn emit_management_announces(&mut self, now: f64) {
2187 use crate::management;
2188
2189 self.last_management_announce = now;
2190
2191 let identity = match self.transport_identity {
2192 Some(ref id) => id,
2193 None => return,
2194 };
2195
2196 let mgmt_raw = if self.management_config.enable_remote_management {
2198 management::build_management_announce(identity, &mut self.rng)
2199 } else {
2200 None
2201 };
2202
2203 let bh_raw = if self.management_config.publish_blackhole {
2204 management::build_blackhole_announce(identity, &mut self.rng)
2205 } else {
2206 None
2207 };
2208
2209 if let Some(raw) = mgmt_raw {
2210 if let Ok(packet) = RawPacket::unpack(&raw) {
2211 let actions = self.engine.handle_outbound(
2212 &packet,
2213 rns_core::constants::DESTINATION_SINGLE,
2214 None,
2215 now,
2216 );
2217 self.dispatch_all(actions);
2218 log::debug!("Emitted management destination announce");
2219 }
2220 }
2221
2222 if let Some(raw) = bh_raw {
2223 if let Ok(packet) = RawPacket::unpack(&raw) {
2224 let actions = self.engine.handle_outbound(
2225 &packet,
2226 rns_core::constants::DESTINATION_SINGLE,
2227 None,
2228 now,
2229 );
2230 self.dispatch_all(actions);
2231 log::debug!("Emitted blackhole info announce");
2232 }
2233 }
2234 }
2235
2236 fn handle_management_request(
2238 &mut self,
2239 link_id: [u8; 16],
2240 path_hash: [u8; 16],
2241 data: Vec<u8>,
2242 request_id: [u8; 16],
2243 remote_identity: Option<([u8; 16], [u8; 64])>,
2244 ) {
2245 use crate::management;
2246
2247 let is_restricted = path_hash == management::status_path_hash()
2249 || path_hash == management::path_path_hash();
2250
2251 if is_restricted && !self.management_config.remote_management_allowed.is_empty() {
2252 match remote_identity {
2253 Some((identity_hash, _)) => {
2254 if !self.management_config.remote_management_allowed.contains(&identity_hash) {
2255 log::debug!("Management request denied: identity not in allowed list");
2256 return;
2257 }
2258 }
2259 None => {
2260 log::debug!("Management request denied: peer not identified");
2261 return;
2262 }
2263 }
2264 }
2265
2266 let response_data = if path_hash == management::status_path_hash() {
2267 management::handle_status_request(&data, &self.engine, &self.interfaces, self.started)
2268 } else if path_hash == management::path_path_hash() {
2269 management::handle_path_request(&data, &self.engine)
2270 } else if path_hash == management::list_path_hash() {
2271 management::handle_blackhole_list_request(&self.engine)
2272 } else {
2273 log::warn!("Unknown management path_hash: {:02x?}", &path_hash[..4]);
2274 None
2275 };
2276
2277 if let Some(response) = response_data {
2278 let actions = self.link_manager.send_management_response(
2279 &link_id, &request_id, &response, &mut self.rng,
2280 );
2281 self.dispatch_link_actions(actions);
2282 }
2283 }
2284}
2285
2286#[cfg(test)]
2287mod tests {
2288 use super::*;
2289 use crate::event;
2290 use crate::interface::Writer;
2291 use rns_core::announce::AnnounceData;
2292 use rns_core::constants;
2293 use rns_core::packet::PacketFlags;
2294 use rns_core::transport::types::InterfaceInfo;
2295 use rns_crypto::identity::Identity;
2296 use std::io;
2297 use std::sync::mpsc;
2298 use std::sync::{Arc, Mutex};
2299
/// Test writer that records frames in a buffer shared with the test.
struct MockWriter {
    // Shared handle to the list of recorded frames.
    sent: Arc<Mutex<Vec<Vec<u8>>>>,
}

impl MockWriter {
    /// Creates a writer together with a second handle to its (initially empty)
    /// frame buffer, so the test can inspect what was written.
    fn new() -> (Self, Arc<Mutex<Vec<Vec<u8>>>>) {
        let frames = Arc::new(Mutex::new(Vec::new()));
        let writer = MockWriter { sent: Arc::clone(&frames) };
        (writer, frames)
    }
}
2310
2311 impl Writer for MockWriter {
2312 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
2313 self.sent.lock().unwrap().push(data.to_vec());
2314 Ok(())
2315 }
2316 }
2317
2318 use rns_core::types::{DestHash, IdentityHash, LinkId as TypedLinkId, PacketHash};
2319
2320 struct MockCallbacks {
2321 announces: Arc<Mutex<Vec<(DestHash, u8)>>>,
2322 paths: Arc<Mutex<Vec<(DestHash, u8)>>>,
2323 deliveries: Arc<Mutex<Vec<DestHash>>>,
2324 iface_ups: Arc<Mutex<Vec<InterfaceId>>>,
2325 iface_downs: Arc<Mutex<Vec<InterfaceId>>>,
2326 link_established: Arc<Mutex<Vec<(TypedLinkId, f64, bool)>>>,
2327 link_closed: Arc<Mutex<Vec<TypedLinkId>>>,
2328 remote_identified: Arc<Mutex<Vec<(TypedLinkId, IdentityHash)>>>,
2329 resources_received: Arc<Mutex<Vec<(TypedLinkId, Vec<u8>)>>>,
2330 resource_completed: Arc<Mutex<Vec<TypedLinkId>>>,
2331 resource_failed: Arc<Mutex<Vec<(TypedLinkId, String)>>>,
2332 channel_messages: Arc<Mutex<Vec<(TypedLinkId, u16, Vec<u8>)>>>,
2333 link_data: Arc<Mutex<Vec<(TypedLinkId, u8, Vec<u8>)>>>,
2334 responses: Arc<Mutex<Vec<(TypedLinkId, [u8; 16], Vec<u8>)>>>,
2335 proofs: Arc<Mutex<Vec<(DestHash, PacketHash, f64)>>>,
2336 proof_requested: Arc<Mutex<Vec<(DestHash, PacketHash)>>>,
2337 }
2338
2339 impl MockCallbacks {
2340 fn new() -> (
2341 Self,
2342 Arc<Mutex<Vec<(DestHash, u8)>>>,
2343 Arc<Mutex<Vec<(DestHash, u8)>>>,
2344 Arc<Mutex<Vec<DestHash>>>,
2345 Arc<Mutex<Vec<InterfaceId>>>,
2346 Arc<Mutex<Vec<InterfaceId>>>,
2347 ) {
2348 let announces = Arc::new(Mutex::new(Vec::new()));
2349 let paths = Arc::new(Mutex::new(Vec::new()));
2350 let deliveries = Arc::new(Mutex::new(Vec::new()));
2351 let iface_ups = Arc::new(Mutex::new(Vec::new()));
2352 let iface_downs = Arc::new(Mutex::new(Vec::new()));
2353 (
2354 MockCallbacks {
2355 announces: announces.clone(),
2356 paths: paths.clone(),
2357 deliveries: deliveries.clone(),
2358 iface_ups: iface_ups.clone(),
2359 iface_downs: iface_downs.clone(),
2360 link_established: Arc::new(Mutex::new(Vec::new())),
2361 link_closed: Arc::new(Mutex::new(Vec::new())),
2362 remote_identified: Arc::new(Mutex::new(Vec::new())),
2363 resources_received: Arc::new(Mutex::new(Vec::new())),
2364 resource_completed: Arc::new(Mutex::new(Vec::new())),
2365 resource_failed: Arc::new(Mutex::new(Vec::new())),
2366 channel_messages: Arc::new(Mutex::new(Vec::new())),
2367 link_data: Arc::new(Mutex::new(Vec::new())),
2368 responses: Arc::new(Mutex::new(Vec::new())),
2369 proofs: Arc::new(Mutex::new(Vec::new())),
2370 proof_requested: Arc::new(Mutex::new(Vec::new())),
2371 },
2372 announces,
2373 paths,
2374 deliveries,
2375 iface_ups,
2376 iface_downs,
2377 )
2378 }
2379
2380 fn with_link_tracking() -> (
2381 Self,
2382 Arc<Mutex<Vec<(TypedLinkId, f64, bool)>>>,
2383 Arc<Mutex<Vec<TypedLinkId>>>,
2384 Arc<Mutex<Vec<(TypedLinkId, IdentityHash)>>>,
2385 ) {
2386 let link_established = Arc::new(Mutex::new(Vec::new()));
2387 let link_closed = Arc::new(Mutex::new(Vec::new()));
2388 let remote_identified = Arc::new(Mutex::new(Vec::new()));
2389 (
2390 MockCallbacks {
2391 announces: Arc::new(Mutex::new(Vec::new())),
2392 paths: Arc::new(Mutex::new(Vec::new())),
2393 deliveries: Arc::new(Mutex::new(Vec::new())),
2394 iface_ups: Arc::new(Mutex::new(Vec::new())),
2395 iface_downs: Arc::new(Mutex::new(Vec::new())),
2396 link_established: link_established.clone(),
2397 link_closed: link_closed.clone(),
2398 remote_identified: remote_identified.clone(),
2399 resources_received: Arc::new(Mutex::new(Vec::new())),
2400 resource_completed: Arc::new(Mutex::new(Vec::new())),
2401 resource_failed: Arc::new(Mutex::new(Vec::new())),
2402 channel_messages: Arc::new(Mutex::new(Vec::new())),
2403 link_data: Arc::new(Mutex::new(Vec::new())),
2404 responses: Arc::new(Mutex::new(Vec::new())),
2405 proofs: Arc::new(Mutex::new(Vec::new())),
2406 proof_requested: Arc::new(Mutex::new(Vec::new())),
2407 },
2408 link_established,
2409 link_closed,
2410 remote_identified,
2411 )
2412 }
2413 }
2414
2415 impl Callbacks for MockCallbacks {
2416 fn on_announce(&mut self, announced: crate::destination::AnnouncedIdentity) {
2417 self.announces.lock().unwrap().push((announced.dest_hash, announced.hops));
2418 }
2419
2420 fn on_path_updated(&mut self, dest_hash: DestHash, hops: u8) {
2421 self.paths.lock().unwrap().push((dest_hash, hops));
2422 }
2423
2424 fn on_local_delivery(&mut self, dest_hash: DestHash, _raw: Vec<u8>, _packet_hash: PacketHash) {
2425 self.deliveries.lock().unwrap().push(dest_hash);
2426 }
2427
2428 fn on_interface_up(&mut self, id: InterfaceId) {
2429 self.iface_ups.lock().unwrap().push(id);
2430 }
2431
2432 fn on_interface_down(&mut self, id: InterfaceId) {
2433 self.iface_downs.lock().unwrap().push(id);
2434 }
2435
2436 fn on_link_established(&mut self, link_id: TypedLinkId, _dest_hash: DestHash, rtt: f64, is_initiator: bool) {
2437 self.link_established.lock().unwrap().push((link_id, rtt, is_initiator));
2438 }
2439
2440 fn on_link_closed(&mut self, link_id: TypedLinkId, _reason: Option<rns_core::link::TeardownReason>) {
2441 self.link_closed.lock().unwrap().push(link_id);
2442 }
2443
2444 fn on_remote_identified(&mut self, link_id: TypedLinkId, identity_hash: IdentityHash, _public_key: [u8; 64]) {
2445 self.remote_identified.lock().unwrap().push((link_id, identity_hash));
2446 }
2447
2448 fn on_resource_received(&mut self, link_id: TypedLinkId, data: Vec<u8>, _metadata: Option<Vec<u8>>) {
2449 self.resources_received.lock().unwrap().push((link_id, data));
2450 }
2451
2452 fn on_resource_completed(&mut self, link_id: TypedLinkId) {
2453 self.resource_completed.lock().unwrap().push(link_id);
2454 }
2455
2456 fn on_resource_failed(&mut self, link_id: TypedLinkId, error: String) {
2457 self.resource_failed.lock().unwrap().push((link_id, error));
2458 }
2459
2460 fn on_channel_message(&mut self, link_id: TypedLinkId, msgtype: u16, payload: Vec<u8>) {
2461 self.channel_messages.lock().unwrap().push((link_id, msgtype, payload));
2462 }
2463
2464 fn on_link_data(&mut self, link_id: TypedLinkId, context: u8, data: Vec<u8>) {
2465 self.link_data.lock().unwrap().push((link_id, context, data));
2466 }
2467
2468 fn on_response(&mut self, link_id: TypedLinkId, request_id: [u8; 16], data: Vec<u8>) {
2469 self.responses.lock().unwrap().push((link_id, request_id, data));
2470 }
2471
2472 fn on_proof(&mut self, dest_hash: DestHash, packet_hash: PacketHash, rtt: f64) {
2473 self.proofs.lock().unwrap().push((dest_hash, packet_hash, rtt));
2474 }
2475
2476 fn on_proof_requested(&mut self, dest_hash: DestHash, packet_hash: PacketHash) -> bool {
2477 self.proof_requested.lock().unwrap().push((dest_hash, packet_hash));
2478 true
2479 }
2480 }
2481
2482 fn make_interface_info(id: u64) -> InterfaceInfo {
2483 InterfaceInfo {
2484 id: InterfaceId(id),
2485 name: format!("test-{}", id),
2486 mode: constants::MODE_FULL,
2487 out_capable: true,
2488 in_capable: true,
2489 bitrate: None,
2490 announce_rate_target: None,
2491 announce_rate_grace: 0,
2492 announce_rate_penalty: 0.0,
2493 announce_cap: rns_core::constants::ANNOUNCE_CAP,
2494 is_local_client: false,
2495 wants_tunnel: false,
2496 tunnel_id: None,
2497 mtu: constants::MTU as u32,
2498 ia_freq: 0.0,
2499 started: 0.0,
2500 ingress_control: false,
2501 }
2502 }
2503
2504 fn make_entry(id: u64, writer: Box<dyn Writer>, online: bool) -> InterfaceEntry {
2505 InterfaceEntry {
2506 id: InterfaceId(id),
2507 info: make_interface_info(id),
2508 writer,
2509 online,
2510 dynamic: false,
2511 ifac: None,
2512 stats: InterfaceStats::default(),
2513 interface_type: String::new(),
2514 }
2515 }
2516
2517 fn build_announce_packet(identity: &Identity) -> Vec<u8> {
2519 let dest_hash = rns_core::destination::destination_hash(
2520 "test",
2521 &["app"],
2522 Some(identity.hash()),
2523 );
2524 let name_hash = rns_core::destination::name_hash("test", &["app"]);
2525 let random_hash = [0x42u8; 10];
2526
2527 let (announce_data, _has_ratchet) = AnnounceData::pack(
2528 identity,
2529 &dest_hash,
2530 &name_hash,
2531 &random_hash,
2532 None,
2533 None,
2534 )
2535 .unwrap();
2536
2537 let flags = PacketFlags {
2538 header_type: constants::HEADER_1,
2539 context_flag: constants::FLAG_UNSET,
2540 transport_type: constants::TRANSPORT_BROADCAST,
2541 destination_type: constants::DESTINATION_SINGLE,
2542 packet_type: constants::PACKET_TYPE_ANNOUNCE,
2543 };
2544
2545 let packet = RawPacket::pack(flags, 0, &dest_hash, None, constants::CONTEXT_NONE, &announce_data).unwrap();
2546 packet.raw
2547 }
2548
/// An announce frame received on a registered, online interface reaches the
/// announce callback exactly once.
#[test]
fn process_inbound_frame() {
    let (tx, rx) = event::channel();
    let (cbs, announces, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let identity = Identity::new(&mut OsRng);
    let frame = Event::Frame {
        interface_id: InterfaceId(1),
        data: build_announce_packet(&identity),
    };

    // Queue the frame followed by a shutdown so that `run` returns.
    tx.send(frame).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let seen = announces.lock().unwrap().len();
    assert_eq!(seen, 1);
}
2574
/// `SendOnInterface` writes the raw bytes, unchanged, to the target interface's
/// writer.
#[test]
fn dispatch_send() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let action = TransportAction::SendOnInterface {
        interface: InterfaceId(1),
        raw: vec![0x01, 0x02, 0x03],
    };
    driver.dispatch_all(vec![action]);

    {
        let frames = sent.lock().unwrap();
        assert_eq!(frames.len(), 1);
        assert_eq!(frames[0], vec![0x01, 0x02, 0x03]);
    }

    drop(tx);
}
2598
/// A broadcast without an exclusion is written once to every online interface.
#[test]
fn dispatch_broadcast() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let (first_writer, first_sent) = MockWriter::new();
    let (second_writer, second_sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(first_writer), true));
    driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(second_writer), true));

    let broadcast = TransportAction::BroadcastOnAllInterfaces { raw: vec![0xAA], exclude: None };
    driver.dispatch_all(vec![broadcast]);

    let first_count = first_sent.lock().unwrap().len();
    assert_eq!(first_count, 1);
    let second_count = second_sent.lock().unwrap().len();
    assert_eq!(second_count, 1);

    drop(tx);
}
2625
/// A broadcast that excludes interface 1 is written to interface 2 only.
#[test]
fn dispatch_broadcast_exclude() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let (excluded_writer, excluded_sent) = MockWriter::new();
    let (other_writer, other_sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(excluded_writer), true));
    driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(other_writer), true));

    let broadcast = TransportAction::BroadcastOnAllInterfaces {
        raw: vec![0xBB],
        exclude: Some(InterfaceId(1)),
    };
    driver.dispatch_all(vec![broadcast]);

    let excluded_count = excluded_sent.lock().unwrap().len();
    assert_eq!(excluded_count, 0);
    let other_count = other_sent.lock().unwrap().len();
    assert_eq!(other_count, 1);

    drop(tx);
}
2652
/// Smoke test: a `Tick` followed by `Shutdown` is processed on a
/// transport-enabled driver. There are no assertions; the test passes when `run`
/// returns without panicking.
#[test]
fn tick_event() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: true, identity_hash: Some([0x42; 16]) };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    for queued in [Event::Tick, Event::Shutdown] {
        tx.send(queued).unwrap();
    }
    driver.run();
}
2674
/// Smoke test: `run` returns once a `Shutdown` event has been queued.
#[test]
fn shutdown_event() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let stop = Event::Shutdown;
    tx.send(stop).unwrap();
    driver.run();
}
2689
/// A received announce triggers one announce callback, reported with a hop count
/// of 1, and one path-updated callback.
#[test]
fn announce_callback() {
    let (tx, rx) = event::channel();
    let (cbs, announces, paths, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let identity = Identity::new(&mut OsRng);
    let frame = Event::Frame {
        interface_id: InterfaceId(1),
        data: build_announce_packet(&identity),
    };
    tx.send(frame).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let recorded_announces = announces.lock().unwrap();
    assert_eq!(recorded_announces.len(), 1);
    let (_, hops) = &recorded_announces[0];
    assert_eq!(*hops, 1);

    let recorded_paths = paths.lock().unwrap();
    assert_eq!(recorded_paths.len(), 1);
}
2720
/// Neither a direct send nor a broadcast writes to an interface that is offline;
/// the broadcast still reaches the online one.
#[test]
fn dispatch_skips_offline_interface() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let (offline_writer, offline_sent) = MockWriter::new();
    let (online_writer, online_sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(offline_writer), false));
    driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(online_writer), true));

    // Direct send aimed at the offline interface.
    let direct = TransportAction::SendOnInterface { interface: InterfaceId(1), raw: vec![0x01] };
    driver.dispatch_all(vec![direct]);
    let offline_after_send = offline_sent.lock().unwrap().len();
    assert_eq!(offline_after_send, 0);

    // Broadcast to every interface.
    let broadcast = TransportAction::BroadcastOnAllInterfaces { raw: vec![0x02], exclude: None };
    driver.dispatch_all(vec![broadcast]);
    let offline_after_broadcast = offline_sent.lock().unwrap().len();
    assert_eq!(offline_after_broadcast, 0);
    let online_after_broadcast = online_sent.lock().unwrap().len();
    assert_eq!(online_after_broadcast, 1);

    drop(tx);
}
2754
/// An `InterfaceUp` event carrying a writer for an existing interface marks it
/// online and makes later sends go to the new writer, not the old one.
#[test]
fn interface_up_refreshes_writer() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let (stale_writer, stale_sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(stale_writer), false));

    let (fresh_writer, fresh_sent) = MockWriter::new();
    tx.send(Event::InterfaceUp(InterfaceId(1), Some(Box::new(fresh_writer)), None)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entry = &driver.interfaces[&InterfaceId(1)];
    assert!(entry.online);

    let send = TransportAction::SendOnInterface { interface: InterfaceId(1), raw: vec![0xFF] };
    driver.dispatch_all(vec![send]);

    let stale_count = stale_sent.lock().unwrap().len();
    assert_eq!(stale_count, 0);
    {
        let fresh_frames = fresh_sent.lock().unwrap();
        assert_eq!(fresh_frames.len(), 1);
        assert_eq!(fresh_frames[0], vec![0xFF]);
    }

    drop(tx);
}
2792
/// An `InterfaceUp` event carrying both a writer and an `InterfaceInfo` registers
/// a new interface that is online, flagged dynamic, reported through the
/// interface-up callback and usable for sending.
#[test]
fn dynamic_interface_register() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, iface_ups, _) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let new_id = InterfaceId(100);
    let info = make_interface_info(100);
    let (writer, sent) = MockWriter::new();

    tx.send(Event::InterfaceUp(new_id, Some(Box::new(writer)), Some(info))).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.interfaces.contains_key(&new_id));
    assert!(driver.interfaces[&new_id].online);
    assert!(driver.interfaces[&new_id].dynamic);

    {
        let ups = iface_ups.lock().unwrap();
        assert_eq!(ups.len(), 1);
        assert_eq!(ups[0], InterfaceId(100));
    }

    let send = TransportAction::SendOnInterface { interface: new_id, raw: vec![0x42] };
    driver.dispatch_all(vec![send]);
    let written = sent.lock().unwrap().len();
    assert_eq!(written, 1);

    drop(tx);
}
2835
/// An `InterfaceDown` event for a dynamic interface removes its entry from the
/// driver and fires the interface-down callback once.
#[test]
fn dynamic_interface_deregister() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, iface_downs) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let dyn_id = InterfaceId(200);
    driver.engine.register_interface(make_interface_info(200));

    // Same entry as the standard fixture, but flagged as dynamic.
    let (writer, _sent) = MockWriter::new();
    let mut entry = make_entry(200, Box::new(writer), true);
    entry.dynamic = true;
    driver.interfaces.insert(dyn_id, entry);

    tx.send(Event::InterfaceDown(dyn_id)).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(!driver.interfaces.contains_key(&dyn_id));
    let downs = iface_downs.lock().unwrap();
    assert_eq!(downs.len(), 1);
    assert_eq!(downs[0], InterfaceId(200));
}
2871
/// Up and down events for a non-dynamic interface each fire their callback once;
/// the entry stays in the driver and ends up offline.
#[test]
fn interface_callbacks_fire() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, iface_ups, iface_downs) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let (writer, _) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), false));

    let script = [
        Event::InterfaceUp(InterfaceId(1), None, None),
        Event::InterfaceDown(InterfaceId(1)),
        Event::Shutdown,
    ];
    for queued in script {
        tx.send(queued).unwrap();
    }
    driver.run();

    let up_count = iface_ups.lock().unwrap().len();
    assert_eq!(up_count, 1);
    let down_count = iface_downs.lock().unwrap().len();
    assert_eq!(down_count, 1);

    assert!(driver.interfaces.contains_key(&InterfaceId(1)));
    assert!(!driver.interfaces[&InterfaceId(1)].online);
}
2898
/// Receiving one frame adds its length to `rxb` and counts one received packet.
#[test]
fn frame_updates_rx_stats() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    driver.engine.register_interface(make_interface_info(1));
    let (writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let identity = Identity::new(&mut OsRng);
    let raw_frame = build_announce_packet(&identity);
    let expected_bytes = raw_frame.len() as u64;

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: raw_frame }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entry = &driver.interfaces[&InterfaceId(1)];
    assert_eq!(entry.stats.rxb, expected_bytes);
    assert_eq!(entry.stats.rx_packets, 1);
}
2930
/// Sending a three-byte frame on one interface yields `txb == 3` and one
/// transmitted packet in that interface's stats.
#[test]
fn send_updates_tx_stats() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let (writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let send = TransportAction::SendOnInterface {
        interface: InterfaceId(1),
        raw: vec![0x01, 0x02, 0x03],
    };
    driver.dispatch_all(vec![send]);

    let entry = &driver.interfaces[&InterfaceId(1)];
    assert_eq!(entry.stats.txb, 3);
    assert_eq!(entry.stats.tx_packets, 1);

    drop(tx);
}
2955
/// Broadcasting a two-byte frame updates the TX byte and packet counters of
/// every online interface.
#[test]
fn broadcast_updates_tx_stats() {
    let (tx, rx) = event::channel();
    let (cbs, ..) = MockCallbacks::new();
    let config = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(config, rx, tx.clone(), Box::new(cbs));

    let (first_writer, _first_sent) = MockWriter::new();
    let (second_writer, _second_sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(first_writer), true));
    driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(second_writer), true));

    let broadcast = TransportAction::BroadcastOnAllInterfaces {
        raw: vec![0xAA, 0xBB],
        exclude: None,
    };
    driver.dispatch_all(vec![broadcast]);

    let first = &driver.interfaces[&InterfaceId(1)].stats;
    assert_eq!(first.txb, 2);
    assert_eq!(first.tx_packets, 1);

    let second = &driver.interfaces[&InterfaceId(2)].stats;
    assert_eq!(second.txb, 2);
    assert_eq!(second.tx_packets, 1);

    drop(tx);
}
2984
/// An `InterfaceStats` query should report the single registered interface
/// together with the transport identity and enabled flag from the config.
#[test]
fn query_interface_stats() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: true, identity_hash: Some([0x42; 16]) };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Queue the query followed by a shutdown so `run` returns after answering.
    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::InterfaceStats, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let report = match reply_rx.recv().unwrap() {
        QueryResponse::InterfaceStats(inner) => inner,
        _ => panic!("unexpected response"),
    };
    assert_eq!(report.interfaces.len(), 1);
    let only_iface = &report.interfaces[0];
    assert_eq!(only_iface.name, "test-1");
    assert!(only_iface.status);
    assert_eq!(report.transport_id, Some([0x42; 16]));
    assert!(report.transport_enabled);
}
3015
/// After one announce frame is processed, a `PathTable` query without a hop
/// limit should return exactly one entry with `hops == 1`.
#[test]
fn query_path_table() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Feed an announce in as an inbound frame on interface 1.
    let announcer = Identity::new(&mut OsRng);
    let frame = Event::Frame {
        interface_id: InterfaceId(1),
        data: build_announce_packet(&announcer),
    };
    event_tx.send(frame).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::PathTable { max_hops: None }, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let table = match reply_rx.recv().unwrap() {
        QueryResponse::PathTable(rows) => rows,
        _ => panic!("unexpected response"),
    };
    assert_eq!(table.len(), 1);
    assert_eq!(table[0].hops, 1);
}
3050
/// A `DropPath` query for a destination previously seen in an announce should
/// answer `DropPath(true)`.
#[test]
fn query_drop_path() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let announcer = Identity::new(&mut OsRng);
    let announce_bytes = build_announce_packet(&announcer);
    // Same app name and aspects as the other announce-based tests in this file.
    let announced_dest =
        rns_core::destination::destination_hash("test", &["app"], Some(announcer.hash()));

    let frame = Event::Frame { interface_id: InterfaceId(1), data: announce_bytes };
    event_tx.send(frame).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::DropPath { dest_hash: announced_dest }, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let was_dropped = match reply_rx.recv().unwrap() {
        QueryResponse::DropPath(flag) => flag,
        _ => panic!("unexpected response"),
    };
    assert!(was_dropped);
}
3088
/// A `SendOutbound` event carrying a PLAIN broadcast data packet should result
/// in exactly one write on the mock interface.
#[test]
fn send_outbound_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let (mock_writer, captured) = MockWriter::new();
    driver.engine.register_interface(make_interface_info(1));
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let target = [0xAA; 16];
    let plain_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_PLAIN,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let outgoing =
        RawPacket::pack(plain_flags, 0, &target, None, constants::CONTEXT_NONE, b"hello").unwrap();

    let send_event = Event::SendOutbound {
        raw: outgoing.raw,
        dest_type: constants::DESTINATION_PLAIN,
        attached_interface: None,
    };
    event_tx.send(send_event).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let written = captured.lock().unwrap();
    assert_eq!(written.len(), 1);
}
3126
/// Registering a SINGLE destination through the event queue and then feeding in
/// a data frame for it should produce exactly one delivery for that hash.
#[test]
fn register_destination_and_deliver() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, deliveries, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let target = [0xBB; 16];

    // The registration is queued ahead of the frame so it is handled first.
    let registration = Event::RegisterDestination {
        dest_hash: target,
        dest_type: constants::DESTINATION_SINGLE,
    };
    event_tx.send(registration).unwrap();

    let single_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(single_flags, 0, &target, None, constants::CONTEXT_NONE, b"data").unwrap();
    let frame = Event::Frame { interface_id: InterfaceId(1), data: inbound.raw };
    event_tx.send(frame).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let delivered = deliveries.lock().unwrap();
    assert_eq!(delivered.len(), 1);
    assert_eq!(delivered[0], DestHash(target));
}
3165
/// A `TransportIdentity` query should echo back the identity hash that was
/// supplied in the transport configuration.
#[test]
fn query_transport_identity() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: true, identity_hash: Some([0xAA; 16]) };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::TransportIdentity, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    // Anything other than `TransportIdentity(Some(_))` is a failure.
    let reported = match reply_rx.recv().unwrap() {
        QueryResponse::TransportIdentity(Some(id_hash)) => id_hash,
        _ => panic!("unexpected response"),
    };
    assert_eq!(reported, [0xAA; 16]);
}
3189
/// A freshly constructed driver should answer a `LinkCount` query with zero.
#[test]
fn query_link_count() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::LinkCount, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let links = match reply_rx.recv().unwrap() {
        QueryResponse::LinkCount(n) => n,
        _ => panic!("unexpected response"),
    };
    assert_eq!(links, 0);
}
3211
/// A freshly constructed driver should answer a `RateTable` query with an
/// empty list.
#[test]
fn query_rate_table() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::RateTable, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let rates = match reply_rx.recv().unwrap() {
        QueryResponse::RateTable(rows) => rows,
        _ => panic!("unexpected response"),
    };
    assert!(rates.is_empty());
}
3233
/// A `NextHop` query for a destination the driver has never seen should answer
/// `NextHop(None)`.
#[test]
fn query_next_hop() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let unknown_dest = [0xBB; 16];
    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::NextHop { dest_hash: unknown_dest }, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    match reply {
        QueryResponse::NextHop(None) => {}
        _ => panic!("unexpected response"),
    }
}
3256
/// A `NextHopIfName` query for a destination the driver has never seen should
/// answer `NextHopIfName(None)`.
#[test]
fn query_next_hop_if_name() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let unknown_dest = [0xCC; 16];
    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::NextHopIfName { dest_hash: unknown_dest }, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    match reply {
        QueryResponse::NextHopIfName(None) => {}
        _ => panic!("unexpected response"),
    }
}
3279
/// With no paths recorded, a `DropAllVia` query should report that zero entries
/// were removed.
#[test]
fn query_drop_all_via() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let via = [0xDD; 16];
    let (reply_tx, reply_rx) = mpsc::channel();
    let request = QueryRequest::DropAllVia { transport_hash: via };
    event_tx.send(Event::Query(request, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let removed = match reply_rx.recv().unwrap() {
        QueryResponse::DropAllVia(n) => n,
        _ => panic!("unexpected response"),
    };
    assert_eq!(removed, 0);
}
3305
/// A `DropAnnounceQueues` query should be acknowledged with the matching
/// `DropAnnounceQueues` response variant.
#[test]
fn query_drop_announce_queues() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::DropAnnounceQueues, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    match reply {
        QueryResponse::DropAnnounceQueues => {}
        _ => panic!("unexpected response"),
    }
}
3327
/// A `RegisterLinkDestination` event should make the link manager report the
/// hash as a link destination.
#[test]
fn register_link_dest_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Generate a fresh Ed25519 keypair and ship both halves as raw bytes.
    let mut rng = OsRng;
    let signing_key = rns_crypto::ed25519::Ed25519PrivateKey::generate(&mut rng);
    let sig_pub_bytes = signing_key.public_key().public_bytes();
    let sig_prv_bytes = signing_key.private_bytes();
    let dest_hash = [0xDD; 16];

    let registration = Event::RegisterLinkDestination { dest_hash, sig_prv_bytes, sig_pub_bytes };
    event_tx.send(registration).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.link_manager.is_link_destination(&dest_hash));
}
3364
/// A `CreateLink` event should reply with a non-zero link id and leave exactly
/// one link in the link manager.
#[test]
fn create_link_event() {
    let (event_tx, event_rx) = event::channel();
    // `_established` stays bound for the whole test although it is not inspected.
    let (callbacks, _established, ..) = MockCallbacks::with_link_tracking();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let dest_hash = [0xDD; 16];
    // Placeholder bytes, not a key generated by the crypto library.
    let placeholder_sig_pub = [0xAA; 32];

    let (reply_tx, reply_rx) = mpsc::channel();
    let request = Event::CreateLink {
        dest_hash,
        dest_sig_pub_bytes: placeholder_sig_pub,
        response_tx: reply_tx,
    };
    event_tx.send(request).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let new_link_id = reply_rx.recv().unwrap();
    assert_ne!(new_link_id, [0u8; 16]);

    assert_eq!(driver.link_manager.link_count(), 1);
}
3404
/// Registers a link destination directly on the link manager and checks that
/// lookups succeed for it and fail for an unrelated hash.
///
/// NOTE(review): despite the name, no packet is delivered here; only the
/// registration lookup is exercised.
#[test]
fn deliver_local_routes_to_link_manager() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let mut rng = OsRng;
    let signing_key = rns_crypto::ed25519::Ed25519PrivateKey::generate(&mut rng);
    let verify_bytes = signing_key.public_key().public_bytes();
    let registered = [0xEE; 16];
    driver.link_manager.register_link_destination(registered, signing_key, verify_bytes);

    let unregistered = [0xFF; 16];
    assert!(driver.link_manager.is_link_destination(&registered));
    assert!(!driver.link_manager.is_link_destination(&unregistered));

    drop(event_tx);
}
3439
/// Creates a link through the event queue, tears it down via the link manager,
/// and expects exactly one link-closed callback carrying that link id.
#[test]
fn teardown_link_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, link_closed, _) = MockCallbacks::with_link_tracking();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let (reply_tx, reply_rx) = mpsc::channel();
    let request = Event::CreateLink {
        dest_hash: [0xDD; 16],
        dest_sig_pub_bytes: [0xAA; 32],
        response_tx: reply_tx,
    };
    event_tx.send(request).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let new_link_id = reply_rx.recv().unwrap();
    assert_ne!(new_link_id, [0u8; 16]);
    assert_eq!(driver.link_manager.link_count(), 1);

    // The event loop has already exited, so the teardown actions are dispatched by hand.
    let actions = driver.link_manager.teardown_link(&new_link_id);
    driver.dispatch_link_actions(actions);

    let closed = link_closed.lock().unwrap();
    assert_eq!(closed.len(), 1);
    assert_eq!(closed[0], TypedLinkId(new_link_id));
}
3481
/// A link created directly on the link manager should be reflected in the
/// answer to a `LinkCount` query.
#[test]
fn link_count_includes_link_manager() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Create the link without going through the event queue.
    let mut rng = OsRng;
    let placeholder_sig = [0xAA; 32];
    driver.link_manager.create_link(
        &[0xDD; 16],
        &placeholder_sig,
        1,
        constants::MTU as u32,
        &mut rng,
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::LinkCount, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let links = match reply_rx.recv().unwrap() {
        QueryResponse::LinkCount(n) => n,
        _ => panic!("unexpected response"),
    };
    assert_eq!(links, 1);
}
3513
/// Sends a `RegisterRequestHandler` event for "/status" and runs the driver to
/// shutdown.
///
/// There is no explicit assertion: the test passes when the run loop handles
/// the event and returns without panicking.
#[test]
fn register_request_handler_event() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    // The closure stays inline so its argument types are inferred from the event field.
    let registration = Event::RegisterRequestHandler {
        path: "/status".to_string(),
        allowed_list: None,
        handler: Box::new(|_link_id, _path, _data, _remote| Some(b"OK".to_vec())),
    };
    event_tx.send(registration).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();
}
3536
/// With remote management enabled and `started` backdated, a single `Tick`
/// should cause at least one packet to be written to the interface.
#[test]
fn management_announces_emitted_after_delay() {
    let (tx, rx) = event::channel();
    // The announce handle is not inspected here; the underscore name keeps it
    // alive for the whole test without triggering an unused-variable warning.
    let (cbs, _announces, _, _, _, _) = MockCallbacks::new();
    let identity = Identity::new(&mut OsRng);
    let identity_hash = *identity.hash();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: true, identity_hash: Some(identity_hash) },
        rx,
        tx.clone(),
        Box::new(cbs),
    );

    // `info` is not used again, so it is moved rather than cloned.
    let info = make_interface_info(1);
    driver.engine.register_interface(info);
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    driver.management_config.enable_remote_management = true;
    driver.transport_identity = Some(identity);

    // Backdate the start time by 10.0 so the tick sees the startup delay as
    // already elapsed (presumably seconds; the delay itself is defined elsewhere).
    driver.started = time::now() - 10.0;

    tx.send(Event::Tick).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    assert!(!sent_packets.is_empty(),
        "Management announce should be sent after startup delay");
}
3575
/// With `enable_remote_management` left untouched, a `Tick` after a backdated
/// start time should not write any packet to the interface.
#[test]
fn management_announces_not_emitted_when_disabled() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let identity = Identity::new(&mut OsRng);
    let identity_hash = *identity.hash();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: true, identity_hash: Some(identity_hash) },
        rx,
        tx.clone(),
        Box::new(cbs),
    );

    // `info` is not used again, so it is moved rather than cloned.
    let info = make_interface_info(1);
    driver.engine.register_interface(info);
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    // The management flag is deliberately not set; only the identity and the
    // backdated start time match the "enabled" test.
    driver.transport_identity = Some(identity);
    driver.started = time::now() - 10.0;

    tx.send(Event::Tick).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    assert!(sent_packets.is_empty(),
        "No announces should be sent when management is disabled");
}
3607
/// With remote management enabled but `started` set to the current time, a
/// `Tick` should not yet write any packet to the interface.
#[test]
fn management_announces_not_emitted_before_delay() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let identity = Identity::new(&mut OsRng);
    let identity_hash = *identity.hash();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: true, identity_hash: Some(identity_hash) },
        rx,
        tx.clone(),
        Box::new(cbs),
    );

    // `info` is not used again, so it is moved rather than cloned.
    let info = make_interface_info(1);
    driver.engine.register_interface(info);
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    driver.management_config.enable_remote_management = true;
    driver.transport_identity = Some(identity);
    // Started "just now", so the tick below runs before the startup delay elapses.
    driver.started = time::now();

    tx.send(Event::Tick).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    assert!(sent_packets.is_empty(),
        "No announces before startup delay");
}
3639
/// Processing an announce frame should add an entry to `known_destinations`
/// whose hashes, public key and hop count match the announcing identity.
#[test]
fn announce_received_populates_known_destinations() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let announcer = Identity::new(&mut OsRng);
    let announce_bytes = build_announce_packet(&announcer);
    let announced_dest =
        rns_core::destination::destination_hash("test", &["app"], Some(announcer.hash()));

    let frame = Event::Frame { interface_id: InterfaceId(1), data: announce_bytes };
    event_tx.send(frame).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    // Check presence first so a missing entry fails on the assertion, not the index.
    assert!(driver.known_destinations.contains_key(&announced_dest));
    let entry = &driver.known_destinations[&announced_dest];
    assert_eq!(entry.dest_hash.0, announced_dest);
    assert_eq!(entry.identity_hash.0, *announcer.hash());
    assert_eq!(&entry.public_key, &announcer.get_public_key().unwrap());
    assert_eq!(entry.hops, 1);
}
3678
/// `HasPath` should answer `false` for a hash queried before any announce and
/// `true` for the announced destination queried after its announce frame.
#[test]
fn query_has_path() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Queue order matters: unknown-hash query, then the announce, then the second query.
    let (before_tx, before_rx) = mpsc::channel();
    let before_query = Event::Query(QueryRequest::HasPath { dest_hash: [0xAA; 16] }, before_tx);
    event_tx.send(before_query).unwrap();

    let announcer = Identity::new(&mut OsRng);
    let announce_bytes = build_announce_packet(&announcer);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(announcer.hash()));
    let frame = Event::Frame { interface_id: InterfaceId(1), data: announce_bytes };
    event_tx.send(frame).unwrap();

    let (after_tx, after_rx) = mpsc::channel();
    let after_query = Event::Query(QueryRequest::HasPath { dest_hash }, after_tx);
    event_tx.send(after_query).unwrap();

    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let before_reply = before_rx.recv().unwrap();
    match before_reply {
        QueryResponse::HasPath(false) => {}
        unexpected => panic!("expected HasPath(false), got {:?}", unexpected),
    }

    let after_reply = after_rx.recv().unwrap();
    match after_reply {
        QueryResponse::HasPath(true) => {}
        unexpected => panic!("expected HasPath(true), got {:?}", unexpected),
    }
}
3724
/// After an announce frame is processed, a `HopsTo` query for the announced
/// destination should answer `HopsTo(Some(1))`.
#[test]
fn query_hops_to() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let announcer = Identity::new(&mut OsRng);
    let announce_bytes = build_announce_packet(&announcer);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(announcer.hash()));

    let frame = Event::Frame { interface_id: InterfaceId(1), data: announce_bytes };
    event_tx.send(frame).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    let query = Event::Query(QueryRequest::HopsTo { dest_hash }, reply_tx);
    event_tx.send(query).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let reply = reply_rx.recv().unwrap();
    match reply {
        QueryResponse::HopsTo(Some(1)) => {}
        unexpected => panic!("expected HopsTo(Some(1)), got {:?}", unexpected),
    }
}
3759
/// `RecallIdentity` should return the announced identity's details for a known
/// destination and `None` for a hash that was never announced.
#[test]
fn query_recall_identity() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let announcer = Identity::new(&mut OsRng);
    let announce_bytes = build_announce_packet(&announcer);
    let dest_hash =
        rns_core::destination::destination_hash("test", &["app"], Some(announcer.hash()));

    let frame = Event::Frame { interface_id: InterfaceId(1), data: announce_bytes };
    event_tx.send(frame).unwrap();

    // First query: the destination that was just announced.
    let (known_tx, known_rx) = mpsc::channel();
    let known_query = Event::Query(QueryRequest::RecallIdentity { dest_hash }, known_tx);
    event_tx.send(known_query).unwrap();

    // Second query: a hash that was never announced.
    let (unknown_tx, unknown_rx) = mpsc::channel();
    let unknown_request = QueryRequest::RecallIdentity { dest_hash: [0xFF; 16] };
    event_tx.send(Event::Query(unknown_request, unknown_tx)).unwrap();

    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let known_reply = known_rx.recv().unwrap();
    let recalled = match known_reply {
        QueryResponse::RecallIdentity(Some(found)) => found,
        unexpected => panic!("expected RecallIdentity(Some(..)), got {:?}", unexpected),
    };
    assert_eq!(recalled.dest_hash.0, dest_hash);
    assert_eq!(recalled.identity_hash.0, *announcer.hash());
    assert_eq!(recalled.public_key, announcer.get_public_key().unwrap());
    assert_eq!(recalled.hops, 1);

    let unknown_reply = unknown_rx.recv().unwrap();
    match unknown_reply {
        QueryResponse::RecallIdentity(None) => {}
        unexpected => panic!("expected RecallIdentity(None), got {:?}", unexpected),
    }
}
3809
/// A `RequestPath` event should put a packet on the wire whose flags decode to
/// a PLAIN, broadcast, DATA packet.
#[test]
fn request_path_sends_packet() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let request = Event::RequestPath { dest_hash: [0xAA; 16] };
    event_tx.send(request).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let written = captured.lock().unwrap();
    assert!(!written.is_empty(), "Path request should be sent on wire");

    // The top bit of the first byte is masked off before decoding the flags.
    let first_byte = written[0][0];
    let decoded = rns_core::packet::PacketFlags::unpack(first_byte & 0x7F);
    assert_eq!(decoded.packet_type, constants::PACKET_TYPE_DATA);
    assert_eq!(decoded.destination_type, constants::DESTINATION_PLAIN);
    assert_eq!(decoded.transport_type, constants::TRANSPORT_BROADCAST);
}
3841
/// With transport enabled, the path request payload should be 48 bytes long,
/// starting with the destination hash followed by the transport identity hash.
#[test]
fn request_path_includes_transport_id() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: true, identity_hash: Some([0xBB; 16]) };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let request = Event::RequestPath { dest_hash: [0xAA; 16] };
    event_tx.send(request).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let written = captured.lock().unwrap();
    assert!(!written.is_empty());

    let parsed = match RawPacket::unpack(&written[0]) {
        Ok(pkt) => pkt,
        Err(_) => panic!("Could not unpack sent packet"),
    };
    // Only the first 32 of the 48 payload bytes are checked for content.
    assert_eq!(parsed.data.len(), 48, "Path request data should be 48 bytes with transport_id");
    assert_eq!(&parsed.data[..16], &[0xAA; 16], "First 16 bytes should be dest_hash");
    assert_eq!(&parsed.data[16..32], &[0xBB; 16], "Next 16 bytes should be transport_id");
}
3875
/// A new driver's `path_request_dest` should equal the destination hash derived
/// from app name "rnstransport" with aspects "path" and "request" and no identity.
#[test]
fn path_request_dest_registered() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let aspects = ["path", "request"];
    let wanted = rns_core::destination::destination_hash("rnstransport", &aspects, None);
    assert_eq!(driver.path_request_dest, wanted);

    drop(event_tx);
}
3895
/// A `RegisterProofStrategy` event with `ProveAll` and a signing key should be
/// stored in `proof_strategies` together with an identity.
#[test]
fn register_proof_strategy_event() {
    use rns_core::types::ProofStrategy;

    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let target = [0xAA; 16];
    let signer = Identity::new(&mut OsRng);
    let private_key = signer.get_private_key().unwrap();

    let registration = Event::RegisterProofStrategy {
        dest_hash: target,
        strategy: ProofStrategy::ProveAll,
        signing_key: Some(private_key),
    };
    event_tx.send(registration).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    // Check presence first so a missing entry fails on the assertion, not the index.
    assert!(driver.proof_strategies.contains_key(&target));
    let stored = &driver.proof_strategies[&target];
    assert_eq!(stored.0, ProofStrategy::ProveAll);
    assert!(stored.1.is_some());
}
3928
/// A `RegisterProofStrategy` event with `ProveNone` and no signing key should
/// be stored in `proof_strategies` without an identity.
#[test]
fn register_proof_strategy_prove_none_no_identity() {
    use rns_core::types::ProofStrategy;

    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    let target = [0xBB; 16];
    let registration = Event::RegisterProofStrategy {
        dest_hash: target,
        strategy: ProofStrategy::ProveNone,
        signing_key: None,
    };
    event_tx.send(registration).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    // Check presence first so a missing entry fails on the assertion, not the index.
    assert!(driver.proof_strategies.contains_key(&target));
    let stored = &driver.proof_strategies[&target];
    assert_eq!(stored.0, ProofStrategy::ProveNone);
    assert!(stored.1.is_none());
}
3954
/// A `SendOutbound` event should record the packet in `sent_packets`, keyed by
/// its packet hash and storing the destination hash it was addressed to.
#[test]
fn send_outbound_tracks_sent_packets() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let target = [0xCC; 16];
    let plain_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_PLAIN,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let outgoing =
        RawPacket::pack(plain_flags, 0, &target, None, constants::CONTEXT_NONE, b"test data")
            .unwrap();
    // Grab the hash before the raw bytes are moved into the event.
    let wanted_hash = outgoing.packet_hash;

    let send_event = Event::SendOutbound {
        raw: outgoing.raw,
        dest_type: constants::DESTINATION_PLAIN,
        attached_interface: None,
    };
    event_tx.send(send_event).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(driver.sent_packets.contains_key(&wanted_hash));
    let record = &driver.sent_packets[&wanted_hash];
    assert_eq!(&record.0, &target);
}
3995
/// With a `ProveAll` strategy and an identity registered for the destination,
/// delivering a data packet should also send out at least one PROOF packet.
#[test]
fn prove_all_generates_proof_on_delivery() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, deliveries, ..) = MockCallbacks::new();
    let cfg = TransportConfig { transport_enabled: false, identity_hash: None };
    let mut driver = Driver::new(cfg, event_rx, event_tx.clone(), Box::new(callbacks));

    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, captured) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let target = [0xDD; 16];
    let signer = Identity::new(&mut OsRng);
    let private_key = signer.get_private_key().unwrap();
    driver.engine.register_destination(target, constants::DESTINATION_SINGLE);
    // Set up the strategy directly on the driver rather than through an event.
    let proving_identity = Identity::from_private_key(&private_key);
    driver.proof_strategies.insert(
        target,
        (rns_core::types::ProofStrategy::ProveAll, Some(proving_identity)),
    );

    let single_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(single_flags, 0, &target, None, constants::CONTEXT_NONE, b"hello").unwrap();

    let frame = Event::Frame { interface_id: InterfaceId(1), data: inbound.raw };
    event_tx.send(frame).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(deliveries.lock().unwrap().len(), 1);

    // Scan everything written to the interface for a packet typed as PROOF.
    let written = captured.lock().unwrap();
    let proof_seen = written
        .iter()
        .map(|bytes| PacketFlags::unpack(bytes[0] & 0x7F).packet_type)
        .any(|kind| kind == constants::PACKET_TYPE_PROOF);
    assert!(proof_seen, "ProveAll should generate a proof packet: sent {} packets", written.len());
}
4047
/// Feeds a DATA packet for a destination whose proof strategy is
/// `ProveNone` (with no identity attached) and checks that the packet is
/// still delivered once but no PROOF packet is written to the mock interface.
#[test]
fn prove_none_does_not_generate_proof() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, delivered, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );
    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let dest = [0xDD; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    let strategy = (rns_core::types::ProofStrategy::ProveNone, None);
    driver.proof_strategies.insert(dest, strategy);

    let data_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(data_flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();

    event_tx
        .send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw })
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    // Delivery must still happen even though no proof is expected.
    assert_eq!(delivered.lock().unwrap().len(), 1);

    let sent_packets = written.lock().unwrap();
    let has_proof = sent_packets
        .iter()
        .map(|raw| PacketFlags::unpack(raw[0] & 0x7F).packet_type)
        .any(|kind| kind == constants::PACKET_TYPE_PROOF);
    assert!(!has_proof, "ProveNone should not generate a proof packet");
}
4096
/// Feeds a DATA packet for a registered destination that has no entry in
/// `driver.proof_strategies` and checks that the packet is delivered once
/// and no PROOF packet is written to the mock interface.
#[test]
fn no_proof_strategy_does_not_generate_proof() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, delivered, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );
    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    // Registered with the engine only; deliberately no proof strategy is inserted.
    let dest = [0xDD; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    let data_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(data_flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();

    event_tx
        .send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw })
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert_eq!(delivered.lock().unwrap().len(), 1);

    let sent_packets = written.lock().unwrap();
    let has_proof = sent_packets
        .iter()
        .map(|raw| PacketFlags::unpack(raw[0] & 0x7F).packet_type)
        .any(|kind| kind == constants::PACKET_TYPE_PROOF);
    assert!(!has_proof, "No proof strategy means no proof generated");
}
4138
/// Feeds a DATA packet for a destination whose proof strategy is `ProveApp`
/// and checks that the proof-requested callback recorder receives exactly one
/// entry for that destination and that a PROOF packet is written afterwards.
#[test]
fn prove_app_calls_callback() {
    let (event_tx, event_rx) = event::channel();

    // Recorders the test keeps a handle on; the callbacks struct is built by
    // hand so these two can be shared with it.
    let proof_requests = Arc::new(Mutex::new(Vec::new()));
    let delivered = Arc::new(Mutex::new(Vec::new()));
    let callbacks = MockCallbacks {
        deliveries: Arc::clone(&delivered),
        proof_requested: Arc::clone(&proof_requests),
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
        proofs: Arc::new(Mutex::new(Vec::new())),
    };

    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );
    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, written) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let dest = [0xDD; 16];
    let prv_key = Identity::new(&mut OsRng).get_private_key().unwrap();
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    let strategy = (
        rns_core::types::ProofStrategy::ProveApp,
        Some(Identity::from_private_key(&prv_key)),
    );
    driver.proof_strategies.insert(dest, strategy);

    let data_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_DATA,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(data_flags, 0, &dest, None, constants::CONTEXT_NONE, b"app test").unwrap();

    event_tx
        .send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw })
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    // The application must have been asked exactly once, for this destination.
    let requests = proof_requests.lock().unwrap();
    assert_eq!(requests.len(), 1);
    assert_eq!(requests[0].0, DestHash(dest));

    // NOTE(review): relies on the mock's proof-requested callback approving
    // the proof, as the assertion message states; confirm in MockCallbacks.
    let sent_packets = written.lock().unwrap();
    let has_proof = sent_packets
        .iter()
        .map(|raw| PacketFlags::unpack(raw[0] & 0x7F).packet_type)
        .any(|kind| kind == constants::PACKET_TYPE_PROOF);
    assert!(has_proof, "ProveApp (callback returns true) should generate a proof");
}
4210
/// Seeds `driver.sent_packets` with a tracked hash, feeds an inbound PROOF
/// packet carrying that hash plus 64 filler bytes, and checks that the proof
/// callback recorder gets one entry (destination, packet hash, RTT) and that
/// the tracked entry is removed.
///
/// No entry is added to `driver.known_destinations` here, so the filler
/// signature is presumably not verified in this scenario; confirm in the driver.
#[test]
fn inbound_proof_fires_callback() {
    let (event_tx, event_rx) = event::channel();

    let proof_log = Arc::new(Mutex::new(Vec::new()));
    let callbacks = MockCallbacks {
        proofs: Arc::clone(&proof_log),
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
        proof_requested: Arc::new(Mutex::new(Vec::new())),
    };

    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );
    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Pretend a packet with this hash was sent half a second ago.
    let tracked_hash = [0x42u8; 32];
    driver.sent_packets.insert(tracked_hash, (dest, time::now() - 0.5));

    // Proof payload: the 32-byte tracked hash followed by 64 filler bytes.
    let mut proof_data = tracked_hash.to_vec();
    proof_data.extend_from_slice(&[0xAA; 64]);

    let proof_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_PROOF,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(proof_flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();

    event_tx
        .send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw })
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let recorded = proof_log.lock().unwrap();
    assert_eq!(recorded.len(), 1);
    let first = &recorded[0];
    assert_eq!(first.0, DestHash(dest));
    assert_eq!(first.1, PacketHash(tracked_hash));
    // Only a lower bound is checked; 0.4 leaves slack below the seeded 0.5 s.
    assert!(first.2 >= 0.4, "RTT should be approximately 0.5s, got {}", first.2);

    // A proven packet must no longer be tracked.
    assert!(!driver.sent_packets.contains_key(&tracked_hash));
}
4282
/// Feeds an inbound PROOF packet whose leading 32 bytes match no entry in
/// `driver.sent_packets` and checks that the proof callback recorder stays
/// empty.
#[test]
fn inbound_proof_for_unknown_packet_is_ignored() {
    let (event_tx, event_rx) = event::channel();

    let proof_log = Arc::new(Mutex::new(Vec::new()));
    let callbacks = MockCallbacks {
        proofs: Arc::clone(&proof_log),
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
        proof_requested: Arc::new(Mutex::new(Vec::new())),
    };

    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );
    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Nothing is inserted into `sent_packets`, so this hash is untracked.
    let unknown_hash = [0xFFu8; 32];
    let mut proof_data = unknown_hash.to_vec();
    proof_data.extend_from_slice(&[0xAA; 64]);

    let proof_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_PROOF,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(proof_flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();

    event_tx
        .send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw })
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(proof_log.lock().unwrap().is_empty());
}
4342
/// Registers the destination's identity in `driver.known_destinations`,
/// tracks a sent packet hash, and feeds a PROOF packet whose payload is that
/// hash followed by the identity's signature over it. Expects exactly one
/// proof callback entry carrying the destination and the tracked hash.
#[test]
fn inbound_proof_with_valid_signature_fires_callback() {
    let (event_tx, event_rx) = event::channel();

    let proof_log = Arc::new(Mutex::new(Vec::new()));
    let callbacks = MockCallbacks {
        proofs: Arc::clone(&proof_log),
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
        proof_requested: Arc::new(Mutex::new(Vec::new())),
    };

    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );
    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // Make the signer's public key known for this destination.
    let identity = Identity::new(&mut OsRng);
    let public_key = identity.get_public_key().unwrap();
    let announced = crate::destination::AnnouncedIdentity {
        dest_hash: DestHash(dest),
        identity_hash: IdentityHash(*identity.hash()),
        public_key,
        app_data: None,
        hops: 0,
        received_at: time::now(),
    };
    driver.known_destinations.insert(dest, announced);

    let tracked_hash = [0x42u8; 32];
    driver.sent_packets.insert(tracked_hash, (dest, time::now() - 0.5));

    // Proof payload: tracked hash followed by a genuine signature over it.
    let signature = identity.sign(&tracked_hash).unwrap();
    let mut proof_data = tracked_hash.to_vec();
    proof_data.extend_from_slice(&signature);

    let proof_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_PROOF,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(proof_flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();

    event_tx
        .send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw })
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let recorded = proof_log.lock().unwrap();
    assert_eq!(recorded.len(), 1);
    let first = &recorded[0];
    assert_eq!(first.0, DestHash(dest));
    assert_eq!(first.1, PacketHash(tracked_hash));
}
4422
/// Same setup as the valid-signature scenario (identity registered in
/// `driver.known_destinations`, packet hash tracked), but the PROOF payload
/// carries 64 filler bytes instead of a real signature. Expects the proof
/// callback recorder to stay empty.
#[test]
fn inbound_proof_with_invalid_signature_rejected() {
    let (event_tx, event_rx) = event::channel();

    let proof_log = Arc::new(Mutex::new(Vec::new()));
    let callbacks = MockCallbacks {
        proofs: Arc::clone(&proof_log),
        announces: Arc::new(Mutex::new(Vec::new())),
        paths: Arc::new(Mutex::new(Vec::new())),
        deliveries: Arc::new(Mutex::new(Vec::new())),
        iface_ups: Arc::new(Mutex::new(Vec::new())),
        iface_downs: Arc::new(Mutex::new(Vec::new())),
        link_established: Arc::new(Mutex::new(Vec::new())),
        link_closed: Arc::new(Mutex::new(Vec::new())),
        remote_identified: Arc::new(Mutex::new(Vec::new())),
        resources_received: Arc::new(Mutex::new(Vec::new())),
        resource_completed: Arc::new(Mutex::new(Vec::new())),
        resource_failed: Arc::new(Mutex::new(Vec::new())),
        channel_messages: Arc::new(Mutex::new(Vec::new())),
        link_data: Arc::new(Mutex::new(Vec::new())),
        responses: Arc::new(Mutex::new(Vec::new())),
        proof_requested: Arc::new(Mutex::new(Vec::new())),
    };

    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );
    driver.engine.register_interface(make_interface_info(1));
    let (mock_writer, _sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(mock_writer), true));

    let dest = [0xEE; 16];
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);

    // With the public key known, the driver has something to verify against.
    let identity = Identity::new(&mut OsRng);
    let public_key = identity.get_public_key().unwrap();
    let announced = crate::destination::AnnouncedIdentity {
        dest_hash: DestHash(dest),
        identity_hash: IdentityHash(*identity.hash()),
        public_key,
        app_data: None,
        hops: 0,
        received_at: time::now(),
    };
    driver.known_destinations.insert(dest, announced);

    let tracked_hash = [0x42u8; 32];
    driver.sent_packets.insert(tracked_hash, (dest, time::now() - 0.5));

    // Proof payload: tracked hash followed by 64 bytes that are not a signature.
    let mut proof_data = tracked_hash.to_vec();
    proof_data.extend_from_slice(&[0xAA; 64]);

    let proof_flags = PacketFlags {
        packet_type: constants::PACKET_TYPE_PROOF,
        destination_type: constants::DESTINATION_SINGLE,
        transport_type: constants::TRANSPORT_BROADCAST,
        context_flag: constants::FLAG_UNSET,
        header_type: constants::HEADER_1,
    };
    let inbound =
        RawPacket::pack(proof_flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();

    event_tx
        .send(Event::Frame { interface_id: InterfaceId(1), data: inbound.raw })
        .unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    assert!(proof_log.lock().unwrap().is_empty());
}
4499
/// End-to-end check of the proof the driver emits under `ProveAll`: the
/// PROOF packet written to the mock interface must carry a 96-byte payload
/// that `rns_core::receipt::validate_proof` accepts for the hash of the
/// delivered DATA packet and the destination's identity.
#[test]
fn proof_data_is_valid_explicit_proof() {
    let (tx, rx) = event::channel();
    let (cbs, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        rx,
        tx.clone(),
        Box::new(cbs),
    );
    driver.engine.register_interface(make_interface_info(1));
    let (writer, sent) = MockWriter::new();
    driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));

    let dest = [0xDD; 16];
    let identity = Identity::new(&mut OsRng);
    let prv_key = identity.get_private_key().unwrap();
    driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
    driver.proof_strategies.insert(dest, (
        rns_core::types::ProofStrategy::ProveAll,
        Some(Identity::from_private_key(&prv_key)),
    ));

    let flags = PacketFlags {
        header_type: constants::HEADER_1,
        context_flag: constants::FLAG_UNSET,
        transport_type: constants::TRANSPORT_BROADCAST,
        destination_type: constants::DESTINATION_SINGLE,
        packet_type: constants::PACKET_TYPE_DATA,
    };
    let data_packet =
        RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"verify me").unwrap();
    // Copy the hash out before `data_packet.raw` is moved into the event.
    let data_packet_hash = data_packet.packet_hash;

    tx.send(Event::Frame { interface_id: InterfaceId(1), data: data_packet.raw }).unwrap();
    tx.send(Event::Shutdown).unwrap();
    driver.run();

    let sent_packets = sent.lock().unwrap();
    // Find the first written packet whose decoded type is PROOF; failing to
    // find one is reported directly instead of via a separate is_some check.
    let proof_raw = sent_packets
        .iter()
        .find(|raw| {
            let f = PacketFlags::unpack(raw[0] & 0x7F);
            f.packet_type == constants::PACKET_TYPE_PROOF
        })
        .expect("Should have sent a proof");

    let proof_packet = RawPacket::unpack(proof_raw).unwrap();
    assert_eq!(proof_packet.data.len(), 96, "Explicit proof should be 96 bytes");

    // Validate against an identity rebuilt from the same private key.
    let result = rns_core::receipt::validate_proof(
        &proof_packet.data,
        &data_packet_hash,
        &Identity::from_private_key(&prv_key),
    );
    assert_eq!(result, rns_core::receipt::ProofResult::Valid);
}
4559
/// Queries local destinations on a freshly constructed driver with nothing
/// registered by the test. Expects exactly two entries, all of type PLAIN
/// (presumably destinations set up by `Driver::new` itself; confirm there).
#[test]
fn query_local_destinations_empty() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::LocalDestinations, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::LocalDestinations(entries) => entries,
        other => panic!("expected LocalDestinations, got {:?}", other),
    };
    assert_eq!(entries.len(), 2);
    entries
        .iter()
        .for_each(|entry| assert_eq!(entry.dest_type, rns_core::constants::DESTINATION_PLAIN));
}
4583
/// Registers one SINGLE destination through `Event::RegisterDestination`,
/// then queries local destinations. Expects three entries in total, one of
/// which is the registered hash with type SINGLE.
#[test]
fn query_local_destinations_with_registered() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );

    let dest_hash = [0xAA; 16];
    let registration = Event::RegisterDestination {
        dest_hash,
        dest_type: rns_core::constants::DESTINATION_SINGLE,
    };
    event_tx.send(registration).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::LocalDestinations, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::LocalDestinations(entries) => entries,
        other => panic!("expected LocalDestinations, got {:?}", other),
    };
    // One more entry than a fresh driver reports in the sibling "empty" test.
    assert_eq!(entries.len(), 3);
    assert!(entries.iter().any(|entry| {
        entry.hash == dest_hash && entry.dest_type == rns_core::constants::DESTINATION_SINGLE
    }));
}
4612
/// Registers a link destination through `Event::RegisterLinkDestination`
/// (with fixed filler key bytes), then queries local destinations. Expects
/// three entries in total, one of which is the registered hash reported with
/// type SINGLE.
#[test]
fn query_local_destinations_tracks_link_dest() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );

    let dest_hash = [0xBB; 16];
    let registration = Event::RegisterLinkDestination {
        dest_hash,
        sig_prv_bytes: [0x11; 32],
        sig_pub_bytes: [0x22; 32],
    };
    event_tx.send(registration).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::LocalDestinations, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::LocalDestinations(entries) => entries,
        other => panic!("expected LocalDestinations, got {:?}", other),
    };
    assert_eq!(entries.len(), 3);
    assert!(entries.iter().any(|entry| {
        entry.hash == dest_hash && entry.dest_type == rns_core::constants::DESTINATION_SINGLE
    }));
}
4642
/// Queries links on a freshly constructed driver and expects an empty list.
#[test]
fn query_links_empty() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::Links, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::Links(entries) => entries,
        other => panic!("expected Links, got {:?}", other),
    };
    assert!(entries.is_empty());
}
4662
/// Queries resources on a freshly constructed driver and expects an empty list.
#[test]
fn query_resources_empty() {
    let (event_tx, event_rx) = event::channel();
    let (callbacks, _, _, _, _, _) = MockCallbacks::new();
    let mut driver = Driver::new(
        TransportConfig { transport_enabled: false, identity_hash: None },
        event_rx,
        event_tx.clone(),
        Box::new(callbacks),
    );

    let (reply_tx, reply_rx) = mpsc::channel();
    event_tx.send(Event::Query(QueryRequest::Resources, reply_tx)).unwrap();
    event_tx.send(Event::Shutdown).unwrap();
    driver.run();

    let entries = match reply_rx.recv().unwrap() {
        QueryResponse::Resources(entries) => entries,
        other => panic!("expected Resources, got {:?}", other),
    };
    assert!(entries.is_empty());
}
4682
/// Table-driven check of `infer_interface_type`: each interface name on the
/// left must map to the interface type string on the right.
#[test]
fn infer_interface_type_from_name() {
    let cases = [
        ("TCPServerInterface/Client-1234", "TCPServerClientInterface"),
        ("BackboneInterface/5", "BackboneInterface"),
        ("LocalInterface", "LocalServerClientInterface"),
        ("MyAutoGroup:fe80::1", "AutoInterface"),
    ];
    for &(interface_name, expected_type) in cases.iter() {
        assert_eq!(super::infer_interface_type(interface_name), expected_type);
    }
}
4702
/// An empty buffer yields the all-zero destination hash.
#[test]
fn test_extract_dest_hash_empty() {
    let raw: &[u8] = &[];
    let zero_hash = [0u8; 16];
    assert_eq!(super::extract_dest_hash(raw), zero_hash);
}
4709
/// A three-byte buffer is too short to hold a destination hash, so the
/// all-zero hash is returned.
#[test]
fn test_extract_dest_hash_too_short() {
    let raw: &[u8] = &[0x00, 0x00, 0xAA];
    let zero_hash = [0u8; 16];
    assert_eq!(super::extract_dest_hash(raw), zero_hash);
}
4715
/// With a first byte of 0x00, the destination hash is the 16 bytes that
/// immediately follow the two leading header bytes.
#[test]
fn test_extract_dest_hash_header1() {
    let dest = [0x11u8; 16];
    // Layout: 2 header bytes | 16-byte destination | 10 trailing bytes.
    let raw: Vec<u8> = [&[0x00u8, 0x00][..], &dest[..], &[0xFF; 10][..]].concat();
    assert_eq!(super::extract_dest_hash(&raw), dest);
}
4725
/// With a first byte of 0x40 (presumably the header-type-2 flag, per the
/// test name), the destination hash is the second 16-byte field, after an
/// intervening 16-byte block.
#[test]
fn test_extract_dest_hash_header2() {
    let dest = [0x22u8; 16];
    // Layout: 2 header bytes | 16-byte leading field | 16-byte destination | 10 trailing bytes.
    let raw: Vec<u8> = [
        &[0x40u8, 0x00][..],
        &[0xAA; 16][..],
        &dest[..],
        &[0xFF; 10][..],
    ]
    .concat();
    assert_eq!(super::extract_dest_hash(&raw), dest);
}
4736
/// With a first byte of 0x40 but only one 16-byte field after the two header
/// bytes, there is no room for the destination, so the all-zero hash is
/// returned.
#[test]
fn test_extract_dest_hash_header2_too_short() {
    let raw: Vec<u8> = [&[0x40u8, 0x00][..], &[0xAA; 16][..]].concat();
    let zero_hash = [0u8; 16];
    assert_eq!(super::extract_dest_hash(&raw), zero_hash);
}
4744
/// Only bit 0x40 of the first byte should influence where the destination
/// hash is read from: 0x3F (every lower bit set, 0x40 clear) reads it right
/// after the two header bytes, while 0xFF (0x40 set along with all others)
/// reads it after an extra 16-byte field.
#[test]
fn test_extract_dest_hash_other_flags_preserved() {
    // 0x40 clear: destination directly follows the header bytes.
    let near_dest = [0x33u8; 16];
    let near_raw: Vec<u8> = [&[0x3Fu8, 0x00][..], &near_dest[..], &[0xFF; 10][..]].concat();
    assert_eq!(super::extract_dest_hash(&near_raw), near_dest);

    // 0x40 set: destination follows one intervening 16-byte field.
    let far_dest = [0x44u8; 16];
    let far_raw: Vec<u8> = [
        &[0xFFu8, 0x00][..],
        &[0xBB; 16][..],
        &far_dest[..],
        &[0xFF; 10][..],
    ]
    .concat();
    assert_eq!(super::extract_dest_hash(&far_raw), far_dest);
}
4763}