Skip to main content

rns_net/interface/
mod.rs

1//! Network interface abstractions.
2
3#[cfg(feature = "iface-auto")]
4pub mod auto;
5#[cfg(feature = "iface-backbone")]
6pub mod backbone;
7#[cfg(feature = "iface-i2p")]
8pub mod i2p;
9#[cfg(feature = "iface-kiss")]
10pub mod kiss_iface;
11#[cfg(feature = "iface-local")]
12pub mod local;
13#[cfg(feature = "iface-pipe")]
14pub mod pipe;
15pub mod registry;
16#[cfg(feature = "iface-rnode")]
17pub mod rnode;
18#[cfg(feature = "iface-serial")]
19pub mod serial_iface;
20#[cfg(feature = "iface-tcp")]
21pub mod tcp;
22#[cfg(feature = "iface-tcp")]
23pub mod tcp_server;
24#[cfg(feature = "iface-udp")]
25pub mod udp;
26
27use std::any::Any;
28use std::collections::HashMap;
29use std::io;
30use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
31use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
32use std::sync::Arc;
33use std::thread;
34use std::time::{Duration, Instant};
35
36use crate::event::EventSender;
37use crate::ifac::IfacState;
38use rns_core::transport::types::{InterfaceId, InterfaceInfo};
39
40/// Bind a socket to a specific network interface using `SO_BINDTODEVICE`.
41///
42/// Requires `CAP_NET_RAW` or root on Linux.
43#[cfg(target_os = "linux")]
44pub fn bind_to_device(fd: std::os::unix::io::RawFd, device: &str) -> io::Result<()> {
45    let dev_bytes = device.as_bytes();
46    if dev_bytes.len() >= libc::IFNAMSIZ {
47        return Err(io::Error::new(
48            io::ErrorKind::InvalidInput,
49            format!("device name too long: {}", device),
50        ));
51    }
52    let ret = unsafe {
53        libc::setsockopt(
54            fd,
55            libc::SOL_SOCKET,
56            libc::SO_BINDTODEVICE,
57            dev_bytes.as_ptr() as *const libc::c_void,
58            dev_bytes.len() as libc::socklen_t,
59        )
60    };
61    if ret != 0 {
62        return Err(io::Error::last_os_error());
63    }
64    Ok(())
65}
66
/// Outbound half of an interface, owned by the driver.
///
/// Each implementation wraps a socket together with its framing. The `Send`
/// supertrait allows a boxed writer to be moved to another thread.
pub trait Writer: Send {
    /// Transmit `data` as one frame, returning any I/O error from the transport.
    fn send_frame(&mut self, data: &[u8]) -> io::Result<()>;
}
73
/// Default queue capacity, in frames, for an asynchronous writer.
///
/// NOTE(review): nothing in this file reads the constant; presumably callers
/// pass it as the `queue_capacity` argument of `wrap_async_writer` — confirm
/// against the call sites.
pub const DEFAULT_ASYNC_WRITER_QUEUE_CAPACITY: usize = 256;
75
/// Non-blocking writer front-end that queues frames for a worker thread.
struct AsyncWriter {
    /// Sending side of the bounded frame queue.
    tx: SyncSender<Vec<u8>>,
    /// Shared flag; `false` once the worker is known to have stopped.
    worker_alive: Arc<AtomicBool>,
}
80
81impl Writer for AsyncWriter {
82    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
83        if !self.worker_alive.load(Ordering::Relaxed) {
84            return Err(io::Error::new(
85                io::ErrorKind::BrokenPipe,
86                "interface writer worker is offline",
87            ));
88        }
89
90        match self.tx.try_send(data.to_vec()) {
91            Ok(()) => Ok(()),
92            Err(TrySendError::Full(_)) => Err(io::Error::new(
93                io::ErrorKind::WouldBlock,
94                "interface writer queue is full",
95            )),
96            Err(TrySendError::Disconnected(_)) => {
97                self.worker_alive.store(false, Ordering::Relaxed);
98                Err(io::Error::new(
99                    io::ErrorKind::BrokenPipe,
100                    "interface writer worker disconnected",
101                ))
102            }
103        }
104    }
105}
106
107pub fn wrap_async_writer(
108    writer: Box<dyn Writer>,
109    interface_id: InterfaceId,
110    interface_name: &str,
111    event_tx: EventSender,
112    queue_capacity: usize,
113) -> Box<dyn Writer> {
114    let (tx, rx) = sync_channel::<Vec<u8>>(queue_capacity.max(1));
115    let worker_alive = Arc::new(AtomicBool::new(true));
116    let worker_alive_thread = Arc::clone(&worker_alive);
117    let name = interface_name.to_string();
118
119    thread::Builder::new()
120        .name(format!("iface-writer-{}", interface_id.0))
121        .spawn(move || {
122            async_writer_loop(
123                writer,
124                rx,
125                interface_id,
126                name,
127                event_tx,
128                worker_alive_thread,
129            )
130        })
131        .expect("failed to spawn interface writer thread");
132
133    Box::new(AsyncWriter { tx, worker_alive })
134}
135
136fn async_writer_loop(
137    mut writer: Box<dyn Writer>,
138    rx: std::sync::mpsc::Receiver<Vec<u8>>,
139    interface_id: InterfaceId,
140    interface_name: String,
141    event_tx: EventSender,
142    worker_alive: Arc<AtomicBool>,
143) {
144    while let Ok(frame) = rx.recv() {
145        if let Err(err) = writer.send_frame(&frame) {
146            worker_alive.store(false, Ordering::Relaxed);
147            log::warn!(
148                "[{}:{}] async writer exiting after send failure: {}",
149                interface_name,
150                interface_id.0,
151                err
152            );
153            let _ = event_tx.send(crate::event::Event::InterfaceDown(interface_id));
154            return;
155        }
156    }
157
158    worker_alive.store(false, Ordering::Relaxed);
159}
160
161pub use crate::common::interface_stats::{InterfaceStats, ANNOUNCE_SAMPLE_MAX};
162
163use crate::common::management::InterfaceStatusView;
164
165/// Everything the driver tracks per interface.
166pub struct InterfaceEntry {
167    pub id: InterfaceId,
168    pub info: InterfaceInfo,
169    pub writer: Box<dyn Writer>,
170    /// Administrative enable/disable state.
171    pub enabled: bool,
172    pub online: bool,
173    /// True for dynamically spawned interfaces (e.g. TCP server clients).
174    /// These are fully removed on InterfaceDown rather than just marked offline.
175    pub dynamic: bool,
176    /// IFAC state for this interface, if access codes are enabled.
177    pub ifac: Option<IfacState>,
178    /// Traffic statistics.
179    pub stats: InterfaceStats,
180    /// Human-readable interface type string (e.g. "TCPClientInterface").
181    pub interface_type: String,
182    /// Next time a send should be retried after a transient WouldBlock.
183    pub send_retry_at: Option<Instant>,
184    /// Current retry backoff for transient send failures.
185    pub send_retry_backoff: Duration,
186}
187
188/// Result of starting an interface via a factory.
189pub enum StartResult {
190    /// One writer, registered immediately (TcpClient, Udp, Serial, etc.)
191    Simple {
192        id: InterfaceId,
193        info: InterfaceInfo,
194        writer: Box<dyn Writer>,
195        interface_type_name: String,
196    },
197    /// Spawns a listener; dynamic interfaces arrive via Event::InterfaceUp (TcpServer, Auto, I2P, etc.)
198    Listener,
199    /// Multiple subinterfaces from one config (RNode).
200    Multi(Vec<SubInterface>),
201}
202
203/// A single subinterface returned from a multi-interface factory.
204pub struct SubInterface {
205    pub id: InterfaceId,
206    pub info: InterfaceInfo,
207    pub writer: Box<dyn Writer>,
208    pub interface_type_name: String,
209}
210
211/// Context passed to [`InterfaceFactory::start()`].
212pub struct StartContext {
213    pub tx: EventSender,
214    pub next_dynamic_id: Arc<AtomicU64>,
215    pub mode: u8,
216}
217
/// Type-erased interface configuration; each factory downcasts it back to its
/// own concrete config type.
///
/// Every `Send + 'static` type receives this trait through the blanket impl
/// below, and that includes `Box<dyn InterfaceConfigData>` itself. Calling
/// `as_any()` directly on such a box therefore yields the box rather than its
/// contents; use `(*boxed).as_any()` or `boxed.into_any()` to reach the
/// concrete value.
pub trait InterfaceConfigData: Send + Any {
    /// Borrow the value as `&dyn Any` for `downcast_ref`.
    fn as_any(&self) -> &dyn Any;

    /// Convert the boxed value into `Box<dyn Any>` for an owning `downcast`.
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
}

impl<T> InterfaceConfigData for T
where
    T: Send + 'static,
{
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
}
233
234/// Factory that can parse config and start an interface type.
235pub trait InterfaceFactory: Send + Sync {
236    /// Config-file type name, e.g. "TCPClientInterface".
237    fn type_name(&self) -> &str;
238
239    /// Default IFAC size (bytes). 8 for serial/kiss/rnode, 16 for others.
240    fn default_ifac_size(&self) -> usize {
241        16
242    }
243
244    /// Parse from key-value params (config file or external).
245    fn parse_config(
246        &self,
247        name: &str,
248        id: InterfaceId,
249        params: &HashMap<String, String>,
250    ) -> Result<Box<dyn InterfaceConfigData>, String>;
251
252    /// Start the interface from parsed config.
253    fn start(
254        &self,
255        config: Box<dyn InterfaceConfigData>,
256        ctx: StartContext,
257    ) -> io::Result<StartResult>;
258}
259
260impl InterfaceStatusView for InterfaceEntry {
261    fn id(&self) -> InterfaceId {
262        self.id
263    }
264    fn info(&self) -> &InterfaceInfo {
265        &self.info
266    }
267    fn online(&self) -> bool {
268        self.online
269    }
270    fn stats(&self) -> &InterfaceStats {
271        &self.stats
272    }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;
    use rns_core::constants;
    use std::sync::mpsc;

    /// Writer that records every frame it is handed.
    struct MockWriter {
        sent: Vec<Vec<u8>>,
    }

    impl MockWriter {
        fn new() -> Self {
            Self { sent: Vec::new() }
        }
    }

    impl Writer for MockWriter {
        fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
            self.sent.push(data.to_vec());
            Ok(())
        }
    }

    /// Writer that signals `entered_tx` on each call and then waits on
    /// `release_rx` before reporting success.
    struct BlockingWriter {
        entered_tx: mpsc::Sender<()>,
        release_rx: mpsc::Receiver<()>,
    }

    impl Writer for BlockingWriter {
        fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
            let _ = self.entered_tx.send(());
            let _ = self.release_rx.recv();
            Ok(())
        }
    }

    /// Writer whose every send fails with `BrokenPipe`.
    struct FailingWriter;

    impl Writer for FailingWriter {
        fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "boom"))
        }
    }

    /// `InterfaceInfo` fixture used by the construction test.
    fn sample_info(id: InterfaceId) -> InterfaceInfo {
        InterfaceInfo {
            id,
            name: String::new(),
            mode: constants::MODE_FULL,
            out_capable: true,
            in_capable: true,
            bitrate: None,
            announce_rate_target: None,
            announce_rate_grace: 0,
            announce_rate_penalty: 0.0,
            announce_cap: constants::ANNOUNCE_CAP,
            is_local_client: false,
            wants_tunnel: false,
            tunnel_id: None,
            mtu: constants::MTU as u32,
            ia_freq: 0.0,
            started: 0.0,
            ingress_control: false,
        }
    }

    #[test]
    fn interface_entry_construction() {
        let entry = InterfaceEntry {
            id: InterfaceId(1),
            info: sample_info(InterfaceId(1)),
            writer: Box::new(MockWriter::new()),
            enabled: true,
            online: false,
            dynamic: false,
            ifac: None,
            stats: InterfaceStats::default(),
            interface_type: String::new(),
            send_retry_at: None,
            send_retry_backoff: Duration::ZERO,
        };

        assert_eq!(entry.id, InterfaceId(1));
        assert!(!entry.online);
        assert!(!entry.dynamic);
    }

    #[test]
    fn mock_writer_captures_bytes() {
        let mut mock = MockWriter::new();
        for payload in [&b"hello"[..], &b"world"[..]] {
            mock.send_frame(payload).unwrap();
        }

        assert_eq!(mock.sent.len(), 2);
        assert_eq!(mock.sent[0], b"hello");
        assert_eq!(mock.sent[1], b"world");
    }

    #[test]
    fn writer_send_frame_produces_output() {
        let payload = vec![0x01, 0x02, 0x03];
        let mut mock = MockWriter::new();
        mock.send_frame(&payload).unwrap();
        assert_eq!(mock.sent[0], payload);
    }

    #[test]
    fn async_writer_returns_wouldblock_when_queue_is_full() {
        let (event_tx, _event_rx) = crate::event::channel();
        let (entered_tx, entered_rx) = mpsc::channel();
        let (release_tx, release_rx) = mpsc::channel();
        let blocking = BlockingWriter {
            entered_tx,
            release_rx,
        };
        let mut writer = wrap_async_writer(Box::new(blocking), InterfaceId(7), "test", event_tx, 1);

        // First frame is picked up by the worker, which then parks inside the
        // blocking writer.
        writer.send_frame(&[1]).unwrap();
        entered_rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Second frame fills the single queue slot; the third must be refused.
        writer.send_frame(&[2]).unwrap();
        let refused = writer.send_frame(&[3]).unwrap_err();
        assert_eq!(refused.kind(), io::ErrorKind::WouldBlock);

        let _ = release_tx.send(());
    }

    #[test]
    fn async_writer_reports_interface_down_after_worker_failure() {
        let (event_tx, event_rx) = crate::event::channel();
        let mut writer =
            wrap_async_writer(Box::new(FailingWriter), InterfaceId(9), "fail", event_tx, 2);

        writer.send_frame(&[1]).unwrap();

        let received = event_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        assert!(matches!(received, Event::InterfaceDown(InterfaceId(9))));
    }
}
411}