nakamoto-net 0.4.0

Lightweight peer-to-peer networking
Documentation
//! Peer-to-peer networking core types.
#![allow(clippy::type_complexity)]
use std::borrow::Cow;
use std::hash::Hash;
use std::sync::Arc;
use std::{fmt, io, net};

use crossbeam_channel as chan;

pub mod error;
pub mod event;
pub mod simulator;
pub mod time;

pub use event::Publisher;
pub use time::{LocalDuration, LocalTime};

/// Link direction of the peer connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Link {
    /// Inbound conneciton.
    Inbound,
    /// Outbound connection.
    Outbound,
}

impl Link {
    /// Check whether the link is outbound.
    pub fn is_outbound(&self) -> bool {
        *self == Link::Outbound
    }

    /// Check whether the link is inbound.
    pub fn is_inbound(&self) -> bool {
        *self == Link::Inbound
    }
}

/// Output of a state transition of the state machine.
#[derive(Debug)]
pub enum Io<M, E, D, Id: PeerId = net::SocketAddr> {
    /// There are some bytes ready to be sent to a peer.
    Write(Id, M),
    /// Connect to a peer.
    Connect(Id),
    /// Disconnect from a peer.
    Disconnect(Id, D),
    /// Ask for a wakeup in a specified amount of time.
    SetTimer(LocalDuration),
    /// Emit an event.
    Event(E),
}

/// Disconnection event which includes the reason.
#[derive(Debug, Clone)]
pub enum Disconnect<T> {
    /// Error while dialing the remote. This error occures before a connection is
    /// even established. Errors of this kind are usually not transient.
    DialError(Arc<std::io::Error>),
    /// Error with an underlying established connection. Sometimes, reconnecting
    /// after such an error is possible.
    ConnectionError(Arc<std::io::Error>),
    /// Peer was disconnected for another reason.
    StateMachine(T),
}

impl<T> Disconnect<T> {
    pub fn is_dial_err(&self) -> bool {
        matches!(self, Self::DialError(_))
    }

    pub fn is_connection_err(&self) -> bool {
        matches!(self, Self::ConnectionError(_))
    }
}

impl<T: fmt::Display> fmt::Display for Disconnect<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DialError(err) => write!(f, "{}", err),
            Self::ConnectionError(err) => write!(f, "{}", err),
            Self::StateMachine(reason) => write!(f, "{}", reason),
        }
    }
}

/// Remote peer id, which must be convertible into a [`net::SocketAddr`]
pub trait PeerId: Eq + Ord + Clone + Hash + fmt::Debug + From<net::SocketAddr> {
    fn to_socket_addr(&self) -> net::SocketAddr;
}

impl<T> PeerId for T
where
    T: Eq + Ord + Clone + Hash + fmt::Debug,
    T: Into<net::SocketAddr>,
    T: From<net::SocketAddr>,
{
    fn to_socket_addr(&self) -> net::SocketAddr {
        self.clone().into()
    }
}

/// A network service.
///
/// Network protocols must implement this trait to be drivable by the reactor.
pub trait Service<Id: PeerId = net::SocketAddr>: StateMachine<Id, Message = [u8]> {
    /// Commands handled by the service. These commands should originate from an
    /// external "user" thread. They are passed through the reactor via a channel
    /// given to [`Reactor::run`]. The reactor calls [`Service::command_received`]
    /// on the service for each command received.
    type Command;

    /// An external command has been received.
    fn command_received(&mut self, cmd: Self::Command);
}

/// A service state-machine to implement a network protocol's logic.
///
/// This trait defines an API for connecting specific protocol domain logic to a
/// [`Reactor`]. It is parametrized by a peer id, which is shared between the reactor
/// and state machine.
///
/// The state machine emits [`Io`] instructions to the reactor via its [`Iterator`] trait.
pub trait StateMachine<Id: PeerId = net::SocketAddr>:
    Iterator<Item = Io<<Self::Message as ToOwned>::Owned, Self::Event, Self::DisconnectReason, Id>>
{
    /// Message type sent between peers.
    type Message: fmt::Debug + ToOwned + ?Sized;
    /// Events emitted by the state machine.
    /// These are forwarded by the reactor to the user thread.
    type Event: fmt::Debug;
    /// Reason a peer was disconnected, in case the peer was disconnected by the internal
    /// state-machine logic.
    type DisconnectReason: fmt::Debug + fmt::Display + Into<Disconnect<Self::DisconnectReason>>;

    /// Initialize the state machine. Called once before any event is sent to the state machine.
    fn initialize(&mut self, _time: LocalTime) {
        // "He was alone. He was unheeded, happy and near to the wild heart of life. He was alone
        // and young and wilful and wildhearted, alone amid a waste of wild air and brackish waters
        // and the sea-harvest of shells and tangle and veiled grey sunlight and gayclad lightclad
        // figures of children and girls and voices childish and girlish in the air." -JJ
    }
    /// Called by the reactor upon receiving a message from a remote peer.
    fn message_received(&mut self, addr: &Id, message: Cow<Self::Message>);
    /// Connection attempt underway.
    ///
    /// This is only encountered when an outgoing connection attempt is made,
    /// and is always called before [`StateMachine::connected`].
    ///
    /// For incoming connections, [`StateMachine::connected`] is called directly.
    fn attempted(&mut self, addr: &Id);
    /// New connection with a peer.
    fn connected(&mut self, addr: Id, local_addr: &net::SocketAddr, link: Link);
    /// Called whenever a remote peer was disconnected, either because of a
    /// network-related event or due to a local instruction from this state machine,
    /// using [`Io::Disconnect`].
    fn disconnected(&mut self, addr: &Id, reason: Disconnect<Self::DisconnectReason>);
    /// Called by the reactor every time the event loop gets data from the network, or times out.
    /// Used to update the state machine's internal clock.
    ///
    /// "a regular short, sharp sound, especially that made by a clock or watch, typically
    /// every second."
    fn tick(&mut self, local_time: LocalTime);
    /// A timer set with [`Io::SetTimer`] has expired.
    fn timer_expired(&mut self);
}

/// Used by certain types of reactors to wake the event loop, for example when a
/// [`Service::Command`] is ready to be processed by the service.
pub trait Waker: Send + Sync + Clone {
    /// Wake up! Call this after sending a command to make sure the command is processed
    /// in a timely fashion.
    fn wake(&self) -> io::Result<()>;
}

/// Any network reactor that can drive the light-client service.
pub trait Reactor<Id: PeerId = net::SocketAddr> {
    /// The type of waker this reactor uses.
    type Waker: Waker;

    /// Create a new reactor, initializing it with a publisher for service events,
    /// a channel to receive commands, and a channel to shut it down.
    fn new(
        shutdown: chan::Receiver<()>,
        listening: chan::Sender<net::SocketAddr>,
    ) -> Result<Self, io::Error>
    where
        Self: Sized;

    /// Run the given service with the reactor.
    ///
    /// Takes:
    ///
    /// * The addresses to listen for connections on.
    /// * The [`Service`] to run.
    /// * The [`StateMachine::Event`] publisher to use when the service emits events.
    /// * The [`Service::Command`] channel on which commands will be received.
    fn run<S, E>(
        &mut self,
        listen_addrs: &[net::SocketAddr],
        service: S,
        publisher: E,
        commands: chan::Receiver<S::Command>,
    ) -> Result<(), error::Error>
    where
        S: Service<Id>,
        S::DisconnectReason: Into<Disconnect<S::DisconnectReason>>,
        E: Publisher<S::Event>;

    /// Return a new waker.
    ///
    /// The reactor can provide multiple wakers such that multiple user threads may wake
    /// the event loop.
    fn waker(&self) -> Self::Waker;
}