use crate::checkpoint::Checkpoint;
use crate::compactions_store::CompactionsStore;
use crate::config::GarbageCollectorOptions;
pub use crate::db::builder::GarbageCollectorBuilder;
use crate::dispatcher::{MessageFactory, MessageHandler};
use crate::error::SlateDBError;
use crate::garbage_collector::stats::GcStats;
use crate::manifest::store::{ManifestStore, StoredManifest};
use crate::manifest::Manifest;
use crate::tablestore::TableStore;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use compacted_gc::CompactedGcTask;
use compactions_gc::CompactionsGcTask;
use futures::stream::BoxStream;
use log::{error, info};
use manifest_gc::ManifestGcTask;
use slatedb_common::clock::SystemClock;
use slatedb_common::metrics::MetricsRecorderHelper;
use slatedb_txn_obj::{DirtyObject, SimpleTransactionalObject, TransactionalObject};
use std::sync::Arc;
use std::time::Duration;
use tracing::instrument;
use wal_gc::WalGcTask;
mod compacted_gc;
mod compactions_gc;
mod manifest_gc;
pub mod stats;
mod wal_gc;
/// Default minimum age of 300 seconds (5 minutes). Not referenced in this part
/// of the file; presumably the fallback `min_age` for GC directory options
/// (TODO confirm against the config/builder code).
pub(crate) const DEFAULT_MIN_AGE: Duration = Duration::from_secs(300);
/// Tick interval of 60 seconds, used by `tickers` for any GC directory whose
/// options leave `interval` unset.
pub(crate) const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);
/// Name of the garbage collector task. Not referenced in this part of the
/// file; presumably used when registering the task with its executor (verify).
pub(crate) const GC_TASK_NAME: &str = "garbage_collector";
/// One category of garbage collection that [`GarbageCollector`] can run.
///
/// `GarbageCollector::run_gc_task` accepts any implementor; in this file it is
/// called with the manifest, WAL, compacted and compactions GC tasks.
trait GcTask {
/// Short name of the resource this task collects. `run_gc_task` records it as
/// the `resource` field of its tracing span.
fn resource(&self) -> &str;
/// Runs one collection pass.
///
/// `run_gc_task` passes the system clock's current time as `now` and logs
/// (rather than propagates) any error returned here.
async fn collect(&self, now: DateTime<Utc>) -> Result<(), SlateDBError>;
}
/// Message handled by [`GarbageCollector`]; each variant asks the handler to
/// run the GC task for one kind of resource. The variants are produced by the
/// periodic tickers returned from `GarbageCollector::tickers`.
#[derive(Debug)]
pub(crate) enum GcMessage {
/// Run the WAL GC task.
Wal,
/// Run the compacted GC task.
Compacted,
/// Run the compactions GC task.
Compactions,
/// Run the manifest GC task.
Manifest,
}
/// Garbage collector that owns up to four GC tasks (manifest, WAL, compacted,
/// compactions) and removes expired checkpoints from the manifest before
/// running any of them.
pub struct GarbageCollector {
// Store used to load the latest manifest when removing expired checkpoints.
manifest_store: Arc<ManifestStore>,
// Per-directory GC options; a directory set to `None` gets neither a ticker nor a task.
options: GarbageCollectorOptions,
// Stats handle shared with every task; `run_gc_once` increments `gc_count` on it.
stats: Arc<GcStats>,
// Source of "now" for collection passes and checkpoint expiry checks.
system_clock: Arc<dyn SystemClock>,
// Each task below is `Some` only when the matching options were provided to `new`.
manifest_gc_task: Option<ManifestGcTask>,
wal_gc_task: Option<WalGcTask>,
compacted_gc_task: Option<CompactedGcTask>,
compactions_gc_task: Option<CompactionsGcTask>,
}
#[async_trait]
impl MessageHandler<GcMessage> for GarbageCollector {
    /// Returns one periodic ticker for every GC directory that has options
    /// configured, in the order manifest, WAL, compacted, compactions.
    ///
    /// A directory whose options leave `interval` unset ticks every
    /// [`DEFAULT_INTERVAL`]; a directory without options gets no ticker.
    fn tickers(&mut self) -> Vec<(Duration, Box<MessageFactory<GcMessage>>)> {
        // Pairs a message factory with its interval, falling back to the default.
        fn ticker(
            interval: Option<Duration>,
            factory: Box<MessageFactory<GcMessage>>,
        ) -> (Duration, Box<MessageFactory<GcMessage>>) {
            (interval.unwrap_or(DEFAULT_INTERVAL), factory)
        }

        let configured = &self.options;
        [
            configured
                .manifest_options
                .map(|dir| ticker(dir.interval, Box::new(|| GcMessage::Manifest))),
            configured
                .wal_options
                .map(|dir| ticker(dir.interval, Box::new(|| GcMessage::Wal))),
            configured
                .compacted_options
                .map(|dir| ticker(dir.interval, Box::new(|| GcMessage::Compacted))),
            configured
                .compactions_options
                .map(|dir| ticker(dir.interval, Box::new(|| GcMessage::Compactions))),
        ]
        .into_iter()
        .flatten()
        .collect()
    }

    /// Runs the GC task that corresponds to `message` and always returns `Ok`;
    /// task failures are logged inside `run_gc_task`.
    ///
    /// # Panics
    ///
    /// Panics if the task for `message` was never configured. `tickers` only
    /// emits a message kind when its options are set, and `new` builds the
    /// task from those same options, so this indicates a broken invariant.
    async fn handle(&mut self, message: GcMessage) -> Result<(), SlateDBError> {
        let gc = &*self;
        match message {
            GcMessage::Manifest => {
                gc.run_gc_task(
                    gc.manifest_gc_task
                        .as_ref()
                        .expect("got manifest tick with unconfigured manifest task"),
                )
                .await
            }
            GcMessage::Wal => {
                gc.run_gc_task(
                    gc.wal_gc_task
                        .as_ref()
                        .expect("got wal tick with unconfigured wal task"),
                )
                .await
            }
            GcMessage::Compacted => {
                gc.run_gc_task(
                    gc.compacted_gc_task
                        .as_ref()
                        .expect("got compacted tick with unconfigured compacted task"),
                )
                .await
            }
            GcMessage::Compactions => {
                gc.run_gc_task(
                    gc.compactions_gc_task
                        .as_ref()
                        .expect("got compactions tick with unconfigured compactions task"),
                )
                .await
            }
        }
        Ok(())
    }

    /// Logs the shutdown. Any remaining messages and the incoming result are
    /// ignored, and `Ok` is returned unconditionally.
    async fn cleanup(
        &mut self,
        _messages: BoxStream<'async_trait, GcMessage>,
        _result: Result<(), SlateDBError>,
    ) -> Result<(), SlateDBError> {
        info!("garbage collector shutdown");
        Ok(())
    }
}
impl GarbageCollector {
pub(crate) fn new(
manifest_store: Arc<ManifestStore>,
compactions_store: Arc<CompactionsStore>,
table_store: Arc<TableStore>,
options: GarbageCollectorOptions,
recorder: &MetricsRecorderHelper,
system_clock: Arc<dyn SystemClock>,
) -> Self {
let stats = Arc::new(GcStats::new(recorder));
let wal_gc_task = options.wal_options.map(|wal_options| {
WalGcTask::new(
manifest_store.clone(),
table_store.clone(),
stats.clone(),
wal_options,
)
});
let compacted_gc_task = options.compacted_options.map(|compacted_options| {
CompactedGcTask::new(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
stats.clone(),
compacted_options,
)
});
let compactions_gc_task = options.compactions_options.map(|compactions_options| {
CompactionsGcTask::new(
compactions_store.clone(),
stats.clone(),
compactions_options,
)
});
let manifest_gc_task = options.manifest_options.map(|manifest_options| {
ManifestGcTask::new(manifest_store.clone(), stats.clone(), manifest_options)
});
Self {
manifest_store,
options,
stats,
system_clock,
manifest_gc_task,
wal_gc_task,
compacted_gc_task,
compactions_gc_task,
}
}
pub async fn run_gc_once(&self) {
if let Some(task) = &self.manifest_gc_task {
self.run_gc_task(task).await;
}
if let Some(task) = &self.wal_gc_task {
self.run_gc_task(task).await;
}
if let Some(task) = &self.compacted_gc_task {
self.run_gc_task(task).await;
}
if let Some(task) = &self.compactions_gc_task {
self.run_gc_task(task).await;
}
self.stats.gc_count.increment(1);
}
#[instrument(level = "debug", skip_all, fields(resource = task.resource()))]
async fn run_gc_task<T: GcTask + std::fmt::Debug>(&self, task: &T) {
if let Err(e) = self.remove_expired_checkpoints().await {
error!("error removing expired checkpoints [error={}]", e);
} else if let Err(e) = task.collect(self.system_clock.now()).await {
error!("error collecting compacted garbage [error={}]", e);
}
}
async fn remove_expired_checkpoints(&self) -> Result<(), SlateDBError> {
let mut stored_manifest =
StoredManifest::load(Arc::clone(&self.manifest_store), self.system_clock.clone())
.await?;
stored_manifest
.maybe_apply_update(|manifest| self.filter_expired_checkpoints(manifest))
.await
}
fn filter_expired_checkpoints(
&self,
manifest: &SimpleTransactionalObject<Manifest>,
) -> Result<Option<DirtyObject<Manifest>>, SlateDBError> {
let utc_now: DateTime<Utc> = self.system_clock.now();
let mut dirty = manifest.prepare_dirty()?;
let retained_checkpoints: Vec<Checkpoint> = dirty
.value
.core
.checkpoints
.iter()
.filter(|checkpoint| match checkpoint.expire_time {
Some(expire_time) => expire_time > utc_now,
None => true,
})
.cloned()
.collect();
let maybe_dirty = if dirty.value.core.checkpoints.len() != retained_checkpoints.len() {
dirty.value.core.checkpoints = retained_checkpoints;
Some(dirty)
} else {
None
};
Ok(maybe_dirty)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
use std::{fs::OpenOptions, sync::Arc};
use chrono::{DateTime, Days, TimeDelta, Utc};
use object_store::{local::LocalFileSystem, path::Path};
use tokio::runtime::Handle;
use uuid::Uuid;
use crate::checkpoint::Checkpoint;
use crate::compactions_store::StoredCompactions;
use crate::compactor_state::{Compaction, CompactionSpec, SourceId};
use crate::config::{GarbageCollectorDirectoryOptions, GarbageCollectorOptions};
use crate::dispatcher::MessageHandlerExecutor;
use crate::error::SlateDBError;
use crate::object_stores::ObjectStores;
use crate::paths::PathResolver;
use crate::types::RowEntry;
use slatedb_common::clock::DefaultSystemClock;
use slatedb_common::metrics::{
lookup_metric_with_labels, DefaultMetricsRecorder, MetricsRecorderHelper,
};
use crate::db_status::ClosedResultWriter;
use crate::format::sst::SsTableFormat;
use crate::utils::WatchableOnceCell;
use crate::{
db_state::{ManifestCore, SortedRun, SsTableHandle, SsTableId, SsTableView},
manifest::store::{ManifestStore, StoredManifest},
tablestore::TableStore,
};
#[tokio::test]
async fn test_collect_garbage_manifest() {
let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
let state = ManifestCore::new();
let mut stored_manifest = StoredManifest::create_new_db(
manifest_store.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
stored_manifest
.update(stored_manifest.prepare_dirty().unwrap())
.await
.unwrap();
let now_minus_24h = set_modified(
local_object_store.clone(),
&Path::from(format!("manifest/{:020}.{}", 1, "manifest")),
86400,
);
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 2);
assert_eq!(manifests[0].id, 1);
assert_eq!(manifests[1].id, 2);
assert_eq!(manifests[0].last_modified, now_minus_24h);
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
None,
)
.await;
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 1);
assert_eq!(manifests[0].id, 2);
}
#[tokio::test]
async fn test_collect_garbage_only_recent_manifests() {
let (manifest_store, compactions_store, table_store, _) = build_objects();
let mut stored_manifest = StoredManifest::create_new_db(
manifest_store.clone(),
ManifestCore::new(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
stored_manifest
.update(stored_manifest.prepare_dirty().unwrap())
.await
.unwrap();
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 2);
assert_eq!(manifests[0].id, 1);
assert_eq!(manifests[1].id, 2);
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
None,
)
.await;
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 2);
assert_eq!(manifests[0].id, 1);
assert_eq!(manifests[1].id, 2);
}
#[tokio::test]
async fn test_collect_garbage_compactions_keeps_latest() {
let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
StoredManifest::create_new_db(
manifest_store.clone(),
ManifestCore::new(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let mut stored_compactions = StoredCompactions::create(compactions_store.clone(), 0)
.await
.unwrap();
stored_compactions
.update(stored_compactions.prepare_dirty().unwrap())
.await
.unwrap();
stored_compactions
.update(stored_compactions.prepare_dirty().unwrap())
.await
.unwrap();
let compaction_count = compactions_store
.list_compactions(..)
.await
.expect("should list compactions")
.len();
assert_eq!(compaction_count, 3);
for id in 1..=compaction_count {
set_modified(
local_object_store.clone(),
&Path::from(format!("compactions/{:020}.{}", id, "compactions")),
86400,
);
}
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
None,
)
.await;
let compactions = compactions_store.list_compactions(..).await.unwrap();
assert_eq!(compactions.len(), 1);
assert_eq!(compactions[0].id, 3);
}
#[tokio::test]
async fn test_collect_garbage_compactions_respects_min_age() {
let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
StoredManifest::create_new_db(
manifest_store.clone(),
ManifestCore::new(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let mut stored_compactions = StoredCompactions::create(compactions_store.clone(), 0)
.await
.unwrap();
stored_compactions
.update(stored_compactions.prepare_dirty().unwrap())
.await
.unwrap();
let compaction_count = compactions_store
.list_compactions(..)
.await
.expect("should list compactions")
.len();
assert_eq!(compaction_count, 2);
for id in 1..=compaction_count {
set_modified(
local_object_store.clone(),
&Path::from(format!("compactions/{:020}.{}", id, "compactions")),
60,
);
}
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
None,
)
.await;
let compactions = compactions_store.list_compactions(..).await.unwrap();
assert_eq!(compactions.len(), 2);
assert_eq!(compactions[0].id, 1);
assert_eq!(compactions[1].id, 2);
}
/// Builds an unnamed checkpoint with a random id that references
/// `manifest_id`, is created "now", and expires at `expire_time` (if any).
fn new_checkpoint(manifest_id: u64, expire_time: Option<DateTime<Utc>>) -> Checkpoint {
    let id = uuid::Uuid::new_v4();
    let create_time = DefaultSystemClock::default().now();
    Checkpoint {
        id,
        manifest_id,
        expire_time,
        create_time,
        name: None,
    }
}
/// Adds a checkpoint that references the manifest id `stored_manifest` is
/// currently at, writes the updated manifest, and returns the new
/// checkpoint's id.
async fn checkpoint_current_manifest(
    stored_manifest: &mut StoredManifest,
    expire_time: Option<DateTime<Utc>>,
) -> Result<Uuid, SlateDBError> {
    let mut pending = stored_manifest.prepare_dirty()?;
    let created = new_checkpoint(stored_manifest.id(), expire_time);
    // Remember the id before the checkpoint is moved into the manifest.
    let created_id = created.id;
    pending.value.core.checkpoints.push(created);
    stored_manifest.update(pending).await?;
    Ok(created_id)
}
/// Writes a new manifest version in which the checkpoint with id
/// `checkpoint_id` is no longer listed; all other checkpoints keep their order.
async fn remove_checkpoint(
    checkpoint_id: Uuid,
    stored_manifest: &mut StoredManifest,
) -> Result<(), SlateDBError> {
    let mut pending = stored_manifest.prepare_dirty()?;
    pending
        .value
        .core
        .checkpoints
        .retain(|checkpoint| checkpoint.id != checkpoint_id);
    stored_manifest.update(pending).await?;
    Ok(())
}
#[tokio::test]
async fn test_remove_expired_checkpoints() {
let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
let state = ManifestCore::new();
let mut stored_manifest = StoredManifest::create_new_db(
manifest_store.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let one_day_ago = DefaultSystemClock::default()
.now()
.checked_sub_days(Days::new(1))
.unwrap();
let _expired_checkpoint_id =
checkpoint_current_manifest(&mut stored_manifest, Some(one_day_ago))
.await
.unwrap();
let one_day_ahead = DefaultSystemClock::default()
.now()
.checked_add_days(Days::new(1))
.unwrap();
let unexpired_checkpoint_id =
checkpoint_current_manifest(&mut stored_manifest, Some(one_day_ahead))
.await
.unwrap();
for i in 1..=3 {
set_modified(
local_object_store.clone(),
&Path::from(format!("manifest/{:020}.{}", i, "manifest")),
86400,
);
}
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
None,
)
.await;
let (latest_manifest_id, latest_manifest) =
manifest_store.read_latest_manifest().await.unwrap();
assert_eq!(4, latest_manifest_id);
assert_eq!(1, latest_manifest.core.checkpoints.len());
assert_eq!(
unexpired_checkpoint_id,
latest_manifest.core.checkpoints[0].id
);
assert_eq!(2, latest_manifest.core.checkpoints[0].manifest_id);
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 2);
assert_eq!(manifests[0].id, 2);
assert_eq!(manifests[1].id, 4);
}
#[tokio::test]
async fn test_collector_should_not_clean_manifests_referenced_by_checkpoints() {
let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
let state = ManifestCore::new();
let mut stored_manifest = StoredManifest::create_new_db(
manifest_store.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let active_checkpoint_id = checkpoint_current_manifest(&mut stored_manifest, None)
.await
.unwrap();
let inactive_checkpoint_id = checkpoint_current_manifest(&mut stored_manifest, None)
.await
.unwrap();
remove_checkpoint(inactive_checkpoint_id, &mut stored_manifest)
.await
.unwrap();
for i in 1..4 {
set_modified(
local_object_store.clone(),
&Path::from(format!("manifest/{:020}.{}", i, "manifest")),
86400,
);
}
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 4);
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
None,
)
.await;
let (latest_manifest_id, latest_manifest) =
manifest_store.read_latest_manifest().await.unwrap();
assert_eq!(4, latest_manifest_id);
assert_eq!(1, latest_manifest.core.checkpoints.len());
assert_eq!(active_checkpoint_id, latest_manifest.core.checkpoints[0].id);
assert_eq!(1, latest_manifest.core.checkpoints[0].manifest_id);
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 2);
assert_eq!(manifests[0].id, 1);
assert_eq!(manifests[1].id, 4);
}
#[tokio::test]
async fn test_collect_garbage_old_active_manifest() {
let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
let mut stored_manifest = StoredManifest::create_new_db(
manifest_store.clone(),
ManifestCore::new(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
stored_manifest
.update(stored_manifest.prepare_dirty().unwrap())
.await
.unwrap();
let now_minus_24h_1 = set_modified(
local_object_store.clone(),
&Path::from(format!("manifest/{:020}.{}", 1, "manifest")),
86400,
);
let now_minus_24h_2 = set_modified(
local_object_store.clone(),
&Path::from(format!("manifest/{:020}.{}", 2, "manifest")),
86400,
);
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 2);
assert_eq!(manifests[0].id, 1);
assert_eq!(manifests[1].id, 2);
assert_eq!(manifests[0].last_modified, now_minus_24h_1);
assert_eq!(manifests[1].last_modified, now_minus_24h_2);
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
None,
)
.await;
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 1);
assert_eq!(manifests[0].id, 2);
}
/// Builds an SST holding the single entry `key` -> `value` (sequence 0) and
/// writes it to `table_store` under `table_id`.
async fn write_sst(
    table_store: Arc<TableStore>,
    table_id: &SsTableId,
) -> Result<(), SlateDBError> {
    let mut builder = table_store.table_builder();
    builder
        .add(RowEntry::new_value(b"key", b"value", 0))
        .await?;
    let encoded = builder.build().await?;
    table_store.write_sst(table_id, encoded, false).await?;
    Ok(())
}
#[tokio::test]
async fn test_collect_garbage_wal_ssts() {
let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
let path_resolver = PathResolver::new("/");
let id1 = SsTableId::Wal(1);
write_sst(table_store.clone(), &id1).await.unwrap();
let id2 = SsTableId::Wal(2);
write_sst(table_store.clone(), &id2).await.unwrap();
let now_minus_24h = set_modified(
local_object_store.clone(),
&path_resolver.table_path(&SsTableId::Wal(1)),
86400,
);
let mut state = ManifestCore::new();
state.replay_after_wal_id = id2.unwrap_wal_id();
StoredManifest::create_new_db(
manifest_store.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let wal_ssts = table_store.list_wal_ssts(..).await.unwrap();
assert_eq!(wal_ssts.len(), 2);
assert_eq!(wal_ssts[0].id, id1);
assert_eq!(wal_ssts[1].id, id2);
assert_eq!(wal_ssts[0].last_modified, now_minus_24h);
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 1);
let current_manifest = manifest_store.read_latest_manifest().await.unwrap().1;
assert_eq!(
current_manifest.core.replay_after_wal_id,
id2.unwrap_wal_id()
);
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
None,
)
.await;
let wal_ssts = table_store.list_wal_ssts(..).await.unwrap();
assert_eq!(wal_ssts.len(), 1);
assert_eq!(wal_ssts[0].id, id2);
}
#[tokio::test]
async fn test_do_not_remove_wals_referenced_by_active_checkpoints() {
let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
let path_resolver = PathResolver::new("/");
let id1 = SsTableId::Wal(1);
write_sst(table_store.clone(), &id1).await.unwrap();
let id2 = SsTableId::Wal(2);
write_sst(table_store.clone(), &id2).await.unwrap();
let id3 = SsTableId::Wal(3);
write_sst(table_store.clone(), &id3).await.unwrap();
let mut state = ManifestCore::new();
state.replay_after_wal_id = 1;
state.next_wal_sst_id = 4;
let mut stored_manifest = StoredManifest::create_new_db(
manifest_store.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
assert_eq!(1, stored_manifest.id());
let mut dirty = stored_manifest.prepare_dirty().unwrap();
dirty.value.core.replay_after_wal_id = 3;
dirty.value.core.next_wal_sst_id = 4;
dirty.value.core.checkpoints.push(new_checkpoint(1, None));
stored_manifest.update(dirty).await.unwrap();
assert_eq!(2, stored_manifest.id());
for i in 1..=3 {
set_modified(
local_object_store.clone(),
&path_resolver.table_path(&SsTableId::Wal(i)),
86400,
);
}
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
None,
)
.await;
let wal_ssts = table_store.list_wal_ssts(..).await.unwrap();
assert_eq!(wal_ssts.len(), 2);
assert_eq!(wal_ssts[0].id, id2);
assert_eq!(wal_ssts[1].id, id3);
}
#[tokio::test]
async fn test_collect_garbage_wal_ssts_and_keep_expired_last_compacted() {
let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
let path_resolver = PathResolver::new("/");
let id1 = SsTableId::Wal(1);
let mut sst1 = table_store.table_builder();
sst1.add(RowEntry::new_value(b"key", b"value", 0))
.await
.unwrap();
let table1 = sst1.build().await.unwrap();
table_store.write_sst(&id1, table1, false).await.unwrap();
let id2 = SsTableId::Wal(2);
let mut sst2 = table_store.table_builder();
sst2.add(RowEntry::new_value(b"key", b"value", 0))
.await
.unwrap();
let table2 = sst2.build().await.unwrap();
table_store.write_sst(&id2, table2, false).await.unwrap();
let now_minus_24h_1 = set_modified(
local_object_store.clone(),
&path_resolver.table_path(&SsTableId::Wal(1)),
86400,
);
let now_minus_24h_2 = set_modified(
local_object_store.clone(),
&path_resolver.table_path(&SsTableId::Wal(2)),
86400,
);
let mut state = ManifestCore::new();
state.replay_after_wal_id = id2.unwrap_wal_id();
StoredManifest::create_new_db(
manifest_store.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let wal_ssts = table_store.list_wal_ssts(..).await.unwrap();
assert_eq!(wal_ssts.len(), 2);
assert_eq!(wal_ssts[0].id, id1);
assert_eq!(wal_ssts[1].id, id2);
assert_eq!(wal_ssts[0].last_modified, now_minus_24h_1);
assert_eq!(wal_ssts[1].last_modified, now_minus_24h_2);
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 1);
let current_manifest = manifest_store.read_latest_manifest().await.unwrap().1;
assert_eq!(
current_manifest.core.replay_after_wal_id,
id2.unwrap_wal_id()
);
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
None,
)
.await;
let wal_ssts = table_store.list_wal_ssts(..).await.unwrap();
assert_eq!(wal_ssts.len(), 1);
assert_eq!(wal_ssts[0].id, id2);
}
#[tokio::test]
async fn test_collect_garbage_compacted_ssts() {
let (manifest_store, compactions_store, table_store, _local_object_store) = build_objects();
let now = DefaultSystemClock::default().now();
let expired_base_ms = (now - TimeDelta::seconds(7200)).timestamp_millis() as u64; let unexpired_base_ms = (now - TimeDelta::seconds(1800)).timestamp_millis() as u64;
let l0_sst_handle = create_sst(table_store.clone(), unexpired_base_ms).await;
let active_expired_l0_sst_handle = create_sst(table_store.clone(), expired_base_ms).await;
let inactive_expired_l0_sst_handle =
create_sst(table_store.clone(), expired_base_ms + 1).await;
let inactive_unexpired_l0_sst_handle =
create_sst(table_store.clone(), unexpired_base_ms + 1).await;
let active_sst_handle = create_sst(table_store.clone(), unexpired_base_ms + 2).await;
let active_expired_sst_handle = create_sst(table_store.clone(), expired_base_ms + 2).await;
let inactive_expired_sst_handle =
create_sst(table_store.clone(), expired_base_ms + 3).await;
let inactive_unexpired_sst_handle =
create_sst(table_store.clone(), unexpired_base_ms + 3).await;
let mut state = ManifestCore::new();
state
.l0
.push_back(SsTableView::identity(l0_sst_handle.clone()));
state
.l0
.push_back(SsTableView::identity(active_expired_l0_sst_handle.clone()));
state.compacted.push(SortedRun {
id: 1,
sst_views: vec![
SsTableView::identity(active_sst_handle.clone()),
SsTableView::identity(active_expired_sst_handle.clone()),
],
});
StoredManifest::create_new_db(
manifest_store.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let compacted_ssts = table_store.list_compacted_ssts(..).await.unwrap();
assert_eq!(compacted_ssts.len(), 8);
let ids: HashSet<_> = compacted_ssts.iter().map(|m| m.id).collect();
for expected in [
l0_sst_handle.id,
active_expired_l0_sst_handle.id,
inactive_expired_l0_sst_handle.id,
inactive_unexpired_l0_sst_handle.id,
active_sst_handle.id,
active_expired_sst_handle.id,
inactive_expired_sst_handle.id,
inactive_unexpired_sst_handle.id,
] {
assert!(ids.contains(&expected));
}
let manifests = manifest_store.list_manifests(..).await.unwrap();
assert_eq!(manifests.len(), 1);
let current_manifest = manifest_store.read_latest_manifest().await.unwrap().1;
assert_eq!(current_manifest.core.l0.len(), 2);
assert_eq!(current_manifest.core.compacted.len(), 1);
assert_eq!(current_manifest.core.compacted[0].sst_views.len(), 2);
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
Some(now),
)
.await;
let compacted_ssts = table_store.list_compacted_ssts(..).await.unwrap();
assert_eq!(compacted_ssts.len(), 6);
let remaining_ids: HashSet<_> = compacted_ssts.iter().map(|m| m.id).collect();
for expected in [
l0_sst_handle.id,
active_expired_l0_sst_handle.id,
inactive_unexpired_l0_sst_handle.id,
active_sst_handle.id,
active_expired_sst_handle.id,
inactive_unexpired_sst_handle.id,
] {
assert!(remaining_ids.contains(&expected));
}
assert!(!remaining_ids.contains(&inactive_expired_l0_sst_handle.id));
assert!(!remaining_ids.contains(&inactive_expired_sst_handle.id));
let current_manifest = manifest_store.read_latest_manifest().await.unwrap().1;
assert_eq!(current_manifest.core.l0.len(), 2);
assert_eq!(current_manifest.core.compacted.len(), 1);
assert_eq!(current_manifest.core.compacted[0].sst_views.len(), 2);
}
#[tokio::test]
async fn test_collect_garbage_compacted_ssts_respects_checkpoint_references() {
let (manifest_store, compactions_store, table_store, _local_object_store) = build_objects();
let now = DefaultSystemClock::default().now();
let expired_base_ms = (now - TimeDelta::seconds(7200)).timestamp_millis() as u64;
let active_l0_sst_handle = create_sst(table_store.clone(), expired_base_ms + 5).await;
let active_checkpoint_l0_sst_handle =
create_sst(table_store.clone(), expired_base_ms + 4).await;
let active_sst_handle = create_sst(table_store.clone(), expired_base_ms + 3).await;
let active_checkpoint_sst_handle =
create_sst(table_store.clone(), expired_base_ms + 2).await;
let inactive_l0_sst_handle = create_sst(table_store.clone(), expired_base_ms + 1).await;
let inactive_sst_handle = create_sst(table_store.clone(), expired_base_ms).await;
let mut state = ManifestCore::new();
state
.l0
.push_back(SsTableView::identity(active_l0_sst_handle.clone()));
state.l0.push_back(SsTableView::identity(
active_checkpoint_l0_sst_handle.clone(),
));
state.compacted.push(SortedRun {
id: 1,
sst_views: vec![SsTableView::identity(active_sst_handle.clone())],
});
state.compacted.push(SortedRun {
id: 2,
sst_views: vec![SsTableView::identity(active_checkpoint_sst_handle.clone())],
});
let mut stored_manifest = StoredManifest::create_new_db(
manifest_store.clone(),
state.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let checkpoint_id = checkpoint_current_manifest(&mut stored_manifest, None)
.await
.unwrap();
let mut dirty = stored_manifest.prepare_dirty().unwrap();
dirty.value.core.l0.truncate(1);
dirty.value.core.compacted.truncate(1);
stored_manifest.update(dirty).await.unwrap();
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
Some(now),
)
.await;
let compacted_ssts = table_store.list_compacted_ssts(..).await.unwrap();
assert_eq!(compacted_ssts.len(), 4);
let remaining_ids: HashSet<_> = compacted_ssts.iter().map(|m| m.id).collect();
assert!(remaining_ids.contains(&active_l0_sst_handle.id));
assert!(remaining_ids.contains(&active_checkpoint_l0_sst_handle.id));
assert!(remaining_ids.contains(&active_sst_handle.id));
assert!(remaining_ids.contains(&active_checkpoint_sst_handle.id));
assert!(!remaining_ids.contains(&inactive_l0_sst_handle.id));
assert!(!remaining_ids.contains(&inactive_sst_handle.id));
remove_checkpoint(checkpoint_id, &mut stored_manifest)
.await
.unwrap();
run_gc_once(
manifest_store.clone(),
compactions_store.clone(),
table_store.clone(),
Some(now),
)
.await;
let compacted_ssts = table_store.list_compacted_ssts(..).await.unwrap();
let remaining_ids: HashSet<_> = compacted_ssts.iter().map(|m| m.id).collect();
eprintln!("remaining_ids: {:#?}", remaining_ids);
assert_eq!(remaining_ids.len(), 2);
assert!(remaining_ids.contains(&active_l0_sst_handle.id));
assert!(remaining_ids.contains(&active_sst_handle.id));
assert!(!remaining_ids.contains(&active_checkpoint_sst_handle.id));
assert!(!remaining_ids.contains(&active_checkpoint_l0_sst_handle.id));
assert!(!remaining_ids.contains(&inactive_l0_sst_handle.id));
assert!(!remaining_ids.contains(&inactive_sst_handle.id));
}
/// Creates a manifest store, compactions store and table store that all share
/// one local-filesystem object store rooted in a fresh temporary directory,
/// and returns them together with that object store.
fn build_objects() -> (
    Arc<ManifestStore>,
    Arc<CompactionsStore>,
    Arc<TableStore>,
    Arc<LocalFileSystem>,
) {
    // `keep()` stops the temp-dir guard from deleting the directory on drop.
    let root_dir = tempfile::tempdir().unwrap().keep();
    let object_store = LocalFileSystem::new_with_prefix(root_dir)
        .unwrap()
        .with_automatic_cleanup(true);
    let local_object_store = Arc::new(object_store);

    let root = Path::from("/");
    let manifest_store = ManifestStore::new(&root, local_object_store.clone());
    let compactions_store = CompactionsStore::new(&root, local_object_store.clone());
    let table_store = TableStore::new(
        ObjectStores::new(local_object_store.clone(), None),
        SsTableFormat::default(),
        root,
        None,
    );

    (
        Arc::new(manifest_store),
        Arc::new(compactions_store),
        Arc::new(table_store),
        local_object_store,
    )
}
/// Writes a compacted SST whose ULID id carries the timestamp `ts_ms`
/// (milliseconds) and zero random bits, containing the single entry
/// `key` -> `value`, and returns the handle produced by the table store.
async fn create_sst(table_store: Arc<TableStore>, ts_ms: u64) -> SsTableHandle {
    let ulid = ulid::Ulid::from_parts(ts_ms, 0);
    let sst_id = SsTableId::Compacted(ulid);
    let mut builder = table_store.table_builder();
    builder
        .add(RowEntry::new_value(b"key", b"value", 0))
        .await
        .unwrap();
    let encoded = builder.build().await.unwrap();
    table_store
        .write_sst(&sst_id, encoded, false)
        .await
        .unwrap()
}
/// Sets the modification time of the file behind `path` to `seconds_ago`
/// seconds before the current time and returns the timestamp that was applied.
fn set_modified(
    local_object_store: Arc<LocalFileSystem>,
    path: &Path,
    seconds_ago: u64,
) -> DateTime<Utc> {
    let fs_path = local_object_store.path_to_filesystem(path).unwrap();
    let handle = OpenOptions::new().write(true).open(fs_path).unwrap();
    let age = TimeDelta::seconds(seconds_ago.try_into().unwrap());
    let modified_at = DefaultSystemClock::default().now() - age;
    handle.set_modified(modified_at.into()).unwrap();
    modified_at
}
/// Asserts that every SST referenced by the latest manifest, or by any
/// manifest returned from `read_referenced_manifests` for it, still exists in
/// the table store.
///
/// For each such manifest this checks the WAL ids strictly after
/// `replay_after_wal_id` and before `next_wal_sst_id`, every L0 SST, and
/// every SST of every sorted run.
async fn assert_no_dangling_references(
    manifest_store: Arc<ManifestStore>,
    table_store: Arc<TableStore>,
) {
    let (latest_id, latest) = manifest_store.read_latest_manifest().await.unwrap();
    let referenced = manifest_store
        .read_referenced_manifests(latest_id, &latest)
        .await
        .unwrap();

    let wal_listing = table_store.list_wal_ssts(..).await.unwrap();
    let wal_ssts: HashSet<SsTableId> = wal_listing.iter().map(|sst| sst.id).collect();
    let compacted_listing = table_store.list_compacted_ssts(..).await.unwrap();
    let compacted_ssts: HashSet<SsTableId> = compacted_listing.iter().map(|sst| sst.id).collect();

    for manifest in referenced.values() {
        let core = &manifest.core;

        // WAL SSTs this manifest would still replay.
        for wal_sst_id in (core.replay_after_wal_id + 1)..core.next_wal_sst_id {
            assert!(wal_ssts.contains(&SsTableId::Wal(wal_sst_id)));
        }

        // L0 SSTs first, then the SSTs of each sorted run.
        let run_views = core.compacted.iter().flat_map(|sr| sr.sst_views.iter());
        for view in core.l0.iter().chain(run_views) {
            assert!(compacted_ssts.contains(&view.sst.id));
        }
    }
}
/// Runs one full GC pass through `run_gc_once_with_recorder` using a no-op
/// metrics recorder.
async fn run_gc_once(
    manifest_store: Arc<ManifestStore>,
    compactions_store: Arc<CompactionsStore>,
    table_store: Arc<TableStore>,
    compaction_low_watermark_dt: Option<DateTime<Utc>>,
) {
    let noop_recorder = MetricsRecorderHelper::noop();
    run_gc_once_with_recorder(
        manifest_store,
        compactions_store,
        table_store,
        compaction_low_watermark_dt,
        &noop_recorder,
    )
    .await;
}
/// Runs one full GC pass with a 1 hour min age for every directory, then
/// asserts that no manifest references a deleted SST.
///
/// When `compaction_low_watermark_dt` is `Some`, a compaction whose ULID
/// carries that timestamp is first inserted into the stored compactions
/// (creating the compactions file if none exists yet). Presumably the
/// compacted-SST GC uses it as a low watermark; verify against
/// `CompactedGcTask`.
async fn run_gc_once_with_recorder(
    manifest_store: Arc<ManifestStore>,
    compactions_store: Arc<CompactionsStore>,
    table_store: Arc<TableStore>,
    compaction_low_watermark_dt: Option<DateTime<Utc>>,
    recorder: &MetricsRecorderHelper,
) {
    if let Some(watermark) = compaction_low_watermark_dt {
        let loaded = StoredCompactions::try_load(compactions_store.clone())
            .await
            .expect("load failed");
        let mut stored_compactions = match loaded {
            Some(existing) => existing,
            None => {
                // No compactions file yet: create one at the manifest's compactor epoch.
                let manifest = StoredManifest::load(
                    manifest_store.clone(),
                    Arc::new(DefaultSystemClock::default()),
                )
                .await
                .expect("manifest load failed");
                StoredCompactions::create(
                    compactions_store.clone(),
                    manifest.manifest().compactor_epoch,
                )
                .await
                .expect("compactions creation failed")
            }
        };
        let mut pending = stored_compactions.prepare_dirty().unwrap();
        let watermark_id = ulid::Ulid::from_parts(watermark.timestamp_millis() as u64, 0);
        pending.value.insert(Compaction::new(
            watermark_id,
            CompactionSpec::new(vec![SourceId::SortedRun(0)], 0),
        ));
        stored_compactions.update(pending).await.unwrap();
    }

    // Every directory uses the same settings: 1 hour min age, default interval.
    let one_hour_min_age = || {
        Some(GarbageCollectorDirectoryOptions {
            min_age: std::time::Duration::from_secs(3600),
            interval: None,
        })
    };
    let gc_opts = GarbageCollectorOptions {
        manifest_options: one_hour_min_age(),
        wal_options: one_hour_min_age(),
        compacted_options: one_hour_min_age(),
        compactions_options: one_hour_min_age(),
    };

    let collector = GarbageCollector::new(
        manifest_store.clone(),
        compactions_store.clone(),
        table_store.clone(),
        gc_opts,
        recorder,
        Arc::new(DefaultSystemClock::default()),
    );
    collector.run_gc_once().await;
    assert_no_dangling_references(manifest_store, table_store).await;
}
/// Handling a single `GcMessage::Wal` must leave the manifest listing unchanged.
///
/// Two manifests are written and the first one's modified time is adjusted via
/// `set_modified` (presumably backdated by 86400 seconds, past the 3600 second
/// `min_age` — confirm against `set_modified`). After the WAL message is
/// handled, both manifests must still be listed.
#[tokio::test]
async fn test_handle_should_only_run_one_task_per_message() {
    use crate::dispatcher::MessageHandler;

    let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();

    // Create the database, then write one more manifest version.
    let mut stored_manifest = StoredManifest::create_new_db(
        manifest_store.clone(),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let dirty = stored_manifest.prepare_dirty().unwrap();
    stored_manifest.update(dirty).await.unwrap();

    let first_manifest_path = Path::from(format!("manifest/{:020}.{}", 1, "manifest"));
    set_modified(local_object_store.clone(), &first_manifest_path, 86400);

    assert_eq!(manifest_store.list_manifests(..).await.unwrap().len(), 2);

    let recorder = MetricsRecorderHelper::noop();
    let hour_old_no_interval = || {
        Some(GarbageCollectorDirectoryOptions {
            min_age: std::time::Duration::from_secs(3600),
            interval: None,
        })
    };
    let gc_opts = GarbageCollectorOptions {
        manifest_options: hour_old_no_interval(),
        wal_options: hour_old_no_interval(),
        compacted_options: hour_old_no_interval(),
        compactions_options: hour_old_no_interval(),
    };
    let mut gc = GarbageCollector::new(
        manifest_store.clone(),
        compactions_store.clone(),
        table_store.clone(),
        gc_opts,
        &recorder,
        Arc::new(DefaultSystemClock::default()),
    );

    gc.handle(GcMessage::Wal).await.unwrap();

    let manifests_after = manifest_store.list_manifests(..).await.unwrap();
    assert_eq!(
        manifests_after.len(),
        2,
        "manifest GC should not run on WAL message"
    );
}
/// With `manifest_options` set to `None`, a full `run_gc_once` pass must not
/// remove any manifest.
///
/// Two manifests are written and the first has its modified time adjusted via
/// `set_modified`; both must still be listed after the pass.
#[tokio::test]
async fn test_run_gc_once_skips_disabled_manifest_gc() {
    let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();

    // Create the database, then write one more manifest version.
    let mut stored_manifest = StoredManifest::create_new_db(
        manifest_store.clone(),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let dirty = stored_manifest.prepare_dirty().unwrap();
    stored_manifest.update(dirty).await.unwrap();

    let first_manifest_path = Path::from(format!("manifest/{:020}.{}", 1, "manifest"));
    set_modified(local_object_store.clone(), &first_manifest_path, 86400);

    let recorder = MetricsRecorderHelper::noop();
    let hour_old_no_interval = || {
        Some(GarbageCollectorDirectoryOptions {
            min_age: std::time::Duration::from_secs(3600),
            interval: None,
        })
    };
    // Manifest GC is the only task left unconfigured.
    let gc_opts = GarbageCollectorOptions {
        manifest_options: None,
        wal_options: hour_old_no_interval(),
        compacted_options: hour_old_no_interval(),
        compactions_options: hour_old_no_interval(),
    };
    let gc = GarbageCollector::new(
        manifest_store.clone(),
        compactions_store.clone(),
        table_store.clone(),
        gc_opts,
        &recorder,
        Arc::new(DefaultSystemClock::default()),
    );

    gc.run_gc_once().await;

    let manifests_after = manifest_store.list_manifests(..).await.unwrap();
    assert_eq!(
        manifests_after.len(),
        2,
        "manifest GC should not run when manifest options are disabled"
    );
}
/// `tickers()` must return entries only for the GC tasks that are configured.
///
/// Only WAL (11 s interval) and compactions (17 s interval) are enabled, so
/// the returned intervals must be exactly `[11s, 17s]`, in that order.
#[tokio::test]
async fn test_tickers_should_skip_disabled_gc_tasks() {
    use crate::dispatcher::MessageHandler;

    let (manifest_store, compactions_store, table_store, _) = build_objects();
    let recorder = MetricsRecorderHelper::noop();

    let every = |secs: u64| {
        Some(GarbageCollectorDirectoryOptions {
            min_age: Duration::from_secs(3600),
            interval: Some(Duration::from_secs(secs)),
        })
    };
    let gc_opts = GarbageCollectorOptions {
        manifest_options: None,
        wal_options: every(11),
        compacted_options: None,
        compactions_options: every(17),
    };
    let mut gc = GarbageCollector::new(
        manifest_store,
        compactions_store,
        table_store,
        gc_opts,
        &recorder,
        Arc::new(DefaultSystemClock::default()),
    );

    // Keep only the interval of each ticker; the message factory is irrelevant here.
    let mut intervals = Vec::new();
    for (interval, _factory) in gc.tickers() {
        intervals.push(interval);
    }

    assert_eq!(
        intervals,
        vec![Duration::from_secs(11), Duration::from_secs(17)]
    );
}
/// The garbage collector task must shut down cleanly when cancelled.
///
/// A collector with all four tasks ticking every second is registered on a
/// `MessageHandlerExecutor`, the executor's monitor is started, and the task
/// is then cancelled. Joining the task must yield `Ok(())` and the monitor's
/// join handle must complete without error.
///
/// The handler is registered under `GC_TASK_NAME`, the same constant used to
/// cancel and join it, so the three call sites cannot drift apart.
#[tokio::test(start_paused = true)]
async fn test_gc_shutdown() {
    let (manifest_store, compactions_store, table_store, _) = build_objects();
    let recorder = MetricsRecorderHelper::noop();

    let every_second = || {
        Some(GarbageCollectorDirectoryOptions {
            min_age: Duration::from_secs(3600),
            interval: Some(Duration::from_secs(1)),
        })
    };
    let gc_opts = GarbageCollectorOptions {
        manifest_options: every_second(),
        wal_options: every_second(),
        compacted_options: every_second(),
        compactions_options: every_second(),
    };
    let gc = GarbageCollector::new(
        manifest_store.clone(),
        compactions_store.clone(),
        table_store.clone(),
        gc_opts,
        &recorder,
        Arc::new(DefaultSystemClock::default()),
    );

    // The sender half is bound to `_` and therefore dropped immediately; the
    // test never sends a message on this channel.
    let (_, rx) = async_channel::unbounded();
    let clock = Arc::new(DefaultSystemClock::default());
    let closed_result: Arc<dyn ClosedResultWriter> = Arc::new(WatchableOnceCell::new());
    let executor = MessageHandlerExecutor::new(closed_result, clock);
    executor
        .add_handler(
            GC_TASK_NAME.to_string(),
            Box::new(gc),
            rx,
            &Handle::current(),
        )
        .expect("failed to spawn task");
    let jh = executor
        .monitor_on(&Handle::current())
        .expect("failed to start monitor task");

    executor.cancel_task(GC_TASK_NAME);
    let result = executor.join_task(GC_TASK_NAME).await;
    assert!(matches!(result, Ok(())));
    jh.await.expect("failed to join task");
}
/// A GC pass that removes one manifest must record a `DELETED_COUNT` of 1
/// under the `resource = "manifest"` label.
///
/// Two manifests are written and the first has its modified time adjusted via
/// `set_modified` before the pass runs with a real metrics recorder.
#[tokio::test]
async fn test_should_record_gc_manifest_deleted_count() {
    let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();

    // Create the database, then write one more manifest version.
    let mut stored_manifest = StoredManifest::create_new_db(
        manifest_store.clone(),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();
    let dirty = stored_manifest.prepare_dirty().unwrap();
    stored_manifest.update(dirty).await.unwrap();

    let first_manifest_path = Path::from(format!("manifest/{:020}.manifest", 1));
    set_modified(local_object_store, &first_manifest_path, 86400);

    let recorder = Arc::new(DefaultMetricsRecorder::new());
    let helper = MetricsRecorderHelper::new(recorder.clone(), Default::default());
    run_gc_once_with_recorder(
        manifest_store,
        compactions_store,
        table_store,
        None,
        &helper,
    )
    .await;

    let deleted_manifests = lookup_metric_with_labels(
        &recorder,
        crate::garbage_collector::stats::DELETED_COUNT,
        &[("resource", "manifest")],
    );
    assert_eq!(deleted_manifests, Some(1));
}
/// A GC pass that removes one WAL SST must record a `DELETED_COUNT` of 1
/// under the `resource = "wal"` label.
///
/// WAL SSTs 1 and 2 are written, only SST 1 has its modified time adjusted
/// via `set_modified`, and the manifest's `replay_after_wal_id` is set to 2.
#[tokio::test]
async fn test_should_record_gc_wal_deleted_count() {
    let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();
    let path_resolver = PathResolver::new("/");

    let old_wal = SsTableId::Wal(1);
    write_sst(table_store.clone(), &old_wal).await.unwrap();
    let recent_wal = SsTableId::Wal(2);
    write_sst(table_store.clone(), &recent_wal).await.unwrap();

    // Only the first WAL SST gets its modified time changed.
    let old_wal_path = path_resolver.table_path(&old_wal);
    set_modified(local_object_store, &old_wal_path, 86400);

    let mut core = ManifestCore::new();
    core.replay_after_wal_id = recent_wal.unwrap_wal_id();
    StoredManifest::create_new_db(
        manifest_store.clone(),
        core,
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();

    let recorder = Arc::new(DefaultMetricsRecorder::new());
    let helper = MetricsRecorderHelper::new(recorder.clone(), Default::default());
    run_gc_once_with_recorder(
        manifest_store,
        compactions_store,
        table_store,
        None,
        &helper,
    )
    .await;

    let deleted_wal_ssts = lookup_metric_with_labels(
        &recorder,
        crate::garbage_collector::stats::DELETED_COUNT,
        &[("resource", "wal")],
    );
    assert_eq!(deleted_wal_ssts, Some(1));
}
/// A GC pass that removes one compacted SST must record a `DELETED_COUNT` of
/// 1 under the `resource = "compacted"` label.
///
/// Three SSTs are created: two with timestamps 1800 s in the past that the
/// manifest references (one in L0, one in sorted run 1), and one with a
/// timestamp 7200 s in the past that nothing references. The pass runs with
/// `Some(now)` as the compaction low watermark; afterwards the unreferenced
/// SST must be gone from the compacted listing.
#[tokio::test]
async fn test_should_record_gc_compacted_deleted_count() {
    let (manifest_store, compactions_store, table_store, _) = build_objects();

    let now = DefaultSystemClock::default().now();
    let expired_ms = (now - TimeDelta::seconds(7200)).timestamp_millis() as u64;
    let unexpired_ms = (now - TimeDelta::seconds(1800)).timestamp_millis() as u64;

    let active_l0_handle = create_sst(table_store.clone(), unexpired_ms).await;
    let active_handle = create_sst(table_store.clone(), unexpired_ms + 1).await;
    let inactive_expired_handle = create_sst(table_store.clone(), expired_ms).await;

    // Reference the two recent SSTs from the manifest; leave the old one orphaned.
    let mut core = ManifestCore::new();
    core.l0.push_back(SsTableView::identity(active_l0_handle));
    let sorted_run = SortedRun {
        id: 1,
        sst_views: vec![SsTableView::identity(active_handle)],
    };
    core.compacted.push(sorted_run);
    StoredManifest::create_new_db(
        manifest_store.clone(),
        core,
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();

    let recorder = Arc::new(DefaultMetricsRecorder::new());
    let helper = MetricsRecorderHelper::new(recorder.clone(), Default::default());
    run_gc_once_with_recorder(
        manifest_store,
        compactions_store,
        table_store.clone(),
        Some(now),
        &helper,
    )
    .await;

    let listed = table_store.list_compacted_ssts(..).await.unwrap();
    let remaining: HashSet<_> = listed.iter().map(|sst| sst.id).collect();
    assert!(!remaining.contains(&inactive_expired_handle.id));

    let deleted_compacted = lookup_metric_with_labels(
        &recorder,
        crate::garbage_collector::stats::DELETED_COUNT,
        &[("resource", "compacted")],
    );
    assert_eq!(deleted_compacted, Some(1));
}
/// A GC pass that removes two compactions files must record a
/// `DELETED_COUNT` of 2 under the `resource = "compactions"` label.
///
/// The compactions store is created and updated twice, giving three files.
/// All three have their modified time adjusted via `set_modified`. After the
/// pass exactly one compactions file must remain.
#[tokio::test]
async fn test_should_record_gc_compactions_deleted_count() {
    let (manifest_store, compactions_store, table_store, local_object_store) = build_objects();

    StoredManifest::create_new_db(
        manifest_store.clone(),
        ManifestCore::new(),
        Arc::new(DefaultSystemClock::new()),
    )
    .await
    .unwrap();

    // One file from `create`, plus one per update below: three in total.
    let mut stored_compactions = StoredCompactions::create(compactions_store.clone(), 0)
        .await
        .unwrap();
    for _ in 0..2 {
        let dirty = stored_compactions.prepare_dirty().unwrap();
        stored_compactions.update(dirty).await.unwrap();
    }

    for file_id in 1..=3 {
        let compactions_path = Path::from(format!("compactions/{:020}.compactions", file_id));
        set_modified(local_object_store.clone(), &compactions_path, 86400);
    }

    let files_before = compactions_store.list_compactions(..).await.unwrap();
    assert_eq!(files_before.len(), 3);

    let recorder = Arc::new(DefaultMetricsRecorder::new());
    let helper = MetricsRecorderHelper::new(recorder.clone(), Default::default());
    run_gc_once_with_recorder(
        manifest_store,
        compactions_store.clone(),
        table_store,
        None,
        &helper,
    )
    .await;

    let files_after = compactions_store.list_compactions(..).await.unwrap();
    assert_eq!(files_after.len(), 1);

    let deleted_compactions = lookup_metric_with_labels(
        &recorder,
        crate::garbage_collector::stats::DELETED_COUNT,
        &[("resource", "compactions")],
    );
    assert_eq!(deleted_compactions, Some(2));
}
}