rust_p2p_core/tunnel/
mod.rs

1use bytes::BytesMut;
2use std::io;
3use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
4
5use crate::tunnel::config::TunnelConfig;
6
7use crate::punch::Puncher;
8use crate::route::{ConnectProtocol, RouteKey};
9use std::sync::Arc;
10
11pub mod config;
12
13pub mod recycle;
14pub mod tcp;
15pub mod udp;
/// Unspecified IPv4 socket address with port 0 (`0.0.0.0:0`).
pub const DEFAULT_ADDRESS_V4: SocketAddr = {
    let unspecified_v4 = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
    SocketAddr::V4(unspecified_v4)
};
/// Unspecified IPv6 socket address with port 0, flow info 0 and scope id 0 (`[::]:0`).
pub const DEFAULT_ADDRESS_V6: SocketAddr = {
    let unspecified_v6 = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0);
    SocketAddr::V6(unspecified_v6)
};
20
21/// Construct the needed components for p2p communication with the given tunnel configuration
22pub fn new_tunnel_component(config: TunnelConfig) -> io::Result<(TunnelDispatcher, Puncher)> {
23    let udp_tunnel_dispatcher = if let Some(mut udp_tunnel_config) = config.udp_tunnel_config {
24        udp_tunnel_config.main_udp_count = config.major_socket_count;
25        Some(udp::UdpTunnelDispatcher::new(udp_tunnel_config)?)
26    } else {
27        None
28    };
29    let tcp_tunnel_dispatcher = if let Some(mut tcp_tunnel_config) = config.tcp_tunnel_config {
30        tcp_tunnel_config.tcp_multiplexing_limit = config.major_socket_count;
31        Some(tcp::TcpTunnelDispatcher::new(tcp_tunnel_config)?)
32    } else {
33        None
34    };
35
36    let tunnel_dispatcher = TunnelDispatcher {
37        udp_tunnel_dispatcher,
38        tcp_tunnel_dispatcher,
39    };
40    let puncher = Puncher::from(&tunnel_dispatcher);
41    Ok((tunnel_dispatcher, puncher))
42}
43
44pub struct TunnelDispatcher {
45    udp_tunnel_dispatcher: Option<udp::UdpTunnelDispatcher>,
46    tcp_tunnel_dispatcher: Option<tcp::TcpTunnelDispatcher>,
47}
48
49pub enum Tunnel {
50    Udp(udp::UdpTunnel),
51    Tcp(tcp::TcpTunnel),
52}
53
54#[derive(Clone)]
55pub struct SocketManager {
56    udp_socket_manager: Option<Arc<udp::UdpSocketManager>>,
57    tcp_socket_manager: Option<Arc<tcp::TcpSocketManager>>,
58}
59
60impl TunnelDispatcher {
61    /// Accept tunnels from a given `factory`
62    pub async fn dispatch(&mut self) -> io::Result<Tunnel> {
63        tokio::select! {
64            rs=dispatch_udp_tunnel(self.udp_tunnel_dispatcher.as_mut())=>{
65                rs
66            }
67            rs=accept_tcp(self.tcp_tunnel_dispatcher.as_mut())=>{
68                rs
69            }
70        }
71    }
72    pub fn shared_udp_socket_manager(&self) -> Option<Arc<udp::UdpSocketManager>> {
73        self.udp_tunnel_dispatcher
74            .as_ref()
75            .map(|v| v.socket_manager.clone())
76    }
77    pub fn shared_tcp_socket_manager(&self) -> Option<Arc<tcp::TcpSocketManager>> {
78        self.tcp_tunnel_dispatcher
79            .as_ref()
80            .map(|v| v.socket_manager.clone())
81    }
82    pub fn socket_manager(&self) -> SocketManager {
83        SocketManager {
84            udp_socket_manager: self.shared_udp_socket_manager(),
85            tcp_socket_manager: self.shared_tcp_socket_manager(),
86        }
87    }
88    pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
89        self.udp_tunnel_dispatcher
90            .as_ref()
91            .map(|v| &v.socket_manager)
92    }
93    pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
94        self.tcp_tunnel_dispatcher
95            .as_ref()
96            .map(|v| &v.socket_manager)
97    }
98}
99
100impl TunnelDispatcher {
101    pub fn udp_tunnel_manager_as_mut(&mut self) -> Option<&mut udp::UdpTunnelDispatcher> {
102        self.udp_tunnel_dispatcher.as_mut()
103    }
104    pub fn tcp_tunnel_manager_as_mut(&mut self) -> Option<&mut tcp::TcpTunnelDispatcher> {
105        self.tcp_tunnel_dispatcher.as_mut()
106    }
107}
108
109async fn accept_tcp(tcp: Option<&mut tcp::TcpTunnelDispatcher>) -> io::Result<Tunnel> {
110    if let Some(tcp_tunnel_factory) = tcp {
111        Ok(Tunnel::Tcp(tcp_tunnel_factory.dispatch().await?))
112    } else {
113        futures::future::pending().await
114    }
115}
116async fn dispatch_udp_tunnel(
117    udp_tunnel_factory: Option<&mut udp::UdpTunnelDispatcher>,
118) -> io::Result<Tunnel> {
119    if let Some(udp_tunnel_factory) = udp_tunnel_factory {
120        Ok(Tunnel::Udp(udp_tunnel_factory.dispatch().await?))
121    } else {
122        futures::future::pending().await
123    }
124}
125
126impl SocketManager {
127    pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
128        self.udp_socket_manager.as_ref()
129    }
130    pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
131        self.tcp_socket_manager.as_ref()
132    }
133}
134
135impl SocketManager {
136    /// Writing `buf` to the target denoted by `route_key`
137    pub async fn send_to(&self, buf: BytesMut, route_key: &RouteKey) -> io::Result<()> {
138        match route_key.protocol() {
139            ConnectProtocol::UDP => {
140                if let Some(w) = self.udp_socket_manager.as_ref() {
141                    return w.send_bytes_to(buf, route_key).await;
142                }
143            }
144            ConnectProtocol::TCP => {
145                if let Some(w) = self.tcp_socket_manager.as_ref() {
146                    return w.send_to(buf, route_key).await;
147                }
148            }
149        }
150        Err(io::Error::from(io::ErrorKind::InvalidInput))
151    }
152    pub fn try_send_to(&self, buf: BytesMut, route_key: &RouteKey) -> io::Result<()> {
153        match route_key.protocol() {
154            ConnectProtocol::UDP => {
155                if let Some(w) = self.udp_socket_manager.as_ref() {
156                    return w.try_send_bytes_to(buf, route_key);
157                }
158            }
159            ConnectProtocol::TCP => {
160                if let Some(w) = self.tcp_socket_manager.as_ref() {
161                    return w.try_send_to(buf, route_key);
162                }
163            }
164        }
165        Err(io::Error::from(io::ErrorKind::InvalidInput))
166    }
167
168    /// Writing `buf` to the target denoted by SocketAddr with the specified protocol
169    pub async fn send_to_addr<A: Into<SocketAddr>>(
170        &self,
171        connect_protocol: ConnectProtocol,
172        buf: BytesMut,
173        addr: A,
174    ) -> io::Result<()> {
175        match connect_protocol {
176            ConnectProtocol::UDP => {
177                if let Some(w) = self.udp_socket_manager.as_ref() {
178                    return w.send_bytes_to(buf, addr).await;
179                }
180            }
181            ConnectProtocol::TCP => {
182                if let Some(w) = self.tcp_socket_manager.as_ref() {
183                    return w.send_to_addr(buf, addr).await;
184                }
185            }
186        }
187        Err(io::Error::from(io::ErrorKind::InvalidInput))
188    }
189}
190// impl<PeerID: Hash + Eq> UnifiedSocketManager<PeerID> {
191//     /// Writing `buf` to the target named by `peer_id`
192//     pub async fn send_to_id(&self, buf: BytesMut, peer_id: &PeerID) -> io::Result<()> {
193//         let route = self.route_table.get_route_by_id(peer_id)?;
194//         self.send_to(buf, &route.route_key()).await
195//     }
196//     /// Writing `buf` to the target named by `peer_id`
197//     pub async fn avoid_loop_send_to_id(
198//         &self,
199//         buf: BytesMut,
200//         src_id: &PeerID,
201//         peer_id: &PeerID,
202//     ) -> io::Result<()> {
203//         let route = self.route_table.get_route_by_id(peer_id)?;
204//         if self
205//             .route_table
206//             .is_route_of_peer_id(src_id, &route.route_key())
207//         {
208//             return Err(io::Error::new(io::ErrorKind::InvalidInput, "loop route"));
209//         }
210//         self.send_to(buf, &route.route_key()).await
211//     }
212// }
213
214impl Tunnel {
215    /// Receiving buf from the associated tunnel
216    /// `usize` in the `Ok` branch indicates how many bytes are received
217    /// `RouteKey` in the `Ok` branch denotes the source where these bytes are received from
218    pub async fn recv_from(&mut self, buf: &mut [u8]) -> Option<io::Result<(usize, RouteKey)>> {
219        match self {
220            Tunnel::Udp(tunnel) => tunnel.recv_from(buf).await,
221            Tunnel::Tcp(tunnel) => Some(tunnel.recv_from(buf).await),
222        }
223    }
224    pub async fn batch_recv_from<B: AsMut<[u8]>>(
225        &mut self,
226        bufs: &mut [B],
227        sizes: &mut [usize],
228        addrs: &mut [RouteKey],
229    ) -> Option<io::Result<usize>> {
230        match self {
231            Tunnel::Udp(tunnel) => tunnel.batch_recv_from(bufs, sizes, addrs).await,
232            Tunnel::Tcp(tunnel) => {
233                if addrs.len() != bufs.len() {
234                    return Some(Err(io::Error::new(io::ErrorKind::Other, "addrs error")));
235                }
236                match tunnel.batch_recv_from(bufs, sizes).await {
237                    Ok((n, route_key)) => {
238                        addrs[..n].fill(route_key);
239                        Some(Ok(n))
240                    }
241                    Err(e) => Some(Err(e)),
242                }
243            }
244        }
245    }
246    pub async fn send_to<A: Into<SocketAddr>>(&self, buf: BytesMut, addr: A) -> io::Result<()> {
247        match self {
248            Tunnel::Udp(tunnel) => tunnel.send_bytes_to(buf, addr).await,
249            Tunnel::Tcp(tunnel) => tunnel.send(buf).await,
250        }
251    }
252    pub fn done(&mut self) {
253        match self {
254            Tunnel::Udp(tunnel) => tunnel.done(),
255            Tunnel::Tcp(tunnel) => tunnel.done(),
256        }
257    }
258}
259
260impl Tunnel {
261    /// The protocol this tunnel is using
262    pub fn protocol(&self) -> ConnectProtocol {
263        match self {
264            Tunnel::Udp(_) => ConnectProtocol::UDP,
265            Tunnel::Tcp(_) => ConnectProtocol::TCP,
266        }
267    }
268    pub fn remote_addr(&self) -> Option<SocketAddr> {
269        match self {
270            Tunnel::Udp(_) => None,
271            Tunnel::Tcp(tcp) => Some(tcp.route_key().addr()),
272        }
273    }
274}