slatedb 0.12.1

A cloud-native embedded storage engine built on object storage.
Documentation
use crate::manifest::Manifest;
use crate::tablestore::SstFileMetadata;
use crate::{
    config::GarbageCollectorDirectoryOptions, error::SlateDBError, manifest::store::ManifestStore,
    tablestore::TableStore,
};
use chrono::{DateTime, Utc};
use log::error;
use std::collections::BTreeMap;
use std::sync::Arc;

use super::{GcStats, GcTask};

/// Garbage-collection task that removes obsolete WAL SSTs.
pub(crate) struct WalGcTask {
    /// Source of the latest manifest and of the manifests it references.
    manifest_store: Arc<ManifestStore>,
    /// Used to list WAL SSTs and to delete the ones selected for collection.
    table_store: Arc<TableStore>,
    /// GC counters; `gc_wal_count` is bumped once per successfully deleted WAL SST.
    stats: Arc<GcStats>,
    /// WAL-directory GC options; `min_age` is the minimum age a WAL SST must
    /// exceed before it may be deleted.
    wal_options: GarbageCollectorDirectoryOptions,
}

impl std::fmt::Debug for WalGcTask {
    /// Formats the task as `WalGcTask { wal_options: .. }`; only the
    /// `wal_options` field is included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("WalGcTask");
        repr.field("wal_options", &self.wal_options);
        repr.finish()
    }
}

impl WalGcTask {
    /// Builds a WAL garbage-collection task from its collaborators and the
    /// GC options that apply to the WAL directory.
    pub(super) fn new(
        manifest_store: Arc<ManifestStore>,
        table_store: Arc<TableStore>,
        stats: Arc<GcStats>,
        wal_options: GarbageCollectorDirectoryOptions,
    ) -> Self {
        Self {
            manifest_store,
            table_store,
            stats,
            wal_options,
        }
    }

    /// Decides whether a single WAL SST may be deleted.
    ///
    /// Returns `true` only when both conditions hold:
    ///  - the time elapsed between `wal_sst.last_modified` and `utc_now` is
    ///    strictly greater than `min_age`
    ///  - none of the `active_manifests` reports a reference to the WAL SST
    ///
    /// The age check runs first, so the WAL id is only unwrapped for SSTs that
    /// are already old enough.
    fn is_wal_sst_eligible_for_deletion(
        utc_now: &DateTime<Utc>,
        wal_sst: &SstFileMetadata,
        min_age: &chrono::Duration,
        active_manifests: &BTreeMap<u64, Manifest>,
    ) -> bool {
        let age = utc_now.signed_duration_since(wal_sst.last_modified);

        age > *min_age && {
            let id = wal_sst.id.unwrap_wal_id();
            active_manifests
                .values()
                .all(|manifest| !manifest.has_wal_sst_reference(id))
        }
    }

    /// Returns the configured WAL `min_age` as a `chrono::Duration`.
    ///
    /// # Panics
    ///
    /// Panics with "invalid duration" when the configured value cannot be
    /// converted by `chrono::Duration::from_std`.
    fn wal_sst_min_age(&self) -> chrono::Duration {
        let configured = self.wal_options.min_age;
        chrono::Duration::from_std(configured).expect("invalid duration")
    }
}

impl GcTask for WalGcTask {
    /// Runs one WAL garbage-collection pass.
    ///
    /// A WAL SST is deleted when all of the following are true:
    ///  - its id lies below the latest manifest's `replay_after_wal_id`
    ///  - it was last modified more than the configured minimum age before
    ///    `utc_now`
    ///  - no manifest returned by `read_referenced_manifests` references it
    ///
    /// Failures to read the manifests or to list the WAL SSTs are returned to
    /// the caller. A failure to delete an individual SST is only logged; the
    /// pass continues with the remaining SSTs and still returns `Ok(())`.
    async fn collect(&self, utc_now: DateTime<Utc>) -> Result<(), SlateDBError> {
        let (latest_manifest_id, latest_manifest) =
            self.manifest_store.read_latest_manifest().await?;
        let active_manifests = self
            .manifest_store
            .read_referenced_manifests(latest_manifest_id, &latest_manifest)
            .await?;

        let replay_after_wal_id = latest_manifest.core.replay_after_wal_id;
        let min_age = self.wal_sst_min_age();
        let candidates = self
            .table_store
            .list_wal_ssts(..replay_after_wal_id)
            .await?;

        // Select everything first, then delete, so that no deletion happens
        // before the whole listing has been evaluated.
        let mut doomed = Vec::new();
        for wal_sst in candidates {
            let eligible = Self::is_wal_sst_eligible_for_deletion(
                &utc_now,
                &wal_sst,
                &min_age,
                &active_manifests,
            );
            if eligible {
                doomed.push(wal_sst.id);
            }
        }

        for id in doomed {
            match self.table_store.delete_sst(&id).await {
                Ok(_) => {
                    self.stats.gc_wal_count.increment(1);
                }
                Err(e) => {
                    error!("error deleting WAL SST [id={:?}, error={}]", id, e);
                }
            }
        }

        Ok(())
    }

    /// Name of the resource this task collects.
    fn resource(&self) -> &str {
        "WAL"
    }
}