Skip to main content

rns_net/interface/
mod.rs

1//! Network interface abstractions.
2
3#[cfg(feature = "iface-auto")]
4pub mod auto;
5#[cfg(feature = "iface-backbone")]
6pub mod backbone;
7#[cfg(feature = "iface-i2p")]
8pub mod i2p;
9#[cfg(feature = "iface-kiss")]
10pub mod kiss_iface;
11#[cfg(feature = "iface-local")]
12pub mod local;
13#[cfg(feature = "iface-pipe")]
14pub mod pipe;
15pub mod registry;
16#[cfg(feature = "iface-rnode")]
17pub mod rnode;
18#[cfg(feature = "iface-serial")]
19pub mod serial_iface;
20#[cfg(feature = "iface-tcp")]
21pub mod tcp;
22#[cfg(feature = "iface-tcp")]
23pub mod tcp_server;
24#[cfg(feature = "iface-udp")]
25pub mod udp;
26
27use std::any::Any;
28use std::collections::HashMap;
29use std::io;
30use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
31use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
32use std::sync::{Arc, Mutex, MutexGuard};
33use std::thread;
34use std::time::{Duration, Instant};
35
36use crate::event::EventSender;
37use crate::ifac::IfacState;
38use rns_core::transport::types::{InterfaceId, InterfaceInfo};
39
40/// Bind a socket to a specific network interface using `SO_BINDTODEVICE`.
41///
42/// Requires `CAP_NET_RAW` or root on Linux.
43#[cfg(target_os = "linux")]
44pub fn bind_to_device(fd: std::os::unix::io::RawFd, device: &str) -> io::Result<()> {
45    let dev_bytes = device.as_bytes();
46    if dev_bytes.len() >= libc::IFNAMSIZ {
47        return Err(io::Error::new(
48            io::ErrorKind::InvalidInput,
49            format!("device name too long: {}", device),
50        ));
51    }
52    let ret = unsafe {
53        libc::setsockopt(
54            fd,
55            libc::SOL_SOCKET,
56            libc::SO_BINDTODEVICE,
57            dev_bytes.as_ptr() as *const libc::c_void,
58            dev_bytes.len() as libc::socklen_t,
59        )
60    };
61    if ret != 0 {
62        return Err(io::Error::last_os_error());
63    }
64    Ok(())
65}
66
67/// Writable end of an interface. Held by the driver.
68///
69/// Each implementation wraps a socket + framing.
70pub trait Writer: Send {
71    fn send_frame(&mut self, data: &[u8]) -> io::Result<()>;
72}
73
74pub const DEFAULT_ASYNC_WRITER_QUEUE_CAPACITY: usize = 256;
75
76pub(crate) fn lock_or_recover<'a, T>(mutex: &'a Mutex<T>, label: &str) -> MutexGuard<'a, T> {
77    match mutex.lock() {
78        Ok(guard) => guard,
79        Err(poisoned) => {
80            log::warn!("recovering poisoned mutex: {}", label);
81            poisoned.into_inner()
82        }
83    }
84}
85
86#[derive(Clone, Default)]
87pub struct ListenerControl {
88    stop: Arc<AtomicBool>,
89}
90
91impl ListenerControl {
92    pub fn new() -> Self {
93        Self {
94            stop: Arc::new(AtomicBool::new(false)),
95        }
96    }
97
98    pub fn request_stop(&self) {
99        self.stop.store(true, Ordering::Relaxed);
100    }
101
102    pub fn should_stop(&self) -> bool {
103        self.stop.load(Ordering::Relaxed)
104    }
105}
106
107#[derive(Clone, Default)]
108pub struct AsyncWriterMetrics {
109    queued_frames: Arc<AtomicUsize>,
110    worker_alive: Arc<AtomicBool>,
111}
112
113impl AsyncWriterMetrics {
114    pub fn queued_frames(&self) -> usize {
115        self.queued_frames.load(Ordering::Relaxed)
116    }
117
118    pub fn worker_alive(&self) -> bool {
119        self.worker_alive.load(Ordering::Relaxed)
120    }
121}
122
123struct AsyncWriter {
124    tx: SyncSender<Vec<u8>>,
125    metrics: AsyncWriterMetrics,
126}
127
128impl Writer for AsyncWriter {
129    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
130        if !self.metrics.worker_alive() {
131            return Err(io::Error::new(
132                io::ErrorKind::BrokenPipe,
133                "interface writer worker is offline",
134            ));
135        }
136
137        match self.tx.try_send(data.to_vec()) {
138            Ok(()) => {
139                self.metrics.queued_frames.fetch_add(1, Ordering::Relaxed);
140                Ok(())
141            }
142            Err(TrySendError::Full(_)) => Err(io::Error::new(
143                io::ErrorKind::WouldBlock,
144                "interface writer queue is full",
145            )),
146            Err(TrySendError::Disconnected(_)) => {
147                self.metrics.worker_alive.store(false, Ordering::Relaxed);
148                Err(io::Error::new(
149                    io::ErrorKind::BrokenPipe,
150                    "interface writer worker disconnected",
151                ))
152            }
153        }
154    }
155}
156
157pub fn wrap_async_writer(
158    writer: Box<dyn Writer>,
159    interface_id: InterfaceId,
160    interface_name: &str,
161    event_tx: EventSender,
162    queue_capacity: usize,
163) -> (Box<dyn Writer>, AsyncWriterMetrics) {
164    let (tx, rx) = sync_channel::<Vec<u8>>(queue_capacity.max(1));
165    let metrics = AsyncWriterMetrics {
166        queued_frames: Arc::new(AtomicUsize::new(0)),
167        worker_alive: Arc::new(AtomicBool::new(true)),
168    };
169    let metrics_thread = metrics.clone();
170    let name = interface_name.to_string();
171
172    let spawn_result = thread::Builder::new()
173        .name(format!("iface-writer-{}", interface_id.0))
174        .spawn(move || async_writer_loop(writer, rx, interface_id, name, event_tx, metrics_thread));
175
176    if let Err(err) = spawn_result {
177        metrics.worker_alive.store(false, Ordering::Relaxed);
178        log::error!(
179            "[{}:{}] failed to spawn async writer thread: {}",
180            interface_name,
181            interface_id.0,
182            err
183        );
184        return (Box::new(DirectWriterFallback), metrics);
185    }
186
187    (
188        Box::new(AsyncWriter {
189            tx,
190            metrics: metrics.clone(),
191        }),
192        metrics,
193    )
194}
195
196struct DirectWriterFallback;
197
198impl Writer for DirectWriterFallback {
199    fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
200        Err(io::Error::other("interface writer worker unavailable"))
201    }
202}
203
204fn async_writer_loop(
205    mut writer: Box<dyn Writer>,
206    rx: std::sync::mpsc::Receiver<Vec<u8>>,
207    interface_id: InterfaceId,
208    interface_name: String,
209    event_tx: EventSender,
210    metrics: AsyncWriterMetrics,
211) {
212    while let Ok(frame) = rx.recv() {
213        metrics.queued_frames.fetch_sub(1, Ordering::Relaxed);
214        if let Err(err) = writer.send_frame(&frame) {
215            metrics.worker_alive.store(false, Ordering::Relaxed);
216            log::warn!(
217                "[{}:{}] async writer exiting after send failure: {}",
218                interface_name,
219                interface_id.0,
220                err
221            );
222            let _ = event_tx.send(crate::event::Event::InterfaceDown(interface_id));
223            return;
224        }
225    }
226
227    metrics.worker_alive.store(false, Ordering::Relaxed);
228}
229
230pub use crate::common::interface_stats::{InterfaceStats, ANNOUNCE_SAMPLE_MAX};
231
232use crate::common::management::InterfaceStatusView;
233
234/// Everything the driver tracks per interface.
235pub struct InterfaceEntry {
236    pub id: InterfaceId,
237    pub info: InterfaceInfo,
238    pub writer: Box<dyn Writer>,
239    pub async_writer_metrics: Option<AsyncWriterMetrics>,
240    /// Administrative enable/disable state.
241    pub enabled: bool,
242    pub online: bool,
243    /// True for dynamically spawned interfaces (e.g. TCP server clients).
244    /// These are fully removed on InterfaceDown rather than just marked offline.
245    pub dynamic: bool,
246    /// IFAC state for this interface, if access codes are enabled.
247    pub ifac: Option<IfacState>,
248    /// Traffic statistics.
249    pub stats: InterfaceStats,
250    /// Human-readable interface type string (e.g. "TCPClientInterface").
251    pub interface_type: String,
252    /// Next time a send should be retried after a transient WouldBlock.
253    pub send_retry_at: Option<Instant>,
254    /// Current retry backoff for transient send failures.
255    pub send_retry_backoff: Duration,
256}
257
258/// Result of starting an interface via a factory.
259pub enum StartResult {
260    /// One writer, registered immediately (TcpClient, Udp, Serial, etc.)
261    Simple {
262        id: InterfaceId,
263        info: InterfaceInfo,
264        writer: Box<dyn Writer>,
265        interface_type_name: String,
266    },
267    /// Spawns a listener; dynamic interfaces arrive via Event::InterfaceUp (TcpServer, Auto, I2P, etc.)
268    Listener { control: Option<ListenerControl> },
269    /// Multiple subinterfaces from one config (RNode).
270    Multi(Vec<SubInterface>),
271}
272
273/// A single subinterface returned from a multi-interface factory.
274pub struct SubInterface {
275    pub id: InterfaceId,
276    pub info: InterfaceInfo,
277    pub writer: Box<dyn Writer>,
278    pub interface_type_name: String,
279}
280
281/// Context passed to [`InterfaceFactory::start()`].
282pub struct StartContext {
283    pub tx: EventSender,
284    pub next_dynamic_id: Arc<AtomicU64>,
285    pub mode: u8,
286    pub ingress_control: rns_core::transport::types::IngressControlConfig,
287}
288
289/// Opaque interface config data. Each factory downcasts to its concrete type.
290pub trait InterfaceConfigData: Send + Any {
291    fn as_any(&self) -> &dyn Any;
292    fn into_any(self: Box<Self>) -> Box<dyn Any>;
293}
294
295impl<T: Send + 'static> InterfaceConfigData for T {
296    fn as_any(&self) -> &dyn Any {
297        self
298    }
299
300    fn into_any(self: Box<Self>) -> Box<dyn Any> {
301        self
302    }
303}
304
305/// Factory that can parse config and start an interface type.
306pub trait InterfaceFactory: Send + Sync {
307    /// Config-file type name, e.g. "TCPClientInterface".
308    fn type_name(&self) -> &str;
309
310    /// Default IFAC size (bytes). 8 for serial/kiss/rnode, 16 for others.
311    fn default_ifac_size(&self) -> usize {
312        16
313    }
314
315    /// Parse from key-value params (config file or external).
316    fn parse_config(
317        &self,
318        name: &str,
319        id: InterfaceId,
320        params: &HashMap<String, String>,
321    ) -> Result<Box<dyn InterfaceConfigData>, String>;
322
323    /// Start the interface from parsed config.
324    fn start(
325        &self,
326        config: Box<dyn InterfaceConfigData>,
327        ctx: StartContext,
328    ) -> io::Result<StartResult>;
329}
330
331impl InterfaceStatusView for InterfaceEntry {
332    fn id(&self) -> InterfaceId {
333        self.id
334    }
335    fn info(&self) -> &InterfaceInfo {
336        &self.info
337    }
338    fn online(&self) -> bool {
339        self.online
340    }
341    fn stats(&self) -> &InterfaceStats {
342        &self.stats
343    }
344}
345
346#[cfg(test)]
347mod tests {
348    use super::*;
349    use crate::event::Event;
350    use rns_core::constants;
351    use std::sync::mpsc;
352
353    struct MockWriter {
354        sent: Vec<Vec<u8>>,
355    }
356
357    impl MockWriter {
358        fn new() -> Self {
359            MockWriter { sent: Vec::new() }
360        }
361    }
362
363    impl Writer for MockWriter {
364        fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
365            self.sent.push(data.to_vec());
366            Ok(())
367        }
368    }
369
370    #[test]
371    fn interface_entry_construction() {
372        let entry = InterfaceEntry {
373            id: InterfaceId(1),
374            info: InterfaceInfo {
375                id: InterfaceId(1),
376                name: String::new(),
377                mode: constants::MODE_FULL,
378                out_capable: true,
379                in_capable: true,
380                bitrate: None,
381                airtime_profile: None,
382                announce_rate_target: None,
383                announce_rate_grace: 0,
384                announce_rate_penalty: 0.0,
385                announce_cap: constants::ANNOUNCE_CAP,
386                is_local_client: false,
387                wants_tunnel: false,
388                tunnel_id: None,
389                mtu: constants::MTU as u32,
390                ia_freq: 0.0,
391                started: 0.0,
392                ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
393            },
394            writer: Box::new(MockWriter::new()),
395            async_writer_metrics: None,
396            enabled: true,
397            online: false,
398            dynamic: false,
399            ifac: None,
400            stats: InterfaceStats::default(),
401            interface_type: String::new(),
402            send_retry_at: None,
403            send_retry_backoff: Duration::ZERO,
404        };
405        assert_eq!(entry.id, InterfaceId(1));
406        assert!(!entry.online);
407        assert!(!entry.dynamic);
408    }
409
410    #[test]
411    fn mock_writer_captures_bytes() {
412        let mut writer = MockWriter::new();
413        writer.send_frame(b"hello").unwrap();
414        writer.send_frame(b"world").unwrap();
415        assert_eq!(writer.sent.len(), 2);
416        assert_eq!(writer.sent[0], b"hello");
417        assert_eq!(writer.sent[1], b"world");
418    }
419
420    #[test]
421    fn writer_send_frame_produces_output() {
422        let mut writer = MockWriter::new();
423        let data = vec![0x01, 0x02, 0x03];
424        writer.send_frame(&data).unwrap();
425        assert_eq!(writer.sent[0], data);
426    }
427
428    struct BlockingWriter {
429        entered_tx: mpsc::Sender<()>,
430        release_rx: mpsc::Receiver<()>,
431    }
432
433    impl Writer for BlockingWriter {
434        fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
435            let _ = self.entered_tx.send(());
436            let _ = self.release_rx.recv();
437            Ok(())
438        }
439    }
440
441    struct FailingWriter;
442
443    impl Writer for FailingWriter {
444        fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
445            Err(io::Error::new(io::ErrorKind::BrokenPipe, "boom"))
446        }
447    }
448
449    #[test]
450    fn async_writer_returns_wouldblock_when_queue_is_full() {
451        let (event_tx, _event_rx) = crate::event::channel();
452        let (entered_tx, entered_rx) = mpsc::channel();
453        let (release_tx, release_rx) = mpsc::channel();
454        let (mut writer, metrics) = wrap_async_writer(
455            Box::new(BlockingWriter {
456                entered_tx,
457                release_rx,
458            }),
459            InterfaceId(7),
460            "test",
461            event_tx,
462            1,
463        );
464
465        writer.send_frame(&[1]).unwrap();
466        entered_rx.recv_timeout(Duration::from_secs(1)).unwrap();
467        writer.send_frame(&[2]).unwrap();
468        assert_eq!(metrics.queued_frames(), 1);
469        let err = writer.send_frame(&[3]).unwrap_err();
470        assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
471
472        let _ = release_tx.send(());
473    }
474
475    #[test]
476    fn async_writer_reports_interface_down_after_worker_failure() {
477        let (event_tx, event_rx) = crate::event::channel();
478        let (mut writer, metrics) =
479            wrap_async_writer(Box::new(FailingWriter), InterfaceId(9), "fail", event_tx, 2);
480
481        writer.send_frame(&[1]).unwrap();
482        let event = event_rx.recv_timeout(Duration::from_secs(1)).unwrap();
483        assert!(matches!(event, Event::InterfaceDown(InterfaceId(9))));
484        assert!(!metrics.worker_alive());
485    }
486}