paxakos/
buffer.rs

1//! Defines the applied log entry [`Buffer`] trait.
2//!
3//! When a node is temporarily shut down or becomes unreachable, it needs to be
4//! catch back up with the rest of the cluster. One way to do that is to poll
5//! other nodes for entries that have been committed in the meantime. These
6//! other nodes will in turn query their applied entry buffer for these entries.
7//!
8//! A node may fall so far behind that no other node can catch it up through its
9//! `Buffer`. In such a case the node must ask another node to create a snapshot
10//! and catch up that way. It should be noted that the cluster will likely
11//! progress while the snapshot is taken, transferred and installed. That is to
12//! say that `Buffer`s are indispensable and should have a reasonable minimal
13//! capacity.
14//!
15//! The default implementation is [`InMemoryBuffer`].
16use std::collections::VecDeque;
17use std::convert::Infallible;
18use std::sync::Arc;
19
20use crate::CoordNum;
21use crate::LogEntry;
22use crate::RoundNum;
23
24/// A buffer of entries that were applied to the local state.
25// TODO provide an (optional) persistent implementation
26pub trait Buffer: Send + 'static {
27    /// The round number type.
28    type RoundNum: RoundNum;
29    /// The coordination number type.
30    type CoordNum: CoordNum;
31    /// The log entry type.
32    type Entry: LogEntry;
33    /// The type of error this buffer may raise.
34    type Error: std::error::Error + Send + Sync + 'static;
35
36    /// Insert an applied entry into the buffer.
37    fn insert(
38        &mut self,
39        round_num: Self::RoundNum,
40        coord_num: Self::CoordNum,
41        entry: Arc<Self::Entry>,
42    );
43
44    /// Retrieve an entry from the buffer.
45    ///
46    /// May be called for round numbers that were not previously inserted.
47    #[allow(clippy::type_complexity)]
48    fn get(
49        &self,
50        round_num: Self::RoundNum,
51    ) -> Result<Option<(Self::CoordNum, Arc<Self::Entry>)>, Self::Error>;
52}
53
54/// An in-memory buffer.
55pub struct InMemoryBuffer<R, C, E: LogEntry> {
56    capacity: usize,
57    buffer: VecDeque<(R, C, Arc<E>)>,
58}
59
60impl<R, C, E: LogEntry> InMemoryBuffer<R, C, E> {
61    /// Creates a new in-memory buffer with the given capacity.
62    pub fn new(capacity: usize) -> Self {
63        assert!(capacity > 0);
64
65        Self {
66            capacity,
67            buffer: VecDeque::new(),
68        }
69    }
70}
71
72impl<R, C, E> Buffer for InMemoryBuffer<R, C, E>
73where
74    R: RoundNum,
75    C: CoordNum,
76    E: LogEntry,
77{
78    type RoundNum = R;
79    type CoordNum = C;
80    type Entry = E;
81    type Error = Infallible;
82
83    fn insert(
84        &mut self,
85        round_num: Self::RoundNum,
86        coord_num: Self::CoordNum,
87        entry: Arc<Self::Entry>,
88    ) {
89        if self.capacity <= self.buffer.len() {
90            self.buffer.pop_front();
91        }
92
93        self.buffer.push_back((round_num, coord_num, entry));
94    }
95
96    #[allow(clippy::type_complexity)]
97    fn get(
98        &self,
99        round_num: Self::RoundNum,
100    ) -> Result<Option<(Self::CoordNum, Arc<Self::Entry>)>, Self::Error> {
101        // hot path
102        if let Some((r, _, _)) = self.buffer.get(0) {
103            if round_num >= *r {
104                if let Some(delta) = crate::util::try_usize_delta(round_num, *r) {
105                    if let Some((r, c, e)) = self.buffer.get(delta) {
106                        if *r == round_num {
107                            return Ok(Some((*c, Arc::clone(e))));
108                        }
109                    }
110                }
111            }
112        }
113
114        // cold path
115        Ok(self
116            .buffer
117            .iter()
118            .find(|(r, _, _)| *r == round_num)
119            .map(|(_, c, e)| (*c, Arc::clone(e))))
120    }
121}