1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
//! Peer-to-peer networking core types.
#![allow(clippy::type_complexity)]
use std::borrow::Cow;
use std::hash::Hash;
use std::sync::Arc;
use std::{fmt, io, net};
use crossbeam_channel as chan;
pub mod error;
pub mod event;
pub mod simulator;
pub mod time;
pub use event::Publisher;
pub use time::{LocalDuration, LocalTime};
/// Link direction of the peer connection.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Link {
/// Inbound conneciton.
Inbound,
/// Outbound connection.
Outbound,
}
impl Link {
/// Check whether the link is outbound.
pub fn is_outbound(&self) -> bool {
*self == Link::Outbound
}
/// Check whether the link is inbound.
pub fn is_inbound(&self) -> bool {
*self == Link::Inbound
}
}
/// Output of a state transition of the state machine.
#[derive(Debug)]
pub enum Io<M, E, D, Id: PeerId = net::SocketAddr> {
/// There are some bytes ready to be sent to a peer.
Write(Id, M),
/// Connect to a peer.
Connect(Id),
/// Disconnect from a peer.
Disconnect(Id, D),
/// Ask for a wakeup in a specified amount of time.
SetTimer(LocalDuration),
/// Emit an event.
Event(E),
}
/// Disconnection event which includes the reason.
#[derive(Debug, Clone)]
pub enum Disconnect<T> {
/// Error while dialing the remote. This error occures before a connection is
/// even established. Errors of this kind are usually not transient.
DialError(Arc<std::io::Error>),
/// Error with an underlying established connection. Sometimes, reconnecting
/// after such an error is possible.
ConnectionError(Arc<std::io::Error>),
/// Peer was disconnected for another reason.
StateMachine(T),
}
impl<T> Disconnect<T> {
pub fn is_dial_err(&self) -> bool {
matches!(self, Self::DialError(_))
}
pub fn is_connection_err(&self) -> bool {
matches!(self, Self::ConnectionError(_))
}
}
impl<T: fmt::Display> fmt::Display for Disconnect<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::DialError(err) => write!(f, "{}", err),
Self::ConnectionError(err) => write!(f, "{}", err),
Self::StateMachine(reason) => write!(f, "{}", reason),
}
}
}
/// Remote peer id, which must be convertible into a [`net::SocketAddr`]
pub trait PeerId: Eq + Ord + Clone + Hash + fmt::Debug + From<net::SocketAddr> {
fn to_socket_addr(&self) -> net::SocketAddr;
}
impl<T> PeerId for T
where
T: Eq + Ord + Clone + Hash + fmt::Debug,
T: Into<net::SocketAddr>,
T: From<net::SocketAddr>,
{
fn to_socket_addr(&self) -> net::SocketAddr {
self.clone().into()
}
}
/// A network service.
///
/// Network protocols must implement this trait to be drivable by the reactor.
pub trait Service<Id: PeerId = net::SocketAddr>: StateMachine<Id, Message = [u8]> {
/// Commands handled by the service. These commands should originate from an
/// external "user" thread. They are passed through the reactor via a channel
/// given to [`Reactor::run`]. The reactor calls [`Service::command_received`]
/// on the service for each command received.
type Command;
/// An external command has been received.
fn command_received(&mut self, cmd: Self::Command);
}
/// A service state-machine to implement a network protocol's logic.
///
/// This trait defines an API for connecting specific protocol domain logic to a
/// [`Reactor`]. It is parametrized by a peer id, which is shared between the reactor
/// and state machine.
///
/// The state machine emits [`Io`] instructions to the reactor via its [`Iterator`] trait.
pub trait StateMachine<Id: PeerId = net::SocketAddr>:
Iterator<Item = Io<<Self::Message as ToOwned>::Owned, Self::Event, Self::DisconnectReason, Id>>
{
/// Message type sent between peers.
type Message: fmt::Debug + ToOwned + ?Sized;
/// Events emitted by the state machine.
/// These are forwarded by the reactor to the user thread.
type Event: fmt::Debug;
/// Reason a peer was disconnected, in case the peer was disconnected by the internal
/// state-machine logic.
type DisconnectReason: fmt::Debug + fmt::Display + Into<Disconnect<Self::DisconnectReason>>;
/// Initialize the state machine. Called once before any event is sent to the state machine.
fn initialize(&mut self, _time: LocalTime) {
// "He was alone. He was unheeded, happy and near to the wild heart of life. He was alone
// and young and wilful and wildhearted, alone amid a waste of wild air and brackish waters
// and the sea-harvest of shells and tangle and veiled grey sunlight and gayclad lightclad
// figures of children and girls and voices childish and girlish in the air." -JJ
}
/// Called by the reactor upon receiving a message from a remote peer.
fn message_received(&mut self, addr: &Id, message: Cow<Self::Message>);
/// Connection attempt underway.
///
/// This is only encountered when an outgoing connection attempt is made,
/// and is always called before [`StateMachine::connected`].
///
/// For incoming connections, [`StateMachine::connected`] is called directly.
fn attempted(&mut self, addr: &Id);
/// New connection with a peer.
fn connected(&mut self, addr: Id, local_addr: &net::SocketAddr, link: Link);
/// Called whenever a remote peer was disconnected, either because of a
/// network-related event or due to a local instruction from this state machine,
/// using [`Io::Disconnect`].
fn disconnected(&mut self, addr: &Id, reason: Disconnect<Self::DisconnectReason>);
/// Called by the reactor every time the event loop gets data from the network, or times out.
/// Used to update the state machine's internal clock.
///
/// "a regular short, sharp sound, especially that made by a clock or watch, typically
/// every second."
fn tick(&mut self, local_time: LocalTime);
/// A timer set with [`Io::SetTimer`] has expired.
fn timer_expired(&mut self);
}
/// Used by certain types of reactors to wake the event loop, for example when a
/// [`Service::Command`] is ready to be processed by the service.
pub trait Waker: Send + Sync + Clone {
/// Wake up! Call this after sending a command to make sure the command is processed
/// in a timely fashion.
fn wake(&self) -> io::Result<()>;
}
/// Any network reactor that can drive the light-client service.
pub trait Reactor<Id: PeerId = net::SocketAddr> {
/// The type of waker this reactor uses.
type Waker: Waker;
/// Create a new reactor, initializing it with a publisher for service events,
/// a channel to receive commands, and a channel to shut it down.
fn new(
shutdown: chan::Receiver<()>,
listening: chan::Sender<net::SocketAddr>,
) -> Result<Self, io::Error>
where
Self: Sized;
/// Run the given service with the reactor.
///
/// Takes:
///
/// * The addresses to listen for connections on.
/// * The [`Service`] to run.
/// * The [`StateMachine::Event`] publisher to use when the service emits events.
/// * The [`Service::Command`] channel on which commands will be received.
fn run<S, E>(
&mut self,
listen_addrs: &[net::SocketAddr],
service: S,
publisher: E,
commands: chan::Receiver<S::Command>,
) -> Result<(), error::Error>
where
S: Service<Id>,
S::DisconnectReason: Into<Disconnect<S::DisconnectReason>>,
E: Publisher<S::Event>;
/// Return a new waker.
///
/// The reactor can provide multiple wakers such that multiple user threads may wake
/// the event loop.
fn waker(&self) -> Self::Waker;
}