use crate::checkpoint::Checkpoint;
use crate::clock::SystemClock;
use crate::compactions_store::CompactionsStore;
use crate::config::GarbageCollectorOptions;
use crate::dispatcher::{MessageFactory, MessageHandler};
use crate::error::SlateDBError;
use crate::garbage_collector::stats::GcStats;
use crate::manifest::store::{ManifestStore, StoredManifest};
use crate::manifest::Manifest;
use crate::stats::StatRegistry;
use crate::tablestore::TableStore;
use crate::transactional_object::{DirtyObject, SimpleTransactionalObject, TransactionalObject};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use compacted_gc::CompactedGcTask;
use compactions_gc::CompactionsGcTask;
use futures::stream::BoxStream;
use log::{debug, error, info};
use manifest_gc::ManifestGcTask;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tracing::instrument;
use wal_gc::WalGcTask;
mod compacted_gc;
mod compactions_gc;
mod manifest_gc;
pub mod stats;
mod wal_gc;
/// Default minimum object age (30 minutes).
///
/// NOTE(review): presumably the fallback for a directory's `min_age` when none is
/// configured; it is not referenced in this file — confirm at the use site.
pub const DEFAULT_MIN_AGE: Duration = Duration::from_secs(30 * 60);
/// Default period (5 minutes) between runs of a collection task; used by `tickers`
/// when a directory's options or their `interval` are absent.
pub const DEFAULT_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Name under which the garbage-collector handler is registered with the executor.
pub(crate) const GC_TASK_NAME: &str = "garbage_collector";
/// One unit of garbage-collection work for a single kind of stored object.
///
/// `GarbageCollector::run_gc_task` drives implementors: it first prunes expired
/// checkpoints and only calls [`GcTask::collect`] if that pruning succeeded.
trait GcTask {
    /// Short name of the collected resource; recorded as the `resource` field of
    /// the tracing span opened by `run_gc_task`.
    fn resource(&self) -> &str;
    /// Performs one collection pass. `now` is the time read from the collector's
    /// `SystemClock` immediately before the call.
    // NOTE(review): presumably compared against object ages / `min_age`; confirm in
    // the per-task modules.
    async fn collect(&self, now: DateTime<Utc>) -> Result<(), SlateDBError>;
}
/// Messages consumed by the garbage collector's `MessageHandler` implementation.
///
/// Each `Gc*` variant runs exactly one collection task; `LogStats` only logs the
/// current counters.
#[derive(Debug)]
pub(crate) enum GcMessage {
    /// Run the WAL SST task (`WalGcTask`).
    GcWal,
    /// Run the compacted SST task (`CompactedGcTask`).
    GcCompacted,
    /// Run the compactions-file task (`CompactionsGcTask`).
    GcCompactions,
    /// Run the manifest task (`ManifestGcTask`).
    GcManifest,
    /// Log the GC counters at debug level.
    LogStats,
}
/// Coordinates the per-resource garbage-collection tasks (manifests, WAL SSTs,
/// compacted SSTs, compactions files) and prunes expired checkpoints from the
/// manifest before each task runs.
pub struct GarbageCollector {
    /// Used to load the current manifest when pruning expired checkpoints.
    manifest_store: Arc<ManifestStore>,
    /// Per-directory options; their `interval` values drive the ticker schedule.
    options: GarbageCollectorOptions,
    /// Counters shared with every task and reported by `log_stats`.
    stats: Arc<GcStats>,
    /// Source of the current time for checkpoint expiry and task collection.
    system_clock: Arc<dyn SystemClock>,
    // One task per collected resource; each is run by `run_gc_task`.
    manifest_gc_task: ManifestGcTask,
    wal_gc_task: WalGcTask,
    compacted_gc_task: CompactedGcTask,
    compactions_gc_task: CompactionsGcTask,
}
#[async_trait]
impl MessageHandler<GcMessage> for GarbageCollector {
    /// Schedules one ticker per collection task plus a stats-logging ticker.
    ///
    /// A task whose directory options are missing, or whose `interval` is `None`,
    /// is scheduled every [`DEFAULT_INTERVAL`]. Stats are logged every 60 seconds.
    fn tickers(&mut self) -> Vec<(Duration, Box<MessageFactory<GcMessage>>)> {
        let or_default = |interval: Option<Duration>| interval.unwrap_or(DEFAULT_INTERVAL);
        let opts = &self.options;

        let manifest_every = or_default(opts.manifest_options.and_then(|o| o.interval));
        let wal_every = or_default(opts.wal_options.and_then(|o| o.interval));
        let compacted_every = or_default(opts.compacted_options.and_then(|o| o.interval));
        let compactions_every = or_default(opts.compactions_options.and_then(|o| o.interval));
        let stats_every = Duration::from_secs(60);

        vec![
            (manifest_every, Box::new(|| GcMessage::GcManifest)),
            (wal_every, Box::new(|| GcMessage::GcWal)),
            (compacted_every, Box::new(|| GcMessage::GcCompacted)),
            (compactions_every, Box::new(|| GcMessage::GcCompactions)),
            (stats_every, Box::new(|| GcMessage::LogStats)),
        ]
    }

    /// Runs only the work selected by `message`.
    ///
    /// Task failures are logged inside `run_gc_task` rather than propagated, so this
    /// always returns `Ok(())`.
    async fn handle(&mut self, message: GcMessage) -> Result<(), SlateDBError> {
        match message {
            GcMessage::LogStats => self.log_stats(),
            GcMessage::GcManifest => self.run_gc_task(&self.manifest_gc_task).await,
            GcMessage::GcWal => self.run_gc_task(&self.wal_gc_task).await,
            GcMessage::GcCompacted => self.run_gc_task(&self.compacted_gc_task).await,
            GcMessage::GcCompactions => self.run_gc_task(&self.compactions_gc_task).await,
        }
        Ok(())
    }

    /// Logs the shutdown and a final stats snapshot. The remaining `_messages`
    /// stream and the `_result` argument are ignored.
    async fn cleanup(
        &mut self,
        _messages: BoxStream<'async_trait, GcMessage>,
        _result: Result<(), SlateDBError>,
    ) -> Result<(), SlateDBError> {
        info!("garbage collector shutdown");
        self.log_stats();
        Ok(())
    }
}
impl GarbageCollector {
pub(crate) fn new(
manifest_store: Arc<ManifestStore>,
compactions_store: Arc<CompactionsStore>,
table_store: Arc<TableStore>,
options: GarbageCollectorOptions,
stat_registry: Arc<StatRegistry>,
system_clock: Arc<dyn SystemClock>,
) -> Self {
let stats = Arc::new(GcStats::new(stat_registry.clone()));
let wal_gc_task = WalGcTask::new(
manifest_store.clone(),
table_store.clone(),
stats.clone(),
options.wal_options,
);
let compacted_gc_task = CompactedGcTask::new(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
stats.clone(),
options.compacted_options,
);
let compactions_gc_task = CompactionsGcTask::new(
compactions_store.clone(),
stats.clone(),
options.compactions_options,
);
let manifest_gc_task = ManifestGcTask::new(
manifest_store.clone(),
stats.clone(),
options.manifest_options,
);
Self {
manifest_store,
options,
stats,
system_clock,
manifest_gc_task,
wal_gc_task,
compacted_gc_task,
compactions_gc_task,
}
}
pub async fn run_gc_once(&self) {
self.run_gc_task(&self.manifest_gc_task).await;
self.run_gc_task(&self.wal_gc_task).await;
self.run_gc_task(&self.compacted_gc_task).await;
self.run_gc_task(&self.compactions_gc_task).await;
self.stats.gc_count.inc();
}
#[instrument(level = "debug", skip_all, fields(resource = task.resource()))]
async fn run_gc_task<T: GcTask + std::fmt::Debug>(&self, task: &T) {
if let Err(e) = self.remove_expired_checkpoints().await {
error!("error removing expired checkpoints [error={}]", e);
} else if let Err(e) = task.collect(self.system_clock.now()).await {
error!("error collecting compacted garbage [error={}]", e);
}
}
async fn remove_expired_checkpoints(&self) -> Result<(), SlateDBError> {
let mut stored_manifest =
StoredManifest::load(Arc::clone(&self.manifest_store), self.system_clock.clone())
.await?;
stored_manifest
.maybe_apply_update(|manifest| self.filter_expired_checkpoints(manifest))
.await
}
fn filter_expired_checkpoints(
&self,
manifest: &SimpleTransactionalObject<Manifest>,
) -> Result<Option<DirtyObject<Manifest>>, SlateDBError> {
let utc_now: DateTime<Utc> = self.system_clock.now();
let mut dirty = manifest.prepare_dirty()?;
let retained_checkpoints: Vec<Checkpoint> = dirty
.core()
.checkpoints
.iter()
.filter(|checkpoint| match checkpoint.expire_time {
Some(expire_time) => expire_time > utc_now,
None => true,
})
.cloned()
.collect();
let maybe_dirty = if dirty.core().checkpoints.len() != retained_checkpoints.len() {
dirty.value.core.checkpoints = retained_checkpoints;
Some(dirty)
} else {
None
};
Ok(maybe_dirty)
}
fn log_stats(&self) {
debug!(
"garbage collector stats [manifest_count={}, wals_count={}, compacted_count={}, compactions_count={}]",
self.stats.gc_manifest_count.value.load(Ordering::SeqCst),
self.stats.gc_wal_count.value.load(Ordering::SeqCst),
self.stats.gc_compacted_count.value.load(Ordering::SeqCst),
self.stats.gc_compactions_count.value.load(Ordering::SeqCst)
);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::{fs::OpenOptions, sync::Arc};

    use chrono::{DateTime, Days, TimeDelta, Utc};
    use object_store::{local::LocalFileSystem, path::Path};
    use tokio::runtime::Handle;
    use tokio::sync::mpsc;
    use uuid::Uuid;

    use crate::checkpoint::Checkpoint;
    use crate::clock::DefaultSystemClock;
    use crate::compactions_store::StoredCompactions;
    use crate::compactor_state::{Compaction, CompactionSpec, SourceId};
    use crate::config::{GarbageCollectorDirectoryOptions, GarbageCollectorOptions};
    use crate::dispatcher::MessageHandlerExecutor;
    use crate::error::SlateDBError;
    use crate::object_stores::ObjectStores;
    use crate::paths::PathResolver;
    use crate::types::RowEntry;
    use crate::utils::WatchableOnceCell;
    use crate::{
        db_state::{CoreDbState, SortedRun, SsTableHandle, SsTableId},
        manifest::store::{ManifestStore, StoredManifest},
        sst::SsTableFormat,
        tablestore::TableStore,
    };

    /// One day in seconds; ages objects well past the one-hour `min_age` the tests use.
    const ONE_DAY_SECS: u64 = 86400;

    #[tokio::test]
    async fn test_collect_garbage_manifest() {
        let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
        let state = CoreDbState::new();
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            state.clone(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        stored_manifest
            .update(stored_manifest.prepare_dirty().unwrap())
            .await
            .unwrap();
        // Only the superseded manifest is aged past min_age.
        let now_minus_24h =
            set_modified(local_object_store.clone(), &manifest_path(1), ONE_DAY_SECS);

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 2);
        assert_eq!(manifests[0].id, 1);
        assert_eq!(manifests[1].id, 2);
        assert_eq!(manifests[0].last_modified, now_minus_24h);

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            None,
        )
        .await;

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 1);
        assert_eq!(manifests[0].id, 2);
    }

    #[tokio::test]
    async fn test_collect_garbage_only_recent_manifests() {
        let (manifest_store, compactions_store, table_store, _) = build_objects();
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            CoreDbState::new(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        stored_manifest
            .update(stored_manifest.prepare_dirty().unwrap())
            .await
            .unwrap();

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 2);
        assert_eq!(manifests[0].id, 1);
        assert_eq!(manifests[1].id, 2);

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            None,
        )
        .await;

        // Nothing is older than min_age, so nothing is collected.
        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 2);
        assert_eq!(manifests[0].id, 1);
        assert_eq!(manifests[1].id, 2);
    }

    #[tokio::test]
    async fn test_collect_garbage_compactions_keeps_latest() {
        let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
        StoredManifest::create_new_db(
            manifest_store.clone(),
            CoreDbState::new(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        let mut stored_compactions = StoredCompactions::create(compactions_store.clone(), 0)
            .await
            .unwrap();
        stored_compactions
            .update(stored_compactions.prepare_dirty().unwrap())
            .await
            .unwrap();
        stored_compactions
            .update(stored_compactions.prepare_dirty().unwrap())
            .await
            .unwrap();

        let compaction_count = compactions_store
            .list_compactions(..)
            .await
            .expect("should list compactions")
            .len();
        assert_eq!(compaction_count, 3);
        for id in 1..=compaction_count {
            set_modified(
                local_object_store.clone(),
                &compactions_path(id as u64),
                ONE_DAY_SECS,
            );
        }

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            None,
        )
        .await;

        // Every file is old, but the latest compactions file must survive.
        let compactions = compactions_store.list_compactions(..).await.unwrap();
        assert_eq!(compactions.len(), 1);
        assert_eq!(compactions[0].id, 3);
    }

    #[tokio::test]
    async fn test_collect_garbage_compactions_respects_min_age() {
        let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
        StoredManifest::create_new_db(
            manifest_store.clone(),
            CoreDbState::new(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        let mut stored_compactions = StoredCompactions::create(compactions_store.clone(), 0)
            .await
            .unwrap();
        stored_compactions
            .update(stored_compactions.prepare_dirty().unwrap())
            .await
            .unwrap();

        let compaction_count = compactions_store
            .list_compactions(..)
            .await
            .expect("should list compactions")
            .len();
        assert_eq!(compaction_count, 2);
        // One minute old is still younger than the one-hour min_age.
        for id in 1..=compaction_count {
            set_modified(local_object_store.clone(), &compactions_path(id as u64), 60);
        }

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            None,
        )
        .await;

        let compactions = compactions_store.list_compactions(..).await.unwrap();
        assert_eq!(compactions.len(), 2);
        assert_eq!(compactions[0].id, 1);
        assert_eq!(compactions[1].id, 2);
    }

    /// Builds an unnamed checkpoint of `manifest_id` with a fresh id and the current
    /// time as its creation time.
    fn new_checkpoint(manifest_id: u64, expire_time: Option<DateTime<Utc>>) -> Checkpoint {
        Checkpoint {
            id: Uuid::new_v4(),
            manifest_id,
            expire_time,
            create_time: DefaultSystemClock::default().now(),
            name: None,
        }
    }

    /// Writes a new manifest version that adds a checkpoint of the current manifest
    /// id, returning the checkpoint's id.
    async fn checkpoint_current_manifest(
        stored_manifest: &mut StoredManifest,
        expire_time: Option<DateTime<Utc>>,
    ) -> Result<Uuid, SlateDBError> {
        let mut dirty = stored_manifest.prepare_dirty()?;
        let checkpoint = new_checkpoint(stored_manifest.id(), expire_time);
        let checkpoint_id = checkpoint.id;
        dirty.value.core.checkpoints.push(checkpoint);
        stored_manifest.update(dirty).await?;
        Ok(checkpoint_id)
    }

    /// Writes a new manifest version without the checkpoint `checkpoint_id`.
    async fn remove_checkpoint(
        checkpoint_id: Uuid,
        stored_manifest: &mut StoredManifest,
    ) -> Result<(), SlateDBError> {
        let mut dirty = stored_manifest.prepare_dirty()?;
        dirty
            .value
            .core
            .checkpoints
            .retain(|checkpoint| checkpoint.id != checkpoint_id);
        stored_manifest.update(dirty).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_remove_expired_checkpoints() {
        let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
        let state = CoreDbState::new();
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            state.clone(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();

        let one_day_ago = DefaultSystemClock::default()
            .now()
            .checked_sub_days(Days::new(1))
            .unwrap();
        let _expired_checkpoint_id =
            checkpoint_current_manifest(&mut stored_manifest, Some(one_day_ago))
                .await
                .unwrap();

        let one_day_ahead = DefaultSystemClock::default()
            .now()
            .checked_add_days(Days::new(1))
            .unwrap();
        let unexpired_checkpoint_id =
            checkpoint_current_manifest(&mut stored_manifest, Some(one_day_ahead))
                .await
                .unwrap();

        for id in 1..=3 {
            set_modified(local_object_store.clone(), &manifest_path(id), ONE_DAY_SECS);
        }

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            None,
        )
        .await;

        // Pruning the expired checkpoint wrote manifest 4.
        let (latest_manifest_id, latest_manifest) =
            manifest_store.read_latest_manifest().await.unwrap();
        assert_eq!(4, latest_manifest_id);
        assert_eq!(1, latest_manifest.core.checkpoints.len());
        assert_eq!(
            unexpired_checkpoint_id,
            latest_manifest.core.checkpoints[0].id
        );
        assert_eq!(2, latest_manifest.core.checkpoints[0].manifest_id);

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 2);
        assert_eq!(manifests[0].id, 2);
        assert_eq!(manifests[1].id, 4);
    }

    #[tokio::test]
    async fn test_collector_should_not_clean_manifests_referenced_by_checkpoints() {
        let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
        let state = CoreDbState::new();
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            state.clone(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        let active_checkpoint_id = checkpoint_current_manifest(&mut stored_manifest, None)
            .await
            .unwrap();
        let inactive_checkpoint_id = checkpoint_current_manifest(&mut stored_manifest, None)
            .await
            .unwrap();
        remove_checkpoint(inactive_checkpoint_id, &mut stored_manifest)
            .await
            .unwrap();

        for id in 1..=3 {
            set_modified(local_object_store.clone(), &manifest_path(id), ONE_DAY_SECS);
        }

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 4);

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            None,
        )
        .await;

        let (latest_manifest_id, latest_manifest) =
            manifest_store.read_latest_manifest().await.unwrap();
        assert_eq!(4, latest_manifest_id);
        assert_eq!(1, latest_manifest.core.checkpoints.len());
        assert_eq!(active_checkpoint_id, latest_manifest.core.checkpoints[0].id);
        assert_eq!(1, latest_manifest.core.checkpoints[0].manifest_id);

        // Manifest 1 is retained because the active checkpoint references it.
        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 2);
        assert_eq!(manifests[0].id, 1);
        assert_eq!(manifests[1].id, 4);
    }

    #[tokio::test]
    async fn test_collect_garbage_old_active_manifest() {
        let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            CoreDbState::new(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        stored_manifest
            .update(stored_manifest.prepare_dirty().unwrap())
            .await
            .unwrap();
        let now_minus_24h_1 =
            set_modified(local_object_store.clone(), &manifest_path(1), ONE_DAY_SECS);
        let now_minus_24h_2 =
            set_modified(local_object_store.clone(), &manifest_path(2), ONE_DAY_SECS);

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 2);
        assert_eq!(manifests[0].id, 1);
        assert_eq!(manifests[1].id, 2);
        assert_eq!(manifests[0].last_modified, now_minus_24h_1);
        assert_eq!(manifests[1].last_modified, now_minus_24h_2);

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            None,
        )
        .await;

        // The latest manifest survives even though it is older than min_age.
        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 1);
        assert_eq!(manifests[0].id, 2);
    }

    /// Writes a one-row SST under `table_id`.
    async fn write_sst(
        table_store: Arc<TableStore>,
        table_id: &SsTableId,
    ) -> Result<(), SlateDBError> {
        let mut sst = table_store.table_builder();
        sst.add(RowEntry::new_value(b"key", b"value", 0))?;
        let table = sst.build()?;
        table_store.write_sst(table_id, table, false).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_collect_garbage_wal_ssts() {
        let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
        let path_resolver = PathResolver::new("/");
        let id1 = SsTableId::Wal(1);
        write_sst(table_store.clone(), &id1).await.unwrap();
        let id2 = SsTableId::Wal(2);
        write_sst(table_store.clone(), &id2).await.unwrap();
        let now_minus_24h = set_modified(
            local_object_store.clone(),
            &path_resolver.table_path(&id1),
            ONE_DAY_SECS,
        );

        let mut state = CoreDbState::new();
        state.replay_after_wal_id = id2.unwrap_wal_id();
        StoredManifest::create_new_db(
            manifest_store.clone(),
            state.clone(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();

        let wal_ssts = table_store.list_wal_ssts(..).await.unwrap();
        assert_eq!(wal_ssts.len(), 2);
        assert_eq!(wal_ssts[0].id, id1);
        assert_eq!(wal_ssts[1].id, id2);
        assert_eq!(wal_ssts[0].last_modified, now_minus_24h);

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 1);
        let current_manifest = manifest_store.read_latest_manifest().await.unwrap().1;
        assert_eq!(
            current_manifest.core.replay_after_wal_id,
            id2.unwrap_wal_id()
        );

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            None,
        )
        .await;

        let wal_ssts = table_store.list_wal_ssts(..).await.unwrap();
        assert_eq!(wal_ssts.len(), 1);
        assert_eq!(wal_ssts[0].id, id2);
    }

    #[tokio::test]
    async fn test_do_not_remove_wals_referenced_by_active_checkpoints() {
        let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
        let path_resolver = PathResolver::new("/");
        let id1 = SsTableId::Wal(1);
        write_sst(table_store.clone(), &id1).await.unwrap();
        let id2 = SsTableId::Wal(2);
        write_sst(table_store.clone(), &id2).await.unwrap();
        let id3 = SsTableId::Wal(3);
        write_sst(table_store.clone(), &id3).await.unwrap();

        let mut state = CoreDbState::new();
        state.replay_after_wal_id = 1;
        state.next_wal_sst_id = 4;
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            state.clone(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        assert_eq!(1, stored_manifest.id());

        // Advance past all WALs, but keep a checkpoint of manifest 1, which still
        // references WALs 2 and 3.
        let mut dirty = stored_manifest.prepare_dirty().unwrap();
        dirty.value.core.replay_after_wal_id = 3;
        dirty.value.core.next_wal_sst_id = 4;
        dirty.value.core.checkpoints.push(new_checkpoint(1, None));
        stored_manifest.update(dirty).await.unwrap();
        assert_eq!(2, stored_manifest.id());

        for wal_id in 1..=3 {
            set_modified(
                local_object_store.clone(),
                &path_resolver.table_path(&SsTableId::Wal(wal_id)),
                ONE_DAY_SECS,
            );
        }

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            None,
        )
        .await;

        let wal_ssts = table_store.list_wal_ssts(..).await.unwrap();
        assert_eq!(wal_ssts.len(), 2);
        assert_eq!(wal_ssts[0].id, id2);
        assert_eq!(wal_ssts[1].id, id3);
    }

    #[tokio::test]
    async fn test_collect_garbage_wal_ssts_and_keep_expired_last_compacted() {
        let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
        let path_resolver = PathResolver::new("/");
        let id1 = SsTableId::Wal(1);
        write_sst(table_store.clone(), &id1).await.unwrap();
        let id2 = SsTableId::Wal(2);
        write_sst(table_store.clone(), &id2).await.unwrap();
        let now_minus_24h_1 = set_modified(
            local_object_store.clone(),
            &path_resolver.table_path(&id1),
            ONE_DAY_SECS,
        );
        let now_minus_24h_2 = set_modified(
            local_object_store.clone(),
            &path_resolver.table_path(&id2),
            ONE_DAY_SECS,
        );

        let mut state = CoreDbState::new();
        state.replay_after_wal_id = id2.unwrap_wal_id();
        StoredManifest::create_new_db(
            manifest_store.clone(),
            state.clone(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();

        let wal_ssts = table_store.list_wal_ssts(..).await.unwrap();
        assert_eq!(wal_ssts.len(), 2);
        assert_eq!(wal_ssts[0].id, id1);
        assert_eq!(wal_ssts[1].id, id2);
        assert_eq!(wal_ssts[0].last_modified, now_minus_24h_1);
        assert_eq!(wal_ssts[1].last_modified, now_minus_24h_2);

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 1);
        let current_manifest = manifest_store.read_latest_manifest().await.unwrap().1;
        assert_eq!(
            current_manifest.core.replay_after_wal_id,
            id2.unwrap_wal_id()
        );

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            None,
        )
        .await;

        // Both WALs are old, but WAL 2 (== replay_after_wal_id) is kept.
        let wal_ssts = table_store.list_wal_ssts(..).await.unwrap();
        assert_eq!(wal_ssts.len(), 1);
        assert_eq!(wal_ssts[0].id, id2);
    }

    #[tokio::test]
    async fn test_collect_garbage_compacted_ssts() {
        let (manifest_store, compactions_store, table_store, _local_object_store) = build_objects();
        let now = DefaultSystemClock::default().now();
        // SST ids are ULIDs whose timestamp places them before (expired) or after
        // (unexpired) the one-hour min_age boundary.
        let expired_base_ms = (now - TimeDelta::seconds(7200)).timestamp_millis() as u64;
        let unexpired_base_ms = (now - TimeDelta::seconds(1800)).timestamp_millis() as u64;

        let l0_sst_handle = create_sst(table_store.clone(), unexpired_base_ms).await;
        let active_expired_l0_sst_handle = create_sst(table_store.clone(), expired_base_ms).await;
        let inactive_expired_l0_sst_handle =
            create_sst(table_store.clone(), expired_base_ms + 1).await;
        let inactive_unexpired_l0_sst_handle =
            create_sst(table_store.clone(), unexpired_base_ms + 1).await;
        let active_sst_handle = create_sst(table_store.clone(), unexpired_base_ms + 2).await;
        let active_expired_sst_handle = create_sst(table_store.clone(), expired_base_ms + 2).await;
        let inactive_expired_sst_handle =
            create_sst(table_store.clone(), expired_base_ms + 3).await;
        let inactive_unexpired_sst_handle =
            create_sst(table_store.clone(), unexpired_base_ms + 3).await;

        let mut state = CoreDbState::new();
        state.l0.push_back(l0_sst_handle.clone());
        state.l0.push_back(active_expired_l0_sst_handle.clone());
        state.compacted.push(SortedRun {
            id: 1,
            ssts: vec![active_sst_handle.clone(), active_expired_sst_handle.clone()],
        });
        StoredManifest::create_new_db(
            manifest_store.clone(),
            state.clone(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();

        let compacted_ssts = table_store.list_compacted_ssts(..).await.unwrap();
        assert_eq!(compacted_ssts.len(), 8);
        let ids: HashSet<_> = compacted_ssts.iter().map(|m| m.id).collect();
        for expected in [
            l0_sst_handle.id,
            active_expired_l0_sst_handle.id,
            inactive_expired_l0_sst_handle.id,
            inactive_unexpired_l0_sst_handle.id,
            active_sst_handle.id,
            active_expired_sst_handle.id,
            inactive_expired_sst_handle.id,
            inactive_unexpired_sst_handle.id,
        ] {
            assert!(ids.contains(&expected));
        }

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 1);
        let current_manifest = manifest_store.read_latest_manifest().await.unwrap().1;
        assert_eq!(current_manifest.core.l0.len(), 2);
        assert_eq!(current_manifest.core.compacted.len(), 1);
        assert_eq!(current_manifest.core.compacted[0].ssts.len(), 2);

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            Some(now),
        )
        .await;

        // Only SSTs that are both unreferenced and expired are removed.
        let compacted_ssts = table_store.list_compacted_ssts(..).await.unwrap();
        assert_eq!(compacted_ssts.len(), 6);
        let remaining_ids: HashSet<_> = compacted_ssts.iter().map(|m| m.id).collect();
        for expected in [
            l0_sst_handle.id,
            active_expired_l0_sst_handle.id,
            inactive_unexpired_l0_sst_handle.id,
            active_sst_handle.id,
            active_expired_sst_handle.id,
            inactive_unexpired_sst_handle.id,
        ] {
            assert!(remaining_ids.contains(&expected));
        }
        assert!(!remaining_ids.contains(&inactive_expired_l0_sst_handle.id));
        assert!(!remaining_ids.contains(&inactive_expired_sst_handle.id));

        let current_manifest = manifest_store.read_latest_manifest().await.unwrap().1;
        assert_eq!(current_manifest.core.l0.len(), 2);
        assert_eq!(current_manifest.core.compacted.len(), 1);
        assert_eq!(current_manifest.core.compacted[0].ssts.len(), 2);
    }

    #[tokio::test]
    async fn test_collect_garbage_compacted_ssts_respects_checkpoint_references() {
        let (manifest_store, compactions_store, table_store, _local_object_store) = build_objects();
        let now = DefaultSystemClock::default().now();
        let expired_base_ms = (now - TimeDelta::seconds(7200)).timestamp_millis() as u64;

        let active_l0_sst_handle = create_sst(table_store.clone(), expired_base_ms + 5).await;
        let active_checkpoint_l0_sst_handle =
            create_sst(table_store.clone(), expired_base_ms + 4).await;
        let active_sst_handle = create_sst(table_store.clone(), expired_base_ms + 3).await;
        let active_checkpoint_sst_handle =
            create_sst(table_store.clone(), expired_base_ms + 2).await;
        let inactive_l0_sst_handle = create_sst(table_store.clone(), expired_base_ms + 1).await;
        let inactive_sst_handle = create_sst(table_store.clone(), expired_base_ms).await;

        let mut state = CoreDbState::new();
        state.l0.push_back(active_l0_sst_handle.clone());
        state.l0.push_back(active_checkpoint_l0_sst_handle.clone());
        state.compacted.push(SortedRun {
            id: 1,
            ssts: vec![active_sst_handle.clone()],
        });
        state.compacted.push(SortedRun {
            id: 2,
            ssts: vec![active_checkpoint_sst_handle.clone()],
        });
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            state.clone(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        let checkpoint_id = checkpoint_current_manifest(&mut stored_manifest, None)
            .await
            .unwrap();

        // Drop the second L0 SST and sorted run from the live manifest; only the
        // checkpoint still references them.
        let mut dirty = stored_manifest.prepare_dirty().unwrap();
        dirty.value.core.l0.truncate(1);
        dirty.value.core.compacted.truncate(1);
        stored_manifest.update(dirty).await.unwrap();

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            Some(now),
        )
        .await;

        let compacted_ssts = table_store.list_compacted_ssts(..).await.unwrap();
        assert_eq!(compacted_ssts.len(), 4);
        let remaining_ids: HashSet<_> = compacted_ssts.iter().map(|m| m.id).collect();
        assert!(remaining_ids.contains(&active_l0_sst_handle.id));
        assert!(remaining_ids.contains(&active_checkpoint_l0_sst_handle.id));
        assert!(remaining_ids.contains(&active_sst_handle.id));
        assert!(remaining_ids.contains(&active_checkpoint_sst_handle.id));
        assert!(!remaining_ids.contains(&inactive_l0_sst_handle.id));
        assert!(!remaining_ids.contains(&inactive_sst_handle.id));

        // Once the checkpoint is gone, its SSTs become collectable too.
        remove_checkpoint(checkpoint_id, &mut stored_manifest)
            .await
            .unwrap();

        run_gc_once(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            Some(now),
        )
        .await;

        let compacted_ssts = table_store.list_compacted_ssts(..).await.unwrap();
        let remaining_ids: HashSet<_> = compacted_ssts.iter().map(|m| m.id).collect();
        assert_eq!(
            remaining_ids.len(),
            2,
            "remaining_ids: {:#?}",
            remaining_ids
        );
        assert!(remaining_ids.contains(&active_l0_sst_handle.id));
        assert!(remaining_ids.contains(&active_sst_handle.id));
        assert!(!remaining_ids.contains(&active_checkpoint_sst_handle.id));
        assert!(!remaining_ids.contains(&active_checkpoint_l0_sst_handle.id));
        assert!(!remaining_ids.contains(&inactive_l0_sst_handle.id));
        assert!(!remaining_ids.contains(&inactive_sst_handle.id));
    }

    /// Creates stores that share a `LocalFileSystem` rooted in a fresh temporary
    /// directory.
    // NOTE(review): `keep()` persists the temp directory after the test; confirm
    // that leaving it on disk is intended.
    fn build_objects() -> (
        Arc<ManifestStore>,
        Arc<CompactionsStore>,
        Arc<TableStore>,
        Arc<LocalFileSystem>,
    ) {
        let tempdir = tempfile::tempdir().unwrap().keep();
        let local_object_store = Arc::new(
            LocalFileSystem::new_with_prefix(tempdir)
                .unwrap()
                .with_automatic_cleanup(true),
        );
        let path = Path::from("/");
        let manifest_store = Arc::new(ManifestStore::new(&path, local_object_store.clone()));
        let compactions_store = Arc::new(CompactionsStore::new(&path, local_object_store.clone()));
        let sst_format = SsTableFormat::default();
        let table_store = Arc::new(TableStore::new(
            ObjectStores::new(local_object_store.clone(), None),
            sst_format,
            path.clone(),
            None,
        ));
        (
            manifest_store,
            compactions_store,
            table_store,
            local_object_store,
        )
    }

    /// Object-store path of manifest `id`, e.g. `manifest/00000000000000000001.manifest`.
    fn manifest_path(id: u64) -> Path {
        Path::from(format!("manifest/{:020}.manifest", id))
    }

    /// Object-store path of compactions file `id`, e.g.
    /// `compactions/00000000000000000001.compactions`.
    fn compactions_path(id: u64) -> Path {
        Path::from(format!("compactions/{:020}.compactions", id))
    }

    /// GC options with a one-hour `min_age` and the given `interval` for every
    /// directory.
    fn gc_options(interval: Option<Duration>) -> GarbageCollectorOptions {
        let directory_options = GarbageCollectorDirectoryOptions {
            min_age: Duration::from_secs(3600),
            interval,
        };
        GarbageCollectorOptions {
            manifest_options: Some(directory_options),
            wal_options: Some(directory_options),
            compacted_options: Some(directory_options),
            compactions_options: Some(directory_options),
        }
    }

    /// Writes a one-row compacted SST whose ULID id encodes `ts_ms` and returns its
    /// handle.
    async fn create_sst(table_store: Arc<TableStore>, ts_ms: u64) -> SsTableHandle {
        let sst_id = SsTableId::Compacted(ulid::Ulid::from_parts(ts_ms, 0));
        let mut sst = table_store.table_builder();
        sst.add(RowEntry::new_value(b"key", b"value", 0)).unwrap();
        let table = sst.build().unwrap();
        table_store.write_sst(&sst_id, table, false).await.unwrap()
    }

    /// Sets the file mtime of `path` to `seconds_ago` seconds before now and
    /// returns that timestamp.
    fn set_modified(
        local_object_store: Arc<LocalFileSystem>,
        path: &Path,
        seconds_ago: u64,
    ) -> DateTime<Utc> {
        let file = local_object_store.path_to_filesystem(path).unwrap();
        let file = OpenOptions::new().write(true).open(file).unwrap();
        let modified_at = DefaultSystemClock::default().now()
            - TimeDelta::seconds(seconds_ago.try_into().unwrap());
        file.set_modified(modified_at.into()).unwrap();
        modified_at
    }

    /// Asserts that every WAL and compacted SST referenced by the latest manifest,
    /// or by any manifest it references, still exists in the table store.
    async fn assert_no_dangling_references(
        manifest_store: Arc<ManifestStore>,
        table_store: Arc<TableStore>,
    ) {
        let (manifest_id, manifest) = manifest_store.read_latest_manifest().await.unwrap();
        let manifests = manifest_store
            .read_referenced_manifests(manifest_id, &manifest)
            .await
            .unwrap();
        let wal_ssts = table_store
            .list_wal_ssts(..)
            .await
            .unwrap()
            .iter()
            .map(|sst| sst.id)
            .collect::<HashSet<SsTableId>>();
        let compacted_ssts = table_store
            .list_compacted_ssts(..)
            .await
            .unwrap()
            .iter()
            .map(|sst| sst.id)
            .collect::<HashSet<SsTableId>>();
        for manifest in manifests.values() {
            let wal_sst_start_inclusive = manifest.core.replay_after_wal_id + 1;
            let wal_sst_end_exclusive = manifest.core.next_wal_sst_id;
            for wal_sst_id in wal_sst_start_inclusive..wal_sst_end_exclusive {
                let id = SsTableId::Wal(wal_sst_id);
                assert!(wal_ssts.contains(&id), "dangling WAL SST reference {:?}", id);
            }
            for sst in &manifest.core.l0 {
                assert!(
                    compacted_ssts.contains(&sst.id),
                    "dangling L0 SST reference {:?}",
                    sst.id
                );
            }
            for sr in &manifest.core.compacted {
                for sst in &sr.ssts {
                    assert!(
                        compacted_ssts.contains(&sst.id),
                        "dangling sorted-run SST reference {:?}",
                        sst.id
                    );
                }
            }
        }
    }

    /// Runs one full GC round with [`gc_options`]`(None)`, then checks for dangling
    /// references.
    ///
    /// When `compaction_low_watermark_dt` is set, a compaction whose ULID encodes
    /// that time is first recorded in the compactions store. If no compactions file
    /// exists yet, one is created with the manifest's compactor epoch.
    async fn run_gc_once(
        manifest_store: Arc<ManifestStore>,
        compactions_store: Arc<CompactionsStore>,
        table_store: Arc<TableStore>,
        compaction_low_watermark_dt: Option<DateTime<Utc>>,
    ) {
        let stats = Arc::new(StatRegistry::new());
        if let Some(compaction_low_watermark_dt) = compaction_low_watermark_dt {
            let mut stored_compactions = if let Some(stored_compactions) =
                StoredCompactions::try_load(compactions_store.clone())
                    .await
                    .expect("load failed")
            {
                stored_compactions
            } else {
                let manifest = StoredManifest::load(
                    manifest_store.clone(),
                    Arc::new(DefaultSystemClock::default()),
                )
                .await
                .expect("manifest load failed");
                StoredCompactions::create(
                    compactions_store.clone(),
                    manifest.manifest().compactor_epoch,
                )
                .await
                .expect("compactions creation failed")
            };
            let mut compactions_dirty = stored_compactions.prepare_dirty().unwrap();
            compactions_dirty.value.insert(Compaction::new(
                ulid::Ulid::from_parts(compaction_low_watermark_dt.timestamp_millis() as u64, 0),
                CompactionSpec::new(vec![SourceId::SortedRun(0)], 0),
            ));
            stored_compactions.update(compactions_dirty).await.unwrap();
        }
        let gc = GarbageCollector::new(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            gc_options(None),
            stats.clone(),
            Arc::new(DefaultSystemClock::default()),
        );
        gc.run_gc_once().await;
        assert_no_dangling_references(manifest_store, table_store).await;
    }

    #[tokio::test]
    async fn test_handle_should_only_run_one_task_per_message() {
        use crate::dispatcher::MessageHandler;
        let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            CoreDbState::new(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        stored_manifest
            .update(stored_manifest.prepare_dirty().unwrap())
            .await
            .unwrap();
        set_modified(local_object_store.clone(), &manifest_path(1), ONE_DAY_SECS);

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(manifests.len(), 2);

        let stats = Arc::new(StatRegistry::new());
        let mut gc = GarbageCollector::new(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            gc_options(None),
            stats.clone(),
            Arc::new(DefaultSystemClock::default()),
        );
        gc.handle(GcMessage::GcWal).await.unwrap();

        let manifests = manifest_store.list_manifests(..).await.unwrap();
        assert_eq!(
            manifests.len(),
            2,
            "manifest GC should not run on WAL message"
        );
    }

    #[tokio::test(start_paused = true)]
    async fn test_gc_shutdown() {
        let (manifest_store, compactions_store, table_store, _) = build_objects();
        let stats = Arc::new(StatRegistry::new());
        let gc = GarbageCollector::new(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            gc_options(Some(Duration::from_secs(1))),
            stats.clone(),
            Arc::new(DefaultSystemClock::default()),
        );
        let (_, rx) = mpsc::unbounded_channel();
        let clock = Arc::new(DefaultSystemClock::default());
        let executor = MessageHandlerExecutor::new(WatchableOnceCell::new(), clock);
        // Register under the same name that is cancelled and joined below.
        executor
            .add_handler(
                GC_TASK_NAME.to_string(),
                Box::new(gc),
                rx,
                &Handle::current(),
            )
            .expect("failed to spawn task");
        let jh = executor
            .monitor_on(&Handle::current())
            .expect("failed to start monitor task");
        executor.cancel_task(GC_TASK_NAME);
        let result = executor.join_task(GC_TASK_NAME).await;
        assert!(matches!(result, Ok(())));
        jh.await.expect("failed to join task");
    }
}