armour 0.30.27

DDL and serialization for key-value storage
Documentation
use core::borrow::Borrow;
use std::{fmt::Debug, ops::Deref, sync::Arc};

use armour_derive::armour_metrics;
use fjall::{KeyspaceCreateOptions, OptimisticTxKeyspace, Readable, Slice};

use super::{builder::TxTreeBuilder, db::TxDb, raw::TxRawTree, read::TxReadTree};
use crate::{
    Cid, DbError, DbResult, Record,
    indexes::IndexUpdateCollection,
    logdb::ByteValue,
    utils::{Change, EventHandlers},
};
use arrayvec::ArrayVec;

pub type IndexesMap<Item> = ArrayVec<Box<dyn IndexUpdateCollection<Item>>, 8>;

#[derive(derive_more::Debug)]
pub struct TxTree<Item: Record> {
    pub inner: TxReadTree<Item>,
    #[debug("{}", self.index_maps.len())]
    pub(crate) index_maps: Arc<IndexesMap<Item>>,
    pub(crate) handlers: Arc<EventHandlers<Item>>,
}

impl<Item: Record> Clone for TxTree<Item> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            index_maps: self.index_maps.clone(),
            handlers: self.handlers.clone(),
        }
    }
}

impl<Item: Record> Deref for TxTree<Item> {
    type Target = TxReadTree<Item>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<Item> TxTree<Item>
where
    Item: Record<Value = Slice> + std::fmt::Debug,
{
    #[doc(alias = "new")]
    #[doc(alias = "create")]
    pub fn open(db: &TxDb, options: Option<KeyspaceCreateOptions>) -> Self {
        TxTreeBuilder::<Item>::new(db, options).build()
    }

    pub fn with_name(db: &TxDb, name: &str, options: Option<KeyspaceCreateOptions>) -> Self {
        TxTreeBuilder::<Item>::with_name(db, name, options).build()
    }

    pub fn builder(db: &TxDb, options: Option<KeyspaceCreateOptions>) -> TxTreeBuilder<Item> {
        TxTreeBuilder::<Item>::new(db, options)
    }

    #[doc(alias = "inner")]
    pub fn raw(&self) -> &TxRawTree {
        &self.inner.raw
    }

    pub(crate) fn tree(&self) -> &OptimisticTxKeyspace {
        &self.inner.raw.tree
    }

    #[inline]
    pub(super) fn invalidate_hash(&self, id: &Item::SelfId) {
        let group = id.group_id();
        self.raw().inner.invalidate_hash(group);
    }

    /// update fn when update item body or create item
    #[inline]
    pub(super) fn upsert_event(&self, id: &Item::SelfId, old: Option<&Item>, new: &Item) {
        if let Some(f) = self.handlers.on_upsert.as_ref() {
            f(id, Change { old, new });
        }
    }

    /// call when item removed
    #[inline]
    pub(super) fn remove_event(&self, id: &Item::SelfId, item: &Item) {
        if let Some(f) = self.handlers.on_remove.as_ref() {
            f(id, item);
        }
    }

    /// item, old_id, new_id
    #[inline]
    pub(super) fn _id_change_event(
        &self,
        item: &Item,
        old_id: &Item::SelfId,
        new_id: &Item::SelfId,
    ) {
        if let Some(f) = self.handlers.on_id_change.as_ref() {
            f(item, old_id, new_id);
        }
    }

    /// update indexes when item body is changed
    #[inline]
    pub(super) fn update_indexes(&self, id: &Item::SelfId, old: Option<&Item>, new: Option<&Item>) {
        for map in self.index_maps.iter() {
            map.update(id, old, new);
        }
    }
}

impl<Item: Record> From<TxTree<Item>> for TxRawTree {
    fn from(value: TxTree<Item>) -> Self {
        value.inner.raw
    }
}

// update methods
impl<Item> TxTree<Item>
where
    Item: Record<Value = Slice> + Debug,
{
    /// Insert or update value by id.
    /// Return old value if exist
    #[doc(alias = "insert")]
    #[doc(alias = "update")]
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_txdb_tree", name = self.raw().static_name())]
    pub fn upsert(
        &self,
        id: impl Borrow<Item::SelfId> + Debug,
        item: &Item,
    ) -> DbResult<Option<Item>> {
        let bytes = Item::ser(item);

        let borrowed_id = id.borrow();
        let key = Cid::encode(borrowed_id);
        let key_bytes = ByteValue::from(key.as_ref());

        (|| loop {
            let mut tx = self.raw().db.db.write_tx().map_err(DbError::from)?;
            let old_bytes = tx.get(self.tree(), &key_bytes).map_err(DbError::from)?;

            let old_item = old_bytes.as_ref().map(|b| Item::deser(b));

            tx.insert(self.tree(), key_bytes.clone(), bytes.clone());

            match tx.commit() {
                Ok(_) => {
                    if old_item.is_none() {
                        self.raw()
                            .inner
                            .seq
                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    }
                    self.update_indexes(borrowed_id, old_item.as_ref(), Some(item));
                    self.upsert_event(borrowed_id, old_item.as_ref(), item);
                    self.invalidate_hash(borrowed_id);
                    return Ok(old_item);
                }
                Err(_) => continue,
            }
        })()
    }

    /// Update, create or delete value by id with Fn. Can call closure multiple times.
    ///
    /// Return new value and closure result
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_txdb_tree", name = self.raw().static_name())]
    pub fn upsert_fn<F, Ret>(
        &self,
        id: impl Borrow<Item::SelfId> + Debug,
        f: F,
    ) -> DbResult<(Option<Item>, Ret)>
    where
        F: Fn(Option<Item>) -> (Option<Item>, Ret),
        Item: Clone,
        Ret: Debug,
    {
        let borrowed_id = id.borrow();
        let key = Cid::encode(borrowed_id);
        let key_bytes = ByteValue::from(key.as_ref());

        (|| loop {
            let mut tx = self.raw().db.db.write_tx().map_err(DbError::from)?;
            let old_bytes = tx.get(self.tree(), &key_bytes).map_err(DbError::from)?;

            let old = old_bytes.as_ref().map(|b| Item::deser(b));
            let old_cloned = old.clone();

            let (new, ret) = f(old);

            let new_bytes = new.as_ref().map(|item| Item::ser(item));

            let changed = match (&old_bytes, &new_bytes) {
                (Some(o), Some(n)) => o.as_ref() != n.as_ref(),
                (None, None) => false,
                _ => true,
            };

            if changed {
                match &new_bytes {
                    Some(bytes) => tx.insert(self.tree(), key_bytes.clone(), bytes.clone()),
                    None => tx.remove(self.tree(), key_bytes.clone()),
                }
            }

            match tx.commit() {
                Ok(_) => {
                    if changed {
                        // Update seq based on old/new state
                        match (&old_cloned, &new) {
                            (None, Some(_)) => {
                                self.raw()
                                    .inner
                                    .seq
                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            }
                            (Some(_), None) => {
                                self.raw()
                                    .inner
                                    .seq
                                    .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
                            }
                            _ => {}
                        }

                        if !self.index_maps.is_empty() {
                            let old_ref = old_cloned.as_ref();
                            let new_ref = new.as_ref();
                            self.update_indexes(borrowed_id, old_ref, new_ref);
                        }
                        match new.as_ref() {
                            Some(new) => {
                                self.upsert_event(borrowed_id, old_cloned.as_ref(), new);
                            }
                            None => {
                                if let Some(old) = old_cloned.as_ref() {
                                    self.remove_event(borrowed_id, old);
                                }
                            }
                        }
                        self.invalidate_hash(borrowed_id);
                    }
                    return Ok((new, ret));
                }
                Err(_) => {
                    continue;
                }
            }
        })()
    }

    /// Update item with closure if exist, return new value. Tried only once.
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_txdb_tree", name = self.raw().static_name())]
    pub fn update_fn<F>(&self, id: impl Borrow<Item::SelfId> + Debug, f: F) -> DbResult<Item>
    where
        F: FnOnce(Item) -> DbResult<Item>,
        Item: Clone,
    {
        let borrowed_id = id.borrow();
        let key = Cid::encode(borrowed_id);
        let key_bytes = ByteValue::from(key.as_ref());

        let mut tx = self.raw().db.db.write_tx().map_err(DbError::from)?;

        let old_bytes = tx.get(self.tree(), &key_bytes).map_err(DbError::from)?;

        match old_bytes {
            Some(old_bytes) => {
                let old = Item::deser(&old_bytes);
                let old_cloned = old.clone();

                let new = f(old)?;
                let new_bytes = Item::ser(&new);

                tx.insert(self.tree(), key_bytes, new_bytes);

                match tx.commit() {
                    Ok(_) => {
                        self.update_indexes(borrowed_id, Some(&old_cloned), Some(&new));
                        self.upsert_event(borrowed_id, Some(&old_cloned), &new);
                        self.invalidate_hash(borrowed_id);
                        Ok(new)
                    }
                    Err(_) => Err(DbError::Transaction),
                }
            }
            None => Err(DbError::NotFound),
        }
    }

    /// permanently remove item
    #[doc(alias = "delete")]
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_txdb_tree", name = self.raw().static_name())]
    pub fn remove(&self, id: impl Borrow<Item::SelfId> + Debug) -> DbResult<Option<Item>> {
        let key = Cid::encode(id.borrow());
        let key_bytes = ByteValue::from(key.as_ref());
        let old = self.tree().take(key_bytes)?;

        let item = old.map(|old| {
            let item = Item::deser_owned(old);
            self.raw()
                .inner
                .seq
                .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
            self.update_indexes(id.borrow(), Some(&item), None);
            self.remove_event(id.borrow(), &item);
            item
        });

        self.invalidate_hash(id.borrow());

        Ok(item)
    }

    /// Remove item to other ('removed') tree
    #[doc(alias = "delete")]
    #[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
    #[armour_metrics(prefix = "armour_txdb_tree", name = self.raw().static_name())]
    pub fn soft_remove(&self, id: impl Borrow<Item::SelfId> + Debug) -> DbResult<Option<Item>> {
        let key = Cid::encode(id.borrow());
        let key_bytes = ByteValue::from(key.as_ref());

        match self.tree().take(key_bytes.clone())? {
            Some(val) => {
                let item = Item::deser(&val);

                self.raw()
                    .inner
                    .seq
                    .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
                self.update_indexes(id.borrow(), Some(&item), None);
                self.remove_event(id.borrow(), &item);
                if let Err(err) = self.raw().removed.insert(key_bytes, val) {
                    error!("Failed to insert removed item: {}", err);
                }
                self.invalidate_hash(id.borrow());
                Ok(Some(item))
            }
            None => Ok(None),
        }
    }

    /// doesn't change indexes
    pub fn apply_batch<K, V>(&self, iter: impl Iterator<Item = (K, Option<V>)>) -> DbResult<()>
    where
        K: AsRef<Item::SelfId>,
        V: AsRef<Item>,
    {
        self.inner.raw.apply_batch(iter.map(|(id, item)| {
            let key_bytes = Cid::encode(id.as_ref());
            let key_bytes = ByteValue::from(key_bytes.as_ref());
            let val_bytes = item.map(|item| Item::ser(item.as_ref()));
            (key_bytes, val_bytes)
        }))
    }

    pub fn next_id(&self) -> DbResult<u64> {
        self.inner.raw.next_id()
    }
}