memberlist_plumtree/transport/
mod.rs

//! Transport abstraction for Plumtree message delivery.
//!
//! This module provides the `Transport` trait that must be implemented
//! to enable proper unicast message delivery for Plumtree protocol messages.
//!
//! # Important
//!
//! Plumtree requires **unicast** (point-to-point) message delivery for correct operation:
//! - **Gossip**: Must be sent to specific eager peers
//! - **IHave**: Must be sent to specific lazy peers
//! - **Graft**: Must be sent to the peer that announced the message
//! - **Prune**: Must be sent to the peer being demoted
//!
//! Using broadcast for these messages will break the protocol.
//!
//! # Available Transports
//!
//! - [`ChannelTransport`]: Channel-based transport for testing
//! - [`NoopTransport`]: No-op transport that discards messages
//! - [`QuicTransport`](quic::QuicTransport): QUIC-based transport (requires `quic` feature)
21
22use bytes::Bytes;
23use std::fmt::Debug;
24use std::future::Future;
25use std::hash::Hash;
26
27#[cfg(feature = "quic")]
28pub mod quic;
29
30/// Transport trait for sending Plumtree messages.
31///
32/// Implementations must provide unicast message delivery to specific peers.
33///
34/// # Example
35///
36/// ```ignore
37/// use memberlist_plumtree::Transport;
38///
39/// struct MyTransport {
40///     memberlist: Memberlist,
41/// }
42///
43/// impl<I: Clone + Send + Sync> Transport<I> for MyTransport {
44///     async fn send_to(&self, target: &I, data: Bytes) -> Result<(), TransportError> {
45///         // Send via memberlist's reliable/best-effort unicast
46///         self.memberlist.send_reliable(target, data).await
47///     }
48/// }
49/// ```
50#[auto_impl::auto_impl(Box, Arc)]
51pub trait Transport<I>: Send + Sync + 'static
52where
53    I: Clone + Eq + Hash + Debug + Send + Sync + 'static,
54{
55    /// Error type for transport operations.
56    type Error: std::error::Error + Send + Sync + 'static;
57
58    /// Send a message to a specific peer (unicast).
59    ///
60    /// This MUST deliver the message to the specified target peer only.
61    /// Broadcasting to multiple peers or random nodes will break Plumtree.
62    fn send_to(
63        &self,
64        target: &I,
65        data: Bytes,
66    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
67}
68
69/// A simple channel-based transport that outputs (target, data) pairs.
70///
71/// Useful for testing or when you want to handle delivery externally.
72#[derive(Debug, Clone)]
73pub struct ChannelTransport<I> {
74    tx: async_channel::Sender<(I, Bytes)>,
75}
76
77impl<I> ChannelTransport<I> {
78    /// Create a new channel transport.
79    pub fn new(tx: async_channel::Sender<(I, Bytes)>) -> Self {
80        Self { tx }
81    }
82
83    /// Create a channel transport with a new bounded channel.
84    ///
85    /// Returns the transport and the receiver for (target, data) pairs.
86    pub fn bounded(capacity: usize) -> (Self, async_channel::Receiver<(I, Bytes)>) {
87        let (tx, rx) = async_channel::bounded(capacity);
88        (Self { tx }, rx)
89    }
90}
91
/// Error produced by the channel-based transport.
///
/// Wraps a human-readable description of the failure as a plain `String`.
#[derive(Debug, Clone)]
pub struct ChannelTransportError(pub String);

impl std::fmt::Display for ChannelTransportError {
    /// Render the error as `channel transport error: <description>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(reason) = self;
        write!(f, "channel transport error: {}", reason)
    }
}

// No underlying source is exposed; the default `Error` methods are sufficient.
impl std::error::Error for ChannelTransportError {}
103
104impl<I> Transport<I> for ChannelTransport<I>
105where
106    I: Clone + Eq + Hash + Debug + Send + Sync + 'static,
107{
108    type Error = ChannelTransportError;
109
110    async fn send_to(&self, target: &I, data: Bytes) -> Result<(), Self::Error> {
111        self.tx
112            .send((target.clone(), data))
113            .await
114            .map_err(|e| ChannelTransportError(e.to_string()))
115    }
116}
117
118/// A no-op transport that discards all messages.
119///
120/// Useful for testing scenarios where message delivery doesn't matter.
121#[derive(Debug, Clone, Copy, Default)]
122pub struct NoopTransport;
123
124impl<I> Transport<I> for NoopTransport
125where
126    I: Clone + Eq + Hash + Debug + Send + Sync + 'static,
127{
128    type Error = std::convert::Infallible;
129
130    async fn send_to(&self, _target: &I, _data: Bytes) -> Result<(), Self::Error> {
131        Ok(())
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// A message pushed through `ChannelTransport` must come out of the
    /// receiver with the same target and payload.
    #[tokio::test]
    async fn test_channel_transport() {
        let (transport, rx) = ChannelTransport::<u64>::bounded(16);
        let peer = 42u64;

        let sent = transport.send_to(&peer, Bytes::from("hello")).await;
        sent.unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received.0, 42);
        assert_eq!(received.1, Bytes::from("hello"));
    }

    /// Sending through `NoopTransport` must succeed.
    #[tokio::test]
    async fn test_noop_transport() {
        let transport = NoopTransport;
        let outcome = transport.send_to(&42u64, Bytes::from("hello")).await;
        outcome.unwrap();
    }
}