1use std::collections::HashMap;
4use std::io;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant};
8
9use rns_core::packet::RawPacket;
10use rns_core::transport::announce_verify_queue::{AnnounceVerifyQueue, OverflowPolicy};
11use rns_core::transport::tables::PathEntry;
12use rns_core::transport::types::{InterfaceId, TransportAction, TransportConfig};
13use rns_core::transport::TransportEngine;
14use rns_crypto::{OsRng, Rng};
15
16#[cfg(feature = "hooks")]
17use crate::provider_bridge::ProviderBridge;
18#[cfg(feature = "hooks")]
19use rns_hooks::{create_hook_slots, EngineAccess, HookContext, HookManager, HookPoint, HookSlot};
20
21#[cfg(feature = "hooks")]
22use crate::event::BackbonePeerHookEvent;
23use crate::event::{
24 BackbonePeerPoolMemberStatus, BackbonePeerPoolStatus, BackbonePeerStateEntry, BlackholeInfo,
25 DrainStatus, Event, EventReceiver, InterfaceStatsResponse, KnownDestinationEntry,
26 LifecycleState, LocalDestinationEntry, NextHopResponse, PathTableEntry, QueryRequest,
27 QueryResponse, RateTableEntry, RuntimeConfigApplyMode, RuntimeConfigEntry, RuntimeConfigError,
28 RuntimeConfigErrorCode, RuntimeConfigSource, RuntimeConfigValue, SingleInterfaceStat,
29};
30use crate::holepunch::orchestrator::{HolePunchManager, HolePunchManagerAction};
31use crate::ifac;
32#[cfg(all(feature = "iface-auto", test))]
33use crate::interface::auto::AutoRuntime;
34#[cfg(feature = "iface-auto")]
35use crate::interface::auto::AutoRuntimeConfigHandle;
36#[cfg(feature = "iface-backbone")]
37use crate::interface::backbone::{
38 start_client, BackboneClientConfig, BackboneClientRuntime, BackboneClientRuntimeConfigHandle,
39 BackbonePeerStateHandle, BackboneRuntimeConfigHandle,
40};
41#[cfg(all(feature = "iface-backbone", target_os = "linux", test))]
42use crate::interface::backbone::{BackboneAbuseConfig, BackboneServerRuntime};
43#[cfg(all(feature = "iface-i2p", test))]
44use crate::interface::i2p::I2pRuntime;
45#[cfg(feature = "iface-i2p")]
46use crate::interface::i2p::I2pRuntimeConfigHandle;
47#[cfg(all(feature = "iface-pipe", test))]
48use crate::interface::pipe::PipeRuntime;
49#[cfg(feature = "iface-pipe")]
50use crate::interface::pipe::PipeRuntimeConfigHandle;
51#[cfg(feature = "iface-rnode")]
52use crate::interface::rnode::{
53 validate_sub_config, RNodeRuntime, RNodeRuntimeConfigHandle, RNodeSubConfig,
54};
55#[cfg(feature = "iface-tcp")]
56use crate::interface::tcp::TcpClientRuntimeConfigHandle;
57#[cfg(all(feature = "iface-tcp", test))]
58use crate::interface::tcp_server::TcpServerRuntime;
59#[cfg(feature = "iface-tcp")]
60use crate::interface::tcp_server::TcpServerRuntimeConfigHandle;
61#[cfg(all(feature = "iface-udp", test))]
62use crate::interface::udp::UdpRuntime;
63#[cfg(feature = "iface-udp")]
64use crate::interface::udp::UdpRuntimeConfigHandle;
65use crate::interface::{InterfaceEntry, InterfaceStats};
66use crate::link_manager::{LinkManager, LinkManagerAction};
67use crate::time;
68
/// Default time-to-live for known-destination entries: 48 * 60 * 60 = 172 800.
/// NOTE(review): presumably seconds, matching the `_SECS` constant below — confirm.
const DEFAULT_KNOWN_DESTINATIONS_TTL: f64 = 48.0 * 60.0 * 60.0;
/// Default value for `Driver::known_destinations_max_entries`.
const DEFAULT_KNOWN_DESTINATIONS_MAX_ENTRIES: usize = 8192;
/// Default rate-limiter entry time-to-live, in seconds (48 hours).
const DEFAULT_RATE_LIMITER_TTL_SECS: f64 = 48.0 * 60.0 * 60.0;
/// Default driver tick interval, in milliseconds.
const DEFAULT_TICK_INTERVAL_MS: u64 = 1000;
/// Default number of ticks between known-destination cleanups
/// (one hour if a tick fires every `DEFAULT_TICK_INTERVAL_MS` — TODO confirm tick cadence).
const DEFAULT_KNOWN_DESTINATIONS_CLEANUP_INTERVAL_TICKS: u32 = 3600;
/// Default number of ticks between announce-cache cleanups.
const DEFAULT_ANNOUNCE_CACHE_CLEANUP_INTERVAL_TICKS: u32 = 3600;
/// Default batch size for announce-cache cleanup work.
const DEFAULT_ANNOUNCE_CACHE_CLEANUP_BATCH_SIZE: usize = 10_000;
/// Default number of ticks between discovery cleanups.
const DEFAULT_DISCOVERY_CLEANUP_INTERVAL_TICKS: u32 = 3600;
/// Default interval between management announces, in seconds.
const DEFAULT_MANAGEMENT_ANNOUNCE_INTERVAL_SECS: f64 = 300.0;
// The three durations below are not referenced in this part of the file;
// presumably they are consumed by the submodules declared further down.
/// NOTE(review): looks like the flush window granted during link teardown — confirm at use site.
const DEFAULT_LINK_TEARDOWN_FLUSH: Duration = Duration::from_millis(150);
/// Lower bound of the send-retry backoff (25 ms).
const SEND_RETRY_BACKOFF_MIN: Duration = Duration::from_millis(25);
/// Upper bound of the send-retry backoff (1000 ms).
const SEND_RETRY_BACKOFF_MAX: Duration = Duration::from_millis(1000);
81
82mod dispatch;
83mod events;
84mod lifecycle;
85mod queries;
86mod runtime_config;
87
88#[cfg(test)]
89mod tests;
90
91fn inject_transport_header(raw: &[u8], next_hop: &[u8; 16]) -> Vec<u8> {
92 if raw.len() < 18 {
93 return raw.to_vec();
94 }
95
96 let new_flags = (rns_core::constants::HEADER_2 << 6)
97 | (rns_core::constants::TRANSPORT_TRANSPORT << 4)
98 | (raw[0] & 0x0F);
99
100 let mut new_raw = Vec::with_capacity(raw.len() + 16);
101 new_raw.push(new_flags);
102 new_raw.push(raw[1]);
103 new_raw.extend_from_slice(next_hop);
104 new_raw.extend_from_slice(&raw[2..]);
105 new_raw
106}
107
108fn recover_mutex_guard<'a, T>(mutex: &'a Mutex<T>, label: &str) -> std::sync::MutexGuard<'a, T> {
109 match mutex.lock() {
110 Ok(guard) => guard,
111 Err(poisoned) => {
112 log::error!("recovering from poisoned mutex: {}", label);
113 poisoned.into_inner()
114 }
115 }
116}
117
/// Snapshot of the built-in defaults for the runtime-tunable driver settings.
///
/// `Driver::new` fills this from the `DEFAULT_*` constants (and from
/// `HolePunchPolicy::default()` / `ProviderBridgeConfig::default()`) and stores it in
/// `Driver::runtime_config_defaults`.
/// NOTE(review): presumably used to report or restore default values at runtime — confirm
/// in the `runtime_config` module.
#[derive(Debug, Clone, Copy)]
pub(crate) struct RuntimeConfigDefaults {
    /// Default tick interval, in milliseconds.
    pub(crate) tick_interval_ms: u64,
    /// Default known-destination time-to-live.
    pub(crate) known_destinations_ttl: f64,
    /// Default rate-limiter time-to-live, in seconds.
    pub(crate) rate_limiter_ttl_secs: f64,
    /// Default tick count between known-destination cleanups.
    pub(crate) known_destinations_cleanup_interval_ticks: u32,
    /// Default tick count between announce-cache cleanups.
    pub(crate) announce_cache_cleanup_interval_ticks: u32,
    /// Default announce-cache cleanup batch size.
    pub(crate) announce_cache_cleanup_batch_size: usize,
    /// Default tick count between discovery cleanups.
    pub(crate) discovery_cleanup_interval_ticks: u32,
    /// Default management announce interval, in seconds.
    pub(crate) management_announce_interval_secs: f64,
    /// Default hole-punch policy.
    pub(crate) direct_connect_policy: crate::event::HolePunchPolicy,
    /// Default provider-bridge queue limit, in events (hooks builds only).
    #[cfg(feature = "hooks")]
    pub(crate) provider_queue_max_events: usize,
    /// Default provider-bridge queue limit, in bytes (hooks builds only).
    #[cfg(feature = "hooks")]
    pub(crate) provider_queue_max_bytes: usize,
}
134
/// Discovery-related settings of one backbone interface.
///
/// All fields except `discoverable` are copied into a
/// `crate::discovery::DiscoverableInterface` by `Driver::make_discoverable_interface`.
#[cfg(feature = "iface-backbone")]
#[derive(Debug, Clone)]
pub(crate) struct BackboneDiscoveryRuntime {
    /// When true, `Driver::sync_backbone_discovery_runtime` registers the interface with
    /// the interface announcer; when false it removes it.
    pub(crate) discoverable: bool,
    /// Discovery configuration forwarded to the announcer.
    pub(crate) config: crate::discovery::DiscoveryConfig,
    /// Forwarded as `DiscoverableInterface::transport_enabled`.
    pub(crate) transport_enabled: bool,
    /// Optional IFAC network name forwarded to the announcer.
    pub(crate) ifac_netname: Option<String>,
    /// Optional IFAC network key forwarded to the announcer.
    pub(crate) ifac_netkey: Option<String>,
}
144
/// Per-interface record pairing the live backbone discovery settings with the values
/// the interface started with.
///
/// `from_parts` initialises `current` as a clone of `startup`.
#[cfg(feature = "iface-backbone")]
#[derive(Debug, Clone)]
pub(crate) struct BackboneDiscoveryRuntimeHandle {
    /// Name of the interface these settings belong to.
    pub(crate) interface_name: String,
    /// Settings currently in effect; read by `Driver::sync_backbone_discovery_runtime`.
    pub(crate) current: BackboneDiscoveryRuntime,
    /// Settings captured at construction.
    /// NOTE(review): presumably kept so runtime changes can be reverted — confirm.
    pub(crate) startup: BackboneDiscoveryRuntime,
}
152
/// Discovery-related settings of one TCP server interface.
///
/// All fields except `discoverable` are copied into a
/// `crate::discovery::DiscoverableInterface` by
/// `Driver::make_tcp_server_discoverable_interface`.
#[cfg(feature = "iface-tcp")]
#[derive(Debug, Clone)]
pub(crate) struct TcpServerDiscoveryRuntime {
    /// When true, `Driver::sync_tcp_server_discovery_runtime` registers the interface with
    /// the interface announcer; when false it removes it.
    pub(crate) discoverable: bool,
    /// Discovery configuration forwarded to the announcer.
    pub(crate) config: crate::discovery::DiscoveryConfig,
    /// Forwarded as `DiscoverableInterface::transport_enabled`.
    pub(crate) transport_enabled: bool,
    /// Optional IFAC network name forwarded to the announcer.
    pub(crate) ifac_netname: Option<String>,
    /// Optional IFAC network key forwarded to the announcer.
    pub(crate) ifac_netkey: Option<String>,
}
162
/// Per-interface record pairing the live TCP server discovery settings with the values
/// the interface started with.
///
/// `from_parts` initialises `current` as a clone of `startup`.
#[cfg(feature = "iface-tcp")]
#[derive(Debug, Clone)]
pub(crate) struct TcpServerDiscoveryRuntimeHandle {
    /// Name of the interface these settings belong to.
    pub(crate) interface_name: String,
    /// Settings currently in effect; read by `Driver::sync_tcp_server_discovery_runtime`.
    pub(crate) current: TcpServerDiscoveryRuntime,
    /// Settings captured at construction.
    /// NOTE(review): presumably kept so runtime changes can be reverted — confirm.
    pub(crate) startup: TcpServerDiscoveryRuntime,
}
170
/// IFAC settings of a single interface, comparable for equality.
///
/// Stored per interface name in `Driver::interface_ifac_runtime` and
/// `Driver::interface_ifac_runtime_defaults`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct IfacRuntimeConfig {
    /// Optional IFAC network name.
    pub(crate) netname: Option<String>,
    /// Optional IFAC network key.
    pub(crate) netkey: Option<String>,
    /// NOTE(review): presumably the IFAC size in bytes — confirm against `crate::ifac`.
    pub(crate) size: usize,
}
177
#[cfg(feature = "iface-backbone")]
impl BackboneDiscoveryRuntimeHandle {
    /// Assembles a handle whose `current` and `startup` snapshots both hold the
    /// supplied values.
    pub(crate) fn from_parts(
        interface_name: String,
        startup_config: crate::discovery::DiscoveryConfig,
        transport_enabled: bool,
        ifac_netname: Option<String>,
        ifac_netkey: Option<String>,
        discoverable: bool,
    ) -> Self {
        let current = BackboneDiscoveryRuntime {
            discoverable,
            config: startup_config,
            transport_enabled,
            ifac_netname,
            ifac_netkey,
        };
        // Both snapshots start out identical; only `current` is expected to diverge later.
        let startup = current.clone();

        Self {
            interface_name,
            current,
            startup,
        }
    }
}
202
#[cfg(feature = "iface-tcp")]
impl TcpServerDiscoveryRuntimeHandle {
    /// Assembles a handle whose `current` and `startup` snapshots both hold the
    /// supplied values.
    pub(crate) fn from_parts(
        interface_name: String,
        startup_config: crate::discovery::DiscoveryConfig,
        transport_enabled: bool,
        ifac_netname: Option<String>,
        ifac_netkey: Option<String>,
        discoverable: bool,
    ) -> Self {
        let current = TcpServerDiscoveryRuntime {
            discoverable,
            config: startup_config,
            transport_enabled,
            ifac_netname,
            ifac_netkey,
        };
        // Both snapshots start out identical; only `current` is expected to diverge later.
        let startup = current.clone();

        Self {
            interface_name,
            current,
            startup,
        }
    }
}
227
228impl IfacRuntimeConfig {
229 pub(crate) fn from_parts(netname: Option<String>, netkey: Option<String>, size: usize) -> Self {
230 Self {
231 netname,
232 netkey,
233 size,
234 }
235 }
236}
237
/// Tunables for the backbone peer pool.
///
/// NOTE(review): the pool logic is not visible in this part of the file; the field
/// descriptions below are inferred from names and types — confirm against the pool code.
#[cfg(feature = "iface-backbone")]
#[derive(Debug, Clone)]
pub struct BackbonePeerPoolSettings {
    /// Presumably the maximum number of simultaneously connected pool members.
    pub max_connected: usize,
    /// Presumably the number of failures that triggers a cooldown.
    pub failure_threshold: usize,
    /// Presumably the window over which failures are counted.
    pub failure_window: Duration,
    /// Presumably how long a candidate is rested after crossing the threshold.
    pub cooldown: Duration,
}
246
/// Static configuration of one backbone peer pool candidate.
///
/// NOTE(review): consumers are outside this part of the file; field meanings beyond
/// their types are assumptions to be confirmed.
#[cfg(feature = "iface-backbone")]
pub(crate) struct BackbonePeerPoolCandidateConfig {
    /// Backbone client configuration for this candidate.
    pub(crate) client: BackboneClientConfig,
    /// Interface mode value (raw `u8`).
    pub(crate) mode: u8,
    /// Ingress-control configuration for the candidate's interface.
    pub(crate) ingress_control: rns_core::transport::types::IngressControlConfig,
    /// IFAC settings for the candidate's interface.
    pub(crate) ifac_runtime: IfacRuntimeConfig,
    /// Whether IFAC is enabled for the candidate's interface.
    pub(crate) ifac_enabled: bool,
    /// Interface type name reported for the candidate.
    pub(crate) interface_type_name: String,
}
256
/// Backbone peer pool state: the pool-wide settings plus one entry per candidate.
///
/// Held by the driver as `Driver::backbone_peer_pool` (`None` until configured).
#[cfg(feature = "iface-backbone")]
struct BackbonePeerPool {
    /// Pool-wide tunables.
    settings: BackbonePeerPoolSettings,
    /// Candidate peers together with their runtime state.
    candidates: Vec<BackbonePeerPoolCandidate>,
}
262
/// Runtime state of a single backbone peer pool candidate.
///
/// NOTE(review): the `f64` fields are presumably timestamps on the same clock as
/// `time::now()` — confirm against the pool logic, which is not visible here.
#[cfg(feature = "iface-backbone")]
struct BackbonePeerPoolCandidate {
    /// Static configuration of the candidate.
    config: BackbonePeerPoolCandidateConfig,
    /// Interface id associated with the candidate, if any
    /// (presumably set while the candidate is connected).
    active_id: Option<InterfaceId>,
    /// Recorded failures (presumably failure timestamps).
    failures: Vec<f64>,
    /// Presumably the earliest time a retry may be attempted.
    retry_after: Option<f64>,
    /// Presumably the time until which the candidate is in cooldown.
    cooldown_until: Option<f64>,
    /// Most recent error message, if any.
    last_error: Option<String>,
}
272
/// Borrowed, read-only view over driver state that implements `EngineAccess`,
/// so hook programs can query the engine while a hook chain runs.
#[cfg(feature = "hooks")]
struct EngineRef<'a> {
    /// Transport engine answering path, next-hop, blackhole and identity queries.
    engine: &'a TransportEngine,
    /// Interface table used for name, mode and announce-rate lookups.
    interfaces: &'a HashMap<InterfaceId, InterfaceEntry>,
    /// Link manager used for link-state lookups.
    link_manager: &'a LinkManager,
    /// Timestamp passed to `TransportEngine::is_blackholed`.
    now: f64,
}
281
#[cfg(feature = "hooks")]
impl<'a> EngineRef<'a> {
    /// Looks up the interface entry registered under the raw id, if any.
    fn interface_entry(&self, id: u64) -> Option<&'a InterfaceEntry> {
        self.interfaces.get(&InterfaceId(id))
    }
}

#[cfg(feature = "hooks")]
impl<'a> EngineAccess for EngineRef<'a> {
    /// Whether the engine knows a path to `dest`.
    fn has_path(&self, dest: &[u8; 16]) -> bool {
        self.engine.has_path(dest)
    }

    /// Hop count to `dest` as reported by the engine.
    fn hops_to(&self, dest: &[u8; 16]) -> Option<u8> {
        self.engine.hops_to(dest)
    }

    /// Next hop towards `dest` as reported by the engine.
    fn next_hop(&self, dest: &[u8; 16]) -> Option<[u8; 16]> {
        self.engine.next_hop(dest)
    }

    /// Whether `identity` is blackholed at the captured `now` timestamp.
    fn is_blackholed(&self, identity: &[u8; 16]) -> bool {
        self.engine.is_blackholed(identity, self.now)
    }

    /// Name of the interface with the given raw id, if it is registered.
    fn interface_name(&self, id: u64) -> Option<String> {
        let entry = self.interface_entry(id)?;
        Some(entry.info.name.clone())
    }

    /// Mode of the interface with the given raw id, if it is registered.
    fn interface_mode(&self, id: u64) -> Option<u8> {
        let entry = self.interface_entry(id)?;
        Some(entry.info.mode)
    }

    /// Copy of the engine's identity hash, if one is set.
    fn identity_hash(&self) -> Option<[u8; 16]> {
        self.engine.identity_hash().copied()
    }

    /// Outgoing announce frequency of the interface, scaled by 1000 and cast to `i32`.
    fn announce_rate(&self, id: u64) -> Option<i32> {
        let entry = self.interface_entry(id)?;
        let scaled = entry.stats.outgoing_announce_freq() * 1000.0;
        Some(scaled as i32)
    }

    /// Numeric code for the state of the link identified by `link_hash`:
    /// Pending = 0, Handshake = 1, Active = 2, Stale = 3, Closed = 4.
    fn link_state(&self, link_hash: &[u8; 16]) -> Option<u8> {
        use rns_core::link::types::LinkState;

        let state = self.link_manager.link_state(link_hash)?;
        let code = match state {
            LinkState::Pending => 0,
            LinkState::Handshake => 1,
            LinkState::Active => 2,
            LinkState::Stale => 3,
            LinkState::Closed => 4,
        };
        Some(code)
    }
}
323
/// Pulls the 16-byte destination hash out of a raw packet.
///
/// Bit `0x40` of the first byte selects the header form: when set the hash is read from
/// bytes 18..34, otherwise from bytes 2..18. An empty packet, or one too short to
/// contain the selected range, yields an all-zero hash.
#[cfg(any(test, feature = "hooks"))]
fn extract_dest_hash(raw: &[u8]) -> [u8; 16] {
    const HEADER_2_BIT: u8 = 0x40;
    const HASH_LEN: usize = 16;

    let mut hash = [0u8; HASH_LEN];

    let offset = match raw.first() {
        None => return hash,
        Some(flags) if flags & HEADER_2_BIT != 0 => 18,
        Some(_) => 2,
    };

    // `get` returns `None` when the packet is too short, leaving the zeroed hash in place.
    if let Some(window) = raw.get(offset..offset + HASH_LEN) {
        hash.copy_from_slice(window);
    }
    hash
}
342
/// Runs the hook chain for `programs` through the hook manager.
///
/// Returns `None` without running anything when there are no programs or no hook
/// manager; otherwise returns whatever
/// `HookManager::run_chain_with_provider_events` yields.
#[cfg(feature = "hooks")]
fn run_hook_inner(
    programs: &mut [rns_hooks::LoadedProgram],
    hook_manager: &Option<HookManager>,
    engine_access: &dyn EngineAccess,
    ctx: &HookContext,
    now: f64,
    provider_events_enabled: bool,
) -> Option<rns_hooks::ExecuteResult> {
    match hook_manager {
        Some(manager) if !programs.is_empty() => manager.run_chain_with_provider_events(
            programs,
            ctx,
            engine_access,
            now,
            provider_events_enabled,
        ),
        _ => None,
    }
}
359
/// Builds the `HookContext::BackbonePeer` view of a backbone peer event.
///
/// Interface ids are unwrapped to their raw integer form; every other field is copied
/// across unchanged.
#[cfg(feature = "hooks")]
fn backbone_peer_hook_context(event: &BackbonePeerHookEvent) -> HookContext<'_> {
    let server_interface_id = event.server_interface_id.0;
    let peer_interface_id = event.peer_interface_id.map(|InterfaceId(raw_id)| raw_id);

    HookContext::BackbonePeer {
        server_interface_id,
        peer_interface_id,
        peer_ip: event.peer_ip,
        peer_port: event.peer_port,
        connected_for: event.connected_for,
        had_received_data: event.had_received_data,
        penalty_level: event.penalty_level,
        blacklist_for: event.blacklist_for,
    }
}
373
/// Converts hook-injected wire actions into transport actions, preserving order.
///
/// Raw interface ids are wrapped in `InterfaceId`, `has_exclude`/`to_local` integer
/// flags become `Option`/`bool` (non-zero means set), and `AnnounceReceived` is always
/// produced with `ratchet: None`.
#[cfg(feature = "hooks")]
fn convert_injected_actions(actions: Vec<rns_hooks::ActionWire>) -> Vec<TransportAction> {
    /// Translates one wire action into its transport counterpart.
    fn to_transport_action(wire: rns_hooks::ActionWire) -> TransportAction {
        use rns_hooks::ActionWire;

        match wire {
            ActionWire::SendOnInterface { interface, raw } => TransportAction::SendOnInterface {
                interface: InterfaceId(interface),
                raw: raw.into(),
            },
            ActionWire::BroadcastOnAllInterfaces {
                raw,
                exclude,
                has_exclude,
            } => TransportAction::BroadcastOnAllInterfaces {
                raw: raw.into(),
                exclude: match has_exclude {
                    0 => None,
                    _ => Some(InterfaceId(exclude)),
                },
            },
            ActionWire::DeliverLocal {
                destination_hash,
                raw,
                packet_hash,
                receiving_interface,
            } => TransportAction::DeliverLocal {
                destination_hash,
                raw: raw.into(),
                packet_hash,
                receiving_interface: InterfaceId(receiving_interface),
            },
            ActionWire::PathUpdated {
                destination_hash,
                hops,
                next_hop,
                interface,
            } => TransportAction::PathUpdated {
                destination_hash,
                hops,
                next_hop,
                interface: InterfaceId(interface),
            },
            ActionWire::CacheAnnounce { packet_hash, raw } => TransportAction::CacheAnnounce {
                packet_hash,
                raw: raw.into(),
            },
            ActionWire::TunnelEstablished {
                tunnel_id,
                interface,
            } => TransportAction::TunnelEstablished {
                tunnel_id,
                interface: InterfaceId(interface),
            },
            ActionWire::TunnelSynthesize {
                interface,
                data,
                dest_hash,
            } => TransportAction::TunnelSynthesize {
                interface: InterfaceId(interface),
                data,
                dest_hash,
            },
            ActionWire::ForwardToLocalClients {
                raw,
                exclude,
                has_exclude,
            } => TransportAction::ForwardToLocalClients {
                raw: raw.into(),
                exclude: match has_exclude {
                    0 => None,
                    _ => Some(InterfaceId(exclude)),
                },
            },
            ActionWire::ForwardPlainBroadcast {
                raw,
                to_local,
                exclude,
                has_exclude,
            } => TransportAction::ForwardPlainBroadcast {
                raw: raw.into(),
                to_local: to_local != 0,
                exclude: match has_exclude {
                    0 => None,
                    _ => Some(InterfaceId(exclude)),
                },
            },
            ActionWire::AnnounceReceived {
                destination_hash,
                identity_hash,
                public_key,
                name_hash,
                random_hash,
                app_data,
                hops,
                receiving_interface,
            } => TransportAction::AnnounceReceived {
                destination_hash,
                identity_hash,
                public_key,
                name_hash,
                random_hash,
                // The wire form carries no ratchet, so none is attached.
                ratchet: None,
                app_data,
                hops,
                receiving_interface: InterfaceId(receiving_interface),
            },
        }
    }

    actions.into_iter().map(to_transport_action).collect()
}
492
/// Derives an interface type name from the prefix of an interface's name.
///
/// Prefixes are matched case-sensitively, in table order; names matching none of them
/// are reported as `"AutoInterface"`.
fn infer_interface_type(name: &str) -> String {
    const PREFIX_TO_TYPE: [(&str, &str); 3] = [
        ("TCPServerInterface", "TCPServerClientInterface"),
        ("BackboneInterface", "BackboneInterface"),
        ("LocalInterface", "LocalServerClientInterface"),
    ];
    const FALLBACK_TYPE: &str = "AutoInterface";

    for &(prefix, type_name) in PREFIX_TO_TYPE.iter() {
        if name.starts_with(prefix) {
            return type_name.to_string();
        }
    }
    FALLBACK_TYPE.to_string()
}
509
510pub use crate::common::callbacks::Callbacks;
511
/// Material kept per destination in `Driver::shared_announces`.
///
/// NOTE(review): presumably what is needed to re-issue an announce for a shared
/// destination — confirm at the use sites, which are outside this part of the file.
#[derive(Clone)]
struct SharedAnnounceRecord {
    /// 10-byte name hash of the destination.
    name_hash: [u8; 10],
    /// 64 bytes of private identity key material. `Debug` is not derived for this
    /// struct, so the key cannot be printed through `{:?}`.
    identity_prv_key: [u8; 64],
    /// Optional application data.
    app_data: Option<Vec<u8>>,
}
518
/// Per-destination record stored in `Driver::known_destinations`.
///
/// NOTE(review): the bookkeeping fields are maintained outside this part of the file;
/// their descriptions are inferred from names and should be confirmed.
#[derive(Debug, Clone)]
pub(crate) struct KnownDestinationState {
    /// The announced identity for this destination.
    announced: crate::destination::AnnouncedIdentity,
    /// Presumably set once the destination has been used.
    was_used: bool,
    /// Presumably the time of the most recent use, if any.
    last_used_at: Option<f64>,
    /// Presumably exempts the entry from cleanup/eviction when true.
    retained: bool,
}
526
/// Central driver state.
///
/// Owns the transport engine, the interface table, the event channel endpoints, the
/// link and hole-punch managers, per-interface runtime configuration handles and the
/// bookkeeping counters used for periodic cleanup. Built by `Driver::new`; the behaviour
/// operating on these fields lives in this file and in the `dispatch`, `events`,
/// `lifecycle`, `queries` and `runtime_config` submodules.
pub struct Driver {
    /// Transport engine constructed from the `TransportConfig` given to `Driver::new`.
    pub(crate) engine: TransportEngine,
    /// Registered interfaces keyed by id.
    pub(crate) interfaces: HashMap<InterfaceId, InterfaceEntry>,
    /// Operating-system random number source.
    pub(crate) rng: OsRng,
    /// Receiving end of the driver's event channel.
    pub(crate) rx: EventReceiver,
    /// Application callbacks.
    pub(crate) callbacks: Box<dyn Callbacks>,
    /// Value of `time::now()` captured when the driver was constructed.
    pub(crate) started: f64,
    /// Current lifecycle state; starts as `LifecycleState::Active`.
    pub(crate) lifecycle_state: LifecycleState,
    /// NOTE(review): presumably when the current drain began — confirm in `lifecycle`.
    pub(crate) drain_started_at: Option<Instant>,
    /// NOTE(review): presumably when the current drain must finish — confirm in `lifecycle`.
    pub(crate) drain_deadline: Option<Instant>,
    /// Controls for listener-style interfaces.
    pub(crate) listener_controls: Vec<crate::interface::ListenerControl>,
    /// Optional announce cache; `None` until configured.
    pub(crate) announce_cache: Option<crate::announce_cache::AnnounceCache>,
    /// Destination hash of `rnstransport` / `tunnel` / `synthesize`, computed in `new`.
    pub(crate) tunnel_synth_dest: [u8; 16],
    /// Transport identity, if set; its hash seeds a newly created interface announcer.
    pub(crate) transport_identity: Option<rns_crypto::identity::Identity>,
    /// Link manager.
    pub(crate) link_manager: LinkManager,
    /// Management configuration; starts as `Default::default()`.
    pub(crate) management_config: crate::management::ManagementConfig,
    /// Time of the last management announce; starts at `0.0`.
    pub(crate) last_management_announce: f64,
    /// Whether the initial announce has been sent; starts false.
    pub(crate) initial_announce_sent: bool,
    /// Known destinations keyed by 16-byte destination hash.
    pub(crate) known_destinations: HashMap<[u8; 16], KnownDestinationState>,
    /// Optional ratchet store.
    pub(crate) ratchet_store: Option<Arc<dyn crate::storage::RatchetStore>>,
    /// Time-to-live applied to known destinations.
    pub(crate) known_destinations_ttl: f64,
    /// Cap on the number of known destinations.
    pub(crate) known_destinations_max_entries: usize,
    /// Rate-limiter time-to-live, in seconds.
    pub(crate) rate_limiter_ttl_secs: f64,
    /// Destination hash of `rnstransport` / `path` / `request`, computed in `new`.
    pub(crate) path_request_dest: [u8; 16],
    /// Proof strategy (and optional identity) per 16-byte destination hash.
    pub(crate) proof_strategies: HashMap<
        [u8; 16],
        (
            rns_core::types::ProofStrategy,
            Option<rns_crypto::identity::Identity>,
        ),
    >,
    /// Sent packets keyed by 32-byte hash.
    /// NOTE(review): value is presumably (destination hash, send time) — confirm.
    pub(crate) sent_packets: HashMap<[u8; 32], ([u8; 16], f64)>,
    /// Completed proofs keyed by 32-byte hash.
    /// NOTE(review): meaning of the two `f64` values is not visible here — confirm.
    pub(crate) completed_proofs: HashMap<[u8; 32], (f64, f64)>,
    /// Locally registered destinations mapped to their destination type; seeded in `new`
    /// with the two `rnstransport` destinations as `DESTINATION_PLAIN`.
    pub(crate) local_destinations: HashMap<[u8; 16], u8>,
    /// Announce material per destination hash.
    shared_announces: HashMap<[u8; 16], SharedAnnounceRecord>,
    /// Per-interface reconnect flag.
    /// NOTE(review): presumably marks shared-instance interfaces awaiting reconnect — confirm.
    shared_reconnect_pending: HashMap<InterfaceId, bool>,
    /// Hole-punch manager; replaced wholesale by `set_probe_config`.
    pub(crate) holepunch_manager: HolePunchManager,
    /// Sending end of the event channel; cloned into async interface writers.
    pub(crate) event_tx: crate::event::EventSender,
    /// Queue capacity passed to `wrap_async_writer` for each interface writer.
    pub(crate) interface_writer_queue_capacity: usize,
    /// Shared tick interval in milliseconds; replaceable via `set_tick_interval_handle`.
    pub(crate) tick_interval_ms: Arc<AtomicU64>,
    // Per-interface runtime configuration handles, keyed by interface name and gated on
    // the corresponding interface feature.
    #[cfg(feature = "iface-backbone")]
    pub(crate) backbone_runtime: HashMap<String, BackboneRuntimeConfigHandle>,
    #[cfg(feature = "iface-backbone")]
    pub(crate) backbone_peer_state: HashMap<String, BackbonePeerStateHandle>,
    #[cfg(feature = "iface-backbone")]
    pub(crate) backbone_client_runtime: HashMap<String, BackboneClientRuntimeConfigHandle>,
    #[cfg(feature = "iface-backbone")]
    pub(crate) backbone_discovery_runtime: HashMap<String, BackboneDiscoveryRuntimeHandle>,
    /// Backbone peer pool, if one is configured.
    #[cfg(feature = "iface-backbone")]
    backbone_peer_pool: Option<BackbonePeerPool>,
    #[cfg(feature = "iface-tcp")]
    pub(crate) tcp_server_runtime: HashMap<String, TcpServerRuntimeConfigHandle>,
    #[cfg(feature = "iface-tcp")]
    pub(crate) tcp_client_runtime: HashMap<String, TcpClientRuntimeConfigHandle>,
    #[cfg(feature = "iface-tcp")]
    pub(crate) tcp_server_discovery_runtime: HashMap<String, TcpServerDiscoveryRuntimeHandle>,
    #[cfg(feature = "iface-udp")]
    pub(crate) udp_runtime: HashMap<String, UdpRuntimeConfigHandle>,
    #[cfg(feature = "iface-auto")]
    pub(crate) auto_runtime: HashMap<String, AutoRuntimeConfigHandle>,
    #[cfg(feature = "iface-i2p")]
    pub(crate) i2p_runtime: HashMap<String, I2pRuntimeConfigHandle>,
    #[cfg(feature = "iface-pipe")]
    pub(crate) pipe_runtime: HashMap<String, PipeRuntimeConfigHandle>,
    #[cfg(feature = "iface-rnode")]
    pub(crate) rnode_runtime: HashMap<String, RNodeRuntimeConfigHandle>,
    /// Interface info per interface name.
    /// NOTE(review): presumably the startup values used as runtime defaults — confirm.
    pub(crate) interface_runtime_defaults:
        HashMap<String, rns_core::transport::types::InterfaceInfo>,
    /// Current IFAC settings per interface name.
    pub(crate) interface_ifac_runtime: HashMap<String, IfacRuntimeConfig>,
    /// Default IFAC settings per interface name.
    pub(crate) interface_ifac_runtime_defaults: HashMap<String, IfacRuntimeConfig>,
    /// Storage for discovered interfaces; `new` points it at a directory under the
    /// system temp dir.
    pub(crate) discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage,
    /// Required discovery stamp value; starts at `crate::discovery::DEFAULT_STAMP_VALUE`.
    pub(crate) discovery_required_value: u8,
    /// Name hash returned by `crate::discovery::discovery_name_hash()`.
    pub(crate) discovery_name_hash: [u8; 10],
    /// Destination hash of the probe responder, if any.
    pub(crate) probe_responder_hash: Option<[u8; 16]>,
    /// Whether interface discovery is enabled; starts false.
    pub(crate) discover_interfaces: bool,
    /// Interface announcer; created on demand and dropped again once it becomes empty
    /// (see the `sync_*_discovery_runtime` methods).
    pub(crate) interface_announcer: Option<crate::discovery::InterfaceAnnouncer>,
    /// Shared announce verification queue; replaced by `set_announce_verify_queue_config`.
    pub(crate) announce_verify_queue: Arc<Mutex<AnnounceVerifyQueue>>,
    /// Whether announces are verified asynchronously; starts false.
    pub(crate) async_announce_verification: bool,
    // Tick counters and intervals for periodic maintenance; all counters start at zero.
    pub(crate) discovery_cleanup_counter: u32,
    pub(crate) discovery_cleanup_interval_ticks: u32,
    pub(crate) memory_stats_counter: u32,
    pub(crate) cache_cleanup_counter: u32,
    pub(crate) announce_cache_cleanup_counter: u32,
    pub(crate) known_destinations_cleanup_interval_ticks: u32,
    pub(crate) known_destinations_cap_evict_count: usize,
    pub(crate) announce_cache_cleanup_interval_ticks: u32,
    // State of an in-progress announce-cache cleanup pass.
    // NOTE(review): presumably carried across ticks so the pass can run in batches — confirm.
    pub(crate) cache_cleanup_active_hashes: Option<Vec<[u8; 32]>>,
    pub(crate) cache_cleanup_entries: Option<std::fs::ReadDir>,
    pub(crate) cache_cleanup_removed: usize,
    pub(crate) announce_cache_cleanup_batch_size: usize,
    /// Interval between management announces, in seconds.
    pub(crate) management_announce_interval_secs: f64,
    /// Built-in defaults for the runtime-tunable settings above.
    pub(crate) runtime_config_defaults: RuntimeConfigDefaults,
    /// One slot of loaded hook programs per hook point.
    #[cfg(feature = "hooks")]
    pub(crate) hook_slots: [HookSlot; HookPoint::COUNT],
    /// Hook manager; `None` when `HookManager::new()` failed during construction.
    #[cfg(feature = "hooks")]
    pub(crate) hook_manager: Option<HookManager>,
    /// Provider bridge; hook provider events are only forwarded while this is `Some`.
    #[cfg(feature = "hooks")]
    pub(crate) provider_bridge: Option<ProviderBridge>,
}
690
691impl Driver {
    /// Creates a driver in `LifecycleState::Active` with empty tables and default
    /// runtime settings.
    ///
    /// * `config` – consumed by `TransportEngine::new`; its `announce_queue_max_entries`
    ///   also sizes the initial announce verification queue.
    /// * `rx` / `tx` – receiving and sending ends of the driver's event channel.
    /// * `callbacks` – application callbacks.
    ///
    /// The `rnstransport` tunnel-synthesize and path-request destinations are computed,
    /// registered with the engine as `DESTINATION_PLAIN`, and recorded in
    /// `local_destinations`.
    pub fn new(
        config: TransportConfig,
        rx: EventReceiver,
        tx: crate::event::EventSender,
        callbacks: Box<dyn Callbacks>,
    ) -> Self {
        // Read before `config` is moved into the engine below.
        let announce_queue_max_entries = config.announce_queue_max_entries;
        let tunnel_synth_dest = rns_core::destination::destination_hash(
            "rnstransport",
            &["tunnel", "synthesize"],
            None,
        );
        let path_request_dest =
            rns_core::destination::destination_hash("rnstransport", &["path", "request"], None);
        let discovery_name_hash = crate::discovery::discovery_name_hash();
        let mut engine = TransportEngine::new(config);
        engine.register_destination(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
        engine.register_destination(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
        // Mirror the two engine registrations in the driver's own destination table.
        let mut local_destinations = HashMap::new();
        local_destinations.insert(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
        local_destinations.insert(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
        // Built-in defaults; several driver fields below are initialised from this snapshot.
        let runtime_config_defaults = RuntimeConfigDefaults {
            tick_interval_ms: DEFAULT_TICK_INTERVAL_MS,
            known_destinations_ttl: DEFAULT_KNOWN_DESTINATIONS_TTL,
            rate_limiter_ttl_secs: DEFAULT_RATE_LIMITER_TTL_SECS,
            known_destinations_cleanup_interval_ticks:
                DEFAULT_KNOWN_DESTINATIONS_CLEANUP_INTERVAL_TICKS,
            announce_cache_cleanup_interval_ticks: DEFAULT_ANNOUNCE_CACHE_CLEANUP_INTERVAL_TICKS,
            announce_cache_cleanup_batch_size: DEFAULT_ANNOUNCE_CACHE_CLEANUP_BATCH_SIZE,
            discovery_cleanup_interval_ticks: DEFAULT_DISCOVERY_CLEANUP_INTERVAL_TICKS,
            management_announce_interval_secs: DEFAULT_MANAGEMENT_ANNOUNCE_INTERVAL_SECS,
            direct_connect_policy: crate::event::HolePunchPolicy::default(),
            #[cfg(feature = "hooks")]
            provider_queue_max_events: crate::provider_bridge::ProviderBridgeConfig::default()
                .queue_max_events,
            #[cfg(feature = "hooks")]
            provider_queue_max_bytes: crate::provider_bridge::ProviderBridgeConfig::default()
                .queue_max_bytes,
        };
        Driver {
            engine,
            interfaces: HashMap::new(),
            rng: OsRng,
            rx,
            callbacks,
            started: time::now(),
            lifecycle_state: LifecycleState::Active,
            drain_started_at: None,
            drain_deadline: None,
            listener_controls: Vec::new(),
            announce_cache: None,
            tunnel_synth_dest,
            transport_identity: None,
            link_manager: LinkManager::new(),
            management_config: Default::default(),
            last_management_announce: 0.0,
            initial_announce_sent: false,
            known_destinations: HashMap::new(),
            ratchet_store: None,
            known_destinations_ttl: DEFAULT_KNOWN_DESTINATIONS_TTL,
            known_destinations_max_entries: DEFAULT_KNOWN_DESTINATIONS_MAX_ENTRIES,
            rate_limiter_ttl_secs: DEFAULT_RATE_LIMITER_TTL_SECS,
            path_request_dest,
            proof_strategies: HashMap::new(),
            sent_packets: HashMap::new(),
            completed_proofs: HashMap::new(),
            local_destinations,
            shared_announces: HashMap::new(),
            shared_reconnect_pending: HashMap::new(),
            // Starts with no probe addresses and no device; `set_probe_config` replaces it.
            holepunch_manager: HolePunchManager::new(
                vec![],
                rns_core::holepunch::ProbeProtocol::Rnsp,
                None,
            ),
            event_tx: tx,
            interface_writer_queue_capacity: crate::interface::DEFAULT_ASYNC_WRITER_QUEUE_CAPACITY,
            tick_interval_ms: Arc::new(AtomicU64::new(DEFAULT_TICK_INTERVAL_MS)),
            #[cfg(feature = "iface-backbone")]
            backbone_runtime: HashMap::new(),
            #[cfg(feature = "iface-backbone")]
            backbone_peer_state: HashMap::new(),
            #[cfg(feature = "iface-backbone")]
            backbone_client_runtime: HashMap::new(),
            #[cfg(feature = "iface-backbone")]
            backbone_discovery_runtime: HashMap::new(),
            #[cfg(feature = "iface-backbone")]
            backbone_peer_pool: None,
            #[cfg(feature = "iface-tcp")]
            tcp_server_runtime: HashMap::new(),
            #[cfg(feature = "iface-tcp")]
            tcp_client_runtime: HashMap::new(),
            #[cfg(feature = "iface-tcp")]
            tcp_server_discovery_runtime: HashMap::new(),
            #[cfg(feature = "iface-udp")]
            udp_runtime: HashMap::new(),
            #[cfg(feature = "iface-auto")]
            auto_runtime: HashMap::new(),
            #[cfg(feature = "iface-i2p")]
            i2p_runtime: HashMap::new(),
            #[cfg(feature = "iface-pipe")]
            pipe_runtime: HashMap::new(),
            #[cfg(feature = "iface-rnode")]
            rnode_runtime: HashMap::new(),
            interface_runtime_defaults: HashMap::new(),
            interface_ifac_runtime: HashMap::new(),
            interface_ifac_runtime_defaults: HashMap::new(),
            // Placeholder location under the system temp directory.
            discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage::new(
                std::env::temp_dir().join("rns-discovered-interfaces"),
            ),
            discovery_required_value: crate::discovery::DEFAULT_STAMP_VALUE,
            discovery_name_hash,
            probe_responder_hash: None,
            discover_interfaces: false,
            interface_announcer: None,
            announce_verify_queue: Arc::new(Mutex::new(AnnounceVerifyQueue::new(
                announce_queue_max_entries,
            ))),
            async_announce_verification: false,
            discovery_cleanup_counter: 0,
            discovery_cleanup_interval_ticks: runtime_config_defaults
                .discovery_cleanup_interval_ticks,
            memory_stats_counter: 0,
            cache_cleanup_counter: 0,
            announce_cache_cleanup_counter: 0,
            known_destinations_cleanup_interval_ticks: runtime_config_defaults
                .known_destinations_cleanup_interval_ticks,
            known_destinations_cap_evict_count: 0,
            announce_cache_cleanup_interval_ticks: runtime_config_defaults
                .announce_cache_cleanup_interval_ticks,
            cache_cleanup_active_hashes: None,
            cache_cleanup_entries: None,
            cache_cleanup_removed: 0,
            announce_cache_cleanup_batch_size: runtime_config_defaults
                .announce_cache_cleanup_batch_size,
            management_announce_interval_secs: runtime_config_defaults
                .management_announce_interval_secs,
            runtime_config_defaults,
            #[cfg(feature = "hooks")]
            hook_slots: create_hook_slots(),
            // A failed hook-manager construction is discarded here, leaving `None`;
            // `run_hook_inner` then skips every hook chain.
            #[cfg(feature = "hooks")]
            hook_manager: HookManager::new().ok(),
            #[cfg(feature = "hooks")]
            provider_bridge: None,
        }
    }
841
842 pub fn set_announce_verify_queue_config(
843 &mut self,
844 max_entries: usize,
845 max_bytes: usize,
846 max_stale_secs: f64,
847 overflow_policy: OverflowPolicy,
848 ) {
849 self.announce_verify_queue = Arc::new(Mutex::new(AnnounceVerifyQueue::with_limits(
850 max_entries,
851 max_bytes,
852 max_stale_secs,
853 overflow_policy,
854 )));
855 }
856
857 fn wrap_interface_writer(
858 &self,
859 interface_id: InterfaceId,
860 interface_name: &str,
861 writer: Box<dyn crate::interface::Writer>,
862 ) -> (
863 Box<dyn crate::interface::Writer>,
864 crate::interface::AsyncWriterMetrics,
865 ) {
866 crate::interface::wrap_async_writer(
867 writer,
868 interface_id,
869 interface_name,
870 self.event_tx.clone(),
871 self.interface_writer_queue_capacity,
872 )
873 }
874
875 #[cfg(feature = "hooks")]
876 fn provider_events_enabled(&self) -> bool {
877 self.provider_bridge.is_some()
878 }
879
880 #[cfg(feature = "hooks")]
881 fn run_backbone_peer_hook(
882 &mut self,
883 attach_point: &str,
884 point: HookPoint,
885 event: &BackbonePeerHookEvent,
886 ) {
887 let ctx = backbone_peer_hook_context(event);
888 let now = time::now();
889 let engine_ref = EngineRef {
890 engine: &self.engine,
891 interfaces: &self.interfaces,
892 link_manager: &self.link_manager,
893 now,
894 };
895 let provider_events_enabled = self.provider_events_enabled();
896 if let Some(ref e) = run_hook_inner(
897 &mut self.hook_slots[point as usize].programs,
898 &self.hook_manager,
899 &engine_ref,
900 &ctx,
901 now,
902 provider_events_enabled,
903 ) {
904 self.forward_hook_side_effects(attach_point, e);
905 }
906 }
907
908 #[cfg(feature = "iface-backbone")]
909 fn make_discoverable_interface(
910 runtime: &BackboneDiscoveryRuntimeHandle,
911 ) -> crate::discovery::DiscoverableInterface {
912 crate::discovery::DiscoverableInterface {
913 interface_name: runtime.interface_name.clone(),
914 config: runtime.current.config.clone(),
915 transport_enabled: runtime.current.transport_enabled,
916 ifac_netname: runtime.current.ifac_netname.clone(),
917 ifac_netkey: runtime.current.ifac_netkey.clone(),
918 }
919 }
920
921 #[cfg(feature = "iface-backbone")]
922 fn sync_backbone_discovery_runtime(
923 &mut self,
924 interface_name: &str,
925 ) -> Result<(), RuntimeConfigError> {
926 let handle = self
927 .backbone_discovery_runtime
928 .get(interface_name)
929 .ok_or(RuntimeConfigError {
930 code: RuntimeConfigErrorCode::NotFound,
931 message: format!("backbone interface '{}' not found", interface_name),
932 })?
933 .clone();
934
935 if handle.current.discoverable {
936 let iface = Self::make_discoverable_interface(&handle);
937 if let Some(announcer) = self.interface_announcer.as_mut() {
938 announcer.upsert_interface(iface);
939 } else if let Some(identity) = self.transport_identity.as_ref() {
940 self.interface_announcer = Some(crate::discovery::InterfaceAnnouncer::new(
941 *identity.hash(),
942 vec![iface],
943 ));
944 }
945 } else if let Some(announcer) = self.interface_announcer.as_mut() {
946 announcer.remove_interface(interface_name);
947 if announcer.is_empty() {
948 self.interface_announcer = None;
949 }
950 }
951
952 Ok(())
953 }
954
955 #[cfg(feature = "iface-tcp")]
956 fn make_tcp_server_discoverable_interface(
957 runtime: &TcpServerDiscoveryRuntimeHandle,
958 ) -> crate::discovery::DiscoverableInterface {
959 crate::discovery::DiscoverableInterface {
960 interface_name: runtime.interface_name.clone(),
961 config: runtime.current.config.clone(),
962 transport_enabled: runtime.current.transport_enabled,
963 ifac_netname: runtime.current.ifac_netname.clone(),
964 ifac_netkey: runtime.current.ifac_netkey.clone(),
965 }
966 }
967
968 #[cfg(feature = "iface-tcp")]
969 fn sync_tcp_server_discovery_runtime(
970 &mut self,
971 interface_name: &str,
972 ) -> Result<(), RuntimeConfigError> {
973 let handle = self
974 .tcp_server_discovery_runtime
975 .get(interface_name)
976 .ok_or(RuntimeConfigError {
977 code: RuntimeConfigErrorCode::NotFound,
978 message: format!("tcp server interface '{}' not found", interface_name),
979 })?
980 .clone();
981
982 if handle.current.discoverable {
983 let iface = Self::make_tcp_server_discoverable_interface(&handle);
984 if let Some(announcer) = self.interface_announcer.as_mut() {
985 announcer.upsert_interface(iface);
986 } else if let Some(identity) = self.transport_identity.as_ref() {
987 self.interface_announcer = Some(crate::discovery::InterfaceAnnouncer::new(
988 *identity.hash(),
989 vec![iface],
990 ));
991 }
992 } else if let Some(announcer) = self.interface_announcer.as_mut() {
993 announcer.remove_interface(interface_name);
994 if announcer.is_empty() {
995 self.interface_announcer = None;
996 }
997 }
998
999 Ok(())
1000 }
1001
1002 #[cfg(feature = "hooks")]
1003 fn update_hook_program<F>(
1004 &mut self,
1005 name: &str,
1006 attach_point: &str,
1007 mut update: F,
1008 ) -> Result<(), String>
1009 where
1010 F: FnMut(&mut rns_hooks::LoadedProgram),
1011 {
1012 let point_idx = crate::config::parse_hook_point(attach_point)
1013 .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1014 let program = self.hook_slots[point_idx]
1015 .programs
1016 .iter_mut()
1017 .find(|program| program.name == name)
1018 .ok_or_else(|| format!("hook '{}' not found at point '{}'", name, attach_point))?;
1019 update(program);
1020 Ok(())
1021 }
1022
1023 pub(crate) fn set_tick_interval_handle(&mut self, tick_interval_ms: Arc<AtomicU64>) {
1024 self.tick_interval_ms = tick_interval_ms;
1025 }
1026
1027 pub(crate) fn set_packet_hashlist_max_entries(&mut self, max_entries: usize) {
1028 self.engine.set_packet_hashlist_max_entries(max_entries);
1029 }
1030
1031 #[cfg(feature = "hooks")]
1032 fn forward_hook_side_effects(&mut self, attach_point: &str, exec: &rns_hooks::ExecuteResult) {
1033 if !exec.injected_actions.is_empty() {
1034 self.dispatch_all(convert_injected_actions(exec.injected_actions.clone()));
1035 }
1036 if let Some(ref bridge) = self.provider_bridge {
1037 for event in &exec.provider_events {
1038 bridge.emit_event(
1039 attach_point,
1040 event.hook_name.clone(),
1041 event.payload_type.clone(),
1042 event.payload.clone(),
1043 );
1044 }
1045 }
1046 }
1047
1048 #[cfg(feature = "hooks")]
1049 fn collect_hook_side_effects(
1050 &mut self,
1051 attach_point: &str,
1052 exec: &rns_hooks::ExecuteResult,
1053 out: &mut Vec<TransportAction>,
1054 ) {
1055 if !exec.injected_actions.is_empty() {
1056 out.extend(convert_injected_actions(exec.injected_actions.clone()));
1057 }
1058 if let Some(ref bridge) = self.provider_bridge {
1059 for event in &exec.provider_events {
1060 bridge.emit_event(
1061 attach_point,
1062 event.hook_name.clone(),
1063 event.payload_type.clone(),
1064 event.payload.clone(),
1065 );
1066 }
1067 }
1068 }
1069
1070 pub fn set_probe_config(
1072 &mut self,
1073 addrs: Vec<std::net::SocketAddr>,
1074 protocol: rns_core::holepunch::ProbeProtocol,
1075 device: Option<String>,
1076 ) {
1077 self.holepunch_manager = HolePunchManager::new(addrs, protocol, device);
1078 }
1079}