Skip to main content

logicaffeine_data/crdt/
delta_buffer.rs

1//! Delta Buffer - Ring buffer for recent deltas.
2//!
3//! Stores recent deltas for efficient sync with late joiners.
4
5use super::causal::VClock;
6use serde::{Deserialize, Serialize};
7use std::collections::VecDeque;
8
9/// A ring buffer that stores recent deltas along with their versions.
10///
11/// When a peer requests sync, we can provide deltas since their last known
12/// version. If the gap is too large (oldest delta evicted), the peer needs
13/// a full state transfer instead.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct DeltaBuffer<D> {
16    /// Ring buffer of (version, delta) pairs
17    deltas: VecDeque<(VClock, D)>,
18    /// Maximum number of deltas to retain
19    max_size: usize,
20    /// The oldest version we can provide deltas from
21    oldest_version: VClock,
22}
23
24impl<D: Clone> DeltaBuffer<D> {
25    /// Create a new buffer with the given capacity.
26    pub fn new(max_size: usize) -> Self {
27        Self {
28            deltas: VecDeque::with_capacity(max_size),
29            max_size,
30            oldest_version: VClock::new(),
31        }
32    }
33
34    /// Push a new delta with its associated version.
35    pub fn push(&mut self, version: VClock, delta: D) {
36        if self.deltas.len() >= self.max_size {
37            if let Some((old_version, _)) = self.deltas.pop_front() {
38                self.oldest_version = old_version;
39            }
40        }
41        self.deltas.push_back((version, delta));
42    }
43
44    /// Get all deltas since the given version.
45    ///
46    /// Returns `None` if the version is older than or equal to our oldest evicted delta.
47    /// Returns `Some(vec![])` if the peer is up-to-date.
48    pub fn deltas_since(&self, version: &VClock) -> Option<Vec<D>> {
49        // If we've evicted deltas and the peer is at or behind what we evicted,
50        // we can't help them catch up (they need the evicted delta).
51        if self.oldest_version != VClock::new() && self.oldest_version.dominates(version) {
52            return None;
53        }
54
55        // Return deltas that the peer hasn't seen
56        let result: Vec<D> = self
57            .deltas
58            .iter()
59            .filter(|(v, _)| !version.dominates(v))
60            .map(|(_, d)| d.clone())
61            .collect();
62
63        Some(result)
64    }
65
66    /// Check if we can provide deltas since the given version.
67    pub fn can_serve(&self, version: &VClock) -> bool {
68        self.deltas_since(version).is_some()
69    }
70
71    /// Get the number of deltas currently stored.
72    pub fn len(&self) -> usize {
73        self.deltas.len()
74    }
75
76    /// Check if the buffer is empty.
77    pub fn is_empty(&self) -> bool {
78        self.deltas.is_empty()
79    }
80}
81
#[cfg(test)]
mod tests {
    use super::*;

    /// Calls `increment(1)` on `clock` and returns a snapshot of the result.
    fn tick(clock: &mut VClock) -> VClock {
        clock.increment(1);
        clock.clone()
    }

    /// A freshly constructed buffer holds nothing.
    #[test]
    fn test_buffer_new() {
        let buf = DeltaBuffer::<i32>::new(10);
        assert!(buf.is_empty());
    }

    /// A single pushed delta is returned to a peer with a fresh clock.
    #[test]
    fn test_buffer_push_and_retrieve() {
        let mut buf = DeltaBuffer::<i32>::new(10);
        let mut clock = VClock::new();
        buf.push(tick(&mut clock), 42);

        let fresh = VClock::new();
        assert_eq!(buf.deltas_since(&fresh).unwrap(), vec![42]);
    }

    /// Several deltas come back in the order they were pushed.
    #[test]
    fn test_buffer_multiple_deltas() {
        let mut buf = DeltaBuffer::<i32>::new(10);
        let mut clock = VClock::new();

        for delta in 1..=3 {
            buf.push(tick(&mut clock), delta);
        }

        let fresh = VClock::new();
        assert_eq!(buf.deltas_since(&fresh).unwrap(), vec![1, 2, 3]);
    }

    /// A peer that already holds the first version only receives the later deltas.
    #[test]
    fn test_buffer_since_version() {
        let mut buf = DeltaBuffer::<i32>::new(10);
        let mut clock = VClock::new();

        let v1 = tick(&mut clock);
        buf.push(v1.clone(), 1);
        buf.push(tick(&mut clock), 2);
        buf.push(tick(&mut clock), 3);

        // Everything after v1: deltas 2 and 3.
        assert_eq!(buf.deltas_since(&v1).unwrap(), vec![2, 3]);
    }

    /// After an eviction, a peer with a fresh clock cannot be served incrementally.
    #[test]
    fn test_buffer_overflow() {
        let mut buf = DeltaBuffer::<i32>::new(2);
        let mut clock = VClock::new();

        // Capacity is 2, so the third push evicts delta 1.
        for delta in 1..=3 {
            buf.push(tick(&mut clock), delta);
        }

        let fresh = VClock::new();
        assert!(buf.deltas_since(&fresh).is_none());
    }

    /// After an eviction, only peers past the evicted version can still be served.
    #[test]
    fn test_buffer_overflow_partial() {
        let mut buf = DeltaBuffer::<i32>::new(2);
        let mut clock = VClock::new();

        let v1 = tick(&mut clock);
        buf.push(v1.clone(), 1);

        let v2 = tick(&mut clock);
        buf.push(v2.clone(), 2);

        buf.push(tick(&mut clock), 3); // Evicts 1

        // A peer at v1 sits at the evicted version: no incremental sync.
        assert!(buf.deltas_since(&v1).is_none());

        // A peer at v2 is past the eviction point and only lacks delta 3.
        assert_eq!(buf.deltas_since(&v2).unwrap(), vec![3]);
    }
}