Skip to main content

pallas_network2/
lib.rs

1//! A new take on the Ouroboros networking stack that prioritises P2P
2//! operation over the *client / server* shape used by `pallas-network`.
3//!
4//! The public API is split between an [`Interface`] (where IO happens) and a
5//! [`Behavior`] (the business logic), reconciled by a [`Manager`] — a layout
6//! inspired by libp2p's swarm.
7//!
8//! Once this crate is thoroughly tested and adopted by downstream clients,
9//! `network2` is intended to replace the original `pallas-network`.
10//!
11//! # Usage
12//!
13//! A typical setup pairs a transport (an [`Interface`] impl, e.g. the
14//! TCP-backed [`interface::TcpInterface`]) with a protocol (a [`Behavior`]
15//! impl, e.g. the node-to-node [`behavior::InitiatorBehavior`]) and drives
16//! both through a [`Manager`]. The manager polls the interface for IO
17//! events, hands them to the behavior, and pushes any commands the
18//! behavior emits back at the interface — leaving you to consume the
19//! behavior's external events.
20//!
21//! ```ignore
22//! use pallas_network2::{
23//!     behavior::{AnyMessage, InitiatorBehavior, InitiatorCommand, InitiatorEvent},
24//!     interface::TcpInterface,
25//!     Manager, PeerId,
26//! };
27//!
28//! let interface = TcpInterface::<AnyMessage>::new();
29//! let behavior  = InitiatorBehavior::default();
30//!
31//! let mut manager = Manager::new(interface, behavior);
32//!
33//! manager.execute(InitiatorCommand::IncludePeer(
34//!     "relays-new.cardano-mainnet.iohk.io:3001".parse::<PeerId>().unwrap(),
35//! ));
36//!
37//! while let Some(event) = manager.poll_next().await {
38//!     match event {
39//!         InitiatorEvent::PeerInitialized(pid, _) => println!("up: {pid}"),
40//!         InitiatorEvent::BlockHeaderReceived(pid, header, _) => {
41//!             println!("hdr from {pid}: {} bytes", header.cbor.len());
42//!         }
43//!         _ => {}
44//!     }
45//! }
46//! ```
47//!
48//! # Overview
49//!
50//! - [`Manager`] — drives a paired [`Interface`] + [`Behavior`].
51//!   [`Manager::poll_next`] advances IO and the behavior;
52//!   [`Manager::execute`] forwards an external command to the behavior.
53//! - [`Interface`] trait — the IO side. Receives [`InterfaceCommand`]
54//!   (`Connect` / `Send` / `Disconnect`) and yields [`InterfaceEvent`]
55//!   (`Connected` / `Disconnected` / `Sent` / `Recv` / `Error` / `Idle`).
56//! - [`Behavior`] trait — the protocol logic. Defines its own `Event`,
57//!   `Command`, `PeerState`, and `Message`, and emits [`BehaviorOutput`]s.
58//! - [`Message`] trait — describes a mini-protocol message (channel id +
59//!   payload encoding).
60//! - [`OutboundQueue`] — convenience queue of pending [`BehaviorOutput`]s
61//!   ready to be polled by the manager.
62//! - [`PeerId`], [`Channel`], [`Payload`], [`MAX_SEGMENT_PAYLOAD_LENGTH`] —
63//!   the primitive vocabulary.
64//!
65//! ## Modules
66//!
67//! - [`bearer`] — low-level transport for reading and writing multiplexed
68//!   segments.
69//! - [`interface`] — [`Interface`] implementations for TCP connections.
70//! - [`behavior`] — opinionated [`Behavior`] implementations for Cardano
71//!   stacks.
72//! - [`protocol`] — the Ouroboros mini-protocol definitions (handshake,
73//!   chainsync, blockfetch, …).
74//!
75//! # Feature flags
76//!
77//! - `emulation` — enables the `emulation` module, an in-memory test
78//!   harness for exercising behaviors without real network IO.
79//!
80//! # Usage as part of `pallas`
81//!
82//! When depending on the umbrella [`pallas`] crate (with the `network2`
83//! feature), this crate is re-exported as `pallas::network2`.
84//!
85//! [`pallas`]: https://crates.io/crates/pallas
86
87use std::{fmt::Debug, pin::Pin};
88
89use futures::{
90    Stream, StreamExt, select,
91    stream::{FusedStream, FuturesUnordered},
92};
93
94#[cfg(feature = "emulation")]
95pub mod emulation;
96
97/// Low-level transport layer for reading and writing multiplexed segments.
98pub mod bearer;
99/// Opinionated behavior implementations for Cardano network stacks.
100pub mod behavior;
101/// Network interface implementations for TCP connections.
102pub mod interface;
103/// Ouroboros mini-protocol definitions (handshake, chainsync, blockfetch, etc.).
104pub mod protocol;
105
106/// A unique identifier for a peer in the network
107#[derive(Debug, Eq, PartialEq, Hash, Clone)]
108pub struct PeerId {
109    /// The hostname or IP address of the peer.
110    pub host: String,
111    /// The TCP port of the peer.
112    pub port: u16,
113}
114
115impl std::fmt::Display for PeerId {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        write!(f, "{}:{}", self.host, self.port)
118    }
119}
120
121impl std::str::FromStr for PeerId {
122    type Err = String;
123
124    fn from_str(s: &str) -> Result<Self, Self::Err> {
125        let (host, port) = s.split_once(':').ok_or("invalid peer id")?;
126        Ok(PeerId {
127            host: host.to_string(),
128            port: port.parse().unwrap(),
129        })
130    }
131}
132
133/// An error that occurred within the network interface.
134#[derive(Debug)]
135pub enum InterfaceError {
136    // TODO: add more specific errors
137    /// A generic error with a human-readable description.
138    Other(String),
139}
140
141/// A multiplexer channel identifier for a mini-protocol.
142pub type Channel = u16;
143
144/// Raw bytes of a mini-protocol message payload.
145pub type Payload = Vec<u8>;
146
147/// Protocol value that defines max segment length
148pub const MAX_SEGMENT_PAYLOAD_LENGTH: usize = 65535;
149
150/// Describes a message that can be sent over the network
151pub trait Message: Send + 'static + std::fmt::Debug + Sized + Clone + Debug {
152    /// Returns the channel identifier for this message's mini-protocol.
153    fn channel(&self) -> Channel;
154    /// Encodes this message into its raw payload bytes.
155    fn payload(&self) -> Payload;
156
157    /// Try to decode a message from a payload.
158    ///
159    /// This method should use a best-effort approach to decode a message from
160    /// the payload. Implementors need to take into account that payload might
161    /// be partial, in this case should return none and wait for a new call with
162    /// more data.
163    ///
164    /// Whatever payload is successfully consumed during the parsing, should be
165    /// drained from the variable, leaving the remaining data available for a
166    /// next call which will be used in the next attempt.
167    fn from_payload(channel: Channel, payload: &mut Payload) -> Option<Self>;
168
169    /// Converts this message into its channel and raw payload bytes.
170    fn into_payload(self) -> (Channel, Payload);
171
172    /// Converts this message into its channel and a list of payload chunks,
173    /// each respecting [`MAX_SEGMENT_PAYLOAD_LENGTH`].
174    fn into_chunks(self) -> (Channel, Vec<Payload>) {
175        let (channel, payload) = self.into_payload();
176
177        let chunks = payload
178            .chunks(MAX_SEGMENT_PAYLOAD_LENGTH)
179            .map(Vec::from)
180            .collect();
181
182        (channel, chunks)
183    }
184}
185
186/// A low-level command to interact with the network interface
187#[derive(Debug)]
188pub enum InterfaceCommand<M: Message> {
189    /// Initiate a connection to the given peer.
190    Connect(PeerId),
191    /// Send a message to an already-connected peer.
192    Send(PeerId, M),
193    /// Disconnect from the given peer.
194    Disconnect(PeerId),
195}
196
197/// A low-level event from the network interface
198#[derive(Debug)]
199pub enum InterfaceEvent<M: Message> {
200    /// A connection to the peer was successfully established.
201    Connected(PeerId),
202    /// The peer has been disconnected.
203    Disconnected(PeerId),
204    /// A message was successfully sent to the peer.
205    Sent(PeerId, M),
206    /// One or more messages were received from the peer.
207    Recv(PeerId, Vec<M>),
208    /// An error occurred on the connection to the peer.
209    Error(PeerId, InterfaceError),
210    /// No pending IO activity; useful for triggering housekeeping.
211    Idle,
212}
213
214/// Output produced by a [`Behavior`], either a command for the interface or an
215/// event for the external consumer.
216#[derive(Debug)]
217pub enum BehaviorOutput<B: Behavior> {
218    /// A command to be dispatched to the network interface.
219    InterfaceCommand(InterfaceCommand<B::Message>),
220    /// An event to be surfaced to the caller.
221    ExternalEvent(B::Event),
222}
223
224impl<B: Behavior> From<InterfaceCommand<B::Message>> for BehaviorOutput<B> {
225    fn from(cmd: InterfaceCommand<B::Message>) -> Self {
226        BehaviorOutput::InterfaceCommand(cmd)
227    }
228}
229
230/// An abstraction over the network interface where IO happens
231#[trait_variant::make]
232pub trait Interface<M: Message>: Unpin + FusedStream + Stream<Item = InterfaceEvent<M>> {
233    /// Dispatch a command to the interface (connect, send, or disconnect).
234    fn dispatch(&mut self, cmd: InterfaceCommand<M>);
235}
236
237/// Describes the behavior (business logic) of a network stack
238#[trait_variant::make]
239pub trait Behavior:
240    Sized + Unpin + FusedStream + Stream<Item = BehaviorOutput<Self>> + Send + 'static
241{
242    /// The event type that is raised by the behavior
243    type Event: Debug + Send + 'static;
244
245    /// The command type that can be handled by the behavior
246    type Command;
247
248    /// The state type of a peer in the network
249    type PeerState: Default;
250
251    /// The message type that is sent over the network
252    type Message: Message + Debug + Send + 'static;
253
254    /// Apply an IO event to the behavior
255    ///
256    /// This is the hook where a behavior can apply an event coming from the
257    /// network interface.
258    ///
259    /// The behavior is responsible for updating the state of the peer to
260    /// reflect the what has been received from the network interface.
261    fn handle_io(&mut self, event: InterfaceEvent<Self::Message>);
262
263    /// Execute an external command on the behavior.
264    fn execute(&mut self, cmd: Self::Command);
265}
266
267/// Manager to reconcile state between a network interface and a behavior
268pub struct Manager<I, B, M>
269where
270    M: Message,
271    I: Interface<M>,
272    B: Behavior<Message = M>,
273{
274    interface: I,
275    behavior: B,
276}
277
278impl<I, B, M> Manager<I, B, M>
279where
280    M: Message,
281    I: Interface<M>,
282    B: Behavior<Message = M>,
283{
284    /// Creates a new manager from an interface and a behavior.
285    pub fn new(interface: I, behavior: B) -> Self {
286        Self {
287            interface,
288            behavior,
289        }
290    }
291
292    /// Polls the interface and behavior, returning the next external event if
293    /// available. Interface commands produced by the behavior are dispatched
294    /// automatically.
295    pub async fn poll_next(&mut self) -> Option<B::Event> {
296        let Self {
297            behavior,
298            interface,
299            ..
300        } = self;
301
302        select! {
303            output = behavior.select_next_some() => {
304                match output {
305                    BehaviorOutput::InterfaceCommand(cmd) => {
306                        self.interface.dispatch(cmd);
307                        None
308                    }
309                    BehaviorOutput::ExternalEvent(event) => {
310                        Some(event)
311                    }
312                }
313            },
314            event = interface.select_next_some() => {
315                self.behavior.handle_io(event);
316                None
317            }
318        }
319    }
320
321    /// Forwards an external command to the underlying behavior.
322    pub fn execute(&mut self, cmd: B::Command) {
323        self.behavior.execute(cmd);
324    }
325}
326
327/// A queue of pending [`BehaviorOutput`] items ready to be polled by the
328/// manager.
329pub struct OutboundQueue<B: Behavior> {
330    futures: FuturesUnordered<Pin<Box<dyn Future<Output = BehaviorOutput<B>> + Send + Unpin>>>,
331}
332
333impl<B: Behavior> OutboundQueue<B> {
334    /// Creates an empty outbound queue.
335    pub fn new() -> Self {
336        Self {
337            futures: FuturesUnordered::new(),
338        }
339    }
340
341    /// Enqueues an output that is immediately ready.
342    pub fn push_ready(&mut self, output: impl Into<BehaviorOutput<B>>) {
343        self.futures
344            .push(Box::pin(futures::future::ready(output.into())));
345    }
346
347    /// Polls for the next available output.
348    pub async fn poll_next(&mut self) -> Option<BehaviorOutput<B>> {
349        futures::stream::StreamExt::next(&mut self.futures).await
350    }
351}
352
353impl<B: Behavior> Default for OutboundQueue<B> {
354    fn default() -> Self {
355        Self::new()
356    }
357}
358
359#[cfg(test)]
360mod testing;