1use std::io::{self};
7use std::net::{SocketAddr, UdpSocket};
8use std::sync::{Arc, Mutex};
9use std::thread;
10
11use rns_core::transport::types::InterfaceId;
12
13use crate::event::{Event, EventSender};
14use crate::interface::Writer;
15
16#[derive(Debug, Clone)]
18pub struct UdpConfig {
19 pub name: String,
20 pub listen_ip: Option<String>,
21 pub listen_port: Option<u16>,
22 pub forward_ip: Option<String>,
23 pub forward_port: Option<u16>,
24 pub interface_id: InterfaceId,
25 pub runtime: Arc<Mutex<UdpRuntime>>,
26}
27
/// The forward target of a UDP interface.
///
/// Kept separate from [`UdpConfig`] so it can live behind an
/// `Arc<Mutex<_>>` and be shared with the writer.
#[derive(Clone, Debug)]
pub struct UdpRuntime {
    /// Destination IP for outgoing frames, if any.
    pub forward_ip: Option<String>,
    /// Destination port for outgoing frames, if any.
    pub forward_port: Option<u16>,
}
33
34impl UdpRuntime {
35 pub fn from_config(config: &UdpConfig) -> Self {
36 Self {
37 forward_ip: config.forward_ip.clone(),
38 forward_port: config.forward_port,
39 }
40 }
41}
42
43#[derive(Debug, Clone)]
44pub struct UdpRuntimeConfigHandle {
45 pub interface_name: String,
46 pub runtime: Arc<Mutex<UdpRuntime>>,
47 pub startup: UdpRuntime,
48}
49
50impl Default for UdpConfig {
51 fn default() -> Self {
52 let mut config = UdpConfig {
53 name: String::new(),
54 listen_ip: None,
55 listen_port: None,
56 forward_ip: None,
57 forward_port: None,
58 interface_id: InterfaceId(0),
59 runtime: Arc::new(Mutex::new(UdpRuntime {
60 forward_ip: None,
61 forward_port: None,
62 })),
63 };
64 let startup = UdpRuntime::from_config(&config);
65 config.runtime = Arc::new(Mutex::new(startup));
66 config
67 }
68}
69
70struct UdpWriter {
72 socket: UdpSocket,
73 runtime: Arc<Mutex<UdpRuntime>>,
74}
75
76impl Writer for UdpWriter {
77 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
78 let runtime = self.runtime.lock().unwrap().clone();
79 let target = match (runtime.forward_ip, runtime.forward_port) {
80 (Some(ip), Some(port)) => format!("{}:{}", ip, port)
81 .parse::<SocketAddr>()
82 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?,
83 _ => {
84 return Err(io::Error::new(
85 io::ErrorKind::Other,
86 "UDP interface has no forward target configured",
87 ))
88 }
89 };
90 self.socket.send_to(data, target)?;
91 Ok(())
92 }
93}
94
95pub fn start(config: UdpConfig, tx: EventSender) -> io::Result<Option<Box<dyn Writer>>> {
98 let id = config.interface_id;
99 {
100 let startup = UdpRuntime::from_config(&config);
101 *config.runtime.lock().unwrap() = startup;
102 }
103 let send_socket = UdpSocket::bind("0.0.0.0:0")?;
104 send_socket.set_broadcast(true)?;
105 let writer: Option<Box<dyn Writer>> = Some(Box::new(UdpWriter {
106 socket: send_socket,
107 runtime: Arc::clone(&config.runtime),
108 }));
109
110 if let (Some(ref bind_ip), Some(bind_port)) = (&config.listen_ip, config.listen_port) {
112 let bind_addr = format!("{}:{}", bind_ip, bind_port);
113 let recv_socket = UdpSocket::bind(&bind_addr)?;
114
115 log::info!("[{}] UDP listening on {}", config.name, bind_addr);
116
117 let _ = tx.send(Event::InterfaceUp(id, None, None));
119
120 let name = config.name.clone();
121 thread::Builder::new()
122 .name(format!("udp-reader-{}", id.0))
123 .spawn(move || {
124 udp_reader_loop(recv_socket, id, name, tx);
125 })?;
126 }
127
128 Ok(writer)
129}
130
131fn udp_reader_loop(socket: UdpSocket, id: InterfaceId, name: String, tx: EventSender) {
133 let mut buf = [0u8; 2048];
134
135 loop {
136 match socket.recv_from(&mut buf) {
137 Ok((n, _src)) => {
138 if tx
139 .send(Event::Frame {
140 interface_id: id,
141 data: buf[..n].to_vec(),
142 })
143 .is_err()
144 {
145 return;
147 }
148 }
149 Err(e) => {
150 log::warn!("[{}] recv error: {}", name, e);
151 let _ = tx.send(Event::InterfaceDown(id));
152 return;
153 }
154 }
155 }
156}
157
158use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
161use rns_core::transport::types::InterfaceInfo;
162use std::collections::HashMap;
163
/// Stateless factory for interfaces of type `"UDPInterface"`.
pub struct UdpFactory;
166
167impl InterfaceFactory for UdpFactory {
168 fn type_name(&self) -> &str {
169 "UDPInterface"
170 }
171
172 fn parse_config(
173 &self,
174 name: &str,
175 id: InterfaceId,
176 params: &HashMap<String, String>,
177 ) -> Result<Box<dyn InterfaceConfigData>, String> {
178 let listen_ip = params.get("listen_ip").cloned();
179
180 let port_shorthand: Option<u16> = params.get("port").and_then(|v| v.parse().ok());
182
183 let listen_port: Option<u16> = params
184 .get("listen_port")
185 .and_then(|v| v.parse().ok())
186 .or(port_shorthand);
187
188 let forward_ip = params.get("forward_ip").cloned();
189
190 let forward_port: Option<u16> = params
191 .get("forward_port")
192 .and_then(|v| v.parse().ok())
193 .or(port_shorthand);
194
195 let mut config = UdpConfig {
196 name: name.to_string(),
197 listen_ip,
198 listen_port,
199 forward_ip,
200 forward_port,
201 interface_id: id,
202 runtime: Arc::new(Mutex::new(UdpRuntime {
203 forward_ip: None,
204 forward_port: None,
205 })),
206 };
207 let startup = UdpRuntime::from_config(&config);
208 config.runtime = Arc::new(Mutex::new(startup));
209 Ok(Box::new(config))
210 }
211
212 fn start(
213 &self,
214 config: Box<dyn InterfaceConfigData>,
215 ctx: StartContext,
216 ) -> io::Result<StartResult> {
217 let udp_config = *config
218 .into_any()
219 .downcast::<UdpConfig>()
220 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
221
222 let id = udp_config.interface_id;
223 let name = udp_config.name.clone();
224 let out_capable = udp_config.forward_ip.is_some();
225 let in_capable = udp_config.listen_ip.is_some();
226
227 let info = InterfaceInfo {
228 id,
229 name,
230 mode: ctx.mode,
231 out_capable,
232 in_capable,
233 bitrate: Some(10_000_000),
234 announce_rate_target: None,
235 announce_rate_grace: 0,
236 announce_rate_penalty: 0.0,
237 announce_cap: rns_core::constants::ANNOUNCE_CAP,
238 is_local_client: false,
239 wants_tunnel: false,
240 tunnel_id: None,
241 mtu: 1400,
242 ingress_control: true,
243 ia_freq: 0.0,
244 started: crate::time::now(),
245 };
246
247 let maybe_writer = start(udp_config, ctx.tx)?;
248
249 let writer: Box<dyn Writer> = maybe_writer.unwrap();
250
251 Ok(StartResult::Simple {
252 id,
253 info,
254 writer,
255 interface_type_name: "UDPInterface".to_string(),
256 })
257 }
258}
259
260pub(crate) fn udp_runtime_handle_from_config(config: &UdpConfig) -> UdpRuntimeConfigHandle {
261 UdpRuntimeConfigHandle {
262 interface_name: config.name.clone(),
263 runtime: Arc::clone(&config.runtime),
264 startup: UdpRuntime::from_config(config),
265 }
266}
267
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::UdpSocket;
    use std::time::Duration;

    /// Asks the OS for a currently unused UDP port on loopback.
    ///
    /// The probe uses a UDP socket because the tests bind UDP; a free TCP port
    /// says nothing about the UDP port of the same number. The probe socket is
    /// closed on return, so another process could still grab the port before
    /// the test binds it.
    fn find_free_port() -> u16 {
        UdpSocket::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// Binds a loopback receiver on an OS-assigned port with a 2 s read
    /// timeout and returns it together with its port. Binding first means the
    /// port cannot be taken between choosing and using it.
    fn bound_receiver() -> (UdpSocket, u16) {
        let receiver = UdpSocket::bind("127.0.0.1:0").unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();
        let port = receiver.local_addr().unwrap().port();
        (receiver, port)
    }

    /// A datagram sent to the listen address arrives as an `Event::Frame`.
    #[test]
    fn bind_and_receive() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(10),
            ..UdpConfig::default()
        };

        let _writer = start(config, tx).unwrap();

        // First event is the InterfaceUp notification.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        let payload = b"hello udp";
        sender
            .send_to(payload, format!("127.0.0.1:{}", port))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(10));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// The writer returned by `start` sends frames to the configured forward
    /// target.
    #[test]
    fn send_broadcast() {
        let (receiver, recv_port) = bound_receiver();
        let (tx, _rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp-send".into(),
            listen_ip: None,
            listen_port: None,
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(recv_port),
            interface_id: InterfaceId(11),
            ..UdpConfig::default()
        };

        let writer = start(config, tx).unwrap();
        let mut writer = writer.unwrap();

        let payload = b"broadcast data";
        writer.send_frame(payload).unwrap();

        let mut buf = [0u8; 256];
        let (n, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], payload);
    }

    /// With both listen and forward configured, a writer is returned and
    /// incoming datagrams are still delivered.
    #[test]
    fn round_trip() {
        let listen_port = find_free_port();
        let forward_port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp-rt".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(listen_port),
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(forward_port),
            interface_id: InterfaceId(12),
            ..UdpConfig::default()
        };

        let writer = start(config, tx).unwrap();
        assert!(writer.is_some());

        // First event is the InterfaceUp notification.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        sender
            .send_to(b"ping", format!("127.0.0.1:{}", listen_port))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { data, .. } => assert_eq!(data, b"ping"),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Several datagrams sent back to back are delivered as separate frames,
    /// in order (on loopback).
    #[test]
    fn multiple_datagrams() {
        let port = find_free_port();
        let (tx, rx) = crate::event::channel();

        let config = UdpConfig {
            name: "test-udp-multi".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(13),
            ..UdpConfig::default()
        };

        let _writer = start(config, tx).unwrap();

        // First event is the InterfaceUp notification.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        for i in 0..5u8 {
            sender.send_to(&[i], format!("127.0.0.1:{}", port)).unwrap();
        }

        for i in 0..5u8 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::Frame { data, .. } => assert_eq!(data, vec![i]),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
    }

    /// A hand-built `UdpWriter` sends to the target stored in its runtime
    /// state.
    #[test]
    fn writer_send_to() {
        let (receiver, recv_port) = bound_receiver();

        let send_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        send_socket.set_broadcast(true).unwrap();
        let mut writer = UdpWriter {
            socket: send_socket,
            runtime: Arc::new(Mutex::new(UdpRuntime {
                forward_ip: Some("127.0.0.1".into()),
                forward_port: Some(recv_port),
            })),
        };

        let payload = vec![0xAA, 0xBB, 0xCC];
        writer.send_frame(&payload).unwrap();

        let mut buf = [0u8; 256];
        let (n, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], &payload);
    }
}