rns_net/holepunch/
udp_direct.rs1use std::io;
8use std::net::{SocketAddr, UdpSocket};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::interface::Writer;
18
19use super::puncher;
20
21const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
23
24const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(120);
26
27pub struct UdpDirectWriter {
31 socket: Arc<UdpSocket>,
32 peer_addr: SocketAddr,
33 running: Arc<AtomicBool>,
34}
35
36impl Writer for UdpDirectWriter {
37 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
38 self.socket.send_to(data, self.peer_addr)?;
39 Ok(())
40 }
41}
42
43impl Drop for UdpDirectWriter {
44 fn drop(&mut self) {
45 self.running.store(false, Ordering::Relaxed);
46 }
47}
48
49pub fn start_direct_interface(
55 socket: UdpSocket,
56 peer_addr: SocketAddr,
57 interface_id: InterfaceId,
58 session_id: [u8; 16],
59 punch_token: [u8; 32],
60 tx: EventSender,
61) -> io::Result<(Box<dyn Writer>, InterfaceInfo)> {
62 let socket = Arc::new(socket);
63 socket.set_read_timeout(Some(Duration::from_secs(1)))?;
64
65 let running = Arc::new(AtomicBool::new(true));
66
67 let writer = UdpDirectWriter {
68 socket: socket.clone(),
69 peer_addr,
70 running: running.clone(),
71 };
72
73 let name = format!(
74 "DirectPeer/{:02x}{:02x}{:02x}{:02x}",
75 session_id[0], session_id[1], session_id[2], session_id[3]
76 );
77
78 let info = InterfaceInfo {
79 id: interface_id,
80 name: name.clone(),
81 mode: rns_core::constants::MODE_FULL,
82 out_capable: true,
83 in_capable: true,
84 bitrate: None,
85 airtime_profile: None,
86 announce_rate_target: None,
87 announce_rate_grace: 0,
88 announce_rate_penalty: 0.0,
89 announce_cap: 1.0,
90 is_local_client: false,
91 wants_tunnel: false,
92 tunnel_id: None,
93 mtu: 1400,
94 ia_freq: 0.0,
95 started: 0.0,
96 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
97 };
98
99 let running_clone = running.clone();
100
101 thread::Builder::new()
103 .name(format!("direct-udp-{}", &name))
104 .spawn(move || {
105 run_reader(
106 socket,
107 peer_addr,
108 interface_id,
109 session_id,
110 punch_token,
111 tx,
112 running_clone,
113 );
114 })?;
115
116 Ok((Box::new(writer), info))
117}
118
119fn run_reader(
120 socket: Arc<UdpSocket>,
121 peer_addr: SocketAddr,
122 interface_id: InterfaceId,
123 session_id: [u8; 16],
124 punch_token: [u8; 32],
125 tx: EventSender,
126 running: Arc<AtomicBool>,
127) {
128 let mut buf = [0u8; 2048];
129 let mut last_inbound = std::time::Instant::now();
130 let mut last_keepalive = std::time::Instant::now();
131 let keepalive_pkt = puncher::build_keepalive_packet(&session_id, &punch_token);
132
133 while running.load(Ordering::Relaxed) {
134 if last_keepalive.elapsed() >= KEEPALIVE_INTERVAL {
136 let _ = socket.send_to(&keepalive_pkt, peer_addr);
137 last_keepalive = std::time::Instant::now();
138 }
139
140 if last_inbound.elapsed() >= INACTIVITY_TIMEOUT {
142 log::info!("[{}] Direct UDP interface timed out", interface_id.0);
143 let _ = tx.send(Event::InterfaceDown(interface_id));
144 break;
145 }
146
147 match socket.recv_from(&mut buf) {
148 Ok((len, src)) => {
149 if src != peer_addr {
151 continue;
152 }
153 last_inbound = std::time::Instant::now();
154
155 if len >= 4 && (buf[..4] == *b"RNSH" || buf[..4] == *b"RNSA") {
157 continue;
158 }
159
160 if len > 0 {
162 let _ = tx.send(Event::Frame {
163 interface_id,
164 data: buf[..len].to_vec(),
165 });
166 }
167 }
168 Err(ref e)
169 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
170 {
171 continue;
172 }
173 Err(e) => {
174 log::warn!("[{}] Direct UDP recv error: {}", interface_id.0, e);
175 let _ = tx.send(Event::InterfaceDown(interface_id));
176 break;
177 }
178 }
179 }
180}
181
182#[cfg(test)]
183mod tests {
184 use super::*;
185
186 #[test]
187 fn test_udp_direct_writer() {
188 let sock_a = UdpSocket::bind("127.0.0.1:0").unwrap();
190 let sock_b = UdpSocket::bind("127.0.0.1:0").unwrap();
191 let addr_b = sock_b.local_addr().unwrap();
192 sock_b
193 .set_read_timeout(Some(Duration::from_secs(1)))
194 .unwrap();
195
196 let mut writer = UdpDirectWriter {
197 socket: Arc::new(sock_a),
198 peer_addr: addr_b,
199 running: Arc::new(AtomicBool::new(true)),
200 };
201
202 let data = b"hello direct peer";
204 writer.send_frame(data).unwrap();
205
206 let mut buf = [0u8; 64];
208 let (len, _src) = sock_b.recv_from(&mut buf).unwrap();
209 assert_eq!(&buf[..len], data);
210 }
211}