use crate::{
compactions_store::CompactionsStore, config::GarbageCollectorDirectoryOptions,
error::SlateDBError,
};
use chrono::{DateTime, Utc};
use log::error;
use std::sync::Arc;
use super::{GcStats, GcTask};
/// Garbage-collection task that removes aged entries from the compactions store.
pub(crate) struct CompactionsGcTask {
    /// Settings for this task; `min_age` is the age an entry must exceed
    /// before it is considered for deletion.
    compactions_options: GarbageCollectorDirectoryOptions,
    /// Shared handle to the store whose entries are listed and deleted.
    compactions_store: Arc<CompactionsStore>,
    /// Shared GC statistics; `gc_compactions_count` is incremented once per
    /// successful deletion.
    stats: Arc<GcStats>,
}
/// Debug formatting for the task.
///
/// Only `compactions_options` is printed; the store and stats handles are
/// left out of the output.
impl std::fmt::Debug for CompactionsGcTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("CompactionsGcTask");
        builder.field("compactions_options", &self.compactions_options);
        builder.finish()
    }
}
impl CompactionsGcTask {
pub(super) fn new(
compactions_store: Arc<CompactionsStore>,
stats: Arc<GcStats>,
compactions_options: GarbageCollectorDirectoryOptions,
) -> Self {
Self {
compactions_store,
stats,
compactions_options,
}
}
fn compactions_min_age(&self) -> chrono::Duration {
chrono::Duration::from_std(self.compactions_options.min_age).expect("invalid duration")
}
}
impl GcTask for CompactionsGcTask {
    /// Deletes entries from the compactions store that are older than the
    /// configured minimum age, measured against `utc_now`.
    ///
    /// The last entry returned by the listing is never deleted.
    /// NOTE(review): presumably the listing is ordered so that the last entry
    /// is the most recent one — confirm against `list_compactions`.
    ///
    /// # Errors
    ///
    /// Returns an error only if listing the store fails. A failed deletion is
    /// logged and skipped; it does not abort the run.
    async fn collect(&self, utc_now: DateTime<Utc>) -> Result<(), SlateDBError> {
        let min_age = self.compactions_min_age();

        let mut candidates = self.compactions_store.list_compactions(..).await?;
        // Drop the final listed entry so it is always retained.
        candidates.pop();

        for entry in candidates {
            let age = utc_now.signed_duration_since(entry.last_modified);
            if age <= min_age {
                // Not old enough yet; leave it in place.
                continue;
            }

            match self.compactions_store.delete_compactions(entry.id).await {
                Ok(_) => {
                    self.stats.gc_compactions_count.increment(1);
                }
                Err(e) => {
                    error!(
                        "error deleting compactions [id={:?}, error={}]",
                        entry.id, e
                    );
                }
            }
        }

        Ok(())
    }

    /// Name of the resource this task collects.
    fn resource(&self) -> &str {
        "Compactions"
    }
}