Skip to main content

rns_net/interface/
udp.rs

1//! UDP broadcast interface.
2//!
3//! Connectionless, no HDLC framing — each UDP datagram is one packet.
4//! Matches Python `UDPInterface` from `UDPInterface.py`.
5
6use std::io::{self};
7use std::net::{SocketAddr, UdpSocket};
8use std::sync::{Arc, Mutex};
9use std::thread;
10
11use rns_core::transport::types::InterfaceId;
12
13use crate::event::{Event, EventSender};
14use crate::interface::{lock_or_recover, Writer};
15
16/// Configuration for a UDP interface.
17#[derive(Debug, Clone)]
18pub struct UdpConfig {
19    pub name: String,
20    pub listen_ip: Option<String>,
21    pub listen_port: Option<u16>,
22    pub forward_ip: Option<String>,
23    pub forward_port: Option<u16>,
24    pub interface_id: InterfaceId,
25    pub runtime: Arc<Mutex<UdpRuntime>>,
26}
27
28#[derive(Debug, Clone)]
29pub struct UdpRuntime {
30    pub forward_ip: Option<String>,
31    pub forward_port: Option<u16>,
32}
33
34impl UdpRuntime {
35    pub fn from_config(config: &UdpConfig) -> Self {
36        Self {
37            forward_ip: config.forward_ip.clone(),
38            forward_port: config.forward_port,
39        }
40    }
41}
42
43#[derive(Debug, Clone)]
44pub struct UdpRuntimeConfigHandle {
45    pub interface_name: String,
46    pub runtime: Arc<Mutex<UdpRuntime>>,
47    pub startup: UdpRuntime,
48}
49
50impl Default for UdpConfig {
51    fn default() -> Self {
52        let mut config = UdpConfig {
53            name: String::new(),
54            listen_ip: None,
55            listen_port: None,
56            forward_ip: None,
57            forward_port: None,
58            interface_id: InterfaceId(0),
59            runtime: Arc::new(Mutex::new(UdpRuntime {
60                forward_ip: None,
61                forward_port: None,
62            })),
63        };
64        let startup = UdpRuntime::from_config(&config);
65        config.runtime = Arc::new(Mutex::new(startup));
66        config
67    }
68}
69
70/// Writer that sends raw data via UDP to a target address.
71struct UdpWriter {
72    socket: UdpSocket,
73    runtime: Arc<Mutex<UdpRuntime>>,
74}
75
76impl Writer for UdpWriter {
77    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78        let runtime = lock_or_recover(&self.runtime, "udp runtime").clone();
79        let target = match (runtime.forward_ip, runtime.forward_port) {
80            (Some(ip), Some(port)) => format!("{}:{}", ip, port)
81                .parse::<SocketAddr>()
82                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?,
83            _ => {
84                return Err(io::Error::new(
85                    io::ErrorKind::Other,
86                    "UDP interface has no forward target configured",
87                ))
88            }
89        };
90        self.socket.send_to(data, target)?;
91        Ok(())
92    }
93}
94
95/// Start a UDP interface. Spawns a reader thread if listen_ip/port are set.
96/// Returns a writer if forward_ip/port are set.
97pub fn start(config: UdpConfig, tx: EventSender) -> io::Result<Option<Box<dyn Writer>>> {
98    let id = config.interface_id;
99    {
100        let startup = UdpRuntime::from_config(&config);
101        *lock_or_recover(&config.runtime, "udp runtime") = startup;
102    }
103    let send_socket = UdpSocket::bind("0.0.0.0:0")?;
104    send_socket.set_broadcast(true)?;
105    let writer: Option<Box<dyn Writer>> = Some(Box::new(UdpWriter {
106        socket: send_socket,
107        runtime: Arc::clone(&config.runtime),
108    }));
109
110    // Create reader socket if listen params are set
111    if let (Some(ref bind_ip), Some(bind_port)) = (&config.listen_ip, config.listen_port) {
112        let bind_addr = format!("{}:{}", bind_ip, bind_port);
113        let recv_socket = UdpSocket::bind(&bind_addr)?;
114
115        log::info!("[{}] UDP listening on {}", config.name, bind_addr);
116
117        // Signal interface is up
118        let _ = tx.send(Event::InterfaceUp(id, None, None));
119
120        let name = config.name.clone();
121        thread::Builder::new()
122            .name(format!("udp-reader-{}", id.0))
123            .spawn(move || {
124                udp_reader_loop(recv_socket, id, name, tx);
125            })?;
126    }
127
128    Ok(writer)
129}
130
131/// Reader thread: receives UDP datagrams and sends them as frames.
132fn udp_reader_loop(socket: UdpSocket, id: InterfaceId, name: String, tx: EventSender) {
133    let mut buf = [0u8; 2048];
134
135    loop {
136        match socket.recv_from(&mut buf) {
137            Ok((n, _src)) => {
138                if tx
139                    .send(Event::Frame {
140                        interface_id: id,
141                        data: buf[..n].to_vec(),
142                        rssi: None,
143                        snr: None,
144                    })
145                    .is_err()
146                {
147                    // Driver shut down
148                    return;
149                }
150            }
151            Err(e) => {
152                log::warn!("[{}] recv error: {}", name, e);
153                let _ = tx.send(Event::InterfaceDown(id));
154                return;
155            }
156        }
157    }
158}
159
160// --- Factory implementation ---
161
162use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
163use rns_core::transport::types::InterfaceInfo;
164use std::collections::HashMap;
165
166/// Factory for `UDPInterface`.
167pub struct UdpFactory;
168
169impl InterfaceFactory for UdpFactory {
170    fn type_name(&self) -> &str {
171        "UDPInterface"
172    }
173
174    fn parse_config(
175        &self,
176        name: &str,
177        id: InterfaceId,
178        params: &HashMap<String, String>,
179    ) -> Result<Box<dyn InterfaceConfigData>, String> {
180        let listen_ip = params.get("listen_ip").cloned();
181
182        // 'port' is a shorthand that sets both listen_port and forward_port
183        let port_shorthand: Option<u16> = params.get("port").and_then(|v| v.parse().ok());
184
185        let listen_port: Option<u16> = params
186            .get("listen_port")
187            .and_then(|v| v.parse().ok())
188            .or(port_shorthand);
189
190        let forward_ip = params.get("forward_ip").cloned();
191
192        let forward_port: Option<u16> = params
193            .get("forward_port")
194            .and_then(|v| v.parse().ok())
195            .or(port_shorthand);
196
197        let mut config = UdpConfig {
198            name: name.to_string(),
199            listen_ip,
200            listen_port,
201            forward_ip,
202            forward_port,
203            interface_id: id,
204            runtime: Arc::new(Mutex::new(UdpRuntime {
205                forward_ip: None,
206                forward_port: None,
207            })),
208        };
209        let startup = UdpRuntime::from_config(&config);
210        config.runtime = Arc::new(Mutex::new(startup));
211        Ok(Box::new(config))
212    }
213
214    fn start(
215        &self,
216        config: Box<dyn InterfaceConfigData>,
217        ctx: StartContext,
218    ) -> io::Result<StartResult> {
219        let udp_config = *config
220            .into_any()
221            .downcast::<UdpConfig>()
222            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
223
224        let id = udp_config.interface_id;
225        let name = udp_config.name.clone();
226        let out_capable = udp_config.forward_ip.is_some();
227        let in_capable = udp_config.listen_ip.is_some();
228
229        let info = InterfaceInfo {
230            id,
231            name,
232            mode: ctx.mode,
233            out_capable,
234            in_capable,
235            bitrate: Some(10_000_000),
236            airtime_profile: None,
237            announce_rate_target: None,
238            announce_rate_grace: 0,
239            announce_rate_penalty: 0.0,
240            announce_cap: rns_core::constants::ANNOUNCE_CAP,
241            is_local_client: false,
242            wants_tunnel: false,
243            tunnel_id: None,
244            mtu: 1400,
245            ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
246            ia_freq: 0.0,
247            ip_freq: 0.0,
248            op_freq: 0.0,
249            op_samples: 0,
250            started: crate::time::now(),
251        };
252
253        let maybe_writer = start(udp_config, ctx.tx)?;
254
255        let writer: Box<dyn Writer> = maybe_writer
256            .ok_or_else(|| io::Error::other("UDPInterface did not provide a writer"))?;
257
258        Ok(StartResult::Simple {
259            id,
260            info,
261            writer,
262            interface_type_name: "UDPInterface".to_string(),
263        })
264    }
265}
266
267pub(crate) fn udp_runtime_handle_from_config(config: &UdpConfig) -> UdpRuntimeConfigHandle {
268    UdpRuntimeConfigHandle {
269        interface_name: config.name.clone(),
270        runtime: Arc::clone(&config.runtime),
271        startup: UdpRuntime::from_config(config),
272    }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::UdpSocket;
    use std::time::Duration;

    /// Ask the OS for an ephemeral loopback UDP port; the probe socket is released on return.
    fn find_free_port() -> u16 {
        let probe = UdpSocket::bind("127.0.0.1:0").unwrap();
        probe.local_addr().unwrap().port()
    }

    /// `127.0.0.1:<port>` as a string usable with `bind` / `send_to`.
    fn loopback(port: u16) -> String {
        format!("127.0.0.1:{}", port)
    }

    /// Bind a loopback socket on `port` whose reads give up after two seconds.
    fn timed_receiver(port: u16) -> UdpSocket {
        let socket = UdpSocket::bind(loopback(port)).unwrap();
        socket
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();
        socket
    }

    /// A datagram sent to the listen address surfaces as a `Frame` with the interface id.
    #[test]
    fn bind_and_receive() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(10),
            ..UdpConfig::default()
        };

        let _writer = start(config, tx).unwrap();

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send a UDP packet to the listener
        let payload = b"hello udp";
        let client = UdpSocket::bind("127.0.0.1:0").unwrap();
        client.send_to(payload, loopback(port)).unwrap();

        // Should receive Frame event
        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame {
                interface_id,
                data,
                rssi: _,
                snr: _,
            } => {
                assert_eq!(interface_id, InterfaceId(10));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// The writer returned by `start` delivers a frame to the configured forward target.
    #[test]
    fn send_broadcast() {
        let recv_port = find_free_port();
        let (tx, _rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp-send".into(),
            listen_ip: None,
            listen_port: None,
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(recv_port),
            interface_id: InterfaceId(11),
            ..UdpConfig::default()
        };

        let mut writer = start(config, tx).unwrap().unwrap();

        // Bind a receiver
        let receiver = timed_receiver(recv_port);

        // Send via writer
        let payload = b"broadcast data";
        writer.send_frame(payload).unwrap();

        // Receive on the other socket
        let mut buf = [0u8; 256];
        let (len, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..len], payload);
    }

    /// With both directions configured, a writer is returned and inbound frames still arrive.
    #[test]
    fn round_trip() {
        let listen_port = find_free_port();
        let forward_port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp-rt".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(listen_port),
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(forward_port),
            interface_id: InterfaceId(12),
            ..UdpConfig::default()
        };

        let writer = start(config, tx).unwrap();
        assert!(writer.is_some());

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send to the listener
        let client = UdpSocket::bind("127.0.0.1:0").unwrap();
        client.send_to(b"ping", loopback(listen_port)).unwrap();

        match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
            Event::Frame { data, .. } => assert_eq!(data, b"ping"),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Five one-byte datagrams are delivered as five frames, in sending order.
    #[test]
    fn multiple_datagrams() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp-multi".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(13),
            ..UdpConfig::default()
        };

        let _writer = start(config, tx).unwrap();

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let client = UdpSocket::bind("127.0.0.1:0").unwrap();
        for byte in 0..5u8 {
            client.send_to(&[byte], loopback(port)).unwrap();
        }

        for expected in 0..5u8 {
            match rx.recv_timeout(Duration::from_secs(2)).unwrap() {
                Event::Frame { data, .. } => assert_eq!(data, vec![expected]),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
    }

    /// A hand-built `UdpWriter` sends to the target held in its runtime state.
    #[test]
    fn writer_send_to() {
        let recv_port = find_free_port();

        // Bind receiver first
        let receiver = timed_receiver(recv_port);

        // Create writer directly
        let outbound = UdpSocket::bind("127.0.0.1:0").unwrap();
        outbound.set_broadcast(true).unwrap();
        let target = UdpRuntime {
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(recv_port),
        };
        let mut writer = UdpWriter {
            socket: outbound,
            runtime: Arc::new(Mutex::new(target)),
        };

        let payload = vec![0xAA, 0xBB, 0xCC];
        writer.send_frame(&payload).unwrap();

        let mut buf = [0u8; 256];
        let (len, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..len], &payload);
    }
}
463}