rust_p2p_core/tunnel/
mod.rs1use bytes::Bytes;
27use std::io;
28use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
29
30use crate::tunnel::config::TunnelConfig;
31
32use crate::punch::Puncher;
33use crate::route::{ConnectProtocol, RouteKey};
34use std::sync::Arc;
35
36pub mod config;
37
38pub mod tcp;
39pub mod udp;
40pub const DEFAULT_ADDRESS_V4: SocketAddr =
41 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
42pub const DEFAULT_ADDRESS_V6: SocketAddr =
43 SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0));
44
45pub fn new_tunnel_component(config: TunnelConfig) -> io::Result<(TunnelDispatcher, Puncher)> {
71 let udp_tunnel_dispatcher = if let Some(mut udp_tunnel_config) = config.udp_tunnel_config {
72 udp_tunnel_config.main_udp_count = config.major_socket_count;
73 Some(udp::UdpTunnelDispatcher::new(udp_tunnel_config)?)
74 } else {
75 None
76 };
77 let tcp_tunnel_dispatcher = if let Some(mut tcp_tunnel_config) = config.tcp_tunnel_config {
78 tcp_tunnel_config.tcp_multiplexing_limit = config.major_socket_count;
79 Some(tcp::TcpTunnelDispatcher::new(tcp_tunnel_config)?)
80 } else {
81 None
82 };
83
84 let tunnel_dispatcher = TunnelDispatcher {
85 udp_tunnel_dispatcher,
86 tcp_tunnel_dispatcher,
87 };
88 let puncher = Puncher::from(&tunnel_dispatcher);
89 Ok((tunnel_dispatcher, puncher))
90}
91
92pub struct TunnelDispatcher {
97 udp_tunnel_dispatcher: Option<udp::UdpTunnelDispatcher>,
98 tcp_tunnel_dispatcher: Option<tcp::TcpTunnelDispatcher>,
99}
100
101pub enum Tunnel {
116 Udp(udp::UdpTunnel),
117 Tcp(tcp::TcpTunnel),
118}
119
120#[derive(Clone)]
122pub struct SocketManager {
123 udp_socket_manager: Option<Arc<udp::UdpSocketManager>>,
124 tcp_socket_manager: Option<Arc<tcp::TcpSocketManager>>,
125}
126
127impl TunnelDispatcher {
128 pub async fn dispatch(&mut self) -> io::Result<Tunnel> {
151 tokio::select! {
152 rs=dispatch_udp_tunnel(self.udp_tunnel_dispatcher.as_mut())=>{
153 rs
154 }
155 rs=accept_tcp(self.tcp_tunnel_dispatcher.as_mut())=>{
156 rs
157 }
158 }
159 }
160 pub fn shared_udp_socket_manager(&self) -> Option<Arc<udp::UdpSocketManager>> {
161 self.udp_tunnel_dispatcher
162 .as_ref()
163 .map(|v| v.socket_manager.clone())
164 }
165 pub fn shared_tcp_socket_manager(&self) -> Option<Arc<tcp::TcpSocketManager>> {
166 self.tcp_tunnel_dispatcher
167 .as_ref()
168 .map(|v| v.socket_manager.clone())
169 }
170 pub fn socket_manager(&self) -> SocketManager {
171 SocketManager {
172 udp_socket_manager: self.shared_udp_socket_manager(),
173 tcp_socket_manager: self.shared_tcp_socket_manager(),
174 }
175 }
176 pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
177 self.udp_tunnel_dispatcher
178 .as_ref()
179 .map(|v| &v.socket_manager)
180 }
181 pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
182 self.tcp_tunnel_dispatcher
183 .as_ref()
184 .map(|v| &v.socket_manager)
185 }
186}
187
188impl TunnelDispatcher {
189 pub fn udp_tunnel_manager_as_mut(&mut self) -> Option<&mut udp::UdpTunnelDispatcher> {
190 self.udp_tunnel_dispatcher.as_mut()
191 }
192 pub fn tcp_tunnel_manager_as_mut(&mut self) -> Option<&mut tcp::TcpTunnelDispatcher> {
193 self.tcp_tunnel_dispatcher.as_mut()
194 }
195}
196
197async fn accept_tcp(tcp: Option<&mut tcp::TcpTunnelDispatcher>) -> io::Result<Tunnel> {
198 if let Some(tcp_tunnel_factory) = tcp {
199 Ok(Tunnel::Tcp(tcp_tunnel_factory.dispatch().await?))
200 } else {
201 futures::future::pending().await
202 }
203}
204async fn dispatch_udp_tunnel(
205 udp_tunnel_factory: Option<&mut udp::UdpTunnelDispatcher>,
206) -> io::Result<Tunnel> {
207 if let Some(udp_tunnel_factory) = udp_tunnel_factory {
208 Ok(Tunnel::Udp(udp_tunnel_factory.dispatch().await?))
209 } else {
210 futures::future::pending().await
211 }
212}
213
214impl SocketManager {
215 pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
216 self.udp_socket_manager.as_ref()
217 }
218 pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
219 self.tcp_socket_manager.as_ref()
220 }
221}
222
223impl SocketManager {
224 pub async fn send_to(&self, buf: Bytes, route_key: &RouteKey) -> io::Result<()> {
226 match route_key.protocol() {
227 ConnectProtocol::UDP => {
228 if let Some(w) = self.udp_socket_manager.as_ref() {
229 return w.send_bytes_to(buf, route_key).await;
230 }
231 }
232 ConnectProtocol::TCP => {
233 if let Some(w) = self.tcp_socket_manager.as_ref() {
234 return w.send_to(buf, route_key).await;
235 }
236 }
237 }
238 Err(io::Error::from(io::ErrorKind::InvalidInput))
239 }
240 pub fn try_send_to(&self, buf: Bytes, route_key: &RouteKey) -> io::Result<()> {
241 match route_key.protocol() {
242 ConnectProtocol::UDP => {
243 if let Some(w) = self.udp_socket_manager.as_ref() {
244 return w.try_send_bytes_to(buf, route_key);
245 }
246 }
247 ConnectProtocol::TCP => {
248 if let Some(w) = self.tcp_socket_manager.as_ref() {
249 return w.try_send_to(buf, route_key);
250 }
251 }
252 }
253 Err(io::Error::from(io::ErrorKind::InvalidInput))
254 }
255
256 pub async fn send_to_addr<A: Into<SocketAddr>>(
258 &self,
259 connect_protocol: ConnectProtocol,
260 buf: Bytes,
261 addr: A,
262 ) -> io::Result<()> {
263 match connect_protocol {
264 ConnectProtocol::UDP => {
265 if let Some(w) = self.udp_socket_manager.as_ref() {
266 return w.send_bytes_to(buf, addr).await;
267 }
268 }
269 ConnectProtocol::TCP => {
270 if let Some(w) = self.tcp_socket_manager.as_ref() {
271 return w.send_to_addr(buf, addr).await;
272 }
273 }
274 }
275 Err(io::Error::from(io::ErrorKind::InvalidInput))
276 }
277}
278impl Tunnel {
303 pub async fn recv_from(&mut self, buf: &mut [u8]) -> Option<io::Result<(usize, RouteKey)>> {
307 match self {
308 Tunnel::Udp(tunnel) => tunnel.recv_from(buf).await,
309 Tunnel::Tcp(tunnel) => Some(tunnel.recv_from(buf).await),
310 }
311 }
312 pub async fn batch_recv_from<B: AsMut<[u8]>>(
313 &mut self,
314 bufs: &mut [B],
315 sizes: &mut [usize],
316 addrs: &mut [RouteKey],
317 ) -> Option<io::Result<usize>> {
318 match self {
319 Tunnel::Udp(tunnel) => tunnel.batch_recv_from(bufs, sizes, addrs).await,
320 Tunnel::Tcp(tunnel) => {
321 if addrs.len() != bufs.len() {
322 return Some(Err(io::Error::other("addrs error")));
323 }
324 match tunnel.batch_recv_from(bufs, sizes).await {
325 Ok((n, route_key)) => {
326 addrs[..n].fill(route_key);
327 Some(Ok(n))
328 }
329 Err(e) => Some(Err(e)),
330 }
331 }
332 }
333 }
334 pub async fn send_to<A: Into<SocketAddr>>(&self, buf: Bytes, addr: A) -> io::Result<()> {
335 match self {
336 Tunnel::Udp(tunnel) => tunnel.send_bytes_to(buf, addr).await,
337 Tunnel::Tcp(tunnel) => tunnel.send(buf).await,
338 }
339 }
340 pub fn done(&mut self) {
341 match self {
342 Tunnel::Udp(tunnel) => tunnel.done(),
343 Tunnel::Tcp(tunnel) => tunnel.done(),
344 }
345 }
346}
347
348impl Tunnel {
349 pub fn protocol(&self) -> ConnectProtocol {
351 match self {
352 Tunnel::Udp(_) => ConnectProtocol::UDP,
353 Tunnel::Tcp(_) => ConnectProtocol::TCP,
354 }
355 }
356 pub fn remote_addr(&self) -> Option<SocketAddr> {
357 match self {
358 Tunnel::Udp(_) => None,
359 Tunnel::Tcp(tcp) => Some(tcp.route_key().addr()),
360 }
361 }
362}