rustp2p_reliable/lib.rs
1//! # rustp2p-reliable - Reliable Transport over UDP
2//!
3//! `rustp2p-reliable` provides reliable, ordered transport over UDP using the KCP protocol.
4//! It combines the low latency of UDP with the reliability of TCP, making it ideal for
5//! real-time applications that require guaranteed delivery.
6//!
7//! ## Features
8//!
9//! - **KCP Protocol**: Fast and reliable UDP transport
10//! - **TCP Fallback**: Automatic fallback to TCP when needed
11//! - **NAT Traversal**: Built-in hole punching support
12//! - **Automatic Maintenance**: Connection health monitoring and recovery
13//! - **Unified Interface**: Same API for both KCP and TCP tunnels
14//!
15//! ## Quick Start
16//!
17//! ### Creating a Reliable Tunnel Listener
18//!
19//! ```rust,no_run
20//! use rustp2p_reliable::{from_config, Config};
21//!
22//! # #[tokio::main]
23//! # async fn main() -> std::io::Result<()> {
24//! let config = Config::default();
25//! let (mut listener, puncher) = from_config(config).await?;
26//!
27//! // Accept incoming connections
28//! while let Ok(tunnel) = listener.accept().await {
29//! tokio::spawn(async move {
30//! // Handle the reliable tunnel
31//! while let Ok(data) = tunnel.next().await {
32//! println!("Received: {} bytes", data.len());
33//! }
34//! });
35//! }
36//! # Ok(())
37//! # }
38//! ```
39//!
40//! ## Basic Usage
41//!
42//! ### Sending Data
43//!
44//! ```rust,no_run
45//! use rustp2p_reliable::ReliableTunnel;
46//! use bytes::BytesMut;
47//!
48//! # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
49//! let data = BytesMut::from(&b"Hello, world!"[..]);
50//! tunnel.send(data).await?;
51//! # Ok(())
52//! # }
53//! ```
54//!
55//! ### Receiving Data
56//!
57//! ```rust,no_run
58//! use rustp2p_reliable::ReliableTunnel;
59//!
60//! # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
61//! loop {
62//! match tunnel.next().await {
63//! Ok(data) => {
64//! println!("Received {} bytes", data.len());
65//! }
66//! Err(e) => {
67//! eprintln!("Error: {}", e);
68//! break;
69//! }
70//! }
71//! }
72//! # Ok(())
73//! # }
74//! ```
75//!
76//! ## NAT Traversal with Puncher
77//!
78//! Use the `Puncher` to establish connections through NATs:
79//!
80//! ```rust,no_run
81//! use rustp2p_reliable::{Puncher, PunchInfo};
82//!
83//! # async fn example(puncher: Puncher) -> std::io::Result<()> {
84//! // Get your NAT information
85//! let nat_info = puncher.nat_info();
86//! println!("NAT Type: {:?}", nat_info.nat_type);
87//! println!("Public IPs: {:?}", nat_info.public_ips);
88//!
89//! // Punch through NAT to reach a peer
90//! let punch_info = PunchInfo::default();
91//! puncher.punch(punch_info).await?;
92//! # Ok(())
93//! # }
94//! ```
95//!
96//! ## KCP vs TCP
97//!
98//! The library automatically selects between KCP and TCP based on network conditions:
99//!
100//! - **KCP**: Used for direct UDP connections (low latency, good for real-time data)
101//! - **TCP**: Used when UDP is blocked or unreliable (guaranteed delivery)
102//!
103//! You can check the tunnel type:
104//!
105//! ```rust,no_run
106//! use rustp2p_reliable::{ReliableTunnel, ReliableTunnelType};
107//!
108//! # fn example(tunnel: ReliableTunnel) {
109//! match tunnel.tunnel_type() {
110//! ReliableTunnelType::Kcp => println!("Using KCP (fast UDP)"),
111//! ReliableTunnelType::Tcp => println!("Using TCP (reliable)"),
112//! }
113//! # }
114//! ```
115//!
116//! ## Configuration
117//!
118//! Configure the reliable transport layer:
119//!
120//! ```rust,no_run
121//! use rustp2p_reliable::Config;
122//! use rust_p2p_core::tunnel::TunnelConfig;
123//!
124//! # fn example() {
125//! let config = Config {
126//! tunnel_config: TunnelConfig::default(),
127//! tcp_stun_servers: vec![
128//! "stun.l.google.com:19302".to_string(),
129//! ],
130//! udp_stun_servers: vec![
131//! "stun1.l.google.com:19302".to_string(),
132//! ],
133//! };
134//! # }
135//! ```
136//!
137//! ## Advanced Features
138//!
139//! ### Custom KCP Conversation IDs
140//!
141//! For applications that need multiple independent KCP streams:
142//!
143//! ```rust,no_run
144//! use rustp2p_reliable::Puncher;
145//!
146//! # async fn example(puncher: Puncher) -> std::io::Result<()> {
147//! let kcp_conv = 12345;
148//! let punch_info = Default::default();
149//! puncher.punch_conv(kcp_conv, punch_info).await?;
150//! # Ok(())
151//! # }
152//! ```
153//!
154//! ### Raw Data Sending
155//!
156//! Send raw UDP data without KCP protocol overhead:
157//!
158//! ```rust,no_run
159//! use rustp2p_reliable::ReliableTunnel;
160//! use bytes::BytesMut;
161//!
162//! # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
163//! let data = BytesMut::from(&b"raw data"[..]);
164//! tunnel.send_raw(data).await?;
165//! # Ok(())
166//! # }
167//! ```
168//!
169//! ## Connection Information
170//!
171//! Get information about the connection:
172//!
173//! ```rust,no_run
174//! use rustp2p_reliable::ReliableTunnel;
175//!
176//! # fn example(tunnel: ReliableTunnel) {
177//! println!("Local address: {}", tunnel.local_addr());
178//! println!("Remote address: {}", tunnel.remote_addr());
179//! println!("Tunnel type: {:?}", tunnel.tunnel_type());
180//! # }
181//! ```
182//!
183//! ## Thread Safety
184//!
185//! All types are designed to work with Tokio's async runtime and can be safely
186//! shared across tasks using `Arc` when needed.
187//!
188//! ## See Also
189//!
190//! - [`rustp2p`](../rustp2p/index.html) - High-level P2P library
191//! - [`rustp2p-core`](../rust_p2p_core/index.html) - Core NAT traversal functionality
192
193pub use crate::config::Config;
194use crate::kcp::{DataType, KcpHandle};
195use crate::maintain::start_task;
196use async_shutdown::ShutdownManager;
197use bytes::BytesMut;
198use flume::Receiver;
199use parking_lot::Mutex;
200use rand::seq::SliceRandom;
201pub use rust_p2p_core::nat::NatInfo;
202use rust_p2p_core::nat::NatType;
203pub use rust_p2p_core::punch::config::*;
204use rust_p2p_core::punch::Puncher as CorePuncher;
205use rust_p2p_core::route::Index;
206use rust_p2p_core::socket::LocalInterface;
207pub use rust_p2p_core::tunnel::config::*;
208pub use rust_p2p_core::tunnel::tcp::{
209 BytesCodec, BytesInitCodec, Decoder, Encoder, InitCodec, LengthPrefixedCodec,
210 LengthPrefixedInitCodec,
211};
212use rust_p2p_core::tunnel::tcp::{TcpTunnel, WeakTcpTunnelSender};
213use rust_p2p_core::tunnel::udp::{UDPIndex, UdpTunnel};
214use rust_p2p_core::tunnel::{SocketManager, Tunnel, TunnelDispatcher};
215use std::io;
216use std::net::{Ipv4Addr, SocketAddr};
217use std::sync::Arc;
218use tokio::sync::mpsc::Sender;
219
220mod config;
221mod kcp;
222mod maintain;
223
224/// Creates a reliable tunnel system from configuration.
225///
226/// This is the main entry point for creating a reliable transport layer. It returns
227/// a listener for accepting incoming connections and a puncher for NAT traversal.
228///
229/// # Arguments
230///
231/// * `config` - Configuration for tunnels and STUN servers
232///
233/// # Returns
234///
235/// Returns a tuple containing:
236/// - `ReliableTunnelListener`: For accepting incoming connections
237/// - `Puncher`: For NAT traversal and connection establishment
238///
239/// # Examples
240///
241/// ```rust,no_run
242/// use rustp2p_reliable::{from_config, Config};
243///
244/// # #[tokio::main]
245/// # async fn main() -> std::io::Result<()> {
246/// let config = Config::default();
247/// let (listener, puncher) = from_config(config).await?;
248/// # Ok(())
249/// # }
250/// ```
251pub async fn from_config(config: Config) -> io::Result<(ReliableTunnelListener, Puncher)> {
252 let tunnel_config = config.tunnel_config;
253 let tcp_stun_servers = config.tcp_stun_servers;
254 let udp_stun_servers = config.udp_stun_servers;
255 let default_interface = tunnel_config
256 .udp_tunnel_config
257 .as_ref()
258 .map(|v| v.default_interface.clone())
259 .unwrap_or_default();
260
261 let (unified_tunnel_factory, puncher) =
262 rust_p2p_core::tunnel::new_tunnel_component(tunnel_config)?;
263 let manager = unified_tunnel_factory.socket_manager();
264 let shutdown_manager = ShutdownManager::<()>::new();
265 let puncher = Puncher::new(
266 default_interface,
267 tcp_stun_servers,
268 udp_stun_servers,
269 puncher,
270 manager,
271 )
272 .await?;
273 let listener = ReliableTunnelListener::new(
274 shutdown_manager.clone(),
275 unified_tunnel_factory,
276 puncher.punch_context.clone(),
277 );
278 start_task(shutdown_manager, puncher.clone());
279 Ok((listener, puncher))
280}
281/// Listener for accepting reliable tunnel connections.
282///
283/// The listener accepts both KCP (UDP-based) and TCP connections,
284/// providing a unified interface for handling incoming connections.
285///
286/// # Examples
287///
288/// ```rust,no_run
289/// # use rustp2p_reliable::{from_config, Config, ReliableTunnelListener};
290/// # #[tokio::main]
291/// # async fn main() -> std::io::Result<()> {
292/// # let (mut listener, puncher) = from_config(Config::default()).await?;
293/// while let Ok(tunnel) = listener.accept().await {
294/// tokio::spawn(async move {
295/// // Handle the connection
296/// });
297/// }
298/// # Ok(())
299/// # }
300/// ```
301pub struct ReliableTunnelListener {
302 shutdown_manager: ShutdownManager<()>,
303 punch_context: Arc<PunchContext>,
304 unified_tunnel_factory: TunnelDispatcher,
305 kcp_receiver: tokio::sync::mpsc::Receiver<KcpMessageHub>,
306 kcp_sender: Sender<KcpMessageHub>,
307}
308/// NAT traversal puncher for establishing direct connections.
309///
310/// The `Puncher` handles NAT type detection and hole punching to establish
311/// direct peer-to-peer connections.
312///
313/// # Examples
314///
315/// ```rust,no_run
316/// # use rustp2p_reliable::Puncher;
317/// # async fn example(puncher: Puncher) -> std::io::Result<()> {
318/// // Get NAT information
319/// let nat_info = puncher.nat_info();
320/// println!("NAT Type: {:?}", nat_info.nat_type);
321/// # Ok(())
322/// # }
323/// ```
324#[derive(Clone)]
325pub struct Puncher {
326 punch_context: Arc<PunchContext>,
327 puncher: CorePuncher,
328 socket_manager: SocketManager,
329}
330impl Drop for ReliableTunnelListener {
331 fn drop(&mut self) {
332 _ = self.shutdown_manager.trigger_shutdown(());
333 }
334}
335pub(crate) struct PunchContext {
336 default_interface: Option<LocalInterface>,
337 tcp_stun_servers: Vec<String>,
338 udp_stun_servers: Vec<String>,
339 nat_info: Arc<Mutex<NatInfo>>,
340}
341impl PunchContext {
342 pub fn new(
343 default_interface: Option<LocalInterface>,
344 tcp_stun_servers: Vec<String>,
345 udp_stun_servers: Vec<String>,
346 local_udp_ports: Vec<u16>,
347 local_tcp_port: u16,
348 ) -> Self {
349 let public_udp_ports = vec![0; local_udp_ports.len()];
350 let nat_info = NatInfo {
351 nat_type: Default::default(),
352 public_ips: vec![],
353 public_udp_ports,
354 mapping_tcp_addr: vec![],
355 mapping_udp_addr: vec![],
356 public_port_range: 0,
357 local_ipv4: Ipv4Addr::UNSPECIFIED,
358 local_ipv4s: vec![],
359 ipv6: None,
360 local_udp_ports,
361 local_tcp_port,
362 public_tcp_port: 0,
363 };
364 Self {
365 default_interface,
366 tcp_stun_servers,
367 udp_stun_servers,
368 nat_info: Arc::new(Mutex::new(nat_info)),
369 }
370 }
371 pub fn set_public_info(
372 &self,
373 nat_type: NatType,
374 mut ips: Vec<Ipv4Addr>,
375 public_port_range: u16,
376 ) {
377 ips.retain(rust_p2p_core::extend::addr::is_ipv4_global);
378 let mut guard = self.nat_info.lock();
379 guard.public_ips = ips;
380 guard.nat_type = nat_type;
381 guard.public_port_range = public_port_range;
382 }
383 fn mapping_addr(addr: SocketAddr) -> Option<(Ipv4Addr, u16)> {
384 match addr {
385 SocketAddr::V4(addr) => Some((*addr.ip(), addr.port())),
386 SocketAddr::V6(addr) => addr.ip().to_ipv4_mapped().map(|ip| (ip, addr.port())),
387 }
388 }
389 pub fn update_tcp_public_addr(&self, addr: SocketAddr) {
390 let (ip, port) = if let Some(r) = Self::mapping_addr(addr) {
391 r
392 } else {
393 return;
394 };
395 let mut nat_info = self.nat_info.lock();
396 if rust_p2p_core::extend::addr::is_ipv4_global(&ip) && !nat_info.public_ips.contains(&ip) {
397 nat_info.public_ips.push(ip);
398 }
399 nat_info.public_tcp_port = port;
400 }
401 pub fn update_public_addr(&self, index: Index, addr: SocketAddr) {
402 let (ip, port) = if let Some(r) = Self::mapping_addr(addr) {
403 r
404 } else {
405 return;
406 };
407 let mut nat_info = self.nat_info.lock();
408
409 if rust_p2p_core::extend::addr::is_ipv4_global(&ip) {
410 if !nat_info.public_ips.contains(&ip) {
411 nat_info.public_ips.push(ip);
412 }
413 match index {
414 Index::Udp(index) => {
415 let index = match index {
416 UDPIndex::MainV4(index) => index,
417 UDPIndex::MainV6(index) => index,
418 UDPIndex::SubV4(_) => return,
419 };
420 if let Some(p) = nat_info.public_udp_ports.get_mut(index) {
421 *p = port;
422 }
423 }
424 Index::Tcp(_) => {
425 nat_info.public_tcp_port = port;
426 }
427 _ => {}
428 }
429 } else {
430 log::debug!("not public addr: {addr:?}")
431 }
432 }
433 pub async fn update_local_addr(&self) {
434 let local_ipv4 = rust_p2p_core::extend::addr::local_ipv4().await;
435 let local_ipv6 = rust_p2p_core::extend::addr::local_ipv6().await;
436 let mut nat_info = self.nat_info.lock();
437 if let Ok(local_ipv4) = local_ipv4 {
438 nat_info.local_ipv4 = local_ipv4;
439 }
440 nat_info.ipv6 = local_ipv6.ok();
441 }
442 pub async fn update_nat_info(&self) -> io::Result<NatInfo> {
443 self.update_local_addr().await;
444 let mut udp_stun_servers = self.udp_stun_servers.clone();
445 udp_stun_servers.shuffle(&mut rand::rng());
446 let udp_stun_servers = if udp_stun_servers.len() > 3 {
447 &udp_stun_servers[..3]
448 } else {
449 &udp_stun_servers
450 };
451 let (nat_type, ips, port_range) = rust_p2p_core::stun::stun_test_nat(
452 udp_stun_servers.to_vec(),
453 self.default_interface.as_ref(),
454 )
455 .await?;
456 self.set_public_info(nat_type, ips, port_range);
457 Ok(self.nat_info())
458 }
459 pub fn nat_info(&self) -> NatInfo {
460 self.nat_info.lock().clone()
461 }
462}
463impl Puncher {
464 async fn new(
465 default_interface: Option<LocalInterface>,
466 tcp_stun_servers: Vec<String>,
467 udp_stun_servers: Vec<String>,
468 puncher: CorePuncher,
469 socket_manager: SocketManager,
470 ) -> io::Result<Self> {
471 let local_tcp_port = if let Some(v) = socket_manager.tcp_socket_manager_as_ref() {
472 v.local_addr().port()
473 } else {
474 0
475 };
476 let local_udp_ports = if let Some(v) = socket_manager.udp_socket_manager_as_ref() {
477 v.local_ports()?
478 } else {
479 vec![]
480 };
481 let punch_context = Arc::new(PunchContext::new(
482 default_interface,
483 tcp_stun_servers,
484 udp_stun_servers,
485 local_udp_ports,
486 local_tcp_port,
487 ));
488 punch_context.update_local_addr().await;
489 Ok(Self {
490 punch_context,
491 puncher,
492 socket_manager,
493 })
494 }
495
496 /// Attempts to punch through NAT to reach a peer.
497 ///
498 /// This method performs UDP hole punching to establish a direct connection
499 /// with a peer behind NAT.
500 ///
501 /// # Arguments
502 ///
503 /// * `punch_info` - Information about the target peer for punching
504 ///
505 /// # Examples
506 ///
507 /// ```rust,no_run
508 /// # use rustp2p_reliable::{Puncher, PunchInfo};
509 /// # async fn example(puncher: Puncher) -> std::io::Result<()> {
510 /// let punch_info = PunchInfo::default();
511 /// puncher.punch(punch_info).await?;
512 /// # Ok(())
513 /// # }
514 /// ```
515 pub async fn punch(&self, punch_info: PunchInfo) -> io::Result<()> {
516 self.punch_conv(0, punch_info).await
517 }
518
519 /// Attempts to punch through NAT with a custom KCP conversation ID.
520 ///
521 /// This allows you to establish multiple independent KCP connections with
522 /// different conversation IDs.
523 ///
524 /// # Arguments
525 ///
526 /// * `kcp_conv` - The KCP conversation ID to use
527 /// * `punch_info` - Information about the target peer
528 ///
529 /// # Examples
530 ///
531 /// ```rust,no_run
532 /// # use rustp2p_reliable::{Puncher, PunchInfo};
533 /// # async fn example(puncher: Puncher) -> std::io::Result<()> {
534 /// let kcp_conv = 12345;
535 /// let punch_info = PunchInfo::default();
536 /// puncher.punch_conv(kcp_conv, punch_info).await?;
537 /// # Ok(())
538 /// # }
539 /// ```
540 pub async fn punch_conv(&self, kcp_conv: u32, punch_info: PunchInfo) -> io::Result<()> {
541 let mut punch_udp_buf = [0; 8];
542 punch_udp_buf[..4].copy_from_slice(&kcp_conv.to_le_bytes());
543 // kcp flag
544 punch_udp_buf[0] = 0x02;
545 if rust_p2p_core::stun::is_stun_response(&punch_udp_buf) {
546 return Err(io::Error::other("kcp_conv error"));
547 }
548 if !self.puncher.need_punch(&punch_info) {
549 return Ok(());
550 }
551 self.puncher
552 .punch_now(None, &punch_udp_buf, punch_info)
553 .await
554 }
555 pub fn nat_info(&self) -> NatInfo {
556 self.punch_context.nat_info()
557 }
558}
559
560/// A reliable tunnel connection (either KCP or TCP).
561///
562/// `ReliableTunnel` provides a unified interface for both KCP and TCP connections,
563/// offering reliable, ordered data transfer.
564///
565/// # Examples
566///
567/// ```rust,no_run
568/// use rustp2p_reliable::ReliableTunnel;
569/// use bytes::BytesMut;
570///
571/// # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
572/// // Send data
573/// tunnel.send(BytesMut::from(&b"hello"[..])).await?;
574///
575/// // Receive data
576/// let data = tunnel.next().await?;
577/// # Ok(())
578/// # }
579/// ```
580pub enum ReliableTunnel {
581 Tcp(TcpMessageHub),
582 Kcp(KcpMessageHub),
583}
584/// The type of reliable tunnel (KCP or TCP).
585///
586/// # Examples
587///
588/// ```rust
589/// use rustp2p_reliable::ReliableTunnelType;
590///
591/// let tunnel_type = ReliableTunnelType::Kcp;
592/// assert_eq!(tunnel_type, ReliableTunnelType::Kcp);
593/// ```
594#[derive(Copy, Clone, Eq, PartialEq, Debug)]
595pub enum ReliableTunnelType {
596 Tcp,
597 Kcp,
598}
599
600impl ReliableTunnelListener {
601 fn new(
602 shutdown_manager: ShutdownManager<()>,
603 unified_tunnel_factory: TunnelDispatcher,
604 punch_context: Arc<PunchContext>,
605 ) -> Self {
606 let (kcp_sender, kcp_receiver) = tokio::sync::mpsc::channel(64);
607 Self {
608 shutdown_manager,
609 punch_context,
610 unified_tunnel_factory,
611 kcp_receiver,
612 kcp_sender,
613 }
614 }
615 /// Accepts an incoming reliable tunnel connection.
616 ///
617 /// This method blocks until a connection is available. The connection
618 /// can be either KCP (UDP-based) or TCP.
619 ///
620 /// # Returns
621 ///
622 /// Returns a `ReliableTunnel` that can be used for bidirectional communication.
623 ///
624 /// # Examples
625 ///
626 /// ```rust,no_run
627 /// # use rustp2p_reliable::{from_config, Config};
628 /// # #[tokio::main]
629 /// # async fn main() -> std::io::Result<()> {
630 /// # let (mut listener, puncher) = from_config(Config::default()).await?;
631 /// loop {
632 /// match listener.accept().await {
633 /// Ok(tunnel) => {
634 /// tokio::spawn(async move {
635 /// // Handle the tunnel
636 /// });
637 /// }
638 /// Err(e) => {
639 /// eprintln!("Accept error: {}", e);
640 /// break;
641 /// }
642 /// }
643 /// }
644 /// # Ok(())
645 /// # }
646 /// ```
647 pub async fn accept(&mut self) -> io::Result<ReliableTunnel> {
648 loop {
649 tokio::select! {
650 rs=self.unified_tunnel_factory.dispatch()=>{
651 let unified_tunnel = rs?;
652 match unified_tunnel {
653 Tunnel::Udp(udp) => {
654 handle_udp(self.shutdown_manager.clone(), udp, self.kcp_sender.clone(), self.punch_context.clone())?;
655 }
656 Tunnel::Tcp(tcp) => {
657 let local_addr = tcp.local_addr();
658 let remote_addr = tcp.route_key().addr();
659 let sender = tcp.sender()?;
660 let receiver = handle_tcp(self.shutdown_manager.clone(),tcp).await?;
661 let hub = TcpMessageHub::new(local_addr,remote_addr,sender,receiver);
662 return Ok(ReliableTunnel::Tcp(hub))
663 }
664 }
665 }
666 rs=self.kcp_receiver.recv()=>{
667 return if let Some(hub) = rs{
668 Ok(ReliableTunnel::Kcp(hub))
669 }else{
670 Err(io::Error::from(io::ErrorKind::UnexpectedEof))
671 }
672 }
673 }
674 }
675 }
676}
677
678impl ReliableTunnel {
679 /// Sends data through the reliable tunnel.
680 ///
681 /// The data will be sent using either KCP or TCP, depending on the tunnel type.
682 ///
683 /// # Arguments
684 ///
685 /// * `buf` - The data to send
686 ///
687 /// # Examples
688 ///
689 /// ```rust,no_run
690 /// use rustp2p_reliable::ReliableTunnel;
691 /// use bytes::BytesMut;
692 ///
693 /// # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
694 /// let data = BytesMut::from(&b"Hello, world!"[..]);
695 /// tunnel.send(data).await?;
696 /// # Ok(())
697 /// # }
698 /// ```
699 pub async fn send(&self, buf: BytesMut) -> io::Result<()> {
700 match &self {
701 ReliableTunnel::Tcp(tcp) => tcp.send(buf).await,
702 ReliableTunnel::Kcp(kcp) => kcp.send(buf).await,
703 }
704 }
705 /// Sends raw data without KCP protocol overhead.
706 ///
707 /// For TCP tunnels, this is the same as `send()`. For KCP tunnels, this sends
708 /// raw UDP data without the KCP protocol wrapper.
709 ///
710 /// # Arguments
711 ///
712 /// * `buf` - The raw data to send
713 ///
714 /// # Examples
715 ///
716 /// ```rust,no_run
717 /// use rustp2p_reliable::ReliableTunnel;
718 /// use bytes::BytesMut;
719 ///
720 /// # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
721 /// let data = BytesMut::from(&b"raw data"[..]);
722 /// tunnel.send_raw(data).await?;
723 /// # Ok(())
724 /// # }
725 /// ```
726 pub async fn send_raw(&self, buf: BytesMut) -> io::Result<()> {
727 match &self {
728 ReliableTunnel::Tcp(tcp) => tcp.send(buf).await,
729 ReliableTunnel::Kcp(kcp) => kcp.send_raw(buf).await,
730 }
731 }
732 /// Receives the next message from the tunnel.
733 ///
734 /// This method blocks until data is available or an error occurs.
735 ///
736 /// # Returns
737 ///
738 /// Returns the received data as `BytesMut`.
739 ///
740 /// # Examples
741 ///
742 /// ```rust,no_run
743 /// use rustp2p_reliable::ReliableTunnel;
744 ///
745 /// # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
746 /// loop {
747 /// let data = tunnel.next().await?;
748 /// println!("Received {} bytes", data.len());
749 /// }
750 /// # }
751 /// ```
752 pub async fn next(&self) -> io::Result<BytesMut> {
753 match &self {
754 ReliableTunnel::Tcp(tcp) => tcp.next().await,
755 ReliableTunnel::Kcp(kcp) => kcp.next().await,
756 }
757 }
758 /// Returns the local socket address of this tunnel.
759 ///
760 /// # Examples
761 ///
762 /// ```rust,no_run
763 /// # use rustp2p_reliable::ReliableTunnel;
764 /// # fn example(tunnel: ReliableTunnel) {
765 /// println!("Local address: {}", tunnel.local_addr());
766 /// # }
767 /// ```
768 pub fn local_addr(&self) -> SocketAddr {
769 match &self {
770 ReliableTunnel::Tcp(tcp) => tcp.local_addr,
771 ReliableTunnel::Kcp(kcp) => kcp.local_addr,
772 }
773 }
774 /// Returns the remote socket address of this tunnel.
775 ///
776 /// # Examples
777 ///
778 /// ```rust,no_run
779 /// # use rustp2p_reliable::ReliableTunnel;
780 /// # fn example(tunnel: ReliableTunnel) {
781 /// println!("Remote address: {}", tunnel.remote_addr());
782 /// # }
783 /// ```
784 pub fn remote_addr(&self) -> SocketAddr {
785 match &self {
786 ReliableTunnel::Tcp(tcp) => tcp.remote_addr,
787 ReliableTunnel::Kcp(kcp) => kcp.remote_addr,
788 }
789 }
790 /// Returns the type of this tunnel (KCP or TCP).
791 ///
792 /// # Examples
793 ///
794 /// ```rust,no_run
795 /// # use rustp2p_reliable::{ReliableTunnel, ReliableTunnelType};
796 /// # fn example(tunnel: ReliableTunnel) {
797 /// match tunnel.tunnel_type() {
798 /// ReliableTunnelType::Kcp => println!("Using KCP"),
799 /// ReliableTunnelType::Tcp => println!("Using TCP"),
800 /// }
801 /// # }
802 /// ```
803 pub fn tunnel_type(&self) -> ReliableTunnelType {
804 match &self {
805 ReliableTunnel::Tcp(_tcp) => ReliableTunnelType::Tcp,
806 ReliableTunnel::Kcp(_kcp) => ReliableTunnelType::Kcp,
807 }
808 }
809}
810pub struct TcpMessageHub {
811 local_addr: SocketAddr,
812 remote_addr: SocketAddr,
813 input: WeakTcpTunnelSender,
814 output: Receiver<BytesMut>,
815}
816impl TcpMessageHub {
817 pub(crate) fn new(
818 local_addr: SocketAddr,
819 remote_addr: SocketAddr,
820 input: WeakTcpTunnelSender,
821 output: Receiver<BytesMut>,
822 ) -> Self {
823 Self {
824 local_addr,
825 remote_addr,
826 input,
827 output,
828 }
829 }
830 pub async fn send(&self, buf: BytesMut) -> io::Result<()> {
831 self.input.send(buf.into()).await
832 }
833 pub async fn next(&self) -> io::Result<BytesMut> {
834 self.output
835 .recv_async()
836 .await
837 .map_err(|_| io::Error::from(io::ErrorKind::UnexpectedEof))
838 }
839}
840pub struct KcpMessageHub {
841 local_addr: SocketAddr,
842 remote_addr: SocketAddr,
843 input: Sender<DataType>,
844 output: Receiver<BytesMut>,
845}
846
847impl KcpMessageHub {
848 pub(crate) fn new(
849 local_addr: SocketAddr,
850 remote_addr: SocketAddr,
851 input: Sender<DataType>,
852 output: Receiver<BytesMut>,
853 ) -> Self {
854 Self {
855 local_addr,
856 remote_addr,
857 input,
858 output,
859 }
860 }
861 pub async fn send(&self, buf: BytesMut) -> io::Result<()> {
862 self.input
863 .send(DataType::Kcp(buf))
864 .await
865 .map_err(|_| io::Error::from(io::ErrorKind::WriteZero))
866 }
867 pub async fn send_raw(&self, buf: BytesMut) -> io::Result<()> {
868 self.input
869 .send(DataType::Raw(buf))
870 .await
871 .map_err(|_| io::Error::from(io::ErrorKind::WriteZero))
872 }
873 pub async fn next(&self) -> io::Result<BytesMut> {
874 self.output
875 .recv_async()
876 .await
877 .map_err(|_| io::Error::from(io::ErrorKind::UnexpectedEof))
878 }
879}
880async fn handle_tcp(
881 shutdown_manager: ShutdownManager<()>,
882 mut tcp_tunnel: TcpTunnel,
883) -> io::Result<Receiver<BytesMut>> {
884 let (sender, receiver) = flume::bounded(128);
885 tokio::spawn(async move {
886 let mut buf = [0; 65536];
887 while let Ok(Ok(len)) = shutdown_manager
888 .wrap_cancel(tcp_tunnel.recv(&mut buf))
889 .await
890 {
891 if sender.send_async(buf[..len].into()).await.is_err() {
892 break;
893 }
894 }
895 });
896 Ok(receiver)
897}
898
899fn handle_udp(
900 shutdown_manager: ShutdownManager<()>,
901 mut udp_tunnel: UdpTunnel,
902 sender: Sender<KcpMessageHub>,
903 punch_context: Arc<PunchContext>,
904) -> io::Result<()> {
905 let mut kcp_handle = KcpHandle::new(udp_tunnel.local_addr(), udp_tunnel.sender()?, sender);
906 tokio::spawn(async move {
907 let mut buf = [0; 65536];
908
909 while let Ok(Some(rs)) = shutdown_manager
910 .wrap_cancel(udp_tunnel.recv_from(&mut buf))
911 .await
912 {
913 let (len, route_key) = match rs {
914 Ok(rs) => rs,
915 Err(e) => {
916 log::warn!("udp_tunnel.recv_from {e:?}");
917 continue;
918 }
919 };
920 // check stun data
921 if rust_p2p_core::stun::is_stun_response(&buf[..len]) {
922 if let Some(pub_addr) = rust_p2p_core::stun::recv_stun_response(&buf[..len]) {
923 punch_context.update_public_addr(route_key.index(), pub_addr);
924 continue;
925 }
926 }
927 kcp_handle.handle(&buf[..len], route_key).await;
928 }
929 });
930 Ok(())
931}