Skip to main content

rustp2p_reliable/
lib.rs

1//! # rustp2p-reliable - Reliable Transport over UDP
2//!
3//! `rustp2p-reliable` provides reliable, ordered transport over UDP using the KCP protocol.
4//! It combines the low latency of UDP with the reliability of TCP, making it ideal for
5//! real-time applications that require guaranteed delivery.
6//!
7//! ## Features
8//!
9//! - **KCP Protocol**: Fast and reliable UDP transport
10//! - **TCP Fallback**: Automatic fallback to TCP when needed
11//! - **NAT Traversal**: Built-in hole punching support
12//! - **Automatic Maintenance**: Connection health monitoring and recovery
13//! - **Unified Interface**: Same API for both KCP and TCP tunnels
14//!
15//! ## Quick Start
16//!
17//! ### Creating a Reliable Tunnel Listener
18//!
19//! ```rust,no_run
20//! use rustp2p_reliable::{from_config, Config};
21//!
22//! # #[tokio::main]
23//! # async fn main() -> std::io::Result<()> {
24//! let config = Config::default();
25//! let (mut listener, puncher) = from_config(config).await?;
26//!
27//! // Accept incoming connections
28//! while let Ok(tunnel) = listener.accept().await {
29//!     tokio::spawn(async move {
30//!         // Handle the reliable tunnel
31//!         while let Ok(data) = tunnel.next().await {
32//!             println!("Received: {} bytes", data.len());
33//!         }
34//!     });
35//! }
36//! # Ok(())
37//! # }
38//! ```
39//!
40//! ## Basic Usage
41//!
42//! ### Sending Data
43//!
44//! ```rust,no_run
45//! use rustp2p_reliable::ReliableTunnel;
46//! use bytes::BytesMut;
47//!
48//! # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
49//! let data = BytesMut::from(&b"Hello, world!"[..]);
50//! tunnel.send(data).await?;
51//! # Ok(())
52//! # }
53//! ```
54//!
55//! ### Receiving Data
56//!
57//! ```rust,no_run
58//! use rustp2p_reliable::ReliableTunnel;
59//!
60//! # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
61//! loop {
62//!     match tunnel.next().await {
63//!         Ok(data) => {
64//!             println!("Received {} bytes", data.len());
65//!         }
66//!         Err(e) => {
67//!             eprintln!("Error: {}", e);
68//!             break;
69//!         }
70//!     }
71//! }
72//! # Ok(())
73//! # }
74//! ```
75//!
76//! ## NAT Traversal with Puncher
77//!
78//! Use the `Puncher` to establish connections through NATs:
79//!
80//! ```rust,no_run
81//! use rustp2p_reliable::{Puncher, PunchInfo};
82//!
83//! # async fn example(puncher: Puncher) -> std::io::Result<()> {
84//! // Get your NAT information
85//! let nat_info = puncher.nat_info();
86//! println!("NAT Type: {:?}", nat_info.nat_type);
87//! println!("Public IPs: {:?}", nat_info.public_ips);
88//!
89//! // Punch through NAT to reach a peer
90//! let punch_info = PunchInfo::default();
91//! puncher.punch(punch_info).await?;
92//! # Ok(())
93//! # }
94//! ```
95//!
96//! ## KCP vs TCP
97//!
98//! The library automatically selects between KCP and TCP based on network conditions:
99//!
100//! - **KCP**: Used for direct UDP connections (low latency, good for real-time data)
101//! - **TCP**: Used when UDP is blocked or unreliable (guaranteed delivery)
102//!
103//! You can check the tunnel type:
104//!
105//! ```rust,no_run
106//! use rustp2p_reliable::{ReliableTunnel, ReliableTunnelType};
107//!
108//! # fn example(tunnel: ReliableTunnel) {
109//! match tunnel.tunnel_type() {
110//!     ReliableTunnelType::Kcp => println!("Using KCP (fast UDP)"),
111//!     ReliableTunnelType::Tcp => println!("Using TCP (reliable)"),
112//! }
113//! # }
114//! ```
115//!
116//! ## Configuration
117//!
118//! Configure the reliable transport layer:
119//!
120//! ```rust,no_run
121//! use rustp2p_reliable::Config;
122//! use rust_p2p_core::tunnel::TunnelConfig;
123//!
124//! # fn example() {
125//! let config = Config {
126//!     tunnel_config: TunnelConfig::default(),
127//!     tcp_stun_servers: vec![
128//!         "stun.l.google.com:19302".to_string(),
129//!     ],
130//!     udp_stun_servers: vec![
131//!         "stun1.l.google.com:19302".to_string(),
132//!     ],
133//! };
134//! # }
135//! ```
136//!
137//! ## Advanced Features
138//!
139//! ### Custom KCP Conversation IDs
140//!
141//! For applications that need multiple independent KCP streams:
142//!
143//! ```rust,no_run
144//! use rustp2p_reliable::Puncher;
145//!
146//! # async fn example(puncher: Puncher) -> std::io::Result<()> {
147//! let kcp_conv = 12345;
148//! let punch_info = Default::default();
149//! puncher.punch_conv(kcp_conv, punch_info).await?;
150//! # Ok(())
151//! # }
152//! ```
153//!
154//! ### Raw Data Sending
155//!
156//! Send raw UDP data without KCP protocol overhead:
157//!
158//! ```rust,no_run
159//! use rustp2p_reliable::ReliableTunnel;
160//! use bytes::BytesMut;
161//!
162//! # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
163//! let data = BytesMut::from(&b"raw data"[..]);
164//! tunnel.send_raw(data).await?;
165//! # Ok(())
166//! # }
167//! ```
168//!
169//! ## Connection Information
170//!
171//! Get information about the connection:
172//!
173//! ```rust,no_run
174//! use rustp2p_reliable::ReliableTunnel;
175//!
176//! # fn example(tunnel: ReliableTunnel) {
177//! println!("Local address: {}", tunnel.local_addr());
178//! println!("Remote address: {}", tunnel.remote_addr());
179//! println!("Tunnel type: {:?}", tunnel.tunnel_type());
180//! # }
181//! ```
182//!
183//! ## Thread Safety
184//!
185//! All types are designed to work with Tokio's async runtime and can be safely
186//! shared across tasks using `Arc` when needed.
187//!
188//! ## See Also
189//!
190//! - [`rustp2p`](../rustp2p/index.html) - High-level P2P library
191//! - [`rustp2p-core`](../rust_p2p_core/index.html) - Core NAT traversal functionality
192
193pub use crate::config::Config;
194use crate::kcp::{DataType, KcpHandle};
195use crate::maintain::start_task;
196use async_shutdown::ShutdownManager;
197use bytes::BytesMut;
198use flume::Receiver;
199use parking_lot::Mutex;
200use rand::seq::SliceRandom;
201pub use rust_p2p_core::nat::NatInfo;
202use rust_p2p_core::nat::NatType;
203pub use rust_p2p_core::punch::config::*;
204use rust_p2p_core::punch::Puncher as CorePuncher;
205use rust_p2p_core::route::Index;
206use rust_p2p_core::socket::LocalInterface;
207pub use rust_p2p_core::tunnel::config::*;
208pub use rust_p2p_core::tunnel::tcp::{
209    BytesCodec, BytesInitCodec, Decoder, Encoder, InitCodec, LengthPrefixedCodec,
210    LengthPrefixedInitCodec,
211};
212use rust_p2p_core::tunnel::tcp::{TcpTunnel, WeakTcpTunnelSender};
213use rust_p2p_core::tunnel::udp::{UDPIndex, UdpTunnel};
214use rust_p2p_core::tunnel::{SocketManager, Tunnel, TunnelDispatcher};
215use std::io;
216use std::net::{Ipv4Addr, SocketAddr};
217use std::sync::Arc;
218use tokio::sync::mpsc::Sender;
219
220mod config;
221mod kcp;
222mod maintain;
223
224/// Creates a reliable tunnel system from configuration.
225///
226/// This is the main entry point for creating a reliable transport layer. It returns
227/// a listener for accepting incoming connections and a puncher for NAT traversal.
228///
229/// # Arguments
230///
231/// * `config` - Configuration for tunnels and STUN servers
232///
233/// # Returns
234///
235/// Returns a tuple containing:
236/// - `ReliableTunnelListener`: For accepting incoming connections
237/// - `Puncher`: For NAT traversal and connection establishment
238///
239/// # Examples
240///
241/// ```rust,no_run
242/// use rustp2p_reliable::{from_config, Config};
243///
244/// # #[tokio::main]
245/// # async fn main() -> std::io::Result<()> {
246/// let config = Config::default();
247/// let (listener, puncher) = from_config(config).await?;
248/// # Ok(())
249/// # }
250/// ```
251pub async fn from_config(config: Config) -> io::Result<(ReliableTunnelListener, Puncher)> {
252    let tunnel_config = config.tunnel_config;
253    let tcp_stun_servers = config.tcp_stun_servers;
254    let udp_stun_servers = config.udp_stun_servers;
255    let default_interface = tunnel_config
256        .udp_tunnel_config
257        .as_ref()
258        .map(|v| v.default_interface.clone())
259        .unwrap_or_default();
260
261    let (unified_tunnel_factory, puncher) =
262        rust_p2p_core::tunnel::new_tunnel_component(tunnel_config)?;
263    let manager = unified_tunnel_factory.socket_manager();
264    let shutdown_manager = ShutdownManager::<()>::new();
265    let puncher = Puncher::new(
266        default_interface,
267        tcp_stun_servers,
268        udp_stun_servers,
269        puncher,
270        manager,
271    )
272    .await?;
273    let listener = ReliableTunnelListener::new(
274        shutdown_manager.clone(),
275        unified_tunnel_factory,
276        puncher.punch_context.clone(),
277    );
278    start_task(shutdown_manager, puncher.clone());
279    Ok((listener, puncher))
280}
281/// Listener for accepting reliable tunnel connections.
282///
283/// The listener accepts both KCP (UDP-based) and TCP connections,
284/// providing a unified interface for handling incoming connections.
285///
286/// # Examples
287///
288/// ```rust,no_run
289/// # use rustp2p_reliable::{from_config, Config, ReliableTunnelListener};
290/// # #[tokio::main]
291/// # async fn main() -> std::io::Result<()> {
292/// # let (mut listener, puncher) = from_config(Config::default()).await?;
293/// while let Ok(tunnel) = listener.accept().await {
294///     tokio::spawn(async move {
295///         // Handle the connection
296///     });
297/// }
298/// # Ok(())
299/// # }
300/// ```
301pub struct ReliableTunnelListener {
302    shutdown_manager: ShutdownManager<()>,
303    punch_context: Arc<PunchContext>,
304    unified_tunnel_factory: TunnelDispatcher,
305    kcp_receiver: tokio::sync::mpsc::Receiver<KcpMessageHub>,
306    kcp_sender: Sender<KcpMessageHub>,
307}
308/// NAT traversal puncher for establishing direct connections.
309///
310/// The `Puncher` handles NAT type detection and hole punching to establish
311/// direct peer-to-peer connections.
312///
313/// # Examples
314///
315/// ```rust,no_run
316/// # use rustp2p_reliable::Puncher;
317/// # async fn example(puncher: Puncher) -> std::io::Result<()> {
318/// // Get NAT information
319/// let nat_info = puncher.nat_info();
320/// println!("NAT Type: {:?}", nat_info.nat_type);
321/// # Ok(())
322/// # }
323/// ```
324#[derive(Clone)]
325pub struct Puncher {
326    punch_context: Arc<PunchContext>,
327    puncher: CorePuncher,
328    socket_manager: SocketManager,
329}
330impl Drop for ReliableTunnelListener {
331    fn drop(&mut self) {
332        _ = self.shutdown_manager.trigger_shutdown(());
333    }
334}
335pub(crate) struct PunchContext {
336    default_interface: Option<LocalInterface>,
337    tcp_stun_servers: Vec<String>,
338    udp_stun_servers: Vec<String>,
339    nat_info: Arc<Mutex<NatInfo>>,
340}
341impl PunchContext {
342    pub fn new(
343        default_interface: Option<LocalInterface>,
344        tcp_stun_servers: Vec<String>,
345        udp_stun_servers: Vec<String>,
346        local_udp_ports: Vec<u16>,
347        local_tcp_port: u16,
348    ) -> Self {
349        let public_udp_ports = vec![0; local_udp_ports.len()];
350        let nat_info = NatInfo {
351            nat_type: Default::default(),
352            public_ips: vec![],
353            public_udp_ports,
354            mapping_tcp_addr: vec![],
355            mapping_udp_addr: vec![],
356            public_port_range: 0,
357            local_ipv4: Ipv4Addr::UNSPECIFIED,
358            local_ipv4s: vec![],
359            ipv6: None,
360            local_udp_ports,
361            local_tcp_port,
362            public_tcp_port: 0,
363        };
364        Self {
365            default_interface,
366            tcp_stun_servers,
367            udp_stun_servers,
368            nat_info: Arc::new(Mutex::new(nat_info)),
369        }
370    }
371    pub fn set_public_info(
372        &self,
373        nat_type: NatType,
374        mut ips: Vec<Ipv4Addr>,
375        public_port_range: u16,
376    ) {
377        ips.retain(rust_p2p_core::extend::addr::is_ipv4_global);
378        let mut guard = self.nat_info.lock();
379        guard.public_ips = ips;
380        guard.nat_type = nat_type;
381        guard.public_port_range = public_port_range;
382    }
383    fn mapping_addr(addr: SocketAddr) -> Option<(Ipv4Addr, u16)> {
384        match addr {
385            SocketAddr::V4(addr) => Some((*addr.ip(), addr.port())),
386            SocketAddr::V6(addr) => addr.ip().to_ipv4_mapped().map(|ip| (ip, addr.port())),
387        }
388    }
389    pub fn update_tcp_public_addr(&self, addr: SocketAddr) {
390        let (ip, port) = if let Some(r) = Self::mapping_addr(addr) {
391            r
392        } else {
393            return;
394        };
395        let mut nat_info = self.nat_info.lock();
396        if rust_p2p_core::extend::addr::is_ipv4_global(&ip) && !nat_info.public_ips.contains(&ip) {
397            nat_info.public_ips.push(ip);
398        }
399        nat_info.public_tcp_port = port;
400    }
401    pub fn update_public_addr(&self, index: Index, addr: SocketAddr) {
402        let (ip, port) = if let Some(r) = Self::mapping_addr(addr) {
403            r
404        } else {
405            return;
406        };
407        let mut nat_info = self.nat_info.lock();
408
409        if rust_p2p_core::extend::addr::is_ipv4_global(&ip) {
410            if !nat_info.public_ips.contains(&ip) {
411                nat_info.public_ips.push(ip);
412            }
413            match index {
414                Index::Udp(index) => {
415                    let index = match index {
416                        UDPIndex::MainV4(index) => index,
417                        UDPIndex::MainV6(index) => index,
418                        UDPIndex::SubV4(_) => return,
419                    };
420                    if let Some(p) = nat_info.public_udp_ports.get_mut(index) {
421                        *p = port;
422                    }
423                }
424                Index::Tcp(_) => {
425                    nat_info.public_tcp_port = port;
426                }
427                _ => {}
428            }
429        } else {
430            log::debug!("not public addr: {addr:?}")
431        }
432    }
433    pub async fn update_local_addr(&self) {
434        let local_ipv4 = rust_p2p_core::extend::addr::local_ipv4().await;
435        let local_ipv6 = rust_p2p_core::extend::addr::local_ipv6().await;
436        let mut nat_info = self.nat_info.lock();
437        if let Ok(local_ipv4) = local_ipv4 {
438            nat_info.local_ipv4 = local_ipv4;
439        }
440        nat_info.ipv6 = local_ipv6.ok();
441    }
442    pub async fn update_nat_info(&self) -> io::Result<NatInfo> {
443        self.update_local_addr().await;
444        let mut udp_stun_servers = self.udp_stun_servers.clone();
445        udp_stun_servers.shuffle(&mut rand::rng());
446        let udp_stun_servers = if udp_stun_servers.len() > 3 {
447            &udp_stun_servers[..3]
448        } else {
449            &udp_stun_servers
450        };
451        let (nat_type, ips, port_range) = rust_p2p_core::stun::stun_test_nat(
452            udp_stun_servers.to_vec(),
453            self.default_interface.as_ref(),
454        )
455        .await?;
456        self.set_public_info(nat_type, ips, port_range);
457        Ok(self.nat_info())
458    }
459    pub fn nat_info(&self) -> NatInfo {
460        self.nat_info.lock().clone()
461    }
462}
463impl Puncher {
464    async fn new(
465        default_interface: Option<LocalInterface>,
466        tcp_stun_servers: Vec<String>,
467        udp_stun_servers: Vec<String>,
468        puncher: CorePuncher,
469        socket_manager: SocketManager,
470    ) -> io::Result<Self> {
471        let local_tcp_port = if let Some(v) = socket_manager.tcp_socket_manager_as_ref() {
472            v.local_addr().port()
473        } else {
474            0
475        };
476        let local_udp_ports = if let Some(v) = socket_manager.udp_socket_manager_as_ref() {
477            v.local_ports()?
478        } else {
479            vec![]
480        };
481        let punch_context = Arc::new(PunchContext::new(
482            default_interface,
483            tcp_stun_servers,
484            udp_stun_servers,
485            local_udp_ports,
486            local_tcp_port,
487        ));
488        punch_context.update_local_addr().await;
489        Ok(Self {
490            punch_context,
491            puncher,
492            socket_manager,
493        })
494    }
495
496    /// Attempts to punch through NAT to reach a peer.
497    ///
498    /// This method performs UDP hole punching to establish a direct connection
499    /// with a peer behind NAT.
500    ///
501    /// # Arguments
502    ///
503    /// * `punch_info` - Information about the target peer for punching
504    ///
505    /// # Examples
506    ///
507    /// ```rust,no_run
508    /// # use rustp2p_reliable::{Puncher, PunchInfo};
509    /// # async fn example(puncher: Puncher) -> std::io::Result<()> {
510    /// let punch_info = PunchInfo::default();
511    /// puncher.punch(punch_info).await?;
512    /// # Ok(())
513    /// # }
514    /// ```
515    pub async fn punch(&self, punch_info: PunchInfo) -> io::Result<()> {
516        self.punch_conv(0, punch_info).await
517    }
518
519    /// Attempts to punch through NAT with a custom KCP conversation ID.
520    ///
521    /// This allows you to establish multiple independent KCP connections with
522    /// different conversation IDs.
523    ///
524    /// # Arguments
525    ///
526    /// * `kcp_conv` - The KCP conversation ID to use
527    /// * `punch_info` - Information about the target peer
528    ///
529    /// # Examples
530    ///
531    /// ```rust,no_run
532    /// # use rustp2p_reliable::{Puncher, PunchInfo};
533    /// # async fn example(puncher: Puncher) -> std::io::Result<()> {
534    /// let kcp_conv = 12345;
535    /// let punch_info = PunchInfo::default();
536    /// puncher.punch_conv(kcp_conv, punch_info).await?;
537    /// # Ok(())
538    /// # }
539    /// ```
540    pub async fn punch_conv(&self, kcp_conv: u32, punch_info: PunchInfo) -> io::Result<()> {
541        let mut punch_udp_buf = [0; 8];
542        punch_udp_buf[..4].copy_from_slice(&kcp_conv.to_le_bytes());
543        // kcp flag
544        punch_udp_buf[0] = 0x02;
545        if rust_p2p_core::stun::is_stun_response(&punch_udp_buf) {
546            return Err(io::Error::other("kcp_conv error"));
547        }
548        if !self.puncher.need_punch(&punch_info) {
549            return Ok(());
550        }
551        self.puncher
552            .punch_now(None, &punch_udp_buf, punch_info)
553            .await
554    }
555    pub fn nat_info(&self) -> NatInfo {
556        self.punch_context.nat_info()
557    }
558}
559
560/// A reliable tunnel connection (either KCP or TCP).
561///
562/// `ReliableTunnel` provides a unified interface for both KCP and TCP connections,
563/// offering reliable, ordered data transfer.
564///
565/// # Examples
566///
567/// ```rust,no_run
568/// use rustp2p_reliable::ReliableTunnel;
569/// use bytes::BytesMut;
570///
571/// # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
572/// // Send data
573/// tunnel.send(BytesMut::from(&b"hello"[..])).await?;
574///
575/// // Receive data
576/// let data = tunnel.next().await?;
577/// # Ok(())
578/// # }
579/// ```
580pub enum ReliableTunnel {
581    Tcp(TcpMessageHub),
582    Kcp(KcpMessageHub),
583}
584/// The type of reliable tunnel (KCP or TCP).
585///
586/// # Examples
587///
588/// ```rust
589/// use rustp2p_reliable::ReliableTunnelType;
590///
591/// let tunnel_type = ReliableTunnelType::Kcp;
592/// assert_eq!(tunnel_type, ReliableTunnelType::Kcp);
593/// ```
594#[derive(Copy, Clone, Eq, PartialEq, Debug)]
595pub enum ReliableTunnelType {
596    Tcp,
597    Kcp,
598}
599
600impl ReliableTunnelListener {
601    fn new(
602        shutdown_manager: ShutdownManager<()>,
603        unified_tunnel_factory: TunnelDispatcher,
604        punch_context: Arc<PunchContext>,
605    ) -> Self {
606        let (kcp_sender, kcp_receiver) = tokio::sync::mpsc::channel(64);
607        Self {
608            shutdown_manager,
609            punch_context,
610            unified_tunnel_factory,
611            kcp_receiver,
612            kcp_sender,
613        }
614    }
615    /// Accepts an incoming reliable tunnel connection.
616    ///
617    /// This method blocks until a connection is available. The connection
618    /// can be either KCP (UDP-based) or TCP.
619    ///
620    /// # Returns
621    ///
622    /// Returns a `ReliableTunnel` that can be used for bidirectional communication.
623    ///
624    /// # Examples
625    ///
626    /// ```rust,no_run
627    /// # use rustp2p_reliable::{from_config, Config};
628    /// # #[tokio::main]
629    /// # async fn main() -> std::io::Result<()> {
630    /// # let (mut listener, puncher) = from_config(Config::default()).await?;
631    /// loop {
632    ///     match listener.accept().await {
633    ///         Ok(tunnel) => {
634    ///             tokio::spawn(async move {
635    ///                 // Handle the tunnel
636    ///             });
637    ///         }
638    ///         Err(e) => {
639    ///             eprintln!("Accept error: {}", e);
640    ///             break;
641    ///         }
642    ///     }
643    /// }
644    /// # Ok(())
645    /// # }
646    /// ```
647    pub async fn accept(&mut self) -> io::Result<ReliableTunnel> {
648        loop {
649            tokio::select! {
650                rs=self.unified_tunnel_factory.dispatch()=>{
651                    let unified_tunnel = rs?;
652                    match unified_tunnel {
653                        Tunnel::Udp(udp) => {
654                            handle_udp(self.shutdown_manager.clone(), udp, self.kcp_sender.clone(), self.punch_context.clone())?;
655                        }
656                        Tunnel::Tcp(tcp) => {
657                            let local_addr = tcp.local_addr();
658                            let remote_addr = tcp.route_key().addr();
659                            let sender = tcp.sender()?;
660                            let receiver = handle_tcp(self.shutdown_manager.clone(),tcp).await?;
661                            let hub = TcpMessageHub::new(local_addr,remote_addr,sender,receiver);
662                            return Ok(ReliableTunnel::Tcp(hub))
663                        }
664                    }
665                }
666                rs=self.kcp_receiver.recv()=>{
667                    return if let Some(hub) = rs{
668                        Ok(ReliableTunnel::Kcp(hub))
669                    }else{
670                        Err(io::Error::from(io::ErrorKind::UnexpectedEof))
671                    }
672                }
673            }
674        }
675    }
676}
677
678impl ReliableTunnel {
679    /// Sends data through the reliable tunnel.
680    ///
681    /// The data will be sent using either KCP or TCP, depending on the tunnel type.
682    ///
683    /// # Arguments
684    ///
685    /// * `buf` - The data to send
686    ///
687    /// # Examples
688    ///
689    /// ```rust,no_run
690    /// use rustp2p_reliable::ReliableTunnel;
691    /// use bytes::BytesMut;
692    ///
693    /// # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
694    /// let data = BytesMut::from(&b"Hello, world!"[..]);
695    /// tunnel.send(data).await?;
696    /// # Ok(())
697    /// # }
698    /// ```
699    pub async fn send(&self, buf: BytesMut) -> io::Result<()> {
700        match &self {
701            ReliableTunnel::Tcp(tcp) => tcp.send(buf).await,
702            ReliableTunnel::Kcp(kcp) => kcp.send(buf).await,
703        }
704    }
705    /// Sends raw data without KCP protocol overhead.
706    ///
707    /// For TCP tunnels, this is the same as `send()`. For KCP tunnels, this sends
708    /// raw UDP data without the KCP protocol wrapper.
709    ///
710    /// # Arguments
711    ///
712    /// * `buf` - The raw data to send
713    ///
714    /// # Examples
715    ///
716    /// ```rust,no_run
717    /// use rustp2p_reliable::ReliableTunnel;
718    /// use bytes::BytesMut;
719    ///
720    /// # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
721    /// let data = BytesMut::from(&b"raw data"[..]);
722    /// tunnel.send_raw(data).await?;
723    /// # Ok(())
724    /// # }
725    /// ```
726    pub async fn send_raw(&self, buf: BytesMut) -> io::Result<()> {
727        match &self {
728            ReliableTunnel::Tcp(tcp) => tcp.send(buf).await,
729            ReliableTunnel::Kcp(kcp) => kcp.send_raw(buf).await,
730        }
731    }
732    /// Receives the next message from the tunnel.
733    ///
734    /// This method blocks until data is available or an error occurs.
735    ///
736    /// # Returns
737    ///
738    /// Returns the received data as `BytesMut`.
739    ///
740    /// # Examples
741    ///
742    /// ```rust,no_run
743    /// use rustp2p_reliable::ReliableTunnel;
744    ///
745    /// # async fn example(tunnel: ReliableTunnel) -> std::io::Result<()> {
746    /// loop {
747    ///     let data = tunnel.next().await?;
748    ///     println!("Received {} bytes", data.len());
749    /// }
750    /// # }
751    /// ```
752    pub async fn next(&self) -> io::Result<BytesMut> {
753        match &self {
754            ReliableTunnel::Tcp(tcp) => tcp.next().await,
755            ReliableTunnel::Kcp(kcp) => kcp.next().await,
756        }
757    }
758    /// Returns the local socket address of this tunnel.
759    ///
760    /// # Examples
761    ///
762    /// ```rust,no_run
763    /// # use rustp2p_reliable::ReliableTunnel;
764    /// # fn example(tunnel: ReliableTunnel) {
765    /// println!("Local address: {}", tunnel.local_addr());
766    /// # }
767    /// ```
768    pub fn local_addr(&self) -> SocketAddr {
769        match &self {
770            ReliableTunnel::Tcp(tcp) => tcp.local_addr,
771            ReliableTunnel::Kcp(kcp) => kcp.local_addr,
772        }
773    }
774    /// Returns the remote socket address of this tunnel.
775    ///
776    /// # Examples
777    ///
778    /// ```rust,no_run
779    /// # use rustp2p_reliable::ReliableTunnel;
780    /// # fn example(tunnel: ReliableTunnel) {
781    /// println!("Remote address: {}", tunnel.remote_addr());
782    /// # }
783    /// ```
784    pub fn remote_addr(&self) -> SocketAddr {
785        match &self {
786            ReliableTunnel::Tcp(tcp) => tcp.remote_addr,
787            ReliableTunnel::Kcp(kcp) => kcp.remote_addr,
788        }
789    }
790    /// Returns the type of this tunnel (KCP or TCP).
791    ///
792    /// # Examples
793    ///
794    /// ```rust,no_run
795    /// # use rustp2p_reliable::{ReliableTunnel, ReliableTunnelType};
796    /// # fn example(tunnel: ReliableTunnel) {
797    /// match tunnel.tunnel_type() {
798    ///     ReliableTunnelType::Kcp => println!("Using KCP"),
799    ///     ReliableTunnelType::Tcp => println!("Using TCP"),
800    /// }
801    /// # }
802    /// ```
803    pub fn tunnel_type(&self) -> ReliableTunnelType {
804        match &self {
805            ReliableTunnel::Tcp(_tcp) => ReliableTunnelType::Tcp,
806            ReliableTunnel::Kcp(_kcp) => ReliableTunnelType::Kcp,
807        }
808    }
809}
810pub struct TcpMessageHub {
811    local_addr: SocketAddr,
812    remote_addr: SocketAddr,
813    input: WeakTcpTunnelSender,
814    output: Receiver<BytesMut>,
815}
816impl TcpMessageHub {
817    pub(crate) fn new(
818        local_addr: SocketAddr,
819        remote_addr: SocketAddr,
820        input: WeakTcpTunnelSender,
821        output: Receiver<BytesMut>,
822    ) -> Self {
823        Self {
824            local_addr,
825            remote_addr,
826            input,
827            output,
828        }
829    }
830    pub async fn send(&self, buf: BytesMut) -> io::Result<()> {
831        self.input.send(buf.into()).await
832    }
833    pub async fn next(&self) -> io::Result<BytesMut> {
834        self.output
835            .recv_async()
836            .await
837            .map_err(|_| io::Error::from(io::ErrorKind::UnexpectedEof))
838    }
839}
840pub struct KcpMessageHub {
841    local_addr: SocketAddr,
842    remote_addr: SocketAddr,
843    input: Sender<DataType>,
844    output: Receiver<BytesMut>,
845}
846
847impl KcpMessageHub {
848    pub(crate) fn new(
849        local_addr: SocketAddr,
850        remote_addr: SocketAddr,
851        input: Sender<DataType>,
852        output: Receiver<BytesMut>,
853    ) -> Self {
854        Self {
855            local_addr,
856            remote_addr,
857            input,
858            output,
859        }
860    }
861    pub async fn send(&self, buf: BytesMut) -> io::Result<()> {
862        self.input
863            .send(DataType::Kcp(buf))
864            .await
865            .map_err(|_| io::Error::from(io::ErrorKind::WriteZero))
866    }
867    pub async fn send_raw(&self, buf: BytesMut) -> io::Result<()> {
868        self.input
869            .send(DataType::Raw(buf))
870            .await
871            .map_err(|_| io::Error::from(io::ErrorKind::WriteZero))
872    }
873    pub async fn next(&self) -> io::Result<BytesMut> {
874        self.output
875            .recv_async()
876            .await
877            .map_err(|_| io::Error::from(io::ErrorKind::UnexpectedEof))
878    }
879}
880async fn handle_tcp(
881    shutdown_manager: ShutdownManager<()>,
882    mut tcp_tunnel: TcpTunnel,
883) -> io::Result<Receiver<BytesMut>> {
884    let (sender, receiver) = flume::bounded(128);
885    tokio::spawn(async move {
886        let mut buf = [0; 65536];
887        while let Ok(Ok(len)) = shutdown_manager
888            .wrap_cancel(tcp_tunnel.recv(&mut buf))
889            .await
890        {
891            if sender.send_async(buf[..len].into()).await.is_err() {
892                break;
893            }
894        }
895    });
896    Ok(receiver)
897}
898
899fn handle_udp(
900    shutdown_manager: ShutdownManager<()>,
901    mut udp_tunnel: UdpTunnel,
902    sender: Sender<KcpMessageHub>,
903    punch_context: Arc<PunchContext>,
904) -> io::Result<()> {
905    let mut kcp_handle = KcpHandle::new(udp_tunnel.local_addr(), udp_tunnel.sender()?, sender);
906    tokio::spawn(async move {
907        let mut buf = [0; 65536];
908
909        while let Ok(Some(rs)) = shutdown_manager
910            .wrap_cancel(udp_tunnel.recv_from(&mut buf))
911            .await
912        {
913            let (len, route_key) = match rs {
914                Ok(rs) => rs,
915                Err(e) => {
916                    log::warn!("udp_tunnel.recv_from {e:?}");
917                    continue;
918                }
919            };
920            // check stun data
921            if rust_p2p_core::stun::is_stun_response(&buf[..len]) {
922                if let Some(pub_addr) = rust_p2p_core::stun::recv_stun_response(&buf[..len]) {
923                    punch_context.update_public_addr(route_key.index(), pub_addr);
924                    continue;
925                }
926            }
927            kcp_handle.handle(&buf[..len], route_key).await;
928        }
929    });
930    Ok(())
931}