use crate::{
config::GarbageCollectorDirectoryOptions, error::SlateDBError, manifest::store::ManifestStore,
};
use chrono::{DateTime, Utc};
use log::error;
use std::collections::HashSet;
use std::sync::Arc;
use super::{GcStats, GcTask};
/// Garbage-collection task that removes manifests which are no longer needed.
///
/// See the `GcTask` implementation for the rules deciding which manifests are
/// eligible for deletion.
pub(crate) struct ManifestGcTask {
/// Store used to list, read and delete manifests.
manifest_store: Arc<ManifestStore>,
/// Shared GC statistics; `gc_manifest_count` is incremented for every
/// manifest that is deleted successfully.
stats: Arc<GcStats>,
/// Options for manifest collection; `min_age` is the age a manifest must
/// exceed before it may be deleted.
manifest_options: GarbageCollectorDirectoryOptions,
}
/// Debug output shows only `manifest_options`; the `manifest_store` and
/// `stats` fields are left out of the representation.
impl std::fmt::Debug for ManifestGcTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("ManifestGcTask");
        repr.field("manifest_options", &self.manifest_options);
        repr.finish()
    }
}
impl ManifestGcTask {
    /// Builds a manifest GC task from the manifest store, the shared GC
    /// statistics and the manifest collection options.
    pub(super) fn new(
        manifest_store: Arc<ManifestStore>,
        stats: Arc<GcStats>,
        manifest_options: GarbageCollectorDirectoryOptions,
    ) -> Self {
        Self {
            manifest_store,
            stats,
            manifest_options,
        }
    }

    /// Returns the configured minimum manifest age as a `chrono::Duration`.
    ///
    /// # Panics
    ///
    /// Panics with "invalid duration" if the configured `min_age` cannot be
    /// converted by `chrono::Duration::from_std`.
    fn manifest_min_age(&self) -> chrono::Duration {
        let configured = self.manifest_options.min_age;
        chrono::Duration::from_std(configured).expect("invalid duration")
    }
}
impl GcTask for ManifestGcTask {
    /// Deletes manifests that are older than the configured minimum age and
    /// are not referenced by a checkpoint of the latest manifest.
    ///
    /// The last entry of the manifest listing is never a deletion candidate;
    /// it is read to obtain the checkpoints that pin other manifests.
    /// (Presumably the listing is ordered oldest to newest, so the last entry
    /// is the latest manifest - confirm against `ManifestStore`.)
    ///
    /// Individual deletion failures are logged and skipped rather than
    /// aborting the run.
    ///
    /// # Errors
    ///
    /// Returns `SlateDBError::LatestTransactionalObjectVersionMissing` when the
    /// listing is empty, and propagates errors from listing manifests or
    /// reading the latest manifest.
    async fn collect(&self, utc_now: DateTime<Utc>) -> Result<(), SlateDBError> {
        let min_age = self.manifest_min_age();
        let mut manifest_metadata_list = self.manifest_store.list_manifests(..).await?;

        // Taking the last entry off the list also keeps it out of the
        // deletion loop below.
        let Some(latest_metadata) = manifest_metadata_list.pop() else {
            return Err(SlateDBError::LatestTransactionalObjectVersionMissing);
        };
        let latest_manifest = self
            .manifest_store
            .read_manifest(latest_metadata.id)
            .await?;

        // Manifests referenced by a checkpoint must be kept.
        let mut active_manifest_ids = HashSet::new();
        for checkpoint in latest_manifest.core.checkpoints.iter() {
            active_manifest_ids.insert(checkpoint.manifest_id);
        }

        for candidate in manifest_metadata_list {
            if active_manifest_ids.contains(&candidate.id) {
                continue;
            }
            let age = utc_now.signed_duration_since(candidate.last_modified);
            if age <= min_age {
                continue;
            }

            match self.manifest_store.delete_manifest(candidate.id).await {
                Ok(_) => self.stats.gc_manifest_count.increment(1),
                Err(e) => {
                    error!(
                        "error deleting manifest [id={:?}, error={}]",
                        candidate.id, e
                    );
                }
            }
        }

        Ok(())
    }

    /// Name of the resource this task collects.
    fn resource(&self) -> &str {
        "Manifest"
    }
}