rustp2p/config/
mod.rs

1use std::io;
2use std::io::IoSlice;
3use std::net::SocketAddr;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use bytes::{Buf, BytesMut};
8
9use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
10
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12
13use crate::protocol::node_id::{GroupCode, NodeID};
14use crate::protocol::{NetPacket, HEAD_LEN};
15use crate::tunnel::{NodeAddress, PeerNodeAddress, RecvResult};
16pub use rust_p2p_core::nat::*;
17pub use rust_p2p_core::punch::config::{PunchModel, PunchPolicy, PunchPolicySet};
18pub use rust_p2p_core::socket::LocalInterface;
19pub use rust_p2p_core::tunnel::config::LoadBalance;
20use rust_p2p_core::tunnel::recycle::RecycleBuf;
21use rust_p2p_core::tunnel::tcp::{Decoder, Encoder, InitCodec};
22pub use rust_p2p_core::tunnel::udp::Model;
23
24pub(crate) mod punch_info;
25
/// Default route idle time (10 seconds).
///
/// Used as the initial `route_idle_time` of both `Config` and `TcpTunnelConfig`.
pub(crate) const ROUTE_IDLE_TIME: Duration = Duration::from_secs(10);
27
/// Top-level configuration of a node.
///
/// Usually built from `Config::default()` and refined with the consuming
/// `set_*` builder methods. It converts into
/// `rust_p2p_core::tunnel::config::TunnelConfig` through the `From<Config>` impl.
pub struct Config {
    /// Load-balancing strategy (default: `LoadBalance::MinHopLowestLatency`).
    pub load_balance: LoadBalance,
    /// Forwarded as `TunnelConfig::major_socket_count`
    /// (default: `MAX_MAJOR_SOCKET_COUNT`).
    pub major_socket_count: usize,
    /// Route idle time (default: `ROUTE_IDLE_TIME`).
    // NOTE(review): not forwarded by `From<Config>` in this file; presumably
    // consumed elsewhere — confirm.
    pub route_idle_time: Duration,
    /// UDP tunnel settings; `None` produces no UDP tunnel configuration in the
    /// converted `TunnelConfig`.
    pub udp_tunnel_config: Option<UdpTunnelConfig>,
    /// TCP tunnel settings; `None` produces no TCP tunnel configuration in the
    /// converted `TunnelConfig`.
    pub tcp_tunnel_config: Option<TcpTunnelConfig>,
    /// Optional group code (default: `None`).
    pub group_code: Option<GroupCode>,
    /// Optional node ID, set through `set_node_id` (default: `None`).
    pub self_id: Option<NodeID>,
    /// Optional list of peer addresses (default: `None`).
    // NOTE(review): presumably peers to connect to directly — confirm.
    pub direct_addrs: Option<Vec<PeerNodeAddress>>,
    /// Send buffer size (default: 2048). Also the lower bound of the size range
    /// handed to `RecycleBuf::new` during conversion.
    pub send_buffer_size: usize,
    /// Receive buffer size (default: 2048).
    pub recv_buffer_size: usize,
    /// Interval between ID queries (default: 17 seconds).
    pub query_id_interval: Duration,
    /// Maximum number used for ID queries (default: 3).
    pub query_id_max_num: usize,
    /// Heartbeat interval (default: 5 seconds).
    pub heartbeat_interval: Duration,
    /// STUN servers used over TCP (a default list is provided).
    pub tcp_stun_servers: Option<Vec<String>>,
    /// STUN servers used over UDP (a default list is provided).
    pub udp_stun_servers: Option<Vec<String>>,
    /// Other nodes will attempt to connect to the current node through these
    /// addresses (default: `None`).
    pub mapping_addrs: Option<Vec<NodeAddress>>,
    /// Optional DNS server list (default: `None`).
    pub dns: Option<Vec<String>>,
    /// Capacity handed to `RecycleBuf::new` (default: 64); `0` means no
    /// `RecycleBuf` is created during conversion.
    pub recycle_buf_cap: usize,
    /// Optional encryption algorithm; only present when one of the cipher
    /// features is enabled (default: `None`).
    #[cfg(any(
        feature = "aes-gcm-openssl",
        feature = "aes-gcm-ring",
        feature = "chacha20-poly1305-openssl",
        feature = "chacha20-poly1305-ring"
    ))]
    pub encryption: Option<crate::cipher::Algorithm>,
    /// Network interface to bind to (default: `None`); copied into both tunnel
    /// configurations during conversion.
    pub default_interface: Option<LocalInterface>,
    /// Whether to use IPv6; copied into both tunnel configurations during
    /// conversion.
    pub use_v6: bool,
}
57
58impl Default for Config {
59    fn default() -> Self {
60        Self {
61            load_balance: LoadBalance::MinHopLowestLatency,
62            major_socket_count: MAX_MAJOR_SOCKET_COUNT,
63            udp_tunnel_config: Some(Default::default()),
64            tcp_tunnel_config: Some(Default::default()),
65            route_idle_time: ROUTE_IDLE_TIME,
66            group_code: None,
67            self_id: None,
68            direct_addrs: None,
69            send_buffer_size: 2048,
70            recv_buffer_size: 2048,
71            query_id_interval: Duration::from_secs(17),
72            query_id_max_num: 3,
73            heartbeat_interval: Duration::from_secs(5),
74            tcp_stun_servers: Some(vec![
75                "stun.flashdance.cx".to_string(),
76                "stun.sipnet.net".to_string(),
77                "stun.nextcloud.com:443".to_string(),
78            ]),
79            udp_stun_servers: Some(vec![
80                "stun.miwifi.com".to_string(),
81                "stun.chat.bilibili.com".to_string(),
82                "stun.hitv.com".to_string(),
83                "stun.l.google.com:19302".to_string(),
84                "stun1.l.google.com:19302".to_string(),
85                "stun2.l.google.com:19302".to_string(),
86            ]),
87            mapping_addrs: None,
88            dns: None,
89            recycle_buf_cap: 64,
90            #[cfg(any(
91                feature = "aes-gcm-openssl",
92                feature = "aes-gcm-ring",
93                feature = "chacha20-poly1305-openssl",
94                feature = "chacha20-poly1305-ring"
95            ))]
96            encryption: None,
97            default_interface: None,
98            use_v6: rust_p2p_core::tunnel::config::UdpTunnelConfig::default()
99                .set_use_v6(true)
100                .check()
101                .is_ok(),
102        }
103    }
104}
105
/// Default number of main sockets; also used as the default TCP multiplexing
/// limit and the default UDP main socket count.
pub(crate) const MAX_MAJOR_SOCKET_COUNT: usize = 2;
/// Default number of UDP sub sockets (`UdpTunnelConfig::sub_socket_count`).
pub(crate) const MAX_UDP_SUB_SOCKET_COUNT: usize = 82;
108
109impl Config {
110    pub fn none_tcp(self) -> Self {
111        self
112    }
113}
114
115impl Config {
116    pub fn empty() -> Self {
117        Self::default()
118    }
119    pub fn set_load_balance(mut self, load_balance: LoadBalance) -> Self {
120        self.load_balance = load_balance;
121        self
122    }
123    pub fn set_main_socket_count(mut self, count: usize) -> Self {
124        self.major_socket_count = count;
125        self
126    }
127
128    pub fn set_udp_tunnel_config(mut self, config: UdpTunnelConfig) -> Self {
129        self.udp_tunnel_config.replace(config);
130        self
131    }
132    pub fn set_tcp_tunnel_config(mut self, config: TcpTunnelConfig) -> Self {
133        self.tcp_tunnel_config.replace(config);
134        self
135    }
136    pub fn set_group_code(mut self, group_code: GroupCode) -> Self {
137        self.group_code.replace(group_code);
138        self
139    }
140    pub fn set_node_id(mut self, self_id: NodeID) -> Self {
141        self.self_id.replace(self_id);
142        self
143    }
144    pub fn set_direct_addrs(mut self, direct_addrs: Vec<PeerNodeAddress>) -> Self {
145        self.direct_addrs.replace(direct_addrs);
146        self
147    }
148    pub fn set_send_buffer_size(mut self, send_buffer_size: usize) -> Self {
149        self.send_buffer_size = send_buffer_size;
150        self
151    }
152    pub fn set_recv_buffer_size(mut self, recv_buffer_size: usize) -> Self {
153        self.recv_buffer_size = recv_buffer_size;
154        self
155    }
156    pub fn set_query_id_interval(mut self, query_id_interval: Duration) -> Self {
157        self.query_id_interval = query_id_interval;
158        self
159    }
160    pub fn set_query_id_max_num(mut self, query_id_max_num: usize) -> Self {
161        self.query_id_max_num = query_id_max_num;
162        self
163    }
164    pub fn set_heartbeat_interval(mut self, heartbeat_interval: Duration) -> Self {
165        self.heartbeat_interval = heartbeat_interval;
166        self
167    }
168    pub fn set_tcp_stun_servers(mut self, tcp_stun_servers: Vec<String>) -> Self {
169        self.tcp_stun_servers.replace(tcp_stun_servers);
170        self
171    }
172    pub fn set_udp_stun_servers(mut self, udp_stun_servers: Vec<String>) -> Self {
173        self.udp_stun_servers.replace(udp_stun_servers);
174        self
175    }
176    /// Other nodes will attempt to connect to the current node through this configuration
177    pub fn set_mapping_addrs(mut self, mapping_addrs: Vec<NodeAddress>) -> Self {
178        self.mapping_addrs.replace(mapping_addrs);
179        self
180    }
181    pub fn set_dns(mut self, dns: Vec<String>) -> Self {
182        self.dns.replace(dns);
183        self
184    }
185    pub fn set_recycle_buf_cap(mut self, recycle_buf_cap: usize) -> Self {
186        self.recycle_buf_cap = recycle_buf_cap;
187        self
188    }
189    #[cfg(any(
190        feature = "aes-gcm-openssl",
191        feature = "aes-gcm-ring",
192        feature = "chacha20-poly1305-openssl",
193        feature = "chacha20-poly1305-ring"
194    ))]
195    pub fn set_encryption(mut self, encryption: crate::cipher::Algorithm) -> Self {
196        self.encryption.replace(encryption);
197        self
198    }
199    /// Bind to this network card
200    pub fn set_default_interface(mut self, default_interface: LocalInterface) -> Self {
201        self.default_interface = Some(default_interface.clone());
202        self
203    }
204    /// Whether to use IPv6
205    pub fn set_use_v6(mut self, use_v6: bool) -> Self {
206        self.use_v6 = use_v6;
207        self
208    }
209}
210
/// TCP tunnel settings; converted into
/// `rust_p2p_core::tunnel::config::TcpTunnelConfig` via `From`.
pub struct TcpTunnelConfig {
    /// Route idle time (default: `ROUTE_IDLE_TIME`).
    pub route_idle_time: Duration,
    /// TCP multiplexing limit (default: `MAX_MAJOR_SOCKET_COUNT`).
    pub tcp_multiplexing_limit: usize,
    /// TCP port (default: `0`).
    // NOTE(review): `0` presumably lets the OS choose a port — confirm.
    pub tcp_port: u16,
}
216
217impl Default for TcpTunnelConfig {
218    fn default() -> Self {
219        Self {
220            route_idle_time: ROUTE_IDLE_TIME,
221            tcp_multiplexing_limit: MAX_MAJOR_SOCKET_COUNT,
222            tcp_port: 0,
223        }
224    }
225}
226
227impl TcpTunnelConfig {
228    pub fn set_tcp_multiplexing_limit(mut self, tcp_multiplexing_limit: usize) -> Self {
229        self.tcp_multiplexing_limit = tcp_multiplexing_limit;
230        self
231    }
232    pub fn set_route_idle_time(mut self, route_idle_time: Duration) -> Self {
233        self.route_idle_time = route_idle_time;
234        self
235    }
236    pub fn set_tcp_port(mut self, tcp_port: u16) -> Self {
237        self.tcp_port = tcp_port;
238        self
239    }
240}
241
/// UDP tunnel settings; converted into
/// `rust_p2p_core::tunnel::config::UdpTunnelConfig` via `From`.
#[derive(Clone)]
pub struct UdpTunnelConfig {
    /// Number of main UDP sockets (default: `MAX_MAJOR_SOCKET_COUNT`);
    /// forwarded as `main_udp_count`.
    pub main_socket_count: usize,
    /// Number of sub UDP sockets (default: `MAX_UDP_SUB_SOCKET_COUNT`);
    /// forwarded as `sub_udp_count`.
    pub sub_socket_count: usize,
    /// UDP model (default: `Model::Low`).
    pub model: Model,
    /// UDP ports (default: `[0, 0]`).
    // NOTE(review): presumably one port per main socket, with `0` meaning
    // "let the OS choose" — confirm.
    pub udp_ports: Vec<u16>,
}
249
250impl Default for UdpTunnelConfig {
251    fn default() -> Self {
252        Self {
253            main_socket_count: MAX_MAJOR_SOCKET_COUNT,
254            sub_socket_count: MAX_UDP_SUB_SOCKET_COUNT,
255            model: Model::Low,
256            udp_ports: vec![0, 0],
257        }
258    }
259}
260
261impl UdpTunnelConfig {
262    pub fn set_main_socket_count(mut self, count: usize) -> Self {
263        self.main_socket_count = count;
264        self
265    }
266    pub fn set_sub_socket_count(mut self, count: usize) -> Self {
267        self.sub_socket_count = count;
268        self
269    }
270    pub fn set_model(mut self, model: Model) -> Self {
271        self.model = model;
272        self
273    }
274
275    pub fn set_udp_ports(mut self, udp_ports: Vec<u16>) -> Self {
276        self.udp_ports = udp_ports;
277        self
278    }
279    pub fn set_simple_udp_port(mut self, udp_port: u16) -> Self {
280        self.udp_ports = vec![udp_port];
281        self
282    }
283}
284
285impl From<Config> for rust_p2p_core::tunnel::config::TunnelConfig {
286    fn from(value: Config) -> Self {
287        let recycle_buf = if value.recycle_buf_cap > 0 {
288            Some(RecycleBuf::new(
289                value.recycle_buf_cap,
290                value.send_buffer_size..usize::MAX,
291            ))
292        } else {
293            None
294        };
295        let udp_tunnel_config = value.udp_tunnel_config.map(|v| {
296            let mut config: rust_p2p_core::tunnel::config::UdpTunnelConfig = v.into();
297            config.recycle_buf.clone_from(&recycle_buf);
298            config.use_v6 = value.use_v6;
299            config
300                .default_interface
301                .clone_from(&value.default_interface);
302            config
303        });
304        let tcp_tunnel_config = value.tcp_tunnel_config.map(|v| {
305            let mut config: rust_p2p_core::tunnel::config::TcpTunnelConfig = v.into();
306            config.recycle_buf = recycle_buf;
307            config.use_v6 = value.use_v6;
308            config
309                .default_interface
310                .clone_from(&value.default_interface);
311            config
312        });
313        rust_p2p_core::tunnel::config::TunnelConfig {
314            major_socket_count: value.major_socket_count,
315            udp_tunnel_config,
316            tcp_tunnel_config,
317        }
318    }
319}
320
321impl From<UdpTunnelConfig> for rust_p2p_core::tunnel::config::UdpTunnelConfig {
322    fn from(value: UdpTunnelConfig) -> Self {
323        rust_p2p_core::tunnel::config::UdpTunnelConfig {
324            main_udp_count: value.main_socket_count,
325            sub_udp_count: value.sub_socket_count,
326            model: value.model,
327            default_interface: None,
328            udp_ports: value.udp_ports,
329            use_v6: false,
330            recycle_buf: None,
331        }
332    }
333}
334
335impl From<TcpTunnelConfig> for rust_p2p_core::tunnel::config::TcpTunnelConfig {
336    fn from(value: TcpTunnelConfig) -> Self {
337        rust_p2p_core::tunnel::config::TcpTunnelConfig {
338            route_idle_time: value.route_idle_time,
339            tcp_multiplexing_limit: value.tcp_multiplexing_limit,
340            default_interface: None,
341            tcp_port: value.tcp_port,
342            use_v6: false,
343            init_codec: Box::new(LengthPrefixedInitCodec),
344            recycle_buf: None,
345        }
346    }
347}
348
/// Encoder half of the fixed-length-prefix TCP codec.
///
/// Holds no state: packets already carry their own length in the header, so
/// encoding only validates that length and writes the bytes unchanged.
pub(crate) struct LengthPrefixedEncoder {}
351
/// Decoder half of the fixed-length-prefix TCP codec.
pub(crate) struct LengthPrefixedDecoder {
    /// Carry-over bytes between calls: data read past the end of the packet
    /// that was just returned, or a partial packet stashed by `try_decode`
    /// when the socket would block.
    buf: BytesMut,
}
355
356impl LengthPrefixedEncoder {
357    pub(crate) fn new() -> Self {
358        Self {}
359    }
360}
361
362impl LengthPrefixedDecoder {
363    pub(crate) fn new() -> Self {
364        Self {
365            buf: Default::default(),
366        }
367    }
368}
369
370#[async_trait]
371impl Decoder for LengthPrefixedDecoder {
372    async fn decode(&mut self, read: &mut OwnedReadHalf, src: &mut [u8]) -> io::Result<usize> {
373        if src.len() < HEAD_LEN {
374            return Err(io::Error::other("too short"));
375        }
376        let mut offset = 0;
377        loop {
378            if self.buf.is_empty() {
379                let len = read.read(&mut src[offset..]).await?;
380                if len == 0 {
381                    return Err(io::Error::from(io::ErrorKind::UnexpectedEof));
382                }
383                offset += len;
384                if let Some(rs) = self.process_packet(src, offset) {
385                    return rs;
386                }
387            } else if let Some(rs) = self.process_buf(src, &mut offset) {
388                return rs;
389            }
390        }
391    }
392
393    fn try_decode(&mut self, read: &mut OwnedReadHalf, src: &mut [u8]) -> io::Result<usize> {
394        if src.len() < HEAD_LEN {
395            return Err(io::Error::other("too short"));
396        }
397        let mut offset = 0;
398        loop {
399            if self.buf.is_empty() {
400                match read.try_read(&mut src[offset..]) {
401                    Ok(len) => {
402                        if len == 0 {
403                            return Err(io::Error::from(io::ErrorKind::UnexpectedEof));
404                        }
405                        offset += len;
406                    }
407                    Err(e) => {
408                        if e.kind() == io::ErrorKind::WouldBlock && offset > 0 {
409                            self.buf.extend_from_slice(&src[..offset]);
410                        }
411                        return Err(e);
412                    }
413                }
414                if let Some(rs) = self.process_packet(src, offset) {
415                    return rs;
416                }
417            } else if let Some(rs) = self.process_buf(src, &mut offset) {
418                return rs;
419            }
420        }
421    }
422}
423impl LengthPrefixedDecoder {
424    fn process_buf(&mut self, src: &mut [u8], offset: &mut usize) -> Option<io::Result<usize>> {
425        let len = self.buf.len();
426        if len < HEAD_LEN {
427            src[..len].copy_from_slice(self.buf.as_ref());
428            *offset += len;
429            self.buf.clear();
430            return None;
431        }
432        let packet = unsafe { NetPacket::new_unchecked(self.buf.as_ref()) };
433        let data_length = packet.data_length() as usize;
434        if data_length > src.len() {
435            return Some(Err(io::Error::other("too short")));
436        }
437        if data_length > len {
438            src[..len].copy_from_slice(self.buf.as_ref());
439            *offset += len;
440            self.buf.clear();
441            None
442        } else {
443            src[..data_length].copy_from_slice(&self.buf[..data_length]);
444            if data_length == len {
445                self.buf.clear();
446            } else {
447                self.buf.advance(data_length);
448            }
449            Some(Ok(data_length))
450        }
451    }
452    fn process_packet(&mut self, src: &mut [u8], offset: usize) -> Option<io::Result<usize>> {
453        if offset < HEAD_LEN {
454            return None;
455        }
456        let packet = unsafe { NetPacket::new_unchecked(&src) };
457        let data_length = packet.data_length() as usize;
458        if data_length > src.len() {
459            return Some(Err(io::Error::other("too short")));
460        }
461        match data_length.cmp(&offset) {
462            std::cmp::Ordering::Less => {
463                self.buf.extend_from_slice(&src[data_length..offset]);
464                Some(Ok(data_length))
465            }
466            std::cmp::Ordering::Equal => Some(Ok(data_length)),
467            std::cmp::Ordering::Greater => None,
468        }
469    }
470}
471#[async_trait]
472impl Encoder for LengthPrefixedEncoder {
473    async fn encode(&mut self, write: &mut OwnedWriteHalf, data: &[u8]) -> io::Result<()> {
474        let len = data.len();
475        let packet = unsafe { NetPacket::new_unchecked(data) };
476        if packet.data_length() as usize != len {
477            return Err(io::Error::from(io::ErrorKind::InvalidData));
478        }
479        write.write_all(data).await
480    }
481
482    async fn encode_multiple(
483        &mut self,
484        write: &mut OwnedWriteHalf,
485        bufs: &[IoSlice<'_>],
486    ) -> io::Result<()> {
487        let mut index = 0;
488        let mut total_written = 0;
489        let total: usize = bufs.iter().map(|v| v.len()).sum();
490        loop {
491            if index == bufs.len() - 1 {
492                write.write_all(&bufs[index]).await?;
493                return Ok(());
494            }
495            let len = write.write_vectored(&bufs[index..]).await?;
496            if len == 0 {
497                return Err(io::Error::from(io::ErrorKind::WriteZero));
498            }
499            total_written += len;
500            if total_written == total {
501                return Ok(());
502            }
503            let mut written = len;
504            for buf in &bufs[index..] {
505                if buf.len() > written {
506                    if written != 0 {
507                        index += 1;
508                        total_written += buf.len() - written;
509                        write.write_all(&buf[written..]).await?;
510                        if index == bufs.len() {
511                            return Ok(());
512                        }
513                    }
514                    break;
515                } else {
516                    index += 1;
517                    written -= buf.len();
518                }
519            }
520        }
521    }
522}
523
/// Factory that produces a fresh length-prefixed decoder/encoder pair;
/// installed as `init_codec` of the core TCP tunnel configuration.
#[derive(Clone)]
pub(crate) struct LengthPrefixedInitCodec;
526
527impl InitCodec for LengthPrefixedInitCodec {
528    fn codec(&self, _addr: SocketAddr) -> io::Result<(Box<dyn Decoder>, Box<dyn Encoder>)> {
529        Ok((
530            Box::new(LengthPrefixedDecoder::new()),
531            Box::new(LengthPrefixedEncoder::new()),
532        ))
533    }
534}
/// Hook that is given mutable access to received data.
#[async_trait]
pub trait DataInterceptor: Send + Sync {
    /// Inspects (and may modify) a received result.
    // NOTE(review): the meaning of the returned flag is not visible in this
    // file; presumably `true` means the data was intercepted and should not be
    // processed further — confirm against the caller.
    async fn pre_handle(&self, data: &mut RecvResult) -> bool;
}
/// Interceptor that leaves the data untouched and always returns `false`.
#[derive(Clone)]
pub struct DefaultInterceptor;
541
#[async_trait]
impl DataInterceptor for DefaultInterceptor {
    /// Ignores the data and always returns `false`.
    async fn pre_handle(&self, _data: &mut RecvResult) -> bool {
        false
    }
}
547}