Skip to main content

rns_net/interface/
mod.rs

1//! Network interface abstractions.
2
3#[cfg(feature = "iface-auto")]
4pub mod auto;
5#[cfg(feature = "iface-backbone")]
6pub mod backbone;
7#[cfg(feature = "iface-i2p")]
8pub mod i2p;
9#[cfg(feature = "iface-kiss")]
10pub mod kiss_iface;
11#[cfg(feature = "iface-local")]
12pub mod local;
13#[cfg(feature = "iface-pipe")]
14pub mod pipe;
15pub mod registry;
16#[cfg(feature = "iface-rnode")]
17pub mod rnode;
18#[cfg(feature = "iface-serial")]
19pub mod serial_iface;
20#[cfg(feature = "iface-tcp")]
21pub mod tcp;
22#[cfg(feature = "iface-tcp")]
23pub mod tcp_server;
24#[cfg(feature = "iface-udp")]
25pub mod udp;
26
27use std::any::Any;
28use std::collections::HashMap;
29use std::io;
30use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
31use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
32use std::sync::Arc;
33use std::thread;
34use std::time::{Duration, Instant};
35
36use crate::event::EventSender;
37use crate::ifac::IfacState;
38use rns_core::transport::types::{InterfaceId, InterfaceInfo};
39
40/// Bind a socket to a specific network interface using `SO_BINDTODEVICE`.
41///
42/// Requires `CAP_NET_RAW` or root on Linux.
43#[cfg(target_os = "linux")]
44pub fn bind_to_device(fd: std::os::unix::io::RawFd, device: &str) -> io::Result<()> {
45    let dev_bytes = device.as_bytes();
46    if dev_bytes.len() >= libc::IFNAMSIZ {
47        return Err(io::Error::new(
48            io::ErrorKind::InvalidInput,
49            format!("device name too long: {}", device),
50        ));
51    }
52    let ret = unsafe {
53        libc::setsockopt(
54            fd,
55            libc::SOL_SOCKET,
56            libc::SO_BINDTODEVICE,
57            dev_bytes.as_ptr() as *const libc::c_void,
58            dev_bytes.len() as libc::socklen_t,
59        )
60    };
61    if ret != 0 {
62        return Err(io::Error::last_os_error());
63    }
64    Ok(())
65}
66
/// The outbound (write) half of an interface, owned by the driver.
///
/// Implementations wrap a socket together with its framing. The `Send`
/// supertrait allows a writer to be moved to another thread.
pub trait Writer: Send {
    /// Sends one frame whose payload is `data`.
    ///
    /// # Errors
    ///
    /// Returns the implementation's I/O error when the frame could not be sent.
    fn send_frame(&mut self, data: &[u8]) -> io::Result<()>;
}
73
/// Default number of frames an async writer queue can hold.
///
/// NOTE(review): presumably passed as `queue_capacity` to
/// `wrap_async_writer`; the call sites are outside this file — confirm.
pub const DEFAULT_ASYNC_WRITER_QUEUE_CAPACITY: usize = 256;
75
/// Clonable handle around a shared stop flag.
///
/// Every clone refers to the same underlying flag, so a stop requested through
/// one handle is observed through all of them.
#[derive(Default, Clone)]
pub struct ListenerControl {
    stop: Arc<AtomicBool>,
}

impl ListenerControl {
    /// Creates a handle whose stop flag is not set.
    pub fn new() -> Self {
        // The derived `Default` builds an `Arc<AtomicBool>` holding `false`.
        Self::default()
    }

    /// Sets the shared stop flag.
    pub fn request_stop(&self) {
        let flag: &AtomicBool = &self.stop;
        flag.store(true, Ordering::Relaxed);
    }

    /// Returns `true` once a stop has been requested on this handle or any clone.
    pub fn should_stop(&self) -> bool {
        let flag: &AtomicBool = &self.stop;
        flag.load(Ordering::Relaxed)
    }
}
96
/// Shared, clonable view of an async writer's counters.
///
/// Clones share the same atomics. A value built with `Default` reports zero
/// queued frames and a worker that is not alive.
#[derive(Default, Clone)]
pub struct AsyncWriterMetrics {
    /// Frames counted as queued but not yet taken by the worker.
    queued_frames: Arc<AtomicUsize>,
    /// Whether the worker is currently considered alive.
    worker_alive: Arc<AtomicBool>,
}

impl AsyncWriterMetrics {
    /// Current value of the queued-frame counter.
    pub fn queued_frames(&self) -> usize {
        let counter: &AtomicUsize = &self.queued_frames;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the worker-alive flag.
    pub fn worker_alive(&self) -> bool {
        let flag: &AtomicBool = &self.worker_alive;
        flag.load(Ordering::Relaxed)
    }
}
112
113struct AsyncWriter {
114    tx: SyncSender<Vec<u8>>,
115    metrics: AsyncWriterMetrics,
116}
117
118impl Writer for AsyncWriter {
119    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
120        if !self.metrics.worker_alive() {
121            return Err(io::Error::new(
122                io::ErrorKind::BrokenPipe,
123                "interface writer worker is offline",
124            ));
125        }
126
127        match self.tx.try_send(data.to_vec()) {
128            Ok(()) => {
129                self.metrics.queued_frames.fetch_add(1, Ordering::Relaxed);
130                Ok(())
131            }
132            Err(TrySendError::Full(_)) => Err(io::Error::new(
133                io::ErrorKind::WouldBlock,
134                "interface writer queue is full",
135            )),
136            Err(TrySendError::Disconnected(_)) => {
137                self.metrics.worker_alive.store(false, Ordering::Relaxed);
138                Err(io::Error::new(
139                    io::ErrorKind::BrokenPipe,
140                    "interface writer worker disconnected",
141                ))
142            }
143        }
144    }
145}
146
147pub fn wrap_async_writer(
148    writer: Box<dyn Writer>,
149    interface_id: InterfaceId,
150    interface_name: &str,
151    event_tx: EventSender,
152    queue_capacity: usize,
153) -> (Box<dyn Writer>, AsyncWriterMetrics) {
154    let (tx, rx) = sync_channel::<Vec<u8>>(queue_capacity.max(1));
155    let metrics = AsyncWriterMetrics {
156        queued_frames: Arc::new(AtomicUsize::new(0)),
157        worker_alive: Arc::new(AtomicBool::new(true)),
158    };
159    let metrics_thread = metrics.clone();
160    let name = interface_name.to_string();
161
162    thread::Builder::new()
163        .name(format!("iface-writer-{}", interface_id.0))
164        .spawn(move || async_writer_loop(writer, rx, interface_id, name, event_tx, metrics_thread))
165        .expect("failed to spawn interface writer thread");
166
167    (
168        Box::new(AsyncWriter {
169            tx,
170            metrics: metrics.clone(),
171        }),
172        metrics,
173    )
174}
175
176fn async_writer_loop(
177    mut writer: Box<dyn Writer>,
178    rx: std::sync::mpsc::Receiver<Vec<u8>>,
179    interface_id: InterfaceId,
180    interface_name: String,
181    event_tx: EventSender,
182    metrics: AsyncWriterMetrics,
183) {
184    while let Ok(frame) = rx.recv() {
185        metrics.queued_frames.fetch_sub(1, Ordering::Relaxed);
186        if let Err(err) = writer.send_frame(&frame) {
187            metrics.worker_alive.store(false, Ordering::Relaxed);
188            log::warn!(
189                "[{}:{}] async writer exiting after send failure: {}",
190                interface_name,
191                interface_id.0,
192                err
193            );
194            let _ = event_tx.send(crate::event::Event::InterfaceDown(interface_id));
195            return;
196        }
197    }
198
199    metrics.worker_alive.store(false, Ordering::Relaxed);
200}
201
202pub use crate::common::interface_stats::{InterfaceStats, ANNOUNCE_SAMPLE_MAX};
203
204use crate::common::management::InterfaceStatusView;
205
206/// Everything the driver tracks per interface.
207pub struct InterfaceEntry {
208    pub id: InterfaceId,
209    pub info: InterfaceInfo,
210    pub writer: Box<dyn Writer>,
211    pub async_writer_metrics: Option<AsyncWriterMetrics>,
212    /// Administrative enable/disable state.
213    pub enabled: bool,
214    pub online: bool,
215    /// True for dynamically spawned interfaces (e.g. TCP server clients).
216    /// These are fully removed on InterfaceDown rather than just marked offline.
217    pub dynamic: bool,
218    /// IFAC state for this interface, if access codes are enabled.
219    pub ifac: Option<IfacState>,
220    /// Traffic statistics.
221    pub stats: InterfaceStats,
222    /// Human-readable interface type string (e.g. "TCPClientInterface").
223    pub interface_type: String,
224    /// Next time a send should be retried after a transient WouldBlock.
225    pub send_retry_at: Option<Instant>,
226    /// Current retry backoff for transient send failures.
227    pub send_retry_backoff: Duration,
228}
229
230/// Result of starting an interface via a factory.
231pub enum StartResult {
232    /// One writer, registered immediately (TcpClient, Udp, Serial, etc.)
233    Simple {
234        id: InterfaceId,
235        info: InterfaceInfo,
236        writer: Box<dyn Writer>,
237        interface_type_name: String,
238    },
239    /// Spawns a listener; dynamic interfaces arrive via Event::InterfaceUp (TcpServer, Auto, I2P, etc.)
240    Listener { control: Option<ListenerControl> },
241    /// Multiple subinterfaces from one config (RNode).
242    Multi(Vec<SubInterface>),
243}
244
245/// A single subinterface returned from a multi-interface factory.
246pub struct SubInterface {
247    pub id: InterfaceId,
248    pub info: InterfaceInfo,
249    pub writer: Box<dyn Writer>,
250    pub interface_type_name: String,
251}
252
253/// Context passed to [`InterfaceFactory::start()`].
254pub struct StartContext {
255    pub tx: EventSender,
256    pub next_dynamic_id: Arc<AtomicU64>,
257    pub mode: u8,
258    pub ingress_control: rns_core::transport::types::IngressControlConfig,
259}
260
/// Type-erased interface configuration; each factory downcasts it back to its
/// own concrete config type.
///
/// Implemented for every `Send + 'static` type by the blanket impl below.
///
/// Because `Box<dyn InterfaceConfigData>` is itself `Send + 'static`, calling
/// `boxed.as_any()` directly on the box resolves to the blanket impl for the
/// *box* (via auto-ref). Use `(*boxed).as_any()` or `boxed.into_any()` to
/// reach the value stored inside.
pub trait InterfaceConfigData: Send + Any {
    /// Borrows the value as `&dyn Any` so it can be downcast by reference.
    fn as_any(&self) -> &dyn Any;

    /// Converts the boxed value into `Box<dyn Any>` so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
}

impl<T> InterfaceConfigData for T
where
    T: Send + 'static,
{
    fn as_any(&self) -> &dyn Any {
        let erased: &dyn Any = self;
        erased
    }

    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        let erased: Box<dyn Any> = self;
        erased
    }
}
276
277/// Factory that can parse config and start an interface type.
278pub trait InterfaceFactory: Send + Sync {
279    /// Config-file type name, e.g. "TCPClientInterface".
280    fn type_name(&self) -> &str;
281
282    /// Default IFAC size (bytes). 8 for serial/kiss/rnode, 16 for others.
283    fn default_ifac_size(&self) -> usize {
284        16
285    }
286
287    /// Parse from key-value params (config file or external).
288    fn parse_config(
289        &self,
290        name: &str,
291        id: InterfaceId,
292        params: &HashMap<String, String>,
293    ) -> Result<Box<dyn InterfaceConfigData>, String>;
294
295    /// Start the interface from parsed config.
296    fn start(
297        &self,
298        config: Box<dyn InterfaceConfigData>,
299        ctx: StartContext,
300    ) -> io::Result<StartResult>;
301}
302
303impl InterfaceStatusView for InterfaceEntry {
304    fn id(&self) -> InterfaceId {
305        self.id
306    }
307    fn info(&self) -> &InterfaceInfo {
308        &self.info
309    }
310    fn online(&self) -> bool {
311        self.online
312    }
313    fn stats(&self) -> &InterfaceStats {
314        &self.stats
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;
    use rns_core::constants;
    use std::sync::mpsc;

    /// Writer that records every frame it is asked to send.
    struct MockWriter {
        sent: Vec<Vec<u8>>,
    }

    impl MockWriter {
        fn new() -> Self {
            Self { sent: Vec::new() }
        }
    }

    impl Writer for MockWriter {
        fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
            self.sent.push(data.to_owned());
            Ok(())
        }
    }

    /// Writer that announces each call on `entered_tx` and then waits on
    /// `release_rx` before reporting success.
    struct BlockingWriter {
        entered_tx: mpsc::Sender<()>,
        release_rx: mpsc::Receiver<()>,
    }

    impl Writer for BlockingWriter {
        fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
            let _ = self.entered_tx.send(());
            let _ = self.release_rx.recv();
            Ok(())
        }
    }

    /// Writer whose every send fails with `BrokenPipe`.
    struct FailingWriter;

    impl Writer for FailingWriter {
        fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "boom"))
        }
    }

    /// Builds the `InterfaceInfo` fixture used by the construction test.
    fn sample_info() -> InterfaceInfo {
        InterfaceInfo {
            id: InterfaceId(1),
            name: String::new(),
            mode: constants::MODE_FULL,
            out_capable: true,
            in_capable: true,
            bitrate: None,
            announce_rate_target: None,
            announce_rate_grace: 0,
            announce_rate_penalty: 0.0,
            announce_cap: constants::ANNOUNCE_CAP,
            is_local_client: false,
            wants_tunnel: false,
            tunnel_id: None,
            mtu: constants::MTU as u32,
            ia_freq: 0.0,
            started: 0.0,
            ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
        }
    }

    #[test]
    fn interface_entry_construction() {
        let entry = InterfaceEntry {
            id: InterfaceId(1),
            info: sample_info(),
            writer: Box::new(MockWriter::new()),
            async_writer_metrics: None,
            enabled: true,
            online: false,
            dynamic: false,
            ifac: None,
            stats: InterfaceStats::default(),
            interface_type: String::new(),
            send_retry_at: None,
            send_retry_backoff: Duration::ZERO,
        };

        assert_eq!(entry.id, InterfaceId(1));
        assert!(!entry.online);
        assert!(!entry.dynamic);
    }

    #[test]
    fn mock_writer_captures_bytes() {
        let mut mock = MockWriter::new();
        for payload in [b"hello", b"world"] {
            mock.send_frame(payload).unwrap();
        }

        assert_eq!(mock.sent.len(), 2);
        assert_eq!(mock.sent[0], b"hello");
        assert_eq!(mock.sent[1], b"world");
    }

    #[test]
    fn writer_send_frame_produces_output() {
        let payload = vec![0x01, 0x02, 0x03];
        let mut mock = MockWriter::new();
        mock.send_frame(&payload).unwrap();
        assert_eq!(mock.sent[0], payload);
    }

    #[test]
    fn async_writer_returns_wouldblock_when_queue_is_full() {
        // `_event_rx` is kept bound so the event channel stays open.
        let (event_tx, _event_rx) = crate::event::channel();
        let (entered_tx, entered_rx) = mpsc::channel();
        let (release_tx, release_rx) = mpsc::channel();
        let inner = BlockingWriter {
            entered_tx,
            release_rx,
        };
        let (mut writer, metrics) =
            wrap_async_writer(Box::new(inner), InterfaceId(7), "test", event_tx, 1);

        // First frame: wait until the worker has picked it up and is blocked
        // inside `BlockingWriter`.
        writer.send_frame(&[1]).unwrap();
        entered_rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Second frame fills the single queue slot.
        writer.send_frame(&[2]).unwrap();
        assert_eq!(metrics.queued_frames(), 1);

        // Third frame finds the queue full.
        let err = writer.send_frame(&[3]).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::WouldBlock);

        let _ = release_tx.send(());
    }

    #[test]
    fn async_writer_reports_interface_down_after_worker_failure() {
        let (event_tx, event_rx) = crate::event::channel();
        let (mut writer, metrics) =
            wrap_async_writer(Box::new(FailingWriter), InterfaceId(9), "fail", event_tx, 2);

        writer.send_frame(&[1]).unwrap();

        let event = event_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9))));
        assert!(!metrics.worker_alive());
    }
}