nakamoto_net/lib.rs
1//! Peer-to-peer networking core types.
2#![allow(clippy::type_complexity)]
3use std::borrow::Cow;
4use std::hash::Hash;
5use std::sync::Arc;
6use std::{fmt, io, net};
7
8use crossbeam_channel as chan;
9
10pub mod error;
11pub mod event;
12pub mod simulator;
13pub mod time;
14
15pub use event::Publisher;
16pub use time::{LocalDuration, LocalTime};
17
/// Direction of the link to a peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Link {
    /// Inbound connection.
    Inbound,
    /// Outbound connection.
    Outbound,
}

impl Link {
    /// Returns `true` if this is the [`Link::Outbound`] variant.
    pub fn is_outbound(&self) -> bool {
        matches!(self, Self::Outbound)
    }

    /// Returns `true` if this is the [`Link::Inbound`] variant.
    pub fn is_inbound(&self) -> bool {
        matches!(self, Self::Inbound)
    }
}
38
39/// Output of a state transition of the state machine.
40#[derive(Debug)]
41pub enum Io<M, E, D, Id: PeerId = net::SocketAddr> {
42 /// There are some bytes ready to be sent to a peer.
43 Write(Id, M),
44 /// Connect to a peer.
45 Connect(Id),
46 /// Disconnect from a peer.
47 Disconnect(Id, D),
48 /// Ask for a wakeup in a specified amount of time.
49 SetTimer(LocalDuration),
50 /// Emit an event.
51 Event(E),
52}
53
/// Describes why a peer was disconnected.
#[derive(Clone, Debug)]
pub enum Disconnect<T> {
    /// Dialing the remote failed. This error occurs before a connection is
    /// even established. Errors of this kind are usually not transient.
    DialError(Arc<io::Error>),
    /// An underlying established connection failed. Sometimes, reconnecting
    /// after such an error is possible.
    ConnectionError(Arc<io::Error>),
    /// The peer was disconnected for another reason, carried as a `T`.
    StateMachine(T),
}

impl<T> Disconnect<T> {
    /// Returns `true` if this is a [`Disconnect::DialError`].
    pub fn is_dial_err(&self) -> bool {
        matches!(self, Disconnect::DialError(..))
    }

    /// Returns `true` if this is a [`Disconnect::ConnectionError`].
    pub fn is_connection_err(&self) -> bool {
        matches!(self, Disconnect::ConnectionError(..))
    }
}

impl<T> fmt::Display for Disconnect<T>
where
    T: fmt::Display,
{
    /// Writes the wrapped I/O error for dial and connection failures, and the
    /// wrapped reason for [`Disconnect::StateMachine`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Both I/O failure kinds render the wrapped error as-is.
            Disconnect::DialError(io_err) | Disconnect::ConnectionError(io_err) => {
                write!(f, "{}", io_err)
            }
            Disconnect::StateMachine(why) => write!(f, "{}", why),
        }
    }
}
86
/// Identifier of a remote peer.
///
/// A peer id can be created from a [`net::SocketAddr`] and resolved back to one
/// with [`PeerId::to_socket_addr`].
pub trait PeerId: Eq + Ord + Clone + Hash + fmt::Debug + From<net::SocketAddr> {
    /// Returns the socket address this peer id converts into.
    fn to_socket_addr(&self) -> net::SocketAddr;
}

/// Any type that satisfies the [`PeerId`] supertraits and also converts into a
/// [`net::SocketAddr`] is a peer id.
impl<T> PeerId for T
where
    T: Eq + Ord + Clone + Hash + fmt::Debug + Into<net::SocketAddr> + From<net::SocketAddr>,
{
    fn to_socket_addr(&self) -> net::SocketAddr {
        // `Into` consumes its receiver, so the conversion is done on a clone.
        Into::<net::SocketAddr>::into(self.clone())
    }
}
102
103/// A network service.
104///
105/// Network protocols must implement this trait to be drivable by the reactor.
106pub trait Service<Id: PeerId = net::SocketAddr>: StateMachine<Id, Message = [u8]> {
107 /// Commands handled by the service. These commands should originate from an
108 /// external "user" thread. They are passed through the reactor via a channel
109 /// given to [`Reactor::run`]. The reactor calls [`Service::command_received`]
110 /// on the service for each command received.
111 type Command;
112
113 /// An external command has been received.
114 fn command_received(&mut self, cmd: Self::Command);
115}
116
117/// A service state-machine to implement a network protocol's logic.
118///
119/// This trait defines an API for connecting specific protocol domain logic to a
120/// [`Reactor`]. It is parametrized by a peer id, which is shared between the reactor
121/// and state machine.
122///
123/// The state machine emits [`Io`] instructions to the reactor via its [`Iterator`] trait.
124pub trait StateMachine<Id: PeerId = net::SocketAddr>:
125 Iterator<Item = Io<<Self::Message as ToOwned>::Owned, Self::Event, Self::DisconnectReason, Id>>
126{
127 /// Message type sent between peers.
128 type Message: fmt::Debug + ToOwned + ?Sized;
129 /// Events emitted by the state machine.
130 /// These are forwarded by the reactor to the user thread.
131 type Event: fmt::Debug;
132 /// Reason a peer was disconnected, in case the peer was disconnected by the internal
133 /// state-machine logic.
134 type DisconnectReason: fmt::Debug + fmt::Display + Into<Disconnect<Self::DisconnectReason>>;
135
136 /// Initialize the state machine. Called once before any event is sent to the state machine.
137 fn initialize(&mut self, _time: LocalTime) {
138 // "He was alone. He was unheeded, happy and near to the wild heart of life. He was alone
139 // and young and wilful and wildhearted, alone amid a waste of wild air and brackish waters
140 // and the sea-harvest of shells and tangle and veiled grey sunlight and gayclad lightclad
141 // figures of children and girls and voices childish and girlish in the air." -JJ
142 }
143 /// Called by the reactor upon receiving a message from a remote peer.
144 fn message_received(&mut self, addr: &Id, message: Cow<Self::Message>);
145 /// Connection attempt underway.
146 ///
147 /// This is only encountered when an outgoing connection attempt is made,
148 /// and is always called before [`StateMachine::connected`].
149 ///
150 /// For incoming connections, [`StateMachine::connected`] is called directly.
151 fn attempted(&mut self, addr: &Id);
152 /// New connection with a peer.
153 fn connected(&mut self, addr: Id, local_addr: &net::SocketAddr, link: Link);
154 /// Called whenever a remote peer was disconnected, either because of a
155 /// network-related event or due to a local instruction from this state machine,
156 /// using [`Io::Disconnect`].
157 fn disconnected(&mut self, addr: &Id, reason: Disconnect<Self::DisconnectReason>);
158 /// Called by the reactor every time the event loop gets data from the network, or times out.
159 /// Used to update the state machine's internal clock.
160 ///
161 /// "a regular short, sharp sound, especially that made by a clock or watch, typically
162 /// every second."
163 fn tick(&mut self, local_time: LocalTime);
164 /// A timer set with [`Io::SetTimer`] has expired.
165 fn timer_expired(&mut self);
166}
167
/// Handle for waking a reactor's event loop.
///
/// Certain types of reactors use this, for example when a [`Service::Command`]
/// is ready to be processed by the service.
pub trait Waker: Send + Sync + Clone {
    /// Wake the event loop. Call this after sending a command so that the command
    /// is processed in a timely fashion.
    fn wake(&self) -> io::Result<()>;
}
175
176/// Any network reactor that can drive the light-client service.
177pub trait Reactor<Id: PeerId = net::SocketAddr> {
178 /// The type of waker this reactor uses.
179 type Waker: Waker;
180
181 /// Create a new reactor, initializing it with a publisher for service events,
182 /// a channel to receive commands, and a channel to shut it down.
183 fn new(
184 shutdown: chan::Receiver<()>,
185 listening: chan::Sender<net::SocketAddr>,
186 ) -> Result<Self, io::Error>
187 where
188 Self: Sized;
189
190 /// Run the given service with the reactor.
191 ///
192 /// Takes:
193 ///
194 /// * The addresses to listen for connections on.
195 /// * The [`Service`] to run.
196 /// * The [`StateMachine::Event`] publisher to use when the service emits events.
197 /// * The [`Service::Command`] channel on which commands will be received.
198 fn run<S, E>(
199 &mut self,
200 listen_addrs: &[net::SocketAddr],
201 service: S,
202 publisher: E,
203 commands: chan::Receiver<S::Command>,
204 ) -> Result<(), error::Error>
205 where
206 S: Service<Id>,
207 S::DisconnectReason: Into<Disconnect<S::DisconnectReason>>,
208 E: Publisher<S::Event>;
209
210 /// Return a new waker.
211 ///
212 /// The reactor can provide multiple wakers such that multiple user threads may wake
213 /// the event loop.
214 fn waker(&self) -> Self::Waker;
215}