use crate::manifest::Manifest;
use crate::tablestore::SstFileMetadata;
use crate::{
config::GarbageCollectorDirectoryOptions, error::SlateDBError, manifest::store::ManifestStore,
tablestore::TableStore,
};
use chrono::{DateTime, Utc};
use log::error;
use std::collections::BTreeMap;
use std::sync::Arc;
use super::{GcStats, GcTask};
/// Garbage-collection task that removes WAL SSTs which are old enough and no
/// longer referenced by any active manifest.
pub(crate) struct WalGcTask {
    /// GC settings for the WAL directory; `min_age` is the minimum age a WAL
    /// SST must exceed before it can be deleted.
    wal_options: GarbageCollectorDirectoryOptions,
    /// Source of the latest manifest and of the manifests it references.
    manifest_store: Arc<ManifestStore>,
    /// Used to list WAL SSTs and to delete the ones selected for collection.
    table_store: Arc<TableStore>,
    /// Shared GC counters; `gc_wal_count` is bumped once per deleted WAL SST.
    stats: Arc<GcStats>,
}
/// Debug output shows only the WAL GC options.
///
/// The store handles and stats are omitted on purpose; their contents are not
/// useful when printing the task.
impl std::fmt::Debug for WalGcTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("WalGcTask");
        builder.field("wal_options", &self.wal_options);
        builder.finish()
    }
}
impl WalGcTask {
    /// Builds a WAL GC task from its collaborators and directory options.
    pub(super) fn new(
        manifest_store: Arc<ManifestStore>,
        table_store: Arc<TableStore>,
        stats: Arc<GcStats>,
        wal_options: GarbageCollectorDirectoryOptions,
    ) -> Self {
        Self {
            wal_options,
            manifest_store,
            table_store,
            stats,
        }
    }

    /// Decides whether a single WAL SST may be deleted.
    ///
    /// Returns `true` only when both conditions hold:
    /// 1. The SST's age (`utc_now - last_modified`) is strictly greater than `min_age`.
    /// 2. No manifest in `active_manifests` reports a reference to its WAL id
    ///    (per `Manifest::has_wal_sst_reference`).
    ///
    /// The age test runs first. The WAL id is extracted only for SSTs that pass it.
    /// NOTE(review): `unwrap_wal_id` presumably panics for non-WAL ids; confirm.
    fn is_wal_sst_eligible_for_deletion(
        utc_now: &DateTime<Utc>,
        wal_sst: &SstFileMetadata,
        min_age: &chrono::Duration,
        active_manifests: &BTreeMap<u64, Manifest>,
    ) -> bool {
        let age = utc_now.signed_duration_since(wal_sst.last_modified);
        // Too young (including exactly `min_age`): keep it.
        if age <= *min_age {
            return false;
        }

        let id = wal_sst.id.unwrap_wal_id();
        // Deletable only if every active manifest leaves this WAL unreferenced.
        active_manifests
            .values()
            .all(|m| !m.has_wal_sst_reference(id))
    }

    /// Converts the configured WAL `min_age` into a `chrono::Duration`.
    ///
    /// # Panics
    /// Panics with "invalid duration" if the configured value is out of range
    /// for `chrono::Duration`.
    fn wal_sst_min_age(&self) -> chrono::Duration {
        let configured = self.wal_options.min_age;
        chrono::Duration::from_std(configured).expect("invalid duration")
    }
}
impl GcTask for WalGcTask {
    /// Runs one WAL collection pass.
    ///
    /// Steps:
    /// 1. Read the latest manifest and every manifest it references.
    /// 2. List WAL SSTs whose id is strictly below the latest manifest's
    ///    `replay_after_wal_id`.
    /// 3. Keep those that pass `is_wal_sst_eligible_for_deletion` at `utc_now`.
    /// 4. Delete each one.
    ///
    /// # Errors
    /// Errors from reading manifests or listing WAL SSTs are propagated.
    /// Failures to delete an individual SST are logged and skipped. In that case
    /// the pass continues and still returns `Ok(())`.
    async fn collect(&self, utc_now: DateTime<Utc>) -> Result<(), SlateDBError> {
        let store = &self.manifest_store;
        let (latest_id, latest) = store.read_latest_manifest().await?;
        let referenced = store.read_referenced_manifests(latest_id, &latest).await?;

        // NOTE(review): the range below is exclusive, so the WAL whose id equals
        // `replay_after_wal_id` is never deleted in this pass. Confirm that this
        // is intended.
        let replay_after = latest.core.replay_after_wal_id;
        let min_age = self.wal_sst_min_age();

        let candidates = self.table_store.list_wal_ssts(..replay_after).await?;
        // Decide the full deletion set before issuing any delete.
        let mut doomed = Vec::new();
        for wal_sst in candidates {
            if Self::is_wal_sst_eligible_for_deletion(&utc_now, &wal_sst, &min_age, &referenced) {
                doomed.push(wal_sst.id);
            }
        }

        for id in doomed {
            match self.table_store.delete_sst(&id).await {
                Ok(_) => {
                    self.stats.gc_wal_count.increment(1);
                }
                Err(e) => {
                    error!("error deleting WAL SST [id={:?}, error={}]", id, e);
                }
            }
        }
        Ok(())
    }

    /// Label for the resource this task collects.
    fn resource(&self) -> &str {
        "WAL"
    }
}