qp2p 0.8.4

Peer-to-peer networking library using QUIC
Documentation
// Copyright 2020 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under the MIT license <LICENSE-MIT
// http://opensource.org/licenses/MIT> or the Modified BSD license <LICENSE-BSD
// https://opensource.org/licenses/BSD-3-Clause>, at your option. This file may not be copied,
// modified, or distributed except according to those terms. Please review the Licences for the
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

#[cfg(feature = "upnp")]
use super::igd;
use super::{
    bootstrap_cache::BootstrapCache,
    config::{Config, SerialisableCertificate},
    connections::{Connection, RecvStream, SendStream},
    dirs::{Dirs, OverRide},
    endpoint::Endpoint,
    error::{Error, Result},
    peer_config::{self, DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC},
};
use bytes::Bytes;
use futures::future::select_ok;
use log::{debug, error, info, trace};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};

/// In the absence of a port supplied by the user via the config we will first try using this
/// before using a random port.
pub const DEFAULT_PORT_TO_TRY: u16 = 12000;

/// Message received from a peer
pub enum Message {
    /// A message sent by peer on a uni-directional stream
    UniStream {
        /// Message's bytes
        bytes: Bytes,
        /// Address the message was sent from
        src: SocketAddr,
        /// Stream to read more messages
        recv: RecvStream,
    },
    /// A message sent by peer on a bi-directional stream
    BiStream {
        /// Message's bytes
        bytes: Bytes,
        /// Address the message was sent from
        src: SocketAddr,
        /// Stream to send a message back to the initiator
        send: SendStream,
        /// Stream to read more messages
        recv: RecvStream,
    },
}

impl Message {
    /// Returns the data from the message
    pub fn get_message_data(&self) -> Bytes {
        match self {
            Self::UniStream { bytes, .. } | Self::BiStream { bytes, .. } => bytes.clone(),
        }
    }
}

/// Main QuicP2p instance to communicate with QuicP2p using an async API
#[derive(Clone)]
pub struct QuicP2p {
    local_addr: SocketAddr,
    allow_random_port: bool,
    bootstrap_cache: BootstrapCache,
    endpoint_cfg: quinn::ServerConfig,
    client_cfg: quinn::ClientConfig,
}

impl QuicP2p {
    /// Construct `QuicP2p` with the default config and bootstrap cache enabled
    ///
    /// # Example
    ///
    /// ```
    /// use qp2p::QuicP2p;
    ///
    /// let quic_p2p = QuicP2p::new().expect("Error initializing QuicP2p");
    /// ```

    pub fn new() -> Result<Self> {
        Self::with_config(None, Default::default(), true)
    }

    /// Construct `QuicP2p` with supplied parameters, ready to be used.
    /// If config is not specified it'll call `Config::read_or_construct_default()`
    ///
    /// `bootstrap_nodes`: takes bootstrap nodes from the user.
    ///
    /// In addition to bootstrap nodes provided, optionally use the nodes found
    /// in the bootstrap cache file (if such a file exists) or disable this feature.
    ///
    /// # Example
    ///
    /// ```
    /// use qp2p::{QuicP2p, Config};
    /// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    ///
    /// let mut config = Config::default();
    /// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
    /// config.port = Some(3000);
    /// let hcc = &["127.0.0.1:8080".parse().unwrap()];
    /// let quic_p2p = QuicP2p::with_config(Some(config), hcc, true).expect("Error initializing QuicP2p");
    /// ```
    pub fn with_config(
        cfg: Option<Config>,
        bootstrap_nodes: &[SocketAddr],
        use_bootstrap_cache: bool,
    ) -> Result<Self> {
        let cfg = unwrap_config_or_default(cfg)?;
        debug!("Config passed in to qp2p: {:?}", cfg);

        let (port, allow_random_port) = cfg
            .port
            .map(|p| (p, false))
            .unwrap_or((DEFAULT_PORT_TO_TRY, true));

        let ip = cfg.ip.unwrap_or_else(|| {
            let mut our_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

            // check hard coded contacts for being local (aka loopback)
            if let Some(contact) = cfg.hard_coded_contacts.iter().next() {
                let ip = contact.ip();

                if ip.is_loopback() {
                    trace!(
                        "IP from hardcoded contact is loopback, setting our IP to: {:?}",
                        ip
                    );
                    our_ip = ip;
                }
            }

            our_ip
        });

        let idle_timeout_msec = cfg.idle_timeout_msec.unwrap_or(DEFAULT_IDLE_TIMEOUT_MSEC);

        let keep_alive_interval_msec = cfg
            .keep_alive_interval_msec
            .unwrap_or(DEFAULT_KEEP_ALIVE_INTERVAL_MSEC);

        let (key, cert) = {
            let our_complete_cert: SerialisableCertificate = Default::default();
            our_complete_cert.obtain_priv_key_and_cert()?
        };

        let custom_dirs = cfg
            .bootstrap_cache_dir
            .clone()
            .map(|custom_dir| Dirs::Overide(OverRide::new(&custom_dir)));

        let mut bootstrap_cache =
            BootstrapCache::new(cfg.hard_coded_contacts, custom_dirs.as_ref())?;
        if use_bootstrap_cache {
            bootstrap_cache.peers_mut().extend(bootstrap_nodes);
        } else {
            let bootstrap_cache = bootstrap_cache.peers_mut();
            bootstrap_cache.clear();
            for addr in bootstrap_nodes {
                bootstrap_cache.push_back(*addr);
            }
        }

        let endpoint_cfg =
            peer_config::new_our_cfg(idle_timeout_msec, keep_alive_interval_msec, cert, key)?;

        let client_cfg = peer_config::new_client_cfg(idle_timeout_msec, keep_alive_interval_msec);

        let qp2p = Self {
            local_addr: SocketAddr::new(ip, port),
            allow_random_port,
            bootstrap_cache,
            endpoint_cfg,
            client_cfg,
        };

        Ok(qp2p)
    }

    /// Bootstrap to the network.
    ///
    /// Bootstrap concept is different from "connect" in several ways: `bootstrap()` will try to
    /// connect to all peers which are specified in the config (`hard_coded_contacts`) or were
    /// previously cached.
    /// Once a connection with a peer succeeds, a `Connection` for such peer will be returned
    /// and all other connections will be dropped.
    ///
    /// # Example
    ///
    /// ```
    /// use qp2p::{QuicP2p, Config, Error};
    /// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Error> {
    ///
    ///     let mut config = Config::default();
    ///     config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
    ///     config.port = Some(3000);
    ///     let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), &[], true)?;
    ///     let endpoint = quic_p2p.new_endpoint()?;
    ///     let peer_addr = endpoint.our_endpoint()?;
    ///
    ///     config.port = Some(3001);
    ///     let mut quic_p2p = QuicP2p::with_config(Some(config), &[peer_addr], true)?;
    ///     let (endpoint, connection) = quic_p2p.bootstrap().await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn bootstrap(&self) -> Result<(Endpoint, Connection)> {
        // TODO: refactor bootstrap_cache so we can simply get the list of nodes
        let bootstrap_nodes: Vec<SocketAddr> = self
            .bootstrap_cache
            .peers()
            .iter()
            .rev()
            .chain(self.bootstrap_cache.hard_coded_contacts().iter())
            .cloned()
            .collect();

        trace!("Bootstrapping with nodes {:?}", bootstrap_nodes);
        // Attempt to connect to all nodes and return the first one to succeed
        let mut tasks = Vec::default();
        for node_addr in bootstrap_nodes {
            let endpoint_cfg = self.endpoint_cfg.clone();
            let client_cfg = self.client_cfg.clone();
            let local_addr = self.local_addr;
            let allow_random_port = self.allow_random_port;
            let task_handle = tokio::spawn(async move {
                new_connection_to(
                    &node_addr,
                    endpoint_cfg,
                    client_cfg,
                    local_addr,
                    allow_random_port,
                )
                .await
            });
            tasks.push(task_handle);
        }

        let (result, _) = select_ok(tasks).await.map_err(|err| {
            error!("Failed to botstrap to the network: {}", err);
            Error::BootstrapFailure
        })?;

        let (endpoint, connection) = result?;
        Ok((endpoint, connection))
    }

    /// Connect to the given peer and return the `Endpoint` created along with the `Connection`
    /// object if it succeeds, which can then be used to send messages to the connected peer.
    ///
    /// # Example
    ///
    /// ```
    /// use qp2p::{QuicP2p, Config, Error};
    /// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Error> {
    ///
    ///     let mut config = Config::default();
    ///     config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
    ///     let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
    ///     let peer_1 = quic_p2p.new_endpoint()?;
    ///     let peer1_addr = peer_1.our_endpoint()?;
    ///
    ///     let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?;
    ///     Ok(())
    /// }
    /// ```
    pub async fn connect_to(&self, node_addr: &SocketAddr) -> Result<(Endpoint, Connection)> {
        new_connection_to(
            node_addr,
            self.endpoint_cfg.clone(),
            self.client_cfg.clone(),
            self.local_addr,
            self.allow_random_port,
        )
        .await
    }

    /// Create a new `Endpoint`  which can be used to connect to peers and send
    /// messages to them, as well as listen to messages incoming from other peers.
    ///
    /// # Example
    ///
    /// ```
    /// use qp2p::{QuicP2p, Config, Error};
    /// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    /// #[tokio::main]
    /// async fn main() -> Result<(), Error> {
    ///
    ///     let mut config = Config::default();
    ///     config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
    ///     let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
    ///     let endpoint = quic_p2p.new_endpoint()?;
    ///     Ok(())
    /// }
    /// ```
    pub fn new_endpoint(&self) -> Result<Endpoint> {
        trace!("Creating a new enpoint");
        let (quinn_endpoint, quinn_incoming) = bind(
            self.endpoint_cfg.clone(),
            self.local_addr,
            self.allow_random_port,
        )?;

        trace!("Bound endpoint to local address: {}", self.local_addr);

        let endpoint = Endpoint::new(quinn_endpoint, quinn_incoming, self.client_cfg.clone())?;

        Ok(endpoint)
    }
}

// Creates a new Connection
async fn new_connection_to(
    node_addr: &SocketAddr,
    endpoint_cfg: quinn::ServerConfig,
    client_cfg: quinn::ClientConfig,
    local_addr: SocketAddr,
    allow_random_port: bool,
) -> Result<(Endpoint, Connection)> {
    trace!("Attempting to connect to peer: {}", node_addr);

    let (quinn_endpoint, quinn_incoming) = bind(endpoint_cfg, local_addr, allow_random_port)?;

    trace!("Bound connection to local address: {}", local_addr);

    let endpoint = Endpoint::new(quinn_endpoint, quinn_incoming, client_cfg)?;
    let connection = endpoint.connect_to(node_addr).await?;

    Ok((endpoint, connection))
}

// Bind a new socket with a local address
fn bind(
    endpoint_cfg: quinn::ServerConfig,
    local_addr: SocketAddr,
    allow_random_port: bool,
) -> Result<(quinn::Endpoint, quinn::Incoming)> {
    let mut endpoint_builder = quinn::Endpoint::builder();
    // TODO: allow to optionally accept incoming conns, needed for clients
    let _ = endpoint_builder.listen(endpoint_cfg);

    match UdpSocket::bind(&local_addr) {
        Ok(udp) => endpoint_builder.with_socket(udp).map_err(Error::Endpoint),
        Err(err) if allow_random_port => {
            info!(
                "Failed to bind to local address: {} - Error: {}. Trying random port instead.",
                local_addr, err
            );
            let bind_addr = SocketAddr::new(local_addr.ip(), 0);

            endpoint_builder.bind(&bind_addr).map_err(|e| {
                error!("Failed to bind to random port too: {}", e);
                Error::Endpoint(e)
            })
        }
        Err(err) => Err(Error::Configuration(format!(
            "Could not bind to the user supplied port: {}! Error: {}",
            local_addr.port(),
            err
        ))),
    }
}

// Private helpers

// Unwrap the conffig if provided by the user, otherwise construct the default one
#[cfg(not(feature = "upnp"))]
fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
    cfg.map_or(Config::read_or_construct_default(None), Ok)
}

#[cfg(feature = "upnp")]
fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
    let mut cfg = cfg.map_or(Config::read_or_construct_default(None)?, |cfg| cfg);
    if cfg.ip.is_none() {
        cfg.ip = igd::get_local_ip().ok();
    };

    Ok(cfg)
}