use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use crate::bloom::Bloom;
use crate::compress::{compress_into, decompress_into, Compressed};
use crate::keymap::Keymap;
use crate::page_cache::PageCache;
use crate::storage::page::rid::Rid;
use crate::storage::page::slotted::{
self, free_space_of, inline_record_len, live_count_of, slot_count_of, InsertError, LeafPage,
RecordView,
};
use crate::storage::page::{Page, PageHeader, PageId, PageType};
use crate::storage::v4::catalog::{Catalog, CatalogEntry};
use crate::storage::v4::io::IoMode;
use crate::storage::v4::store::PageStore;
use crate::storage::v4::wal::{FlushPolicy, Wal};
use crate::value_cache::{CachedValue, ValueCache};
use crate::{Error, Result};
pub(crate) const DEFAULT_NAMESPACE_ID: u32 = 0;
pub(crate) type RecordSnapshot = (Vec<u8>, Vec<u8>, u64);
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) struct CompactStats {
pub(crate) leaves_compacted: u64,
pub(crate) pages_freed: u64,
pub(crate) namespaces_reclaimed: u64,
}
/// One operation inside a caller-assembled batch passed to
/// [`Engine::commit_batch`]. Owned (not borrowed) so the batch can be
/// staged independently of the caller's buffers.
#[derive(Debug, Clone)]
pub(crate) enum BatchedOp {
    Insert {
        ns_id: u32,
        key: Vec<u8>,
        value: Vec<u8>,
        // Expiration timestamp; 0 is used by tests for "no expiry" —
        // exact semantics live with the value-cache/record layer.
        expires_at: u64,
    },
    Remove { ns_id: u32, key: Vec<u8> },
}
/// Construction-time configuration for [`Engine::open`].
#[derive(Debug, Clone)]
pub(crate) struct EngineConfig {
    // Path of the main database file; the WAL path is derived from it
    // via `Wal::path_for`.
    pub(crate) path: PathBuf,
    // Open flags forwarded verbatim to `PageStore::open_with_mode`.
    pub(crate) flags: u32,
    pub(crate) page_io_mode: IoMode,
    pub(crate) wal_io_mode: IoMode,
    pub(crate) flush_policy: FlushPolicy,
    // 0 means "use the page cache's default capacity".
    pub(crate) page_cache_pages: usize,
    // 0 disables the value cache entirely.
    pub(crate) value_cache_bytes: usize,
    // 0 disables per-namespace bloom filters.
    pub(crate) bloom_initial_capacity: u64,
}
impl Default for EngineConfig {
    /// Conservative defaults: buffered I/O, manual flushing, a 2 048-page
    /// cache, a 64 MiB value cache, and blooms sized for 1 024 keys.
    /// `path` is empty and must be set by the caller before `open`.
    fn default() -> Self {
        Self {
            path: PathBuf::new(),
            flags: 0,
            page_io_mode: IoMode::Buffered,
            wal_io_mode: IoMode::Buffered,
            flush_policy: FlushPolicy::Manual,
            page_cache_pages: 2_048,
            value_cache_bytes: 64 * 1024 * 1024,
            bloom_initial_capacity: 1_024,
        }
    }
}
/// In-memory state for one live namespace. Shared via `Arc`; the atomic
/// fields allow lock-free reads while mutation is serialized by the
/// engine's `commit_lock`.
#[derive(Debug)]
pub(crate) struct NamespaceRuntime {
    pub(crate) id: u32,
    // hash(key) -> Rid index rebuilt from the leaf chain on open.
    pub(crate) keymap: Keymap,
    // Optional negative-lookup filter; `None` when disabled by config.
    pub(crate) bloom: Option<Bloom>,
    // Page id of the leaf currently accepting inserts (0 = none yet).
    pub(crate) open_leaf: AtomicU64,
    // Head page id of this namespace's leaf chain (0 = empty chain).
    pub(crate) chain_head: AtomicU64,
    // Live record count, mirrored into the catalog on flush.
    pub(crate) record_count: AtomicU64,
}
impl NamespaceRuntime {
    /// Builds an empty runtime for namespace `id`. A `bloom_capacity` of
    /// zero disables the bloom filter.
    fn new(id: u32, bloom_capacity: u64) -> Self {
        Self {
            id,
            keymap: Keymap::new(),
            bloom: (bloom_capacity != 0).then(|| Bloom::for_keys(bloom_capacity)),
            open_leaf: AtomicU64::new(0),
            chain_head: AtomicU64::new(0),
            record_count: AtomicU64::new(0),
        }
    }

    /// Rehydrates a runtime from a persisted catalog entry. The bloom is
    /// sized for whichever is larger: the persisted record count or the
    /// configured capacity; both leaf cursors start at the persisted
    /// chain head.
    fn from_catalog_entry(entry: &CatalogEntry, bloom_capacity: u64) -> Self {
        let bloom_keys = entry.record_count.max(bloom_capacity);
        Self {
            id: entry.id,
            keymap: Keymap::new(),
            bloom: (bloom_keys != 0).then(|| Bloom::for_keys(bloom_keys)),
            open_leaf: AtomicU64::new(entry.leaf_head),
            chain_head: AtomicU64::new(entry.leaf_head),
            record_count: AtomicU64::new(entry.record_count),
        }
    }
}
/// The storage engine: a page store plus WAL, with per-namespace
/// in-memory indexes. All mutating paths serialize on `commit_lock`.
#[derive(Debug)]
pub(crate) struct Engine {
    config: EngineConfig,
    page_store: Arc<PageStore>,
    wal: Wal,
    // `None` when the value cache is disabled by config.
    value_cache: Option<Arc<ValueCache>>,
    catalog: Mutex<Catalog>,
    // Live namespace runtimes keyed by id; tombstoned namespaces are
    // removed from this map but linger in the catalog until compaction.
    namespaces: RwLock<HashMap<u32, Arc<NamespaceRuntime>>>,
    last_tx_id: AtomicU64,
    // Serializes every mutating operation (insert/remove/batch/compact/
    // flush). Lock ordering: commit_lock before catalog before namespaces.
    commit_lock: Mutex<()>,
}
impl Engine {
    /// Opens (or creates) the database at `config.path`.
    ///
    /// Startup sequence: build page cache + page store + WAL, load the
    /// catalog, materialize a runtime per live namespace (synthesizing
    /// the implicit default namespace if the catalog lacks it), rebuild
    /// every keymap/bloom from the on-disk leaf chains, then replay any
    /// WAL records newer than the last persisted sequence number.
    ///
    /// # Errors
    /// Propagates I/O, corruption, and lock errors from any phase.
    pub(crate) fn open(config: EngineConfig) -> Result<Self> {
        // page_cache_pages == 0 means "use the library default".
        let cache = if config.page_cache_pages == 0 {
            Arc::new(PageCache::with_default_capacity())
        } else {
            Arc::new(PageCache::new(config.page_cache_pages))
        };
        let page_store = Arc::new(PageStore::open_with_mode(
            config.path.clone(),
            config.flags,
            Arc::clone(&cache),
            config.page_io_mode,
        )?);
        let wal_path = Wal::path_for(&config.path);
        let wal = Wal::open_with_mode(wal_path, config.flush_policy, config.wal_io_mode)?;
        // value_cache_bytes == 0 disables the cache entirely.
        let value_cache = if config.value_cache_bytes == 0 {
            None
        } else {
            Some(Arc::new(ValueCache::new(config.value_cache_bytes)))
        };
        let header = page_store.header()?;
        let catalog = Catalog::load(&page_store, PageId::new(header.namespace_root))?;
        let mut runtimes: HashMap<u32, Arc<NamespaceRuntime>> = HashMap::new();
        let mut saw_default = false;
        for entry in catalog.live_entries() {
            if entry.id == DEFAULT_NAMESPACE_ID {
                saw_default = true;
            }
            let runtime = Arc::new(NamespaceRuntime::from_catalog_entry(
                entry,
                config.bloom_initial_capacity,
            ));
            let _existing = runtimes.insert(entry.id, runtime);
        }
        // The default namespace always exists, even if the catalog was
        // written before it had any records: synthesize an empty entry.
        if !saw_default {
            let synthetic = CatalogEntry::new(DEFAULT_NAMESPACE_ID, "");
            let runtime = Arc::new(NamespaceRuntime::from_catalog_entry(
                &synthetic,
                config.bloom_initial_capacity,
            ));
            let _existing = runtimes.insert(DEFAULT_NAMESPACE_ID, runtime);
        }
        let last_tx_id = AtomicU64::new(header.last_tx_id);
        let engine = Self {
            config,
            page_store,
            wal,
            value_cache,
            catalog: Mutex::new(catalog),
            namespaces: RwLock::new(runtimes),
            last_tx_id,
            commit_lock: Mutex::new(()),
        };
        // Rebuild in-memory indexes from the persisted leaf chains before
        // replay, so replayed ops layer on top of the persisted state.
        let snapshot = engine.snapshot_namespaces()?;
        for ns in &snapshot {
            engine.rebuild_keymap_from_leaves(ns)?;
        }
        engine.replay_wal_after(header.last_persisted_wal_seq)?;
        Ok(engine)
    }
fn snapshot_namespaces(&self) -> Result<Vec<Arc<NamespaceRuntime>>> {
let guard = self
.namespaces
.read()
.map_err(|_poisoned| Error::LockPoisoned)?;
let mut out: Vec<Arc<NamespaceRuntime>> = guard.values().cloned().collect();
out.sort_by_key(|ns| ns.id);
Ok(out)
}
    /// Finds the namespace called `name`, creating it if necessary.
    ///
    /// Returns `(id, created)` where `created` is `true` only when a new
    /// catalog entry was made. The empty name is reserved for the
    /// implicit default namespace and rejected.
    ///
    /// Lock order: `commit_lock` -> `catalog` -> `namespaces` (write).
    pub(crate) fn create_or_open_namespace(&self, name: &str) -> Result<(u32, bool)> {
        if name.is_empty() {
            return Err(Error::InvalidConfig(
                "namespace name must be non-empty (the default namespace is implicit)",
            ));
        }
        // Serialize against every other mutation so catalog + runtime map
        // stay consistent.
        let _commit_guard = self
            .commit_lock
            .lock()
            .map_err(|_poisoned| Error::LockPoisoned)?;
        let mut catalog = self
            .catalog
            .lock()
            .map_err(|_poisoned| Error::LockPoisoned)?;
        if let Some(entry) = catalog.find_by_name(name) {
            let id = entry.id;
            // Already in the catalog; make sure a runtime exists too
            // (it may be missing after e.g. a partial open path).
            let mut runtimes = self
                .namespaces
                .write()
                .map_err(|_poisoned| Error::LockPoisoned)?;
            let _runtime = runtimes.entry(id).or_insert_with(|| {
                Arc::new(NamespaceRuntime::from_catalog_entry(
                    entry,
                    self.config.bloom_initial_capacity,
                ))
            });
            return Ok((id, false));
        }
        let id = catalog.create(name)?;
        // Re-fetch the freshly-created entry; its absence would mean the
        // catalog violated its own invariant.
        let entry = catalog
            .find_by_id(id)
            .ok_or(Error::Corrupted {
                offset: 0,
                reason: "freshly-created namespace missing from catalog",
            })?
            .clone();
        let runtime = Arc::new(NamespaceRuntime::from_catalog_entry(
            &entry,
            self.config.bloom_initial_capacity,
        ));
        let mut runtimes = self
            .namespaces
            .write()
            .map_err(|_poisoned| Error::LockPoisoned)?;
        let _existing = runtimes.insert(id, runtime);
        Ok((id, true))
    }
pub(crate) fn namespace_id_for(&self, name: &str) -> Result<Option<u32>> {
let catalog = self
.catalog
.lock()
.map_err(|_poisoned| Error::LockPoisoned)?;
Ok(catalog.find_by_name(name).map(|e| e.id))
}
    /// Tombstones the namespace called `name` and drops its in-memory
    /// runtime. Returns `false` when no such namespace exists. The pages
    /// of its leaf chain are reclaimed later, by `compact`.
    ///
    /// The default (empty-named) namespace cannot be dropped.
    pub(crate) fn drop_namespace(&self, name: &str) -> Result<bool> {
        if name.is_empty() {
            return Err(Error::InvalidConfig("default namespace cannot be dropped"));
        }
        let _commit_guard = self
            .commit_lock
            .lock()
            .map_err(|_poisoned| Error::LockPoisoned)?;
        let mut catalog = self
            .catalog
            .lock()
            .map_err(|_poisoned| Error::LockPoisoned)?;
        let id = match catalog.find_by_name(name) {
            Some(entry) => entry.id,
            None => return Ok(false),
        };
        let was_live = catalog.tombstone(id)?;
        // Release the catalog lock before taking the namespaces write
        // lock (keeps the lock graph simple).
        drop(catalog);
        if was_live {
            let mut runtimes = self
                .namespaces
                .write()
                .map_err(|_poisoned| Error::LockPoisoned)?;
            let _removed = runtimes.remove(&id);
        }
        Ok(was_live)
    }
pub(crate) fn list_namespaces(&self) -> Result<Vec<(u32, String)>> {
let catalog = self
.catalog
.lock()
.map_err(|_poisoned| Error::LockPoisoned)?;
let mut out: Vec<(u32, String)> = catalog
.live_entries()
.map(|e| (e.id, e.name.clone()))
.collect();
out.sort_by_key(|(id, _)| *id);
Ok(out)
}
fn namespace(&self, ns_id: u32) -> Result<Arc<NamespaceRuntime>> {
let guard = self
.namespaces
.read()
.map_err(|_poisoned| Error::LockPoisoned)?;
match guard.get(&ns_id) {
Some(ns) => Ok(Arc::clone(ns)),
None => Err(Error::InvalidConfig("unknown namespace id")),
}
}
    /// Inserts (or overwrites) `key` -> `value` in namespace `ns_id`.
    ///
    /// Write-ahead ordering: the op is encoded and appended to the WAL
    /// *before* any page/keymap state is touched, so a crash mid-apply
    /// is recoverable via replay.
    pub(crate) fn insert(
        &self,
        ns_id: u32,
        key: &[u8],
        value: &[u8],
        expires_at: u64,
    ) -> Result<()> {
        let _commit_guard = self
            .commit_lock
            .lock()
            .map_err(|_poisoned| Error::LockPoisoned)?;
        // 64 bytes covers tag + ids + length prefixes with headroom.
        let mut wal_buf = Vec::with_capacity(64 + key.len() + value.len());
        encode_insert_op(&mut wal_buf, ns_id, key, value, expires_at);
        let _ticket = self.wal.append(&wal_buf)?;
        self.apply_insert(ns_id, key, value, expires_at)
    }
    /// Applies an insert to pages + in-memory state (keymap, bloom,
    /// value cache, record count). The WAL append has already happened.
    fn apply_insert(&self, ns_id: u32, key: &[u8], value: &[u8], expires_at: u64) -> Result<()> {
        let ns = self.namespace(ns_id)?;
        let hash = Keymap::hash_key(key);
        let rid = self.write_record_into_chain(&ns, key, value, expires_at)?;
        // replace_single drops any prior rid for this hash — last write
        // wins for the in-memory index.
        ns.keymap.replace_single(hash, rid)?;
        if let Some(bloom) = &ns.bloom {
            bloom.insert(hash);
        }
        if let Some(cache) = &self.value_cache {
            let key_box: Box<[u8]> = key.to_vec().into_boxed_slice();
            let value_arc: Arc<[u8]> = Arc::from(value.to_vec().into_boxed_slice());
            cache.insert(ns_id, hash, key_box, value_arc, expires_at)?;
        }
        // NOTE(review): the counter is bumped even when the insert
        // overwrote an existing key — presumably reconciled elsewhere;
        // confirm against the rebuild/compaction paths.
        let _previous = ns.record_count.fetch_add(1, Ordering::AcqRel);
        Ok(())
    }
    /// Fetches the value for `key` in namespace `ns_id`.
    ///
    /// Lookup order: value cache (hit returns immediately), bloom filter
    /// (definite miss returns `None` without touching disk), then the
    /// keymap's candidate rids, each verified against its page. A disk
    /// hit is promoted back into the value cache.
    pub(crate) fn get(&self, ns_id: u32, key: &[u8]) -> Result<Option<CachedValue>> {
        let ns = self.namespace(ns_id)?;
        let hash = Keymap::hash_key(key);
        if let Some(cache) = &self.value_cache {
            if let Some(cached) = cache.get(ns_id, hash, key)? {
                return Ok(Some(cached));
            }
        }
        if let Some(bloom) = &ns.bloom {
            if !bloom.contains(hash) {
                return Ok(None);
            }
        }
        let slot = match ns.keymap.lookup(hash)? {
            Some(s) => s,
            None => return Ok(None),
        };
        // The slot may hold multiple rids (hash collisions); the page
        // read verifies the full key.
        for rid in slot.iter().copied() {
            let page = self.page_store.read_page(rid.page_id())?;
            let view = match slotted::read_record_at(&page, rid.slot_id(), key)? {
                Some(view) => view,
                None => continue, // stale rid or key mismatch
            };
            let (value_arc, expires_at) = match view {
                RecordView::Inline {
                    value, expires_at, ..
                } => (
                    Arc::<[u8]>::from(value.to_vec().into_boxed_slice()),
                    expires_at,
                ),
                // Overflow records are not yet written by any path, so
                // encountering one here means the file is corrupt.
                RecordView::Overflow { .. } => {
                    return Err(Error::Corrupted {
                        offset: 0,
                        reason: "overflow record found before overflow write path landed",
                    });
                }
            };
            // Promote the disk hit into the value cache for next time.
            if let Some(cache) = &self.value_cache {
                let key_box: Box<[u8]> = key.to_vec().into_boxed_slice();
                cache.insert(ns_id, hash, key_box, Arc::clone(&value_arc), expires_at)?;
            }
            return Ok(Some(CachedValue {
                value: value_arc,
                expires_at,
            }));
        }
        Ok(None)
    }
pub(crate) fn remove(&self, ns_id: u32, key: &[u8]) -> Result<bool> {
let _commit_guard = self
.commit_lock
.lock()
.map_err(|_poisoned| Error::LockPoisoned)?;
let mut wal_buf = Vec::with_capacity(32 + key.len());
encode_remove_op(&mut wal_buf, ns_id, key);
let _ticket = self.wal.append(&wal_buf)?;
self.apply_remove(ns_id, key)
}
    /// Applies a remove: tombstones the record on its page (copy-on-write
    /// of the page), then updates keymap, value cache, and record count.
    /// Returns `true` if a matching record was found and tombstoned.
    fn apply_remove(&self, ns_id: u32, key: &[u8]) -> Result<bool> {
        let ns = self.namespace(ns_id)?;
        let hash = Keymap::hash_key(key);
        let slot = match ns.keymap.lookup(hash)? {
            Some(s) => s,
            None => return Ok(false),
        };
        let mut removed = false;
        for rid in slot.iter().copied() {
            let page_arc = self.page_store.read_page(rid.page_id())?;
            // Verify the full key before mutating anything — the slot may
            // contain collisions or stale rids.
            let matches = slotted::read_record_at(&page_arc, rid.slot_id(), key)?.is_some();
            if !matches {
                continue;
            }
            // Copy-on-write: clone, tombstone, re-checksum, write back.
            let mut new_page: Page = (*page_arc).clone();
            {
                let mut leaf = LeafPage::open(&mut new_page)?;
                let _was_live = leaf.tombstone(rid.slot_id())?;
            }
            let _crc = new_page.refresh_crc()?;
            self.page_store
                .write_page(rid.page_id(), Arc::new(new_page))?;
            let _was_present = ns.keymap.remove(hash, rid)?;
            if let Some(cache) = &self.value_cache {
                let _ignored = cache.invalidate(ns_id, hash)?;
            }
            removed = true;
            // Only the first match is removed; any duplicates are stale.
            break;
        }
        if removed {
            let _previous = ns.record_count.fetch_sub(1, Ordering::AcqRel);
        }
        Ok(removed)
    }
    /// Compacts the store: squeezes tombstones out of every live
    /// namespace's leaf chain, frees the chains of tombstoned
    /// namespaces, removes those catalog entries, then flushes.
    pub(crate) fn compact(&self) -> Result<CompactStats> {
        let _commit_guard = self
            .commit_lock
            .lock()
            .map_err(|_poisoned| Error::LockPoisoned)?;
        let mut stats = CompactStats::default();
        let live = self.snapshot_namespaces()?;
        for ns in &live {
            self.compact_namespace_chain(ns, &mut stats)?;
        }
        // Collect tombstoned entries under the catalog lock, then drop it
        // before doing page I/O per namespace.
        let to_reclaim: Vec<(u32, u64)> = {
            let catalog = self
                .catalog
                .lock()
                .map_err(|_poisoned| Error::LockPoisoned)?;
            catalog
                .tombstoned_entries()
                .map(|e| (e.id, e.leaf_head))
                .collect()
        };
        for (id, leaf_head) in to_reclaim {
            // free_leaf_chain returns 1 only when a non-empty chain was
            // actually walked and freed.
            stats.namespaces_reclaimed += self.free_leaf_chain(leaf_head, &mut stats)?;
            let mut catalog = self
                .catalog
                .lock()
                .map_err(|_poisoned| Error::LockPoisoned)?;
            let _was_present = catalog.remove_tombstoned(id)?;
        }
        // flush() re-acquires commit_lock (std Mutex is not reentrant),
        // so release our guard first.
        drop(_commit_guard);
        self.flush()?;
        Ok(stats)
    }
    /// Walks one namespace's leaf chain: fully-dead leaves are unlinked
    /// and freed; leaves containing tombstones are rewritten in place
    /// (copy-on-write) and the keymap is re-pointed at the moved slots.
    fn compact_namespace_chain(
        &self,
        ns: &Arc<NamespaceRuntime>,
        stats: &mut CompactStats,
    ) -> Result<()> {
        // `prev` tracks the last leaf kept in the chain so a dead leaf
        // can be spliced out.
        let mut prev: Option<u64> = None;
        let mut current = ns.chain_head.load(Ordering::Acquire);
        while current != 0 {
            let page_arc = self.page_store.read_page(PageId::new(current))?;
            let next = slotted::next_leaf_of(&page_arc).get();
            let total_slots = slot_count_of(&page_arc);
            let live_count = live_count_of(&page_arc);
            // Every slot dead (but some slots exist): drop the whole leaf.
            if live_count == 0 && total_slots > 0 {
                self.unlink_leaf_from_chain(ns, prev, current, next)?;
                self.page_store.free_page(PageId::new(current))?;
                stats.pages_freed += 1;
                // `prev` is intentionally unchanged: the previous kept
                // leaf now points at `next`.
                current = next;
                continue;
            }
            // Some tombstones present: rewrite the leaf without them.
            if total_slots > live_count {
                let mut new_page: Page = (*page_arc).clone();
                let outcome = slotted::compact_leaf(&mut new_page)?;
                let _crc = new_page.refresh_crc()?;
                self.page_store
                    .write_page(PageId::new(current), Arc::new(new_page))?;
                // Re-point the keymap at every record that changed slot.
                for (key, old_slot, new_slot) in outcome.remap {
                    if old_slot == new_slot {
                        continue;
                    }
                    let hash = Keymap::hash_key(&key);
                    let old_rid = Rid::new(current, old_slot);
                    let new_rid = Rid::new(current, new_slot);
                    let _was_present = ns.keymap.remove(hash, old_rid)?;
                    ns.keymap.insert(hash, new_rid)?;
                }
                stats.leaves_compacted += 1;
            }
            prev = Some(current);
            current = next;
        }
        Ok(())
    }
fn free_leaf_chain(&self, head: u64, stats: &mut CompactStats) -> Result<u64> {
if head == 0 {
return Ok(0);
}
let mut current = head;
while current != 0 {
let page_arc = self.page_store.read_page(PageId::new(current))?;
let next = slotted::next_leaf_of(&page_arc).get();
self.page_store.free_page(PageId::new(current))?;
stats.pages_freed += 1;
current = next;
}
Ok(1)
}
    /// Splices `removed_page` out of a namespace's leaf chain.
    ///
    /// With a predecessor (`prev = Some`), the predecessor's `next_leaf`
    /// pointer is rewritten (copy-on-write) to skip the removed page.
    /// Without one, the removed page was the chain head, so the head is
    /// advanced — and the open-leaf cursor too, if it pointed there.
    fn unlink_leaf_from_chain(
        &self,
        ns: &Arc<NamespaceRuntime>,
        prev: Option<u64>,
        removed_page: u64,
        next: u64,
    ) -> Result<()> {
        match prev {
            Some(prev_id) => {
                let prev_arc = self.page_store.read_page(PageId::new(prev_id))?;
                let mut new_prev: Page = (*prev_arc).clone();
                {
                    let mut leaf = LeafPage::open(&mut new_prev)?;
                    leaf.set_next_leaf(PageId::new(next));
                }
                let _crc = new_prev.refresh_crc()?;
                self.page_store
                    .write_page(PageId::new(prev_id), Arc::new(new_prev))?;
            }
            None => {
                ns.chain_head.store(next, Ordering::Release);
                // Don't leave the insert cursor pointing at a freed page.
                if ns.open_leaf.load(Ordering::Acquire) == removed_page {
                    ns.open_leaf.store(next, Ordering::Release);
                }
            }
        }
        Ok(())
    }
    /// Atomically commits a batch of operations.
    ///
    /// WAL layout: `batch_begin(tx_id, n)` · n ops · `batch_end(tx_id)`.
    /// The engine waits for the end marker to be durable
    /// (`wait_for_seq`) before applying any op, so replay either sees a
    /// complete batch or discards it (see `replay_wal_after`).
    pub(crate) fn commit_batch(&self, ops: &[BatchedOp]) -> Result<()> {
        if ops.is_empty() {
            return Ok(());
        }
        let _commit_guard = self
            .commit_lock
            .lock()
            .map_err(|_poisoned| Error::LockPoisoned)?;
        // fetch_add returns the previous value; +1 yields the new tx id.
        let tx_id = self.last_tx_id.fetch_add(1, Ordering::AcqRel) + 1;
        self.page_store.set_last_tx_id(tx_id)?;
        let mut wal_buf: Vec<u8> = Vec::with_capacity(32);
        encode_batch_begin(&mut wal_buf, tx_id, ops.len() as u32);
        let _begin_seq = self.wal.append(&wal_buf)?;
        // One WAL record per op, reusing the encode buffer.
        for op in ops {
            wal_buf.clear();
            match op {
                BatchedOp::Insert {
                    ns_id,
                    key,
                    value,
                    expires_at,
                } => encode_insert_op(&mut wal_buf, *ns_id, key, value, *expires_at),
                BatchedOp::Remove { ns_id, key } => encode_remove_op(&mut wal_buf, *ns_id, key),
            }
            let _seq = self.wal.append(&wal_buf)?;
        }
        wal_buf.clear();
        encode_batch_end(&mut wal_buf, tx_id);
        let end_seq = self.wal.append(&wal_buf)?;
        // Durability barrier: the whole batch must be on disk before any
        // of it is applied.
        self.wal.wait_for_seq(end_seq)?;
        for op in ops {
            match op {
                BatchedOp::Insert {
                    ns_id,
                    key,
                    value,
                    expires_at,
                } => self.apply_insert(*ns_id, key, value, *expires_at)?,
                BatchedOp::Remove { ns_id, key } => {
                    let _did = self.apply_remove(*ns_id, key)?;
                }
            }
        }
        Ok(())
    }
    /// Durably flushes the engine: WAL first, then the catalog (with
    /// refreshed leaf heads / record counts), then the page store, and
    /// records the WAL sequence floor below which replay is unnecessary.
    pub(crate) fn flush(&self) -> Result<()> {
        let _commit_guard = self
            .commit_lock
            .lock()
            .map_err(|_poisoned| Error::LockPoisoned)?;
        self.wal.flush()?;
        // Everything below `floor` is now durable in the pages/catalog we
        // are about to write, so replay can start there next open.
        let floor = self.wal.next_seq();
        {
            let mut catalog = self
                .catalog
                .lock()
                .map_err(|_poisoned| Error::LockPoisoned)?;
            self.refresh_catalog(&mut catalog)?;
            let root = catalog.save(&self.page_store)?;
            self.page_store.set_namespace_root(root)?;
        }
        self.page_store.set_last_persisted_wal_seq(floor)?;
        self.page_store.flush()
    }
    /// Returns the sequence number the next WAL append will receive.
    pub(crate) fn next_wal_seq(&self) -> u64 {
        self.wal.next_seq()
    }
#[cfg(test)]
pub(crate) fn wal_flush_for_test(&self) -> Result<()> {
self.wal.flush()
}
fn refresh_catalog(&self, catalog: &mut Catalog) -> Result<()> {
if catalog.find_by_id(DEFAULT_NAMESPACE_ID).is_none() {
return Err(Error::Corrupted {
offset: 0,
reason: "catalog missing default namespace",
});
}
let snapshot = self.snapshot_namespaces()?;
for ns in &snapshot {
if catalog.find_by_id(ns.id).is_none() {
continue;
}
catalog.set_leaf_head(ns.id, ns.chain_head.load(Ordering::Acquire));
catalog.set_record_count(ns.id, ns.record_count.load(Ordering::Acquire));
}
Ok(())
}
    /// Returns the live record count tracked for namespace `ns_id`.
    pub(crate) fn record_count(&self, ns_id: u32) -> Result<u64> {
        let ns = self.namespace(ns_id)?;
        Ok(ns.record_count.load(Ordering::Acquire))
    }
    /// Materializes every live record in namespace `ns_id` as
    /// `(key, value, expires_at)` tuples by walking its leaf chain.
    ///
    /// The walk is bounded by twice the store's page count so a
    /// corrupted cyclic chain is reported instead of looping forever.
    pub(crate) fn collect_records(&self, ns_id: u32) -> Result<Vec<RecordSnapshot>> {
        let ns = self.namespace(ns_id)?;
        let mut out: Vec<RecordSnapshot> = Vec::new();
        let mut current = ns.chain_head.load(Ordering::Acquire);
        let mut steps = 0_u64;
        let max_steps = self.page_store.page_count()?.saturating_mul(2);
        while current != 0 {
            if steps >= max_steps {
                return Err(Error::Corrupted {
                    offset: 0,
                    reason: "leaf chain longer than page count",
                });
            }
            steps = steps.saturating_add(1);
            let page = self.page_store.read_page(PageId::new(current))?;
            let slot_count = slot_count_of(&page);
            for slot_id in 0..slot_count {
                // Unchecked variant: no key to verify against; `None`
                // means the slot is a tombstone.
                let view = match slotted::read_record_at_unchecked(&page, slot_id as u16)? {
                    Some(view) => view,
                    None => continue,
                };
                match view {
                    RecordView::Inline {
                        key,
                        value,
                        expires_at,
                    } => out.push((key.to_vec(), value.to_vec(), expires_at)),
                    // No writer produces overflow records yet.
                    RecordView::Overflow { .. } => {
                        return Err(Error::Corrupted {
                            offset: 0,
                            reason: "overflow record encountered before overflow read path",
                        });
                    }
                }
            }
            current = slotted::next_leaf_of(&page).get();
        }
        Ok(out)
    }
    /// Resets namespace `ns_id`'s in-memory state: keymap, bloom, chain
    /// cursors, and record count. Pages of the old chain are not freed
    /// here.
    ///
    /// NOTE(review): `cache.clear()` empties the *entire* value cache,
    /// not just this namespace — presumably because no per-namespace
    /// invalidation exists; confirm this is intended.
    pub(crate) fn clear_namespace(&self, ns_id: u32) -> Result<()> {
        let ns = self.namespace(ns_id)?;
        ns.keymap.clear()?;
        if let Some(bloom) = &ns.bloom {
            bloom.clear();
        }
        ns.chain_head.store(0, Ordering::Release);
        ns.open_leaf.store(0, Ordering::Release);
        ns.record_count.store(0, Ordering::Release);
        if let Some(cache) = &self.value_cache {
            cache.clear()?;
        }
        Ok(())
    }
    /// Filesystem path of the main database file.
    #[must_use]
    pub(crate) fn path(&self) -> &std::path::Path {
        self.page_store.path()
    }
    /// Writes one inline record into the namespace's leaf chain and
    /// returns its rid.
    ///
    /// Strategy: try the current open leaf first; if it reports "leaf
    /// full", allocate a fresh leaf, push it onto the front of the chain
    /// (new leaf's `next` points at the old head), insert there, and
    /// make it the new open leaf.
    fn write_record_into_chain(
        &self,
        ns: &Arc<NamespaceRuntime>,
        key: &[u8],
        value: &[u8],
        expires_at: u64,
    ) -> Result<Rid> {
        let record_len = inline_record_len(key.len(), value.len()).ok_or(Error::InvalidConfig(
            "record larger than the maximum representable size",
        ))?;
        // Estimated space needed: record bytes plus a guessed slot-
        // directory entry.
        let need_bytes = SLOT_ENTRY_LEN_GUESS + record_len;
        let open_leaf = ns.open_leaf.load(Ordering::Acquire);
        if open_leaf != 0 {
            match self.try_insert_into(open_leaf, key, value, expires_at, need_bytes) {
                Ok(rid) => return Ok(rid),
                // "leaf full" is the expected spill signal; fall through
                // to allocating a new leaf.
                Err(Error::TransactionAborted("leaf full")) => {
                }
                Err(other) => return Err(other),
            }
        }
        let new_leaf_id = self.page_store.allocate_page()?;
        let mut new_page = Page::new(PageHeader::new(PageType::LeafSlotted));
        let new_leaf_rid = {
            let mut leaf = LeafPage::init(&mut new_page);
            // Prepend to the chain: swap the head and link back to it.
            let prev_head = ns.chain_head.swap(new_leaf_id.get(), Ordering::AcqRel);
            leaf.set_next_leaf(PageId::new(prev_head));
            let slot_id = match leaf.insert_inline(key, value, expires_at) {
                Ok(slot) => slot,
                // A fresh leaf that cannot hold the record means the
                // record simply does not fit in one page.
                Err(InsertError::OutOfSpace) => {
                    return Err(Error::InvalidConfig(
                        "fresh leaf out of space immediately — record larger than a single page",
                    ));
                }
                Err(InsertError::KeyTooLarge) => {
                    return Err(Error::InvalidConfig(
                        "key + value too large to fit in a single page",
                    ));
                }
            };
            Rid::new(new_leaf_id.get(), slot_id)
        };
        let _crc = new_page.refresh_crc()?;
        self.page_store
            .write_page(new_leaf_id, Arc::new(new_page))?;
        ns.open_leaf.store(new_leaf_id.get(), Ordering::Release);
        Ok(new_leaf_rid)
    }
    /// Rebuilds a namespace's keymap and bloom filter from its on-disk
    /// leaf chain (used at open, before WAL replay), and resets the
    /// record count to the number of live records found.
    ///
    /// Bounded by twice the store's page count to detect cyclic chains.
    fn rebuild_keymap_from_leaves(&self, ns: &Arc<NamespaceRuntime>) -> Result<()> {
        let mut current = ns.chain_head.load(Ordering::Acquire);
        if current == 0 {
            return Ok(());
        }
        let mut live_count = 0_u64;
        let max_steps = self.page_store.page_count()?.saturating_mul(2);
        let mut steps = 0_u64;
        while current != 0 {
            if steps >= max_steps {
                return Err(Error::Corrupted {
                    offset: 0,
                    reason: "leaf chain longer than page count; suspected cycle",
                });
            }
            steps = steps.saturating_add(1);
            let page = self.page_store.read_page(PageId::new(current))?;
            let slot_count = slot_count_of(&page);
            for slot_id in 0..slot_count {
                // `None` = tombstoned slot; skip it.
                let view = match slotted::read_record_at_unchecked(&page, slot_id as u16)? {
                    Some(view) => view,
                    None => continue,
                };
                let hash = Keymap::hash_key(view.key());
                let rid = Rid::new(current, slot_id as u16);
                ns.keymap.insert(hash, rid)?;
                if let Some(bloom) = &ns.bloom {
                    bloom.insert(hash);
                }
                live_count = live_count.saturating_add(1);
            }
            current = slotted::next_leaf_of(&page).get();
        }
        ns.record_count.store(live_count, Ordering::Release);
        Ok(())
    }
    /// Replays WAL records whose sequence number is >= `start_seq`
    /// (records below it were already persisted by the last flush).
    ///
    /// Standalone ops are applied immediately; ops inside a
    /// begin/end-delimited batch are buffered and applied only when the
    /// matching `batch_end` is seen, so a torn batch (begin without end
    /// at the tail of the WAL) is silently discarded. The skip decision
    /// for a whole batch is made from its `batch_begin` sequence number.
    fn replay_wal_after(&self, start_seq: u64) -> Result<()> {
        let mut buf: Vec<u8> = Vec::new();
        self.wal.read_all(&mut buf)?;
        if buf.is_empty() {
            return Ok(());
        }
        let mut cursor = 0_usize;
        // Sequence numbers are implicit: record i in the WAL has seq i.
        let mut current_seq = 0_u64;
        let mut pending_batch: Option<PendingBatch> = None;
        while cursor < buf.len() {
            // The decode callback borrows from `buf`, so it stages an
            // owned copy / marker for processing after it returns.
            let mut staged_op: Option<OwnedBatchedOp> = None;
            let mut staged_marker: Option<BatchMarker> = None;
            let record_end = decode_op_into(&buf, &mut cursor, current_seq, |op| {
                match op {
                    ReplayOp::Insert {
                        ns_id,
                        key,
                        value,
                        expires_at,
                    } => {
                        staged_op = Some(OwnedBatchedOp::Insert {
                            ns_id,
                            key: key.to_vec(),
                            value: value.to_vec(),
                            expires_at,
                        });
                    }
                    ReplayOp::Remove { ns_id, key } => {
                        staged_op = Some(OwnedBatchedOp::Remove {
                            ns_id,
                            key: key.to_vec(),
                        });
                    }
                    ReplayOp::BatchBegin { tx_id, op_count } => {
                        staged_marker = Some(BatchMarker::Begin { tx_id, op_count });
                    }
                    ReplayOp::BatchEnd { tx_id } => {
                        staged_marker = Some(BatchMarker::End { tx_id });
                    }
                }
                Ok(())
            })?;
            cursor = record_end;
            let skip_this = current_seq < start_seq;
            current_seq = current_seq.saturating_add(1);
            match staged_marker {
                Some(BatchMarker::Begin { tx_id, op_count }) => {
                    // Batches cannot nest.
                    if pending_batch.is_some() {
                        return Err(Error::Corrupted {
                            offset: cursor as u64,
                            reason: "wal batch_begin nested inside another batch",
                        });
                    }
                    pending_batch = Some(PendingBatch {
                        tx_id,
                        skip: skip_this,
                        ops: Vec::with_capacity(op_count as usize),
                    });
                }
                Some(BatchMarker::End { tx_id }) => {
                    let Some(batch) = pending_batch.take() else {
                        return Err(Error::Corrupted {
                            offset: cursor as u64,
                            reason: "wal batch_end without matching batch_begin",
                        });
                    };
                    if batch.tx_id != tx_id {
                        return Err(Error::Corrupted {
                            offset: cursor as u64,
                            reason: "wal batch_end tx_id mismatches batch_begin",
                        });
                    }
                    // Apply the whole batch only now that its end marker
                    // proves it was committed.
                    if !batch.skip {
                        for owned in batch.ops {
                            self.apply_replayed_owned(&owned)?;
                        }
                    }
                }
                None => {
                    if let Some(op) = staged_op {
                        if let Some(batch) = pending_batch.as_mut() {
                            batch.ops.push(op);
                        } else if !skip_this {
                            self.apply_replayed_owned(&op)?;
                        }
                    }
                }
            }
        }
        // A batch still open at EOF was never committed: drop it.
        let _abandoned = pending_batch.take();
        Ok(())
    }
fn apply_replayed_owned(&self, op: &OwnedBatchedOp) -> Result<()> {
match op {
OwnedBatchedOp::Insert {
ns_id,
key,
value,
expires_at,
} => self.apply_replayed_insert(*ns_id, key, value, *expires_at),
OwnedBatchedOp::Remove { ns_id, key } => self.apply_replayed_remove(*ns_id, key),
}
}
    /// Replay-time insert: identical to `apply_insert` except the value
    /// cache is not populated (it is cold at startup anyway).
    fn apply_replayed_insert(
        &self,
        ns_id: u32,
        key: &[u8],
        value: &[u8],
        expires_at: u64,
    ) -> Result<()> {
        let ns = self.namespace(ns_id)?;
        let hash = Keymap::hash_key(key);
        let rid = self.write_record_into_chain(&ns, key, value, expires_at)?;
        ns.keymap.replace_single(hash, rid)?;
        if let Some(bloom) = &ns.bloom {
            bloom.insert(hash);
        }
        let _previous = ns.record_count.fetch_add(1, Ordering::AcqRel);
        Ok(())
    }
    /// Replay-time remove: like `apply_remove` but without value-cache
    /// invalidation, and the "removed" outcome is not reported (replay
    /// has no caller interested in it).
    fn apply_replayed_remove(&self, ns_id: u32, key: &[u8]) -> Result<()> {
        let ns = self.namespace(ns_id)?;
        let hash = Keymap::hash_key(key);
        if let Some(slot) = ns.keymap.lookup(hash)? {
            for rid in slot.iter().copied() {
                let page = self.page_store.read_page(rid.page_id())?;
                // Verify the full key; the slot may hold collisions.
                let matches = slotted::read_record_at(&page, rid.slot_id(), key)?.is_some();
                if !matches {
                    continue;
                }
                // Copy-on-write tombstone, then write back with fresh CRC.
                let mut new_page: Page = (*page).clone();
                {
                    let mut leaf = LeafPage::open(&mut new_page)?;
                    let _was_live = leaf.tombstone(rid.slot_id())?;
                }
                let _crc = new_page.refresh_crc()?;
                self.page_store
                    .write_page(rid.page_id(), Arc::new(new_page))?;
                let _was_present = ns.keymap.remove(hash, rid)?;
                let _previous = ns.record_count.fetch_sub(1, Ordering::AcqRel);
                break;
            }
        }
        Ok(())
    }
fn try_insert_into(
&self,
page_id_raw: u64,
key: &[u8],
value: &[u8],
expires_at: u64,
need_bytes: usize,
) -> Result<Rid> {
let page_id = PageId::new(page_id_raw);
let arc = self.page_store.read_page(page_id)?;
if (free_space_of(&arc) as usize) < need_bytes && slot_count_of(&arc) > 0 {
let _ = need_bytes;
}
let mut new_page: Page = (*arc).clone();
let slot_id = {
let mut leaf = LeafPage::open(&mut new_page)?;
match leaf.insert_inline(key, value, expires_at) {
Ok(slot) => slot,
Err(InsertError::OutOfSpace) => {
return Err(Error::TransactionAborted("leaf full"));
}
Err(InsertError::KeyTooLarge) => {
return Err(Error::InvalidConfig(
"key + value too large to fit in a single page",
));
}
}
};
let _crc = new_page.refresh_crc()?;
self.page_store.write_page(page_id, Arc::new(new_page))?;
Ok(Rid::new(page_id_raw, slot_id))
}
}
/// Guessed per-record slot-directory overhead, used only to estimate
/// whether a record fits in the open leaf (`insert_inline` is the
/// authoritative check).
const SLOT_ENTRY_LEN_GUESS: usize = 8;
// WAL record tags, stored in the low 7 bits of each record's first byte.
const WAL_TAG_INSERT: u8 = 0;
const WAL_TAG_REMOVE: u8 = 1;
const WAL_TAG_BATCH_BEGIN: u8 = 2;
const WAL_TAG_BATCH_END: u8 = 3;
/// High bit of the tag byte: the insert's value payload is compressed.
const WAL_FLAG_COMPRESSED: u8 = 0x80;
/// Mask selecting the tag bits (everything except the compression flag).
const WAL_TAG_MASK: u8 = 0x7F;
/// A WAL record decoded for replay; borrows key/value bytes from the
/// decode buffer (values are decompressed into scratch space first).
enum ReplayOp<'a> {
    Insert {
        ns_id: u32,
        key: &'a [u8],
        value: &'a [u8],
        expires_at: u64,
    },
    Remove {
        ns_id: u32,
        key: &'a [u8],
    },
    // Marks the start of an atomic batch of `op_count` following ops.
    BatchBegin {
        tx_id: u64,
        op_count: u32,
    },
    // Marks the durable end of the batch started with the same `tx_id`.
    BatchEnd {
        tx_id: u64,
    },
}
impl<'a> ReplayOp<'a> {
    /// Namespace the op targets; batch markers carry no namespace and
    /// report the default. NOTE(review): no caller is visible in this
    /// file — possibly kept for diagnostics; confirm it is still needed.
    fn namespace_id(&self) -> u32 {
        match self {
            Self::Insert { ns_id, .. } | Self::Remove { ns_id, .. } => *ns_id,
            Self::BatchBegin { .. } | Self::BatchEnd { .. } => DEFAULT_NAMESPACE_ID,
        }
    }
}
/// Owned copy of a replayed op, staged so it can outlive the decode
/// buffer (needed when buffering batch members until `batch_end`).
#[derive(Debug, Clone)]
enum OwnedBatchedOp {
    Insert {
        ns_id: u32,
        key: Vec<u8>,
        value: Vec<u8>,
        expires_at: u64,
    },
    Remove {
        ns_id: u32,
        key: Vec<u8>,
    },
}
/// Batch boundary staged by the replay decode callback.
enum BatchMarker {
    Begin { tx_id: u64, op_count: u32 },
    End { tx_id: u64 },
}
/// A batch currently being accumulated during replay: its ops are held
/// until the matching `batch_end` proves the batch was committed.
struct PendingBatch {
    tx_id: u64,
    // True when the whole batch predates the persisted-WAL floor and
    // must not be re-applied.
    skip: bool,
    ops: Vec<OwnedBatchedOp>,
}
/// Serializes an insert record into `buf`.
///
/// Wire format (all integers little-endian):
///   tag(1) · ns_id(4) · key_len(4) · key ·
///   value_len(4) · [original_len(4) if compressed] · value ·
///   expires_at(8)
/// When `compress_into` chooses to compress, the tag carries
/// `WAL_FLAG_COMPRESSED` and the pre-compression length is written so
/// the decoder can size its scratch buffer.
fn encode_insert_op(buf: &mut Vec<u8>, ns_id: u32, key: &[u8], value: &[u8], expires_at: u64) {
    let encoded_value = compress_into(value);
    let (tag, value_bytes, original_len) = match encoded_value {
        Compressed::Passthrough { bytes } => (WAL_TAG_INSERT, bytes, None),
        Compressed::Encoded {
            ref bytes,
            original_len,
        } => (
            WAL_TAG_INSERT | WAL_FLAG_COMPRESSED,
            bytes.as_slice(),
            Some(original_len),
        ),
    };
    buf.push(tag);
    buf.extend_from_slice(&ns_id.to_le_bytes());
    buf.extend_from_slice(&(key.len() as u32).to_le_bytes());
    buf.extend_from_slice(key);
    buf.extend_from_slice(&(value_bytes.len() as u32).to_le_bytes());
    if let Some(orig) = original_len {
        buf.extend_from_slice(&orig.to_le_bytes());
    }
    buf.extend_from_slice(value_bytes);
    buf.extend_from_slice(&expires_at.to_le_bytes());
}
/// Serializes a remove record: tag(1) · ns_id(4 LE) · key_len(4 LE) · key.
fn encode_remove_op(buf: &mut Vec<u8>, ns_id: u32, key: &[u8]) {
    buf.push(WAL_TAG_REMOVE);
    let key_len = key.len() as u32;
    for part in [&ns_id.to_le_bytes()[..], &key_len.to_le_bytes()[..], key] {
        buf.extend_from_slice(part);
    }
}
/// Serializes a batch-begin marker: tag(1) · tx_id(8 LE) · op_count(4 LE).
fn encode_batch_begin(buf: &mut Vec<u8>, tx_id: u64, op_count: u32) {
    let mut record = [0_u8; 13];
    record[0] = WAL_TAG_BATCH_BEGIN;
    record[1..9].copy_from_slice(&tx_id.to_le_bytes());
    record[9..13].copy_from_slice(&op_count.to_le_bytes());
    buf.extend_from_slice(&record);
}
/// Serializes a batch-end marker: tag(1) · tx_id(8 LE).
fn encode_batch_end(buf: &mut Vec<u8>, tx_id: u64) {
    let mut record = [0_u8; 9];
    record[0] = WAL_TAG_BATCH_END;
    record[1..].copy_from_slice(&tx_id.to_le_bytes());
    buf.extend_from_slice(&record);
}
/// Decodes one WAL record starting at `*cursor` and hands the parsed
/// [`ReplayOp`] (borrowing from `buf`, or from local decompression
/// scratch) to `apply`. Returns the offset just past the record; the
/// caller advances its cursor to that value.
///
/// Any truncation or unknown tag is reported as `Corrupted` with the
/// offending offset. `seq_for_callback` is currently unused (kept for
/// signature stability).
fn decode_op_into<F>(
    buf: &[u8],
    cursor: &mut usize,
    seq_for_callback: u64,
    mut apply: F,
) -> Result<usize>
where
    F: FnMut(ReplayOp<'_>) -> Result<()>,
{
    let _ = seq_for_callback;
    let start = *cursor;
    if start >= buf.len() {
        return Err(Error::Corrupted {
            offset: start as u64,
            reason: "wal record truncated at start",
        });
    }
    // First byte: low 7 bits = tag, high bit = value-compressed flag.
    let tag_byte = buf[start];
    let tag = tag_byte & WAL_TAG_MASK;
    let compressed = tag_byte & WAL_FLAG_COMPRESSED != 0;
    let mut pos = start + 1;
    match tag {
        WAL_TAG_INSERT => {
            let (ns_id, after_ns) = read_u32(buf, pos)?;
            pos = after_ns;
            let (key, after_key) = read_lp(buf, pos)?;
            pos = after_key;
            // Compressed layout: stored_len(4) · original_len(4) · body.
            // Uncompressed layout: a plain length-prefixed body.
            let (value_bytes, after_value, original_len) = if compressed {
                let (stored_len, after_stored) = read_u32(buf, pos)?;
                pos = after_stored;
                let (orig_len, after_orig) = read_u32(buf, pos)?;
                pos = after_orig;
                let body_end = pos + stored_len as usize;
                if body_end > buf.len() {
                    return Err(Error::Corrupted {
                        offset: pos as u64,
                        reason: "wal compressed insert truncated in body",
                    });
                }
                let body = &buf[pos..body_end];
                (body, body_end, Some(orig_len))
            } else {
                let (body, end) = read_lp(buf, pos)?;
                (body, end, None)
            };
            pos = after_value;
            if pos + 8 > buf.len() {
                return Err(Error::Corrupted {
                    offset: pos as u64,
                    reason: "wal insert record truncated before expires_at",
                });
            }
            let mut expires_buf = [0_u8; 8];
            expires_buf.copy_from_slice(&buf[pos..pos + 8]);
            let expires_at = u64::from_le_bytes(expires_buf);
            pos += 8;
            // Decompress (if needed) into scratch that lives long enough
            // for the callback borrow.
            let mut decoded_scratch: Vec<u8> = Vec::new();
            let value_slice: &[u8] = if compressed {
                let orig = original_len.unwrap_or(0);
                decompress_into(value_bytes, true, orig, &mut decoded_scratch)?;
                decoded_scratch.as_slice()
            } else {
                value_bytes
            };
            apply(ReplayOp::Insert {
                ns_id,
                key,
                value: value_slice,
                expires_at,
            })?;
            Ok(pos)
        }
        WAL_TAG_REMOVE => {
            // Only inserts may carry the compression flag.
            if compressed {
                return Err(Error::Corrupted {
                    offset: start as u64,
                    reason: "remove record cannot carry the compressed flag",
                });
            }
            let (ns_id, after_ns) = read_u32(buf, pos)?;
            pos = after_ns;
            let (key, after_key) = read_lp(buf, pos)?;
            pos = after_key;
            apply(ReplayOp::Remove { ns_id, key })?;
            Ok(pos)
        }
        WAL_TAG_BATCH_BEGIN => {
            if compressed {
                return Err(Error::Corrupted {
                    offset: start as u64,
                    reason: "batch begin cannot carry the compressed flag",
                });
            }
            // Fixed payload: tx_id(8) · op_count(4).
            if pos + 12 > buf.len() {
                return Err(Error::Corrupted {
                    offset: pos as u64,
                    reason: "wal batch_begin record truncated",
                });
            }
            let mut tx_buf = [0_u8; 8];
            tx_buf.copy_from_slice(&buf[pos..pos + 8]);
            let tx_id = u64::from_le_bytes(tx_buf);
            pos += 8;
            let mut count_buf = [0_u8; 4];
            count_buf.copy_from_slice(&buf[pos..pos + 4]);
            let op_count = u32::from_le_bytes(count_buf);
            pos += 4;
            apply(ReplayOp::BatchBegin { tx_id, op_count })?;
            Ok(pos)
        }
        WAL_TAG_BATCH_END => {
            if compressed {
                return Err(Error::Corrupted {
                    offset: start as u64,
                    reason: "batch end cannot carry the compressed flag",
                });
            }
            // Fixed payload: tx_id(8).
            if pos + 8 > buf.len() {
                return Err(Error::Corrupted {
                    offset: pos as u64,
                    reason: "wal batch_end record truncated",
                });
            }
            let mut tx_buf = [0_u8; 8];
            tx_buf.copy_from_slice(&buf[pos..pos + 8]);
            let tx_id = u64::from_le_bytes(tx_buf);
            pos += 8;
            apply(ReplayOp::BatchEnd { tx_id })?;
            Ok(pos)
        }
        _other => Err(Error::Corrupted {
            offset: start as u64,
            reason: "unknown wal record tag",
        }),
    }
}
fn read_u32(buf: &[u8], pos: usize) -> Result<(u32, usize)> {
if pos + 4 > buf.len() {
return Err(Error::Corrupted {
offset: pos as u64,
reason: "wal record truncated mid-u32",
});
}
let mut bytes = [0_u8; 4];
bytes.copy_from_slice(&buf[pos..pos + 4]);
Ok((u32::from_le_bytes(bytes), pos + 4))
}
fn read_lp(buf: &[u8], pos: usize) -> Result<(&[u8], usize)> {
let (len, after_len) = read_u32(buf, pos)?;
let len = len as usize;
let end = after_len + len;
if end > buf.len() {
return Err(Error::Corrupted {
offset: after_len as u64,
reason: "wal record truncated in length-prefixed body",
});
}
Ok((&buf[after_len..end], end))
}
#[cfg(test)]
mod tests {
use super::{Engine, EngineConfig, DEFAULT_NAMESPACE_ID};
use crate::storage::v4::wal::FlushPolicy;
    /// Builds a unique temp-dir path for one test, salted with the
    /// current wall-clock nanoseconds to avoid collisions between runs.
    fn tmp_path(name: &str) -> std::path::PathBuf {
        let mut p = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0_u128, |d| d.as_nanos());
        p.push(format!("emdb-v4-engine-{name}-{nanos}.emdb"));
        p
    }
    /// Default engine config pointed at a fresh temp path, with manual
    /// flushing so tests control durability explicitly.
    fn config_for(name: &str) -> EngineConfig {
        EngineConfig {
            path: tmp_path(name),
            flush_policy: FlushPolicy::Manual,
            ..EngineConfig::default()
        }
    }
    /// Best-effort removal of the database file and its derived
    /// `<name>.v4.wal` sibling; errors are ignored.
    fn cleanup(path: &std::path::Path) {
        let _removed = std::fs::remove_file(path);
        if let Some(parent) = path.parent() {
            if let Some(file_name) = path.file_name() {
                if let Some(name_str) = file_name.to_str() {
                    let wal = parent.join(format!("{name_str}.v4.wal"));
                    let _removed = std::fs::remove_file(wal);
                }
            }
        }
    }
#[test]
fn open_creates_database_files() {
let cfg = config_for("open");
let path = cfg.path.clone();
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
let count = match engine.record_count(DEFAULT_NAMESPACE_ID) {
Ok(c) => c,
Err(err) => panic!("record_count should succeed: {err}"),
};
assert_eq!(count, 0);
drop(engine);
cleanup(&path);
}
#[test]
fn insert_then_get_round_trips() {
let cfg = config_for("insert-get");
let path = cfg.path.clone();
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
let inserted = engine.insert(DEFAULT_NAMESPACE_ID, b"alpha", b"one", 0);
assert!(inserted.is_ok(), "insert: {:?}", inserted);
let fetched = engine.get(DEFAULT_NAMESPACE_ID, b"alpha");
let cached = match fetched {
Ok(Some(v)) => v,
Ok(None) => panic!("just-inserted key should be present"),
Err(err) => panic!("get should succeed: {err}"),
};
assert_eq!(cached.value.as_ref(), b"one");
assert_eq!(cached.expires_at, 0);
let count = match engine.record_count(DEFAULT_NAMESPACE_ID) {
Ok(c) => c,
Err(err) => panic!("record_count should succeed: {err}"),
};
assert_eq!(count, 1);
drop(engine);
cleanup(&path);
}
#[test]
fn missing_key_returns_none_and_skips_disk_via_bloom() {
let cfg = config_for("missing");
let path = cfg.path.clone();
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
let fetched = engine.get(DEFAULT_NAMESPACE_ID, b"nope");
assert!(matches!(fetched, Ok(None)));
drop(engine);
cleanup(&path);
}
#[test]
fn remove_marks_key_unreadable() {
let cfg = config_for("remove");
let path = cfg.path.clone();
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
assert!(engine.insert(DEFAULT_NAMESPACE_ID, b"k", b"v", 0).is_ok());
let removed = engine.remove(DEFAULT_NAMESPACE_ID, b"k");
assert!(matches!(removed, Ok(true)));
let fetched = engine.get(DEFAULT_NAMESPACE_ID, b"k");
assert!(matches!(fetched, Ok(None)));
let count = match engine.record_count(DEFAULT_NAMESPACE_ID) {
Ok(c) => c,
Err(err) => panic!("record_count should succeed: {err}"),
};
assert_eq!(count, 0);
drop(engine);
cleanup(&path);
}
#[test]
fn remove_unknown_key_reports_false() {
let cfg = config_for("remove-unknown");
let path = cfg.path.clone();
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
let removed = engine.remove(DEFAULT_NAMESPACE_ID, b"nope");
assert!(matches!(removed, Ok(false)));
drop(engine);
cleanup(&path);
}
#[test]
fn many_inserts_succeed_and_remain_readable() {
let cfg = config_for("many");
let path = cfg.path.clone();
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
for i in 0_u32..256 {
let key = format!("k{i:04}");
let value = format!("v{i:04}");
let inserted = engine.insert(DEFAULT_NAMESPACE_ID, key.as_bytes(), value.as_bytes(), 0);
assert!(inserted.is_ok(), "insert #{i}: {:?}", inserted);
}
for i in 0_u32..256 {
let key = format!("k{i:04}");
let fetched = engine.get(DEFAULT_NAMESPACE_ID, key.as_bytes());
let cached = match fetched {
Ok(Some(v)) => v,
Ok(None) => panic!("key {key} missing after insert"),
Err(err) => panic!("get should succeed: {err}"),
};
assert_eq!(cached.value.as_ref(), format!("v{i:04}").as_bytes());
}
let count = match engine.record_count(DEFAULT_NAMESPACE_ID) {
Ok(c) => c,
Err(err) => panic!("record_count should succeed: {err}"),
};
assert_eq!(count, 256);
drop(engine);
cleanup(&path);
}
#[test]
fn flush_persists_records_through_reopen() {
let cfg = config_for("persist");
let path = cfg.path.clone();
{
let engine = match Engine::open(cfg.clone()) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
assert!(engine.insert(DEFAULT_NAMESPACE_ID, b"k", b"v", 0).is_ok());
assert!(engine.flush().is_ok());
}
let reopened = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("reopen should succeed: {err}"),
};
let fetched = reopened.get(DEFAULT_NAMESPACE_ID, b"k");
let cached = match fetched {
Ok(Some(v)) => v,
Ok(None) => panic!("key should be present after reopen"),
Err(err) => panic!("get should succeed: {err}"),
};
assert_eq!(cached.value.as_ref(), b"v");
drop(reopened);
cleanup(&path);
}
#[test]
fn replay_recovers_records_when_flush_was_called() {
let cfg = config_for("replay-flushed");
let path = cfg.path.clone();
{
let engine = match Engine::open(cfg.clone()) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
for i in 0_u32..32 {
let key = format!("k{i:02}");
let value = format!("v{i:02}");
let _ = engine.insert(DEFAULT_NAMESPACE_ID, key.as_bytes(), value.as_bytes(), 0);
}
assert!(engine.flush().is_ok());
}
let reopened = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("reopen should succeed: {err}"),
};
for i in 0_u32..32 {
let key = format!("k{i:02}");
let fetched = reopened.get(DEFAULT_NAMESPACE_ID, key.as_bytes());
match fetched {
Ok(Some(v)) => assert_eq!(v.value.as_ref(), format!("v{i:02}").as_bytes()),
Ok(None) => panic!("key {key} missing after reopen"),
Err(err) => panic!("get should succeed: {err}"),
}
}
let count = match reopened.record_count(DEFAULT_NAMESPACE_ID) {
Ok(c) => c,
Err(err) => panic!("record_count should succeed: {err}"),
};
assert_eq!(count, 32);
drop(reopened);
cleanup(&path);
}
#[test]
fn replay_recovers_records_from_wal_without_flush() {
let cfg = config_for("replay-wal");
let path = cfg.path.clone();
{
let engine = match Engine::open(cfg.clone()) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
for i in 0_u32..16 {
let key = format!("k{i:02}");
let value = format!("v{i:02}");
let _ = engine.insert(DEFAULT_NAMESPACE_ID, key.as_bytes(), value.as_bytes(), 0);
}
assert!(engine.wal_flush_for_test().is_ok());
}
let reopened = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("reopen should succeed: {err}"),
};
for i in 0_u32..16 {
let key = format!("k{i:02}");
let fetched = reopened.get(DEFAULT_NAMESPACE_ID, key.as_bytes());
match fetched {
Ok(Some(v)) => assert_eq!(v.value.as_ref(), format!("v{i:02}").as_bytes()),
Ok(None) => panic!("key {key} missing after wal-only replay"),
Err(err) => panic!("get should succeed: {err}"),
}
}
let count = match reopened.record_count(DEFAULT_NAMESPACE_ID) {
Ok(c) => c,
Err(err) => panic!("record_count should succeed: {err}"),
};
assert_eq!(count, 16);
drop(reopened);
cleanup(&path);
}
#[test]
fn insert_replaces_existing_key() {
let cfg = config_for("replace");
let path = cfg.path.clone();
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
let _ = engine.insert(DEFAULT_NAMESPACE_ID, b"k", b"first", 0);
let _ = engine.insert(DEFAULT_NAMESPACE_ID, b"k", b"second", 0);
let fetched = match engine.get(DEFAULT_NAMESPACE_ID, b"k") {
Ok(Some(v)) => v,
Ok(None) => panic!("key should be present"),
Err(err) => panic!("get should succeed: {err}"),
};
assert_eq!(fetched.value.as_ref(), b"second");
drop(engine);
cleanup(&path);
}
#[test]
fn large_value_round_trips_through_wal_replay() {
let cfg = config_for("large-value");
let path = cfg.path.clone();
let huge: Vec<u8> = (0..2048_u32).map(|i| (i % 17) as u8).collect();
{
let engine = match Engine::open(cfg.clone()) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
assert!(engine
.insert(DEFAULT_NAMESPACE_ID, b"big", &huge, 0)
.is_ok());
assert!(engine.wal_flush_for_test().is_ok());
}
let reopened = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("reopen should succeed: {err}"),
};
let fetched = reopened.get(DEFAULT_NAMESPACE_ID, b"big");
let cached = match fetched {
Ok(Some(v)) => v,
Ok(None) => panic!("large value missing after replay"),
Err(err) => panic!("get should succeed: {err}"),
};
assert_eq!(cached.value.as_ref(), huge.as_slice());
drop(reopened);
cleanup(&path);
}
#[test]
fn unknown_namespace_returns_invalid_config() {
let cfg = config_for("bad-ns");
let path = cfg.path.clone();
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
let inserted = engine.insert(99, b"k", b"v", 0);
assert!(matches!(inserted, Err(crate::Error::InvalidConfig(_))));
drop(engine);
cleanup(&path);
}
#[test]
fn commit_batch_applies_every_op_atomically() {
use super::BatchedOp;
let cfg = config_for("commit-batch");
let path = cfg.path.clone();
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
let ops = vec![
BatchedOp::Insert {
ns_id: DEFAULT_NAMESPACE_ID,
key: b"a".to_vec(),
value: b"1".to_vec(),
expires_at: 0,
},
BatchedOp::Insert {
ns_id: DEFAULT_NAMESPACE_ID,
key: b"b".to_vec(),
value: b"2".to_vec(),
expires_at: 0,
},
BatchedOp::Insert {
ns_id: DEFAULT_NAMESPACE_ID,
key: b"c".to_vec(),
value: b"3".to_vec(),
expires_at: 0,
},
];
match engine.commit_batch(&ops) {
Ok(()) => {}
Err(err) => panic!("commit_batch should succeed: {err}"),
}
for (k, want) in [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")] {
let got = match engine.get(DEFAULT_NAMESPACE_ID, k) {
Ok(g) => g,
Err(err) => panic!("get should succeed: {err}"),
};
let cv = match got {
Some(v) => v,
None => panic!("expected key {:?} to be visible", k),
};
assert_eq!(cv.value.as_ref(), want.as_slice());
}
let count = match engine.record_count(DEFAULT_NAMESPACE_ID) {
Ok(c) => c,
Err(err) => panic!("record_count: {err}"),
};
assert_eq!(count, 3);
drop(engine);
cleanup(&path);
}
#[test]
fn partial_batch_in_wal_is_discarded_on_replay() {
use std::io::Write;
use super::{encode_batch_begin, encode_insert_op, BatchedOp, DEFAULT_NAMESPACE_ID};
let cfg = config_for("partial-batch");
let path = cfg.path.clone();
let wal_path = crate::storage::v4::wal::Wal::path_for(&path);
{
let engine = match Engine::open(cfg.clone()) {
Ok(e) => e,
Err(err) => panic!("open should succeed: {err}"),
};
let committed = vec![BatchedOp::Insert {
ns_id: DEFAULT_NAMESPACE_ID,
key: b"committed".to_vec(),
value: b"value".to_vec(),
expires_at: 0,
}];
match engine.commit_batch(&committed) {
Ok(()) => {}
Err(err) => panic!("commit_batch: {err}"),
}
}
{
let mut wal_file = match std::fs::OpenOptions::new()
.read(true)
.append(true)
.open(&wal_path)
{
Ok(f) => f,
Err(err) => panic!("open wal for append: {err}"),
};
let mut buf: Vec<u8> = Vec::new();
encode_batch_begin(&mut buf, 9999, 1);
encode_insert_op(
&mut buf,
DEFAULT_NAMESPACE_ID,
b"orphan",
b"should-not-survive",
0,
);
match wal_file.write_all(&buf) {
Ok(()) => {}
Err(err) => panic!("wal write_all: {err}"),
}
match wal_file.sync_data() {
Ok(()) => {}
Err(err) => panic!("wal sync_data: {err}"),
}
}
{
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("reopen should succeed: {err}"),
};
let committed = match engine.get(DEFAULT_NAMESPACE_ID, b"committed") {
Ok(g) => g,
Err(err) => panic!("get(committed): {err}"),
};
assert!(committed.is_some(), "fully-committed batch must survive");
let orphan = match engine.get(DEFAULT_NAMESPACE_ID, b"orphan") {
Ok(g) => g,
Err(err) => panic!("get(orphan): {err}"),
};
assert!(
orphan.is_none(),
"partial batch with no BatchEnd must be discarded"
);
let count = match engine.record_count(DEFAULT_NAMESPACE_ID) {
Ok(c) => c,
Err(err) => panic!("record_count: {err}"),
};
assert_eq!(
count, 1,
"only the committed batch contributes to the count"
);
drop(engine);
}
cleanup(&path);
}
#[test]
fn empty_commit_batch_is_a_noop() {
use super::BatchedOp;
let cfg = config_for("empty-batch");
let path = cfg.path.clone();
let engine = match Engine::open(cfg) {
Ok(e) => e,
Err(err) => panic!("open: {err}"),
};
let empty: Vec<BatchedOp> = Vec::new();
match engine.commit_batch(&empty) {
Ok(()) => {}
Err(err) => panic!("empty commit_batch should succeed: {err}"),
}
match engine.record_count(DEFAULT_NAMESPACE_ID) {
Ok(0) => {}
Ok(other) => panic!("expected 0 records, got {other}"),
Err(err) => panic!("record_count: {err}"),
}
drop(engine);
cleanup(&path);
}
}