1use std::collections::HashMap;
4use std::io;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant};
8
9use rns_core::packet::RawPacket;
10use rns_core::transport::announce_verify_queue::{AnnounceVerifyQueue, OverflowPolicy};
11use rns_core::transport::tables::PathEntry;
12use rns_core::transport::types::{InterfaceId, TransportAction, TransportConfig};
13use rns_core::transport::TransportEngine;
14use rns_crypto::{OsRng, Rng};
15
16#[cfg(feature = "hooks")]
17use crate::provider_bridge::ProviderBridge;
18#[cfg(feature = "hooks")]
19use rns_hooks::{create_hook_slots, EngineAccess, HookContext, HookManager, HookPoint, HookSlot};
20
21#[cfg(feature = "hooks")]
22use crate::event::BackbonePeerHookEvent;
23use crate::event::{
24 BackbonePeerPoolMemberStatus, BackbonePeerPoolStatus, BackbonePeerStateEntry, BlackholeInfo,
25 DrainStatus, Event, EventReceiver, InterfaceStatsResponse, KnownDestinationEntry,
26 LifecycleState, LocalDestinationEntry, NextHopResponse, PathTableEntry, QueryRequest,
27 QueryResponse, RateTableEntry, RuntimeConfigApplyMode, RuntimeConfigEntry, RuntimeConfigError,
28 RuntimeConfigErrorCode, RuntimeConfigSource, RuntimeConfigValue, SingleInterfaceStat,
29};
30use crate::holepunch::orchestrator::{HolePunchManager, HolePunchManagerAction};
31use crate::ifac;
32#[cfg(all(feature = "iface-auto", test))]
33use crate::interface::auto::AutoRuntime;
34#[cfg(feature = "iface-auto")]
35use crate::interface::auto::AutoRuntimeConfigHandle;
36#[cfg(feature = "iface-backbone")]
37use crate::interface::backbone::{
38 start_client, BackboneClientConfig, BackboneClientRuntime, BackboneClientRuntimeConfigHandle,
39 BackbonePeerStateHandle, BackboneRuntimeConfigHandle,
40};
41#[cfg(all(feature = "iface-backbone", target_os = "linux", test))]
42use crate::interface::backbone::{BackboneAbuseConfig, BackboneServerRuntime};
43#[cfg(all(feature = "iface-i2p", test))]
44use crate::interface::i2p::I2pRuntime;
45#[cfg(feature = "iface-i2p")]
46use crate::interface::i2p::I2pRuntimeConfigHandle;
47#[cfg(all(feature = "iface-pipe", test))]
48use crate::interface::pipe::PipeRuntime;
49#[cfg(feature = "iface-pipe")]
50use crate::interface::pipe::PipeRuntimeConfigHandle;
51#[cfg(feature = "iface-rnode")]
52use crate::interface::rnode::{
53 validate_sub_config, RNodeRuntime, RNodeRuntimeConfigHandle, RNodeSubConfig,
54};
55#[cfg(feature = "iface-tcp")]
56use crate::interface::tcp::TcpClientRuntimeConfigHandle;
57#[cfg(all(feature = "iface-tcp", test))]
58use crate::interface::tcp_server::TcpServerRuntime;
59#[cfg(feature = "iface-tcp")]
60use crate::interface::tcp_server::TcpServerRuntimeConfigHandle;
61#[cfg(all(feature = "iface-udp", test))]
62use crate::interface::udp::UdpRuntime;
63#[cfg(feature = "iface-udp")]
64use crate::interface::udp::UdpRuntimeConfigHandle;
65use crate::interface::{InterfaceEntry, InterfaceStats};
66use crate::link_manager::{LinkManager, LinkManagerAction};
67use crate::time;
68
/// Default time-to-live for known-destination entries: 48 hours (presumably in seconds,
/// matching `DEFAULT_RATE_LIMITER_TTL_SECS` — confirm against the cleanup code).
const DEFAULT_KNOWN_DESTINATIONS_TTL: f64 = 48.0 * 3600.0;
/// Default upper bound on the number of known-destination entries.
const DEFAULT_KNOWN_DESTINATIONS_MAX_ENTRIES: usize = 8 * 1024;
/// Default time-to-live for rate limiter state, in seconds (48 hours).
const DEFAULT_RATE_LIMITER_TTL_SECS: f64 = 48.0 * 3600.0;
/// Default driver tick interval, in milliseconds.
const DEFAULT_TICK_INTERVAL_MS: u64 = 1_000;
/// Default number of ticks between known-destination cleanup passes.
const DEFAULT_KNOWN_DESTINATIONS_CLEANUP_INTERVAL_TICKS: u32 = 3_600;
/// Default number of ticks between announce-cache cleanup passes.
const DEFAULT_ANNOUNCE_CACHE_CLEANUP_INTERVAL_TICKS: u32 = 3_600;
/// Default batch size used by announce-cache cleanup.
const DEFAULT_ANNOUNCE_CACHE_CLEANUP_BATCH_SIZE: usize = 10_000;
/// Default number of ticks between discovery cleanup passes.
const DEFAULT_DISCOVERY_CLEANUP_INTERVAL_TICKS: u32 = 3_600;
/// Default interval between management announces, in seconds.
const DEFAULT_MANAGEMENT_ANNOUNCE_INTERVAL_SECS: f64 = 300.0;
/// Default flush duration associated with link teardown.
const DEFAULT_LINK_TEARDOWN_FLUSH: Duration = Duration::from_millis(150);
/// Lower bound of the send-retry backoff.
const SEND_RETRY_BACKOFF_MIN: Duration = Duration::from_millis(25);
/// Upper bound of the send-retry backoff.
const SEND_RETRY_BACKOFF_MAX: Duration = Duration::from_secs(1);
81
/// Fallback announce rate-limiting parameters.
///
/// `Driver::apply_announce_rate_defaults` copies these onto interfaces that have no
/// explicit announce rate target, provided transport is enabled.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct AnnounceRateDefaults {
    /// Default rate target; `None` means interfaces receive no default target at all.
    pub target: Option<f64>,
    /// Default rate penalty, used when an interface's own penalty is `0.0`.
    pub penalty: f64,
    /// Default grace count, used when an interface's own grace is `0`.
    pub grace: u32,
}

impl Default for AnnounceRateDefaults {
    /// Target of `3600.0`, no penalty, grace of `5`.
    fn default() -> Self {
        AnnounceRateDefaults {
            grace: 5,
            penalty: 0.0,
            target: Some(3600.0),
        }
    }
}
99
100mod dispatch;
101mod events;
102mod lifecycle;
103mod queries;
104mod runtime_config;
105
106#[cfg(test)]
107mod tests;
108
109fn inject_transport_header(raw: &[u8], next_hop: &[u8; 16]) -> Vec<u8> {
110 if raw.len() < 18 {
111 return raw.to_vec();
112 }
113
114 let new_flags = (rns_core::constants::HEADER_2 << 6)
115 | (rns_core::constants::TRANSPORT_TRANSPORT << 4)
116 | (raw[0] & 0x0F);
117
118 let mut new_raw = Vec::with_capacity(raw.len() + 16);
119 new_raw.push(new_flags);
120 new_raw.push(raw[1]);
121 new_raw.extend_from_slice(next_hop);
122 new_raw.extend_from_slice(&raw[2..]);
123 new_raw
124}
125
126fn recover_mutex_guard<'a, T>(mutex: &'a Mutex<T>, label: &str) -> std::sync::MutexGuard<'a, T> {
127 match mutex.lock() {
128 Ok(guard) => guard,
129 Err(poisoned) => {
130 log::error!("recovering from poisoned mutex: {}", label);
131 poisoned.into_inner()
132 }
133 }
134}
135
136#[derive(Debug, Clone, Copy)]
137pub(crate) struct RuntimeConfigDefaults {
138 pub(crate) tick_interval_ms: u64,
139 pub(crate) known_destinations_ttl: f64,
140 pub(crate) rate_limiter_ttl_secs: f64,
141 pub(crate) known_destinations_cleanup_interval_ticks: u32,
142 pub(crate) announce_cache_cleanup_interval_ticks: u32,
143 pub(crate) announce_cache_cleanup_batch_size: usize,
144 pub(crate) discovery_cleanup_interval_ticks: u32,
145 pub(crate) management_announce_interval_secs: f64,
146 pub(crate) direct_connect_policy: crate::event::HolePunchPolicy,
147 #[cfg(feature = "hooks")]
148 pub(crate) provider_queue_max_events: usize,
149 #[cfg(feature = "hooks")]
150 pub(crate) provider_queue_max_bytes: usize,
151}
152
#[cfg(feature = "iface-backbone")]
/// Discovery-related settings of one backbone interface.
#[derive(Clone, Debug)]
pub(crate) struct BackboneDiscoveryRuntime {
    /// Whether the interface is to be advertised as discoverable.
    pub(crate) discoverable: bool,
    /// Discovery configuration of the interface.
    pub(crate) config: crate::discovery::DiscoveryConfig,
    /// Transport-enabled flag copied into the discoverable-interface record.
    pub(crate) transport_enabled: bool,
    /// Optional IFAC network name.
    pub(crate) ifac_netname: Option<String>,
    /// Optional IFAC network key.
    pub(crate) ifac_netkey: Option<String>,
}
162
#[cfg(feature = "iface-backbone")]
/// Current and startup discovery settings of a named backbone interface.
#[derive(Clone, Debug)]
pub(crate) struct BackboneDiscoveryRuntimeHandle {
    /// Name of the interface these settings belong to.
    pub(crate) interface_name: String,
    /// Settings currently in effect.
    pub(crate) current: BackboneDiscoveryRuntime,
    /// Settings captured at construction (presumably kept so runtime changes can be
    /// reset — confirm against the runtime-config code).
    pub(crate) startup: BackboneDiscoveryRuntime,
}
170
#[cfg(feature = "iface-tcp")]
/// Discovery-related settings of one TCP server interface.
#[derive(Clone, Debug)]
pub(crate) struct TcpServerDiscoveryRuntime {
    /// Whether the interface is to be advertised as discoverable.
    pub(crate) discoverable: bool,
    /// Discovery configuration of the interface.
    pub(crate) config: crate::discovery::DiscoveryConfig,
    /// Transport-enabled flag associated with the interface.
    pub(crate) transport_enabled: bool,
    /// Optional IFAC network name.
    pub(crate) ifac_netname: Option<String>,
    /// Optional IFAC network key.
    pub(crate) ifac_netkey: Option<String>,
}
180
#[cfg(feature = "iface-tcp")]
/// Current and startup discovery settings of a named TCP server interface.
#[derive(Clone, Debug)]
pub(crate) struct TcpServerDiscoveryRuntimeHandle {
    /// Name of the interface these settings belong to.
    pub(crate) interface_name: String,
    /// Settings currently in effect.
    pub(crate) current: TcpServerDiscoveryRuntime,
    /// Settings captured at construction (presumably kept so runtime changes can be
    /// reset — confirm against the runtime-config code).
    pub(crate) startup: TcpServerDiscoveryRuntime,
}
188
/// IFAC (interface access code) parameters tracked per interface at runtime.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IfacRuntimeConfig {
    /// Optional IFAC network name.
    pub(crate) netname: Option<String>,
    /// Optional IFAC network key.
    pub(crate) netkey: Option<String>,
    /// IFAC size (presumably in bytes — confirm against `crate::ifac`).
    pub(crate) size: usize,
}
195
#[cfg(feature = "iface-backbone")]
impl BackboneDiscoveryRuntimeHandle {
    /// Builds a handle whose `current` and `startup` settings are identical.
    ///
    /// Both are assembled from the given discovery config, transport flag, IFAC
    /// credentials and discoverability flag.
    pub(crate) fn from_parts(
        interface_name: String,
        startup_config: crate::discovery::DiscoveryConfig,
        transport_enabled: bool,
        ifac_netname: Option<String>,
        ifac_netkey: Option<String>,
        discoverable: bool,
    ) -> Self {
        let initial = BackboneDiscoveryRuntime {
            discoverable,
            config: startup_config,
            transport_enabled,
            ifac_netname,
            ifac_netkey,
        };
        let current = initial.clone();
        BackboneDiscoveryRuntimeHandle {
            interface_name,
            current,
            startup: initial,
        }
    }
}
220
#[cfg(feature = "iface-tcp")]
impl TcpServerDiscoveryRuntimeHandle {
    /// Builds a handle whose `current` and `startup` settings are identical.
    ///
    /// Both are assembled from the given discovery config, transport flag, IFAC
    /// credentials and discoverability flag.
    pub(crate) fn from_parts(
        interface_name: String,
        startup_config: crate::discovery::DiscoveryConfig,
        transport_enabled: bool,
        ifac_netname: Option<String>,
        ifac_netkey: Option<String>,
        discoverable: bool,
    ) -> Self {
        let initial = TcpServerDiscoveryRuntime {
            discoverable,
            config: startup_config,
            transport_enabled,
            ifac_netname,
            ifac_netkey,
        };
        let current = initial.clone();
        TcpServerDiscoveryRuntimeHandle {
            interface_name,
            current,
            startup: initial,
        }
    }
}
245
246impl IfacRuntimeConfig {
247 pub(crate) fn from_parts(netname: Option<String>, netkey: Option<String>, size: usize) -> Self {
248 Self {
249 netname,
250 netkey,
251 size,
252 }
253 }
254}
255
#[cfg(feature = "iface-backbone")]
/// Tunables of the backbone peer pool.
#[derive(Clone, Debug)]
pub struct BackbonePeerPoolSettings {
    /// Maximum number of simultaneously connected pool members.
    pub max_connected: usize,
    /// Number of failures that triggers the cooldown (presumably counted within
    /// `failure_window` — confirm against the pool logic).
    pub failure_threshold: usize,
    /// Time window over which failures are considered.
    pub failure_window: Duration,
    /// Duration of a cooldown.
    pub cooldown: Duration,
}
264
#[cfg(feature = "iface-backbone")]
/// Default priority given to configured peer-pool candidates.
pub(crate) const BACKBONE_PEER_POOL_CONFIGURED_DEFAULT_PRIORITY: u8 = 60;
#[cfg(feature = "iface-backbone")]
/// Priority given to discovered peer-pool candidates (lower than the configured default).
pub(crate) const BACKBONE_PEER_POOL_DISCOVERED_PRIORITY: u8 = 40;
269
#[cfg(feature = "iface-backbone")]
/// Everything needed to describe one backbone peer-pool candidate.
pub(crate) struct BackbonePeerPoolCandidateConfig {
    /// Backbone client configuration for this candidate.
    pub(crate) client: BackboneClientConfig,
    /// Interface mode value.
    pub(crate) mode: u8,
    /// Ingress control configuration for the candidate's interface.
    pub(crate) ingress_control: rns_core::transport::types::IngressControlConfig,
    /// IFAC parameters for the candidate's interface.
    pub(crate) ifac_runtime: IfacRuntimeConfig,
    /// Whether IFAC is enabled for the candidate's interface.
    pub(crate) ifac_enabled: bool,
    /// Interface type name reported for the candidate.
    pub(crate) interface_type_name: String,
    /// Whether the candidate was configured or discovered.
    pub(crate) source: BackbonePeerPoolCandidateSource,
    /// Candidate priority.
    pub(crate) priority: u8,
    /// Discovery details; presumably only set for discovered candidates — confirm.
    pub(crate) discovery: Option<BackbonePeerPoolDiscoveryCandidate>,
}
282
#[cfg(feature = "iface-backbone")]
/// Origin of a backbone peer-pool candidate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum BackbonePeerPoolCandidateSource {
    /// The candidate came from configuration.
    Configured,
    /// The candidate came from interface discovery.
    Discovered,
}
289
#[cfg(feature = "iface-backbone")]
impl BackbonePeerPoolCandidateSource {
    /// Returns the lowercase label of the source: `"configured"` or `"discovered"`.
    fn as_str(self) -> &'static str {
        match self {
            BackbonePeerPoolCandidateSource::Discovered => "discovered",
            BackbonePeerPoolCandidateSource::Configured => "configured",
        }
    }
}
299
#[cfg(feature = "iface-backbone")]
/// Discovery metadata attached to a discovered peer-pool candidate.
#[derive(Clone, Debug)]
pub(crate) struct BackbonePeerPoolDiscoveryCandidate {
    /// 32-byte discovery hash identifying the discovered interface.
    pub(crate) discovery_hash: [u8; 32],
    /// Discovery status of the interface.
    pub(crate) status: crate::discovery::DiscoveredStatus,
    /// Hop count reported for the discovered interface.
    pub(crate) hops: u8,
    /// Stamp value of the discovery record.
    pub(crate) stamp_value: u32,
    /// Timestamp at which the interface was last heard.
    pub(crate) last_heard: f64,
}
309
#[cfg(feature = "iface-backbone")]
/// Backbone peer pool: its settings plus the candidates it manages.
struct BackbonePeerPool {
    /// Pool-wide tunables.
    settings: BackbonePeerPoolSettings,
    /// Candidates known to the pool.
    candidates: Vec<BackbonePeerPoolCandidate>,
}
315
#[cfg(feature = "iface-backbone")]
/// Runtime state of one peer-pool candidate.
struct BackbonePeerPoolCandidate {
    /// Static description of the candidate.
    config: BackbonePeerPoolCandidateConfig,
    /// Interface id of the candidate's active interface, if any.
    active_id: Option<InterfaceId>,
    /// Recorded failures (presumably timestamps — confirm against the pool logic).
    failures: Vec<f64>,
    /// Earliest time at which a retry may happen, if one is scheduled.
    retry_after: Option<f64>,
    /// End of the current cooldown, if the candidate is cooling down.
    cooldown_until: Option<f64>,
    /// Most recent error message, if any.
    last_error: Option<String>,
}
325
#[cfg(feature = "hooks")]
/// Borrowed, read-only view of driver state exposed to hooks through `EngineAccess`.
struct EngineRef<'a> {
    /// Transport engine answering path, next-hop, blackhole and identity queries.
    engine: &'a TransportEngine,
    /// Interface table used for per-interface lookups.
    interfaces: &'a HashMap<InterfaceId, InterfaceEntry>,
    /// Link manager used for link state lookups.
    link_manager: &'a LinkManager,
    /// Timestamp passed to time-dependent engine queries (blackhole checks).
    now: f64,
}
334
#[cfg(feature = "hooks")]
impl<'a> EngineAccess for EngineRef<'a> {
    /// Whether the engine has a path to `dest`.
    fn has_path(&self, dest: &[u8; 16]) -> bool {
        self.engine.has_path(dest)
    }

    /// Hop count to `dest` as reported by the engine.
    fn hops_to(&self, dest: &[u8; 16]) -> Option<u8> {
        self.engine.hops_to(dest)
    }

    /// Next hop towards `dest` as reported by the engine.
    fn next_hop(&self, dest: &[u8; 16]) -> Option<[u8; 16]> {
        self.engine.next_hop(dest)
    }

    /// Whether `identity` is blackholed at the view's timestamp.
    fn is_blackholed(&self, identity: &[u8; 16]) -> bool {
        self.engine.is_blackholed(identity, self.now)
    }

    /// Name of the interface with the given raw id, if it is registered.
    fn interface_name(&self, id: u64) -> Option<String> {
        let entry = self.interfaces.get(&InterfaceId(id))?;
        Some(entry.info.name.clone())
    }

    /// Mode of the interface with the given raw id, if it is registered.
    fn interface_mode(&self, id: u64) -> Option<u8> {
        let entry = self.interfaces.get(&InterfaceId(id))?;
        Some(entry.info.mode)
    }

    /// Copy of the engine's identity hash, if it has one.
    fn identity_hash(&self) -> Option<[u8; 16]> {
        self.engine.identity_hash().copied()
    }

    /// Outgoing announce frequency of the interface, multiplied by 1000 and cast to `i32`.
    fn announce_rate(&self, id: u64) -> Option<i32> {
        let entry = self.interfaces.get(&InterfaceId(id))?;
        let scaled = entry.stats.outgoing_announce_freq() * 1000.0;
        Some(scaled as i32)
    }

    /// Numeric link state: 0 = pending, 1 = handshake, 2 = active, 3 = stale, 4 = closed.
    fn link_state(&self, link_hash: &[u8; 16]) -> Option<u8> {
        use rns_core::link::types::LinkState;

        let state = self.link_manager.link_state(link_hash)?;
        let code = match state {
            LinkState::Pending => 0,
            LinkState::Handshake => 1,
            LinkState::Active => 2,
            LinkState::Stale => 3,
            LinkState::Closed => 4,
        };
        Some(code)
    }
}
376
#[cfg(any(test, feature = "hooks"))]
/// Extracts the 16-byte destination hash from a raw packet.
///
/// Bit `0x40` of the first byte selects the header layout: when set (header type 2) the
/// hash is read from offset 18, otherwise from offset 2. An empty packet, or one too
/// short to contain the hash, yields all zeroes.
fn extract_dest_hash(raw: &[u8]) -> [u8; 16] {
    const HASH_LEN: usize = 16;

    let offset = match raw.first() {
        None => return [0u8; HASH_LEN],
        Some(flags) if (flags & 0x40) != 0 => 18,
        Some(_) => 2,
    };

    let mut hash = [0u8; HASH_LEN];
    if let Some(bytes) = raw.get(offset..offset + HASH_LEN) {
        hash.copy_from_slice(bytes);
    }
    hash
}
395
#[cfg(feature = "hooks")]
/// Runs a hook program chain, if there is anything to run.
///
/// Returns `None` without calling into the manager when `programs` is empty or no hook
/// manager is available; otherwise returns whatever
/// `run_chain_with_provider_events` produces.
fn run_hook_inner(
    programs: &mut [rns_hooks::LoadedProgram],
    hook_manager: &Option<HookManager>,
    engine_access: &dyn EngineAccess,
    ctx: &HookContext,
    now: f64,
    provider_events_enabled: bool,
) -> Option<rns_hooks::ExecuteResult> {
    match hook_manager {
        Some(manager) if !programs.is_empty() => manager.run_chain_with_provider_events(
            programs,
            ctx,
            engine_access,
            now,
            provider_events_enabled,
        ),
        _ => None,
    }
}
412
#[cfg(feature = "hooks")]
/// Builds the `HookContext::BackbonePeer` view of a backbone peer event.
///
/// Interface ids are unwrapped to their raw numeric form; all other fields are copied
/// through unchanged.
fn backbone_peer_hook_context(event: &BackbonePeerHookEvent) -> HookContext<'_> {
    let server_interface_id = event.server_interface_id.0;
    let peer_interface_id = event.peer_interface_id.map(|id| id.0);

    HookContext::BackbonePeer {
        server_interface_id,
        peer_interface_id,
        peer_ip: event.peer_ip,
        peer_port: event.peer_port,
        connected_for: event.connected_for,
        had_received_data: event.had_received_data,
        penalty_level: event.penalty_level,
        blacklist_for: event.blacklist_for,
    }
}
426
#[cfg(feature = "hooks")]
/// Converts hook-injected wire actions into transport actions, preserving their order.
///
/// Raw interface ids become `InterfaceId`s, `has_exclude`/`exclude` pairs become
/// `Option<InterfaceId>` (present when the flag is non-zero), and `to_local` becomes a
/// `bool`. `AnnounceReceived` is rebuilt with `ratchet: None` and default receive
/// metadata, neither of which the wire form carries.
fn convert_injected_actions(actions: Vec<rns_hooks::ActionWire>) -> Vec<TransportAction> {
    /// Maps a single wire action onto its transport equivalent.
    fn to_transport_action(action: rns_hooks::ActionWire) -> TransportAction {
        use rns_hooks::ActionWire;

        match action {
            ActionWire::SendOnInterface { interface, raw } => TransportAction::SendOnInterface {
                interface: InterfaceId(interface),
                raw: raw.into(),
            },
            ActionWire::BroadcastOnAllInterfaces {
                raw,
                exclude,
                has_exclude,
            } => TransportAction::BroadcastOnAllInterfaces {
                raw: raw.into(),
                exclude: (has_exclude != 0).then_some(InterfaceId(exclude)),
            },
            ActionWire::DeliverLocal {
                destination_hash,
                raw,
                packet_hash,
                receiving_interface,
            } => TransportAction::DeliverLocal {
                destination_hash,
                raw: raw.into(),
                packet_hash,
                receiving_interface: InterfaceId(receiving_interface),
            },
            ActionWire::PathUpdated {
                destination_hash,
                hops,
                next_hop,
                interface,
            } => TransportAction::PathUpdated {
                destination_hash,
                hops,
                next_hop,
                interface: InterfaceId(interface),
            },
            ActionWire::CacheAnnounce { packet_hash, raw } => TransportAction::CacheAnnounce {
                packet_hash,
                raw: raw.into(),
            },
            ActionWire::TunnelEstablished {
                tunnel_id,
                interface,
            } => TransportAction::TunnelEstablished {
                tunnel_id,
                interface: InterfaceId(interface),
            },
            ActionWire::TunnelSynthesize {
                interface,
                data,
                dest_hash,
            } => TransportAction::TunnelSynthesize {
                interface: InterfaceId(interface),
                data,
                dest_hash,
            },
            ActionWire::ForwardToLocalClients {
                raw,
                exclude,
                has_exclude,
            } => TransportAction::ForwardToLocalClients {
                raw: raw.into(),
                exclude: (has_exclude != 0).then_some(InterfaceId(exclude)),
            },
            ActionWire::ForwardPlainBroadcast {
                raw,
                to_local,
                exclude,
                has_exclude,
            } => TransportAction::ForwardPlainBroadcast {
                raw: raw.into(),
                to_local: to_local != 0,
                exclude: (has_exclude != 0).then_some(InterfaceId(exclude)),
            },
            ActionWire::AnnounceReceived {
                destination_hash,
                identity_hash,
                public_key,
                name_hash,
                random_hash,
                app_data,
                hops,
                receiving_interface,
            } => TransportAction::AnnounceReceived {
                destination_hash,
                identity_hash,
                public_key,
                name_hash,
                random_hash,
                ratchet: None,
                app_data,
                hops,
                receiving_interface: InterfaceId(receiving_interface),
                rx: rns_core::transport::RxMetadata::default(),
            },
        }
    }

    actions.into_iter().map(to_transport_action).collect()
}
546
/// Infers an interface type name from the prefix of an interface name.
///
/// Prefixes are checked in order:
/// * `TCPServerInterface` -> `TCPServerClientInterface`
/// * `BackboneInterface`  -> `BackboneInterface`
/// * `LocalInterface`     -> `LocalServerClientInterface`
///
/// Any other name (including the empty string) is reported as `AutoInterface`.
fn infer_interface_type(name: &str) -> String {
    const FALLBACK_TYPE: &str = "AutoInterface";
    const PREFIX_TO_TYPE: [(&str, &str); 3] = [
        ("TCPServerInterface", "TCPServerClientInterface"),
        ("BackboneInterface", "BackboneInterface"),
        ("LocalInterface", "LocalServerClientInterface"),
    ];

    PREFIX_TO_TYPE
        .iter()
        .find(|(prefix, _)| name.starts_with(*prefix))
        .map_or(FALLBACK_TYPE, |(_, interface_type)| *interface_type)
        .to_string()
}
563
564pub use crate::common::callbacks::Callbacks;
565
/// Data retained for a shared announce (stored per 16-byte hash in
/// `Driver::shared_announces`).
#[derive(Clone)]
struct SharedAnnounceRecord {
    /// 10-byte name hash of the announced destination.
    name_hash: [u8; 10],
    /// 64-byte private key of the announcing identity. Sensitive: the struct
    /// deliberately derives no `Debug`, so avoid logging this field.
    identity_prv_key: [u8; 64],
    /// Optional application data carried with the announce.
    app_data: Option<Vec<u8>>,
}
572
573#[derive(Debug, Clone)]
574pub(crate) struct KnownDestinationState {
575 announced: crate::destination::AnnouncedIdentity,
576 was_used: bool,
577 last_used_at: Option<f64>,
578 retained: bool,
579}
580
581pub struct Driver {
583 pub(crate) engine: TransportEngine,
584 pub(crate) interfaces: HashMap<InterfaceId, InterfaceEntry>,
585 pub(crate) rng: OsRng,
586 pub(crate) rx: EventReceiver,
587 pub(crate) callbacks: Box<dyn Callbacks>,
588 pub(crate) started: f64,
589 pub(crate) lifecycle_state: LifecycleState,
590 pub(crate) drain_started_at: Option<Instant>,
591 pub(crate) drain_deadline: Option<Instant>,
592 pub(crate) listener_controls: Vec<crate::interface::ListenerControl>,
593 pub(crate) announce_cache: Option<crate::announce_cache::AnnounceCache>,
594 pub(crate) tunnel_synth_dest: [u8; 16],
596 pub(crate) transport_identity: Option<rns_crypto::identity::Identity>,
598 pub(crate) link_manager: LinkManager,
600 pub(crate) management_config: crate::management::ManagementConfig,
602 pub(crate) last_management_announce: f64,
604 pub(crate) initial_announce_sent: bool,
606 pub(crate) known_destinations: HashMap<[u8; 16], KnownDestinationState>,
608 pub(crate) ratchet_store: Option<Arc<dyn crate::storage::RatchetStore>>,
610 pub(crate) known_destinations_ttl: f64,
612 pub(crate) known_destinations_max_entries: usize,
614 pub(crate) rate_limiter_ttl_secs: f64,
616 pub(crate) path_request_dest: [u8; 16],
618 pub(crate) proof_strategies: HashMap<
621 [u8; 16],
622 (
623 rns_core::types::ProofStrategy,
624 Option<rns_crypto::identity::Identity>,
625 ),
626 >,
627 pub(crate) sent_packets: HashMap<[u8; 32], ([u8; 16], f64)>,
629 pub(crate) completed_proofs: HashMap<[u8; 32], (f64, f64)>,
631 pub(crate) local_destinations: HashMap<[u8; 16], u8>,
633 shared_announces: HashMap<[u8; 16], SharedAnnounceRecord>,
635 shared_reconnect_pending: HashMap<InterfaceId, bool>,
637 pub(crate) holepunch_manager: HolePunchManager,
639 pub(crate) event_tx: crate::event::EventSender,
641 pub(crate) interface_writer_queue_capacity: usize,
643 pub(crate) announce_rate_defaults: AnnounceRateDefaults,
645 pub(crate) ingress_control_defaults: rns_core::transport::types::IngressControlConfig,
647 pub(crate) tick_interval_ms: Arc<AtomicU64>,
649 #[cfg(feature = "iface-backbone")]
651 pub(crate) backbone_runtime: HashMap<String, BackboneRuntimeConfigHandle>,
652 #[cfg(feature = "iface-backbone")]
654 pub(crate) backbone_peer_state: HashMap<String, BackbonePeerStateHandle>,
655 #[cfg(feature = "iface-backbone")]
657 pub(crate) backbone_client_runtime: HashMap<String, BackboneClientRuntimeConfigHandle>,
658 #[cfg(feature = "iface-backbone")]
660 pub(crate) backbone_discovery_runtime: HashMap<String, BackboneDiscoveryRuntimeHandle>,
661 #[cfg(feature = "iface-backbone")]
663 backbone_peer_pool: Option<BackbonePeerPool>,
664 #[cfg(feature = "iface-backbone")]
666 pub(crate) next_dynamic_interface_id: Arc<AtomicU64>,
667 #[cfg(feature = "iface-tcp")]
669 pub(crate) tcp_server_runtime: HashMap<String, TcpServerRuntimeConfigHandle>,
670 #[cfg(feature = "iface-tcp")]
672 pub(crate) tcp_client_runtime: HashMap<String, TcpClientRuntimeConfigHandle>,
673 #[cfg(feature = "iface-tcp")]
675 pub(crate) tcp_server_discovery_runtime: HashMap<String, TcpServerDiscoveryRuntimeHandle>,
676 #[cfg(feature = "iface-udp")]
678 pub(crate) udp_runtime: HashMap<String, UdpRuntimeConfigHandle>,
679 #[cfg(feature = "iface-auto")]
681 pub(crate) auto_runtime: HashMap<String, AutoRuntimeConfigHandle>,
682 #[cfg(feature = "iface-i2p")]
684 pub(crate) i2p_runtime: HashMap<String, I2pRuntimeConfigHandle>,
685 #[cfg(feature = "iface-pipe")]
687 pub(crate) pipe_runtime: HashMap<String, PipeRuntimeConfigHandle>,
688 #[cfg(feature = "iface-rnode")]
690 pub(crate) rnode_runtime: HashMap<String, RNodeRuntimeConfigHandle>,
691 pub(crate) interface_runtime_defaults:
693 HashMap<String, rns_core::transport::types::InterfaceInfo>,
694 pub(crate) interface_ifac_runtime: HashMap<String, IfacRuntimeConfig>,
696 pub(crate) interface_ifac_runtime_defaults: HashMap<String, IfacRuntimeConfig>,
698 pub(crate) discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage,
700 pub(crate) discovery_required_value: u8,
702 pub(crate) discovery_name_hash: [u8; 10],
704 pub(crate) probe_responder_hash: Option<[u8; 16]>,
706 pub(crate) discover_interfaces: bool,
708 pub(crate) interface_announcer: Option<crate::discovery::InterfaceAnnouncer>,
710 pub(crate) announce_verify_queue: Arc<Mutex<AnnounceVerifyQueue>>,
712 pub(crate) async_announce_verification: bool,
714 pub(crate) discovery_cleanup_counter: u32,
716 pub(crate) discovery_cleanup_interval_ticks: u32,
718 pub(crate) memory_stats_counter: u32,
720 pub(crate) cache_cleanup_counter: u32,
722 pub(crate) announce_cache_cleanup_counter: u32,
724 pub(crate) known_destinations_cleanup_interval_ticks: u32,
726 pub(crate) known_destinations_cap_evict_count: usize,
728 pub(crate) announce_cache_cleanup_interval_ticks: u32,
730 pub(crate) cache_cleanup_active_hashes: Option<Vec<[u8; 32]>>,
732 pub(crate) cache_cleanup_entries: Option<std::fs::ReadDir>,
734 pub(crate) cache_cleanup_removed: usize,
736 pub(crate) announce_cache_cleanup_batch_size: usize,
738 pub(crate) management_announce_interval_secs: f64,
740 pub(crate) runtime_config_defaults: RuntimeConfigDefaults,
742 #[cfg(feature = "hooks")]
744 pub(crate) hook_slots: [HookSlot; HookPoint::COUNT],
745 #[cfg(feature = "hooks")]
747 pub(crate) hook_manager: Option<HookManager>,
748 #[cfg(feature = "hooks")]
749 pub(crate) provider_bridge: Option<ProviderBridge>,
750}
751
752impl Driver {
753 pub fn new(
755 config: TransportConfig,
756 rx: EventReceiver,
757 tx: crate::event::EventSender,
758 callbacks: Box<dyn Callbacks>,
759 ) -> Self {
760 let announce_queue_max_entries = config.announce_queue_max_entries;
761 let tunnel_synth_dest = rns_core::destination::destination_hash(
762 "rnstransport",
763 &["tunnel", "synthesize"],
764 None,
765 );
766 let path_request_dest =
767 rns_core::destination::destination_hash("rnstransport", &["path", "request"], None);
768 let discovery_name_hash = crate::discovery::discovery_name_hash();
769 let mut engine = TransportEngine::new(config);
770 engine.register_destination(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
771 engine.register_destination(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
773 let mut local_destinations = HashMap::new();
776 local_destinations.insert(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
777 local_destinations.insert(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
778 let runtime_config_defaults = RuntimeConfigDefaults {
779 tick_interval_ms: DEFAULT_TICK_INTERVAL_MS,
780 known_destinations_ttl: DEFAULT_KNOWN_DESTINATIONS_TTL,
781 rate_limiter_ttl_secs: DEFAULT_RATE_LIMITER_TTL_SECS,
782 known_destinations_cleanup_interval_ticks:
783 DEFAULT_KNOWN_DESTINATIONS_CLEANUP_INTERVAL_TICKS,
784 announce_cache_cleanup_interval_ticks: DEFAULT_ANNOUNCE_CACHE_CLEANUP_INTERVAL_TICKS,
785 announce_cache_cleanup_batch_size: DEFAULT_ANNOUNCE_CACHE_CLEANUP_BATCH_SIZE,
786 discovery_cleanup_interval_ticks: DEFAULT_DISCOVERY_CLEANUP_INTERVAL_TICKS,
787 management_announce_interval_secs: DEFAULT_MANAGEMENT_ANNOUNCE_INTERVAL_SECS,
788 direct_connect_policy: crate::event::HolePunchPolicy::default(),
789 #[cfg(feature = "hooks")]
790 provider_queue_max_events: crate::provider_bridge::ProviderBridgeConfig::default()
791 .queue_max_events,
792 #[cfg(feature = "hooks")]
793 provider_queue_max_bytes: crate::provider_bridge::ProviderBridgeConfig::default()
794 .queue_max_bytes,
795 };
796 Driver {
797 engine,
798 interfaces: HashMap::new(),
799 rng: OsRng,
800 rx,
801 callbacks,
802 started: time::now(),
803 lifecycle_state: LifecycleState::Active,
804 drain_started_at: None,
805 drain_deadline: None,
806 listener_controls: Vec::new(),
807 announce_cache: None,
808 tunnel_synth_dest,
809 transport_identity: None,
810 link_manager: LinkManager::new(),
811 management_config: Default::default(),
812 last_management_announce: 0.0,
813 initial_announce_sent: false,
814 known_destinations: HashMap::new(),
815 ratchet_store: None,
816 known_destinations_ttl: DEFAULT_KNOWN_DESTINATIONS_TTL,
817 known_destinations_max_entries: DEFAULT_KNOWN_DESTINATIONS_MAX_ENTRIES,
818 rate_limiter_ttl_secs: DEFAULT_RATE_LIMITER_TTL_SECS,
819 path_request_dest,
820 proof_strategies: HashMap::new(),
821 sent_packets: HashMap::new(),
822 completed_proofs: HashMap::new(),
823 local_destinations,
824 shared_announces: HashMap::new(),
825 shared_reconnect_pending: HashMap::new(),
826 holepunch_manager: HolePunchManager::new(
827 vec![],
828 rns_core::holepunch::ProbeProtocol::Rnsp,
829 None,
830 ),
831 event_tx: tx,
832 interface_writer_queue_capacity: crate::interface::DEFAULT_ASYNC_WRITER_QUEUE_CAPACITY,
833 announce_rate_defaults: AnnounceRateDefaults::default(),
834 ingress_control_defaults: rns_core::transport::types::IngressControlConfig::enabled(),
835 tick_interval_ms: Arc::new(AtomicU64::new(DEFAULT_TICK_INTERVAL_MS)),
836 #[cfg(feature = "iface-backbone")]
837 backbone_runtime: HashMap::new(),
838 #[cfg(feature = "iface-backbone")]
839 backbone_peer_state: HashMap::new(),
840 #[cfg(feature = "iface-backbone")]
841 backbone_client_runtime: HashMap::new(),
842 #[cfg(feature = "iface-backbone")]
843 backbone_discovery_runtime: HashMap::new(),
844 #[cfg(feature = "iface-backbone")]
845 backbone_peer_pool: None,
846 #[cfg(feature = "iface-backbone")]
847 next_dynamic_interface_id: Arc::new(AtomicU64::new(10000)),
848 #[cfg(feature = "iface-tcp")]
849 tcp_server_runtime: HashMap::new(),
850 #[cfg(feature = "iface-tcp")]
851 tcp_client_runtime: HashMap::new(),
852 #[cfg(feature = "iface-tcp")]
853 tcp_server_discovery_runtime: HashMap::new(),
854 #[cfg(feature = "iface-udp")]
855 udp_runtime: HashMap::new(),
856 #[cfg(feature = "iface-auto")]
857 auto_runtime: HashMap::new(),
858 #[cfg(feature = "iface-i2p")]
859 i2p_runtime: HashMap::new(),
860 #[cfg(feature = "iface-pipe")]
861 pipe_runtime: HashMap::new(),
862 #[cfg(feature = "iface-rnode")]
863 rnode_runtime: HashMap::new(),
864 interface_runtime_defaults: HashMap::new(),
865 interface_ifac_runtime: HashMap::new(),
866 interface_ifac_runtime_defaults: HashMap::new(),
867 discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage::new(
868 std::env::temp_dir().join("rns-discovered-interfaces"),
869 ),
870 discovery_required_value: crate::discovery::DEFAULT_STAMP_VALUE,
871 discovery_name_hash,
872 probe_responder_hash: None,
873 discover_interfaces: false,
874 interface_announcer: None,
875 announce_verify_queue: Arc::new(Mutex::new(AnnounceVerifyQueue::new(
876 announce_queue_max_entries,
877 ))),
878 async_announce_verification: false,
879 discovery_cleanup_counter: 0,
880 discovery_cleanup_interval_ticks: runtime_config_defaults
881 .discovery_cleanup_interval_ticks,
882 memory_stats_counter: 0,
883 cache_cleanup_counter: 0,
884 announce_cache_cleanup_counter: 0,
885 known_destinations_cleanup_interval_ticks: runtime_config_defaults
886 .known_destinations_cleanup_interval_ticks,
887 known_destinations_cap_evict_count: 0,
888 announce_cache_cleanup_interval_ticks: runtime_config_defaults
889 .announce_cache_cleanup_interval_ticks,
890 cache_cleanup_active_hashes: None,
891 cache_cleanup_entries: None,
892 cache_cleanup_removed: 0,
893 announce_cache_cleanup_batch_size: runtime_config_defaults
894 .announce_cache_cleanup_batch_size,
895 management_announce_interval_secs: runtime_config_defaults
896 .management_announce_interval_secs,
897 runtime_config_defaults,
898 #[cfg(feature = "hooks")]
899 hook_slots: create_hook_slots(),
900 #[cfg(feature = "hooks")]
901 hook_manager: HookManager::new().ok(),
902 #[cfg(feature = "hooks")]
903 provider_bridge: None,
904 }
905 }
906
907 pub fn set_announce_verify_queue_config(
908 &mut self,
909 max_entries: usize,
910 max_bytes: usize,
911 max_stale_secs: f64,
912 overflow_policy: OverflowPolicy,
913 ) {
914 self.announce_verify_queue = Arc::new(Mutex::new(AnnounceVerifyQueue::with_limits(
915 max_entries,
916 max_bytes,
917 max_stale_secs,
918 overflow_policy,
919 )));
920 }
921
922 pub fn set_announce_rate_defaults(&mut self, defaults: AnnounceRateDefaults) {
923 self.announce_rate_defaults = defaults;
924 }
925
926 pub fn set_ingress_control_defaults(
927 &mut self,
928 defaults: rns_core::transport::types::IngressControlConfig,
929 ) {
930 self.ingress_control_defaults = defaults;
931 }
932
933 pub(crate) fn apply_announce_rate_defaults(
934 &self,
935 info: &mut rns_core::transport::types::InterfaceInfo,
936 ) {
937 if !self.engine.transport_enabled() || info.announce_rate_target.is_some() {
938 return;
939 }
940
941 let Some(target) = self.announce_rate_defaults.target else {
942 return;
943 };
944
945 info.announce_rate_target = Some(target);
946 if info.announce_rate_grace == 0 {
947 info.announce_rate_grace = self.announce_rate_defaults.grace;
948 }
949 if info.announce_rate_penalty == 0.0 {
950 info.announce_rate_penalty = self.announce_rate_defaults.penalty;
951 }
952 }
953
954 pub(crate) fn apply_ingress_control_defaults(
955 &self,
956 info: &mut rns_core::transport::types::InterfaceInfo,
957 ) {
958 if !self.engine.transport_enabled() {
959 return;
960 }
961
962 let baseline = if info.ingress_control.enabled {
963 rns_core::transport::types::IngressControlConfig::enabled()
964 } else {
965 rns_core::transport::types::IngressControlConfig::disabled()
966 };
967 let defaults = self.ingress_control_defaults;
968 let ic = &mut info.ingress_control;
969
970 if ic.max_held_announces == baseline.max_held_announces {
971 ic.max_held_announces = defaults.max_held_announces;
972 }
973 if ic.burst_hold == baseline.burst_hold {
974 ic.burst_hold = defaults.burst_hold;
975 }
976 if ic.burst_freq_new == baseline.burst_freq_new {
977 ic.burst_freq_new = defaults.burst_freq_new;
978 }
979 if ic.burst_freq == baseline.burst_freq {
980 ic.burst_freq = defaults.burst_freq;
981 }
982 if ic.pr_burst_freq_new == baseline.pr_burst_freq_new {
983 ic.pr_burst_freq_new = defaults.pr_burst_freq_new;
984 }
985 if ic.pr_burst_freq == baseline.pr_burst_freq {
986 ic.pr_burst_freq = defaults.pr_burst_freq;
987 }
988 if ic.new_time == baseline.new_time {
989 ic.new_time = defaults.new_time;
990 }
991 if ic.burst_penalty == baseline.burst_penalty {
992 ic.burst_penalty = defaults.burst_penalty;
993 }
994 if ic.held_release_interval == baseline.held_release_interval {
995 ic.held_release_interval = defaults.held_release_interval;
996 }
997 if ic.egress_enabled == baseline.egress_enabled {
998 ic.egress_enabled = defaults.egress_enabled;
999 }
1000 if ic.egress_pr_freq == baseline.egress_pr_freq {
1001 ic.egress_pr_freq = defaults.egress_pr_freq;
1002 }
1003 }
1004
1005 fn wrap_interface_writer(
1006 &self,
1007 interface_id: InterfaceId,
1008 interface_name: &str,
1009 writer: Box<dyn crate::interface::Writer>,
1010 ) -> (
1011 Box<dyn crate::interface::Writer>,
1012 crate::interface::AsyncWriterMetrics,
1013 ) {
1014 crate::interface::wrap_async_writer(
1015 writer,
1016 interface_id,
1017 interface_name,
1018 self.event_tx.clone(),
1019 self.interface_writer_queue_capacity,
1020 )
1021 }
1022
/// Reports whether a provider bridge is attached to this driver.
///
/// Returns `true` exactly when `self.provider_bridge` is `Some`.
#[cfg(feature = "hooks")]
fn provider_events_enabled(&self) -> bool {
    self.provider_bridge.is_some()
}
1027
/// Runs the hook programs attached at `point` for a backbone peer event.
///
/// A hook context is built from `event`, the programs in the slot indexed by
/// `point` are executed via `run_hook_inner`, and if that yields a result its
/// side effects are forwarded under the label `attach_point`.
#[cfg(feature = "hooks")]
fn run_backbone_peer_hook(
    &mut self,
    attach_point: &str,
    point: HookPoint,
    event: &BackbonePeerHookEvent,
) {
    let hook_ctx = backbone_peer_hook_context(event);
    let timestamp = time::now();
    let emit_provider_events = self.provider_events_enabled();

    // Read-only view over driver state handed to the hook runtime; the same
    // timestamp is used for the view and for the run itself.
    let engine_view = EngineRef {
        engine: &self.engine,
        interfaces: &self.interfaces,
        link_manager: &self.link_manager,
        now: timestamp,
    };

    let slot_index = point as usize;
    let outcome = run_hook_inner(
        &mut self.hook_slots[slot_index].programs,
        &self.hook_manager,
        &engine_view,
        &hook_ctx,
        timestamp,
        emit_provider_events,
    );

    if let Some(exec) = outcome.as_ref() {
        self.forward_hook_side_effects(attach_point, exec);
    }
}
1055
/// Builds a `DiscoverableInterface` from a backbone discovery runtime handle.
///
/// The interface name comes from the handle itself; the config, transport
/// flag and IFAC name/key are copied from the handle's `current` state.
#[cfg(feature = "iface-backbone")]
fn make_discoverable_interface(
    runtime: &BackboneDiscoveryRuntimeHandle,
) -> crate::discovery::DiscoverableInterface {
    let current = &runtime.current;
    crate::discovery::DiscoverableInterface {
        interface_name: runtime.interface_name.clone(),
        config: current.config.clone(),
        transport_enabled: current.transport_enabled,
        ifac_netname: current.ifac_netname.clone(),
        ifac_netkey: current.ifac_netkey.clone(),
    }
}
1068
/// Brings the interface announcer in line with the current discovery state
/// of the backbone interface named `interface_name`.
///
/// * If the interface is marked discoverable, it is upserted into the
///   existing announcer. When no announcer exists yet, one is created from
///   the transport identity's hash; if there is no transport identity either,
///   nothing is registered.
/// * If the interface is not discoverable, it is removed from the announcer
///   (when one exists), and the announcer is dropped once it becomes empty.
///
/// # Errors
///
/// Returns a `RuntimeConfigError` with code `NotFound` when
/// `interface_name` has no entry in `backbone_discovery_runtime`.
#[cfg(feature = "iface-backbone")]
fn sync_backbone_discovery_runtime(
    &mut self,
    interface_name: &str,
) -> Result<(), RuntimeConfigError> {
    // `ok_or_else` keeps the error (and its formatted message) from being
    // built on the success path.
    let handle = self
        .backbone_discovery_runtime
        .get(interface_name)
        .ok_or_else(|| RuntimeConfigError {
            code: RuntimeConfigErrorCode::NotFound,
            message: format!("backbone interface '{}' not found", interface_name),
        })?
        .clone();

    if handle.current.discoverable {
        let iface = Self::make_discoverable_interface(&handle);
        if let Some(announcer) = self.interface_announcer.as_mut() {
            announcer.upsert_interface(iface);
        } else if let Some(identity) = self.transport_identity.as_ref() {
            // First discoverable interface: create the announcer lazily.
            self.interface_announcer = Some(crate::discovery::InterfaceAnnouncer::new(
                *identity.hash(),
                vec![iface],
            ));
        }
    } else if let Some(announcer) = self.interface_announcer.as_mut() {
        announcer.remove_interface(interface_name);
        // Drop the announcer once it no longer has anything to announce.
        if announcer.is_empty() {
            self.interface_announcer = None;
        }
    }

    Ok(())
}
1102
/// Builds a `DiscoverableInterface` from a TCP server discovery runtime
/// handle.
///
/// The interface name comes from the handle itself; the config, transport
/// flag and IFAC name/key are copied from the handle's `current` state.
#[cfg(feature = "iface-tcp")]
fn make_tcp_server_discoverable_interface(
    runtime: &TcpServerDiscoveryRuntimeHandle,
) -> crate::discovery::DiscoverableInterface {
    let current = &runtime.current;
    crate::discovery::DiscoverableInterface {
        interface_name: runtime.interface_name.clone(),
        config: current.config.clone(),
        transport_enabled: current.transport_enabled,
        ifac_netname: current.ifac_netname.clone(),
        ifac_netkey: current.ifac_netkey.clone(),
    }
}
1115
/// Brings the interface announcer in line with the current discovery state
/// of the TCP server interface named `interface_name`.
///
/// * If the interface is marked discoverable, it is upserted into the
///   existing announcer. When no announcer exists yet, one is created from
///   the transport identity's hash; if there is no transport identity either,
///   nothing is registered.
/// * If the interface is not discoverable, it is removed from the announcer
///   (when one exists), and the announcer is dropped once it becomes empty.
///
/// # Errors
///
/// Returns a `RuntimeConfigError` with code `NotFound` when
/// `interface_name` has no entry in `tcp_server_discovery_runtime`.
#[cfg(feature = "iface-tcp")]
fn sync_tcp_server_discovery_runtime(
    &mut self,
    interface_name: &str,
) -> Result<(), RuntimeConfigError> {
    // `ok_or_else` keeps the error (and its formatted message) from being
    // built on the success path.
    let handle = self
        .tcp_server_discovery_runtime
        .get(interface_name)
        .ok_or_else(|| RuntimeConfigError {
            code: RuntimeConfigErrorCode::NotFound,
            message: format!("tcp server interface '{}' not found", interface_name),
        })?
        .clone();

    if handle.current.discoverable {
        let iface = Self::make_tcp_server_discoverable_interface(&handle);
        if let Some(announcer) = self.interface_announcer.as_mut() {
            announcer.upsert_interface(iface);
        } else if let Some(identity) = self.transport_identity.as_ref() {
            // First discoverable interface: create the announcer lazily.
            self.interface_announcer = Some(crate::discovery::InterfaceAnnouncer::new(
                *identity.hash(),
                vec![iface],
            ));
        }
    } else if let Some(announcer) = self.interface_announcer.as_mut() {
        announcer.remove_interface(interface_name);
        // Drop the announcer once it no longer has anything to announce.
        if announcer.is_empty() {
            self.interface_announcer = None;
        }
    }

    Ok(())
}
1149
/// Applies `update` to the loaded hook program called `name` at
/// `attach_point`.
///
/// `attach_point` is resolved to a slot index with
/// `crate::config::parse_hook_point`; the first program in that slot whose
/// name equals `name` is passed to `update`.
///
/// # Errors
///
/// Returns a descriptive message when `attach_point` is not a known hook
/// point, or when no program named `name` is loaded at that point.
#[cfg(feature = "hooks")]
fn update_hook_program<F>(
    &mut self,
    name: &str,
    attach_point: &str,
    mut update: F,
) -> Result<(), String>
where
    F: FnMut(&mut rns_hooks::LoadedProgram),
{
    let point_idx = match crate::config::parse_hook_point(attach_point) {
        Some(idx) => idx,
        None => return Err(format!("unknown hook point '{}'", attach_point)),
    };

    let slot = &mut self.hook_slots[point_idx];
    match slot
        .programs
        .iter_mut()
        .find(|candidate| candidate.name == name)
    {
        Some(program) => {
            update(program);
            Ok(())
        }
        None => Err(format!(
            "hook '{}' not found at point '{}'",
            name, attach_point
        )),
    }
}
1170
1171 pub(crate) fn set_tick_interval_handle(&mut self, tick_interval_ms: Arc<AtomicU64>) {
1172 self.tick_interval_ms = tick_interval_ms;
1173 }
1174
1175 pub(crate) fn set_packet_hashlist_max_entries(&mut self, max_entries: usize) {
1176 self.engine.set_packet_hashlist_max_entries(max_entries);
1177 }
1178
/// Acts on the side effects recorded in a hook execution result.
///
/// Injected actions, if any, are converted and dispatched immediately via
/// `dispatch_all`. Afterwards, when a provider bridge is attached, every
/// provider event in `exec` is emitted on it, tagged with `attach_point`.
#[cfg(feature = "hooks")]
fn forward_hook_side_effects(&mut self, attach_point: &str, exec: &rns_hooks::ExecuteResult) {
    if !exec.injected_actions.is_empty() {
        let actions = convert_injected_actions(exec.injected_actions.clone());
        self.dispatch_all(actions);
    }

    if let Some(bridge) = self.provider_bridge.as_ref() {
        for provider_event in &exec.provider_events {
            let hook_name = provider_event.hook_name.clone();
            let payload_type = provider_event.payload_type.clone();
            let payload = provider_event.payload.clone();
            bridge.emit_event(attach_point, hook_name, payload_type, payload);
        }
    }
}
1195
/// Gathers the side effects recorded in a hook execution result.
///
/// Unlike the dispatching variant, injected actions are not executed here:
/// they are converted and appended to `out` for the caller to handle. When a
/// provider bridge is attached, every provider event in `exec` is emitted on
/// it, tagged with `attach_point`.
#[cfg(feature = "hooks")]
fn collect_hook_side_effects(
    &mut self,
    attach_point: &str,
    exec: &rns_hooks::ExecuteResult,
    out: &mut Vec<TransportAction>,
) {
    if !exec.injected_actions.is_empty() {
        let converted = convert_injected_actions(exec.injected_actions.clone());
        out.extend(converted);
    }

    if let Some(bridge) = self.provider_bridge.as_ref() {
        for provider_event in &exec.provider_events {
            let hook_name = provider_event.hook_name.clone();
            let payload_type = provider_event.payload_type.clone();
            let payload = provider_event.payload.clone();
            bridge.emit_event(attach_point, hook_name, payload_type, payload);
        }
    }
}
1217
1218 pub fn set_probe_config(
1220 &mut self,
1221 addrs: Vec<std::net::SocketAddr>,
1222 protocol: rns_core::holepunch::ProbeProtocol,
1223 device: Option<String>,
1224 ) {
1225 self.holepunch_manager = HolePunchManager::new(addrs, protocol, device);
1226 }
1227}