1use std::io::{self};
7use std::net::{SocketAddr, UdpSocket};
8use std::sync::{Arc, Mutex};
9use std::thread;
10
11use rns_core::transport::types::InterfaceId;
12
13use crate::event::{Event, EventSender};
14use crate::interface::{lock_or_recover, Writer};
15
16#[derive(Debug, Clone)]
18pub struct UdpConfig {
19 pub name: String,
20 pub listen_ip: Option<String>,
21 pub listen_port: Option<u16>,
22 pub forward_ip: Option<String>,
23 pub forward_port: Option<u16>,
24 pub interface_id: InterfaceId,
25 pub runtime: Arc<Mutex<UdpRuntime>>,
26}
27
/// Forwarding target of a UDP interface, kept separate from the static
/// configuration so it can be shared behind a mutex.
#[derive(Clone, Debug)]
pub struct UdpRuntime {
    /// Destination address for outgoing frames, if any.
    pub forward_ip: Option<String>,
    /// Destination port for outgoing frames, if any.
    pub forward_port: Option<u16>,
}
33
34impl UdpRuntime {
35 pub fn from_config(config: &UdpConfig) -> Self {
36 Self {
37 forward_ip: config.forward_ip.clone(),
38 forward_port: config.forward_port,
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
44pub struct UdpRuntimeConfigHandle {
45 pub interface_name: String,
46 pub runtime: Arc<Mutex<UdpRuntime>>,
47 pub startup: UdpRuntime,
48}
49
50impl Default for UdpConfig {
51 fn default() -> Self {
52 let mut config = UdpConfig {
53 name: String::new(),
54 listen_ip: None,
55 listen_port: None,
56 forward_ip: None,
57 forward_port: None,
58 interface_id: InterfaceId(0),
59 runtime: Arc::new(Mutex::new(UdpRuntime {
60 forward_ip: None,
61 forward_port: None,
62 })),
63 };
64 let startup = UdpRuntime::from_config(&config);
65 config.runtime = Arc::new(Mutex::new(startup));
66 config
67 }
68}
69
70struct UdpWriter {
72 socket: UdpSocket,
73 runtime: Arc<Mutex<UdpRuntime>>,
74}
75
76impl Writer for UdpWriter {
77 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78 let runtime = lock_or_recover(&self.runtime, "udp runtime").clone();
79 let target = match (runtime.forward_ip, runtime.forward_port) {
80 (Some(ip), Some(port)) => format!("{}:{}", ip, port)
81 .parse::<SocketAddr>()
82 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?,
83 _ => {
84 return Err(io::Error::new(
85 io::ErrorKind::Other,
86 "UDP interface has no forward target configured",
87 ))
88 }
89 };
90 self.socket.send_to(data, target)?;
91 Ok(())
92 }
93}
94
95pub fn start(config: UdpConfig, tx: EventSender) -> io::Result<Option<Box<dyn Writer>>> {
98 let id = config.interface_id;
99 {
100 let startup = UdpRuntime::from_config(&config);
101 *lock_or_recover(&config.runtime, "udp runtime") = startup;
102 }
103 let send_socket = UdpSocket::bind("0.0.0.0:0")?;
104 send_socket.set_broadcast(true)?;
105 let writer: Option<Box<dyn Writer>> = Some(Box::new(UdpWriter {
106 socket: send_socket,
107 runtime: Arc::clone(&config.runtime),
108 }));
109
110 if let (Some(ref bind_ip), Some(bind_port)) = (&config.listen_ip, config.listen_port) {
112 let bind_addr = format!("{}:{}", bind_ip, bind_port);
113 let recv_socket = UdpSocket::bind(&bind_addr)?;
114
115 log::info!("[{}] UDP listening on {}", config.name, bind_addr);
116
117 let _ = tx.send(Event::InterfaceUp(id, None, None));
119
120 let name = config.name.clone();
121 thread::Builder::new()
122 .name(format!("udp-reader-{}", id.0))
123 .spawn(move || {
124 udp_reader_loop(recv_socket, id, name, tx);
125 })?;
126 }
127
128 Ok(writer)
129}
130
131fn udp_reader_loop(socket: UdpSocket, id: InterfaceId, name: String, tx: EventSender) {
133 let mut buf = [0u8; 2048];
134
135 loop {
136 match socket.recv_from(&mut buf) {
137 Ok((n, _src)) => {
138 if tx
139 .send(Event::Frame {
140 interface_id: id,
141 data: buf[..n].to_vec(),
142 })
143 .is_err()
144 {
145 return;
147 }
148 }
149 Err(e) => {
150 log::warn!("[{}] recv error: {}", name, e);
151 let _ = tx.send(Event::InterfaceDown(id));
152 return;
153 }
154 }
155 }
156}
157
158use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
161use rns_core::transport::types::InterfaceInfo;
162use std::collections::HashMap;
163
164pub struct UdpFactory;
166
167impl InterfaceFactory for UdpFactory {
168 fn type_name(&self) -> &str {
169 "UDPInterface"
170 }
171
172 fn parse_config(
173 &self,
174 name: &str,
175 id: InterfaceId,
176 params: &HashMap<String, String>,
177 ) -> Result<Box<dyn InterfaceConfigData>, String> {
178 let listen_ip = params.get("listen_ip").cloned();
179
180 let port_shorthand: Option<u16> = params.get("port").and_then(|v| v.parse().ok());
182
183 let listen_port: Option<u16> = params
184 .get("listen_port")
185 .and_then(|v| v.parse().ok())
186 .or(port_shorthand);
187
188 let forward_ip = params.get("forward_ip").cloned();
189
190 let forward_port: Option<u16> = params
191 .get("forward_port")
192 .and_then(|v| v.parse().ok())
193 .or(port_shorthand);
194
195 let mut config = UdpConfig {
196 name: name.to_string(),
197 listen_ip,
198 listen_port,
199 forward_ip,
200 forward_port,
201 interface_id: id,
202 runtime: Arc::new(Mutex::new(UdpRuntime {
203 forward_ip: None,
204 forward_port: None,
205 })),
206 };
207 let startup = UdpRuntime::from_config(&config);
208 config.runtime = Arc::new(Mutex::new(startup));
209 Ok(Box::new(config))
210 }
211
212 fn start(
213 &self,
214 config: Box<dyn InterfaceConfigData>,
215 ctx: StartContext,
216 ) -> io::Result<StartResult> {
217 let udp_config = *config
218 .into_any()
219 .downcast::<UdpConfig>()
220 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
221
222 let id = udp_config.interface_id;
223 let name = udp_config.name.clone();
224 let out_capable = udp_config.forward_ip.is_some();
225 let in_capable = udp_config.listen_ip.is_some();
226
227 let info = InterfaceInfo {
228 id,
229 name,
230 mode: ctx.mode,
231 out_capable,
232 in_capable,
233 bitrate: Some(10_000_000),
234 airtime_profile: None,
235 announce_rate_target: None,
236 announce_rate_grace: 0,
237 announce_rate_penalty: 0.0,
238 announce_cap: rns_core::constants::ANNOUNCE_CAP,
239 is_local_client: false,
240 wants_tunnel: false,
241 tunnel_id: None,
242 mtu: 1400,
243 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
244 ia_freq: 0.0,
245 started: crate::time::now(),
246 };
247
248 let maybe_writer = start(udp_config, ctx.tx)?;
249
250 let writer: Box<dyn Writer> = maybe_writer
251 .ok_or_else(|| io::Error::other("UDPInterface did not provide a writer"))?;
252
253 Ok(StartResult::Simple {
254 id,
255 info,
256 writer,
257 interface_type_name: "UDPInterface".to_string(),
258 })
259 }
260}
261
262pub(crate) fn udp_runtime_handle_from_config(config: &UdpConfig) -> UdpRuntimeConfigHandle {
263 UdpRuntimeConfigHandle {
264 interface_name: config.name.clone(),
265 runtime: Arc::clone(&config.runtime),
266 startup: UdpRuntime::from_config(config),
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::UdpSocket;
    use std::time::Duration;

    /// Asks the OS for a currently unused UDP port on the loopback address.
    ///
    /// The probe uses a UDP socket because TCP and UDP ports are allocated
    /// independently: a port that is free for TCP may still be taken for UDP.
    fn find_free_port() -> u16 {
        UdpSocket::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// A datagram sent to the listen address arrives as a `Frame` event.
    #[test]
    fn bind_and_receive() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(10),
            ..UdpConfig::default()
        };

        let _writer = start(config, tx).unwrap();

        // The first event is the InterfaceUp emitted by `start`.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        let payload = b"hello udp";
        sender
            .send_to(payload, format!("127.0.0.1:{}", port))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(10));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// The writer returned by `start` delivers frames to the forward target.
    #[test]
    fn send_broadcast() {
        let recv_port = find_free_port();
        let (tx, _rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp-send".into(),
            listen_ip: None,
            listen_port: None,
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(recv_port),
            interface_id: InterfaceId(11),
            ..UdpConfig::default()
        };

        let writer = start(config, tx).unwrap();
        let mut writer = writer.unwrap();

        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let payload = b"broadcast data";
        writer.send_frame(payload).unwrap();

        let mut buf = [0u8; 256];
        let (n, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], payload);
    }

    /// An interface with both listen and forward endpoints yields a writer
    /// and still receives incoming datagrams.
    #[test]
    fn round_trip() {
        let listen_port = find_free_port();
        let forward_port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp-rt".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(listen_port),
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(forward_port),
            interface_id: InterfaceId(12),
            ..UdpConfig::default()
        };

        let writer = start(config, tx).unwrap();
        assert!(writer.is_some());

        // Skip the InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        sender
            .send_to(b"ping", format!("127.0.0.1:{}", listen_port))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { data, .. } => assert_eq!(data, b"ping"),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Several datagrams are delivered as separate frames, in send order.
    #[test]
    fn multiple_datagrams() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp-multi".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(13),
            ..UdpConfig::default()
        };

        let _writer = start(config, tx).unwrap();

        // Skip the InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        for i in 0..5u8 {
            sender.send_to(&[i], format!("127.0.0.1:{}", port)).unwrap();
        }

        for i in 0..5u8 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::Frame { data, .. } => assert_eq!(data, vec![i]),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
    }

    /// A hand-built `UdpWriter` sends to the target stored in its runtime.
    #[test]
    fn writer_send_to() {
        let recv_port = find_free_port();

        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let send_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        send_socket.set_broadcast(true).unwrap();
        let mut writer = UdpWriter {
            socket: send_socket,
            runtime: Arc::new(Mutex::new(UdpRuntime {
                forward_ip: Some("127.0.0.1".into()),
                forward_port: Some(recv_port),
            })),
        };

        let payload = vec![0xAA, 0xBB, 0xCC];
        writer.send_frame(&payload).unwrap();

        let mut buf = [0u8; 256];
        let (n, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], &payload);
    }
}