rust_p2p_core/tunnel/
mod.rs1use bytes::BytesMut;
2use std::io;
3use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
4
5use crate::tunnel::config::TunnelConfig;
6
7use crate::punch::Puncher;
8use crate::route::{ConnectProtocol, RouteKey};
9use std::sync::Arc;
10
11pub mod config;
12
13pub mod recycle;
14pub mod tcp;
15pub mod udp;
16pub const DEFAULT_ADDRESS_V4: SocketAddr =
17 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
18pub const DEFAULT_ADDRESS_V6: SocketAddr =
19 SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0));
20
21pub fn new_tunnel_component(config: TunnelConfig) -> io::Result<(TunnelDispatcher, Puncher)> {
23 let udp_tunnel_dispatcher = if let Some(mut udp_tunnel_config) = config.udp_tunnel_config {
24 udp_tunnel_config.main_udp_count = config.major_socket_count;
25 Some(udp::UdpTunnelDispatcher::new(udp_tunnel_config)?)
26 } else {
27 None
28 };
29 let tcp_tunnel_dispatcher = if let Some(mut tcp_tunnel_config) = config.tcp_tunnel_config {
30 tcp_tunnel_config.tcp_multiplexing_limit = config.major_socket_count;
31 Some(tcp::TcpTunnelDispatcher::new(tcp_tunnel_config)?)
32 } else {
33 None
34 };
35
36 let tunnel_dispatcher = TunnelDispatcher {
37 udp_tunnel_dispatcher,
38 tcp_tunnel_dispatcher,
39 };
40 let puncher = Puncher::from(&tunnel_dispatcher);
41 Ok((tunnel_dispatcher, puncher))
42}
43
44pub struct TunnelDispatcher {
45 udp_tunnel_dispatcher: Option<udp::UdpTunnelDispatcher>,
46 tcp_tunnel_dispatcher: Option<tcp::TcpTunnelDispatcher>,
47}
48
49pub enum Tunnel {
50 Udp(udp::UdpTunnel),
51 Tcp(tcp::TcpTunnel),
52}
53
54#[derive(Clone)]
55pub struct SocketManager {
56 udp_socket_manager: Option<Arc<udp::UdpSocketManager>>,
57 tcp_socket_manager: Option<Arc<tcp::TcpSocketManager>>,
58}
59
60impl TunnelDispatcher {
61 pub async fn dispatch(&mut self) -> io::Result<Tunnel> {
63 tokio::select! {
64 rs=dispatch_udp_tunnel(self.udp_tunnel_dispatcher.as_mut())=>{
65 rs
66 }
67 rs=accept_tcp(self.tcp_tunnel_dispatcher.as_mut())=>{
68 rs
69 }
70 }
71 }
72 pub fn shared_udp_socket_manager(&self) -> Option<Arc<udp::UdpSocketManager>> {
73 self.udp_tunnel_dispatcher
74 .as_ref()
75 .map(|v| v.socket_manager.clone())
76 }
77 pub fn shared_tcp_socket_manager(&self) -> Option<Arc<tcp::TcpSocketManager>> {
78 self.tcp_tunnel_dispatcher
79 .as_ref()
80 .map(|v| v.socket_manager.clone())
81 }
82 pub fn socket_manager(&self) -> SocketManager {
83 SocketManager {
84 udp_socket_manager: self.shared_udp_socket_manager(),
85 tcp_socket_manager: self.shared_tcp_socket_manager(),
86 }
87 }
88 pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
89 self.udp_tunnel_dispatcher
90 .as_ref()
91 .map(|v| &v.socket_manager)
92 }
93 pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
94 self.tcp_tunnel_dispatcher
95 .as_ref()
96 .map(|v| &v.socket_manager)
97 }
98}
99
100impl TunnelDispatcher {
101 pub fn udp_tunnel_manager_as_mut(&mut self) -> Option<&mut udp::UdpTunnelDispatcher> {
102 self.udp_tunnel_dispatcher.as_mut()
103 }
104 pub fn tcp_tunnel_manager_as_mut(&mut self) -> Option<&mut tcp::TcpTunnelDispatcher> {
105 self.tcp_tunnel_dispatcher.as_mut()
106 }
107}
108
109async fn accept_tcp(tcp: Option<&mut tcp::TcpTunnelDispatcher>) -> io::Result<Tunnel> {
110 if let Some(tcp_tunnel_factory) = tcp {
111 Ok(Tunnel::Tcp(tcp_tunnel_factory.dispatch().await?))
112 } else {
113 futures::future::pending().await
114 }
115}
116async fn dispatch_udp_tunnel(
117 udp_tunnel_factory: Option<&mut udp::UdpTunnelDispatcher>,
118) -> io::Result<Tunnel> {
119 if let Some(udp_tunnel_factory) = udp_tunnel_factory {
120 Ok(Tunnel::Udp(udp_tunnel_factory.dispatch().await?))
121 } else {
122 futures::future::pending().await
123 }
124}
125
126impl SocketManager {
127 pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
128 self.udp_socket_manager.as_ref()
129 }
130 pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
131 self.tcp_socket_manager.as_ref()
132 }
133}
134
135impl SocketManager {
136 pub async fn send_to(&self, buf: BytesMut, route_key: &RouteKey) -> io::Result<()> {
138 match route_key.protocol() {
139 ConnectProtocol::UDP => {
140 if let Some(w) = self.udp_socket_manager.as_ref() {
141 return w.send_bytes_to(buf, route_key).await;
142 }
143 }
144 ConnectProtocol::TCP => {
145 if let Some(w) = self.tcp_socket_manager.as_ref() {
146 return w.send_to(buf, route_key).await;
147 }
148 }
149 }
150 Err(io::Error::from(io::ErrorKind::InvalidInput))
151 }
152 pub fn try_send_to(&self, buf: BytesMut, route_key: &RouteKey) -> io::Result<()> {
153 match route_key.protocol() {
154 ConnectProtocol::UDP => {
155 if let Some(w) = self.udp_socket_manager.as_ref() {
156 return w.try_send_bytes_to(buf, route_key);
157 }
158 }
159 ConnectProtocol::TCP => {
160 if let Some(w) = self.tcp_socket_manager.as_ref() {
161 return w.try_send_to(buf, route_key);
162 }
163 }
164 }
165 Err(io::Error::from(io::ErrorKind::InvalidInput))
166 }
167
168 pub async fn send_to_addr<A: Into<SocketAddr>>(
170 &self,
171 connect_protocol: ConnectProtocol,
172 buf: BytesMut,
173 addr: A,
174 ) -> io::Result<()> {
175 match connect_protocol {
176 ConnectProtocol::UDP => {
177 if let Some(w) = self.udp_socket_manager.as_ref() {
178 return w.send_bytes_to(buf, addr).await;
179 }
180 }
181 ConnectProtocol::TCP => {
182 if let Some(w) = self.tcp_socket_manager.as_ref() {
183 return w.send_to_addr(buf, addr).await;
184 }
185 }
186 }
187 Err(io::Error::from(io::ErrorKind::InvalidInput))
188 }
189}
190impl Tunnel {
215 pub async fn recv_from(&mut self, buf: &mut [u8]) -> Option<io::Result<(usize, RouteKey)>> {
219 match self {
220 Tunnel::Udp(tunnel) => tunnel.recv_from(buf).await,
221 Tunnel::Tcp(tunnel) => Some(tunnel.recv_from(buf).await),
222 }
223 }
224 pub async fn batch_recv_from<B: AsMut<[u8]>>(
225 &mut self,
226 bufs: &mut [B],
227 sizes: &mut [usize],
228 addrs: &mut [RouteKey],
229 ) -> Option<io::Result<usize>> {
230 match self {
231 Tunnel::Udp(tunnel) => tunnel.batch_recv_from(bufs, sizes, addrs).await,
232 Tunnel::Tcp(tunnel) => {
233 if addrs.len() != bufs.len() {
234 return Some(Err(io::Error::new(io::ErrorKind::Other, "addrs error")));
235 }
236 match tunnel.batch_recv_from(bufs, sizes).await {
237 Ok((n, route_key)) => {
238 addrs[..n].fill(route_key);
239 Some(Ok(n))
240 }
241 Err(e) => Some(Err(e)),
242 }
243 }
244 }
245 }
246 pub async fn send_to<A: Into<SocketAddr>>(&self, buf: BytesMut, addr: A) -> io::Result<()> {
247 match self {
248 Tunnel::Udp(tunnel) => tunnel.send_bytes_to(buf, addr).await,
249 Tunnel::Tcp(tunnel) => tunnel.send(buf).await,
250 }
251 }
252 pub fn done(&mut self) {
253 match self {
254 Tunnel::Udp(tunnel) => tunnel.done(),
255 Tunnel::Tcp(tunnel) => tunnel.done(),
256 }
257 }
258}
259
260impl Tunnel {
261 pub fn protocol(&self) -> ConnectProtocol {
263 match self {
264 Tunnel::Udp(_) => ConnectProtocol::UDP,
265 Tunnel::Tcp(_) => ConnectProtocol::TCP,
266 }
267 }
268 pub fn remote_addr(&self) -> Option<SocketAddr> {
269 match self {
270 Tunnel::Udp(_) => None,
271 Tunnel::Tcp(tcp) => Some(tcp.route_key().addr()),
272 }
273 }
274}