use std::{borrow::Borrow, fmt::Debug, ops::RangeBounds};
use armour_derive::armour_metrics;
use fjall::{Conflict, Readable, Slice};
use tracing::instrument;
use crate::logdb::{ItemWithRaw, IterTree, TryIterTree, helpers::*};
use crate::metrics::{counter, histogram};
use crate::types::ArmourError;
use crate::{Cid, DbError, DbResult, Entry, Record, TxReadTree};
/// Typed write-transaction handle wrapping a fjall optimistic write transaction.
///
/// All read and write helpers on this type delegate to `inner`, translating between
/// typed `Record` items and the raw byte keys/values stored in fjall.
pub struct WriteTx {
/// The underlying fjall optimistic write transaction every method delegates to.
pub(crate) inner: fjall::OptimisticWriteTx,
}
impl WriteTx {
pub fn rollback(self) {
self.inner.rollback();
}
pub fn commit(self) -> DbResult<Result<(), Conflict>> {
self.inner.commit().map_err(DbError::from)
}
}
impl WriteTx {
/// Looks up the item stored under `id` in `tree` through this transaction.
///
/// The id is encoded with `Cid::encode`, and a found raw value is decoded with
/// `Item::deser_owned`. Returns `Ok(None)` when the lookup yields no value.
///
/// # Errors
///
/// Returns the underlying fjall error converted into `DbError`.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn get<Item>(
    &self,
    tree: &TxReadTree<Item>,
    id: impl Borrow<Item::SelfId> + Debug,
) -> DbResult<Option<Item>>
where
    Item: Record<Value = Slice> + Debug,
{
    let encoded = Cid::encode(id.borrow());
    let raw = self.inner.get(&tree.raw.tree, encoded.as_ref())?;
    Ok(raw.map(Item::deser_owned))
}
/// Same lookup as [`WriteTx::get`], but a missing item is reported as an error.
///
/// # Errors
///
/// Returns `DbError::NotFound` when `get` yields `None`, and otherwise forwards
/// any error produced by `get`.
pub fn get_or_err<Item>(
    &self,
    tree: &TxReadTree<Item>,
    id: impl Borrow<Item::SelfId> + Debug,
) -> DbResult<Item>
where
    Item: Record<Value = Slice> + Debug,
{
    match self.get(tree, id)? {
        Some(item) => Ok(item),
        None => Err(DbError::NotFound),
    }
}
/// Reports whether `tree` holds a value under `id`, as answered by the underlying
/// transaction's `contains_key`.
///
/// The id is encoded with `Cid::encode` before the lookup.
///
/// # Errors
///
/// Returns the underlying fjall error converted into `DbError`.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn contains<Item>(
    &self,
    tree: &TxReadTree<Item>,
    id: impl Borrow<Item::SelfId> + Debug,
) -> DbResult<bool>
where
    Item: Record<Value = Slice> + Debug,
{
    let encoded = Cid::encode(id.borrow());
    let present = self.inner.contains_key(&tree.raw.tree, encoded.as_ref())?;
    Ok(present)
}
/// Returns whatever the underlying transaction's `size_of` reports for the entry
/// stored under `id` in `tree`.
///
/// The id is encoded with `Cid::encode` before the lookup. `None` is forwarded
/// unchanged (presumably meaning the key is absent — confirm against fjall docs).
///
/// # Errors
///
/// Returns the underlying fjall error converted into `DbError`.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn size_of<Item>(
    &self,
    tree: &TxReadTree<Item>,
    id: impl Borrow<Item::SelfId> + Debug,
) -> DbResult<Option<u32>>
where
    Item: Record<Value = Slice> + Debug,
{
    let encoded = Cid::encode(id.borrow());
    match self.inner.size_of(&tree.raw.tree, encoded.as_ref()) {
        Ok(size) => Ok(size),
        Err(err) => Err(DbError::from(err)),
    }
}
/// Returns the entry produced by the underlying transaction's `first_key_value`
/// for `tree`, decoded into a typed `Entry`.
///
/// The raw key is decoded with `Item::deser_key` and the raw value with
/// `Item::deser_owned`. Returns `Ok(None)` when there is no such entry.
///
/// # Errors
///
/// Returns the fjall error raised while unpacking the guard, converted into `DbError`.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn first<Item>(&self, tree: &TxReadTree<Item>) -> DbResult<Option<Entry<Item>>>
where
    Item: Record<Value = Slice> + Debug,
{
    self.inner
        .first_key_value(&tree.raw.tree)
        .map(|guard| -> DbResult<Entry<Item>> {
            let (raw_key, raw_val) = guard.into_inner()?;
            Ok(Entry::new(
                Item::deser_key(&raw_key),
                Item::deser_owned(raw_val),
            ))
        })
        .transpose()
}
/// Returns the entry produced by the underlying transaction's `last_key_value`
/// for `tree`, decoded into a typed `Entry`.
///
/// The raw key is decoded with `Item::deser_key` and the raw value with
/// `Item::deser_owned`. Returns `Ok(None)` when there is no such entry.
///
/// # Errors
///
/// Returns the fjall error raised while unpacking the guard, converted into `DbError`.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn last<Item>(&self, tree: &TxReadTree<Item>) -> DbResult<Option<Entry<Item>>>
where
    Item: Record<Value = Slice> + Debug,
{
    if let Some(guard) = self.inner.last_key_value(&tree.raw.tree) {
        let (raw_key, raw_val) = guard.into_inner()?;
        let entry_id = Item::deser_key(&raw_key);
        let item = Item::deser_owned(raw_val);
        Ok(Some(Entry::new(entry_id, item)))
    } else {
        Ok(None)
    }
}
/// Iterates over the decoded ids of `tree`, read through this transaction.
///
/// Increments the `armour_write_tx_keys` counter once per call. Each raw entry is
/// passed to `entry_key_filter_map`; entries for which it returns `None` are
/// skipped (presumably entries that failed to read — confirm in `helpers`).
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
pub fn keys<Item>(
    &self,
    tree: &TxReadTree<Item>,
) -> impl DoubleEndedIterator<Item = Item::SelfId>
where
    Item: Record<Value = Slice> + Debug,
{
    counter!("armour_write_tx_keys", "name" => Item::NAME).increment(1);
    let guards = self.inner.iter(&tree.raw.tree);
    guards.filter_map(entry_key_filter_map::<Item>)
}
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
pub fn iter<Item>(&self, tree: &TxReadTree<Item>) -> IterTree<Entry<Item>>
where
Item: Record<Value = Slice> + Debug,
{
counter!("armour_write_tx_iter", "name" => Item::NAME).increment(1);
self.inner.iter(&tree.raw.tree).filter_map(entry_filter_map)
}
/// Iterates over the entries of `tree` whose raw keys fall within `range`.
///
/// Increments the `armour_write_tx_range` counter once per call. The bounds are
/// raw key bytes and are handed to the underlying range call unchanged; no
/// `Cid::encode` is applied here. Entries for which `entry_filter_map` returns
/// `None` are skipped.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
pub fn range<Item, K, R>(&self, tree: &TxReadTree<Item>, range: R) -> IterTree<Entry<Item>>
where
    Item: Record<Value = Slice> + Debug,
    K: AsRef<[u8]>,
    R: RangeBounds<K> + Debug,
{
    counter!("armour_write_tx_range", "name" => Item::NAME).increment(1);
    let decode: fn(fjall::Guard) -> Option<Entry<Item>> = entry_filter_map::<Item>;
    let guards = self.inner.range(&tree.raw.tree, range);
    guards.filter_map(decode)
}
/// Iterates over the entries of `tree` whose raw keys start with `prefix`.
///
/// Increments the `armour_write_tx_prefix` counter once per call. The prefix is
/// raw bytes and is handed to the underlying prefix call unchanged. Entries for
/// which `entry_filter_map` returns `None` are skipped.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
pub fn prefix<Item, P>(&self, tree: &TxReadTree<Item>, prefix: P) -> IterTree<Entry<Item>>
where
    Item: Record<Value = Slice> + Debug,
    P: AsRef<[u8]> + Debug,
{
    counter!("armour_write_tx_prefix", "name" => Item::NAME).increment(1);
    let decode: fn(fjall::Guard) -> Option<Entry<Item>> = entry_filter_map::<Item>;
    let guards = self.inner.prefix(&tree.raw.tree, prefix);
    guards.filter_map(decode)
}
/// Iterates over the decoded ids of the entries of `tree` whose raw keys start
/// with `prefix`.
///
/// Increments the `armour_write_tx_prefix_key` counter once per call. Entries for
/// which `entry_key_filter_map` returns `None` are skipped.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
pub fn prefix_key<Item, P>(&self, tree: &TxReadTree<Item>, prefix: P) -> IterTree<Item::SelfId>
where
    Item: Record<Value = Slice> + Debug,
    P: AsRef<[u8]> + Debug,
{
    counter!("armour_write_tx_prefix_key", "name" => Item::NAME).increment(1);
    let decode_key: fn(fjall::Guard) -> Option<<Item as Record>::SelfId> =
        entry_key_filter_map::<Item>;
    let guards = self.inner.prefix(&tree.raw.tree, prefix);
    guards.filter_map(decode_key)
}
/// Fallible counterpart of [`WriteTx::iter`]: every raw entry of `tree` is mapped
/// through `entry_map`, so each yielded item is a `Result` and no entry is skipped.
///
/// Increments the `armour_write_tx_try_iter` counter once per call.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
pub fn try_iter<Item>(&self, tree: &TxReadTree<Item>) -> TryIterTree<Entry<Item>>
where
    Item: Record<Value = Slice> + Debug,
{
    counter!("armour_write_tx_try_iter", "name" => Item::NAME).increment(1);
    let decode: fn(fjall::Guard) -> Result<Entry<Item>, DbError> = entry_map::<Item>;
    let guards = self.inner.iter(&tree.raw.tree);
    guards.map(decode)
}
/// Fallible range scan over `tree` that also exposes the raw bytes of each entry.
///
/// Increments the `armour_write_tx_try_range` counter once per call. The bounds
/// are raw key bytes handed to the underlying range call unchanged. Every raw
/// entry is mapped through `entry_map_with_raw`, yielding
/// `Result<(Slice, Slice, Entry<Item>), DbError>` items (presumably raw key, raw
/// value and decoded entry — confirm in `helpers`).
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
pub fn try_range<Item, K, R>(
    &self,
    tree: &TxReadTree<Item>,
    range: R,
) -> TryIterTree<ItemWithRaw<Item>>
where
    Item: Record<Value = Slice> + Debug,
    K: AsRef<[u8]>,
    R: RangeBounds<K> + Debug,
{
    counter!("armour_write_tx_try_range", "name" => Item::NAME).increment(1);
    let decode: fn(fjall::Guard) -> Result<(Slice, Slice, Entry<Item>), DbError> =
        entry_map_with_raw::<Item>;
    let guards = self.inner.range(&tree.raw.tree, range);
    guards.map(decode)
}
/// Fallible counterpart of [`WriteTx::prefix`]: every raw entry of `tree` whose
/// key starts with `prefix` is mapped through `entry_map`, so each yielded item
/// is a `Result` and no entry is skipped.
///
/// Increments the `armour_write_tx_try_prefix` counter once per call.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
pub fn try_prefix<Item, P>(
    &self,
    tree: &TxReadTree<Item>,
    prefix: P,
) -> TryIterTree<Entry<Item>>
where
    Item: Record<Value = Slice> + Debug,
    P: AsRef<[u8]> + Debug,
{
    counter!("armour_write_tx_try_prefix", "name" => Item::NAME).increment(1);
    let decode: fn(fjall::Guard) -> Result<Entry<Item>, DbError> = entry_map::<Item>;
    let guards = self.inner.prefix(&tree.raw.tree, prefix);
    guards.map(decode)
}
}
impl WriteTx {
/// Stages a write of `value` under `id` in `tree` within this transaction.
///
/// The id is encoded with `Cid::encode` and the value serialized with `Item::ser`
/// before both are handed to the underlying transaction's `insert`.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn insert<Item>(
    &mut self,
    tree: &TxReadTree<Item>,
    id: impl Borrow<Item::SelfId> + Debug,
    value: Item,
) where
    Item: Record<Value = Slice> + Debug,
{
    let encoded_key = Cid::encode(id.borrow());
    let payload = Item::ser(&value);
    self.inner
        .insert(&tree.raw.tree, encoded_key.as_ref(), payload);
}
/// Stages removal of the entry stored under `id` in `tree` within this transaction.
///
/// The id is encoded with `Cid::encode` before being handed to the underlying
/// transaction's `remove`.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn remove<Item>(&mut self, tree: &TxReadTree<Item>, id: impl Borrow<Item::SelfId> + Debug)
where
    Item: Record<Value = Slice> + Debug,
{
    let encoded_key = Cid::encode(id.borrow());
    self.inner.remove(&tree.raw.tree, encoded_key.as_ref());
}
/// Calls the underlying transaction's `take` for `id` in `tree` and decodes the
/// value it hands back, if any.
///
/// The id is encoded with `Cid::encode`; a returned raw value is decoded with
/// `Item::deser_owned`. (Presumably this removes the entry and returns its prior
/// value — confirm against fjall docs.)
///
/// # Errors
///
/// Returns the underlying fjall error converted into `DbError`.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn take<Item>(
    &mut self,
    tree: &TxReadTree<Item>,
    id: impl Borrow<Item::SelfId> + Debug,
) -> DbResult<Option<Item>>
where
    Item: Record<Value = Slice> + Debug,
{
    let encoded_key = Cid::encode(id.borrow());
    let removed = self.inner.take(&tree.raw.tree, encoded_key.as_ref())?;
    Ok(removed.map(Item::deser_owned))
}
/// Applies `f` to the current item under `id` via the underlying transaction's
/// `update_fetch`, and decodes the value that call returns.
///
/// `f` receives the decoded current item (or `None` when there is no stored
/// value) and returns the replacement; a `Some` result is serialized with
/// `Item::ser`, while `None` is forwarded as-is to fjall. (Per its name the
/// returned item is presumably the value *after* the update, and a `None` from
/// `f` presumably deletes the entry — confirm against fjall docs.)
///
/// # Errors
///
/// Returns the underlying fjall error converted into `DbError`.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn update_fetch<Item, F>(
    &mut self,
    tree: &TxReadTree<Item>,
    id: impl Borrow<Item::SelfId> + Debug,
    mut f: F,
) -> DbResult<Option<Item>>
where
    Item: Record<Value = Slice> + Debug,
    F: FnMut(Option<&Item>) -> Option<Item>,
{
    let encoded_key = Cid::encode(id.borrow());
    let stored = self
        .inner
        .update_fetch(&tree.raw.tree, encoded_key.as_ref(), |current| {
            // Decode the stored bytes (if any) so the caller's closure sees a typed item.
            let decoded = current.cloned().map(Item::deser_owned);
            f(decoded.as_ref()).map(|updated| Item::ser(&updated))
        })?;
    Ok(stored.map(Item::deser_owned))
}
/// Applies `f` to the current item under `id` via the underlying transaction's
/// `fetch_update`, and decodes the value that call returns.
///
/// `f` receives the decoded current item (or `None` when there is no stored
/// value) and returns the replacement; a `Some` result is serialized with
/// `Item::ser`, while `None` is forwarded as-is to fjall. (Per its name the
/// returned item is presumably the value *before* the update, and a `None` from
/// `f` presumably deletes the entry — confirm against fjall docs.)
///
/// # Errors
///
/// Returns the underlying fjall error converted into `DbError`.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn fetch_update<Item, F>(
    &mut self,
    tree: &TxReadTree<Item>,
    id: impl Borrow<Item::SelfId> + Debug,
    mut f: F,
) -> DbResult<Option<Item>>
where
    Item: Record<Value = Slice> + Debug,
    F: FnMut(Option<&Item>) -> Option<Item>,
{
    let encoded_key = Cid::encode(id.borrow());
    let fetched = self
        .inner
        .fetch_update(&tree.raw.tree, encoded_key.as_ref(), |current| {
            // Decode the stored bytes (if any) so the caller's closure sees a typed item.
            let existing = current.map(|raw| Item::deser_owned(raw.clone()));
            let replacement = f(existing.as_ref())?;
            Some(Item::ser(&replacement))
        })?;
    Ok(fetched.map(Item::deser_owned))
}
/// Allocates the next sequential id for `tree` within this transaction.
///
/// The counter lives in the database's `seq_tree` under the key
/// `next_id-<tree name>` as a little-endian `u64`. When no counter is stored yet
/// the first id handed out is `1`; otherwise it is the stored value plus one.
/// The new value is written back through this transaction before returning.
///
/// # Errors
///
/// Returns the underlying fjall error converted into `DbError`, or
/// `DbError::Armour` when the stored counter is not exactly eight bytes long.
///
/// NOTE(review): the increment is unchecked, so a stored value of `u64::MAX`
/// overflows (panic or wrap-around depending on the build's overflow checks).
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_write_tx", name = Item::NAME)]
pub fn next_id<Item>(&mut self, tree: &TxReadTree<Item>) -> DbResult<u64>
where
    Item: Record<Value = Slice> + Debug,
{
    let seq_key = format!("next_id-{}", tree.raw.name);
    let id = match self.inner.get(&tree.raw.db.seq_tree, seq_key.as_bytes())? {
        Some(stored) => {
            let raw: [u8; 8] = stored
                .as_ref()
                .try_into()
                .map_err(|err| DbError::Armour(ArmourError::from(err)))?;
            u64::from_le_bytes(raw) + 1
        }
        None => 1u64,
    };
    self.inner
        .insert(&tree.raw.db.seq_tree, seq_key.as_bytes(), id.to_le_bytes());
    Ok(id)
}
}