1#[cfg(feature = "iface-auto")]
4pub mod auto;
5#[cfg(feature = "iface-backbone")]
6pub mod backbone;
7#[cfg(feature = "iface-i2p")]
8pub mod i2p;
9#[cfg(feature = "iface-kiss")]
10pub mod kiss_iface;
11#[cfg(feature = "iface-local")]
12pub mod local;
13#[cfg(feature = "iface-pipe")]
14pub mod pipe;
15pub mod registry;
16#[cfg(feature = "iface-rnode")]
17pub mod rnode;
18#[cfg(feature = "iface-serial")]
19pub mod serial_iface;
20#[cfg(feature = "iface-tcp")]
21pub mod tcp;
22#[cfg(feature = "iface-tcp")]
23pub mod tcp_server;
24#[cfg(feature = "iface-udp")]
25pub mod udp;
26
27use std::any::Any;
28use std::collections::HashMap;
29use std::io;
30use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
31use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
32use std::sync::{Arc, Mutex, MutexGuard};
33use std::thread;
34use std::time::{Duration, Instant};
35
36use crate::event::EventSender;
37use crate::ifac::IfacState;
38use rns_core::transport::types::{InterfaceId, InterfaceInfo};
39
40#[cfg(target_os = "linux")]
44pub fn bind_to_device(fd: std::os::unix::io::RawFd, device: &str) -> io::Result<()> {
45 let dev_bytes = device.as_bytes();
46 if dev_bytes.len() >= libc::IFNAMSIZ {
47 return Err(io::Error::new(
48 io::ErrorKind::InvalidInput,
49 format!("device name too long: {}", device),
50 ));
51 }
52 let ret = unsafe {
53 libc::setsockopt(
54 fd,
55 libc::SOL_SOCKET,
56 libc::SO_BINDTODEVICE,
57 dev_bytes.as_ptr() as *const libc::c_void,
58 dev_bytes.len() as libc::socklen_t,
59 )
60 };
61 if ret != 0 {
62 return Err(io::Error::last_os_error());
63 }
64 Ok(())
65}
66
/// Sink for outbound frames on an interface.
///
/// The `Send` supertrait lets a boxed writer be moved onto another thread,
/// which `wrap_async_writer` relies on when it hands the writer to its worker.
pub trait Writer: Send {
    /// Transmits one frame, given as raw bytes.
    ///
    /// # Errors
    ///
    /// Returns the implementation's [`io::Error`] when the frame could not be
    /// sent.
    fn send_frame(&mut self, data: &[u8]) -> io::Result<()>;
}
73
/// Default number of frames an async writer queue can hold.
///
/// NOTE(review): not referenced elsewhere in this file's visible code;
/// presumably callers pass it as `queue_capacity` to `wrap_async_writer`.
pub const DEFAULT_ASYNC_WRITER_QUEUE_CAPACITY: usize = 256;
75
76pub(crate) fn lock_or_recover<'a, T>(mutex: &'a Mutex<T>, label: &str) -> MutexGuard<'a, T> {
77 match mutex.lock() {
78 Ok(guard) => guard,
79 Err(poisoned) => {
80 log::warn!("recovering poisoned mutex: {}", label);
81 poisoned.into_inner()
82 }
83 }
84}
85
/// Cloneable stop flag for a listener.
///
/// Every clone shares the same underlying flag, so a stop requested through
/// one handle is visible through all of them.
#[derive(Clone, Default)]
pub struct ListenerControl {
    /// Shared flag; `true` once a stop has been requested.
    stop: Arc<AtomicBool>,
}

impl ListenerControl {
    /// Creates a control whose stop flag is initially clear.
    pub fn new() -> Self {
        // The derived `Default` yields an `Arc<AtomicBool>` holding `false`.
        Self::default()
    }

    /// Raises the stop flag.
    pub fn request_stop(&self) {
        let flag: &AtomicBool = &self.stop;
        flag.store(true, Ordering::Relaxed);
    }

    /// Reports whether the stop flag has been raised.
    pub fn should_stop(&self) -> bool {
        let flag: &AtomicBool = &self.stop;
        flag.load(Ordering::Relaxed)
    }
}
106
/// Shared, cloneable counters describing an async writer.
///
/// Clones refer to the same atomics, so every handle observes the same values.
#[derive(Clone, Default)]
pub struct AsyncWriterMetrics {
    /// Number of frames currently counted as queued.
    queued_frames: Arc<AtomicUsize>,
    /// Whether the worker is currently marked as running.
    worker_alive: Arc<AtomicBool>,
}

impl AsyncWriterMetrics {
    /// Returns the current queued-frame count.
    pub fn queued_frames(&self) -> usize {
        let counter: &AtomicUsize = &self.queued_frames;
        counter.load(Ordering::Relaxed)
    }

    /// Returns `true` while the worker is marked alive.
    pub fn worker_alive(&self) -> bool {
        let flag: &AtomicBool = &self.worker_alive;
        flag.load(Ordering::Relaxed)
    }
}
122
123struct AsyncWriter {
124 tx: SyncSender<Vec<u8>>,
125 metrics: AsyncWriterMetrics,
126}
127
128impl Writer for AsyncWriter {
129 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
130 if !self.metrics.worker_alive() {
131 return Err(io::Error::new(
132 io::ErrorKind::BrokenPipe,
133 "interface writer worker is offline",
134 ));
135 }
136
137 match self.tx.try_send(data.to_vec()) {
138 Ok(()) => {
139 self.metrics.queued_frames.fetch_add(1, Ordering::Relaxed);
140 Ok(())
141 }
142 Err(TrySendError::Full(_)) => Err(io::Error::new(
143 io::ErrorKind::WouldBlock,
144 "interface writer queue is full",
145 )),
146 Err(TrySendError::Disconnected(_)) => {
147 self.metrics.worker_alive.store(false, Ordering::Relaxed);
148 Err(io::Error::new(
149 io::ErrorKind::BrokenPipe,
150 "interface writer worker disconnected",
151 ))
152 }
153 }
154 }
155}
156
157pub fn wrap_async_writer(
158 writer: Box<dyn Writer>,
159 interface_id: InterfaceId,
160 interface_name: &str,
161 event_tx: EventSender,
162 queue_capacity: usize,
163) -> (Box<dyn Writer>, AsyncWriterMetrics) {
164 let (tx, rx) = sync_channel::<Vec<u8>>(queue_capacity.max(1));
165 let metrics = AsyncWriterMetrics {
166 queued_frames: Arc::new(AtomicUsize::new(0)),
167 worker_alive: Arc::new(AtomicBool::new(true)),
168 };
169 let metrics_thread = metrics.clone();
170 let name = interface_name.to_string();
171
172 let spawn_result = thread::Builder::new()
173 .name(format!("iface-writer-{}", interface_id.0))
174 .spawn(move || async_writer_loop(writer, rx, interface_id, name, event_tx, metrics_thread));
175
176 if let Err(err) = spawn_result {
177 metrics.worker_alive.store(false, Ordering::Relaxed);
178 log::error!(
179 "[{}:{}] failed to spawn async writer thread: {}",
180 interface_name,
181 interface_id.0,
182 err
183 );
184 return (Box::new(DirectWriterFallback), metrics);
185 }
186
187 (
188 Box::new(AsyncWriter {
189 tx,
190 metrics: metrics.clone(),
191 }),
192 metrics,
193 )
194}
195
196struct DirectWriterFallback;
197
198impl Writer for DirectWriterFallback {
199 fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
200 Err(io::Error::other("interface writer worker unavailable"))
201 }
202}
203
204fn async_writer_loop(
205 mut writer: Box<dyn Writer>,
206 rx: std::sync::mpsc::Receiver<Vec<u8>>,
207 interface_id: InterfaceId,
208 interface_name: String,
209 event_tx: EventSender,
210 metrics: AsyncWriterMetrics,
211) {
212 while let Ok(frame) = rx.recv() {
213 metrics.queued_frames.fetch_sub(1, Ordering::Relaxed);
214 if let Err(err) = writer.send_frame(&frame) {
215 metrics.worker_alive.store(false, Ordering::Relaxed);
216 log::warn!(
217 "[{}:{}] async writer exiting after send failure: {}",
218 interface_name,
219 interface_id.0,
220 err
221 );
222 let _ = event_tx.send(crate::event::Event::InterfaceDown(interface_id));
223 return;
224 }
225 }
226
227 metrics.worker_alive.store(false, Ordering::Relaxed);
228}
229
230pub use crate::common::interface_stats::{InterfaceStats, ANNOUNCE_SAMPLE_MAX};
231
232use crate::common::management::InterfaceStatusView;
233
234pub struct InterfaceEntry {
236 pub id: InterfaceId,
237 pub info: InterfaceInfo,
238 pub writer: Box<dyn Writer>,
239 pub async_writer_metrics: Option<AsyncWriterMetrics>,
240 pub enabled: bool,
242 pub online: bool,
243 pub dynamic: bool,
246 pub ifac: Option<IfacState>,
248 pub stats: InterfaceStats,
250 pub interface_type: String,
252 pub send_retry_at: Option<Instant>,
254 pub send_retry_backoff: Duration,
256}
257
258pub enum StartResult {
260 Simple {
262 id: InterfaceId,
263 info: InterfaceInfo,
264 writer: Box<dyn Writer>,
265 interface_type_name: String,
266 },
267 Listener { control: Option<ListenerControl> },
269 Multi(Vec<SubInterface>),
271}
272
273pub struct SubInterface {
275 pub id: InterfaceId,
276 pub info: InterfaceInfo,
277 pub writer: Box<dyn Writer>,
278 pub interface_type_name: String,
279}
280
281pub struct StartContext {
283 pub tx: EventSender,
284 pub next_dynamic_id: Arc<AtomicU64>,
285 pub mode: u8,
286 pub ingress_control: rns_core::transport::types::IngressControlConfig,
287}
288
/// Type-erased interface configuration that can be downcast back to its
/// concrete type through [`Any`].
///
/// A blanket implementation covers every `Send + 'static` type. That includes
/// `Box<dyn InterfaceConfigData>` itself, so call `as_any` on the dereferenced
/// trait object (`(*boxed).as_any()`) to reach the boxed value rather than
/// the box.
pub trait InterfaceConfigData: Send + Any {
    /// Borrows the value as `&dyn Any` for `downcast_ref`.
    fn as_any(&self) -> &dyn Any;

    /// Converts the boxed value into `Box<dyn Any>` for `downcast`.
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
}

impl<C> InterfaceConfigData for C
where
    C: Send + 'static,
{
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
}
304
305pub trait InterfaceFactory: Send + Sync {
307 fn type_name(&self) -> &str;
309
310 fn default_ifac_size(&self) -> usize {
312 16
313 }
314
315 fn parse_config(
317 &self,
318 name: &str,
319 id: InterfaceId,
320 params: &HashMap<String, String>,
321 ) -> Result<Box<dyn InterfaceConfigData>, String>;
322
323 fn start(
325 &self,
326 config: Box<dyn InterfaceConfigData>,
327 ctx: StartContext,
328 ) -> io::Result<StartResult>;
329}
330
331impl InterfaceStatusView for InterfaceEntry {
332 fn id(&self) -> InterfaceId {
333 self.id
334 }
335 fn info(&self) -> &InterfaceInfo {
336 &self.info
337 }
338 fn online(&self) -> bool {
339 self.online
340 }
341 fn stats(&self) -> &InterfaceStats {
342 &self.stats
343 }
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;
    use rns_core::constants;
    use std::sync::mpsc;

    /// Test double that records every frame it is handed.
    struct MockWriter {
        sent: Vec<Vec<u8>>,
    }

    impl MockWriter {
        fn new() -> Self {
            Self { sent: Vec::new() }
        }
    }

    impl Writer for MockWriter {
        fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
            self.sent.push(data.to_owned());
            Ok(())
        }
    }

    #[test]
    fn interface_entry_construction() {
        let info = InterfaceInfo {
            id: InterfaceId(1),
            name: String::new(),
            mode: constants::MODE_FULL,
            out_capable: true,
            in_capable: true,
            bitrate: None,
            airtime_profile: None,
            announce_rate_target: None,
            announce_rate_grace: 0,
            announce_rate_penalty: 0.0,
            announce_cap: constants::ANNOUNCE_CAP,
            is_local_client: false,
            wants_tunnel: false,
            tunnel_id: None,
            mtu: constants::MTU as u32,
            ia_freq: 0.0,
            started: 0.0,
            ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
        };

        let entry = InterfaceEntry {
            id: InterfaceId(1),
            info,
            writer: Box::new(MockWriter::new()),
            async_writer_metrics: None,
            enabled: true,
            online: false,
            dynamic: false,
            ifac: None,
            stats: InterfaceStats::default(),
            interface_type: String::new(),
            send_retry_at: None,
            send_retry_backoff: Duration::ZERO,
        };

        assert_eq!(entry.id, InterfaceId(1));
        assert!(!entry.online);
        assert!(!entry.dynamic);
    }

    #[test]
    fn mock_writer_captures_bytes() {
        let mut recorder = MockWriter::new();
        recorder.send_frame(b"hello").unwrap();
        recorder.send_frame(b"world").unwrap();

        assert_eq!(recorder.sent.len(), 2);
        assert_eq!(recorder.sent[0], b"hello");
        assert_eq!(recorder.sent[1], b"world");
    }

    #[test]
    fn writer_send_frame_produces_output() {
        let payload = vec![0x01, 0x02, 0x03];
        let mut recorder = MockWriter::new();
        recorder.send_frame(&payload).unwrap();

        assert_eq!(recorder.sent[0], payload);
    }

    /// Writer that announces each call on `entered_tx` and then blocks until
    /// `release_rx` yields a message or is disconnected.
    struct BlockingWriter {
        entered_tx: mpsc::Sender<()>,
        release_rx: mpsc::Receiver<()>,
    }

    impl Writer for BlockingWriter {
        fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
            let _ = self.entered_tx.send(());
            let _ = self.release_rx.recv();
            Ok(())
        }
    }

    /// Writer whose every send fails with `BrokenPipe`.
    struct FailingWriter;

    impl Writer for FailingWriter {
        fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "boom"))
        }
    }

    #[test]
    fn async_writer_returns_wouldblock_when_queue_is_full() {
        let (event_tx, _event_rx) = crate::event::channel();
        let (entered_tx, entered_rx) = mpsc::channel();
        let (release_tx, release_rx) = mpsc::channel();

        let blocking = BlockingWriter {
            entered_tx,
            release_rx,
        };
        let (mut writer, metrics) =
            wrap_async_writer(Box::new(blocking), InterfaceId(7), "test", event_tx, 1);

        // The first frame is picked up by the worker, which then blocks
        // inside the inner writer.
        writer.send_frame(&[1]).unwrap();
        entered_rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // The second frame fills the single queue slot.
        writer.send_frame(&[2]).unwrap();
        assert_eq!(metrics.queued_frames(), 1);

        // The third frame has nowhere to go.
        let rejected = writer.send_frame(&[3]).unwrap_err();
        assert_eq!(rejected.kind(), io::ErrorKind::WouldBlock);

        let _ = release_tx.send(());
    }

    #[test]
    fn async_writer_reports_interface_down_after_worker_failure() {
        let (event_tx, event_rx) = crate::event::channel();
        let (mut writer, metrics) =
            wrap_async_writer(Box::new(FailingWriter), InterfaceId(9), "fail", event_tx, 2);

        writer.send_frame(&[1]).unwrap();

        let received = event_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        assert!(matches!(received, Event::InterfaceDown(InterfaceId(9))));
        assert!(!metrics.worker_alive());
    }
}
486}