1use std::io::{self};
7use std::net::{SocketAddr, UdpSocket};
8use std::sync::{Arc, Mutex};
9use std::thread;
10
11use rns_core::transport::types::InterfaceId;
12
13use crate::event::{Event, EventSender};
14use crate::interface::{lock_or_recover, Writer};
15
/// Static configuration for a single UDP interface.
///
/// The listen pair controls the optional receive socket, the forward pair is
/// the startup send target, and `runtime` holds the shared, mutable copy of
/// the forward target that the writer reads on every send.
#[derive(Debug, Clone)]
pub struct UdpConfig {
    /// Interface name; used as the `[name]` prefix in this module's log lines.
    pub name: String,
    /// Local IP to bind the receive socket to. `start` only spawns a reader
    /// when both `listen_ip` and `listen_port` are set.
    pub listen_ip: Option<String>,
    /// Local port to bind the receive socket to (see `listen_ip`).
    pub listen_port: Option<u16>,
    /// Startup destination IP for outgoing frames; copied into `runtime` by
    /// `UdpRuntime::from_config`.
    pub forward_ip: Option<String>,
    /// Startup destination port for outgoing frames (see `forward_ip`).
    pub forward_port: Option<u16>,
    /// Identifier attached to every event this interface emits.
    pub interface_id: InterfaceId,
    /// Shared forward-target state. Cloning the config clones the `Arc`, so
    /// all clones observe the same runtime values.
    pub runtime: Arc<Mutex<UdpRuntime>>,
}
27
/// Forward target consulted by the UDP writer each time a frame is sent.
///
/// Stored behind `Arc<Mutex<_>>` in `UdpConfig` and `UdpRuntimeConfigHandle`;
/// presumably this lets the target be changed while the interface is running
/// (confirm against the callers that hold the handle).
#[derive(Debug, Clone)]
pub struct UdpRuntime {
    /// Destination IP; sending fails unless both this and `forward_port` are set.
    pub forward_ip: Option<String>,
    /// Destination port; sending fails unless both this and `forward_ip` are set.
    pub forward_port: Option<u16>,
}
33
34impl UdpRuntime {
35 pub fn from_config(config: &UdpConfig) -> Self {
36 Self {
37 forward_ip: config.forward_ip.clone(),
38 forward_port: config.forward_port,
39 }
40 }
41}
42
/// Handle that exposes one UDP interface's runtime forward target.
///
/// Built by `udp_runtime_handle_from_config`.
#[derive(Debug, Clone)]
pub struct UdpRuntimeConfigHandle {
    /// Name of the interface this handle belongs to.
    pub interface_name: String,
    /// The same shared state the interface's writer reads when sending.
    pub runtime: Arc<Mutex<UdpRuntime>>,
    /// Snapshot of the forward target as derived from the static config.
    pub startup: UdpRuntime,
}
49
50impl Default for UdpConfig {
51 fn default() -> Self {
52 let mut config = UdpConfig {
53 name: String::new(),
54 listen_ip: None,
55 listen_port: None,
56 forward_ip: None,
57 forward_port: None,
58 interface_id: InterfaceId(0),
59 runtime: Arc::new(Mutex::new(UdpRuntime {
60 forward_ip: None,
61 forward_port: None,
62 })),
63 };
64 let startup = UdpRuntime::from_config(&config);
65 config.runtime = Arc::new(Mutex::new(startup));
66 config
67 }
68}
69
/// Outgoing half of a UDP interface: sends each frame as one datagram to the
/// forward target currently stored in `runtime`.
struct UdpWriter {
    /// Socket used for sending.
    socket: UdpSocket,
    /// Shared forward target, re-read on every `send_frame` call.
    runtime: Arc<Mutex<UdpRuntime>>,
}
75
76impl Writer for UdpWriter {
77 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78 let runtime = lock_or_recover(&self.runtime, "udp runtime").clone();
79 let target = match (runtime.forward_ip, runtime.forward_port) {
80 (Some(ip), Some(port)) => format!("{}:{}", ip, port)
81 .parse::<SocketAddr>()
82 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?,
83 _ => {
84 return Err(io::Error::new(
85 io::ErrorKind::Other,
86 "UDP interface has no forward target configured",
87 ))
88 }
89 };
90 self.socket.send_to(data, target)?;
91 Ok(())
92 }
93}
94
95pub fn start(config: UdpConfig, tx: EventSender) -> io::Result<Option<Box<dyn Writer>>> {
98 let id = config.interface_id;
99 {
100 let startup = UdpRuntime::from_config(&config);
101 *lock_or_recover(&config.runtime, "udp runtime") = startup;
102 }
103 let send_socket = UdpSocket::bind("0.0.0.0:0")?;
104 send_socket.set_broadcast(true)?;
105 let writer: Option<Box<dyn Writer>> = Some(Box::new(UdpWriter {
106 socket: send_socket,
107 runtime: Arc::clone(&config.runtime),
108 }));
109
110 if let (Some(ref bind_ip), Some(bind_port)) = (&config.listen_ip, config.listen_port) {
112 let bind_addr = format!("{}:{}", bind_ip, bind_port);
113 let recv_socket = UdpSocket::bind(&bind_addr)?;
114
115 log::info!("[{}] UDP listening on {}", config.name, bind_addr);
116
117 let _ = tx.send(Event::InterfaceUp(id, None, None));
119
120 let name = config.name.clone();
121 thread::Builder::new()
122 .name(format!("udp-reader-{}", id.0))
123 .spawn(move || {
124 udp_reader_loop(recv_socket, id, name, tx);
125 })?;
126 }
127
128 Ok(writer)
129}
130
131fn udp_reader_loop(socket: UdpSocket, id: InterfaceId, name: String, tx: EventSender) {
133 let mut buf = [0u8; 2048];
134
135 loop {
136 match socket.recv_from(&mut buf) {
137 Ok((n, _src)) => {
138 if tx
139 .send(Event::Frame {
140 interface_id: id,
141 data: buf[..n].to_vec(),
142 rssi: None,
143 snr: None,
144 })
145 .is_err()
146 {
147 return;
149 }
150 }
151 Err(e) => {
152 log::warn!("[{}] recv error: {}", name, e);
153 let _ = tx.send(Event::InterfaceDown(id));
154 return;
155 }
156 }
157 }
158}
159
160use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
163use rns_core::transport::types::InterfaceInfo;
164use std::collections::HashMap;
165
/// Stateless `InterfaceFactory` for UDP interfaces (type name `UDPInterface`).
pub struct UdpFactory;
168
169impl InterfaceFactory for UdpFactory {
170 fn type_name(&self) -> &str {
171 "UDPInterface"
172 }
173
174 fn parse_config(
175 &self,
176 name: &str,
177 id: InterfaceId,
178 params: &HashMap<String, String>,
179 ) -> Result<Box<dyn InterfaceConfigData>, String> {
180 let listen_ip = params.get("listen_ip").cloned();
181
182 let port_shorthand: Option<u16> = params.get("port").and_then(|v| v.parse().ok());
184
185 let listen_port: Option<u16> = params
186 .get("listen_port")
187 .and_then(|v| v.parse().ok())
188 .or(port_shorthand);
189
190 let forward_ip = params.get("forward_ip").cloned();
191
192 let forward_port: Option<u16> = params
193 .get("forward_port")
194 .and_then(|v| v.parse().ok())
195 .or(port_shorthand);
196
197 let mut config = UdpConfig {
198 name: name.to_string(),
199 listen_ip,
200 listen_port,
201 forward_ip,
202 forward_port,
203 interface_id: id,
204 runtime: Arc::new(Mutex::new(UdpRuntime {
205 forward_ip: None,
206 forward_port: None,
207 })),
208 };
209 let startup = UdpRuntime::from_config(&config);
210 config.runtime = Arc::new(Mutex::new(startup));
211 Ok(Box::new(config))
212 }
213
214 fn start(
215 &self,
216 config: Box<dyn InterfaceConfigData>,
217 ctx: StartContext,
218 ) -> io::Result<StartResult> {
219 let udp_config = *config
220 .into_any()
221 .downcast::<UdpConfig>()
222 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
223
224 let id = udp_config.interface_id;
225 let name = udp_config.name.clone();
226 let out_capable = udp_config.forward_ip.is_some();
227 let in_capable = udp_config.listen_ip.is_some();
228
229 let info = InterfaceInfo {
230 id,
231 name,
232 mode: ctx.mode,
233 out_capable,
234 in_capable,
235 bitrate: Some(10_000_000),
236 airtime_profile: None,
237 announce_rate_target: None,
238 announce_rate_grace: 0,
239 announce_rate_penalty: 0.0,
240 announce_cap: rns_core::constants::ANNOUNCE_CAP,
241 is_local_client: false,
242 wants_tunnel: false,
243 tunnel_id: None,
244 mtu: 1400,
245 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
246 ia_freq: 0.0,
247 ip_freq: 0.0,
248 op_freq: 0.0,
249 op_samples: 0,
250 started: crate::time::now(),
251 };
252
253 let maybe_writer = start(udp_config, ctx.tx)?;
254
255 let writer: Box<dyn Writer> = maybe_writer
256 .ok_or_else(|| io::Error::other("UDPInterface did not provide a writer"))?;
257
258 Ok(StartResult::Simple {
259 id,
260 info,
261 writer,
262 interface_type_name: "UDPInterface".to_string(),
263 })
264 }
265}
266
267pub(crate) fn udp_runtime_handle_from_config(config: &UdpConfig) -> UdpRuntimeConfigHandle {
268 UdpRuntimeConfigHandle {
269 interface_name: config.name.clone(),
270 runtime: Arc::clone(&config.runtime),
271 startup: UdpRuntime::from_config(config),
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::UdpSocket;
    use std::time::Duration;

    /// Asks the OS for an ephemeral loopback port and returns its number.
    /// The probe socket is closed on return, so the port is free to rebind.
    fn find_free_port() -> u16 {
        let probe = UdpSocket::bind("127.0.0.1:0").unwrap();
        let addr = probe.local_addr().unwrap();
        addr.port()
    }

    /// Unpacks an `Event::Frame` into `(interface_id, data)`; panics on any
    /// other event.
    fn expect_frame(event: Event) -> (InterfaceId, Vec<u8>) {
        match event {
            Event::Frame {
                interface_id, data, ..
            } => (interface_id, data),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// A listening interface turns an incoming datagram into a `Frame` event
    /// tagged with its own interface id.
    #[test]
    fn bind_and_receive() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let _writer = start(
            UdpConfig {
                name: "test-udp".into(),
                listen_ip: Some("127.0.0.1".into()),
                listen_port: Some(port),
                forward_ip: None,
                forward_port: None,
                interface_id: InterfaceId(10),
                ..UdpConfig::default()
            },
            tx,
        )
        .unwrap();

        // The first event is the InterfaceUp announcement; skip it.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        let payload = b"hello udp";
        sender.send_to(payload, ("127.0.0.1", port)).unwrap();

        let (interface_id, data) =
            expect_frame(rx.recv_timeout(Duration::from_secs(2)).unwrap());
        assert_eq!(interface_id, InterfaceId(10));
        assert_eq!(data, payload);
    }

    /// A send-only interface delivers frames to its configured forward target.
    #[test]
    fn send_broadcast() {
        let recv_port = find_free_port();
        let (tx, _rx) = crate::event::channel();

        let maybe_writer = start(
            UdpConfig {
                name: "test-udp-send".into(),
                listen_ip: None,
                listen_port: None,
                forward_ip: Some("127.0.0.1".into()),
                forward_port: Some(recv_port),
                interface_id: InterfaceId(11),
                ..UdpConfig::default()
            },
            tx,
        )
        .unwrap();
        let mut writer = maybe_writer.unwrap();

        let receiver = UdpSocket::bind(("127.0.0.1", recv_port)).unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let payload = b"broadcast data";
        writer.send_frame(payload).unwrap();

        let mut buf = [0u8; 256];
        let (len, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..len], payload);
    }

    /// An interface with both listen and forward settings yields a writer
    /// and still receives datagrams on its listen port.
    #[test]
    fn round_trip() {
        let listen_port = find_free_port();
        let forward_port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let writer = start(
            UdpConfig {
                name: "test-udp-rt".into(),
                listen_ip: Some("127.0.0.1".into()),
                listen_port: Some(listen_port),
                forward_ip: Some("127.0.0.1".into()),
                forward_port: Some(forward_port),
                interface_id: InterfaceId(12),
                ..UdpConfig::default()
            },
            tx,
        )
        .unwrap();
        assert!(writer.is_some());

        // Skip the InterfaceUp announcement.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        sender.send_to(b"ping", ("127.0.0.1", listen_port)).unwrap();

        let (_, data) = expect_frame(rx.recv_timeout(Duration::from_secs(2)).unwrap());
        assert_eq!(data, b"ping");
    }

    /// Several datagrams sent back-to-back arrive as separate frames, in order.
    #[test]
    fn multiple_datagrams() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let _writer = start(
            UdpConfig {
                name: "test-udp-multi".into(),
                listen_ip: Some("127.0.0.1".into()),
                listen_port: Some(port),
                forward_ip: None,
                forward_port: None,
                interface_id: InterfaceId(13),
                ..UdpConfig::default()
            },
            tx,
        )
        .unwrap();

        // Skip the InterfaceUp announcement.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        for i in 0..5u8 {
            sender.send_to(&[i], ("127.0.0.1", port)).unwrap();
        }

        for i in 0..5u8 {
            let (_, data) = expect_frame(rx.recv_timeout(Duration::from_secs(2)).unwrap());
            assert_eq!(data, vec![i]);
        }
    }

    /// `UdpWriter` on its own sends to whatever target its runtime state names.
    #[test]
    fn writer_send_to() {
        let recv_port = find_free_port();

        let receiver = UdpSocket::bind(("127.0.0.1", recv_port)).unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let send_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        send_socket.set_broadcast(true).unwrap();

        let runtime = Arc::new(Mutex::new(UdpRuntime {
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(recv_port),
        }));
        let mut writer = UdpWriter {
            socket: send_socket,
            runtime,
        };

        let payload = vec![0xAA, 0xBB, 0xCC];
        writer.send_frame(&payload).unwrap();

        let mut buf = [0u8; 256];
        let (len, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..len], &payload);
    }
}