use std::collections::BTreeMap;
use crate::storage::DEFAULT_NAMESPACE_ID;
use crate::{Emdb, Result};
#[cfg(feature = "ttl")]
use crate::ttl::{expires_from_ttl, now_unix_millis, Ttl};
enum Staged {
Insert { value: Vec<u8>, expires_at: u64 },
Remove,
}
pub struct Transaction<'db> {
db: &'db Emdb,
overlay: BTreeMap<Vec<u8>, Staged>,
}
impl<'db> Transaction<'db> {
pub(crate) fn new(db: &'db Emdb) -> Self {
Self {
db,
overlay: BTreeMap::new(),
}
}
pub fn insert(&mut self, key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Result<()> {
let key = key.into();
let value = value.into();
#[cfg(feature = "ttl")]
let expires_at = {
let now = now_unix_millis();
expires_from_ttl(Ttl::Default, self.db.inner.default_ttl, now)?.unwrap_or(0)
};
#[cfg(not(feature = "ttl"))]
let expires_at = 0_u64;
let _previous = self
.overlay
.insert(key, Staged::Insert { value, expires_at });
Ok(())
}
#[cfg(feature = "ttl")]
pub fn insert_with_ttl(
&mut self,
key: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
ttl: Ttl,
) -> Result<()> {
let key = key.into();
let value = value.into();
let now = now_unix_millis();
let expires_at = expires_from_ttl(ttl, self.db.inner.default_ttl, now)?.unwrap_or(0);
let _previous = self
.overlay
.insert(key, Staged::Insert { value, expires_at });
Ok(())
}
pub fn remove(&mut self, key: impl Into<Vec<u8>>) -> Result<Option<Vec<u8>>> {
let key = key.into();
let prev_visible = self.get(&key)?;
let _previous = self.overlay.insert(key, Staged::Remove);
Ok(prev_visible)
}
pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
let key = key.as_ref();
if let Some(staged) = self.overlay.get(key) {
return match staged {
Staged::Insert { value, .. } => Ok(Some(value.clone())),
Staged::Remove => Ok(None),
};
}
self.db.get(key)
}
pub fn contains_key(&self, key: impl AsRef<[u8]>) -> Result<bool> {
Ok(self.get(key)?.is_some())
}
pub(crate) fn commit(mut self) -> Result<()> {
let staged = std::mem::take(&mut self.overlay);
if staged.is_empty() {
return Ok(());
}
let engine = &self.db.inner.engine;
let mut bulk_inserts: Vec<(Vec<u8>, Vec<u8>, u64)> = Vec::new();
for (key, staged) in staged {
match staged {
Staged::Insert { value, expires_at } => {
bulk_inserts.push((key, value, expires_at));
}
Staged::Remove => {
let _ = engine.remove(DEFAULT_NAMESPACE_ID, &key)?;
}
}
}
if !bulk_inserts.is_empty() {
engine.insert_many(DEFAULT_NAMESPACE_ID, bulk_inserts)?;
}
Ok(())
}
}