Skip to main content

rns_net/interface/
udp.rs

//! UDP broadcast interface.
//!
//! Connectionless, no HDLC framing — each UDP datagram is one packet.
//! Matches Python `UDPInterface` from `UDPInterface.py`.
5
6use std::io::{self};
7use std::net::{SocketAddr, UdpSocket};
8use std::thread;
9
10use rns_core::transport::types::InterfaceId;
11
12use crate::event::{Event, EventSender};
13use crate::interface::Writer;
14
15/// Configuration for a UDP interface.
16#[derive(Debug, Clone)]
17pub struct UdpConfig {
18    pub name: String,
19    pub listen_ip: Option<String>,
20    pub listen_port: Option<u16>,
21    pub forward_ip: Option<String>,
22    pub forward_port: Option<u16>,
23    pub interface_id: InterfaceId,
24}
25
26impl Default for UdpConfig {
27    fn default() -> Self {
28        UdpConfig {
29            name: String::new(),
30            listen_ip: None,
31            listen_port: None,
32            forward_ip: None,
33            forward_port: None,
34            interface_id: InterfaceId(0),
35        }
36    }
37}
38
/// Outbound half of a UDP interface: pushes raw frames to one fixed peer.
struct UdpWriter {
    /// Socket the datagrams are sent from.
    socket: UdpSocket,
    /// Destination every frame is sent to.
    target: SocketAddr,
}
44
45impl Writer for UdpWriter {
46    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47        self.socket.send_to(data, self.target)?;
48        Ok(())
49    }
50}
51
52/// Start a UDP interface. Spawns a reader thread if listen_ip/port are set.
53/// Returns a writer if forward_ip/port are set.
54pub fn start(config: UdpConfig, tx: EventSender) -> io::Result<Option<Box<dyn Writer>>> {
55    let id = config.interface_id;
56    let mut writer: Option<Box<dyn Writer>> = None;
57
58    // Create writer socket if forward params are set
59    if let (Some(ref fwd_ip), Some(fwd_port)) = (&config.forward_ip, config.forward_port) {
60        let target: SocketAddr = format!("{}:{}", fwd_ip, fwd_port)
61            .parse()
62            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
63
64        let send_socket = UdpSocket::bind("0.0.0.0:0")?;
65        send_socket.set_broadcast(true)?;
66
67        writer = Some(Box::new(UdpWriter {
68            socket: send_socket,
69            target,
70        }));
71    }
72
73    // Create reader socket if listen params are set
74    if let (Some(ref bind_ip), Some(bind_port)) = (&config.listen_ip, config.listen_port) {
75        let bind_addr = format!("{}:{}", bind_ip, bind_port);
76        let recv_socket = UdpSocket::bind(&bind_addr)?;
77
78        log::info!("[{}] UDP listening on {}", config.name, bind_addr);
79
80        // Signal interface is up
81        let _ = tx.send(Event::InterfaceUp(id, None, None));
82
83        let name = config.name.clone();
84        thread::Builder::new()
85            .name(format!("udp-reader-{}", id.0))
86            .spawn(move || {
87                udp_reader_loop(recv_socket, id, name, tx);
88            })?;
89    }
90
91    Ok(writer)
92}
93
94/// Reader thread: receives UDP datagrams and sends them as frames.
95fn udp_reader_loop(socket: UdpSocket, id: InterfaceId, name: String, tx: EventSender) {
96    let mut buf = [0u8; 2048];
97
98    loop {
99        match socket.recv_from(&mut buf) {
100            Ok((n, _src)) => {
101                if tx
102                    .send(Event::Frame {
103                        interface_id: id,
104                        data: buf[..n].to_vec(),
105                    })
106                    .is_err()
107                {
108                    // Driver shut down
109                    return;
110                }
111            }
112            Err(e) => {
113                log::warn!("[{}] recv error: {}", name, e);
114                let _ = tx.send(Event::InterfaceDown(id));
115                return;
116            }
117        }
118    }
119}
120
121// --- Factory implementation ---
122
123use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
124use rns_core::transport::types::InterfaceInfo;
125use std::collections::HashMap;
126
/// Placeholder writer handed out when a UDP interface has no forward
/// address (listen-only mode). It lets the interface be registered like
/// any other while rejecting every outbound write.
struct NoopWriter;
130
131impl Writer for NoopWriter {
132    fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
133        Err(io::Error::new(
134            io::ErrorKind::Other,
135            "listen-only UDP interface",
136        ))
137    }
138}
139
/// Interface factory registered under the `UDPInterface` type name.
///
/// Stateless: all per-interface data lives in the parsed configuration.
pub struct UdpFactory;
142
143impl InterfaceFactory for UdpFactory {
144    fn type_name(&self) -> &str {
145        "UDPInterface"
146    }
147
148    fn parse_config(
149        &self,
150        name: &str,
151        id: InterfaceId,
152        params: &HashMap<String, String>,
153    ) -> Result<Box<dyn InterfaceConfigData>, String> {
154        let listen_ip = params.get("listen_ip").cloned();
155
156        // 'port' is a shorthand that sets both listen_port and forward_port
157        let port_shorthand: Option<u16> = params.get("port").and_then(|v| v.parse().ok());
158
159        let listen_port: Option<u16> = params
160            .get("listen_port")
161            .and_then(|v| v.parse().ok())
162            .or(port_shorthand);
163
164        let forward_ip = params.get("forward_ip").cloned();
165
166        let forward_port: Option<u16> = params
167            .get("forward_port")
168            .and_then(|v| v.parse().ok())
169            .or(port_shorthand);
170
171        Ok(Box::new(UdpConfig {
172            name: name.to_string(),
173            listen_ip,
174            listen_port,
175            forward_ip,
176            forward_port,
177            interface_id: id,
178        }))
179    }
180
181    fn start(
182        &self,
183        config: Box<dyn InterfaceConfigData>,
184        ctx: StartContext,
185    ) -> io::Result<StartResult> {
186        let udp_config = *config
187            .into_any()
188            .downcast::<UdpConfig>()
189            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
190
191        let id = udp_config.interface_id;
192        let name = udp_config.name.clone();
193        let out_capable = udp_config.forward_ip.is_some();
194        let in_capable = udp_config.listen_ip.is_some();
195
196        let info = InterfaceInfo {
197            id,
198            name,
199            mode: ctx.mode,
200            out_capable,
201            in_capable,
202            bitrate: Some(10_000_000),
203            announce_rate_target: None,
204            announce_rate_grace: 0,
205            announce_rate_penalty: 0.0,
206            announce_cap: rns_core::constants::ANNOUNCE_CAP,
207            is_local_client: false,
208            wants_tunnel: false,
209            tunnel_id: None,
210            mtu: 1400,
211            ingress_control: true,
212            ia_freq: 0.0,
213            started: crate::time::now(),
214        };
215
216        let maybe_writer = start(udp_config, ctx.tx)?;
217
218        let writer: Box<dyn Writer> = match maybe_writer {
219            Some(w) => w,
220            None => Box::new(NoopWriter),
221        };
222
223        Ok(StartResult::Simple {
224            id,
225            info,
226            writer,
227            interface_type_name: "UDPInterface".to_string(),
228        })
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::UdpSocket;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Asks the OS for a currently unused UDP port on the loopback address.
    ///
    /// The probe uses a UDP socket because TCP and UDP port numbers are
    /// allocated independently; a free TCP port says nothing about UDP.
    /// The probe socket is dropped on return, so another process could
    /// still grab the port before the test binds it.
    fn find_free_port() -> u16 {
        UdpSocket::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// A listen-only interface delivers a received datagram as a Frame event.
    #[test]
    fn bind_and_receive() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(10),
        };

        let _writer = start(config, tx).unwrap();

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send a UDP packet to the listener
        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        let payload = b"hello udp";
        sender
            .send_to(payload, format!("127.0.0.1:{}", port))
            .unwrap();

        // Should receive Frame event
        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(10));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// A forward-only interface returns a writer that reaches the target.
    #[test]
    fn send_broadcast() {
        let recv_port = find_free_port();
        let (tx, _rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp-send".into(),
            listen_ip: None,
            listen_port: None,
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(recv_port),
            interface_id: InterfaceId(11),
        };

        let writer = start(config, tx).unwrap();
        let mut writer = writer.unwrap();

        // Bind a receiver
        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // Send via writer
        let payload = b"broadcast data";
        writer.send_frame(payload).unwrap();

        // Receive on the other socket
        let mut buf = [0u8; 256];
        let (n, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], payload);
    }

    /// With both directions configured, a writer is returned and the
    /// listener still delivers frames.
    #[test]
    fn round_trip() {
        let listen_port = find_free_port();
        let forward_port = find_free_port();
        let (tx, rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp-rt".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(listen_port),
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(forward_port),
            interface_id: InterfaceId(12),
        };

        let writer = start(config, tx).unwrap();
        assert!(writer.is_some());

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Send to the listener
        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        sender
            .send_to(b"ping", format!("127.0.0.1:{}", listen_port))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { data, .. } => assert_eq!(data, b"ping"),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Several datagrams sent back-to-back each arrive as their own frame,
    /// in the order they were sent.
    #[test]
    fn multiple_datagrams() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp-multi".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(13),
        };

        let _writer = start(config, tx).unwrap();

        // Drain InterfaceUp
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        for i in 0..5u8 {
            sender.send_to(&[i], format!("127.0.0.1:{}", port)).unwrap();
        }

        for i in 0..5u8 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::Frame { data, .. } => assert_eq!(data, vec![i]),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
    }

    /// `UdpWriter` constructed by hand sends the exact payload bytes.
    #[test]
    fn writer_send_to() {
        let recv_port = find_free_port();

        // Bind receiver first
        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        // Create writer directly
        let send_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        send_socket.set_broadcast(true).unwrap();
        let target: SocketAddr = format!("127.0.0.1:{}", recv_port).parse().unwrap();
        let mut writer = UdpWriter {
            socket: send_socket,
            target,
        };

        let payload = vec![0xAA, 0xBB, 0xCC];
        writer.send_frame(&payload).unwrap();

        let mut buf = [0u8; 256];
        let (n, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], &payload);
    }
}