1use std::io::{self};
7use std::net::{SocketAddr, UdpSocket};
8use std::thread;
9
10use rns_core::transport::types::InterfaceId;
11
12use crate::event::{Event, EventSender};
13use crate::interface::Writer;
14
15#[derive(Debug, Clone)]
17pub struct UdpConfig {
18 pub name: String,
19 pub listen_ip: Option<String>,
20 pub listen_port: Option<u16>,
21 pub forward_ip: Option<String>,
22 pub forward_port: Option<u16>,
23 pub interface_id: InterfaceId,
24}
25
26impl Default for UdpConfig {
27 fn default() -> Self {
28 UdpConfig {
29 name: String::new(),
30 listen_ip: None,
31 listen_port: None,
32 forward_ip: None,
33 forward_port: None,
34 interface_id: InterfaceId(0),
35 }
36 }
37}
38
39struct UdpWriter {
41 socket: UdpSocket,
42 target: SocketAddr,
43}
44
45impl Writer for UdpWriter {
46 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
47 self.socket.send_to(data, self.target)?;
48 Ok(())
49 }
50}
51
52pub fn start(config: UdpConfig, tx: EventSender) -> io::Result<Option<Box<dyn Writer>>> {
55 let id = config.interface_id;
56 let mut writer: Option<Box<dyn Writer>> = None;
57
58 if let (Some(ref fwd_ip), Some(fwd_port)) = (&config.forward_ip, config.forward_port) {
60 let target: SocketAddr = format!("{}:{}", fwd_ip, fwd_port)
61 .parse()
62 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
63
64 let send_socket = UdpSocket::bind("0.0.0.0:0")?;
65 send_socket.set_broadcast(true)?;
66
67 writer = Some(Box::new(UdpWriter {
68 socket: send_socket,
69 target,
70 }));
71 }
72
73 if let (Some(ref bind_ip), Some(bind_port)) = (&config.listen_ip, config.listen_port) {
75 let bind_addr = format!("{}:{}", bind_ip, bind_port);
76 let recv_socket = UdpSocket::bind(&bind_addr)?;
77
78 log::info!("[{}] UDP listening on {}", config.name, bind_addr);
79
80 let _ = tx.send(Event::InterfaceUp(id, None, None));
82
83 let name = config.name.clone();
84 thread::Builder::new()
85 .name(format!("udp-reader-{}", id.0))
86 .spawn(move || {
87 udp_reader_loop(recv_socket, id, name, tx);
88 })?;
89 }
90
91 Ok(writer)
92}
93
94fn udp_reader_loop(socket: UdpSocket, id: InterfaceId, name: String, tx: EventSender) {
96 let mut buf = [0u8; 2048];
97
98 loop {
99 match socket.recv_from(&mut buf) {
100 Ok((n, _src)) => {
101 if tx
102 .send(Event::Frame {
103 interface_id: id,
104 data: buf[..n].to_vec(),
105 })
106 .is_err()
107 {
108 return;
110 }
111 }
112 Err(e) => {
113 log::warn!("[{}] recv error: {}", name, e);
114 let _ = tx.send(Event::InterfaceDown(id));
115 return;
116 }
117 }
118 }
119}
120
121use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
124use rns_core::transport::types::InterfaceInfo;
125use std::collections::HashMap;
126
127struct NoopWriter;
130
131impl Writer for NoopWriter {
132 fn send_frame(&mut self, _data: &[u8]) -> io::Result<()> {
133 Err(io::Error::new(
134 io::ErrorKind::Other,
135 "listen-only UDP interface",
136 ))
137 }
138}
139
140pub struct UdpFactory;
142
143impl InterfaceFactory for UdpFactory {
144 fn type_name(&self) -> &str {
145 "UDPInterface"
146 }
147
148 fn parse_config(
149 &self,
150 name: &str,
151 id: InterfaceId,
152 params: &HashMap<String, String>,
153 ) -> Result<Box<dyn InterfaceConfigData>, String> {
154 let listen_ip = params.get("listen_ip").cloned();
155
156 let port_shorthand: Option<u16> = params.get("port").and_then(|v| v.parse().ok());
158
159 let listen_port: Option<u16> = params
160 .get("listen_port")
161 .and_then(|v| v.parse().ok())
162 .or(port_shorthand);
163
164 let forward_ip = params.get("forward_ip").cloned();
165
166 let forward_port: Option<u16> = params
167 .get("forward_port")
168 .and_then(|v| v.parse().ok())
169 .or(port_shorthand);
170
171 Ok(Box::new(UdpConfig {
172 name: name.to_string(),
173 listen_ip,
174 listen_port,
175 forward_ip,
176 forward_port,
177 interface_id: id,
178 }))
179 }
180
181 fn start(
182 &self,
183 config: Box<dyn InterfaceConfigData>,
184 ctx: StartContext,
185 ) -> io::Result<StartResult> {
186 let udp_config = *config
187 .into_any()
188 .downcast::<UdpConfig>()
189 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
190
191 let id = udp_config.interface_id;
192 let name = udp_config.name.clone();
193 let out_capable = udp_config.forward_ip.is_some();
194 let in_capable = udp_config.listen_ip.is_some();
195
196 let info = InterfaceInfo {
197 id,
198 name,
199 mode: ctx.mode,
200 out_capable,
201 in_capable,
202 bitrate: Some(10_000_000),
203 announce_rate_target: None,
204 announce_rate_grace: 0,
205 announce_rate_penalty: 0.0,
206 announce_cap: rns_core::constants::ANNOUNCE_CAP,
207 is_local_client: false,
208 wants_tunnel: false,
209 tunnel_id: None,
210 mtu: 1400,
211 ingress_control: true,
212 ia_freq: 0.0,
213 started: crate::time::now(),
214 };
215
216 let maybe_writer = start(udp_config, ctx.tx)?;
217
218 let writer: Box<dyn Writer> = match maybe_writer {
219 Some(w) => w,
220 None => Box::new(NoopWriter),
221 };
222
223 Ok(StartResult::Simple {
224 id,
225 info,
226 writer,
227 interface_type_name: "UDPInterface".to_string(),
228 })
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::UdpSocket;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Asks the OS for a currently unused UDP port on the loopback address.
    ///
    /// The probe uses a UDP socket because TCP and UDP port spaces are
    /// independent: a port that is free for TCP may already be taken for UDP.
    fn find_free_port() -> u16 {
        UdpSocket::bind("127.0.0.1:0")
            .unwrap()
            .local_addr()
            .unwrap()
            .port()
    }

    /// A listen-only interface forwards a received datagram as a `Frame`.
    #[test]
    fn bind_and_receive() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(10),
        };

        let _writer = start(config, tx).unwrap();

        // The first event is the InterfaceUp notification; discard it.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        let payload = b"hello udp";
        sender
            .send_to(payload, format!("127.0.0.1:{}", port))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { interface_id, data } => {
                assert_eq!(interface_id, InterfaceId(10));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// A forward-only interface returns a writer that reaches the target.
    #[test]
    fn send_broadcast() {
        let recv_port = find_free_port();
        let (tx, _rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp-send".into(),
            listen_ip: None,
            listen_port: None,
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(recv_port),
            interface_id: InterfaceId(11),
        };

        let writer = start(config, tx).unwrap();
        let mut writer = writer.unwrap();

        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let payload = b"broadcast data";
        writer.send_frame(payload).unwrap();

        let mut buf = [0u8; 256];
        let (n, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], payload);
    }

    /// An interface with both directions configured yields a writer and
    /// still delivers incoming datagrams.
    #[test]
    fn round_trip() {
        let listen_port = find_free_port();
        let forward_port = find_free_port();
        let (tx, rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp-rt".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(listen_port),
            forward_ip: Some("127.0.0.1".into()),
            forward_port: Some(forward_port),
            interface_id: InterfaceId(12),
        };

        let writer = start(config, tx).unwrap();
        assert!(writer.is_some());

        // The first event is the InterfaceUp notification; discard it.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        sender
            .send_to(b"ping", format!("127.0.0.1:{}", listen_port))
            .unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { data, .. } => assert_eq!(data, b"ping"),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    /// Several datagrams sent back to back each arrive as their own frame.
    #[test]
    fn multiple_datagrams() {
        let port = find_free_port();
        let (tx, rx) = mpsc::channel();

        let config = UdpConfig {
            name: "test-udp-multi".into(),
            listen_ip: Some("127.0.0.1".into()),
            listen_port: Some(port),
            forward_ip: None,
            forward_port: None,
            interface_id: InterfaceId(13),
        };

        let _writer = start(config, tx).unwrap();

        // The first event is the InterfaceUp notification; discard it.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        for i in 0..5u8 {
            sender.send_to(&[i], format!("127.0.0.1:{}", port)).unwrap();
        }

        for i in 0..5u8 {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::Frame { data, .. } => assert_eq!(data, vec![i]),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
    }

    /// `UdpWriter` built by hand delivers a frame to its target unchanged.
    #[test]
    fn writer_send_to() {
        let recv_port = find_free_port();

        let receiver = UdpSocket::bind(format!("127.0.0.1:{}", recv_port)).unwrap();
        receiver
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let send_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        send_socket.set_broadcast(true).unwrap();
        let target: SocketAddr = format!("127.0.0.1:{}", recv_port).parse().unwrap();
        let mut writer = UdpWriter {
            socket: send_socket,
            target,
        };

        let payload = vec![0xAA, 0xBB, 0xCC];
        writer.send_frame(&payload).unwrap();

        let mut buf = [0u8; 256];
        let (n, _) = receiver.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], &payload);
    }
}