logicaffeine_data/crdt/
delta_buffer.rs1use super::causal::VClock;
6use serde::{Deserialize, Serialize};
7use std::collections::VecDeque;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct DeltaBuffer<D> {
16 deltas: VecDeque<(VClock, D)>,
18 max_size: usize,
20 oldest_version: VClock,
22}
23
24impl<D: Clone> DeltaBuffer<D> {
25 pub fn new(max_size: usize) -> Self {
27 Self {
28 deltas: VecDeque::with_capacity(max_size),
29 max_size,
30 oldest_version: VClock::new(),
31 }
32 }
33
34 pub fn push(&mut self, version: VClock, delta: D) {
36 if self.deltas.len() >= self.max_size {
37 if let Some((old_version, _)) = self.deltas.pop_front() {
38 self.oldest_version = old_version;
39 }
40 }
41 self.deltas.push_back((version, delta));
42 }
43
44 pub fn deltas_since(&self, version: &VClock) -> Option<Vec<D>> {
49 if self.oldest_version != VClock::new() && self.oldest_version.dominates(version) {
52 return None;
53 }
54
55 let result: Vec<D> = self
57 .deltas
58 .iter()
59 .filter(|(v, _)| !version.dominates(v))
60 .map(|(_, d)| d.clone())
61 .collect();
62
63 Some(result)
64 }
65
66 pub fn can_serve(&self, version: &VClock) -> bool {
68 self.deltas_since(version).is_some()
69 }
70
71 pub fn len(&self) -> usize {
73 self.deltas.len()
74 }
75
76 pub fn is_empty(&self) -> bool {
78 self.deltas.is_empty()
79 }
80}
81
#[cfg(test)]
mod tests {
    use super::*;

    /// Advances `clock` with `increment(1)` and returns a snapshot of the result.
    fn tick(clock: &mut VClock) -> VClock {
        clock.increment(1);
        clock.clone()
    }

    /// A freshly constructed buffer holds nothing.
    #[test]
    fn test_buffer_new() {
        let buf: DeltaBuffer<i32> = DeltaBuffer::new(10);
        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
    }

    /// A single pushed delta is visible to a caller that has seen nothing.
    #[test]
    fn test_buffer_push_and_retrieve() {
        let mut buf: DeltaBuffer<i32> = DeltaBuffer::new(10);
        let mut clock = VClock::new();
        buf.push(tick(&mut clock), 42);

        assert_eq!(buf.len(), 1);
        assert!(!buf.is_empty());

        let empty = VClock::new();
        assert!(buf.can_serve(&empty));
        let deltas = buf.deltas_since(&empty).unwrap();
        assert_eq!(deltas, vec![42]);
    }

    /// Deltas come back in the order they were pushed.
    #[test]
    fn test_buffer_multiple_deltas() {
        let mut buf: DeltaBuffer<i32> = DeltaBuffer::new(10);
        let mut clock = VClock::new();

        for value in 1..=3 {
            buf.push(tick(&mut clock), value);
        }
        assert_eq!(buf.len(), 3);

        let empty = VClock::new();
        let deltas = buf.deltas_since(&empty).unwrap();
        assert_eq!(deltas, vec![1, 2, 3]);
    }

    /// A caller at `v1` only receives the deltas pushed after `v1`.
    #[test]
    fn test_buffer_since_version() {
        let mut buf: DeltaBuffer<i32> = DeltaBuffer::new(10);
        let mut clock = VClock::new();

        let v1 = tick(&mut clock);
        buf.push(v1.clone(), 1);
        buf.push(tick(&mut clock), 2);
        buf.push(tick(&mut clock), 3);

        assert!(buf.can_serve(&v1));
        let deltas = buf.deltas_since(&v1).unwrap();
        assert_eq!(deltas, vec![2, 3]);
    }

    /// After an eviction, a caller that has seen nothing can no longer be served.
    #[test]
    fn test_buffer_overflow() {
        let mut buf: DeltaBuffer<i32> = DeltaBuffer::new(2);
        let mut clock = VClock::new();

        buf.push(tick(&mut clock), 1);
        buf.push(tick(&mut clock), 2);
        // Third push exceeds the capacity of 2 and evicts delta 1.
        buf.push(tick(&mut clock), 3);

        // The buffer stays bounded.
        assert_eq!(buf.len(), 2);

        let empty = VClock::new();
        assert!(buf.deltas_since(&empty).is_none());
        assert!(!buf.can_serve(&empty));
    }

    /// After an eviction, callers past the evicted clock are still served.
    #[test]
    fn test_buffer_overflow_partial() {
        let mut buf: DeltaBuffer<i32> = DeltaBuffer::new(2);
        let mut clock = VClock::new();

        let v1 = tick(&mut clock);
        buf.push(v1.clone(), 1);

        let v2 = tick(&mut clock);
        buf.push(v2.clone(), 2);

        // Third push evicts delta 1, so `v1` becomes the eviction marker.
        buf.push(tick(&mut clock), 3);

        assert!(buf.deltas_since(&v1).is_none());
        assert!(!buf.can_serve(&v1));

        assert!(buf.can_serve(&v2));
        let deltas = buf.deltas_since(&v2).unwrap();
        assert_eq!(deltas, vec![3]);
    }
}