use std::collections::{BTreeMap, HashMap};
use std::ops::{Bound, RangeBounds};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use memmap2::Mmap;
#[cfg(feature = "encrypt")]
use crate::storage::format::FLAG_CIPHER_CHACHA20;
use crate::storage::format::{self, RecordView, FLAG_ENCRYPTED};
#[cfg(feature = "encrypt")]
use crate::storage::format::{OwnedRecord, NONCE_LEN};
use crate::storage::index::Index;
use crate::storage::store::{Header, Store};
use crate::{Error, Result};
/// Id of the implicit, unnamed default namespace.
///
/// `Engine::open` registers its runtime up front; recovery ignores
/// namespace-name records for it, compaction never writes one for it, and
/// `Engine::list_namespaces` always reports it with an empty name.
pub(crate) const DEFAULT_NAMESPACE_ID: u32 = 0;
/// In-memory state of one namespace, rebuilt from the log on open.
struct NamespaceRuntime {
/// Hashed key -> file offset of the key's current insert record.
index: Index,
/// Number of live keys, maintained alongside `index`.
record_count: AtomicU64,
/// Ordered key -> offset map; present only when range scans were enabled.
range_index: Option<RwLock<BTreeMap<Vec<u8>, u64>>>,
}
impl NamespaceRuntime {
    /// Builds an empty runtime. The ordered range index is allocated only
    /// when `range_scans_enabled` is true; otherwise it stays `None`.
    fn new(range_scans_enabled: bool) -> Self {
        let ordered = if range_scans_enabled {
            Some(RwLock::new(BTreeMap::new()))
        } else {
            None
        };
        Self {
            index: Index::new(),
            record_count: AtomicU64::new(0),
            range_index: ordered,
        }
    }
}
impl std::fmt::Debug for NamespaceRuntime {
    /// Prints only the live-key count (as `len`); index contents are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let live_keys = self.record_count.load(Ordering::Acquire);
        f.debug_struct("NamespaceRuntime")
            .field("len", &live_keys)
            .finish()
    }
}
/// Encryption state held by `Engine`: `None` for a plaintext database,
/// `Some` shared context once a key has been resolved in `Engine::open`.
#[cfg(feature = "encrypt")]
pub(crate) type SharedEncryption = Option<Arc<crate::encryption::EncryptionContext>>;
/// Builds without the `encrypt` feature carry no encryption state; the alias
/// is a unit placeholder under the same name.
#[cfg(not(feature = "encrypt"))]
pub(crate) type SharedEncryption = ();
/// Options consumed by `Engine::open`.
///
/// `Debug` is implemented by hand rather than derived so that key material
/// (`encryption_key`, `encryption_passphrase`) never ends up in logs or panic
/// messages; only whether it was supplied is shown.
#[derive(Clone)]
pub(crate) struct EngineConfig {
    /// Database file passed to `Store::open`.
    pub(crate) path: PathBuf,
    /// Base header flags for `Store::open`; with `encrypt`, `Engine::open`
    /// ORs the encryption/cipher flags into these.
    pub(crate) flags: u32,
    /// Maintain an ordered per-namespace index so `Engine::range_scan` works.
    pub(crate) enable_range_scans: bool,
    /// Raw 32-byte key; mutually exclusive with `encryption_passphrase`.
    #[cfg(feature = "encrypt")]
    pub(crate) encryption_key: Option<[u8; 32]>,
    /// Requested AEAD cipher; must agree with an existing database's cipher.
    #[cfg(feature = "encrypt")]
    pub(crate) cipher: Option<crate::encryption::Cipher>,
    /// Passphrase from which the key is derived using the header's salt.
    #[cfg(feature = "encrypt")]
    pub(crate) encryption_passphrase: Option<String>,
}

impl std::fmt::Debug for EngineConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("EngineConfig");
        let _ = out
            .field("path", &self.path)
            .field("flags", &self.flags)
            .field("enable_range_scans", &self.enable_range_scans);
        #[cfg(feature = "encrypt")]
        {
            // Secrets are replaced by a marker; only their presence is visible.
            let _ = out
                .field(
                    "encryption_key",
                    &self.encryption_key.map(|_| "<redacted>"),
                )
                .field("cipher", &self.cipher)
                .field(
                    "encryption_passphrase",
                    &self.encryption_passphrase.as_ref().map(|_| "<redacted>"),
                );
        }
        out.finish()
    }
}
impl Default for EngineConfig {
    /// Empty path, no extra flags, range scans off and, with `encrypt`, no
    /// key, cipher or passphrase configured.
    fn default() -> Self {
        Self {
            path: PathBuf::default(),
            flags: u32::default(),
            enable_range_scans: bool::default(),
            #[cfg(feature = "encrypt")]
            encryption_key: Option::default(),
            #[cfg(feature = "encrypt")]
            cipher: Option::default(),
            #[cfg(feature = "encrypt")]
            encryption_passphrase: Option::default(),
        }
    }
}
/// Storage engine: owns the `Store` that records are appended to plus the
/// in-memory indexes of every namespace, which `open` rebuilds from the log.
pub(crate) struct Engine {
/// Underlying file store records are appended to and read back from.
store: Arc<Store>,
/// Namespace id -> runtime (indexes and live-key count).
namespaces: RwLock<HashMap<u32, Arc<NamespaceRuntime>>>,
/// Namespace name -> id; the unnamed default namespace never appears here.
namespace_names: RwLock<HashMap<String, u32>>,
/// Next id handed out by `create_or_open_namespace`; recovery keeps it above
/// every id seen in the log.
next_namespace_id: AtomicU64,
/// Whether newly created runtimes maintain an ordered range index.
range_scans_enabled: bool,
/// Active encryption context, or `None` for a plaintext database.
#[cfg(feature = "encrypt")]
encryption: SharedEncryption,
}
impl std::fmt::Debug for Engine {
    /// Only the underlying store is printed; namespace maps and any
    /// encryption context are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let store = &self.store;
        f.debug_struct("Engine").field("store", store).finish()
    }
}
/// Owned `(key, value, expires_at)` copy of one live insert record.
pub(crate) type RecordSnapshot = (Vec<u8>, Vec<u8>, u64);
/// Result of `Engine::resolve_encryption`: the key to use (if any), a freshly
/// generated passphrase salt that still has to be persisted (only when no
/// header existed yet), and the cipher chosen by configuration or the header.
#[cfg(feature = "encrypt")]
type ResolvedEncryption = (
Option<[u8; 32]>,
Option<[u8; format::ENCRYPTION_SALT_LEN]>,
Option<crate::encryption::Cipher>,
);
/// Owned summary of one log record replayed during recovery; only the fields
/// needed to rebuild the indexes are kept.
enum RecoveryAction {
/// A key was written in `ns_id`.
Insert { ns_id: u32, key: Vec<u8> },
/// A key was removed from `ns_id`.
Remove { ns_id: u32, key: Vec<u8> },
/// `ns_id` was bound to a name (raw bytes; UTF-8 is checked when applied).
NamespaceName { ns_id: u32, name: Vec<u8> },
}
/// A decoded recovery action plus the offset of the record that follows it.
type RecoveryDecoded = (RecoveryAction, u64);
impl Engine {
/// Opens (or creates) the database at `config.path` and rebuilds every
/// namespace's in-memory indexes by replaying the log.
///
/// With the `encrypt` feature, key material is resolved from `config` first.
/// When a key is in use, `FLAG_ENCRYPTED` (plus `FLAG_CIPHER_CHACHA20` for
/// ChaCha20-Poly1305) is OR'd into the flags passed to `Store::open`, and the
/// key is checked against (or, on first use, stamped into) the header's
/// verification block.
///
/// # Errors
/// - `Error::InvalidConfig` for conflicting or missing encryption settings,
///   or when a build without `encrypt` opens a database whose header has
///   `FLAG_ENCRYPTED` set.
/// - `Error::EncryptionKeyMismatch` when the verification block decrypts to
///   unexpected plaintext.
/// - Any store, lock, I/O or decode error raised while opening or replaying.
pub(crate) fn open(config: EngineConfig) -> Result<Self> {
    #[cfg(feature = "encrypt")]
    let (resolved_key, fresh_salt, resolved_cipher) = Self::resolve_encryption(&config)?;
    #[cfg(feature = "encrypt")]
    let flags = {
        let mut f = config.flags;
        if resolved_key.is_some() {
            f |= FLAG_ENCRYPTED;
            if let Some(crate::encryption::Cipher::ChaCha20Poly1305) = resolved_cipher {
                f |= FLAG_CIPHER_CHACHA20;
            }
        }
        f
    };
    #[cfg(not(feature = "encrypt"))]
    let flags = config.flags;
    let store = Arc::new(Store::open(config.path.clone(), flags)?);
    let header = store.header()?;
    #[cfg(feature = "encrypt")]
    let encryption: SharedEncryption = match resolved_key {
        None => None,
        Some(key) => {
            // A configured (or on-disk, via resolve_encryption) cipher wins;
            // otherwise fall back to what the freshly opened header records.
            let cipher =
                resolved_cipher.unwrap_or_else(|| Self::cipher_from_flags(header.flags));
            let ctx = Arc::new(crate::encryption::EncryptionContext::from_key_with_cipher(
                &key, cipher,
            ));
            Self::handle_verification(&store, &ctx, fresh_salt, &header)?;
            Some(ctx)
        }
    };
    #[cfg(not(feature = "encrypt"))]
    if header.flags & FLAG_ENCRYPTED != 0 {
        return Err(Error::InvalidConfig(
            "this database was created with encryption; rebuild with the `encrypt` feature",
        ));
    }
    let range_scans_enabled = config.enable_range_scans;
    let engine = Self {
        store,
        namespaces: RwLock::new(HashMap::new()),
        namespace_names: RwLock::new(HashMap::new()),
        next_namespace_id: AtomicU64::new(1),
        range_scans_enabled,
        #[cfg(feature = "encrypt")]
        encryption,
    };
    {
        // The default namespace exists even in an empty database.
        let mut guard = engine.namespaces.write().map_err(|_| Error::LockPoisoned)?;
        let _existing = guard.insert(
            DEFAULT_NAMESPACE_ID,
            Arc::new(NamespaceRuntime::new(range_scans_enabled)),
        );
    }
    engine.recovery_scan()?;
    Ok(engine)
}
#[cfg(feature = "encrypt")]
fn resolve_encryption(config: &EngineConfig) -> Result<ResolvedEncryption> {
if config.encryption_key.is_some() && config.encryption_passphrase.is_some() {
return Err(Error::InvalidConfig(
"encryption_key and encryption_passphrase are mutually exclusive — pick one",
));
}
let peeked = peek_header(&config.path)?;
let on_disk_cipher = peeked.map(|h| Self::cipher_from_flags(h.flags));
let cipher = match (config.cipher, on_disk_cipher) {
(Some(requested), Some(disk)) if requested != disk => {
return Err(Error::InvalidConfig(
"EmdbBuilder::cipher disagrees with the cipher this database was created with",
));
}
(Some(requested), _) => Some(requested),
(None, Some(disk)) => Some(disk),
(None, None) => None,
};
if let Some(passphrase) = config.encryption_passphrase.as_ref() {
let (salt, fresh) = match peeked {
Some(header) => {
if header.encryption_salt == [0_u8; format::ENCRYPTION_SALT_LEN] {
return Err(Error::InvalidConfig(
"this database was created with a raw encryption_key; supply via encryption_key, not encryption_passphrase",
));
}
(header.encryption_salt, None)
}
None => {
let s = crate::encryption::random_salt();
(s, Some(s))
}
};
let derived = crate::encryption::derive_key_from_passphrase(passphrase, &salt)?;
return Ok((Some(derived), fresh, cipher));
}
if let Some(key) = config.encryption_key {
if let Some(header) = peeked {
if header.encryption_salt != [0_u8; format::ENCRYPTION_SALT_LEN] {
return Err(Error::InvalidConfig(
"this database was created with an encryption_passphrase; supply via encryption_passphrase, not encryption_key",
));
}
}
return Ok((Some(key), None, cipher));
}
if let Some(header) = peeked {
if header.flags & FLAG_ENCRYPTED != 0 {
return Err(Error::InvalidConfig(
"this database was created with at-rest encryption; supply encryption_key or encryption_passphrase",
));
}
}
Ok((None, None, None))
}
/// Maps header flags to the AEAD cipher: ChaCha20-Poly1305 when
/// `FLAG_CIPHER_CHACHA20` is set, AES-256-GCM otherwise.
#[cfg(feature = "encrypt")]
fn cipher_from_flags(flags: u32) -> crate::encryption::Cipher {
    use crate::encryption::Cipher;
    let chacha_bit_set = (flags & FLAG_CIPHER_CHACHA20) != 0;
    match chacha_bit_set {
        true => Cipher::ChaCha20Poly1305,
        false => Cipher::Aes256Gcm,
    }
}
/// Checks `ctx` against the header's key-verification block, or creates the
/// block on first use.
///
/// If `encryption_verify` is all zeros, `VERIFICATION_PLAINTEXT` is encrypted
/// and stored together with `fresh_salt` (all zeros when no salt was
/// generated) through `Store::set_encryption_metadata`. Otherwise the stored
/// block is decrypted and compared with `VERIFICATION_PLAINTEXT`.
///
/// # Errors
/// `Error::EncryptionKeyMismatch` when the block decrypts to different bytes.
/// Errors from `ctx.encrypt`/`ctx.decrypt` and the store are propagated as-is.
/// NOTE(review): with an AEAD cipher a wrong key presumably fails inside
/// `ctx.decrypt` rather than reaching the comparison; confirm which error
/// variant callers then see.
#[cfg(feature = "encrypt")]
fn handle_verification(
store: &Store,
ctx: &Arc<crate::encryption::EncryptionContext>,
fresh_salt: Option<[u8; format::ENCRYPTION_SALT_LEN]>,
existing_header: &Header,
) -> Result<()> {
if existing_header.encryption_verify == [0_u8; format::ENCRYPTION_VERIFY_LEN] {
let salt = fresh_salt.unwrap_or([0_u8; format::ENCRYPTION_SALT_LEN]);
let nonce_then_ct = ctx.encrypt(format::VERIFICATION_PLAINTEXT)?;
// copy_from_slice below also panics (in release builds) on a length mismatch.
debug_assert_eq!(nonce_then_ct.len(), format::ENCRYPTION_VERIFY_LEN);
let mut verify = [0_u8; format::ENCRYPTION_VERIFY_LEN];
verify.copy_from_slice(&nonce_then_ct);
store.set_encryption_metadata(salt, verify)?;
return Ok(());
}
let plaintext = ctx.decrypt(&existing_header.encryption_verify)?;
if plaintext.as_slice() != format::VERIFICATION_PLAINTEXT {
return Err(Error::EncryptionKeyMismatch);
}
Ok(())
}
fn recovery_scan(&self) -> Result<()> {
let mmap = self.store.mmap()?;
let mut cursor = format::HEADER_LEN as u64;
let tail_hint = self.store.tail();
let bytes: &[u8] = &mmap;
loop {
if cursor as usize >= bytes.len() {
break;
}
#[cfg(feature = "encrypt")]
let encrypted = self.encryption.is_some();
#[cfg(not(feature = "encrypt"))]
let encrypted = false;
let result = if encrypted {
self.decode_encrypted_at(bytes, cursor)?
} else {
self.decode_plaintext_at(bytes, cursor)?
};
match result {
Some((action, next)) => {
self.apply_recovered_action(action, cursor)?;
cursor = next;
}
None => break,
}
if cursor > tail_hint && cursor >= bytes.len() as u64 {
break;
}
}
self.store.set_tail_after_recovery(cursor)?;
Ok(())
}
/// Decodes the plaintext record at `cursor` into an owned `RecoveryAction`
/// plus the offset of the following record. Returns `Ok(None)` when the
/// decoder reports no record at this position.
fn decode_plaintext_at(&self, bytes: &[u8], cursor: u64) -> Result<Option<RecoveryDecoded>> {
    let maybe_record = format::try_decode_record(bytes, cursor as usize, cursor)?;
    Ok(maybe_record.map(|decoded| {
        let following = decoded.next_offset;
        let action = match decoded.view {
            RecordView::Insert { ns_id, key, .. } => RecoveryAction::Insert {
                ns_id,
                key: key.to_vec(),
            },
            RecordView::Remove { ns_id, key } => RecoveryAction::Remove {
                ns_id,
                key: key.to_vec(),
            },
            RecordView::NamespaceName { ns_id, name } => RecoveryAction::NamespaceName {
                ns_id,
                name: name.to_vec(),
            },
        };
        (action, following)
    }))
}
/// Decrypts and decodes the sealed record at `cursor` into an owned
/// `RecoveryAction` plus the offset of the following record. Falls back to
/// `decode_plaintext_at` when no encryption context is configured.
#[cfg(feature = "encrypt")]
fn decode_encrypted_at(&self, bytes: &[u8], cursor: u64) -> Result<Option<RecoveryDecoded>> {
    let ctx = if let Some(shared) = self.encryption.as_ref() {
        Arc::clone(shared)
    } else {
        return self.decode_plaintext_at(bytes, cursor);
    };
    let opened = format::try_decode_encrypted_record(bytes, cursor as usize, cursor, |nonce, ct| {
        // The context expects nonce and ciphertext as one contiguous buffer.
        let mut sealed = Vec::with_capacity(NONCE_LEN + ct.len());
        sealed.extend_from_slice(nonce);
        sealed.extend_from_slice(ct);
        ctx.decrypt(&sealed)
    })?;
    Ok(opened.map(|(record, following)| {
        let action = match record {
            OwnedRecord::Insert { ns_id, key, .. } => RecoveryAction::Insert { ns_id, key },
            OwnedRecord::Remove { ns_id, key } => RecoveryAction::Remove { ns_id, key },
            OwnedRecord::NamespaceName { ns_id, name } => {
                RecoveryAction::NamespaceName { ns_id, name }
            }
        };
        (action, following)
    }))
}
#[cfg(not(feature = "encrypt"))]
fn decode_encrypted_at(&self, bytes: &[u8], cursor: u64) -> Result<Option<RecoveryDecoded>> {
self.decode_plaintext_at(bytes, cursor)
}
fn apply_recovered_action(&self, action: RecoveryAction, offset: u64) -> Result<()> {
match action {
RecoveryAction::Insert { ns_id, key } => {
let ns = self.ensure_namespace_runtime(ns_id)?;
let key_hash = Index::hash_key(&key);
let prev = ns
.index
.replace(key_hash, &key, offset, |off| self.key_at_offset(off))?;
if prev.is_none() {
let _ = ns.record_count.fetch_add(1, Ordering::AcqRel);
}
if let Some(range_lock) = ns.range_index.as_ref() {
let mut range = range_lock.write().map_err(|_| Error::LockPoisoned)?;
let _ = range.insert(key, offset);
}
}
RecoveryAction::Remove { ns_id, key } => {
let ns = self.ensure_namespace_runtime(ns_id)?;
let key_hash = Index::hash_key(&key);
if ns.index.remove(key_hash, &key)?.is_some() {
let _ = ns.record_count.fetch_sub(1, Ordering::AcqRel);
}
if let Some(range_lock) = ns.range_index.as_ref() {
let mut range = range_lock.write().map_err(|_| Error::LockPoisoned)?;
let _ = range.remove(&key);
}
}
RecoveryAction::NamespaceName { ns_id, name } => {
if ns_id == DEFAULT_NAMESPACE_ID || name.is_empty() {
return Ok(());
}
let name_str = match std::str::from_utf8(&name) {
Ok(s) => s.to_string(),
Err(_) => {
return Err(Error::Corrupted {
offset,
reason: "namespace-name record carried non-UTF-8 name",
});
}
};
let _ = self.ensure_namespace_runtime(ns_id)?;
let mut name_guard = self
.namespace_names
.write()
.map_err(|_| Error::LockPoisoned)?;
let _existing = name_guard.insert(name_str, ns_id);
drop(name_guard);
if ns_id as u64 >= self.next_namespace_id.load(Ordering::Acquire) {
self.next_namespace_id
.store(ns_id as u64 + 1, Ordering::Release);
}
}
}
Ok(())
}
/// Returns the key stored in the insert record at `offset`, or `None` when
/// the record there is not an insert or nothing decodes. This is the
/// offset -> key lookup passed to `Index::replace`.
fn key_at_offset(&self, offset: u64) -> Result<Option<Vec<u8>>> {
    let mmap = self.store.mmap()?;
    let bytes: &[u8] = &mmap;
    let start = offset as usize;
    #[cfg(feature = "encrypt")]
    {
        if let Some(shared) = self.encryption.as_ref() {
            let ctx = Arc::clone(shared);
            let opened = format::try_decode_encrypted_record(bytes, start, offset, |nonce, ct| {
                let mut sealed = Vec::with_capacity(NONCE_LEN + ct.len());
                sealed.extend_from_slice(nonce);
                sealed.extend_from_slice(ct);
                ctx.decrypt(&sealed)
            })?;
            let found = match opened {
                Some((OwnedRecord::Insert { key, .. }, _)) => Some(key),
                _ => None,
            };
            return Ok(found);
        }
    }
    let decoded = format::try_decode_record(bytes, start, offset)?;
    Ok(decoded.and_then(|d| match d.view {
        RecordView::Insert { key, .. } => Some(key.to_vec()),
        _ => None,
    }))
}
/// Returns the runtime for `ns_id`, creating an empty one if none exists.
///
/// Replay (`apply_recovered_action`) calls this for every record, so
/// namespaces are materialised on demand. It also keeps
/// `next_namespace_id` strictly above `ns_id`, so namespaces created later
/// never reuse a recovered id.
fn ensure_namespace_runtime(&self, ns_id: u32) -> Result<Arc<NamespaceRuntime>> {
    {
        let guard = self.namespaces.read().map_err(|_| Error::LockPoisoned)?;
        if let Some(ns) = guard.get(&ns_id) {
            return Ok(Arc::clone(ns));
        }
    }
    let mut guard = self.namespaces.write().map_err(|_| Error::LockPoisoned)?;
    let range_scans = self.range_scans_enabled;
    // `entry` covers the case where another caller inserted it between locks.
    let runtime = Arc::clone(
        guard
            .entry(ns_id)
            .or_insert_with(|| Arc::new(NamespaceRuntime::new(range_scans))),
    );
    // Atomic max instead of load-then-store, which could lower the allocator
    // if another bump landed between the load and the store.
    let _ = self
        .next_namespace_id
        .fetch_max(u64::from(ns_id) + 1, Ordering::AcqRel);
    Ok(runtime)
}
fn namespace(&self, ns_id: u32) -> Result<Arc<NamespaceRuntime>> {
let guard = self.namespaces.read().map_err(|_| Error::LockPoisoned)?;
guard
.get(&ns_id)
.map(Arc::clone)
.ok_or(Error::InvalidConfig("unknown namespace id"))
}
pub(crate) fn insert(
&self,
ns_id: u32,
key: &[u8],
value: &[u8],
expires_at: u64,
) -> Result<()> {
let ns = self.namespace(ns_id)?;
let key_hash = Index::hash_key(key);
let offset = self.append_insert(ns_id, key, value, expires_at)?;
let prev = ns
.index
.replace(key_hash, key, offset, |off| self.key_at_offset(off))?;
if prev.is_none() {
let _ = ns.record_count.fetch_add(1, Ordering::AcqRel);
}
if let Some(range_lock) = ns.range_index.as_ref() {
let mut range = range_lock.write().map_err(|_| Error::LockPoisoned)?;
let _ = range.insert(key.to_vec(), offset);
}
Ok(())
}
/// Inserts a batch of `(key, value, expires_at)` items into namespace `ns_id`.
///
/// All records are appended through a single `Store::append_batch_with` call
/// (each one sealed individually when an encryption context is active). The
/// hash index, live-key count and optional range index are then updated in
/// item order, so a key that appears twice in `items` ends up pointing at its
/// last record. An empty batch returns early without touching the store.
///
/// # Errors
/// Unknown namespace, encryption, store, lock or index errors.
pub(crate) fn insert_many(
&self,
ns_id: u32,
items: impl IntoIterator<Item = (Vec<u8>, Vec<u8>, u64)>,
) -> Result<()> {
let ns = self.namespace(ns_id)?;
let items: Vec<(Vec<u8>, Vec<u8>, u64)> = items.into_iter().collect();
if items.is_empty() {
return Ok(());
}
#[cfg(feature = "encrypt")]
let encryption = self.encryption.clone();
// Without the feature there is no context; the binding only exists so both
// cfg branches below can name `encryption`.
#[cfg(not(feature = "encrypt"))]
let encryption: Option<()> = None;
let offsets = self.store.append_batch_with(|enc| {
let mut offs = Vec::with_capacity(items.len());
for (key, value, expires_at) in &items {
let off = {
#[cfg(feature = "encrypt")]
{
if let Some(ctx) = encryption.as_ref() {
// Encode the body, seal it, then write tag + nonce||ciphertext.
let mut payload = Vec::with_capacity(20 + key.len() + value.len());
format::encode_insert_body(
&mut payload,
ns_id,
key,
value,
*expires_at,
);
let nonce_then_ct = ctx.encrypt(&payload)?;
enc.push_record(|buf| {
buf.push(format::TAG_INSERT | format::TAG_ENCRYPTED_FLAG);
buf.extend_from_slice(&nonce_then_ct);
Ok(())
})?
} else {
enc.push_record(|buf| {
buf.push(format::TAG_INSERT);
format::encode_insert_body(buf, ns_id, key, value, *expires_at);
Ok(())
})?
}
}
#[cfg(not(feature = "encrypt"))]
{
let _ = encryption;
enc.push_record(|buf| {
buf.push(format::TAG_INSERT);
format::encode_insert_body(buf, ns_id, key, value, *expires_at);
Ok(())
})?
}
};
offs.push(off);
}
Ok(offs)
})?;
// The range lock (if any) is taken once for the whole batch.
let mut range_guard = match ns.range_index.as_ref() {
Some(lock) => Some(lock.write().map_err(|_| Error::LockPoisoned)?),
None => None,
};
for ((key, _value, _exp), offset) in items.iter().zip(offsets.iter()) {
let key_hash = Index::hash_key(key);
let prev = ns
.index
.replace(key_hash, key, *offset, |off| self.key_at_offset(off))?;
if prev.is_none() {
let _ = ns.record_count.fetch_add(1, Ordering::AcqRel);
}
if let Some(range) = range_guard.as_deref_mut() {
let _ = range.insert(key.clone(), *offset);
}
}
Ok(())
}
/// Appends one insert record and returns its file offset.
///
/// With an active encryption context, the encoded insert body is sealed and
/// written after a `TAG_INSERT | TAG_ENCRYPTED_FLAG` tag; otherwise the body
/// follows a plain `TAG_INSERT` tag.
fn append_insert(&self, ns_id: u32, key: &[u8], value: &[u8], expires_at: u64) -> Result<u64> {
    #[cfg(feature = "encrypt")]
    {
        if let Some(ctx) = self.encryption.as_ref() {
            let mut plain_body = Vec::with_capacity(20 + key.len() + value.len());
            format::encode_insert_body(&mut plain_body, ns_id, key, value, expires_at);
            let sealed = ctx.encrypt(&plain_body)?;
            let tag = format::TAG_INSERT | format::TAG_ENCRYPTED_FLAG;
            return self.store.append_with(|buf| {
                buf.push(tag);
                buf.extend_from_slice(&sealed);
                Ok(())
            });
        }
    }
    let tag = format::TAG_INSERT;
    self.store.append_with(|buf| {
        buf.push(tag);
        format::encode_insert_body(buf, ns_id, key, value, expires_at);
        Ok(())
    })
}
/// Appends one remove record for `key` in namespace `ns_id` and returns its
/// file offset; sealed with `TAG_ENCRYPTED_FLAG` when encryption is active.
fn append_remove(&self, ns_id: u32, key: &[u8]) -> Result<u64> {
    #[cfg(feature = "encrypt")]
    {
        if let Some(ctx) = self.encryption.as_ref() {
            let mut plain_body = Vec::with_capacity(8 + key.len());
            format::encode_remove_body(&mut plain_body, ns_id, key);
            let sealed = ctx.encrypt(&plain_body)?;
            let tag = format::TAG_REMOVE | format::TAG_ENCRYPTED_FLAG;
            return self.store.append_with(|buf| {
                buf.push(tag);
                buf.extend_from_slice(&sealed);
                Ok(())
            });
        }
    }
    let tag = format::TAG_REMOVE;
    self.store.append_with(|buf| {
        buf.push(tag);
        format::encode_remove_body(buf, ns_id, key);
        Ok(())
    })
}
/// Returns the current value of `key` in namespace `ns_id`, discarding the
/// stored expiry; see `get_with_meta` for both.
pub(crate) fn get(&self, ns_id: u32, key: &[u8]) -> Result<Option<Vec<u8>>> {
    let found = self.get_with_meta(ns_id, key)?;
    Ok(found.map(|(value, _expires_at)| value))
}
/// Looks up `key` in namespace `ns_id` and returns its value together with
/// the stored `expires_at`, or `None` if the key is not live. Expiry is
/// reported here, not enforced.
pub(crate) fn get_with_meta(&self, ns_id: u32, key: &[u8]) -> Result<Option<(Vec<u8>, u64)>> {
    let ns = self.namespace(ns_id)?;
    let located = ns.index.get(Index::hash_key(key), key)?;
    match located {
        Some(offset) => self.read_value_at(offset, key),
        None => Ok(None),
    }
}
/// Decodes the record at `offset` and returns `(value, expires_at)` if it is
/// an insert whose key equals `expected_key`; any other record (or a key
/// mismatch, e.g. from a hash collision in the index) yields `None`.
fn read_value_at(&self, offset: u64, expected_key: &[u8]) -> Result<Option<(Vec<u8>, u64)>> {
    let mmap = self.store.mmap()?;
    let bytes: &[u8] = &mmap;
    let start = offset as usize;
    #[cfg(feature = "encrypt")]
    {
        if let Some(shared) = self.encryption.as_ref() {
            let ctx = Arc::clone(shared);
            let opened = format::try_decode_encrypted_record(bytes, start, offset, |nonce, ct| {
                let mut sealed = Vec::with_capacity(NONCE_LEN + ct.len());
                sealed.extend_from_slice(nonce);
                sealed.extend_from_slice(ct);
                ctx.decrypt(&sealed)
            })?;
            let hit = match opened {
                Some((
                    OwnedRecord::Insert {
                        key,
                        value,
                        expires_at,
                        ..
                    },
                    _,
                )) if key.as_slice() == expected_key => Some((value, expires_at)),
                _ => None,
            };
            return Ok(hit);
        }
    }
    let decoded = format::try_decode_record(bytes, start, offset)?;
    Ok(decoded.and_then(|d| match d.view {
        RecordView::Insert {
            key,
            value,
            expires_at,
            ..
        } if key == expected_key => Some((value.to_vec(), expires_at)),
        _ => None,
    }))
}
pub(crate) fn remove(&self, ns_id: u32, key: &[u8]) -> Result<Option<Vec<u8>>> {
let prev = self.get(ns_id, key)?;
if prev.is_some() {
let _offset = self.append_remove(ns_id, key)?;
let ns = self.namespace(ns_id)?;
let key_hash = Index::hash_key(key);
if ns.index.remove(key_hash, key)?.is_some() {
let _ = ns.record_count.fetch_sub(1, Ordering::AcqRel);
}
if let Some(range_lock) = ns.range_index.as_ref() {
let mut range = range_lock.write().map_err(|_| Error::LockPoisoned)?;
let _ = range.remove(key);
}
}
Ok(prev)
}
/// Number of live keys in namespace `ns_id`.
pub(crate) fn record_count(&self, ns_id: u32) -> Result<u64> {
    self.namespace(ns_id)
        .map(|ns| ns.record_count.load(Ordering::Acquire))
}
pub(crate) fn flush(&self) -> Result<()> {
self.store.flush()
}
pub(crate) fn compact_in_place(&self) -> Result<()> {
let namespaces: Vec<(u32, String)> = self.list_namespaces()?;
let mut snapshots: Vec<(u32, String, Vec<RecordSnapshot>)> =
Vec::with_capacity(namespaces.len());
for (ns_id, name) in &namespaces {
let records = self.collect_records(*ns_id)?;
snapshots.push((*ns_id, name.clone(), records));
}
let path = self.store.path().to_path_buf();
let tmp_path = compaction_temp_path(&path);
let _ = std::fs::remove_file(&tmp_path);
let header = self.store.header()?;
if let Err(err) = self.write_compacted_file(&tmp_path, &header, &snapshots) {
let _ = std::fs::remove_file(&tmp_path);
return Err(err);
}
if let Err(err) = self.store.swap_underlying(&tmp_path) {
let _ = std::fs::remove_file(&tmp_path);
return Err(err);
}
for (ns_id, _) in &namespaces {
let ns = self.namespace(*ns_id)?;
ns.index.clear()?;
ns.record_count.store(0, Ordering::Release);
if let Some(range_lock) = ns.range_index.as_ref() {
let mut range = range_lock.write().map_err(|_| Error::LockPoisoned)?;
range.clear();
}
}
self.recovery_scan()?;
Ok(())
}
/// Writes a fresh database image to `path`.
///
/// The image is `header_template` (with `tail_hint` set to the final length),
/// then one namespace-name record per named namespace, then one insert record
/// per snapshot entry. Each record is framed as a little-endian `u32` body
/// length, the body, and a little-endian `format::record_crc` of the body.
/// Bodies are sealed with the engine's encryption context when one is
/// active. The file is truncated to the written length and synced before
/// returning.
///
/// # Errors
/// I/O and encryption errors.
fn write_compacted_file(
    &self,
    path: &std::path::Path,
    header_template: &Header,
    snapshots: &[(u32, String, Vec<RecordSnapshot>)],
) -> Result<()> {
    use std::fs::OpenOptions;
    use std::io::{BufWriter, Seek, SeekFrom, Write};

    /// Bytes reserved at the front of each frame for the body length.
    const LEN_PREFIX: usize = 4;

    /// Completes a frame whose body follows a `LEN_PREFIX`-byte placeholder:
    /// back-fills the little-endian body length and appends the body's CRC.
    fn seal_frame(frame: &mut Vec<u8>) {
        let body_len = (frame.len() - LEN_PREFIX) as u32;
        frame[..LEN_PREFIX].copy_from_slice(&body_len.to_le_bytes());
        let crc = format::record_crc(&frame[LEN_PREFIX..]);
        frame.extend_from_slice(&crc.to_le_bytes());
    }

    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)?;
    // Records are small and numerous; buffer them instead of issuing one
    // write syscall per record.
    let mut out = BufWriter::new(file);
    let mut header_buf = [0_u8; format::HEADER_LEN];
    header_template.encode_into(&mut header_buf);
    out.write_all(&header_buf)?;
    let mut tail = format::HEADER_LEN as u64;
    let mut frame: Vec<u8> = Vec::with_capacity(512);
    // Namespace-name records (none for the default namespace or empty names).
    for (ns_id, name, _) in snapshots {
        if *ns_id == DEFAULT_NAMESPACE_ID || name.is_empty() {
            continue;
        }
        frame.clear();
        frame.extend_from_slice(&[0_u8; LEN_PREFIX]);
        #[cfg(feature = "encrypt")]
        {
            if let Some(ctx) = self.encryption.as_ref() {
                let mut payload = Vec::with_capacity(8 + name.len());
                format::encode_namespace_name_body(&mut payload, *ns_id, name.as_bytes());
                let nonce_then_ct = ctx.encrypt(&payload)?;
                frame.push(format::TAG_NAMESPACE_NAME | format::TAG_ENCRYPTED_FLAG);
                frame.extend_from_slice(&nonce_then_ct);
            } else {
                frame.push(format::TAG_NAMESPACE_NAME);
                format::encode_namespace_name_body(&mut frame, *ns_id, name.as_bytes());
            }
        }
        #[cfg(not(feature = "encrypt"))]
        {
            frame.push(format::TAG_NAMESPACE_NAME);
            format::encode_namespace_name_body(&mut frame, *ns_id, name.as_bytes());
        }
        seal_frame(&mut frame);
        out.write_all(&frame)?;
        tail += frame.len() as u64;
    }
    // Live insert records for every namespace, including the default one.
    for (ns_id, _, records) in snapshots {
        for (key, value, expires_at) in records {
            frame.clear();
            frame.extend_from_slice(&[0_u8; LEN_PREFIX]);
            #[cfg(feature = "encrypt")]
            {
                if let Some(ctx) = self.encryption.as_ref() {
                    let mut payload = Vec::with_capacity(20 + key.len() + value.len());
                    format::encode_insert_body(&mut payload, *ns_id, key, value, *expires_at);
                    let nonce_then_ct = ctx.encrypt(&payload)?;
                    frame.push(format::TAG_INSERT | format::TAG_ENCRYPTED_FLAG);
                    frame.extend_from_slice(&nonce_then_ct);
                } else {
                    frame.push(format::TAG_INSERT);
                    format::encode_insert_body(&mut frame, *ns_id, key, value, *expires_at);
                }
            }
            #[cfg(not(feature = "encrypt"))]
            {
                frame.push(format::TAG_INSERT);
                format::encode_insert_body(&mut frame, *ns_id, key, value, *expires_at);
            }
            seal_frame(&mut frame);
            out.write_all(&frame)?;
            tail += frame.len() as u64;
        }
    }
    let mut final_header = *header_template;
    final_header.tail_hint = tail;
    let mut final_header_buf = [0_u8; format::HEADER_LEN];
    final_header.encode_into(&mut final_header_buf);
    // Seeking a BufWriter first flushes every buffered record.
    let _seek = out.seek(SeekFrom::Start(0))?;
    out.write_all(&final_header_buf)?;
    // `into_inner` flushes the rewritten header and returns the file.
    let file = out.into_inner().map_err(std::io::Error::from)?;
    file.set_len(tail)?;
    file.sync_data()?;
    Ok(())
}
pub(crate) fn path(&self) -> &std::path::Path {
self.store.path()
}
/// Removes every key from namespace `ns_id`.
///
/// A remove record is appended for each live key before the in-memory
/// indexes are reset, so the clear survives a reopen. Without those records,
/// recovery would replay the original inserts and every key would come back.
///
/// # Errors
/// Unknown namespace, index, decode, store or lock errors. If an error occurs
/// part-way, keys whose remove record was already appended stay in the
/// in-memory indexes until the next recovery.
pub(crate) fn clear_namespace(&self, ns_id: u32) -> Result<()> {
    let ns = self.namespace(ns_id)?;
    for offset in ns.index.collect_offsets()? {
        if let Some(key) = self.key_at_offset(offset)? {
            let _offset = self.append_remove(ns_id, &key)?;
        }
    }
    ns.index.clear()?;
    ns.record_count.store(0, Ordering::Release);
    if let Some(range_lock) = ns.range_index.as_ref() {
        let mut range = range_lock.write().map_err(|_| Error::LockPoisoned)?;
        range.clear();
    }
    Ok(())
}
/// Returns every live `(key, value)` pair of namespace `ns_id` whose key
/// falls in `range`, in ascending key order.
///
/// Range scans must have been enabled at open time. An empty or inverted
/// range (start after end, or equal bounds that are both excluded) yields an
/// empty result instead of panicking inside `BTreeMap::range`.
///
/// Keys and offsets are copied out under the read lock, and values are
/// decoded after the lock is released. A key whose record no longer decodes
/// as its insert is skipped. Expiry is not checked here.
///
/// # Errors
/// `Error::InvalidConfig` when range scans are disabled, plus namespace,
/// lock and decode errors.
pub(crate) fn range_scan<R>(&self, ns_id: u32, range: R) -> Result<Vec<(Vec<u8>, Vec<u8>)>>
where
    R: RangeBounds<Vec<u8>>,
{
    /// Converts a bound over owned keys into one over borrowed slices.
    fn as_slice_bound(bound: Bound<&Vec<u8>>) -> Bound<&[u8]> {
        match bound {
            Bound::Included(v) => Bound::Included(v.as_slice()),
            Bound::Excluded(v) => Bound::Excluded(v.as_slice()),
            Bound::Unbounded => Bound::Unbounded,
        }
    }

    /// True exactly for the bound pairs `BTreeMap::range` panics on; such
    /// ranges cannot contain any key.
    fn is_inverted(start: Bound<&[u8]>, end: Bound<&[u8]>) -> bool {
        match (start, end) {
            (Bound::Excluded(s), Bound::Excluded(e)) => s >= e,
            (Bound::Included(s), Bound::Included(e))
            | (Bound::Included(s), Bound::Excluded(e))
            | (Bound::Excluded(s), Bound::Included(e)) => s > e,
            _ => false,
        }
    }

    let ns = self.namespace(ns_id)?;
    let range_lock = ns.range_index.as_ref().ok_or(Error::InvalidConfig(
        "range scans not enabled; pass `EmdbBuilder::enable_range_scans(true)` at open time",
    ))?;
    let start = as_slice_bound(range.start_bound());
    let end = as_slice_bound(range.end_bound());
    if is_inverted(start, end) {
        return Ok(Vec::new());
    }
    let pairs: Vec<(Vec<u8>, u64)> = {
        let guard = range_lock.read().map_err(|_| Error::LockPoisoned)?;
        guard
            .range::<[u8], _>((start, end))
            .map(|(k, off)| (k.clone(), *off))
            .collect()
    };
    let mut out = Vec::with_capacity(pairs.len());
    for (key, offset) in pairs {
        if let Some((value, _expires)) = self.read_value_at(offset, &key)? {
            out.push((key, value));
        }
    }
    Ok(out)
}
/// Returns owned `(key, value, expires_at)` copies of every live record in
/// namespace `ns_id`, decoded from the file at the offsets its index holds.
///
/// Offsets are visited in ascending order. An offset whose record does not
/// decode as an insert is skipped. `compact_in_place` uses this to snapshot
/// each namespace before rewriting the file.
///
/// # Errors
/// Unknown namespace, index, mmap, decode or decryption errors.
pub(crate) fn collect_records(&self, ns_id: u32) -> Result<Vec<RecordSnapshot>> {
let ns = self.namespace(ns_id)?;
let mut offsets = ns.index.collect_offsets()?;
// Ascending offsets give a single front-to-back pass over the mapping.
offsets.sort_unstable();
let mut out = Vec::with_capacity(offsets.len());
let mmap = self.store.mmap()?;
let bytes: &[u8] = &mmap;
for offset in offsets {
#[cfg(feature = "encrypt")]
let triple = if let Some(ctx) = self.encryption.as_ref() {
let ctx = Arc::clone(ctx);
match format::try_decode_encrypted_record(
bytes,
offset as usize,
offset,
|nonce, ct| {
// The context expects nonce and ciphertext as one buffer.
let mut input = Vec::with_capacity(NONCE_LEN + ct.len());
input.extend_from_slice(nonce);
input.extend_from_slice(ct);
ctx.decrypt(&input)
},
)? {
Some((
OwnedRecord::Insert {
key,
value,
expires_at,
..
},
_,
)) => Some((key, value, expires_at)),
_ => None,
}
} else {
Self::decode_plaintext_into_triple(bytes, offset)?
};
#[cfg(not(feature = "encrypt"))]
let triple = Self::decode_plaintext_into_triple(bytes, offset)?;
if let Some(t) = triple {
out.push(t);
}
}
Ok(out)
}
/// Decodes the plaintext record at `offset` into an owned
/// `(key, value, expires_at)` snapshot; non-insert records and positions
/// where nothing decodes yield `None`.
fn decode_plaintext_into_triple(bytes: &[u8], offset: u64) -> Result<Option<RecordSnapshot>> {
    let decoded = format::try_decode_record(bytes, offset as usize, offset)?;
    Ok(decoded.and_then(|d| {
        if let RecordView::Insert {
            key,
            value,
            expires_at,
            ..
        } = d.view
        {
            Some((key.to_vec(), value.to_vec(), expires_at))
        } else {
            None
        }
    }))
}
pub(crate) fn create_or_open_namespace(&self, name: &str) -> Result<u32> {
if name.is_empty() {
return Err(Error::InvalidConfig(
"namespace name must be non-empty (default namespace is implicit)",
));
}
{
let guard = self
.namespace_names
.read()
.map_err(|_| Error::LockPoisoned)?;
if let Some(id) = guard.get(name) {
return Ok(*id);
}
}
let mut name_guard = self
.namespace_names
.write()
.map_err(|_| Error::LockPoisoned)?;
if let Some(id) = name_guard.get(name) {
return Ok(*id);
}
let id = self.next_namespace_id.fetch_add(1, Ordering::AcqRel) as u32;
let _record_offset = self.append_namespace_name(id, name)?;
let _ = name_guard.insert(name.to_string(), id);
let mut runtimes = self.namespaces.write().map_err(|_| Error::LockPoisoned)?;
let _ = runtimes.insert(
id,
Arc::new(NamespaceRuntime::new(self.range_scans_enabled)),
);
Ok(id)
}
/// Appends a record binding `ns_id` to `name` and returns its file offset;
/// sealed with `TAG_ENCRYPTED_FLAG` when encryption is active.
fn append_namespace_name(&self, ns_id: u32, name: &str) -> Result<u64> {
    let name_bytes = name.as_bytes();
    #[cfg(feature = "encrypt")]
    {
        if let Some(ctx) = self.encryption.as_ref() {
            let mut plain_body = Vec::with_capacity(8 + name.len());
            format::encode_namespace_name_body(&mut plain_body, ns_id, name_bytes);
            let sealed = ctx.encrypt(&plain_body)?;
            let tag = format::TAG_NAMESPACE_NAME | format::TAG_ENCRYPTED_FLAG;
            return self.store.append_with(|buf| {
                buf.push(tag);
                buf.extend_from_slice(&sealed);
                Ok(())
            });
        }
    }
    let tag = format::TAG_NAMESPACE_NAME;
    self.store.append_with(|buf| {
        buf.push(tag);
        format::encode_namespace_name_body(buf, ns_id, name_bytes);
        Ok(())
    })
}
/// Unregisters the namespace called `name`, returning whether it existed.
///
/// Only the in-memory maps are updated; no record is appended to the log.
/// NOTE(review): the namespace's name and insert records stay on disk, and
/// recovery re-registers names from those records, so the namespace
/// presumably reappears on the next open unless `compact_in_place` (which
/// snapshots only listed namespaces) runs first — confirm callers handle this.
///
/// # Errors
/// `Error::InvalidConfig` for the empty (default) name, or `Error::LockPoisoned`.
pub(crate) fn drop_namespace(&self, name: &str) -> Result<bool> {
    if name.is_empty() {
        return Err(Error::InvalidConfig("default namespace cannot be dropped"));
    }
    let mut names = self
        .namespace_names
        .write()
        .map_err(|_| Error::LockPoisoned)?;
    match names.remove(name) {
        None => Ok(false),
        Some(id) => {
            // Lock order matches create_or_open_namespace: names, then runtimes.
            let mut runtimes = self.namespaces.write().map_err(|_| Error::LockPoisoned)?;
            let _ = runtimes.remove(&id);
            Ok(true)
        }
    }
}
/// Lists `(id, name)` for the default namespace (empty name) and every named
/// namespace, sorted by id.
pub(crate) fn list_namespaces(&self) -> Result<Vec<(u32, String)>> {
    let names = self
        .namespace_names
        .read()
        .map_err(|_| Error::LockPoisoned)?;
    let mut listed: Vec<(u32, String)> =
        std::iter::once((DEFAULT_NAMESPACE_ID, String::new()))
            .chain(names.iter().map(|(name, &id)| (id, name.clone())))
            .collect();
    listed.sort_by_key(|&(id, _)| id);
    Ok(listed)
}
}
fn peek_header(path: &std::path::Path) -> Result<Option<Header>> {
use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom};
match OpenOptions::new().read(true).open(path) {
Ok(mut file) => {
if file.metadata()?.len() < format::HEADER_LEN as u64 {
return Ok(None);
}
let mut buf = [0_u8; format::HEADER_LEN];
let _seek = file.seek(SeekFrom::Start(0))?;
file.read_exact(&mut buf)?;
Ok(Some(Header::decode_from(&buf)?))
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(Error::from(err)),
}
}
/// Path of the scratch file used while compacting the database at `path`.
///
/// The scratch file sits in the same directory, with `.compact.tmp` appended
/// to the file name, or is named `emdb.compact.tmp` when `path` has no file
/// name.
fn compaction_temp_path(path: &std::path::Path) -> std::path::PathBuf {
    // Work on the raw OS string so that non-UTF-8 file names keep their own
    // temp file instead of all collapsing onto `emdb.compact.tmp`.
    let mut tmp_name = match path.file_name() {
        Some(name) => name.to_os_string(),
        None => std::ffi::OsString::from("emdb"),
    };
    tmp_name.push(".compact.tmp");
    path.with_file_name(tmp_name)
}
// `Mmap` is imported at the top of this module but not otherwise named in the
// code shown here; this never-called function references it so the import is
// still counted as used.
#[allow(dead_code)] // intentionally never called; exists only to reference `Mmap`
fn _mmap_type_anchor() -> Option<Arc<Mmap>> {
None
}