// armour 0.30.27
//
// DDL and serialization for key-value storage.
// Documentation
use std::{borrow::Borrow, fmt::Debug, ops::RangeBounds};

use armour_derive::armour_metrics;
use fjall::{Conflict, Readable, Slice};
use tracing::instrument;

use crate::logdb::{ItemWithRaw, IterTree, TryIterTree, helpers::*};
use crate::metrics::{counter, histogram};
use crate::types::ArmourError;
use crate::{Cid, DbError, DbResult, Entry, Record, TxReadTree};

/// Write-transaction handle that wraps a `fjall::OptimisticWriteTx`.
///
/// The typed read and write methods implemented on this type encode keys,
/// (de)serialize records and forward the call to the wrapped transaction.
pub struct WriteTx {
    /// The wrapped fjall transaction that the methods on this type call into.
    pub(crate) inner: fjall::OptimisticWriteTx,
}

impl WriteTx {
    pub fn rollback(self) {
        self.inner.rollback();
    }

    pub fn commit(self) -> DbResult<Result<(), Conflict>> {
        self.inner.commit().map_err(DbError::from)
    }
}

impl WriteTx {
    /// Reads the record stored under `id` in `tree` through this transaction.
    ///
    /// The id is encoded with `Cid::encode` and the stored value, if any, is
    /// decoded with `Item::deser_owned`. Returns `Ok(None)` when the lookup
    /// yields no value.
    ///
    /// # Errors
    ///
    /// Returns the lookup error converted into [`DbError`].
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn get<Item>(
        &self,
        tree: &TxReadTree<Item>,
        id: impl Borrow<Item::SelfId> + Debug,
    ) -> DbResult<Option<Item>>
    where
        Item: Record<Value = Slice> + Debug,
    {
        let encoded = Cid::encode(id.borrow());
        let stored = self.inner.get(&tree.raw.tree, encoded.as_ref())?;
        Ok(stored.map(Item::deser_owned))
    }

    /// Same as [`WriteTx::get`], but a missing record is reported as
    /// `DbError::NotFound` instead of `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Returns `DbError::NotFound` when no record is stored under `id`, or any
    /// error produced by [`WriteTx::get`].
    pub fn get_or_err<Item>(
        &self,
        tree: &TxReadTree<Item>,
        id: impl Borrow<Item::SelfId> + Debug,
    ) -> DbResult<Item>
    where
        Item: Record<Value = Slice> + Debug,
    {
        match self.get(tree, id)? {
            Some(item) => Ok(item),
            None => Err(DbError::NotFound),
        }
    }

    /// Reports whether `tree` holds a value under the encoded `id`, as answered
    /// by the wrapped transaction's `contains_key`.
    ///
    /// # Errors
    ///
    /// Returns the lookup error converted into [`DbError`].
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn contains<Item>(
        &self,
        tree: &TxReadTree<Item>,
        id: impl Borrow<Item::SelfId> + Debug,
    ) -> DbResult<bool>
    where
        Item: Record<Value = Slice> + Debug,
    {
        let encoded = Cid::encode(id.borrow());
        let lookup = self.inner.contains_key(&tree.raw.tree, encoded.as_ref());
        lookup.map_err(DbError::from)
    }

    /// Returns whatever the wrapped transaction's `size_of` reports for the
    /// value stored under the encoded `id` (`None` is passed through as-is).
    ///
    /// # Errors
    ///
    /// Returns the lookup error converted into [`DbError`].
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn size_of<Item>(
        &self,
        tree: &TxReadTree<Item>,
        id: impl Borrow<Item::SelfId> + Debug,
    ) -> DbResult<Option<u32>>
    where
        Item: Record<Value = Slice> + Debug,
    {
        let encoded = Cid::encode(id.borrow());
        let lookup = self.inner.size_of(&tree.raw.tree, encoded.as_ref());
        lookup.map_err(DbError::from)
    }

    /// Returns the entry reported by the wrapped transaction's
    /// `first_key_value` for `tree`, with key and value decoded.
    ///
    /// Returns `Ok(None)` when the transaction reports no entry.
    ///
    /// # Errors
    ///
    /// Returns the error raised while unpacking the entry, converted into
    /// [`DbError`].
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn first<Item>(&self, tree: &TxReadTree<Item>) -> DbResult<Option<Entry<Item>>>
    where
        Item: Record<Value = Slice> + Debug,
    {
        let entry = match self.inner.first_key_value(&tree.raw.tree) {
            None => None,
            Some(guard) => {
                let (raw_key, raw_value) = guard.into_inner()?;
                // Key is decoded before the value, matching argument order.
                Some(Entry::new(
                    Item::deser_key(&raw_key),
                    Item::deser_owned(raw_value),
                ))
            }
        };
        Ok(entry)
    }

    /// Returns the entry reported by the wrapped transaction's
    /// `last_key_value` for `tree`, with key and value decoded.
    ///
    /// Returns `Ok(None)` when the transaction reports no entry.
    ///
    /// # Errors
    ///
    /// Returns the error raised while unpacking the entry, converted into
    /// [`DbError`].
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn last<Item>(&self, tree: &TxReadTree<Item>) -> DbResult<Option<Entry<Item>>>
    where
        Item: Record<Value = Slice> + Debug,
    {
        let entry = match self.inner.last_key_value(&tree.raw.tree) {
            None => None,
            Some(guard) => {
                let (raw_key, raw_value) = guard.into_inner()?;
                // Key is decoded before the value, matching argument order.
                Some(Entry::new(
                    Item::deser_key(&raw_key),
                    Item::deser_owned(raw_value),
                ))
            }
        };
        Ok(entry)
    }

    /// Iterates over the decoded keys of `tree`.
    ///
    /// Increments the `armour_write_tx_keys` counter once per call. Entries for
    /// which `entry_key_filter_map` yields `None` are skipped (presumably
    /// entries that fail to read or decode — confirm in the helper).
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    pub fn keys<Item>(
        &self,
        tree: &TxReadTree<Item>,
    ) -> impl DoubleEndedIterator<Item = Item::SelfId>
    where
        Item: Record<Value = Slice> + Debug,
    {
        counter!("armour_write_tx_keys", "name" => Item::NAME).increment(1);
        let guards = self.inner.iter(&tree.raw.tree);
        guards.filter_map(entry_key_filter_map::<Item>)
    }

    /// Iterates over the decoded entries of `tree`.
    ///
    /// Increments the `armour_write_tx_iter` counter once per call. Entries for
    /// which `entry_filter_map` yields `None` are skipped.
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    pub fn iter<Item>(&self, tree: &TxReadTree<Item>) -> IterTree<Entry<Item>>
    where
        Item: Record<Value = Slice> + Debug,
    {
        counter!("armour_write_tx_iter", "name" => Item::NAME).increment(1);
        let decode: fn(fjall::Guard) -> Option<Entry<Item>> = entry_filter_map::<Item>;
        self.inner.iter(&tree.raw.tree).filter_map(decode)
    }

    /// Iterates over the decoded entries of `tree` whose keys fall in `range`.
    ///
    /// The bounds are raw key bytes (`K: AsRef<[u8]>`), not typed ids.
    /// Increments the `armour_write_tx_range` counter once per call. Entries for
    /// which `entry_filter_map` yields `None` are skipped.
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    pub fn range<Item, K, R>(&self, tree: &TxReadTree<Item>, range: R) -> IterTree<Entry<Item>>
    where
        Item: Record<Value = Slice> + Debug,
        K: AsRef<[u8]>,
        R: RangeBounds<K> + Debug,
    {
        counter!("armour_write_tx_range", "name" => Item::NAME).increment(1);
        let decode: fn(fjall::Guard) -> Option<Entry<Item>> = entry_filter_map::<Item>;
        self.inner.range(&tree.raw.tree, range).filter_map(decode)
    }

    /// Iterates over the decoded entries of `tree` selected by the raw byte
    /// `prefix`.
    ///
    /// Increments the `armour_write_tx_prefix` counter once per call. Entries
    /// for which `entry_filter_map` yields `None` are skipped.
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    pub fn prefix<Item, P>(&self, tree: &TxReadTree<Item>, prefix: P) -> IterTree<Entry<Item>>
    where
        Item: Record<Value = Slice> + Debug,
        P: AsRef<[u8]> + Debug,
    {
        counter!("armour_write_tx_prefix", "name" => Item::NAME).increment(1);
        let decode: fn(fjall::Guard) -> Option<Entry<Item>> = entry_filter_map::<Item>;
        self.inner.prefix(&tree.raw.tree, prefix).filter_map(decode)
    }

    /// Iterates over the decoded keys of `tree` selected by the raw byte
    /// `prefix`.
    ///
    /// Increments the `armour_write_tx_prefix_key` counter once per call.
    /// Entries for which `entry_key_filter_map` yields `None` are skipped.
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    pub fn prefix_key<Item, P>(&self, tree: &TxReadTree<Item>, prefix: P) -> IterTree<Item::SelfId>
    where
        Item: Record<Value = Slice> + Debug,
        P: AsRef<[u8]> + Debug,
    {
        counter!("armour_write_tx_prefix_key", "name" => Item::NAME).increment(1);
        let decode: fn(fjall::Guard) -> Option<Item::SelfId> = entry_key_filter_map::<Item>;
        self.inner.prefix(&tree.raw.tree, prefix).filter_map(decode)
    }

    /// Fallible counterpart of [`WriteTx::iter`]: every element is the
    /// `Result` produced by `entry_map`, so failures are surfaced to the
    /// caller instead of being skipped.
    ///
    /// Increments the `armour_write_tx_try_iter` counter once per call.
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    pub fn try_iter<Item>(&self, tree: &TxReadTree<Item>) -> TryIterTree<Entry<Item>>
    where
        Item: Record<Value = Slice> + Debug,
    {
        counter!("armour_write_tx_try_iter", "name" => Item::NAME).increment(1);
        let decode: fn(fjall::Guard) -> Result<Entry<Item>, DbError> = entry_map::<Item>;
        self.inner.iter(&tree.raw.tree).map(decode)
    }

    /// Fallible range scan over `tree`; the bounds are raw key bytes.
    ///
    /// Every element is the `Result` produced by `entry_map_with_raw`; a
    /// successful element is a `(Slice, Slice, Entry<Item>)` tuple (presumably
    /// the raw key and raw value alongside the decoded entry — confirm in the
    /// helper).
    ///
    /// Increments the `armour_write_tx_try_range` counter once per call.
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    pub fn try_range<Item, K, R>(
        &self,
        tree: &TxReadTree<Item>,
        range: R,
    ) -> TryIterTree<ItemWithRaw<Item>>
    where
        Item: Record<Value = Slice> + Debug,
        K: AsRef<[u8]>,
        R: RangeBounds<K> + Debug,
    {
        counter!("armour_write_tx_try_range", "name" => Item::NAME).increment(1);
        let decode: fn(fjall::Guard) -> Result<(Slice, Slice, Entry<Item>), DbError> =
            entry_map_with_raw::<Item>;
        self.inner.range(&tree.raw.tree, range).map(decode)
    }

    /// Fallible counterpart of [`WriteTx::prefix`]: every element is the
    /// `Result` produced by `entry_map`.
    ///
    /// Increments the `armour_write_tx_try_prefix` counter once per call.
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    pub fn try_prefix<Item, P>(
        &self,
        tree: &TxReadTree<Item>,
        prefix: P,
    ) -> TryIterTree<Entry<Item>>
    where
        Item: Record<Value = Slice> + Debug,
        P: AsRef<[u8]> + Debug,
    {
        counter!("armour_write_tx_try_prefix", "name" => Item::NAME).increment(1);
        let decode: fn(fjall::Guard) -> Result<Entry<Item>, DbError> = entry_map::<Item>;
        self.inner.prefix(&tree.raw.tree, prefix).map(decode)
    }
}

impl WriteTx {
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn insert<Item>(
        &mut self,
        tree: &TxReadTree<Item>,
        id: impl Borrow<Item::SelfId> + Debug,
        value: Item,
    ) where
        Item: Record<Value = Slice> + Debug,
    {
        let encoded = Cid::encode(id.borrow());
        let key = encoded.as_ref();
        let serialized = Item::ser(&value);
        self.inner.insert(&tree.raw.tree, key, serialized);
    }

    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn remove<Item>(&mut self, tree: &TxReadTree<Item>, id: impl Borrow<Item::SelfId> + Debug)
    where
        Item: Record<Value = Slice> + Debug,
    {
        let encoded = Cid::encode(id.borrow());
        let key = encoded.as_ref();
        self.inner.remove(&tree.raw.tree, key);
    }

    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn take<Item>(
        &mut self,
        tree: &TxReadTree<Item>,
        id: impl Borrow<Item::SelfId> + Debug,
    ) -> DbResult<Option<Item>>
    where
        Item: Record<Value = Slice> + Debug,
    {
        let encoded = Cid::encode(id.borrow());
        let key = encoded.as_ref();
        let val = self.inner.take(&tree.raw.tree, key)?;
        match val {
            Some(val) => Ok(Some(Item::deser_owned(val))),
            None => Ok(None),
        }
    }

    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn update_fetch<Item, F>(
        &mut self,
        tree: &TxReadTree<Item>,
        id: impl Borrow<Item::SelfId> + Debug,
        mut f: F,
    ) -> DbResult<Option<Item>>
    where
        Item: Record<Value = Slice> + Debug,
        F: FnMut(Option<&Item>) -> Option<Item>,
    {
        let encoded = Cid::encode(id.borrow());
        let key = encoded.as_ref();
        let val = self
            .inner
            .update_fetch(&tree.raw.tree, key, |opt_val| match opt_val {
                Some(val) => {
                    let item = Item::deser_owned(val.clone());
                    f(Some(&item)).map(|new_item| Item::ser(&new_item))
                }
                None => f(None).map(|new_item| Item::ser(&new_item)),
            })?;
        match val {
            Some(val) => Ok(Some(Item::deser_owned(val))),
            None => Ok(None),
        }
    }

    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn fetch_update<Item, F>(
        &mut self,
        tree: &TxReadTree<Item>,
        id: impl Borrow<Item::SelfId> + Debug,
        mut f: F,
    ) -> DbResult<Option<Item>>
    where
        Item: Record<Value = Slice> + Debug,
        F: FnMut(Option<&Item>) -> Option<Item>,
    {
        let encoded = Cid::encode(id.borrow());
        let key = encoded.as_ref();
        let val = self
            .inner
            .fetch_update(&tree.raw.tree, key, |opt_val| match opt_val {
                Some(val) => {
                    let item = Item::deser_owned(val.clone());
                    f(Some(&item)).map(|new_item| Item::ser(&new_item))
                }
                None => f(None).map(|new_item| Item::ser(&new_item)),
            })?;
        match val {
            Some(val) => Ok(Some(Item::deser_owned(val))),
            None => Ok(None),
        }
    }

    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
    pub fn next_id<Item>(&mut self, tree: &TxReadTree<Item>) -> DbResult<u64>
    where
        Item: Record<Value = Slice> + Debug,
    {
        let next_id_key = format!("next_id-{}", tree.raw.name);
        let key_ref = next_id_key.as_bytes();
        let current = self.inner.get(&tree.raw.db.seq_tree, key_ref)?;

        let mut id = 1u64;
        if let Some(bytes) = current {
            let bytes = bytes
                .as_ref()
                .try_into()
                .map_err(|err| DbError::Armour(ArmourError::from(err)))?;
            let old = u64::from_le_bytes(bytes);
            id = old + 1;
        }
        let next_val = id.to_le_bytes();
        self.inner.insert(&tree.raw.db.seq_tree, key_ref, next_val);

        Ok(id)
    }
}