commonware_p2p/lib.rs
1//! Communicate with authenticated peers over encrypted connections.
2//!
3//! # Status
4//!
5//! `commonware-p2p` is **ALPHA** software and is not yet recommended for production use. Developers should
6//! expect breaking changes and occasional instability.
7
8#![doc(
9 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
10 html_favicon_url = "https://commonware.xyz/favicon.ico"
11)]
12
13use bytes::Bytes;
14use commonware_cryptography::PublicKey;
15use commonware_utils::ordered::Set;
16use futures::channel::mpsc;
17use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
18
19pub mod authenticated;
20pub mod simulated;
21pub mod types;
22pub mod utils;
23
24pub use types::{Address, Ingress};
25
26/// Tuple representing a message received from a given public key.
27///
28/// This message is guaranteed to adhere to the configuration of the channel and
29/// will already be decrypted and authenticated.
30pub type Message<P> = (P, Bytes);
31
/// Identifier used to tell communication channels apart.
pub type Channel = u64;
34
35/// Enum indicating the set of recipients to send a message to.
36#[derive(Clone, Debug)]
37pub enum Recipients<P: PublicKey> {
38 All,
39 Some(Vec<P>),
40 One(P),
41}
42
43/// Interface for sending messages to a set of recipients without rate-limiting restrictions.
44pub trait UnlimitedSender: Clone + Send + Sync + 'static {
45 /// Public key type used to identify recipients.
46 type PublicKey: PublicKey;
47
48 /// Error that can occur when sending a message.
49 type Error: Debug + StdError + Send + Sync;
50
51 /// Sends a message to a set of recipients.
52 ///
53 /// # Offline Recipients
54 ///
55 /// If a recipient is offline at the time a message is sent, the message
56 /// will be dropped. It is up to the application to handle retries (if
57 /// necessary).
58 ///
59 /// # Returns
60 ///
61 /// A vector of recipients that the message was sent to, or an error if the
62 /// message could not be sent (e.g., too large).
63 ///
64 /// Note: a successful send does not guarantee that the recipient will
65 /// receive the message.
66 fn send(
67 &mut self,
68 recipients: Recipients<Self::PublicKey>,
69 message: Bytes,
70 priority: bool,
71 ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
72}
73
74/// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
75/// filtering out any that are currently rate-limited.
76pub trait LimitedSender: Clone + Send + Sync + 'static {
77 /// Public key type used to identify recipients.
78 type PublicKey: PublicKey;
79
80 /// The type of [`CheckedSender`] returned after checking recipients.
81 type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
82 where
83 Self: 'a;
84
85 /// Checks which recipients are within their rate limit and returns a
86 /// [`CheckedSender`] for sending to them.
87 ///
88 /// # Rate Limiting
89 ///
90 /// Recipients that exceed their rate limit will be filtered out. The
91 /// returned [`CheckedSender`] will only send to non-limited recipients.
92 ///
93 /// # Returns
94 ///
95 /// A [`CheckedSender`] containing only the recipients that are not
96 /// currently rate-limited, or an error with the earliest instant at which
97 /// all recipients will be available if all are rate-limited.
98 fn check<'a>(
99 &'a mut self,
100 recipients: Recipients<Self::PublicKey>,
101 ) -> impl Future<Output = Result<Self::Checked<'a>, SystemTime>> + Send;
102}
103
104/// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
105pub trait CheckedSender: Send {
106 /// Public key type used to identify [`Recipients`].
107 type PublicKey: PublicKey;
108
109 /// Error that can occur when sending a message.
110 type Error: Debug + StdError + Send + Sync;
111
112 /// Sends a message to the pre-checked recipients.
113 ///
114 /// # Offline Recipients
115 ///
116 /// If a recipient is offline at the time a message is sent, the message
117 /// will be dropped. It is up to the application to handle retries (if
118 /// necessary).
119 ///
120 /// # Returns
121 ///
122 /// A vector of recipients that the message was sent to, or an error if the
123 /// message could not be sent (e.g., too large).
124 ///
125 /// Note: a successful send does not guarantee that the recipient will
126 /// receive the message.
127 fn send(
128 self,
129 message: Bytes,
130 priority: bool,
131 ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
132}
133
134/// Interface for sending messages to a set of recipients.
135pub trait Sender: LimitedSender {
136 /// Sends a message to a set of recipients.
137 ///
138 /// # Offline Recipients
139 ///
140 /// If a recipient is offline at the time a message is sent, the message
141 /// will be dropped. It is up to the application to handle retries (if
142 /// necessary).
143 ///
144 /// # Rate Limiting
145 ///
146 /// Recipients that exceed their rate limit will be skipped. The message is
147 /// still sent to non-limited recipients. Check the returned vector to see
148 /// which peers were sent the message.
149 ///
150 /// # Returns
151 ///
152 /// A vector of recipients that the message was sent to, or an error if the
153 /// message could not be sent (e.g., too large).
154 ///
155 /// Note: a successful send does not guarantee that the recipient will
156 /// receive the message.
157 fn send(
158 &mut self,
159 recipients: Recipients<Self::PublicKey>,
160 message: Bytes,
161 priority: bool,
162 ) -> impl Future<
163 Output = Result<Vec<Self::PublicKey>, <Self::Checked<'_> as CheckedSender>::Error>,
164 > + Send {
165 async move {
166 match self.check(recipients).await {
167 Ok(checked_sender) => checked_sender.send(message, priority).await,
168 Err(_) => Ok(Vec::new()),
169 }
170 }
171 }
172}
173
174// Blanket implementation of `Sender` for all `LimitedSender`s.
175impl<S: LimitedSender> Sender for S {}
176
177/// Interface for receiving messages from arbitrary recipients.
178pub trait Receiver: Debug + Send + 'static {
179 /// Error that can occur when receiving a message.
180 type Error: Debug + StdError + Send + Sync;
181
182 /// Public key type used to identify recipients.
183 type PublicKey: PublicKey;
184
185 /// Receive a message from an arbitrary recipient.
186 fn recv(
187 &mut self,
188 ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
189}
190
191/// Interface for registering new peer sets as well as fetching an ordered list of connected peers, given a set id.
192pub trait Manager: Debug + Clone + Send + 'static {
193 /// Public key type used to identify peers.
194 type PublicKey: PublicKey;
195
196 /// The type for the peer set in registration.
197 type Peers;
198
199 /// Update the peer set.
200 ///
201 /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
202 /// of the consensus engine. It must be monotonically increasing as new peer sets are registered.
203 fn update(&mut self, id: u64, peers: Self::Peers) -> impl Future<Output = ()> + Send;
204
205 /// Fetch the ordered set of peers for a given ID.
206 fn peer_set(&mut self, id: u64) -> impl Future<Output = Option<Set<Self::PublicKey>>> + Send;
207
208 /// Subscribe to notifications when new peer sets are added.
209 ///
210 /// Returns a receiver that will receive the peer set ID whenever a new peer set
211 /// is registered via `update`.
212 #[allow(clippy::type_complexity)]
213 fn subscribe(
214 &mut self,
215 ) -> impl Future<
216 Output = mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)>,
217 > + Send;
218}
219
220/// Interface for blocking other peers.
221pub trait Blocker: Clone + Send + 'static {
222 /// Public key type used to identify peers.
223 type PublicKey: PublicKey;
224
225 /// Block a peer, disconnecting them if currently connected and preventing future connections.
226 fn block(&mut self, peer: Self::PublicKey) -> impl Future<Output = ()> + Send;
227}