1use std::io::{self};
7use std::net::{SocketAddr, UdpSocket};
8use std::thread;
9
10use rns_core::transport::types::InterfaceId;
11
12use crate::event::{Event, EventSender};
13use crate::interface::Writer;
14
15#[derive(Debug, Clone)]
17pub struct UdpConfig {
18 pub name: String,
19 pub listen_ip: Option<String>,
20 pub listen_port: Option<u16>,
21 pub forward_ip: Option<String>,
22 pub forward_port: Option<u16>,
23 pub interface_id: InterfaceId,
24}
25
26impl Default for UdpConfig {
27 fn default() -> Self {
28 UdpConfig {
29 name: String::new(),
30 listen_ip: None,
31 listen_port: None,
32 forward_ip: None,
33 forward_port: None,
34 interface_id: InterfaceId(0),
35 }
36 }
37}
38
39struct UdpWriter {
41 socket: UdpSocket,
42 target: SocketAddr,
43}
44
45impl Writer for UdpWriter {
46 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47 self.socket.send_to(data, self.target)?;
48 Ok(())
49 }
50}
51
52pub fn start(config: UdpConfig, tx: EventSender) -> io::Result<Option<Box<dyn Writer>>> {
55 let id = config.interface_id;
56 let mut writer: Option<Box<dyn Writer>> = None;
57
58 if let (Some(ref fwd_ip), Some(fwd_port)) = (&config.forward_ip, config.forward_port) {
60 let target: SocketAddr = format!("{}:{}", fwd_ip, fwd_port)
61 .parse()
62 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
63
64 let send_socket = UdpSocket::bind("0.0.0.0:0")?;
65 send_socket.set_broadcast(true)?;
66
67 writer = Some(Box::new(UdpWriter {
68 socket: send_socket,
69 target,
70 }));
71 }
72
73 if let (Some(ref bind_ip), Some(bind_port)) = (&config.listen_ip, config.listen_port) {
75 let bind_addr = format!("{}:{}", bind_ip, bind_port);
76 let recv_socket = UdpSocket::bind(&bind_addr)?;
77
78 log::info!("[{}] UDP listening on {}", config.name, bind_addr);
79
80 let _ = tx.send(Event::InterfaceUp(id, None, None));
82
83 let name = config.name.clone();
84 thread::Builder::new()
85 .name(format!("udp-reader-{}", id.0))
86 .spawn(move || {
87 udp_reader_loop(recv_socket, id, name, tx);
88 })?;
89 }
90
91 Ok(writer)
92}
93
94fn udp_reader_loop(socket: UdpSocket, id: InterfaceId, name: String, tx: EventSender) {
96 let mut buf = [0u8; 2048];
97
98 loop {
99 match socket.recv_from(&mut buf) {
100 Ok((n, _src)) => {
101 if tx
102 .send(Event::Frame {
103 interface_id: id,
104 data: buf[..n].to_vec(),
105 })
106 .is_err()
107 {
108 return;
110 }
111 }
112 Err(e) => {
113 log::warn!("[{}] recv error: {}", name, e);
114 let _ = tx.send(Event::InterfaceDown(id));
115 return;
116 }
117 }
118 }
119}
120
121use std::collections::HashMap;
124use rns_core::transport::types::InterfaceInfo;
125use super::{InterfaceFactory, InterfaceConfigData, StartContext, StartResult};
126
127struct NoopWriter;
130
131impl Writer for NoopWriter {
132 fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
133 Err(io::Error::new(
134 io::ErrorKind::Other,
135 "listen-only UDP interface",
136 ))
137 }
138}
139
140pub struct UdpFactory;
142
143impl InterfaceFactory for UdpFactory {
144 fn type_name(&self) -> &str { "UDPInterface" }
145
146 fn parse_config(
147 &self,
148 name: &str,
149 id: InterfaceId,
150 params: &HashMap<String, String>,
151 ) -> Result<Box<dyn InterfaceConfigData>, String> {
152 let listen_ip = params.get("listen_ip").cloned();
153
154 let port_shorthand: Option<u16> = params
156 .get("port")
157 .and_then(|v| v.parse().ok());
158
159 let listen_port: Option<u16> = params
160 .get("listen_port")
161 .and_then(|v| v.parse().ok())
162 .or(port_shorthand);
163
164 let forward_ip = params.get("forward_ip").cloned();
165
166 let forward_port: Option<u16> = params
167 .get("forward_port")
168 .and_then(|v| v.parse().ok())
169 .or(port_shorthand);
170
171 Ok(Box::new(UdpConfig {
172 name: name.to_string(),
173 listen_ip,
174 listen_port,
175 forward_ip,
176 forward_port,
177 interface_id: id,
178 }))
179 }
180
181 fn start(
182 &self,
183 config: Box<dyn InterfaceConfigData>,
184 ctx: StartContext,
185 ) -> io::Result<StartResult> {
186 let udp_config = *config.into_any().downcast::<UdpConfig>()
187 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
188
189 let id = udp_config.interface_id;
190 let name = udp_config.name.clone();
191 let out_capable = udp_config.forward_ip.is_some();
192 let in_capable = udp_config.listen_ip.is_some();
193
194 let info = InterfaceInfo {
195 id,
196 name,
197 mode: ctx.mode,
198 out_capable,
199 in_capable,
200 bitrate: Some(10_000_000),
201 announce_rate_target: None,
202 announce_rate_grace: 0,
203 announce_rate_penalty: 0.0,
204 announce_cap: rns_core::constants::ANNOUNCE_CAP,
205 is_local_client: false,
206 wants_tunnel: false,
207 tunnel_id: None,
208 mtu: 1400,
209 ingress_control: true,
210 ia_freq: 0.0,
211 started: crate::time::now(),
212 };
213
214 let maybe_writer = start(udp_config, ctx.tx)?;
215
216 let writer: Box<dyn Writer> = match maybe_writer {
217 Some(w) => w,
218 None => Box::new(NoopWriter),
219 };
220
221 Ok(StartResult::Simple {
222 id,
223 info,
224 writer,
225 interface_type_name: "UDPInterface".to_string(),
226 })
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233 use std::net::UdpSocket;
234 use std::sync::mpsc;
235 use std::time::Duration;
236
237 fn find_free_port() -> u16 {
238 std::net::TcpListener::bind("127.0.0.1:0")
239 .unwrap()
240 .local_addr()
241 .unwrap()
242 .port()
243 }
244
245 #[test]
246 fn bind_and_receive() {
247 let port = find_free_port();
248 let (tx, rx) = mpsc::channel();
249
250 let config = UdpConfig {
251 name: "test-udp".into(),
252 listen_ip: Some("127.0.0.1".into()),
253 listen_port: Some(port),
254 forward_ip: None,
255 forward_port: None,
256 interface_id: InterfaceId(10),
257 };
258
259 let _writer = start(config, tx).unwrap();
260
261 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
263
264 let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
266 let payload = b"hello udp";
267 sender
268 .send_to(payload, format!("127.0.0.1:{}", port))
269 .unwrap();
270
271 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
273 match event {
274 Event::Frame { interface_id, data } => {
275 assert_eq!(interface_id, InterfaceId(10));
276 assert_eq!(data, payload);
277 }
278 other => panic!("expected Frame, got {:?}", other),
279 }
280 }
281
282 #[test]
283 fn send_broadcast() {
284 let recv_port = find_free_port();
285 let (tx, _rx) = mpsc::channel();
286
287 let config = UdpConfig {
288 name: "test-udp-send".into(),
289 listen_ip: None,
290 listen_port: None,
291 forward_ip: Some("127.0.0.1".into()),
292 forward_port: Some(recv_port),
293 interface_id: InterfaceId(11),
294 };
295
296 let writer = start(config, tx).unwrap();
297 let mut writer = writer.unwrap();
298
299 let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
301 receiver
302 .set_read_timeout(Some(Duration::from_secs(2)))
303 .unwrap();
304
305 let payload = b"broadcast data";
307 writer.send_frame(payload).unwrap();
308
309 let mut buf = [0u8; 256];
311 let (n, _) = receiver.recv_from(&mut buf).unwrap();
312 assert_eq!(&buf[..n], payload);
313 }
314
315 #[test]
316 fn round_trip() {
317 let listen_port = find_free_port();
318 let forward_port = find_free_port();
319 let (tx, rx) = mpsc::channel();
320
321 let config = UdpConfig {
322 name: "test-udp-rt".into(),
323 listen_ip: Some("127.0.0.1".into()),
324 listen_port: Some(listen_port),
325 forward_ip: Some("127.0.0.1".into()),
326 forward_port: Some(forward_port),
327 interface_id: InterfaceId(12),
328 };
329
330 let writer = start(config, tx).unwrap();
331 assert!(writer.is_some());
332
333 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
335
336 let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
338 sender
339 .send_to(b"ping", format!("127.0.0.1:{}", listen_port))
340 .unwrap();
341
342 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
343 match event {
344 Event::Frame { data, .. } => assert_eq!(data, b"ping"),
345 other => panic!("expected Frame, got {:?}", other),
346 }
347 }
348
349 #[test]
350 fn multiple_datagrams() {
351 let port = find_free_port();
352 let (tx, rx) = mpsc::channel();
353
354 let config = UdpConfig {
355 name: "test-udp-multi".into(),
356 listen_ip: Some("127.0.0.1".into()),
357 listen_port: Some(port),
358 forward_ip: None,
359 forward_port: None,
360 interface_id: InterfaceId(13),
361 };
362
363 let _writer = start(config, tx).unwrap();
364
365 let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();
367
368 let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
369 for i in 0..5u8 {
370 sender
371 .send_to(&[i], format!("127.0.0.1:{}", port))
372 .unwrap();
373 }
374
375 for i in 0..5u8 {
376 let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
377 match event {
378 Event::Frame { data, .. } => assert_eq!(data, vec![i]),
379 other => panic!("expected Frame, got {:?}", other),
380 }
381 }
382 }
383
384 #[test]
385 fn writer_send_to() {
386 let recv_port = find_free_port();
387
388 let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
390 receiver
391 .set_read_timeout(Some(Duration::from_secs(2)))
392 .unwrap();
393
394 let send_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
396 send_socket.set_broadcast(true).unwrap();
397 let target: SocketAddr = format!("127.0.0.1:{}", recv_port).parse().unwrap();
398 let mut writer = UdpWriter {
399 socket: send_socket,
400 target,
401 };
402
403 let payload = vec![0xAA, 0xBB, 0xCC];
404 writer.send_frame(&payload).unwrap();
405
406 let mut buf = [0u8; 256];
407 let (n, _) = receiver.recv_from(&mut buf).unwrap();
408 assert_eq!(&buf[..n], &payload);
409 }
410}