Skip to main content

rns_net/interface/
udp.rs

1//! UDP broadcast interface.
2//!
3//! Connectionless, no HDLC framing — each UDP datagram is one packet.
4//! Matches Python `UDPInterface` from `UDPInterface.py`.
5
6use std::io::{self};
7use std::net::{SocketAddr, UdpSocket};
8use std::sync::{Arc, Mutex};
9use std::thread;
10
11use rns_core::transport::types::InterfaceId;
12
13use crate::event::{Event, EventSender};
14use crate::interface::{lock_or_recover, Writer};
15
16/// Configuration for a UDP interface.
17#[derive(Debug, Clone)]
18pub struct UdpConfig {
19    pub name: String,
20    pub listen_ip: Option<String>,
21    pub listen_port: Option<u16>,
22    pub forward_ip: Option<String>,
23    pub forward_port: Option<u16>,
24    pub interface_id: InterfaceId,
25    pub runtime: Arc<Mutex<UdpRuntime>>,
26}
27
28#[derive(Debug, Clone)]
29pub struct UdpRuntime {
30    pub forward_ip: Option<String>,
31    pub forward_port: Option<u16>,
32}
33
34impl UdpRuntime {
35    pub fn from_config(config: &UdpConfig) -> Self {
36        Self {
37            forward_ip: config.forward_ip.clone(),
38            forward_port: config.forward_port,
39        }
40    }
41}
42
43#[derive(Debug, Clone)]
44pub struct UdpRuntimeConfigHandle {
45    pub interface_name: String,
46    pub runtime: Arc<Mutex<UdpRuntime>>,
47    pub startup: UdpRuntime,
48}
49
50impl Default for UdpConfig {
51    fn default() -> Self {
52        let mut config = UdpConfig {
53            name: String::new(),
54            listen_ip: None,
55            listen_port: None,
56            forward_ip: None,
57            forward_port: None,
58            interface_id: InterfaceId(0),
59            runtime: Arc::new(Mutex::new(UdpRuntime {
60                forward_ip: None,
61                forward_port: None,
62            })),
63        };
64        let startup = UdpRuntime::from_config(&config);
65        config.runtime = Arc::new(Mutex::new(startup));
66        config
67    }
68}
69
70/// Writer that sends raw data via UDP to a target address.
71struct UdpWriter {
72    socket: UdpSocket,
73    runtime: Arc<Mutex<UdpRuntime>>,
74}
75
76impl Writer for UdpWriter {
77    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78        let runtime = lock_or_recover(&self.runtime, "udp runtime").clone();
79        let target = match (runtime.forward_ip, runtime.forward_port) {
80            (Some(ip), Some(port)) => format!("{}:{}", ip, port)
81                .parse::<SocketAddr>()
82                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?,
83            _ => {
84                return Err(io::Error::new(
85                    io::ErrorKind::Other,
86                    "UDP interface has no forward target configured",
87                ))
88            }
89        };
90        self.socket.send_to(data, target)?;
91        Ok(())
92    }
93}
94
95/// Start a UDP interface. Spawns a reader thread if listen_ip/port are set.
96/// Returns a writer if forward_ip/port are set.
97pub fn start(config: UdpConfig, tx: EventSender) -> io::Result<Option<Box<dyn Writer>>> {
98    let id = config.interface_id;
99    {
100        let startup = UdpRuntime::from_config(&config);
101        *lock_or_recover(&config.runtime, "udp runtime") = startup;
102    }
103    let send_socket = UdpSocket::bind("0.0.0.0:0")?;
104    send_socket.set_broadcast(true)?;
105    let writer: Option<Box<dyn Writer>> = Some(Box::new(UdpWriter {
106        socket: send_socket,
107        runtime: Arc::clone(&config.runtime),
108    }));
109
110    // Create reader socket if listen params are set
111    if let (Some(ref bind_ip), Some(bind_port)) = (&config.listen_ip, config.listen_port) {
112        let bind_addr = format!("{}:{}", bind_ip, bind_port);
113        let recv_socket = UdpSocket::bind(&bind_addr)?;
114
115        log::info!("[{}] UDP listening on {}", config.name, bind_addr);
116
117        // Signal interface is up
118        let _ = tx.send(Event::InterfaceUp(id, None, None));
119
120        let name = config.name.clone();
121        thread::Builder::new()
122            .name(format!("udp-reader-{}", id.0))
123            .spawn(move || {
124                udp_reader_loop(recv_socket, id, name, tx);
125            })?;
126    }
127
128    Ok(writer)
129}
130
131/// Reader thread: receives UDP datagrams and sends them as frames.
132fn udp_reader_loop(socket: UdpSocket, id: InterfaceId, name: String, tx: EventSender) {
133    let mut buf = [0u8; 2048];
134
135    loop {
136        match socket.recv_from(&mut buf) {
137            Ok((n, _src)) => {
138                if tx
139                    .send(Event::Frame {
140                        interface_id: id,
141                        data: buf[..n].to_vec(),
142                    })
143                    .is_err()
144                {
145                    // Driver shut down
146                    return;
147                }
148            }
149            Err(e) => {
150                log::warn!("[{}] recv error: {}", name, e);
151                let _ = tx.send(Event::InterfaceDown(id));
152                return;
153            }
154        }
155    }
156}
157
158// --- Factory implementation ---
159
160use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
161use rns_core::transport::types::InterfaceInfo;
162use std::collections::HashMap;
163
164/// Factory for `UDPInterface`.
165pub struct UdpFactory;
166
167impl InterfaceFactory for UdpFactory {
168    fn type_name(&self) -> &str {
169        "UDPInterface"
170    }
171
172    fn parse_config(
173        &self,
174        name: &str,
175        id: InterfaceId,
176        params: &HashMap<String, String>,
177    ) -> Result<Box<dyn InterfaceConfigData>, String> {
178        let listen_ip = params.get("listen_ip").cloned();
179
180        // 'port' is a shorthand that sets both listen_port and forward_port
181        let port_shorthand: Option<u16> = params.get("port").and_then(|v| v.parse().ok());
182
183        let listen_port: Option<u16> = params
184            .get("listen_port")
185            .and_then(|v| v.parse().ok())
186            .or(port_shorthand);
187
188        let forward_ip = params.get("forward_ip").cloned();
189
190        let forward_port: Option<u16> = params
191            .get("forward_port")
192            .and_then(|v| v.parse().ok())
193            .or(port_shorthand);
194
195        let mut config = UdpConfig {
196            name: name.to_string(),
197            listen_ip,
198            listen_port,
199            forward_ip,
200            forward_port,
201            interface_id: id,
202            runtime: Arc::new(Mutex::new(UdpRuntime {
203                forward_ip: None,
204                forward_port: None,
205            })),
206        };
207        let startup = UdpRuntime::from_config(&config);
208        config.runtime = Arc::new(Mutex::new(startup));
209        Ok(Box::new(config))
210    }
211
212    fn start(
213        &self,
214        config: Box<dyn InterfaceConfigData>,
215        ctx: StartContext,
216    ) -> io::Result<StartResult> {
217        let udp_config = *config
218            .into_any()
219            .downcast::<UdpConfig>()
220            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
221
222        let id = udp_config.interface_id;
223        let name = udp_config.name.clone();
224        let out_capable = udp_config.forward_ip.is_some();
225        let in_capable = udp_config.listen_ip.is_some();
226
227        let info = InterfaceInfo {
228            id,
229            name,
230            mode: ctx.mode,
231            out_capable,
232            in_capable,
233            bitrate: Some(10_000_000),
234            airtime_profile: None,
235            announce_rate_target: None,
236            announce_rate_grace: 0,
237            announce_rate_penalty: 0.0,
238            announce_cap: rns_core::constants::ANNOUNCE_CAP,
239            is_local_client: false,
240            wants_tunnel: false,
241            tunnel_id: None,
242            mtu: 1400,
243            ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
244            ia_freq: 0.0,
245            started: crate::time::now(),
246        };
247
248        let maybe_writer = start(udp_config, ctx.tx)?;
249
250        let writer: Box<dyn Writer> = maybe_writer
251            .ok_or_else(|| io::Error::other("UDPInterface did not provide a writer"))?;
252
253        Ok(StartResult::Simple {
254            id,
255            info,
256            writer,
257            interface_type_name: "UDPInterface".to_string(),
258        })
259    }
260}
261
262pub(crate) fn udp_runtime_handle_from_config(config: &UdpConfig) -> UdpRuntimeConfigHandle {
263    UdpRuntimeConfigHandle {
264        interface_name: config.name.clone(),
265        runtime: Arc::clone(&config.runtime),
266        startup: UdpRuntime::from_config(config),
267    }
268}
269
270#[cfg(test)]
271mod tests {
272    use super::*;
273    use std::net::UdpSocket;
274    use std::time::Duration;
275
276    fn find_free_port() -> u16 {
277        std::net::TcpListener::bind("127.0.0.1:0")
278            .unwrap()
279            .local_addr()
280            .unwrap()
281            .port()
282    }
283
284    #[test]
285    fn bind_and_receive() {
286        let port = find_free_port();
287        let (tx, rx) = crate::event::channel();
288
289        let config = UdpConfig {
290            name: "test-udp".into(),
291            listen_ip: Some("127.0.0.1".into()),
292            listen_port: Some(port),
293            forward_ip: None,
294            forward_port: None,
295            interface_id: InterfaceId(10),
296            ..UdpConfig::default()
297        };
298
299        let _writer = start(config, tx).unwrap();
300
301        // Drain InterfaceUp
302        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
303
304        // Send a UDP packet to the listener
305        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
306        let payload = b"hello udp";
307        sender
308            .send_to(payload, format!("127.0.0.1:{}", port))
309            .unwrap();
310
311        // Should receive Frame event
312        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
313        match event {
314            Event::Frame { interface_id, data } => {
315                assert_eq!(interface_id, InterfaceId(10));
316                assert_eq!(data, payload);
317            }
318            other => panic!("expected Frame, got {:?}", other),
319        }
320    }
321
322    #[test]
323    fn send_broadcast() {
324        let recv_port = find_free_port();
325        let (tx, _rx) = crate::event::channel();
326
327        let config = UdpConfig {
328            name: "test-udp-send".into(),
329            listen_ip: None,
330            listen_port: None,
331            forward_ip: Some("127.0.0.1".into()),
332            forward_port: Some(recv_port),
333            interface_id: InterfaceId(11),
334            ..UdpConfig::default()
335        };
336
337        let writer = start(config, tx).unwrap();
338        let mut writer = writer.unwrap();
339
340        // Bind a receiver
341        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
342        receiver
343            .set_read_timeout(Some(Duration::from_secs(2)))
344            .unwrap();
345
346        // Send via writer
347        let payload = b"broadcast data";
348        writer.send_frame(payload).unwrap();
349
350        // Receive on the other socket
351        let mut buf = [0u8; 256];
352        let (n, _) = receiver.recv_from(&mut buf).unwrap();
353        assert_eq!(&buf[..n], payload);
354    }
355
356    #[test]
357    fn round_trip() {
358        let listen_port = find_free_port();
359        let forward_port = find_free_port();
360        let (tx, rx) = crate::event::channel();
361
362        let config = UdpConfig {
363            name: "test-udp-rt".into(),
364            listen_ip: Some("127.0.0.1".into()),
365            listen_port: Some(listen_port),
366            forward_ip: Some("127.0.0.1".into()),
367            forward_port: Some(forward_port),
368            interface_id: InterfaceId(12),
369            ..UdpConfig::default()
370        };
371
372        let writer = start(config, tx).unwrap();
373        assert!(writer.is_some());
374
375        // Drain InterfaceUp
376        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
377
378        // Send to the listener
379        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
380        sender
381            .send_to(b"ping", format!("127.0.0.1:{}", listen_port))
382            .unwrap();
383
384        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
385        match event {
386            Event::Frame { data, .. } => assert_eq!(data, b"ping"),
387            other => panic!("expected Frame, got {:?}", other),
388        }
389    }
390
391    #[test]
392    fn multiple_datagrams() {
393        let port = find_free_port();
394        let (tx, rx) = crate::event::channel();
395
396        let config = UdpConfig {
397            name: "test-udp-multi".into(),
398            listen_ip: Some("127.0.0.1".into()),
399            listen_port: Some(port),
400            forward_ip: None,
401            forward_port: None,
402            interface_id: InterfaceId(13),
403            ..UdpConfig::default()
404        };
405
406        let _writer = start(config, tx).unwrap();
407
408        // Drain InterfaceUp
409        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
410
411        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
412        for i in 0..5u8 {
413            sender.send_to(&[i], format!("127.0.0.1:{}", port)).unwrap();
414        }
415
416        for i in 0..5u8 {
417            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
418            match event {
419                Event::Frame { data, .. } => assert_eq!(data, vec![i]),
420                other => panic!("expected Frame, got {:?}", other),
421            }
422        }
423    }
424
425    #[test]
426    fn writer_send_to() {
427        let recv_port = find_free_port();
428
429        // Bind receiver first
430        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
431        receiver
432            .set_read_timeout(Some(Duration::from_secs(2)))
433            .unwrap();
434
435        // Create writer directly
436        let send_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
437        send_socket.set_broadcast(true).unwrap();
438        let mut writer = UdpWriter {
439            socket: send_socket,
440            runtime: Arc::new(Mutex::new(UdpRuntime {
441                forward_ip: Some("127.0.0.1".into()),
442                forward_port: Some(recv_port),
443            })),
444        };
445
446        let payload = vec![0xAA, 0xBB, 0xCC];
447        writer.send_frame(&payload).unwrap();
448
449        let mut buf = [0u8; 256];
450        let (n, _) = receiver.recv_from(&mut buf).unwrap();
451        assert_eq!(&buf[..n], &payload);
452    }
453}