Skip to main content

rns_net/interface/
udp.rs

1//! UDP broadcast interface.
2//!
3//! Connectionless, no HDLC framing — each UDP datagram is one packet.
4//! Matches Python `UDPInterface` from `UDPInterface.py`.
5
6use std::io::{self};
7use std::net::{SocketAddr, UdpSocket};
8use std::sync::{Arc, Mutex};
9use std::thread;
10
11use rns_core::transport::types::InterfaceId;
12
13use crate::event::{Event, EventSender};
14use crate::interface::Writer;
15
16/// Configuration for a UDP interface.
17#[derive(Debug, Clone)]
18pub struct UdpConfig {
19    pub name: String,
20    pub listen_ip: Option<String>,
21    pub listen_port: Option<u16>,
22    pub forward_ip: Option<String>,
23    pub forward_port: Option<u16>,
24    pub interface_id: InterfaceId,
25    pub runtime: Arc<Mutex<UdpRuntime>>,
26}
27
28#[derive(Debug, Clone)]
29pub struct UdpRuntime {
30    pub forward_ip: Option<String>,
31    pub forward_port: Option<u16>,
32}
33
34impl UdpRuntime {
35    pub fn from_config(config: &UdpConfig) -> Self {
36        Self {
37            forward_ip: config.forward_ip.clone(),
38            forward_port: config.forward_port,
39        }
40    }
41}
42
43#[derive(Debug, Clone)]
44pub struct UdpRuntimeConfigHandle {
45    pub interface_name: String,
46    pub runtime: Arc<Mutex<UdpRuntime>>,
47    pub startup: UdpRuntime,
48}
49
50impl Default for UdpConfig {
51    fn default() -> Self {
52        let mut config = UdpConfig {
53            name: String::new(),
54            listen_ip: None,
55            listen_port: None,
56            forward_ip: None,
57            forward_port: None,
58            interface_id: InterfaceId(0),
59            runtime: Arc::new(Mutex::new(UdpRuntime {
60                forward_ip: None,
61                forward_port: None,
62            })),
63        };
64        let startup = UdpRuntime::from_config(&config);
65        config.runtime = Arc::new(Mutex::new(startup));
66        config
67    }
68}
69
70/// Writer that sends raw data via UDP to a target address.
71struct UdpWriter {
72    socket: UdpSocket,
73    runtime: Arc<Mutex<UdpRuntime>>,
74}
75
76impl Writer for UdpWriter {
77    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78        let runtime = self.runtime.lock().unwrap().clone();
79        let target = match (runtime.forward_ip, runtime.forward_port) {
80            (Some(ip), Some(port)) => format!("{}:{}", ip, port)
81                .parse::<SocketAddr>()
82                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?,
83            _ => {
84                return Err(io::Error::new(
85                    io::ErrorKind::Other,
86                    "UDP interface has no forward target configured",
87                ))
88            }
89        };
90        self.socket.send_to(data, target)?;
91        Ok(())
92    }
93}
94
95/// Start a UDP interface. Spawns a reader thread if listen_ip/port are set.
96/// Returns a writer if forward_ip/port are set.
97pub fn start(config: UdpConfig, tx: EventSender) -> io::Result<Option<Box<dyn Writer>>> {
98    let id = config.interface_id;
99    {
100        let startup = UdpRuntime::from_config(&config);
101        *config.runtime.lock().unwrap() = startup;
102    }
103    let send_socket = UdpSocket::bind("0.0.0.0:0")?;
104    send_socket.set_broadcast(true)?;
105    let writer: Option<Box<dyn Writer>> = Some(Box::new(UdpWriter {
106        socket: send_socket,
107        runtime: Arc::clone(&config.runtime),
108    }));
109
110    // Create reader socket if listen params are set
111    if let (Some(ref bind_ip), Some(bind_port)) = (&config.listen_ip, config.listen_port) {
112        let bind_addr = format!("{}:{}", bind_ip, bind_port);
113        let recv_socket = UdpSocket::bind(&bind_addr)?;
114
115        log::info!("[{}] UDP listening on {}", config.name, bind_addr);
116
117        // Signal interface is up
118        let _ = tx.send(Event::InterfaceUp(id, None, None));
119
120        let name = config.name.clone();
121        thread::Builder::new()
122            .name(format!("udp-reader-{}", id.0))
123            .spawn(move || {
124                udp_reader_loop(recv_socket, id, name, tx);
125            })?;
126    }
127
128    Ok(writer)
129}
130
131/// Reader thread: receives UDP datagrams and sends them as frames.
132fn udp_reader_loop(socket: UdpSocket, id: InterfaceId, name: String, tx: EventSender) {
133    let mut buf = [0u8; 2048];
134
135    loop {
136        match socket.recv_from(&mut buf) {
137            Ok((n, _src)) => {
138                if tx
139                    .send(Event::Frame {
140                        interface_id: id,
141                        data: buf[..n].to_vec(),
142                    })
143                    .is_err()
144                {
145                    // Driver shut down
146                    return;
147                }
148            }
149            Err(e) => {
150                log::warn!("[{}] recv error: {}", name, e);
151                let _ = tx.send(Event::InterfaceDown(id));
152                return;
153            }
154        }
155    }
156}
157
158// --- Factory implementation ---
159
160use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
161use rns_core::transport::types::InterfaceInfo;
162use std::collections::HashMap;
163
164/// Factory for `UDPInterface`.
165pub struct UdpFactory;
166
167impl InterfaceFactory for UdpFactory {
168    fn type_name(&self) -> &str {
169        "UDPInterface"
170    }
171
172    fn parse_config(
173        &self,
174        name: &str,
175        id: InterfaceId,
176        params: &HashMap<String, String>,
177    ) -> Result<Box<dyn InterfaceConfigData>, String> {
178        let listen_ip = params.get("listen_ip").cloned();
179
180        // 'port' is a shorthand that sets both listen_port and forward_port
181        let port_shorthand: Option<u16> = params.get("port").and_then(|v| v.parse().ok());
182
183        let listen_port: Option<u16> = params
184            .get("listen_port")
185            .and_then(|v| v.parse().ok())
186            .or(port_shorthand);
187
188        let forward_ip = params.get("forward_ip").cloned();
189
190        let forward_port: Option<u16> = params
191            .get("forward_port")
192            .and_then(|v| v.parse().ok())
193            .or(port_shorthand);
194
195        let mut config = UdpConfig {
196            name: name.to_string(),
197            listen_ip,
198            listen_port,
199            forward_ip,
200            forward_port,
201            interface_id: id,
202            runtime: Arc::new(Mutex::new(UdpRuntime {
203                forward_ip: None,
204                forward_port: None,
205            })),
206        };
207        let startup = UdpRuntime::from_config(&config);
208        config.runtime = Arc::new(Mutex::new(startup));
209        Ok(Box::new(config))
210    }
211
212    fn start(
213        &self,
214        config: Box<dyn InterfaceConfigData>,
215        ctx: StartContext,
216    ) -> io::Result<StartResult> {
217        let udp_config = *config
218            .into_any()
219            .downcast::<UdpConfig>()
220            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
221
222        let id = udp_config.interface_id;
223        let name = udp_config.name.clone();
224        let out_capable = udp_config.forward_ip.is_some();
225        let in_capable = udp_config.listen_ip.is_some();
226
227        let info = InterfaceInfo {
228            id,
229            name,
230            mode: ctx.mode,
231            out_capable,
232            in_capable,
233            bitrate: Some(10_000_000),
234            announce_rate_target: None,
235            announce_rate_grace: 0,
236            announce_rate_penalty: 0.0,
237            announce_cap: rns_core::constants::ANNOUNCE_CAP,
238            is_local_client: false,
239            wants_tunnel: false,
240            tunnel_id: None,
241            mtu: 1400,
242            ingress_control: true,
243            ia_freq: 0.0,
244            started: crate::time::now(),
245        };
246
247        let maybe_writer = start(udp_config, ctx.tx)?;
248
249        let writer: Box<dyn Writer> = maybe_writer.unwrap();
250
251        Ok(StartResult::Simple {
252            id,
253            info,
254            writer,
255            interface_type_name: "UDPInterface".to_string(),
256        })
257    }
258}
259
260pub(crate) fn udp_runtime_handle_from_config(config: &UdpConfig) -> UdpRuntimeConfigHandle {
261    UdpRuntimeConfigHandle {
262        interface_name: config.name.clone(),
263        runtime: Arc::clone(&config.runtime),
264        startup: UdpRuntime::from_config(config),
265    }
266}
267
268#[cfg(test)]
269mod tests {
270    use super::*;
271    use std::net::UdpSocket;
272    use std::time::Duration;
273
274    fn find_free_port() -> u16 {
275        std::net::TcpListener::bind("127.0.0.1:0")
276            .unwrap()
277            .local_addr()
278            .unwrap()
279            .port()
280    }
281
282    #[test]
283    fn bind_and_receive() {
284        let port = find_free_port();
285        let (tx, rx) = crate::event::channel();
286
287        let config = UdpConfig {
288            name: "test-udp".into(),
289            listen_ip: Some("127.0.0.1".into()),
290            listen_port: Some(port),
291            forward_ip: None,
292            forward_port: None,
293            interface_id: InterfaceId(10),
294            ..UdpConfig::default()
295        };
296
297        let _writer = start(config, tx).unwrap();
298
299        // Drain InterfaceUp
300        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
301
302        // Send a UDP packet to the listener
303        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
304        let payload = b"hello udp";
305        sender
306            .send_to(payload, format!("127.0.0.1:{}", port))
307            .unwrap();
308
309        // Should receive Frame event
310        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
311        match event {
312            Event::Frame { interface_id, data } => {
313                assert_eq!(interface_id, InterfaceId(10));
314                assert_eq!(data, payload);
315            }
316            other => panic!("expected Frame, got {:?}", other),
317        }
318    }
319
320    #[test]
321    fn send_broadcast() {
322        let recv_port = find_free_port();
323        let (tx, _rx) = crate::event::channel();
324
325        let config = UdpConfig {
326            name: "test-udp-send".into(),
327            listen_ip: None,
328            listen_port: None,
329            forward_ip: Some("127.0.0.1".into()),
330            forward_port: Some(recv_port),
331            interface_id: InterfaceId(11),
332            ..UdpConfig::default()
333        };
334
335        let writer = start(config, tx).unwrap();
336        let mut writer = writer.unwrap();
337
338        // Bind a receiver
339        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
340        receiver
341            .set_read_timeout(Some(Duration::from_secs(2)))
342            .unwrap();
343
344        // Send via writer
345        let payload = b"broadcast data";
346        writer.send_frame(payload).unwrap();
347
348        // Receive on the other socket
349        let mut buf = [0u8; 256];
350        let (n, _) = receiver.recv_from(&mut buf).unwrap();
351        assert_eq!(&buf[..n], payload);
352    }
353
354    #[test]
355    fn round_trip() {
356        let listen_port = find_free_port();
357        let forward_port = find_free_port();
358        let (tx, rx) = crate::event::channel();
359
360        let config = UdpConfig {
361            name: "test-udp-rt".into(),
362            listen_ip: Some("127.0.0.1".into()),
363            listen_port: Some(listen_port),
364            forward_ip: Some("127.0.0.1".into()),
365            forward_port: Some(forward_port),
366            interface_id: InterfaceId(12),
367            ..UdpConfig::default()
368        };
369
370        let writer = start(config, tx).unwrap();
371        assert!(writer.is_some());
372
373        // Drain InterfaceUp
374        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
375
376        // Send to the listener
377        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
378        sender
379            .send_to(b"ping", format!("127.0.0.1:{}", listen_port))
380            .unwrap();
381
382        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
383        match event {
384            Event::Frame { data, .. } => assert_eq!(data, b"ping"),
385            other => panic!("expected Frame, got {:?}", other),
386        }
387    }
388
389    #[test]
390    fn multiple_datagrams() {
391        let port = find_free_port();
392        let (tx, rx) = crate::event::channel();
393
394        let config = UdpConfig {
395            name: "test-udp-multi".into(),
396            listen_ip: Some("127.0.0.1".into()),
397            listen_port: Some(port),
398            forward_ip: None,
399            forward_port: None,
400            interface_id: InterfaceId(13),
401            ..UdpConfig::default()
402        };
403
404        let _writer = start(config, tx).unwrap();
405
406        // Drain InterfaceUp
407        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
408
409        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
410        for i in 0..5u8 {
411            sender.send_to(&[i], format!("127.0.0.1:{}", port)).unwrap();
412        }
413
414        for i in 0..5u8 {
415            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
416            match event {
417                Event::Frame { data, .. } => assert_eq!(data, vec![i]),
418                other => panic!("expected Frame, got {:?}", other),
419            }
420        }
421    }
422
423    #[test]
424    fn writer_send_to() {
425        let recv_port = find_free_port();
426
427        // Bind receiver first
428        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
429        receiver
430            .set_read_timeout(Some(Duration::from_secs(2)))
431            .unwrap();
432
433        // Create writer directly
434        let send_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
435        send_socket.set_broadcast(true).unwrap();
436        let mut writer = UdpWriter {
437            socket: send_socket,
438            runtime: Arc::new(Mutex::new(UdpRuntime {
439                forward_ip: Some("127.0.0.1".into()),
440                forward_port: Some(recv_port),
441            })),
442        };
443
444        let payload = vec![0xAA, 0xBB, 0xCC];
445        writer.send_frame(&payload).unwrap();
446
447        let mut buf = [0u8; 256];
448        let (n, _) = receiver.recv_from(&mut buf).unwrap();
449        assert_eq!(&buf[..n], &payload);
450    }
451}