rns_net/holepunch/
udp_direct.rs1use std::io;
8use std::net::{SocketAddr, UdpSocket};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::interface::Writer;
18
19use super::puncher;
20
21const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
23
24const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(120);
26
27pub struct UdpDirectWriter {
31 socket: Arc<UdpSocket>,
32 peer_addr: SocketAddr,
33 running: Arc<AtomicBool>,
34}
35
36impl Writer for UdpDirectWriter {
37 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
38 self.socket.send_to(data, self.peer_addr)?;
39 Ok(())
40 }
41}
42
43impl Drop for UdpDirectWriter {
44 fn drop(&mut self) {
45 self.running.store(false, Ordering::Relaxed);
46 }
47}
48
49pub fn start_direct_interface(
55 socket: UdpSocket,
56 peer_addr: SocketAddr,
57 interface_id: InterfaceId,
58 session_id: [u8; 16],
59 punch_token: [u8; 32],
60 tx: EventSender,
61) -> io::Result<(Box<dyn Writer>, InterfaceInfo)> {
62 let socket = Arc::new(socket);
63 socket.set_read_timeout(Some(Duration::from_secs(1)))?;
64
65 let running = Arc::new(AtomicBool::new(true));
66
67 let writer = UdpDirectWriter {
68 socket: socket.clone(),
69 peer_addr,
70 running: running.clone(),
71 };
72
73 let name = format!(
74 "DirectPeer/{:02x}{:02x}{:02x}{:02x}",
75 session_id[0], session_id[1], session_id[2], session_id[3]
76 );
77
78 let info = InterfaceInfo {
79 id: interface_id,
80 name: name.clone(),
81 mode: rns_core::constants::MODE_FULL,
82 out_capable: true,
83 in_capable: true,
84 bitrate: None,
85 airtime_profile: None,
86 announce_rate_target: None,
87 announce_rate_grace: 0,
88 announce_rate_penalty: 0.0,
89 announce_cap: 1.0,
90 is_local_client: false,
91 wants_tunnel: false,
92 tunnel_id: None,
93 mtu: 1400,
94 ia_freq: 0.0,
95 ip_freq: 0.0,
96 op_freq: 0.0,
97 op_samples: 0,
98 started: 0.0,
99 ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
100 };
101
102 let running_clone = running.clone();
103
104 thread::Builder::new()
106 .name(format!("direct-udp-{}", &name))
107 .spawn(move || {
108 run_reader(
109 socket,
110 peer_addr,
111 interface_id,
112 session_id,
113 punch_token,
114 tx,
115 running_clone,
116 );
117 })?;
118
119 Ok((Box::new(writer), info))
120}
121
122fn run_reader(
123 socket: Arc<UdpSocket>,
124 peer_addr: SocketAddr,
125 interface_id: InterfaceId,
126 session_id: [u8; 16],
127 punch_token: [u8; 32],
128 tx: EventSender,
129 running: Arc<AtomicBool>,
130) {
131 let mut buf = [0u8; 2048];
132 let mut last_inbound = std::time::Instant::now();
133 let mut last_keepalive = std::time::Instant::now();
134 let keepalive_pkt = puncher::build_keepalive_packet(&session_id, &punch_token);
135
136 while running.load(Ordering::Relaxed) {
137 if last_keepalive.elapsed() >= KEEPALIVE_INTERVAL {
139 let _ = socket.send_to(&keepalive_pkt, peer_addr);
140 last_keepalive = std::time::Instant::now();
141 }
142
143 if last_inbound.elapsed() >= INACTIVITY_TIMEOUT {
145 log::info!("[{}] Direct UDP interface timed out", interface_id.0);
146 let _ = tx.send(Event::InterfaceDown(interface_id));
147 break;
148 }
149
150 match socket.recv_from(&mut buf) {
151 Ok((len, src)) => {
152 if src != peer_addr {
154 continue;
155 }
156 last_inbound = std::time::Instant::now();
157
158 if len >= 4 && (buf[..4] == *b"RNSH" || buf[..4] == *b"RNSA") {
160 continue;
161 }
162
163 if len > 0 {
165 let _ = tx.send(Event::Frame {
166 interface_id,
167 data: buf[..len].to_vec(),
168 rssi: None,
169 snr: None,
170 });
171 }
172 }
173 Err(ref e)
174 if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
175 {
176 continue;
177 }
178 Err(e) => {
179 log::warn!("[{}] Direct UDP recv error: {}", interface_id.0, e);
180 let _ = tx.send(Event::InterfaceDown(interface_id));
181 break;
182 }
183 }
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190
191 #[test]
192 fn test_udp_direct_writer() {
193 let sock_a = UdpSocket::bind("127.0.0.1:0").unwrap();
195 let sock_b = UdpSocket::bind("127.0.0.1:0").unwrap();
196 let addr_b = sock_b.local_addr().unwrap();
197 sock_b
198 .set_read_timeout(Some(Duration::from_secs(1)))
199 .unwrap();
200
201 let mut writer = UdpDirectWriter {
202 socket: Arc::new(sock_a),
203 peer_addr: addr_b,
204 running: Arc::new(AtomicBool::new(true)),
205 };
206
207 let data = b"hello direct peer";
209 writer.send_frame(data).unwrap();
210
211 let mut buf = [0u8; 64];
213 let (len, _src) = sock_b.recv_from(&mut buf).unwrap();
214 assert_eq!(&buf[..len], data);
215 }
216}