// nodedb_array/sync/op_log.rs
use std::collections::BTreeMap;
11use std::sync::Mutex;
12
13use crate::error::{ArrayError, ArrayResult};
14use crate::sync::hlc::Hlc;
15use crate::sync::op::ArrayOp;
16
17pub type OpIter<'a> = Box<dyn Iterator<Item = ArrayResult<ArrayOp>> + 'a>;
19
20#[allow(clippy::len_without_is_empty)]
26pub trait OpLog: Send + Sync {
27 fn append(&self, op: &ArrayOp) -> ArrayResult<()>;
32
33 fn scan_from<'a>(&'a self, from: Hlc) -> ArrayResult<OpIter<'a>>;
35
36 fn scan_range<'a>(&'a self, array: &str, from: Hlc, to: Hlc) -> ArrayResult<OpIter<'a>>;
38
39 fn len(&self) -> ArrayResult<u64>;
41
42 fn drop_below(&self, hlc: Hlc) -> ArrayResult<u64>;
46}
47
48type OpMap = BTreeMap<(String, [u8; 18]), ArrayOp>;
51
52pub struct InMemoryOpLog {
57 ops: Mutex<OpMap>,
58}
59
60impl InMemoryOpLog {
61 pub fn new() -> Self {
63 Self {
64 ops: Mutex::new(BTreeMap::new()),
65 }
66 }
67
68 fn lock(&self) -> ArrayResult<std::sync::MutexGuard<'_, OpMap>> {
69 self.ops.lock().map_err(|_| ArrayError::HlcLockPoisoned)
70 }
71}
72
73impl Default for InMemoryOpLog {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl OpLog for InMemoryOpLog {
80 fn append(&self, op: &ArrayOp) -> ArrayResult<()> {
81 let key = (op.header.array.clone(), op.header.hlc.to_bytes());
82 self.lock()?.entry(key).or_insert_with(|| op.clone());
83 Ok(())
84 }
85
86 fn scan_from<'a>(&'a self, from: Hlc) -> ArrayResult<OpIter<'a>> {
87 let guard = self.lock()?;
88 let results: Vec<ArrayOp> = guard
89 .iter()
90 .filter(|((_, hlc_bytes), _)| {
91 let hlc = Hlc::from_bytes(hlc_bytes);
92 hlc >= from
93 })
94 .map(|(_, op)| op.clone())
95 .collect();
96 Ok(Box::new(results.into_iter().map(Ok)))
97 }
98
99 fn scan_range<'a>(&'a self, array: &str, from: Hlc, to: Hlc) -> ArrayResult<OpIter<'a>> {
100 let guard = self.lock()?;
101 let results: Vec<ArrayOp> = guard
102 .iter()
103 .filter(|((arr, hlc_bytes), _)| {
104 if arr != array {
105 return false;
106 }
107 let hlc = Hlc::from_bytes(hlc_bytes);
108 hlc >= from && hlc <= to
109 })
110 .map(|(_, op)| op.clone())
111 .collect();
112 Ok(Box::new(results.into_iter().map(Ok)))
113 }
114
115 fn len(&self) -> ArrayResult<u64> {
116 Ok(self.lock()?.len() as u64)
117 }
118
119 fn drop_below(&self, hlc: Hlc) -> ArrayResult<u64> {
120 let mut guard = self.lock()?;
121 let before = guard.len() as u64;
122 guard.retain(|(_, hlc_bytes), _| Hlc::from_bytes(hlc_bytes) >= hlc);
123 let after = guard.len() as u64;
124 Ok(before - after)
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::hlc::Hlc;
    use crate::sync::op::{ArrayOpHeader, ArrayOpKind};
    use crate::sync::replica_id::ReplicaId;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;

    /// The single replica id used by every fixture in this module.
    fn replica() -> ReplicaId {
        ReplicaId::new(1)
    }

    /// Builds an HLC for the fixture replica; panics if `Hlc::new` rejects the inputs.
    fn hlc(ms: u64, logical: u16) -> Hlc {
        Hlc::new(ms, logical, replica()).unwrap()
    }

    /// Builds a `Put` op on `array` stamped with `hlc(ms, logical)`.
    fn make_op(array: &str, ms: u64, logical: u16) -> ArrayOp {
        ArrayOp {
            header: ArrayOpHeader {
                array: array.into(),
                hlc: hlc(ms, logical),
                schema_hlc: hlc(1, 0),
                valid_from_ms: 0,
                valid_until_ms: -1,
                system_from_ms: ms as i64,
            },
            kind: ArrayOpKind::Put,
            coord: vec![CoordValue::Int64(ms as i64)],
            attrs: Some(vec![CellValue::Null]),
        }
    }

    /// Drains a scan result, panicking on the first error.
    fn collect_ops(iter: OpIter<'_>) -> Vec<ArrayOp> {
        iter.map(|r| r.unwrap()).collect()
    }

    #[test]
    fn append_then_scan() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("arr", 10, 0)).unwrap();
        log.append(&make_op("arr", 20, 0)).unwrap();
        assert_eq!(log.len().unwrap(), 2);

        let ops = collect_ops(log.scan_from(Hlc::ZERO).unwrap());
        assert_eq!(ops.len(), 2);
    }

    #[test]
    fn scan_from_skips_below() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30, 40] {
            log.append(&make_op("arr", ms, 0)).unwrap();
        }

        let ops = collect_ops(log.scan_from(hlc(25, 0)).unwrap());
        assert_eq!(ops.len(), 2);
        assert!(ops.iter().all(|op| op.header.hlc.physical_ms >= 25));
    }

    #[test]
    fn drop_below_drops_correctly() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30] {
            log.append(&make_op("arr", ms, 0)).unwrap();
        }

        // Only the op at 10 ms is strictly below the cut-off; the one at 20 ms stays.
        let dropped = log.drop_below(hlc(20, 0)).unwrap();
        assert_eq!(dropped, 1);
        assert_eq!(log.len().unwrap(), 2);
    }

    #[test]
    fn scan_range_filters_array() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("a", 10, 0)).unwrap();
        log.append(&make_op("b", 20, 0)).unwrap();
        log.append(&make_op("a", 30, 0)).unwrap();

        let ops = collect_ops(
            log.scan_range("a", Hlc::ZERO, hlc(u64::MAX >> 16, 0))
                .unwrap(),
        );
        assert_eq!(ops.len(), 2);
        assert!(ops.iter().all(|op| op.header.array == "a"));
    }

    #[test]
    fn scan_range_bounds_are_inclusive() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30] {
            log.append(&make_op("a", ms, 0)).unwrap();
        }
        // An array whose name merely starts with "a" must not leak into the scan.
        log.append(&make_op("ab", 15, 0)).unwrap();

        let ops = collect_ops(log.scan_range("a", hlc(10, 0), hlc(20, 0)).unwrap());
        assert_eq!(ops.len(), 2);
        assert!(ops.iter().all(|op| op.header.array == "a"));
        assert!(ops
            .iter()
            .all(|op| (10..=20).contains(&op.header.hlc.physical_ms)));
    }

    #[test]
    fn len_counts_all() {
        let log = InMemoryOpLog::new();
        assert_eq!(log.len().unwrap(), 0);
        log.append(&make_op("x", 1, 0)).unwrap();
        log.append(&make_op("y", 2, 0)).unwrap();
        assert_eq!(log.len().unwrap(), 2);

        // Re-appending an op with an existing (array, HLC) key must not grow the log.
        log.append(&make_op("x", 1, 0)).unwrap();
        assert_eq!(log.len().unwrap(), 2);
    }
}