use crate::Database;
use crate::error::DbxResult;
use crate::storage::StorageBackend;
use crate::transaction::mvcc::version::VersionedKey;
use std::collections::HashMap;
/// Reclaims superseded versions of MVCC-versioned keys stored in a database's
/// delta store.
///
/// A version is only ever reclaimed when it is not among the newest
/// `min_versions_per_key` versions of its key, so the most recent data for a
/// key is never removed.
#[derive(Debug)]
pub struct GarbageCollector {
/// Number of newest versions of each key that are never reclaimed.
/// Both constructors in this file keep this value at 1 or higher.
min_versions_per_key: usize,
}
impl GarbageCollector {
    /// Creates a collector that always retains the single newest version of
    /// every key.
    pub fn new() -> Self {
        Self {
            min_versions_per_key: 1,
        }
    }

    /// Creates a collector that never reclaims the `min_versions` newest
    /// versions of a key.
    ///
    /// A value of `0` is raised to `1`, so the newest version of a key always
    /// survives.
    pub fn with_min_versions(min_versions: usize) -> Self {
        Self {
            min_versions_per_key: min_versions.max(1),
        }
    }

    /// Deletes every reclaimable version from the delta store of each table
    /// and returns how many versions were deleted.
    ///
    /// A version is reclaimable when all of the following hold:
    /// * it is not among the newest `min_versions_per_key` versions of its key,
    /// * its commit timestamp is strictly below `min_active_ts`, and
    /// * a newer version of the same key has a commit timestamp at or below
    ///   `min_active_ts`.
    ///
    /// `min_active_ts` is the watermark used for those comparisons; presumably
    /// the timestamp of the oldest still-active transaction (confirm against
    /// the caller).
    ///
    /// # Errors
    ///
    /// Propagates any error from listing tables, scanning a table, or deleting
    /// a version. Deletions issued before the failure are not undone by this
    /// function.
    pub fn collect(&self, db: &Database, min_active_ts: u64) -> DbxResult<usize> {
        let mut deleted_count = 0;
        let tables = db.table_names()?;
        for table in tables {
            let entries = db.delta.scan(&table, vec![]..)?;
            // The scan is fully consumed before the first delete is issued.
            for encoded_key in self.reclaimable_keys(entries, min_active_ts) {
                db.delta.delete(&table, &encoded_key)?;
                deleted_count += 1;
            }
        }
        Ok(deleted_count)
    }

    /// Counts the versions that [`collect`](Self::collect) would delete for
    /// the same `min_active_ts`, without modifying the database.
    ///
    /// Both methods share one selection routine, so the estimate cannot drift
    /// from the actual collection rule.
    ///
    /// # Errors
    ///
    /// Propagates any error from listing tables or scanning a table.
    pub fn estimate_garbage(&self, db: &Database, min_active_ts: u64) -> DbxResult<usize> {
        let mut garbage_count = 0;
        let tables = db.table_names()?;
        for table in tables {
            let entries = db.delta.scan(&table, vec![]..)?;
            garbage_count += self.reclaimable_keys(entries, min_active_ts).len();
        }
        Ok(garbage_count)
    }

    /// Selects the encoded keys of all reclaimable versions among `entries`.
    ///
    /// `entries` are the `(encoded_key, value)` pairs of one table; the value
    /// is ignored. Entries whose key does not decode as a [`VersionedKey`] are
    /// skipped and therefore never reported as garbage.
    fn reclaimable_keys<I, V>(&self, entries: I, min_active_ts: u64) -> Vec<Vec<u8>>
    where
        I: IntoIterator<Item = (Vec<u8>, V)>,
    {
        // Group every version by the user key it belongs to.
        let mut versions_by_key: HashMap<Vec<u8>, Vec<(Vec<u8>, u64)>> = HashMap::new();
        for (encoded_key, _value) in entries {
            if let Ok(vk) = VersionedKey::decode(&encoded_key) {
                versions_by_key
                    .entry(vk.user_key.clone())
                    .or_default()
                    .push((encoded_key, vk.commit_ts));
            }
        }

        let mut reclaimable = Vec::new();
        for (_user_key, mut versions) in versions_by_key {
            // Newest first, so a version's rank is the number of newer versions.
            versions.sort_by_key(|(_, commit_ts)| std::cmp::Reverse(*commit_ts));

            // Tracks whether any version already visited (i.e. newer than the
            // current one) is visible at the watermark. A running flag avoids
            // rescanning the newer versions for every candidate.
            let mut newer_version_visible = false;
            for (rank, (encoded_key, commit_ts)) in versions.into_iter().enumerate() {
                if rank >= self.min_versions_per_key
                    && commit_ts < min_active_ts
                    && newer_version_visible
                {
                    reclaimable.push(encoded_key);
                }
                newer_version_visible |= commit_ts <= min_active_ts;
            }
        }
        reclaimable
    }
}
impl Default for GarbageCollector {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// With versions at 10, 20 and 30 and a watermark of 25, only the version
    /// at 10 is reclaimable: 20 is the newest version at or below the
    /// watermark and 30 is above it.
    #[test]
    fn test_gc_removes_old_versions() -> DbxResult<()> {
        let store = Database::open_in_memory()?;
        let collector = GarbageCollector::new();

        store.insert_versioned("test", b"key1", Some(b"v1"), 10)?;
        store.insert_versioned("test", b"key1", Some(b"v2"), 20)?;
        store.insert_versioned("test", b"key1", Some(b"v3"), 30)?;

        assert_eq!(collector.estimate_garbage(&store, 25)?, 1);
        assert_eq!(collector.collect(&store, 25)?, 1);
        // Nothing is left to reclaim once the collection has run.
        assert_eq!(collector.estimate_garbage(&store, 25)?, 0);
        Ok(())
    }

    /// With a minimum of two retained versions and a watermark above every
    /// commit, the two newest of four versions survive.
    #[test]
    fn test_gc_keeps_minimum_versions() -> DbxResult<()> {
        let store = Database::open_in_memory()?;
        let collector = GarbageCollector::with_min_versions(2);

        store.insert_versioned("test", b"key1", Some(b"v1"), 10)?;
        store.insert_versioned("test", b"key1", Some(b"v2"), 20)?;
        store.insert_versioned("test", b"key1", Some(b"v3"), 30)?;
        store.insert_versioned("test", b"key1", Some(b"v4"), 40)?;

        assert_eq!(collector.collect(&store, 100)?, 2);
        Ok(())
    }

    /// Each key is evaluated independently: at watermark 22 both keys lose
    /// only their oldest version.
    #[test]
    fn test_gc_multiple_keys() -> DbxResult<()> {
        let store = Database::open_in_memory()?;
        let collector = GarbageCollector::new();

        store.insert_versioned("test", b"key1", Some(b"v1"), 10)?;
        store.insert_versioned("test", b"key1", Some(b"v2"), 20)?;
        store.insert_versioned("test", b"key1", Some(b"v3"), 30)?;

        store.insert_versioned("test", b"key2", Some(b"v1"), 15)?;
        store.insert_versioned("test", b"key2", Some(b"v2"), 20)?;
        store.insert_versioned("test", b"key2", Some(b"v3"), 25)?;

        assert_eq!(collector.collect(&store, 22)?, 2);
        Ok(())
    }

    /// The estimate taken before a collection equals the number of versions
    /// the collection then deletes.
    #[test]
    fn test_gc_estimate_accuracy() -> DbxResult<()> {
        let store = Database::open_in_memory()?;
        let collector = GarbageCollector::new();

        store.insert_versioned("test", b"key1", Some(b"v1"), 10)?;
        store.insert_versioned("test", b"key1", Some(b"v2"), 20)?;
        store.insert_versioned("test", b"key1", Some(b"v3"), 30)?;

        let predicted = collector.estimate_garbage(&store, 25)?;
        let removed = collector.collect(&store, 25)?;
        assert_eq!(predicted, removed);
        Ok(())
    }

    /// Collecting on a database with no versions deletes nothing.
    #[test]
    fn test_gc_empty_database() -> DbxResult<()> {
        let store = Database::open_in_memory()?;
        let collector = GarbageCollector::new();

        assert_eq!(collector.collect(&store, 100)?, 0);
        Ok(())
    }
}