use std::path::{Path, PathBuf};
use std::sync::Arc;
#[cfg(feature = "ttl")]
use std::time::Duration;
use crate::builder::EmdbBuilder;
use crate::lockfile::LockFile;
use crate::storage::{Engine, EngineConfig, DEFAULT_NAMESPACE_ID};
use crate::Result;
#[cfg(feature = "ttl")]
use crate::ttl::{
expires_from_ttl, is_expired, now_unix_millis, record_new, record_set_persist, remaining_ttl,
Ttl,
};
#[cfg(feature = "encrypt")]
use crate::encryption::EncryptionInput;
pub struct Emdb {
pub(crate) inner: Arc<Inner>,
}
impl std::fmt::Debug for Emdb {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Emdb")
.field("path", &self.inner.path)
.finish()
}
}
pub(crate) struct Inner {
pub(crate) engine: Engine,
pub(crate) path: PathBuf,
#[cfg(feature = "ttl")]
pub(crate) default_ttl: Option<Duration>,
_lock_file: LockFile,
ephemeral: bool,
}
impl Drop for Inner {
fn drop(&mut self) {
if self.ephemeral {
let path = &self.path;
let display = path.display().to_string();
let _ = std::fs::remove_file(path);
let _ = std::fs::remove_file(format!("{display}.lock"));
}
}
}
impl Clone for Emdb {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl Emdb {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
EmdbBuilder::new().path(path.as_ref().to_path_buf()).build()
}
#[must_use]
#[allow(clippy::expect_used)]
pub fn open_in_memory() -> Self {
EmdbBuilder::new()
.build()
.expect("emdb open_in_memory: tempdir is writable")
}
#[must_use]
pub fn builder() -> EmdbBuilder {
EmdbBuilder::new()
}
#[must_use]
pub fn clone_handle(&self) -> Self {
self.clone()
}
pub(crate) fn from_builder(builder: EmdbBuilder) -> Result<Self> {
let mut path = builder.path.clone();
let has_os_resolution = builder.data_root.is_some()
|| builder.app_name.is_some()
|| builder.database_name.is_some();
if has_os_resolution {
if path.is_some() {
return Err(crate::Error::InvalidConfig(
"EmdbBuilder::path is mutually exclusive with app_name / database_name / data_root",
));
}
path = Some(crate::data_dir::resolve_database_path(
builder.data_root.clone(),
builder.app_name.as_deref(),
builder.database_name.as_deref(),
)?);
}
let (path, ephemeral) = match path {
Some(p) => (p, false),
None => {
let mut p = std::env::temp_dir();
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0_u128, |d| d.as_nanos());
let tid = std::thread::current().id();
p.push(format!("emdb-mem-{nanos}-{tid:?}.emdb"));
(p, true)
}
};
let lock_file = LockFile::acquire(path.as_path())?;
let engine_config = EngineConfig {
path: path.clone(),
flags: 0,
enable_range_scans: builder.enable_range_scans,
flush_policy: builder.flush_policy,
#[cfg(feature = "encrypt")]
encryption_key: builder.encryption_key,
#[cfg(feature = "encrypt")]
cipher: builder.cipher,
#[cfg(feature = "encrypt")]
encryption_passphrase: builder.encryption_passphrase.clone(),
};
let engine = Engine::open(engine_config)?;
let db = Self {
inner: Arc::new(Inner {
engine,
path,
#[cfg(feature = "ttl")]
default_ttl: builder.default_ttl,
_lock_file: lock_file,
ephemeral,
}),
};
#[cfg(feature = "ttl")]
{
let _evicted = db.sweep_expired();
}
Ok(db)
}
#[must_use]
pub fn path(&self) -> &Path {
&self.inner.path
}
pub fn insert(&self, key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Result<()> {
let key = key.into();
let value = value.into();
#[cfg(feature = "ttl")]
let expires_at = self.compute_default_expires_at()?;
#[cfg(not(feature = "ttl"))]
let expires_at = 0_u64;
self.inner
.engine
.insert(DEFAULT_NAMESPACE_ID, &key, &value, expires_at)
}
pub fn insert_many<I, K, V>(&self, items: I) -> Result<()>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
#[cfg(feature = "ttl")]
let expires_at = self.compute_default_expires_at()?;
#[cfg(not(feature = "ttl"))]
let expires_at = 0_u64;
let owned: Vec<(Vec<u8>, Vec<u8>, u64)> = items
.into_iter()
.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec(), expires_at))
.collect();
self.inner.engine.insert_many(DEFAULT_NAMESPACE_ID, owned)
}
pub fn get_zerocopy(&self, key: impl AsRef<[u8]>) -> Result<Option<crate::ValueRef>> {
let key = key.as_ref();
match self.inner.engine.get_zerocopy(DEFAULT_NAMESPACE_ID, key)? {
None => Ok(None),
Some((value_ref, expires_at)) => {
#[cfg(feature = "ttl")]
{
if expires_at != 0 && is_expired(Some(expires_at), now_unix_millis()) {
return Ok(None);
}
}
#[cfg(not(feature = "ttl"))]
let _ = expires_at;
Ok(Some(value_ref))
}
}
}
pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
let key = key.as_ref();
#[cfg(feature = "ttl")]
{
match self.inner.engine.get_with_meta(DEFAULT_NAMESPACE_ID, key)? {
None => Ok(None),
Some((value, expires_at)) => {
if expires_at != 0 && is_expired(Some(expires_at), now_unix_millis()) {
Ok(None)
} else {
Ok(Some(value))
}
}
}
}
#[cfg(not(feature = "ttl"))]
{
self.inner.engine.get(DEFAULT_NAMESPACE_ID, key)
}
}
pub fn remove(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
self.inner.engine.remove(DEFAULT_NAMESPACE_ID, key.as_ref())
}
pub fn contains_key(&self, key: impl AsRef<[u8]>) -> Result<bool> {
Ok(self.get(key)?.is_some())
}
pub fn len(&self) -> Result<usize> {
let count = self.inner.engine.record_count(DEFAULT_NAMESPACE_ID)?;
usize::try_from(count)
.map_err(|_| crate::Error::InvalidConfig("record count exceeds usize on this target"))
}
pub fn is_empty(&self) -> Result<bool> {
Ok(self.len()? == 0)
}
pub fn clear(&self) -> Result<()> {
self.inner.engine.clear_namespace(DEFAULT_NAMESPACE_ID)
}
pub fn flush(&self) -> Result<()> {
self.inner.engine.flush()
}
pub fn checkpoint(&self) -> Result<()> {
self.inner.engine.checkpoint()
}
pub fn iter(&self) -> Result<EmdbIter> {
let offsets = self.inner.engine.snapshot_offsets(DEFAULT_NAMESPACE_ID)?;
Ok(EmdbIter::new(Arc::clone(&self.inner), offsets))
}
pub fn keys(&self) -> Result<EmdbKeyIter> {
let offsets = self.inner.engine.snapshot_offsets(DEFAULT_NAMESPACE_ID)?;
Ok(EmdbKeyIter::new(Arc::clone(&self.inner), offsets))
}
pub fn range<R>(&self, range: R) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
R: std::ops::RangeBounds<Vec<u8>>,
{
self.inner.engine.range_scan(DEFAULT_NAMESPACE_ID, range)
}
pub fn range_iter<R>(&self, range: R) -> Result<EmdbRangeIter>
where
R: std::ops::RangeBounds<Vec<u8>>,
{
let pairs = self
.inner
.engine
.snapshot_range_offsets(DEFAULT_NAMESPACE_ID, range)?;
Ok(EmdbRangeIter::new(Arc::clone(&self.inner), pairs))
}
pub fn range_prefix(&self, prefix: impl AsRef<[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let prefix = prefix.as_ref();
let start = prefix.to_vec();
let end = next_prefix(prefix);
match end {
Some(end) => self.range(start..end),
None => self.range(start..),
}
}
pub fn range_prefix_iter(&self, prefix: impl AsRef<[u8]>) -> Result<EmdbRangeIter> {
let prefix = prefix.as_ref();
let start = prefix.to_vec();
match next_prefix(prefix) {
Some(end) => self.range_iter(start..end),
None => self.range_iter(start..),
}
}
#[cfg(feature = "ttl")]
pub fn insert_with_ttl(
&self,
key: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
ttl: Ttl,
) -> Result<()> {
let key = key.into();
let value = value.into();
let now = now_unix_millis();
let expires_at = expires_from_ttl(ttl, self.inner.default_ttl, now)?.unwrap_or(0);
self.inner
.engine
.insert(DEFAULT_NAMESPACE_ID, &key, &value, expires_at)
}
#[cfg(feature = "ttl")]
pub fn expires_at(&self, key: impl AsRef<[u8]>) -> Result<Option<u64>> {
self.inner
.engine_expires_at(DEFAULT_NAMESPACE_ID, key.as_ref())
}
#[cfg(feature = "ttl")]
pub fn ttl(&self, key: impl AsRef<[u8]>) -> Result<Option<Duration>> {
let exp = self.expires_at(key)?;
match exp {
Some(deadline) if deadline > 0 => Ok(remaining_ttl(deadline, now_unix_millis())),
_ => Ok(None),
}
}
#[cfg(feature = "ttl")]
pub fn persist(&self, key: impl AsRef<[u8]>) -> Result<bool> {
let key = key.as_ref();
let value = match self.inner.engine.get(DEFAULT_NAMESPACE_ID, key)? {
Some(v) => v,
None => return Ok(false),
};
let prev_exp = self
.inner
.engine_expires_at(DEFAULT_NAMESPACE_ID, key)?
.unwrap_or(0);
let had_ttl = prev_exp != 0;
self.inner
.engine
.insert(DEFAULT_NAMESPACE_ID, key, &value, 0)?;
let mut probe = record_new(value, if had_ttl { Some(prev_exp) } else { None });
let _flipped = record_set_persist(&mut probe);
Ok(had_ttl)
}
#[cfg(feature = "ttl")]
pub fn sweep_expired(&self) -> usize {
let snapshot = match self.inner.engine.collect_records(DEFAULT_NAMESPACE_ID) {
Ok(snap) => snap,
Err(_) => return 0,
};
let now = now_unix_millis();
let mut evicted = 0;
for (key, _value, expires_at) in snapshot {
if expires_at != 0 && is_expired(Some(expires_at), now) {
if let Ok(Some(_)) = self.inner.engine.remove(DEFAULT_NAMESPACE_ID, &key) {
evicted += 1;
}
}
}
evicted
}
pub fn compact(&self) -> Result<()> {
self.inner.engine.compact_in_place()
}
#[cfg(feature = "ttl")]
fn compute_default_expires_at(&self) -> Result<u64> {
let now = now_unix_millis();
Ok(expires_from_ttl(Ttl::Default, self.inner.default_ttl, now)?.unwrap_or(0))
}
pub fn namespace(&self, name: impl AsRef<str>) -> Result<crate::namespace::Namespace> {
let name_ref = name.as_ref();
let ns_id = self.inner.engine.create_or_open_namespace(name_ref)?;
Ok(crate::namespace::Namespace::new(
Arc::clone(&self.inner),
ns_id,
name_ref.to_string().into_boxed_str(),
))
}
pub fn drop_namespace(&self, name: impl AsRef<str>) -> Result<bool> {
self.inner.engine.drop_namespace(name.as_ref())
}
pub fn list_namespaces(&self) -> Result<Vec<String>> {
let entries = self.inner.engine.list_namespaces()?;
Ok(entries.into_iter().map(|(_, name)| name).collect())
}
pub fn transaction<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(&mut crate::transaction::Transaction<'_>) -> Result<T>,
{
let mut tx = crate::transaction::Transaction::new(self);
let out = f(&mut tx)?;
tx.commit()?;
Ok(out)
}
#[cfg(feature = "encrypt")]
pub fn enable_encryption(path: impl AsRef<Path>, target: EncryptionInput) -> Result<()> {
crate::encryption_admin::enable_encryption(path, target)
}
#[cfg(feature = "encrypt")]
pub fn disable_encryption(path: impl AsRef<Path>, current: EncryptionInput) -> Result<()> {
crate::encryption_admin::disable_encryption(path, current)
}
#[cfg(feature = "encrypt")]
pub fn rotate_encryption_key(
path: impl AsRef<Path>,
from: EncryptionInput,
to: EncryptionInput,
) -> Result<()> {
crate::encryption_admin::rotate_encryption_key(path, from, to)
}
}
impl Inner {
#[cfg(feature = "ttl")]
pub(crate) fn engine_expires_at(&self, ns_id: u32, key: &[u8]) -> Result<Option<u64>> {
Ok(self
.engine
.get_with_meta(ns_id, key)?
.map(|(_, expires_at)| expires_at))
}
}
pub struct EmdbIter {
inner: Arc<Inner>,
offsets: std::vec::IntoIter<u64>,
}
impl EmdbIter {
fn new(inner: Arc<Inner>, offsets: Vec<u64>) -> Self {
Self {
inner,
offsets: offsets.into_iter(),
}
}
}
impl Iterator for EmdbIter {
type Item = (Vec<u8>, Vec<u8>);
fn next(&mut self) -> Option<Self::Item> {
for offset in self.offsets.by_ref() {
match self.inner.engine.decode_owned_at(offset) {
Ok(Some((key, value, _))) => return Some((key, value)),
Ok(None) => continue,
Err(_) => continue,
}
}
None
}
}
pub struct EmdbKeyIter {
inner: Arc<Inner>,
offsets: std::vec::IntoIter<u64>,
}
impl EmdbKeyIter {
fn new(inner: Arc<Inner>, offsets: Vec<u64>) -> Self {
Self {
inner,
offsets: offsets.into_iter(),
}
}
}
impl Iterator for EmdbKeyIter {
type Item = Vec<u8>;
fn next(&mut self) -> Option<Self::Item> {
for offset in self.offsets.by_ref() {
match self.inner.engine.decode_owned_at(offset) {
Ok(Some((key, _value, _))) => return Some(key),
Ok(None) => continue,
Err(_) => continue,
}
}
None
}
}
pub struct EmdbRangeIter {
inner: Arc<Inner>,
pairs: std::vec::IntoIter<(Vec<u8>, u64)>,
}
impl EmdbRangeIter {
fn new(inner: Arc<Inner>, pairs: Vec<(Vec<u8>, u64)>) -> Self {
Self {
inner,
pairs: pairs.into_iter(),
}
}
}
impl Iterator for EmdbRangeIter {
type Item = (Vec<u8>, Vec<u8>);
fn next(&mut self) -> Option<Self::Item> {
for (key, offset) in self.pairs.by_ref() {
match self.inner.engine.read_value_with_meta_at(offset, &key) {
Ok(Some((value, _expires))) => return Some((key, value)),
Ok(None) => continue,
Err(_) => continue,
}
}
None
}
}
pub(crate) fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
let mut out = prefix.to_vec();
while let Some(byte) = out.last_mut() {
if *byte < u8::MAX {
*byte += 1;
return Some(out);
}
let _ = out.pop();
}
None
}