Skip to main content

nodedb_array/sync/
gc.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Log compaction: collapse ops below the min-ack frontier into snapshots.
4//!
5//! [`collapse_below`] is the pure GC entry point. It consults an
6//! [`AckVector`] to find the min-ack frontier, writes one snapshot per
7//! affected array via a [`SnapshotSink`], then drops the compacted ops from
8//! the log. No I/O beyond the abstract traits.
9
10use std::collections::HashSet;
11use std::sync::Mutex;
12
13use crate::error::ArrayResult;
14use crate::sync::ack::AckVector;
15use crate::sync::hlc::Hlc;
16use crate::sync::op_log::OpLog;
17use crate::sync::snapshot::{SnapshotSink, TileSnapshot};
18
19// ─── GcReport ────────────────────────────────────────────────────────────────
20
21/// Summary produced by [`collapse_below`].
22#[derive(Clone, Debug, PartialEq)]
23pub struct GcReport {
24    /// Number of arrays for which a snapshot was successfully written.
25    pub snapshots_written: u64,
26    /// Number of ops dropped from the log below the frontier.
27    pub ops_dropped: u64,
28    /// The GC frontier HLC used for this run.
29    pub frontier: Hlc,
30}
31
32// ─── collapse_below ───────────────────────────────────────────────────────────
33
34/// Compact ops below the min-ack frontier into snapshots, then prune the log.
35///
36/// Algorithm:
37/// 1. Determine frontier = `acks.min_ack_hlc()`. If `None`, return early with
38///    an empty report — GC must not proceed without knowing every peer's
39///    progress.
40/// 2. Collect distinct array names whose ops have `hlc < frontier`.
41/// 3. For each such array, call `snapshot_for_array(name, frontier)`.
42///    - `Some(snap)` → write via `sink.write_snapshot(&snap)` and increment
43///      `snapshots_written`.
44///    - `None` → the caller signals "no live state for this array"; skip the
45///      snapshot write (ops will still be dropped).
46///    - `Err` → propagate immediately; the log is **not** mutated.
47/// 4. After all snapshots succeed, call `log.drop_below(frontier)` and record
48///    the count in `ops_dropped`.
49/// 5. Return the report.
50pub fn collapse_below(
51    log: &dyn OpLog,
52    acks: &AckVector,
53    sink: &dyn SnapshotSink,
54    snapshot_for_array: impl Fn(&str, Hlc) -> ArrayResult<Option<TileSnapshot>>,
55) -> ArrayResult<GcReport> {
56    let frontier = match acks.min_ack_hlc() {
57        None => {
58            return Ok(GcReport {
59                snapshots_written: 0,
60                ops_dropped: 0,
61                frontier: Hlc::ZERO,
62            });
63        }
64        Some(h) => h,
65    };
66
67    // Collect distinct array names with ops below the frontier.
68    let mut arrays_to_snapshot: HashSet<String> = HashSet::new();
69    for item in log.scan_from(Hlc::ZERO)? {
70        let op = item?;
71        if op.header.hlc < frontier {
72            arrays_to_snapshot.insert(op.header.array.clone());
73        }
74    }
75
76    // Write snapshots — abort on the first error; do NOT mutate the log yet.
77    let mut snapshots_written: u64 = 0;
78    for array in &arrays_to_snapshot {
79        if let Some(snap) = snapshot_for_array(array, frontier)? {
80            sink.write_snapshot(&snap)?;
81            snapshots_written += 1;
82        }
83    }
84
85    // All snapshots succeeded — now prune the log.
86    let ops_dropped = log.drop_below(frontier)?;
87
88    Ok(GcReport {
89        snapshots_written,
90        ops_dropped,
91        frontier,
92    })
93}
94
95// ─── MockSnapshotSink ────────────────────────────────────────────────────────
96
97/// In-memory [`SnapshotSink`] suitable for tests in any crate that depends
98/// on `nodedb-array`.
99pub struct MockSnapshotSink {
100    snapshots: Mutex<Vec<TileSnapshot>>,
101}
102
103impl MockSnapshotSink {
104    /// Create an empty sink.
105    pub fn new() -> Self {
106        Self {
107            snapshots: Mutex::new(Vec::new()),
108        }
109    }
110
111    /// Return the number of snapshots written so far.
112    pub fn snapshot_count(&self) -> usize {
113        self.snapshots
114            .lock()
115            .expect("invariant: MockSnapshotSink mutex is not poisoned")
116            .len()
117    }
118
119    /// Consume the sink, returning all written snapshots.
120    pub fn into_snapshots(self) -> Vec<TileSnapshot> {
121        self.snapshots
122            .into_inner()
123            .expect("invariant: MockSnapshotSink mutex is not poisoned")
124    }
125}
126
127impl Default for MockSnapshotSink {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
133impl SnapshotSink for MockSnapshotSink {
134    fn write_snapshot(&self, snapshot: &TileSnapshot) -> crate::error::ArrayResult<()> {
135        self.snapshots
136            .lock()
137            .expect("invariant: MockSnapshotSink mutex is not poisoned")
138            .push(snapshot.clone());
139        Ok(())
140    }
141}
142
143// ─── Tests ───────────────────────────────────────────────────────────────────
144
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ArrayError;
    use crate::sync::hlc::Hlc;
    use crate::sync::op::{ArrayOp, ArrayOpHeader, ArrayOpKind};
    use crate::sync::op_log::InMemoryOpLog;
    use crate::sync::replica_id::ReplicaId;
    use crate::sync::snapshot::CoordRange;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;

    /// The replica id used by default throughout these tests.
    fn replica() -> ReplicaId {
        ReplicaId::new(1)
    }

    /// Build an HLC from `ms` with a zero second component, owned by
    /// [`replica`].
    fn hlc(ms: u64) -> Hlc {
        Hlc::new(ms, 0, replica()).unwrap()
    }

    /// Build a `Put` op on `array` stamped with `hlc(ms)`.
    fn make_op(array: &str, ms: u64) -> ArrayOp {
        ArrayOp {
            header: ArrayOpHeader {
                array: array.into(),
                hlc: hlc(ms),
                schema_hlc: hlc(1),
                valid_from_ms: 0,
                valid_until_ms: -1,
                system_from_ms: ms as i64,
            },
            kind: ArrayOpKind::Put,
            coord: vec![CoordValue::Int64(ms as i64)],
            attrs: Some(vec![CellValue::Null]),
        }
    }

    /// Build a placeholder snapshot for `array` stamped with `frontier`.
    fn dummy_snapshot(array: &str, frontier: Hlc) -> TileSnapshot {
        TileSnapshot {
            array: array.into(),
            coord_range: CoordRange {
                lo: vec![CoordValue::Int64(0)],
                hi: vec![CoordValue::Int64(100)],
            },
            tile_blob: vec![0xAA; 16],
            snapshot_hlc: frontier,
            schema_hlc: hlc(1),
        }
    }

    #[test]
    fn gc_with_no_acks_is_noop() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("arr", 10)).unwrap();
        let acks = AckVector::new();
        let sink = MockSnapshotSink::new();

        let report = collapse_below(&log, &acks, &sink, |_, _| Ok(None)).unwrap();

        assert_eq!(report.snapshots_written, 0);
        assert_eq!(report.ops_dropped, 0);
        assert_eq!(report.frontier, Hlc::ZERO);
        // Neither the log nor the sink may be touched on the early return.
        assert_eq!(log.len().unwrap(), 1);
        assert_eq!(sink.snapshot_count(), 0);
    }

    #[test]
    fn gc_collapses_below_min_ack() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30, 40] {
            log.append(&make_op("arr", ms)).unwrap();
        }
        let mut acks = AckVector::new();
        acks.record(replica(), hlc(25));

        let sink = MockSnapshotSink::new();
        let report = collapse_below(&log, &acks, &sink, |array, frontier| {
            Ok(Some(dummy_snapshot(array, frontier)))
        })
        .unwrap();

        // frontier = 25; ops at 10 and 20 are below it
        assert_eq!(report.frontier, hlc(25));
        assert_eq!(report.ops_dropped, 2);
        // One distinct array ("arr") had ops below frontier
        assert_eq!(report.snapshots_written, 1);
        assert_eq!(sink.snapshot_count(), 1);
        // No op sits exactly at the frontier; the ops at 30 and 40 are above
        // it, so drop_below(25) removes only 10 and 20 and 2 ops remain.
        assert_eq!(log.len().unwrap(), 2);
    }

    #[test]
    fn gc_skips_arrays_with_no_live_state() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("alive", 10)).unwrap();
        log.append(&make_op("dead", 10)).unwrap();

        let mut acks = AckVector::new();
        acks.record(replica(), hlc(50));

        let sink = MockSnapshotSink::new();
        let report = collapse_below(&log, &acks, &sink, |array, frontier| {
            if array == "alive" {
                Ok(Some(dummy_snapshot(array, frontier)))
            } else {
                Ok(None)
            }
        })
        .unwrap();

        // Only "alive" gets a snapshot; "dead" is skipped.
        assert_eq!(report.snapshots_written, 1);
        assert_eq!(sink.snapshot_count(), 1);
        // Both ops are dropped, including the one whose array had no snapshot.
        assert_eq!(report.ops_dropped, 2);
        assert_eq!(log.len().unwrap(), 0);
    }

    #[test]
    fn gc_propagates_snapshot_error() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("arr", 10)).unwrap();

        let mut acks = AckVector::new();
        acks.record(replica(), hlc(50));

        let sink = MockSnapshotSink::new();
        let result = collapse_below(&log, &acks, &sink, |_, _| {
            Err(ArrayError::SegmentCorruption {
                detail: "simulated snapshot error".into(),
            })
        });

        assert!(result.is_err());
        // Log must be unchanged.
        assert_eq!(log.len().unwrap(), 1);
        // The builder failed before anything could reach the sink.
        assert_eq!(sink.snapshot_count(), 0);
    }

    #[test]
    fn gc_propagates_sink_error_without_pruning() {
        /// Sink that rejects every snapshot.
        struct FailingSink;

        impl SnapshotSink for FailingSink {
            fn write_snapshot(&self, _snapshot: &TileSnapshot) -> ArrayResult<()> {
                Err(ArrayError::SegmentCorruption {
                    detail: "simulated sink error".into(),
                })
            }
        }

        let log = InMemoryOpLog::new();
        log.append(&make_op("arr", 10)).unwrap();

        let mut acks = AckVector::new();
        acks.record(replica(), hlc(50));

        let result = collapse_below(&log, &acks, &FailingSink, |array, frontier| {
            Ok(Some(dummy_snapshot(array, frontier)))
        });

        assert!(result.is_err());
        // A failed snapshot write must leave the log untouched.
        assert_eq!(log.len().unwrap(), 1);
    }

    #[test]
    fn gc_min_ack_with_two_replicas() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30, 40] {
            log.append(&make_op("arr", ms)).unwrap();
        }

        let r2 = ReplicaId::new(2);
        let mut acks = AckVector::new();
        acks.record(replica(), hlc(50)); // replica 1 ack at 50
        acks.record(r2, hlc(30)); // replica 2 ack at 30 → frontier = 30

        let sink = MockSnapshotSink::new();
        let report = collapse_below(&log, &acks, &sink, |array, frontier| {
            Ok(Some(dummy_snapshot(array, frontier)))
        })
        .unwrap();

        assert_eq!(report.frontier, hlc(30));
        // Ops at 10, 20 are strictly < 30 → 2 dropped; 30, 40 survive.
        assert_eq!(report.ops_dropped, 2);
        assert_eq!(log.len().unwrap(), 2);
        // All ops belong to one array, so exactly one snapshot is written.
        assert_eq!(report.snapshots_written, 1);
        assert_eq!(sink.snapshot_count(), 1);
    }
}