rustp2p 0.3.0

A decentralized peer-to-peer (P2P) library written in Rust, designed to be simple to use.
Documentation
mod protocol;

pub mod cipher;
mod config;
mod extend;
mod reliable;
mod tunnel;
pub use protocol::{
    node_id::{self, GroupCode, NodeID},
    NetPacket, HEAD_LEN,
};
pub use tunnel::{NodeAddress, PeerNodeAddress, RecvUserData};

#[cfg(feature = "use-kcp")]
pub use reliable::*;

pub use crate::config::{DataInterceptor, DefaultInterceptor};
use crate::protocol::protocol_type::ProtocolType;
pub use crate::tunnel::{RecvMetadata, RecvResult};
use async_trait::async_trait;
use cipher::Algorithm;
pub use config::{
    Config, NatType, PunchModel, PunchPolicy, PunchPolicySet, TcpTunnelConfig, UdpTunnelConfig,
};
use flume::{Receiver, Sender, TryRecvError};
pub use rust_p2p_core::route::RouteKey;
pub use rust_p2p_core::socket::LocalInterface;
use std::io;
use std::ops::Deref;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tunnel::{Tunnel, TunnelDispatcher, TunnelRouter};

/// A running p2p endpoint: the user-facing handle for sending and receiving data.
///
/// It pairs the receiving half of an internal channel (`input`) with a
/// [`TunnelRouter`] used for sending (`output`), and owns the background
/// dispatch task for as long as the endpoint is alive.
pub struct EndPoint {
    /// KCP state, only compiled in with the `use-kcp` feature. A clone of it is
    /// handed to every per-tunnel receive task.
    #[cfg(feature = "use-kcp")]
    kcp_context: KcpContext,
    /// Receiving half of the channel that the per-tunnel receive tasks push
    /// user data into.
    input: Receiver<(RecvUserData, RecvMetadata)>,
    /// Router used for every outgoing send; also what `Deref` exposes.
    output: TunnelRouter,
    /// Keeps the dispatch task alive; dropping this field aborts that task.
    _handle: OwnedJoinHandle,
}

impl EndPoint {
    /// Recv remote data from this endpoint
    pub async fn recv_from(&self) -> io::Result<(RecvUserData, RecvMetadata)> {
        self.input
            .recv_async()
            .await
            .map_err(|_| io::Error::new(io::ErrorKind::UnexpectedEof, "shutdown"))
    }
    /// Send data to remote endpoint
    pub async fn send_to<D: Into<NodeID>>(&self, buf: &[u8], dest: D) -> io::Result<()> {
        self.output.send_to(buf, dest).await
    }
    pub fn try_recv_from(&self) -> io::Result<(RecvUserData, RecvMetadata)> {
        match self.input.try_recv() {
            Ok(rs) => Ok(rs),
            Err(e) => match e {
                TryRecvError::Empty => Err(io::Error::from(io::ErrorKind::WouldBlock)),
                TryRecvError::Disconnected => {
                    Err(io::Error::new(io::ErrorKind::UnexpectedEof, "shutdown"))
                }
            },
        }
    }
    pub fn try_send_to<D: Into<NodeID>>(&self, buf: &[u8], dest: D) -> io::Result<()> {
        self.output.try_send_to(buf, dest)
    }
    pub async fn send_to_route<D: Into<NodeID>>(
        &self,
        buf: &[u8],
        dest: D,
        route_key: &RouteKey,
    ) -> io::Result<()> {
        let mut send_packet = self.output.allocate_send_packet();
        send_packet.set_payload(buf);
        self.output
            .send_packet_to_route(send_packet, &dest.into(), Some(route_key))
            .await
    }
    pub async fn try_send_to_route<D: Into<NodeID>>(
        &self,
        buf: &[u8],
        dest: D,
        route_key: &RouteKey,
    ) -> io::Result<()> {
        let mut send_packet = self.output.allocate_send_packet();
        send_packet.set_payload(buf);
        self.output
            .try_send_packet_to_route(send_packet, &dest.into(), Some(route_key))
    }

    pub async fn broadcast(&self, buf: &[u8]) -> io::Result<()> {
        let mut send_packet = self.output.allocate_send_packet();
        send_packet.set_payload(buf);
        self.output.broadcast_packet(send_packet).await
    }
}

/// Exposes the inner [`TunnelRouter`] so its methods can be called directly
/// on an [`EndPoint`].
impl Deref for EndPoint {
    type Target = TunnelRouter;

    fn deref(&self) -> &TunnelRouter {
        let Self { output, .. } = self;
        output
    }
}
/// Wrapper around a tokio [`JoinHandle`] that aborts the task when dropped
/// (see its `Drop` impl), tying the task's lifetime to the owner of this value.
struct OwnedJoinHandle {
    /// Handle of the spawned task that is aborted on drop.
    handle: JoinHandle<()>,
}
impl Drop for OwnedJoinHandle {
    /// Requests cancellation of the wrapped task when the owner goes away.
    fn drop(&mut self) {
        let task = &self.handle;
        task.abort();
    }
}

/// Step-by-step builder for an [`EndPoint`].
///
/// Every field is optional at construction time; `build` decides which ones
/// are used (see the notes on the individual fields).
pub struct Builder {
    /// UDP port for the default UDP tunnel config; only read by `build` when
    /// no `config` was supplied.
    udp_port: Option<u16>,
    /// TCP port for the default TCP tunnel config; only read by `build` when
    /// no `config` was supplied.
    tcp_port: Option<u16>,
    /// Peer addresses passed to `Config::set_direct_addrs`.
    peers: Option<Vec<PeerNodeAddress>>,
    /// Group code passed to `Config::set_group_code`.
    group_code: Option<GroupCode>,
    /// Cipher algorithm; only applied when one of the cipher features is enabled.
    encryption: Option<Algorithm>,
    /// Local node id; mandatory when no `config` was supplied, otherwise not read.
    node_id: Option<NodeID>,
    /// Optional user hook wrapped in the crate-private `Interceptor`.
    interceptor: Option<Interceptor>,
    /// Local interface passed to `Config::set_default_interface`.
    interface: Option<LocalInterface>,
    /// Pre-built configuration used as the starting point instead of
    /// `Config::empty()`.
    config: Option<Config>,
}
impl Builder {
    /// Creates a builder with no option set.
    ///
    /// A builder created this way must be given a node id via
    /// [`Builder::node_id`] before [`Builder::build`] is called.
    pub fn new() -> Self {
        Self {
            udp_port: None,
            tcp_port: None,
            peers: None,
            group_code: None,
            encryption: None,
            node_id: None,
            interceptor: None,
            interface: None,
            config: None,
        }
    }

    /// Creates a builder that starts from an existing [`Config`].
    ///
    /// When a config is supplied, [`Builder::build`] uses it as-is for the
    /// tunnel ports and node id: values set through [`Builder::udp_port`],
    /// [`Builder::tcp_port`] and [`Builder::node_id`] are not applied.
    /// Interface, group code, peers and encryption are still layered on top.
    pub fn from_config(config: Config) -> Self {
        Self {
            config: Some(config),
            ..Self::new()
        }
    }

    /// Sets the UDP port used when the configuration is built from scratch.
    pub fn udp_port(mut self, port: u16) -> Self {
        self.udp_port = Some(port);
        self
    }

    /// Sets the TCP port used when the configuration is built from scratch.
    pub fn tcp_port(mut self, port: u16) -> Self {
        self.tcp_port = Some(port);
        self
    }

    /// Sets the peer addresses handed to `Config::set_direct_addrs`.
    pub fn peers(mut self, peers: Vec<PeerNodeAddress>) -> Self {
        self.peers = Some(peers);
        self
    }

    /// Sets the group code handed to `Config::set_group_code`.
    pub fn group_code(mut self, group_code: GroupCode) -> Self {
        self.group_code = Some(group_code);
        self
    }

    /// Sets the cipher algorithm.
    ///
    /// The value is only applied by [`Builder::build`] when the crate is
    /// compiled with one of the cipher features (`aes-gcm-openssl`,
    /// `aes-gcm-ring`, `chacha20-poly1305-openssl`, `chacha20-poly1305-ring`).
    pub fn encryption(mut self, encryption: Algorithm) -> Self {
        self.encryption = Some(encryption);
        self
    }

    /// Sets the local node id; required unless the builder was created with
    /// [`Builder::from_config`].
    pub fn node_id(mut self, node_id: NodeID) -> Self {
        self.node_id = Some(node_id);
        self
    }

    /// Installs a [`DataInterceptor`] that is passed to every per-tunnel
    /// receive task.
    pub fn interceptor<I: DataInterceptor + 'static>(mut self, interceptor: I) -> Self {
        self.interceptor = Some(Interceptor {
            inner: Arc::new(interceptor),
        });
        self
    }

    /// Sets the local interface handed to `Config::set_default_interface`.
    pub fn bind_interface(mut self, interface: LocalInterface) -> Self {
        self.interface = Some(interface);
        self
    }

    /// Assembles the final [`Config`] and starts the [`EndPoint`].
    ///
    /// Without a supplied config, a configuration is created from
    /// `Config::empty()` using the UDP/TCP ports (0 when unset) and the node
    /// id. Interface, group code, peers and (feature permitting) encryption
    /// are then applied on top of whichever configuration is in use.
    ///
    /// # Errors
    /// * [`io::ErrorKind::InvalidInput`] when no config was supplied and no
    ///   node id was set.
    /// * Any error returned by `TunnelDispatcher::new` or by the endpoint
    ///   construction.
    pub async fn build(self) -> io::Result<EndPoint> {
        let mut config = match self.config {
            Some(config) => config,
            None => Config::empty()
                .set_udp_tunnel_config(
                    UdpTunnelConfig::default()
                        .set_simple_udp_port(self.udp_port.unwrap_or_default()),
                )
                .set_tcp_tunnel_config(
                    TcpTunnelConfig::default().set_tcp_port(self.tcp_port.unwrap_or_default()),
                )
                // `ok_or_else` builds (and allocates) the error only when the
                // node id is actually missing.
                .set_node_id(self.node_id.ok_or_else(|| {
                    io::Error::new(io::ErrorKind::InvalidInput, "node_id is required")
                })?),
        };
        if let Some(interface) = self.interface {
            config = config.set_default_interface(interface);
        }
        if let Some(group_code) = self.group_code {
            config = config.set_group_code(group_code);
        }
        if let Some(peers) = self.peers {
            config = config.set_direct_addrs(peers);
        }
        #[cfg(any(
            feature = "aes-gcm-openssl",
            feature = "aes-gcm-ring",
            feature = "chacha20-poly1305-openssl",
            feature = "chacha20-poly1305-ring"
        ))]
        if let Some(encryption) = self.encryption {
            config = config.set_encryption(encryption);
        }

        let tunnel_manager = TunnelDispatcher::new(config).await?;

        EndPoint::with_interceptor_impl(tunnel_manager, self.interceptor).await
    }
}
impl EndPoint {
    /// Starts an endpoint from `config` without a data interceptor.
    ///
    /// # Errors
    /// Propagates any error from `TunnelDispatcher::new`.
    pub async fn new(config: Config) -> io::Result<Self> {
        let dispatcher = TunnelDispatcher::new(config).await?;
        Self::with_interceptor_impl(dispatcher, None).await
    }

    /// Starts an endpoint from `config` with `interceptor` installed on every
    /// per-tunnel receive task.
    ///
    /// # Errors
    /// Propagates any error from `TunnelDispatcher::new`.
    pub async fn with_interceptor<T: DataInterceptor + 'static>(
        config: Config,
        interceptor: T,
    ) -> io::Result<Self> {
        let dispatcher = TunnelDispatcher::new(config).await?;
        let inner: Arc<dyn DataInterceptor> = Arc::new(interceptor);
        Self::with_interceptor_impl(dispatcher, Some(Interceptor { inner })).await
    }

    /// Shared constructor: wires the dispatcher to the user-facing channel.
    ///
    /// A background task repeatedly awaits `tunnel_hub.dispatch()` and spawns
    /// one `handle` task per tunnel it yields; that loop ends on the first
    /// dispatch error. The per-tunnel tasks are spawned detached, so only the
    /// dispatch task itself is tied to the endpoint via `OwnedJoinHandle`.
    async fn with_interceptor_impl(
        mut tunnel_hub: TunnelDispatcher,
        interceptor: Option<Interceptor>,
    ) -> io::Result<Self> {
        let (user_tx, user_rx) = flume::unbounded();
        let router = tunnel_hub.tunnel_router();
        #[cfg(feature = "use-kcp")]
        let kcp_context = KcpContext::default();
        #[cfg(feature = "use-kcp")]
        let kcp_for_tasks = kcp_context.clone();
        let dispatch_task = tokio::spawn(async move {
            loop {
                let tunnel = match tunnel_hub.dispatch().await {
                    Ok(tunnel) => tunnel,
                    Err(_) => break,
                };
                tokio::spawn(handle(
                    tunnel,
                    user_tx.clone(),
                    #[cfg(feature = "use-kcp")]
                    kcp_for_tasks.clone(),
                    interceptor.clone(),
                ));
            }
        });
        Ok(EndPoint {
            #[cfg(feature = "use-kcp")]
            kcp_context,
            output: router,
            input: user_rx,
            _handle: OwnedJoinHandle {
                handle: dispatch_task,
            },
        })
    }
}

impl Default for Builder {
    fn default() -> Self {
        Self::new()
    }
}

/// Receive loop for a single tunnel.
///
/// Repeatedly pulls a batch of packets from `tunnel_rx` (running them through
/// the optional interceptor) and routes each one: packets whose protocol is
/// `ProtocolType::KcpData` go to the KCP context when the `use-kcp` feature is
/// enabled (and are dropped otherwise); everything else is forwarded to
/// `sender`. The task ends when the tunnel itself reports a receive error.
async fn handle(
    mut tunnel_rx: Tunnel,
    sender: Sender<(RecvUserData, RecvMetadata)>,
    #[cfg(feature = "use-kcp")] kcp_data_input: KcpContext,
    interceptor: Option<Interceptor>,
) {
    let hook = interceptor.as_ref();
    // Reused across iterations; emptied by `drain` after every batch.
    let mut batch = Vec::with_capacity(16);
    loop {
        let outcome = tunnel_rx.preprocess_batch_recv(hook, &mut batch).await;
        let processed = match outcome {
            Ok(processed) => processed,
            Err(e) => {
                // The tunnel itself failed: this task is finished.
                log::debug!(
                    "recv_from {e:?},{:?} {:?}",
                    tunnel_rx.protocol(),
                    tunnel_rx.remote_addr()
                );
                return;
            }
        };
        match processed {
            Ok(_) => {}
            Err(e) => {
                // A processing error is only logged; keep receiving.
                log::debug!("recv_data_handle {e:?}");
                continue;
            }
        }
        for (packet, meta) in batch.drain(..) {
            let is_kcp = meta.protocol() == ProtocolType::KcpData.into();
            if !is_kcp {
                if sender.send_async((packet, meta)).await.is_err() {
                    // NOTE(review): this only leaves the `for` loop; the rest of
                    // the batch is discarded and the outer loop keeps receiving
                    // even though the send failed — confirm that is intended.
                    break;
                }
                continue;
            }
            #[cfg(feature = "use-kcp")]
            kcp_data_input.input(packet.payload(), meta.src_id()).await;
        }
    }
}
/// Crate-private, cheaply clonable wrapper around a user-supplied
/// [`DataInterceptor`]; a clone is handed to each per-tunnel receive task.
#[derive(Clone)]
pub(crate) struct Interceptor {
    /// The type-erased user interceptor, shared between clones.
    inner: Arc<dyn DataInterceptor>,
}
#[async_trait]
impl DataInterceptor for Interceptor {
    /// Forwards the call unchanged to the wrapped interceptor and returns its
    /// verdict.
    async fn pre_handle(&self, data: &mut RecvResult) -> bool {
        let Self { inner } = self;
        inner.pre_handle(data).await
    }
}