nakamoto_net/
lib.rs

1//! Peer-to-peer networking core types.
2#![allow(clippy::type_complexity)]
3use std::borrow::Cow;
4use std::hash::Hash;
5use std::sync::Arc;
6use std::{fmt, io, net};
7
8use crossbeam_channel as chan;
9
10pub mod error;
11pub mod event;
12pub mod simulator;
13pub mod time;
14
15pub use event::Publisher;
16pub use time::{LocalDuration, LocalTime};
17
18/// Link direction of the peer connection.
19#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub enum Link {
21    /// Inbound conneciton.
22    Inbound,
23    /// Outbound connection.
24    Outbound,
25}
26
27impl Link {
28    /// Check whether the link is outbound.
29    pub fn is_outbound(&self) -> bool {
30        *self == Link::Outbound
31    }
32
33    /// Check whether the link is inbound.
34    pub fn is_inbound(&self) -> bool {
35        *self == Link::Inbound
36    }
37}
38
39/// Output of a state transition of the state machine.
40#[derive(Debug)]
41pub enum Io<M, E, D, Id: PeerId = net::SocketAddr> {
42    /// There are some bytes ready to be sent to a peer.
43    Write(Id, M),
44    /// Connect to a peer.
45    Connect(Id),
46    /// Disconnect from a peer.
47    Disconnect(Id, D),
48    /// Ask for a wakeup in a specified amount of time.
49    SetTimer(LocalDuration),
50    /// Emit an event.
51    Event(E),
52}
53
54/// Disconnection event which includes the reason.
55#[derive(Debug, Clone)]
56pub enum Disconnect<T> {
57    /// Error while dialing the remote. This error occures before a connection is
58    /// even established. Errors of this kind are usually not transient.
59    DialError(Arc<std::io::Error>),
60    /// Error with an underlying established connection. Sometimes, reconnecting
61    /// after such an error is possible.
62    ConnectionError(Arc<std::io::Error>),
63    /// Peer was disconnected for another reason.
64    StateMachine(T),
65}
66
67impl<T> Disconnect<T> {
68    pub fn is_dial_err(&self) -> bool {
69        matches!(self, Self::DialError(_))
70    }
71
72    pub fn is_connection_err(&self) -> bool {
73        matches!(self, Self::ConnectionError(_))
74    }
75}
76
77impl<T: fmt::Display> fmt::Display for Disconnect<T> {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        match self {
80            Self::DialError(err) => write!(f, "{}", err),
81            Self::ConnectionError(err) => write!(f, "{}", err),
82            Self::StateMachine(reason) => write!(f, "{}", reason),
83        }
84    }
85}
86
87/// Remote peer id, which must be convertible into a [`net::SocketAddr`]
88pub trait PeerId: Eq + Ord + Clone + Hash + fmt::Debug + From<net::SocketAddr> {
89    fn to_socket_addr(&self) -> net::SocketAddr;
90}
91
92impl<T> PeerId for T
93where
94    T: Eq + Ord + Clone + Hash + fmt::Debug,
95    T: Into<net::SocketAddr>,
96    T: From<net::SocketAddr>,
97{
98    fn to_socket_addr(&self) -> net::SocketAddr {
99        self.clone().into()
100    }
101}
102
103/// A network service.
104///
105/// Network protocols must implement this trait to be drivable by the reactor.
106pub trait Service<Id: PeerId = net::SocketAddr>: StateMachine<Id, Message = [u8]> {
107    /// Commands handled by the service. These commands should originate from an
108    /// external "user" thread. They are passed through the reactor via a channel
109    /// given to [`Reactor::run`]. The reactor calls [`Service::command_received`]
110    /// on the service for each command received.
111    type Command;
112
113    /// An external command has been received.
114    fn command_received(&mut self, cmd: Self::Command);
115}
116
117/// A service state-machine to implement a network protocol's logic.
118///
119/// This trait defines an API for connecting specific protocol domain logic to a
120/// [`Reactor`]. It is parametrized by a peer id, which is shared between the reactor
121/// and state machine.
122///
123/// The state machine emits [`Io`] instructions to the reactor via its [`Iterator`] trait.
124pub trait StateMachine<Id: PeerId = net::SocketAddr>:
125    Iterator<Item = Io<<Self::Message as ToOwned>::Owned, Self::Event, Self::DisconnectReason, Id>>
126{
127    /// Message type sent between peers.
128    type Message: fmt::Debug + ToOwned + ?Sized;
129    /// Events emitted by the state machine.
130    /// These are forwarded by the reactor to the user thread.
131    type Event: fmt::Debug;
132    /// Reason a peer was disconnected, in case the peer was disconnected by the internal
133    /// state-machine logic.
134    type DisconnectReason: fmt::Debug + fmt::Display + Into<Disconnect<Self::DisconnectReason>>;
135
136    /// Initialize the state machine. Called once before any event is sent to the state machine.
137    fn initialize(&mut self, _time: LocalTime) {
138        // "He was alone. He was unheeded, happy and near to the wild heart of life. He was alone
139        // and young and wilful and wildhearted, alone amid a waste of wild air and brackish waters
140        // and the sea-harvest of shells and tangle and veiled grey sunlight and gayclad lightclad
141        // figures of children and girls and voices childish and girlish in the air." -JJ
142    }
143    /// Called by the reactor upon receiving a message from a remote peer.
144    fn message_received(&mut self, addr: &Id, message: Cow<Self::Message>);
145    /// Connection attempt underway.
146    ///
147    /// This is only encountered when an outgoing connection attempt is made,
148    /// and is always called before [`StateMachine::connected`].
149    ///
150    /// For incoming connections, [`StateMachine::connected`] is called directly.
151    fn attempted(&mut self, addr: &Id);
152    /// New connection with a peer.
153    fn connected(&mut self, addr: Id, local_addr: &net::SocketAddr, link: Link);
154    /// Called whenever a remote peer was disconnected, either because of a
155    /// network-related event or due to a local instruction from this state machine,
156    /// using [`Io::Disconnect`].
157    fn disconnected(&mut self, addr: &Id, reason: Disconnect<Self::DisconnectReason>);
158    /// Called by the reactor every time the event loop gets data from the network, or times out.
159    /// Used to update the state machine's internal clock.
160    ///
161    /// "a regular short, sharp sound, especially that made by a clock or watch, typically
162    /// every second."
163    fn tick(&mut self, local_time: LocalTime);
164    /// A timer set with [`Io::SetTimer`] has expired.
165    fn timer_expired(&mut self);
166}
167
168/// Used by certain types of reactors to wake the event loop, for example when a
169/// [`Service::Command`] is ready to be processed by the service.
170pub trait Waker: Send + Sync + Clone {
171    /// Wake up! Call this after sending a command to make sure the command is processed
172    /// in a timely fashion.
173    fn wake(&self) -> io::Result<()>;
174}
175
176/// Any network reactor that can drive the light-client service.
177pub trait Reactor<Id: PeerId = net::SocketAddr> {
178    /// The type of waker this reactor uses.
179    type Waker: Waker;
180
181    /// Create a new reactor, initializing it with a publisher for service events,
182    /// a channel to receive commands, and a channel to shut it down.
183    fn new(
184        shutdown: chan::Receiver<()>,
185        listening: chan::Sender<net::SocketAddr>,
186    ) -> Result<Self, io::Error>
187    where
188        Self: Sized;
189
190    /// Run the given service with the reactor.
191    ///
192    /// Takes:
193    ///
194    /// * The addresses to listen for connections on.
195    /// * The [`Service`] to run.
196    /// * The [`StateMachine::Event`] publisher to use when the service emits events.
197    /// * The [`Service::Command`] channel on which commands will be received.
198    fn run<S, E>(
199        &mut self,
200        listen_addrs: &[net::SocketAddr],
201        service: S,
202        publisher: E,
203        commands: chan::Receiver<S::Command>,
204    ) -> Result<(), error::Error>
205    where
206        S: Service<Id>,
207        S::DisconnectReason: Into<Disconnect<S::DisconnectReason>>,
208        E: Publisher<S::Event>;
209
210    /// Return a new waker.
211    ///
212    /// The reactor can provide multiple wakers such that multiple user threads may wake
213    /// the event loop.
214    fn waker(&self) -> Self::Waker;
215}