1use std::collections::HashMap;
4
5use rns_core::packet::RawPacket;
6use rns_core::transport::types::{InterfaceId, TransportAction, TransportConfig};
7use rns_core::transport::TransportEngine;
8use rns_crypto::{OsRng, Rng};
9
10use crate::event::{
11 BlackholeInfo, Event, EventReceiver, InterfaceStatsResponse,
12 LocalDestinationEntry, NextHopResponse, PathTableEntry, QueryRequest, QueryResponse,
13 RateTableEntry, SingleInterfaceStat,
14};
15use crate::ifac;
16use crate::interface::{InterfaceEntry, InterfaceStats};
17use crate::link_manager::{LinkManager, LinkManagerAction};
18use crate::holepunch::orchestrator::{HolePunchManager, HolePunchManagerAction};
19use crate::time;
20
21fn infer_interface_type(name: &str) -> String {
25 if name.starts_with("TCPServerInterface") {
26 "TCPServerClientInterface".to_string()
27 } else if name.starts_with("BackboneInterface") {
28 "BackboneInterface".to_string()
29 } else if name.starts_with("LocalInterface") {
30 "LocalServerClientInterface".to_string()
31 } else {
32 "AutoInterface".to_string()
35 }
36}
37
38pub trait Callbacks: Send {
43 fn on_announce(
44 &mut self,
45 announced: crate::destination::AnnouncedIdentity,
46 );
47
48 fn on_path_updated(&mut self, dest_hash: rns_core::types::DestHash, hops: u8);
49
50 fn on_local_delivery(&mut self, dest_hash: rns_core::types::DestHash, raw: Vec<u8>, packet_hash: rns_core::types::PacketHash);
51
52 fn on_interface_up(&mut self, _id: InterfaceId) {}
54
55 fn on_interface_down(&mut self, _id: InterfaceId) {}
57
58 fn on_link_established(&mut self, _link_id: rns_core::types::LinkId, _dest_hash: rns_core::types::DestHash, _rtt: f64, _is_initiator: bool) {}
60
61 fn on_link_closed(&mut self, _link_id: rns_core::types::LinkId, _reason: Option<rns_core::link::TeardownReason>) {}
63
64 fn on_remote_identified(&mut self, _link_id: rns_core::types::LinkId, _identity_hash: rns_core::types::IdentityHash, _public_key: [u8; 64]) {}
66
67 fn on_resource_received(&mut self, _link_id: rns_core::types::LinkId, _data: Vec<u8>, _metadata: Option<Vec<u8>>) {}
69
70 fn on_resource_completed(&mut self, _link_id: rns_core::types::LinkId) {}
72
73 fn on_resource_failed(&mut self, _link_id: rns_core::types::LinkId, _error: String) {}
75
76 fn on_resource_progress(&mut self, _link_id: rns_core::types::LinkId, _received: usize, _total: usize) {}
78
79 fn on_resource_accept_query(&mut self, _link_id: rns_core::types::LinkId, _resource_hash: Vec<u8>, _transfer_size: u64, _has_metadata: bool) -> bool {
82 false
83 }
84
85 fn on_channel_message(&mut self, _link_id: rns_core::types::LinkId, _msgtype: u16, _payload: Vec<u8>) {}
87
88 fn on_link_data(&mut self, _link_id: rns_core::types::LinkId, _context: u8, _data: Vec<u8>) {}
90
91 fn on_response(&mut self, _link_id: rns_core::types::LinkId, _request_id: [u8; 16], _data: Vec<u8>) {}
93
94 fn on_proof(&mut self, _dest_hash: rns_core::types::DestHash, _packet_hash: rns_core::types::PacketHash, _rtt: f64) {}
97
98 fn on_proof_requested(&mut self, _dest_hash: rns_core::types::DestHash, _packet_hash: rns_core::types::PacketHash) -> bool {
101 true
102 }
103
104 fn on_direct_connect_proposed(&mut self, _link_id: rns_core::types::LinkId, _peer_identity: Option<rns_core::types::IdentityHash>) -> bool {
107 false
108 }
109
110 fn on_direct_connect_established(&mut self, _link_id: rns_core::types::LinkId, _interface_id: InterfaceId) {}
112
113 fn on_direct_connect_failed(&mut self, _link_id: rns_core::types::LinkId, _reason: u8) {}
115}
116
117pub struct Driver {
119 pub(crate) engine: TransportEngine,
120 pub(crate) interfaces: HashMap<InterfaceId, InterfaceEntry>,
121 pub(crate) rng: OsRng,
122 pub(crate) rx: EventReceiver,
123 pub(crate) callbacks: Box<dyn Callbacks>,
124 pub(crate) started: f64,
125 pub(crate) announce_cache: Option<crate::announce_cache::AnnounceCache>,
126 pub(crate) tunnel_synth_dest: [u8; 16],
128 pub(crate) transport_identity: Option<rns_crypto::identity::Identity>,
130 pub(crate) link_manager: LinkManager,
132 pub(crate) management_config: crate::management::ManagementConfig,
134 pub(crate) last_management_announce: f64,
136 pub(crate) initial_announce_sent: bool,
138 pub(crate) known_destinations: HashMap<[u8; 16], crate::destination::AnnouncedIdentity>,
140 pub(crate) path_request_dest: [u8; 16],
142 pub(crate) proof_strategies: HashMap<[u8; 16], (rns_core::types::ProofStrategy, Option<rns_crypto::identity::Identity>)>,
145 pub(crate) sent_packets: HashMap<[u8; 32], ([u8; 16], f64)>,
147 pub(crate) local_destinations: HashMap<[u8; 16], u8>,
149 pub(crate) holepunch_manager: HolePunchManager,
151 pub(crate) event_tx: crate::event::EventSender,
153}
154
155impl Driver {
156 pub fn new(
158 config: TransportConfig,
159 rx: EventReceiver,
160 tx: crate::event::EventSender,
161 callbacks: Box<dyn Callbacks>,
162 ) -> Self {
163 let tunnel_synth_dest = rns_core::destination::destination_hash(
164 "rnstransport",
165 &["tunnel", "synthesize"],
166 None,
167 );
168 let path_request_dest = rns_core::destination::destination_hash(
169 "rnstransport",
170 &["path", "request"],
171 None,
172 );
173 let mut engine = TransportEngine::new(config);
174 engine.register_destination(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
175 engine.register_destination(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
177 let mut local_destinations = HashMap::new();
178 local_destinations.insert(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
179 local_destinations.insert(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
180 Driver {
181 engine,
182 interfaces: HashMap::new(),
183 rng: OsRng,
184 rx,
185 callbacks,
186 started: time::now(),
187 announce_cache: None,
188 tunnel_synth_dest,
189 transport_identity: None,
190 link_manager: LinkManager::new(),
191 management_config: Default::default(),
192 last_management_announce: 0.0,
193 initial_announce_sent: false,
194 known_destinations: HashMap::new(),
195 path_request_dest,
196 proof_strategies: HashMap::new(),
197 sent_packets: HashMap::new(),
198 local_destinations,
199 holepunch_manager: HolePunchManager::new(None, None),
200 event_tx: tx,
201 }
202 }
203
204 pub fn set_probe_config(&mut self, addr: Option<std::net::SocketAddr>, device: Option<String>) {
206 self.holepunch_manager = HolePunchManager::new(addr, device);
207 }
208
209 pub fn run(&mut self) {
211 loop {
212 let event = match self.rx.recv() {
213 Ok(e) => e,
214 Err(_) => break, };
216
217 match event {
218 Event::Frame { interface_id, data } => {
219 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
221 entry.stats.rxb += data.len() as u64;
222 entry.stats.rx_packets += 1;
223 }
224
225 let packet = if let Some(entry) = self.interfaces.get(&interface_id) {
227 if let Some(ref ifac_state) = entry.ifac {
228 match ifac::unmask_inbound(&data, ifac_state) {
230 Some(unmasked) => unmasked,
231 None => {
232 log::debug!("[{}] IFAC rejected packet", interface_id.0);
233 continue;
234 }
235 }
236 } else {
237 if data.len() > 2 && data[0] & 0x80 == 0x80 {
239 log::debug!("[{}] dropping packet with IFAC flag on non-IFAC interface", interface_id.0);
240 continue;
241 }
242 data
243 }
244 } else {
245 data
246 };
247
248 if packet.len() > 2 && (packet[0] & 0x03) == 0x01 {
250 let now = time::now();
251 if let Some(entry) = self.interfaces.get_mut(&interface_id) {
252 entry.stats.record_incoming_announce(now);
253 }
254 }
255
256 if let Some(entry) = self.interfaces.get(&interface_id) {
258 self.engine.update_interface_freq(interface_id, entry.stats.incoming_announce_freq());
259 }
260
261 let actions = self.engine.handle_inbound(
262 &packet,
263 interface_id,
264 time::now(),
265 &mut self.rng,
266 );
267 self.dispatch_all(actions);
268 }
269 Event::Tick => {
270 let now = time::now();
271 for (id, entry) in &self.interfaces {
273 self.engine.update_interface_freq(*id, entry.stats.incoming_announce_freq());
274 }
275 let actions = self.engine.tick(now, &mut self.rng);
276 self.dispatch_all(actions);
277 let link_actions = self.link_manager.tick(&mut self.rng);
279 self.dispatch_link_actions(link_actions);
280 {
282 let tx = self.get_event_sender();
283 let hp_actions = self.holepunch_manager.tick(&tx);
284 self.dispatch_holepunch_actions(hp_actions);
285 }
286 self.tick_management_announces(now);
288 self.sent_packets.retain(|_, (_, sent_time)| now - *sent_time < 60.0);
290 }
291 Event::InterfaceUp(id, new_writer, info) => {
292 let wants_tunnel;
293 if let Some(mut info) = info {
294 log::info!("[{}] dynamic interface registered", id.0);
296 wants_tunnel = info.wants_tunnel;
297 let iface_type = infer_interface_type(&info.name);
298 info.started = time::now();
300 self.engine.register_interface(info.clone());
301 if let Some(writer) = new_writer {
302 self.interfaces.insert(
303 id,
304 InterfaceEntry {
305 id,
306 info,
307 writer,
308 online: true,
309 dynamic: true,
310 ifac: None,
311 stats: InterfaceStats {
312 started: time::now(),
313 ..Default::default()
314 },
315 interface_type: iface_type,
316 },
317 );
318 }
319 self.callbacks.on_interface_up(id);
320 } else if let Some(entry) = self.interfaces.get_mut(&id) {
321 log::info!("[{}] interface online", id.0);
323 wants_tunnel = entry.info.wants_tunnel;
324 entry.online = true;
325 if let Some(writer) = new_writer {
326 log::info!("[{}] writer refreshed after reconnect", id.0);
327 entry.writer = writer;
328 }
329 self.callbacks.on_interface_up(id);
330 } else {
331 wants_tunnel = false;
332 }
333
334 if wants_tunnel {
336 self.synthesize_tunnel_for_interface(id);
337 }
338 }
339 Event::InterfaceDown(id) => {
340 if let Some(entry) = self.interfaces.get(&id) {
342 if let Some(tunnel_id) = entry.info.tunnel_id {
343 self.engine.void_tunnel_interface(&tunnel_id);
344 }
345 }
346
347 if let Some(entry) = self.interfaces.get(&id) {
348 if entry.dynamic {
349 log::info!("[{}] dynamic interface removed", id.0);
351 self.engine.deregister_interface(id);
352 self.interfaces.remove(&id);
353 } else {
354 log::info!("[{}] interface offline", id.0);
356 self.interfaces.get_mut(&id).unwrap().online = false;
357 }
358 self.callbacks.on_interface_down(id);
359 }
360 }
361 Event::SendOutbound { raw, dest_type, attached_interface } => {
362 match RawPacket::unpack(&raw) {
363 Ok(packet) => {
364 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_DATA {
366 self.sent_packets.insert(
367 packet.packet_hash,
368 (packet.destination_hash, time::now()),
369 );
370 }
371 let actions = self.engine.handle_outbound(
372 &packet,
373 dest_type,
374 attached_interface,
375 time::now(),
376 );
377 self.dispatch_all(actions);
378 }
379 Err(e) => {
380 log::warn!("SendOutbound: failed to unpack packet: {:?}", e);
381 }
382 }
383 }
384 Event::RegisterDestination { dest_hash, dest_type } => {
385 self.engine.register_destination(dest_hash, dest_type);
386 self.local_destinations.insert(dest_hash, dest_type);
387 }
388 Event::DeregisterDestination { dest_hash } => {
389 self.engine.deregister_destination(&dest_hash);
390 self.local_destinations.remove(&dest_hash);
391 }
392 Event::Query(request, response_tx) => {
393 let response = self.handle_query_mut(request);
394 let _ = response_tx.send(response);
395 }
396 Event::DeregisterLinkDestination { dest_hash } => {
397 self.link_manager.deregister_link_destination(&dest_hash);
398 }
399 Event::RegisterLinkDestination { dest_hash, sig_prv_bytes, sig_pub_bytes } => {
400 let sig_prv = rns_crypto::ed25519::Ed25519PrivateKey::from_bytes(&sig_prv_bytes);
401 self.link_manager.register_link_destination(dest_hash, sig_prv, sig_pub_bytes);
402 self.engine.register_destination(dest_hash, rns_core::constants::DESTINATION_SINGLE);
404 self.local_destinations.insert(dest_hash, rns_core::constants::DESTINATION_SINGLE);
405 }
406 Event::RegisterRequestHandler { path, allowed_list, handler } => {
407 self.link_manager.register_request_handler(&path, allowed_list, move |link_id, p, data, remote| {
408 handler(link_id, p, data, remote)
409 });
410 }
411 Event::CreateLink { dest_hash, dest_sig_pub_bytes, response_tx } => {
412 let hops = self.engine.hops_to(&dest_hash).unwrap_or(0);
413 let mtu = self.engine.next_hop_interface(&dest_hash)
414 .and_then(|iface_id| self.interfaces.get(&iface_id))
415 .map(|entry| entry.info.mtu)
416 .unwrap_or(rns_core::constants::MTU as u32);
417 let (link_id, link_actions) = self.link_manager.create_link(
418 &dest_hash, &dest_sig_pub_bytes, hops, mtu, &mut self.rng,
419 );
420 let _ = response_tx.send(link_id);
421 self.dispatch_link_actions(link_actions);
422 }
423 Event::SendRequest { link_id, path, data } => {
424 let link_actions = self.link_manager.send_request(
425 &link_id, &path, &data, &mut self.rng,
426 );
427 self.dispatch_link_actions(link_actions);
428 }
429 Event::IdentifyOnLink { link_id, identity_prv_key } => {
430 let identity = rns_crypto::identity::Identity::from_private_key(&identity_prv_key);
431 let link_actions = self.link_manager.identify(&link_id, &identity, &mut self.rng);
432 self.dispatch_link_actions(link_actions);
433 }
434 Event::TeardownLink { link_id } => {
435 let link_actions = self.link_manager.teardown_link(&link_id);
436 self.dispatch_link_actions(link_actions);
437 }
438 Event::SendResource { link_id, data, metadata } => {
439 let link_actions = self.link_manager.send_resource(
440 &link_id, &data, metadata.as_deref(), &mut self.rng,
441 );
442 self.dispatch_link_actions(link_actions);
443 }
444 Event::SetResourceStrategy { link_id, strategy } => {
445 use crate::link_manager::ResourceStrategy;
446 let strat = match strategy {
447 0 => ResourceStrategy::AcceptNone,
448 1 => ResourceStrategy::AcceptAll,
449 2 => ResourceStrategy::AcceptApp,
450 _ => ResourceStrategy::AcceptNone,
451 };
452 self.link_manager.set_resource_strategy(&link_id, strat);
453 }
454 Event::AcceptResource { link_id, resource_hash, accept } => {
455 let link_actions = self.link_manager.accept_resource(
456 &link_id, &resource_hash, accept, &mut self.rng,
457 );
458 self.dispatch_link_actions(link_actions);
459 }
460 Event::SendChannelMessage { link_id, msgtype, payload } => {
461 let link_actions = self.link_manager.send_channel_message(
462 &link_id, msgtype, &payload, &mut self.rng,
463 );
464 self.dispatch_link_actions(link_actions);
465 }
466 Event::SendOnLink { link_id, data, context } => {
467 let link_actions = self.link_manager.send_on_link(
468 &link_id, &data, context, &mut self.rng,
469 );
470 self.dispatch_link_actions(link_actions);
471 }
472 Event::RequestPath { dest_hash } => {
473 self.handle_request_path(dest_hash);
474 }
475 Event::RegisterProofStrategy { dest_hash, strategy, signing_key } => {
476 let identity = signing_key.map(|key| {
477 rns_crypto::identity::Identity::from_private_key(&key)
478 });
479 self.proof_strategies.insert(dest_hash, (strategy, identity));
480 }
481 Event::ProposeDirectConnect { link_id } => {
482 let derived_key = self.link_manager.get_derived_key(&link_id);
483 if let Some(dk) = derived_key {
484 let tx = self.get_event_sender();
485 let hp_actions = self.holepunch_manager.propose(
486 link_id, &dk, &mut self.rng, &tx,
487 );
488 self.dispatch_holepunch_actions(hp_actions);
489 } else {
490 log::warn!("Cannot propose direct connect: no derived key for link {:02x?}", &link_id[..4]);
491 }
492 }
493 Event::SetDirectConnectPolicy { policy } => {
494 self.holepunch_manager.set_policy(policy);
495 }
496 Event::HolePunchProbeResult { link_id, session_id, observed_addr, socket } => {
497 let hp_actions = self.holepunch_manager.handle_probe_result(
498 link_id, session_id, observed_addr, socket,
499 );
500 self.dispatch_holepunch_actions(hp_actions);
501 }
502 Event::HolePunchProbeFailed { link_id, session_id } => {
503 let hp_actions = self.holepunch_manager.handle_probe_failed(
504 link_id, session_id,
505 );
506 self.dispatch_holepunch_actions(hp_actions);
507 }
508 Event::Shutdown => break,
509 }
510 }
511 }
512
513 fn handle_query(&self, request: QueryRequest) -> QueryResponse {
515 match request {
516 QueryRequest::InterfaceStats => {
517 let mut interfaces = Vec::new();
518 let mut total_rxb: u64 = 0;
519 let mut total_txb: u64 = 0;
520 for entry in self.interfaces.values() {
521 total_rxb += entry.stats.rxb;
522 total_txb += entry.stats.txb;
523 interfaces.push(SingleInterfaceStat {
524 name: entry.info.name.clone(),
525 status: entry.online,
526 mode: entry.info.mode,
527 rxb: entry.stats.rxb,
528 txb: entry.stats.txb,
529 rx_packets: entry.stats.rx_packets,
530 tx_packets: entry.stats.tx_packets,
531 bitrate: entry.info.bitrate,
532 ifac_size: entry.ifac.as_ref().map(|s| s.size),
533 started: entry.stats.started,
534 ia_freq: entry.stats.incoming_announce_freq(),
535 oa_freq: entry.stats.outgoing_announce_freq(),
536 interface_type: entry.interface_type.clone(),
537 });
538 }
539 interfaces.sort_by(|a, b| a.name.cmp(&b.name));
541 QueryResponse::InterfaceStats(InterfaceStatsResponse {
542 interfaces,
543 transport_id: self.engine.identity_hash().copied(),
544 transport_enabled: self.engine.transport_enabled(),
545 transport_uptime: time::now() - self.started,
546 total_rxb,
547 total_txb,
548 })
549 }
550 QueryRequest::PathTable { max_hops } => {
551 let entries: Vec<PathTableEntry> = self
552 .engine
553 .path_table_entries()
554 .filter(|(_, entry)| {
555 max_hops.map_or(true, |max| entry.hops <= max)
556 })
557 .map(|(hash, entry)| {
558 let iface_name = self.interfaces.get(&entry.receiving_interface)
559 .map(|e| e.info.name.clone())
560 .or_else(|| self.engine.interface_info(&entry.receiving_interface)
561 .map(|i| i.name.clone()))
562 .unwrap_or_default();
563 PathTableEntry {
564 hash: *hash,
565 timestamp: entry.timestamp,
566 via: entry.next_hop,
567 hops: entry.hops,
568 expires: entry.expires,
569 interface: entry.receiving_interface,
570 interface_name: iface_name,
571 }
572 })
573 .collect();
574 QueryResponse::PathTable(entries)
575 }
576 QueryRequest::RateTable => {
577 let entries: Vec<RateTableEntry> = self
578 .engine
579 .rate_limiter()
580 .entries()
581 .map(|(hash, entry)| RateTableEntry {
582 hash: *hash,
583 last: entry.last,
584 rate_violations: entry.rate_violations,
585 blocked_until: entry.blocked_until,
586 timestamps: entry.timestamps.clone(),
587 })
588 .collect();
589 QueryResponse::RateTable(entries)
590 }
591 QueryRequest::NextHop { dest_hash } => {
592 let resp = self.engine.next_hop(&dest_hash).map(|next_hop| {
593 NextHopResponse {
594 next_hop,
595 hops: self.engine.hops_to(&dest_hash).unwrap_or(0),
596 interface: self.engine.next_hop_interface(&dest_hash).unwrap_or(InterfaceId(0)),
597 }
598 });
599 QueryResponse::NextHop(resp)
600 }
601 QueryRequest::NextHopIfName { dest_hash } => {
602 let name = self
603 .engine
604 .next_hop_interface(&dest_hash)
605 .and_then(|id| self.interfaces.get(&id))
606 .map(|entry| entry.info.name.clone());
607 QueryResponse::NextHopIfName(name)
608 }
609 QueryRequest::LinkCount => {
610 QueryResponse::LinkCount(self.engine.link_table_count() + self.link_manager.link_count())
611 }
612 QueryRequest::DropPath { .. } => {
613 QueryResponse::DropPath(false)
615 }
616 QueryRequest::DropAllVia { .. } => {
617 QueryResponse::DropAllVia(0)
618 }
619 QueryRequest::DropAnnounceQueues => {
620 QueryResponse::DropAnnounceQueues
621 }
622 QueryRequest::TransportIdentity => {
623 QueryResponse::TransportIdentity(self.engine.identity_hash().copied())
624 }
625 QueryRequest::GetBlackholed => {
626 let now = time::now();
627 let entries: Vec<BlackholeInfo> = self.engine.blackholed_entries()
628 .filter(|(_, e)| e.expires == 0.0 || e.expires > now)
629 .map(|(hash, entry)| BlackholeInfo {
630 identity_hash: *hash,
631 created: entry.created,
632 expires: entry.expires,
633 reason: entry.reason.clone(),
634 })
635 .collect();
636 QueryResponse::Blackholed(entries)
637 }
638 QueryRequest::BlackholeIdentity { .. }
639 | QueryRequest::UnblackholeIdentity { .. } => {
640 QueryResponse::BlackholeResult(false)
642 }
643 QueryRequest::HasPath { dest_hash } => {
644 QueryResponse::HasPath(self.engine.has_path(&dest_hash))
645 }
646 QueryRequest::HopsTo { dest_hash } => {
647 QueryResponse::HopsTo(self.engine.hops_to(&dest_hash))
648 }
649 QueryRequest::RecallIdentity { dest_hash } => {
650 QueryResponse::RecallIdentity(self.known_destinations.get(&dest_hash).cloned())
651 }
652 QueryRequest::LocalDestinations => {
653 let entries: Vec<LocalDestinationEntry> = self
654 .local_destinations
655 .iter()
656 .map(|(hash, dest_type)| LocalDestinationEntry {
657 hash: *hash,
658 dest_type: *dest_type,
659 })
660 .collect();
661 QueryResponse::LocalDestinations(entries)
662 }
663 QueryRequest::Links => {
664 QueryResponse::Links(self.link_manager.link_entries())
665 }
666 QueryRequest::Resources => {
667 QueryResponse::Resources(self.link_manager.resource_entries())
668 }
669 }
670 }
671
672 fn handle_query_mut(&mut self, request: QueryRequest) -> QueryResponse {
674 match request {
675 QueryRequest::BlackholeIdentity { identity_hash, duration_hours, reason } => {
676 let now = time::now();
677 self.engine.blackhole_identity(identity_hash, now, duration_hours, reason);
678 QueryResponse::BlackholeResult(true)
679 }
680 QueryRequest::UnblackholeIdentity { identity_hash } => {
681 let result = self.engine.unblackhole_identity(&identity_hash);
682 QueryResponse::UnblackholeResult(result)
683 }
684 QueryRequest::DropPath { dest_hash } => {
685 QueryResponse::DropPath(self.engine.drop_path(&dest_hash))
686 }
687 QueryRequest::DropAllVia { transport_hash } => {
688 QueryResponse::DropAllVia(self.engine.drop_all_via(&transport_hash))
689 }
690 QueryRequest::DropAnnounceQueues => {
691 self.engine.drop_announce_queues();
692 QueryResponse::DropAnnounceQueues
693 }
694 other => self.handle_query(other),
695 }
696 }
697
698 fn handle_tunnel_synth_delivery(&mut self, raw: &[u8]) {
700 let packet = match RawPacket::unpack(raw) {
702 Ok(p) => p,
703 Err(_) => return,
704 };
705
706 match rns_core::transport::tunnel::validate_tunnel_synthesize_data(&packet.data) {
707 Ok(validated) => {
708 let iface_id = self
711 .interfaces
712 .iter()
713 .find(|(_, entry)| entry.info.wants_tunnel && entry.online)
714 .map(|(id, _)| *id);
715
716 if let Some(iface) = iface_id {
717 let now = time::now();
718 let tunnel_actions =
719 self.engine.handle_tunnel(validated.tunnel_id, iface, now);
720 self.dispatch_all(tunnel_actions);
721 }
722 }
723 Err(e) => {
724 log::debug!("Tunnel synthesis validation failed: {}", e);
725 }
726 }
727 }
728
729 fn synthesize_tunnel_for_interface(&mut self, interface: InterfaceId) {
733 if let Some(ref identity) = self.transport_identity {
734 let actions = self.engine.synthesize_tunnel(identity, interface, &mut self.rng);
735 self.dispatch_all(actions);
736 }
737 }
738
739 fn handle_request_path(&mut self, dest_hash: [u8; 16]) {
741 let mut data = Vec::with_capacity(48);
743 data.extend_from_slice(&dest_hash);
744
745 if self.engine.transport_enabled() {
746 if let Some(id_hash) = self.engine.identity_hash() {
747 data.extend_from_slice(id_hash);
748 }
749 }
750
751 let mut tag = [0u8; 16];
753 self.rng.fill_bytes(&mut tag);
754 data.extend_from_slice(&tag);
755
756 let flags = rns_core::packet::PacketFlags {
758 header_type: rns_core::constants::HEADER_1,
759 context_flag: rns_core::constants::FLAG_UNSET,
760 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
761 destination_type: rns_core::constants::DESTINATION_PLAIN,
762 packet_type: rns_core::constants::PACKET_TYPE_DATA,
763 };
764
765 if let Ok(packet) = RawPacket::pack(
766 flags, 0, &self.path_request_dest, None,
767 rns_core::constants::CONTEXT_NONE, &data,
768 ) {
769 let actions = self.engine.handle_outbound(
770 &packet,
771 rns_core::constants::DESTINATION_PLAIN,
772 None,
773 time::now(),
774 );
775 self.dispatch_all(actions);
776 }
777 }
778
779 fn maybe_generate_proof(&mut self, dest_hash: [u8; 16], packet_hash: &[u8; 32]) {
782 use rns_core::types::ProofStrategy;
783
784 let (strategy, identity) = match self.proof_strategies.get(&dest_hash) {
785 Some((s, id)) => (*s, id.as_ref()),
786 None => return,
787 };
788
789 let should_prove = match strategy {
790 ProofStrategy::ProveAll => true,
791 ProofStrategy::ProveApp => {
792 self.callbacks.on_proof_requested(
793 rns_core::types::DestHash(dest_hash),
794 rns_core::types::PacketHash(*packet_hash),
795 )
796 }
797 ProofStrategy::ProveNone => false,
798 };
799
800 if !should_prove {
801 return;
802 }
803
804 let identity = match identity {
805 Some(id) => id,
806 None => {
807 log::warn!("Cannot generate proof for {:02x?}: no signing key", &dest_hash[..4]);
808 return;
809 }
810 };
811
812 let signature = match identity.sign(packet_hash) {
814 Ok(sig) => sig,
815 Err(e) => {
816 log::warn!("Failed to sign proof for {:02x?}: {:?}", &dest_hash[..4], e);
817 return;
818 }
819 };
820
821 let mut proof_data = Vec::with_capacity(96);
823 proof_data.extend_from_slice(packet_hash);
824 proof_data.extend_from_slice(&signature);
825
826 let mut proof_dest = [0u8; 16];
832 proof_dest.copy_from_slice(&packet_hash[..16]);
833
834 let flags = rns_core::packet::PacketFlags {
835 header_type: rns_core::constants::HEADER_1,
836 context_flag: rns_core::constants::FLAG_UNSET,
837 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
838 destination_type: rns_core::constants::DESTINATION_SINGLE,
839 packet_type: rns_core::constants::PACKET_TYPE_PROOF,
840 };
841
842 if let Ok(packet) = RawPacket::pack(
843 flags, 0, &proof_dest, None,
844 rns_core::constants::CONTEXT_NONE, &proof_data,
845 ) {
846 let actions = self.engine.handle_outbound(
847 &packet,
848 rns_core::constants::DESTINATION_SINGLE,
849 None,
850 time::now(),
851 );
852 self.dispatch_all(actions);
853 log::debug!("Generated proof for packet on dest {:02x?}", &dest_hash[..4]);
854 }
855 }
856
857 fn handle_inbound_proof(&mut self, dest_hash: [u8; 16], proof_data: &[u8], _raw_packet_hash: &[u8; 32]) {
859 if proof_data.len() < 96 {
861 log::debug!("Proof too short for explicit proof: {} bytes", proof_data.len());
862 return;
863 }
864
865 let mut tracked_hash = [0u8; 32];
866 tracked_hash.copy_from_slice(&proof_data[..32]);
867
868 let signature = &proof_data[32..96];
869
870 if let Some((tracked_dest, sent_time)) = self.sent_packets.remove(&tracked_hash) {
872 if let Some(announced) = self.known_destinations.get(&tracked_dest) {
875 let identity = rns_crypto::identity::Identity::from_public_key(&announced.public_key);
876 let mut sig = [0u8; 64];
877 sig.copy_from_slice(signature);
878 if !identity.verify(&sig, &tracked_hash) {
879 log::debug!(
880 "Proof signature invalid for {:02x?}",
881 &tracked_hash[..4],
882 );
883 return;
884 }
885 } else {
886 log::debug!(
887 "No known identity for dest {:02x?}, accepting proof without signature check",
888 &tracked_dest[..4],
889 );
890 }
891
892 let rtt = time::now() - sent_time;
893 log::debug!(
894 "Proof received for {:02x?} rtt={:.3}s",
895 &tracked_hash[..4], rtt,
896 );
897 self.callbacks.on_proof(
898 rns_core::types::DestHash(tracked_dest),
899 rns_core::types::PacketHash(tracked_hash),
900 rtt,
901 );
902 } else {
903 log::debug!(
904 "Proof for unknown packet {:02x?} on dest {:02x?}",
905 &tracked_hash[..4], &dest_hash[..4],
906 );
907 }
908 }
909
910 fn dispatch_all(&mut self, actions: Vec<TransportAction>) {
912 for action in actions {
913 match action {
914 TransportAction::SendOnInterface { interface, raw } => {
915 let is_announce = raw.len() > 2 && (raw[0] & 0x03) == 0x01;
916 if let Some(entry) = self.interfaces.get_mut(&interface) {
917 if entry.online {
918 let data = if let Some(ref ifac_state) = entry.ifac {
919 ifac::mask_outbound(&raw, ifac_state)
920 } else {
921 raw
922 };
923 entry.stats.txb += data.len() as u64;
925 entry.stats.tx_packets += 1;
926 if is_announce {
927 entry.stats.record_outgoing_announce(time::now());
928 }
929 if let Err(e) = entry.writer.send_frame(&data) {
930 log::warn!("[{}] send failed: {}", entry.info.id.0, e);
931 }
932 }
933 }
934 }
935 TransportAction::BroadcastOnAllInterfaces { raw, exclude } => {
936 let is_announce = raw.len() > 2 && (raw[0] & 0x03) == 0x01;
937 for entry in self.interfaces.values_mut() {
938 if entry.online && Some(entry.id) != exclude {
939 let data = if let Some(ref ifac_state) = entry.ifac {
940 ifac::mask_outbound(&raw, ifac_state)
941 } else {
942 raw.clone()
943 };
944 entry.stats.txb += data.len() as u64;
946 entry.stats.tx_packets += 1;
947 if is_announce {
948 entry.stats.record_outgoing_announce(time::now());
949 }
950 if let Err(e) = entry.writer.send_frame(&data) {
951 log::warn!("[{}] broadcast failed: {}", entry.info.id.0, e);
952 }
953 }
954 }
955 }
956 TransportAction::DeliverLocal {
957 destination_hash,
958 raw,
959 packet_hash,
960 } => {
961 if destination_hash == self.tunnel_synth_dest {
962 self.handle_tunnel_synth_delivery(&raw);
964 } else if destination_hash == self.path_request_dest {
965 if let Ok(packet) = RawPacket::unpack(&raw) {
967 let actions = self.engine.handle_path_request(
968 &packet.data,
969 InterfaceId(0), time::now(),
971 );
972 self.dispatch_all(actions);
973 }
974 } else if self.link_manager.is_link_destination(&destination_hash) {
975 let link_actions = self.link_manager.handle_local_delivery(
977 destination_hash, &raw, packet_hash, &mut self.rng,
978 );
979 if link_actions.is_empty() {
980 if let Ok(packet) = RawPacket::unpack(&raw) {
984 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_PROOF {
985 self.handle_inbound_proof(destination_hash, &packet.data, &packet_hash);
986 continue;
987 }
988 }
989 self.maybe_generate_proof(destination_hash, &packet_hash);
990 self.callbacks.on_local_delivery(
991 rns_core::types::DestHash(destination_hash),
992 raw,
993 rns_core::types::PacketHash(packet_hash),
994 );
995 } else {
996 self.dispatch_link_actions(link_actions);
997 }
998 } else {
999 if let Ok(packet) = RawPacket::unpack(&raw) {
1001 if packet.flags.packet_type == rns_core::constants::PACKET_TYPE_PROOF {
1002 self.handle_inbound_proof(destination_hash, &packet.data, &packet_hash);
1003 continue;
1004 }
1005 }
1006
1007 self.maybe_generate_proof(destination_hash, &packet_hash);
1009
1010 self.callbacks
1011 .on_local_delivery(
1012 rns_core::types::DestHash(destination_hash),
1013 raw,
1014 rns_core::types::PacketHash(packet_hash),
1015 );
1016 }
1017 }
1018 TransportAction::AnnounceReceived {
1019 destination_hash,
1020 identity_hash,
1021 public_key,
1022 app_data,
1023 hops,
1024 ..
1025 } => {
1026 let announced = crate::destination::AnnouncedIdentity {
1028 dest_hash: rns_core::types::DestHash(destination_hash),
1029 identity_hash: rns_core::types::IdentityHash(identity_hash),
1030 public_key,
1031 app_data: app_data.clone(),
1032 hops,
1033 received_at: time::now(),
1034 };
1035 self.known_destinations.insert(destination_hash, announced.clone());
1036 self.callbacks.on_announce(announced);
1037 }
1038 TransportAction::PathUpdated {
1039 destination_hash,
1040 hops,
1041 ..
1042 } => {
1043 self.callbacks.on_path_updated(rns_core::types::DestHash(destination_hash), hops);
1044 }
1045 TransportAction::ForwardToLocalClients { raw, exclude } => {
1046 for entry in self.interfaces.values_mut() {
1047 if entry.online
1048 && entry.info.is_local_client
1049 && Some(entry.id) != exclude
1050 {
1051 let data = if let Some(ref ifac_state) = entry.ifac {
1052 ifac::mask_outbound(&raw, ifac_state)
1053 } else {
1054 raw.clone()
1055 };
1056 entry.stats.txb += data.len() as u64;
1057 entry.stats.tx_packets += 1;
1058 if let Err(e) = entry.writer.send_frame(&data) {
1059 log::warn!("[{}] forward to local client failed: {}", entry.info.id.0, e);
1060 }
1061 }
1062 }
1063 }
1064 TransportAction::ForwardPlainBroadcast { raw, to_local, exclude } => {
1065 for entry in self.interfaces.values_mut() {
1066 if entry.online
1067 && entry.info.is_local_client == to_local
1068 && Some(entry.id) != exclude
1069 {
1070 let data = if let Some(ref ifac_state) = entry.ifac {
1071 ifac::mask_outbound(&raw, ifac_state)
1072 } else {
1073 raw.clone()
1074 };
1075 entry.stats.txb += data.len() as u64;
1076 entry.stats.tx_packets += 1;
1077 if let Err(e) = entry.writer.send_frame(&data) {
1078 log::warn!("[{}] forward plain broadcast failed: {}", entry.info.id.0, e);
1079 }
1080 }
1081 }
1082 }
1083 TransportAction::CacheAnnounce { packet_hash, raw } => {
1084 if let Some(ref cache) = self.announce_cache {
1085 if let Err(e) = cache.store(&packet_hash, &raw, None) {
1086 log::warn!("Failed to cache announce: {}", e);
1087 }
1088 }
1089 }
1090 TransportAction::TunnelSynthesize { interface, data, dest_hash } => {
1091 let flags = rns_core::packet::PacketFlags {
1093 header_type: rns_core::constants::HEADER_1,
1094 context_flag: rns_core::constants::FLAG_UNSET,
1095 transport_type: rns_core::constants::TRANSPORT_BROADCAST,
1096 destination_type: rns_core::constants::DESTINATION_PLAIN,
1097 packet_type: rns_core::constants::PACKET_TYPE_DATA,
1098 };
1099 if let Ok(packet) = rns_core::packet::RawPacket::pack(
1100 flags, 0, &dest_hash, None,
1101 rns_core::constants::CONTEXT_NONE, &data,
1102 ) {
1103 if let Some(entry) = self.interfaces.get_mut(&interface) {
1104 if entry.online {
1105 let raw = if let Some(ref ifac_state) = entry.ifac {
1106 ifac::mask_outbound(&packet.raw, ifac_state)
1107 } else {
1108 packet.raw
1109 };
1110 entry.stats.txb += raw.len() as u64;
1111 entry.stats.tx_packets += 1;
1112 if let Err(e) = entry.writer.send_frame(&raw) {
1113 log::warn!("[{}] tunnel synthesize send failed: {}", entry.info.id.0, e);
1114 }
1115 }
1116 }
1117 }
1118 }
1119 TransportAction::TunnelEstablished { tunnel_id, interface } => {
1120 log::info!("Tunnel established: {:02x?} on interface {}", &tunnel_id[..4], interface.0);
1121 }
1122 }
1123 }
1124 }
1125
1126 fn dispatch_link_actions(&mut self, actions: Vec<LinkManagerAction>) {
1128 for action in actions {
1129 match action {
1130 LinkManagerAction::SendPacket { raw, dest_type, attached_interface } => {
1131 match RawPacket::unpack(&raw) {
1133 Ok(packet) => {
1134 let transport_actions = self.engine.handle_outbound(
1135 &packet,
1136 dest_type,
1137 attached_interface,
1138 time::now(),
1139 );
1140 self.dispatch_all(transport_actions);
1141 }
1142 Err(e) => {
1143 log::warn!("LinkManager SendPacket: failed to unpack: {:?}", e);
1144 }
1145 }
1146 }
1147 LinkManagerAction::LinkEstablished { link_id, dest_hash, rtt, is_initiator } => {
1148 log::info!(
1149 "Link established: {:02x?} rtt={:.3}s initiator={}",
1150 &link_id[..4], rtt, is_initiator,
1151 );
1152 self.callbacks.on_link_established(
1153 rns_core::types::LinkId(link_id),
1154 rns_core::types::DestHash(dest_hash),
1155 rtt,
1156 is_initiator,
1157 );
1158 }
1159 LinkManagerAction::LinkClosed { link_id, reason } => {
1160 log::info!("Link closed: {:02x?} reason={:?}", &link_id[..4], reason);
1161 self.holepunch_manager.link_closed(&link_id);
1162 self.callbacks.on_link_closed(rns_core::types::LinkId(link_id), reason);
1163 }
1164 LinkManagerAction::RemoteIdentified { link_id, identity_hash, public_key } => {
1165 log::debug!(
1166 "Remote identified on link {:02x?}: {:02x?}",
1167 &link_id[..4], &identity_hash[..4],
1168 );
1169 self.callbacks.on_remote_identified(
1170 rns_core::types::LinkId(link_id),
1171 rns_core::types::IdentityHash(identity_hash),
1172 public_key,
1173 );
1174 }
1175 LinkManagerAction::RegisterLinkDest { link_id } => {
1176 self.engine.register_destination(link_id, rns_core::constants::DESTINATION_LINK);
1178 }
1179 LinkManagerAction::DeregisterLinkDest { link_id } => {
1180 self.engine.deregister_destination(&link_id);
1181 }
1182 LinkManagerAction::ManagementRequest {
1183 link_id, path_hash, data, request_id, remote_identity,
1184 } => {
1185 self.handle_management_request(
1186 link_id, path_hash, data, request_id, remote_identity,
1187 );
1188 }
1189 LinkManagerAction::ResourceReceived { link_id, data, metadata } => {
1190 self.callbacks.on_resource_received(rns_core::types::LinkId(link_id), data, metadata);
1191 }
1192 LinkManagerAction::ResourceCompleted { link_id } => {
1193 self.callbacks.on_resource_completed(rns_core::types::LinkId(link_id));
1194 }
1195 LinkManagerAction::ResourceFailed { link_id, error } => {
1196 log::debug!("Resource failed on link {:02x?}: {}", &link_id[..4], error);
1197 self.callbacks.on_resource_failed(rns_core::types::LinkId(link_id), error);
1198 }
1199 LinkManagerAction::ResourceProgress { link_id, received, total } => {
1200 self.callbacks.on_resource_progress(rns_core::types::LinkId(link_id), received, total);
1201 }
1202 LinkManagerAction::ResourceAcceptQuery { link_id, resource_hash, transfer_size, has_metadata } => {
1203 let accept = self.callbacks.on_resource_accept_query(
1204 rns_core::types::LinkId(link_id), resource_hash.clone(), transfer_size, has_metadata,
1205 );
1206 let accept_actions = self.link_manager.accept_resource(
1207 &link_id, &resource_hash, accept, &mut self.rng,
1208 );
1209 self.dispatch_link_actions(accept_actions);
1211 }
1212 LinkManagerAction::ChannelMessageReceived { link_id, msgtype, payload } => {
1213 if HolePunchManager::is_holepunch_message(msgtype) {
1215 let derived_key = self.link_manager.get_derived_key(&link_id);
1216 let tx = self.get_event_sender();
1217 let (handled, hp_actions) = self.holepunch_manager.handle_signal(
1218 link_id, msgtype, payload, derived_key.as_deref(), &tx,
1219 );
1220 if handled {
1221 self.dispatch_holepunch_actions(hp_actions);
1222 }
1223 } else {
1224 self.callbacks.on_channel_message(rns_core::types::LinkId(link_id), msgtype, payload);
1225 }
1226 }
1227 LinkManagerAction::LinkDataReceived { link_id, context, data } => {
1228 self.callbacks.on_link_data(rns_core::types::LinkId(link_id), context, data);
1229 }
1230 LinkManagerAction::ResponseReceived { link_id, request_id, data } => {
1231 self.callbacks.on_response(rns_core::types::LinkId(link_id), request_id, data);
1232 }
1233 }
1234 }
1235 }
1236
1237 fn dispatch_holepunch_actions(&mut self, actions: Vec<HolePunchManagerAction>) {
1239 for action in actions {
1240 match action {
1241 HolePunchManagerAction::SendChannelMessage { link_id, msgtype, payload } => {
1242 let link_actions = self.link_manager.send_channel_message(
1243 &link_id, msgtype, &payload, &mut self.rng,
1244 );
1245 self.dispatch_link_actions(link_actions);
1246 }
1247 HolePunchManagerAction::DirectConnectEstablished { link_id, session_id, interface_id, rtt, mtu } => {
1248 log::info!(
1249 "Direct connection established for link {:02x?} session {:02x?} iface {} rtt={:.1}ms mtu={}",
1250 &link_id[..4], &session_id[..4], interface_id.0, rtt * 1000.0, mtu
1251 );
1252 self.engine.redirect_path(&link_id, interface_id, time::now());
1254 self.link_manager.set_link_rtt(&link_id, rtt);
1256 self.link_manager.set_link_mtu(&link_id, mtu);
1257 self.link_manager.record_link_inbound(&link_id);
1260 self.link_manager.flush_channel_tx(&link_id);
1262 self.callbacks.on_direct_connect_established(
1263 rns_core::types::LinkId(link_id),
1264 interface_id,
1265 );
1266 }
1267 HolePunchManagerAction::DirectConnectFailed { link_id, session_id, reason } => {
1268 log::debug!(
1269 "Direct connection failed for link {:02x?} session {:02x?} reason={}",
1270 &link_id[..4], &session_id[..4], reason
1271 );
1272 self.callbacks.on_direct_connect_failed(
1273 rns_core::types::LinkId(link_id),
1274 reason,
1275 );
1276 }
1277 }
1278 }
1279 }
1280
1281 fn get_event_sender(&self) -> crate::event::EventSender {
1286 self.event_tx.clone()
1290 }
1291
1292 const MANAGEMENT_ANNOUNCE_INTERVAL: f64 = 300.0;
1294
1295 const MANAGEMENT_ANNOUNCE_DELAY: f64 = 5.0;
1297
1298 fn tick_management_announces(&mut self, now: f64) {
1300 if self.transport_identity.is_none() {
1301 return;
1302 }
1303
1304 let uptime = now - self.started;
1305
1306 if !self.initial_announce_sent {
1308 if uptime < Self::MANAGEMENT_ANNOUNCE_DELAY {
1309 return;
1310 }
1311 self.initial_announce_sent = true;
1312 self.emit_management_announces(now);
1313 return;
1314 }
1315
1316 if now - self.last_management_announce >= Self::MANAGEMENT_ANNOUNCE_INTERVAL {
1318 self.emit_management_announces(now);
1319 }
1320 }
1321
1322 fn emit_management_announces(&mut self, now: f64) {
1324 use crate::management;
1325
1326 self.last_management_announce = now;
1327
1328 let identity = match self.transport_identity {
1329 Some(ref id) => id,
1330 None => return,
1331 };
1332
1333 let mgmt_raw = if self.management_config.enable_remote_management {
1335 management::build_management_announce(identity, &mut self.rng)
1336 } else {
1337 None
1338 };
1339
1340 let bh_raw = if self.management_config.publish_blackhole {
1341 management::build_blackhole_announce(identity, &mut self.rng)
1342 } else {
1343 None
1344 };
1345
1346 if let Some(raw) = mgmt_raw {
1347 if let Ok(packet) = RawPacket::unpack(&raw) {
1348 let actions = self.engine.handle_outbound(
1349 &packet,
1350 rns_core::constants::DESTINATION_SINGLE,
1351 None,
1352 now,
1353 );
1354 self.dispatch_all(actions);
1355 log::debug!("Emitted management destination announce");
1356 }
1357 }
1358
1359 if let Some(raw) = bh_raw {
1360 if let Ok(packet) = RawPacket::unpack(&raw) {
1361 let actions = self.engine.handle_outbound(
1362 &packet,
1363 rns_core::constants::DESTINATION_SINGLE,
1364 None,
1365 now,
1366 );
1367 self.dispatch_all(actions);
1368 log::debug!("Emitted blackhole info announce");
1369 }
1370 }
1371 }
1372
1373 fn handle_management_request(
1375 &mut self,
1376 link_id: [u8; 16],
1377 path_hash: [u8; 16],
1378 data: Vec<u8>,
1379 request_id: [u8; 16],
1380 remote_identity: Option<([u8; 16], [u8; 64])>,
1381 ) {
1382 use crate::management;
1383
1384 let is_restricted = path_hash == management::status_path_hash()
1386 || path_hash == management::path_path_hash();
1387
1388 if is_restricted && !self.management_config.remote_management_allowed.is_empty() {
1389 match remote_identity {
1390 Some((identity_hash, _)) => {
1391 if !self.management_config.remote_management_allowed.contains(&identity_hash) {
1392 log::debug!("Management request denied: identity not in allowed list");
1393 return;
1394 }
1395 }
1396 None => {
1397 log::debug!("Management request denied: peer not identified");
1398 return;
1399 }
1400 }
1401 }
1402
1403 let response_data = if path_hash == management::status_path_hash() {
1404 management::handle_status_request(&data, &self.engine, &self.interfaces, self.started)
1405 } else if path_hash == management::path_path_hash() {
1406 management::handle_path_request(&data, &self.engine)
1407 } else if path_hash == management::list_path_hash() {
1408 management::handle_blackhole_list_request(&self.engine)
1409 } else {
1410 log::warn!("Unknown management path_hash: {:02x?}", &path_hash[..4]);
1411 None
1412 };
1413
1414 if let Some(response) = response_data {
1415 let actions = self.link_manager.send_management_response(
1416 &link_id, &request_id, &response, &mut self.rng,
1417 );
1418 self.dispatch_link_actions(actions);
1419 }
1420 }
1421}
1422
1423#[cfg(test)]
1424mod tests {
1425 use super::*;
1426 use crate::event;
1427 use crate::interface::Writer;
1428 use rns_core::announce::AnnounceData;
1429 use rns_core::constants;
1430 use rns_core::packet::PacketFlags;
1431 use rns_core::transport::types::InterfaceInfo;
1432 use rns_crypto::identity::Identity;
1433 use std::io;
1434 use std::sync::mpsc;
1435 use std::sync::{Arc, Mutex};
1436
1437 struct MockWriter {
1438 sent: Arc<Mutex<Vec<Vec<u8>>>>,
1439 }
1440
1441 impl MockWriter {
1442 fn new() -> (Self, Arc<Mutex<Vec<Vec<u8>>>>) {
1443 let sent = Arc::new(Mutex::new(Vec::new()));
1444 (MockWriter { sent: sent.clone() }, sent)
1445 }
1446 }
1447
1448 impl Writer for MockWriter {
1449 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
1450 self.sent.lock().unwrap().push(data.to_vec());
1451 Ok(())
1452 }
1453 }
1454
1455 use rns_core::types::{DestHash, IdentityHash, LinkId as TypedLinkId, PacketHash};
1456
1457 struct MockCallbacks {
1458 announces: Arc<Mutex<Vec<(DestHash, u8)>>>,
1459 paths: Arc<Mutex<Vec<(DestHash, u8)>>>,
1460 deliveries: Arc<Mutex<Vec<DestHash>>>,
1461 iface_ups: Arc<Mutex<Vec<InterfaceId>>>,
1462 iface_downs: Arc<Mutex<Vec<InterfaceId>>>,
1463 link_established: Arc<Mutex<Vec<(TypedLinkId, f64, bool)>>>,
1464 link_closed: Arc<Mutex<Vec<TypedLinkId>>>,
1465 remote_identified: Arc<Mutex<Vec<(TypedLinkId, IdentityHash)>>>,
1466 resources_received: Arc<Mutex<Vec<(TypedLinkId, Vec<u8>)>>>,
1467 resource_completed: Arc<Mutex<Vec<TypedLinkId>>>,
1468 resource_failed: Arc<Mutex<Vec<(TypedLinkId, String)>>>,
1469 channel_messages: Arc<Mutex<Vec<(TypedLinkId, u16, Vec<u8>)>>>,
1470 link_data: Arc<Mutex<Vec<(TypedLinkId, u8, Vec<u8>)>>>,
1471 responses: Arc<Mutex<Vec<(TypedLinkId, [u8; 16], Vec<u8>)>>>,
1472 proofs: Arc<Mutex<Vec<(DestHash, PacketHash, f64)>>>,
1473 proof_requested: Arc<Mutex<Vec<(DestHash, PacketHash)>>>,
1474 }
1475
1476 impl MockCallbacks {
1477 fn new() -> (
1478 Self,
1479 Arc<Mutex<Vec<(DestHash, u8)>>>,
1480 Arc<Mutex<Vec<(DestHash, u8)>>>,
1481 Arc<Mutex<Vec<DestHash>>>,
1482 Arc<Mutex<Vec<InterfaceId>>>,
1483 Arc<Mutex<Vec<InterfaceId>>>,
1484 ) {
1485 let announces = Arc::new(Mutex::new(Vec::new()));
1486 let paths = Arc::new(Mutex::new(Vec::new()));
1487 let deliveries = Arc::new(Mutex::new(Vec::new()));
1488 let iface_ups = Arc::new(Mutex::new(Vec::new()));
1489 let iface_downs = Arc::new(Mutex::new(Vec::new()));
1490 (
1491 MockCallbacks {
1492 announces: announces.clone(),
1493 paths: paths.clone(),
1494 deliveries: deliveries.clone(),
1495 iface_ups: iface_ups.clone(),
1496 iface_downs: iface_downs.clone(),
1497 link_established: Arc::new(Mutex::new(Vec::new())),
1498 link_closed: Arc::new(Mutex::new(Vec::new())),
1499 remote_identified: Arc::new(Mutex::new(Vec::new())),
1500 resources_received: Arc::new(Mutex::new(Vec::new())),
1501 resource_completed: Arc::new(Mutex::new(Vec::new())),
1502 resource_failed: Arc::new(Mutex::new(Vec::new())),
1503 channel_messages: Arc::new(Mutex::new(Vec::new())),
1504 link_data: Arc::new(Mutex::new(Vec::new())),
1505 responses: Arc::new(Mutex::new(Vec::new())),
1506 proofs: Arc::new(Mutex::new(Vec::new())),
1507 proof_requested: Arc::new(Mutex::new(Vec::new())),
1508 },
1509 announces,
1510 paths,
1511 deliveries,
1512 iface_ups,
1513 iface_downs,
1514 )
1515 }
1516
1517 fn with_link_tracking() -> (
1518 Self,
1519 Arc<Mutex<Vec<(TypedLinkId, f64, bool)>>>,
1520 Arc<Mutex<Vec<TypedLinkId>>>,
1521 Arc<Mutex<Vec<(TypedLinkId, IdentityHash)>>>,
1522 ) {
1523 let link_established = Arc::new(Mutex::new(Vec::new()));
1524 let link_closed = Arc::new(Mutex::new(Vec::new()));
1525 let remote_identified = Arc::new(Mutex::new(Vec::new()));
1526 (
1527 MockCallbacks {
1528 announces: Arc::new(Mutex::new(Vec::new())),
1529 paths: Arc::new(Mutex::new(Vec::new())),
1530 deliveries: Arc::new(Mutex::new(Vec::new())),
1531 iface_ups: Arc::new(Mutex::new(Vec::new())),
1532 iface_downs: Arc::new(Mutex::new(Vec::new())),
1533 link_established: link_established.clone(),
1534 link_closed: link_closed.clone(),
1535 remote_identified: remote_identified.clone(),
1536 resources_received: Arc::new(Mutex::new(Vec::new())),
1537 resource_completed: Arc::new(Mutex::new(Vec::new())),
1538 resource_failed: Arc::new(Mutex::new(Vec::new())),
1539 channel_messages: Arc::new(Mutex::new(Vec::new())),
1540 link_data: Arc::new(Mutex::new(Vec::new())),
1541 responses: Arc::new(Mutex::new(Vec::new())),
1542 proofs: Arc::new(Mutex::new(Vec::new())),
1543 proof_requested: Arc::new(Mutex::new(Vec::new())),
1544 },
1545 link_established,
1546 link_closed,
1547 remote_identified,
1548 )
1549 }
1550 }
1551
1552 impl Callbacks for MockCallbacks {
1553 fn on_announce(&mut self, announced: crate::destination::AnnouncedIdentity) {
1554 self.announces.lock().unwrap().push((announced.dest_hash, announced.hops));
1555 }
1556
1557 fn on_path_updated(&mut self, dest_hash: DestHash, hops: u8) {
1558 self.paths.lock().unwrap().push((dest_hash, hops));
1559 }
1560
1561 fn on_local_delivery(&mut self, dest_hash: DestHash, _raw: Vec<u8>, _packet_hash: PacketHash) {
1562 self.deliveries.lock().unwrap().push(dest_hash);
1563 }
1564
1565 fn on_interface_up(&mut self, id: InterfaceId) {
1566 self.iface_ups.lock().unwrap().push(id);
1567 }
1568
1569 fn on_interface_down(&mut self, id: InterfaceId) {
1570 self.iface_downs.lock().unwrap().push(id);
1571 }
1572
1573 fn on_link_established(&mut self, link_id: TypedLinkId, _dest_hash: DestHash, rtt: f64, is_initiator: bool) {
1574 self.link_established.lock().unwrap().push((link_id, rtt, is_initiator));
1575 }
1576
1577 fn on_link_closed(&mut self, link_id: TypedLinkId, _reason: Option<rns_core::link::TeardownReason>) {
1578 self.link_closed.lock().unwrap().push(link_id);
1579 }
1580
1581 fn on_remote_identified(&mut self, link_id: TypedLinkId, identity_hash: IdentityHash, _public_key: [u8; 64]) {
1582 self.remote_identified.lock().unwrap().push((link_id, identity_hash));
1583 }
1584
1585 fn on_resource_received(&mut self, link_id: TypedLinkId, data: Vec<u8>, _metadata: Option<Vec<u8>>) {
1586 self.resources_received.lock().unwrap().push((link_id, data));
1587 }
1588
1589 fn on_resource_completed(&mut self, link_id: TypedLinkId) {
1590 self.resource_completed.lock().unwrap().push(link_id);
1591 }
1592
1593 fn on_resource_failed(&mut self, link_id: TypedLinkId, error: String) {
1594 self.resource_failed.lock().unwrap().push((link_id, error));
1595 }
1596
1597 fn on_channel_message(&mut self, link_id: TypedLinkId, msgtype: u16, payload: Vec<u8>) {
1598 self.channel_messages.lock().unwrap().push((link_id, msgtype, payload));
1599 }
1600
1601 fn on_link_data(&mut self, link_id: TypedLinkId, context: u8, data: Vec<u8>) {
1602 self.link_data.lock().unwrap().push((link_id, context, data));
1603 }
1604
1605 fn on_response(&mut self, link_id: TypedLinkId, request_id: [u8; 16], data: Vec<u8>) {
1606 self.responses.lock().unwrap().push((link_id, request_id, data));
1607 }
1608
1609 fn on_proof(&mut self, dest_hash: DestHash, packet_hash: PacketHash, rtt: f64) {
1610 self.proofs.lock().unwrap().push((dest_hash, packet_hash, rtt));
1611 }
1612
1613 fn on_proof_requested(&mut self, dest_hash: DestHash, packet_hash: PacketHash) -> bool {
1614 self.proof_requested.lock().unwrap().push((dest_hash, packet_hash));
1615 true
1616 }
1617 }
1618
1619 fn make_interface_info(id: u64) -> InterfaceInfo {
1620 InterfaceInfo {
1621 id: InterfaceId(id),
1622 name: format!("test-{}", id),
1623 mode: constants::MODE_FULL,
1624 out_capable: true,
1625 in_capable: true,
1626 bitrate: None,
1627 announce_rate_target: None,
1628 announce_rate_grace: 0,
1629 announce_rate_penalty: 0.0,
1630 announce_cap: rns_core::constants::ANNOUNCE_CAP,
1631 is_local_client: false,
1632 wants_tunnel: false,
1633 tunnel_id: None,
1634 mtu: constants::MTU as u32,
1635 ia_freq: 0.0,
1636 started: 0.0,
1637 ingress_control: false,
1638 }
1639 }
1640
1641 fn make_entry(id: u64, writer: Box<dyn Writer>, online: bool) -> InterfaceEntry {
1642 InterfaceEntry {
1643 id: InterfaceId(id),
1644 info: make_interface_info(id),
1645 writer,
1646 online,
1647 dynamic: false,
1648 ifac: None,
1649 stats: InterfaceStats::default(),
1650 interface_type: String::new(),
1651 }
1652 }
1653
1654 fn build_announce_packet(identity: &Identity) -> Vec<u8> {
1656 let dest_hash = rns_core::destination::destination_hash(
1657 "test",
1658 &["app"],
1659 Some(identity.hash()),
1660 );
1661 let name_hash = rns_core::destination::name_hash("test", &["app"]);
1662 let random_hash = [0x42u8; 10];
1663
1664 let (announce_data, _has_ratchet) = AnnounceData::pack(
1665 identity,
1666 &dest_hash,
1667 &name_hash,
1668 &random_hash,
1669 None,
1670 None,
1671 )
1672 .unwrap();
1673
1674 let flags = PacketFlags {
1675 header_type: constants::HEADER_1,
1676 context_flag: constants::FLAG_UNSET,
1677 transport_type: constants::TRANSPORT_BROADCAST,
1678 destination_type: constants::DESTINATION_SINGLE,
1679 packet_type: constants::PACKET_TYPE_ANNOUNCE,
1680 };
1681
1682 let packet = RawPacket::pack(flags, 0, &dest_hash, None, constants::CONTEXT_NONE, &announce_data).unwrap();
1683 packet.raw
1684 }
1685
1686 #[test]
1687 fn process_inbound_frame() {
1688 let (tx, rx) = event::channel();
1689 let (cbs, announces, _, _, _, _) = MockCallbacks::new();
1690 let mut driver = Driver::new(
1691 TransportConfig { transport_enabled: false, identity_hash: None },
1692 rx,
1693 tx.clone(),
1694 Box::new(cbs),
1695 );
1696 let info = make_interface_info(1);
1697 driver.engine.register_interface(info.clone());
1698 let (writer, _sent) = MockWriter::new();
1699 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
1700
1701 let identity = Identity::new(&mut OsRng);
1702 let announce_raw = build_announce_packet(&identity);
1703
1704 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
1706 tx.send(Event::Shutdown).unwrap();
1707 driver.run();
1708
1709 assert_eq!(announces.lock().unwrap().len(), 1);
1710 }
1711
1712 #[test]
1713 fn dispatch_send() {
1714 let (tx, rx) = event::channel();
1715 let (cbs, _, _, _, _, _) = MockCallbacks::new();
1716 let mut driver = Driver::new(
1717 TransportConfig { transport_enabled: false, identity_hash: None },
1718 rx,
1719 tx.clone(),
1720 Box::new(cbs),
1721 );
1722 let (writer, sent) = MockWriter::new();
1723 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
1724
1725 driver.dispatch_all(vec![TransportAction::SendOnInterface {
1726 interface: InterfaceId(1),
1727 raw: vec![0x01, 0x02, 0x03],
1728 }]);
1729
1730 assert_eq!(sent.lock().unwrap().len(), 1);
1731 assert_eq!(sent.lock().unwrap()[0], vec![0x01, 0x02, 0x03]);
1732
1733 drop(tx);
1734 }
1735
1736 #[test]
1737 fn dispatch_broadcast() {
1738 let (tx, rx) = event::channel();
1739 let (cbs, _, _, _, _, _) = MockCallbacks::new();
1740 let mut driver = Driver::new(
1741 TransportConfig { transport_enabled: false, identity_hash: None },
1742 rx,
1743 tx.clone(),
1744 Box::new(cbs),
1745 );
1746
1747 let (w1, sent1) = MockWriter::new();
1748 let (w2, sent2) = MockWriter::new();
1749 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(w1), true));
1750 driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
1751
1752 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
1753 raw: vec![0xAA],
1754 exclude: None,
1755 }]);
1756
1757 assert_eq!(sent1.lock().unwrap().len(), 1);
1758 assert_eq!(sent2.lock().unwrap().len(), 1);
1759
1760 drop(tx);
1761 }
1762
1763 #[test]
1764 fn dispatch_broadcast_exclude() {
1765 let (tx, rx) = event::channel();
1766 let (cbs, _, _, _, _, _) = MockCallbacks::new();
1767 let mut driver = Driver::new(
1768 TransportConfig { transport_enabled: false, identity_hash: None },
1769 rx,
1770 tx.clone(),
1771 Box::new(cbs),
1772 );
1773
1774 let (w1, sent1) = MockWriter::new();
1775 let (w2, sent2) = MockWriter::new();
1776 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(w1), true));
1777 driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
1778
1779 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
1780 raw: vec![0xBB],
1781 exclude: Some(InterfaceId(1)),
1782 }]);
1783
1784 assert_eq!(sent1.lock().unwrap().len(), 0); assert_eq!(sent2.lock().unwrap().len(), 1);
1786
1787 drop(tx);
1788 }
1789
1790 #[test]
1791 fn tick_event() {
1792 let (tx, rx) = event::channel();
1793 let (cbs, _, _, _, _, _) = MockCallbacks::new();
1794 let mut driver = Driver::new(
1795 TransportConfig { transport_enabled: true, identity_hash: Some([0x42; 16]) },
1796 rx,
1797 tx.clone(),
1798 Box::new(cbs),
1799 );
1800 let info = make_interface_info(1);
1801 driver.engine.register_interface(info.clone());
1802 let (writer, _sent) = MockWriter::new();
1803 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
1804
1805 tx.send(Event::Tick).unwrap();
1807 tx.send(Event::Shutdown).unwrap();
1808 driver.run();
1809 }
1811
1812 #[test]
1813 fn shutdown_event() {
1814 let (tx, rx) = event::channel();
1815 let (cbs, _, _, _, _, _) = MockCallbacks::new();
1816 let mut driver = Driver::new(
1817 TransportConfig { transport_enabled: false, identity_hash: None },
1818 rx,
1819 tx.clone(),
1820 Box::new(cbs),
1821 );
1822
1823 tx.send(Event::Shutdown).unwrap();
1824 driver.run(); }
1826
1827 #[test]
1828 fn announce_callback() {
1829 let (tx, rx) = event::channel();
1830 let (cbs, announces, paths, _, _, _) = MockCallbacks::new();
1831 let mut driver = Driver::new(
1832 TransportConfig { transport_enabled: false, identity_hash: None },
1833 rx,
1834 tx.clone(),
1835 Box::new(cbs),
1836 );
1837 let info = make_interface_info(1);
1838 driver.engine.register_interface(info.clone());
1839 let (writer, _sent) = MockWriter::new();
1840 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
1841
1842 let identity = Identity::new(&mut OsRng);
1843 let announce_raw = build_announce_packet(&identity);
1844
1845 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
1846 tx.send(Event::Shutdown).unwrap();
1847 driver.run();
1848
1849 let ann = announces.lock().unwrap();
1850 assert_eq!(ann.len(), 1);
1851 assert_eq!(ann[0].1, 1);
1853
1854 let p = paths.lock().unwrap();
1855 assert_eq!(p.len(), 1);
1856 }
1857
1858 #[test]
1859 fn dispatch_skips_offline_interface() {
1860 let (tx, rx) = event::channel();
1861 let (cbs, _, _, _, _, _) = MockCallbacks::new();
1862 let mut driver = Driver::new(
1863 TransportConfig { transport_enabled: false, identity_hash: None },
1864 rx,
1865 tx.clone(),
1866 Box::new(cbs),
1867 );
1868
1869 let (w1, sent1) = MockWriter::new();
1870 let (w2, sent2) = MockWriter::new();
1871 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(w1), false)); driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
1873
1874 driver.dispatch_all(vec![TransportAction::SendOnInterface {
1876 interface: InterfaceId(1),
1877 raw: vec![0x01],
1878 }]);
1879 assert_eq!(sent1.lock().unwrap().len(), 0);
1880
1881 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
1883 raw: vec![0x02],
1884 exclude: None,
1885 }]);
1886 assert_eq!(sent1.lock().unwrap().len(), 0); assert_eq!(sent2.lock().unwrap().len(), 1);
1888
1889 drop(tx);
1890 }
1891
1892 #[test]
1893 fn interface_up_refreshes_writer() {
1894 let (tx, rx) = event::channel();
1895 let (cbs, _, _, _, _, _) = MockCallbacks::new();
1896 let mut driver = Driver::new(
1897 TransportConfig { transport_enabled: false, identity_hash: None },
1898 rx,
1899 tx.clone(),
1900 Box::new(cbs),
1901 );
1902
1903 let (w_old, sent_old) = MockWriter::new();
1904 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(w_old), false));
1905
1906 let (w_new, sent_new) = MockWriter::new();
1908 tx.send(Event::InterfaceUp(InterfaceId(1), Some(Box::new(w_new)), None)).unwrap();
1909 tx.send(Event::Shutdown).unwrap();
1910 driver.run();
1911
1912 assert!(driver.interfaces[&InterfaceId(1)].online);
1914
1915 driver.dispatch_all(vec![TransportAction::SendOnInterface {
1917 interface: InterfaceId(1),
1918 raw: vec![0xFF],
1919 }]);
1920
1921 assert_eq!(sent_old.lock().unwrap().len(), 0);
1923 assert_eq!(sent_new.lock().unwrap().len(), 1);
1925 assert_eq!(sent_new.lock().unwrap()[0], vec![0xFF]);
1926
1927 drop(tx);
1928 }
1929
1930 #[test]
1931 fn dynamic_interface_register() {
1932 let (tx, rx) = event::channel();
1933 let (cbs, _, _, _, iface_ups, _) = MockCallbacks::new();
1934 let mut driver = Driver::new(
1935 TransportConfig { transport_enabled: false, identity_hash: None },
1936 rx,
1937 tx.clone(),
1938 Box::new(cbs),
1939 );
1940
1941 let info = make_interface_info(100);
1942 let (writer, sent) = MockWriter::new();
1943
1944 tx.send(Event::InterfaceUp(
1946 InterfaceId(100),
1947 Some(Box::new(writer)),
1948 Some(info),
1949 ))
1950 .unwrap();
1951 tx.send(Event::Shutdown).unwrap();
1952 driver.run();
1953
1954 assert!(driver.interfaces.contains_key(&InterfaceId(100)));
1956 assert!(driver.interfaces[&InterfaceId(100)].online);
1957 assert!(driver.interfaces[&InterfaceId(100)].dynamic);
1958
1959 assert_eq!(iface_ups.lock().unwrap().len(), 1);
1961 assert_eq!(iface_ups.lock().unwrap()[0], InterfaceId(100));
1962
1963 driver.dispatch_all(vec![TransportAction::SendOnInterface {
1965 interface: InterfaceId(100),
1966 raw: vec![0x42],
1967 }]);
1968 assert_eq!(sent.lock().unwrap().len(), 1);
1969
1970 drop(tx);
1971 }
1972
1973 #[test]
1974 fn dynamic_interface_deregister() {
1975 let (tx, rx) = event::channel();
1976 let (cbs, _, _, _, _, iface_downs) = MockCallbacks::new();
1977 let mut driver = Driver::new(
1978 TransportConfig { transport_enabled: false, identity_hash: None },
1979 rx,
1980 tx.clone(),
1981 Box::new(cbs),
1982 );
1983
1984 let info = make_interface_info(200);
1986 driver.engine.register_interface(info.clone());
1987 let (writer, _sent) = MockWriter::new();
1988 driver.interfaces.insert(InterfaceId(200), InterfaceEntry {
1989 id: InterfaceId(200),
1990 info,
1991 writer: Box::new(writer),
1992 online: true,
1993 dynamic: true,
1994 ifac: None,
1995 stats: InterfaceStats::default(),
1996 interface_type: String::new(),
1997 });
1998
1999 tx.send(Event::InterfaceDown(InterfaceId(200))).unwrap();
2001 tx.send(Event::Shutdown).unwrap();
2002 driver.run();
2003
2004 assert!(!driver.interfaces.contains_key(&InterfaceId(200)));
2005 assert_eq!(iface_downs.lock().unwrap().len(), 1);
2006 assert_eq!(iface_downs.lock().unwrap()[0], InterfaceId(200));
2007 }
2008
2009 #[test]
2010 fn interface_callbacks_fire() {
2011 let (tx, rx) = event::channel();
2012 let (cbs, _, _, _, iface_ups, iface_downs) = MockCallbacks::new();
2013 let mut driver = Driver::new(
2014 TransportConfig { transport_enabled: false, identity_hash: None },
2015 rx,
2016 tx.clone(),
2017 Box::new(cbs),
2018 );
2019
2020 let (writer, _) = MockWriter::new();
2022 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), false));
2023
2024 tx.send(Event::InterfaceUp(InterfaceId(1), None, None)).unwrap();
2025 tx.send(Event::InterfaceDown(InterfaceId(1))).unwrap();
2026 tx.send(Event::Shutdown).unwrap();
2027 driver.run();
2028
2029 assert_eq!(iface_ups.lock().unwrap().len(), 1);
2030 assert_eq!(iface_downs.lock().unwrap().len(), 1);
2031 assert!(driver.interfaces.contains_key(&InterfaceId(1)));
2033 assert!(!driver.interfaces[&InterfaceId(1)].online);
2034 }
2035
2036 #[test]
2041 fn frame_updates_rx_stats() {
2042 let (tx, rx) = event::channel();
2043 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2044 let mut driver = Driver::new(
2045 TransportConfig { transport_enabled: false, identity_hash: None },
2046 rx,
2047 tx.clone(),
2048 Box::new(cbs),
2049 );
2050 let info = make_interface_info(1);
2051 driver.engine.register_interface(info.clone());
2052 let (writer, _sent) = MockWriter::new();
2053 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2054
2055 let identity = Identity::new(&mut OsRng);
2056 let announce_raw = build_announce_packet(&identity);
2057 let announce_len = announce_raw.len() as u64;
2058
2059 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
2060 tx.send(Event::Shutdown).unwrap();
2061 driver.run();
2062
2063 let stats = &driver.interfaces[&InterfaceId(1)].stats;
2064 assert_eq!(stats.rxb, announce_len);
2065 assert_eq!(stats.rx_packets, 1);
2066 }
2067
2068 #[test]
2069 fn send_updates_tx_stats() {
2070 let (tx, rx) = event::channel();
2071 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2072 let mut driver = Driver::new(
2073 TransportConfig { transport_enabled: false, identity_hash: None },
2074 rx,
2075 tx.clone(),
2076 Box::new(cbs),
2077 );
2078 let (writer, _sent) = MockWriter::new();
2079 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2080
2081 driver.dispatch_all(vec![TransportAction::SendOnInterface {
2082 interface: InterfaceId(1),
2083 raw: vec![0x01, 0x02, 0x03],
2084 }]);
2085
2086 let stats = &driver.interfaces[&InterfaceId(1)].stats;
2087 assert_eq!(stats.txb, 3);
2088 assert_eq!(stats.tx_packets, 1);
2089
2090 drop(tx);
2091 }
2092
2093 #[test]
2094 fn broadcast_updates_tx_stats() {
2095 let (tx, rx) = event::channel();
2096 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2097 let mut driver = Driver::new(
2098 TransportConfig { transport_enabled: false, identity_hash: None },
2099 rx,
2100 tx.clone(),
2101 Box::new(cbs),
2102 );
2103 let (w1, _s1) = MockWriter::new();
2104 let (w2, _s2) = MockWriter::new();
2105 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(w1), true));
2106 driver.interfaces.insert(InterfaceId(2), make_entry(2, Box::new(w2), true));
2107
2108 driver.dispatch_all(vec![TransportAction::BroadcastOnAllInterfaces {
2109 raw: vec![0xAA, 0xBB],
2110 exclude: None,
2111 }]);
2112
2113 assert_eq!(driver.interfaces[&InterfaceId(1)].stats.txb, 2);
2115 assert_eq!(driver.interfaces[&InterfaceId(1)].stats.tx_packets, 1);
2116 assert_eq!(driver.interfaces[&InterfaceId(2)].stats.txb, 2);
2117 assert_eq!(driver.interfaces[&InterfaceId(2)].stats.tx_packets, 1);
2118
2119 drop(tx);
2120 }
2121
2122 #[test]
2123 fn query_interface_stats() {
2124 let (tx, rx) = event::channel();
2125 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2126 let mut driver = Driver::new(
2127 TransportConfig { transport_enabled: true, identity_hash: Some([0x42; 16]) },
2128 rx,
2129 tx.clone(),
2130 Box::new(cbs),
2131 );
2132 let (writer, _sent) = MockWriter::new();
2133 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2134
2135 let (resp_tx, resp_rx) = mpsc::channel();
2136 tx.send(Event::Query(QueryRequest::InterfaceStats, resp_tx)).unwrap();
2137 tx.send(Event::Shutdown).unwrap();
2138 driver.run();
2139
2140 let resp = resp_rx.recv().unwrap();
2141 match resp {
2142 QueryResponse::InterfaceStats(stats) => {
2143 assert_eq!(stats.interfaces.len(), 1);
2144 assert_eq!(stats.interfaces[0].name, "test-1");
2145 assert!(stats.interfaces[0].status);
2146 assert_eq!(stats.transport_id, Some([0x42; 16]));
2147 assert!(stats.transport_enabled);
2148 }
2149 _ => panic!("unexpected response"),
2150 }
2151 }
2152
2153 #[test]
2154 fn query_path_table() {
2155 let (tx, rx) = event::channel();
2156 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2157 let mut driver = Driver::new(
2158 TransportConfig { transport_enabled: false, identity_hash: None },
2159 rx,
2160 tx.clone(),
2161 Box::new(cbs),
2162 );
2163 let info = make_interface_info(1);
2164 driver.engine.register_interface(info);
2165 let (writer, _sent) = MockWriter::new();
2166 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2167
2168 let identity = Identity::new(&mut OsRng);
2170 let announce_raw = build_announce_packet(&identity);
2171 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
2172
2173 let (resp_tx, resp_rx) = mpsc::channel();
2174 tx.send(Event::Query(QueryRequest::PathTable { max_hops: None }, resp_tx)).unwrap();
2175 tx.send(Event::Shutdown).unwrap();
2176 driver.run();
2177
2178 let resp = resp_rx.recv().unwrap();
2179 match resp {
2180 QueryResponse::PathTable(entries) => {
2181 assert_eq!(entries.len(), 1);
2182 assert_eq!(entries[0].hops, 1);
2183 }
2184 _ => panic!("unexpected response"),
2185 }
2186 }
2187
2188 #[test]
2189 fn query_drop_path() {
2190 let (tx, rx) = event::channel();
2191 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2192 let mut driver = Driver::new(
2193 TransportConfig { transport_enabled: false, identity_hash: None },
2194 rx,
2195 tx.clone(),
2196 Box::new(cbs),
2197 );
2198 let info = make_interface_info(1);
2199 driver.engine.register_interface(info);
2200 let (writer, _sent) = MockWriter::new();
2201 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2202
2203 let identity = Identity::new(&mut OsRng);
2205 let announce_raw = build_announce_packet(&identity);
2206 let dest_hash = rns_core::destination::destination_hash(
2207 "test", &["app"], Some(identity.hash()),
2208 );
2209
2210 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
2211
2212 let (resp_tx, resp_rx) = mpsc::channel();
2213 tx.send(Event::Query(QueryRequest::DropPath { dest_hash }, resp_tx)).unwrap();
2214 tx.send(Event::Shutdown).unwrap();
2215 driver.run();
2216
2217 let resp = resp_rx.recv().unwrap();
2218 match resp {
2219 QueryResponse::DropPath(dropped) => {
2220 assert!(dropped);
2221 }
2222 _ => panic!("unexpected response"),
2223 }
2224 }
2225
2226 #[test]
2227 fn send_outbound_event() {
2228 let (tx, rx) = event::channel();
2229 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2230 let mut driver = Driver::new(
2231 TransportConfig { transport_enabled: false, identity_hash: None },
2232 rx,
2233 tx.clone(),
2234 Box::new(cbs),
2235 );
2236 let (writer, sent) = MockWriter::new();
2237 let info = make_interface_info(1);
2238 driver.engine.register_interface(info);
2239 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2240
2241 let dest = [0xAA; 16];
2243 let flags = PacketFlags {
2244 header_type: constants::HEADER_1,
2245 context_flag: constants::FLAG_UNSET,
2246 transport_type: constants::TRANSPORT_BROADCAST,
2247 destination_type: constants::DESTINATION_PLAIN,
2248 packet_type: constants::PACKET_TYPE_DATA,
2249 };
2250 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
2251
2252 tx.send(Event::SendOutbound {
2253 raw: packet.raw,
2254 dest_type: constants::DESTINATION_PLAIN,
2255 attached_interface: None,
2256 }).unwrap();
2257 tx.send(Event::Shutdown).unwrap();
2258 driver.run();
2259
2260 assert_eq!(sent.lock().unwrap().len(), 1);
2262 }
2263
2264 #[test]
2265 fn register_destination_and_deliver() {
2266 let (tx, rx) = event::channel();
2267 let (cbs, _, _, deliveries, _, _) = MockCallbacks::new();
2268 let mut driver = Driver::new(
2269 TransportConfig { transport_enabled: false, identity_hash: None },
2270 rx,
2271 tx.clone(),
2272 Box::new(cbs),
2273 );
2274 let info = make_interface_info(1);
2275 driver.engine.register_interface(info);
2276 let (writer, _sent) = MockWriter::new();
2277 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2278
2279 let dest = [0xBB; 16];
2280
2281 tx.send(Event::RegisterDestination {
2283 dest_hash: dest,
2284 dest_type: constants::DESTINATION_SINGLE,
2285 }).unwrap();
2286
2287 let flags = PacketFlags {
2288 header_type: constants::HEADER_1,
2289 context_flag: constants::FLAG_UNSET,
2290 transport_type: constants::TRANSPORT_BROADCAST,
2291 destination_type: constants::DESTINATION_SINGLE,
2292 packet_type: constants::PACKET_TYPE_DATA,
2293 };
2294 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"data").unwrap();
2295 tx.send(Event::Frame { interface_id: InterfaceId(1), data: packet.raw }).unwrap();
2296 tx.send(Event::Shutdown).unwrap();
2297 driver.run();
2298
2299 assert_eq!(deliveries.lock().unwrap().len(), 1);
2300 assert_eq!(deliveries.lock().unwrap()[0], DestHash(dest));
2301 }
2302
2303 #[test]
2304 fn query_transport_identity() {
2305 let (tx, rx) = event::channel();
2306 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2307 let mut driver = Driver::new(
2308 TransportConfig { transport_enabled: true, identity_hash: Some([0xAA; 16]) },
2309 rx,
2310 tx.clone(),
2311 Box::new(cbs),
2312 );
2313
2314 let (resp_tx, resp_rx) = mpsc::channel();
2315 tx.send(Event::Query(QueryRequest::TransportIdentity, resp_tx)).unwrap();
2316 tx.send(Event::Shutdown).unwrap();
2317 driver.run();
2318
2319 match resp_rx.recv().unwrap() {
2320 QueryResponse::TransportIdentity(Some(hash)) => {
2321 assert_eq!(hash, [0xAA; 16]);
2322 }
2323 _ => panic!("unexpected response"),
2324 }
2325 }
2326
2327 #[test]
2328 fn query_link_count() {
2329 let (tx, rx) = event::channel();
2330 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2331 let mut driver = Driver::new(
2332 TransportConfig { transport_enabled: false, identity_hash: None },
2333 rx,
2334 tx.clone(),
2335 Box::new(cbs),
2336 );
2337
2338 let (resp_tx, resp_rx) = mpsc::channel();
2339 tx.send(Event::Query(QueryRequest::LinkCount, resp_tx)).unwrap();
2340 tx.send(Event::Shutdown).unwrap();
2341 driver.run();
2342
2343 match resp_rx.recv().unwrap() {
2344 QueryResponse::LinkCount(count) => assert_eq!(count, 0),
2345 _ => panic!("unexpected response"),
2346 }
2347 }
2348
2349 #[test]
2350 fn query_rate_table() {
2351 let (tx, rx) = event::channel();
2352 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2353 let mut driver = Driver::new(
2354 TransportConfig { transport_enabled: false, identity_hash: None },
2355 rx,
2356 tx.clone(),
2357 Box::new(cbs),
2358 );
2359
2360 let (resp_tx, resp_rx) = mpsc::channel();
2361 tx.send(Event::Query(QueryRequest::RateTable, resp_tx)).unwrap();
2362 tx.send(Event::Shutdown).unwrap();
2363 driver.run();
2364
2365 match resp_rx.recv().unwrap() {
2366 QueryResponse::RateTable(entries) => assert!(entries.is_empty()),
2367 _ => panic!("unexpected response"),
2368 }
2369 }
2370
2371 #[test]
2372 fn query_next_hop() {
2373 let (tx, rx) = event::channel();
2374 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2375 let mut driver = Driver::new(
2376 TransportConfig { transport_enabled: false, identity_hash: None },
2377 rx,
2378 tx.clone(),
2379 Box::new(cbs),
2380 );
2381
2382 let dest = [0xBB; 16];
2383 let (resp_tx, resp_rx) = mpsc::channel();
2384 tx.send(Event::Query(QueryRequest::NextHop { dest_hash: dest }, resp_tx)).unwrap();
2385 tx.send(Event::Shutdown).unwrap();
2386 driver.run();
2387
2388 match resp_rx.recv().unwrap() {
2389 QueryResponse::NextHop(None) => {}
2390 _ => panic!("unexpected response"),
2391 }
2392 }
2393
2394 #[test]
2395 fn query_next_hop_if_name() {
2396 let (tx, rx) = event::channel();
2397 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2398 let mut driver = Driver::new(
2399 TransportConfig { transport_enabled: false, identity_hash: None },
2400 rx,
2401 tx.clone(),
2402 Box::new(cbs),
2403 );
2404
2405 let dest = [0xCC; 16];
2406 let (resp_tx, resp_rx) = mpsc::channel();
2407 tx.send(Event::Query(QueryRequest::NextHopIfName { dest_hash: dest }, resp_tx)).unwrap();
2408 tx.send(Event::Shutdown).unwrap();
2409 driver.run();
2410
2411 match resp_rx.recv().unwrap() {
2412 QueryResponse::NextHopIfName(None) => {}
2413 _ => panic!("unexpected response"),
2414 }
2415 }
2416
2417 #[test]
2418 fn query_drop_all_via() {
2419 let (tx, rx) = event::channel();
2420 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2421 let mut driver = Driver::new(
2422 TransportConfig { transport_enabled: false, identity_hash: None },
2423 rx,
2424 tx.clone(),
2425 Box::new(cbs),
2426 );
2427
2428 let transport = [0xDD; 16];
2429 let (resp_tx, resp_rx) = mpsc::channel();
2430 tx.send(Event::Query(
2431 QueryRequest::DropAllVia { transport_hash: transport },
2432 resp_tx,
2433 )).unwrap();
2434 tx.send(Event::Shutdown).unwrap();
2435 driver.run();
2436
2437 match resp_rx.recv().unwrap() {
2438 QueryResponse::DropAllVia(count) => assert_eq!(count, 0),
2439 _ => panic!("unexpected response"),
2440 }
2441 }
2442
2443 #[test]
2444 fn query_drop_announce_queues() {
2445 let (tx, rx) = event::channel();
2446 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2447 let mut driver = Driver::new(
2448 TransportConfig { transport_enabled: false, identity_hash: None },
2449 rx,
2450 tx.clone(),
2451 Box::new(cbs),
2452 );
2453
2454 let (resp_tx, resp_rx) = mpsc::channel();
2455 tx.send(Event::Query(QueryRequest::DropAnnounceQueues, resp_tx)).unwrap();
2456 tx.send(Event::Shutdown).unwrap();
2457 driver.run();
2458
2459 match resp_rx.recv().unwrap() {
2460 QueryResponse::DropAnnounceQueues => {}
2461 _ => panic!("unexpected response"),
2462 }
2463 }
2464
2465 #[test]
2470 fn register_link_dest_event() {
2471 let (tx, rx) = event::channel();
2472 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2473 let mut driver = Driver::new(
2474 TransportConfig { transport_enabled: false, identity_hash: None },
2475 rx,
2476 tx.clone(),
2477 Box::new(cbs),
2478 );
2479 let info = make_interface_info(1);
2480 driver.engine.register_interface(info);
2481 let (writer, _sent) = MockWriter::new();
2482 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2483
2484 let mut rng = OsRng;
2485 let sig_prv = rns_crypto::ed25519::Ed25519PrivateKey::generate(&mut rng);
2486 let sig_pub_bytes = sig_prv.public_key().public_bytes();
2487 let sig_prv_bytes = sig_prv.private_bytes();
2488 let dest_hash = [0xDD; 16];
2489
2490 tx.send(Event::RegisterLinkDestination {
2491 dest_hash,
2492 sig_prv_bytes,
2493 sig_pub_bytes,
2494 }).unwrap();
2495 tx.send(Event::Shutdown).unwrap();
2496 driver.run();
2497
2498 assert!(driver.link_manager.is_link_destination(&dest_hash));
2500 }
2501
2502 #[test]
2503 fn create_link_event() {
2504 let (tx, rx) = event::channel();
2505 let (cbs, _link_established, _, _) = MockCallbacks::with_link_tracking();
2506 let mut driver = Driver::new(
2507 TransportConfig { transport_enabled: false, identity_hash: None },
2508 rx,
2509 tx.clone(),
2510 Box::new(cbs),
2511 );
2512 let info = make_interface_info(1);
2513 driver.engine.register_interface(info);
2514 let (writer, _sent) = MockWriter::new();
2515 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2516
2517 let dest_hash = [0xDD; 16];
2518 let dummy_sig_pub = [0xAA; 32];
2519
2520 let (resp_tx, resp_rx) = mpsc::channel();
2521 tx.send(Event::CreateLink {
2522 dest_hash,
2523 dest_sig_pub_bytes: dummy_sig_pub,
2524 response_tx: resp_tx,
2525 }).unwrap();
2526 tx.send(Event::Shutdown).unwrap();
2527 driver.run();
2528
2529 let link_id = resp_rx.recv().unwrap();
2531 assert_ne!(link_id, [0u8; 16]);
2532
2533 assert_eq!(driver.link_manager.link_count(), 1);
2535
2536 }
2541
2542 #[test]
2543 fn deliver_local_routes_to_link_manager() {
2544 let (tx, rx) = event::channel();
2547 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2548 let mut driver = Driver::new(
2549 TransportConfig { transport_enabled: false, identity_hash: None },
2550 rx,
2551 tx.clone(),
2552 Box::new(cbs),
2553 );
2554 let info = make_interface_info(1);
2555 driver.engine.register_interface(info);
2556 let (writer, _sent) = MockWriter::new();
2557 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2558
2559 let mut rng = OsRng;
2561 let sig_prv = rns_crypto::ed25519::Ed25519PrivateKey::generate(&mut rng);
2562 let sig_pub_bytes = sig_prv.public_key().public_bytes();
2563 let dest_hash = [0xEE; 16];
2564 driver.link_manager.register_link_destination(dest_hash, sig_prv, sig_pub_bytes);
2565
2566 assert!(driver.link_manager.is_link_destination(&dest_hash));
2570
2571 assert!(!driver.link_manager.is_link_destination(&[0xFF; 16]));
2573
2574 drop(tx);
2575 }
2576
2577 #[test]
2578 fn teardown_link_event() {
2579 let (tx, rx) = event::channel();
2580 let (cbs, _, link_closed, _) = MockCallbacks::with_link_tracking();
2581 let mut driver = Driver::new(
2582 TransportConfig { transport_enabled: false, identity_hash: None },
2583 rx,
2584 tx.clone(),
2585 Box::new(cbs),
2586 );
2587 let info = make_interface_info(1);
2588 driver.engine.register_interface(info);
2589 let (writer, _sent) = MockWriter::new();
2590 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2591
2592 let (resp_tx, resp_rx) = mpsc::channel();
2594 tx.send(Event::CreateLink {
2595 dest_hash: [0xDD; 16],
2596 dest_sig_pub_bytes: [0xAA; 32],
2597 response_tx: resp_tx,
2598 }).unwrap();
2599 tx.send(Event::Shutdown).unwrap();
2604 driver.run();
2605
2606 let link_id = resp_rx.recv().unwrap();
2607 assert_ne!(link_id, [0u8; 16]);
2608 assert_eq!(driver.link_manager.link_count(), 1);
2609
2610 let teardown_actions = driver.link_manager.teardown_link(&link_id);
2612 driver.dispatch_link_actions(teardown_actions);
2613
2614 assert_eq!(link_closed.lock().unwrap().len(), 1);
2616 assert_eq!(link_closed.lock().unwrap()[0], TypedLinkId(link_id));
2617 }
2618
2619 #[test]
2620 fn link_count_includes_link_manager() {
2621 let (tx, rx) = event::channel();
2622 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2623 let mut driver = Driver::new(
2624 TransportConfig { transport_enabled: false, identity_hash: None },
2625 rx,
2626 tx.clone(),
2627 Box::new(cbs),
2628 );
2629 let info = make_interface_info(1);
2630 driver.engine.register_interface(info);
2631 let (writer, _sent) = MockWriter::new();
2632 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2633
2634 let mut rng = OsRng;
2636 let dummy_sig = [0xAA; 32];
2637 driver.link_manager.create_link(&[0xDD; 16], &dummy_sig, 1, constants::MTU as u32, &mut rng);
2638
2639 let (resp_tx, resp_rx) = mpsc::channel();
2641 tx.send(Event::Query(QueryRequest::LinkCount, resp_tx)).unwrap();
2642 tx.send(Event::Shutdown).unwrap();
2643 driver.run();
2644
2645 match resp_rx.recv().unwrap() {
2646 QueryResponse::LinkCount(count) => assert_eq!(count, 1),
2647 _ => panic!("unexpected response"),
2648 }
2649 }
2650
2651 #[test]
2652 fn register_request_handler_event() {
2653 let (tx, rx) = event::channel();
2654 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2655 let mut driver = Driver::new(
2656 TransportConfig { transport_enabled: false, identity_hash: None },
2657 rx,
2658 tx.clone(),
2659 Box::new(cbs),
2660 );
2661
2662 tx.send(Event::RegisterRequestHandler {
2663 path: "/status".to_string(),
2664 allowed_list: None,
2665 handler: Box::new(|_link_id, _path, _data, _remote| Some(b"OK".to_vec())),
2666 }).unwrap();
2667 tx.send(Event::Shutdown).unwrap();
2668 driver.run();
2669
2670 }
2673
2674 #[test]
2677 fn management_announces_emitted_after_delay() {
2678 let (tx, rx) = event::channel();
2679 let (cbs, announces, _, _, _, _) = MockCallbacks::new();
2680 let identity = Identity::new(&mut OsRng);
2681 let identity_hash = *identity.hash();
2682 let mut driver = Driver::new(
2683 TransportConfig { transport_enabled: true, identity_hash: Some(identity_hash) },
2684 rx,
2685 tx.clone(),
2686 Box::new(cbs),
2687 );
2688
2689 let info = make_interface_info(1);
2691 driver.engine.register_interface(info.clone());
2692 let (writer, sent) = MockWriter::new();
2693 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2694
2695 driver.management_config.enable_remote_management = true;
2697 driver.transport_identity = Some(identity);
2698
2699 driver.started = time::now() - 10.0;
2701
2702 tx.send(Event::Tick).unwrap();
2704 tx.send(Event::Shutdown).unwrap();
2705 driver.run();
2706
2707 let sent_packets = sent.lock().unwrap();
2709 assert!(!sent_packets.is_empty(),
2710 "Management announce should be sent after startup delay");
2711 }
2712
2713 #[test]
2714 fn management_announces_not_emitted_when_disabled() {
2715 let (tx, rx) = event::channel();
2716 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2717 let identity = Identity::new(&mut OsRng);
2718 let identity_hash = *identity.hash();
2719 let mut driver = Driver::new(
2720 TransportConfig { transport_enabled: true, identity_hash: Some(identity_hash) },
2721 rx,
2722 tx.clone(),
2723 Box::new(cbs),
2724 );
2725
2726 let info = make_interface_info(1);
2727 driver.engine.register_interface(info.clone());
2728 let (writer, sent) = MockWriter::new();
2729 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2730
2731 driver.transport_identity = Some(identity);
2733 driver.started = time::now() - 10.0;
2734
2735 tx.send(Event::Tick).unwrap();
2736 tx.send(Event::Shutdown).unwrap();
2737 driver.run();
2738
2739 let sent_packets = sent.lock().unwrap();
2741 assert!(sent_packets.is_empty(),
2742 "No announces should be sent when management is disabled");
2743 }
2744
2745 #[test]
2746 fn management_announces_not_emitted_before_delay() {
2747 let (tx, rx) = event::channel();
2748 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2749 let identity = Identity::new(&mut OsRng);
2750 let identity_hash = *identity.hash();
2751 let mut driver = Driver::new(
2752 TransportConfig { transport_enabled: true, identity_hash: Some(identity_hash) },
2753 rx,
2754 tx.clone(),
2755 Box::new(cbs),
2756 );
2757
2758 let info = make_interface_info(1);
2759 driver.engine.register_interface(info.clone());
2760 let (writer, sent) = MockWriter::new();
2761 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2762
2763 driver.management_config.enable_remote_management = true;
2764 driver.transport_identity = Some(identity);
2765 driver.started = time::now();
2767
2768 tx.send(Event::Tick).unwrap();
2769 tx.send(Event::Shutdown).unwrap();
2770 driver.run();
2771
2772 let sent_packets = sent.lock().unwrap();
2773 assert!(sent_packets.is_empty(),
2774 "No announces before startup delay");
2775 }
2776
2777 #[test]
2782 fn announce_received_populates_known_destinations() {
2783 let (tx, rx) = event::channel();
2784 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2785 let mut driver = Driver::new(
2786 TransportConfig { transport_enabled: false, identity_hash: None },
2787 rx,
2788 tx.clone(),
2789 Box::new(cbs),
2790 );
2791 let info = make_interface_info(1);
2792 driver.engine.register_interface(info);
2793 let (writer, _sent) = MockWriter::new();
2794 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2795
2796 let identity = Identity::new(&mut OsRng);
2797 let announce_raw = build_announce_packet(&identity);
2798
2799 let dest_hash = rns_core::destination::destination_hash(
2800 "test", &["app"], Some(identity.hash()),
2801 );
2802
2803 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
2804 tx.send(Event::Shutdown).unwrap();
2805 driver.run();
2806
2807 assert!(driver.known_destinations.contains_key(&dest_hash));
2809 let recalled = &driver.known_destinations[&dest_hash];
2810 assert_eq!(recalled.dest_hash.0, dest_hash);
2811 assert_eq!(recalled.identity_hash.0, *identity.hash());
2812 assert_eq!(&recalled.public_key, &identity.get_public_key().unwrap());
2813 assert_eq!(recalled.hops, 1);
2814 }
2815
2816 #[test]
2817 fn query_has_path() {
2818 let (tx, rx) = event::channel();
2819 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2820 let mut driver = Driver::new(
2821 TransportConfig { transport_enabled: false, identity_hash: None },
2822 rx,
2823 tx.clone(),
2824 Box::new(cbs),
2825 );
2826 let info = make_interface_info(1);
2827 driver.engine.register_interface(info);
2828 let (writer, _sent) = MockWriter::new();
2829 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2830
2831 let (resp_tx, resp_rx) = mpsc::channel();
2833 tx.send(Event::Query(QueryRequest::HasPath { dest_hash: [0xAA; 16] }, resp_tx)).unwrap();
2834
2835 let identity = Identity::new(&mut OsRng);
2837 let announce_raw = build_announce_packet(&identity);
2838 let dest_hash = rns_core::destination::destination_hash(
2839 "test", &["app"], Some(identity.hash()),
2840 );
2841 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
2842
2843 let (resp_tx2, resp_rx2) = mpsc::channel();
2844 tx.send(Event::Query(QueryRequest::HasPath { dest_hash }, resp_tx2)).unwrap();
2845
2846 tx.send(Event::Shutdown).unwrap();
2847 driver.run();
2848
2849 match resp_rx.recv().unwrap() {
2851 QueryResponse::HasPath(false) => {}
2852 other => panic!("expected HasPath(false), got {:?}", other),
2853 }
2854
2855 match resp_rx2.recv().unwrap() {
2857 QueryResponse::HasPath(true) => {}
2858 other => panic!("expected HasPath(true), got {:?}", other),
2859 }
2860 }
2861
2862 #[test]
2863 fn query_hops_to() {
2864 let (tx, rx) = event::channel();
2865 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2866 let mut driver = Driver::new(
2867 TransportConfig { transport_enabled: false, identity_hash: None },
2868 rx,
2869 tx.clone(),
2870 Box::new(cbs),
2871 );
2872 let info = make_interface_info(1);
2873 driver.engine.register_interface(info);
2874 let (writer, _sent) = MockWriter::new();
2875 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2876
2877 let identity = Identity::new(&mut OsRng);
2879 let announce_raw = build_announce_packet(&identity);
2880 let dest_hash = rns_core::destination::destination_hash(
2881 "test", &["app"], Some(identity.hash()),
2882 );
2883
2884 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
2885
2886 let (resp_tx, resp_rx) = mpsc::channel();
2887 tx.send(Event::Query(QueryRequest::HopsTo { dest_hash }, resp_tx)).unwrap();
2888 tx.send(Event::Shutdown).unwrap();
2889 driver.run();
2890
2891 match resp_rx.recv().unwrap() {
2892 QueryResponse::HopsTo(Some(1)) => {}
2893 other => panic!("expected HopsTo(Some(1)), got {:?}", other),
2894 }
2895 }
2896
2897 #[test]
2898 fn query_recall_identity() {
2899 let (tx, rx) = event::channel();
2900 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2901 let mut driver = Driver::new(
2902 TransportConfig { transport_enabled: false, identity_hash: None },
2903 rx,
2904 tx.clone(),
2905 Box::new(cbs),
2906 );
2907 let info = make_interface_info(1);
2908 driver.engine.register_interface(info);
2909 let (writer, _sent) = MockWriter::new();
2910 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2911
2912 let identity = Identity::new(&mut OsRng);
2913 let announce_raw = build_announce_packet(&identity);
2914 let dest_hash = rns_core::destination::destination_hash(
2915 "test", &["app"], Some(identity.hash()),
2916 );
2917
2918 tx.send(Event::Frame { interface_id: InterfaceId(1), data: announce_raw }).unwrap();
2919
2920 let (resp_tx, resp_rx) = mpsc::channel();
2922 tx.send(Event::Query(QueryRequest::RecallIdentity { dest_hash }, resp_tx)).unwrap();
2923
2924 let (resp_tx2, resp_rx2) = mpsc::channel();
2926 tx.send(Event::Query(QueryRequest::RecallIdentity { dest_hash: [0xFF; 16] }, resp_tx2)).unwrap();
2927
2928 tx.send(Event::Shutdown).unwrap();
2929 driver.run();
2930
2931 match resp_rx.recv().unwrap() {
2932 QueryResponse::RecallIdentity(Some(recalled)) => {
2933 assert_eq!(recalled.dest_hash.0, dest_hash);
2934 assert_eq!(recalled.identity_hash.0, *identity.hash());
2935 assert_eq!(recalled.public_key, identity.get_public_key().unwrap());
2936 assert_eq!(recalled.hops, 1);
2937 }
2938 other => panic!("expected RecallIdentity(Some(..)), got {:?}", other),
2939 }
2940
2941 match resp_rx2.recv().unwrap() {
2942 QueryResponse::RecallIdentity(None) => {}
2943 other => panic!("expected RecallIdentity(None), got {:?}", other),
2944 }
2945 }
2946
2947 #[test]
2948 fn request_path_sends_packet() {
2949 let (tx, rx) = event::channel();
2950 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2951 let mut driver = Driver::new(
2952 TransportConfig { transport_enabled: false, identity_hash: None },
2953 rx,
2954 tx.clone(),
2955 Box::new(cbs),
2956 );
2957 let info = make_interface_info(1);
2958 driver.engine.register_interface(info);
2959 let (writer, sent) = MockWriter::new();
2960 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2961
2962 tx.send(Event::RequestPath { dest_hash: [0xAA; 16] }).unwrap();
2964 tx.send(Event::Shutdown).unwrap();
2965 driver.run();
2966
2967 let sent_packets = sent.lock().unwrap();
2969 assert!(!sent_packets.is_empty(), "Path request should be sent on wire");
2970
2971 let raw = &sent_packets[0];
2973 let flags = rns_core::packet::PacketFlags::unpack(raw[0] & 0x7F);
2974 assert_eq!(flags.packet_type, constants::PACKET_TYPE_DATA);
2975 assert_eq!(flags.destination_type, constants::DESTINATION_PLAIN);
2976 assert_eq!(flags.transport_type, constants::TRANSPORT_BROADCAST);
2977 }
2978
2979 #[test]
2980 fn request_path_includes_transport_id() {
2981 let (tx, rx) = event::channel();
2982 let (cbs, _, _, _, _, _) = MockCallbacks::new();
2983 let mut driver = Driver::new(
2984 TransportConfig { transport_enabled: true, identity_hash: Some([0xBB; 16]) },
2985 rx,
2986 tx.clone(),
2987 Box::new(cbs),
2988 );
2989 let info = make_interface_info(1);
2990 driver.engine.register_interface(info);
2991 let (writer, sent) = MockWriter::new();
2992 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
2993
2994 tx.send(Event::RequestPath { dest_hash: [0xAA; 16] }).unwrap();
2995 tx.send(Event::Shutdown).unwrap();
2996 driver.run();
2997
2998 let sent_packets = sent.lock().unwrap();
2999 assert!(!sent_packets.is_empty());
3000
3001 let raw = &sent_packets[0];
3003 if let Ok(packet) = RawPacket::unpack(raw) {
3004 assert_eq!(packet.data.len(), 48, "Path request data should be 48 bytes with transport_id");
3006 assert_eq!(&packet.data[..16], &[0xAA; 16], "First 16 bytes should be dest_hash");
3007 assert_eq!(&packet.data[16..32], &[0xBB; 16], "Next 16 bytes should be transport_id");
3008 } else {
3009 panic!("Could not unpack sent packet");
3010 }
3011 }
3012
3013 #[test]
3014 fn path_request_dest_registered() {
3015 let (tx, rx) = event::channel();
3016 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3017 let driver = Driver::new(
3018 TransportConfig { transport_enabled: false, identity_hash: None },
3019 rx,
3020 tx.clone(),
3021 Box::new(cbs),
3022 );
3023
3024 let expected_dest = rns_core::destination::destination_hash(
3026 "rnstransport", &["path", "request"], None,
3027 );
3028 assert_eq!(driver.path_request_dest, expected_dest);
3029
3030 drop(tx);
3031 }
3032
3033 #[test]
3038 fn register_proof_strategy_event() {
3039 let (tx, rx) = event::channel();
3040 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3041 let mut driver = Driver::new(
3042 TransportConfig { transport_enabled: false, identity_hash: None },
3043 rx,
3044 tx.clone(),
3045 Box::new(cbs),
3046 );
3047
3048 let dest = [0xAA; 16];
3049 let identity = Identity::new(&mut OsRng);
3050 let prv_key = identity.get_private_key().unwrap();
3051
3052 tx.send(Event::RegisterProofStrategy {
3053 dest_hash: dest,
3054 strategy: rns_core::types::ProofStrategy::ProveAll,
3055 signing_key: Some(prv_key),
3056 }).unwrap();
3057 tx.send(Event::Shutdown).unwrap();
3058 driver.run();
3059
3060 assert!(driver.proof_strategies.contains_key(&dest));
3061 let (strategy, ref id_opt) = driver.proof_strategies[&dest];
3062 assert_eq!(strategy, rns_core::types::ProofStrategy::ProveAll);
3063 assert!(id_opt.is_some());
3064 }
3065
3066 #[test]
3067 fn register_proof_strategy_prove_none_no_identity() {
3068 let (tx, rx) = event::channel();
3069 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3070 let mut driver = Driver::new(
3071 TransportConfig { transport_enabled: false, identity_hash: None },
3072 rx,
3073 tx.clone(),
3074 Box::new(cbs),
3075 );
3076
3077 let dest = [0xBB; 16];
3078 tx.send(Event::RegisterProofStrategy {
3079 dest_hash: dest,
3080 strategy: rns_core::types::ProofStrategy::ProveNone,
3081 signing_key: None,
3082 }).unwrap();
3083 tx.send(Event::Shutdown).unwrap();
3084 driver.run();
3085
3086 assert!(driver.proof_strategies.contains_key(&dest));
3087 let (strategy, ref id_opt) = driver.proof_strategies[&dest];
3088 assert_eq!(strategy, rns_core::types::ProofStrategy::ProveNone);
3089 assert!(id_opt.is_none());
3090 }
3091
3092 #[test]
3093 fn send_outbound_tracks_sent_packets() {
3094 let (tx, rx) = event::channel();
3095 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3096 let mut driver = Driver::new(
3097 TransportConfig { transport_enabled: false, identity_hash: None },
3098 rx,
3099 tx.clone(),
3100 Box::new(cbs),
3101 );
3102 let info = make_interface_info(1);
3103 driver.engine.register_interface(info);
3104 let (writer, _sent) = MockWriter::new();
3105 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3106
3107 let dest = [0xCC; 16];
3109 let flags = PacketFlags {
3110 header_type: constants::HEADER_1,
3111 context_flag: constants::FLAG_UNSET,
3112 transport_type: constants::TRANSPORT_BROADCAST,
3113 destination_type: constants::DESTINATION_PLAIN,
3114 packet_type: constants::PACKET_TYPE_DATA,
3115 };
3116 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"test data").unwrap();
3117 let expected_hash = packet.packet_hash;
3118
3119 tx.send(Event::SendOutbound {
3120 raw: packet.raw,
3121 dest_type: constants::DESTINATION_PLAIN,
3122 attached_interface: None,
3123 }).unwrap();
3124 tx.send(Event::Shutdown).unwrap();
3125 driver.run();
3126
3127 assert!(driver.sent_packets.contains_key(&expected_hash));
3129 let (tracked_dest, _sent_time) = &driver.sent_packets[&expected_hash];
3130 assert_eq!(tracked_dest, &dest);
3131 }
3132
3133 #[test]
3134 fn prove_all_generates_proof_on_delivery() {
3135 let (tx, rx) = event::channel();
3136 let (cbs, _, _, deliveries, _, _) = MockCallbacks::new();
3137 let mut driver = Driver::new(
3138 TransportConfig { transport_enabled: false, identity_hash: None },
3139 rx,
3140 tx.clone(),
3141 Box::new(cbs),
3142 );
3143 let info = make_interface_info(1);
3144 driver.engine.register_interface(info);
3145 let (writer, sent) = MockWriter::new();
3146 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3147
3148 let dest = [0xDD; 16];
3150 let identity = Identity::new(&mut OsRng);
3151 let prv_key = identity.get_private_key().unwrap();
3152 driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
3153 driver.proof_strategies.insert(dest, (
3154 rns_core::types::ProofStrategy::ProveAll,
3155 Some(Identity::from_private_key(&prv_key)),
3156 ));
3157
3158 let flags = PacketFlags {
3160 header_type: constants::HEADER_1,
3161 context_flag: constants::FLAG_UNSET,
3162 transport_type: constants::TRANSPORT_BROADCAST,
3163 destination_type: constants::DESTINATION_SINGLE,
3164 packet_type: constants::PACKET_TYPE_DATA,
3165 };
3166 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
3167
3168 tx.send(Event::Frame { interface_id: InterfaceId(1), data: packet.raw }).unwrap();
3169 tx.send(Event::Shutdown).unwrap();
3170 driver.run();
3171
3172 assert_eq!(deliveries.lock().unwrap().len(), 1);
3174
3175 let sent_packets = sent.lock().unwrap();
3177 let has_proof = sent_packets.iter().any(|raw| {
3179 let flags = PacketFlags::unpack(raw[0] & 0x7F);
3180 flags.packet_type == constants::PACKET_TYPE_PROOF
3181 });
3182 assert!(has_proof, "ProveAll should generate a proof packet: sent {} packets", sent_packets.len());
3183 }
3184
3185 #[test]
3186 fn prove_none_does_not_generate_proof() {
3187 let (tx, rx) = event::channel();
3188 let (cbs, _, _, deliveries, _, _) = MockCallbacks::new();
3189 let mut driver = Driver::new(
3190 TransportConfig { transport_enabled: false, identity_hash: None },
3191 rx,
3192 tx.clone(),
3193 Box::new(cbs),
3194 );
3195 let info = make_interface_info(1);
3196 driver.engine.register_interface(info);
3197 let (writer, sent) = MockWriter::new();
3198 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3199
3200 let dest = [0xDD; 16];
3202 driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
3203 driver.proof_strategies.insert(dest, (
3204 rns_core::types::ProofStrategy::ProveNone,
3205 None,
3206 ));
3207
3208 let flags = PacketFlags {
3210 header_type: constants::HEADER_1,
3211 context_flag: constants::FLAG_UNSET,
3212 transport_type: constants::TRANSPORT_BROADCAST,
3213 destination_type: constants::DESTINATION_SINGLE,
3214 packet_type: constants::PACKET_TYPE_DATA,
3215 };
3216 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
3217
3218 tx.send(Event::Frame { interface_id: InterfaceId(1), data: packet.raw }).unwrap();
3219 tx.send(Event::Shutdown).unwrap();
3220 driver.run();
3221
3222 assert_eq!(deliveries.lock().unwrap().len(), 1);
3224
3225 let sent_packets = sent.lock().unwrap();
3227 let has_proof = sent_packets.iter().any(|raw| {
3228 let flags = PacketFlags::unpack(raw[0] & 0x7F);
3229 flags.packet_type == constants::PACKET_TYPE_PROOF
3230 });
3231 assert!(!has_proof, "ProveNone should not generate a proof packet");
3232 }
3233
3234 #[test]
3235 fn no_proof_strategy_does_not_generate_proof() {
3236 let (tx, rx) = event::channel();
3237 let (cbs, _, _, deliveries, _, _) = MockCallbacks::new();
3238 let mut driver = Driver::new(
3239 TransportConfig { transport_enabled: false, identity_hash: None },
3240 rx,
3241 tx.clone(),
3242 Box::new(cbs),
3243 );
3244 let info = make_interface_info(1);
3245 driver.engine.register_interface(info);
3246 let (writer, sent) = MockWriter::new();
3247 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3248
3249 let dest = [0xDD; 16];
3251 driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
3252
3253 let flags = PacketFlags {
3254 header_type: constants::HEADER_1,
3255 context_flag: constants::FLAG_UNSET,
3256 transport_type: constants::TRANSPORT_BROADCAST,
3257 destination_type: constants::DESTINATION_SINGLE,
3258 packet_type: constants::PACKET_TYPE_DATA,
3259 };
3260 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"hello").unwrap();
3261
3262 tx.send(Event::Frame { interface_id: InterfaceId(1), data: packet.raw }).unwrap();
3263 tx.send(Event::Shutdown).unwrap();
3264 driver.run();
3265
3266 assert_eq!(deliveries.lock().unwrap().len(), 1);
3267
3268 let sent_packets = sent.lock().unwrap();
3269 let has_proof = sent_packets.iter().any(|raw| {
3270 let flags = PacketFlags::unpack(raw[0] & 0x7F);
3271 flags.packet_type == constants::PACKET_TYPE_PROOF
3272 });
3273 assert!(!has_proof, "No proof strategy means no proof generated");
3274 }
3275
3276 #[test]
3277 fn prove_app_calls_callback() {
3278 let (tx, rx) = event::channel();
3279 let proof_requested = Arc::new(Mutex::new(Vec::new()));
3280 let deliveries = Arc::new(Mutex::new(Vec::new()));
3281 let cbs = MockCallbacks {
3282 announces: Arc::new(Mutex::new(Vec::new())),
3283 paths: Arc::new(Mutex::new(Vec::new())),
3284 deliveries: deliveries.clone(),
3285 iface_ups: Arc::new(Mutex::new(Vec::new())),
3286 iface_downs: Arc::new(Mutex::new(Vec::new())),
3287 link_established: Arc::new(Mutex::new(Vec::new())),
3288 link_closed: Arc::new(Mutex::new(Vec::new())),
3289 remote_identified: Arc::new(Mutex::new(Vec::new())),
3290 resources_received: Arc::new(Mutex::new(Vec::new())),
3291 resource_completed: Arc::new(Mutex::new(Vec::new())),
3292 resource_failed: Arc::new(Mutex::new(Vec::new())),
3293 channel_messages: Arc::new(Mutex::new(Vec::new())),
3294 link_data: Arc::new(Mutex::new(Vec::new())),
3295 responses: Arc::new(Mutex::new(Vec::new())),
3296 proofs: Arc::new(Mutex::new(Vec::new())),
3297 proof_requested: proof_requested.clone(),
3298 };
3299
3300 let mut driver = Driver::new(
3301 TransportConfig { transport_enabled: false, identity_hash: None },
3302 rx,
3303 tx.clone(),
3304 Box::new(cbs),
3305 );
3306 let info = make_interface_info(1);
3307 driver.engine.register_interface(info);
3308 let (writer, sent) = MockWriter::new();
3309 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3310
3311 let dest = [0xDD; 16];
3313 let identity = Identity::new(&mut OsRng);
3314 let prv_key = identity.get_private_key().unwrap();
3315 driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
3316 driver.proof_strategies.insert(dest, (
3317 rns_core::types::ProofStrategy::ProveApp,
3318 Some(Identity::from_private_key(&prv_key)),
3319 ));
3320
3321 let flags = PacketFlags {
3322 header_type: constants::HEADER_1,
3323 context_flag: constants::FLAG_UNSET,
3324 transport_type: constants::TRANSPORT_BROADCAST,
3325 destination_type: constants::DESTINATION_SINGLE,
3326 packet_type: constants::PACKET_TYPE_DATA,
3327 };
3328 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"app test").unwrap();
3329
3330 tx.send(Event::Frame { interface_id: InterfaceId(1), data: packet.raw }).unwrap();
3331 tx.send(Event::Shutdown).unwrap();
3332 driver.run();
3333
3334 let prs = proof_requested.lock().unwrap();
3336 assert_eq!(prs.len(), 1);
3337 assert_eq!(prs[0].0, DestHash(dest));
3338
3339 let sent_packets = sent.lock().unwrap();
3341 let has_proof = sent_packets.iter().any(|raw| {
3342 let flags = PacketFlags::unpack(raw[0] & 0x7F);
3343 flags.packet_type == constants::PACKET_TYPE_PROOF
3344 });
3345 assert!(has_proof, "ProveApp (callback returns true) should generate a proof");
3346 }
3347
3348 #[test]
3349 fn inbound_proof_fires_callback() {
3350 let (tx, rx) = event::channel();
3351 let proofs = Arc::new(Mutex::new(Vec::new()));
3352 let cbs = MockCallbacks {
3353 announces: Arc::new(Mutex::new(Vec::new())),
3354 paths: Arc::new(Mutex::new(Vec::new())),
3355 deliveries: Arc::new(Mutex::new(Vec::new())),
3356 iface_ups: Arc::new(Mutex::new(Vec::new())),
3357 iface_downs: Arc::new(Mutex::new(Vec::new())),
3358 link_established: Arc::new(Mutex::new(Vec::new())),
3359 link_closed: Arc::new(Mutex::new(Vec::new())),
3360 remote_identified: Arc::new(Mutex::new(Vec::new())),
3361 resources_received: Arc::new(Mutex::new(Vec::new())),
3362 resource_completed: Arc::new(Mutex::new(Vec::new())),
3363 resource_failed: Arc::new(Mutex::new(Vec::new())),
3364 channel_messages: Arc::new(Mutex::new(Vec::new())),
3365 link_data: Arc::new(Mutex::new(Vec::new())),
3366 responses: Arc::new(Mutex::new(Vec::new())),
3367 proofs: proofs.clone(),
3368 proof_requested: Arc::new(Mutex::new(Vec::new())),
3369 };
3370
3371 let mut driver = Driver::new(
3372 TransportConfig { transport_enabled: false, identity_hash: None },
3373 rx,
3374 tx.clone(),
3375 Box::new(cbs),
3376 );
3377 let info = make_interface_info(1);
3378 driver.engine.register_interface(info);
3379 let (writer, _sent) = MockWriter::new();
3380 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3381
3382 let dest = [0xEE; 16];
3384 driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
3385
3386 let tracked_hash = [0x42u8; 32];
3388 let sent_time = time::now() - 0.5; driver.sent_packets.insert(tracked_hash, (dest, sent_time));
3390
3391 let mut proof_data = Vec::new();
3393 proof_data.extend_from_slice(&tracked_hash);
3394 proof_data.extend_from_slice(&[0xAA; 64]); let flags = PacketFlags {
3397 header_type: constants::HEADER_1,
3398 context_flag: constants::FLAG_UNSET,
3399 transport_type: constants::TRANSPORT_BROADCAST,
3400 destination_type: constants::DESTINATION_SINGLE,
3401 packet_type: constants::PACKET_TYPE_PROOF,
3402 };
3403 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();
3404
3405 tx.send(Event::Frame { interface_id: InterfaceId(1), data: packet.raw }).unwrap();
3406 tx.send(Event::Shutdown).unwrap();
3407 driver.run();
3408
3409 let proof_list = proofs.lock().unwrap();
3411 assert_eq!(proof_list.len(), 1);
3412 assert_eq!(proof_list[0].0, DestHash(dest));
3413 assert_eq!(proof_list[0].1, PacketHash(tracked_hash));
3414 assert!(proof_list[0].2 >= 0.4, "RTT should be approximately 0.5s, got {}", proof_list[0].2);
3415
3416 assert!(!driver.sent_packets.contains_key(&tracked_hash));
3418 }
3419
3420 #[test]
3421 fn inbound_proof_for_unknown_packet_is_ignored() {
3422 let (tx, rx) = event::channel();
3423 let proofs = Arc::new(Mutex::new(Vec::new()));
3424 let cbs = MockCallbacks {
3425 announces: Arc::new(Mutex::new(Vec::new())),
3426 paths: Arc::new(Mutex::new(Vec::new())),
3427 deliveries: Arc::new(Mutex::new(Vec::new())),
3428 iface_ups: Arc::new(Mutex::new(Vec::new())),
3429 iface_downs: Arc::new(Mutex::new(Vec::new())),
3430 link_established: Arc::new(Mutex::new(Vec::new())),
3431 link_closed: Arc::new(Mutex::new(Vec::new())),
3432 remote_identified: Arc::new(Mutex::new(Vec::new())),
3433 resources_received: Arc::new(Mutex::new(Vec::new())),
3434 resource_completed: Arc::new(Mutex::new(Vec::new())),
3435 resource_failed: Arc::new(Mutex::new(Vec::new())),
3436 channel_messages: Arc::new(Mutex::new(Vec::new())),
3437 link_data: Arc::new(Mutex::new(Vec::new())),
3438 responses: Arc::new(Mutex::new(Vec::new())),
3439 proofs: proofs.clone(),
3440 proof_requested: Arc::new(Mutex::new(Vec::new())),
3441 };
3442
3443 let mut driver = Driver::new(
3444 TransportConfig { transport_enabled: false, identity_hash: None },
3445 rx,
3446 tx.clone(),
3447 Box::new(cbs),
3448 );
3449 let info = make_interface_info(1);
3450 driver.engine.register_interface(info);
3451 let (writer, _sent) = MockWriter::new();
3452 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3453
3454 let dest = [0xEE; 16];
3455 driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
3456
3457 let unknown_hash = [0xFF; 32];
3459 let mut proof_data = Vec::new();
3460 proof_data.extend_from_slice(&unknown_hash);
3461 proof_data.extend_from_slice(&[0xAA; 64]);
3462
3463 let flags = PacketFlags {
3464 header_type: constants::HEADER_1,
3465 context_flag: constants::FLAG_UNSET,
3466 transport_type: constants::TRANSPORT_BROADCAST,
3467 destination_type: constants::DESTINATION_SINGLE,
3468 packet_type: constants::PACKET_TYPE_PROOF,
3469 };
3470 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();
3471
3472 tx.send(Event::Frame { interface_id: InterfaceId(1), data: packet.raw }).unwrap();
3473 tx.send(Event::Shutdown).unwrap();
3474 driver.run();
3475
3476 assert!(proofs.lock().unwrap().is_empty());
3478 }
3479
3480 #[test]
3481 fn inbound_proof_with_valid_signature_fires_callback() {
3482 let (tx, rx) = event::channel();
3484 let proofs = Arc::new(Mutex::new(Vec::new()));
3485 let cbs = MockCallbacks {
3486 announces: Arc::new(Mutex::new(Vec::new())),
3487 paths: Arc::new(Mutex::new(Vec::new())),
3488 deliveries: Arc::new(Mutex::new(Vec::new())),
3489 iface_ups: Arc::new(Mutex::new(Vec::new())),
3490 iface_downs: Arc::new(Mutex::new(Vec::new())),
3491 link_established: Arc::new(Mutex::new(Vec::new())),
3492 link_closed: Arc::new(Mutex::new(Vec::new())),
3493 remote_identified: Arc::new(Mutex::new(Vec::new())),
3494 resources_received: Arc::new(Mutex::new(Vec::new())),
3495 resource_completed: Arc::new(Mutex::new(Vec::new())),
3496 resource_failed: Arc::new(Mutex::new(Vec::new())),
3497 channel_messages: Arc::new(Mutex::new(Vec::new())),
3498 link_data: Arc::new(Mutex::new(Vec::new())),
3499 responses: Arc::new(Mutex::new(Vec::new())),
3500 proofs: proofs.clone(),
3501 proof_requested: Arc::new(Mutex::new(Vec::new())),
3502 };
3503
3504 let mut driver = Driver::new(
3505 TransportConfig { transport_enabled: false, identity_hash: None },
3506 rx,
3507 tx.clone(),
3508 Box::new(cbs),
3509 );
3510 let info = make_interface_info(1);
3511 driver.engine.register_interface(info);
3512 let (writer, _sent) = MockWriter::new();
3513 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3514
3515 let dest = [0xEE; 16];
3516 driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
3517
3518 let identity = Identity::new(&mut OsRng);
3520 let pub_key = identity.get_public_key();
3521 driver.known_destinations.insert(dest, crate::destination::AnnouncedIdentity {
3522 dest_hash: DestHash(dest),
3523 identity_hash: IdentityHash(*identity.hash()),
3524 public_key: pub_key.unwrap(),
3525 app_data: None,
3526 hops: 0,
3527 received_at: time::now(),
3528 });
3529
3530 let tracked_hash = [0x42u8; 32];
3532 let sent_time = time::now() - 0.5;
3533 driver.sent_packets.insert(tracked_hash, (dest, sent_time));
3534
3535 let signature = identity.sign(&tracked_hash).unwrap();
3536 let mut proof_data = Vec::new();
3537 proof_data.extend_from_slice(&tracked_hash);
3538 proof_data.extend_from_slice(&signature);
3539
3540 let flags = PacketFlags {
3541 header_type: constants::HEADER_1,
3542 context_flag: constants::FLAG_UNSET,
3543 transport_type: constants::TRANSPORT_BROADCAST,
3544 destination_type: constants::DESTINATION_SINGLE,
3545 packet_type: constants::PACKET_TYPE_PROOF,
3546 };
3547 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();
3548
3549 tx.send(Event::Frame { interface_id: InterfaceId(1), data: packet.raw }).unwrap();
3550 tx.send(Event::Shutdown).unwrap();
3551 driver.run();
3552
3553 let proof_list = proofs.lock().unwrap();
3555 assert_eq!(proof_list.len(), 1);
3556 assert_eq!(proof_list[0].0, DestHash(dest));
3557 assert_eq!(proof_list[0].1, PacketHash(tracked_hash));
3558 }
3559
3560 #[test]
3561 fn inbound_proof_with_invalid_signature_rejected() {
3562 let (tx, rx) = event::channel();
3564 let proofs = Arc::new(Mutex::new(Vec::new()));
3565 let cbs = MockCallbacks {
3566 announces: Arc::new(Mutex::new(Vec::new())),
3567 paths: Arc::new(Mutex::new(Vec::new())),
3568 deliveries: Arc::new(Mutex::new(Vec::new())),
3569 iface_ups: Arc::new(Mutex::new(Vec::new())),
3570 iface_downs: Arc::new(Mutex::new(Vec::new())),
3571 link_established: Arc::new(Mutex::new(Vec::new())),
3572 link_closed: Arc::new(Mutex::new(Vec::new())),
3573 remote_identified: Arc::new(Mutex::new(Vec::new())),
3574 resources_received: Arc::new(Mutex::new(Vec::new())),
3575 resource_completed: Arc::new(Mutex::new(Vec::new())),
3576 resource_failed: Arc::new(Mutex::new(Vec::new())),
3577 channel_messages: Arc::new(Mutex::new(Vec::new())),
3578 link_data: Arc::new(Mutex::new(Vec::new())),
3579 responses: Arc::new(Mutex::new(Vec::new())),
3580 proofs: proofs.clone(),
3581 proof_requested: Arc::new(Mutex::new(Vec::new())),
3582 };
3583
3584 let mut driver = Driver::new(
3585 TransportConfig { transport_enabled: false, identity_hash: None },
3586 rx,
3587 tx.clone(),
3588 Box::new(cbs),
3589 );
3590 let info = make_interface_info(1);
3591 driver.engine.register_interface(info);
3592 let (writer, _sent) = MockWriter::new();
3593 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3594
3595 let dest = [0xEE; 16];
3596 driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
3597
3598 let identity = Identity::new(&mut OsRng);
3600 let pub_key = identity.get_public_key();
3601 driver.known_destinations.insert(dest, crate::destination::AnnouncedIdentity {
3602 dest_hash: DestHash(dest),
3603 identity_hash: IdentityHash(*identity.hash()),
3604 public_key: pub_key.unwrap(),
3605 app_data: None,
3606 hops: 0,
3607 received_at: time::now(),
3608 });
3609
3610 let tracked_hash = [0x42u8; 32];
3612 let sent_time = time::now() - 0.5;
3613 driver.sent_packets.insert(tracked_hash, (dest, sent_time));
3614
3615 let mut proof_data = Vec::new();
3617 proof_data.extend_from_slice(&tracked_hash);
3618 proof_data.extend_from_slice(&[0xAA; 64]);
3619
3620 let flags = PacketFlags {
3621 header_type: constants::HEADER_1,
3622 context_flag: constants::FLAG_UNSET,
3623 transport_type: constants::TRANSPORT_BROADCAST,
3624 destination_type: constants::DESTINATION_SINGLE,
3625 packet_type: constants::PACKET_TYPE_PROOF,
3626 };
3627 let packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, &proof_data).unwrap();
3628
3629 tx.send(Event::Frame { interface_id: InterfaceId(1), data: packet.raw }).unwrap();
3630 tx.send(Event::Shutdown).unwrap();
3631 driver.run();
3632
3633 assert!(proofs.lock().unwrap().is_empty());
3635 }
3636
3637 #[test]
3638 fn proof_data_is_valid_explicit_proof() {
3639 let (tx, rx) = event::channel();
3641 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3642 let mut driver = Driver::new(
3643 TransportConfig { transport_enabled: false, identity_hash: None },
3644 rx,
3645 tx.clone(),
3646 Box::new(cbs),
3647 );
3648 let info = make_interface_info(1);
3649 driver.engine.register_interface(info);
3650 let (writer, sent) = MockWriter::new();
3651 driver.interfaces.insert(InterfaceId(1), make_entry(1, Box::new(writer), true));
3652
3653 let dest = [0xDD; 16];
3654 let identity = Identity::new(&mut OsRng);
3655 let prv_key = identity.get_private_key().unwrap();
3656 driver.engine.register_destination(dest, constants::DESTINATION_SINGLE);
3657 driver.proof_strategies.insert(dest, (
3658 rns_core::types::ProofStrategy::ProveAll,
3659 Some(Identity::from_private_key(&prv_key)),
3660 ));
3661
3662 let flags = PacketFlags {
3663 header_type: constants::HEADER_1,
3664 context_flag: constants::FLAG_UNSET,
3665 transport_type: constants::TRANSPORT_BROADCAST,
3666 destination_type: constants::DESTINATION_SINGLE,
3667 packet_type: constants::PACKET_TYPE_DATA,
3668 };
3669 let data_packet = RawPacket::pack(flags, 0, &dest, None, constants::CONTEXT_NONE, b"verify me").unwrap();
3670 let data_packet_hash = data_packet.packet_hash;
3671
3672 tx.send(Event::Frame { interface_id: InterfaceId(1), data: data_packet.raw }).unwrap();
3673 tx.send(Event::Shutdown).unwrap();
3674 driver.run();
3675
3676 let sent_packets = sent.lock().unwrap();
3678 let proof_raw = sent_packets.iter().find(|raw| {
3679 let f = PacketFlags::unpack(raw[0] & 0x7F);
3680 f.packet_type == constants::PACKET_TYPE_PROOF
3681 });
3682 assert!(proof_raw.is_some(), "Should have sent a proof");
3683
3684 let proof_packet = RawPacket::unpack(proof_raw.unwrap()).unwrap();
3685 assert_eq!(proof_packet.data.len(), 96, "Explicit proof should be 96 bytes");
3687
3688 let result = rns_core::receipt::validate_proof(
3690 &proof_packet.data,
3691 &data_packet_hash,
3692 &Identity::from_private_key(&prv_key), );
3694 assert_eq!(result, rns_core::receipt::ProofResult::Valid);
3695 }
3696
3697 #[test]
3698 fn query_local_destinations_empty() {
3699 let (tx, rx) = event::channel();
3700 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3701 let driver_config = TransportConfig { transport_enabled: false, identity_hash: None };
3702 let mut driver = Driver::new(driver_config, rx, tx.clone(), Box::new(cbs));
3703
3704 let (resp_tx, resp_rx) = mpsc::channel();
3705 tx.send(Event::Query(QueryRequest::LocalDestinations, resp_tx)).unwrap();
3706 tx.send(Event::Shutdown).unwrap();
3707 driver.run();
3708
3709 match resp_rx.recv().unwrap() {
3710 QueryResponse::LocalDestinations(entries) => {
3711 assert_eq!(entries.len(), 2);
3713 for entry in &entries {
3714 assert_eq!(entry.dest_type, rns_core::constants::DESTINATION_PLAIN);
3715 }
3716 }
3717 other => panic!("expected LocalDestinations, got {:?}", other),
3718 }
3719 }
3720
3721 #[test]
3722 fn query_local_destinations_with_registered() {
3723 let (tx, rx) = event::channel();
3724 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3725 let driver_config = TransportConfig { transport_enabled: false, identity_hash: None };
3726 let mut driver = Driver::new(driver_config, rx, tx.clone(), Box::new(cbs));
3727
3728 let dest_hash = [0xAA; 16];
3729 tx.send(Event::RegisterDestination {
3730 dest_hash,
3731 dest_type: rns_core::constants::DESTINATION_SINGLE,
3732 }).unwrap();
3733
3734 let (resp_tx, resp_rx) = mpsc::channel();
3735 tx.send(Event::Query(QueryRequest::LocalDestinations, resp_tx)).unwrap();
3736 tx.send(Event::Shutdown).unwrap();
3737 driver.run();
3738
3739 match resp_rx.recv().unwrap() {
3740 QueryResponse::LocalDestinations(entries) => {
3741 assert_eq!(entries.len(), 3);
3743 assert!(entries.iter().any(|e| e.hash == dest_hash
3744 && e.dest_type == rns_core::constants::DESTINATION_SINGLE));
3745 }
3746 other => panic!("expected LocalDestinations, got {:?}", other),
3747 }
3748 }
3749
3750 #[test]
3751 fn query_local_destinations_tracks_link_dest() {
3752 let (tx, rx) = event::channel();
3753 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3754 let driver_config = TransportConfig { transport_enabled: false, identity_hash: None };
3755 let mut driver = Driver::new(driver_config, rx, tx.clone(), Box::new(cbs));
3756
3757 let dest_hash = [0xBB; 16];
3758 tx.send(Event::RegisterLinkDestination {
3759 dest_hash,
3760 sig_prv_bytes: [0x11; 32],
3761 sig_pub_bytes: [0x22; 32],
3762 }).unwrap();
3763
3764 let (resp_tx, resp_rx) = mpsc::channel();
3765 tx.send(Event::Query(QueryRequest::LocalDestinations, resp_tx)).unwrap();
3766 tx.send(Event::Shutdown).unwrap();
3767 driver.run();
3768
3769 match resp_rx.recv().unwrap() {
3770 QueryResponse::LocalDestinations(entries) => {
3771 assert_eq!(entries.len(), 3);
3773 assert!(entries.iter().any(|e| e.hash == dest_hash
3774 && e.dest_type == rns_core::constants::DESTINATION_SINGLE));
3775 }
3776 other => panic!("expected LocalDestinations, got {:?}", other),
3777 }
3778 }
3779
3780 #[test]
3781 fn query_links_empty() {
3782 let (tx, rx) = event::channel();
3783 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3784 let driver_config = TransportConfig { transport_enabled: false, identity_hash: None };
3785 let mut driver = Driver::new(driver_config, rx, tx.clone(), Box::new(cbs));
3786
3787 let (resp_tx, resp_rx) = mpsc::channel();
3788 tx.send(Event::Query(QueryRequest::Links, resp_tx)).unwrap();
3789 tx.send(Event::Shutdown).unwrap();
3790 driver.run();
3791
3792 match resp_rx.recv().unwrap() {
3793 QueryResponse::Links(entries) => {
3794 assert!(entries.is_empty());
3795 }
3796 other => panic!("expected Links, got {:?}", other),
3797 }
3798 }
3799
3800 #[test]
3801 fn query_resources_empty() {
3802 let (tx, rx) = event::channel();
3803 let (cbs, _, _, _, _, _) = MockCallbacks::new();
3804 let driver_config = TransportConfig { transport_enabled: false, identity_hash: None };
3805 let mut driver = Driver::new(driver_config, rx, tx.clone(), Box::new(cbs));
3806
3807 let (resp_tx, resp_rx) = mpsc::channel();
3808 tx.send(Event::Query(QueryRequest::Resources, resp_tx)).unwrap();
3809 tx.send(Event::Shutdown).unwrap();
3810 driver.run();
3811
3812 match resp_rx.recv().unwrap() {
3813 QueryResponse::Resources(entries) => {
3814 assert!(entries.is_empty());
3815 }
3816 other => panic!("expected Resources, got {:?}", other),
3817 }
3818 }
3819
3820 #[test]
3821 fn infer_interface_type_from_name() {
3822 assert_eq!(
3823 super::infer_interface_type("TCPServerInterface/Client-1234"),
3824 "TCPServerClientInterface"
3825 );
3826 assert_eq!(
3827 super::infer_interface_type("BackboneInterface/5"),
3828 "BackboneInterface"
3829 );
3830 assert_eq!(
3831 super::infer_interface_type("LocalInterface"),
3832 "LocalServerClientInterface"
3833 );
3834 assert_eq!(
3835 super::infer_interface_type("MyAutoGroup:fe80::1"),
3836 "AutoInterface"
3837 );
3838 }
3839}