peerlink/
lib.rs

1//! # Peer-to-peer networking reactor
2//!
3//! Peerlink is a low-level building block for P2P applications. It uses a nonblocking reactor to
4//! accept inbound connections, make outbound connections, do message streaming and reassembly,
5//! track peers and perform other low-level operations. It entirely abstracts away menial
6//! networking plumbing such as managing TCP sockets and reading bytes off the wire. In other
7//! words, it provides the consumer with a simple interface to talking with other nodes in a P2P
8//! network.
9//!
10//! See the included example for usage.
11
12mod message_stream;
13mod reactor;
14
15pub mod connector;
16
17use std::{io, net::SocketAddr, num::NonZeroUsize};
18
19use crate::connector::Target;
20
21pub use message_stream::StreamConfig;
22pub use reactor::{Handle, run, run_with_connector};
23
24#[cfg(feature = "socks")]
25pub use reactor::run_with_socks5_proxy;
26
27#[cfg(not(feature = "async"))]
28pub use crossbeam_channel;
29
30#[cfg(feature = "async")]
31pub use async_channel;
32
33/// Configuration parameters for the reactor.
34#[derive(Debug, Clone)]
35pub struct Config {
36    /// The list of socket addresses where the reactor listens for inbound connections.
37    pub bind_addr: Vec<std::net::SocketAddr>,
38
39    /// Configuration parameters for individual peer connections. This allows the fine tuning of
40    /// internal buffer sizes etc.
41    pub stream_config: StreamConfig,
42
43    /// The size of the shared receive buffer, i.e. the max number of bytes that can be read in one
44    /// receive operation. Setting this too low can cause many reads to happen, whereas too high a
45    /// figure will use up more memory and open up your application to DoS attacks. The default is
46    /// 1 MB.
47    ///
48    /// This figure is capped by [`Message::MAX_SIZE`] since there is no need to ever take in more
49    /// data in one read than the biggest message requires to decode.
50    pub receive_buffer_size: usize,
51
52    /// Whether the reactor should perform backpressure control on the receive side. Setting this
53    /// to `Some(n)` means that the reactor will start blocking on sending events to the consumer
54    /// when the receive channel of size `n` is full and events are not being read. Setting it to
55    /// `None` means that the capacity of the event channel is unbounded and the reactor will send
56    /// events to the consumer as fast as it can, regardless of whether those events are being read
57    /// (at all). The default is no backpressure control (`None`).
58    pub receive_channel_size: Option<NonZeroUsize>,
59}
60
61impl Default for Config {
62    fn default() -> Self {
63        Self {
64            bind_addr: Default::default(),
65            stream_config: Default::default(),
66            receive_buffer_size: 1024 * 1024,
67            receive_channel_size: None,
68        }
69    }
70}
71
72/// A trait that network messages processed by the reactor must implement.
73pub trait Message: std::fmt::Debug + Sized + Send + Sync + 'static {
74    /// The size of the largest expected message. It is important to set this correctly because an
75    /// incorrect value will interfere with inbound backpressure control and the ability to decode
76    /// large messages. This is also crucial for DoS protection (resource exhaustion attacks).
77    const MAX_SIZE: usize;
78
79    /// Encodes a message into a writer. This is an in-memory sink that never panics so there is no
80    /// need to handle the error path.
81    ///
82    /// Returns the number of encoded bytes.
83    fn encode(&self, sink: &mut impl std::io::Write) -> usize;
84
85    /// Provides access to the underlying read buffer. The buffer may contain any number of
86    /// messages, including no messages at all or only a partial message. If there are enough bytes
87    /// available to decode a message, the function must return an `Ok` with the decoded message and
88    /// the number of bytes it consumed.
89    ///
90    /// If there is not enough data to decode a message (i.e. it is available only partially),
91    /// `Err(DecodeError::NotEnoughData)` must be returned. That signals that decoding should be
92    /// retried when more data comes in. If the message cannot be decoded at all, or exceeds size
93    /// limits or otherwise represents junk data, `Err(DecodeError::MalformedMessage)` must be
94    /// returned. Such peers are disconnected as protocol violators.
95    fn decode(buffer: &[u8]) -> Result<(Self, usize), DecodeError>;
96
97    /// If a message has a known size ahead of encoding, that value can be set here. This is useful
98    /// for outbound backpressure control, so that a message is not preemptively encoded and placed
99    /// into the send buffer only to be realized that the size of the send buffer will be exceeding
100    /// its maximum. Getting this wrong can interfere with outbound backpressure control, so if the
101    /// value is not certain, it is better not to override the method.
102    fn size_hint(&self) -> Option<usize> {
103        None
104    }
105}
106
107/// Possible reasons why a message could not be decoded at a particular time.
108pub enum DecodeError {
109    /// There is not enough data available to reconstruct a message. This does not indicate an
110    /// irrecoverable problem, it just means that not enough data has been taken of the wire yet
111    /// and that the operation should be retried once more data comes in.
112    NotEnoughData,
113    /// The message is malformed in some way. Once this is encountered, the peer that sent it
114    /// is disconnected.
115    MalformedMessage,
116}
117
118/// Unique peer identifier. These are unique for the lifetime of the process and strictly
119/// incrementing for each new connection. Even if the same peer (in terms of socket address)
120/// connects multiple times, a new `PeerId` instance will be issued for each connection.
121#[derive(Debug, Clone, Hash, Copy, PartialEq, Eq, PartialOrd, Ord)]
122pub struct PeerId(u64);
123
124impl PeerId {
125    /// DANGER: allows the user to set peer ids directly. Normally these are assigned by the
126    /// reactor and the consumer of the library should not be creating them manually. For
127    /// development/testing/debugging purposes only. Use only if you really know what you
128    /// are doing.
129    pub fn set_raw(value: u64) -> Self {
130        Self(value)
131    }
132
133    /// Returns the next id in sequence (self + 1).
134    pub fn next(&self) -> Self {
135        Self(self.0 + 1)
136    }
137
138    /// Returns the inner id of the peer id.
139    pub fn inner(&self) -> u64 {
140        self.0
141    }
142}
143
144impl std::fmt::Display for PeerId {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        write!(f, "{}", self.0)
147    }
148}
149
150/// Command variants for the reactor to process.
151#[derive(Debug)]
152pub enum Command<M: Message> {
153    /// Connect to a remote host.
154    Connect(Target),
155    /// Disconnect from a peer.
156    Disconnect(PeerId),
157    /// Send a message to a peer.
158    Message(PeerId, M),
159}
160
161impl<M: Message> Command<M> {
162    /// Convenience function that converts a compatible argument into a connect [`Target`].
163    /// Works on types such as:
164    ///   - [`SocketAddr`](std::net::SocketAddr)
165    ///   - [`SocketAddrV4`](std::net::SocketAddrV4)
166    ///   - [`SocketAddrV6`](std::net::SocketAddrV6)
167    ///   - [`(Ipv4Addr, u16)`](std::net::Ipv4Addr) -- (address, port)
168    ///   - [`(Ipv6Addr, u16)`](std::net::Ipv4Addr) -- (address, port)
169    ///   - [`(String, u16)`] -- (domain, port)
170    ///   - [`(&str, u16)`] -- (domain, port)
171    pub fn connect(target: impl Into<Target>) -> Self {
172        Self::Connect(target.into())
173    }
174}
175
176// Event variants produced by the reactor.
177#[derive(Debug)]
178pub enum Event<M: Message> {
179    /// The reactor attempted to connect to a remote peer.
180    ConnectedTo {
181        /// The remote host that was connected to. This is in the same format it was specified.
182        target: Target,
183        /// The result of the connection attempt. A peer id is returned if successful.
184        result: io::Result<PeerId>,
185    },
186    /// The reactor received a connection from a remote peer.
187    ConnectedFrom {
188        /// The peer associated with the event.
189        peer: PeerId,
190        /// The address of the remote peer.
191        addr: SocketAddr,
192        /// The address of the local interface that accepted the connection.
193        interface: SocketAddr,
194    },
195    /// A peer disconnected.
196    Disconnected {
197        /// The peer associated with the event.
198        peer: PeerId,
199        /// The reason the peer left.
200        reason: DisconnectReason,
201    },
202    /// A peer produced a message.
203    Message {
204        /// The peer associated with the event.
205        peer: PeerId,
206        /// The message received from the peer.
207        message: M,
208        /// The original wire size of the message before it was decoded.
209        size: usize,
210    },
211    /// No peer exists with the specified id. Sent when an operation was specified using a peer id
212    /// that is not present in the reactor.
213    NoPeer(PeerId),
214    /// The send buffer associated with the peer is full. It means the peer is probably not
215    /// reading data from the wire in a timely manner.
216    SendBufferFull {
217        /// The peer associated with the event.
218        peer: PeerId,
219        /// The message that could not be sent to the peer.
220        message: M,
221    },
222}
223
224/// Explains why a client connection was disconnected.
225#[derive(Debug)]
226pub enum DisconnectReason {
227    /// The reactor was asked to perform a disconnect.
228    Requested,
229    /// The peer left and the end of stream was reached.
230    Left,
231    /// The peer violated the protocol in some way, usually by sending a malformed message.
232    CodecViolation,
233    /// The write side is stale, i.e. the peer is not reading the data we are sending.
234    WriteStale,
235    /// An IO error occurred.
236    Error(io::Error),
237}