use core::borrow::Borrow;
use std::{fmt::Debug, ops::Deref, sync::Arc};
use armour_derive::armour_metrics;
use fjall::{KeyspaceCreateOptions, OptimisticTxKeyspace, Readable, Slice};
use super::{builder::TxTreeBuilder, db::TxDb, raw::TxRawTree, read::TxReadTree};
use crate::{
Cid, DbError, DbResult, Record,
indexes::IndexUpdateCollection,
logdb::ByteValue,
utils::{Change, EventHandlers},
};
use arrayvec::ArrayVec;
/// Inline, fixed-capacity list (at most 8 entries) of boxed index collections
/// for `Item`; each entry is notified by `TxTree::update_indexes`.
pub type IndexesMap<Item> = ArrayVec<Box<dyn IndexUpdateCollection<Item>>, 8>;
/// Typed tree of `Item` records that layers index maintenance and event
/// handlers on top of a [`TxReadTree`].
///
/// The handle dereferences to the wrapped [`TxReadTree`]. Cloning it clones
/// `inner` and shares `index_maps` and `handlers` through their `Arc`s.
#[derive(derive_more::Debug)]
pub struct TxTree<Item: Record> {
    /// Wrapped read tree; it also gives access to the underlying raw tree.
    pub inner: TxReadTree<Item>,
    /// Index collections notified on every record change. The `Debug` output
    /// prints only how many there are.
    #[debug("{}", self.index_maps.len())]
    pub(crate) index_maps: Arc<IndexesMap<Item>>,
    /// Optional callbacks (`on_upsert`, `on_remove`, `on_id_change`) invoked by
    /// the event helper methods of this type.
    pub(crate) handlers: Arc<EventHandlers<Item>>,
}
impl<Item: Record> Clone for TxTree<Item> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
index_maps: self.index_maps.clone(),
handlers: self.handlers.clone(),
}
}
}
/// Exposes the methods of the wrapped [`TxReadTree`] directly on [`TxTree`].
impl<Item: Record> Deref for TxTree<Item> {
    type Target = TxReadTree<Item>;

    /// Borrows the wrapped read tree.
    fn deref(&self) -> &Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}
impl<Item> TxTree<Item>
where
Item: Record<Value = Slice> + std::fmt::Debug,
{
#[doc(alias = "new")]
#[doc(alias = "create")]
pub fn open(db: &TxDb, options: Option<KeyspaceCreateOptions>) -> Self {
TxTreeBuilder::<Item>::new(db, options).build()
}
pub fn with_name(db: &TxDb, name: &str, options: Option<KeyspaceCreateOptions>) -> Self {
TxTreeBuilder::<Item>::with_name(db, name, options).build()
}
pub fn builder(db: &TxDb, options: Option<KeyspaceCreateOptions>) -> TxTreeBuilder<Item> {
TxTreeBuilder::<Item>::new(db, options)
}
#[doc(alias = "inner")]
pub fn raw(&self) -> &TxRawTree {
&self.inner.raw
}
pub(crate) fn tree(&self) -> &OptimisticTxKeyspace {
&self.inner.raw.tree
}
#[inline]
pub(super) fn invalidate_hash(&self, id: &Item::SelfId) {
let group = id.group_id();
self.raw().inner.invalidate_hash(group);
}
#[inline]
pub(super) fn upsert_event(&self, id: &Item::SelfId, old: Option<&Item>, new: &Item) {
if let Some(f) = self.handlers.on_upsert.as_ref() {
f(id, Change { old, new });
}
}
#[inline]
pub(super) fn remove_event(&self, id: &Item::SelfId, item: &Item) {
if let Some(f) = self.handlers.on_remove.as_ref() {
f(id, item);
}
}
#[inline]
pub(super) fn _id_change_event(
&self,
item: &Item,
old_id: &Item::SelfId,
new_id: &Item::SelfId,
) {
if let Some(f) = self.handlers.on_id_change.as_ref() {
f(item, old_id, new_id);
}
}
#[inline]
pub(super) fn update_indexes(&self, id: &Item::SelfId, old: Option<&Item>, new: Option<&Item>) {
for map in self.index_maps.iter() {
map.update(id, old, new);
}
}
}
impl<Item: Record> From<TxTree<Item>> for TxRawTree {
fn from(value: TxTree<Item>) -> Self {
value.inner.raw
}
}
impl<Item> TxTree<Item>
where
Item: Record<Value = Slice> + Debug,
{
/// Stores `item` under `id`, replacing any existing record, and returns the
/// record that was there before (if any).
///
/// The read and the write happen inside one optimistic write transaction. If
/// the commit is rejected because of a conflict, the whole transaction is
/// started again. After a successful commit, in this order:
/// the raw tree's `seq` counter is incremented when the key was new, the
/// index collections are updated, the `on_upsert` handler is called and the
/// hash of the id's group is invalidated.
///
/// # Errors
///
/// Returns the converted storage error if opening the transaction, reading
/// the previous value or committing fails for any reason other than a
/// conflict.
#[doc(alias = "insert")]
#[doc(alias = "update")]
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_txdb_tree", name = self.raw().static_name())]
pub fn upsert(
    &self,
    id: impl Borrow<Item::SelfId> + Debug,
    item: &Item,
) -> DbResult<Option<Item>> {
    let bytes = Item::ser(item);
    let borrowed_id = id.borrow();
    let key = Cid::encode(borrowed_id);
    let key_bytes = ByteValue::from(key.as_ref());
    (|| loop {
        let mut tx = self.raw().db.db.write_tx().map_err(DbError::from)?;
        let old_bytes = tx.get(self.tree(), &key_bytes).map_err(DbError::from)?;
        let old_item = old_bytes.as_ref().map(|b| Item::deser(b));
        tx.insert(self.tree(), key_bytes.clone(), bytes.clone());
        // `commit` reports failure on two levels: the outer `Result` carries
        // storage errors, the inner one signals an optimistic conflict. Only a
        // conflict is worth retrying; a storage error must surface, otherwise
        // this loop would spin forever. Matching on `Ok(_)` alone would also
        // treat a rejected commit as a success.
        match tx.commit().map_err(DbError::from)? {
            Ok(()) => {
                if old_item.is_none() {
                    self.raw()
                        .inner
                        .seq
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
                self.update_indexes(borrowed_id, old_item.as_ref(), Some(item));
                self.upsert_event(borrowed_id, old_item.as_ref(), item);
                self.invalidate_hash(borrowed_id);
                return Ok(old_item);
            }
            // Another transaction won the race: start over on fresh data.
            Err(_) => continue,
        }
    })()
}
/// Atomically transforms the record stored under `id` with `f`.
///
/// `f` receives the current record (`None` if the key is absent) and returns
/// the record to store (`None` to delete it) together with an arbitrary
/// value that is handed back to the caller. The method returns the new
/// record and that value.
///
/// The work runs inside an optimistic write transaction that is started
/// again whenever the commit is rejected because of a conflict, so `f` may
/// be called more than once. Nothing is written when the serialized new
/// record equals the stored bytes, or when both old and new are `None`; in
/// that case no side effect below is triggered either.
///
/// After a successful commit that changed something, in this order: the raw
/// tree's `seq` counter is adjusted (`+1` for a new key, `-1` for a
/// deletion), the index collections are updated, `on_upsert` or `on_remove`
/// is called, and the hash of the id's group is invalidated.
///
/// # Errors
///
/// Returns the converted storage error if opening the transaction, reading
/// the previous value or committing fails for any reason other than a
/// conflict.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_txdb_tree", name = self.raw().static_name())]
pub fn upsert_fn<F, Ret>(
    &self,
    id: impl Borrow<Item::SelfId> + Debug,
    f: F,
) -> DbResult<(Option<Item>, Ret)>
where
    F: Fn(Option<Item>) -> (Option<Item>, Ret),
    Item: Clone,
    Ret: Debug,
{
    let borrowed_id = id.borrow();
    let key = Cid::encode(borrowed_id);
    let key_bytes = ByteValue::from(key.as_ref());
    (|| loop {
        let mut tx = self.raw().db.db.write_tx().map_err(DbError::from)?;
        let old_bytes = tx.get(self.tree(), &key_bytes).map_err(DbError::from)?;
        let old = old_bytes.as_ref().map(|b| Item::deser(b));
        // `f` consumes the old record, so keep a copy for indexes and events.
        let old_cloned = old.clone();
        let (new, ret) = f(old);
        let new_bytes = new.as_ref().map(|item| Item::ser(item));
        // Compare serialized forms so that a no-op transformation is not written.
        let changed = match (&old_bytes, &new_bytes) {
            (Some(o), Some(n)) => o.as_ref() != n.as_ref(),
            (None, None) => false,
            _ => true,
        };
        if changed {
            match &new_bytes {
                Some(bytes) => tx.insert(self.tree(), key_bytes.clone(), bytes.clone()),
                None => tx.remove(self.tree(), key_bytes.clone()),
            }
        }
        // `commit` reports failure on two levels: the outer `Result` carries
        // storage errors, the inner one signals an optimistic conflict. Only a
        // conflict is retried; a storage error is returned to the caller
        // instead of looping forever, and a rejected commit is never treated
        // as a success.
        match tx.commit().map_err(DbError::from)? {
            Ok(()) => {
                if changed {
                    match (&old_cloned, &new) {
                        (None, Some(_)) => {
                            self.raw()
                                .inner
                                .seq
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        }
                        (Some(_), None) => {
                            self.raw()
                                .inner
                                .seq
                                .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
                        }
                        _ => {}
                    }
                    if !self.index_maps.is_empty() {
                        let old_ref = old_cloned.as_ref();
                        let new_ref = new.as_ref();
                        self.update_indexes(borrowed_id, old_ref, new_ref);
                    }
                    match new.as_ref() {
                        Some(new) => {
                            self.upsert_event(borrowed_id, old_cloned.as_ref(), new);
                        }
                        None => {
                            if let Some(old) = old_cloned.as_ref() {
                                self.remove_event(borrowed_id, old);
                            }
                        }
                    }
                    self.invalidate_hash(borrowed_id);
                }
                return Ok((new, ret));
            }
            // Another transaction won the race: start over on fresh data.
            Err(_) => {
                continue;
            }
        }
    })()
}
/// Replaces the existing record under `id` with the result of `f` and
/// returns the new record.
///
/// `f` is called exactly once with the current record. The transaction is
/// not retried: `f` is `FnOnce`, so a conflicting commit is reported to the
/// caller instead. After a successful commit the index collections are
/// updated, the `on_upsert` handler is called and the hash of the id's
/// group is invalidated.
///
/// # Errors
///
/// * [`DbError::NotFound`] if no record is stored under `id`.
/// * Whatever error `f` returns; nothing is written in that case.
/// * [`DbError::Transaction`] if the commit is rejected because of a conflict.
/// * The converted storage error if opening the transaction, reading the
///   current value or committing fails for any other reason.
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_txdb_tree", name = self.raw().static_name())]
pub fn update_fn<F>(&self, id: impl Borrow<Item::SelfId> + Debug, f: F) -> DbResult<Item>
where
    F: FnOnce(Item) -> DbResult<Item>,
    Item: Clone,
{
    let borrowed_id = id.borrow();
    let key = Cid::encode(borrowed_id);
    let key_bytes = ByteValue::from(key.as_ref());
    let mut tx = self.raw().db.db.write_tx().map_err(DbError::from)?;
    let old_bytes = tx.get(self.tree(), &key_bytes).map_err(DbError::from)?;
    match old_bytes {
        Some(old_bytes) => {
            let old = Item::deser(&old_bytes);
            // `f` consumes the old record, so keep a copy for indexes and events.
            let old_cloned = old.clone();
            let new = f(old)?;
            let new_bytes = Item::ser(&new);
            tx.insert(self.tree(), key_bytes, new_bytes);
            // `commit` reports failure on two levels: the outer `Result`
            // carries storage errors, the inner one signals an optimistic
            // conflict. Matching on `Ok(_)` alone would report a rejected
            // commit as a success and fire the side effects for a write that
            // never happened.
            match tx.commit().map_err(DbError::from)? {
                Ok(()) => {
                    self.update_indexes(borrowed_id, Some(&old_cloned), Some(&new));
                    self.upsert_event(borrowed_id, Some(&old_cloned), &new);
                    self.invalidate_hash(borrowed_id);
                    Ok(new)
                }
                Err(_) => Err(DbError::Transaction),
            }
        }
        None => Err(DbError::NotFound),
    }
}
/// Deletes the record stored under `id` and returns it, or `None` if the key
/// was absent.
///
/// When a record was actually removed, the raw tree's `seq` counter is
/// decremented, the index collections are updated and the `on_remove`
/// handler is called. The hash of the id's group is invalidated whether or
/// not a record existed.
///
/// # Errors
///
/// Returns the converted storage error if taking the value out of the
/// keyspace fails.
#[doc(alias = "delete")]
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_txdb_tree", name = self.raw().static_name())]
pub fn remove(&self, id: impl Borrow<Item::SelfId> + Debug) -> DbResult<Option<Item>> {
    let encoded = Cid::encode(id.borrow());
    let key_bytes = ByteValue::from(encoded.as_ref());
    let removed = match self.tree().take(key_bytes)? {
        Some(raw_value) => {
            let item = Item::deser_owned(raw_value);
            self.raw()
                .inner
                .seq
                .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
            self.update_indexes(id.borrow(), Some(&item), None);
            self.remove_event(id.borrow(), &item);
            Some(item)
        }
        None => None,
    };
    self.invalidate_hash(id.borrow());
    Ok(removed)
}
/// Deletes the record stored under `id` from the main keyspace and keeps its
/// raw bytes in the raw tree's `removed` store.
///
/// Returns the removed record, or `None` (with no side effects) if the key
/// was absent. On removal the raw tree's `seq` counter is decremented, the
/// index collections are updated, the `on_remove` handler is called and the
/// hash of the id's group is invalidated.
///
/// A failure to write into the `removed` store is only logged; the removal
/// from the main keyspace has already happened at that point and is not
/// undone.
///
/// # Errors
///
/// Returns the converted storage error if taking the value out of the main
/// keyspace fails.
#[doc(alias = "delete")]
#[instrument(level = "debug", skip_all, fields(name = Item::NAME))]
#[armour_metrics(prefix = "armour_txdb_tree", name = self.raw().static_name())]
pub fn soft_remove(&self, id: impl Borrow<Item::SelfId> + Debug) -> DbResult<Option<Item>> {
    let encoded = Cid::encode(id.borrow());
    let key_bytes = ByteValue::from(encoded.as_ref());
    let Some(val) = self.tree().take(key_bytes.clone())? else {
        return Ok(None);
    };
    let item = Item::deser(&val);
    self.raw()
        .inner
        .seq
        .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
    self.update_indexes(id.borrow(), Some(&item), None);
    self.remove_event(id.borrow(), &item);
    // Best effort: keep the original bytes around under the same key.
    if let Err(err) = self.raw().removed.insert(key_bytes, val) {
        error!("Failed to insert removed item: {}", err);
    }
    self.invalidate_hash(id.borrow());
    Ok(Some(item))
}
/// Encodes a stream of `(id, Option<item>)` pairs and hands them to the raw
/// tree's `apply_batch`.
///
/// Each id is encoded with `Cid::encode` and each present item is serialized
/// with `Item::ser`; absent items are passed on as `None`. This method
/// itself does not touch the index collections, event handlers or group
/// hashes.
///
/// NOTE(review): a `None` value presumably means "delete this key" — confirm
/// against `TxRawTree::apply_batch`.
///
/// # Errors
///
/// Returns whatever error the raw tree's `apply_batch` reports.
pub fn apply_batch<K, V>(&self, iter: impl Iterator<Item = (K, Option<V>)>) -> DbResult<()>
where
    K: AsRef<Item::SelfId>,
    V: AsRef<Item>,
{
    let encoded_pairs = iter.map(|(id, item)| {
        let cid = Cid::encode(id.as_ref());
        let key_bytes = ByteValue::from(cid.as_ref());
        let val_bytes = item.map(|value| Item::ser(value.as_ref()));
        (key_bytes, val_bytes)
    });
    self.inner.raw.apply_batch(encoded_pairs)
}
/// Returns the next id produced by the underlying raw tree.
///
/// # Errors
///
/// Returns whatever error the raw tree's `next_id` reports.
pub fn next_id(&self) -> DbResult<u64> {
    self.raw().next_id()
}
}