Skip to main content

rns_net/holepunch/
udp_direct.rs

1//! Point-to-point UDP direct interface.
2//!
3//! After hole punching succeeds, this module wraps the punched UDP socket
4//! as a Reticulum interface (implementing Writer + reader thread).
5//! No HDLC framing needed — one datagram = one packet (same as UdpInterface).
6
7use std::io;
8use std::net::{SocketAddr, UdpSocket};
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::interface::Writer;
18
19use super::puncher;
20
21/// Keepalive interval for maintaining NAT mappings.
22const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
23
24/// Timeout for considering the direct connection dead.
25const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(120);
26
27/// Writer for a direct UDP peer-to-peer connection.
28///
29/// When dropped, signals the reader thread to stop.
30pub struct UdpDirectWriter {
31    socket: Arc<UdpSocket>,
32    peer_addr: SocketAddr,
33    running: Arc<AtomicBool>,
34}
35
36impl Writer for UdpDirectWriter {
37    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
38        self.socket.send_to(data, self.peer_addr)?;
39        Ok(())
40    }
41}
42
43impl Drop for UdpDirectWriter {
44    fn drop(&mut self) {
45        self.running.store(false, Ordering::Relaxed);
46    }
47}
48
49/// Start a direct UDP interface from a punched socket.
50///
51/// Returns `(interface_id, writer, InterfaceInfo)` and spawns a reader thread.
52/// The reader thread sends `Event::Frame` for incoming packets and
53/// `Event::InterfaceDown` if the connection times out.
54pub fn start_direct_interface(
55    socket: UdpSocket,
56    peer_addr: SocketAddr,
57    interface_id: InterfaceId,
58    session_id: [u8; 16],
59    punch_token: [u8; 32],
60    tx: EventSender,
61) -> io::Result<(Box<dyn Writer>, InterfaceInfo)> {
62    let socket = Arc::new(socket);
63    socket.set_read_timeout(Some(Duration::from_secs(1)))?;
64
65    let running = Arc::new(AtomicBool::new(true));
66
67    let writer = UdpDirectWriter {
68        socket: socket.clone(),
69        peer_addr,
70        running: running.clone(),
71    };
72
73    let name = format!(
74        "DirectPeer/{:02x}{:02x}{:02x}{:02x}",
75        session_id[0], session_id[1], session_id[2], session_id[3]
76    );
77
78    let info = InterfaceInfo {
79        id: interface_id,
80        name: name.clone(),
81        mode: rns_core::constants::MODE_FULL,
82        out_capable: true,
83        in_capable: true,
84        bitrate: None,
85        announce_rate_target: None,
86        announce_rate_grace: 0,
87        announce_rate_penalty: 0.0,
88        announce_cap: 1.0,
89        is_local_client: false,
90        wants_tunnel: false,
91        tunnel_id: None,
92        mtu: 1400,
93        ia_freq: 0.0,
94        started: 0.0,
95        ingress_control: false,
96    };
97
98    let running_clone = running.clone();
99
100    // Spawn reader + keepalive thread
101    thread::Builder::new()
102        .name(format!("direct-udp-{}", &name))
103        .spawn(move || {
104            run_reader(
105                socket,
106                peer_addr,
107                interface_id,
108                session_id,
109                punch_token,
110                tx,
111                running_clone,
112            );
113        })?;
114
115    Ok((Box::new(writer), info))
116}
117
118fn run_reader(
119    socket: Arc<UdpSocket>,
120    peer_addr: SocketAddr,
121    interface_id: InterfaceId,
122    session_id: [u8; 16],
123    punch_token: [u8; 32],
124    tx: EventSender,
125    running: Arc<AtomicBool>,
126) {
127    let mut buf = [0u8; 2048];
128    let mut last_inbound = std::time::Instant::now();
129    let mut last_keepalive = std::time::Instant::now();
130    let keepalive_pkt = puncher::build_keepalive_packet(&session_id, &punch_token);
131
132    while running.load(Ordering::Relaxed) {
133        // Send keepalive if due
134        if last_keepalive.elapsed() >= KEEPALIVE_INTERVAL {
135            let _ = socket.send_to(&keepalive_pkt, peer_addr);
136            last_keepalive = std::time::Instant::now();
137        }
138
139        // Check inactivity timeout
140        if last_inbound.elapsed() >= INACTIVITY_TIMEOUT {
141            log::info!("[{}] Direct UDP interface timed out", interface_id.0);
142            let _ = tx.send(Event::InterfaceDown(interface_id));
143            break;
144        }
145
146        match socket.recv_from(&mut buf) {
147            Ok((len, src)) => {
148                // Only accept packets from our verified peer
149                if src != peer_addr {
150                    continue;
151                }
152                last_inbound = std::time::Instant::now();
153
154                // Skip keepalive/punch packets (they have RNSH/RNSA magic)
155                if len >= 4 && (buf[..4] == *b"RNSH" || buf[..4] == *b"RNSA") {
156                    continue;
157                }
158
159                // Deliver as Reticulum frame
160                if len > 0 {
161                    let _ = tx.send(Event::Frame {
162                        interface_id,
163                        data: buf[..len].to_vec(),
164                    });
165                }
166            }
167            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut => {
168                continue;
169            }
170            Err(e) => {
171                log::warn!("[{}] Direct UDP recv error: {}", interface_id.0, e);
172                let _ = tx.send(Event::InterfaceDown(interface_id));
173                break;
174            }
175        }
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182
183    #[test]
184    fn test_udp_direct_writer() {
185        // Create a pair of sockets
186        let sock_a = UdpSocket::bind("127.0.0.1:0").unwrap();
187        let sock_b = UdpSocket::bind("127.0.0.1:0").unwrap();
188        let addr_b = sock_b.local_addr().unwrap();
189        sock_b.set_read_timeout(Some(Duration::from_secs(1))).unwrap();
190
191        let mut writer = UdpDirectWriter {
192            socket: Arc::new(sock_a),
193            peer_addr: addr_b,
194            running: Arc::new(AtomicBool::new(true)),
195        };
196
197        // Send a frame
198        let data = b"hello direct peer";
199        writer.send_frame(data).unwrap();
200
201        // Receive on other end
202        let mut buf = [0u8; 64];
203        let (len, _src) = sock_b.recv_from(&mut buf).unwrap();
204        assert_eq!(&buf[..len], data);
205    }
206}