flowdb 0.8.0

A high-performance embedded time-series + JSON document storage engine (LSM-tree), with built-in IndexedDB-compatible API.
Documentation
use crate::block_meta_index::BlockMetaIndex;
use crate::cache::BlockCache;
use crate::error::Result;
use crate::manifest::{Manifest, ManifestEntry};
use crate::stats::StatsCounters;
use crate::storage::StorageBackend;
use std::sync::Arc;

pub(crate) struct GcRunner {
    manifest: Arc<parking_lot::Mutex<Manifest>>,
    index: Arc<parking_lot::RwLock<BlockMetaIndex>>,
    cache: Arc<BlockCache>,
    stats: Arc<StatsCounters>,
    storage: Arc<dyn StorageBackend>,
}

impl GcRunner {
    pub fn new(
        manifest: Arc<parking_lot::Mutex<Manifest>>,
        index: Arc<parking_lot::RwLock<BlockMetaIndex>>,
        cache: Arc<BlockCache>,
        stats: Arc<StatsCounters>,
        storage: Arc<dyn StorageBackend>,
    ) -> Self {
        Self {
            manifest,
            index,
            cache,
            stats,
            storage,
        }
    }

    pub fn run(&self) -> Result<u64> {
        let now_us = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as i64;

        let mut purged: u64 = 0;
        let mut to_delete = Vec::new();

        {
            let mf = self.manifest.lock();
            let state = mf.state();
            for (sst_id, info) in &state.sstables {
                if info.max_expire < now_us {
                    to_delete.push(*sst_id);
                    purged += info.records;
                }
            }
        }

        for sst_id in to_delete {
            {
                let mut mf = self.manifest.lock();
                if let Err(e) = mf.append(&ManifestEntry::GcDeleteSst { sst_id }) {
                    tracing::error!("GC: failed to append delete for SST {}: {}", sst_id, e);
                    return Err(e);
                }
            }
            self.cache.invalidate_sst(sst_id);
            {
                let mut idx = self.index.write();
                idx.remove_sst(sst_id);
            }
            if let Err(e) = self.storage.delete_sst(sst_id) {
                tracing::warn!("GC: failed to delete SST {}: {}", sst_id, e);
            }
        }

        self.stats.gc_done(purged);
        self.refresh_stats();
        Ok(purged)
    }

    fn refresh_stats(&self) {
        let mf = self.manifest.lock();
        let sst_count = mf.state().sstables.len();
        let total_bytes: u64 = mf.state().sstables.values().map(|s| s.bytes).sum();
        self.stats.set_sstable(sst_count, total_bytes);
        drop(mf);

        let idx = self.index.read();
        self.stats
            .set_index_stats(idx.total_entries(), idx.bucket_count());
    }
}