infino 0.1.0

A fast retrieval engine that stores data on object storage and runs SQL, full-text search, and vector search over it from a single system — search-on-Parquet.
Documentation
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Infino Authors

use std::{
    collections::HashSet,
    time::{Duration, SystemTime},
};

use crate::{
    Supertable,
    runtime_bridge::bridge_on_runtime,
    supertable::{
        ManifestSnapshot,
        error::GcError,
        manifest::commit::{MANIFEST_DIR, MANIFEST_PARTS_DIR, POINTER_PATH, manifest_uri},
    },
};

/// Counters describing the outcome of one garbage-collection pass.
///
/// A report starts out all-zero (via `Default`) and is filled in by
/// `Supertable::gc_async` as it walks the objects returned by the storage
/// listing. Each listed object bumps exactly one of `objects_deleted`,
/// `objects_skipped_live`, `objects_skipped_too_new`, or `delete_errors`.
#[derive(Debug, Default, Clone)]
pub struct GcReport {
    /// Number of objects for which the storage delete call returned `Ok`.
    pub objects_deleted: u64,
    /// Sum of the listing's `size` metadata over every successfully deleted
    /// object (presumably bytes — confirm against the storage metadata type).
    pub bytes_freed: u64,
    /// Number of listed objects kept because their key is in the live set
    /// built from the current manifest.
    pub objects_skipped_live: u64,
    /// Number of non-live objects kept because their `last_modified` time is
    /// at or after the cutoff (now minus the caller's safety gap).
    pub objects_skipped_too_new: u64,
    /// Number of delete calls that returned an error. These do not abort the
    /// pass; they are only counted here.
    pub delete_errors: u64,
}

/// Collects every storage key that `manifest` still references.
///
/// The returned set holds:
/// - the pointer object (`POINTER_PATH`),
/// - the URI of the manifest identified by `manifest.manifest_id`,
/// - the `uri` of every list entry the manifest reports, and
/// - the storage path of every superfile the manifest reports.
///
/// Garbage collection treats any key found in this set as live and never
/// deletes it.
fn build_live_set(manifest: &ManifestSnapshot) -> HashSet<String> {
    // Seed with the two keys that are live regardless of manifest contents.
    let mut live_keys = HashSet::from([
        POINTER_PATH.to_string(),
        manifest_uri(manifest.manifest_id),
    ]);

    live_keys.extend(
        manifest
            .get_all_list_entries()
            .into_iter()
            .map(|list_entry| list_entry.uri.clone()),
    );
    live_keys.extend(
        manifest
            .get_all_superfiles()
            .into_iter()
            .map(|superfile| superfile.uri.storage_path()),
    );

    live_keys
}

impl Supertable {
    /// Blocking entry point for garbage collection.
    ///
    /// Builds the `gc_async` future and drives it through `bridge_on_runtime`
    /// on this table's query runtime, returning its result unchanged.
    ///
    /// `safety_gap` is forwarded as-is; see `gc_async` for how it is used.
    ///
    /// # Errors
    ///
    /// Returns whatever error `gc_async` produces.
    pub fn gc(&self, safety_gap: Duration) -> Result<GcReport, GcError> {
        let sweep = self.gc_async(safety_gap);
        bridge_on_runtime(sweep, &self.inner().query_runtime())
    }

    /// Deletes stored objects that the current manifest no longer references.
    ///
    /// The pass lists every object under `MANIFEST_DIR`, `MANIFEST_PARTS_DIR`
    /// and `"data"` (in that order) and, for each one:
    ///
    /// 1. keeps it if its key is in the live set built from the currently
    ///    loaded manifest;
    /// 2. otherwise keeps it if its `last_modified` time is at or after
    ///    `now - safety_gap`;
    /// 3. otherwise attempts to delete it.
    ///
    /// A failed delete does not abort the pass; it is tallied in
    /// `GcReport::delete_errors` and the sweep moves on to the next object.
    ///
    /// If `now - safety_gap` cannot be represented, the cutoff falls back to
    /// the Unix epoch.
    ///
    /// # Errors
    ///
    /// - `GcError::NoStorage` when the table options carry no storage handle.
    /// - Any listing failure, converted into `GcError` by `?`.
    pub(crate) async fn gc_async(&self, safety_gap: Duration) -> Result<GcReport, GcError> {
        let inner = self.inner();
        let Some(storage) = inner.options.storage.clone() else {
            return Err(GcError::NoStorage);
        };

        let snapshot = inner.manifest.load_full();
        let live_keys = build_live_set(&snapshot);

        // Objects modified at or after this instant are never deleted.
        let cutoff = match SystemTime::now().checked_sub(safety_gap) {
            Some(instant) => instant,
            None => SystemTime::UNIX_EPOCH,
        };

        let mut tally = GcReport::default();

        for prefix in [MANIFEST_DIR, MANIFEST_PARTS_DIR, "data"] {
            let listing = storage.list_with_prefix_metadata(prefix).await?;
            for (key, meta) in listing {
                if live_keys.contains(&key) {
                    tally.objects_skipped_live += 1;
                } else if meta.last_modified >= cutoff {
                    tally.objects_skipped_too_new += 1;
                } else if storage.delete(&key).await.is_ok() {
                    tally.objects_deleted += 1;
                    tally.bytes_freed += meta.size;
                } else {
                    // Best effort: record the failure and keep sweeping.
                    tally.delete_errors += 1;
                }
            }
        }

        Ok(tally)
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use uuid::Uuid;

    use super::*;
    use crate::{
        supertable::{
            SupertableOptions,
            manifest::{ManifestSnapshot, SuperfileEntry, SuperfileUri},
        },
        test_helpers::default_supertable_options,
    };

    /// Default supertable options wrapped in an `Arc` for `ManifestSnapshot::empty`.
    fn shared_default_options() -> Arc<SupertableOptions> {
        let defaults = default_supertable_options();
        Arc::new(defaults)
    }

    /// Minimal one-document superfile entry located at `uri`, with a fresh
    /// random id and all summaries/stats left empty.
    fn entry_at(uri: SuperfileUri) -> Arc<SuperfileEntry> {
        let entry = SuperfileEntry {
            superfile_id: Uuid::new_v4(),
            uri,
            n_docs: 1,
            id_min: 0,
            id_max: 0,
            scalar_stats: HashMap::new(),
            fts_summary: HashMap::new(),
            vector_summary: HashMap::new(),
            partition_key: vec![],
            partition_hint: None,
            subsection_offsets: None,
        };
        Arc::new(entry)
    }

    // An empty manifest still keeps the pointer and its own manifest object alive.
    #[test]
    fn build_live_set_contains_pointer_and_manifest_uri() {
        let manifest = ManifestSnapshot::empty(shared_default_options());
        let own_manifest_key = manifest_uri(manifest.manifest_id);

        let live = build_live_set(&manifest);

        assert!(live.contains(POINTER_PATH));
        assert!(live.contains(&own_manifest_key));
    }

    // A superfile appended to the manifest must show up by its storage path.
    #[test]
    fn build_live_set_contains_superfile_uris() {
        let uri = SuperfileUri::new_v4();
        let manifest =
            ManifestSnapshot::empty(shared_default_options()).with_appended(vec![entry_at(uri)]);

        let live = build_live_set(&manifest);

        let expected_path = uri.storage_path();
        assert!(live.contains(&expected_path));
    }

    // Only the current manifest id is protected; neighbouring ids are not.
    #[test]
    fn build_live_set_does_not_contain_older_manifest_uris() {
        let uri = SuperfileUri::new_v4();
        let manifest =
            ManifestSnapshot::empty(shared_default_options()).with_appended(vec![entry_at(uri)]);
        assert_eq!(manifest.manifest_id, 1);

        let live = build_live_set(&manifest);

        for other_id in [0, 2] {
            assert!(!live.contains(&manifest_uri(other_id)));
        }
    }
}