Skip to main content

rns_net/
driver.rs

1//! Driver loop: receives events, drives the TransportEngine, dispatches actions.
2
3use std::collections::HashMap;
4use std::io;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant};
8
9use rns_core::packet::RawPacket;
10use rns_core::transport::announce_verify_queue::{AnnounceVerifyQueue, OverflowPolicy};
11use rns_core::transport::tables::PathEntry;
12use rns_core::transport::types::{InterfaceId, TransportAction, TransportConfig};
13use rns_core::transport::TransportEngine;
14use rns_crypto::{OsRng, Rng};
15
16#[cfg(feature = "hooks")]
17use crate::provider_bridge::ProviderBridge;
18#[cfg(feature = "hooks")]
19use rns_hooks::{create_hook_slots, EngineAccess, HookContext, HookManager, HookPoint, HookSlot};
20
21#[cfg(feature = "hooks")]
22use crate::event::BackbonePeerHookEvent;
23use crate::event::{
24    BackbonePeerPoolMemberStatus, BackbonePeerPoolStatus, BackbonePeerStateEntry, BlackholeInfo,
25    DrainStatus, Event, EventReceiver, InterfaceStatsResponse, KnownDestinationEntry,
26    LifecycleState, LocalDestinationEntry, NextHopResponse, PathTableEntry, QueryRequest,
27    QueryResponse, RateTableEntry, RuntimeConfigApplyMode, RuntimeConfigEntry, RuntimeConfigError,
28    RuntimeConfigErrorCode, RuntimeConfigSource, RuntimeConfigValue, SingleInterfaceStat,
29};
30use crate::holepunch::orchestrator::{HolePunchManager, HolePunchManagerAction};
31use crate::ifac;
32#[cfg(all(feature = "iface-auto", test))]
33use crate::interface::auto::AutoRuntime;
34#[cfg(feature = "iface-auto")]
35use crate::interface::auto::AutoRuntimeConfigHandle;
36#[cfg(feature = "iface-backbone")]
37use crate::interface::backbone::{
38    start_client, BackboneClientConfig, BackboneClientRuntime, BackboneClientRuntimeConfigHandle,
39    BackbonePeerStateHandle, BackboneRuntimeConfigHandle,
40};
41#[cfg(all(feature = "iface-backbone", target_os = "linux", test))]
42use crate::interface::backbone::{BackboneAbuseConfig, BackboneServerRuntime};
43#[cfg(all(feature = "iface-i2p", test))]
44use crate::interface::i2p::I2pRuntime;
45#[cfg(feature = "iface-i2p")]
46use crate::interface::i2p::I2pRuntimeConfigHandle;
47#[cfg(all(feature = "iface-pipe", test))]
48use crate::interface::pipe::PipeRuntime;
49#[cfg(feature = "iface-pipe")]
50use crate::interface::pipe::PipeRuntimeConfigHandle;
51#[cfg(feature = "iface-rnode")]
52use crate::interface::rnode::{
53    validate_sub_config, RNodeRuntime, RNodeRuntimeConfigHandle, RNodeSubConfig,
54};
55#[cfg(feature = "iface-tcp")]
56use crate::interface::tcp::TcpClientRuntimeConfigHandle;
57#[cfg(all(feature = "iface-tcp", test))]
58use crate::interface::tcp_server::TcpServerRuntime;
59#[cfg(feature = "iface-tcp")]
60use crate::interface::tcp_server::TcpServerRuntimeConfigHandle;
61#[cfg(all(feature = "iface-udp", test))]
62use crate::interface::udp::UdpRuntime;
63#[cfg(feature = "iface-udp")]
64use crate::interface::udp::UdpRuntimeConfigHandle;
65use crate::interface::{InterfaceEntry, InterfaceStats};
66use crate::link_manager::{LinkManager, LinkManagerAction};
67use crate::time;
68
69const DEFAULT_KNOWN_DESTINATIONS_TTL: f64 = 48.0 * 60.0 * 60.0;
70const DEFAULT_KNOWN_DESTINATIONS_MAX_ENTRIES: usize = 8192;
71const DEFAULT_RATE_LIMITER_TTL_SECS: f64 = 48.0 * 60.0 * 60.0;
72const DEFAULT_TICK_INTERVAL_MS: u64 = 1000;
73const DEFAULT_KNOWN_DESTINATIONS_CLEANUP_INTERVAL_TICKS: u32 = 3600;
74const DEFAULT_ANNOUNCE_CACHE_CLEANUP_INTERVAL_TICKS: u32 = 3600;
75const DEFAULT_ANNOUNCE_CACHE_CLEANUP_BATCH_SIZE: usize = 10_000;
76const DEFAULT_DISCOVERY_CLEANUP_INTERVAL_TICKS: u32 = 3600;
77const DEFAULT_MANAGEMENT_ANNOUNCE_INTERVAL_SECS: f64 = 300.0;
78const DEFAULT_LINK_TEARDOWN_FLUSH: Duration = Duration::from_millis(150);
79const SEND_RETRY_BACKOFF_MIN: Duration = Duration::from_millis(25);
80const SEND_RETRY_BACKOFF_MAX: Duration = Duration::from_millis(1000);
81
82mod dispatch;
83mod events;
84mod lifecycle;
85mod queries;
86mod runtime_config;
87
88#[cfg(test)]
89mod tests;
90
91fn inject_transport_header(raw: &[u8], next_hop: &[u8; 16]) -> Vec<u8> {
92    if raw.len() < 18 {
93        return raw.to_vec();
94    }
95
96    let new_flags = (rns_core::constants::HEADER_2 << 6)
97        | (rns_core::constants::TRANSPORT_TRANSPORT << 4)
98        | (raw[0] & 0x0F);
99
100    let mut new_raw = Vec::with_capacity(raw.len() + 16);
101    new_raw.push(new_flags);
102    new_raw.push(raw[1]);
103    new_raw.extend_from_slice(next_hop);
104    new_raw.extend_from_slice(&raw[2..]);
105    new_raw
106}
107
108fn recover_mutex_guard<'a, T>(mutex: &'a Mutex<T>, label: &str) -> std::sync::MutexGuard<'a, T> {
109    match mutex.lock() {
110        Ok(guard) => guard,
111        Err(poisoned) => {
112            log::error!("recovering from poisoned mutex: {}", label);
113            poisoned.into_inner()
114        }
115    }
116}
117
118#[derive(Debug, Clone, Copy)]
119pub(crate) struct RuntimeConfigDefaults {
120    pub(crate) tick_interval_ms: u64,
121    pub(crate) known_destinations_ttl: f64,
122    pub(crate) rate_limiter_ttl_secs: f64,
123    pub(crate) known_destinations_cleanup_interval_ticks: u32,
124    pub(crate) announce_cache_cleanup_interval_ticks: u32,
125    pub(crate) announce_cache_cleanup_batch_size: usize,
126    pub(crate) discovery_cleanup_interval_ticks: u32,
127    pub(crate) management_announce_interval_secs: f64,
128    pub(crate) direct_connect_policy: crate::event::HolePunchPolicy,
129    #[cfg(feature = "hooks")]
130    pub(crate) provider_queue_max_events: usize,
131    #[cfg(feature = "hooks")]
132    pub(crate) provider_queue_max_bytes: usize,
133}
134
135#[cfg(feature = "iface-backbone")]
136#[derive(Debug, Clone)]
137pub(crate) struct BackboneDiscoveryRuntime {
138    pub(crate) discoverable: bool,
139    pub(crate) config: crate::discovery::DiscoveryConfig,
140    pub(crate) transport_enabled: bool,
141    pub(crate) ifac_netname: Option<String>,
142    pub(crate) ifac_netkey: Option<String>,
143}
144
145#[cfg(feature = "iface-backbone")]
146#[derive(Debug, Clone)]
147pub(crate) struct BackboneDiscoveryRuntimeHandle {
148    pub(crate) interface_name: String,
149    pub(crate) current: BackboneDiscoveryRuntime,
150    pub(crate) startup: BackboneDiscoveryRuntime,
151}
152
153#[cfg(feature = "iface-tcp")]
154#[derive(Debug, Clone)]
155pub(crate) struct TcpServerDiscoveryRuntime {
156    pub(crate) discoverable: bool,
157    pub(crate) config: crate::discovery::DiscoveryConfig,
158    pub(crate) transport_enabled: bool,
159    pub(crate) ifac_netname: Option<String>,
160    pub(crate) ifac_netkey: Option<String>,
161}
162
163#[cfg(feature = "iface-tcp")]
164#[derive(Debug, Clone)]
165pub(crate) struct TcpServerDiscoveryRuntimeHandle {
166    pub(crate) interface_name: String,
167    pub(crate) current: TcpServerDiscoveryRuntime,
168    pub(crate) startup: TcpServerDiscoveryRuntime,
169}
170
171#[derive(Debug, Clone, PartialEq, Eq)]
172pub(crate) struct IfacRuntimeConfig {
173    pub(crate) netname: Option<String>,
174    pub(crate) netkey: Option<String>,
175    pub(crate) size: usize,
176}
177
178#[cfg(feature = "iface-backbone")]
179impl BackboneDiscoveryRuntimeHandle {
180    pub(crate) fn from_parts(
181        interface_name: String,
182        startup_config: crate::discovery::DiscoveryConfig,
183        transport_enabled: bool,
184        ifac_netname: Option<String>,
185        ifac_netkey: Option<String>,
186        discoverable: bool,
187    ) -> Self {
188        let startup = BackboneDiscoveryRuntime {
189            discoverable,
190            config: startup_config,
191            transport_enabled,
192            ifac_netname,
193            ifac_netkey,
194        };
195        Self {
196            interface_name,
197            current: startup.clone(),
198            startup,
199        }
200    }
201}
202
203#[cfg(feature = "iface-tcp")]
204impl TcpServerDiscoveryRuntimeHandle {
205    pub(crate) fn from_parts(
206        interface_name: String,
207        startup_config: crate::discovery::DiscoveryConfig,
208        transport_enabled: bool,
209        ifac_netname: Option<String>,
210        ifac_netkey: Option<String>,
211        discoverable: bool,
212    ) -> Self {
213        let startup = TcpServerDiscoveryRuntime {
214            discoverable,
215            config: startup_config,
216            transport_enabled,
217            ifac_netname,
218            ifac_netkey,
219        };
220        Self {
221            interface_name,
222            current: startup.clone(),
223            startup,
224        }
225    }
226}
227
228impl IfacRuntimeConfig {
229    pub(crate) fn from_parts(netname: Option<String>, netkey: Option<String>, size: usize) -> Self {
230        Self {
231            netname,
232            netkey,
233            size,
234        }
235    }
236}
237
238#[cfg(feature = "iface-backbone")]
239#[derive(Debug, Clone)]
240pub struct BackbonePeerPoolSettings {
241    pub max_connected: usize,
242    pub failure_threshold: usize,
243    pub failure_window: Duration,
244    pub cooldown: Duration,
245}
246
247#[cfg(feature = "iface-backbone")]
248pub(crate) struct BackbonePeerPoolCandidateConfig {
249    pub(crate) client: BackboneClientConfig,
250    pub(crate) mode: u8,
251    pub(crate) ingress_control: rns_core::transport::types::IngressControlConfig,
252    pub(crate) ifac_runtime: IfacRuntimeConfig,
253    pub(crate) ifac_enabled: bool,
254    pub(crate) interface_type_name: String,
255}
256
257#[cfg(feature = "iface-backbone")]
258struct BackbonePeerPool {
259    settings: BackbonePeerPoolSettings,
260    candidates: Vec<BackbonePeerPoolCandidate>,
261}
262
263#[cfg(feature = "iface-backbone")]
264struct BackbonePeerPoolCandidate {
265    config: BackbonePeerPoolCandidateConfig,
266    active_id: Option<InterfaceId>,
267    failures: Vec<f64>,
268    retry_after: Option<f64>,
269    cooldown_until: Option<f64>,
270    last_error: Option<String>,
271}
272
273/// Thin wrapper providing `EngineAccess` for a `TransportEngine` + Driver interfaces.
274#[cfg(feature = "hooks")]
275struct EngineRef<'a> {
276    engine: &'a TransportEngine,
277    interfaces: &'a HashMap<InterfaceId, InterfaceEntry>,
278    link_manager: &'a LinkManager,
279    now: f64,
280}
281
282#[cfg(feature = "hooks")]
283impl<'a> EngineAccess for EngineRef<'a> {
284    fn has_path(&self, dest: &[u8; 16]) -> bool {
285        self.engine.has_path(dest)
286    }
287    fn hops_to(&self, dest: &[u8; 16]) -> Option<u8> {
288        self.engine.hops_to(dest)
289    }
290    fn next_hop(&self, dest: &[u8; 16]) -> Option<[u8; 16]> {
291        self.engine.next_hop(dest)
292    }
293    fn is_blackholed(&self, identity: &[u8; 16]) -> bool {
294        self.engine.is_blackholed(identity, self.now)
295    }
296    fn interface_name(&self, id: u64) -> Option<String> {
297        self.interfaces
298            .get(&InterfaceId(id))
299            .map(|e| e.info.name.clone())
300    }
301    fn interface_mode(&self, id: u64) -> Option<u8> {
302        self.interfaces.get(&InterfaceId(id)).map(|e| e.info.mode)
303    }
304    fn identity_hash(&self) -> Option<[u8; 16]> {
305        self.engine.identity_hash().copied()
306    }
307    fn announce_rate(&self, id: u64) -> Option<i32> {
308        self.interfaces
309            .get(&InterfaceId(id))
310            .map(|e| (e.stats.outgoing_announce_freq() * 1000.0) as i32)
311    }
312    fn link_state(&self, link_hash: &[u8; 16]) -> Option<u8> {
313        use rns_core::link::types::LinkState;
314        self.link_manager.link_state(link_hash).map(|s| match s {
315            LinkState::Pending => 0,
316            LinkState::Handshake => 1,
317            LinkState::Active => 2,
318            LinkState::Stale => 3,
319            LinkState::Closed => 4,
320        })
321    }
322}
323
324/// Extract the 16-byte destination hash from a raw packet header.
325///
326/// HEADER_1 (raw[0] & 0x40 == 0): dest at bytes 2..18
327/// HEADER_2 (raw[0] & 0x40 != 0): dest at bytes 18..34 (after transport ID)
328#[cfg(any(test, feature = "hooks"))]
329fn extract_dest_hash(raw: &[u8]) -> [u8; 16] {
330    let mut dest = [0u8; 16];
331    if raw.is_empty() {
332        return dest;
333    }
334    let is_header2 = raw[0] & 0x40 != 0;
335    let start = if is_header2 { 18 } else { 2 };
336    let end = start + 16;
337    if raw.len() >= end {
338        dest.copy_from_slice(&raw[start..end]);
339    }
340    dest
341}
342
343/// Execute a hook chain on disjoint Driver fields (avoids &mut self borrow conflict).
344#[cfg(feature = "hooks")]
345fn run_hook_inner(
346    programs: &mut [rns_hooks::LoadedProgram],
347    hook_manager: &Option<HookManager>,
348    engine_access: &dyn EngineAccess,
349    ctx: &HookContext,
350    now: f64,
351    provider_events_enabled: bool,
352) -> Option<rns_hooks::ExecuteResult> {
353    if programs.is_empty() {
354        return None;
355    }
356    let mgr = hook_manager.as_ref()?;
357    mgr.run_chain_with_provider_events(programs, ctx, engine_access, now, provider_events_enabled)
358}
359
360#[cfg(feature = "hooks")]
361fn backbone_peer_hook_context(event: &BackbonePeerHookEvent) -> HookContext<'_> {
362    HookContext::BackbonePeer {
363        server_interface_id: event.server_interface_id.0,
364        peer_interface_id: event.peer_interface_id.map(|id| id.0),
365        peer_ip: event.peer_ip,
366        peer_port: event.peer_port,
367        connected_for: event.connected_for,
368        had_received_data: event.had_received_data,
369        penalty_level: event.penalty_level,
370        blacklist_for: event.blacklist_for,
371    }
372}
373
374/// Convert a Vec of ActionWire into TransportActions for dispatch.
375#[cfg(feature = "hooks")]
376fn convert_injected_actions(actions: Vec<rns_hooks::ActionWire>) -> Vec<TransportAction> {
377    actions
378        .into_iter()
379        .map(|a| {
380            use rns_hooks::ActionWire;
381            match a {
382                ActionWire::SendOnInterface { interface, raw } => {
383                    TransportAction::SendOnInterface {
384                        interface: InterfaceId(interface),
385                        raw: raw.into(),
386                    }
387                }
388                ActionWire::BroadcastOnAllInterfaces {
389                    raw,
390                    exclude,
391                    has_exclude,
392                } => TransportAction::BroadcastOnAllInterfaces {
393                    raw: raw.into(),
394                    exclude: if has_exclude != 0 {
395                        Some(InterfaceId(exclude))
396                    } else {
397                        None
398                    },
399                },
400                ActionWire::DeliverLocal {
401                    destination_hash,
402                    raw,
403                    packet_hash,
404                    receiving_interface,
405                } => TransportAction::DeliverLocal {
406                    destination_hash,
407                    raw: raw.into(),
408                    packet_hash,
409                    receiving_interface: InterfaceId(receiving_interface),
410                },
411                ActionWire::PathUpdated {
412                    destination_hash,
413                    hops,
414                    next_hop,
415                    interface,
416                } => TransportAction::PathUpdated {
417                    destination_hash,
418                    hops,
419                    next_hop,
420                    interface: InterfaceId(interface),
421                },
422                ActionWire::CacheAnnounce { packet_hash, raw } => TransportAction::CacheAnnounce {
423                    packet_hash,
424                    raw: raw.into(),
425                },
426                ActionWire::TunnelEstablished {
427                    tunnel_id,
428                    interface,
429                } => TransportAction::TunnelEstablished {
430                    tunnel_id,
431                    interface: InterfaceId(interface),
432                },
433                ActionWire::TunnelSynthesize {
434                    interface,
435                    data,
436                    dest_hash,
437                } => TransportAction::TunnelSynthesize {
438                    interface: InterfaceId(interface),
439                    data,
440                    dest_hash,
441                },
442                ActionWire::ForwardToLocalClients {
443                    raw,
444                    exclude,
445                    has_exclude,
446                } => TransportAction::ForwardToLocalClients {
447                    raw: raw.into(),
448                    exclude: if has_exclude != 0 {
449                        Some(InterfaceId(exclude))
450                    } else {
451                        None
452                    },
453                },
454                ActionWire::ForwardPlainBroadcast {
455                    raw,
456                    to_local,
457                    exclude,
458                    has_exclude,
459                } => TransportAction::ForwardPlainBroadcast {
460                    raw: raw.into(),
461                    to_local: to_local != 0,
462                    exclude: if has_exclude != 0 {
463                        Some(InterfaceId(exclude))
464                    } else {
465                        None
466                    },
467                },
468                ActionWire::AnnounceReceived {
469                    destination_hash,
470                    identity_hash,
471                    public_key,
472                    name_hash,
473                    random_hash,
474                    app_data,
475                    hops,
476                    receiving_interface,
477                } => TransportAction::AnnounceReceived {
478                    destination_hash,
479                    identity_hash,
480                    public_key,
481                    name_hash,
482                    random_hash,
483                    ratchet: None,
484                    app_data,
485                    hops,
486                    receiving_interface: InterfaceId(receiving_interface),
487                },
488            }
489        })
490        .collect()
491}
492
493/// Infer the interface type string from a dynamic interface's name.
494/// Dynamic interfaces (TCP server clients, backbone peers, auto peers, local server clients)
495/// include their type in the name prefix set at construction.
496fn infer_interface_type(name: &str) -> String {
497    if name.starts_with("TCPServerInterface") {
498        "TCPServerClientInterface".to_string()
499    } else if name.starts_with("BackboneInterface") {
500        "BackboneInterface".to_string()
501    } else if name.starts_with("LocalInterface") {
502        "LocalServerClientInterface".to_string()
503    } else {
504        // AutoInterface peers use "{group_name}:{peer_addr}" format where
505        // group_name is the config section name (typically "AutoInterface" or similar).
506        "AutoInterface".to_string()
507    }
508}
509
510pub use crate::common::callbacks::Callbacks;
511
512#[derive(Clone)]
513struct SharedAnnounceRecord {
514    name_hash: [u8; 10],
515    identity_prv_key: [u8; 64],
516    app_data: Option<Vec<u8>>,
517}
518
519#[derive(Debug, Clone)]
520pub(crate) struct KnownDestinationState {
521    announced: crate::destination::AnnouncedIdentity,
522    was_used: bool,
523    last_used_at: Option<f64>,
524    retained: bool,
525}
526
527/// The driver loop. Owns the engine and all interface entries.
528pub struct Driver {
529    pub(crate) engine: TransportEngine,
530    pub(crate) interfaces: HashMap<InterfaceId, InterfaceEntry>,
531    pub(crate) rng: OsRng,
532    pub(crate) rx: EventReceiver,
533    pub(crate) callbacks: Box<dyn Callbacks>,
534    pub(crate) started: f64,
535    pub(crate) lifecycle_state: LifecycleState,
536    pub(crate) drain_started_at: Option<Instant>,
537    pub(crate) drain_deadline: Option<Instant>,
538    pub(crate) listener_controls: Vec<crate::interface::ListenerControl>,
539    pub(crate) announce_cache: Option<crate::announce_cache::AnnounceCache>,
540    /// Destination hash for rnstransport.tunnel.synthesize (PLAIN).
541    pub(crate) tunnel_synth_dest: [u8; 16],
542    /// Transport identity (optional, needed for tunnel synthesis).
543    pub(crate) transport_identity: Option<rns_crypto::identity::Identity>,
544    /// Link manager: handles link lifecycle, request/response.
545    pub(crate) link_manager: LinkManager,
546    /// Management configuration for ACL checks.
547    pub(crate) management_config: crate::management::ManagementConfig,
548    /// Last time management announces were emitted.
549    pub(crate) last_management_announce: f64,
550    /// Whether initial management announce has been sent (delayed 5s after start).
551    pub(crate) initial_announce_sent: bool,
552    /// Cache of known announced identities and lifecycle state, keyed by destination hash.
553    pub(crate) known_destinations: HashMap<[u8; 16], KnownDestinationState>,
554    /// Store for received remote ratchets, if persistence/use is enabled.
555    pub(crate) ratchet_store: Option<Arc<dyn crate::storage::RatchetStore>>,
556    /// TTL for known destinations without an active path, in seconds.
557    pub(crate) known_destinations_ttl: f64,
558    /// Maximum number of retained known destinations.
559    pub(crate) known_destinations_max_entries: usize,
560    /// TTL for announce rate-limiter entries without an active path, in seconds.
561    pub(crate) rate_limiter_ttl_secs: f64,
562    /// Destination hash for rnstransport.path.request (PLAIN).
563    pub(crate) path_request_dest: [u8; 16],
564    /// Proof strategies per destination hash.
565    /// Maps dest_hash → (strategy, optional signing identity for generating proofs).
566    pub(crate) proof_strategies: HashMap<
567        [u8; 16],
568        (
569            rns_core::types::ProofStrategy,
570            Option<rns_crypto::identity::Identity>,
571        ),
572    >,
573    /// Tracked sent packets for proof matching: packet_hash → (dest_hash, sent_time).
574    pub(crate) sent_packets: HashMap<[u8; 32], ([u8; 16], f64)>,
575    /// Completed proofs for probe polling: packet_hash → (rtt_seconds, received_time).
576    pub(crate) completed_proofs: HashMap<[u8; 32], (f64, f64)>,
577    /// Locally registered destinations: hash → dest_type.
578    pub(crate) local_destinations: HashMap<[u8; 16], u8>,
579    /// Latest explicit SINGLE announces to replay after shared-client reconnect.
580    shared_announces: HashMap<[u8; 16], SharedAnnounceRecord>,
581    /// Shared local interfaces that went down and should replay announces on reconnect.
582    shared_reconnect_pending: HashMap<InterfaceId, bool>,
583    /// Hole-punch manager for direct P2P connections.
584    pub(crate) holepunch_manager: HolePunchManager,
585    /// Event sender for worker threads to send results back to the driver loop.
586    pub(crate) event_tx: crate::event::EventSender,
587    /// Maximum queued outbound frames per interface writer worker.
588    pub(crate) interface_writer_queue_capacity: usize,
589    /// Shared timer interval used by the node timer thread.
590    pub(crate) tick_interval_ms: Arc<AtomicU64>,
591    /// Runtime-config handles for backbone server interfaces, keyed by config name.
592    #[cfg(feature = "iface-backbone")]
593    pub(crate) backbone_runtime: HashMap<String, BackboneRuntimeConfigHandle>,
594    /// Live peer-state handles for backbone server interfaces, keyed by config name.
595    #[cfg(feature = "iface-backbone")]
596    pub(crate) backbone_peer_state: HashMap<String, BackbonePeerStateHandle>,
597    /// Runtime-config handles for backbone client interfaces, keyed by config name.
598    #[cfg(feature = "iface-backbone")]
599    pub(crate) backbone_client_runtime: HashMap<String, BackboneClientRuntimeConfigHandle>,
600    /// Runtime-config state for backbone discovery metadata, keyed by config name.
601    #[cfg(feature = "iface-backbone")]
602    pub(crate) backbone_discovery_runtime: HashMap<String, BackboneDiscoveryRuntimeHandle>,
603    /// Ordered outbound Backbone peer pool, if enabled.
604    #[cfg(feature = "iface-backbone")]
605    backbone_peer_pool: Option<BackbonePeerPool>,
606    /// Runtime-config handles for TCP server interfaces, keyed by config name.
607    #[cfg(feature = "iface-tcp")]
608    pub(crate) tcp_server_runtime: HashMap<String, TcpServerRuntimeConfigHandle>,
609    /// Runtime-config handles for TCP client interfaces, keyed by config name.
610    #[cfg(feature = "iface-tcp")]
611    pub(crate) tcp_client_runtime: HashMap<String, TcpClientRuntimeConfigHandle>,
612    /// Runtime-config state for TCP server discovery metadata, keyed by config name.
613    #[cfg(feature = "iface-tcp")]
614    pub(crate) tcp_server_discovery_runtime: HashMap<String, TcpServerDiscoveryRuntimeHandle>,
615    /// Runtime-config handles for UDP interfaces, keyed by config name.
616    #[cfg(feature = "iface-udp")]
617    pub(crate) udp_runtime: HashMap<String, UdpRuntimeConfigHandle>,
618    /// Runtime-config handles for Auto interfaces, keyed by config name.
619    #[cfg(feature = "iface-auto")]
620    pub(crate) auto_runtime: HashMap<String, AutoRuntimeConfigHandle>,
621    /// Runtime-config handles for I2P interfaces, keyed by config name.
622    #[cfg(feature = "iface-i2p")]
623    pub(crate) i2p_runtime: HashMap<String, I2pRuntimeConfigHandle>,
624    /// Runtime-config handles for Pipe interfaces, keyed by config name.
625    #[cfg(feature = "iface-pipe")]
626    pub(crate) pipe_runtime: HashMap<String, PipeRuntimeConfigHandle>,
627    /// Runtime-config handles for RNode interfaces, keyed by config name.
628    #[cfg(feature = "iface-rnode")]
629    pub(crate) rnode_runtime: HashMap<String, RNodeRuntimeConfigHandle>,
630    /// Startup/default interface metadata for generic cross-cutting runtime config.
631    pub(crate) interface_runtime_defaults:
632        HashMap<String, rns_core::transport::types::InterfaceInfo>,
633    /// Current IFAC runtime config for static interfaces that support IFAC mutation.
634    pub(crate) interface_ifac_runtime: HashMap<String, IfacRuntimeConfig>,
635    /// Startup/default IFAC runtime config for static interfaces.
636    pub(crate) interface_ifac_runtime_defaults: HashMap<String, IfacRuntimeConfig>,
637    /// Storage for discovered interfaces.
638    pub(crate) discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage,
639    /// Required stamp value for accepting discovered interfaces.
640    pub(crate) discovery_required_value: u8,
641    /// Name hash for interface discovery announces ("rnstransport.discovery.interface").
642    pub(crate) discovery_name_hash: [u8; 10],
643    /// Destination hash for the probe responder (if respond_to_probes is enabled).
644    pub(crate) probe_responder_hash: Option<[u8; 16]>,
645    /// Whether interface discovery is enabled.
646    pub(crate) discover_interfaces: bool,
647    /// Announcer for discoverable interfaces (None if nothing to announce).
648    pub(crate) interface_announcer: Option<crate::discovery::InterfaceAnnouncer>,
649    /// Shared async announce verification queue.
650    pub(crate) announce_verify_queue: Arc<Mutex<AnnounceVerifyQueue>>,
651    /// Whether inbound announces should be verified off the driver thread.
652    pub(crate) async_announce_verification: bool,
653    /// Tick counter for periodic discovery cleanup (every ~3600 ticks = ~1 hour).
654    pub(crate) discovery_cleanup_counter: u32,
655    /// Runtime-configurable discovery cleanup interval.
656    pub(crate) discovery_cleanup_interval_ticks: u32,
657    /// Tick counter for periodic MEMSTATS logging (every 300 ticks = ~5 min).
658    pub(crate) memory_stats_counter: u32,
659    /// Tick counter for periodic memory/cache cleanup (every ~3600 ticks = ~1 hour).
660    pub(crate) cache_cleanup_counter: u32,
661    /// Tick counter for incremental announce-cache cleanup scheduling.
662    pub(crate) announce_cache_cleanup_counter: u32,
663    /// Runtime-configurable cleanup interval for known destinations.
664    pub(crate) known_destinations_cleanup_interval_ticks: u32,
665    /// Count of known-destination cap evictions since start.
666    pub(crate) known_destinations_cap_evict_count: usize,
667    /// Runtime-configurable interval for starting announce cache cleanup.
668    pub(crate) announce_cache_cleanup_interval_ticks: u32,
669    /// When set, announce cache cleanup is in progress (contains active packet hashes).
670    pub(crate) cache_cleanup_active_hashes: Option<Vec<[u8; 32]>>,
671    /// Directory iterator for incremental announce cache cleanup.
672    pub(crate) cache_cleanup_entries: Option<std::fs::ReadDir>,
673    /// Running total of files removed during current cache cleanup cycle.
674    pub(crate) cache_cleanup_removed: usize,
675    /// Runtime-configurable announce cache cleanup batch size.
676    pub(crate) announce_cache_cleanup_batch_size: usize,
677    /// Runtime-configurable management announce interval.
678    pub(crate) management_announce_interval_secs: f64,
679    /// Startup/default runtime-config values.
680    pub(crate) runtime_config_defaults: RuntimeConfigDefaults,
681    /// Hook slots for the programmable hook system (one per HookPoint).
682    #[cfg(feature = "hooks")]
683    pub(crate) hook_slots: [HookSlot; HookPoint::COUNT],
684    /// Hook manager. None if initialization failed.
685    #[cfg(feature = "hooks")]
686    pub(crate) hook_manager: Option<HookManager>,
687    #[cfg(feature = "hooks")]
688    pub(crate) provider_bridge: Option<ProviderBridge>,
689}
690
691impl Driver {
692    /// Create a new driver.
693    pub fn new(
694        config: TransportConfig,
695        rx: EventReceiver,
696        tx: crate::event::EventSender,
697        callbacks: Box<dyn Callbacks>,
698    ) -> Self {
699        let announce_queue_max_entries = config.announce_queue_max_entries;
700        let tunnel_synth_dest = rns_core::destination::destination_hash(
701            "rnstransport",
702            &["tunnel", "synthesize"],
703            None,
704        );
705        let path_request_dest =
706            rns_core::destination::destination_hash("rnstransport", &["path", "request"], None);
707        let discovery_name_hash = crate::discovery::discovery_name_hash();
708        let mut engine = TransportEngine::new(config);
709        engine.register_destination(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
710        // Register path request destination so inbound path requests are delivered locally
711        engine.register_destination(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
712        // Note: discovery destination is NOT registered as local — it's a SINGLE destination
713        // whose hash depends on the sender's identity. We match it by name_hash instead.
714        let mut local_destinations = HashMap::new();
715        local_destinations.insert(tunnel_synth_dest, rns_core::constants::DESTINATION_PLAIN);
716        local_destinations.insert(path_request_dest, rns_core::constants::DESTINATION_PLAIN);
717        let runtime_config_defaults = RuntimeConfigDefaults {
718            tick_interval_ms: DEFAULT_TICK_INTERVAL_MS,
719            known_destinations_ttl: DEFAULT_KNOWN_DESTINATIONS_TTL,
720            rate_limiter_ttl_secs: DEFAULT_RATE_LIMITER_TTL_SECS,
721            known_destinations_cleanup_interval_ticks:
722                DEFAULT_KNOWN_DESTINATIONS_CLEANUP_INTERVAL_TICKS,
723            announce_cache_cleanup_interval_ticks: DEFAULT_ANNOUNCE_CACHE_CLEANUP_INTERVAL_TICKS,
724            announce_cache_cleanup_batch_size: DEFAULT_ANNOUNCE_CACHE_CLEANUP_BATCH_SIZE,
725            discovery_cleanup_interval_ticks: DEFAULT_DISCOVERY_CLEANUP_INTERVAL_TICKS,
726            management_announce_interval_secs: DEFAULT_MANAGEMENT_ANNOUNCE_INTERVAL_SECS,
727            direct_connect_policy: crate::event::HolePunchPolicy::default(),
728            #[cfg(feature = "hooks")]
729            provider_queue_max_events: crate::provider_bridge::ProviderBridgeConfig::default()
730                .queue_max_events,
731            #[cfg(feature = "hooks")]
732            provider_queue_max_bytes: crate::provider_bridge::ProviderBridgeConfig::default()
733                .queue_max_bytes,
734        };
735        Driver {
736            engine,
737            interfaces: HashMap::new(),
738            rng: OsRng,
739            rx,
740            callbacks,
741            started: time::now(),
742            lifecycle_state: LifecycleState::Active,
743            drain_started_at: None,
744            drain_deadline: None,
745            listener_controls: Vec::new(),
746            announce_cache: None,
747            tunnel_synth_dest,
748            transport_identity: None,
749            link_manager: LinkManager::new(),
750            management_config: Default::default(),
751            last_management_announce: 0.0,
752            initial_announce_sent: false,
753            known_destinations: HashMap::new(),
754            ratchet_store: None,
755            known_destinations_ttl: DEFAULT_KNOWN_DESTINATIONS_TTL,
756            known_destinations_max_entries: DEFAULT_KNOWN_DESTINATIONS_MAX_ENTRIES,
757            rate_limiter_ttl_secs: DEFAULT_RATE_LIMITER_TTL_SECS,
758            path_request_dest,
759            proof_strategies: HashMap::new(),
760            sent_packets: HashMap::new(),
761            completed_proofs: HashMap::new(),
762            local_destinations,
763            shared_announces: HashMap::new(),
764            shared_reconnect_pending: HashMap::new(),
765            holepunch_manager: HolePunchManager::new(
766                vec![],
767                rns_core::holepunch::ProbeProtocol::Rnsp,
768                None,
769            ),
770            event_tx: tx,
771            interface_writer_queue_capacity: crate::interface::DEFAULT_ASYNC_WRITER_QUEUE_CAPACITY,
772            tick_interval_ms: Arc::new(AtomicU64::new(DEFAULT_TICK_INTERVAL_MS)),
773            #[cfg(feature = "iface-backbone")]
774            backbone_runtime: HashMap::new(),
775            #[cfg(feature = "iface-backbone")]
776            backbone_peer_state: HashMap::new(),
777            #[cfg(feature = "iface-backbone")]
778            backbone_client_runtime: HashMap::new(),
779            #[cfg(feature = "iface-backbone")]
780            backbone_discovery_runtime: HashMap::new(),
781            #[cfg(feature = "iface-backbone")]
782            backbone_peer_pool: None,
783            #[cfg(feature = "iface-tcp")]
784            tcp_server_runtime: HashMap::new(),
785            #[cfg(feature = "iface-tcp")]
786            tcp_client_runtime: HashMap::new(),
787            #[cfg(feature = "iface-tcp")]
788            tcp_server_discovery_runtime: HashMap::new(),
789            #[cfg(feature = "iface-udp")]
790            udp_runtime: HashMap::new(),
791            #[cfg(feature = "iface-auto")]
792            auto_runtime: HashMap::new(),
793            #[cfg(feature = "iface-i2p")]
794            i2p_runtime: HashMap::new(),
795            #[cfg(feature = "iface-pipe")]
796            pipe_runtime: HashMap::new(),
797            #[cfg(feature = "iface-rnode")]
798            rnode_runtime: HashMap::new(),
799            interface_runtime_defaults: HashMap::new(),
800            interface_ifac_runtime: HashMap::new(),
801            interface_ifac_runtime_defaults: HashMap::new(),
802            discovered_interfaces: crate::discovery::DiscoveredInterfaceStorage::new(
803                std::env::temp_dir().join("rns-discovered-interfaces"),
804            ),
805            discovery_required_value: crate::discovery::DEFAULT_STAMP_VALUE,
806            discovery_name_hash,
807            probe_responder_hash: None,
808            discover_interfaces: false,
809            interface_announcer: None,
810            announce_verify_queue: Arc::new(Mutex::new(AnnounceVerifyQueue::new(
811                announce_queue_max_entries,
812            ))),
813            async_announce_verification: false,
814            discovery_cleanup_counter: 0,
815            discovery_cleanup_interval_ticks: runtime_config_defaults
816                .discovery_cleanup_interval_ticks,
817            memory_stats_counter: 0,
818            cache_cleanup_counter: 0,
819            announce_cache_cleanup_counter: 0,
820            known_destinations_cleanup_interval_ticks: runtime_config_defaults
821                .known_destinations_cleanup_interval_ticks,
822            known_destinations_cap_evict_count: 0,
823            announce_cache_cleanup_interval_ticks: runtime_config_defaults
824                .announce_cache_cleanup_interval_ticks,
825            cache_cleanup_active_hashes: None,
826            cache_cleanup_entries: None,
827            cache_cleanup_removed: 0,
828            announce_cache_cleanup_batch_size: runtime_config_defaults
829                .announce_cache_cleanup_batch_size,
830            management_announce_interval_secs: runtime_config_defaults
831                .management_announce_interval_secs,
832            runtime_config_defaults,
833            #[cfg(feature = "hooks")]
834            hook_slots: create_hook_slots(),
835            #[cfg(feature = "hooks")]
836            hook_manager: HookManager::new().ok(),
837            #[cfg(feature = "hooks")]
838            provider_bridge: None,
839        }
840    }
841
842    pub fn set_announce_verify_queue_config(
843        &mut self,
844        max_entries: usize,
845        max_bytes: usize,
846        max_stale_secs: f64,
847        overflow_policy: OverflowPolicy,
848    ) {
849        self.announce_verify_queue = Arc::new(Mutex::new(AnnounceVerifyQueue::with_limits(
850            max_entries,
851            max_bytes,
852            max_stale_secs,
853            overflow_policy,
854        )));
855    }
856
857    fn wrap_interface_writer(
858        &self,
859        interface_id: InterfaceId,
860        interface_name: &str,
861        writer: Box<dyn crate::interface::Writer>,
862    ) -> (
863        Box<dyn crate::interface::Writer>,
864        crate::interface::AsyncWriterMetrics,
865    ) {
866        crate::interface::wrap_async_writer(
867            writer,
868            interface_id,
869            interface_name,
870            self.event_tx.clone(),
871            self.interface_writer_queue_capacity,
872        )
873    }
874
875    #[cfg(feature = "hooks")]
876    fn provider_events_enabled(&self) -> bool {
877        self.provider_bridge.is_some()
878    }
879
880    #[cfg(feature = "hooks")]
881    fn run_backbone_peer_hook(
882        &mut self,
883        attach_point: &str,
884        point: HookPoint,
885        event: &BackbonePeerHookEvent,
886    ) {
887        let ctx = backbone_peer_hook_context(event);
888        let now = time::now();
889        let engine_ref = EngineRef {
890            engine: &self.engine,
891            interfaces: &self.interfaces,
892            link_manager: &self.link_manager,
893            now,
894        };
895        let provider_events_enabled = self.provider_events_enabled();
896        if let Some(ref e) = run_hook_inner(
897            &mut self.hook_slots[point as usize].programs,
898            &self.hook_manager,
899            &engine_ref,
900            &ctx,
901            now,
902            provider_events_enabled,
903        ) {
904            self.forward_hook_side_effects(attach_point, e);
905        }
906    }
907
908    #[cfg(feature = "iface-backbone")]
909    fn make_discoverable_interface(
910        runtime: &BackboneDiscoveryRuntimeHandle,
911    ) -> crate::discovery::DiscoverableInterface {
912        crate::discovery::DiscoverableInterface {
913            interface_name: runtime.interface_name.clone(),
914            config: runtime.current.config.clone(),
915            transport_enabled: runtime.current.transport_enabled,
916            ifac_netname: runtime.current.ifac_netname.clone(),
917            ifac_netkey: runtime.current.ifac_netkey.clone(),
918        }
919    }
920
921    #[cfg(feature = "iface-backbone")]
922    fn sync_backbone_discovery_runtime(
923        &mut self,
924        interface_name: &str,
925    ) -> Result<(), RuntimeConfigError> {
926        let handle = self
927            .backbone_discovery_runtime
928            .get(interface_name)
929            .ok_or(RuntimeConfigError {
930                code: RuntimeConfigErrorCode::NotFound,
931                message: format!("backbone interface '{}' not found", interface_name),
932            })?
933            .clone();
934
935        if handle.current.discoverable {
936            let iface = Self::make_discoverable_interface(&handle);
937            if let Some(announcer) = self.interface_announcer.as_mut() {
938                announcer.upsert_interface(iface);
939            } else if let Some(identity) = self.transport_identity.as_ref() {
940                self.interface_announcer = Some(crate::discovery::InterfaceAnnouncer::new(
941                    *identity.hash(),
942                    vec![iface],
943                ));
944            }
945        } else if let Some(announcer) = self.interface_announcer.as_mut() {
946            announcer.remove_interface(interface_name);
947            if announcer.is_empty() {
948                self.interface_announcer = None;
949            }
950        }
951
952        Ok(())
953    }
954
955    #[cfg(feature = "iface-tcp")]
956    fn make_tcp_server_discoverable_interface(
957        runtime: &TcpServerDiscoveryRuntimeHandle,
958    ) -> crate::discovery::DiscoverableInterface {
959        crate::discovery::DiscoverableInterface {
960            interface_name: runtime.interface_name.clone(),
961            config: runtime.current.config.clone(),
962            transport_enabled: runtime.current.transport_enabled,
963            ifac_netname: runtime.current.ifac_netname.clone(),
964            ifac_netkey: runtime.current.ifac_netkey.clone(),
965        }
966    }
967
968    #[cfg(feature = "iface-tcp")]
969    fn sync_tcp_server_discovery_runtime(
970        &mut self,
971        interface_name: &str,
972    ) -> Result<(), RuntimeConfigError> {
973        let handle = self
974            .tcp_server_discovery_runtime
975            .get(interface_name)
976            .ok_or(RuntimeConfigError {
977                code: RuntimeConfigErrorCode::NotFound,
978                message: format!("tcp server interface '{}' not found", interface_name),
979            })?
980            .clone();
981
982        if handle.current.discoverable {
983            let iface = Self::make_tcp_server_discoverable_interface(&handle);
984            if let Some(announcer) = self.interface_announcer.as_mut() {
985                announcer.upsert_interface(iface);
986            } else if let Some(identity) = self.transport_identity.as_ref() {
987                self.interface_announcer = Some(crate::discovery::InterfaceAnnouncer::new(
988                    *identity.hash(),
989                    vec![iface],
990                ));
991            }
992        } else if let Some(announcer) = self.interface_announcer.as_mut() {
993            announcer.remove_interface(interface_name);
994            if announcer.is_empty() {
995                self.interface_announcer = None;
996            }
997        }
998
999        Ok(())
1000    }
1001
1002    #[cfg(feature = "hooks")]
1003    fn update_hook_program<F>(
1004        &mut self,
1005        name: &str,
1006        attach_point: &str,
1007        mut update: F,
1008    ) -> Result<(), String>
1009    where
1010        F: FnMut(&mut rns_hooks::LoadedProgram),
1011    {
1012        let point_idx = crate::config::parse_hook_point(attach_point)
1013            .ok_or_else(|| format!("unknown hook point '{}'", attach_point))?;
1014        let program = self.hook_slots[point_idx]
1015            .programs
1016            .iter_mut()
1017            .find(|program| program.name == name)
1018            .ok_or_else(|| format!("hook '{}' not found at point '{}'", name, attach_point))?;
1019        update(program);
1020        Ok(())
1021    }
1022
1023    pub(crate) fn set_tick_interval_handle(&mut self, tick_interval_ms: Arc<AtomicU64>) {
1024        self.tick_interval_ms = tick_interval_ms;
1025    }
1026
1027    pub(crate) fn set_packet_hashlist_max_entries(&mut self, max_entries: usize) {
1028        self.engine.set_packet_hashlist_max_entries(max_entries);
1029    }
1030
1031    #[cfg(feature = "hooks")]
1032    fn forward_hook_side_effects(&mut self, attach_point: &str, exec: &rns_hooks::ExecuteResult) {
1033        if !exec.injected_actions.is_empty() {
1034            self.dispatch_all(convert_injected_actions(exec.injected_actions.clone()));
1035        }
1036        if let Some(ref bridge) = self.provider_bridge {
1037            for event in &exec.provider_events {
1038                bridge.emit_event(
1039                    attach_point,
1040                    event.hook_name.clone(),
1041                    event.payload_type.clone(),
1042                    event.payload.clone(),
1043                );
1044            }
1045        }
1046    }
1047
1048    #[cfg(feature = "hooks")]
1049    fn collect_hook_side_effects(
1050        &mut self,
1051        attach_point: &str,
1052        exec: &rns_hooks::ExecuteResult,
1053        out: &mut Vec<TransportAction>,
1054    ) {
1055        if !exec.injected_actions.is_empty() {
1056            out.extend(convert_injected_actions(exec.injected_actions.clone()));
1057        }
1058        if let Some(ref bridge) = self.provider_bridge {
1059            for event in &exec.provider_events {
1060                bridge.emit_event(
1061                    attach_point,
1062                    event.hook_name.clone(),
1063                    event.payload_type.clone(),
1064                    event.payload.clone(),
1065                );
1066            }
1067        }
1068    }
1069
1070    /// Set the probe addresses, protocol, and optional device for hole punching.
1071    pub fn set_probe_config(
1072        &mut self,
1073        addrs: Vec<std::net::SocketAddr>,
1074        protocol: rns_core::holepunch::ProbeProtocol,
1075        device: Option<String>,
1076    ) {
1077        self.holepunch_manager = HolePunchManager::new(addrs, protocol, device);
1078    }
1079}