Skip to main content

nodedb_array/sync/
op_log.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Append-only operation log abstraction for array CRDT sync.
4//!
5//! [`OpLog`] is a storage-agnostic trait. Concrete implementations live in
6//! `nodedb-lite` (redb-backed) and `nodedb` (WAL-backed); [`InMemoryOpLog`]
7//! is a `BTreeMap`-backed implementation suitable for tests in any crate
8//! that depends on `nodedb-array`.
9
10use std::collections::BTreeMap;
11use std::sync::Mutex;
12
13use crate::error::{ArrayError, ArrayResult};
14use crate::sync::hlc::Hlc;
15use crate::sync::op::ArrayOp;
16
17/// Boxed fallible iterator over array ops, returned by [`OpLog`] scans.
18pub type OpIter<'a> = Box<dyn Iterator<Item = ArrayResult<ArrayOp>> + 'a>;
19
20/// Storage-agnostic interface for an append-only array operation log.
21///
22/// Implementations are expected to be `Send + Sync` and durable. An
23/// in-memory implementation suitable for tests is provided by
24/// [`InMemoryOpLog`] below.
25#[allow(clippy::len_without_is_empty)]
26pub trait OpLog: Send + Sync {
27    /// Append an operation to the log.
28    ///
29    /// Must be idempotent: re-appending an op with the same `(array, hlc)`
30    /// is a no-op.
31    fn append(&self, op: &ArrayOp) -> ArrayResult<()>;
32
33    /// Iterate all ops with `hlc >= from`, in HLC order, across all arrays.
34    fn scan_from<'a>(&'a self, from: Hlc) -> ArrayResult<OpIter<'a>>;
35
36    /// Iterate ops for `array` with `from <= hlc <= to`, in HLC order.
37    fn scan_range<'a>(&'a self, array: &str, from: Hlc, to: Hlc) -> ArrayResult<OpIter<'a>>;
38
39    /// Return the total number of ops in the log (across all arrays).
40    fn len(&self) -> ArrayResult<u64>;
41
42    /// Drop all ops whose `hlc < hlc` and return the count dropped.
43    ///
44    /// Used by GC after snapshotting ops below the min-ack frontier.
45    fn drop_below(&self, hlc: Hlc) -> ArrayResult<u64>;
46}
47
48// ─── In-memory implementation ───────────────────────────────────────────────
49
50type OpMap = BTreeMap<(String, [u8; 18]), ArrayOp>;
51
52/// `BTreeMap`-backed [`OpLog`] implementation.
53///
54/// Key: `(array_name, hlc_bytes)` so iteration is in HLC order per array.
55/// Operations with the same key (same array + same HLC) are idempotent.
56pub struct InMemoryOpLog {
57    ops: Mutex<OpMap>,
58}
59
60impl InMemoryOpLog {
61    /// Create a new empty log.
62    pub fn new() -> Self {
63        Self {
64            ops: Mutex::new(BTreeMap::new()),
65        }
66    }
67
68    fn lock(&self) -> ArrayResult<std::sync::MutexGuard<'_, OpMap>> {
69        self.ops.lock().map_err(|_| ArrayError::HlcLockPoisoned)
70    }
71}
72
73impl Default for InMemoryOpLog {
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
79impl OpLog for InMemoryOpLog {
80    fn append(&self, op: &ArrayOp) -> ArrayResult<()> {
81        let key = (op.header.array.clone(), op.header.hlc.to_bytes());
82        self.lock()?.entry(key).or_insert_with(|| op.clone());
83        Ok(())
84    }
85
86    fn scan_from<'a>(&'a self, from: Hlc) -> ArrayResult<OpIter<'a>> {
87        let guard = self.lock()?;
88        let results: Vec<ArrayOp> = guard
89            .iter()
90            .filter(|((_, hlc_bytes), _)| {
91                let hlc = Hlc::from_bytes(hlc_bytes);
92                hlc >= from
93            })
94            .map(|(_, op)| op.clone())
95            .collect();
96        Ok(Box::new(results.into_iter().map(Ok)))
97    }
98
99    fn scan_range<'a>(&'a self, array: &str, from: Hlc, to: Hlc) -> ArrayResult<OpIter<'a>> {
100        let guard = self.lock()?;
101        let results: Vec<ArrayOp> = guard
102            .iter()
103            .filter(|((arr, hlc_bytes), _)| {
104                if arr != array {
105                    return false;
106                }
107                let hlc = Hlc::from_bytes(hlc_bytes);
108                hlc >= from && hlc <= to
109            })
110            .map(|(_, op)| op.clone())
111            .collect();
112        Ok(Box::new(results.into_iter().map(Ok)))
113    }
114
115    fn len(&self) -> ArrayResult<u64> {
116        Ok(self.lock()?.len() as u64)
117    }
118
119    fn drop_below(&self, hlc: Hlc) -> ArrayResult<u64> {
120        let mut guard = self.lock()?;
121        let before = guard.len() as u64;
122        guard.retain(|(_, hlc_bytes), _| Hlc::from_bytes(hlc_bytes) >= hlc);
123        let after = guard.len() as u64;
124        Ok(before - after)
125    }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::hlc::Hlc;
    use crate::sync::op::{ArrayOpHeader, ArrayOpKind};
    use crate::sync::replica_id::ReplicaId;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;

    fn replica() -> ReplicaId {
        ReplicaId::new(1)
    }

    fn hlc(ms: u64, logical: u16) -> Hlc {
        Hlc::new(ms, logical, replica()).unwrap()
    }

    /// Build a `Put` op on `array` stamped at `(ms, logical)` whose single
    /// coordinate is `ms`.
    fn make_op(array: &str, ms: u64, logical: u16) -> ArrayOp {
        ArrayOp {
            header: ArrayOpHeader {
                array: array.into(),
                hlc: hlc(ms, logical),
                schema_hlc: hlc(1, 0),
                valid_from_ms: 0,
                valid_until_ms: -1,
                system_from_ms: ms as i64,
            },
            kind: ArrayOpKind::Put,
            coord: vec![CoordValue::Int64(ms as i64)],
            attrs: Some(vec![CellValue::Null]),
        }
    }

    #[test]
    fn append_then_scan() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("arr", 10, 0)).unwrap();
        log.append(&make_op("arr", 20, 0)).unwrap();
        assert_eq!(log.len().unwrap(), 2);

        let ops: Vec<_> = log
            .scan_from(Hlc::ZERO)
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(ops.len(), 2);
    }

    #[test]
    fn scan_from_skips_below() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30, 40] {
            log.append(&make_op("arr", ms, 0)).unwrap();
        }

        let ops: Vec<_> = log
            .scan_from(hlc(25, 0))
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        // Only ms=30 and ms=40 should appear.
        assert_eq!(ops.len(), 2);
        assert!(ops.iter().all(|op| op.header.hlc.physical_ms >= 25));
    }

    #[test]
    fn drop_below_drops_correctly() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30] {
            log.append(&make_op("arr", ms, 0)).unwrap();
        }
        let dropped = log.drop_below(hlc(20, 0)).unwrap();
        assert_eq!(dropped, 1); // ms=10 dropped
        assert_eq!(log.len().unwrap(), 2);
    }

    #[test]
    fn drop_below_on_empty_log_is_noop() {
        let log = InMemoryOpLog::new();
        assert_eq!(log.drop_below(hlc(100, 0)).unwrap(), 0);
        assert_eq!(log.len().unwrap(), 0);
    }

    #[test]
    fn scan_range_filters_array() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("a", 10, 0)).unwrap();
        log.append(&make_op("b", 20, 0)).unwrap();
        log.append(&make_op("a", 30, 0)).unwrap();

        let ops: Vec<_> = log
            .scan_range("a", Hlc::ZERO, hlc(u64::MAX >> 16, 0))
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(ops.len(), 2);
        assert!(ops.iter().all(|op| op.header.array == "a"));
    }

    #[test]
    fn scan_range_bounds_are_inclusive() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30] {
            log.append(&make_op("arr", ms, 0)).unwrap();
        }

        let ops: Vec<_> = log
            .scan_range("arr", hlc(10, 0), hlc(20, 0))
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        // Both endpoints (ms=10 and ms=20) are included; ms=30 is not.
        assert_eq!(ops.len(), 2);
        assert!(ops
            .iter()
            .all(|op| (10..=20).contains(&op.header.hlc.physical_ms)));
    }

    #[test]
    fn scan_range_with_inverted_bounds_is_empty() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30] {
            log.append(&make_op("arr", ms, 0)).unwrap();
        }
        // `from > to` must yield nothing rather than panic.
        let count = log
            .scan_range("arr", hlc(30, 0), hlc(10, 0))
            .unwrap()
            .count();
        assert_eq!(count, 0);
    }

    #[test]
    fn append_keeps_first_write_for_duplicate_key() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("x", 1, 0)).unwrap();

        // Same (array, hlc) key but different payload: must be ignored.
        let mut dup = make_op("x", 1, 0);
        dup.coord = vec![CoordValue::Int64(99)];
        log.append(&dup).unwrap();

        let ops: Vec<_> = log
            .scan_from(Hlc::ZERO)
            .unwrap()
            .map(|r| r.unwrap())
            .collect();
        assert_eq!(ops.len(), 1);
        assert!(matches!(ops[0].coord.as_slice(), [CoordValue::Int64(1)]));
    }

    #[test]
    fn len_counts_all() {
        let log = InMemoryOpLog::new();
        assert_eq!(log.len().unwrap(), 0);
        log.append(&make_op("x", 1, 0)).unwrap();
        log.append(&make_op("y", 2, 0)).unwrap();
        assert_eq!(log.len().unwrap(), 2);
        // Idempotent re-append.
        log.append(&make_op("x", 1, 0)).unwrap();
        assert_eq!(log.len().unwrap(), 2);
    }
}