memberlist_plumtree/transport/mod.rs
1//! Transport abstraction for Plumtree message delivery.
2//!
3//! This module provides the `Transport` trait that must be implemented
4//! to enable proper unicast message delivery for Plumtree protocol messages.
5//!
6//! # Important
7//!
8//! Plumtree requires **unicast** (point-to-point) message delivery for correct operation:
9//! - **Gossip**: Must be sent to specific eager peers
10//! - **IHave**: Must be sent to specific lazy peers
11//! - **Graft**: Must be sent to the peer that announced the message
12//! - **Prune**: Must be sent to the peer being demoted
13//!
14//! Using broadcast for these messages will break the protocol.
15//!
16//! # Available Transports
17//!
18//! - [`ChannelTransport`]: Channel-based transport for testing
19//! - [`NoopTransport`]: No-op transport that discards messages
20//! - [`QuicTransport`](quic::QuicTransport): QUIC-based transport (requires `quic` feature)
21
22use bytes::Bytes;
23use std::fmt::Debug;
24use std::future::Future;
25use std::hash::Hash;
26
27#[cfg(feature = "quic")]
28pub mod quic;
29
30/// Transport trait for sending Plumtree messages.
31///
32/// Implementations must provide unicast message delivery to specific peers.
33///
34/// # Example
35///
36/// ```ignore
37/// use memberlist_plumtree::Transport;
38///
39/// struct MyTransport {
40/// memberlist: Memberlist,
41/// }
42///
43/// impl<I: Clone + Send + Sync> Transport<I> for MyTransport {
44/// async fn send_to(&self, target: &I, data: Bytes) -> Result<(), TransportError> {
45/// // Send via memberlist's reliable/best-effort unicast
46/// self.memberlist.send_reliable(target, data).await
47/// }
48/// }
49/// ```
50#[auto_impl::auto_impl(Box, Arc)]
51pub trait Transport<I>: Send + Sync + 'static
52where
53 I: Clone + Eq + Hash + Debug + Send + Sync + 'static,
54{
55 /// Error type for transport operations.
56 type Error: std::error::Error + Send + Sync + 'static;
57
58 /// Send a message to a specific peer (unicast).
59 ///
60 /// This MUST deliver the message to the specified target peer only.
61 /// Broadcasting to multiple peers or random nodes will break Plumtree.
62 fn send_to(
63 &self,
64 target: &I,
65 data: Bytes,
66 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
67}
68
69/// A simple channel-based transport that outputs (target, data) pairs.
70///
71/// Useful for testing or when you want to handle delivery externally.
72#[derive(Debug, Clone)]
73pub struct ChannelTransport<I> {
74 tx: async_channel::Sender<(I, Bytes)>,
75}
76
77impl<I> ChannelTransport<I> {
78 /// Create a new channel transport.
79 pub fn new(tx: async_channel::Sender<(I, Bytes)>) -> Self {
80 Self { tx }
81 }
82
83 /// Create a channel transport with a new bounded channel.
84 ///
85 /// Returns the transport and the receiver for (target, data) pairs.
86 pub fn bounded(capacity: usize) -> (Self, async_channel::Receiver<(I, Bytes)>) {
87 let (tx, rx) = async_channel::bounded(capacity);
88 (Self { tx }, rx)
89 }
90}
91
/// Error type produced by the channel-based transport.
///
/// The wrapped `String` holds a human-readable description of the failure.
#[derive(Debug, Clone)]
pub struct ChannelTransportError(pub String);

impl std::fmt::Display for ChannelTransportError {
    /// Renders the error as `channel transport error: <description>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(description) = self;
        write!(f, "channel transport error: {description}")
    }
}

// The default `Error` methods are kept, so no underlying source is exposed.
impl std::error::Error for ChannelTransportError {}
103
104impl<I> Transport<I> for ChannelTransport<I>
105where
106 I: Clone + Eq + Hash + Debug + Send + Sync + 'static,
107{
108 type Error = ChannelTransportError;
109
110 async fn send_to(&self, target: &I, data: Bytes) -> Result<(), Self::Error> {
111 self.tx
112 .send((target.clone(), data))
113 .await
114 .map_err(|e| ChannelTransportError(e.to_string()))
115 }
116}
117
118/// A no-op transport that discards all messages.
119///
120/// Useful for testing scenarios where message delivery doesn't matter.
121#[derive(Debug, Clone, Copy, Default)]
122pub struct NoopTransport;
123
124impl<I> Transport<I> for NoopTransport
125where
126 I: Clone + Eq + Hash + Debug + Send + Sync + 'static,
127{
128 type Error = std::convert::Infallible;
129
130 async fn send_to(&self, _target: &I, _data: Bytes) -> Result<(), Self::Error> {
131 Ok(())
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent through `ChannelTransport` arrives on the receiver with
    /// the same target and payload.
    #[tokio::test]
    async fn test_channel_transport() {
        let (transport, rx) = ChannelTransport::<u64>::bounded(16);
        let peer = 42u64;
        let payload = Bytes::from("hello");

        transport.send_to(&peer, payload.clone()).await.unwrap();

        let (target, data) = rx.recv().await.unwrap();
        assert_eq!(target, peer);
        assert_eq!(data, payload);
    }

    /// `NoopTransport` accepts a message and reports success.
    #[tokio::test]
    async fn test_noop_transport() {
        let transport = NoopTransport;
        let outcome = transport.send_to(&42u64, Bytes::from("hello")).await;
        outcome.unwrap();
    }
}