Skip to main content

rust_p2p_core/tunnel/
mod.rs

1use bytes::Bytes;
2use std::io;
3use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
4
5use crate::tunnel::config::TunnelConfig;
6
7use crate::punch::Puncher;
8use crate::route::{ConnectProtocol, RouteKey};
9use std::sync::Arc;
10
11pub mod config;
12
13pub mod tcp;
14pub mod udp;
/// Wildcard IPv4 socket address: the unspecified IP (`0.0.0.0`) with port `0`.
pub const DEFAULT_ADDRESS_V4: SocketAddr = {
    let wildcard = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
    SocketAddr::V4(wildcard)
};
/// Wildcard IPv6 socket address: the unspecified IP (`::`) with port `0`,
/// flow info `0` and scope id `0`.
pub const DEFAULT_ADDRESS_V6: SocketAddr = {
    let wildcard = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0);
    SocketAddr::V6(wildcard)
};
19
20/// Construct the needed components for p2p communication with the given tunnel configuration
21pub fn new_tunnel_component(config: TunnelConfig) -> io::Result<(TunnelDispatcher, Puncher)> {
22    let udp_tunnel_dispatcher = if let Some(mut udp_tunnel_config) = config.udp_tunnel_config {
23        udp_tunnel_config.main_udp_count = config.major_socket_count;
24        Some(udp::UdpTunnelDispatcher::new(udp_tunnel_config)?)
25    } else {
26        None
27    };
28    let tcp_tunnel_dispatcher = if let Some(mut tcp_tunnel_config) = config.tcp_tunnel_config {
29        tcp_tunnel_config.tcp_multiplexing_limit = config.major_socket_count;
30        Some(tcp::TcpTunnelDispatcher::new(tcp_tunnel_config)?)
31    } else {
32        None
33    };
34
35    let tunnel_dispatcher = TunnelDispatcher {
36        udp_tunnel_dispatcher,
37        tcp_tunnel_dispatcher,
38    };
39    let puncher = Puncher::from(&tunnel_dispatcher);
40    Ok((tunnel_dispatcher, puncher))
41}
42
43pub struct TunnelDispatcher {
44    udp_tunnel_dispatcher: Option<udp::UdpTunnelDispatcher>,
45    tcp_tunnel_dispatcher: Option<tcp::TcpTunnelDispatcher>,
46}
47
48pub enum Tunnel {
49    Udp(udp::UdpTunnel),
50    Tcp(tcp::TcpTunnel),
51}
52
53#[derive(Clone)]
54pub struct SocketManager {
55    udp_socket_manager: Option<Arc<udp::UdpSocketManager>>,
56    tcp_socket_manager: Option<Arc<tcp::TcpSocketManager>>,
57}
58
59impl TunnelDispatcher {
60    /// Accept tunnels from a given `factory`
61    pub async fn dispatch(&mut self) -> io::Result<Tunnel> {
62        tokio::select! {
63            rs=dispatch_udp_tunnel(self.udp_tunnel_dispatcher.as_mut())=>{
64                rs
65            }
66            rs=accept_tcp(self.tcp_tunnel_dispatcher.as_mut())=>{
67                rs
68            }
69        }
70    }
71    pub fn shared_udp_socket_manager(&self) -> Option<Arc<udp::UdpSocketManager>> {
72        self.udp_tunnel_dispatcher
73            .as_ref()
74            .map(|v| v.socket_manager.clone())
75    }
76    pub fn shared_tcp_socket_manager(&self) -> Option<Arc<tcp::TcpSocketManager>> {
77        self.tcp_tunnel_dispatcher
78            .as_ref()
79            .map(|v| v.socket_manager.clone())
80    }
81    pub fn socket_manager(&self) -> SocketManager {
82        SocketManager {
83            udp_socket_manager: self.shared_udp_socket_manager(),
84            tcp_socket_manager: self.shared_tcp_socket_manager(),
85        }
86    }
87    pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
88        self.udp_tunnel_dispatcher
89            .as_ref()
90            .map(|v| &v.socket_manager)
91    }
92    pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
93        self.tcp_tunnel_dispatcher
94            .as_ref()
95            .map(|v| &v.socket_manager)
96    }
97}
98
99impl TunnelDispatcher {
100    pub fn udp_tunnel_manager_as_mut(&mut self) -> Option<&mut udp::UdpTunnelDispatcher> {
101        self.udp_tunnel_dispatcher.as_mut()
102    }
103    pub fn tcp_tunnel_manager_as_mut(&mut self) -> Option<&mut tcp::TcpTunnelDispatcher> {
104        self.tcp_tunnel_dispatcher.as_mut()
105    }
106}
107
108async fn accept_tcp(tcp: Option<&mut tcp::TcpTunnelDispatcher>) -> io::Result<Tunnel> {
109    if let Some(tcp_tunnel_factory) = tcp {
110        Ok(Tunnel::Tcp(tcp_tunnel_factory.dispatch().await?))
111    } else {
112        futures::future::pending().await
113    }
114}
115async fn dispatch_udp_tunnel(
116    udp_tunnel_factory: Option<&mut udp::UdpTunnelDispatcher>,
117) -> io::Result<Tunnel> {
118    if let Some(udp_tunnel_factory) = udp_tunnel_factory {
119        Ok(Tunnel::Udp(udp_tunnel_factory.dispatch().await?))
120    } else {
121        futures::future::pending().await
122    }
123}
124
125impl SocketManager {
126    pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
127        self.udp_socket_manager.as_ref()
128    }
129    pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
130        self.tcp_socket_manager.as_ref()
131    }
132}
133
134impl SocketManager {
135    /// Writing `buf` to the target denoted by `route_key`
136    pub async fn send_to(&self, buf: Bytes, route_key: &RouteKey) -> io::Result<()> {
137        match route_key.protocol() {
138            ConnectProtocol::UDP => {
139                if let Some(w) = self.udp_socket_manager.as_ref() {
140                    return w.send_bytes_to(buf, route_key).await;
141                }
142            }
143            ConnectProtocol::TCP => {
144                if let Some(w) = self.tcp_socket_manager.as_ref() {
145                    return w.send_to(buf, route_key).await;
146                }
147            }
148        }
149        Err(io::Error::from(io::ErrorKind::InvalidInput))
150    }
151    pub fn try_send_to(&self, buf: Bytes, route_key: &RouteKey) -> io::Result<()> {
152        match route_key.protocol() {
153            ConnectProtocol::UDP => {
154                if let Some(w) = self.udp_socket_manager.as_ref() {
155                    return w.try_send_bytes_to(buf, route_key);
156                }
157            }
158            ConnectProtocol::TCP => {
159                if let Some(w) = self.tcp_socket_manager.as_ref() {
160                    return w.try_send_to(buf, route_key);
161                }
162            }
163        }
164        Err(io::Error::from(io::ErrorKind::InvalidInput))
165    }
166
167    /// Writing `buf` to the target denoted by SocketAddr with the specified protocol
168    pub async fn send_to_addr<A: Into<SocketAddr>>(
169        &self,
170        connect_protocol: ConnectProtocol,
171        buf: Bytes,
172        addr: A,
173    ) -> io::Result<()> {
174        match connect_protocol {
175            ConnectProtocol::UDP => {
176                if let Some(w) = self.udp_socket_manager.as_ref() {
177                    return w.send_bytes_to(buf, addr).await;
178                }
179            }
180            ConnectProtocol::TCP => {
181                if let Some(w) = self.tcp_socket_manager.as_ref() {
182                    return w.send_to_addr(buf, addr).await;
183                }
184            }
185        }
186        Err(io::Error::from(io::ErrorKind::InvalidInput))
187    }
188}
189// impl<PeerID: Hash + Eq> UnifiedSocketManager<PeerID> {
190//     /// Writing `buf` to the target named by `peer_id`
191//     pub async fn send_to_id(&self, buf: Bytes, peer_id: &PeerID) -> io::Result<()> {
192//         let route = self.route_table.get_route_by_id(peer_id)?;
193//         self.send_to(buf, &route.route_key()).await
194//     }
195//     /// Writing `buf` to the target named by `peer_id`
196//     pub async fn avoid_loop_send_to_id(
197//         &self,
198//         buf: Bytes,
199//         src_id: &PeerID,
200//         peer_id: &PeerID,
201//     ) -> io::Result<()> {
202//         let route = self.route_table.get_route_by_id(peer_id)?;
203//         if self
204//             .route_table
205//             .is_route_of_peer_id(src_id, &route.route_key())
206//         {
207//             return Err(io::Error::new(io::ErrorKind::InvalidInput, "loop route"));
208//         }
209//         self.send_to(buf, &route.route_key()).await
210//     }
211// }
212
213impl Tunnel {
214    /// Receiving buf from the associated tunnel
215    /// `usize` in the `Ok` branch indicates how many bytes are received
216    /// `RouteKey` in the `Ok` branch denotes the source where these bytes are received from
217    pub async fn recv_from(&mut self, buf: &mut [u8]) -> Option<io::Result<(usize, RouteKey)>> {
218        match self {
219            Tunnel::Udp(tunnel) => tunnel.recv_from(buf).await,
220            Tunnel::Tcp(tunnel) => Some(tunnel.recv_from(buf).await),
221        }
222    }
223    pub async fn batch_recv_from<B: AsMut<[u8]>>(
224        &mut self,
225        bufs: &mut [B],
226        sizes: &mut [usize],
227        addrs: &mut [RouteKey],
228    ) -> Option<io::Result<usize>> {
229        match self {
230            Tunnel::Udp(tunnel) => tunnel.batch_recv_from(bufs, sizes, addrs).await,
231            Tunnel::Tcp(tunnel) => {
232                if addrs.len() != bufs.len() {
233                    return Some(Err(io::Error::other("addrs error")));
234                }
235                match tunnel.batch_recv_from(bufs, sizes).await {
236                    Ok((n, route_key)) => {
237                        addrs[..n].fill(route_key);
238                        Some(Ok(n))
239                    }
240                    Err(e) => Some(Err(e)),
241                }
242            }
243        }
244    }
245    pub async fn send_to<A: Into<SocketAddr>>(&self, buf: Bytes, addr: A) -> io::Result<()> {
246        match self {
247            Tunnel::Udp(tunnel) => tunnel.send_bytes_to(buf, addr).await,
248            Tunnel::Tcp(tunnel) => tunnel.send(buf).await,
249        }
250    }
251    pub fn done(&mut self) {
252        match self {
253            Tunnel::Udp(tunnel) => tunnel.done(),
254            Tunnel::Tcp(tunnel) => tunnel.done(),
255        }
256    }
257}
258
259impl Tunnel {
260    /// The protocol this tunnel is using
261    pub fn protocol(&self) -> ConnectProtocol {
262        match self {
263            Tunnel::Udp(_) => ConnectProtocol::UDP,
264            Tunnel::Tcp(_) => ConnectProtocol::TCP,
265        }
266    }
267    pub fn remote_addr(&self) -> Option<SocketAddr> {
268        match self {
269            Tunnel::Udp(_) => None,
270            Tunnel::Tcp(tcp) => Some(tcp.route_key().addr()),
271        }
272    }
273}