1#[cfg(feature = "iface-auto")]
4pub mod auto;
5#[cfg(feature = "iface-backbone")]
6pub mod backbone;
7#[cfg(feature = "iface-i2p")]
8pub mod i2p;
9#[cfg(feature = "iface-kiss")]
10pub mod kiss_iface;
11#[cfg(feature = "iface-local")]
12pub mod local;
13#[cfg(feature = "iface-pipe")]
14pub mod pipe;
15pub mod registry;
16#[cfg(feature = "iface-rnode")]
17pub mod rnode;
18#[cfg(feature = "iface-serial")]
19pub mod serial_iface;
20#[cfg(feature = "iface-tcp")]
21pub mod tcp;
22#[cfg(feature = "iface-tcp")]
23pub mod tcp_server;
24#[cfg(feature = "iface-udp")]
25pub mod udp;
26
27use std::any::Any;
28use std::collections::HashMap;
29use std::io;
30use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
31use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
32use std::sync::Arc;
33use std::thread;
34use std::time::{Duration, Instant};
35
36use crate::event::EventSender;
37use crate::ifac::IfacState;
38use rns_core::transport::types::{InterfaceId, InterfaceInfo};
39
40#[cfg(target_os = "linux")]
44pub fn bind_to_device(fd: std::os::unix::io::RawFd, device: &str) -> io::Result<()> {
45 let dev_bytes = device.as_bytes();
46 if dev_bytes.len() >= libc::IFNAMSIZ {
47 return Err(io::Error::new(
48 io::ErrorKind::InvalidInput,
49 format!("device name too long: {}", device),
50 ));
51 }
52 let ret = unsafe {
53 libc::setsockopt(
54 fd,
55 libc::SOL_SOCKET,
56 libc::SO_BINDTODEVICE,
57 dev_bytes.as_ptr() as *const libc::c_void,
58 dev_bytes.len() as libc::socklen_t,
59 )
60 };
61 if ret != 0 {
62 return Err(io::Error::last_os_error());
63 }
64 Ok(())
65}
66
67pub trait Writer: Send {
71 fn send_frame(&mut self, data: &[u8]) -> io::Result<()>;
72}
73
74pub const DEFAULT_ASYNC_WRITER_QUEUE_CAPACITY: usize = 256;
75
76#[derive(Clone, Default)]
77pub struct ListenerControl {
78 stop: Arc<AtomicBool>,
79}
80
81impl ListenerControl {
82 pub fn new() -> Self {
83 Self {
84 stop: Arc::new(AtomicBool::new(false)),
85 }
86 }
87
88 pub fn request_stop(&self) {
89 self.stop.store(true, Ordering::Relaxed);
90 }
91
92 pub fn should_stop(&self) -> bool {
93 self.stop.load(Ordering::Relaxed)
94 }
95}
96
97#[derive(Clone, Default)]
98pub struct AsyncWriterMetrics {
99 queued_frames: Arc<AtomicUsize>,
100 worker_alive: Arc<AtomicBool>,
101}
102
103impl AsyncWriterMetrics {
104 pub fn queued_frames(&self) -> usize {
105 self.queued_frames.load(Ordering::Relaxed)
106 }
107
108 pub fn worker_alive(&self) -> bool {
109 self.worker_alive.load(Ordering::Relaxed)
110 }
111}
112
113struct AsyncWriter {
114 tx: SyncSender<Vec<u8>>,
115 metrics: AsyncWriterMetrics,
116}
117
118impl Writer for AsyncWriter {
119 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
120 if !self.metrics.worker_alive() {
121 return Err(io::Error::new(
122 io::ErrorKind::BrokenPipe,
123 "interface writer worker is offline",
124 ));
125 }
126
127 match self.tx.try_send(data.to_vec()) {
128 Ok(()) => {
129 self.metrics.queued_frames.fetch_add(1, Ordering::Relaxed);
130 Ok(())
131 }
132 Err(TrySendError::Full(_)) => Err(io::Error::new(
133 io::ErrorKind::WouldBlock,
134 "interface writer queue is full",
135 )),
136 Err(TrySendError::Disconnected(_)) => {
137 self.metrics.worker_alive.store(false, Ordering::Relaxed);
138 Err(io::Error::new(
139 io::ErrorKind::BrokenPipe,
140 "interface writer worker disconnected",
141 ))
142 }
143 }
144 }
145}
146
147pub fn wrap_async_writer(
148 writer: Box<dyn Writer>,
149 interface_id: InterfaceId,
150 interface_name: &str,
151 event_tx: EventSender,
152 queue_capacity: usize,
153) -> (Box<dyn Writer>, AsyncWriterMetrics) {
154 let (tx, rx) = sync_channel::<Vec<u8>>(queue_capacity.max(1));
155 let metrics = AsyncWriterMetrics {
156 queued_frames: Arc::new(AtomicUsize::new(0)),
157 worker_alive: Arc::new(AtomicBool::new(true)),
158 };
159 let metrics_thread = metrics.clone();
160 let name = interface_name.to_string();
161
162 thread::Builder::new()
163 .name(format!("iface-writer-{}", interface_id.0))
164 .spawn(move || async_writer_loop(writer, rx, interface_id, name, event_tx, metrics_thread))
165 .expect("failed to spawn interface writer thread");
166
167 (
168 Box::new(AsyncWriter {
169 tx,
170 metrics: metrics.clone(),
171 }),
172 metrics,
173 )
174}
175
176fn async_writer_loop(
177 mut writer: Box<dyn Writer>,
178 rx: std::sync::mpsc::Receiver<Vec<u8>>,
179 interface_id: InterfaceId,
180 interface_name: String,
181 event_tx: EventSender,
182 metrics: AsyncWriterMetrics,
183) {
184 while let Ok(frame) = rx.recv() {
185 metrics.queued_frames.fetch_sub(1, Ordering::Relaxed);
186 if let Err(err) = writer.send_frame(&frame) {
187 metrics.worker_alive.store(false, Ordering::Relaxed);
188 log::warn!(
189 "[{}:{}] async writer exiting after send failure: {}",
190 interface_name,
191 interface_id.0,
192 err
193 );
194 let _ = event_tx.send(crate::event::Event::InterfaceDown(interface_id));
195 return;
196 }
197 }
198
199 metrics.worker_alive.store(false, Ordering::Relaxed);
200}
201
202pub use crate::common::interface_stats::{InterfaceStats, ANNOUNCE_SAMPLE_MAX};
203
204use crate::common::management::InterfaceStatusView;
205
206pub struct InterfaceEntry {
208 pub id: InterfaceId,
209 pub info: InterfaceInfo,
210 pub writer: Box<dyn Writer>,
211 pub async_writer_metrics: Option<AsyncWriterMetrics>,
212 pub enabled: bool,
214 pub online: bool,
215 pub dynamic: bool,
218 pub ifac: Option<IfacState>,
220 pub stats: InterfaceStats,
222 pub interface_type: String,
224 pub send_retry_at: Option<Instant>,
226 pub send_retry_backoff: Duration,
228}
229
230pub enum StartResult {
232 Simple {
234 id: InterfaceId,
235 info: InterfaceInfo,
236 writer: Box<dyn Writer>,
237 interface_type_name: String,
238 },
239 Listener { control: Option<ListenerControl> },
241 Multi(Vec<SubInterface>),
243}
244
245pub struct SubInterface {
247 pub id: InterfaceId,
248 pub info: InterfaceInfo,
249 pub writer: Box<dyn Writer>,
250 pub interface_type_name: String,
251}
252
253pub struct StartContext {
255 pub tx: EventSender,
256 pub next_dynamic_id: Arc<AtomicU64>,
257 pub mode: u8,
258 pub ingress_control: rns_core::transport::types::IngressControlConfig,
259}
260
261pub trait InterfaceConfigData: Send + Any {
263 fn as_any(&self) -> &dyn Any;
264 fn into_any(self: Box<Self>) -> Box<dyn Any>;
265}
266
267impl<T: Send + 'static> InterfaceConfigData for T {
268 fn as_any(&self) -> &dyn Any {
269 self
270 }
271
272 fn into_any(self: Box<Self>) -> Box<dyn Any> {
273 self
274 }
275}
276
277pub trait InterfaceFactory: Send + Sync {
279 fn type_name(&self) -> &str;
281
282 fn default_ifac_size(&self) -> usize {
284 16
285 }
286
287 fn parse_config(
289 &self,
290 name: &str,
291 id: InterfaceId,
292 params: &HashMap<String, String>,
293 ) -> Result<Box<dyn InterfaceConfigData>, String>;
294
295 fn start(
297 &self,
298 config: Box<dyn InterfaceConfigData>,
299 ctx: StartContext,
300 ) -> io::Result<StartResult>;
301}
302
303impl InterfaceStatusView for InterfaceEntry {
304 fn id(&self) -> InterfaceId {
305 self.id
306 }
307 fn info(&self) -> &InterfaceInfo {
308 &self.info
309 }
310 fn online(&self) -> bool {
311 self.online
312 }
313 fn stats(&self) -> &InterfaceStats {
314 &self.stats
315 }
316}
317
318#[cfg(test)]
319mod tests {
320 use super::*;
321 use crate::event::Event;
322 use rns_core::constants;
323 use std::sync::mpsc;
324
325 struct MockWriter {
326 sent: Vec<Vec<u8>>,
327 }
328
329 impl MockWriter {
330 fn new() -> Self {
331 MockWriter { sent: Vec::new() }
332 }
333 }
334
335 impl Writer for MockWriter {
336 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
337 self.sent.push(data.to_vec());
338 Ok(())
339 }
340 }
341
342 #[test]
343 fn interface_entry_construction() {
344 let entry = InterfaceEntry {
345 id: InterfaceId(1),
346 info: InterfaceInfo {
347 id: InterfaceId(1),
348 name: String::new(),
349 mode: constants::MODE_FULL,
350 out_capable: true,
351 in_capable: true,
352 bitrate: None,
353 announce_rate_target: None,
354 announce_rate_grace: 0,
355 announce_rate_penalty: 0.0,
356 announce_cap: constants::ANNOUNCE_CAP,
357 is_local_client: false,
358 wants_tunnel: false,
359 tunnel_id: None,
360 mtu: constants::MTU as u32,
361 ia_freq: 0.0,
362 started: 0.0,
363 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
364 },
365 writer: Box::new(MockWriter::new()),
366 async_writer_metrics: None,
367 enabled: true,
368 online: false,
369 dynamic: false,
370 ifac: None,
371 stats: InterfaceStats::default(),
372 interface_type: String::new(),
373 send_retry_at: None,
374 send_retry_backoff: Duration::ZERO,
375 };
376 assert_eq!(entry.id, InterfaceId(1));
377 assert!(!entry.online);
378 assert!(!entry.dynamic);
379 }
380
381 #[test]
382 fn mock_writer_captures_bytes() {
383 let mut writer = MockWriter::new();
384 writer.send_frame(b"hello").unwrap();
385 writer.send_frame(b"world").unwrap();
386 assert_eq!(writer.sent.len(), 2);
387 assert_eq!(writer.sent[0], b"hello");
388 assert_eq!(writer.sent[1], b"world");
389 }
390
391 #[test]
392 fn writer_send_frame_produces_output() {
393 let mut writer = MockWriter::new();
394 let data = vec![0x01, 0x02, 0x03];
395 writer.send_frame(&data).unwrap();
396 assert_eq!(writer.sent[0], data);
397 }
398
399 struct BlockingWriter {
400 entered_tx: mpsc::Sender<()>,
401 release_rx: mpsc::Receiver<()>,
402 }
403
404 impl Writer for BlockingWriter {
405 fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
406 let _ = self.entered_tx.send(());
407 let _ = self.release_rx.recv();
408 Ok(())
409 }
410 }
411
412 struct FailingWriter;
413
414 impl Writer for FailingWriter {
415 fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
416 Err(io::Error::new(io::ErrorKind::BrokenPipe, "boom"))
417 }
418 }
419
420 #[test]
421 fn async_writer_returns_wouldblock_when_queue_is_full() {
422 let (event_tx, _event_rx) = crate::event::channel();
423 let (entered_tx, entered_rx) = mpsc::channel();
424 let (release_tx, release_rx) = mpsc::channel();
425 let (mut writer, metrics) = wrap_async_writer(
426 Box::new(BlockingWriter {
427 entered_tx,
428 release_rx,
429 }),
430 InterfaceId(7),
431 "test",
432 event_tx,
433 1,
434 );
435
436 writer.send_frame(&[1]).unwrap();
437 entered_rx.recv_timeout(Duration::from_secs(1)).unwrap();
438 writer.send_frame(&[2]).unwrap();
439 assert_eq!(metrics.queued_frames(), 1);
440 let err = writer.send_frame(&[3]).unwrap_err();
441 assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
442
443 let _ = release_tx.send(());
444 }
445
446 #[test]
447 fn async_writer_reports_interface_down_after_worker_failure() {
448 let (event_tx, event_rx) = crate::event::channel();
449 let (mut writer, metrics) =
450 wrap_async_writer(Box::new(FailingWriter), InterfaceId(9), "fail", event_tx, 2);
451
452 writer.send_frame(&[1]).unwrap();
453 let event = event_rx.recv_timeout(Duration::from_secs(1)).unwrap();
454 assert!(matches!(event, Event::InterfaceDown(InterfaceId(9))));
455 assert!(!metrics.worker_alive());
456 }
457}