Skip to main content

raft_io/
transport.rs

1//! The message-delivery seam and an in-memory implementation.
2//!
3//! The protocol core does not send anything itself — it emits
4//! [`Action::Send`](crate::Action::Send), and a driver delivers the message.
5//! [`RaftTransport`] is the trait that driver implements. Splitting "decide to
6//! send" from "actually send" is what keeps the core deterministic and free of
7//! networking: the same election can be replayed in a unit test with a transport
8//! that just records messages, then run in production over TCP, with no change
9//! to the protocol.
10//!
11//! [`MemoryTransport`] is the recording implementation used by the test harness
12//! and examples.
13
14use crate::error::Result;
15use crate::message::Message;
16use crate::types::NodeId;
17
18/// Delivers protocol messages to peers.
19///
20/// A driver loop takes each [`Action::Send`](crate::Action::Send) a node emits
21/// and calls [`send`](RaftTransport::send). How delivery happens — an in-process
22/// queue, a channel, a socket — is entirely the implementor's concern; the
23/// protocol only requires that a message handed to `send` is eventually
24/// delivered to the target node's [`step`](crate::RaftNode::step) (Raft already
25/// tolerates loss, reordering, and duplication, so "eventually, maybe" is a
26/// sufficient contract).
27///
28/// # Examples
29///
30/// ```
31/// use raft_io::{MemoryTransport, RaftTransport, Message, RequestVote};
32///
33/// let mut tx = MemoryTransport::new();
34/// tx.send(2, Message::RequestVote(RequestVote {
35///     term: 1, candidate: 1, last_log_index: 0, last_log_term: 0, force: false,
36/// })).unwrap();
37/// assert_eq!(tx.take().len(), 1);
38/// ```
39pub trait RaftTransport {
40    /// Delivers `message` to node `to`.
41    ///
42    /// # Errors
43    ///
44    /// Returns an [`Error`](crate::Error) if the transport cannot accept the
45    /// message. Note that Raft treats the network as unreliable regardless, so
46    /// a delivery that is dropped after being accepted is not an error.
47    fn send(&mut self, to: NodeId, message: Message) -> Result<()>;
48}
49
50/// An in-memory [`RaftTransport`] that records outgoing messages.
51///
52/// Instead of delivering anywhere, it appends each message to an outbox that a
53/// test harness drains with [`take`](MemoryTransport::take) and routes to the
54/// destination node by hand. This makes message ordering, loss, and partitions
55/// something the test controls precisely.
56///
57/// # Examples
58///
59/// ```
60/// use raft_io::{MemoryTransport, RaftTransport, Message, AppendEntries};
61///
62/// let mut tx = MemoryTransport::new();
63/// tx.send(2, Message::AppendEntries(AppendEntries {
64///     term: 1, leader: 1, prev_log_index: 0, prev_log_term: 0,
65///     entries: Vec::new(), leader_commit: 0,
66/// })).unwrap();
67///
68/// let pending = tx.take();
69/// assert_eq!(pending[0].0, 2);          // destination
70/// assert!(tx.take().is_empty());        // draining leaves it empty
71/// ```
72#[derive(Clone, Debug, Default)]
73pub struct MemoryTransport {
74    outbox: Vec<(NodeId, Message)>,
75}
76
77impl MemoryTransport {
78    /// Creates an empty transport.
79    #[must_use]
80    pub fn new() -> Self {
81        Self::default()
82    }
83
84    /// Removes and returns every queued `(destination, message)` pair.
85    ///
86    /// # Examples
87    ///
88    /// ```
89    /// use raft_io::MemoryTransport;
90    ///
91    /// let mut tx = MemoryTransport::new();
92    /// assert!(tx.take().is_empty());
93    /// ```
94    #[must_use]
95    pub fn take(&mut self) -> Vec<(NodeId, Message)> {
96        core::mem::take(&mut self.outbox)
97    }
98
99    /// Returns the number of queued messages without draining them.
100    #[inline]
101    #[must_use]
102    pub fn pending(&self) -> usize {
103        self.outbox.len()
104    }
105}
106
107impl RaftTransport for MemoryTransport {
108    #[inline]
109    fn send(&mut self, to: NodeId, message: Message) -> Result<()> {
110        self.outbox.push((to, message));
111        Ok(())
112    }
113}
114
115#[cfg(test)]
116mod tests {
117    #![allow(clippy::unwrap_used, clippy::expect_used)]
118
119    use super::*;
120    use crate::message::RequestVoteReply;
121
122    fn reply(from: NodeId) -> Message {
123        Message::RequestVoteReply(RequestVoteReply {
124            term: 1,
125            vote_granted: true,
126            from,
127        })
128    }
129
130    #[test]
131    fn test_send_queues_in_order() {
132        let mut tx = MemoryTransport::new();
133        tx.send(2, reply(1)).unwrap();
134        tx.send(3, reply(1)).unwrap();
135        assert_eq!(tx.pending(), 2);
136        let drained = tx.take();
137        assert_eq!(drained[0].0, 2);
138        assert_eq!(drained[1].0, 3);
139    }
140
141    #[test]
142    fn test_take_drains() {
143        let mut tx = MemoryTransport::new();
144        tx.send(2, reply(1)).unwrap();
145        let _ = tx.take();
146        assert_eq!(tx.pending(), 0);
147        assert!(tx.take().is_empty());
148    }
149}