rust_p2p_core/tunnel/
mod.rs1use bytes::Bytes;
2use std::io;
3use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
4
5use crate::tunnel::config::TunnelConfig;
6
7use crate::punch::Puncher;
8use crate::route::{ConnectProtocol, RouteKey};
9use std::sync::Arc;
10
11pub mod config;
12
13pub mod tcp;
14pub mod udp;
15pub const DEFAULT_ADDRESS_V4: SocketAddr =
16 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
17pub const DEFAULT_ADDRESS_V6: SocketAddr =
18 SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0));
19
20pub fn new_tunnel_component(config: TunnelConfig) -> io::Result<(TunnelDispatcher, Puncher)> {
22 let udp_tunnel_dispatcher = if let Some(mut udp_tunnel_config) = config.udp_tunnel_config {
23 udp_tunnel_config.main_udp_count = config.major_socket_count;
24 Some(udp::UdpTunnelDispatcher::new(udp_tunnel_config)?)
25 } else {
26 None
27 };
28 let tcp_tunnel_dispatcher = if let Some(mut tcp_tunnel_config) = config.tcp_tunnel_config {
29 tcp_tunnel_config.tcp_multiplexing_limit = config.major_socket_count;
30 Some(tcp::TcpTunnelDispatcher::new(tcp_tunnel_config)?)
31 } else {
32 None
33 };
34
35 let tunnel_dispatcher = TunnelDispatcher {
36 udp_tunnel_dispatcher,
37 tcp_tunnel_dispatcher,
38 };
39 let puncher = Puncher::from(&tunnel_dispatcher);
40 Ok((tunnel_dispatcher, puncher))
41}
42
43pub struct TunnelDispatcher {
44 udp_tunnel_dispatcher: Option<udp::UdpTunnelDispatcher>,
45 tcp_tunnel_dispatcher: Option<tcp::TcpTunnelDispatcher>,
46}
47
48pub enum Tunnel {
49 Udp(udp::UdpTunnel),
50 Tcp(tcp::TcpTunnel),
51}
52
53#[derive(Clone)]
54pub struct SocketManager {
55 udp_socket_manager: Option<Arc<udp::UdpSocketManager>>,
56 tcp_socket_manager: Option<Arc<tcp::TcpSocketManager>>,
57}
58
59impl TunnelDispatcher {
60 pub async fn dispatch(&mut self) -> io::Result<Tunnel> {
62 tokio::select! {
63 rs=dispatch_udp_tunnel(self.udp_tunnel_dispatcher.as_mut())=>{
64 rs
65 }
66 rs=accept_tcp(self.tcp_tunnel_dispatcher.as_mut())=>{
67 rs
68 }
69 }
70 }
71 pub fn shared_udp_socket_manager(&self) -> Option<Arc<udp::UdpSocketManager>> {
72 self.udp_tunnel_dispatcher
73 .as_ref()
74 .map(|v| v.socket_manager.clone())
75 }
76 pub fn shared_tcp_socket_manager(&self) -> Option<Arc<tcp::TcpSocketManager>> {
77 self.tcp_tunnel_dispatcher
78 .as_ref()
79 .map(|v| v.socket_manager.clone())
80 }
81 pub fn socket_manager(&self) -> SocketManager {
82 SocketManager {
83 udp_socket_manager: self.shared_udp_socket_manager(),
84 tcp_socket_manager: self.shared_tcp_socket_manager(),
85 }
86 }
87 pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
88 self.udp_tunnel_dispatcher
89 .as_ref()
90 .map(|v| &v.socket_manager)
91 }
92 pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
93 self.tcp_tunnel_dispatcher
94 .as_ref()
95 .map(|v| &v.socket_manager)
96 }
97}
98
99impl TunnelDispatcher {
100 pub fn udp_tunnel_manager_as_mut(&mut self) -> Option<&mut udp::UdpTunnelDispatcher> {
101 self.udp_tunnel_dispatcher.as_mut()
102 }
103 pub fn tcp_tunnel_manager_as_mut(&mut self) -> Option<&mut tcp::TcpTunnelDispatcher> {
104 self.tcp_tunnel_dispatcher.as_mut()
105 }
106}
107
108async fn accept_tcp(tcp: Option<&mut tcp::TcpTunnelDispatcher>) -> io::Result<Tunnel> {
109 if let Some(tcp_tunnel_factory) = tcp {
110 Ok(Tunnel::Tcp(tcp_tunnel_factory.dispatch().await?))
111 } else {
112 futures::future::pending().await
113 }
114}
115async fn dispatch_udp_tunnel(
116 udp_tunnel_factory: Option<&mut udp::UdpTunnelDispatcher>,
117) -> io::Result<Tunnel> {
118 if let Some(udp_tunnel_factory) = udp_tunnel_factory {
119 Ok(Tunnel::Udp(udp_tunnel_factory.dispatch().await?))
120 } else {
121 futures::future::pending().await
122 }
123}
124
125impl SocketManager {
126 pub fn udp_socket_manager_as_ref(&self) -> Option<&Arc<udp::UdpSocketManager>> {
127 self.udp_socket_manager.as_ref()
128 }
129 pub fn tcp_socket_manager_as_ref(&self) -> Option<&Arc<tcp::TcpSocketManager>> {
130 self.tcp_socket_manager.as_ref()
131 }
132}
133
134impl SocketManager {
135 pub async fn send_to(&self, buf: Bytes, route_key: &RouteKey) -> io::Result<()> {
137 match route_key.protocol() {
138 ConnectProtocol::UDP => {
139 if let Some(w) = self.udp_socket_manager.as_ref() {
140 return w.send_bytes_to(buf, route_key).await;
141 }
142 }
143 ConnectProtocol::TCP => {
144 if let Some(w) = self.tcp_socket_manager.as_ref() {
145 return w.send_to(buf, route_key).await;
146 }
147 }
148 }
149 Err(io::Error::from(io::ErrorKind::InvalidInput))
150 }
151 pub fn try_send_to(&self, buf: Bytes, route_key: &RouteKey) -> io::Result<()> {
152 match route_key.protocol() {
153 ConnectProtocol::UDP => {
154 if let Some(w) = self.udp_socket_manager.as_ref() {
155 return w.try_send_bytes_to(buf, route_key);
156 }
157 }
158 ConnectProtocol::TCP => {
159 if let Some(w) = self.tcp_socket_manager.as_ref() {
160 return w.try_send_to(buf, route_key);
161 }
162 }
163 }
164 Err(io::Error::from(io::ErrorKind::InvalidInput))
165 }
166
167 pub async fn send_to_addr<A: Into<SocketAddr>>(
169 &self,
170 connect_protocol: ConnectProtocol,
171 buf: Bytes,
172 addr: A,
173 ) -> io::Result<()> {
174 match connect_protocol {
175 ConnectProtocol::UDP => {
176 if let Some(w) = self.udp_socket_manager.as_ref() {
177 return w.send_bytes_to(buf, addr).await;
178 }
179 }
180 ConnectProtocol::TCP => {
181 if let Some(w) = self.tcp_socket_manager.as_ref() {
182 return w.send_to_addr(buf, addr).await;
183 }
184 }
185 }
186 Err(io::Error::from(io::ErrorKind::InvalidInput))
187 }
188}
189impl Tunnel {
214 pub async fn recv_from(&mut self, buf: &mut [u8]) -> Option<io::Result<(usize, RouteKey)>> {
218 match self {
219 Tunnel::Udp(tunnel) => tunnel.recv_from(buf).await,
220 Tunnel::Tcp(tunnel) => Some(tunnel.recv_from(buf).await),
221 }
222 }
223 pub async fn batch_recv_from<B: AsMut<[u8]>>(
224 &mut self,
225 bufs: &mut [B],
226 sizes: &mut [usize],
227 addrs: &mut [RouteKey],
228 ) -> Option<io::Result<usize>> {
229 match self {
230 Tunnel::Udp(tunnel) => tunnel.batch_recv_from(bufs, sizes, addrs).await,
231 Tunnel::Tcp(tunnel) => {
232 if addrs.len() != bufs.len() {
233 return Some(Err(io::Error::other("addrs error")));
234 }
235 match tunnel.batch_recv_from(bufs, sizes).await {
236 Ok((n, route_key)) => {
237 addrs[..n].fill(route_key);
238 Some(Ok(n))
239 }
240 Err(e) => Some(Err(e)),
241 }
242 }
243 }
244 }
245 pub async fn send_to<A: Into<SocketAddr>>(&self, buf: Bytes, addr: A) -> io::Result<()> {
246 match self {
247 Tunnel::Udp(tunnel) => tunnel.send_bytes_to(buf, addr).await,
248 Tunnel::Tcp(tunnel) => tunnel.send(buf).await,
249 }
250 }
251 pub fn done(&mut self) {
252 match self {
253 Tunnel::Udp(tunnel) => tunnel.done(),
254 Tunnel::Tcp(tunnel) => tunnel.done(),
255 }
256 }
257}
258
259impl Tunnel {
260 pub fn protocol(&self) -> ConnectProtocol {
262 match self {
263 Tunnel::Udp(_) => ConnectProtocol::UDP,
264 Tunnel::Tcp(_) => ConnectProtocol::TCP,
265 }
266 }
267 pub fn remote_addr(&self) -> Option<SocketAddr> {
268 match self {
269 Tunnel::Udp(_) => None,
270 Tunnel::Tcp(tcp) => Some(tcp.route_key().addr()),
271 }
272 }
273}