use crate::err::Error;
use crate::Database;
use imbl::ordmap::Entry;
use imbl::OrdMap;
use std::borrow::Borrow;
use std::fmt::Debug;
use std::mem::drop;
use std::ops::Range;
use std::sync::Arc;
use tokio::sync::OwnedMutexGuard;
pub struct Transaction<K, V>
where
K: Ord + Clone + Debug + Sync + Send + 'static,
V: Eq + Clone + Debug + Sync + Send + 'static,
{
done: bool,
write: bool,
snapshot: OrdMap<K, V>,
database: Database<K, V>,
writelock: Option<OwnedMutexGuard<()>>,
}
impl<K, V> Transaction<K, V>
where
K: Ord + Clone + Debug + Sync + Send + 'static,
V: Eq + Clone + Debug + Sync + Send + 'static,
{
pub(crate) fn read(db: Database<K, V>, lock: Option<OwnedMutexGuard<()>>) -> Transaction<K, V> {
Transaction {
done: false,
write: false,
snapshot: (*(*db.datastore.load())).clone(),
database: db,
writelock: lock,
}
}
pub(crate) fn write(
db: Database<K, V>,
lock: Option<OwnedMutexGuard<()>>,
) -> Transaction<K, V> {
Transaction {
done: false,
write: true,
snapshot: (*(*db.datastore.load())).clone(),
database: db,
writelock: lock,
}
}
pub fn closed(&self) -> bool {
self.done
}
pub fn cancel(&mut self) -> Result<(), Error> {
if self.done == true {
return Err(Error::TxClosed);
}
self.done = true;
if let Some(lock) = self.writelock.take() {
drop(lock);
}
Ok(())
}
pub fn commit(&mut self) -> Result<(), Error> {
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
self.done = true;
self.database.datastore.store(Arc::new(self.snapshot.clone()));
if let Some(lock) = self.writelock.take() {
drop(lock);
}
Ok(())
}
pub fn exists<Q>(&self, key: Q) -> Result<bool, Error>
where
Q: Borrow<K>,
{
if self.done == true {
return Err(Error::TxClosed);
}
let res = self.snapshot.contains_key(key.borrow());
Ok(res)
}
pub fn get<Q>(&self, key: Q) -> Result<Option<V>, Error>
where
Q: Borrow<K>,
{
if self.done == true {
return Err(Error::TxClosed);
}
let res = self.snapshot.get(key.borrow()).cloned();
Ok(res)
}
pub fn set<Q>(&mut self, key: Q, val: V) -> Result<(), Error>
where
Q: Into<K>,
{
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
self.snapshot.insert(key.into(), val);
Ok(())
}
pub fn put<Q>(&mut self, key: Q, val: V) -> Result<(), Error>
where
Q: Borrow<K> + Into<K>,
{
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
match self.snapshot.entry(key.into()) {
Entry::Vacant(v) => {
v.insert(val);
}
_ => return Err(Error::KeyAlreadyExists),
};
Ok(())
}
pub fn putc<Q>(&mut self, key: Q, val: V, chk: Option<V>) -> Result<(), Error>
where
Q: Borrow<K> + Into<K>,
{
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
match (self.snapshot.entry(key.into()), &chk) {
(Entry::Occupied(mut v), Some(w)) if v.get() == w => {
v.insert(val);
}
(Entry::Vacant(v), None) => {
v.insert(val);
}
_ => return Err(Error::ValNotExpectedValue),
};
Ok(())
}
pub fn del<Q>(&mut self, key: Q) -> Result<(), Error>
where
Q: Borrow<K>,
{
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
self.snapshot.remove(key.borrow());
Ok(())
}
pub fn delc<Q>(&mut self, key: Q, chk: Option<V>) -> Result<(), Error>
where
Q: Borrow<K> + Into<K>,
{
if self.done == true {
return Err(Error::TxClosed);
}
if self.write == false {
return Err(Error::TxNotWritable);
}
match (self.snapshot.entry(key.into()), &chk) {
(Entry::Occupied(v), Some(w)) if v.get() == w => {
v.remove();
}
(Entry::Vacant(_), None) => {
}
_ => return Err(Error::ValNotExpectedValue),
};
Ok(())
}
pub fn keys<Q>(&self, rng: Range<Q>, limit: usize) -> Result<Vec<K>, Error>
where
Q: Into<K>,
{
if self.done == true {
return Err(Error::TxClosed);
}
let beg = rng.start.into();
let end = rng.end.into();
let res = self.snapshot.range(beg..end).take(limit).map(|(k, _)| k.clone()).collect();
Ok(res)
}
pub fn scan<Q>(&self, rng: Range<Q>, limit: usize) -> Result<Vec<(K, V)>, Error>
where
Q: Into<K>,
{
if self.done == true {
return Err(Error::TxClosed);
}
let beg = rng.start.into();
let end = rng.end.into();
let res = self
.snapshot
.range(beg..end)
.take(limit)
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
Ok(res)
}
}