use core::{ops::RangeBounds, sync::atomic::AtomicBool};
use std::{
ops::Bound,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use armour_derive::armour_metrics;
use derive_more::Debug;
use fjall::{Keyspace, OptimisticTxKeyspace, Readable, UserValue};
use xxhash_rust::xxh3::Xxh3Default;
use super::db::TxDb;
use crate::{
DbError, DbResult,
logdb::{ByteValue, RawIterTree, events::ChangeEvent, raw_filter_map},
types::{ArmourError, attribute::EntityAttribute, num_ops::g4bits},
utils::{CheckSumVec, CollectionInfo, GroupVal, HashPoints},
};
/// Shared mutable state for a `TxRawTree`, held behind an `Arc` so all
/// clones of the tree observe the same counters and checksums.
#[derive(Debug)]
pub(crate) struct InnerFields {
// Collection metadata snapshot. Not read anywhere in this file —
// presumably populated when the tree is opened; TODO confirm producer.
pub(crate) info: CollectionInfo,
// Per-group checksum cache; entries are marked `changed` by
// `invalidate_hash` and lazily recomputed in `recalcucate_hash`.
pub(crate) hashpoints: HashPoints,
// Live-entry counter: incremented on insert of a new key, decremented on
// delete of an existing key (see `apply_event` / `apply_batch`).
pub(crate) seq: AtomicU64,
}
impl InnerFields {
    /// Marks `group_id`'s checksum as stale so the next
    /// `recalcucate_hash` pass recomputes it from the stored entries.
    pub(crate) fn invalidate_hash(&self, group_id: u32) {
        let stale = GroupVal {
            hash: 0,
            changed: true,
        };
        self.hashpoints.insert(group_id, stale);
    }
}
/// A transactional raw (bytes → bytes) tree backed by a fjall keyspace.
///
/// Cloning is cheap (the heavy state is shared through `Arc`s); the last
/// clone to drop persists collection metadata via `close` (see `Drop`).
#[derive(Clone, Debug)]
pub struct TxRawTree {
// Human-readable tree name, used as a metrics/log label throughout.
pub name: String,
// Name of the underlying fjall partition.
pub partition_name: String,
// Numeric hash of the name — presumably a stable identifier for the
// collection; TODO confirm how it is derived.
pub hashname: u64,
// Static schema/attribute descriptor (type hash, version, group_bits).
pub attributes: &'static EntityAttribute,
#[debug(skip)]
pub tree: OptimisticTxKeyspace,
// Keyspace holding removed entries — not used in this file; verify usage
// against the rest of the module.
#[debug(skip)]
pub(crate) removed: Keyspace,
// Shared counters/checksums for all clones of this tree.
pub(crate) inner: Arc<InnerFields>,
// Set once by `close`; also doubles as the clone-count tracker for `Drop`.
pub(crate) meta_saved: Arc<AtomicBool>,
// Handle to the owning database (write/read transactions, seq tree, info).
pub(crate) db: TxDb,
}
impl Drop for TxRawTree {
    /// Persists collection metadata when the final clone goes away.
    ///
    /// NOTE(review): the `strong_count` check is inherently racy — two
    /// clones dropping concurrently can each observe a count of 2 and both
    /// skip `close`. `close` itself is idempotent via its atomic flag, so
    /// double-close is safe; a missed close is the risk. Confirm callers
    /// invoke `close` explicitly where a guaranteed flush matters.
    fn drop(&mut self) {
        if Arc::strong_count(&self.meta_saved) == 1 {
            self.close();
        }
    }
}
impl TxRawTree {
/// Returns the static (compile-time) name from the entity attributes,
/// as opposed to the runtime `self.name` string.
pub fn static_name(&self) -> &'static str {
    let attrs = self.attributes;
    attrs.name
}
/// Reports whether the underlying keyspace currently holds no entries.
pub fn is_empty(&self) -> bool {
    match self.tree.first_key_value() {
        Some(_) => false,
        None => true,
    }
}
/// Returns the cached live-entry count.
///
/// This reads the `seq` counter maintained by `apply_event`/`apply_batch`
/// rather than scanning the keyspace, so it is O(1).
#[instrument(level = "debug", skip_all, fields(name = self.name))]
pub fn count(&self) -> u64 {
    let live_entries = self.inner.seq.load(Ordering::Relaxed);
    live_entries
}
/// Snapshots the current per-group checksums as `(group_id, hash)` pairs.
///
/// Note: groups flagged `changed` still report their stale hash here;
/// call `recalcucate_hash` first if fresh values are required.
#[instrument(skip_all, fields(name = self.name))]
#[armour_metrics(prefix = "armour_txdb_raw", name = self.static_name())]
pub fn hashpoints(&self) -> CheckSumVec {
    let snapshot = self
        .inner
        .hashpoints
        .iter()
        .map(|entry| (*entry.key(), entry.value().hash));
    snapshot.collect()
}
/// Iterates all entries whose key falls inside `group`'s 4-byte big-endian
/// prefix range (`[group, group + 2^(32 - group_bits))`).
///
/// Bug fix: the previous code computed the span with `2u32.pow(32 -
/// group_bits)` and the upper bound with plain `+`, both of which overflow
/// — `group_bits == 0` always, and the last group of any partitioning
/// (panic in debug builds, a wrapped/empty range in release). Overflow now
/// means "no upper bound", which is the correct semantics in both cases.
#[instrument(level = "debug", skip_all, fields(name = self.name))]
pub fn scan_group(&self, group: u32) -> RawIterTree {
    counter!("armour_txdb_rawtree_scan_group_total", "name" => self.name.clone()).increment(1);
    let group_bits = self.attributes.group_bits;
    // Number of key-prefix values per group; None when group_bits == 0
    // (a single group spanning the whole u32 space).
    let span = 1u32.checked_shl(u32::BITS - group_bits);
    let start_bound = Bound::Included(group.to_be_bytes().to_vec());
    let end_bound = match span.and_then(|s| group.checked_add(s)) {
        Some(end) => Bound::Excluded(end.to_be_bytes().to_vec()),
        // Overflow: this is the last group, so scan to the end of the tree.
        None => Bound::Unbounded,
    };
    let tx = self.db.db.read_tx();
    tx.range(&self.tree, (start_bound, end_bound))
        .filter_map(raw_filter_map)
}
/// Recomputes the tree-wide checksum: groups flagged `changed` are rehashed
/// from their stored entries, then every group hash is folded into a single
/// xxh3 digest.
///
/// NOTE(review): the method name has a typo ("recalcucate" → "recalculate")
/// but is public API; renaming would break callers.
#[instrument(skip_all, fields(name = self.name))]
#[armour_metrics(prefix = "armour_txdb_raw", name = self.static_name())]
pub fn recalcucate_hash(&self) -> u64 {
let hash = self
.inner
.hashpoints
.iter()
.map(|item| {
let group_val = item.value();
if group_val.changed {
let group = *item.key();
// Release the map guard BEFORE inserting below: holding it while
// writing back into the same concurrent map could deadlock.
drop(item);
let mut hash_val = Xxh3Default::new();
// Hash every key/value pair currently stored for this group.
for (key, value) in self.scan_group(group) {
hash_val.update(&key);
hash_val.update(&value);
}
let hash = hash_val.digest();
// Cache the fresh hash and clear the dirty flag.
self.inner.hashpoints.insert(
group,
GroupVal {
hash,
changed: false,
},
);
hash
} else {
// Group is clean: reuse the cached hash.
item.value().hash
}
})
// Fold per-group hashes into one digest. Iteration order of the
// concurrent map is presumably stable for a given content — TODO
// confirm, otherwise the combined digest is order-sensitive.
.fold(Xxh3Default::new(), |mut hasher, item| {
hasher.update(&item.to_le_bytes());
hasher
});
hash.digest()
}
/// Persists this tree's collection metadata (type hash + version) into the
/// database info map. Idempotent: the atomic `meta_saved` flag makes sure
/// only the first call does the work; later calls just warn.
#[instrument(skip_all, fields(name = self.name))]
#[armour_metrics(prefix = "armour_txdb_raw", name = self.static_name())]
pub fn close(&self) {
    // swap(true) returns the previous value: `true` means already closed.
    if self.meta_saved.swap(true, Ordering::AcqRel) {
        warn!("tree already closed");
        return;
    }
    let seq = self.inner.seq.load(Ordering::SeqCst);
    if seq != 0 {
        debug!(seq, "close seq");
    }
    let info = CollectionInfo {
        typ_hash: self.attributes.ty.h(),
        version: self.attributes.version,
    };
    self.db.db_info.update(|db_info| {
        db_info.collections.insert(self.name.clone(), info);
    });
}
/// Looks up `id` and, on a hit, returns the key and value concatenated
/// into one owned buffer (`id` bytes followed by the stored value bytes).
/// Records a duration histogram and a call counter regardless of outcome.
#[instrument(level = "debug", skip_all, fields(name = self.name))]
#[armour_metrics(prefix = "armour_txdb_raw", name = self.static_name())]
pub fn get(&self, id: &[u8]) -> DbResult<Option<Vec<u8>>> {
    let started = std::time::Instant::now();
    let res = self
        .tree
        .get(id)
        .map_err(DbError::from)
        .map(|maybe_value| {
            maybe_value.map(|value| {
                // Single allocation sized for key + value, then append both.
                let mut joined = Vec::with_capacity(id.len() + value.len());
                joined.extend_from_slice(id);
                joined.extend_from_slice(&value);
                joined
            })
        });
    histogram!("armour_txdb_rawtree_get_duration", "name" => self.name.clone())
        .record(started.elapsed().as_secs_f64());
    counter!("armour_txdb_rawtree_get_total", "name" => self.name.clone()).increment(1);
    res
}
/// Returns a lazy iterator over every key/value pair in the tree, read
/// through a fresh read-transaction snapshot. Entries whose read fails are
/// logged and skipped.
#[instrument(level = "debug", skip_all, fields(name = self.name))]
pub fn iter(&self) -> RawIterTree {
    // Metric name fixed: was "armour_logdb_rawtree_range_total" — a
    // copy-paste with the logdb prefix and the range() operation name.
    // This file's other metrics all use the armour_txdb prefix.
    counter!("armour_txdb_rawtree_iter_total", "name" => self.name.clone()).increment(1);
    let tx = self.db.db.read_tx();
    let iter = tx.iter(&self.tree);
    iter.filter_map(|item| match item.into_inner() {
        Ok((key, value)) => Some((key, value)),
        Err(e) => {
            error!(%e);
            None
        }
    })
}
#[instrument(level = "debug", skip_all, fields(name = self.name))]
pub fn range<K: AsRef<[u8]>, R: RangeBounds<K> + std::fmt::Debug>(
&self,
range: R,
) -> RawIterTree {
counter!("armour_txdb_rawtree_range_total", "name" => self.name.clone()).increment(1);
let tx = self.db.db.read_tx();
let iter = tx.range(&self.tree, range);
iter.filter_map(|guard| match guard.into_inner() {
Ok(kv) => Some(kv),
Err(e) => {
error!(%e);
None
}
})
}
/// Returns a lazy iterator over the key/value pairs whose key starts with
/// `prefix`, read through a fresh read-transaction snapshot. Entries whose
/// read fails are logged and skipped.
#[instrument(level = "debug", skip_all, fields(name = self.name))]
pub fn prefix<K: AsRef<[u8]> + std::fmt::Debug>(&self, prefix: K) -> RawIterTree {
    // Metric name fixed: was "armour_logdb_rawtree_range_total" — a
    // copy-paste with the logdb prefix and the range() operation name.
    counter!("armour_txdb_rawtree_prefix_total", "name" => self.name.clone()).increment(1);
    let tx = self.db.db.read_tx();
    let iter = tx.prefix(&self.tree, prefix);
    iter.filter_map(|item| match item.into_inner() {
        Ok((key, value)) => Some((key, value)),
        Err(e) => {
            error!(%e);
            None
        }
    })
}
#[instrument(level = "debug", skip_all, fields(name = self.name))]
pub(crate) fn invalidate_hash(&self, id: &[u8]) {
let mut bytes = [0; 4];
bytes.copy_from_slice(&id[..4]);
let group = u32::from_be_bytes(bytes);
let group = g4bits(group, self.attributes.group_bits);
self.inner.invalidate_hash(group);
}
/// Atomically allocates the next id for this tree from the shared sequence
/// tree (key `"next_id-{name}"`, little-endian u64, starting at 1).
///
/// Retries on optimistic commit conflicts; storage errors are returned.
///
/// Bug fix: `commit()` returns a nested result (outer = storage error,
/// inner = optimistic conflict — see `apply_batch`). The old code matched
/// only the outer layer, so `Ok(Err(conflict))` was treated as success and
/// two contending callers could be handed the SAME id. It also `continue`d
/// on storage errors, retrying forever against a persistently failing disk.
#[instrument(level = "debug", skip_all, fields(name = self.name))]
#[armour_metrics(prefix = "armour_txdb_raw", name = self.static_name())]
pub fn next_id(&self) -> DbResult<u64> {
    let next_id_key = format!("next_id-{}", self.name);
    let key_ref = next_id_key.as_bytes();
    // The closure wrapper around this loop was unnecessary: `?` already
    // propagates to the function's DbResult return type.
    loop {
        let mut tx = self.db.db.write_tx().map_err(DbError::from)?;
        let current = tx.get(&self.db.seq_tree, key_ref).map_err(DbError::from)?;
        let mut id = 1u64;
        if let Some(bytes) = current {
            let bytes = bytes
                .as_ref()
                .try_into()
                .map_err(|err| DbError::Armour(ArmourError::from(err)))?;
            let old = u64::from_le_bytes(bytes);
            id = old + 1;
        }
        tx.insert(&self.db.seq_tree, key_ref, id.to_le_bytes());
        match tx.commit() {
            Ok(Ok(_)) => return Ok(id),
            // Another writer won the race: re-read and try again.
            Ok(Err(_conflict)) => continue,
            Err(e) => return Err(DbError::from(e)),
        }
    }
}
#[instrument(level = "debug", skip_all, fields(name = self.name))]
#[armour_metrics(prefix = "armour_txdb_raw", name = self.static_name())]
pub fn apply_event(&self, event: ChangeEvent) -> DbResult<()> {
(|| loop {
let mut tx = self.db.db.write_tx().map_err(DbError::from)?;
match &event {
ChangeEvent::Upsert((key, val)) => {
let old = tx.get(&self.tree, key).map_err(DbError::from)?;
tx.insert(&self.tree, key.clone(), val.clone());
match tx.commit() {
Ok(_) => {
if old.is_none() {
self.inner.seq.fetch_add(1, Ordering::Relaxed);
}
self.invalidate_hash(event.key());
return Ok(());
}
Err(_) => continue,
}
}
ChangeEvent::Delete(key) => {
let exists = tx.contains_key(&self.tree, key).map_err(DbError::from)?;
tx.remove(&self.tree, key.clone());
match tx.commit() {
Ok(_) => {
if exists {
self.inner.seq.fetch_sub(1, Ordering::AcqRel);
} else {
error!(?key, "delete not found");
}
self.invalidate_hash(event.key());
return Ok(());
}
Err(_) => continue,
}
}
}
})()
}
/// Applies a batch of upserts (`Some(val)`) and deletes (`None`) in one
/// write transaction, then adjusts the live-entry counter by the net delta.
///
/// Unlike `apply_event`, an optimistic commit conflict is surfaced as
/// `DbError::Transaction` rather than retried: the input iterator has
/// already been consumed, so the batch cannot be replayed here.
#[instrument(level = "debug", skip_all, fields(name = self.name))]
#[armour_metrics(prefix = "armour_txdb_raw", name = self.static_name())]
pub fn apply_batch<Val>(
    &self,
    iter: impl Iterator<Item = (ByteValue, Option<Val>)>,
) -> DbResult<()>
where
    Val: Into<UserValue>,
{
    let mut tx = self.db.db.write_tx()?;
    // Net change to `seq`: +1 per newly inserted key, -1 per removed key.
    let mut seq_delta: i64 = 0;
    for (key, val) in iter {
        // Invalidating before the commit is benign: it merely forces the
        // next checksum recomputation, even if the commit later fails.
        self.invalidate_hash(&key);
        let old_exists = tx.contains_key(&self.tree, &key)?;
        match val {
            Some(val) => {
                tx.insert(&self.tree, key, val);
                if !old_exists {
                    seq_delta += 1;
                }
            }
            None => {
                tx.remove(&self.tree, key);
                if old_exists {
                    seq_delta -= 1;
                }
            }
        }
    }
    // Outer `?` propagates storage errors; the inner result signals an
    // optimistic commit conflict.
    match tx.commit()? {
        Ok(_) => {
            if seq_delta > 0 {
                self.inner
                    .seq
                    .fetch_add(seq_delta as u64, Ordering::Relaxed);
            } else if seq_delta < 0 {
                // `Relaxed` for consistency with every other `seq` access;
                // `seq` is a plain statistics counter (see `count`).
                self.inner
                    .seq
                    .fetch_sub((-seq_delta) as u64, Ordering::Relaxed);
            }
            Ok(())
        }
        Err(_) => Err(DbError::Transaction),
    }
}
}