Skip to main content

rust_p2p_core/tunnel/
mod.rs

//! Tunnel management for UDP and TCP connections.
//!
//! This module provides unified management of UDP and TCP tunnels, handling
//! connection dispatch, socket management, and tunnel lifecycle.
//!
//! # Examples
//!
//! ```rust,no_run
//! use rust_p2p_core::tunnel::config::TunnelConfig;
//! use rust_p2p_core::tunnel::new_tunnel_component;
//!
//! # #[tokio::main]
//! # async fn main() -> std::io::Result<()> {
//! let config = TunnelConfig::default();
//! let (mut dispatcher, puncher) = new_tunnel_component(config)?;
//!
//! // Accept incoming tunnels
//! while let Ok(tunnel) = dispatcher.dispatch().await {
//!     tokio::spawn(async move {
//!         // Handle tunnel
//!     });
//! }
//! # Ok(())
//! # }
//! ```
25
26use bytes::Bytes;
27use std::io;
28use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
29
30use crate::tunnel::config::TunnelConfig;
31
32use crate::punch::Puncher;
33use crate::route::{ConnectProtocol, RouteKey};
34use std::sync::Arc;
35
36pub mod config;
37
38pub mod tcp;
39pub mod udp;
/// Unspecified IPv4 socket address with port 0 (`0.0.0.0:0`).
pub const DEFAULT_ADDRESS_V4: SocketAddr = {
    let any_v4 = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
    SocketAddr::V4(any_v4)
};
/// Unspecified IPv6 socket address with port 0 (`[::]:0`); flow info and
/// scope id are both 0.
pub const DEFAULT_ADDRESS_V6: SocketAddr = {
    let any_v6 = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0);
    SocketAddr::V6(any_v6)
};
44
45/// Creates tunnel dispatcher and puncher from configuration.
46///
47/// This is the main entry point for setting up the tunnel infrastructure.
48///
49/// # Arguments
50///
51/// * `config` - Tunnel configuration specifying UDP/TCP settings
52///
53/// # Returns
54///
55/// A tuple containing:
56/// - `TunnelDispatcher` - For accepting incoming connections
57/// - `Puncher` - For NAT traversal
58///
59/// # Examples
60///
61/// ```rust,no_run
62/// use rust_p2p_core::tunnel::{TunnelConfig, new_tunnel_component};
63///
64/// # fn main() -> std::io::Result<()> {
65/// let config = TunnelConfig::default();
66/// let (dispatcher, puncher) = new_tunnel_component(config)?;
67/// # Ok(())
68/// # }
69/// ```
70pub fn new_tunnel_component(config: TunnelConfig) -> io::Result<(TunnelDispatcher, Puncher)> {
71    let udp_tunnel_dispatcher = if let Some(mut udp_tunnel_config) = config.udp_tunnel_config {
72        udp_tunnel_config.main_udp_count = config.major_socket_count;
73        Some(udp::UdpTunnelDispatcher::new(udp_tunnel_config)?)
74    } else {
75        None
76    };
77    let tcp_tunnel_dispatcher = if let Some(mut tcp_tunnel_config) = config.tcp_tunnel_config {
78        tcp_tunnel_config.tcp_multiplexing_limit = config.major_socket_count;
79        Some(tcp::TcpTunnelDispatcher::new(tcp_tunnel_config)?)
80    } else {
81        None
82    };
83
84    let tunnel_dispatcher = TunnelDispatcher {
85        udp_tunnel_dispatcher,
86        tcp_tunnel_dispatcher,
87    };
88    let puncher = Puncher::from(&tunnel_dispatcher);
89    Ok((tunnel_dispatcher, puncher))
90}
91
92/// Dispatcher for accepting incoming tunnel connections.
93///
94/// `TunnelDispatcher` manages both UDP and TCP tunnel dispatchers and
95/// provides a unified interface for accepting connections.
96pub struct TunnelDispatcher {
97    udp_tunnel_dispatcher: Option<udp::UdpTunnelDispatcher>,
98    tcp_tunnel_dispatcher: Option<tcp::TcpTunnelDispatcher>,
99}
100
101/// Unified tunnel type for UDP or TCP connections.
102///
103/// # Examples
104///
105/// ```rust,no_run
106/// use rust_p2p_core::tunnel::Tunnel;
107///
108/// # async fn example(tunnel: Tunnel) {
109/// match tunnel {
110///     Tunnel::Udp(udp) => println!("UDP tunnel"),
111///     Tunnel::Tcp(tcp) => println!("TCP tunnel"),
112/// }
113/// # }
114/// ```
115pub enum Tunnel {
116    Udp(udp::UdpTunnel),
117    Tcp(tcp::TcpTunnel),
118}
119
120/// Manager for UDP and TCP sockets.
121#[derive(Clone)]
122pub struct SocketManager {
123    udp_socket_manager: Option<Arc<udp::UdpSocketManager>>,
124    tcp_socket_manager: Option<Arc<tcp::TcpSocketManager>>,
125}
126
127impl TunnelDispatcher {
128    /// Accepts the next incoming tunnel connection.
129    ///
130    /// This method blocks until a connection is available from either
131    /// UDP or TCP dispatcher.
132    ///
133    /// # Returns
134    ///
135    /// The next available tunnel connection.
136    ///
137    /// # Examples
138    ///
139    /// ```rust,no_run
140    /// # use rust_p2p_core::tunnel::TunnelDispatcher;
141    /// # async fn example(mut dispatcher: TunnelDispatcher) -> std::io::Result<()> {
142    /// loop {
143    ///     let tunnel = dispatcher.dispatch().await?;
144    ///     tokio::spawn(async move {
145    ///         // Handle tunnel
146    ///     });
147    /// }
148    /// # }
149    /// ```
150    pub async fn dispatch(&mut self) -> io::Result<Tunnel> {
151        tokio::select! {
152            rs=dispatch_udp_tunnel(self.udp_tunnel_dispatcher.as_mut())=>{
153                rs
154            }
155            rs=accept_tcp(self.tcp_tunnel_dispatcher.as_mut())=>{
156                rs
157            }
158        }
159    }
160    pub fn shared_udp_socket_manager(&self) -> Option<Arc<udp::UdpSocketManager>> {
161        self.udp_tunnel_dispatcher
162            .as_ref()
163            .map(|v| v.socket_manager.clone())
164    }
165    pub fn shared_tcp_socket_manager(&self) -> Option<Arc<tcp::TcpSocketManager>> {
166        self.tcp_tunnel_dispatcher
167            .as_ref()
168            .map(|v| v.socket_manager.clone())
169    }
170    pub fn socket_manager(&self) -> SocketManager {
171        SocketManager {
172            udp_socket_manager: self.shared_udp_socket_manager(),
173            tcp_socket_manager: self.shared_tcp_socket_manager(),
174        }
175    }
176    pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
177        self.udp_tunnel_dispatcher
178            .as_ref()
179            .map(|v| &v.socket_manager)
180    }
181    pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
182        self.tcp_tunnel_dispatcher
183            .as_ref()
184            .map(|v| &v.socket_manager)
185    }
186}
187
188impl TunnelDispatcher {
189    pub fn udp_tunnel_manager_as_mut(&mut self) -> Option<&mut udp::UdpTunnelDispatcher> {
190        self.udp_tunnel_dispatcher.as_mut()
191    }
192    pub fn tcp_tunnel_manager_as_mut(&mut self) -> Option<&mut tcp::TcpTunnelDispatcher> {
193        self.tcp_tunnel_dispatcher.as_mut()
194    }
195}
196
197async fn accept_tcp(tcp: Option<&mut tcp::TcpTunnelDispatcher>) -> io::Result<Tunnel> {
198    if let Some(tcp_tunnel_factory) = tcp {
199        Ok(Tunnel::Tcp(tcp_tunnel_factory.dispatch().await?))
200    } else {
201        futures::future::pending().await
202    }
203}
204async fn dispatch_udp_tunnel(
205    udp_tunnel_factory: Option<&mut udp::UdpTunnelDispatcher>,
206) -> io::Result<Tunnel> {
207    if let Some(udp_tunnel_factory) = udp_tunnel_factory {
208        Ok(Tunnel::Udp(udp_tunnel_factory.dispatch().await?))
209    } else {
210        futures::future::pending().await
211    }
212}
213
214impl SocketManager {
215    pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
216        self.udp_socket_manager.as_ref()
217    }
218    pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
219        self.tcp_socket_manager.as_ref()
220    }
221}
222
223impl SocketManager {
224    /// Writing `buf` to the target denoted by `route_key`
225    pub async fn send_to(&self, buf: Bytes, route_key: &RouteKey) -> io::Result<()> {
226        match route_key.protocol() {
227            ConnectProtocol::UDP => {
228                if let Some(w) = self.udp_socket_manager.as_ref() {
229                    return w.send_bytes_to(buf, route_key).await;
230                }
231            }
232            ConnectProtocol::TCP => {
233                if let Some(w) = self.tcp_socket_manager.as_ref() {
234                    return w.send_to(buf, route_key).await;
235                }
236            }
237        }
238        Err(io::Error::from(io::ErrorKind::InvalidInput))
239    }
240    pub fn try_send_to(&self, buf: Bytes, route_key: &RouteKey) -> io::Result<()> {
241        match route_key.protocol() {
242            ConnectProtocol::UDP => {
243                if let Some(w) = self.udp_socket_manager.as_ref() {
244                    return w.try_send_bytes_to(buf, route_key);
245                }
246            }
247            ConnectProtocol::TCP => {
248                if let Some(w) = self.tcp_socket_manager.as_ref() {
249                    return w.try_send_to(buf, route_key);
250                }
251            }
252        }
253        Err(io::Error::from(io::ErrorKind::InvalidInput))
254    }
255
256    /// Writing `buf` to the target denoted by SocketAddr with the specified protocol
257    pub async fn send_to_addr<A: Into<SocketAddr>>(
258        &self,
259        connect_protocol: ConnectProtocol,
260        buf: Bytes,
261        addr: A,
262    ) -> io::Result<()> {
263        match connect_protocol {
264            ConnectProtocol::UDP => {
265                if let Some(w) = self.udp_socket_manager.as_ref() {
266                    return w.send_bytes_to(buf, addr).await;
267                }
268            }
269            ConnectProtocol::TCP => {
270                if let Some(w) = self.tcp_socket_manager.as_ref() {
271                    return w.send_to_addr(buf, addr).await;
272                }
273            }
274        }
275        Err(io::Error::from(io::ErrorKind::InvalidInput))
276    }
277}
278// impl<PeerID: Hash + Eq> UnifiedSocketManager<PeerID> {
279//     /// Writing `buf` to the target named by `peer_id`
280//     pub async fn send_to_id(&self, buf: Bytes, peer_id: &PeerID) -> io::Result<()> {
281//         let route = self.route_table.get_route_by_id(peer_id)?;
282//         self.send_to(buf, &route.route_key()).await
283//     }
284//     /// Writing `buf` to the target named by `peer_id`
285//     pub async fn avoid_loop_send_to_id(
286//         &self,
287//         buf: Bytes,
288//         src_id: &PeerID,
289//         peer_id: &PeerID,
290//     ) -> io::Result<()> {
291//         let route = self.route_table.get_route_by_id(peer_id)?;
292//         if self
293//             .route_table
294//             .is_route_of_peer_id(src_id, &route.route_key())
295//         {
296//             return Err(io::Error::new(io::ErrorKind::InvalidInput, "loop route"));
297//         }
298//         self.send_to(buf, &route.route_key()).await
299//     }
300// }
301
302impl Tunnel {
303    /// Receiving buf from the associated tunnel
304    /// `usize` in the `Ok` branch indicates how many bytes are received
305    /// `RouteKey` in the `Ok` branch denotes the source where these bytes are received from
306    pub async fn recv_from(&mut self, buf: &mut [u8]) -> Option<io::Result<(usize, RouteKey)>> {
307        match self {
308            Tunnel::Udp(tunnel) => tunnel.recv_from(buf).await,
309            Tunnel::Tcp(tunnel) => Some(tunnel.recv_from(buf).await),
310        }
311    }
312    pub async fn batch_recv_from<B: AsMut<[u8]>>(
313        &mut self,
314        bufs: &mut [B],
315        sizes: &mut [usize],
316        addrs: &mut [RouteKey],
317    ) -> Option<io::Result<usize>> {
318        match self {
319            Tunnel::Udp(tunnel) => tunnel.batch_recv_from(bufs, sizes, addrs).await,
320            Tunnel::Tcp(tunnel) => {
321                if addrs.len() != bufs.len() {
322                    return Some(Err(io::Error::other("addrs error")));
323                }
324                match tunnel.batch_recv_from(bufs, sizes).await {
325                    Ok((n, route_key)) => {
326                        addrs[..n].fill(route_key);
327                        Some(Ok(n))
328                    }
329                    Err(e) => Some(Err(e)),
330                }
331            }
332        }
333    }
334    pub async fn send_to<A: Into<SocketAddr>>(&self, buf: Bytes, addr: A) -> io::Result<()> {
335        match self {
336            Tunnel::Udp(tunnel) => tunnel.send_bytes_to(buf, addr).await,
337            Tunnel::Tcp(tunnel) => tunnel.send(buf).await,
338        }
339    }
340    pub fn done(&mut self) {
341        match self {
342            Tunnel::Udp(tunnel) => tunnel.done(),
343            Tunnel::Tcp(tunnel) => tunnel.done(),
344        }
345    }
346}
347
348impl Tunnel {
349    /// The protocol this tunnel is using
350    pub fn protocol(&self) -> ConnectProtocol {
351        match self {
352            Tunnel::Udp(_) => ConnectProtocol::UDP,
353            Tunnel::Tcp(_) => ConnectProtocol::TCP,
354        }
355    }
356    pub fn remote_addr(&self) -> Option<SocketAddr> {
357        match self {
358            Tunnel::Udp(_) => None,
359            Tunnel::Tcp(tcp) => Some(tcp.route_key().addr()),
360        }
361    }
362}