pallas_network2/lib.rs
1//! A new take on the Ouroboros networking stack that prioritises P2P
2//! operation over the *client / server* shape used by `pallas-network`.
3//!
4//! The public API is split between an [`Interface`] (where IO happens) and a
5//! [`Behavior`] (the business logic), reconciled by a [`Manager`] — a layout
6//! inspired by libp2p's swarm.
7//!
8//! Once this crate is thoroughly tested and adopted by downstream clients,
9//! `network2` is intended to replace the original `pallas-network`.
10//!
11//! # Usage
12//!
13//! A typical setup pairs a transport (an [`Interface`] impl, e.g. the
14//! TCP-backed [`interface::TcpInterface`]) with a protocol (a [`Behavior`]
15//! impl, e.g. the node-to-node [`behavior::InitiatorBehavior`]) and drives
16//! both through a [`Manager`]. The manager polls the interface for IO
17//! events, hands them to the behavior, and pushes any commands the
18//! behavior emits back at the interface — leaving you to consume the
19//! behavior's external events.
20//!
21//! ```ignore
22//! use pallas_network2::{
23//! behavior::{AnyMessage, InitiatorBehavior, InitiatorCommand, InitiatorEvent},
24//! interface::TcpInterface,
25//! Manager, PeerId,
26//! };
27//!
28//! let interface = TcpInterface::<AnyMessage>::new();
29//! let behavior = InitiatorBehavior::default();
30//!
31//! let mut manager = Manager::new(interface, behavior);
32//!
33//! manager.execute(InitiatorCommand::IncludePeer(
34//! "relays-new.cardano-mainnet.iohk.io:3001".parse::<PeerId>().unwrap(),
35//! ));
36//!
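//! loop {
//!     // `poll_next` resolves to `None` whenever a step only moved IO forward
//!     // (a command was dispatched or an IO event was handled), so keep
//!     // polling instead of stopping at the first `None`.
//!     let Some(event) = manager.poll_next().await else {
//!         continue;
//!     };
//!
//!     match event {
//!         InitiatorEvent::PeerInitialized(pid, _) => println!("up: {pid}"),
//!         InitiatorEvent::BlockHeaderReceived(pid, header, _) => {
//!             println!("hdr from {pid}: {} bytes", header.cbor.len());
//!         }
//!         _ => {}
//!     }
//! }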
46//! ```
47//!
48//! # Overview
49//!
50//! - [`Manager`] — drives a paired [`Interface`] + [`Behavior`].
51//! [`Manager::poll_next`] advances IO and the behavior;
52//! [`Manager::execute`] forwards an external command to the behavior.
53//! - [`Interface`] trait — the IO side. Receives [`InterfaceCommand`]
54//! (`Connect` / `Send` / `Disconnect`) and yields [`InterfaceEvent`]
55//! (`Connected` / `Disconnected` / `Sent` / `Recv` / `Error` / `Idle`).
56//! - [`Behavior`] trait — the protocol logic. Defines its own `Event`,
57//! `Command`, `PeerState`, and `Message`, and emits [`BehaviorOutput`]s.
58//! - [`Message`] trait — describes a mini-protocol message (channel id +
59//! payload encoding).
60//! - [`OutboundQueue`] — convenience queue of pending [`BehaviorOutput`]s
61//! ready to be polled by the manager.
62//! - [`PeerId`], [`Channel`], [`Payload`], [`MAX_SEGMENT_PAYLOAD_LENGTH`] —
63//! the primitive vocabulary.
64//!
65//! ## Modules
66//!
67//! - [`bearer`] — low-level transport for reading and writing multiplexed
68//! segments.
69//! - [`interface`] — [`Interface`] implementations for TCP connections.
70//! - [`behavior`] — opinionated [`Behavior`] implementations for Cardano
71//! stacks.
72//! - [`protocol`] — the Ouroboros mini-protocol definitions (handshake,
73//! chainsync, blockfetch, …).
74//!
75//! # Feature flags
76//!
77//! - `emulation` — enables the `emulation` module, an in-memory test
78//! harness for exercising behaviors without real network IO.
79//!
80//! # Usage as part of `pallas`
81//!
82//! When depending on the umbrella [`pallas`] crate (with the `network2`
83//! feature), this crate is re-exported as `pallas::network2`.
84//!
85//! [`pallas`]: https://crates.io/crates/pallas
86
87use std::{fmt::Debug, pin::Pin};
88
89use futures::{
90 Stream, StreamExt, select,
91 stream::{FusedStream, FuturesUnordered},
92};
93
94#[cfg(feature = "emulation")]
95pub mod emulation;
96
97/// Low-level transport layer for reading and writing multiplexed segments.
98pub mod bearer;
99/// Opinionated behavior implementations for Cardano network stacks.
100pub mod behavior;
101/// Network interface implementations for TCP connections.
102pub mod interface;
103/// Ouroboros mini-protocol definitions (handshake, chainsync, blockfetch, etc.).
104pub mod protocol;
105
/// A unique identifier for a peer in the network.
///
/// A peer is identified by the `host` / `port` pair it is reachable at. The
/// text produced by the `Display` impl (`host:port`) is the same shape that
/// the `FromStr` impl accepts.
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub struct PeerId {
    /// The hostname or IP address of the peer.
    pub host: String,
    /// The TCP port of the peer.
    pub port: u16,
}

impl std::fmt::Display for PeerId {
    /// Writes the peer as `host:port`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}:{}", self.host, self.port)
    }
}

impl std::str::FromStr for PeerId {
    type Err = String;

    /// Parses a peer id from its `host:port` textual form.
    ///
    /// The input is split at the first `:`; everything before it becomes the
    /// host and everything after it must be a valid `u16` port.
    ///
    /// # Errors
    ///
    /// Returns a human-readable description when the input has no `:`
    /// separator or when the port is not a valid `u16`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let (host, port) = s.split_once(':').ok_or("invalid peer id")?;

        // The port comes from caller-supplied text, so a malformed value has
        // to surface as an error rather than a panic.
        let port = port
            .parse::<u16>()
            .map_err(|err| format!("invalid peer port `{port}`: {err}"))?;

        Ok(PeerId {
            host: host.to_string(),
            port,
        })
    }
}
132
/// A failure reported by the network interface.
#[derive(Debug)]
pub enum InterfaceError {
    // TODO: add more specific errors
    /// Catch-all failure carrying a human-readable description.
    Other(String),
}
140
/// A multiplexer channel identifier for a mini-protocol.
pub type Channel = u16;

/// Raw bytes of a mini-protocol message payload.
pub type Payload = Vec<u8>;

/// Maximum number of payload bytes a single segment may carry, as fixed by
/// the protocol.
pub const MAX_SEGMENT_PAYLOAD_LENGTH: usize = 65535;

/// Describes a message that can be sent over the network.
pub trait Message: Send + 'static + std::fmt::Debug + Sized + Clone {
    /// Returns the channel identifier for this message's mini-protocol.
    fn channel(&self) -> Channel;

    /// Encodes this message into its raw payload bytes.
    fn payload(&self) -> Payload;

    /// Try to decode a message from a payload.
    ///
    /// Implementations should decode on a best-effort basis and must account
    /// for the payload being partial: in that case they should return `None`
    /// and wait for a later call made with more data.
    ///
    /// Whatever part of the payload is successfully consumed while parsing
    /// should be drained from `payload`, leaving the remaining bytes in place
    /// for the next attempt.
    fn from_payload(channel: Channel, payload: &mut Payload) -> Option<Self>;

    /// Converts this message into its channel and raw payload bytes.
    fn into_payload(self) -> (Channel, Payload);

    /// Converts this message into its channel and a list of payload chunks,
    /// each at most [`MAX_SEGMENT_PAYLOAD_LENGTH`] bytes long.
    ///
    /// Chunks preserve the payload's byte order, so concatenating them gives
    /// back the original payload. An empty payload produces no chunks at all.
    fn into_chunks(self) -> (Channel, Vec<Payload>) {
        let (channel, payload) = self.into_payload();

        let chunks = match payload.len() {
            // Nothing to send: same result as chunking an empty slice.
            0 => Vec::new(),
            // Already fits in one segment: reuse the buffer instead of
            // copying it into a fresh one.
            len if len <= MAX_SEGMENT_PAYLOAD_LENGTH => vec![payload],
            _ => payload
                .chunks(MAX_SEGMENT_PAYLOAD_LENGTH)
                .map(<[u8]>::to_vec)
                .collect(),
        };

        (channel, chunks)
    }
}
185
186/// A low-level command to interact with the network interface
187#[derive(Debug)]
188pub enum InterfaceCommand<M: Message> {
189 /// Initiate a connection to the given peer.
190 Connect(PeerId),
191 /// Send a message to an already-connected peer.
192 Send(PeerId, M),
193 /// Disconnect from the given peer.
194 Disconnect(PeerId),
195}
196
197/// A low-level event from the network interface
198#[derive(Debug)]
199pub enum InterfaceEvent<M: Message> {
200 /// A connection to the peer was successfully established.
201 Connected(PeerId),
202 /// The peer has been disconnected.
203 Disconnected(PeerId),
204 /// A message was successfully sent to the peer.
205 Sent(PeerId, M),
206 /// One or more messages were received from the peer.
207 Recv(PeerId, Vec<M>),
208 /// An error occurred on the connection to the peer.
209 Error(PeerId, InterfaceError),
210 /// No pending IO activity; useful for triggering housekeeping.
211 Idle,
212}
213
214/// Output produced by a [`Behavior`], either a command for the interface or an
215/// event for the external consumer.
216#[derive(Debug)]
217pub enum BehaviorOutput<B: Behavior> {
218 /// A command to be dispatched to the network interface.
219 InterfaceCommand(InterfaceCommand<B::Message>),
220 /// An event to be surfaced to the caller.
221 ExternalEvent(B::Event),
222}
223
224impl<B: Behavior> From<InterfaceCommand<B::Message>> for BehaviorOutput<B> {
225 fn from(cmd: InterfaceCommand<B::Message>) -> Self {
226 BehaviorOutput::InterfaceCommand(cmd)
227 }
228}
229
230/// An abstraction over the network interface where IO happens
231#[trait_variant::make]
232pub trait Interface<M: Message>: Unpin + FusedStream + Stream<Item = InterfaceEvent<M>> {
233 /// Dispatch a command to the interface (connect, send, or disconnect).
234 fn dispatch(&mut self, cmd: InterfaceCommand<M>);
235}
236
237/// Describes the behavior (business logic) of a network stack
238#[trait_variant::make]
239pub trait Behavior:
240 Sized + Unpin + FusedStream + Stream<Item = BehaviorOutput<Self>> + Send + 'static
241{
242 /// The event type that is raised by the behavior
243 type Event: Debug + Send + 'static;
244
245 /// The command type that can be handled by the behavior
246 type Command;
247
248 /// The state type of a peer in the network
249 type PeerState: Default;
250
251 /// The message type that is sent over the network
252 type Message: Message + Debug + Send + 'static;
253
254 /// Apply an IO event to the behavior
255 ///
256 /// This is the hook where a behavior can apply an event coming from the
257 /// network interface.
258 ///
259 /// The behavior is responsible for updating the state of the peer to
260 /// reflect the what has been received from the network interface.
261 fn handle_io(&mut self, event: InterfaceEvent<Self::Message>);
262
263 /// Execute an external command on the behavior.
264 fn execute(&mut self, cmd: Self::Command);
265}
266
267/// Manager to reconcile state between a network interface and a behavior
268pub struct Manager<I, B, M>
269where
270 M: Message,
271 I: Interface<M>,
272 B: Behavior<Message = M>,
273{
274 interface: I,
275 behavior: B,
276}
277
278impl<I, B, M> Manager<I, B, M>
279where
280 M: Message,
281 I: Interface<M>,
282 B: Behavior<Message = M>,
283{
284 /// Creates a new manager from an interface and a behavior.
285 pub fn new(interface: I, behavior: B) -> Self {
286 Self {
287 interface,
288 behavior,
289 }
290 }
291
292 /// Polls the interface and behavior, returning the next external event if
293 /// available. Interface commands produced by the behavior are dispatched
294 /// automatically.
295 pub async fn poll_next(&mut self) -> Option<B::Event> {
296 let Self {
297 behavior,
298 interface,
299 ..
300 } = self;
301
302 select! {
303 output = behavior.select_next_some() => {
304 match output {
305 BehaviorOutput::InterfaceCommand(cmd) => {
306 self.interface.dispatch(cmd);
307 None
308 }
309 BehaviorOutput::ExternalEvent(event) => {
310 Some(event)
311 }
312 }
313 },
314 event = interface.select_next_some() => {
315 self.behavior.handle_io(event);
316 None
317 }
318 }
319 }
320
321 /// Forwards an external command to the underlying behavior.
322 pub fn execute(&mut self, cmd: B::Command) {
323 self.behavior.execute(cmd);
324 }
325}
326
327/// A queue of pending [`BehaviorOutput`] items ready to be polled by the
328/// manager.
329pub struct OutboundQueue<B: Behavior> {
330 futures: FuturesUnordered<Pin<Box<dyn Future<Output = BehaviorOutput<B>> + Send + Unpin>>>,
331}
332
333impl<B: Behavior> OutboundQueue<B> {
334 /// Creates an empty outbound queue.
335 pub fn new() -> Self {
336 Self {
337 futures: FuturesUnordered::new(),
338 }
339 }
340
341 /// Enqueues an output that is immediately ready.
342 pub fn push_ready(&mut self, output: impl Into<BehaviorOutput<B>>) {
343 self.futures
344 .push(Box::pin(futures::future::ready(output.into())));
345 }
346
347 /// Polls for the next available output.
348 pub async fn poll_next(&mut self) -> Option<BehaviorOutput<B>> {
349 futures::stream::StreamExt::next(&mut self.futures).await
350 }
351}
352
353impl<B: Behavior> Default for OutboundQueue<B> {
354 fn default() -> Self {
355 Self::new()
356 }
357}
358
359#[cfg(test)]
360mod testing;