use std::collections::HashMap;
use std::ops::{Bound, RangeBounds};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use crossbeam_skiplist::SkipMap;
use crossbeam_utils::CachePadded;
use memmap2::Mmap;
use parking_lot::RwLock;
use crate::storage::flush::FlushPolicy;
use crate::storage::format::{self, RecordView};
#[cfg(feature = "encrypt")]
use crate::storage::format::{OwnedRecord, NONCE_LEN};
use crate::storage::index::Index;
#[cfg(feature = "encrypt")]
use crate::storage::meta::FLAG_CIPHER_CHACHA20;
use crate::storage::meta::{self, MetaHeader, FLAG_ENCRYPTED};
use crate::storage::store::Store;
use crate::{Error, Result};
/// Id of the implicit, unnamed namespace that always exists in an open engine.
pub(crate) const DEFAULT_NAMESPACE_ID: u32 = 0_u32;
/// In-memory state kept for one namespace: the hash index, a live-record
/// counter and, when range scans are enabled, an ordered key → offset map.
struct NamespaceRuntime {
    /// Hash index mapping keys to record offsets.
    index: Index,
    /// Number of live keys; padded to its own cache line.
    record_count: CachePadded<AtomicU64>,
    /// Ordered key → offset map; `None` unless range scans were enabled.
    range_index: Option<Arc<SkipMap<Vec<u8>, u64>>>,
}

impl NamespaceRuntime {
    /// Builds an empty runtime; the ordered map is only allocated when
    /// `range_scans_enabled` is set.
    fn new(range_scans_enabled: bool) -> Self {
        let index = Index::new();
        let record_count = CachePadded::new(AtomicU64::new(0));
        let range_index = if range_scans_enabled {
            Some(Arc::new(SkipMap::new()))
        } else {
            None
        };
        Self {
            index,
            record_count,
            range_index,
        }
    }
}

impl std::fmt::Debug for NamespaceRuntime {
    /// Shows only the live-record count under the `len` label.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let live = self.record_count.load(Ordering::Acquire);
        let mut out = f.debug_struct("NamespaceRuntime");
        out.field("len", &live);
        out.finish()
    }
}
/// Encryption state shared by the engine. Without the `encrypt` feature there
/// is nothing to share, so the alias collapses to the unit type.
#[cfg(not(feature = "encrypt"))]
pub(crate) type SharedEncryption = ();
/// Encryption state shared by the engine: `None` for an unencrypted database,
/// otherwise the context used to seal and open every record.
#[cfg(feature = "encrypt")]
pub(crate) type SharedEncryption = Option<Arc<crate::encryption::EncryptionContext>>;
#[derive(Debug, Clone)]
pub(crate) struct EngineConfig {
pub(crate) path: PathBuf,
pub(crate) flags: u32,
pub(crate) enable_range_scans: bool,
pub(crate) flush_policy: FlushPolicy,
pub(crate) iouring_sqpoll_idle_ms: Option<u32>,
#[cfg(feature = "encrypt")]
pub(crate) encryption_key: Option<crate::encryption::KeyBytes>,
#[cfg(feature = "encrypt")]
pub(crate) cipher: Option<crate::encryption::Cipher>,
#[cfg(feature = "encrypt")]
pub(crate) encryption_passphrase: Option<String>,
}
impl Default for EngineConfig {
fn default() -> Self {
Self {
path: PathBuf::new(),
flags: 0,
enable_range_scans: false,
flush_policy: FlushPolicy::default(),
iouring_sqpoll_idle_ms: None,
#[cfg(feature = "encrypt")]
encryption_key: None,
#[cfg(feature = "encrypt")]
cipher: None,
#[cfg(feature = "encrypt")]
encryption_passphrase: None,
}
}
}
/// Key-value engine: a journal-backed `Store` plus per-namespace in-memory indexes.
pub(crate) struct Engine {
    /// Underlying journal / mmap store.
    store: Arc<Store>,
    /// Runtime state keyed by namespace id (the default namespace is id 0).
    namespaces: RwLock<HashMap<u32, Arc<NamespaceRuntime>>>,
    /// Names of the non-default namespaces, mapped to their ids.
    namespace_names: RwLock<HashMap<String, u32>>,
    /// Next id to hand out to a newly created namespace.
    next_namespace_id: AtomicU64,
    /// Whether new namespace runtimes get an ordered range index.
    range_scans_enabled: bool,
    /// Encryption context; `None` when the database is not encrypted.
    #[cfg(feature = "encrypt")]
    encryption: SharedEncryption,
}

impl std::fmt::Debug for Engine {
    /// Shows only the store; the index state is intentionally left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Engine");
        builder.field("store", &self.store);
        builder.finish()
    }
}
/// Owned copy of one live record: `(key, value, expires_at)`.
pub(crate) type RecordSnapshot = (Vec<u8>, Vec<u8>, u64);
/// Outcome of reconciling the encryption options with the on-disk header:
/// the key to use (if any), a freshly generated salt that still has to be
/// persisted (only for a brand-new passphrase database), and the chosen cipher.
#[cfg(feature = "encrypt")]
type ResolvedEncryption = (
    Option<crate::encryption::KeyBytes>,
    Option<[u8; meta::META_SALT_LEN]>,
    Option<crate::encryption::Cipher>,
);
/// Index mutation derived from one journal record during recovery, with the
/// key/name bytes already decrypted (if needed) and copied out.
enum RecoveryAction {
    /// `key` was written into namespace `ns_id`.
    Insert { ns_id: u32, key: Vec<u8> },
    /// `key` was deleted from namespace `ns_id`.
    Remove { ns_id: u32, key: Vec<u8> },
    /// Namespace `ns_id` was given the (expected UTF-8) name `name`.
    NamespaceName { ns_id: u32, name: Vec<u8> },
}
impl Engine {
pub(crate) fn open(config: EngineConfig) -> Result<Self> {
#[cfg(feature = "encrypt")]
let (resolved_key, fresh_salt, resolved_cipher) = Self::resolve_encryption(&config)?;
#[cfg(feature = "encrypt")]
let flags = {
let mut f = config.flags;
if resolved_key.is_some() {
f |= FLAG_ENCRYPTED;
if let Some(crate::encryption::Cipher::ChaCha20Poly1305) = resolved_cipher {
f |= FLAG_CIPHER_CHACHA20;
}
}
f
};
#[cfg(not(feature = "encrypt"))]
let flags = config.flags;
let store = Arc::new(Store::open_with_policy(
config.path.clone(),
flags,
config.flush_policy,
config.iouring_sqpoll_idle_ms,
)?);
let header = store.header()?;
#[cfg(feature = "encrypt")]
let encryption: SharedEncryption = match resolved_key {
None => None,
Some(key) => {
let cipher = resolved_cipher
.or_else(|| Some(Self::cipher_from_flags(header.flags)))
.unwrap_or(crate::encryption::Cipher::Aes256Gcm);
let ctx = crate::encryption::EncryptionContext::from_key_with_cipher(&key, cipher);
let arc = Arc::new(ctx);
Self::handle_verification(&store, &arc, fresh_salt, &header)?;
Some(arc)
}
};
#[cfg(not(feature = "encrypt"))]
if header.flags & FLAG_ENCRYPTED != 0 {
return Err(Error::InvalidConfig(
"this database was created with encryption; rebuild with the `encrypt` feature",
));
}
let range_scans_enabled = config.enable_range_scans;
let engine = Self {
store,
namespaces: RwLock::new(HashMap::new()),
namespace_names: RwLock::new(HashMap::new()),
next_namespace_id: AtomicU64::new(1),
range_scans_enabled,
#[cfg(feature = "encrypt")]
encryption,
};
{
let mut guard = engine.namespaces.write();
let _existing = guard.insert(
DEFAULT_NAMESPACE_ID,
Arc::new(NamespaceRuntime::new(range_scans_enabled)),
);
}
engine.recovery_scan()?;
Ok(engine)
}
#[cfg(feature = "encrypt")]
fn resolve_encryption(config: &EngineConfig) -> Result<ResolvedEncryption> {
if config.encryption_key.is_some() && config.encryption_passphrase.is_some() {
return Err(Error::InvalidConfig(
"encryption_key and encryption_passphrase are mutually exclusive — pick one",
));
}
let peeked = peek_header(&config.path)?;
let on_disk_cipher = peeked.map(|h| Self::cipher_from_flags(h.flags));
let cipher = match (config.cipher, on_disk_cipher) {
(Some(requested), Some(disk)) if requested != disk => {
return Err(Error::InvalidConfig(
"EmdbBuilder::cipher disagrees with the cipher this database was created with",
));
}
(Some(requested), _) => Some(requested),
(None, Some(disk)) => Some(disk),
(None, None) => None,
};
if let Some(passphrase) = config.encryption_passphrase.as_ref() {
let (salt, fresh) = match peeked {
Some(header) => {
if header.encryption_salt == [0_u8; meta::META_SALT_LEN] {
return Err(Error::InvalidConfig(
"this database was created with a raw encryption_key; supply via encryption_key, not encryption_passphrase",
));
}
(header.encryption_salt, None)
}
None => {
let s = crate::encryption::random_salt();
(s, Some(s))
}
};
let derived = crate::encryption::derive_key_from_passphrase(passphrase, &salt)?;
return Ok((Some(derived), fresh, cipher));
}
if let Some(key) = config.encryption_key.as_ref() {
if let Some(header) = peeked {
if header.encryption_salt != [0_u8; meta::META_SALT_LEN] {
return Err(Error::InvalidConfig(
"this database was created with an encryption_passphrase; supply via encryption_passphrase, not encryption_key",
));
}
}
return Ok((Some(key.clone()), None, cipher));
}
if let Some(header) = peeked {
if header.flags & FLAG_ENCRYPTED != 0 {
return Err(Error::InvalidConfig(
"this database was created with at-rest encryption; supply encryption_key or encryption_passphrase",
));
}
}
Ok((None, None, None))
}
#[cfg(feature = "encrypt")]
fn cipher_from_flags(flags: u32) -> crate::encryption::Cipher {
if flags & FLAG_CIPHER_CHACHA20 != 0 {
crate::encryption::Cipher::ChaCha20Poly1305
} else {
crate::encryption::Cipher::Aes256Gcm
}
}
#[cfg(feature = "encrypt")]
fn handle_verification(
store: &Store,
ctx: &Arc<crate::encryption::EncryptionContext>,
fresh_salt: Option<[u8; meta::META_SALT_LEN]>,
existing_header: &MetaHeader,
) -> Result<()> {
if existing_header.encryption_verify == [0_u8; meta::META_VERIFY_LEN] {
let salt = fresh_salt.unwrap_or([0_u8; meta::META_SALT_LEN]);
let nonce_then_ct = ctx.encrypt(crate::encryption::VERIFICATION_PLAINTEXT)?;
debug_assert_eq!(nonce_then_ct.len(), meta::META_VERIFY_LEN);
let mut verify = [0_u8; meta::META_VERIFY_LEN];
verify.copy_from_slice(&nonce_then_ct);
store.set_encryption_metadata(salt, verify)?;
return Ok(());
}
let plaintext = ctx.decrypt(&existing_header.encryption_verify)?;
if plaintext.as_slice() != crate::encryption::VERIFICATION_PLAINTEXT {
return Err(Error::EncryptionKeyMismatch);
}
Ok(())
}
fn recovery_scan(&self) -> Result<()> {
let mut reader = self.store.open_reader()?;
let _ = reader.advise_sequential();
let iter = reader.iter();
for record_result in iter {
let record = record_result
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys reader: {err}"))))?;
let payload_start =
record.lsn.as_u64() + crate::storage::store::Store::pre_payload_bytes();
self.apply_recovered_payload(&record.payload, payload_start)?;
}
Ok(())
}
fn apply_recovered_payload(&self, payload: &[u8], payload_start: u64) -> Result<()> {
if payload.is_empty() {
return Err(Error::Corrupted {
offset: payload_start,
reason: "empty record payload during recovery",
});
}
let tag = payload[0];
let encrypted = (tag & format::TAG_ENCRYPTED_FLAG) != 0;
let action = if encrypted {
#[cfg(feature = "encrypt")]
{
let ctx = match self.encryption.as_ref() {
Some(c) => Arc::clone(c),
None => {
return Err(Error::InvalidConfig(
"encrypted record encountered while opening unencrypted database",
));
}
};
let owned = format::decode_payload_encrypted(payload, |nonce, ct| {
let mut input = Vec::with_capacity(NONCE_LEN + ct.len());
input.extend_from_slice(nonce);
input.extend_from_slice(ct);
ctx.decrypt(&input)
})?;
match owned {
OwnedRecord::Insert { ns_id, key, .. } => RecoveryAction::Insert { ns_id, key },
OwnedRecord::Remove { ns_id, key } => RecoveryAction::Remove { ns_id, key },
OwnedRecord::NamespaceName { ns_id, name } => {
RecoveryAction::NamespaceName { ns_id, name }
}
}
}
#[cfg(not(feature = "encrypt"))]
{
let _ = payload_start;
return Err(Error::InvalidConfig(
"encrypted record present but the `encrypt` feature is not compiled in",
));
}
} else {
match format::decode_payload(payload)? {
RecordView::Insert { ns_id, key, .. } => RecoveryAction::Insert {
ns_id,
key: key.to_vec(),
},
RecordView::Remove { ns_id, key } => RecoveryAction::Remove {
ns_id,
key: key.to_vec(),
},
RecordView::NamespaceName { ns_id, name } => RecoveryAction::NamespaceName {
ns_id,
name: name.to_vec(),
},
}
};
self.apply_recovered_action(action, payload_start)
}
fn apply_recovered_action(&self, action: RecoveryAction, offset: u64) -> Result<()> {
match action {
RecoveryAction::Insert { ns_id, key } => {
let ns = self.ensure_namespace_runtime(ns_id)?;
let key_hash = Index::hash_key(&key);
let prev = ns
.index
.replace(key_hash, &key, offset, |off| self.key_at_offset(off))?;
if prev.is_none() {
let _ = ns.record_count.fetch_add(1, Ordering::AcqRel);
}
if let Some(range_map) = ns.range_index.as_ref() {
let _ = range_map.insert(key, offset);
}
}
RecoveryAction::Remove { ns_id, key } => {
let ns = self.ensure_namespace_runtime(ns_id)?;
let key_hash = Index::hash_key(&key);
if ns.index.remove(key_hash, &key)?.is_some() {
let _ = ns.record_count.fetch_sub(1, Ordering::AcqRel);
}
if let Some(range_map) = ns.range_index.as_ref() {
let _ = range_map.remove(&key);
}
}
RecoveryAction::NamespaceName { ns_id, name } => {
if ns_id == DEFAULT_NAMESPACE_ID || name.is_empty() {
return Ok(());
}
let name_str = match std::str::from_utf8(&name) {
Ok(s) => s.to_string(),
Err(_) => {
return Err(Error::Corrupted {
offset,
reason: "namespace-name record carried non-UTF-8 name",
});
}
};
let _ = self.ensure_namespace_runtime(ns_id)?;
let mut name_guard = self.namespace_names.write();
let _existing = name_guard.insert(name_str, ns_id);
drop(name_guard);
if ns_id as u64 >= self.next_namespace_id.load(Ordering::Acquire) {
self.next_namespace_id
.store(ns_id as u64 + 1, Ordering::Release);
}
}
}
Ok(())
}
fn key_at_offset(&self, offset: u64) -> Result<Option<Vec<u8>>> {
let mmap = self.store.mmap_covering(offset + 1)?;
let bytes: &[u8] = &mmap;
let payload = match format::payload_at(bytes, offset as usize) {
Ok(p) => p,
Err(_) => return Ok(None),
};
#[cfg(feature = "encrypt")]
if let Some(ctx) = self.encryption.as_ref() {
let ctx = Arc::clone(ctx);
let owned = format::decode_payload_encrypted(payload, |nonce, ct| {
let mut input = Vec::with_capacity(NONCE_LEN + ct.len());
input.extend_from_slice(nonce);
input.extend_from_slice(ct);
ctx.decrypt(&input)
})?;
return Ok(match owned {
OwnedRecord::Insert { key, .. } => Some(key),
_ => None,
});
}
Ok(match format::decode_payload(payload)? {
RecordView::Insert { key, .. } => Some(key.to_vec()),
_ => None,
})
}
fn ensure_namespace_runtime(&self, ns_id: u32) -> Result<Arc<NamespaceRuntime>> {
{
let guard = self.namespaces.read();
if let Some(ns) = guard.get(&ns_id) {
return Ok(Arc::clone(ns));
}
}
let mut guard = self.namespaces.write();
let range_scans = self.range_scans_enabled;
let entry = guard
.entry(ns_id)
.or_insert_with(|| Arc::new(NamespaceRuntime::new(range_scans)));
if ns_id as u64 >= self.next_namespace_id.load(Ordering::Acquire) {
self.next_namespace_id
.store(ns_id as u64 + 1, Ordering::Release);
}
Ok(Arc::clone(entry))
}
fn namespace(&self, ns_id: u32) -> Result<Arc<NamespaceRuntime>> {
self.namespaces
.read()
.get(&ns_id)
.map(Arc::clone)
.ok_or(Error::InvalidConfig("unknown namespace id"))
}
pub(crate) fn insert(
&self,
ns_id: u32,
key: &[u8],
value: &[u8],
expires_at: u64,
) -> Result<()> {
let ns = self.namespace(ns_id)?;
let key_hash = Index::hash_key(key);
let offset = self.append_insert(ns_id, key, value, expires_at)?;
let prev = ns
.index
.replace(key_hash, key, offset, |off| self.key_at_offset(off))?;
if prev.is_none() {
let _ = ns.record_count.fetch_add(1, Ordering::AcqRel);
}
if let Some(range_map) = ns.range_index.as_ref() {
let _ = range_map.insert(key.to_vec(), offset);
}
Ok(())
}
pub(crate) fn insert_many(
&self,
ns_id: u32,
items: impl IntoIterator<Item = (Vec<u8>, Vec<u8>, u64)>,
) -> Result<()> {
let ns = self.namespace(ns_id)?;
let items: Vec<(Vec<u8>, Vec<u8>, u64)> = items.into_iter().collect();
if items.is_empty() {
return Ok(());
}
#[cfg(feature = "encrypt")]
let encryption = self.encryption.clone();
let mut payloads: Vec<Vec<u8>> = Vec::with_capacity(items.len());
for (key, value, expires_at) in &items {
let payload: Vec<u8> = {
#[cfg(feature = "encrypt")]
{
if let Some(ctx) = encryption.as_ref() {
let mut plain = Vec::with_capacity(20 + key.len() + value.len());
format::encode_insert_body(&mut plain, ns_id, key, value, *expires_at);
let nonce_then_ct = ctx.encrypt(&plain)?;
let mut frame = Vec::with_capacity(1 + nonce_then_ct.len());
frame.push(format::TAG_INSERT | format::TAG_ENCRYPTED_FLAG);
frame.extend_from_slice(&nonce_then_ct);
frame
} else {
let mut frame = Vec::with_capacity(1 + 20 + key.len() + value.len());
frame.push(format::TAG_INSERT);
format::encode_insert_body(&mut frame, ns_id, key, value, *expires_at);
frame
}
}
#[cfg(not(feature = "encrypt"))]
{
let mut frame = Vec::with_capacity(1 + 20 + key.len() + value.len());
frame.push(format::TAG_INSERT);
format::encode_insert_body(&mut frame, ns_id, key, value, *expires_at);
frame
}
};
payloads.push(payload);
}
let offsets = self
.store
.append_batch(payloads.iter().map(Vec::as_slice))?;
let range_map = ns.range_index.as_ref();
for ((key, _value, _exp), offset) in items.iter().zip(offsets.iter()) {
let key_hash = Index::hash_key(key);
let prev = ns
.index
.replace(key_hash, key, *offset, |off| self.key_at_offset(off))?;
if prev.is_none() {
let _ = ns.record_count.fetch_add(1, Ordering::AcqRel);
}
if let Some(range_map) = range_map {
let _ = range_map.insert(key.clone(), *offset);
}
}
Ok(())
}
fn append_insert(&self, ns_id: u32, key: &[u8], value: &[u8], expires_at: u64) -> Result<u64> {
#[cfg(feature = "encrypt")]
if let Some(ctx) = self.encryption.as_ref() {
let mut payload = Vec::with_capacity(20 + key.len() + value.len());
format::encode_insert_body(&mut payload, ns_id, key, value, expires_at);
let nonce_then_ct = ctx.encrypt(&payload)?;
return self.store.append_with(|buf| {
buf.push(format::TAG_INSERT | format::TAG_ENCRYPTED_FLAG);
buf.extend_from_slice(&nonce_then_ct);
Ok(())
});
}
self.store.append_with(|buf| {
buf.push(format::TAG_INSERT);
format::encode_insert_body(buf, ns_id, key, value, expires_at);
Ok(())
})
}
fn append_remove(&self, ns_id: u32, key: &[u8]) -> Result<u64> {
#[cfg(feature = "encrypt")]
if let Some(ctx) = self.encryption.as_ref() {
let mut payload = Vec::with_capacity(8 + key.len());
format::encode_remove_body(&mut payload, ns_id, key);
let nonce_then_ct = ctx.encrypt(&payload)?;
return self.store.append_with(|buf| {
buf.push(format::TAG_REMOVE | format::TAG_ENCRYPTED_FLAG);
buf.extend_from_slice(&nonce_then_ct);
Ok(())
});
}
self.store.append_with(|buf| {
buf.push(format::TAG_REMOVE);
format::encode_remove_body(buf, ns_id, key);
Ok(())
})
}
pub(crate) fn get(&self, ns_id: u32, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.get_with_meta(ns_id, key)?.map(|(v, _)| v))
}
pub(crate) fn get_zerocopy(
&self,
ns_id: u32,
key: &[u8],
) -> Result<Option<(crate::ValueRef, u64)>> {
let ns = self.namespace(ns_id)?;
let key_hash = Index::hash_key(key);
let offset = match ns.index.get(key_hash, key)? {
Some(o) => o,
None => return Ok(None),
};
self.read_zerocopy_at(offset, key)
}
fn read_zerocopy_at(
&self,
offset: u64,
expected_key: &[u8],
) -> Result<Option<(crate::ValueRef, u64)>> {
let mmap = self.store.mmap_covering(offset + 1)?;
let bytes: &[u8] = &mmap;
let payload = match format::payload_at(bytes, offset as usize) {
Ok(p) => p,
Err(_) => return Ok(None),
};
#[cfg(feature = "encrypt")]
if let Some(ctx) = self.encryption.as_ref() {
let ctx = Arc::clone(ctx);
let owned = format::decode_payload_encrypted(payload, |nonce, ct| {
let mut input = Vec::with_capacity(NONCE_LEN + ct.len());
input.extend_from_slice(nonce);
input.extend_from_slice(ct);
ctx.decrypt(&input)
})?;
return Ok(match owned {
OwnedRecord::Insert {
key,
value,
expires_at,
..
} => {
if key.as_slice() == expected_key {
Some((crate::ValueRef::from_owned(value), expires_at))
} else {
None
}
}
_ => None,
});
}
let (value_range, expires_at) = match format::decode_payload(payload)? {
RecordView::Insert {
key,
value,
expires_at,
..
} => {
if key != expected_key {
return Ok(None);
}
let base = bytes.as_ptr() as usize;
let val_start = value.as_ptr() as usize - base;
let val_end = val_start + value.len();
(val_start..val_end, expires_at)
}
_ => return Ok(None),
};
Ok(Some((
crate::ValueRef::from_mmap(mmap, value_range),
expires_at,
)))
}
pub(crate) fn get_with_meta(&self, ns_id: u32, key: &[u8]) -> Result<Option<(Vec<u8>, u64)>> {
let ns = self.namespace(ns_id)?;
let key_hash = Index::hash_key(key);
let offset = match ns.index.get(key_hash, key)? {
Some(o) => o,
None => return Ok(None),
};
self.read_value_at(offset, key)
}
fn read_value_at(&self, offset: u64, expected_key: &[u8]) -> Result<Option<(Vec<u8>, u64)>> {
let mmap = self.store.mmap_covering(offset + 1)?;
let bytes: &[u8] = &mmap;
#[cfg(feature = "encrypt")]
if let Some(ctx) = self.encryption.as_ref() {
let payload = match format::payload_at(bytes, offset as usize) {
Ok(p) => p,
Err(_) => return Ok(None),
};
let ctx = Arc::clone(ctx);
let owned = format::decode_payload_encrypted(payload, |nonce, ct| {
let mut input = Vec::with_capacity(NONCE_LEN + ct.len());
input.extend_from_slice(nonce);
input.extend_from_slice(ct);
ctx.decrypt(&input)
})?;
return Ok(match owned {
OwnedRecord::Insert {
key,
value,
expires_at,
..
} => {
if key.as_slice() == expected_key {
Some((value, expires_at))
} else {
None
}
}
_ => None,
});
}
let payload = match format::payload_at(bytes, offset as usize) {
Ok(p) => p,
Err(_) => return Ok(None),
};
Ok(match format::decode_payload(payload)? {
RecordView::Insert {
key,
value,
expires_at,
..
} => {
if key == expected_key {
Some((value.to_vec(), expires_at))
} else {
None
}
}
_ => None,
})
}
pub(crate) fn remove(&self, ns_id: u32, key: &[u8]) -> Result<Option<Vec<u8>>> {
let prev = self.get(ns_id, key)?;
if prev.is_some() {
let _offset = self.append_remove(ns_id, key)?;
let ns = self.namespace(ns_id)?;
let key_hash = Index::hash_key(key);
if ns.index.remove(key_hash, key)?.is_some() {
let _ = ns.record_count.fetch_sub(1, Ordering::AcqRel);
}
if let Some(range_map) = ns.range_index.as_ref() {
let _ = range_map.remove(key);
}
}
Ok(prev)
}
pub(crate) fn record_count(&self, ns_id: u32) -> Result<u64> {
let ns = self.namespace(ns_id)?;
Ok(ns.record_count.load(Ordering::Acquire))
}
pub(crate) fn flush(&self) -> Result<()> {
self.store.flush()
}
pub(crate) fn checkpoint(&self) -> Result<()> {
self.store.persist_meta()
}
pub(crate) fn stats(&self) -> Result<crate::EmdbStats> {
let mut live_records: u64 = 0;
let mut named_namespace_count: usize = 0;
{
let guard = self.namespaces.read();
for (ns_id, ns) in guard.iter() {
live_records = live_records.saturating_add(ns.record_count.load(Ordering::Acquire));
if *ns_id != DEFAULT_NAMESPACE_ID {
named_namespace_count += 1;
}
}
}
let logical_size_bytes = self.store.tail();
let file_size_bytes = std::fs::metadata(self.store.path())
.map(|m| m.len())
.unwrap_or(logical_size_bytes);
let preallocated_bytes = file_size_bytes.saturating_sub(logical_size_bytes);
let header = self.store.header()?;
let encrypted = (header.flags & meta::FLAG_ENCRYPTED) != 0;
Ok(crate::EmdbStats {
live_records,
namespace_count: named_namespace_count,
logical_size_bytes,
file_size_bytes,
preallocated_bytes,
range_scans_enabled: self.range_scans_enabled,
encrypted,
})
}
pub(crate) fn compact_in_place(&self) -> Result<()> {
let namespaces: Vec<(u32, String)> = self.list_namespaces()?;
let mut snapshots: Vec<(u32, String, Vec<RecordSnapshot>)> =
Vec::with_capacity(namespaces.len());
for (ns_id, name) in &namespaces {
let records = self.collect_records(*ns_id)?;
snapshots.push((*ns_id, name.clone(), records));
}
let path = self.store.path().to_path_buf();
let tmp_path = compaction_temp_path(&path);
let _ = std::fs::remove_file(&tmp_path);
let header = self.store.header()?;
if let Err(err) = self.write_compacted_file(&tmp_path, &header, &snapshots) {
let _ = std::fs::remove_file(&tmp_path);
return Err(err);
}
if let Err(err) = self.store.swap_underlying(&tmp_path) {
let _ = std::fs::remove_file(&tmp_path);
return Err(err);
}
for (ns_id, _) in &namespaces {
let ns = self.namespace(*ns_id)?;
ns.index.clear()?;
ns.record_count.store(0, Ordering::Release);
if let Some(range_map) = ns.range_index.as_ref() {
clear_skipmap(range_map);
}
}
self.recovery_scan()?;
Ok(())
}
pub(crate) fn backup_to(&self, target: &std::path::Path) -> Result<()> {
let source_path = self.store.path().to_path_buf();
let target_canonical = match target.canonicalize() {
Ok(p) => p,
Err(_) => target.to_path_buf(),
};
if let Ok(source_canonical) = source_path.canonicalize() {
if target_canonical == source_canonical || target == source_path {
return Err(Error::InvalidConfig(
"backup target must differ from the source database path",
));
}
} else if target == source_path {
return Err(Error::InvalidConfig(
"backup target must differ from the source database path",
));
}
let namespaces: Vec<(u32, String)> = self.list_namespaces()?;
let mut snapshots: Vec<(u32, String, Vec<RecordSnapshot>)> =
Vec::with_capacity(namespaces.len());
for (ns_id, name) in &namespaces {
let records = self.collect_records(*ns_id)?;
snapshots.push((*ns_id, name.clone(), records));
}
let mut tmp_path = target.to_path_buf();
let original_name = target
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("emdb-backup");
tmp_path.set_file_name(format!("{original_name}.backup.tmp"));
let _ = std::fs::remove_file(&tmp_path);
let header = self.store.header()?;
if let Err(err) = self.write_compacted_file(&tmp_path, &header, &snapshots) {
let _ = std::fs::remove_file(&tmp_path);
return Err(err);
}
if target.exists() {
std::fs::remove_file(target)?;
}
if let Err(err) = std::fs::rename(&tmp_path, target) {
let _ = std::fs::remove_file(&tmp_path);
return Err(Error::from(err));
}
Ok(())
}
fn write_compacted_file(
&self,
path: &std::path::Path,
header_template: &MetaHeader,
snapshots: &[(u32, String, Vec<RecordSnapshot>)],
) -> Result<()> {
let _ = std::fs::remove_file(path);
let _ = std::fs::remove_file(meta::meta_path_for(path));
let fs = fsys::builder()
.tune_for(fsys::Workload::Database)
.build()
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys init: {err}"))))?;
meta::write_with(&fs, path, header_template)?;
let journal_opts =
fsys::JournalOptions::new().write_lifetime_hint(Some(fsys::WriteLifetimeHint::Long));
let journal = fs
.journal_with(path, journal_opts)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys journal: {err}"))))?;
let mut payloads: Vec<Vec<u8>> = Vec::new();
for (ns_id, name, _) in snapshots {
if *ns_id == DEFAULT_NAMESPACE_ID || name.is_empty() {
continue;
}
payloads.push(self.encode_namespace_name_payload(*ns_id, name.as_bytes())?);
}
for (ns_id, _, records) in snapshots {
for (key, value, expires_at) in records {
payloads.push(self.encode_insert_payload(*ns_id, key, value, *expires_at)?);
}
}
if !payloads.is_empty() {
let refs: Vec<&[u8]> = payloads.iter().map(Vec::as_slice).collect();
let _ = journal.append_batch(&refs).map_err(|err| {
Error::Io(std::io::Error::other(format!("fsys append_batch: {err}")))
})?;
}
let target = journal.next_lsn();
journal
.sync_through(target)
.map_err(|err| Error::Io(std::io::Error::other(format!("fsys sync: {err}"))))?;
Ok(())
}
fn encode_namespace_name_payload(&self, ns_id: u32, name: &[u8]) -> Result<Vec<u8>> {
#[cfg(feature = "encrypt")]
if let Some(ctx) = self.encryption.as_ref() {
let mut body = Vec::with_capacity(8 + name.len());
format::encode_namespace_name_body(&mut body, ns_id, name);
let nonce_then_ct = ctx.encrypt(&body)?;
let mut payload = Vec::with_capacity(1 + nonce_then_ct.len());
payload.push(format::TAG_NAMESPACE_NAME | format::TAG_ENCRYPTED_FLAG);
payload.extend_from_slice(&nonce_then_ct);
return Ok(payload);
}
let mut payload = Vec::with_capacity(1 + 8 + name.len());
payload.push(format::TAG_NAMESPACE_NAME);
format::encode_namespace_name_body(&mut payload, ns_id, name);
Ok(payload)
}
fn encode_insert_payload(
&self,
ns_id: u32,
key: &[u8],
value: &[u8],
expires_at: u64,
) -> Result<Vec<u8>> {
#[cfg(feature = "encrypt")]
if let Some(ctx) = self.encryption.as_ref() {
let mut body = Vec::with_capacity(20 + key.len() + value.len());
format::encode_insert_body(&mut body, ns_id, key, value, expires_at);
let nonce_then_ct = ctx.encrypt(&body)?;
let mut payload = Vec::with_capacity(1 + nonce_then_ct.len());
payload.push(format::TAG_INSERT | format::TAG_ENCRYPTED_FLAG);
payload.extend_from_slice(&nonce_then_ct);
return Ok(payload);
}
let mut payload = Vec::with_capacity(1 + 20 + key.len() + value.len());
payload.push(format::TAG_INSERT);
format::encode_insert_body(&mut payload, ns_id, key, value, expires_at);
Ok(payload)
}
pub(crate) fn path(&self) -> &std::path::Path {
self.store.path()
}
pub(crate) fn clear_namespace(&self, ns_id: u32) -> Result<()> {
let ns = self.namespace(ns_id)?;
ns.index.clear()?;
ns.record_count.store(0, Ordering::Release);
if let Some(range_map) = ns.range_index.as_ref() {
clear_skipmap(range_map);
}
Ok(())
}
pub(crate) fn range_scan<R>(&self, ns_id: u32, range: R) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
R: RangeBounds<Vec<u8>>,
{
let ns = self.namespace(ns_id)?;
let range_map = ns.range_index.as_ref().ok_or(Error::InvalidConfig(
"range scans not enabled; pass `EmdbBuilder::enable_range_scans(true)` at open time",
))?;
let pairs: Vec<(Vec<u8>, u64)> = skipmap_range_snapshot(range_map, &range);
let mut out = Vec::with_capacity(pairs.len());
for (key, offset) in pairs {
if let Some((value, _expires)) = self.read_value_at(offset, &key)? {
out.push((key, value));
}
}
Ok(out)
}
pub(crate) fn snapshot_offsets(&self, ns_id: u32) -> Result<Vec<u64>> {
let ns = self.namespace(ns_id)?;
let mut offsets = ns.index.collect_offsets()?;
offsets.sort_unstable();
Ok(offsets)
}
pub(crate) fn snapshot_range_offsets<R>(
&self,
ns_id: u32,
range: R,
) -> Result<Vec<(Vec<u8>, u64)>>
where
R: RangeBounds<Vec<u8>>,
{
let ns = self.namespace(ns_id)?;
let range_map = ns.range_index.as_ref().ok_or(Error::InvalidConfig(
"range scans not enabled; pass `EmdbBuilder::enable_range_scans(true)` at open time",
))?;
Ok(skipmap_range_snapshot(range_map, &range))
}
pub(crate) fn decode_owned_at(&self, offset: u64) -> Result<Option<RecordSnapshot>> {
let mmap = self.store.mmap_covering(offset + 1)?;
let bytes: &[u8] = &mmap;
let payload = match format::payload_at(bytes, offset as usize) {
Ok(p) => p,
Err(_) => return Ok(None),
};
#[cfg(feature = "encrypt")]
if let Some(ctx) = self.encryption.as_ref() {
let ctx = Arc::clone(ctx);
let owned = format::decode_payload_encrypted(payload, |nonce, ct| {
let mut input = Vec::with_capacity(NONCE_LEN + ct.len());
input.extend_from_slice(nonce);
input.extend_from_slice(ct);
ctx.decrypt(&input)
})?;
return Ok(match owned {
OwnedRecord::Insert {
key,
value,
expires_at,
..
} => Some((key, value, expires_at)),
_ => None,
});
}
Self::decode_plaintext_into_triple(payload)
}
pub(crate) fn read_value_with_meta_at(
&self,
offset: u64,
expected_key: &[u8],
) -> Result<Option<(Vec<u8>, u64)>> {
self.read_value_at(offset, expected_key)
}
pub(crate) fn collect_records(&self, ns_id: u32) -> Result<Vec<RecordSnapshot>> {
let ns = self.namespace(ns_id)?;
let mut offsets = ns.index.collect_offsets()?;
offsets.sort_unstable();
let mut out = Vec::with_capacity(offsets.len());
let mmap = self.store.mmap_covering(self.store.tail())?;
let bytes: &[u8] = &mmap;
for offset in offsets {
let payload = match format::payload_at(bytes, offset as usize) {
Ok(p) => p,
Err(_) => continue,
};
#[cfg(feature = "encrypt")]
let triple = if let Some(ctx) = self.encryption.as_ref() {
let ctx = Arc::clone(ctx);
match format::decode_payload_encrypted(payload, |nonce, ct| {
let mut input = Vec::with_capacity(NONCE_LEN + ct.len());
input.extend_from_slice(nonce);
input.extend_from_slice(ct);
ctx.decrypt(&input)
})? {
OwnedRecord::Insert {
key,
value,
expires_at,
..
} => Some((key, value, expires_at)),
_ => None,
}
} else {
Self::decode_plaintext_into_triple(payload)?
};
#[cfg(not(feature = "encrypt"))]
let triple = Self::decode_plaintext_into_triple(payload)?;
if let Some(t) = triple {
out.push(t);
}
}
Ok(out)
}
fn decode_plaintext_into_triple(payload: &[u8]) -> Result<Option<RecordSnapshot>> {
Ok(match format::decode_payload(payload)? {
RecordView::Insert {
key,
value,
expires_at,
..
} => Some((key.to_vec(), value.to_vec(), expires_at)),
_ => None,
})
}
pub(crate) fn create_or_open_namespace(&self, name: &str) -> Result<u32> {
if name.is_empty() {
return Err(Error::InvalidConfig(
"namespace name must be non-empty (default namespace is implicit)",
));
}
{
let guard = self.namespace_names.read();
if let Some(id) = guard.get(name) {
return Ok(*id);
}
}
let mut name_guard = self.namespace_names.write();
if let Some(id) = name_guard.get(name) {
return Ok(*id);
}
let id = self.next_namespace_id.fetch_add(1, Ordering::AcqRel) as u32;
let _record_offset = self.append_namespace_name(id, name)?;
let _ = name_guard.insert(name.to_string(), id);
let mut runtimes = self.namespaces.write();
let _ = runtimes.insert(
id,
Arc::new(NamespaceRuntime::new(self.range_scans_enabled)),
);
Ok(id)
}
fn append_namespace_name(&self, ns_id: u32, name: &str) -> Result<u64> {
#[cfg(feature = "encrypt")]
if let Some(ctx) = self.encryption.as_ref() {
let mut payload = Vec::with_capacity(8 + name.len());
format::encode_namespace_name_body(&mut payload, ns_id, name.as_bytes());
let nonce_then_ct = ctx.encrypt(&payload)?;
return self.store.append_with(|buf| {
buf.push(format::TAG_NAMESPACE_NAME | format::TAG_ENCRYPTED_FLAG);
buf.extend_from_slice(&nonce_then_ct);
Ok(())
});
}
self.store.append_with(|buf| {
buf.push(format::TAG_NAMESPACE_NAME);
format::encode_namespace_name_body(buf, ns_id, name.as_bytes());
Ok(())
})
}
pub(crate) fn drop_namespace(&self, name: &str) -> Result<bool> {
if name.is_empty() {
return Err(Error::InvalidConfig("default namespace cannot be dropped"));
}
let mut name_guard = self.namespace_names.write();
let id = match name_guard.remove(name) {
Some(id) => id,
None => return Ok(false),
};
let mut runtimes = self.namespaces.write();
let _ = runtimes.remove(&id);
Ok(true)
}
pub(crate) fn list_namespaces(&self) -> Result<Vec<(u32, String)>> {
let guard = self.namespace_names.read();
let mut out: Vec<(u32, String)> = vec![(DEFAULT_NAMESPACE_ID, String::new())];
for (name, id) in guard.iter() {
out.push((*id, name.clone()));
}
out.sort_by_key(|(id, _)| *id);
Ok(out)
}
}
/// Copies every `(key, offset)` entry of `map` whose key lies within `bounds`,
/// in ascending key order.
fn skipmap_range_snapshot<R>(map: &SkipMap<Vec<u8>, u64>, bounds: &R) -> Vec<(Vec<u8>, u64)>
where
    R: RangeBounds<Vec<u8>>,
{
    /// Re-borrows a bound over owned keys as a bound over byte slices, so the
    /// map can be probed with `[u8]`.
    fn as_slice_bound(bound: Bound<&Vec<u8>>) -> Bound<&[u8]> {
        match bound {
            Bound::Unbounded => Bound::Unbounded,
            Bound::Included(key) => Bound::Included(key.as_slice()),
            Bound::Excluded(key) => Bound::Excluded(key.as_slice()),
        }
    }
    let window = (
        as_slice_bound(bounds.start_bound()),
        as_slice_bound(bounds.end_bound()),
    );
    let mut snapshot = Vec::new();
    for entry in map.range::<[u8], _>(window) {
        snapshot.push((entry.key().clone(), *entry.value()));
    }
    snapshot
}
/// Removes every entry from `map`.
///
/// Keys are collected first and then removed one by one, so the map is never
/// mutated while it is being iterated.
fn clear_skipmap(map: &SkipMap<Vec<u8>, u64>) {
    let doomed = map
        .iter()
        .map(|entry| entry.key().clone())
        .collect::<Vec<Vec<u8>>>();
    doomed.iter().for_each(|key| {
        let _ = map.remove(key);
    });
}
/// Reads the metadata header for the database at `path` via `meta::read`,
/// without opening the store. `None` is whatever `meta::read` returns for a
/// missing header.
fn peek_header(path: &std::path::Path) -> Result<Option<MetaHeader>> {
    let header = meta::read(path)?;
    Ok(header)
}
/// Returns the temporary path used while compacting the database at `path`.
///
/// The result is `path` with `.compact.tmp` appended to its file name. The
/// suffix is appended to the raw `OsString`, so a non-UTF-8 file name keeps
/// its own name. Previously such names collapsed onto a shared
/// `emdb.compact.tmp` in the same directory. `"emdb"` is used only when
/// `path` has no file name at all.
fn compaction_temp_path(path: &std::path::Path) -> std::path::PathBuf {
    let mut file_name = path.file_name().map_or_else(
        || std::ffi::OsString::from("emdb"),
        std::ffi::OsStr::to_os_string,
    );
    file_name.push(".compact.tmp");
    path.with_file_name(file_name)
}
#[allow(dead_code)]
fn _mmap_type_anchor() -> Option<Arc<Mmap>> {
None
}