rns_net/holepunch/
udp_direct.rs1use std::io;
8use std::net::{SocketAddr, UdpSocket};
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::interface::Writer;
18
19use super::puncher;
20
21const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
23
24const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(120);
26
27pub struct UdpDirectWriter {
31 socket: Arc<UdpSocket>,
32 peer_addr: SocketAddr,
33 running: Arc<AtomicBool>,
34}
35
36impl Writer for UdpDirectWriter {
37 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
38 self.socket.send_to(data, self.peer_addr)?;
39 Ok(())
40 }
41}
42
43impl Drop for UdpDirectWriter {
44 fn drop(&mut self) {
45 self.running.store(false, Ordering::Relaxed);
46 }
47}
48
49pub fn start_direct_interface(
55 socket: UdpSocket,
56 peer_addr: SocketAddr,
57 interface_id: InterfaceId,
58 session_id: [u8; 16],
59 punch_token: [u8; 32],
60 tx: EventSender,
61) -> io::Result<(Box<dyn Writer>, InterfaceInfo)> {
62 let socket = Arc::new(socket);
63 socket.set_read_timeout(Some(Duration::from_secs(1)))?;
64
65 let running = Arc::new(AtomicBool::new(true));
66
67 let writer = UdpDirectWriter {
68 socket: socket.clone(),
69 peer_addr,
70 running: running.clone(),
71 };
72
73 let name = format!(
74 "DirectPeer/{:02x}{:02x}{:02x}{:02x}",
75 session_id[0], session_id[1], session_id[2], session_id[3]
76 );
77
78 let info = InterfaceInfo {
79 id: interface_id,
80 name: name.clone(),
81 mode: rns_core::constants::MODE_FULL,
82 out_capable: true,
83 in_capable: true,
84 bitrate: None,
85 announce_rate_target: None,
86 announce_rate_grace: 0,
87 announce_rate_penalty: 0.0,
88 announce_cap: 1.0,
89 is_local_client: false,
90 wants_tunnel: false,
91 tunnel_id: None,
92 mtu: 1400,
93 ia_freq: 0.0,
94 started: 0.0,
95 ingress_control: false,
96 };
97
98 let running_clone = running.clone();
99
100 thread::Builder::new()
102 .name(format!("direct-udp-{}", &name))
103 .spawn(move || {
104 run_reader(
105 socket,
106 peer_addr,
107 interface_id,
108 session_id,
109 punch_token,
110 tx,
111 running_clone,
112 );
113 })?;
114
115 Ok((Box::new(writer), info))
116}
117
118fn run_reader(
119 socket: Arc<UdpSocket>,
120 peer_addr: SocketAddr,
121 interface_id: InterfaceId,
122 session_id: [u8; 16],
123 punch_token: [u8; 32],
124 tx: EventSender,
125 running: Arc<AtomicBool>,
126) {
127 let mut buf = [0u8; 2048];
128 let mut last_inbound = std::time::Instant::now();
129 let mut last_keepalive = std::time::Instant::now();
130 let keepalive_pkt = puncher::build_keepalive_packet(&session_id, &punch_token);
131
132 while running.load(Ordering::Relaxed) {
133 if last_keepalive.elapsed() >= KEEPALIVE_INTERVAL {
135 let _ = socket.send_to(&keepalive_pkt, peer_addr);
136 last_keepalive = std::time::Instant::now();
137 }
138
139 if last_inbound.elapsed() >= INACTIVITY_TIMEOUT {
141 log::info!("[{}] Direct UDP interface timed out", interface_id.0);
142 let _ = tx.send(Event::InterfaceDown(interface_id));
143 break;
144 }
145
146 match socket.recv_from(&mut buf) {
147 Ok((len, src)) => {
148 if src != peer_addr {
150 continue;
151 }
152 last_inbound = std::time::Instant::now();
153
154 if len >= 4 && (buf[..4] == *b"RNSH" || buf[..4] == *b"RNSA") {
156 continue;
157 }
158
159 if len > 0 {
161 let _ = tx.send(Event::Frame {
162 interface_id,
163 data: buf[..len].to_vec(),
164 });
165 }
166 }
167 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut => {
168 continue;
169 }
170 Err(e) => {
171 log::warn!("[{}] Direct UDP recv error: {}", interface_id.0, e);
172 let _ = tx.send(Event::InterfaceDown(interface_id));
173 break;
174 }
175 }
176 }
177}
178
179#[cfg(test)]
180mod tests {
181 use super::*;
182
183 #[test]
184 fn test_udp_direct_writer() {
185 let sock_a = UdpSocket::bind("127.0.0.1:0").unwrap();
187 let sock_b = UdpSocket::bind("127.0.0.1:0").unwrap();
188 let addr_b = sock_b.local_addr().unwrap();
189 sock_b.set_read_timeout(Some(Duration::from_secs(1))).unwrap();
190
191 let mut writer = UdpDirectWriter {
192 socket: Arc::new(sock_a),
193 peer_addr: addr_b,
194 running: Arc::new(AtomicBool::new(true)),
195 };
196
197 let data = b"hello direct peer";
199 writer.send_frame(data).unwrap();
200
201 let mut buf = [0u8; 64];
203 let (len, _src) = sock_b.recv_from(&mut buf).unwrap();
204 assert_eq!(&buf[..len], data);
205 }
206}