Skip to main content

rns_net/holepunch/
udp_direct.rs

1//! Point-to-point UDP direct interface.
2//!
3//! After hole punching succeeds, this module wraps the punched UDP socket
4//! as a Reticulum interface (implementing Writer + reader thread).
5//! No HDLC framing needed — one datagram = one packet (same as UdpInterface).
6
7use std::io;
8use std::net::{SocketAddr, UdpSocket};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::thread;
12use std::time::Duration;
13
14use rns_core::transport::types::{InterfaceId, InterfaceInfo};
15
16use crate::event::{Event, EventSender};
17use crate::interface::Writer;
18
19use super::puncher;
20
/// How often the reader thread sends a keepalive datagram to the peer so
/// that NAT mappings along the path stay open.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);

/// How long the link may go without any datagram from the peer before the
/// direct connection is considered dead.
const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(2 * 60);
26
/// Writer for a direct UDP peer-to-peer connection.
///
/// Sends every frame as a datagram addressed to `peer_addr` over the punched
/// socket. When dropped, signals the reader thread to stop.
#[derive(Debug)]
pub struct UdpDirectWriter {
    /// The punched socket, shared with the reader thread.
    socket: Arc<UdpSocket>,
    /// Address of the remote peer; destination of every outgoing datagram.
    peer_addr: SocketAddr,
    /// Run flag shared with the reader thread; cleared when the writer drops.
    running: Arc<AtomicBool>,
}
35
36impl Writer for UdpDirectWriter {
37    fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
38        self.socket.send_to(data, self.peer_addr)?;
39        Ok(())
40    }
41}
42
43impl Drop for UdpDirectWriter {
44    fn drop(&mut self) {
45        self.running.store(false, Ordering::Relaxed);
46    }
47}
48
49/// Start a direct UDP interface from a punched socket.
50///
51/// Returns `(interface_id, writer, InterfaceInfo)` and spawns a reader thread.
52/// The reader thread sends `Event::Frame` for incoming packets and
53/// `Event::InterfaceDown` if the connection times out.
54pub fn start_direct_interface(
55    socket: UdpSocket,
56    peer_addr: SocketAddr,
57    interface_id: InterfaceId,
58    session_id: [u8; 16],
59    punch_token: [u8; 32],
60    tx: EventSender,
61) -> io::Result<(Box<dyn Writer>, InterfaceInfo)> {
62    let socket = Arc::new(socket);
63    socket.set_read_timeout(Some(Duration::from_secs(1)))?;
64
65    let running = Arc::new(AtomicBool::new(true));
66
67    let writer = UdpDirectWriter {
68        socket: socket.clone(),
69        peer_addr,
70        running: running.clone(),
71    };
72
73    let name = format!(
74        "DirectPeer/{:02x}{:02x}{:02x}{:02x}",
75        session_id[0], session_id[1], session_id[2], session_id[3]
76    );
77
78    let info = InterfaceInfo {
79        id: interface_id,
80        name: name.clone(),
81        mode: rns_core::constants::MODE_FULL,
82        out_capable: true,
83        in_capable: true,
84        bitrate: None,
85        airtime_profile: None,
86        announce_rate_target: None,
87        announce_rate_grace: 0,
88        announce_rate_penalty: 0.0,
89        announce_cap: 1.0,
90        is_local_client: false,
91        wants_tunnel: false,
92        tunnel_id: None,
93        mtu: 1400,
94        ia_freq: 0.0,
95        started: 0.0,
96        ingress_control: rns_core::transport::types::IngressControlConfig::disabled(),
97    };
98
99    let running_clone = running.clone();
100
101    // Spawn reader + keepalive thread
102    thread::Builder::new()
103        .name(format!("direct-udp-{}", &name))
104        .spawn(move || {
105            run_reader(
106                socket,
107                peer_addr,
108                interface_id,
109                session_id,
110                punch_token,
111                tx,
112                running_clone,
113            );
114        })?;
115
116    Ok((Box::new(writer), info))
117}
118
119fn run_reader(
120    socket: Arc<UdpSocket>,
121    peer_addr: SocketAddr,
122    interface_id: InterfaceId,
123    session_id: [u8; 16],
124    punch_token: [u8; 32],
125    tx: EventSender,
126    running: Arc<AtomicBool>,
127) {
128    let mut buf = [0u8; 2048];
129    let mut last_inbound = std::time::Instant::now();
130    let mut last_keepalive = std::time::Instant::now();
131    let keepalive_pkt = puncher::build_keepalive_packet(&session_id, &punch_token);
132
133    while running.load(Ordering::Relaxed) {
134        // Send keepalive if due
135        if last_keepalive.elapsed() >= KEEPALIVE_INTERVAL {
136            let _ = socket.send_to(&keepalive_pkt, peer_addr);
137            last_keepalive = std::time::Instant::now();
138        }
139
140        // Check inactivity timeout
141        if last_inbound.elapsed() >= INACTIVITY_TIMEOUT {
142            log::info!("[{}] Direct UDP interface timed out", interface_id.0);
143            let _ = tx.send(Event::InterfaceDown(interface_id));
144            break;
145        }
146
147        match socket.recv_from(&mut buf) {
148            Ok((len, src)) => {
149                // Only accept packets from our verified peer
150                if src != peer_addr {
151                    continue;
152                }
153                last_inbound = std::time::Instant::now();
154
155                // Skip keepalive/punch packets (they have RNSH/RNSA magic)
156                if len >= 4 && (buf[..4] == *b"RNSH" || buf[..4] == *b"RNSA") {
157                    continue;
158                }
159
160                // Deliver as Reticulum frame
161                if len > 0 {
162                    let _ = tx.send(Event::Frame {
163                        interface_id,
164                        data: buf[..len].to_vec(),
165                    });
166                }
167            }
168            Err(ref e)
169                if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
170            {
171                continue;
172            }
173            Err(e) => {
174                log::warn!("[{}] Direct UDP recv error: {}", interface_id.0, e);
175                let _ = tx.send(Event::InterfaceDown(interface_id));
176                break;
177            }
178        }
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// A frame written through `UdpDirectWriter` arrives on the peer socket
    /// as one datagram with identical contents.
    #[test]
    fn test_udp_direct_writer() {
        // Sending side and receiving side, both on loopback with OS-chosen ports.
        let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
        let receiver = UdpSocket::bind("127.0.0.1:0").unwrap();
        let receiver_addr = receiver.local_addr().unwrap();
        // Bound the wait so a lost datagram fails the test instead of hanging it.
        receiver
            .set_read_timeout(Some(Duration::from_secs(1)))
            .unwrap();

        let mut writer = UdpDirectWriter {
            socket: Arc::new(sender),
            peer_addr: receiver_addr,
            running: Arc::new(AtomicBool::new(true)),
        };

        // Send a frame
        let payload = b"hello direct peer";
        writer.send_frame(payload).unwrap();

        // Receive on other end
        let mut inbox = [0u8; 64];
        let (received, _from) = receiver.recv_from(&mut inbox).unwrap();
        assert_eq!(&inbox[..received], payload);
    }
}