Skip to main content

rns_net/interface/
udp.rs

//! UDP broadcast interface.
//!
//! Connectionless, no HDLC framing — each UDP datagram is one packet.
//! Matches Python `UDPInterface` from `UDPInterface.py`.
5
6use std::io::{self};
7use std::net::{SocketAddr, UdpSocket};
8use std::thread;
9
10use rns_core::transport::types::InterfaceId;
11
12use crate::event::{Event, EventSender};
13use crate::interface::Writer;
14
15/// Configuration for a UDP interface.
16#[derive(Debug, Clone)]
17pub struct UdpConfig {
18    pub name: String,
19    pub listen_ip: Option<String>,
20    pub listen_port: Option<u16>,
21    pub forward_ip: Option<String>,
22    pub forward_port: Option<u16>,
23    pub interface_id: InterfaceId,
24}
25
26impl Default for UdpConfig {
27    fn default() -> Self {
28        UdpConfig {
29            name: String::new(),
30            listen_ip: None,
31            listen_port: None,
32            forward_ip: None,
33            forward_port: None,
34            interface_id: InterfaceId(0),
35        }
36    }
37}
38
/// Outbound half of a UDP interface: pushes raw frames to one fixed peer.
struct UdpWriter {
    /// Socket the datagrams are sent from.
    socket: UdpSocket,
    /// Destination every frame is sent to.
    target: SocketAddr,
}
44
45impl Writer for UdpWriter {
46    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47        self.socket.send_to(data, self.target)?;
48        Ok(())
49    }
50}
51
52/// Start a UDP interface. Spawns a reader thread if listen_ip/port are set.
53/// Returns a writer if forward_ip/port are set.
54pub fn start(config: UdpConfig, tx: EventSender) -> io::Result<Option<Box<dyn Writer>>> {
55    let id = config.interface_id;
56    let mut writer: Option<Box<dyn Writer>> = None;
57
58    // Create writer socket if forward params are set
59    if let (Some(ref fwd_ip), Some(fwd_port)) = (&config.forward_ip, config.forward_port) {
60        let target: SocketAddr = format!("{}:{}", fwd_ip, fwd_port)
61            .parse()
62            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
63
64        let send_socket = UdpSocket::bind("0.0.0.0:0")?;
65        send_socket.set_broadcast(true)?;
66
67        writer = Some(Box::new(UdpWriter {
68            socket: send_socket,
69            target,
70        }));
71    }
72
73    // Create reader socket if listen params are set
74    if let (Some(ref bind_ip), Some(bind_port)) = (&config.listen_ip, config.listen_port) {
75        let bind_addr = format!("{}:{}", bind_ip, bind_port);
76        let recv_socket = UdpSocket::bind(&bind_addr)?;
77
78        log::info!("[{}] UDP listening on {}", config.name, bind_addr);
79
80        // Signal interface is up
81        let _ = tx.send(Event::InterfaceUp(id, None, None));
82
83        let name = config.name.clone();
84        thread::Builder::new()
85            .name(format!("udp-reader-{}", id.0))
86            .spawn(move || {
87                udp_reader_loop(recv_socket, id, name, tx);
88            })?;
89    }
90
91    Ok(writer)
92}
93
94/// Reader thread: receives UDP datagrams and sends them as frames.
95fn udp_reader_loop(socket: UdpSocket, id: InterfaceId, name: String, tx: EventSender) {
96    let mut buf = [0u8; 2048];
97
98    loop {
99        match socket.recv_from(&mut buf) {
100            Ok((n, _src)) => {
101                if tx
102                    .send(Event::Frame {
103                        interface_id: id,
104                        data: buf[..n].to_vec(),
105                    })
106                    .is_err()
107                {
108                    // Driver shut down
109                    return;
110                }
111            }
112            Err(e) => {
113                log::warn!("[{}] recv error: {}", name, e);
114                let _ = tx.send(Event::InterfaceDown(id));
115                return;
116            }
117        }
118    }
119}
120
121// --- Factory implementation ---
122
123use std::collections::HashMap;
124use rns_core::transport::types::InterfaceInfo;
125use super::{InterfaceFactory, InterfaceConfigData, StartContext, StartResult};
126
/// Placeholder writer handed out when a UDP interface has no forward address
/// (listen-only mode).
///
/// It lets the interface still be registered with a writer, while every
/// outbound write is rejected with an error.
struct NoopWriter;
130
131impl Writer for NoopWriter {
132    fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
133        Err(io::Error::new(
134            io::ErrorKind::Other,
135            "listen-only UDP interface",
136        ))
137    }
138}
139
/// Stateless factory that parses configuration for, and starts, interfaces of
/// type `UDPInterface`.
pub struct UdpFactory;
142
143impl InterfaceFactory for UdpFactory {
144    fn type_name(&self) -> &str { "UDPInterface" }
145
146    fn parse_config(
147        &self,
148        name: &str,
149        id: InterfaceId,
150        params: &HashMap<String, String>,
151    ) -> Result<Box<dyn InterfaceConfigData>, String> {
152        let listen_ip = params.get("listen_ip").cloned();
153
154        // 'port' is a shorthand that sets both listen_port and forward_port
155        let port_shorthand: Option<u16> = params
156            .get("port")
157            .and_then(|v| v.parse().ok());
158
159        let listen_port: Option<u16> = params
160            .get("listen_port")
161            .and_then(|v| v.parse().ok())
162            .or(port_shorthand);
163
164        let forward_ip = params.get("forward_ip").cloned();
165
166        let forward_port: Option<u16> = params
167            .get("forward_port")
168            .and_then(|v| v.parse().ok())
169            .or(port_shorthand);
170
171        Ok(Box::new(UdpConfig {
172            name: name.to_string(),
173            listen_ip,
174            listen_port,
175            forward_ip,
176            forward_port,
177            interface_id: id,
178        }))
179    }
180
181    fn start(
182        &self,
183        config: Box<dyn InterfaceConfigData>,
184        ctx: StartContext,
185    ) -> io::Result<StartResult> {
186        let udp_config = *config.into_any().downcast::<UdpConfig>()
187            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
188
189        let id = udp_config.interface_id;
190        let name = udp_config.name.clone();
191        let out_capable = udp_config.forward_ip.is_some();
192        let in_capable = udp_config.listen_ip.is_some();
193
194        let info = InterfaceInfo {
195            id,
196            name,
197            mode: ctx.mode,
198            out_capable,
199            in_capable,
200            bitrate: Some(10_000_000),
201            announce_rate_target: None,
202            announce_rate_grace: 0,
203            announce_rate_penalty: 0.0,
204            announce_cap: rns_core::constants::ANNOUNCE_CAP,
205            is_local_client: false,
206            wants_tunnel: false,
207            tunnel_id: None,
208            mtu: 1400,
209            ingress_control: true,
210            ia_freq: 0.0,
211            started: crate::time::now(),
212        };
213
214        let maybe_writer = start(udp_config, ctx.tx)?;
215
216        let writer: Box<dyn Writer> = match maybe_writer {
217            Some(w) => w,
218            None => Box::new(NoopWriter),
219        };
220
221        Ok(StartResult::Simple {
222            id,
223            info,
224            writer,
225            interface_type_name: "UDPInterface".to_string(),
226        })
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::UdpSocket;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Ask the OS for a currently unused UDP port on loopback.
    ///
    /// The probe uses a UDP socket because the tests bind UDP sockets and the
    /// TCP and UDP port spaces are independent. The probe socket is closed on
    /// return, so the port is only very likely, not guaranteed, to still be
    /// free when the caller binds it.
    fn find_free_port() -> u16 {
        UdpSocket::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// A datagram sent to the listen address arrives as a `Frame` event.
    #[test]
    fn bind_and_receive() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(10),
        };

        let _writer = start(config, tx).unwrap();

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send a UDP packet to the listener
        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        let payload = b"hello udp";
        sender
            .send_to(payload, format!("127.0.0.1:{}", port))
            .unwrap();

        // Should receive Frame event
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(10));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// A forward-only interface returns a writer that delivers to the target.
    #[test]
    fn send_broadcast() {
        let recv_port = find_free_port();
        let (tx, _rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp-send".into(),
            listen_ip: None,
            listen_port: None,
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(recv_port),
            interface_id: InterfaceId(11),
        };

        let writer = start(config, tx).unwrap();
        let mut writer = writer.unwrap();

        // Bind a receiver
        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // Send via writer
        let payload = b"broadcast data";
        writer.send_frame(payload).unwrap();

        // Receive on the other socket
        let mut buf = [0u8; 256];
        let (n, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], payload);
    }

    /// With both halves configured, a writer is returned and receiving works.
    #[test]
    fn round_trip() {
        let listen_port = find_free_port();
        let forward_port = find_free_port();
        let (tx, rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp-rt".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(listen_port),
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(forward_port),
            interface_id: InterfaceId(12),
        };

        let writer = start(config, tx).unwrap();
        assert!(writer.is_some());

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send to the listener
        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        sender
            .send_to(b"ping", format!("127.0.0.1:{}", listen_port))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { data, .. } => assert_eq!(data, b"ping"),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Several datagrams each produce their own `Frame`, in send order.
    #[test]
    fn multiple_datagrams() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp-multi".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(13),
        };

        let _writer = start(config, tx).unwrap();

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        for i in 0..5u8 {
            sender
                .send_to(&[i], format!("127.0.0.1:{}", port))
                .unwrap();
        }

        for i in 0..5u8 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::Frame { data, .. } => assert_eq!(data, vec![i]),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
    }

    /// `UdpWriter::send_frame` delivers the exact payload to its target.
    #[test]
    fn writer_send_to() {
        let recv_port = find_free_port();

        // Bind receiver first
        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // Create writer directly
        let send_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        send_socket.set_broadcast(true).unwrap();
        let target: SocketAddr = format!("127.0.0.1:{}", recv_port).parse().unwrap();
        let mut writer = UdpWriter {
            socket: send_socket,
            target,
        };

        let payload = vec![0xAA, 0xBB, 0xCC];
        writer.send_frame(&payload).unwrap();

        let mut buf = [0u8; 256];
        let (n, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], &payload);
    }
}