use crate::{
compactions_store::CompactionsStore, compactor_state::Compactions,
compactor_state_protocols::CompactorStateReader, config::GarbageCollectorDirectoryOptions,
db_state::SsTableId, error::SlateDBError, manifest::store::ManifestStore, manifest::Manifest,
tablestore::TableStore,
};
use chrono::{DateTime, Utc};
use log::error;
use std::collections::HashSet;
use std::sync::Arc;
use super::{GcStats, GcTask};
/// Garbage-collection task that deletes compacted SSTs which are no longer referenced by
/// any active manifest and are older than a computed cutoff.
pub(crate) struct CompactedGcTask {
    /// Manifest source for the compactor state reader and for loading referenced manifests.
    manifest_store: Arc<ManifestStore>,
    /// Source of the compactor's recorded compactions.
    compactions_store: Arc<CompactionsStore>,
    /// Used to list and delete compacted SSTs.
    table_store: Arc<TableStore>,
    /// GC metrics; `gc_compacted_count` is incremented for every deleted SST.
    stats: Arc<GcStats>,
    /// Directory options; `min_age` contributes to the deletion cutoff.
    compacted_options: GarbageCollectorDirectoryOptions,
}

impl std::fmt::Debug for CompactedGcTask {
    /// Formats only the options. The store and stats handles are deliberately elided, and
    /// `finish_non_exhaustive` renders that elision as `..` instead of hiding it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CompactedGcTask")
            .field("compacted_options", &self.compacted_options)
            .finish_non_exhaustive()
    }
}
impl CompactedGcTask {
pub(super) fn new(
manifest_store: Arc<ManifestStore>,
compactions_store: Arc<CompactionsStore>,
table_store: Arc<TableStore>,
stats: Arc<GcStats>,
compacted_options: GarbageCollectorDirectoryOptions,
) -> Self {
CompactedGcTask {
manifest_store,
compactions_store,
table_store,
stats,
compacted_options,
}
}
fn compacted_sst_min_age(&self) -> chrono::Duration {
chrono::Duration::from_std(self.compacted_options.min_age).expect("invalid duration")
}
async fn list_active_l0_and_compacted_ssts(
&self,
manifest_id: u64,
manifest: &Manifest,
) -> Result<HashSet<SsTableId>, SlateDBError> {
let active_manifests = self
.manifest_store
.read_referenced_manifests(manifest_id, manifest)
.await?;
let mut active_ssts = HashSet::new();
for manifest in active_manifests.values() {
for sr in manifest.core.compacted.iter() {
for view in sr.sst_views.iter() {
active_ssts.insert(view.sst.id);
}
}
for view in manifest.core.l0.iter() {
active_ssts.insert(view.sst.id);
}
}
Ok(active_ssts)
}
async fn newest_l0_dt(&self, manifest: &Manifest) -> Result<DateTime<Utc>, SlateDBError> {
let l0_timestamps = if !manifest.core.l0.is_empty() {
manifest
.core
.l0
.iter()
.map(|view| DateTime::<Utc>::from(view.sst.id.unwrap_compacted_id().datetime()))
.collect::<Vec<_>>()
} else if let Some(l0_last_compacted) = manifest.core.last_compacted_l0_sst_view_id {
vec![DateTime::<Utc>::from(l0_last_compacted.datetime())]
} else {
vec![DateTime::<Utc>::UNIX_EPOCH]
};
let max_l0_ts = l0_timestamps
.into_iter()
.max()
.expect("expected at least unix epoch");
Ok(max_l0_ts)
}
fn compaction_low_watermark_dt(compactions: &Option<(u64, Compactions)>) -> DateTime<Utc> {
match compactions {
Some((_, compactions)) => compactions
.iter()
.map(|c| DateTime::<Utc>::from(c.id().datetime()))
.min()
.unwrap_or(DateTime::<Utc>::UNIX_EPOCH),
None => DateTime::<Utc>::UNIX_EPOCH,
}
}
}
impl GcTask for CompactedGcTask {
async fn collect(&self, utc_now: DateTime<Utc>) -> Result<(), SlateDBError> {
let state_reader = CompactorStateReader::new(&self.manifest_store, &self.compactions_store);
let view = state_reader.read_view().await?;
let compactions = view.compactions;
let (manifest_id, manifest) = view.manifest;
let compaction_low_watermark_dt = Self::compaction_low_watermark_dt(&compactions);
let active_ssts = self
.list_active_l0_and_compacted_ssts(manifest_id, &manifest)
.await?;
let configured_min_age_dt = utc_now - self.compacted_sst_min_age();
let newest_l0_dt = self.newest_l0_dt(&manifest).await?;
let cutoff_dt = configured_min_age_dt
.min(compaction_low_watermark_dt)
.min(newest_l0_dt);
log::debug!(
"calculated compacted SST GC cutoff [cutoff_dt={:?}, configured_min_age_dt={:?}, compaction_low_watermark_dt={:?}, most_recent_sst_dt={:?}]",
cutoff_dt,
configured_min_age_dt,
compaction_low_watermark_dt,
newest_l0_dt,
);
let sst_ids_to_delete = self
.table_store
.list_compacted_ssts(..)
.await?
.into_iter()
.map(|sst| sst.id)
.filter(|id| DateTime::<Utc>::from(id.unwrap_compacted_id().datetime()) < cutoff_dt)
.filter(|id| !active_ssts.contains(id))
.collect::<Vec<_>>();
for id in sst_ids_to_delete {
log::info!("deleting SST [id={:?}]", id);
if let Err(e) = self.table_store.delete_sst(&id).await {
error!("error deleting SST [id={:?}, error={}]", id, e);
} else {
self.stats.gc_compacted_count.increment(1);
}
}
Ok(())
}
fn resource(&self) -> &str {
"Compacted SSTs"
}
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::compactions_store::{CompactionsStore, StoredCompactions};
    use crate::compactor_state::{Compaction, CompactionSpec, SourceId};
    use crate::db_state::{ManifestCore, SsTableId, SsTableView};
    use crate::format::sst::SsTableFormat;
    use crate::manifest::store::StoredManifest;
    use crate::object_stores::ObjectStores;
    use crate::test_utils::build_test_sst;
    use object_store::{memory::InMemory, path::Path};
    use slatedb_common::clock::DefaultSystemClock;

    /// min_age = 5s at now = 10s gives a min-age cutoff of 5s. That is earlier than the
    /// compaction watermark (9s) and the newest L0 SST (8s), so only the 1s SST is deleted.
    #[tokio::test]
    async fn test_compacted_gc_respects_min_age_cutoff() {
        let main_store = Arc::new(InMemory::new());
        let object_stores = ObjectStores::new(main_store.clone(), None);
        let format = SsTableFormat::default();
        let table_store = Arc::new(TableStore::new(
            object_stores,
            format.clone(),
            Path::from("/root"),
            None,
        ));
        let manifest_store = Arc::new(ManifestStore::new(&Path::from("/root"), main_store.clone()));
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            ManifestCore::new(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        let compactions_store = Arc::new(CompactionsStore::new(
            &Path::from("/root"),
            main_store.clone(),
        ));
        let mut stored_compactions = StoredCompactions::create(
            compactions_store.clone(),
            stored_manifest.manifest().compactor_epoch,
        )
        .await
        .unwrap();
        // A recorded compaction at t=9s sets the compaction low watermark.
        let mut compactions_dirty = stored_compactions.prepare_dirty().unwrap();
        compactions_dirty.value.insert(Compaction::new(
            ulid::Ulid::from_parts(9_000, 0),
            CompactionSpec::new(vec![SourceId::SortedRun(0)], 0),
        ));
        stored_compactions.update(compactions_dirty).await.unwrap();
        let id_to_delete = SsTableId::Compacted(ulid::Ulid::from_parts(1_000, 0));
        let id_within_min_age = SsTableId::Compacted(ulid::Ulid::from_parts(7_000, 0));
        let id_active_recent = SsTableId::Compacted(ulid::Ulid::from_parts(8_000, 0));
        let sst_to_delete = build_test_sst(&format, 1).await;
        let sst_within_min_age = build_test_sst(&format, 1).await;
        let sst_active_recent = build_test_sst(&format, 1).await;
        table_store
            .write_sst(&id_to_delete, sst_to_delete, false)
            .await
            .unwrap();
        table_store
            .write_sst(&id_within_min_age, sst_within_min_age, false)
            .await
            .unwrap();
        let active_handle = table_store
            .write_sst(&id_active_recent, sst_active_recent, false)
            .await
            .unwrap();
        let mut dirty = stored_manifest.prepare_dirty().unwrap();
        dirty
            .value
            .core
            .l0
            .push_back(SsTableView::identity(active_handle));
        stored_manifest.update(dirty).await.unwrap();
        let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
        let opts = GarbageCollectorDirectoryOptions {
            interval: None,
            min_age: Duration::from_secs(5),
        };
        let stats = Arc::new(GcStats::new(&recorder));
        let task = CompactedGcTask::new(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            stats,
            opts,
        );
        let utc_now = DateTime::<Utc>::from_timestamp_millis(10_000).unwrap();
        task.collect(utc_now).await.unwrap();
        let remaining: Vec<_> = table_store
            .list_compacted_ssts(..)
            .await
            .unwrap()
            .into_iter()
            .map(|m| m.id)
            .collect();
        assert_eq!(remaining, vec![id_within_min_age, id_active_recent]);
    }

    /// min_age = 0 and a compaction at 5s leave the newest L0 SST (3s) as the cutoff.
    /// The 1s SST is deleted. The referenced 3s SST and the newer unreferenced 4s SST survive.
    #[tokio::test]
    async fn test_compacted_gc_respects_manifest_most_recent_sst() {
        let main_store = Arc::new(InMemory::new());
        let object_stores = ObjectStores::new(main_store.clone(), None);
        let format = SsTableFormat::default();
        let table_store = Arc::new(TableStore::new(
            object_stores,
            format.clone(),
            Path::from("/root"),
            None,
        ));
        let manifest_store = Arc::new(ManifestStore::new(&Path::from("/root"), main_store.clone()));
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            ManifestCore::new(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        let compactions_store = Arc::new(CompactionsStore::new(
            &Path::from("/root"),
            main_store.clone(),
        ));
        let mut stored_compactions = StoredCompactions::create(
            compactions_store.clone(),
            stored_manifest.manifest().compactor_epoch,
        )
        .await
        .unwrap();
        let mut compactions_dirty = stored_compactions.prepare_dirty().unwrap();
        compactions_dirty.value.insert(Compaction::new(
            ulid::Ulid::from_parts(5_000, 0),
            CompactionSpec::new(vec![SourceId::SortedRun(0)], 0),
        ));
        stored_compactions.update(compactions_dirty).await.unwrap();
        let id_to_delete = SsTableId::Compacted(ulid::Ulid::from_parts(1_000, 0));
        let id_manifest = SsTableId::Compacted(ulid::Ulid::from_parts(3_000, 0));
        let id_newer = SsTableId::Compacted(ulid::Ulid::from_parts(4_000, 0));
        let sst_to_delete = build_test_sst(&format, 1).await;
        let sst_manifest = build_test_sst(&format, 1).await;
        let sst_newer = build_test_sst(&format, 1).await;
        table_store
            .write_sst(&id_to_delete, sst_to_delete, false)
            .await
            .unwrap();
        let manifest_handle = table_store
            .write_sst(&id_manifest, sst_manifest, false)
            .await
            .unwrap();
        table_store
            .write_sst(&id_newer, sst_newer, false)
            .await
            .unwrap();
        let mut dirty = stored_manifest.prepare_dirty().unwrap();
        dirty
            .value
            .core
            .l0
            .push_back(SsTableView::identity(manifest_handle));
        stored_manifest.update(dirty).await.unwrap();
        let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
        let opts = GarbageCollectorDirectoryOptions {
            interval: None,
            min_age: Duration::from_secs(0),
        };
        let stats = Arc::new(GcStats::new(&recorder));
        let task = CompactedGcTask::new(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            stats,
            opts,
        );
        let utc_now = DateTime::<Utc>::from_timestamp_millis(10_000).unwrap();
        task.collect(utc_now).await.unwrap();
        let remaining: Vec<_> = table_store
            .list_compacted_ssts(..)
            .await
            .unwrap()
            .into_iter()
            .map(|m| m.id)
            .collect();
        assert_eq!(remaining, vec![id_manifest, id_newer]);
    }

    /// min_age = 0 and a newest L0 SST at 3s leave the oldest compaction (2s) as the cutoff.
    /// Only the 1s SST is strictly older, so the 2s "barrier" SST is kept.
    #[tokio::test]
    async fn test_compacted_gc_respects_compaction_barrier() {
        let main_store = Arc::new(InMemory::new());
        let object_stores = ObjectStores::new(main_store.clone(), None);
        let format = SsTableFormat::default();
        let table_store = Arc::new(TableStore::new(
            object_stores,
            format.clone(),
            Path::from("/root"),
            None,
        ));
        let manifest_store = Arc::new(ManifestStore::new(&Path::from("/root"), main_store.clone()));
        let compactions_store = Arc::new(CompactionsStore::new(
            &Path::from("/root"),
            main_store.clone(),
        ));
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            ManifestCore::new(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        let compactor_epoch = stored_manifest.manifest().compactor_epoch;
        let id_to_delete = SsTableId::Compacted(ulid::Ulid::from_parts(1_000, 0));
        let id_barrier = SsTableId::Compacted(ulid::Ulid::from_parts(2_000, 0));
        let id_to_newer = SsTableId::Compacted(ulid::Ulid::from_parts(3_000, 0));
        let sst_to_delete = build_test_sst(&format, 1).await;
        let sst_barrier = build_test_sst(&format, 1).await;
        let sst_to_newer = build_test_sst(&format, 1).await;
        table_store
            .write_sst(&id_to_delete, sst_to_delete, false)
            .await
            .unwrap();
        table_store
            .write_sst(&id_barrier, sst_barrier, false)
            .await
            .unwrap();
        let active_handle = table_store
            .write_sst(&id_to_newer, sst_to_newer, false)
            .await
            .unwrap();
        let mut dirty = stored_manifest.prepare_dirty().unwrap();
        dirty
            .value
            .core
            .l0
            .push_back(SsTableView::identity(active_handle));
        stored_manifest.update(dirty).await.unwrap();
        let mut stored_compactions =
            StoredCompactions::create(compactions_store.clone(), compactor_epoch)
                .await
                .unwrap();
        let mut compactions_dirty = stored_compactions.prepare_dirty().unwrap();
        compactions_dirty.value.insert(Compaction::new(
            ulid::Ulid::from_parts(2_000, 0),
            CompactionSpec::new(vec![SourceId::SortedRun(0)], 0),
        ));
        stored_compactions.update(compactions_dirty).await.unwrap();
        let opts = GarbageCollectorDirectoryOptions {
            interval: None,
            min_age: Duration::from_secs(0),
        };
        let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
        let stats = Arc::new(GcStats::new(&recorder));
        let task = CompactedGcTask::new(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            stats,
            opts,
        );
        let utc_now = DateTime::<Utc>::from_timestamp_millis(10_000).unwrap();
        task.collect(utc_now).await.unwrap();
        let remaining: Vec<_> = table_store
            .list_compacted_ssts(..)
            .await
            .unwrap()
            .into_iter()
            .map(|m| m.id)
            .collect();
        assert_eq!(remaining, vec![id_barrier, id_to_newer]);
    }

    /// With an empty compactions record, the watermark falls back to the Unix epoch.
    /// The unreferenced 6s output must therefore survive, even though it is older than
    /// both the min-age cutoff (8s) and the newest L0 SST (9s).
    #[tokio::test]
    async fn test_compacted_gc_skips_running_compaction_output_without_watermark() {
        let main_store = Arc::new(InMemory::new());
        let object_stores = ObjectStores::new(main_store.clone(), None);
        let format = SsTableFormat::default();
        let table_store = Arc::new(TableStore::new(
            object_stores,
            format.clone(),
            Path::from("/root"),
            None,
        ));
        let manifest_store = Arc::new(ManifestStore::new(&Path::from("/root"), main_store.clone()));
        let mut stored_manifest = StoredManifest::create_new_db(
            manifest_store.clone(),
            ManifestCore::new(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        let compactions_store = Arc::new(CompactionsStore::new(
            &Path::from("/root"),
            main_store.clone(),
        ));
        StoredCompactions::create(
            compactions_store.clone(),
            stored_manifest.manifest().compactor_epoch,
        )
        .await
        .unwrap();
        let l0_id = SsTableId::Compacted(ulid::Ulid::from_parts(9_000, 0));
        let l0_handle = table_store
            .write_sst(&l0_id, build_test_sst(&format, 1).await, false)
            .await
            .unwrap();
        let mut dirty_manifest = stored_manifest.prepare_dirty().unwrap();
        dirty_manifest
            .value
            .core
            .l0
            .push_back(SsTableView::identity(l0_handle));
        stored_manifest.update(dirty_manifest).await.unwrap();
        let compaction_output_id = SsTableId::Compacted(ulid::Ulid::from_parts(6_000, 0));
        table_store
            .write_sst(
                &compaction_output_id,
                build_test_sst(&format, 1).await,
                false,
            )
            .await
            .unwrap();
        let opts = GarbageCollectorDirectoryOptions {
            interval: None,
            min_age: Duration::from_secs(2),
        };
        let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
        let stats = Arc::new(GcStats::new(&recorder));
        let task = CompactedGcTask::new(
            manifest_store.clone(),
            compactions_store.clone(),
            table_store.clone(),
            stats,
            opts,
        );
        let utc_now = DateTime::<Utc>::from_timestamp_millis(10_000).unwrap();
        task.collect(utc_now).await.unwrap();
        let remaining: Vec<_> = table_store
            .list_compacted_ssts(..)
            .await
            .unwrap()
            .into_iter()
            .map(|m| m.id)
            .collect();
        assert!(
            remaining.contains(&compaction_output_id),
            "expected GC to retain compacted SST output from a running compaction when the watermark is missing"
        );
    }
}