use std::collections::HashSet;
use std::sync::Mutex;
use crate::error::ArrayResult;
use crate::sync::ack::AckVector;
use crate::sync::hlc::Hlc;
use crate::sync::op_log::OpLog;
use crate::sync::snapshot::{SnapshotSink, TileSnapshot};
#[derive(Clone, Debug, PartialEq)]
pub struct GcReport {
pub snapshots_written: u64,
pub ops_dropped: u64,
pub frontier: Hlc,
}
pub fn collapse_below(
log: &dyn OpLog,
acks: &AckVector,
sink: &dyn SnapshotSink,
snapshot_for_array: impl Fn(&str, Hlc) -> ArrayResult<Option<TileSnapshot>>,
) -> ArrayResult<GcReport> {
let frontier = match acks.min_ack_hlc() {
None => {
return Ok(GcReport {
snapshots_written: 0,
ops_dropped: 0,
frontier: Hlc::ZERO,
});
}
Some(h) => h,
};
let mut arrays_to_snapshot: HashSet<String> = HashSet::new();
for item in log.scan_from(Hlc::ZERO)? {
let op = item?;
if op.header.hlc < frontier {
arrays_to_snapshot.insert(op.header.array.clone());
}
}
let mut snapshots_written: u64 = 0;
for array in &arrays_to_snapshot {
if let Some(snap) = snapshot_for_array(array, frontier)? {
sink.write_snapshot(&snap)?;
snapshots_written += 1;
}
}
let ops_dropped = log.drop_below(frontier)?;
Ok(GcReport {
snapshots_written,
ops_dropped,
frontier,
})
}
pub struct MockSnapshotSink {
snapshots: Mutex<Vec<TileSnapshot>>,
}
impl MockSnapshotSink {
pub fn new() -> Self {
Self {
snapshots: Mutex::new(Vec::new()),
}
}
pub fn snapshot_count(&self) -> usize {
self.snapshots
.lock()
.expect("invariant: MockSnapshotSink mutex is not poisoned")
.len()
}
pub fn into_snapshots(self) -> Vec<TileSnapshot> {
self.snapshots
.into_inner()
.expect("invariant: MockSnapshotSink mutex is not poisoned")
}
}
impl Default for MockSnapshotSink {
fn default() -> Self {
Self::new()
}
}
impl SnapshotSink for MockSnapshotSink {
fn write_snapshot(&self, snapshot: &TileSnapshot) -> crate::error::ArrayResult<()> {
self.snapshots
.lock()
.expect("invariant: MockSnapshotSink mutex is not poisoned")
.push(snapshot.clone());
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::ArrayError;
use crate::sync::hlc::Hlc;
use crate::sync::op::{ArrayOp, ArrayOpHeader, ArrayOpKind};
use crate::sync::op_log::InMemoryOpLog;
use crate::sync::replica_id::ReplicaId;
use crate::sync::snapshot::CoordRange;
use crate::types::cell_value::value::CellValue;
use crate::types::coord::value::CoordValue;
fn replica() -> ReplicaId {
ReplicaId::new(1)
}
fn hlc(ms: u64) -> Hlc {
Hlc::new(ms, 0, replica()).unwrap()
}
fn make_op(array: &str, ms: u64) -> ArrayOp {
ArrayOp {
header: ArrayOpHeader {
array: array.into(),
hlc: hlc(ms),
schema_hlc: hlc(1),
valid_from_ms: 0,
valid_until_ms: -1,
system_from_ms: ms as i64,
},
kind: ArrayOpKind::Put,
coord: vec![CoordValue::Int64(ms as i64)],
attrs: Some(vec![CellValue::Null]),
}
}
fn dummy_snapshot(array: &str, frontier: Hlc) -> TileSnapshot {
TileSnapshot {
array: array.into(),
coord_range: CoordRange {
lo: vec![CoordValue::Int64(0)],
hi: vec![CoordValue::Int64(100)],
},
tile_blob: vec![0xAA; 16],
snapshot_hlc: frontier,
schema_hlc: hlc(1),
}
}
#[test]
fn gc_with_no_acks_is_noop() {
let log = InMemoryOpLog::new();
log.append(&make_op("arr", 10)).unwrap();
let acks = AckVector::new();
let sink = MockSnapshotSink::new();
let report = collapse_below(&log, &acks, &sink, |_, _| Ok(None)).unwrap();
assert_eq!(report.snapshots_written, 0);
assert_eq!(report.ops_dropped, 0);
assert_eq!(report.frontier, Hlc::ZERO);
assert_eq!(log.len().unwrap(), 1);
}
#[test]
fn gc_collapses_below_min_ack() {
let log = InMemoryOpLog::new();
for ms in [10, 20, 30, 40] {
log.append(&make_op("arr", ms)).unwrap();
}
let mut acks = AckVector::new();
acks.record(replica(), hlc(25));
let sink = MockSnapshotSink::new();
let report = collapse_below(&log, &acks, &sink, |array, frontier| {
Ok(Some(dummy_snapshot(array, frontier)))
})
.unwrap();
assert_eq!(report.frontier, hlc(25));
assert_eq!(report.ops_dropped, 2);
assert_eq!(report.snapshots_written, 1);
assert_eq!(sink.snapshot_count(), 1);
assert_eq!(log.len().unwrap(), 2);
}
#[test]
fn gc_skips_arrays_with_no_live_state() {
let log = InMemoryOpLog::new();
log.append(&make_op("alive", 10)).unwrap();
log.append(&make_op("dead", 10)).unwrap();
let mut acks = AckVector::new();
acks.record(replica(), hlc(50));
let sink = MockSnapshotSink::new();
let report = collapse_below(&log, &acks, &sink, |array, frontier| {
if array == "alive" {
Ok(Some(dummy_snapshot(array, frontier)))
} else {
Ok(None)
}
})
.unwrap();
assert_eq!(report.snapshots_written, 1);
assert_eq!(report.ops_dropped, 2);
}
#[test]
fn gc_propagates_snapshot_error() {
let log = InMemoryOpLog::new();
log.append(&make_op("arr", 10)).unwrap();
let mut acks = AckVector::new();
acks.record(replica(), hlc(50));
let sink = MockSnapshotSink::new();
let result = collapse_below(&log, &acks, &sink, |_, _| {
Err(ArrayError::SegmentCorruption {
detail: "simulated snapshot error".into(),
})
});
assert!(result.is_err());
assert_eq!(log.len().unwrap(), 1);
}
#[test]
fn gc_min_ack_with_two_replicas() {
let log = InMemoryOpLog::new();
for ms in [10, 20, 30, 40] {
log.append(&make_op("arr", ms)).unwrap();
}
let r2 = ReplicaId::new(2);
let mut acks = AckVector::new();
acks.record(replica(), hlc(50)); acks.record(r2, hlc(30));
let sink = MockSnapshotSink::new();
let report = collapse_below(&log, &acks, &sink, |array, frontier| {
Ok(Some(dummy_snapshot(array, frontier)))
})
.unwrap();
assert_eq!(report.frontier, hlc(30));
assert_eq!(report.ops_dropped, 2);
assert_eq!(log.len().unwrap(), 2);
}
}