use std::error::Error;
use std::str::from_utf8;
use std::time::{
Duration,
SystemTime,
UNIX_EPOCH
};
use sled::transaction::{
ConflictableTransactionError,
TransactionError,
Transactional,
TransactionalTree
};
use crate::db::errors::TransientError;
use crate::db::transaction::metric_handler::GuardMetricChanged;
use crate::{
DB,
Metadata
};
pub mod metric_handler;
pub struct TransactionalGuard<'a> {
data_tree: &'a TransactionalTree,
meta_tree: &'a TransactionalTree,
ttl_tree: &'a TransactionalTree,
changed_metric: &'a mut GuardMetricChanged
}
impl<'a> TransactionalGuard<'a> {
pub fn set(
&mut self,
key: &str,
val: &str,
ttl: Option<Duration>
) -> Result<(), Box<dyn Error>> {
let data_tree = &self.data_tree;
let freq_tree = &self.meta_tree;
let ttl_tree = &self.ttl_tree;
let byte = key.as_bytes();
let ttl_sec = match ttl {
Some(t) => {
let systime = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Cant get SystemTime");
Some((t + systime).as_secs())
},
None => None
};
match freq_tree.get(byte)? {
Some(m) => {
let mut meta = Metadata::from_u8(&m)?;
if let Some(t) = meta.ttl {
let _ = ttl_tree.remove([&t.to_be_bytes()[..], byte].concat());
}
meta.ttl = ttl_sec;
freq_tree.insert(byte, meta.to_u8()?)?;
},
None => {
freq_tree.insert(byte, Metadata::new(ttl_sec).to_u8()?)?;
}
}
data_tree.insert(byte, val.as_bytes())?;
if let Some(d) = ttl_sec {
ttl_tree.insert([&d.to_be_bytes()[..], byte].concat(), byte)?;
self.changed_metric.ttl_keys_total_changed += 1;
};
self.changed_metric.keys_total_changed += 1;
self.changed_metric.set_operation_total += 1;
Ok(())
}
pub fn get(&mut self, key: &str) -> Result<Option<String>, Box<dyn Error>> {
let data_tree = &self.data_tree;
let byte = key.as_bytes();
let val = data_tree.get(byte)?;
self.changed_metric.get_operation_total += 1;
match val {
Some(val) => Ok(Some(from_utf8(&val)?.to_string())),
None => Ok(None)
}
}
pub fn increment_frequency(&mut self, key: &str) -> Result<(), Box<dyn Error>> {
let freq_tree = &self.meta_tree;
let byte = &key.as_bytes();
let metadata = freq_tree
.get(byte)?
.ok_or(TransientError::IncretmentError)?;
let meta = Metadata::from_u8(&metadata)?;
freq_tree.remove(*byte)?;
freq_tree.insert(*byte, meta.freq_incretement().to_u8()?)?;
self.changed_metric.inc_freq_operation_total += 1;
Ok(())
}
pub fn remove(&mut self, key: &str) -> Result<(), Box<dyn Error>> {
let data_tree = &self.data_tree;
let freq_tree = &self.meta_tree;
let ttl_tree = &self.ttl_tree;
let byte = &key.as_bytes();
data_tree.remove(*byte)?;
let meta = freq_tree
.get(byte)?
.ok_or(TransientError::MetadataNotFound)?;
let time = Metadata::from_u8(&meta)?.ttl;
freq_tree.remove(*byte)?;
self.changed_metric.keys_total_changed -= 1;
if let Some(t) = time {
self.changed_metric.ttl_keys_total_changed -= 1;
let _ = ttl_tree.remove([&t.to_be_bytes()[..], &byte[..]].concat());
}
self.changed_metric.rm_operation_total += 1;
Ok(())
}
pub fn get_metadata(&self, key: &str) -> Result<Option<Metadata>, Box<dyn Error>> {
let freq_tree = &self.meta_tree;
let byte = key.as_bytes();
let meta = freq_tree.get(byte)?;
match meta {
Some(val) => Ok(Some(Metadata::from_u8(&val)?)),
None => Ok(None)
}
}
}
impl DB {
pub fn transaction<F>(&self, f: F) -> Result<(), TransientError>
where
F: Fn(&mut TransactionalGuard) -> Result<(), Box<dyn Error>>
{
let l: Result<GuardMetricChanged, TransactionError<()>> =
(&*self.data_tree, &*self.meta_tree, &*self.ttl_tree).transaction(
|(data_tree, meta_tree, ttl_tree)| {
let mut guard_metrics = GuardMetricChanged {
keys_total_changed: 0,
ttl_keys_total_changed: 0,
set_operation_total: 0,
rm_operation_total: 0,
inc_freq_operation_total: 0,
get_operation_total: 0
};
let mut transaction_guard = TransactionalGuard {
data_tree,
meta_tree,
ttl_tree,
changed_metric: &mut guard_metrics
};
f(&mut transaction_guard)
.map_err(|_| ConflictableTransactionError::Abort(()))?;
Ok(guard_metrics)
}
);
l.map_err(|_| TransientError::SledTransactionError)?
.inc_all_metrics();
Ok(())
}
}