armour 0.30.27

DDL and serialization for key-value storage
Documentation
use std::ops::Bound;

use fjall::{Keyspace, OptimisticTxDatabase, OptimisticTxKeyspace, Readable};

use crate::utils::CollectionInfo;
use crate::{DbError, DbResult, migration};

use super::protocol::UpsertKey;

/// Byte-level operations on a single collection, as exposed to the RPC layer.
///
/// Keys and values are passed as raw bytes. The notes on individual methods
/// describe what the two implementors in this file (`KsHandler` and
/// `TxKsHandler`) do; other implementors may differ.
pub(crate) trait RpcHandler: Send + Sync {
    /// Name of the collection.
    fn name(&self) -> &str;
    /// Name of the backing partition. The implementors in this file derive it
    /// via `migration::collection_name` from the collection name and version.
    fn partition_name(&self) -> String;
    /// Returns a `(u64, u16)` pair; the implementors in this file return
    /// `(typ_hash, version)` taken from their `CollectionInfo`.
    fn info(&self) -> (u64, u16);
    /// Returns a copy of the value stored under `key`, or `None` if absent.
    fn get(&self, key: &[u8]) -> DbResult<Option<Vec<u8>>>;
    /// Returns `Some(len)` with the byte length of the stored value (cast to
    /// `u32`) when `key` exists, otherwise `None`.
    fn contains(&self, key: &[u8]) -> DbResult<Option<u32>>;
    /// Returns the first key/value pair of the collection, if any.
    fn first(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>>;
    /// Returns the last key/value pair of the collection, if any.
    fn last(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>>;
    /// Returns all key/value pairs whose key lies within `(start, end)`.
    fn range(
        &self,
        start: Bound<Vec<u8>>,
        end: Bound<Vec<u8>>,
    ) -> DbResult<Vec<(Vec<u8>, Vec<u8>)>>;
    /// Like [`RpcHandler::range`], but returns only the keys.
    fn range_keys(&self, start: Bound<Vec<u8>>, end: Bound<Vec<u8>>) -> DbResult<Vec<Vec<u8>>>;
    /// Writes `value` and returns the key it was stored under.
    ///
    /// In the implementors in this file, `UpsertKey::Sequence` allocates a new
    /// numeric id and uses its little-endian bytes as the key. `flag` selects
    /// the mode: `Some(true)` requires the key to exist (`DbError::NotFound`
    /// otherwise), `Some(false)` requires it to be absent
    /// (`DbError::Client("already exists")` otherwise), `None` writes
    /// unconditionally.
    fn upsert(&self, key: UpsertKey, flag: Option<bool>, value: Vec<u8>) -> DbResult<Vec<u8>>;
    /// Removes `key`. Both implementors in this file ignore `soft`.
    fn remove(&self, key: &[u8], soft: bool) -> DbResult<()>;
    /// Removes `key` and returns the value it held, if any. `KsHandler`
    /// returns `DbError::NotImplemented`; `TxKsHandler` ignores `soft`.
    fn take(&self, key: &[u8], soft: bool) -> DbResult<Option<Vec<u8>>>;
    /// Number of entries. With `exact` the implementors in this file iterate
    /// over the collection; otherwise they return `approximate_len()`.
    fn count(&self, exact: bool) -> DbResult<u64>;
    /// Applies a list of writes: `(key, Some(value))` inserts, `(key, None)`
    /// removes.
    fn apply_batch(&self, items: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> DbResult<()>;
}

/// Resolves an iterator guard into an owned `(key, value)` pair.
///
/// A guard that fails to resolve is logged via `error!` and mapped to `None`,
/// so callers using `filter_map` skip it.
fn guard_to_kv(guard: fjall::Guard) -> Option<(Vec<u8>, Vec<u8>)> {
    guard
        .into_inner()
        .map(|(key, value)| (key.to_vec(), value.to_vec()))
        .map_err(|e| {
            error!(%e);
        })
        .ok()
}

/// Resolves an iterator guard into an owned key.
///
/// A guard that fails to resolve is logged via `error!` and mapped to `None`,
/// so callers using `filter_map` skip it.
fn guard_to_key(guard: fjall::Guard) -> Option<Vec<u8>> {
    guard
        .key()
        .map(|key| key.to_vec())
        .map_err(|e| {
            error!(%e);
        })
        .ok()
}

fn next_id(seq_tree: &Keyspace, name: &str) -> DbResult<u64> {
    let key = format!("__next_id-{}", name);
    let val = seq_tree.get(&key)?;
    let val = match val {
        Some(val) => {
            let bytes = val.as_ref().try_into().expect("Invalid byte array length");
            u64::from_le_bytes(bytes)
        }
        None => 1,
    };
    let bytes = (val + 1).to_le_bytes();
    seq_tree.insert(&key, bytes)?;
    Ok(val)
}

/// Allocates the next sequence id for collection `name` inside an optimistic
/// write transaction.
///
/// The counter is stored under the key `next_id-{name}` as a little-endian
/// `u64` holding the most recently issued id; the first id issued is `1`.
/// Note that both the key prefix and the meaning of the stored value differ
/// from the non-transactional `next_id` helper.
///
/// The transaction is re-run when the commit reports a conflict with a
/// concurrent writer.
///
/// # Errors
///
/// Returns a storage error if opening the transaction, reading the counter or
/// committing fails, and `DbError::Armour` if the stored counter is not
/// exactly 8 bytes long.
fn tx_next_id(
    db: &OptimisticTxDatabase,
    seq_tree: &OptimisticTxKeyspace,
    name: &str,
) -> DbResult<u64> {
    let next_id_key = format!("next_id-{}", name);
    let key_ref = next_id_key.as_bytes();

    loop {
        let mut tx = db.write_tx().map_err(DbError::from)?;

        let id = match tx.get(seq_tree, key_ref).map_err(DbError::from)? {
            Some(bytes) => {
                let bytes = bytes
                    .as_ref()
                    .try_into()
                    .map_err(|err| DbError::Armour(crate::types::ArmourError::from(err)))?;
                u64::from_le_bytes(bytes) + 1
            }
            None => 1,
        };

        tx.insert(seq_tree, key_ref, id.to_le_bytes());

        // `commit` reports storage failures in the outer `Result` and
        // optimistic conflicts in the inner one. Only a conflict is retried;
        // a storage failure is returned instead of looping forever, and the
        // id is only handed out once the write actually went through.
        match tx.commit().map_err(DbError::from)? {
            Ok(()) => return Ok(id),
            Err(_) => continue,
        }
    }
}

/// `RpcHandler` backed by a plain (non-transactional) `Keyspace`.
pub(crate) struct KsHandler {
    /// Collection name; returned by `name()` and used to build the partition
    /// name and the sequence-counter key.
    pub name: String,
    /// Collection metadata; supplies `typ_hash` and `version`.
    pub info: CollectionInfo,
    /// Keyspace holding the collection's entries.
    pub tree: Keyspace,
    /// Keyspace holding the id counter used for `UpsertKey::Sequence`.
    pub seq_tree: Keyspace,
}

impl RpcHandler for KsHandler {
    fn name(&self) -> &str {
        &self.name
    }

    fn partition_name(&self) -> String {
        migration::collection_name(&self.name, self.info.version)
    }

    fn info(&self) -> (u64, u16) {
        (self.info.typ_hash, self.info.version)
    }

    fn get(&self, key: &[u8]) -> DbResult<Option<Vec<u8>>> {
        self.tree
            .get(key)
            .map(|item| item.map(|v| v.to_vec()))
            .map_err(DbError::from)
    }

    fn contains(&self, key: &[u8]) -> DbResult<Option<u32>> {
        match self.tree.get(key)? {
            Some(v) => Ok(Some(v.len() as u32)),
            None => Ok(None),
        }
    }

    fn first(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>> {
        match self.tree.first_key_value() {
            Some(guard) => Ok(guard_to_kv(guard)),
            None => Ok(None),
        }
    }

    fn last(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>> {
        match self.tree.last_key_value() {
            Some(guard) => Ok(guard_to_kv(guard)),
            None => Ok(None),
        }
    }

    fn range(
        &self,
        start: Bound<Vec<u8>>,
        end: Bound<Vec<u8>>,
    ) -> DbResult<Vec<(Vec<u8>, Vec<u8>)>> {
        Ok(self
            .tree
            .range((start, end))
            .filter_map(guard_to_kv)
            .collect())
    }

    fn range_keys(&self, start: Bound<Vec<u8>>, end: Bound<Vec<u8>>) -> DbResult<Vec<Vec<u8>>> {
        Ok(self
            .tree
            .range((start, end))
            .filter_map(guard_to_key)
            .collect())
    }

    fn upsert(&self, key: UpsertKey, flag: Option<bool>, value: Vec<u8>) -> DbResult<Vec<u8>> {
        let key_bytes = match key {
            UpsertKey::Sequence => {
                let id = next_id(&self.seq_tree, &self.name)?;
                id.to_le_bytes().to_vec()
            }
            UpsertKey::Provided(k) => k,
        };

        if let Some(update_only) = flag {
            let exists = self.tree.contains_key(&key_bytes)?;
            if update_only && !exists {
                return Err(DbError::NotFound);
            }
            if !update_only && exists {
                return Err(DbError::Client("already exists"));
            }
        }

        self.tree.insert(&key_bytes, &value)?;
        Ok(key_bytes)
    }

    fn remove(&self, key: &[u8], _soft: bool) -> DbResult<()> {
        self.tree.remove(key)?;
        Ok(())
    }

    fn take(&self, _key: &[u8], _soft: bool) -> DbResult<Option<Vec<u8>>> {
        Err(DbError::NotImplemented)
    }

    fn count(&self, exact: bool) -> DbResult<u64> {
        if exact {
            Ok(self.tree.iter().count() as u64)
        } else {
            Ok(self.tree.approximate_len() as u64)
        }
    }

    fn apply_batch(&self, items: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> DbResult<()> {
        for (key, value) in items {
            match value {
                Some(v) => self.tree.insert(&key, &v)?,
                None => self.tree.remove(&key)?,
            };
        }
        Ok(())
    }
}

/// `RpcHandler` backed by an optimistic transactional keyspace.
pub(crate) struct TxKsHandler {
    /// Collection name; returned by `name()` and used to build the partition
    /// name and the sequence-counter key.
    pub name: String,
    /// Collection metadata; supplies `typ_hash` and `version`.
    pub info: CollectionInfo,
    /// Transactional keyspace holding the collection's entries.
    pub tree: OptimisticTxKeyspace,
    /// Transactional keyspace holding the id counter used for
    /// `UpsertKey::Sequence`.
    pub seq_tree: OptimisticTxKeyspace,
    /// Database handle used to open read and write transactions.
    pub db: OptimisticTxDatabase,
}

// Commit handling in this impl: `commit` reports storage failures in the
// outer `Result` and optimistic conflicts in the inner one. Every write
// method propagates storage failures to the caller and re-runs its
// transaction only when a conflict is reported. Previously a conflict was
// treated as success and a storage failure was retried without bound.
impl RpcHandler for TxKsHandler {
    /// Returns the `name` field.
    fn name(&self) -> &str {
        &self.name
    }

    /// Builds the partition name from the collection name and version.
    fn partition_name(&self) -> String {
        migration::collection_name(&self.name, self.info.version)
    }

    /// Returns `(typ_hash, version)` from the collection info.
    fn info(&self) -> (u64, u16) {
        (self.info.typ_hash, self.info.version)
    }

    /// Returns a copy of the value stored under `key`, if any.
    fn get(&self, key: &[u8]) -> DbResult<Option<Vec<u8>>> {
        self.tree
            .get(key)
            .map(|item| item.map(|v| v.to_vec()))
            .map_err(DbError::from)
    }

    /// Returns the stored value's length (as `u32`) when `key` exists.
    fn contains(&self, key: &[u8]) -> DbResult<Option<u32>> {
        match self.tree.get(key)? {
            Some(v) => Ok(Some(v.len() as u32)),
            None => Ok(None),
        }
    }

    /// Returns the first entry; a guard that fails to resolve is logged by
    /// `guard_to_kv` and reported as `None`.
    fn first(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>> {
        match self.tree.first_key_value() {
            Some(guard) => Ok(guard_to_kv(guard)),
            None => Ok(None),
        }
    }

    /// Returns the last entry; a guard that fails to resolve is logged by
    /// `guard_to_kv` and reported as `None`.
    fn last(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>> {
        match self.tree.last_key_value() {
            Some(guard) => Ok(guard_to_kv(guard)),
            None => Ok(None),
        }
    }

    /// Collects all entries within `(start, end)` through a read
    /// transaction; entries that fail to resolve are logged and skipped.
    fn range(
        &self,
        start: Bound<Vec<u8>>,
        end: Bound<Vec<u8>>,
    ) -> DbResult<Vec<(Vec<u8>, Vec<u8>)>> {
        let tx = self.db.read_tx();
        Ok(tx
            .range(&self.tree, (start, end))
            .filter_map(guard_to_kv)
            .collect())
    }

    /// Collects all keys within `(start, end)` through a read transaction;
    /// keys that fail to resolve are logged and skipped.
    fn range_keys(&self, start: Bound<Vec<u8>>, end: Bound<Vec<u8>>) -> DbResult<Vec<Vec<u8>>> {
        let tx = self.db.read_tx();
        Ok(tx
            .range(&self.tree, (start, end))
            .filter_map(guard_to_key)
            .collect())
    }

    /// Writes `value` inside a write transaction and returns the key used.
    ///
    /// `UpsertKey::Sequence` allocates an id via `tx_next_id` (in its own
    /// transaction) and uses its little-endian bytes as the key. `flag` of
    /// `Some(true)` demands an existing key (`DbError::NotFound` otherwise),
    /// `Some(false)` demands an absent key (`DbError::Client` otherwise),
    /// `None` skips the existence check.
    ///
    /// # Errors
    ///
    /// Besides the mode errors above, returns any storage error raised while
    /// opening, reading or committing the transaction.
    fn upsert(&self, key: UpsertKey, flag: Option<bool>, value: Vec<u8>) -> DbResult<Vec<u8>> {
        let key_bytes = match key {
            UpsertKey::Sequence => {
                let id = tx_next_id(&self.db, &self.seq_tree, &self.name)?;
                id.to_le_bytes().to_vec()
            }
            UpsertKey::Provided(k) => k,
        };

        loop {
            let mut tx = self.db.write_tx().map_err(DbError::from)?;

            if let Some(update_only) = flag {
                let exists = tx
                    .contains_key(&self.tree, &key_bytes)
                    .map_err(DbError::from)?;
                if update_only && !exists {
                    return Err(DbError::NotFound);
                }
                if !update_only && exists {
                    return Err(DbError::Client("already exists"));
                }
            }

            tx.insert(&self.tree, &key_bytes, &value);

            match tx.commit().map_err(DbError::from)? {
                Ok(()) => return Ok(key_bytes),
                // Conflict with a concurrent writer: re-run the transaction.
                Err(_) => continue,
            }
        }
    }

    /// Removes `key` inside a write transaction; the `soft` flag is ignored.
    ///
    /// # Errors
    ///
    /// Returns any storage error raised while opening or committing the
    /// transaction.
    fn remove(&self, key: &[u8], _soft: bool) -> DbResult<()> {
        loop {
            let mut tx = self.db.write_tx().map_err(DbError::from)?;
            tx.remove(&self.tree, key);
            match tx.commit().map_err(DbError::from)? {
                Ok(()) => return Ok(()),
                // Conflict with a concurrent writer: re-run the transaction.
                Err(_) => continue,
            }
        }
    }

    /// Removes `key` and returns the value it held, reading and removing in
    /// the same write transaction; the `soft` flag is ignored.
    ///
    /// # Errors
    ///
    /// Returns any storage error raised while opening, reading or committing
    /// the transaction.
    fn take(&self, key: &[u8], _soft: bool) -> DbResult<Option<Vec<u8>>> {
        loop {
            let mut tx = self.db.write_tx().map_err(DbError::from)?;
            let old = tx.get(&self.tree, key).map_err(DbError::from)?;

            if old.is_some() {
                tx.remove(&self.tree, key);
            }

            match tx.commit().map_err(DbError::from)? {
                Ok(()) => return Ok(old.map(|v: fjall::Slice| v.to_vec())),
                // Conflict with a concurrent writer: re-read and retry so the
                // returned value matches what was actually removed.
                Err(_) => continue,
            }
        }
    }

    /// Counts entries by scanning through a read transaction when `exact`,
    /// otherwise returns `approximate_len()`.
    fn count(&self, exact: bool) -> DbResult<u64> {
        if exact {
            let tx = self.db.read_tx();
            Ok(tx.range::<Vec<u8>, _>(&self.tree, ..).count() as u64)
        } else {
            Ok(self.tree.approximate_len() as u64)
        }
    }

    /// Applies all writes in a single write transaction: `Some(value)`
    /// inserts, `None` removes.
    ///
    /// # Errors
    ///
    /// Returns any storage error raised while opening or committing the
    /// transaction.
    fn apply_batch(&self, items: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> DbResult<()> {
        loop {
            let mut tx = self.db.write_tx().map_err(DbError::from)?;
            for (key, value) in &items {
                match value {
                    Some(v) => tx.insert(&self.tree, key, v),
                    None => tx.remove(&self.tree, key),
                }
            }
            match tx.commit().map_err(DbError::from)? {
                Ok(()) => return Ok(()),
                // Conflict with a concurrent writer: re-run the transaction.
                Err(_) => continue,
            }
        }
    }
}