pallas_miniprotocols/chainsync/
buffer.rs

1use std::collections::{vec_deque::Iter, VecDeque};
2
3use crate::Point;
4
5/// A memory buffer to handle chain rollbacks
6///
7/// This structure is intended to facilitate the process of managing rollbacks
8/// in a chain sync process. The goal is to keep points in memory until they
9/// reach a certain depth (# of confirmations). If a rollback happens, the
10/// buffer will try to find the intersection, clear the orphaned points and keep
11/// the remaining still in memory. Further forward rolls will accumulate from
12/// the intersection.
13///
14/// It works by keeping a `VecDeque` data structure of points, where
15/// roll-forward operations accumulate at the end of the deque and retrieving
16/// confirmed points means to pop from the front of the deque.
17///
18/// Notice that it works by keeping track of points, not blocks. It is meant to
19/// be used as a lightweight index where blocks can then be retrieved from a
20/// more suitable memory structure / persistent storage.
21#[derive(Debug)]
22pub struct RollbackBuffer {
23    points: VecDeque<Point>,
24}
25
26impl Default for RollbackBuffer {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
/// Outcome of asking a [`RollbackBuffer`] to roll back to a given point.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RollbackEffect {
    /// The rollback point was found in the buffer; the points after it were
    /// discarded and the rest were kept.
    Handled,
    /// The rollback point was not in the buffer; the whole buffer was
    /// cleared.
    OutOfScope,
}
36
37impl RollbackBuffer {
38    pub fn new() -> Self {
39        Self {
40            points: VecDeque::new(),
41        }
42    }
43
44    /// Adds a new point to the back of the buffer
45    pub fn roll_forward(&mut self, point: Point) {
46        self.points.push_back(point);
47    }
48
49    /// Retrieves all points above or equal a certain depth
50    pub fn pop_with_depth(&mut self, min_depth: usize) -> Vec<Point> {
51        match self.points.len().checked_sub(min_depth) {
52            Some(ready) => self.points.drain(0..ready).collect(),
53            None => vec![],
54        }
55    }
56
57    /// Find the position of a point within the buffer
58    pub fn position(&self, point: &Point) -> Option<usize> {
59        self.points.iter().position(|p| p.eq(point))
60    }
61
62    /// Iterates over the contents of the buffer
63    pub fn peek(&self) -> Iter<Point> {
64        self.points.iter()
65    }
66
67    /// Returns the size of the buffer (number of points)
68    pub fn size(&self) -> usize {
69        self.points.len()
70    }
71
72    /// Returns the newest point in the buffer
73    pub fn latest(&self) -> Option<&Point> {
74        self.points.back()
75    }
76
77    /// Returns the oldest point in the buffer
78    pub fn oldest(&self) -> Option<&Point> {
79        self.points.front()
80    }
81
82    /// Unwind the buffer up to a certain point, clearing orphaned items
83    ///
84    /// If the buffer contains the rollback point, we can safely discard from
85    /// the back and return Ok. If the rollback point is outside the scope of
86    /// the buffer, we clear the whole buffer and notify a failure
87    /// in the rollback process.
88    pub fn roll_back(&mut self, point: &Point) -> RollbackEffect {
89        if let Some(x) = self.position(point) {
90            self.points.truncate(x + 1);
91            RollbackEffect::Handled
92        } else {
93            self.points.clear();
94            RollbackEffect::OutOfScope
95        }
96    }
97}
98
#[cfg(test)]
mod tests {
    use super::RollbackBuffer;
    use crate::{chainsync::RollbackEffect, Point};

    /// Builds a deterministic point whose slot and hash bytes both derive
    /// from `i`.
    fn dummy_point(i: u64) -> Point {
        let hash = i.to_le_bytes().to_vec();
        Point::new(i, hash)
    }

    /// Builds a buffer holding `dummy_point(0)` through `dummy_point(n - 1)`,
    /// rolled forward in that order.
    fn build_filled_buffer(n: usize) -> RollbackBuffer {
        let mut buffer = RollbackBuffer::new();

        (0..n)
            .map(|i| dummy_point(i as u64))
            .for_each(|point| buffer.roll_forward(point));

        buffer
    }

    #[test]
    fn roll_forward_accumulates_points() {
        let buffer = build_filled_buffer(3);

        // Each point sits at the index matching its insertion order.
        assert_eq!(buffer.position(&dummy_point(0)), Some(0));
        assert_eq!(buffer.position(&dummy_point(1)), Some(1));
        assert_eq!(buffer.position(&dummy_point(2)), Some(2));

        assert_eq!(buffer.oldest().unwrap(), &dummy_point(0));
        assert_eq!(buffer.latest().unwrap(), &dummy_point(2));
    }

    #[test]
    fn pop_from_valid_depth_works() {
        let mut buffer = build_filled_buffer(5);

        let ready = buffer.pop_with_depth(2);

        // The three oldest points are confirmed; the two newest stay behind.
        let expected = vec![dummy_point(0), dummy_point(1), dummy_point(2)];
        assert_eq!(ready, expected);

        assert_eq!(buffer.oldest().unwrap(), &dummy_point(3));
        assert_eq!(buffer.latest().unwrap(), &dummy_point(4));
    }

    #[test]
    fn pop_from_excessive_depth_returns_empty() {
        let mut buffer = build_filled_buffer(6);

        let ready = buffer.pop_with_depth(10);

        assert!(ready.is_empty());

        // Nothing was removed from the buffer.
        assert_eq!(buffer.oldest().unwrap(), &dummy_point(0));
        assert_eq!(buffer.latest().unwrap(), &dummy_point(5));
    }

    #[test]
    fn roll_back_within_scope_works() {
        let mut buffer = build_filled_buffer(6);

        let result = buffer.roll_back(&dummy_point(2));

        assert!(matches!(result, RollbackEffect::Handled));

        // The rollback point itself is kept; its successors are gone.
        assert_eq!(buffer.size(), 3);
        assert_eq!(buffer.oldest().unwrap(), &dummy_point(0));
        assert_eq!(buffer.latest().unwrap(), &dummy_point(2));

        let remaining = buffer.pop_with_depth(0);

        let expected = vec![dummy_point(0), dummy_point(1), dummy_point(2)];
        assert_eq!(remaining, expected);
    }

    #[test]
    fn roll_back_outside_scope_works() {
        let mut buffer = build_filled_buffer(6);

        let result = buffer.roll_back(&dummy_point(100));

        assert!(matches!(result, RollbackEffect::OutOfScope));

        // An unknown rollback point wipes the whole buffer.
        assert_eq!(buffer.size(), 0);
        assert_eq!(buffer.oldest(), None);
        assert_eq!(buffer.latest(), None);
    }
}