peerlink/lib.rs
1//! # Peer-to-peer networking reactor
2//!
3//! Peerlink is a low-level building block for P2P applications. It uses a nonblocking reactor to
4//! accept inbound connections, make outbound connections, do message streaming and reassembly,
5//! track peers and perform other low-level operations. It entirely abstracts away menial
6//! networking plumbing such as managing TCP sockets and reading bytes off the wire. In other
7//! words, it provides the consumer with a simple interface to talking with other nodes in a P2P
8//! network.
9//!
10//! See the included example for usage.
11
12mod message_stream;
13mod reactor;
14
15pub mod connector;
16
17use std::{io, net::SocketAddr, num::NonZeroUsize};
18
19use crate::connector::Target;
20
21pub use message_stream::StreamConfig;
22pub use reactor::{Handle, run, run_with_connector};
23
24#[cfg(feature = "socks")]
25pub use reactor::run_with_socks5_proxy;
26
27#[cfg(not(feature = "async"))]
28pub use crossbeam_channel;
29
30#[cfg(feature = "async")]
31pub use async_channel;
32
33/// Configuration parameters for the reactor.
34#[derive(Debug, Clone)]
35pub struct Config {
36 /// The list of socket addresses where the reactor listens for inbound connections.
37 pub bind_addr: Vec<std::net::SocketAddr>,
38
39 /// Configuration parameters for individual peer connections. This allows the fine tuning of
40 /// internal buffer sizes etc.
41 pub stream_config: StreamConfig,
42
43 /// The size of the shared receive buffer, i.e. the max number of bytes that can be read in one
44 /// receive operation. Setting this too low can cause many reads to happen, whereas too high a
45 /// figure will use up more memory and open up your application to DoS attacks. The default is
46 /// 1 MB.
47 ///
48 /// This figure is capped by [`Message::MAX_SIZE`] since there is no need to ever take in more
49 /// data in one read than the biggest message requires to decode.
50 pub receive_buffer_size: usize,
51
52 /// Whether the reactor should perform backpressure control on the receive side. Setting this
53 /// to `Some(n)` means that the reactor will start blocking on sending events to the consumer
54 /// when the receive channel of size `n` is full and events are not being read. Setting it to
55 /// `None` means that the capacity of the event channel is unbounded and the reactor will send
56 /// events to the consumer as fast as it can, regardless of whether those events are being read
57 /// (at all). The default is no backpressure control (`None`).
58 pub receive_channel_size: Option<NonZeroUsize>,
59}
60
61impl Default for Config {
62 fn default() -> Self {
63 Self {
64 bind_addr: Default::default(),
65 stream_config: Default::default(),
66 receive_buffer_size: 1024 * 1024,
67 receive_channel_size: None,
68 }
69 }
70}
71
72/// A trait that network messages processed by the reactor must implement.
73pub trait Message: std::fmt::Debug + Sized + Send + Sync + 'static {
74 /// The size of the largest expected message. It is important to set this correctly because an
75 /// incorrect value will interfere with inbound backpressure control and the ability to decode
76 /// large messages. This is also crucial for DoS protection (resource exhaustion attacks).
77 const MAX_SIZE: usize;
78
79 /// Encodes a message into a writer. This is an in-memory sink that never panics so there is no
80 /// need to handle the error path.
81 ///
82 /// Returns the number of encoded bytes.
83 fn encode(&self, sink: &mut impl std::io::Write) -> usize;
84
85 /// Provides access to the underlying read buffer. The buffer may contain any number of
86 /// messages, including no messages at all or only a partial message. If there are enough bytes
87 /// available to decode a message, the function must return an `Ok` with the decoded message and
88 /// the number of bytes it consumed.
89 ///
90 /// If there is not enough data to decode a message (i.e. it is available only partially),
91 /// `Err(DecodeError::NotEnoughData)` must be returned. That signals that decoding should be
92 /// retried when more data comes in. If the message cannot be decoded at all, or exceeds size
93 /// limits or otherwise represents junk data, `Err(DecodeError::MalformedMessage)` must be
94 /// returned. Such peers are disconnected as protocol violators.
95 fn decode(buffer: &[u8]) -> Result<(Self, usize), DecodeError>;
96
97 /// If a message has a known size ahead of encoding, that value can be set here. This is useful
98 /// for outbound backpressure control, so that a message is not preemptively encoded and placed
99 /// into the send buffer only to be realized that the size of the send buffer will be exceeding
100 /// its maximum. Getting this wrong can interfere with outbound backpressure control, so if the
101 /// value is not certain, it is better not to override the method.
102 fn size_hint(&self) -> Option<usize> {
103 None
104 }
105}
106
/// Possible reasons why a message could not be decoded at a particular time.
///
/// The type is a plain, field-less enum, so it is cheap to copy and compare. It implements
/// `Debug` so that it can be logged and used with `Result::unwrap`/`Result::expect`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecodeError {
    /// There is not enough data available to reconstruct a message. This does not indicate an
    /// irrecoverable problem, it just means that not enough data has been taken off the wire yet
    /// and that the operation should be retried once more data comes in.
    NotEnoughData,
    /// The message is malformed in some way. Once this is encountered, the peer that sent it
    /// is disconnected.
    MalformedMessage,
}
117
/// Unique peer identifier. These are unique for the lifetime of the process and strictly
/// incrementing for each new connection. Even if the same peer (in terms of socket address)
/// connects multiple times, a new `PeerId` instance will be issued for each connection.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct PeerId(u64);

impl PeerId {
    /// DANGER: allows the user to set peer ids directly. Normally these are assigned by the
    /// reactor and the consumer of the library should not be creating them manually. For
    /// development/testing/debugging purposes only. Use only if you really know what you
    /// are doing.
    pub fn set_raw(value: u64) -> Self {
        Self(value)
    }

    /// Returns the next id in sequence (self + 1).
    ///
    /// # Panics
    ///
    /// Panics if the inner value is already `u64::MAX`. A plain `+ 1` would panic in debug
    /// builds but silently wrap around to `0` in release builds, handing out an id that is
    /// neither unique nor strictly incrementing. The checked addition makes the behavior the
    /// same in every build profile.
    pub fn next(&self) -> Self {
        let successor = self
            .0
            .checked_add(1)
            .expect("peer id space exhausted: cannot advance past u64::MAX");
        Self(successor)
    }

    /// Returns the inner id of the peer id.
    pub fn inner(&self) -> u64 {
        self.0
    }
}

impl std::fmt::Display for PeerId {
    /// Formats the peer id as its inner integer.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate to the integer's own `Display` impl so that formatter flags such as width,
        // fill and alignment (e.g. `{:>6}`) are honored instead of being dropped.
        std::fmt::Display::fmt(&self.0, f)
    }
}
149
150/// Command variants for the reactor to process.
151#[derive(Debug)]
152pub enum Command<M: Message> {
153 /// Connect to a remote host.
154 Connect(Target),
155 /// Disconnect from a peer.
156 Disconnect(PeerId),
157 /// Send a message to a peer.
158 Message(PeerId, M),
159}
160
161impl<M: Message> Command<M> {
162 /// Convenience function that converts a compatible argument into a connect [`Target`].
163 /// Works on types such as:
164 /// - [`SocketAddr`](std::net::SocketAddr)
165 /// - [`SocketAddrV4`](std::net::SocketAddrV4)
166 /// - [`SocketAddrV6`](std::net::SocketAddrV6)
167 /// - [`(Ipv4Addr, u16)`](std::net::Ipv4Addr) -- (address, port)
168 /// - [`(Ipv6Addr, u16)`](std::net::Ipv4Addr) -- (address, port)
169 /// - [`(String, u16)`] -- (domain, port)
170 /// - [`(&str, u16)`] -- (domain, port)
171 pub fn connect(target: impl Into<Target>) -> Self {
172 Self::Connect(target.into())
173 }
174}
175
176// Event variants produced by the reactor.
177#[derive(Debug)]
178pub enum Event<M: Message> {
179 /// The reactor attempted to connect to a remote peer.
180 ConnectedTo {
181 /// The remote host that was connected to. This is in the same format it was specified.
182 target: Target,
183 /// The result of the connection attempt. A peer id is returned if successful.
184 result: io::Result<PeerId>,
185 },
186 /// The reactor received a connection from a remote peer.
187 ConnectedFrom {
188 /// The peer associated with the event.
189 peer: PeerId,
190 /// The address of the remote peer.
191 addr: SocketAddr,
192 /// The address of the local interface that accepted the connection.
193 interface: SocketAddr,
194 },
195 /// A peer disconnected.
196 Disconnected {
197 /// The peer associated with the event.
198 peer: PeerId,
199 /// The reason the peer left.
200 reason: DisconnectReason,
201 },
202 /// A peer produced a message.
203 Message {
204 /// The peer associated with the event.
205 peer: PeerId,
206 /// The message received from the peer.
207 message: M,
208 /// The original wire size of the message before it was decoded.
209 size: usize,
210 },
211 /// No peer exists with the specified id. Sent when an operation was specified using a peer id
212 /// that is not present in the reactor.
213 NoPeer(PeerId),
214 /// The send buffer associated with the peer is full. It means the peer is probably not
215 /// reading data from the wire in a timely manner.
216 SendBufferFull {
217 /// The peer associated with the event.
218 peer: PeerId,
219 /// The message that could not be sent to the peer.
220 message: M,
221 },
222}
223
/// The cause of a client connection being disconnected.
#[derive(Debug)]
pub enum DisconnectReason {
    /// A disconnect was performed because the reactor was asked to do so.
    Requested,
    /// The end of stream was reached because the peer left.
    Left,
    /// The peer broke the protocol in some way, usually by sending a malformed message.
    CodecViolation,
    /// The write side went stale: the peer is not reading the data being sent to it.
    WriteStale,
    /// The connection failed with an IO error.
    Error(io::Error),
}