1use std::io;
2use std::io::IoSlice;
3use std::net::SocketAddr;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use bytes::{Buf, BytesMut};
8
9use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
10
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12
13use crate::protocol::node_id::{GroupCode, NodeID};
14use crate::protocol::{NetPacket, HEAD_LEN};
15use crate::tunnel::{NodeAddress, PeerNodeAddress, RecvResult};
16pub use rust_p2p_core::nat::*;
17pub use rust_p2p_core::punch::config::{PunchModel, PunchPolicy, PunchPolicySet};
18pub use rust_p2p_core::socket::LocalInterface;
19pub use rust_p2p_core::tunnel::config::LoadBalance;
20use rust_p2p_core::tunnel::recycle::RecycleBuf;
21use rust_p2p_core::tunnel::tcp::{Decoder, Encoder, InitCodec};
22pub use rust_p2p_core::tunnel::udp::Model;
23
24pub(crate) mod punch_info;
25
/// Default route idle time (10 seconds), used as the default for both
/// `Config::route_idle_time` and `TcpTunnelConfig::route_idle_time`.
pub(crate) const ROUTE_IDLE_TIME: Duration = Duration::from_secs(10);
27
/// Top-level node configuration.
///
/// Built with [`Config::default`] and customised through the consuming
/// `set_*` builder methods. It converts into
/// `rust_p2p_core::tunnel::config::TunnelConfig` through a `From` impl.
pub struct Config {
    /// Load-balancing strategy; the default is `LoadBalance::MinHopLowestLatency`.
    pub load_balance: LoadBalance,
    /// Copied into the core `TunnelConfig::major_socket_count`;
    /// defaults to `MAX_MAJOR_SOCKET_COUNT`.
    pub major_socket_count: usize,
    /// Route idle time; defaults to `ROUTE_IDLE_TIME`.
    pub route_idle_time: Duration,
    /// UDP tunnel settings. `None` produces no UDP tunnel config in the core `TunnelConfig`.
    pub udp_tunnel_config: Option<UdpTunnelConfig>,
    /// TCP tunnel settings. `None` produces no TCP tunnel config in the core `TunnelConfig`.
    pub tcp_tunnel_config: Option<TcpTunnelConfig>,
    /// Optional group code; unset by default.
    pub group_code: Option<GroupCode>,
    /// Optional id of this node; unset by default.
    pub self_id: Option<NodeID>,
    /// Optional list of peer addresses; unset by default.
    /// NOTE(review): presumably peers to contact directly — confirm against the consumer.
    pub direct_addrs: Option<Vec<PeerNodeAddress>>,
    /// Defaults to 2048. Also the lower bound of the size range handed to `RecycleBuf::new`.
    pub send_buffer_size: usize,
    /// Defaults to 2048.
    pub recv_buffer_size: usize,
    /// Defaults to 17 seconds.
    pub query_id_interval: Duration,
    /// Defaults to 3.
    pub query_id_max_num: usize,
    /// Defaults to 5 seconds.
    pub heartbeat_interval: Duration,
    /// STUN server host names for TCP; a built-in list is used by default.
    pub tcp_stun_servers: Option<Vec<String>>,
    /// STUN server host names for UDP; a built-in list is used by default.
    pub udp_stun_servers: Option<Vec<String>>,
    /// Optional mapping addresses; unset by default.
    pub mapping_addrs: Option<Vec<NodeAddress>>,
    /// Optional DNS entries; unset by default.
    pub dns: Option<Vec<String>>,
    /// When greater than zero, a `RecycleBuf` is created with this value and shared by
    /// the UDP and TCP tunnel configs; zero means no recycle buffer. Defaults to 64.
    pub recycle_buf_cap: usize,
    /// Optional cipher algorithm; only present when one of the cipher features is enabled.
    #[cfg(any(
        feature = "aes-gcm-openssl",
        feature = "aes-gcm-ring",
        feature = "chacha20-poly1305-openssl",
        feature = "chacha20-poly1305-ring"
    ))]
    pub encryption: Option<crate::cipher::Algorithm>,
    /// Copied into both the UDP and TCP core tunnel configs.
    pub default_interface: Option<LocalInterface>,
    /// Copied into both the UDP and TCP core tunnel configs.
    pub use_v6: bool,
}
57
58impl Default for Config {
59 fn default() -> Self {
60 Self {
61 load_balance: LoadBalance::MinHopLowestLatency,
62 major_socket_count: MAX_MAJOR_SOCKET_COUNT,
63 udp_tunnel_config: Some(Default::default()),
64 tcp_tunnel_config: Some(Default::default()),
65 route_idle_time: ROUTE_IDLE_TIME,
66 group_code: None,
67 self_id: None,
68 direct_addrs: None,
69 send_buffer_size: 2048,
70 recv_buffer_size: 2048,
71 query_id_interval: Duration::from_secs(17),
72 query_id_max_num: 3,
73 heartbeat_interval: Duration::from_secs(5),
74 tcp_stun_servers: Some(vec![
75 "stun.flashdance.cx".to_string(),
76 "stun.sipnet.net".to_string(),
77 "stun.nextcloud.com:443".to_string(),
78 ]),
79 udp_stun_servers: Some(vec![
80 "stun.miwifi.com".to_string(),
81 "stun.chat.bilibili.com".to_string(),
82 "stun.hitv.com".to_string(),
83 "stun.l.google.com:19302".to_string(),
84 "stun1.l.google.com:19302".to_string(),
85 "stun2.l.google.com:19302".to_string(),
86 ]),
87 mapping_addrs: None,
88 dns: None,
89 recycle_buf_cap: 64,
90 #[cfg(any(
91 feature = "aes-gcm-openssl",
92 feature = "aes-gcm-ring",
93 feature = "chacha20-poly1305-openssl",
94 feature = "chacha20-poly1305-ring"
95 ))]
96 encryption: None,
97 default_interface: None,
98 use_v6: rust_p2p_core::tunnel::config::UdpTunnelConfig::default()
99 .set_use_v6(true)
100 .check()
101 .is_ok(),
102 }
103 }
104}
105
/// Default for `Config::major_socket_count`, `UdpTunnelConfig::main_socket_count`
/// and `TcpTunnelConfig::tcp_multiplexing_limit`.
pub(crate) const MAX_MAJOR_SOCKET_COUNT: usize = 2;
/// Default for `UdpTunnelConfig::sub_socket_count`.
pub(crate) const MAX_UDP_SUB_SOCKET_COUNT: usize = 82;
108
impl Config {
    /// Returns the configuration unchanged.
    ///
    /// NOTE(review): the name suggests this should disable the TCP tunnel
    /// (e.g. by clearing `tcp_tunnel_config`), but the body is a no-op.
    /// Confirm whether this is an intentional compatibility shim before relying on it.
    pub fn none_tcp(self) -> Self {
        self
    }
}
114
115impl Config {
116 pub fn empty() -> Self {
117 Self::default()
118 }
119 pub fn set_load_balance(mut self, load_balance: LoadBalance) -> Self {
120 self.load_balance = load_balance;
121 self
122 }
123 pub fn set_main_socket_count(mut self, count: usize) -> Self {
124 self.major_socket_count = count;
125 self
126 }
127
128 pub fn set_udp_tunnel_config(mut self, config: UdpTunnelConfig) -> Self {
129 self.udp_tunnel_config.replace(config);
130 self
131 }
132 pub fn set_tcp_tunnel_config(mut self, config: TcpTunnelConfig) -> Self {
133 self.tcp_tunnel_config.replace(config);
134 self
135 }
136 pub fn set_group_code(mut self, group_code: GroupCode) -> Self {
137 self.group_code.replace(group_code);
138 self
139 }
140 pub fn set_node_id(mut self, self_id: NodeID) -> Self {
141 self.self_id.replace(self_id);
142 self
143 }
144 pub fn set_direct_addrs(mut self, direct_addrs: Vec<PeerNodeAddress>) -> Self {
145 self.direct_addrs.replace(direct_addrs);
146 self
147 }
148 pub fn set_send_buffer_size(mut self, send_buffer_size: usize) -> Self {
149 self.send_buffer_size = send_buffer_size;
150 self
151 }
152 pub fn set_recv_buffer_size(mut self, recv_buffer_size: usize) -> Self {
153 self.recv_buffer_size = recv_buffer_size;
154 self
155 }
156 pub fn set_query_id_interval(mut self, query_id_interval: Duration) -> Self {
157 self.query_id_interval = query_id_interval;
158 self
159 }
160 pub fn set_query_id_max_num(mut self, query_id_max_num: usize) -> Self {
161 self.query_id_max_num = query_id_max_num;
162 self
163 }
164 pub fn set_heartbeat_interval(mut self, heartbeat_interval: Duration) -> Self {
165 self.heartbeat_interval = heartbeat_interval;
166 self
167 }
168 pub fn set_tcp_stun_servers(mut self, tcp_stun_servers: Vec<String>) -> Self {
169 self.tcp_stun_servers.replace(tcp_stun_servers);
170 self
171 }
172 pub fn set_udp_stun_servers(mut self, udp_stun_servers: Vec<String>) -> Self {
173 self.udp_stun_servers.replace(udp_stun_servers);
174 self
175 }
176 pub fn set_mapping_addrs(mut self, mapping_addrs: Vec<NodeAddress>) -> Self {
178 self.mapping_addrs.replace(mapping_addrs);
179 self
180 }
181 pub fn set_dns(mut self, dns: Vec<String>) -> Self {
182 self.dns.replace(dns);
183 self
184 }
185 pub fn set_recycle_buf_cap(mut self, recycle_buf_cap: usize) -> Self {
186 self.recycle_buf_cap = recycle_buf_cap;
187 self
188 }
189 #[cfg(any(
190 feature = "aes-gcm-openssl",
191 feature = "aes-gcm-ring",
192 feature = "chacha20-poly1305-openssl",
193 feature = "chacha20-poly1305-ring"
194 ))]
195 pub fn set_encryption(mut self, encryption: crate::cipher::Algorithm) -> Self {
196 self.encryption.replace(encryption);
197 self
198 }
199 pub fn set_default_interface(mut self, default_interface: LocalInterface) -> Self {
201 self.default_interface = Some(default_interface.clone());
202 self
203 }
204 pub fn set_use_v6(mut self, use_v6: bool) -> Self {
206 self.use_v6 = use_v6;
207 self
208 }
209}
210
/// TCP tunnel settings; converted into the core
/// `rust_p2p_core::tunnel::config::TcpTunnelConfig` through a `From` impl.
pub struct TcpTunnelConfig {
    /// Route idle time; defaults to `ROUTE_IDLE_TIME`.
    pub route_idle_time: Duration,
    /// Defaults to `MAX_MAJOR_SOCKET_COUNT`.
    pub tcp_multiplexing_limit: usize,
    /// TCP port; defaults to 0.
    pub tcp_port: u16,
}
216
217impl Default for TcpTunnelConfig {
218 fn default() -> Self {
219 Self {
220 route_idle_time: ROUTE_IDLE_TIME,
221 tcp_multiplexing_limit: MAX_MAJOR_SOCKET_COUNT,
222 tcp_port: 0,
223 }
224 }
225}
226
227impl TcpTunnelConfig {
228 pub fn set_tcp_multiplexing_limit(mut self, tcp_multiplexing_limit: usize) -> Self {
229 self.tcp_multiplexing_limit = tcp_multiplexing_limit;
230 self
231 }
232 pub fn set_route_idle_time(mut self, route_idle_time: Duration) -> Self {
233 self.route_idle_time = route_idle_time;
234 self
235 }
236 pub fn set_tcp_port(mut self, tcp_port: u16) -> Self {
237 self.tcp_port = tcp_port;
238 self
239 }
240}
241
/// UDP tunnel settings; converted into the core
/// `rust_p2p_core::tunnel::config::UdpTunnelConfig` through a `From` impl.
#[derive(Clone)]
pub struct UdpTunnelConfig {
    /// Mapped to the core `main_udp_count`; defaults to `MAX_MAJOR_SOCKET_COUNT`.
    pub main_socket_count: usize,
    /// Mapped to the core `sub_udp_count`; defaults to `MAX_UDP_SUB_SOCKET_COUNT`.
    pub sub_socket_count: usize,
    /// UDP model; defaults to `Model::Low`.
    pub model: Model,
    /// UDP ports; defaults to `[0, 0]`.
    pub udp_ports: Vec<u16>,
}
249
250impl Default for UdpTunnelConfig {
251 fn default() -> Self {
252 Self {
253 main_socket_count: MAX_MAJOR_SOCKET_COUNT,
254 sub_socket_count: MAX_UDP_SUB_SOCKET_COUNT,
255 model: Model::Low,
256 udp_ports: vec![0, 0],
257 }
258 }
259}
260
261impl UdpTunnelConfig {
262 pub fn set_main_socket_count(mut self, count: usize) -> Self {
263 self.main_socket_count = count;
264 self
265 }
266 pub fn set_sub_socket_count(mut self, count: usize) -> Self {
267 self.sub_socket_count = count;
268 self
269 }
270 pub fn set_model(mut self, model: Model) -> Self {
271 self.model = model;
272 self
273 }
274
275 pub fn set_udp_ports(mut self, udp_ports: Vec<u16>) -> Self {
276 self.udp_ports = udp_ports;
277 self
278 }
279 pub fn set_simple_udp_port(mut self, udp_port: u16) -> Self {
280 self.udp_ports = vec![udp_port];
281 self
282 }
283}
284
285impl From<Config> for rust_p2p_core::tunnel::config::TunnelConfig {
286 fn from(value: Config) -> Self {
287 let recycle_buf = if value.recycle_buf_cap > 0 {
288 Some(RecycleBuf::new(
289 value.recycle_buf_cap,
290 value.send_buffer_size..usize::MAX,
291 ))
292 } else {
293 None
294 };
295 let udp_tunnel_config = value.udp_tunnel_config.map(|v| {
296 let mut config: rust_p2p_core::tunnel::config::UdpTunnelConfig = v.into();
297 config.recycle_buf.clone_from(&recycle_buf);
298 config.use_v6 = value.use_v6;
299 config
300 .default_interface
301 .clone_from(&value.default_interface);
302 config
303 });
304 let tcp_tunnel_config = value.tcp_tunnel_config.map(|v| {
305 let mut config: rust_p2p_core::tunnel::config::TcpTunnelConfig = v.into();
306 config.recycle_buf = recycle_buf;
307 config.use_v6 = value.use_v6;
308 config
309 .default_interface
310 .clone_from(&value.default_interface);
311 config
312 });
313 rust_p2p_core::tunnel::config::TunnelConfig {
314 major_socket_count: value.major_socket_count,
315 udp_tunnel_config,
316 tcp_tunnel_config,
317 }
318 }
319}
320
321impl From<UdpTunnelConfig> for rust_p2p_core::tunnel::config::UdpTunnelConfig {
322 fn from(value: UdpTunnelConfig) -> Self {
323 rust_p2p_core::tunnel::config::UdpTunnelConfig {
324 main_udp_count: value.main_socket_count,
325 sub_udp_count: value.sub_socket_count,
326 model: value.model,
327 default_interface: None,
328 udp_ports: value.udp_ports,
329 use_v6: false,
330 recycle_buf: None,
331 }
332 }
333}
334
335impl From<TcpTunnelConfig> for rust_p2p_core::tunnel::config::TcpTunnelConfig {
336 fn from(value: TcpTunnelConfig) -> Self {
337 rust_p2p_core::tunnel::config::TcpTunnelConfig {
338 route_idle_time: value.route_idle_time,
339 tcp_multiplexing_limit: value.tcp_multiplexing_limit,
340 default_interface: None,
341 tcp_port: value.tcp_port,
342 use_v6: false,
343 init_codec: Box::new(LengthPrefixedInitCodec),
344 recycle_buf: None,
345 }
346 }
347}
348
/// Stateless writer side of the TCP codec; its `Encoder` impl writes packets
/// to the stream unchanged.
pub(crate) struct LengthPrefixedEncoder {}
351
/// Reader side of the TCP codec; splits the byte stream into packets using the
/// length field read from each packet header.
pub(crate) struct LengthPrefixedDecoder {
    /// Bytes read from the stream that have not yet been returned to the caller
    /// (surplus after a complete packet, or partial data saved on `WouldBlock`).
    buf: BytesMut,
}
355
356impl LengthPrefixedEncoder {
357 pub(crate) fn new() -> Self {
358 Self {}
359 }
360}
361
362impl LengthPrefixedDecoder {
363 pub(crate) fn new() -> Self {
364 Self {
365 buf: Default::default(),
366 }
367 }
368}
369
#[async_trait]
impl Decoder for LengthPrefixedDecoder {
    /// Reads one complete packet into the front of `src` and returns its length.
    ///
    /// Bytes left over from a previous call (`self.buf`) are consumed first; the
    /// stream is only read once that buffer is empty.
    ///
    /// # Errors
    /// - "too short" if `src` is smaller than `HEAD_LEN` or than the packet length
    ///   announced in the header.
    /// - `UnexpectedEof` if the stream returns 0 bytes.
    /// - Any error returned by the underlying read.
    ///
    /// NOTE(review): bytes already read into `src` are not saved back into
    /// `self.buf` if this future is dropped mid-packet (only `try_decode` stashes
    /// partial data) — confirm callers never cancel this future.
    async fn decode(&mut self, read: &mut OwnedReadHalf, src: &mut [u8]) -> io::Result<usize> {
        if src.len() < HEAD_LEN {
            return Err(io::Error::other("too short"));
        }
        // Number of valid bytes currently at the start of `src`.
        let mut offset = 0;
        loop {
            if self.buf.is_empty() {
                let len = read.read(&mut src[offset..]).await?;
                if len == 0 {
                    return Err(io::Error::from(io::ErrorKind::UnexpectedEof));
                }
                offset += len;
                // `None` means the packet is still incomplete: keep reading.
                if let Some(rs) = self.process_packet(src, offset) {
                    return rs;
                }
            } else if let Some(rs) = self.process_buf(src, &mut offset) {
                // Served (or failed) entirely from previously buffered bytes.
                return rs;
            }
        }
    }

    /// Non-blocking variant of `decode` built on `try_read`.
    ///
    /// When the read would block after part of a packet has been collected, the
    /// partial bytes are saved in `self.buf` so the next call can resume, and the
    /// `WouldBlock` error is returned to the caller.
    ///
    /// # Errors
    /// Same as `decode`, plus `WouldBlock` when no complete packet is available yet.
    fn try_decode(&mut self, read: &mut OwnedReadHalf, src: &mut [u8]) -> io::Result<usize> {
        if src.len() < HEAD_LEN {
            return Err(io::Error::other("too short"));
        }
        // Number of valid bytes currently at the start of `src`.
        let mut offset = 0;
        loop {
            if self.buf.is_empty() {
                match read.try_read(&mut src[offset..]) {
                    Ok(len) => {
                        if len == 0 {
                            return Err(io::Error::from(io::ErrorKind::UnexpectedEof));
                        }
                        offset += len;
                    }
                    Err(e) => {
                        // Keep the partial packet for the next call instead of losing it.
                        if e.kind() == io::ErrorKind::WouldBlock && offset > 0 {
                            self.buf.extend_from_slice(&src[..offset]);
                        }
                        return Err(e);
                    }
                }
                if let Some(rs) = self.process_packet(src, offset) {
                    return rs;
                }
            } else if let Some(rs) = self.process_buf(src, &mut offset) {
                return rs;
            }
        }
    }
}
423impl LengthPrefixedDecoder {
424 fn process_buf(&mut self, src: &mut [u8], offset: &mut usize) -> Option<io::Result<usize>> {
425 let len = self.buf.len();
426 if len < HEAD_LEN {
427 src[..len].copy_from_slice(self.buf.as_ref());
428 *offset += len;
429 self.buf.clear();
430 return None;
431 }
432 let packet = unsafe { NetPacket::new_unchecked(self.buf.as_ref()) };
433 let data_length = packet.data_length() as usize;
434 if data_length > src.len() {
435 return Some(Err(io::Error::other("too short")));
436 }
437 if data_length > len {
438 src[..len].copy_from_slice(self.buf.as_ref());
439 *offset += len;
440 self.buf.clear();
441 None
442 } else {
443 src[..data_length].copy_from_slice(&self.buf[..data_length]);
444 if data_length == len {
445 self.buf.clear();
446 } else {
447 self.buf.advance(data_length);
448 }
449 Some(Ok(data_length))
450 }
451 }
452 fn process_packet(&mut self, src: &mut [u8], offset: usize) -> Option<io::Result<usize>> {
453 if offset < HEAD_LEN {
454 return None;
455 }
456 let packet = unsafe { NetPacket::new_unchecked(&src) };
457 let data_length = packet.data_length() as usize;
458 if data_length > src.len() {
459 return Some(Err(io::Error::other("too short")));
460 }
461 match data_length.cmp(&offset) {
462 std::cmp::Ordering::Less => {
463 self.buf.extend_from_slice(&src[data_length..offset]);
464 Some(Ok(data_length))
465 }
466 std::cmp::Ordering::Equal => Some(Ok(data_length)),
467 std::cmp::Ordering::Greater => None,
468 }
469 }
470}
471#[async_trait]
472impl Encoder for LengthPrefixedEncoder {
473 async fn encode(&mut self, write: &mut OwnedWriteHalf, data: &[u8]) -> io::Result<()> {
474 let len = data.len();
475 let packet = unsafe { NetPacket::new_unchecked(data) };
476 if packet.data_length() as usize != len {
477 return Err(io::Error::from(io::ErrorKind::InvalidData));
478 }
479 write.write_all(data).await
480 }
481
482 async fn encode_multiple(
483 &mut self,
484 write: &mut OwnedWriteHalf,
485 bufs: &[IoSlice<'_>],
486 ) -> io::Result<()> {
487 let mut index = 0;
488 let mut total_written = 0;
489 let total: usize = bufs.iter().map(|v| v.len()).sum();
490 loop {
491 if index == bufs.len() - 1 {
492 write.write_all(&bufs[index]).await?;
493 return Ok(());
494 }
495 let len = write.write_vectored(&bufs[index..]).await?;
496 if len == 0 {
497 return Err(io::Error::from(io::ErrorKind::WriteZero));
498 }
499 total_written += len;
500 if total_written == total {
501 return Ok(());
502 }
503 let mut written = len;
504 for buf in &bufs[index..] {
505 if buf.len() > written {
506 if written != 0 {
507 index += 1;
508 total_written += buf.len() - written;
509 write.write_all(&buf[written..]).await?;
510 if index == bufs.len() {
511 return Ok(());
512 }
513 }
514 break;
515 } else {
516 index += 1;
517 written -= buf.len();
518 }
519 }
520 }
521 }
522}
523
/// Codec factory installed on the core TCP tunnel config; hands out a fresh
/// `LengthPrefixedDecoder` / `LengthPrefixedEncoder` pair on each call.
#[derive(Clone)]
pub(crate) struct LengthPrefixedInitCodec;
526
527impl InitCodec for LengthPrefixedInitCodec {
528 fn codec(&self, _addr: SocketAddr) -> io::Result<(Box<dyn Decoder>, Box<dyn Encoder>)> {
529 Ok((
530 Box::new(LengthPrefixedDecoder::new()),
531 Box::new(LengthPrefixedEncoder::new()),
532 ))
533 }
534}
/// Hook that is given mutable access to received data.
#[async_trait]
pub trait DataInterceptor: Send + Sync {
    /// Inspects (and may modify) `data`.
    ///
    /// NOTE(review): the meaning of the returned flag is not visible in this file;
    /// presumably `true` means the data was consumed by the interceptor — confirm
    /// against the caller.
    async fn pre_handle(&self, data: &mut RecvResult) -> bool;
}
/// Interceptor whose `pre_handle` ignores the data and always returns `false`.
#[derive(Clone)]
pub struct DefaultInterceptor;
541
542#[async_trait]
543impl DataInterceptor for DefaultInterceptor {
544 async fn pre_handle(&self, _data: &mut RecvResult) -> bool {
545 false
546 }
547}