1use std::collections::HashSet;
11use std::sync::Mutex;
12
13use crate::error::ArrayResult;
14use crate::sync::ack::AckVector;
15use crate::sync::hlc::Hlc;
16use crate::sync::op_log::OpLog;
17use crate::sync::snapshot::{SnapshotSink, TileSnapshot};
18
19#[derive(Clone, Debug, PartialEq)]
23pub struct GcReport {
24 pub snapshots_written: u64,
26 pub ops_dropped: u64,
28 pub frontier: Hlc,
30}
31
32pub fn collapse_below(
51 log: &dyn OpLog,
52 acks: &AckVector,
53 sink: &dyn SnapshotSink,
54 snapshot_for_array: impl Fn(&str, Hlc) -> ArrayResult<Option<TileSnapshot>>,
55) -> ArrayResult<GcReport> {
56 let frontier = match acks.min_ack_hlc() {
57 None => {
58 return Ok(GcReport {
59 snapshots_written: 0,
60 ops_dropped: 0,
61 frontier: Hlc::ZERO,
62 });
63 }
64 Some(h) => h,
65 };
66
67 let mut arrays_to_snapshot: HashSet<String> = HashSet::new();
69 for item in log.scan_from(Hlc::ZERO)? {
70 let op = item?;
71 if op.header.hlc < frontier {
72 arrays_to_snapshot.insert(op.header.array.clone());
73 }
74 }
75
76 let mut snapshots_written: u64 = 0;
78 for array in &arrays_to_snapshot {
79 if let Some(snap) = snapshot_for_array(array, frontier)? {
80 sink.write_snapshot(&snap)?;
81 snapshots_written += 1;
82 }
83 }
84
85 let ops_dropped = log.drop_below(frontier)?;
87
88 Ok(GcReport {
89 snapshots_written,
90 ops_dropped,
91 frontier,
92 })
93}
94
95pub struct MockSnapshotSink {
100 snapshots: Mutex<Vec<TileSnapshot>>,
101}
102
103impl MockSnapshotSink {
104 pub fn new() -> Self {
106 Self {
107 snapshots: Mutex::new(Vec::new()),
108 }
109 }
110
111 pub fn snapshot_count(&self) -> usize {
113 self.snapshots
114 .lock()
115 .expect("invariant: MockSnapshotSink mutex is not poisoned")
116 .len()
117 }
118
119 pub fn into_snapshots(self) -> Vec<TileSnapshot> {
121 self.snapshots
122 .into_inner()
123 .expect("invariant: MockSnapshotSink mutex is not poisoned")
124 }
125}
126
127impl Default for MockSnapshotSink {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133impl SnapshotSink for MockSnapshotSink {
134 fn write_snapshot(&self, snapshot: &TileSnapshot) -> crate::error::ArrayResult<()> {
135 self.snapshots
136 .lock()
137 .expect("invariant: MockSnapshotSink mutex is not poisoned")
138 .push(snapshot.clone());
139 Ok(())
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ArrayError;
    use crate::sync::hlc::Hlc;
    use crate::sync::op::{ArrayOp, ArrayOpHeader, ArrayOpKind};
    use crate::sync::op_log::InMemoryOpLog;
    use crate::sync::replica_id::ReplicaId;
    use crate::sync::snapshot::CoordRange;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;

    /// Replica id used by default throughout these tests.
    fn replica() -> ReplicaId {
        ReplicaId::new(1)
    }

    /// Builds an HLC at `ms` for the default replica.
    fn hlc(ms: u64) -> Hlc {
        Hlc::new(ms, 0, replica()).unwrap()
    }

    /// Builds a `Put` op on `array` whose HLC and coordinate derive from `ms`.
    fn make_op(array: &str, ms: u64) -> ArrayOp {
        ArrayOp {
            header: ArrayOpHeader {
                array: array.into(),
                hlc: hlc(ms),
                schema_hlc: hlc(1),
                valid_from_ms: 0,
                valid_until_ms: -1,
                system_from_ms: ms as i64,
            },
            kind: ArrayOpKind::Put,
            coord: vec![CoordValue::Int64(ms as i64)],
            attrs: Some(vec![CellValue::Null]),
        }
    }

    /// Builds a placeholder snapshot for `array` stamped with `frontier`.
    fn dummy_snapshot(array: &str, frontier: Hlc) -> TileSnapshot {
        TileSnapshot {
            array: array.into(),
            coord_range: CoordRange {
                lo: vec![CoordValue::Int64(0)],
                hi: vec![CoordValue::Int64(100)],
            },
            tile_blob: vec![0xAA; 16],
            snapshot_hlc: frontier,
            schema_hlc: hlc(1),
        }
    }

    #[test]
    fn gc_with_no_acks_is_noop() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("arr", 10)).unwrap();
        let acks = AckVector::new();
        let sink = MockSnapshotSink::new();

        let report = collapse_below(&log, &acks, &sink, |_, _| Ok(None)).unwrap();

        assert_eq!(report.snapshots_written, 0);
        assert_eq!(report.ops_dropped, 0);
        assert_eq!(report.frontier, Hlc::ZERO);
        assert_eq!(log.len().unwrap(), 1);
        // Nothing may reach the sink when there is no frontier.
        assert_eq!(sink.snapshot_count(), 0);
    }

    #[test]
    fn gc_collapses_below_min_ack() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30, 40] {
            log.append(&make_op("arr", ms)).unwrap();
        }
        let mut acks = AckVector::new();
        acks.record(replica(), hlc(25));

        let sink = MockSnapshotSink::new();
        let report = collapse_below(&log, &acks, &sink, |array, frontier| {
            Ok(Some(dummy_snapshot(array, frontier)))
        })
        .unwrap();

        assert_eq!(report.frontier, hlc(25));
        assert_eq!(report.ops_dropped, 2);
        assert_eq!(report.snapshots_written, 1);
        assert_eq!(sink.snapshot_count(), 1);
        assert_eq!(log.len().unwrap(), 2);

        // The snapshot callback must have been invoked with the frontier.
        let snapshots = sink.into_snapshots();
        assert_eq!(snapshots[0].snapshot_hlc, hlc(25));
    }

    #[test]
    fn gc_skips_arrays_with_no_live_state() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("alive", 10)).unwrap();
        log.append(&make_op("dead", 10)).unwrap();

        let mut acks = AckVector::new();
        acks.record(replica(), hlc(50));

        let sink = MockSnapshotSink::new();
        let report = collapse_below(&log, &acks, &sink, |array, frontier| {
            if array == "alive" {
                Ok(Some(dummy_snapshot(array, frontier)))
            } else {
                Ok(None)
            }
        })
        .unwrap();

        assert_eq!(report.snapshots_written, 1);
        assert_eq!(report.ops_dropped, 2);
        // Only the "alive" array produced a snapshot; both ops were dropped.
        assert_eq!(sink.snapshot_count(), 1);
        assert_eq!(log.len().unwrap(), 0);
    }

    #[test]
    fn gc_propagates_snapshot_error() {
        let log = InMemoryOpLog::new();
        log.append(&make_op("arr", 10)).unwrap();

        let mut acks = AckVector::new();
        acks.record(replica(), hlc(50));

        let sink = MockSnapshotSink::new();
        let result = collapse_below(&log, &acks, &sink, |_, _| {
            Err(ArrayError::SegmentCorruption {
                detail: "simulated snapshot error".into(),
            })
        });

        assert!(result.is_err());
        // A failed snapshot must leave both the log and the sink untouched.
        assert_eq!(log.len().unwrap(), 1);
        assert_eq!(sink.snapshot_count(), 0);
    }

    #[test]
    fn gc_min_ack_with_two_replicas() {
        let log = InMemoryOpLog::new();
        for ms in [10, 20, 30, 40] {
            log.append(&make_op("arr", ms)).unwrap();
        }

        let r2 = ReplicaId::new(2);
        let mut acks = AckVector::new();
        acks.record(replica(), hlc(50));
        acks.record(r2, hlc(30));

        let sink = MockSnapshotSink::new();
        let report = collapse_below(&log, &acks, &sink, |array, frontier| {
            Ok(Some(dummy_snapshot(array, frontier)))
        })
        .unwrap();

        // The slower replica (ack at 30) determines the frontier.
        assert_eq!(report.frontier, hlc(30));
        assert_eq!(report.ops_dropped, 2);
        assert_eq!(report.snapshots_written, 1);
        assert_eq!(sink.snapshot_count(), 1);
        assert_eq!(log.len().unwrap(), 2);
    }
}