// brk_store 0.2.5
//
// A thin wrapper around fjall.
// Documentation: the crate README, embedded below via `include_str!`.
#![doc = include_str!("../README.md")]

use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, path::Path};

use brk_error::Result;
use brk_types::{Height, Version};
use byteview::ByteView;
use fjall::{Database, Keyspace, KeyspaceCreateOptions, config::*};
use rustc_hash::{FxHashMap, FxHashSet};

mod any;
mod item;
mod kind;
mod meta;
mod mode;

pub use any::*;
pub use item::*;
pub use kind::*;
pub use meta::*;
pub use mode::*;

/// Version component added to every store's own `version` before it is handed to
/// `StoreMeta::checked_open` (see `Store::import_inner`).
///
/// NOTE(review): presumably this tracks the fjall major release so that a fjall upgrade
/// changes the expected version of every store — confirm against `meta`.
const MAJOR_FJALL_VERSION: Version = Version::new(3);

/// Opens the fjall [`Database`] located in the `fjall` subdirectory of `path`.
///
/// The builder is configured with a cache size of 3 GiB (`3 * 1024^3` bytes) and
/// `max_cached_files` set to 512.
///
/// # Errors
///
/// Returns the error reported by fjall when the database cannot be opened.
pub fn open_database(path: &Path) -> fjall::Result<Database> {
    let directory = path.join("fjall");

    let builder = Database::builder(directory)
        .cache_size(3 * 1024 * 1024 * 1024)
        .max_cached_files(Some(512));

    builder.open()
}

/// A keyspace-backed key/value store with in-memory write buffering.
///
/// Writes are staged in `puts` / `dels` and only reach the keyspace when they are
/// ingested (see `commit` and `take_pending_ingest`). Reads consult `puts`, then
/// `caches`, then the keyspace.
#[derive(Clone)]
pub struct Store<K, V> {
    /// Metadata handle returned by `StoreMeta::checked_open`.
    meta: StoreMeta,
    /// Store name; leaked at import time so it can be handed out as `&'static str`.
    name: &'static str,
    /// The fjall keyspace holding the committed data.
    keyspace: Keyspace,
    /// Inserts buffered since the last ingest.
    puts: FxHashMap<K, V>,
    /// Keys whose deletion is buffered since the last ingest.
    dels: FxHashSet<K>,
    /// Put batches of recent commits, newest first. Its length is the `max_batches`
    /// given at import time (empty when caching is disabled).
    caches: Vec<FxHashMap<K, V>>,
}

impl<K, V> Store<K, V>
where
    K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
    V: Debug + Clone + From<ByteView>,
    ByteView: From<K> + From<V>,
    Self: Send + Sync,
{
    /// Opens the store `name` without keeping any committed batches cached in memory.
    ///
    /// Equivalent to `import_cached` with `max_batches` set to `0`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `import_inner` (directory creation, metadata check,
    /// keyspace opening).
    pub fn import(
        db: &Database,
        path: &Path,
        name: &str,
        version: Version,
        mode: Mode,
        kind: Kind,
    ) -> Result<Self> {
        const NO_CACHED_BATCHES: u8 = 0;

        Self::import_inner(db, path, name, version, mode, kind, NO_CACHED_BATCHES)
    }

    /// Opens the store `name` and keeps the put batches of the last `max_batches`
    /// commits in memory so that `get` can answer from them before hitting the keyspace.
    ///
    /// # Errors
    ///
    /// Propagates any error from `import_inner` (directory creation, metadata check,
    /// keyspace opening).
    pub fn import_cached(
        db: &Database,
        path: &Path,
        name: &str,
        version: Version,
        mode: Mode,
        kind: Kind,
        max_batches: u8,
    ) -> Result<Self> {
        Self::import_inner(db, path, name, version, mode, kind, max_batches)
    }

    fn import_inner(
        db: &Database,
        path: &Path,
        name: &str,
        version: Version,
        mode: Mode,
        kind: Kind,
        max_batches: u8,
    ) -> Result<Self> {
        fs::create_dir_all(path)?;

        let (meta, keyspace) = StoreMeta::checked_open(
            &path.join(format!("meta/{name}")),
            MAJOR_FJALL_VERSION + version,
            || {
                Self::open_keyspace(db, name, mode, kind).inspect_err(|e| {
                    eprintln!("{e}");
                    eprintln!("Delete {path:?} and try again");
                })
            },
        )?;

        let mut caches = vec![];
        for _ in 0..max_batches {
            caches.push(FxHashMap::default());
        }

        Ok(Self {
            meta,
            name: Box::leak(Box::new(name.to_string())),
            keyspace,
            puts: FxHashMap::default(),
            dels: FxHashSet::default(),
            caches,
        })
    }

    fn open_keyspace(database: &Database, name: &str, _mode: Mode, kind: Kind) -> Result<Keyspace> {
        let mut options = KeyspaceCreateOptions::default()
            .manual_journal_persist(true)
            .filter_block_partitioning_policy(PartitioningPolicy::new([false, false, true]))
            .index_block_partitioning_policy(PartitioningPolicy::new([false, false, true]));

        match kind {
            Kind::Random => {
                options = options
                    .filter_block_pinning_policy(PinningPolicy::new([true, true, true, false]))
                    .filter_policy(FilterPolicy::new([
                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(
                            0.0001,
                        )),
                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)),
                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)),
                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(9.0)),
                    ]));
            }
            Kind::Recent => {
                options = options
                    .expect_point_read_hits(true)
                    .filter_policy(FilterPolicy::new([
                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(
                            0.0001,
                        )),
                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)),
                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(8.0)),
                        FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(7.0)),
                    ]));
            }
            Kind::Sequential => {
                options = options
                    .filter_block_partitioning_policy(PartitioningPolicy::all(true))
                    .index_block_partitioning_policy(PartitioningPolicy::all(true))
                    .filter_block_pinning_policy(PinningPolicy::all(false))
                    .index_block_pinning_policy(PinningPolicy::all(false));
            }
            Kind::Vec => {
                options = options
                    .max_memtable_size(8 * 1024 * 1024)
                    .filter_policy(FilterPolicy::disabled())
                    .filter_block_pinning_policy(PinningPolicy::all(false))
                    .index_block_pinning_policy(PinningPolicy::all(false));
            }
        }

        database.keyspace(name, || options).map_err(|e| e.into())
    }

    /// Looks up `key`, preferring in-memory data over the keyspace.
    ///
    /// Lookup order: buffered `puts`, then each cached batch in order (newest first),
    /// then the keyspace. In-memory hits are returned borrowed; keyspace hits are
    /// decoded into an owned `V`.
    ///
    /// Keys whose deletion is merely buffered (see `remove`) are not filtered out here:
    /// the pending-deletion set is not consulted.
    ///
    /// # Errors
    ///
    /// Returns the (converted) fjall error if the keyspace read fails.
    #[inline]
    pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
    where
        ByteView: From<&'a K>,
    {
        let in_memory = self
            .puts
            .get(key)
            .or_else(|| self.caches.iter().find_map(|cache| cache.get(key)));

        if let Some(value) = in_memory {
            return Ok(Some(Cow::Borrowed(value)));
        }

        let stored = self.keyspace.get(ByteView::from(key))?;
        Ok(stored.map(|slice| Cow::Owned(V::from(ByteView::from(slice)))))
    }

    /// Reports whether the underlying keyspace is empty.
    ///
    /// Only the keyspace is inspected; buffered `puts` / `dels` are not taken into account.
    ///
    /// # Errors
    ///
    /// Returns the (converted) fjall error if the keyspace check fails.
    #[inline]
    pub fn is_empty(&self) -> Result<bool> {
        self.keyspace.is_empty().map_err(|e| e.into())
    }

    /// Buffers `key -> value` for the next ingest, cancelling any pending deletion of
    /// the same key.
    #[inline]
    pub fn insert(&mut self, key: K, value: V) {
        // Skip hashing the key against `dels` when no deletion is pending at all.
        if !self.dels.is_empty() {
            self.dels.remove(&key);
        }
        self.puts.insert(key, value);
    }

    /// Buffers the deletion of `key`.
    ///
    /// If `key` has a buffered put, that put is simply dropped and no deletion is
    /// queued. Otherwise the key is added to the pending-deletion set.
    ///
    /// In debug builds, queuing the same deletion twice trips an assertion.
    #[inline]
    pub fn remove(&mut self, key: K) {
        let had_buffered_put = self.puts.remove(&key).is_some();
        if !had_buffered_put {
            let newly_inserted = self.dels.insert(key);
            debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
        }
    }

    /// Empties every cached batch while keeping the number of cache slots unchanged.
    ///
    /// Call this after bulk removals (e.g., a rollback) so that `get` cannot return
    /// stale values from a cached batch.
    #[inline]
    pub fn clear_caches(&mut self) {
        self.caches
            .iter_mut()
            .for_each(|cache| *cache = FxHashMap::default());
    }

    /// Detaches the buffered puts/dels and returns a closure that ingests them into the
    /// keyspace, leaving the store with empty buffers for the next batch.
    ///
    /// The metadata for `height` is exported first (if needed). Returns `Ok(None)` when
    /// nothing is buffered. The returned closure owns the detached buffers and a clone
    /// of the keyspace handle, so it can be run later and on another thread.
    ///
    /// The cached batches are not updated by this method.
    ///
    /// # Errors
    ///
    /// Fails if exporting the metadata fails; ingest errors surface when the returned
    /// closure is run.
    #[allow(clippy::type_complexity)]
    pub fn take_pending_ingest(
        &mut self,
        height: Height,
    ) -> Result<Option<Box<dyn FnOnce() -> Result<()> + Send>>>
    where
        K: Send + 'static,
        V: Send + 'static,
        for<'a> ByteView: From<&'a K> + From<&'a V>,
    {
        self.export_meta_if_needed(height)?;

        let (pending_puts, pending_dels) = (mem::take(&mut self.puts), mem::take(&mut self.dels));
        if pending_puts.is_empty() && pending_dels.is_empty() {
            return Ok(None);
        }

        let target = self.keyspace.clone();
        let job: Box<dyn FnOnce() -> Result<()> + Send> =
            Box::new(move || Self::ingest(&target, pending_puts.iter(), pending_dels.iter()));

        Ok(Some(job))
    }

    /// Iterates over every entry of the keyspace, decoded into `(K, V)` pairs.
    ///
    /// Only the keyspace is read; buffered puts/dels and cached batches are not reflected.
    ///
    /// # Panics
    ///
    /// Panics if `into_inner()` fails for an entry yielded by the keyspace iterator.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
        self.keyspace.iter().map(|entry| {
            let (raw_key, raw_value) = entry.into_inner().unwrap();
            (
                K::from(ByteView::from(&*raw_key)),
                V::from(ByteView::from(&*raw_value)),
            )
        })
    }

    /// Iterates (in either direction) over the keyspace entries whose key starts with
    /// `prefix`, decoded into `(K, V)` pairs.
    ///
    /// Only the keyspace is read; buffered puts/dels and cached batches are not reflected.
    ///
    /// # Panics
    ///
    /// Panics if `into_inner()` fails for an entry yielded by the keyspace iterator.
    #[inline]
    pub fn prefix<P: AsRef<[u8]>>(
        &self,
        prefix: P,
    ) -> impl DoubleEndedIterator<Item = (K, V)> + '_ {
        self.keyspace.prefix(prefix).map(|entry| {
            let (raw_key, raw_value) = entry.into_inner().unwrap();
            (
                K::from(ByteView::from(&*raw_key)),
                V::from(ByteView::from(&*raw_value)),
            )
        })
    }

    /// Returns the keyspace's approximate entry count, as reported by
    /// `Keyspace::approximate_len`. Buffered puts/dels are not included.
    pub fn approximate_len(&self) -> usize {
        self.keyspace.approximate_len()
    }

    /// Delegates to `StoreMeta::has`.
    ///
    /// NOTE(review): presumably true when the store's metadata already covers `height` —
    /// confirm in `meta`.
    #[inline]
    fn has(&self, height: Height) -> bool {
        self.meta.has(height)
    }

    /// Delegates to `StoreMeta::needs`.
    ///
    /// NOTE(review): presumably true when `height` still has to be processed by this
    /// store — confirm in `meta`.
    #[inline]
    pub fn needs(&self, height: Height) -> bool {
        self.meta.needs(height)
    }

    /// Exports the store metadata for `height` via `StoreMeta::export`, discarding any
    /// success value and converting the error into this crate's `Result`.
    fn export_meta(&mut self, height: Height) -> Result<()> {
        self.meta.export(height)?;
        Ok(())
    }

    /// Exports the store metadata for `height` unless `has(height)` already holds.
    ///
    /// # Errors
    ///
    /// Propagates any error from `export_meta`.
    fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
        if self.has(height) {
            return Ok(());
        }
        self.export_meta(height)
    }

    /// Writes one batch of puts and deletions into `keyspace` through a single ingestion.
    ///
    /// Puts become `Item::Value` entries and deletions become `Item::Tomb` entries; the
    /// combined batch is sorted (per `Item`'s ordering) before being written, with
    /// deletions emitted as weak tombstones.
    ///
    /// # Errors
    ///
    /// Returns the (converted) fjall error if starting, writing to, or finishing the
    /// ingestion fails.
    fn ingest<'a>(
        keyspace: &Keyspace,
        puts: impl Iterator<Item = (&'a K, &'a V)>,
        dels: impl Iterator<Item = &'a K>,
    ) -> Result<()>
    where
        ByteView: From<&'a K> + From<&'a V>,
        K: 'a,
        V: 'a,
    {
        let mut batch: Vec<Item<&'a K, &'a V>> = Vec::new();
        batch.extend(puts.map(|(key, value)| Item::Value { key, value }));
        batch.extend(dels.map(Item::Tomb));
        batch.sort_unstable();

        let mut ingestion = keyspace.start_ingestion()?;
        for entry in batch {
            match entry {
                Item::Value { key, value } => {
                    ingestion.write(ByteView::from(key), ByteView::from(value))?
                }
                Item::Tomb(key) => ingestion.write_weak_tombstone(ByteView::from(key))?,
            };
        }
        ingestion.finish()?;

        Ok(())
    }
}

impl<K, V> AnyStore for Store<K, V>
where
    K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
    V: Debug + Clone + From<ByteView>,
    for<'a> ByteView: From<K> + From<V> + From<&'a K> + From<&'a V>,
    Self: Send + Sync,
{
    /// Returns a reference to the underlying fjall keyspace.
    fn keyspace(&self) -> &Keyspace {
        &self.keyspace
    }

    /// Exports the store metadata for `height`.
    fn export_meta(&mut self, height: Height) -> Result<()> {
        // Not recursive: method resolution picks the inherent `Store::export_meta`
        // before this trait method.
        self.export_meta(height)
    }

    /// Exports the store metadata for `height` unless `has(height)` already holds.
    fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
        // Not recursive: resolves to the inherent `Store::export_meta_if_needed`.
        self.export_meta_if_needed(height)
    }

    /// Returns the name the store was imported with.
    fn name(&self) -> &'static str {
        self.name
    }

    /// Returns the height recorded in the store metadata, if any (delegates to
    /// `StoreMeta::height`).
    fn height(&self) -> Option<Height> {
        self.meta.height()
    }

    /// Forwards to the inherent `Store::has`, which delegates to `StoreMeta::has`.
    fn has(&self, height: Height) -> bool {
        // Not recursive: the inherent method takes precedence over this trait method.
        self.has(height)
    }

    /// Forwards to the inherent `Store::needs`, which delegates to `StoreMeta::needs`.
    fn needs(&self, height: Height) -> bool {
        // Not recursive: the inherent method takes precedence over this trait method.
        self.needs(height)
    }

    /// Returns the version recorded in the store metadata (delegates to
    /// `StoreMeta::version`).
    fn version(&self) -> Version {
        self.meta.version()
    }

    /// Ingests the buffered puts/dels into the keyspace and rotates the read caches.
    ///
    /// Steps:
    /// 1. Export the metadata for `height` if needed.
    /// 2. Detach the buffers; return early when there is nothing to write.
    /// 3. Ingest the batch into the keyspace.
    /// 4. If caching is enabled, drop the oldest cached batch, evict every key deleted
    ///    by this commit from the remaining batches, and store this commit's puts as
    ///    the newest batch.
    ///
    /// Step 4's eviction keeps the caches consistent with storage: without it, `get`
    /// (which reads the caches before the keyspace) could keep returning a value that
    /// this commit just deleted.
    ///
    /// # Errors
    ///
    /// Fails if exporting the metadata or ingesting the batch fails. On an ingest
    /// failure the detached buffers are dropped and the caches are left untouched.
    fn commit(&mut self, height: Height) -> Result<()> {
        // Resolves to the inherent `Store::export_meta_if_needed`.
        self.export_meta_if_needed(height)?;

        let puts = mem::take(&mut self.puts);
        let dels = mem::take(&mut self.dels);

        if puts.is_empty() && dels.is_empty() {
            return Ok(());
        }

        Self::ingest(&self.keyspace, puts.iter(), dels.iter())?;

        if !self.caches.is_empty() {
            // The oldest batch falls out of the window.
            self.caches.pop();

            // Keys tombstoned by this commit must not survive in older cached batches.
            // `puts` and `dels` are disjoint (see `insert` / `remove`), so the new
            // batch itself needs no filtering.
            if !dels.is_empty() {
                for cache in self.caches.iter_mut().filter(|cache| !cache.is_empty()) {
                    for key in &dels {
                        cache.remove(key);
                    }
                }
            }

            self.caches.insert(0, puts);
        }

        Ok(())
    }

    /// Resets the store to an empty state: resets the metadata, discards buffered
    /// puts/dels, empties every cached batch and clears the keyspace.
    ///
    /// # Errors
    ///
    /// Fails if resetting the metadata or clearing the keyspace fails. The in-memory
    /// buffers are only discarded once the metadata reset has succeeded.
    fn reset(&mut self) -> Result<()> {
        self.meta.reset()?;

        self.puts = FxHashMap::default();
        self.dels = FxHashSet::default();
        self.clear_caches();

        self.keyspace.clear()?;
        Ok(())
    }
}